Documentation
Overview ¶
Package gocurrent provides utilities for common Go concurrency patterns.
This package implements several concurrency primitives inspired by Rob Pike's concurrency patterns from his talk "Go Concurrency Patterns" (https://go.dev/talks/2012/concurrency.slide).
The main components include:
- Reader: A goroutine wrapper that continuously calls a reader function and sends results to a channel, with error signaling via ClosedChan()
- Writer: A goroutine for serializing writes using a writer callback, with error signaling via ClosedChan()
- Mapper: Transform and/or filter data between channels
- Reducer: Collect and reduce N values from an input channel with configurable time windows
- Pipe: Connect a reader and writer channel with identity transform
- FanIn: Merge multiple input channels into a single output channel
- FanOut: Distribute messages from one channel to multiple output channels
- Map: A thread-safe map with read/write locking capabilities
All concurrency primitives are designed to be composable and provide fine-grained control over goroutine lifecycles, resource management, and error monitoring through completion signaling channels.
Index ¶
- func IDFunc[T any](input T) T
- type Block
- type Broadcast
- type Component
- type FanIn
- type FanInOption
- type FanOut
- func (fo *FanOut[T]) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
- func (fo *FanOut[T]) ClosedChan() <-chan error
- func (fo *FanOut[T]) Count() int
- func (fo *FanOut[T]) DebugInfo() any
- func (fo *FanOut[T]) InputChan() chan<- T
- func (fo *FanOut[T]) New(filter FilterFunc[T]) chan T
- func (fo *FanOut[T]) Remove(output chan<- T, wait bool) (callbackChan chan error)
- func (fo *FanOut[T]) Send(value T)
- type FanOutOption
- type FilterFunc
- type InputComponent
- type Map
- func (m *Map[K, V]) Delete(k K)
- func (m *Map[K, V]) Get(k K) (V, bool)
- func (m *Map[K, V]) Has(k K) bool
- func (m *Map[K, V]) LDelete(k K, lock bool)
- func (m *Map[K, V]) LGet(k K, lock bool) (V, bool)
- func (m *Map[K, V]) LHas(k K, lock bool) bool
- func (m *Map[K, V]) LRange(lock bool, meth func(K, V) bool)
- func (m *Map[K, V]) LSet(k K, v V, lock bool)
- func (m *Map[K, V]) Lock()
- func (m *Map[K, V]) RLock()
- func (m *Map[K, V]) RUnlock()
- func (m *Map[K, V]) Range(meth func(K, V) bool)
- func (m *Map[K, V]) Set(k K, v V)
- func (m *Map[K, V]) Unlock()
- func (m *Map[K, V]) Update(actions func(items map[K]V))
- func (m *Map[K, V]) View(actions func())
- type Mapper
- func Connect[T any](from OutputComponent[T], to InputComponent[T]) *Mapper[T, T]
- func ConnectWith[I, O any](from OutputComponent[I], to InputComponent[O], mapper func(I) (O, bool, bool)) *Mapper[I, O]
- func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool), ...) *Mapper[T, U]
- func NewPipe[T any](input <-chan T, output chan<- T) *Mapper[T, T]
- type MapperOption
- type Merge
- type Message
- type OutputComponent
- type Pipeline
- type Reader
- type ReaderFunc
- type ReaderOption
- type Reducer
- func (r *Reducer[T, C, U]) ClosedChan() <-chan error
- func (fo *Reducer[T, C, U]) Flush()
- func (fo *Reducer[T, C, U]) InputChan() chan<- T
- func (r *Reducer[T, C, U]) IsRunning() bool
- func (fo *Reducer[T, C, U]) OutputChan() <-chan U
- func (fo *Reducer[T, C, U]) Send(value T)
- func (fo *Reducer[T, C, U]) Stop()
- type Reducer2
- type ReducerOption
- type ReducerOption2
- type RunnerBase
- type Writer
- type WriterFunc
- type WriterOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Block ¶ added in v0.0.7
type Block struct {
// contains filtered or unexported fields
}
Block represents a composite component made up of multiple connected primitives. A Block itself acts as a component and can be nested within other Blocks.
type Broadcast ¶ added in v0.0.7
Example: Broadcast pattern - one input, multiple outputs
func NewBroadcast ¶ added in v0.0.7
NewBroadcast creates a broadcast block using FanOut
func (*Broadcast[T]) AddOutput ¶ added in v0.0.7
func (b *Broadcast[T]) AddOutput(filter FilterFunc[T]) chan T
AddOutput adds a new output channel to the broadcast
type Component ¶ added in v0.0.7
type Component interface {
// Stop stops the component and cleans up resources
Stop() error
// IsRunning returns true if the component is currently running
IsRunning() bool
}
Component represents any building block that can be part of a Block. All gocurrent primitives can implement this interface.
type FanIn ¶
type FanIn[T any] struct {
	RunnerBase[fanInCmd[T]]
	// OnChannelRemoved is called when a channel is removed so that the caller
	// can perform any related cleanup.
	OnChannelRemoved func(fi *FanIn[T], inchan <-chan T)
	// contains filtered or unexported fields
}
FanIn merges multiple input channels into a single output channel. It implements the fan-in concurrency pattern where messages from multiple sources are combined into one stream.
Example ¶
// Create 2 input channels, send 3 numbers into each, and read the
// merged values from the fan-in's output (collector) channel
fanin := NewFanIn[int]()
defer fanin.Stop()
NUM_CHANS := 2
NUM_MSGS := 3
var inchans []chan int
for i := 0; i < NUM_CHANS; i++ {
inchan := make(chan int)
inchans = append(inchans, inchan)
fanin.Add(inchan)
}
for i := 0; i < NUM_CHANS; i++ {
go func(inchan chan int) {
// send some numbers into this fanin
for j := 0; j < NUM_MSGS; j++ {
inchan <- j
}
}(inchans[i])
}
// collect the fanned values
var vals []int
for i := 0; i < NUM_CHANS*NUM_MSGS; i++ {
val := <-fanin.OutputChan()
vals = append(vals, val)
}
// sort and print them for testing
sort.Ints(vals)
for _, v := range vals {
fmt.Println(v)
}
Output: 0 0 1 1 2 2
func NewFanIn ¶
func NewFanIn[T any](opts ...FanInOption[T]) *FanIn[T]
NewFanIn creates a new FanIn that merges multiple input channels with functional options. By default, creates and owns an unbuffered output channel. Use options to customize. The FanIn starts running immediately upon creation.
Examples:
// Simple usage with owned channel (backwards compatible)
fanin := NewFanIn[int]()
// With existing channel (backwards compatible)
outChan := make(chan int, 10)
fanin := NewFanIn(WithFanInOutputChan(outChan))
// With buffered output
fanin := NewFanIn[int](WithFanInOutputBuffer[int](100))
func (*FanIn[T]) Add ¶
func (fi *FanIn[T]) Add(inputs ...<-chan T)
Add adds one or more input channels to the FanIn. Messages from these channels will be merged into the output channel. Panics if any input channel is nil.
func (*FanIn[T]) ClosedChan ¶ added in v0.0.7
ClosedChan returns the channel used to signal when the fan-in is done
func (*FanIn[T]) OutputChan ¶ added in v0.0.7
func (fi *FanIn[T]) OutputChan() <-chan T
OutputChan returns the channel on which merged output can be received.
type FanInOption ¶ added in v0.0.8
FanInOption is a functional option for configuring a FanIn
func WithFanInOnChannelRemoved ¶ added in v0.0.8
func WithFanInOnChannelRemoved[T any](fn func(*FanIn[T], <-chan T)) FanInOption[T]
WithFanInOnChannelRemoved sets the callback for when a channel is removed
func WithFanInOutputBuffer ¶ added in v0.0.8
func WithFanInOutputBuffer[T any](size int) FanInOption[T]
WithFanInOutputBuffer creates a buffered output channel for the FanIn
func WithFanInOutputChan ¶ added in v0.0.8
func WithFanInOutputChan[T any](ch chan T) FanInOption[T]
WithFanInOutputChan sets the output channel for the FanIn
type FanOut ¶
type FanOut[T any] struct {
	RunnerBase[fanOutCmd[T]]
	// In the default mode, the Send method simply writes to an input channel that is read
	// by the runner loop of this FanOut. As soon as an event is read, the runner loop by
	// default writes it to all output channels sequentially. If the output channels are not
	// being drained by the reader goroutine (step 2 of the general pattern), the Send method
	// will block. In other words, if the reader goroutine is NOT running before the Send
	// method is invoked, OR if the reader goroutine is blocked for some reason, the Send
	// method will block. To prevent this, writes to the output channels can themselves be
	// made asynchronous and non-blocking.
	//
	// When SendSync is set to true, writes to the output channels happen synchronously,
	// without starting a new goroutine. This reduces the number of goroutines started during
	// dispatch and is an optimization for callers/owners of this FanOut who want fine control
	// over the reader channels and goroutines. For example, the caller might create buffered
	// output channels so that writes do not block, or might run the readers in separate
	// goroutines to prevent any blocking behavior.
	SendSync bool
	// contains filtered or unexported fields
}
FanOut takes a message from one channel, optionally applies a per-output filter function (see FilterFunc), and fans it out to N output channels.
The general pattern is to:
- Create a FanOut[T] with the NewFanOut method
- Start a reader goroutine that reads values from the fanout's output channels (note that this SHOULD be started before any values are sent on the input channel)
- Start sending values through the input channel via the Send method.
Example ¶
// Create a fanout with 2 output channels and check that
// numbers sent into the input are read from all of them
fanout := NewFanOut[int]()
defer fanout.Stop()
NUM_CHANS := 2
NUM_MSGS := 3
// Add some receiver channels
var outchans []chan int
for i := 0; i < NUM_CHANS; i++ {
outchan := fanout.New(nil)
outchans = append(outchans, outchan)
}
var vals []int
for i := 0; i < NUM_MSGS; i++ {
fanout.Send(i)
}
// wait till all fanouts have been collected
for j := 0; j < NUM_MSGS; j++ {
for i := 0; i < NUM_CHANS; i++ {
val := <-outchans[i]
vals = append(vals, val)
}
}
// sort and print them for testing
sort.Ints(vals)
for _, v := range vals {
fmt.Println(v)
}
Output: 0 0 1 1 2 2
func NewFanOut ¶
func NewFanOut[T any](opts ...FanOutOption[T]) *FanOut[T]
NewFanOut creates a new FanOut that fans messages to multiple output channels with functional options. By default, creates and owns an unbuffered input channel. Use options to customize. The FanOut starts running immediately upon creation.
Examples:
// Simple usage with owned channel (backwards compatible)
fanout := NewFanOut[int]()
// With existing channel (backwards compatible)
inChan := make(chan int, 10)
fanout := NewFanOut(WithFanOutInputChan(inChan))
// With buffered input
fanout := NewFanOut[int](WithFanOutInputBuffer[int](100))
// With synchronous sends
fanout := NewFanOut[int](WithFanOutSendSync[int](true))
func (*FanOut[T]) Add ¶
func (fo *FanOut[T]) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)
Add registers a new channel to which incoming messages will be fanned out. Output channels can either be supplied by the caller or created by this runner. A channel supplied by the caller is not closed when this runner finishes (or is stopped). A filter function can also be passed per output channel to transform or filter messages for that channel. For example, a filter can check whether an incoming message is permitted for a given output channel.
Output channels are added to the list of listeners asynchronously. If the wait parameter is true, Add returns a channel that can be read from to wait until the registration has completed.
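The asynchronous registration described above can be pictured as a command queue whose commands optionally carry an acknowledgement channel. The following is a minimal stdlib-only sketch of that idea, not gocurrent's implementation; `registry`, `addCmd`, `add`, and `count` are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// addCmd is a hypothetical command asking the run loop to register an output.
type addCmd struct {
	out  chan<- int
	done chan error // nil when the caller does not want to wait
}

// registry is a hypothetical owner of the listener list. Only its run loop
// touches outputs, so no mutex is needed.
type registry struct {
	cmds    chan addCmd
	counts  chan chan int
	outputs []chan<- int
}

func newRegistry() *registry {
	r := &registry{cmds: make(chan addCmd, 16), counts: make(chan chan int)}
	go r.run()
	return r
}

// run applies queued commands one at a time (it never exits in this sketch).
func (r *registry) run() {
	for {
		select {
		case cmd := <-r.cmds:
			r.outputs = append(r.outputs, cmd.out)
			if cmd.done != nil {
				cmd.done <- nil // acknowledge the registration
			}
		case reply := <-r.counts:
			reply <- len(r.outputs)
		}
	}
}

// add queues a registration. With wait=true it returns a channel that yields
// once the output is registered; with wait=false it returns nil.
func (r *registry) add(out chan<- int, wait bool) chan error {
	cmd := addCmd{out: out}
	if wait {
		cmd.done = make(chan error, 1)
	}
	r.cmds <- cmd
	return cmd.done
}

// count asks the run loop how many outputs are registered.
func (r *registry) count() int {
	reply := make(chan int)
	r.counts <- reply
	return <-reply
}

func main() {
	r := newRegistry()
	out := make(chan int, 1)
	if err := <-r.add(out, true); err != nil {
		fmt.Println("registration failed:", err)
		return
	}
	fmt.Println("registered outputs:", r.count()) // registered outputs: 1
}
```

In this sketch a caller that passes wait=false must not read from the returned (nil) channel, since a receive on a nil channel blocks forever.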
func (*FanOut[T]) ClosedChan ¶ added in v0.0.7
ClosedChan returns the channel used to signal when the fan-out is done
func (*FanOut[T]) InputChan ¶ added in v0.0.7
func (fo *FanOut[T]) InputChan() chan<- T
InputChan returns the channel on which messages can be sent to this runner to be fanned-out.
func (*FanOut[T]) New ¶
func (fo *FanOut[T]) New(filter FilterFunc[T]) chan T
New creates a new output channel, managed by this runner, with an optional filter function.
type FanOutOption ¶ added in v0.0.8
FanOutOption is a functional option for configuring a FanOut
func WithFanOutInputBuffer ¶ added in v0.0.8
func WithFanOutInputBuffer[T any](size int) FanOutOption[T]
WithFanOutInputBuffer creates a buffered input channel for the FanOut
func WithFanOutInputChan ¶ added in v0.0.8
func WithFanOutInputChan[T any](ch chan T) FanOutOption[T]
WithFanOutInputChan sets the input channel for the FanOut
func WithFanOutSendSync ¶ added in v0.0.8
func WithFanOutSendSync[T any](sync bool) FanOutOption[T]
WithFanOutSendSync sets whether sends to output channels should be synchronous
type FilterFunc ¶
type FilterFunc[T any] func(*T) *T
A FanOut lets a message be fanned out to multiple channels. Optionally, a FilterFunc can transform (or filter) the message before it is delivered to the listeners.
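To illustrate the shape of such a filter, the sketch below declares a local type with the same signature as FilterFunc and applies it to a few values. It assumes that returning nil means "drop this message for this output"; the documentation above does not state this, so treat it as an assumption. `filterFunc`, `evenDoubler`, and `apply` are hypothetical names.

```go
package main

import "fmt"

// filterFunc mirrors the documented FilterFunc signature.
type filterFunc[T any] func(*T) *T

// evenDoubler drops odd values and doubles even ones.
// Assumption: a nil result means the message is filtered out.
func evenDoubler(v *int) *int {
	if *v%2 != 0 {
		return nil
	}
	doubled := *v * 2
	return &doubled
}

// apply runs a filter over a slice and keeps the surviving values.
func apply[T any](f filterFunc[T], in []T) []T {
	var out []T
	for i := range in {
		if res := f(&in[i]); res != nil {
			out = append(out, *res)
		}
	}
	return out
}

func main() {
	fmt.Println(apply[int](evenDoubler, []int{1, 2, 3, 4})) // [4 8]
}
```

Returning a pointer to a fresh value, as `evenDoubler` does, keeps the transformation local to one output instead of mutating a value that other outputs might also see.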
type InputComponent ¶ added in v0.0.7
type InputComponent[T any] interface {
	Component
	// InputChan returns the channel for sending input to this component
	InputChan() chan<- T
	// Send is a convenience method for sending to the input channel
	Send(T)
}
InputComponent represents a component with an input channel
type Map ¶
type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}
A synchronized map with read and update lock capabilities
func (*Map[K, V]) LDelete ¶
Deletes the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LGet ¶
Gets the value by a given key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LHas ¶
Check if the map contains an entry by key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LRange ¶
Iterates over the items in this map. Optionally obtains a read lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) LSet ¶
Sets the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.
func (*Map[K, V]) Set ¶
func (m *Map[K, V]) Set(k K, v V)
Locks the map to set the value of a given key.
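The "optionally obtains a lock" convention used by the L-prefixed methods can be sketched with `sync.RWMutex`. This is a stdlib-only illustration under stated assumptions, not gocurrent's Map: `lockedMap`, `lget`, `lset`, and `incr` are hypothetical names. The point is that a caller holding the lock for a larger transaction passes lock=false to avoid locking twice.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical stand-in used only to illustrate the
// "lock bool" convention.
type lockedMap[K comparable, V any] struct {
	mu    sync.RWMutex
	items map[K]V
}

func newLockedMap[K comparable, V any]() *lockedMap[K, V] {
	return &lockedMap[K, V]{items: make(map[K]V)}
}

// lget reads a key, taking the read lock only when lock is true.
func (m *lockedMap[K, V]) lget(k K, lock bool) (V, bool) {
	if lock {
		m.mu.RLock()
		defer m.mu.RUnlock()
	}
	v, ok := m.items[k]
	return v, ok
}

// lset writes a key, taking the write lock only when lock is true.
func (m *lockedMap[K, V]) lset(k K, v V, lock bool) {
	if lock {
		m.mu.Lock()
		defer m.mu.Unlock()
	}
	m.items[k] = v
}

// incr performs a read-modify-write as one transaction: it takes the write
// lock once and calls the helpers with lock=false.
func incr(m *lockedMap[string, int], k string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	v, _ := m.lget(k, false)
	m.lset(k, v+1, false)
}

func main() {
	m := newLockedMap[string, int]()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			incr(m, "hits")
		}()
	}
	wg.Wait()
	v, _ := m.lget("hits", true)
	fmt.Println(v) // 100
}
```

Calling `lget` and `lset` with lock=true inside `incr` would deadlock in this sketch, because `sync.RWMutex` is not reentrant; that is the situation the lock flag is meant to avoid.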
type Mapper ¶
type Mapper[I any, O any] struct {
	RunnerBase[string]
	// MapFunc is applied to each value in the input channel
	// and returns a tuple of 3 things - outval, skip, stop.
	// If skip is false, outval is sent to the output channel.
	// If stop is true, the entire mapper stops processing any further elements.
	// This mechanism can be used in addition to the Stop method if sequencing this
	// within the elements of the input channel is required.
	MapFunc func(I) (O, bool, bool)
	OnDone func(p *Mapper[I, O])
	// contains filtered or unexported fields
}
Mapper connects an input and output channel applying transforms between them. It reads from the input channel, applies a transformation function, and writes the result to the output channel.
func Connect ¶ added in v0.0.7
func Connect[T any](from OutputComponent[T], to InputComponent[T]) *Mapper[T, T]
Connect connects the output of one component to the input of another using a Pipe. Returns the pipe so it can be managed if needed.
func ConnectWith ¶ added in v0.0.7
func ConnectWith[I, O any](from OutputComponent[I], to InputComponent[O], mapper func(I) (O, bool, bool)) *Mapper[I, O]
ConnectWith connects two components using a custom mapper function
func NewMapper ¶
func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool), opts ...MapperOption[T, U]) *Mapper[T, U]
NewMapper creates a new mapper between an input and output channel with functional options. The channels are owned by the caller and not by the Mapper, so they will not be closed when the mapper stops. The mapper function returns (output, skip, stop) where:
- output: the transformed value
- skip: if true, the output is not sent to the output channel
- stop: if true, the mapper stops processing further elements
Examples:
// Simple usage (backwards compatible)
mapper := NewMapper(inChan, outChan, myMapperFunc)
// With OnDone callback
mapper := NewMapper(inChan, outChan, myMapperFunc,
WithMapperOnDone(func(m *Mapper[int, string]) {
log.Println("mapper done")
}))
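The (output, skip, stop) convention can be illustrated without the library. Below is a minimal stdlib-only sketch; `runMapper`, `evenLabels`, and `collect` are hypothetical names, and the loop is not gocurrent's implementation. One assumption to note: when stop is true and skip is false, this sketch still sends the output before stopping, which the documentation above does not specify.

```go
package main

import "fmt"

// runMapper is a hypothetical stand-in for a mapper loop. It applies fn to
// each input and honors the (output, skip, stop) convention.
func runMapper[I, O any](in <-chan I, out chan<- O, fn func(I) (O, bool, bool)) {
	for v := range in {
		res, skip, stop := fn(v)
		if !skip {
			out <- res
		}
		if stop {
			return
		}
	}
}

// evenLabels skips odd numbers and stops once it sees a value >= 6.
func evenLabels(n int) (string, bool, bool) {
	if n >= 6 {
		return "", true, true
	}
	return fmt.Sprintf("even:%d", n), n%2 != 0, false
}

// collect feeds 0..9 through runMapper and gathers everything it emits.
func collect() []string {
	in := make(chan int, 10)
	for i := 0; i < 10; i++ {
		in <- i
	}
	close(in)

	out := make(chan string)
	go func() {
		defer close(out) // the caller of runMapper owns and closes the channel
		runMapper[int, string](in, out, evenLabels)
	}()

	var got []string
	for s := range out {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(collect()) // [even:0 even:2 even:4]
}
```

Values 7 through 9 are never read here because the mapper function requested a stop at 6, which is the in-band sequencing that the stop flag allows.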
func NewPipe ¶
NewPipe creates a new pipe that connects an input and output channel. A pipe is a mapper with the identity function, so it simply forwards all values from input to output without transformation.
func (*Mapper[I, O]) ClosedChan ¶ added in v0.0.7
ClosedChan returns the channel used to signal when the mapper is done
func (*Mapper[I, O]) InputChan ¶ added in v0.0.7
func (m *Mapper[I, O]) InputChan() chan<- I
InputChan returns the input channel (exposes the private field for Component interface)
func (*Mapper[I, O]) OutputChan ¶ added in v0.0.7
func (m *Mapper[I, O]) OutputChan() <-chan O
OutputChan returns the output channel (exposes the private field for Component interface)
type MapperOption ¶ added in v0.0.8
MapperOption is a functional option for configuring a Mapper
func WithMapperOnDone ¶ added in v0.0.8
func WithMapperOnDone[I, O any](fn func(*Mapper[I, O])) MapperOption[I, O]
WithMapperOnDone sets the callback to be called when the mapper finishes
type Merge ¶ added in v0.0.7
Example: Merge pattern - multiple inputs, one output
func (*Merge[T]) AddInput ¶ added in v0.0.7
func (m *Merge[T]) AddInput(input <-chan T)
AddInput adds a new input channel to the merge
func (*Merge[T]) OutputChan ¶ added in v0.0.7
func (m *Merge[T]) OutputChan() <-chan T
OutputChan implements OutputComponent
type Message ¶
type Message[T any] struct {
	Value T     // The actual value being transmitted
	Error error // Any error that occurred during processing
	Source any  // Optional source information for debugging
}
Message represents a value with optional error and source information. It's used by channels to carry both successful values and error conditions.
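A channel of value-or-error messages lets a consumer handle data and failures in one receive loop. The sketch below is stdlib-only and makes several assumptions: `message` mirrors only the Value and Error fields, the read function has the hypothetical signature `func() (T, error)`, and `readLoop` stops after the first error. None of this is confirmed as gocurrent's Reader behavior.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// message mirrors the shape of the documented Message type (Source omitted).
type message[T any] struct {
	Value T
	Error error
}

// readLoop is a hypothetical reader loop: it calls read repeatedly,
// forwards every result, and closes the channel after the first error.
func readLoop[T any](read func() (T, error)) <-chan message[T] {
	out := make(chan message[T])
	go func() {
		defer close(out)
		for {
			v, err := read()
			out <- message[T]{Value: v, Error: err}
			if err != nil {
				return
			}
		}
	}()
	return out
}

// countdown returns a read function yielding n, n-1, ..., 1 and then io.EOF.
func countdown(n int) func() (int, error) {
	return func() (int, error) {
		if n == 0 {
			return 0, io.EOF
		}
		n--
		return n + 1, nil
	}
}

func main() {
	for msg := range readLoop(countdown(3)) {
		if errors.Is(msg.Error, io.EOF) {
			fmt.Println("done")
			break
		}
		fmt.Println(msg.Value)
	}
}
```

Running this prints 3, 2, 1, and then "done".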
type OutputComponent ¶ added in v0.0.7
type OutputComponent[T any] interface {
	Component
	// OutputChan returns the channel for receiving output from this component
	OutputChan() <-chan T
}
OutputComponent represents a component with an output channel
type Pipeline ¶ added in v0.0.7
Pipeline creates a linear sequence of components connected by pipes
func NewPipeline ¶ added in v0.0.7
NewPipeline creates a new pipeline block
func (*Pipeline[T]) InputChan ¶ added in v0.0.7
func (p *Pipeline[T]) InputChan() chan<- T
InputChan implements InputComponent
func (*Pipeline[T]) OutputChan ¶ added in v0.0.7
func (p *Pipeline[T]) OutputChan() <-chan T
OutputChan implements OutputComponent
type Reader ¶
type Reader[R any] struct {
	RunnerBase[string]
	Read ReaderFunc[R]
	OnDone func(r *Reader[R])
	// contains filtered or unexported fields
}
Reader is a typed goroutine wrapper that continuously calls its Read function and sends the results over a channel, wrapped in Message structs.
func NewReader ¶
func NewReader[R any](read ReaderFunc[R], opts ...ReaderOption[R]) *Reader[R]
NewReader creates a new reader instance with functional options. The reader function is required as the first parameter, with optional configuration via functional options.
Examples:
// Simple usage (backwards compatible)
reader := NewReader(myReaderFunc)
// With options
reader := NewReader(myReaderFunc, WithOutputBuffer[int](10))
// With multiple options
reader := NewReader(myReaderFunc,
WithOutputBuffer[int](100),
WithOnDone(func(r *Reader[int]) { log.Println("done") }))
func (*Reader[R]) ClosedChan ¶ added in v0.0.2
The channel used to signal when the reader is done
func (*Reader[R]) OutputChan ¶ added in v0.0.7
OutputChan returns the channel on which messages can be received.
type ReaderFunc ¶
ReaderFunc is the type of the reader method used by the Reader goroutine primitive.
type ReaderOption ¶ added in v0.0.8
ReaderOption is a functional option for configuring a Reader
func WithOnDone ¶ added in v0.0.8
func WithOnDone[R any](fn func(*Reader[R])) ReaderOption[R]
WithOnDone sets the callback to be called when the reader finishes
func WithOutputBuffer ¶ added in v0.0.8
func WithOutputBuffer[R any](size int) ReaderOption[R]
WithOutputBuffer sets the buffer size for the output channel
type Reducer ¶
type Reducer[T any, C any, U any] struct {
	FlushPeriod time.Duration
	// CollectFunc adds inputs to the collection and returns the updated collection.
	// The bool return value indicates whether a flush should be triggered immediately.
	CollectFunc func(collection C, inputs ...T) (C, bool)
	ReduceFunc func(collectedItems C) (reducedOutputs U)
	// contains filtered or unexported fields
}
Reducer collects messages of type T over some window and reduces them to a value of type U. For example, it can be used to batch messages into a list every 10 seconds. Alternatively, if a time-based window is not used, a reduction can be invoked manually.
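The two triggers mentioned above (a time window and a manual reduction) can be sketched with a ticker and a flush channel. This is a stdlib-only illustration, not gocurrent's Reducer; `batch` and its parameters are hypothetical, and skipping empty batches is a choice made for this sketch only.

```go
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical collector: it appends inputs to a pending slice and
// emits the slice when the ticker fires or when a manual flush is requested.
// Empty batches are not emitted. The output closes once in is closed.
func batch(in <-chan int, flush <-chan struct{}, period time.Duration) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(period)
		defer ticker.Stop()

		var pending []int
		emit := func() {
			if len(pending) > 0 {
				out <- pending
				pending = nil
			}
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					emit() // flush whatever is left before exiting
					return
				}
				pending = append(pending, v)
			case <-ticker.C:
				emit()
			case <-flush:
				emit()
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	flush := make(chan struct{})
	out := batch(in, flush, time.Hour) // long period: only manual flushes fire
	for i := 0; i < 3; i++ {
		in <- i
	}
	flush <- struct{}{}
	fmt.Println(<-out) // [0 1 2]
	close(in)
}
```

Using unbuffered channels makes the manual flush deterministic here: each send returns only after the loop has received the value, so all three inputs are pending before the flush request is seen.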
Example ¶
inputChan := make(chan int)
outputChan := make(chan []int)
// Create a reducer that collects integers into slices
reducer := NewIDReducer(
WithInputChan[int, []int, []int](inputChan),
WithOutputChan[int, []int](outputChan),
WithFlushPeriod[int, []int, []int](50*time.Millisecond))
defer reducer.Stop()
// Send data
go func() {
for i := range 3 {
inputChan <- i
}
}()
// After FlushPeriod, receive the collected batch
batch := <-outputChan
fmt.Println(batch)
Output: [0 1 2]
func NewReducer ¶
func NewReducer[T any, C any, U any](opts ...ReducerOption[T, C, U]) *Reducer[T, C, U]
NewReducer creates a reducer over generic input and output types. Options can be provided to configure the input channel, output channel, flush period, etc. If channels are not provided via options, the reducer will create and own them. Just like other runners, the Reducer starts as soon as it is created.
func (*Reducer[T, C, U]) ClosedChan ¶ added in v0.0.7
ClosedChan returns the channel used to signal when the reducer is done
func (*Reducer[T, C, U]) Flush ¶
func (fo *Reducer[T, C, U]) Flush()
Flush immediately processes all pending events and sends the result to the output channel.
func (*Reducer[T, C, U]) InputChan ¶ added in v0.0.7
func (fo *Reducer[T, C, U]) InputChan() chan<- T
InputChan returns the channel onto which messages can be sent (to be reduced).
func (*Reducer[T, C, U]) IsRunning ¶ added in v0.0.7
IsRunning returns true if the reducer is still running
func (*Reducer[T, C, U]) OutputChan ¶ added in v0.0.7
func (fo *Reducer[T, C, U]) OutputChan() <-chan U
OutputChan returns the channel from which "reduced" values can be read.
type Reducer2 ¶ added in v0.0.8
Reducer2 is a simplified 2-parameter version of Reducer where the collection type C is the same as the output type (U == C). This is the most common use case.
func NewIDReducer ¶
func NewIDReducer[T any](opts ...ReducerOption2[T, []T]) *Reducer2[T, []T]
NewIDReducer creates a Reducer2 that simply collects events of type T into a list (of type []T).
func NewListReducer ¶ added in v0.0.5
func NewListReducer[T any](opts ...ReducerOption2[[]T, []T]) *Reducer2[[]T, []T]
NewListReducer creates a reducer that accepts lists of items and concatenates them into a single collection. This allows producers to send events in batch mode instead of one at a time.
func NewReducer2 ¶ added in v0.0.8
func NewReducer2[T any, C any](opts ...ReducerOption2[T, C]) *Reducer2[T, C]
NewReducer2 creates a 2-parameter reducer where collection type equals output type. This is a simpler API for the common case where no type transformation is needed.
type ReducerOption ¶ added in v0.0.5
ReducerOption is a functional option for configuring a Reducer
func WithFlushPeriod ¶ added in v0.0.5
WithFlushPeriod sets the flush period for the reducer
func WithInputChan ¶ added in v0.0.5
func WithInputChan[T any, C any, U any](ch chan T) ReducerOption[T, C, U]
WithInputChan sets the input channel for the reducer
func WithOutputChan ¶ added in v0.0.5
func WithOutputChan[T any, C any, U any](ch chan U) ReducerOption[T, C, U]
WithOutputChan sets the output channel for the reducer
type ReducerOption2 ¶ added in v0.0.8
type ReducerOption2[T any, C any] = ReducerOption[T, C, C]
ReducerOption2 is a functional option for configuring a Reducer2
func WithFlushPeriod2 ¶ added in v0.0.8
func WithFlushPeriod2[T any, C any](period time.Duration) ReducerOption2[T, C]
WithFlushPeriod2 sets the flush period for a Reducer2
func WithInputChan2 ¶ added in v0.0.8
func WithInputChan2[T any, C any](ch chan T) ReducerOption2[T, C]
WithInputChan2 sets the input channel for a Reducer2
func WithOutputChan2 ¶ added in v0.0.8
func WithOutputChan2[T any, C any](ch chan C) ReducerOption2[T, C]
WithOutputChan2 sets the output channel for a Reducer2
type RunnerBase ¶
type RunnerBase[C any] struct {
	// contains filtered or unexported fields
}
Base of the Reader and Writer primitives
func NewRunnerBase ¶
func NewRunnerBase[C any](stopVal C) RunnerBase[C]
Creates a new base runner - called by the Reader and Writer primitives
func (*RunnerBase[R]) DebugInfo ¶
func (r *RunnerBase[R]) DebugInfo() any
Used for returning any debug information.
func (*RunnerBase[C]) IsRunning ¶
func (r *RunnerBase[C]) IsRunning() bool
Returns true if currently running otherwise false
func (*RunnerBase[C]) Stop ¶
func (r *RunnerBase[C]) Stop() error
Stop is called to stop the runner. It is up to the embedding types to listen for messages on the control channel and initiate the wind-down and cleanup process.
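The division of labor described above (a base that posts a stop request, and an embedding type whose loop reacts to it) might look like the following stdlib-only sketch. `runner`, `counter`, and their fields are hypothetical and do not reflect RunnerBase's unexported fields; unlike the documented Stop, this `stop` returns no error.

```go
package main

import "fmt"

// runner is a hypothetical base type: stop posts a stop value on the control
// channel and waits until the embedding type's loop has finished.
type runner struct {
	control chan string
	done    chan struct{}
}

func newRunner() runner {
	return runner{control: make(chan string), done: make(chan struct{})}
}

// stop asks the loop to finish and blocks until cleanup is complete.
// It must be called at most once in this sketch.
func (r *runner) stop() {
	r.control <- "stop"
	<-r.done
}

// counter embeds runner and owns the loop that listens on the control channel.
type counter struct {
	runner
	in    chan int
	total int
}

func newCounter() *counter {
	c := &counter{runner: newRunner(), in: make(chan int)}
	go c.loop()
	return c
}

// loop sums inputs until a control message arrives, then signals completion.
func (c *counter) loop() {
	defer close(c.done)
	for {
		select {
		case v := <-c.in:
			c.total += v
		case <-c.control:
			return
		}
	}
}

func main() {
	c := newCounter()
	for i := 1; i <= 3; i++ {
		c.in <- i
	}
	c.stop()
	fmt.Println(c.total) // 6
}
```

Reading `c.total` after `stop` returns is safe because closing `done` happens after the loop's last write and before the receive in `stop` completes.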
type Writer ¶
type Writer[W any] struct {
	RunnerBase[string]
	Write WriterFunc[W]
	// contains filtered or unexported fields
}
Writer is a typed goroutine wrapper that calls its Write function for each value it receives. It provides a way to serialize concurrent writes through a single goroutine.
func NewWriter ¶
func NewWriter[W any](write WriterFunc[W], opts ...WriterOption[W]) *Writer[W]
NewWriter creates a new writer instance with functional options. The writer function is required as the first parameter, with optional configuration via functional options.
Examples:
// Simple usage (backwards compatible)
writer := NewWriter(myWriterFunc)
// With buffered input
writer := NewWriter(myWriterFunc, WithInputBuffer[int](100))
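Serializing writes through one goroutine means many producers can share a resource that is not safe for concurrent use. The sketch below shows the pattern with the standard library only. `serialWriter`, `closeAndWait`, and `writeAll` are hypothetical names, and the write callback's signature `func(W) error` is an assumption, since the WriterFunc definition is not shown here.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// serialWriter is a hypothetical stand-in: many goroutines send values on in,
// and one goroutine calls write for each, so write never runs concurrently.
type serialWriter[W any] struct {
	in   chan W
	done chan error
}

func newSerialWriter[W any](write func(W) error) *serialWriter[W] {
	w := &serialWriter[W]{in: make(chan W), done: make(chan error, 1)}
	go func() {
		var firstErr error
		for v := range w.in {
			if firstErr != nil {
				continue // keep draining so senders never block
			}
			firstErr = write(v)
		}
		w.done <- firstErr
	}()
	return w
}

// closeAndWait stops accepting input and returns the first write error.
func (w *serialWriter[W]) closeAndWait() error {
	close(w.in)
	return <-w.done
}

// writeAll sends n lines from n goroutines through one serialWriter and
// returns how many lines reached the shared builder (-1 on error).
func writeAll(n int) int {
	var sb strings.Builder // not safe for concurrent use on its own
	w := newSerialWriter(func(line string) error {
		_, err := sb.WriteString(line + "\n")
		return err
	})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			w.in <- fmt.Sprintf("line %d", id)
		}(i)
	}
	wg.Wait()
	if err := w.closeAndWait(); err != nil {
		return -1
	}
	return strings.Count(sb.String(), "\n")
}

func main() {
	fmt.Println(writeAll(50)) // 50
}
```

The order of lines in the builder is not deterministic, but every line is written exactly once and no two writes overlap.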
func (*Writer[W]) ClosedChan ¶ added in v0.0.2
ClosedChan returns the channel used to signal when the writer is done
type WriterFunc ¶
WriterFunc is the type of the writer method used by the writer goroutine primitive to serialize its writes.
type WriterOption ¶ added in v0.0.8
WriterOption is a functional option for configuring a Writer
func WithInputBuffer ¶ added in v0.0.8
func WithInputBuffer[W any](size int) WriterOption[W]
WithInputBuffer sets the buffer size for the input channel