engine

package
v1.0.0
Published: Mar 22, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package engine defines workflow engine abstractions for durable agent execution. It provides pluggable interfaces so generated code and the runtime can target Temporal, in-memory, or custom backends without modification.

Core Abstractions

The package defines several key interfaces:

  • Engine: Registers workflows and activities and starts workflow executions. The runtime calls Engine methods during agent registration and run submission.

  • WorkflowContext: Provides deterministic operations inside workflow handlers. Generated workflow code uses this to schedule activities, handle signals, and start child workflows. Implementations must ensure replay-safe behavior.

  • WorkflowHandle: Represents a running workflow. Callers use handles to wait for completion, send signals, or cancel execution.

  • Future[T]: Represents a pending activity result. Enables parallel execution by allowing workflows to launch multiple activities and collect results later, without reflection-based assignment.

  • Receiver[T]: Delivers typed signals to workflows in a deterministic way. Used for pause/resume, clarification answers, and external tool results.

Available Implementations

Two engine implementations ship with loom-mcp:

  • temporal: Production-grade durable execution backed by Temporal. Supports workflow replay, long-running execution, and distributed workers.

  • inmem: In-memory synchronous execution for development and testing. No durability, no workers, runs immediately in the caller's goroutine.

Determinism Requirements

Workflow handlers run in a deterministic environment where the same inputs and history must produce the same outputs. WorkflowContext enforces this by:

  • Providing Now() instead of time.Now() for workflow time
  • Requiring activities for all I/O operations
  • Using replay-safe signal channels

Activities (planner calls, tool execution) are NOT deterministic and can perform arbitrary I/O. The engine records activity inputs/outputs and replays them during workflow recovery.
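
The following minimal sketch illustrates why workflow code should read time through the workflow context rather than time.Now(). The clock interface, replayClock type, and deadlineFor helper are hypothetical names defined only for this example; only the Now() method name comes from WorkflowContext.

```go
package main

import (
	"fmt"
	"time"
)

// clock is a hypothetical one-method slice of WorkflowContext, defined
// here so the example compiles without the real package.
type clock interface {
	Now() time.Time
}

// replayClock returns a timestamp taken from recorded history, so every
// replay observes the same value.
type replayClock struct{ recorded time.Time }

func (c replayClock) Now() time.Time { return c.recorded }

// deadlineFor derives a deadline from workflow time. Because it never
// calls time.Now(), replaying it with the same history gives the same answer.
func deadlineFor(c clock, budget time.Duration) time.Time {
	return c.Now().Add(budget)
}

func main() {
	c := replayClock{recorded: time.Date(2026, 3, 22, 12, 0, 0, 0, time.UTC)}
	first := deadlineFor(c, 5*time.Minute)
	replayed := deadlineFor(c, 5*time.Minute)
	fmt.Println(first.Equal(replayed)) // true
}
```

Had deadlineFor called time.Now() directly, the two calls would return different deadlines, which is the kind of divergence a replaying engine cannot tolerate.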

Usage Pattern

// Create engine (Temporal for production)
eng, err := temporal.NewWorker(temporal.Options{...})
if err != nil {
	log.Fatal(err)
}
defer eng.Close()

// Create runtime with engine
rt := runtime.New(runtime.WithEngine(eng))

// Register agents (registers workflows/activities on engine)
chat.RegisterChatAgent(ctx, rt, chat.ChatAgentConfig{...})

// Start runs (submits workflows to engine)
client := chat.NewClient(rt)
out, err := client.Run(ctx, "session-1", messages)
if err != nil {
	log.Fatal(err)
}

Variables

var (
	// ErrWorkflowNotFound indicates that no workflow execution exists for the given identifier.
	ErrWorkflowNotFound = errors.New("workflow not found")
	// ErrWorkflowCompleted indicates that a workflow exists but no longer accepts signals.
	ErrWorkflowCompleted = errors.New("workflow completed")
)
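
Callers would typically match these sentinels with errors.Is so that wrapped errors are still recognized. The sketch below redeclares the two variables locally to stay self-contained; describeSignalError is a hypothetical helper, and whether a given engine wraps the sentinels is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented sentinels. Real code would
// reference engine.ErrWorkflowNotFound and engine.ErrWorkflowCompleted.
var (
	ErrWorkflowNotFound  = errors.New("workflow not found")
	ErrWorkflowCompleted = errors.New("workflow completed")
)

// describeSignalError is a hypothetical helper that maps a signaling
// error to a caller-facing outcome.
func describeSignalError(err error) string {
	switch {
	case err == nil:
		return "delivered"
	case errors.Is(err, ErrWorkflowNotFound):
		return "unknown run"
	case errors.Is(err, ErrWorkflowCompleted):
		return "run already finished"
	default:
		return "other failure"
	}
}

func main() {
	wrapped := fmt.Errorf("signal resume: %w", ErrWorkflowCompleted)
	fmt.Println(describeSignalError(wrapped)) // run already finished
}
```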

Functions

func ActivityHeartbeatTimeout

func ActivityHeartbeatTimeout(ctx context.Context) time.Duration

ActivityHeartbeatTimeout returns the effective heartbeat timeout carried on ctx. Zero means the current activity does not use heartbeat-based failure detection.

func IsActivityContext

func IsActivityContext(ctx context.Context) bool

IsActivityContext reports whether ctx is marked as originating from an activity invocation.

func RecordActivityHeartbeat

func RecordActivityHeartbeat(ctx context.Context, details ...any) bool

RecordActivityHeartbeat emits a heartbeat on ctx when an engine-specific recorder is attached. It returns false when ctx is not an activity context or when the engine does not require heartbeats.

func WithActivityContext

func WithActivityContext(ctx context.Context) context.Context

WithActivityContext returns a child context that is marked as an activity invocation context.

func WithActivityHeartbeatRecorder

func WithActivityHeartbeatRecorder(ctx context.Context, recorder ActivityHeartbeatRecorder) context.Context

WithActivityHeartbeatRecorder returns a child context carrying the given heartbeat recorder.

func WithActivityHeartbeatTimeout

func WithActivityHeartbeatTimeout(ctx context.Context, timeout time.Duration) context.Context

WithActivityHeartbeatTimeout returns a child context carrying the effective heartbeat timeout for the current activity attempt. Zero means the engine is not using heartbeat-based liveness detection for this activity.
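
As an illustration of how the heartbeat helpers fit together, the sketch below carries a recorder on a context and emits a heartbeat after each unit of work. withRecorder and recordHeartbeat are local stand-ins for WithActivityHeartbeatRecorder and RecordActivityHeartbeat, written with context values as an assumption about one possible implementation; processChunks, countingRecorder, and heartbeatsFor are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// recorder mirrors the documented ActivityHeartbeatRecorder interface.
type recorder interface {
	RecordHeartbeat(details ...any)
}

type recorderKey struct{}

// withRecorder and recordHeartbeat are local stand-ins for the package
// helpers. The real helpers may be implemented differently.
func withRecorder(ctx context.Context, r recorder) context.Context {
	return context.WithValue(ctx, recorderKey{}, r)
}

func recordHeartbeat(ctx context.Context, details ...any) bool {
	r, ok := ctx.Value(recorderKey{}).(recorder)
	if !ok {
		return false // no recorder attached, so there is nothing to emit
	}
	r.RecordHeartbeat(details...)
	return true
}

// countingRecorder is a test double that counts heartbeats.
type countingRecorder struct{ beats int }

func (c *countingRecorder) RecordHeartbeat(...any) { c.beats++ }

// processChunks is a hypothetical long-running activity body that
// reports progress after each unit of work.
func processChunks(ctx context.Context, chunks int) {
	for i := 0; i < chunks; i++ {
		// ... one unit of real work would go here ...
		recordHeartbeat(ctx, i)
	}
}

// heartbeatsFor runs processChunks with or without a recorder attached
// and returns the number of heartbeats the recorder saw.
func heartbeatsFor(chunks int, attach bool) int {
	ctx := context.Background()
	rec := &countingRecorder{}
	if attach {
		ctx = withRecorder(ctx, rec)
	}
	processChunks(ctx, chunks)
	return rec.beats
}

func main() {
	fmt.Println(heartbeatsFor(3, true))  // 3
	fmt.Println(heartbeatsFor(3, false)) // 0
}
```

Because the emit call degrades to a no-op when no recorder is attached, the same activity body can run unchanged on engines that do not use heartbeats.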

func WithWorkflowContext

func WithWorkflowContext(ctx context.Context, wf WorkflowContext) context.Context

WithWorkflowContext returns a child context that carries the provided WorkflowContext. Engine adapters should use this when invoking activity handlers so downstream code can retrieve the workflow context if needed.

Types

type ActivityHeartbeatRecorder

type ActivityHeartbeatRecorder interface {
	RecordHeartbeat(details ...any)
}

ActivityHeartbeatRecorder records liveness heartbeats for a running activity. Engine adapters attach implementations to activity contexts when the backend requires heartbeats to deliver cancellation or timeout updates promptly.

type ActivityOptions

type ActivityOptions struct {
	// Queue overrides the default activity queue. If empty, the activity inherits
	// the workflow's task queue.
	Queue string
	// RetryPolicy controls retry behavior for this activity. If zero-valued, the
	// engine uses its default retry policy.
	RetryPolicy RetryPolicy
	// ScheduleToStartTimeout bounds how long the activity may wait in the task
	// queue before a worker starts the attempt. Zero means leave queue-wait
	// unspecified here and let the engine adapter apply its own defaults.
	ScheduleToStartTimeout time.Duration
	// StartToCloseTimeout bounds one activity attempt once a worker has started
	// executing it. This is the primary "healthy attempt" budget for planner and
	// tool work. Zero means use the engine default.
	StartToCloseTimeout time.Duration
	// HeartbeatTimeout bounds the maximum gap between heartbeats emitted by the
	// running activity. Zero disables heartbeat-based liveness detection.
	HeartbeatTimeout time.Duration
}

ActivityOptions configures retry and timeouts for an activity.

type Canceler

type Canceler interface {
	// CancelByID requests cancellation of the workflow identified by
	// workflowID.
	CancelByID(ctx context.Context, workflowID string) error
}

Canceler provides workflow cancellation by workflow ID without requiring in-process workflow handles. Engines that support out-of-process cancellation (for example, Temporal) should implement this so callers can cancel runs across process restarts.
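
Since Canceler is an optional capability, a caller would usually discover it with a type assertion. The sketch below shows that pattern under stated assumptions: cancelRun, errCancelUnsupported, and fakeEngine are hypothetical, and the engine is typed as any only to keep the example self-contained.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Canceler mirrors the documented optional interface.
type Canceler interface {
	CancelByID(ctx context.Context, workflowID string) error
}

// errCancelUnsupported is a hypothetical error for engines without
// out-of-process cancellation.
var errCancelUnsupported = errors.New("engine does not support cancel by ID")

// cancelRun cancels a run by ID when the engine offers that capability.
// eng is typed as any here; real code would hold an engine.Engine.
func cancelRun(ctx context.Context, eng any, workflowID string) error {
	c, ok := eng.(Canceler)
	if !ok {
		return errCancelUnsupported
	}
	return c.CancelByID(ctx, workflowID)
}

// fakeEngine records the last canceled ID for demonstration.
type fakeEngine struct{ canceled string }

func (f *fakeEngine) CancelByID(_ context.Context, id string) error {
	f.canceled = id
	return nil
}

func main() {
	eng := &fakeEngine{}
	fmt.Println(cancelRun(context.Background(), eng, "run-42"), eng.canceled) // <nil> run-42
	fmt.Println(cancelRun(context.Background(), struct{}{}, "run-42"))        // engine does not support cancel by ID
}
```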

type ChildWorkflowHandle

type ChildWorkflowHandle interface {
	// Get waits for child completion and returns the typed result.
	Get(ctx context.Context) (*api.RunOutput, error)
	// IsReady returns true if the child workflow has completed (success or failure)
	// and Get() will not block.
	IsReady() bool
	// Cancel requests cancellation of the child workflow execution.
	Cancel(ctx context.Context) error
	// RunID returns the engine-assigned run identifier of the child.
	RunID() string
}

ChildWorkflowHandle allows a parent workflow to await or cancel a child workflow.

type ChildWorkflowRequest

type ChildWorkflowRequest struct {
	// ID is the child workflow identifier, unique within the engine scope.
	ID string
	// Workflow is the provider workflow name to execute.
	Workflow string
	// TaskQueue is the queue to schedule the child on.
	TaskQueue string
	// Input is the payload passed to the child workflow handler.
	Input *api.RunInput
	// RunTimeout bounds the total child workflow execution time.
	RunTimeout time.Duration
	// RetryPolicy controls retries of the child workflow start attempt.
	RetryPolicy RetryPolicy
}

ChildWorkflowRequest describes a child workflow to start from within an existing workflow execution.

type CompletionQuerier

type CompletionQuerier interface {
	// QueryRunCompletion returns the workflow output for successful runs or the
	// terminal error for failed/timed-out/canceled runs, addressed by
	// workflowID.
	QueryRunCompletion(ctx context.Context, workflowID string) (*api.RunOutput, error)
}

CompletionQuerier lets runtimes recover the same terminal output or error that WorkflowHandle.Wait would have returned, addressed by workflow identifier, for example after a process restart or when the starter has detached. Callers should query lifecycle status first (see Engine.QueryRunStatus) and invoke this only once the engine reports a terminal status, because implementations may otherwise block waiting for completion.
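
The recommended order of calls (status first, completion only when terminal) can be sketched as follows. RunOutput is a placeholder for *api.RunOutput, and recoverer, isTerminal, recoverOutput, errStillRunning, and fakeEngine are hypothetical names. Treating completed, failed, timed-out, and canceled as the terminal set is an assumption of this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type RunStatus string

const (
	RunStatusRunning   RunStatus = "running"
	RunStatusCompleted RunStatus = "completed"
	RunStatusTimedOut  RunStatus = "timed_out"
	RunStatusFailed    RunStatus = "failed"
	RunStatusCanceled  RunStatus = "canceled"
)

// RunOutput is a placeholder for *api.RunOutput.
type RunOutput struct{ Text string }

// recoverer is a hypothetical union of the two documented query methods.
type recoverer interface {
	QueryRunStatus(ctx context.Context, workflowID string) (RunStatus, error)
	QueryRunCompletion(ctx context.Context, workflowID string) (*RunOutput, error)
}

var errStillRunning = errors.New("run has not reached a terminal status")

// isTerminal reports whether s is one of the statuses this sketch
// assumes to be terminal.
func isTerminal(s RunStatus) bool {
	switch s {
	case RunStatusCompleted, RunStatusFailed, RunStatusTimedOut, RunStatusCanceled:
		return true
	}
	return false
}

// recoverOutput checks lifecycle first so QueryRunCompletion is never
// called on a run that could make it block.
func recoverOutput(ctx context.Context, r recoverer, id string) (*RunOutput, error) {
	st, err := r.QueryRunStatus(ctx, id)
	if err != nil {
		return nil, err
	}
	if !isTerminal(st) {
		return nil, errStillRunning
	}
	return r.QueryRunCompletion(ctx, id)
}

// fakeEngine answers both queries from fixed data.
type fakeEngine struct{ status RunStatus }

func (f fakeEngine) QueryRunStatus(context.Context, string) (RunStatus, error) {
	return f.status, nil
}

func (f fakeEngine) QueryRunCompletion(context.Context, string) (*RunOutput, error) {
	return &RunOutput{Text: "done"}, nil
}

func main() {
	_, err := recoverOutput(context.Background(), fakeEngine{status: RunStatusRunning}, "run-1")
	fmt.Println(err) // run has not reached a terminal status
	out, _ := recoverOutput(context.Background(), fakeEngine{status: RunStatusCompleted}, "run-1")
	fmt.Println(out.Text) // done
}
```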

type Engine

type Engine interface {
	// RegisterWorkflow registers a workflow definition with the engine.
	RegisterWorkflow(ctx context.Context, def WorkflowDefinition) error

	// RegisterHookActivity registers a typed activity that publishes workflow-emitted
	// hook events outside of the deterministic workflow thread. The activity accepts
	// *api.HookActivityInput and returns an error.
	RegisterHookActivity(ctx context.Context, name string, opts ActivityOptions, fn func(context.Context, *api.HookActivityInput) error) error

	// RegisterPlannerActivity registers a typed planner activity (PlanStart or
	// PlanResume) that accepts *api.PlanActivityInput and returns *api.PlanActivityOutput.
	RegisterPlannerActivity(ctx context.Context, name string, opts ActivityOptions, fn func(context.Context, *api.PlanActivityInput) (*api.PlanActivityOutput, error)) error

	// RegisterExecuteToolActivity registers a typed execute_tool activity that
	// accepts *api.ToolInput and returns *api.ToolOutput.
	RegisterExecuteToolActivity(ctx context.Context, name string, opts ActivityOptions, fn func(context.Context, *api.ToolInput) (*api.ToolOutput, error)) error

	// StartWorkflow initiates a new workflow execution and returns a handle for
	// interacting with it. The workflow ID in req must be unique for the engine
	// instance. Returns an error if the workflow name is not registered, the ID
	// conflicts with a running workflow, or if scheduling fails.
	StartWorkflow(ctx context.Context, req WorkflowStartRequest) (WorkflowHandle, error)

	// QueryRunStatus returns the current lifecycle status for the workflow
	// identified by workflowID. The engine is the source of truth for durable
	// workflow status. Returns an error if the workflow does not exist or if
	// querying fails.
	QueryRunStatus(ctx context.Context, workflowID string) (RunStatus, error)
}

Engine abstracts workflow registration and execution so adapters (Temporal, in-memory, or custom) can be swapped without touching generated code. Implementations translate these generic types into backend-specific primitives.

type Future

type Future[T any] interface {
	// Get blocks until the activity completes and returns the typed result.
	// Calling Get multiple times on the same Future returns the same value/error.
	Get(ctx context.Context) (T, error)

	// IsReady returns true if the activity has completed (success or failure) and Get()
	// will not block. This allows workflows to poll or implement custom waiting strategies.
	IsReady() bool
}

Future represents a pending activity result that will become available after the activity completes. Futures enable parallel activity execution: workflows can launch multiple tool activities and collect results later using Get(), which blocks until the activity finishes.

Thread-safety: Futures are bound to a single workflow execution and must not be shared across workflow executions. Calling Get() multiple times is safe and returns the same result/error on each call.

Lifecycle: Valid from creation until the workflow completes. Get() must be called before the workflow exits; abandoned futures leak workflow resources in some engines. IsReady() enables polling without blocking.
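
To make the launch-then-collect pattern concrete, here is a minimal in-memory sketch. chanFuture, start, and fanOut are hypothetical; the goroutine-backed Future is only a stand-in for what an engine would return from ExecuteToolActivityAsync and is not replay-safe, so it should not be read as how a durable engine implements futures.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Future mirrors the documented interface.
type Future[T any] interface {
	Get(ctx context.Context) (T, error)
	IsReady() bool
}

// chanFuture is a hypothetical in-memory Future backed by a goroutine.
type chanFuture[T any] struct {
	done chan struct{}
	val  T
	err  error
}

// start runs fn in a goroutine and returns a Future for its result.
func start[T any](fn func() (T, error)) Future[T] {
	f := &chanFuture[T]{done: make(chan struct{})}
	go func() {
		f.val, f.err = fn()
		close(f.done) // closing done publishes val and err to readers
	}()
	return f
}

func (f *chanFuture[T]) Get(ctx context.Context) (T, error) {
	select {
	case <-f.done:
		return f.val, f.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func (f *chanFuture[T]) IsReady() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

// fanOut launches one simulated tool per name, then collects the results
// in launch order. Every future is drained before returning on success.
func fanOut(names []string) ([]string, error) {
	ctx := context.Background() // real workflow code would use WorkflowContext.Context()
	futures := make([]Future[string], 0, len(names))
	for _, n := range names {
		n := n // per-iteration copy (required before Go 1.22)
		futures = append(futures, start(func() (string, error) {
			return strings.ToUpper(n), nil
		}))
	}
	results := make([]string, 0, len(names))
	for _, f := range futures {
		v, err := f.Get(ctx)
		if err != nil {
			return nil, err
		}
		results = append(results, v)
	}
	return results, nil
}

func main() {
	out, err := fanOut([]string{"search", "fetch"})
	fmt.Println(out, err) // [SEARCH FETCH] <nil>
}
```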

type HookActivityCall

type HookActivityCall struct {
	// Name identifies the registered hook activity.
	Name string

	// Input is the typed payload passed to the activity handler.
	Input *api.HookActivityInput

	// Options overrides the registered activity defaults for this invocation.
	Options ActivityOptions
}

HookActivityCall describes a single invocation of the runtime hook publishing activity from inside workflow code.

type PlannerActivityCall

type PlannerActivityCall struct {
	// Name identifies the registered planner activity.
	Name string

	// Input is the typed payload passed to the activity handler.
	Input *api.PlanActivityInput

	// Options overrides the registered activity defaults for this invocation.
	Options ActivityOptions
}

PlannerActivityCall describes a single invocation of a PlanStart/PlanResume activity from inside workflow code.

type Receiver

type Receiver[T any] interface {
	// Receive blocks until a signal value is delivered and returns it.
	// Implementations should respect ctx when possible; for engines that do not
	// support context cancellation, Receive may ignore ctx and rely on workflow
	// cancellation semantics instead.
	Receive(ctx context.Context) (T, error)

	// ReceiveWithTimeout blocks until a signal value is delivered or the timeout
	// elapses. If the timeout elapses first, it returns an error. A non-positive
	// timeout must return context.DeadlineExceeded immediately.
	//
	// This method is required so workflow code can enforce global run deadlines
	// while awaiting external signals (pause/resume, confirmations, clarifications,
	// external tool results) without relying on engine-level timeouts.
	ReceiveWithTimeout(ctx context.Context, timeout time.Duration) (T, error)

	// ReceiveAsync attempts to receive a signal without blocking.
	ReceiveAsync() (T, bool)
}

Receiver exposes typed workflow signal delivery in an engine-agnostic way. Implementations wrap engine-specific channels (Temporal signal channels, in-process Go channels, etc.) and provide blocking and non-blocking receive helpers so workflow code can react to external events deterministically.
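
A minimal sketch of an in-process implementation may help clarify the three receive modes, in particular the non-positive timeout rule. chanReceiver is a hypothetical type backed by a plain Go channel and wall-clock timeouts, so it suits tests only; a durable engine would use its own replay-safe channels and timers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// chanReceiver is a hypothetical in-process Receiver backed by a Go channel.
type chanReceiver[T any] struct{ ch chan T }

// Receive blocks until a value arrives or ctx is done.
func (r chanReceiver[T]) Receive(ctx context.Context) (T, error) {
	select {
	case v := <-r.ch:
		return v, nil
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

// ReceiveWithTimeout bounds Receive with a wall-clock timeout.
func (r chanReceiver[T]) ReceiveWithTimeout(ctx context.Context, timeout time.Duration) (T, error) {
	if timeout <= 0 {
		var zero T
		return zero, context.DeadlineExceeded // documented contract
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return r.Receive(ctx)
}

// ReceiveAsync returns immediately, reporting whether a value was queued.
func (r chanReceiver[T]) ReceiveAsync() (T, bool) {
	select {
	case v := <-r.ch:
		return v, true
	default:
		var zero T
		return zero, false
	}
}

func main() {
	r := chanReceiver[string]{ch: make(chan string, 1)}
	_, ok := r.ReceiveAsync()
	fmt.Println(ok) // false

	r.ch <- "resume"
	v, err := r.ReceiveWithTimeout(context.Background(), time.Second)
	fmt.Println(v, err) // resume <nil>

	_, err = r.ReceiveWithTimeout(context.Background(), 0)
	fmt.Println(err == context.DeadlineExceeded) // true
}
```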

type RegistrationSealer

type RegistrationSealer interface {
	// SealRegistration closes the registration phase and makes staged handlers
	// visible to execution. Implementations must be idempotent.
	SealRegistration(ctx context.Context) error
}

RegistrationSealer allows an engine to stage workflow/activity registrations until the runtime has finished building its full local registry. Worker-capable engines use this to avoid polling with partially registered handlers.
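
One way to satisfy the idempotency requirement is to guard the publish step with sync.Once. The sketch below assumes a hypothetical stagingRegistry that tracks handler names only; its Register and Active methods are illustrative and not part of this package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// stagingRegistry is a hypothetical engine fragment that stages handler
// names until SealRegistration publishes them.
type stagingRegistry struct {
	mu     sync.Mutex
	staged []string
	active map[string]bool
	once   sync.Once
}

// Register stages a handler name; it is not visible until sealing.
func (r *stagingRegistry) Register(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.staged = append(r.staged, name)
}

// SealRegistration publishes staged handlers exactly once. Later calls
// are no-ops, which makes the method idempotent.
func (r *stagingRegistry) SealRegistration(context.Context) error {
	r.once.Do(func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.active = make(map[string]bool, len(r.staged))
		for _, n := range r.staged {
			r.active[n] = true
		}
	})
	return nil
}

// Active reports whether a handler is visible to execution.
func (r *stagingRegistry) Active(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.active[name]
}

func main() {
	reg := &stagingRegistry{}
	reg.Register("AgentWorkflow")
	fmt.Println(reg.Active("AgentWorkflow")) // false

	_ = reg.SealRegistration(context.Background())
	_ = reg.SealRegistration(context.Background()) // second call changes nothing
	fmt.Println(reg.Active("AgentWorkflow"))       // true
}
```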

type RetryPolicy

type RetryPolicy struct {
	// MaxAttempts caps the total number of retry attempts. Zero means unlimited retries.
	MaxAttempts int
	// InitialInterval is the delay before the first retry. Zero means use engine default.
	InitialInterval time.Duration
	// BackoffCoefficient multiplies the delay after each retry. Values < 1 are treated
	// as 1 (constant backoff). A value of 2 provides exponential backoff.
	BackoffCoefficient float64
}

RetryPolicy defines retry semantics shared by workflows and activities. Zero-valued fields mean the engine uses its defaults.
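
As a worked example of the backoff arithmetic, the sketch below assumes the delay before the n-th retry is InitialInterval multiplied by BackoffCoefficient raised to n-1. That formula and the delayBeforeRetry helper are assumptions for illustration; an engine may also cap the interval or substitute its own default when InitialInterval is zero, which this sketch does not model.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy mirrors the documented struct.
type RetryPolicy struct {
	MaxAttempts        int
	InitialInterval    time.Duration
	BackoffCoefficient float64
}

// delayBeforeRetry returns the wait before the n-th retry (n starts at 1),
// assuming delay = InitialInterval * coefficient^(n-1).
func delayBeforeRetry(p RetryPolicy, n int) time.Duration {
	coeff := p.BackoffCoefficient
	if coeff < 1 {
		coeff = 1 // documented: values < 1 behave as constant backoff
	}
	return time.Duration(float64(p.InitialInterval) * math.Pow(coeff, float64(n-1)))
}

func main() {
	p := RetryPolicy{MaxAttempts: 4, InitialInterval: time.Second, BackoffCoefficient: 2}
	for n := 1; n <= 3; n++ {
		fmt.Println(n, delayBeforeRetry(p, n))
	}
	// Prints:
	// 1 1s
	// 2 2s
	// 3 4s
}
```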

type RunStatus

type RunStatus string

RunStatus represents the lifecycle state of a workflow execution.

const (
	// RunStatusPending indicates the workflow has been accepted but not started yet.
	RunStatusPending RunStatus = "pending"
	// RunStatusRunning indicates the workflow is actively executing.
	RunStatusRunning RunStatus = "running"
	// RunStatusCompleted indicates the workflow finished successfully.
	RunStatusCompleted RunStatus = "completed"
	// RunStatusTimedOut indicates the workflow exceeded its run deadline.
	RunStatusTimedOut RunStatus = "timed_out"
	// RunStatusFailed indicates the workflow failed permanently.
	RunStatusFailed RunStatus = "failed"
	// RunStatusCanceled indicates the workflow was canceled externally.
	RunStatusCanceled RunStatus = "canceled"
	// RunStatusPaused indicates execution is paused awaiting external intervention.
	RunStatusPaused RunStatus = "paused"
)

type Signaler

type Signaler interface {
	// SignalByID sends a signal to the given workflow identified by workflowID
	// and optional runID. The payload is engine-specific and must be
	// serializable by the engine client.
	SignalByID(ctx context.Context, workflowID, runID, name string, payload any) error
}

Signaler provides direct signaling by workflow ID/run ID without relying on in-process workflow handles. Engines that support out-of-process signaling (e.g., Temporal) should implement this interface so the runtime can deliver Provide*/Pause/Resume signals across process restarts.

type ToolActivityCall

type ToolActivityCall struct {
	// Name identifies the registered execute_tool activity.
	Name string

	// Input is the typed payload passed to the activity handler.
	Input *api.ToolInput

	// Options overrides the registered activity defaults for this invocation.
	Options ActivityOptions
}

ToolActivityCall describes a single invocation of a tool execution activity from inside workflow code.

type WorkflowContext

type WorkflowContext interface {
	// Context returns the Go context for the workflow. In deterministic engines
	// (like Temporal), this is a special replay-aware context. Use this for activity
	// execution and cancellation propagation.
	Context() context.Context
	// SetQueryHandler registers a read-only query handler that can be invoked by
	// external clients to retrieve workflow state. Handlers must be deterministic
	// and side-effect free. Engines that do not support queries may implement
	// this as a no-op.
	SetQueryHandler(name string, handler any) error

	// WorkflowID returns the unique identifier for this workflow execution.
	WorkflowID() string

	// RunID returns the engine-assigned run identifier, used for observability
	// and run-level correlation.
	RunID() string

	// PublishHook schedules the runtime hook activity and waits for completion.
	// Implementations must run hook publishing outside of the deterministic workflow
	// thread (e.g., via activities in Temporal) so subscribers can perform I/O.
	PublishHook(ctx context.Context, call HookActivityCall) error

	// ExecutePlannerActivity schedules a planner activity (PlanStart/PlanResume)
	// and blocks until it completes. Planner activities are executed outside the
	// deterministic workflow thread and may perform I/O.
	ExecutePlannerActivity(ctx context.Context, call PlannerActivityCall) (*api.PlanActivityOutput, error)

	// ExecuteToolActivity schedules a tool execution activity and blocks until it
	// completes. This is useful for sequential execution (finalizers, single tools).
	ExecuteToolActivity(ctx context.Context, call ToolActivityCall) (*api.ToolOutput, error)

	// ExecuteToolActivityAsync schedules a tool execution activity and returns a Future
	// so workflows can run multiple tools concurrently and collect results later.
	ExecuteToolActivityAsync(ctx context.Context, call ToolActivityCall) (Future[*api.ToolOutput], error)

	// PauseRequests returns a typed receiver for pause signals.
	PauseRequests() Receiver[*api.PauseRequest]

	// ResumeRequests returns a typed receiver for resume signals.
	ResumeRequests() Receiver[*api.ResumeRequest]

	// ClarificationAnswers returns a typed receiver for clarification answers.
	ClarificationAnswers() Receiver[*api.ClarificationAnswer]

	// ExternalToolResults returns a typed receiver for external tool results.
	ExternalToolResults() Receiver[*api.ToolResultsSet]

	// ConfirmationDecisions returns a typed receiver for tool confirmation decisions.
	ConfirmationDecisions() Receiver[*api.ConfirmationDecision]

	// Now returns the current workflow time in a deterministic manner. Implementations
	// must return a time source that is replay-safe (e.g., Temporal's workflow.Now).
	Now() time.Time

	// NewTimer returns a Future that becomes ready after the given duration elapses
	// in workflow time. This is the engine-agnostic primitive for waking up on time
	// without polling.
	//
	// Implementations must schedule a deterministic timer (e.g., Temporal's
	// workflow.NewTimer). A non-positive duration should produce a Future that is
	// already ready.
	NewTimer(ctx context.Context, d time.Duration) (Future[time.Time], error)

	// Await blocks until condition returns true, or ctx is done.
	//
	// Condition must be deterministic and side-effect free. A typical use is to
	// wait on a set of Futures using IsReady() without draining them in a fixed
	// order (e.g., "wait until any tool future completes").
	Await(ctx context.Context, condition func() bool) error

	// StartChildWorkflow starts a child workflow execution and returns a handle
	// to await its completion or cancel it. Implementations should honor the
	// provided workflow name, task queue and timeouts without requiring local
	// registration lookups in the parent process.
	StartChildWorkflow(ctx context.Context, req ChildWorkflowRequest) (ChildWorkflowHandle, error)

	// Detached returns a derived WorkflowContext whose cancellation is disconnected
	// from the parent workflow scope.
	//
	// This is intended for cleanup/terminal work (e.g., emitting RunCompleted
	// hooks) that should still be attempted even when the main workflow context is
	// canceled.
	Detached() WorkflowContext

	// WithCancel returns a derived WorkflowContext whose cancellation can be
	// triggered independently of the parent workflow scope. This is used to
	// cooperatively cancel in-flight activities/child workflows when the runtime
	// needs to finalize (e.g., time budget reached).
	//
	// In deterministic engines, this must map to a workflow-level cancel scope
	// (e.g., Temporal's workflow.WithCancel).
	WithCancel() (WorkflowContext, func())
}

WorkflowContext exposes engine operations to workflow handlers within the deterministic execution environment of a workflow. It wraps engine-specific contexts (Temporal workflow.Context, in-memory contexts, etc.) and provides a uniform API for activity execution, signal handling, and observability.

Implementations must ensure deterministic replay: operations that interact with the workflow engine (planner/tool activities and signal receivers) must produce deterministic results when replayed. Direct I/O, random number generation, or system time access within workflows violates determinism and causes workflow failures.

Thread-safety: WorkflowContext is bound to a single workflow execution and must not be shared across goroutines. Activity and signal operations are serialized by the workflow engine.

Lifecycle: Created by the engine when a workflow starts and remains valid until the workflow completes or fails. Do not cache WorkflowContext outside the workflow function scope.
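
The "wait until any tool future completes" use of Await can be sketched as below. readiness and awaiter isolate the IsReady and Await methods so the example compiles on its own; anyReady, firstReady, stubFuture, and stepAwaiter are hypothetical. stepAwaiter merely simulates an engine making progress and ignores ctx, which a real implementation would not do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// readiness is the IsReady half of the documented Future interface.
type readiness interface{ IsReady() bool }

// awaiter isolates the Await method of WorkflowContext for this example.
type awaiter interface {
	Await(ctx context.Context, condition func() bool) error
}

// anyReady builds a condition that only reads state, so it is
// deterministic and side-effect free.
func anyReady(fs []readiness) func() bool {
	return func() bool {
		for _, f := range fs {
			if f.IsReady() {
				return true
			}
		}
		return false
	}
}

// firstReady blocks until at least one future is ready and returns the
// index of the first ready one in slice order.
func firstReady(ctx context.Context, wf awaiter, fs []readiness) (int, error) {
	if err := wf.Await(ctx, anyReady(fs)); err != nil {
		return -1, err
	}
	for i, f := range fs {
		if f.IsReady() {
			return i, nil
		}
	}
	return -1, errors.New("await returned but no future is ready")
}

// stubFuture is a test double whose readiness is set by the simulation.
type stubFuture struct{ ready bool }

func (f *stubFuture) IsReady() bool { return f.ready }

// stepAwaiter simulates an engine: while the condition is false it
// completes the next pending future, then re-checks the condition.
type stepAwaiter struct{ pending []*stubFuture }

func (s *stepAwaiter) Await(_ context.Context, cond func() bool) error {
	for !cond() {
		if len(s.pending) == 0 {
			return errors.New("nothing left to complete")
		}
		s.pending[0].ready = true
		s.pending = s.pending[1:]
	}
	return nil
}

func main() {
	a, b := &stubFuture{}, &stubFuture{}
	wf := &stepAwaiter{pending: []*stubFuture{b}} // the second tool finishes first
	idx, err := firstReady(context.Background(), wf, []readiness{a, b})
	fmt.Println(idx, err) // 1 <nil>
}
```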

func WorkflowContextFromContext

func WorkflowContextFromContext(ctx context.Context) WorkflowContext

WorkflowContextFromContext extracts a WorkflowContext from ctx if present. Returns nil if the context does not carry a workflow context. Engine adapters are responsible for attaching the workflow context via WithWorkflowContext.

type WorkflowDefinition

type WorkflowDefinition struct {
	// Name is the logical identifier registered with the engine (e.g., "AgentWorkflow").
	Name string
	// TaskQueue is the default queue used when starting new workflows. Workers
	// subscribe to this queue to receive workflow tasks.
	TaskQueue string
	// Handler is the workflow function invoked by the engine when the workflow executes.
	Handler WorkflowFunc
}

WorkflowDefinition binds a workflow handler to a logical name and default queue.

type WorkflowFunc

type WorkflowFunc func(ctx WorkflowContext, input *api.RunInput) (*api.RunOutput, error)

WorkflowFunc is the generated workflow entry point. It receives a WorkflowContext and a typed RunInput, returning a typed RunOutput. Implementations must be deterministic with respect to activity results.

type WorkflowHandle

type WorkflowHandle interface {
	// Wait blocks until the workflow completes and returns the typed result.
	// Returns an error if the workflow fails or is canceled.
	Wait(ctx context.Context) (*api.RunOutput, error)

	// Signal sends an asynchronous message to the workflow. The workflow can listen
	// for signals using engine-specific APIs. Returns an error if the signal cannot
	// be delivered (e.g., workflow already completed).
	Signal(ctx context.Context, name string, payload any) error

	// Cancel requests cancellation of the workflow. The workflow's context will be
	// canceled, and in-flight activities may be canceled depending on the engine.
	// Returns an error if cancellation fails.
	Cancel(ctx context.Context) error
}

WorkflowHandle allows callers to interact with a running workflow. Returned by Engine.StartWorkflow, it provides methods to wait for completion, send signals, or cancel execution.

type WorkflowStartRequest

type WorkflowStartRequest struct {
	// ID is the workflow identifier, which must be unique within the engine scope.
	// Typically derived from the agent ID and a UUID.
	ID string
	// Workflow names the registered workflow definition to execute. Engines that
	// support multiple workflows (one per agent) require this field.
	Workflow string
	// TaskQueue selects the queue to schedule the workflow on. Workers listening
	// on this queue will pick up the workflow.
	TaskQueue string
	// Input is the typed payload passed to the workflow handler.
	Input *api.RunInput
	// RunTimeout bounds the total workflow execution time at the engine level.
	// Zero means use the engine default (if any). Engines may map this to their
	// native execution timeout/TTL (Temporal: WorkflowRunTimeout/ExecutionTimeout).
	RunTimeout time.Duration
	// Memo stores small diagnostic payloads alongside the workflow execution.
	// Engines like Temporal persist these for queries/visibility. Nil means no memo.
	Memo map[string]any
	// SearchAttributes captures indexed metadata used for visibility queries.
	// Nil means no attributes are set.
	SearchAttributes map[string]any
	// RetryPolicy controls automatic restarts of the workflow start attempt if
	// scheduling fails. Not to be confused with activity retries.
	RetryPolicy RetryPolicy
}

WorkflowStartRequest describes how to launch a workflow execution. Generated code constructs these when agents are invoked.

Directories

Path Synopsis
inmem: Package inmem provides an in-memory workflow engine implementation for tests and local development.
temporal: Package temporal implements the loom-mcp workflow engine adapter backed by Temporal (https://temporal.io).