dispatcher

package
v0.4.0
Published: Apr 14, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package dispatcher provides a workqueue dispatcher that dequeues keys and invokes a callback for each one.

The dispatcher handles orphaned in-progress keys, concurrency limits, and batch sizing. Use Handle for synchronous dispatch or HandleAsync for non-blocking dispatch with a Future to await results.

Error Handling

When a callback returns an error, the dispatcher requeues, dead-letters, or drops the key depending on the error type and retry budget. To emit these errors as CloudEvents, pass WithErrorBrokerURL:

handler := dispatcher.Handler(wq, 10, 5, callback, 3,
    dispatcher.WithErrorBrokerURL(ctx, brokerURL, "my-wq"),
)

When the broker URL is empty, the option is a no-op, so the feature is entirely opt-in.

Constants

const (

	// ErrorEventType is the CloudEvent type for dispatcher error events.
	ErrorEventType = "dev.chainguard.workqueue.error.v1"
)

Variables

This section is empty.

Functions

func Handle

func Handle(ctx context.Context, wq workqueue.Interface, concurrency, batchSize int, f Callback, opts ...Option) error

Handle is the synchronous form of HandleAsync: it runs a single dispatch iteration and blocks until the result is available.

Example

ExampleHandle demonstrates dispatching a single round of work from a workqueue using Handle.

package main

import (
	"context"
	"fmt"

	"chainguard.dev/driftlessaf/workqueue"
	"chainguard.dev/driftlessaf/workqueue/dispatcher"
	"chainguard.dev/driftlessaf/workqueue/inmem"
)

func main() {
	wq := inmem.NewWorkQueue(5)
	ctx := context.Background()

	if err := wq.Queue(ctx, "example-key", workqueue.Options{}); err != nil {
		panic(err)
	}

	processed := false
	err := dispatcher.Handle(ctx, wq, 5, 0, func(_ context.Context, key string, _ workqueue.Options) error {
		fmt.Printf("Processing key: %s\n", key)
		processed = true
		return nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Processed: %v\n", processed)
}
Output:
Processing key: example-key
Processed: true

func Handler

func Handler(wq workqueue.Interface, concurrency, batchSize int, f Callback, maxRetry int, opts ...Option) http.Handler

Types

type Callback

type Callback func(ctx context.Context, key string, opts workqueue.Options) error

Callback is the function that Handle calls to process a particular key.

func ServiceCallback

func ServiceCallback(client workqueue.WorkqueueServiceClient) Callback

ServiceCallback returns a Callback that invokes the given service.

Example

ExampleServiceCallback demonstrates creating a Callback that delegates to a WorkqueueServiceClient.

package main

import (
	"fmt"

	"chainguard.dev/driftlessaf/workqueue"
	"chainguard.dev/driftlessaf/workqueue/dispatcher"
)

func main() {
	// ServiceCallback wraps a gRPC WorkqueueServiceClient as a dispatcher Callback.
	// In production, pass a real client obtained from a gRPC connection.
	var client workqueue.WorkqueueServiceClient
	cb := dispatcher.ServiceCallback(client)
	_ = cb
	fmt.Println("ServiceCallback created")
}
Output:
ServiceCallback created

type DispatcherErrorEvent added in v0.4.0

type DispatcherErrorEvent struct {
	// Key is the workqueue key that failed.
	Key string `json:"key"`

	// Error is the error message.
	Error string `json:"error"`

	// Attempts is the number of times the key has been attempted.
	Attempts int `json:"attempts"`

	// Action describes the disposition (requeued, dead-lettered, dropped).
	Action string `json:"action"`

	// NonRetriableReason is set when Action is "dropped", providing the
	// reason the error was marked non-retriable.
	NonRetriableReason string `json:"nonRetriableReason,omitempty"`
}

DispatcherErrorEvent is the CloudEvent payload for dispatcher errors.
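To see what the JSON tags above produce on the wire, the following sketch marshals a local copy of the struct. The errorEvent type and the encode helper are stand-ins defined here so the example compiles without the dispatcher package; the field names and tags are copied from the definition above, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// errorEvent is a local copy of the DispatcherErrorEvent fields and JSON
// tags, so this sketch compiles without importing the dispatcher package.
type errorEvent struct {
	Key                string `json:"key"`
	Error              string `json:"error"`
	Attempts           int    `json:"attempts"`
	Action             string `json:"action"`
	NonRetriableReason string `json:"nonRetriableReason,omitempty"`
}

// encode marshals an event to its JSON form.
func encode(e errorEvent) string {
	b, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// A requeued key: nonRetriableReason is empty and therefore omitted.
	fmt.Println(encode(errorEvent{
		Key: "example-key", Error: "something went wrong", Attempts: 2, Action: "requeued",
	}))
	// A dropped key: the reason is included.
	fmt.Println(encode(errorEvent{
		Key: "example-key", Error: "bad input", Attempts: 1, Action: "dropped",
		NonRetriableReason: "malformed key",
	}))
}
```

Because of omitempty, consumers should treat a missing nonRetriableReason field the same as an empty one.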

type ErrorAction added in v0.4.0

type ErrorAction int

ErrorAction describes the disposition of a key after a dispatch error.

const (
	// ErrorRequeued indicates the key was returned to the queue for retry.
	ErrorRequeued ErrorAction = iota

	// ErrorDeadLettered indicates the key exhausted its retry budget and was
	// moved to the dead-letter queue.
	ErrorDeadLettered

	// ErrorDropped indicates the error was marked non-retriable and the key
	// was completed without further processing.
	ErrorDropped
)

func (ErrorAction) String added in v0.4.0

func (a ErrorAction) String() string
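The String method is listed without a description. A plausible shape, sketched here with a local stand-in type, maps each constant to the labels named in the DispatcherErrorEvent.Action comment (requeued, dead-lettered, dropped). The exact strings returned by the real method, and its behavior for out-of-range values, are assumptions.

```go
package main

import "fmt"

// action is a local stand-in for ErrorAction with the same constant order.
type action int

const (
	requeued action = iota
	deadLettered
	dropped
)

// String returns a human-readable label for the action.
// Assumption: the labels match those listed in the Action field comment.
func (a action) String() string {
	switch a {
	case requeued:
		return "requeued"
	case deadLettered:
		return "dead-lettered"
	case dropped:
		return "dropped"
	default:
		return fmt.Sprintf("unknown(%d)", int(a))
	}
}

func main() {
	for _, a := range []action{requeued, deadLettered, dropped} {
		fmt.Printf("%d -> %s\n", int(a), a)
	}
}
```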

type ErrorContext added in v0.4.0

type ErrorContext struct {
	// Key is the workqueue key that failed.
	Key string

	// Err is the error returned by the callback.
	Err error

	// Attempts is the number of times the key has been attempted (including
	// the current attempt).
	Attempts int

	// Action is what the dispatcher did with the key after the error.
	Action ErrorAction

	// NonRetriableReason is set when Action is ErrorDropped, providing
	// the reason the error was marked non-retriable.
	NonRetriableReason string
}

ErrorContext carries information about a dispatch error.

type Future

type Future func() error

Future is a function that can be used to block on the result of a round of dispatching work.

func HandleAsync

func HandleAsync(ctx context.Context, wq workqueue.Interface, concurrency, batchSize int, f Callback, maxRetry int, opts ...Option) Future

HandleAsync initiates a single iteration of the dispatcher, possibly invoking the callback for several different keys. It returns a future that can be used to block on the result.

type Option added in v0.4.0

type Option func(*config)

Option configures optional dispatcher behaviour.

func WithErrorBrokerURL added in v0.4.0

func WithErrorBrokerURL(ctx context.Context, brokerURL, workqueueName string) Option

WithErrorBrokerURL configures the dispatcher to emit errors as CloudEvents to the given broker URL. workqueueName is used as the CloudEvent source. When brokerURL is empty, error events are disabled (no-op).

The CloudEvents client is constructed eagerly so that it is shared across all dispatch rounds. If construction fails, the option degrades gracefully to a no-op and logs a warning.
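The functional-option pattern behind Option, including the "empty URL means no-op" behavior, can be illustrated with a minimal sketch. The config fields, withErrorBroker, and newConfig are hypothetical names defined only for this example; the real config type is unexported and its contents are not documented here.

```go
package main

import "fmt"

// config is a hypothetical settings struct; the real one is unexported.
type config struct {
	brokerURL string
	source    string
}

// option has the same shape as the Option type above.
type option func(*config)

// withErrorBroker returns an option that records the broker settings.
// Mirroring the documented behavior, an empty URL yields a no-op option.
func withErrorBroker(url, source string) option {
	if url == "" {
		return func(*config) {}
	}
	return func(c *config) {
		c.brokerURL = url
		c.source = source
	}
}

// newConfig applies the options in order to a zero-valued config.
func newConfig(opts ...option) config {
	var c config
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	off := newConfig(withErrorBroker("", "my-wq"))
	on := newConfig(withErrorBroker("http://broker.example", "my-wq"))
	fmt.Println("disabled:", off.brokerURL == "")
	fmt.Println("enabled:", on.brokerURL, on.source)
}
```

Returning a no-op closure rather than nil lets callers pass the option unconditionally, for example with a URL read from an environment variable that may be unset.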

Example

ExampleWithErrorBrokerURL demonstrates enabling error events. When the broker URL is empty, error events are silently disabled.

package main

import (
	"context"
	"fmt"

	"chainguard.dev/driftlessaf/workqueue"
	"chainguard.dev/driftlessaf/workqueue/dispatcher"
	"chainguard.dev/driftlessaf/workqueue/inmem"
)

func main() {
	wq := inmem.NewWorkQueue(5)
	ctx := context.Background()

	if err := wq.Queue(ctx, "example-key", workqueue.Options{}); err != nil {
		panic(err)
	}

	// Empty URL disables error events (no-op).
	err := dispatcher.Handle(ctx, wq, 5, 0, func(_ context.Context, _ string, _ workqueue.Options) error {
		return fmt.Errorf("something went wrong")
	}, dispatcher.WithErrorBrokerURL(ctx, "", "my-workqueue"))
	if err != nil {
		panic(err)
	}
	fmt.Println("dispatched with error events disabled")
}
Output:
dispatched with error events disabled
