gocurrent

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2026 License: GPL-3.0 Imports: 8 Imported by: 4

README

gocurrent

Go Reference Go Report Card

A Go library providing utilities for common concurrency patterns with customizable behavior. This package implements several concurrency primitives inspired by Rob Pike's concurrency patterns from his talk "Go Concurrency Patterns".

Installation

go get github.com/panyam/gocurrent

Quick Start

import "github.com/panyam/gocurrent"

// Create a reader that generates numbers
reader := gocurrent.NewReader(func() (int, error) {
    return rand.Intn(100), nil
})
defer reader.Stop()

// Read from the channel
for msg := range reader.OutputChan() {
    if msg.Error != nil {
        log.Printf("Error: %v", msg.Error)
        continue
    }
    fmt.Printf("Received: %d\n", msg.Value)
}

Components

Reader

A goroutine wrapper that continuously calls a reader function and sends results to a channel.

// Create a reader that reads from a data source
reader := gocurrent.NewReader(func() (string, error) {
    // Your data reading logic here
    return "data", nil
})
defer reader.Stop()

// Monitor for reader completion or errors
go func() {
    select {
    case err := <-reader.ClosedChan():
        if err != nil {
            log.Printf("Reader terminated with error: %v", err)
        } else {
            log.Println("Reader completed successfully")
        }
    }
}()

// Process messages
for msg := range reader.OutputChan() {
    if msg.Error != nil {
        log.Printf("Read error: %v", msg.Error)
        break
    }
    fmt.Printf("Read: %s\n", msg.Value)
}
Writer

A goroutine for serializing writes using a writer callback method.

// Create a writer that processes data
writer := gocurrent.NewWriter(func(data string) error {
    // Your data writing logic here
    fmt.Printf("Writing: %s\n", data)
    return nil
})
defer writer.Stop()

// Monitor for writer completion or errors
go func() {
    select {
    case err := <-writer.ClosedChan():
        if err != nil {
            log.Printf("Writer terminated with error: %v", err)
        } else {
            log.Println("Writer completed successfully")
        }
    }
}()

// Send data to writer
writer.Send("Hello")
writer.Send("World")
Mapper

Transform and/or filter data between channels.

inputChan := make(chan int, 10)
outputChan := make(chan string, 10)

// Create a mapper that converts integers to strings
mapper := gocurrent.NewMapper(inputChan, outputChan, func(i int) (string, bool, bool) {
    // Return: (output, skip, stop)
    return fmt.Sprintf("Number: %d", i), false, false
})
defer mapper.Stop()

// Send data
inputChan <- 42
inputChan <- 100

// Read transformed data
result := <-outputChan // "Number: 42"
Reducer

Collect and reduce values from an input channel with configurable time windows. The Reducer has three type parameters:

  • T - the input event type
  • C - the intermediate collection type (where events are batched)
  • U - the output type after reduction

Reducers use functional options for configuration:

// Simple case: Use NewIDReducer to collect events into a slice
// With all defaults (creates its own channels, 100ms flush period)
reducer := gocurrent.NewIDReducer[int]()
defer reducer.Stop()

// With custom configuration
inputChan := make(chan int, 10)
outputChan := make(chan []int, 10)
reducer := gocurrent.NewIDReducer[int](
    gocurrent.WithInputChan[int, []int, []int](inputChan),
    gocurrent.WithOutputChan[int, []int, []int](outputChan),
    gocurrent.WithFlushPeriod[int, []int, []int](100 * time.Millisecond))
defer reducer.Stop()

// Send data
for i := 0; i < 5; i++ {
    reducer.Send(i)
}

// After FlushPeriod, receive the collected batch
batch := <-outputChan // []int{0, 1, 2, 3, 4}

For custom collection and reduction logic, use NewReducer directly:

// Custom reducer: collect strings into a map, reduce to a summary
type WordCount map[string]int

inputChan := make(chan string, 10)
outputChan := make(chan string, 10)
reducer := gocurrent.NewReducer[string, WordCount, string](
    gocurrent.WithInputChan[string, WordCount, string](inputChan),
    gocurrent.WithOutputChan[string, WordCount, string](outputChan),
    gocurrent.WithFlushPeriod[string, WordCount, string](100 * time.Millisecond))
reducer.CollectFunc = func(word string, counts WordCount) (WordCount, bool) {
    if counts == nil {
        counts = make(WordCount)
    }
    counts[word]++
    return counts, false // Return true to trigger immediate flush
}
reducer.ReduceFunc = func(counts WordCount) string {
    return fmt.Sprintf("Counted %d unique words", len(counts))
}
Custom Flush Triggers

The CollectFunc can signal when to flush by returning true as the second return value. This enables custom flush criteria beyond time-based flushing:

// Length-based flush: flush when collection reaches 100 items
reducer.CollectFunc = func(input int, collection []int) ([]int, bool) {
    newCollection := append(collection, input)
    shouldFlush := len(newCollection) >= 100
    return newCollection, shouldFlush
}

// Custom criteria: flush when sum exceeds threshold
reducer.CollectFunc = func(input int, sum int) (int, bool) {
    newSum := sum + input
    shouldFlush := newSum > 1000
    return newSum, shouldFlush
}

// Manual flush is also available
reducer.Flush()
Pipe

Connect a reader and writer channel with identity transform.

inputChan := make(chan string, 10)
outputChan := make(chan string, 10)

// Create a pipe (identity mapper)
pipe := gocurrent.NewPipe(inputChan, outputChan)
defer pipe.Stop()

inputChan <- "hello"
result := <-outputChan // "hello"
FanIn

Merge multiple input channels into a single output channel.

// Create input channels
chan1 := make(chan int, 10)
chan2 := make(chan int, 10)
chan3 := make(chan int, 10)

// Create fan-in
fanIn := gocurrent.NewFanIn[int](nil)
defer fanIn.Stop()

// Add input channels
fanIn.Add(chan1, chan2, chan3)

// Send data to different channels
chan1 <- 1
chan2 <- 2
chan3 <- 3

// Read merged output
for i := 0; i < 3; i++ {
    result := <-fanIn.OutputChan()
    fmt.Printf("Received: %d\n", result)
}
FanOut

Distribute messages from one channel to multiple output channels. Three dispatch strategies are available, each implementing the FanOuter[T] interface:

