engine

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Mar 28, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Filter

func Filter(rec arrow.Record, filter arrow.Array) (arrow.Record, error)

Filter returns a new record containing only the rows for which the corresponding value in the boolean filter array is true.

func Select

func Select(rec arrow.Record, cols ...string) arrow.Record

Select returns a new record with a subset of columns.

Types

type Aggregation

type Aggregation string

Aggregation is an enumeration of aggregation functions.

const (
	// Sum is the sum aggregation.
	Sum Aggregation = "SUM"
	// Mean is the mean aggregation.
	Mean Aggregation = "MEAN"
	// Count is the count aggregation.
	Count Aggregation = "COUNT"
)
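
The constants above map naturally onto reduction functions over a column. A minimal, stdlib-only sketch of how such a dispatch could look (the `aggregate` helper is hypothetical, not part of this package, and uses a plain []float64 in place of an Arrow column):

```go
package main

import "fmt"

// Aggregation mirrors the package's enumeration of aggregation functions.
type Aggregation string

const (
	Sum   Aggregation = "SUM"
	Mean  Aggregation = "MEAN"
	Count Aggregation = "COUNT"
)

// aggregate applies the named reduction to a column of float64 values.
// Hypothetical helper for illustration only.
func aggregate(agg Aggregation, col []float64) (float64, error) {
	switch agg {
	case Sum, Mean:
		var s float64
		for _, v := range col {
			s += v
		}
		if agg == Mean {
			if len(col) == 0 {
				return 0, fmt.Errorf("mean of empty column")
			}
			s /= float64(len(col))
		}
		return s, nil
	case Count:
		return float64(len(col)), nil
	default:
		return 0, fmt.Errorf("unknown aggregation %q", agg)
	}
}

func main() {
	col := []float64{1, 2, 3, 4}
	for _, agg := range []Aggregation{Sum, Mean, Count} {
		v, _ := aggregate(agg, col)
		fmt.Printf("%s = %v\n", agg, v)
	}
}
```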

type AlignedAllocator

type AlignedAllocator struct {
	// contains filtered or unexported fields
}

AlignedAllocator is a memory allocator that ensures memory is aligned to a specific boundary. This is crucial for performance with certain hardware instructions (e.g., SIMD).

func NewAlignedAllocator

func NewAlignedAllocator(pool memory.Allocator, alignment int) *AlignedAllocator

NewAlignedAllocator creates a new AlignedAllocator.

func (*AlignedAllocator) Allocate

func (a *AlignedAllocator) Allocate(size int) []byte

Allocate allocates a slice of memory aligned to the allocator's configured boundary.

func (*AlignedAllocator) Free

func (a *AlignedAllocator) Free(b []byte)

Free frees a slice of memory previously returned by Allocate.

func (*AlignedAllocator) Reallocate

func (a *AlignedAllocator) Reallocate(size int, b []byte) []byte

Reallocate resizes a previously allocated slice to size bytes, preserving its contents and alignment.

type DataType

type DataType interface {
	ID() arrow.Type
}

DataType is an interface for Arrow-like columnar data types.

type Hook

type Hook interface {
	// Execute is called at a specific point in the engine's execution.
	Execute(context.Context, interface{}) error
}

Hook is an interface for extending the engine's functionality.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline represents a series of processing stages.

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline creates a new pipeline.

func (*Pipeline) AddStage

func (p *Pipeline) AddStage(s Stage)

AddStage adds a processing stage to the pipeline.

func (*Pipeline) Execute

func (p *Pipeline) Execute(ctx context.Context, in <-chan arrow.Record) <-chan arrow.Record

Execute runs the pipeline.

type Predicate

type Predicate interface {
	// Eval evaluates the predicate on the given record and returns a boolean array
	// indicating which rows match the condition.
	Eval(rec arrow.Record) (arrow.Array, error)
}

Predicate is an interface for evaluating a filter condition on a record.

func NewPredicate

func NewPredicate(expression string) (Predicate, error)

NewPredicate creates a new predicate from a filter expression string. It supports simple binary expressions and AND (&&) / OR (||) operators.
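
To illustrate the expression shape NewPredicate accepts (e.g. `age > 30 && score < 9`), here is a stdlib-only sketch that evaluates such an expression against a single row of named float64 values. The package itself evaluates predicates against whole Arrow records and returns a boolean array, so this is only a conceptual model with hypothetical helper names:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// evalSimple evaluates one binary comparison like "age > 30" against a row.
func evalSimple(expr string, row map[string]float64) (bool, error) {
	// Check two-character operators first so ">" does not shadow ">=".
	for _, op := range []string{">=", "<=", "==", "!=", ">", "<"} {
		i := strings.Index(expr, op)
		if i < 0 {
			continue
		}
		col := strings.TrimSpace(expr[:i])
		lit, err := strconv.ParseFloat(strings.TrimSpace(expr[i+len(op):]), 64)
		if err != nil {
			return false, err
		}
		v, ok := row[col]
		if !ok {
			return false, fmt.Errorf("unknown column %q", col)
		}
		switch op {
		case ">":
			return v > lit, nil
		case "<":
			return v < lit, nil
		case ">=":
			return v >= lit, nil
		case "<=":
			return v <= lit, nil
		case "==":
			return v == lit, nil
		default: // "!="
			return v != lit, nil
		}
	}
	return false, fmt.Errorf("no operator in %q", expr)
}

// evalExpr adds the OR (||) and AND (&&) combinators over simple comparisons.
func evalExpr(expr string, row map[string]float64) (bool, error) {
	if parts := strings.Split(expr, "||"); len(parts) > 1 {
		for _, p := range parts {
			if ok, err := evalExpr(p, row); err != nil || ok {
				return ok, err
			}
		}
		return false, nil
	}
	if parts := strings.Split(expr, "&&"); len(parts) > 1 {
		for _, p := range parts {
			if ok, err := evalSimple(strings.TrimSpace(p), row); err != nil || !ok {
				return ok, err
			}
		}
		return true, nil
	}
	return evalSimple(strings.TrimSpace(expr), row)
}

func main() {
	row := map[string]float64{"age": 42, "score": 7}
	ok, _ := evalExpr("age > 30 && score < 9", row)
	fmt.Println(ok)
}
```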

type Reader

type Reader interface {
	Read() (arrow.Record, error)
	Release()
}

Reader is the interface for reading columnar data.

type Record

type Record struct {
	arrow.Record
}

Record represents a chunk of columnar data. A stream of Records forms a chunked, streaming data layout.

func NewRecord

func NewRecord(rec arrow.Record) *Record

NewRecord creates a new Record from an arrow.Record.

func (*Record) Release

func (r *Record) Release()

Release releases the memory held by the Record.

type Stage

type Stage func(ctx context.Context, in <-chan arrow.Record) <-chan arrow.Record

Stage is a function that processes a stream of records.

type Stream

type Stream interface {
	Reader
	Writer
}

Stream is the interface for streaming columnar data.

type StreamingPipeline

type StreamingPipeline struct {
	// contains filtered or unexported fields
}

StreamingPipeline represents a streaming pipeline.

func NewStreamingPipeline

func NewStreamingPipeline(in <-chan arrow.Record) *StreamingPipeline

NewStreamingPipeline creates a new streaming pipeline.

func (*StreamingPipeline) Aggregate

func (sp *StreamingPipeline) Aggregate(agg Aggregation, column string) *StreamingPipeline

Aggregate adds an aggregation stage to the pipeline.

func (*StreamingPipeline) Execute

func (sp *StreamingPipeline) Execute(ctx context.Context) <-chan arrow.Record

Execute runs the streaming pipeline.

func (*StreamingPipeline) Filter

func (sp *StreamingPipeline) Filter(expression string) *StreamingPipeline

Filter adds a filter stage to the pipeline.

func (*StreamingPipeline) Project

func (sp *StreamingPipeline) Project(columns []string) *StreamingPipeline

Project adds a projection stage to the pipeline.

type Writer

type Writer interface {
	Write(arrow.Record) error
	Close() error
}

Writer is the interface for writing columnar data.
