goflow

package module
v0.2.2 Latest
Published: Apr 16, 2026 License: MIT Imports: 6 Imported by: 0

README

goflow

Type-safe, composable stream processing for Go.

goflow provides a generic Stream[T] type backed by channels and context. It ships with functional operators (Map, Filter, Reduce, FlatMap) and concurrency primitives (FanOut, FanIn, Process, Tee). It is built on gofuncy, which provides goroutine management and OpenTelemetry tracing.

Install

go get github.com/foomo/goflow

Quick Example

package main

import (
    "context"
    "fmt"

    "github.com/foomo/goflow"
)

func main() {
    ctx := context.Background()

    result := goflow.Map(
        goflow.Of(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10).
            Filter(func(_ context.Context, n int) bool {
                return n%2 == 0
            }),
        func(_ context.Context, n int) (int, error) {
            return n * n, nil
        },
    ).Collect()

    fmt.Println(result) // [4 16 36 64 100]
}

Features

  • Generic Stream[T] with compile-time type safety
  • Composable functional operators (Map, Filter, FlatMap, Reduce, and more)
  • Built-in concurrency (FanOut, FanIn, FanMap, Process, Tee)
  • Context-aware cancellation and timeout propagation
  • Channel buffering control via Pipe
  • iter.Seq/iter.Seq2 integration
  • OpenTelemetry tracing via gofuncy

How to Contribute

Contributions are welcome! Please read the contributing guide.

License

Distributed under the MIT License. See the license file in the repository for details.

Made with ♥ foomo by bestbytes

Documentation

Overview

Example (Shutdown_cancel)

Example_shutdown_cancel demonstrates manually cancelling a pipeline.

package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	s := stream.FromFunc(ctx, 0, func(ctx context.Context, send func(int) error) error {
		for i := 1; ; i++ {
			if err := send(i); err != nil {
				return err
			}

			if i == 3 {
				cancel()

				return nil
			}
		}
	})

	results := s.Collect()
	fmt.Println(results)
}
Output:
[1 2 3]
Example (Shutdown_timeout)

Example_shutdown_timeout demonstrates using a context timeout to automatically stop a pipeline after a deadline.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	var i atomic.Int64

	results := stream.Generate(ctx, func() int64 {
		return i.Add(1)
	}).Take(1000).Collect()

	// The pipeline stops when the timeout fires.
	// Collect returns whatever was gathered before cancellation.
	fmt.Println(len(results) > 0 && len(results) <= 1000)
}
Output:
true

Functions

func PipeFunc

func PipeFunc[T any](ctx context.Context, fn func(context.Context, Stream[T]) error, opts ...gofuncy.GoOption) func(context.Context, T) error

PipeFunc creates a Pipe and launches the consumer fn in a gofuncy.Go goroutine. Returns only the send handler.

func Reduce

func Reduce[T, U any](s Stream[T], initial U, fn func(context.Context, U, T) (U, error)) (U, error)

Reduce folds all elements into a single value using fn. Returns the accumulated result or the first error from fn.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	sum, _ := stream.Reduce(stream.Of(ctx, 1, 2, 3, 4, 5), 0, func(_ context.Context, acc, n int) (int, error) {
		return acc + n, nil
	})
	fmt.Println(sum)
}
Output:
15

func ToMap

func ToMap[T any, K comparable, V any](s Stream[T], key func(T) K, value func(T) V) map[K]V

ToMap collects all stream elements into a map using the key and value functions. If duplicate keys occur, the last value wins.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.ToMap(
		stream.Of(ctx, "a", "bb"),
		func(s string) string { return s },
		func(s string) int { return len(s) },
	)
	fmt.Println(got["a"], got["bb"])
}
Output:
1 2
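
The example above has no duplicate keys, so the "last value wins" rule is not visible in it. The following is a minimal sketch of that rule using only the standard library. It works on a slice rather than a Stream, and toMap and byInitial are hypothetical helpers written for illustration, not goflow functions.

```go
package main

import "fmt"

// toMap is a hypothetical slice-based stand-in for a keyed collect:
// later items overwrite earlier ones that share a key.
func toMap[T any, K comparable, V any](items []T, key func(T) K, value func(T) V) map[K]V {
	m := make(map[K]V, len(items))
	for _, item := range items {
		m[key(item)] = value(item) // a duplicate key replaces the earlier value
	}
	return m
}

// byInitial keys each (non-empty) word by its first byte and keeps the word as the value.
func byInitial(words ...string) map[byte]string {
	return toMap(words,
		func(w string) byte { return w[0] },
		func(w string) string { return w },
	)
}

func main() {
	got := byInitial("apple", "banana", "avocado")
	fmt.Println(got['a'], got['b']) // avocado banana
}
```

"apple" and "avocado" share the key 'a', and the later element replaces the earlier one.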

Types

type KeyValue

type KeyValue[K comparable, V any] struct {
	Key   K
	Value V
}

KeyValue holds a single key-value pair from a map.

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

func Concat

func Concat[T any](streams ...Stream[T]) Stream[T]

Concat returns a stream that emits all elements from each input stream in order: first all elements from streams[0], then streams[1], etc. Uses the context and options from the first stream.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Concat(
		stream.Of(ctx, 1, 2),
		stream.Of(ctx, 3, 4),
	).Collect()
	fmt.Println(got)
}
Output:
[1 2 3 4]

func Empty

func Empty[T any]() Stream[T]

Example
package main

import (
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	fmt.Println(stream.Empty[int]().Count())
}
Output:
0

func FanIn

func FanIn[T any](streams []Stream[T]) Stream[T]

FanIn combines multiple streams into a single stream. Elements arrive in non-deterministic order as they become available. Uses the context and options from the first stream.

func FanMap

func FanMap[T, U any](s Stream[T], n int, fn func(context.Context, T) (U, error)) Stream[U]

FanMap fans out a stream into n partitions, maps each concurrently, and fans in the results. Output order is non-deterministic.

func FanMapFilter

func FanMapFilter[T, U any](s Stream[T], n int, fn func(context.Context, T) (U, bool, error)) Stream[U]

FanMapFilter fans out a stream into n partitions, applies MapFilter to each partition concurrently, and fans in the results.

Example
package main

import (
	"context"
	"fmt"
	"strconv"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.FanMapFilter(stream.Of(ctx, 1, 2, 3, 4, 5, 6), 2, func(_ context.Context, n int) (string, bool, error) {
		if n%2 == 0 {
			return "", false, nil
		}

		return strconv.Itoa(n * 10), true, nil
	}).Collect()
	fmt.Println(len(got))
}
Output:
3

func FlatMap

func FlatMap[T, U any](s Stream[T], fn func(context.Context, T) Stream[U]) Stream[U]

FlatMap applies fn to each element of the source stream, producing a sub-stream per element, and flattens the results into a single output stream sequentially.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.FlatMap(stream.Of(ctx, 1, 2, 3), func(ctx context.Context, n int) stream.Stream[int] {
		return stream.Of(ctx, n, n*10)
	}).Collect()
	fmt.Println(got)
}
Output:
[1 10 2 20 3 30]

func Flatten

func Flatten[T any](s Stream[[]T]) Stream[T]

Flatten flattens a stream of slices into a stream of individual elements.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	chunked := stream.Split(stream.Of(ctx, 1, 2, 3, 4, 5), 2)
	got := stream.Flatten(chunked).Collect()
	fmt.Println(got)
}
Output:
[1 2 3 4 5]

func From

func From[T any](ctx context.Context, source <-chan T) Stream[T]

From wraps an existing channel as a Stream with the given context.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()

	ch := make(chan int, 3)
	ch <- 10
	ch <- 20
	ch <- 30
	close(ch)

	fmt.Println(stream.From(ctx, ch).Collect())
}
Output:
[10 20 30]

func FromFunc

func FromFunc[T any](ctx context.Context, bufSize int, fn func(ctx context.Context, send func(T) error) error, opts ...gofuncy.GoOption) Stream[T]

FromFunc creates a Stream from a blocking function that sends items via the provided send callback. The function should block until it is done producing items and must respect context cancellation.

The stream closes automatically when fn returns. If fn returns a non-nil error, it is handled by gofuncy.Go (logged via the configured error handler).

bufSize controls backpressure: a full buffer blocks the send callback until the stream consumer catches up.

Example — bridging a message subscriber into a stream:

s := stream.FromFunc(ctx, 16, func(ctx context.Context, send func(Event) error) error {
    return sub.Subscribe(ctx, "events", func(ctx context.Context, msg courier.Message[Event]) error {
        return send(msg.Payload)
    })
})
Example

ExampleFromFunc demonstrates creating a stream from a blocking producer function.

package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	s := stream.FromFunc(ctx, 0, func(ctx context.Context, send func(string) error) error {
		for _, v := range []string{"alpha", "beta", "gamma"} {
			if err := send(v); err != nil {
				return err
			}
		}

		cancel()

		return nil
	})

	for _, v := range s.Collect() {
		fmt.Println(v)
	}
}
Output:
alpha
beta
gamma

func FromIter

func FromIter[T any](ctx context.Context, seq iter.Seq[T]) Stream[T]

FromIter creates a Stream from an iter.Seq by pushing elements into a channel.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	seq := slices.Values([]int{10, 20, 30})
	fmt.Println(stream.FromIter(ctx, seq).Collect())
}
Output:
[10 20 30]

func Generate

func Generate[T any](ctx context.Context, fn func() T) Stream[T]

Generate returns an infinite stream where each element is produced by fn. The stream runs until the context is cancelled.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var i int

	got := stream.Generate(ctx, func() int { i++; return i }).Take(3).Collect()
	fmt.Println(got)
}
Output:
[1 2 3]

func Iterate

func Iterate[T any](ctx context.Context, seed T, fn func(T) T) Stream[T]

Iterate returns an infinite stream starting with seed, then applying fn repeatedly: seed, fn(seed), fn(fn(seed)), ... The stream runs until the context is cancelled.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	got := stream.Iterate(ctx, 1, func(n int) int { return n * 2 }).Take(5).Collect()
	fmt.Println(got)
}
Output:
[1 2 4 8 16]

func Map

func Map[T, U any](s Stream[T], fn func(context.Context, T) (U, error)) Stream[U]

Map returns a new Stream by applying fn to each element of the source stream. If fn returns an error, the stream closes and the error is handled by gofuncy.Go.

Example
package main

import (
	"context"
	"fmt"
	"strconv"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Map(stream.Of(ctx, 1, 2, 3), func(_ context.Context, n int) (string, error) {
		return strconv.Itoa(n), nil
	}).Collect()
	fmt.Println(got)
}
Output:
[1 2 3]

func MapEach

func MapEach[T, U any](streams []Stream[T], fn func(context.Context, T) (U, error)) []Stream[U]

MapEach applies Map to each stream in a slice, returning a slice of transformed streams.

Example
package main

import (
	"context"
	"fmt"
	"strconv"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	s1 := stream.Of(ctx, 1, 2, 3)
	s2 := stream.Of(ctx, 4, 5, 6)
	mapped := stream.MapEach([]stream.Stream[int]{s1, s2}, func(_ context.Context, n int) (string, error) {
		return strconv.Itoa(n), nil
	})
	fmt.Println(mapped[0].Collect())
	fmt.Println(mapped[1].Collect())
}
Output:
[1 2 3]
[4 5 6]

func MapFilter

func MapFilter[T, U any](s Stream[T], fn func(context.Context, T) (U, bool, error)) Stream[U]

MapFilter maps and filters elements. The bool controls emission: (val, true, nil) emits val; (_, false, nil) skips the item; (_, _, err) stops the stream.

Example
package main

import (
	"context"
	"fmt"
	"strconv"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.MapFilter(stream.Of(ctx, 1, 2, 3, 4, 5), func(_ context.Context, n int) (string, bool, error) {
		if n%2 == 0 {
			return "", false, nil
		}

		return strconv.Itoa(n), true, nil
	}).Collect()
	fmt.Println(got)
}
Output:
[1 3 5]

func MapFilterEach

func MapFilterEach[T, U any](streams []Stream[T], fn func(context.Context, T) (U, bool, error)) []Stream[U]

MapFilterEach applies MapFilter to each stream in a slice.

Example
package main

import (
	"context"
	"fmt"
	"strconv"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	s1 := stream.Of(ctx, 1, 2, 3)
	s2 := stream.Of(ctx, 4, 5, 6)
	mapped := stream.MapFilterEach([]stream.Stream[int]{s1, s2}, func(_ context.Context, n int) (string, bool, error) {
		if n%2 == 0 {
			return "", false, nil
		}

		return strconv.Itoa(n), true, nil
	})
	fmt.Println(mapped[0].Collect())
	fmt.Println(mapped[1].Collect())
}
Output:
[1 3]
[5]

func Of

func Of[T any](ctx context.Context, items ...T) Stream[T]

Of returns a Stream that emits the given elements in order.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, 1, 2, 3).Collect()
	fmt.Println(got)
}
Output:
[1 2 3]

func OfMap

func OfMap[K comparable, V any](ctx context.Context, m map[K]V) Stream[KeyValue[K, V]]

OfMap returns a Stream of KeyValue pairs from the given map. Iteration order is non-deterministic, matching Go's map semantics.

func Pipe

func Pipe[T any](ctx context.Context, bufferSize ...int) (func(context.Context, T) error, Stream[T])

Pipe creates a writable stream entry point. Returns a send function and the readable stream. The send function returns ctx.Err() if the context is cancelled. The channel is closed when ctx is done.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	send, s := stream.Pipe[int](ctx)

	go func() {
		_ = send(ctx, 1)
		_ = send(ctx, 2)
		_ = send(ctx, 3)

		cancel()
	}()

	fmt.Println(s.Collect())
}
Output:
[1 2 3]

func Split

func Split[T any](s Stream[T], n int) Stream[[]T]

Split groups consecutive elements into batches of size n. The last batch may contain fewer than n elements.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Split(stream.Of(ctx, 1, 2, 3, 4, 5), 2).Collect()
	fmt.Println(got)
}
Output:
[[1 2] [3 4] [5]]

func Window

func Window[T any](s Stream[T], n int) Stream[[]T]

Window emits sliding windows of n consecutive elements. If the source has fewer than n elements, no windows are emitted.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Window(stream.Of(ctx, 1, 2, 3, 4, 5), 3).Collect()
	fmt.Println(got)
}
Output:
[[1 2 3] [2 3 4] [3 4 5]]

func (Stream[T]) AllMatch

func (s Stream[T]) AllMatch(fn func(context.Context, T) bool) bool

AllMatch returns true if all elements match the predicate. Short-circuits on the first non-matching element. Returns true for an empty stream.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	fmt.Println(stream.Of(ctx, 2, 4, 6).AllMatch(func(_ context.Context, n int) bool { return n%2 == 0 }))
}
Output:
true

func (Stream[T]) AnyMatch

func (s Stream[T]) AnyMatch(fn func(context.Context, T) bool) bool

AnyMatch returns true if any element matches the predicate. Short-circuits on the first matching element. Returns false for an empty stream.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	fmt.Println(stream.Of(ctx, 1, 2, 3).AnyMatch(func(_ context.Context, n int) bool { return n > 2 }))
}
Output:
true

func (Stream[T]) Chan

func (s Stream[T]) Chan() <-chan T

func (Stream[T]) Collect

func (s Stream[T]) Collect() []T

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	fmt.Println(stream.Of(ctx, "a", "b", "c").Collect())
}
Output:
[a b c]

func (Stream[T]) Context

func (s Stream[T]) Context() context.Context

Context returns the stream's bound context.

func (Stream[T]) Count

func (s Stream[T]) Count() int

func (Stream[T]) Distinct

func (s Stream[T]) Distinct(key func(T) string) Stream[T]

Distinct deduplicates elements using a key function. First occurrence wins.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, "a", "b", "a", "c", "b").Distinct(func(s string) string {
		return s
	}).Collect()
	fmt.Println(got)
}
Output:
[a b c]

func (Stream[T]) FanOut

func (s Stream[T]) FanOut(n int) []Stream[T]

FanOut distributes elements round-robin across n output streams.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	parts := stream.Of(ctx, 1, 2, 3, 4, 5).FanOut(2)

	results := make([][]int, 2)
	done := make(chan struct{})

	go func() {
		results[1] = parts[1].Collect()

		close(done)
	}()

	results[0] = parts[0].Collect()

	<-done

	fmt.Println(results[0])
	fmt.Println(results[1])
}
Output:
[1 3 5]
[2 4]

func (Stream[T]) Filter

func (s Stream[T]) Filter(fn func(context.Context, T) bool) Stream[T]

Filter returns a stream containing only elements where fn returns true.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, 1, 2, 3, 4, 5, 6).Filter(func(_ context.Context, n int) bool {
		return n%2 == 0
	}).Collect()
	fmt.Println(got)
}
Output:
[2 4 6]

func (Stream[T]) FindFirst

func (s Stream[T]) FindFirst() (T, bool)

FindFirst returns the first element of the stream and true, or the zero value and false if the stream is empty.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	v, ok := stream.Of(ctx, 10, 20, 30).FindFirst()
	fmt.Println(v, ok)
}
Output:
10 true

func (Stream[T]) FindFirstMatch

func (s Stream[T]) FindFirstMatch(fn func(context.Context, T) bool) (T, bool)

FindFirstMatch returns the first element matching the predicate and true, or the zero value and false if no element matches.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	v, ok := stream.Of(ctx, 1, 2, 3, 4).FindFirstMatch(func(_ context.Context, n int) bool { return n > 2 })
	fmt.Println(v, ok)
}
Output:
3 true

func (Stream[T]) ForEach

func (s Stream[T]) ForEach(fn func(context.Context, T) error) error

ForEach consumes the stream, calling fn for each element. It returns the first error from fn or from the context, or nil when the stream is fully consumed.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	_ = stream.Of(ctx, 1, 2, 3).ForEach(func(_ context.Context, v int) error {
		fmt.Println(v)
		return nil
	})
}
Output:
1
2
3

func (Stream[T]) Iter

func (s Stream[T]) Iter() iter.Seq[T]

Iter returns an iter.Seq that yields each element of the stream. The returned iterator drains the stream; it can only be used once.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	for v := range stream.Of(ctx, 1, 2, 3).Iter() {
		fmt.Println(v)
	}
}
Output:
1
2
3

func (Stream[T]) Iter2

func (s Stream[T]) Iter2() iter.Seq2[int, T]

Iter2 returns an iter.Seq2 that yields each element with its zero-based index. The returned iterator drains the stream; it can only be used once.

func (Stream[T]) Max

func (s Stream[T]) Max(cmp func(T, T) int) (T, bool)

Max returns the maximum element according to cmp and true, or the zero value and false if the stream is empty.

Example
package main

import (
	"cmp"
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	v, ok := stream.Of(ctx, 3, 1, 4, 1, 5).Max(cmp.Compare[int])
	fmt.Println(v, ok)
}
Output:
5 true

func (Stream[T]) Min

func (s Stream[T]) Min(cmp func(T, T) int) (T, bool)

Min returns the minimum element according to cmp and true, or the zero value and false if the stream is empty.

Example
package main

import (
	"cmp"
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	v, ok := stream.Of(ctx, 3, 1, 4, 1, 5).Min(cmp.Compare[int])
	fmt.Println(v, ok)
}
Output:
1 true

func (Stream[T]) NoneMatch

func (s Stream[T]) NoneMatch(fn func(context.Context, T) bool) bool

NoneMatch returns true if no elements match the predicate. Short-circuits on the first matching element. Returns true for an empty stream.
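
NoneMatch has no example above. The sketch below illustrates the three documented cases (no match, a match that short-circuits, and the empty stream) on a plain channel with the standard library. The helpers noneMatch, emit, and isEven are hypothetical stand-ins rather than goflow code.

```go
package main

import "fmt"

// noneMatch is a hypothetical helper that reports whether no element
// satisfies pred. It stops reading at the first match.
func noneMatch[T any](in <-chan T, pred func(T) bool) bool {
	for v := range in {
		if pred(v) {
			return false // short-circuit on the first matching element
		}
	}
	return true // also reached when the input is empty
}

// emit returns a closed, pre-filled channel holding the given values.
func emit[T any](vals ...T) <-chan T {
	ch := make(chan T, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func isEven(n int) bool { return n%2 == 0 }

func main() {
	fmt.Println(noneMatch(emit(1, 3, 5), isEven)) // true
	fmt.Println(noneMatch(emit(1, 2, 3), isEven)) // false
	fmt.Println(noneMatch(emit[int](), isEven))   // true
}
```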

func (Stream[T]) Peek

func (s Stream[T]) Peek(fn func(context.Context, T)) Stream[T]

Peek calls fn for each element as a side-effect and forwards the element unchanged.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()

	var peeked []int

	got := stream.Of(ctx, 1, 2, 3).Peek(func(_ context.Context, n int) {
		peeked = append(peeked, n)
	}).Collect()
	fmt.Println(got)
	fmt.Println(peeked)
}
Output:
[1 2 3]
[1 2 3]

func (Stream[T]) Process

func (s Stream[T]) Process(n int, fn func(context.Context, T) error, opts ...gofuncy.GroupOption) error

Process consumes the stream, dispatching each element to a worker pool of size n. All errors are collected and returned via errors.Join.

func (Stream[T]) Reverse

func (s Stream[T]) Reverse() Stream[T]

Reverse collects all elements and emits them in reverse order.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, 1, 2, 3).Reverse().Collect()
	fmt.Println(got)
}
Output:
[3 2 1]

func (Stream[T]) Skip

func (s Stream[T]) Skip(n int) Stream[T]

Skip drops the first n elements and emits the rest.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, 1, 2, 3, 4, 5).Skip(2).Collect()
	fmt.Println(got)
}
Output:
[3 4 5]

func (Stream[T]) Sort

func (s Stream[T]) Sort(cmp func(T, T) int) Stream[T]

Sort collects all elements, sorts them using cmp, and emits in sorted order.

Example
package main

import (
	"cmp"
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, 3, 1, 4, 1, 5).Sort(cmp.Compare[int]).Collect()
	fmt.Println(got)
}
Output:
[1 1 3 4 5]

func (Stream[T]) Take

func (s Stream[T]) Take(n int) Stream[T]

Take emits the first n elements then closes the stream.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, 1, 2, 3, 4, 5).Take(3).Collect()
	fmt.Println(got)
}
Output:
[1 2 3]

func (Stream[T]) Tee

func (s Stream[T]) Tee(n int) []Stream[T]

Tee broadcasts every element to n output streams. Unlike FanOut which round-robins, Tee sends each element to all streams.

Example
package main

import (
	"context"
	"fmt"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	streams := stream.Of(ctx, 1, 2, 3).Tee(2)

	results := make([][]int, 2)
	done := make(chan struct{})

	go func() {
		results[1] = streams[1].Collect()

		close(done)
	}()

	results[0] = streams[0].Collect()

	<-done

	fmt.Println(results[0])
	fmt.Println(results[1])
}
Output:
[1 2 3]
[1 2 3]

func (Stream[T]) Throttle

func (s Stream[T]) Throttle(d time.Duration) Stream[T]

Throttle rate-limits the stream to at most one element per duration d.

Example
package main

import (
	"context"
	"fmt"
	"time"

	stream "github.com/foomo/goflow"
)

func main() {
	ctx := context.Background()
	got := stream.Of(ctx, 1, 2, 3).Throttle(1 * time.Millisecond).Collect()
	fmt.Println(got)
}
Output:
[1 2 3]

func (Stream[T]) WithOptions

func (s Stream[T]) WithOptions(opts ...gofuncy.GoOption) Stream[T]

WithOptions returns a shallow copy of the stream with the given options appended.