Type Sender blocks? FIFO ordering? Goroutines
QueuedFanOut (recommended) No (until queue full) Strict 2 total (bounded)
SyncFanOut Yes (all outputs) Strict 0 extra
AsyncFanOut No None N per event
// QueuedFanOut (recommended) — non-blocking sender, strict FIFO
fo := gocurrent.NewQueuedFanOut[string]()
defer fo.Stop()

out1 := fo.New(nil) // No filter
out2 := fo.New(func(s *string) *string {
    if len(*s) > 5 { return s }
    return nil
})

fo.Send("hello")       // Goes to out1 only (filtered from out2)
fo.Send("hello world") // Goes to both out1 and out2

// SyncFanOut — sender blocks until all outputs receive each event
syncFo := gocurrent.NewSyncFanOut[int]()

// AsyncFanOut — fire-and-forget, no ordering guarantee
asyncFo := gocurrent.NewAsyncFanOut[int]()
Migration from FanOut[T]

The old FanOut[T] type and NewFanOut() constructor have been removed. Replace with the explicit type that matches your use case:

Old code New code
NewFanOut[T]() (default async) NewQueuedFanOut[T]()
NewFanOut[T](WithFanOutSendSync[T](true)) NewSyncFanOut[T]()
*FanOut[T] type FanOuter[T] interface or concrete type
SyncMap

A type-safe generic wrapper around sync.Map, providing the same concurrent map semantics optimized for read-heavy workloads but with compile-time type safety.

// Usable without initialization (zero value is ready to use)
var m gocurrent.SyncMap[string, int]

// Basic operations — same API as sync.Map, just typed
m.Store("key1", 42)
value, ok := m.Load("key1")
if ok {
    fmt.Printf("Value: %d\n", value)
}

// Atomic load-and-delete
prev, loaded := m.LoadAndDelete("key1")

// Iterate all entries
m.Range(func(k string, v int) bool {
    fmt.Printf("%s = %d\n", k, v)
    return true // return false to stop
})

Features

  • Type Safety: All components are fully generic and type-safe
  • Resource Management: Proper cleanup and lifecycle management
  • Composability: Components can be easily combined
  • Customizable: Configurable behavior and filtering
  • Thread Safety: Built-in synchronization where needed
  • Error Handling: Comprehensive error propagation with completion signaling
  • Monitoring: Built-in channels for monitoring goroutine completion and errors

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the GPL License.

References

Documentation

Overview

Package gocurrent provides utilities for common Go concurrency patterns.

This package implements several concurrency primitives inspired by Rob Pike's concurrency patterns from his talk "Go Concurrency Patterns" (https://go.dev/talks/2012/concurrency.slide).

The main components include:

  • Reader: A goroutine wrapper that continuously calls a reader function and sends results to a channel, with error signaling via ClosedChan()
  • Writer: A goroutine for serializing writes using a writer callback, with error signaling via ClosedChan()
  • Mapper: Transform and/or filter data between channels
  • Reducer: Collect and reduce N values from an input channel with configurable time windows
  • Pipe: Connect a reader and writer channel with identity transform
  • FanIn: Merge multiple input channels into a single output channel
  • FanOut: Distribute messages from one channel to multiple output channels. Three dispatch strategies available via SyncFanOut, AsyncFanOut, and QueuedFanOut (recommended), each with different ordering/blocking trade-offs. See the FanOuter interface for the common API.
  • SyncMap: A type-safe generic wrapper around sync.Map

All concurrency primitives are designed to be composable and provide fine-grained control over goroutine lifecycles, resource management, and error monitoring through completion signaling channels.

Index

Examples

Constants

View Source
const DefaultQueueSize = 64

DefaultQueueSize is the default capacity of the dispatch queue used by QueuedFanOut. The queue acts as a buffer between the runner goroutine (which reads events from the input channel) and the dispatch goroutine (which delivers events to outputs). When the queue is full, the runner blocks — propagating back-pressure to the sender.

Variables

This section is empty.

Functions

func IDFunc

func IDFunc[T any](input T) T

IDFunc is an identity function that returns its input unchanged. It's commonly used as a default mapper function for pipes and other operations.

Types

type AsyncFanOut added in v0.1.0

type AsyncFanOut[T any] struct {
	// contains filtered or unexported fields
}

AsyncFanOut distributes events to all registered output channels by spawning a separate goroutine for each output on every event.

Ordering semantics — fire and forget per output:

	Time ──────────────────────────────────────────────────────────────►

	Sender:  Send(A) ──► Send(B) ──► ...
	             │           │
	             │           ├─ goroutine: B → out[0]  ← may arrive BEFORE A!
	             │           ├─ goroutine: B → out[1]
	             │           └─ goroutine: B → out[2]
	             │
	             ├─ goroutine: A → out[0]  ← may arrive AFTER B!
	             ├─ goroutine: A → out[1]
	             └─ goroutine: A → out[2]

	NO guarantee: A and B goroutines race. B can arrive before A on any output.

  - Sender never blocks (goroutines are spawned immediately).
  - NO ordering guarantee: goroutines for event B may execute before
    goroutines for event A. The Go scheduler does not guarantee FIFO
    ordering of goroutine execution.
  - Goroutine count: N (outputs) per event. Can explode under high
    throughput with many outputs.
  - A slow output does NOT affect delivery to other outputs.

Use AsyncFanOut only when event ordering does not matter and per-output independence is more important than resource usage. In most cases, QueuedFanOut is a better choice.

func NewAsyncFanOut added in v0.1.0

func NewAsyncFanOut[T any](opts ...FanOutOption[T]) *AsyncFanOut[T]

NewAsyncFanOut creates an AsyncFanOut that spawns a goroutine per output per event. The fan-out starts running immediately.

Options:

Example:

fo := NewAsyncFanOut[int]()
defer fo.Stop()
out := fo.New(nil)
fo.Send(42)
val := <-out // 42

func (*AsyncFanOut) Add added in v0.1.0

func (c *AsyncFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)

Add registers an output channel with an optional filter. If wait is true, the returned channel receives nil once registration is complete.

func (*AsyncFanOut) ClosedChan added in v0.1.0

func (c *AsyncFanOut) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the fan-out is done.

func (*AsyncFanOut) Count added in v0.1.0

