concurrentlineprocessor

package module
v1.0.7
Published: Jul 7, 2025 License: MIT Imports: 13 Imported by: 1

Documentation

Overview

Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.

See reader.go for full package documentation and usage examples.

This package allows you to efficiently process large files or streams by splitting the input into chunks and processing each line concurrently using multiple goroutines.

Features

  • Concurrent processing of lines using a configurable number of workers (goroutines)
  • Custom line processor function for transforming or filtering lines
  • Metrics reporting (bytes read, rows read, processing time, etc.)
  • Optional row read limit

Basic Usage

import (
    "fmt"
    "io"
    "os"

    clp "github.com/anvesh9652/concurrent-line-processor"
)

f, err := os.Open("largefile.txt")
clp.ExitOnError(err)
defer f.Close()

pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4), clp.WithChunkSize(1024*1024))
output, err := io.ReadAll(pr)
clp.ExitOnError(err)
fmt.Println(string(output))

Custom Line Processing

pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomLineProcessor(func(line []byte) ([]byte, error) {
    // Transform or filter the line
    return bytes.ToUpper(line), nil
}))

Metrics

metrics := pr.Metrics()
fmt.Printf("Rows read: %d, bytes read: %d, time taken: %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)

For more advanced usage, see the examples/ directory.

Index

Constants

This section is empty.

Variables

var (
	DefaultChunkSize = 1024 * 30 // 30 KB

	// DefaultWorkers is the number of goroutines used for processing chunks
	DefaultWorkers = runtime.NumCPU()
)
var Files = []string{
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/data/temp_example.csv",
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/2024-06-04-details.jsonl",
	"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/transform-00002_1.csv.jsonl",
	"/Users/agali/Desktop/Work/go-lang/tryouts/1brc/src_data.txt",
}

Test files list.

Functions

func ErrWithDebugStack

func ErrWithDebugStack(err error) error

func ExitOnError added in v1.0.3

func ExitOnError(err error)

func IfNull added in v1.0.3

func IfNull[T any](org *T, def T) T

func PrintAsJsonString

func PrintAsJsonString(d any)

func WithNewLine

func WithNewLine(data []byte) []byte

func WithOpts

func WithOpts(p *ConcurrentLineProcessor, opts ...Option)

WithOpts applies the given options to the ConcurrentLineProcessor.

Types

type Chunk

type Chunk struct {
	// contains filtered or unexported fields
}

type ConcurrentLineProcessor

type ConcurrentLineProcessor struct {
	// contains filtered or unexported fields
}

func NewConcurrentLineProcessor

func NewConcurrentLineProcessor(r io.Reader, opts ...Option) *ConcurrentLineProcessor

NewConcurrentLineProcessor creates a new ConcurrentLineProcessor that reads from the provided io.Reader.

func (*ConcurrentLineProcessor) Metrics

func (p *ConcurrentLineProcessor) Metrics() Metrics

func (*ConcurrentLineProcessor) Read

func (p *ConcurrentLineProcessor) Read(b []byte) (int, error)

func (*ConcurrentLineProcessor) RowsRead

func (p *ConcurrentLineProcessor) RowsRead() int

type LineProcessor

type LineProcessor func([]byte) ([]byte, error)

type Metrics

type Metrics struct {
	// BytesRead is the total number of bytes read from the source reader.
	// When RowsReadLimit is set, it might read more bytes than the transformed bytes.
	BytesRead int64 `json:"bytes_read"`
	// TransformedBytes is the total number of bytes after processing each line.
	TransformedBytes int64 `json:"transformed_bytes"`
	// RowsRead is the total number of rows read from the source reader.
	RowsRead int64 `json:"rows_read"`
	// TimeTook is the total time taken to read and process the data.
	TimeTook string `json:"time_took"`
}

type Option

type Option func(*ConcurrentLineProcessor)

func WithChunkSize

func WithChunkSize(size int) Option

WithChunkSize sets the chunk size for the ConcurrentLineProcessor.

func WithCustomLineProcessor

func WithCustomLineProcessor(c LineProcessor) Option

WithCustomLineProcessor sets a custom line processor for the ConcurrentLineProcessor.

func WithRowsReadLimit

func WithRowsReadLimit(limit int) Option

WithRowsReadLimit sets the row read limit for the ConcurrentLineProcessor.

func WithWorkers

func WithWorkers(n int) Option

WithWorkers sets the number of workers for the ConcurrentLineProcessor.
