exec

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 2, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package exec implements the Volcano-style executor for the Cypher query engine. It defines the Operator interface, the Row/RowSlab data model, and the pipeline driver Drain.

Data model

A Row is a slice of expr.Value. A RowSlab is a bounded, pooled container of pre-allocated rows used to eliminate per-row heap allocations in the hot path.

Concurrency

RowSlab is NOT safe for concurrent use. Each goroutine must obtain its own slab from NewRowSlab or from a sync.Pool managed by the caller. The exported SlabPool provides a ready-to-use pool with default capacity.
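The slab-per-goroutine rule can be illustrated with a minimal sketch. It does not use the package's real RowSlab or SlabPool; rowSlab, newRowSlab, alloc, reset and slabPool below are hypothetical stand-ins, and the capacity of 4 is chosen only to keep the example small.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Value and Row are simplified stand-ins for expr.Value and exec.Row.
type Value any
type Row []Value

var errSlabOverflow = errors.New("sketch: row slab overflow")

// rowSlab is a hypothetical bounded container of pre-allocated rows.
type rowSlab struct {
	rows []Row
	used int
}

func newRowSlab(capacity, width int) *rowSlab {
	s := &rowSlab{rows: make([]Row, capacity)}
	for i := range s.rows {
		s.rows[i] = make(Row, width)
	}
	return s
}

// alloc hands out the next pre-allocated row or reports overflow.
func (s *rowSlab) alloc() (Row, error) {
	if s.used == len(s.rows) {
		return nil, errSlabOverflow
	}
	r := s.rows[s.used]
	s.used++
	return r, nil
}

// reset makes every row available again without freeing memory.
func (s *rowSlab) reset() { s.used = 0 }

// slabPool hands each caller its own slab; a slab is never shared
// between goroutines while it is checked out.
var slabPool = sync.Pool{New: func() any { return newRowSlab(4, 2) }}

// fillOnce allocates until the slab overflows and reports what happened.
func fillOnce() (allocated int, overflowed bool) {
	s := slabPool.Get().(*rowSlab)
	defer func() { s.reset(); slabPool.Put(s) }()
	for {
		if _, err := s.alloc(); err != nil {
			return allocated, errors.Is(err, errSlabOverflow)
		}
		allocated++
	}
}

func main() {
	n, over := fillOnce()
	fmt.Println(n, over) // 4 true
}
```

Resetting the slab before returning it to the pool means the next goroutine that takes it starts from an empty slab.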

Constants

const DefaultMaxDistinct = 10_000_000

DefaultMaxDistinct is the default upper bound on distinct rows tracked by the Distinct operator.

const DefaultMaxGroups = 1_000_000

DefaultMaxGroups is the default upper bound on distinct groups that EagerAggregation will hold in memory.

const DefaultMaxSortRows = 10_000_000

DefaultMaxSortRows is the default upper bound on rows that Sort holds in memory.

const DefaultMorselSize = 1024

DefaultMorselSize is the number of NodeIDs processed per worker goroutine per scheduling quantum. Sized to fill roughly one or two cache lines of work before touching a channel.

const DefaultSlabCapacity = 4096

DefaultSlabCapacity is the default maximum number of rows a RowSlab holds before returning ErrSlabOverflow. It is sized to keep a typical pipeline batch within a few cache lines.

Variables

var ErrAggMemoryExceeded = errors.New("exec: aggregation memory cap exceeded")

ErrAggMemoryExceeded is returned by EagerAggregation.Next when the number of distinct groups exceeds the configured maxGroups limit.

var ErrConstraintViolation = errors.New("exec: constraint violation")

ErrConstraintViolation is the sentinel returned (wrapped) by CheckSetProperty when a write would violate a constraint.

var ErrDeleteNodeHasRelationships = errors.New("exec: cannot delete node with existing relationships; use DETACH DELETE")

ErrDeleteNodeHasRelationships is returned when DELETE is attempted on a node that still has one or more incident relationships. Use DETACH DELETE to remove the node together with its relationships.

var ErrDistinctMemoryExceeded = errors.New("exec: distinct memory cap exceeded")

ErrDistinctMemoryExceeded is returned by Distinct.Next when the number of distinct rows seen exceeds the configured maxDistinct limit.

var ErrIndexTypeMismatch = errors.New("exec: index type mismatch")

ErrIndexTypeMismatch is returned by NodeByIndexSeek.Init when the seek value's Kind is incompatible with the index's key type.

var ErrPropertyValueIsNull = errors.New("exec: property value is null (skip)")

ErrPropertyValueIsNull is the sentinel returned by [parsePropValue] when the value is the literal "null". By openCypher semantics, assigning null to a property removes it (or never sets it for a fresh node), so callers that catch this sentinel must skip the property entirely rather than surface a parse error.

var ErrSchemaMismatch = errors.New("exec: union schema mismatch: column counts differ")

ErrSchemaMismatch is returned when the left and right operands of a UNION produce rows with different column counts.

var ErrSlabOverflow = errors.New("exec: row slab overflow")

ErrSlabOverflow is returned by RowSlab.Alloc when the slab has reached its capacity limit. Callers must flush or reset the slab before continuing.

var ErrSortMemoryExceeded = errors.New("exec: sort memory cap exceeded")

ErrSortMemoryExceeded is returned when Sort collects more than maxRows rows.

var ErrUnwindNilChild = errors.New("exec: NewUnwind requires non-nil child Operator")

ErrUnwindNilChild is returned by NewUnwind when child is nil.

var ErrUnwindNilListFn = errors.New("exec: NewUnwind requires non-nil listFn")

ErrUnwindNilListFn is returned by NewUnwind when listFn is nil.

var ErrVarLenCapExceeded = errors.New("exec: variable-length expand safety cap exceeded")

ErrVarLenCapExceeded is returned when a VarLengthExpand exceeds its configured maximum edge traversal count.

Functions

func PropMapContainsNullLiteral

func PropMapContainsNullLiteral(s string) bool

PropMapContainsNullLiteral reports whether the property-map source string contains any value that is the literal `null`. MERGE uses it at plan-build time to surface the openCypher `MergeReadOwnWrites` error when a merge predicate contains a null property value: such a merge can never match its own write, because a comparison with null evaluates to null under three-valued logic and therefore never succeeds, so the engine rejects the pattern outright. CREATE silently drops null-valued properties, so it does NOT use this check.

The argument may be either a single map literal "{k: v, …}" or a larger surface form (e.g. a full pattern string "(a)-[r:T {k: null}]->(b)") in which case every embedded balanced "{...}" segment is scanned. The check splits each map at top-level commas, isolates each value substring, and reports true if any value (case-insensitively) equals the bare token `null`. Variable refs and expressions are ignored — they parse to non-literal forms.
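The scan described above can be approximated as follows. This is a simplified sketch, not the package's implementation: containsNullLiteral and splitTopLevel are hypothetical names, escaped quotes and braces inside string literals are not handled, and only the top-level values of each outermost "{...}" segment are inspected.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTopLevel splits s at sep, ignoring separators nested inside
// brackets or inside quoted strings.
func splitTopLevel(s string, sep byte) []string {
	var parts []string
	var quote byte
	depth, start := 0, 0
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case quote != 0:
			if c == quote {
				quote = 0
			}
		case c == '"' || c == '\'':
			quote = c
		case c == '{' || c == '[' || c == '(':
			depth++
		case c == '}' || c == ']' || c == ')':
			depth--
		case c == sep && depth == 0:
			parts = append(parts, s[start:i])
			start = i + 1
		}
	}
	return append(parts, s[start:])
}

// containsNullLiteral reports whether any outermost "{...}" segment in s
// has a value that is the bare token null (case-insensitive).
func containsNullLiteral(s string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] != '{' {
			continue
		}
		// Find the close brace that balances this open brace.
		depth, end := 0, -1
		for j := i; j < len(s) && end < 0; j++ {
			switch s[j] {
			case '{':
				depth++
			case '}':
				depth--
				if depth == 0 {
					end = j
				}
			}
		}
		if end < 0 {
			return false // unbalanced input: nothing more to scan
		}
		for _, entry := range splitTopLevel(s[i+1:end], ',') {
			kv := splitTopLevel(entry, ':')
			if len(kv) < 2 {
				continue
			}
			val := strings.TrimSpace(strings.Join(kv[1:], ":"))
			if strings.EqualFold(val, "null") {
				return true
			}
		}
		i = end // continue after this segment
	}
	return false
}

func main() {
	fmt.Println(containsNullLiteral(`{name: "Alice", age: null}`)) // true
	fmt.Println(containsNullLiteral(`(a)-[r:T {k: NULL}]->(b)`))   // true
	fmt.Println(containsNullLiteral(`{name: "null"}`))             // false
}
```

The quoted string "null" is not reported because the value substring still carries its quotes, so only the bare token matches.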

func WithCyphermorphism

func WithCyphermorphism(relCols []int) expandOption

WithCyphermorphism returns an option that enables cyphermorphism enforcement. relCols lists the column indices in each input row that hold existing edge IDs (as expr.IntegerValue). When Expand is about to emit a row with a new edgeID, it rejects the row if edgeID equals any value already present in those columns.
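The rejection rule amounts to a membership test over the listed columns. A minimal sketch, assuming edge IDs are stored as plain int64 values rather than expr.IntegerValue (violatesRelUniqueness is a hypothetical helper, not part of the package):

```go
package main

import "fmt"

// Row is a simplified stand-in for exec.Row.
type Row []any

// violatesRelUniqueness reports whether edgeID already appears in any of
// the relCols columns of row, in which case the new row would be rejected.
func violatesRelUniqueness(row Row, relCols []int, edgeID int64) bool {
	for _, c := range relCols {
		if c < 0 || c >= len(row) {
			continue // ignore out-of-range column indices in this sketch
		}
		if id, ok := row[c].(int64); ok && id == edgeID {
			return true
		}
	}
	return false
}

func main() {
	row := Row{int64(10), "n", int64(42)} // columns 0 and 2 hold edge IDs
	fmt.Println(violatesRelUniqueness(row, []int{0, 2}, 42)) // true
	fmt.Println(violatesRelUniqueness(row, []int{0, 2}, 7))  // false
}
```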

Types

type AllNodesScan

type AllNodesScan struct {
	// contains filtered or unexported fields
}

AllNodesScan is a Volcano leaf operator that produces one Row per node in the graph. Each Row has a single column: an expr.IntegerValue holding the node's graph.NodeID cast to int64.

AllNodesScan is NOT safe for concurrent use.

func NewAllNodesScan

func NewAllNodesScan(g nodeWalker) *AllNodesScan

NewAllNodesScan creates an AllNodesScan over g.

func (*AllNodesScan) Close

func (op *AllNodesScan) Close() error

Close releases resources. The collected nodeIDs slice is retained (but its length zeroed) to allow reuse if Init is called again.

func (*AllNodesScan) Init

func (op *AllNodesScan) Init(ctx context.Context) error

Init collects all NodeIDs from the graph into an internal slice. The collection itself honours ctx cancellation every 4096 nodes.

func (*AllNodesScan) Next

func (op *AllNodesScan) Next(out *Row) (bool, error)

Next writes the next NodeID into out and returns (true, nil), or returns (false, nil) at end-of-stream. The context passed to Init is checked via ctx.Err() on every call.
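The Init/Next/Close protocol can be sketched with a toy leaf operator and a drain loop. The operator interface below is inferred from the method signatures on this page; sliceScan, drain and runScan are hypothetical and stand in for AllNodesScan and the package's Drain driver, whose actual signatures are not shown here.

```go
package main

import (
	"context"
	"fmt"
)

type Row []any

// operator lists the three methods every operator on this page exposes.
type operator interface {
	Init(ctx context.Context) error
	Next(out *Row) (bool, error)
	Close() error
}

// sliceScan is a hypothetical leaf that emits one single-column row per ID.
type sliceScan struct {
	ids []int64
	pos int
	ctx context.Context
}

func (s *sliceScan) Init(ctx context.Context) error {
	s.ctx, s.pos = ctx, 0
	return ctx.Err()
}

func (s *sliceScan) Next(out *Row) (bool, error) {
	if err := s.ctx.Err(); err != nil {
		return false, err // cancellation is observed on every call
	}
	if s.pos >= len(s.ids) {
		return false, nil // end-of-stream
	}
	*out = append((*out)[:0], s.ids[s.pos])
	s.pos++
	return true, nil
}

func (s *sliceScan) Close() error { return nil }

// drain pulls rows until end-of-stream and returns the first column of each.
func drain(ctx context.Context, op operator) ([]int64, error) {
	if err := op.Init(ctx); err != nil {
		return nil, err
	}
	defer op.Close()
	var got []int64
	var row Row
	for {
		ok, err := op.Next(&row)
		if err != nil || !ok {
			return got, err
		}
		got = append(got, row[0].(int64))
	}
}

// runScan drains a sliceScan, optionally with an already-cancelled context.
func runScan(ids []int64, cancelled bool) ([]int64, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelled {
		cancel()
	}
	return drain(ctx, &sliceScan{ids: ids})
}

func main() {
	fmt.Println(runScan([]int64{1, 2, 3}, false)) // [1 2 3] <nil>
}
```

Reusing the caller's row buffer in Next avoids a fresh allocation per row, which is the same concern the RowSlab data model addresses.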

type AllShortestPaths

type AllShortestPaths struct {
	// contains filtered or unexported fields
}

AllShortestPaths is a Volcano pipeline operator that, for each input row, finds all paths of minimum length from srcCol to dstCol and emits one output row per path.

AllShortestPaths is NOT safe for concurrent use.

func NewAllShortestPaths

func NewAllShortestPaths(input Operator, fwd, rev csrAdjacency, dir Direction, srcCol, dstCol int) *AllShortestPaths

NewAllShortestPaths creates an AllShortestPaths operator.

func (*AllShortestPaths) Close

func (op *AllShortestPaths) Close() error

Close closes the input operator.

func (*AllShortestPaths) Init

func (op *AllShortestPaths) Init(ctx context.Context) error

Init initialises the operator.

func (*AllShortestPaths) Next

func (op *AllShortestPaths) Next(out *Row) (bool, error)

Next emits one row per shortest path per input row.

type AntiSemiApply

type AntiSemiApply struct {
	// contains filtered or unexported fields
}

AntiSemiApply emits each outer row for which the inner sub-plan produces zero rows.

AntiSemiApply is NOT safe for concurrent use.

func NewAntiSemiApply

func NewAntiSemiApply(outer, inner Operator, arg *Argument) *AntiSemiApply

NewAntiSemiApply creates an AntiSemiApply operator.

  • outer is the driving (left) plan.
  • inner is the correlated (right) sub-plan whose leaf is arg.
  • arg is the Argument node seeded with each outer row before inner Init.

func (*AntiSemiApply) Close

func (op *AntiSemiApply) Close() error

Close closes the outer plan.

func (*AntiSemiApply) Init

func (op *AntiSemiApply) Init(ctx context.Context) error

Init initialises the outer plan.

func (*AntiSemiApply) Next

func (op *AntiSemiApply) Next(out *Row) (bool, error)

Next advances to the next outer row for which the inner plan has zero results.

type Apply

type Apply struct {
	// contains filtered or unexported fields
}

Apply is a Volcano pipeline operator that performs a dependent (correlated) join: for each outer row, it re-runs the inner plan seeded with that row and emits one output row per inner result.

Apply is NOT safe for concurrent use.

func NewApply

func NewApply(outer, inner Operator, arg *Argument) *Apply

NewApply creates an Apply operator.

  • outer is the left (driving) plan.
  • inner is the right (sub) plan; its leaf must be the provided arg.
  • arg is the Argument node at the leaf of inner; Apply seeds it before each inner Init call.

Apply takes ownership of both plans. The caller must not use outer or inner directly after calling NewApply.

func (*Apply) Close

func (op *Apply) Close() error

Close releases resources and closes both the outer and inner plans.

func (*Apply) Init

func (op *Apply) Init(ctx context.Context) error

Init initialises the outer plan and stores ctx for subsequent Next calls. The inner plan is initialised lazily on the first outer row.

func (*Apply) Next

func (op *Apply) Next(out *Row) (bool, error)

Next advances the Apply operator:

  • If there is no current inner row available, it pulls the next outer row, seeds and re-inits the inner plan, then pulls from the inner plan.
  • Returns each combined (outer || inner) row.
  • Returns (false, nil) when the outer plan is exhausted.
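The seed, re-init and pull cycle listed above can be sketched in a few dozen lines. This is an illustration under simplifying assumptions, not the package's code: the outer plan is reduced to a slice of rows, Close is omitted, the inner plan consists of the Argument leaf alone, and all lower-case names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

type Row []any

// operator keeps only the two methods this sketch needs.
type operator interface {
	Init(ctx context.Context) error
	Next(out *Row) (bool, error)
}

// argument emits the injected outer row exactly once per Init call.
type argument struct {
	outer   Row
	emitted bool
}

func (a *argument) SetOuterRow(r Row)          { a.outer = r }
func (a *argument) Init(context.Context) error { a.emitted = false; return nil }
func (a *argument) Next(out *Row) (bool, error) {
	if a.emitted {
		return false, nil
	}
	a.emitted = true
	*out = append((*out)[:0], a.outer...)
	return true, nil
}

// apply re-runs inner once per outer row and emits outer || inner.
type apply struct {
	outer     []Row // stand-in for the outer plan
	pos       int
	inner     operator
	arg       *argument
	ctx       context.Context
	innerOpen bool
}

func (a *apply) Init(ctx context.Context) error {
	a.ctx, a.pos, a.innerOpen = ctx, 0, false
	return ctx.Err()
}

func (a *apply) Next(out *Row) (bool, error) {
	for {
		if !a.innerOpen {
			if a.pos >= len(a.outer) {
				return false, nil // outer plan exhausted
			}
			a.arg.SetOuterRow(a.outer[a.pos])
			if err := a.inner.Init(a.ctx); err != nil {
				return false, err
			}
			a.innerOpen = true
		}
		var in Row
		ok, err := a.inner.Next(&in)
		if err != nil {
			return false, err
		}
		if !ok {
			a.innerOpen = false // inner drained: move to the next outer row
			a.pos++
			continue
		}
		*out = append(append((*out)[:0], a.outer[a.pos]...), in...)
		return true, nil
	}
}

// run applies the Argument-only inner plan to two outer rows.
func run() []Row {
	arg := &argument{}
	a := &apply{outer: []Row{{1}, {2}}, inner: arg, arg: arg}
	if err := a.Init(context.Background()); err != nil {
		panic(err)
	}
	var got []Row
	var out Row
	for {
		ok, err := a.Next(&out)
		if err != nil || !ok {
			return got
		}
		got = append(got, append(Row(nil), out...))
	}
}

func main() { fmt.Println(run()) } // [[1 1] [2 2]]
```

Because the inner plan here only re-emits the outer row, each output row contains the outer columns twice; a real inner plan would add its own columns after the Argument leaf.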

type Argument

type Argument struct {
	// contains filtered or unexported fields
}

Argument is a Volcano leaf operator that emits the single outer Row injected by an Apply driver. It emits exactly one row per Init/Next cycle.

Argument is NOT safe for concurrent use.

func NewArgument

func NewArgument() *Argument

NewArgument creates an Argument operator with no initial row. The Apply driver must call [SetOuterRow] before the first Init/Next cycle.

func (*Argument) Close

func (op *Argument) Close() error

Close is a no-op; Argument holds no resources beyond the outer row reference.

func (*Argument) Init

func (op *Argument) Init(ctx context.Context) error

Init resets the emitted flag so that the next Next call returns the outer row. It does not require a child operator.

func (*Argument) Next

func (op *Argument) Next(out *Row) (bool, error)

Next emits the outer row exactly once per Init call. Subsequent calls return (false, nil) until Init is called again.

func (*Argument) SetOuterRow

func (op *Argument) SetOuterRow(row Row)

SetOuterRow injects the current outer row. It must be called by the Apply driver before each Init/Next cycle for the inner plan.

type ConstraintKind

type ConstraintKind uint8

ConstraintKind distinguishes UNIQUE from NOT_NULL constraints.

const (
	// ConstraintUnique requires that at most one node with a given label has a
	// particular value for the constrained property.
	ConstraintUnique ConstraintKind = iota
	// ConstraintNotNull requires that every node with a given label has a
	// non-null value for the constrained property.
	ConstraintNotNull
)

type ConstraintRegistry

type ConstraintRegistry struct {
	// contains filtered or unexported fields
}

ConstraintRegistry is a thread-safe registry of active constraints. It stores unique and not-null constraints keyed by "label.prop".

ConstraintRegistry is safe for concurrent use.

func NewConstraintRegistry

func NewConstraintRegistry() *ConstraintRegistry

NewConstraintRegistry creates an empty ConstraintRegistry.

func (*ConstraintRegistry) CheckSetProperty

func (r *ConstraintRegistry) CheckSetProperty(labels []string, prop string, value lpg.PropertyValue, mgr *index.Manager) error

CheckSetProperty validates that setting prop = value on a node with the given labels does not violate any registered constraint. mgr is used for unique-constraint index lookups (hash index Cardinality check) as a secondary source; the primary source is the registry's own value set.

Returns *ConstraintViolationError (which wraps ErrConstraintViolation) on the first violation found; nil when all constraints pass.

func (*ConstraintRegistry) HasNotNull

func (r *ConstraintRegistry) HasNotNull(label, prop string) bool

HasNotNull reports whether a not-null constraint exists for (label, prop).

func (*ConstraintRegistry) ListConstraintRows

func (r *ConstraintRegistry) ListConstraintRows() [][]expr.Value

ListConstraintRows returns a [][]expr.Value where each inner slice has four elements: [name, type, label, property]. The name column uses the canonical "label.prop" key; type is "UNIQUE" or "NOT_NULL". Rows are returned in deterministic lexicographic order.

ListConstraintRows is safe for concurrent use.

func (*ConstraintRegistry) RecordPropertySet

func (r *ConstraintRegistry) RecordPropertySet(labels []string, prop string, value lpg.PropertyValue)

RecordPropertySet records that a property value has been successfully written to a node with the given labels. This keeps the unique value sets up-to-date so that subsequent CheckSetProperty calls detect violations. It is a no-op when no unique constraint exists for (label, prop).

func (*ConstraintRegistry) RegisterNotNull

func (r *ConstraintRegistry) RegisterNotNull(label, prop string)

RegisterNotNull adds a not-null constraint for (label, prop).

func (*ConstraintRegistry) RegisterUnique

func (r *ConstraintRegistry) RegisterUnique(label, prop, indexName string)

RegisterUnique adds a unique constraint for (label, prop) backed by indexName in the index.Manager.

func (*ConstraintRegistry) UniqueIndexName

func (r *ConstraintRegistry) UniqueIndexName(label, prop string) (string, bool)

UniqueIndexName returns the backing index name for a unique constraint on (label, prop), or ("", false) if none exists.

func (*ConstraintRegistry) UnregisterNotNull

func (r *ConstraintRegistry) UnregisterNotNull(label, prop string)

UnregisterNotNull removes the not-null constraint for (label, prop). No-op if absent.

func (*ConstraintRegistry) UnregisterUnique

func (r *ConstraintRegistry) UnregisterUnique(label, prop string)

UnregisterUnique removes the unique constraint for (label, prop). No-op if absent.

type ConstraintViolationError

type ConstraintViolationError struct {
	// Label is the node label the constraint is defined on.
	Label string
	// Property is the constrained property key.
	Property string
	// Kind describes the type of constraint: "UNIQUE" or "NOT NULL".
	Kind string
	// Detail is an optional human-readable explanation.
	Detail string
}

ConstraintViolationError carries structured context about which constraint was violated.

func (*ConstraintViolationError) Error

func (e *ConstraintViolationError) Error() string

Error implements the error interface.

func (*ConstraintViolationError) Unwrap

func (e *ConstraintViolationError) Unwrap() error

Unwrap chains to ErrConstraintViolation so callers can use errors.Is.
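Callers can therefore test for the sentinel with errors.Is and recover the structured fields with errors.As. The sketch below mirrors the documented struct with local, hypothetical types (violationError, errConstraintViolation, checkSet, describe) so that it runs without importing the package.

```go
package main

import (
	"errors"
	"fmt"
)

var errConstraintViolation = errors.New("sketch: constraint violation")

// violationError mirrors the exported fields of ConstraintViolationError.
type violationError struct {
	Label, Property, Kind, Detail string
}

func (e *violationError) Error() string {
	return fmt.Sprintf("%s constraint violated on %s.%s: %s",
		e.Kind, e.Label, e.Property, e.Detail)
}

// Unwrap chains to the sentinel so errors.Is matches it.
func (e *violationError) Unwrap() error { return errConstraintViolation }

// checkSet is a hypothetical write check that rejects a duplicate value.
func checkSet(seen map[string]bool, value string) error {
	if seen[value] {
		return &violationError{
			Label: "Person", Property: "email",
			Kind: "UNIQUE", Detail: "duplicate value " + value,
		}
	}
	seen[value] = true
	return nil
}

// describe reports whether err wraps the sentinel and, if so, its Kind.
func describe(err error) (bool, string) {
	var ve *violationError
	if errors.As(err, &ve) {
		return errors.Is(err, errConstraintViolation), ve.Kind
	}
	return errors.Is(err, errConstraintViolation), ""
}

func main() {
	seen := map[string]bool{}
	_ = checkSet(seen, "a@example.com")
	fmt.Println(describe(checkSet(seen, "a@example.com"))) // true UNIQUE
}
```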

type CorrelatedApply

type CorrelatedApply struct {
	// contains filtered or unexported fields
}

CorrelatedApply is a Volcano pipeline operator that performs a dependent (correlated) join with the convention that the inner pipeline begins with an Argument leaf re-emitting the outer row. The inner row is forwarded verbatim as the operator's output; no concatenation with the outer row is performed (the outer columns are already present in the inner row).

CorrelatedApply is NOT safe for concurrent use.

func NewCorrelatedApply

func NewCorrelatedApply(outer, inner Operator, arg *Argument) *CorrelatedApply

NewCorrelatedApply creates a CorrelatedApply operator.

  • outer is the left (driving) plan.
  • inner is the right (sub) plan whose leftmost leaf is the provided arg.
  • arg is the Argument node at the inner leaf; CorrelatedApply seeds it before each inner Init call so that the inner pipeline observes the current outer row.

CorrelatedApply takes ownership of both plans. The caller must not use outer or inner directly after calling NewCorrelatedApply.

func (*CorrelatedApply) Close

func (op *CorrelatedApply) Close() error

Close releases resources and closes both the outer and inner plans.

func (*CorrelatedApply) Init

func (op *CorrelatedApply) Init(ctx context.Context) error

Init initialises the outer plan and stores ctx for subsequent Next calls. The inner plan is initialised lazily on the first outer row.

func (*CorrelatedApply) Next

func (op *CorrelatedApply) Next(out *Row) (bool, error)

Next advances the CorrelatedApply operator. The inner row is forwarded verbatim; CorrelatedApply does not concatenate the outer row because the inner row already carries the outer columns (the inner pipeline's leaf Argument re-emitted them).

type CreateConstraintOp

type CreateConstraintOp struct {
	// contains filtered or unexported fields
}

CreateConstraintOp is a Volcano DDL operator that registers a constraint.

CreateConstraintOp is NOT safe for concurrent use.

func NewCreateConstraintOp

func NewCreateConstraintOp(
	name, label, prop string,
	kind ConstraintKind,
	ifNotExists bool,
	mgr *index.Manager,
	reg *ConstraintRegistry,
	onSchemaChange func(),
) *CreateConstraintOp

NewCreateConstraintOp creates a CreateConstraintOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully registers a new constraint — i.e. NOT when the IF NOT EXISTS branch silently absorbs an already-registered constraint. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.
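The callback contract (fire after a real mutation, stay silent when IF NOT EXISTS absorbs a duplicate) can be sketched as follows. createOp and its map-based registry are hypothetical simplifications; the real operator works against ConstraintRegistry and index.Manager.

```go
package main

import "fmt"

// createOp is a hypothetical DDL operator; reg stands in for the registry.
type createOp struct {
	key            string
	ifNotExists    bool
	reg            map[string]bool
	onSchemaChange func()
	done           bool
}

// Next performs the side effect on the first call, then signals end-of-stream.
func (op *createOp) Next() (bool, error) {
	if op.done {
		return false, nil
	}
	op.done = true
	if op.reg[op.key] {
		if op.ifNotExists {
			return false, nil // absorbed silently: no callback
		}
		return false, fmt.Errorf("constraint %q already exists", op.key)
	}
	op.reg[op.key] = true
	if op.onSchemaChange != nil {
		op.onSchemaChange() // fires only after a real schema mutation
	}
	return false, nil
}

// countInvalidations runs the same IF NOT EXISTS statement twice and
// returns how many times the callback fired.
func countInvalidations() int {
	reg := map[string]bool{}
	fired := 0
	for i := 0; i < 2; i++ {
		op := &createOp{
			key: "Person.email", ifNotExists: true, reg: reg,
			onSchemaChange: func() { fired++ },
		}
		if _, err := op.Next(); err != nil {
			panic(err)
		}
	}
	return fired
}

func main() { fmt.Println(countInvalidations()) } // 1
```

Skipping the callback on the absorbed branch keeps a repeated, idempotent DDL statement from flushing the plan cache for no reason.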

func (*CreateConstraintOp) Close

func (op *CreateConstraintOp) Close() error

Close implements Operator.

func (*CreateConstraintOp) Init

func (op *CreateConstraintOp) Init(ctx context.Context) error

Init implements Operator.

func (*CreateConstraintOp) Next

func (op *CreateConstraintOp) Next(_ *Row) (bool, error)

Next implements Operator. Performs the CREATE CONSTRAINT side effect on the first call, then signals end-of-stream.

type CreateIndexOp

type CreateIndexOp struct {
	// contains filtered or unexported fields
}

CreateIndexOp is a Volcano DDL operator that registers a new secondary index.

CreateIndexOp is NOT safe for concurrent use.

func NewCreateIndexOp

func NewCreateIndexOp(
	name string,
	kind IndexKindExec,
	ifNotExists bool,
	mgr *index.Manager,
	onSchemaChange func(),
) *CreateIndexOp

NewCreateIndexOp creates a CreateIndexOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully creates a new index in mgr — i.e. NOT when the IF NOT EXISTS branch silently absorbs a duplicate. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.

func (*CreateIndexOp) Close

func (op *CreateIndexOp) Close() error

Close implements Operator.

func (*CreateIndexOp) Init

func (op *CreateIndexOp) Init(ctx context.Context) error

Init implements Operator.

func (*CreateIndexOp) Next

func (op *CreateIndexOp) Next(_ *Row) (bool, error)

Next implements Operator. It performs the CREATE INDEX side effect on the first call, then signals end-of-stream. Returns (false, nil) immediately on subsequent calls.

type CreateNode

type CreateNode struct {
	// contains filtered or unexported fields
}

CreateNode creates a new graph node per input row, sets its labels and properties, and appends the new NodeID as a new column.

CreateNode is NOT safe for concurrent use.

func NewCreateNode

func NewCreateNode(
	nodeVar string,
	labels []string,
	properties string,
	child Operator,
	mutator GraphMutator,
) (*CreateNode, error)

NewCreateNode creates a CreateNode operator.

  • nodeVar is the variable name bound to the new node (may be empty if the node is not referenced downstream).
  • labels is the ordered list of labels to attach.
  • properties is the opaque literal property-map string (e.g. `{name: "Alice"}`) produced by the IR translator; it is parsed once during construction.
  • mutator is the graph write surface.

func (*CreateNode) Close

func (op *CreateNode) Close() error

Close closes the child operator.

func (*CreateNode) Init

func (op *CreateNode) Init(ctx context.Context) error

Init initialises the operator and its child.

The first CreateNode.Init in the process also seeds [globalNodeCounter] past the largest synthetic key currently interned in op.mutator, so that node keys generated in this process cannot collide with keys persisted by an earlier process and replayed during WAL / snapshot recovery. The seed is gated by [globalNodeCounterSeededOnce] so the scan runs at most once per process regardless of how many CreateNode operators are created.
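A once-only seed of this kind is commonly built from sync.Once and an atomic counter. The sketch below shows that pattern with a hypothetical keyGen type; it is not the package's globalNodeCounter, and the scan of persisted keys is replaced by a callback.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// keyGen is a hypothetical stand-in for the process-wide node-key counter.
type keyGen struct {
	once    sync.Once
	counter atomic.Uint64
}

// seed moves the counter past the largest persisted key. Only the first
// call does any work, however many operators call it.
func (g *keyGen) seed(maxPersisted func() uint64) {
	g.once.Do(func() {
		if m := maxPersisted(); m > g.counter.Load() {
			g.counter.Store(m)
		}
	})
}

// next returns a fresh key that is larger than every seeded key.
func (g *keyGen) next() uint64 { return g.counter.Add(1) }

// demo seeds twice and returns the first generated key plus the scan count.
func demo() (uint64, int) {
	var g keyGen
	scans := 0
	scan := func() uint64 { scans++; return 41 }
	g.seed(scan)
	g.seed(scan) // no second scan: the Once has already fired
	return g.next(), scans
}

func main() { fmt.Println(demo()) } // 42 1
```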

func (*CreateNode) Next

func (op *CreateNode) Next(out *Row) (bool, error)

Next pulls one row from the child, creates a node, and appends the NodeID column. Returns (true, nil) when a row was produced, (false, nil) at end-of-stream, (false, err) on error.

func (*CreateNode) WithConstraints

func (op *CreateNode) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *CreateNode

WithConstraints attaches a ConstraintRegistry and index.Manager to the operator for pre-write enforcement. Both must be non-nil. Returns op for chaining.

func (*CreateNode) WithParams

func (op *CreateNode) WithParams(params map[string]expr.Value) (*CreateNode, error)

WithParams attaches query parameters for $name substitution in property expressions. Re-parses the property map with the supplied params. Returns op for chaining.

func (*CreateNode) WithPropsEvalFn

func (op *CreateNode) WithPropsEvalFn(fn PropsEvalFn) *CreateNode

WithPropsEvalFn attaches a per-row property evaluator. When fn is non-nil it is called on every Next invocation and its results are merged with the statically parsed props (literal values). Dynamic results take precedence over same-keyed literal values, allowing the property map to contain a mix of literals and expression-valued entries.
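The precedence rule is an overlay of the per-row results on the parsed literals. A minimal sketch with plain maps (mergeProps is a hypothetical helper; the real operator works with lpg property values):

```go
package main

import "fmt"

// mergeProps overlays per-row dynamic values on statically parsed literals.
// A dynamic value wins when both maps define the same key.
func mergeProps(static, dynamic map[string]any) map[string]any {
	out := make(map[string]any, len(static)+len(dynamic))
	for k, v := range static {
		out[k] = v
	}
	for k, v := range dynamic {
		out[k] = v
	}
	return out
}

func main() {
	static := map[string]any{"name": "Alice", "age": 30}
	dynamic := map[string]any{"age": 31}
	fmt.Println(mergeProps(static, dynamic)) // map[age:31 name:Alice]
}
```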

type CreateRelationship

type CreateRelationship struct {
	// contains filtered or unexported fields
}

CreateRelationship creates a new directed edge per input row between two already-bound nodes.

CreateRelationship is NOT safe for concurrent use.

func NewCreateRelationship

func NewCreateRelationship(
	startVar, endVar, relVar, relType, properties string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) (*CreateRelationship, error)

NewCreateRelationship creates a CreateRelationship operator.

  • startVar and endVar are the variable names of the source and destination nodes; their column indices are looked up in schema.
  • relVar is the variable name bound to the new relationship (may be empty).
  • relType is the relationship type label.
  • properties is the opaque literal property-map string.
  • schema maps currently bound variable names to their column indices.

func (*CreateRelationship) Close

func (op *CreateRelationship) Close() error

Close closes the child operator.

func (*CreateRelationship) Init

func (op *CreateRelationship) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*CreateRelationship) Next

func (op *CreateRelationship) Next(out *Row) (bool, error)

Next pulls one row from the child, resolves the endpoint NodeIDs, creates the edge, and optionally appends a RelationshipValue column.

func (*CreateRelationship) WithParams

func (op *CreateRelationship) WithParams(params map[string]expr.Value) (*CreateRelationship, error)

WithParams re-parses the property map with the supplied query parameters for $name substitution. Returns op for chaining.

func (*CreateRelationship) WithPropsEvalFn

func (op *CreateRelationship) WithPropsEvalFn(fn PropsEvalFn) *CreateRelationship

WithPropsEvalFn attaches a per-row property evaluator. See CreateNode.WithPropsEvalFn.

type DeleteNode

type DeleteNode struct {
	// contains filtered or unexported fields
}

DeleteNode deletes an already-bound node (labels + properties stripped) from the graph, provided it has no incident relationships.

DeleteNode is NOT safe for concurrent use.

func NewDeleteNode

func NewDeleteNode(
	nodeVar string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *DeleteNode

NewDeleteNode creates a DeleteNode operator.

func (*DeleteNode) Close

func (op *DeleteNode) Close() error

Close closes the child operator.

func (*DeleteNode) Init

func (op *DeleteNode) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*DeleteNode) Next

func (op *DeleteNode) Next(out *Row) (bool, error)

Next pulls one row from the child and deletes the bound node.

func (*DeleteNode) WithRelEndpoints

func (op *DeleteNode) WithRelEndpoints(fn RelEndpointFn) *DeleteNode

WithRelEndpoints attaches a per-row lookup that returns the (srcID, dstID) endpoints of the edge identified by the bare-variable target. When set AND the schema-direct slot holds an IntegerValue (the in-pipeline edge-id encoding emitted by Expand), the operator dispatches to the edge-removal path instead of treating the integer as a NodeID.

func (*DeleteNode) WithTargetEvalFn

func (op *DeleteNode) WithTargetEvalFn(fn TargetEvalFn) *DeleteNode

WithTargetEvalFn attaches a per-row evaluator for non-variable DELETE targets (subscripts, property access, …). When set, the operator resolves the target value via the evaluator instead of the schema lookup keyed by nodeVar.

type DeleteRelationship

type DeleteRelationship struct {
	// contains filtered or unexported fields
}

DeleteRelationship removes a directed edge per input row.

DeleteRelationship is NOT safe for concurrent use.

func NewDeleteRelationship

func NewDeleteRelationship(
	relVar string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *DeleteRelationship

NewDeleteRelationship creates a DeleteRelationship operator.

func (*DeleteRelationship) Close

func (op *DeleteRelationship) Close() error

Close closes the child operator.

func (*DeleteRelationship) Init

func (op *DeleteRelationship) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*DeleteRelationship) Next

func (op *DeleteRelationship) Next(out *Row) (bool, error)

Next pulls one row from the child and removes the bound relationship.

type DetachDelete

type DetachDelete struct {
	// contains filtered or unexported fields
}

DetachDelete removes all incident edges from a node and then strips the node's labels and properties.

DetachDelete is NOT safe for concurrent use.

func NewDetachDelete

func NewDetachDelete(
	nodeVar string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *DetachDelete

NewDetachDelete creates a DetachDelete operator.

func (*DetachDelete) Close

func (op *DetachDelete) Close() error

Close closes the child operator.

func (*DetachDelete) Init

func (op *DetachDelete) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*DetachDelete) Next

func (op *DetachDelete) Next(out *Row) (bool, error)

Next pulls one row from the child, removes all incident edges of the bound node, then strips the node's labels and properties.

func (*DetachDelete) WithTargetEvalFn

func (op *DetachDelete) WithTargetEvalFn(fn TargetEvalFn) *DetachDelete

WithTargetEvalFn attaches a per-row evaluator for non-variable DETACH DELETE targets (subscripts, property access, …).

type Direction

type Direction uint8

Direction controls which edges Expand follows.

const (
	// DirOut follows only out-edges of the source node.
	DirOut Direction = iota + 1
	// DirIn follows only in-edges (reverse edges).
	DirIn
	// DirBoth follows both out-edges and in-edges.
	DirBoth
)
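How the three values select edges can be sketched with two adjacency maps. The maps are a hypothetical replacement for the csrAdjacency arguments used elsewhere on this page, and neighbours is not a package function.

```go
package main

import "fmt"

type direction uint8

const (
	dirOut direction = iota + 1 // the zero value is deliberately not a valid direction
	dirIn
	dirBoth
)

// neighbours returns the nodes adjacent to n in the given direction.
// fwd holds out-edges and rev holds in-edges.
func neighbours(fwd, rev map[int][]int, n int, d direction) []int {
	var out []int
	if d == dirOut || d == dirBoth {
		out = append(out, fwd[n]...)
	}
	if d == dirIn || d == dirBoth {
		out = append(out, rev[n]...)
	}
	return out
}

func main() {
	fwd := map[int][]int{1: {2}, 3: {1}} // edges 1->2 and 3->1
	rev := map[int][]int{2: {1}, 1: {3}}
	fmt.Println(neighbours(fwd, rev, 1, dirOut))  // [2]
	fmt.Println(neighbours(fwd, rev, 1, dirIn))   // [3]
	fmt.Println(neighbours(fwd, rev, 1, dirBoth)) // [2 3]
}
```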

type Distinct

type Distinct struct {
	// contains filtered or unexported fields
}

Distinct is a streaming Volcano operator that emits each unique row exactly once. It maintains a hash set of seen rows; hash collisions are resolved by full equality checks.

Distinct is NOT safe for concurrent use.

func NewDistinct

func NewDistinct(child Operator, maxDistinct int) *Distinct

NewDistinct creates a Distinct operator.

  • child: the upstream operator.
  • maxDistinct: upper bound on distinct rows; pass 0 to use DefaultMaxDistinct.

func (*Distinct) Close

func (op *Distinct) Close() error

Close closes the child operator and releases internal state.

func (*Distinct) Init

func (op *Distinct) Init(ctx context.Context) error

Init initialises the operator and resets the deduplication state.

func (*Distinct) Next

func (op *Distinct) Next(out *Row) (bool, error)

Next pulls rows from the child and emits the first occurrence of each unique row. Duplicate rows are silently discarded. Returns ErrDistinctMemoryExceeded if more than maxDistinct distinct rows are encountered.
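A hash set with equality-checked buckets and a cap can be sketched as below. Rows are simplified to []int64 and hashed with FNV-1a; the package's actual hash function and row encoding are not documented here, so distinctSet, hashRow and dedupe are all assumptions made for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/fnv"
)

var errDistinctCap = errors.New("sketch: distinct memory cap exceeded")

// distinctSet buckets rows by hash and resolves collisions by equality.
type distinctSet struct {
	buckets map[uint64][][]int64
	count   int
	limit   int
}

func hashRow(r []int64) uint64 {
	h := fnv.New64a()
	var buf [8]byte
	for _, v := range r {
		binary.LittleEndian.PutUint64(buf[:], uint64(v))
		h.Write(buf[:])
	}
	return h.Sum64()
}

func equalRows(a, b []int64) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// add reports whether r is new. It fails once a new row would push the
// number of distinct rows past the limit.
func (d *distinctSet) add(r []int64) (bool, error) {
	k := hashRow(r)
	for _, seen := range d.buckets[k] {
		if equalRows(seen, r) {
			return false, nil // duplicate: silently discarded
		}
	}
	if d.count >= d.limit {
		return false, errDistinctCap
	}
	d.buckets[k] = append(d.buckets[k], append([]int64(nil), r...))
	d.count++
	return true, nil
}

// dedupe streams rows through the set and keeps first occurrences.
func dedupe(rows [][]int64, limit int) ([][]int64, error) {
	d := &distinctSet{buckets: map[uint64][][]int64{}, limit: limit}
	var out [][]int64
	for _, r := range rows {
		fresh, err := d.add(r)
		if err != nil {
			return out, err
		}
		if fresh {
			out = append(out, r)
		}
	}
	return out, nil
}

func main() {
	fmt.Println(dedupe([][]int64{{1}, {1}, {2}}, 2)) // [[1] [2]] <nil>
}
```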

type DropConstraintOp

type DropConstraintOp struct {
	// contains filtered or unexported fields
}

DropConstraintOp is a Volcano DDL operator that deregisters a constraint.

DropConstraintOp is NOT safe for concurrent use.

func NewDropConstraintOp

func NewDropConstraintOp(
	name, label, prop string,
	kind ConstraintKind,
	ifExists bool,
	mgr *index.Manager,
	reg *ConstraintRegistry,
	onSchemaChange func(),
) *DropConstraintOp

NewDropConstraintOp creates a DropConstraintOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully removes a constraint — i.e. NOT when the IF EXISTS branch silently absorbs an absent-constraint condition. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.

func (*DropConstraintOp) Close

func (op *DropConstraintOp) Close() error

Close implements Operator.

func (*DropConstraintOp) Init

func (op *DropConstraintOp) Init(ctx context.Context) error

Init implements Operator.

func (*DropConstraintOp) Next

func (op *DropConstraintOp) Next(_ *Row) (bool, error)

Next implements Operator. Performs the DROP CONSTRAINT side effect on the first call, then signals end-of-stream.

type DropIndexOp

type DropIndexOp struct {
	// contains filtered or unexported fields
}

DropIndexOp is a Volcano DDL operator that deregisters a secondary index.

DropIndexOp is NOT safe for concurrent use.

func NewDropIndexOp

func NewDropIndexOp(
	name string,
	ifExists bool,
	mgr *index.Manager,
	onSchemaChange func(),
) *DropIndexOp

NewDropIndexOp creates a DropIndexOp. onSchemaChange, when non-nil, is invoked exactly once after the operator successfully drops the index — i.e. NOT when the IF EXISTS branch silently absorbs a missing-index error. The Engine wires e.ClearPlanCache as onSchemaChange so cached plans are invalidated after a real schema mutation.

func (*DropIndexOp) Close

func (op *DropIndexOp) Close() error

Close implements Operator.

func (*DropIndexOp) Init

func (op *DropIndexOp) Init(ctx context.Context) error

Init implements Operator.

func (*DropIndexOp) Next

func (op *DropIndexOp) Next(_ *Row) (bool, error)

Next implements Operator. It performs the DROP INDEX side effect on the first call, then signals end-of-stream.

type Eager

type Eager struct {
	// contains filtered or unexported fields
}

Eager is a pipeline-breaking barrier. Init drains every row from the child into an internal buffer; Next then re-emits the buffered rows in insertion order.

func NewEager

func NewEager(child Operator) *Eager

NewEager wraps child in an Eager barrier.

func (*Eager) Close

func (op *Eager) Close() error

Close releases the buffer and closes the child.

func (*Eager) Init

func (op *Eager) Init(ctx context.Context) error

Init initialises the operator AND drains every child row into the buffer so the downstream pipeline can apply LIMIT / SKIP / DISTINCT without starving any write-side child of its driving rows.

func (*Eager) Next

func (op *Eager) Next(out *Row) (bool, error)

Next emits the next buffered row.
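A minimal sketch of the barrier behaviour, assuming simplified stand-ins for Row and Operator (the real Init takes a context.Context). sliceSource, eager, and pulledWithLimit are hypothetical names. The point is that the child is fully drained during Init even when the consumer reads only one row afterwards.

```go
package main

import "fmt"

// Row and Operator are simplified stand-ins for the package's types.
type Row []int64

type Operator interface {
	Init() error
	Next(out *Row) (bool, error)
	Close() error
}

// sliceSource is a hypothetical leaf that replays fixed rows and counts
// how many rows were pulled from it.
type sliceSource struct {
	rows   []Row
	pos    int
	pulled int
}

func (s *sliceSource) Init() error { s.pos, s.pulled = 0, 0; return nil }

func (s *sliceSource) Next(out *Row) (bool, error) {
	if s.pos >= len(s.rows) {
		return false, nil
	}
	*out = s.rows[s.pos]
	s.pos++
	s.pulled++
	return true, nil
}

func (s *sliceSource) Close() error { return nil }

// eager drains its child completely in Init and replays the buffer in Next.
type eager struct {
	child Operator
	buf   []Row
	pos   int
}

func (e *eager) Init() error {
	if err := e.child.Init(); err != nil {
		return err
	}
	e.buf, e.pos = e.buf[:0], 0
	for {
		var r Row
		ok, err := e.child.Next(&r)
		if err != nil {
			return err
		}
		if !ok {
			return nil
		}
		// Copy so that a child reusing its row cannot alias the buffer.
		e.buf = append(e.buf, append(Row(nil), r...))
	}
}

func (e *eager) Next(out *Row) (bool, error) {
	if e.pos >= len(e.buf) {
		return false, nil
	}
	*out = e.buf[e.pos]
	e.pos++
	return true, nil
}

func (e *eager) Close() error { e.buf = nil; return e.child.Close() }

// pulledWithLimit reads at most limit rows through the barrier and reports
// how many rows the child produced and how many the consumer saw.
func pulledWithLimit(n, limit int) (pulled, emitted int, err error) {
	src := &sliceSource{}
	for i := 0; i < n; i++ {
		src.rows = append(src.rows, Row{int64(i)})
	}
	e := &eager{child: src}
	if err = e.Init(); err != nil {
		return 0, 0, err
	}
	defer e.Close()
	var r Row
	for emitted < limit {
		ok, nerr := e.Next(&r)
		if nerr != nil {
			return src.pulled, emitted, nerr
		}
		if !ok {
			break
		}
		emitted++
	}
	return src.pulled, emitted, nil
}

func main() {
	fmt.Println(pulledWithLimit(5, 1))
}
```

If the child were a write operator, all five of its side effects would have happened even though the downstream LIMIT consumed a single row.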

type EagerAggregation

type EagerAggregation struct {
	// contains filtered or unexported fields
}

EagerAggregation is a blocking (pipeline-breaking) Volcano operator that groups rows from its child by the specified key columns and applies per-group aggregators. It emits one output row per group once the child is exhausted.

EagerAggregation is NOT safe for concurrent use.

func NewEagerAggregation

func NewEagerAggregation(
	child Operator,
	keyCols []int,
	aggFactories []funcs.AggregatorFactory,
	maxGroups int,
) (*EagerAggregation, error)

NewEagerAggregation creates an EagerAggregation operator.

  • child: the upstream operator to consume.
  • keyCols: column indices whose values define the group key. An empty slice computes a single global aggregate.
  • aggFactories: one AggregatorFactory per aggregate expression. Must not be empty.
  • maxGroups: upper bound on distinct groups; pass 0 to use DefaultMaxGroups.

func (*EagerAggregation) Close

func (op *EagerAggregation) Close() error

Close closes the child operator and releases internal state.

func (*EagerAggregation) Init

func (op *EagerAggregation) Init(ctx context.Context) error

Init initialises the operator. The blocking consume phase is deferred to the first Next call.

func (*EagerAggregation) Next

func (op *EagerAggregation) Next(out *Row) (bool, error)

Next emits the next aggregated row. On the first call it consumes all rows from the child (pipeline breaker) and builds the group table. Subsequent calls iterate through the completed groups.
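The consume phase can be pictured with a small sketch that groups rows by key columns, keeps a count per group, and stops when a group cap is exceeded. This is an illustration under assumptions, not the operator's implementation: groupCount, the string key encoding, and errTooManyGroups are hypothetical, and the only aggregate is a row count.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooManyGroups is a hypothetical stand-in for ErrAggMemoryExceeded.
var errTooManyGroups = errors.New("sketch: group cap exceeded")

type Row []int64

// groupCount consumes every row, builds a group table keyed by the values
// in keyCols, and counts rows per group. An empty keyCols yields a single
// global group with the empty key.
func groupCount(rows []Row, keyCols []int, maxGroups int) (map[string]int64, error) {
	groups := make(map[string]int64)
	for _, r := range rows {
		key := ""
		for _, c := range keyCols {
			key += fmt.Sprintf("%d|", r[c])
		}
		// Refuse to open a new group once the cap is reached.
		if _, seen := groups[key]; !seen && len(groups) >= maxGroups {
			return nil, errTooManyGroups
		}
		groups[key]++
	}
	return groups, nil
}

func main() {
	rows := []Row{{1, 10}, {1, 20}, {2, 30}}
	fmt.Println(groupCount(rows, []int{0}, 10))
	fmt.Println(groupCount(rows, nil, 10))
	fmt.Println(groupCount(rows, []int{0}, 1))
}
```

Note that an empty input produces an empty group table in this sketch, which is the case GlobalAggregateAdapter exists to patch for key-less aggregations.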

type Expand

type Expand struct {
	// contains filtered or unexported fields
}

Expand is a Volcano pipeline operator that, for each input row, expands one hop along the graph's CSR adjacency.

Expand is NOT safe for concurrent use.

func NewExpand

func NewExpand(input Operator, fwd, rev csrAdjacency, cfg ExpandConfig) *Expand

NewExpand creates an Expand operator. fwd is the forward CSR; rev is the reverse CSR (required for DirIn/DirBoth, ignored for DirOut).

func NewExpandWithOptions

func NewExpandWithOptions(input Operator, fwd, rev csrAdjacency, cfg ExpandConfig, options ...expandOption) *Expand

NewExpandWithOptions creates an Expand operator with optional extra configuration applied via functional options (e.g. WithCyphermorphism).

All ExpandConfig fields behave identically to NewExpand. Options are applied after base construction and augment the operator's behaviour without altering its public type.

func (*Expand) Close

func (op *Expand) Close() error

Close releases resources and closes the child operator.

func (*Expand) Init

func (op *Expand) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*Expand) Next

func (op *Expand) Next(out *Row) (bool, error)

Next emits the next (srcID, edgeID, dstID) triplet appended to the current input row. It pulls a new input row whenever the current source's adjacency is exhausted.

type ExpandConfig

type ExpandConfig struct {
	// Direction to follow. Defaults to DirOut when zero.
	Direction Direction
	// EdgeType, when non-empty, restricts emitted edges to those whose
	// positional index is present in EdgeTypeFilter with this type label.
	EdgeType string
	// EdgeTypeFilter maps absolute edge positions to type labels.  Required
	// when EdgeType is non-empty.
	EdgeTypeFilter map[uint64]string
	// InputCol is the column index in each input row that holds the source
	// NodeID (as expr.IntegerValue).  Defaults to 0.
	InputCol int
	// RelCols lists the input-row columns holding edge IDs already traversed
	// by sibling Expand operators in the same MATCH pattern. Each emitted
	// edge must NOT match any of these columns (openCypher 9 §3.2.2
	// relationship-isomorphism / cyphermorphism). Empty disables the
	// check.
	RelCols []int
	// MultiplicityFn returns the Cypher CREATE-call multiplicity recorded
	// for the directed edge (srcID, dstID). When the returned count is N >
	// 1, the operator emits the corresponding output row N times in a row,
	// reflecting the openCypher rule that `MATCH ()-[r]->()` enumerates
	// each CREATE call separately even when the underlying simple-graph
	// storage collapsed them to one entry (Merge5 [21]). A nil fn (or
	// returning 0 / 1) disables the multiplicity emit and behaves like a
	// plain single-row Expand.
	MultiplicityFn func(srcID, dstID uint64) int64
}

ExpandConfig carries the optional configuration for NewExpand.
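As a rough illustration of one-hop expansion over a CSR layout with an edge-type filter keyed by edge position, consider the sketch below. The csr struct, the use of the CSR position as the edge ID, and expandOut are assumptions made for the example; the package's csrAdjacency type is not shown on this page.

```go
package main

import "fmt"

// csr is a hypothetical compressed-sparse-row adjacency: the out-edges of
// node i occupy targets[offsets[i]:offsets[i+1]].
type csr struct {
	offsets []int
	targets []uint64
}

type triple struct{ src, edge, dst uint64 }

// expandOut emits one (src, edge, dst) triple per outgoing edge of src.
// The edge ID is assumed to be the absolute position in targets. When
// edgeType is non-empty, only positions mapped to that label in typeOf
// are emitted.
func expandOut(g csr, src uint64, edgeType string, typeOf map[uint64]string) []triple {
	var out []triple
	for pos := g.offsets[src]; pos < g.offsets[src+1]; pos++ {
		if edgeType != "" && typeOf[uint64(pos)] != edgeType {
			continue
		}
		out = append(out, triple{src, uint64(pos), g.targets[pos]})
	}
	return out
}

func main() {
	// Edges: 0->1 (position 0), 0->2 (position 1), 1->2 (position 2).
	g := csr{offsets: []int{0, 2, 3, 3}, targets: []uint64{1, 2, 2}}
	typeOf := map[uint64]string{0: "KNOWS", 1: "LIKES", 2: "KNOWS"}
	fmt.Println(expandOut(g, 0, "", nil))
	fmt.Println(expandOut(g, 0, "LIKES", typeOf))
}
```

An incoming or bidirectional expansion would run the same loop over a reverse CSR, which is why NewExpand takes both fwd and rev.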

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter is a Volcano pipeline operator that applies a FilterFn predicate to each row produced by its child operator. It emits only rows for which the predicate returns expr.BoolValue(true).

Null and false both suppress the row (three-valued logic).

Filter is NOT safe for concurrent use.

func NewFilter

func NewFilter(child Operator, predFn FilterFn) *Filter

NewFilter creates a Filter operator that wraps child and applies predFn to every row.

func (*Filter) Close

func (op *Filter) Close() error

Close releases resources and closes the child operator.

func (*Filter) Init

func (op *Filter) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*Filter) Next

func (op *Filter) Next(out *Row) (bool, error)

Next advances to the next row that passes the predicate. It pulls rows from the child until one satisfies the predicate, end-of-stream, or an error.

type FilterFn

type FilterFn func(row Row) (expr.Value, error)

FilterFn is a predicate over a Row. It must return (BoolValue(true), nil) to accept the row. Any other non-error return value (including NULL and BoolValue(false)) causes the row to be dropped. An error halts the pipeline.
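The three-valued rule can be sketched without expr.Value by modelling the predicate result as a small enum. Everything here (tri, keep, olderThan18, and the use of a nil column as NULL) is a hypothetical simplification for illustration.

```go
package main

import "fmt"

// tri is a stand-in for a three-valued predicate result.
type tri int

const (
	triNull tri = iota
	triFalse
	triTrue
)

// Row uses nil to represent a NULL column in this sketch.
type Row []any

type filterFn func(Row) (tri, error)

// keep returns only the rows for which pred yields triTrue. NULL and false
// both drop the row; an error stops the scan.
func keep(rows []Row, pred filterFn) ([]Row, error) {
	var out []Row
	for _, r := range rows {
		v, err := pred(r)
		if err != nil {
			return nil, err
		}
		if v == triTrue {
			out = append(out, r)
		}
	}
	return out, nil
}

// olderThan18 evaluates "age > 18" on column 0; a NULL age yields NULL.
func olderThan18(r Row) (tri, error) {
	age, ok := r[0].(int64)
	if !ok {
		return triNull, nil
	}
	if age > 18 {
		return triTrue, nil
	}
	return triFalse, nil
}

func main() {
	rows := []Row{{int64(30)}, {nil}, {int64(10)}}
	fmt.Println(keep(rows, olderThan18))
}
```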

type GlobalAggregateAdapter

type GlobalAggregateAdapter struct {
	// contains filtered or unexported fields
}

GlobalAggregateAdapter wraps an EagerAggregation operator that has no group-by keys and ensures the output stream contains exactly one row even when the child is empty.

GlobalAggregateAdapter is NOT safe for concurrent use.

func NewGlobalAggregateAdapter

func NewGlobalAggregateAdapter(child Operator, aggFactories []funcs.AggregatorFactory) *GlobalAggregateAdapter

NewGlobalAggregateAdapter returns a GlobalAggregateAdapter that wraps child. aggFactories must contain one factory per aggregate column, in the same order as the columns produced by child. The factories are invoked exactly once, on empty input, to synthesise the neutral row.

Note: the adapter does not validate that child is a group-by-less EagerAggregation; callers are responsible for that. Wrapping a grouped aggregation has no effect because grouped aggregations always emit either zero (no input) or N rows (one per group), and the empty-input case for a grouped aggregation is correct as is.

func (*GlobalAggregateAdapter) Close

func (op *GlobalAggregateAdapter) Close() error

Close releases the adapter's child.

func (*GlobalAggregateAdapter) Init

func (op *GlobalAggregateAdapter) Init(ctx context.Context) error

Init initialises the adapter and its child.

func (*GlobalAggregateAdapter) Next

func (op *GlobalAggregateAdapter) Next(out *Row) (bool, error)

Next forwards child rows unchanged. When the child reports end-of-stream without ever producing a row, Next emits exactly one synthetic row built from the neutral results of each aggregate factory before signalling exhaustion itself.
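A sketch of the empty-input rule, using closures in place of operators. nextFn, fromSlice, adapt, and collect are hypothetical helpers, and the neutral row holds a single count column whose neutral value is 0.

```go
package main

import "fmt"

type Row []int64

// nextFn is a simplified pull function: it returns (row, true) or (nil, false).
type nextFn func() (Row, bool)

// fromSlice replays rows in order.
func fromSlice(rows []Row) nextFn {
	i := 0
	return func() (Row, bool) {
		if i >= len(rows) {
			return nil, false
		}
		r := rows[i]
		i++
		return r, true
	}
}

// adapt forwards child rows unchanged. If the child ends without having
// produced any row, it emits one neutral row before ending itself.
func adapt(child nextFn, neutral func() Row) nextFn {
	seen, done := false, false
	return func() (Row, bool) {
		if done {
			return nil, false
		}
		if r, ok := child(); ok {
			seen = true
			return r, true
		}
		done = true // never pull from an exhausted child again
		if !seen {
			return neutral(), true
		}
		return nil, false
	}
}

// collect drains next into a slice.
func collect(next nextFn) []Row {
	var out []Row
	for {
		r, ok := next()
		if !ok {
			return out
		}
		out = append(out, r)
	}
}

func main() {
	zeroCount := func() Row { return Row{0} }
	fmt.Println(collect(adapt(fromSlice(nil), zeroCount)))
	fmt.Println(collect(adapt(fromSlice([]Row{{7}}), zeroCount)))
}
```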

type GraphMutator

type GraphMutator interface {
	// AddNode interns n and returns its stable NodeID. Returns the
	// error from the underlying graph implementation (currently only
	// [adjlist.ErrShardFull] is reachable, and only when the
	// underlying [adjlist.Config.MaxShardCapacity] is set).
	AddNode(n string) (graph.NodeID, error)

	// AddEdge inserts a directed edge (src→dst) with weight w, interning
	// endpoints as needed. Returns the stable NodeIDs of src and dst and
	// any error from the underlying graph implementation.
	AddEdge(src, dst string, w float64) (srcID, dstID graph.NodeID, err error)

	// AddEdgeH is [GraphMutator.AddEdge] with a stable per-edge handle
	// allocated by the underlying graph and stamped onto the adjacency
	// slot. The returned handle keys the *ByHandle metadata setters below
	// so a parallel CREATE's type and properties are resolvable on the
	// read path by an identity that survives sibling-edge deletion. Used
	// by CreateRelationship; MERGE keeps using AddEdge (its read path
	// falls back to the per-pair union). The handle is always non-zero on
	// success.
	AddEdgeH(src, dst string, w float64) (srcID, dstID graph.NodeID, handle uint64, err error)

	// RemoveEdge removes the directed edge from src to dst (no-op if absent).
	RemoveEdge(src, dst string)

	// SetNodeLabel attaches label to n (inserting n if absent). Returns
	// any error from the underlying [adjlist.AdjList.AddNode] (see
	// [GraphMutator.AddNode]).
	SetNodeLabel(n, label string) error

	// RemoveNodeLabel detaches label from n (no-op if absent).
	RemoveNodeLabel(n, label string)

	// RemoveNode tombstones n in the underlying graph so subsequent reads
	// (AllNodesScan, count(*), Order) treat the node as absent. Callers
	// should strip labels/properties/incident edges before invoking
	// RemoveNode so the tombstone reflects the fully-deleted state.
	RemoveNode(n string)

	// IsTombstoned reports whether the NodeID has been tombstoned. Used by
	// AllNodesScan to skip phantom entries the Mapper still indexes.
	IsTombstoned(id graph.NodeID) bool

	// SetNodeProperty sets the named property on n. Returns any error
	// from the underlying [adjlist.AdjList.AddNode] (see
	// [GraphMutator.AddNode]).
	SetNodeProperty(n, key string, value lpg.PropertyValue) error

	// DelNodeProperty removes the named property from n (no-op if absent).
	DelNodeProperty(n, key string)

	// NodeProperties returns a snapshot of all properties currently on n.
	NodeProperties(n string) map[string]lpg.PropertyValue

	// NodeLabels returns a snapshot of all labels currently on n in
	// unspecified order.
	NodeLabels(n string) []string

	// HasEdge reports whether a directed edge from src to dst is present.
	HasEdge(src, dst string) bool

	// SetEdgeLabel attaches label to the directed edge (src, dst).
	SetEdgeLabel(src, dst, label string)

	// SetEdgeProperty sets the named property on the directed edge (src, dst).
	// Returns any error from the underlying graph (e.g. schema violation).
	SetEdgeProperty(src, dst, key string, value lpg.PropertyValue) error

	// DelEdgeProperty removes the named property from the directed edge
	// (src, dst) (no-op if absent).
	DelEdgeProperty(src, dst, key string)

	// EdgeProperties returns a snapshot of every property currently set on
	// the directed edge (src, dst). Returns an empty map when the edge has
	// no properties or does not exist.
	EdgeProperties(src, dst string) map[string]lpg.PropertyValue

	// EdgeLabels returns a snapshot of every label currently attached to
	// the directed edge (src, dst). Returns an empty slice when the edge
	// has no labels or does not exist. Used by DELETE r to capture the
	// relationship type before tombstoning the edge so the row's
	// post-delete RelationshipValue keeps `RETURN type(r)` working.
	EdgeLabels(src, dst string) []string

	// IncEdgeCreateCount bumps the Cypher CREATE-call multiplicity
	// counter for the directed edge (src, dst) by one and returns the
	// new (1-based) value. The counter records how many CREATE
	// statements have targeted the same endpoint pair regardless of
	// whether the underlying storage already had an entry — MERGE
	// consults it to emit multiplicity rows when an existing edge
	// satisfies the merge pattern (Merge5 [3]). The returned index is
	// the per-instance idx callers pass to the *At family of
	// metadata-write helpers.
	IncEdgeCreateCount(src, dst string) int64
	// EdgeCreateCount returns the current CREATE-call multiplicity
	// counter for the directed edge (src, dst), or 0 when no CREATE
	// has been recorded.
	EdgeCreateCount(src, dst string) int64
	// DecEdgeCreateCount decrements the CREATE-call multiplicity
	// counter (floor 0). Called by DELETE so subsequent MERGEs see
	// the correct multiplicity.
	DecEdgeCreateCount(src, dst string)
	// SetEdgeLabelAt attaches `label` to the directed edge instance
	// (src, dst) at the supplied 1-based CREATE index. Used by
	// CreateRelationship so parallel CREATEs of the same endpoint
	// pair retain their distinct labels (Match2 [6] / Match7 [29]).
	SetEdgeLabelAt(src, dst string, idx int64, label string)
	// EdgeLabelsAt returns the labels recorded at instance `idx` of
	// the directed edge (src, dst), or nil when the instance has no
	// per-CREATE labels.
	EdgeLabelsAt(src, dst string, idx int64) []string
	// SetEdgePropertyAt records `key`=`value` on the directed edge
	// instance (src, dst) at the supplied 1-based CREATE index.
	SetEdgePropertyAt(src, dst string, idx int64, key string, value lpg.PropertyValue)
	// EdgePropertiesAt returns the property map recorded at instance
	// `idx` of the directed edge (src, dst), or nil when no
	// per-CREATE map was captured.
	EdgePropertiesAt(src, dst string, idx int64) map[string]lpg.PropertyValue
	// RemoveEdgeInstance drops every per-CREATE label and property
	// associated with (src, dst) at `idx`. Used by DELETE to discard
	// a specific logical edge while leaving sibling instances
	// untouched.
	RemoveEdgeInstance(src, dst string, idx int64)

	// SetEdgeLabelByHandle attaches `label` to the edge identified by the
	// stable `handle` on the (src, dst) pair (see [GraphMutator.AddEdgeH]).
	// The handle-keyed analogue of SetEdgeLabelAt; the read path resolves a
	// parallel CREATE's type by this identity instead of a positional CSR
	// index. No-op when handle is 0.
	SetEdgeLabelByHandle(src, dst string, handle uint64, label string)
	// EdgeLabelsByHandle returns the labels recorded for the edge
	// identified by `handle` on the (src, dst) pair, or nil when none.
	EdgeLabelsByHandle(src, dst string, handle uint64) []string
	// SetEdgePropertyByHandle records `key`=`value` on the edge identified
	// by the stable `handle` on the (src, dst) pair. No-op when handle is 0.
	SetEdgePropertyByHandle(src, dst string, handle uint64, key string, value lpg.PropertyValue)
	// EdgePropertiesByHandle returns the property map recorded for the edge
	// identified by `handle` on the (src, dst) pair, or nil when none.
	EdgePropertiesByHandle(src, dst string, handle uint64) map[string]lpg.PropertyValue
	// RemoveEdgeInstanceByHandle drops every per-handle label and property
	// associated with (src, dst) at `handle`. Used by DELETE to discard a
	// specific logical edge while leaving sibling handles untouched.
	RemoveEdgeInstanceByHandle(src, dst string, handle uint64)

	// OutNeighbours returns the outgoing neighbour node keys of n as a
	// snapshot slice. Callers must not mutate the returned slice.
	OutNeighbours(n string) []string

	// InNeighbours returns the incoming neighbour node keys of n as a
	// snapshot slice. This requires a full graph walk for directed adjacency
	// lists that do not maintain a reverse index. Callers must not mutate the
	// returned slice.
	InNeighbours(n string) []string

	// OutDegree returns the number of outgoing edges from n.
	OutDegree(n string) int

	// ResolveNodeID translates a user-facing node key to its internal NodeID,
	// returning ok=false when the node has not been interned yet.
	ResolveNodeID(n string) (graph.NodeID, bool)

	// ResolveNodeLabel translates an internal NodeID back to the user-facing
	// node key, returning ok=false when id is unknown.
	ResolveNodeLabel(id graph.NodeID) (string, bool)

	// WalkNodeIDs calls fn for every node currently interned in the graph.
	WalkNodeIDs(fn func(graph.NodeID) bool)
}

GraphMutator is the write surface exposed to Cypher write operators.

All methods accept the user-facing node key (string) used by the lpg.Graph[string,float64] instantiation. The graph is responsible for interning the value and returning the stable internal NodeID where applicable.

GraphMutator is NOT safe for concurrent use from multiple goroutines; each physical operator tree owns exactly one instance.

type IndexBuffer

type IndexBuffer struct {
	// contains filtered or unexported fields
}

IndexBuffer collects index.Change events produced by write operators during a single write transaction.

Call Enqueue for every graph mutation. At the transaction boundary:

  • Commit: fans changes to index.Manager.ApplyBatch then resets.
  • Rollback: discards changes without touching indexes.

IndexBuffer is NOT safe for concurrent use.

func (*IndexBuffer) Commit

func (b *IndexBuffer) Commit(mgr *index.Manager)

Commit applies all buffered changes to mgr via ApplyBatch, then resets the buffer. A nil mgr is safe: changes are discarded without panicking.

func (*IndexBuffer) Enqueue

func (b *IndexBuffer) Enqueue(c index.Change)

Enqueue appends c to the buffer.

func (*IndexBuffer) Len

func (b *IndexBuffer) Len() int

Len returns the number of changes currently buffered.

func (*IndexBuffer) Rollback

func (b *IndexBuffer) Rollback()

Rollback discards all buffered changes without applying them.
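The buffer's transaction-boundary behaviour can be sketched as follows. change, indexStub, and changeBuffer are hypothetical stand-ins for index.Change, index.Manager, and IndexBuffer; only the commit, rollback, and nil-manager rules come from the documentation above.

```go
package main

import "fmt"

// change is a stand-in for index.Change.
type change struct {
	Key    string
	NodeID uint64
}

// indexStub is a stand-in for index.Manager that records applied changes.
type indexStub struct{ applied []change }

func (s *indexStub) ApplyBatch(cs []change) { s.applied = append(s.applied, cs...) }

// changeBuffer collects changes until the transaction boundary.
type changeBuffer struct{ pending []change }

func (b *changeBuffer) Enqueue(c change) { b.pending = append(b.pending, c) }

func (b *changeBuffer) Len() int { return len(b.pending) }

// Commit applies the pending changes as one batch and resets the buffer.
// A nil manager discards the changes.
func (b *changeBuffer) Commit(mgr *indexStub) {
	if mgr != nil {
		mgr.ApplyBatch(b.pending)
	}
	b.pending = b.pending[:0]
}

// Rollback drops the pending changes without touching any index.
func (b *changeBuffer) Rollback() { b.pending = b.pending[:0] }

// runTransactions commits two changes, then rolls back a third.
func runTransactions() (applied, lenAfterCommit, lenAfterRollback int) {
	idx := &indexStub{}
	var buf changeBuffer
	buf.Enqueue(change{"name", 1})
	buf.Enqueue(change{"name", 2})
	buf.Commit(idx)
	lenAfterCommit = buf.Len()
	buf.Enqueue(change{"name", 3})
	buf.Rollback()
	return len(idx.applied), lenAfterCommit, buf.Len()
}

func main() {
	fmt.Println(runTransactions())
}
```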

type IndexKindExec

type IndexKindExec uint8

IndexKindExec distinguishes hash vs. btree in the exec layer.

const (
	// ExecIndexHash creates a hash.Index[string].
	ExecIndexHash IndexKindExec = iota
	// ExecIndexBTree creates a btree.Index[string].
	ExecIndexBTree
)

type Int64HashIndex

type Int64HashIndex struct {
	// contains filtered or unexported fields
}

Int64HashIndex adapts hash.Index[int64] to the [hashLookup] interface. It accepts only expr.IntegerValue seek keys.

func NewInt64HashIndex

func NewInt64HashIndex(idx interface {
	Lookup(value int64) *roaring64.Bitmap
}) *Int64HashIndex

NewInt64HashIndex constructs an Int64HashIndex.

func (*Int64HashIndex) LookupBitmap

func (h *Int64HashIndex) LookupBitmap(value expr.Value) (*roaring64.Bitmap, error)

LookupBitmap implements [hashLookup].

type Int64RangeIndex

type Int64RangeIndex struct {
	// contains filtered or unexported fields
}

Int64RangeIndex adapts btree.Index[int64] to the [rangeLookup] interface. Nil bounds are treated as ±∞ using math.MinInt64 / math.MaxInt64.

func NewInt64RangeIndex

func NewInt64RangeIndex(idx interface {
	Range(lo, hi int64) *roaring64.Bitmap
}) *Int64RangeIndex

NewInt64RangeIndex constructs an Int64RangeIndex.

func (*Int64RangeIndex) RangeBitmap

func (r *Int64RangeIndex) RangeBitmap(lo, hi expr.Value) *roaring64.Bitmap

RangeBitmap implements [rangeLookup].
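The nil-bound convention might look like the sketch below. It uses *int64 in place of expr.Value and a plain slice in place of a B-tree, and it assumes both bounds are inclusive, which this page does not state. resolveBounds and inRange are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
)

// resolveBounds maps a nil bound to the matching int64 extreme, so an open
// side of the interval behaves like minus or plus infinity.
func resolveBounds(lo, hi *int64) (int64, int64) {
	l, h := int64(math.MinInt64), int64(math.MaxInt64)
	if lo != nil {
		l = *lo
	}
	if hi != nil {
		h = *hi
	}
	return l, h
}

// inRange returns the values within [lo, hi]; inclusive bounds are an
// assumption of this sketch.
func inRange(values []int64, lo, hi *int64) []int64 {
	l, h := resolveBounds(lo, hi)
	var out []int64
	for _, v := range values {
		if v >= l && v <= h {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	three := int64(3)
	vals := []int64{1, 3, 5, 7}
	fmt.Println(inRange(vals, &three, nil))
	fmt.Println(inRange(vals, nil, &three))
	fmt.Println(inRange(vals, nil, nil))
}
```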

type LPGLabelSource

type LPGLabelSource struct {
	// contains filtered or unexported fields
}

LPGLabelSource is a concrete [labelResolver] built from an lpg label registry and a label index. It is the standard adapter for tests and production use.

func NewLPGLabelSource

func NewLPGLabelSource(reg *label.Index, lookupFn func(string) (uint32, bool)) *LPGLabelSource

NewLPGLabelSource constructs an LPGLabelSource. lookupFn should wrap lpg.LabelRegistry.Lookup, casting LabelID to uint32.

func (*LPGLabelSource) ResolveLabelBitmap

func (s *LPGLabelSource) ResolveLabelBitmap(name string) *roaring64.Bitmap

ResolveLabelBitmap implements [labelResolver].

type Limit

type Limit struct {
	// contains filtered or unexported fields
}

Limit is a Volcano pipeline operator that forwards at most n rows from its child operator and then signals end-of-stream.

Limit is NOT safe for concurrent use.

func NewLimit

func NewLimit(child Operator, n int64) (*Limit, error)

NewLimit creates a Limit operator that passes at most n rows from child. n must be ≥ 0; a limit of 0 emits no rows.

func (*Limit) Close

func (op *Limit) Close() error

Close releases resources and closes the child operator.

func (*Limit) Init

func (op *Limit) Init(ctx context.Context) error

Init initialises the operator and resets the emission counter.

func (*Limit) Next

func (op *Limit) Next(out *Row) (bool, error)

Next forwards the next row from the child, unless the limit has been reached, in which case it returns (false, nil) immediately.

type Merge

type Merge struct {
	// contains filtered or unexported fields
}

Merge implements MERGE semantics: match-or-create a pattern.

Merge is NOT safe for concurrent use.

func NewMerge

func NewMerge(
	nodeVar string,
	labels []string,
	properties string,
	onCreateStrs, onMatchStrs []string,
	searchFn MergeSearchFn,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) (*Merge, error)

NewMerge creates a Merge operator.

nodeVar is the variable bound to the merged node. labels and properties are the node-pattern components used when creating a new node. onCreateStrs and onMatchStrs are opaque SET-item strings from the IR translator. searchFn executes the read side of the match. schema maps variable names to column indices. mutator is the graph write surface.

func (*Merge) Close

func (op *Merge) Close() error

Close closes the child operator.

func (*Merge) Init

func (op *Merge) Init(ctx context.Context) error

Init initialises the operator: executes the search plan, then dispatches to the ON MATCH or ON CREATE branch depending on whether the search returned any rows.

The first Merge.Init (or CreateNode.Init) in the process also seeds [globalNodeCounter] past the largest synthetic key already interned in op.mutator, so that the keys minted by [Merge.freshNodeKey] in this process cannot collide with __cx_merge_<hex> keys persisted by an earlier process and replayed during WAL / snapshot recovery. Without this seeding, a one-process-per-command consumer would mint __cx_merge_1 on every command, and the second MERGE would silently overwrite the first node. The seed is gated by [globalNodeCounterSeededOnce] so the O(N) scan runs at most once per process regardless of how many CreateNode / Merge operators are built.
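One way to picture the seeding step is the sketch below: scan the existing keys once, parse the hexadecimal suffix of every __cx_merge_ key, and start the counter at the largest value found. keyMinter and its methods are hypothetical; the package's unexported counter and its exact parsing rules are not shown on this page.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
)

const mergePrefix = "__cx_merge_"

// keyMinter is a hypothetical stand-in for the package-level counter and
// its once-only seeding gate.
type keyMinter struct {
	once    sync.Once
	counter atomic.Uint64
}

// seed runs the O(N) scan at most once and moves the counter to the largest
// hex suffix already present among the existing keys.
func (m *keyMinter) seed(existing []string) {
	m.once.Do(func() {
		var largest uint64
		for _, k := range existing {
			if !strings.HasPrefix(k, mergePrefix) {
				continue
			}
			n, err := strconv.ParseUint(k[len(mergePrefix):], 16, 64)
			if err != nil {
				continue // not a synthetic key in the expected shape
			}
			if n > largest {
				largest = n
			}
		}
		m.counter.Store(largest)
	})
}

// fresh returns a key that none of the existing synthetic keys can equal.
func (m *keyMinter) fresh(existing []string) string {
	m.seed(existing)
	return mergePrefix + strconv.FormatUint(m.counter.Add(1), 16)
}

func main() {
	recovered := []string{"alice", "__cx_merge_1", "__cx_merge_a"}
	var m keyMinter
	fmt.Println(m.fresh(recovered))
	fmt.Println(m.fresh(recovered))
}
```

Without the seed, the first key minted after recovery would be __cx_merge_1 again, colliding with the recovered node.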

func (*Merge) Next

func (op *Merge) Next(out *Row) (bool, error)

Next emits one row: either a matched row (ON MATCH) or the created row (ON CREATE), each emitted exactly once.

func (*Merge) WithConstraints

func (op *Merge) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *Merge

WithConstraints attaches a ConstraintRegistry and index.Manager for pre-write enforcement in ON CREATE and ON MATCH actions. Both must be non-nil. Returns op for chaining.

func (*Merge) WithParams

func (op *Merge) WithParams(params map[string]expr.Value) (*Merge, error)

WithParams re-parses the property map with the supplied query parameters for $name substitution. Returns op for chaining.

func (*Merge) WithPropsEvalFn

func (op *Merge) WithPropsEvalFn(fn PropsEvalFn) *Merge

WithPropsEvalFn attaches a per-row property evaluator. When fn is non-nil the operator re-evaluates the MERGE node-pattern property map against each driving row and uses the merged (literal ∪ dynamic) property set both as the search predicate and as the ON CREATE node-property writes. Required for MERGE patterns whose inline property map contains variable references such as `MERGE (p:Person {login: prop.login})` after an UNWIND.

Returns op for chaining.

type MergeRelAction

type MergeRelAction struct {
	// contains filtered or unexported fields
}

MergeRelAction is a pre-parsed `SET <relVar>.<key> = <value>` item.

func MergeRelActionFromKV

func MergeRelActionFromKV(key, value string) MergeRelAction

MergeRelActionFromKV constructs a MergeRelationship ON CREATE / ON MATCH action from a (key, value) pair. value is the opaque literal string as it appears in the source query (e.g. `'foo'` or `42`).

type MergeRelationship

type MergeRelationship struct {
	// contains filtered or unexported fields
}

MergeRelationship matches-or-creates a single-hop directed relationship between two already-bound endpoint columns. ON CREATE / ON MATCH actions targeting the relationship variable are applied to the matched-or-created edge.

MergeRelationship is NOT safe for concurrent use.

func NewMergeRelationship

func NewMergeRelationship(child Operator, srcCol, dstCol int, relType string, mutator GraphMutator) *MergeRelationship

NewMergeRelationship constructs a MergeRelationship operator.

  • child is the upstream plan providing rows with the bound endpoints.
  • srcCol / dstCol are the column indices that hold the src / dst NodeID.
  • relType is the relationship type label (single label only).
  • mutator is the graph write surface.

func (*MergeRelationship) Close

func (op *MergeRelationship) Close() error

Close closes the child operator.

func (*MergeRelationship) Init

func (op *MergeRelationship) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*MergeRelationship) Next

func (op *MergeRelationship) Next(out *Row) (bool, error)

Next emits the next input row, ensuring that the (src)-[:relType]->(dst) edge exists in the graph (either pre-existing or newly created). When an existing edge has CREATE-multiplicity N > 1 the operator emits N rows for the same upstream tuple (Merge5 [3]).

func (*MergeRelationship) WithOnCreate

func (op *MergeRelationship) WithOnCreate(relVar string, actions []MergeRelAction) *MergeRelationship

WithOnCreate registers ON CREATE SET actions to apply when the edge is newly created. Each action is `<relVar>.<key> = <value>`; the caller has already verified that every action targets the relationship variable bound by this operator.

func (*MergeRelationship) WithOnMatch

func (op *MergeRelationship) WithOnMatch(relVar string, actions []MergeRelAction) *MergeRelationship

WithOnMatch registers ON MATCH SET actions to apply when the edge already exists.

func (*MergeRelationship) WithRelColumn

func (op *MergeRelationship) WithRelColumn(relCol int) *MergeRelationship

WithRelColumn registers the output-row column index that will carry the matched / created edge ID. When set (relCol >= 0), MergeRelationship extends the row with an IntegerValue(edgeID) at that column so downstream operators (RETURN r, count(r), …) see the bound relationship.

func (*MergeRelationship) WithRelProperties

func (op *MergeRelationship) WithRelProperties(propsRaw string) *MergeRelationship

WithRelProperties registers an inline relationship property predicate (e.g. `{name: 'r2'}` from `MERGE (a)-[r:T {name: 'r2'}]->(b)`). When set, the operator filters the existing-edge search by the predicate AND writes the listed properties when a new edge is created. Pass an empty string to clear.

func (*MergeRelationship) WithSchema

func (op *MergeRelationship) WithSchema(schema map[string]int) *MergeRelationship

WithSchema attaches the upstream variable-to-column mapping so entity-copy actions (`SET r = a`) can resolve the source variable from the row at write time.

func (*MergeRelationship) WithUndirected

func (op *MergeRelationship) WithUndirected(u bool) *MergeRelationship

WithUndirected toggles the undirected-search behaviour. When true, the match phase probes both (src, dst) and (dst, src) directions before falling through to the edge-create path, matching the openCypher semantics of `MERGE (a)-[r:T]-(b)` (Merge5 [13]).
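The directed and undirected match phases differ only in which endpoint orders are probed before creating. A minimal sketch, with a hypothetical edgeSet map standing in for the graph and mergeEdge standing in for the operator's match-or-create step:

```go
package main

import "fmt"

type edge struct{ src, dst string }

// edgeSet is a hypothetical stand-in for the graph's directed edges.
type edgeSet map[edge]bool

// mergeEdge returns the matched or newly created edge and whether it was
// created. With undirected set, the reverse direction also counts as a
// match before the create path is taken.
func mergeEdge(g edgeSet, src, dst string, undirected bool) (edge, bool) {
	if g[edge{src, dst}] {
		return edge{src, dst}, false
	}
	if undirected && g[edge{dst, src}] {
		return edge{dst, src}, false
	}
	g[edge{src, dst}] = true
	return edge{src, dst}, true
}

func main() {
	g := edgeSet{{"b", "a"}: true}
	fmt.Println(mergeEdge(g, "a", "b", true))  // matches the existing b->a
	fmt.Println(mergeEdge(g, "a", "b", false)) // creates a->b
	fmt.Println(len(g))
}
```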

type MergeSearchFn

type MergeSearchFn func(ctx context.Context) ([]Row, error)

MergeSearchFn is a function that executes the search sub-plan for the MERGE pattern and returns the matching rows. An empty slice means no match.

func NewMergeSearchFnFromPattern

func NewMergeSearchFnFromPattern(
	labels []string,
	propertiesRaw string,
	params map[string]expr.Value,
	mutator GraphMutator,
) (MergeSearchFn, error)

NewMergeSearchFnFromPattern returns a MergeSearchFn that finds every node in mutator whose label set contains every label in labels and whose property bag contains every (key, value) pair parsed from propertiesRaw.

labels is the slice of pattern labels (may be empty when the pattern is e.g. `(n {key: v})`). propertiesRaw is the opaque literal-map string surfaced by the IR (e.g. `{name: "Alice", age: 30}`); it may be empty. params binds `$name` references in propertiesRaw to query parameters; when empty the parser ignores parameter substitution.

The function returned by NewMergeSearchFnFromPattern walks every interned node ID, resolves the label and property bag, and admits the node iff every label and every property matches. The search is O(N), where N is the number of interned nodes; the cost is acceptable for the typical MERGE workload (small N or label-restricted pattern). A future revision may use [labelResolver]'s bitmap intersection to short-circuit label scans.
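The linear scan can be sketched over an in-memory node table. node, matchNodes, and the string-valued property bag are hypothetical simplifications; the real function reads labels and properties through GraphMutator.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical stand-in for a node's labels and properties.
type node struct {
	labels []string
	props  map[string]string
}

func hasLabel(n node, want string) bool {
	for _, l := range n.labels {
		if l == want {
			return true
		}
	}
	return false
}

// matches reports whether n carries every wanted label and every wanted
// (key, value) pair.
func matches(n node, labels []string, props map[string]string) bool {
	for _, l := range labels {
		if !hasLabel(n, l) {
			return false
		}
	}
	for k, v := range props {
		if got, ok := n.props[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// matchNodes visits every node once (O(N)) and returns the matching IDs in
// ascending order so the result is deterministic.
func matchNodes(nodes map[uint64]node, labels []string, props map[string]string) []uint64 {
	var ids []uint64
	for id, n := range nodes {
		if matches(n, labels, props) {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	nodes := map[uint64]node{
		1: {labels: []string{"Person"}, props: map[string]string{"name": "Alice"}},
		2: {labels: []string{"Person"}, props: map[string]string{"name": "Bob"}},
		3: {labels: []string{"City"}, props: map[string]string{"name": "Alice"}},
	}
	fmt.Println(matchNodes(nodes, []string{"Person"}, map[string]string{"name": "Alice"}))
	fmt.Println(matchNodes(nodes, nil, map[string]string{"name": "Alice"}))
}
```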

type NodeByIndexRangeScan

type NodeByIndexRangeScan struct {
	// contains filtered or unexported fields
}

NodeByIndexRangeScan is a Volcano leaf operator that scans a B+tree index over a half-open, closed, or open interval. Each Row has a single column: expr.IntegerValue(nodeID).

NodeByIndexRangeScan is NOT safe for concurrent use.

func NewNodeByIndexRangeScan

func NewNodeByIndexRangeScan(idx rangeLookup, lo, hi RangeBound) *NodeByIndexRangeScan

NewNodeByIndexRangeScan creates a NodeByIndexRangeScan.

func (*NodeByIndexRangeScan) Close

func (op *NodeByIndexRangeScan) Close() error

Close releases resources.

func (*NodeByIndexRangeScan) Init

func (op *NodeByIndexRangeScan) Init(ctx context.Context) error

Init performs the range lookup and initialises the bitmap iterator.

func (*NodeByIndexRangeScan) Next

func (op *NodeByIndexRangeScan) Next(out *Row) (bool, error)

Next emits the next matching NodeID. Returns (false, nil) at end-of-stream.

type NodeByIndexSeek

type NodeByIndexSeek struct {
	// contains filtered or unexported fields
}

NodeByIndexSeek is a Volcano leaf operator that performs an equality lookup on a property hash index. Each Row has a single column: expr.IntegerValue(nodeID).

NodeByIndexSeek is NOT safe for concurrent use.

func NewNodeByIndexSeek

func NewNodeByIndexSeek(idx hashLookup, seekValue expr.Value) *NodeByIndexSeek

NewNodeByIndexSeek creates a NodeByIndexSeek that looks up seekValue in idx.

func (*NodeByIndexSeek) Close

func (op *NodeByIndexSeek) Close() error

Close releases resources.

func (*NodeByIndexSeek) Init

func (op *NodeByIndexSeek) Init(ctx context.Context) error

Init performs the index lookup and initialises the bitmap iterator.

func (*NodeByIndexSeek) Next

func (op *NodeByIndexSeek) Next(out *Row) (bool, error)

Next emits the next matching NodeID. Returns (false, nil) at end-of-stream.

type NodeByLabelScan

type NodeByLabelScan struct {
	// contains filtered or unexported fields
}

NodeByLabelScan is a Volcano leaf operator that emits one Row per NodeID carrying the named label. Each Row has a single column: expr.IntegerValue(nodeID).

NodeByLabelScan is NOT safe for concurrent use.

func NewNodeByLabelScan

func NewNodeByLabelScan(labelName string, src labelResolver) *NodeByLabelScan

NewNodeByLabelScan creates a NodeByLabelScan for the given label.

func (*NodeByLabelScan) Close

func (op *NodeByLabelScan) Close() error

Close releases resources held by the operator.

func (*NodeByLabelScan) Init

func (op *NodeByLabelScan) Init(ctx context.Context) error

Init resolves the label to a bitmap and initialises the iterator.

func (*NodeByLabelScan) Next

func (op *NodeByLabelScan) Next(out *Row) (bool, error)

Next emits the next matching NodeID. Returns (false, nil) at end-of-stream.

type Operator

type Operator interface {
	// Init initialises the operator and its children. ctx is stored for later
	// use in Next; implementations must not begin producing rows in Init.
	Init(ctx context.Context) error

	// Next advances the operator by one row, writing the result into out.
	// It returns (true, nil) if a row was written, (false, nil) at end-of-stream,
	// or (false, err) on error. After returning (false, _), Next must not be
	// called again.
	//
	// Implementations check ctx.Done() on every call. Long-running loops check
	// ctx.Done() every 4096 iterations.
	Next(out *Row) (bool, error)

	// Close releases all resources held by this operator (open file handles,
	// memory, goroutines). It must be called exactly once by the pipeline
	// driver, even when Next returned an error.
	Close() error
}

Operator is the core abstraction of the Volcano iterator model. Every node in a physical query plan implements this interface.

Lifecycle

  1. [Init] is called exactly once before the first call to [Next].
  2. [Next] is called repeatedly until it returns (false, nil) or an error.
  3. [Close] is called exactly once, regardless of whether [Next] returned an error. Implementations must release all resources in [Close].

Cancellation

Every [Next] implementation must check ctx.Done() at the top of the call. For long-running inner loops that produce more than 4096 tuples without returning, check ctx.Done() every 4096 iterations.

Concurrency

An Operator instance is NOT safe for concurrent use. Each goroutine in a parallel pipeline segment owns its own operator tree.

type OptionalApply

type OptionalApply struct {
	// contains filtered or unexported fields
}

OptionalApply is the left-outer variant of CorrelatedApply. For every outer row, the inner pipeline is driven exactly like CorrelatedApply; when the inner pipeline produces zero rows for a given outer row, OptionalApply emits a single NULL-extended row whose width equals the configured padded width, holding the outer columns followed by NULL placeholders for the inner-introduced columns.

OptionalApply is NOT safe for concurrent use.

func NewOptionalApply

func NewOptionalApply(outer, inner Operator, arg *Argument, paddedWidth int) *OptionalApply

NewOptionalApply creates an OptionalApply operator.

  • outer is the left (driving) plan.
  • inner is the right (sub) plan whose leftmost leaf is the provided arg.
  • arg is the Argument node at the inner leaf; OptionalApply seeds it before each inner Init call.
  • paddedWidth is the total width of an output row, i.e. outerWidth plus the number of columns the inner pipeline introduces. When the inner pipeline emits zero rows for an outer, OptionalApply emits a row of this width whose first outerWidth columns are the outer row and whose trailing columns are expr.Null.

OptionalApply takes ownership of both plans.

func (*OptionalApply) Close

func (op *OptionalApply) Close() error

Close releases resources and closes both the outer and inner plans.

func (*OptionalApply) Init

func (op *OptionalApply) Init(ctx context.Context) error

Init initialises the outer plan and stores ctx for subsequent Next calls.

func (*OptionalApply) Next

func (op *OptionalApply) Next(out *Row) (bool, error)

Next emits the next output row. The semantics are:

  • For each outer row, drain the inner pipeline.
  • If the inner pipeline emits ≥1 row, those rows are forwarded verbatim.
  • If the inner pipeline emits 0 rows, a single NULL-extended row is emitted consisting of the outer columns followed by expr.Null for every inner column the pipeline would have introduced.
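The three rules above can be sketched over plain slices. The sketch assumes local stand-in types, uses nil in place of expr.Null, and models the inner pipeline as a hypothetical function from an outer row to its result rows; optionalApply and nullExtend are illustrative names, not part of the package.

```go
package main

import "fmt"

// Value and Row are local stand-ins for expr.Value and exec.Row.
type Value any
type Row []Value

// nullExtend returns a row of paddedWidth columns: the outer columns first,
// then nil placeholders (standing in for expr.Null).
func nullExtend(outer Row, paddedWidth int) Row {
	out := make(Row, paddedWidth) // zero value nil plays the role of Null
	copy(out, outer)
	return out
}

// optionalApply forwards inner rows verbatim, or emits one NULL-extended row
// when the inner side produces nothing for an outer row.
func optionalApply(outer []Row, inner func(Row) []Row, paddedWidth int) []Row {
	var out []Row
	for _, o := range outer {
		matches := inner(o)
		if len(matches) == 0 {
			out = append(out, nullExtend(o, paddedWidth))
			continue
		}
		out = append(out, matches...)
	}
	return out
}

func main() {
	outer := []Row{{1}, {2}}
	inner := func(o Row) []Row {
		if o[0] == 1 {
			return []Row{{1, "a"}, {1, "b"}}
		}
		return nil // no match for this outer row
	}
	for _, r := range optionalApply(outer, inner, 2) {
		fmt.Println(r)
	}
}
```

Every outer row therefore appears at least once in the output, which is the left-outer property.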

type OptionalExpand

type OptionalExpand struct {
	// contains filtered or unexported fields
}

OptionalExpand is a Volcano pipeline operator that performs a single-hop expansion and emits a NULL-extended row when no edges match for an input node.

OptionalExpand is NOT safe for concurrent use.

func NewOptionalExpand

func NewOptionalExpand(input Operator, fwd, rev csrAdjacency, cfg ExpandConfig) *OptionalExpand

NewOptionalExpand creates an OptionalExpand operator.

  • input is the upstream operator supplying node IDs.
  • fwd is the forward CSR adjacency.
  • rev is the reverse CSR adjacency (required for DirIn/DirBoth).
  • cfg is the Expand configuration (direction, edge-type filter, inputCol).

The NULL-extension row uses the same column layout as Expand: inputRow... || srcID || Null(edgeID) || Null(dstID).

func (*OptionalExpand) Close

func (op *OptionalExpand) Close() error

Close closes the input and child operators.

func (*OptionalExpand) Init

func (op *OptionalExpand) Init(ctx context.Context) error

Init initialises the operator.

func (*OptionalExpand) Next

func (op *OptionalExpand) Next(out *Row) (bool, error)

Next emits the next row. For each input row:

  • It feeds the row into the inner Expand one hop at a time.
  • If Expand emits ≥1 row, those rows are forwarded as-is.
  • If Expand emits 0 rows for an input node, a NULL-extended row is emitted.

type ParallelScan

type ParallelScan struct {
	// contains filtered or unexported fields
}

ParallelScan is a Volcano leaf operator that partitions the full node scan into fixed-size morsels and executes them concurrently on up to GOMAXPROCS worker goroutines.

ParallelScan is NOT safe for concurrent use.

func NewParallelScan

func NewParallelScan(g nodeWalker, morselSize int) *ParallelScan

NewParallelScan creates a ParallelScan over g. morselSize controls the chunk size per worker; pass 0 to use DefaultMorselSize.

func (*ParallelScan) Close

func (op *ParallelScan) Close() error

Close cancels workers and waits for them to exit.

func (*ParallelScan) Init

func (op *ParallelScan) Init(ctx context.Context) error

Init collects all node IDs, partitions them into morsels, and launches worker goroutines. The output channel is populated lazily by the workers.

func (*ParallelScan) Next

func (op *ParallelScan) Next(out *Row) (bool, error)

Next reads the next NodeID from the output channel.
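The morsel scheme described for Init and Next can be sketched as follows. This is an assumption-laden illustration rather than the operator's implementation: morsels and scanAll are hypothetical helpers, node IDs are plain uint64 values, and cancellation is omitted for brevity.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// morsels splits ids into chunks of at most size elements.
func morsels(ids []uint64, size int) [][]uint64 {
	if size <= 0 {
		size = 1024 // mirrors DefaultMorselSize
	}
	var out [][]uint64
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		out = append(out, ids[start:end])
	}
	return out
}

// scanAll hands morsels to up to GOMAXPROCS workers and collects every ID
// from a shared output channel. Output order is not deterministic.
func scanAll(ids []uint64, size int) []uint64 {
	work := make(chan []uint64)
	out := make(chan uint64)
	var wg sync.WaitGroup
	for w := 0; w < runtime.GOMAXPROCS(0); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range work {
				for _, id := range m {
					out <- id
				}
			}
		}()
	}
	go func() {
		for _, m := range morsels(ids, size) {
			work <- m
		}
		close(work)
		wg.Wait()
		close(out) // signals end-of-stream to the consumer
	}()
	var got []uint64
	for id := range out {
		got = append(got, id)
	}
	return got
}

func main() {
	ids := make([]uint64, 10)
	for i := range ids {
		ids[i] = uint64(i)
	}
	fmt.Println("morsels:", len(morsels(ids, 4)))
	fmt.Println("scanned:", len(scanAll(ids, 4)))
}
```

A real operator would also select on a cancellation signal in each worker so that Close can stop them early.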

type ProcedureCallOp

type ProcedureCallOp struct {
	// contains filtered or unexported fields
}

ProcedureCallOp invokes a registered procedure and emits its result rows.

ProcedureCallOp is NOT safe for concurrent use.

func NewProcedureCallOp

func NewProcedureCallOp(
	namespace []string,
	name string,
	argExprs []func(Row) (expr.Value, error),
	yieldVars []string,
	child Operator,
	reg *procs.Registry,
) *ProcedureCallOp

NewProcedureCallOp creates a ProcedureCallOp.

namespace and name identify the procedure. argExprs evaluate procedure arguments against the current driver row. yieldVars names the output columns. child is the driving subplan; pass nil for a standalone CALL. reg is the procedure registry used for lookup at runtime.

func (*ProcedureCallOp) Close

func (op *ProcedureCallOp) Close() error

Close releases resources and closes the child operator.

func (*ProcedureCallOp) Init

func (op *ProcedureCallOp) Init(ctx context.Context) error

Init resets internal state and initialises the child if present.

func (*ProcedureCallOp) Next

func (op *ProcedureCallOp) Next(out *Row) (bool, error)

Next advances the operator by one row.

It draws driving rows from the child (or a synthetic empty row when child is nil), invokes the procedure for each, buffers all result rows, and emits them one at a time.

Void procedure semantics. A procedure declared with no output columns (len(yieldVars) == 0) is treated as a side-effect-only step. When invoked in-query (op.child != nil) it must NOT consume the driver row; instead, each driver row is emitted unchanged once the procedure implementation has run, preserving the upstream variable bindings for any downstream RETURN. A standalone CALL (op.child == nil) emits nothing.

type Project

type Project struct {
	// contains filtered or unexported fields
}

Project is a Volcano pipeline operator that applies a list of ProjectionItem expressions to each input row, producing an output row with one column per item.

Project is NOT safe for concurrent use.

func NewProject

func NewProject(child Operator, items []ProjectionItem) (*Project, error)

NewProject creates a Project operator. items defines the output schema; each item's Eval function is applied to each input row. An empty items slice is legal (e.g. `WITH *` over a pattern that binds no variables); the resulting operator forwards an empty Row for every input row.

func (*Project) Close

func (op *Project) Close() error

Close releases resources and closes the child operator.

func (*Project) Columns

func (op *Project) Columns() []string

Columns returns the ordered list of output column aliases.

func (*Project) Init

func (op *Project) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*Project) Next

func (op *Project) Next(out *Row) (bool, error)

Next evaluates each projection item against the next input row and writes the result into out. Returns (true, nil) when a projected row is available, (false, nil) at end-of-stream, or (false, err) on evaluation or child error.

type ProjectionItem

type ProjectionItem struct {
	// Alias is the output column name (e.g. "n", "count(n)", "x").
	Alias string
	// Eval evaluates the item expression against the current input row and
	// returns the projected value.
	Eval func(Row) (expr.Value, error)
}

ProjectionItem describes a single column in a projection. Eval is evaluated against the input row; Alias names the resulting output column.
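How a list of items turns one input row into one output row can be sketched like this. Value and Row are local stand-ins, ProjectionItem is redeclared with the same two fields, and project is a hypothetical helper that mirrors what Project.Next is described as doing for a single row.

```go
package main

import "fmt"

// Value and Row are local stand-ins for expr.Value and exec.Row.
type Value any
type Row []Value

// ProjectionItem mirrors the documented struct shape.
type ProjectionItem struct {
	Alias string
	Eval  func(Row) (Value, error)
}

// project evaluates every item against in and returns one column per item.
func project(in Row, items []ProjectionItem) (Row, error) {
	out := make(Row, len(items))
	for i, it := range items {
		v, err := it.Eval(in)
		if err != nil {
			return nil, fmt.Errorf("project %q: %w", it.Alias, err)
		}
		out[i] = v
	}
	return out, nil
}

func main() {
	items := []ProjectionItem{
		{Alias: "n", Eval: func(r Row) (Value, error) { return r[0], nil }},
		{Alias: "n * 2", Eval: func(r Row) (Value, error) {
			n, ok := r[0].(int)
			if !ok {
				return nil, fmt.Errorf("want int, got %T", r[0])
			}
			return n * 2, nil
		}},
	}
	out, err := project(Row{21}, items)
	fmt.Println(out, err)
}
```

With an empty items slice the helper returns an empty Row, matching the behaviour documented for NewProject.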

type PropEntry

type PropEntry struct {
	Key   string
	Value lpg.PropertyValue
}

PropEntry is an exported key/value pair for use by external plan builders (api.go) that construct dynamic property evaluators. It mirrors propLiteral but carries exported fields so the physical builder can return values from a PropsEvalFn without requiring propLiteral to be exported.

type PropsEvalFn

type PropsEvalFn func(row Row) []PropEntry

PropsEvalFn is a per-row property evaluator closure. It receives the current row and returns a slice of (key, value) pairs produced by evaluating the property-map AST expressions against the row's bound variables. Any entry whose evaluation yields Null is omitted (openCypher: assigning null to a property is a no-op on a fresh node).

The closure is constructed once by the physical plan builder and captures the schema, function registry, and query parameters.

type RangeBound

type RangeBound struct {
	// Value is the bound's expr.Value.  Nil means unbounded (use the
	// minimum or maximum representable value for the index type).
	Value expr.Value
	// Include determines whether the bound is inclusive (≤ / ≥) or exclusive
	// (< / >).
	Include bool
}

RangeBound carries one endpoint of a range predicate.
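A range predicate built from two such endpoints might be evaluated as sketched below. The sketch simplifies the real type: the bound value is a *int64 instead of an expr.Value, with nil meaning unbounded, and inRange is a hypothetical helper.

```go
package main

import "fmt"

// RangeBound is a simplified stand-in: Value is *int64 rather than expr.Value.
// A nil Value means the endpoint is unbounded.
type RangeBound struct {
	Value   *int64
	Include bool
}

// inRange reports whether v satisfies both endpoints.
func inRange(v int64, lo, hi RangeBound) bool {
	if lo.Value != nil {
		if v < *lo.Value || (v == *lo.Value && !lo.Include) {
			return false
		}
	}
	if hi.Value != nil {
		if v > *hi.Value || (v == *hi.Value && !hi.Include) {
			return false
		}
	}
	return true
}

func main() {
	ten, twenty := int64(10), int64(20)
	lo := RangeBound{Value: &ten, Include: true}     // x >= 10
	hi := RangeBound{Value: &twenty, Include: false} // x < 20
	fmt.Println(inRange(10, lo, hi), inRange(20, lo, hi))
	fmt.Println(inRange(1000, lo, RangeBound{})) // no upper bound
}
```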

type Record

type Record map[string]interface{}

Record is a single result row, accessed by column name. The underlying map is owned by the ResultSet; callers must copy values they need to retain beyond the next ResultSet.Next call.

type RelCols

type RelCols struct {
	SrcCol int
	DstCol int
}

RelCols carries the raw column indices that the Expand operator places for a relationship variable. SetProperty and RemoveProperty use it to reconstruct the (src, dst) endpoint keys when the bound entity is a relationship rather than a node.

The edgeCol (schema[entityVar]) holds the edge-position counter; SrcCol and DstCol hold the corresponding endpoint NodeIDs as IntegerValue.

type RelEndpointFn

type RelEndpointFn func(row Row) (uint64, uint64, bool)

RelEndpointFn returns the (srcID, dstID) endpoints for an edge that the schema-direct path is about to delete. Used when the bare-variable target carries an IntegerValue edge id (the in-pipeline encoding emitted by Expand) so DeleteNode can dispatch to the edge-removal branch without misinterpreting the id as a node id.

type RemoveLabels

type RemoveLabels struct {
	// contains filtered or unexported fields
}

RemoveLabels removes one or more labels from an already-bound node per input row.

RemoveLabels is NOT safe for concurrent use.

func NewRemoveLabels

func NewRemoveLabels(
	nodeVar string,
	labels []string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *RemoveLabels

NewRemoveLabels creates a RemoveLabels operator.

func (*RemoveLabels) Close

func (op *RemoveLabels) Close() error

Close closes the child operator.

func (*RemoveLabels) Init

func (op *RemoveLabels) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*RemoveLabels) Next

func (op *RemoveLabels) Next(out *Row) (bool, error)

Next pulls one row from the child and removes the specified labels.

type RemoveProperty

type RemoveProperty struct {
	// contains filtered or unexported fields
}

RemoveProperty removes a single named property from an already-bound node or relationship per input row. For relationships, call WithRelCols to supply the endpoint column indices.

RemoveProperty is NOT safe for concurrent use.

func NewRemoveProperty

func NewRemoveProperty(
	entityVar, propertyKey string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *RemoveProperty

NewRemoveProperty creates a RemoveProperty operator.

func (*RemoveProperty) Close

func (op *RemoveProperty) Close() error

Close closes the child operator.

func (*RemoveProperty) Init

func (op *RemoveProperty) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*RemoveProperty) Next

func (op *RemoveProperty) Next(out *Row) (bool, error)

Next pulls one row from the child and removes the specified property.

func (*RemoveProperty) WithRelCols

func (op *RemoveProperty) WithRelCols(rc RelCols) *RemoveProperty

WithRelCols marks entityVar as a relationship variable and records the row columns that hold the src and dst NodeIDs. Must be called before the first Next invocation. Returns op for chaining.

type Result

type Result interface {
	// Next advances to the next result row. It returns true if a row is
	// available, false at end-of-stream or on error. After Next returns false,
	// callers should check Err.
	Next() bool

	// Record returns the current row as a [Record] (column name → value).
	// Record must not be called before the first successful Next or after Next
	// returns false.
	Record() Record

	// Err returns the first error encountered during iteration, or nil.
	Err() error

	// Columns returns the ordered list of column names for this result set.
	// The slice is stable across calls and is not modified after construction.
	Columns() []string

	// Close releases all resources held by this result set, including the
	// underlying operator tree. It must be called exactly once.
	Close() error
}

Result is a forward-only, streaming iterator over query result rows.

Lifecycle

  1. Call [Next] in a loop until it returns false.
  2. After the loop, check [Err] for any error that terminated iteration.
  3. Call [Close] exactly once to release resources.

Concurrency

Result implementations are NOT safe for concurrent use.
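The three lifecycle steps translate into a short consumer loop. The sketch below redeclares Record and Result locally and drives a hypothetical in-memory implementation, sliceResult, so it does not depend on the real package; collect is an illustrative helper.

```go
package main

import "fmt"

// Record and Result are local copies of the documented shapes.
type Record map[string]interface{}

type Result interface {
	Next() bool
	Record() Record
	Err() error
	Columns() []string
	Close() error
}

// sliceResult is a hypothetical in-memory Result used only for illustration.
type sliceResult struct {
	cols   []string
	rows   [][]interface{}
	pos    int
	cur    Record
	closed bool
}

func (s *sliceResult) Next() bool {
	if s.pos >= len(s.rows) {
		return false
	}
	s.cur = Record{}
	for i, c := range s.cols {
		s.cur[c] = s.rows[s.pos][i]
	}
	s.pos++
	return true
}

func (s *sliceResult) Record() Record    { return s.cur }
func (s *sliceResult) Err() error        { return nil }
func (s *sliceResult) Columns() []string { return s.cols }
func (s *sliceResult) Close() error      { s.closed = true; return nil }

// collect follows the lifecycle: loop on Next, check Err, Close exactly once.
func collect(res Result, col string) (vals []interface{}, err error) {
	defer func() {
		if cerr := res.Close(); err == nil {
			err = cerr
		}
	}()
	for res.Next() {
		vals = append(vals, res.Record()[col])
	}
	return vals, res.Err()
}

func main() {
	res := &sliceResult{
		cols: []string{"x"},
		rows: [][]interface{}{{10}, {20}},
	}
	vals, err := collect(res, "x")
	fmt.Println(vals, err, res.closed)
}
```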

type ResultSet

type ResultSet struct {
	// contains filtered or unexported fields
}

ResultSet is the concrete implementation of Result returned by Run.

ResultSet is NOT safe for concurrent use.

func Run

func Run(ctx context.Context, plan Operator, cols []string) *ResultSet

Run initialises plan, stores the column names, and returns a ResultSet ready for iteration. The caller drives iteration via ResultSet.Next and must call ResultSet.Close when done.

Run does not pull any rows; all work happens lazily in ResultSet.Next. The Record map is pre-allocated once here and reused across every Next call to eliminate the per-row allocation that previously dominated heap usage.

func (*ResultSet) Close

func (rs *ResultSet) Close() error

Close releases all resources held by the ResultSet, including the underlying operator tree. Callers should call it once when they are finished with the ResultSet; any further call after the first is a no-op.

func (*ResultSet) Columns

func (rs *ResultSet) Columns() []string

Columns returns the ordered list of column names. The slice is never nil and is stable for the lifetime of the ResultSet.

func (*ResultSet) Err

func (rs *ResultSet) Err() error

Err returns the first error encountered during iteration, or nil.

func (*ResultSet) Next

func (rs *ResultSet) Next() bool

Next advances to the next result row. It returns true when a row is available (accessible via Record), and false at end-of-stream or on error.

func (*ResultSet) Record

func (rs *ResultSet) Record() Record

Record returns the current row. Must only be called after a successful Next.

The returned map is owned by the ResultSet and reused by the next Next call; callers that need to retain a row beyond the next Next must copy it (or use ResultSet.TakeRecord).

func (*ResultSet) TakeRecord

func (rs *ResultSet) TakeRecord() Record

TakeRecord returns the current row and transfers ownership of its backing map to the caller, installing a fresh map for subsequent Next calls. Unlike ResultSet.Record — whose result is reused on the next Next — the map returned here is safe to retain. The materialisation path uses this to drain rows under the transaction-visibility barrier without the extra per-row copy that re-hashing every column into a new map would cost. Must only be called after a successful Next.
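The difference between Record and TakeRecord comes down to map ownership. The following sketch reproduces that behaviour with a hypothetical resultSet stand-in that has a single column "x"; it is not the package's implementation.

```go
package main

import "fmt"

// Record is a local copy of the documented type.
type Record map[string]any

// resultSet is a hypothetical stand-in that reuses one map across Next calls.
type resultSet struct {
	rows []int
	pos  int
	rec  Record
}

func (rs *resultSet) Next() bool {
	if rs.pos >= len(rs.rows) {
		return false
	}
	rs.rec["x"] = rs.rows[rs.pos] // overwrite in place: no per-row allocation
	rs.pos++
	return true
}

// Record returns the shared map; it is only valid until the next Next.
func (rs *resultSet) Record() Record { return rs.rec }

// TakeRecord hands the current map to the caller and installs a fresh one.
func (rs *resultSet) TakeRecord() Record {
	out := rs.rec
	rs.rec = make(Record, len(out))
	return out
}

func main() {
	a := &resultSet{rows: []int{1, 2}, rec: Record{}}
	var kept []Record
	for a.Next() {
		kept = append(kept, a.Record()) // retains an alias, not a copy
	}
	fmt.Println(kept[0]["x"], kept[1]["x"]) // prints "2 2"

	b := &resultSet{rows: []int{1, 2}, rec: Record{}}
	var taken []Record
	for b.Next() {
		taken = append(taken, b.TakeRecord()) // caller now owns each map
	}
	fmt.Println(taken[0]["x"], taken[1]["x"]) // prints "1 2"
}
```

Retaining the result of Record without copying silently aliases every row to the last one, which is why the copy-or-take rule matters.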

type RollUpApply

type RollUpApply struct {
	// contains filtered or unexported fields
}

RollUpApply is a Volcano pipeline operator that performs pattern-comprehension execution: for each outer row, it drains the entire inner sub-plan into an expr.ListValue and emits (outerRow... || listValue) as a single output row.

RollUpApply is NOT safe for concurrent use.

func NewRollUpApply

func NewRollUpApply(outer, inner Operator, arg *Argument, listEval func(Row) (expr.Value, error)) *RollUpApply

NewRollUpApply creates a RollUpApply operator.

  • outer is the driving (left) plan.
  • inner is the correlated (right) sub-plan whose leaf is arg.
  • arg is the Argument node seeded with each outer row before inner Init.
  • listEval, when non-nil, is called for each inner row to extract the value to collect into the list. When nil, the first column of each inner row is collected.

func (*RollUpApply) Close

func (op *RollUpApply) Close() error

Close closes both the outer and inner plans.

func (*RollUpApply) Init

func (op *RollUpApply) Init(ctx context.Context) error

Init initialises the outer plan.

func (*RollUpApply) Next

func (op *RollUpApply) Next(out *Row) (bool, error)

Next emits one output row per outer row. For each outer row, the entire inner plan is drained into a ListValue appended as a new column.
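The roll-up step can be sketched over slices. The sketch assumes local stand-in types, models the inner plan as a hypothetical function from an outer row to its rows, represents the list as []Value, and assumes every inner row has at least one column; emitting an empty list when the inner side is empty is a choice made here, not something the text above confirms.

```go
package main

import "fmt"

// Value and Row are local stand-ins for expr.Value and exec.Row.
type Value any
type Row []Value

// rollUp appends one list column per outer row, built from all inner rows.
// When listEval is nil the first column of each inner row is collected.
func rollUp(outer []Row, inner func(Row) []Row, listEval func(Row) (Value, error)) ([]Row, error) {
	out := make([]Row, 0, len(outer))
	for _, o := range outer {
		list := []Value{}
		for _, in := range inner(o) {
			v := in[0]
			if listEval != nil {
				var err error
				if v, err = listEval(in); err != nil {
					return nil, err
				}
			}
			list = append(list, v)
		}
		// outerRow... || listValue
		row := append(append(Row{}, o...), Value(list))
		out = append(out, row)
	}
	return out, nil
}

func main() {
	friends := map[string][]string{"alice": {"bob", "carol"}}
	inner := func(o Row) []Row {
		var rows []Row
		for _, f := range friends[o[0].(string)] {
			rows = append(rows, Row{f})
		}
		return rows
	}
	rows, err := rollUp([]Row{{"alice"}, {"bob"}}, inner, nil)
	fmt.Println(rows, err)
}
```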

type Row

type Row []expr.Value

Row is a single tuple in the pipeline: a slice of expr.Value whose positions correspond to the operator's output schema. The slice is owned by the RowSlab that allocated it; callers must not retain it beyond the slab's lifetime.

func Drain

func Drain(ctx context.Context, op Operator) ([]Row, error)

Drain initialises op, pulls every row from the pipeline, and returns the collected rows as a []Row. Close is always called before Drain returns, regardless of whether an error occurred.

Cancellation: Drain honours ctx.Done() via the per-Next check inside each operator. If ctx is cancelled, Drain returns the partial result set and the context error.

The returned rows are independent copies: each element is a snapshot of the Row written by the operator at that iteration. Callers own the returned slice.

Example

ExampleDrain assembles the equivalent of `UNWIND [10, 20, 30] AS x RETURN x LIMIT 2` as a hand-built operator tree and drains it. SingleRow seeds one empty row, Unwind expands the literal list, and Limit caps the output.

package main

import (
	"context"
	"fmt"

	"github.com/FlavioCFOliveira/GoGraph/cypher/exec"
	"github.com/FlavioCFOliveira/GoGraph/cypher/expr"
)

func main() {
	// SingleRow emits exactly one empty row to drive the pipeline.
	src := exec.NewSingleRowOperator()

	// Unwind expands a fixed list; listFn ignores the (empty) input row.
	unwind, err := exec.NewUnwind(src, func(exec.Row) (expr.ListValue, error) {
		return expr.ListValue{
			expr.IntegerValue(10),
			expr.IntegerValue(20),
			expr.IntegerValue(30),
		}, nil
	})
	if err != nil {
		fmt.Println("NewUnwind:", err)
		return
	}

	// Limit passes at most two rows downstream.
	limit, err := exec.NewLimit(unwind, 2)
	if err != nil {
		fmt.Println("NewLimit:", err)
		return
	}

	// Drain runs the pipeline and always closes it before returning.
	rows, err := exec.Drain(context.Background(), limit)
	if err != nil {
		fmt.Println("Drain:", err)
		return
	}

	fmt.Println("rows:", len(rows))
	for _, r := range rows {
		fmt.Println("x =", r[0])
	}
}
Output:
rows: 2
x = 10
x = 20

type RowSlab

type RowSlab struct {
	// contains filtered or unexported fields
}

RowSlab is a bounded arena of pre-allocated rows. It eliminates per-row heap allocations by backing all rows in a single flat slice. Each call to [Alloc] hands out a sub-slice at zero GC cost after the initial backing allocation.

RowSlab is NOT safe for concurrent use; each pipeline stage owns its own instance, typically obtained from SlabPool.

Lifecycle

  1. Obtain a slab from SlabPool.Get (or call NewRowSlab).
  2. Call [Alloc] for each row needed in the current batch.
  3. When the batch is fully processed, call [Reset] and return the slab to SlabPool.Put.

[Reset] zeroes the column values in every allocated row so that no expr.Value reference is retained past the batch boundary (preventing GC nepotism between batches).

func NewRowSlab

func NewRowSlab(width, capacity int) *RowSlab

NewRowSlab creates a RowSlab with the given column count and row capacity. width is the number of expr.Value slots pre-allocated per row; pass 0 for variable-width rows (callers reserve a slot with [AllocRaw] and store their own slice in it with [SetRow]). capacity must be ≥ 1; DefaultSlabCapacity is a reasonable default.

func (*RowSlab) Alloc

func (s *RowSlab) Alloc() (Row, error)

Alloc returns the next available pre-allocated row in the slab. It returns ErrSlabOverflow if the slab is exhausted. The row width matches the width passed to NewRowSlab; for variable-width slabs (width=0) use [AllocRaw].

func (*RowSlab) AllocRaw

func (s *RowSlab) AllocRaw() (int, error)

AllocRaw reserves the next row slot for variable-width slabs (width=0) and returns its index. The caller is responsible for storing a correctly sized slice at that index with [SetRow] before use. For fixed-width slabs, use [Alloc].

func (*RowSlab) Cap

func (s *RowSlab) Cap() int

Cap returns the maximum number of rows this slab can hold.

func (*RowSlab) GetRow

func (s *RowSlab) GetRow(idx int) Row

GetRow returns the row at index idx. Panics if idx is out of bounds.

func (*RowSlab) Len

func (s *RowSlab) Len() int

Len returns the number of rows currently allocated.

func (*RowSlab) Reset

func (s *RowSlab) Reset()

Reset resets the slab for reuse. It zeroes every value slot in each allocated row to release references held by the GC, then resets the allocation counter to zero. The backing memory is retained.

func (*RowSlab) SetRow

func (s *RowSlab) SetRow(idx int, r Row)

SetRow stores row r at index idx. Panics if idx is out of bounds. Used with variable-width slabs after [AllocRaw].

type SemiApply

type SemiApply struct {
	// contains filtered or unexported fields
}

SemiApply emits each outer row for which the inner sub-plan produces at least one row. The inner plan is closed after the first match (short-circuit).

SemiApply is NOT safe for concurrent use.

func NewSemiApply

func NewSemiApply(outer, inner Operator, arg *Argument) *SemiApply

NewSemiApply creates a SemiApply operator.

  • outer is the driving (left) plan.
  • inner is the correlated (right) sub-plan whose leaf is arg.
  • arg is the Argument node seeded with each outer row before inner Init.

func (*SemiApply) Close

func (op *SemiApply) Close() error

Close closes the outer plan. The inner plan is already closed per-row inside Next; a redundant close here is safe because operators are expected to treat Close on an already-closed operator as a no-op.

func (*SemiApply) Init

func (op *SemiApply) Init(ctx context.Context) error

Init initialises the outer plan.

func (*SemiApply) Next

func (op *SemiApply) Next(out *Row) (bool, error)

Next advances to the next outer row for which the inner plan has ≥1 result.

type SetAllProperties

type SetAllProperties struct {
	// contains filtered or unexported fields
}

SetAllProperties replaces or merges every property on an already-bound node or relationship per input row.

SetAllProperties is NOT safe for concurrent use.

func NewSetAllPropertiesFromEntity

func NewSetAllPropertiesFromEntity(
	entityVar, sourceVar string,
	isReplace bool,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *SetAllProperties

NewSetAllPropertiesFromEntity creates a SetAllProperties operator copying every property from sourceVar (a bound node or relationship) to entityVar. isReplace selects `=` (true) vs `+=` (false) semantics.
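The distinction that isReplace selects can be sketched on plain maps. setAll is a hypothetical helper; it ignores null handling and constraint checks, and only contrasts `=` (drop existing properties first) with `+=` (overlay onto existing properties).

```go
package main

import "fmt"

// setAll copies source into target. With isReplace the target is emptied
// first (`=` semantics); otherwise existing keys are kept (`+=` semantics).
func setAll(target, source map[string]any, isReplace bool) {
	if isReplace {
		for k := range target {
			delete(target, k)
		}
	}
	for k, v := range source {
		target[k] = v
	}
}

func main() {
	merged := map[string]any{"name": "a", "age": 1}
	setAll(merged, map[string]any{"age": 2}, false)
	fmt.Println(merged) // name is kept, age is overwritten

	replaced := map[string]any{"name": "a", "age": 1}
	setAll(replaced, map[string]any{"age": 2}, true)
	fmt.Println(replaced) // only age remains
}
```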

func NewSetAllPropertiesFromMap

func NewSetAllPropertiesFromMap(
	entityVar, mapLiteral string,
	isReplace bool,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) (*SetAllProperties, error)

NewSetAllPropertiesFromMap creates a SetAllProperties operator writing every key/value pair from mapLiteral to entityVar. mapLiteral is the opaque literal-map string (e.g. `{a: 1, b: "x"}`) produced by the AST printer.

func NewSetAllPropertiesFromParam

func NewSetAllPropertiesFromParam(
	entityVar, paramName string,
	isReplace bool,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *SetAllProperties

NewSetAllPropertiesFromParam creates a SetAllProperties operator writing every key/value pair from the named query parameter to entityVar. The parameter must resolve to a MapValue at exec time; non-map values are treated as a no-op.

func (*SetAllProperties) Close

func (op *SetAllProperties) Close() error

Close closes the child operator.

func (*SetAllProperties) Init

func (op *SetAllProperties) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*SetAllProperties) Next

func (op *SetAllProperties) Next(out *Row) (bool, error)

Next pulls one row from the child and applies the configured whole-entity mutation. The row is forwarded unchanged so downstream operators (e.g. ProduceResults) can read the affected entity.

func (*SetAllProperties) WithConstraints

func (op *SetAllProperties) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *SetAllProperties

WithConstraints attaches a ConstraintRegistry and index.Manager for pre-write enforcement. Both must be non-nil. Returns op for chaining.

func (*SetAllProperties) WithParams

func (op *SetAllProperties) WithParams(params map[string]expr.Value) (*SetAllProperties, error)

WithParams attaches query parameters for $name substitution in the literal map and for parameter-sourced operators. Returns op for chaining.

func (*SetAllProperties) WithRelCols

func (op *SetAllProperties) WithRelCols(rc RelCols) *SetAllProperties

WithRelCols marks entityVar as a relationship variable and records the row columns that hold the src and dst NodeIDs. Must be called before the first Next invocation. Returns op for chaining.

func (*SetAllProperties) WithSourceRelCols

func (op *SetAllProperties) WithSourceRelCols(rc RelCols) *SetAllProperties

WithSourceRelCols marks sourceVar as a relationship variable and records the row columns that hold its src and dst NodeIDs. Must be called before the first Next invocation when sourceVar is a relationship. Returns op for chaining.

type SetLabels

type SetLabels struct {
	// contains filtered or unexported fields
}

SetLabels adds one or more labels to an already-bound node per input row.

SetLabels is NOT safe for concurrent use.

func NewSetLabels

func NewSetLabels(
	nodeVar string,
	labels []string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) *SetLabels

NewSetLabels creates a SetLabels operator.

func (*SetLabels) Close

func (op *SetLabels) Close() error

Close closes the child operator.

func (*SetLabels) Init

func (op *SetLabels) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*SetLabels) Next

func (op *SetLabels) Next(out *Row) (bool, error)

Next pulls one row from the child and adds the specified labels.

type SetProperty

type SetProperty struct {
	// contains filtered or unexported fields
}

SetProperty sets or replaces properties on an already-bound node or relationship per input row. The entity is identified by entityVar. For nodes, the column value must be an IntegerValue-encoded NodeID or a NodeValue. For relationships, call WithRelCols to supply the endpoint column indices.

SetProperty is NOT safe for concurrent use.

func NewSetProperty

func NewSetProperty(
	entityVar, propertyKey, valueExpr string,
	schema map[string]int,
	child Operator,
	mutator GraphMutator,
) (*SetProperty, error)

NewSetProperty creates a SetProperty operator.

entityVar is the variable name of the target node or relationship. propertyKey is the property key for single-property mode; pass an empty string for whole-entity mode. valueExpr is the opaque literal string from the IR. schema maps variable names to column indices. child is the upstream operator supplying rows. mutator is the graph write surface.

func (*SetProperty) Close

func (op *SetProperty) Close() error

Close closes the child operator.

func (*SetProperty) Init

func (op *SetProperty) Init(ctx context.Context) error

Init initialises the operator and its child.

func (*SetProperty) Next

func (op *SetProperty) Next(out *Row) (bool, error)

Next pulls one row from the child and applies the property mutation.

func (*SetProperty) WithConstraints

func (op *SetProperty) WithConstraints(reg *ConstraintRegistry, mgr *index.Manager) *SetProperty

WithConstraints attaches a ConstraintRegistry and index.Manager for pre-write enforcement. Both must be non-nil. Returns op for chaining.

func (*SetProperty) WithParams

func (op *SetProperty) WithParams(params map[string]expr.Value) *SetProperty

WithParams attaches query parameters for $name substitution in value expressions. Returns op for chaining.

func (*SetProperty) WithRelCols

func (op *SetProperty) WithRelCols(rc RelCols) *SetProperty

WithRelCols marks entityVar as a relationship variable and records the row columns that hold the src and dst NodeIDs. Must be called before the first Next invocation. Returns op for chaining.

func (*SetProperty) WithValueEvalFn

func (op *SetProperty) WithValueEvalFn(fn ValueEvalFn) *SetProperty

WithValueEvalFn attaches a per-row evaluator for the SET RHS expression. The closure is invoked whenever the operator needs to compute the new property value; it takes priority over the literal-string parser path for single-property assignments.

type ShortestPath

type ShortestPath struct {
	// contains filtered or unexported fields
}

ShortestPath is a Volcano pipeline operator that, for each input row, finds a single shortest path from srcCol to dstCol using BFS and emits one output row containing the path (or Null if unreachable).

ShortestPath is NOT safe for concurrent use.

func NewShortestPath

func NewShortestPath(input Operator, fwd, rev csrAdjacency, dir Direction, srcCol, dstCol int) *ShortestPath

NewShortestPath creates a ShortestPath operator.

  • input is the upstream operator supplying (srcID, dstID) pairs.
  • fwd is the forward CSR adjacency.
  • rev is the reverse CSR (required for DirIn/DirBoth).
  • dir is the traversal direction.
  • srcCol / dstCol are the column indices in each input row for source and destination node IDs.

func (*ShortestPath) Close

func (op *ShortestPath) Close() error

Close closes the input operator.

func (*ShortestPath) Init

func (op *ShortestPath) Init(ctx context.Context) error

Init initialises the operator.

func (*ShortestPath) Next

func (op *ShortestPath) Next(out *Row) (bool, error)

Next emits one row per input row, containing the shortest path (or Null).
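The per-row search can be illustrated with a textbook breadth-first search. The sketch assumes a forward-only traversal over a map-based adjacency list instead of the CSR structures the operator takes, and returns nil where the operator would emit Null; shortestPath is a hypothetical helper.

```go
package main

import "fmt"

// shortestPath returns one shortest path from src to dst as a list of node
// IDs, or nil when dst is unreachable.
func shortestPath(adj map[uint64][]uint64, src, dst uint64) []uint64 {
	if src == dst {
		return []uint64{src}
	}
	prev := map[uint64]uint64{src: src} // also serves as the visited set
	queue := []uint64{src}
	for len(queue) > 0 {
		u := queue[0]
		queue = queue[1:]
		for _, v := range adj[u] {
			if _, seen := prev[v]; seen {
				continue
			}
			prev[v] = u
			if v == dst {
				// Walk predecessors back to src, then reverse.
				path := []uint64{dst}
				for at := dst; at != src; {
					at = prev[at]
					path = append(path, at)
				}
				for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 {
					path[i], path[j] = path[j], path[i]
				}
				return path
			}
			queue = append(queue, v)
		}
	}
	return nil
}

func main() {
	adj := map[uint64][]uint64{1: {2, 3}, 2: {4}, 3: {4}, 4: {5}}
	fmt.Println(shortestPath(adj, 1, 5))
	fmt.Println(shortestPath(adj, 5, 1) == nil)
}
```

Because BFS explores nodes in order of hop count, the first time dst is discovered the recorded predecessor chain is a shortest path.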

type SingleRow

type SingleRow struct {
	// contains filtered or unexported fields
}

SingleRow emits one empty row then signals exhaustion.

SingleRow is NOT safe for concurrent use.

func NewSingleRowOperator

func NewSingleRowOperator() *SingleRow

NewSingleRowOperator returns a SingleRow operator.

func (*SingleRow) Close

func (op *SingleRow) Close() error

Close is a no-op; SingleRow holds no resources.

func (*SingleRow) Init

func (op *SingleRow) Init(ctx context.Context) error

Init resets the operator state.

func (*SingleRow) Next

func (op *SingleRow) Next(out *Row) (bool, error)

Next emits one empty row on the first call and returns false on all subsequent calls.

type Skip

type Skip struct {
	// contains filtered or unexported fields
}

Skip is a Volcano pipeline operator that discards the first n rows from its child operator and then forwards all remaining rows.

Skip is NOT safe for concurrent use.

func NewSkip

func NewSkip(child Operator, n int64) (*Skip, error)

NewSkip creates a Skip operator that discards the first n rows from child. n must be ≥ 0; a skip of 0 is a no-op pass-through.

func (*Skip) Close

func (op *Skip) Close() error

Close releases resources and closes the child operator.

func (*Skip) Init

func (op *Skip) Init(ctx context.Context) error

Init initialises the operator and resets the skip counter.

func (*Skip) Next

func (op *Skip) Next(out *Row) (bool, error)

Next discards rows until n have been skipped, then forwards subsequent rows.
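The discard-then-forward behaviour can be sketched as follows. Row, Operator, sliceOp, skipOp and collect are simplified local stand-ins, not the package's types: the real Operator also has Init and Close, and the real Row holds expr.Value.

```go
package main

import "fmt"

// Row and Operator are simplified stand-ins for the package's types.
type Row []int

type Operator interface {
	Next(out *Row) (bool, error)
}

// sliceOp replays a fixed list of rows.
type sliceOp struct {
	rows []Row
	pos  int
}

func (s *sliceOp) Next(out *Row) (bool, error) {
	if s.pos >= len(s.rows) {
		return false, nil
	}
	*out = s.rows[s.pos]
	s.pos++
	return true, nil
}

// skipOp discards the first n rows of child, then forwards the rest.
type skipOp struct {
	child   Operator
	n       int64
	skipped int64
}

func (s *skipOp) Next(out *Row) (bool, error) {
	for s.skipped < s.n {
		ok, err := s.child.Next(out)
		if err != nil || !ok {
			return false, err // child failed or ran dry while skipping
		}
		s.skipped++
	}
	return s.child.Next(out)
}

// collect drains op and returns the first column of every row.
func collect(op Operator) ([]int, error) {
	var got []int
	var r Row
	for {
		ok, err := op.Next(&r)
		if err != nil {
			return nil, err
		}
		if !ok {
			return got, nil
		}
		got = append(got, r[0])
	}
}

func main() {
	src := &sliceOp{rows: []Row{{10}, {20}, {30}, {40}}}
	got, err := collect(&skipOp{child: src, n: 2})
	fmt.Println(got, err) // [30 40] <nil>
}
```

The skipping happens lazily inside the first Next call, so constructing the operator pulls nothing from the child.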

type SlabPool

type SlabPool struct {
	// contains filtered or unexported fields
}

SlabPool is a sync.Pool-backed pool of RowSlab instances with a fixed column width and capacity. Operators that process a high volume of rows should obtain slabs from a shared pool to reduce GC pressure.

SlabPool is safe for concurrent use.

func NewSlabPool

func NewSlabPool(width, capacity int) *SlabPool

NewSlabPool creates a SlabPool that vends RowSlabs with the given column width and row capacity.

func (*SlabPool) Get

func (sp *SlabPool) Get() *RowSlab

Get retrieves a reset RowSlab from the pool, or allocates a new one.

func (*SlabPool) Put

func (sp *SlabPool) Put(s *RowSlab)

Put resets s and returns it to the pool.
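A sync.Pool-backed slab pool of this shape might look like the sketch below. The slab, slabPool and alloc names are hypothetical, rows hold plain ints, and overflow is reported with a bool instead of ErrSlabOverflow; the real layout is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// slab is a hypothetical stand-in for RowSlab: a fixed set of
// pre-allocated rows handed out one at a time.
type slab struct {
	rows [][]int
	used int
}

func (s *slab) reset() { s.used = 0 }

// alloc returns the next pre-allocated row, or false when the slab is full.
func (s *slab) alloc() ([]int, bool) {
	if s.used == len(s.rows) {
		return nil, false
	}
	r := s.rows[s.used]
	s.used++
	return r, true
}

// slabPool vends slabs of one fixed width and capacity.
type slabPool struct{ p sync.Pool }

func newSlabPool(width, capacity int) *slabPool {
	sp := &slabPool{}
	sp.p.New = func() any {
		// One backing array for all rows keeps the slab to two allocations.
		backing := make([]int, width*capacity)
		rows := make([][]int, capacity)
		for i := range rows {
			rows[i] = backing[i*width : (i+1)*width : (i+1)*width]
		}
		return &slab{rows: rows}
	}
	return sp
}

// get returns a reset slab, allocating one if the pool is empty.
func (sp *slabPool) get() *slab {
	s := sp.p.Get().(*slab)
	s.reset()
	return s
}

// put resets s and returns it to the pool.
func (sp *slabPool) put(s *slab) {
	s.reset()
	sp.p.Put(s)
}

func main() {
	pool := newSlabPool(3, 2)
	s := pool.get()
	r1, ok1 := s.alloc()
	_, ok2 := s.alloc()
	_, ok3 := s.alloc()
	fmt.Println(len(r1), ok1, ok2, ok3) // 3 true true false
	pool.put(s)
}
```

The pool itself is safe to share across goroutines, but each slab obtained from it should stay with a single goroutine until it is put back.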

type Sort

type Sort struct {
	// contains filtered or unexported fields
}

Sort is a blocking Volcano operator that collects all rows from its child, sorts them by the specified SortKey sequence, and emits them in order.

Sort is NOT safe for concurrent use.

func NewSort

func NewSort(child Operator, keys []SortKey, maxRows int) (*Sort, error)

NewSort creates a Sort operator.

  • child: the upstream operator to consume.
  • keys: ORDER BY specification. Must not be empty.
  • maxRows: upper bound on rows held in memory; pass 0 to use DefaultMaxSortRows.

func (*Sort) Close

func (op *Sort) Close() error

Close closes the child operator and releases internal storage.

func (*Sort) Init

func (op *Sort) Init(ctx context.Context) error

Init initialises the operator. The blocking collect+sort phase is deferred to the first Next call.

func (*Sort) Next

func (op *Sort) Next(out *Row) (bool, error)

Next emits the next sorted row. On the first call it collects and sorts all rows from the child (pipeline breaker). Subsequent calls step through the sorted slice.

type SortKey

type SortKey struct {
	// ColIdx is the zero-based index of the column within each Row.
	// Ignored when Eval is non-nil.
	ColIdx int
	// Ascending controls the sort direction. true = ASC, false = DESC.
	Ascending bool
	// Eval is an optional expression evaluator. When non-nil the sort key
	// value is obtained by calling Eval(row) rather than reading row[ColIdx].
	// This supports ORDER BY expressions that are not direct projection
	// output columns (e.g. ORDER BY n.age after RETURN n).
	Eval func(Row) (expr.Value, error)
}

SortKey describes a single ORDER BY key: either a row column (ColIdx) or, when Eval is non-nil, a computed expression.
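How a sequence of keys could drive a comparison is sketched below. The sortKey and sortRows names are hypothetical, values are simplified to int (the real Eval returns an expr.Value and an error), and the use of a stable sort is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sort"
)

type row []int

// sortKey mirrors the shape of SortKey with int values for brevity.
type sortKey struct {
	colIdx    int
	ascending bool
	eval      func(row) int // optional; takes priority over colIdx
}

// value returns the key's value for r.
func (k sortKey) value(r row) int {
	if k.eval != nil {
		return k.eval(r)
	}
	return r[k.colIdx]
}

// sortRows orders rows by keys[0], breaking ties with keys[1], and so on.
func sortRows(rows []row, keys []sortKey) {
	sort.SliceStable(rows, func(i, j int) bool {
		for _, k := range keys {
			a, b := k.value(rows[i]), k.value(rows[j])
			if a == b {
				continue // tie on this key: consult the next one
			}
			if k.ascending {
				return a < b
			}
			return a > b
		}
		return false // equal on every key
	})
}

func main() {
	rows := []row{{1, 9}, {2, 3}, {1, 4}}
	sortRows(rows, []sortKey{
		{colIdx: 0, ascending: true},
		{colIdx: 1, ascending: false},
	})
	fmt.Println(rows) // [[1 9] [1 4] [2 3]]
}
```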

type StringHashIndex

type StringHashIndex struct {
	// contains filtered or unexported fields
}

StringHashIndex adapts hash.Index[string] to the [hashLookup] interface. It accepts only expr.StringValue seek keys; other kinds return ErrIndexTypeMismatch.

func NewStringHashIndex

func NewStringHashIndex(idx interface {
	Lookup(value string) *roaring64.Bitmap
}) *StringHashIndex

NewStringHashIndex constructs a StringHashIndex.

func (*StringHashIndex) LookupBitmap

func (h *StringHashIndex) LookupBitmap(value expr.Value) (*roaring64.Bitmap, error)

LookupBitmap implements [hashLookup].

type StringRangeIndex

type StringRangeIndex struct {
	// contains filtered or unexported fields
}

StringRangeIndex adapts btree.Index[string] to the [rangeLookup] interface. A nil lower bound is treated as "" (the empty string) and a nil upper bound as "\xff…" (all bytes 0xff, 256 chars).

func NewStringRangeIndex

func NewStringRangeIndex(idx interface {
	Range(lo, hi string) *roaring64.Bitmap
}) *StringRangeIndex

NewStringRangeIndex constructs a StringRangeIndex.

func (*StringRangeIndex) RangeBitmap

func (r *StringRangeIndex) RangeBitmap(lo, hi expr.Value) *roaring64.Bitmap

RangeBitmap implements [rangeLookup].

type TargetEvalFn

type TargetEvalFn func(row Row) (expr.Value, error)

TargetEvalFn evaluates a DELETE / DETACH DELETE target expression against the current input row and returns the resolved value. The exec operator inspects the value: NodeValue / IntegerValue selects the node by ID; RelationshipValue selects the relationship; null is a row-passthrough no-op (matches openCypher 9 §3.5.8).

type Top

type Top struct {
	// contains filtered or unexported fields
}

Top is a blocking Volcano operator that emits the N smallest rows (per the given sort keys) from its child. It keeps a bounded heap of at most N rows, so for M input rows it needs O(N) memory and O(M log N) time.

Top is NOT safe for concurrent use.

func NewTop

func NewTop(child Operator, keys []SortKey, n int) (*Top, error)

NewTop creates a Top operator.

  • child: the upstream operator to consume.
  • keys: ORDER BY specification. Must not be empty.
  • n: number of rows to return. Must be ≥ 1.

func (*Top) Close

func (op *Top) Close() error

Close closes the child operator and releases internal storage.

func (*Top) Init

func (op *Top) Init(ctx context.Context) error

Init initialises the operator. The blocking consume phase is deferred to the first Next call.

func (*Top) Next

func (op *Top) Next(out *Row) (bool, error)

Next emits the next top-N row in sorted order. On the first call it consumes all rows from the child and finalises the heap.
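The bounded-heap technique can be shown in isolation with container/heap. This is a sketch over plain ints with hypothetical names (maxHeap, topN), not the operator's code: a max-heap of at most n elements holds the n smallest values seen so far, and a new value replaces the root only when it is smaller.

```go
package main

import (
	"container/heap"
	"fmt"
)

// maxHeap keeps the largest retained value at index 0.
type maxHeap []int

func (h maxHeap) Len() int           { return len(h) }
func (h maxHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *maxHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topN returns the n smallest values of input in ascending order.
func topN(input []int, n int) []int {
	if n < 1 {
		return nil
	}
	h := make(maxHeap, 0, n)
	for _, v := range input {
		if h.Len() < n {
			heap.Push(&h, v)
		} else if v < h[0] {
			h[0] = v // evict the current largest retained value
			heap.Fix(&h, 0)
		}
	}
	// Popping yields largest first, so fill the result from the back.
	out := make([]int, h.Len())
	for i := len(out) - 1; i >= 0; i-- {
		out[i] = heap.Pop(&h).(int)
	}
	return out
}

func main() {
	fmt.Println(topN([]int{5, 1, 9, 3, 7, 2}, 3)) // [1 2 3]
}
```

Each input value costs at most one O(log n) heap operation, and the heap never grows beyond n entries.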

type Union

type Union struct {
	// contains filtered or unexported fields
}

Union emits the set-union of left and right: all rows from both sides with duplicates removed. It is implemented as UnionAll wrapped in a Distinct operator.

Schema mismatch detection is inherited from UnionAll.

Union is NOT safe for concurrent use.

func NewUnion

func NewUnion(left, right Operator, maxDistinct int) *Union

NewUnion creates a Union operator that deduplicates the concatenation of left and right.

  • maxDistinct: upper bound on distinct rows; pass 0 to use DefaultMaxDistinct.

func (*Union) Close

func (op *Union) Close() error

Close releases all resources.

func (*Union) Init

func (op *Union) Init(ctx context.Context) error

Init initialises the operator.

func (*Union) Next

func (op *Union) Next(out *Row) (bool, error)

Next emits the next unique row from the union.

type UnionAll

type UnionAll struct {
	// contains filtered or unexported fields
}

UnionAll is a Volcano operator that concatenates the output of left and right children without deduplication. It validates that both sides produce rows of the same width (column count).

UnionAll is NOT safe for concurrent use.

func NewUnionAll

func NewUnionAll(left, right Operator) *UnionAll

NewUnionAll creates a UnionAll operator that concatenates left then right.

func (*UnionAll) Close

func (op *UnionAll) Close() error

Close closes both child operators. Both are always attempted; if both fail the errors are joined.

func (*UnionAll) Init

func (op *UnionAll) Init(ctx context.Context) error

Init initialises both child operators.

func (*UnionAll) Next

func (op *UnionAll) Next(out *Row) (bool, error)

Next emits rows from left until exhausted, then emits rows from right. Returns ErrSchemaMismatch if the first right-side row has a different column count than the first left-side row.
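A minimal sketch of the left-then-right concatenation with a width check follows. All names are local stand-ins (errSchemaMismatch for ErrSchemaMismatch, a one-method Operator), and the sketch checks every row against the first width seen, which is slightly stricter than the documented first-row comparison.

```go
package main

import (
	"errors"
	"fmt"
)

// errSchemaMismatch stands in for the package's ErrSchemaMismatch.
var errSchemaMismatch = errors.New("schema mismatch")

type Row []int

type Operator interface {
	Next(out *Row) (bool, error)
}

// sliceOp replays a fixed list of rows.
type sliceOp struct {
	rows []Row
	pos  int
}

func (s *sliceOp) Next(out *Row) (bool, error) {
	if s.pos >= len(s.rows) {
		return false, nil
	}
	*out = s.rows[s.pos]
	s.pos++
	return true, nil
}

// unionAll emits every left row, then every right row.
type unionAll struct {
	left, right Operator
	leftDone    bool
	width       int // -1 until the first row is seen
}

func newUnionAll(left, right Operator) *unionAll {
	return &unionAll{left: left, right: right, width: -1}
}

func (u *unionAll) Next(out *Row) (bool, error) {
	if !u.leftDone {
		ok, err := u.left.Next(out)
		if err != nil {
			return false, err
		}
		if ok {
			return u.checkWidth(*out)
		}
		u.leftDone = true // left exhausted: fall through to right
	}
	ok, err := u.right.Next(out)
	if err != nil || !ok {
		return false, err
	}
	return u.checkWidth(*out)
}

// checkWidth records the first row's width and rejects rows that differ.
func (u *unionAll) checkWidth(r Row) (bool, error) {
	if u.width < 0 {
		u.width = len(r)
	}
	if len(r) != u.width {
		return false, errSchemaMismatch
	}
	return true, nil
}

func main() {
	u := newUnionAll(&sliceOp{rows: []Row{{1, 2}}}, &sliceOp{rows: []Row{{7}}})
	var r Row
	ok, err := u.Next(&r)
	fmt.Println(ok, err) // true <nil>
	ok, err = u.Next(&r)
	fmt.Println(ok, err) // false schema mismatch
}
```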

type Unwind

type Unwind struct {
	// contains filtered or unexported fields
}

Unwind is a Volcano pipeline operator that implements the UNWIND clause. For each input row it evaluates a list expression and emits one output row per list element, appending the element value as a new column.

Unwind is NOT safe for concurrent use.

func NewUnwind

func NewUnwind(child Operator, listFn UnwindListFn) (*Unwind, error)

NewUnwind creates an Unwind operator.

child provides the context rows. listFn is evaluated once per input row and must return the list to expand. The caller is not responsible for appending the element column to the output row; Unwind handles that internally.

Both child and listFn are required: a nil argument returns the typed sentinel ErrUnwindNilChild or ErrUnwindNilListFn respectively, so callers can distinguish the cause via errors.Is. NewUnwind never panics.

func (*Unwind) Close

func (op *Unwind) Close() error

Close releases resources and closes the child operator.

Close is idempotent within a single pipeline lifecycle: calling it more than once between two Init invocations returns nil from the second and later calls and does NOT propagate to op.child.Close again. The idempotency guard is reset by Init, so an Init→Close→Init→Close sequence still closes the child twice — once per cycle, as expected.

func (*Unwind) Init

func (op *Unwind) Init(ctx context.Context) error

Init initialises the operator and its child. It clears all per-iteration state (curRow, curList, listIdx) and resets the idempotency guard (closed) so that Init is the exact dual of Close, allowing an operator instance to be safely re-Init'd after a previous Close.
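The Init/Close pairing described above amounts to a boolean guard. The sketch below uses hypothetical types (countingChild, guardedOp) and omits the context argument and per-iteration state to show only the guard.

```go
package main

import "fmt"

// countingChild records how many times Close reached it.
type countingChild struct{ closes int }

func (c *countingChild) Close() error {
	c.closes++
	return nil
}

// guardedOp forwards Close to its child at most once per Init.
type guardedOp struct {
	child  *countingChild
	closed bool
}

// Init re-arms the guard so the next Close reaches the child again.
func (g *guardedOp) Init() { g.closed = false }

// Close is idempotent between two Init calls.
func (g *guardedOp) Close() error {
	if g.closed {
		return nil
	}
	g.closed = true
	return g.child.Close()
}

func main() {
	c := &countingChild{}
	op := &guardedOp{child: c}

	op.Init()
	_ = op.Close()
	_ = op.Close() // second Close in the same cycle is swallowed
	fmt.Println(c.closes) // 1

	op.Init()
	_ = op.Close()
	fmt.Println(c.closes) // 2
}
```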

func (*Unwind) Next

func (op *Unwind) Next(out *Row) (bool, error)

Next advances to the next element. It pulls a new input row from the child whenever the current list is exhausted, then emits one row per element.

Returns (true, nil) when an output row was written to out, (false, nil) at end-of-stream, (false, err) on error.
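The pull-and-expand loop can be sketched as below. Row, Operator, sliceOp and unwindOp are local stand-ins with int values, and the fresh row allocated per output is a simplification; how the real operator builds its output row is not shown in this documentation.

```go
package main

import "fmt"

type Row []int

type Operator interface {
	Next(out *Row) (bool, error)
}

// sliceOp replays a fixed list of rows.
type sliceOp struct {
	rows []Row
	pos  int
}

func (s *sliceOp) Next(out *Row) (bool, error) {
	if s.pos >= len(s.rows) {
		return false, nil
	}
	*out = s.rows[s.pos]
	s.pos++
	return true, nil
}

// unwindOp emits one row per list element, appending the element as a
// new trailing column.
type unwindOp struct {
	child  Operator
	listFn func(Row) ([]int, error)
	cur    Row
	list   []int
	idx    int
}

func (u *unwindOp) Next(out *Row) (bool, error) {
	// Refill from the child until there is an element to emit; input
	// rows whose list is empty produce no output.
	for u.idx >= len(u.list) {
		var in Row
		ok, err := u.child.Next(&in)
		if err != nil || !ok {
			return false, err
		}
		list, err := u.listFn(in)
		if err != nil {
			return false, err
		}
		u.cur, u.list, u.idx = in, list, 0
	}
	r := make(Row, 0, len(u.cur)+1)
	r = append(r, u.cur...)
	r = append(r, u.list[u.idx])
	u.idx++
	*out = r
	return true, nil
}

// collectRows drains op into a slice.
func collectRows(op Operator) ([]Row, error) {
	var rows []Row
	for {
		var r Row
		ok, err := op.Next(&r)
		if err != nil {
			return nil, err
		}
		if !ok {
			return rows, nil
		}
		rows = append(rows, r)
	}
}

func main() {
	child := &sliceOp{rows: []Row{{1}, {2}, {3}}}
	listFn := func(r Row) ([]int, error) {
		if r[0] == 2 {
			return nil, nil // nothing to expand for this row
		}
		return []int{r[0] * 10, r[0]*10 + 1}, nil
	}
	rows, err := collectRows(&unwindOp{child: child, listFn: listFn})
	fmt.Println(rows, err) // [[1 10] [1 11] [3 30] [3 31]] <nil>
}
```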

type UnwindListFn

type UnwindListFn func(row Row) (expr.ListValue, error)

UnwindListFn evaluates the list expression for one input row. It returns an expr.ListValue when the expression evaluates to a list, or nil/empty when there is nothing to expand.

type ValueEvalFn

type ValueEvalFn func(row Row) (value lpg.PropertyValue, isNull bool, hasValue bool, err error)

ValueEvalFn evaluates a SET RHS expression against the current input row and returns the resulting property value plus two flags, isNull and hasValue, that distinguish the "explicit null" case from the "no value produced" case. The exec operator deletes the property on an explicit null and leaves it unchanged when no value was produced.
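The three outcomes can be sketched against a plain map. The valueEvalFn and applySet names are hypothetical, property values are simplified to int, and checking hasValue before isNull is an assumption about how the flags combine.

```go
package main

import "fmt"

// valueEvalFn mirrors the shape of ValueEvalFn with an int value.
type valueEvalFn func(row []int) (value int, isNull bool, hasValue bool, err error)

// applySet evaluates fn for row and updates props[key] accordingly.
func applySet(props map[string]int, key string, row []int, fn valueEvalFn) error {
	v, isNull, hasValue, err := fn(row)
	if err != nil {
		return err
	}
	switch {
	case !hasValue:
		// No value produced: leave the property untouched.
	case isNull:
		delete(props, key) // explicit null removes the property
	default:
		props[key] = v
	}
	return nil
}

func main() {
	props := map[string]int{"age": 30}
	set := func(row []int) (int, bool, bool, error) { return row[0], false, true, nil }
	none := func(row []int) (int, bool, bool, error) { return 0, false, false, nil }
	null := func(row []int) (int, bool, bool, error) { return 0, true, true, nil }

	_ = applySet(props, "age", []int{41}, set)
	fmt.Println(props) // map[age:41]
	_ = applySet(props, "age", nil, none)
	fmt.Println(props) // map[age:41]
	_ = applySet(props, "age", nil, null)
	fmt.Println(props) // map[]
}
```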

type VarLengthConfig

type VarLengthConfig struct {
	// Direction to follow. Defaults to DirOut when zero.
	Direction Direction
	// EdgeType, when non-empty, restricts expansion to edges of this type.
	EdgeType string
	// EdgeTypeFilter maps absolute edge positions to type labels.
	EdgeTypeFilter map[uint64]string
	// InputCol is the column index in each input row that holds the source
	// NodeID. Defaults to 0.
	InputCol int
	// MinHops is the minimum path length (inclusive). Must be ≥ 0.
	MinHops int
	// MaxHops is the maximum path length (inclusive). Must be ≥ MinHops.
	// Use math.MaxInt for unbounded (not recommended without a safety cap).
	MaxHops int
	// MaxEdgesTraversed is the safety cap on total edge traversals per input
	// row. Defaults to 1,000,000 when 0.
	MaxEdgesTraversed int
	// ExcludedRelCols lists column indices in the input row holding edge
	// identifiers (IntegerValue or RelationshipValue) that must not be
	// traversed inside this VLE step. Implements the openCypher
	// no-repeated-relationships rule across distinct rel patterns within
	// the same MATCH (e.g. `MATCH ()-[r:EDGE]-() MATCH (n)-[*0..1]-()-[r]
	// -()-[*0..1]-(m)` — the two variable-length steps must not reuse the
	// edge bound to `r`). The visited bitset is pre-populated with each
	// listed column's edge position at BFS seed time.
	ExcludedRelCols []int
}

VarLengthConfig carries configuration for NewVarLengthExpand.

type VarLengthExpand

type VarLengthExpand struct {
	// contains filtered or unexported fields
}

VarLengthExpand is a Volcano pipeline operator that performs bounded BFS variable-length expansion.

VarLengthExpand is NOT safe for concurrent use.

func NewVarLengthExpand

func NewVarLengthExpand(input Operator, fwd, rev csrAdjacency, cfg *VarLengthConfig) *VarLengthExpand

NewVarLengthExpand creates a VarLengthExpand operator. cfg is read-only and taken by pointer to avoid copying the configuration struct on this hot path.

func (*VarLengthExpand) Close

func (op *VarLengthExpand) Close() error

Close releases resources.

func (*VarLengthExpand) Init

func (op *VarLengthExpand) Init(ctx context.Context) error

Init initialises the operator.

func (*VarLengthExpand) Next

func (op *VarLengthExpand) Next(out *Row) (bool, error)

Next emits the next (inputRow... || pathEdgesAsListValue || dstNodeID) row. The path is encoded as an expr.ListValue of edge positions (IntegerValues), followed by the destination node ID as an IntegerValue.
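A bounded breadth-first expansion that produces (edge positions, destination) pairs might be sketched as follows. Everything here is an assumption made for illustration: the csr layout, the hit and expand names, the errEdgeCap sentinel, outgoing-only traversal, and a per-path edge check in place of the visited bitset the configuration comments mention.

```go
package main

import (
	"errors"
	"fmt"
)

// errEdgeCap is a hypothetical sentinel for the traversal safety cap.
var errEdgeCap = errors.New("edge traversal cap exceeded")

// csr: the out-edges of node v sit at positions offsets[v]..offsets[v+1]-1
// of targets, and an edge's position doubles as its identifier.
type csr struct {
	offsets []int
	targets []int
}

// hit is one result: the edge positions along the path and its end node.
type hit struct {
	edges []int
	dst   int
}

func usesEdge(edges []int, e int) bool {
	for _, x := range edges {
		if x == e {
			return true
		}
	}
	return false
}

// expand returns every path from src with minHops..maxHops edges that
// repeats no edge, in breadth-first order.
func expand(g csr, src, minHops, maxHops, maxEdges int) ([]hit, error) {
	var out []hit
	queue := []hit{{dst: src}}
	traversed := 0
	for len(queue) > 0 {
		p := queue[0]
		queue = queue[1:]
		if len(p.edges) >= minHops {
			out = append(out, p)
		}
		if len(p.edges) >= maxHops {
			continue // hop limit reached: do not extend this path
		}
		for e := g.offsets[p.dst]; e < g.offsets[p.dst+1]; e++ {
			if usesEdge(p.edges, e) {
				continue // never reuse a relationship within one path
			}
			traversed++
			if traversed > maxEdges {
				return nil, errEdgeCap
			}
			next := make([]int, len(p.edges), len(p.edges)+1)
			copy(next, p.edges)
			queue = append(queue, hit{edges: append(next, e), dst: g.targets[e]})
		}
	}
	return out, nil
}

func main() {
	// A 3-cycle: 0->1 (edge 0), 1->2 (edge 1), 2->0 (edge 2).
	g := csr{offsets: []int{0, 1, 2, 3}, targets: []int{1, 2, 0}}
	hits, err := expand(g, 0, 1, 10, 1000)
	fmt.Println(len(hits), err) // 3 <nil>
	for _, h := range hits {
		fmt.Println(h.edges, h.dst)
	}
}
```

On the cycle the expansion stops after three hops even with maxHops set to 10, because the only edge leaving node 0 is already on the path; the traversal cap covers graphs where that rule alone would not bound the work.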