func (c *AsyncFanOut) Count() int

Count returns the number of registered output channels.

func (*AsyncFanOut) DebugInfo added in v0.1.0

func (c *AsyncFanOut) DebugInfo() any

DebugInfo returns diagnostic information about the fan-out's state.

func (*AsyncFanOut) InputChan added in v0.1.0

func (c *AsyncFanOut) InputChan() chan<- T

InputChan returns the write-only input channel.

func (*AsyncFanOut) New added in v0.1.0

func (c *AsyncFanOut) New(filter FilterFunc[T]) chan T

New creates a new owned output channel with an optional filter. The fan-out will close this channel on Remove or Stop.

func (*AsyncFanOut) Remove added in v0.1.0

func (c *AsyncFanOut) Remove(output chan<- T, wait bool) (callbackChan chan error)

Remove unregisters an output channel. If the channel was created by New, it is also closed.

func (*AsyncFanOut) Send added in v0.1.0

func (c *AsyncFanOut) Send(value T)

Send writes a value to the input channel for fan-out distribution.

type Block added in v0.0.7

type Block struct {
	// contains filtered or unexported fields
}

Block represents a composite component made up of multiple connected primitives. A Block itself acts as a component and can be nested within other Blocks.

func NewBlock added in v0.0.7

func NewBlock(name string) *Block

NewBlock creates a new block with the given name

func (*Block) Add added in v0.0.7

func (b *Block) Add(component Component)

Add adds a component to this block

func (*Block) Count added in v0.0.7

func (b *Block) Count() int

Count returns the number of components in this block

func (*Block) IsRunning added in v0.0.7

func (b *Block) IsRunning() bool

IsRunning returns true if any component in the block is running

func (*Block) Name added in v0.0.7

func (b *Block) Name() string

Name returns the block's name

func (*Block) Stop added in v0.0.7

func (b *Block) Stop() error

Stop stops all components in this block in reverse order

type Broadcast added in v0.0.7

type Broadcast[T any] struct {
	*Block
	// contains filtered or unexported fields
}

Example: Broadcast pattern - one input, multiple outputs. Uses QueuedFanOut for strict FIFO ordering with non-blocking sends.

func NewBroadcast added in v0.0.7

func NewBroadcast[T any](name string) *Broadcast[T]

NewBroadcast creates a broadcast block using QueuedFanOut

func (*Broadcast[T]) AddOutput added in v0.0.7

func (b *Broadcast[T]) AddOutput(filter FilterFunc[T]) chan T

AddOutput adds a new output channel to the broadcast

func (*Broadcast[T]) InputChan added in v0.0.7

func (b *Broadcast[T]) InputChan() chan<- T

InputChan implements InputComponent

func (*Broadcast[T]) Send added in v0.0.7

func (b *Broadcast[T]) Send(value T)

Send implements InputComponent

type Component added in v0.0.7

type Component interface {
	// Stop stops the component and cleans up resources
	Stop() error

	// IsRunning returns true if the component is currently running
	IsRunning() bool
}

Component represents any building block that can be part of a Block. All gocurrent primitives can implement this interface.

type FanIn

type FanIn[T any] struct {
	RunnerBase[fanInCmd[T]]
	// OnChannelRemoved is called when a channel is removed so the caller can
	// perform other cleanups etc based on this
	OnChannelRemoved func(fi *FanIn[T], inchan <-chan T)
	// contains filtered or unexported fields
}

FanIn merges multiple input channels into a single output channel. It implements the fan-in concurrency pattern where messages from multiple sources are combined into one stream.

Example
// Create 5 input channels and send 5 numbers into them
// the collector channel
fanin := NewFanIn[int]()
defer fanin.Stop()

NUM_CHANS := 2
NUM_MSGS := 3

var inchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	inchan := make(chan int)
	inchans = append(inchans, inchan)
	fanin.Add(inchan)
}

for i := 0; i < NUM_CHANS; i++ {
	go func(inchan chan int) {
		// send some  numbers into this fanin
		for j := 0; j < NUM_MSGS; j++ {
			inchan <- j
		}
	}(inchans[i])
}

// collect the fanned values
var vals []int
for i := 0; i < NUM_CHANS*NUM_MSGS; i++ {
	val := <-fanin.OutputChan()
	vals = append(vals, val)
}

// sort and print them for testing
sort.Ints(vals)

for _, v := range vals {
	fmt.Println(v)
}
Output:
0
0
1
1
2
2

func NewFanIn

func NewFanIn[T any](opts ...FanInOption[T]) *FanIn[T]

NewFanIn creates a new FanIn that merges multiple input channels with functional options. By default, creates and owns an unbuffered output channel. Use options to customize. The FanIn starts running immediately upon creation.

Examples:

// Simple usage with owned channel (backwards compatible)
fanin := NewFanIn[int]()

// With existing channel (backwards compatible)
outChan := make(chan int, 10)
fanin := NewFanIn(WithFanInOutputChan(outChan))

// With buffered output
fanin := NewFanIn[int](WithFanInOutputBuffer[int](100))

func (*FanIn[T]) Add

func (fi *FanIn[T]) Add(inputs ...<-chan T)

Add adds one or more input channels to the FanIn. Messages from these channels will be merged into the output channel. Panics if any input channel is nil.

func (*FanIn[T]) ClosedChan added in v0.0.7

