Documentation ¶
Overview ¶
Package dbsp (partially) implements Database Stream Processing (DBSP) operators for incremental computation on Z-sets (multisets with integer multiplicities). See detailed documentation in https://mihaibudiu.github.io/work/dbsp-spec.pdf.
DBSP provides the theoretical foundation for Δ-controller's incremental view maintenance. It represents data as Z-sets where each document has an associated multiplicity, enabling efficient incremental processing of insertions, updates, and deletions.
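To make the multiplicity idea concrete, the following is a minimal, self-contained sketch that models a Z-set as a map from a document key to an integer weight. The `zset` type and the `add` helper are illustrative stand-ins, not the package's `DocumentZSet`, and the sketch assumes an update is expressed as a deletion (weight -1) of the old document plus an insertion (weight +1) of the new one.

```go
package main

import "fmt"

// zset is a toy Z-set: document key -> integer multiplicity.
// It is a hypothetical stand-in for DocumentZSet, used only for illustration.
type zset map[string]int

// add returns the pointwise sum of two Z-sets, dropping zero-weight entries.
func add(a, b zset) zset {
	out := zset{}
	for k, w := range a {
		out[k] += w
	}
	for k, w := range b {
		out[k] += w
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

func main() {
	state := zset{"pod-a:v1": 1, "pod-b:v1": 1}
	// Update pod-a (delete the old version, insert the new one) and delete pod-b.
	delta := zset{"pod-a:v1": -1, "pod-a:v2": 1, "pod-b:v1": -1}
	fmt.Println(add(state, delta)) // map[pod-a:v2:1]
}
```

Applying a change is just Z-set addition, which is why insertions, updates, and deletions can all travel through the same operators.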
Key components:
- DocumentZSet: Core Z-set implementation for document collections.
- Operator: Interface for DBSP operators (linear, bilinear, nonlinear ops).
- Executor: Orchestrates operator chains and manages incremental computation.
- ChainGraph: Represents computation graphs with optimization support.
- LinearChainRewriteEngine: Performs operator fusion and optimization.
Operator types:
- Linear: Selection, projection, aggregation (preserve zero, commute with addition).
- Bilinear: Join operations (multiplication-like semantics).
- Nonlinear: Complex transformations that don't preserve linearity.
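The two properties named for linear operators (preserving zero and commuting with addition) can be checked on a toy example. The sketch below uses a hypothetical `zset` map type and a prefix-based selection, neither of which is part of the package; it only illustrates the algebraic property.

```go
package main

import (
	"fmt"
	"strings"
)

// zset is a toy Z-set (key -> multiplicity), not the package's DocumentZSet.
type zset map[string]int

// add returns the pointwise sum of two Z-sets, dropping zero-weight entries.
func add(a, b zset) zset {
	out := zset{}
	for k, w := range a {
		out[k] += w
	}
	for k, w := range b {
		out[k] += w
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

// selectPrefix is a toy selection: it keeps entries whose key starts with prefix.
func selectPrefix(z zset, prefix string) zset {
	out := zset{}
	for k, w := range z {
		if strings.HasPrefix(k, prefix) {
			out[k] = w
		}
	}
	return out
}

// equal compares two normalized (zero-free) Z-sets.
func equal(a, b zset) bool {
	if len(a) != len(b) {
		return false
	}
	for k, w := range a {
		if b[k] != w {
			return false
		}
	}
	return true
}

func main() {
	a := zset{"web-1": 1, "db-1": 1}
	b := zset{"web-1": -1, "web-2": 2}
	lhs := selectPrefix(add(a, b), "web")
	rhs := add(selectPrefix(a, "web"), selectPrefix(b, "web"))
	fmt.Println(equal(lhs, rhs))                       // true: commutes with addition
	fmt.Println(len(selectPrefix(zset{}, "web")) == 0) // true: preserves zero
}
```

Because the operator commutes with addition, it can be applied to a delta directly and still produce the correct output delta.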
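The DBSP implementation supports incremental view maintenance (IVM), where the work per update is intended to scale with the size of the change, O(|changes|), rather than with the size of the full state. It also supports operator fusion for performance optimization.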
Example usage:
	zset := dbsp.NewDocumentZSet()
	// AddDocument returns a new Z-set, so keep the result.
	zset, err := zset.AddDocument(doc, 1) // Insert with multiplicity 1
	op := dbsp.NewProjection(projector)
	result, err := op.Process(zset)
Index ¶
- func DeepCopyAny(val any) any
- func DeepEqual(a, b Document) (bool, error)
- type AddOp
- type AggregateInput
- type BaseOp
- type BinaryJoinOp
- type ChainGraph
- func (g *ChainGraph) AddInput(op *InputOp) string
- func (g *ChainGraph) AddToChain(op Operator) string
- func (g *ChainGraph) Arity() int
- func (g *ChainGraph) GetChain() []Operator
- func (g *ChainGraph) GetInput(name string) *InputOp
- func (g *ChainGraph) GetStartNode() string
- func (g *ChainGraph) SetJoin(op Operator) string
- func (g *ChainGraph) String() string
- func (g *ChainGraph) Validate() error
- type ConstantOp
- type DelayOp
- type DeltaZSet
- type DifferentiatorOp
- type DistinctOp
- type DistinctOptimizationRule
- type Document
- type DocumentEntry
- type DocumentZSet
- func (dz *DocumentZSet) Add(other *DocumentZSet) (*DocumentZSet, error)
- func (dz *DocumentZSet) AddDocument(doc Document, count int) (*DocumentZSet, error)
- func (dz *DocumentZSet) AddDocumentMutate(doc Document, count int) error
- func (dz *DocumentZSet) Contains(doc Document) (bool, error)
- func (dz *DocumentZSet) DeepCopy() *DocumentZSet
- func (dz *DocumentZSet) Distinct() (*DocumentZSet, error)
- func (dz *DocumentZSet) GetDocuments() ([]Document, error)
- func (dz *DocumentZSet) GetMultiplicity(doc Document) (int, error)
- func (dz *DocumentZSet) GetUniqueDocuments() ([]Document, error)
- func (dz *DocumentZSet) IsZero() bool
- func (dz *DocumentZSet) List() ([]DocumentEntry, error)
- func (dz *DocumentZSet) ShallowCopy() *DocumentZSet
- func (dz *DocumentZSet) Size() int
- func (dz *DocumentZSet) String() string
- func (dz *DocumentZSet) Subtract(other *DocumentZSet) (*DocumentZSet, error)
- func (dz *DocumentZSet) TotalSize() int
- func (dz *DocumentZSet) Unique() (*DocumentZSet, error)
- func (dz *DocumentZSet) UniqueCount() int
- type Evaluator
- type Executor
- type Extractor
- type FusedOp
- type GatherOp
- type GraphNode
- type GroupData
- type IncrementalBinaryJoinOp
- func (op *IncrementalBinaryJoinOp) HasZeroPreservationProperty() bool
- func (op *IncrementalBinaryJoinOp) IsTimeInvariant() bool
- func (op *IncrementalBinaryJoinOp) OpType() OperatorType
- func (op *IncrementalBinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
- func (op *IncrementalBinaryJoinOp) Reset()
- type IncrementalExecutionContext
- type IncrementalGatherOp
- type IncrementalJoinOp
- type InputOp
- type IntegrationDifferentiationEliminationRule
- type IntegratorOp
- type JoinIncrementalizationRule
- type JoinOp
- type LinearChainIncrementalizationRule
- type LinearChainRewriteEngine
- type LinearChainRule
- type LinearOperatorFusionRule
- type NegateOp
- type Operator
- type OperatorType
- type ProjectThenSelectOp
- type ProjectionOp
- type SelectThenProjectionsOp
- type SelectionOp
- type SubtractOp
- type Transformer
- type UnwindOp
- type ZSetError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DeepCopyAny ¶
DeepCopyAny creates a deep copy of a document or any nested structure.
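As an illustration of what a deep copy of a nested document involves, here is a minimal sketch of a recursive copy over `map[string]any` and `[]any` values. The `deepCopy` function is a hypothetical stand-in written for this example; the package's actual `DeepCopyAny` may handle more cases.

```go
package main

import "fmt"

// deepCopy recursively copies maps and slices; all other values are returned
// as-is (primitives such as int64, float64, string and bool copy by value).
func deepCopy(v any) any {
	switch x := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(x))
		for k, e := range x {
			out[k] = deepCopy(e)
		}
		return out
	case []any:
		out := make([]any, len(x))
		for i, e := range x {
			out[i] = deepCopy(e)
		}
		return out
	default:
		return x
	}
}

func main() {
	orig := map[string]any{"meta": map[string]any{"labels": []any{"a", "b"}}}
	cp := deepCopy(orig).(map[string]any)
	// Mutating the copy must not affect the original.
	cp["meta"].(map[string]any)["labels"].([]any)[0] = "changed"
	fmt.Println(orig["meta"].(map[string]any)["labels"].([]any)[0]) // a
}
```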
Types ¶
type AddOp ¶
type AddOp struct {
BaseOp
}
Addition node
func (*AddOp) Process ¶
func (n *AddOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type AggregateInput ¶
AggregateInput is a helper struct for gather.
type BaseOp ¶
type BaseOp struct {
// contains filtered or unexported fields
}
BaseOp is a base operator embedded by all ops.
type BinaryJoinOp ¶
type BinaryJoinOp struct {
BaseOp
// contains filtered or unexported fields
}
BinaryJoinOp is a snapshot binary join op.
func NewBinaryJoin ¶
func NewBinaryJoin(eval Evaluator, inputs []string) *BinaryJoinOp
NewBinaryJoin creates a new snapshot binary join op.
func (*BinaryJoinOp) HasZeroPreservationProperty ¶
func (n *BinaryJoinOp) HasZeroPreservationProperty() bool
func (*BinaryJoinOp) IsTimeInvariant ¶
func (n *BinaryJoinOp) IsTimeInvariant() bool
func (*BinaryJoinOp) OpType ¶
func (n *BinaryJoinOp) OpType() OperatorType
func (*BinaryJoinOp) Process ¶
func (n *BinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type ChainGraph ¶
type ChainGraph struct {
// contains filtered or unexported fields
}
ChainGraph represents the specialized graph structure directly. The shape is always: [Inputs] -> [Optional N-ary Join] -> [Chain of Linear Ops] -> [Output].
func (*ChainGraph) AddInput ¶
func (g *ChainGraph) AddInput(op *InputOp) string
AddInput adds an input node.
func (*ChainGraph) AddToChain ¶
func (g *ChainGraph) AddToChain(op Operator) string
AddToChain adds an operation to the linear chain.
func (*ChainGraph) Arity ¶
func (g *ChainGraph) Arity() int
Arity returns the number of inputs of the chain.
func (*ChainGraph) GetChain ¶
func (g *ChainGraph) GetChain() []Operator
GetChain returns the operations along the linear chain.
func (*ChainGraph) GetInput ¶
func (g *ChainGraph) GetInput(name string) *InputOp
GetInput returns a named input of the chain.
func (*ChainGraph) GetStartNode ¶
func (g *ChainGraph) GetStartNode() string
GetStartNode returns the node where the linear chain begins.
func (*ChainGraph) SetJoin ¶
func (g *ChainGraph) SetJoin(op Operator) string
SetJoin sets the join node (used when there are multiple inputs).
func (*ChainGraph) String ¶
func (g *ChainGraph) String() string
String representation for debugging (horizontal layout).
func (*ChainGraph) Validate ¶
func (g *ChainGraph) Validate() error
Validate checks the graph structure.
type ConstantOp ¶
type ConstantOp struct {
BaseOp
// contains filtered or unexported fields
}
Constant node.
func NewConstant ¶
func NewConstant(value *DocumentZSet, name string) *ConstantOp
NewConstant creates a new constant op.
func (*ConstantOp) Process ¶
func (n *ConstantOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type DelayOp ¶
type DelayOp struct {
BaseOp
// contains filtered or unexported fields
}
DelayOp implements the z^(-1) operator: it delays the stream by one timestep. Caveat: this operator is stateful and needs careful handling in the execution model.
func (*DelayOp) HasZeroPreservationProperty ¶
func (*DelayOp) IsTimeInvariant ¶
func (*DelayOp) OpType ¶
func (n *DelayOp) OpType() OperatorType
func (*DelayOp) Process ¶
func (n *DelayOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type DeltaZSet ¶
type DeltaZSet = map[string]*DocumentZSet
type DifferentiatorOp ¶
type DifferentiatorOp struct {
BaseOp
// contains filtered or unexported fields
}
DifferentiatorOp implements the D operator: converts snapshots to deltas. D(s)[t] = s[t] - s[t-1]
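The formula D(s)[t] = s[t] - s[t-1] can be sketched with a small stateful type that remembers the previous snapshot. The `zset` map type, `sub`, and `differentiator` below are hypothetical names for illustration and do not mirror the package's internals.

```go
package main

import "fmt"

// zset is a toy Z-set (key -> multiplicity), not the package's DocumentZSet.
type zset map[string]int

// sub returns a - b pointwise, dropping zero-weight entries.
func sub(a, b zset) zset {
	out := zset{}
	for k, w := range a {
		out[k] += w
	}
	for k, w := range b {
		out[k] -= w
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

// differentiator remembers the previous snapshot s[t-1].
type differentiator struct{ prev zset }

// step returns s[t] - s[t-1] and stores a copy of s[t] for the next call.
func (d *differentiator) step(snapshot zset) zset {
	delta := sub(snapshot, d.prev)
	d.prev = sub(snapshot, nil) // copy, so later caller mutations do not leak in
	return delta
}

func main() {
	d := &differentiator{}
	fmt.Println(d.step(zset{"a": 1}))         // map[a:1]
	fmt.Println(d.step(zset{"a": 1, "b": 1})) // map[b:1]
	fmt.Println(d.step(zset{"b": 1}))         // map[a:-1]
}
```

The first call sees an empty previous snapshot, so the first delta equals the first snapshot.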
func NewDifferentiator ¶
func NewDifferentiator() *DifferentiatorOp
NewDifferentiator creates a new D operator.
func (*DifferentiatorOp) HasZeroPreservationProperty ¶
func (n *DifferentiatorOp) HasZeroPreservationProperty() bool
func (*DifferentiatorOp) IsTimeInvariant ¶
func (n *DifferentiatorOp) IsTimeInvariant() bool
func (*DifferentiatorOp) OpType ¶
func (n *DifferentiatorOp) OpType() OperatorType
func (*DifferentiatorOp) Process ¶
func (n *DifferentiatorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
func (*DifferentiatorOp) Reset ¶
func (n *DifferentiatorOp) Reset()
Reset state (useful for testing or restarting computation)
type DistinctOp ¶
type DistinctOp struct {
BaseOp
}
Distinct node
func (*DistinctOp) HasZeroPreservationProperty ¶
func (n *DistinctOp) HasZeroPreservationProperty() bool
func (*DistinctOp) IsTimeInvariant ¶
func (n *DistinctOp) IsTimeInvariant() bool
func (*DistinctOp) OpType ¶
func (n *DistinctOp) OpType() OperatorType
func (*DistinctOp) Process ¶
func (n *DistinctOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type DistinctOptimizationRule ¶
type DistinctOptimizationRule struct{}
Rule 4: Optimize "distinct" operations.
func (*DistinctOptimizationRule) Apply ¶
func (r *DistinctOptimizationRule) Apply(graph *ChainGraph) error
func (*DistinctOptimizationRule) CanApply ¶
func (r *DistinctOptimizationRule) CanApply(graph *ChainGraph) bool
func (*DistinctOptimizationRule) Name ¶
func (r *DistinctOptimizationRule) Name() string
type Document ¶
Document represents an unstructured document as map[string]any. Can contain embedded maps, slices, and primitives (int64, float64, string, bool).
func DeepCopyDocument ¶
DeepCopyDocument creates a deep copy of a document.
func NewDocument ¶
func NewDocument() Document
NewDocument creates a new document and wraps it so that it can be added to a Z-set.
type DocumentEntry ¶
DocumentEntry represents a document with its multiplicity in a Z-set.
type DocumentZSet ¶
type DocumentZSet struct {
// contains filtered or unexported fields
}
DocumentZSet implements Z-sets for atomic documents. Documents are treated as opaque units - no internal structure operations are considered.
func FromDocuments ¶
func FromDocuments(docs []Document) (*DocumentZSet, error)
FromDocuments creates a Z-set from a slice of documents (each with multiplicity 1).
func NewDocumentZSet ¶
func NewDocumentZSet() *DocumentZSet
NewDocumentZSet creates an empty DocumentZSet.
func SingletonZSet ¶
func SingletonZSet(doc Document) (*DocumentZSet, error)
SingletonZSet creates a Z-set containing a single document with multiplicity 1.
func (*DocumentZSet) Add ¶
func (dz *DocumentZSet) Add(other *DocumentZSet) (*DocumentZSet, error)
Add performs Z-set addition (union with multiplicity).
func (*DocumentZSet) AddDocument ¶
func (dz *DocumentZSet) AddDocument(doc Document, count int) (*DocumentZSet, error)
AddDocument adds a document with the given multiplicity and returns the result as a new Z-set. This is the core operation for building Z-sets. It copies the added document.
func (*DocumentZSet) AddDocumentMutate ¶
func (dz *DocumentZSet) AddDocumentMutate(doc Document, count int) error
AddDocumentMutate adds a document to the Z-set with the given multiplicity by modifying the Z-set in place.
func (*DocumentZSet) Contains ¶
func (dz *DocumentZSet) Contains(doc Document) (bool, error)
Contains checks if a document exists in the Z-set with positive multiplicity.
func (*DocumentZSet) DeepCopy ¶
func (dz *DocumentZSet) DeepCopy() *DocumentZSet
DeepCopy creates a deep copy of the DocumentZSet.
func (*DocumentZSet) Distinct ¶
func (dz *DocumentZSet) Distinct() (*DocumentZSet, error)
Distinct converts the Z-set to set semantics (all multiplicities become 1). This is the step that turns a multiset result into a plain set.
func (*DocumentZSet) GetDocuments ¶
func (dz *DocumentZSet) GetDocuments() ([]Document, error)
GetDocuments returns all documents as a slice (with multiplicities). Documents with multiplicity n appear n times in the result.
func (*DocumentZSet) GetMultiplicity ¶
func (dz *DocumentZSet) GetMultiplicity(doc Document) (int, error)
GetMultiplicity returns the multiplicity of a specific document.
func (*DocumentZSet) GetUniqueDocuments ¶
func (dz *DocumentZSet) GetUniqueDocuments() ([]Document, error)
GetUniqueDocuments returns all unique documents (ignoring multiplicities).
func (*DocumentZSet) IsZero ¶
func (dz *DocumentZSet) IsZero() bool
IsZero checks if the Z-set is empty (no documents with positive multiplicity).
func (*DocumentZSet) List ¶
func (dz *DocumentZSet) List() ([]DocumentEntry, error)
List returns all documents with their multiplicities (including negative ones).
func (*DocumentZSet) ShallowCopy ¶
func (dz *DocumentZSet) ShallowCopy() *DocumentZSet
ShallowCopy creates a shallow copy of the DocumentZSet.
func (*DocumentZSet) Size ¶
func (dz *DocumentZSet) Size() int
Size returns the number of documents counting only positive multiplicities.
func (*DocumentZSet) String ¶
func (dz *DocumentZSet) String() string
String returns a string representation of the Z-set for debugging.
func (*DocumentZSet) Subtract ¶
func (dz *DocumentZSet) Subtract(other *DocumentZSet) (*DocumentZSet, error)
Subtract performs Z-set subtraction.
func (*DocumentZSet) TotalSize ¶
func (dz *DocumentZSet) TotalSize() int
TotalSize returns the total number of documents, counting both positive and negative multiplicities.
func (*DocumentZSet) Unique ¶
func (dz *DocumentZSet) Unique() (*DocumentZSet, error)
Unique converts a Z-set to set semantics preserving multiplicity sign (all multiplicities become +/-1).
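The difference between Distinct and Unique is easiest to see side by side. The sketch below uses a hypothetical `zset` map type; dropping entries with non-positive multiplicity in `distinct` is an assumption borrowed from the usual DBSP definition and is not confirmed by the text above.

```go
package main

import "fmt"

// zset is a toy Z-set (key -> multiplicity), not the package's DocumentZSet.
type zset map[string]int

// distinct maps every positive multiplicity to 1.
// Assumption: entries with non-positive multiplicity are dropped.
func distinct(z zset) zset {
	out := zset{}
	for k, w := range z {
		if w > 0 {
			out[k] = 1
		}
	}
	return out
}

// unique keeps the sign of each multiplicity and clamps its magnitude to 1.
func unique(z zset) zset {
	out := zset{}
	for k, w := range z {
		switch {
		case w > 0:
			out[k] = 1
		case w < 0:
			out[k] = -1
		}
	}
	return out
}

func main() {
	z := zset{"a": 3, "b": -2, "c": 1}
	fmt.Println(distinct(z)) // map[a:1 c:1]
	fmt.Println(unique(z))   // map[a:1 b:-1 c:1]
}
```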
func (*DocumentZSet) UniqueCount ¶
func (dz *DocumentZSet) UniqueCount() int
UniqueCount returns number of unique documents (ignoring multiplicities).
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor executes incremental queries on the specialized linear chain graph.
func NewExecutor ¶
func NewExecutor(graph *ChainGraph, log logr.Logger) (*Executor, error)
NewExecutor returns a new executor.
func (*Executor) GetExecutionPlan ¶
GetExecutionPlan returns a human-readable execution plan.
func (*Executor) GetNodeResult ¶
func (e *Executor) GetNodeResult(nodeID string, deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error)
GetNodeResult returns intermediate results for debugging.
func (*Executor) ProcessDelta ¶
func (e *Executor) ProcessDelta(deltaInputs DeltaZSet) (*DocumentZSet, error)
ProcessDelta processes one delta input and produces delta output. This is the core incremental execution method.
type FusedOp ¶
type FusedOp struct {
BaseOp
// contains filtered or unexported fields
}
FusedOp is a naive fused op that simply calls the Process function of each subsequent node along the chain.
func NewFusedOp ¶
NewFusedOp creates a new fused op.
func (*FusedOp) HasZeroPreservationProperty ¶
func (*FusedOp) IsTimeInvariant ¶
func (*FusedOp) OpType ¶
func (n *FusedOp) OpType() OperatorType
func (*FusedOp) Process ¶
func (n *FusedOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the fused op.
type GatherOp ¶
type GatherOp struct {
BaseOp
// contains filtered or unexported fields
}
Snapshot Gather Operation (stateless).
func NewGather ¶
func NewGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *GatherOp
NewGather creates a new snapshot gather op.
func (*GatherOp) HasZeroPreservationProperty ¶
func (*GatherOp) IsTimeInvariant ¶
func (*GatherOp) OpType ¶
func (op *GatherOp) OpType() OperatorType
func (*GatherOp) Process ¶
func (op *GatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type IncrementalBinaryJoinOp ¶
type IncrementalBinaryJoinOp struct {
BaseOp
// contains filtered or unexported fields
}
IncrementalBinaryJoinOp implements an incremental binary join.
func NewIncrementalBinaryJoin ¶
func NewIncrementalBinaryJoin(eval Evaluator, inputs []string) *IncrementalBinaryJoinOp
NewIncrementalBinaryJoin creates a new incremental binary join.
func (*IncrementalBinaryJoinOp) HasZeroPreservationProperty ¶
func (op *IncrementalBinaryJoinOp) HasZeroPreservationProperty() bool
func (*IncrementalBinaryJoinOp) IsTimeInvariant ¶
func (op *IncrementalBinaryJoinOp) IsTimeInvariant() bool
func (*IncrementalBinaryJoinOp) OpType ¶
func (op *IncrementalBinaryJoinOp) OpType() OperatorType
func (*IncrementalBinaryJoinOp) Process ¶
func (op *IncrementalBinaryJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
func (*IncrementalBinaryJoinOp) Reset ¶
func (op *IncrementalBinaryJoinOp) Reset()
Reset method for testing.
type IncrementalExecutionContext ¶
type IncrementalExecutionContext struct {
// contains filtered or unexported fields
}
IncrementalExecutionContext helps track incremental execution state.
func NewIncrementalExecutionContext ¶
func NewIncrementalExecutionContext(executor *Executor) *IncrementalExecutionContext
NewIncrementalExecutionContext returns a new incremental execution context for the given executor.
func (*IncrementalExecutionContext) GetCumulativeOutput ¶
func (ctx *IncrementalExecutionContext) GetCumulativeOutput() *DocumentZSet
GetCumulativeOutput returns the current cumulative output.
func (*IncrementalExecutionContext) ProcessDelta ¶
func (ctx *IncrementalExecutionContext) ProcessDelta(deltaInputs map[string]*DocumentZSet) (*DocumentZSet, error)
ProcessDelta processes one delta and updates cumulative state.
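The relationship between per-delta outputs and the cumulative output can be sketched as follows: each delta is pushed through the pipeline, and the resulting output delta is added to a running total. All names here (`zset`, `incrementalContext`, `keepDefault`) are hypothetical, and the pipeline is a single toy linear filter rather than a real executor.

```go
package main

import (
	"fmt"
	"strings"
)

// zset is a toy Z-set (key -> multiplicity), not the package's DocumentZSet.
type zset map[string]int

// add returns the pointwise sum of two Z-sets, dropping zero-weight entries.
func add(a, b zset) zset {
	out := zset{}
	for k, w := range a {
		out[k] += w
	}
	for k, w := range b {
		out[k] += w
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

// keepDefault is a toy linear pipeline: keep keys in the "default" namespace.
func keepDefault(z zset) zset {
	out := zset{}
	for k, w := range z {
		if strings.HasPrefix(k, "default/") {
			out[k] = w
		}
	}
	return out
}

// incrementalContext feeds deltas through a pipeline and integrates the outputs.
type incrementalContext struct {
	pipeline   func(zset) zset
	cumulative zset
}

// processDelta returns the output delta and folds it into the cumulative output.
func (c *incrementalContext) processDelta(delta zset) zset {
	out := c.pipeline(delta)
	c.cumulative = add(c.cumulative, out)
	return out
}

func main() {
	ctx := &incrementalContext{pipeline: keepDefault}
	ctx.processDelta(zset{"default/a": 1, "kube-system/x": 1})
	ctx.processDelta(zset{"default/a": -1, "default/b": 1})
	fmt.Println(ctx.cumulative) // map[default/b:1]
}
```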
func (*IncrementalExecutionContext) Reset ¶
func (ctx *IncrementalExecutionContext) Reset()
Reset the context for a fresh start.
type IncrementalGatherOp ¶
type IncrementalGatherOp struct {
BaseOp
// contains filtered or unexported fields
}
IncrementalGatherOp is the incremental (stateful) gather operation. It implements an optimized gather^Δ with O(|delta|) complexity.
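One common way to get O(|delta|) cost for a grouped aggregate is to keep per-key state and, for each key touched by a delta, retract the old aggregate and emit the new one. The sketch below does this for a per-key sum, which is a simplification: the package's gather takes arbitrary extractors and a Transformer aggregator, and every name here (`row`, `result`, `incrementalSum`) is hypothetical. It also assumes deltas are valid, meaning a group's row count never goes negative.

```go
package main

import "fmt"

// row is a toy input element: a group key, a value, and a Z-set weight.
type row struct {
	key    string
	value  int
	weight int
}

// result is an aggregate output document.
type result struct {
	key string
	sum int
}

// incrementalSum keeps one running sum and one row count per key.
type incrementalSum struct {
	sums   map[string]int
	counts map[string]int
}

func newIncrementalSum() *incrementalSum {
	return &incrementalSum{sums: map[string]int{}, counts: map[string]int{}}
}

// process consumes an input delta and returns the output delta (result -> weight).
// Only keys present in the delta are visited.
func (op *incrementalSum) process(delta []row) map[result]int {
	// Remember the aggregate each touched key had before this delta (nil if none).
	before := map[string]*result{}
	for _, r := range delta {
		if _, seen := before[r.key]; !seen {
			if op.counts[r.key] > 0 {
				before[r.key] = &result{r.key, op.sums[r.key]}
			} else {
				before[r.key] = nil
			}
		}
		op.sums[r.key] += r.value * r.weight
		op.counts[r.key] += r.weight
	}
	out := map[result]int{}
	for key, old := range before {
		if old != nil {
			out[*old]-- // retract the previous aggregate
		}
		if op.counts[key] > 0 {
			out[result{key, op.sums[key]}]++ // emit the new aggregate
		} else {
			delete(op.sums, key) // the group became empty
			delete(op.counts, key)
		}
	}
	for r, w := range out {
		if w == 0 {
			delete(out, r) // aggregate unchanged: no output delta
		}
	}
	return out
}

func main() {
	op := newIncrementalSum()
	fmt.Println(op.process([]row{{"a", 5, 1}, {"b", 1, 1}})) // inserts sums for a and b
	fmt.Println(op.process([]row{{"a", 2, 1}}))              // retracts a=5, inserts a=7
	fmt.Println(op.process([]row{{"b", 1, -1}}))             // retracts b=1
}
```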
func NewIncrementalGather ¶
func NewIncrementalGather(keyExtractor, valueExtractor Extractor, aggregator Transformer) *IncrementalGatherOp
NewIncrementalGather returns a new incremental gather operation.
func (*IncrementalGatherOp) HasZeroPreservationProperty ¶
func (op *IncrementalGatherOp) HasZeroPreservationProperty() bool
func (*IncrementalGatherOp) IsTimeInvariant ¶
func (op *IncrementalGatherOp) IsTimeInvariant() bool
func (*IncrementalGatherOp) OpType ¶
func (op *IncrementalGatherOp) OpType() OperatorType
func (*IncrementalGatherOp) Process ¶
func (op *IncrementalGatherOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type IncrementalJoinOp ¶
type IncrementalJoinOp struct {
BaseOp
// contains filtered or unexported fields
}
IncrementalJoinOp implements an incremental n-ary join.
func NewIncrementalJoin ¶
func NewIncrementalJoin(eval Evaluator, inputs []string) *IncrementalJoinOp
NewIncrementalJoin creates a new incremental n-ary join.
func (*IncrementalJoinOp) HasZeroPreservationProperty ¶
func (op *IncrementalJoinOp) HasZeroPreservationProperty() bool
func (*IncrementalJoinOp) IsTimeInvariant ¶
func (op *IncrementalJoinOp) IsTimeInvariant() bool
func (*IncrementalJoinOp) OpType ¶
func (op *IncrementalJoinOp) OpType() OperatorType
func (*IncrementalJoinOp) Process ¶
func (op *IncrementalJoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type InputOp ¶
type InputOp struct {
BaseOp
// contains filtered or unexported fields
}
Input node (source of data).
func (*InputOp) HasZeroPreservationProperty ¶
func (*InputOp) IsTimeInvariant ¶
func (*InputOp) OpType ¶
func (n *InputOp) OpType() OperatorType
func (*InputOp) Process ¶
func (n *InputOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type IntegrationDifferentiationEliminationRule ¶
type IntegrationDifferentiationEliminationRule struct{}
Rule 3: Remove I->D pairs (they cancel out).
func (*IntegrationDifferentiationEliminationRule) Apply ¶
func (r *IntegrationDifferentiationEliminationRule) Apply(graph *ChainGraph) error
func (*IntegrationDifferentiationEliminationRule) CanApply ¶
func (r *IntegrationDifferentiationEliminationRule) CanApply(graph *ChainGraph) bool
func (*IntegrationDifferentiationEliminationRule) Name ¶
func (r *IntegrationDifferentiationEliminationRule) Name() string
type IntegratorOp ¶
type IntegratorOp struct {
BaseOp
// contains filtered or unexported fields
}
IntegratorOp implements the I operator: converts deltas to snapshots. I(s)[t] = Σ(i=0 to t) s[i]
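The running sum I(s)[t] = Σ(i=0 to t) s[i] can be sketched with a small stateful type, and pairing it with a differentiator shows why an I->D pair cancels: differentiating the integrated stream returns the original deltas. The `zset`, `combine`, `integrator`, and `differentiator` names are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// zset is a toy Z-set (key -> multiplicity), not the package's DocumentZSet.
type zset map[string]int

// combine returns a + sign*b pointwise, dropping zero-weight entries.
func combine(a, b zset, sign int) zset {
	out := zset{}
	for k, w := range a {
		out[k] += w
	}
	for k, w := range b {
		out[k] += sign * w
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

// integrator keeps the running sum of all deltas seen so far.
type integrator struct{ sum zset }

func (i *integrator) step(delta zset) zset {
	i.sum = combine(i.sum, delta, +1)
	return i.sum
}

// differentiator keeps the previous snapshot.
type differentiator struct{ prev zset }

func (d *differentiator) step(s zset) zset {
	out := combine(s, d.prev, -1)
	d.prev = s
	return out
}

func main() {
	deltas := []zset{{"a": 1}, {"b": 2}, {"a": -1}}
	i, d := &integrator{}, &differentiator{}
	for _, delta := range deltas {
		snapshot := i.step(delta)
		// The second value printed equals the input delta: D(I(s)) = s.
		fmt.Println(snapshot, d.step(snapshot))
	}
	// map[a:1] map[a:1]
	// map[a:1 b:2] map[b:2]
	// map[b:2] map[a:-1]
}
```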
func (*IntegratorOp) HasZeroPreservationProperty ¶
func (n *IntegratorOp) HasZeroPreservationProperty() bool
func (*IntegratorOp) IsTimeInvariant ¶
func (n *IntegratorOp) IsTimeInvariant() bool
func (*IntegratorOp) OpType ¶
func (n *IntegratorOp) OpType() OperatorType
func (*IntegratorOp) Process ¶
func (n *IntegratorOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
func (*IntegratorOp) Reset ¶
func (n *IntegratorOp) Reset()
Reset state (useful for testing or restarting computation)
type JoinIncrementalizationRule ¶
type JoinIncrementalizationRule struct{}
Rule 1: Convert N-ary join to incremental version.
func (*JoinIncrementalizationRule) Apply ¶
func (r *JoinIncrementalizationRule) Apply(graph *ChainGraph) error
func (*JoinIncrementalizationRule) CanApply ¶
func (r *JoinIncrementalizationRule) CanApply(graph *ChainGraph) bool
func (*JoinIncrementalizationRule) Name ¶
func (r *JoinIncrementalizationRule) Name() string
type JoinOp ¶
type JoinOp struct {
BaseOp
// contains filtered or unexported fields
}
Snapshot N-ary join (non-incremental).
func (*JoinOp) HasZeroPreservationProperty ¶
func (*JoinOp) IsTimeInvariant ¶
func (*JoinOp) OpType ¶
func (op *JoinOp) OpType() OperatorType
func (*JoinOp) Process ¶
func (op *JoinOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type LinearChainIncrementalizationRule ¶
type LinearChainIncrementalizationRule struct{}
Rule 2: Incrementalize the entire linear chain.
func (*LinearChainIncrementalizationRule) Apply ¶
func (r *LinearChainIncrementalizationRule) Apply(graph *ChainGraph) error
func (*LinearChainIncrementalizationRule) CanApply ¶
func (r *LinearChainIncrementalizationRule) CanApply(graph *ChainGraph) bool
func (*LinearChainIncrementalizationRule) Name ¶
func (r *LinearChainIncrementalizationRule) Name() string
type LinearChainRewriteEngine ¶
type LinearChainRewriteEngine struct {
// contains filtered or unexported fields
}
LinearChainRewriteEngine rewrites a ChainGraph by applying the DBSP rewrite rules.
func NewLinearChainRewriteEngine ¶
func NewLinearChainRewriteEngine() *LinearChainRewriteEngine
NewLinearChainRewriteEngine creates a new LinearChainRewriteEngine.
func (*LinearChainRewriteEngine) AddRule ¶
func (re *LinearChainRewriteEngine) AddRule(rule LinearChainRule)
AddRule adds a rule to the rewrite engine.
func (*LinearChainRewriteEngine) Optimize ¶
func (re *LinearChainRewriteEngine) Optimize(graph *ChainGraph) error
Optimize applies all rules until reaching a fixpoint.
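A fixpoint loop of this kind can be sketched on a toy graph. The `rule` interface below mirrors the Name/CanApply/Apply shape of LinearChainRule, but `graph`, `cancelID`, and `optimize` are hypothetical and say nothing about how the package's engine orders rules or guards against non-termination.

```go
package main

import "fmt"

// graph is a toy chain: just the operator names in order.
type graph struct{ chain []string }

// rule mirrors the shape of LinearChainRule on the toy graph.
type rule interface {
	Name() string
	CanApply(g *graph) bool
	Apply(g *graph) error
}

// cancelID removes one adjacent "I", "D" pair per application.
type cancelID struct{}

// indexOfPair returns the index of the first adjacent I->D pair, or -1.
func indexOfPair(g *graph) int {
	for i := 0; i+1 < len(g.chain); i++ {
		if g.chain[i] == "I" && g.chain[i+1] == "D" {
			return i
		}
	}
	return -1
}

func (cancelID) Name() string           { return "cancel-I-D" }
func (cancelID) CanApply(g *graph) bool { return indexOfPair(g) >= 0 }
func (cancelID) Apply(g *graph) error {
	i := indexOfPair(g)
	if i < 0 {
		return fmt.Errorf("no I->D pair found")
	}
	g.chain = append(g.chain[:i], g.chain[i+2:]...)
	return nil
}

// optimize applies the rules repeatedly until none of them can fire.
func optimize(g *graph, rules []rule) error {
	for changed := true; changed; {
		changed = false
		for _, r := range rules {
			if r.CanApply(g) {
				if err := r.Apply(g); err != nil {
					return fmt.Errorf("rule %s: %w", r.Name(), err)
				}
				changed = true
			}
		}
	}
	return nil
}

func main() {
	g := &graph{chain: []string{"select", "I", "I", "D", "D", "project"}}
	if err := optimize(g, []rule{cancelID{}}); err != nil {
		panic(err)
	}
	fmt.Println(g.chain) // [select project]
}
```

Removing the inner pair exposes a new adjacent pair, which is why a single pass is not enough and the loop runs until nothing changes.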
type LinearChainRule ¶
type LinearChainRule interface {
Name() string
CanApply(graph *ChainGraph) bool
Apply(graph *ChainGraph) error
}
LinearChainRule is the general interface for rewrite rules.
type LinearOperatorFusionRule ¶
type LinearOperatorFusionRule struct{}
Rule 5: Fuse adjacent linear operations for efficiency.
func (*LinearOperatorFusionRule) Apply ¶
func (r *LinearOperatorFusionRule) Apply(graph *ChainGraph) error
func (*LinearOperatorFusionRule) CanApply ¶
func (r *LinearOperatorFusionRule) CanApply(graph *ChainGraph) bool
func (*LinearOperatorFusionRule) Name ¶
func (r *LinearOperatorFusionRule) Name() string
type NegateOp ¶
type NegateOp struct {
BaseOp
}
Negation node
func (*NegateOp) Process ¶
func (n *NegateOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type Operator ¶
type Operator interface {
// Process input ZSets and produce output ZSet.
Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
// Get input arity (number of inputs expected).
Arity() int
// OpType returns the type of the operator.
OpType() OperatorType
IsTimeInvariant() bool
HasZeroPreservationProperty() bool
// contains filtered or unexported methods
}
Operator represents a computation node in the graph.
func FuseFilterProject ¶
func FuseFilterProject(filter *SelectionOp, project *ProjectionOp) (Operator, error)
FuseFilterProject is a helper function that safely fuses a selection and a projection into a single operator.
func IncrementalizeOp ¶
IncrementalizeOp converts a "snapshot" operator into an incremental operator and returns a boolean to signal whether the conversion was successful.
type OperatorType ¶
type OperatorType int
OperatorType classifies operators for rewrite rules
const (
	OpTypeLinear     OperatorType = iota // Op^Δ = Op
	OpTypeBilinear                       // Op^Δ needs expansion (like joins)
	OpTypeNonLinear                      // Op^Δ needs special handling (like distinct)
	OpTypeStructural                     // Graph structure (add, subtract, etc.)
)
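The "needs expansion" remark for bilinear operators refers to the standard identity Δ(a ⋈ b) = Δa ⋈ Δb + a_prev ⋈ Δb + Δa ⋈ b_prev. The sketch below checks that identity on a toy equi-join over weighted records. The types `rec` and `joined` and the helpers `add`, `equal`, `join`, and `joinDelta` are hypothetical, and the sketch is not a description of how the package's incremental join is implemented.

```go
package main

import "fmt"

type rec struct{ key, val string }
type joined struct{ key, left, right string }

// add sums two weight maps pointwise and drops zero-weight entries.
func add[K comparable](a, b map[K]int) map[K]int {
	out := map[K]int{}
	for k, w := range a {
		out[k] += w
	}
	for k, w := range b {
		out[k] += w
	}
	for k, w := range out {
		if w == 0 {
			delete(out, k)
		}
	}
	return out
}

// equal compares two zero-free weight maps.
func equal[K comparable](a, b map[K]int) bool {
	if len(a) != len(b) {
		return false
	}
	for k, w := range a {
		if b[k] != w {
			return false
		}
	}
	return true
}

// join pairs records with equal keys; output weights are products of input weights.
func join(a, b map[rec]int) map[joined]int {
	out := map[joined]int{}
	for l, wl := range a {
		for r, wr := range b {
			if l.key == r.key {
				out[joined{l.key, l.val, r.val}] += wl * wr
			}
		}
	}
	return out
}

// joinDelta computes the output delta from the previous snapshots and the input deltas.
func joinDelta(aPrev, da, bPrev, db map[rec]int) map[joined]int {
	return add(add(join(da, db), join(aPrev, db)), join(da, bPrev))
}

func main() {
	aPrev := map[rec]int{{"k1", "x"}: 1}
	bPrev := map[rec]int{{"k1", "p"}: 1}
	da := map[rec]int{{"k1", "x"}: -1, {"k1", "y"}: 1} // replace x by y
	db := map[rec]int{{"k1", "q"}: 1}                  // insert q

	incremental := add(join(aPrev, bPrev), joinDelta(aPrev, da, bPrev, db))
	full := join(add(aPrev, da), add(bPrev, db))
	fmt.Println(equal(incremental, full)) // true
}
```

The cross terms are why a join needs the previous snapshots of both inputs as state, unlike a linear operator that can process each delta in isolation.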
type ProjectThenSelectOp ¶
type ProjectThenSelectOp struct {
BaseOp
// contains filtered or unexported fields
}
ProjectThenSelectOp implements a fused project followed by a select op in single pass.
func NewProjectThenSelect ¶
func NewProjectThenSelect(projEval, selEval Evaluator) *ProjectThenSelectOp
NewProjectThenSelect creates a new fused op for a projection followed by a selection.
func (*ProjectThenSelectOp) HasZeroPreservationProperty ¶
func (op *ProjectThenSelectOp) HasZeroPreservationProperty() bool
func (*ProjectThenSelectOp) IsTimeInvariant ¶
func (op *ProjectThenSelectOp) IsTimeInvariant() bool
func (*ProjectThenSelectOp) OpType ¶
func (op *ProjectThenSelectOp) OpType() OperatorType
func (*ProjectThenSelectOp) Process ¶
func (op *ProjectThenSelectOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the fused op.
type ProjectionOp ¶
type ProjectionOp struct {
BaseOp
// contains filtered or unexported fields
}
Projection node.
func NewProjection ¶
func NewProjection(eval Evaluator) *ProjectionOp
NewProjection creates a new projection op.
func (*ProjectionOp) HasZeroPreservationProperty ¶
func (n *ProjectionOp) HasZeroPreservationProperty() bool
func (*ProjectionOp) IsTimeInvariant ¶
func (n *ProjectionOp) IsTimeInvariant() bool
func (*ProjectionOp) OpType ¶
func (n *ProjectionOp) OpType() OperatorType
func (*ProjectionOp) Process ¶
func (n *ProjectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type SelectThenProjectionsOp ¶
type SelectThenProjectionsOp struct {
BaseOp
// contains filtered or unexported fields
}
SelectThenProjectionsOp implements a fused optional selection plus one or more projections.
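The benefit of fusing a selection with projections is that each entry is visited once and no intermediate collection is built. A minimal sketch of that idea follows; `doc`, `entry`, `selectThenProject`, and the two evaluator functions are hypothetical, and merging of entries that project to the same document is deliberately omitted.

```go
package main

import "fmt"

// doc is a toy document; entry pairs it with a Z-set weight.
type doc map[string]any

type entry struct {
	d      doc
	weight int
}

// selectThenProject applies an optional predicate and then each projection in
// order, visiting every entry exactly once. A nil predicate keeps everything.
func selectThenProject(in []entry, pred func(doc) bool, projs ...func(doc) doc) []entry {
	out := make([]entry, 0, len(in))
	for _, e := range in {
		if pred != nil && !pred(e.d) {
			continue
		}
		d := e.d
		for _, p := range projs {
			d = p(d)
		}
		out = append(out, entry{d, e.weight})
	}
	return out
}

// inDefault and nameOnly are illustrative evaluators.
func inDefault(d doc) bool { return d["namespace"] == "default" }
func nameOnly(d doc) doc   { return doc{"name": d["name"]} }

func main() {
	in := []entry{
		{doc{"namespace": "default", "name": "web"}, 1},
		{doc{"namespace": "kube-system", "name": "dns"}, 1},
	}
	out := selectThenProject(in, inDefault, nameOnly)
	fmt.Println(len(out), out[0].d, out[0].weight) // 1 map[name:web] 1
}
```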
func NewSelectThenProjections ¶
func NewSelectThenProjections(selEval Evaluator, projEvals []Evaluator) *SelectThenProjectionsOp
NewSelectThenProjections creates a new fused selection/projection op.
func (*SelectThenProjectionsOp) HasZeroPreservationProperty ¶
func (op *SelectThenProjectionsOp) HasZeroPreservationProperty() bool
func (*SelectThenProjectionsOp) IsTimeInvariant ¶
func (op *SelectThenProjectionsOp) IsTimeInvariant() bool
func (*SelectThenProjectionsOp) OpType ¶
func (op *SelectThenProjectionsOp) OpType() OperatorType
func (*SelectThenProjectionsOp) Process ¶
func (op *SelectThenProjectionsOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the fused op.
type SelectionOp ¶
type SelectionOp struct {
BaseOp
// contains filtered or unexported fields
}
Selection node.
func NewSelection ¶
func NewSelection(eval Evaluator) *SelectionOp
NewSelection creates a new selection op.
func (*SelectionOp) HasZeroPreservationProperty ¶
func (n *SelectionOp) HasZeroPreservationProperty() bool
func (*SelectionOp) IsTimeInvariant ¶
func (n *SelectionOp) IsTimeInvariant() bool
func (*SelectionOp) OpType ¶
func (n *SelectionOp) OpType() OperatorType
func (*SelectionOp) Process ¶
func (n *SelectionOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type SubtractOp ¶
type SubtractOp struct {
BaseOp
}
Subtraction node
func (*SubtractOp) Process ¶
func (n *SubtractOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.
type Transformer ¶
Transformer transforms documents by setting or replacing fields.
type UnwindOp ¶
type UnwindOp struct {
BaseOp
// contains filtered or unexported fields
}
UnwindOp flattens arrays within documents.
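Flattening an array field typically means emitting one output document per array element, with the array replaced by that element. The sketch below shows this on a toy `doc` type; the `unwind` function, its field-name parameter, and the choice to emit nothing for a missing or non-array field are all assumptions made for illustration, since the package's op is driven by an Extractor and a Transformer instead.

```go
package main

import "fmt"

// doc is a toy document type.
type doc map[string]any

// unwind emits one copy of d per element of d[field], with the array replaced
// by that element. Assumption: a missing or non-array field yields no output.
func unwind(d doc, field string) []doc {
	arr, ok := d[field].([]any)
	if !ok {
		return nil
	}
	out := make([]doc, 0, len(arr))
	for _, elem := range arr {
		c := make(doc, len(d))
		for k, v := range d {
			c[k] = v // shallow copy of the other fields
		}
		c[field] = elem
		out = append(out, c)
	}
	return out
}

func main() {
	d := doc{"name": "web", "ports": []any{80, 443}}
	for _, u := range unwind(d, "ports") {
		fmt.Println(u)
	}
	// map[name:web ports:80]
	// map[name:web ports:443]
}
```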
func NewUnwind ¶
func NewUnwind(arrayExtractor Extractor, transformer Transformer) *UnwindOp
NewUnwind creates a new unwind op.
func (*UnwindOp) HasZeroPreservationProperty ¶
func (*UnwindOp) IsTimeInvariant ¶
func (*UnwindOp) OpType ¶
func (op *UnwindOp) OpType() OperatorType
func (*UnwindOp) Process ¶
func (op *UnwindOp) Process(inputs ...*DocumentZSet) (*DocumentZSet, error)
Process evaluates the op.