concurrentlineprocessor

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2025 License: MIT Imports: 16 Imported by: 1

README

Concurrent Line Processor

Go Reference Go Report Card

A high-performance, concurrent line-by-line processor for large files and streams in Go. This package splits input into chunks and processes lines concurrently using goroutines. You can also stitch multiple io.ReadCloser sources together and treat them as a single logical stream without extra fan-in plumbing.

Features

  • Concurrent Processing: Process lines concurrently with a configurable number of worker goroutines
  • Memory Efficient: Uses a sync.Pool and streaming; never loads entire file into memory
  • Customizable: Supply a thread-safe custom line processor function
  • Metrics: Built-in metrics (bytes read/written, rows read/written, processing duration)
  • Standard Interface: Implements io.Reader and has a Close() for resource cleanup
  • Flexible Configuration: Configure chunk size, worker count, channel size, and row read limit
  • Multi-source Input: Merge multiple io.ReadCloser inputs into one stream (ordering between sources is nondeterministic)
  • Backpressure Friendly: Internal bounded channels help balance producer/consumer throughput

Installation

go get github.com/anvesh9652/concurrent-line-processor@latest

Quick Start

Below are common usage patterns. Each example is self-contained and can be copied into a file and run with go run.

Quick Start

Below are common usage patterns. Each example is self-contained and can be copied into a file and run with go run.

Note: Reading via io.ReadAll will accumulate all processed data in memory. Prefer io.Copy to a file or stream for very large inputs.

1. Basic Usage (stream to stdout)
package main

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    file, err := os.Open("large-file.txt")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    processor := clp.NewConcurrentLineProcessor(file)
    defer processor.Close() // only needed when multiple readers were supplied or to flush pipe early

    // Stream the processed output directly
    if _, err := io.Copy(os.Stdout, processor); err != nil {
        panic(err)
    }

    metrics := processor.Metrics()
    fmt.Printf("Processed %d rows, %d bytes in %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)
}
2. Merging Multiple Sources (nondeterministic interleaving)
package main

import (
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    files := []string{"part-1.log", "part-2.log", "part-3.log"}
    var readers []io.ReadCloser
    for _, name := range files {
        f, err := os.Open(name)
        if err != nil {
            panic(err)
        }
        readers = append(readers, f)
    }

    processor := clp.NewConcurrentLineProcessor(nil,
        clp.WithMultiReaders(readers...),
        clp.WithWorkers(4),
    )
    defer processor.Close()

    _, err := io.Copy(os.Stdout, processor)
    if err != nil {
        panic(err)
    }
}
3. Custom Line Processing
package main

import (
    "bytes"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    file, err := os.Open("data.csv")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    upperCaseProcessor := func(line []byte) ([]byte, error) {
        return bytes.ToUpper(line), nil
    }

    processor := clp.NewConcurrentLineProcessor(file,
        clp.WithCustomLineProcessor(upperCaseProcessor),
        clp.WithWorkers(8),
        clp.WithChunkSize(1024*1024), // 1MB chunks
    )
    defer processor.Close()

    outFile, err := os.Create("output.csv")
    if err != nil { panic(err) }
    defer outFile.Close()
    if _, err := io.Copy(outFile, processor); err != nil { panic(err) }
}
4. CSV to JSONL Conversion
package main

