rivo

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: MIT Imports: 6 Imported by: 0

README

rivo

Go Reference

rivo is a library for highly concurrent Go programs that provides type safety through generics and a composable workers architecture.

NOTE: THIS LIBRARY IS STILL IN ACTIVE DEVELOPMENT AND IS NOT YET PRODUCTION READY.

About

rivo has two major inspirations:

  1. The book "Concurrency in Go";
  2. ReactiveX, in particular the Go and JS libraries;

Compared to these sources, rivo aims to provide better type safety (both "Concurrency in Go" and RxGo were written in a pre-generics era and make heavy use of interface{}) and a more intuitive API and developer experience (Rx is very powerful, but can be overwhelming for newcomers).

Getting started

Prerequisites

rivo requires Go 1.23 or later.

Installation
  go get github.com/agiac/rivo
Basic concepts

rivo is built around its main Worker type, which has the following signature:

type Worker[T, U any] = func(ctx context.Context, in <-chan T, errs chan<- error) <-chan U

This is the result of various iterations and refinements and I believe you could go a long way with it, even without using the rest of the library.

  • The first argument is a context.Context, which allows for cancellation and timeouts, as well as passing values down the call chain, if needed.
  • The second argument is a read-only channel of type T, which represents the input stream of data.
  • The third argument is a write-only channel for errors. Following Go's focus on explicit error handling, this channel allows workers to report errors without stopping the entire stream.
  • The return value is a read-only channel of type U, which represents the output stream of data.

This structure allows for a clear flow of data, as well as composability of workers to create complex data processing pipelines, while maintaining type safety and explicit error handling.

For convenience, rivo also provides type aliases for common worker patterns:

Generator is a worker that generates items of type T without any input:

type Generator[T any] = Worker[None, T]

Sync is a worker that processes items of type T and does not emit any items, except after closing the output channel to signal completion:

type Sync[T any] = Worker[T, None]

Here's a basic example:

    package main
    
    import (
      "context"
      "fmt"
    
      "github.com/agiac/rivo"
    )
    
    // This example demonstrates a basic usage of workers and the Pipe function.
    // We create a channel of integers and filter only the even ones.
    
    func main() {
      ctx := context.Background()
    
      // `Of` returns a generator which returns a channel that will emit the provided values
      in := rivo.Of(1, 2, 3, 4, 5)
    
      // `Filter` returns a worker that filters the input channel using the given function.
      onlyEven := rivo.Filter(func(ctx context.Context, n int) (bool, error) {
        return n%2 == 0, nil
      })
    
      // `Do` returns a worker that applies the given function to each item in the input channel, without emitting any values.
      log := rivo.Do(func(ctx context.Context, n int) error {
        fmt.Println(n)
        return nil
      })
    
      // `Pipe` composes workers together, returning a new worker
      p := rivo.Pipe3(in, onlyEven, log)
    
      // By passing a context and an input channel to our worker, we can get the output channel.
      // Since our first worker `in` is a generator and does not depend on an input channel, we can pass a nil channel.
      // Also, since log is a sink, we only have to read once from the output channel to know that the pipe has finished.
      <-p(ctx, nil, nil)
    
      // Expected output:
      // 2
      // 4
    }

rivo provides a set of utilities which can be divided in three main categories:

  1. Worker factories: functions that return workers for common use cases, like mapping, filtering, batching, etc.
  2. Flow control: functions that help with composing workers together, like Pipe, Merge, etc.
  3. Utilities: functions that help with common tasks, like collecting items from a channel, error handling, etc.

Worker factories

Generators
  • Of: returns a generator that emits the provided values;
  • FromFunc: returns a generator that emits values returned by the provided function until the function returns false;
  • FromSeq and FromSeq2: return generators that emit the values from the provided iterators;
Sinks
  • Do: returns a sink worker that performs a side effect for each item in the input stream;
Transformers
  • Filter: returns a worker that filters the input stream using the given function;
  • Map: returns a worker that applies a function to each item from the input stream;
  • FilterMap: returns a worker that filters and maps items from the input stream in a single operation;
  • Batch: returns a worker that groups the input stream into batches of the provided size;
  • Flatten: returns a worker that flattens the input stream of slices;
  • ForEachOutput: returns a worker that applies a function to each item, allowing direct output channel access;

Besides these, the library's subdirectories contain more specialized worker factories.

Package rivo/io
  • FromReader: returns a generator worker that reads from the provided io.Reader and emits the read bytes;
  • ToWriter: returns a sink worker that writes the input stream to the provided io.Writer;
