dag

package v0.1.4 (not the latest version of its module)

Published: Mar 1, 2026 License: MIT Imports: 11 Imported by: 0
README

dag

DAG (Directed Acyclic Graph) execution engine for orchestrating provider-based service calls in dependency order.

Install

go get github.com/kbukum/gokit

Quick Start

package main

import (
    "context"
    "fmt"
    "github.com/kbukum/gokit/dag"
)

func main() {
    // Define typed ports for compile-time safety
    rawPort := dag.Port[string]{Key: "raw"}
    resultPort := dag.Port[string]{Key: "result"}

    // Create nodes (using func nodes for brevity — real code uses FromProvider)
    extract := myExtractNode(rawPort)
    transform := myTransformNode(rawPort, resultPort)

    // Build graph with dependencies
    g := &dag.Graph{
        Nodes: map[string]dag.Node{
            "extract":   extract,
            "transform": transform,
        },
        Edges: []dag.Edge{{From: "extract", To: "transform"}},
    }

    // Execute in dependency order
    engine := &dag.Engine{}
    state := dag.NewState()
    result, _ := engine.ExecuteBatch(context.Background(), g, state)

    out, _ := dag.Read(state, resultPort)
    fmt.Println(out, result.Duration)
}

Provider Integration

Every node wraps a provider.RequestResponse[I,O]. All existing provider middleware applies per-node:

// Wrap a provider as a DAG node
svc := provider.WithResilience(rawSvc, resilience.Config{...})
node := dag.FromProvider(dag.NodeConfig[MyInput, *MyOutput]{
    Name:    "my-service",
    Service: svc,
    Extract: func(state *dag.State) (MyInput, error) {
        return dag.Read(state, inputPort)
    },
    Output: outputPort,
})

Pipeline as Provider

Wrap a DAG pipeline as a provider.RequestResponse for composability:

tool := dag.AsTool[Input, Output](engine, graph, dag.ToolConfig[Input, Output]{
    Name:    "my-pipeline",
    InputFn: func(in Input, state *dag.State) { dag.Write(state, inPort, in) },
    OutputFn: func(state *dag.State) (Output, error) { return dag.Read(state, outPort) },
})

// Now callable like any other provider
result, err := tool.Execute(ctx, myInput)

YAML Pipeline Definitions

name: full-process
mode: batch
includes:
  - data-enrichment
nodes:
  - component: extract
  - component: transform
    depends_on: [extract]
  - component: load
    depends_on: [transform]

Load and resolve the pipeline into an executable graph:

loader := dag.NewFilePipelineLoader("./pipelines")
pipeline, _ := loader.Load("full-process")
graph, _ := dag.ResolvePipeline(pipeline, registry, loader)

Streaming Mode

For long-running sessions where nodes fire on different schedules:

sess := dag.NewSession("session-1")
conditions := map[string]dag.ConditionFunc{
    "has-data": func(state *dag.State) bool {
        _, ok := state.Get("data")
        return ok
    },
}

filter := sess.ReadyFilter(pipeline, conditions)
result, _ := engine.ExecuteStreaming(ctx, graph, sess.State, filter)

Observability

Per-node tracing, metrics, and logging:

node = dag.WithTracing(node, "my-pipeline")
node = dag.WithMetrics(node, metrics)
node = dag.WithLogging(node, log)

Key Types & Functions

Name                    Description
State                   Thread-safe key-value store for passing data between nodes
Port[T]                 Compile-time typed accessor for State
Node                    Interface: Name() + Run(ctx, state)
FromProvider[I,O]()     Bridges provider.RequestResponse[I,O] into a Node
Graph / Edge            Declares nodes and dependency relationships
BuildLevels()           Kahn's algorithm — groups nodes by dependency level
Engine                  Executes graphs via ExecuteBatch() or ExecuteStreaming()
AsTool[I,O]()           Wraps a DAG pipeline as provider.RequestResponse[I,O]
Registry                Named node lookup for dynamic graph construction
Pipeline / NodeDef      YAML-defined graph definitions with includes
Session                 Per-session state and schedule tracking for streaming mode
WithTracing / WithMetrics / WithLogging  Observability node wrappers


Documentation

Overview

Package dag provides a DAG (Directed Acyclic Graph) execution engine for orchestrating provider-based service calls in dependency order.

It composes with gokit/provider — each node wraps a RequestResponse[I,O] and all existing provider middleware (resilience, stateful, logging, tracing) applies per-node without changes.

Two execution modes share the same graph:

  • ExecuteBatch: runs ALL nodes in dependency order (one-shot)
  • ExecuteStreaming: runs only nodes whose schedule/condition is met

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildLevels

func BuildLevels(g *Graph) ([][]string, error)

BuildLevels uses Kahn's algorithm to group nodes by dependency level. Nodes within the same level can execute in parallel. Returns an error if a cycle is detected.
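
The leveling behavior can be illustrated with a self-contained sketch of Kahn's algorithm (the real BuildLevels operates on *Graph; this toy version uses plain strings and edge pairs):

```go
package main

import "fmt"

// buildLevels is a minimal sketch of Kahn's algorithm: nodes with no
// remaining dependencies form a level; removing them exposes the next level.
// If nodes remain after the loop, the graph contains a cycle.
func buildLevels(nodes []string, edges [][2]string) ([][]string, error) {
	indeg := map[string]int{}
	next := map[string][]string{}
	for _, n := range nodes {
		indeg[n] = 0
	}
	for _, e := range edges {
		indeg[e[1]]++
		next[e[0]] = append(next[e[0]], e[1])
	}

	var levels [][]string
	remaining := len(nodes)
	var level []string
	for _, n := range nodes {
		if indeg[n] == 0 {
			level = append(level, n)
		}
	}
	for len(level) > 0 {
		levels = append(levels, level)
		remaining -= len(level)
		var nextLevel []string
		for _, n := range level {
			for _, m := range next[n] {
				indeg[m]--
				if indeg[m] == 0 {
					nextLevel = append(nextLevel, m)
				}
			}
		}
		level = nextLevel
	}
	if remaining > 0 {
		return nil, fmt.Errorf("cycle detected")
	}
	return levels, nil
}

func main() {
	levels, err := buildLevels(
		[]string{"extract", "transform", "load"},
		[][2]string{{"extract", "transform"}, {"transform", "load"}},
	)
	fmt.Println(levels, err) // [[extract] [transform] [load]] <nil>
}
```

Nodes within one returned level have no edges between them, which is what lets the engine run them in parallel.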

func Read

func Read[T any](state *State, port Port[T]) (T, error)

Read retrieves a typed value from state using a Port. Returns an error if the key is missing or the type doesn't match.

func Write

func Write[T any](state *State, port Port[T], value T)

Write stores a typed value into state using a Port.
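
How typed ports over an untyped store work can be sketched as follows (illustrative only — the real dag.State is thread-safe and its fields are unexported; this toy version omits locking):

```go
package main

import "fmt"

// state is a stand-in for dag.State: an untyped key-value map.
type state struct{ m map[string]any }

// port is a stand-in for dag.Port[T]: the key plus a compile-time type.
type port[T any] struct{ Key string }

// write stores a value; the type parameter guarantees it matches the port.
func write[T any](s *state, p port[T], v T) { s.m[p.Key] = v }

// read retrieves a value, failing on a missing key or a type mismatch,
// mirroring the documented Read behavior.
func read[T any](s *state, p port[T]) (T, error) {
	var zero T
	raw, ok := s.m[p.Key]
	if !ok {
		return zero, fmt.Errorf("key %q missing", p.Key)
	}
	v, ok := raw.(T)
	if !ok {
		return zero, fmt.Errorf("key %q holds %T, not %T", p.Key, raw, zero)
	}
	return v, nil
}

func main() {
	s := &state{m: map[string]any{}}
	count := port[int]{Key: "count"}
	write(s, count, 42)
	v, err := read(s, count)
	fmt.Println(v, err) // 42 <nil>
}
```

The payoff is that two nodes sharing a port cannot disagree on the value's type: `write(s, count, "oops")` fails to compile.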

Types

type ConditionFunc

type ConditionFunc func(state *State) bool

ConditionFunc evaluates whether a node should run based on state.

type Edge

type Edge struct {
	From string
	To   string
}

Edge represents a dependency: To depends on From.

type Engine

type Engine struct {
	// MaxParallel limits concurrent nodes per level (0 = unlimited).
	MaxParallel int
}

Engine executes a graph in dependency order.

func (*Engine) ExecuteBatch

func (e *Engine) ExecuteBatch(ctx context.Context, g *Graph, state *State) (*Result, error)

ExecuteBatch runs ALL nodes in dependency order, one-shot.

func (*Engine) ExecuteStreaming

func (e *Engine) ExecuteStreaming(ctx context.Context, g *Graph, state *State, filter NodeFilter) (*Result, error)

ExecuteStreaming runs only nodes that pass the filter. Nodes that don't pass are marked as "skipped".

type FilePipelineLoader

type FilePipelineLoader struct {
	// contains filtered or unexported fields
}

FilePipelineLoader loads pipelines from YAML files on disk.

func (*FilePipelineLoader) Load

func (l *FilePipelineLoader) Load(name string) (*Pipeline, error)

Load searches the configured directories (recursively) for a pipeline file named {name}.yaml or {name}.yml and returns the first match.

type Graph

type Graph struct {
	Nodes map[string]Node
	Edges []Edge
}

Graph declares nodes and edges (dependency relationships).

func ResolvePipeline

func ResolvePipeline(p *Pipeline, registry *Registry, loader PipelineLoader) (*Graph, error)

ResolvePipeline converts a Pipeline definition into an executable Graph. It resolves includes recursively and looks up node implementations from the registry.

type Node

type Node interface {
	Name() string
	Run(ctx context.Context, state *State) (any, error)
}

Node is the execution unit in a DAG.

func FromProvider

func FromProvider[I, O any](cfg NodeConfig[I, O]) Node

FromProvider bridges a provider.RequestResponse[I,O] into a DAG Node.

func WithLogging

func WithLogging(node Node, log *logger.Logger) Node

WithLogging wraps a Node with execution logging. Logs: node name, duration, and success/error status.

func WithMetrics

func WithMetrics(node Node, metrics *observability.Metrics) Node

WithMetrics wraps a Node with metric recording. Records operation count, duration, and errors.

func WithTracing

func WithTracing(node Node, prefix string) Node

WithTracing wraps a Node with OpenTelemetry span creation. Each execution creates a span named "{prefix}.{nodeName}".

type NodeConfig

type NodeConfig[I, O any] struct {
	// Name is the unique node identifier in the graph.
	Name string
	// Service is the provider to execute.
	Service provider.RequestResponse[I, O]
	// Extract reads inputs from state.
	Extract func(state *State) (I, error)
	// Output is the port where the result is written.
	Output Port[O]
}

NodeConfig configures a provider-backed node.

type NodeDef

type NodeDef struct {
	// Component is the registry lookup key for this node.
	Component string `yaml:"component"`
	// DependsOn lists node names this node depends on.
	DependsOn []string `yaml:"depends_on,omitempty"`
	// Schedule configures schedule-based execution (streaming mode only).
	Schedule *ScheduleConfig `yaml:"schedule,omitempty"`
	// Condition is a named condition function key.
	Condition string `yaml:"condition,omitempty"`
}

NodeDef defines a node within a pipeline.

type NodeFilter

type NodeFilter func(nodeName string, state *State) bool

NodeFilter returns true if a node should execute in this cycle.

type NodeResult

type NodeResult struct {
	Name     string
	Status   string // "completed" | "skipped" | "failed"
	Duration time.Duration
	Output   any
	Error    error
}

NodeResult holds the outcome of a single node execution.

type Pipeline

type Pipeline struct {
	// Name is the pipeline identifier.
	Name string `yaml:"name"`
	// Mode is the execution mode: "batch" or "streaming".
	Mode string `yaml:"mode"`
	// Includes lists sub-pipeline names to compose (recursive).
	Includes []string `yaml:"includes,omitempty"`
	// Nodes defines the pipeline's node specifications.
	Nodes []NodeDef `yaml:"nodes"`
}

Pipeline is a composable, YAML-defined graph definition.

func LoadPipeline

func LoadPipeline(name string, paths ...string) (*Pipeline, error)

LoadPipeline loads a pipeline from explicit file paths. It tries each path until one succeeds.

type PipelineLoader

type PipelineLoader interface {
	Load(name string) (*Pipeline, error)
}

PipelineLoader loads pipeline definitions by name.

func NewFilePipelineLoader

func NewFilePipelineLoader(dirs ...string) PipelineLoader

NewFilePipelineLoader creates a loader that searches the given directories for pipeline YAML files.

type Port

type Port[T any] struct {
	Key string
}

Port is a compile-time typed accessor for State. It prevents type mismatches between nodes at compile time.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry provides named node lookup for dynamic graph construction.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new empty Registry.

func (*Registry) Get

func (r *Registry) Get(name string) (Node, bool)

Get retrieves a node by name.

func (*Registry) List

func (r *Registry) List() []string

List returns sorted names of all registered nodes.

func (*Registry) Register

func (r *Registry) Register(name string, node Node)

Register adds a node to the registry.
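
The documented Register/Get/List behavior (with List returning sorted names) can be sketched with a mutex-guarded map. The real Registry's fields are unexported; string values stand in for dag.Node here:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registry sketches the documented API: Register adds, Get looks up,
// List returns names in sorted order.
type registry struct {
	mu    sync.RWMutex
	nodes map[string]string // stand-in for map[string]dag.Node
}

func (r *registry) Register(name, node string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.nodes == nil {
		r.nodes = map[string]string{}
	}
	r.nodes[name] = node
}

func (r *registry) Get(name string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	n, ok := r.nodes[name]
	return n, ok
}

func (r *registry) List() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.nodes))
	for n := range r.nodes {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	r := &registry{}
	r.Register("transform", "t")
	r.Register("extract", "e")
	fmt.Println(r.List()) // [extract transform]
}
```

ResolvePipeline consults exactly this kind of lookup: each NodeDef's Component field is the Register key.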

type Result

type Result struct {
	NodeResults map[string]NodeResult
	Duration    time.Duration
}

Result holds the outcome of a graph execution.

type ScheduleConfig

type ScheduleConfig struct {
	// Interval is the minimum time between runs.
	Interval time.Duration `yaml:"interval"`
	// MinBuffer is the minimum data accumulation time before first run.
	MinBuffer time.Duration `yaml:"min_buffer"`
}

ScheduleConfig defines schedule-based execution parameters.

type Session

type Session struct {
	// ID is the session identifier.
	ID string
	// State is the shared state across execution cycles.
	State *State
	// contains filtered or unexported fields
}

Session holds per-session state for streaming pipelines.

func NewSession

func NewSession(id string) *Session

NewSession creates a new streaming session.

func (*Session) ReadyFilter

func (s *Session) ReadyFilter(pipeline *Pipeline, conditions map[string]ConditionFunc) NodeFilter

ReadyFilter returns a NodeFilter that checks schedule + conditions. A node is ready if:

  • It has no schedule (always ready), OR its schedule interval has elapsed
    AND its min_buffer period has passed,
  • AND its condition function (if any) returns true.

type State

type State struct {
	// contains filtered or unexported fields
}

State is a thread-safe key-value store for passing data between nodes.

func NewState

func NewState() *State

NewState creates a new empty State.

func (*State) Get

func (s *State) Get(key string) (any, bool)

Get retrieves a value by key. Returns false if the key does not exist.

func (*State) Set

func (s *State) Set(key string, value any)

Set stores a value by key.

type Tool

type Tool[I, O any] struct {
	// contains filtered or unexported fields
}

Tool wraps a DAG pipeline as a provider.RequestResponse.

func AsTool

func AsTool[I, O any](engine *Engine, graph *Graph, cfg ToolConfig[I, O]) *Tool[I, O]

AsTool wraps a DAG pipeline execution as a provider.RequestResponse. Input is written to state via InputFn, output is read via OutputFn.

func (*Tool[I, O]) Execute

func (t *Tool[I, O]) Execute(ctx context.Context, input I) (O, error)

func (*Tool[I, O]) IsAvailable

func (t *Tool[I, O]) IsAvailable(_ context.Context) bool

func (*Tool[I, O]) Name

func (t *Tool[I, O]) Name() string

type ToolConfig

type ToolConfig[I, O any] struct {
	// Name is the provider name.
	Name string
	// InputFn writes input into state before execution.
	InputFn func(input I, state *State)
	// OutputFn reads output from state after execution.
	OutputFn func(state *State) (O, error)
}

ToolConfig configures how a DAG pipeline maps to a provider interface.
