Documentation
Overview
Package dispatcher provides a workqueue dispatcher that dequeues keys and invokes a callback for each one.
The dispatcher handles orphaned in-progress keys, concurrency limits, and batch sizing. Use Handle for synchronous dispatch or HandleAsync for non-blocking dispatch with a Future to await results.
Functions
func Handle
func Handle(ctx context.Context, wq workqueue.Interface, concurrency, batchSize int, f Callback) error
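Handle is the synchronous form of HandleAsync: it runs a single round of dispatch and blocks until the result is available, returning it as an error.
Example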
ExampleHandle demonstrates dispatching a single round of work from a workqueue using Handle.
package main

import (
	"context"
	"fmt"

	"chainguard.dev/driftlessaf/workqueue"
	"chainguard.dev/driftlessaf/workqueue/dispatcher"
	"chainguard.dev/driftlessaf/workqueue/inmem"
)

func main() {
	wq := inmem.NewWorkQueue(5)
	ctx := context.Background()

	// Queue a single key for the dispatcher to pick up.
	if err := wq.Queue(ctx, "example-key", workqueue.Options{}); err != nil {
		panic(err)
	}

	// Run one round of dispatch (concurrency 5, batchSize 0). Handle is
	// synchronous, so it returns only after the round has finished.
	processed := false
	err := dispatcher.Handle(ctx, wq, 5, 0, func(_ context.Context, key string, _ workqueue.Options) error {
		fmt.Printf("Processing key: %s\n", key)
		processed = true
		return nil
	})
	if err != nil {
		panic(err)
	}

	fmt.Printf("Processed: %v\n", processed)
}
Output:
Processing key: example-key
Processed: true
Types
type Callback
Callback is the function that Handle and HandleAsync call to process a particular key.
func ServiceCallback
func ServiceCallback(client workqueue.WorkqueueServiceClient) Callback
ServiceCallback returns a Callback that invokes the given service.
Example
ExampleServiceCallback demonstrates creating a Callback that delegates to a WorkqueueServiceClient.
package main

import (
	"fmt"

	"chainguard.dev/driftlessaf/workqueue"
	"chainguard.dev/driftlessaf/workqueue/dispatcher"
)

func main() {
	// ServiceCallback wraps a gRPC WorkqueueServiceClient as a dispatcher Callback.
	// In production, pass a real client obtained from a gRPC connection.
	var client workqueue.WorkqueueServiceClient
	cb := dispatcher.ServiceCallback(client)
	_ = cb

	fmt.Println("ServiceCallback created")
}
Output:
ServiceCallback created
type Future
type Future func() error
Future is a function that can be used to block on the result of a round of dispatching work.
func HandleAsync
func HandleAsync(ctx context.Context, wq workqueue.Interface, concurrency, batchSize int, f Callback, maxRetry int) Future
HandleAsync initiates a single iteration of the dispatcher, possibly invoking the callback for several different keys. It returns a future that can be used to block on the result.