Package rivo/bufio
  • FromScanner: returns a generator worker that reads from the provided bufio.Scanner and emits the scanned items;
  • ToWriter: returns a sink worker that writes the input stream to the provided bufio.Writer;
Package rivo/csv
  • FromReader: returns a generator worker that reads from the provided csv.Reader and emits the read records;
  • ToWriter: returns a sink worker that writes the input stream to the provided csv.Writer;
Configuration Options

Many workers support configuration options to customize their behavior:

  • Pool Size: Control the number of concurrent goroutines (e.g., MapPoolSize, FilterPoolSize, DoPoolSize)
  • Buffer Size: Control the internal channel buffer size (e.g., MapBufferSize, BatchBufferSize)
  • Time-based Options: Control time-based behavior (e.g., BatchMaxWait)
  • Lifecycle Hooks: Add hooks for cleanup or finalization (e.g., FromFuncOnBeforeClose)

Example usage:

// Map with custom pool size and buffer size
mapper := rivo.Map(transformFunc, rivo.MapPoolSize(5), rivo.MapBufferSize(100))

// Batch with time-based batching
batcher := rivo.Batch(10, rivo.BatchMaxWait(100*time.Millisecond))

Flow control

rivo provides functions to compose workers together, allowing you to build complex data processing pipelines:

  • Pipe, Pipe2, Pipe3, ... Pipe10: compose multiple workers together into a single worker;

Utilities

rivo provides several utility functions to work with streams:

  • Collect: collects all items from a stream into a slice
  • CollectWithContext: like Collect but respects context cancellation
  • OrDone: utility function that propagates context cancellation to streams
  • Merge: merges multiple streams into a single stream

Examples

More examples can be found in the examples folder.


Contributing

Contributions are welcome! If you have any ideas, suggestions or bug reports, please open an issue or a pull request.

Roadmap

  • Add more workers, also using the RxJS list of operators as a reference:
    • FilterMap (combines filter and map operations)
    • ForEachOutput (direct output channel access)
    • Tap (side effects without modifying the stream)
    • Time-based operators (throttle, debounce, etc.)
    • SQL-like operators (join, group by, etc.)
  • Add more utilities:
    • Merge (combine multiple streams)
    • Zip (combine streams element-wise)
    • Take/Skip operators
  • Performance optimizations and benchmarking
  • Add more examples and tutorials

License

rivo is licensed under the MIT license. See the LICENSE file for details.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[T any](in <-chan T) []T

Collect collects all items from the channel and returns them as a slice.

func CollectWithContext

func CollectWithContext[T any](ctx context.Context, in <-chan T) []T

CollectWithContext collects all items from the channel and returns them as a slice. If the context is cancelled, it stops collecting items.

func Merge added in v0.6.0

func Merge[T any](ctx context.Context, channels ...<-chan T) <-chan T

Merge merges multiple input channels into a single output channel. It stops merging when the context is cancelled or all input channels are closed.

func OrDone

func OrDone[T any](ctx context.Context, in <-chan T) <-chan T

OrDone is a utility function that returns a channel that will be closed when the context is done.

func RunErrorSync added in v0.6.0

func RunErrorSync(ctx context.Context, fn func(ctx context.Context, errs <-chan error)) (chan<- error, func())

RunErrorSync creates and starts a new error-handling goroutine. It returns a channel to send errors to and a function to call to wait for the handler to finish. The returned wait function will close the error channel and wait for the handler goroutine to complete.

func RunErrorSyncFunc added in v0.6.0

func RunErrorSyncFunc(ctx context.Context, fn func(ctx context.Context, err error)) (chan<- error, func())

RunErrorSyncFunc is similar to RunErrorSync but takes a function that handles one error at a time. It returns a channel to send errors to and a function to call to wait for the handler to finish. The returned wait function will close the error channel and wait for the handler goroutine to complete.

func Tee

func Tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T)

Tee returns 2 channels that each receive a copy of each item from the input channel.

func TeeN

func TeeN[T any](ctx context.Context, in <-chan T, n int) []<-chan T

TeeN returns n channels that each receive a copy of each item from the input channel.

Types

type BatchOption added in v0.4.0

type BatchOption func(*batchOptions) error

func BatchBufferSize added in v0.4.0

func BatchBufferSize(n int) BatchOption

func BatchMaxWait added in v0.4.0

func BatchMaxWait(d time.Duration) BatchOption

