Documentation ¶
Overview ¶
Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.
This package allows you to efficiently process large files or streams by splitting the input into chunks and processing each line (or chunk) concurrently using multiple goroutines. It supports orchestrating multiple io.ReadCloser sources as a single logical stream, allowing you to merge large datasets without custom plumbing.
Features ¶
- Concurrent processing using a configurable number of workers (goroutines)
- Custom line processor function for transforming or filtering individual lines
- Custom chunk processor function for processing entire chunks at once
- ChunkDetails context passed to processors with ReaderID and ChunkID
- Metrics reporting (bytes read/written, rows read/written, processing time)
- Optional row read limit for sampling or testing
- Multi-source input: merge multiple io.ReadCloser inputs into one stream
- Backpressure-friendly internal bounded channels
- Memory-efficient sync.Pool-based chunk allocation
Output Ordering ¶
IMPORTANT: By default, output lines may NOT preserve the original input order. Concurrent processing with multiple workers causes chunks to complete at different times, resulting in unpredictable output ordering.
To guarantee ordered output:
- Use WithWorkers(1) to process sequentially
- For multiple input sources, merge them with io.MultiReader before passing to the processor instead of using WithReaders (which processes sources concurrently), as sketched below
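A minimal sketch of the ordered multi-source setup (assuming f1 and f2 are already-open files and the clp import alias from Basic Usage below):

merged := io.NopCloser(io.MultiReader(f1, f2))
pr := clp.NewConcurrentLineProcessor(merged, clp.WithWorkers(1))
output, err := io.ReadAll(pr)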
Basic Usage ¶
import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)
f, err := os.Open("largefile.txt")
clp.ExitOnError(err)
defer f.Close()
pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4), clp.WithChunkSize(1024*1024))
output, err := io.ReadAll(pr)
clp.ExitOnError(err)
fmt.Println(string(output))
Custom Line Processing ¶
The DataProcessor function signature is:

func(b []byte, info *ChunkDetails, out io.Writer) error

Processors write their output to the provided io.Writer and return any error.
pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomLineProcessor(
func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
_, err := out.Write(bytes.ToUpper(line))
return err
},
))
Custom Chunk Processing ¶
For processing entire chunks at once (e.g., aggregation):
pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomChunkProcessor(
func(chunk []byte, info *clp.ChunkDetails, out io.Writer) error {
// Process entire chunk
_, err := out.Write(chunk)
return err
},
))
Metrics ¶
metrics := pr.Metrics()
fmt.Printf("Rows read: %d, Bytes read: %d, Time taken: %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)
For more advanced usage, see the examples/ directory.
Index ¶
- Constants
- Variables
- func DrainData(r io.Reader)
- func EnsureNewLineAtEnd(chunk *Chunk)
- func ErrWithDebugStack(err error) error
- func ExitOnError(err error)
- func Filter[T any](arr []T, keep func(T) bool) []T
- func FormatBytes(size, base float64) string
- func FormatDuration(d time.Duration) string
- func IfNull[T any](org *T, def T) T
- func Lines(l []byte, rawLine bool) iter.Seq[[]byte]
- func NewConcurrentLineProcessor(r io.ReadCloser, opts ...Option) *concurrentLineProcessor
- func PrintAsJsonString(v any)
- func PrintSummaryPeriodically(ctx context.Context, p *concurrentLineProcessor, interval time.Duration)
- func Ptr[T any](v T) *T
- func WithOpts(p *concurrentLineProcessor, opts ...Option) *concurrentLineProcessor
- type Chunk
- type ChunkDetails
- type DataProcessor
- type Metrics
- type Option
- func WithChannelSize(size int) Option
- func WithChunkSize(size int) Option
- func WithContext(ctx context.Context) Option
- func WithCustomChunkProcessor(c DataProcessor) Option
- func WithCustomLineProcessor(c DataProcessor) Option
- func WithReaders(readers ...io.ReadCloser) Option
- func WithRowsReadLimit(limit int) Option
- func WithWorkers(n int) Option
Constants ¶
const (
    BaseSI     = 1000
    BaseBinary = 1024
)
const (
KB = 1024
)
Variables ¶
var Files = []string{
"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/data/temp_example.csv",
"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/2024-06-04-details.jsonl",
"/Users/agali/Downloads/temp/my_data/usage_data_12m.json",
"/Users/agali/Downloads/temp/my_data/usage_data_3m.json",
"/Users/agali/Desktop/Work/go-lang/tryouts/1brc/gen/measurements.txt",
}
Files contains a list of test files used for development and testing. This variable is used internally for testing and benchmarking purposes.
Functions ¶
func EnsureNewLineAtEnd ¶ added in v1.1.1
func EnsureNewLineAtEnd(chunk *Chunk)
EnsureNewLineAtEnd ensures the chunk's data ends with a newline character. If the chunk is nil or empty, this is a no-op. It modifies the chunk in-place, either by setting an existing byte or appending.
func ErrWithDebugStack ¶
func ErrWithDebugStack(err error) error
func ExitOnError ¶ added in v1.0.3
func ExitOnError(err error)
func FormatBytes ¶ added in v1.0.10
func FormatBytes(size, base float64) string
FormatBytes formats a byte count into a human-readable string. Use BaseSI (1000) for SI units (KB, MB, GB) or BaseBinary (1024) for binary units (KiB, MiB, GiB).
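For example (output strings are illustrative; exact precision depends on the formatter):

fmt.Println(clp.FormatBytes(1536, clp.BaseBinary)) // roughly "1.5 KiB"
fmt.Println(clp.FormatBytes(1500000, clp.BaseSI))  // roughly "1.5 MB"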
func FormatDuration ¶ added in v1.1.1
func FormatDuration(d time.Duration) string
FormatDuration formats a duration into a human-readable string with appropriate precision. Precision decreases as duration increases: nanoseconds for tiny durations, milliseconds for sub-second, seconds for sub-minute, etc.
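For example (output shown is illustrative; exact precision follows the rules above):

fmt.Println(clp.FormatDuration(1500 * time.Millisecond)) // e.g. "1.5s"
fmt.Println(clp.FormatDuration(250 * time.Microsecond))  // e.g. "250µs"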
func Lines ¶ added in v1.2.0
func Lines(l []byte, rawLine bool) iter.Seq[[]byte]
Lines returns an iterator over lines in the given byte slice. If rawLine is false, each yielded line excludes the trailing newline character; otherwise the newline is retained. Uses the Go 1.23+ range-over-func iteration pattern.
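For example (assuming the clp import alias from Basic Usage):

data := []byte("alpha\nbeta\ngamma\n")
for line := range clp.Lines(data, false) {
    fmt.Println(string(line)) // prints: alpha, beta, gamma
}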
func NewConcurrentLineProcessor ¶
func NewConcurrentLineProcessor(r io.ReadCloser, opts ...Option) *concurrentLineProcessor
NewConcurrentLineProcessor creates a new concurrentLineProcessor that reads from the provided io.ReadCloser. It starts processing immediately in background goroutines and returns a processor that implements io.Reader.
When you need to process more than one source, pass nil as the reader and supply inputs with WithReaders.
The processor splits input into chunks, processes each line concurrently using multiple workers, and provides the processed output through the Read method.
IMPORTANT: Output order is NOT guaranteed by default. Use WithWorkers(1) for ordered output.
Example:
file, err := os.Open("large-file.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
processor := clp.NewConcurrentLineProcessor(file,
clp.WithWorkers(8),
clp.WithChunkSize(1024*1024),
)
output, err := io.ReadAll(processor)
if err != nil {
log.Fatal(err)
}
func PrintAsJsonString ¶
func PrintAsJsonString(v any)
func PrintSummaryPeriodically ¶ added in v1.0.12
func PrintSummaryPeriodically(ctx context.Context, p *concurrentLineProcessor, interval time.Duration)
PrintSummaryPeriodically prints the processor's summary at regular intervals. It stops when the context is cancelled. Useful for monitoring long-running processes.
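A sketch of periodic monitoring (assuming f is an already-open io.ReadCloser):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4))
// Print a progress summary every 5 seconds until processing ends or ctx is cancelled.
go clp.PrintSummaryPeriodically(ctx, pr, 5*time.Second)
output, err := io.ReadAll(pr)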
Types ¶
type Chunk ¶
type Chunk struct {
// contains filtered or unexported fields
}
Chunk represents a piece of data to be processed. It implements io.Writer and io.ByteWriter for efficient data accumulation.
Each chunk has an ID for ordering within a reader and a readerID to identify which source reader it came from (useful when processing multiple readers).
type ChunkDetails ¶ added in v1.2.0
type ChunkDetails struct {
// ReaderID identifies which source reader this data came from.
// Useful when processing multiple readers via WithReaders.
ReaderID int
// ChunkID is the sequential ID of the chunk within its source reader.
// Can be used for ordering or debugging purposes.
ChunkID int
}
ChunkDetails provides contextual information about the data being processed. It is passed to DataProcessor functions to provide context about the source. All fields use zero-based indexing.
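For example, a line processor could use ReaderID to tag each output line with its source (a sketch assuming r1 and r2 are open io.ReadCloser values):

tagger := func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
    // Prefix every line with the zero-based ID of the reader it came from.
    _, err := fmt.Fprintf(out, "[reader %d] %s", info.ReaderID, line)
    return err
}
pr := clp.NewConcurrentLineProcessor(nil, clp.WithReaders(r1, r2), clp.WithCustomLineProcessor(tagger))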
type DataProcessor ¶ added in v1.2.0
type DataProcessor func(b []byte, info *ChunkDetails, out io.Writer) error
DataProcessor is a function type for processing individual lines or chunks. It receives the data as []byte, contextual info via ChunkDetails (containing ReaderID and ChunkID), and an io.Writer to write the processed output to.
The processor must write its output to the provided io.Writer rather than returning the result. This design allows for efficient streaming without intermediate allocations.
Implementations must be thread-safe as they may be called concurrently from multiple workers. Do not mutate shared state without proper synchronization (e.g., sync.Mutex).
Example:
func(b []byte, info *ChunkDetails, out io.Writer) error {
_, err := out.Write(bytes.ToUpper(b))
return err
}
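If a processor must update shared state, guard it explicitly; a sketch (the counter is illustrative, not part of the package):

var (
    mu    sync.Mutex
    total int
)
countAndCopy := func(b []byte, info *ChunkDetails, out io.Writer) error {
    mu.Lock()
    total++ // lines processed across all workers
    mu.Unlock()
    _, err := out.Write(b)
    return err
}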
type Metrics ¶
type Metrics struct {
// BytesRead is the total number of bytes read from the source reader.
// When RowsReadLimit is set, more bytes may be read than are ultimately written.
BytesRead int64 `json:"bytes_read"`
// BytesWritten is the total number of bytes written after processing each line.
BytesWritten int64 `json:"bytes_written"`
// RowsRead is the total number of rows read from the source reader.
RowsRead int64 `json:"rows_read"`
// RowsWritten is the total number of rows written to the output stream.
RowsWritten int64 `json:"rows_written"`
// TimeTook is the total time taken to read and process the data.
TimeTook time.Duration `json:"time_took"`
}
Metrics contains performance and processing statistics for a concurrentLineProcessor.
type Option ¶
type Option func(*concurrentLineProcessor)
Option is a function type for configuring concurrentLineProcessor instances. Options are passed to NewConcurrentLineProcessor to customize behavior.
func WithChannelSize ¶ added in v1.0.10
func WithChannelSize(size int) Option
WithChannelSize sets the size of the channels used for input and output streams. A larger channel size can improve throughput for high-volume data processing. Default (when unspecified) is 70 (see defaultChanSize in reader.go).
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithChannelSize(1000)) // 1000 items in channel
func WithChunkSize ¶
func WithChunkSize(size int) Option
WithChunkSize sets the chunk size for reading data from the source. Larger chunk sizes can improve performance for large files but may use more memory. The default chunk size is 64KB.
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithChunkSize(1024*1024)) // 1MB chunks
func WithContext ¶ added in v1.1.1
func WithContext(ctx context.Context) Option
WithContext sets the context for the concurrentLineProcessor. This context can be used to manage cancellation and timeouts for the processing operations.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
clp.NewConcurrentLineProcessor(reader, clp.WithContext(ctx))
func WithCustomChunkProcessor ¶ added in v1.2.0
func WithCustomChunkProcessor(c DataProcessor) Option
WithCustomChunkProcessor sets a custom function to process entire chunks at once. Unlike WithCustomLineProcessor, this processes the full chunk buffer rather than individual lines, which can be more efficient for certain operations like aggregation.
The function receives a chunk as []byte, a ChunkDetails struct with contextual info (ReaderID, ChunkID), and an io.Writer to write output to.
The processor must write its output to the provided io.Writer. A newline is automatically ensured at the end of the chunk output.
The function must be thread-safe and should not modify external state without proper synchronization (e.g., sync.Mutex).
Example:
// Process the entire chunk at once; this example simply passes it through unchanged.
processor := func(chunk []byte, info *clp.ChunkDetails, out io.Writer) error {
    _, err := out.Write(chunk)
    return err
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomChunkProcessor(processor))
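A chunk processor can also aggregate rather than pass data through. The sketch below (illustrative, not part of the package) replaces each chunk with a one-line summary, counting its lines via Lines:

counter := func(chunk []byte, info *clp.ChunkDetails, out io.Writer) error {
    n := 0
    for range clp.Lines(chunk, true) {
        n++
    }
    // Emit one summary line per chunk; the trailing newline is ensured automatically.
    _, err := fmt.Fprintf(out, "reader=%d chunk=%d lines=%d", info.ReaderID, info.ChunkID, n)
    return err
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomChunkProcessor(counter))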
func WithCustomLineProcessor ¶
func WithCustomLineProcessor(c DataProcessor) Option
WithCustomLineProcessor sets a custom function to process each line individually. The function receives a line as []byte (without trailing newline), a ChunkDetails struct with contextual info (ReaderID, ChunkID), and an io.Writer to write output to.
The processor must write its output to the provided io.Writer. A newline is automatically appended after each processed line.
The function must be thread-safe and should not modify external state without proper synchronization (e.g., sync.Mutex).
Example:
// Convert lines to uppercase
processor := func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
_, err := out.Write(bytes.ToUpper(line))
return err
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomLineProcessor(processor))
func WithReaders ¶ added in v1.2.1
func WithReaders(readers ...io.ReadCloser) Option
WithReaders sets multiple source readers for the concurrentLineProcessor. When used, the reader passed to NewConcurrentLineProcessor can be nil because this option replaces the internal reader list. Empty readers are handled by the Read method.
IMPORTANT: Multiple readers are processed concurrently, so output from different sources will be interleaved unpredictably. If you need ordered output across multiple sources, merge them with io.MultiReader before passing to the processor:
merged := io.NopCloser(io.MultiReader(r1, r2, r3))
clp.NewConcurrentLineProcessor(merged, clp.WithWorkers(1))
Example:
readers := []io.ReadCloser{reader1, reader2, reader3}
clp.NewConcurrentLineProcessor(nil, clp.WithReaders(readers...))
func WithRowsReadLimit ¶
func WithRowsReadLimit(limit int) Option
WithRowsReadLimit sets a limit on the number of rows to read from the source. Use -1 for no limit (default). This is useful for processing only a subset of a large file for testing or sampling purposes.
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithRowsReadLimit(1000)) // Process only first 1000 lines
func WithWorkers ¶
func WithWorkers(n int) Option
WithWorkers sets the number of worker goroutines for concurrent processing. More workers can improve performance for CPU-intensive line processing, but may not help for I/O-bound operations. The default is runtime.NumCPU().
IMPORTANT: Multiple workers process chunks concurrently, which means output order is NOT preserved. Use WithWorkers(1) if you need output to maintain input order.
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithWorkers(8))
For ordered output:
clp.NewConcurrentLineProcessor(reader, clp.WithWorkers(1))