dag

package
v0.12.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrVertexNotFound = errors.New("vertex not found")
	ErrEdgeNotFound   = errors.New("edge not found")
	ErrCycleDetected  = errors.New("cycle detected: adding this edge would create a cycle")
	ErrDAGClosed      = errors.New("dag is closed")
)

Functions

This section is empty.

Types

type BatchOption

type BatchOption func(*batchOptions)

BatchOption configures batch operation behavior.

func WithBatchLock

func WithBatchLock() BatchOption

WithBatchLock enables distributed locking for the batch operation. This ensures consistency in distributed environments.

func WithBatchSize

func WithBatchSize(size int) BatchOption

WithBatchSize sets the batch size for Redis pipeline operations.

func WithSkipCycleDetection

func WithSkipCycleDetection() BatchOption

WithSkipCycleDetection skips cycle detection for batch edge additions. WARNING: Use with caution. If cycles are created, the DAG may become invalid.

func WithoutBatchLock

func WithoutBatchLock() BatchOption

WithoutBatchLock disables distributed locking for the batch operation. Use this when you know there are no concurrent operations.

type BatchResult

type BatchResult struct {
	// Succeeded is the number of operations that succeeded.
	Succeeded int
	// Failed is the number of operations that failed.
	Failed int
	// Errors contains the errors for failed operations.
	// Key is the index of the failed operation in the input slice.
	Errors map[int]error
}

BatchResult contains the results of a batch operation.

type DAG

type DAG[T comparable] interface {
	// AddVertex adds a vertex to the graph.
	// If the vertex already exists, this is a no-op.
	AddVertex(ctx context.Context, vertex T) error

	// AddEdge adds a directed edge from `from` to `to`.
	// This means `from` must be processed before `to`.
	// Both vertices are created if they don't exist.
	// Returns ErrCycleDetected if adding this edge would create a cycle.
	AddEdge(ctx context.Context, from, to T) error

	// DelVertex removes a vertex and all its associated edges.
	// Returns ErrVertexNotFound if vertex doesn't exist.
	DelVertex(ctx context.Context, vertex T) error

	// DelEdge removes the edge from `from` to `to`.
	// Returns ErrEdgeNotFound if the edge doesn't exist.
	DelEdge(ctx context.Context, from, to T) error

	// HasVertex checks if a vertex exists in the graph.
	HasVertex(ctx context.Context, vertex T) (bool, error)

	// HasEdge checks if an edge exists from `from` to `to`.
	HasEdge(ctx context.Context, from, to T) (bool, error)

	// InDegree returns the in-degree of a vertex (number of incoming edges).
	// Returns ErrVertexNotFound if vertex doesn't exist.
	InDegree(ctx context.Context, vertex T) (int, error)

	// OutDegree returns the out-degree of a vertex (number of outgoing edges).
	// Returns ErrVertexNotFound if vertex doesn't exist.
	OutDegree(ctx context.Context, vertex T) (int, error)

	// VertexCount returns the total number of vertices in the graph.
	VertexCount(ctx context.Context) (int, error)

	// EdgeCount returns the total number of edges in the graph.
	EdgeCount(ctx context.Context) (int, error)

	// Vertices returns all vertices in the graph.
	Vertices(ctx context.Context) ([]T, error)

	// Successors returns all vertices that have an edge from the given vertex.
	// Returns ErrVertexNotFound if vertex doesn't exist.
	Successors(ctx context.Context, vertex T) ([]T, error)

	// Predecessors returns all vertices that have an edge to the given vertex.
	// Returns ErrVertexNotFound if vertex doesn't exist.
	Predecessors(ctx context.Context, vertex T) ([]T, error)

	// Pipeline returns a channel that emits vertices in topological order.
	// Vertices with in-degree 0 are emitted first.
	// Once a vertex is emitted, it is removed from the graph.
	// The channel is closed when the graph is empty or Close() is called.
	Pipeline(ctx context.Context) (<-chan T, error)

	// Close releases resources and stops the Pipeline.
	Close() error
}

DAG represents a Directed Acyclic Graph data structure. It supports adding vertices (nodes) and edges (dependencies), and provides a Pipeline for consuming vertices in topological order.

The generic type T represents the vertex identifier type. NOTE: It's thread-safe.

type Edge

type Edge[T comparable] struct {
	From T
	To   T
}

Edge represents a directed edge from one vertex to another.

type Factory

type Factory[T comparable] interface {
	// Create creates a new DAG with the given name.
	// If a DAG with the same name already exists, it returns the existing one.
	Create(ctx context.Context, name string, opts ...Option) (DAG[T], error)
}

Factory creates DAG instances.

type MemoryDAG

type MemoryDAG[T comparable] struct {
	// contains filtered or unexported fields
}