type DoOption added in v0.4.0

type DoOption func(*doOptions) error

func DoOnBeforeClose added in v0.5.0

func DoOnBeforeClose(fn func(context.Context)) DoOption

func DoPoolSize added in v0.4.0

func DoPoolSize(n int) DoOption

type FilterMapOption added in v0.5.0

type FilterMapOption func(*filterMapOptions) error

func FilterMapBufferSize added in v0.5.0

func FilterMapBufferSize(n int) FilterMapOption

func FilterMapPoolSize added in v0.5.0

func FilterMapPoolSize(n int) FilterMapOption

type FilterOption added in v0.4.0

type FilterOption func(*filterOptions) error

func FilterBufferSize added in v0.4.0

func FilterBufferSize(n int) FilterOption

func FilterPoolSize added in v0.4.0

func FilterPoolSize(n int) FilterOption

type ForEachOutputOption added in v0.5.0

type ForEachOutputOption func(*forEachOutputOptions) error

func ForEachOutputBufferSize added in v0.5.0

func ForEachOutputBufferSize(bufferSize int) ForEachOutputOption

func ForEachOutputOnBeforeClose added in v0.5.0

func ForEachOutputOnBeforeClose(f func(context.Context)) ForEachOutputOption

func ForEachOutputPoolSize added in v0.5.0

func ForEachOutputPoolSize(poolSize int) ForEachOutputOption

type FromFuncOption added in v0.4.0

type FromFuncOption func(*fromFuncOptions) error

func FromFuncBufferSize added in v0.4.0

func FromFuncBufferSize(bufferSize int) FromFuncOption

func FromFuncOnBeforeClose added in v0.4.0

func FromFuncOnBeforeClose(f func(context.Context)) FromFuncOption

func FromFuncPoolSize added in v0.4.0

func FromFuncPoolSize(poolSize int) FromFuncOption

type FromSeq2Value

type FromSeq2Value[T, U any] struct {
	Val1 T
	Val2 U
}

type Generator

type Generator[T any] = Worker[None, T]

Generator is a worker that generates items of type T without any input.

func Fanin added in v0.6.0

func Fanin[T any](gg ...Generator[T]) Generator[T]

Fanin combines multiple Generator workers into a single Generator. It takes a variable number of Generator workers and returns a new Generator that merges the outputs of all the provided generators into a single output channel.

Example
ctx := context.Background()

g1 := Of("Hello", "World")
g2 := Of("Foo", "Bar")

gg := Fanin(g1, g2)

res := Collect(gg(ctx, nil, nil))

slices.Sort(res)

fmt.Println(res)
Output:

[Bar Foo Hello World]

func FromFunc

func FromFunc[T any](f func(context.Context) (T, bool, error), options ...FromFuncOption) Generator[T]

FromFunc returns a Generator that emits items generated by the given function. The returned channel will emit items until the function returns false in the second return value.

Example
ctx := context.Background()

count := atomic.Int32{}

genFn := func(ctx context.Context) (int32, bool, error) {
	value := count.Add(1)

	if value > 5 {
		return 0, false, nil
	}

	return value, true, nil
}

in := FromFunc(genFn)

s := in(ctx, nil, nil)

for item := range s {
	fmt.Println(item)
}
Output:

1
2
3
4
5

func FromSeq

func FromSeq[T any](seq iter.Seq[T]) Generator[T]
Example
ctx := context.Background()

seq := slices.Values([]int{1, 2, 3, 4, 5})
in := FromSeq(seq)

s := in(ctx, nil, nil)

for item := range s {
	fmt.Println(item)
}
Output:

1
2
3
4
5

func FromSeq2

func FromSeq2[T, U any](seq iter.Seq2[T, U]) Generator[FromSeq2Value[T, U]]
Example
ctx := context.Background()

seq := slices.All([]string{"a", "b", "c", "d", "e"})

in := FromSeq2(seq)

s := in(ctx, nil, nil)

for item := range s {
	fmt.Printf("%d, %s\n", item.Val1, item.Val2)
}
Output:

0, a
1, b
2, c
3, d
4, e

func Of

func Of[T any](items ...T) Generator[T]

Of returns a Generator that emits the given items.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

s := in(ctx, nil, nil)

for item := range s {
	fmt.Println(item)
}
Output:

1
2
3
4
5

type MapOption added in v0.4.0

type MapOption func(*mapOptions) error

func MapBufferSize added in v0.4.0

