cascade

Version: v0.0.0-...-43777c9
Published: Aug 28, 2025 License: MIT, Unlicense Imports: 8 Imported by: 0

README

cascade

Cascade provides helpers for building cancellable asynchronous computation graphs using Go generics and iterators.

The design is based on lessons learned from writing skel, though this implementation is not tied to a UI framework and may prove useful in server contexts.

Status

Documentation is incomplete and the API may still undergo breaking changes.

Usage

Given any iterable data source (a slice, a channel, a gRPC stream, whatever), write a function with the following signature:

// Here is the result type for your reference
type Result[T any] struct {
	Value T
	Err   error
}

func(context.Context) iter.Seq[cascade.Result[T]]

You can then build a cascade.Iterator with:

iter := cascade.IteratorFunc[int](myFuncReturningInts)

The above does not actually begin iteration, but creates a source that we can use to construct a computation graph.

We can then modify the iteration like:

incremented := cascade.Map(iter, func(i int) int { return i+1 })
filtered := cascade.Filter(incremented, func(i int) bool { return i & 1 == 0 })

Finally, we can actually begin iteration and acquire the results with a terminal operator:

results, err := cascade.Collect(ctx, filtered)

Iteration will run until the source iterator terminates or the context cancels.

You can also use a Leaser to read cascades from iterations of an event loop, or one of several other terminal operators (Reduce, Iterate) to acquire the results.

Operations like ZipLatest allow combining multiple iterators together.

Acknowledgements

Cascade's initial development occurred within Plato Team Inc. Thanks for releasing it!

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[T any](ctx context.Context, input Iterator[T]) ([]T, error)

Collect buffers all items emitted from the Iterator into a slice.

func Iterate

func Iterate[T any](ctx context.Context, source Iterator[T], name string) iter.Seq2[T, error]

func Reduce

func Reduce[T any, S any](ctx context.Context, input Iterator[T], reducer func(s S, t T) S) (S, error)

Reduce aggregates all items emitted from the Iterator into a single state value.

Types

type FanOut

type FanOut[T any] struct {
	// contains filtered or unexported fields
}

FanOut implements channel-like fan-out semantics for [Iterator]s. Values emitted by the FanOut's source Iterator are delivered to everything consuming the FanOut as an Iterator.

func NewFanOut

func NewFanOut[T any](ctx context.Context, source Iterator[T], autoclose bool, clone func(T) T) *FanOut[T]

NewFanOut creates a FanOut with its lifetime governed by ctx. If autoclose is true, the FanOut will close itself when it has no subscribers (after it has had at least 1 subscriber). The clone func defines how data elements are copied as they flow through the FanOut.

type Iterator

type Iterator[T any] interface {
	// contains filtered or unexported methods
}

Iterator can be synchronously iterated to receive Result[T] values. The Iterator must terminate when the provided context is cancelled.

func Chunk

func Chunk[T any](input Iterator[T], n int) Iterator[[]T]

Chunk returns an Iterator that emits slices containing up to n elements of T. Errors in the pipeline will be emitted as separate values.

func Debounce

func Debounce[T any](input Iterator[T], threshold time.Duration) Iterator[T]

Debounce returns an Iterator that emits one value per threshold interval.

func Distinct

func Distinct[T Value[T]](input Iterator[T]) Iterator[T]

Distinct returns an Iterator[T] that emits only the first of consecutive equal elements.

func Filter

func Filter[T any](input Iterator[T], keep func(val T) bool) Iterator[T]

Filter returns an Iterator[T] that conditionally emits certain elements based on the provided keep func.

func FlatMapLatest

func FlatMapLatest[In Value[In], Out any](input Iterator[In], transform func(Result[In]) Iterator[Out]) Iterator[Out]

FlatMapLatest swaps the output Iterator when receiving values from the input Iterator, where the transform func defines how to build the new output.

func Flatten

func Flatten[T any](input Iterator[Iterator[T]]) Iterator[T]

Flatten converts an iterator of iterators of T into an iterator of T by draining the values from each iterator in sequence onto the output iterator.

func Latest

func Latest[T Value[T]](input Iterator[T]) Iterator[T]

Latest retains the latest value emitted by the input Iterator, removing backpressure.

func Map

func Map[T, U any](input Iterator[T], transform func(val T) U) Iterator[U]

Map converts an Iterator[T] to an Iterator[U] by changing each element using the provided transform func.

func MapErr

func MapErr[T, U any](input Iterator[T], transform func(val T) (U, error)) Iterator[U]

MapErr converts an Iterator[T] to an Iterator[U] by changing each element using the provided transform func. The transform func is allowed to return an error, which will be carried through the iterator pipeline instead of the value.

func ToIterator

func ToIterator[T any](streamer Streamer[T]) Iterator[T]

ToIterator takes any Streamer and converts it to an Iterator.

func ZipLatest

func ZipLatest[T Value[T], U Value[U], V any](inputA Iterator[T], inputB Iterator[U], zipFunc func(T, U) V) Iterator[V]

ZipLatest joins two input Iterators asynchronously, returning an output Iterator that contains merged data according to zipFunc. This is not a perfect element-wise zip; when each iterator emits a value, it is zipped against the most recent value from the other iterator.
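Because this differs from an element-wise zip, a deterministic trace may help. The sketch below replays a fixed, interleaved list of emissions instead of running two concurrent iterators. The names event, zipLatestTrace, and demo are hypothetical, and emitting nothing until both sides have produced a value is an assumption.

```go
package main

import "fmt"

// event is one emission from either input; fromA says which field is set.
type event struct {
	fromA bool
	a     int
	b     string
}

// zipLatestTrace pairs every new emission with the most recent value seen
// from the other side.
func zipLatestTrace(events []event, zipFunc func(int, string) string) []string {
	var (
		lastA        int
		lastB        string
		haveA, haveB bool
		out          []string
	)
	for _, e := range events {
		if e.fromA {
			lastA, haveA = e.a, true
		} else {
			lastB, haveB = e.b, true
		}
		// Assumption: no output until both inputs have emitted at least once.
		if haveA && haveB {
			out = append(out, zipFunc(lastA, lastB))
		}
	}
	return out
}

// demo replays the emissions A=1, B="x", A=2, A=3, B="y".
func demo() []string {
	events := []event{
		{fromA: true, a: 1},
		{b: "x"},
		{fromA: true, a: 2},
		{fromA: true, a: 3},
		{b: "y"},
	}
	return zipLatestTrace(events, func(a int, b string) string {
		return fmt.Sprintf("%d%s", a, b)
	})
}

func main() {
	fmt.Println(demo()) // [1x 2x 3x 3y]
}
```

In this trace "x" is paired three times and 3 is paired twice, which an element-wise zip would never do.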

type IteratorFunc

type IteratorFunc[T any] func(context.Context) iter.Seq[Result[T]]

IteratorFunc adapts any function with the appropriate signature into an Iterator.

type Lease

type Lease[T any] struct {
	// contains filtered or unexported fields
}

Lease allows reading values from an iterator from an event loop, so long as the Lease is [Renew]ed each iteration. If the lease goes an iteration without being renewed, it will automatically cancel its iterator until it is renewed again, at which point it will restart the iterator.

func NewLease

func NewLease[T any](leaser *Leaser, source Iterator[T], errListener func(error), listener func(T)) *Lease[T]

NewLease creates a Lease reading values from the [source] Iterator. When values arrive, either the [errListener] or the [listener] is invoked depending on whether the arriving result was an error or a T.

func (*Lease[T]) Renew

func (s *Lease[T]) Renew()

Renew ensures that the stream is alive, starting a new instance if needed. Renew may be called before [Update] during the same event loop iteration in order to give the provider time to produce a result.

func (*Lease[T]) Update

func (s *Lease[T]) Update()

Update reads from the asynchronous provider func, instantiating it if it is not already running. The listener funcs provided to [NewLease] will be called during an invocation of Update if they are called at all. Only one call to [Update] per event loop iteration will have any effect.

type Leaser

type Leaser struct {
	// contains filtered or unexported fields
}

Leaser issues stream Leases. Each Lease must be constantly renewed to keep the underlying iterator active.

func NewLeaser

func NewLeaser(ctx context.Context, invalidator func(identity any)) *Leaser

NewLeaser builds a Leaser. Whenever any stream emits data, it will invoke the invalidator func with the stream as an argument.

func (*Leaser) Mark

func (l *Leaser) Mark() (cancelled, alive int)

Mark should be invoked at the end of every event loop iteration. It finds and cancels [Lease]s that were not renewed during the event loop.

type Primitive

type Primitive[T PrimitiveType] struct {
	Value T
}

Primitive wraps a PrimitiveType in a Value.

func (Primitive[T]) Clone

func (p Primitive[T]) Clone() Primitive[T]

func (Primitive[T]) Equals

func (p Primitive[T]) Equals(other Primitive[T]) bool

type PrimitiveType

type PrimitiveType interface {
	int | float64
}

PrimitiveType is the set of all primitive types for which equality and deep copy are automatically implementable.

type Result

type Result[T any] struct {
	Value T
	Err   error
}

Result carries a value of type T along with a mutually-exclusive error.

type Streamer

type Streamer[T any] interface {
	// contains filtered or unexported methods
}

Streamer can be asynchronously iterated to receive Result[T] values. The Streamer must terminate when the provided context is cancelled.

type StreamerFunc

type StreamerFunc[T any] func(context.Context) <-chan Result[T]

StreamerFunc adapts any function with the appropriate signature into a Streamer.

type Value

type Value[T any] interface {
	// Equals returns true if the parameter is logically equal to the receiver.
	Equals(T) bool
	// Clone returns a deep copy of the receiver.
	Clone() T
}

Value defines the methods needed for a type to implement equality checks and deep copying.
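As a sketch of what implementing this interface might look like for a type that holds a slice, consider the following. Tags and cloneIsIndependent are hypothetical names, and the Value interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"slices"
)

// Value mirrors the documented cascade.Value interface (local copy).
type Value[T any] interface {
	Equals(T) bool
	Clone() T
}

// Tags is a hypothetical element type. It holds a slice, so a plain
// struct copy would still share the backing array with the original.
type Tags struct {
	Names []string
}

// Equals compares the contents of the slices, not their identity.
func (t Tags) Equals(other Tags) bool { return slices.Equal(t.Names, other.Names) }

// Clone copies the slice so the result shares no memory with the receiver.
func (t Tags) Clone() Tags { return Tags{Names: slices.Clone(t.Names)} }

// Compile-time check that Tags satisfies Value[Tags].
var _ Value[Tags] = Tags{}

// cloneIsIndependent reports whether mutating a clone leaves the original
// untouched and makes the two compare as unequal.
func cloneIsIndependent() bool {
	orig := Tags{Names: []string{"a", "b"}}
	cp := orig.Clone()
	cp.Names[0] = "changed"
	return orig.Names[0] == "a" && !orig.Equals(cp)
}

func main() {
	fmt.Println(cloneIsIndependent()) // true
}
```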
