Documentation ¶
Overview ¶
Package dag provides a DAG (Directed Acyclic Graph) execution engine for orchestrating provider-based service calls in dependency order.
It composes with gokit/provider: each node wraps a RequestResponse[I,O], and all existing provider middleware (resilience, stateful, logging, tracing) applies per node without changes.
Two execution modes share the same graph:
- ExecuteBatch: runs ALL nodes in dependency order (one-shot)
- ExecuteStreaming: runs only nodes whose schedule/condition is met
Index ¶
- func BuildLevels(g *Graph) ([][]string, error)
- func Read[T any](state *State, port Port[T]) (T, error)
- func Write[T any](state *State, port Port[T], value T)
- type ConditionFunc
- type Edge
- type Engine
- type FilePipelineLoader
- type Graph
- type Node
- type NodeConfig
- type NodeDef
- type NodeFilter
- type NodeResult
- type Pipeline
- type PipelineLoader
- type Port
- type Registry
- type Result
- type ScheduleConfig
- type Session
- type State
- type Tool
- type ToolConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildLevels ¶
func BuildLevels(g *Graph) ([][]string, error)
BuildLevels uses Kahn's algorithm to group nodes by dependency level. Nodes within the same level can execute in parallel. It returns an error if the graph contains a cycle.
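The level grouping described above can be illustrated with a minimal, standalone sketch of Kahn's algorithm. It is not the package's implementation: the `buildLevels` helper and its `map[string][]string` input (node name to the names it depends on) are assumptions made for illustration, whereas the real BuildLevels takes a *Graph.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// buildLevels groups nodes into dependency levels with Kahn's algorithm.
// deps maps each node to the nodes it depends on (hypothetical input shape).
func buildLevels(deps map[string][]string) ([][]string, error) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string)
	for node, parents := range deps {
		indegree[node] = len(parents)
		for _, p := range parents {
			if _, ok := deps[p]; !ok {
				return nil, fmt.Errorf("node %q depends on unknown node %q", node, p)
			}
			dependents[p] = append(dependents[p], node)
		}
	}

	// Level 0 holds every node with no unmet dependencies.
	var current []string
	for node, d := range indegree {
		if d == 0 {
			current = append(current, node)
		}
	}

	var levels [][]string
	processed := 0
	for len(current) > 0 {
		sort.Strings(current) // deterministic order within a level
		levels = append(levels, current)
		processed += len(current)

		var next []string
		for _, node := range current {
			for _, child := range dependents[node] {
				indegree[child]--
				if indegree[child] == 0 {
					next = append(next, child)
				}
			}
		}
		current = next
	}

	// Nodes left unprocessed can only be part of a cycle.
	if processed != len(deps) {
		return nil, errors.New("cycle detected")
	}
	return levels, nil
}

func main() {
	levels, err := buildLevels(map[string][]string{
		"fetch":  nil,
		"parse":  {"fetch"},
		"enrich": {"fetch"},
		"store":  {"parse", "enrich"},
	})
	fmt.Println(levels, err) // [[fetch] [enrich parse] [store]] <nil>
}
```

In this sketch "parse" and "enrich" share a level because both depend only on "fetch", so they are the ones that could run in parallel.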
Types ¶
type ConditionFunc ¶
ConditionFunc evaluates whether a node should run based on state.
type Engine ¶
type Engine struct {
// MaxParallel limits concurrent nodes per level (0 = unlimited).
MaxParallel int
}
Engine executes a graph in dependency order.
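One common way to bound per-level concurrency in the way MaxParallel describes is a buffered channel used as a semaphore. The sketch below is an assumption about how such a limit could work, not the Engine's actual code; `runLevel` and `sleepTasks` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLevel runs all tasks of one level concurrently, with at most
// maxParallel in flight (0 = unlimited). It returns the peak concurrency
// observed so the limit can be checked.
func runLevel(tasks []func(), maxParallel int) int32 {
	var sem chan struct{}
	if maxParallel > 0 {
		sem = make(chan struct{}, maxParallel)
	}

	var wg sync.WaitGroup
	var running, peak int32
	for _, task := range tasks {
		if sem != nil {
			sem <- struct{}{} // blocks while maxParallel tasks are in flight
		}
		wg.Add(1)
		go func(run func()) {
			defer wg.Done()
			if sem != nil {
				defer func() { <-sem }() // release the slot when done
			}
			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this is the highest count so far
				old := atomic.LoadInt32(&peak)
				if n <= old || atomic.CompareAndSwapInt32(&peak, old, n) {
					break
				}
			}
			run()
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return peak
}

// sleepTasks builds n tasks that each sleep for d (demo helper).
func sleepTasks(n int, d time.Duration) []func() {
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(d) }
	}
	return tasks
}

func main() {
	peak := runLevel(sleepTasks(6, 10*time.Millisecond), 2)
	fmt.Println("peak within limit:", peak <= 2)
}
```

The slot is acquired before the goroutine starts and released only after the task returns, so the number of running tasks cannot exceed the limit.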
func (*Engine) ExecuteBatch ¶
ExecuteBatch runs ALL nodes in dependency order, one-shot.
func (*Engine) ExecuteStreaming ¶
func (e *Engine) ExecuteStreaming(ctx context.Context, g *Graph, state *State, filter NodeFilter) (*Result, error)
ExecuteStreaming runs only nodes that pass the filter. Nodes that don't pass are marked as "skipped".
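The filtering behaviour can be sketched without the engine. The example below assumes a filter of the form `func(name string) bool` and a pre-sorted node order; both are illustrative assumptions, since the real NodeFilter signature is not shown here. Only the status strings come from NodeResult.

```go
package main

import (
	"errors"
	"fmt"
)

// nodeFilter plays the role of NodeFilter; this shape is an assumption.
type nodeFilter func(name string) bool

// runFiltered visits nodes in an already dependency-sorted order and
// records a status for each: nodes rejected by the filter are not run.
func runFiltered(order []string, run map[string]func() error, filter nodeFilter) map[string]string {
	status := make(map[string]string, len(order))
	for _, name := range order {
		switch {
		case !filter(name):
			status[name] = "skipped"
		case run[name]() != nil:
			status[name] = "failed"
		default:
			status[name] = "completed"
		}
	}
	return status
}

var errUpstream = errors.New("upstream unavailable")

func main() {
	run := map[string]func() error{
		"ingest":    func() error { return nil },
		"summarize": func() error { return nil },
		"alert":     func() error { return errUpstream },
	}
	due := map[string]bool{"ingest": true, "alert": true}

	status := runFiltered(
		[]string{"ingest", "summarize", "alert"},
		run,
		func(name string) bool { return due[name] },
	)
	fmt.Println(status) // map[alert:failed ingest:completed summarize:skipped]
}
```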
type FilePipelineLoader ¶
type FilePipelineLoader struct {
// contains filtered or unexported fields
}
FilePipelineLoader loads pipelines from YAML files on disk.
type Graph ¶
Graph declares nodes and edges (dependency relationships).
func ResolvePipeline ¶
func ResolvePipeline(p *Pipeline, registry *Registry, loader PipelineLoader) (*Graph, error)
ResolvePipeline converts a Pipeline definition into an executable Graph. It resolves includes recursively and looks up node implementations from the registry.
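Recursive include resolution can be pictured as a depth-first flattening of pipeline definitions. The sketch below is only an illustration under stated assumptions: `pipelineDef`, `flatten`, the "included nodes first" ordering, and the include-cycle check are all hypothetical and not taken from ResolvePipeline itself.

```go
package main

import "fmt"

// pipelineDef is a reduced stand-in for Pipeline (hypothetical).
type pipelineDef struct {
	includes []string
	nodes    []string
}

// flatten returns the node names of a pipeline with all includes expanded
// depth-first. Nodes reachable through several include paths are not
// deduplicated in this sketch.
func flatten(name string, defs map[string]pipelineDef, visiting map[string]bool) ([]string, error) {
	def, ok := defs[name]
	if !ok {
		return nil, fmt.Errorf("pipeline %q not found", name)
	}
	if visiting[name] {
		return nil, fmt.Errorf("include cycle at pipeline %q", name)
	}
	visiting[name] = true
	defer delete(visiting, name)

	var nodes []string
	for _, inc := range def.includes {
		sub, err := flatten(inc, defs, visiting)
		if err != nil {
			return nil, err
		}
		nodes = append(nodes, sub...)
	}
	return append(nodes, def.nodes...), nil
}

func main() {
	defs := map[string]pipelineDef{
		"base":   {nodes: []string{"fetch", "parse"}},
		"enrich": {includes: []string{"base"}, nodes: []string{"enrich"}},
		"full":   {includes: []string{"enrich"}, nodes: []string{"store"}},
	}
	nodes, err := flatten("full", defs, map[string]bool{})
	fmt.Println(nodes, err) // [fetch parse enrich store] <nil>
}
```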
type Node ¶
Node is the execution unit in a DAG.
func FromProvider ¶
func FromProvider[I, O any](cfg NodeConfig[I, O]) Node
FromProvider bridges a provider.RequestResponse[I,O] into a DAG Node.
func WithLogging ¶
WithLogging wraps a Node with execution logging. It logs the node name, duration, and success or error status.
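Wrappers of this kind usually follow the decorator pattern: the wrapper satisfies the same interface as the node it wraps and adds behaviour around the call. The sketch below assumes a two-method `node` interface and a `logf` callback; both are hypothetical, since the real Node interface and the WithLogging signature are not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// node is a hypothetical stand-in for the package's Node type.
type node interface {
	Name() string
	Run() error
}

type funcNode struct {
	name string
	fn   func() error
}

func (n funcNode) Name() string { return n.name }
func (n funcNode) Run() error   { return n.fn() }

// memoryLog collects log lines so the demo output can be inspected.
type memoryLog struct{ lines []string }

func (m *memoryLog) Printf(format string, args ...any) {
	m.lines = append(m.lines, fmt.Sprintf(format, args...))
}

// loggingNode decorates another node with timing and status logging.
type loggingNode struct {
	inner node
	logf  func(format string, args ...any)
}

func (l loggingNode) Name() string { return l.inner.Name() }

func (l loggingNode) Run() error {
	start := time.Now()
	err := l.inner.Run()
	status := "success"
	if err != nil {
		status = "error"
	}
	l.logf("node=%s status=%s duration=%s", l.inner.Name(), status, time.Since(start))
	return err // the wrapped node's error is passed through unchanged
}

func withLogging(inner node, logf func(format string, args ...any)) node {
	return loggingNode{inner: inner, logf: logf}
}

var errBoom = errors.New("boom")

func main() {
	rec := &memoryLog{}
	ok := withLogging(funcNode{"fetch", func() error { return nil }}, rec.Printf)
	bad := withLogging(funcNode{"store", func() error { return errBoom }}, rec.Printf)
	_ = ok.Run()
	_ = bad.Run()
	for _, line := range rec.lines {
		fmt.Println(line)
	}
}
```

Because the wrapper returns the same interface it accepts, wrappers of this shape can be stacked in any order.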
func WithMetrics ¶
func WithMetrics(node Node, metrics *observability.Metrics) Node
WithMetrics wraps a Node with metric recording. Records operation count, duration, and errors.
func WithTracing ¶
WithTracing wraps a Node with OpenTelemetry span creation. Each execution creates a span named "{prefix}.{nodeName}".
type NodeConfig ¶
type NodeConfig[I, O any] struct {
// Name is the unique node identifier in the graph.
Name string
// Service is the provider to execute.
Service provider.RequestResponse[I, O]
// Extract reads inputs from state.
Extract func(state *State) (I, error)
// Output is the port where the result is written.
Output Port[O]
}
NodeConfig configures a provider-backed node.
type NodeDef ¶
type NodeDef struct {
// Component is the registry lookup key for this node.
Component string `yaml:"component"`
// DependsOn lists node names this node depends on.
DependsOn []string `yaml:"depends_on,omitempty"`
// Schedule configures schedule-based execution (streaming mode only).
Schedule *ScheduleConfig `yaml:"schedule,omitempty"`
// Condition is a named condition function key.
Condition string `yaml:"condition,omitempty"`
}
NodeDef defines a node within a pipeline.
type NodeFilter ¶
NodeFilter returns true if a node should execute in this cycle.
type NodeResult ¶
type NodeResult struct {
Name string
Status string // "completed" | "skipped" | "failed"
Duration time.Duration
Output any
Error error
}
NodeResult holds the outcome of a single node execution.
type Pipeline ¶
type Pipeline struct {
// Name is the pipeline identifier.
Name string `yaml:"name"`
// Mode is the execution mode: "batch" or "streaming".
Mode string `yaml:"mode"`
// Includes lists sub-pipeline names to compose (recursive).
Includes []string `yaml:"includes,omitempty"`
// Nodes defines the pipeline's node specifications.
Nodes []NodeDef `yaml:"nodes"`
}
Pipeline is a composable, YAML-defined graph definition.
type PipelineLoader ¶
PipelineLoader loads pipeline definitions by name.
func NewFilePipelineLoader ¶
func NewFilePipelineLoader(dirs ...string) PipelineLoader
NewFilePipelineLoader creates a loader that searches the given directories for pipeline YAML files.
type Port ¶
Port is a compile-time typed accessor for State. It prevents type mismatches between nodes at compile time.
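The idea of a typed port can be shown with Go generics. The sketch below mirrors the shapes of the Read and Write functions listed in the index, but the `port` and `state` types and their fields are hypothetical stand-ins; the package's internals are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// port is a typed key into state. T is carried only in the type, which is
// what lets the compiler reject mismatched reads and writes.
type port[T any] struct{ key string }

// state is a minimal thread-safe key-value store (hypothetical layout).
type state struct {
	mu     sync.RWMutex
	values map[string]any
}

func newState() *state { return &state{values: make(map[string]any)} }

// write stores a value under the port's key; v must have the port's type.
func write[T any](s *state, p port[T], v T) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values[p.key] = v
}

// read returns the value for the port, or an error if nothing was written
// or the stored value has a different dynamic type.
func read[T any](s *state, p port[T]) (T, error) {
	var zero T
	s.mu.RLock()
	defer s.mu.RUnlock()
	raw, ok := s.values[p.key]
	if !ok {
		return zero, fmt.Errorf("port %q: no value written", p.key)
	}
	v, ok := raw.(T)
	if !ok {
		return zero, fmt.Errorf("port %q: stored value has type %T", p.key, raw)
	}
	return v, nil
}

func main() {
	score := port[float64]{key: "score"}
	s := newState()

	write(s, score, 0.87)
	v, err := read(s, score)
	fmt.Println(v, err) // 0.87 <nil>

	// write(s, score, "high") would fail to compile: string is not float64.
}
```

The runtime type check in `read` only matters when two ports with different type parameters share the same key, which the compiler cannot detect.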
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry provides named node lookup for dynamic graph construction.
type Result ¶
type Result struct {
NodeResults map[string]NodeResult
Duration time.Duration
}
Result holds the outcome of a graph execution.
type ScheduleConfig ¶
type ScheduleConfig struct {
// Interval is the minimum time between runs.
Interval time.Duration `yaml:"interval"`
// MinBuffer is the minimum data accumulation time before first run.
MinBuffer time.Duration `yaml:"min_buffer"`
}
ScheduleConfig defines schedule-based execution parameters.
type Session ¶
type Session struct {
// ID is the session identifier.
ID string
// State is the shared state across execution cycles.
State *State
// contains filtered or unexported fields
}
Session holds per-session state for streaming pipelines.
func (*Session) ReadyFilter ¶
func (s *Session) ReadyFilter(pipeline *Pipeline, conditions map[string]ConditionFunc) NodeFilter
ReadyFilter returns a NodeFilter that checks schedule + conditions. A node is ready if:
- It has no schedule (always ready), OR
- Its schedule interval has elapsed AND its min_buffer period has passed
- AND its condition function (if any) returns true
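The readiness rules above can be sketched as a single predicate. This is one reading of the list, with several assumptions labeled in the code: the condition is checked whether or not a schedule exists, min_buffer is measured from session start, and a node that has never run counts as having its interval elapsed. The `schedule` type and the `ready`, `at`, and `never` names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// schedule mirrors the two ScheduleConfig fields.
type schedule struct {
	interval  time.Duration
	minBuffer time.Duration
}

// ready reports whether a node may run at time now.
// Assumptions (not confirmed by the documentation):
//   - minBuffer is measured from sessionStart
//   - a zero lastRun means the node has never run, so the interval passes
//   - the condition applies with or without a schedule
func ready(sched *schedule, sessionStart, lastRun, now time.Time, cond func() bool) bool {
	if sched != nil {
		if now.Sub(sessionStart) < sched.minBuffer {
			return false // still accumulating data
		}
		if !lastRun.IsZero() && now.Sub(lastRun) < sched.interval {
			return false // ran too recently
		}
	}
	return cond == nil || cond()
}

// at builds a time from Unix seconds; never is the zero time (demo helpers).
func at(sec int64) time.Time { return time.Unix(sec, 0) }

var never time.Time

func main() {
	sched := &schedule{interval: 30 * time.Second, minBuffer: 10 * time.Second}
	start := at(1000)

	fmt.Println(ready(sched, start, never, at(1005), nil))    // false
	fmt.Println(ready(sched, start, never, at(1010), nil))    // true
	fmt.Println(ready(sched, start, at(1010), at(1030), nil)) // false
	fmt.Println(ready(sched, start, at(1010), at(1040), nil)) // true
	fmt.Println(ready(nil, start, never, at(1000), func() bool { return false })) // false
}
```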
type State ¶
type State struct {
// contains filtered or unexported fields
}
State is a thread-safe key-value store for passing data between nodes.
type Tool ¶
type Tool[I, O any] struct {
// contains filtered or unexported fields
}
Tool wraps a DAG pipeline as a provider.RequestResponse.
type ToolConfig ¶
type ToolConfig[I, O any] struct {
// Name is the provider name.
Name string
// InputFn writes input into state before execution.
InputFn func(input I, state *State)
// OutputFn reads output from state after execution.
OutputFn func(state *State) (O, error)
}
ToolConfig configures how a DAG pipeline maps to a provider interface.