stream

package
v1.5.1 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2026 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Overview

Package stream provides a powerful and expressive API for processing sequences of data in Go.

The stream API is inspired by Java 8 Streams and provides a functional programming approach to data processing with support for method chaining, lazy evaluation, and context-aware operations.

Core Concepts:

A Stream represents a sequence of elements supporting sequential operations. Streams are:

  • Lazy: computation is only performed when a terminal operation is initiated
  • Immutable: operations return new streams rather than modifying existing ones
  • Context-aware: terminal operations accept a context.Context and respect cancellation and timeouts
  • Resource-managed: streams should be closed to release resources

Basic Usage:

// Create a stream from a slice.
// The variable is named s so that it does not shadow the stream package.
s := stream.FromSlice([]int{1, 2, 3, 4, 5})
defer s.Close()

// Chain operations and collect results
result, err := s.
	Filter(func(x int) bool { return x%2 == 0 }). // Keep even numbers
	Map(func(x int) int { return x * 2 }).         // Double them
	ToSlice(context.Background())                  // Collect to slice

if err != nil {
	log.Fatal(err)
}

fmt.Println(result) // [4 8]

Stream Creation:

Multiple ways to create streams:

// From slice
letters := stream.FromSlice([]string{"a", "b", "c"})

// From channel
ch := make(chan int, 3)
ch <- 1; ch <- 2; ch <- 3; close(ch)
fromChan := stream.FromChannel(ch)

// From generator function (infinite)
counter := 0
naturals := stream.Generate(func() int {
	counter++
	return counter
})

// Empty stream
empty := stream.Empty[int]()

Intermediate Operations:

Intermediate operations are lazy and return new streams:

// In the snippets below, s is a stream.Stream[int].

// Filter elements based on predicate
s.Filter(func(x int) bool { return x > 0 })

// Transform elements
s.Map(func(x int) int { return x * 2 })

// Transform to different type (the result is a Stream[interface{}])
s.MapTo(func(x int) interface{} { return fmt.Sprintf("num-%d", x) })

// Flatten nested streams
s.FlatMap(func(x int) stream.Stream[int] {
	return stream.FromSlice([]int{x, x}) // Duplicate each element
})

// Remove duplicates
s.Distinct()

// Sort elements
s.Sorted(func(a, b int) int {
	if a < b { return -1 }
	if a > b { return 1 }
	return 0
})

// Skip first n elements
s.Skip(5)

// Limit to n elements
s.Limit(10)

// Peek at elements (for debugging)
s.Peek(func(x int) { log.Printf("Processing: %d", x) })

Terminal Operations:

Terminal operations consume the stream and produce results:

// Iterate over elements
stream.ForEach(ctx, func(x int) {
	fmt.Println(x)
})

// Reduce to single value
sum, err := stream.Reduce(ctx, 0, func(acc, x int) int {
	return acc + x
})

// Collect to custom type
result, err := stream.Collect(
	ctx,
	func() interface{} { return &strings.Builder{} },
	func(acc interface{}, x string) {
		acc.(*strings.Builder).WriteString(x)
	},
	func(acc1, acc2 interface{}) interface{} { return acc1 },
)

// Convert to slice
slice, err := stream.ToSlice(ctx)

// Count elements
count, err := stream.Count(ctx)

// Find elements
first, found, err := stream.FindFirst(ctx)
anyElem, found, err := stream.FindAny(ctx) // avoid shadowing the predeclared identifier any

// Test predicates
hasAny, err := stream.AnyMatch(ctx, predicate)
hasAll, err := stream.AllMatch(ctx, predicate)
hasNone, err := stream.NoneMatch(ctx, predicate)

// Find extremes
min, found, err := stream.Min(ctx, comparator)
max, found, err := stream.Max(ctx, comparator)

Data Processing Patterns:

Filter-Map-Reduce Pattern:

result, err := stream.FromSlice(numbers).
	Filter(func(x int) bool { return x > 0 }).      // Keep positive
	Map(func(x int) int { return x * x }).           // Square them
	Reduce(ctx, 0, func(acc, x int) int {           // Sum squares
		return acc + x
	})

Data Transformation Pipeline:

emails, err := stream.FromSlice(usernames).
	Filter(func(name string) bool { return len(name) > 3 }).
	Map(func(name string) string {
		return strings.ToLower(name) + "@company.com"
	}).
	ToSlice(ctx)

Aggregation and Statistics:

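// Terminal operations consume the stream, so build a fresh one for each aggregate.
data := []int{1, 2, 3, 4, 5}

count, _ := stream.FromSlice(data).Count(ctx)
sum, _ := stream.FromSlice(data).Reduce(ctx, 0, func(a, b int) int { return a + b })
min, _, _ := stream.FromSlice(data).Min(ctx, intComparator)
max, _, _ := stream.FromSlice(data).Max(ctx, intComparator)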
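
The snippet above refers to an intComparator that the documentation does not define. A minimal sketch of what such a helper could look like follows; the names intComparator and reversed are illustrative assumptions, not part of the package, and cmp.Compare requires Go 1.21 or later.

```go
package main

import (
	"cmp"
	"fmt"
)

// intComparator is a hypothetical helper with the func(a, b int) int shape
// that Sorted, Min and Max expect: negative if a < b, zero if a == b,
// positive if a > b.
func intComparator(a, b int) int {
	return cmp.Compare(a, b)
}

// reversed is a hypothetical wrapper that inverts the ordering of any comparator.
func reversed[T any](compare func(a, b T) int) func(a, b T) int {
	return func(a, b T) int { return compare(b, a) }
}

func main() {
	fmt.Println(intComparator(1, 2), intComparator(2, 2), intComparator(3, 2)) // -1 0 1
	fmt.Println(reversed(intComparator)(1, 2))                                 // 1
}
```

Passing reversed(intComparator) to Sorted would presumably yield a descending order, since the comparator contract only depends on the sign of the result.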

Text Processing:

words, err := stream.FromSlice(strings.Fields(text)).
	Filter(func(word string) bool { return len(word) > 2 }).
	Map(func(word string) string { return strings.ToLower(word) }).
	Distinct().
	Sorted(func(a, b string) int { return strings.Compare(a, b) }).
	ToSlice(ctx)

Infinite Streams:

// Generate fibonacci sequence
a, b := 0, 1
fibonacci := stream.Generate(func() int {
	a, b = b, a+b
	return a
})

first10, _ := fibonacci.Limit(10).ToSlice(ctx)
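
Because the generator closes over a and b, each stream needs its own copy of that state. One way to arrange this, sketched here without the stream package, is a constructor that returns a fresh closure; newFibonacci and take are hypothetical names, and take merely stands in for Limit(n).ToSlice(ctx).

```go
package main

import "fmt"

// newFibonacci returns a generator closure with its own private state,
// suitable for passing to a Generate-style function.
func newFibonacci() func() int {
	a, b := 0, 1
	return func() int {
		a, b = b, a+b
		return a
	}
}

// take calls the generator n times and collects the results.
// It is a stand-in for Limit(n).ToSlice(ctx) in this sketch.
func take(gen func() int, n int) []int {
	out := make([]int, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, gen())
	}
	return out
}

func main() {
	fmt.Println(take(newFibonacci(), 10)) // [1 1 2 3 5 8 13 21 34 55]
}
```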

Error Handling:

All terminal operations return errors that should be checked:

result, err := stream.ToSlice(ctx)
if err != nil {
	if errors.Is(err, context.DeadlineExceeded) {
		log.Println("Operation timed out")
	} else if errors.Is(err, stream.ErrStreamClosed) {
		log.Println("Stream was closed")
	} else {
		log.Printf("Stream error: %v", err)
	}
	return
}

Context and Cancellation:

All terminal operations accept a context for cancellation and timeouts:

// With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

result, err := stream.ToSlice(ctx)

// With cancellation
ctx, cancel := context.WithCancel(context.Background())
go func() {
	time.Sleep(time.Second)
	cancel() // Cancel after 1 second
}()

err := stream.ForEach(ctx, processFunction)
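
To illustrate what respecting the context can mean for a blocking source, here is a minimal sketch of a channel read that gives up when the context is done. It does not show the package's internals; nextFromChannel and demo are hypothetical names, and the return shape simply mirrors the Next method of the Source interface.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// nextFromChannel waits for the next value on ch but returns early when ctx
// is cancelled or its deadline passes.
// It returns (value, true, nil) for an element, (zero, false, nil) when the
// channel is closed, and (zero, false, ctx.Err()) on cancellation.
func nextFromChannel[T any](ctx context.Context, ch <-chan T) (T, bool, error) {
	var zero T
	select {
	case <-ctx.Done():
		return zero, false, ctx.Err()
	case v, ok := <-ch:
		if !ok {
			return zero, false, nil
		}
		return v, true, nil
	}
}

// demo reads from a channel that never produces a value, so the timeout fires.
func demo() error {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	_, _, err := nextFromChannel[int](ctx, make(chan int))
	return err
}

func main() {
	fmt.Println(demo()) // context deadline exceeded
}
```

Without the ctx.Done() case, the read would block forever on a channel that is never written to or closed.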

Resource Management:

Streams should be closed to release resources:

s := stream.FromSlice(data)
defer s.Close() // Always close streams

// Or use in function scope
func processData(ctx context.Context, data []int) error {
	s := stream.FromSlice(data)
	defer s.Close()

	return s.ForEach(ctx, process)
}

Performance Characteristics:

  • Lazy evaluation: operations are only executed when terminal operations are called
  • Memory efficient: elements are processed one at a time in most cases
  • Some operations (like Sorted) require collecting all elements
  • Parallel operations: future versions may support parallel processing
  • Overhead: functional style adds some overhead compared to imperative loops

Common Patterns:

Data Validation Pipeline:

validData, err := stream.FromSlice(rawData).
	Filter(func(item DataItem) bool { return item.IsValid() }).
	Map(func(item DataItem) DataItem { return item.Normalize() }).
	Filter(func(item DataItem) bool { return item.PassesBusinessRules() }).
	ToSlice(ctx)

ETL (Extract, Transform, Load):

err := stream.FromChannel(inputChannel).
	Map(func(raw RawData) ProcessedData { return transform(raw) }).
	Filter(func(data ProcessedData) bool { return data.IsComplete() }).
	ForEach(ctx, func(data ProcessedData) {
		database.Save(data)
	})

Statistical Analysis:

stats := Stats{}
stream.FromSlice(measurements).ForEach(ctx, func(m Measurement) {
	stats.Add(m)
})

mean := stats.Mean()
stddev := stats.StandardDeviation()
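
Stats and Measurement are not defined by this package. As a rough sketch of an accumulator that fits the pattern above, the following assumes Measurement is a plain float64 and uses Welford's online algorithm, so no measurements have to be stored. It reports the sample standard deviation (n-1 denominator), which is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"math"
)

// Measurement is assumed to be a plain float64 for this sketch.
type Measurement float64

// Stats accumulates a running mean and variance using Welford's algorithm.
type Stats struct {
	n    int
	mean float64
	m2   float64 // sum of squared deviations from the running mean
}

// Add folds one measurement into the running totals.
func (s *Stats) Add(m Measurement) {
	x := float64(m)
	s.n++
	delta := x - s.mean
	s.mean += delta / float64(s.n)
	s.m2 += delta * (x - s.mean)
}

// Mean returns the arithmetic mean, or 0 when nothing has been added.
func (s *Stats) Mean() float64 { return s.mean }

// StandardDeviation returns the sample standard deviation, or 0 when fewer
// than two measurements have been added.
func (s *Stats) StandardDeviation() float64 {
	if s.n < 2 {
		return 0
	}
	return math.Sqrt(s.m2 / float64(s.n-1))
}

func main() {
	stats := Stats{}
	for _, m := range []Measurement{2, 4, 4, 4, 5, 5, 7, 9} {
		stats.Add(m) // the same call a ForEach action would make
	}
	fmt.Printf("mean=%.2f stddev=%.2f\n", stats.Mean(), stats.StandardDeviation())
}
```

Since the ForEach action mutates stats from a closure, this only stays safe as long as the action runs on a single goroutine.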

Word Frequency Analysis:

frequencies, err := stream.FromSlice(strings.Fields(text)).
	Map(func(word string) string { return strings.ToLower(word) }).
	Collect(ctx,
		func() interface{} { return make(map[string]int) },
		func(acc interface{}, word string) {
			acc.(map[string]int)[word]++
		},
		func(acc1, acc2 interface{}) interface{} {
			// Combine maps for parallel processing
			return combineMaps(acc1, acc2)
		},
	)
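
combineMaps is referenced above but not defined. A minimal sketch of such a combiner, assuming both accumulators hold the map[string]int produced by the supplier, could look like this; it follows the same merge logic as the WordCount example.

```go
package main

import "fmt"

// combineMaps is a hypothetical combiner for the word-frequency collector.
// It merges the counts from acc2 into acc1 and returns acc1.
func combineMaps(acc1, acc2 interface{}) interface{} {
	m1, ok1 := acc1.(map[string]int)
	m2, ok2 := acc2.(map[string]int)
	if !ok1 || !ok2 {
		return acc1 // unexpected type: leave the first accumulator untouched
	}
	for word, count := range m2 {
		m1[word] += count
	}
	return m1
}

func main() {
	left := map[string]int{"the": 2, "fox": 1}
	right := map[string]int{"the": 1, "dog": 1}
	merged := combineMaps(left, right).(map[string]int)
	fmt.Println(merged) // map[dog:1 fox:1 the:3]
}
```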

Best Practices:

  1. Always close streams to release resources:

    defer stream.Close()

  2. Check errors from terminal operations:

    result, err := stream.ToSlice(ctx)
    if err != nil {
    	// handle error
    }

  3. Use appropriate context timeouts:

    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

  4. Chain operations efficiently:

    // Good: single chain
    result, err := stream.Filter(p1).Map(f1).Filter(p2).ToSlice(ctx)

    // Avoid: multiple intermediate variables
    s1 := stream.Filter(p1)
    s2 := s1.Map(f1)
    s3 := s2.Filter(p2)

  5. Use Peek for debugging:

    stream.Peek(func(x T) { log.Printf("Debug: %v", x) })

  6. Consider memory implications of operations:

    // Memory-efficient (streaming)
    stream.Filter(predicate).ForEach(ctx, action)

    // Memory-intensive (collects all)
    stream.Sorted(comparator).ToSlice(ctx)

  7. Handle infinite streams carefully:

    stream.Generate(generator).Limit(1000).ToSlice(ctx) // Always limit

Comparison with Alternatives:

vs Traditional Loops:

  • More expressive and readable
  • Built-in error handling and context support
  • Functional composition
  • Some performance overhead

vs Channels:

  • Higher-level abstraction
  • Rich set of operations
  • Better error handling
  • Less flexible for complex flow control

vs Iterator Pattern:

  • Lazy evaluation
  • Method chaining
  • Context-aware operations
  • Resource management

Thread Safety:

Individual stream instances are not thread-safe and should not be shared between goroutines. However, you can create multiple streams from the same source safely:

// Safe: separate streams
go processStream(stream.FromSlice(data).Filter(predicate1))
go processStream(stream.FromSlice(data).Filter(predicate2))

// Unsafe: shared stream
s := stream.FromSlice(data)
go s.Filter(predicate1).ForEach(ctx1, action1) // Don't do this
go s.Filter(predicate2).ForEach(ctx2, action2) // Don't do this

Integration with Other Packages:

The stream package integrates well with other goflow components:

// With worker pool for parallel processing
pool, _ := workerpool.NewSafe(4, 100)
err := stream.FromSlice(tasks).ForEach(ctx, func(task Task) {
	pool.Submit(workerpool.TaskFunc(func(ctx context.Context) error {
		return processTask(task)
	}))
})

// With pipeline for complex processing
pipeline := pipeline.New().
	AddStageFunc("extract", extractData).
	AddStageFunc("transform", func(ctx context.Context, input interface{}) (interface{}, error) {
		data := input.([]DataItem)
		processed, err := stream.FromSlice(data).
			Filter(isValid).
			Map(normalize).
			ToSlice(ctx)
		return processed, err
	}).
	AddStageFunc("load", loadData)

Future Enhancements:

Planned features for future versions:

  • Parallel stream processing
  • More built-in collectors
  • Stream splitting and merging
  • Integration with Go generics improvements
  • Performance optimizations

Example

Example demonstrates basic stream usage.

// Create a stream from a slice
stream := FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
defer func() { _ = stream.Close() }()

// Chain operations: filter even numbers, multiply by 2, take first 3
result, err := stream.
	Filter(func(x int) bool { return x%2 == 0 }).
	Map(func(x int) int { return x * 2 }).
	Limit(3).
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Result: %v\n", result)

Output:

Result: [4 8 12]

Example (Aggregation)

Example_aggregation demonstrates various aggregation operations.

numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

// Count elements
countStream := FromSlice(numbers)
count, _ := countStream.Count(context.Background())
_ = countStream.Close()
fmt.Printf("Count: %d\n", count)

// Sum using reduce
sumStream := FromSlice(numbers)
sum, _ := sumStream.Reduce(context.Background(), 0, func(acc, x int) int {
	return acc + x
})
_ = sumStream.Close()
fmt.Printf("Sum: %d\n", sum)

// Find min and max
minStream := FromSlice(numbers)
minValue, found, _ := minStream.Min(context.Background(), func(a, b int) int {
	if a < b {
		return -1
	}
	if a > b {
		return 1
	}
	return 0
})
_ = minStream.Close()
if found {
	fmt.Printf("Min: %d\n", minValue)
}

maxStream := FromSlice(numbers)
maxValue, found, _ := maxStream.Max(context.Background(), func(a, b int) int {
	if a < b {
		return -1
	}
	if a > b {
		return 1
	}
	return 0
})
_ = maxStream.Close()
if found {
	fmt.Printf("Max: %d\n", maxValue)
}

// Check if any/all numbers meet criteria
anyStream := FromSlice(numbers)
hasEven, _ := anyStream.AnyMatch(context.Background(), func(x int) bool {
	return x%2 == 0
})
_ = anyStream.Close()
fmt.Printf("Has even numbers: %t\n", hasEven)

allStream := FromSlice(numbers)
allPositive, _ := allStream.AllMatch(context.Background(), func(x int) bool {
	return x > 0
})
_ = allStream.Close()
fmt.Printf("All positive: %t\n", allPositive)

Output:

Count: 10
Sum: 55
Min: 1
Max: 10
Has even numbers: true
All positive: true

Example (Channels)

Example_channels demonstrates creating streams from channels.

// Create a channel and send some data
ch := make(chan string, 5)
ch <- "apple"
ch <- "banana"
ch <- "cherry"
ch <- "date"
ch <- "elderberry"
close(ch)

stream := FromChannel(ch)
defer func() { _ = stream.Close() }()

// Process fruits: filter by length and convert to uppercase
result, err := stream.
	Filter(func(fruit string) bool { return len(fruit) <= 5 }).
	Map(func(fruit string) string { return strings.ToUpper(fruit) }).
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Short fruits: %v\n", result)

Output:

Short fruits: [APPLE DATE]

Example (CollectToCustomType)

Example_collectToCustomType demonstrates collecting to custom types.

words := []string{"hello", "world", "from", "stream", "api"}

stream := FromSlice(words)
defer func() { _ = stream.Close() }()

// Collect into a custom string with separators
result, err := stream.
	Filter(func(word string) bool { return len(word) > 3 }).
	Collect(
		context.Background(),
		func() interface{} { return &strings.Builder{} }, // Supplier
		func(acc interface{}, value string) { // Accumulator
			builder := acc.(*strings.Builder)
			if builder.Len() > 0 {
				builder.WriteString(", ")
			}
			builder.WriteString(value)
		},
		func(acc1, acc2 interface{}) interface{} { // Combiner (for parallel processing)
			b1 := acc1.(*strings.Builder)
			b2 := acc2.(*strings.Builder)
			b1.WriteString(", ")
			b1.WriteString(b2.String())
			return b1
		},
	)

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

builder := result.(*strings.Builder)
fmt.Printf("Collected: %s\n", builder.String())

Output:

Collected: hello, world, from, stream

Example (DataProcessing)

Example_dataProcessing demonstrates a data processing pipeline.

// Sample data: user names
users := []string{"john.doe", "jane.smith", "bob.wilson", "alice.brown"}

stream := FromSlice(users)
defer func() { _ = stream.Close() }()

// Process names: convert to proper case and create email addresses
emails, err := stream.
	Filter(func(name string) bool { return len(name) > 5 }). // Filter long names
	Map(func(name string) string {
		// Convert to proper case
		parts := strings.Split(name, ".")
		for i, part := range parts {
			if len(part) > 0 {
				parts[i] = strings.ToUpper(part[:1]) + part[1:]
			}
		}
		return strings.Join(parts, " ")
	}).
	Map(func(name string) string {
		// Create email address
		emailName := strings.ToLower(strings.ReplaceAll(name, " ", "."))
		return emailName + "@company.com"
	}).
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

for _, email := range emails {
	fmt.Println(email)
}

Output:

john.doe@company.com
jane.smith@company.com
bob.wilson@company.com
alice.brown@company.com

Example (FlatMap)

Example_flatMap demonstrates flattening nested structures.

// Each number generates a range from 1 to that number
stream := FromSlice([]int{2, 3, 4})
defer func() { _ = stream.Close() }()

result, err := stream.
	FlatMap(func(n int) Stream[int] {
		// Create a range from 1 to n
		nums := make([]int, n)
		for i := 0; i < n; i++ {
			nums[i] = i + 1
		}
		return FromSlice(nums)
	}).
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Flattened: %v\n", result)

Output:

Flattened: [1 2 1 2 3 1 2 3 4]

Example (Generator)

Example_generator demonstrates infinite streams with generators.

counter := 0

// Generate infinite stream of incrementing numbers
stream := Generate(func() int {
	counter++
	return counter
})
defer func() { _ = stream.Close() }()

// Take first 5 even squares
result, err := stream.
	Filter(func(x int) bool { return x%2 == 0 }). // Even numbers
	Map(func(x int) int { return x * x }).        // Squares
	Limit(5).                                     // Limit to 5
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("First 5 even squares: %v\n", result)

Output:

First 5 even squares: [4 16 36 64 100]

Example (Numbers)

Example_numbers demonstrates number processing.

// Generate stream of numbers and process them
stream := FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
defer func() { _ = stream.Close() }()

// Find squares of even numbers, skip first 2, limit to 3
result, err := stream.
	Filter(func(x int) bool { return x%2 == 0 }). // Even numbers: 2,4,6,8,10,12
	Map(func(x int) int { return x * x }).        // Squares: 4,16,36,64,100,144
	Skip(2).                                      // Skip first 2: 36,64,100,144
	Limit(3).                                     // Take 3: 36,64,100
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Result: %v\n", result)

Output:

Result: [36 64 100]

Example (Peek)

Example_peek demonstrates debugging with peek. Peek allows you to observe elements as they flow through the pipeline without modifying them. Useful for debugging and logging.

stream := FromSlice([]int{1, 2, 3, 4, 5})
defer func() { _ = stream.Close() }()

// Use Peek to observe elements at different stages of the pipeline.
// Note: Due to lazy evaluation, peek output order may vary.
// We don't verify the debug output here, only the final result.
result, err := stream.
	Peek(func(x int) { _ = x /* Observe original values */ }).
	Filter(func(x int) bool { return x%2 == 0 }).
	Peek(func(x int) { _ = x /* Observe filtered values */ }).
	Map(func(x int) int { return x * 10 }).
	Peek(func(x int) { _ = x /* Observe mapped values */ }).
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Result: %v\n", result)

Output:

Result: [20 40]

Example (TextProcessing)

Example_textProcessing demonstrates text processing with streams.

text := "The quick brown fox jumps over the lazy dog"
words := strings.Fields(text)

stream := FromSlice(words)
defer func() { _ = stream.Close() }()

// Process words: filter long words, convert to uppercase, sort
processedWords, err := stream.
	Filter(func(word string) bool { return len(word) > 3 }).        // Words longer than 3 chars
	Map(func(word string) string { return strings.ToUpper(word) }). // Uppercase
	Distinct().                                                     // Remove duplicates
	Sorted(func(a, b string) int { return strings.Compare(a, b) }). // Sort alphabetically
	ToSlice(context.Background())

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Processed words: %v\n", processedWords)

Output:

Processed words: [BROWN JUMPS LAZY OVER QUICK]

Example (WordCount)

Example_wordCount demonstrates a word counting pipeline.

text := "the quick brown fox jumps over the lazy dog the fox is quick"
words := strings.Fields(text)

// Count word frequencies using collect
stream := FromSlice(words)
defer func() { _ = stream.Close() }()

wordCount, err := stream.
	Map(func(word string) string { return strings.ToLower(word) }). // Normalize case
	Collect(
		context.Background(),
		func() interface{} { return make(map[string]int) }, // Create map
		func(acc interface{}, word string) { // Count words
			wordMap := acc.(map[string]int)
			wordMap[word]++
		},
		func(acc1, acc2 interface{}) interface{} { // Combine maps
			map1 := acc1.(map[string]int)
			map2 := acc2.(map[string]int)
			for word, count := range map2 {
				map1[word] += count
			}
			return map1
		},
	)

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

wordMap := wordCount.(map[string]int)
fmt.Printf("Word frequencies: %v\n", wordMap)

Output:

Word frequencies: map[brown:1 dog:1 fox:2 is:1 jumps:1 lazy:1 over:1 quick:2 the:3]

Constants

This section is empty.

Variables

var ErrStreamClosed = errors.New("stream is closed")

ErrStreamClosed is returned when attempting to operate on a closed stream.

Functions

This section is empty.

Types

type Source

type Source[T any] interface {
	// Next returns the next element and true, or the zero value and false if there are no more elements.
	Next(ctx context.Context) (T, bool, error)
	// Close closes the source and releases resources.
	Close() error
}

Source represents a data source for streams.

type Stream

type Stream[T any] interface {

	// Filter returns a stream consisting of elements that match the given predicate.
	Filter(predicate func(T) bool) Stream[T]

	// Map returns a stream consisting of the results of applying the given function to elements.
	Map(mapper func(T) T) Stream[T]

	// MapTo transforms elements to a different type and returns a Stream[interface{}];
	// callers must type-assert elements to recover the concrete type.
	MapTo(mapper func(T) interface{}) Stream[interface{}]

	// FlatMap returns a stream consisting of results of replacing each element with
	// the contents of a mapped stream produced by applying the provided mapping function.
	FlatMap(mapper func(T) Stream[T]) Stream[T]

	// Distinct returns a stream consisting of distinct elements (according to equality).
	Distinct() Stream[T]

	// Sorted returns a stream consisting of elements sorted according to the provided compare function.
	// The compare function should return negative if a < b, 0 if a == b, positive if a > b.
	Sorted(compare func(a, b T) int) Stream[T]

	// Skip returns a stream consisting of remaining elements after skipping n elements.
	Skip(n int64) Stream[T]

	// Limit returns a stream consisting of elements truncated to be no longer than maxSize.
	Limit(maxSize int64) Stream[T]

	// Peek returns a stream consisting of elements, additionally performing the provided
	// action on each element as elements are consumed.
	Peek(action func(T)) Stream[T]

	// ForEach performs an action for each element of the stream.
	ForEach(ctx context.Context, action func(T)) error

	// Reduce performs a reduction on elements using the provided identity and combining function.
	Reduce(ctx context.Context, identity T, accumulator func(T, T) T) (T, error)

	// Collect performs a mutable reduction operation on elements.
	Collect(ctx context.Context, supplier func() interface{}, accumulator func(interface{}, T), combiner func(interface{}, interface{}) interface{}) (interface{}, error)

	// ToSlice returns a slice containing all elements.
	ToSlice(ctx context.Context) ([]T, error)

	// Count returns the count of elements.
	Count(ctx context.Context) (int64, error)

	// AnyMatch returns whether any elements match the given predicate.
	AnyMatch(ctx context.Context, predicate func(T) bool) (bool, error)

	// AllMatch returns whether all elements match the given predicate.
	AllMatch(ctx context.Context, predicate func(T) bool) (bool, error)

	// NoneMatch returns whether no elements match the given predicate.
	NoneMatch(ctx context.Context, predicate func(T) bool) (bool, error)

	// FindFirst returns the first element, if present.
	FindFirst(ctx context.Context) (T, bool, error)

	// FindAny returns any element, if present.
	FindAny(ctx context.Context) (T, bool, error)

	// Min returns the minimum element according to the provided comparator.
	Min(ctx context.Context, compare func(a, b T) int) (T, bool, error)

	// Max returns the maximum element according to the provided comparator.
	Max(ctx context.Context, compare func(a, b T) int) (T, bool, error)

	// Close closes the stream and releases resources. It also cancels any
	// running pipeline goroutines, allowing operations to terminate cleanly
	// even if the stream was not fully consumed.
	Close() error

	// IsClosed returns true if the stream is closed.
	IsClosed() bool
}

Stream represents a sequence of elements supporting sequential operations (parallel processing is a planned feature, not part of this version). Streams are lazy; computation on the source data is only performed when a terminal operation is initiated, and source elements are consumed only as needed.

func Empty

func Empty[T any]() Stream[T]

Empty creates an empty Stream.

func FromChannel

func FromChannel[T any](ch <-chan T) Stream[T]

FromChannel creates a Stream from a channel.

func FromSlice

func FromSlice[T any](slice []T) Stream[T]

FromSlice creates a Stream from a slice.

func Generate

func Generate[T any](generator func() T) Stream[T]

Generate creates an infinite Stream from a generator function.

func New

func New[T any](source Source[T]) Stream[T]

New creates a new Stream from a Source.
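
New is the extension point for custom data sources. The sketch below shows what a Source implementation might look like; to stay self-contained it redeclares the Source interface locally instead of importing the package, and rangeSource, drain and demo are hypothetical names. The drain loop only approximates how a terminal operation might pull elements and is not taken from the package's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Source mirrors the interface documented above so the sketch compiles on its own.
type Source[T any] interface {
	Next(ctx context.Context) (T, bool, error)
	Close() error
}

// rangeSource is a hypothetical Source that yields the integers in [next, end).
type rangeSource struct {
	next, end int
	closed    bool
}

// Next returns the next integer, or false once the range is exhausted or the
// source is closed. It returns ctx.Err() if the context is already done.
func (r *rangeSource) Next(ctx context.Context) (int, bool, error) {
	if err := ctx.Err(); err != nil {
		return 0, false, err
	}
	if r.closed || r.next >= r.end {
		return 0, false, nil
	}
	v := r.next
	r.next++
	return v, true, nil
}

// Close marks the source as closed; there is nothing else to release.
func (r *rangeSource) Close() error {
	r.closed = true
	return nil
}

// drain pulls every element out of src and closes it afterwards.
func drain[T any](ctx context.Context, src Source[T]) ([]T, error) {
	defer src.Close()
	var out []T
	for {
		v, ok, err := src.Next(ctx)
		if err != nil {
			return out, err
		}
		if !ok {
			return out, nil
		}
		out = append(out, v)
	}
}

// demo drains a small range using a background context.
func demo() ([]int, error) {
	return drain[int](context.Background(), &rangeSource{next: 1, end: 4})
}

func main() {
	fmt.Println(demo()) // [1 2 3] <nil>
}
```

With the real package, a value such as &rangeSource{next: 1, end: 4} would be handed to New to obtain a Stream[int], provided it satisfies the package's Source[int] interface.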
