Documentation ¶
Overview ¶
Package workqueue provides a work queue implementation for task execution.
Index ¶
- Constants
- Variables
- func AttachHeartbeater(ctx context.Context, hb Heartbeater) context.Context
- func NewPushQueue(writer Writer) (Queue, HandlerFn)
- type ExponentialReadBackoff
- type HandlerConfig
- type HandlerFn
- type Heartbeater
- type InMemoryQueueConfig
- type Lease
- type LeaseManager
- type Message
- type Payload
- type PayloadType
- type PullQueueConfig
- type Queue
- type ReadRetryPolicy
- type ReadWriter
- type Writer
- type WriterFunc
Constants ¶
const (
// PayloadTypeExecute defines the payload type for execution.
PayloadTypeExecute = "execute"
// PayloadTypeCancel defines the payload type for cancelation.
PayloadTypeCancel = "cancel"
)
Variables ¶
var ErrConcurrencyLimitExceeded = errors.New("concurrency limit exceeded")
ErrConcurrencyLimitExceeded is returned when accepting new work would violate the configured concurrency limit.
var ErrLeaseAlreadyTaken = errors.New("lease for this type of job is already taken")
ErrLeaseAlreadyTaken is returned by LeaseManager.Acquire when a conflicting lease is already held for the given task id.
var ErrMalformedPayload = errors.New("malformed payload")
ErrMalformedPayload is a non-retryable error which can be returned by HandlerFn.
var ErrQueueClosed = errors.New("queue closed")
ErrQueueClosed can be returned by a Read implementation to stop the polling loop of the pull-queue backend.
Functions ¶
func AttachHeartbeater ¶
func AttachHeartbeater(ctx context.Context, hb Heartbeater) context.Context
AttachHeartbeater creates a new context with the provided [Heartbeater] attached.
Types ¶
type ExponentialReadBackoff ¶
type ExponentialReadBackoff struct {
// BaseDelay is used for calculating retry interval as base * 2 ^ attempt.
BaseDelay time.Duration
// MaxDelay sets a cap for the value returned from NextDelay.
MaxDelay time.Duration
}
ExponentialReadBackoff is a ReadRetryPolicy implementation which uses exponential backoff with full jitter.
func (*ExponentialReadBackoff) NextDelay ¶
func (e *ExponentialReadBackoff) NextDelay(attempt int) time.Duration
NextDelay implements ReadRetryPolicy interface.
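The delay calculation described above (base * 2 ^ attempt, capped at MaxDelay, with full jitter) can be sketched as follows. This is a minimal illustration and not the package's source: fullJitterBackoff and ceiling are hypothetical local names, and treating attempt as zero-based is an assumption.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// fullJitterBackoff is a hypothetical local stand-in for ExponentialReadBackoff.
type fullJitterBackoff struct {
	BaseDelay time.Duration
	MaxDelay  time.Duration
}

// ceiling returns min(MaxDelay, BaseDelay * 2^attempt) without overflowing.
// attempt is assumed to be zero-based.
func (b fullJitterBackoff) ceiling(attempt int) time.Duration {
	if b.BaseDelay <= 0 || b.MaxDelay <= 0 {
		return 0
	}
	d := b.BaseDelay
	for i := 0; i < attempt; i++ {
		if d > b.MaxDelay/2 { // doubling would exceed the cap
			return b.MaxDelay
		}
		d *= 2
	}
	if d > b.MaxDelay {
		return b.MaxDelay
	}
	return d
}

// NextDelay applies full jitter: a uniform random duration in [0, ceiling).
func (b fullJitterBackoff) NextDelay(attempt int) time.Duration {
	c := b.ceiling(attempt)
	if c <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(c)))
}

func main() {
	b := fullJitterBackoff{BaseDelay: 100 * time.Millisecond, MaxDelay: time.Second}
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Printf("attempt=%d ceiling=%v sample=%v\n", attempt, b.ceiling(attempt), b.NextDelay(attempt))
	}
}
```

Full jitter spreads retries from many pollers across the whole interval instead of clustering them at the exponential ceiling.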
type HandlerConfig ¶
type HandlerConfig struct {
Limiter limiter.ConcurrencyConfig
}
HandlerConfig configures the work queue handler.
type Heartbeater ¶
type Heartbeater interface {
// HeartbeatInterval returns the interval between heartbeats.
HeartbeatInterval() time.Duration
// Heartbeat is called at heartbeat interval to mark the message as still being handled.
Heartbeat(ctx context.Context) error
}
Heartbeater defines an optional heartbeat policy for a message handler. Heartbeats are sent while a worker is handling a message. For NewPullQueue, the interface must be implemented by the Message values returned from Read. For NewPushQueue, a Heartbeater needs to be attached to the context passed to HandlerFn using AttachHeartbeater.
func HeartbeaterFrom ¶
func HeartbeaterFrom(ctx context.Context) (Heartbeater, bool)
HeartbeaterFrom returns a Heartbeater attached using AttachHeartbeater.
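The attach-and-retrieve pattern can be illustrated with a self-contained sketch. The types below are local mirrors of the documented signatures, not the package's implementation. The context key, countingHeartbeater, and runWithHeartbeat are hypothetical, and the ticker-based loop is only one plausible way to send heartbeats while a handler runs.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// heartbeater mirrors the documented Heartbeater interface.
type heartbeater interface {
	HeartbeatInterval() time.Duration
	Heartbeat(ctx context.Context) error
}

// heartbeaterKey is a hypothetical private context key.
type heartbeaterKey struct{}

func attachHeartbeater(ctx context.Context, hb heartbeater) context.Context {
	return context.WithValue(ctx, heartbeaterKey{}, hb)
}

func heartbeaterFrom(ctx context.Context) (heartbeater, bool) {
	hb, ok := ctx.Value(heartbeaterKey{}).(heartbeater)
	return hb, ok
}

// countingHeartbeater counts heartbeats instead of extending a real lease.
type countingHeartbeater struct {
	interval time.Duration
	beats    atomic.Int64
}

func (h *countingHeartbeater) HeartbeatInterval() time.Duration { return h.interval }
func (h *countingHeartbeater) Heartbeat(context.Context) error  { h.beats.Add(1); return nil }

// runWithHeartbeat calls handle and, if a heartbeater is attached to ctx,
// sends heartbeats at its interval until handle returns.
func runWithHeartbeat(ctx context.Context, handle func(context.Context) error) error {
	hb, ok := heartbeaterFrom(ctx)
	if !ok || hb.HeartbeatInterval() <= 0 {
		return handle(ctx)
	}
	done := make(chan struct{})
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		ticker := time.NewTicker(hb.HeartbeatInterval())
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ctx.Done():
				return
			case <-ticker.C:
				_ = hb.Heartbeat(ctx) // a real worker would likely log this error
			}
		}
	}()
	err := handle(ctx)
	close(done)
	<-stopped // wait for the heartbeat goroutine to exit
	return err
}

// demoBeats runs a slow handler and reports how many heartbeats were sent.
func demoBeats() int64 {
	hb := &countingHeartbeater{interval: 2 * time.Millisecond}
	ctx := attachHeartbeater(context.Background(), hb)
	_ = runWithHeartbeat(ctx, func(context.Context) error {
		time.Sleep(50 * time.Millisecond) // pretend to do work
		return nil
	})
	return hb.beats.Load()
}

func main() {
	fmt.Println("heartbeats sent:", demoBeats())
}
```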
type InMemoryQueueConfig ¶ added in v2.3.0
type InMemoryQueueConfig struct {
LeaseManager LeaseManager
}
InMemoryQueueConfig configures the in-memory Queue implementation.
type Lease ¶ added in v2.3.0
type Lease interface {
// TaskID returns the task identifier associated with the lease.
// Implementations may return a TaskID different from the one in the [Payload] to
// support idempotency (e.g. dedup by message ID and map to an existing TaskID).
TaskID() a2a.TaskID
// Release releases the lease.
Release(context.Context)
}
Lease represents an acquired exclusive right to process a work item. Lease extension is supported by implementing Heartbeater.
type LeaseManager ¶ added in v2.3.0
type LeaseManager interface {
// Acquire attempts to acquire a lease for the given payload.
Acquire(context.Context, *Payload) (Lease, error)
}
LeaseManager controls concurrent access to task execution and cancelation.
func NewInMemoryLeaseManager ¶ added in v2.3.0
func NewInMemoryLeaseManager() LeaseManager
NewInMemoryLeaseManager returns a LeaseManager that tracks leases in memory.
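For orientation, an in-memory lease manager could look roughly like the sketch below. It is not the implementation behind NewInMemoryLeaseManager. All types are simplified local stand-ins (task IDs are plain strings), and holding at most one lease per task ID regardless of payload type is an assumption about what counts as a conflicting lease.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errLeaseAlreadyTaken stands in for ErrLeaseAlreadyTaken.
var errLeaseAlreadyTaken = errors.New("lease already taken")

// payload is a reduced stand-in for Payload.
type payload struct {
	Type   string
	TaskID string
}

// memLeaseManager tracks held leases in a mutex-guarded set.
type memLeaseManager struct {
	mu   sync.Mutex
	held map[string]bool
}

func newMemLeaseManager() *memLeaseManager {
	return &memLeaseManager{held: map[string]bool{}}
}

// memLease is the handle returned by Acquire.
type memLease struct {
	m      *memLeaseManager
	taskID string
	once   sync.Once
}

func (l *memLease) TaskID() string { return l.taskID }

// Release frees the lease. sync.Once keeps a second Release call from
// removing a lease that someone else acquired in the meantime.
func (l *memLease) Release(context.Context) {
	l.once.Do(func() {
		l.m.mu.Lock()
		defer l.m.mu.Unlock()
		delete(l.m.held, l.taskID)
	})
}

// Acquire grants a lease unless one is already held for the task ID.
func (m *memLeaseManager) Acquire(_ context.Context, p *payload) (*memLease, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.held[p.TaskID] {
		return nil, errLeaseAlreadyTaken
	}
	m.held[p.TaskID] = true
	return &memLease{m: m, taskID: p.TaskID}, nil
}

// acquireSequence acquires, tries again while held, releases, and retries.
func acquireSequence() (first, second, third error) {
	ctx := context.Background()
	m := newMemLeaseManager()
	p := &payload{Type: "execute", TaskID: "task-1"}
	lease, first := m.Acquire(ctx, p)
	_, second = m.Acquire(ctx, p)
	if lease != nil {
		lease.Release(ctx)
	}
	_, third = m.Acquire(ctx, p)
	return first, second, third
}

func main() {
	first, second, third := acquireSequence()
	fmt.Println(first, second, third)
}
```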
type Message ¶
type Message interface {
// Payload returns the payload of the message which becomes the execution or cancelation input.
Payload() *Payload
// Complete marks the message as completed after it was handled by a worker.
Complete(ctx context.Context) error
// Return returns the message to the queue after a worker failed to handle it.
Return(ctx context.Context, cause error) error
}
Message defines the message for execution or cancelation.
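A worker has to choose between Complete and Return once a handler finishes. The sketch below shows one possible policy using local stand-in types. Completing a message whose handler returned a malformed-payload error (so that it is not redelivered) is an assumption based on that error being documented as non-retryable; the package's actual behavior may differ. settle, outcomeFor, and recordingMessage are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins for ErrMalformedPayload and for an arbitrary retryable failure.
var (
	errMalformedPayload = errors.New("malformed payload")
	errTransient        = errors.New("backend unavailable")
)

// message mirrors the Complete and Return methods of the documented Message.
type message interface {
	Complete(ctx context.Context) error
	Return(ctx context.Context, cause error) error
}

// recordingMessage remembers which method was called.
type recordingMessage struct{ outcome string }

func (m *recordingMessage) Complete(context.Context) error {
	m.outcome = "completed"
	return nil
}

func (m *recordingMessage) Return(_ context.Context, cause error) error {
	m.outcome = "returned: " + cause.Error()
	return nil
}

// settle completes the message on success or on a non-retryable error,
// and returns it to the queue on any other failure.
func settle(ctx context.Context, msg message, handlerErr error) error {
	if handlerErr == nil || errors.Is(handlerErr, errMalformedPayload) {
		return msg.Complete(ctx)
	}
	return msg.Return(ctx, handlerErr)
}

// outcomeFor reports what settle did for a given handler error.
func outcomeFor(handlerErr error) string {
	msg := &recordingMessage{}
	if err := settle(context.Background(), msg, handlerErr); err != nil {
		return "settle failed: " + err.Error()
	}
	return msg.outcome
}

func main() {
	fmt.Println(outcomeFor(nil))
	fmt.Println(outcomeFor(fmt.Errorf("decode: %w", errMalformedPayload)))
	fmt.Println(outcomeFor(errTransient))
}
```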
type Payload ¶
type Payload struct {
// Type defines the type of payload.
Type PayloadType
// TaskID is an ID of the task to execute or cancel.
TaskID a2a.TaskID
// CancelRequest defines the cancelation parameters. It is only set for [PayloadTypeCancel].
CancelRequest *a2a.CancelTaskRequest
// ExecuteRequest defines the execution parameters. It is only set for [PayloadTypeExecute].
ExecuteRequest *a2a.SendMessageRequest
// CallContext carries serializable call context data (user identity, service params, tenant)
// for cross-process propagation in distributed mode.
CallContext map[string]any
}
Payload defines the payload for execution or cancelation.
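Since each request field is only set for its matching payload type, a handler might check that invariant before doing any work. The following sketch uses local stand-in types in place of the a2a request types. validate and isMalformed are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

type payloadType string

const (
	payloadTypeExecute payloadType = "execute"
	payloadTypeCancel  payloadType = "cancel"
)

// errMalformedPayload stands in for ErrMalformedPayload.
var errMalformedPayload = errors.New("malformed payload")

// Placeholder request types; the real fields live in the a2a package.
type executeRequest struct{ Text string }
type cancelRequest struct{ Reason string }

// payload is a reduced stand-in for Payload.
type payload struct {
	Type           payloadType
	TaskID         string
	CancelRequest  *cancelRequest
	ExecuteRequest *executeRequest
}

// validate checks that the request matching the payload type is present.
func validate(p *payload) error {
	if p == nil {
		return fmt.Errorf("%w: nil payload", errMalformedPayload)
	}
	switch p.Type {
	case payloadTypeExecute:
		if p.ExecuteRequest == nil {
			return fmt.Errorf("%w: execute payload without ExecuteRequest", errMalformedPayload)
		}
	case payloadTypeCancel:
		if p.CancelRequest == nil {
			return fmt.Errorf("%w: cancel payload without CancelRequest", errMalformedPayload)
		}
	default:
		return fmt.Errorf("%w: unknown type %q", errMalformedPayload, p.Type)
	}
	return nil
}

// isMalformed reports whether err wraps errMalformedPayload.
func isMalformed(err error) bool { return errors.Is(err, errMalformedPayload) }

func main() {
	ok := &payload{Type: payloadTypeExecute, TaskID: "task-1", ExecuteRequest: &executeRequest{Text: "hi"}}
	bad := &payload{Type: payloadTypeCancel, TaskID: "task-1"}
	fmt.Println(validate(ok))
	fmt.Println(validate(bad))
}
```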
type PullQueueConfig ¶
type PullQueueConfig struct {
// ReadRetry configures the behavior of the polling loop when Read returns an error.
ReadRetry ReadRetryPolicy
// BeforeExecutionCallback is called before execution is started. It can be used to modify the context or the message.
// If an error is returned, execution will not be started and neither Complete nor Return will be called for the Message.
BeforeExecutionCallback func(context.Context, Message) (context.Context, error)
// AfterExecutionCallback is called after execution finishes. It can be used to decide how the message should be handled.
// If an error is returned, neither Complete nor Return will be called for the Message.
AfterExecutionCallback func(context.Context, Message, a2a.SendMessageResult, error) error
}
PullQueueConfig provides a way to customize pull-queue behavior.
type Queue ¶
type Queue interface {
Writer
// RegisterHandler registers an executor. This method is called by the SDK.
RegisterHandler(HandlerConfig, HandlerFn)
}
Queue is an interface for the work distribution component. The executor backend registers itself using RegisterHandler when RequestHandler is created. HandlerFn can be used by work queue implementations to start execution when work is received.
func NewInMemory ¶ added in v2.3.0
func NewInMemory(cfg *InMemoryQueueConfig) Queue
NewInMemory creates a Queue implementation which uses in-memory storage for tasks.
func NewPullQueue ¶
func NewPullQueue(rw ReadWriter, cfg *PullQueueConfig) Queue
NewPullQueue creates a Queue implementation which runs a work polling loop until ErrQueueClosed is returned from Read.
type ReadRetryPolicy ¶
type ReadRetryPolicy interface {
// NextDelay returns the sleep duration for a failed Read attempt.
NextDelay(attempt int) time.Duration
}
ReadRetryPolicy is used to configure pull-queue loop behavior in case of errors returned from Read.
type ReadWriter ¶
type ReadWriter interface {
Writer
// Read dequeues a new message from the queue.
Read(context.Context) (Message, error)
}
ReadWriter is the necessary pull-queue dependency. Write is used by the executor frontend to submit work when a message is received from a client. Read is called repeatedly from a background goroutine to request work. Read blocks if no work is available. Returning ErrQueueClosed from Read stops the polling loop.
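The interaction between Read, the retry policy, and ErrQueueClosed can be sketched as a small polling loop. This is an illustration under simplifying assumptions, not the loop NewPullQueue runs: messages are plain strings, they are handled inline rather than concurrently, and pollLoop, readFunc, and runScript are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-ins for ErrQueueClosed and for an arbitrary Read failure.
var (
	errQueueClosed = errors.New("queue closed")
	errReadFailed  = errors.New("read failed")
)

// readFunc is a simplified Read that yields string messages.
type readFunc func(context.Context) (string, error)

// pollLoop reads until errQueueClosed is returned. Other read errors are
// retried after nextDelay(attempt); a successful read resets the counter.
func pollLoop(ctx context.Context, read readFunc, nextDelay func(attempt int) time.Duration, handle func(string)) error {
	attempt := 0
	for {
		msg, err := read(ctx)
		switch {
		case errors.Is(err, errQueueClosed):
			return nil
		case err != nil:
			select {
			case <-time.After(nextDelay(attempt)):
				attempt++
			case <-ctx.Done():
				return ctx.Err()
			}
		default:
			attempt = 0
			handle(msg)
		}
	}
}

// runScript feeds the loop a fixed sequence of read results and reports
// the handled messages and the number of retry delays requested.
func runScript() ([]string, int) {
	steps := []struct {
		msg string
		err error
	}{
		{"a", nil},
		{"", errReadFailed},
		{"b", nil},
		{"", errQueueClosed},
	}
	i := 0
	read := func(context.Context) (string, error) {
		s := steps[i]
		i++
		return s.msg, s.err
	}
	var handled []string
	retries := 0
	delay := func(int) time.Duration {
		retries++
		return time.Millisecond
	}
	_ = pollLoop(context.Background(), read, delay, func(m string) { handled = append(handled, m) })
	return handled, retries
}

func main() {
	handled, retries := runScript()
	fmt.Println(handled, retries)
}
```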
type Writer ¶
type Writer interface {
// Write puts a new payload into the queue. The payload will contain a TaskID, but a different value can be returned to handle idempotency.
Write(context.Context, *Payload) (a2a.TaskID, error)
}
Writer is used by the executor frontend to submit work for execution.
type WriterFunc ¶ added in v2.3.0
WriterFunc is a function that implements the Writer interface.
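The declaration of WriterFunc is not shown above. A function type that satisfies a single-method interface is usually written as an adapter in the style of http.HandlerFunc, so the sketch below assumes that shape. writerFunc, writer, payload, and taskID are local stand-ins, and the assumed signature simply matches the documented Write method.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for a2a.TaskID and Payload.
type taskID string

type payload struct{ TaskID taskID }

// writer mirrors the documented Writer interface.
type writer interface {
	Write(context.Context, *payload) (taskID, error)
}

// writerFunc is an assumed shape for WriterFunc: a function type whose
// Write method calls the function itself.
type writerFunc func(context.Context, *payload) (taskID, error)

func (f writerFunc) Write(ctx context.Context, p *payload) (taskID, error) {
	return f(ctx, p)
}

// newRecordingWriter builds a writer from a closure that logs task IDs.
func newRecordingWriter(log *[]taskID) writer {
	return writerFunc(func(_ context.Context, p *payload) (taskID, error) {
		*log = append(*log, p.TaskID)
		return p.TaskID, nil
	})
}

// submit writes a payload for the given task ID.
func submit(w writer, id taskID) (taskID, error) {
	return w.Write(context.Background(), &payload{TaskID: id})
}

func main() {
	var log []taskID
	w := newRecordingWriter(&log)
	id, err := submit(w, "task-1")
	fmt.Println(id, err, log)
}
```

An adapter like this lets tests or small integrations supply a closure where a Writer is expected, without declaring a named type.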