Documentation
¶
Index ¶
- func PipeFunc[T any](ctx context.Context, fn func(context.Context, Stream[T]) error, ...) func(context.Context, T) error
- func Reduce[T, U any](s Stream[T], initial U, fn func(context.Context, U, T) (U, error)) (U, error)
- func ToMap[T any, K comparable, V any](s Stream[T], key func(T) K, value func(T) V) map[K]V
- type KeyValue
- type Stream
- func Concat[T any](streams ...Stream[T]) Stream[T]
- func Empty[T any]() Stream[T]
- func FanIn[T any](streams []Stream[T]) Stream[T]
- func FanMap[T, U any](s Stream[T], n int, fn func(context.Context, T) (U, error)) Stream[U]
- func FanMapFilter[T, U any](s Stream[T], n int, fn func(context.Context, T) (U, bool, error)) Stream[U]
- func FlatMap[T, U any](s Stream[T], fn func(context.Context, T) Stream[U]) Stream[U]
- func Flatten[T any](s Stream[[]T]) Stream[T]
- func From[T any](ctx context.Context, source <-chan T) Stream[T]
- func FromFunc[T any](ctx context.Context, bufSize int, ...) Stream[T]
- func FromIter[T any](ctx context.Context, seq iter.Seq[T]) Stream[T]
- func Generate[T any](ctx context.Context, fn func() T) Stream[T]
- func Iterate[T any](ctx context.Context, seed T, fn func(T) T) Stream[T]
- func Map[T, U any](s Stream[T], fn func(context.Context, T) (U, error)) Stream[U]
- func MapEach[T, U any](streams []Stream[T], fn func(context.Context, T) (U, error)) []Stream[U]
- func MapFilter[T, U any](s Stream[T], fn func(context.Context, T) (U, bool, error)) Stream[U]
- func MapFilterEach[T, U any](streams []Stream[T], fn func(context.Context, T) (U, bool, error)) []Stream[U]
- func Of[T any](ctx context.Context, items ...T) Stream[T]
- func OfMap[K comparable, V any](ctx context.Context, m map[K]V) Stream[KeyValue[K, V]]
- func Pipe[T any](ctx context.Context, bufferSize ...int) (func(context.Context, T) error, Stream[T])
- func Split[T any](s Stream[T], n int) Stream[[]T]
- func Window[T any](s Stream[T], n int) Stream[[]T]
- func (s Stream[T]) AllMatch(fn func(context.Context, T) bool) bool
- func (s Stream[T]) AnyMatch(fn func(context.Context, T) bool) bool
- func (s Stream[T]) Chan() <-chan T
- func (s Stream[T]) Collect() []T
- func (s Stream[T]) Context() context.Context
- func (s Stream[T]) Count() int
- func (s Stream[T]) Distinct(key func(T) string) Stream[T]
- func (s Stream[T]) FanOut(n int) []Stream[T]
- func (s Stream[T]) Filter(fn func(context.Context, T) bool) Stream[T]
- func (s Stream[T]) FindFirst() (T, bool)
- func (s Stream[T]) FindFirstMatch(fn func(context.Context, T) bool) (T, bool)
- func (s Stream[T]) ForEach(fn func(context.Context, T) error) error
- func (s Stream[T]) Iter() iter.Seq[T]
- func (s Stream[T]) Iter2() iter.Seq2[int, T]
- func (s Stream[T]) Max(cmp func(T, T) int) (T, bool)
- func (s Stream[T]) Min(cmp func(T, T) int) (T, bool)
- func (s Stream[T]) NoneMatch(fn func(context.Context, T) bool) bool
- func (s Stream[T]) Peek(fn func(context.Context, T)) Stream[T]
- func (s Stream[T]) Process(n int, fn func(context.Context, T) error, opts ...gofuncy.GroupOption) error
- func (s Stream[T]) Reverse() Stream[T]
- func (s Stream[T]) Skip(n int) Stream[T]
- func (s Stream[T]) Sort(cmp func(T, T) int) Stream[T]
- func (s Stream[T]) Take(n int) Stream[T]
- func (s Stream[T]) Tee(n int) []Stream[T]
- func (s Stream[T]) Throttle(d time.Duration) Stream[T]
- func (s Stream[T]) WithOptions(opts ...gofuncy.GoOption) Stream[T]
Examples ¶
- Concat
- Empty
- FanMapFilter
- FlatMap
- Flatten
- From
- FromFunc
- FromIter
- Generate
- Iterate
- Map
- MapEach
- MapFilter
- MapFilterEach
- Of
- Pipe
- Reduce
- Split
- Stream.AllMatch
- Stream.AnyMatch
- Stream.Collect
- Stream.Distinct
- Stream.FanOut
- Stream.Filter
- Stream.FindFirst
- Stream.FindFirstMatch
- Stream.ForEach
- Stream.Iter
- Stream.Max
- Stream.Min
- Stream.Peek
- Stream.Reverse
- Stream.Skip
- Stream.Sort
- Stream.Take
- Stream.Tee
- Stream.Throttle
- ToMap
- Window
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PipeFunc ¶
func PipeFunc[T any](ctx context.Context, fn func(context.Context, Stream[T]) error, opts ...gofuncy.GoOption) func(context.Context, T) error
PipeFunc creates a Pipe and launches the consumer fn in a gofuncy.Go goroutine. Returns only the send handler.
func Reduce ¶
Reduce folds all elements into a single value using fn. Returns the accumulated result or the first error from fn.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
sum, _ := stream.Reduce(stream.Of(ctx, 1, 2, 3, 4, 5), 0, func(_ context.Context, acc, n int) (int, error) {
return acc + n, nil
})
fmt.Println(sum)
}
Output: 15
func ToMap ¶
func ToMap[T any, K comparable, V any](s Stream[T], key func(T) K, value func(T) V) map[K]V
ToMap collects all stream elements into a map using the key and value functions. If duplicate keys occur, the last value wins.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.ToMap(
stream.Of(ctx, "a", "bb"),
func(s string) string { return s },
func(s string) int { return len(s) },
)
fmt.Println(got["a"], got["bb"])
}
Output: 1 2
Types ¶
type KeyValue ¶
type KeyValue[K comparable, V any] struct { Key K Value V }
KeyValue holds a single key-value pair from a map.
type Stream ¶
type Stream[T any] struct { // contains filtered or unexported fields }
func Concat ¶
Concat returns a stream that emits all elements from each input stream in order: first all elements from streams[0], then streams[1], etc. Uses the context and options from the first stream.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Concat(
stream.Of(ctx, 1, 2),
stream.Of(ctx, 3, 4),
).Collect()
fmt.Println(got)
}
Output: [1 2 3 4]
func Empty ¶
Example ¶
package main
import (
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
fmt.Println(stream.Empty[int]().Count())
}
Output: 0
func FanIn ¶
FanIn combines multiple streams into a single stream. Elements arrive in non-deterministic order as they become available. Uses the context and options from the first stream.
func FanMap ¶
FanMap fans out a stream into n partitions, maps each concurrently, and fans in the results. Output order is non-deterministic.
func FanMapFilter ¶
func FanMapFilter[T, U any](s Stream[T], n int, fn func(context.Context, T) (U, bool, error)) Stream[U]
FanMapFilter fans out, applies MapFilter concurrently, and fans in the results.
Example ¶
package main
import (
"context"
"fmt"
"strconv"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.FanMapFilter(stream.Of(ctx, 1, 2, 3, 4, 5, 6), 2, func(_ context.Context, n int) (string, bool, error) {
if n%2 == 0 {
return "", false, nil
}
return strconv.Itoa(n * 10), true, nil
}).Collect()
fmt.Println(len(got))
}
Output: 3
func FlatMap ¶
FlatMap applies fn to each element of the source stream, producing a sub-stream per element, and flattens the results into a single output stream sequentially.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.FlatMap(stream.Of(ctx, 1, 2, 3), func(ctx context.Context, n int) stream.Stream[int] {
return stream.Of(ctx, n, n*10)
}).Collect()
fmt.Println(got)
}
Output: [1 10 2 20 3 30]
func Flatten ¶
Flatten flattens a stream of slices into a stream of individual elements.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
chunked := stream.Split(stream.Of(ctx, 1, 2, 3, 4, 5), 2)
got := stream.Flatten(chunked).Collect()
fmt.Println(got)
}
Output: [1 2 3 4 5]
func From ¶
From wraps an existing channel as a Stream with the given context.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
ch := make(chan int, 3)
ch <- 10
ch <- 20
ch <- 30
close(ch)
fmt.Println(stream.From(ctx, ch).Collect())
}
Output: [10 20 30]
func FromFunc ¶
func FromFunc[T any](ctx context.Context, bufSize int, fn func(ctx context.Context, send func(T) error) error, opts ...gofuncy.GoOption) Stream[T]
FromFunc creates a Stream from a blocking function that sends items via the provided send callback. The function should block until it is done producing items and must respect context cancellation.
The stream closes automatically when fn returns. If fn returns a non-nil error it is handled by gofuncy.Go (logged via the configured error handler).
bufSize controls backpressure: a full buffer blocks the send callback until the stream consumer catches up.
Example — bridging a message subscriber into a stream:
s := stream.FromFunc(ctx, 16, func(ctx context.Context, send func(Event) error) error {
return sub.Subscribe(ctx, "events", func(ctx context.Context, msg courier.Message[Event]) error {
return send(msg.Payload)
})
})
Example ¶
ExampleFromFunc demonstrates creating a stream from a blocking producer function.
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
s := stream.FromFunc(ctx, 0, func(ctx context.Context, send func(string) error) error {
for _, v := range []string{"alpha", "beta", "gamma"} {
if err := send(v); err != nil {
return err
}
}
cancel()
return nil
})
for _, v := range s.Collect() {
fmt.Println(v)
}
}
Output: alpha beta gamma
func FromIter ¶
FromIter creates a Stream from an iter.Seq by pushing elements into a channel.
Example ¶
package main
import (
"context"
"fmt"
"slices"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
seq := slices.Values([]int{10, 20, 30})
fmt.Println(stream.FromIter(ctx, seq).Collect())
}
Output: [10 20 30]
func Generate ¶
Generate returns an infinite stream where each element is produced by fn. The stream runs until the context is cancelled.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var i int
got := stream.Generate(ctx, func() int { i++; return i }).Take(3).Collect()
fmt.Println(got)
}
Output: [1 2 3]
func Iterate ¶
Iterate returns an infinite stream starting with seed, then applying fn repeatedly: seed, fn(seed), fn(fn(seed)), ... The stream runs until the context is cancelled.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
got := stream.Iterate(ctx, 1, func(n int) int { return n * 2 }).Take(5).Collect()
fmt.Println(got)
}
Output: [1 2 4 8 16]
func Map ¶
Map returns a new Stream by applying fn to each element of the source stream. If fn returns an error, the stream closes and the error is handled by gofuncy.Go.
Example ¶
package main
import (
"context"
"fmt"
"strconv"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Map(stream.Of(ctx, 1, 2, 3), func(_ context.Context, n int) (string, error) {
return strconv.Itoa(n), nil
}).Collect()
fmt.Println(got)
}
Output: [1 2 3]
func MapEach ¶
MapEach applies Map to each stream in a slice, returning a slice of transformed streams.
Example ¶
package main
import (
"context"
"fmt"
"strconv"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
s1 := stream.Of(ctx, 1, 2, 3)
s2 := stream.Of(ctx, 4, 5, 6)
mapped := stream.MapEach([]stream.Stream[int]{s1, s2}, func(_ context.Context, n int) (string, error) {
return strconv.Itoa(n), nil
})
fmt.Println(mapped[0].Collect())
fmt.Println(mapped[1].Collect())
}
Output: [1 2 3] [4 5 6]
func MapFilter ¶
MapFilter maps and filters elements. The bool controls emission: (val, true, nil) emits val; (_, false, nil) skips the item; (_, _, err) stops the stream.
Example ¶
package main
import (
"context"
"fmt"
"strconv"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.MapFilter(stream.Of(ctx, 1, 2, 3, 4, 5), func(_ context.Context, n int) (string, bool, error) {
if n%2 == 0 {
return "", false, nil
}
return strconv.Itoa(n), true, nil
}).Collect()
fmt.Println(got)
}
Output: [1 3 5]
func MapFilterEach ¶
func MapFilterEach[T, U any](streams []Stream[T], fn func(context.Context, T) (U, bool, error)) []Stream[U]
MapFilterEach applies MapFilter to each stream in a slice.
Example ¶
package main
import (
"context"
"fmt"
"strconv"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
s1 := stream.Of(ctx, 1, 2, 3)
s2 := stream.Of(ctx, 4, 5, 6)
mapped := stream.MapFilterEach([]stream.Stream[int]{s1, s2}, func(_ context.Context, n int) (string, bool, error) {
if n%2 == 0 {
return "", false, nil
}
return strconv.Itoa(n), true, nil
})
fmt.Println(mapped[0].Collect())
fmt.Println(mapped[1].Collect())
}
Output: [1 3] [5]
func Of ¶
Of returns a Stream based on the given elements.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, 1, 2, 3).Collect()
fmt.Println(got)
}
Output: [1 2 3]
func OfMap ¶
OfMap returns a Stream of KeyValue pairs from the given map. Iteration order is non-deterministic, matching Go's map semantics.
func Pipe ¶
func Pipe[T any](ctx context.Context, bufferSize ...int) (func(context.Context, T) error, Stream[T])
Pipe creates a writable stream entry point. Returns a send function and the readable stream. The send function returns ctx.Err() if the context is cancelled. The channel is closed when ctx is done.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
send, s := stream.Pipe[int](ctx)
go func() {
_ = send(ctx, 1)
_ = send(ctx, 2)
_ = send(ctx, 3)
cancel()
}()
fmt.Println(s.Collect())
}
Output: [1 2 3]
func Split ¶
Split groups consecutive elements into batches of size n. The last batch may contain fewer than n elements.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Split(stream.Of(ctx, 1, 2, 3, 4, 5), 2).Collect()
fmt.Println(got)
}
Output: [[1 2] [3 4] [5]]
func Window ¶
Window emits sliding windows of n consecutive elements. If the source has fewer than n elements, no windows are emitted.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Window(stream.Of(ctx, 1, 2, 3, 4, 5), 3).Collect()
fmt.Println(got)
}
Output: [[1 2 3] [2 3 4] [3 4 5]]
func (Stream[T]) AllMatch ¶
AllMatch returns true if all elements match the predicate. Short-circuits on the first non-matching element. Returns true for an empty stream.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
fmt.Println(stream.Of(ctx, 2, 4, 6).AllMatch(func(_ context.Context, n int) bool { return n%2 == 0 }))
}
Output: true
func (Stream[T]) AnyMatch ¶
AnyMatch returns true if any element matches the predicate. Short-circuits on the first matching element. Returns false for an empty stream.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
fmt.Println(stream.Of(ctx, 1, 2, 3).AnyMatch(func(_ context.Context, n int) bool { return n > 2 }))
}
Output: true
func (Stream[T]) Collect ¶
func (s Stream[T]) Collect() []T
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
fmt.Println(stream.Of(ctx, "a", "b", "c").Collect())
}
Output: [a b c]
func (Stream[T]) Distinct ¶
Distinct deduplicates elements using a key function. First occurrence wins.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, "a", "b", "a", "c", "b").Distinct(func(s string) string {
return s
}).Collect()
fmt.Println(got)
}
Output: [a b c]
func (Stream[T]) FanOut ¶
FanOut distributes elements round-robin across n output streams.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
parts := stream.Of(ctx, 1, 2, 3, 4, 5).FanOut(2)
results := make([][]int, 2)
done := make(chan struct{})
go func() {
results[1] = parts[1].Collect()
close(done)
}()
results[0] = parts[0].Collect()
<-done
fmt.Println(results[0])
fmt.Println(results[1])
}
Output: [1 3 5] [2 4]
func (Stream[T]) Filter ¶
Filter returns a stream containing only elements where fn returns true.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, 1, 2, 3, 4, 5, 6).Filter(func(_ context.Context, n int) bool {
return n%2 == 0
}).Collect()
fmt.Println(got)
}
Output: [2 4 6]
func (Stream[T]) FindFirst ¶
FindFirst returns the first element of the stream and true, or the zero value and false if the stream is empty.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
v, ok := stream.Of(ctx, 10, 20, 30).FindFirst()
fmt.Println(v, ok)
}
Output: 10 true
func (Stream[T]) FindFirstMatch ¶
FindFirstMatch returns the first element matching the predicate and true, or the zero value and false if no element matches.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
v, ok := stream.Of(ctx, 1, 2, 3, 4).FindFirstMatch(func(_ context.Context, n int) bool { return n > 2 })
fmt.Println(v, ok)
}
Output: 3 true
func (Stream[T]) ForEach ¶
ForEach consumes the stream, calling fn for each element. Returns the first error from fn or ctx, nil when fully consumed.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
_ = stream.Of(ctx, 1, 2, 3).ForEach(func(_ context.Context, v int) error {
fmt.Println(v)
return nil
})
}
Output: 1 2 3
func (Stream[T]) Iter ¶
Iter returns an iter.Seq that yields each element of the stream. The returned iterator drains the stream; it can only be used once.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
for v := range stream.Of(ctx, 1, 2, 3).Iter() {
fmt.Println(v)
}
}
Output: 1 2 3
func (Stream[T]) Iter2 ¶
Iter2 returns an iter.Seq2 that yields each element with its zero-based index. The returned iterator drains the stream; it can only be used once.
func (Stream[T]) Max ¶
Max returns the maximum element according to cmp and true, or the zero value and false if the stream is empty.
Example ¶
package main
import (
"cmp"
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
v, ok := stream.Of(ctx, 3, 1, 4, 1, 5).Max(cmp.Compare[int])
fmt.Println(v, ok)
}
Output: 5 true
func (Stream[T]) Min ¶
Min returns the minimum element according to cmp and true, or the zero value and false if the stream is empty.
Example ¶
package main
import (
"cmp"
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
v, ok := stream.Of(ctx, 3, 1, 4, 1, 5).Min(cmp.Compare[int])
fmt.Println(v, ok)
}
Output: 1 true
func (Stream[T]) NoneMatch ¶
NoneMatch returns true if no elements match the predicate. Short-circuits on the first matching element. Returns true for an empty stream.
func (Stream[T]) Peek ¶
Peek calls fn for each element as a side-effect and forwards the element unchanged.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
var peeked []int
got := stream.Of(ctx, 1, 2, 3).Peek(func(_ context.Context, n int) {
peeked = append(peeked, n)
}).Collect()
fmt.Println(got)
fmt.Println(peeked)
}
Output: [1 2 3] [1 2 3]
func (Stream[T]) Process ¶
func (s Stream[T]) Process(n int, fn func(context.Context, T) error, opts ...gofuncy.GroupOption) error
Process consumes the stream, dispatching each element to a worker pool of size n. All errors are collected and returned via errors.Join.
func (Stream[T]) Reverse ¶
Reverse collects all elements and emits them in reverse order.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, 1, 2, 3).Reverse().Collect()
fmt.Println(got)
}
Output: [3 2 1]
func (Stream[T]) Skip ¶
Skip drops the first n elements and emits the rest.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, 1, 2, 3, 4, 5).Skip(2).Collect()
fmt.Println(got)
}
Output: [3 4 5]
func (Stream[T]) Sort ¶
Sort collects all elements, sorts them using cmp, and emits in sorted order.
Example ¶
package main
import (
"cmp"
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, 3, 1, 4, 1, 5).Sort(cmp.Compare[int]).Collect()
fmt.Println(got)
}
Output: [1 1 3 4 5]
func (Stream[T]) Take ¶
Take emits the first n elements then closes the stream.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, 1, 2, 3, 4, 5).Take(3).Collect()
fmt.Println(got)
}
Output: [1 2 3]
func (Stream[T]) Tee ¶
Tee broadcasts every element to n output streams. Unlike FanOut which round-robins, Tee sends each element to all streams.
Example ¶
package main
import (
"context"
"fmt"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
streams := stream.Of(ctx, 1, 2, 3).Tee(2)
results := make([][]int, 2)
done := make(chan struct{})
go func() {
results[1] = streams[1].Collect()
close(done)
}()
results[0] = streams[0].Collect()
<-done
fmt.Println(results[0])
fmt.Println(results[1])
}
Output: [1 2 3] [1 2 3]
func (Stream[T]) Throttle ¶
Throttle rate-limits the stream to at most one element per duration d.
Example ¶
package main
import (
"context"
"fmt"
"time"
stream "github.com/foomo/goflow"
)
func main() {
ctx := context.Background()
got := stream.Of(ctx, 1, 2, 3).Throttle(1 * time.Millisecond).Collect()
fmt.Println(got)
}
Output: [1 2 3]
Source Files
¶
- concat.go
- distinct.go
- fanin.go
- fanmap.go
- fanmapfilter.go
- fanout.go
- filter.go
- find.go
- flatmap.go
- flatten.go
- foreach.go
- fromfunc.go
- generate.go
- iter.go
- iterate.go
- map.go
- mapeach.go
- mapfilter.go
- mapfiltereach.go
- match.go
- minmax.go
- ofmap.go
- peek.go
- pipe.go
- process.go
- reduce.go
- reverse.go
- skip.go
- sort.go
- split.go
- stream.go
- take.go
- tee.go
- throttle.go
- tomap.go
- window.go