task

package
v1.31.0
Warning

This package is not in the latest version of its module.

Published: Apr 15, 2026 | License: MIT | Imports: 18 | Imported by: 0

Documentation

Overview

Package task provides task management for Hector v2.

A Task is the unit of work in the A2A protocol. This package implements:

  • Full task state machine (submitted → working → completed/failed)
  • Human-in-the-loop (HITL) support with input_required state
  • Execution state persistence for task resumption
  • Task history and artifact management

Constants

This section is empty.

Variables

var (
	ErrInputTimeout = errors.New("input timeout")
	ErrNoWaiter     = errors.New("no waiter for task")
	ErrWaiterFull   = errors.New("waiter channel full")
)

Errors

var (
	ErrTaskNotFound = &TaskError{Code: "task_not_found", Message: "task not found"}
	ErrTaskTerminal = &TaskError{Code: "task_terminal", Message: "task is in terminal state"}
)

Errors
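Because ErrTaskNotFound and ErrTaskTerminal are pointer values, errors.Is matches them by pointer identity, not by field contents. The following is a minimal, self-contained sketch of that behaviour. It redeclares TaskError locally; the Error format, the lookup helper, and the isNotFound helper are assumptions made for illustration and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskError mirrors the exported fields of the package's TaskError type.
type TaskError struct {
	Code    string
	Message string
}

// Error's output format is an assumption; this page does not show it.
func (e *TaskError) Error() string { return e.Code + ": " + e.Message }

// ErrTaskNotFound is a local stand-in for the package-level sentinel.
var ErrTaskNotFound = &TaskError{Code: "task_not_found", Message: "task not found"}

// lookup is a hypothetical helper that wraps the sentinel with context.
func lookup(id string) error {
	return fmt.Errorf("lookup %q: %w", id, ErrTaskNotFound)
}

// isNotFound reports whether err is, or wraps, the ErrTaskNotFound sentinel.
func isNotFound(err error) bool {
	return errors.Is(err, ErrTaskNotFound)
}

func main() {
	err := lookup("t-1")
	fmt.Println(isNotFound(err)) // true: the wrapped sentinel is found

	// errors.As recovers the concrete error so the Code can be inspected.
	var te *TaskError
	if errors.As(err, &te) {
		fmt.Println(te.Code) // task_not_found
	}
}
```

A freshly allocated &TaskError with the same Code does not match the sentinel under errors.Is; code that needs to match by code can use errors.As and compare the Code field instead.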

Functions

func NewSQLTaskStore

func NewSQLTaskStore(db *sql.DB, dialect string) (a2asrv.TaskStore, error)

NewSQLTaskStore creates a new SQL-based TaskStore implementing a2asrv.TaskStore. The db connection should be shared with other services using the same database to prevent SQLite "database is locked" errors.

func NewTaskStore added in v1.21.0

func NewTaskStore(pool *config.DBPool, databaseDSN string) (a2asrv.TaskStore, error)

NewTaskStore creates a TaskStore using the provided database configuration. This is the new clean interface that accepts separated configs.

Types

type Awaiter

type Awaiter struct {
	// contains filtered or unexported fields
}

Awaiter manages waiting for human input on tasks. It supports both blocking (streaming) and async (request/response) modes.

func NewAwaiter

func NewAwaiter(defaultTimeout time.Duration) *Awaiter

NewAwaiter creates a new task awaiter.

func (*Awaiter) IsWaiting

func (a *Awaiter) IsWaiting(taskID string) bool

IsWaiting returns whether a task is waiting for input.

func (*Awaiter) ProvideInput

func (a *Awaiter) ProvideInput(taskID string, resp *InputResponse) error

ProvideInput provides input for a waiting task. This is called from the API handler when the user responds.

func (*Awaiter) WaitForInput

func (a *Awaiter) WaitForInput(ctx context.Context, task *Task) (*InputResponse, error)

WaitForInput blocks until input is received or the timeout elapses. It is used in blocking (streaming) mode.

type ExecutionState

type ExecutionState struct {
	// Phase identifies what phase the execution was in.
	Phase string

	// Iteration is the loop iteration (for tool call loops).
	Iteration int

	// Messages is the conversation history at pause time.
	Messages []*a2a.Message

	// PendingToolCall is the tool call awaiting approval.
	PendingToolCall *tool.ToolCall

	// Thinking accumulated so far.
	ThinkingContent   string
	ThinkingSignature string

	// TextAccumulator for partial text.
	TextAccumulator string

	// Timestamp when execution was paused.
	Timestamp time.Time

	// Custom data for complex scenarios.
	Custom map[string]any
}

ExecutionState contains the state needed to resume a task. This is critical for HITL scenarios where the task pauses for input.

type InMemoryTaskStore added in v1.21.0

type InMemoryTaskStore struct {
	// contains filtered or unexported fields
}

InMemoryTaskStore implements a2asrv.TaskStore using an in-memory map. It supports multi-app isolation.

func NewInMemoryTaskStore added in v1.21.0

func NewInMemoryTaskStore() *InMemoryTaskStore

NewInMemoryTaskStore creates a new in-memory task store.

func (*InMemoryTaskStore) DeleteByApp added in v1.21.0

func (s *InMemoryTaskStore) DeleteByApp(ctx context.Context, appName string) error

DeleteByApp removes all tasks for a specific app.

func (*InMemoryTaskStore) Get added in v1.21.0

func (s *InMemoryTaskStore) Get(ctx context.Context, id a2a.TaskID) (*a2a.Task, error)

Get retrieves a task by ID.

func (*InMemoryTaskStore) List added in v1.21.0

func (s *InMemoryTaskStore) List(ctx context.Context, appName, contextID string) ([]*a2a.Task, error)

List retrieves tasks for a specific app and context (session).

func (*InMemoryTaskStore) Save added in v1.21.0

func (s *InMemoryTaskStore) Save(ctx context.Context, task *a2a.Task) error

Save stores a task.

type InputOption

type InputOption struct {
	// ID is the identifier for this option.
	ID string

	// Label is the display text.
	Label string

	// Value is the value if selected.
	Value any

	// IsDefault indicates this is the default option.
	IsDefault bool
}

InputOption represents an option presented to the user.

type InputRequirement

type InputRequirement struct {
	// Type of input required.
	Type InputType

	// Prompt to show the user.
	Prompt *a2a.Message

	// Options available to the user.
	Options []InputOption

	// Timeout for the input request.
	Timeout time.Duration

	// ToolCall is the tool awaiting approval (for tool approval type).
	ToolCall *tool.ToolCall

	// RequestedAt is when the input was requested.
	RequestedAt time.Time
}

InputRequirement describes what human input is needed.

func ApprovalRequest

func ApprovalRequest(tc *tool.ToolCall, prompt string, timeout time.Duration) *InputRequirement

ApprovalRequest creates a standard tool approval input requirement.

func ClarificationRequest

func ClarificationRequest(prompt string, timeout time.Duration) *InputRequirement

ClarificationRequest creates a clarification input requirement.

func ConfirmationRequest

func ConfirmationRequest(prompt string, timeout time.Duration) *InputRequirement

ConfirmationRequest creates a confirmation input requirement.

type InputResponse

type InputResponse struct {
	// OptionID is the selected option (if options were provided).
	OptionID string

	// Value is the input value (for free-form input).
	Value any

	// Approved is true if the action was approved (for approval requests).
	Approved bool

	// Message is an optional message from the user.
	Message string
}

InputResponse contains the human input response.

type InputType

type InputType string

InputType identifies the type of human input required.

const (
	// InputTypeToolApproval requires approval for a tool call.
	InputTypeToolApproval InputType = "tool_approval"

	// InputTypeClarification requires clarifying information.
	InputTypeClarification InputType = "clarification"

	// InputTypeAuthentication requires authentication.
	InputTypeAuthentication InputType = "authentication"

	// InputTypeConfirmation requires confirmation to proceed.
	InputTypeConfirmation InputType = "confirmation"
)

type PersistentService added in v1.21.0

type PersistentService struct {
	// contains filtered or unexported fields
}

PersistentService implements Service using a persistent backend. It ensures strict app isolation by relying on the store's context-aware methods.

func NewPersistentService added in v1.21.0

func NewPersistentService(store PersistentTaskStore) *PersistentService

NewPersistentService creates a new persistent task service.

func (*PersistentService) Cancel added in v1.21.0

func (s *PersistentService) Cancel(ctx context.Context, taskID string) error

Cancel cancels a task and its active executions.

func (*PersistentService) Create added in v1.21.0

func (s *PersistentService) Create(ctx context.Context, contextID string) (*Task, error)

Create creates a new task and persists it.

func (*PersistentService) Get added in v1.21.0

func (s *PersistentService) Get(ctx context.Context, taskID string) (*Task, error)

Get retrieves a task by ID.

func (*PersistentService) GetOrCreate added in v1.21.0

func (s *PersistentService) GetOrCreate(ctx context.Context, taskID, contextID string) (*Task, error)

GetOrCreate retrieves a task by ID or creates one with that ID if not found.

func (*PersistentService) List added in v1.21.0

func (s *PersistentService) List(ctx context.Context, contextID string) ([]*Task, error)

List lists tasks for a context.

func (*PersistentService) Update added in v1.21.0

func (s *PersistentService) Update(ctx context.Context, task *Task) error

Update saves task changes.

type PersistentTaskStore added in v1.21.0

type PersistentTaskStore interface {
	a2asrv.TaskStore
	List(ctx context.Context, appName, contextID string) ([]*a2a.Task, error)
}

PersistentTaskStore combines a2asrv.TaskStore with the additional List method that the task service needs.

type SQLTaskStore

type SQLTaskStore struct {
	// contains filtered or unexported fields
}

SQLTaskStore implements a2asrv.TaskStore using a SQL database. It stores a2a.Task objects directly (as required by the interface).

func (*SQLTaskStore) Close

func (s *SQLTaskStore) Close() error

Close closes the database connection.

func (*SQLTaskStore) DeleteByApp added in v1.21.0

func (s *SQLTaskStore) DeleteByApp(ctx context.Context, appName string) error

DeleteByApp removes all tasks for a specific app.

func (*SQLTaskStore) Get

func (s *SQLTaskStore) Get(ctx context.Context, taskID a2a.TaskID) (*a2a.Task, error)

Get retrieves a task by ID (implements a2asrv.TaskStore).

func (*SQLTaskStore) List added in v1.21.0

func (s *SQLTaskStore) List(ctx context.Context, appName, contextID string) ([]*a2a.Task, error)

List retrieves tasks for a specific app and context (session). It supports filtering by status (optional).

func (*SQLTaskStore) Save

func (s *SQLTaskStore) Save(ctx context.Context, task *a2a.Task) error

Save stores a task (implements a2asrv.TaskStore). It uses UPSERT for an atomic insert or update. For optimistic concurrency, it checks whether the task was modified since it was loaded by comparing the timestamp stored in task.Metadata["_updated_at"]. If the task is stale, Save logs a warning but proceeds, because the A2A protocol handles task state transitions.
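The timestamp comparison can be sketched as below. Only the "_updated_at" metadata key comes from the description of Save; the RFC 3339 string encoding and the helper names updatedAt and isStale are assumptions made for this example, and the real store may encode or compare the value differently.

```go
package main

import (
	"fmt"
	"time"
)

// updatedAt reads the "_updated_at" metadata key. Storing the value as an
// RFC 3339 string is an assumption made for this sketch.
func updatedAt(meta map[string]any) (time.Time, bool) {
	s, ok := meta["_updated_at"].(string)
	if !ok {
		return time.Time{}, false
	}
	t, err := time.Parse(time.RFC3339Nano, s)
	return t, err == nil
}

// isStale reports whether the stored copy is newer than the copy being saved.
// Missing or unparsable timestamps are treated as "not stale".
func isStale(incoming, stored map[string]any) bool {
	in, ok1 := updatedAt(incoming)
	st, ok2 := updatedAt(stored)
	return ok1 && ok2 && st.After(in)
}

func main() {
	loaded := map[string]any{"_updated_at": "2026-04-15T10:00:00Z"}
	current := map[string]any{"_updated_at": "2026-04-15T10:00:05Z"}
	if isStale(loaded, current) {
		// Mirrors the documented behaviour: warn, then save anyway.
		fmt.Println("warning: task was modified since it was loaded")
	}
}
```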

func (*SQLTaskStore) Update added in v1.21.0

func (s *SQLTaskStore) Update(ctx context.Context, task *a2a.Task) error

Update persists changes to an existing task. This is effectively an alias for Save but semantically useful for the interface.

type Service

type Service interface {
	// Create creates a new task.
	Create(ctx context.Context, contextID string) (*Task, error)

	// GetOrCreate retrieves a task by ID or creates one with that ID if not found.
	// This is used to associate executor tasks with a2a's task IDs.
	GetOrCreate(ctx context.Context, taskID, contextID string) (*Task, error)

	// Get retrieves a task by ID.
	Get(ctx context.Context, taskID string) (*Task, error)

	// Update saves task changes.
	Update(ctx context.Context, task *Task) error

	// Cancel cancels a task.
	Cancel(ctx context.Context, taskID string) error

	// List lists tasks for a context.
	List(ctx context.Context, contextID string) ([]*Task, error)
}

Service manages tasks.

func NewService added in v1.21.0

func NewService(store a2asrv.TaskStore) (Service, error)

NewService creates a new persistent task service.

type State

type State string

State represents the current state of a task.

const (
	// StateSubmitted means the task has been submitted but not started.
	StateSubmitted State = "submitted"

	// StateWorking means the task is being processed.
	StateWorking State = "working"

	// StateCompleted means the task finished successfully.
	StateCompleted State = "completed"

	// StateFailed means the task failed with an error.
	StateFailed State = "failed"

	// StateCancelled means the task was cancelled.
	StateCancelled State = "cancelled"

	// StateInputRequired means the task is waiting for human input (HITL).
	StateInputRequired State = "input_required"

	// StateAuthRequired means the task needs authentication.
	StateAuthRequired State = "auth_required"

	// StateRejected means the task was rejected (e.g., approval denied).
	StateRejected State = "rejected"
)

func (State) IsPending

func (s State) IsPending() bool

IsPending returns whether this state is waiting for something.

func (State) IsTerminal

func (s State) IsTerminal() bool

IsTerminal returns whether this state is terminal (no more transitions).
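One plausible classification of the states is sketched below. This page does not list which states IsTerminal and IsPending return true for, so the two sets here are assumptions inferred from the constant comments; the State type and constants are redeclared locally to keep the example self-contained.

```go
package main

import "fmt"

// State and its constants are local copies of the declarations above.
type State string

const (
	StateSubmitted     State = "submitted"
	StateWorking       State = "working"
	StateCompleted     State = "completed"
	StateFailed        State = "failed"
	StateCancelled     State = "cancelled"
	StateInputRequired State = "input_required"
	StateAuthRequired  State = "auth_required"
	StateRejected      State = "rejected"
)

// isTerminal treats completed, failed, cancelled, and rejected as final.
// This set is an assumption, not the package's confirmed behaviour.
func isTerminal(s State) bool {
	switch s {
	case StateCompleted, StateFailed, StateCancelled, StateRejected:
		return true
	}
	return false
}

// isPending treats the two "required" states as waiting on an external party.
// This set is likewise an assumption.
func isPending(s State) bool {
	switch s {
	case StateInputRequired, StateAuthRequired:
		return true
	}
	return false
}

func main() {
	for _, s := range []State{StateWorking, StateInputRequired, StateCompleted} {
		fmt.Printf("%-15s terminal=%-5t pending=%t\n", s, isTerminal(s), isPending(s))
	}
}
```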

type Status

type Status struct {
	State     State
	Message   *a2a.Message
	Timestamp time.Time
	Error     error // For failed state
}

Status contains the task state and an optional message.

type Task

type Task struct {
	// ID is the unique identifier for this task.
	ID string

	// AppName identifies the app this task belongs to (multi-tenancy).
	AppName string

	// ContextID links this task to a session/conversation.
	ContextID string
	// Status contains the current state and message.
	Status Status

	// History is the task-specific message history.
	History []*a2a.Message

	// Artifacts produced by this task.
	Artifacts []a2a.Artifact

	// Metadata contains additional task data.
	Metadata map[string]any

	// InputRequirement specifies what input is needed (for HITL).
	InputRequirement *InputRequirement

	// ExecutionState for task resumption.
	ExecutionState *ExecutionState

	// CreatedAt is when the task was created.
	CreatedAt time.Time

	// UpdatedAt is when the task was last updated.
	UpdatedAt time.Time
	// contains filtered or unexported fields
}

Task represents a unit of work in the A2A protocol. Tasks have a full state machine and support human-in-the-loop interactions.

func New

func New(contextID, appName string) *Task

New creates a new task.

func (*Task) AddArtifact

func (t *Task) AddArtifact(artifact a2a.Artifact)

AddArtifact adds an artifact to the task.

func (*Task) AppendHistory

func (t *Task) AppendHistory(msg *a2a.Message)

AppendHistory adds a message to the task history.

func (*Task) CancelAllChildren

func (t *Task) CancelAllChildren() (cancelled int, failed int)

CancelAllChildren cancels all active child executions (cascade cancellation). It returns the number of executions that were cancelled successfully and the number whose cancellation failed.

func (*Task) CancelExecution

func (t *Task) CancelExecution(callID string) bool

CancelExecution cancels a specific child execution by callID. Returns true if cancellation was initiated.

func (*Task) GetID added in v1.16.1

func (t *Task) GetID() string

GetID returns the task ID.

func (*Task) GetStatus

func (t *Task) GetStatus() Status

GetStatus returns the current status (thread-safe).

func (*Task) LoadExecutionState

func (t *Task) LoadExecutionState() *ExecutionState

LoadExecutionState returns and clears the saved execution state.

func (*Task) ProvideInput

func (t *Task) ProvideInput(optionID string) *InputOption

ProvideInput processes human input and clears the requirement.

func (*Task) RegisterExecution

func (t *Task) RegisterExecution(exec *agent.ChildExecution)

RegisterExecution registers a child execution (tool or sub-agent) for cascade cancellation. Implements agent.CancellableTask interface.

func (*Task) RequestInput

func (t *Task) RequestInput(req *InputRequirement)

RequestInput pauses the task for human input.

func (*Task) SaveExecutionState

func (t *Task) SaveExecutionState(state *ExecutionState)

SaveExecutionState saves the current execution state for later resumption.

func (*Task) SetMetadata

func (t *Task) SetMetadata(key string, value any)

SetMetadata sets a metadata value.

func (*Task) SetStatus

func (t *Task) SetStatus(state State, message *a2a.Message, err error)

SetStatus updates the task status.

func (*Task) UnregisterExecution

func (t *Task) UnregisterExecution(callID string)

UnregisterExecution removes a child execution from tracking. Should be called when execution completes (success or failure).

type TaskError

type TaskError struct {
	Code    string
	Message string
}

TaskError is a task-related error.

func (*TaskError) Error

func (e *TaskError) Error() string
