pubsub

package (v0.14.5)
Published: Jan 28, 2026 License: Apache-2.0 Imports: 18 Imported by: 7

Documentation

Overview

Package pubsub provides a message broker for one-to-many or many-to-many message distribution. In addition, pubsub includes generic deque and queue implementations suited to concurrent use.

Index

Constants

const (
	// ErrQueueFull is returned by the Add method of a queue when the queue has
	// reached its hard capacity limit.
	ErrQueueFull = ers.Error("queue is full")

	// ErrQueueNoCredit is returned by the Add method of a queue when the queue has
	// exceeded its soft quota and there is insufficient burst credit.
	ErrQueueNoCredit = ers.Error("insufficient burst credit")

	// ErrQueueClosed is returned by the Add method of a closed queue, and by
	// the Wait method of a closed empty queue.
	ErrQueueClosed = ers.ErrContainerClosed
	// ErrQueueDraining is returned by Add methods while the queue is in a
	// draining state and work remains; once draining completes, Add begins
	// returning ErrQueueClosed.
	ErrQueueDraining = ers.Error("the queue is shutting down")
)

Variables

This section is empty.

Functions

func Convert added in v0.14.0

func Convert[T, O any](op fnx.Converter[T, O]) interface {
	Stream(*Stream[T]) *Stream[O]
	Parallel(*Stream[T], ...opt.Provider[*wpa.WorkerGroupConf]) *Stream[O]
}

Convert takes an fnx.Converter and returns a value whose Stream and Parallel methods convert an input stream of one type into an output stream of another type. All errors from the original stream are propagated to the output stream.

func ConvertFn added in v0.14.0

func ConvertFn[T any, O any](op fn.Converter[T, O]) interface {
	Stream(*Stream[T]) *Stream[O]
	Parallel(*Stream[T], ...opt.Provider[*wpa.WorkerGroupConf]) *Stream[O]
}

ConvertFn simplifies calls to pubsub.Convert for fn.Converter types.

func RateLimit added in v0.14.0

func RateLimit[T any](ctx context.Context, seq iter.Seq[T], num int, window time.Duration) iter.Seq[T]

RateLimit wraps an iterator with a rate limiter to ensure that the output iterator produces no more than <num> items in any given <window>.
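
As an illustrative sketch (not an example from the package), the standard library's slices.Values can supply the input iterator; the source data and the limits shown here are arbitrary:

ctx := context.Background()
src := slices.Values([]int{1, 2, 3, 4, 5})
limited := pubsub.RateLimit(ctx, src, 10, time.Second)
for v := range limited {
	fmt.Println(v) // consumers observe at most 10 items per one-second window
}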

Types

type Broker

type Broker[T any] struct {
	// contains filtered or unexported fields
}

Broker is a simple message broker that provides a usable interface for distributing messages to an arbitrary group of channels.

func NewBroker

func NewBroker[T any](ctx context.Context, opts BrokerOptions) *Broker[T]

NewBroker constructs a broker with a simple distribution scheme: neither the incoming and outgoing messages nor the client subscription channels are buffered.

All brokers respect the BrokerOptions, which control how messages are sent to subscribers. The specific configuration of these settings can have profound impacts on the semantics and ordering of messages in the broker.
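
A minimal sketch of broker use with default options, based on the methods documented below; the string payload and goroutine structure are illustrative only:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

broker := pubsub.NewBroker[string](ctx, pubsub.BrokerOptions{})

sub := broker.Subscribe(ctx)
defer broker.Unsubscribe(ctx, sub) // subscriptions must always be released

go broker.Publish(ctx, "hello")

fmt.Println(<-sub) // "hello"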

func NewDequeBroker

func NewDequeBroker[T any](ctx context.Context, deque *Deque[T], opts BrokerOptions) *Broker[T]

NewDequeBroker constructs a broker that uses the deque to buffer incoming messages if subscribers are slow to process them. The semantics of the Deque depend a bit on the configuration of its limits and capacity.

This broker distributes messages in a FIFO order, dropping older messages to make room for new messages.

func NewLIFOBroker

func NewLIFOBroker[T any](ctx context.Context, deque *Deque[T], opts BrokerOptions) *Broker[T]

NewLIFOBroker constructs a broker that uses the deque to buffer incoming messages if subscribers are slow to process them. The semantics of the Deque depend a bit on the configuration of its limits and capacity.

This broker distributes messages in a LIFO order, dropping older messages to make room for new messages. The capacity of the queue is fixed and must be a positive integer; NewLIFOBroker will panic if the capacity is less than or equal to 0.

func NewQueueBroker

func NewQueueBroker[T any](ctx context.Context, queue *Queue[T], opts BrokerOptions) *Broker[T]

NewQueueBroker constructs a broker that uses the queue object to buffer incoming requests if subscribers are slow to process requests. Queues have a system for shedding load when the queue's limits have been exceeded. In general, messages are distributed in FIFO order, and Publish calls will drop messages if the queue is full.

All brokers respect the BrokerOptions, which control the size of the worker pool used to send messages to subscribers and whether the Broker should use non-blocking sends. All channels between the broker and the subscribers are unbuffered.

func (*Broker[T]) Publish

func (b *Broker[T]) Publish(ctx context.Context, msg T)

Publish distributes a message to all subscribers.

func (*Broker[T]) Send added in v0.13.0

func (b *Broker[T]) Send(ctx context.Context, msg T) error

Send distributes a message to all subscribers.

func (*Broker[T]) Stats added in v0.8.0

func (b *Broker[T]) Stats(ctx context.Context) BrokerStats

Stats provides introspection into the current state of the broker.

func (*Broker[T]) Stop

func (b *Broker[T]) Stop()

Stop cancels the broker, allowing background work to stop.

func (*Broker[T]) Subscribe

func (b *Broker[T]) Subscribe(ctx context.Context) chan T

Subscribe generates a new subscription channel. You *must* call Unsubscribe on this channel when you are no longer listening to it.

Subscription channels are *not* closed and should never be closed by the caller. Closing a subscription channel will cause an unhandled panic.

func (*Broker[T]) Unsubscribe

func (b *Broker[T]) Unsubscribe(ctx context.Context, msgCh chan T)

Unsubscribe removes a channel from the broker.

func (*Broker[T]) Wait

func (b *Broker[T]) Wait(ctx context.Context)

Wait blocks until either the context has been canceled, or all work has been completed.

type BrokerOptions

type BrokerOptions struct {
	// BufferSize controls the buffer size of the internal broker
	// channels that handle subscription creation and deletion
	// (unsubscribe).
	BufferSize int
	// ParallelDispatch, when true, sends each message to
	// subscribers in parallel, and waits for all messages to be
	// delivered before continuing.
	ParallelDispatch bool
	// WorkerPoolSize controls the number of go routines used for
	// sending messages to subscribers, when using Queue-backed
	// brokers. If unset this defaults to 1.
	//
	// When this value is larger than 1, the order of messages
	// observed by individual subscribers will not be consistent.
	WorkerPoolSize int
}

BrokerOptions configures the semantics of a broker. The zero values produce a blocking, unbuffered message broker with every message distributed to every subscriber. While the default settings make it possible for one subscriber to block another, they guarantee that all messages will be delivered. Buffered brokers may lose messages.

type BrokerStats added in v0.8.0

type BrokerStats struct {
	Subscriptions int
	BufferDepth   int
	MessageCount  uint64
}

BrokerStats is a data struct used to report on the internal state of the broker.

type Deque

type Deque[T any] struct {
	// contains filtered or unexported fields
}

Deque provides a basic double-ended queue backed by a doubly linked list, with features to support a maximum capacity, burstable limits and soft quotas, as well as iterators, and it is safe for access from multiple concurrent go-routines. Furthermore, the implementation safely handles multiple concurrent blocking operations (e.g. Wait, WaitPop, IteratorWait, IteratorWaitPop).

Use the NewDeque constructor to instantiate a Deque object.

func NewDeque

func NewDeque[T any](opts DequeOptions) (*Deque[T], error)

NewDeque constructs a Deque according to the options, and errors if there are any problems with the configuration.
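
A minimal sketch of constructing and using a capacity-bounded deque; the capacity value here is arbitrary:

dq, err := pubsub.NewDeque[int](pubsub.DequeOptions{Capacity: 64})
if err != nil {
	panic(err)
}

_ = dq.PushBack(1)  // errors if the deque is closed or at capacity
_ = dq.PushFront(0) // errors if the deque is closed or at capacity

if v, ok := dq.PopFront(); ok {
	fmt.Println(v) // 0
}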

func NewUnlimitedDeque deprecated added in v0.9.4

func NewUnlimitedDeque[T any]() *Deque[T]

NewUnlimitedDeque constructs an unbounded Deque.

Deprecated: you can use a literal constructor, &Deque[T]{}, to get an unlimited deque.

func (*Deque[T]) Close

func (dq *Deque[T]) Close() error

Close marks the deque as closed, after which point all blocking consumers will stop and no more operations will succeed. The error value is not used in the current operation.

func (*Deque[T]) Drain added in v0.14.0

func (dq *Deque[T]) Drain(ctx context.Context) error

Drain marks the deque as draining so that new items cannot be added, and then blocks until the deque is empty (or its context is canceled.) This does not close the deque: when Drain returns the deque is empty, but new work can then be added. To Drain and shutdown, use the Shutdown method.

func (*Deque[T]) ForcePushBack

func (dq *Deque[T]) ForcePushBack(it T) error

ForcePushBack is the same as PushBack, except that if the deque is at capacity, it removes one item from the front of the deque and then, having made room, appends the item. Returns an error if the deque is closed.

func (*Deque[T]) ForcePushFront

func (dq *Deque[T]) ForcePushFront(it T) error

ForcePushFront is the same as PushFront, except that if the deque is at capacity, it removes one item from the back of the deque and then, having made room, prepends the item. Returns an error if the deque is closed.

func (*Deque[T]) IteratorBack added in v0.14.0

func (dq *Deque[T]) IteratorBack(ctx context.Context) iter.Seq[T]

IteratorBack starts at the back of the Deque and iterates towards the front. When the iterator reaches the end (front) of the deque, iteration halts.

func (*Deque[T]) IteratorFront added in v0.14.0

func (dq *Deque[T]) IteratorFront(ctx context.Context) iter.Seq[T]

IteratorFront starts at the front of the Deque and iterates towards the back. When the iterator reaches the end (back) of the deque, iteration halts.

func (*Deque[T]) IteratorWaitBack added in v0.14.0

func (dq *Deque[T]) IteratorWaitBack(ctx context.Context) iter.Seq[T]

IteratorWaitBack yields items from the back of the Deque to the front. When it reaches the first element, it waits for a new element to be added. It does not modify the elements in the Deque.

func (*Deque[T]) IteratorWaitFront added in v0.14.0

func (dq *Deque[T]) IteratorWaitFront(ctx context.Context) iter.Seq[T]

IteratorWaitFront yields items from the front of the Deque to the back. When it reaches the last element, it waits for a new element to be added. It does not modify the elements in the Deque.

func (*Deque[T]) IteratorWaitPopBack added in v0.14.0

func (dq *Deque[T]) IteratorWaitPopBack(ctx context.Context) iter.Seq[T]

IteratorWaitPopBack returns a sequence that removes and returns objects from the back of the deque. When the Deque is empty, iteration ends.

func (*Deque[T]) IteratorWaitPopFront added in v0.14.0

func (dq *Deque[T]) IteratorWaitPopFront(ctx context.Context) iter.Seq[T]

IteratorWaitPopFront returns a sequence that removes and returns objects from the front of the deque. When the Deque is empty, iteration ends.

func (*Deque[T]) Len

func (dq *Deque[T]) Len() int

Len returns the length of the queue. This is an O(1) operation in this implementation.

func (*Deque[T]) PopBack

func (dq *Deque[T]) PopBack() (T, bool)

PopBack removes the last (tail) item of the queue, with the second value being false if the queue is empty or closed.

func (*Deque[T]) PopFront

func (dq *Deque[T]) PopFront() (T, bool)

PopFront removes the first (head) item of the queue, with the second value being false if the queue is empty or closed.

func (*Deque[T]) PushBack

func (dq *Deque[T]) PushBack(it T) error

PushBack adds an item to the back or end of the deque, returning an error if the deque is closed, at capacity, or has reached its limit.

func (*Deque[T]) PushFront

func (dq *Deque[T]) PushFront(it T) error

PushFront adds an item to the front or head of the deque, returning an error if the deque is closed, at capacity, or has reached its limit.

func (*Deque[T]) Shutdown added in v0.14.0

func (dq *Deque[T]) Shutdown(ctx context.Context) error

Shutdown drains the deque, waiting for all items to be removed from the deque and then closes it so no additional work can be added to the deque.
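
A sketch of a graceful shutdown under a deadline, assuming dq is an existing *Deque with outstanding items; the timeout is arbitrary:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := dq.Shutdown(ctx); err != nil {
	fmt.Println("shutdown incomplete:", err) // e.g. the context expired before the deque drained
}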

func (*Deque[T]) WaitPopBack added in v0.14.0

func (dq *Deque[T]) WaitPopBack(ctx context.Context) (T, error)

WaitPopBack pops the last (tail) item in the deque and, if the deque is empty, blocks until an item is added, returning an error if the context is canceled or the deque is closed.

func (*Deque[T]) WaitPopFront added in v0.14.0

func (dq *Deque[T]) WaitPopFront(ctx context.Context) (v T, err error)

WaitPopFront pops the first (head) item in the deque and, if the deque is empty, blocks until an item is added, returning an error if the context is canceled or the deque is closed.

func (*Deque[T]) WaitPushBack added in v0.6.0

func (dq *Deque[T]) WaitPushBack(ctx context.Context, it T) error

WaitPushBack performs a blocking add to the deque: if the deque is at capacity, this operation blocks until the deque is closed or there is capacity to add an item. The new item is added to the back of the deque.

func (*Deque[T]) WaitPushFront added in v0.6.0

func (dq *Deque[T]) WaitPushFront(ctx context.Context, it T) error

WaitPushFront performs a blocking add to the deque: if the deque is at capacity, this operation blocks until the deque is closed or there is capacity to add an item. The new item is added to the front of the deque.
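
The Wait variants pair naturally into a blocking producer/consumer loop. A sketch, using the unlimited literal constructor noted under NewUnlimitedDeque and an arbitrary item count:

ctx := context.Background()
dq := &pubsub.Deque[int]{} // unlimited deque

go func() {
	for i := 0; i < 5; i++ {
		_ = dq.WaitPushBack(ctx, i)
	}
	_ = dq.Close()
}()

for {
	v, err := dq.WaitPopFront(ctx)
	if err != nil {
		break // context canceled or deque closed
	}
	fmt.Println(v)
}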

type DequeOptions

type DequeOptions struct {
	Unlimited    bool
	Capacity     int
	QueueOptions *QueueOptions
}

DequeOptions configure the semantics of the deque. The Validate() method ensures that you do not produce a configuration that is impossible.

func (*DequeOptions) Validate

func (opts *DequeOptions) Validate() error

Validate ensures that the options are consistent. Exported as a convenience function. All errors have ErrConfigurationMalformed as their root.

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

A Queue is a limited-capacity FIFO queue of arbitrary data items.

A queue has a soft quota and a hard limit on the number of items that may be contained in the queue. Adding items in excess of the hard limit will fail unconditionally.

For items in excess of the soft quota, a credit system applies: Each queue maintains a burst credit score. Adding an item in excess of the soft quota costs 1 unit of burst credit. If there is not enough burst credit, the add will fail.

The initial burst credit is assigned when the queue is constructed. Removing items from the queue adds additional credit if the resulting queue length is less than the current soft quota. Burst credit is capped by the hard limit.

A Queue is safe for concurrent use by multiple goroutines.

func NewQueue

func NewQueue[T any](opts QueueOptions) (*Queue[T], error)

NewQueue constructs a new empty queue with the specified options. It reports an error if any of the option values are invalid.
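
A minimal sketch of queue construction and use; the soft quota and hard limit values are illustrative only:

q, err := pubsub.NewQueue[string](pubsub.QueueOptions{
	SoftQuota: 64,
	HardLimit: 128,
})
if err != nil {
	panic(err)
}

if err := q.Push("work item"); err != nil {
	fmt.Println(err) // ErrQueueFull, ErrQueueNoCredit, or ErrQueueClosed
}

if item, ok := q.Pop(); ok {
	fmt.Println(item)
}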

func NewUnlimitedQueue

func NewUnlimitedQueue[T any]() *Queue[T]

NewUnlimitedQueue produces an unbounded queue.

func (*Queue[T]) Close

func (q *Queue[T]) Close() error

Close closes the queue. After closing, any further Add calls will report an error, but items that were added to the queue prior to closing will still be available for Pop and WaitPop. WaitPop will report an error without blocking if it is called on a closed, empty queue.

func (*Queue[T]) Drain added in v0.13.0

func (q *Queue[T]) Drain(ctx context.Context) error

Drain marks the queue as draining so that new items cannot be added, and then blocks until the queue is empty (or its context is canceled). This does not close the queue: when Drain returns the queue is empty, but new work can then be added. To drain and shut down, use the Shutdown method.

func (*Queue[T]) Iterator

func (q *Queue[T]) Iterator() iter.Seq[T]

Iterator returns an iterator for all items in the queue. Does not block.

func (*Queue[T]) IteratorWait added in v0.14.0

func (q *Queue[T]) IteratorWait(ctx context.Context) iter.Seq[T]

IteratorWait produces an iterator that wraps the underlying queue linked list. The iterator respects the Queue's mutex and is safe for concurrent access and concurrent queue operations, without additional locking. The iterator does not modify or remove items from the queue, and will only terminate when the queue has been closed via the Close() method.

For a consuming stream, use IteratorWaitPop.

func (*Queue[T]) IteratorWaitPop added in v0.14.0

func (q *Queue[T]) IteratorWaitPop(ctx context.Context) iter.Seq[T]

IteratorWaitPop returns a consuming iterator that removes items from the queue. Blocks waiting for new items when the queue is empty. Iterator terminates on context cancellation or queue closure. Each item returned is removed from the queue (destructive read). Safe for concurrent access.
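
A sketch of a consuming worker loop built on IteratorWaitPop; it assumes q is a *Queue[string] that other goroutines push into, and handleItem is a caller-provided placeholder:

for item := range q.IteratorWaitPop(ctx) {
	handleItem(item)
}
// iteration ends when ctx is canceled or the queue is closed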

func (*Queue[T]) Len added in v0.8.0

func (q *Queue[T]) Len() int

Len returns the number of items in the queue. Because the queue tracks its size this is a constant time operation.

func (*Queue[T]) Pop added in v0.14.0

func (q *Queue[T]) Pop() (out T, ok bool)

Pop removes and returns the frontmost (oldest) item in the queue and reports whether an item was available. If the queue is empty, Pop returns the zero value of T and false.

func (*Queue[T]) Push added in v0.14.0

func (q *Queue[T]) Push(item T) error

Push adds item to the back of the queue. It reports an error and does not enqueue the item if the queue is full or closed, or if it exceeds its soft quota and there is not enough burst credit.

func (*Queue[T]) Shutdown added in v0.13.0

func (q *Queue[T]) Shutdown(ctx context.Context) error

Shutdown drains the queue, waiting for all items to be removed from the queue and then closes it so no additional work can be added to the queue.

func (*Queue[T]) WaitPop added in v0.14.0

func (q *Queue[T]) WaitPop(ctx context.Context) (out T, _ error)

WaitPop blocks until q is non-empty or closed, and then returns the frontmost (oldest) item from the queue. If ctx ends before an item is available, WaitPop returns the zero value and a context error. If the queue is closed while it is still empty, WaitPop returns the zero value and ErrQueueClosed.

WaitPop is destructive: every item returned is removed from the queue.

func (*Queue[T]) WaitPush added in v0.14.0

func (q *Queue[T]) WaitPush(ctx context.Context, item T) error

WaitPush attempts to add an item to the queue, as with Push, but if the queue is full, blocks until the queue has capacity, is closed, or the context is canceled. Returns an error if the context is canceled or the queue is closed.

type QueueOptions

type QueueOptions struct {
	// The maximum number of items the queue will ever be
	// permitted to hold. This value must be positive, and greater
	// than or equal to SoftQuota. The hard limit is fixed and
	// does not change as the queue is used.
	//
	// The hard limit should be chosen to exceed the largest burst
	// size expected under normal operating conditions.
	HardLimit int

	// The initial expected maximum number of items the queue
	// should contain on an average workload. If this value is
	// zero, it is initialized to the hard limit. The soft quota
	// is adjusted from the initial value dynamically as the queue
	// is used.
	SoftQuota int

	// The initial burst credit score.  This value must be greater
	// than or equal to zero. If it is zero, the soft quota is
	// used.
	BurstCredit float64
}

QueueOptions are the initial settings for a Queue or Deque.

func (*QueueOptions) Validate

func (opts *QueueOptions) Validate() error

Validate ensures that the options are consistent. Exported as a convenience function. All errors have ErrConfigurationMalformed as their root.

type Stream added in v0.14.0

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream provides a safe, context-respecting iteration/sequence paradigm, and an entire toolkit for consumer functions, converters, and generation options.

The implementation of Stream predated Go's native iterators, and Stream (originally named Iterator) was an attempt to provide lazy, incremental execution and iteration-focused flows, with a decidedly functional and event-driven bent.

This gave rise to the `fnx` package, and in many ways everything in the `tychoish/fun` package, all of which _do_ succeed at the aim of providing ergonomic, batteries-included programming models to address fundamental (and shared) concerns.

Fundamentally, this type has been superseded by iterators, which provide a much more reasonable and safe interface for common operations. However, being able to use simple context-aware functions as the basis for iteration/event-driven programs has some potential utility, particularly for pub-sub use cases, and so it remains.

Consider it deprecated: it will be removed in the future, but without a deadline. New code should use the iter.Seq[T] model, and the `fun/irt` package provides a lot of second-order tools for using iterators to ease the transition.

func ChannelStream added in v0.14.0

func ChannelStream[T any](ch <-chan T) *Stream[T]

ChannelStream exposes access to an existing "receive" channel as a stream.

func JoinStreams added in v0.14.0

func JoinStreams[T any](iters ...*Stream[T]) *Stream[T]

JoinStreams takes a sequence of streams and produces a combined stream. JoinStreams processes items sequentially from each stream. By contrast, MergeStreams constructs a stream that reads all of the items from the input streams in parallel, and returns items in an arbitrary order.

Use JoinStreams or FlattenStreams if order is important. Use FlattenStream for larger numbers of streams. Use MergeStreams when producing an item takes a non-trivial amount of time.

func MakeStream added in v0.14.0

func MakeStream[T any](gen func(context.Context) (T, error)) *Stream[T]

MakeStream constructs a stream that calls the Future function once for every item, until it errors. Errors other than context cancellation errors and io.EOF are propagated to the stream's Close method.
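
A minimal sketch of a generator-backed stream that yields three values and then signals exhaustion with io.EOF:

n := 0
st := pubsub.MakeStream(func(ctx context.Context) (int, error) {
	if n >= 3 {
		return 0, io.EOF
	}
	n++
	return n, nil
})
// reading from st produces 1, 2, 3 before terminating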

func MergeStreams deprecated added in v0.14.0

func MergeStreams[T any](iters *Stream[*Stream[T]]) *Stream[T]

MergeStreams takes a collection of streams of the same type of objects and provides a single stream over these items.

There is a collection of background goroutines, one for each input stream, which iterate over the inputs and provide the items to the output stream. These goroutines start on the first iteration and return if the context is canceled.

The stream will continue to produce items until all input streams have been consumed, the initial context is canceled, the Close method is called, or all of the input streams have returned an error.

Use MergeStreams when producing an item takes a non-trivial amount of time. Use ChainStreams or FlattenStreams if order is important. Use FlattenStream for larger numbers of streams.

Deprecated: use irt.Chain() and iter.Seq[T] iterators instead.

func SliceStream added in v0.14.0

func SliceStream[T any](in []T) *Stream[T]

SliceStream provides Stream access to the elements in a slice.

func VariadicStream added in v0.14.0

func VariadicStream[T any](in ...T) *Stream[T]

VariadicStream produces a stream from an arbitrary collection of objects, passed into the constructor.

func (*Stream[T]) AddError added in v0.14.0

func (st *Stream[T]) AddError(e error)

AddError can be used by calling code to add errors to the stream, which are merged.

AddError is not safe for concurrent use (with regards to other AddError calls or Close).

func (*Stream[T]) Buffer added in v0.14.0

func (st *Stream[T]) Buffer(n int) *Stream[T]

Buffer adds a channel-backed buffer to smooth out iteration performance. If the iteration function (future) and the consumer both take time, even a small buffer will improve the throughput of the system and prevent both components from blocking on each other.

The ordering of elements in the output stream is the same as the order of elements in the input stream.

func (*Stream[T]) BufferParallel added in v0.14.0

func (st *Stream[T]) BufferParallel(n int) *Stream[T]

BufferParallel processes the input stream and stores the items in a channel (like Buffer); however, unlike Buffer, multiple workers consume the input stream: as a result, the order of the elements in the output stream is not the same as the input order.

Otherwise, the two Buffer methods are equivalent and serve the same purpose: process the items from a stream without blocking the consumer of the stream.

func (*Stream[T]) BufferedChannel added in v0.14.0

func (st *Stream[T]) BufferedChannel(ctx context.Context, size int) <-chan T

BufferedChannel provides access to the content of the stream with a buffered channel that is closed when the stream is exhausted.

func (*Stream[T]) Channel added in v0.14.0

func (st *Stream[T]) Channel(ctx context.Context) <-chan T

Channel provides access to the contents of the stream as a channel. The channel is closed when the stream is exhausted.

func (*Stream[T]) Close added in v0.14.0

func (st *Stream[T]) Close() error

Close terminates the stream and returns any errors collected during iteration. If the stream allocates resources, this will typically release them, but close may not block until all resources are released.

Close is safe to call more than once and always resolves the error handler (e.g. AddError).

func (*Stream[T]) CloseHook added in v0.14.0

func (st *Stream[T]) CloseHook() func(*Stream[T])

CloseHook returns a function that can be passed to the WithHook() method on a _new_ stream that wraps this stream, so that the other stream will call the inner stream's close method and include the inner stream's errors.

func (*Stream[T]) Count added in v0.14.0

func (st *Stream[T]) Count(ctx context.Context) (count int)

Count returns the number of items observed by the stream. Callers should still manually call Close on the stream.

func (*Stream[T]) ErrorHandler added in v0.14.0

func (st *Stream[T]) ErrorHandler() fn.Handler[error]

ErrorHandler provides access to the AddError method as an error observer.

func (*Stream[T]) Filter added in v0.14.0

func (st *Stream[T]) Filter(check func(T) bool) *Stream[T]

Filter passes every item in the stream to the check function and, if it returns true, propagates the item to the output stream. There is no buffering, and check functions should return quickly. For more advanced use, consider using itertool.Map().

func (*Stream[T]) Iterator added in v0.14.0

func (st *Stream[T]) Iterator(ctx context.Context) iter.Seq[T]

Iterator converts a pubsub.Stream[T] into a native go iterator.
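
A sketch of bridging a Stream into a native range-over-func loop, as the type documentation above recommends for new code:

ctx := context.Background()
st := pubsub.SliceStream([]int{1, 2, 3})
for v := range st.Iterator(ctx) {
	fmt.Println(v)
}
_ = st.Close() // Close is safe to call more than once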

func (*Stream[T]) Join added in v0.14.0

func (st *Stream[T]) Join(iters ...*Stream[T]) *Stream[T]

Join merges multiple streams, processing and producing their results sequentially and without starting any go routines. Otherwise it is similar to Flatten (which processes each stream in parallel).

func (*Stream[T]) Next added in v0.14.0

func (st *Stream[T]) Next(ctx context.Context) bool

Next advances the stream (using Read) and caches the current value for access with the Value() method. When Next returns true, Value() will return the next item. When it returns false, either the stream has been exhausted (e.g. the Future function has returned io.EOF) or the context passed to Next has been canceled.

Using Next/Value cannot be done safely if the stream is accessed from multiple go routines concurrently. In these cases use Read directly, or use Split to create streams that safely draw items from the parent stream.
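
A sketch of the single-goroutine Next/Value loop described above:

ctx := context.Background()
st := pubsub.VariadicStream("a", "b", "c")
for st.Next(ctx) {
	fmt.Println(st.Value())
}
_ = st.Close()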

func (*Stream[T]) Parallel added in v0.14.0

func (st *Stream[T]) Parallel(fn fnx.Handler[T], opts ...opt.Provider[*wpa.WorkerGroupConf]) fnx.Worker

Parallel produces a worker that, when executed, iteratively processes the contents of the stream. The options control the error handling and parallelism semantics of the operation.

This is the workhorse operation of the package, and can be used as the basis of worker pools, event processing, or message dispatching for pubsub queues and related systems.

func (*Stream[T]) Read added in v0.14.0

func (st *Stream[T]) Read(ctx context.Context) (out T, err error)

Read returns a single value from the stream. This operation IS safe for concurrent use.

Read returns the io.EOF error when the stream has been exhausted, a context expiration error or the underlying error produced by the stream. All errors produced by Read are terminal and indicate that no further iteration is possible.

func (*Stream[T]) ReadAll added in v0.14.0

func (st *Stream[T]) ReadAll(fn fnx.Handler[T]) fnx.Worker

ReadAll returns a worker that consumes all items in the stream with the provided processor function.

All panics are converted to errors and propagated in the response of the worker, and abort the processing. If the handler function returns ers.ErrCurrentOpSkip, processing continues. All other errors abort processing and are returned by the worker.

func (*Stream[T]) Reduce added in v0.14.0

func (st *Stream[T]) Reduce(reducer func(T, T) (T, error)) *Stream[T]

Reduce processes a stream with a reducer function. The operation is lazy and runs synchronously: no processing happens until the output stream is consumed. If the reducer function returns ers.ErrCurrentOpSkip, the output value is ignored and the reducer operation continues.

If the underlying stream returns an error, it is returned by the Close method of the new stream. If the reducer function returns an error, that error is returned either by the Read or Close methods of the stream. If the underlying stream terminates cleanly, then the reducer returns its last value without error. Otherwise, any error returned by the reducer function, other than ers.ErrCurrentOpSkip, is propagated to the caller.

The "previous" value for the first reduce option is the zero value for the type T.

func (*Stream[T]) Slice added in v0.14.0

func (st *Stream[T]) Slice(ctx context.Context) (out []T, _ error)

Slice converts a stream to a slice of its values, and closes the stream when it has been exhausted.

In the case of an error in the underlying stream the output slice will have the values encountered before the error.

func (*Stream[T]) Split added in v0.14.0

func (st *Stream[T]) Split(num int) []*Stream[T]

Split produces an arbitrary number of streams which divide the input. The division is lazy and depends on the rate of consumption of output streams, but every item from the input stream is sent to exactly one output stream, each of which can be safely used from a different go routine.

The input stream is not closed after the output streams are exhausted. There is one background go routine that reads items off of the input stream, which starts when the first output stream is advanced: be aware that canceling this context will effectively cancel all streams.
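
A sketch of fanning a stream out across workers with Split; it assumes st is a *pubsub.Stream[int] and ctx a context from the surrounding code, and the worker count is arbitrary. Each item is observed by exactly one worker:

parts := st.Split(4)

var wg sync.WaitGroup
for _, part := range parts {
	wg.Add(1)
	go func(p *pubsub.Stream[int]) {
		defer wg.Done()
		for v := range p.Iterator(ctx) {
			fmt.Println(v)
		}
	}(part)
}
wg.Wait()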

func (*Stream[T]) Value added in v0.14.0

func (st *Stream[T]) Value() T

Value returns the object at the current position in the stream. It's often used with Next() for looping over the stream.

Value and Next cannot be used safely when the stream is being used concurrently. In these cases, use Read directly.

func (*Stream[T]) WithHook added in v0.14.0

func (st *Stream[T]) WithHook(hook fn.Handler[*Stream[T]]) *Stream[T]

WithHook attaches a hook to the stream. The provided hook function will run during the Stream's Close() method.