func (fi *FanIn[T]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the fan-in is done

func (*FanIn[T]) Count

func (fi *FanIn[T]) Count() int

Count returns the number of input channels currently being monitored.

func (*FanIn[T]) OutputChan added in v0.0.7

func (fi *FanIn[T]) OutputChan() <-chan T

OutputChan returns the channel on which merged output can be received.

func (*FanIn[T]) Remove

func (fi *FanIn[T]) Remove(target <-chan T)

Remove removes an input channel from the FanIn's monitor list. The channel will no longer contribute to the merged output.

type FanInOption added in v0.0.8

type FanInOption[T any] func(*FanIn[T])

FanInOption is a functional option for configuring a FanIn

func WithFanInOnChannelRemoved added in v0.0.8

func WithFanInOnChannelRemoved[T any](fn func(*FanIn[T], <-chan T)) FanInOption[T]

WithFanInOnChannelRemoved sets the callback for when a channel is removed

func WithFanInOutputBuffer added in v0.0.8

func WithFanInOutputBuffer[T any](size int) FanInOption[T]

WithFanInOutputBuffer creates a buffered output channel for the FanIn

func WithFanInOutputChan added in v0.0.8

func WithFanInOutputChan[T any](ch chan T) FanInOption[T]

WithFanInOutputChan sets the output channel for the FanIn

type FanOutOption added in v0.0.8

type FanOutOption[T any] func(*fanOutCore[T])

FanOutOption is a functional option for configuring any fan-out type.

func WithFanOutInputBuffer added in v0.0.8

func WithFanOutInputBuffer[T any](size int) FanOutOption[T]

WithFanOutInputBuffer creates a buffered input channel of the given size. The fan-out owns and will close this channel on Stop.

func WithFanOutInputChan added in v0.0.8

func WithFanOutInputChan[T any](ch chan T) FanOutOption[T]

WithFanOutInputChan sets the input channel. The fan-out will NOT close this channel on Stop (caller retains ownership).

type FanOuter added in v0.1.0

type FanOuter[T any] interface {
	Component

	// Send delivers a value to the fan-out for distribution to all outputs.
	// Blocking behavior depends on the concrete implementation.
	Send(value T)

	// InputChan returns the write-only input channel. Callers may send
	// directly on this channel instead of calling Send.
	InputChan() chan<- T

	// Add registers an existing output channel with an optional per-channel
	// filter. If wait is true, the returned channel receives nil once the
	// registration is complete; otherwise the returned channel is nil.
	Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)

	// New creates a new output channel owned by the fan-out (closed on Stop)
	// with an optional filter. The call blocks until registration is complete.
	New(filter FilterFunc[T]) chan T

	// Remove unregisters an output channel. If the channel was created by New,
	// it is also closed. If wait is true, the returned channel receives nil
	// once the removal is complete.
	Remove(output chan<- T, wait bool) (callbackChan chan error)

	// Count returns the current number of registered output channels.
	Count() int

	// ClosedChan returns a channel that receives nil (or an error) when the
	// fan-out has fully shut down.
	ClosedChan() <-chan error
}

FanOuter is the interface satisfied by all fan-out dispatch strategies.

Three concrete implementations are provided, each with different trade-offs between sender blocking, event ordering, and goroutine usage:

  • SyncFanOut: Sender blocks until all outputs receive the event. Strict FIFO ordering. Zero extra goroutines.
  • AsyncFanOut: Sender never blocks (one goroutine per output per event). No ordering guarantee. Goroutine count can explode.
  • QueuedFanOut: Sender blocks only when the dispatch queue is full. Strict FIFO ordering via a persistent dispatch goroutine. Two goroutines total (runner + dispatcher). Recommended default.

Ordering semantics summary:

| Type           | Sender blocks?        | FIFO ordering? | Goroutines        |
|----------------|-----------------------|----------------|-------------------|
| SyncFanOut     | Yes (all outputs)     | Strict         | 0 extra           |
| AsyncFanOut    | No                    | None           | N per event       |
| QueuedFanOut   | No (until queue full) | Strict         | 2 total (bounded) |

type FilterFunc

type FilterFunc[T any] func(*T) *T

FilterFunc is an optional per-output transformation/filtering function. It receives a pointer to the event and returns a pointer to the (possibly modified) event. Return nil to skip delivery to this output.

type IdleTimer added in v0.0.15

type IdleTimer struct {
	// contains filtered or unexported fields
}

IdleTimer implements a ref-counted idle timeout. The timer starts counting when all activity ceases (active count drops to zero) and pauses while any activity is in progress.

This is the "activity-gated idle timeout" pattern used for session/connection idle cleanup — any service managing sessions, connections, or pooled resources with automatic expiry needs this exact primitive.

Usage:

timer := NewIdleTimer(30*time.Second, func() { removeSession(id) })
timer.Acquire()       // request starts — pause idle timer
defer timer.Release() // request ends — restart timer if no more activity
timer.Stop()          // explicit shutdown — cancel timer permanently

All methods are nil-safe: calling Acquire/Release/Stop on a nil *IdleTimer is a no-op. This enables callers to use *IdleTimer as a struct field without nil checks at every call site.

func NewIdleTimer added in v0.0.15

func NewIdleTimer(timeout time.Duration, onExpire func()) *IdleTimer

NewIdleTimer creates an IdleTimer that calls onExpire when the timeout elapses with no activity. The timer starts immediately — call Acquire() to pause it.

If timeout is 0, returns a no-op timer: Acquire/Release/Stop do nothing and onExpire is never called. This is the default for "no timeout configured."

func (*IdleTimer) Acquire added in v0.0.15

func (t *IdleTimer) Acquire()

Acquire marks the start of an activity (e.g., an in-flight request). Stops the idle timer so it cannot fire while the activity is in progress. Must be paired with a corresponding Release().

Safe to call on a nil receiver (no-op).

func (*IdleTimer) Release added in v0.0.15

func (t *IdleTimer) Release()

Release marks the end of an activity. If the active count drops to zero and the timer has not been stopped, the idle timer is restarted with the full timeout duration.

Safe to call on a nil receiver (no-op).

func (*IdleTimer) Stop added in v0.0.15

func (t *IdleTimer) Stop()

Stop permanently cancels the idle timer. After Stop(), onExpire will never be called. Safe to call multiple times. Safe to call on a nil receiver.

type InputComponent added in v0.0.7

type InputComponent[T any] interface {
	Component

	// InputChan returns the channel for sending input to this component
	InputChan() chan<- T

	// Send is a convenience method for sending to the input channel
	Send(T)
}

InputComponent represents a component with an input channel

type Mapper

type Mapper[I any, O any] struct {
	RunnerBase[string]

	// MapFunc is applied to each value in the input channel
	// and returns a tuple of 3 things - outval, skip, stop
	// if skip is false, outval is sent to the output channel
	// if stop is true, then the entire mapper stops processing any further elements.
	// This mechanism can be used inaddition to the Stop method if sequencing this
	// within the elements of input channel is required
	MapFunc func(I) (O, bool, bool)
	OnDone  func(p *Mapper[I, O])
	// contains filtered or unexported fields
}

Mapper connects an input and output channel applying transforms between them. It reads from the input channel, applies a transformation function, and writes the result to the output channel.

