Documentation ¶
Index ¶
- func CollectAsState[T any](c api.Composer, key string, flow Flow[T], initial T, ...) state.ValueTyped[T]
- func CollectStateFlowAsState[T any](c api.Composer, key string, flow StateFlow[T], ...) state.ValueTyped[T]
- func First[T any](ctx context.Context, flow Flow[T]) (T, error)
- func ToList[T any](ctx context.Context, flow Flow[T]) ([]T, error)
- type BufferOverflow
- type Flow
- func Combine[T1, T2, R any](ctx context.Context, flow1 Flow[T1], flow2 Flow[T2], transform func(T1, T2) R) Flow[R]
- func Map[T, R any](upstream Flow[T], transform func(T) R) Flow[R]
- func NewFlow[T any](block func(ctx context.Context, emit func(T)) error) Flow[T]
- func Zip[T1, T2, R any](ctx context.Context, flow1 Flow[T1], flow2 Flow[T2], transform func(T1, T2) R) Flow[R]
- type MutableSharedFlow
- type MutableStateFlow
- func (s *MutableStateFlow[T]) AsStateFlow() StateFlow[T]
- func (s *MutableStateFlow[T]) Collect(ctx context.Context, collector func(T)) error
- func (s *MutableStateFlow[T]) CompareAndSet(expect, update T) bool
- func (s *MutableStateFlow[T]) Emit(value T)
- func (s *MutableStateFlow[T]) GetAndUpdate(f func(current T) T) T
- func (s *MutableStateFlow[T]) ReplayCache() []T
- func (s *MutableStateFlow[T]) ResetReplayCache()
- func (s *MutableStateFlow[T]) Subscribe(callback func()) state.Subscription
- func (s *MutableStateFlow[T]) SubscriptionCount() StateFlow[int]
- func (s *MutableStateFlow[T]) TryEmit(value T) bool
- func (s *MutableStateFlow[T]) Update(f func(current T) T)
- func (s *MutableStateFlow[T]) UpdateAndGet(f func(current T) T) T
- func (s *MutableStateFlow[T]) Value() T
- type SharedFlow
- type SharingCommand
- type SharingStarted
- type StateFlow
- type StateFlowOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CollectAsState ¶
func CollectAsState[T any](c api.Composer, key string, flow Flow[T], initial T, options ...state.StateTypedOption[T]) state.ValueTyped[T]
CollectAsState collects values from a Flow and exposes the most recent one as a State. The initial value is used until the flow emits its first value.
func CollectStateFlowAsState ¶
func CollectStateFlowAsState[T any](c api.Composer, key string, flow StateFlow[T], options ...state.StateTypedOption[T]) state.ValueTyped[T]
CollectStateFlowAsState collects values from a StateFlow. It uses `flow.Value()` as the initial value.
Types ¶
type BufferOverflow ¶ added in v0.1.74
type BufferOverflow int
BufferOverflow represents the strategy to handle buffer overflow in SharedFlow.
const (
	// BufferOverflowSuspend indicates that the sender should suspend on buffer overflow.
	BufferOverflowSuspend BufferOverflow = iota
	// BufferOverflowDropOldest indicates that the sender should drop the oldest value on buffer overflow.
	BufferOverflowDropOldest
	// BufferOverflowDropLatest indicates that the sender should drop the latest value on buffer overflow.
	BufferOverflowDropLatest
)
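The three strategies can be illustrated with a small standalone sketch. It does not use this package: `overflowStrategy`, `offer`, and the slice-backed buffer are hypothetical names invented for the example, and "suspend" is modelled as a returned flag rather than a blocked sender.

```go
package main

import "fmt"

// overflowStrategy is a hypothetical stand-in for BufferOverflow.
type overflowStrategy int

const (
	suspend overflowStrategy = iota
	dropOldest
	dropLatest
)

// offer tries to add v to buf, which may hold at most capacity values.
// It returns the resulting buffer and whether the sender would have to wait,
// which can only happen with the suspend strategy.
func offer(buf []int, capacity int, v int, s overflowStrategy) ([]int, bool) {
	if len(buf) < capacity {
		return append(buf, v), false
	}
	if capacity <= 0 {
		// No room at all: the value is dropped, or the sender waits.
		return buf, s == suspend
	}
	switch s {
	case dropOldest:
		// Discard the head to make room for the new value.
		return append(buf[1:], v), false
	case dropLatest:
		// Discard the incoming value; the buffer is unchanged.
		return buf, false
	default:
		// suspend: a real sender would block until a slot frees up.
		return buf, true
	}
}

func main() {
	names := []string{"suspend", "dropOldest", "dropLatest"}
	for i, s := range []overflowStrategy{suspend, dropOldest, dropLatest} {
		buf := []int{}
		waited := false
		for v := 1; v <= 3; v++ {
			var w bool
			buf, w = offer(buf, 2, v, s)
			waited = waited || w
		}
		fmt.Println(names[i], buf, "sender had to wait:", waited)
	}
}
```

Returning a flag keeps the sketch synchronous; how the package actually suspends a sender is not described in this reference.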
type Flow ¶
Flow represents a cold stream of data.
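As a rough illustration of what "cold" means, the following standalone sketch models a flow as a stored producer block that runs from scratch for every collector. The names `coldFlow`, `countTo`, and `collectAll` are hypothetical and not part of this package; only the shape of the producer block is borrowed from the `NewFlow` signature in the index.

```go
package main

import (
	"context"
	"fmt"
)

// coldFlow is a hypothetical stand-in for Flow[T]: it only stores the
// producer block and does no work until someone collects.
type coldFlow[T any] struct {
	block func(ctx context.Context, emit func(T)) error
}

// collect runs the producer block and forwards each emitted value to collector.
func (f coldFlow[T]) collect(ctx context.Context, collector func(T)) error {
	return f.block(ctx, collector)
}

// countTo returns a cold flow that emits 1..n. *runs counts how many times
// the producer block has been started.
func countTo(n int, runs *int) coldFlow[int] {
	return coldFlow[int]{block: func(ctx context.Context, emit func(int)) error {
		*runs++
		for i := 1; i <= n; i++ {
			if err := ctx.Err(); err != nil {
				return err // stop early when the collector's context is done
			}
			emit(i)
		}
		return nil
	}}
}

// collectAll gathers every value of one collection run into a slice.
func collectAll(f coldFlow[int]) []int {
	var got []int
	_ = f.collect(context.Background(), func(v int) { got = append(got, v) })
	return got
}

func main() {
	runs := 0
	numbers := countTo(3, &runs)
	fmt.Println("runs before collecting:", runs)

	// Each collection restarts the producer and sees the full sequence.
	fmt.Println(collectAll(numbers))
	fmt.Println(collectAll(numbers))
	fmt.Println("runs after two collections:", runs)
}
```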
type MutableSharedFlow ¶ added in v0.1.74
type MutableSharedFlow[T any] interface {
	SharedFlow[T]
	Emit(value T)
	TryEmit(value T) bool
	SubscriptionCount() StateFlow[int]
	ResetReplayCache()
}
MutableSharedFlow is a SharedFlow that can also emit values.
func NewMutableSharedFlow ¶ added in v0.1.74
func NewMutableSharedFlow[T any](replay int, extraBufferCapacity int, onBufferOverflow BufferOverflow) MutableSharedFlow[T]
NewMutableSharedFlow creates a new MutableSharedFlow.
type MutableStateFlow ¶
type MutableStateFlow[T any] struct {
	// contains filtered or unexported fields
}
MutableStateFlow is the hot, stateful producer. It matches Kotlin's MutableStateFlow semantics:
- Equality-based conflation: updates with equal values are ignored
- Thread-safe atomic operations
- CAS-based update methods
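Equality-based conflation can be pictured with the standalone sketch below. `conflatingCell` is a hypothetical, mutex-based holder written for this example; it is not how MutableStateFlow is implemented, and it compares values with `==` rather than a configurable policy.

```go
package main

import (
	"fmt"
	"sync"
)

// conflatingCell is a hypothetical value holder that notifies listeners
// only when the stored value actually changes.
type conflatingCell[T comparable] struct {
	mu        sync.Mutex
	value     T
	listeners []func(T)
}

// onChange registers fn to be called after every effective change.
func (c *conflatingCell[T]) onChange(fn func(T)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.listeners = append(c.listeners, fn)
}

// set stores v and reports whether listeners were notified.
// Setting a value equal to the current one is ignored (conflated).
func (c *conflatingCell[T]) set(v T) bool {
	c.mu.Lock()
	if c.value == v {
		c.mu.Unlock()
		return false
	}
	c.value = v
	// Copy the listener list so callbacks run outside the lock.
	listeners := append([]func(T){}, c.listeners...)
	c.mu.Unlock()

	for _, fn := range listeners {
		fn(v)
	}
	return true
}

func main() {
	cell := &conflatingCell[int]{} // starts at the zero value, 0
	cell.onChange(func(v int) { fmt.Println("changed to", v) })
	for _, v := range []int{1, 1, 2, 2, 3} {
		fmt.Println("set", v, "notified:", cell.set(v))
	}
}
```

Of the five `set` calls in `main`, only the three that change the value reach the listener.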
func NewMutableStateFlow ¶
func NewMutableStateFlow[T any](initial T, opts ...StateFlowOption[T]) *MutableStateFlow[T]
NewMutableStateFlow creates a new MutableStateFlow with the given initial value. Options can be used to customize behavior (e.g., WithPolicy for custom equality).
func (*MutableStateFlow[T]) AsStateFlow ¶
func (s *MutableStateFlow[T]) AsStateFlow() StateFlow[T]
func (*MutableStateFlow[T]) Collect ¶
func (s *MutableStateFlow[T]) Collect(ctx context.Context, collector func(T)) error
Collect follows the Kotlin pattern: it blocks until the context is cancelled.
func (*MutableStateFlow[T]) CompareAndSet ¶ added in v0.1.73
func (s *MutableStateFlow[T]) CompareAndSet(expect, update T) bool
CompareAndSet atomically compares the current value with expect and sets it to update if equal. Returns true if the value was set to update, false otherwise. If both expect and update equal the current value, returns true but does not notify subscribers. This matches Kotlin's MutableStateFlow.compareAndSet semantics.
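The three outcomes described above (mismatch, real change, and match without change) can be sketched as follows. The `cell` type and its `notifications` counter are hypothetical and exist only to make the notification behaviour observable; the package's own implementation is not shown in this reference.

```go
package main

import (
	"fmt"
	"sync"
)

// cell is a hypothetical holder used to illustrate compare-and-set outcomes.
type cell struct {
	mu            sync.Mutex
	value         int
	notifications int // how many times subscribers would have been notified
}

// compareAndSet replaces the value with update only if it currently equals expect.
func (c *cell) compareAndSet(expect, update int) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.value != expect {
		return false // mismatch: nothing happens
	}
	if c.value == update {
		return true // match, but the value stays the same: no notification
	}
	c.value = update
	c.notifications++
	return true
}

func main() {
	c := &cell{value: 1}
	fmt.Println(c.compareAndSet(5, 6), c.value, c.notifications) // expect does not match
	fmt.Println(c.compareAndSet(1, 2), c.value, c.notifications) // value changes
	fmt.Println(c.compareAndSet(2, 2), c.value, c.notifications) // match without change
}
```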
func (*MutableStateFlow[T]) Emit ¶
func (s *MutableStateFlow[T]) Emit(value T)
Emit updates the value and notifies all collectors and state subscribers. If the new value equals the current value (using the comparator), no notification occurs. This matches Kotlin's equality-based conflation behavior.
func (*MutableStateFlow[T]) GetAndUpdate ¶ added in v0.1.73
func (s *MutableStateFlow[T]) GetAndUpdate(f func(current T) T) T
GetAndUpdate atomically updates the value and returns the previous value. Uses a CAS loop internally, so the function may be called multiple times. This matches Kotlin's MutableStateFlow.getAndUpdate extension function.
func (*MutableStateFlow[T]) ReplayCache ¶ added in v0.1.74
func (s *MutableStateFlow[T]) ReplayCache() []T
ReplayCache returns the current value in a slice (replay=1).
func (*MutableStateFlow[T]) ResetReplayCache ¶ added in v0.1.74
func (s *MutableStateFlow[T]) ResetReplayCache()
ResetReplayCache is not supported for StateFlow as it must always have a value.
func (*MutableStateFlow[T]) Subscribe ¶ added in v0.1.68
func (s *MutableStateFlow[T]) Subscribe(callback func()) state.Subscription
Subscribe registers a callback to be invoked when the flow's value changes. This implements state.StateChangeNotifier, enabling MutableStateFlow to be used as a dependency for DerivedState.
func (*MutableStateFlow[T]) SubscriptionCount ¶ added in v0.1.74
func (s *MutableStateFlow[T]) SubscriptionCount() StateFlow[int]
SubscriptionCount returns a StateFlow that tracks the number of active subscribers.
func (*MutableStateFlow[T]) TryEmit ¶ added in v0.1.74
func (s *MutableStateFlow[T]) TryEmit(value T) bool
TryEmit attempts to emit a value. For StateFlow, this always succeeds (with conflation).
func (*MutableStateFlow[T]) Update ¶
func (s *MutableStateFlow[T]) Update(f func(current T) T)
Update atomically updates the value using the given function. Uses a CAS loop internally, so the function may be called multiple times if there are concurrent modifications. This matches Kotlin's MutableStateFlow.update extension function.
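The retry behaviour of a CAS loop is easiest to see in isolation. The sketch below uses `sync/atomic` directly rather than this package; `update` and `newCell` are hypothetical helpers, and returning both the previous and the new value is only meant to mirror what GetAndUpdate and UpdateAndGet hand back.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// newCell returns an atomic integer holding initial.
func newCell(initial int64) *atomic.Int64 {
	v := new(atomic.Int64)
	v.Store(initial)
	return v
}

// update applies f with a compare-and-swap retry loop. It returns the value
// f was applied to, the value that was stored, and how often f was called.
func update(v *atomic.Int64, f func(int64) int64) (prev, next int64, calls int) {
	for {
		prev = v.Load()
		next = f(prev)
		calls++
		if v.CompareAndSwap(prev, next) {
			return prev, next, calls
		}
		// Another goroutine changed the value in between: retry with a fresh read.
	}
}

func main() {
	counter := newCell(0)
	var totalCalls atomic.Int64
	var wg sync.WaitGroup

	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				_, _, calls := update(counter, func(cur int64) int64 { return cur + 1 })
				totalCalls.Add(int64(calls))
			}
		}()
	}
	wg.Wait()

	fmt.Println("final value:", counter.Load())
	fmt.Println("calls to f (at least 8000):", totalCalls.Load())
}
```

Because the function may run more than once per update, it should be free of side effects and depend only on its argument.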
func (*MutableStateFlow[T]) UpdateAndGet ¶ added in v0.1.73
func (s *MutableStateFlow[T]) UpdateAndGet(f func(current T) T) T
UpdateAndGet atomically updates the value and returns the new value. Uses a CAS loop internally, so the function may be called multiple times. This matches Kotlin's MutableStateFlow.updateAndGet extension function.
func (*MutableStateFlow[T]) Value ¶
func (s *MutableStateFlow[T]) Value() T
Value returns the current state and is safe for concurrent use. It also notifies the read observer to enable derived state dependency tracking.
type SharedFlow ¶ added in v0.1.74
SharedFlow is a hot flow that shares emitted values among all collectors in a broadcast fashion.
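A minimal standalone sketch of broadcast delivery follows. `broadcaster` is a hypothetical type, not this package's SharedFlow, and the replay behaviour shown (new subscribers first receive the last `replay` values) is an assumption modelled on Kotlin's SharedFlow rather than something this reference states.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical, simplified hot stream. Callbacks run while
// the lock is held, so they must not call back into the broadcaster.
type broadcaster[T any] struct {
	mu     sync.Mutex
	replay int // how many past values a new subscriber receives (assumed semantics)
	cache  []T // the most recent values, at most replay of them
	subs   map[int]func(T)
	nextID int
}

func newBroadcaster[T any](replay int) *broadcaster[T] {
	return &broadcaster[T]{replay: replay, subs: map[int]func(T){}}
}

// subscribe replays the cached values to fn, registers it for future values,
// and returns a function that removes the subscription again.
func (b *broadcaster[T]) subscribe(fn func(T)) (cancel func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, v := range b.cache {
		fn(v)
	}
	id := b.nextID
	b.nextID++
	b.subs[id] = fn
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.subs, id)
	}
}

// emit records v for replay and delivers it to every current subscriber.
func (b *broadcaster[T]) emit(v T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.cache = append(b.cache, v)
	if len(b.cache) > b.replay {
		b.cache = b.cache[len(b.cache)-b.replay:]
	}
	for _, fn := range b.subs {
		fn(v)
	}
}

func main() {
	b := newBroadcaster[string](1)
	b.emit("a") // no subscribers yet; only kept for replay

	var first, second []string
	stop := b.subscribe(func(v string) { first = append(first, v) })
	b.emit("b")
	b.subscribe(func(v string) { second = append(second, v) })
	b.emit("c") // delivered to both subscribers
	stop()
	b.emit("d") // delivered to the second subscriber only

	fmt.Println("first:", first)
	fmt.Println("second:", second)
}
```

Unlike a cold flow, emission here happens regardless of who is listening; subscribers only see what is emitted while they are registered, plus whatever the replay cache holds.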
func ShareIn ¶ added in v0.1.74
func ShareIn[T any](ctx context.Context, upstream Flow[T], started SharingStarted, replay int) SharedFlow[T]
ShareIn transforms a cold Flow into a hot SharedFlow.
type SharingCommand ¶ added in v0.1.74
type SharingCommand int
SharingCommand controls the upstream flow in shareIn/stateIn.
const (
	// SharingCommandStart starts the upstream flow.
	SharingCommandStart SharingCommand = iota
	// SharingCommandStop stops the upstream flow.
	SharingCommandStop
	// SharingCommandStopAndResetReplayCache stops the upstream flow and resets the replay cache.
	SharingCommandStopAndResetReplayCache
)
type SharingStarted ¶ added in v0.1.74
type SharingStarted interface {
Command(subscriptionCount StateFlow[int]) Flow[SharingCommand]
}
SharingStarted controls when the sharing is started and stopped.
func SharingStartedEagerly ¶ added in v0.1.74
func SharingStartedEagerly() SharingStarted
SharingStartedEagerly returns a strategy that starts sharing immediately and never stops.
func SharingStartedLazily ¶ added in v0.1.74
func SharingStartedLazily() SharingStarted
SharingStartedLazily returns a strategy that starts sharing when the first subscriber appears and never stops.
func SharingStartedWhileSubscribed ¶ added in v0.1.74
func SharingStartedWhileSubscribed(stopTimeout time.Duration, replayExpiration time.Duration) SharingStarted
SharingStartedWhileSubscribed returns a strategy that starts sharing when the first subscriber appears and stops when the last subscriber disappears. stopTimeout sets a grace period before stopping, and replayExpiration sets how long the replay cache is kept.
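The difference between the three strategies can be sketched as a mapping from observed subscriber counts to start and stop commands. This is a deliberately simplified, standalone model: `strategy`, `command`, and `commands` are hypothetical names, the stop timeout and replay expiration are ignored, and the reset-replay-cache command is left out.

```go
package main

import "fmt"

// command is a hypothetical stand-in for SharingCommand.
type command int

const (
	start command = iota
	stop
)

func (c command) String() string {
	if c == start {
		return "start"
	}
	return "stop"
}

// strategy is a hypothetical stand-in for the three SharingStarted variants.
type strategy int

const (
	eagerly strategy = iota
	lazily
	whileSubscribed
)

// commands returns the commands a strategy would issue while the subscriber
// count takes the given values in order. Timeouts are not modelled.
func commands(s strategy, counts []int) []command {
	var out []command
	running := false
	if s == eagerly {
		// Starts right away, before any subscriber shows up.
		out = append(out, start)
		running = true
	}
	for _, n := range counts {
		switch {
		case n > 0 && !running:
			out = append(out, start)
			running = true
		case n == 0 && running && s == whileSubscribed:
			// Only this strategy reacts to the last subscriber leaving.
			out = append(out, stop)
			running = false
		}
	}
	return out
}

func main() {
	counts := []int{0, 1, 2, 1, 0, 1, 0}
	fmt.Println("eagerly:        ", commands(eagerly, counts))
	fmt.Println("lazily:         ", commands(lazily, counts))
	fmt.Println("whileSubscribed:", commands(whileSubscribed, counts))
}
```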
type StateFlow ¶
type StateFlow[T any] interface {
	SharedFlow[T]
	Value() T
}
StateFlow defines the read-only behavior: it is a SharedFlow that also exposes its current value through Value().
type StateFlowOption ¶ added in v0.1.73
type StateFlowOption[T any] func(*stateFlowConfig[T])
StateFlowOption configures a MutableStateFlow.
func WithPolicy ¶ added in v0.1.73
func WithPolicy[T any](policy state.MutationPolicy[T]) StateFlowOption[T]
WithPolicy sets a custom mutation policy for the StateFlow. If not set, StructuralEqualityPolicy is used by default.
func WithoutSubscriptionCount ¶ added in v0.1.74
func WithoutSubscriptionCount[T any]() StateFlowOption[T]
WithoutSubscriptionCount disables internal subscription counting for this flow. This is primarily used to prevent infinite recursion when creating the subscription count flow itself.