concurrentlineprocessor

package module
v1.1.0
Published: Oct 20, 2025 License: MIT Imports: 14 Imported by: 1

README

Concurrent Line Processor


A high-performance, concurrent line-by-line processor for large files and streams in Go. This package enables efficient processing of large files by splitting input into chunks and processing each line concurrently using multiple goroutines. You can also stitch multiple data sources together and treat them as a single stream.

Features

  • Concurrent Processing: Process lines concurrently using a configurable number of worker goroutines
  • Memory Efficient: Uses memory pooling and streaming to handle large files without loading everything into memory
  • Customizable: Support for custom line processing functions
  • Metrics: Built-in performance metrics (bytes read, rows processed, processing time, etc.)
  • Standard Interface: Implements io.ReadCloser for seamless integration with existing Go I/O patterns
  • Flexible Configuration: Configurable chunk size, worker count, and row limits
  • Multi-source Input: Combine multiple io.ReadCloser inputs into a single logical stream without manual fan-in code

Installation

go get github.com/anvesh9652/concurrent-line-processor

Quick Start

Basic Usage

package main

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    // Open a large file
    file, err := os.Open("large-file.txt")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // Create a concurrent line processor
    processor := clp.NewConcurrentLineProcessor(file)

    // Read all processed output
    output, err := io.ReadAll(processor)
    if err != nil {
        panic(err)
    }

    fmt.Println(string(output))

    // Print processing metrics
    metrics := processor.Metrics()
    fmt.Printf("Processed %d rows, %d bytes in %s\n",
        metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)
}

Merging Multiple Sources

Process multiple files (or any io.ReadCloser values) as a single logical stream:

package main

import (
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    files := []string{"part-1.log", "part-2.log", "part-3.log"}
    var readers []io.ReadCloser
    for _, name := range files {
        f, err := os.Open(name)
        if err != nil {
            panic(err)
        }
        readers = append(readers, f)
    }

    processor := clp.NewConcurrentLineProcessor(nil,
        clp.WithMultiReaders(readers...),
        clp.WithWorkers(4),
    )
    defer processor.Close()

    if _, err := io.Copy(os.Stdout, processor); err != nil {
        panic(err)
    }
}


Custom Line Processing

Transform each line using a custom processor function:
package main

import (
    "bytes"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func main() {
    file, err := os.Open("data.csv")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // Convert CSV lines to uppercase
    upperCaseProcessor := func(line []byte) ([]byte, error) {
        return bytes.ToUpper(line), nil
    }

    processor := clp.NewConcurrentLineProcessor(file,
        clp.WithCustomLineProcessor(upperCaseProcessor),
        clp.WithWorkers(8),
        clp.WithChunkSize(1024*1024), // 1MB chunks
    )

    output, err := io.ReadAll(processor)
    if err != nil {
        panic(err)
    }

    // Write to output file
    err = os.WriteFile("output.csv", output, 0644)
    if err != nil {
        panic(err)
    }
}

CSV to JSONL Conversion

package main

import (
    "bytes"
    "encoding/csv"
    "encoding/json"
    "io"
    "os"
    
    clp "github.com/anvesh9652/concurrent-line-processor"
)

func convertCSVToJSONL(inputFile, outputFile string, headers []string) error {
    input, err := os.Open(inputFile)
    if err != nil {
        return err
    }
    defer input.Close()

    output, err := os.Create(outputFile)
    if err != nil {
        return err
    }
    defer output.Close()

    // Custom processor to convert CSV row to JSON
    csvToJSONProcessor := func(line []byte) ([]byte, error) {
        reader := csv.NewReader(bytes.NewReader(line))
        row, err := reader.Read()
        if err != nil {
            return nil, err
        }

        // Create map from headers and row values
        record := make(map[string]string)
        for i, header := range headers {
            if i < len(row) {
                record[header] = row[i]
            }
        }

        return json.Marshal(record)
    }

    processor := clp.NewConcurrentLineProcessor(input,
        clp.WithCustomLineProcessor(csvToJSONProcessor),
        clp.WithWorkers(4),
        clp.WithRowsReadLimit(-1), // Process all rows
    )

    _, err = io.Copy(output, processor)
    return err
}
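The CSV-to-JSON transformation above is plain standard library code, so it can be exercised in isolation before wiring it into the processor. A minimal, self-contained sketch (the sample headers and input line are made up for illustration):

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
)

// csvRowToJSON converts a single CSV line into a JSON object keyed by headers.
// It mirrors the processor closure used in convertCSVToJSONL above.
func csvRowToJSON(line []byte, headers []string) ([]byte, error) {
	row, err := csv.NewReader(bytes.NewReader(line)).Read()
	if err != nil {
		return nil, err
	}
	record := make(map[string]string)
	for i, header := range headers {
		if i < len(row) {
			record[header] = row[i]
		}
	}
	return json.Marshal(record)
}

func main() {
	out, err := csvRowToJSON([]byte("alice,30"), []string{"name", "age"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"age":"30","name":"alice"} (json.Marshal sorts map keys)
}
```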

Processing with Row Limit

Process only the first N rows of a large file:

package main

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

func processFirstThousandRows(filename string) error {
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    processor := clp.NewConcurrentLineProcessor(file,
        clp.WithRowsReadLimit(1000), // Only process first 1000 rows
        clp.WithWorkers(2),
    )

    // Process and discard output (useful for analysis)
    _, err = io.Copy(io.Discard, processor)
    if err != nil {
        return err
    }

    metrics := processor.Metrics()
    fmt.Printf("Processed %d rows in %s\n", metrics.RowsRead, metrics.TimeTook)
    
    return nil
}

Configuration Options

The processor can be configured using the following options:

WithWorkers(n int)

Sets the number of worker goroutines for concurrent processing.

  • Default: runtime.NumCPU()
  • Use 1 for sequential processing
  • Higher values may improve performance for CPU-intensive operations
WithChunkSize(size int)

Sets the size of chunks read from the source.

  • Default: 30KB (1024 * 30)
  • Larger chunks may improve performance but use more memory
  • Smaller chunks may reduce memory usage but increase overhead
WithCustomLineProcessor(processor LineProcessor)

Sets a custom function to process each line.

  • Function signature: func([]byte) ([]byte, error)
  • Must be thread-safe
  • Should not modify external state without proper synchronization
WithRowsReadLimit(limit int)

Sets a limit on the number of rows to read.

  • Default: -1 (no limit)
  • Useful for processing subsets of large files
  • Processing stops when the limit is reached

Performance Considerations

Memory Usage
  • The processor uses memory pooling to minimize allocations
  • Memory usage scales with chunk size and number of workers
  • For very large files, consider smaller chunk sizes
Worker Count
  • More workers help with CPU-intensive line processing
  • For I/O-bound operations, more workers may not help
  • Start with runtime.NumCPU() and adjust based on your use case
Chunk Size
  • Larger chunks reduce overhead but increase memory usage
  • Smaller chunks are more memory-efficient but may have higher overhead
  • The default 30KB works well for most use cases

Examples

The examples/ directory contains complete examples demonstrating:

  • Basic file processing
  • CSV to JSONL conversion
  • JSONL to CSV conversion
  • Custom line transformations
  • Performance profiling

Metrics

The processor provides detailed metrics accessible via the Metrics() method:

type Metrics struct {
    BytesRead        int64  `json:"bytes_read"`        // Total bytes read from source
    BytesTransformed int64  `json:"bytes_transformed"` // Total bytes after processing each line
    RowsRead         int64  `json:"rows_read"`         // Total rows processed
    RowsWritten      int64  `json:"rows_written"`      // Total rows written to the output stream
    TimeTook         string `json:"time_took"`         // Total processing time
}

Thread Safety

  • The concurrentLineProcessor itself is safe for concurrent use
  • Custom line processor functions must be thread-safe
  • Metrics can be safely accessed concurrently
  • The processor implements io.Reader and can be used safely by one goroutine at a time

Requirements

  • Go 1.24.0 or later
  • No external dependencies beyond golang.org/x/sync

License

This project is licensed under the MIT License - see the LICENSE file for details.

API Documentation

For complete API documentation, visit pkg.go.dev.

Documentation

Overview

Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.

See reader.go for full package documentation and usage examples.

This package allows you to efficiently process large files or streams by splitting the input into chunks and processing each line concurrently using multiple goroutines. It also supports orchestrating multiple io.ReadCloser sources as a single logical stream, allowing you to merge large datasets without custom plumbing.

Features

  • Concurrent processing of lines using a configurable number of workers (goroutines)
  • Custom line processor function for transforming or filtering lines
  • Metrics reporting (bytes read, rows read, processing time, etc.)
  • Optional row read limit

Basic Usage

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

f, err := os.Open("largefile.txt")
clp.ExitOnError(err)
defer f.Close()
pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4), clp.WithChunkSize(1024*1024))
output, err := io.ReadAll(pr)
clp.ExitOnError(err)
fmt.Println(string(output))

Custom Line Processing

pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomLineProcessor(func(line []byte) ([]byte, error) {
    // Transform or filter the line
    return bytes.ToUpper(line), nil
}))

Metrics

metrics := pr.Metrics()
fmt.Printf("Rows read: %d, Bytes read: %d, Time took: %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)

For more advanced usage, see the examples/ directory.


Index

Constants

This section is empty.

Variables

var Files = []string{
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/data/temp_example.csv",
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/2024-06-04-details.jsonl",
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/transform-00002_1.csv.jsonl",
	"/Users/agali/Desktop/Work/go-lang/tryouts/1brc/src_data.txt",
}

Files contains a list of test files used for development and testing. This variable is used internally for testing and benchmarking purposes.

Functions

func AppendNewLine added in v1.0.10

func AppendNewLine(b *[]byte)

func ErrWithDebugStack

func ErrWithDebugStack(err error) error

func ExitOnError added in v1.0.3

func ExitOnError(err error)

func Filter added in v1.1.0

func Filter[T any](arr []T, keep func(T) bool) []T
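The signature implies a standard keep-predicate filter. A local reimplementation sketch of the expected behavior (this is an illustration, not the package's actual source):

```go
package main

import "fmt"

// Filter returns the elements of arr for which keep returns true,
// matching the generic signature documented above.
func Filter[T any](arr []T, keep func(T) bool) []T {
	var out []T
	for _, v := range arr {
		if keep(v) {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	evens := Filter([]int{1, 2, 3, 4, 5}, func(n int) bool { return n%2 == 0 })
	fmt.Println(evens) // [2 4]
}
```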

func FormatBytes added in v1.0.10

func FormatBytes(size int) string

func IfNull added in v1.0.3

func IfNull[T any](org *T, def T) T
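Judging by the name and signature, IfNull appears to be a nil-coalescing helper: dereference org when it is non-nil, otherwise fall back to def. A hypothetical equivalent, for illustration only:

```go
package main

import "fmt"

// IfNull returns *org when org is non-nil, otherwise def
// (assumed semantics based on the signature above).
func IfNull[T any](org *T, def T) T {
	if org == nil {
		return def
	}
	return *org
}

func main() {
	var missing *int
	fmt.Println(IfNull(missing, 42)) // 42

	present := 7
	fmt.Println(IfNull(&present, 42)) // 7
}
```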

func NewConcurrentLineProcessor

func NewConcurrentLineProcessor(r io.ReadCloser, opts ...Option) *concurrentLineProcessor

NewConcurrentLineProcessor creates a new concurrentLineProcessor that reads from the provided io.ReadCloser. It starts processing immediately in background goroutines and returns a processor that implements io.Reader.

When you need to process more than one source, pass nil as the reader and supply inputs with WithMultiReaders.

The processor splits input into chunks, processes each line concurrently using multiple workers, and provides the processed output through the Read method.

Example:

file, err := os.Open("large-file.txt")
if err != nil {
	log.Fatal(err)
}
defer file.Close()

processor := clp.NewConcurrentLineProcessor(file,
	clp.WithWorkers(8),
	clp.WithChunkSize(1024*1024),
)

output, err := io.ReadAll(processor)
if err != nil {
	log.Fatal(err)
}

func PrintAsJsonString

func PrintAsJsonString(v any)

func PrintSummaryPeriodically added in v1.0.12

func PrintSummaryPeriodically(p *concurrentLineProcessor, now time.Time)

func WithOpts

func WithOpts(p *concurrentLineProcessor, opts ...Option)

WithOpts applies the given options to the concurrentLineProcessor. This is a convenience function for applying multiple options at once.

Types

type Chunk

type Chunk struct {
	// contains filtered or unexported fields
}

Chunk represents a piece of data to be processed, containing an ID for ordering and a pointer to the actual data buffer.

type LineProcessor

type LineProcessor func([]byte) ([]byte, error)

LineProcessor is a function type for processing individual lines. It receives a line as []byte and returns the processed line and any error. Implementations must be thread-safe as they may be called concurrently.

type Metrics

type Metrics struct {
	// BytesRead is the total number of bytes read from the source reader.
	// When RowsReadLimit is set, it might read more bytes than the transformed bytes.
	BytesRead int64 `json:"bytes_read"`
	// BytesTransformed is the total number of bytes after processing each line.
	BytesTransformed int64 `json:"bytes_transformed"`
	// RowsRead is the total number of rows read from the source reader.
	RowsRead int64 `json:"rows_read"`
	// RowsWritten is the total number of rows written to the output stream.
	RowsWritten int64 `json:"rows_written"`
	// TimeTook is the total time taken to read and process the data.
	TimeTook string `json:"time_took"`
}

Metrics contains performance and processing statistics for a concurrentLineProcessor.

type Option

type Option func(*concurrentLineProcessor)

Option is a function type for configuring concurrentLineProcessor instances. Options are passed to NewConcurrentLineProcessor to customize behavior.

func WithChannelSize added in v1.0.10

func WithChannelSize(size int) Option

WithChannelSize sets the size of the channels used for input and output streams. A larger channel size can improve throughput for high-volume data processing. The default channel size is 100.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithChannelSize(1000)) // 1000 items in channel

func WithChunkSize

func WithChunkSize(size int) Option

WithChunkSize sets the chunk size for reading data from the source. Larger chunk sizes can improve performance for large files but may use more memory. The default chunk size is 64KB.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithChunkSize(1024*1024)) // 1MB chunks

func WithCustomLineProcessor

func WithCustomLineProcessor(c LineProcessor) Option

WithCustomLineProcessor sets a custom function to process each line. The function receives a line as []byte and should return the processed line. The function must be thread-safe and should not modify external state without proper synchronization.

Example:

// Convert lines to uppercase
processor := func(line []byte) ([]byte, error) {
    return bytes.ToUpper(line), nil
}
clp.NewConcurrentLineProcessor(reader, clp.WithCustomLineProcessor(processor))

func WithMultiReaders added in v1.1.0

func WithMultiReaders(readers ...io.ReadCloser) Option

WithMultiReaders sets multiple source readers for the concurrentLineProcessor. When used, the reader passed to NewConcurrentLineProcessor can be nil because this option replaces the internal reader list.

Example:

readers := []io.ReadCloser{reader1, reader2, reader3}
clp.NewConcurrentLineProcessor(nil, clp.WithMultiReaders(readers...))

func WithRowsReadLimit

func WithRowsReadLimit(limit int) Option

WithRowsReadLimit sets a limit on the number of rows to read from the source. Use -1 for no limit (default). This is useful for processing only a subset of a large file for testing or sampling purposes.

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithRowsReadLimit(1000)) // Process only first 1000 lines

func WithWorkers

func WithWorkers(n int) Option

WithWorkers sets the number of worker goroutines for concurrent processing. More workers can improve performance for CPU-intensive line processing, but may not help for I/O-bound operations. The default is runtime.NumCPU().

Example:

clp.NewConcurrentLineProcessor(reader, clp.WithWorkers(8))
