Documentation
¶
Overview ¶
Package composer provides composite tool workflow execution for Virtual MCP Server.
Composite tools orchestrate multi-step workflows across multiple backend MCP servers. The package supports sequential and parallel execution, user elicitation, conditional logic, and error handling.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Package composer provides composite tool workflow execution for Virtual MCP Server.
Index ¶
- Variables
- type Composer
- type DefaultElicitationHandler
- type ElicitationConfig
- type ElicitationHandler
- type ElicitationProtocolHandler
- type ElicitationResponse
- type ErrorHandler
- type InMemoryWorkflowStateStore
- func (s *InMemoryWorkflowStateStore) DeleteState(_ context.Context, workflowID string) error
- func (s *InMemoryWorkflowStateStore) ListActiveWorkflows(_ context.Context) ([]string, error)
- func (s *InMemoryWorkflowStateStore) LoadState(_ context.Context, workflowID string) (*WorkflowStatus, error)
- func (s *InMemoryWorkflowStateStore) SaveState(_ context.Context, workflowID string, state *WorkflowStatus) error
- type PendingElicitation
- type SDKElicitationRequester
- type StepResult
- type StepStatusType
- type StepType
- type TemplateExpander
- type ValidationError
- type WorkflowContext
- func (ctx *WorkflowContext) Clone() *WorkflowContext
- func (ctx *WorkflowContext) GetLastStepOutput() map[string]any
- func (ctx *WorkflowContext) GetStepResult(stepID string) (*StepResult, bool)
- func (ctx *WorkflowContext) HasStepCompleted(stepID string) bool
- func (ctx *WorkflowContext) HasStepFailed(stepID string) bool
- func (ctx *WorkflowContext) RecordStepFailure(stepID string, err error)
- func (ctx *WorkflowContext) RecordStepSkipped(stepID string)
- func (ctx *WorkflowContext) RecordStepStart(stepID string)
- func (ctx *WorkflowContext) RecordStepSuccess(stepID string, output map[string]any)
- type WorkflowDefinition
- type WorkflowError
- type WorkflowResult
- type WorkflowStateStore
- type WorkflowStatus
- type WorkflowStatusType
- type WorkflowStep
Constants ¶
This section is empty.
Variables ¶
var ( // ErrElicitationTimeout is returned when an elicitation request times out. ErrElicitationTimeout = errors.New("elicitation request timed out") // ErrElicitationCancelled is returned when the user cancels the elicitation. ErrElicitationCancelled = errors.New("elicitation request was cancelled by user") // ErrElicitationDeclined is returned when the user declines the elicitation. ErrElicitationDeclined = errors.New("elicitation request was declined by user") // ErrSchemaTooLarge is returned when the schema exceeds size limits. ErrSchemaTooLarge = errors.New("schema too large") // ErrSchemaTooDeep is returned when the schema exceeds nesting depth limits. ErrSchemaTooDeep = errors.New("schema nesting too deep") // ErrContentTooLarge is returned when response content exceeds size limits. ErrContentTooLarge = errors.New("response content too large") )
var ( // ErrWorkflowNotFound indicates the workflow doesn't exist. ErrWorkflowNotFound = errors.New("workflow not found") // ErrWorkflowTimeout indicates the workflow exceeded its timeout. ErrWorkflowTimeout = errors.New("workflow timed out") // ErrWorkflowCancelled indicates the workflow was cancelled. ErrWorkflowCancelled = errors.New("workflow cancelled") // ErrInvalidWorkflowDefinition indicates the workflow definition is invalid. ErrInvalidWorkflowDefinition = errors.New("invalid workflow definition") // ErrStepFailed indicates a workflow step failed. ErrStepFailed = errors.New("step failed") // ErrTemplateExpansion indicates template expansion failed. ErrTemplateExpansion = errors.New("template expansion failed") // ErrCircularDependency indicates a circular dependency in step dependencies. ErrCircularDependency = errors.New("circular dependency detected") // ErrDependencyNotMet indicates a step dependency hasn't completed. ErrDependencyNotMet = errors.New("dependency not met") // ErrToolCallFailed indicates a tool call failed. ErrToolCallFailed = errors.New("tool call failed") )
Common workflow execution errors.
Functions ¶
This section is empty.
Types ¶
type Composer ¶
type Composer interface {
// ExecuteWorkflow executes a composite tool workflow.
// Returns the workflow result or an error if execution fails.
ExecuteWorkflow(ctx context.Context, def *WorkflowDefinition, params map[string]any) (*WorkflowResult, error)
// ValidateWorkflow checks if a workflow definition is valid.
// This includes checking for cycles, invalid tool references, etc.
ValidateWorkflow(ctx context.Context, def *WorkflowDefinition) error
// GetWorkflowStatus returns the current status of a running workflow.
// Used for long-running workflows with elicitation.
GetWorkflowStatus(ctx context.Context, workflowID string) (*WorkflowStatus, error)
// CancelWorkflow cancels a running workflow.
CancelWorkflow(ctx context.Context, workflowID string) error
}
Composer executes composite tool workflows that orchestrate multi-step operations across multiple backend MCP servers.
Workflows can include:
- Sequential tool calls
- Parallel execution (DAG-based)
- User elicitation (interactive prompts)
- Conditional execution
- Error handling and retries
func NewWorkflowEngine ¶ added in v0.6.0
func NewWorkflowEngine( rtr router.Router, backendClient vmcp.BackendClient, elicitationHandler ElicitationProtocolHandler, ) Composer
NewWorkflowEngine creates a new workflow execution engine.
The elicitationHandler parameter is optional. If nil, elicitation steps will fail. This allows the engine to be used without elicitation support for simple workflows.
type DefaultElicitationHandler ¶ added in v0.6.4
type DefaultElicitationHandler struct {
// contains filtered or unexported fields
}
DefaultElicitationHandler implements ElicitationProtocolHandler as a thin wrapper around the MCP SDK.
This handler provides:
- SDK-agnostic abstraction layer (enables SDK migration)
- Security validation (timeout, schema size/depth, content size)
- Error transformation (SDK errors → domain errors)
- Logging and observability
The handler delegates JSON-RPC ID correlation to the underlying SDK, which handles it internally. We only provide validation, transformation, and abstraction.
Per MCP 2025-06-18 spec: Elicitation is synchronous - send request, block, receive response.
Thread-safety: Safe for concurrent calls. The underlying SDK session must be thread-safe.
func NewDefaultElicitationHandler ¶ added in v0.6.4
func NewDefaultElicitationHandler(sdkRequester SDKElicitationRequester) *DefaultElicitationHandler
NewDefaultElicitationHandler creates a new SDK-agnostic elicitation handler.
The sdkRequester parameter wraps the underlying MCP SDK's RequestElicitation functionality. For mark3labs SDK, this would be the MCPServer instance. For a future official SDK, this would be replaced without changing workflow code.
func (*DefaultElicitationHandler) RequestElicitation ¶ added in v0.6.4
func (h *DefaultElicitationHandler) RequestElicitation( ctx context.Context, workflowID string, stepID string, config *ElicitationConfig, ) (*ElicitationResponse, error)
RequestElicitation sends an elicitation request to the client and waits for response.
This is a synchronous blocking call that:
- Validates configuration and enforces security limits (timeout, schema size/depth, content size)
- Applies timeout constraints (default 5min, max 1hour)
- Delegates to SDK's RequestElicitation (which handles JSON-RPC ID correlation)
- Validates response content size
- Transforms SDK response to domain type
Per security review: Enforces max timeout (10 minutes), schema size (100KB), schema depth (10 levels), and response content size (1MB) to prevent resource exhaustion attacks.
Per MCP 2025-06-18 spec: The SDK handles JSON-RPC ID correlation internally. The workflowID and stepID parameters are for logging/tracking only.
Returns ElicitationResponse or error if validation fails, timeout occurs, or user declines/cancels.
type ElicitationConfig ¶
type ElicitationConfig struct {
// Message is the prompt message shown to the user.
Message string
// Schema is the JSON Schema for the requested data.
// Per MCP spec, must be a flat object with primitive properties.
Schema map[string]any
// Timeout is how long to wait for user response.
// Default: 5 minutes.
Timeout time.Duration
// OnDecline defines what to do if user declines.
OnDecline *ElicitationHandler
// OnCancel defines what to do if user cancels.
OnCancel *ElicitationHandler
}
ElicitationConfig defines parameters for elicitation steps.
type ElicitationHandler ¶
type ElicitationHandler struct {
// Action defines what to do.
// Options: "skip_remaining", "abort", "continue"
Action string
}
ElicitationHandler defines how to handle elicitation responses.
type ElicitationProtocolHandler ¶
type ElicitationProtocolHandler interface {
// RequestElicitation sends an elicitation request to the client and waits for response.
//
// This is a synchronous blocking call that:
// 1. Validates configuration and enforces security limits
// 2. Sends the elicitation request to the client via underlying SDK
// 3. Blocks until the client responds or timeout occurs
// 4. Returns the user's response (accept/decline/cancel)
//
// Per MCP 2025-06-18: The SDK handles JSON-RPC ID correlation internally.
// The workflowID and stepID are for internal tracking/logging only.
//
// Returns ElicitationResponse or error if timeout/cancelled/failed.
RequestElicitation(
ctx context.Context,
workflowID string,
stepID string,
config *ElicitationConfig,
) (*ElicitationResponse, error)
}
ElicitationProtocolHandler handles MCP elicitation protocol interactions.
This interface provides an SDK-agnostic abstraction for elicitation requests, enabling migration from mark3labs SDK to official SDK without changing workflow code.
Per MCP 2025-06-18 spec: Elicitation is a synchronous request/response protocol where the server sends a request and blocks until the client responds.
type ElicitationResponse ¶
type ElicitationResponse struct {
// Action is what the user did: "accept", "decline", "cancel"
Action string
// Content contains the user-provided data (for accept action).
Content map[string]any
// ReceivedAt is when the response was received.
ReceivedAt time.Time
}
ElicitationResponse represents a user's response to an elicitation.
type ErrorHandler ¶
type ErrorHandler struct {
// Action defines what to do when the step fails.
// Options: "abort", "continue", "retry"
Action string
// RetryCount is the number of retry attempts (for retry action).
RetryCount int
// RetryDelay is the initial delay between retries.
// Uses exponential backoff: delay * 2^attempt
RetryDelay time.Duration
// ContinueOnError indicates whether to continue workflow on error.
ContinueOnError bool
}
ErrorHandler defines how to handle step failures.
type InMemoryWorkflowStateStore ¶ added in v0.6.4
type InMemoryWorkflowStateStore struct {
// contains filtered or unexported fields
}
InMemoryWorkflowStateStore implements WorkflowStateStore with in-memory storage.
This implementation stores workflow state in memory, which means:
- State is lost on server restart
- No support for distributed/multi-instance deployments
- Fast access with no I/O overhead
This is suitable for Phase 2 (basic elicitation support). Future phases can implement Redis/DB backends for persistence and distribution.
Thread-safety: Safe for concurrent access using sync.RWMutex.
func (*InMemoryWorkflowStateStore) DeleteState ¶ added in v0.6.4
func (s *InMemoryWorkflowStateStore) DeleteState( _ context.Context, workflowID string, ) error
DeleteState removes workflow state.
This is idempotent - deleting a non-existent workflow is not an error.
func (*InMemoryWorkflowStateStore) ListActiveWorkflows ¶ added in v0.6.4
func (s *InMemoryWorkflowStateStore) ListActiveWorkflows(_ context.Context) ([]string, error)
ListActiveWorkflows returns all active workflow IDs.
A workflow is considered active if it has state stored in the store. The returned list is a snapshot at the time of the call.
func (*InMemoryWorkflowStateStore) LoadState ¶ added in v0.6.4
func (s *InMemoryWorkflowStateStore) LoadState( _ context.Context, workflowID string, ) (*WorkflowStatus, error)
LoadState retrieves workflow state.
Returns ErrWorkflowNotFound if the workflow does not exist. The returned state is a clone to prevent external modifications.
func (*InMemoryWorkflowStateStore) SaveState ¶ added in v0.6.4
func (s *InMemoryWorkflowStateStore) SaveState( _ context.Context, workflowID string, state *WorkflowStatus, ) error
SaveState persists workflow state.
If a workflow with the same ID already exists, it is overwritten. This is thread-safe for concurrent saves.
type PendingElicitation ¶
type PendingElicitation struct {
// StepID is the elicitation step ID.
StepID string
// Message is the elicitation message.
Message string
// Schema is the requested data schema.
Schema map[string]any
// ExpiresAt is when the elicitation times out.
ExpiresAt time.Time
}
PendingElicitation represents an elicitation awaiting user response.
type SDKElicitationRequester ¶ added in v0.6.4
type SDKElicitationRequester interface {
// RequestElicitation sends an elicitation request via the SDK and blocks for response.
// This wraps the SDK's synchronous RequestElicitation method.
RequestElicitation(ctx context.Context, request mcp.ElicitationRequest) (*mcp.ElicitationResult, error)
}
SDKElicitationRequester is an abstraction for the underlying MCP SDK's elicitation functionality.
This interface wraps the SDK's RequestElicitation method, enabling:
- Migration from mark3labs SDK to official SDK without changing workflow code
- Mocking for unit tests
- Custom implementations for testing
The SDK handles JSON-RPC ID correlation internally - our wrapper doesn't need to track IDs.
type StepResult ¶
type StepResult struct {
// StepID identifies the step.
StepID string
// Status is the step status.
Status StepStatusType
// Output contains the step output data.
Output map[string]any
// Error contains error information if the step failed.
Error error
// StartTime is when the step started.
StartTime time.Time
// EndTime is when the step completed.
EndTime time.Time
// Duration is the step execution time.
Duration time.Duration
// RetryCount is the number of retries performed.
RetryCount int
}
StepResult contains the result of a single workflow step.
type StepStatusType ¶
type StepStatusType string
StepStatusType represents the state of a workflow step.
const ( // StepStatusPending indicates the step is queued. StepStatusPending StepStatusType = "pending" // StepStatusRunning indicates the step is executing. StepStatusRunning StepStatusType = "running" // StepStatusCompleted indicates the step completed successfully. StepStatusCompleted StepStatusType = "completed" // StepStatusFailed indicates the step failed. StepStatusFailed StepStatusType = "failed" // StepStatusSkipped indicates the step was skipped (condition was false). StepStatusSkipped StepStatusType = "skipped" )
type StepType ¶
type StepType string
StepType defines the type of workflow step.
const ( // StepTypeTool executes a backend tool. StepTypeTool StepType = "tool" // StepTypeElicitation requests user input via MCP elicitation protocol. StepTypeElicitation StepType = "elicitation" // StepTypeConditional executes based on a condition (future). StepTypeConditional StepType = "conditional" )
type TemplateExpander ¶
type TemplateExpander interface {
// Expand evaluates templates in the given data using the workflow context.
Expand(ctx context.Context, data map[string]any, workflowCtx *WorkflowContext) (map[string]any, error)
// EvaluateCondition evaluates a condition template to a boolean.
EvaluateCondition(ctx context.Context, condition string, workflowCtx *WorkflowContext) (bool, error)
}
TemplateExpander handles template expansion for workflow arguments.
func NewTemplateExpander ¶ added in v0.6.0
func NewTemplateExpander() TemplateExpander
NewTemplateExpander creates a new template expander.
type ValidationError ¶ added in v0.6.0
type ValidationError struct {
// Field is the field that failed validation.
Field string
// Message is the error message.
Message string
// Cause is the underlying error.
Cause error
}
ValidationError wraps workflow validation errors.
func NewValidationError ¶ added in v0.6.0
func NewValidationError(field, message string, cause error) *ValidationError
NewValidationError creates a new validation error.
func (*ValidationError) Error ¶ added in v0.6.0
func (e *ValidationError) Error() string
Error implements the error interface.
func (*ValidationError) Unwrap ¶ added in v0.6.0
func (e *ValidationError) Unwrap() error
Unwrap returns the underlying error.
type WorkflowContext ¶
type WorkflowContext struct {
// WorkflowID is the unique workflow execution ID.
WorkflowID string
// Params are the input parameters.
Params map[string]any
// Steps contains the results of completed steps.
Steps map[string]*StepResult
// Variables stores workflow-scoped variables.
Variables map[string]any
}
WorkflowContext contains the execution context for a workflow.
func (*WorkflowContext) Clone ¶ added in v0.6.0
func (ctx *WorkflowContext) Clone() *WorkflowContext
Clone creates a shallow copy of the workflow context. Maps and step results are cloned, but nested values within maps are shared. This is useful for testing and validation.
func (*WorkflowContext) GetLastStepOutput ¶ added in v0.6.0
func (ctx *WorkflowContext) GetLastStepOutput() map[string]any
GetLastStepOutput retrieves the output of the most recently completed step. This is useful for getting the final workflow output.
func (*WorkflowContext) GetStepResult ¶ added in v0.6.0
func (ctx *WorkflowContext) GetStepResult(stepID string) (*StepResult, bool)
GetStepResult retrieves a step result by ID.
func (*WorkflowContext) HasStepCompleted ¶ added in v0.6.0
func (ctx *WorkflowContext) HasStepCompleted(stepID string) bool
HasStepCompleted checks if a step has completed successfully.
func (*WorkflowContext) HasStepFailed ¶ added in v0.6.0
func (ctx *WorkflowContext) HasStepFailed(stepID string) bool
HasStepFailed checks if a step has failed.
func (*WorkflowContext) RecordStepFailure ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepFailure(stepID string, err error)
RecordStepFailure records a step failure.
func (*WorkflowContext) RecordStepSkipped ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepSkipped(stepID string)
RecordStepSkipped records that a step was skipped (condition was false).
func (*WorkflowContext) RecordStepStart ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepStart(stepID string)
RecordStepStart records that a step has started execution.
func (*WorkflowContext) RecordStepSuccess ¶ added in v0.6.0
func (ctx *WorkflowContext) RecordStepSuccess(stepID string, output map[string]any)
RecordStepSuccess records a successful step completion.
type WorkflowDefinition ¶
type WorkflowDefinition struct {
// Name is the workflow name (must be unique).
Name string
// Description describes what the workflow does.
Description string
// Parameters defines the input parameter schema (JSON Schema).
Parameters map[string]any
// Steps are the workflow steps to execute.
Steps []WorkflowStep
// Timeout is the maximum execution time for the workflow.
// Default: 30 minutes.
Timeout time.Duration
// FailureMode defines how to handle step failures.
// Options: "abort" (default), "continue", "best_effort"
FailureMode string
// Metadata stores additional workflow information.
Metadata map[string]string
}
WorkflowDefinition defines a composite tool workflow.
type WorkflowError ¶ added in v0.6.0
type WorkflowError struct {
// WorkflowID is the workflow execution ID.
WorkflowID string
// StepID is the step that caused the error (if applicable).
StepID string
// Message is the error message.
Message string
// Cause is the underlying error.
Cause error
}
WorkflowError wraps workflow execution errors with context.
func NewWorkflowError ¶ added in v0.6.0
func NewWorkflowError(workflowID, stepID, message string, cause error) *WorkflowError
NewWorkflowError creates a new workflow error.
func (*WorkflowError) Error ¶ added in v0.6.0
func (e *WorkflowError) Error() string
Error implements the error interface.
func (*WorkflowError) Unwrap ¶ added in v0.6.0
func (e *WorkflowError) Unwrap() error
Unwrap returns the underlying error for errors.Is and errors.As.
type WorkflowResult ¶
type WorkflowResult struct {
// WorkflowID is the unique identifier for this execution.
WorkflowID string
// Status is the final workflow status.
Status WorkflowStatusType
// Output contains the workflow output data.
// Typically the output of the last step.
Output map[string]any
// Steps contains the results of each step.
Steps map[string]*StepResult
// Error contains error information if the workflow failed.
Error error
// StartTime is when the workflow started.
StartTime time.Time
// EndTime is when the workflow completed.
EndTime time.Time
// Duration is the total execution time.
Duration time.Duration
// Metadata stores additional result information.
Metadata map[string]string
}
WorkflowResult contains the output of a workflow execution.
type WorkflowStateStore ¶
type WorkflowStateStore interface {
// SaveState persists workflow state.
SaveState(ctx context.Context, workflowID string, state *WorkflowStatus) error
// LoadState retrieves workflow state.
LoadState(ctx context.Context, workflowID string) (*WorkflowStatus, error)
// DeleteState removes workflow state.
DeleteState(ctx context.Context, workflowID string) error
// ListActiveWorkflows returns all active workflow IDs.
ListActiveWorkflows(ctx context.Context) ([]string, error)
}
WorkflowStateStore manages workflow execution state. This enables persistence and recovery of long-running workflows.
func NewInMemoryWorkflowStateStore ¶ added in v0.6.4
func NewInMemoryWorkflowStateStore() WorkflowStateStore
NewInMemoryWorkflowStateStore creates a new in-memory workflow state store.
type WorkflowStatus ¶
type WorkflowStatus struct {
// WorkflowID identifies the workflow.
WorkflowID string
// Status is the current workflow status.
Status WorkflowStatusType
// CurrentStep is the currently executing step (if running).
CurrentStep string
// CompletedSteps are the steps that have completed.
CompletedSteps []string
// PendingElicitations are elicitations waiting for user response.
PendingElicitations []*PendingElicitation
// StartTime is when the workflow started.
StartTime time.Time
// LastUpdateTime is when the status was last updated.
LastUpdateTime time.Time
}
WorkflowStatus represents the current state of a workflow execution.
type WorkflowStatusType ¶
type WorkflowStatusType string
WorkflowStatusType represents the state of a workflow.
const ( // WorkflowStatusPending indicates the workflow is queued. WorkflowStatusPending WorkflowStatusType = "pending" // WorkflowStatusRunning indicates the workflow is executing. WorkflowStatusRunning WorkflowStatusType = "running" // WorkflowStatusWaitingForElicitation indicates the workflow is waiting for user input. WorkflowStatusWaitingForElicitation WorkflowStatusType = "waiting_for_elicitation" // WorkflowStatusCompleted indicates the workflow completed successfully. WorkflowStatusCompleted WorkflowStatusType = "completed" // WorkflowStatusFailed indicates the workflow failed. WorkflowStatusFailed WorkflowStatusType = "failed" // WorkflowStatusCancelled indicates the workflow was cancelled. WorkflowStatusCancelled WorkflowStatusType = "cancelled" // WorkflowStatusTimedOut indicates the workflow timed out. WorkflowStatusTimedOut WorkflowStatusType = "timed_out" )
type WorkflowStep ¶
type WorkflowStep struct {
// ID uniquely identifies this step within the workflow.
ID string
// Type is the step type: "tool", "elicitation", "conditional"
Type StepType
// Tool is the tool to call (for tool steps).
// Format: "toolname" or "backend.toolname"
Tool string
// Arguments are the tool arguments with template expansion support.
// Templates use Go text/template syntax with access to:
// - {{.params.name}}: Input parameters
// - {{.steps.stepid.output}}: Previous step outputs
// - {{.steps.stepid.content}}: Elicitation response data
// - {{.steps.stepid.action}}: Elicitation action (accept/decline/cancel)
Arguments map[string]any
// Condition is an optional condition for conditional execution.
// If specified and evaluates to false, the step is skipped.
// Uses template syntax, must evaluate to boolean.
Condition string
// DependsOn lists step IDs that must complete before this step.
// Enables DAG-based parallel execution.
DependsOn []string
// OnError defines error handling for this step.
OnError *ErrorHandler
// Elicitation defines elicitation parameters (for elicitation steps).
Elicitation *ElicitationConfig
// Timeout is the maximum execution time for this step.
Timeout time.Duration
// Metadata stores additional step information.
Metadata map[string]string
}
WorkflowStep represents a single step in a workflow.