Documentation ¶
Overview ¶
Package gocurrent provides utilities for common Go concurrency patterns.
This package implements several concurrency primitives inspired by Rob Pike's concurrency patterns from his talk "Go Concurrency Patterns" (https://go.dev/talks/2012/concurrency.slide).
The main components include:
- Reader: A goroutine wrapper that continuously calls a reader function and sends results to a channel, with error signaling via ClosedChan()
- Writer: A goroutine for serializing writes using a writer callback, with error signaling via ClosedChan()
- Mapper: Transform and/or filter data between channels
- Reducer: Collect and reduce N values from an input channel with configurable time windows
- Pipe: Connect a reader and writer channel with identity transform
- FanIn: Merge multiple input channels into a single output channel
- FanOut: Distribute messages from one channel to multiple output channels. Three dispatch strategies available via SyncFanOut, AsyncFanOut, and QueuedFanOut (recommended), each with different ordering/blocking trade-offs. See the FanOuter interface for the common API.
- SyncMap: A type-safe generic wrapper around sync.Map
All concurrency primitives are designed to be composable and provide fine-grained control over goroutine lifecycles, resource management, and error monitoring through completion signaling channels.
Index ¶
- Constants
- func IDFunc[T any](input T) T
- type AsyncFanOut
- func (c *AsyncFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
- func (c *AsyncFanOut) ClosedChan() <-chan error
- func (c *AsyncFanOut) Count() int
- func (c *AsyncFanOut) DebugInfo() any
- func (c *AsyncFanOut) InputChan() chan<- T
- func (c *AsyncFanOut) New(filter FilterFunc[T]) chan T
- func (c *AsyncFanOut) Remove(output chan<- T, wait bool) (callbackChan chan error)
- func (c *AsyncFanOut) Send(value T)
- type Block
- type Broadcast
- type Component
- type FanIn
- type FanInOption
- type FanOutOption
- type FanOuter
- type FilterFunc
- type IdleTimer
- type InputComponent
- type Mapper
- func Connect[T any](from OutputComponent[T], to InputComponent[T]) *Mapper[T, T]
- func ConnectWith[I, O any](from OutputComponent[I], to InputComponent[O], mapper func(I) (O, bool, bool)) *Mapper[I, O]
- func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool), ...) *Mapper[T, U]
- func NewPipe[T any](input <-chan T, output chan<- T) *Mapper[T, T]
- type MapperOption
- type Merge
- type Message
- type OutputComponent
- type Pipeline
- type QueuedFanOut
- func (c *QueuedFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
- func (c *QueuedFanOut) ClosedChan() <-chan error
- func (c *QueuedFanOut) Count() int
- func (fo *QueuedFanOut[T]) DebugInfo() any
- func (c *QueuedFanOut) InputChan() chan<- T
- func (c *QueuedFanOut) New(filter FilterFunc[T]) chan T
- func (c *QueuedFanOut) Remove(output chan<- T, wait bool) (callbackChan chan error)
- func (c *QueuedFanOut) Send(value T)
- type QueuedFanOutOption
- type Reader
- type ReaderFunc
- type ReaderOption
- type Reducer
- func (r *Reducer[T, C, U]) ClosedChan() <-chan error
- func (fo *Reducer[T, C, U]) Flush()
- func (fo *Reducer[T, C, U]) InputChan() chan<- T
- func (r *Reducer[T, C, U]) IsRunning() bool
- func (fo *Reducer[T, C, U]) OutputChan() <-chan U
- func (fo *Reducer[T, C, U]) Send(value T)
- func (fo *Reducer[T, C, U]) Stop()
- type Reducer2
- type ReducerOption
- func WithCollectFunc[T any, C any, U any](fn func(C, ...T) (C, bool)) ReducerOption[T, C, U]
- func WithFlushPeriod[T any, C any, U any](period time.Duration) ReducerOption[T, C, U]
- func WithInputChan[T any, C any, U any](ch chan T) ReducerOption[T, C, U]
- func WithOutputChan[T any, C any, U any](ch chan U) ReducerOption[T, C, U]
- func WithReduceFunc[T any, C any, U any](fn func(C) U) ReducerOption[T, C, U]
- type ReducerOption2
- type RunnerBase
- type SyncFanOut
- func (c *SyncFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
- func (c *SyncFanOut) ClosedChan() <-chan error
- func (c *SyncFanOut) Count() int
- func (c *SyncFanOut) DebugInfo() any
- func (c *SyncFanOut) InputChan() chan<- T
- func (c *SyncFanOut) New(filter FilterFunc[T]) chan T
- func (c *SyncFanOut) Remove(output chan<- T, wait bool) (callbackChan chan error)
- func (c *SyncFanOut) Send(value T)
- type SyncMap
- func (m *SyncMap[K, V]) Delete(key K)
- func (m *SyncMap[K, V]) Load(key K) (value V, ok bool)
- func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *SyncMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)
- func (m *SyncMap[K, V]) Range(f func(key K, value V) bool)
- func (m *SyncMap[K, V]) Store(key K, value V)
- type Writer
- type WriterFunc
- type WriterOption
Constants ¶
const DefaultQueueSize = 64
DefaultQueueSize is the default capacity of the dispatch queue used by QueuedFanOut. The queue acts as a buffer between the runner goroutine (which reads events from the input channel) and the dispatch goroutine (which delivers events to outputs). When the queue is full, the runner blocks — propagating back-pressure to the sender.
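The back-pressure behavior described above can be illustrated with a plain buffered channel. This is a minimal standard-library sketch, not the package's implementation; `queueSize` and `trySend` are hypothetical names introduced only for the illustration.

```go
package main

import "fmt"

// queueSize is a hypothetical, deliberately small capacity for the demo.
const queueSize = 2

// trySend attempts a non-blocking send and reports whether the queue
// accepted the value. A blocking send (q <- v) would instead wait when the
// queue is full, which is how back-pressure reaches the sender.
func trySend(q chan int, v int) bool {
	select {
	case q <- v:
		return true
	default:
		return false
	}
}

func main() {
	q := make(chan int, queueSize)
	fmt.Println(trySend(q, 1)) // true: the queue has room
	fmt.Println(trySend(q, 2)) // true: the queue is now full
	fmt.Println(trySend(q, 3)) // false: a blocking send would wait here
	<-q                        // a consumer drains one item
	fmt.Println(trySend(q, 3)) // true: there is room again
}
```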
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AsyncFanOut ¶ added in v0.1.0
type AsyncFanOut[T any] struct {
	// contains filtered or unexported fields
}
AsyncFanOut distributes events to all registered output channels by spawning a separate goroutine for each output on every event.
Ordering semantics — fire and forget per output:
Time ──────────────────────────────────────────────────────────────►

Sender:  Send(A) ──► Send(B) ──► ...
            │            │
            │            ├─ goroutine: B → out[0]   ← may arrive BEFORE A!
            │            ├─ goroutine: B → out[1]
            │            └─ goroutine: B → out[2]
            │
            ├─ goroutine: A → out[0]   ← may arrive AFTER B!
            ├─ goroutine: A → out[1]
            └─ goroutine: A → out[2]

NO guarantee: A and B goroutines race. B can arrive before A on any output.
- Sender never blocks (goroutines are spawned immediately).
- NO ordering guarantee: goroutines for event B may execute before
goroutines for event A. The Go scheduler does not guarantee FIFO
ordering of goroutine execution.
- Goroutine count: N (outputs) per event. Can explode under high
throughput with many outputs.
- A slow output does NOT affect delivery to other outputs.
Use AsyncFanOut only when event ordering does not matter and per-output independence is more important than resource usage. In most cases, QueuedFanOut is a better choice.
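The goroutine-per-output dispatch described above might look roughly like the following. This is a standard-library sketch under stated assumptions, not the AsyncFanOut implementation: `asyncSend` and `broadcastAll` are hypothetical helpers, and the `sync.WaitGroup` exists only so the demo can wait for all deliveries.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncSend delivers v to every output from its own goroutine, so the
// caller never waits on a slow receiver.
func asyncSend[T any](wg *sync.WaitGroup, outputs []chan T, v T) {
	for _, out := range outputs {
		wg.Add(1)
		go func(out chan T) {
			defer wg.Done()
			out <- v
		}(out)
	}
}

// broadcastAll sends every value to every output and returns how many
// values each output received. Arrival order is deliberately not checked,
// because the goroutines race.
func broadcastAll(values []int, numOutputs int) []int {
	outputs := make([]chan int, numOutputs)
	for i := range outputs {
		outputs[i] = make(chan int, len(values)) // buffered so sends complete
	}
	var wg sync.WaitGroup
	for _, v := range values {
		asyncSend(&wg, outputs, v)
	}
	wg.Wait()
	counts := make([]int, numOutputs)
	for i, out := range outputs {
		counts[i] = len(out)
	}
	return counts
}

func main() {
	fmt.Println(broadcastAll([]int{1, 2, 3}, 2)) // [3 3]
}
```

Three events and two outputs start six goroutines here, which is the N-per-event growth the bullet list warns about.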
func NewAsyncFanOut ¶ added in v0.1.0
func NewAsyncFanOut[T any](opts ...FanOutOption[T]) *AsyncFanOut[T]
NewAsyncFanOut creates an AsyncFanOut that spawns a goroutine per output per event. The fan-out starts running immediately.
Options:
- WithFanOutInputChan: use an existing input channel (caller-owned)
- WithFanOutInputBuffer: create a buffered input channel
Example:
fo := NewAsyncFanOut[int]()
defer fo.Stop()
out := fo.New(nil)
fo.Send(42)
val := <-out // 42
func (*AsyncFanOut) Add ¶ added in v0.1.0
func (c *AsyncFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
Add registers an output channel with an optional filter. If wait is true, the returned channel receives nil once registration is complete.
func (*AsyncFanOut) ClosedChan ¶ added in v0.1.0
func (c *AsyncFanOut) ClosedChan() <-chan error
ClosedChan returns the channel used to signal when the fan-out is done.
func (*AsyncFanOut) Count ¶ added in v0.1.0
func (c *AsyncFanOut) Count() int
Count returns the number of registered output channels.
func (*AsyncFanOut) DebugInfo ¶ added in v0.1.0
func (c *AsyncFanOut) DebugInfo() any
DebugInfo returns diagnostic information about the fan-out's state.
func (*AsyncFanOut) InputChan ¶ added in v0.1.0
func (c *AsyncFanOut) InputChan() chan<- T
InputChan returns the write-only input channel.
func (*AsyncFanOut) New ¶ added in v0.1.0
func (c *AsyncFanOut) New(filter FilterFunc[T]) chan T
New creates a new owned output channel with an optional filter. The fan-out will close this channel on Remove or Stop.
type Block ¶ added in v0.0.7
type Block struct {
// contains filtered or unexported fields
}
Block represents a composite component made up of multiple connected primitives. A Block itself acts as a component and can be nested within other Blocks.
type Broadcast ¶ added in v0.0.7
Broadcast is an example block implementing the broadcast pattern: one input, multiple outputs. It uses QueuedFanOut for strict FIFO ordering, with sends that block only when the dispatch queue is full.
func NewBroadcast ¶ added in v0.0.7
NewBroadcast creates a broadcast block using QueuedFanOut
func (*Broadcast[T]) AddOutput ¶ added in v0.0.7
func (b *Broadcast[T]) AddOutput(filter FilterFunc[T]) chan T
AddOutput adds a new output channel to the broadcast
type Component ¶ added in v0.0.7
type Component interface {
// Stop stops the component and cleans up resources
Stop() error
// IsRunning returns true if the component is currently running
IsRunning() bool
}
Component represents any building block that can be part of a Block. All gocurrent primitives can implement this interface.
type FanIn ¶
type FanIn[T any] struct {
	RunnerBase[fanInCmd[T]]
	// OnChannelRemoved is called when a channel is removed so the caller can
	// perform other cleanups etc based on this
	OnChannelRemoved func(fi *FanIn[T], inchan <-chan T)
	// contains filtered or unexported fields
}
FanIn merges multiple input channels into a single output channel. It implements the fan-in concurrency pattern where messages from multiple sources are combined into one stream.
Example ¶
// Create 2 input channels and send 3 numbers into each of them;
// the FanIn merges them into its output (collector) channel
fanin := NewFanIn[int]()
defer fanin.Stop()
NUM_CHANS := 2
NUM_MSGS := 3
var inchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	inchan := make(chan int)
	inchans = append(inchans, inchan)
	fanin.Add(inchan)
}
for i := 0; i < NUM_CHANS; i++ {
	go func(inchan chan int) {
		// send some numbers into this fanin
		for j := 0; j < NUM_MSGS; j++ {
			inchan <- j
		}
	}(inchans[i])
}
// collect the fanned values
var vals []int
for i := 0; i < NUM_CHANS*NUM_MSGS; i++ {
	val := <-fanin.OutputChan()
	vals = append(vals, val)
}
// sort and print them for testing
sort.Ints(vals)
for _, v := range vals {
	fmt.Println(v)
}
Output:
0
0
1
1
2
2
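For readers who want to see the underlying pattern without the package, the following is a minimal fan-in sketch built only on the standard library. It is not how FanIn is implemented; `merge` and `collectSorted` are hypothetical helpers, and unlike FanIn this version takes a fixed set of inputs up front.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge forwards every value from the inputs to one output channel and
// closes the output once all inputs have been closed and drained.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// collectSorted drains a channel and returns its values in sorted order,
// since the interleaving of the inputs is not deterministic.
func collectSorted(ch <-chan int) []int {
	var vals []int
	for v := range ch {
		vals = append(vals, v)
	}
	sort.Ints(vals)
	return vals
}

func main() {
	a, b := make(chan int), make(chan int)
	go func() { defer close(a); a <- 1; a <- 3 }()
	go func() { defer close(b); b <- 2; b <- 4 }()
	fmt.Println(collectSorted(merge[int](a, b))) // [1 2 3 4]
}
```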
func NewFanIn ¶
func NewFanIn[T any](opts ...FanInOption[T]) *FanIn[T]
NewFanIn creates a new FanIn that merges multiple input channels with functional options. By default, creates and owns an unbuffered output channel. Use options to customize. The FanIn starts running immediately upon creation.
Examples:
// Simple usage with owned channel (backwards compatible)
fanin := NewFanIn[int]()

// With existing channel (backwards compatible)
outChan := make(chan int, 10)
fanin := NewFanIn(WithFanInOutputChan(outChan))

// With buffered output
fanin := NewFanIn[int](WithFanInOutputBuffer[int](100))
func (*FanIn[T]) Add ¶
func (fi *FanIn[T]) Add(inputs ...<-chan T)
Add adds one or more input channels to the FanIn. Messages from these channels will be merged into the output channel. Panics if any input channel is nil.
func (*FanIn[T]) ClosedChan ¶ added in v0.0.7
ClosedChan returns the channel used to signal when the fan-in is done
func (*FanIn[T]) OutputChan ¶ added in v0.0.7
func (fi *FanIn[T]) OutputChan() <-chan T
OutputChan returns the channel on which merged output can be received.
type FanInOption ¶ added in v0.0.8
FanInOption is a functional option for configuring a FanIn
func WithFanInOnChannelRemoved ¶ added in v0.0.8
func WithFanInOnChannelRemoved[T any](fn func(*FanIn[T], <-chan T)) FanInOption[T]
WithFanInOnChannelRemoved sets the callback for when a channel is removed
func WithFanInOutputBuffer ¶ added in v0.0.8
func WithFanInOutputBuffer[T any](size int) FanInOption[T]
WithFanInOutputBuffer creates a buffered output channel for the FanIn
func WithFanInOutputChan ¶ added in v0.0.8
func WithFanInOutputChan[T any](ch chan T) FanInOption[T]
WithFanInOutputChan sets the output channel for the FanIn
type FanOutOption ¶ added in v0.0.8
type FanOutOption[T any] func(*fanOutCore[T])
FanOutOption is a functional option for configuring any fan-out type.
func WithFanOutInputBuffer ¶ added in v0.0.8
func WithFanOutInputBuffer[T any](size int) FanOutOption[T]
WithFanOutInputBuffer creates a buffered input channel of the given size. The fan-out owns and will close this channel on Stop.
func WithFanOutInputChan ¶ added in v0.0.8
func WithFanOutInputChan[T any](ch chan T) FanOutOption[T]
WithFanOutInputChan sets the input channel. The fan-out will NOT close this channel on Stop (caller retains ownership).
type FanOuter ¶ added in v0.1.0
type FanOuter[T any] interface {
	Component

	// Send delivers a value to the fan-out for distribution to all outputs.
	// Blocking behavior depends on the concrete implementation.
	Send(value T)

	// InputChan returns the write-only input channel. Callers may send
	// directly on this channel instead of calling Send.
	InputChan() chan<- T

	// Add registers an existing output channel with an optional per-channel
	// filter. If wait is true, the returned channel receives nil once the
	// registration is complete; otherwise the returned channel is nil.
	Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)

	// New creates a new output channel owned by the fan-out (closed on Stop)
	// with an optional filter. The call blocks until registration is complete.
	New(filter FilterFunc[T]) chan T

	// Remove unregisters an output channel. If the channel was created by New,
	// it is also closed. If wait is true, the returned channel receives nil
	// once the removal is complete.
	Remove(output chan<- T, wait bool) (callbackChan chan error)

	// Count returns the current number of registered output channels.
	Count() int

	// ClosedChan returns a channel that receives nil (or an error) when the
	// fan-out has fully shut down.
	ClosedChan() <-chan error
}
FanOuter is the interface satisfied by all fan-out dispatch strategies.
Three concrete implementations are provided, each with different trade-offs between sender blocking, event ordering, and goroutine usage:
- SyncFanOut: Sender blocks until all outputs receive the event. Strict FIFO ordering. Zero extra goroutines.
- AsyncFanOut: Sender never blocks (one goroutine per output per event). No ordering guarantee. Goroutine count can explode.
- QueuedFanOut: Sender blocks only when the dispatch queue is full. Strict FIFO ordering via a persistent dispatch goroutine. Two goroutines total (runner + dispatcher). Recommended default.
Ordering semantics summary:
| Type           | Sender blocks?        | FIFO ordering? | Goroutines        |
|----------------|-----------------------|----------------|-------------------|
| SyncFanOut     | Yes (all outputs)     | Strict         | 0 extra           |
| AsyncFanOut    | No                    | None           | N per event       |
| QueuedFanOut   | No (until queue full) | Strict         | 2 total (bounded) |
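The synchronous strategy is the simplest of the three to picture: the sender itself performs every delivery. The sketch below uses only the standard library and hypothetical helpers (`syncSend`, `receiveN`); it illustrates the blocking and ordering trade-off, not the SyncFanOut implementation.

```go
package main

import "fmt"

// syncSend delivers v to each output in turn and returns only after every
// output has accepted it, so no extra goroutines are needed and events
// reach each output in send order.
func syncSend[T any](outputs []chan T, v T) {
	for _, out := range outputs {
		out <- v
	}
}

// receiveN reads n values from ch in arrival order.
func receiveN[T any](ch <-chan T, n int) []T {
	vals := make([]T, 0, n)
	for i := 0; i < n; i++ {
		vals = append(vals, <-ch)
	}
	return vals
}

func main() {
	// Buffered outputs keep the demo single-goroutine; with unbuffered
	// outputs, syncSend would block until a receiver is ready.
	outputs := []chan int{make(chan int, 3), make(chan int, 3)}
	for _, v := range []int{1, 2, 3} {
		syncSend(outputs, v)
	}
	for _, out := range outputs {
		fmt.Println(receiveN[int](out, 3)) // [1 2 3]
	}
}
```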
type FilterFunc ¶
type FilterFunc[T any] func(*T) *T
FilterFunc is an optional per-output transformation/filtering function. It receives a pointer to the event and returns a pointer to the (possibly modified) event. Return nil to skip delivery to this output.
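A filter with this shape could look like the sketch below. The local `filterFunc` type mirrors the documented signature so the example runs without the package; `evensDoubled` and `deliver` are hypothetical helpers, and returning a pointer to a copy (rather than mutating the shared event) is a design choice of this sketch, not something the documentation prescribes.

```go
package main

import "fmt"

// filterFunc mirrors the documented FilterFunc shape: return nil to skip.
type filterFunc[T any] func(*T) *T

// evensDoubled drops odd values and doubles even ones. It returns a pointer
// to a copy so that the original event is left untouched.
func evensDoubled(v *int) *int {
	if *v%2 != 0 {
		return nil
	}
	doubled := *v * 2
	return &doubled
}

// deliver applies an optional filter and returns the values that would be
// sent to a single output. A nil filter passes every event through.
func deliver[T any](events []T, filter filterFunc[T]) []T {
	var sent []T
	for i := range events {
		ev := &events[i]
		if filter != nil {
			if ev = filter(ev); ev == nil {
				continue
			}
		}
		sent = append(sent, *ev)
	}
	return sent
}

func main() {
	fmt.Println(deliver[int]([]int{1, 2, 3, 4}, evensDoubled)) // [4 8]
	fmt.Println(deliver[int]([]int{1, 2}, nil))                // [1 2]
}
```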
type IdleTimer ¶ added in v0.0.15
type IdleTimer struct {
// contains filtered or unexported fields
}
IdleTimer implements a ref-counted idle timeout. The timer starts counting when all activity ceases (active count drops to zero) and pauses while any activity is in progress.
This is the "activity-gated idle timeout" pattern used for session/connection idle cleanup — any service managing sessions, connections, or pooled resources with automatic expiry needs this exact primitive.
Usage:
timer := NewIdleTimer(30*time.Second, func() { removeSession(id) })
timer.Acquire() // request starts — pause idle timer
defer timer.Release() // request ends — restart timer if no more activity
timer.Stop() // explicit shutdown — cancel timer permanently
All methods are nil-safe: calling Acquire/Release/Stop on a nil *IdleTimer is a no-op. This enables callers to use *IdleTimer as a struct field without nil checks at every call site.
func NewIdleTimer ¶ added in v0.0.15
NewIdleTimer creates an IdleTimer that calls onExpire when the timeout elapses with no activity. The timer starts immediately — call Acquire() to pause it.
If timeout is 0, returns a no-op timer: Acquire/Release/Stop do nothing and onExpire is never called. This is the default for "no timeout configured."
func (*IdleTimer) Acquire ¶ added in v0.0.15
func (t *IdleTimer) Acquire()
Acquire marks the start of an activity (e.g., an in-flight request). Stops the idle timer so it cannot fire while the activity is in progress. Must be paired with a corresponding Release().
Safe to call on a nil receiver (no-op).
type InputComponent ¶ added in v0.0.7
type InputComponent[T any] interface {
	Component
	// InputChan returns the channel for sending input to this component
	InputChan() chan<- T
	// Send is a convenience method for sending to the input channel
	Send(T)
}
InputComponent represents a component with an input channel
type Mapper ¶
type Mapper[I any, O any] struct {
	RunnerBase[string]
	// MapFunc is applied to each value in the input channel
	// and returns a tuple of 3 things - outval, skip, stop
	// if skip is false, outval is sent to the output channel
	// if stop is true, then the entire mapper stops processing any further elements.
	// This mechanism can be used in addition to the Stop method if sequencing this
	// within the elements of input channel is required
	MapFunc func(I) (O, bool, bool)
	OnDone func(p *Mapper[I, O])
	// contains filtered or unexported fields
}
Mapper connects an input and output channel applying transforms between them. It reads from the input channel, applies a transformation function, and writes the result to the output channel.
func Connect ¶ added in v0.0.7
func Connect[T any](from OutputComponent[T], to InputComponent[T]) *Mapper[T, T]
Connect connects the output of one component to the input of another using a Pipe. Returns the pipe so it can be managed if needed.
func ConnectWith ¶ added in v0.0.7
func ConnectWith[I, O any](from OutputComponent[I], to InputComponent[O], mapper func(I) (O, bool, bool)) *Mapper[I, O]
ConnectWith connects two components using a custom mapper function
func NewMapper ¶
func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool), opts ...MapperOption[T, U]) *Mapper[T, U]
NewMapper creates a new mapper between an input and output channel with functional options. The caller, not the Mapper, owns the channels, so they are not closed when the mapper stops. The mapper function returns (output, skip, stop) where:
- output: the transformed value
- skip: if true, the output is not sent to the output channel
- stop: if true, the mapper stops processing further elements
Examples:
// Simple usage (backwards compatible)
mapper := NewMapper(inChan, outChan, myMapperFunc)
// With OnDone callback
mapper := NewMapper(inChan, outChan, myMapperFunc,
WithMapperOnDone(func(m *Mapper[int, string]) {
log.Println("mapper done")
}))
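The (output, skip, stop) contract can be sketched with a plain loop over channels. This is a standard-library illustration, not the Mapper implementation: `runMapper`, `mapAll`, and `squareEvens` are hypothetical, the sketch closes its output channel only because it owns it, and it assumes that a value returned together with stop=true is still delivered when skip is false.

```go
package main

import "fmt"

// runMapper reads from in, applies fn, and forwards results to out until in
// is closed or fn asks to stop. It closes out on exit because this sketch
// owns that channel.
func runMapper[I, O any](in <-chan I, out chan<- O, fn func(I) (O, bool, bool)) {
	defer close(out)
	for v := range in {
		result, skip, stop := fn(v)
		if !skip {
			out <- result
		}
		if stop {
			return
		}
	}
}

// squareEvens skips odd numbers, squares even ones, and requests a stop
// once it has seen a value of 6 or more.
func squareEvens(v int) (int, bool, bool) {
	return v * v, v%2 != 0, v >= 6
}

// mapAll pushes inputs through runMapper and collects everything it emits.
func mapAll(inputs []int, fn func(int) (int, bool, bool)) []int {
	in := make(chan int, len(inputs))
	for _, v := range inputs {
		in <- v
	}
	close(in)
	out := make(chan int, len(inputs))
	runMapper[int, int](in, out, fn)
	var results []int
	for v := range out {
		results = append(results, v)
	}
	return results
}

func main() {
	// 1 and 3 are skipped, 6 is emitted and then stops the mapper, 8 is never read.
	fmt.Println(mapAll([]int{1, 2, 3, 4, 6, 8}, squareEvens)) // [4 16 36]
}
```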
func NewPipe ¶
NewPipe creates a new pipe that connects an input and output channel. A pipe is a mapper with the identity function, so it simply forwards all values from input to output without transformation.
func (*Mapper[I, O]) ClosedChan ¶ added in v0.0.7
ClosedChan returns the channel used to signal when the mapper is done
func (*Mapper[I, O]) InputChan ¶ added in v0.0.7
func (m *Mapper[I, O]) InputChan() chan<- I
InputChan returns the input channel (exposes the private field for Component interface)
func (*Mapper[I, O]) OutputChan ¶ added in v0.0.7
func (m *Mapper[I, O]) OutputChan() <-chan O
OutputChan returns the output channel (exposes the private field for Component interface)
type MapperOption ¶ added in v0.0.8
MapperOption is a functional option for configuring a Mapper
func WithMapperOnDone ¶ added in v0.0.8
func WithMapperOnDone[I, O any](fn func(*Mapper[I, O])) MapperOption[I, O]
WithMapperOnDone sets the callback to be called when the mapper finishes
type Merge ¶ added in v0.0.7
Merge is an example block implementing the merge pattern: multiple inputs, one output.
func (*Merge[T]) AddInput ¶ added in v0.0.7
func (m *Merge[T]) AddInput(input <-chan T)
AddInput adds a new input channel to the merge
func (*Merge[T]) OutputChan ¶ added in v0.0.7
func (m *Merge[T]) OutputChan() <-chan T
OutputChan implements OutputComponent
type Message ¶
type Message[T any] struct {
	Value  T     // The actual value being transmitted
	Error  error // Any error that occurred during processing
	Source any   // Optional source information for debugging
}
Message represents a value with optional error and source information. It's used by channels to carry both successful values and error conditions.
type OutputComponent ¶ added in v0.0.7
type OutputComponent[T any] interface {
	Component
	// OutputChan returns the channel for receiving output from this component
	OutputChan() <-chan T
}
OutputComponent represents a component with an output channel
type Pipeline ¶ added in v0.0.7
Pipeline creates a linear sequence of components connected by pipes
func NewPipeline ¶ added in v0.0.7
NewPipeline creates a new pipeline block
func (*Pipeline[T]) InputChan ¶ added in v0.0.7
func (p *Pipeline[T]) InputChan() chan<- T
InputChan implements InputComponent
func (*Pipeline[T]) OutputChan ¶ added in v0.0.7
func (p *Pipeline[T]) OutputChan() <-chan T
OutputChan implements OutputComponent
type QueuedFanOut ¶ added in v0.1.0
type QueuedFanOut[T any] struct {
	// contains filtered or unexported fields
}
QueuedFanOut distributes events to all registered output channels via a persistent dispatch goroutine reading from a buffered queue.
This is the recommended fan-out strategy for most use cases.
Ordering semantics — ordered pipeline:
Time ──────────────────────────────────────────────────────────────►

Sender:  Send(A) ──► Send(B) ──► Send(C) ──► ...   (never blocks unless queue full)
            │            │            │
            └──────► dispatchChan (buffered queue, default 64) ◄──┘
                              │
Dispatch goroutine (single, persistent):
            ├─ read A: deliver to out[0], out[1], out[2]
            ├─ read B: deliver to out[0], out[1], out[2]   ← only after A is done
            └─ read C: deliver to out[0], out[1], out[2]

Guarantee: A is fully delivered to ALL outputs before B begins.
           Sender is decoupled: it only blocks when the queue is full.
- Sender blocks only when the dispatch queue is full (configurable,
default 64). This propagates back-pressure without silently dropping events.
- Strict FIFO: the single dispatch goroutine processes events sequentially.
Event A is delivered to ALL outputs before event B begins delivery.
- Two goroutines total (runner + dispatcher), regardless of event volume.
- A slow output blocks delivery of the current event to remaining outputs
AND delays delivery of subsequent events in the queue.
The subscriber list is captured as an immutable [outputSnapshot] on each Add/Remove. The dispatch goroutine always works from the snapshot bundled with each event, so there is no race between subscriber management and event delivery, and zero per-event allocations for the subscriber list.
When an output is removed, it is added to a concurrent "removed set" (sync.Map). The dispatch goroutine checks this set before each send and skips removed outputs. Self-owned channels are closed during Stop after the dispatch goroutine has fully exited.
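The core of the queued strategy, a buffered queue drained by one dispatch goroutine, can be sketched as follows. This is a deliberately stripped-down illustration using only the standard library: the type and its methods are hypothetical, the output list is fixed at construction, and snapshots, filters, and the removed set described above are all omitted.

```go
package main

import "fmt"

// queuedFanOut is a hypothetical minimal dispatcher: one buffered queue and
// one goroutine that delivers each event to every output in turn.
type queuedFanOut[T any] struct {
	queue   chan T
	outputs []chan T
	done    chan struct{}
}

func newQueuedFanOut[T any](queueSize int, outputs ...chan T) *queuedFanOut[T] {
	fo := &queuedFanOut[T]{
		queue:   make(chan T, queueSize),
		outputs: outputs,
		done:    make(chan struct{}),
	}
	go fo.dispatch()
	return fo
}

// dispatch delivers one event to all outputs before reading the next, which
// is what gives strict FIFO ordering on every output.
func (fo *queuedFanOut[T]) dispatch() {
	defer close(fo.done)
	for ev := range fo.queue {
		for _, out := range fo.outputs {
			out <- ev
		}
	}
	for _, out := range fo.outputs {
		close(out)
	}
}

// Send blocks only when the queue is full.
func (fo *queuedFanOut[T]) Send(v T) { fo.queue <- v }

// Stop closes the queue and waits for the dispatcher to drain it and exit.
func (fo *queuedFanOut[T]) Stop() {
	close(fo.queue)
	<-fo.done
}

// drain reads a channel until it is closed.
func drain(ch <-chan int) []int {
	var vals []int
	for v := range ch {
		vals = append(vals, v)
	}
	return vals
}

func main() {
	a, b := make(chan int, 3), make(chan int, 3)
	fo := newQueuedFanOut[int](4, a, b)
	for i := 1; i <= 3; i++ {
		fo.Send(i)
	}
	fo.Stop()
	fmt.Println(drain(a), drain(b)) // [1 2 3] [1 2 3]
}
```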
Example ¶
ExampleQueuedFanOut demonstrates basic QueuedFanOut usage: create a fan-out, add output channels, send events, and receive them on all outputs in strict FIFO order.
fanout := NewQueuedFanOut[int]()
defer fanout.Stop()
NUM_CHANS := 2
NUM_MSGS := 3
var outchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	outchan := fanout.New(nil)
	outchans = append(outchans, outchan)
}
var vals []int
for i := 0; i < NUM_MSGS; i++ {
	fanout.Send(i)
}
for j := 0; j < NUM_MSGS; j++ {
	for i := 0; i < NUM_CHANS; i++ {
		val := <-outchans[i]
		vals = append(vals, val)
	}
}
sort.Ints(vals)
for _, v := range vals {
	fmt.Println(v)
}
Output:
0
0
1
1
2
2
func NewQueuedFanOut ¶ added in v0.1.0
func NewQueuedFanOut[T any](opts ...any) *QueuedFanOut[T]
NewQueuedFanOut creates a QueuedFanOut that delivers events via a persistent dispatch goroutine with strict FIFO ordering. The fan-out starts running immediately.
Accepts both common FanOutOption and QueuedFanOutOption options:
- WithFanOutInputChan: use an existing input channel (caller-owned)
- WithFanOutInputBuffer: create a buffered input channel
- WithQueueSize: set the dispatch queue capacity (default 64)
Example:
fo := NewQueuedFanOut[int]()
defer fo.Stop()
out := fo.New(nil)
fo.Send(42)
val := <-out // 42
func (*QueuedFanOut) Add ¶ added in v0.1.0
func (c *QueuedFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
Add registers an output channel with an optional filter. If wait is true, the returned channel receives nil once registration is complete.
func (*QueuedFanOut) ClosedChan ¶ added in v0.1.0
func (c *QueuedFanOut) ClosedChan() <-chan error
ClosedChan returns the channel used to signal when the fan-out is done.
func (*QueuedFanOut) Count ¶ added in v0.1.0
func (c *QueuedFanOut) Count() int
Count returns the number of registered output channels.
func (*QueuedFanOut[T]) DebugInfo ¶ added in v0.1.0
func (fo *QueuedFanOut[T]) DebugInfo() any
DebugInfo returns diagnostic information including queue depth, which is useful for debugging back-pressure issues and understanding dispatch state.
func (*QueuedFanOut) InputChan ¶ added in v0.1.0
func (c *QueuedFanOut) InputChan() chan<- T
InputChan returns the write-only input channel.
func (*QueuedFanOut) New ¶ added in v0.1.0
func (c *QueuedFanOut) New(filter FilterFunc[T]) chan T
New creates a new owned output channel with an optional filter. The fan-out will close this channel on Remove or Stop.
type QueuedFanOutOption ¶ added in v0.1.0
type QueuedFanOutOption[T any] func(*QueuedFanOut[T])
QueuedFanOutOption is a functional option specific to QueuedFanOut.
func WithQueueSize ¶ added in v0.1.0
func WithQueueSize[T any](size int) QueuedFanOutOption[T]
WithQueueSize sets the capacity of the dispatch queue (default 64). A larger queue allows more events to be buffered before the sender blocks, at the cost of higher memory usage.
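Options such as this one follow the functional options pattern: each option is a function that mutates a configuration before the component starts. The sketch below shows the pattern in isolation; `config`, `option`, `withQueueSize`, and `newConfig` are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// config is a hypothetical settings struct for the illustration.
type config struct {
	queueSize int
}

// option mutates a config; constructors accept any number of them.
type option func(*config)

// withQueueSize returns an option that overrides the queue capacity.
func withQueueSize(size int) option {
	return func(c *config) { c.queueSize = size }
}

// newConfig starts from defaults and applies the options in order.
func newConfig(opts ...option) config {
	c := config{queueSize: 64} // mirrors the documented default of 64
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().queueSize)                   // 64
	fmt.Println(newConfig(withQueueSize(256)).queueSize) // 256
}
```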
type Reader ¶
type Reader[R any] struct {
	RunnerBase[string]
	Read ReaderFunc[R]
	OnDone func(r *Reader[R])
	// contains filtered or unexported fields
}
Reader is a typed goroutine wrapper that continuously calls its Read function and sends each result on an output channel, wrapped in a Message struct.
func NewReader ¶
func NewReader[R any](read ReaderFunc[R], opts ...ReaderOption[R]) *Reader[R]
NewReader creates a new reader instance with functional options. The reader function is required as the first parameter, with optional configuration via functional options.
Examples:
// Simple usage (backwards compatible)
reader := NewReader(myReaderFunc)
// With options
reader := NewReader(myReaderFunc, WithOutputBuffer[int](10))
// With multiple options
reader := NewReader(myReaderFunc,
WithOutputBuffer[int](100),
WithOnDone(func(r *Reader[int]) { log.Println("done") }))
func (*Reader[R]) ClosedChan ¶ added in v0.0.2
ClosedChan returns the channel used to signal when the reader is done.
func (*Reader[R]) OutputChan ¶ added in v0.0.7
OutputChan returns the channel on which messages can be received.
type ReaderFunc ¶
ReaderFunc is the type of the reader method used by the Reader goroutine primitive.
type ReaderOption ¶ added in v0.0.8
ReaderOption is a functional option for configuring a Reader
func WithOnDone ¶ added in v0.0.8
func WithOnDone[R any](fn func(*Reader[R])) ReaderOption[R]
WithOnDone sets the callback to be called when the reader finishes
func WithOutputBuffer ¶ added in v0.0.8
func WithOutputBuffer[R any](size int) ReaderOption[R]
WithOutputBuffer sets the buffer size for the output channel
type Reducer ¶
type Reducer[T any, C any, U any] struct {
	FlushPeriod time.Duration
	// CollectFunc adds an input to the collection and returns the updated collection.
	// The bool return value indicates whether a flush should be triggered immediately.
	CollectFunc func(collection C, inputs ...T) (C, bool)
	ReduceFunc func(collectedItems C) (reducedOutputs U)
	// contains filtered or unexported fields
}
Reducer collects messages of type T over a window and reduces them to a value of type U. For example, it can batch messages into a list every 10 seconds. Alternatively, if a time-based window is not used, a reduction can be triggered manually.
Example ¶
inputChan := make(chan int)
outputChan := make(chan []int)
// Create a reducer that collects integers into slices
reducer := NewIDReducer(
	WithInputChan[int, []int, []int](inputChan),
	WithOutputChan[int, []int](outputChan),
	WithFlushPeriod[int, []int, []int](50*time.Millisecond))
defer reducer.Stop()
// Send data
go func() {
	for i := range 3 {
		inputChan <- i
	}
}()
// After FlushPeriod, receive the collected batch
batch := <-outputChan
fmt.Println(batch)
Output:
[0 1 2]
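The division of labor between a collect function (which may request an immediate flush) and a reduce function can be shown without goroutines or timers. The sketch below is synchronous and uses hypothetical names (`collect`, `reduce`, `runReducer`); the function shapes follow the CollectFunc and ReduceFunc fields, but the threshold of three items and the final flush of leftovers are assumptions of this example.

```go
package main

import "fmt"

// collect appends inputs to the batch and requests a flush once the batch
// holds at least three items (an arbitrary threshold for this sketch).
func collect(batch []int, inputs ...int) ([]int, bool) {
	batch = append(batch, inputs...)
	return batch, len(batch) >= 3
}

// reduce turns a collected batch into its sum.
func reduce(batch []int) int {
	sum := 0
	for _, v := range batch {
		sum += v
	}
	return sum
}

// runReducer feeds inputs through collect, emits reduce(batch) whenever
// collect requests a flush, and flushes any remainder at the end.
func runReducer(inputs []int) []int {
	var batch, outputs []int
	for _, v := range inputs {
		var flush bool
		if batch, flush = collect(batch, v); flush {
			outputs = append(outputs, reduce(batch))
			batch = nil
		}
	}
	if len(batch) > 0 {
		outputs = append(outputs, reduce(batch))
	}
	return outputs
}

func main() {
	fmt.Println(runReducer([]int{1, 2, 3, 4, 5, 6, 7})) // [6 15 7]
}
```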
func NewReducer ¶
func NewReducer[T any, C any, U any](opts ...ReducerOption[T, C, U]) *Reducer[T, C, U]
NewReducer creates a reducer over generic input and output types. Options can be provided to configure the input channel, output channel, flush period, etc. If channels are not provided via options, the reducer will create and own them. Just like other runners, the Reducer starts as soon as it is created.
func (*Reducer[T, C, U]) ClosedChan ¶ added in v0.0.7
ClosedChan returns the channel used to signal when the reducer is done
func (*Reducer[T, C, U]) Flush ¶
func (fo *Reducer[T, C, U]) Flush()
Flush triggers an immediate flush of pending events by sending a command to the reducer goroutine. This is safe to call from any goroutine.
func (*Reducer[T, C, U]) InputChan ¶ added in v0.0.7
func (fo *Reducer[T, C, U]) InputChan() chan<- T
InputChan returns the channel onto which messages can be sent (to be reduced).
func (*Reducer[T, C, U]) IsRunning ¶ added in v0.0.7
IsRunning returns true if the reducer is still running
func (*Reducer[T, C, U]) OutputChan ¶ added in v0.0.7
func (fo *Reducer[T, C, U]) OutputChan() <-chan U
OutputChan returns the channel from which "reduced" values can be read.
type Reducer2 ¶ added in v0.0.8
Reducer2 is a simplified 2-parameter version of Reducer where the collection type C is the same as the output type (U == C). This is the most common use case.
func NewIDReducer ¶
func NewIDReducer[T any](opts ...ReducerOption2[T, []T]) *Reducer2[T, []T]
NewIDReducer creates a Reducer2 that simply collects events of type T into a list (of type []T).
func NewListReducer ¶ added in v0.0.5
func NewListReducer[T any](opts ...ReducerOption2[[]T, []T]) *Reducer2[[]T, []T]
NewListReducer creates a reducer that accepts lists of items and concatenates them into a single collection. This allows producers to send events in batches instead of one at a time.
func NewReducer2 ¶ added in v0.0.8
func NewReducer2[T any, C any](opts ...ReducerOption2[T, C]) *Reducer2[T, C]
NewReducer2 creates a 2-parameter reducer where collection type equals output type. This is a simpler API for the common case where no type transformation is needed.
type ReducerOption ¶ added in v0.0.5
ReducerOption is a functional option for configuring a Reducer
func WithCollectFunc ¶ added in v0.0.12
func WithCollectFunc[T any, C any, U any](fn func(C, ...T) (C, bool)) ReducerOption[T, C, U]
WithCollectFunc sets the collect function for the reducer
func WithFlushPeriod ¶ added in v0.0.5
WithFlushPeriod sets the flush period for the reducer
func WithInputChan ¶ added in v0.0.5
func WithInputChan[T any, C any, U any](ch chan T) ReducerOption[T, C, U]
WithInputChan sets the input channel for the reducer
func WithOutputChan ¶ added in v0.0.5
func WithOutputChan[T any, C any, U any](ch chan U) ReducerOption[T, C, U]
WithOutputChan sets the output channel for the reducer
func WithReduceFunc ¶ added in v0.0.12
func WithReduceFunc[T any, C any, U any](fn func(C) U) ReducerOption[T, C, U]
WithReduceFunc sets the reduce function for the reducer
type ReducerOption2 ¶ added in v0.0.8
type ReducerOption2[T any, C any] = ReducerOption[T, C, C]
ReducerOption2 is a functional option for configuring a Reducer2
func WithFlushPeriod2 ¶ added in v0.0.8
func WithFlushPeriod2[T any, C any](period time.Duration) ReducerOption2[T, C]
WithFlushPeriod2 sets the flush period for a Reducer2
func WithInputChan2 ¶ added in v0.0.8
func WithInputChan2[T any, C any](ch chan T) ReducerOption2[T, C]
WithInputChan2 sets the input channel for a Reducer2
func WithOutputChan2 ¶ added in v0.0.8
func WithOutputChan2[T any, C any](ch chan C) ReducerOption2[T, C]
WithOutputChan2 sets the output channel for a Reducer2
type RunnerBase ¶
type RunnerBase[C any] struct {
	// contains filtered or unexported fields
}
RunnerBase is the base of the Reader, Writer, Mapper, FanIn, and FanOut primitives. It provides lifecycle management (start/stop) and coordination between the owner goroutine and the worker goroutine.
Key design: controlChan is created once and never closed or nilled. The done channel is closed by cleanup() to signal that the worker goroutine has exited. This eliminates the data race between Stop() sending on controlChan and cleanup() closing it that existed in the previous mutex+close design.
func NewRunnerBase ¶
func NewRunnerBase[C any](stopVal C) RunnerBase[C]
NewRunnerBase creates a new base runner. Called by Reader, Writer, Mapper, FanIn, and FanOut constructors. The controlChan is buffered(1) to allow a single stop signal to be sent without blocking.
func (*RunnerBase[R]) DebugInfo ¶
func (r *RunnerBase[R]) DebugInfo() any
DebugInfo returns diagnostic information about the runner's state.
func (*RunnerBase[C]) Done ¶ added in v0.0.12
func (r *RunnerBase[C]) Done() <-chan struct{}
Done returns a channel that is closed when the runner's worker goroutine exits. Useful for coordinating with other goroutines that need to know when the runner has stopped (e.g., FanIn's pipeClosed callback uses this to avoid sending on controlChan after the FanIn goroutine has exited).
func (*RunnerBase[C]) IsRunning ¶
func (r *RunnerBase[C]) IsRunning() bool
IsRunning returns true if the runner's worker goroutine is active.
func (*RunnerBase[C]) Stop ¶
func (r *RunnerBase[C]) Stop() error
Stop sends a stop signal to the worker goroutine and waits for it to finish. It is safe to call Stop() concurrently, multiple times, or after the worker goroutine has already self-terminated. Only the first call that transitions isRunning from true to false will send the stop signal; subsequent calls return immediately.
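The lifecycle described for RunnerBase (a control channel that is buffered and never closed, a done channel closed exactly once by the worker, and an idempotent Stop) can be sketched as follows. This is an illustration of the design under stated assumptions, not the package's source: the `runner` type, its fields, and the use of `atomic.Bool` for the running flag are all hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// runner is a hypothetical stand-in for the design described above.
type runner struct {
	running     atomic.Bool
	controlChan chan string   // buffered(1), created once, never closed
	done        chan struct{} // closed once, when the worker exits
}

func newRunner(input <-chan int) *runner {
	r := &runner{
		controlChan: make(chan string, 1),
		done:        make(chan struct{}),
	}
	r.running.Store(true)
	go r.loop(input)
	return r
}

func (r *runner) loop(input <-chan int) {
	defer close(r.done) // cleanup: the only close in this design
	for {
		select {
		case <-r.controlChan:
			return // stop requested by the owner
		case _, ok := <-input:
			if !ok {
				return // self-termination: the input was closed
			}
		}
	}
}

// Done is closed when the worker goroutine has exited.
func (r *runner) Done() <-chan struct{} { return r.done }

// Stop is safe to call repeatedly, concurrently, or after self-termination.
func (r *runner) Stop() {
	if !r.running.CompareAndSwap(true, false) {
		return // another call already sent the stop signal
	}
	r.controlChan <- "stop" // cannot block: buffer of 1 and a single sender
	<-r.done
}

func main() {
	r := newRunner(make(chan int))
	r.Stop()
	r.Stop() // no-op

	in := make(chan int)
	self := newRunner(in)
	close(in)
	<-self.Done()
	self.Stop() // worker already gone; the buffered send still succeeds

	fmt.Println("both runners stopped")
}
```

Because the control channel is never closed, a late Stop cannot panic on a send, and because it has room for one value, that send completes even when no worker is left to receive it.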
type SyncFanOut ¶ added in v0.1.0
type SyncFanOut[T any] struct {
	// contains filtered or unexported fields
}
SyncFanOut distributes events to all registered output channels synchronously within the runner goroutine.
Ordering semantics — sender waits for everyone:
Time ──────────────────────────────────────────────────────────────►

Sender: Send(A) ─────────────────────────────────► Send(B) ───────────────────────► ...
        │                                          │
        ├─ deliver A to out[0] (blocks if full)    ├─ deliver B to out[0]
        ├─ deliver A to out[1] (blocks if full)    ├─ deliver B to out[1]
        └─ deliver A to out[2] (blocks if full)    └─ deliver B to out[2]

Guarantee: A is fully delivered to ALL outputs before B begins.

- Sender blocks until ALL outputs have received the event.
- Strict FIFO: event A is delivered to every output before event B.
- Zero extra goroutines — everything runs in the single runner goroutine.
- A slow output blocks the sender AND all other outputs.
Use SyncFanOut when the number of outputs is small, outputs are fast, and back-pressure to the sender is desirable.
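The ordering guarantee described for SyncFanOut comes down to a nested loop in one goroutine. The sketch below is a standalone illustration, not the package's code; `broadcastSync` is a hypothetical helper.

```go
package main

import "fmt"

// broadcastSync is a hypothetical helper (not part of gocurrent). It delivers
// each event to every output, in order, before reading the next event.
func broadcastSync[T any](in <-chan T, outs []chan T) {
	for ev := range in {
		for _, out := range outs {
			out <- ev // blocks while this output is full, stalling everyone
		}
	}
	for _, out := range outs {
		close(out)
	}
}

func main() {
	in := make(chan string)
	outs := []chan string{make(chan string, 2), make(chan string, 2)}
	go broadcastSync(in, outs)

	in <- "A"
	in <- "B"
	close(in)

	for i, out := range outs {
		for ev := range out {
			fmt.Printf("out[%d] got %s\n", i, ev)
		}
	}
}
```

Every output sees A before B. The cost is visible in the inner loop: if any single send blocks, no other output receives anything until it unblocks.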
func NewSyncFanOut ¶ added in v0.1.0
func NewSyncFanOut[T any](opts ...FanOutOption[T]) *SyncFanOut[T]
NewSyncFanOut creates a SyncFanOut that delivers events to all outputs synchronously. The fan-out starts running immediately.
Options:
- WithFanOutInputChan: use an existing input channel (caller-owned)
- WithFanOutInputBuffer: create a buffered input channel
Example:
fo := NewSyncFanOut[int]()
defer fo.Stop()
out := fo.New(nil)
fo.Send(42)
val := <-out // 42
func (*SyncFanOut) Add ¶ added in v0.1.0
func (c *SyncFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
Add registers an output channel with an optional filter. If wait is true, the returned channel receives nil once registration is complete.
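One way a registration call can report completion through a returned channel is to route the request to the runner goroutine and have it acknowledge there. The sketch below illustrates that pattern only; `hub` and `addCmd` are hypothetical, filters are omitted, and the documentation does not confirm that gocurrent is implemented this way or what Add returns when wait is false.

```go
package main

import "fmt"

// addCmd is a hypothetical registration request handled by the runner goroutine.
type addCmd[T any] struct {
	output chan<- T
	ack    chan error // nil when the caller does not want to wait
}

// hub is a hypothetical stand-in for a fan-out; it is not a gocurrent type.
type hub[T any] struct {
	cmds    chan addCmd[T]
	outputs []chan<- T // touched only by the run goroutine
}

func newHub[T any]() *hub[T] {
	h := &hub[T]{cmds: make(chan addCmd[T])}
	go h.run()
	return h
}

func (h *hub[T]) run() {
	for cmd := range h.cmds {
		h.outputs = append(h.outputs, cmd.output)
		if cmd.ack != nil {
			cmd.ack <- nil // registration complete
		}
	}
}

// Add hands the request to the runner. In this sketch it returns a channel
// that receives nil once the output is registered when wait is true, and a
// nil channel otherwise (an assumption made for the example).
func (h *hub[T]) Add(output chan<- T, wait bool) chan error {
	var ack chan error
	if wait {
		ack = make(chan error, 1) // buffered so the runner never blocks on it
	}
	h.cmds <- addCmd[T]{output: output, ack: ack}
	return ack
}

func main() {
	h := newHub[int]()
	out := make(chan int, 1)

	if err := <-h.Add(out, true); err == nil {
		fmt.Println("registered")
	}
	fmt.Println(h.Add(out, false) == nil)
}
```

Keeping the output list private to the runner goroutine avoids a mutex, and the acknowledgement lets a caller be sure the output is registered before it sends the first event.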
func (*SyncFanOut) ClosedChan ¶ added in v0.1.0
func (c *SyncFanOut) ClosedChan() <-chan error
ClosedChan returns the channel used to signal when the fan-out is done.
func (*SyncFanOut) Count ¶ added in v0.1.0
func (c *SyncFanOut) Count() int
Count returns the number of registered output channels.
func (*SyncFanOut) DebugInfo ¶ added in v0.1.0
func (c *SyncFanOut) DebugInfo() any
DebugInfo returns diagnostic information about the fan-out's state.
func (*SyncFanOut) InputChan ¶ added in v0.1.0
func (c *SyncFanOut) InputChan() chan<- T
InputChan returns the write-only input channel.
func (*SyncFanOut) New ¶ added in v0.1.0
func (c *SyncFanOut) New(filter FilterFunc[T]) chan T
New creates a new owned output channel with an optional filter. The fan-out will close this channel on Remove or Stop.
type SyncMap ¶ added in v0.0.14
type SyncMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}
SyncMap is a type-safe generic wrapper around sync.Map.
It provides the same concurrent map semantics as sync.Map — optimized for read-heavy workloads with stable keys — but with compile-time type safety instead of interface{} casts.
For documentation on the underlying concurrency guarantees, see: https://pkg.go.dev/sync#Map
Usage:
var m gocurrent.SyncMap[string, *Session]
m.Store("abc", session)
if s, ok := m.Load("abc"); ok {
// s is *Session, no type assertion needed
}
func (*SyncMap[K, V]) Delete ¶ added in v0.0.14
func (m *SyncMap[K, V]) Delete(key K)
Delete deletes the value for a key.
func (*SyncMap[K, V]) Load ¶ added in v0.0.14
Load returns the value stored in the map for a key, or the zero value if no value is present. The ok result indicates whether value was found.
func (*SyncMap[K, V]) LoadAndDelete ¶ added in v0.0.14
LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.
func (*SyncMap[K, V]) LoadOrStore ¶ added in v0.0.14
LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.
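A generic wrapper of this kind is typically a thin layer of type assertions over sync.Map. The sketch below shows the idea with a hypothetical `typedMap` type. It is not the package's SyncMap, although the method names and result semantics follow the descriptions above.

```go
package main

import (
	"fmt"
	"sync"
)

// typedMap is a hypothetical sketch of a generic sync.Map wrapper.
type typedMap[K comparable, V any] struct {
	m sync.Map
}

func (t *typedMap[K, V]) Store(key K, value V) { t.m.Store(key, value) }

func (t *typedMap[K, V]) Delete(key K) { t.m.Delete(key) }

// Load returns the stored value, or the zero value of V when the key is absent.
func (t *typedMap[K, V]) Load(key K) (value V, ok bool) {
	v, ok := t.m.Load(key)
	if !ok {
		return value, false
	}
	value, _ = v.(V) // comma-ok form tolerates a stored nil interface value
	return value, true
}

// LoadOrStore returns the existing value if present, otherwise stores value.
func (t *typedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	v, loaded := t.m.LoadOrStore(key, value)
	actual, _ = v.(V)
	return actual, loaded
}

// LoadAndDelete removes the key and returns the previous value, if any.
func (t *typedMap[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
	v, loaded := t.m.LoadAndDelete(key)
	if !loaded {
		return value, false
	}
	value, _ = v.(V)
	return value, true
}

func main() {
	var m typedMap[string, int]
	m.Store("a", 1)

	v, ok := m.Load("a")
	fmt.Println(v, ok) // 1 true

	actual, loaded := m.LoadOrStore("a", 99)
	fmt.Println(actual, loaded) // 1 true

	prev, loaded := m.LoadAndDelete("a")
	fmt.Println(prev, loaded) // 1 true

	_, ok = m.Load("a")
	fmt.Println(ok) // false
}
```

The zero value of the wrapper is ready to use, just like sync.Map, so it can be declared as a plain variable or embedded in a struct without a constructor.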
type Writer ¶
type Writer[W any] struct {
	RunnerBase[string]
	Write WriterFunc[W]
	// contains filtered or unexported fields
}
Writer is a typed goroutine wrapper that calls its Write function for each value it receives. It provides a way to serialize concurrent writes through a single goroutine.
func NewWriter ¶
func NewWriter[W any](write WriterFunc[W], opts ...WriterOption[W]) *Writer[W]
NewWriter creates a new writer instance with functional options. The writer function is required as the first parameter, with optional configuration via functional options.
Examples:
// Simple usage (backwards compatible)
writer := NewWriter(myWriterFunc)

// With buffered input
writer := NewWriter(myWriterFunc, WithInputBuffer[int](100))
func (*Writer[W]) ClosedChan ¶ added in v0.0.2
ClosedChan returns the channel used to signal when the writer is done
func (*Writer[W]) InputChan ¶ added in v0.0.7
func (wc *Writer[W]) InputChan() chan<- W
InputChan returns the channel on which messages can be sent to the Writer. The returned channel is never nil after construction. Callers should prefer Send() for safe access that handles the writer being stopped.
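The difference between sending on the raw input channel and using a guarded Send can be shown with a small standalone sketch. The `serialWriter` type below is hypothetical and is not the package's Writer. The callback signature `func(W) error` and the bool returned by Send are assumptions made for the example, since the documentation does not show WriterFunc or Send.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// serialWriter is a hypothetical sketch of a single-goroutine writer.
type serialWriter[W any] struct {
	input    chan W
	stop     chan struct{}
	done     chan struct{}
	stopOnce sync.Once
}

func newSerialWriter[W any](write func(W) error) *serialWriter[W] {
	w := &serialWriter[W]{
		input: make(chan W),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	go func() {
		defer close(w.done)
		for {
			select {
			case v := <-w.input:
				if err := write(v); err != nil {
					return // a failed write ends the writer
				}
			case <-w.stop:
				return
			}
		}
	}()
	return w
}

// Send reports false instead of blocking forever once the writer has exited.
func (w *serialWriter[W]) Send(v W) bool {
	select {
	case w.input <- v:
		return true
	case <-w.done:
		return false
	}
}

// Stop signals the worker and waits for it; repeated calls are harmless.
func (w *serialWriter[W]) Stop() {
	w.stopOnce.Do(func() { close(w.stop) })
	<-w.done
}

func main() {
	var sb strings.Builder // only the worker goroutine writes to it
	w := newSerialWriter(func(s string) error {
		sb.WriteString(s)
		return nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.Send("x")
		}()
	}
	wg.Wait()
	w.Stop()

	fmt.Println(sb.String(), w.Send("late")) // prints "xxx false"
}
```

A bare send on the input channel after the worker has exited would block indefinitely, which is why a guarded Send is the safer entry point for callers that may outlive the writer.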
type WriterFunc ¶
WriterFunc is the type of the writer method used by the writer goroutine primitive to serialize its writes.
type WriterOption ¶ added in v0.0.8
WriterOption is a functional option for configuring a Writer
func WithInputBuffer ¶ added in v0.0.8
func WithInputBuffer[W any](size int) WriterOption[W]
WithInputBuffer sets the buffer size for the input channel