Documentation
¶
Overview ¶
Package exec implements the Volcano-style executor for the Cypher query engine. It defines the Operator interface, the Row/RowSlab data model, and the pipeline driver Drain.
Data model ¶
A Row is a slice of expr.Value. A RowSlab is a bounded, pooled container of pre-allocated rows used to eliminate per-row heap allocations in the hot path.
Concurrency ¶
RowSlab is NOT safe for concurrent use. Each goroutine must obtain its own slab from NewRowSlab or from a sync.Pool managed by the caller. The exported SlabPool provides a ready-to-use pool with default capacity.
Index ¶
- Constants
- Variables
- func PropMapContainsNullLiteral(s string) bool
- func WithCyphermorphism(relCols []int) expandOption
- type AllNodesScan
- type AllShortestPaths
- type AntiSemiApply
- type Apply
- type Argument
- type ConstraintKind
- type ConstraintRegistry
- func (r *ConstraintRegistry) CheckSetProperty(labels []string, prop string, value lpg.PropertyValue, mgr *index.Manager) error
- func (r *ConstraintRegistry) HasNotNull(label, prop string) bool
- func (r *ConstraintRegistry) ListConstraintRows() [][]expr.Value
- func (r *ConstraintRegistry) RecordPropertySet(labels []string, prop string, value lpg.PropertyValue)
- func (r *ConstraintRegistry) RegisterNotNull(label, prop string)
- func (r *ConstraintRegistry) RegisterUnique(label, prop, indexName string)
- func (r *ConstraintRegistry) UniqueIndexName(label, prop string) (string, bool)
- func (r *ConstraintRegistry) UnregisterNotNull(label, prop string)
- func (r *ConstraintRegistry) UnregisterUnique(label, prop string)
- type ConstraintViolationError
- type CorrelatedApply
- type CreateConstraintOp
- type CreateIndexOp
- type CreateNode
- func (op *CreateNode) Close() error
- func (op *CreateNode) Init(ctx context.Context) error
- func (op *CreateNode) Next(out *Row) (bool, error)
- func (op *CreateNode) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *CreateNode
- func (op *CreateNode) WithParams(params map[string]expr.Value) (*CreateNode, error)
- func (op *CreateNode) WithPropsEvalFn(fn PropsEvalFn) *CreateNode
- type CreateRelationship
- func (op *CreateRelationship) Close() error
- func (op *CreateRelationship) Init(ctx context.Context) error
- func (op *CreateRelationship) Next(out *Row) (bool, error)
- func (op *CreateRelationship) WithParams(params map[string]expr.Value) (*CreateRelationship, error)
- func (op *CreateRelationship) WithPropsEvalFn(fn PropsEvalFn) *CreateRelationship
- type DeleteNode
- type DeleteRelationship
- type DetachDelete
- type Direction
- type Distinct
- type DropConstraintOp
- type DropIndexOp
- type Eager
- type EagerAggregation
- type Expand
- type ExpandConfig
- type Filter
- type FilterFn
- type GlobalAggregateAdapter
- type GraphMutator
- type IndexBuffer
- type IndexKindExec
- type Int64HashIndex
- type Int64RangeIndex
- type LPGLabelSource
- type Limit
- type Merge
- func (op *Merge) Close() error
- func (op *Merge) Init(ctx context.Context) error
- func (op *Merge) Next(out *Row) (bool, error)
- func (op *Merge) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *Merge
- func (op *Merge) WithParams(params map[string]expr.Value) (*Merge, error)
- func (op *Merge) WithPropsEvalFn(fn PropsEvalFn) *Merge
- type MergeRelAction
- type MergeRelationship
- func (op *MergeRelationship) Close() error
- func (op *MergeRelationship) Init(ctx context.Context) error
- func (op *MergeRelationship) Next(out *Row) (bool, error)
- func (op *MergeRelationship) WithOnCreate(relVar string, actions []MergeRelAction) *MergeRelationship
- func (op *MergeRelationship) WithOnMatch(relVar string, actions []MergeRelAction) *MergeRelationship
- func (op *MergeRelationship) WithRelColumn(relCol int) *MergeRelationship
- func (op *MergeRelationship) WithRelProperties(propsRaw string) *MergeRelationship
- func (op *MergeRelationship) WithSchema(schema map[string]int) *MergeRelationship
- func (op *MergeRelationship) WithUndirected(u bool) *MergeRelationship
- type MergeSearchFn
- type NodeByIndexRangeScan
- type NodeByIndexSeek
- type NodeByLabelScan
- type Operator
- type OptionalApply
- type OptionalExpand
- type ParallelScan
- type ProcedureCallOp
- type Project
- type ProjectionItem
- type PropEntry
- type PropsEvalFn
- type RangeBound
- type Record
- type RelCols
- type RelEndpointFn
- type RemoveLabels
- type RemoveProperty
- type Result
- type ResultSet
- type RollUpApply
- type Row
- type RowSlab
- type SemiApply
- type SetAllProperties
- func NewSetAllPropertiesFromEntity(entityVar, sourceVar string, isReplace bool, schema map[string]int, ...) *SetAllProperties
- func NewSetAllPropertiesFromMap(entityVar, mapLiteral string, isReplace bool, schema map[string]int, ...) (*SetAllProperties, error)
- func NewSetAllPropertiesFromParam(entityVar, paramName string, isReplace bool, schema map[string]int, ...) *SetAllProperties
- func (op *SetAllProperties) Close() error
- func (op *SetAllProperties) Init(ctx context.Context) error
- func (op *SetAllProperties) Next(out *Row) (bool, error)
- func (op *SetAllProperties) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *SetAllProperties
- func (op *SetAllProperties) WithParams(params map[string]expr.Value) (*SetAllProperties, error)
- func (op *SetAllProperties) WithRelCols(rc RelCols) *SetAllProperties
- func (op *SetAllProperties) WithSourceRelCols(rc RelCols) *SetAllProperties
- type SetLabels
- type SetProperty
- func (op *SetProperty) Close() error
- func (op *SetProperty) Init(ctx context.Context) error
- func (op *SetProperty) Next(out *Row) (bool, error)
- func (op *SetProperty) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *SetProperty
- func (op *SetProperty) WithParams(params map[string]expr.Value) *SetProperty
- func (op *SetProperty) WithRelCols(rc RelCols) *SetProperty
- func (op *SetProperty) WithValueEvalFn(fn ValueEvalFn) *SetProperty
- type ShortestPath
- type SingleRow
- type Skip
- type SlabPool
- type Sort
- type SortKey
- type StringHashIndex
- type StringRangeIndex
- type TargetEvalFn
- type Top
- type Union
- type UnionAll
- type Unwind
- type UnwindListFn
- type ValueEvalFn
- type VarLengthConfig
- type VarLengthExpand
Examples ¶
Constants ¶
const DefaultMaxDistinct = 10_000_000
DefaultMaxDistinct is the default upper bound on distinct rows tracked by the Distinct operator.
const DefaultMaxGroups = 1_000_000
DefaultMaxGroups is the default upper bound on distinct groups that EagerAggregation will hold in memory.
const DefaultMaxSortRows = 10_000_000
DefaultMaxSortRows is the default upper bound on rows that Sort holds in memory.
const DefaultMorselSize = 1024
DefaultMorselSize is the number of NodeIDs processed per worker goroutine per scheduling quantum. Sized to fill roughly one or two cache lines of work before touching a channel.
const DefaultSlabCapacity = 4096
DefaultSlabCapacity is the default maximum number of rows a RowSlab holds before returning ErrSlabOverflow. It is sized to keep a typical pipeline batch within a few cache lines.
Variables ¶
var ErrAggMemoryExceeded = errors.New("exec: aggregation memory cap exceeded")
ErrAggMemoryExceeded is returned by EagerAggregation.Next when the number of distinct groups exceeds the configured maxGroups limit.
var ErrConstraintViolation = errors.New("exec: constraint violation")
ErrConstraintViolation is the sentinel returned (wrapped) by CheckSetProperty when a write would violate a constraint.
var ErrDeleteNodeHasRelationships = errors.New("exec: cannot delete node with existing relationships; use DETACH DELETE")
ErrDeleteNodeHasRelationships is returned when DELETE is attempted on a node that still has one or more incident relationships. Use DETACH DELETE to remove the node together with its relationships.
var ErrDistinctMemoryExceeded = errors.New("exec: distinct memory cap exceeded")
ErrDistinctMemoryExceeded is returned by Distinct.Next when the number of distinct rows seen exceeds the configured maxDistinct limit.
var ErrIndexTypeMismatch = errors.New("exec: index type mismatch")
ErrIndexTypeMismatch is returned by NodeByIndexSeek.Init when the seek value's Kind is incompatible with the index's key type.
var ErrPropertyValueIsNull = errors.New("exec: property value is null (skip)")
ErrPropertyValueIsNull is the sentinel returned by [parsePropValue] when the value is the literal "null". By openCypher semantics, assigning null to a property removes it (or never sets it for a fresh node), so callers that catch this sentinel must skip the property entirely rather than surface a parse error.
var ErrSchemaMismatch = errors.New("exec: union schema mismatch: column counts differ")
ErrSchemaMismatch is returned when the left and right operands of a UNION produce rows with different column counts.
var ErrSlabOverflow = errors.New("exec: row slab overflow")
ErrSlabOverflow is returned by RowSlab.Alloc when the slab has reached its capacity limit. Callers must flush or reset the slab before continuing.
var ErrSortMemoryExceeded = errors.New("exec: sort memory cap exceeded")
ErrSortMemoryExceeded is returned when Sort collects more than maxRows rows.
var ErrUnwindNilChild = errors.New("exec: NewUnwind requires non-nil child Operator")
ErrUnwindNilChild is returned by NewUnwind when child is nil.
var ErrUnwindNilListFn = errors.New("exec: NewUnwind requires non-nil listFn")
ErrUnwindNilListFn is returned by NewUnwind when listFn is nil.
var ErrVarLenCapExceeded = errors.New("exec: variable-length expand safety cap exceeded")
ErrVarLenCapExceeded is returned when a VarLengthExpand exceeds its configured maximum edge traversal count.
Functions ¶
func PropMapContainsNullLiteral ¶
PropMapContainsNullLiteral reports whether the property-map source string contains any value that is the literal `null`. It is used by MERGE to surface the openCypher `MergeReadOwnWrites` error when a merge predicate contains a null property value — such a merge can never match its own write because, under three-valued logic, any comparison against null evaluates to null rather than true, so the engine rejects the pattern outright. Used at MERGE plan-build time; CREATE silently drops null-valued properties so it does NOT use this check.
The argument may be either a single map literal "{k: v, …}" or a larger surface form (e.g. a full pattern string "(a)-[r:T {k: null}]->(b)") in which case every embedded balanced "{...}" segment is scanned. The check splits each map at top-level commas, isolates each value substring, and reports true if any value (case-insensitively) equals the bare token `null`. Variable refs and expressions are ignored — they parse to non-literal forms.
func WithCyphermorphism ¶
func WithCyphermorphism(relCols []int) expandOption
WithCyphermorphism returns an option that enables cyphermorphism enforcement. relCols lists the column indices in each input row that hold existing edge IDs (as expr.IntegerValue). When Expand is about to emit a row with a new edgeID, it rejects the row if edgeID equals any value already present in those columns.
Types ¶
type AllNodesScan ¶
type AllNodesScan struct {
// contains filtered or unexported fields
}
AllNodesScan is a Volcano leaf operator that produces one Row per node in the graph. Each Row has a single column: an expr.IntegerValue holding the node's graph.NodeID cast to int64.
AllNodesScan is NOT safe for concurrent use.
func NewAllNodesScan ¶
func NewAllNodesScan(g nodeWalker) *AllNodesScan
NewAllNodesScan creates an AllNodesScan over g.
func (*AllNodesScan) Close ¶
func (op *AllNodesScan) Close() error
Close releases resources. The collected nodeIDs slice is retained (but its length zeroed) to allow reuse if Init is called again.
type AllShortestPaths ¶
type AllShortestPaths struct {
// contains filtered or unexported fields
}
AllShortestPaths is a Volcano pipeline operator that, for each input row, finds all paths of minimum length from srcCol to dstCol and emits one output row per path.
AllShortestPaths is NOT safe for concurrent use.
func NewAllShortestPaths ¶
func NewAllShortestPaths(input Operator, fwd, rev csrAdjacency, dir Direction, srcCol, dstCol int) *AllShortestPaths
NewAllShortestPaths creates an AllShortestPaths operator.
func (*AllShortestPaths) Close ¶
func (op *AllShortestPaths) Close() error
Close closes the input operator.
type AntiSemiApply ¶
type AntiSemiApply struct {
// contains filtered or unexported fields
}
AntiSemiApply emits each outer row for which the inner sub-plan produces zero rows.
AntiSemiApply is NOT safe for concurrent use.
func NewAntiSemiApply ¶
func NewAntiSemiApply(outer, inner Operator, arg *Argument) *AntiSemiApply
NewAntiSemiApply creates an AntiSemiApply operator.
- outer is the driving (left) plan.
- inner is the correlated (right) sub-plan whose leaf is arg.
- arg is the Argument node seeded with each outer row before inner Init.
type Apply ¶
type Apply struct {
// contains filtered or unexported fields
}
Apply is a Volcano pipeline operator that performs a dependent (correlated) join: for each outer row, it re-runs the inner plan seeded with that row and emits one output row per inner result.
Apply is NOT safe for concurrent use.
func NewApply ¶
NewApply creates an Apply operator.
- outer is the left (driving) plan.
- inner is the right (sub) plan; its leaf must be the provided arg.
- arg is the Argument node at the leaf of inner; Apply seeds it before each inner Init call.
Apply takes ownership of both plans. The caller must not use outer or inner directly after calling NewApply.
func (*Apply) Init ¶
Init initialises the outer plan and stores ctx for subsequent Next calls. The inner plan is initialised lazily on the first outer row.
type Argument ¶
type Argument struct {
// contains filtered or unexported fields
}
Argument is a Volcano leaf operator that emits the single outer Row injected by an Apply driver. It emits exactly one row per Init/Next cycle.
Argument is NOT safe for concurrent use.
func NewArgument ¶
func NewArgument() *Argument
NewArgument creates an Argument operator with no initial row. The Apply driver must call Argument.SetOuterRow before the first Init/Next cycle.
func (*Argument) Close ¶
Close is a no-op; Argument holds no resources beyond the outer row reference.
func (*Argument) Init ¶
Init resets the emitted flag so that the next Next call returns the outer row. It does not require a child operator.
func (*Argument) Next ¶
Next emits the outer row exactly once per Init call. Subsequent calls return (false, nil) until Init is called again.
func (*Argument) SetOuterRow ¶
SetOuterRow injects the current outer row. It must be called by the Apply driver before each Init/Next cycle for the inner plan.
type ConstraintKind ¶
type ConstraintKind uint8
ConstraintKind distinguishes UNIQUE from NOT_NULL constraints.
const (
	// ConstraintUnique requires that at most one node with a given label has a
	// particular value for the constrained property.
	ConstraintUnique ConstraintKind = iota
	// ConstraintNotNull requires that every node with a given label has a
	// non-null value for the constrained property.
	ConstraintNotNull
)
type ConstraintRegistry ¶
type ConstraintRegistry struct {
// contains filtered or unexported fields
}
ConstraintRegistry is a thread-safe registry of active constraints. It stores unique and not-null constraints keyed by "label.prop".
ConstraintRegistry is safe for concurrent use.
func NewConstraintRegistry ¶
func NewConstraintRegistry() *ConstraintRegistry
NewConstraintRegistry creates an empty ConstraintRegistry.
func (*ConstraintRegistry) CheckSetProperty ¶
func (r *ConstraintRegistry) CheckSetProperty(labels []string, prop string, value lpg.PropertyValue, mgr *index.Manager) error
CheckSetProperty validates that setting prop = value on a node with the given labels does not violate any registered constraint. mgr is used for unique-constraint index lookups (hash index Cardinality check) as a secondary source; the primary source is the registry's own value set.
Returns *ConstraintViolationError (which wraps ErrConstraintViolation) on the first violation found; nil when all constraints pass.
func (*ConstraintRegistry) HasNotNull ¶
func (r *ConstraintRegistry) HasNotNull(label, prop string) bool
HasNotNull reports whether a not-null constraint exists for (label, prop).
func (*ConstraintRegistry) ListConstraintRows ¶
func (r *ConstraintRegistry) ListConstraintRows() [][]expr.Value
ListConstraintRows returns a [][]expr.Value where each inner slice has four elements: [name, type, label, property]. The name column uses the canonical "label.prop" key; type is "UNIQUE" or "NOT_NULL". Rows are returned in deterministic lexicographic order.
ListConstraintRows is safe for concurrent use.
func (*ConstraintRegistry) RecordPropertySet ¶
func (r *ConstraintRegistry) RecordPropertySet(labels []string, prop string, value lpg.PropertyValue)
RecordPropertySet records that a property value has been successfully written to a node with the given labels. This keeps the unique value sets up-to-date so that subsequent CheckSetProperty calls detect violations. It is a no-op when none of the given labels has a unique constraint on prop.
func (*ConstraintRegistry) RegisterNotNull ¶
func (r *ConstraintRegistry) RegisterNotNull(label, prop string)
RegisterNotNull adds a not-null constraint for (label, prop).
func (*ConstraintRegistry) RegisterUnique ¶
func (r *ConstraintRegistry) RegisterUnique(label, prop, indexName string)
RegisterUnique adds a unique constraint for (label, prop) backed by indexName in the index.Manager.
func (*ConstraintRegistry) UniqueIndexName ¶
func (r *ConstraintRegistry) UniqueIndexName(label, prop string) (string, bool)
UniqueIndexName returns the backing index name for a unique constraint on (label, prop), or ("", false) if none exists.
func (*ConstraintRegistry) UnregisterNotNull ¶
func (r *ConstraintRegistry) UnregisterNotNull(label, prop string)
UnregisterNotNull removes the not-null constraint for (label, prop). No-op if absent.
func (*ConstraintRegistry) UnregisterUnique ¶
func (r *ConstraintRegistry) UnregisterUnique(label, prop string)
UnregisterUnique removes the unique constraint for (label, prop). No-op if absent.
type ConstraintViolationError ¶
type ConstraintViolationError struct {
// Label is the node label the constraint is defined on.
Label string
// Property is the constrained property key.
Property string
// Kind describes the type of constraint: "UNIQUE" or "NOT NULL".
Kind string
// Detail is an optional human-readable explanation.
Detail string
}
ConstraintViolationError carries structured context about which constraint was violated.
func (*ConstraintViolationError) Error ¶
func (e *ConstraintViolationError) Error() string
Error implements the error interface.
func (*ConstraintViolationError) Unwrap ¶
func (e *ConstraintViolationError) Unwrap() error
Unwrap chains to ErrConstraintViolation so callers can use errors.Is.
type CorrelatedApply ¶
type CorrelatedApply struct {
// contains filtered or unexported fields
}
CorrelatedApply is a Volcano pipeline operator that performs a dependent (correlated) join with the convention that the inner pipeline begins with an Argument leaf re-emitting the outer row. The inner row is forwarded verbatim as the operator's output; no concatenation with the outer row is performed (the outer columns are already present in the inner row).
CorrelatedApply is NOT safe for concurrent use.
func NewCorrelatedApply ¶
func NewCorrelatedApply(outer, inner Operator, arg *Argument) *CorrelatedApply
NewCorrelatedApply creates a CorrelatedApply operator.
- outer is the left (driving) plan.
- inner is the right (sub) plan whose leftmost leaf is the provided arg.
- arg is the Argument node at the inner leaf; CorrelatedApply seeds it before each inner Init call so that the inner pipeline observes the current outer row.
CorrelatedApply takes ownership of both plans. The caller must not use outer or inner directly after calling NewCorrelatedApply.
func (*CorrelatedApply) Close ¶
func (op *CorrelatedApply) Close() error
Close releases resources and closes both the outer and inner plans.
func (*CorrelatedApply) Init ¶
func (op *CorrelatedApply) Init(ctx context.Context) error
Init initialises the outer plan and stores ctx for subsequent Next calls. The inner plan is initialised lazily on the first outer row.
func (*CorrelatedApply) Next ¶
func (op *CorrelatedApply) Next(out *Row) (bool, error)
Next advances the CorrelatedApply operator. The inner row is forwarded verbatim; CorrelatedApply does not concatenate the outer row because the inner row already carries the outer columns (the inner pipeline's leaf Argument re-emitted them).
type CreateConstraintOp ¶
type CreateConstraintOp struct {
// contains filtered or unexported fields
}
CreateConstraintOp is a Volcano DDL operator that registers a constraint.
CreateConstraintOp is NOT safe for concurrent use.
func NewCreateConstraintOp ¶
func NewCreateConstraintOp( name, label, prop string, kind ConstraintKind, ifNotExists bool, mgr *index.Manager, reg *ConstraintRegistry, onSchemaChange func(), ) *CreateConstraintOp
NewCreateConstraintOp creates a CreateConstraintOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully registers a new constraint — i.e. NOT when the IF NOT EXISTS branch silently absorbs an already-registered constraint. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.
func (*CreateConstraintOp) Close ¶
func (op *CreateConstraintOp) Close() error
Close implements Operator.
type CreateIndexOp ¶
type CreateIndexOp struct {
// contains filtered or unexported fields
}
CreateIndexOp is a Volcano DDL operator that registers a new secondary index.
CreateIndexOp is NOT safe for concurrent use.
func NewCreateIndexOp ¶
func NewCreateIndexOp( name string, kind IndexKindExec, ifNotExists bool, mgr *index.Manager, onSchemaChange func(), ) *CreateIndexOp
NewCreateIndexOp creates a CreateIndexOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully creates a new index in mgr — i.e. NOT when the IF NOT EXISTS branch silently absorbs a duplicate. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.
type CreateNode ¶
type CreateNode struct {
// contains filtered or unexported fields
}
CreateNode creates a new graph node per input row, sets its labels and properties, and appends the new NodeID as a new column.
CreateNode is NOT safe for concurrent use.
func NewCreateNode ¶
func NewCreateNode( nodeVar string, labels []string, properties string, child Operator, mutator GraphMutator, ) (*CreateNode, error)
NewCreateNode creates a CreateNode operator.
nodeVar is the variable name bound to the new node (may be empty if the node is not referenced downstream). labels is the ordered list of labels to attach. properties is the opaque literal property-map string (e.g. `{name: "Alice"}`) produced by the IR translator; it is parsed once during construction. mutator is the graph write surface.
func (*CreateNode) Init ¶
func (op *CreateNode) Init(ctx context.Context) error
Init initialises the operator and its child.
The first CreateNode.Init in the process also seeds [globalNodeCounter] past the largest synthetic key currently interned in op.mutator, so that node keys generated in this process cannot collide with keys persisted by an earlier process and replayed during WAL / snapshot recovery. The seed is gated by [globalNodeCounterSeededOnce] so the scan runs at most once per process regardless of how many CreateNode operators are created.
func (*CreateNode) Next ¶
func (op *CreateNode) Next(out *Row) (bool, error)
Next pulls one row from the child, creates a node, and appends the NodeID column. Returns (true, nil) when a row was produced, (false, nil) at end-of-stream, (false, err) on error.
func (*CreateNode) WithConstraints ¶
func (op *CreateNode) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *CreateNode
WithConstraints attaches a ConstraintRegistry and index.Manager to the operator for pre-write enforcement. Both must be non-nil. Returns op for chaining.
func (*CreateNode) WithParams ¶
func (op *CreateNode) WithParams(params map[string]expr.Value) (*CreateNode, error)
WithParams attaches query parameters for $name substitution in property expressions. Re-parses the property map with the supplied params. Returns op for chaining.
func (*CreateNode) WithPropsEvalFn ¶
func (op *CreateNode) WithPropsEvalFn(fn PropsEvalFn) *CreateNode
WithPropsEvalFn attaches a per-row property evaluator. When fn is non-nil it is called on every Next invocation and its results are merged with the statically parsed props (literal values). Dynamic results take precedence over same-keyed literal values, allowing the property map to contain a mix of literals and expression-valued entries.
type CreateRelationship ¶
type CreateRelationship struct {
// contains filtered or unexported fields
}
CreateRelationship creates a new directed edge per input row between two already-bound nodes.
CreateRelationship is NOT safe for concurrent use.
func NewCreateRelationship ¶
func NewCreateRelationship( startVar, endVar, relVar, relType, properties string, schema map[string]int, child Operator, mutator GraphMutator, ) (*CreateRelationship, error)
NewCreateRelationship creates a CreateRelationship operator.
startVar and endVar are the variable names of the source and destination nodes; their column indices are looked up in schema. relVar is the variable name bound to the new relationship (may be empty). relType is the relationship type label. properties is the opaque literal property-map string. schema maps currently bound variable names to their column indices.
func (*CreateRelationship) Close ¶
func (op *CreateRelationship) Close() error
Close closes the child operator.
func (*CreateRelationship) Init ¶
func (op *CreateRelationship) Init(ctx context.Context) error
Init initialises the operator and its child.
func (*CreateRelationship) Next ¶
func (op *CreateRelationship) Next(out *Row) (bool, error)
Next pulls one row from the child, resolves the endpoint NodeIDs, creates the edge, and optionally appends a RelationshipValue column.
func (*CreateRelationship) WithParams ¶
func (op *CreateRelationship) WithParams(params map[string]expr.Value) (*CreateRelationship, error)
WithParams re-parses the property map with the supplied query parameters for $name substitution. Returns op for chaining.
func (*CreateRelationship) WithPropsEvalFn ¶
func (op *CreateRelationship) WithPropsEvalFn(fn PropsEvalFn) *CreateRelationship
WithPropsEvalFn attaches a per-row property evaluator. See CreateNode.WithPropsEvalFn.
type DeleteNode ¶
type DeleteNode struct {
// contains filtered or unexported fields
}
DeleteNode deletes an already-bound node (labels + properties stripped) from the graph, provided it has no incident relationships.
DeleteNode is NOT safe for concurrent use.
func NewDeleteNode ¶
func NewDeleteNode( nodeVar string, schema map[string]int, child Operator, mutator GraphMutator, ) *DeleteNode
NewDeleteNode creates a DeleteNode operator.
func (*DeleteNode) Init ¶
func (op *DeleteNode) Init(ctx context.Context) error
Init initialises the operator and its child.
func (*DeleteNode) Next ¶
func (op *DeleteNode) Next(out *Row) (bool, error)
Next pulls one row from the child and deletes the bound node.
func (*DeleteNode) WithRelEndpoints ¶
func (op *DeleteNode) WithRelEndpoints(fn RelEndpointFn) *DeleteNode
WithRelEndpoints attaches a per-row lookup that returns the (srcID, dstID) endpoints of the edge identified by the bare-variable target. When set AND the schema-direct slot holds an IntegerValue (the in-pipeline edge-id encoding emitted by Expand), the operator dispatches to the edge-removal path instead of treating the integer as a NodeID.
func (*DeleteNode) WithTargetEvalFn ¶
func (op *DeleteNode) WithTargetEvalFn(fn TargetEvalFn) *DeleteNode
WithTargetEvalFn attaches a per-row evaluator for non-variable DELETE targets (subscripts, property access, …). When set, the operator resolves the target value via the evaluator instead of the schema lookup keyed by nodeVar.
type DeleteRelationship ¶
type DeleteRelationship struct {
// contains filtered or unexported fields
}
DeleteRelationship removes a directed edge per input row.
DeleteRelationship is NOT safe for concurrent use.
func NewDeleteRelationship ¶
func NewDeleteRelationship( relVar string, schema map[string]int, child Operator, mutator GraphMutator, ) *DeleteRelationship
NewDeleteRelationship creates a DeleteRelationship operator.
func (*DeleteRelationship) Close ¶
func (op *DeleteRelationship) Close() error
Close closes the child operator.
type DetachDelete ¶
type DetachDelete struct {
// contains filtered or unexported fields
}
DetachDelete removes all incident edges from a node and then strips the node's labels and properties.
DetachDelete is NOT safe for concurrent use.
func NewDetachDelete ¶
func NewDetachDelete( nodeVar string, schema map[string]int, child Operator, mutator GraphMutator, ) *DetachDelete
NewDetachDelete creates a DetachDelete operator.
func (*DetachDelete) Init ¶
func (op *DetachDelete) Init(ctx context.Context) error
Init initialises the operator and its child.
func (*DetachDelete) Next ¶
func (op *DetachDelete) Next(out *Row) (bool, error)
Next pulls one row from the child, removes all incident edges of the bound node, then strips the node's labels and properties.
func (*DetachDelete) WithTargetEvalFn ¶
func (op *DetachDelete) WithTargetEvalFn(fn TargetEvalFn) *DetachDelete
WithTargetEvalFn attaches a per-row evaluator for non-variable DETACH DELETE targets (subscripts, property access, …).
type Distinct ¶
type Distinct struct {
// contains filtered or unexported fields
}
Distinct is a streaming Volcano operator that emits each unique row exactly once. It maintains a hash set of seen rows; hash collisions are resolved by full equality checks.
Distinct is NOT safe for concurrent use.
func NewDistinct ¶
NewDistinct creates a Distinct operator.
- child: the upstream operator.
- maxDistinct: upper bound on distinct rows; pass 0 to use DefaultMaxDistinct.
type DropConstraintOp ¶
type DropConstraintOp struct {
// contains filtered or unexported fields
}
DropConstraintOp is a Volcano DDL operator that deregisters a constraint.
DropConstraintOp is NOT safe for concurrent use.
func NewDropConstraintOp ¶
func NewDropConstraintOp( name, label, prop string, kind ConstraintKind, ifExists bool, mgr *index.Manager, reg *ConstraintRegistry, onSchemaChange func(), ) *DropConstraintOp
NewDropConstraintOp creates a DropConstraintOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully removes a constraint — i.e. NOT when the IF EXISTS branch silently absorbs an absent-constraint condition. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.
func (*DropConstraintOp) Close ¶
func (op *DropConstraintOp) Close() error
Close implements Operator.
type DropIndexOp ¶
type DropIndexOp struct {
// contains filtered or unexported fields
}
DropIndexOp is a Volcano DDL operator that deregisters a secondary index.
DropIndexOp is NOT safe for concurrent use.
func NewDropIndexOp ¶
func NewDropIndexOp( name string, ifExists bool, mgr *index.Manager, onSchemaChange func(), ) *DropIndexOp
NewDropIndexOp creates a DropIndexOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully drops the index — i.e. NOT when the IF EXISTS branch silently absorbs a missing-index error. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.
type Eager ¶
type Eager struct {
// contains filtered or unexported fields
}
Eager is a pipeline-breaking barrier. Init drains every row from the child into an internal buffer; Next then re-emits the buffered rows in insertion order.
type EagerAggregation ¶
type EagerAggregation struct {
// contains filtered or unexported fields
}
EagerAggregation is a blocking (pipeline-breaking) Volcano operator that groups rows from its child by the specified key columns and applies per-group aggregators. It emits one output row per group once the child is exhausted.
EagerAggregation is NOT safe for concurrent use.
func NewEagerAggregation ¶
func NewEagerAggregation( child Operator, keyCols []int, aggFactories []funcs.AggregatorFactory, maxGroups int, ) (*EagerAggregation, error)
NewEagerAggregation creates an EagerAggregation operator.
- child: the upstream operator to consume.
- keyCols: column indices whose values define the group key. An empty slice computes a single global aggregate.
- aggFactories: one AggregatorFactory per aggregate expression. Must not be empty.
- maxGroups: upper bound on distinct groups; pass 0 to use DefaultMaxGroups.
func (*EagerAggregation) Close ¶
func (op *EagerAggregation) Close() error
Close closes the child operator and releases internal state.
type Expand ¶
type Expand struct {
// contains filtered or unexported fields
}
Expand is a Volcano pipeline operator that, for each input row, expands one hop along the graph's CSR adjacency.
Expand is NOT safe for concurrent use.
func NewExpand ¶
func NewExpand(input Operator, fwd, rev csrAdjacency, cfg ExpandConfig) *Expand
NewExpand creates an Expand operator. fwd is the forward CSR; rev is the reverse CSR (required for DirIn/DirBoth, ignored for DirOut).
func NewExpandWithOptions ¶
func NewExpandWithOptions(input Operator, fwd, rev csrAdjacency, cfg ExpandConfig, options ...expandOption) *Expand
NewExpandWithOptions creates an Expand operator with optional extra configuration applied via functional options (e.g. WithCyphermorphism).
All ExpandConfig fields behave exactly as they do in NewExpand. Options are applied after base construction and augment the operator's behaviour without altering its public type.
type ExpandConfig ¶
type ExpandConfig struct {
// Direction to follow. Defaults to DirOut when zero.
Direction Direction
// EdgeType, when non-empty, restricts emitted edges to those whose
// positional index is present in EdgeTypeFilter with this type label.
EdgeType string
// EdgeTypeFilter maps absolute edge positions to type labels. Required
// when EdgeType is non-empty.
EdgeTypeFilter map[uint64]string
// InputCol is the column index in each input row that holds the source
// NodeID (as expr.IntegerValue). Defaults to 0.
InputCol int
// RelCols lists the input-row columns holding edge IDs already traversed
// by sibling Expand operators in the same MATCH pattern. Each emitted
// edge must NOT match any of these columns (openCypher 9 §3.2.2
// relationship-isomorphism / cyphermorphism). Empty disables the
// check.
RelCols []int
// MultiplicityFn returns the Cypher CREATE-call multiplicity recorded
// for the directed edge (srcID, dstID). When the returned count N is
// greater than 1, the operator emits the corresponding output row N
// times consecutively, reflecting the openCypher rule that
// `MATCH ()-[r]->()` enumerates each CREATE call separately even when
// the underlying simple-graph storage collapsed them to one entry
// (Merge5 [21]). A nil fn, or one that returns 0 or 1, disables the
// multiplicity emit and behaves like a plain single-row Expand.
MultiplicityFn func(srcID, dstID uint64) int64
}
ExpandConfig carries the optional configuration for NewExpand.
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter is a Volcano pipeline operator that applies a FilterFn predicate to each row produced by its child operator. It emits only rows for which the predicate returns expr.BoolValue(true).
Null and false both suppress the row (three-valued logic).
Filter is NOT safe for concurrent use.
func NewFilter ¶
NewFilter creates a Filter operator that wraps child and applies predFn to every row.
type FilterFn ¶
FilterFn is a predicate over a Row. It must return (BoolValue(true), nil) to accept the row. Any other non-error return value (including NULL and BoolValue(false)) causes the row to be dropped. An error halts the pipeline.
type GlobalAggregateAdapter ¶
type GlobalAggregateAdapter struct {
// contains filtered or unexported fields
}
GlobalAggregateAdapter wraps an EagerAggregation operator that has no group-by keys and ensures the output stream contains exactly one row even when the child is empty.
GlobalAggregateAdapter is NOT safe for concurrent use.
func NewGlobalAggregateAdapter ¶
func NewGlobalAggregateAdapter(child Operator, aggFactories []funcs.AggregatorFactory) *GlobalAggregateAdapter
NewGlobalAggregateAdapter returns a GlobalAggregateAdapter that wraps child. aggFactories must contain one factory per aggregate column, in the same order as the columns produced by child. The factories are invoked only when the child produces no rows, and then exactly once, to synthesise the neutral row.
Note: the adapter does not validate that child is a group-by-less EagerAggregation; callers are responsible for that. Wrapping a grouped aggregation has no effect because grouped aggregations always emit either zero (no input) or N rows (one per group), and the empty-input case for a grouped aggregation is correct as is.
func (*GlobalAggregateAdapter) Close ¶
func (op *GlobalAggregateAdapter) Close() error
Close releases the adapter's child.
func (*GlobalAggregateAdapter) Init ¶
func (op *GlobalAggregateAdapter) Init(ctx context.Context) error
Init initialises the adapter and its child.
func (*GlobalAggregateAdapter) Next ¶
func (op *GlobalAggregateAdapter) Next(out *Row) (bool, error)
Next forwards child rows unchanged. When the child reports end-of-stream without ever producing a row, Next emits exactly one synthetic row built from the neutral results of each aggregate factory before signalling exhaustion itself.
type GraphMutator ¶
type GraphMutator interface {
// AddNode interns n and returns its stable NodeID. Returns the
// error from the underlying graph implementation (currently only
// [adjlist.ErrShardFull] is reachable, and only when the
// underlying [adjlist.Config.MaxShardCapacity] is set).
AddNode(n string) (graph.NodeID, error)
// AddEdge inserts a directed edge (src→dst) with weight w, interning
// endpoints as needed. Returns the stable NodeIDs of src and dst and
// any error from the underlying graph implementation.
AddEdge(src, dst string, w float64) (srcID, dstID graph.NodeID, err error)
// AddEdgeH is [GraphMutator.AddEdge] with a stable per-edge handle
// allocated by the underlying graph and stamped onto the adjacency
// slot. The returned handle keys the *ByHandle metadata setters below
// so a parallel CREATE's type and properties are resolvable on the
// read path by an identity that survives sibling-edge deletion. Used
// by CreateRelationship; MERGE keeps using AddEdge (its read path
// falls back to the per-pair union). The handle is always non-zero on
// success.
AddEdgeH(src, dst string, w float64) (srcID, dstID graph.NodeID, handle uint64, err error)
// RemoveEdge removes the directed edge from src to dst (no-op if absent).
RemoveEdge(src, dst string)
// SetNodeLabel attaches label to n (inserting n if absent). Returns
// any error from the underlying [adjlist.AdjList.AddNode] (see
// [GraphMutator.AddNode]).
SetNodeLabel(n, label string) error
// RemoveNodeLabel detaches label from n (no-op if absent).
RemoveNodeLabel(n, label string)
// RemoveNode tombstones n in the underlying graph so subsequent reads
// (AllNodesScan, count(*), Order) treat the node as absent. Callers
// should strip labels/properties/incident edges before invoking
// RemoveNode so the tombstone reflects the fully-deleted state.
RemoveNode(n string)
// IsTombstoned reports whether the NodeID has been tombstoned. Used by
// AllNodesScan to skip phantom entries the Mapper still indexes.
IsTombstoned(id graph.NodeID) bool
// SetNodeProperty sets the named property on n. Returns any error
// from the underlying [adjlist.AdjList.AddNode] (see
// [GraphMutator.AddNode]).
SetNodeProperty(n, key string, value lpg.PropertyValue) error
// DelNodeProperty removes the named property from n (no-op if absent).
DelNodeProperty(n, key string)
// NodeProperties returns a snapshot of all properties currently on n.
NodeProperties(n string) map[string]lpg.PropertyValue
// NodeLabels returns a snapshot of all labels currently on n in
// unspecified order.
NodeLabels(n string) []string
// HasEdge reports whether a directed edge from src to dst is present.
HasEdge(src, dst string) bool
// SetEdgeLabel attaches label to the directed edge (src, dst).
SetEdgeLabel(src, dst, label string)
// SetEdgeProperty sets the named property on the directed edge (src, dst).
// Returns any error from the underlying graph (e.g. schema violation).
SetEdgeProperty(src, dst, key string, value lpg.PropertyValue) error
// DelEdgeProperty removes the named property from the directed edge
// (src, dst) (no-op if absent).
DelEdgeProperty(src, dst, key string)
// EdgeProperties returns a snapshot of every property currently set on
// the directed edge (src, dst). Returns an empty map when the edge has
// no properties or does not exist.
EdgeProperties(src, dst string) map[string]lpg.PropertyValue
// EdgeLabels returns a snapshot of every label currently attached to
// the directed edge (src, dst). Returns an empty slice when the edge
// has no labels or does not exist. Used by DELETE r to capture the
// relationship type before tombstoning the edge so the row's
// post-delete RelationshipValue keeps `RETURN type(r)` working.
EdgeLabels(src, dst string) []string
// IncEdgeCreateCount bumps the Cypher CREATE-call multiplicity
// counter for the directed edge (src, dst) by one and returns the
// new (1-based) value. The counter records how many CREATE
// statements have targeted the same endpoint pair regardless of
// whether the underlying storage already had an entry — MERGE
// consults it to emit multiplicity rows when an existing edge
// satisfies the merge pattern (Merge5 [3]). The returned index is
// the per-instance idx callers pass to the *At family of
// metadata-write helpers.
IncEdgeCreateCount(src, dst string) int64
// EdgeCreateCount returns the current CREATE-call multiplicity
// counter for the directed edge (src, dst), or 0 when no CREATE
// has been recorded.
EdgeCreateCount(src, dst string) int64
// DecEdgeCreateCount decrements the CREATE-call multiplicity
// counter (floor 0). Called by DELETE so subsequent MERGEs see
// the correct multiplicity.
DecEdgeCreateCount(src, dst string)
// SetEdgeLabelAt attaches `label` to the directed edge instance
// (src, dst) at the supplied 1-based CREATE index. Used by
// CreateRelationship so parallel CREATEs of the same endpoint
// pair retain their distinct labels (Match2 [6] / Match7 [29]).
SetEdgeLabelAt(src, dst string, idx int64, label string)
// EdgeLabelsAt returns the labels recorded at instance `idx` of
// the directed edge (src, dst), or nil when the instance has no
// per-CREATE labels.
EdgeLabelsAt(src, dst string, idx int64) []string
// SetEdgePropertyAt records `key`=`value` on the directed edge
// instance (src, dst) at the supplied 1-based CREATE index.
SetEdgePropertyAt(src, dst string, idx int64, key string, value lpg.PropertyValue)
// EdgePropertiesAt returns the property map recorded at instance
// `idx` of the directed edge (src, dst), or nil when no
// per-CREATE map was captured.
EdgePropertiesAt(src, dst string, idx int64) map[string]lpg.PropertyValue
// RemoveEdgeInstance drops every per-CREATE label and property
// associated with (src, dst) at `idx`. Used by DELETE to discard
// a specific logical edge while leaving sibling instances
// untouched.
RemoveEdgeInstance(src, dst string, idx int64)
// SetEdgeLabelByHandle attaches `label` to the edge identified by the
// stable `handle` on the (src, dst) pair (see [GraphMutator.AddEdgeH]).
// The handle-keyed analogue of SetEdgeLabelAt; the read path resolves a
// parallel CREATE's type by this identity instead of a positional CSR
// index. No-op when handle is 0.
SetEdgeLabelByHandle(src, dst string, handle uint64, label string)
// EdgeLabelsByHandle returns the labels recorded for the edge
// identified by `handle` on the (src, dst) pair, or nil when none.
EdgeLabelsByHandle(src, dst string, handle uint64) []string
// SetEdgePropertyByHandle records `key`=`value` on the edge identified
// by the stable `handle` on the (src, dst) pair. No-op when handle is 0.
SetEdgePropertyByHandle(src, dst string, handle uint64, key string, value lpg.PropertyValue)
// EdgePropertiesByHandle returns the property map recorded for the edge
// identified by `handle` on the (src, dst) pair, or nil when none.
EdgePropertiesByHandle(src, dst string, handle uint64) map[string]lpg.PropertyValue
// RemoveEdgeInstanceByHandle drops every per-handle label and property
// associated with (src, dst) at `handle`. Used by DELETE to discard a
// specific logical edge while leaving sibling handles untouched.
RemoveEdgeInstanceByHandle(src, dst string, handle uint64)
// OutNeighbours returns the outgoing neighbour node keys of n as a
// snapshot slice. Callers must not mutate the returned slice.
OutNeighbours(n string) []string
// InNeighbours returns the incoming neighbour node keys of n as a
// snapshot slice. This requires a full graph walk for directed adjacency
// lists that do not maintain a reverse index. Callers must not mutate the
// returned slice.
InNeighbours(n string) []string
// OutDegree returns the number of outgoing edges from n.
OutDegree(n string) int
// ResolveNodeID translates a user-facing node key to its internal NodeID,
// returning ok=false when the node has not been interned yet.
ResolveNodeID(n string) (graph.NodeID, bool)
// ResolveNodeLabel translates an internal NodeID back to the user-facing
// node key, returning ok=false when id is unknown.
ResolveNodeLabel(id graph.NodeID) (string, bool)
// WalkNodeIDs calls fn for every node currently interned in the graph.
WalkNodeIDs(fn func(graph.NodeID) bool)
}
GraphMutator is the write surface exposed to Cypher write operators.
Methods identify nodes by the user-facing node key (string) used by the lpg.Graph[string,float64] instantiation, except IsTombstoned, ResolveNodeLabel, and WalkNodeIDs, which operate on internal NodeIDs. The graph is responsible for interning the key and returning the stable internal NodeID where applicable.
GraphMutator is NOT safe for concurrent use from multiple goroutines; each physical operator tree owns exactly one instance.
type IndexBuffer ¶
type IndexBuffer struct {
// contains filtered or unexported fields
}
IndexBuffer collects index.Change events produced by write operators during a single write transaction.
Call Enqueue for every graph mutation. At the transaction boundary:
- Commit: fans changes to index.Manager.ApplyBatch then resets.
- Rollback: discards changes without touching indexes.
IndexBuffer is NOT safe for concurrent use.
func (*IndexBuffer) Commit ¶
func (b *IndexBuffer) Commit(mgr *index.Manager)
Commit applies all buffered changes to mgr via ApplyBatch, then resets the buffer. A nil mgr is safe: changes are discarded without panicking.
func (*IndexBuffer) Enqueue ¶
func (b *IndexBuffer) Enqueue(c index.Change)
Enqueue appends c to the buffer.
func (*IndexBuffer) Len ¶
func (b *IndexBuffer) Len() int
Len returns the number of changes currently buffered.
func (*IndexBuffer) Rollback ¶
func (b *IndexBuffer) Rollback()
Rollback discards all buffered changes without applying them.
type IndexKindExec ¶
type IndexKindExec uint8
IndexKindExec distinguishes hash vs. btree in the exec layer.
const (
// ExecIndexHash creates a hash.Index[string].
ExecIndexHash IndexKindExec = iota
// ExecIndexBTree creates a btree.Index[string].
ExecIndexBTree
)
type Int64HashIndex ¶
type Int64HashIndex struct {
// contains filtered or unexported fields
}
Int64HashIndex adapts hash.Index[int64] to the [hashLookup] interface. It accepts only expr.IntegerValue seek keys.
func NewInt64HashIndex ¶
func NewInt64HashIndex(idx interface {
Lookup(value int64) *roaring64.Bitmap
}) *Int64HashIndex
NewInt64HashIndex constructs an Int64HashIndex.
func (*Int64HashIndex) LookupBitmap ¶
LookupBitmap implements [hashLookup].
type Int64RangeIndex ¶
type Int64RangeIndex struct {
// contains filtered or unexported fields
}
Int64RangeIndex adapts btree.Index[int64] to the [rangeLookup] interface. Nil bounds are treated as ±∞ using math.MinInt64 / math.MaxInt64.
func NewInt64RangeIndex ¶
func NewInt64RangeIndex(idx interface {
Range(lo, hi int64) *roaring64.Bitmap
}) *Int64RangeIndex
NewInt64RangeIndex constructs an Int64RangeIndex.
func (*Int64RangeIndex) RangeBitmap ¶
func (r *Int64RangeIndex) RangeBitmap(lo, hi expr.Value) *roaring64.Bitmap
RangeBitmap implements [rangeLookup].
type LPGLabelSource ¶
type LPGLabelSource struct {
// contains filtered or unexported fields
}
LPGLabelSource is a concrete [labelResolver] built from an lpg label registry and a label index. It is the standard adapter for tests and production use.
func NewLPGLabelSource ¶
NewLPGLabelSource constructs an LPGLabelSource. lookupFn should wrap lpg.LabelRegistry.Lookup, casting LabelID to uint32.
func (*LPGLabelSource) ResolveLabelBitmap ¶
func (s *LPGLabelSource) ResolveLabelBitmap(name string) *roaring64.Bitmap
ResolveLabelBitmap implements [labelResolver].
type Limit ¶
type Limit struct {
// contains filtered or unexported fields
}
Limit is a Volcano pipeline operator that forwards at most n rows from its child operator and then signals end-of-stream.
Limit is NOT safe for concurrent use.
func NewLimit ¶
NewLimit creates a Limit operator that passes at most n rows from child. n must be ≥ 0; a limit of 0 emits no rows.
type Merge ¶
type Merge struct {
// contains filtered or unexported fields
}
Merge implements MERGE semantics: match-or-create a pattern.
Merge is NOT safe for concurrent use.
func NewMerge ¶
func NewMerge( nodeVar string, labels []string, properties string, onCreateStrs, onMatchStrs []string, searchFn MergeSearchFn, schema map[string]int, child Operator, mutator GraphMutator, ) (*Merge, error)
NewMerge creates a Merge operator.
nodeVar is the variable bound to the merged node. labels and properties are the node-pattern components used when creating a new node. onCreateStrs and onMatchStrs are opaque SET-item strings from the IR translator. searchFn executes the read side of the match. schema maps variable names to column indices. mutator is the graph write surface.
func (*Merge) Init ¶
Init initialises the operator: executes the search plan, then dispatches to the ON MATCH or ON CREATE branch depending on whether the search returned any rows.
The first Merge.Init (or CreateNode.Init) in the process also seeds [globalNodeCounter] past the largest synthetic key already interned in op.mutator, so that the keys minted by [Merge.freshNodeKey] in this process cannot collide with __cx_merge_<hex> keys persisted by an earlier process and replayed during WAL / snapshot recovery. Without this seeding, a one-process-per-command consumer would mint __cx_merge_1 on every command and the second MERGE would silently overwrite the first node. The seed is gated by [globalNodeCounterSeededOnce] so the O(N) scan runs at most once per process regardless of how many CreateNode / Merge operators are built.
func (*Merge) Next ¶
Next emits one row: either a matched row (ON MATCH) or the created row (ON CREATE), each emitted exactly once.
func (*Merge) WithConstraints ¶
func (op *Merge) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *Merge
WithConstraints attaches a ConstraintRegistry and index.Manager for pre-write enforcement in ON CREATE and ON MATCH actions. Both must be non-nil. Returns op for chaining.
func (*Merge) WithParams ¶
WithParams re-parses the property map with the supplied query parameters for $name substitution. Returns op for chaining.
func (*Merge) WithPropsEvalFn ¶
func (op *Merge) WithPropsEvalFn(fn PropsEvalFn) *Merge
WithPropsEvalFn attaches a per-row property evaluator. When fn is non-nil the operator re-evaluates the MERGE node-pattern property map against each driving row and uses the merged (literal ∪ dynamic) property set both as the search predicate and as the ON CREATE node-property writes. Required for MERGE patterns whose inline property map contains variable references such as `MERGE (p:Person {login: prop.login})` after an UNWIND.
Returns op for chaining.
type MergeRelAction ¶
type MergeRelAction struct {
// contains filtered or unexported fields
}
MergeRelAction is a pre-parsed `SET <relVar>.<key> = <value>` item.
func MergeRelActionFromKV ¶
func MergeRelActionFromKV(key, value string) MergeRelAction
MergeRelActionFromKV constructs a MergeRelationship ON CREATE / ON MATCH action from a (key, value) pair. value is the opaque literal string as it appears in the source query (e.g. `'foo'` or `42`).
type MergeRelationship ¶
type MergeRelationship struct {
// contains filtered or unexported fields
}
MergeRelationship matches-or-creates a single-hop directed relationship between two already-bound endpoint columns. ON CREATE / ON MATCH actions targeting the relationship variable are applied to the matched-or-created edge.
MergeRelationship is NOT safe for concurrent use.
func NewMergeRelationship ¶
func NewMergeRelationship(child Operator, srcCol, dstCol int, relType string, mutator GraphMutator) *MergeRelationship
NewMergeRelationship constructs a MergeRelationship operator.
- child is the upstream plan providing rows with the bound endpoints.
- srcCol / dstCol are the column indices that hold the src / dst NodeID.
- relType is the relationship type label (single label only).
- mutator is the graph write surface.
func (*MergeRelationship) Close ¶
func (op *MergeRelationship) Close() error
Close closes the child operator.
func (*MergeRelationship) Init ¶
func (op *MergeRelationship) Init(ctx context.Context) error
Init initialises the operator and its child.
func (*MergeRelationship) Next ¶
func (op *MergeRelationship) Next(out *Row) (bool, error)
Next emits the next input row, ensuring that the (src)-[:relType]->(dst) edge exists in the graph (either pre-existing or newly created). When an existing edge has CREATE-multiplicity N > 1 the operator emits N rows for the same upstream tuple (Merge5 [3]).
func (*MergeRelationship) WithOnCreate ¶
func (op *MergeRelationship) WithOnCreate(relVar string, actions []MergeRelAction) *MergeRelationship
WithOnCreate registers ON CREATE SET actions to apply when the edge is newly created. Each action is `<relVar>.<key> = <value>`; the caller has already verified that every action targets the relationship variable bound by this operator.
func (*MergeRelationship) WithOnMatch ¶
func (op *MergeRelationship) WithOnMatch(relVar string, actions []MergeRelAction) *MergeRelationship
WithOnMatch registers ON MATCH SET actions to apply when the edge already exists.
func (*MergeRelationship) WithRelColumn ¶
func (op *MergeRelationship) WithRelColumn(relCol int) *MergeRelationship
WithRelColumn registers the output-row column index that will carry the matched / created edge ID. When set (relCol >= 0), MergeRelationship extends the row with an IntegerValue(edgeID) at that column so downstream operators (RETURN r, count(r), …) see the bound relationship.
func (*MergeRelationship) WithRelProperties ¶
func (op *MergeRelationship) WithRelProperties(propsRaw string) *MergeRelationship
WithRelProperties registers an inline relationship property predicate (e.g. `{name: 'r2'}` from `MERGE (a)-[r:T {name: 'r2'}]->(b)`). When set, the operator filters the existing-edge search by the predicate AND writes the listed properties when a new edge is created. Pass an empty string to clear.
func (*MergeRelationship) WithSchema ¶
func (op *MergeRelationship) WithSchema(schema map[string]int) *MergeRelationship
WithSchema attaches the upstream variable-to-column mapping so entity-copy actions (`SET r = a`) can resolve the source variable from the row at write time.
func (*MergeRelationship) WithUndirected ¶
func (op *MergeRelationship) WithUndirected(u bool) *MergeRelationship
WithUndirected toggles the undirected-search behaviour. When true, the match phase probes both (src, dst) and (dst, src) directions before falling through to the edge-create path, matching the openCypher semantics of `MERGE (a)-[r:T]-(b)` (Merge5 [13]).
type MergeSearchFn ¶
MergeSearchFn is a function that executes the search sub-plan for the MERGE pattern and returns the matching rows. An empty slice means no match.
func NewMergeSearchFnFromPattern ¶
func NewMergeSearchFnFromPattern( labels []string, propertiesRaw string, params map[string]expr.Value, mutator GraphMutator, ) (MergeSearchFn, error)
NewMergeSearchFnFromPattern returns a MergeSearchFn that finds every node in mutator whose label set contains every label in labels and whose property bag contains every (key, value) pair parsed from propertiesRaw.
labels is the slice of pattern labels (may be empty when the pattern is e.g. `(n {key: v})`). propertiesRaw is the opaque literal-map string surfaced by the IR (e.g. `{name: "Alice", age: 30}`); it may be empty. params binds `$name` references in propertiesRaw to query parameters; when params is empty, the parser performs no parameter substitution.
The function returned by NewMergeSearchFnFromPattern walks every interned node id, resolves the label and property bag, and admits the node iff every label and every property matches. Match scaling is O(N) where N is the number of interned nodes; the cost is acceptable for the typical MERGE workload (small N or label-restricted pattern). A future revision may use [labelResolver]'s bitmap intersection to short-circuit label scans.
type NodeByIndexRangeScan ¶
type NodeByIndexRangeScan struct {
// contains filtered or unexported fields
}
NodeByIndexRangeScan is a Volcano leaf operator that scans a B+tree index over a half-open, closed, or open interval. Each Row has a single column: expr.IntegerValue(nodeID).
NodeByIndexRangeScan is NOT safe for concurrent use.
func NewNodeByIndexRangeScan ¶
func NewNodeByIndexRangeScan(idx rangeLookup, lo, hi RangeBound) *NodeByIndexRangeScan
NewNodeByIndexRangeScan creates a NodeByIndexRangeScan.
func (*NodeByIndexRangeScan) Close ¶
func (op *NodeByIndexRangeScan) Close() error
Close releases resources.
type NodeByIndexSeek ¶
type NodeByIndexSeek struct {
// contains filtered or unexported fields
}
NodeByIndexSeek is a Volcano leaf operator that performs an equality lookup on a property hash index. Each Row has a single column: expr.IntegerValue(nodeID).
NodeByIndexSeek is NOT safe for concurrent use.
func NewNodeByIndexSeek ¶
func NewNodeByIndexSeek(idx hashLookup, seekValue expr.Value) *NodeByIndexSeek
NewNodeByIndexSeek creates a NodeByIndexSeek that looks up seekValue in idx.
type NodeByLabelScan ¶
type NodeByLabelScan struct {
// contains filtered or unexported fields
}
NodeByLabelScan is a Volcano leaf operator that emits one Row per NodeID carrying the named label. Each Row has a single column: expr.IntegerValue(nodeID).
NodeByLabelScan is NOT safe for concurrent use.
func NewNodeByLabelScan ¶
func NewNodeByLabelScan(labelName string, src labelResolver) *NodeByLabelScan
NewNodeByLabelScan creates a NodeByLabelScan for the given label.
func (*NodeByLabelScan) Close ¶
func (op *NodeByLabelScan) Close() error
Close releases resources held by the operator.
type Operator ¶
type Operator interface {
// Init initialises the operator and its children. ctx is stored for later
// use in Next; implementations must not begin producing rows in Init.
Init(ctx context.Context) error
// Next advances the operator by one row, writing the result into out.
// It returns (true, nil) if a row was written, (false, nil) at end-of-stream,
// or (false, err) on error. After returning (false, _), Next must not be
// called again.
//
// Implementations check ctx.Done() on every call. Long-running loops check
// ctx.Done() every 4096 iterations.
Next(out *Row) (bool, error)
// Close releases all resources held by this operator (open file handles,
// memory, goroutines). It must be called exactly once by the pipeline
// driver, even when Next returned an error.
Close() error
}
Operator is the core abstraction of the Volcano iterator model. Every node in a physical query plan implements this interface.
Lifecycle ¶
- [Init] is called exactly once before the first call to [Next].
- [Next] is called repeatedly until it returns (false, nil) or an error.
- [Close] is called exactly once, regardless of whether [Next] returned an error. Implementations must release all resources in [Close].
Cancellation ¶
Every [Next] implementation must check ctx.Done() at the top of the call. For long-running inner loops that produce more than 4096 tuples without returning, check ctx.Done() every 4096 iterations.
Concurrency ¶
An Operator instance is NOT safe for concurrent use. Each goroutine in a parallel pipeline segment owns its own operator tree.
type OptionalApply ¶
type OptionalApply struct {
// contains filtered or unexported fields
}
OptionalApply is the left-outer variant of CorrelatedApply. For every outer row, the inner pipeline is driven exactly like CorrelatedApply; when the inner pipeline produces zero rows for a given outer row, OptionalApply emits a single NULL-extended row whose width equals the configured padded width, holding the outer columns followed by NULL placeholders for the inner-introduced columns.
OptionalApply is NOT safe for concurrent use.
func NewOptionalApply ¶
func NewOptionalApply(outer, inner Operator, arg *Argument, paddedWidth int) *OptionalApply
NewOptionalApply creates an OptionalApply operator.
- outer is the left (driving) plan.
- inner is the right (sub) plan whose leftmost leaf is the provided arg.
- arg is the Argument node at the inner leaf; OptionalApply seeds it before each inner Init call.
- paddedWidth is the total width of an output row, i.e. outerWidth plus the number of columns the inner pipeline introduces. When the inner pipeline emits zero rows for an outer row, OptionalApply emits a row of this width whose first outerWidth columns hold the outer row and whose trailing columns are expr.Null.
OptionalApply takes ownership of both plans.
func (*OptionalApply) Close ¶
func (op *OptionalApply) Close() error
Close releases resources and closes both the outer and inner plans.
func (*OptionalApply) Init ¶
func (op *OptionalApply) Init(ctx context.Context) error
Init initialises the outer plan and stores ctx for subsequent Next calls.
func (*OptionalApply) Next ¶
func (op *OptionalApply) Next(out *Row) (bool, error)
Next emits the next output row. The semantics are:
- For each outer row, drain the inner pipeline.
- If the inner pipeline emits ≥1 row, those rows are forwarded verbatim.
- If the inner pipeline emits 0 rows, a single NULL-extended row is emitted consisting of the outer columns followed by expr.Null for every inner column the pipeline would have introduced.
type OptionalExpand ¶
type OptionalExpand struct {
// contains filtered or unexported fields
}
OptionalExpand is a Volcano pipeline operator that performs a single-hop expansion and emits a NULL-extended row when no edges match for an input node.
OptionalExpand is NOT safe for concurrent use.
func NewOptionalExpand ¶
func NewOptionalExpand(input Operator, fwd, rev csrAdjacency, cfg ExpandConfig) *OptionalExpand
NewOptionalExpand creates an OptionalExpand operator.
- input is the upstream operator supplying node IDs.
- fwd is the forward CSR adjacency.
- rev is the reverse CSR adjacency (required for DirIn/DirBoth).
- cfg is the Expand configuration (direction, edge-type filter, inputCol).
The NULL-extension row uses the same column layout as Expand: inputRow... || srcID || Null(edgeID) || Null(dstID).
func (*OptionalExpand) Close ¶
func (op *OptionalExpand) Close() error
Close closes the input and child operators.
func (*OptionalExpand) Init ¶
func (op *OptionalExpand) Init(ctx context.Context) error
Init initialises the operator.
func (*OptionalExpand) Next ¶
func (op *OptionalExpand) Next(out *Row) (bool, error)
Next emits the next row. For each input row:
- It feeds the row into the inner Expand one hop at a time.
- If Expand emits ≥1 row, those rows are forwarded as-is.
- If Expand emits 0 rows for an input node, a NULL-extended row is emitted.
type ParallelScan ¶
type ParallelScan struct {
// contains filtered or unexported fields
}
ParallelScan is a Volcano leaf operator that partitions the full node scan into fixed-size morsels and executes them concurrently on up to GOMAXPROCS worker goroutines.
ParallelScan is NOT safe for concurrent use.
func NewParallelScan ¶
func NewParallelScan(g nodeWalker, morselSize int) *ParallelScan
NewParallelScan creates a ParallelScan over g. morselSize controls the chunk size per worker; pass 0 to use DefaultMorselSize.
func (*ParallelScan) Close ¶
func (op *ParallelScan) Close() error
Close cancels workers and waits for them to exit.
type ProcedureCallOp ¶
type ProcedureCallOp struct {
// contains filtered or unexported fields
}
ProcedureCallOp invokes a registered procedure and emits its result rows.
ProcedureCallOp is NOT safe for concurrent use.
func NewProcedureCallOp ¶
func NewProcedureCallOp( namespace []string, name string, argExprs []func(Row) (expr.Value, error), yieldVars []string, child Operator, reg *procs.Registry, ) *ProcedureCallOp
NewProcedureCallOp creates a ProcedureCallOp.
namespace and name identify the procedure. argExprs evaluate procedure arguments against the current driver row. yieldVars names the output columns. child is the driving subplan; pass nil for a standalone CALL. reg is the procedure registry used for lookup at runtime.
func (*ProcedureCallOp) Close ¶
func (op *ProcedureCallOp) Close() error
Close releases resources and closes the child operator.
func (*ProcedureCallOp) Init ¶
func (op *ProcedureCallOp) Init(ctx context.Context) error
Init resets internal state and initialises the child if present.
func (*ProcedureCallOp) Next ¶
func (op *ProcedureCallOp) Next(out *Row) (bool, error)
Next advances the operator by one row.
It draws driving rows from the child (or a synthetic empty row when child is nil), invokes the procedure for each, buffers all result rows, and emits them one at a time.
Void procedure semantics. A procedure declared with no output columns (len(yieldVars) == 0) is treated as a side-effect-only step. When invoked in-query (op.child != nil) it must NOT consume the driver row; instead each driver row is emitted unchanged once the impl has run, preserving the upstream variable bindings for any downstream RETURN. Standalone CALL (op.child == nil) emits nothing.
type Project ¶
type Project struct {
// contains filtered or unexported fields
}
Project is a Volcano pipeline operator that applies a list of ProjectionItem expressions to each input row, producing an output row with one column per item.
Project is NOT safe for concurrent use.
func NewProject ¶
func NewProject(child Operator, items []ProjectionItem) (*Project, error)
NewProject creates a Project operator. items defines the output schema; each item's Eval function is applied to each input row. An empty items slice is legal (e.g. `WITH *` over a pattern that binds no variables); the resulting operator forwards an empty Row for every input row.
type ProjectionItem ¶
type ProjectionItem struct {
// Alias is the output column name (e.g. "n", "count(n)", "x").
Alias string
// Eval evaluates the item expression against the current input row and
// returns the projected value.
Eval func(Row) (expr.Value, error)
}
ProjectionItem describes a single column in a projection. Eval is evaluated against the input row; Alias names the resulting output column.
type PropEntry ¶
type PropEntry struct {
Key string
Value lpg.PropertyValue
}
PropEntry is an exported key/value pair for use by external plan builders (api.go) that construct dynamic property evaluators. It mirrors propLiteral but carries exported fields so the physical builder can return values from a PropsEvalFn without requiring propLiteral to be exported.
type PropsEvalFn ¶
PropsEvalFn is a per-row property evaluator closure. It receives the current row and returns a slice of (key, value) pairs produced by evaluating the property-map AST expressions against the row's bound variables. Any entry whose evaluation yields Null is omitted (openCypher: assigning null to a property is a no-op on a fresh node).
The closure is constructed once by the physical plan builder and captures the schema, function registry, and query parameters.
type RangeBound ¶
type RangeBound struct {
// Value is the bound's expr.Value. Nil means unbounded (use the
// minimum or maximum representable value for the index type).
Value expr.Value
// Include determines whether the bound is inclusive (≤ / ≥) or exclusive
// (< / >).
Include bool
}
RangeBound carries one endpoint of a range predicate.
type Record ¶
type Record map[string]interface{}
Record is a single result row, accessed by column name. The underlying map is owned by the ResultSet; callers must copy values they need to retain beyond the next ResultSet.Next call.
type RelCols ¶
RelCols carries the raw column indices that the Expand operator places for a relationship variable. SetProperty and RemoveProperty use it to reconstruct the (src, dst) endpoint keys when the bound entity is a relationship rather than a node.
The edgeCol (schema[entityVar]) holds the edge-position counter; SrcCol and DstCol hold the corresponding endpoint NodeIDs as IntegerValue.
type RelEndpointFn ¶
RelEndpointFn returns the (srcID, dstID) endpoints for an edge that the schema-direct path is about to delete. Used when the bare-variable target carries an IntegerValue edge id (the in-pipeline encoding emitted by Expand) so DeleteNode can dispatch to the edge-removal branch without misinterpreting the id as a node id.
type RemoveLabels ¶
type RemoveLabels struct {
// contains filtered or unexported fields
}
RemoveLabels removes one or more labels from an already-bound node per input row.
RemoveLabels is NOT safe for concurrent use.
func NewRemoveLabels ¶
func NewRemoveLabels( nodeVar string, labels []string, schema map[string]int, child Operator, mutator GraphMutator, ) *RemoveLabels
NewRemoveLabels creates a RemoveLabels operator.
type RemoveProperty ¶
type RemoveProperty struct {
// contains filtered or unexported fields
}
RemoveProperty removes a single named property from an already-bound node or relationship per input row. For relationships, call WithRelCols to supply the endpoint column indices.
RemoveProperty is NOT safe for concurrent use.
func NewRemoveProperty ¶
func NewRemoveProperty( entityVar, propertyKey string, schema map[string]int, child Operator, mutator GraphMutator, ) *RemoveProperty
NewRemoveProperty creates a RemoveProperty operator.
func (*RemoveProperty) Close ¶
func (op *RemoveProperty) Close() error
Close closes the child operator.
func (*RemoveProperty) Init ¶
func (op *RemoveProperty) Init(ctx context.Context) error
Init initialises the operator and its child.
func (*RemoveProperty) Next ¶
func (op *RemoveProperty) Next(out *Row) (bool, error)
Next pulls one row from the child and removes the specified property.
func (*RemoveProperty) WithRelCols ¶
func (op *RemoveProperty) WithRelCols(rc RelCols) *RemoveProperty
WithRelCols marks entityVar as a relationship variable and records the row columns that hold the src and dst NodeIDs. Must be called before the first Next invocation. Returns op for chaining.
type Result ¶
type Result interface {
// Next advances to the next result row. It returns true if a row is
// available, false at end-of-stream or on error. After Next returns false,
// callers should check Err.
Next() bool
// Record returns the current row as a [Record] (column name → value).
// Record must not be called before the first successful Next or after Next
// returns false.
Record() Record
// Err returns the first error encountered during iteration, or nil.
Err() error
// Columns returns the ordered list of column names for this result set.
// The slice is stable across calls and is not modified after construction.
Columns() []string
// Close releases all resources held by this result set, including the
// underlying operator tree. It must be called exactly once.
Close() error
}
Result is a forward-only, streaming iterator over query result rows.
Lifecycle ¶
- Call [Next] in a loop until it returns false.
- After the loop, check [Err] for any error that terminated iteration.
- Call [Close] exactly once to release resources.
Concurrency ¶
Result implementations are NOT safe for concurrent use.
type ResultSet ¶
type ResultSet struct {
// contains filtered or unexported fields
}
ResultSet is the concrete implementation of Result returned by Run.
ResultSet is NOT safe for concurrent use.
func Run ¶
Run initialises plan, stores the column names, and returns a ResultSet ready for iteration. The caller drives iteration via ResultSet.Next and must call ResultSet.Close when done.
Run does not pull any rows; all work happens lazily in ResultSet.Next. The Record map is pre-allocated once here and reused across every Next call to eliminate the per-row allocation that previously dominated heap usage.
func (*ResultSet) Close ¶
Close releases all resources held by the ResultSet, including the underlying operator tree. Callers must call it once when they are done with the ResultSet; calling Close again after a previous Close is a no-op.
func (*ResultSet) Columns ¶
Columns returns the ordered list of column names. The slice is never nil and is stable for the lifetime of the ResultSet.
func (*ResultSet) Next ¶
Next advances to the next result row. It returns true when a row is available (accessible via Record), and false at end-of-stream or on error.
func (*ResultSet) Record ¶
Record returns the current row. Must only be called after a successful Next.
The returned map is owned by the ResultSet and reused by the next Next call; callers that need to retain a row beyond the next Next must copy it (or use ResultSet.TakeRecord).
func (*ResultSet) TakeRecord ¶
TakeRecord returns the current row and transfers ownership of its backing map to the caller, installing a fresh map for subsequent Next calls. Unlike ResultSet.Record — whose result is reused on the next Next — the map returned here is safe to retain. The materialisation path uses this to drain rows under the transaction-visibility barrier without the extra per-row copy that re-hashing every column into a new map would cost. Must only be called after a successful Next.
type RollUpApply ¶
type RollUpApply struct {
// contains filtered or unexported fields
}
RollUpApply is a Volcano pipeline operator that performs pattern-comprehension execution: for each outer row, it drains the entire inner sub-plan into an expr.ListValue and emits (outerRow... || listValue) as a single output row.
RollUpApply is NOT safe for concurrent use.
func NewRollUpApply ¶
func NewRollUpApply(outer, inner Operator, arg *Argument, listEval func(Row) (expr.Value, error)) *RollUpApply
NewRollUpApply creates a RollUpApply operator.
- outer is the driving (left) plan.
- inner is the correlated (right) sub-plan whose leaf is arg.
- arg is the Argument node seeded with each outer row before inner Init.
- listEval, when non-nil, is called for each inner row to extract the value to collect into the list. When nil, the first column of each inner row is collected.
func (*RollUpApply) Close ¶
func (op *RollUpApply) Close() error
Close closes both the outer and inner plans.
type Row ¶
Row is a single tuple in the pipeline: a slice of expr.Value whose positions correspond to the operator's output schema. The slice is owned by the RowSlab that allocated it; callers must not retain it beyond the slab's lifetime.
func Drain ¶
Drain initialises op, pulls every row from the pipeline, and returns the collected rows as a []Row. Close is always called before Drain returns, regardless of whether an error occurred.
Cancellation: Drain honours ctx.Done() via the per-Next check inside each operator. If ctx is cancelled, Drain returns the partial result set and the context error.
The returned rows are independent copies: each element is a snapshot of the Row written by the operator at that iteration. Callers own the returned slice.
Example ¶
ExampleDrain assembles the equivalent of `UNWIND [10, 20, 30] AS x RETURN x LIMIT 2` as a hand-built operator tree and drains it. SingleRow seeds one empty row, Unwind expands the literal list, and Limit caps the output.
package main
import (
"context"
"fmt"
"github.com/FlavioCFOliveira/GoGraph/cypher/exec"
"github.com/FlavioCFOliveira/GoGraph/cypher/expr"
)
func main() {
// SingleRow emits exactly one empty row to drive the pipeline.
src := exec.NewSingleRowOperator()
// Unwind expands a fixed list; listFn ignores the (empty) input row.
unwind, err := exec.NewUnwind(src, func(exec.Row) (expr.ListValue, error) {
return expr.ListValue{
expr.IntegerValue(10),
expr.IntegerValue(20),
expr.IntegerValue(30),
}, nil
})
if err != nil {
fmt.Println("NewUnwind:", err)
return
}
// Limit passes at most two rows downstream.
limit, err := exec.NewLimit(unwind, 2)
if err != nil {
fmt.Println("NewLimit:", err)
return
}
// Drain runs the pipeline and always closes it before returning.
rows, err := exec.Drain(context.Background(), limit)
if err != nil {
fmt.Println("Drain:", err)
return
}
fmt.Println("rows:", len(rows))
for _, r := range rows {
fmt.Println("x =", r[0])
}
}
Output:
rows: 2
x = 10
x = 20
type RowSlab ¶
type RowSlab struct {
// contains filtered or unexported fields
}
RowSlab is a bounded arena of pre-allocated rows. It eliminates per-row heap allocations by backing all rows in a single flat slice. Each call to [Alloc] hands out a sub-slice at zero GC cost after the initial backing allocation.
RowSlab is NOT safe for concurrent use; each pipeline stage owns its own instance, typically obtained from SlabPool.
Lifecycle ¶
- Obtain a slab from SlabPool.Get (or call NewRowSlab).
- Call [Alloc] for each row needed in the current batch.
- When the batch is fully processed, call [Reset] and return the slab to SlabPool.Put.
[Reset] zeroes the column values in every allocated row so that no expr.Value reference is retained past the batch boundary (preventing GC nepotism between batches).
func NewRowSlab ¶
NewRowSlab creates a RowSlab with the given column count and row capacity. width is the number of expr.Value slots pre-allocated per row; pass 0 for variable-width rows (callers supply their own slice to [AllocRaw]). capacity must be ≥ 1; DefaultSlabCapacity is a reasonable default.
func (*RowSlab) Alloc ¶
Alloc returns the next available pre-allocated row in the slab. It returns ErrSlabOverflow if the slab is exhausted. The row width matches the width passed to NewRowSlab; for variable-width slabs (width=0) use [AllocRaw].
func (*RowSlab) AllocRaw ¶
AllocRaw returns the next row slot for variable-width slabs (width=0). The caller is responsible for setting the returned row to a correctly-sized slice before use. For fixed-width slabs, use [Alloc].
type SemiApply ¶
type SemiApply struct {
// contains filtered or unexported fields
}
SemiApply emits each outer row for which the inner sub-plan produces at least one row. The inner plan is closed after the first match (short-circuit).
SemiApply is NOT safe for concurrent use.
func NewSemiApply ¶
NewSemiApply creates a SemiApply operator.
- outer is the driving (left) plan.
- inner is the correlated (right) sub-plan whose leaf is arg.
- arg is the Argument node seeded with each outer row before inner Init.
func (*SemiApply) Close ¶
Close closes the outer plan. The inner plan is already closed per-row inside Next; a redundant close here is safe because Close on a closed operator must be a no-op per the Operator contract.
type SetAllProperties ¶
type SetAllProperties struct {
// contains filtered or unexported fields
}
SetAllProperties replaces or merges every property on an already-bound node or relationship per input row.
SetAllProperties is NOT safe for concurrent use.
func NewSetAllPropertiesFromEntity ¶
func NewSetAllPropertiesFromEntity( entityVar, sourceVar string, isReplace bool, schema map[string]int, child Operator, mutator GraphMutator, ) *SetAllProperties
NewSetAllPropertiesFromEntity creates a SetAllProperties operator copying every property from sourceVar (a bound node or relationship) to entityVar. isReplace selects `=` (true) vs `+=` (false) semantics.
func NewSetAllPropertiesFromMap ¶
func NewSetAllPropertiesFromMap( entityVar, mapLiteral string, isReplace bool, schema map[string]int, child Operator, mutator GraphMutator, ) (*SetAllProperties, error)
NewSetAllPropertiesFromMap creates a SetAllProperties operator writing every key/value pair from mapLiteral to entityVar. mapLiteral is the opaque literal-map string (e.g. `{a: 1, b: "x"}`) produced by the AST printer.
func NewSetAllPropertiesFromParam ¶
func NewSetAllPropertiesFromParam( entityVar, paramName string, isReplace bool, schema map[string]int, child Operator, mutator GraphMutator, ) *SetAllProperties
NewSetAllPropertiesFromParam creates a SetAllProperties operator writing every key/value pair from the named query parameter to entityVar. The parameter must resolve to a MapValue at exec time; non-map values are treated as a no-op.
func (*SetAllProperties) Close ¶
func (op *SetAllProperties) Close() error
Close closes the child operator.
func (*SetAllProperties) Init ¶
func (op *SetAllProperties) Init(ctx context.Context) error
Init initialises the operator and its child.
func (*SetAllProperties) Next ¶
func (op *SetAllProperties) Next(out *Row) (bool, error)
Next pulls one row from the child and applies the configured whole-entity mutation. The row is forwarded unchanged so downstream operators (e.g. ProduceResults) can read the affected entity.
func (*SetAllProperties) WithConstraints ¶
func (op *SetAllProperties) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *SetAllProperties
WithConstraints attaches a ConstraintRegistry and index.Manager for pre-write enforcement. Both must be non-nil. Returns op for chaining.
func (*SetAllProperties) WithParams ¶
func (op *SetAllProperties) WithParams(params map[string]expr.Value) (*SetAllProperties, error)
WithParams attaches query parameters for $name substitution in the literal map and for parameter-sourced operators. Returns op for chaining.
func (*SetAllProperties) WithRelCols ¶
func (op *SetAllProperties) WithRelCols(rc RelCols) *SetAllProperties
WithRelCols marks entityVar as a relationship variable and records the row columns that hold the src and dst NodeIDs. Must be called before the first Next invocation. Returns op for chaining.
func (*SetAllProperties) WithSourceRelCols ¶
func (op *SetAllProperties) WithSourceRelCols(rc RelCols) *SetAllProperties
WithSourceRelCols marks sourceVar as a relationship variable and records the row columns that hold its src and dst NodeIDs. Must be called before the first Next invocation when sourceVar is a relationship. Returns op for chaining.
type SetLabels ¶
type SetLabels struct {
// contains filtered or unexported fields
}
SetLabels adds one or more labels to an already-bound node per input row.
SetLabels is NOT safe for concurrent use.
func NewSetLabels ¶
func NewSetLabels( nodeVar string, labels []string, schema map[string]int, child Operator, mutator GraphMutator, ) *SetLabels
NewSetLabels creates a SetLabels operator.
type SetProperty ¶
type SetProperty struct {
// contains filtered or unexported fields
}
SetProperty sets or replaces properties on an already-bound node or relationship per input row. The entity is identified by entityVar. For nodes, the column value must be an IntegerValue-encoded NodeID or a NodeValue. For relationships, call WithRelCols to supply the endpoint column indices.
SetProperty is NOT safe for concurrent use.
func NewSetProperty ¶
func NewSetProperty( entityVar, propertyKey, valueExpr string, schema map[string]int, child Operator, mutator GraphMutator, ) (*SetProperty, error)
NewSetProperty creates a SetProperty operator.
entityVar is the variable name of the target node or relationship. propertyKey is the property key for single-property mode; pass an empty string for whole-entity mode. valueExpr is the opaque literal string from the IR. schema maps variable names to column indices. mutator is the graph write surface.
func (*SetProperty) Init ¶
func (op *SetProperty) Init(ctx context.Context) error
Init initialises the operator and its child.
func (*SetProperty) Next ¶
func (op *SetProperty) Next(out *Row) (bool, error)
Next pulls one row from the child and applies the property mutation.
func (*SetProperty) WithConstraints ¶
func (op *SetProperty) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *SetProperty
WithConstraints attaches a ConstraintRegistry and index.Manager for pre-write enforcement. Both must be non-nil. Returns op for chaining.
func (*SetProperty) WithParams ¶
func (op *SetProperty) WithParams(params map[string]expr.Value) *SetProperty
WithParams attaches query parameters for $name substitution in value expressions. Returns op for chaining.
func (*SetProperty) WithRelCols ¶
func (op *SetProperty) WithRelCols(rc RelCols) *SetProperty
WithRelCols marks entityVar as a relationship variable and records the row columns that hold the src and dst NodeIDs. Must be called before the first Next invocation. Returns op for chaining.
func (*SetProperty) WithValueEvalFn ¶
func (op *SetProperty) WithValueEvalFn(fn ValueEvalFn) *SetProperty
WithValueEvalFn attaches a per-row evaluator for the SET RHS expression. The closure is invoked whenever the operator needs to compute the new property value; it takes priority over the literal-string parser path for single-property assignments.
type ShortestPath ¶
type ShortestPath struct {
// contains filtered or unexported fields
}
ShortestPath is a Volcano pipeline operator that, for each input row, finds a single shortest path from srcCol to dstCol using BFS and emits one output row containing the path (or Null if unreachable).
ShortestPath is NOT safe for concurrent use.
func NewShortestPath ¶
func NewShortestPath(input Operator, fwd, rev csrAdjacency, dir Direction, srcCol, dstCol int) *ShortestPath
NewShortestPath creates a ShortestPath operator.
- input is the upstream operator supplying (srcID, dstID) pairs.
- fwd is the forward CSR adjacency.
- rev is the reverse CSR (required for DirIn/DirBoth).
- dir is the traversal direction.
- srcCol / dstCol are the column indices in each input row for source and destination node IDs.
type SingleRow ¶
type SingleRow struct {
// contains filtered or unexported fields
}
SingleRow emits one empty row then signals exhaustion.
SingleRow is NOT safe for concurrent use.
func NewSingleRowOperator ¶
func NewSingleRowOperator() *SingleRow
NewSingleRowOperator returns a SingleRow operator.
type Skip ¶
type Skip struct {
// contains filtered or unexported fields
}
Skip is a Volcano pipeline operator that discards the first n rows from its child operator and then forwards all remaining rows.
Skip is NOT safe for concurrent use.
func NewSkip ¶
NewSkip creates a Skip operator that discards the first n rows from child. n must be ≥ 0; a skip of 0 is a no-op pass-through.
type SlabPool ¶
type SlabPool struct {
// contains filtered or unexported fields
}
SlabPool is a sync.Pool-backed pool of RowSlab instances with a fixed column width and capacity. Operators that process a high volume of rows should obtain slabs from a shared pool to reduce GC pressure.
SlabPool is safe for concurrent use.
func NewSlabPool ¶
NewSlabPool creates a SlabPool that vends RowSlabs with the given column width and row capacity.
type Sort ¶
type Sort struct {
// contains filtered or unexported fields
}
Sort is a blocking Volcano operator that collects all rows from its child, sorts them by the specified SortKey sequence, and emits them in order.
Sort is NOT safe for concurrent use.
func NewSort ¶
NewSort creates a Sort operator.
- child: the upstream operator to consume.
- keys: ORDER BY specification. Must not be empty.
- maxRows: upper bound on rows held in memory; pass 0 to use DefaultMaxSortRows.
type SortKey ¶
type SortKey struct {
// ColIdx is the zero-based index of the column within each Row.
// Ignored when Eval is non-nil.
ColIdx int
// Ascending controls the sort direction. true = ASC, false = DESC.
Ascending bool
// Eval is an optional expression evaluator. When non-nil the sort key
// value is obtained by calling Eval(row) rather than reading row[ColIdx].
// This supports ORDER BY expressions that are not direct projection
// output columns (e.g. ORDER BY n.age after RETURN n).
Eval func(Row) (expr.Value, error)
}
SortKey describes a single ORDER BY column.
type StringHashIndex ¶
type StringHashIndex struct {
// contains filtered or unexported fields
}
StringHashIndex adapts hash.Index[string] to the [hashLookup] interface. It accepts only expr.StringValue seek keys; other kinds return ErrIndexTypeMismatch.
func NewStringHashIndex ¶
func NewStringHashIndex(idx interface {
Lookup(value string) *roaring64.Bitmap
}) *StringHashIndex
NewStringHashIndex constructs a StringHashIndex.
func (*StringHashIndex) LookupBitmap ¶
LookupBitmap implements [hashLookup].
type StringRangeIndex ¶
type StringRangeIndex struct {
// contains filtered or unexported fields
}
StringRangeIndex adapts btree.Index[string] to the [rangeLookup] interface. A nil lower bound is treated as "" (the empty string); a nil upper bound is treated as "\xff…" (a string of 256 bytes, each 0xff).
func NewStringRangeIndex ¶
func NewStringRangeIndex(idx interface {
Range(lo, hi string) *roaring64.Bitmap
}) *StringRangeIndex
NewStringRangeIndex constructs a StringRangeIndex.
func (*StringRangeIndex) RangeBitmap ¶
func (r *StringRangeIndex) RangeBitmap(lo, hi expr.Value) *roaring64.Bitmap
RangeBitmap implements [rangeLookup].
type TargetEvalFn ¶
TargetEvalFn evaluates a DELETE / DETACH DELETE target expression against the current input row and returns the resolved value. The exec operator inspects the value: NodeValue / IntegerValue selects the node by ID; RelationshipValue selects the relationship; null is a row-passthrough no-op (matches openCypher 9 §3.5.8).
type Top ¶
type Top struct {
// contains filtered or unexported fields
}
Top is a blocking Volcano operator that emits the N smallest rows (per the given sort keys) from its child, using a bounded heap of at most N rows: O(N) memory and O(M log N) time, where M is the number of input rows.
Top is NOT safe for concurrent use.
func NewTop ¶
NewTop creates a Top operator.
- child: the upstream operator to consume.
- keys: ORDER BY specification. Must not be empty.
- n: number of rows to return. Must be ≥ 1.
type Union ¶
type Union struct {
// contains filtered or unexported fields
}
Union emits the set-union of left and right: all rows from both sides with duplicates removed. It is implemented as UnionAll wrapped in a Distinct operator.
Schema mismatch detection is inherited from UnionAll.
Union is NOT safe for concurrent use.
func NewUnion ¶
NewUnion creates a Union operator that deduplicates the concatenation of left and right.
- maxDistinct: upper bound on distinct rows; pass 0 to use DefaultMaxDistinct.
type UnionAll ¶
type UnionAll struct {
// contains filtered or unexported fields
}
UnionAll is a Volcano operator that concatenates the output of left and right children without deduplication. It validates that both sides produce rows of the same width (column count).
UnionAll is NOT safe for concurrent use.
func NewUnionAll ¶
NewUnionAll creates a UnionAll operator that concatenates left then right.
func (*UnionAll) Close ¶
Close closes both child operators. Both are always attempted; if both fail the errors are joined.
type Unwind ¶
type Unwind struct {
// contains filtered or unexported fields
}
Unwind is a Volcano pipeline operator that implements the UNWIND clause. For each input row it evaluates a list expression and emits one output row per list element, appending the element value as a new column.
Unwind is NOT safe for concurrent use.
func NewUnwind ¶
func NewUnwind(child Operator, listFn UnwindListFn) (*Unwind, error)
NewUnwind creates an Unwind operator.
child provides the context rows. listFn is evaluated once per input row and must return the list to expand. The caller is not responsible for appending the element column to the output row; Unwind handles that internally.
Both child and listFn are required: a nil argument returns the typed sentinel ErrUnwindNilChild or ErrUnwindNilListFn respectively, so callers can distinguish the cause via errors.Is. NewUnwind never panics.
func (*Unwind) Close ¶
Close releases resources and closes the child operator.
Close is idempotent within a single pipeline lifecycle: calling it more than once between two Init invocations returns nil from the second and later calls and does NOT propagate to op.child.Close again. The idempotency guard is reset by Init, so an Init→Close→Init→Close sequence still closes the child twice — once per cycle, as expected.
func (*Unwind) Init ¶
Init initialises the operator and its child. It clears all per-iteration state (curRow, curList, listIdx) and resets the idempotency guard (closed) so that Init is the exact dual of Close, allowing an operator instance to be safely re-Init'd after a previous Close.
type UnwindListFn ¶
UnwindListFn evaluates the list expression for one input row. It returns an expr.ListValue when the expression evaluates to a list, or nil/empty when there is nothing to expand.
type ValueEvalFn ¶
ValueEvalFn evaluates a SET RHS expression against the current input row and returns the resulting property value plus a flag distinguishing the "no value produced" case from the "explicit null" case. The exec operator uses null/no-value semantics to either delete (null) or no-op (no value).
type VarLengthConfig ¶
type VarLengthConfig struct {
// Direction to follow. Defaults to DirOut when zero.
Direction Direction
// EdgeType, when non-empty, restricts expansion to edges of this type.
EdgeType string
// EdgeTypeFilter maps absolute edge positions to type labels.
EdgeTypeFilter map[uint64]string
// InputCol is the column index in each input row that holds the source
// NodeID. Defaults to 0.
InputCol int
// MinHops is the minimum path length (inclusive). Must be ≥ 0.
MinHops int
// MaxHops is the maximum path length (inclusive). Must be ≥ MinHops.
// Use math.MaxInt for unbounded (not recommended without a safety cap).
MaxHops int
// MaxEdgesTraversed is the safety cap on total edge traversals per input
// row. Defaults to 1,000,000 when 0.
MaxEdgesTraversed int
// ExcludedRelCols lists column indices in the input row holding edge
// identifiers (IntegerValue or RelationshipValue) that must not be
// traversed inside this VLE step. Implements the openCypher
// no-repeated-relationships rule across distinct rel patterns within
// the same MATCH (e.g. `MATCH ()-[r:EDGE]-() MATCH (n)-[*0..1]-()-[r]
// -()-[*0..1]-(m)` — the two variable-length steps must not reuse the
// edge bound to `r`). The visited bitset is pre-populated with each
// listed column's edge position at BFS seed time.
ExcludedRelCols []int
}
VarLengthConfig carries configuration for NewVarLengthExpand.
type VarLengthExpand ¶
type VarLengthExpand struct {
// contains filtered or unexported fields
}
VarLengthExpand is a Volcano pipeline operator that performs bounded BFS variable-length expansion.
VarLengthExpand is NOT safe for concurrent use.
func NewVarLengthExpand ¶
func NewVarLengthExpand(input Operator, fwd, rev csrAdjacency, cfg *VarLengthConfig) *VarLengthExpand
NewVarLengthExpand creates a VarLengthExpand operator. cfg is read-only and taken by pointer to avoid copying the configuration struct on this hot path.
func (*VarLengthExpand) Init ¶
func (op *VarLengthExpand) Init(ctx context.Context) error
Init initialises the operator.
func (*VarLengthExpand) Next ¶
func (op *VarLengthExpand) Next(out *Row) (bool, error)
Next emits the next (inputRow... || pathEdgesAsListValue || dstNodeID) row. The path is encoded as an expr.ListValue of edge positions (IntegerValues), followed by the destination node ID as an IntegerValue.
Source Files
¶
- apply.go
- argument.go
- constraints.go
- correlated_apply.go
- create_constraint.go
- create_index.go
- create_node.go
- create_relationship.go
- cyphermorphism.go
- delete.go
- detach_delete.go
- distinct.go
- driver.go
- drop_constraint.go
- drop_index.go
- eager.go
- eager_aggregation.go
- expand.go
- filter.go
- global_aggregate_adapter.go
- index_writeback.go
- limit.go
- merge.go
- merge_relationship.go
- merge_search.go
- operator.go
- optional_expand.go
- parallel.go
- procedure_call.go
- produce_results.go
- project.go
- remove.go
- rollup_apply.go
- row.go
- scan_all.go
- scan_index_btree.go
- scan_index_hash.go
- scan_label.go
- semi_apply.go
- set.go
- set_all.go
- shortest_path.go
- single_row.go
- sort.go
- temporal_literal.go
- top.go
- union.go
- unwind.go
- varlen_expand.go
- write_graph.go