workqueue

package
v2.3.1
Published: May 13, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package workqueue provides a work queue implementation for task execution.

Constants

const (
	// PayloadTypeExecute defines the payload type for execution.
	PayloadTypeExecute = "execute"
	// PayloadTypeCancel defines the payload type for cancelation.
	PayloadTypeCancel = "cancel"
)

Variables

var ErrConcurrencyLimitExceeded = errors.New("concurrency limit exceeded")

ErrConcurrencyLimitExceeded is returned when accepting new work would violate the configured concurrency limit.

var ErrLeaseAlreadyTaken = errors.New("lease for this type of job is already taken")

ErrLeaseAlreadyTaken is returned by LeaseManager.Acquire when a conflicting lease is already held for the given task ID.

var ErrMalformedPayload = errors.New("malformed payload")

ErrMalformedPayload is a non-retryable error which can be returned by HandlerFn.

var ErrQueueClosed = errors.New("queue closed")

ErrQueueClosed can be returned by a Read implementation to stop the pull-queue polling loop.
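Because these are sentinel errors, callers would normally match them with errors.Is so that wrapped errors are still recognized. The following is a minimal sketch, not the package's own logic: ErrMalformedPayload is redeclared locally so the example compiles on its own, and shouldRetry, wrapMalformed and errTransient are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the sentinel documented above, so the sketch compiles alone.
var ErrMalformedPayload = errors.New("malformed payload")

// errTransient is a hypothetical retryable failure used only by the demo.
var errTransient = errors.New("backend timeout")

// wrapMalformed adds detail while keeping the sentinel matchable through %w.
func wrapMalformed(detail string) error {
	return fmt.Errorf("%s: %w", detail, ErrMalformedPayload)
}

// shouldRetry is a hypothetical helper: it reports whether a failed handler
// call is worth another attempt. Malformed payloads are treated as permanent.
func shouldRetry(err error) bool {
	if err == nil {
		return false
	}
	// errors.Is walks the wrap chain, so wrapped sentinels still match.
	return !errors.Is(err, ErrMalformedPayload)
}

func main() {
	fmt.Println(shouldRetry(wrapMalformed("decode task"))) // false
	fmt.Println(shouldRetry(errTransient))                 // true
}
```

Comparing with == instead of errors.Is would miss the wrapped case, which is why the sketch avoids it.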

Functions

func AttachHeartbeater

func AttachHeartbeater(ctx context.Context, hb Heartbeater) context.Context

AttachHeartbeater creates a new context with the provided [Heartbeater] attached.

func NewPushQueue

func NewPushQueue(writer Writer) (Queue, HandlerFn)

NewPushQueue creates a Queue implementation through which the SDK submits work to the queue backend. The caller is expected to invoke the returned HandlerFn in a separate goroutine when work is assigned to the node.

Types

type ExponentialReadBackoff

type ExponentialReadBackoff struct {
	// BaseDelay is used for calculating retry interval as base * 2 ^ attempt.
	BaseDelay time.Duration
	// MaxDelay sets a cap for the value returned from NextDelay.
	MaxDelay time.Duration
}

ExponentialReadBackoff is a ReadRetryPolicy implementation which uses exponential backoff with full jitter.

func (*ExponentialReadBackoff) NextDelay

func (e *ExponentialReadBackoff) NextDelay(attempt int) time.Duration

NextDelay implements ReadRetryPolicy interface.
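Exponential backoff with full jitter is commonly computed by picking a random delay between zero and min(MaxDelay, BaseDelay * 2^attempt). The sketch below shows that calculation under stated assumptions: the backoff type, its ceiling and nextDelay methods, and the example value are local illustrations, and the package's actual NextDelay may differ in rounding, range bounds, or overflow handling.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff is a local stand-in with the same two fields as the documented struct.
type backoff struct {
	BaseDelay time.Duration
	MaxDelay  time.Duration
}

// example is a sample configuration used by the demo.
var example = backoff{BaseDelay: 100 * time.Millisecond, MaxDelay: time.Second}

// ceiling returns min(MaxDelay, BaseDelay * 2^attempt). It doubles step by
// step and stops early at the cap, so large attempts cannot overflow.
func (b backoff) ceiling(attempt int) time.Duration {
	d := b.BaseDelay
	for i := 0; i < attempt; i++ {
		if d >= b.MaxDelay/2 {
			return b.MaxDelay
		}
		d *= 2
	}
	if d > b.MaxDelay {
		return b.MaxDelay
	}
	return d
}

// nextDelay applies full jitter: a uniform random value in [0, ceiling).
func (b backoff) nextDelay(attempt int) time.Duration {
	c := b.ceiling(attempt)
	if c <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(c)))
}

func main() {
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Printf("attempt %d: ceiling %v, delay %v\n",
			attempt, example.ceiling(attempt), example.nextDelay(attempt))
	}
}
```

Full jitter spreads retries from many workers across the whole interval, which reduces synchronized bursts against the queue backend after an outage.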

type HandlerConfig

type HandlerConfig struct {
	Limiter limiter.ConcurrencyConfig
}

HandlerConfig configures the work queue handler.

type HandlerFn

type HandlerFn func(context.Context, *Payload) (a2a.SendMessageResult, error)

HandlerFn starts agent execution for the provided payload.

type Heartbeater

type Heartbeater interface {
	// HeartbeatInterval returns the interval between heartbeats.
	HeartbeatInterval() time.Duration
	// Heartbeat is called at heartbeat interval to mark the message as still being handled.
	Heartbeat(ctx context.Context) error
}

Heartbeater defines an optional heartbeat policy for a message handler. Heartbeats are sent while a worker is handling a message. For NewPullQueue, the interface must be implemented by the Message values returned from Read. For NewPushQueue, a Heartbeater must be attached to the context passed to HandlerFn using AttachHeartbeater.

func HeartbeaterFrom

func HeartbeaterFrom(ctx context.Context) (Heartbeater, bool)

HeartbeaterFrom returns a Heartbeater attached using AttachHeartbeater.
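The attach and lookup pair follows the usual context-value pattern, and the heartbeat itself is typically driven by a ticker that runs while the handler works. The sketch below illustrates both with local stand-ins: attach, from, runWithHeartbeat and countingHeartbeater are hypothetical names, and the package's real scheduling and error handling are not described in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// Heartbeater mirrors the interface documented above.
type Heartbeater interface {
	HeartbeatInterval() time.Duration
	Heartbeat(ctx context.Context) error
}

// ctxKey is an unexported key type, so other packages cannot collide with it.
type ctxKey struct{}

func attach(ctx context.Context, hb Heartbeater) context.Context {
	return context.WithValue(ctx, ctxKey{}, hb)
}

func from(ctx context.Context) (Heartbeater, bool) {
	hb, ok := ctx.Value(ctxKey{}).(Heartbeater)
	return hb, ok
}

// countingHeartbeater is a hypothetical implementation that counts beats.
type countingHeartbeater struct {
	interval time.Duration
	beats    atomic.Int64
}

func (c *countingHeartbeater) HeartbeatInterval() time.Duration { return c.interval }

func (c *countingHeartbeater) Heartbeat(context.Context) error {
	c.beats.Add(1)
	return nil
}

// runWithHeartbeat runs work and, if a Heartbeater is attached, calls
// Heartbeat on every tick until work returns.
func runWithHeartbeat(ctx context.Context, work func(context.Context)) {
	hb, ok := from(ctx)
	if !ok || hb.HeartbeatInterval() <= 0 {
		work(ctx)
		return
	}
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	go func() {
		defer close(done)
		t := time.NewTicker(hb.HeartbeatInterval())
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				_ = hb.Heartbeat(ctx) // a real worker would log or act on this error
			}
		}
	}()
	work(ctx)
	cancel()
	<-done // wait for the ticker goroutine so no heartbeat fires after return
}

// demo handles a slow "message" and returns how many heartbeats were sent.
func demo() int64 {
	hb := &countingHeartbeater{interval: 5 * time.Millisecond}
	ctx := attach(context.Background(), hb)
	runWithHeartbeat(ctx, func(context.Context) { time.Sleep(60 * time.Millisecond) })
	return hb.beats.Load()
}

func main() {
	fmt.Println("heartbeats sent:", demo())
}
```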

type InMemoryQueueConfig added in v2.3.0

type InMemoryQueueConfig struct {
	LeaseManager LeaseManager
}

InMemoryQueueConfig configures the in-memory Queue implementation.

type Lease added in v2.3.0

type Lease interface {
	// TaskID returns the task identifier associated with the lease.
	// Implementations may return a TaskID different from the one in the [Payload] to
	// support idempotency (e.g. dedup by message ID and map to an existing TaskID).
	TaskID() a2a.TaskID
	// Release releases the lease.
	Release(context.Context)
}

Lease represents an acquired exclusive right to process a work item. Lease extension is supported by implementing Heartbeater.

type LeaseManager added in v2.3.0

type LeaseManager interface {
	// Acquire attempts to acquire a lease for the given payload.
	Acquire(context.Context, *Payload) (Lease, error)
}

LeaseManager controls concurrent access to task execution and cancelation.

func NewInMemoryLeaseManager added in v2.3.0

func NewInMemoryLeaseManager() LeaseManager

NewInMemoryLeaseManager returns a LeaseManager that tracks leases in memory.
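An in-memory lease manager can be sketched as a mutex-guarded set of held task IDs. This is an illustration only: leaseManager, lease and errLeaseTaken are local stand-ins, task IDs are plain strings rather than a2a.TaskID, and keying leases by task ID alone is an assumption, since the documentation does not spell out which leases conflict.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errLeaseTaken is a local stand-in for ErrLeaseAlreadyTaken.
var errLeaseTaken = errors.New("lease already taken")

// leaseManager tracks which task IDs currently have a lease.
type leaseManager struct {
	mu   sync.Mutex
	held map[string]bool
}

// lease is the handle returned to the holder.
type lease struct {
	m      *leaseManager
	taskID string
	once   sync.Once
}

func newLeaseManager() *leaseManager {
	return &leaseManager{held: map[string]bool{}}
}

// acquire grants the lease unless another holder already has it.
func (m *leaseManager) acquire(taskID string) (*lease, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.held[taskID] {
		return nil, errLeaseTaken
	}
	m.held[taskID] = true
	return &lease{m: m, taskID: taskID}, nil
}

// release frees the lease. sync.Once makes repeated calls harmless, so a
// late second release cannot drop a lease acquired by someone else.
func (l *lease) release() {
	l.once.Do(func() {
		l.m.mu.Lock()
		defer l.m.mu.Unlock()
		delete(l.m.held, l.taskID)
	})
}

func main() {
	m := newLeaseManager()
	first, err := m.acquire("task-1")
	fmt.Println(first != nil, err) // true <nil>
	_, err = m.acquire("task-1")
	fmt.Println(err) // lease already taken
	first.release()
	_, err = m.acquire("task-1")
	fmt.Println(err) // <nil>
}
```

A process-local map only coordinates workers inside one process. A distributed deployment would need a LeaseManager backed by shared storage.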

type Message

type Message interface {
	// Payload returns the payload of the message which becomes the execution or cancelation input.
	Payload() *Payload
	// Complete marks the message as completed after it was handled by a worker.
	Complete(ctx context.Context) error
	// Return returns the message to the queue after worker failed to handle it.
	Return(ctx context.Context, cause error) error
}

Message defines the message for execution or cancelation.

type Payload

type Payload struct {
	// Type defines the type of payload.
	Type PayloadType
	// TaskID is an ID of the task to execute or cancel.
	TaskID a2a.TaskID
	// CancelRequest defines the cancelation parameters. It is only set for [PayloadTypeCancel].
	CancelRequest *a2a.CancelTaskRequest
	// ExecuteRequest defines the execution parameters. It is only set for [PayloadTypeExecute].
	ExecuteRequest *a2a.SendMessageRequest
	// CallContext carries serializable call context data (user identity, service params, tenant)
	// for cross-process propagation in distributed mode.
	CallContext map[string]any
}

Payload defines the payload for execution or cancelation.

type PayloadType

type PayloadType string

PayloadType defines the type of payload.
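Since only one of the two request fields is set for a given payload type, a handler would typically switch on Type and check the matching field before doing any work. The sketch below shows one way to do that with local stand-ins: payload, executeRequest, cancelRequest, validate and isMalformed are hypothetical, and mapping validation failures to a malformed-payload error is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// PayloadType and its constants mirror the declarations documented above.
type PayloadType string

const (
	PayloadTypeExecute PayloadType = "execute"
	PayloadTypeCancel  PayloadType = "cancel"
)

// errMalformedPayload is a local stand-in for ErrMalformedPayload.
var errMalformedPayload = errors.New("malformed payload")

// Hypothetical stand-ins for the a2a request types.
type executeRequest struct{ Text string }
type cancelRequest struct{ Reason string }

// payload is a reduced local copy of the documented Payload struct.
type payload struct {
	Type           PayloadType
	TaskID         string
	ExecuteRequest *executeRequest
	CancelRequest  *cancelRequest
}

// validate checks that the request matching the payload type is present.
func validate(p *payload) error {
	if p == nil {
		return fmt.Errorf("%w: nil payload", errMalformedPayload)
	}
	switch p.Type {
	case PayloadTypeExecute:
		if p.ExecuteRequest == nil {
			return fmt.Errorf("%w: execute payload without ExecuteRequest", errMalformedPayload)
		}
	case PayloadTypeCancel:
		if p.CancelRequest == nil {
			return fmt.Errorf("%w: cancel payload without CancelRequest", errMalformedPayload)
		}
	default:
		return fmt.Errorf("%w: unknown type %q", errMalformedPayload, p.Type)
	}
	return nil
}

// isMalformed reports whether err wraps the malformed-payload sentinel.
func isMalformed(err error) bool { return errors.Is(err, errMalformedPayload) }

func main() {
	good := &payload{Type: PayloadTypeExecute, TaskID: "task-1", ExecuteRequest: &executeRequest{Text: "hi"}}
	bad := &payload{Type: PayloadTypeCancel, TaskID: "task-1"}
	fmt.Println(validate(good))
	fmt.Println(validate(bad))
}
```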

type PullQueueConfig

type PullQueueConfig struct {
	// ReadRetry configures the behavior of polling loop in case of workqueue Read errors.
	ReadRetry ReadRetryPolicy
	// BeforeExecutionCallback is called before execution is started. It can be used to modify the context or the message.
	// If an error is returned, execution will not be started and neither Complete nor Return will be called for the Message.
	BeforeExecutionCallback func(context.Context, Message) (context.Context, error)
	// AfterExecutionCallback is called after execution finishes. It can be used to decide how the message should be handled.
	// If an error is returned, neither Complete nor Return will be called for the Message.
	AfterExecutionCallback func(context.Context, Message, a2a.SendMessageResult, error) error
}

PullQueueConfig provides a way to customize pull-queue behavior.
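The callback rules above imply a control flow in which either callback can suppress message settlement by returning an error. The sketch below models that flow with local stand-ins: fakeMessage, hooks, process and demo are hypothetical, results are plain strings instead of a2a.SendMessageResult, and the default of calling Complete on success and Return on failure is an assumption about what happens when both callbacks succeed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fakeMessage records which settlement method was called, if any.
type fakeMessage struct{ settled string }

func (m *fakeMessage) Complete(context.Context) error {
	m.settled = "complete"
	return nil
}

func (m *fakeMessage) Return(context.Context, error) error {
	m.settled = "return"
	return nil
}

// hooks mirrors the two callbacks of the documented config.
type hooks struct {
	before func(context.Context, *fakeMessage) (context.Context, error)
	after  func(context.Context, *fakeMessage, string, error) error
}

// process runs one message through the hooks and the handler.
func process(ctx context.Context, msg *fakeMessage, h hooks, handle func(context.Context) (string, error)) {
	if h.before != nil {
		next, err := h.before(ctx, msg)
		if err != nil {
			return // execution is skipped and the message is left unsettled
		}
		ctx = next
	}
	result, err := handle(ctx)
	if h.after != nil {
		if hookErr := h.after(ctx, msg, result, err); hookErr != nil {
			return // the callback took over settlement
		}
	}
	if err != nil {
		_ = msg.Return(ctx, err)
		return
	}
	_ = msg.Complete(ctx)
}

// demo runs one message and reports how it was settled ("" means untouched).
func demo(beforeFails, handlerFails, afterFails bool) string {
	msg := &fakeMessage{}
	h := hooks{
		before: func(ctx context.Context, _ *fakeMessage) (context.Context, error) {
			if beforeFails {
				return ctx, errors.New("rejected before execution")
			}
			return ctx, nil
		},
		after: func(context.Context, *fakeMessage, string, error) error {
			if afterFails {
				return errors.New("settled elsewhere")
			}
			return nil
		},
	}
	process(context.Background(), msg, h, func(context.Context) (string, error) {
		if handlerFails {
			return "", errors.New("handler failed")
		}
		return "done", nil
	})
	return msg.settled
}

func main() {
	fmt.Printf("%q\n", demo(false, false, false)) // "complete"
	fmt.Printf("%q\n", demo(false, true, false))  // "return"
	fmt.Printf("%q\n", demo(true, false, false))  // ""
	fmt.Printf("%q\n", demo(false, false, true))  // ""
}
```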

type Queue

type Queue interface {
	Writer

	// RegisterHandler registers an executor. This method is called by the SDK.
	RegisterHandler(HandlerConfig, HandlerFn)
}

Queue is an interface for the work distribution component. The executor backend registers itself using RegisterHandler when a RequestHandler is created. Work queue implementations can use HandlerFn to start execution when work is received.

func NewInMemory added in v2.3.0

func NewInMemory(cfg *InMemoryQueueConfig) Queue

NewInMemory creates a Queue implementation which uses in-memory storage for tasks.

func NewPullQueue

func NewPullQueue(rw ReadWriter, cfg *PullQueueConfig) Queue

NewPullQueue creates a Queue implementation which runs a work polling loop until ErrQueueClosed is returned from Read.

type ReadRetryPolicy

type ReadRetryPolicy interface {
	// NextDelay returns the sleep duration for a failed Read attempt.
	NextDelay(attempt int) time.Duration
}

ReadRetryPolicy is used to configure pull-queue loop behavior in case of errors returned from Read.

type ReadWriter

type ReadWriter interface {
	Writer
	// Read dequeues a new message from the queue.
	Read(context.Context) (Message, error)
}

ReadWriter is the dependency required by a pull queue. Write is used by the executor frontend to submit work when a message is received from a client. Read is called periodically from a background goroutine to request work and blocks if no work is available. Returning ErrQueueClosed from Read stops the polling loop.
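The polling behavior described here can be pictured as a loop that reads, backs off on errors, and exits on the closed-queue sentinel. The sketch below is a simplified model, not the package's loop: reader, retryPolicy, fixedDelay, scriptedReader and poll are local stand-ins, messages are plain strings, and resetting the attempt counter after a successful Read is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errQueueClosed is a local stand-in for ErrQueueClosed.
var errQueueClosed = errors.New("queue closed")

type reader interface {
	Read(ctx context.Context) (string, error)
}

type retryPolicy interface {
	NextDelay(attempt int) time.Duration
}

// fixedDelay is a trivial retry policy used by the demo.
type fixedDelay time.Duration

func (d fixedDelay) NextDelay(int) time.Duration { return time.Duration(d) }

// poll reads until the queue reports it is closed or ctx is canceled.
func poll(ctx context.Context, r reader, retry retryPolicy, handle func(string)) error {
	attempt := 0
	for {
		msg, err := r.Read(ctx)
		switch {
		case errors.Is(err, errQueueClosed):
			return nil // clean shutdown requested by the backend
		case err != nil:
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(retry.NextDelay(attempt)):
			}
			attempt++
			continue
		}
		attempt = 0 // assumption: a successful Read resets the backoff
		handle(msg)
	}
}

// scriptedReader replays a fixed sequence and then closes the queue.
type scriptedReader struct {
	steps []step
	next  int
}

type step struct {
	msg string
	err error
}

func (s *scriptedReader) Read(context.Context) (string, error) {
	if s.next >= len(s.steps) {
		return "", errQueueClosed
	}
	st := s.steps[s.next]
	s.next++
	return st.msg, st.err
}

// demo polls a scripted queue with one transient failure between messages.
func demo() []string {
	r := &scriptedReader{steps: []step{{msg: "a"}, {err: errors.New("transient")}, {msg: "b"}}}
	var got []string
	_ = poll(context.Background(), r, fixedDelay(time.Millisecond), func(m string) {
		got = append(got, m)
	})
	return got
}

func main() {
	fmt.Println(demo()) // [a b]
}
```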

type Writer

type Writer interface {
	// Write puts a new payload into the queue. The payload contains a TaskID, but a different value can be returned to handle idempotency.
	Write(context.Context, *Payload) (a2a.TaskID, error)
}

Writer is used by the executor frontend to submit work for execution.

type WriterFunc added in v2.3.0

type WriterFunc func(context.Context, *Payload) (a2a.TaskID, error)

WriterFunc is a function that implements the Writer interface.

func (WriterFunc) Write added in v2.3.0

func (fn WriterFunc) Write(ctx context.Context, payload *Payload) (a2a.TaskID, error)

Write implements Writer.
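WriterFunc follows the same function-adapter idea as http.HandlerFunc in the standard library: a plain function gains a method and can then be passed wherever the interface is expected. The sketch below shows the pattern with local stand-ins. The payload, writer and writerFunc types and the submit and demo helpers are hypothetical, and task IDs are plain strings.

```go
package main

import (
	"context"
	"fmt"
)

// payload is a reduced local stand-in for the documented Payload struct.
type payload struct{ TaskID string }

// writer and writerFunc mirror the shape of Writer and WriterFunc.
type writer interface {
	Write(context.Context, *payload) (string, error)
}

type writerFunc func(context.Context, *payload) (string, error)

// Write calls the function itself, which is what makes it satisfy writer.
func (fn writerFunc) Write(ctx context.Context, p *payload) (string, error) {
	return fn(ctx, p)
}

// submit depends only on the interface, so any adapter can be plugged in.
func submit(w writer, taskID string) (string, error) {
	return w.Write(context.Background(), &payload{TaskID: taskID})
}

// demo adapts a closure that appends to a slice and submits one task.
func demo() (string, int) {
	var queued []string
	w := writerFunc(func(_ context.Context, p *payload) (string, error) {
		queued = append(queued, p.TaskID)
		return p.TaskID, nil
	})
	id, _ := submit(w, "task-1")
	return id, len(queued)
}

func main() {
	id, n := demo()
	fmt.Println(id, n) // task-1 1
}
```

The adapter is handy in tests and small integrations, where declaring a named struct just to hold one method would add noise.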
