Documentation
¶
Overview ¶
Package composer provides composite tool workflow execution for Virtual MCP Server.
Composite tools orchestrate multi-step workflows across multiple backend MCP servers. The package supports sequential and parallel execution, user elicitation, conditional logic, and error handling.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Index ¶
- Variables
- type Composer
- type ElicitationConfig
- type ElicitationHandler
- type ElicitationProtocolHandler
- type ElicitationResponse
- type ErrorHandler
- type PendingElicitation
- type StepResult
- type StepStatusType
- type StepType
- type TemplateExpander
- type ValidationError
- type WorkflowContext
- func (ctx *WorkflowContext) Clone() *WorkflowContext
- func (ctx *WorkflowContext) GetLastStepOutput() map[string]any
- func (ctx *WorkflowContext) GetStepResult(stepID string) (*StepResult, bool)
- func (ctx *WorkflowContext) HasStepCompleted(stepID string) bool
- func (ctx *WorkflowContext) HasStepFailed(stepID string) bool
- func (ctx *WorkflowContext) RecordStepFailure(stepID string, err error)
- func (ctx *WorkflowContext) RecordStepSkipped(stepID string)
- func (ctx *WorkflowContext) RecordStepStart(stepID string)
- func (ctx *WorkflowContext) RecordStepSuccess(stepID string, output map[string]any)
- type WorkflowDefinition
- type WorkflowError
- type WorkflowResult
- type WorkflowStateStore
- type WorkflowStatus
- type WorkflowStatusType
- type WorkflowStep
Constants ¶
This section is empty.
Variables ¶
var ( // ErrWorkflowNotFound indicates the workflow doesn't exist. ErrWorkflowNotFound = errors.New("workflow not found") // ErrWorkflowTimeout indicates the workflow exceeded its timeout. ErrWorkflowTimeout = errors.New("workflow timed out") // ErrWorkflowCancelled indicates the workflow was cancelled. ErrWorkflowCancelled = errors.New("workflow cancelled") // ErrInvalidWorkflowDefinition indicates the workflow definition is invalid. ErrInvalidWorkflowDefinition = errors.New("invalid workflow definition") // ErrStepFailed indicates a workflow step failed. ErrStepFailed = errors.New("step failed") // ErrTemplateExpansion indicates template expansion failed. ErrTemplateExpansion = errors.New("template expansion failed") // ErrCircularDependency indicates a circular dependency in step dependencies. ErrCircularDependency = errors.New("circular dependency detected") // ErrDependencyNotMet indicates a step dependency hasn't completed. ErrDependencyNotMet = errors.New("dependency not met") // ErrToolCallFailed indicates a tool call failed. ErrToolCallFailed = errors.New("tool call failed") )
Common workflow execution errors.
Functions ¶
This section is empty.
Types ¶
type Composer ¶
type Composer interface {
// ExecuteWorkflow executes a composite tool workflow.
// Returns the workflow result or an error if execution fails.
ExecuteWorkflow(ctx context.Context, def *WorkflowDefinition, params map[string]any) (*WorkflowResult, error)
// ValidateWorkflow checks if a workflow definition is valid.
// This includes checking for cycles, invalid tool references, etc.
ValidateWorkflow(ctx context.Context, def *WorkflowDefinition) error
// GetWorkflowStatus returns the current status of a running workflow.
// Used for long-running workflows with elicitation.
GetWorkflowStatus(ctx context.Context, workflowID string) (*WorkflowStatus, error)
// CancelWorkflow cancels a running workflow.
CancelWorkflow(ctx context.Context, workflowID string) error
}
Composer executes composite tool workflows that orchestrate multi-step operations across multiple backend MCP servers.
Workflows can include:
- Sequential tool calls
- Parallel execution (DAG-based)
- User elicitation (interactive prompts)
- Conditional execution
- Error handling and retries
func NewWorkflowEngine ¶ added in v0.6.0
func NewWorkflowEngine( rtr router.Router, backendClient vmcp.BackendClient, ) Composer
NewWorkflowEngine creates a new workflow execution engine.
type ElicitationConfig ¶
type ElicitationConfig struct {
// Message is the prompt message shown to the user.
Message string
// Schema is the JSON Schema for the requested data.
// Per MCP spec, must be a flat object with primitive properties.
Schema map[string]any
// Timeout is how long to wait for user response.
// Default: 5 minutes.
Timeout time.Duration
// OnDecline defines what to do if user declines.
OnDecline *ElicitationHandler
// OnCancel defines what to do if user cancels.
OnCancel *ElicitationHandler
}
ElicitationConfig defines parameters for elicitation steps.
type ElicitationHandler ¶
type ElicitationHandler struct {
// Action defines what to do.
// Options: "skip_remaining", "abort", "continue"
Action string
}
ElicitationHandler defines how to handle elicitation responses.
type ElicitationProtocolHandler ¶
type ElicitationProtocolHandler interface {
// SendElicitation sends an elicitation request to the client.
SendElicitation(ctx context.Context, workflowID string, stepID string, config *ElicitationConfig) error
// ReceiveElicitationResponse waits for and returns the user's response.
// Blocks until response is received or timeout occurs.
ReceiveElicitationResponse(ctx context.Context, workflowID string, stepID string) (*ElicitationResponse, error)
}
ElicitationProtocolHandler handles MCP elicitation protocol interactions.
type ElicitationResponse ¶
type ElicitationResponse struct {
// Action is what the user did: "accept", "decline", "cancel"
Action string
// Content contains the user-provided data (for accept action).
Content map[string]any
// ReceivedAt is when the response was received.
ReceivedAt time.Time
}
ElicitationResponse represents a user's response to an elicitation.
type ErrorHandler ¶
type ErrorHandler struct {
// Action defines what to do when the step fails.
// Options: "abort", "continue", "retry"
Action string
// RetryCount is the number of retry attempts (for retry action).
RetryCount int
// RetryDelay is the initial delay between retries.
// Uses exponential backoff: delay * 2^attempt
RetryDelay time.Duration
// ContinueOnError indicates whether to continue workflow on error.
ContinueOnError bool
}
ErrorHandler defines how to handle step failures.
type PendingElicitation ¶
type PendingElicitation struct {
// StepID is the elicitation step ID.
StepID string
// Message is the elicitation message.
Message string
// Schema is the requested data schema.
Schema map[string]any
// ExpiresAt is when the elicitation times out.
ExpiresAt time.Time
}
PendingElicitation represents an elicitation awaiting user response.
type StepResult ¶
type StepResult struct {
// StepID identifies the step.
StepID string
// Status is the step status.
Status StepStatusType
// Output contains the step output data.
Output map[string]any
// Error contains error information if the step failed.
Error error
// StartTime is when the step started.
StartTime time.Time
// EndTime is when the step completed.
EndTime time.Time
// Duration is the step execution time.
Duration time.Duration
// RetryCount is the number of retries performed.
RetryCount int
}
StepResult contains the result of a single workflow step.
type StepStatusType ¶
type StepStatusType string
StepStatusType represents the state of a workflow step.
const ( // StepStatusPending indicates the step is queued. StepStatusPending StepStatusType = "pending" // StepStatusRunning indicates the step is executing. StepStatusRunning StepStatusType = "running" // StepStatusCompleted indicates the step completed successfully. StepStatusCompleted StepStatusType = "completed" // StepStatusFailed indicates the step failed. StepStatusFailed StepStatusType = "failed" // StepStatusSkipped indicates the step was skipped (condition was false). StepStatusSkipped StepStatusType = "skipped" )
type StepType ¶
type StepType string
StepType defines the type of workflow step.
const ( // StepTypeTool executes a backend tool. StepTypeTool StepType = "tool" // StepTypeElicitation requests user input via MCP elicitation protocol. StepTypeElicitation StepType = "elicitation" // StepTypeConditional executes based on a condition (future). StepTypeConditional StepType = "conditional" )
type TemplateExpander ¶
type TemplateExpander interface {
// Expand evaluates templates in the given data using the workflow context.
Expand(ctx context.Context, data map[string]any, workflowCtx *WorkflowContext) (map[string]any, error)
// EvaluateCondition evaluates a condition template to a boolean.
EvaluateCondition(ctx context.Context, condition string, workflowCtx *WorkflowContext) (bool, error)
}
TemplateExpander handles template expansion for workflow arguments.
func NewTemplateExpander ¶ added in v0.6.0
func NewTemplateExpander() TemplateExpander
NewTemplateExpander creates a new template expander.
type ValidationError ¶ added in v0.6.0
type ValidationError struct {
// Field is the field that failed validation.
Field string
// Message is the error message.
Message string
// Cause is the underlying error.
Cause error
}
ValidationError wraps workflow validation errors.
func NewValidationError ¶ added in v0.6.0
func NewValidationError(field, message string, cause error) *ValidationError
NewValidationError creates a new validation error.
func (*ValidationError) Error ¶ added in v0.6.0
func (e *ValidationError) Error() string
Error implements the error interface.
func (*ValidationError) Unwrap ¶ added in v0.6.0
func (e *ValidationError) Unwrap() error
Unwrap returns the underlying error.
type WorkflowContext ¶
type WorkflowContext struct {
// WorkflowID is the unique workflow execution ID.
WorkflowID string
// Params are the input parameters.
Params map[string]any
// Steps contains the results of completed steps.
Steps map[string]*StepResult
// Variables stores workflow-scoped variables.
Variables map[string]any
}
WorkflowContext contains the execution context for a workflow.
func (*WorkflowContext) Clone ¶ added in v0.6.0
func (ctx *WorkflowContext) Clone() *WorkflowContext
Clone creates a shallow copy of the workflow context. Maps and step results are cloned, but nested values within maps are shared. This is useful for testing and validation.
func (*WorkflowContext) GetLastStepOutput ¶ added in v0.6.0
func (ctx *WorkflowContext) GetLastStepOutput() map[string]any
GetLastStepOutput retrieves the output of the most recently completed step. This is useful for getting the final workflow output.
func (*WorkflowContext) GetStepResult ¶ added in v0.6.0
func (ctx *WorkflowContext) GetStepResult(stepID string) (*StepResult, bool)
GetStepResult retrieves a step result by ID.
func (*WorkflowContext) HasStepCompleted ¶ added in v0.6.0
func (ctx *WorkflowContext) HasStepCompleted(stepID string) bool
HasStepCompleted checks if a step has completed successfully.
func (*WorkflowContext) HasStepFailed ¶ added in v0.6.0
func (ctx *WorkflowContext) HasStepFailed(stepID string) bool
HasStepFailed checks if a step has failed.
func (*WorkflowContext) RecordStepFailure ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepFailure(stepID string, err error)
RecordStepFailure records a step failure.
func (*WorkflowContext) RecordStepSkipped ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepSkipped(stepID string)
RecordStepSkipped records that a step was skipped (condition was false).
func (*WorkflowContext) RecordStepStart ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepStart(stepID string)
RecordStepStart records that a step has started execution.
func (*WorkflowContext) RecordStepSuccess ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepSuccess(stepID string, output map[string]any)
RecordStepSuccess records a successful step completion.
type WorkflowDefinition ¶
type WorkflowDefinition struct {
// Name is the workflow name (must be unique).
Name string
// Description describes what the workflow does.
Description string
// Parameters defines the input parameter schema (JSON Schema).
Parameters map[string]any
// Steps are the workflow steps to execute.
Steps []WorkflowStep
// Timeout is the maximum execution time for the workflow.
// Default: 30 minutes.
Timeout time.Duration
// FailureMode defines how to handle step failures.
// Options: "abort" (default), "continue", "best_effort"
FailureMode string
// Metadata stores additional workflow information.
Metadata map[string]string
}
WorkflowDefinition defines a composite tool workflow.
type WorkflowError ¶ added in v0.6.0
type WorkflowError struct {
// WorkflowID is the workflow execution ID.
WorkflowID string
// StepID is the step that caused the error (if applicable).
StepID string
// Message is the error message.
Message string
// Cause is the underlying error.
Cause error
}
WorkflowError wraps workflow execution errors with context.
func NewWorkflowError ¶ added in v0.6.0
func NewWorkflowError(workflowID, stepID, message string, cause error) *WorkflowError
NewWorkflowError creates a new workflow error.
func (*WorkflowError) Error ¶ added in v0.6.0
func (e *WorkflowError) Error() string
Error implements the error interface.
func (*WorkflowError) Unwrap ¶ added in v0.6.0
func (e *WorkflowError) Unwrap() error
Unwrap returns the underlying error for errors.Is and errors.As.
type WorkflowResult ¶
type WorkflowResult struct {
// WorkflowID is the unique identifier for this execution.
WorkflowID string
// Status is the final workflow status.
Status WorkflowStatusType
// Output contains the workflow output data.
// Typically the output of the last step.
Output map[string]any
// Steps contains the results of each step.
Steps map[string]*StepResult
// Error contains error information if the workflow failed.
Error error
// StartTime is when the workflow started.
StartTime time.Time
// EndTime is when the workflow completed.
EndTime time.Time
// Duration is the total execution time.
Duration time.Duration
// Metadata stores additional result information.
Metadata map[string]string
}
WorkflowResult contains the output of a workflow execution.
type WorkflowStateStore ¶
type WorkflowStateStore interface {
// SaveState persists workflow state.
SaveState(ctx context.Context, workflowID string, state *WorkflowStatus) error
// LoadState retrieves workflow state.
LoadState(ctx context.Context, workflowID string) (*WorkflowStatus, error)
// DeleteState removes workflow state.
DeleteState(ctx context.Context, workflowID string) error
// ListActiveWorkflows returns all active workflow IDs.
ListActiveWorkflows(ctx context.Context) ([]string, error)
}
WorkflowStateStore manages workflow execution state. This enables persistence and recovery of long-running workflows.
type WorkflowStatus ¶
type WorkflowStatus struct {
// WorkflowID identifies the workflow.
WorkflowID string
// Status is the current workflow status.
Status WorkflowStatusType
// CurrentStep is the currently executing step (if running).
CurrentStep string
// CompletedSteps are the steps that have completed.
CompletedSteps []string
// PendingElicitations are elicitations waiting for user response.
PendingElicitations []*PendingElicitation
// StartTime is when the workflow started.
StartTime time.Time
// LastUpdateTime is when the status was last updated.
LastUpdateTime time.Time
}
WorkflowStatus represents the current state of a workflow execution.
type WorkflowStatusType ¶
type WorkflowStatusType string
WorkflowStatusType represents the state of a workflow.
const ( // WorkflowStatusPending indicates the workflow is queued. WorkflowStatusPending WorkflowStatusType = "pending" // WorkflowStatusRunning indicates the workflow is executing. WorkflowStatusRunning WorkflowStatusType = "running" // WorkflowStatusWaitingForElicitation indicates the workflow is waiting for user input. WorkflowStatusWaitingForElicitation WorkflowStatusType = "waiting_for_elicitation" // WorkflowStatusCompleted indicates the workflow completed successfully. WorkflowStatusCompleted WorkflowStatusType = "completed" // WorkflowStatusFailed indicates the workflow failed. WorkflowStatusFailed WorkflowStatusType = "failed" // WorkflowStatusCancelled indicates the workflow was cancelled. WorkflowStatusCancelled WorkflowStatusType = "cancelled" // WorkflowStatusTimedOut indicates the workflow timed out. WorkflowStatusTimedOut WorkflowStatusType = "timed_out" )
type WorkflowStep ¶
type WorkflowStep struct {
// ID uniquely identifies this step within the workflow.
ID string
// Type is the step type: "tool", "elicitation", "conditional"
Type StepType
// Tool is the tool to call (for tool steps).
// Format: "toolname" or "backend.toolname"
Tool string
// Arguments are the tool arguments with template expansion support.
// Templates use Go text/template syntax with access to:
// - {{.params.name}}: Input parameters
// - {{.steps.stepid.output}}: Previous step outputs
// - {{.steps.stepid.content}}: Elicitation response data
// - {{.steps.stepid.action}}: Elicitation action (accept/decline/cancel)
Arguments map[string]any
// Condition is an optional condition for conditional execution.
// If specified and evaluates to false, the step is skipped.
// Uses template syntax, must evaluate to boolean.
Condition string
// DependsOn lists step IDs that must complete before this step.
// Enables DAG-based parallel execution.
DependsOn []string
// OnError defines error handling for this step.
OnError *ErrorHandler
// Elicitation defines elicitation parameters (for elicitation steps).
Elicitation *ElicitationConfig
// Timeout is the maximum execution time for this step.
Timeout time.Duration
// Metadata stores additional step information.
Metadata map[string]string
}
WorkflowStep represents a single step in a workflow.