func MapBufferSize(bufferSize int) MapOption

func MapPoolSize added in v0.4.0

func MapPoolSize(poolSize int) MapOption

type None

type None struct{}

None is a type that represents no value. It is typically used as the input type of generator worker that does not depend on any input channel or for a sync worker that does not emit any items.

type Sync

type Sync[T any] = Worker[T, None]

Sync is a worker that processes items of type T and does not emit any items.

func Connect

func Connect[T any](pp ...Sync[T]) Sync[T]

Connect connects multiple Sync workers in parallel. It takes a variable number of Sync workers and returns a new Sync worker that runs all the provided workers concurrently on the same input channel. The output channel of the returned Sync worker will be closed once all the connected workers have completed their processing.

Example
ctx := context.Background()

g := Of("Hello", "Hello", "Hello")

capitalize := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToUpper(i), nil
})

lowercase := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToLower(i), nil
})

resA := make([]string, 0)
a := Do(func(ctx context.Context, i string) error {
	resA = append(resA, i)
	return nil
})

resB := make([]string, 0)
b := Do(func(ctx context.Context, i string) error {
	resB = append(resB, i)
	return nil
})

p1 := Pipe(capitalize, a)
p2 := Pipe(lowercase, b)

<-Pipe(g, Connect(p1, p2))(ctx, nil, nil)

for _, s := range resA {
	fmt.Println(s)
}

for _, s := range resB {
	fmt.Println(s)
}
Output:

HELLO
HELLO
HELLO
hello
hello
hello

func Do

func Do[T any](f func(context.Context, T) error, opt ...DoOption) Sync[T]

Do returns a sync worker that applies the given function to each item in the channel. The output channel will not emit any items, and it will be closed when the input channel is closed or the context is done. If the function returns an error, it will be sent to the error channel.

Example
ctx := context.Background()

g := Of(1, 2, 3, 4, 5)

d := Do(func(ctx context.Context, i int) error {
	fmt.Println(i)
	return nil
})

<-Pipe(g, d)(ctx, nil, nil)
Output:

1
2
3
4
5

func Fanout added in v0.6.0

func Fanout[T, U any](g Worker[T, U], ss ...Sync[U]) Sync[T]

Fanout connects a Worker to multiple Sync workers. It takes a Worker and a variable number of Sync workers, and returns a new Sync worker that first processes input items using the provided Worker, and then fans out the output to all the provided Sync workers for further processing.

Example
ctx := context.Background()

g := Of("Hello", "Hello", "Hello")

capitalize := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToUpper(i), nil
})

lowercase := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToLower(i), nil
})

resA := make([]string, 0)
a := Do(func(ctx context.Context, i string) error {
	resA = append(resA, i)
	return nil
})

resB := make([]string, 0)
b := Do(func(ctx context.Context, i string) error {
	resB = append(resB, i)
	return nil
})

p1 := Pipe(capitalize, a)
p2 := Pipe(lowercase, b)

<-Fanout(g, p1, p2)(ctx, nil, nil)

for _, s := range resA {
	fmt.Println(s)
}

for _, s := range resB {
	fmt.Println(s)
}
Output:

HELLO
HELLO
HELLO
hello
hello
hello

type Worker added in v0.6.0

type Worker[T, U any] = func(ctx context.Context, in <-chan T, errs chan<- error) <-chan U

Worker is the core abstraction representing a processing unit that takes input of type T from a channel, processes it, and emits output of type U to another channel. It also receives a context for cancellation and a channel for reporting errors.

func Batch added in v0.1.0

func Batch[T any](n int, opt ...BatchOption) Worker[T, []T]

Batch returns a Worker that batches items from the input channel into slices of n items. If the batch is not full after maxWait, it will be sent anyway.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

b := Batch[int](2)

p := Pipe(in, b)

for item := range p(ctx, nil, nil) {
	fmt.Printf("%v\n", item)
}
Output:

[1 2]
[3 4]
[5]

func Filter

func Filter[T any](f func(context.Context, T) (bool, error), opt ...FilterOption) Worker[T, T]

Filter returns a worker that filters the input channel using the given function.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

even := Filter(func(ctx context.Context, n int) (bool, error) {
	return n%2 == 0, nil
})

p := Pipe(in, even)

s := p(ctx, nil, nil)

for item := range s {
	fmt.Println(item)
}
Output:

2
4

func FilterMap added in v0.5.0

func FilterMap[T, U any](f func(context.Context, T) (bool, U, error), opt ...FilterMapOption) Worker[T, U]

