concurrentlineprocessor

package module
v1.2.1
Published: Jan 5, 2026 License: MIT Imports: 18 Imported by: 1

README

Concurrent Line Processor


A high-performance, concurrent line-by-line processor for large files and streams in Go. This package splits input into chunks and processes lines concurrently using goroutines. You can also stitch multiple io.ReadCloser sources together and treat them as a single logical stream without extra fan-in plumbing.

Features

  • Concurrent Processing: Process lines concurrently with a configurable number of worker goroutines
  • Dual Processing Modes: Process individual lines or entire chunks with custom processors
  • Memory Efficient: Uses a sync.Pool and streaming; never loads the entire file into memory
  • Customizable: Supply a thread-safe custom line or chunk processor function
  • Context-Aware Processors: Processors receive ChunkDetails with ReaderID and ChunkID for tracking
  • Metrics: Built-in metrics (bytes read/written, rows read/written, processing duration)
  • Standard Interface: Implements io.Reader and has a Close() for resource cleanup
  • Flexible Configuration: Configure chunk size, worker count, channel size, and row read limit
  • Multi-source Input: Merge multiple io.ReadCloser inputs into one stream (ordering between sources is nondeterministic)
  • Backpressure Friendly: Internal bounded channels help balance producer/consumer throughput

⚠️ Important: Output Ordering
By default, concurrent processing means output lines may not maintain the same order as the input. This is because multiple workers process chunks in parallel and complete at different times. If you require ordered output that preserves the input sequence, use WithWorkers(1). For multiple input sources, you must also merge them using Go's io.MultiReader instead of WithMultiReaders to maintain a single sequential stream.

Installation

go get github.com/anvesh9652/concurrent-line-processor@latest

Quick Start

Below are common usage patterns. Examples 1-3 are complete programs you can copy into a file and run with go run; examples 4 and 5 define functions you can call from your own main.

Note: Reading via io.ReadAll will accumulate all processed data in memory. Prefer io.Copy to a file or stream for very large inputs.

1. Basic Usage (stream to stdout)
package main

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    file, err := os.Open("large-file.txt")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    processor := clp.NewConcurrentLineProcessor(file)
    defer processor.Close() // Closes input readers and the internal pipe

    // Stream the processed output directly
    if _, err := io.Copy(os.Stdout, processor); err != nil {
        panic(err)
    }

    metrics := processor.Metrics()
    fmt.Printf("Processed %d rows, %d bytes in %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)
}
2. Merging Multiple Sources (unordered output)
package main

import (
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    files := []string{"part-1.log", "part-2.log", "part-3.log"}
    var readers []io.ReadCloser
    for _, name := range files {
        f, err := os.Open(name)
        if err != nil {
            panic(err)
        }
        readers = append(readers, f)
    }

    // WithMultiReaders processes sources concurrently - output order is not preserved
    processor := clp.NewConcurrentLineProcessor(nil,
        clp.WithMultiReaders(readers...),
        clp.WithWorkers(4),
    )
    defer processor.Close()

    _, err := io.Copy(os.Stdout, processor)
    if err != nil {
        panic(err)
    }
}
3. Custom Line Processing
package main

import (
    "bytes"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    file, err := os.Open("data.csv")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // DataProcessor signature: func(b []byte, info *ChunkDetails, out io.Writer) error
    // Processors write output to the provided io.Writer
    upperCaseProcessor := func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
        _, err := out.Write(bytes.ToUpper(line))
        return err
    }

    processor := clp.NewConcurrentLineProcessor(file,
        clp.WithCustomLineProcessor(upperCaseProcessor),
        clp.WithWorkers(8),
        clp.WithChunkSize(1024*1024), // 1MB chunks
    )
    defer processor.Close()

    outFile, err := os.Create("output.csv")
    if err != nil {
        panic(err)
    }
    defer outFile.Close()

    if _, err := io.Copy(outFile, processor); err != nil {
        panic(err)
    }
}
4. CSV to JSONL Conversion
package main

import (
    "bytes"
    "encoding/csv"
    "encoding/json"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func convertCSVToJSONL(inputFile, outputFile string, headers []string) error {
    input, err := os.Open(inputFile)
    if err != nil {
        return err
    }
    defer input.Close()

    output, err := os.Create(outputFile)
    if err != nil {
        return err
    }
    defer output.Close()

    // The processor receives ChunkDetails with ReaderID and ChunkID for context
    csvToJSONProcessor := func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
        reader := csv.NewReader(bytes.NewReader(line))
        row, err := reader.Read()
        if err != nil {
            return err
        }
        record := make(map[string]string)
        for i, header := range headers {
            if i < len(row) {
                record[header] = row[i]
            }
        }
        return json.NewEncoder(out).Encode(record)
    }

    processor := clp.NewConcurrentLineProcessor(input,
        clp.WithCustomLineProcessor(csvToJSONProcessor),
        clp.WithWorkers(4),
        clp.WithRowsReadLimit(-1), // -1 means no limit (the default)
    )

    _, err = io.Copy(output, processor)
    return err
}
5. Processing with Row Limit
package main

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func processFirstThousandRows(filename string) error {
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    processor := clp.NewConcurrentLineProcessor(file,
        clp.WithRowsReadLimit(1000),
        clp.WithWorkers(2),
    )

    _, err = io.Copy(io.Discard, processor)
    if err != nil {
        return err
    }

    metrics := processor.Metrics()
    fmt.Printf("Processed %d rows in %s\n", metrics.RowsRead, metrics.TimeTook)
    return nil
}

Processing stops when the limit of newline-delimited rows is reached. Internally the reader may consume extra bytes to detect the boundary.

Performance Considerations

Memory Usage
  • A sync.Pool minimizes per-chunk allocations.
  • Memory scales with (chunk size * active workers) plus channel buffering.
  • Default chunk size is 64KB. Tune based on typical line length and performance requirements.
Worker Count
  • More workers help if your custom line processor is CPU-bound.
  • For pure pass-through or I/O-bound workloads, increasing workers yields limited benefit.
  • Default is runtime.NumCPU().
Chunk Size
  • Larger chunks reduce syscall overhead but increase peak memory.
  • Smaller chunks improve responsiveness and reduce memory footprint.
  • Default: 64KB. Examples show tuning up to multi-MB for high-throughput transformations.
Channel Size
  • Controls buffering between reader, processors, and writer stages.
  • Default: 70. Increase if workers frequently starve or if you have bursty input; a combined tuning sketch follows this list.
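
For illustration, here is a hedged tuning sketch that combines the knobs above (the values are assumptions for a CPU-bound transform, not recommendations; `file` is an already-opened input):

processor := clp.NewConcurrentLineProcessor(file,
    clp.WithWorkers(8),             // CPU-bound processors benefit from more workers
    clp.WithChunkSize(4*1024*1024), // larger chunks: fewer reads, higher peak memory
    clp.WithChannelSize(128),       // extra buffering for bursty input
)
defer processor.Close()
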
Ordering

Default Behavior: Output lines do not preserve input order due to concurrent processing. Multiple workers process chunks in parallel, and chunks complete at different times, causing output to be interleaved unpredictably.

When You Need Ordered Output:

If maintaining the exact input order is critical for your use case, configure the processor as follows:

  1. Single Worker: Use WithWorkers(1) to process chunks sequentially
  2. Single Input Source:
    • DO NOT use WithMultiReaders() with multiple sources
    • DO merge multiple readers using Go's io.MultiReader() before passing to the processor

Example for Ordered Output:

// Merge multiple files while preserving order
f1, _ := os.Open("file1.txt")
f2, _ := os.Open("file2.txt")
f3, _ := os.Open("file3.txt")

// Use io.MultiReader to merge into single sequential stream
mergedReader := io.NopCloser(io.MultiReader(f1, f2, f3))

processor := clp.NewConcurrentLineProcessor(mergedReader,
    clp.WithWorkers(1),        // Sequential processing
)

Performance Trade-off: Ordered output sacrifices the concurrent processing benefits and will be significantly slower than the default configuration.

Examples

The examples/ directory contains complete examples demonstrating:

  • Basic file processing
  • CSV to JSONL conversion (line processing)
  • JSONL to CSV conversion (chunk processing with WithCustomChunkProcessor)
  • Custom line transformations
  • Multi-reader merging
  • Performance profiling

Metrics

The processor provides detailed metrics accessible via Metrics(). They are updated atomically during pipeline execution and the duration is finalized once processing completes (until then TimeTook reports the current elapsed time):

type Metrics struct {
    BytesRead    int64         `json:"bytes_read"`    // Total bytes read from source (may exceed written when a row limit is applied)
    BytesWritten int64         `json:"bytes_written"` // Total bytes written after optional transformation
    RowsRead     int64         `json:"rows_read"`     // Total newline-delimited rows consumed
    RowsWritten  int64         `json:"rows_written"`  // Total rows emitted to the output stream
    TimeTook     time.Duration `json:"time_took"`     // Elapsed or finalized processing duration
}

Thread Safety

  • Call Read from a single goroutine at a time (standard io.Reader contract).
  • The internal pipeline is concurrent; metrics fields are updated atomically and can be read safely at any time.
  • Custom processor functions (DataProcessor) must be thread-safe. They receive an io.Writer to write output to, avoiding shared state issues.
  • If your processor needs to update external state, use proper synchronization (e.g., sync.Mutex); see the sketch after this list.
  • When multiple source readers are configured, their data is consumed concurrently.
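
A minimal sketch of a thread-safe processor that updates shared state (assumes `file` is open and that bytes, io, and sync are imported):

var (
    mu         sync.Mutex
    errorLines int
)

countErrors := func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
    if bytes.Contains(line, []byte("ERROR")) {
        mu.Lock()
        errorLines++ // shared counter guarded by the mutex
        mu.Unlock()
    }
    _, err := out.Write(line) // pass the line through unchanged
    return err
}

processor := clp.NewConcurrentLineProcessor(file, clp.WithCustomLineProcessor(countErrors))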

Requirements

  • Go 1.25.0 or later (uses range-over-func iteration)
  • External dependency: golang.org/x/sync (errgroup)

License

This project is licensed under the MIT License - see the LICENSE file for details.

API Documentation

For complete API documentation, visit pkg.go.dev.

Error Handling

Errors encountered during any stage (reading, processing, writing) propagate through the internal pipe and surface on subsequent Read / io.Copy. After completion, Metrics().TimeTook reflects total duration even if an error occurred.
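
A short sketch of this pattern (assumes `processor` was configured as in the Quick Start examples):

if _, err := io.Copy(io.Discard, processor); err != nil {
    // An error from any stage of the pipeline surfaces here
    log.Printf("processing failed: %v", err)
}
fmt.Println("duration:", processor.Metrics().TimeTook)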

Future Improvements

Planned improvements have not been documented yet.

Documentation

Overview

Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.

This package allows you to efficiently process large files or streams by splitting the input into chunks and processing each line (or chunk) concurrently using multiple goroutines. It supports orchestrating multiple io.ReadCloser sources as a single logical stream, allowing you to merge large datasets without custom plumbing.

Features

  • Concurrent processing using a configurable number of workers (goroutines)
  • Custom line processor function for transforming or filtering individual lines
  • Custom chunk processor function for processing entire chunks at once
  • ChunkDetails context passed to processors with ReaderID and ChunkID
  • Metrics reporting (bytes read/written, rows read/written, processing time)
  • Optional row read limit for sampling or testing
  • Multi-source input: merge multiple io.ReadCloser inputs into one stream
  • Backpressure-friendly internal bounded channels
  • Memory-efficient sync.Pool-based chunk allocation

Output Ordering

IMPORTANT: By default, output lines may NOT preserve the original input order. Concurrent processing with multiple workers causes chunks to complete at different times, resulting in unpredictable output ordering.

To guarantee ordered output:

  • Use WithWorkers(1) to process sequentially
  • For multiple input sources, merge them with io.MultiReader before passing to the processor instead of using WithMultiReaders/WithReaders (which processes sources concurrently)

Basic Usage

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

f, err := os.Open("largefile.txt")
clp.ExitOnError(err)
defer f.Close()
pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4), clp.WithChunkSize(1024*1024))
output, err := io.ReadAll(pr)
clp.ExitOnError(err)
fmt.Println(string(output))

Custom Line Processing

The DataProcessor function signature is:

func(b []byte, info *ChunkDetails, out io.Writer) error

Processors write their output to the provided io.Writer and return any error.

pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomLineProcessor(
    func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
        _, err := out.Write(bytes.ToUpper(line))
        return err
    },
))

Custom Chunk Processing

For processing entire chunks at once (e.g., aggregation):

pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomChunkProcessor(
    func(chunk []byte, info *clp.ChunkDetails, out io.Writer) error {
        // Process entire chunk
        _, err := out.Write(chunk)
        return err
    },
))

Metrics

metrics := pr.Metrics()
fmt.Printf("Rows read: %d, Bytes read: %d, Time took: %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)

For more advanced usage, see the examples/ directory.


Index

Constants

const (
	BaseSI     = 1000
	BaseBinary = 1024
)
const (
	KB = 1024
)

Variables

var Files = []string{
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/data/temp_example.csv",
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/2024-06-04-details.jsonl",
	"/Users/agali/Downloads/temp/my_data/usage_data_12m.json",
	"/Users/agali/Downloads/temp/my_data/usage_data_3m.json",
	"/Users/agali/Desktop/Work/go-lang/tryouts/1brc/gen/measurements.txt",
}

Files contains a list of test files used for development and testing. This variable is used internally for testing and benchmarking purposes.

Functions

func DrainData added in v1.2.1

func DrainData(r io.Reader)

func EnsureNewLineAtEnd added in v1.1.1

func EnsureNewLineAtEnd(chunk *Chunk)

EnsureNewLineAtEnd ensures the chunk's data ends with a newline character. If the chunk is nil or empty, this is a no-op. It modifies the chunk in-place, either by setting an existing byte or appending.

func ErrWithDebugStack

func ErrWithDebugStack(err error) error

func ExitOnError added in v1.0.3

func ExitOnError(err error)

func Filter added in v1.1.0

func Filter[T any](arr []T, keep func(T) bool) []T

func FormatBytes added in v1.0.10

func FormatBytes(size, base float64) string

FormatBytes formats a byte count into a human-readable string. Use BaseSI (1000) for SI units (KB, MB, GB) or BaseBinary (1024) for binary units (KiB, MiB, GiB).

func FormatDuration added in v1.1.1

func FormatDuration(d time.Duration) string

FormatDuration formats a duration into a human-readable string with appropriate precision. Precision decreases as duration increases: nanoseconds for tiny durations, milliseconds for sub-second, seconds for sub-minute, etc.
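
Example (a sketch combining FormatBytes and FormatDuration; the exact output strings depend on the rules described above):

metrics := processor.Metrics()
size := clp.FormatBytes(float64(metrics.BytesRead), clp.BaseBinary)
took := clp.FormatDuration(metrics.TimeTook)
fmt.Printf("read %s in %s\n", size, took)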

func IfNull added in v1.0.3

func IfNull[T any](org *T, def T) T

func Lines added in v1.2.0

func Lines(l []byte, rawLine bool) iter.Seq[[]byte]

Lines returns an iterator over lines in the given byte slice. When rawLine is false, each yielded line excludes the trailing newline character; when true, the newline is retained. Uses the Go 1.23+ range-over-func iteration pattern.
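
Example (a sketch; `data` is assumed to hold newline-delimited bytes):

for line := range clp.Lines(data, false) {
    fmt.Println(string(line)) // line excludes the trailing newline
}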

func NewConcurrentLineProcessor

func NewConcurrentLineProcessor(r io.ReadCloser, opts ...Option) *concurrentLineProcessor

NewConcurrentLineProcessor creates a new concurrentLineProcessor that reads from the provided io.ReadCloser. It starts processing immediately in background goroutines and returns a processor that implements io.Reader.

When you need to process more than one source, pass nil as the reader and supply inputs with WithMultiReaders.

The processor splits input into chunks, processes each line concurrently using multiple workers, and provides the processed output through the Read method.

IMPORTANT: Output order is NOT guaranteed by default. Use WithWorkers(1) for ordered output.

Example:

file, err := os.Open("large-file.txt")
if err != nil {
	log.Fatal(err)
}
defer file.Close()

processor := clp.NewConcurrentLineProcessor(file,
	clp.WithWorkers(8),
	clp.WithChunkSize(1024*1024),
)

output, err := io.ReadAll(processor)
if err != nil {
	log.Fatal(err)
}

func PrintAsJsonString

func PrintAsJsonString(v any)

func PrintSummaryPeriodically added in v1.0.12

func PrintSummaryPeriodically(ctx context.Context, p *concurrentLineProcessor, interval time.Duration)

PrintSummaryPeriodically prints the processor's summary at regular intervals. It stops when the context is cancelled. Useful for monitoring long-running processes.
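
Example (a sketch; `processor` is assumed to be configured elsewhere):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Print a progress summary every 5 seconds until the context is cancelled
go clp.PrintSummaryPeriodically(ctx, processor, 5*time.Second)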

func Ptr added in v1.2.0

func Ptr[T any](v T) *T

func WithOpts

func WithOpts(p *concurrentLineProcessor, opts ...Option) *concurrentLineProcessor

WithOpts applies the given options to the concurrentLineProcessor. This is a convenience function for applying multiple options at once.

Types

type Chunk

type Chunk struct {
	// contains filtered or unexported fields
}

Chunk represents a piece of data to be processed. It implements io.Writer and io.ByteWriter for efficient data accumulation.

Each chunk has an ID for ordering within a reader and a readerID to identify which source reader it came from (useful when processing multiple readers).

func (*Chunk) Grow added in v1.2.1

func (chunk *Chunk) Grow(n int)

func (*Chunk) Write added in v1.2.0

func (chunk *Chunk) Write(src []byte) (int, error)

Write implements io.Writer, appending src to the chunk's data buffer. It uses copy for efficiency when possible, falling back to append for overflow. The endingPos is updated to reflect the new data boundary.

func (*Chunk) WriteByte added in v1.2.0

func (chunk *Chunk) WriteByte(b byte) error

type ChunkDetails added in v1.2.0

type ChunkDetails struct {
	// ReaderID identifies which source reader this data came from.
	// Useful when processing multiple readers via WithReaders.
	ReaderID int
	// ChunkID is the sequential ID of the chunk within its source reader.
	// Can be used for ordering or debugging purposes.
	ChunkID int
}

ChunkDetails provides contextual information about the data being processed. It is passed to DataProcessor functions to provide context about the source. All fields use zero-based indexing.
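
Example (an illustrative sketch, not part of the package, of a line processor that tags its output using these fields):

tagged := func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
    // Prefix each line with the zero-based reader and chunk it came from
    _, err := fmt.Fprintf(out, "[reader=%d chunk=%d] %s", info.ReaderID, info.ChunkID, line)
    return err
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomLineProcessor(tagged))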

type DataProcessor added in v1.2.0

type DataProcessor func(b []byte, info *ChunkDetails, out io.Writer) error

DataProcessor is a function type for processing individual lines or chunks. It receives the data as []byte, contextual info via ChunkDetails (containing ReaderID and ChunkID), and an io.Writer to write the processed output to.

The processor must write its output to the provided io.Writer rather than returning the result. This design allows for efficient streaming without intermediate allocations.

Implementations must be thread-safe as they may be called concurrently from multiple workers. Do not mutate shared state without proper synchronization (e.g., sync.Mutex).

Example:

func(b []byte, info *ChunkDetails, out io.Writer) error {
    _, err := out.Write(bytes.ToUpper(b))
    return err
}

type Metrics

type Metrics struct {
	// BytesRead is the total number of bytes read from the source reader.
	// When RowsReadLimit is set, more bytes may be read than are written.
	BytesRead int64 `json:"bytes_read"`
	// BytesWritten is the total number of bytes written after processing each line.
	BytesWritten int64 `json:"bytes_written"`
	// RowsRead is the total number of rows read from the source reader.
	RowsRead int64 `json:"rows_read"`
	// RowsWritten is the total number of rows written to the output stream.
	RowsWritten int64 `json:"rows_written"`
	// TimeTook is the total time taken to read and process the data.
	TimeTook time.Duration `json:"time_took"`
}

Metrics contains performance and processing statistics for a concurrentLineProcessor.

type Option

type Option func(*concurrentLineProcessor)

Option is a function type for configuring concurrentLineProcessor instances. Options are passed to NewConcurrentLineProcessor to customize behavior.

func WithChannelSize added in v1.0.10

func WithChannelSize(size int) Option

WithChannelSize sets the size of the channels used for input and output streams. A larger channel size can improve throughput for high-volume data processing. Default (when unspecified) is 70 (see defaultChanSize in reader.go).

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithChannelSize(1000)) // 1000 items in channel

func WithChunkSize

func WithChunkSize(size int) Option

WithChunkSize sets the chunk size for reading data from the source. Larger chunk sizes can improve performance for large files but may use more memory. The default chunk size is 64KB.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithChunkSize(1024*1024)) // 1MB chunks

func WithContext added in v1.1.1

func WithContext(ctx context.Context) Option

WithContext sets the context for the concurrentLineProcessor. This context can be used to manage cancellation and timeouts for the processing operations.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
clp.NewConcurrentLineProcessor(reader, clp.WithContext(ctx))

func WithCustomChunkProcessor added in v1.2.0

func WithCustomChunkProcessor(c DataProcessor) Option

WithCustomChunkProcessor sets a custom function to process entire chunks at once. Unlike WithCustomLineProcessor, this processes the full chunk buffer rather than individual lines, which can be more efficient for certain operations like aggregation.

The function receives a chunk as []byte, a ChunkDetails struct with contextual info (ReaderID, ChunkID), and an io.Writer to write output to.

The processor must write its output to the provided io.Writer. A newline is automatically ensured at the end of the chunk output.

The function must be thread-safe and should not modify external state without proper synchronization (e.g., sync.Mutex).

Example:

// Process entire chunk and extract all JSON keys
processor := func(chunk []byte, info *clp.ChunkDetails, out io.Writer) error {
    // Process chunk as a whole
    _, err := out.Write(chunk)
    return err
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomChunkProcessor(processor))

func WithCustomLineProcessor

func WithCustomLineProcessor(c DataProcessor) Option

WithCustomLineProcessor sets a custom function to process each line individually. The function receives a line as []byte (without trailing newline), a ChunkDetails struct with contextual info (ReaderID, ChunkID), and an io.Writer to write output to.

The processor must write its output to the provided io.Writer. A newline is automatically appended after each processed line.

The function must be thread-safe and should not modify external state without proper synchronization (e.g., sync.Mutex).

Example:

// Convert lines to uppercase
processor := func(line []byte, info *clp.ChunkDetails, out io.Writer) error {
    _, err := out.Write(bytes.ToUpper(line))
    return err
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomLineProcessor(processor))

func WithReaders added in v1.2.1

func WithReaders(readers ...io.ReadCloser) Option

WithReaders sets multiple source readers for the concurrentLineProcessor. When used, the reader passed to NewConcurrentLineProcessor can be nil because this option replaces the internal reader list. Empty readers are handled by the Read method.

IMPORTANT: Multiple readers are processed concurrently, so output from different sources will be interleaved unpredictably. If you need ordered output across multiple sources, merge them with io.MultiReader before passing to the processor:

merged := io.NopCloser(io.MultiReader(r1, r2, r3))
clp.NewConcurrentLineProcessor(merged, clp.WithWorkers(1))

Example:

readers := []io.ReadCloser{reader1, reader2, reader3}
clp.NewConcurrentLineProcessor(nil, clp.WithReaders(readers...))

func WithRowsReadLimit

func WithRowsReadLimit(limit int) Option

WithRowsReadLimit sets a limit on the number of rows to read from the source. Use -1 for no limit (default). This is useful for processing only a subset of a large file for testing or sampling purposes.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithRowsReadLimit(1000)) // Process only first 1000 lines

func WithWorkers

func WithWorkers(n int) Option

WithWorkers sets the number of worker goroutines for concurrent processing. More workers can improve performance for CPU-intensive line processing, but may not help for I/O-bound operations. The default is runtime.NumCPU().

IMPORTANT: Multiple workers process chunks concurrently, which means output order is NOT preserved. Use WithWorkers(1) if you need output to maintain input order.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithWorkers(8))

For ordered output:

clp.NewConcurrentLineProcessor(reader, clp.WithWorkers(1))