func Connect added in v0.0.7

func Connect[T any](from OutputComponent[T], to InputComponent[T]) *Mapper[T, T]

Connect connects the output of one component to the input of another using a Pipe. Returns the pipe so it can be managed if needed.

func ConnectWith added in v0.0.7

func ConnectWith[I, O any](from OutputComponent[I], to InputComponent[O],
	mapper func(I) (O, bool, bool)) *Mapper[I, O]

ConnectWith connects two components using a custom mapper function

func NewMapper

func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool), opts ...MapperOption[T, U]) *Mapper[T, U]

NewMapper creates a new mapper between an input and output channel with functional options. The ownership of the channels is by the caller and not the Mapper, so they will not be closed when the mapper stops. The mapper function returns (output, skip, stop) where: - output: the transformed value - skip: if true, the output is not sent to the output channel - stop: if true, the mapper stops processing further elements

Examples:

// Simple usage (backwards compatible)
mapper := NewMapper(inChan, outChan, myMapperFunc)

// With OnDone callback
mapper := NewMapper(inChan, outChan, myMapperFunc,
    WithMapperOnDone(func(m *Mapper[int, string]) {
        log.Println("mapper done")
    }))

func NewPipe

func NewPipe[T any](input <-chan T, output chan<- T) *Mapper[T, T]

NewPipe creates a new pipe that connects an input and output channel. A pipe is a mapper with the identity function, so it simply forwards all values from input to output without transformation.

func (*Mapper[I, O]) ClosedChan added in v0.0.7

func (m *Mapper[I, O]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the mapper is done

func (*Mapper[I, O]) InputChan added in v0.0.7

func (m *Mapper[I, O]) InputChan() chan<- I

InputChan returns the input channel (exposes the private field for Component interface)

func (*Mapper[I, O]) OutputChan added in v0.0.7

func (m *Mapper[I, O]) OutputChan() <-chan O

OutputChan returns the output channel (exposes the private field for Component interface)

type MapperOption added in v0.0.8

type MapperOption[I, O any] func(*Mapper[I, O])

MapperOption is a functional option for configuring a Mapper

func WithMapperOnDone added in v0.0.8

func WithMapperOnDone[I, O any](fn func(*Mapper[I, O])) MapperOption[I, O]

WithMapperOnDone sets the callback to be called when the mapper finishes

type Merge added in v0.0.7

type Merge[T any] struct {
	*Block
	// contains filtered or unexported fields
}

Example: Merge pattern - multiple inputs, one output

func NewMerge added in v0.0.7

func NewMerge[T any](name string) *Merge[T]

NewMerge creates a merge block using FanIn

func (*Merge[T]) AddInput added in v0.0.7

func (m *Merge[T]) AddInput(input <-chan T)

AddInput adds a new input channel to the merge

func (*Merge[T]) OutputChan added in v0.0.7

func (m *Merge[T]) OutputChan() <-chan T

OutputChan implements OutputComponent

type Message

type Message[T any] struct {
	Value  T     // The actual value being transmitted
	Error  error // Any error that occurred during processing
	Source any   // Optional source information for debugging
}

Message represents a value with optional error and source information. It's used by channels to carry both successful values and error conditions.

type OutputComponent added in v0.0.7

type OutputComponent[T any] interface {
	Component

	// OutputChan returns the channel for receiving output from this component
	OutputChan() <-chan T
}

OutputComponent represents a component with an output channel

type Pipeline added in v0.0.7

type Pipeline[T any] struct {
	*Block
	// contains filtered or unexported fields
}

Pipeline creates a linear sequence of components connected by pipes

func NewPipeline added in v0.0.7

func NewPipeline[T any](name string) *Pipeline[T]

NewPipeline creates a new pipeline block

func (*Pipeline[T]) InputChan added in v0.0.7

func (p *Pipeline[T]) InputChan() chan<- T

InputChan implements InputComponent

func (*Pipeline[T]) OutputChan added in v0.0.7

func (p *Pipeline[T]) OutputChan() <-chan T

OutputChan implements OutputComponent

func (*Pipeline[T]) Send added in v0.0.7

func (p *Pipeline[T]) Send(value T)

Send implements InputComponent

type QueuedFanOut added in v0.1.0

type QueuedFanOut[T any] struct {
	// contains filtered or unexported fields
}

QueuedFanOut distributes events to all registered output channels via a persistent dispatch goroutine reading from a buffered queue.

This is the recommended fan-out strategy for most use cases.

Ordering semantics — ordered pipeline:

	Time ──────────────────────────────────────────────────────────────►

	Sender:  Send(A) ──► Send(B) ──► Send(C) ──►  ...   (never blocks unless queue full)
	             │           │           │
	             └──────► dispatchChan (buffered queue, default 64) ◄──┘
	                           │
	                      Dispatch goroutine (single, persistent):
	                           ├─ read A: deliver to out[0], out[1], out[2]
	                           ├─ read B: deliver to out[0], out[1], out[2]  ← only after A is done
	                           └─ read C: deliver to out[0], out[1], out[2]

	Guarantee: A is fully delivered to ALL outputs before B begins.
	           Sender is decoupled — it only blocks when the queue is full.

  - Sender blocks only when the dispatch queue is full (configurable,
    default 64). This propagates back-pressure without silently dropping events.
  - Strict FIFO: the single dispatch goroutine processes events sequentially.
    Event A is delivered to ALL outputs before event B begins delivery.
  - Two goroutines total (runner + dispatcher), regardless of event volume.
  - A slow output blocks delivery of the current event to remaining outputs
    AND delays delivery of subsequent events in the queue.

The subscriber list is captured as an immutable [outputSnapshot] on each Add/Remove. The dispatch goroutine always works from the snapshot bundled with each event, so there is no race between subscriber management and event delivery, and zero per-event allocations for the subscriber list.

When an output is removed, it is added to a concurrent "removed set" (sync.Map). The dispatch goroutine checks this set before each send and skips removed outputs. Self-owned channels are closed during Stop after the dispatch goroutine has fully exited.

Example

ExampleQueuedFanOut demonstrates basic QueuedFanOut usage: create a fan-out, add output channels, send events, and receive them on all outputs in strict FIFO order.

fanout := NewQueuedFanOut[int]()
defer fanout.Stop()

NUM_CHANS := 2
NUM_MSGS := 3

var outchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	outchan := fanout.New(nil)
	outchans = append(outchans, outchan)
}

