Documentation
Overview ¶
Package concurrentlineprocessor provides a high-performance, concurrent line-by-line processor for large files or streams.
See reader.go for full package documentation and usage examples.
This package allows you to efficiently process large files or streams by splitting the input into chunks and processing each line concurrently using multiple goroutines.
Features ¶
- Concurrent processing of lines using a configurable number of workers (goroutines)
- Custom line processor function for transforming or filtering lines
- Metrics reporting (bytes read, rows read, processing time, etc.)
- Optional row read limit
Basic Usage ¶
import (
	"fmt"
	"io"
	"os"

	clp "github.com/anvesh9652/concurrent-line-processor"
)

f, err := os.Open("largefile.txt")
clp.ExitOnError(err)
defer f.Close()

pr := clp.NewConcurrentLineProcessor(f, clp.WithWorkers(4), clp.WithChunkSize(1024*1024))
output, err := io.ReadAll(pr)
clp.ExitOnError(err)
fmt.Println(string(output))
Custom Line Processing ¶
pr := clp.NewConcurrentLineProcessor(f, clp.WithCustomLineProcessor(func(line []byte) ([]byte, error) {
// Transform or filter the line
return bytes.ToUpper(line), nil
}))
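Because a line processor receives one line at a time in the func(line []byte) ([]byte, error) shape shown above, it can be written and tested independently of the reader. A minimal sketch (the trimming behavior here is illustrative, not part of the package):

```go
package main

import (
	"bytes"
	"fmt"
)

// trimUpper is a line processor in the shape accepted by
// WithCustomLineProcessor: it strips surrounding whitespace
// (including trailing carriage returns) and uppercases the rest.
func trimUpper(line []byte) ([]byte, error) {
	return bytes.ToUpper(bytes.TrimSpace(line)), nil
}

func main() {
	out, err := trimUpper([]byte("  hello world \r"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // HELLO WORLD
}
```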
Metrics ¶
metrics := pr.Metrics()
fmt.Printf("Rows read: %d, Bytes read: %d, Time took: %s\n", metrics.RowsRead, metrics.BytesRead, metrics.TimeTook)
For more advanced usage, see the examples/ directory.
Index ¶
- Variables
- func ErrWithDebugStack(err error) error
- func ExitOnError(err error)
- func IfNull[T any](org *T, def T) T
- func PrintAsJsonString(d any)
- func WithNewLine(data []byte) []byte
- func WithOpts(p *ConcurrentLineProcessor, opts ...Option)
- type Chunk
- type ConcurrentLineProcessor
- type LineProcessor
- type Metrics
- type Option
Constants ¶
This section is empty.
Variables ¶
var (
	DefaultChunkSize = 1024 * 30 // 30 KB
	// DefaultWorkers is the number of goroutines used for processing chunks
	DefaultWorkers = runtime.NumCPU()
)
var Files = []string{
"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/data/temp_example.csv",
"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/2024-06-04-details.jsonl",
"/Users/agali/go-workspace/src/github.com/anvesh9652/concurrent-line-processor/tmp/transform-00002_1.csv.jsonl",
"/Users/agali/Desktop/Work/go-lang/tryouts/1brc/src_data.txt",
}
Files is a list of test files used during development.
Functions ¶
func ErrWithDebugStack ¶
func ErrWithDebugStack(err error) error
func ExitOnError ¶ added in v1.0.3
func ExitOnError(err error)
func IfNull ¶
func IfNull[T any](org *T, def T) T
func PrintAsJsonString ¶
func PrintAsJsonString(d any)
func WithNewLine ¶
func WithNewLine(data []byte) []byte
func WithOpts ¶
func WithOpts(p *ConcurrentLineProcessor, opts ...Option)
WithOpts applies the given options to the ConcurrentLineProcessor.
Types ¶
type ConcurrentLineProcessor ¶
type ConcurrentLineProcessor struct {
// contains filtered or unexported fields
}
func NewConcurrentLineProcessor ¶
func NewConcurrentLineProcessor(r io.Reader, opts ...Option) *ConcurrentLineProcessor
NewConcurrentLineProcessor creates a new ConcurrentLineProcessor that reads from the provided io.Reader.
func (*ConcurrentLineProcessor) Metrics ¶
func (p *ConcurrentLineProcessor) Metrics() Metrics
func (*ConcurrentLineProcessor) Read ¶
func (p *ConcurrentLineProcessor) Read(b []byte) (int, error)
func (*ConcurrentLineProcessor) RowsRead ¶
func (p *ConcurrentLineProcessor) RowsRead() int
type LineProcessor ¶
type LineProcessor func(line []byte) ([]byte, error)
type Metrics ¶
type Metrics struct {
// BytesRead is the total number of bytes read from the source reader.
// When RowsReadLimit is set, BytesRead may exceed TransformedBytes.
BytesRead int64 `json:"bytes_read"`
// TransformedBytes is the total number of bytes after processing each line.
TransformedBytes int64 `json:"transformed_bytes"`
// RowsRead is the total number of rows read from the source reader.
RowsRead int64 `json:"rows_read"`
// TimeTook is the total time taken to read and process the data.
TimeTook string `json:"time_took"`
}
type Option ¶
type Option func(*ConcurrentLineProcessor)
func WithChunkSize ¶
WithChunkSize sets the chunk size for the ConcurrentLineProcessor.
func WithCustomLineProcessor ¶
func WithCustomLineProcessor(c LineProcessor) Option
WithCustomLineProcessor sets a custom line processor for the ConcurrentLineProcessor.
func WithRowsReadLimit ¶
WithRowsReadLimit sets the row read limit for the ConcurrentLineProcessor.
func WithWorkers ¶
WithWorkers sets the number of workers for the ConcurrentLineProcessor.