workflow

package
v0.0.0-...-8acab51 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package workflow provides n8n-style workflow automation capabilities.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrWorkflowNotFound     = errors.New("workflow not found")
	ErrWorkflowDisabled     = errors.New("workflow is disabled")
	ErrExecutionNotFound    = errors.New("execution not found")
	ErrNodeNotFound         = errors.New("node not found")
	ErrInvalidWorkflow      = errors.New("invalid workflow definition")
	ErrInvalidNode          = errors.New("invalid node configuration")
	ErrInvalidTrigger       = errors.New("invalid trigger configuration")
	ErrCyclicDependency     = errors.New("cyclic dependency detected")
	ErrMaxExecutionsReached = errors.New("maximum concurrent executions reached")
	ErrExecutionTimeout     = errors.New("execution timed out")
	ErrNodeExecutionFailed  = errors.New("node execution failed")
)

Common errors

Functions

func TenantFromContext

func TenantFromContext(ctx context.Context) string

func WithTenantContext

func WithTenantContext(ctx context.Context, tenantID string) context.Context

Types

type ActionConfig

type ActionConfig struct {
	Type       ActionType            `json:"type"`
	HTTP       *HTTPActionConfig     `json:"http,omitempty"`
	LLM        *LLMActionConfig      `json:"llm,omitempty"`
	Skill      *SkillActionConfig    `json:"skill,omitempty"`
	Channel    *ChannelActionConfig  `json:"channel,omitempty"`
	Browser    *BrowserActionConfig  `json:"browser,omitempty"`
	File       *FileActionConfig     `json:"file,omitempty"`
	JavaScript *JavaScriptConfig     `json:"javascript,omitempty"`
	Variable   *VariableActionConfig `json:"variable,omitempty"`
	Email      *EmailActionConfig    `json:"email,omitempty"`
	Notify     *NotifyActionConfig   `json:"notify,omitempty"`
}

ActionConfig contains action-specific configuration.

type ActionHandler

type ActionHandler func(ctx context.Context, config map[string]interface{}, input map[string]interface{}) (map[string]interface{}, error)

ActionHandler handles execution of a specific action type.

type ActionType

type ActionType string

ActionType represents the type of action node.

const (
	ActionTypeHTTP        ActionType = "http"
	ActionTypeLLM         ActionType = "llm"
	ActionTypeSkill       ActionType = "skill"
	ActionTypeChannelSend ActionType = "channel_send"
	ActionTypeBrowser     ActionType = "browser"
	ActionTypeFileOps     ActionType = "file_ops"
	ActionTypeJavaScript  ActionType = "javascript"
	ActionTypeSetVariable ActionType = "set_variable"
	ActionTypeEmail       ActionType = "email"
	ActionTypeNotify      ActionType = "notify"
)

type BrowserActionConfig

type BrowserActionConfig struct {
	Action   string                 `json:"action"` // navigate, click, type, screenshot, etc.
	URL      string                 `json:"url,omitempty"`
	Selector string                 `json:"selector,omitempty"`
	Value    string                 `json:"value,omitempty"`
	Options  map[string]interface{} `json:"options,omitempty"`
}

BrowserActionConfig contains browser action configuration.

type ChannelActionConfig

type ChannelActionConfig struct {
	ChannelType string `json:"channel_type"`
	ChannelID   string `json:"channel_id"`
	Message     string `json:"message"`
	Format      string `json:"format,omitempty"` // text, markdown, html
}

ChannelActionConfig contains channel send action configuration.

type ChannelTriggerConfig

type ChannelTriggerConfig struct {
	ChannelType string   `json:"channel_type"` // telegram, discord, etc.
	ChannelID   string   `json:"channel_id,omitempty"`
	Keywords    []string `json:"keywords,omitempty"`   // Trigger on keywords
	Regex       string   `json:"regex,omitempty"`      // Trigger on regex match
	FromUsers   []string `json:"from_users,omitempty"` // Filter by user
}

ChannelTriggerConfig contains channel message trigger configuration.

type ConditionConfig

type ConditionConfig struct {
	Expression string `json:"expression"` // JavaScript expression
	TruePort   string `json:"true_port,omitempty"`
	FalsePort  string `json:"false_port,omitempty"`
}

ConditionConfig contains condition node configuration.

type Config

type Config struct {
	MaxConcurrentExecutions int    `json:"max_concurrent_executions" yaml:"max_concurrent_executions"`
	DefaultTimeout          int    `json:"default_timeout" yaml:"default_timeout"` // seconds
	MaxTimeout              int    `json:"max_timeout" yaml:"max_timeout"`         // seconds
	RetryDelay              int    `json:"retry_delay" yaml:"retry_delay"`         // seconds
	MaxRetries              int    `json:"max_retries" yaml:"max_retries"`
	SaveExecutionData       bool   `json:"save_execution_data" yaml:"save_execution_data"`
	ExecutionDataRetention  int    `json:"execution_data_retention" yaml:"execution_data_retention"` // days
	WebhookBasePath         string `json:"webhook_base_path" yaml:"webhook_base_path"`
	Enabled                 bool   `json:"enabled" yaml:"enabled"`
}

Config contains workflow engine configuration.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns the default workflow configuration.

type Connection

type Connection struct {
	ID         string `json:"id"`
	SourceNode string `json:"source_node"`
	SourcePort string `json:"source_port,omitempty"`
	TargetNode string `json:"target_node"`
	TargetPort string `json:"target_port,omitempty"`
	Condition  string `json:"condition,omitempty"` // For conditional connections
}

Connection represents a connection between nodes.

type CreateWorkflowRequest

type CreateWorkflowRequest struct {
	Name        string            `json:"name" validate:"required"`
	Description string            `json:"description,omitempty"`
	Nodes       []Node            `json:"nodes,omitempty"`
	Connections []Connection      `json:"connections,omitempty"`
	Variables   map[string]string `json:"variables,omitempty"`
	Settings    *WorkflowSettings `json:"settings,omitempty"`
	Tags        []string          `json:"tags,omitempty"`
}

CreateWorkflowRequest represents a create workflow request.

type DelayConfig

type DelayConfig struct {
	Duration int    `json:"duration"`        // Delay in seconds
	Until    string `json:"until,omitempty"` // Wait until specific time
}

DelayConfig contains delay node configuration.

type EmailActionConfig

type EmailActionConfig struct {
	To          []string `json:"to"`
	Cc          []string `json:"cc,omitempty"`
	Bcc         []string `json:"bcc,omitempty"`
	Subject     string   `json:"subject"`
	Body        string   `json:"body"`
	ContentType string   `json:"content_type,omitempty"` // text, html
	Attachments []string `json:"attachments,omitempty"`
}

EmailActionConfig contains email action configuration.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine executes workflows.

func NewEngine

func NewEngine(config *Config) *Engine

NewEngine creates a new workflow engine.

func (*Engine) CancelExecution

func (e *Engine) CancelExecution(id string) error

CancelExecution cancels a running execution.

func (*Engine) Close

func (e *Engine) Close() error

Close shuts down the engine.

func (*Engine) Execute

func (e *Engine) Execute(ctx context.Context, workflow *Workflow, triggerType TriggerType, triggerData map[string]interface{}) (*Execution, error)

Execute starts a workflow execution.

func (*Engine) GetExecution

func (e *Engine) GetExecution(id string) (*Execution, error)

GetExecution returns an execution by ID.

func (*Engine) ListExecutions

func (e *Engine) ListExecutions() []*Execution

ListExecutions returns all executions.

func (*Engine) RegisterHandler

func (e *Engine) RegisterHandler(actionType ActionType, handler ActionHandler)

RegisterHandler registers a custom action handler.

func (*Engine) ResumeExecution

func (e *Engine) ResumeExecution(id string, resume ExecutionResumeInput) (*Execution, error)

ResumeExecution resumes a paused execution with a decision payload.

func (*Engine) SetCheckpointEnabledFunc

func (e *Engine) SetCheckpointEnabledFunc(enabled func() bool)

SetCheckpointEnabledFunc wires a runtime gate for checkpoint/pause semantics.

func (*Engine) SetExecutionObserver

func (e *Engine) SetExecutionObserver(observer func(*Execution))

SetExecutionObserver wires a callback for execution state transitions.

func (*Engine) SetToolGateway

func (e *Engine) SetToolGateway(gateway ToolRuntime)

SetToolGateway wires the shared tool runtime into workflow action handlers.

func (*Engine) ValidateWorkflow

func (e *Engine) ValidateWorkflow(workflow *Workflow) error

ValidateWorkflow validates a workflow definition.

type ExecuteWorkflowRequest

type ExecuteWorkflowRequest struct {
	TriggerData map[string]interface{} `json:"trigger_data,omitempty"`
}

ExecuteWorkflowRequest represents an execute workflow request.

type Execution

type Execution struct {
	ID           string                 `json:"id"`
	WorkflowID   string                 `json:"workflow_id"`
	WorkflowName string                 `json:"workflow_name"`
	TenantID     string                 `json:"tenant_id"`
	Status       ExecutionStatus        `json:"status"`
	StatusReason string                 `json:"status_reason,omitempty"`
	TriggerType  TriggerType            `json:"trigger_type"`
	TriggerData  map[string]interface{} `json:"trigger_data,omitempty"`
	Variables    map[string]interface{} `json:"variables,omitempty"`
	NodeResults  map[string]*NodeResult `json:"node_results,omitempty"`
	Checkpoint   *ExecutionCheckpoint   `json:"checkpoint,omitempty"`
	Error        string                 `json:"error,omitempty"`
	StartedAt    time.Time              `json:"started_at"`
	CompletedAt  *time.Time             `json:"completed_at,omitempty"`
	Duration     int64                  `json:"duration,omitempty"` // Duration in milliseconds
}

Execution represents a workflow execution instance.

type ExecutionCancellationController

type ExecutionCancellationController interface {
	CancelExecution(ctx context.Context, executionID string, reason string) (handled bool, err error)
}

ExecutionCancellationController optionally lets callers route execution cancellation through the owning runtime/control plane.

type ExecutionCheckpoint

type ExecutionCheckpoint struct {
	ID        string                  `json:"id"`
	Kind      ExecutionCheckpointKind `json:"kind"`
	NodeID    string                  `json:"node_id,omitempty"`
	NodeName  string                  `json:"node_name,omitempty"`
	Reason    string                  `json:"reason,omitempty"`
	Payload   map[string]interface{}  `json:"payload,omitempty"`
	CreatedAt time.Time               `json:"created_at"`
	ResumedAt *time.Time              `json:"resumed_at,omitempty"`
	Resume    *ExecutionResumeInput   `json:"resume,omitempty"`
}

ExecutionCheckpoint captures pause/resume state for an execution.

type ExecutionCheckpointKind

type ExecutionCheckpointKind string

ExecutionCheckpointKind identifies a deterministic pause/resume checkpoint.

const (
	ExecutionCheckpointPauseForApproval   ExecutionCheckpointKind = "pause_for_approval"
	ExecutionCheckpointResumeWithDecision ExecutionCheckpointKind = "resume_with_decision"
	ExecutionCheckpointAwaitToolResult    ExecutionCheckpointKind = "await_tool_result"
	ExecutionCheckpointJSONTask           ExecutionCheckpointKind = "json_task"
)

type ExecutionLaunchRequest

type ExecutionLaunchRequest struct {
	WorkflowID     string                 `json:"workflow_id"`
	TriggerType    TriggerType            `json:"trigger_type,omitempty"`
	TriggerData    map[string]interface{} `json:"trigger_data,omitempty"`
	UserID         string                 `json:"user_id,omitempty"`
	ConversationID string                 `json:"conversation_id,omitempty"`
	TenantID       string                 `json:"tenant_id,omitempty"`
}

ExecutionLaunchRequest describes a caller-originated workflow run request.

type ExecutionLauncher

type ExecutionLauncher interface {
	LaunchExecution(ctx context.Context, req ExecutionLaunchRequest) (*Execution, error)
}

ExecutionLauncher allows callers to route workflow launches through an alternate runtime while preserving workflow.Execution responses.

type ExecutionLog

type ExecutionLog struct {
	ID          string    `json:"id"`
	ExecutionID string    `json:"execution_id"`
	NodeID      string    `json:"node_id,omitempty"`
	Level       string    `json:"level"` // debug, info, warn, error
	Message     string    `json:"message"`
	Data        string    `json:"data,omitempty"`
	Timestamp   time.Time `json:"timestamp"`
}

ExecutionLog represents a log entry for an execution.

type ExecutionResumeController

type ExecutionResumeController interface {
	ResumeExecution(ctx context.Context, executionID string, resume ExecutionResumeInput) (handled bool, execution *Execution, err error)
}

ExecutionResumeController optionally lets callers route checkpoint resume through the owning runtime/control plane.

type ExecutionResumeInput

type ExecutionResumeInput struct {
	Decision string                 `json:"decision,omitempty"`
	Payload  map[string]interface{} `json:"payload,omitempty"`
}

ExecutionResumeInput represents a resume decision/payload supplied by callers.

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the status of a workflow execution.

const (
	ExecutionStatusPending   ExecutionStatus = "pending"
	ExecutionStatusRunning   ExecutionStatus = "running"
	ExecutionStatusCompleted ExecutionStatus = "completed"
	ExecutionStatusFailed    ExecutionStatus = "failed"
	ExecutionStatusCancelled ExecutionStatus = "cancelled"
	ExecutionStatusPaused    ExecutionStatus = "paused"
)

type FileActionConfig

type FileActionConfig struct {
	Operation  string `json:"operation"` // read, write, copy, move, delete
	SourcePath string `json:"source_path"`
	DestPath   string `json:"dest_path,omitempty"`
	Content    string `json:"content,omitempty"`
	Encoding   string `json:"encoding,omitempty"`
	CreateDirs bool   `json:"create_dirs,omitempty"`
}

FileActionConfig contains file operation configuration.

type FileChangeConfig

type FileChangeConfig struct {
	Path      string   `json:"path"`
	Patterns  []string `json:"patterns,omitempty"`  // File patterns to watch
	Events    []string `json:"events,omitempty"`    // create, modify, delete
	Recursive bool     `json:"recursive,omitempty"` // Watch subdirectories
}

FileChangeConfig contains file change trigger configuration.

type HTTPActionConfig

type HTTPActionConfig struct {
	URL             string            `json:"url"`
	Method          string            `json:"method"`
	Headers         map[string]string `json:"headers,omitempty"`
	Body            string            `json:"body,omitempty"`
	ContentType     string            `json:"content_type,omitempty"`
	Timeout         int               `json:"timeout,omitempty"`
	FollowRedirects bool              `json:"follow_redirects,omitempty"`
	ValidateSSL     bool              `json:"validate_ssl,omitempty"`
}

HTTPActionConfig contains HTTP action configuration.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler handles HTTP requests for workflows.

func NewHandler

func NewHandler(service *WorkflowService) *Handler

NewHandler creates a new workflow handler.

func NewLazyHandler

func NewLazyHandler(initFn func() *WorkflowService) *Handler

NewLazyHandler creates a handler that defers WorkflowService creation to the first API call.

func (*Handler) CancelExecution

func (h *Handler) CancelExecution(c echo.Context) error

CancelExecution cancels a running execution.

func (*Handler) CreateWorkflow

func (h *Handler) CreateWorkflow(c echo.Context) error

CreateWorkflow creates a new workflow.

func (*Handler) DeleteWorkflow

func (h *Handler) DeleteWorkflow(c echo.Context) error

DeleteWorkflow deletes a workflow.

func (*Handler) DisableWorkflow

func (h *Handler) DisableWorkflow(c echo.Context) error

DisableWorkflow disables a workflow.

func (*Handler) EnableWorkflow

func (h *Handler) EnableWorkflow(c echo.Context) error

EnableWorkflow enables a workflow.

func (*Handler) ExecuteWorkflow

func (h *Handler) ExecuteWorkflow(c echo.Context) error

ExecuteWorkflow manually executes a workflow.

func (*Handler) ExportWorkflow

func (h *Handler) ExportWorkflow(c echo.Context) error

ExportWorkflow exports a workflow as JSON.

func (*Handler) GetExecution

func (h *Handler) GetExecution(c echo.Context) error

GetExecution retrieves an execution by ID.

func (*Handler) GetExecutionLogs

func (h *Handler) GetExecutionLogs(c echo.Context) error

GetExecutionLogs retrieves logs for an execution.

func (*Handler) GetService

func (h *Handler) GetService() *WorkflowService

GetService returns the workflow service, triggering lazy init if needed.

func (*Handler) GetStats

func (h *Handler) GetStats(c echo.Context) error

GetStats returns workflow statistics.

func (*Handler) GetTemplates

func (h *Handler) GetTemplates(c echo.Context) error

GetTemplates returns available workflow templates.

func (*Handler) GetWorkflow

func (h *Handler) GetWorkflow(c echo.Context) error

GetWorkflow retrieves a workflow by ID.

func (*Handler) HandleWebhook

func (h *Handler) HandleWebhook(c echo.Context) error

HandleWebhook handles incoming webhook requests.

func (*Handler) ImportWorkflow

func (h *Handler) ImportWorkflow(c echo.Context) error

ImportWorkflow imports a workflow from JSON.

func (*Handler) ListExecutions

func (h *Handler) ListExecutions(c echo.Context) error

ListExecutions lists executions for a workflow.

func (*Handler) ListWorkflows

func (h *Handler) ListWorkflows(c echo.Context) error

ListWorkflows lists workflows with pagination.

func (*Handler) RegisterRoutes

func (h *Handler) RegisterRoutes(e *echo.Echo)

RegisterRoutes registers the workflow routes.

func (*Handler) ResumeExecution

func (h *Handler) ResumeExecution(c echo.Context) error

ResumeExecution resumes a paused execution from its current checkpoint.

func (*Handler) RetryExecution

func (h *Handler) RetryExecution(c echo.Context) error

RetryExecution retries a failed execution.

func (*Handler) SetExecutionLauncher

func (h *Handler) SetExecutionLauncher(launcher ExecutionLauncher)

SetExecutionLauncher overrides manual workflow execute requests with a harness-aware launcher while leaving the service-based read APIs intact.

func (*Handler) SetIdleReclaim

func (h *Handler) SetIdleReclaim(idleAfter time.Duration)

SetIdleReclaim configures idle reclaim for the lazily initialized workflow service.

func (*Handler) SetRouteMiddlewares

func (h *Handler) SetRouteMiddlewares(middlewares ...echo.MiddlewareFunc)

SetRouteMiddlewares applies security middleware to workflow management routes. Webhook routes stay public and are intentionally excluded.

func (*Handler) SetServiceInitHook

func (h *Handler) SetServiceInitHook(fn func(*WorkflowService))

SetServiceInitHook configures a callback that runs once the lazy service is created.

func (*Handler) UpdateWorkflow

func (h *Handler) UpdateWorkflow(c echo.Context) error

UpdateWorkflow updates an existing workflow.

func (*Handler) ValidateWorkflow

func (h *Handler) ValidateWorkflow(c echo.Context) error

ValidateWorkflow validates a workflow definition.

type JavaScriptConfig

type JavaScriptConfig struct {
	Code    string `json:"code"`
	Timeout int    `json:"timeout,omitempty"`
}

JavaScriptConfig contains JavaScript execution configuration.

type LLMActionConfig

type LLMActionConfig struct {
	Provider     string   `json:"provider"` // openai, claude, ollama
	Model        string   `json:"model"`
	Prompt       string   `json:"prompt"`
	SystemPrompt string   `json:"system_prompt,omitempty"`
	Temperature  float64  `json:"temperature,omitempty"`
	MaxTokens    int      `json:"max_tokens,omitempty"`
	Tools        []string `json:"tools,omitempty"` // Tool names to enable
}

LLMActionConfig contains LLM action configuration.

type ListOptions

type ListOptions struct {
	Offset  int               `json:"offset"`
	Limit   int               `json:"limit"`
	Sort    string            `json:"sort,omitempty"`
	Order   string            `json:"order,omitempty"` // asc, desc
	Filters map[string]string `json:"filters,omitempty"`
}

ListOptions contains options for listing resources.

type LoopConfig

type LoopConfig struct {
	Type          string `json:"type"`                     // for_each, while, count
	Items         string `json:"items,omitempty"`          // Expression for items to iterate
	Condition     string `json:"condition,omitempty"`      // While condition
	Count         int    `json:"count,omitempty"`          // Fixed count
	MaxIterations int    `json:"max_iterations,omitempty"` // Safety limit
}

LoopConfig contains loop node configuration.

type Node

type Node struct {
	ID          string                 `json:"id"`
	Type        NodeType               `json:"type"`
	Name        string                 `json:"name"`
	Description string                 `json:"description,omitempty"`
	Config      map[string]interface{} `json:"config,omitempty"`
	Position    *Position              `json:"position,omitempty"`
	Disabled    bool                   `json:"disabled,omitempty"`
}

Node represents a workflow node.

type NodeResult

type NodeResult struct {
	NodeID      string                 `json:"node_id"`
	NodeName    string                 `json:"node_name"`
	Status      NodeStatus             `json:"status"`
	Input       map[string]interface{} `json:"input,omitempty"`
	Output      map[string]interface{} `json:"output,omitempty"`
	Error       string                 `json:"error,omitempty"`
	StartedAt   time.Time              `json:"started_at"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
	Duration    int64                  `json:"duration,omitempty"` // Duration in milliseconds
	RetryCount  int                    `json:"retry_count,omitempty"`
}

NodeResult represents the result of a node execution.

type NodeStatus

type NodeStatus string

NodeStatus represents the status of a node execution.

const (
	NodeStatusPending   NodeStatus = "pending"
	NodeStatusRunning   NodeStatus = "running"
	NodeStatusCompleted NodeStatus = "completed"
	NodeStatusFailed    NodeStatus = "failed"
	NodeStatusSkipped   NodeStatus = "skipped"
)

type NodeType

type NodeType string

NodeType represents the type of workflow node.

const (
	NodeTypeTrigger   NodeType = "trigger"
	NodeTypeAction    NodeType = "action"
	NodeTypeCondition NodeType = "condition"
	NodeTypeLoop      NodeType = "loop"
	NodeTypeSwitch    NodeType = "switch"
	NodeTypeMerge     NodeType = "merge"
	NodeTypeDelay     NodeType = "delay"
	NodeTypeSubflow   NodeType = "subflow"
)

type NotifyActionConfig

type NotifyActionConfig struct {
	Title    string `json:"title,omitempty"`
	Message  string `json:"message"`
	Priority string `json:"priority,omitempty"` // low, normal, high
	Channel  string `json:"channel,omitempty"`  // Notification channel
}

NotifyActionConfig contains notification action configuration.

type Position

type Position struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}

Position represents the visual position of a node.

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository handles workflow persistence.

func NewRepository

func NewRepository(db *sql.DB) (*Repository, error)

NewRepository creates a new workflow repository.

func NewRepositoryWithReadDB

func NewRepositoryWithReadDB(writeDB, readDB *sql.DB) (*Repository, error)

NewRepositoryWithReadDB creates a new workflow repository with separate write and read database handles.

func (*Repository) CleanupOldExecutions

func (r *Repository) CleanupOldExecutions(ctx context.Context, retentionDays int) (int64, error)

func (*Repository) CreateWorkflow

func (r *Repository) CreateWorkflow(ctx context.Context, workflow *Workflow) error

func (*Repository) DeleteWebhook

func (r *Repository) DeleteWebhook(ctx context.Context, workflowID string) error

func (*Repository) DeleteWorkflow

func (r *Repository) DeleteWorkflow(ctx context.Context, id string) error

func (*Repository) GetExecution

func (r *Repository) GetExecution(ctx context.Context, id string) (*Execution, error)

func (*Repository) GetExecutionLogs

func (r *Repository) GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)

func (*Repository) GetStats

func (r *Repository) GetStats(ctx context.Context, tenantID string) (*Stats, error)

func (*Repository) GetWebhookByPath

func (r *Repository) GetWebhookByPath(ctx context.Context, path string) (string, error)

func (*Repository) GetWorkflow

func (r *Repository) GetWorkflow(ctx context.Context, id string) (*Workflow, error)

func (*Repository) ListExecutions

func (r *Repository) ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)

func (*Repository) ListWorkflows

func (r *Repository) ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)

func (*Repository) SaveExecution

func (r *Repository) SaveExecution(ctx context.Context, execution *Execution) error

func (*Repository) SaveExecutionLog

func (r *Repository) SaveExecutionLog(ctx context.Context, log *ExecutionLog) error

func (*Repository) SaveWebhook

func (r *Repository) SaveWebhook(ctx context.Context, workflowID, path, method, authType string, authConfig map[string]string) error

func (*Repository) UpdateWorkflow

func (r *Repository) UpdateWorkflow(ctx context.Context, workflow *Workflow) error

type ResumeExecutionRequest

type ResumeExecutionRequest struct {
	Decision string                 `json:"decision,omitempty"`
	Payload  map[string]interface{} `json:"payload,omitempty"`
}

ResumeExecutionRequest represents a resume execution request.

type ScheduleConfig

type ScheduleConfig struct {
	Cron     string `json:"cron,omitempty"`     // Cron expression
	Interval int    `json:"interval,omitempty"` // Interval in seconds
	Timezone string `json:"timezone,omitempty"` // Timezone for cron
}

ScheduleConfig contains schedule trigger configuration.

type Service

type Service interface {
	// Workflow CRUD
	CreateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
	GetWorkflow(ctx context.Context, id string) (*Workflow, error)
	UpdateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
	DeleteWorkflow(ctx context.Context, id string) error
	ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)

	// Workflow control
	EnableWorkflow(ctx context.Context, id string) error
	DisableWorkflow(ctx context.Context, id string) error
	ValidateWorkflow(ctx context.Context, workflow *Workflow) error

	// Execution
	ExecuteWorkflow(ctx context.Context, id string, triggerData map[string]interface{}) (*Execution, error)
	ExecuteWorkflowWithTrigger(ctx context.Context, id string, triggerType TriggerType, triggerData map[string]interface{}) (*Execution, error)
	GetExecution(ctx context.Context, id string) (*Execution, error)
	ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)
	CancelExecution(ctx context.Context, id string) error
	RetryExecution(ctx context.Context, id string) (*Execution, error)
	ResumeExecution(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)

	// Logs
	GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)

	// Triggers
	RegisterTrigger(ctx context.Context, workflowID string, trigger *TriggerConfig) error
	UnregisterTrigger(ctx context.Context, workflowID string) error

	// Webhook
	HandleWebhook(ctx context.Context, path string, method string, headers map[string]string, body []byte) (*Execution, error)

	// Stats
	GetStats(ctx context.Context, tenantID string) (*Stats, error)
}

Service defines the workflow service interface.

type SkillActionConfig

type SkillActionConfig struct {
	SkillID    string                 `json:"skill_id"`
	Parameters map[string]interface{} `json:"parameters,omitempty"`
}

SkillActionConfig contains skill action configuration.

type Stats

type Stats struct {
	TotalWorkflows       int `json:"total_workflows"`
	ActiveWorkflows      int `json:"active_workflows"`
	TotalExecutions      int `json:"total_executions"`
	RunningExecutions    int `json:"running_executions"`
	SuccessfulExecutions int `json:"successful_executions"`
	FailedExecutions     int `json:"failed_executions"`
}

Stats contains workflow statistics.

type SwitchConfig

type SwitchConfig struct {
	Expression string            `json:"expression"`        // Expression to evaluate
	Cases      map[string]string `json:"cases"`             // Value -> port mapping
	Default    string            `json:"default,omitempty"` // Default port
}

SwitchConfig contains switch node configuration.

type SystemTriggerConfig

type SystemTriggerConfig struct {
	EventType string `json:"event_type"` // startup, shutdown, etc.
}

SystemTriggerConfig contains system event trigger configuration.

type ToolExecutionRequest

type ToolExecutionRequest struct {
	ToolName   string
	Arguments  map[string]interface{}
	SessionID  string
	UserID     string
	Provider   string
	ProviderID string
	Model      string
	AgentID    string
}

ToolExecutionRequest is the workflow-local representation of a tool call.

type ToolExecutionResult

type ToolExecutionResult struct {
	ExecutionResult interface{}
	CompactPayload  interface{}
	AuditPayload    interface{}
}

ToolExecutionResult is the workflow-local result envelope returned by ToolRuntime.

type ToolRuntime

type ToolRuntime interface {
	Execute(ctx context.Context, req ToolExecutionRequest) (*ToolExecutionResult, error)
}

ToolRuntime is the workflow-side abstraction for shared tool execution.

type TriggerConfig

type TriggerConfig struct {
	Type     TriggerType           `json:"type"`
	Schedule *ScheduleConfig       `json:"schedule,omitempty"`
	Webhook  *WebhookConfig        `json:"webhook,omitempty"`
	File     *FileChangeConfig     `json:"file,omitempty"`
	Channel  *ChannelTriggerConfig `json:"channel,omitempty"`
	System   *SystemTriggerConfig  `json:"system,omitempty"`
}

TriggerConfig contains trigger-specific configuration.

type TriggerType

type TriggerType string

TriggerType represents the type of workflow trigger.

const (
	TriggerTypeManual      TriggerType = "manual"
	TriggerTypeSchedule    TriggerType = "schedule"
	TriggerTypeWebhook     TriggerType = "webhook"
	TriggerTypeFileChange  TriggerType = "file_change"
	TriggerTypeChannelMsg  TriggerType = "channel_message"
	TriggerTypeSystemEvent TriggerType = "system_event"
)

type UpdateWorkflowRequest

type UpdateWorkflowRequest struct {
	Name        string            `json:"name,omitempty"`
	Description string            `json:"description,omitempty"`
	Status      WorkflowStatus    `json:"status,omitempty"`
	Nodes       []Node            `json:"nodes,omitempty"`
	Connections []Connection      `json:"connections,omitempty"`
	Variables   map[string]string `json:"variables,omitempty"`
	Settings    *WorkflowSettings `json:"settings,omitempty"`
	Tags        []string          `json:"tags,omitempty"`
}

UpdateWorkflowRequest represents an update workflow request.

type VariableActionConfig

type VariableActionConfig struct {
	Name  string `json:"name"`
	Value string `json:"value"`
	Scope string `json:"scope,omitempty"` // execution, workflow
}

VariableActionConfig contains variable set action configuration.

type WebhookConfig

type WebhookConfig struct {
	Path       string            `json:"path"`
	Method     string            `json:"method,omitempty"` // GET, POST, etc.
	Headers    map[string]string `json:"headers,omitempty"`
	AuthType   string            `json:"auth_type,omitempty"` // none, basic, bearer, api_key
	AuthConfig map[string]string `json:"auth_config,omitempty"`
}

WebhookConfig contains webhook trigger configuration.

type Workflow

type Workflow struct {
	ID          string            `json:"id"`
	TenantID    string            `json:"tenant_id"`
	Name        string            `json:"name"`
	Description string            `json:"description,omitempty"`
	Status      WorkflowStatus    `json:"status"`
	Nodes       []Node            `json:"nodes"`
	Connections []Connection      `json:"connections"`
	Variables   map[string]string `json:"variables,omitempty"`
	Settings    *WorkflowSettings `json:"settings,omitempty"`
	Tags        []string          `json:"tags,omitempty"`
	Version     int               `json:"version"`
	CreatedAt   time.Time         `json:"created_at"`
	UpdatedAt   time.Time         `json:"updated_at"`
	CreatedBy   string            `json:"created_by,omitempty"`
	UpdatedBy   string            `json:"updated_by,omitempty"`
}

Workflow represents a workflow definition.

type WorkflowService

type WorkflowService struct {
	// contains filtered or unexported fields
}

WorkflowService implements the Service interface.

func NewService

func NewService(config *Config, repo *Repository) (*WorkflowService, error)

NewService creates a new workflow service.

func (*WorkflowService) CancelExecution

func (s *WorkflowService) CancelExecution(ctx context.Context, id string) error

CancelExecution cancels a running execution.

func (*WorkflowService) CancelExecutionDirect

func (s *WorkflowService) CancelExecutionDirect(ctx context.Context, id string) error

CancelExecutionDirect cancels an execution directly in the workflow runtime. Drivers should use this to avoid routing back through harness control paths.

func (*WorkflowService) CleanupOldExecutions

func (s *WorkflowService) CleanupOldExecutions(ctx context.Context) (int64, error)

CleanupOldExecutions removes old execution data.

func (*WorkflowService) Close

func (s *WorkflowService) Close() error

Close shuts down the service.

func (*WorkflowService) CreateWorkflow

func (s *WorkflowService) CreateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)

CreateWorkflow creates a new workflow.

func (*WorkflowService) DeleteWorkflow

func (s *WorkflowService) DeleteWorkflow(ctx context.Context, id string) error

DeleteWorkflow deletes a workflow.

func (*WorkflowService) DisableWorkflow

func (s *WorkflowService) DisableWorkflow(ctx context.Context, id string) error

DisableWorkflow disables a workflow.

func (*WorkflowService) EnableWorkflow

func (s *WorkflowService) EnableWorkflow(ctx context.Context, id string) error

EnableWorkflow enables a workflow.

func (*WorkflowService) ExecuteWorkflow

func (s *WorkflowService) ExecuteWorkflow(ctx context.Context, id string, triggerData map[string]interface{}) (*Execution, error)

ExecuteWorkflow manually executes a workflow.

func (*WorkflowService) ExecuteWorkflowWithTrigger

func (s *WorkflowService) ExecuteWorkflowWithTrigger(ctx context.Context, id string, triggerType TriggerType, triggerData map[string]interface{}) (*Execution, error)

ExecuteWorkflowWithTrigger executes a workflow directly with an explicit trigger type.

func (*WorkflowService) GetExecution

func (s *WorkflowService) GetExecution(ctx context.Context, id string) (*Execution, error)

GetExecution retrieves an execution by ID.

func (*WorkflowService) GetExecutionLogs

func (s *WorkflowService) GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)

GetExecutionLogs retrieves logs for an execution.

func (*WorkflowService) GetStats

func (s *WorkflowService) GetStats(ctx context.Context, tenantID string) (*Stats, error)

GetStats returns workflow statistics.

func (*WorkflowService) GetWorkflow

func (s *WorkflowService) GetWorkflow(ctx context.Context, id string) (*Workflow, error)

GetWorkflow retrieves a workflow by ID.

func (*WorkflowService) HandleWebhook

func (s *WorkflowService) HandleWebhook(ctx context.Context, path string, method string, headers map[string]string, body []byte) (*Execution, error)

HandleWebhook handles an incoming webhook request.

func (*WorkflowService) ListExecutions

func (s *WorkflowService) ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)

ListExecutions lists executions for a workflow.

func (*WorkflowService) ListWorkflows

func (s *WorkflowService) ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)

ListWorkflows lists workflows with pagination.

func (*WorkflowService) RegisterActionHandler

func (s *WorkflowService) RegisterActionHandler(actionType ActionType, handler ActionHandler)

RegisterActionHandler registers a custom action handler.

func (*WorkflowService) RegisterTrigger

func (s *WorkflowService) RegisterTrigger(ctx context.Context, workflowID string, trigger *TriggerConfig) error

RegisterTrigger registers a trigger for a workflow.

func (*WorkflowService) ResumeExecution

func (s *WorkflowService) ResumeExecution(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)

ResumeExecution resumes a paused workflow execution.

func (*WorkflowService) ResumeExecutionDirect

func (s *WorkflowService) ResumeExecutionDirect(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)

ResumeExecutionDirect resumes a paused execution directly in the workflow runtime. Drivers and control-plane bridges should use this to avoid routing back through harness launch paths.

func (*WorkflowService) RetryExecution

func (s *WorkflowService) RetryExecution(ctx context.Context, id string) (*Execution, error)

RetryExecution retries a failed execution.

func (*WorkflowService) SetExecutionLauncher

func (s *WorkflowService) SetExecutionLauncher(launcher ExecutionLauncher)

SetExecutionLauncher wires an optional submission/control runtime for externally initiated workflow executions.

func (*WorkflowService) SetFlagEvaluator

func (s *WorkflowService) SetFlagEvaluator(evaluator workflowFlagEvaluator)

SetFlagEvaluator wires feature-flag evaluation into workflow runtime behavior.

func (*WorkflowService) SetMetricsRecorder

func (s *WorkflowService) SetMetricsRecorder(recorder workflowMetricsRecorder)

SetMetricsRecorder wires a lightweight runtime counter recorder.

func (*WorkflowService) SetToolGateway

func (s *WorkflowService) SetToolGateway(gateway ToolRuntime)

SetToolGateway wires the shared tool runtime into the workflow engine.

func (*WorkflowService) StartCleanupJob

func (s *WorkflowService) StartCleanupJob()

StartCleanupJob starts a background job to clean up old executions.

func (*WorkflowService) UnregisterTrigger

func (s *WorkflowService) UnregisterTrigger(ctx context.Context, workflowID string) error

UnregisterTrigger unregisters a trigger for a workflow.

func (*WorkflowService) UpdateWorkflow

func (s *WorkflowService) UpdateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)

UpdateWorkflow updates an existing workflow.

func (*WorkflowService) ValidateWorkflow

func (s *WorkflowService) ValidateWorkflow(ctx context.Context, workflow *Workflow) error

ValidateWorkflow validates a workflow definition.

type WorkflowSettings

type WorkflowSettings struct {
	Timeout            int  `json:"timeout,omitempty"`              // Execution timeout in seconds
	MaxRetries         int  `json:"max_retries,omitempty"`          // Max retries on failure
	RetryDelay         int  `json:"retry_delay,omitempty"`          // Delay between retries in seconds
	ContinueOnError    bool `json:"continue_on_error,omitempty"`    // Continue execution on node error
	SaveExecutionData  bool `json:"save_execution_data,omitempty"`  // Save execution data for debugging
	MaxConcurrent      int  `json:"max_concurrent,omitempty"`       // Max concurrent executions
	NotifyOnCompletion bool `json:"notify_on_completion,omitempty"` // Send notification on completion
	NotifyOnError      bool `json:"notify_on_error,omitempty"`      // Send notification on error
}

WorkflowSettings contains workflow-level settings.

type WorkflowStatus

type WorkflowStatus string

WorkflowStatus represents the status of a workflow.

const (
	WorkflowStatusActive   WorkflowStatus = "active"
	WorkflowStatusInactive WorkflowStatus = "inactive"
	WorkflowStatusDraft    WorkflowStatus = "draft"
)

type WorkflowTemplateResponse

type WorkflowTemplateResponse struct {
	ID          string   `json:"id"`
	Name        string   `json:"name"`
	Description string   `json:"description"`
	Category    string   `json:"category"`
	Tags        []string `json:"tags"`
	Workflow    Workflow `json:"workflow"`
}

WorkflowTemplateResponse represents a workflow template.

func DefaultTemplates

func DefaultTemplates() []WorkflowTemplateResponse

DefaultTemplates returns the built-in workflow templates.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL