query

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package query's physical_plan.go declares the Volcano-model physical operator interface and 16 concrete operator implementations (C3.0 extraction).

Status: PARTIAL. C3.0 lifts the operator types + execution logic from origin/archive/gemini-bulk-2026-05-13^3 with no consumers. The existing Step-based Executor (executor.go + executor_steps.go) keeps serving the /v1/cypher endpoint; this file's operators run only when wired by C4 (planner) + C5 (parser additions) in later PRs.

Deferred:

  • CallOperator (CALL ... YIELD) — references procedureRegistry, which lives in procedures.go (C6 territory). To preserve the per-PR discipline, it lands alongside C6 rather than dragging C6 forward.
  • Operator-level unit tests — deferred to C3.1 (mirroring the C1.0 + C1.1 split that surfaced a real navigation bug in the btree archive).

Each operator carries `otel.Tracer("query").Start(...)` spans on Open (and on Next where loop hot-paths warrant it) per the audit's S7 verdict. The acceptance bar of "OTEL spans visible in pkg/telemetry/ exporter integration test" cannot be met because pkg/telemetry does not yet exist in the tree; surface added so a future telemetry extraction can wire to it.

Package query's planner.go translates a Query AST into a tree of PhysicalOperators (C4.0 extraction + C4.1 q.Call reinstate). Consumes C3.0's operator types + C3.1's CallOperator.

Status: PARTIAL. The planner runs only when wired by a planner-driven executor path; the existing Step-based Executor (executor.go + executor_steps.go) keeps serving /v1/cypher. C5 (parser CALL/YIELD) + C3.1 (CallOperator) + this file's q.Call block now form the full AST → operator translation for CALL ... YIELD; execution requires C6's procedure bodies (Decision-6-gated in NEXT_STEPS_2026-05-13.md).

Deferred:

  • Planner-level unit tests — deferred (mirroring the C1.0/C1.1 and C3.0/C3.1 splits). The q.Call reinstate in C4.1 specifically should get a planner test that confirms a parsed CALL query produces a plan tree containing CallOperator at the right position.

Several archive comments are preserved verbatim: "Simplified" on the OPTIONAL MATCH path, "for the spike" on the linear-expansion comment, "redundant but safe" on isWhereConsumedByIndex. These are honest spike markers — left intact to keep the lift surgical; cleanup belongs in a follow-up once tests anchor the contract.

Package query — procedure registry + procedure bodies for Cypher CALL ... YIELD.

