temporal

package
v0.1.7 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package temporal contains helpers for integrating this SDK with go.temporal.io (client, worker, etc.). This file adapts pkg/logger.Logger to go.temporal.io/sdk/log.Logger for client.Options. It is internal to this module; callers who build their own Temporal client should wire sdk/log.Logger themselves (or copy this small bridge).

If a file imports both this package and go.temporal.io/sdk/temporal, use an import alias on one of them (e.g. sdktrace "go.temporal.io/sdk/temporal"). The import paths differ, but both package names default to "temporal".

Index

Constants

This section is empty.

Variables

var ErrAgentAlreadyRunning = errors.New("agent already has an active run")

ErrAgentAlreadyRunning is returned when Execute, ExecuteStream, or RunAsync is called while a run is already in progress.

var ErrAgentFingerprintMismatch = errors.New("temporal: agent fingerprint mismatch (caller vs worker); redeploy worker or align agent config")

ErrAgentFingerprintMismatch is returned when workflow input fingerprint does not match the worker.

Functions

func ComputeAgentFingerprint

func ComputeAgentFingerprint(m AgentFingerprintPayload) string

ComputeAgentFingerprint returns a stable SHA-256 hex digest of the payload (identity, prompts, tools, sampling, limits, policy, optional MCP wiring). Use the same toolPolicyFingerprint and MCP fingerprint inputs from pkg/agent on both the process that issues runs and the worker process.
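
One plausible way to obtain a stable digest is to hash the JSON encoding of the payload. The sketch below assumes that approach; the trimmed payload type and the fingerprint helper are hypothetical, and the real function may canonicalize differently.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// payload is a trimmed, hypothetical stand-in for AgentFingerprintPayload.
type payload struct {
	Version      int      `json:"v"`
	Name         string   `json:"name"`
	SystemPrompt string   `json:"system_prompt"`
	ToolNames    []string `json:"tool_names"`
	PolicyFP     string   `json:"policy_fp"`
}

// fingerprint hashes the JSON encoding of p and returns the hex digest.
// encoding/json writes struct fields in declaration order, so equal
// values always produce equal bytes.
func fingerprint(p payload) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	caller := payload{Version: 1, Name: "support", ToolNames: []string{"lookup", "search"}}
	worker := caller
	worker.SystemPrompt = "changed on the worker"

	a, _ := fingerprint(caller)
	b, _ := fingerprint(worker)
	fmt.Println(len(a), a == b) // 64 false
}
```

Any field that differs between the two processes changes the digest, which is why both sides need identical inputs.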

func NewLogAdapter

func NewLogAdapter(l logger.Logger) tlog.Logger

NewLogAdapter returns a Temporal sdk/log.Logger that delegates to the SDK logger.Logger. Calls use context.Background() because Temporal's logger API is not context-aware.
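
The adapter idea can be sketched with local interfaces. Here ctxLogger is a hypothetical stand-in for the SDK's logger.Logger (its real method set is not shown on this page), and plainLogger mirrors only the Info method of Temporal's context-free logger.

```go
package main

import (
	"context"
	"fmt"
)

// ctxLogger is a hypothetical context-aware logger.
type ctxLogger interface {
	Info(ctx context.Context, msg string, keyvals ...any)
}

// plainLogger mirrors one method of a logger that takes no context.
type plainLogger interface {
	Info(msg string, keyvals ...any)
}

// adapter satisfies plainLogger by delegating to a ctxLogger.
type adapter struct{ inner ctxLogger }

func (a adapter) Info(msg string, keyvals ...any) {
	// No caller context is available, so a background context is used.
	a.inner.Info(context.Background(), msg, keyvals...)
}

// recorder is a test double that remembers formatted messages.
type recorder struct{ lines []string }

func (r *recorder) Info(_ context.Context, msg string, keyvals ...any) {
	r.lines = append(r.lines, fmt.Sprintf("%s %v", msg, keyvals))
}

func main() {
	rec := &recorder{}
	var l plainLogger = adapter{inner: rec}
	l.Info("worker started", "queue", "agents")
	fmt.Println(rec.lines[0]) // worker started [queue agents]
}
```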

func ToolNamesFromTools

func ToolNamesFromTools(tools []interfaces.Tool) []string

ToolNamesFromTools returns sorted tool names for fingerprinting.

Types

type AddConversationMessagesInput

type AddConversationMessagesInput struct {
	ConversationID   string               `json:"conversation_id,omitempty"`
	Messages         []interfaces.Message `json:"messages,omitempty"`
	AgentFingerprint string               `json:"agent_fingerprint,omitempty"`
}

AddConversationMessagesInput is the input to AddConversationMessagesActivity.

type AgentEventUpdate

type AgentEventUpdate struct {
	AgentName        string          `json:"agent_name"`
	LocalChannelName string          `json:"local_channel_name,omitempty"`
	EventJSON        json.RawMessage `json:"event_json"`
}

AgentEventUpdate is the payload for agent-event updates when using one event workflow per agent. AgentName is the name of the agent that emitted the event (main agent or a sub-agent). LocalChannelName is the in-process pub/sub channel name (agent_event_<main-workflow-id>) all nodes in the delegation tree publish to.

type AgentFingerprintPayload

type AgentFingerprintPayload struct {
	Version int `json:"v"`

	Name           string `json:"name"`
	Description    string `json:"description"`
	SystemPrompt   string `json:"system_prompt"`
	ResponseFormat *struct {
		Type   string         `json:"type"`
		Name   string         `json:"name,omitempty"`
		Schema map[string]any `json:"schema,omitempty"`
	} `json:"response_format,omitempty"`

	ToolNames []string `json:"tool_names"`

	// PolicyFingerprint is an opaque string from pkg/agent (same on caller and worker Temporal config).
	PolicyFingerprint string `json:"policy_fp"`

	// MCPFingerprint is the pkg/agent MCP wiring digest over transports (no secrets), timeouts, filters,
	// and extra MCP client names. Tool names already appear in ToolNames; this catches same tools
	// pointing at a different endpoint or policy. Omitted when empty.
	MCPFingerprint string `json:"mcp_fingerprint,omitempty"`

	// AgentMode is the execution mode (e.g. interactive vs autonomous); must match pkg/agent WithAgentMode on caller and worker.
	AgentMode string `json:"agent_mode"`

	// AgentToolExecutionMode is the tool execution mode (e.g. sequential vs parallel); must match pkg/agent WithAgentToolExecutionMode on caller and worker.
	AgentToolExecutionMode string `json:"agent_tool_execution_mode"`

	Sampling *sdkruntime.LLMSampling `json:"sampling,omitempty"`

	SessionSize int `json:"session_size"`

	MaxIterations     int   `json:"max_iterations"`
	TimeoutNs         int64 `json:"timeout_ns"`
	ApprovalTimeoutNs int64 `json:"approval_timeout_ns"`
}

AgentFingerprintPayload is the JSON-serializable snapshot hashed by ComputeAgentFingerprint. Agent callers and Temporal workers must supply the same fields so client and worker digests match.

func BuildAgentFingerprintPayload

func BuildAgentFingerprintPayload(
	spec sdkruntime.AgentSpec,
	toolNames []string,
	policyFingerprint string,
	sampling *sdkruntime.LLMSampling,
	sessionSize int,
	limits sdkruntime.AgentLimits,
	mcpFingerprint string,
	agentMode string,
	agentToolExecutionMode types.AgentToolExecutionMode,
) AgentFingerprintPayload

BuildAgentFingerprintPayload builds a payload from spec and execution fields shared by caller and worker.

type AgentLLMInput

type AgentLLMInput struct {
	AgentName        string               `json:"agent_name,omitempty"`
	ConversationID   string               `json:"conversation_id,omitempty"`
	Messages         []interfaces.Message `json:"messages,omitempty"`
	SkipTools        bool                 `json:"skip_tools,omitempty"`
	AgentFingerprint string               `json:"agent_fingerprint,omitempty"`
	MessageID        string               `json:"message_id,omitempty"`
	EventWorkflowID  string               `json:"event_workflow_id,omitempty"`
	EventTaskQueue   string               `json:"event_task_queue,omitempty"`
	LocalChannelName string               `json:"local_channel_name,omitempty"`
}

AgentLLMInput is the input to AgentLLMActivity and AgentLLMStreamActivity. When ConversationID is set, the activity loads history from the store. MessageID is the assistant text id for TEXT_MESSAGE_* (and stream ordering with REASONING_*); the workflow sets it each turn.

type AgentLLMResult

type AgentLLMResult struct {
	Content   string               `json:"content"`
	ToolCalls []ToolCallRequest    `json:"tool_calls"`
	Usage     *interfaces.LLMUsage `json:"usage,omitempty"`
}

AgentLLMResult is the return value of AgentLLMActivity. The workflow uses it to decide whether to return the content or execute tools.

type AgentToolApprovalInput

type AgentToolApprovalInput struct {
	AgentName        string         `json:"agent_name"`
	ToolCallID       string         `json:"tool_call_id"`
	ToolName         string         `json:"tool_name"`
	ToolDisplayName  string         `json:"tool_display_name,omitempty"`
	Args             map[string]any `json:"args"`
	EventWorkflowID  string         `json:"event_workflow_id"`
	EventTaskQueue   string         `json:"event_task_queue,omitempty"`
	LocalChannelName string         `json:"local_channel_name,omitempty"`
	SubAgentName     string         `json:"sub_agent_name,omitempty"`
	AgentFingerprint string         `json:"agent_fingerprint,omitempty"`
}

type AgentToolAuthorizeInput added in v0.1.3

type AgentToolAuthorizeInput struct {
	ToolName         string         `json:"tool_name"`
	Args             map[string]any `json:"args"`
	ToolCallID       string         `json:"tool_call_id"`
	AgentFingerprint string         `json:"agent_fingerprint,omitempty"`
}

type AgentToolAuthorizeResult added in v0.1.3

type AgentToolAuthorizeResult struct {
	Allowed bool   `json:"allowed"`
	Reason  string `json:"reason,omitempty"`
}

type AgentToolExecuteInput

type AgentToolExecuteInput struct {
	ToolName         string               `json:"tool_name"`
	Args             map[string]any       `json:"args"`
	ConversationID   string               `json:"conversation_id,omitempty"`
	Messages         []interfaces.Message `json:"messages,omitempty"`
	ToolCallID       string               `json:"tool_call_id,omitempty"`
	AgentFingerprint string               `json:"agent_fingerprint,omitempty"`
}

AgentToolExecuteInput is the input to AgentToolExecuteActivity.

type AgentWorkflowInput

type AgentWorkflowInput struct {
	UserPrompt       string                         `json:"user_prompt,omitempty"`
	EventWorkflowID  string                         `json:"event_workflow_id,omitempty"`
	EventTaskQueue   string                         `json:"event_task_queue,omitempty"`
	LocalChannelName string                         `json:"local_channel_name,omitempty"`
	StreamingEnabled bool                           `json:"streaming_enabled,omitempty"`
	ConversationID   string                         `json:"conversation_id,omitempty"`
	AgentFingerprint string                         `json:"agent_fingerprint,omitempty"`
	EventTypes       []events.AgentEventType        `json:"event_types,omitempty"`
	SubAgentDepth    int                            `json:"sub_agent_depth,omitempty"`
	SubAgentRoutes   map[string]types.SubAgentRoute `json:"sub_agent_routes,omitempty"`
	MaxSubAgentDepth int                            `json:"max_sub_agent_depth,omitempty"`
}

AgentWorkflowInput is the input to AgentWorkflow.

EventWorkflowID is set when streaming or approval is used, and it drives out-of-process (remote) routing. EventTaskQueue is the Temporal task queue for AgentEventWorkflow (e.g. main TaskQueue + "-events"); it is required for UpdateWithStartWorkflow when EventWorkflowID is set. LocalChannelName, by contrast, is the in-process pub/sub channel name used for in-memory event fan-in across the delegation tree; it is set once at the top level (agent_event_<main-workflow-id>) and propagated unchanged to all sub-agents. StreamingEnabled enables partial content streaming (from WithStream). EventTypes is set by the SDK; a single "*" element means emit all event kinds (used for Stream).

ConversationID is set when a conversation is used; the workflow then fetches messages and writes assistant and tool messages via activities. SubAgentDepth is 0 for a top-level user run, and each child workflow increments it; the runtime caps it against maxSubAgentDepth. SubAgentRoutes maps each sub-agent tool name to its route and is built from WithSubAgents when the run starts.

AgentFingerprint is the SHA-256 hex digest of the worker-local agent config; activities reject the input on mismatch.

type Option

type Option func(*TemporalRuntimeConfig)

Option configures a TemporalRuntime.
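
Option follows the functional options pattern. The sketch below reproduces the pattern with a local config type; the lowercase names are hypothetical mirrors of the documented options, not the package's own code.

```go
package main

import "fmt"

// config is a hypothetical, trimmed runtime configuration.
type config struct {
	agentMode               string
	disableFingerprintCheck bool
}

// option mutates a config, in the style of the documented Option type.
type option func(*config)

func withAgentMode(mode string) option {
	return func(c *config) { c.agentMode = mode }
}

func withDisableFingerprintCheck(disable bool) option {
	return func(c *config) { c.disableFingerprintCheck = disable }
}

// newConfig starts from zero values and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) *config {
	c := &config{}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withAgentMode("interactive"), withAgentMode("autonomous"))
	fmt.Println(c.agentMode, c.disableFingerprintCheck) // autonomous false
}
```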

func WithAgentExecution

func WithAgentExecution(exec sdkruntime.AgentExecution) Option

WithAgentExecution sets LLM, tools, session, and limits (same as sdkruntime.ExecuteRequest.AgentExecution).

func WithAgentMode added in v0.1.3

func WithAgentMode(mode string) Option

WithAgentMode sets the agent mode string used with ComputeAgentFingerprint. Must match pkg/agent WithAgentMode for the same agent (caller process and worker process).

func WithAgentSpec

func WithAgentSpec(spec sdkruntime.AgentSpec) Option

WithAgentSpec sets identity and response format (same as sdkruntime.ExecuteRequest.AgentSpec).

func WithAgentToolExecutionMode added in v0.1.7

func WithAgentToolExecutionMode(mode types.AgentToolExecutionMode) Option

WithAgentToolExecutionMode sets the agent tool execution mode used with ComputeAgentFingerprint. Must match pkg/agent WithAgentToolExecutionMode for the same agent (caller process and worker process).

func WithDisableFingerprintCheck added in v0.1.6

func WithDisableFingerprintCheck(disable bool) Option

WithDisableFingerprintCheck disables activity-time caller-vs-worker fingerprint verification. Break-glass only: use temporarily during rollout incidents; default is strict verification.

func WithDisableLocalWorker added in v0.1.3

func WithDisableLocalWorker(disable bool) Option

WithDisableLocalWorker mirrors pkg/agent [DisableLocalWorker]. When false, the client embeds a worker and the runtime skips DescribeTaskQueue poller checks before starting workflows.

func WithEnableRemoteWorkers

func WithEnableRemoteWorkers(enableRemoteWorkers bool) Option

func WithInstanceId

func WithInstanceId(instanceId string) Option

func WithLogger

func WithLogger(logger logger.Logger) Option

func WithMCPFingerprint added in v0.1.2

func WithMCPFingerprint(fp string) Option

WithMCPFingerprint sets the MCP wiring digest used with ComputeAgentFingerprint. Must match pkg/agent's mcpConfigFingerprint for the same WithMCPConfig / WithMCPClients wiring.

func WithPolicyFingerprint

func WithPolicyFingerprint(fp string) Option

WithPolicyFingerprint sets the opaque policy digest used with ComputeAgentFingerprint. Must match pkg/agent's toolPolicyFingerprint for the same agent options.

func WithRemoteWorker

func WithRemoteWorker(remoteWorker bool) Option

func WithTemporalClient

func WithTemporalClient(client client.Client, taskQueue string) Option

WithTemporalClient sets the Temporal client.

func WithTemporalConfig

func WithTemporalConfig(config *TemporalConfig) Option

WithTemporalConfig sets the Temporal config.

type SendAgentEventActivityInput added in v0.1.1

type SendAgentEventActivityInput struct {
	EventWorkflowID string                `json:"event_workflow_id,omitempty"`
	EventTaskQueue  string                `json:"event_task_queue,omitempty"`
	EventType       events.AgentEventType `json:"event_type"`
	Update          *AgentEventUpdate     `json:"update"`
}

SendAgentEventActivityInput is the payload for SendAgentEventUpdateActivity (workflow + activity).

type SendAgentEventResult added in v0.1.1

type SendAgentEventResult struct {
	// StreamUnavailable is true when delivery failed in a way that likely means the stream is gone.
	StreamUnavailable bool `json:"stream_unavailable,omitempty"`
}

SendAgentEventResult is returned by SendAgentEventUpdateActivity. Fatal errors are returned as the activity error. StreamUnavailable is a soft failure: the workflow sets streamingUnavailable and continues.

type TemporalConfig

type TemporalConfig struct {
	Host      string
	Port      int
	Namespace string
	TaskQueue string
}
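
As an illustration of how these fields might be consumed, the sketch below joins Host and Port into a dial address and derives an events queue name. Both helpers are hypothetical; the "-events" suffix follows the example given for EventTaskQueue, and 7233 is Temporal's usual frontend port.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// temporalConfig copies the documented TemporalConfig fields.
type temporalConfig struct {
	Host      string
	Port      int
	Namespace string
	TaskQueue string
}

// hostPort joins Host and Port; JoinHostPort also brackets IPv6 hosts.
func (c temporalConfig) hostPort() string {
	return net.JoinHostPort(c.Host, strconv.Itoa(c.Port))
}

// eventTaskQueue derives a queue name using the "-events" convention.
func (c temporalConfig) eventTaskQueue() string {
	return c.TaskQueue + "-events"
}

func main() {
	cfg := temporalConfig{Host: "localhost", Port: 7233, Namespace: "default", TaskQueue: "agents"}
	fmt.Println(cfg.hostPort(), cfg.eventTaskQueue()) // localhost:7233 agents-events
}
```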

type TemporalRuntime

type TemporalRuntime struct {
	TemporalRuntimeConfig
	// contains filtered or unexported fields
}

func NewTemporalRuntime

func NewTemporalRuntime(opts ...Option) (*TemporalRuntime, error)

func (*TemporalRuntime) AddConversationMessagesActivity

func (rt *TemporalRuntime) AddConversationMessagesActivity(ctx context.Context, input AddConversationMessagesInput) error

AddConversationMessagesActivity adds messages to the conversation memory.

func (*TemporalRuntime) AgentEventWorkflow

func (rt *TemporalRuntime) AgentEventWorkflow(ctx workflow.Context) error

AgentEventWorkflow runs once per agent. It receives events and approval requests via workflow updates. Each update includes a runID so that events are published to per-run channels (agent_event_{runID}, approval_{runID}). It completes only when it receives the "complete" signal (on agent Close).

func (*TemporalRuntime) AgentLLMActivity

func (rt *TemporalRuntime) AgentLLMActivity(ctx context.Context, input AgentLLMInput) (*AgentLLMResult, error)

AgentLLMActivity calls the LLM and returns content plus any tool calls. When input.ConversationID is set, it fetches history from the store and adds the assistant message on completion.

func (*TemporalRuntime) AgentLLMStreamActivity

func (rt *TemporalRuntime) AgentLLMStreamActivity(ctx context.Context, input AgentLLMInput) (*AgentLLMResult, error)

AgentLLMStreamActivity streams LLM response tokens. Event order: optional reasoning block (REASONING_*), then TEXT_MESSAGE_START → TEXT_MESSAGE_CONTENT* → TEXT_MESSAGE_END. When input.ConversationID is set, it fetches messages from the conversation and prepends them to the workflow messages.
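
A consumer could check the documented ordering with a small validator. The sketch assumes events are identified by plain strings equal to the names above and that reasoning events share the REASONING_ prefix; the actual events.AgentEventType values may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// validOrder reports whether names follow the documented sequence:
// zero or more REASONING_* events, TEXT_MESSAGE_START, zero or more
// TEXT_MESSAGE_CONTENT events, then TEXT_MESSAGE_END as the last item.
func validOrder(names []string) bool {
	i := 0
	for i < len(names) && strings.HasPrefix(names[i], "REASONING_") {
		i++
	}
	if i >= len(names) || names[i] != "TEXT_MESSAGE_START" {
		return false
	}
	i++
	for i < len(names) && names[i] == "TEXT_MESSAGE_CONTENT" {
		i++
	}
	return i == len(names)-1 && names[i] == "TEXT_MESSAGE_END"
}

func main() {
	ok := []string{"REASONING_START", "REASONING_END", "TEXT_MESSAGE_START", "TEXT_MESSAGE_CONTENT", "TEXT_MESSAGE_END"}
	bad := []string{"TEXT_MESSAGE_START", "REASONING_START", "TEXT_MESSAGE_END"}
	fmt.Println(validOrder(ok), validOrder(bad)) // true false
}
```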

func (*TemporalRuntime) AgentToolApprovalActivity

func (rt *TemporalRuntime) AgentToolApprovalActivity(ctx context.Context, input AgentToolApprovalInput) (types.ApprovalStatus, error)

AgentToolApprovalActivity blocks until the driver completes it via CompleteActivity. Publishes a CUSTOM (tool_approval / delegation) event to the local agent_event channel (Run and Stream). When EventWorkflowID is set, UpdateWorkflow uses WorkflowUpdateStageCompleted and updateWorkflowApprovalRPCTimeout so the event handler has returned before ErrResultPending; RPC timeout maps to ApprovalStatusUnavailable.

func (*TemporalRuntime) AgentToolAuthorizeActivity added in v0.1.3

func (rt *TemporalRuntime) AgentToolAuthorizeActivity(ctx context.Context, input AgentToolAuthorizeInput) (AgentToolAuthorizeResult, error)

AgentToolAuthorizeActivity checks optional programmatic authorization before approval/execute.

func (*TemporalRuntime) AgentToolExecuteActivity

func (rt *TemporalRuntime) AgentToolExecuteActivity(ctx context.Context, input AgentToolExecuteInput) (string, error)

AgentToolExecuteActivity executes a tool by name and, when ConversationID is set, adds the tool message to the conversation.

func (*TemporalRuntime) AgentWorkflow

func (rt *TemporalRuntime) AgentWorkflow(ctx workflow.Context, input AgentWorkflowInput) (*types.AgentRunResult, error)

AgentWorkflow runs the agent loop: LLM → tool calls → approval/execute → feed results back to LLM → repeat. It stops when the LLM returns no tool calls or when max iterations is reached. When input.EventWorkflowID is set, it sends agent events and approval requests to the event workflow.
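
The loop shape can be sketched without Temporal. Here llmFunc and toolFunc are hypothetical stand-ins for the LLM and tool activities, the approval step is omitted, and returning an empty string when the iteration cap is hit is an assumption.

```go
package main

import "fmt"

type toolCall struct{ Name string }

type llmResult struct {
	Content   string
	ToolCalls []toolCall
}

// llmFunc and toolFunc stand in for the LLM and tool activities.
type llmFunc func(history []string) llmResult
type toolFunc func(call toolCall) string

// runLoop calls the LLM, executes any requested tools, feeds their
// output back, and stops on a tool-free reply or the iteration cap.
func runLoop(llm llmFunc, exec toolFunc, prompt string, maxIterations int) (string, int) {
	history := []string{prompt}
	for i := 1; i <= maxIterations; i++ {
		res := llm(history)
		if len(res.ToolCalls) == 0 {
			return res.Content, i
		}
		for _, c := range res.ToolCalls {
			history = append(history, exec(c))
		}
	}
	return "", maxIterations
}

func main() {
	llm := func(h []string) llmResult {
		if len(h) == 1 {
			return llmResult{ToolCalls: []toolCall{{Name: "add"}}}
		}
		return llmResult{Content: "result: " + h[len(h)-1]}
	}
	exec := func(toolCall) string { return "4" }
	fmt.Println(runLoop(llm, exec, "what is 2+2?", 5)) // result: 4 2
}
```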

func (*TemporalRuntime) Approve

func (rt *TemporalRuntime) Approve(ctx context.Context, approvalToken string, status types.ApprovalStatus) error

func (*TemporalRuntime) Close

func (rt *TemporalRuntime) Close()

func (*TemporalRuntime) EventPublishActivity

func (rt *TemporalRuntime) EventPublishActivity(ctx context.Context, channel string, eventJSON json.RawMessage) error

EventPublishActivity publishes an event to the given channel (agent_event_<main-workflow-id>).

func (*TemporalRuntime) Execute

func (*TemporalRuntime) ExecuteStream

func (rt *TemporalRuntime) ExecuteStream(ctx context.Context, req *runtime.ExecuteRequest) (<-chan events.AgentEvent, error)

func (*TemporalRuntime) GetEventBus

func (rt *TemporalRuntime) GetEventBus() eventbus.EventBus

GetEventBus returns the event bus for the runtime.

func (*TemporalRuntime) OnApproval

func (rt *TemporalRuntime) OnApproval(ctx context.Context, approvalToken string, status types.ApprovalStatus) error

OnApproval completes a tool approval when using ExecuteStream. Pass the string from ev.Approval (see the streaming examples) along with the chosen status.

func (*TemporalRuntime) SendAgentEventUpdateActivity

func (rt *TemporalRuntime) SendAgentEventUpdateActivity(ctx context.Context, in SendAgentEventActivityInput) (SendAgentEventResult, error)

SendAgentEventUpdateActivity sends an event via UpdateWithStartWorkflow when eventWorkflowID is set, and otherwise via the in-memory agentChannel. It returns StreamUnavailable without an error when delivery fails but the workflow should continue (dead stream / pipeline). It returns a non-nil error for configuration or internal failures (fatal to the workflow).

func (*TemporalRuntime) SetEventBus

func (rt *TemporalRuntime) SetEventBus(eventbus eventbus.EventBus)

SetEventBus replaces the in-process event bus. Sub-agent runtimes are wired to the parent agent's bus so delegation and approval events fan in correctly.

func (*TemporalRuntime) Start

func (rt *TemporalRuntime) Start(ctx context.Context) error

Start starts the worker (blocks until Stop is called).

func (*TemporalRuntime) Stop

func (rt *TemporalRuntime) Stop()

Stop stops the Temporal worker(s). Called when the agent package stops an embedded worker or closes the runtime.

type TemporalRuntimeConfig

type TemporalRuntimeConfig struct {
	AgentSpec         sdkruntime.AgentSpec
	AgentExecution    sdkruntime.AgentExecution
	PolicyFingerprint string // from pkg/agent toolPolicyFingerprint; must match caller temporal.ComputeAgentFingerprint inputs
	MCPFingerprint    string // from pkg/agent mcpConfigFingerprint; must match caller temporal.ComputeAgentFingerprint inputs
	// AgentMode is the string form of [types.AgentMode] (e.g. "interactive", "autonomous"); must match pkg/agent WithAgentMode.
	AgentMode string
	// AgentToolExecutionMode is the [types.AgentToolExecutionMode] (e.g. "sequential", "parallel"); must match pkg/agent WithAgentToolExecutionMode.
	AgentToolExecutionMode types.AgentToolExecutionMode
	// DisableLocalWorker mirrors pkg/agent [DisableLocalWorker]: when false, the client embeds a worker
	// so Execute/ExecuteStream skip DescribeTaskQueue poller checks. ([NewAgentWorker] never calls those methods.)
	DisableLocalWorker bool
	// DisableFingerprintCheck disables caller-vs-worker agent fingerprint verification at activity entry.
	// Break-glass only: keep false in production for rollout/config safety.
	DisableFingerprintCheck bool
	// contains filtered or unexported fields
}

TemporalRuntimeConfig holds connection settings plus the same sdkruntime.AgentSpec / sdkruntime.AgentExecution shape as sdkruntime.ExecuteRequest, so workers and pkg/agent share one layout.

type ToolCallRequest

type ToolCallRequest struct {
	ToolCallID      string         `json:"tool_call_id"` // from LLM; used to match tool results
	ToolName        string         `json:"tool_name"`
	ToolDisplayName string         `json:"tool_display_name,omitempty"`
	Args            map[string]any `json:"args"`
	NeedsApproval   bool           `json:"needs_approval"`
}

ToolCallRequest is a tool invocation with approval flag. NeedsApproval is set by AgentLLMActivity.
