flow

package
Version: v0.1.118
Published: Apr 17, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectAsState

func CollectAsState[T any](c api.Composer, key string, flow Flow[T], initial T, options ...state.StateTypedOption[T]) state.ValueTyped[T]

CollectAsState collects values from a Flow and exposes the most recent one as a State. The initial value is used until the flow emits its first value.

func CollectStateFlowAsState

func CollectStateFlowAsState[T any](c api.Composer, key string, flow StateFlow[T], options ...state.StateTypedOption[T]) state.ValueTyped[T]

CollectStateFlowAsState collects values from a StateFlow. It uses `flow.Value()` as the initial value.

func First

func First[T any](ctx context.Context, flow Flow[T]) (T, error)

First returns the first value emitted by the flow and then stops.

func ToList

func ToList[T any](ctx context.Context, flow Flow[T]) ([]T, error)

ToList collects all values from a flow and returns them as a slice.
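The two terminal operations above can be sketched against the Flow interface alone. This is a minimal illustration, not the package's implementation: `flowFunc`, `first`, `toList`, `countTo`, and `errEmptyFlow` are hypothetical local names, and the sketch assumes that a flow checks its context between emissions so that cancelling the context is enough to stop it early.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Flow is redeclared locally so the sketch compiles on its own.
type Flow[T any] interface {
	Collect(ctx context.Context, collector func(T)) error
}

// flowFunc adapts a plain function to Flow (hypothetical helper).
type flowFunc[T any] func(ctx context.Context, collector func(T)) error

func (f flowFunc[T]) Collect(ctx context.Context, collector func(T)) error {
	return f(ctx, collector)
}

// errEmptyFlow is a hypothetical error for a flow that ends without a value.
var errEmptyFlow = errors.New("flow completed without emitting")

// first keeps the first value and cancels the collection context to stop upstream.
func first[T any](ctx context.Context, f Flow[T]) (T, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	var (
		result T
		found  bool
	)
	err := f.Collect(ctx, func(v T) {
		if found {
			return // ignore anything emitted before upstream notices the cancel
		}
		result, found = v, true
		cancel()
	})
	if found {
		return result, nil // the cancellation error is expected here
	}
	if err == nil {
		err = errEmptyFlow
	}
	return result, err
}

// toList appends every emitted value until the flow completes or fails.
func toList[T any](ctx context.Context, f Flow[T]) ([]T, error) {
	var out []T
	err := f.Collect(ctx, func(v T) { out = append(out, v) })
	return out, err
}

// countTo emits 1..n, checks ctx before each emission, and counts emissions.
func countTo(n int, emitted *int) Flow[int] {
	return flowFunc[int](func(ctx context.Context, collector func(int)) error {
		for i := 1; i <= n; i++ {
			if err := ctx.Err(); err != nil {
				return err
			}
			*emitted++
			collector(i)
		}
		return nil
	})
}

// demo runs both operations and formats what they observed.
func demo() string {
	var emitted int
	head, err := first[int](context.Background(), countTo(5, &emitted))
	if err != nil {
		return err.Error()
	}
	afterFirst := emitted
	all, err := toList[int](context.Background(), countTo(5, &emitted))
	if err != nil {
		return err.Error()
	}
	return fmt.Sprintf("first=%d after %d emission(s); list=%v", head, afterFirst, all)
}

func main() {
	fmt.Println(demo()) // first=1 after 1 emission(s); list=[1 2 3 4 5]
}
```

Because `toList` only returns once the flow completes, it is a poor fit for flows that never finish on their own; in that case the context passed in has to bound the collection.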

Types

type BufferOverflow added in v0.1.74

type BufferOverflow int

BufferOverflow represents the strategy to handle buffer overflow in SharedFlow.

const (
	// BufferOverflowSuspend indicates that the sender should suspend on buffer overflow.
	BufferOverflowSuspend BufferOverflow = iota
	// BufferOverflowDropOldest indicates that the sender should drop the oldest value on buffer overflow.
	BufferOverflowDropOldest
	// BufferOverflowDropLatest indicates that the sender should drop the latest value on buffer overflow.
	BufferOverflowDropLatest
)
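To make the three strategies concrete, here is a toy bounded buffer that applies each of them to a full buffer. It is a sketch under stated assumptions, not the package's SharedFlow: the names `overflow`, `offer`, and `demo` are hypothetical, "suspend" is modelled by reporting that the sender would have to wait rather than by blocking, and "drop latest" is read as discarding the value being emitted (as Kotlin's `BufferOverflow.DROP_LATEST` does).

```go
package main

import "fmt"

// overflow is a local stand-in for the BufferOverflow constants.
type overflow int

const (
	suspend overflow = iota
	dropOldest
	dropLatest
)

// offer tries to add v to buf, which holds at most capacity values.
// It returns the resulting buffer and whether the sender would have to wait.
func offer(buf []int, capacity int, v int, strategy overflow) (next []int, mustWait bool) {
	if len(buf) < capacity {
		return append(buf, v), false
	}
	switch strategy {
	case dropOldest:
		if len(buf) == 0 {
			return buf, false // zero-capacity buffer: nothing to evict
		}
		// Copy so the caller's slice is never modified in place.
		next = append([]int{}, buf[1:]...)
		return append(next, v), false
	case dropLatest:
		return buf, false // the incoming value is discarded
	default:
		return buf, true // the sender has to wait for free space
	}
}

// demo offers 1, 2, 3 to a buffer of capacity 2 under one strategy.
func demo(strategy overflow) string {
	var buf []int
	waited := false
	for _, v := range []int{1, 2, 3} {
		var mustWait bool
		buf, mustWait = offer(buf, 2, v, strategy)
		waited = waited || mustWait
	}
	return fmt.Sprintf("%v wait=%t", buf, waited)
}

func main() {
	fmt.Println(demo(suspend))    // [1 2] wait=true
	fmt.Println(demo(dropOldest)) // [2 3] wait=false
	fmt.Println(demo(dropLatest)) // [1 2] wait=false
}
```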

type Flow

type Flow[T any] interface {
	Collect(ctx context.Context, collector func(T)) error
}

Flow represents a cold stream of data.

func Combine

func Combine[T1, T2, R any](
	ctx context.Context,
	flow1 Flow[T1],
	flow2 Flow[T2],
	transform func(T1, T2) R,
) Flow[R]

func Map

func Map[T, R any](upstream Flow[T], transform func(T) R) Flow[R]
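Map carries no description here, but its signature suggests the usual operator: a new flow that applies transform to each upstream value as it is collected. The sketch below shows that reading with local stand-ins only; `flowFunc` and `mapFlow` are hypothetical names, and the package's actual Map may differ in details such as error handling.

```go
package main

import (
	"context"
	"fmt"
)

// Flow is redeclared locally so the sketch compiles on its own.
type Flow[T any] interface {
	Collect(ctx context.Context, collector func(T)) error
}

// flowFunc adapts a plain function to Flow (hypothetical helper).
type flowFunc[T any] func(ctx context.Context, collector func(T)) error

func (f flowFunc[T]) Collect(ctx context.Context, collector func(T)) error {
	return f(ctx, collector)
}

// mapFlow has the same shape as Map. Nothing runs until the result is
// collected; each collection re-collects upstream and transforms every value.
func mapFlow[T, R any](upstream Flow[T], transform func(T) R) Flow[R] {
	return flowFunc[R](func(ctx context.Context, collector func(R)) error {
		return upstream.Collect(ctx, func(v T) { collector(transform(v)) })
	})
}

// demo squares three numbers and formats them as labels.
func demo() string {
	numbers := flowFunc[int](func(ctx context.Context, collector func(int)) error {
		for _, n := range []int{1, 2, 3} {
			collector(n)
		}
		return nil
	})
	labels := mapFlow[int, string](numbers, func(n int) string {
		return fmt.Sprintf("#%d", n*n)
	})
	var out []string
	err := labels.Collect(context.Background(), func(s string) { out = append(out, s) })
	if err != nil {
		return err.Error()
	}
	return fmt.Sprint(out)
}

func main() {
	fmt.Println(demo()) // [#1 #4 #9]
}
```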

func NewFlow

func NewFlow[T any](block func(ctx context.Context, emit func(T)) error) Flow[T]

NewFlow creates a cold flow. The 'block' isn't executed until Collect is called.
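The "cold" behavior described above can be illustrated with a small stand-in: building the flow runs nothing, and every Collect call runs the block again from the start. `coldFlow` and `newFlow` are hypothetical local names that mirror the shape of NewFlow; the real constructor may do more.

```go
package main

import (
	"context"
	"fmt"
)

// Flow is redeclared locally so the sketch compiles on its own.
type Flow[T any] interface {
	Collect(ctx context.Context, collector func(T)) error
}

// coldFlow is a hypothetical stand-in for the value NewFlow returns.
type coldFlow[T any] struct {
	block func(ctx context.Context, emit func(T)) error
}

// Collect runs the block from scratch for this collector.
func (f coldFlow[T]) Collect(ctx context.Context, collector func(T)) error {
	return f.block(ctx, collector)
}

// newFlow only stores the block; it does not execute it.
func newFlow[T any](block func(ctx context.Context, emit func(T)) error) Flow[T] {
	return coldFlow[T]{block: block}
}

// demo collects the same flow twice and reports how often the block ran.
func demo() string {
	runs := 0
	numbers := newFlow(func(ctx context.Context, emit func(int)) error {
		runs++
		for i := 1; i <= 3; i++ {
			if err := ctx.Err(); err != nil {
				return err // stop promptly once the collector's context ends
			}
			emit(i)
		}
		return nil
	})
	before := runs // still 0: creating the flow did not run the block

	var got []int
	for n := 0; n < 2; n++ {
		_ = numbers.Collect(context.Background(), func(v int) { got = append(got, v) })
	}
	return fmt.Sprintf("runs before collect: %d, runs after: %d, values: %v", before, runs, got)
}

func main() {
	fmt.Println(demo())
	// runs before collect: 0, runs after: 2, values: [1 2 3 1 2 3]
}
```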

func Zip

func Zip[T1, T2, R any](
	ctx context.Context,
	flow1 Flow[T1],
	flow2 Flow[T2],
	transform func(T1, T2) R,
) Flow[R]

type MutableSharedFlow added in v0.1.74

type MutableSharedFlow[T any] interface {
	SharedFlow[T]
	// Emit emits a value to the shared flow.
	Emit(value T)
	// TryEmit attempts to emit a value without suspending. Returns true if successful.
	TryEmit(value T) bool
	// SubscriptionCount returns a StateFlow that tracks the number of active subscribers.
	SubscriptionCount() StateFlow[int]
	// ResetReplayCache resets the replay cache to an empty state.
	ResetReplayCache()
}

MutableSharedFlow is a SharedFlow that can also emit values.

func NewMutableSharedFlow added in v0.1.74

func NewMutableSharedFlow[T any](
	replay int,
	extraBufferCapacity int,
	onBufferOverflow BufferOverflow,
) MutableSharedFlow[T]

NewMutableSharedFlow creates a new MutableSharedFlow.

type MutableStateFlow

type MutableStateFlow[T any] struct {
	// contains filtered or unexported fields
}

MutableStateFlow is the hot, stateful producer. It matches Kotlin's MutableStateFlow semantics:

  - Equality-based conflation: updates with equal values are ignored
  - Thread-safe atomic operations
  - CAS-based update methods
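Equality-based conflation is easiest to see in isolation. The following sketch is not MutableStateFlow itself: `conflatingCell`, `subscribe`, and `emit` are hypothetical names, equality is plain `==` on a comparable type (the package uses a mutation policy instead), and a mutex stands in for whatever synchronization the real type uses.

```go
package main

import (
	"fmt"
	"sync"
)

// conflatingCell is a hypothetical holder that notifies only on real changes.
type conflatingCell[T comparable] struct {
	mu        sync.Mutex
	value     T
	listeners []func(T)
}

// subscribe registers a listener for future changes.
func (c *conflatingCell[T]) subscribe(l func(T)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.listeners = append(c.listeners, l)
}

// emit stores v and notifies listeners, unless v equals the current value.
// It reports whether a notification was sent.
func (c *conflatingCell[T]) emit(v T) bool {
	c.mu.Lock()
	if c.value == v {
		c.mu.Unlock()
		return false // conflated: equal value, nobody is notified
	}
	c.value = v
	// Copy the listeners so they are called without holding the lock.
	listeners := append([]func(T){}, c.listeners...)
	c.mu.Unlock()
	for _, l := range listeners {
		l(v)
	}
	return true
}

// demo emits a sequence with repeats and records what a listener sees.
func demo() string {
	cell := &conflatingCell[int]{} // starts at the zero value, 0
	var seen []int
	cell.subscribe(func(v int) { seen = append(seen, v) })
	for _, v := range []int{1, 1, 2, 2, 2, 3} {
		cell.emit(v)
	}
	return fmt.Sprint(seen)
}

func main() {
	fmt.Println(demo()) // [1 2 3]
}
```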

func NewMutableStateFlow

func NewMutableStateFlow[T any](initial T, opts ...StateFlowOption[T]) *MutableStateFlow[T]

NewMutableStateFlow creates a new MutableStateFlow with the given initial value. Options can be used to customize behavior (e.g., WithPolicy for custom equality).

func (*MutableStateFlow[T]) AsStateFlow

func (s *MutableStateFlow[T]) AsStateFlow() StateFlow[T]

func (*MutableStateFlow[T]) Collect

func (s *MutableStateFlow[T]) Collect(ctx context.Context, collector func(T)) error

Collect follows the Kotlin pattern: it blocks until the context is cancelled.

func (*MutableStateFlow[T]) CompareAndSet added in v0.1.73

func (s *MutableStateFlow[T]) CompareAndSet(expect, update T) bool

CompareAndSet atomically compares the current value with expect and sets it to update if equal. Returns true if the value was set to update, false otherwise. If both expect and update equal the current value, returns true but does not notify subscribers. This matches Kotlin's MutableStateFlow.compareAndSet semantics.

func (*MutableStateFlow[T]) Emit

func (s *MutableStateFlow[T]) Emit(value T)

Emit updates the value and notifies all collectors and state subscribers. If the new value equals the current value (using the comparator), no notification occurs. This matches Kotlin's equality-based conflation behavior.

func (*MutableStateFlow[T]) GetAndUpdate added in v0.1.73

func (s *MutableStateFlow[T]) GetAndUpdate(f func(current T) T) T

GetAndUpdate atomically updates the value and returns the previous value. Uses a CAS loop internally, so the function may be called multiple times. This matches Kotlin's MutableStateFlow.getAndUpdate extension function.

func (*MutableStateFlow[T]) ReplayCache added in v0.1.74

func (s *MutableStateFlow[T]) ReplayCache() []T

ReplayCache returns the current value in a slice (replay=1).

func (*MutableStateFlow[T]) ResetReplayCache added in v0.1.74

func (s *MutableStateFlow[T]) ResetReplayCache()

ResetReplayCache is not supported for StateFlow as it must always have a value.

func (*MutableStateFlow[T]) Subscribe added in v0.1.68

func (s *MutableStateFlow[T]) Subscribe(callback func()) state.Subscription

Subscribe registers a callback to be invoked when the flow's value changes. This implements state.StateChangeNotifier, enabling MutableStateFlow to be used as a dependency for DerivedState.

func (*MutableStateFlow[T]) SubscriptionCount added in v0.1.74

func (s *MutableStateFlow[T]) SubscriptionCount() StateFlow[int]

SubscriptionCount returns a StateFlow that tracks the number of active subscribers.

func (*MutableStateFlow[T]) TryEmit added in v0.1.74

func (s *MutableStateFlow[T]) TryEmit(value T) bool

TryEmit attempts to emit a value. For StateFlow, this always succeeds (with conflation).

func (*MutableStateFlow[T]) Update

func (s *MutableStateFlow[T]) Update(f func(current T) T)

Update atomically updates the value using the given function. Uses a CAS loop internally, so the function may be called multiple times if there are concurrent modifications. This matches Kotlin's MutableStateFlow.update extension function.
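The note that the function "may be called multiple times" follows directly from how a CAS loop works: read the current value, compute the next one, and retry if another writer got in first. The sketch below shows that loop with `sync/atomic`; `casCell`, `update`, and `runCounter` are hypothetical names, and the swap compares pointer identity, which is a simplification of whatever comparison the package performs.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// casCell is a hypothetical holder whose value is replaced with compare-and-swap.
type casCell[T any] struct {
	current atomic.Pointer[T]
}

func newCASCell[T any](initial T) *casCell[T] {
	c := &casCell[T]{}
	c.current.Store(&initial)
	return c
}

// update retries f until its result is installed without interference.
// f may run more than once, so it should be free of side effects.
func (c *casCell[T]) update(f func(T) T) {
	for {
		old := c.current.Load()
		next := f(*old)
		if c.current.CompareAndSwap(old, &next) {
			return
		}
		// Another writer won the race; loop and recompute from the new value.
	}
}

func (c *casCell[T]) value() T { return *c.current.Load() }

// runCounter increments a shared counter from several goroutines and
// returns the final value together with how many times f was invoked.
func runCounter(workers, perWorker int) (final int, calls int64) {
	cell := newCASCell(0)
	var invoked atomic.Int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				cell.update(func(cur int) int {
					invoked.Add(1) // counted only to make retries visible
					return cur + 1
				})
			}
		}()
	}
	wg.Wait()
	return cell.value(), invoked.Load()
}

// report formats one run; the call count varies with scheduling.
func report() string {
	final, calls := runCounter(50, 100)
	return fmt.Sprintf("final=%d, f invoked %d times", final, calls)
}

func main() {
	fmt.Println(report())
}
```

No increment is lost, so the final value always equals workers × perWorker, while the invocation count is at least that large and grows with contention. That is why the function passed to Update should not have side effects of its own.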

func (*MutableStateFlow[T]) UpdateAndGet added in v0.1.73

func (s *MutableStateFlow[T]) UpdateAndGet(f func(current T) T) T

UpdateAndGet atomically updates the value and returns the new value. Uses a CAS loop internally, so the function may be called multiple times. This matches Kotlin's MutableStateFlow.updateAndGet extension function.

func (*MutableStateFlow[T]) Value

func (s *MutableStateFlow[T]) Value() T

Value returns the current state and is safe for concurrent use. It also notifies the read observer to enable derived state dependency tracking.

type SharedFlow added in v0.1.74

type SharedFlow[T any] interface {
	Flow[T]
	// ReplayCache returns the current replay cache.
	ReplayCache() []T
}

SharedFlow is a hot flow that shares emitted values among all collectors in a broadcast fashion.

func ShareIn added in v0.1.74

func ShareIn[T any](
	ctx context.Context,
	upstream Flow[T],
	started SharingStarted,
	replay int,
) SharedFlow[T]

ShareIn transforms a cold Flow into a hot SharedFlow.

type SharingCommand added in v0.1.74

type SharingCommand int

SharingCommand controls the upstream flow in shareIn/stateIn.

const (
	// SharingCommandStart starts the upstream flow.
	SharingCommandStart SharingCommand = iota
	// SharingCommandStop stops the upstream flow.
	SharingCommandStop
	// SharingCommandStopAndResetReplayCache stops the upstream flow and resets the replay cache.
	SharingCommandStopAndResetReplayCache
)

type SharingStarted added in v0.1.74

type SharingStarted interface {
	Command(subscriptionCount StateFlow[int]) Flow[SharingCommand]
}

SharingStarted controls when the sharing is started and stopped.

func SharingStartedEagerly added in v0.1.74

func SharingStartedEagerly() SharingStarted

SharingStartedEagerly starts sharing immediately and never stops.

func SharingStartedLazily added in v0.1.74

func SharingStartedLazily() SharingStarted

SharingStartedLazily starts sharing when the first subscriber appears and never stops.

func SharingStartedWhileSubscribed added in v0.1.74

func SharingStartedWhileSubscribed(stopTimeout time.Duration, replayExpiration time.Duration) SharingStarted

SharingStartedWhileSubscribed starts sharing when the first subscriber appears and stops when the last subscriber disappears. stopTimeout configures a grace period before stopping, and replayExpiration configures how long the replay cache is kept.

type StateFlow

type StateFlow[T any] interface {
	SharedFlow[T]
	Value() T
}

StateFlow defines the read-only behavior: collecting values and reading the current value, with no way to change it.

func StateIn added in v0.1.74

func StateIn[T any](
	ctx context.Context,
	upstream Flow[T],
	started SharingStarted,
	initialValue T,
) StateFlow[T]

StateIn transforms a flow into a StateFlow.

type StateFlowOption added in v0.1.73

type StateFlowOption[T any] func(*stateFlowConfig[T])

StateFlowOption configures a MutableStateFlow.

func WithPolicy added in v0.1.73

func WithPolicy[T any](policy state.MutationPolicy[T]) StateFlowOption[T]

WithPolicy sets a custom mutation policy for the StateFlow. If not set, StructuralEqualityPolicy is used by default.

func WithoutSubscriptionCount added in v0.1.74

func WithoutSubscriptionCount[T any]() StateFlowOption[T]

WithoutSubscriptionCount disables internal subscription counting for this flow. This is primarily used to prevent infinite recursion when creating the subscription count flow itself.

Directories

Path: demo/stateflow (command)