MemoryDAG is an in-memory implementation of DAG.

func NewMemoryDAG

func NewMemoryDAG[T comparable](name string, opts ...Option) (*MemoryDAG[T], error)

NewMemoryDAG creates a new in-memory DAG.

func (*MemoryDAG[T]) AddEdge

func (d *MemoryDAG[T]) AddEdge(ctx context.Context, from, to T) error

func (*MemoryDAG[T]) AddVertex

func (d *MemoryDAG[T]) AddVertex(ctx context.Context, vertex T) error

func (*MemoryDAG[T]) Close

func (d *MemoryDAG[T]) Close() error

func (*MemoryDAG[T]) DelEdge

func (d *MemoryDAG[T]) DelEdge(ctx context.Context, from, to T) error

func (*MemoryDAG[T]) DelVertex

func (d *MemoryDAG[T]) DelVertex(ctx context.Context, vertex T) error

func (*MemoryDAG[T]) EdgeCount

func (d *MemoryDAG[T]) EdgeCount(ctx context.Context) (int, error)

func (*MemoryDAG[T]) HasEdge

func (d *MemoryDAG[T]) HasEdge(ctx context.Context, from, to T) (bool, error)

func (*MemoryDAG[T]) HasVertex

func (d *MemoryDAG[T]) HasVertex(ctx context.Context, vertex T) (bool, error)

func (*MemoryDAG[T]) InDegree

func (d *MemoryDAG[T]) InDegree(ctx context.Context, vertex T) (int, error)

func (*MemoryDAG[T]) OutDegree

func (d *MemoryDAG[T]) OutDegree(ctx context.Context, vertex T) (int, error)

func (*MemoryDAG[T]) Pipeline

func (d *MemoryDAG[T]) Pipeline(ctx context.Context) (<-chan T, error)

func (*MemoryDAG[T]) Predecessors

func (d *MemoryDAG[T]) Predecessors(ctx context.Context, vertex T) ([]T, error)

func (*MemoryDAG[T]) Successors

func (d *MemoryDAG[T]) Successors(ctx context.Context, vertex T) ([]T, error)

func (*MemoryDAG[T]) VertexCount

func (d *MemoryDAG[T]) VertexCount(ctx context.Context) (int, error)

func (*MemoryDAG[T]) Vertices

func (d *MemoryDAG[T]) Vertices(ctx context.Context) ([]T, error)

type MemoryDAGFactory

type MemoryDAGFactory[T comparable] struct {
	// contains filtered or unexported fields
}

MemoryDAGFactory creates MemoryDAG instances.

func NewMemoryDAGFactory

func NewMemoryDAGFactory[T comparable]() *MemoryDAGFactory[T]

NewMemoryDAGFactory creates a new MemoryDAGFactory.

func (*MemoryDAGFactory[T]) Close

func (f *MemoryDAGFactory[T]) Close() error

Close closes all DAGs managed by the factory.

func (*MemoryDAGFactory[T]) Create

func (f *MemoryDAGFactory[T]) Create(ctx context.Context, name string, opts ...Option) (DAG[T], error)

func (*MemoryDAGFactory[T]) Remove

func (f *MemoryDAGFactory[T]) Remove(name string) error

Remove removes a DAG from the factory and closes it.

type Option

type Option func(*Options)

Option configures DAG behavior.

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the buffer size for the Pipeline channel.

func WithDAGLogger

func WithDAGLogger(logger *zerolog.Logger) Option

WithDAGLogger sets the logger for DAG operations.

func WithPollInterval

func WithPollInterval(ms int) Option

WithPollInterval sets the poll interval in milliseconds.

type Options

type Options struct {
	// BufferSize is the buffer size for the Pipeline channel.
	// Default is 0 (unbuffered).
	BufferSize int

	// PollInterval is the interval for polling vertices with in-degree 0.
	// Default is 100ms.
	PollInterval int

	// Logger is the optional logger for DAG operations.
	// If nil, logging is disabled.
	Logger *zerolog.Logger
}

Options holds DAG configuration.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns default options.

type RedisDAG

type RedisDAG[T comparable] struct {
	// contains filtered or unexported fields
}

RedisDAG is a Redis-backed implementation of DAG. It stores the graph structure in Redis, allowing distributed access.

func NewRedisDAG

func NewRedisDAG[T comparable](client redis.Cmdable, name string, config *RedisDAGConfig[T], opts ...Option) (*RedisDAG[T], error)

NewRedisDAG creates a new Redis-backed DAG.

func (*RedisDAG[T]) AddEdge

func (d *RedisDAG[T]) AddEdge(ctx context.Context, from, to T) error

func (*RedisDAG[T]) AddEdges

func (d *RedisDAG[T]) AddEdges(ctx context.Context, edges []Edge[T], opts ...BatchOption) (*BatchResult, error)

AddEdges adds multiple edges to the graph in a single batch operation. This is more efficient than calling AddEdge multiple times. If cycle detection is enabled (default), it will detect cycles for each edge.

func (*RedisDAG[T]) AddVertex

func (d *RedisDAG[T]) AddVertex(ctx context.Context, vertex T) error

func (*RedisDAG[T]) AddVertices

func (d *RedisDAG[T]) AddVertices(ctx context.Context, vertices []T, opts ...BatchOption) (*BatchResult, error)

AddVertices adds multiple vertices to the graph in a single batch operation. This is more efficient than calling AddVertex multiple times.

func (*RedisDAG[T]) Cleanup

func (d *RedisDAG[T]) Cleanup(ctx context.Context) error

Cleanup removes all Redis keys associated with this DAG.

func (*RedisDAG[T]) Close

func (d *RedisDAG[T]) Close() error

func (*RedisDAG[T]) DelEdge

func (d *RedisDAG[T]) DelEdge(ctx context.Context, from, to T) error

func (*RedisDAG[T]) DelEdges

func (d *RedisDAG[T]) DelEdges(ctx context.Context, edges []Edge[T], opts ...BatchOption) (*BatchResult, error)

DelEdges removes multiple edges from the graph. This is more efficient than calling DelEdge multiple times.

func (*RedisDAG[T]) DelVertex

func (d *RedisDAG[T]) DelVertex(ctx context.Context, vertex T) error

func (*RedisDAG[T]) DelVertices

func (d *RedisDAG[T]) DelVertices(ctx context.Context, vertices []T, opts ...BatchOption) (*BatchResult, error)

DelVertices removes multiple vertices and their associated edges. This is more efficient than calling DelVertex multiple times.

func (*RedisDAG[T]) EdgeCount

func (d *RedisDAG[T]) EdgeCount(ctx context.Context) (int, error)

func (*RedisDAG[T]) HasEdge

func (d *RedisDAG[T]) HasEdge(ctx context.Context, from, to T) (bool, error)

func (*RedisDAG[T]) HasVertex

func (d *RedisDAG[T]) HasVertex(ctx context.Context, vertex T) (bool, error)

func (*RedisDAG[T]) InDegree

func (d *RedisDAG[T]) InDegree(ctx context.Context, vertex T) (int, error)

func (*RedisDAG[T]) OutDegree

func (d *RedisDAG[T]) OutDegree(ctx context.Context, vertex T) (int, error)

func (*RedisDAG[T]) Pipeline

func (d *RedisDAG[T]) Pipeline(ctx context.Context) (<-chan T, error)

func (*RedisDAG[T]) Predecessors

func (d *RedisDAG[T]) Predecessors(ctx context.Context, vertex T) ([]T, error)

func (*RedisDAG[T]) SetLockerGenerator

func (d *RedisDAG[T]) SetLockerGenerator(gen locker.SyncLockerGenerator)

SetLockerGenerator sets the locker generator for distributed locking. This is optional and only needed when using batch operations with locking.

func (*RedisDAG[T]) Successors

func (d *RedisDAG[T]) Successors(ctx context.Context, vertex T) ([]T, error)

func (*RedisDAG[T]) VertexCount

func (d *RedisDAG[T]) VertexCount(ctx context.Context) (int, error)

func (*RedisDAG[T]) Vertices

func (d *RedisDAG[T]) Vertices(ctx context.Context) ([]T, error)

type RedisDAGConfig

type RedisDAGConfig[T comparable] struct {
	// Marshal converts vertex to string for Redis storage.
	// Default uses JSON marshaling.
	Marshal func(T) (string, error)

	// Unmarshal converts string from Redis to vertex.
	// Default uses JSON unmarshaling.
	Unmarshal func(string) (T, error)
}

RedisDAGConfig configures RedisDAG behavior.

type RedisDAGFactory

type RedisDAGFactory[T comparable] struct {
	// contains filtered or unexported fields
}

RedisDAGFactory creates RedisDAG instances.

func NewRedisDAGFactory

func NewRedisDAGFactory[T comparable](client redis.Cmdable, config *RedisDAGConfig[T]) *RedisDAGFactory[T]

NewRedisDAGFactory creates a new RedisDAGFactory.

func (*RedisDAGFactory[T]) Close

func (f *RedisDAGFactory[T]) Close(ctx context.Context) error

Close closes all DAGs managed by the factory.

func (*RedisDAGFactory[T]) Create

func (f *RedisDAGFactory[T]) Create(ctx context.Context, name string, opts ...Option) (DAG[T], error)

func (*RedisDAGFactory[T]) Remove

func (f *RedisDAGFactory[T]) Remove(ctx context.Context, name string) error

Remove removes a DAG from the factory and closes it.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL