Versions in this module Expand all Collapse all v0 v0.1.1 Sep 20, 2025 v0.1.0 Sep 4, 2025 Changes in this version + func DeepCopyAny(val any) any + func DeepEqual(a, b Document) (bool, error) + type AddOp struct + func NewAdd() *AddOp + func (n *AddOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type AggregateInput struct + Key any + Values []any + type BaseOp struct + func NewBaseOp(name string, arity int) BaseOp + func (n *BaseOp) Arity() int + type BinaryJoinOp struct + func NewBinaryJoin(eval Evaluator, inputs []string) *BinaryJoinOp + func (n *BinaryJoinOp) HasZeroPreservationProperty() bool + func (n *BinaryJoinOp) IsTimeInvariant() bool + func (n *BinaryJoinOp) OpType() OperatorType + func (n *BinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type ChainGraph struct + func NewChainGraph() *ChainGraph + func (g *ChainGraph) AddInput(op *InputOp) string + func (g *ChainGraph) AddToChain(op Operator) string + func (g *ChainGraph) Arity() int + func (g *ChainGraph) GetChain() []Operator + func (g *ChainGraph) GetInput(name string) *InputOp + func (g *ChainGraph) GetStartNode() string + func (g *ChainGraph) SetJoin(op Operator) string + func (g *ChainGraph) String() string + func (g *ChainGraph) Validate() error + type ConstantOp struct + func NewConstant(value *DocumentZSet, name string) *ConstantOp + func (n *ConstantOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type DelayOp struct + func NewDelay() *DelayOp + func (n *DelayOp) HasZeroPreservationProperty() bool + func (n *DelayOp) IsTimeInvariant() bool + func (n *DelayOp) OpType() OperatorType + func (n *DelayOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + func (n *DelayOp) Reset() + type DeltaZSet = map[string]*DocumentZSet + type DifferentiatorOp struct + func NewDifferentiator() *DifferentiatorOp + func (n *DifferentiatorOp) HasZeroPreservationProperty() bool + func (n *DifferentiatorOp) IsTimeInvariant() bool + func (n *DifferentiatorOp) OpType() OperatorType + func (n *DifferentiatorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + func (n *DifferentiatorOp) Reset() + type DistinctOp struct + func NewDistinct() *DistinctOp + func (n *DistinctOp) HasZeroPreservationProperty() bool + func (n *DistinctOp) IsTimeInvariant() bool + func (n *DistinctOp) OpType() OperatorType + func (n *DistinctOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type DistinctOptimizationRule struct + func (r *DistinctOptimizationRule) Apply(graph *ChainGraph) error + func (r *DistinctOptimizationRule) CanApply(graph *ChainGraph) bool + func (r *DistinctOptimizationRule) Name() string + type Document = map[string]any + func DeepCopyDocument(val any) Document + func NewDocument() Document + type DocumentEntry struct + Document Document + Multiplicity int + type DocumentZSet struct + func FromDocuments(docs []Document) (*DocumentZSet, error) + func NewDocumentZSet() *DocumentZSet + func SingletonZSet(doc Document) (*DocumentZSet, error) + func (dz *DocumentZSet) Add(other *DocumentZSet) (*DocumentZSet, error) + func (dz *DocumentZSet) AddDocument(doc Document, count int) (*DocumentZSet, error) + func (dz *DocumentZSet) AddDocumentMutate(doc Document, count int) error + func (dz *DocumentZSet) Contains(doc Document) (bool, error) + func (dz *DocumentZSet) DeepCopy() *DocumentZSet + func (dz *DocumentZSet) Distinct() (*DocumentZSet, error) + func (dz *DocumentZSet) GetDocuments() ([]Document, error) + func (dz *DocumentZSet) GetMultiplicity(doc Document) (int, error) + func (dz *DocumentZSet) GetUniqueDocuments() ([]Document, error) + func (dz *DocumentZSet) IsZero() bool + func (dz *DocumentZSet) List() ([]DocumentEntry, error) + func (dz *DocumentZSet) ShallowCopy() *DocumentZSet + func (dz *DocumentZSet) Size() int + func (dz *DocumentZSet) String() string + func (dz *DocumentZSet) Subtract(other *DocumentZSet) (*DocumentZSet, error) + func (dz *DocumentZSet) TotalSize() int + func (dz *DocumentZSet) Unique() (*DocumentZSet, error) + func (dz *DocumentZSet) UniqueCount() int + type Evaluator interface + Evaluate func(Document) ([]Document, error) + type Executor struct + func NewExecutor(graph *ChainGraph, log logr.Logger) (*Executor, error) + func (e *Executor) GetExecutionPlan() string + func (e *Executor) GetNodeResult(nodeID string, deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error) + func (e *Executor) ProcessDelta(deltaInputs DeltaZSet) (*DocumentZSet, error) + func (e *Executor) Reset() + type Extractor interface + Extract func(Document) (any, error) + type FusedOp struct + func NewFusedOp(nodes []Operator, name string) (*FusedOp, error) + func (n *FusedOp) HasZeroPreservationProperty() bool + func (n *FusedOp) IsTimeInvariant() bool + func (n *FusedOp) OpType() OperatorType + func (n *FusedOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type GatherOp struct + func NewGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *GatherOp + func (op *GatherOp) HasZeroPreservationProperty() bool + func (op *GatherOp) IsTimeInvariant() bool + func (op *GatherOp) OpType() OperatorType + func (op *GatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type GraphNode struct + ID string + Inputs []*GraphNode + Op Operator + Output *GraphNode + type GroupData struct + Document Document + Key any + Values []any + type IncrementalBinaryJoinOp struct + func NewIncrementalBinaryJoin(eval Evaluator, inputs []string) *IncrementalBinaryJoinOp + func (op *IncrementalBinaryJoinOp) HasZeroPreservationProperty() bool + func (op *IncrementalBinaryJoinOp) IsTimeInvariant() bool + func (op *IncrementalBinaryJoinOp) OpType() OperatorType + func (op *IncrementalBinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + func (op *IncrementalBinaryJoinOp) Reset() + type IncrementalExecutionContext struct + func NewIncrementalExecutionContext(executor *Executor) *IncrementalExecutionContext + func (ctx *IncrementalExecutionContext) GetCumulativeOutput() *DocumentZSet + func (ctx *IncrementalExecutionContext) ProcessDelta(deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error) + func (ctx *IncrementalExecutionContext) Reset() + type IncrementalGatherOp struct + func NewIncrementalGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *IncrementalGatherOp + func (op *IncrementalGatherOp) HasZeroPreservationProperty() bool + func (op *IncrementalGatherOp) IsTimeInvariant() bool + func (op *IncrementalGatherOp) OpType() OperatorType + func (op *IncrementalGatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + func (op *IncrementalGatherOp) Reset() + type IncrementalJoinOp struct + func NewIncrementalJoin(eval Evaluator, inputs []string) *IncrementalJoinOp + func (op *IncrementalJoinOp) HasZeroPreservationProperty() bool + func (op *IncrementalJoinOp) IsTimeInvariant() bool + func (op *IncrementalJoinOp) OpType() OperatorType + func (op *IncrementalJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type InputOp struct + func NewInput(name string) *InputOp + func (n *InputOp) HasZeroPreservationProperty() bool + func (n *InputOp) IsTimeInvariant() bool + func (n *InputOp) Name() string + func (n *InputOp) OpType() OperatorType + func (n *InputOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + func (n *InputOp) SetData(data *DocumentZSet) + type IntegrationDifferentiationEliminationRule struct + func (r *IntegrationDifferentiationEliminationRule) Apply(graph *ChainGraph) error + func (r *IntegrationDifferentiationEliminationRule) CanApply(graph *ChainGraph) bool + func (r *IntegrationDifferentiationEliminationRule) Name() string + type IntegratorOp struct + func NewIntegrator() *IntegratorOp + func (n *IntegratorOp) HasZeroPreservationProperty() bool + func (n *IntegratorOp) IsTimeInvariant() bool + func (n *IntegratorOp) OpType() OperatorType + func (n *IntegratorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + func (n *IntegratorOp) Reset() + type JoinIncrementalizationRule struct + func (r *JoinIncrementalizationRule) Apply(graph *ChainGraph) error + func (r *JoinIncrementalizationRule) CanApply(graph *ChainGraph) bool + func (r *JoinIncrementalizationRule) Name() string + type JoinOp struct + func NewJoin(eval Evaluator, inputs []string) *JoinOp + func (op *JoinOp) HasZeroPreservationProperty() bool + func (op *JoinOp) IsTimeInvariant() bool + func (op *JoinOp) OpType() OperatorType + func (op *JoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type LinearChainIncrementalizationRule struct + func (r *LinearChainIncrementalizationRule) Apply(graph *ChainGraph) error + func (r *LinearChainIncrementalizationRule) CanApply(graph *ChainGraph) bool + func (r *LinearChainIncrementalizationRule) Name() string + type LinearChainRewriteEngine struct + func NewLinearChainRewriteEngine() *LinearChainRewriteEngine + func (re *LinearChainRewriteEngine) AddRule(rule LinearChainRule) + func (re *LinearChainRewriteEngine) Optimize(graph *ChainGraph) error + type LinearChainRule interface + Apply func(graph *ChainGraph) error + CanApply func(graph *ChainGraph) bool + Name func() string + type LinearOperatorFusionRule struct + func (r *LinearOperatorFusionRule) Apply(graph *ChainGraph) error + func (r *LinearOperatorFusionRule) CanApply(graph *ChainGraph) bool + func (r *LinearOperatorFusionRule) Name() string + type NegateOp struct + func NewNegate() *NegateOp + func (n *NegateOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type Operator interface + Arity func() int + HasZeroPreservationProperty func() bool + IsTimeInvariant func() bool + OpType func() OperatorType + Process func(inputs ...*DocumentZSet) (*DocumentZSet, error) + func FuseFilterProject(filter *SelectionOp, project *ProjectionOp) (Operator, error) + func IncrementalizeOp(in Operator) (Operator, bool) + type OperatorType int + const OpTypeBilinear + const OpTypeLinear + const OpTypeNonLinear + const OpTypeStructural + type ProjectThenSelectOp struct + func NewProjectThenSelect(projEval, selEval Evaluator) *ProjectThenSelectOp + func (op *ProjectThenSelectOp) HasZeroPreservationProperty() bool + func (op *ProjectThenSelectOp) IsTimeInvariant() bool + func (op *ProjectThenSelectOp) OpType() OperatorType + func (op *ProjectThenSelectOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type ProjectionOp struct + func NewProjection(eval Evaluator) *ProjectionOp + func (n *ProjectionOp) HasZeroPreservationProperty() bool + func (n *ProjectionOp) IsTimeInvariant() bool + func (n *ProjectionOp) OpType() OperatorType + func (n *ProjectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type SelectThenProjectionsOp struct + func NewSelectThenProjections(selEval Evaluator, projEvals []Evaluator) *SelectThenProjectionsOp + func (op *SelectThenProjectionsOp) HasZeroPreservationProperty() bool + func (op *SelectThenProjectionsOp) IsTimeInvariant() bool + func (op *SelectThenProjectionsOp) OpType() OperatorType + func (op *SelectThenProjectionsOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type SelectionOp struct + func NewSelection(eval Evaluator) *SelectionOp + func (n *SelectionOp) HasZeroPreservationProperty() bool + func (n *SelectionOp) IsTimeInvariant() bool + func (n *SelectionOp) OpType() OperatorType + func (n *SelectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type SubtractOp struct + func NewSubtract() *SubtractOp + func (n *SubtractOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type Transformer interface + Transform func(Document, any) (Document, error) + type UnwindOp struct + func NewUnwind(arrayExtractor Extractor, transformer Transformer) *UnwindOp + func (op *UnwindOp) HasZeroPreservationProperty() bool + func (op *UnwindOp) IsTimeInvariant() bool + func (op *UnwindOp) OpType() OperatorType + func (op *UnwindOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error) + type ZSetError struct + Cause error + Message string + func (e *ZSetError) Error() string