Documentation ¶
Overview ¶
Package compute holds the chunk-level Arrow kernels that back Cosma's eager DataFrame operations: expression evaluation, filtering, and (in time) arithmetic, aggregation, and hashing. It depends only on Arrow and the public expression tree — never on the dataframe package — so eager operations call into it without an import cycle.
Index ¶
- func BoxedValues(chunked *arrow.Chunked) ([]any, error)
- func BuildArray(dtype arrow.DataType, vals []any, mem memory.Allocator) (arrow.Array, error)
- func Eval(e expr.ExprNode, rec arrow.Record, mem memory.Allocator) (arrow.Array, error)
- func EvalParallel(ctx context.Context, predicate expr.ExprNode, batches []arrow.Record, ...) ([]arrow.Record, error)
- func FilterRecord(rec arrow.Record, mask arrow.Array, mem memory.Allocator) (arrow.Record, error)
- func Parallelism() int
- func RegisterBinaryKernel(typeID arrow.Type, k BinaryKernel)
- func RegisterUnaryKernel(typeID arrow.Type, k UnaryKernel)
- func SetParallelism(n int)
- func SortIndices(chunked *arrow.Chunked, descending, nullsFirst bool) ([]int64, error)
- func SortIndicesMulti(keys []SortKey) ([]int64, error)
- func Take(chunked *arrow.Chunked, indices []int64, mem memory.Allocator) (arrow.Array, error)
- type Aggregates
- type BinaryKernel
- type OpMetrics
- type SortKey
- type UnaryKernel
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BoxedValues ¶
func BoxedValues(chunked *arrow.Chunked) ([]any, error)
BoxedValues reads a chunked column into one boxed Go value per row, in row order, with nil for nulls. It backs group-key construction where the key type is only known at runtime.
func BuildArray ¶
func BuildArray(dtype arrow.DataType, vals []any, mem memory.Allocator) (arrow.Array, error)
BuildArray materializes a slice of boxed values into an Arrow array of the given type. A nil element becomes a null. It is the inverse of BoxedValues and is used to emit group keys and reduction results.
func Eval ¶
func Eval(e expr.ExprNode, rec arrow.Record, mem memory.Allocator) (arrow.Array, error)
Eval evaluates an expression tree against a single record batch and returns a newly-allocated arrow.Array of length rec.NumRows().
Ownership: the caller owns the returned array and must Release it. rec and its columns are not released. Every intermediate array allocated while recursing into children is released before Eval returns.
func EvalParallel ¶
func EvalParallel(ctx context.Context, predicate expr.ExprNode, batches []arrow.Record, mem memory.Allocator) ([]arrow.Record, error)
EvalParallel evaluates predicate against each record batch in batches and applies FilterRecord to it, fanning the work out over up to workers() goroutines. The output records are returned in the same order as the input batches.
It is the parallel counterpart of the serial loop in dataframe.Filter; the dataframe package calls it when Parallelism() > 1.
Ownership: each returned arrow.Record is newly allocated and owned by the caller. ctx cancellation causes an early return; already-filtered records are released before returning the context error.
func FilterRecord ¶
func FilterRecord(rec arrow.Record, mask arrow.Array, mem memory.Allocator) (arrow.Record, error)
FilterRecord returns a new record holding only the rows for which mask is true. mask must be a boolean array of length rec.NumRows(); a null mask entry is treated as false and drops the row.
Ownership: the caller owns the returned record and must Release it. rec and mask are not released.
func Parallelism ¶
func Parallelism() int
Parallelism returns the configured degree of parallelism. 0 means "GOMAXPROCS at call time."
func RegisterBinaryKernel ¶
func RegisterBinaryKernel(typeID arrow.Type, k BinaryKernel)
RegisterBinaryKernel installs a binary kernel for the given Arrow type ID. A later registration for the same type ID replaces the earlier one.
func RegisterUnaryKernel ¶
func RegisterUnaryKernel(typeID arrow.Type, k UnaryKernel)
RegisterUnaryKernel installs a unary kernel for the given Arrow type ID. A later registration for the same type ID replaces the earlier one.
func SetParallelism ¶
func SetParallelism(n int)
SetParallelism sets the number of goroutines that EvalParallel and GroupReduceParallel will use. A value of 0 (the default) means GOMAXPROCS is queried at each call, so it tracks runtime changes. A value of 1 disables all parallelism (serial execution). Negative values are clamped to 1.
func SortIndices ¶
func SortIndices(chunked *arrow.Chunked, descending, nullsFirst bool) ([]int64, error)
SortIndices returns a stable permutation of row indices that orders chunked by value. By default nulls sort last; set nullsFirst to place them first. The result is a take/gather permutation: reorder every column of a DataFrame by it to keep rows aligned.
func SortIndicesMulti ¶
func SortIndicesMulti(keys []SortKey) ([]int64, error)
SortIndicesMulti returns a stable permutation ordering rows lexicographically by keys: keys[0] is most significant, ties broken by keys[1], and so on. Each key carries its own direction and null placement. All key columns must share the same logical length.
func Take ¶
func Take(chunked *arrow.Chunked, indices []int64, mem memory.Allocator) (arrow.Array, error)
Take reorders a chunked array by row indices, returning a new single-chunk array. indices select logical rows across all chunks of chunked: index i refers to the i-th row in chunk order. A null at a selected row is preserved as a null in the output.
Ownership: the caller owns the returned array and must Release it. chunked is not retained or released.
Take is the shared gather primitive behind Sort (reorder by a permutation) and Join (gather matched rows). A negative index emits a null at that output position, which lets a left or outer join fill unmatched right-side rows with nulls.
Types ¶
type Aggregates ¶
Aggregates holds the reductions for one numeric column, computed in a single pass. Sum, Min and Max are boxed in the column's element type and are nil when Count == 0 (empty or all-null). Mean is valid only when Count > 0.
func GroupReduce ¶
GroupReduce folds a chunked column into per-group Aggregates. groupIDs holds one group index per logical row, in row order across chunks, so it must have length chunked.Len(); numGroups is the number of distinct groups. The result is indexed by group id. It is the per-column half of a two-phase GroupBy: the caller assigns group ids once and reduces every value column against them.
func GroupReduceParallel ¶
func GroupReduceParallel(groupIDs []int, numGroups int, chunked *arrow.Chunked) ([]Aggregates, error)
GroupReduceParallel is the parallel version of GroupReduce. It partitions the flat row range [0, len(groupIDs)) into up to workers() near-equal-sized strips, reduces each strip independently, then merges the partial results.
The two-phase approach is safe because the merge step for each Aggregates field (Sum, Count, Min, Max) is commutative and associative: partial sums add, counts add, and min/max take the extremum.
Output: a []Aggregates of length numGroups in the same format as GroupReduce.
type BinaryKernel ¶
type BinaryKernel func(op expr.BinaryOp, left, right arrow.Array, mem memory.Allocator) (arrow.Array, error)
BinaryKernel evaluates a binary op over two same-length arrays of a custom Arrow type, returning a newly-allocated result the caller owns.
type OpMetrics ¶
type OpMetrics struct {
	// Op is a short human-readable label for the operation (e.g. "filter",
	// "groupby").
	Op string
	// Workers is the number of goroutines that ran.
	Workers int
	// Rows is the number of logical rows processed.
	Rows int64
	// Elapsed is the wall-clock time from start of fan-out to completion of
	// all workers.
	Elapsed time.Duration
}
OpMetrics captures timing and throughput for a single parallel kernel invocation. It is updated by EvalParallel and GroupReduceParallel and can be read via LastMetrics().
func LastMetrics ¶
func LastMetrics() OpMetrics
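LastMetrics returns the OpMetrics recorded by the most recent parallel kernel call in this process. The returned value is a snapshot; concurrent parallel kernel calls may overwrite the stored metrics, so it does not necessarily describe the caller's own invocation.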