ops

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2026 License: MIT Imports: 10 Imported by: 12

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrHaltTraversal         = errors.New("halt traversal")
	ErrGraphQueryMemoryLimit = errors.New("graph query required more memory than allowed")
)
View Source
var (
	ErrOperationDone = errors.New("parallel operation context has expired")
)

Functions

func AcyclicTraverseNodes

func AcyclicTraverseNodes(tx graph.Transaction, plan TraversalPlan, nodeFilter NodeFilter) (graph.NodeSet, error)

AcyclicTraverseNodes does a traversal, but includes nodes that are intermediaries as well as terminals.

func AcyclicTraverseTerminals

func AcyclicTraverseTerminals(tx graph.Transaction, plan TraversalPlan) (graph.NodeSet, error)

func CountNodes

func CountNodes(ctx context.Context, db graph.Database, criteria ...graph.Criteria) (int64, error)

CountNodes will fetch the current number of nodes in the database that match the given criteria.

func DBFetchNodesByIDBitmap

func DBFetchNodesByIDBitmap(ctx context.Context, db graph.Database, nodeIDs cardinality.Duplex[uint32]) ([]*graph.Node, error)

func DeleteNodes

func DeleteNodes(tx graph.Transaction, nodeIDs ...graph.ID) error

func DeleteRelationships

func DeleteRelationships(tx graph.Transaction, relationshipIDs ...graph.ID) error

func DeleteRelationshipsQuery

func DeleteRelationshipsQuery(relationshipIDs ...graph.ID) graph.CriteriaProvider

func FetchAllNodeProperties

func FetchAllNodeProperties(tx graph.Transaction, nodes graph.NodeSet) error

func FetchEndNodes

func FetchEndNodes(query graph.RelationshipQuery) (graph.NodeSet, error)

func FetchLargestNodeID

func FetchLargestNodeID(ctx context.Context, db graph.Database) (graph.ID, error)

FetchLargestNodeID will fetch the current node database identifier ceiling.

func FetchNode

func FetchNode(tx graph.Transaction, id graph.ID) (*graph.Node, error)

func FetchNodeIDs

func FetchNodeIDs(query graph.NodeQuery) ([]graph.ID, error)

func FetchNodeIDsOfKindFromBitmap

func FetchNodeIDsOfKindFromBitmap(tx graph.Transaction, nodeIDs cardinality.Duplex[uint32], kinds ...graph.Kind) ([]graph.ID, error)

func FetchNodeProperties

func FetchNodeProperties(tx graph.Transaction, nodes graph.NodeSet, propertyNames []string) error

func FetchNodeRelationships

func FetchNodeRelationships(tx graph.Transaction, root *graph.Node, direction graph.Direction) ([]*graph.Relationship, error)

func FetchNodeSet

func FetchNodeSet(query graph.NodeQuery) (graph.NodeSet, error)

func FetchNodes

func FetchNodes(query graph.NodeQuery) ([]*graph.Node, error)

func FetchNodesByQuery

func FetchNodesByQuery(tx graph.Transaction, query string, limit int) (graph.NodeSet, error)

func FetchPathSet

func FetchPathSet(queryInst graph.RelationshipQuery) (graph.PathSet, error)

func FetchPathSetByQuery

func FetchPathSetByQuery(tx graph.Transaction, query string) (graph.PathSet, error)

func FetchRelationship

func FetchRelationship(tx graph.Transaction, id graph.ID) (*graph.Relationship, error)

func FetchRelationshipIDs

func FetchRelationshipIDs(query graph.RelationshipQuery) ([]graph.ID, error)

func FetchRelationshipNodes

func FetchRelationshipNodes(tx graph.Transaction, relationship *graph.Relationship) (*graph.Node, *graph.Node, error)

func FetchRelationships

func FetchRelationships(query graph.RelationshipQuery) ([]*graph.Relationship, error)

