Documentation
¶
Overview ¶
Package executor provides the core execution engine for AI agent operations, implementing a robust system for running commands with support for streaming, tool calls, and asynchronous operations through a Future/Promise pattern.
Design decisions:
- Command pattern: Encapsulates execution parameters in RunCommand struct
- Future/Promise: Async operations with type-safe result handling
- Structured output: JSON Schema validation for responses
- Context awareness: All operations respect context cancellation
- Thread safety: Concurrent execution support with proper synchronization
- Flexible unmarshaling: Support for different response types (JSON, string, gjson)
Key components:
Executor: Interface defining the core execution contract ├── Run: Executes agent commands with streaming support └── handleToolCalls: Manages tool invocations during execution
RunCommand: Configuration for execution ├── Agent: The AI agent to execute ├── Thread: Memory aggregator for context ├── Stream: Enable/disable streaming mode └── Hook: Event handler for execution lifecycle
Future/Promise pattern: ├── CompletableFuture: Combined interface for async operations ├── Promise: Write interface for results └── Future: Read interface for retrieving results
Example usage:
// Create and configure a run command
cmd, err := NewRunCommand(agent, thread, hook)
if err != nil {
return err
}
cmd = cmd.WithStream(true).
WithMaxTurns(5).
WithContextVariables(vars)
// Create a future for the result
future := NewFuture(DefaultUnmarshal[MyResponse]())
// Execute the command
if err := executor.Run(ctx, cmd, future); err != nil {
return err
}
// Get the result (blocks until complete)
result, err := future.Get()
The package is designed to be internal, providing the execution engine while keeping implementation details private. It handles:
- Command execution lifecycle
- Asynchronous operation management
- Tool call coordination
- Response validation and unmarshaling
- Event distribution through hooks
- Context and cancellation management
Index ¶
- func DefaultUnmarshal[T any]() func([]byte) (T, error)
- func NewTemporalAgentWorker(client client.Client, broker broker.Broker) worker.Worker
- func ToJSONSchema[T any]() *jsonschema.Schema
- type CompletableFuture
- type Executor
- type Future
- type Local
- type Promise
- type RemoteAgent
- type RemoteRunCommand
- type RemoteRunResult
- type RemoteRunResultType
- type RunCommand
- func (r *RunCommand) ID() uuid.UUID
- func (r *RunCommand) Validate() error
- func (r RunCommand) WithContextVariables(contextVariables types.ContextVars) RunCommand
- func (r RunCommand) WithMaxTurns(maxTurns int) RunCommand
- func (r RunCommand) WithStream(stream bool) RunCommand
- func (r RunCommand) WithStructuredOutput(output *provider.StructuredOutput) RunCommand
- type Temporal
- func (t *Temporal) CallTool(ctx context.Context, tc remoteToolCallParams) (remoteToolCallResult, error)
- func (t *Temporal) PublishError(ctx context.Context, params completionParams, errMsg string) error
- func (t *Temporal) Run(ctx workflow.Context, cmd RemoteRunCommand) (RemoteRunResult, error)
- func (t *Temporal) RunChildWorkflow(ctx workflow.Context, cmd RemoteRunCommand) (RemoteRunResult, error)
- func (t *Temporal) RunCompletion(ctx context.Context, cmd completionParams) (RemoteRunResult, error)
- type TemporalProxy
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultUnmarshal ¶ added in v0.1.0
func NewTemporalAgentWorker ¶ added in v0.1.8
func ToJSONSchema ¶ added in v0.1.0
func ToJSONSchema[T any]() *jsonschema.Schema
Types ¶
type CompletableFuture ¶ added in v0.1.0
type Executor ¶
type Executor interface {
Run(context.Context, RunCommand, Promise) error
// contains filtered or unexported methods
}
type RemoteAgent ¶ added in v0.1.3
type RemoteAgent struct {
Name string `json:"name"`
Model string `json:"model"`
Instructions string `json:"instructions"`
ParallelToolCalls bool `json:"parallelToolCalls"`
}
func (*RemoteAgent) RenderInstructions ¶ added in v0.1.3
func (a *RemoteAgent) RenderInstructions(cv types.ContextVars) (string, error)
RenderInstructions renders the agent's instructions with the provided context variables.
type RemoteRunCommand ¶ added in v0.1.3
type RemoteRunCommand struct {
ID uuid.UUID `json:"id"`
Agent RemoteAgent `json:"agent"`
StructuredOutput *provider.StructuredOutput `json:"structured_output,omitempty"`
Stream bool `json:"stream"`
MaxTurns int `json:"max_turns"`
ContextVariables types.ContextVars `json:"context_variables,omitempty"`
Checkpoint shorttermmemory.Checkpoint `json:"checkpoint"`
}
func RemoteRunCommandFromRunCommand ¶ added in v0.1.3
func RemoteRunCommandFromRunCommand(cmd RunCommand) RemoteRunCommand
type RemoteRunResult ¶ added in v0.1.3
type RemoteRunResult struct {
ID uuid.UUID `json:"id"`
Checkpoint shorttermmemory.Checkpoint `json:"checkpoint"`
Result string `json:"result"`
Type RemoteRunResultType `json:"type"`
ToolCalls *messages.ToolCallMessage `json:"tool_calls,omitempty"`
ContextVariables types.ContextVars `json:"context_variables,omitempty"`
}
type RemoteRunResultType ¶ added in v0.1.3
type RemoteRunResultType uint8
const ( RemoteRunResultTypeIncomplete RemoteRunResultType = iota RemoteRunResultTypeCompletion RemoteRunResultTypeToolCall )
type RunCommand ¶
type RunCommand struct {
Agent api.Agent
Thread *shorttermmemory.Aggregator
StructuredOutput *provider.StructuredOutput
Stream bool
MaxTurns int
ContextVariables types.ContextVars
Hook events.Hook
// contains filtered or unexported fields
}
func NewRunCommand ¶
func NewRunCommand(agent api.Agent, thread *shorttermmemory.Aggregator, hook events.Hook) (RunCommand, error)
func (*RunCommand) ID ¶
func (r *RunCommand) ID() uuid.UUID
func (*RunCommand) Validate ¶ added in v0.1.0
func (r *RunCommand) Validate() error
func (RunCommand) WithContextVariables ¶
func (r RunCommand) WithContextVariables(contextVariables types.ContextVars) RunCommand
func (RunCommand) WithMaxTurns ¶
func (r RunCommand) WithMaxTurns(maxTurns int) RunCommand
func (RunCommand) WithStream ¶
func (r RunCommand) WithStream(stream bool) RunCommand
func (RunCommand) WithStructuredOutput ¶ added in v0.1.2
func (r RunCommand) WithStructuredOutput(output *provider.StructuredOutput) RunCommand
type Temporal ¶
type Temporal struct {
// contains filtered or unexported fields
}
func (*Temporal) PublishError ¶ added in v0.1.3
PublishError is an activity that publishes error events
func (*Temporal) Run ¶ added in v0.1.3
func (t *Temporal) Run(ctx workflow.Context, cmd RemoteRunCommand) (RemoteRunResult, error)
func (*Temporal) RunChildWorkflow ¶ added in v0.1.3
func (t *Temporal) RunChildWorkflow(ctx workflow.Context, cmd RemoteRunCommand) (RemoteRunResult, error)
func (*Temporal) RunCompletion ¶ added in v0.1.3
func (t *Temporal) RunCompletion(ctx context.Context, cmd completionParams) (RemoteRunResult, error)
type TemporalProxy ¶ added in v0.1.3
type TemporalProxy struct {
// contains filtered or unexported fields
}
func NewTemporal ¶ added in v0.1.8
func NewTemporal(client client.Client, broker broker.Broker) *TemporalProxy
func (*TemporalProxy) Run ¶ added in v0.1.3
func (t *TemporalProxy) Run(ctx context.Context, cmd RunCommand, promise Promise) error