Documentation
¶
Overview ¶
Package dispatcher provides a workqueue dispatcher that dequeues keys and invokes a callback for each one.
The dispatcher handles orphaned in-progress keys, concurrency limits, and batch sizing. Use Handle for synchronous dispatch or HandleAsync for non-blocking dispatch with a Future to await results.
Error Handling ¶
When a callback returns an error, the dispatcher requeues, dead-letters, or drops the key depending on the error type and retry budget. To emit these errors as CloudEvents, pass WithErrorBrokerURL:
handler := dispatcher.Handler(wq, 10, 5, callback, 3,
dispatcher.WithErrorBrokerURL(ctx, brokerURL, "my-wq"),
)
When the broker URL is empty the option is a no-op, making the feature entirely opt-in.
Index ¶
- Constants
- func Handle(ctx context.Context, wq workqueue.Interface, concurrency, batchSize int, ...) error
- func Handler(wq workqueue.Interface, concurrency, batchSize int, f Callback, maxRetry int, ...) http.Handler
- type Callback
- type DispatcherErrorEvent
- type ErrorAction
- type ErrorContext
- type Future
- type Option
Examples ¶
Constants ¶
const (
// ErrorEventType is the CloudEvent type for dispatcher error events.
ErrorEventType = "dev.chainguard.workqueue.error.v1"
)
Variables ¶
This section is empty.
Functions ¶
func Handle ¶
func Handle(ctx context.Context, wq workqueue.Interface, concurrency, batchSize int, f Callback, opts ...Option) error
Handle is a synchronous form of HandleAsync.
Example ¶
ExampleHandle demonstrates dispatching a single round of work from a workqueue using Handle.
package main
import (
"context"
"fmt"
"chainguard.dev/driftlessaf/workqueue"
"chainguard.dev/driftlessaf/workqueue/dispatcher"
"chainguard.dev/driftlessaf/workqueue/inmem"
)
func main() {
wq := inmem.NewWorkQueue(5)
ctx := context.Background()
if err := wq.Queue(ctx, "example-key", workqueue.Options{}); err != nil {
panic(err)
}
processed := false
err := dispatcher.Handle(ctx, wq, 5, 0, func(_ context.Context, key string, _ workqueue.Options) error {
fmt.Printf("Processing key: %s\n", key)
processed = true
return nil
})
if err != nil {
panic(err)
}
fmt.Printf("Processed: %v\n", processed)
}
Output: Processing key: example-key Processed: true
Types ¶
type Callback ¶
Callback is the function that Handle calls to process a particular key.
func ServiceCallback ¶
func ServiceCallback(client workqueue.WorkqueueServiceClient) Callback
ServiceCallback returns a Callback that invokes the given service.
Example ¶
ExampleServiceCallback demonstrates creating a Callback that delegates to a WorkqueueServiceClient.
package main
import (
"fmt"
"chainguard.dev/driftlessaf/workqueue"
"chainguard.dev/driftlessaf/workqueue/dispatcher"
)
func main() {
// ServiceCallback wraps a gRPC WorkqueueServiceClient as a dispatcher Callback.
// In production, pass a real client obtained from a gRPC connection.
var client workqueue.WorkqueueServiceClient
cb := dispatcher.ServiceCallback(client)
_ = cb
fmt.Println("ServiceCallback created")
}
Output: ServiceCallback created
type DispatcherErrorEvent ¶ added in v0.4.0
type DispatcherErrorEvent struct {
// Key is the workqueue key that failed.
Key string `json:"key"`
// Error is the error message.
Error string `json:"error"`
// Attempts is the number of times the key has been attempted.
Attempts int `json:"attempts"`
// Action describes the disposition (requeued, dead-lettered, dropped).
Action string `json:"action"`
// NonRetriableReason is set when Action is "dropped", providing the
// reason the error was marked non-retriable.
NonRetriableReason string `json:"nonRetriableReason,omitempty"`
}
DispatcherErrorEvent is the CloudEvent payload for dispatcher errors.
type ErrorAction ¶ added in v0.4.0
type ErrorAction int
ErrorAction describes the disposition of a key after a dispatch error.
const ( // ErrorRequeued indicates the key was returned to the queue for retry. ErrorRequeued ErrorAction = iota // ErrorDeadLettered indicates the key exhausted its retry budget and was // moved to the dead-letter queue. ErrorDeadLettered // ErrorDropped indicates the error was marked non-retriable and the key // was completed without further processing. ErrorDropped )
func (ErrorAction) String ¶ added in v0.4.0
func (a ErrorAction) String() string
type ErrorContext ¶ added in v0.4.0
type ErrorContext struct {
// Key is the workqueue key that failed.
Key string
// Err is the error returned by the callback.
Err error
// Attempts is the number of times the key has been attempted (including
// the current attempt).
Attempts int
// Action is what the dispatcher did with the key after the error.
Action ErrorAction
// NonRetriableReason is set when Action is ErrorDropped, providing
// the reason the error was marked non-retriable.
NonRetriableReason string
}
ErrorContext carries information about a dispatch error.
type Future ¶
type Future func() error
Future is a function that can be used to block on the result of a round of dispatching work.
func HandleAsync ¶
func HandleAsync(ctx context.Context, wq workqueue.Interface, concurrency, batchSize int, f Callback, maxRetry int, opts ...Option) Future
HandleAsync initiates a single iteration of the dispatcher, possibly invoking the callback for several different keys. It returns a future that can be used to block on the result.
type Option ¶ added in v0.4.0
type Option func(*config)
Option configures optional dispatcher behaviour.
func WithErrorBrokerURL ¶ added in v0.4.0
WithErrorBrokerURL configures the dispatcher to emit errors as CloudEvents to the given broker URL. workqueueName is used as the CloudEvent source. When brokerURL is empty, error events are disabled (no-op).
The CloudEvents client is constructed eagerly so it is shared across all dispatch rounds. If construction fails the option degrades gracefully to a no-op with a warning log.
Example ¶
ExampleWithErrorBrokerURL demonstrates enabling error events. When the broker URL is empty, error events are silently disabled.
package main
import (
"context"
"fmt"
"chainguard.dev/driftlessaf/workqueue"
"chainguard.dev/driftlessaf/workqueue/dispatcher"
"chainguard.dev/driftlessaf/workqueue/inmem"
)
func main() {
wq := inmem.NewWorkQueue(5)
ctx := context.Background()
if err := wq.Queue(ctx, "example-key", workqueue.Options{}); err != nil {
panic(err)
}
// Empty URL disables error events (no-op).
err := dispatcher.Handle(ctx, wq, 5, 0, func(_ context.Context, _ string, _ workqueue.Options) error {
return fmt.Errorf("something went wrong")
}, dispatcher.WithErrorBrokerURL(ctx, "", "my-workqueue"))
if err != nil {
panic(err)
}
fmt.Println("dispatched with error events disabled")
}
Output: dispatched with error events disabled