flow

package
v1.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package flow provides discovery, caching, and validation for YAML-defined flows. Filenames map to flow IDs (kebab-case) and output schemas may use $ref which are resolved at load.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrFlowNotFound     = errors.New("flow not found")
	ErrFlowDisabled     = errors.New("flow is disabled")
	ErrInvalidFlowName  = errors.New("invalid flow name")
	ErrDuplicateFlowID  = errors.New("duplicate flow ID")
	ErrInvalidStepID    = errors.New("invalid step ID")
	ErrDuplicateStepID  = errors.New("duplicate step ID")
	ErrInvalidRule      = errors.New("rule references non-existent step")
	ErrInvalidFallback  = errors.New("fallback references non-existent step")
	ErrNoSteps          = errors.New("flow has no steps")
	ErrInvalidYAML      = errors.New("invalid flow YAML")
	ErrInvalidPredicate = errors.New("invalid predicate")
	// TODO(#): Implement cycle detection in step graph validation
	// Use DFS or Tarjan's algorithm to detect circular dependencies
	ErrCycleDetected = errors.New("cycle detected in step graph")
)

Functions

func Invalidate

func Invalidate()

Invalidate clears the cached flows, forcing re-discovery on next access.

Types

type Conflicts added in v1.12.0

type Conflicts struct {
	Conflicts []FlowConflict
}

Conflicts aggregates all duplicate flow ID conflicts.

func GetConflicts added in v1.12.0

func GetConflicts() *Conflicts

GetConflicts returns any duplicate flow ID conflicts discovered during loading.

func (*Conflicts) Error added in v1.12.0

func (c *Conflicts) Error() string

Error returns a formatted error string for the conflicts.

func (*Conflicts) HasConflicts added in v1.12.0

func (c *Conflicts) HasConflicts() bool

HasConflicts returns true if there are any conflicts.

type Fallback

type Fallback struct {
	Retry int    `yaml:"retry"`
	Delay int    `yaml:"delay,omitempty"`
	To    string `yaml:"to,omitempty"`
}

Fallback defines retry and error-routing behavior for a step.

type Flow

type Flow struct {
	ID          string   `json:"id"`
	Name        string   `json:"name"`
	Description string   `json:"description"`
	Disabled    bool     `json:"disabled"`
	Steps       int      `json:"steps"`
	Spec        FlowSpec `json:"spec,omitempty"`
	Location    string   `json:"location"`
	HasConflict bool     `json:"has_conflict"`
	Locations   []string `json:"conflict_locations,omitempty"`
}

Flow represents a discovered flow definition.

func All

func All() []Flow

All returns all discovered flows.

func Get

func Get(id string) (*Flow, error)

Get returns a flow by ID, or ErrFlowNotFound.

type FlowConflict added in v1.12.0

type FlowConflict struct {
	ID        string
	Locations []string
}

FlowConflict holds duplicate flow IDs and their locations.

type FlowSession

type FlowSession struct {
	Prefix string `yaml:"prefix,omitempty"`
}

FlowSession controls session behavior at the flow level.

type FlowSpec

type FlowSpec struct {
	Args    map[string]any `yaml:"args,omitempty"`
	Session FlowSession    `yaml:"session,omitempty"`
	Steps   []Step         `yaml:"steps"`
}

FlowSpec contains the flow's args schema and step definitions.

type FlowState

type FlowState struct {
	SessionID      string
	RootSessionID  string
	FlowID         string
	StepID         string
	Status         FlowStatus
	Args           map[string]any
	Output         string
	IsStructOutput bool
	CreatedAt      int64
	UpdatedAt      int64
}

type FlowStatus

type FlowStatus string
const (
	FlowStatusRunning   FlowStatus = "running"
	FlowStatusCompleted FlowStatus = "completed"
	FlowStatusFailed    FlowStatus = "failed"
	FlowStatusPostponed FlowStatus = "postponed"
)

type Rule

type Rule struct {
	If       string `yaml:"if"`
	Then     string `yaml:"then"`
	Postpone bool   `yaml:"postpone,omitempty"`
}

Rule defines a conditional routing rule evaluated after step completion.

type Service

type Service interface {
	pubsub.Subscriber[FlowState]
	Run(ctx context.Context, sessionPrefix string, flowID string, args map[string]any, fresh bool) (<-chan agentpkg.AgentEvent, <-chan *FlowState, error)
}

func NewService

func NewService(
	sessions session.Service,
	messages message.Service,
	querier db.QuerierWithTx,
	permissions permission.Service,
	agents agentpkg.AgentFactory,
) Service

type Step

type Step struct {
	ID       string      `yaml:"id"`
	Agent    string      `yaml:"agent,omitempty"`
	Session  StepSession `yaml:"session,omitempty"`
	Prompt   string      `yaml:"prompt"`
	Output   *StepOutput `yaml:"output,omitempty"`
	Rules    []Rule      `yaml:"rules,omitempty"`
	Fallback *Fallback   `yaml:"fallback,omitempty"`
}

Step defines a single step in the flow graph.

type StepOutput

type StepOutput struct {
	Schema map[string]any `yaml:"schema"`
}

StepOutput defines optional structured output for a step.

type StepSession

type StepSession struct {
	Fork bool `yaml:"fork,omitempty"`
}

StepSession controls session behavior for a step.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL