Documentation ¶
    Overview ¶
Package pipeline provides a streaming data processing pipeline with memory-safe row ownership. This package addresses memory ownership issues by establishing clear ownership semantics: batches are owned by the Reader that returns them, and consumers must copy data they wish to retain.
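For illustration, a minimal sketch of the copy-before-retain rule, assuming the package is imported as pipeline; the helper name is made up, and the manual map copy stands in for whatever copy helper the package exposes (see CopyRow below):

// keepRow copies a Row into a fresh map so the data stays valid after the
// batch that produced it is reused by its Reader.
func keepRow(row pipeline.Row) pipeline.Row {
	kept := make(pipeline.Row, len(row))
	for k, v := range row {
		kept[k] = v // values that are themselves references are shared, not deep-copied
	}
	return kept
}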
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ReturnBatch ¶
func ReturnBatch(batch *Batch)
ReturnBatch returns a batch to the global pool for reuse. The batch should not be used after calling this function.
func ReturnPooledRow ¶
func ReturnPooledRow(row Row)
ReturnPooledRow returns a Row map to the global pool for reuse. The row is cleared before being pooled.
func ToStringMap ¶
ToStringMap converts a Row to map[string]any for compatibility with legacy interfaces.
Types ¶
type Batch ¶
type Batch struct {
	// contains filtered or unexported fields
}
    Batch is owned by the Reader that returns it. Consumers must not hold references after the next Next() call. If you must retain, copy rows you need (see copyRow / copyBatch).
The Batch reuses underlying Row maps for memory efficiency. Access rows only through the provided methods - never retain references to Row objects returned by Get() as they may be reused. Use CopyRow() if you need to retain data.
func GetBatch ¶
func GetBatch() *Batch
GetBatch returns a reusable batch from the global pool. The batch is clean and ready to use.
func (*Batch) AddRow ¶
AddRow adds a new row to the batch, reusing an existing Row map if available. Returns the Row map that should be populated. The returned Row must not be retained beyond the lifetime of this batch.
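A hedged sketch of how a producer might combine GetBatch and AddRow; AddRow is assumed to take no arguments and return the Row map to fill in, as the description suggests, and the input shape is illustrative:

// emit builds one batch from pre-built rows. Whoever ends up owning the
// batch is responsible for eventually handing it back via ReturnBatch.
func emit(records []pipeline.Row) *pipeline.Batch {
	batch := pipeline.GetBatch()
	for _, rec := range records {
		row := batch.AddRow() // assumed signature: returns the reused Row map to populate
		for k, v := range rec {
			row[k] = v
		}
	}
	return batch
}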
func (*Batch) DeleteRow ¶
DeleteRow marks a row as deleted without losing the underlying map. The map is preserved for future reuse.
func (*Batch) Get ¶
Get returns the row at the given index. The returned Row must not be retained beyond the lifetime of this batch, as it may be reused. Use CopyRow() if you need to retain the data.
func (*Batch) ReplaceRow ¶
ReplaceRow replaces the row at the given index with a new row. The old row is returned to the pool and the new row takes its place. The batch takes ownership of the new row.
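A hedged sketch of swapping in a freshly pooled row; the ReplaceRow signature (index plus new row) is assumed from the description, and the transform itself is illustrative:

// replaceAt builds a replacement row from a pooled map and hands ownership to the batch.
func replaceAt(b *pipeline.Batch, i int, src pipeline.Row) {
	newRow := pipeline.GetPooledRow()
	for k, v := range src {
		newRow[k] = v
	}
	b.ReplaceRow(i, newRow) // assumed signature; the batch now owns newRow and pools the old row
}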
func (*Batch) SwapRows ¶
SwapRows efficiently swaps two rows within a batch without allocations. This is useful for readers that need to rearrange data without copying. Returns false if either index is invalid.
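As an illustration, reversing a batch in place with SwapRows; the Len accessor and a SwapRows(i, j int) bool signature are assumptions based on the description:

// reverse reorders the rows of a batch without copying any Row maps.
func reverse(b *pipeline.Batch) {
	for i, j := 0, b.Len()-1; i < j; i, j = i+1, j-1 { // Len() is assumed to report the row count
		b.SwapRows(i, j)
	}
}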
type BatchPoolStats ¶
BatchPoolStats contains counters for batch pool usage.
func GlobalBatchPoolStats ¶
func GlobalBatchPoolStats() BatchPoolStats
GlobalBatchPoolStats returns usage counters for the global batch pool.
func (BatchPoolStats) LeakedBatches ¶
func (s BatchPoolStats) LeakedBatches() uint64
LeakedBatches returns the number of batches that were taken from the pool but never returned to it.
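These counters make leak checks in tests straightforward. A hedged sketch using only the functions shown above; the testing import and the exercised workload are the assumptions:

func TestNoBatchLeaks(t *testing.T) {
	before := pipeline.GlobalBatchPoolStats().LeakedBatches()

	runPipelineUnderTest(t) // illustrative: whatever workload the test drives

	if leaked := pipeline.GlobalBatchPoolStats().LeakedBatches() - before; leaked != 0 {
		t.Fatalf("%d batches were taken from the pool but never returned", leaked)
	}
}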
type FlatMapReader ¶
type FlatMapReader struct {
	// contains filtered or unexported fields
}
    FlatMapReader: 1→N mapping (including 0 or many)
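The constructor is not shown on this page, but the 1→N shape suggests a mapping function along these lines; the func(Row) []Row shape and the fan-out logic are assumptions for illustration:

// expandN copies one input row into n output rows, e.g. to fan a record out
// to several downstream partitions. The rows come from the global pool, so the
// eventual owner must return them via ReturnPooledRow or hand them to a batch.
func expandN(in pipeline.Row, n int) []pipeline.Row {
	out := make([]pipeline.Row, 0, n)
	for i := 0; i < n; i++ {
		row := pipeline.GetPooledRow()
		for k, v := range in {
			row[k] = v
		}
		out = append(out, row)
	}
	return out
}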
func (*FlatMapReader) Close ¶
func (f *FlatMapReader) Close() error
type InMemorySortReader ¶
type InMemorySortReader struct {
	// contains filtered or unexported fields
}
    InMemorySortReader buffers all rows in memory, sorts them, then returns in order
func (*InMemorySortReader) Close ¶
func (s *InMemorySortReader) Close() error
type MapReader ¶
type MapReader struct {
	// contains filtered or unexported fields
}
    MapReader: 1→1 mapping (optionally drop by returning (nil,false))
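The (nil, false) drop convention suggests a mapping function with the following shape; the exact signature MapReader accepts is not shown here, so treat this as a sketch:

// dropEmpty forwards non-empty rows unchanged and drops empty ones by
// returning (nil, false), matching the convention described above.
func dropEmpty(in pipeline.Row) (pipeline.Row, bool) {
	if len(in) == 0 {
		return nil, false
	}
	return in, true
}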
type Reader ¶
Reader is a pull-based iterator over Batches. Next returns (nil, io.EOF) when the stream ends.
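A hedged sketch of draining a Reader end to end, combining the ownership rules above; the Len accessor and Get(i) signature are assumptions, the io import is required, and keepRow is the copy helper sketched in the Overview:

// drain pulls every batch from r and copies only the rows the caller wants to keep.
func drain(r pipeline.Reader, keep func(pipeline.Row) bool) ([]pipeline.Row, error) {
	var kept []pipeline.Row
	for {
		batch, err := r.Next()
		if err == io.EOF {
			return kept, nil
		}
		if err != nil {
			return kept, err
		}
		for i := 0; i < batch.Len(); i++ { // Len() is assumed to report the row count
			row := batch.Get(i)
			if keep(row) {
				kept = append(kept, keepRow(row)) // copy before the Reader reuses the batch
			}
		}
		// the batch remains owned by r and may be reused on the next Next() call
	}
}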
type Row ¶
Row represents a single data row as a map of RowKey to any value.
func GetPooledRow ¶
func GetPooledRow() Row
GetPooledRow gets a clean Row map from the global pool. The caller is responsible for returning it via ReturnPooledRow when done.
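A hedged sketch of the get/return pairing for pooled rows; the helper name and the scratch computation are illustrative:

// withScratchRow borrows a Row for temporary use and guarantees it goes back to the pool.
func withScratchRow(fn func(pipeline.Row)) {
	row := pipeline.GetPooledRow()
	defer pipeline.ReturnPooledRow(row) // the pool clears the map before reusing it
	fn(row)
}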
type SliceSource ¶
type SliceSource struct {
	// contains filtered or unexported fields
}
    SliceSource is a tiny source for examples/tests. Swap this with your Parquet reader that yields up to batchPool.sz rows each Next().
func NewSliceSource ¶
func NewSliceSource(data []Row) *SliceSource
func (*SliceSource) Close ¶
func (s *SliceSource) Close() error
func (*SliceSource) Next ¶
func (s *SliceSource) Next() (*Batch, error)
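A small end-to-end usage sketch with the example source, using only functions whose signatures appear above; the io import is assumed:

// drainSource exhausts a SliceSource and reports how many batches it produced.
func drainSource(data []pipeline.Row) (int, error) {
	src := pipeline.NewSliceSource(data)
	defer src.Close()

	batches := 0
	for {
		batch, err := src.Next()
		if err == io.EOF {
			return batches, nil
		}
		if err != nil {
			return batches, err
		}
		_ = batch // rows are only valid until the next call to src.Next()
		batches++
	}
}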
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
|  | binary_codec.go provides a custom binary encoding/decoding for Row data (map[string]any) optimized for memory efficiency and performance. |