var vals []int

for i := 0; i < NUM_MSGS; i++ {
	fanout.Send(i)
}

for j := 0; j < NUM_MSGS; j++ {
	for i := 0; i < NUM_CHANS; i++ {
		val := <-outchans[i]
		vals = append(vals, val)
	}
}

sort.Ints(vals)
for _, v := range vals {
	fmt.Println(v)
}
Output:
0
0
1
1
2
2

func NewQueuedFanOut added in v0.1.0

func NewQueuedFanOut[T any](opts ...any) *QueuedFanOut[T]

NewQueuedFanOut creates a QueuedFanOut that delivers events via a persistent dispatch goroutine with strict FIFO ordering. The fan-out starts running immediately.

Accepts both common FanOutOption and QueuedFanOutOption options:

Example:

fo := NewQueuedFanOut[int]()
defer fo.Stop()
out := fo.New(nil)
fo.Send(42)
val := <-out // 42

func (*QueuedFanOut) Add added in v0.1.0

func (c *QueuedFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)

Add registers an output channel with an optional filter. If wait is true, the returned channel receives nil once registration is complete.

func (*QueuedFanOut) ClosedChan added in v0.1.0

func (c *QueuedFanOut) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the fan-out is done.

func (*QueuedFanOut) Count added in v0.1.0

func (c *QueuedFanOut) Count() int

Count returns the number of registered output channels.

func (*QueuedFanOut[T]) DebugInfo added in v0.1.0

func (fo *QueuedFanOut[T]) DebugInfo() any

DebugInfo returns diagnostic information including queue depth, which is useful for debugging back-pressure issues and understanding dispatch state.

func (*QueuedFanOut) InputChan added in v0.1.0

func (c *QueuedFanOut) InputChan() chan<- T

InputChan returns the write-only input channel.

func (*QueuedFanOut) New added in v0.1.0

func (c *QueuedFanOut) New(filter FilterFunc[T]) chan T

New creates a new owned output channel with an optional filter. The fan-out will close this channel on Remove or Stop.

func (*QueuedFanOut) Remove added in v0.1.0

func (c *QueuedFanOut) Remove(output chan<- T, wait bool) (callbackChan chan error)

Remove unregisters an output channel. If the channel was created by New, it is also closed.

func (*QueuedFanOut) Send added in v0.1.0

func (c *QueuedFanOut) Send(value T)

Send writes a value to the input channel for fan-out distribution.

type QueuedFanOutOption added in v0.1.0

type QueuedFanOutOption[T any] func(*QueuedFanOut[T])

QueuedFanOutOption is a functional option specific to QueuedFanOut.

func WithQueueSize added in v0.1.0

func WithQueueSize[T any](size int) QueuedFanOutOption[T]

WithQueueSize sets the capacity of the dispatch queue (default 64). A larger queue allows more events to be buffered before the sender blocks, at the cost of higher memory usage.

type Reader

type Reader[R any] struct {
	RunnerBase[string]

	Read ReaderFunc[R]

	OnDone func(r *Reader[R])
	// contains filtered or unexported fields
}

Reader is a typed Reader goroutine which calls a Read method to return data over a channel. It continuously calls the reader function and sends results to a channel wrapped in Message structs.

func NewReader

func NewReader[R any](read ReaderFunc[R], opts ...ReaderOption[R]) *Reader[R]

NewReader creates a new reader instance with functional options. The reader function is required as the first parameter, with optional configuration via functional options.

Examples:

// Simple usage (backwards compatible)
reader := NewReader(myReaderFunc)

// With options
reader := NewReader(myReaderFunc, WithOutputBuffer[int](10))

// With multiple options
reader := NewReader(myReaderFunc,
    WithOutputBuffer[int](100),
    WithOnDone(func(r *Reader[int]) { log.Println("done") }))

func (*Reader[R]) ClosedChan added in v0.0.2

