temporal

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Package temporal implements the loom-mcp workflow engine adapter backed by Temporal (https://temporal.io). It satisfies the generic engine.Engine interface, allowing generated code and the runtime to orchestrate durable workflows without importing the Temporal SDK directly.

Why Temporal?

Temporal provides durable execution for long-running agent workflows. When an agent makes multiple tool calls, awaits human input, or runs for extended periods, Temporal ensures the workflow state survives process restarts, network failures, and crashes. The runtime replays the workflow from event history, producing deterministic execution.

Constructing an Engine

Worker processes use NewWorker to create an engine with Temporal client and worker options:

eng, err := temporal.NewWorker(temporal.Options{
    ClientOptions: &client.Options{
        HostPort:  "temporal:7233",
        Namespace: "default",
        // Required: enforce loom-mcp's workflow boundary contract.
        // Tool results/artifacts cross boundaries as canonical JSON bytes (api.ToolEvent/api.ToolArtifact),
        // and planner.ToolResult is rejected if it ever tries to cross a Temporal boundary.
        // Pass the generated tool specs aggregate for the agent(s) hosted by this runtime.
        // Example: specs "<module>/gen/<service>/agents/<agent>/specs"
        // DataConverter: temporal.NewAgentDataConverter(specs.Spec),
    },
    WorkerOptions: temporal.WorkerOptions{
        TaskQueue:              "orchestrator.chat",
        MaxConcurrentActivities: 10,
    },
})
if err != nil {
    log.Fatal(err)
}
rt := runtime.New(runtime.WithEngine(eng))
// Register toolsets first, then agents.
if err := rt.Seal(context.Background()); err != nil {
    log.Fatal(err)
}
defer eng.Close()

Client-only processes use NewClient and do not register local workflows or activities:

eng, err := temporal.NewClient(temporal.Options{
    ClientOptions: &client.Options{
        HostPort:  "temporal:7233",
        Namespace: "default",
    },
})
if err != nil {
    log.Fatal(err)
}

Worker vs Client Mode

Worker mode polls task queues and executes workflows locally. Client mode submits workflows without local execution.

Registration sealing is part of the worker-mode contract: the runtime must seal registration only after all toolsets and agents have been registered so polling begins from a coherent local registry.

Workflow Determinism

Temporal workflows must be deterministic: given the same inputs and event history, they must produce the same outputs. This package provides a WorkflowContext that exposes only deterministic operations:

  • Now() returns workflow time (not wall clock)
  • PublishHook schedules hook publishing outside the workflow thread
  • ExecutePlannerActivity runs planner activities
  • ExecuteToolActivity/ExecuteToolActivityAsync run tool activities
  • PauseRequests/ResumeRequests/... return typed signal receivers
  • StartChildWorkflow starts nested workflows

Planners and tool executors run inside activities, which are not constrained by determinism. The workflow handler (generated code) coordinates activities and processes their results deterministically.

OpenTelemetry Integration

The engine emits traces using a "trace domains" contract:

  • Synchronous request handling (HTTP/gRPC) stays within a single trace tree.
  • Durable scheduling (Temporal) creates a new trace tree per activity execution and links it back to the initiating request trace via OTel links.

This avoids long-lived traces that fragment in collectors/sampling pipelines while preserving navigability across domains.

Query Handlers

Workflows can expose query handlers for external introspection. The runtime uses queries to retrieve run status and transcript state without blocking workflow execution.

Package temporal implements the engine.Engine adapter backed by Temporal. It registers workflows and activities, manages per-queue workers, starts executions, and exposes workflow handles for waiting, signaling, and cancellation. The adapter wires OpenTelemetry tracing/metrics and keeps Temporal-specific worker lifecycle inside this package.

Package temporal isolates workflow-handle and signal/cancel helpers so the engine's main file can focus on registration and workflow start semantics.

Package temporal keeps OpenTelemetry wiring separate from the engine's core registration/start path so the adapter can own one instrumentation contract without burying it in worker and workflow plumbing.

Package temporal keeps read-only workflow query helpers separate from the engine's write path. These methods translate Temporal-specific visibility and completion state into the smaller engine contract consumed by the runtime.

Package temporal keeps search-attribute typing at the adapter boundary. The shared runtime contract stays generic (`map[string]any`), and only this file decides how those values map onto Temporal visibility types.

Package temporal keeps worker lifecycle isolated from the engine's public API. Worker-mode engines stage queue handlers during registration and start polling only after the runtime seals registration.

Package temporal contains the loom-mcp Temporal engine adapter.

This file defines the Temporal-backed implementation of engine.WorkflowContext. The runtime uses it to: - execute typed planner/tool/hook activities with engine-owned defaults, - access deterministic time/timers and workflow cancellation, - receive external signals in a replay-safe way, - start child workflows by explicit name and queue.

Contract:

  • Activity option defaults are resolved by name and merged with per-call overrides.
  • Temporal cancellation errors are normalized to context.Canceled for runtime-wide classification that does not depend on Temporal types.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewAgentDataConverter

func NewAgentDataConverter(spec func(aitools.Ident) (*aitools.ToolSpec, bool)) converter.DataConverter

NewAgentDataConverter returns a Temporal data converter that enforces loom-mcp workflow boundary contracts.

Temporal's default JSON payload converter decodes `any` fields as JSON-shaped values (map[string]any, []any, float64, ...). loom-mcp forbids `any` across workflow/activity/signal boundaries: it must be represented as canonical JSON bytes (json.RawMessage) and decoded back into typed Go values using the tool's generated codecs at the execution boundary (activities), not inside workflow serialization.

This converter:

  • Provides stable encoding/decoding for loom-mcp API envelopes.
  • Fails fast if planner.ToolResult crosses a Temporal boundary (use api.ToolEvent instead).

spec is accepted for API compatibility; the current boundary-safe envelopes carry JSON bytes directly and do not require spec lookup during conversion.

func NewWorkflowContext

func NewWorkflowContext(e *Engine, ctx workflow.Context) engine.WorkflowContext

NewWorkflowContext adapts a Temporal workflow.Context into the engine.WorkflowContext used by the loom-mcp runtime.

This is intended for workflows that run in the same Temporal worker as the loom-mcp engine but are not started through it, and still need to call runtime helpers (for example ExecuteAgentChildWithRoute).

The returned context uses engine defaults (queue, timeouts, retry) when invoking typed planner/tool/hook activities.

Types

type ActivityDefaults

type ActivityDefaults struct {
	// Hook configures workflow hook publishing activities.
	Hook ActivityTimeoutDefaults
	// Planner configures PlanStart and PlanResume activities.
	Planner ActivityTimeoutDefaults
	// Tool configures ExecuteTool activities.
	Tool ActivityTimeoutDefaults
}

ActivityDefaults groups Temporal-specific defaults for each registered activity class. Planner defaults apply to both PlanStart and PlanResume activities because both represent planner attempts from the runtime's point of view.

type ActivityTimeoutDefaults

type ActivityTimeoutDefaults struct {
	// QueueWaitTimeout bounds how long a scheduled activity may wait for a worker
	// before Temporal times out the task.
	QueueWaitTimeout time.Duration
	// LivenessTimeout bounds the maximum gap between runtime-emitted heartbeats
	// before Temporal concludes the worker attempt is no longer healthy.
	LivenessTimeout time.Duration
}

ActivityTimeoutDefaults configures the Temporal-only mechanics that sit around a runtime-owned activity attempt budget.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine implements engine.Engine using Temporal as the durable execution backend. It manages workflow/activity registration, per-queue worker lifecycle, and workflow execution handles. Worker-mode engines stage registrations until the runtime seals registration, at which point polling begins with a fully configured runtime registry.

Thread-safety: All methods are safe for concurrent use. Internal state is protected by mutexes.

Lifecycle:

  • Construct via NewWorker() for worker processes or NewClient() for client-only processes.
  • Register workflows/activities only on worker engines.
  • Call engine.SealRegistration via runtime.Seal() once all local registrations are complete so polling begins from a coherent registry.
  • Call Close() to gracefully stop workers and release the Temporal client when owned here.

func NewClient

func NewClient(opts Options) (*Engine, error)

NewClient constructs a Temporal engine for client-only processes. Client-mode engines can start workflows, query status, and signal runs, but they reject workflow/activity registration because they do not own workers.

func NewWorker

func NewWorker(opts Options) (*Engine, error)

NewWorker constructs a Temporal engine for worker processes. Worker-mode engines accept workflow/activity registration and begin polling only after registration has been sealed.

func (*Engine) CancelByID

func (e *Engine) CancelByID(ctx context.Context, workflowID string) error

CancelByID requests cancellation of a workflow by its durable workflow ID.

func (*Engine) Close

func (e *Engine) Close() error

Close gracefully shuts down the Temporal client if the engine created it (via ClientOptions). If a pre-configured Client was provided to New(), Close does nothing, leaving client lifecycle management to the caller.

Returns nil (error signature maintained for interface compatibility).

Thread-safe: Safe to call concurrently, but typically called once during shutdown.

func (*Engine) QueryRunCompletion

func (e *Engine) QueryRunCompletion(ctx context.Context, workflowID string) (*api.RunOutput, error)

QueryRunCompletion returns the terminal output/error for a workflow by workflow identifier so restart-time repair can preserve the original failure.

func (*Engine) QueryRunStatus

func (e *Engine) QueryRunStatus(ctx context.Context, workflowID string) (engine.RunStatus, error)

QueryRunStatus returns the current lifecycle status for a workflow execution by querying Temporal. The workflowID parameter is the Temporal WorkflowID and this queries the latest run for that durable workflow.

func (*Engine) QueryWorkflow

func (e *Engine) QueryWorkflow(ctx context.Context, workflowID, queryType string, args ...any) (converter.EncodedValue, error)

QueryWorkflow exposes Temporal query execution through the engine's durable workflow identity. It exists only for read-only integrations, such as transcript rehydration, that need a narrow query capability instead of the full Temporal client surface.

func (*Engine) RegisterExecuteToolActivity

func (e *Engine) RegisterExecuteToolActivity(_ context.Context, name string, opts engine.ActivityOptions, fn func(context.Context, *api.ToolInput) (*api.ToolOutput, error)) error

RegisterExecuteToolActivity registers a typed execute_tool activity with the Temporal engine. This method binds a Go function that accepts *api.ToolInput and returns *api.ToolOutput to a logical activity name for use within agent workflows. The activity is registered on the specified task queue (opts.Queue), or falls back to the engine's default queue if unspecified. Registered activities are accessible from workflows via ExecuteActivity using the provided name. Returns an error if the activity name is empty or registration fails due to worker configuration.

Thread-safe: Safe to call concurrently with other Register* methods.

func (*Engine) RegisterHookActivity

func (e *Engine) RegisterHookActivity(_ context.Context, name string, opts engine.ActivityOptions, fn func(context.Context, *api.HookActivityInput) error) error

RegisterHookActivity registers a typed hook activity with the Temporal engine. Hook activities publish workflow-emitted hook events outside of deterministic workflow code. The activity accepts *api.HookActivityInput and returns an error.

func (*Engine) RegisterPlannerActivity

func (e *Engine) RegisterPlannerActivity(_ context.Context, name string, opts engine.ActivityOptions, fn func(context.Context, *api.PlanActivityInput) (*api.PlanActivityOutput, error)) error

RegisterPlannerActivity registers a typed planner activity (PlanStart/PlanResume) with the Temporal engine. It binds a Go function that accepts *api.PlanActivityInput and returns *api.PlanActivityOutput to a logical activity name for use in agent workflows. The activity is registered on the specified task queue (opts.Queue), falling back to the engine's default queue if unspecified. Registered activities can be invoked from workflows via ExecuteActivity using the provided name.

Returns an error if the activity name is empty or if registration fails due to worker configuration.

Thread-safe: Safe to call concurrently with other Register* methods.

func (*Engine) RegisterWorkflow

func (e *Engine) RegisterWorkflow(_ context.Context, def engine.WorkflowDefinition) error

RegisterWorkflow registers a workflow definition with the Temporal worker for the specified task queue. The workflow handler is wrapped to provide the engine's WorkflowContext abstraction and lifecycle management (context creation/cleanup).

The workflow's TaskQueue determines which worker handles executions. If empty, the engine's default queue is used. A worker for the queue is created if needed.

Returns an error if the workflow name is empty, already registered, or if worker creation fails. Registration must complete before calling StartWorkflow.

Thread-safe: Safe to call concurrently with other Register* methods.

func (*Engine) SealRegistration

func (e *Engine) SealRegistration(context.Context) error

SealRegistration closes the registration phase for worker-mode engines and starts polling for every queue registered so far. Client-mode engines do not stage registrations, so sealing is a no-op.

func (*Engine) SignalByID

func (e *Engine) SignalByID(ctx context.Context, workflowID, runID, name string, payload any) error

SignalByID sends a signal to a workflow by workflow ID and optional run ID.

func (*Engine) StartWorkflow

StartWorkflow launches a new workflow execution on Temporal using the specified workflow definition and input. It constructs Temporal-specific start options from the request (ID, queue, retry policy) and executes the workflow asynchronously.

The workflow's task queue is resolved in order: req.TaskQueue → def.TaskQueue → engine.defaultQueue. A base context is stored for activity execution correlation.

Returns a WorkflowHandle for waiting, signaling, or cancelling the execution. Returns an error if the workflow name is not registered, the ID conflicts with an existing workflow, or if Temporal client execution fails.

Thread-safe: Safe to call concurrently.

type InstrumentationOptions

type InstrumentationOptions struct {
	// DisableTracing skips installing the OTEL tracing interceptor on the client
	// and workers. When false (default), distributed traces are automatically emitted
	// for workflow/activity executions.
	DisableTracing bool

	// DisableMetrics skips installing the OTEL metrics handler on the client and
	// workers. When false (default), workflow/activity metrics (counts, latencies,
	// failures) are automatically emitted.
	DisableMetrics bool

	// TracerOptions is retained for source compatibility but is ignored by the
	// engine's trace-domain implementation. Traces are emitted by loom-mcp's own
	// activity interceptors (new-root spans + OTel links).
	TracerOptions temporalotel.TracerOptions

	// MetricsOptions customize the OTEL metrics handler (metric names, labels, etc.).
	// Only used when DisableMetrics is false. Refer to Temporal SDK OTEL docs.
	MetricsOptions temporalotel.MetricsHandlerOptions
}

InstrumentationOptions configures how the engine wires OpenTelemetry (OTEL) tracing and metrics into the Temporal client and workers. By default, both tracing and metrics are enabled automatically using OTEL interceptors provided by the Temporal SDK.

Set DisableTracing or DisableMetrics to opt out of automatic instrumentation. Use TracerOptions and MetricsOptions to customize the OTEL interceptor behavior (e.g., span attributes, metric namespaces, sampling). Refer to Temporal's OTEL contrib documentation for available customization options.

type Options

type Options struct {
	// Client is an optional pre-configured Temporal client. If nil, the adapter
	// creates a lazy client using ClientOptions, allowing automatic OTEL interceptor
	// installation. Provide a pre-configured client when you need custom interceptors
	// or connection pooling.
	Client client.Client

	// ClientOptions describe how to construct the Temporal client when Client is nil.
	// Required when Client is nil. Only connection-related fields (HostPort, Namespace,
	// etc.) need to be set; OTEL interceptors are configured automatically.
	ClientOptions *client.Options

	// WorkerOptions configures worker defaults for task queue, concurrency, and
	// identity. NewWorker requires TaskQueue to be set and creates one worker per
	// unique task queue. NewClient ignores this field.
	WorkerOptions WorkerOptions

	// ActivityDefaults configures Temporal-owned execution mechanics for each
	// activity class. These defaults apply only when the runtime registration did
	// not already specify the corresponding engine.ActivityOptions field, so
	// semantic attempt budgets remain owned by the runtime while queue-wait and
	// liveness stay adapter-specific.
	ActivityDefaults ActivityDefaults

	// Instrumentation toggles OTEL tracing and metrics for the Temporal client and workers.
	// Tracing and metrics are enabled by default. Set DisableTracing or DisableMetrics to
	// opt out. Customize interceptor behavior via TracerOptions and MetricsOptions.
	Instrumentation InstrumentationOptions

	// Logger emits workflow and worker logs. If nil, a noop logger is used (no output).
	// Provide a logger to observe workflow execution, worker health, and activity progress.
	Logger telemetry.Logger

	// Metrics records workflow-level metrics (execution counts, latencies, failures).
	// If nil, a noop metrics recorder is used. Provide an implementation to emit metrics
	// to your observability stack.
	Metrics telemetry.Metrics

	// Tracer creates workflow-level spans for distributed tracing. If nil, a noop tracer
	// is used. Provide an implementation to emit traces to your observability backend.
	Tracer telemetry.Tracer
}

Options configures the Temporal engine adapter. Either a pre-configured Client or ClientOptions must be provided. The adapter automatically wires OTEL instrumentation.

Use NewWorker when the process will register workflows/activities and poll task queues locally. Use NewClient when the process only needs Temporal client capabilities such as starting workflows, querying status, or signaling runs.

type WorkerOptions

type WorkerOptions struct {
	// TaskQueue is the default queue name used when workflow/activity definitions
	// omit a queue. Required - at least one default queue must be configured.
	TaskQueue string

	// Options are passed directly to Temporal's worker.New constructor for controlling
	// worker behavior: concurrency limits, worker identity, custom interceptors, etc.
	// Refer to Temporal SDK documentation for available options.
	Options worker.Options
}

WorkerOptions configures the shared worker settings applied to all task queues managed by the engine. When workflows or activities target different queues, the engine creates one worker per unique queue, each using these shared settings.

TaskQueue is required and defines the default queue used when workflow/activity definitions omit a queue specification. The Options field provides fine-grained control over worker behavior (concurrency, identity, interceptors) and is forwarded directly to Temporal's worker.New constructor.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL