Documentation ¶
Overview ¶
Package workqueue provides a work queue implementation for task execution.
Index ¶
- Constants
- Variables
- func AttachHeartbeater(ctx context.Context, hb Heartbeater) context.Context
- func NewPushQueue(writer Writer) (Queue, HandlerFn)
- type ExponentialReadBackoff
- type HandlerConfig
- type HandlerFn
- type Heartbeater
- type Message
- type Payload
- type PayloadType
- type PullQueueConfig
- type Queue
- type ReadRetryPolicy
- type ReadWriter
- type Writer
Constants ¶
const (
// PayloadTypeExecute defines the payload type for execution.
PayloadTypeExecute = "execute"
// PayloadTypeCancel defines the payload type for cancelation.
PayloadTypeCancel = "cancel"
)
Variables ¶
var ErrConcurrencyLimitExceeded = errors.New("concurrency limit exceeded")
ErrConcurrencyLimitExceeded is returned when accepting new work would violate the configured concurrency limit.
var ErrMalformedPayload = errors.New("malformed payload")
ErrMalformedPayload is a non-retryable error which can be returned by HandlerFn.
var ErrQueueClosed = errors.New("queue closed")
ErrQueueClosed can be returned by a Read implementation to stop the polling queue backend.
Functions ¶
func AttachHeartbeater ¶
func AttachHeartbeater(ctx context.Context, hb Heartbeater) context.Context
AttachHeartbeater creates a new context with the provided [Heartbeater] attached.
Types ¶
type ExponentialReadBackoff ¶
type ExponentialReadBackoff struct {
// BaseDelay is used for calculating retry interval as base * 2 ^ attempt.
BaseDelay time.Duration
// MaxDelay sets a cap for the value returned from NextDelay.
MaxDelay time.Duration
}
ExponentialReadBackoff is a ReadRetryPolicy implementation which uses exponential backoff with full jitter.
func (*ExponentialReadBackoff) NextDelay ¶
func (e *ExponentialReadBackoff) NextDelay(attempt int) time.Duration
NextDelay implements ReadRetryPolicy interface.
type HandlerConfig ¶
type HandlerConfig struct {
Limiter limiter.ConcurrencyConfig
}
HandlerConfig configures the work queue handler.
type Heartbeater ¶
type Heartbeater interface {
// HeartbeatInterval returns the interval between heartbeats.
HeartbeatInterval() time.Duration
// Heartbeat is called at heartbeat interval to mark the message as still being handled.
Heartbeat(ctx context.Context) error
}
Heartbeater defines an optional heartbeat policy for a message handler. Heartbeats are sent while a worker is handling a message. For NewPullQueue, the interface must be implemented by the Message structs returned from Read. For NewPushQueue, a Heartbeater needs to be attached to the context passed to HandlerFn using AttachHeartbeater.
func HeartbeaterFrom ¶
func HeartbeaterFrom(ctx context.Context) (Heartbeater, bool)
HeartbeaterFrom returns a Heartbeater attached using AttachHeartbeater.
type Message ¶
type Message interface {
// Payload returns the payload of the message which becomes the execution or cancelation input.
Payload() *Payload
// Complete marks the message as completed after it was handled by a worker.
Complete(ctx context.Context) error
// Return returns the message to the queue after worker failed to handle it.
Return(ctx context.Context, cause error) error
}
Message defines the message for execution or cancelation.
type Payload ¶
type Payload struct {
// Type defines the type of payload.
Type PayloadType
// TaskID is an ID of the task to execute or cancel.
TaskID a2a.TaskID
// CancelRequest defines the cancelation parameters. It is only set for [PayloadTypeCancel].
CancelRequest *a2a.CancelTaskRequest
// ExecuteRequest defines the execution parameters. It is only set for [PayloadTypeExecute].
ExecuteRequest *a2a.SendMessageRequest
}
Payload defines the payload for execution or cancelation.
type PullQueueConfig ¶
type PullQueueConfig struct {
// ReadRetry configures the behavior of polling loop in case of workqueue Read errors.
ReadRetry ReadRetryPolicy
}
PullQueueConfig provides a way to customize pull-queue behavior.
type Queue ¶
type Queue interface {
Writer
// RegisterHandler registers an executor. This method is called by the SDK.
RegisterHandler(HandlerConfig, HandlerFn)
}
Queue is an interface for the work distribution component. The executor backend registers itself using RegisterHandler when a RequestHandler is created. Work queue implementations can use HandlerFn to start execution when work is received.
func NewPullQueue ¶
func NewPullQueue(rw ReadWriter, cfg *PullQueueConfig) Queue
NewPullQueue creates a Queue implementation which runs a work polling loop until ErrQueueClosed is returned from Read.
type ReadRetryPolicy ¶
type ReadRetryPolicy interface {
// NextDelay returns the sleep duration for a failed Read attempt.
NextDelay(attempt int) time.Duration
}
ReadRetryPolicy is used to configure pull-queue loop behavior in case of errors returned from Read.
type ReadWriter ¶
type ReadWriter interface {
Writer
// Read dequeues a new message from the queue.
Read(context.Context) (Message, error)
}
ReadWriter is the necessary pull-queue dependency. Write is used by the executor frontend to submit work when a message is received from a client. Read is called periodically from a background goroutine to request work. Read blocks if no work is available. Returning ErrQueueClosed from Read stops the polling loop.