# workqueue
This module provisions a regionalized, Kubernetes-like workqueue abstraction over Google Cloud Storage for processing work with concurrency control, in an otherwise stateless fashion (using Google Cloud Run).
Keys are put into the queue and processed from the queue using a symmetrical
gRPC service definition under `./pkg/workqueue/workqueue.proto`. This module
takes the name of a service implementing this proto service, and exposes the
name of a service into which work can be enqueued.
```hcl
module "workqueue" {
  source = "chainguard-dev/common/infra//modules/workqueue"

  project_id = var.project_id
  name       = "${var.name}-workqueue"
  regions    = var.regions

  // The number of keys to process concurrently.
  concurrent-work = 10

  // The maximum number of retry attempts before a task is moved to the
  // dead letter queue. The default of 0 means unlimited retries.
  max-retry = 5

  // The name of a service that implements the workqueue gRPC service above.
  reconciler-service = {
    name = "foo"
  }

  notification_channels = var.notification_channels
}
```
```hcl
// Authorize the bar service to queue keys in our workqueue.
module "bar-queues-keys" {
  for_each = var.regions

  source = "chainguard-dev/common/infra//modules/authorize-private-service"

  project_id = var.project_id
  region     = each.key
  name       = module.workqueue.receiver.name

  service-account = google_service_account.fanout.email
}
```
```hcl
// Stand up the bar service in each of our regions.
module "bar-service" {
  source = "chainguard-dev/common/infra//modules/regional-go-service"
  ...
  regional-env = [{
    name  = "WORKQUEUE_SERVICE"
    value = { for k, v in module.bar-queues-keys : k => v.uri }
  }]
  ...
}
```
Then the "bar" service initializes a client for the workqueue gRPC service
pointing at `WORKQUEUE_SERVICE` with Cloud Run authentication (see the
`workqueue.NewWorkqueueClient` helper), and queues keys, e.g.
```go
// Set up the client
client, err := workqueue.NewWorkqueueClient(ctx, os.Getenv("WORKQUEUE_SERVICE"))
if err != nil {
	log.Panicf("failed to create client: %v", err)
}
defer client.Close()

// Process a key!
if _, err := client.Process(ctx, &workqueue.ProcessRequest{
	Key: key,
}); err != nil {
	log.Panicf("failed to process key: %v", err)
}
```
## Maximum Retry and Dead Letter Queue
The workqueue system supports a maximum retry limit for tasks through the max-retry variable. When a task fails and gets requeued, the system tracks the number of attempts. Once the maximum retry limit is reached, the task is moved to a dead letter queue instead of being requeued.
- Setting `max-retry = 0` (the default) means unlimited retries
- Setting `max-retry = 5` will move a task to the dead letter queue after 5 failed attempts
Tasks in the dead letter queue are stored with their original metadata plus:
- A timestamp in the key name to prevent collisions
- A `failed-time` metadata field indicating when the task was moved to the dead letter queue
Dead-lettered tasks can be inspected using standard GCS tools. They are stored in the workqueue bucket under the `dead-letter/` prefix.
```hcl
module "workqueue" {
  source = "chainguard-dev/common/infra//modules/workqueue"

  # ... other configuration ...

  // Maximum retry limit (5 attempts before moving to dead letter queue)
  max-retry = 5
}
```
## Requirements
No requirements.
## Providers

| Name | Version |
|---|---|
| google | n/a |
| google-beta | n/a |
| random | n/a |
## Modules
| Name | Source | Version |
|---|---|---|
| change-trigger-calls-dispatcher | ../authorize-private-service | n/a |
| collapsible | ../dashboard/sections/collapsible | n/a |
| cron-trigger-calls-dispatcher | ../authorize-private-service | n/a |
| dashboard | ../dashboard | n/a |
| dispatcher-calls-target | ../authorize-private-service | n/a |
| dispatcher-logs | ../dashboard/sections/logs | n/a |
| dispatcher-service | ../regional-go-service | n/a |
| layout | ../dashboard/sections/layout | n/a |
| percent-deduped | ../dashboard/widgets/xy-ratio | n/a |
| process-latency | ../dashboard/widgets/latency | n/a |
| receiver-logs | ../dashboard/sections/logs | n/a |
| receiver-service | ../regional-go-service | n/a |
| wait-latency | ../dashboard/widgets/latency | n/a |
| width | ../dashboard/sections/width | n/a |
| work-added | ../dashboard/widgets/xy | n/a |
| work-in-progress | ../dashboard/widgets/xy | n/a |
| work-queued | ../dashboard/widgets/xy | n/a |
## Resources
| Name | Type |
|---|---|
| google-beta_google_project_service_identity.pubsub | resource |
| google_cloud_scheduler_job.cron | resource |
| google_pubsub_subscription.this | resource |
| google_pubsub_topic.object-change-notifications | resource |
| google_pubsub_topic_iam_binding.gcs-publishes-to-topic | resource |
| google_service_account.change-trigger | resource |
| google_service_account.cron-trigger | resource |
| google_service_account.dispatcher | resource |
| google_service_account.receiver | resource |
| google_service_account_iam_binding.allow-pubsub-to-mint-tokens | resource |
| google_storage_bucket.workqueue | resource |
| google_storage_bucket_iam_binding.authorize-access | resource |
| google_storage_notification.object-change-notifications | resource |
| random_string.change-trigger | resource |
| random_string.cron-trigger | resource |
| random_string.dispatcher | resource |
| random_string.receiver | resource |
| google_storage_project_service_account.gcs_account | data source |
## Inputs

| Name | Description | Type | Default | Required |
|---|---|---|---|---|
| concurrent-work | The amount of concurrent work to dispatch at a given time. | number | n/a | yes |
| max-retry | The maximum number of retry attempts before a task is moved to the dead letter queue. Default of 0 means unlimited retries. | number | 0 | no |
| name | n/a | string | n/a | yes |
| notification_channels | List of notification channels to alert. | list(string) | n/a | yes |
| project_id | n/a | string | n/a | yes |
| reconciler-service | The name of the reconciler service that the workqueue will dispatch work to. | object({ name = string }) | n/a | yes |
| regions | A map from region names to a network and subnetwork. A service will be created in each region configured to egress the specified traffic via the specified subnetwork. | map(object({ network = string, subnetwork = string })) | n/a | yes |
| require_squad | Whether to require the squad variable to be specified. | bool | false | no |
| squad | Squad label to apply to the service. | string | "" | no |
## Outputs
| Name | Description |
|---|---|
| receiver | n/a |