gocurrent

v0.0.9
Published: Nov 25, 2025 License: GPL-3.0 Imports: 7 Imported by: 4

README

gocurrent

A Go library providing utilities for common concurrency patterns with customizable behavior. This package implements several concurrency primitives inspired by Rob Pike's concurrency patterns from his talk "Go Concurrency Patterns".

Installation

go get github.com/panyam/gocurrent

Quick Start

import (
    "fmt"
    "log"
    "math/rand"

    "github.com/panyam/gocurrent"
)

// Create a reader that generates numbers
reader := gocurrent.NewReader(func() (int, error) {
    return rand.Intn(100), nil
})
defer reader.Stop()

// Read from the reader's output channel
for msg := range reader.OutputChan() {
    if msg.Error != nil {
        log.Printf("Error: %v", msg.Error)
        continue
    }
    fmt.Printf("Received: %d\n", msg.Value)
}

Components

Reader

A goroutine wrapper that continuously calls a reader function and sends results to a channel.

// Create a reader that reads from a data source
reader := gocurrent.NewReader(func() (string, error) {
    // Your data reading logic here
    return "data", nil
})
defer reader.Stop()

// Monitor for reader completion or errors
go func() {
    select {
    case err := <-reader.ClosedChan():
        if err != nil {
            log.Printf("Reader terminated with error: %v", err)
        } else {
            log.Println("Reader completed successfully")
        }
    }
}()

// Process messages
for msg := range reader.OutputChan() {
    if msg.Error != nil {
        log.Printf("Read error: %v", msg.Error)
        break
    }
    fmt.Printf("Read: %s\n", msg.Value)
}

Writer

A goroutine for serializing writes using a writer callback method.

// Create a writer that processes data
writer := gocurrent.NewWriter(func(data string) error {
    // Your data writing logic here
    fmt.Printf("Writing: %s\n", data)
    return nil
})
defer writer.Stop()

// Monitor for writer completion or errors
go func() {
    select {
    case err := <-writer.ClosedChan():
        if err != nil {
            log.Printf("Writer terminated with error: %v", err)
        } else {
            log.Println("Writer completed successfully")
        }
    }
}()

// Send data to writer
writer.Send("Hello")
writer.Send("World")

Mapper

Transform and/or filter data between channels.

inputChan := make(chan int, 10)
outputChan := make(chan string, 10)

// Create a mapper that converts integers to strings
mapper := gocurrent.NewMapper(inputChan, outputChan, func(i int) (string, bool, bool) {
    // Return: (output, skip, stop)
    return fmt.Sprintf("Number: %d", i), false, false
})
defer mapper.Stop()

// Send data
inputChan <- 42
inputChan <- 100

// Read transformed data
result := <-outputChan // "Number: 42"

Reducer

Collect and reduce values from an input channel with configurable time windows. The Reducer has three type parameters:

  • T - the input event type
  • C - the intermediate collection type (where events are batched)
  • U - the output type after reduction

Reducers use functional options for configuration:

// Simple case: Use NewIDReducer to collect events into a slice
// With all defaults (creates its own channels, 100ms flush period)
reducer := gocurrent.NewIDReducer[int]()
defer reducer.Stop()

// With custom configuration
inputChan := make(chan int, 10)
outputChan := make(chan []int, 10)
reducer := gocurrent.NewIDReducer[int](
    gocurrent.WithInputChan[int, []int, []int](inputChan),
    gocurrent.WithOutputChan[int, []int, []int](outputChan),
    gocurrent.WithFlushPeriod[int, []int, []int](100 * time.Millisecond))
defer reducer.Stop()

// Send data
for i := 0; i < 5; i++ {
    reducer.Send(i)
}

// After FlushPeriod, receive the collected batch
batch := <-outputChan // []int{0, 1, 2, 3, 4}

For custom collection and reduction logic, use NewReducer directly:

// Custom reducer: collect strings into a map, reduce to a summary
type WordCount map[string]int

inputChan := make(chan string, 10)
outputChan := make(chan string, 10)
reducer := gocurrent.NewReducer[string, WordCount, string](
    gocurrent.WithInputChan[string, WordCount, string](inputChan),
    gocurrent.WithOutputChan[string, WordCount, string](outputChan),
    gocurrent.WithFlushPeriod[string, WordCount, string](100 * time.Millisecond))
reducer.CollectFunc = func(counts WordCount, words ...string) (WordCount, bool) {
    if counts == nil {
        counts = make(WordCount)
    }
    for _, word := range words {
        counts[word]++
    }
    return counts, false // Return true to trigger immediate flush
}
reducer.ReduceFunc = func(counts WordCount) string {
    return fmt.Sprintf("Counted %d unique words", len(counts))
}

Custom Flush Triggers

The CollectFunc can signal when to flush by returning true as the second return value. This enables custom flush criteria beyond time-based flushing:

// Length-based flush: flush when collection reaches 100 items
reducer.CollectFunc = func(collection []int, inputs ...int) ([]int, bool) {
    newCollection := append(collection, inputs...)
    shouldFlush := len(newCollection) >= 100
    return newCollection, shouldFlush
}

// Custom criteria (for a reducer whose collection type is int): flush when sum exceeds threshold
reducer.CollectFunc = func(sum int, inputs ...int) (int, bool) {
    newSum := sum
    for _, input := range inputs {
        newSum += input
    }
    shouldFlush := newSum > 1000
    return newSum, shouldFlush
}

// Manual flush is also available
reducer.Flush()
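
The collect-then-flush contract can be illustrated without the library. The following is a minimal synchronous sketch, not the Reducer's implementation: reduceAll is a hypothetical helper, and resetting the collection to its zero value after each flush is an assumption made for this illustration.

```go
package main

import "fmt"

// reduceAll is a hypothetical, synchronous stand-in for a reducer loop.
// collect folds each input into the collection and may request a flush;
// reduce converts a flushed collection into an output value.
func reduceAll[T, C, U any](inputs []T, collect func(C, ...T) (C, bool), reduce func(C) U) []U {
	var outputs []U
	var collection C
	dirty := false
	for _, in := range inputs {
		var flush bool
		collection, flush = collect(collection, in)
		dirty = true
		if flush {
			outputs = append(outputs, reduce(collection))
			var zero C // assumption: start over from the zero value after a flush
			collection, dirty = zero, false
		}
	}
	if dirty { // final flush, standing in for a timer tick or manual flush
		outputs = append(outputs, reduce(collection))
	}
	return outputs
}

func main() {
	// Flush whenever the running sum exceeds 10.
	collect := func(sum int, inputs ...int) (int, bool) {
		for _, v := range inputs {
			sum += v
		}
		return sum, sum > 10
	}
	reduce := func(sum int) string { return fmt.Sprintf("sum=%d", sum) }
	fmt.Println(reduceAll([]int{4, 5, 6, 1, 2}, collect, reduce)) // [sum=15 sum=3]
}
```

The first flush is requested by the collect function itself once the sum passes the threshold; the second comes from the trailing flush of whatever is still pending.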
Pipe

Connect a reader and writer channel with identity transform.

inputChan := make(chan string, 10)
outputChan := make(chan string, 10)

// Create a pipe (identity mapper)
pipe := gocurrent.NewPipe(inputChan, outputChan)
defer pipe.Stop()

inputChan <- "hello"
result := <-outputChan // "hello"

FanIn

Merge multiple input channels into a single output channel.

// Create input channels
chan1 := make(chan int, 10)
chan2 := make(chan int, 10)
chan3 := make(chan int, 10)

// Create fan-in
fanIn := gocurrent.NewFanIn[int]()
defer fanIn.Stop()

// Add input channels
fanIn.Add(chan1, chan2, chan3)

// Send data to different channels
chan1 <- 1
chan2 <- 2
chan3 <- 3

// Read merged output
for i := 0; i < 3; i++ {
    result := <-fanIn.OutputChan()
    fmt.Printf("Received: %d\n", result)
}

FanOut

Distribute messages from one channel to multiple output channels.

// Create fan-out
fanOut := gocurrent.NewFanOut[string]()
defer fanOut.Stop()

// Add output channels
out1 := fanOut.New(nil) // No filter
out2 := fanOut.New(func(s *string) *string {
    // Filter: only pass strings longer than 5 chars
    if len(*s) > 5 {
        return s
    }
    return nil
})

// Send data
fanOut.Send("hello")    // Goes to out1 only
fanOut.Send("hello world") // Goes to both out1 and out2

// Read from outputs
select {
case msg := <-out1:
    fmt.Printf("Out1: %s\n", msg)
case msg := <-out2:
    fmt.Printf("Out2: %s\n", msg)
}

Map

A thread-safe map with read/write locking capabilities.

// Create a thread-safe map
safeMap := gocurrent.NewMap[string, int]()

// Basic operations
safeMap.Set("key1", 42)
value, exists := safeMap.Get("key1")
if exists {
    fmt.Printf("Value: %d\n", value)
}

// Transaction-style operations
safeMap.Update(func(m map[string]int) {
    m["key2"] = 100
    m["key3"] = 200
})

safeMap.View(func() {
    val1, _ := safeMap.LGet("key1", false) // No lock needed in View
    val2, _ := safeMap.LGet("key2", false)
    fmt.Printf("Sum: %d\n", val1 + val2)
})

Features

  • Type Safety: All components are fully generic and type-safe
  • Resource Management: Proper cleanup and lifecycle management
  • Composability: Components can be easily combined
  • Customizable: Configurable behavior and filtering
  • Thread Safety: Built-in synchronization where needed
  • Error Handling: Comprehensive error propagation with completion signaling
  • Monitoring: Built-in channels for monitoring goroutine completion and errors

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the GPL-3.0 License.

Documentation

Overview

Package gocurrent provides utilities for common Go concurrency patterns.

This package implements several concurrency primitives inspired by Rob Pike's concurrency patterns from his talk "Go Concurrency Patterns" (https://go.dev/talks/2012/concurrency.slide).

The main components include:

  • Reader: A goroutine wrapper that continuously calls a reader function and sends results to a channel, with error signaling via ClosedChan()
  • Writer: A goroutine for serializing writes using a writer callback, with error signaling via ClosedChan()
  • Mapper: Transform and/or filter data between channels
  • Reducer: Collect and reduce N values from an input channel with configurable time windows
  • Pipe: Connect a reader and writer channel with identity transform
  • FanIn: Merge multiple input channels into a single output channel
  • FanOut: Distribute messages from one channel to multiple output channels
  • Map: A thread-safe map with read/write locking capabilities

All concurrency primitives are designed to be composable and provide fine-grained control over goroutine lifecycles, resource management, and error monitoring through completion signaling channels.

Functions

func IDFunc

func IDFunc[T any](input T) T

IDFunc is an identity function that returns its input unchanged. It's commonly used as a default mapper function for pipes and other operations.
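
As a rough illustration of why an identity function is a useful default, the sketch below passes one to a generic transform helper. Both identity and applyAll are hypothetical names written for this example; only the signature shape is taken from IDFunc.

```go
package main

import "fmt"

// identity has the same shape as the documented IDFunc: it returns its input
// unchanged. It is a stand-in for this sketch, not the library's source.
func identity[T any](input T) T {
	return input
}

// applyAll is a hypothetical helper that applies fn to every element.
// With identity as fn it simply forwards values, which is what a pipe does.
func applyAll[T, U any](in []T, fn func(T) U) []U {
	out := make([]U, 0, len(in))
	for _, v := range in {
		out = append(out, fn(v))
	}
	return out
}

func main() {
	fmt.Println(applyAll([]string{"a", "b"}, identity[string])) // [a b]
}
```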

Types

type Block added in v0.0.7

type Block struct {
	// contains filtered or unexported fields
}

Block represents a composite component made up of multiple connected primitives. A Block itself acts as a component and can be nested within other Blocks.

func NewBlock added in v0.0.7

func NewBlock(name string) *Block

NewBlock creates a new block with the given name

func (*Block) Add added in v0.0.7

func (b *Block) Add(component Component)

Add adds a component to this block

func (*Block) Count added in v0.0.7

func (b *Block) Count() int

Count returns the number of components in this block

func (*Block) IsRunning added in v0.0.7

func (b *Block) IsRunning() bool

IsRunning returns true if any component in the block is running

func (*Block) Name added in v0.0.7

func (b *Block) Name() string

Name returns the block's name

func (*Block) Stop added in v0.0.7

func (b *Block) Stop() error

Stop stops all components in this block in reverse order
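
Stopping in reverse order means the most recently added (usually most downstream) component shuts down first. A minimal sketch of that idea follows; stopper, named, and stopAll are hypothetical, and joining the errors with errors.Join is an assumption, since the documentation does not say how Block combines them.

```go
package main

import (
	"errors"
	"fmt"
)

// stopper is a hypothetical stand-in for the Stop half of the Component interface.
type stopper interface {
	Stop() error
}

// named records the order in which components are stopped.
type named struct {
	name string
	log  *[]string
}

func (n named) Stop() error {
	*n.log = append(*n.log, n.name)
	return nil
}

// stopAll stops components last-added-first and joins any errors.
func stopAll(components []stopper) error {
	var errs []error
	for i := len(components) - 1; i >= 0; i-- {
		if err := components[i].Stop(); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when no component failed
}

func main() {
	var order []string
	comps := []stopper{
		named{"reader", &order},
		named{"mapper", &order},
		named{"writer", &order},
	}
	if err := stopAll(comps); err != nil {
		fmt.Println("stop error:", err)
	}
	fmt.Println(order) // [writer mapper reader]
}
```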

type Broadcast added in v0.0.7

type Broadcast[T any] struct {
	*Block
	// contains filtered or unexported fields
}

Broadcast is an example Block implementing the broadcast pattern: one input, multiple outputs.

func NewBroadcast added in v0.0.7

func NewBroadcast[T any](name string) *Broadcast[T]

NewBroadcast creates a broadcast block using FanOut

func (*Broadcast[T]) AddOutput added in v0.0.7

func (b *Broadcast[T]) AddOutput(filter FilterFunc[T]) chan T

AddOutput adds a new output channel to the broadcast

func (*Broadcast[T]) InputChan added in v0.0.7

func (b *Broadcast[T]) InputChan() chan<- T

InputChan implements InputComponent

func (*Broadcast[T]) Send added in v0.0.7

func (b *Broadcast[T]) Send(value T)

Send implements InputComponent

type Component added in v0.0.7

type Component interface {
	// Stop stops the component and cleans up resources
	Stop() error

	// IsRunning returns true if the component is currently running
	IsRunning() bool
}

Component represents any building block that can be part of a Block. All gocurrent primitives can implement this interface.

type FanIn

type FanIn[T any] struct {
	RunnerBase[fanInCmd[T]]
	// OnChannelRemoved is called when a channel is removed so the caller can
	// perform other cleanups etc based on this
	OnChannelRemoved func(fi *FanIn[T], inchan <-chan T)
	// contains filtered or unexported fields
}

FanIn merges multiple input channels into a single output channel. It implements the fan-in concurrency pattern where messages from multiple sources are combined into one stream.

Example

// Create 2 input channels and send 3 numbers into each;
// the fan-in collects them on a single output channel
fanin := NewFanIn[int]()
defer fanin.Stop()

NUM_CHANS := 2
NUM_MSGS := 3

var inchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	inchan := make(chan int)
	inchans = append(inchans, inchan)
	fanin.Add(inchan)
}

for i := 0; i < NUM_CHANS; i++ {
	go func(inchan chan int) {
		// send some numbers into this fanin
		for j := 0; j < NUM_MSGS; j++ {
			inchan <- j
		}
	}(inchans[i])
}

// collect the fanned values
var vals []int
for i := 0; i < NUM_CHANS*NUM_MSGS; i++ {
	val := <-fanin.OutputChan()
	vals = append(vals, val)
}

// sort and print them for testing
sort.Ints(vals)

for _, v := range vals {
	fmt.Println(v)
}

Output:

0
0
1
1
2
2

func NewFanIn

func NewFanIn[T any](opts ...FanInOption[T]) *FanIn[T]

NewFanIn creates a new FanIn that merges multiple input channels with functional options. By default, creates and owns an unbuffered output channel. Use options to customize. The FanIn starts running immediately upon creation.

Examples:

// Simple usage with owned channel (backwards compatible)
fanin := NewFanIn[int]()

// With existing channel (backwards compatible)
outChan := make(chan int, 10)
fanin := NewFanIn(WithFanInOutputChan(outChan))

// With buffered output
fanin := NewFanIn[int](WithFanInOutputBuffer[int](100))

func (*FanIn[T]) Add

func (fi *FanIn[T]) Add(inputs ...<-chan T)

Add adds one or more input channels to the FanIn. Messages from these channels will be merged into the output channel. Panics if any input channel is nil.

func (*FanIn[T]) ClosedChan added in v0.0.7

func (fi *FanIn[T]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the fan-in is done

func (*FanIn[T]) Count

func (fi *FanIn[T]) Count() int

Count returns the number of input channels currently being monitored.

func (*FanIn[T]) OutputChan added in v0.0.7

func (fi *FanIn[T]) OutputChan() <-chan T

OutputChan returns the channel on which merged output can be received.

func (*FanIn[T]) Remove

func (fi *FanIn[T]) Remove(target <-chan T)

Remove removes an input channel from the FanIn's monitor list. The channel will no longer contribute to the merged output.

type FanInOption added in v0.0.8

type FanInOption[T any] func(*FanIn[T])

FanInOption is a functional option for configuring a FanIn

func WithFanInOnChannelRemoved added in v0.0.8

func WithFanInOnChannelRemoved[T any](fn func(*FanIn[T], <-chan T)) FanInOption[T]

WithFanInOnChannelRemoved sets the callback for when a channel is removed

func WithFanInOutputBuffer added in v0.0.8

func WithFanInOutputBuffer[T any](size int) FanInOption[T]

WithFanInOutputBuffer creates a buffered output channel for the FanIn

func WithFanInOutputChan added in v0.0.8

func WithFanInOutputChan[T any](ch chan T) FanInOption[T]

WithFanInOutputChan sets the output channel for the FanIn
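
The functional-options pattern used by these With... functions can be sketched as follows. The types and functions here (merger, option, withBuffer, withChan, newMerger) are hypothetical stand-ins that only mirror the shape of FanInOption; they are not the library's definitions.

```go
package main

import "fmt"

// merger is a hypothetical type standing in for a configurable component.
type merger[T any] struct {
	out chan T
}

// option has the same shape as FanInOption: a function that mutates the target.
type option[T any] func(*merger[T])

// withBuffer replaces the output channel with a buffered one.
func withBuffer[T any](size int) option[T] {
	return func(m *merger[T]) { m.out = make(chan T, size) }
}

// withChan makes the merger use a caller-supplied channel.
func withChan[T any](ch chan T) option[T] {
	return func(m *merger[T]) { m.out = ch }
}

// newMerger applies defaults first, then each option in order.
func newMerger[T any](opts ...option[T]) *merger[T] {
	m := &merger[T]{out: make(chan T)} // default: unbuffered
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	a := newMerger[int]()
	b := newMerger(withBuffer[int](100))
	fmt.Println(cap(a.out), cap(b.out)) // 0 100
}
```

Because each option is just a function value, new settings can be added later without changing the constructor's signature, which is why the zero-argument call keeps working.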

type FanOut

type FanOut[T any] struct {
	RunnerBase[fanOutCmd[T]]

	// In the default mode, the Send method simply writes to an input channel that is read
	// by the runner loop of this FanOut.  As soon as an event is read, it is by default written
	// sequentially to all output channels.  If the output channels are not being drained by the reader
	// goroutine (step 2 of the general pattern) then the Send method will block.
	// In other words, if the reader goroutine is NOT running before the Send method is invoked
	// OR if the reader goroutine is blocked for some reason, then the Send method will block.
	// To prevent this, set the async flag to true in the Send method so that writes to
	// the reader goroutines are themselves asynchronous and non-blocking.
	//
	// By setting this flag to true, writes to the output channels will happen synchronously without
	// invoking a new goroutine.  This helps reduce the number of goroutines kicked off during dispatch
	// and is an optimization if callers/owners of this FanOut want to exercise fine control over the
	// reader channels and goroutines.  For example the caller might create buffered output channels so
	// writes are not blocked, or the callers themselves may be running the readers in separate goroutines
	// to prevent any blocking behavior.
	SendSync bool
	// contains filtered or unexported fields
}

FanOut takes a message from one channel, applies a mapper function and fans it out to N output channels.

The general pattern is to:

  1. Create a FanOut[T] with the NewFanOut method
  2. Start a reader goroutine that reads values from fanout channels (note this SHOULD be started before any values are sent on the input channel)
  3. Start sending values through the input channel via the Send method.

Example

// Create a fanout with 2 output channels and check that
// numbers sent into the fanout are read from all of them
fanout := NewFanOut[int]()
defer fanout.Stop()

NUM_CHANS := 2
NUM_MSGS := 3

// Add some receiver channels
var outchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	outchan := fanout.New(nil)
	outchans = append(outchans, outchan)
}

var vals []int

for i := 0; i < NUM_MSGS; i++ {
	fanout.Send(i)
}

// wait till all fanouts have been collected
for j := 0; j < NUM_MSGS; j++ {
	for i := 0; i < NUM_CHANS; i++ {
		val := <-outchans[i]
		vals = append(vals, val)
	}
}

// sort and print them for testing
sort.Ints(vals)

for _, v := range vals {
	fmt.Println(v)
}

Output:

0
0
1
1
2
2
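
The three-step pattern (create, start readers, then send) matters most when writes to outputs are synchronous. The sketch below shows that ordering with plain unbuffered channels. It is an illustration under stated assumptions, not the FanOut implementation: broadcast and run are hypothetical, and every output write here is blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast is a hypothetical stdlib-only fan-out: it copies each input value
// to every output channel sequentially, then closes the outputs.
func broadcast[T any](in <-chan T, outs []chan T) {
	for v := range in {
		for _, out := range outs {
			out <- v // blocks until that output's reader receives
		}
	}
	for _, out := range outs {
		close(out)
	}
}

// run wires numReaders readers to a broadcast and returns per-reader sums.
func run(numReaders int, values []int) []int {
	in := make(chan int)
	outs := make([]chan int, numReaders)
	for i := range outs {
		outs[i] = make(chan int)
	}
	go broadcast(in, outs) // step 1: create the fan-out

	sums := make([]int, numReaders)
	var wg sync.WaitGroup
	for i, out := range outs { // step 2: start readers before sending
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for v := range out {
				sums[i] += v // each reader writes only its own slot
			}
		}(i, out)
	}

	for _, v := range values { // step 3: send values
		in <- v
	}
	close(in)
	wg.Wait()
	return sums
}

func main() {
	fmt.Println(run(2, []int{1, 2, 3})) // [6 6]
}
```

If step 2 were skipped in this sketch, the first send in step 3 would block forever because nothing drains the unbuffered outputs.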

func NewFanOut

func NewFanOut[T any](opts ...FanOutOption[T]) *FanOut[T]

NewFanOut creates a new FanOut that fans messages to multiple output channels with functional options. By default, creates and owns an unbuffered input channel. Use options to customize. The FanOut starts running immediately upon creation.

Examples:

// Simple usage with owned channel (backwards compatible)
fanout := NewFanOut[int]()

// With existing channel (backwards compatible)
inChan := make(chan int, 10)
fanout := NewFanOut(WithFanOutInputChan(inChan))

// With buffered input
fanout := NewFanOut[int](WithFanOutInputBuffer[int](100))

// With synchronous sends
fanout := NewFanOut[int](WithFanOutSendSync[int](true))

func (*FanOut[T]) Add

func (fo *FanOut[T]) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)

Add registers a new channel to which incoming messages will be fanned out. These output channels can be either added by the caller or created by this runner. If the output channel was passed in by the caller, it won't be closed when this runner finishes (or is stopped). A filter function can also be passed on a per-output-channel basis to either transform or filter messages specific to this output channel. For example, filters can be used to check permissions for an incoming message with respect to an output channel.

Output channels are added to our list of listeners asynchronously. The wait parameter if set to true will return a channel that can be read from to ensure that this output channel registration is synchronous.

func (*FanOut[T]) ClosedChan added in v0.0.7

func (fo *FanOut[T]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the fan-out is done

func (*FanOut[T]) Count

func (fo *FanOut[T]) Count() int

Returns the number of listening channels currently running.

func (*FanOut[T]) DebugInfo

func (fo *FanOut[T]) DebugInfo() any

func (*FanOut[T]) InputChan added in v0.0.7

func (fo *FanOut[T]) InputChan() chan<- T

InputChan returns the channel on which messages can be sent to this runner to be fanned-out.

func (*FanOut[T]) New

func (fo *FanOut[T]) New(filter FilterFunc[T]) chan T

Adds a new output channel with an optional filter function that will be managed by this runner.

func (*FanOut[T]) Remove

func (fo *FanOut[T]) Remove(output chan<- T, wait bool) (callbackChan chan error)

Removes an output channel from our list of listeners. If the channel was managed/owned by this runner then it will also be closed. Just like the Add method, Removals are asynchronous. This can be made synchronized by passing wait=true.

func (*FanOut[T]) Send

func (fo *FanOut[T]) Send(value T)

Sends a value which will be fanned out. This is a wrapper over sending messages over the input channel returned by InputChan.

type FanOutOption added in v0.0.8

type FanOutOption[T any] func(*FanOut[T])

FanOutOption is a functional option for configuring a FanOut

func WithFanOutInputBuffer added in v0.0.8

func WithFanOutInputBuffer[T any](size int) FanOutOption[T]

WithFanOutInputBuffer creates a buffered input channel for the FanOut

func WithFanOutInputChan added in v0.0.8

func WithFanOutInputChan[T any](ch chan T) FanOutOption[T]

WithFanOutInputChan sets the input channel for the FanOut

func WithFanOutSendSync added in v0.0.8

func WithFanOutSendSync[T any](sync bool) FanOutOption[T]

WithFanOutSendSync sets whether sends to output channels should be synchronous

type FilterFunc

type FilterFunc[T any] func(*T) *T

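FilterFunc lets a message be transformed (or filtered out) before it is fanned out to a listener. A FanOut delivers each message to multiple channels, and each output channel can optionally have its own FilterFunc. As in the README's FanOut example, returning nil drops the message for that listener.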
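
A per-listener filter of this shape might be applied roughly as follows. This is a sketch, not the library's dispatch code: filterFunc mirrors the FilterFunc signature, while deliver is a hypothetical helper, and copying the message before filtering is an assumption made so one listener's filter cannot affect another's.

```go
package main

import "fmt"

// filterFunc has the same shape as the documented FilterFunc[T]: return nil
// to drop a message, or a (possibly modified) pointer to deliver it.
type filterFunc[T any] func(*T) *T

// deliver applies an optional filter and reports the value to send, if any.
func deliver[T any](msg T, filter filterFunc[T]) (T, bool) {
	if filter == nil {
		return msg, true // no filter: always deliver
	}
	out := filter(&msg) // msg is a local copy of the caller's value
	if out == nil {
		var zero T
		return zero, false
	}
	return *out, true
}

func main() {
	var longOnly filterFunc[string] = func(s *string) *string {
		if len(*s) > 5 {
			return s
		}
		return nil
	}
	for _, m := range []string{"hello", "hello world"} {
		if v, ok := deliver(m, longOnly); ok {
			fmt.Println("delivered:", v)
		} else {
			fmt.Println("dropped:", m)
		}
	}
}
```

Running it prints "dropped: hello" followed by "delivered: hello world".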

type InputComponent added in v0.0.7

type InputComponent[T any] interface {
	Component

	// InputChan returns the channel for sending input to this component
	InputChan() chan<- T

	// Send is a convenience method for sending to the input channel
	Send(T)
}

InputComponent represents a component with an input channel

type Map

type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}

A synchronized map with read and update lock capabilities

func NewMap

func NewMap[K comparable, V any]() (out *Map[K, V])

Creates a new lockable map

func (*Map[K, V]) Delete

func (m *Map[K, V]) Delete(k K)

Locks the map to delete a given key.

func (*Map[K, V]) Get

func (m *Map[K, V]) Get(k K) (V, bool)

Locks the map to get the value of a given key.

func (*Map[K, V]) Has

func (m *Map[K, V]) Has(k K) bool

Locks the map to check for key membership

func (*Map[K, V]) LDelete

func (m *Map[K, V]) LDelete(k K, lock bool)

Deletes the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LGet

func (m *Map[K, V]) LGet(k K, lock bool) (V, bool)

Gets the value by a given key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LHas

func (m *Map[K, V]) LHas(k K, lock bool) bool

Check if the map contains an entry by key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LRange

func (m *Map[K, V]) LRange(lock bool, meth func(K, V) bool)

Iterates over the items in this map. Optionally obtains a read lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LSet

func (m *Map[K, V]) LSet(k K, v V, lock bool)

Sets the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) Lock

func (m *Map[K, V]) Lock()

Obtains a write lock on the map

func (*Map[K, V]) RLock

func (m *Map[K, V]) RLock()

Obtains a read lock on the map

func (*Map[K, V]) RUnlock

func (m *Map[K, V]) RUnlock()

Relinquishes a read lock on the map

func (*Map[K, V]) Range

func (m *Map[K, V]) Range(meth func(K, V) bool)

Locks the map to range over the keys/values in this map.

func (*Map[K, V]) Set

func (m *Map[K, V]) Set(k K, v V)

Locks the map to set the value of a given key.

func (*Map[K, V]) Unlock

func (m *Map[K, V]) Unlock()

Relinquishes a write lock on the map

func (*Map[K, V]) Update

func (m *Map[K, V]) Update(actions func(items map[K]V))

Obtains a write lock over the map to perform a list of update actions.

func (*Map[K, V]) View

func (m *Map[K, V]) View(actions func())

Obtains a read lock over the map to perform a list of read actions
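
The optional lock flag on the L-prefixed methods exists so that several operations can share one lock acquisition. A minimal sketch of that design, assuming a sync.RWMutex underneath (the actual fields of Map are unexported, so lockedMap and its methods are hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical, simplified map guarded by a sync.RWMutex.
type lockedMap[K comparable, V any] struct {
	mu    sync.RWMutex
	items map[K]V
}

func newLockedMap[K comparable, V any]() *lockedMap[K, V] {
	return &lockedMap[K, V]{items: make(map[K]V)}
}

// get takes the read lock only when asked to; callers already inside view
// pass false because the lock is held for the whole transaction.
func (m *lockedMap[K, V]) get(k K, lock bool) (V, bool) {
	if lock {
		m.mu.RLock()
		defer m.mu.RUnlock()
	}
	v, ok := m.items[k]
	return v, ok
}

// update runs actions under the write lock, so readers see all writes at once.
func (m *lockedMap[K, V]) update(actions func(items map[K]V)) {
	m.mu.Lock()
	defer m.mu.Unlock()
	actions(m.items)
}

// view runs actions under a single read lock.
func (m *lockedMap[K, V]) view(actions func()) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	actions()
}

func main() {
	m := newLockedMap[string, int]()
	m.update(func(items map[string]int) {
		items["a"] = 1
		items["b"] = 2
	})
	m.view(func() {
		a, _ := m.get("a", false) // lock already held by view
		b, _ := m.get("b", false)
		fmt.Println(a + b) // 3
	})
}
```

Skipping the inner lock is more than an optimization: sync.RWMutex prohibits recursive read locking, because a writer waiting between the two RLock calls can deadlock the reader.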

type Mapper

type Mapper[I any, O any] struct {
	RunnerBase[string]

	// MapFunc is applied to each value in the input channel
	// and returns a tuple of 3 things - outval, skip, stop
	// if skip is false, outval is sent to the output channel
	// if stop is true, then the entire mapper stops processing any further elements.
	// This mechanism can be used in addition to the Stop method if sequencing this
	// within the elements of input channel is required
	MapFunc func(I) (O, bool, bool)
	OnDone  func(p *Mapper[I, O])
	// contains filtered or unexported fields
}

Mapper connects an input and output channel applying transforms between them. It reads from the input channel, applies a transformation function, and writes the result to the output channel.

func Connect added in v0.0.7

func Connect[T any](from OutputComponent[T], to InputComponent[T]) *Mapper[T, T]

Connect connects the output of one component to the input of another using a Pipe. Returns the pipe so it can be managed if needed.

func ConnectWith added in v0.0.7

func ConnectWith[I, O any](from OutputComponent[I], to InputComponent[O],
	mapper func(I) (O, bool, bool)) *Mapper[I, O]

ConnectWith connects two components using a custom mapper function

func NewMapper

func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool), opts ...MapperOption[T, U]) *Mapper[T, U]

NewMapper creates a new mapper between an input and output channel with functional options. The channels are owned by the caller and not the Mapper, so they will not be closed when the mapper stops. The mapper function returns (output, skip, stop) where:

  • output: the transformed value
  • skip: if true, the output is not sent to the output channel
  • stop: if true, the mapper stops processing further elements

Examples:

// Simple usage (backwards compatible)
mapper := NewMapper(inChan, outChan, myMapperFunc)

// With OnDone callback
mapper := NewMapper(inChan, outChan, myMapperFunc,
    WithMapperOnDone(func(m *Mapper[int, string]) {
        log.Println("mapper done")
    }))
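
The (output, skip, stop) contract can be illustrated with a plain loop. This is a synchronous sketch rather than the Mapper's run loop: mapLoop is hypothetical, and sending a non-skipped output before honoring stop is an assumption, since the documentation does not specify that ordering.

```go
package main

import "fmt"

// mapLoop is a hypothetical, synchronous stand-in for a mapper's run loop.
// fn returns (output, skip, stop), matching the documented contract.
func mapLoop[I, O any](in <-chan I, out chan<- O, fn func(I) (O, bool, bool)) {
	for v := range in {
		result, skip, stop := fn(v)
		if !skip {
			out <- result
		}
		if stop { // assumption: stop is checked after the output is sent
			return
		}
	}
}

func main() {
	in := make(chan int, 5)
	out := make(chan string, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	// Skip odd numbers and stop once 4 has been processed.
	mapLoop[int, string](in, out, func(i int) (string, bool, bool) {
		return fmt.Sprintf("even:%d", i), i%2 != 0, i >= 4
	})
	close(out) // the caller owns the channels, so the caller closes them

	for s := range out {
		fmt.Println(s)
	}
}
```

The program prints even:2 and even:4; the value 5 is never processed because the loop stopped at 4.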

func NewPipe

func NewPipe[T any](input <-chan T, output chan<- T) *Mapper[T, T]

NewPipe creates a new pipe that connects an input and output channel. A pipe is a mapper with the identity function, so it simply forwards all values from input to output without transformation.

func (*Mapper[I, O]) ClosedChan added in v0.0.7

func (m *Mapper[I, O]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the mapper is done

func (*Mapper[I, O]) InputChan added in v0.0.7

func (m *Mapper[I, O]) InputChan() chan<- I

InputChan returns the input channel (exposes the private field for Component interface)

func (*Mapper[I, O]) OutputChan added in v0.0.7

func (m *Mapper[I, O]) OutputChan() <-chan O

OutputChan returns the output channel (exposes the private field for Component interface)

type MapperOption added in v0.0.8

type MapperOption[I, O any] func(*Mapper[I, O])

MapperOption is a functional option for configuring a Mapper

func WithMapperOnDone added in v0.0.8

func WithMapperOnDone[I, O any](fn func(*Mapper[I, O])) MapperOption[I, O]

WithMapperOnDone sets the callback to be called when the mapper finishes

type Merge added in v0.0.7

type Merge[T any] struct {
	*Block
	// contains filtered or unexported fields
}

Merge is an example Block implementing the merge pattern: multiple inputs, one output.

func NewMerge added in v0.0.7

func NewMerge[T any](name string) *Merge[T]

NewMerge creates a merge block using FanIn

func (*Merge[T]) AddInput added in v0.0.7

func (m *Merge[T]) AddInput(input <-chan T)

AddInput adds a new input channel to the merge

func (*Merge[T]) OutputChan added in v0.0.7

func (m *Merge[T]) OutputChan() <-chan T

OutputChan implements OutputComponent

type Message

type Message[T any] struct {
	Value  T     // The actual value being transmitted
	Error  error // Any error that occurred during processing
	Source any   // Optional source information for debugging
}

Message represents a value with optional error and source information. It's used by channels to carry both successful values and error conditions.

type OutputComponent added in v0.0.7

type OutputComponent[T any] interface {
	Component

	// OutputChan returns the channel for receiving output from this component
	OutputChan() <-chan T
}

OutputComponent represents a component with an output channel

type Pipeline added in v0.0.7

type Pipeline[T any] struct {
	*Block
	// contains filtered or unexported fields
}

Pipeline creates a linear sequence of components connected by pipes

func NewPipeline added in v0.0.7

func NewPipeline[T any](name string) *Pipeline[T]

NewPipeline creates a new pipeline block

func (*Pipeline[T]) InputChan added in v0.0.7

func (p *Pipeline[T]) InputChan() chan<- T

InputChan implements InputComponent

func (*Pipeline[T]) OutputChan added in v0.0.7

func (p *Pipeline[T]) OutputChan() <-chan T

OutputChan implements OutputComponent

func (*Pipeline[T]) Send added in v0.0.7

func (p *Pipeline[T]) Send(value T)

Send implements InputComponent

type Reader

type Reader[R any] struct {
	RunnerBase[string]

	Read ReaderFunc[R]

	OnDone func(r *Reader[R])
	// contains filtered or unexported fields
}

Reader is a typed Reader goroutine which calls a Read method to return data over a channel. It continuously calls the reader function and sends results to a channel wrapped in Message structs.

func NewReader

func NewReader[R any](read ReaderFunc[R], opts ...ReaderOption[R]) *Reader[R]

NewReader creates a new reader instance with functional options. The reader function is required as the first parameter, with optional configuration via functional options.

Examples:

// Simple usage (backwards compatible)
reader := NewReader(myReaderFunc)

// With options
reader := NewReader(myReaderFunc, WithOutputBuffer[int](10))

// With multiple options
reader := NewReader(myReaderFunc,
    WithOutputBuffer[int](100),
    WithOnDone(func(r *Reader[int]) { log.Println("done") }))

func (*Reader[R]) ClosedChan added in v0.0.2

func (rc *Reader[R]) ClosedChan() <-chan error

The channel used to signal when the reader is done

func (*Reader[R]) DebugInfo

func (r *Reader[R]) DebugInfo() any

func (*Reader[R]) OutputChan added in v0.0.7

func (rc *Reader[R]) OutputChan() <-chan Message[R]

OutputChan returns the channel on which messages can be received.

type ReaderFunc

type ReaderFunc[R any] func() (msg R, err error)

ReaderFunc is the type of the reader method used by the Reader goroutine primitive.

type ReaderOption added in v0.0.8

type ReaderOption[R any] func(*Reader[R])

ReaderOption is a functional option for configuring a Reader

func WithOnDone added in v0.0.8

func WithOnDone[R any](fn func(*Reader[R])) ReaderOption[R]

WithOnDone sets the callback to be called when the reader finishes

func WithOutputBuffer added in v0.0.8

func WithOutputBuffer[R any](size int) ReaderOption[R]

WithOutputBuffer sets the buffer size for the output channel

type Reducer

type Reducer[T any, C any, U any] struct {
	FlushPeriod time.Duration
	// CollectFunc adds an input to the collection and returns the updated collection.
	// The bool return value indicates whether a flush should be triggered immediately.
	CollectFunc func(collection C, inputs ...T) (C, bool)
	ReduceFunc  func(collectedItems C) (reducedOutputs U)
	// contains filtered or unexported fields
}

Reducer is a way to collect messages of type T in some kind of window and reduce them to type U. For example this could be used to batch messages into a list every 10 seconds. Alternatively if a time based window is not used a reduction can be invoked manually.

Example

inputChan := make(chan int)
outputChan := make(chan []int)

// Create a reducer that collects integers into slices
reducer := NewIDReducer(
	WithInputChan[int, []int, []int](inputChan),
	WithOutputChan[int, []int](outputChan),
	WithFlushPeriod[int, []int, []int](50*time.Millisecond))
defer reducer.Stop()

// Send data
go func() {
	for i := range 3 {
		inputChan <- i
	}
}()

// After FlushPeriod, receive the collected batch
batch := <-outputChan
fmt.Println(batch)
Output:

[0 1 2]
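The CollectFunc and ReduceFunc fields can also change the output type. The following standalone sketch uses plain functions with the documented field signatures (T = int, C = []int, U = int). The run helper, the maxBatch threshold, and the reset of the collection after each flush are assumptions made for illustration; they are not the library's internal loop.

```go
package main

import "fmt"

const maxBatch = 3 // hypothetical size threshold for this sketch

// collect matches the documented CollectFunc shape with T = int and C = []int.
// It appends the inputs and asks for an immediate flush once the batch is full.
func collect(collection []int, inputs ...int) ([]int, bool) {
	collection = append(collection, inputs...)
	return collection, len(collection) >= maxBatch
}

// reduce matches the documented ReduceFunc shape with C = []int and U = int.
// It folds the collected items into their sum.
func reduce(collected []int) int {
	sum := 0
	for _, v := range collected {
		sum += v
	}
	return sum
}

// run feeds inputs one at a time and reduces whenever collect requests a flush.
// Items still pending at the end stay unreduced, as they would until the next flush.
func run(inputs []int) (outputs []int, pending []int) {
	for _, in := range inputs {
		var flush bool
		pending, flush = collect(pending, in)
		if flush {
			outputs = append(outputs, reduce(pending))
			pending = nil
		}
	}
	return outputs, pending
}

func main() {
	outputs, pending := run([]int{1, 2, 3, 4, 5})
	fmt.Println(outputs, pending) // [6] [4 5]
}
```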

func NewReducer

func NewReducer[T any, C any, U any](opts ...ReducerOption[T, C, U]) *Reducer[T, C, U]

NewReducer creates a reducer over generic input and output types. Options can be provided to configure the input channel, output channel, flush period, etc. If channels are not provided via options, the reducer will create and own them. Just like other runners, the Reducer starts as soon as it is created.

func (*Reducer[T, C, U]) ClosedChan added in v0.0.7

func (r *Reducer[T, C, U]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the reducer is done

func (*Reducer[T, C, U]) Flush

func (fo *Reducer[T, C, U]) Flush()

Flush immediately processes all pending events and sends the result to the output channel.
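One way to picture how a periodic flush and a manual flush can coexist is a single select loop that owns the pending collection. The batcher type below is a hypothetical, stripped-down stand-in written with the standard library only; it is a sketch of the general technique and makes no claim about how Reducer is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// batcher is a hypothetical stand-in for a windowed collector.
type batcher struct {
	in    chan int
	out   chan []int
	flush chan struct{}
	stop  chan struct{}
}

func newBatcher(period time.Duration) *batcher {
	b := &batcher{
		in:    make(chan int),
		out:   make(chan []int),
		flush: make(chan struct{}),
		stop:  make(chan struct{}),
	}
	go b.loop(period)
	return b
}

// loop is the only goroutine touching pending, so no locking is needed.
func (b *batcher) loop(period time.Duration) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	var pending []int
	emit := func() {
		if len(pending) > 0 { // nothing pending means nothing is emitted
			b.out <- pending
			pending = nil
		}
	}
	for {
		select {
		case v := <-b.in:
			pending = append(pending, v)
		case <-ticker.C: // periodic flush
			emit()
		case <-b.flush: // manual flush
			emit()
		case <-b.stop:
			return
		}
	}
}

// Flush asks the loop to emit whatever is pending right now.
func (b *batcher) Flush() { b.flush <- struct{}{} }

// Stop ends the loop goroutine.
func (b *batcher) Stop() { close(b.stop) }

// demo sends three values and flushes manually; the period is long enough
// that the ticker never fires during the run.
func demo() []int {
	b := newBatcher(time.Hour)
	defer b.Stop()
	for i := 0; i < 3; i++ {
		b.in <- i
	}
	b.Flush()
	return <-b.out
}

func main() {
	fmt.Println(demo()) // [0 1 2]
}
```

Because the input channel is unbuffered, each send returns only after the loop has received the value, which is what makes the manual flush in demo deterministic.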

func (*Reducer[T, C, U]) InputChan added in v0.0.7

func (fo *Reducer[T, C, U]) InputChan() chan<- T

InputChan returns the channel onto which messages can be sent (to be reduced).

func (*Reducer[T, C, U]) IsRunning added in v0.0.7

func (r *Reducer[T, C, U]) IsRunning() bool

IsRunning returns true if the reducer is still running

func (*Reducer[T, C, U]) OutputChan added in v0.0.7

func (fo *Reducer[T, C, U]) OutputChan() <-chan U

OutputChan returns the channel from which "reduced" values can be read.

func (*Reducer[T, C, U]) Send

func (fo *Reducer[T, C, U]) Send(value T)

Send sends a message/value onto this reducer for (eventual) reduction.

func (*Reducer[T, C, U]) Stop

func (fo *Reducer[T, C, U]) Stop()

Stop stops the reducer and closes all channels it owns.

type Reducer2 added in v0.0.8

type Reducer2[T any, C any] = Reducer[T, C, C]

Reducer2 is a simplified 2-parameter version of Reducer where the collection type C is the same as the output type (U == C). This is the most common use case.

func NewIDReducer

func NewIDReducer[T any](opts ...ReducerOption2[T, []T]) *Reducer2[T, []T]

NewIDReducer creates a Reducer2 that simply collects events of type T into a list (of type []T).

func NewListReducer added in v0.0.5

func NewListReducer[T any](opts ...ReducerOption2[[]T, []T]) *Reducer2[[]T, []T]

NewListReducer creates a reducer that collects lists of items and concatenates them into a single collection. This allows producers to send events in batches instead of one at a time.

func NewReducer2 added in v0.0.8

func NewReducer2[T any, C any](opts ...ReducerOption2[T, C]) *Reducer2[T, C]

NewReducer2 creates a 2-parameter reducer where collection type equals output type. This is a simpler API for the common case where no type transformation is needed.

type ReducerOption added in v0.0.5

type ReducerOption[T any, C any, U any] func(*Reducer[T, C, U])

ReducerOption is a functional option for configuring a Reducer

func WithFlushPeriod added in v0.0.5

func WithFlushPeriod[T any, C any, U any](period time.Duration) ReducerOption[T, C, U]

WithFlushPeriod sets the flush period for the reducer

func WithInputChan added in v0.0.5

func WithInputChan[T any, C any, U any](ch chan T) ReducerOption[T, C, U]

WithInputChan sets the input channel for the reducer

func WithOutputChan added in v0.0.5

func WithOutputChan[T any, C any, U any](ch chan U) ReducerOption[T, C, U]

WithOutputChan sets the output channel for the reducer

type ReducerOption2 added in v0.0.8

type ReducerOption2[T any, C any] = ReducerOption[T, C, C]

ReducerOption2 is a functional option for configuring a Reducer2

func WithFlushPeriod2 added in v0.0.8

func WithFlushPeriod2[T any, C any](period time.Duration) ReducerOption2[T, C]

WithFlushPeriod2 sets the flush period for a Reducer2

func WithInputChan2 added in v0.0.8

func WithInputChan2[T any, C any](ch chan T) ReducerOption2[T, C]

WithInputChan2 sets the input channel for a Reducer2

func WithOutputChan2 added in v0.0.8

func WithOutputChan2[T any, C any](ch chan C) ReducerOption2[T, C]

WithOutputChan2 sets the output channel for a Reducer2

type RunnerBase

type RunnerBase[C any] struct {
	// contains filtered or unexported fields
}

RunnerBase is the base of the Reader and Writer primitives.

func NewRunnerBase

func NewRunnerBase[C any](stopVal C) RunnerBase[C]

NewRunnerBase creates a new base runner. It is called by the Reader and Writer primitives.

func (*RunnerBase[R]) DebugInfo

func (r *RunnerBase[R]) DebugInfo() any

Used for returning any debug information.

func (*RunnerBase[C]) IsRunning

func (r *RunnerBase[C]) IsRunning() bool

IsRunning returns true if the runner is currently running, and false otherwise.

func (*RunnerBase[C]) Stop

func (r *RunnerBase[C]) Stop() error

Stop is called to stop the runner. It is up to the embedding types to listen for messages on the control channel and initiate the wind-down and cleanup process.
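The division of labor described here (Stop only requests shutdown, the goroutine performs it) can be sketched with a control channel and a closed channel. The worker type and its fields are hypothetical and use only the standard library; the string control value is an assumption loosely modeled on the RunnerBase[string] embedding shown for Writer.

```go
package main

import "fmt"

// worker is a hypothetical runner: a control channel carries the stop request
// and a closed channel reports that the goroutine has ended.
type worker struct {
	control chan string
	closed  chan error
	work    chan int
	handled int
}

func startWorker() *worker {
	w := &worker{
		control: make(chan string),
		closed:  make(chan error),
		work:    make(chan int),
	}
	go w.run()
	return w
}

// run listens for both work and control messages.
func (w *worker) run() {
	// Cleanup happens in one place, whatever caused the exit.
	defer close(w.closed)
	for {
		select {
		case <-w.work:
			w.handled++
		case cmd := <-w.control:
			if cmd == "stop" {
				return
			}
		}
	}
}

// Stop only requests shutdown; the goroutine itself performs the wind-down.
func (w *worker) Stop() { w.control <- "stop" }

// demo handles two work items, stops the worker, and waits for it to finish.
func demo() (int, error) {
	w := startWorker()
	w.work <- 1
	w.work <- 2
	w.Stop()
	err := <-w.closed // channel closed without a value: receive yields nil
	return w.handled, err
}

func main() {
	fmt.Println(demo()) // 2 <nil>
}
```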

type Writer

type Writer[W any] struct {
	RunnerBase[string]

	Write WriterFunc[W]
	// contains filtered or unexported fields
}

Writer is a typed goroutine primitive that serializes concurrent writes through a single goroutine, calling the Write function for each one.
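The benefit of funneling writes through one goroutine is that the write function never runs concurrently with itself, so it can touch unsynchronized state. The sketch below shows that idea with the standard library only. The serialize and fanIn helpers are hypothetical, and ignoring write errors is a simplification of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// serialize starts one goroutine that applies write to every value received
// on the returned channel. done is closed once the input channel has been
// closed and drained.
func serialize[W any](write func(W) error) (in chan<- W, done <-chan struct{}) {
	ch := make(chan W)
	fin := make(chan struct{})
	go func() {
		defer close(fin)
		for v := range ch {
			_ = write(v) // errors are ignored in this sketch
		}
	}()
	return ch, fin
}

// fanIn runs several concurrent producers against a single writer goroutine
// and returns how many writes that goroutine performed.
func fanIn(producers, perProducer int) int {
	count := 0 // touched only by the writer goroutine until done is closed
	in, done := serialize(func(int) error {
		count++ // safe without a mutex: writes are serialized
		return nil
	})
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				in <- i
			}
		}()
	}
	wg.Wait()
	close(in)
	<-done
	return count
}

func main() {
	fmt.Println(fanIn(4, 25)) // 100
}
```

The write callback here has the same signature as WriterFunc[int].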

func NewWriter

func NewWriter[W any](write WriterFunc[W], opts ...WriterOption[W]) *Writer[W]

NewWriter creates a new writer instance with functional options. The writer function is required as the first parameter, with optional configuration via functional options.

Examples:

// Simple usage (backwards compatible)
writer := NewWriter(myWriterFunc)

// With buffered input
writer := NewWriter(myWriterFunc, WithInputBuffer[int](100))

func (*Writer[W]) ClosedChan added in v0.0.2

func (wc *Writer[W]) ClosedChan() <-chan error

ClosedChan returns the channel used to signal when the writer is done

func (*Writer[W]) DebugInfo

func (w *Writer[W]) DebugInfo() any

func (*Writer[W]) InputChan added in v0.0.7

func (wc *Writer[W]) InputChan() chan<- W

InputChan returns the channel on which messages can be sent to the Writer.

func (*Writer[W]) Send

func (wc *Writer[W]) Send(req W) bool

Send sends a message to the Writer. This is a shortcut for sending a message to the underlying channel.

type WriterFunc

type WriterFunc[W any] func(W) error

WriterFunc is the type of the writer method used by the writer goroutine primitive to serialize its writes.

type WriterOption added in v0.0.8

type WriterOption[W any] func(*Writer[W])

WriterOption is a functional option for configuring a Writer

func WithInputBuffer added in v0.0.8

func WithInputBuffer[W any](size int) WriterOption[W]

WithInputBuffer sets the buffer size for the input channel