import (
    "bytes"
    "encoding/csv"
    "encoding/json"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func convertCSVToJSONL(inputFile, outputFile string, headers []string) error {
    input, err := os.Open(inputFile)
    if err != nil {
        return err
    }
    defer input.Close()

    output, err := os.Create(outputFile)
    if err != nil {
        return err
    }
    defer output.Close()

    csvToJSONProcessor := func(line []byte) ([]byte, error) {
        reader := csv.NewReader(bytes.NewReader(line))
        row, err := reader.Read()
        if err != nil {
            return nil, err
        }
        record := make(map[string]string)
        for i, header := range headers {
            if i < len(row) {
                record[header] = row[i]
            }
        }
        return json.Marshal(record)
    }

    processor := clp.NewConcurrentLineProcessor(input,
        clp.WithCustomLineProcessor(csvToJSONProcessor),
        clp.WithWorkers(4),
        clp.WithRowsReadLimit(-1),
    )

    _, err = io.Copy(output, processor)
    return err
}
5. Processing with Row Limit
package main

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func processFirstThousandRows(filename string) error {
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    processor := clp.NewConcurrentLineProcessor(file,
        clp.WithRowsReadLimit(1000),
        clp.WithWorkers(2),
    )

    _, err = io.Copy(io.Discard, processor)
    if err != nil {
        return err
    }

    metrics := processor.Metrics()
    fmt.Printf("Processed %d rows in %s\n", metrics.RowsRead, metrics.TimeTook)
    return nil
}

Processing stops when the limit of newline-delimited rows is reached. Internally the reader may consume extra bytes to detect the boundary.

Performance Considerations

Memory Usage
  • A sync.Pool minimizes per-chunk allocations.
  • Memory scales with (chunk size * active workers) plus channel buffering.
  • Default chunk size is 64KB (not 30KB as previously documented). Tune based on typical line length.
Worker Count
  • More workers help if your custom line processor is CPU-bound.
  • For pure pass-through or I/O-bound workloads, increasing workers yields limited benefit.
  • Default is runtime.NumCPU().
Chunk Size
  • Larger chunks reduce syscall overhead but increase peak memory.
  • Smaller chunks improve responsiveness and reduce memory footprint.
  • Default: 64KB. Examples show tuning up to multi-MB for high-throughput transformations.
Channel Size
  • Controls buffering between reader, processors, and writer stages.
  • Default: 70. Increase if workers frequently starve or if you have bursty input.
Ordering
  • Output line ordering relative to original input is preserved within individual chunks but overall ordering is not guaranteed when using multiple readers or very small chunk sizes with multiple workers.
  • If strict ordering matters, run with WithWorkers(1).

Examples

The examples/ directory contains complete examples demonstrating:

  • Basic file processing
  • CSV to JSONL conversion
  • JSONL to CSV conversion
  • Custom line transformations
  • Multi-reader merging
  • Performance profiling

Metrics

The processor provides detailed metrics accessible via Metrics(). They are updated atomically during pipeline execution and the duration is finalized once processing completes (until then TimeTook reports the current elapsed time):

type Metrics struct {
    BytesRead    int64         `json:"bytes_read"`    // Total bytes read from source (may exceed written when a row limit is applied)
    BytesWritten int64         `json:"bytes_written"` // Total bytes written after optional transformation
    RowsRead     int64         `json:"rows_read"`     // Total newline-delimited rows consumed
    RowsWritten  int64         `json:"rows_written"`  // Total rows emitted to the output stream
    TimeTook     time.Duration `json:"time_took"`     // Elapsed or finalized processing duration
}

Thread Safety

  • Call Read from a single goroutine at a time (standard io.Reader contract).
  • The internal pipeline is concurrent; metrics fields are updated atomically and can be read safely at any time.
  • Custom line processor functions must be thread-safe. Avoid mutating shared state unless you synchronize externally.
  • When multiple source readers are configured, their data is consumed concurrently.

Requirements

  • Go 1.24.0 or later
  • External dependency: golang.org/x/sync (errgroup)

License

This project is licensed under the MIT License - see the LICENSE file for details.

API Documentation

For complete API documentation, visit pkg.go.dev.

Error Handling

Errors encountered during any stage (reading, processing, writing) propagate through the internal pipe and surface on subsequent Read / io.Copy. After completion, Metrics().TimeTook reflects total duration even if an error occurred.

Future Improvements

Need to work on

Documentation

Overview

Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.

See reader.go for full package documentation and usage examples.

Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.

This package allows you to efficiently process large files or streams by splitting the input into chunks and processing each line concurrently using multiple goroutines. It now supports orchestrating multiple io.ReadCloser sources as a single logical stream, allowing you to merge large datasets without custom plumbing.

Features

  • Concurrent processing of lines using a configurable number of workers (goroutines)
  • Custom line processor function for transforming or filtering lines
  • Metrics reporting (bytes read, rows read, processing time, etc.)
  • Optional row read limit

Basic Usage

import (
    "os"
    clp "github.com/anvesh9652/concurrent-line-processor"
)

f, err := os.Open("largefile.txt")
clp.ExitOnError(err)
defer f.Close()
pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4), clp.WithChunkSize(1024*1024))
output, err := io.ReadAll(pr)
clp.ExitOnError(err)
fmt.Println(string(output))