func (rc *Reader[R]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the reader is done.

func (*Reader[R]) DebugInfo

func (r *Reader[R]) DebugInfo() any

func (*Reader[R]) OutputChan added in v0.0.7

func (rc *Reader[R]) OutputChan() <-chan Message[R]

OutputChan returns the channel on which messages can be received.

type ReaderFunc

type ReaderFunc[R any] func() (msg R, err error)

ReaderFunc is the type of the reader method used by the Reader goroutine primitive.

type ReaderOption added in v0.0.8

type ReaderOption[R any] func(*Reader[R])

ReaderOption is a functional option for configuring a Reader

func WithOnDone added in v0.0.8

func WithOnDone[R any](fn func(*Reader[R])) ReaderOption[R]

WithOnDone sets the callback to be called when the reader finishes

func WithOutputBuffer added in v0.0.8

func WithOutputBuffer[R any](size int) ReaderOption[R]

WithOutputBuffer sets the buffer size for the output channel

type Reducer

type Reducer[T any, C any, U any] struct {
	FlushPeriod time.Duration
	// CollectFunc adds an input to the collection and returns the updated collection.
	// The bool return value indicates whether a flush should be triggered immediately.
	CollectFunc func(collection C, inputs ...T) (C, bool)
	ReduceFunc  func(collectedItems C) (reducedOutputs U)
	// contains filtered or unexported fields
}

Reducer is a way to collect messages of type T in some kind of window and reduce them to type U. For example this could be used to batch messages into a list every 10 seconds. Alternatively if a time based window is not used a reduction can be invoked manually.

Example
inputChan := make(chan int)
outputChan := make(chan []int)

// Create a reducer that collects integers into slices
reducer := NewIDReducer(
	WithInputChan[int, []int, []int](inputChan),
	WithOutputChan[int, []int](outputChan),
	WithFlushPeriod[int, []int, []int](50*time.Millisecond))
defer reducer.Stop()

// Send data
go func() {
	for i := range 3 {
		inputChan <- i
	}
}()

// After FlushPeriod, receive the collected batch
batch := <-outputChan
fmt.Println(batch)
Output:
[0 1 2]

func NewReducer

func NewReducer[T any, C any, U any](opts ...ReducerOption[T, C, U]) *Reducer[T, C, U]

NewReducer creates a reducer over generic input and output types. Options can be provided to configure the input channel, output channel, flush period, etc. If channels are not provided via options, the reducer will create and own them. Just like other runners, the Reducer starts as soon as it is created.

func (*Reducer[T, C, U]) ClosedChan added in v0.0.7

func (r *Reducer[T, C, U]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the reducer is done

func (*Reducer[T, C, U]) Flush

func (fo *Reducer[T, C, U]) Flush()

Flush triggers an immediate flush of pending events by sending a command to the reducer goroutine. This is safe to call from any goroutine.

func (*Reducer[T, C, U]) InputChan added in v0.0.7

func (fo *Reducer[T, C, U]) InputChan() chan<- T

InputChan returns the channel onto which messages can be sent (to be reduced).

func (*Reducer[T, C, U]) IsRunning added in v0.0.7

func (r *Reducer[T, C, U]) IsRunning() bool

IsRunning returns true if the reducer is still running

func (*Reducer[T, C, U]) OutputChan added in v0.0.7

func (fo *Reducer[T, C, U]) OutputChan() <-chan U

OutputChan returns the channel from which we can read "reduced" values from

func (*Reducer[T, C, U]) Send

func (fo *Reducer[T, C, U]) Send(value T)

Send sends a message/value onto this reducer for (eventual) reduction.

func (*Reducer[T, C, U]) Stop

func (fo *Reducer[T, C, U]) Stop()

Stop stops the reducer and closes all channels it owns.

type Reducer2 added in v0.0.8

type Reducer2[T any, C any] = Reducer[T, C, C]

Reducer2 is a simplified 2-parameter version of Reducer where the collection type C is the same as the output type (U == C). This is the most common use case.

func NewIDReducer

func NewIDReducer[T any](opts ...ReducerOption2[T, []T]) *Reducer2[T, []T]

NewIDReducer2 creates a Reducer2 that simply collects events of type T into a list (of type []T).

func NewListReducer added in v0.0.5

func NewListReducer[T any](opts ...ReducerOption2[[]T, []T]) *Reducer2[[]T, []T]

A reducer that collects a list of items and concats them to a collection This allows producers to send events here in batch mode instead of 1 at a time

func NewReducer2 added in v0.0.8

func NewReducer2[T any, C any](opts ...ReducerOption2[T, C]) *Reducer2[T, C]

NewReducer2 creates a 2-parameter reducer where collection type equals output type. This is a simpler API for the common case where no type transformation is needed.

type ReducerOption added in v0.0.5

type ReducerOption[T any, C any, U any] func(*Reducer[T, C, U])

ReducerOption is a functional option for configuring a Reducer

func WithCollectFunc added in v0.0.12

func WithCollectFunc[T any, C any, U any](fn func(C, ...T) (C, bool)) ReducerOption[T, C, U]

WithCollectFunc sets the collect function for the reducer

func WithFlushPeriod added in v0.0.5

func WithFlushPeriod[T any, C any, U any](period time.Duration) ReducerOption[T, C, U]

WithFlushPeriod sets the flush period for the reducer

func WithInputChan added in v0.0.5

func WithInputChan[T any, C any, U any](ch chan T) ReducerOption[T, C, U]

WithInputChan sets the input channel for the reducer

func WithOutputChan added in v0.0.5

func WithOutputChan[T any, C any, U any](ch chan U) ReducerOption[T, C, U]

WithOutputChan sets the output channel for the reducer

func WithReduceFunc added in v0.0.12

func WithReduceFunc[T any, C any, U any](fn func(C) U) ReducerOption[T, C, U]

WithReduceFunc sets the reduce function for the reducer

type ReducerOption2 added in v0.0.8

type ReducerOption2[T any, C any] = ReducerOption[T, C, C]

ReducerOption2 is a functional option for configuring a Reducer2

func WithFlushPeriod2 added in v0.0.8

func WithFlushPeriod2[T any, C any](period time.Duration) ReducerOption2[T, C]

WithFlushPeriod2 sets the flush period for a Reducer2

func WithInputChan2 added in v0.0.8

func WithInputChan2[T any, C any](ch chan T) ReducerOption2[T, C]

WithInputChan2 sets the input channel for a Reducer2

func WithOutputChan2 added in v0.0.8

func WithOutputChan2[T any, C any](ch chan C) ReducerOption2[T, C]

WithOutputChan2 sets the output channel for a Reducer2

type RunnerBase

type RunnerBase[C any] struct {
	// contains filtered or unexported fields
}

RunnerBase is the base of the Reader, Writer, Mapper, FanIn, and FanOut primitives. It provides lifecycle management (start/stop) and coordination between the owner goroutine and the worker goroutine.

Key design: controlChan is created once and never closed or nilled. The done channel is closed by cleanup() to signal that the worker goroutine has exited. This eliminates the data race between Stop() sending on controlChan and cleanup() closing it that existed in the previous mutex+close design.

func NewRunnerBase

func NewRunnerBase[C any](stopVal C) RunnerBase[C]

NewRunnerBase creates a new base runner. Called by Reader, Writer, Mapper, FanIn, and FanOut constructors. The controlChan is buffered(1) to allow a single stop signal to be sent without blocking.

func (*RunnerBase[R]) DebugInfo

func (r *RunnerBase[R]) DebugInfo() any

DebugInfo returns diagnostic information about the runner's state.

func (*RunnerBase[C]) Done added in v0.0.12

func (r *RunnerBase[C]) Done() <-chan struct{}

Done returns a channel that is closed when the runner's worker goroutine exits. Useful for coordinating with other goroutines that need to know when the runner has stopped (e.g., FanIn's pipeClosed callback uses this to avoid sending on controlChan after the FanIn goroutine has exited).

func (*RunnerBase[C]) IsRunning

func (r *RunnerBase[C]) IsRunning() bool

IsRunning returns true if the runner's worker goroutine is active.

func (*RunnerBase[C]) Stop

func (r *RunnerBase[C]) Stop() error

Stop sends a stop signal to the worker goroutine and waits for it to finish. It is safe to call Stop() concurrently, multiple times, or after the worker goroutine has already self-terminated. Only the first call that transitions isRunning from true to false will send the stop signal; subsequent calls return immediately.

type SyncFanOut added in v0.1.0

type SyncFanOut[T any] struct {
	// contains filtered or unexported fields
}

SyncFanOut distributes events to all registered output channels synchronously within the runner goroutine.

Ordering semantics — sender waits for everyone:

	Time ──────────────────────────────────────────────────────────────►

	Sender:  Send(A) ─────────────────────────────────► Send(B) ───────────────────────► ...
	                  │                                          │
	                  ├─ deliver A to out[0] (blocks if full)    ├─ deliver B to out[0]
	                  ├─ deliver A to out[1] (blocks if full)    ├─ deliver B to out[1]
	                  └─ deliver A to out[2] (blocks if full)    └─ deliver B to out[2]

	Guarantee: A is fully delivered to ALL outputs before B begins.

  - Sender blocks until ALL outputs have received the event.
  - Strict FIFO: event A is delivered to every output before event B.
  - Zero extra goroutines — everything runs in the single runner goroutine.
  - A slow output blocks the sender AND all other outputs.

Use SyncFanOut when the number of outputs is small, outputs are fast, and back-pressure to the sender is desirable.

func NewSyncFanOut added in v0.1.0

func NewSyncFanOut[T any](opts ...FanOutOption[T]) *SyncFanOut[T]

NewSyncFanOut creates a SyncFanOut that delivers events to all outputs synchronously. The fan-out starts running immediately.

Options:

Example:

fo := NewSyncFanOut[int]()
defer fo.Stop()
out := fo.New(nil)
fo.Send(42)
val := <-out // 42

func (*SyncFanOut) Add added in v0.1.0

func (c *SyncFanOut) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)

