Documentation ¶
Overview ¶
Package stream provides a powerful and expressive API for processing sequences of data in Go.
The stream API is inspired by Java 8 Streams and provides a functional programming approach to data processing with support for method chaining, lazy evaluation, and context-aware operations.
Core Concepts:
A Stream represents a sequence of elements supporting sequential operations. Streams are:
- Lazy: computation is only performed when a terminal operation is initiated
- Immutable: operations return new streams rather than modifying existing ones
- Context-aware: all operations respect context cancellation and timeouts
- Resource-managed: streams should be closed to release resources
Basic Usage:
// Create a stream from a slice
s := stream.FromSlice([]int{1, 2, 3, 4, 5})
defer s.Close()
// Chain operations and collect results
result, err := s.
    Filter(func(x int) bool { return x%2 == 0 }). // Keep even numbers
    Map(func(x int) int { return x * 2 }).        // Double them
    ToSlice(context.Background())                 // Collect to slice
if err != nil {
    log.Fatal(err)
}
fmt.Println(result) // [4 8]
Stream Creation:
Multiple ways to create streams:
// From slice
s := stream.FromSlice([]string{"a", "b", "c"})
// From channel
ch := make(chan int, 3)
ch <- 1; ch <- 2; ch <- 3; close(ch)
chStream := stream.FromChannel(ch)
// From generator function (infinite)
counter := 0
gen := stream.Generate(func() int {
    counter++
    return counter
})
// Empty stream
empty := stream.Empty[int]()
Intermediate Operations:
Intermediate operations are lazy and return new streams:
// Filter elements based on predicate
s.Filter(func(x int) bool { return x > 0 })
// Transform elements
s.Map(func(x int) int { return x * 2 })
// Transform to different type
s.MapTo(func(x int) interface{} { return fmt.Sprintf("num-%d", x) })
// Flatten nested streams
s.FlatMap(func(x int) Stream[int] {
    return stream.FromSlice([]int{x, x}) // Duplicate each element
})
// Remove duplicates
s.Distinct()
// Sort elements
s.Sorted(func(a, b int) int {
    if a < b { return -1 }
    if a > b { return 1 }
    return 0
})
// Skip first n elements
s.Skip(5)
// Limit to n elements
s.Limit(10)
// Peek at elements (for debugging)
s.Peek(func(x int) { log.Printf("Processing: %d", x) })
Terminal Operations:
Terminal operations consume the stream and produce results:
// Iterate over elements
s.ForEach(ctx, func(x int) {
    fmt.Println(x)
})
// Reduce to single value
sum, err := s.Reduce(ctx, 0, func(acc, x int) int {
    return acc + x
})
// Collect to custom type
result, err := s.Collect(
    ctx,
    func() interface{} { return &strings.Builder{} },
    func(acc interface{}, x string) {
        acc.(*strings.Builder).WriteString(x)
    },
    func(acc1, acc2 interface{}) interface{} { // Merge partial results
        acc1.(*strings.Builder).WriteString(acc2.(*strings.Builder).String())
        return acc1
    },
)
// Convert to slice
slice, err := s.ToSlice(ctx)
// Count elements
count, err := s.Count(ctx)
// Find elements
first, found, err := s.FindFirst(ctx)
elem, found, err := s.FindAny(ctx)
// Test predicates
hasAny, err := s.AnyMatch(ctx, predicate)
hasAll, err := s.AllMatch(ctx, predicate)
hasNone, err := s.NoneMatch(ctx, predicate)
// Find extremes
minVal, found, err := s.Min(ctx, comparator)
maxVal, found, err := s.Max(ctx, comparator)
Data Processing Patterns:
Filter-Map-Reduce Pattern:
result, err := stream.FromSlice(numbers).
    Filter(func(x int) bool { return x > 0 }). // Keep positive
    Map(func(x int) int { return x * x }).     // Square them
    Reduce(ctx, 0, func(acc, x int) int {      // Sum squares
        return acc + x
    })
Data Transformation Pipeline:
emails, err := stream.FromSlice(usernames).
    Filter(func(name string) bool { return len(name) > 3 }).
    Map(func(name string) string {
        return strings.ToLower(name) + "@company.com"
    }).
    ToSlice(ctx)
Aggregation and Statistics:
// Each terminal operation consumes its stream, so create a fresh stream per aggregate
numbers := []int{1, 2, 3, 4, 5}
count, _ := stream.FromSlice(numbers).Count(ctx)
sum, _ := stream.FromSlice(numbers).Reduce(ctx, 0, func(a, b int) int { return a + b })
minVal, _, _ := stream.FromSlice(numbers).Min(ctx, intComparator)
maxVal, _, _ := stream.FromSlice(numbers).Max(ctx, intComparator)
Text Processing:
words, err := stream.FromSlice(strings.Fields(text)).
    Filter(func(word string) bool { return len(word) > 2 }).
    Map(func(word string) string { return strings.ToLower(word) }).
    Distinct().
    Sorted(func(a, b string) int { return strings.Compare(a, b) }).
    ToSlice(ctx)
Infinite Streams:
// Generate fibonacci sequence
a, b := 0, 1
fibonacci := stream.Generate(func() int {
    a, b = b, a+b
    return a
})
first10, _ := fibonacci.Limit(10).ToSlice(ctx)
Error Handling:
All terminal operations return errors that should be checked:
result, err := s.ToSlice(ctx)
if err != nil {
    if errors.Is(err, context.DeadlineExceeded) {
        log.Println("Operation timed out")
    } else if errors.Is(err, stream.ErrStreamClosed) {
        log.Println("Stream was closed")
    } else {
        log.Printf("Stream error: %v", err)
    }
    return
}
Context and Cancellation:
All terminal operations accept a context for cancellation and timeouts:
// With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
result, err := s.ToSlice(ctx)
// With cancellation
ctx, cancel := context.WithCancel(context.Background())
go func() {
    time.Sleep(time.Second)
    cancel() // Cancel after 1 second
}()
err := s.ForEach(ctx, processFunction)
Resource Management:
Streams should be closed to release resources:
s := stream.FromSlice(data)
defer s.Close() // Always close streams
// Or use in function scope
func processData(data []int) error {
    s := stream.FromSlice(data)
    defer s.Close()
    return s.ForEach(ctx, process)
}
Performance Characteristics:
- Lazy evaluation: operations are only executed when terminal operations are called
- Memory efficient: elements are processed one at a time in most cases
- Some operations (like Sort) require collecting all elements
- Parallel operations: future versions may support parallel processing
- Overhead: functional style adds some overhead compared to imperative loops
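Laziness can be observed directly with Peek. In this minimal sketch (using only the API above; the logged message is illustrative), nothing is logged while the pipeline is built; Peek fires only when the terminal Count drives elements through:
s := stream.FromSlice([]int{1, 2, 3}).
    Peek(func(x int) { log.Printf("saw %d", x) }) // not executed yet
defer s.Close()
count, err := s.Count(context.Background()) // Peek fires here, once per element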
Common Patterns:
Data Validation Pipeline:
validData, err := stream.FromSlice(rawData).
    Filter(func(item DataItem) bool { return item.IsValid() }).
    Map(func(item DataItem) DataItem { return item.Normalize() }).
    Filter(func(item DataItem) bool { return item.PassesBusinessRules() }).
    ToSlice(ctx)
ETL (Extract, Transform, Load):
err := stream.FromChannel(inputChannel).
    Map(func(raw RawData) ProcessedData { return transform(raw) }).
    Filter(func(data ProcessedData) bool { return data.IsComplete() }).
    ForEach(ctx, func(data ProcessedData) {
        database.Save(data)
    })
Statistical Analysis:
stats := Stats{}
stream.FromSlice(measurements).ForEach(ctx, func(m Measurement) {
    stats.Add(m)
})
mean := stats.Mean()
stddev := stats.StandardDeviation()
Word Frequency Analysis:
frequencies, err := stream.FromSlice(strings.Fields(text)).
    Map(func(word string) string { return strings.ToLower(word) }).
    Collect(ctx,
        func() interface{} { return make(map[string]int) },
        func(acc interface{}, word string) {
            acc.(map[string]int)[word]++
        },
        func(acc1, acc2 interface{}) interface{} {
            // Combine maps for parallel processing
            return combineMaps(acc1, acc2)
        },
    )
Best Practices:
- Always close streams to release resources:
    defer s.Close()
- Check errors from terminal operations:
    result, err := s.ToSlice(ctx)
    if err != nil {
        // handle error
    }
- Use appropriate context timeouts:
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
- Chain operations efficiently:
    // Good: single chain
    result := s.Filter(p1).Map(f1).Filter(p2).ToSlice(ctx)
    // Avoid: multiple intermediate variables
    s1 := s.Filter(p1)
    s2 := s1.Map(f1)
    s3 := s2.Filter(p2)
- Use Peek for debugging:
    s.Peek(func(x T) { log.Printf("Debug: %v", x) })
- Consider memory implications of operations:
    // Memory-efficient (streaming)
    s.Filter(predicate).ForEach(ctx, action)
    // Memory-intensive (collects all)
    s.Sorted(comparator).ToSlice(ctx)
- Handle infinite streams carefully:
    stream.Generate(generator).Limit(1000).ToSlice(ctx) // Always limit
Comparison with Alternatives:
vs Traditional Loops:
- More expressive and readable
- Built-in error handling and context support
- Functional composition
- Some performance overhead
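To make the trade-off concrete, here is a brief sketch of the same filter-and-sum written both ways (an illustration using the API above; numbers and ctx are placeholders):
// Imperative loop: explicit and low-overhead, but no built-in context or error handling
sum := 0
for _, x := range numbers {
    if x > 0 {
        sum += x
    }
}
// Stream version: declarative and context-aware, returns an error to check
total, err := stream.FromSlice(numbers).
    Filter(func(x int) bool { return x > 0 }).
    Reduce(ctx, 0, func(acc, x int) int { return acc + x })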
vs Channels:
- Higher-level abstraction
- Rich set of operations
- Better error handling
- Less flexible for complex flow control
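As a sketch of the abstraction difference (assuming ch is an open channel and process a handler function):
// Raw channel: manual range loop; cancellation and errors are your responsibility
for v := range ch {
    process(v)
}
// Stream: the same consumption with context support and an error result
err := stream.FromChannel(ch).ForEach(ctx, process)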
vs Iterator Pattern:
- Lazy evaluation
- Method chaining
- Context-aware operations
- Resource management
Thread Safety:
Individual stream instances are not thread-safe and should not be shared between goroutines. However, you can create multiple streams from the same source safely:
// Safe: separate streams
go processStream(stream.FromSlice(data).Filter(predicate1))
go processStream(stream.FromSlice(data).Filter(predicate2))
// Unsafe: shared stream
s := stream.FromSlice(data)
go s.Filter(predicate1).ForEach(ctx1, action1) // Don't do this
go s.Filter(predicate2).ForEach(ctx2, action2) // Don't do this
Integration with Other Packages:
The stream package integrates well with other goflow components:
// With worker pool for parallel processing
pool, _ := workerpool.NewSafe(4, 100)
err := stream.FromSlice(tasks).ForEach(ctx, func(task Task) {
    pool.Submit(workerpool.TaskFunc(func(ctx context.Context) error {
        return processTask(task)
    }))
})
// With pipeline for complex processing
p := pipeline.New().
    AddStageFunc("extract", extractData).
    AddStageFunc("transform", func(ctx context.Context, input interface{}) (interface{}, error) {
        data := input.([]DataItem)
        processed, err := stream.FromSlice(data).
            Filter(isValid).
            Map(normalize).
            ToSlice(ctx)
        return processed, err
    }).
    AddStageFunc("load", loadData)
Future Enhancements:
Planned features for future versions:
- Parallel stream processing
- More built-in collectors
- Stream splitting and merging
- Integration with Go generics improvements
- Performance optimizations
Example ¶
Example demonstrates basic stream usage.
// Create a stream from a slice
stream := FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
defer func() { _ = stream.Close() }()
// Chain operations: filter even numbers, multiply by 2, take first 3
result, err := stream.
    Filter(func(x int) bool { return x%2 == 0 }).
    Map(func(x int) int { return x * 2 }).
    Limit(3).
    ToSlice(context.Background())
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
fmt.Printf("Result: %v\n", result)
Output: Result: [4 8 12]
Example (Aggregation) ¶
Example_aggregation demonstrates various aggregation operations.
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
// Count elements
countStream := FromSlice(numbers)
count, _ := countStream.Count(context.Background())
_ = countStream.Close()
fmt.Printf("Count: %d\n", count)
// Sum using reduce
sumStream := FromSlice(numbers)
sum, _ := sumStream.Reduce(context.Background(), 0, func(acc, x int) int {
    return acc + x
})
_ = sumStream.Close()
fmt.Printf("Sum: %d\n", sum)
// Find min and max
minStream := FromSlice(numbers)
minValue, found, _ := minStream.Min(context.Background(), func(a, b int) int {
    if a < b {
        return -1
    }
    if a > b {
        return 1
    }
    return 0
})
_ = minStream.Close()
if found {
    fmt.Printf("Min: %d\n", minValue)
}
maxStream := FromSlice(numbers)
maxValue, found, _ := maxStream.Max(context.Background(), func(a, b int) int {
    if a < b {
        return -1
    }
    if a > b {
        return 1
    }
    return 0
})
_ = maxStream.Close()
if found {
    fmt.Printf("Max: %d\n", maxValue)
}
// Check if any/all numbers meet criteria
anyStream := FromSlice(numbers)
hasEven, _ := anyStream.AnyMatch(context.Background(), func(x int) bool {
    return x%2 == 0
})
_ = anyStream.Close()
fmt.Printf("Has even numbers: %t\n", hasEven)
allStream := FromSlice(numbers)
allPositive, _ := allStream.AllMatch(context.Background(), func(x int) bool {
    return x > 0
})
_ = allStream.Close()
fmt.Printf("All positive: %t\n", allPositive)
Output:
Count: 10
Sum: 55
Min: 1
Max: 10
Has even numbers: true
All positive: true
Example (Channels) ¶
Example_channels demonstrates creating streams from channels.
// Create a channel and send some data
ch := make(chan string, 5)
ch <- "apple"
ch <- "banana"
ch <- "cherry"
ch <- "date"
ch <- "elderberry"
close(ch)
stream := FromChannel(ch)
defer func() { _ = stream.Close() }()
// Process fruits: filter by length and convert to uppercase
result, err := stream.
    Filter(func(fruit string) bool { return len(fruit) <= 5 }).
    Map(func(fruit string) string { return strings.ToUpper(fruit) }).
    ToSlice(context.Background())
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
fmt.Printf("Short fruits: %v\n", result)
Output: Short fruits: [APPLE DATE]
Example (CollectToCustomType) ¶
Example_collectToCustomType demonstrates collecting to custom types.
words := []string{"hello", "world", "from", "stream", "api"}
stream := FromSlice(words)
defer func() { _ = stream.Close() }()
// Collect into a custom string with separators
result, err := stream.
    Filter(func(word string) bool { return len(word) > 3 }).
    Collect(
        context.Background(),
        func() interface{} { return &strings.Builder{} }, // Supplier
        func(acc interface{}, value string) { // Accumulator
            builder := acc.(*strings.Builder)
            if builder.Len() > 0 {
                builder.WriteString(", ")
            }
            builder.WriteString(value)
        },
        func(acc1, acc2 interface{}) interface{} { // Combiner (for parallel processing)
            b1 := acc1.(*strings.Builder)
            b2 := acc2.(*strings.Builder)
            b1.WriteString(", ")
            b1.WriteString(b2.String())
            return b1
        },
    )
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
builder := result.(*strings.Builder)
fmt.Printf("Collected: %s\n", builder.String())
Output: Collected: hello, world, from, stream
Example (DataProcessing) ¶
Example_dataProcessing demonstrates a data processing pipeline.
// Sample data: user names
users := []string{"john.doe", "jane.smith", "bob.wilson", "alice.brown"}
stream := FromSlice(users)
defer func() { _ = stream.Close() }()
// Process names: convert to proper case and create email addresses
emails, err := stream.
    Filter(func(name string) bool { return len(name) > 5 }). // Filter long names
    Map(func(name string) string {
        // Convert to proper case
        parts := strings.Split(name, ".")
        for i, part := range parts {
            if len(part) > 0 {
                parts[i] = strings.ToUpper(part[:1]) + part[1:]
            }
        }
        return strings.Join(parts, " ")
    }).
    Map(func(name string) string {
        // Create email address
        emailName := strings.ToLower(strings.ReplaceAll(name, " ", "."))
        return emailName + "@company.com"
    }).
    ToSlice(context.Background())
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
for _, email := range emails {
    fmt.Println(email)
}
Output:
john.doe@company.com
jane.smith@company.com
bob.wilson@company.com
alice.brown@company.com
Example (FlatMap) ¶
Example_flatMap demonstrates flattening nested structures.
// Each number generates a range from 1 to that number
stream := FromSlice([]int{2, 3, 4})
defer func() { _ = stream.Close() }()
result, err := stream.
    FlatMap(func(n int) Stream[int] {
        // Create a range from 1 to n
        nums := make([]int, n)
        for i := 0; i < n; i++ {
            nums[i] = i + 1
        }
        return FromSlice(nums)
    }).
    ToSlice(context.Background())
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
fmt.Printf("Flattened: %v\n", result)
Output: Flattened: [1 2 1 2 3 1 2 3 4]
Example (Generator) ¶
Example_generator demonstrates infinite streams with generators.
counter := 0
// Generate infinite stream of incrementing numbers
stream := Generate(func() int {
    counter++
    return counter
})
defer func() { _ = stream.Close() }()
// Take first 5 even squares
result, err := stream.
    Filter(func(x int) bool { return x%2 == 0 }). // Even numbers
    Map(func(x int) int { return x * x }).        // Squares
    Limit(5).                                     // Limit to 5
    ToSlice(context.Background())
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
fmt.Printf("First 5 even squares: %v\n", result)
Output: First 5 even squares: [4 16 36 64 100]
Example (Numbers) ¶
Example_numbers demonstrates number processing.
// Generate stream of numbers and process them
stream := FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
defer func() { _ = stream.Close() }()
// Find squares of even numbers, skip first 2, limit to 3
result, err := stream.
    Filter(func(x int) bool { return x%2 == 0 }). // Even numbers: 2,4,6,8,10,12
    Map(func(x int) int { return x * x }).        // Squares: 4,16,36,64,100,144
    Skip(2).                                      // Skip first 2: 36,64,100,144
    Limit(3).                                     // Take 3: 36,64,100
    ToSlice(context.Background())
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
fmt.Printf("Result: %v\n", result)
Output: Result: [36 64 100]
Example (Peek) ¶
Example_peek demonstrates debugging with peek. Peek allows you to observe elements as they flow through the pipeline without modifying them. Useful for debugging and logging.
stream := FromSlice([]int{1, 2, 3, 4, 5})
defer func() { _ = stream.Close() }()
// Use Peek to observe elements at different stages of the pipeline.
// Note: Due to lazy evaluation, peek output order may vary.
// We don't verify the debug output here, only the final result.
result, err := stream.
    Peek(func(x int) { _ = x /* Observe original values */ }).
    Filter(func(x int) bool { return x%2 == 0 }).
    Peek(func(x int) { _ = x /* Observe filtered values */ }).
    Map(func(x int) int { return x * 10 }).
    Peek(func(x int) { _ = x /* Observe mapped values */ }).
    ToSlice(context.Background())
if err != nil {
    fmt.Printf("Error: %v\n", err)
    return
}
fmt.Printf("Result: %v\n", result)
Output: Result: [20 40]
Example (TextProcessing) ¶
Example_textProcessing demonstrates text processing with streams.
text := "The quick brown fox jumps over the lazy dog"
words := strings.Fields(text)
stream := FromSlice(words)
defer func() { _ = stream.Close() }()
// Process words: filter long words, convert to uppercase, sort
processedWords, err := stream.
Filter(func(word string) bool { return len(word) > 3 }). // Words longer than 3 chars
Map(func(word string) string { return strings.ToUpper(word) }). // Uppercase
Distinct(). // Remove duplicates
Sorted(func(a, b string) int { return strings.Compare(a, b) }). // Sort alphabetically
ToSlice(context.Background())
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Processed words: %v\n", processedWords)
Output: Processed words: [BROWN JUMPS LAZY OVER QUICK]
Example (WordCount) ¶
Example_wordCount demonstrates a word counting pipeline.
text := "the quick brown fox jumps over the lazy dog the fox is quick"
words := strings.Fields(text)
// Count word frequencies using collect
stream := FromSlice(words)
defer func() { _ = stream.Close() }()
wordCount, err := stream.
Map(func(word string) string { return strings.ToLower(word) }). // Normalize case
Collect(
context.Background(),
func() interface{} { return make(map[string]int) }, // Create map
func(acc interface{}, word string) { // Count words
wordMap := acc.(map[string]int)
wordMap[word]++
},
func(acc1, acc2 interface{}) interface{} { // Combine maps
map1 := acc1.(map[string]int)
map2 := acc2.(map[string]int)
for word, count := range map2 {
map1[word] += count
}
return map1
},
)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
wordMap := wordCount.(map[string]int)
fmt.Printf("Word frequencies: %v\n", wordMap)
Output: Word frequencies: map[brown:1 dog:1 fox:2 is:1 jumps:1 lazy:1 over:1 quick:2 the:3]
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrStreamClosed = errors.New("stream is closed")
ErrStreamClosed is returned when attempting to operate on a closed stream.
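A minimal sketch of checking for this sentinel (assuming a terminal operation reports it once the stream has been closed):
s := stream.FromSlice([]int{1, 2, 3})
_ = s.Close()
if _, err := s.ToSlice(context.Background()); errors.Is(err, stream.ErrStreamClosed) {
    log.Println("stream was already closed")
}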
Functions ¶
This section is empty.
Types ¶
type Source ¶
type Source[T any] interface {
    // Next returns the next element and true, or zero value and false if no more elements.
    Next(ctx context.Context) (T, bool, error)

    // Close closes the source and releases resources.
    Close() error
}
Source represents a data source for streams.
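As an illustration, a minimal slice-backed implementation of this interface might look like the sketch below; sliceSource and its fields are hypothetical, not part of the package:
// sliceSource is a hypothetical Source[T] that walks a slice.
type sliceSource[T any] struct {
    items []T
    pos   int
}
func (s *sliceSource[T]) Next(ctx context.Context) (T, bool, error) {
    var zero T
    if err := ctx.Err(); err != nil {
        return zero, false, err // honor cancellation
    }
    if s.pos >= len(s.items) {
        return zero, false, nil // exhausted
    }
    v := s.items[s.pos]
    s.pos++
    return v, true, nil
}
func (s *sliceSource[T]) Close() error { return nil }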
type Stream ¶
type Stream[T any] interface {
    // Filter returns a stream consisting of elements that match the given predicate.
    Filter(predicate func(T) bool) Stream[T]

    // Map returns a stream consisting of the results of applying the given function to elements.
    Map(mapper func(T) T) Stream[T]

    // MapTo transforms elements to a different type and returns a new typed stream.
    MapTo(mapper func(T) interface{}) Stream[interface{}]

    // FlatMap returns a stream consisting of results of replacing each element with
    // the contents of a mapped stream produced by applying the provided mapping function.
    FlatMap(mapper func(T) Stream[T]) Stream[T]

    // Distinct returns a stream consisting of distinct elements (according to equality).
    Distinct() Stream[T]

    // Sorted returns a stream consisting of elements sorted according to natural order.
    // The compare function should return negative if a < b, 0 if a == b, positive if a > b.
    Sorted(compare func(a, b T) int) Stream[T]

    // Skip returns a stream consisting of remaining elements after skipping n elements.
    Skip(n int64) Stream[T]

    // Limit returns a stream consisting of elements truncated to be no longer than maxSize.
    Limit(maxSize int64) Stream[T]

    // Peek returns a stream consisting of elements, additionally performing the provided
    // action on each element as elements are consumed.
    Peek(action func(T)) Stream[T]

    // ForEach performs an action for each element of the stream.
    ForEach(ctx context.Context, action func(T)) error

    // Reduce performs a reduction on elements using the provided identity and combining function.
    Reduce(ctx context.Context, identity T, accumulator func(T, T) T) (T, error)

    // Collect performs a mutable reduction operation on elements.
    Collect(ctx context.Context, supplier func() interface{}, accumulator func(interface{}, T), combiner func(interface{}, interface{}) interface{}) (interface{}, error)

    // ToSlice returns a slice containing all elements.
    ToSlice(ctx context.Context) ([]T, error)

    // Count returns the count of elements.
    Count(ctx context.Context) (int64, error)

    // AnyMatch returns whether any elements match the given predicate.
    AnyMatch(ctx context.Context, predicate func(T) bool) (bool, error)

    // AllMatch returns whether all elements match the given predicate.
    AllMatch(ctx context.Context, predicate func(T) bool) (bool, error)

    // NoneMatch returns whether no elements match the given predicate.
    NoneMatch(ctx context.Context, predicate func(T) bool) (bool, error)

    // FindFirst returns the first element, if present.
    FindFirst(ctx context.Context) (T, bool, error)

    // FindAny returns any element, if present.
    FindAny(ctx context.Context) (T, bool, error)

    // Min returns the minimum element according to the provided comparator.
    Min(ctx context.Context, compare func(a, b T) int) (T, bool, error)

    // Max returns the maximum element according to the provided comparator.
    Max(ctx context.Context, compare func(a, b T) int) (T, bool, error)

    // Close closes the stream and releases resources. It also cancels any
    // running pipeline goroutines, allowing operations to terminate cleanly
    // even if the stream was not fully consumed.
    Close() error

    // IsClosed returns true if the stream is closed.
    IsClosed() bool
}
Stream represents a sequence of elements supporting sequential and parallel operations. Streams are lazy; computation on the source data is only performed when a terminal operation is initiated, and source elements are consumed only as needed.
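Because Stream is an interface, helpers can be written once and reused with any source (slice, channel, or generator). A brief consumer-side sketch; sumPositive is illustrative, not part of the package:
// sumPositive consumes any Stream[int], however it was created.
func sumPositive(ctx context.Context, s stream.Stream[int]) (int, error) {
    defer func() { _ = s.Close() }()
    return s.Filter(func(x int) bool { return x > 0 }).
        Reduce(ctx, 0, func(acc, x int) int { return acc + x })
}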
func FromChannel ¶
FromChannel creates a Stream from a channel.