History:

  • C3.1 (PR #175): registry SKELETON (Procedure type, empty map, RegisterProcedure exported func) so CallOperator could compile.
  • C6 (this file): registers the first real procedure, algo.shortestPath, by wiring to pkg/algorithms.ShortestPathForTenant. Per Decision 6 = B (NEXT_STEPS_2026-05-13.md, resolved 2026-05-13), the algorithm takes storage.Storage (interface) so the procedure passes graph through without type assertion.

Future procedures:

  • gnn.messagePass — skipped; pkg/gnn doesn't exist on OSS (Subset 🟢 audit note carries forward).
  • llm.generate — dropped; pkg/intelligence doesn't exist on OSS (Decision 4 retired 2026-05-13 by package absence).
  • Backend-specific procedures may register via RegisterProcedure at init.

Index

Constants

View Source
const (
	// DefaultQueryTimeout is the default timeout for query execution
	DefaultQueryTimeout = 30 * time.Second

	// MaxQueryTimeout is the maximum allowed query timeout
	MaxQueryTimeout = 5 * time.Minute
)
View Source
const (
	// MaxCartesianProductResults limits the maximum size of cartesian product
	// to prevent memory exhaustion from queries like MATCH (a), (b), (c)
	MaxCartesianProductResults = 100000

	// MaxIntermediateResults limits intermediate results during query execution
	MaxIntermediateResults = 1000000
)
View Source
const (
	// MinStreamBufferSize is the minimum buffer size to prevent deadlocks
	MinStreamBufferSize = 10
	// DefaultStreamBufferSize is used when no size is specified
	DefaultStreamBufferSize = 100
)
View Source
const (
	// DefaultMaxTraversalDepth is used when MaxDepth is not specified or is 0
	DefaultMaxTraversalDepth = 10
	// MaxAllowedTraversalDepth is the absolute maximum to prevent stack overflow in DFS
	// and memory exhaustion in BFS
	MaxAllowedTraversalDepth = 100
	// MinTraversalDepth is the minimum valid depth (0 means only start node)
	MinTraversalDepth = 0

	// DefaultMaxResults is used when MaxResults is not specified or is 0
	DefaultMaxResults = 10000
	// MaxAllowedResults is the absolute maximum to prevent memory exhaustion
	MaxAllowedResults = 1000000
)

Traversal depth limits to prevent resource exhaustion

View Source
const (
	// DefaultTaskTimeout is the default timeout for individual task execution
	DefaultTaskTimeout = 30 * time.Second
	// MinTaskTimeout is the minimum allowed task timeout
	MinTaskTimeout = 1 * time.Second
)
View Source
const (
	// MaxQueryLength is the maximum allowed query length (10KB)
	MaxQueryLength = 10000
)

Variables

View Source
var ErrInvalidMaxResults = fmt.Errorf("MaxResults must be non-negative")

ErrInvalidMaxResults is returned when MaxResults is negative

View Source
var ErrInvalidTraversalDepth = fmt.Errorf("traversal depth out of valid range [%d, %d]", MinTraversalDepth, MaxAllowedTraversalDepth)

ErrInvalidTraversalDepth is returned when depth is out of valid range

Functions

func Collect

func Collect(stream *ResultStream) ([]*storage.Node, error)

Collect collects all results from a stream into a slice

func Count

func Count(stream *ResultStream) (int, error)

Count counts results in a stream

func RegisterFunction

func RegisterFunction(name string, fn QueryFunc)

RegisterFunction registers a named function for use in queries

func RegisterProcedure

func RegisterProcedure(name string, proc Procedure)

RegisterProcedure adds a procedure to the registry. Exported so external packages can register procedures (e.g. enterprise plugin loader, test fixtures) without requiring direct map access.

func SanitizeQuery

func SanitizeQuery(query string) (string, error)

SanitizeQuery validates and sanitizes a query string for security It checks for: - Maximum length (DoS prevention) - Dangerous patterns (XSS, SQL injection, code injection) - Empty/whitespace-only queries - Normalizes whitespace

func ValidateQueryTimeout

func ValidateQueryTimeout(timeout time.Duration) time.Duration

ValidateQueryTimeout is a convenience function for validating query timeouts.

func ValidateTaskTimeout

func ValidateTaskTimeout(timeout time.Duration) time.Duration

ValidateTaskTimeout is a convenience function for validating task timeouts.

func ValidateTimeout

func ValidateTimeout(timeout time.Duration, config TimeoutConfig) time.Duration

ValidateTimeout validates and normalizes a timeout duration. Returns the default if timeout is <= 0. Returns min if timeout is less than min (when min > 0). Returns max if timeout exceeds max (when max > 0).

func ValidateTraversalOptions

func ValidateTraversalOptions(opts *TraversalOptions) error

ValidateTraversalOptions validates and normalizes traversal options. Returns an error if options are invalid. Note: MaxDepth=0 is valid and means "only return the start node".

func WithDefaultDepth

func WithDefaultDepth(depth int) int

WithDefaultDepth returns the depth or DefaultMaxTraversalDepth if depth is <= 0. Use this when you want to apply a default for unspecified depths.

Types

type AggregateOperator

type AggregateOperator struct {
	Input PhysicalOperator
	Items []*ReturnItem
	// contains filtered or unexported fields
}

AggregateOperator performs grouped aggregations.

func (*AggregateOperator) Close

func (o *AggregateOperator) Close(ctx *ExecutionContext) error

func (*AggregateOperator) Next

func (*AggregateOperator) Open

type AggregationComputer

type AggregationComputer struct{}

AggregationComputer handles aggregate function computation

func (*AggregationComputer) ComputeAggregates

func (ac *AggregationComputer) ComputeAggregates(ctx *ExecutionContext, returnItems []*ReturnItem) map[string]any

ComputeAggregates computes all aggregate functions in the return clause

func (*AggregationComputer) ComputeGroupedAggregates

func (ac *AggregationComputer) ComputeGroupedAggregates(ctx *ExecutionContext, returnItems []*ReturnItem, groupByExprs []*PropertyExpression) []map[string]any

ComputeGroupedAggregates computes aggregates for each group

func (*AggregationComputer) ExtractValue

func (ac *AggregationComputer) ExtractValue(val storage.Value) any

ExtractValue extracts the actual value from storage.Value

type ArithmeticExpression

type ArithmeticExpression struct {
	Left     Expression
	Operator string // "+", "-", "*", "/", "%"
	Right    Expression
}

ArithmeticExpression represents binary arithmetic: +, -, *, /, % Uses the dual-eval pattern: EvalValue returns the computed value, Eval coerces to bool for WHERE context.

func (*ArithmeticExpression) Eval

func (ae *ArithmeticExpression) Eval(context map[string]any) (bool, error)

func (*ArithmeticExpression) EvalValue

func (ae *ArithmeticExpression) EvalValue(context map[string]any) (any, error)

type Assignment

type Assignment struct {
	Variable  string
	Property  string
	Value     any        // literal value (used when ValueExpr is nil)
	ValueExpr Expression // expression RHS (takes precedence over Value when non-nil)
}

Assignment represents a property assignment

type BatchProcessor

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor processes query results in batches

func NewBatchProcessor

func NewBatchProcessor(batchSize int, processor func([]*storage.Node) error) *BatchProcessor

NewBatchProcessor creates a batch processor

func (*BatchProcessor) Process

func (bp *BatchProcessor) Process(stream *ResultStream) error

Process processes a result stream in batches

type BinaryExpression

type BinaryExpression struct {
	Left     Expression
	Operator string
	Right    Expression
}

BinaryExpression represents binary operations (AND, OR, =, <, >, etc.)

func (*BinaryExpression) Eval

func (be *BinaryExpression) Eval(context map[string]any) (bool, error)

func (*BinaryExpression) EvalValue

func (be *BinaryExpression) EvalValue(context map[string]any) (any, error)

type BindingSet

type BindingSet struct {
	// contains filtered or unexported fields
}

BindingSet represents a set of variable bindings

type CallClause

type CallClause struct {
	ProcedureName string
	Arguments     []Expression
	YieldItems    []string // Variables to bind from result
}

CallClause represents a procedure call (e.g. CALL algo.shortestPath(...))

type CallOperator

type CallOperator struct {
	Input         PhysicalOperator
	ProcedureName string
	Arguments     []Expression
	YieldItems    []string // Variables to bind from result
	// contains filtered or unexported fields
}

CallOperator executes a procedure call (e.g., algorithm). Procedure dispatch goes through procedureRegistry (pkg/query/procedures.go). C3.1 lands the operator + skeleton registry; C6 registers actual procedures once Decision 6 (S1↔algorithms storage-type wiring) is resolved.

func (*CallOperator) Close

func (o *CallOperator) Close(ctx *ExecutionContext) error

func (*CallOperator) Next

func (o *CallOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*CallOperator) Open

func (o *CallOperator) Open(ctx *ExecutionContext) error

type CaseExpression

type CaseExpression struct {
	Operand     Expression // non-nil for simple CASE, nil for searched CASE
	WhenClauses []CaseWhen
	ElseResult  Expression // nil if no ELSE
}

CaseExpression represents a CASE expression (both searched and simple forms)

func (*CaseExpression) Eval

func (ce *CaseExpression) Eval(context map[string]any) (bool, error)

Eval evaluates CASE as a boolean (for WHERE usage) by coercing the result value

func (*CaseExpression) EvalValue

func (ce *CaseExpression) EvalValue(context map[string]any) (any, error)

EvalValue returns the raw value of the first matching WHEN branch

type CaseWhen

type CaseWhen struct {
	Condition Expression // bool condition (searched) or comparison value (simple)
	Result    Expression
}

CaseWhen represents a single WHEN/THEN branch

type CreateClause

type CreateClause struct {
	Patterns []*Pattern
}

CreateClause represents node/relationship creation

type CreateOperator

type CreateOperator struct {
	Input    PhysicalOperator
	Patterns []*Pattern
	// contains filtered or unexported fields
}

CallOperator executes a procedure call (e.g., algorithm). CreateOperator handles node and relationship creation.

func (*CreateOperator) Close

func (o *CreateOperator) Close(ctx *ExecutionContext) error

func (*CreateOperator) Next

func (o *CreateOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*CreateOperator) Open

func (o *CreateOperator) Open(ctx *ExecutionContext) error

type CreateStep

type CreateStep struct {
	// contains filtered or unexported fields
}

CreateStep executes a CREATE clause

func (*CreateStep) Execute

func (cs *CreateStep) Execute(ctx *ExecutionContext) error

func (*CreateStep) StepDetail

func (cs *CreateStep) StepDetail() string

func (*CreateStep) StepName

func (cs *CreateStep) StepName() string

type DeleteClause

type DeleteClause struct {
	Variables []string
	Detach    bool // DETACH DELETE removes relationships too
}

DeleteClause represents deletion

type DeleteOperator

type DeleteOperator struct {
	Input     PhysicalOperator
	Variables []string
	Detach    bool
}

DeleteOperator handles node and edge deletion.

func (*DeleteOperator) Close

func (o *DeleteOperator) Close(ctx *ExecutionContext) error

func (*DeleteOperator) Next

func (o *DeleteOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*DeleteOperator) Open

func (o *DeleteOperator) Open(ctx *ExecutionContext) error

type DeleteStep

type DeleteStep struct {
	// contains filtered or unexported fields
}

DeleteStep executes a DELETE clause

func (*DeleteStep) Execute

func (ds *DeleteStep) Execute(ctx *ExecutionContext) error

func (*DeleteStep) StepDetail

func (ds *DeleteStep) StepDetail() string

func (*DeleteStep) StepName

func (ds *DeleteStep) StepName() string

type Direction

type Direction int

Direction represents relationship direction

const (
	DirectionOutgoing Direction = iota
	DirectionIncoming
	DirectionBoth
)

func (Direction) String

func (d Direction) String() string

type ExecutionContext

type ExecutionContext struct {
	// contains filtered or unexported fields
}

ExecutionContext holds execution state.

Audit A6c-query (2026-05-08): tenantID is read once from the request context at construction time and snapshotted here. Subsequent step executions read ec.tenantID directly rather than parsing the context per call. Step authors must use the *ForTenant graph methods with this tenantID — pre-fix, every executor step called tenant-blind storage methods regardless of the JWT-derived tenant on r.Context().

func (*ExecutionContext) CheckCancellation

func (ec *ExecutionContext) CheckCancellation() error

CheckCancellation returns an error if the context is cancelled

func (*ExecutionContext) IsCancelled

func (ec *ExecutionContext) IsCancelled() bool

IsCancelled checks if the execution context has been cancelled

type ExecutionPlan

type ExecutionPlan struct {
	Steps []ExecutionStep
}

ExecutionPlan represents a query execution plan

type ExecutionStep

type ExecutionStep interface {
	Execute(ctx *ExecutionContext) error
}

ExecutionStep represents a single step in execution

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor executes parsed queries against a graph

func NewExecutor

func NewExecutor(graph *storage.GraphStorage) *Executor

NewExecutor creates a new query executor

func NewExecutorWithTimeout

func NewExecutorWithTimeout(graph *storage.GraphStorage, timeout time.Duration) *Executor

NewExecutorWithTimeout creates a new query executor with custom timeout

func (*Executor) Execute

func (e *Executor) Execute(query *Query) (*ResultSet, error)

Execute executes a query and returns results. Includes panic recovery to prevent server crashes from malformed queries. Uses the default query timeout.

func (*Executor) ExecuteWithContext

func (e *Executor) ExecuteWithContext(ctx context.Context, query *Query) (result *ResultSet, err error)

ExecuteWithContext executes a query with context for cancellation and timeout support. Includes panic recovery to prevent server crashes from malformed queries.

func (*Executor) ExecuteWithParams

func (e *Executor) ExecuteWithParams(query *Query, params map[string]any) (*ResultSet, error)

ExecuteWithParams executes a parameterized query. Parameters are provided as a map and injected into the query before execution. ParameterRef values in property maps are resolved to actual values, and parameters are made available in bindings as "$name" keys.

func (*Executor) ExecuteWithParamsContext added in v0.5.0

func (e *Executor) ExecuteWithParamsContext(ctx context.Context, query *Query, params map[string]any) (*ResultSet, error)

ExecuteWithParamsContext is ExecuteWithParams that honours a caller-supplied context (timeout / cancellation). The HTTP query handler uses this so that parameterized queries get both parameter substitution AND the request timeout — calling ExecuteWithContext directly drops req.Parameters and stores the literal "&{name}" instead of the value (#237).

func (*Executor) ExecuteWithText

func (e *Executor) ExecuteWithText(queryText string, query *Query) (*ResultSet, error)

ExecuteWithText executes a query from text and uses query caching

func (*Executor) SetQueryTimeout

func (e *Executor) SetQueryTimeout(timeout time.Duration)

SetQueryTimeout sets the query timeout

func (*Executor) SetSearchIndex

func (e *Executor) SetSearchIndex(idx *search.FullTextIndex)

SetSearchIndex configures full-text search for use in queries. Registers the search() function that captures the index via closure.

func (*Executor) SetVectorSearch

func (e *Executor) SetVectorSearch(
	similarityFn VectorSimilarityFunc,
	searchFn VectorSearchFunc,
	hasIndexFn HasVectorIndexFunc,
	getNodeFn GetNodeFunc,
)

SetVectorSearch wires up vector similarity and HNSW search for use in queries. Follows the same closure pattern as SetSearchIndex for full-text search.

type ExpandOperator

type ExpandOperator struct {
	Input     PhysicalOperator
	SourceVar string
	TargetVar string
	EdgeVar   string
	EdgeType  string
	Direction Direction // Use local query.Direction
	// contains filtered or unexported fields
}

ExpandOperator expands from a source node along edges of a specific type.

func (*ExpandOperator) Close

func (o *ExpandOperator) Close(ctx *ExecutionContext) error

func (*ExpandOperator) Next

func (o *ExpandOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*ExpandOperator) Open

func (o *ExpandOperator) Open(ctx *ExecutionContext) error

type Expression

type Expression interface {
	Eval(context map[string]any) (bool, error)
	EvalValue(context map[string]any) (any, error)
}

Expression is an interface for all expression types. EvalValue returns the raw evaluation result (used for projection, SET RHS, CALL args); Eval coerces to bool (used for WHERE / AND / OR / comparison). All concrete implementations satisfy both — the dual-eval pattern.

type FilterOperator

type FilterOperator struct {
	Input      PhysicalOperator
	Expression Expression // Use existing Expression interface if available
}

FilterOperator filters rows based on a predicate.

func (*FilterOperator) Close

func (o *FilterOperator) Close(ctx *ExecutionContext) error

func (*FilterOperator) Next

func (o *FilterOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*FilterOperator) Open

func (o *FilterOperator) Open(ctx *ExecutionContext) error

type FilterStep

type FilterStep struct {
	// contains filtered or unexported fields
}

FilterStep executes a WHERE clause

func (*FilterStep) Execute

func (fs *FilterStep) Execute(ctx *ExecutionContext) error

func (*FilterStep) StepDetail

func (fs *FilterStep) StepDetail() string

func (*FilterStep) StepName

func (fs *FilterStep) StepName() string

type FunctionCallExpression

type FunctionCallExpression struct {
	Name string
	Args []Expression
}

FunctionCallExpression represents a function call (e.g., toLower(n.name))

func (*FunctionCallExpression) Eval

func (fce *FunctionCallExpression) Eval(context map[string]any) (bool, error)

Eval evaluates the function and coerces the result to bool (for WHERE usage)

func (*FunctionCallExpression) EvalValue

func (fce *FunctionCallExpression) EvalValue(context map[string]any) (any, error)

EvalValue evaluates the function and returns the raw result

type GetNodeFunc

type GetNodeFunc func(nodeID uint64) (any, error)

GetNodeFunc fetches a node by ID (avoids importing storage in step types).

type HasVectorIndexFunc

type HasVectorIndexFunc func(propertyName string) bool

HasVectorIndexFunc checks whether a vector index exists for a property.

type HashJoinOperator

type HashJoinOperator struct {
	Left  PhysicalOperator
	Right PhysicalOperator
	Var   string // The common variable to join on
	// contains filtered or unexported fields
}

HashJoinOperator performs an efficient equijoin using an in-memory hash table. Build phase: buffers Right input into a hash map. Probe phase: streams Left input and probes the map.

func (*HashJoinOperator) Close

func (o *HashJoinOperator) Close(ctx *ExecutionContext) error

func (*HashJoinOperator) Next

func (*HashJoinOperator) Open

func (o *HashJoinOperator) Open(ctx *ExecutionContext) error

type IndexLookupStep

type IndexLookupStep struct {
	// contains filtered or unexported fields
}

IndexLookupStep uses a property index for efficient node lookup This replaces a full scan when the optimizer detects an indexable equality condition

func (*IndexLookupStep) Execute

func (ils *IndexLookupStep) Execute(ctx *ExecutionContext) error

func (*IndexLookupStep) StepDetail

func (ils *IndexLookupStep) StepDetail() string

func (*IndexLookupStep) StepName

func (ils *IndexLookupStep) StepName() string

type IndexSeekOperator

type IndexSeekOperator struct {
	Variable    string
	PropertyKey string
	Value       storage.Value
	// contains filtered or unexported fields
}

IndexSeekOperator uses a property index to find nodes.

func (*IndexSeekOperator) Close

func (o *IndexSeekOperator) Close(ctx *ExecutionContext) error

func (*IndexSeekOperator) Next

func (*IndexSeekOperator) Open

type Lexer

type Lexer struct {
	// contains filtered or unexported fields
}

Lexer tokenizes a query string

func NewLexer

func NewLexer(input string) *Lexer

NewLexer creates a new lexer

func (*Lexer) Tokenize

func (l *Lexer) Tokenize() ([]Token, error)

Tokenize converts the input string into tokens

type LiteralExpression

type LiteralExpression struct {
	Value any
}

LiteralExpression represents a literal value

func (*LiteralExpression) Eval

func (le *LiteralExpression) Eval(context map[string]any) (bool, error)

func (*LiteralExpression) EvalValue

func (le *LiteralExpression) EvalValue(context map[string]any) (any, error)

type MatchClause

type MatchClause struct {
	Patterns []*Pattern
}

MatchClause represents a MATCH pattern

type MatchStep

type MatchStep struct {
	// contains filtered or unexported fields
}

MatchStep executes a MATCH clause

func (*MatchStep) Execute

func (ms *MatchStep) Execute(ctx *ExecutionContext) error

func (*MatchStep) StepDetail

func (ms *MatchStep) StepDetail() string

func (*MatchStep) StepName

func (ms *MatchStep) StepName() string

type MergeClause

type MergeClause struct {
	Pattern  *Pattern
	OnMatch  *SetClause
	OnCreate *SetClause
}

MergeClause represents a MERGE operation (match-or-create)

type MergeOperator

type MergeOperator struct {
	Input    PhysicalOperator
	Pattern  *Pattern
	OnMatch  *SetClause
	OnCreate *SetClause
}

MergeOperator handles match-or-create logic.

func (*MergeOperator) Close

func (o *MergeOperator) Close(ctx *ExecutionContext) error

func (*MergeOperator) Next

func (o *MergeOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*MergeOperator) Open

func (o *MergeOperator) Open(ctx *ExecutionContext) error

type MergeStep

type MergeStep struct {
	// contains filtered or unexported fields
}

MergeStep executes a MERGE clause (match-or-create)

func (*MergeStep) Execute

func (ms *MergeStep) Execute(ctx *ExecutionContext) error

func (*MergeStep) StepDetail

func (ms *MergeStep) StepDetail() string

func (*MergeStep) StepName

func (ms *MergeStep) StepName() string

type NestedLoopJoinOperator

type NestedLoopJoinOperator struct {
	Left  PhysicalOperator
	Right PhysicalOperator
	// contains filtered or unexported fields
}

NestedLoopJoinOperator performs a nested loop join (Cartesian product) of two input streams. It buffers the Right input in memory.

func (*NestedLoopJoinOperator) Close

func (*NestedLoopJoinOperator) Next

func (*NestedLoopJoinOperator) Open

type NodePattern

type NodePattern struct {
	Variable   string
	Labels     []string
	Properties map[string]any
}

NodePattern represents a node in a pattern

type NodeScanOperator

type NodeScanOperator struct {
	Variable string
	Label    string
	// contains filtered or unexported fields
}

NodeScanOperator scans all nodes with a given label (or all nodes if label is empty).

func (*NodeScanOperator) Close

func (o *NodeScanOperator) Close(ctx *ExecutionContext) error

func (*NodeScanOperator) Next

func (*NodeScanOperator) Open

func (o *NodeScanOperator) Open(ctx *ExecutionContext) error

type OptimizationHint

type OptimizationHint struct {
	Type          string // "index_available", "filter_early", "join_reorder"
	Description   string
	EstimatedGain float64 // Estimated speedup multiplier
}

OptimizationHint provides hints about potential optimizations

type Optimizer

type Optimizer struct {
	// contains filtered or unexported fields
}

Optimizer optimizes query execution plans

func NewOptimizer

func NewOptimizer(graph *storage.GraphStorage) *Optimizer

NewOptimizer creates a new query optimizer

func (*Optimizer) AnalyzeQuery

func (o *Optimizer) AnalyzeQuery(query *Query) []OptimizationHint

AnalyzeQuery analyzes a query and suggests optimizations

func (*Optimizer) EstimateCost

func (o *Optimizer) EstimateCost(pattern *MatchClause) float64

EstimateCost estimates the cost of executing a match pattern

func (*Optimizer) Optimize

func (o *Optimizer) Optimize(plan *ExecutionPlan, query *Query) *ExecutionPlan

Optimize optimizes an execution plan

type OptionalMatchClause

type OptionalMatchClause struct {
	Patterns []*Pattern
	Where    *WhereClause // WHERE scoped to this optional match
}

OptionalMatchClause represents an OPTIONAL MATCH pattern with left-outer-join semantics

type OptionalMatchOperator

type OptionalMatchOperator struct {
	Input   PhysicalOperator
	Pattern *Pattern
	// contains filtered or unexported fields
}

OptionalMatchOperator implements left-outer-join semantics.

func (*OptionalMatchOperator) Close

func (*OptionalMatchOperator) Next

func (*OptionalMatchOperator) Open

type OptionalMatchStep

type OptionalMatchStep struct {
	// contains filtered or unexported fields
}

OptionalMatchStep executes an OPTIONAL MATCH clause with left-outer-join semantics. When no match is found, variables from the pattern are set to nil (null propagation).

func (*OptionalMatchStep) Execute

func (oms *OptionalMatchStep) Execute(ctx *ExecutionContext) error

func (*OptionalMatchStep) StepDetail

func (oms *OptionalMatchStep) StepDetail() string

func (*OptionalMatchStep) StepName

func (oms *OptionalMatchStep) StepName() string

type OrderByItem

type OrderByItem struct {
	Expression *PropertyExpression
	ValueExpr  Expression // broader expression (takes precedence when non-nil)
	Ascending  bool
}

OrderByItem represents ordering specification

type ParallelAggregation

type ParallelAggregation struct {
	// contains filtered or unexported fields
}

ParallelAggregation performs parallel aggregation operations

func NewParallelAggregation

func NewParallelAggregation(graph *storage.GraphStorage) *ParallelAggregation

NewParallelAggregation creates a parallel aggregation engine

func (*ParallelAggregation) AggregateProperty

func (pa *ParallelAggregation) AggregateProperty(
	ctx context.Context,
	propertyKey string,
	aggregateFunc func(values []any) any,
) (any, error)

AggregateProperty performs parallel property aggregation

Concurrent Safety: 1. Divides node ID range among workers for parallel scanning 2. Each worker builds independent value slice (no shared state) 3. Workers send value slices to buffered channel (size = numWorkers) 4. Main goroutine aggregates after all workers complete

Concurrent Edge Cases: 1. Workers may encounter deleted nodes - silently skipped (continue on error) 2. Workers may encounter nodes without the property - only existing values collected 3. No synchronization needed between workers - non-overlapping ID ranges 4. Final aggregation happens sequentially after parallel collection

func (*ParallelAggregation) CountNodesByLabel

func (pa *ParallelAggregation) CountNodesByLabel(ctx context.Context, label string) (int, error)

CountNodesByLabel counts nodes with a label in parallel

Concurrent Safety: 1. Divides node ID range among workers for parallel scanning 2. Each worker has independent count accumulator (no shared state) 3. Workers send results to buffered channel (size = numWorkers) 4. Main goroutine waits via WaitGroup before closing channels

Concurrent Edge Cases: 1. Workers may encounter deleted nodes - silently skipped (continue on error) 2. No synchronization needed between workers - non-overlapping ID ranges 3. Channel buffer prevents workers from blocking on result send

type ParallelPathFinder

type ParallelPathFinder struct {
	// contains filtered or unexported fields
}

ParallelPathFinder finds paths in parallel

func NewParallelPathFinder

func NewParallelPathFinder(graph *storage.GraphStorage) *ParallelPathFinder

NewParallelPathFinder creates a parallel path finder

func (*ParallelPathFinder) FindAllPaths

func (ppf *ParallelPathFinder) FindAllPaths(
	ctx context.Context,
	pairs [][2]uint64,
	maxDepth int,
) ([][]uint64, error)

FindAllPaths finds all paths between multiple pairs of nodes in parallel

Concurrent Safety: 1. Each pair is processed by independent goroutine 2. Semaphore limits concurrent path searches to NumCPU() 3. Each path search maintains independent visited map (no shared state) 4. Results collected via buffered channel after all workers complete

Concurrent Edge Cases: 1. Semaphore prevents goroutine explosion when processing many pairs 2. Path search may fail (no path found) - returns nil, continues processing others 3. Goroutines may complete in any order - results order not preserved 4. All goroutines complete before returning (WaitGroup ensures this)

type ParallelPipeline

type ParallelPipeline struct {
	// contains filtered or unexported fields
}

ParallelPipeline executes pipeline stages in parallel

func NewParallelPipeline

func NewParallelPipeline(workers int) *ParallelPipeline

NewParallelPipeline creates a parallel pipeline

func (*ParallelPipeline) AddStage

func (pp *ParallelPipeline) AddStage(stage PipelineStage) *ParallelPipeline

AddStage adds a stage to the pipeline

func (*ParallelPipeline) Execute

func (pp *ParallelPipeline) Execute(input *ResultStream) *ResultStream

Execute executes the pipeline with parallel workers

type ParallelTraversal

type ParallelTraversal struct {
	// contains filtered or unexported fields
}

ParallelTraversal performs parallel BFS traversal

func NewParallelTraversal

func NewParallelTraversal(graph *storage.GraphStorage, startIDs []uint64, maxDepth int) *ParallelTraversal

NewParallelTraversal creates a parallel traversal query

func (*ParallelTraversal) Execute

func (pt *ParallelTraversal) Execute(ctx context.Context) ([]*storage.Node, error)

Execute performs parallel traversal from multiple starting nodes

Concurrent Safety: 1. Uses sync.Map for thread-safe visited tracking across workers 2. Multiple goroutines traverse different branches simultaneously 3. Results channel has buffer to reduce blocking 4. Workers coordinate via WaitGroup for proper shutdown 5. Error channel uses select/default to prevent blocking on error reporting 6. Context is propagated to all spawned goroutines for cancellation

Concurrent Edge Cases: 1. Multiple workers may discover same node - sync.Map.LoadOrStore handles this 2. Result channel may fill up - select/default prevents goroutine leaks 3. One worker error doesn't stop others - they continue until WaitGroup completes 4. Channel close is synchronized with WaitGroup to prevent send-on-closed-channel 5. Context cancellation propagates to all child goroutines spawned by traverseFrom

type ParameterExpression

type ParameterExpression struct {
	Name string // "name" from $name
}

ParameterExpression represents a query parameter reference ($name) in expressions

func (*ParameterExpression) Eval

func (pe *ParameterExpression) Eval(context map[string]any) (bool, error)

func (*ParameterExpression) EvalValue

func (pe *ParameterExpression) EvalValue(context map[string]any) (any, error)

EvalValue returns the raw parameter value for use in comparisons

type ParameterRef

type ParameterRef struct {
	Name string // "name" from $name
}

ParameterRef represents a parameter reference in property maps ({name: $name})

type Parser

type Parser struct {
	// contains filtered or unexported fields
}

Parser builds an AST from tokens

func NewParser

func NewParser(tokens []Token) *Parser

NewParser creates a new parser

func (*Parser) Parse

func (p *Parser) Parse() (*Query, error)

Parse parses the tokens into a Query AST

type Path

type Path struct {
	Nodes []*storage.Node
	Edges []*storage.Edge
}

Path represents a path through the graph

type Pattern

type Pattern struct {
	Nodes         []*NodePattern
	Relationships []*RelationshipPattern
}

Pattern represents a graph pattern to match

type PhysicalOperator

type PhysicalOperator interface {
	// Open initializes the operator and its children.
	Open(ctx *ExecutionContext) error

	// Next returns the next row (BindingSet) from the operator.
	// Returns (nil, nil) when the result set is exhausted.
	Next(ctx *ExecutionContext) (*BindingSet, error)

	// Close releases any resources held by the operator.
	Close(ctx *ExecutionContext) error
}

PhysicalOperator is the interface for physical query operators (Volcano model).

type PipelineStage

type PipelineStage func(*storage.Node) (*storage.Node, bool)

PipelineStage represents a stage in a query pipeline

type Planner

type Planner struct {
	// contains filtered or unexported fields
}

Planner translates a Query AST into a tree of PhysicalOperators.

func NewPlanner

func NewPlanner(graph storage.Storage) *Planner

NewPlanner creates a new query planner.

func (*Planner) Plan

func (p *Planner) Plan(ctx context.Context, q *Query) (PhysicalOperator, error)

Plan creates a physical execution plan for the given query.

func (*Planner) PlanSub

func (p *Planner) PlanSub(ctx context.Context, q *Query, input PhysicalOperator) (PhysicalOperator, error)

PlanSub creates an execution plan, potentially with an existing input source.

type Procedure

type Procedure func(ctx context.Context, graph storage.Storage, tenantID string, args []any) ([]map[string]any, error)

Procedure is the function signature for a Cypher procedure callable via `CALL procedure_name(args) YIELD items`. The storage.Storage interface argument is intentionally the S1-narrowed type; algorithms exposed as procedures take the interface (per Decision 6 = B).

type ProjectOperator

type ProjectOperator struct {
	Input PhysicalOperator
	Items []*ReturnItem
}

ProjectOperator transforms rows into the final result format.

func (*ProjectOperator) Close

func (o *ProjectOperator) Close(ctx *ExecutionContext) error

func (*ProjectOperator) Next

func (*ProjectOperator) Open

func (o *ProjectOperator) Open(ctx *ExecutionContext) error

type PropertyExpression

type PropertyExpression struct {
	Variable string
	Property string
}

PropertyExpression represents property access (e.g., n.name)

func (*PropertyExpression) Eval

func (pe *PropertyExpression) Eval(context map[string]any) (bool, error)

func (*PropertyExpression) EvalValue

func (pe *PropertyExpression) EvalValue(context map[string]any) (any, error)

type Query

type Query struct {
	Match           *MatchClause
	Where           *WhereClause
	Return          *ReturnClause
	Create          *CreateClause
	Delete          *DeleteClause
	Set             *SetClause
	Remove          *RemoveClause
	Unwind          *UnwindClause
	Merge           *MergeClause
	With            *WithClause
	OptionalMatches []*OptionalMatchClause
	Union           *UnionClause
	UnionNext       *Query // For UNION chaining
	Next            *Query // For WITH chaining
	Call            *CallClause
	Limit           int
	Skip            int
	Explain         bool
	Profile         bool

	// InitialBindings are injected by WITH clause chaining
	InitialBindings []*BindingSet
}

Query represents a complete query statement

type QueryCache

type QueryCache struct {
	// contains filtered or unexported fields
}

QueryCache caches compiled/optimized queries

func NewQueryCache

func NewQueryCache() *QueryCache

NewQueryCache creates a new query cache

func (*QueryCache) Get

func (qc *QueryCache) Get(queryText string) (*ExecutionPlan, bool)

Get retrieves a cached plan

func (*QueryCache) GetTopQueries

func (qc *QueryCache) GetTopQueries(limit int) []*QueryStatistics

GetTopQueries returns most frequently executed queries

func (*QueryCache) Put

func (qc *QueryCache) Put(queryText string, plan *ExecutionPlan)

Put stores a plan in cache

func (*QueryCache) RecordExecution

func (qc *QueryCache) RecordExecution(queryText string, executionTimeMicros int64, optimized bool)

RecordExecution records query execution statistics

type QueryFunc

type QueryFunc func(args []any) (any, error)

QueryFunc is a function callable from within a query

func GetFunction

func GetFunction(name string) (QueryFunc, error)

GetFunction retrieves a registered function by name (case-insensitive)

type QueryPipeline

type QueryPipeline struct {
	// contains filtered or unexported fields
}

QueryPipeline chains multiple processing stages

func NewQueryPipeline

func NewQueryPipeline() *QueryPipeline

NewQueryPipeline creates a new query pipeline

func (*QueryPipeline) AddStage

func (qp *QueryPipeline) AddStage(stage PipelineStage) *QueryPipeline

AddStage adds a processing stage

func (*QueryPipeline) Execute

func (qp *QueryPipeline) Execute(input *ResultStream) *ResultStream

Execute executes the pipeline on a stream

func (*QueryPipeline) Filter

func (qp *QueryPipeline) Filter(predicate func(*storage.Node) bool) *QueryPipeline

Filter adds a filter stage

func (*QueryPipeline) Map

func (qp *QueryPipeline) Map(transform func(*storage.Node) *storage.Node) *QueryPipeline

Map adds a transformation stage

type QueryStatistics

type QueryStatistics struct {
	QueryText          string
	ExecutionCount     int
	TotalExecutionTime int64 // microseconds
	AvgExecutionTime   int64
	LastOptimized      bool
}

QueryStatistics tracks query execution statistics

type RelationshipPattern

type RelationshipPattern struct {
	Variable   string
	Type       string
	Direction  Direction
	Properties map[string]any
	From       *NodePattern
	To         *NodePattern
	MinHops    int // For variable-length paths
	MaxHops    int
}

RelationshipPattern represents a relationship in a pattern

type RemoveClause

type RemoveClause struct {
	Items []*RemoveItem
}

RemoveClause represents property or label removal

type RemoveItem

type RemoveItem struct {
	Variable string
	Property string // non-empty for property removal
}

RemoveItem represents a single property or label to remove

type RemoveOperator

type RemoveOperator struct {
	Input PhysicalOperator
	Items []*RemoveItem
}

RemoveOperator handles property removal.

func (*RemoveOperator) Close

func (o *RemoveOperator) Close(ctx *ExecutionContext) error

func (*RemoveOperator) Next

func (o *RemoveOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*RemoveOperator) Open

func (o *RemoveOperator) Open(ctx *ExecutionContext) error

type RemoveStep

type RemoveStep struct {
	// contains filtered or unexported fields
}

RemoveStep executes a REMOVE clause — removes properties from nodes

func (*RemoveStep) Execute

func (rs *RemoveStep) Execute(ctx *ExecutionContext) error

func (*RemoveStep) StepDetail

func (rs *RemoveStep) StepDetail() string

func (*RemoveStep) StepName

func (rs *RemoveStep) StepName() string

type ResultSet

type ResultSet struct {
	Columns []string
	Rows    []map[string]any
	Count   int
	Profile []StepProfile // Populated when PROFILE is used
}

ResultSet represents query results

type ResultStream

type ResultStream struct {
	// contains filtered or unexported fields
}

ResultStream provides streaming query results

func NewResultStream

func NewResultStream(bufferSize int) *ResultStream

NewResultStream creates a new result stream. Enforces a minimum buffer size to prevent deadlocks with unbuffered channels.

func (*ResultStream) Close

func (rs *ResultStream) Close()

Close closes the result stream

func (*ResultStream) Next

func (rs *ResultStream) Next() (*storage.Node, error)

Next returns the next result or error

func (*ResultStream) Send

func (rs *ResultStream) Send(node *storage.Node) bool

Send sends a result to the stream

func (*ResultStream) SendError

func (rs *ResultStream) SendError(err error)

SendError sends an error and closes the stream

type ReturnClause

type ReturnClause struct {
	Items     []*ReturnItem
	Distinct  bool
	OrderBy   []*OrderByItem
	GroupBy   []*PropertyExpression
	Ascending bool
}

ReturnClause represents what to return

type ReturnItem

type ReturnItem struct {
	Expression *PropertyExpression
	ValueExpr  Expression // Broader type for function calls; if non-nil, takes precedence
	Alias      string
	Aggregate  string // COUNT, SUM, AVG, MIN, MAX, COLLECT
}

ReturnItem represents a single return item

type ReturnStep

type ReturnStep struct {
	// contains filtered or unexported fields
}

ReturnStep executes a RETURN clause

func (*ReturnStep) Execute

func (rs *ReturnStep) Execute(ctx *ExecutionContext) error

func (*ReturnStep) StepDetail

func (rs *ReturnStep) StepDetail() string

func (*ReturnStep) StepName

func (rs *ReturnStep) StepName() string

type SetClause

type SetClause struct {
	Assignments []*Assignment
}

SetClause represents property updates

type SetOperator

type SetOperator struct {
	Input       PhysicalOperator
	Assignments []*Assignment
}

SetOperator handles property updates.

func (*SetOperator) Close

func (o *SetOperator) Close(ctx *ExecutionContext) error

func (*SetOperator) Next

func (o *SetOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*SetOperator) Open

func (o *SetOperator) Open(ctx *ExecutionContext) error

type SetStep

type SetStep struct {
	// contains filtered or unexported fields
}

SetStep executes a SET clause

func (*SetStep) Execute

func (ss *SetStep) Execute(ctx *ExecutionContext) error

func (*SetStep) StepDetail

func (ss *SetStep) StepDetail() string

func (*SetStep) StepName

func (ss *SetStep) StepName() string

type StepDescriber

type StepDescriber interface {
	StepName() string
	StepDetail() string
}

StepDescriber provides human-readable descriptions of execution steps

type StepProfile

type StepProfile struct {
	StepName string
	Detail   string
	Duration time.Duration
	RowsOut  int
}

StepProfile holds profiling data for a single execution step

type StreamingQuery

type StreamingQuery struct {
	// contains filtered or unexported fields
}

StreamingQuery executes queries with streaming results

func NewStreamingQuery

func NewStreamingQuery(graph *storage.GraphStorage) *StreamingQuery

NewStreamingQuery creates a streaming query executor

func (*StreamingQuery) StreamNodes

func (sq *StreamingQuery) StreamNodes(
	filter func(*storage.Node) bool,
) *ResultStream

StreamNodes streams all nodes matching a filter

func (*StreamingQuery) StreamTraversal

func (sq *StreamingQuery) StreamTraversal(
	startID uint64,
	maxDepth int,
) *ResultStream

StreamTraversal streams nodes discovered during traversal

type Task

type Task interface {
	Execute(graph *storage.GraphStorage) (any, error)
	ID() string
}

Task represents a unit of work

type TaskResult

type TaskResult struct {
	TaskID   string
	Result   any
	Error    error
	TimedOut bool          // True if task was cancelled due to timeout
	Duration time.Duration // How long the task took
}

TaskResult contains the result of a task execution

type TimeoutConfig

type TimeoutConfig struct {
	Min     time.Duration // Minimum allowed timeout (0 means no minimum)
	Max     time.Duration // Maximum allowed timeout (0 means no maximum)
	Default time.Duration // Default timeout when value is invalid
}

TimeoutConfig defines the bounds for timeout validation.

func DefaultQueryTimeoutConfig

func DefaultQueryTimeoutConfig() TimeoutConfig

DefaultQueryTimeoutConfig returns the standard config for query timeouts.

func DefaultTaskTimeoutConfig

func DefaultTaskTimeoutConfig() TimeoutConfig

DefaultTaskTimeoutConfig returns the standard config for task timeouts.

type Token

type Token struct {
	Type   TokenType
	Value  string
	Pos    int
	Line   int
	Column int
}

Token represents a lexical token

type TokenType

type TokenType int

TokenType represents the type of a token

const (
	// Special tokens
	TokenEOF TokenType = iota
	TokenError

	// Keywords
	TokenMatch
	TokenWhere
	TokenReturn
	TokenCreate
	TokenDelete
	TokenDetach
	TokenSet
	TokenWith
	TokenLimit
	TokenSkip
	TokenOrder
	TokenOrderBy // BY keyword
	TokenAsc
	TokenDesc
	TokenDistinct
	TokenAs
	TokenAnd
	TokenOr
	TokenNot
	TokenGroup
	TokenBy
	TokenExplain
	TokenProfile
	TokenUnwind
	TokenMerge
	TokenOn
	TokenOptional
	TokenCase
	TokenWhen
	TokenThen
	TokenElse
	TokenEnd
	TokenUnion
	TokenAll
	TokenIs
	TokenIn
	TokenRemove
	TokenStarts
	TokenEnds
	TokenContains
	TokenCall
	TokenYield

	// Identifiers and literals
	TokenParameter // $name
	TokenIdentifier
	TokenString
	TokenNumber
	TokenTrue
	TokenFalse
	TokenNull

	// Operators
	TokenEquals        // =
	TokenNotEquals     // !=, <>
	TokenLessThan      // <
	TokenGreaterThan   // >
	TokenLessEquals    // <=
	TokenGreaterEquals // >=
	TokenPlus          // +
	TokenMinus         // -
	TokenStar          // *
	TokenSlash         // /
	TokenPercent       // %
	TokenDot           // .
	TokenColon         // :
	TokenComma         // ,
	TokenSemicolon     // ;

	// Delimiters
	TokenLeftParen    // (
	TokenRightParen   // )
	TokenLeftBracket  // [
	TokenRightBracket // ]
	TokenLeftBrace    // {
	TokenRightBrace   // }

	// Relationship arrows
	TokenArrowLeft  // <-
	TokenArrowRight // ->
	TokenArrowBoth  // -
)

func (TokenType) String

func (t TokenType) String() string

type TraversalError

type TraversalError struct {
	NodeID uint64
	Err    error
}

TraversalError records an error encountered during traversal

func (TraversalError) Error

func (te TraversalError) Error() string

type TraversalOptions

type TraversalOptions struct {
	StartNodeID   uint64
	Direction     Direction
	EdgeTypes     []string                 // Filter by edge types (empty = all types)
	MaxDepth      int                      // Maximum traversal depth
	MaxResults    int                      // Maximum nodes to return
	Predicate     func(*storage.Node) bool // Node filter function
	EdgePredicate func(*storage.Edge) bool // Edge filter function (for temporal/property filtering)
	FailOnMissing bool                     // If true, return error on first missing node; if false, track and continue
}

TraversalOptions configures graph traversal

type TraversalResult

type TraversalResult struct {
	Nodes      []*storage.Node
	Paths      []Path
	SkippedIDs []uint64         // Node IDs that were skipped due to errors
	Errors     []TraversalError // Errors encountered during traversal
}

TraversalResult contains the results of a traversal

type Traverser

type Traverser struct {
	// contains filtered or unexported fields
}

Traverser performs graph traversals

func NewTraverser

func NewTraverser(storage *storage.GraphStorage) *Traverser

NewTraverser creates a new traverser

func (*Traverser) BFS

BFS performs breadth-first search traversal

func (*Traverser) DFS

DFS performs depth-first search traversal

func (*Traverser) FindAllPaths

func (t *Traverser) FindAllPaths(fromID, toID uint64, maxDepth int, edgeTypes []string) ([]Path, error)

FindAllPaths finds all paths between two nodes up to maxDepth

func (*Traverser) FindAllPathsWithPredicate

func (t *Traverser) FindAllPathsWithPredicate(fromID, toID uint64, maxDepth int, edgeTypes []string, edgePredicate func(*storage.Edge) bool) ([]Path, error)

FindAllPathsWithPredicate finds all paths with optional edge filtering

func (*Traverser) FindShortestPath

func (t *Traverser) FindShortestPath(fromID, toID uint64, edgeTypes []string) (Path, error)

FindShortestPath finds the shortest path between two nodes (BFS-based)

func (*Traverser) FindShortestPathWithPredicate

func (t *Traverser) FindShortestPathWithPredicate(fromID, toID uint64, edgeTypes []string, edgePredicate func(*storage.Edge) bool) (Path, error)

FindShortestPathWithPredicate finds shortest path with optional edge filtering

func (*Traverser) GetNeighborhood

func (t *Traverser) GetNeighborhood(nodeID uint64, hops int, direction Direction) ([]*storage.Node, error)

GetNeighborhood gets all nodes within N hops. If hops is 0, only the start node is returned.

type UnaryExpression

type UnaryExpression struct {
	Operator string // "NOT", "-"
	Operand  Expression
}

UnaryExpression represents unary operators: NOT, -

func (*UnaryExpression) Eval

func (ue *UnaryExpression) Eval(context map[string]any) (bool, error)

func (*UnaryExpression) EvalValue

func (ue *UnaryExpression) EvalValue(context map[string]any) (any, error)

type UnionClause

type UnionClause struct {
	All bool // true = UNION ALL (keep duplicates), false = UNION (deduplicate)
}

UnionClause represents a UNION between query segments

type UnionOperator

type UnionOperator struct {
	Left  PhysicalOperator
	Right PhysicalOperator
	All   bool // If false, deduplicate
	// contains filtered or unexported fields
}

UnionOperator combines results from two query segments.

func (*UnionOperator) Close

func (o *UnionOperator) Close(ctx *ExecutionContext) error

func (*UnionOperator) Next

func (o *UnionOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*UnionOperator) Open

func (o *UnionOperator) Open(ctx *ExecutionContext) error

type UnwindClause

type UnwindClause struct {
	Expression *PropertyExpression
	Alias      string
}

UnwindClause represents an UNWIND operation

type UnwindOperator

type UnwindOperator struct {
	Input      PhysicalOperator
	Expression *PropertyExpression
	Alias      string
	// contains filtered or unexported fields
}

UnwindOperator expands list values into individual rows.

func (*UnwindOperator) Close

func (o *UnwindOperator) Close(ctx *ExecutionContext) error

func (*UnwindOperator) Next

func (o *UnwindOperator) Next(ctx *ExecutionContext) (*BindingSet, error)

func (*UnwindOperator) Open

func (o *UnwindOperator) Open(ctx *ExecutionContext) error

type UnwindStep

type UnwindStep struct {
	// contains filtered or unexported fields
}

UnwindStep executes an UNWIND clause - expands list values into individual bindings

func (*UnwindStep) Execute

func (us *UnwindStep) Execute(ctx *ExecutionContext) error

func (*UnwindStep) StepDetail

func (us *UnwindStep) StepDetail() string

func (*UnwindStep) StepName

func (us *UnwindStep) StepName() string

type VectorSearchFunc

type VectorSearchFunc func(propertyName string, query []float32, k, ef int) ([]VectorSearchResult, error)

VectorSearchFunc performs HNSW k-NN search on a named vector index.

type VectorSearchResult

type VectorSearchResult struct {
	NodeID   uint64
	Distance float32
}

VectorSearchResult mirrors vector.SearchResult without importing pkg/vector.

type VectorSearchStep

type VectorSearchStep struct {
	// contains filtered or unexported fields
}

VectorSearchStep performs HNSW-accelerated pre-filtering. Inserted by the optimizer before MatchStep to pre-bind a variable to nodes whose embeddings are similar to a query vector.

func (*VectorSearchStep) Execute

func (vs *VectorSearchStep) Execute(ctx *ExecutionContext) error

func (*VectorSearchStep) StepDetail

func (vs *VectorSearchStep) StepDetail() string

func (*VectorSearchStep) StepName

func (vs *VectorSearchStep) StepName() string

type VectorSimilarityFunc

type VectorSimilarityFunc func(a, b []float32) (float64, error)

VectorSimilarityFunc computes similarity between two vectors. Returns a float64 score (e.g., cosine similarity in [-1, 1]).

type WhereClause

type WhereClause struct {
	Expression Expression
}

WhereClause represents filtering conditions

type WithClause

type WithClause struct {
	Items []*ReturnItem
	Where *WhereClause
}

WithClause represents a WITH projection between query segments

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of worker goroutines for parallel query execution

func NewWorkerPool

func NewWorkerPool(workers int) *WorkerPool

NewWorkerPool creates a worker pool with specified number of workers

func NewWorkerPoolWithTimeout

func NewWorkerPoolWithTimeout(workers int, taskTimeout time.Duration) *WorkerPool

NewWorkerPoolWithTimeout creates a worker pool with custom task timeout

func (*WorkerPool) DetailedStats

func (wp *WorkerPool) DetailedStats() WorkerPoolStats

DetailedStats returns detailed pool statistics

func (*WorkerPool) Results

func (wp *WorkerPool) Results() <-chan TaskResult

Results returns the results channel

func (*WorkerPool) Start

func (wp *WorkerPool) Start(graph *storage.GraphStorage)

Start starts the worker pool

func (*WorkerPool) Stats

func (wp *WorkerPool) Stats() (processed, active int64)

Stats returns pool statistics

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop stops the worker pool

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(task Task) error

Submit submits a task for execution

type WorkerPoolStats

type WorkerPoolStats struct {
	Processed   int64
	Active      int64
	TimedOut    int64
	TaskTimeout time.Duration
	Workers     int
}

WorkerPoolStats contains detailed worker pool statistics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL