composer

package
v0.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package composer provides composite tool workflow execution for Virtual MCP Server.

Composite tools orchestrate multi-step workflows across multiple backend MCP servers. The package supports sequential and parallel execution, user elicitation, conditional logic, and error handling.

Package composer provides composite tool workflow execution for Virtual MCP Server.

Package composer provides composite tool workflow execution for Virtual MCP Server.

Package composer provides composite tool workflow execution for Virtual MCP Server.

Package composer provides composite tool workflow execution for Virtual MCP Server.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWorkflowNotFound indicates the workflow doesn't exist.
	ErrWorkflowNotFound = errors.New("workflow not found")

	// ErrWorkflowTimeout indicates the workflow exceeded its timeout.
	ErrWorkflowTimeout = errors.New("workflow timed out")

	// ErrWorkflowCancelled indicates the workflow was cancelled.
	ErrWorkflowCancelled = errors.New("workflow cancelled")

	// ErrInvalidWorkflowDefinition indicates the workflow definition is invalid.
	ErrInvalidWorkflowDefinition = errors.New("invalid workflow definition")

	// ErrStepFailed indicates a workflow step failed.
	ErrStepFailed = errors.New("step failed")

	// ErrTemplateExpansion indicates template expansion failed.
	ErrTemplateExpansion = errors.New("template expansion failed")

	// ErrCircularDependency indicates a circular dependency in step dependencies.
	ErrCircularDependency = errors.New("circular dependency detected")

	// ErrDependencyNotMet indicates a step dependency hasn't completed.
	ErrDependencyNotMet = errors.New("dependency not met")

	// ErrToolCallFailed indicates a tool call failed.
	ErrToolCallFailed = errors.New("tool call failed")
)

Common workflow execution errors.

Functions

This section is empty.

Types

type Composer

type Composer interface {
	// ExecuteWorkflow executes a composite tool workflow.
	// Returns the workflow result or an error if execution fails.
	ExecuteWorkflow(ctx context.Context, def *WorkflowDefinition, params map[string]any) (*WorkflowResult, error)

	// ValidateWorkflow checks if a workflow definition is valid.
	// This includes checking for cycles, invalid tool references, etc.
	ValidateWorkflow(ctx context.Context, def *WorkflowDefinition) error

	// GetWorkflowStatus returns the current status of a running workflow.
	// Used for long-running workflows with elicitation.
	GetWorkflowStatus(ctx context.Context, workflowID string) (*WorkflowStatus, error)

	// CancelWorkflow cancels a running workflow.
	CancelWorkflow(ctx context.Context, workflowID string) error
}

Composer executes composite tool workflows that orchestrate multi-step operations across multiple backend MCP servers.

Workflows can include:

  • Sequential tool calls
  • Parallel execution (DAG-based)
  • User elicitation (interactive prompts)
  • Conditional execution
  • Error handling and retries

func NewWorkflowEngine added in v0.6.0

func NewWorkflowEngine(
	rtr router.Router,
	backendClient vmcp.BackendClient,
) Composer

NewWorkflowEngine creates a new workflow execution engine.

type ElicitationConfig

type ElicitationConfig struct {
	// Message is the prompt message shown to the user.
	Message string

	// Schema is the JSON Schema for the requested data.
	// Per MCP spec, must be a flat object with primitive properties.
	Schema map[string]any

	// Timeout is how long to wait for user response.
	// Default: 5 minutes.
	Timeout time.Duration

	// OnDecline defines what to do if user declines.
	OnDecline *ElicitationHandler

	// OnCancel defines what to do if user cancels.
	OnCancel *ElicitationHandler
}

ElicitationConfig defines parameters for elicitation steps.

type ElicitationHandler

type ElicitationHandler struct {
	// Action defines what to do.
	// Options: "skip_remaining", "abort", "continue"
	Action string
}

ElicitationHandler defines how to handle elicitation responses.

type ElicitationProtocolHandler

type ElicitationProtocolHandler interface {
	// SendElicitation sends an elicitation request to the client.
	SendElicitation(ctx context.Context, workflowID string, stepID string, config *ElicitationConfig) error

	// ReceiveElicitationResponse waits for and returns the user's response.
	// Blocks until response is received or timeout occurs.
	ReceiveElicitationResponse(ctx context.Context, workflowID string, stepID string) (*ElicitationResponse, error)
}

ElicitationProtocolHandler handles MCP elicitation protocol interactions.

type ElicitationResponse

type ElicitationResponse struct {
	// Action is what the user did: "accept", "decline", "cancel"
	Action string

	// Content contains the user-provided data (for accept action).
	Content map[string]any

	// ReceivedAt is when the response was received.
	ReceivedAt time.Time
}

ElicitationResponse represents a user's response to an elicitation.

type ErrorHandler

type ErrorHandler struct {
	// Action defines what to do when the step fails.
	// Options: "abort", "continue", "retry"
	Action string

	// RetryCount is the number of retry attempts (for retry action).
	RetryCount int

	// RetryDelay is the initial delay between retries.
	// Uses exponential backoff: delay * 2^attempt
	RetryDelay time.Duration

	// ContinueOnError indicates whether to continue workflow on error.
	ContinueOnError bool
}

ErrorHandler defines how to handle step failures.

type PendingElicitation

type PendingElicitation struct {
	// StepID is the elicitation step ID.
	StepID string

	// Message is the elicitation message.
	Message string

	// Schema is the requested data schema.
	Schema map[string]any

	// ExpiresAt is when the elicitation times out.
	ExpiresAt time.Time
}

PendingElicitation represents an elicitation awaiting user response.

type StepResult

type StepResult struct {
	// StepID identifies the step.
	StepID string

	// Status is the step status.
	Status StepStatusType

	// Output contains the step output data.
	Output map[string]any

	// Error contains error information if the step failed.
	Error error

	// StartTime is when the step started.
	StartTime time.Time

	// EndTime is when the step completed.
	EndTime time.Time

	// Duration is the step execution time.
	Duration time.Duration

	// RetryCount is the number of retries performed.
	RetryCount int
}

StepResult contains the result of a single workflow step.

type StepStatusType

type StepStatusType string

StepStatusType represents the state of a workflow step.

const (
	// StepStatusPending indicates the step is queued.
	StepStatusPending StepStatusType = "pending"

	// StepStatusRunning indicates the step is executing.
	StepStatusRunning StepStatusType = "running"

	// StepStatusCompleted indicates the step completed successfully.
	StepStatusCompleted StepStatusType = "completed"

	// StepStatusFailed indicates the step failed.
	StepStatusFailed StepStatusType = "failed"

	// StepStatusSkipped indicates the step was skipped (condition was false).
	StepStatusSkipped StepStatusType = "skipped"
)

type StepType

type StepType string

StepType defines the type of workflow step.

const (
	// StepTypeTool executes a backend tool.
	StepTypeTool StepType = "tool"

	// StepTypeElicitation requests user input via MCP elicitation protocol.
	StepTypeElicitation StepType = "elicitation"

	// StepTypeConditional executes based on a condition (future).
	StepTypeConditional StepType = "conditional"
)

type TemplateExpander

type TemplateExpander interface {
	// Expand evaluates templates in the given data using the workflow context.
	Expand(ctx context.Context, data map[string]any, workflowCtx *WorkflowContext) (map[string]any, error)

	// EvaluateCondition evaluates a condition template to a boolean.
	EvaluateCondition(ctx context.Context, condition string, workflowCtx *WorkflowContext) (bool, error)
}

TemplateExpander handles template expansion for workflow arguments.

func NewTemplateExpander added in v0.6.0

func NewTemplateExpander() TemplateExpander

NewTemplateExpander creates a new template expander.

type ValidationError added in v0.6.0

type ValidationError struct {
	// Field is the field that failed validation.
	Field string

	// Message is the error message.
	Message string

	// Cause is the underlying error.
	Cause error
}

ValidationError wraps workflow validation errors.

func NewValidationError added in v0.6.0

func NewValidationError(field, message string, cause error) *ValidationError

NewValidationError creates a new validation error.

func (*ValidationError) Error added in v0.6.0

func (e *ValidationError) Error() string

Error implements the error interface.

func (*ValidationError) Unwrap added in v0.6.0

func (e *ValidationError) Unwrap() error

Unwrap returns the underlying error.

type WorkflowContext

type WorkflowContext struct {
	// WorkflowID is the unique workflow execution ID.
	WorkflowID string

	// Params are the input parameters.
	Params map[string]any

	// Steps contains the results of completed steps.
	Steps map[string]*StepResult

	// Variables stores workflow-scoped variables.
	Variables map[string]any
}

WorkflowContext contains the execution context for a workflow.

func (*WorkflowContext) Clone added in v0.6.0

func (ctx *WorkflowContext) Clone() *WorkflowContext

Clone creates a shallow copy of the workflow context. Maps and step results are cloned, but nested values within maps are shared. This is useful for testing and validation.

func (*WorkflowContext) GetLastStepOutput added in v0.6.0

func (ctx *WorkflowContext) GetLastStepOutput() map[string]any

GetLastStepOutput retrieves the output of the most recently completed step. This is useful for getting the final workflow output.

func (*WorkflowContext) GetStepResult added in v0.6.0

func (ctx *WorkflowContext) GetStepResult(stepID string) (*StepResult, bool)

GetStepResult retrieves a step result by ID.

func (*WorkflowContext) HasStepCompleted added in v0.6.0

func (ctx *WorkflowContext) HasStepCompleted(stepID string) bool

HasStepCompleted checks if a step has completed successfully.

func (*WorkflowContext) HasStepFailed added in v0.6.0

func (ctx *WorkflowContext) HasStepFailed(stepID string) bool

HasStepFailed checks if a step has failed.

func (*WorkflowContext) RecordStepFailure added in v0.6.0

func (ctx *WorkflowContext) RecordStepFailure(stepID string, err error)

RecordStepFailure records a step failure.

func (*WorkflowContext) RecordStepSkipped added in v0.6.0

func (ctx *WorkflowContext) RecordStepSkipped(stepID string)

RecordStepSkipped records that a step was skipped (condition was false).

func (*WorkflowContext) RecordStepStart added in v0.6.0

func (ctx *WorkflowContext) RecordStepStart(stepID string)

RecordStepStart records that a step has started execution.

func (*WorkflowContext) RecordStepSuccess added in v0.6.0

func (ctx *WorkflowContext) RecordStepSuccess(stepID string, output map[string]any)

RecordStepSuccess records a successful step completion.

type WorkflowDefinition

type WorkflowDefinition struct {
	// Name is the workflow name (must be unique).
	Name string

	// Description describes what the workflow does.
	Description string

	// Parameters defines the input parameter schema (JSON Schema).
	Parameters map[string]any

	// Steps are the workflow steps to execute.
	Steps []WorkflowStep

	// Timeout is the maximum execution time for the workflow.
	// Default: 30 minutes.
	Timeout time.Duration

	// FailureMode defines how to handle step failures.
	// Options: "abort" (default), "continue", "best_effort"
	FailureMode string

	// Metadata stores additional workflow information.
	Metadata map[string]string
}

WorkflowDefinition defines a composite tool workflow.

type WorkflowError added in v0.6.0

type WorkflowError struct {
	// WorkflowID is the workflow execution ID.
	WorkflowID string

	// StepID is the step that caused the error (if applicable).
	StepID string

	// Message is the error message.
	Message string

	// Cause is the underlying error.
	Cause error
}

WorkflowError wraps workflow execution errors with context.

func NewWorkflowError added in v0.6.0

func NewWorkflowError(workflowID, stepID, message string, cause error) *WorkflowError

NewWorkflowError creates a new workflow error.

func (*WorkflowError) Error added in v0.6.0

func (e *WorkflowError) Error() string

Error implements the error interface.

func (*WorkflowError) Unwrap added in v0.6.0

func (e *WorkflowError) Unwrap() error

Unwrap returns the underlying error for errors.Is and errors.As.

type WorkflowResult

type WorkflowResult struct {
	// WorkflowID is the unique identifier for this execution.
	WorkflowID string

	// Status is the final workflow status.
	Status WorkflowStatusType

	// Output contains the workflow output data.
	// Typically the output of the last step.
	Output map[string]any

	// Steps contains the results of each step.
	Steps map[string]*StepResult

	// Error contains error information if the workflow failed.
	Error error

	// StartTime is when the workflow started.
	StartTime time.Time

	// EndTime is when the workflow completed.
	EndTime time.Time

	// Duration is the total execution time.
	Duration time.Duration

	// Metadata stores additional result information.
	Metadata map[string]string
}

WorkflowResult contains the output of a workflow execution.

type WorkflowStateStore

type WorkflowStateStore interface {
	// SaveState persists workflow state.
	SaveState(ctx context.Context, workflowID string, state *WorkflowStatus) error

	// LoadState retrieves workflow state.
	LoadState(ctx context.Context, workflowID string) (*WorkflowStatus, error)

	// DeleteState removes workflow state.
	DeleteState(ctx context.Context, workflowID string) error

	// ListActiveWorkflows returns all active workflow IDs.
	ListActiveWorkflows(ctx context.Context) ([]string, error)
}

WorkflowStateStore manages workflow execution state. This enables persistence and recovery of long-running workflows.

type WorkflowStatus

type WorkflowStatus struct {
	// WorkflowID identifies the workflow.
	WorkflowID string

	// Status is the current workflow status.
	Status WorkflowStatusType

	// CurrentStep is the currently executing step (if running).
	CurrentStep string

	// CompletedSteps are the steps that have completed.
	CompletedSteps []string

	// PendingElicitations are elicitations waiting for user response.
	PendingElicitations []*PendingElicitation

	// StartTime is when the workflow started.
	StartTime time.Time

	// LastUpdateTime is when the status was last updated.
	LastUpdateTime time.Time
}

WorkflowStatus represents the current state of a workflow execution.

type WorkflowStatusType

type WorkflowStatusType string

WorkflowStatusType represents the state of a workflow.

const (
	// WorkflowStatusPending indicates the workflow is queued.
	WorkflowStatusPending WorkflowStatusType = "pending"

	// WorkflowStatusRunning indicates the workflow is executing.
	WorkflowStatusRunning WorkflowStatusType = "running"

	// WorkflowStatusWaitingForElicitation indicates the workflow is waiting for user input.
	WorkflowStatusWaitingForElicitation WorkflowStatusType = "waiting_for_elicitation"

	// WorkflowStatusCompleted indicates the workflow completed successfully.
	WorkflowStatusCompleted WorkflowStatusType = "completed"

	// WorkflowStatusFailed indicates the workflow failed.
	WorkflowStatusFailed WorkflowStatusType = "failed"

	// WorkflowStatusCancelled indicates the workflow was cancelled.
	WorkflowStatusCancelled WorkflowStatusType = "cancelled"

	// WorkflowStatusTimedOut indicates the workflow timed out.
	WorkflowStatusTimedOut WorkflowStatusType = "timed_out"
)

type WorkflowStep

type WorkflowStep struct {
	// ID uniquely identifies this step within the workflow.
	ID string

	// Type is the step type: "tool", "elicitation", "conditional"
	Type StepType

	// Tool is the tool to call (for tool steps).
	// Format: "toolname" or "backend.toolname"
	Tool string

	// Arguments are the tool arguments with template expansion support.
	// Templates use Go text/template syntax with access to:
	//   - {{.params.name}}: Input parameters
	//   - {{.steps.stepid.output}}: Previous step outputs
	//   - {{.steps.stepid.content}}: Elicitation response data
	//   - {{.steps.stepid.action}}: Elicitation action (accept/decline/cancel)
	Arguments map[string]any

	// Condition is an optional condition for conditional execution.
	// If specified and evaluates to false, the step is skipped.
	// Uses template syntax, must evaluate to boolean.
	Condition string

	// DependsOn lists step IDs that must complete before this step.
	// Enables DAG-based parallel execution.
	DependsOn []string

	// OnError defines error handling for this step.
	OnError *ErrorHandler

	// Elicitation defines elicitation parameters (for elicitation steps).
	Elicitation *ElicitationConfig

	// Timeout is the maximum execution time for this step.
	Timeout time.Duration

	// Metadata stores additional step information.
	Metadata map[string]string
}

WorkflowStep represents a single step in a workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL