native

package
v1.21.1
Published: Apr 15, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package native provides the native execution provider implementation.

The native provider implements execution.Provider using a local worker pool and SQL-backed queue for durable task execution. Features include:

  • Persistent task queue backed by a SQL database
  • Configurable retry policies with exponential backoff
  • Dead-letter queue for failed tasks
  • Worker pool for controlled concurrency

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CollectingEventQueue

type CollectingEventQueue struct {
	// contains filtered or unexported fields
}

CollectingEventQueue collects A2A events for non-streaming execution. It implements the a2asrv/eventqueue.Queue interface.

func NewCollectingEventQueue

func NewCollectingEventQueue() *CollectingEventQueue

NewCollectingEventQueue creates a new collector.

func (*CollectingEventQueue) Close

func (q *CollectingEventQueue) Close() error

Close marks the queue as closed.

func (*CollectingEventQueue) Events

func (q *CollectingEventQueue) Events() []a2a.Event

Events returns all collected events.

func (*CollectingEventQueue) Read

Read returns the next event (for interface compatibility). This method is typically not used; call Events() instead.

func (*CollectingEventQueue) Write

func (q *CollectingEventQueue) Write(ctx context.Context, event a2a.Event) error

Write adds an event to the collection.

type ExecutorProvider

type ExecutorProvider interface {
	// GetExecutor returns an executor for the given app and agent.
	GetExecutor(ctx context.Context, appID, agentName string) (a2asrv.AgentExecutor, error)
}

ExecutorProvider provides executors for app/agent combinations. Implemented by server.AppManager.

type MetricsExporter

type MetricsExporter struct {
	// contains filtered or unexported fields
}

MetricsExporter periodically collects queue stats and exports them to metrics.

func NewMetricsExporter

func NewMetricsExporter(queue Queue, metrics *observability.Metrics, appName string) *MetricsExporter

NewMetricsExporter creates a new metrics exporter.

func (*MetricsExporter) Start

func (e *MetricsExporter) Start(ctx context.Context, interval time.Duration)

Start begins periodic metrics collection. This call is blocking.

type NativeProvider

type NativeProvider struct {
	// contains filtered or unexported fields
}

NativeProvider implements execution.Provider using a local worker pool and SQL queue.

func NewProvider

func NewProvider(
	pool *config.DBPool,
	dsn string,
	retryPolicy *RetryPolicy,
	executorProvider ExecutorProvider,
	taskService task.Service,
	metrics *observability.Metrics,
	opts WorkerPoolOptions,
) (*NativeProvider, error)

NewProvider creates a new NativeProvider.

func (*NativeProvider) Queue

func (p *NativeProvider) Queue() Queue

Queue returns the underlying queue (for metrics/admin).

func (*NativeProvider) Start

func (p *NativeProvider) Start(ctx context.Context) error

Start starts the worker pool.

func (*NativeProvider) Stop

func (p *NativeProvider) Stop() error

Stop stops the worker pool.

func (*NativeProvider) Submit

func (p *NativeProvider) Submit(ctx context.Context, req *execution.Request) error

Submit enqueues a task for execution.

type Queue

type Queue interface {
	// Enqueue adds a task to the queue for processing.
	// Returns immediately - task will be processed by a worker.
	Enqueue(ctx context.Context, item *QueueItem) error

	// Dequeue retrieves the next task for processing.
	// The item is atomically marked as "processing".
	// Blocks until a task is available or context is cancelled.
	Dequeue(ctx context.Context) (*QueueItem, error)

	// Ack acknowledges successful task completion.
	// The task is marked as completed.
	Ack(ctx context.Context, itemID string) error

	// Nack negatively acknowledges - task failed, may retry.
	// If retry_count >= max_retries, task moves to dead-letter queue.
	Nack(ctx context.Context, itemID string, err error) error

	// Requeue moves a dead-letter item back to pending queue.
	Requeue(ctx context.Context, itemID string) error

	// RecoverStale finds items stuck in "processing" state and resets them.
	// Called on startup to handle items from crashed workers.
	RecoverStale(ctx context.Context, staleThreshold time.Duration) error

	// Stats returns queue statistics for an app.
	Stats(ctx context.Context, appName string) (*QueueStats, error)

	// ListDLQ returns items in dead-letter queue.
	ListDLQ(ctx context.Context, appName string, limit int) ([]*QueueItem, error)

	// Close gracefully shuts down the queue.
	Close() error
}

Queue abstracts task queueing for resilience and durability.

The queue provides at-least-once delivery semantics:

  • Tasks are persisted before acknowledgement
  • Failed tasks are retried according to RetryPolicy
  • Tasks exceeding max retries are moved to dead-letter queue

type QueueItem

type QueueItem struct {
	// ID is the unique identifier for this queue item.
	ID string `json:"id"`

	// TaskID links to the a2a_tasks table for result storage.
	TaskID string `json:"task_id"`

	// AppName identifies the app (multi-tenant isolation).
	AppName string `json:"app_name"`

	// AgentName is the agent to invoke.
	AgentName string `json:"agent_name"`

	// Input is the user input/prompt for the agent.
	Input string `json:"input"`

	// ContextID is the session ID for conversation continuity.
	ContextID string `json:"context_id"`

	// UserID is the user identifier.
	UserID string `json:"user_id"`

	// Metadata contains additional task data.
	Metadata map[string]any `json:"metadata,omitempty"`

	// Priority affects dequeue order (higher = sooner).
	Priority int `json:"priority"`

	// MaxRetries is the maximum number of retry attempts.
	MaxRetries int `json:"max_retries"`

	// RetryCount is the current number of retry attempts.
	RetryCount int `json:"retry_count"`

	// Status is the current state of the queue item.
	Status QueueItemStatus `json:"status"`

	// EnqueuedAt is when the item was added to the queue.
	EnqueuedAt time.Time `json:"enqueued_at"`

	// ScheduledFor is when the item should be processed (for delayed execution).
	ScheduledFor time.Time `json:"scheduled_for"`

	// DeadlineAt is the deadline for task completion.
	DeadlineAt time.Time `json:"deadline_at"`

	// StartedAt is when processing started.
	StartedAt *time.Time `json:"started_at,omitempty"`

	// CompletedAt is when processing completed.
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// LastError is the most recent error message.
	LastError string `json:"last_error,omitempty"`

	// WorkerID is the ID of the worker processing this item.
	WorkerID string `json:"worker_id,omitempty"`
}

QueueItem represents a queued task.

func NewQueueItem

func NewQueueItem(appName, agentName, input string) *QueueItem

NewQueueItem creates a new queue item with sensible defaults.

func (*QueueItem) DeserializeMetadata

func (i *QueueItem) DeserializeMetadata(data string) error

DeserializeMetadata parses metadata from JSON.

func (*QueueItem) ExtractContext

func (i *QueueItem) ExtractContext(ctx context.Context) context.Context

ExtractContext extracts trace information from the item's metadata and returns a new context. This restores the trace context from the queue item.

func (*QueueItem) InjectContext

func (i *QueueItem) InjectContext(ctx context.Context)

InjectContext injects the current context's trace information into the item's metadata. This allows the trace to be propagated across the queue boundary.

func (*QueueItem) IsExpired

func (i *QueueItem) IsExpired() bool

IsExpired returns true if the item has passed its deadline.

func (*QueueItem) IsScheduled

func (i *QueueItem) IsScheduled() bool

IsScheduled returns true if the item is scheduled for future processing.

func (*QueueItem) SerializeMetadata

func (i *QueueItem) SerializeMetadata() (string, error)

SerializeMetadata converts metadata to JSON.
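
A metadata round trip through JSON might look like the sketch below, which uses free functions instead of methods. The handling of empty input (`"{}"` and an empty map) is an assumption. One thing worth knowing regardless of the implementation: `encoding/json` decodes JSON numbers stored in `map[string]any` as `float64`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serializeMetadata encodes the map as a JSON object string.
// Assumption: empty or nil metadata is stored as "{}".
func serializeMetadata(m map[string]any) (string, error) {
	if len(m) == 0 {
		return "{}", nil
	}
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// deserializeMetadata decodes a JSON object string into a map.
// Assumption: an empty string yields an empty map.
func deserializeMetadata(data string) (map[string]any, error) {
	m := map[string]any{}
	if data == "" {
		return m, nil
	}
	if err := json.Unmarshal([]byte(data), &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	s, _ := serializeMetadata(map[string]any{"tenant": "acme", "attempt": 2})
	m, _ := deserializeMetadata(s)
	fmt.Println(s)                   // {"attempt":2,"tenant":"acme"}
	fmt.Printf("%T\n", m["attempt"]) // float64
}
```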

func (*QueueItem) WithContextID

func (i *QueueItem) WithContextID(contextID string) *QueueItem

WithContextID sets the session ID for conversation continuity.

func (*QueueItem) WithDeadline

func (i *QueueItem) WithDeadline(deadline time.Time) *QueueItem

WithDeadline sets when the task must complete by.

func (*QueueItem) WithMaxRetries

func (i *QueueItem) WithMaxRetries(maxRetries int) *QueueItem

WithMaxRetries sets the maximum retry attempts.

func (*QueueItem) WithMetadata

func (i *QueueItem) WithMetadata(key string, value any) *QueueItem

WithMetadata sets additional metadata.

func (*QueueItem) WithPriority

func (i *QueueItem) WithPriority(priority int) *QueueItem

WithPriority sets the priority (higher = processed sooner).

func (*QueueItem) WithScheduledFor

func (i *QueueItem) WithScheduledFor(scheduledFor time.Time) *QueueItem

WithScheduledFor sets when the task should be processed.

func (*QueueItem) WithUserID

func (i *QueueItem) WithUserID(userID string) *QueueItem

WithUserID sets the user identifier.
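
The With* methods each return the receiver, so calls can be chained. The sketch below mirrors that pattern on a hypothetical `item` type with lower-case method names; the default of 3 retries is an assumption, since the actual defaults chosen by NewQueueItem are not listed on this page.

```go
package main

import "fmt"

// item is a hypothetical, trimmed-down queue item.
type item struct {
	AppName    string
	AgentName  string
	Input      string
	UserID     string
	Priority   int
	MaxRetries int
	Metadata   map[string]any
}

// newItem fills the required fields. The MaxRetries default is assumed.
func newItem(appName, agentName, input string) *item {
	return &item{AppName: appName, AgentName: agentName, Input: input, MaxRetries: 3}
}

// withPriority sets the priority and returns the item for chaining.
func (i *item) withPriority(p int) *item {
	i.Priority = p
	return i
}

// withUserID sets the user identifier and returns the item for chaining.
func (i *item) withUserID(id string) *item {
	i.UserID = id
	return i
}

// withMetadata sets one metadata key, allocating the map on first use.
func (i *item) withMetadata(key string, value any) *item {
	if i.Metadata == nil {
		i.Metadata = map[string]any{}
	}
	i.Metadata[key] = value
	return i
}

func main() {
	it := newItem("billing", "summarizer", "Summarize the open invoices").
		withPriority(10).
		withUserID("u-1").
		withMetadata("source", "api")
	fmt.Printf("%+v\n", *it)
}
```

Because every method mutates and returns the same pointer, the chain configures one item in place and does not copy it.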

type QueueItemStatus

type QueueItemStatus string

QueueItemStatus represents the state of a queue item.

const (
	// QueueItemPending - waiting to be processed.
	QueueItemPending QueueItemStatus = "pending"

	// QueueItemProcessing - currently being processed by a worker.
	QueueItemProcessing QueueItemStatus = "processing"

	// QueueItemCompleted - successfully completed.
	QueueItemCompleted QueueItemStatus = "completed"

	// QueueItemFailed - failed but may be retried.
	QueueItemFailed QueueItemStatus = "failed"

	// QueueItemDead - exceeded max retries, in dead-letter queue.
	QueueItemDead QueueItemStatus = "dead"
)

type QueueStats

type QueueStats struct {
	// Pending is the count of items waiting to be processed.
	Pending int64 `json:"pending"`

	// Processing is the count of items currently being processed.
	Processing int64 `json:"processing"`

	// Completed is the count of successfully completed items.
	Completed int64 `json:"completed"`

	// Failed is the count of failed items (may be retried).
	Failed int64 `json:"failed"`

	// DeadLetter is the count of items in dead-letter queue.
	DeadLetter int64 `json:"dead_letter"`

	// OldestPending is the age of the oldest pending item.
	OldestPending time.Duration `json:"oldest_pending"`

	// AvgProcessingTime is the average processing time for completed items.
	AvgProcessingTime time.Duration `json:"avg_processing_time"`
}

QueueStats contains queue statistics.

type RetryPolicy

type RetryPolicy struct {
	// MaxRetries is the maximum number of retry attempts.
	MaxRetries int `yaml:"max_retries" json:"max_retries"`

	// InitialDelay is the delay before the first retry.
	InitialDelay time.Duration `yaml:"initial_delay" json:"initial_delay"`

	// MaxDelay is the maximum delay between retries.
	MaxDelay time.Duration `yaml:"max_delay" json:"max_delay"`

	// BackoffFactor is the multiplier for exponential backoff.
	BackoffFactor float64 `yaml:"backoff_factor" json:"backoff_factor"`
}

RetryPolicy configures retry behavior.

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a retry policy with sensible defaults.

func (*RetryPolicy) NextDelay

func (p *RetryPolicy) NextDelay(attempt int) time.Duration

NextDelay calculates the delay before the next retry attempt.

func (*RetryPolicy) SetDefaults

func (p *RetryPolicy) SetDefaults()

SetDefaults applies default values to the retry policy.

func (*RetryPolicy) ShouldRetry

func (p *RetryPolicy) ShouldRetry(retryCount int) bool

ShouldRetry returns true if another retry attempt should be made.

type SQLQueue

type SQLQueue struct {
	// contains filtered or unexported fields
}

SQLQueue implements Queue using a SQL database.

It uses SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) or optimistic locking (SQLite/MySQL) for atomic dequeue operations.

func NewSQLQueue

func NewSQLQueue(db *sql.DB, dialect string, opts ...SQLQueueOption) (*SQLQueue, error)

NewSQLQueue creates a new SQL-backed queue.

func (*SQLQueue) Ack

func (q *SQLQueue) Ack(ctx context.Context, itemID string) error

Ack acknowledges successful task completion.

func (*SQLQueue) Close

func (q *SQLQueue) Close() error

Close gracefully shuts down the queue.

func (*SQLQueue) Dequeue

func (q *SQLQueue) Dequeue(ctx context.Context) (*QueueItem, error)

Dequeue retrieves the next task for processing.

func (*SQLQueue) Enqueue

func (q *SQLQueue) Enqueue(ctx context.Context, item *QueueItem) error

Enqueue adds a task to the queue.

func (*SQLQueue) ListDLQ

func (q *SQLQueue) ListDLQ(ctx context.Context, appName string, limit int) ([]*QueueItem, error)

ListDLQ returns items in the dead-letter queue.

func (*SQLQueue) Nack

func (q *SQLQueue) Nack(ctx context.Context, itemID string, errMsg error) error

Nack negatively acknowledges a task failure.

func (*SQLQueue) RecoverStale

func (q *SQLQueue) RecoverStale(ctx context.Context, staleThreshold time.Duration) error

RecoverStale resets items stuck in processing state.

func (*SQLQueue) Requeue

func (q *SQLQueue) Requeue(ctx context.Context, itemID string) error

Requeue moves a dead-letter item back to pending.

func (*SQLQueue) Stats

func (q *SQLQueue) Stats(ctx context.Context, appName string) (*QueueStats, error)

Stats returns queue statistics.

type SQLQueueOption

type SQLQueueOption func(*SQLQueue)

SQLQueueOption configures SQLQueue.

func WithPollInterval

func WithPollInterval(d time.Duration) SQLQueueOption

WithPollInterval sets how often to poll for new items.

func WithRetryPolicy

func WithRetryPolicy(p *RetryPolicy) SQLQueueOption

WithRetryPolicy sets the retry policy.

func WithWorkerID

func WithWorkerID(id string) SQLQueueOption

WithWorkerID sets the worker ID for this queue instance.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages concurrent task processing from the queue.

Workers dequeue items, invoke the appropriate executor, and handle success/failure acknowledgements including retry logic.

func NewWorkerPool

func NewWorkerPool(queue Queue, executorProvider ExecutorProvider, taskService task.Service, metrics *observability.Metrics, opts WorkerPoolOptions) *WorkerPool

NewWorkerPool creates a new worker pool.

func (*WorkerPool) Start

func (p *WorkerPool) Start(ctx context.Context) error

Start begins processing tasks from the queue. This method is non-blocking - workers run in goroutines.

func (*WorkerPool) Stats

func (p *WorkerPool) Stats() WorkerPoolStats

Stats returns current worker pool statistics.

func (*WorkerPool) Stop

func (p *WorkerPool) Stop() error

Stop gracefully shuts down the worker pool. It signals workers to stop and waits for in-flight tasks to complete.

type WorkerPoolOptions

type WorkerPoolOptions struct {
	// NumWorkers is the number of concurrent workers.
	// Default: 4
	NumWorkers int

	// ShutdownTimeout is how long to wait for in-flight tasks during shutdown.
	// Default: 30 seconds
	ShutdownTimeout time.Duration

	// StaleRecoveryInterval is how often to check for stale "processing" items.
	// Default: 1 minute
	StaleRecoveryInterval time.Duration

	// StaleThreshold is how old a "processing" item must be to be considered stale.
	// Default: 5 minutes
	StaleThreshold time.Duration
}

WorkerPoolOptions configures the worker pool.

func (*WorkerPoolOptions) SetDefaults

func (o *WorkerPoolOptions) SetDefaults()

SetDefaults applies default values.

type WorkerPoolStats

type WorkerPoolStats struct {
	WorkerID   string `json:"worker_id"`
	NumWorkers int    `json:"num_workers"`
	Started    bool   `json:"started"`
	Stopped    bool   `json:"stopped"`
}

WorkerPoolStats contains worker pool statistics.
