Documentation ¶
Overview ¶
Package task provides task management for Hector v2.
A Task is the unit of work in the A2A protocol. This package implements:
- Full task state machine (submitted → working → completed/failed)
- Human-in-the-loop (HITL) support with input_required state
- Execution state persistence for task resumption
- Task history and artifact management
Index ¶
- Variables
- func NewSQLTaskStore(db *sql.DB, dialect string) (a2asrv.TaskStore, error)
- func NewTaskStoreFromConfig(cfg *config.Config, pool *config.DBPool) (a2asrv.TaskStore, error)
- type Awaiter
- type ExecutionState
- type InMemoryService
- func (s *InMemoryService) Cancel(_ context.Context, taskID string) error
- func (s *InMemoryService) Create(_ context.Context, contextID string) (*Task, error)
- func (s *InMemoryService) Get(_ context.Context, taskID string) (*Task, error)
- func (s *InMemoryService) GetOrCreate(_ context.Context, taskID, contextID string) (*Task, error)
- func (s *InMemoryService) List(_ context.Context, contextID string) ([]*Task, error)
- func (s *InMemoryService) Update(_ context.Context, task *Task) error
- type InputOption
- type InputRequirement
- type InputResponse
- type InputType
- type MemoryTaskStore
- type SQLTaskStore
- type Service
- type State
- type Status
- type Task
- func (t *Task) AddArtifact(artifact a2a.Artifact)
- func (t *Task) AppendHistory(msg *a2a.Message)
- func (t *Task) CancelAllChildren() (cancelled int, failed int)
- func (t *Task) CancelExecution(callID string) bool
- func (t *Task) GetStatus() Status
- func (t *Task) LoadExecutionState() *ExecutionState
- func (t *Task) ProvideInput(optionID string) *InputOption
- func (t *Task) RegisterExecution(exec *agent.ChildExecution)
- func (t *Task) RequestInput(req *InputRequirement)
- func (t *Task) SaveExecutionState(state *ExecutionState)
- func (t *Task) SetMetadata(key string, value any)
- func (t *Task) SetStatus(state State, message *a2a.Message, err error)
- func (t *Task) UnregisterExecution(callID string)
- type TaskError
Constants ¶
This section is empty.
Variables ¶
var (
ErrInputTimeout = errors.New("input timeout")
ErrNoWaiter = errors.New("no waiter for task")
ErrWaiterFull = errors.New("waiter channel full")
)
Errors
var (
ErrTaskNotFound = &TaskError{Code: "task_not_found", Message: "task not found"}
ErrTaskTerminal = &TaskError{Code: "task_terminal", Message: "task is in terminal state"}
)
Errors
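Because ErrTaskNotFound and ErrTaskTerminal are pointer sentinels, callers would typically match them with errors.Is rather than by comparing messages. The following is a minimal sketch under stated assumptions: TaskError is redeclared locally with the Code and Message fields shown above, its Error method is an assumption about how the real type formats itself, and lookup and isNotFound are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskError mirrors the shape implied by the documented sentinels.
// The Error method is an assumption; the real type may format differently.
type TaskError struct {
	Code    string
	Message string
}

func (e *TaskError) Error() string { return e.Code + ": " + e.Message }

// ErrTaskNotFound is redeclared locally so the sketch compiles on its own.
var ErrTaskNotFound = &TaskError{Code: "task_not_found", Message: "task not found"}

// lookup is a hypothetical helper that wraps the sentinel with context.
func lookup(id string) error {
	return fmt.Errorf("lookup %q: %w", id, ErrTaskNotFound)
}

// isNotFound reports whether err is, or wraps, the ErrTaskNotFound sentinel.
// Matching is by pointer identity, so a different *TaskError with the same
// fields does not match.
func isNotFound(err error) bool {
	return errors.Is(err, ErrTaskNotFound)
}

func main() {
	err := lookup("task-1")
	fmt.Println(err, isNotFound(err))
}
```

Matching on the sentinel keeps callers independent of the message text, which can then change without breaking error handling.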
Functions ¶
func NewSQLTaskStore ¶
NewSQLTaskStore creates a new SQL-based TaskStore implementing a2asrv.TaskStore. The db connection should be shared with other services using the same database to prevent SQLite "database is locked" errors.
func NewTaskStoreFromConfig ¶
NewTaskStoreFromConfig creates a TaskStore based on configuration. A DBPool is required for SQL backends so that connections are shared and lock errors are avoided. It returns nil if no task persistence is configured, in which case a2a-go uses its in-memory store.
Example config:
databases:
  default:
    driver: sqlite
    database: ./.hector/tasks.db
server:
  tasks:
    backend: sql
    database: default
Types ¶
type Awaiter ¶
type Awaiter struct {
// contains filtered or unexported fields
}
Awaiter manages waiting for human input on tasks. It supports both blocking (streaming) and async (request/response) modes.
func NewAwaiter ¶
NewAwaiter creates a new task awaiter.
func (*Awaiter) ProvideInput ¶
func (a *Awaiter) ProvideInput(taskID string, resp *InputResponse) error
ProvideInput provides input for a waiting task. This is called from the API handler when the user responds.
func (*Awaiter) WaitForInput ¶
WaitForInput blocks until input is received or the timeout elapses. This is used in blocking (streaming) mode.
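The interaction between WaitForInput, ProvideInput, and the three sentinel errors can be illustrated with a one-slot channel per task. This is a sketch, not the package's implementation: sketchAwaiter, register, and shortTimeout are hypothetical, the waitForInput signature is assumed because the page does not show it, and the real Awaiter may manage its waiters differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Sentinel errors copied from the package documentation.
var (
	ErrInputTimeout = errors.New("input timeout")
	ErrNoWaiter     = errors.New("no waiter for task")
	ErrWaiterFull   = errors.New("waiter channel full")
)

// shortTimeout keeps the demo fast; the value is arbitrary.
const shortTimeout = 50 * time.Millisecond

// InputResponse is trimmed to two of the documented fields.
type InputResponse struct {
	OptionID string
	Approved bool
}

// sketchAwaiter is a hypothetical stand-in for Awaiter, whose fields are unexported.
type sketchAwaiter struct {
	mu      sync.Mutex
	waiters map[string]chan *InputResponse
}

func newSketchAwaiter() *sketchAwaiter {
	return &sketchAwaiter{waiters: make(map[string]chan *InputResponse)}
}

// register returns the one-slot channel for taskID, creating it if needed.
func (a *sketchAwaiter) register(taskID string) chan *InputResponse {
	a.mu.Lock()
	defer a.mu.Unlock()
	ch, ok := a.waiters[taskID]
	if !ok {
		ch = make(chan *InputResponse, 1)
		a.waiters[taskID] = ch
	}
	return ch
}

// waitForInput blocks until a response arrives or the timeout elapses,
// then drops the waiter so later responses get ErrNoWaiter.
func (a *sketchAwaiter) waitForInput(taskID string, timeout time.Duration) (*InputResponse, error) {
	ch := a.register(taskID)
	defer func() {
		a.mu.Lock()
		delete(a.waiters, taskID)
		a.mu.Unlock()
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case resp := <-ch:
		return resp, nil
	case <-timer.C:
		return nil, ErrInputTimeout
	}
}

// provideInput hands a response to the waiter without blocking the caller.
func (a *sketchAwaiter) provideInput(taskID string, resp *InputResponse) error {
	a.mu.Lock()
	ch, ok := a.waiters[taskID]
	a.mu.Unlock()
	if !ok {
		return ErrNoWaiter
	}
	select {
	case ch <- resp:
		return nil
	default:
		return ErrWaiterFull
	}
}

func main() {
	a := newSketchAwaiter()
	a.register("task-1")
	go a.provideInput("task-1", &InputResponse{OptionID: "approve", Approved: true})

	resp, err := a.waitForInput("task-1", shortTimeout)
	if err != nil {
		fmt.Println("no input:", err)
		return
	}
	fmt.Println("approved:", resp.Approved)
}
```

The non-blocking send is what lets an API handler return promptly: a second response for the same task is rejected with ErrWaiterFull instead of stalling the request.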
type ExecutionState ¶
type ExecutionState struct {
// Phase identifies what phase the execution was in.
Phase string
// Iteration is the loop iteration (for tool call loops).
Iteration int
// Messages is the conversation history at pause time.
Messages []*a2a.Message
// PendingToolCall is the tool call awaiting approval.
PendingToolCall *tool.ToolCall
// Thinking accumulated so far.
ThinkingContent string
ThinkingSignature string
// TextAccumulator for partial text.
TextAccumulator string
// Timestamp when execution was paused.
Timestamp time.Time
// Custom data for complex scenarios.
Custom map[string]any
}
ExecutionState contains the state needed to resume a task. This is critical for HITL scenarios where the task pauses for input.
type InMemoryService ¶
type InMemoryService struct {
// contains filtered or unexported fields
}
InMemoryService is an in-memory implementation of Service.
func NewInMemoryService ¶
func NewInMemoryService() *InMemoryService
NewInMemoryService creates a new in-memory task service.
func (*InMemoryService) Cancel ¶
func (s *InMemoryService) Cancel(_ context.Context, taskID string) error
Cancel cancels a task.
func (*InMemoryService) GetOrCreate ¶
GetOrCreate retrieves a task by ID or creates one with that ID if not found.
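The get-or-create behaviour can be sketched with a mutex-guarded map. The names sketchService, sketchTask, and newSketchService are hypothetical, and the real InMemoryService also takes a context and returns an error, which this sketch leaves out.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchTask keeps only the two identifiers relevant here.
type sketchTask struct {
	ID        string
	ContextID string
}

// sketchService is a hypothetical stand-in for InMemoryService.
type sketchService struct {
	mu    sync.Mutex
	tasks map[string]*sketchTask
}

func newSketchService() *sketchService {
	return &sketchService{tasks: make(map[string]*sketchTask)}
}

// getOrCreate returns the task stored under taskID, creating it with the
// caller-supplied ID when absent. The lookup and the insert share one lock,
// so concurrent callers cannot create two tasks for the same ID.
func (s *sketchService) getOrCreate(taskID, contextID string) *sketchTask {
	s.mu.Lock()
	defer s.mu.Unlock()
	if existing, ok := s.tasks[taskID]; ok {
		return existing
	}
	created := &sketchTask{ID: taskID, ContextID: contextID}
	s.tasks[taskID] = created
	return created
}

func main() {
	svc := newSketchService()
	first := svc.getOrCreate("a2a-123", "ctx-1")
	second := svc.getOrCreate("a2a-123", "ctx-2")
	fmt.Println(first == second, second.ContextID)
}
```

In this sketch the second call returns the existing task unchanged, so the context ID from the first call is retained; whether the real service behaves the same way is not stated in the documentation.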
type InputOption ¶
type InputOption struct {
// ID is the identifier for this option.
ID string
// Label is the display text.
Label string
// Value is the value if selected.
Value any
// IsDefault indicates this is the default option.
IsDefault bool
}
InputOption represents an option presented to the user.
type InputRequirement ¶
type InputRequirement struct {
// Type of input required.
Type InputType
// Prompt to show the user.
Prompt *a2a.Message
// Options available to the user.
Options []InputOption
// Timeout for the input request.
Timeout time.Duration
// ToolCall is the tool awaiting approval (for tool approval type).
ToolCall *tool.ToolCall
// RequestedAt is when the input was requested.
RequestedAt time.Time
}
InputRequirement describes what human input is needed.
func ApprovalRequest ¶
ApprovalRequest creates a standard tool approval input requirement.
func ClarificationRequest ¶
func ClarificationRequest(prompt string, timeout time.Duration) *InputRequirement
ClarificationRequest creates a clarification input requirement.
func ConfirmationRequest ¶
func ConfirmationRequest(prompt string, timeout time.Duration) *InputRequirement
ConfirmationRequest creates a confirmation input requirement.
type InputResponse ¶
type InputResponse struct {
// OptionID is the selected option (if options were provided).
OptionID string
// Value is the input value (for free-form input).
Value any
// Approved is true if the action was approved (for approval requests).
Approved bool
// Message is an optional message from the user.
Message string
}
InputResponse contains the human input response.
type InputType ¶
type InputType string
InputType identifies the type of human input required.
const (
// InputTypeToolApproval requires approval for a tool call.
InputTypeToolApproval InputType = "tool_approval"
// InputTypeClarification requires clarifying information.
InputTypeClarification InputType = "clarification"
// InputTypeAuthentication requires authentication.
InputTypeAuthentication InputType = "authentication"
// InputTypeConfirmation requires confirmation to proceed.
InputTypeConfirmation InputType = "confirmation"
)
type MemoryTaskStore ¶ added in v1.15.1
type MemoryTaskStore struct {
// contains filtered or unexported fields
}
MemoryTaskStore is a simple in-memory implementation of a2asrv.TaskStore. Used as the default when no persistent storage is configured.
func NewMemoryTaskStore ¶ added in v1.15.1
func NewMemoryTaskStore() *MemoryTaskStore
NewMemoryTaskStore creates a new in-memory task store.
func (*MemoryTaskStore) Close ¶ added in v1.15.1
func (s *MemoryTaskStore) Close() error
Close closes the store (implements a2asrv.TaskStore).
type SQLTaskStore ¶
type SQLTaskStore struct {
// contains filtered or unexported fields
}
SQLTaskStore implements a2asrv.TaskStore using a SQL database. It stores a2a.Task objects directly (as required by the interface).
func (*SQLTaskStore) Close ¶
func (s *SQLTaskStore) Close() error
Close closes the database connection.
func (*SQLTaskStore) Save ¶
Save stores a task (implements a2asrv.TaskStore). It uses UPSERT for an atomic insert or update. For optimistic concurrency, it checks whether the task was modified since it was loaded by comparing the timestamps stored in task.Metadata["_updated_at"]. If the task is stale, Save logs a warning but proceeds, because the a2a protocol handles task state transitions.
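The staleness check described for Save can be sketched as a pure comparison of two metadata maps. This sketch assumes the "_updated_at" value is an RFC 3339 string, which the documentation does not specify; isStale and parseStamp are hypothetical helpers, and the warn-and-proceed step is represented only by a comment in main.

```go
package main

import (
	"fmt"
	"time"
)

// updatedAtKey is the metadata key named in the Save documentation.
const updatedAtKey = "_updated_at"

// parseStamp extracts the timestamp from task metadata.
// Assumption: the value is an RFC 3339 string; the real encoding is not documented.
func parseStamp(meta map[string]any) (time.Time, bool) {
	s, ok := meta[updatedAtKey].(string)
	if !ok {
		return time.Time{}, false
	}
	ts, err := time.Parse(time.RFC3339, s)
	return ts, err == nil
}

// isStale reports whether the stored copy was updated after the incoming
// copy was loaded. When either timestamp is missing or malformed there is
// nothing to compare, so the incoming copy is treated as fresh.
func isStale(incoming, stored map[string]any) bool {
	in, okIn := parseStamp(incoming)
	st, okSt := parseStamp(stored)
	if !okIn || !okSt {
		return false
	}
	return st.After(in)
}

func main() {
	incoming := map[string]any{updatedAtKey: "2024-01-01T10:00:00Z"}
	stored := map[string]any{updatedAtKey: "2024-01-01T10:00:05Z"}
	if isStale(incoming, stored) {
		// Per the documentation, a stale save is logged and then allowed.
		fmt.Println("warning: saving over a newer copy")
	}
}
```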
type Service ¶
type Service interface {
// Create creates a new task.
Create(ctx context.Context, contextID string) (*Task, error)
// GetOrCreate retrieves a task by ID or creates one with that ID if not found.
// This is used to associate executor tasks with a2a's task IDs.
GetOrCreate(ctx context.Context, taskID, contextID string) (*Task, error)
// Get retrieves a task by ID.
Get(ctx context.Context, taskID string) (*Task, error)
// Update saves task changes.
Update(ctx context.Context, task *Task) error
// Cancel cancels a task.
Cancel(ctx context.Context, taskID string) error
// List lists tasks for a context.
List(ctx context.Context, contextID string) ([]*Task, error)
}
Service manages tasks.
type State ¶
type State string
State represents the current state of a task.
const (
// StateSubmitted means the task has been submitted but not started.
StateSubmitted State = "submitted"
// StateWorking means the task is being processed.
StateWorking State = "working"
// StateCompleted means the task finished successfully.
StateCompleted State = "completed"
// StateFailed means the task failed with an error.
StateFailed State = "failed"
// StateCancelled means the task was cancelled.
StateCancelled State = "cancelled"
// StateInputRequired means the task is waiting for human input (HITL).
StateInputRequired State = "input_required"
// StateAuthRequired means the task needs authentication.
StateAuthRequired State = "auth_required"
// StateRejected means the task was rejected (e.g., approval denied).
StateRejected State = "rejected"
)
func (State) IsTerminal ¶
IsTerminal returns whether this state is terminal (no more transitions).
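A terminal-state check is usually a small switch over the State constants. The sketch below redeclares the documented constants locally and assumes that completed, failed, cancelled, and rejected are the terminal states; the documentation does not list them, so the actual set may differ. The isTerminal function is a hypothetical free function standing in for the method.

```go
package main

import "fmt"

// State and its constants are redeclared from the documentation
// so the sketch compiles on its own.
type State string

const (
	StateSubmitted     State = "submitted"
	StateWorking       State = "working"
	StateCompleted     State = "completed"
	StateFailed        State = "failed"
	StateCancelled     State = "cancelled"
	StateInputRequired State = "input_required"
	StateAuthRequired  State = "auth_required"
	StateRejected      State = "rejected"
)

// isTerminal reports whether no further transitions are expected from s.
// Assumption: these four states are terminal; input_required and
// auth_required are treated as pauses that can resume.
func isTerminal(s State) bool {
	switch s {
	case StateCompleted, StateFailed, StateCancelled, StateRejected:
		return true
	}
	return false
}

func main() {
	for _, s := range []State{StateWorking, StateInputRequired, StateCompleted} {
		fmt.Printf("%s terminal=%v\n", s, isTerminal(s))
	}
}
```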
type Status ¶
type Status struct {
State State
Message *a2a.Message
Timestamp time.Time
Error error // For failed state
}
Status contains the task state and an optional message.
type Task ¶
type Task struct {
// ID is the unique identifier for this task.
ID string
// ContextID links this task to a session/conversation.
ContextID string
// Status contains the current state and message.
Status Status
// History is the task-specific message history.
History []*a2a.Message
// Artifacts produced by this task.
Artifacts []a2a.Artifact
// Metadata contains additional task data.
Metadata map[string]any
// InputRequirement specifies what input is needed (for HITL).
InputRequirement *InputRequirement
// ExecutionState for task resumption.
ExecutionState *ExecutionState
// CreatedAt is when the task was created.
CreatedAt time.Time
// UpdatedAt is when the task was last updated.
UpdatedAt time.Time
// contains filtered or unexported fields
}
Task represents a unit of work in the A2A protocol. Tasks have a full state machine and support human-in-the-loop interactions.
func (*Task) AddArtifact ¶
AddArtifact adds an artifact to the task.
func (*Task) AppendHistory ¶
AppendHistory adds a message to the task history.
func (*Task) CancelAllChildren ¶
CancelAllChildren cancels all active child executions (cascade cancellation). It returns the number of children cancelled successfully and the number whose cancellation failed.
func (*Task) CancelExecution ¶
CancelExecution cancels a specific child execution by callID. Returns true if cancellation was initiated.
func (*Task) LoadExecutionState ¶
func (t *Task) LoadExecutionState() *ExecutionState
LoadExecutionState returns and clears the saved execution state.
func (*Task) ProvideInput ¶
func (t *Task) ProvideInput(optionID string) *InputOption
ProvideInput processes human input and clears the requirement.
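One plausible reading of ProvideInput is that it resolves the option ID against the pending requirement and then clears that requirement. The sketch below follows that reading with trimmed local copies of InputOption and InputRequirement; sketchTask is hypothetical, and returning nil for an unknown ID while still clearing the requirement is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// InputOption is copied from the documentation.
type InputOption struct {
	ID        string
	Label     string
	Value     any
	IsDefault bool
}

// InputRequirement is trimmed to the field used here.
type InputRequirement struct {
	Options []InputOption
}

// sketchTask is a hypothetical stand-in for Task.
type sketchTask struct {
	requirement *InputRequirement
}

// provideInput returns the option matching optionID, or nil when there is
// no pending requirement or no option has that ID. The pending requirement
// is cleared in either case (an assumption about the real method).
func (t *sketchTask) provideInput(optionID string) *InputOption {
	req := t.requirement
	if req == nil {
		return nil
	}
	t.requirement = nil
	for i := range req.Options {
		if req.Options[i].ID == optionID {
			return &req.Options[i]
		}
	}
	return nil
}

func main() {
	task := &sketchTask{requirement: &InputRequirement{Options: []InputOption{
		{ID: "yes", Label: "Yes", Value: true, IsDefault: true},
		{ID: "no", Label: "No", Value: false},
	}}}
	if chosen := task.provideInput("no"); chosen != nil {
		fmt.Println("selected:", chosen.Label)
	}
}
```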
func (*Task) RegisterExecution ¶
func (t *Task) RegisterExecution(exec *agent.ChildExecution)
RegisterExecution registers a child execution (tool or sub-agent) for cascade cancellation. Implements agent.CancellableTask interface.
func (*Task) RequestInput ¶
func (t *Task) RequestInput(req *InputRequirement)
RequestInput pauses the task for human input.
func (*Task) SaveExecutionState ¶
func (t *Task) SaveExecutionState(state *ExecutionState)
SaveExecutionState saves the current execution state for later resumption.
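Since LoadExecutionState is documented as returning and clearing the saved state, a save followed by two loads should yield the state once and then nil. A minimal sketch of that contract follows; sketchTask and its methods are hypothetical, ExecutionState is trimmed to two fields, and the phase name used in main is illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// ExecutionState is trimmed to two of the documented fields.
type ExecutionState struct {
	Phase     string
	Iteration int
}

// sketchTask is a hypothetical stand-in for Task.
type sketchTask struct {
	mu    sync.Mutex
	state *ExecutionState
}

// save stores the state to resume from, replacing any earlier one.
func (t *sketchTask) save(state *ExecutionState) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.state = state
}

// load returns the saved state and clears it, so a resumed task
// cannot replay the same pause point twice.
func (t *sketchTask) load() *ExecutionState {
	t.mu.Lock()
	defer t.mu.Unlock()
	state := t.state
	t.state = nil
	return state
}

func main() {
	task := &sketchTask{}
	task.save(&ExecutionState{Phase: "awaiting_approval", Iteration: 2})

	if resumed := task.load(); resumed != nil {
		fmt.Println("resuming at iteration", resumed.Iteration)
	}
	fmt.Println("second load is nil:", task.load() == nil)
}
```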
func (*Task) SetMetadata ¶
SetMetadata sets a metadata value.
func (*Task) UnregisterExecution ¶
UnregisterExecution removes a child execution from tracking. Should be called when execution completes (success or failure).
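The register, cancel, and unregister lifecycle for child executions can be sketched with a map keyed by call ID. The childExecution type here is a hypothetical stand-in for agent.ChildExecution, whose fields are not shown on this page; in particular, the Cancel callback returning a bool is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// childExecution is a hypothetical stand-in for agent.ChildExecution.
type childExecution struct {
	CallID string
	Cancel func() bool // assumed to report whether cancellation succeeded
}

// sketchTask is a hypothetical stand-in for Task's execution registry.
type sketchTask struct {
	mu       sync.Mutex
	children map[string]*childExecution
}

func newSketchTask() *sketchTask {
	return &sketchTask{children: make(map[string]*childExecution)}
}

// register tracks a child so it can be cancelled with its parent.
func (t *sketchTask) register(exec *childExecution) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.children[exec.CallID] = exec
}

// unregister stops tracking a child once it has finished.
func (t *sketchTask) unregister(callID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.children, callID)
}

// cancelAllChildren cancels every tracked child and counts the outcomes.
// The children are copied under the lock and cancelled outside it, so a
// Cancel callback that calls unregister cannot deadlock.
func (t *sketchTask) cancelAllChildren() (cancelled, failed int) {
	t.mu.Lock()
	snapshot := make([]*childExecution, 0, len(t.children))
	for _, child := range t.children {
		snapshot = append(snapshot, child)
	}
	t.mu.Unlock()

	for _, child := range snapshot {
		if child.Cancel() {
			cancelled++
		} else {
			failed++
		}
	}
	return cancelled, failed
}

func main() {
	task := newSketchTask()
	task.register(&childExecution{CallID: "call-1", Cancel: func() bool { return true }})
	task.register(&childExecution{CallID: "call-2", Cancel: func() bool { return false }})
	task.unregister("call-1") // call-1 finished before the cancel arrived

	cancelled, failed := task.cancelAllChildren()
	fmt.Println(cancelled, failed)
}
```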