func FetchStartNodeIDs

func FetchStartNodeIDs(query graph.RelationshipQuery) ([]graph.ID, error)

func FetchStartNodes

func FetchStartNodes(query graph.RelationshipQuery) (graph.NodeSet, error)

func ForEachEndNode

func ForEachEndNode(relationshipQuery graph.RelationshipQuery, delegate func(relationship *graph.Relationship, node *graph.Node) error) error

func ForEachNodeID

func ForEachNodeID(tx graph.Transaction, ids []graph.ID, delegate func(node *graph.Node) error) error

Note: this does not work with mutations inside the delegate.

func ForEachStartNode

func ForEachStartNode(relationshipQuery graph.RelationshipQuery, delegate func(relationship *graph.Relationship, node *graph.Node) error) error

func ParallelFetchNodes

func ParallelFetchNodes(ctx context.Context, db graph.Database, criteria graph.Criteria, numWorkers int) (graph.NodeSet, error)

ParallelFetchNodes will first look up the largest node database identifier. The function will then start up to numWorkers parallel read transactions. Each transaction will apply the criteria passed to this function to a range of node database identifiers to avoid parallel worker collisions.

func ParallelNodeQuery

func ParallelNodeQuery(ctx context.Context, db graph.Database, criteria graph.Criteria, numWorkers int, queryDelegate func(query graph.NodeQuery) error) error

ParallelNodeQuery will first look up the largest node database identifier. The function will then start up to numWorkers parallel read transactions. Each transaction will apply the criteria passed to this function to a range of node database identifiers to avoid parallel worker collisions.

func TXFetchNodesByIDBitmap

func TXFetchNodesByIDBitmap(tx graph.Transaction, nodeIDs cardinality.Duplex[uint32]) ([]*graph.Node, error)

func Traversal

func Traversal(tx graph.Transaction, plan TraversalPlan, pathVisitor PathVisitor) error

func TraverseIntermediaryPaths

func TraverseIntermediaryPaths(tx graph.Transaction, plan TraversalPlan, nodeFilter NodeFilter) (graph.PathSet, error)

TraverseIntermediaryPaths uses the given NodeFilter to select candidate nodes for adding to the results.

func TraversePaths

func TraversePaths(tx graph.Transaction, plan TraversalPlan) (graph.PathSet, error)

Types

type DepthExceptionHandler

type DepthExceptionHandler func(ctx *TraversalContext, segment *graph.PathSegment)

DepthExceptionHandler is invoked on paths that exceed traversal plan depth limits.

type LimitSkipTracker

type LimitSkipTracker struct {
	Limit int

	Skip int
	// contains filtered or unexported fields
}

func (*LimitSkipTracker) AtLimit

func (s *LimitSkipTracker) AtLimit() bool

func (*LimitSkipTracker) ShouldCollect

func (s *LimitSkipTracker) ShouldCollect() bool

type NodeFilter

type NodeFilter func(node *graph.Node) bool

type Operation

type Operation[T any] struct {
	// contains filtered or unexported fields
}

func NewOperation

func NewOperation[T any](opCtx OperationContext) *Operation[T]

func StartNewOperation

func StartNewOperation[T any](opCtx OperationContext) *Operation[T]

func (*Operation[T]) Done

func (s *Operation[T]) Done() error

func (*Operation[T]) Start

func (s *Operation[T]) Start()

func (*Operation[T]) SubmitReader

func (s *Operation[T]) SubmitReader(reader ReaderFunc[T]) error

func (*Operation[T]) SubmitWriter

func (s *Operation[T]) SubmitWriter(writer WriterFunc[T]) error

type OperationContext

type OperationContext struct {
	Parent            context.Context
	DB                graph.Database
	NumReaders        int
	NumWriters        int
	ReaderJobCapacity int
	WriterJobCapacity int
}

func (OperationContext) GetParent

func (s OperationContext) GetParent() context.Context

