workqueue

package
v0.3.6
Published: Jan 30, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Constants

const (
	// PayloadTypeExecute defines the payload type for execution.
	PayloadTypeExecute = "execute"
	// PayloadTypeCancel defines the payload type for cancelation.
	PayloadTypeCancel = "cancel"
)

Variables

var ErrConcurrencyLimitExceeded = errors.New("concurrency limit exceeded")

ErrConcurrencyLimitExceeded is returned when accepting new work would violate the configured concurrency limit.
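
To illustrate when an error of this kind might surface, here is a minimal sketch of a non-blocking limiter. It does not use the package's limiter: slotLimiter, tryAcquire and the lowercase sentinel are hypothetical stand-ins defined only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// errConcurrencyLimitExceeded is a local stand-in for
// workqueue.ErrConcurrencyLimitExceeded.
var errConcurrencyLimitExceeded = errors.New("concurrency limit exceeded")

// slotLimiter is a hypothetical limiter backed by a buffered channel.
type slotLimiter struct{ slots chan struct{} }

func newSlotLimiter(limit int) *slotLimiter {
	return &slotLimiter{slots: make(chan struct{}, limit)}
}

// tryAcquire reserves a slot without blocking. On success it returns a
// release function that frees the slot again.
func (l *slotLimiter) tryAcquire() (release func(), err error) {
	select {
	case l.slots <- struct{}{}:
		return func() { <-l.slots }, nil
	default:
		return nil, errConcurrencyLimitExceeded
	}
}

// demoLimiter fills a limiter of size 2, then tries a third acquisition
// before and after one slot is released.
func demoLimiter() (whileFull, afterRelease error) {
	l := newSlotLimiter(2)
	release, _ := l.tryAcquire()
	_, _ = l.tryAcquire()
	_, whileFull = l.tryAcquire()
	release()
	_, afterRelease = l.tryAcquire()
	return whileFull, afterRelease
}

func main() {
	whileFull, afterRelease := demoLimiter()
	fmt.Println("while full:", whileFull)
	fmt.Println("after release:", afterRelease)
}
```

Rejecting immediately instead of blocking lets the caller decide whether to return the work to the queue or report the error upstream.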

var ErrMalformedPayload = errors.New("malformed payload")

ErrMalformedPayload is a non-retryable error which can be returned by HandlerFn.
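
A minimal sketch of how a caller might tell a non-retryable failure from a transient one with errors.Is. The sentinel below is a local stand-in for ErrMalformedPayload, and handle, shouldRetry and errBackendDown are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errMalformedPayload is a local stand-in for workqueue.ErrMalformedPayload.
var errMalformedPayload = errors.New("malformed payload")

// errBackendDown is a hypothetical transient failure.
var errBackendDown = errors.New("backend unavailable")

// handle is a hypothetical handler body that rejects an empty task ID.
func handle(taskID string) error {
	if taskID == "" {
		// Wrapping with %w keeps the sentinel discoverable via errors.Is.
		return fmt.Errorf("missing task ID: %w", errMalformedPayload)
	}
	return nil
}

// shouldRetry reports whether a failed message is worth redelivering.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, errMalformedPayload)
}

func main() {
	fmt.Println(shouldRetry(handle("")))       // false: malformed input never succeeds
	fmt.Println(shouldRetry(errBackendDown))   // true: transient failure
	fmt.Println(shouldRetry(handle("task-1"))) // false: no error at all
}
```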

var ErrQueueClosed = errors.New("queue closed")

ErrQueueClosed can be returned by a Read implementation to stop the polling loop of the pull-queue backend.

Functions

func NewPushQueue

func NewPushQueue(writer Writer) (Queue, HandlerFn)

NewPushQueue creates a Queue implementation through which the SDK submits work to the queue backend. The caller is expected to invoke the returned handler function in a separate goroutine when work is assigned to the node.

func WithHeartbeater

func WithHeartbeater(ctx context.Context, hb Heartbeater) context.Context

WithHeartbeater creates a new context with the provided [Heartbeater] attached.

Types

type ExponentialReadBackoff

type ExponentialReadBackoff struct {
	// BaseDelay is used for calculating retry interval as base * 2 ^ attempt.
	BaseDelay time.Duration
	// MaxDelay sets a cap for the value returned from NextDelay.
	MaxDelay time.Duration
}

ExponentialReadBackoff is a ReadRetryPolicy implementation which uses exponential backoff with full jitter.

func (*ExponentialReadBackoff) NextDelay

func (e *ExponentialReadBackoff) NextDelay(attempt int) time.Duration
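
The page does not show the body of NextDelay. As a rough sketch of exponential backoff with full jitter, the hypothetical fullJitterBackoff type below computes a ceiling of min(MaxDelay, BaseDelay * 2^attempt) and then picks a uniformly random delay below it. The exact rounding, range and overflow handling of the real implementation are assumptions.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// fullJitterBackoff is a hypothetical stand-in that mirrors the fields of
// ExponentialReadBackoff; it is not the package's implementation.
type fullJitterBackoff struct {
	BaseDelay time.Duration
	MaxDelay  time.Duration
}

// ceiling returns min(MaxDelay, BaseDelay * 2^attempt) without overflowing.
func (b fullJitterBackoff) ceiling(attempt int) time.Duration {
	if b.BaseDelay <= 0 || b.MaxDelay <= 0 {
		return 0
	}
	d := b.BaseDelay
	for i := 0; i < attempt; i++ {
		if d > b.MaxDelay/2 {
			return b.MaxDelay // doubling would exceed the cap
		}
		d *= 2
	}
	if d > b.MaxDelay {
		return b.MaxDelay
	}
	return d
}

// NextDelay applies full jitter: a uniform random value in [0, ceiling).
func (b fullJitterBackoff) NextDelay(attempt int) time.Duration {
	c := b.ceiling(attempt)
	if c <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(c)))
}

func main() {
	b := fullJitterBackoff{BaseDelay: 100 * time.Millisecond, MaxDelay: 2 * time.Second}
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Printf("attempt %d: ceiling %v, delay %v\n",
			attempt, b.ceiling(attempt), b.NextDelay(attempt))
	}
}
```

Full jitter spreads retries from many pollers across the whole interval, which avoids synchronized bursts against a recovering backend.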

type HandlerFn

type HandlerFn func(context.Context, *Payload) (a2a.SendMessageResult, error)

HandlerFn starts agent execution for the provided payload.

type Heartbeater

type Heartbeater interface {
	// HeartbeatInterval returns the interval between heartbeats.
	HeartbeatInterval() time.Duration
	// Heartbeat is called at heartbeat interval to mark the message as still being handled.
	Heartbeat(ctx context.Context) error
}

Heartbeater defines an optional heartbeat policy for a message handler. Heartbeats are sent while a worker is handling a message. For NewPullQueue, the interface must be implemented by the Message structs returned from Read. For NewPushQueue, a Heartbeater needs to be attached to the context passed to HandlerFn using WithHeartbeater.
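
How the package drives heartbeats internally is not shown on this page. The sketch below is one plausible shape for such a loop: a ticker goroutine calls Heartbeat until the work function returns. The heartbeater interface copies the documented method set; runWithHeartbeat and countingHeartbeater are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// heartbeater mirrors the documented Heartbeater interface.
type heartbeater interface {
	HeartbeatInterval() time.Duration
	Heartbeat(ctx context.Context) error
}

// runWithHeartbeat runs work and calls hb.Heartbeat once per interval until
// work returns. It is an assumption about the loop, not the SDK's code.
func runWithHeartbeat(ctx context.Context, hb heartbeater, work func(context.Context) error) error {
	interval := hb.HeartbeatInterval()
	if interval <= 0 {
		return work(ctx) // time.NewTicker panics on non-positive intervals
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				_ = hb.Heartbeat(ctx) // a real worker would likely log this error
			}
		}
	}()

	err := work(ctx)
	cancel()  // stop the heartbeat goroutine
	<-stopped // wait until it has exited
	return err
}

// countingHeartbeater is a hypothetical Heartbeater that counts calls.
type countingHeartbeater struct{ beats int64 }

func (c *countingHeartbeater) HeartbeatInterval() time.Duration { return 5 * time.Millisecond }
func (c *countingHeartbeater) Heartbeat(context.Context) error {
	atomic.AddInt64(&c.beats, 1)
	return nil
}

// demoHeartbeat simulates 100ms of work and reports how many beats were sent.
func demoHeartbeat() int64 {
	hb := &countingHeartbeater{}
	_ = runWithHeartbeat(context.Background(), hb, func(context.Context) error {
		time.Sleep(100 * time.Millisecond)
		return nil
	})
	return atomic.LoadInt64(&hb.beats)
}

func main() {
	fmt.Println("heartbeats sent:", demoHeartbeat())
}
```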

func HeartbeaterFrom

func HeartbeaterFrom(ctx context.Context) (Heartbeater, bool)

HeartbeaterFrom returns a Heartbeater attached using WithHeartbeater.
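
The WithHeartbeater and HeartbeaterFrom pair follows the usual context-value pattern. The sketch below reimplements that pattern with local lowercase names so it compiles without the SDK; the unexported key type and fixedHeartbeater are assumptions, not the package's internals.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// heartbeater mirrors the documented Heartbeater interface.
type heartbeater interface {
	HeartbeatInterval() time.Duration
	Heartbeat(ctx context.Context) error
}

// heartbeaterKey is an unexported key type, which prevents collisions with
// context values set by other packages.
type heartbeaterKey struct{}

func withHeartbeater(ctx context.Context, hb heartbeater) context.Context {
	return context.WithValue(ctx, heartbeaterKey{}, hb)
}

func heartbeaterFrom(ctx context.Context) (heartbeater, bool) {
	hb, ok := ctx.Value(heartbeaterKey{}).(heartbeater)
	return hb, ok
}

// fixedHeartbeater is a hypothetical no-op Heartbeater with a fixed interval.
type fixedHeartbeater time.Duration

func (f fixedHeartbeater) HeartbeatInterval() time.Duration { return time.Duration(f) }
func (f fixedHeartbeater) Heartbeat(context.Context) error  { return nil }

// lookup optionally attaches a heartbeater and reports what a handler sees.
func lookup(attach bool) (time.Duration, bool) {
	ctx := context.Background()
	if attach {
		ctx = withHeartbeater(ctx, fixedHeartbeater(30*time.Second))
	}
	hb, ok := heartbeaterFrom(ctx)
	if !ok {
		return 0, false
	}
	return hb.HeartbeatInterval(), true
}

func main() {
	fmt.Println(lookup(true))  // 30s true
	fmt.Println(lookup(false)) // 0s false
}
```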

type Message

type Message interface {
	// Payload returns the payload of the message which becomes the execution or cancelation input.
	Payload() *Payload
	// Complete marks the message as completed after it was handled by a worker.
	Complete(ctx context.Context) error
	// Return returns the message to the queue after worker failed to handle it.
	Return(ctx context.Context, cause error) error
}

Message defines the message for execution or cancelation.
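
A minimal sketch of how a worker might settle a message once the handler has finished: Complete on success, Return with the cause on failure. The message interface below keeps only the two settlement methods, and settle and recordingMessage are hypothetical helpers; the actual pull-queue loop may treat some errors differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// message keeps only the two settlement methods of the documented Message
// interface; Payload is omitted to keep the sketch short.
type message interface {
	Complete(ctx context.Context) error
	Return(ctx context.Context, cause error) error
}

// settle acknowledges msg on success and hands it back to the queue on failure.
func settle(ctx context.Context, msg message, handlerErr error) error {
	if handlerErr == nil {
		return msg.Complete(ctx)
	}
	return msg.Return(ctx, handlerErr)
}

// recordingMessage is a hypothetical Message that remembers how it was settled.
type recordingMessage struct{ state string }

func (m *recordingMessage) Complete(context.Context) error {
	m.state = "completed"
	return nil
}

func (m *recordingMessage) Return(_ context.Context, cause error) error {
	m.state = "returned: " + cause.Error()
	return nil
}

// demoSettle settles one message after a successful or failed handler run.
func demoSettle(fail bool) string {
	var handlerErr error
	if fail {
		handlerErr = errors.New("agent crashed")
	}
	msg := &recordingMessage{}
	_ = settle(context.Background(), msg, handlerErr)
	return msg.state
}

func main() {
	fmt.Println(demoSettle(false)) // completed
	fmt.Println(demoSettle(true))  // returned: agent crashed
}
```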

type Payload

type Payload struct {
	// Type defines the type of payload.
	Type PayloadType
	// TaskID is an ID of the task to execute or cancel.
	TaskID a2a.TaskID
	// CancelParams defines the cancelation parameters. It is only set for [PayloadTypeCancel].
	CancelParams *a2a.TaskIDParams
	// ExecuteParams defines the execution parameters. It is only set for [PayloadTypeExecute].
	ExecuteParams *a2a.MessageSendParams
}

Payload defines the payload for execution or cancelation.
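
Because each parameter field is only set for its matching type, a handler could validate a payload before acting on it. The sketch below assumes such a check is desirable; the package may or may not perform it itself. The a2a types are replaced by placeholders, and validate and isMalformed are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errMalformedPayload is a local stand-in for workqueue.ErrMalformedPayload.
var errMalformedPayload = errors.New("malformed payload")

type payloadType string

const (
	payloadTypeExecute payloadType = "execute"
	payloadTypeCancel  payloadType = "cancel"
)

// payload mirrors the shape of the documented Payload struct. The a2a types
// are replaced by placeholders so that the sketch compiles on its own.
type payload struct {
	Type          payloadType
	TaskID        string
	CancelParams  *struct{}
	ExecuteParams *struct{}
}

// validate checks that exactly the parameter field matching Type is set.
func validate(p *payload) error {
	if p == nil {
		return fmt.Errorf("nil payload: %w", errMalformedPayload)
	}
	switch p.Type {
	case payloadTypeExecute:
		if p.ExecuteParams == nil || p.CancelParams != nil {
			return fmt.Errorf("execute payload needs only ExecuteParams: %w", errMalformedPayload)
		}
	case payloadTypeCancel:
		if p.CancelParams == nil || p.ExecuteParams != nil {
			return fmt.Errorf("cancel payload needs only CancelParams: %w", errMalformedPayload)
		}
	default:
		return fmt.Errorf("unknown payload type %q: %w", p.Type, errMalformedPayload)
	}
	return nil
}

// isMalformed reports whether err wraps the malformed-payload sentinel.
func isMalformed(err error) bool { return errors.Is(err, errMalformedPayload) }

func main() {
	ok := &payload{Type: payloadTypeExecute, TaskID: "task-1", ExecuteParams: &struct{}{}}
	fmt.Println(validate(ok))                                 // <nil>
	fmt.Println(validate(&payload{Type: payloadTypeCancel})) // missing CancelParams
	fmt.Println(validate(&payload{Type: "pause"}))           // unknown type
}
```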

type PayloadType

type PayloadType string

PayloadType defines the type of payload.

type PullQueueConfig

type PullQueueConfig struct {
	// ReadRetry configures the behavior of polling loop in case of workqueue Read errors.
	ReadRetry ReadRetryPolicy
}

PullQueueConfig provides a way to customize pull-queue behavior.

type Queue

type Queue interface {
	Writer

	// RegisterHandler registers an executor. This method is called by the SDK.
	RegisterHandler(limiter.ConcurrencyConfig, HandlerFn)
}

Queue is an interface for the work distribution component. The executor backend registers itself using RegisterHandler when RequestHandler is created. Work queue implementations can use HandlerFn to start execution when work is received.

func NewPullQueue

func NewPullQueue(rw ReadWriter, cfg *PullQueueConfig) Queue

NewPullQueue creates a Queue implementation which runs a work polling loop until ErrQueueClosed is returned from Read.

type ReadRetryPolicy

type ReadRetryPolicy interface {
	// NextDelay returns the sleep duration for a failed Read attempt.
	NextDelay(attempt int) time.Duration
}

ReadRetryPolicy is used to configure pull-queue loop behavior in case of errors returned from Read.
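
As a rough sketch of how a polling loop could consult a retry policy, the hypothetical pollWithRetry below sleeps for NextDelay(attempt) after each failed read. Starting the attempt counter at 0 and resetting it after a successful read are assumptions; the real loop may count differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errQueueClosed is a local stand-in for workqueue.ErrQueueClosed.
var errQueueClosed = errors.New("queue closed")

// readRetryPolicy mirrors the documented ReadRetryPolicy interface.
type readRetryPolicy interface {
	NextDelay(attempt int) time.Duration
}

// pollWithRetry reads until errQueueClosed, backing off after other errors.
func pollWithRetry(ctx context.Context, read func(context.Context) (string, error),
	policy readRetryPolicy, handle func(string)) error {
	attempt := 0
	for {
		item, err := read(ctx)
		switch {
		case err == nil:
			attempt = 0 // assumption: a successful read resets the backoff
			handle(item)
		case errors.Is(err, errQueueClosed):
			return nil
		default:
			select {
			case <-time.After(policy.NextDelay(attempt)):
				attempt++
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// recordingPolicy is a hypothetical policy that records requested attempts.
type recordingPolicy struct{ attempts []int }

func (p *recordingPolicy) NextDelay(attempt int) time.Duration {
	p.attempts = append(p.attempts, attempt)
	return time.Millisecond
}

// demoPoll scripts two failed reads, one item, then a closed queue.
func demoPoll() (handled []string, attempts []int) {
	type result struct {
		item string
		err  error
	}
	script := []result{
		{err: errors.New("network blip")},
		{err: errors.New("network blip")},
		{item: "task-1"},
		{err: errQueueClosed},
	}
	next := 0
	read := func(context.Context) (string, error) {
		r := script[next]
		next++
		return r.item, r.err
	}
	policy := &recordingPolicy{}
	_ = pollWithRetry(context.Background(), read, policy, func(item string) {
		handled = append(handled, item)
	})
	return handled, policy.attempts
}

func main() {
	handled, attempts := demoPoll()
	fmt.Println(handled, attempts) // [task-1] [0 1]
}
```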

type ReadWriter

type ReadWriter interface {
	Writer
	// Read dequeues a new message from the queue.
	Read(context.Context) (Message, error)
}

ReadWriter is the necessary pull-queue dependency. Write is used by the executor frontend to submit work when a message is received from a client. Read is called periodically from a background goroutine to request work. Read blocks if no work is available. Returning ErrQueueClosed from Read stops the polling loop.
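
To make the blocking Read and the ErrQueueClosed contract concrete, here is a simplified in-memory sketch. The hypothetical memQueue carries plain task ID strings instead of *Payload and Message values, so it does not satisfy the real ReadWriter interface; it only illustrates the control flow.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errQueueClosed is a local stand-in for workqueue.ErrQueueClosed.
var errQueueClosed = errors.New("queue closed")

// memQueue is a hypothetical in-memory queue of task IDs.
type memQueue struct{ items chan string }

func newMemQueue(capacity int) *memQueue {
	return &memQueue{items: make(chan string, capacity)}
}

// Write enqueues a task ID, blocking while the buffer is full.
func (q *memQueue) Write(ctx context.Context, taskID string) (string, error) {
	select {
	case q.items <- taskID:
		return taskID, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// Read blocks until an item is available, the queue is closed, or ctx ends.
func (q *memQueue) Read(ctx context.Context) (string, error) {
	select {
	case id, ok := <-q.items:
		if !ok {
			return "", errQueueClosed
		}
		return id, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// Close must only be called after the last Write, because sending on a
// closed channel panics. Buffered items are still delivered before Read
// starts reporting errQueueClosed.
func (q *memQueue) Close() { close(q.items) }

// drain reads until the queue reports that it is closed.
func drain(ctx context.Context, q *memQueue) ([]string, error) {
	var got []string
	for {
		id, err := q.Read(ctx)
		if errors.Is(err, errQueueClosed) {
			return got, nil
		}
		if err != nil {
			return got, err
		}
		got = append(got, id)
	}
}

// demoQueue writes two items, closes the queue, and drains it.
func demoQueue() []string {
	ctx := context.Background()
	q := newMemQueue(2)
	_, _ = q.Write(ctx, "task-1")
	_, _ = q.Write(ctx, "task-2")
	q.Close()
	got, _ := drain(ctx, q)
	return got
}

func main() {
	fmt.Println(demoQueue()) // [task-1 task-2]
}
```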

type Writer

type Writer interface {
	// Write puts a new payload into the queue. Payload will contain a TaskID but a different value can be returned to handle idempotency.
	Write(context.Context, *Payload) (a2a.TaskID, error)
}

Writer is used by executor frontend to submit work for execution.
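
The comment on Write allows a different TaskID to be returned for idempotency. A minimal sketch of that idea follows, assuming the backend can recognize duplicate submissions. The RequestID field is a hypothetical deduplication key that the documented Payload struct does not have, and dedupWriter is not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// payload is a placeholder for the documented Payload struct. RequestID is
// a hypothetical deduplication key added for this sketch only.
type payload struct {
	TaskID    string
	RequestID string
}

// dedupWriter is a hypothetical Writer that accepts each RequestID once.
type dedupWriter struct {
	mu       sync.Mutex
	accepted map[string]string // RequestID -> TaskID of the first accepted write
	queued   []*payload
}

func newDedupWriter() *dedupWriter {
	return &dedupWriter{accepted: map[string]string{}}
}

// Write enqueues p unless its RequestID was seen before, in which case it
// returns the TaskID of the earlier submission instead of p.TaskID.
func (w *dedupWriter) Write(_ context.Context, p *payload) (string, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if id, ok := w.accepted[p.RequestID]; ok {
		return id, nil // duplicate: report the task that already owns the work
	}
	w.accepted[p.RequestID] = p.TaskID
	w.queued = append(w.queued, p)
	return p.TaskID, nil
}

// demoDedup submits the same request twice under different task IDs.
func demoDedup() (first, second string, queued int) {
	w := newDedupWriter()
	ctx := context.Background()
	first, _ = w.Write(ctx, &payload{TaskID: "task-1", RequestID: "req-42"})
	second, _ = w.Write(ctx, &payload{TaskID: "task-2", RequestID: "req-42"})
	return first, second, len(w.queued)
}

func main() {
	first, second, queued := demoDedup()
	fmt.Println(first, second, queued) // task-1 task-1 1
}
```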
