processing

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 27, 2026 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Overview

Package processing provides the single dynamic processing engine for Pulse. It implements aggregators, attributes, filterers, and groupers that operate on record iterators backed by .pulse encoded data.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator interface {
	// Aggregate computes the aggregation over the given records for the named field.
	Aggregate(records []*Record, field string) (float64, error)
}

Aggregator computes a single aggregate value from a set of records.
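
As an illustration of the interface contract, the following is a minimal sketch of a summing aggregator. Because the module's import path is not shown on this page, `Record` and `Aggregator` are local stand-ins that mirror only the documented signatures; `sumAggregator` and the choice to skip null or missing fields are assumptions, not the package's own behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Record is a hypothetical stand-in for processing.Record. Only the
// documented NumericValue accessor is mirrored here.
type Record struct {
	values map[string]float64
}

// NumericValue returns the value and true when the field is present,
// or 0 and false otherwise, matching the documented contract.
func (r *Record) NumericValue(name string) (float64, bool) {
	v, ok := r.values[name]
	return v, ok
}

// Aggregator mirrors the documented interface.
type Aggregator interface {
	Aggregate(records []*Record, field string) (float64, error)
}

// sumAggregator is a hypothetical implementation that adds up one field.
// Assumption: records where the field is null or missing are skipped.
type sumAggregator struct{}

func (sumAggregator) Aggregate(records []*Record, field string) (float64, error) {
	if field == "" {
		return 0, errors.New("sum: empty field name")
	}
	var total float64
	for _, r := range records {
		if v, ok := r.NumericValue(field); ok {
			total += v
		}
	}
	return total, nil
}

func main() {
	records := []*Record{
		{values: map[string]float64{"latency": 12}},
		{values: map[string]float64{"latency": 30}},
		{values: map[string]float64{"other": 5}}, // no "latency" field
	}
	var agg Aggregator = sumAggregator{}
	total, err := agg.Aggregate(records, "latency")
	fmt.Println(total, err)
}
```

Returning an error for an empty field name is one possible design; a real implementation would more likely validate the field against the schema when the factory constructs the aggregator.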

type AggregatorFactory

type AggregatorFactory func(agg *types.Aggregation, schema *encoding.Schema) (Aggregator, error)

AggregatorFactory creates an Aggregator from an aggregation specification and the schema it applies to.

type AttributeComputer

type AttributeComputer interface {
	// Compute calculates derived values for all records, returning a value per record.
	// The returned slice has one entry per record; the second return value reports any error.
	Compute(records []*Record, field string) ([]float64, error)
}

AttributeComputer computes derived attribute values for each record.
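
The one-value-per-record contract can be sketched as follows. `Record` and `AttributeComputer` are local stand-ins for the documented types, and `meanDeviation` is a hypothetical computer invented for this example. Marking null or missing inputs with NaN is an assumption; the page does not say how the package represents them in the output.

```go
package main

import (
	"fmt"
	"math"
)

// Record is a hypothetical stand-in for processing.Record.
type Record struct {
	values map[string]float64
}

// NumericValue mirrors the documented accessor.
func (r *Record) NumericValue(name string) (float64, bool) {
	v, ok := r.values[name]
	return v, ok
}

// AttributeComputer mirrors the documented interface.
type AttributeComputer interface {
	Compute(records []*Record, field string) ([]float64, error)
}

// meanDeviation is a hypothetical computer: each output entry is the
// record's value minus the mean of all present values for the field.
type meanDeviation struct{}

func (meanDeviation) Compute(records []*Record, field string) ([]float64, error) {
	var sum float64
	var n int
	for _, r := range records {
		if v, ok := r.NumericValue(field); ok {
			sum += v
			n++
		}
	}
	if n == 0 {
		return nil, fmt.Errorf("no values for field %q", field)
	}
	mean := sum / float64(n)

	// The output always has exactly one entry per input record.
	out := make([]float64, len(records))
	for i, r := range records {
		if v, ok := r.NumericValue(field); ok {
			out[i] = v - mean
		} else {
			out[i] = math.NaN() // assumption: NaN marks null or missing input
		}
	}
	return out, nil
}

func main() {
	records := []*Record{
		{values: map[string]float64{"score": 1}},
		{values: map[string]float64{"score": 3}},
		{values: map[string]float64{}},
	}
	var c AttributeComputer = meanDeviation{}
	out, err := c.Compute(records, "score")
	fmt.Println(len(out), err)
}
```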

type AttributeFactory

type AttributeFactory func(attr *types.Attribute, schema *encoding.Schema) (AttributeComputer, error)

AttributeFactory creates an AttributeComputer from an attribute specification and the schema it applies to.

type FilterFunc

type FilterFunc func(record *Record) (bool, error)

FilterFunc evaluates whether a record passes a filter.
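
A FilterFunc is a plain function value, so one natural way to build one is a closure that captures its parameters. The sketch below assumes local stand-ins for `Record` and `FilterFunc`; the helpers `greaterThan` and `keepMatching` are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Record is a hypothetical stand-in for processing.Record.
type Record struct {
	values map[string]float64
}

// NumericValue mirrors the documented accessor.
func (r *Record) NumericValue(name string) (float64, bool) {
	v, ok := r.values[name]
	return v, ok
}

// FilterFunc mirrors the documented function type.
type FilterFunc func(record *Record) (bool, error)

// greaterThan is a hypothetical helper that returns a FilterFunc passing
// records whose field is present and strictly above the threshold.
func greaterThan(field string, threshold float64) FilterFunc {
	return func(record *Record) (bool, error) {
		if record == nil {
			return false, errors.New("filter: nil record")
		}
		v, ok := record.NumericValue(field)
		return ok && v > threshold, nil
	}
}

// keepMatching applies a filter to every record and stops at the first error.
func keepMatching(records []*Record, keep FilterFunc) ([]*Record, error) {
	var out []*Record
	for _, r := range records {
		pass, err := keep(r)
		if err != nil {
			return nil, err
		}
		if pass {
			out = append(out, r)
		}
	}
	return out, nil
}

func main() {
	records := []*Record{
		{values: map[string]float64{"latency": 5}},
		{values: map[string]float64{"latency": 15}},
	}
	slow, err := keepMatching(records, greaterThan("latency", 10))
	fmt.Println(len(slow), err)
}
```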

type FiltererBuilder

type FiltererBuilder interface {
	// Build creates a filter function from the filter specification.
	Build(filter *types.Filterer, schema *encoding.Schema) (FilterFunc, error)
}

FiltererBuilder constructs a FilterFunc from a filter specification.

type FiltererFactory

type FiltererFactory func() FiltererBuilder

FiltererFactory creates a FiltererBuilder. Unlike the other factories, it takes no arguments; the filter specification and schema are passed to Build instead.

type Grouper

type Grouper interface {
	// Group partitions the records by the specified field, returning a map of group key to records.
	Group(records []*Record, field string) (map[string][]*Record, error)
}

Grouper partitions records into named groups.
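
The shape of the returned map can be illustrated with a sketch that partitions records by a categorical field. `Record` and `Grouper` here are local stand-ins, and `byCategory` is a hypothetical implementation. Skipping records that have no string value for the field is an assumption; the package may handle that case differently.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for processing.Record that holds
// already-resolved categorical labels.
type Record struct {
	labels map[string]string
}

// StringValue mirrors the documented accessor: the label and true for a
// categorical field, or the empty string and false otherwise.
func (r *Record) StringValue(name string) (string, bool) {
	s, ok := r.labels[name]
	return s, ok
}

// Grouper mirrors the documented interface.
type Grouper interface {
	Group(records []*Record, field string) (map[string][]*Record, error)
}

// byCategory is a hypothetical Grouper keyed on a field's string value.
type byCategory struct{}

func (byCategory) Group(records []*Record, field string) (map[string][]*Record, error) {
	groups := make(map[string][]*Record)
	for _, r := range records {
		key, ok := r.StringValue(field)
		if !ok {
			continue // assumption: records without the field join no group
		}
		groups[key] = append(groups[key], r)
	}
	return groups, nil
}

func main() {
	records := []*Record{
		{labels: map[string]string{"region": "eu"}},
		{labels: map[string]string{"region": "us"}},
		{labels: map[string]string{"region": "eu"}},
	}
	var g Grouper = byCategory{}
	groups, err := g.Group(records, "region")
	fmt.Println(len(groups["eu"]), len(groups["us"]), err)
}
```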

type GrouperFactory

type GrouperFactory func(grp *types.Group, schema *encoding.Schema) (Grouper, error)

GrouperFactory creates a Grouper from a group specification and the schema it applies to.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is the single dynamic processing engine for Pulse. It handles filtering, attribute computation, grouping, and aggregation over record iterators backed by .pulse encoded data.

func NewProcessor

func NewProcessor(schema *encoding.Schema) *Processor

NewProcessor creates a new Processor for the given schema.

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, req *types.Request, iter RecordIterator) (*types.Response, error)

Process executes a single request against the record iterator.

func (*Processor) ProcessComposed

func (p *Processor) ProcessComposed(ctx context.Context, composed *types.ComposedRequest, records []*Record) ([]*types.Response, error)

ProcessComposed executes multiple requests against a shared record set.

type Record

type Record struct {
	// contains filtered or unexported fields
}

Record represents a single data row with field accessors. It provides both numeric and string access for processing operations.

func NewRecord

func NewRecord(schema *encoding.Schema, values map[string]float64) *Record

NewRecord creates a record with the given schema and field values.

func NewRecordWithNulls

func NewRecordWithNulls(schema *encoding.Schema, values map[string]float64, nulls map[string]bool) *Record

NewRecordWithNulls creates a record with explicit null tracking.

func (*Record) AllValues

func (r *Record) AllValues() map[string]any

AllValues returns all field values as a map (for expression evaluation).

func (*Record) NumericValue

func (r *Record) NumericValue(name string) (float64, bool)

NumericValue returns the numeric value for the named field. It returns the value and true if the field is present and non-null, or 0 and false if the field is null or missing.
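
The boolean result matters because a stored zero and a null both yield 0 as the value. The sketch below models that contract with a local stand-in `Record`; the constructor `newRecordWithNulls` (which omits the schema argument) and the helper `valueOr` are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for processing.Record with explicit
// null tracking, loosely modeled on NewRecordWithNulls.
type Record struct {
	values map[string]float64
	nulls  map[string]bool
}

// newRecordWithNulls is a hypothetical constructor; the real one also
// takes a schema.
func newRecordWithNulls(values map[string]float64, nulls map[string]bool) *Record {
	return &Record{values: values, nulls: nulls}
}

// NumericValue follows the documented contract: value and true if the
// field is present and non-null, otherwise 0 and false.
func (r *Record) NumericValue(name string) (float64, bool) {
	if r.nulls[name] {
		return 0, false
	}
	v, ok := r.values[name]
	return v, ok
}

// valueOr is a hypothetical helper that substitutes a fallback when the
// field is null or missing.
func valueOr(r *Record, name string, fallback float64) float64 {
	if v, ok := r.NumericValue(name); ok {
		return v
	}
	return fallback
}

func main() {
	r := newRecordWithNulls(
		map[string]float64{"retries": 0, "latency": 7},
		map[string]bool{"latency": true}, // latency is stored but flagged null
	)
	for _, name := range []string{"retries", "latency", "absent"} {
		v, ok := r.NumericValue(name)
		fmt.Println(name, v, ok)
	}
	fmt.Println(valueOr(r, "latency", -1))
}
```

Checking the second result instead of comparing the value against 0 keeps a genuine zero from being treated as missing data.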

func (*Record) Schema

func (r *Record) Schema() *encoding.Schema

Schema returns the record's schema.

func (*Record) StringValue

func (r *Record) StringValue(name string) (string, bool)

StringValue returns the resolved string value for categorical fields. For non-categorical fields, it returns the empty string and false.

type RecordIterator

type RecordIterator interface {
	// Next advances to the next record. Returns false when exhausted.
	Next() bool
	// Record returns the current record. Only valid after Next returns true.
	Record() *Record
	// Reset resets the iterator to the beginning.
	Reset()
}

RecordIterator provides sequential access to records.
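
The usual consumption pattern for this interface is a `for iter.Next()` loop, with Reset allowing a second pass. The sketch below uses local stand-ins for `Record` and `RecordIterator`; `sliceIter` is a hypothetical slice-backed iterator written for this example and is not the package's SliceIterator.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for processing.Record.
type Record struct {
	id int
}

// RecordIterator mirrors the documented interface.
type RecordIterator interface {
	Next() bool
	Record() *Record
	Reset()
}

// sliceIter is a hypothetical slice-backed iterator. pos is the index of
// the current record and is -1 before the first call to Next.
type sliceIter struct {
	records []*Record
	pos     int
}

func newSliceIter(records []*Record) *sliceIter {
	return &sliceIter{records: records, pos: -1}
}

// Next advances to the next record and returns false when exhausted.
func (it *sliceIter) Next() bool {
	if it.pos+1 >= len(it.records) {
		return false
	}
	it.pos++
	return true
}

// Record returns the current record, or nil if Next has not yet
// returned true.
func (it *sliceIter) Record() *Record {
	if it.pos < 0 || it.pos >= len(it.records) {
		return nil
	}
	return it.records[it.pos]
}

// Reset rewinds the iterator to before the first record.
func (it *sliceIter) Reset() { it.pos = -1 }

// collectIDs drains the iterator from its current position.
func collectIDs(iter RecordIterator) []int {
	var ids []int
	for iter.Next() {
		ids = append(ids, iter.Record().id)
	}
	return ids
}

func main() {
	it := newSliceIter([]*Record{{id: 1}, {id: 2}, {id: 3}})
	fmt.Println(collectIDs(it)) // first pass
	it.Reset()
	fmt.Println(collectIDs(it)) // second pass after Reset
}
```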

type SliceIterator

type SliceIterator struct {
	// contains filtered or unexported fields
}

SliceIterator implements RecordIterator over a slice of records.

func NewSliceIterator

func NewSliceIterator(records []*Record) *SliceIterator

NewSliceIterator creates an iterator over the given records.

func (*SliceIterator) Next

func (it *SliceIterator) Next() bool

Next advances to the next record and returns false when the records are exhausted.

func (*SliceIterator) Record

func (it *SliceIterator) Record() *Record

Record returns the current record.

func (*SliceIterator) Reset

func (it *SliceIterator) Reset()

Reset resets the iterator to the beginning.
