flow

package
v0.1.73 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectAsState

func CollectAsState[T any](c api.Composer, key string, flow Flow[T], initial T, options ...state.StateTypedOption[T]) state.ValueTyped[T]

CollectAsState collects values from a Flow and represents it as a State. The initial value is used until the first value is emitted by the flow.

func CollectStateFlowAsState

func CollectStateFlowAsState[T any](c api.Composer, key string, flow StateFlow[T], options ...state.StateTypedOption[T]) state.ValueTyped[T]

CollectStateFlowAsState collects values from a StateFlow. It uses `flow.Value()` as the initial value.

func First

func First[T any](ctx context.Context, flow Flow[T]) (T, error)

First returns the first value emitted by the flow and then stops.

func ToList

func ToList[T any](ctx context.Context, flow Flow[T]) ([]T, error)

ToList collects all values from a flow and returns them as a slice.

Types

type Flow

type Flow[T any] interface {
	Collect(ctx context.Context, collector func(T)) error
}

Flow represents a cold stream of data.

func Combine

func Combine[T1, T2, R any](
	ctx context.Context,
	flow1 Flow[T1],
	flow2 Flow[T2],
	transform func(T1, T2) R,
) Flow[R]

func Map

func Map[T, R any](upstream Flow[T], transform func(T) R) Flow[R]

func NewFlow

func NewFlow[T any](block func(ctx context.Context, emit func(T)) error) Flow[T]

NewFlow creates a cold flow. The 'block' isn't executed until Collect is called.

func Zip

func Zip[T1, T2, R any](
	ctx context.Context,
	flow1 Flow[T1],
	flow2 Flow[T2],
	transform func(T1, T2) R,
) Flow[R]

type MutableStateFlow

type MutableStateFlow[T any] struct {
	// contains filtered or unexported fields
}

MutableStateFlow is the hot, stateful producer. It matches Kotlin's MutableStateFlow semantics: - Equality-based conflation: updates with equal values are ignored - Thread-safe atomic operations - CAS-based update methods

func NewMutableStateFlow

func NewMutableStateFlow[T any](initial T, opts ...StateFlowOption[T]) *MutableStateFlow[T]

NewMutableStateFlow creates a new MutableStateFlow with the given initial value. Options can be used to customize behavior (e.g., WithPolicy for custom equality).

func (*MutableStateFlow[T]) AsStateFlow

func (s *MutableStateFlow[T]) AsStateFlow() StateFlow[T]

func (*MutableStateFlow[T]) Collect

func (s *MutableStateFlow[T]) Collect(ctx context.Context, collector func(T)) error

Collect follows the Kotlin pattern: it blocks until the context is cancelled

func (*MutableStateFlow[T]) CompareAndSet added in v0.1.73

func (s *MutableStateFlow[T]) CompareAndSet(expect, update T) bool

CompareAndSet atomically compares the current value with expect and sets it to update if equal. Returns true if the value was set to update, false otherwise. If both expect and update equal the current value, returns true but does not notify subscribers. This matches Kotlin's MutableStateFlow.compareAndSet semantics.

func (*MutableStateFlow[T]) Emit

func (s *MutableStateFlow[T]) Emit(value T)

Emit updates the value and notifies all collectors and state subscribers. If the new value equals the current value (using the comparator), no notification occurs. This matches Kotlin's equality-based conflation behavior.

func (*MutableStateFlow[T]) GetAndUpdate added in v0.1.73

func (s *MutableStateFlow[T]) GetAndUpdate(f func(current T) T) T

GetAndUpdate atomically updates the value and returns the previous value. Uses a CAS loop internally, so the function may be called multiple times. This matches Kotlin's MutableStateFlow.getAndUpdate extension function.

func (*MutableStateFlow[T]) Subscribe added in v0.1.68

func (s *MutableStateFlow[T]) Subscribe(callback func()) state.Subscription

Subscribe registers a callback to be invoked when the flow's value changes. This implements state.StateChangeNotifier, enabling MutableStateFlow to be used as a dependency for DerivedState.

func (*MutableStateFlow[T]) Update

func (s *MutableStateFlow[T]) Update(f func(current T) T)

Update atomically updates the value using the given function. Uses a CAS loop internally, so the function may be called multiple times if there are concurrent modifications. This matches Kotlin's MutableStateFlow.update extension function.

func (*MutableStateFlow[T]) UpdateAndGet added in v0.1.73

func (s *MutableStateFlow[T]) UpdateAndGet(f func(current T) T) T

UpdateAndGet atomically updates the value and returns the new value. Uses a CAS loop internally, so the function may be called multiple times. This matches Kotlin's MutableStateFlow.updateAndGet extension function.

func (*MutableStateFlow[T]) Value

func (s *MutableStateFlow[T]) Value() T

Value returns the current state (Thread-safe). Also notifies the read observer to enable derived state dependency tracking.

type StateFlow

type StateFlow[T any] interface {
	Value() T
	Flow[T]
}

StateFlow defines the read-only behavior

type StateFlowOption added in v0.1.73

type StateFlowOption[T any] func(*stateFlowConfig[T])

StateFlowOption configures a MutableStateFlow

func WithPolicy added in v0.1.73

func WithPolicy[T any](policy state.MutationPolicy[T]) StateFlowOption[T]

WithPolicy sets a custom mutation policy for the StateFlow. If not set, StructuralEqualityPolicy is used by default.

Directories

Path Synopsis
demo
stateflow command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL