Documentation
Overview ¶
Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.
See reader.go for full package documentation and usage examples.
This package allows you to efficiently process large files or streams by splitting the input into chunks and processing each line concurrently using multiple goroutines. It now supports orchestrating multiple io.ReadCloser sources as a single logical stream, allowing you to merge large datasets without custom plumbing.
Features ¶
- Concurrent processing of lines using a configurable number of workers (goroutines)
- Custom line processor function for transforming or filtering lines
- Metrics reporting (bytes read, rows read, processing time, etc.)
- Optional row read limit
Basic Usage ¶
import (
	"fmt"
	"io"
	"os"

	clp "github.com/anvesh9652/concurrent-line-processor"
)
f, err := os.Open("largefile.txt")
clp.ExitOnError(err)
defer f.Close()
pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4), clp.WithChunkSize(1024*1024))
output, err := io.ReadAll(pr)
clp.ExitOnError(err)
fmt.Println(string(output))
Custom Line Processing ¶
pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomLineProcessor(func(line []byte, info *clp.LineDetails) ([]byte, error) {
	// Transform or filter the line.
	return bytes.ToUpper(line), nil
}))
Metrics ¶
metrics := pr.Metrics()
fmt.Printf("Rows read: %d, Bytes read: %d, Time took: %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)
For more advanced usage, see the examples/ directory.
Index ¶
- Variables
- func EnsureNewLineAtEnd(chunk *Chunk)
- func ErrWithDebugStack(err error) error
- func ExitOnError(err error)
- func Filter[T any](arr []T, keep func(T) bool) []T
- func FormatBytes(size float64) string
- func FormatDuration(d time.Duration) string
- func IfNull[T any](org *T, def T) T
- func NewConcurrentLineProcessor(r io.ReadCloser, opts ...Option) *concurrentLineProcessor
- func PrintAsJsonString(v any)
- func PrintSummaryPeriodically(ctx context.Context, p *concurrentLineProcessor, interval time.Duration)
- func WithOpts(p *concurrentLineProcessor, opts ...Option) *concurrentLineProcessor
- type Chunk
- type LineDetails
- type LineProcessor
- type Metrics
- type Option
Constants ¶
This section is empty.
Variables ¶
var Files = []string{
"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/data/temp_example.csv",
"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/2024-06-04-details.jsonl",
"/Users/agali/Downloads/temp/my_data/usage_data_12m.json",
"/Users/agali/Downloads/temp/my_data/usage_data_3m.json",
"/Users/agali/Desktop/Work/go-lang/tryouts/1brc/gen/measurements.txt",
}
Files contains a list of test files used for development and testing. This variable is used internally for testing and benchmarking purposes.
var (
KB = 1024
)
Functions ¶
func EnsureNewLineAtEnd ¶ added in v1.1.1
func EnsureNewLineAtEnd(chunk *Chunk)
func ErrWithDebugStack ¶
func ExitOnError ¶ added in v1.0.3
func ExitOnError(err error)
func FormatBytes ¶ added in v1.0.10
func FormatDuration ¶ added in v1.1.1
func NewConcurrentLineProcessor ¶
func NewConcurrentLineProcessor(r io.ReadCloser, opts ...Option) *concurrentLineProcessor
NewConcurrentLineProcessor creates a new concurrentLineProcessor that reads from the provided io.ReadCloser. It starts processing immediately in background goroutines and returns a processor that implements io.Reader.
When you need to process more than one source, pass nil as the reader and supply inputs with WithMultiReaders.
The processor splits input into chunks, processes each line concurrently using multiple workers, and provides the processed output through the Read method.
Example:
file, err := os.Open("large-file.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
processor := clp.NewConcurrentLineProcessor(file,
clp.WithWorkers(8),
clp.WithChunkSize(1024*1024),
)
output, err := io.ReadAll(processor)
if err != nil {
log.Fatal(err)
}
func PrintAsJsonString ¶
func PrintAsJsonString(v any)
func PrintSummaryPeriodically ¶ added in v1.0.12
Types ¶
type Chunk ¶
type Chunk struct {
// contains filtered or unexported fields
}
Chunk represents a piece of data to be processed, containing an ID for ordering and a pointer to the actual data buffer.
type LineDetails ¶ added in v1.1.1
type LineDetails struct {
// ReaderID is the ID of the source reader from which this line was read.
ReaderID int
// ChunkID is the ID of the chunk that was read from the source reader.
ChunkID int
}
LineDetails provides contextual information about a line being processed. All the fields follow zero-based indexing.
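As an illustration of how a line processor might use these fields, the sketch below prefixes each line with its source reader. The lineDetails struct here is a local stand-in mirroring the package's LineDetails type, so the example stays self-contained:

```go
package main

import "fmt"

// lineDetails is a local stand-in mirroring the package's LineDetails
// (zero-based ReaderID and ChunkID), used here for illustration only.
type lineDetails struct {
	ReaderID int
	ChunkID  int
}

// tagLine prefixes a line with its source reader ID, the kind of
// transformation that is useful when merging multiple readers.
func tagLine(line []byte, info *lineDetails) []byte {
	return append([]byte(fmt.Sprintf("[reader %d] ", info.ReaderID)), line...)
}

func main() {
	out := tagLine([]byte("hello"), &lineDetails{ReaderID: 1, ChunkID: 0})
	fmt.Println(string(out)) // [reader 1] hello
}
```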
type LineProcessor ¶
type LineProcessor func(b []byte, info *LineDetails) ([]byte, error)
LineProcessor is a function type for processing individual lines. It receives a line as []byte along with its LineDetails and returns the processed line and any error. Implementations must be thread-safe as they may be called concurrently.
type Metrics ¶
type Metrics struct {
// BytesRead is the total number of bytes read from the source reader.
	// When RowsReadLimit is set, BytesRead may exceed BytesWritten.
BytesRead int64 `json:"bytes_read"`
// BytesWritten is the total number of bytes written after processing each line.
BytesWritten int64 `json:"bytes_written"`
// RowsRead is the total number of rows read from the source reader.
RowsRead int64 `json:"rows_read"`
// RowsWritten is the total number of rows written to the output stream.
RowsWritten int64 `json:"rows_written"`
// TimeTook is the total time taken to read and process the data.
TimeTook time.Duration `json:"time_took"`
}
Metrics contains performance and processing statistics for a concurrentLineProcessor.
type Option ¶
type Option func(*concurrentLineProcessor)
Option is a function type for configuring concurrentLineProcessor instances. Options are passed to NewConcurrentLineProcessor to customize behavior.
func WithChannelSize ¶ added in v1.0.10
WithChannelSize sets the size of the channels used for input and output streams. A larger channel size can improve throughput for high-volume data processing. Default (when unspecified) is 70 (see defaultChanSize in reader.go).
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithChannelSize(1000)) // 1000 items in channel
func WithChunkSize ¶
WithChunkSize sets the chunk size for reading data from the source. Larger chunk sizes can improve performance for large files but may use more memory. The default chunk size is 64KB.
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithChunkSize(1024*1024)) // 1MB chunks
func WithContext ¶ added in v1.1.1
WithContext sets the context for the concurrentLineProcessor. This context can be used to manage cancellation and timeouts for the processing operations.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
clp.NewConcurrentLineProcessor(reader, clp.WithContext(ctx))
func WithCustomLineProcessor ¶
func WithCustomLineProcessor(c LineProcessor) Option
WithCustomLineProcessor sets a custom function to process each line. The function receives a line as []byte and should return the processed line. The function must be thread-safe and should not modify external state without proper synchronization.
Example:
// Convert lines to uppercase
processor := func(line []byte, info *clp.LineDetails) ([]byte, error) {
return bytes.ToUpper(line), nil
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomLineProcessor(processor))
func WithMultiReaders ¶ added in v1.1.0
func WithMultiReaders(readers ...io.ReadCloser) Option
WithMultiReaders sets multiple source readers for the concurrentLineProcessor. When used, the reader passed to NewConcurrentLineProcessor can be nil because this option replaces the internal reader list. An empty reader list is handled gracefully by the Read method.
Example:
readers := []io.ReadCloser{reader1, reader2, reader3}
clp.NewConcurrentLineProcessor(nil, clp.WithMultiReaders(readers...))
func WithRowsReadLimit ¶
WithRowsReadLimit sets a limit on the number of rows to read from the source. Use -1 for no limit (default). This is useful for processing only a subset of a large file for testing or sampling purposes.
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithRowsReadLimit(1000)) // Process only first 1000 lines
func WithWorkers ¶
WithWorkers sets the number of worker goroutines for concurrent processing. More workers can improve performance for CPU-intensive line processing, but may not help for I/O-bound operations. The default is runtime.NumCPU().
Example:
clp.NewConcurrentLineProcessor(reader, clp.WithWorkers(8))