Documentation ¶
Overview ¶
Package temporal contains helpers for integrating this SDK with go.temporal.io (client, worker, etc.). This file adapts pkg/logger.Logger to go.temporal.io/sdk/log.Logger for client.Options. It is internal to this module; callers who build their own Temporal client should wire sdk/log.Logger themselves (or copy this small bridge).
If a file imports both this package and go.temporal.io/sdk/temporal, use an import alias on one of them (e.g. sdktrace "go.temporal.io/sdk/temporal"): the import paths differ, but both package names would default to "temporal".
Index ¶
- Variables
- func ComputeAgentFingerprint(m AgentFingerprintPayload) string
- func NewLogAdapter(l logger.Logger) tlog.Logger
- func ToolNamesFromTools(tools []interfaces.Tool) []string
- type AddConversationMessagesInput
- type AgentEventUpdate
- type AgentFingerprintPayload
- type AgentLLMInput
- type AgentLLMResult
- type AgentLLMStreamInput
- type AgentToolApprovalInput
- type AgentToolAuthorizeInput
- type AgentToolAuthorizeResult
- type AgentToolExecuteInput
- type AgentWorkflowInput
- type Option
- func WithAgentExecution(exec sdkruntime.AgentExecution) Option
- func WithAgentMode(mode string) Option
- func WithAgentSpec(spec sdkruntime.AgentSpec) Option
- func WithDisableLocalWorker(disable bool) Option
- func WithEnableRemoteWorkers(enableRemoteWorkers bool) Option
- func WithInstanceId(instanceId string) Option
- func WithLogger(logger logger.Logger) Option
- func WithMCPFingerprint(fp string) Option
- func WithPolicyFingerprint(fp string) Option
- func WithRemoteWorker(remoteWorker bool) Option
- func WithTemporalClient(client client.Client, taskQueue string) Option
- func WithTemporalConfig(config *TemporalConfig) Option
- type SendAgentEventActivityInput
- type SendAgentEventResult
- type TemporalConfig
- type TemporalRuntime
- func (rt *TemporalRuntime) AddConversationMessagesActivity(ctx context.Context, input AddConversationMessagesInput) error
- func (rt *TemporalRuntime) AgentEventWorkflow(ctx workflow.Context) error
- func (rt *TemporalRuntime) AgentLLMActivity(ctx context.Context, input AgentLLMInput) (*AgentLLMResult, error)
- func (rt *TemporalRuntime) AgentLLMStreamActivity(ctx context.Context, input AgentLLMStreamInput) (*AgentLLMResult, error)
- func (rt *TemporalRuntime) AgentToolApprovalActivity(ctx context.Context, input AgentToolApprovalInput) (types.ApprovalStatus, error)
- func (rt *TemporalRuntime) AgentToolAuthorizeActivity(ctx context.Context, input AgentToolAuthorizeInput) (AgentToolAuthorizeResult, error)
- func (rt *TemporalRuntime) AgentToolExecuteActivity(ctx context.Context, input AgentToolExecuteInput) (string, error)
- func (rt *TemporalRuntime) AgentWorkflow(ctx workflow.Context, input AgentWorkflowInput) (*types.AgentResponse, error)
- func (rt *TemporalRuntime) Approve(ctx context.Context, approvalToken string, status types.ApprovalStatus) error
- func (rt *TemporalRuntime) Close()
- func (rt *TemporalRuntime) EventPublishActivity(ctx context.Context, channel string, event *types.AgentEvent) error
- func (rt *TemporalRuntime) Execute(ctx context.Context, req *runtime.ExecuteRequest) (*types.AgentResponse, error)
- func (rt *TemporalRuntime) ExecuteStream(ctx context.Context, req *runtime.ExecuteRequest) (chan *types.AgentEvent, error)
- func (rt *TemporalRuntime) GetEventBus() eventbus.EventBus
- func (rt *TemporalRuntime) OnApproval(ctx context.Context, approvalToken string, status types.ApprovalStatus) error
- func (rt *TemporalRuntime) SendAgentEventUpdateActivity(ctx context.Context, in SendAgentEventActivityInput) (SendAgentEventResult, error)
- func (rt *TemporalRuntime) SetEventBus(eventbus eventbus.EventBus)
- func (rt *TemporalRuntime) Start(ctx context.Context) error
- func (rt *TemporalRuntime) Stop()
- type TemporalRuntimeConfig
- type ToolCallRequest
Constants ¶
This section is empty.
Variables ¶
var ErrAgentAlreadyRunning = errors.New("agent already has an active run")
ErrAgentAlreadyRunning is returned when Execute, ExecuteStream, or RunAsync is called while a run is already in progress.
var ErrAgentFingerprintMismatch = errors.New("temporal: agent fingerprint mismatch (caller vs worker); redeploy worker or align agent config")
ErrAgentFingerprintMismatch is returned when the fingerprint in the workflow input does not match the worker's fingerprint.
Functions ¶
func ComputeAgentFingerprint ¶
func ComputeAgentFingerprint(m AgentFingerprintPayload) string
ComputeAgentFingerprint returns a stable SHA-256 hex digest of the payload (identity, prompts, tools, sampling, limits, policy, optional MCP wiring). Use the same toolPolicyFingerprint and MCP fingerprint inputs from pkg/agent on both the process that issues runs and the worker process.
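A minimal sketch of the idea, not the package's actual implementation: it assumes the digest is taken over the JSON encoding of the payload, and it uses a hypothetical trimmed-down struct (sketchPayload) and helper (sketchFingerprint) in place of AgentFingerprintPayload and ComputeAgentFingerprint.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// sketchPayload is a hypothetical stand-in with only a few of the real fields.
type sketchPayload struct {
	Version   int      `json:"v"`
	Name      string   `json:"name"`
	ToolNames []string `json:"tool_names"`
	AgentMode string   `json:"agent_mode"`
}

// sketchFingerprint hashes the JSON encoding of the payload (an assumption).
// encoding/json writes struct fields in declaration order, so two equal
// payloads always produce the same bytes and therefore the same digest.
func sketchFingerprint(p sketchPayload) string {
	b, err := json.Marshal(p)
	if err != nil {
		return "" // not reachable for this struct; kept for completeness
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

func main() {
	caller := sketchPayload{Version: 1, Name: "support", ToolNames: []string{"lookup", "search"}, AgentMode: "interactive"}
	worker := caller
	fmt.Println(sketchFingerprint(caller) == sketchFingerprint(worker))

	// Any drift in configuration changes the digest.
	worker.AgentMode = "autonomous"
	fmt.Println(sketchFingerprint(caller) == sketchFingerprint(worker))
}
```

The point of the sketch is that both processes must feed identical inputs; a single differing field yields a different digest.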
func NewLogAdapter ¶
func NewLogAdapter(l logger.Logger) tlog.Logger
NewLogAdapter returns a Temporal sdk/log.Logger that delegates to the SDK logger.Logger. Calls use context.Background() because Temporal's logger API is not context-aware.
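The adapter pattern described above can be sketched as follows. The temporalLogger interface mirrors the four-method shape of go.temporal.io/sdk/log.Logger; ctxLogger, adapter, and recordingLogger are hypothetical names, and the real pkg/logger.Logger interface may look different.

```go
package main

import (
	"context"
	"fmt"
)

// temporalLogger mirrors the method set of go.temporal.io/sdk/log.Logger.
type temporalLogger interface {
	Debug(msg string, keyvals ...interface{})
	Info(msg string, keyvals ...interface{})
	Warn(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

// ctxLogger is a hypothetical context-aware logger (an assumption).
type ctxLogger interface {
	Log(ctx context.Context, level, msg string, keyvals ...interface{})
}

// adapter forwards each call with context.Background(), because the
// Temporal-style methods receive no context.
type adapter struct{ l ctxLogger }

func (a adapter) Debug(msg string, kv ...interface{}) { a.l.Log(context.Background(), "debug", msg, kv...) }
func (a adapter) Info(msg string, kv ...interface{})  { a.l.Log(context.Background(), "info", msg, kv...) }
func (a adapter) Warn(msg string, kv ...interface{})  { a.l.Log(context.Background(), "warn", msg, kv...) }
func (a adapter) Error(msg string, kv ...interface{}) { a.l.Log(context.Background(), "error", msg, kv...) }

// recordingLogger keeps formatted lines in memory for the demo.
type recordingLogger struct{ lines []string }

func (r *recordingLogger) Log(_ context.Context, level, msg string, kv ...interface{}) {
	r.lines = append(r.lines, fmt.Sprintf("%s: %s %v", level, msg, kv))
}

func main() {
	rec := &recordingLogger{}
	var tl temporalLogger = adapter{l: rec}
	tl.Info("worker started", "taskQueue", "agents")
	fmt.Println(rec.lines[0])
}
```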
func ToolNamesFromTools ¶
func ToolNamesFromTools(tools []interfaces.Tool) []string
ToolNamesFromTools returns sorted tool names for fingerprinting.
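Sorting matters because the names feed a digest: the same tools registered in a different order should not change the fingerprint. A minimal sketch, assuming a hypothetical namedTool interface with a Name method (the real interfaces.Tool may differ):

```go
package main

import (
	"fmt"
	"sort"
)

// namedTool is a hypothetical stand-in for interfaces.Tool.
type namedTool interface{ Name() string }

// stubTool is a trivial tool whose name is its own string value.
type stubTool string

func (s stubTool) Name() string { return string(s) }

// sortedToolNames collects the names and sorts them so the result does not
// depend on registration order.
func sortedToolNames(tools []namedTool) []string {
	names := make([]string, 0, len(tools))
	for _, t := range tools {
		names = append(names, t.Name())
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(sortedToolNames([]namedTool{stubTool("search"), stubTool("calc")}))
}
```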
Types ¶
type AddConversationMessagesInput ¶
type AddConversationMessagesInput struct {
ConversationID string `json:"conversation_id,omitempty"`
Messages []interfaces.Message `json:"messages,omitempty"`
AgentFingerprint string `json:"agent_fingerprint,omitempty"`
}
AddConversationMessagesInput is the input to AddConversationMessagesActivity.
type AgentEventUpdate ¶
type AgentEventUpdate struct {
AgentName string `json:"agent_name"`
LocalChannelName string `json:"local_channel_name,omitempty"`
Event *types.AgentEvent `json:"event"`
}
AgentEventUpdate is the payload for agent-event updates when using one event workflow per agent. AgentName is the name of the agent that emitted the event (main agent or a sub-agent). LocalChannelName is the in-process pub/sub channel name (agent_event_<main-workflow-id>) to which all nodes in the delegation tree publish.
type AgentFingerprintPayload ¶
type AgentFingerprintPayload struct {
Version int `json:"v"`
Name string `json:"name"`
Description string `json:"description"`
SystemPrompt string `json:"system_prompt"`
ResponseFormat *struct {
Type string `json:"type"`
Name string `json:"name,omitempty"`
Schema map[string]any `json:"schema,omitempty"`
} `json:"response_format,omitempty"`
ToolNames []string `json:"tool_names"`
// PolicyFingerprint is an opaque string from pkg/agent (same on caller and worker Temporal config).
PolicyFingerprint string `json:"policy_fp"`
// MCPFingerprint is the pkg/agent MCP wiring digest over transports (no secrets), timeouts, filters,
// and extra MCP client names. Tool names already appear in ToolNames; this catches same tools
// pointing at a different endpoint or policy. Omitted when empty.
MCPFingerprint string `json:"mcp_fingerprint,omitempty"`
// AgentMode is the execution mode (e.g. interactive vs autonomous); must match pkg/agent WithAgentMode on caller and worker.
AgentMode string `json:"agent_mode"`
Sampling *sdkruntime.LLMSampling `json:"sampling,omitempty"`
SessionSize int `json:"session_size"`
MaxIterations int `json:"max_iterations"`
TimeoutNs int64 `json:"timeout_ns"`
ApprovalTimeoutNs int64 `json:"approval_timeout_ns"`
}
AgentFingerprintPayload is the JSON-serializable snapshot hashed by ComputeAgentFingerprint. Agent callers and Temporal workers must supply the same fields so client and worker digests match.
func BuildAgentFingerprintPayload ¶
func BuildAgentFingerprintPayload(
	spec sdkruntime.AgentSpec,
	toolNames []string,
	policyFingerprint string,
	sampling *sdkruntime.LLMSampling,
	sessionSize int,
	limits sdkruntime.AgentLimits,
	mcpFingerprint string,
	agentMode string,
) AgentFingerprintPayload
BuildAgentFingerprintPayload builds a payload from spec and execution fields shared by caller and worker.
type AgentLLMInput ¶
type AgentLLMInput struct {
ConversationID string `json:"conversation_id,omitempty"`
Messages []interfaces.Message `json:"messages,omitempty"`
SkipTools bool `json:"skip_tools,omitempty"`
AgentFingerprint string `json:"agent_fingerprint,omitempty"`
}
AgentLLMInput is the input to AgentLLMActivity. When ConversationID is set, the activity fetches messages from the store. UserPrompt is passed directly, so the workflow does not construct messages. Messages is used only for multi-turn runs without a conversation.
type AgentLLMResult ¶
type AgentLLMResult struct {
Content string `json:"content"`
ToolCalls []ToolCallRequest `json:"tool_calls"`
Usage *interfaces.LLMUsage `json:"usage,omitempty"`
}
AgentLLMResult is the return value of AgentLLMActivity. The workflow uses it to decide whether to return the content or execute tools.
type AgentLLMStreamInput ¶
type AgentLLMStreamInput struct {
AgentName string `json:"agent_name,omitempty"`
ConversationID string `json:"conversation_id,omitempty"`
Messages []interfaces.Message `json:"messages,omitempty"`
EventWorkflowID string `json:"event_workflow_id,omitempty"`
EventTaskQueue string `json:"event_task_queue,omitempty"`
LocalChannelName string `json:"local_channel_name,omitempty"`
SkipTools bool `json:"skip_tools,omitempty"`
AgentFingerprint string `json:"agent_fingerprint,omitempty"`
}
AgentLLMStreamInput is the input to AgentLLMStreamActivity.
type AgentToolApprovalInput ¶
type AgentToolApprovalInput struct {
ToolName string `json:"tool_name"`
Args map[string]any `json:"args"`
ToolCallID string `json:"tool_call_id"`
EventWorkflowID string `json:"event_workflow_id"`
EventTaskQueue string `json:"event_task_queue,omitempty"`
LocalChannelName string `json:"local_channel_name,omitempty"`
SubAgentName string `json:"sub_agent_name,omitempty"`
AgentFingerprint string `json:"agent_fingerprint,omitempty"`
}
type AgentToolAuthorizeInput ¶ added in v0.1.3
type AgentToolAuthorizeResult ¶ added in v0.1.3
type AgentToolExecuteInput ¶
type AgentToolExecuteInput struct {
ToolName string `json:"tool_name"`
Args map[string]any `json:"args"`
ConversationID string `json:"conversation_id,omitempty"`
Messages []interfaces.Message `json:"messages,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
AgentFingerprint string `json:"agent_fingerprint,omitempty"`
}
AgentToolExecuteInput is the input to AgentToolExecuteActivity.
type AgentWorkflowInput ¶
type AgentWorkflowInput struct {
UserPrompt string `json:"user_prompt,omitempty"`
EventWorkflowID string `json:"event_workflow_id,omitempty"`
EventTaskQueue string `json:"event_task_queue,omitempty"`
LocalChannelName string `json:"local_channel_name,omitempty"`
StreamingEnabled bool `json:"streaming_enabled,omitempty"`
ConversationID string `json:"conversation_id,omitempty"`
AgentFingerprint string `json:"agent_fingerprint,omitempty"`
EventTypes []types.AgentEventType `json:"event_types,omitempty"`
SubAgentDepth int `json:"sub_agent_depth,omitempty"`
SubAgentRoutes map[string]types.SubAgentRoute `json:"sub_agent_routes,omitempty"`
MaxSubAgentDepth int `json:"max_sub_agent_depth,omitempty"`
}
AgentWorkflowInput is the input to AgentWorkflow.
EventWorkflowID is set when streaming or approval is used; it is used for out-of-process (remote) routing.
EventTaskQueue is the Temporal task queue for AgentEventWorkflow (e.g. main TaskQueue + "-events"); it is required for UpdateWithStartWorkflow when EventWorkflowID is set.
LocalChannelName is the in-process pub/sub channel name used for in-memory event fan-in across the delegation tree. It is set once at the top level (agent_event_<main-workflow-id>) and propagated unchanged to all sub-agents.
StreamingEnabled enables partial content streaming (from WithStream).
ConversationID is set when a conversation is used; the workflow fetches messages and writes assistant and tool messages via activities.
AgentFingerprint is the SHA-256 hex digest of the worker-local agent config; activities reject the input on mismatch.
EventTypes is set by the SDK; a single "*" element means emit all event kinds (used for Stream).
SubAgentDepth is 0 for a top-level user run; each child workflow increments it, and the runtime caps it against MaxSubAgentDepth.
SubAgentRoutes maps each sub-agent tool name to its route; it is built from WithSubAgents when the run starts.
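The naming conventions mentioned above can be illustrated with a few helpers. This is only a sketch: localChannelName, eventTaskQueue, and emitsAll are hypothetical names, and the "-events" suffix is the example given above rather than a guaranteed rule.

```go
package main

import "fmt"

// localChannelName builds the in-process channel name from the main workflow ID.
func localChannelName(mainWorkflowID string) string {
	return "agent_event_" + mainWorkflowID
}

// eventTaskQueue derives the event workflow's task queue from the main one,
// following the "-events" example (an assumption, not a fixed contract).
func eventTaskQueue(mainTaskQueue string) string {
	return mainTaskQueue + "-events"
}

// emitsAll reports whether the event-type filter is the single "*" wildcard.
func emitsAll(eventTypes []string) bool {
	return len(eventTypes) == 1 && eventTypes[0] == "*"
}

func main() {
	fmt.Println(localChannelName("wf-123"))
	fmt.Println(eventTaskQueue("agents"))
	fmt.Println(emitsAll([]string{"*"}))
}
```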
type Option ¶
type Option func(*TemporalRuntimeConfig)
Option configures a TemporalRuntime.
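Option follows the functional-options pattern: each With... function returns a closure that mutates the config, and the constructor applies them in order. A self-contained sketch with hypothetical stand-ins (sketchConfig, sketchOption, newSketchConfig) rather than the package's own types:

```go
package main

import "fmt"

// sketchConfig is a hypothetical stand-in for TemporalRuntimeConfig.
type sketchConfig struct {
	AgentMode          string
	DisableLocalWorker bool
}

// sketchOption has the same shape as Option: a function that edits the config.
type sketchOption func(*sketchConfig)

func withAgentMode(mode string) sketchOption {
	return func(c *sketchConfig) { c.AgentMode = mode }
}

func withDisableLocalWorker(disable bool) sketchOption {
	return func(c *sketchConfig) { c.DisableLocalWorker = disable }
}

// newSketchConfig applies options in order; later options override earlier ones.
func newSketchConfig(opts ...sketchOption) *sketchConfig {
	cfg := &sketchConfig{}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newSketchConfig(withAgentMode("interactive"), withDisableLocalWorker(true))
	fmt.Printf("%+v\n", *cfg)
}
```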
func WithAgentExecution ¶
func WithAgentExecution(exec sdkruntime.AgentExecution) Option
WithAgentExecution sets LLM, tools, session, and limits (same as sdkruntime.ExecuteRequest.AgentExecution).
func WithAgentMode ¶ added in v0.1.3
func WithAgentMode(mode string) Option
WithAgentMode sets the agent mode string used with ComputeAgentFingerprint. Must match pkg/agent WithAgentMode for the same agent (caller process and worker process).
func WithAgentSpec ¶
func WithAgentSpec(spec sdkruntime.AgentSpec) Option
WithAgentSpec sets identity and response format (same as sdkruntime.ExecuteRequest.AgentSpec).
func WithDisableLocalWorker ¶ added in v0.1.3
func WithDisableLocalWorker(disable bool) Option
WithDisableLocalWorker mirrors pkg/agent [DisableLocalWorker]. When false, the client embeds a worker and the runtime skips DescribeTaskQueue poller checks before starting workflows.
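func WithEnableRemoteWorkers ¶
func WithEnableRemoteWorkers(enableRemoteWorkers bool) Option
func WithInstanceId ¶
func WithInstanceId(instanceId string) Option
func WithLogger ¶
func WithLogger(logger logger.Logger) Option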
func WithMCPFingerprint ¶ added in v0.1.2
func WithMCPFingerprint(fp string) Option
WithMCPFingerprint sets the MCP wiring digest used with ComputeAgentFingerprint. Must match pkg/agent's mcpConfigFingerprint for the same WithMCPConfig / WithMCPClients wiring.
func WithPolicyFingerprint ¶
func WithPolicyFingerprint(fp string) Option
WithPolicyFingerprint sets the opaque policy digest used with ComputeAgentFingerprint. Must match pkg/agent's toolPolicyFingerprint for the same agent options.
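func WithRemoteWorker ¶
func WithRemoteWorker(remoteWorker bool) Option
func WithTemporalClient ¶
func WithTemporalClient(client client.Client, taskQueue string) Option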
WithTemporalClient sets the Temporal client and the task queue.
func WithTemporalConfig ¶
func WithTemporalConfig(config *TemporalConfig) Option
WithTemporalConfig sets the Temporal config.
type SendAgentEventActivityInput ¶ added in v0.1.1
type SendAgentEventActivityInput struct {
EventWorkflowID string `json:"event_workflow_id,omitempty"`
EventTaskQueue string `json:"event_task_queue,omitempty"`
Update *AgentEventUpdate `json:"update"`
}
SendAgentEventActivityInput is the payload for SendAgentEventUpdateActivity (workflow + activity).
type SendAgentEventResult ¶ added in v0.1.1
type SendAgentEventResult struct {
StreamUnavailable bool `json:"stream_unavailable,omitempty"`
}
SendAgentEventResult is returned by SendAgentEventUpdateActivity. Fatal errors are returned as the activity error. StreamUnavailable is a soft failure: the workflow sets streamingUnavailable and continues.
type TemporalConfig ¶
type TemporalRuntime ¶
type TemporalRuntime struct {
TemporalRuntimeConfig
// contains filtered or unexported fields
}
func NewTemporalRuntime ¶
func NewTemporalRuntime(opts ...Option) (*TemporalRuntime, error)
func (*TemporalRuntime) AddConversationMessagesActivity ¶
func (rt *TemporalRuntime) AddConversationMessagesActivity(ctx context.Context, input AddConversationMessagesInput) error
AddConversationMessagesActivity adds messages to the conversation memory.
func (*TemporalRuntime) AgentEventWorkflow ¶
func (rt *TemporalRuntime) AgentEventWorkflow(ctx workflow.Context) error
AgentEventWorkflow runs once per agent. It receives events and approval requests via workflow updates. Each update includes a runID so that events are published to per-run channels (agent_event_{runID}, approval_{runID}). The workflow completes only when it receives the "complete" signal (on agent Close).
func (*TemporalRuntime) AgentLLMActivity ¶
func (rt *TemporalRuntime) AgentLLMActivity(ctx context.Context, input AgentLLMInput) (*AgentLLMResult, error)
AgentLLMActivity calls the LLM and returns content plus any tool calls. When input.ConversationID is set, it fetches messages from the store and adds the assistant message on completion.
func (*TemporalRuntime) AgentLLMStreamActivity ¶
func (rt *TemporalRuntime) AgentLLMStreamActivity(ctx context.Context, input AgentLLMStreamInput) (*AgentLLMResult, error)
AgentLLMStreamActivity streams LLM response tokens and emits content_delta/thinking_delta events. When input.ConversationID is set, it fetches messages from the conversation and prepends them to the workflow messages.
func (*TemporalRuntime) AgentToolApprovalActivity ¶
func (rt *TemporalRuntime) AgentToolApprovalActivity(ctx context.Context, input AgentToolApprovalInput) (types.ApprovalStatus, error)
AgentToolApprovalActivity blocks until the driver completes it via CompleteActivity. It sends the approval request as AgentEventApproval on the event channel (the same channel for Run and Stream). When EventWorkflowID is set, UpdateWorkflow uses WorkflowUpdateStageCompleted and updateWorkflowApprovalRPCTimeout so that the event handler has returned before ErrResultPending is returned; an RPC timeout maps to ApprovalStatusUnavailable.
func (*TemporalRuntime) AgentToolAuthorizeActivity ¶ added in v0.1.3
func (rt *TemporalRuntime) AgentToolAuthorizeActivity(ctx context.Context, input AgentToolAuthorizeInput) (AgentToolAuthorizeResult, error)
AgentToolAuthorizeActivity checks optional programmatic authorization before approval/execute.
func (*TemporalRuntime) AgentToolExecuteActivity ¶
func (rt *TemporalRuntime) AgentToolExecuteActivity(ctx context.Context, input AgentToolExecuteInput) (string, error)
AgentToolExecuteActivity executes a tool by name and adds the tool message to the conversation when ConversationID is set.
func (*TemporalRuntime) AgentWorkflow ¶
func (rt *TemporalRuntime) AgentWorkflow(ctx workflow.Context, input AgentWorkflowInput) (*types.AgentResponse, error)
AgentWorkflow runs the agent loop: LLM → tool calls → approval/execute → feed results back to LLM → repeat. It stops when the LLM returns no tool calls or when the maximum number of iterations is reached. When input.EventWorkflowID is set, it sends agent events and approval requests to the event workflow.
func (*TemporalRuntime) Approve ¶
func (rt *TemporalRuntime) Approve(ctx context.Context, approvalToken string, status types.ApprovalStatus) error
func (*TemporalRuntime) Close ¶
func (rt *TemporalRuntime) Close()
func (*TemporalRuntime) EventPublishActivity ¶
func (rt *TemporalRuntime) EventPublishActivity(ctx context.Context, channel string, event *types.AgentEvent) error
EventPublishActivity publishes an event to the given channel (agent_event_<main-workflow-id>).
func (*TemporalRuntime) Execute ¶
func (rt *TemporalRuntime) Execute(ctx context.Context, req *runtime.ExecuteRequest) (*types.AgentResponse, error)
func (*TemporalRuntime) ExecuteStream ¶
func (rt *TemporalRuntime) ExecuteStream(ctx context.Context, req *runtime.ExecuteRequest) (chan *types.AgentEvent, error)
func (*TemporalRuntime) GetEventBus ¶
func (rt *TemporalRuntime) GetEventBus() eventbus.EventBus
GetEventBus returns the event bus for the runtime.
func (*TemporalRuntime) OnApproval ¶
func (rt *TemporalRuntime) OnApproval(ctx context.Context, approvalToken string, status types.ApprovalStatus) error
OnApproval completes a tool approval when using ExecuteStream. Pass the string from ev.Approval (see the streaming examples) along with the chosen status.
func (*TemporalRuntime) SendAgentEventUpdateActivity ¶
func (rt *TemporalRuntime) SendAgentEventUpdateActivity(ctx context.Context, in SendAgentEventActivityInput) (SendAgentEventResult, error)
SendAgentEventUpdateActivity sends an event via UpdateWithStartWorkflow when EventWorkflowID is set, and otherwise over the in-memory agentChannel. It returns StreamUnavailable without an error when delivery fails but the workflow should continue (dead stream or pipeline). It returns a non-nil error for configuration or internal failures (fatal to the workflow).
func (*TemporalRuntime) SetEventBus ¶
func (rt *TemporalRuntime) SetEventBus(eventbus eventbus.EventBus)
SetEventBus replaces the in-process event bus. Sub-agent runtimes are wired to the parent agent's bus so delegation and approval events fan in correctly.
func (*TemporalRuntime) Start ¶
func (rt *TemporalRuntime) Start(ctx context.Context) error
Start starts the worker (blocks until Stop is called).
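Because Start blocks, callers typically run it in a goroutine and call Stop from elsewhere. The sketch below shows that shape with a hypothetical blockingWorker type; it is not the runtime's implementation and makes no claim about how the real Start treats its context.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// blockingWorker is a hypothetical stand-in whose Start blocks until Stop.
type blockingWorker struct {
	stop chan struct{}
	once sync.Once
}

func newBlockingWorker() *blockingWorker {
	return &blockingWorker{stop: make(chan struct{})}
}

// Start blocks until Stop closes the channel.
func (w *blockingWorker) Start(_ context.Context) error {
	<-w.stop
	return nil
}

// Stop is safe to call more than once.
func (w *blockingWorker) Stop() {
	w.once.Do(func() { close(w.stop) })
}

// runUntilStopped starts the worker in a goroutine, stops it, and waits
// for Start to return.
func runUntilStopped(w *blockingWorker) error {
	done := make(chan error, 1)
	go func() { done <- w.Start(context.Background()) }()
	w.Stop()
	return <-done
}

func main() {
	fmt.Println(runUntilStopped(newBlockingWorker()))
}
```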
func (*TemporalRuntime) Stop ¶
func (rt *TemporalRuntime) Stop()
Stop stops the Temporal worker(s). Called when the agent package stops an embedded worker or closes the runtime.
type TemporalRuntimeConfig ¶
type TemporalRuntimeConfig struct {
AgentSpec sdkruntime.AgentSpec
AgentExecution sdkruntime.AgentExecution
PolicyFingerprint string // from pkg/agent toolPolicyFingerprint; must match caller temporal.ComputeAgentFingerprint inputs
MCPFingerprint string // from pkg/agent mcpConfigFingerprint; must match caller temporal.ComputeAgentFingerprint inputs
// AgentMode is the string form of [types.AgentMode] (e.g. "interactive", "autonomous"); must match pkg/agent WithAgentMode.
AgentMode string
// DisableLocalWorker mirrors pkg/agent [DisableLocalWorker]: when false, the client embeds a worker
// so Execute/ExecuteStream skip DescribeTaskQueue poller checks. ([NewAgentWorker] never calls those methods.)
DisableLocalWorker bool
// contains filtered or unexported fields
}
TemporalRuntimeConfig holds connection settings plus the same sdkruntime.AgentSpec / sdkruntime.AgentExecution shape as sdkruntime.ExecuteRequest, so workers and pkg/agent share one layout.
type ToolCallRequest ¶
type ToolCallRequest struct {
ToolCallID string `json:"tool_call_id"` // from LLM; used to match tool results
ToolName string `json:"tool_name"`
Args map[string]any `json:"args"`
NeedsApproval bool `json:"needs_approval"`
}
ToolCallRequest is a tool invocation with approval flag. NeedsApproval is set by AgentLLMActivity.