Documentation
¶
Overview ¶
Package workflow provides n8n-style workflow automation capabilities.
Index ¶
- Variables
- func TenantFromContext(ctx context.Context) string
- func WithTenantContext(ctx context.Context, tenantID string) context.Context
- type ActionConfig
- type ActionHandler
- type ActionType
- type BrowserActionConfig
- type ChannelActionConfig
- type ChannelTriggerConfig
- type ConditionConfig
- type Config
- type Connection
- type CreateWorkflowRequest
- type DelayConfig
- type EmailActionConfig
- type Engine
- func (e *Engine) CancelExecution(id string) error
- func (e *Engine) Close() error
- func (e *Engine) Execute(ctx context.Context, workflow *Workflow, triggerType TriggerType, ...) (*Execution, error)
- func (e *Engine) GetExecution(id string) (*Execution, error)
- func (e *Engine) ListExecutions() []*Execution
- func (e *Engine) RegisterHandler(actionType ActionType, handler ActionHandler)
- func (e *Engine) ResumeExecution(id string, resume ExecutionResumeInput) (*Execution, error)
- func (e *Engine) SetCheckpointEnabledFunc(enabled func() bool)
- func (e *Engine) SetExecutionObserver(observer func(*Execution))
- func (e *Engine) SetToolGateway(gateway ToolRuntime)
- func (e *Engine) ValidateWorkflow(workflow *Workflow) error
- type ExecuteWorkflowRequest
- type Execution
- type ExecutionCancellationController
- type ExecutionCheckpoint
- type ExecutionCheckpointKind
- type ExecutionLaunchRequest
- type ExecutionLauncher
- type ExecutionLog
- type ExecutionResumeController
- type ExecutionResumeInput
- type ExecutionStatus
- type FileActionConfig
- type FileChangeConfig
- type HTTPActionConfig
- type Handler
- func (h *Handler) CancelExecution(c echo.Context) error
- func (h *Handler) CreateWorkflow(c echo.Context) error
- func (h *Handler) DeleteWorkflow(c echo.Context) error
- func (h *Handler) DisableWorkflow(c echo.Context) error
- func (h *Handler) EnableWorkflow(c echo.Context) error
- func (h *Handler) ExecuteWorkflow(c echo.Context) error
- func (h *Handler) ExportWorkflow(c echo.Context) error
- func (h *Handler) GetExecution(c echo.Context) error
- func (h *Handler) GetExecutionLogs(c echo.Context) error
- func (h *Handler) GetService() *WorkflowService
- func (h *Handler) GetStats(c echo.Context) error
- func (h *Handler) GetTemplates(c echo.Context) error
- func (h *Handler) GetWorkflow(c echo.Context) error
- func (h *Handler) HandleWebhook(c echo.Context) error
- func (h *Handler) ImportWorkflow(c echo.Context) error
- func (h *Handler) ListExecutions(c echo.Context) error
- func (h *Handler) ListWorkflows(c echo.Context) error
- func (h *Handler) RegisterRoutes(e *echo.Echo)
- func (h *Handler) ResumeExecution(c echo.Context) error
- func (h *Handler) RetryExecution(c echo.Context) error
- func (h *Handler) SetExecutionLauncher(launcher ExecutionLauncher)
- func (h *Handler) SetIdleReclaim(idleAfter time.Duration)
- func (h *Handler) SetRouteMiddlewares(middlewares ...echo.MiddlewareFunc)
- func (h *Handler) SetServiceInitHook(fn func(*WorkflowService))
- func (h *Handler) UpdateWorkflow(c echo.Context) error
- func (h *Handler) ValidateWorkflow(c echo.Context) error
- type JavaScriptConfig
- type LLMActionConfig
- type ListOptions
- type LoopConfig
- type Node
- type NodeResult
- type NodeStatus
- type NodeType
- type NotifyActionConfig
- type Position
- type Repository
- func (r *Repository) CleanupOldExecutions(ctx context.Context, retentionDays int) (int64, error)
- func (r *Repository) CreateWorkflow(ctx context.Context, workflow *Workflow) error
- func (r *Repository) DeleteWebhook(ctx context.Context, workflowID string) error
- func (r *Repository) DeleteWorkflow(ctx context.Context, id string) error
- func (r *Repository) GetExecution(ctx context.Context, id string) (*Execution, error)
- func (r *Repository) GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)
- func (r *Repository) GetStats(ctx context.Context, tenantID string) (*Stats, error)
- func (r *Repository) GetWebhookByPath(ctx context.Context, path string) (string, error)
- func (r *Repository) GetWorkflow(ctx context.Context, id string) (*Workflow, error)
- func (r *Repository) ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)
- func (r *Repository) ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)
- func (r *Repository) SaveExecution(ctx context.Context, execution *Execution) error
- func (r *Repository) SaveExecutionLog(ctx context.Context, log *ExecutionLog) error
- func (r *Repository) SaveWebhook(ctx context.Context, workflowID, path, method, authType string, ...) error
- func (r *Repository) UpdateWorkflow(ctx context.Context, workflow *Workflow) error
- type ResumeExecutionRequest
- type ScheduleConfig
- type Service
- type SkillActionConfig
- type Stats
- type SwitchConfig
- type SystemTriggerConfig
- type ToolExecutionRequest
- type ToolExecutionResult
- type ToolRuntime
- type TriggerConfig
- type TriggerType
- type UpdateWorkflowRequest
- type VariableActionConfig
- type WebhookConfig
- type Workflow
- type WorkflowService
- func (s *WorkflowService) CancelExecution(ctx context.Context, id string) error
- func (s *WorkflowService) CancelExecutionDirect(ctx context.Context, id string) error
- func (s *WorkflowService) CleanupOldExecutions(ctx context.Context) (int64, error)
- func (s *WorkflowService) Close() error
- func (s *WorkflowService) CreateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
- func (s *WorkflowService) DeleteWorkflow(ctx context.Context, id string) error
- func (s *WorkflowService) DisableWorkflow(ctx context.Context, id string) error
- func (s *WorkflowService) EnableWorkflow(ctx context.Context, id string) error
- func (s *WorkflowService) ExecuteWorkflow(ctx context.Context, id string, triggerData map[string]interface{}) (*Execution, error)
- func (s *WorkflowService) ExecuteWorkflowWithTrigger(ctx context.Context, id string, triggerType TriggerType, ...) (*Execution, error)
- func (s *WorkflowService) GetExecution(ctx context.Context, id string) (*Execution, error)
- func (s *WorkflowService) GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)
- func (s *WorkflowService) GetStats(ctx context.Context, tenantID string) (*Stats, error)
- func (s *WorkflowService) GetWorkflow(ctx context.Context, id string) (*Workflow, error)
- func (s *WorkflowService) HandleWebhook(ctx context.Context, path string, method string, headers map[string]string, ...) (*Execution, error)
- func (s *WorkflowService) ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)
- func (s *WorkflowService) ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)
- func (s *WorkflowService) RegisterActionHandler(actionType ActionType, handler ActionHandler)
- func (s *WorkflowService) RegisterTrigger(ctx context.Context, workflowID string, trigger *TriggerConfig) error
- func (s *WorkflowService) ResumeExecution(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)
- func (s *WorkflowService) ResumeExecutionDirect(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)
- func (s *WorkflowService) RetryExecution(ctx context.Context, id string) (*Execution, error)
- func (s *WorkflowService) SetExecutionLauncher(launcher ExecutionLauncher)
- func (s *WorkflowService) SetFlagEvaluator(evaluator workflowFlagEvaluator)
- func (s *WorkflowService) SetMetricsRecorder(recorder workflowMetricsRecorder)
- func (s *WorkflowService) SetToolGateway(gateway ToolRuntime)
- func (s *WorkflowService) StartCleanupJob()
- func (s *WorkflowService) UnregisterTrigger(ctx context.Context, workflowID string) error
- func (s *WorkflowService) UpdateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
- func (s *WorkflowService) ValidateWorkflow(ctx context.Context, workflow *Workflow) error
- type WorkflowSettings
- type WorkflowStatus
- type WorkflowTemplateResponse
Constants ¶
This section is empty.
Variables ¶
var ( ErrWorkflowNotFound = errors.New("workflow not found") ErrWorkflowDisabled = errors.New("workflow is disabled") ErrExecutionNotFound = errors.New("execution not found") ErrNodeNotFound = errors.New("node not found") ErrInvalidWorkflow = errors.New("invalid workflow definition") ErrInvalidNode = errors.New("invalid node configuration") ErrInvalidTrigger = errors.New("invalid trigger configuration") ErrCyclicDependency = errors.New("cyclic dependency detected") ErrMaxExecutionsReached = errors.New("maximum concurrent executions reached") ErrExecutionTimeout = errors.New("execution timed out") ErrNodeExecutionFailed = errors.New("node execution failed") )
Common errors
Functions ¶
func TenantFromContext ¶
Types ¶
type ActionConfig ¶
type ActionConfig struct {
Type ActionType `json:"type"`
HTTP *HTTPActionConfig `json:"http,omitempty"`
LLM *LLMActionConfig `json:"llm,omitempty"`
Skill *SkillActionConfig `json:"skill,omitempty"`
Channel *ChannelActionConfig `json:"channel,omitempty"`
Browser *BrowserActionConfig `json:"browser,omitempty"`
File *FileActionConfig `json:"file,omitempty"`
JavaScript *JavaScriptConfig `json:"javascript,omitempty"`
Variable *VariableActionConfig `json:"variable,omitempty"`
Email *EmailActionConfig `json:"email,omitempty"`
Notify *NotifyActionConfig `json:"notify,omitempty"`
}
ActionConfig contains action-specific configuration.
type ActionHandler ¶
type ActionHandler func(ctx context.Context, config map[string]interface{}, input map[string]interface{}) (map[string]interface{}, error)
ActionHandler handles execution of a specific action type.
type ActionType ¶
type ActionType string
ActionType represents the type of action node.
const ( ActionTypeHTTP ActionType = "http" ActionTypeLLM ActionType = "llm" ActionTypeSkill ActionType = "skill" ActionTypeChannelSend ActionType = "channel_send" ActionTypeBrowser ActionType = "browser" ActionTypeFileOps ActionType = "file_ops" ActionTypeJavaScript ActionType = "javascript" ActionTypeSetVariable ActionType = "set_variable" ActionTypeEmail ActionType = "email" ActionTypeNotify ActionType = "notify" )
type BrowserActionConfig ¶
type BrowserActionConfig struct {
Action string `json:"action"` // navigate, click, type, screenshot, etc.
URL string `json:"url,omitempty"`
Selector string `json:"selector,omitempty"`
Value string `json:"value,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
}
BrowserActionConfig contains browser action configuration.
type ChannelActionConfig ¶
type ChannelActionConfig struct {
ChannelType string `json:"channel_type"`
ChannelID string `json:"channel_id"`
Message string `json:"message"`
Format string `json:"format,omitempty"` // text, markdown, html
}
ChannelActionConfig contains channel send action configuration.
type ChannelTriggerConfig ¶
type ChannelTriggerConfig struct {
ChannelType string `json:"channel_type"` // telegram, discord, etc.
ChannelID string `json:"channel_id,omitempty"`
Keywords []string `json:"keywords,omitempty"` // Trigger on keywords
Regex string `json:"regex,omitempty"` // Trigger on regex match
FromUsers []string `json:"from_users,omitempty"` // Filter by user
}
ChannelTriggerConfig contains channel message trigger configuration.
type ConditionConfig ¶
type ConditionConfig struct {
Expression string `json:"expression"` // JavaScript expression
TruePort string `json:"true_port,omitempty"`
FalsePort string `json:"false_port,omitempty"`
}
ConditionConfig contains condition node configuration.
type Config ¶
type Config struct {
MaxConcurrentExecutions int `json:"max_concurrent_executions" yaml:"max_concurrent_executions"`
DefaultTimeout int `json:"default_timeout" yaml:"default_timeout"` // seconds
MaxTimeout int `json:"max_timeout" yaml:"max_timeout"` // seconds
RetryDelay int `json:"retry_delay" yaml:"retry_delay"` // seconds
MaxRetries int `json:"max_retries" yaml:"max_retries"`
SaveExecutionData bool `json:"save_execution_data" yaml:"save_execution_data"`
ExecutionDataRetention int `json:"execution_data_retention" yaml:"execution_data_retention"` // days
WebhookBasePath string `json:"webhook_base_path" yaml:"webhook_base_path"`
Enabled bool `json:"enabled" yaml:"enabled"`
}
Config contains workflow engine configuration.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns the default workflow configuration.
type Connection ¶
type Connection struct {
ID string `json:"id"`
SourceNode string `json:"source_node"`
SourcePort string `json:"source_port,omitempty"`
TargetNode string `json:"target_node"`
TargetPort string `json:"target_port,omitempty"`
Condition string `json:"condition,omitempty"` // For conditional connections
}
Connection represents a connection between nodes.
type CreateWorkflowRequest ¶
type CreateWorkflowRequest struct {
Name string `json:"name" validate:"required"`
Description string `json:"description,omitempty"`
Nodes []Node `json:"nodes,omitempty"`
Connections []Connection `json:"connections,omitempty"`
Variables map[string]string `json:"variables,omitempty"`
Settings *WorkflowSettings `json:"settings,omitempty"`
Tags []string `json:"tags,omitempty"`
}
CreateWorkflowRequest represents a create workflow request.
type DelayConfig ¶
type DelayConfig struct {
Duration int `json:"duration"` // Delay in seconds
Until string `json:"until,omitempty"` // Wait until specific time
}
DelayConfig contains delay node configuration.
type EmailActionConfig ¶
type EmailActionConfig struct {
To []string `json:"to"`
Cc []string `json:"cc,omitempty"`
Bcc []string `json:"bcc,omitempty"`
Subject string `json:"subject"`
Body string `json:"body"`
ContentType string `json:"content_type,omitempty"` // text, html
Attachments []string `json:"attachments,omitempty"`
}
EmailActionConfig contains email action configuration.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine executes workflows.
func (*Engine) CancelExecution ¶
CancelExecution cancels a running execution.
func (*Engine) Execute ¶
func (e *Engine) Execute(ctx context.Context, workflow *Workflow, triggerType TriggerType, triggerData map[string]interface{}) (*Execution, error)
Execute starts a workflow execution.
func (*Engine) GetExecution ¶
GetExecution returns an execution by ID.
func (*Engine) ListExecutions ¶
ListExecutions returns all executions.
func (*Engine) RegisterHandler ¶
func (e *Engine) RegisterHandler(actionType ActionType, handler ActionHandler)
RegisterHandler registers a custom action handler.
func (*Engine) ResumeExecution ¶
func (e *Engine) ResumeExecution(id string, resume ExecutionResumeInput) (*Execution, error)
ResumeExecution resumes a paused execution with a decision payload.
func (*Engine) SetCheckpointEnabledFunc ¶
SetCheckpointEnabledFunc wires a runtime gate for checkpoint/pause semantics.
func (*Engine) SetExecutionObserver ¶
SetExecutionObserver wires a callback for execution state transitions.
func (*Engine) SetToolGateway ¶
func (e *Engine) SetToolGateway(gateway ToolRuntime)
SetToolGateway wires the shared tool runtime into workflow action handlers.
func (*Engine) ValidateWorkflow ¶
ValidateWorkflow validates a workflow definition.
type ExecuteWorkflowRequest ¶
type ExecuteWorkflowRequest struct {
TriggerData map[string]interface{} `json:"trigger_data,omitempty"`
}
ExecuteWorkflowRequest represents an execute workflow request.
type Execution ¶
type Execution struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
WorkflowName string `json:"workflow_name"`
TenantID string `json:"tenant_id"`
Status ExecutionStatus `json:"status"`
StatusReason string `json:"status_reason,omitempty"`
TriggerType TriggerType `json:"trigger_type"`
TriggerData map[string]interface{} `json:"trigger_data,omitempty"`
Variables map[string]interface{} `json:"variables,omitempty"`
NodeResults map[string]*NodeResult `json:"node_results,omitempty"`
Checkpoint *ExecutionCheckpoint `json:"checkpoint,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Duration int64 `json:"duration,omitempty"` // Duration in milliseconds
}
Execution represents a workflow execution instance.
type ExecutionCancellationController ¶
type ExecutionCancellationController interface {
CancelExecution(ctx context.Context, executionID string, reason string) (handled bool, err error)
}
ExecutionCancellationController optionally lets callers route execution cancellation through the owning runtime/control plane.
type ExecutionCheckpoint ¶
type ExecutionCheckpoint struct {
ID string `json:"id"`
Kind ExecutionCheckpointKind `json:"kind"`
NodeID string `json:"node_id,omitempty"`
NodeName string `json:"node_name,omitempty"`
Reason string `json:"reason,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
CreatedAt time.Time `json:"created_at"`
ResumedAt *time.Time `json:"resumed_at,omitempty"`
Resume *ExecutionResumeInput `json:"resume,omitempty"`
}
ExecutionCheckpoint captures pause/resume state for an execution.
type ExecutionCheckpointKind ¶
type ExecutionCheckpointKind string
ExecutionCheckpointKind identifies a deterministic pause/resume checkpoint.
const ( ExecutionCheckpointPauseForApproval ExecutionCheckpointKind = "pause_for_approval" ExecutionCheckpointResumeWithDecision ExecutionCheckpointKind = "resume_with_decision" ExecutionCheckpointAwaitToolResult ExecutionCheckpointKind = "await_tool_result" ExecutionCheckpointJSONTask ExecutionCheckpointKind = "json_task" )
type ExecutionLaunchRequest ¶
type ExecutionLaunchRequest struct {
WorkflowID string `json:"workflow_id"`
TriggerType TriggerType `json:"trigger_type,omitempty"`
TriggerData map[string]interface{} `json:"trigger_data,omitempty"`
UserID string `json:"user_id,omitempty"`
ConversationID string `json:"conversation_id,omitempty"`
TenantID string `json:"tenant_id,omitempty"`
}
ExecutionLaunchRequest describes a caller-originated workflow run request.
type ExecutionLauncher ¶
type ExecutionLauncher interface {
LaunchExecution(ctx context.Context, req ExecutionLaunchRequest) (*Execution, error)
}
ExecutionLauncher allows callers to route workflow launches through an alternate runtime while preserving workflow.Execution responses.
type ExecutionLog ¶
type ExecutionLog struct {
ID string `json:"id"`
ExecutionID string `json:"execution_id"`
NodeID string `json:"node_id,omitempty"`
Level string `json:"level"` // debug, info, warn, error
Message string `json:"message"`
Data string `json:"data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
ExecutionLog represents a log entry for an execution.
type ExecutionResumeController ¶
type ExecutionResumeController interface {
ResumeExecution(ctx context.Context, executionID string, resume ExecutionResumeInput) (handled bool, execution *Execution, err error)
}
ExecutionResumeController optionally lets callers route checkpoint resume through the owning runtime/control plane.
type ExecutionResumeInput ¶
type ExecutionResumeInput struct {
Decision string `json:"decision,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
}
ExecutionResumeInput represents a resume decision/payload supplied by callers.
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the status of a workflow execution.
const ( ExecutionStatusPending ExecutionStatus = "pending" ExecutionStatusRunning ExecutionStatus = "running" ExecutionStatusCompleted ExecutionStatus = "completed" ExecutionStatusFailed ExecutionStatus = "failed" ExecutionStatusCancelled ExecutionStatus = "cancelled" ExecutionStatusPaused ExecutionStatus = "paused" )
type FileActionConfig ¶
type FileActionConfig struct {
Operation string `json:"operation"` // read, write, copy, move, delete
SourcePath string `json:"source_path"`
DestPath string `json:"dest_path,omitempty"`
Content string `json:"content,omitempty"`
Encoding string `json:"encoding,omitempty"`
CreateDirs bool `json:"create_dirs,omitempty"`
}
FileActionConfig contains file operation configuration.
type FileChangeConfig ¶
type FileChangeConfig struct {
Path string `json:"path"`
Patterns []string `json:"patterns,omitempty"` // File patterns to watch
Events []string `json:"events,omitempty"` // create, modify, delete
Recursive bool `json:"recursive,omitempty"` // Watch subdirectories
}
FileChangeConfig contains file change trigger configuration.
type HTTPActionConfig ¶
type HTTPActionConfig struct {
URL string `json:"url"`
Method string `json:"method"`
Headers map[string]string `json:"headers,omitempty"`
Body string `json:"body,omitempty"`
ContentType string `json:"content_type,omitempty"`
Timeout int `json:"timeout,omitempty"`
FollowRedirects bool `json:"follow_redirects,omitempty"`
ValidateSSL bool `json:"validate_ssl,omitempty"`
}
HTTPActionConfig contains HTTP action configuration.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler handles HTTP requests for workflows.
func NewHandler ¶
func NewHandler(service *WorkflowService) *Handler
NewHandler creates a new workflow handler.
func NewLazyHandler ¶
func NewLazyHandler(initFn func() *WorkflowService) *Handler
NewLazyHandler creates a handler that defers WorkflowService creation to the first API call.
func (*Handler) CancelExecution ¶
CancelExecution cancels a running execution.
func (*Handler) CreateWorkflow ¶
CreateWorkflow creates a new workflow.
func (*Handler) DeleteWorkflow ¶
DeleteWorkflow deletes a workflow.
func (*Handler) DisableWorkflow ¶
DisableWorkflow disables a workflow.
func (*Handler) EnableWorkflow ¶
EnableWorkflow enables a workflow.
func (*Handler) ExecuteWorkflow ¶
ExecuteWorkflow manually executes a workflow.
func (*Handler) ExportWorkflow ¶
ExportWorkflow exports a workflow as JSON.
func (*Handler) GetExecution ¶
GetExecution retrieves an execution by ID.
func (*Handler) GetExecutionLogs ¶
GetExecutionLogs retrieves logs for an execution.
func (*Handler) GetService ¶
func (h *Handler) GetService() *WorkflowService
GetService returns the workflow service, triggering lazy init if needed.
func (*Handler) GetTemplates ¶
GetTemplates returns available workflow templates.
func (*Handler) GetWorkflow ¶
GetWorkflow retrieves a workflow by ID.
func (*Handler) HandleWebhook ¶
HandleWebhook handles incoming webhook requests.
func (*Handler) ImportWorkflow ¶
ImportWorkflow imports a workflow from JSON.
func (*Handler) ListExecutions ¶
ListExecutions lists executions for a workflow.
func (*Handler) ListWorkflows ¶
ListWorkflows lists workflows with pagination.
func (*Handler) RegisterRoutes ¶
RegisterRoutes registers the workflow routes.
func (*Handler) ResumeExecution ¶
ResumeExecution resumes a paused execution from its current checkpoint.
func (*Handler) RetryExecution ¶
RetryExecution retries a failed execution.
func (*Handler) SetExecutionLauncher ¶
func (h *Handler) SetExecutionLauncher(launcher ExecutionLauncher)
SetExecutionLauncher overrides manual workflow execute requests with a harness-aware launcher while leaving the service-based read APIs intact.
func (*Handler) SetIdleReclaim ¶
SetIdleReclaim configures idle reclaim for the lazily initialized workflow service.
func (*Handler) SetRouteMiddlewares ¶
func (h *Handler) SetRouteMiddlewares(middlewares ...echo.MiddlewareFunc)
SetRouteMiddlewares applies security middleware to workflow management routes. Webhook routes stay public and are intentionally excluded.
func (*Handler) SetServiceInitHook ¶
func (h *Handler) SetServiceInitHook(fn func(*WorkflowService))
SetServiceInitHook configures a callback that runs once the lazy service is created.
func (*Handler) UpdateWorkflow ¶
UpdateWorkflow updates an existing workflow.
type JavaScriptConfig ¶
JavaScriptConfig contains JavaScript execution configuration.
type LLMActionConfig ¶
type LLMActionConfig struct {
Provider string `json:"provider"` // openai, claude, ollama
Model string `json:"model"`
Prompt string `json:"prompt"`
SystemPrompt string `json:"system_prompt,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
Tools []string `json:"tools,omitempty"` // Tool names to enable
}
LLMActionConfig contains LLM action configuration.
type ListOptions ¶
type ListOptions struct {
Offset int `json:"offset"`
Limit int `json:"limit"`
Sort string `json:"sort,omitempty"`
Order string `json:"order,omitempty"` // asc, desc
Filters map[string]string `json:"filters,omitempty"`
}
ListOptions contains options for listing resources.
type LoopConfig ¶
type LoopConfig struct {
Type string `json:"type"` // for_each, while, count
Items string `json:"items,omitempty"` // Expression for items to iterate
Condition string `json:"condition,omitempty"` // While condition
Count int `json:"count,omitempty"` // Fixed count
MaxIterations int `json:"max_iterations,omitempty"` // Safety limit
}
LoopConfig contains loop node configuration.
type Node ¶
type Node struct {
ID string `json:"id"`
Type NodeType `json:"type"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Config map[string]interface{} `json:"config,omitempty"`
Position *Position `json:"position,omitempty"`
Disabled bool `json:"disabled,omitempty"`
}
Node represents a workflow node.
type NodeResult ¶
type NodeResult struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status NodeStatus `json:"status"`
Input map[string]interface{} `json:"input,omitempty"`
Output map[string]interface{} `json:"output,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Duration int64 `json:"duration,omitempty"` // Duration in milliseconds
RetryCount int `json:"retry_count,omitempty"`
}
NodeResult represents the result of a node execution.
type NodeStatus ¶
type NodeStatus string
NodeStatus represents the status of a node execution.
const ( NodeStatusPending NodeStatus = "pending" NodeStatusRunning NodeStatus = "running" NodeStatusCompleted NodeStatus = "completed" NodeStatusFailed NodeStatus = "failed" NodeStatusSkipped NodeStatus = "skipped" )
type NotifyActionConfig ¶
type NotifyActionConfig struct {
Title string `json:"title,omitempty"`
Message string `json:"message"`
Priority string `json:"priority,omitempty"` // low, normal, high
Channel string `json:"channel,omitempty"` // Notification channel
}
NotifyActionConfig contains notification action configuration.
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository handles workflow persistence.
func NewRepository ¶
func NewRepository(db *sql.DB) (*Repository, error)
NewRepository creates a new workflow repository.
func NewRepositoryWithReadDB ¶
func NewRepositoryWithReadDB(writeDB, readDB *sql.DB) (*Repository, error)
NewRepositoryWithReadDB creates a new workflow repository with separate write and read database handles.
func (*Repository) CleanupOldExecutions ¶
func (*Repository) CreateWorkflow ¶
func (r *Repository) CreateWorkflow(ctx context.Context, workflow *Workflow) error
func (*Repository) DeleteWebhook ¶
func (r *Repository) DeleteWebhook(ctx context.Context, workflowID string) error
func (*Repository) DeleteWorkflow ¶
func (r *Repository) DeleteWorkflow(ctx context.Context, id string) error
func (*Repository) GetExecution ¶
func (*Repository) GetExecutionLogs ¶
func (r *Repository) GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)
func (*Repository) GetWebhookByPath ¶
func (*Repository) GetWorkflow ¶
func (*Repository) ListExecutions ¶
func (r *Repository) ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)
func (*Repository) ListWorkflows ¶
func (r *Repository) ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)
func (*Repository) SaveExecution ¶
func (r *Repository) SaveExecution(ctx context.Context, execution *Execution) error
func (*Repository) SaveExecutionLog ¶
func (r *Repository) SaveExecutionLog(ctx context.Context, log *ExecutionLog) error
func (*Repository) SaveWebhook ¶
func (*Repository) UpdateWorkflow ¶
func (r *Repository) UpdateWorkflow(ctx context.Context, workflow *Workflow) error
type ResumeExecutionRequest ¶
type ResumeExecutionRequest struct {
Decision string `json:"decision,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
}
ResumeExecutionRequest represents a resume execution request.
type ScheduleConfig ¶
type ScheduleConfig struct {
Cron string `json:"cron,omitempty"` // Cron expression
Interval int `json:"interval,omitempty"` // Interval in seconds
Timezone string `json:"timezone,omitempty"` // Timezone for cron
}
ScheduleConfig contains schedule trigger configuration.
type Service ¶
type Service interface {
// Workflow CRUD
CreateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
GetWorkflow(ctx context.Context, id string) (*Workflow, error)
UpdateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
DeleteWorkflow(ctx context.Context, id string) error
ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)
// Workflow control
EnableWorkflow(ctx context.Context, id string) error
DisableWorkflow(ctx context.Context, id string) error
ValidateWorkflow(ctx context.Context, workflow *Workflow) error
// Execution
ExecuteWorkflow(ctx context.Context, id string, triggerData map[string]interface{}) (*Execution, error)
ExecuteWorkflowWithTrigger(ctx context.Context, id string, triggerType TriggerType, triggerData map[string]interface{}) (*Execution, error)
GetExecution(ctx context.Context, id string) (*Execution, error)
ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)
CancelExecution(ctx context.Context, id string) error
RetryExecution(ctx context.Context, id string) (*Execution, error)
ResumeExecution(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)
// Logs
GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)
// Triggers
RegisterTrigger(ctx context.Context, workflowID string, trigger *TriggerConfig) error
UnregisterTrigger(ctx context.Context, workflowID string) error
// Webhook
HandleWebhook(ctx context.Context, path string, method string, headers map[string]string, body []byte) (*Execution, error)
// Stats
GetStats(ctx context.Context, tenantID string) (*Stats, error)
}
Service defines the workflow service interface.
type SkillActionConfig ¶
type SkillActionConfig struct {
SkillID string `json:"skill_id"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
}
SkillActionConfig contains skill action configuration.
type Stats ¶
type Stats struct {
TotalWorkflows int `json:"total_workflows"`
ActiveWorkflows int `json:"active_workflows"`
TotalExecutions int `json:"total_executions"`
RunningExecutions int `json:"running_executions"`
SuccessfulExecutions int `json:"successful_executions"`
FailedExecutions int `json:"failed_executions"`
}
Stats contains workflow statistics.
type SwitchConfig ¶
type SwitchConfig struct {
Expression string `json:"expression"` // Expression to evaluate
Cases map[string]string `json:"cases"` // Value -> port mapping
Default string `json:"default,omitempty"` // Default port
}
SwitchConfig contains switch node configuration.
type SystemTriggerConfig ¶
type SystemTriggerConfig struct {
EventType string `json:"event_type"` // startup, shutdown, etc.
}
SystemTriggerConfig contains system event trigger configuration.
type ToolExecutionRequest ¶
type ToolExecutionRequest struct {
ToolName string
Arguments map[string]interface{}
SessionID string
UserID string
Provider string
ProviderID string
Model string
AgentID string
}
ToolExecutionRequest is the workflow-local representation of a tool call.
type ToolExecutionResult ¶
type ToolExecutionResult struct {
ExecutionResult interface{}
CompactPayload interface{}
AuditPayload interface{}
}
ToolExecutionResult is the workflow-local result envelope returned by ToolRuntime.
type ToolRuntime ¶
type ToolRuntime interface {
Execute(ctx context.Context, req ToolExecutionRequest) (*ToolExecutionResult, error)
}
ToolRuntime is the workflow-side abstraction for shared tool execution.
type TriggerConfig ¶
type TriggerConfig struct {
Type TriggerType `json:"type"`
Schedule *ScheduleConfig `json:"schedule,omitempty"`
Webhook *WebhookConfig `json:"webhook,omitempty"`
File *FileChangeConfig `json:"file,omitempty"`
Channel *ChannelTriggerConfig `json:"channel,omitempty"`
System *SystemTriggerConfig `json:"system,omitempty"`
}
TriggerConfig contains trigger-specific configuration.
type TriggerType ¶
type TriggerType string
TriggerType represents the type of workflow trigger.
const ( TriggerTypeManual TriggerType = "manual" TriggerTypeSchedule TriggerType = "schedule" TriggerTypeWebhook TriggerType = "webhook" TriggerTypeFileChange TriggerType = "file_change" TriggerTypeChannelMsg TriggerType = "channel_message" TriggerTypeSystemEvent TriggerType = "system_event" )
type UpdateWorkflowRequest ¶
type UpdateWorkflowRequest struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
Status WorkflowStatus `json:"status,omitempty"`
Nodes []Node `json:"nodes,omitempty"`
Connections []Connection `json:"connections,omitempty"`
Variables map[string]string `json:"variables,omitempty"`
Settings *WorkflowSettings `json:"settings,omitempty"`
Tags []string `json:"tags,omitempty"`
}
UpdateWorkflowRequest represents an update workflow request.
type VariableActionConfig ¶
type VariableActionConfig struct {
Name string `json:"name"`
Value string `json:"value"`
Scope string `json:"scope,omitempty"` // execution, workflow
}
VariableActionConfig contains variable set action configuration.
type WebhookConfig ¶
type WebhookConfig struct {
Path string `json:"path"`
Method string `json:"method,omitempty"` // GET, POST, etc.
Headers map[string]string `json:"headers,omitempty"`
AuthType string `json:"auth_type,omitempty"` // none, basic, bearer, api_key
AuthConfig map[string]string `json:"auth_config,omitempty"`
}
WebhookConfig contains webhook trigger configuration.
type Workflow ¶
type Workflow struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status WorkflowStatus `json:"status"`
Nodes []Node `json:"nodes"`
Connections []Connection `json:"connections"`
Variables map[string]string `json:"variables,omitempty"`
Settings *WorkflowSettings `json:"settings,omitempty"`
Tags []string `json:"tags,omitempty"`
Version int `json:"version"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatedBy string `json:"created_by,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
}
Workflow represents a workflow definition.
type WorkflowService ¶
type WorkflowService struct {
// contains filtered or unexported fields
}
WorkflowService implements the Service interface.
func NewService ¶
func NewService(config *Config, repo *Repository) (*WorkflowService, error)
NewService creates a new workflow service.
func (*WorkflowService) CancelExecution ¶
func (s *WorkflowService) CancelExecution(ctx context.Context, id string) error
CancelExecution cancels a running execution.
func (*WorkflowService) CancelExecutionDirect ¶
func (s *WorkflowService) CancelExecutionDirect(ctx context.Context, id string) error
CancelExecutionDirect cancels an execution directly in the workflow runtime. Drivers should use this to avoid routing back through harness control paths.
func (*WorkflowService) CleanupOldExecutions ¶
func (s *WorkflowService) CleanupOldExecutions(ctx context.Context) (int64, error)
CleanupOldExecutions removes old execution data.
func (*WorkflowService) Close ¶
func (s *WorkflowService) Close() error
Close shuts down the service.
func (*WorkflowService) CreateWorkflow ¶
func (s *WorkflowService) CreateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
CreateWorkflow creates a new workflow.
func (*WorkflowService) DeleteWorkflow ¶
func (s *WorkflowService) DeleteWorkflow(ctx context.Context, id string) error
DeleteWorkflow deletes a workflow.
func (*WorkflowService) DisableWorkflow ¶
func (s *WorkflowService) DisableWorkflow(ctx context.Context, id string) error
DisableWorkflow disables a workflow.
func (*WorkflowService) EnableWorkflow ¶
func (s *WorkflowService) EnableWorkflow(ctx context.Context, id string) error
EnableWorkflow enables a workflow.
func (*WorkflowService) ExecuteWorkflow ¶
func (s *WorkflowService) ExecuteWorkflow(ctx context.Context, id string, triggerData map[string]interface{}) (*Execution, error)
ExecuteWorkflow manually executes a workflow.
func (*WorkflowService) ExecuteWorkflowWithTrigger ¶
func (s *WorkflowService) ExecuteWorkflowWithTrigger(ctx context.Context, id string, triggerType TriggerType, triggerData map[string]interface{}) (*Execution, error)
ExecuteWorkflowWithTrigger executes a workflow directly with an explicit trigger type.
func (*WorkflowService) GetExecution ¶
GetExecution retrieves an execution by ID.
func (*WorkflowService) GetExecutionLogs ¶
func (s *WorkflowService) GetExecutionLogs(ctx context.Context, executionID string, opts *ListOptions) ([]*ExecutionLog, int, error)
GetExecutionLogs retrieves logs for an execution.
func (*WorkflowService) GetWorkflow ¶
GetWorkflow retrieves a workflow by ID.
func (*WorkflowService) HandleWebhook ¶
func (s *WorkflowService) HandleWebhook(ctx context.Context, path string, method string, headers map[string]string, body []byte) (*Execution, error)
HandleWebhook handles an incoming webhook request.
func (*WorkflowService) ListExecutions ¶
func (s *WorkflowService) ListExecutions(ctx context.Context, workflowID string, opts *ListOptions) ([]*Execution, int, error)
ListExecutions lists executions for a workflow.
func (*WorkflowService) ListWorkflows ¶
func (s *WorkflowService) ListWorkflows(ctx context.Context, tenantID string, opts *ListOptions) ([]*Workflow, int, error)
ListWorkflows lists workflows with pagination.
func (*WorkflowService) RegisterActionHandler ¶
func (s *WorkflowService) RegisterActionHandler(actionType ActionType, handler ActionHandler)
RegisterActionHandler registers a custom action handler.
func (*WorkflowService) RegisterTrigger ¶
func (s *WorkflowService) RegisterTrigger(ctx context.Context, workflowID string, trigger *TriggerConfig) error
RegisterTrigger registers a trigger for a workflow.
func (*WorkflowService) ResumeExecution ¶
func (s *WorkflowService) ResumeExecution(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)
ResumeExecution resumes a paused workflow execution.
func (*WorkflowService) ResumeExecutionDirect ¶
func (s *WorkflowService) ResumeExecutionDirect(ctx context.Context, id string, resume ExecutionResumeInput) (*Execution, error)
ResumeExecutionDirect resumes a paused execution directly in the workflow runtime. Drivers and control-plane bridges should use this to avoid routing back through harness launch paths.
func (*WorkflowService) RetryExecution ¶
RetryExecution retries a failed execution.
func (*WorkflowService) SetExecutionLauncher ¶
func (s *WorkflowService) SetExecutionLauncher(launcher ExecutionLauncher)
SetExecutionLauncher wires an optional submission/control runtime for externally initiated workflow executions.
func (*WorkflowService) SetFlagEvaluator ¶
func (s *WorkflowService) SetFlagEvaluator(evaluator workflowFlagEvaluator)
SetFlagEvaluator wires feature-flag evaluation into workflow runtime behavior.
func (*WorkflowService) SetMetricsRecorder ¶
func (s *WorkflowService) SetMetricsRecorder(recorder workflowMetricsRecorder)
SetMetricsRecorder wires a lightweight runtime counter recorder.
func (*WorkflowService) SetToolGateway ¶
func (s *WorkflowService) SetToolGateway(gateway ToolRuntime)
SetToolGateway wires the shared tool runtime into the workflow engine.
func (*WorkflowService) StartCleanupJob ¶
func (s *WorkflowService) StartCleanupJob()
StartCleanupJob starts a background job to clean up old executions.
func (*WorkflowService) UnregisterTrigger ¶
func (s *WorkflowService) UnregisterTrigger(ctx context.Context, workflowID string) error
UnregisterTrigger unregisters a trigger for a workflow.
func (*WorkflowService) UpdateWorkflow ¶
func (s *WorkflowService) UpdateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
UpdateWorkflow updates an existing workflow.
func (*WorkflowService) ValidateWorkflow ¶
func (s *WorkflowService) ValidateWorkflow(ctx context.Context, workflow *Workflow) error
ValidateWorkflow validates a workflow definition.
type WorkflowSettings ¶
type WorkflowSettings struct {
Timeout int `json:"timeout,omitempty"` // Execution timeout in seconds
MaxRetries int `json:"max_retries,omitempty"` // Max retries on failure
RetryDelay int `json:"retry_delay,omitempty"` // Delay between retries in seconds
ContinueOnError bool `json:"continue_on_error,omitempty"` // Continue execution on node error
SaveExecutionData bool `json:"save_execution_data,omitempty"` // Save execution data for debugging
MaxConcurrent int `json:"max_concurrent,omitempty"` // Max concurrent executions
NotifyOnCompletion bool `json:"notify_on_completion,omitempty"` // Send notification on completion
NotifyOnError bool `json:"notify_on_error,omitempty"` // Send notification on error
}
WorkflowSettings contains workflow-level settings.
type WorkflowStatus ¶
type WorkflowStatus string
WorkflowStatus represents the status of a workflow.
const ( WorkflowStatusActive WorkflowStatus = "active" WorkflowStatusInactive WorkflowStatus = "inactive" WorkflowStatusDraft WorkflowStatus = "draft" )
type WorkflowTemplateResponse ¶
type WorkflowTemplateResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Category string `json:"category"`
Tags []string `json:"tags"`
Workflow Workflow `json:"workflow"`
}
WorkflowTemplateResponse represents a workflow template.
func DefaultTemplates ¶
func DefaultTemplates() []WorkflowTemplateResponse
DefaultTemplates returns the built-in workflow templates.