Documentation
Index ¶
- func Collect[T any](in <-chan T) []T
- func CollectWithContext[T any](ctx context.Context, in <-chan T) []T
- func Merge[T any](ctx context.Context, channels ...<-chan T) <-chan T
- func OrDone[T any](ctx context.Context, in <-chan T) <-chan T
- func RunErrorSync(ctx context.Context, fn func(ctx context.Context, errs <-chan error)) (chan<- error, func())
- func RunErrorSyncFunc(ctx context.Context, fn func(ctx context.Context, err error)) (chan<- error, func())
- func Tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T)
- func TeeN[T any](ctx context.Context, in <-chan T, n int) []<-chan T
- type BatchOption
- type DoOption
- type FilterMapOption
- type FilterOption
- type ForEachOutputOption
- type FromFuncOption
- type FromSeq2Value
- type Generator
- func Fanin[T any](gg ...Generator[T]) Generator[T]
- func FromFunc[T any](f func(context.Context) (T, bool, error), options ...FromFuncOption) Generator[T]
- func FromSeq[T any](seq iter.Seq[T]) Generator[T]
- func FromSeq2[T, U any](seq iter.Seq2[T, U]) Generator[FromSeq2Value[T, U]]
- func Of[T any](items ...T) Generator[T]
- type MapOption
- type None
- type Sync
- type Worker
- func Batch[T any](n int, opt ...BatchOption) Worker[T, []T]
- func Filter[T any](f func(context.Context, T) (bool, error), opt ...FilterOption) Worker[T, T]
- func FilterMap[T, U any](f func(context.Context, T) (bool, U, error), opt ...FilterMapOption) Worker[T, U]
- func Flatten[T any]() Worker[[]T, T]
- func ForEachOutput[T, U any](f func(ctx context.Context, val T, out chan<- U, errs chan<- error), ...) Worker[T, U]
- func Map[T, U any](f func(context.Context, T) (U, error), opt ...MapOption) Worker[T, U]
- func Pipe[A, B, C any](a Worker[A, B], b Worker[B, C]) Worker[A, C]
- func Pipe10[A, B, C, D, E, F, G, H, I, J, K any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], ...) Worker[A, K]
- func Pipe2[A, B, C any](a Worker[A, B], b Worker[B, C]) Worker[A, C]
- func Pipe3[A, B, C, D any](a Worker[A, B], b Worker[B, C], c Worker[C, D]) Worker[A, D]
- func Pipe4[A, B, C, D, E any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E]) Worker[A, E]
- func Pipe5[A, B, C, D, E, F any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F]) Worker[A, F]
- func Pipe6[A, B, C, D, E, F, G any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], ...) Worker[A, G]
- func Pipe7[A, B, C, D, E, F, G, H any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], ...) Worker[A, H]
- func Pipe8[A, B, C, D, E, F, G, H, I any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], ...) Worker[A, I]
- func Pipe9[A, B, C, D, E, F, G, H, I, J any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], ...) Worker[A, J]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Collect ¶
func Collect[T any](in <-chan T) []T
Collect collects all items from the channel and returns them as a slice.
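The behavior described for Collect can be illustrated with a few lines of plain Go. This is only a sketch, not the library's implementation: `collect` is a hypothetical stand-in written for illustration. It also shows why the producer must close the channel, since draining only ends on close.

```go
package main

import "fmt"

// collect is a hypothetical stand-in for Collect: it drains in until the
// channel is closed and returns everything it received, in arrival order.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

func main() {
	ch := make(chan int)
	go func() {
		defer close(ch) // without this close, collect would block forever
		for i := 1; i <= 3; i++ {
			ch <- i
		}
	}()
	fmt.Println(collect(ch)) // [1 2 3]
}
```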
func CollectWithContext ¶
func CollectWithContext[T any](ctx context.Context, in <-chan T) []T
CollectWithContext collects all items from the channel and returns them as a slice. If the context is cancelled, it stops collecting items.
func Merge ¶ added in v0.6.0
func Merge[T any](ctx context.Context, channels ...<-chan T) <-chan T
Merge merges multiple input channels into a single output channel. It stops merging when the context is cancelled or all input channels are closed.
func OrDone ¶
func OrDone[T any](ctx context.Context, in <-chan T) <-chan T
OrDone is a utility function that returns a channel relaying the items received from in. The returned channel is closed when the context is done.
func RunErrorSync ¶ added in v0.6.0
func RunErrorSync(ctx context.Context, fn func(ctx context.Context, errs <-chan error)) (chan<- error, func())
RunErrorSync creates and starts a new error-handling goroutine. It returns a channel to send errors to and a function to call to wait for the handler to finish. The returned wait function will close the error channel and wait for the handler goroutine to complete.
func RunErrorSyncFunc ¶ added in v0.6.0
func RunErrorSyncFunc(ctx context.Context, fn func(ctx context.Context, err error)) (chan<- error, func())
RunErrorSyncFunc is similar to RunErrorSync but takes a function that handles one error at a time. It returns a channel to send errors to and a function to call to wait for the handler to finish. The returned wait function will close the error channel and wait for the handler goroutine to complete.
Types ¶
type BatchOption ¶ added in v0.4.0
type BatchOption func(*batchOptions) error
func BatchBufferSize ¶ added in v0.4.0
func BatchBufferSize(n int) BatchOption
func BatchMaxWait ¶ added in v0.4.0
func BatchMaxWait(d time.Duration) BatchOption
type DoOption ¶ added in v0.4.0
type DoOption func(*doOptions) error
func DoOnBeforeClose ¶ added in v0.5.0
func DoPoolSize ¶ added in v0.4.0
type FilterMapOption ¶ added in v0.5.0
type FilterMapOption func(*filterMapOptions) error
func FilterMapBufferSize ¶ added in v0.5.0
func FilterMapBufferSize(n int) FilterMapOption
func FilterMapPoolSize ¶ added in v0.5.0
func FilterMapPoolSize(n int) FilterMapOption
type FilterOption ¶ added in v0.4.0
type FilterOption func(*filterOptions) error
func FilterBufferSize ¶ added in v0.4.0
func FilterBufferSize(n int) FilterOption
func FilterPoolSize ¶ added in v0.4.0
func FilterPoolSize(n int) FilterOption
type ForEachOutputOption ¶ added in v0.5.0
type ForEachOutputOption func(*forEachOutputOptions) error
func ForEachOutputBufferSize ¶ added in v0.5.0
func ForEachOutputBufferSize(bufferSize int) ForEachOutputOption
func ForEachOutputOnBeforeClose ¶ added in v0.5.0
func ForEachOutputOnBeforeClose(f func(context.Context)) ForEachOutputOption
func ForEachOutputPoolSize ¶ added in v0.5.0
func ForEachOutputPoolSize(poolSize int) ForEachOutputOption
type FromFuncOption ¶ added in v0.4.0
type FromFuncOption func(*fromFuncOptions) error
func FromFuncBufferSize ¶ added in v0.4.0
func FromFuncBufferSize(bufferSize int) FromFuncOption
func FromFuncOnBeforeClose ¶ added in v0.4.0
func FromFuncOnBeforeClose(f func(context.Context)) FromFuncOption
func FromFuncPoolSize ¶ added in v0.4.0
func FromFuncPoolSize(poolSize int) FromFuncOption
type FromSeq2Value ¶
type FromSeq2Value[T, U any] struct {
	Val1 T
	Val2 U
}
type Generator ¶
Generator is a worker that generates items of type T without any input.
func Fanin ¶ added in v0.6.0
func Fanin[T any](gg ...Generator[T]) Generator[T]
Fanin combines multiple Generator workers into a single Generator. It takes a variable number of Generator workers and returns a new Generator that merges the outputs of all the provided generators into a single output channel.
Example ¶
ctx := context.Background()
g1 := Of("Hello", "World")
g2 := Of("Foo", "Bar")
gg := Fanin(g1, g2)
res := Collect(gg(ctx, nil, nil))
slices.Sort(res)
fmt.Println(res)
Output:
[Bar Foo Hello World]
func FromFunc ¶
func FromFunc[T any](f func(context.Context) (T, bool, error), options ...FromFuncOption) Generator[T]
FromFunc returns a Generator that emits items generated by the given function. The returned channel will emit items until the function returns false in the second return value.
Example ¶
ctx := context.Background()
count := atomic.Int32{}
genFn := func(ctx context.Context) (int32, bool, error) {
	value := count.Add(1)
	if value > 5 {
		return 0, false, nil
	}
	return value, true, nil
}
in := FromFunc(genFn)
s := in(ctx, nil, nil)
for item := range s {
	fmt.Println(item)
}
Output:
1
2
3
4
5
func FromSeq ¶
func FromSeq[T any](seq iter.Seq[T]) Generator[T]
Example ¶
ctx := context.Background()
seq := slices.Values([]int{1, 2, 3, 4, 5})
in := FromSeq(seq)
s := in(ctx, nil, nil)
for item := range s {
	fmt.Println(item)
}
Output:
1
2
3
4
5
func FromSeq2 ¶
func FromSeq2[T, U any](seq iter.Seq2[T, U]) Generator[FromSeq2Value[T, U]]
Example ¶
ctx := context.Background()
seq := slices.All([]string{"a", "b", "c", "d", "e"})
in := FromSeq2(seq)
s := in(ctx, nil, nil)
for item := range s {
	fmt.Printf("%d, %s\n", item.Val1, item.Val2)
}
Output:
0, a
1, b
2, c
3, d
4, e
type MapOption ¶ added in v0.4.0
type MapOption func(*mapOptions) error
func MapBufferSize ¶ added in v0.4.0
func MapPoolSize ¶ added in v0.4.0
type None ¶
type None struct{}
None is a type that represents no value. It is typically used as the input type of a generator worker, which does not depend on any input channel, or as the output type of a sync worker, which does not emit any items.
type Sync ¶
Sync is a worker that processes items of type T and does not emit any items.
func Connect ¶
Connect connects multiple Sync workers in parallel. It takes a variable number of Sync workers and returns a new Sync worker that runs all the provided workers concurrently on the same input channel. The output channel of the returned Sync worker will be closed once all the connected workers have completed their processing.
Example ¶
ctx := context.Background()
g := Of("Hello", "Hello", "Hello")
capitalize := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToUpper(i), nil
})
lowercase := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToLower(i), nil
})
resA := make([]string, 0)
a := Do(func(ctx context.Context, i string) error {
	resA = append(resA, i)
	return nil
})
resB := make([]string, 0)
b := Do(func(ctx context.Context, i string) error {
	resB = append(resB, i)
	return nil
})
p1 := Pipe(capitalize, a)
p2 := Pipe(lowercase, b)
<-Pipe(g, Connect(p1, p2))(ctx, nil, nil)
for _, s := range resA {
	fmt.Println(s)
}
for _, s := range resB {
	fmt.Println(s)
}
Output:
HELLO
HELLO
HELLO
hello
hello
hello
func Do ¶
Do returns a sync worker that applies the given function to each item in the channel. The output channel will not emit any items, and it will be closed when the input channel is closed or the context is done. If the function returns an error, it will be sent to the error channel.
Example ¶
ctx := context.Background()
g := Of(1, 2, 3, 4, 5)
d := Do(func(ctx context.Context, i int) error {
	fmt.Println(i)
	return nil
})
<-Pipe(g, d)(ctx, nil, nil)
Output:
1
2
3
4
5
func Fanout ¶ added in v0.6.0
Fanout connects a Worker to multiple Sync workers. It takes a Worker and a variable number of Sync workers, and returns a new Sync worker that first processes input items using the provided Worker, and then fans out the output to all the provided Sync workers for further processing.
Example ¶
ctx := context.Background()
g := Of("Hello", "Hello", "Hello")
capitalize := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToUpper(i), nil
})
lowercase := Map(func(ctx context.Context, i string) (string, error) {
	return strings.ToLower(i), nil
})
resA := make([]string, 0)
a := Do(func(ctx context.Context, i string) error {
	resA = append(resA, i)
	return nil
})
resB := make([]string, 0)
b := Do(func(ctx context.Context, i string) error {
	resB = append(resB, i)
	return nil
})
p1 := Pipe(capitalize, a)
p2 := Pipe(lowercase, b)
<-Fanout(g, p1, p2)(ctx, nil, nil)
for _, s := range resA {
	fmt.Println(s)
}
for _, s := range resB {
	fmt.Println(s)
}
Output:
HELLO
HELLO
HELLO
hello
hello
hello
type Worker ¶ added in v0.6.0
Worker is the core abstraction representing a processing unit that takes input of type T from a channel, processes it, and emits output of type U to another channel. It also receives a context for cancellation and a channel for reporting errors.
func Batch ¶ added in v0.1.0
func Batch[T any](n int, opt ...BatchOption) Worker[T, []T]
Batch returns a Worker that batches items from the input channel into slices of n items. If a batch is still not full after the maximum wait (see BatchMaxWait), it is sent anyway.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
b := Batch[int](2)
p := Pipe(in, b)
for item := range p(ctx, nil, nil) {
	fmt.Printf("%v\n", item)
}
Output:
[1 2]
[3 4]
[5]
func Filter ¶
func Filter[T any](f func(context.Context, T) (bool, error), opt ...FilterOption) Worker[T, T]
Filter returns a worker that filters the input channel using the given function.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
even := Filter(func(ctx context.Context, n int) (bool, error) {
	return n%2 == 0, nil
})
p := Pipe(in, even)
s := p(ctx, nil, nil)
for item := range s {
	fmt.Println(item)
}
Output:
2
4
func FilterMap ¶ added in v0.5.0
func FilterMap[T, U any](f func(context.Context, T) (bool, U, error), opt ...FilterMapOption) Worker[T, U]
FilterMap returns a worker that filters and maps items from the input channel.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
// Filter even numbers and multiply by 10
filterMapEvenAndMultiply := FilterMap(func(ctx context.Context, n int) (bool, int, error) {
	if n%2 == 0 {
		return true, n * 10, nil
	}
	return false, 0, nil
})
p := Pipe(in, filterMapEvenAndMultiply)
s := p(ctx, nil, nil)
for item := range s {
	fmt.Println(item)
}
Output:
20
40
func Flatten ¶ added in v0.2.0
func Flatten[T any]() Worker[[]T, T]
Flatten returns a Worker that flattens a channel of slices into a channel of individual items.
Example ¶
ctx := context.Background()
in := Of([]int{1, 2}, []int{3, 4}, []int{5})
f := Flatten[int]()
p := Pipe(in, f)
for item := range p(ctx, nil, nil) {
	fmt.Printf("%v\n", item)
}
Output:
1
2
3
4
5
func ForEachOutput ¶ added in v0.5.0
func ForEachOutput[T, U any](f func(ctx context.Context, val T, out chan<- U, errs chan<- error), opt ...ForEachOutputOption) Worker[T, U]
ForEachOutput returns a worker that applies a function to each item from the input channel. The function can write directly to the output channel. The output channel should not be closed by the function, since the output channel will be closed when the input channel is closed or the context is done. ForEachOutput panics if invalid options are provided.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
f := func(ctx context.Context, n int, out chan<- int, errs chan<- error) {
	out <- n * 2
}
p := Pipe(in, ForEachOutput(f))
s := p(ctx, nil, nil)
for n := range s {
	fmt.Println(n)
}
Output:
2
4
6
8
10
func Map ¶
func Map[T, U any](f func(context.Context, T) (U, error), opt ...MapOption) Worker[T, U]
Map returns a worker that applies a function to each item from the input channel.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
double := Map(func(ctx context.Context, n int) (int, error) {
	return n * 2, nil
})
p := Pipe(in, double)
s := p(ctx, nil, nil)
for n := range s {
	fmt.Println(n)
}
Output:
2
4
6
8
10
func Pipe ¶
func Pipe[A, B, C any](a Worker[A, B], b Worker[B, C]) Worker[A, C]
Pipe pipes two workers together. It is a convenience function that calls Pipe2.
Example ¶
ctx := context.Background()
a := Of(1, 2, 3, 4, 5)
p := Pipe(a, Map(func(ctx context.Context, n int) (int, error) {
	return n + 1, nil
}))
s := p(ctx, nil, nil)
for item := range s {
	fmt.Println(item)
}
Output:
2
3
4
5
6
func Pipe10 ¶ added in v0.6.0
func Pipe10[A, B, C, D, E, F, G, H, I, J, K any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G], g Worker[G, H], h Worker[H, I], i Worker[I, J], j Worker[J, K]) Worker[A, K]
Pipe10 pipes ten workers together.
func Pipe4 ¶
func Pipe4[A, B, C, D, E any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E]) Worker[A, E]
Pipe4 pipes four workers together.
func Pipe5 ¶
func Pipe5[A, B, C, D, E, F any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F]) Worker[A, F]
Pipe5 pipes five workers together.
func Pipe6 ¶ added in v0.6.0
func Pipe6[A, B, C, D, E, F, G any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G]) Worker[A, G]
Pipe6 pipes six workers together.
func Pipe7 ¶ added in v0.6.0
func Pipe7[A, B, C, D, E, F, G, H any](a Worker[A, B], b Worker[B, C], c Worker[C, D], d Worker[D, E], e Worker[E, F], f Worker[F, G], g Worker[G, H]) Worker[A, H]
Pipe7 pipes seven workers together.