Custom Line Processing

pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomLineProcessor(func(line []byte) ([]byte, error) {
    // Transform or filter the line
    return bytes.ToUpper(line), nil
}))

Metrics

metrics := pr.Metrics()
fmt.Printf("Rows read: %d, Bytes read: %d, Time took: %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)

For more advanced usage, see the examples/ directory.

Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.

See reader.go for full package documentation and usage examples, including how to wire multiple io.ReadCloser sources into a single processor.

Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.

See reader.go for full package documentation and usage examples, including configuration with multiple readers.

Index

Constants

This section is empty.

Variables

View Source
var Files = []string{
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/data/temp_example.csv",
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/2024-06-04-details.jsonl",
	"/Users/agali/Downloads/temp/my_data/usage_data_12m.json",
	"/Users/agali/Downloads/temp/my_data/usage_data_3m.json",
	"/Users/agali/Desktop/Work/go-lang/tryouts/1brc/gen/measurements.txt",
}

Files contains a list of test files used for development and testing. This variable is used internally for testing and benchmarking purposes.

View Source
var (
	KB = 1024
)

Functions

func EnsureNewLineAtEnd added in v1.1.1

func EnsureNewLineAtEnd(chunk *Chunk)

func ErrWithDebugStack

func ErrWithDebugStack(err error) error

func ExitOnError added in v1.0.3

func ExitOnError(err error)

func Filter added in v1.1.0

func Filter[T any](arr []T, keep func(T) bool) []T

func FormatBytes added in v1.0.10

func FormatBytes(size float64) string

func FormatDuration added in v1.1.1

func FormatDuration(d time.Duration) string

func IfNull added in v1.0.3

func IfNull[T any](org *T, def T) T

func NewConcurrentLineProcessor

func NewConcurrentLineProcessor(r io.ReadCloser, opts ...Option) *concurrentLineProcessor

NewConcurrentLineProcessor creates a new concurrentLineProcessor that reads from the provided io.ReadCloser. It starts processing immediately in background goroutines and returns a processor that implements io.Reader.

When you need to process more than one source, pass nil as the reader and supply inputs with WithMultiReaders.

The processor splits input into chunks, processes each line concurrently using multiple workers, and provides the processed output through the Read method.

Example:

file, err := os.Open("large-file.txt")
if err != nil {
	log.Fatal(err)
}
defer file.Close()

processor := clp.NewConcurrentLineProcessor(file,
	clp.WithWorkers(8),
	clp.WithChunkSize(1024*1024),
)

output, err := io.ReadAll(processor)
if err != nil {
	log.Fatal(err)
}

func PrintAsJsonString

func PrintAsJsonString(v any)

func PrintSummaryPeriodically added in v1.0.12

func PrintSummaryPeriodically(ctx context.Context, p *concurrentLineProcessor, interval time.Duration)

func WithOpts

func WithOpts(p *concurrentLineProcessor, opts ...Option) *concurrentLineProcessor

WithOpts applies the given options to the concurrentLineProcessor. This is a convenience function for applying multiple options at once.

Types

type Chunk

type Chunk struct {
	// contains filtered or unexported fields
}

Chunk represents a piece of data to be processed, containing an ID for ordering and a pointer to the actual data buffer.

type LineDetails added in v1.1.1

type LineDetails struct {
	// ReaderID is the ID of the source reader from which this line was read.
	ReaderID int
	// ChunkID is the ID of the chunk which we have read from the source reader.
	ChunkID int
}

LineDetails provides contextual information about a line being processed. All the fields follow zero-based indexing.

type LineProcessor

type LineProcessor func(b []byte, info *LineDetails) ([]byte, error)

LineProcessor is a function type for processing individual lines. It receives a line as []byte and info and then returns the processed line and any error. Implementations must be thread-safe as they may be called concurrently.

type Metrics

type Metrics struct {
	// BytesRead is the total number of bytes read from the source reader.
	// When RowsReadLimit is set, it might read more bytes than the bytes written.
	BytesRead int64 `json:"bytes_read"`
	// BytesWritten is the total number of bytes written after processing each line.
	BytesWritten int64 `json:"bytes_written"`
	// RowsRead is the total number of rows read from the source reader.
	RowsRead int64 `json:"rows_read"`
	// RowsWritten is the total number of rows written to the output stream.
	RowsWritten int64 `json:"rows_written"`
	// TimeTook is the total time taken to read and process the data.
	TimeTook time.Duration `json:"time_took"`
}

Metrics contains performance and processing statistics for a concurrentLineProcessor.

type Option

type Option func(*concurrentLineProcessor)

Option is a function type for configuring concurrentLineProcessor instances. Options are passed to NewConcurrentLineProcessor to customize behavior.

func WithChannelSize added in v1.0.10

func WithChannelSize(size int) Option

WithChannelSize sets the size of the channels used for input and output streams. A larger channel size can improve throughput for high-volume data processing. Default (when unspecified) is 70 (see defaultChanSize in reader.go).

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithChannelSize(1000)) // 1000 items in channel