func (OperationContext) GetReaderJobCapacity

func (s OperationContext) GetReaderJobCapacity() int

func (OperationContext) GetWriterJobCapacity

func (s OperationContext) GetWriterJobCapacity() int

type ParallelNodeQueryBuilder

type ParallelNodeQueryBuilder[T any] struct {
	// contains filtered or unexported fields
}

ParallelNodeQueryBuilder is a type that can be used to construct a dawgs node query that is run in parallel. The Stream(...) function commits the query to as many workers as specified and then submits all results to a single channel that can be safely ranged over. Context cancellation is taken into consideration and the channel will close upon exit of the parallel query's context.

func NewParallelNodeQuery

func NewParallelNodeQuery[T any](db graph.Database) *ParallelNodeQueryBuilder[T]

func (*ParallelNodeQueryBuilder[T]) Error

func (s *ParallelNodeQueryBuilder[T]) Error() error

Error returns any error that may have occurred during the parallel operation. This error may be a joined error.

func (*ParallelNodeQueryBuilder[T]) Join

func (s *ParallelNodeQueryBuilder[T]) Join()

Join blocks the calling goroutine and waits for the parallel node query to complete.

func (*ParallelNodeQueryBuilder[T]) Stream

func (s *ParallelNodeQueryBuilder[T]) Stream(ctx context.Context, numWorkers int) <-chan T

Stream commits the query to the database in parallel and writes all results to the returned output channel.

func (*ParallelNodeQueryBuilder[T]) UsingQuery

func (s *ParallelNodeQueryBuilder[T]) UsingQuery(queryDelegate func(query graph.NodeQuery, outC chan<- T) error) *ParallelNodeQueryBuilder[T]

UsingQuery specifies the execution and marshalling of results from the database. All results written to the outC channel parameter will be received by the Stream(...) caller.

func (*ParallelNodeQueryBuilder[T]) WithCriteria

func (s *ParallelNodeQueryBuilder[T]) WithCriteria(criteria graph.Criteria) *ParallelNodeQueryBuilder[T]

WithCriteria specifies the criteria being used to filter this query.

type PathFilter

type PathFilter func(ctx *TraversalContext, segment *graph.PathSegment) bool

PathFilter is invoked on completed paths identified during a graph traversal. It returns a boolean value indicating whether the path was consumed. If consumed, a rendered path is then tracked for traversal plan limit specifications.

type PathVisitor

type PathVisitor func(ctx *TraversalContext, segment *graph.PathSegment) error

PathVisitor is invoked on completed paths identified during a graph traversal. It may return an error value in the case where a fatal error condition has been encountered, rendering further traversal moot.

type ReaderFunc

type ReaderFunc[T any] func(ctx context.Context, tx graph.Transaction, outC chan<- T) error

type RelationshipNodes

type RelationshipNodes struct {
	Start []graph.ID
	End   []graph.ID
}

func CollectNodeIDs

func CollectNodeIDs(relationshipQuery graph.RelationshipQuery) (RelationshipNodes, error)

func (*RelationshipNodes) Add

func (s *RelationshipNodes) Add(relationship *graph.Relationship)

type SegmentFilter

type SegmentFilter func(ctx *TraversalContext, segment *graph.PathSegment) bool

type TraversalContext

type TraversalContext struct {
	LimitSkipTracker LimitSkipTracker
}

type TraversalPlan

type TraversalPlan struct {
	Root                  *graph.Node
	Direction             graph.Direction
	BranchQuery           graph.CriteriaProvider
	DepthExceptionHandler DepthExceptionHandler
	ExpansionFilter       func(segment *graph.PathSegment) bool
	DescentFilter         SegmentFilter
	PathFilter            PathFilter
	Skip                  int
	Limit                 int
}

type WriterFunc

type WriterFunc[T any] func(ctx context.Context, batch graph.Batch, inC <-chan T) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL