Documentation
¶
Index ¶
- Variables
- type BatchOption
- type BatchResult
- type DAG
- type Edge
- type Factory
- type MemoryDAG
- func (d *MemoryDAG[T]) AddEdge(ctx context.Context, from, to T) error
- func (d *MemoryDAG[T]) AddVertex(ctx context.Context, vertex T) error
- func (d *MemoryDAG[T]) Close() error
- func (d *MemoryDAG[T]) DelEdge(ctx context.Context, from, to T) error
- func (d *MemoryDAG[T]) DelVertex(ctx context.Context, vertex T) error
- func (d *MemoryDAG[T]) EdgeCount(ctx context.Context) (int, error)
- func (d *MemoryDAG[T]) HasEdge(ctx context.Context, from, to T) (bool, error)
- func (d *MemoryDAG[T]) HasVertex(ctx context.Context, vertex T) (bool, error)
- func (d *MemoryDAG[T]) InDegree(ctx context.Context, vertex T) (int, error)
- func (d *MemoryDAG[T]) OutDegree(ctx context.Context, vertex T) (int, error)
- func (d *MemoryDAG[T]) Pipeline(ctx context.Context) (<-chan T, error)
- func (d *MemoryDAG[T]) Predecessors(ctx context.Context, vertex T) ([]T, error)
- func (d *MemoryDAG[T]) Successors(ctx context.Context, vertex T) ([]T, error)
- func (d *MemoryDAG[T]) VertexCount(ctx context.Context) (int, error)
- func (d *MemoryDAG[T]) Vertices(ctx context.Context) ([]T, error)
- type MemoryDAGFactory
- type Option
- type Options
- type RedisDAG
- func (d *RedisDAG[T]) AddEdge(ctx context.Context, from, to T) error
- func (d *RedisDAG[T]) AddEdges(ctx context.Context, edges []Edge[T], opts ...BatchOption) (*BatchResult, error)
- func (d *RedisDAG[T]) AddVertex(ctx context.Context, vertex T) error
- func (d *RedisDAG[T]) AddVertices(ctx context.Context, vertices []T, opts ...BatchOption) (*BatchResult, error)
- func (d *RedisDAG[T]) Cleanup(ctx context.Context) error
- func (d *RedisDAG[T]) Close() error
- func (d *RedisDAG[T]) DelEdge(ctx context.Context, from, to T) error
- func (d *RedisDAG[T]) DelEdges(ctx context.Context, edges []Edge[T], opts ...BatchOption) (*BatchResult, error)
- func (d *RedisDAG[T]) DelVertex(ctx context.Context, vertex T) error
- func (d *RedisDAG[T]) DelVertices(ctx context.Context, vertices []T, opts ...BatchOption) (*BatchResult, error)
- func (d *RedisDAG[T]) EdgeCount(ctx context.Context) (int, error)
- func (d *RedisDAG[T]) HasEdge(ctx context.Context, from, to T) (bool, error)
- func (d *RedisDAG[T]) HasVertex(ctx context.Context, vertex T) (bool, error)
- func (d *RedisDAG[T]) InDegree(ctx context.Context, vertex T) (int, error)
- func (d *RedisDAG[T]) OutDegree(ctx context.Context, vertex T) (int, error)
- func (d *RedisDAG[T]) Pipeline(ctx context.Context) (<-chan T, error)
- func (d *RedisDAG[T]) Predecessors(ctx context.Context, vertex T) ([]T, error)
- func (d *RedisDAG[T]) SetLockerGenerator(gen locker.SyncLockerGenerator)
- func (d *RedisDAG[T]) Successors(ctx context.Context, vertex T) ([]T, error)
- func (d *RedisDAG[T]) VertexCount(ctx context.Context) (int, error)
- func (d *RedisDAG[T]) Vertices(ctx context.Context) ([]T, error)
- type RedisDAGConfig
- type RedisDAGFactory
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type BatchOption ¶
type BatchOption func(*batchOptions)
BatchOption configures batch operation behavior.
func WithBatchLock ¶
func WithBatchLock() BatchOption
WithBatchLock enables distributed locking for the batch operation. This ensures consistency in distributed environments.
func WithBatchSize ¶
func WithBatchSize(size int) BatchOption
WithBatchSize sets the batch size for Redis pipeline operations.
func WithSkipCycleDetection ¶
func WithSkipCycleDetection() BatchOption
WithSkipCycleDetection skips cycle detection for batch edge additions. WARNING: Use with caution. If cycles are created, the DAG may become invalid.
func WithoutBatchLock ¶
func WithoutBatchLock() BatchOption
WithoutBatchLock disables distributed locking for the batch operation. Use this when you know there are no concurrent operations.
type BatchResult ¶
type BatchResult struct {
// Succeeded is the number of operations that succeeded.
Succeeded int
// Failed is the number of operations that failed.
Failed int
// Errors contains the errors for failed operations.
// Key is the index of the failed operation in the input slice.
Errors map[int]error
}
BatchResult contains the results of a batch operation.
type DAG ¶
type DAG[T comparable] interface {
	// AddVertex adds a vertex to the graph.
	// If the vertex already exists, this is a no-op.
	AddVertex(ctx context.Context, vertex T) error

	// AddEdge adds a directed edge from `from` to `to`.
	// This means `from` must be processed before `to`.
	// Both vertices are created if they don't exist.
	// Returns ErrCycleDetected if adding this edge would create a cycle.
	AddEdge(ctx context.Context, from, to T) error

	// DelVertex removes a vertex and all its associated edges.
	// Returns ErrVertexNotFound if vertex doesn't exist.
	DelVertex(ctx context.Context, vertex T) error

	// DelEdge removes the edge from `from` to `to`.
	// Returns ErrEdgeNotFound if the edge doesn't exist.
	DelEdge(ctx context.Context, from, to T) error

	// HasVertex checks if a vertex exists in the graph.
	HasVertex(ctx context.Context, vertex T) (bool, error)

	// HasEdge checks if an edge exists from `from` to `to`.
	HasEdge(ctx context.Context, from, to T) (bool, error)

	// InDegree returns the in-degree of a vertex (number of incoming edges).
	// Returns ErrVertexNotFound if vertex doesn't exist.
	InDegree(ctx context.Context, vertex T) (int, error)

	// OutDegree returns the out-degree of a vertex (number of outgoing edges).
	// Returns ErrVertexNotFound if vertex doesn't exist.
	OutDegree(ctx context.Context, vertex T) (int, error)

	// VertexCount returns the total number of vertices in the graph.
	VertexCount(ctx context.Context) (int, error)

	// EdgeCount returns the total number of edges in the graph.
	EdgeCount(ctx context.Context) (int, error)

	// Vertices returns all vertices in the graph.
	Vertices(ctx context.Context) ([]T, error)

	// Successors returns all vertices that have an edge from the given vertex.
	// Returns ErrVertexNotFound if vertex doesn't exist.
	Successors(ctx context.Context, vertex T) ([]T, error)

	// Predecessors returns all vertices that have an edge to the given vertex.
	// Returns ErrVertexNotFound if vertex doesn't exist.
	Predecessors(ctx context.Context, vertex T) ([]T, error)

	// Pipeline returns a channel that emits vertices in topological order.
	// Vertices with in-degree 0 are emitted first.
	// Once a vertex is emitted, it is removed from the graph.
	// The channel is closed when the graph is empty or Close() is called.
	Pipeline(ctx context.Context) (<-chan T, error)

	// Close releases resources and stops the Pipeline.
	Close() error
}
DAG represents a Directed Acyclic Graph data structure. It supports adding vertices (nodes) and edges (dependencies), and provides a Pipeline for consuming vertices in topological order.
The generic type T represents the vertex identifier type. NOTE: DAG implementations are thread-safe.
type Edge ¶
type Edge[T comparable] struct {
	From T
	To   T
}
Edge represents a directed edge from one vertex to another.
type Factory ¶
type Factory[T comparable] interface {
	// Create creates a new DAG with the given name.
	// If a DAG with the same name already exists, it returns the existing one.
	Create(ctx context.Context, name string, opts ...Option) (DAG[T], error)
}
Factory creates DAG instances.
type MemoryDAG ¶
type MemoryDAG[T comparable] struct {
	// contains filtered or unexported fields
}
MemoryDAG is an in-memory implementation of DAG.
func NewMemoryDAG ¶
func NewMemoryDAG[T comparable](name string, opts ...Option) (*MemoryDAG[T], error)
NewMemoryDAG creates a new in-memory DAG.
func (*MemoryDAG[T]) Predecessors ¶
func (*MemoryDAG[T]) Successors ¶
func (*MemoryDAG[T]) VertexCount ¶
type MemoryDAGFactory ¶
type MemoryDAGFactory[T comparable] struct {
	// contains filtered or unexported fields
}
MemoryDAGFactory creates MemoryDAG instances.
func NewMemoryDAGFactory ¶
func NewMemoryDAGFactory[T comparable]() *MemoryDAGFactory[T]
NewMemoryDAGFactory creates a new MemoryDAGFactory.
func (*MemoryDAGFactory[T]) Close ¶
func (f *MemoryDAGFactory[T]) Close() error
Close closes all DAGs managed by the factory.
func (*MemoryDAGFactory[T]) Remove ¶
func (f *MemoryDAGFactory[T]) Remove(name string) error
Remove removes a DAG from the factory and closes it.
type Option ¶
type Option func(*Options)
Option configures DAG behavior.
func WithBufferSize ¶
WithBufferSize sets the buffer size for the Pipeline channel.
func WithDAGLogger ¶
WithDAGLogger sets the logger for DAG operations.
func WithPollInterval ¶
WithPollInterval sets the poll interval in milliseconds.
type Options ¶
type Options struct {
// BufferSize is the buffer size for the Pipeline channel.
// Default is 0 (unbuffered).
BufferSize int
// PollInterval is the interval, in milliseconds, for polling vertices with in-degree 0.
// Default is 100 (100ms).
PollInterval int
// Logger is the optional logger for DAG operations.
// If nil, logging is disabled.
Logger *zerolog.Logger
}
Options holds DAG configuration.
type RedisDAG ¶
type RedisDAG[T comparable] struct {
	// contains filtered or unexported fields
}
RedisDAG is a Redis-backed implementation of DAG. It stores the graph structure in Redis, allowing distributed access.
func NewRedisDAG ¶
func NewRedisDAG[T comparable](client redis.Cmdable, name string, config *RedisDAGConfig[T], opts ...Option) (*RedisDAG[T], error)
NewRedisDAG creates a new Redis-backed DAG.
func (*RedisDAG[T]) AddEdges ¶
func (d *RedisDAG[T]) AddEdges(ctx context.Context, edges []Edge[T], opts ...BatchOption) (*BatchResult, error)
AddEdges adds multiple edges to the graph in a single batch operation. This is more efficient than calling AddEdge multiple times. If cycle detection is enabled (default), it will detect cycles for each edge.
func (*RedisDAG[T]) AddVertices ¶
func (d *RedisDAG[T]) AddVertices(ctx context.Context, vertices []T, opts ...BatchOption) (*BatchResult, error)
AddVertices adds multiple vertices to the graph in a single batch operation. This is more efficient than calling AddVertex multiple times.
func (*RedisDAG[T]) DelEdges ¶
func (d *RedisDAG[T]) DelEdges(ctx context.Context, edges []Edge[T], opts ...BatchOption) (*BatchResult, error)
DelEdges removes multiple edges from the graph. This is more efficient than calling DelEdge multiple times.
func (*RedisDAG[T]) DelVertices ¶
func (d *RedisDAG[T]) DelVertices(ctx context.Context, vertices []T, opts ...BatchOption) (*BatchResult, error)
DelVertices removes multiple vertices and their associated edges. This is more efficient than calling DelVertex multiple times.
func (*RedisDAG[T]) Predecessors ¶
func (*RedisDAG[T]) SetLockerGenerator ¶
func (d *RedisDAG[T]) SetLockerGenerator(gen locker.SyncLockerGenerator)
SetLockerGenerator sets the locker generator for distributed locking. This is optional and only needed when using batch operations with locking.
func (*RedisDAG[T]) Successors ¶
func (*RedisDAG[T]) VertexCount ¶
type RedisDAGConfig ¶
type RedisDAGConfig[T comparable] struct {
	// Marshal converts vertex to string for Redis storage.
	// Default uses JSON marshaling.
	Marshal func(T) (string, error)

	// Unmarshal converts string from Redis to vertex.
	// Default uses JSON unmarshaling.
	Unmarshal func(string) (T, error)
}
RedisDAGConfig configures RedisDAG behavior.
type RedisDAGFactory ¶
type RedisDAGFactory[T comparable] struct {
	// contains filtered or unexported fields
}
RedisDAGFactory creates RedisDAG instances.
func NewRedisDAGFactory ¶
func NewRedisDAGFactory[T comparable](client redis.Cmdable, config *RedisDAGConfig[T]) *RedisDAGFactory[T]
NewRedisDAGFactory creates a new RedisDAGFactory.
func (*RedisDAGFactory[T]) Close ¶
func (f *RedisDAGFactory[T]) Close(ctx context.Context) error
Close closes all DAGs managed by the factory.