Documentation
¶
Index ¶
- Constants
- Variables
- func NewPushQueue(writer Writer) (Queue, HandlerFn)
- func WithHeartbeater(ctx context.Context, hb Heartbeater) context.Context
- type ExponentialReadBackoff
- type HandlerFn
- type Heartbeater
- type Message
- type Payload
- type PayloadType
- type PullQueueConfig
- type Queue
- type ReadRetryPolicy
- type ReadWriter
- type Writer
Constants ¶
const ( // PayloadTypeExecute defines the payload type for execution. PayloadTypeExecute = "execute" // PayloadTypeCancel defines the payload type for cancelation. PayloadTypeCancel = "cancel" )
Variables ¶
var ErrConcurrencyLimitExceeded = errors.New("concurrency limit exceeded")
ErrConcurrencyLimitExceeded is returned new work acceptance violates the configured concurrency limit.
var ErrMalformedPayload = errors.New("malformed payload")
ErrMalformedPayload is a non-retryable error which can be returned by HandlerFn.
var ErrQueueClosed = errors.New("queue closed")
ErrQueueClosed can be returned by Read implementation to stop the polling queue backend.
Functions ¶
func NewPushQueue ¶
NewPushQueue creates a Queue implementation through which SDK submits work to the queue backend. The returned handler function is expected to be invoked in a separate goroutine by the caller when work is assigned to the node.
func WithHeartbeater ¶
func WithHeartbeater(ctx context.Context, hb Heartbeater) context.Context
WithHeartbeater creates a new context with the provided [Hearbeater] attached.
Types ¶
type ExponentialReadBackoff ¶
type ExponentialReadBackoff struct {
// BaseDelay is used for calculating retry interval as base * 2 ^ attempt.
BaseDelay time.Duration
// MaxDelay sets a cap for the value returned from NextDelay.
MaxDelay time.Duration
}
ExponentialReadBackoff is a ReadRetryPolicy implementation which uses exponential backoff with full jitter.
type Heartbeater ¶
type Heartbeater interface {
// HeartbeatInterval returns the interval between heartbeats.
HeartbeatInterval() time.Duration
// Heartbeat is called at heartbeat interval to mark the message as still being handled.
Heartbeat(ctx context.Context) error
}
Heartbeater defines an optional heartbeat policy for message handler. Heartbeats are sent while worker is handling a message. For NewPullQueue the interface must be implemented by Message structs returned from Read. For NewPushQueue heartbeater needs to be attached to context passed to HandlerFn using WithHeartbeater.
func HeartbeaterFrom ¶
func HeartbeaterFrom(ctx context.Context) (Heartbeater, bool)
HeartbeaterFrom returns a Heartbeater attached using WithHeartbeater.
type Message ¶
type Message interface {
// Payload returns the payload of the message which becomes the execution or cancelation input.
Payload() *Payload
// Complete marks the message as completed after it was handled by a worker.
Complete(ctx context.Context) error
// Return returns the message to the queue after worker failed to handle it.
Return(ctx context.Context, cause error) error
}
Message defines the message for execution or cancelation.
type Payload ¶
type Payload struct {
// Type defines the type of payload.
Type PayloadType
// TaskID is an ID of the task to execute or cancel.
TaskID a2a.TaskID
// CancelParams defines the cancelation parameters. It is only set for [PayloadTypeCancel].
CancelParams *a2a.TaskIDParams
// ExecuteParams defines the execution parameters. It is only set for [PayloadTypeExecute].
ExecuteParams *a2a.MessageSendParams
}
Payload defines the payload for execution or cancelation.
type PullQueueConfig ¶
type PullQueueConfig struct {
// ReadRetry configures the behavior of polling loop in case of workqueue Read errors.
ReadRetry ReadRetryPolicy
}
PullQueueConfig provides a way to customize pull-queue behavior.
type Queue ¶
type Queue interface {
Writer
// RegisterHandler registers an executor. This method is called by the SDK.
RegisterHandler(limiter.ConcurrencyConfig, HandlerFn)
}
Queue is an interface for the work distribution component. Executor backend registers itself using RegisterHandler when RequestHandler is created. HandlerFn can be used by work queue implementations to start execution when works is received.
func NewPullQueue ¶
func NewPullQueue(rw ReadWriter, cfg *PullQueueConfig) Queue
NewPullQueue creates a Queue implementation which starts runs a work polling loop until ErrQueueClosed is returned from Read.
type ReadRetryPolicy ¶
type ReadRetryPolicy interface {
// NextDelay returns the sleep duration for a failed Read attempt.
NextDelay(attempt int) time.Duration
}
ReadRetryPolicy is used to configure pull-queue loop behavior in case of errors returned from Read.
type ReadWriter ¶
type ReadWriter interface {
Writer
// Read dequeues a new message from the queue.
Read(context.Context) (Message, error)
}
ReadWriter is the neccessary pull-queue dependency. Write is used by executor frontend to submit work when a message is received from a client. Read is called periodically from background goroutine to request work. Read blocks if no work is available. ErrQueueClosed will stop the polling loop.