FilterMap returns a worker that filters and maps items from the input channel.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

// Filter even numbers and multiply by 10
filterMapEvenAndMultiply := FilterMap(func(ctx context.Context, n int) (bool, int, error) {
	if n%2 == 0 {
		return true, n * 10, nil
	}
	return false, 0, nil
})

p := Pipe(in, filterMapEvenAndMultiply)

s := p(ctx, nil, nil)

for item := range s {
	fmt.Println(item)
}
Output:

20
40

func Flatten added in v0.2.0

func Flatten[T any]() Worker[[]T, T]

Flatten returns a Worker that flattens a channel of slices into a channel of individual items.

Example
ctx := context.Background()

in := Of([]int{1, 2}, []int{3, 4}, []int{5})

f := Flatten[int]()

p := Pipe(in, f)

for item := range p(ctx, nil, nil) {
	fmt.Printf("%v\n", item)
}
Output:

1
2
3
4
5

func ForEachOutput added in v0.5.0

func ForEachOutput[T, U any](f func(ctx context.Context, val T, out chan<- U, errs chan<- error), opt ...ForEachOutputOption) Worker[T, U]

ForEachOutput returns a worker that applies a function to each item from the input channel. The function can write directly to the output channel. The output channel should not be closed by the function, since the output channel will be closed when the input channel is closed or the context is done. ForEachOutput panics if invalid options are provided.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

f := func(ctx context.Context, n int, out chan<- int, errs chan<- error) {
	out <- n * 2
}

p := Pipe(in, ForEachOutput(f))

s := p(ctx, nil, nil)

for n := range s {
	fmt.Println(n)
}
Output:

2
4
6
8
10

func Map

func Map[T, U any](f func(context.Context, T) (U, error), opt ...MapOption) Worker[T, U]

Map returns a worker that applies a function to each item from the input channel.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

double := Map(func(ctx context.Context, n int) (int, error) {
	return n * 2, nil
})

p := Pipe(in, double)

s := p(ctx, nil, nil)

for n := range s {
	fmt.Println(n)
}
Output:

2
4
6
8
10

func Pipe

func Pipe[A, B, C any](a Worker[A, B], b Worker[B, C]) Worker[A, C]

Pipe pipes two workers together. It is a convenience function that calls Pipe2.

Example
ctx := context.Background()

a := Of(1, 2, 3, 4, 5)

p := Pipe(a, Map(func(ctx context.Context, n int) (int, error) {
	return n + 1, nil
}))

s := p(ctx, nil, nil)

for item := range s {
	fmt.Println(item)
}
Output:

2
3
4
5
6

func Pipe10 added in v0.6.0

func Pipe10[A, B, C, D, E, F, G, H, I, J, K any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G], g Worker[G, H], h Worker[H, I], i Worker[I, J], j Worker[J, K]) Worker[A, K]

Pipe10 pipes ten workers together.

func Pipe2

func Pipe2[A, B, C any](a Worker[A, B], b Worker[B, C]) Worker[A, C]

Pipe2 pipes two workers together.

func Pipe3

func Pipe3[A, B, C, D any](a Worker[A, B], b Worker[B, C], c Worker[C, D]) Worker[A, D]

Pipe3 pipes three workers together.

func Pipe4

func Pipe4[A, B, C, D, E any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E]) Worker[A, E]

Pipe4 pipes four workers together.

func Pipe5

func Pipe5[A, B, C, D, E, F any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F]) Worker[A, F]

Pipe5 pipes five workers together.

func Pipe6 added in v0.6.0

func Pipe6[A, B, C, D, E, F, G any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G]) Worker[A, G]

Pipe6 pipes six workers together.

func Pipe7 added in v0.6.0

func Pipe7[A, B, C, D, E, F, G, H any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G], g Worker[G, H]) Worker[A, H]

Pipe7 pipes seven workers together.

func Pipe8 added in v0.6.0

func Pipe8[A, B, C, D, E, F, G, H, I any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G], g Worker[G, H], h Worker[H, I]) Worker[A, I]

Pipe8 pipes eight workers together.

func Pipe9 added in v0.6.0

func Pipe9[A, B, C, D, E, F, G, H, I, J any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G], g Worker[G, H], h Worker[H, I], i Worker[I, J]) Worker[A, J]

Pipe9 pipes nine workers together.

Directories

Path Synopsis
examples
basic command
errorHandling command
options command
readCSV command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL