Documentation ¶
Overview ¶
Package taskexec provides a concurrent agent invocation manager. The manager enforces concurrency control at the task level and guarantees that at any given moment at most one goroutine is mutating the state of an a2a.Task.
For every [execution] the [localManager] starts two goroutines in an errgroup.Group:
- One calls Executor, which starts producing events and writing them to an eventqueue.Queue.
- The second one reads events in a loop and passes them through Processor responsible for deciding when to stop.
For cancelations the handling is different depending on whether there exists a concurrent execution:
- When a concurrent execution exists, only one goroutine is started which calls Canceler. It writes an event to the same queue the execution is using and expects the concurrently-running event consumer to handle it.
- When there's no concurrent execution, the mechanism is the same as for the execution: one goroutine calls Canceler and a second one processes events.
The event consumer continues to run, waiting for a terminal event to be produced, even if the execution goroutine has finished.
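The producer/consumer arrangement described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the package's implementation: a buffered channel stands in for eventqueue.Queue, sync.WaitGroup stands in for errgroup.Group, and the names event, runExecution and demo are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// event is a stand-in for a2a.Event; final marks a terminal event.
type event struct {
	name  string
	final bool
}

// runExecution starts one producer and one consumer goroutine and returns
// the names of the events the consumer handled, in order.
func runExecution(ctx context.Context, produce func(context.Context, chan<- event)) []string {
	queue := make(chan event, 8) // stand-in for eventqueue.Queue
	var handled []string

	var wg sync.WaitGroup
	wg.Add(2)

	// Producer: plays the role of Executor.
	go func() {
		defer wg.Done()
		produce(ctx, queue)
	}()

	// Consumer: plays the role of Processor. It is the only goroutine
	// that mutates handled, and it decides when to stop.
	go func() {
		defer wg.Done()
		for {
			select {
			case ev := <-queue:
				handled = append(handled, ev.name)
				if ev.final {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	wg.Wait()
	return handled
}

// demo runs a producer that emits two events and then returns.
func demo() []string {
	return runExecution(context.Background(), func(_ context.Context, q chan<- event) {
		q <- event{name: "working"}
		q <- event{name: "completed", final: true}
		// The producer returns here; the consumer keeps draining the queue
		// until it sees the terminal event.
	})
}

func main() {
	fmt.Println(demo()) // prints "[working completed]"
}
```

Keeping all state mutation in the consumer goroutine is what makes the single-writer guarantee easy to reason about: the producer only ever writes to the queue.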
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrExecutionInProgress is returned when a caller attempts to start an execution for
	// a Task concurrently with another execution.
	ErrExecutionInProgress = errors.New("task execution is already in progress")
	// ErrCancelationInProgress is returned when a caller attempts to start an execution for
	// a Task concurrently with its cancelation.
	ErrCancelationInProgress = errors.New("task cancelation is in progress")
)
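Callers would typically distinguish these sentinel errors with errors.Is, which works whether or not the manager wraps them. The sketch below is self-contained and therefore redeclares the two errors locally; the registry type, startExecution and classify are hypothetical names used only for illustration and say nothing about how the package tracks running tasks.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copies of the sentinel errors so the sketch compiles on its own;
// real code would reference taskexec.ErrExecutionInProgress and
// taskexec.ErrCancelationInProgress instead.
var (
	ErrExecutionInProgress   = errors.New("task execution is already in progress")
	ErrCancelationInProgress = errors.New("task cancelation is in progress")
)

// registry is a hypothetical per-task guard, not the package's implementation.
type registry struct {
	mu        sync.Mutex
	executing map[string]bool
	canceling map[string]bool
}

func newRegistry() *registry {
	return &registry{executing: map[string]bool{}, canceling: map[string]bool{}}
}

// startExecution rejects an execution that races another execution
// or a cancelation of the same task.
func (r *registry) startExecution(taskID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.executing[taskID] {
		return fmt.Errorf("task %s: %w", taskID, ErrExecutionInProgress)
	}
	if r.canceling[taskID] {
		return fmt.Errorf("task %s: %w", taskID, ErrCancelationInProgress)
	}
	r.executing[taskID] = true
	return nil
}

// classify maps an error to a caller-facing decision using errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "started"
	case errors.Is(err, ErrExecutionInProgress):
		return "retry later"
	case errors.Is(err, ErrCancelationInProgress):
		return "task is being canceled"
	default:
		return "failed"
	}
}

func main() {
	r := newRegistry()
	fmt.Println(classify(r.startExecution("task-1"))) // first attempt
	fmt.Println(classify(r.startExecution("task-1"))) // concurrent attempt
}
```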
Functions ¶
This section is empty.
Types ¶
type Canceler ¶
type Canceler interface {
// Cancel attempts to cancel a Task.
// Expected to produce a Task update event with canceled state.
Cancel(context.Context, eventqueue.Queue) error
}
Canceler implementation sends a Task cancelation signal.
type Cleaner ¶ added in v0.3.9
type Cleaner interface {
// Cleanup is called after execution or cancelation with the final result.
Cleanup(context.Context, a2a.SendMessageResult, error)
}
Cleaner implementation can be used to run callbacks after execution or cancelation completes.
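One convenient way to supply such a callback is a function adapter, in the style of http.HandlerFunc. The sketch below is an assumption-laden illustration: result stands in for a2a.SendMessageResult, and cleaner, cleanerFunc and runWithCleanup are hypothetical local names rather than part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// result is a stand-in for a2a.SendMessageResult.
type result struct{ state string }

// cleaner mirrors the shape of the Cleaner interface with the stand-in type.
type cleaner interface {
	Cleanup(ctx context.Context, res *result, err error)
}

// cleanerFunc adapts a plain function to the cleaner interface.
type cleanerFunc func(ctx context.Context, res *result, err error)

func (f cleanerFunc) Cleanup(ctx context.Context, res *result, err error) { f(ctx, res, err) }

// runWithCleanup runs the work and then hands its outcome to the cleaner,
// regardless of whether the work succeeded.
func runWithCleanup(ctx context.Context, c cleaner, run func(context.Context) (*result, error)) (*result, error) {
	res, err := run(ctx)
	c.Cleanup(ctx, res, err)
	return res, err
}

// demo records what the cleaner observed for a success and a failure.
func demo() []string {
	var calls []string
	record := cleanerFunc(func(_ context.Context, res *result, err error) {
		if err != nil {
			calls = append(calls, "failed: "+err.Error())
			return
		}
		calls = append(calls, "finished: "+res.state)
	})

	ctx := context.Background()
	runWithCleanup(ctx, record, func(context.Context) (*result, error) {
		return &result{state: "completed"}, nil
	})
	runWithCleanup(ctx, record, func(context.Context) (*result, error) {
		return nil, errors.New("agent crashed")
	})
	return calls
}

func main() {
	for _, c := range demo() {
		fmt.Println(c)
	}
}
```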
type DistributedManagerConfig ¶ added in v0.3.4
type DistributedManagerConfig struct {
WorkQueue workqueue.Queue
QueueManager eventqueue.Manager
Factory Factory
TaskStore TaskStore
ConcurrencyConfig limiter.ConcurrencyConfig
Logger *slog.Logger
PanicHandler PanicHandlerFn
}
DistributedManagerConfig contains configuration for A2A task execution mode where work is distributed across an A2A cluster.
type Executor ¶
type Executor interface {
// Execute starts publishing events to the queue. Called in a separate goroutine.
Execute(context.Context, eventqueue.Queue) error
}
Executor implementation starts an agent execution.
type Factory ¶ added in v0.3.3
type Factory interface {
// CreateExecutor is used to create an initialized Executor, Processor and Cleaner for a Task execution. The Executor and Processor will run in separate goroutines.
CreateExecutor(context.Context, a2a.TaskID, *a2a.MessageSendParams) (Executor, Processor, Cleaner, error)
// CreateCanceler is used to create an initialized Canceler, Processor and Cleaner for a Task cancelation. The Canceler and Processor will run in separate goroutines.
CreateCanceler(context.Context, *a2a.TaskIDParams) (Canceler, Processor, Cleaner, error)
}
Factory is used to set up a task execution or cancelation context.
type LocalManagerConfig ¶ added in v0.3.4
type LocalManagerConfig struct {
QueueManager eventqueue.Manager
ConcurrencyConfig limiter.ConcurrencyConfig
Factory Factory
PanicHandler PanicHandlerFn
}
LocalManagerConfig contains in-process execution Manager configuration parameters.
type Manager ¶
type Manager interface {
// Resubscribe is used to resubscribe to events of an active execution.
Resubscribe(ctx context.Context, taskID a2a.TaskID) (Subscription, error)
// Execute requests an execution for handling a received message.
Execute(ctx context.Context, params *a2a.MessageSendParams) (Subscription, error)
// Cancel requests a task cancelation.
Cancel(ctx context.Context, params *a2a.TaskIDParams) (*a2a.Task, error)
}
Manager provides an API for executing and canceling tasks.
func NewDistributedManager ¶ added in v0.3.4
func NewDistributedManager(cfg *DistributedManagerConfig) Manager
NewDistributedManager creates a new Manager instance which uses WorkQueue for work distribution across an A2A cluster.
func NewLocalManager ¶ added in v0.3.3
func NewLocalManager(cfg LocalManagerConfig) Manager
NewLocalManager is a [localManager] constructor function.
type PanicHandlerFn ¶ added in v0.3.6
PanicHandlerFn is a function that handles panics that occur during execution.
type Processor ¶
type Processor interface {
// Process is called for each event produced by the started Execution. Called in a separate goroutine.
Process(context.Context, a2a.Event) (*ProcessorResult, error)
// ProcessError is called when an execution error is encountered to try recovering from it.
// If it returns a result, the returned value becomes the result of the execution. If an error can't be handled,
// either a modified error or the original error cause must be returned.
ProcessError(context.Context, error) (a2a.SendMessageResult, error)
}
Processor implementation handles events produced during AgentExecution.
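A Processor's two responsibilities, deciding when to stop and attempting to recover from execution errors, can be sketched as follows. Everything here is a simplified stand-in: event, result and processorResult replace the a2a and taskexec types, the list of terminal states is an assumption, and errAgentTimeout is a hypothetical error used only to show the recovery path.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins for a2a.Event and a2a.SendMessageResult.
type event struct{ state string }
type result struct{ finalState string }

// processorResult mirrors only the ExecutionResult field of ProcessorResult.
type processorResult struct{ executionResult *result }

// errAgentTimeout is a hypothetical recoverable error.
var errAgentTimeout = errors.New("agent timed out")

// stopOnTerminal ends the execution when it sees a terminal state.
type stopOnTerminal struct{}

// Process returns a non-nil executionResult only for terminal states,
// which tells the caller to stop consuming events.
func (stopOnTerminal) Process(_ context.Context, ev event) (*processorResult, error) {
	switch ev.state {
	case "completed", "failed", "canceled":
		return &processorResult{executionResult: &result{finalState: ev.state}}, nil
	default:
		return &processorResult{}, nil
	}
}

// ProcessError converts a known error into a failed result and
// passes every other error through unchanged.
func (stopOnTerminal) ProcessError(_ context.Context, err error) (*result, error) {
	if errors.Is(err, errAgentTimeout) {
		return &result{finalState: "failed"}, nil
	}
	return nil, err
}

// demo feeds events to the processor and reports the final state and
// how many events were consumed before it stopped.
func demo() (string, int) {
	p := stopOnTerminal{}
	events := []event{{"submitted"}, {"working"}, {"completed"}, {"working"}}
	for i, ev := range events {
		res, err := p.Process(context.Background(), ev)
		if err != nil {
			return "error", i
		}
		if res.executionResult != nil {
			return res.executionResult.finalState, i + 1
		}
	}
	return "", len(events)
}

// demoRecover shows the recovery path for a wrapped timeout error.
func demoRecover() string {
	p := stopOnTerminal{}
	res, err := p.ProcessError(context.Background(), fmt.Errorf("run: %w", errAgentTimeout))
	if err != nil {
		return "unrecovered"
	}
	return res.finalState
}

func main() {
	state, n := demo()
	fmt.Println(state, n)      // prints "completed 3"
	fmt.Println(demoRecover()) // prints "failed"
}
```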
type ProcessorResult ¶ added in v0.3.4
type ProcessorResult struct {
// ExecutionResult becomes the result of the execution if a non-nil value is returned.
ExecutionResult a2a.SendMessageResult
// ExecutionFailureCause can be returned by the processor to pass information about why the execution stopped to the event producer.
// It is set when ExecutionResult is not a direct consequence of an executor-emitted event: for example, a malformed event was received and the task was moved to the failed state.
// The cause will be accessible using context.Cause(ctx) in the executor code.
ExecutionFailureCause error
// TaskVersion is the version of the task after the event was processed.
TaskVersion a2a.TaskVersion
// EventOverride can be returned by the processor to change which event gets emitted to subscribers.
// This is useful when we failed to process a malformed event and moved the task to failed state.
EventOverride a2a.Event
}
ProcessorResult is returned by a Processor after an event was handled successfully.
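The ExecutionFailureCause comment says the cause is readable through context.Cause(ctx) in executor code. The standard-library mechanism behind that is context.WithCancelCause. The sketch below shows only that mechanism; how the manager actually wires the cause into the executor's context is not stated in this documentation, and errMalformedEvent, executor and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errMalformedEvent is a hypothetical failure cause chosen by the consumer side.
var errMalformedEvent = errors.New("malformed event")

// executor blocks until its context is canceled, then reports why.
// context.Cause returns the error passed to the cancel function.
func executor(ctx context.Context) error {
	<-ctx.Done()
	return context.Cause(ctx)
}

// demo cancels a running executor with an explicit cause and returns
// the error the executor observed.
func demo() error {
	ctx, cancel := context.WithCancelCause(context.Background())
	done := make(chan error, 1)
	go func() { done <- executor(ctx) }()

	// The consumer side decides the execution must stop and says why.
	cancel(errMalformedEvent)
	return <-done
}

// causeIsMalformed reports whether the executor saw the expected cause.
func causeIsMalformed() bool {
	return errors.Is(demo(), errMalformedEvent)
}

func main() {
	fmt.Println(demo()) // prints "malformed event"
}
```

Note that ctx.Err() would still report context.Canceled in this situation; only context.Cause exposes the more specific error.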
type Subscription ¶
type Subscription interface {
// TaskID is the ID of the task to which this subscription is related.
TaskID() a2a.TaskID
// Events returns a sequence of events. If an error is returned, the sequence is terminated.
// This method can only be called once.
Events(ctx context.Context) iter.Seq2[a2a.Event, error]
}
Subscription encapsulates the logic of subscribing to execution events.