Add registers an output channel with an optional filter. If wait is true, the returned channel receives nil once registration is complete.

func (*SyncFanOut) ClosedChan added in v0.1.0

func (c *SyncFanOut) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the fan-out is done.

func (*SyncFanOut) Count added in v0.1.0

func (c *SyncFanOut) Count() int

Count returns the number of registered output channels.

func (*SyncFanOut) DebugInfo added in v0.1.0

func (c *SyncFanOut) DebugInfo() any

DebugInfo returns diagnostic information about the fan-out's state.

func (*SyncFanOut) InputChan added in v0.1.0

func (c *SyncFanOut) InputChan() chan<- T

InputChan returns the write-only input channel.

func (*SyncFanOut) New added in v0.1.0

func (c *SyncFanOut) New(filter FilterFunc[T]) chan T

New creates a new owned output channel with an optional filter. The fan-out will close this channel on Remove or Stop.

func (*SyncFanOut) Remove added in v0.1.0

func (c *SyncFanOut) Remove(output chan<- T, wait bool) (callbackChan chan error)

Remove unregisters an output channel. If the channel was created by New, it is also closed.

func (*SyncFanOut) Send added in v0.1.0

func (c *SyncFanOut) Send(value T)

Send writes a value to the input channel for fan-out distribution.

type SyncMap added in v0.0.14

type SyncMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

SyncMap is a type-safe generic wrapper around sync.Map.

It provides the same concurrent map semantics as sync.Map — optimized for read-heavy workloads with stable keys — but with compile-time type safety instead of interface{} casts.

For documentation on the underlying concurrency guarantees, see: https://pkg.go.dev/sync#Map

Usage:

var m gocurrent.SyncMap[string, *Session]
m.Store("abc", session)
if s, ok := m.Load("abc"); ok {
    // s is *Session, no type assertion needed
}

func (*SyncMap[K, V]) Delete added in v0.0.14

func (m *SyncMap[K, V]) Delete(key K)

Delete deletes the value for a key.

func (*SyncMap[K, V]) Load added in v0.0.14

func (m *SyncMap[K, V]) Load(key K) (value V, ok bool)

Load returns the value stored in the map for a key, or the zero value if no value is present. The ok result indicates whether value was found.

func (*SyncMap[K, V]) LoadAndDelete added in v0.0.14

func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*SyncMap[K, V]) LoadOrStore added in v0.0.14

func (m *SyncMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*SyncMap[K, V]) Range added in v0.0.14

func (m *SyncMap[K, V]) Range(f func(key K, value V) bool)

Range calls f sequentially for each key and value present in the map. If f returns false, Range stops the iteration.

See sync.Map.Range for details on concurrent modification semantics.

func (*SyncMap[K, V]) Store added in v0.0.14

func (m *SyncMap[K, V]) Store(key K, value V)

Store sets the value for a key.

type Writer

type Writer[W any] struct {
	RunnerBase[string]

	Write WriterFunc[W]
	// contains filtered or unexported fields
}

Writer is a typed Writer goroutine type which calls the Write method when it serializes its writes. It provides a way to serialize concurrent writes through a single goroutine.

func NewWriter

func NewWriter[W any](write WriterFunc[W], opts ...WriterOption[W]) *Writer[W]

NewWriter creates a new writer instance with functional options. The writer function is required as the first parameter, with optional configuration via functional options.

Examples:

// Simple usage (backwards compatible)
writer := NewWriter(myWriterFunc)

// With buffered input
writer := NewWriter(myWriterFunc, WithInputBuffer[int](100))

func (*Writer[W]) ClosedChan added in v0.0.2

func (wc *Writer[W]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the writer is done

func (*Writer[W]) DebugInfo

func (w *Writer[W]) DebugInfo() any

func (*Writer[W]) InputChan added in v0.0.7

func (wc *Writer[W]) InputChan() chan<- W

InputChan returns the channel on which messages can be sent to the Writer. The returned channel is never nil after construction. Callers should prefer Send() for safe access that handles the writer being stopped.

func (*Writer[W]) Send

func (wc *Writer[W]) Send(req W) bool

Send sends a message to the Writer. Returns true if the message was accepted, false if the writer is stopped. Uses a select on Done() to safely unblock if the writer stops while the send is pending.

type WriterFunc

type WriterFunc[W any] func(W) error

WriterFunc is the type of the writer method used by the writer goroutine primitive to serialize its writes.

type WriterOption added in v0.0.8

type WriterOption[W any] func(*Writer[W])

WriterOption is a functional option for configuring a Writer

func WithInputBuffer added in v0.0.8

func WithInputBuffer[W any](size int) WriterOption[W]

WithInputBuffer sets the buffer size for the input channel

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL