Documentation
¶
Overview ¶
Package native provides the native execution provider implementation.
The native provider implements execution.Provider using a local worker pool and SQL-backed queue for durable task execution. Features include:
- Persistent task queue backed by SQL database
- Configurable retry policies with exponential backoff
- Dead-letter queue for failed tasks
- Worker pool for controlled concurrency
Index ¶
- type CollectingEventQueue
- type ExecutorProvider
- type MetricsExporter
- type NativeProvider
- type Queue
- type QueueItem
- func (i *QueueItem) DeserializeMetadata(data string) error
- func (i *QueueItem) ExtractContext(ctx context.Context) context.Context
- func (i *QueueItem) InjectContext(ctx context.Context)
- func (i *QueueItem) IsExpired() bool
- func (i *QueueItem) IsScheduled() bool
- func (i *QueueItem) SerializeMetadata() (string, error)
- func (i *QueueItem) WithContextID(contextID string) *QueueItem
- func (i *QueueItem) WithDeadline(deadline time.Time) *QueueItem
- func (i *QueueItem) WithMaxRetries(maxRetries int) *QueueItem
- func (i *QueueItem) WithMetadata(key string, value any) *QueueItem
- func (i *QueueItem) WithPriority(priority int) *QueueItem
- func (i *QueueItem) WithScheduledFor(scheduledFor time.Time) *QueueItem
- func (i *QueueItem) WithUserID(userID string) *QueueItem
- type QueueItemStatus
- type QueueStats
- type RetryPolicy
- type SQLQueue
- func (q *SQLQueue) Ack(ctx context.Context, itemID string) error
- func (q *SQLQueue) Close() error
- func (q *SQLQueue) Dequeue(ctx context.Context) (*QueueItem, error)
- func (q *SQLQueue) Enqueue(ctx context.Context, item *QueueItem) error
- func (q *SQLQueue) ListDLQ(ctx context.Context, appName string, limit int) ([]*QueueItem, error)
- func (q *SQLQueue) Nack(ctx context.Context, itemID string, errMsg error) error
- func (q *SQLQueue) RecoverStale(ctx context.Context, staleThreshold time.Duration) error
- func (q *SQLQueue) Requeue(ctx context.Context, itemID string) error
- func (q *SQLQueue) Stats(ctx context.Context, appName string) (*QueueStats, error)
- type SQLQueueOption
- type WorkerPool
- type WorkerPoolOptions
- type WorkerPoolStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CollectingEventQueue ¶
type CollectingEventQueue struct {
// contains filtered or unexported fields
}
CollectingEventQueue collects A2A events for non-streaming execution. It implements a2asrv/eventqueue.Queue interface.
func NewCollectingEventQueue ¶
func NewCollectingEventQueue() *CollectingEventQueue
NewCollectingEventQueue creates a new collector.
func (*CollectingEventQueue) Close ¶
func (q *CollectingEventQueue) Close() error
Close marks the queue as closed.
func (*CollectingEventQueue) Events ¶
func (q *CollectingEventQueue) Events() []a2a.Event
Events returns all collected events.
type ExecutorProvider ¶
type ExecutorProvider interface {
// GetExecutor returns an executor for the given app and agent.
GetExecutor(ctx context.Context, appID, agentName string) (a2asrv.AgentExecutor, error)
}
ExecutorProvider provides executors for app/agent combinations. Implemented by server.AppManager.
type MetricsExporter ¶
type MetricsExporter struct {
// contains filtered or unexported fields
}
MetricsExporter periodically collects queue stats and exports them to metrics.
func NewMetricsExporter ¶
func NewMetricsExporter(queue Queue, metrics *observability.Metrics, appName string) *MetricsExporter
NewMetricsExporter creates a new metrics exporter.
type NativeProvider ¶
type NativeProvider struct {
// contains filtered or unexported fields
}
NativeProvider implements execution.Provider using a local worker pool and SQL queue.
func NewProvider ¶
func NewProvider(
	pool *config.DBPool,
	dsn string,
	retryPolicy *RetryPolicy,
	executorProvider ExecutorProvider,
	taskService task.Service,
	metrics *observability.Metrics,
	opts WorkerPoolOptions,
) (*NativeProvider, error)
NewProvider creates a new NativeProvider.
func (*NativeProvider) Queue ¶
func (p *NativeProvider) Queue() Queue
Queue returns the underlying queue (for metrics/admin).
type Queue ¶
type Queue interface {
// Enqueue adds a task to the queue for processing.
// Returns immediately - task will be processed by a worker.
Enqueue(ctx context.Context, item *QueueItem) error
// Dequeue retrieves the next task for processing.
// The item is atomically marked as "processing".
// Blocks until a task is available or context is cancelled.
Dequeue(ctx context.Context) (*QueueItem, error)
// Ack acknowledges successful task completion.
// The task is marked as completed.
Ack(ctx context.Context, itemID string) error
// Nack negatively acknowledges - task failed, may retry.
// If retry_count >= max_retries, task moves to dead-letter queue.
Nack(ctx context.Context, itemID string, err error) error
// Requeue moves a dead-letter item back to pending queue.
Requeue(ctx context.Context, itemID string) error
// RecoverStale finds items stuck in "processing" state and resets them.
// Called on startup to handle items from crashed workers.
RecoverStale(ctx context.Context, staleThreshold time.Duration) error
// Stats returns queue statistics for an app.
Stats(ctx context.Context, appName string) (*QueueStats, error)
// ListDLQ returns items in dead-letter queue.
ListDLQ(ctx context.Context, appName string, limit int) ([]*QueueItem, error)
// Close gracefully shuts down the queue.
Close() error
}
Queue abstracts task queueing for resilience and durability.
The queue provides at-least-once delivery semantics:
- Tasks are persisted before acknowledgement
- Failed tasks are retried according to RetryPolicy
- Tasks exceeding max retries are moved to dead-letter queue
type QueueItem ¶
type QueueItem struct {
// ID is the unique identifier for this queue item.
ID string `json:"id"`
// TaskID links to the a2a_tasks table for result storage.
TaskID string `json:"task_id"`
// AppName identifies the app (multi-tenant isolation).
AppName string `json:"app_name"`
// AgentName is the agent to invoke.
AgentName string `json:"agent_name"`
// Input is the user input/prompt for the agent.
Input string `json:"input"`
// ContextID is the session ID for conversation continuity.
ContextID string `json:"context_id"`
// UserID is the user identifier.
UserID string `json:"user_id"`
// Metadata contains additional task data.
Metadata map[string]any `json:"metadata,omitempty"`
// Priority affects dequeue order (higher = sooner).
Priority int `json:"priority"`
// MaxRetries is the maximum number of retry attempts.
MaxRetries int `json:"max_retries"`
// RetryCount is the current number of retry attempts.
RetryCount int `json:"retry_count"`
// Status is the current state of the queue item.
Status QueueItemStatus `json:"status"`
// EnqueuedAt is when the item was added to the queue.
EnqueuedAt time.Time `json:"enqueued_at"`
// ScheduledFor is when the item should be processed (for delayed execution).
ScheduledFor time.Time `json:"scheduled_for"`
// DeadlineAt is the deadline for task completion.
DeadlineAt time.Time `json:"deadline_at"`
// StartedAt is when processing started.
StartedAt *time.Time `json:"started_at,omitempty"`
// CompletedAt is when processing completed.
CompletedAt *time.Time `json:"completed_at,omitempty"`
// LastError is the most recent error message.
LastError string `json:"last_error,omitempty"`
// WorkerID is the ID of the worker processing this item.
WorkerID string `json:"worker_id,omitempty"`
}
QueueItem represents a queued task.
func NewQueueItem ¶
NewQueueItem creates a new queue item with sensible defaults.
func (*QueueItem) DeserializeMetadata ¶
DeserializeMetadata parses metadata from JSON.
func (*QueueItem) ExtractContext ¶
ExtractContext extracts trace information from the item's metadata and returns a new context. This restores the trace context from the queue item.
func (*QueueItem) InjectContext ¶
InjectContext injects the current context's trace information into the item's metadata. This allows the trace to be propagated across the queue boundary.
func (*QueueItem) IsScheduled ¶
IsScheduled returns true if the item is scheduled for future processing.
func (*QueueItem) SerializeMetadata ¶
SerializeMetadata converts metadata to JSON.
func (*QueueItem) WithContextID ¶
WithContextID sets the session ID for conversation continuity.
func (*QueueItem) WithDeadline ¶
WithDeadline sets when the task must complete by.
func (*QueueItem) WithMaxRetries ¶
WithMaxRetries sets the maximum retry attempts.
func (*QueueItem) WithMetadata ¶
WithMetadata sets additional metadata.
func (*QueueItem) WithPriority ¶
WithPriority sets the priority (higher = processed sooner).
func (*QueueItem) WithScheduledFor ¶
WithScheduledFor sets when the task should be processed.
func (*QueueItem) WithUserID ¶
WithUserID sets the user identifier.
type QueueItemStatus ¶
type QueueItemStatus string
QueueItemStatus represents the state of a queue item.
const (
	// QueueItemPending - waiting to be processed.
	QueueItemPending QueueItemStatus = "pending"
	// QueueItemProcessing - currently being processed by a worker.
	QueueItemProcessing QueueItemStatus = "processing"
	// QueueItemCompleted - successfully completed.
	QueueItemCompleted QueueItemStatus = "completed"
	// QueueItemFailed - failed but may be retried.
	QueueItemFailed QueueItemStatus = "failed"
	// QueueItemDead - exceeded max retries, in dead-letter queue.
	QueueItemDead QueueItemStatus = "dead"
)
type QueueStats ¶
type QueueStats struct {
// Pending is the count of items waiting to be processed.
Pending int64 `json:"pending"`
// Processing is the count of items currently being processed.
Processing int64 `json:"processing"`
// Completed is the count of successfully completed items.
Completed int64 `json:"completed"`
// Failed is the count of failed items (may be retried).
Failed int64 `json:"failed"`
// DeadLetter is the count of items in dead-letter queue.
DeadLetter int64 `json:"dead_letter"`
// OldestPending is the age of the oldest pending item.
OldestPending time.Duration `json:"oldest_pending"`
// AvgProcessingTime is the average processing time for completed items.
AvgProcessingTime time.Duration `json:"avg_processing_time"`
}
QueueStats contains queue statistics.
type RetryPolicy ¶
type RetryPolicy struct {
// MaxRetries is the maximum number of retry attempts.
MaxRetries int `yaml:"max_retries" json:"max_retries"`
// InitialDelay is the delay before the first retry.
InitialDelay time.Duration `yaml:"initial_delay" json:"initial_delay"`
// MaxDelay is the maximum delay between retries.
MaxDelay time.Duration `yaml:"max_delay" json:"max_delay"`
// BackoffFactor is the multiplier for exponential backoff.
BackoffFactor float64 `yaml:"backoff_factor" json:"backoff_factor"`
}
RetryPolicy configures retry behavior.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a retry policy with sensible defaults.
func (*RetryPolicy) NextDelay ¶
func (p *RetryPolicy) NextDelay(attempt int) time.Duration
NextDelay calculates the delay before the next retry attempt.
func (*RetryPolicy) SetDefaults ¶
func (p *RetryPolicy) SetDefaults()
SetDefaults applies default values to the retry policy.
func (*RetryPolicy) ShouldRetry ¶
func (p *RetryPolicy) ShouldRetry(retryCount int) bool
ShouldRetry returns true if another retry attempt should be made.
type SQLQueue ¶
type SQLQueue struct {
// contains filtered or unexported fields
}
SQLQueue implements Queue using SQL database.
It uses SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) or optimistic locking (SQLite/MySQL) for atomic dequeue operations.
func NewSQLQueue ¶
NewSQLQueue creates a new SQL-backed queue.
func (*SQLQueue) RecoverStale ¶
RecoverStale resets items stuck in processing state.
type SQLQueueOption ¶
type SQLQueueOption func(*SQLQueue)
SQLQueueOption configures SQLQueue.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) SQLQueueOption
WithPollInterval sets how often to poll for new items.
func WithRetryPolicy ¶
func WithRetryPolicy(p *RetryPolicy) SQLQueueOption
WithRetryPolicy sets the retry policy.
func WithWorkerID ¶
func WithWorkerID(id string) SQLQueueOption
WithWorkerID sets the worker ID for this queue instance.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages concurrent task processing from the queue.
Workers dequeue items, invoke the appropriate executor, and handle success/failure acknowledgements including retry logic.
func NewWorkerPool ¶
func NewWorkerPool(queue Queue, executorProvider ExecutorProvider, taskService task.Service, metrics *observability.Metrics, opts WorkerPoolOptions) *WorkerPool
NewWorkerPool creates a new worker pool.
func (*WorkerPool) Start ¶
func (p *WorkerPool) Start(ctx context.Context) error
Start begins processing tasks from the queue. This method is non-blocking - workers run in goroutines.
func (*WorkerPool) Stats ¶
func (p *WorkerPool) Stats() WorkerPoolStats
Stats returns current worker pool statistics.
func (*WorkerPool) Stop ¶
func (p *WorkerPool) Stop() error
Stop gracefully shuts down the worker pool. It signals workers to stop and waits for in-flight tasks to complete.
type WorkerPoolOptions ¶
type WorkerPoolOptions struct {
// NumWorkers is the number of concurrent workers.
// Default: 4
NumWorkers int
// ShutdownTimeout is how long to wait for in-flight tasks during shutdown.
// Default: 30 seconds
ShutdownTimeout time.Duration
// StaleRecoveryInterval is how often to check for stale "processing" items.
// Default: 1 minute
StaleRecoveryInterval time.Duration
// StaleThreshold is how old a "processing" item must be to be considered stale.
// Default: 5 minutes
StaleThreshold time.Duration
}
WorkerPoolOptions configures the worker pool.
func (*WorkerPoolOptions) SetDefaults ¶
func (o *WorkerPoolOptions) SetDefaults()
SetDefaults applies default values.