func WithChunkSize

func WithChunkSize(size int) Option

WithChunkSize sets the chunk size for reading data from the source. Larger chunk sizes can improve performance for large files but may use more memory. The default chunk size is 64KB.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithChunkSize(1024*1024)) // 1MB chunks

func WithContext added in v1.1.1

func WithContext(ctx context.Context) Option

WithContext sets the context for the concurrentLineProcessor. This context can be used to manage cancellation and timeouts for the processing operations.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
clp.NewConcurrentLineProcessor(reader, clp.WithContext(ctx))

func WithCustomLineProcessor

func WithCustomLineProcessor(c LineProcessor) Option

WithCustomLineProcessor sets a custom function to process each line. The function receives a line as []byte and should return the processed line. The function must be thread-safe and should not modify external state without proper synchronization.

Example:

// Convert lines to uppercase
processor := func(line []byte) ([]byte, error) {
    return bytes.ToUpper(line), nil
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomLineProcessor(processor))

func WithMultiReaders added in v1.1.0

func WithMultiReaders(readers ...io.ReadCloser) Option

WithMultiReaders sets multiple source readers for the concurrentLineProcessor. When used, the reader passed to NewConcurrentLineProcessor can be nil because this option replaces the internal reader list. Empty readers will by handled by Read method.

Example:

readers := []io.ReadCloser{reader1, reader2, reader3}
clp.NewConcurrentLineProcessor(nil, clp.WithMultiReaders(readers...))

func WithRowsReadLimit

func WithRowsReadLimit(limit int) Option

WithRowsReadLimit sets a limit on the number of rows to read from the source. Use -1 for no limit (default). This is useful for processing only a subset of a large file for testing or sampling purposes.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithRowsReadLimit(1000)) // Process only first 1000 lines

func WithWorkers

func WithWorkers(n int) Option

WithWorkers sets the number of worker goroutines for concurrent processing. More workers can improve performance for CPU-intensive line processing, but may not help for I/O-bound operations. The default is runtime.NumCPU().

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithWorkers(8))

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL