pipeline

package
v1.4.3 Latest
Warning

This package is not in the latest version of its module.
Published: Sep 25, 2025 License: AGPL-3.0 Imports: 9 Imported by: 0

Documentation

Overview

Package pipeline provides a streaming data processing pipeline with memory-safe row ownership. This package addresses memory ownership issues by establishing clear ownership semantics: batches are owned by the Reader that returns them, and consumers must copy data they wish to retain.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ReturnBatch

func ReturnBatch(batch *Batch)

ReturnBatch returns a batch to the global pool for reuse. The batch should not be used after calling this function.

func ReturnPooledRow

func ReturnPooledRow(row Row)

ReturnPooledRow returns a Row map to the global pool for reuse. The row is cleared before being pooled.

func ToStringMap

func ToStringMap(row Row) map[string]any

ToStringMap converts a Row to map[string]any for compatibility with legacy interfaces.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is owned by the Reader that returns it. Consumers must not hold references after the next Next() call. If you must retain, copy rows you need (see copyRow / copyBatch).

The Batch reuses underlying Row maps for memory efficiency. Access rows only through the provided methods - never retain references to Row objects returned by Get() as they may be reused. Use CopyRow() if you need to retain data.

func CopyBatch

func CopyBatch(in *Batch) *Batch

CopyBatch creates a deep copy of a batch using lower-level primitives.

func GetBatch

func GetBatch() *Batch

GetBatch returns a reusable batch from the global pool. The batch is clean and ready to use.

func (*Batch) AddRow

func (b *Batch) AddRow() Row

AddRow adds a new row to the batch, reusing an existing Row map if available. Returns the Row map that should be populated. The returned Row must not be retained beyond the lifetime of this batch.

func (*Batch) DeleteRow

func (b *Batch) DeleteRow(index int)

DeleteRow marks a row as deleted without losing the underlying map. The map is preserved for future reuse.

func (*Batch) Get

func (b *Batch) Get(index int) Row

Get returns the row at the given index. The returned Row must not be retained beyond the lifetime of this batch, as it may be reused. Use CopyRow() if you need to retain the data.

func (*Batch) Len

func (b *Batch) Len() int

Len returns the number of valid rows in the batch.

func (*Batch) ReplaceRow

func (b *Batch) ReplaceRow(index int, newRow Row)

ReplaceRow replaces the row at the given index with a new row. The old row is returned to the pool and the new row takes its place. The batch takes ownership of the new row.

func (*Batch) SwapRows

func (b *Batch) SwapRows(index1, index2 int) bool

SwapRows efficiently swaps two rows within a batch without allocations. This is useful for readers that need to rearrange data without copying. Returns false if either index is invalid.

func (*Batch) TakeRow

func (b *Batch) TakeRow(index int) Row

TakeRow extracts a row from the batch and transfers ownership to the caller. The row at the given index is replaced with a fresh row from the pool. The caller is responsible for returning the row to the pool when done. Returns nil if index is invalid.

type BatchPoolStats

type BatchPoolStats struct {
	Allocations uint64
	Gets        uint64
	Puts        uint64
}

BatchPoolStats contains counters for batch pool usage.

func GlobalBatchPoolStats

func GlobalBatchPoolStats() BatchPoolStats

GlobalBatchPoolStats returns usage counters for the global batch pool.

func (BatchPoolStats) LeakedBatches

func (s BatchPoolStats) LeakedBatches() uint64

LeakedBatches returns the number of batches that were taken from the pool but never returned.

type FlatMapReader

type FlatMapReader struct {
	// contains filtered or unexported fields
}

FlatMapReader: 1→N mapping (N may be 0, 1, or many)

func FlatMap

func FlatMap(in Reader, fn func(Row, func(Row))) *FlatMapReader

func (*FlatMapReader) Close

func (f *FlatMapReader) Close() error

func (*FlatMapReader) Next

func (f *FlatMapReader) Next(ctx context.Context) (*Batch, error)

type InMemorySortReader

type InMemorySortReader struct {
	// contains filtered or unexported fields
}

InMemorySortReader buffers all rows in memory, sorts them, then returns them in order.

func (*InMemorySortReader) Close

func (s *InMemorySortReader) Close() error

func (*InMemorySortReader) Next

func (s *InMemorySortReader) Next(ctx context.Context) (*Batch, error)

type MapReader

type MapReader struct {
	// contains filtered or unexported fields
}

MapReader: 1→1 mapping (a row may be dropped by returning (nil, false))

func Filter

func Filter(in Reader, pred func(Row) bool) *MapReader

Filter keeps rows where pred == true (a thin wrapper around Map).

func Map

func Map(in Reader, fn func(Row) (Row, bool)) *MapReader

func (*MapReader) Close

func (m *MapReader) Close() error

func (*MapReader) Next

func (m *MapReader) Next(ctx context.Context) (*Batch, error)

type Reader

type Reader interface {
	Next(ctx context.Context) (*Batch, error)
	Close() error
}

Reader is a pull-based iterator over Batches. Next returns (nil, io.EOF) when the stream ends.

func SortInMemory

func SortInMemory(reader Reader, less func(Row, Row) bool) Reader

SortInMemory creates a reader that sorts all input rows in memory.

type Row

type Row = map[wkk.RowKey]any

Row represents a single data row as a map of RowKey to any value.

func CopyRow

func CopyRow(in Row) Row

CopyRow creates a deep copy of a row.

func GetPooledRow

func GetPooledRow() Row

GetPooledRow gets a clean Row map from the global pool. The caller is responsible for returning it via ReturnPooledRow when done.

type SliceSource

type SliceSource struct {
	// contains filtered or unexported fields
}

SliceSource is a tiny source for examples/tests. Swap this with your Parquet reader that yields up to batchPool.sz rows on each Next() call.

func NewSliceSource

func NewSliceSource(data []Row) *SliceSource

func (*SliceSource) Close

func (s *SliceSource) Close() error

func (*SliceSource) Next

func (s *SliceSource) Next() (*Batch, error)

Directories

Path Synopsis
binary_codec.go provides a custom binary encoding/decoding for Row data (map[string]any) optimized for memory efficiency and performance.
