# workqueue
This module provisions a regionalized workqueue abstraction over Google Cloud Storage: a Kubernetes-style workqueue for processing work with concurrency control, in an otherwise stateless fashion (using Google Cloud Run).
Keys are put into the queue and processed from the queue using a symmetrical
gRPC service definition under ./pkg/workqueue/workqueue.proto. This module
takes the name of a service implementing this proto service and exposes the
name of a service into which work can be enqueued.
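The proto is symmetrical: the same `Process` RPC is used both to enqueue a key (calling the receiver this module stands up) and to perform work on a key (the dispatcher calling your reconciler). Below is a rough sketch of the generated Go surface; only `Process` and `ProcessRequest.Key` are confirmed by the examples in this README, the remaining names are assumptions, and the proto file is authoritative.

```go
package workqueue

import "context"

// ProcessRequest names the key to enqueue or work on. Key is confirmed
// by the client example below; any other fields are omitted here.
type ProcessRequest struct{ Key string }

// ProcessResponse is a sketch; the real message is generated from the proto.
type ProcessResponse struct{}

// WorkqueueServiceServer is an assumed name for the generated server
// interface; both the receiver and your reconciler implement this shape.
type WorkqueueServiceServer interface {
	Process(context.Context, *ProcessRequest) (*ProcessResponse, error)
}
```

A workqueue is provisioned with the name of a reconciler service implementing this, like so: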
module "workqueue" {
source = "chainguard-dev/common/infra//modules/workqueue"
project_id = var.project_id
name = "${var.name}-workqueue"
regions = var.regions
// The number of keys to process concurrently.
concurrent-work = 10
// Maximum number of retry attempts before a task is moved to the dead letter queue
// Default is 0 (unlimited retries)
max-retry = 5
// Optionally disable DLQ alerting while phasing in a queue
enable_dead_letter_alerting = true
// It is recommended that folks use a "global" scoped workqueue to get the
// most accurate deduplication and concurrency control. The "regional" scope
// offers regionalized deduplication and concurrency control, but cannot
// guarantee that receivers and dispatchers in other regions will not process
// the same key concurrently or redundantly.
scope = "global"
// The name of a service that implements the workqueue GRPC service above.
reconciler-service = {
name = "foo"
}
notification_channels = var.notification_channels
}
// Authorize the bar service to queue keys in our workqueue.
module "bar-queues-keys" {
for_each = var.regions
source = "chainguard-dev/common/infra//modules/authorize-private-service"
project_id = var.project_id
region = each.key
name = module.workqueue.receiver.name
service-account = google_service_account.fanout.email
}
// Stand up the bar service in each of our regions.
module "bar-service" {
source = "chainguard-dev/common/infra//modules/regional-go-service"
...
regional-env = [{
name = "WORKQUEUE_SERVICE"
value = { for k, v in module.bar-queues-keys : k => v.uri }
}]
...
}
Then the "bar" service initializes a client for the workqueue GRPC service
pointing at WORKQUEUE_SERVICE with Cloud Run authentication (see the
workqueue.NewWorkqueueClient helper), and queues keys, e.g.
```go
// Set up the client.
client, err := workqueue.NewWorkqueueClient(ctx, os.Getenv("WORKQUEUE_SERVICE"))
if err != nil {
	log.Panicf("failed to create client: %v", err)
}
defer client.Close()

// Process a key!
if _, err := client.Process(ctx, &workqueue.ProcessRequest{
	Key: key,
}); err != nil {
	log.Panicf("failed to process key: %v", err)
}
```
## Dashboard
A separate dashboard module is available for monitoring your workqueue. The dashboard provides comprehensive visibility into queue metrics, processing latency, retry patterns, and system health.
```hcl
// Deploy the workqueue dashboard separately.
module "workqueue-dashboard" {
  source = "chainguard-dev/common/infra//modules/dashboard/workqueue"

  // Pass the same configuration used for the workqueue.
  name            = var.name
  max_retry       = var.max-retry
  concurrent_work = var.concurrent-work
  scope           = var.scope

  // Optional: add alert policy IDs.
  alerts = {
    "high-retry-alert" = google_monitoring_alert_policy.high_retry.id
  }
}
```
The dashboard includes:
- Queue state visualization (work in progress, queued, added)
- Processing and wait latency metrics
- Retry analytics and completion patterns
- Dead letter queue monitoring (when max-retry is configured)
- Service logs for receiver and dispatcher
See `modules/dashboard/workqueue` for more details.
## Maximum Retry and Dead Letter Queue
The workqueue system supports a maximum retry limit for tasks through the max-retry variable. When a task fails and gets requeued, the system tracks the number of attempts. Once the maximum retry limit is reached, the task is moved to a dead letter queue instead of being requeued.
- Setting `max-retry = 0` means unlimited retries (the default is `100`; see the Inputs table below)
- Setting `max-retry = 5` will move a task to the dead letter queue after 5 failed attempts
- Setting `enable_dead_letter_alerting = false` temporarily suppresses the DLQ alert while you phase in a queue
Tasks in the dead letter queue are stored with their original metadata plus:

- A timestamp in the key name to prevent collisions
- A `failed-time` metadata field indicating when the task was moved to the dead letter queue
Dead-lettered tasks can be inspected using standard GCS tools. They are stored in the workqueue bucket under the `dead-letter/` prefix.
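For example, here is a minimal sketch using the Cloud Storage Go client to enumerate dead-lettered keys and their `failed-time` metadata (the bucket name is a placeholder for whatever bucket your workqueue module provisioned):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Panicf("failed to create storage client: %v", err)
	}
	defer client.Close()

	// "my-workqueue-bucket" is a placeholder; substitute the bucket
	// provisioned for your workqueue.
	it := client.Bucket("my-workqueue-bucket").Objects(ctx, &storage.Query{
		Prefix: "dead-letter/",
	})
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Panicf("failed to list dead-lettered keys: %v", err)
		}
		fmt.Printf("%s failed-time=%s\n", attrs.Name, attrs.Metadata["failed-time"])
	}
}
```

The corresponding retry settings on the module: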
module "workqueue" {
source = "chainguard-dev/common/infra//modules/workqueue"
# ... other configuration ...
// Maximum retry limit (5 attempts before moving to dead letter queue)
max-retry = 5
// Optional: disable DLQ alerting during phased rollout
enable_dead_letter_alerting = false
}
## Reenqueuing Dead-Lettered Keys
A Cloud Run Job is automatically created for each workqueue to reenqueue dead-lettered keys. This is useful after deploying a fix for an issue that caused keys to fail repeatedly.
To run the reenqueue job:

```shell
gcloud run jobs execute <workqueue-name>-reenqueue --region <region>
```
The job will:

- Enumerate all dead-lettered keys in the workqueue
- Re-queue each key with a fresh attempt counter (preserving the original priority)

When a requeued key succeeds, its dead-letter entry is automatically cleaned up.
## Requirements
No requirements.
## Providers
| Name | Version |
|---|---|
| google | n/a |
| google-beta | n/a |
| random | n/a |
## Modules
| Name | Source | Version |
|---|---|---|
| change-trigger-calls-dispatcher | ../authorize-private-service | n/a |
| cron-trigger-calls-dispatcher | ../authorize-private-service | n/a |
| dispatcher-calls-target | ../authorize-private-service | n/a |
| dispatcher-service | ../regional-go-service | n/a |
| receiver-service | ../regional-go-service | n/a |
| reenqueue | ../cron | n/a |
## Resources
## Inputs
| Name | Description | Type | Default | Required |
|---|---|---|---|---|
| batch-size | Optional cap on how much work to launch per dispatcher pass. Defaults to ceil(concurrent-work / number of regions) when unset. | `number` | `null` | no |
| concurrent-work | The amount of concurrent work to dispatch at a given time. | `number` | n/a | yes |
| cpu_idle | Set to false for a region in order to use instance-based billing. Defaults to true. | `map(map(bool))` | `{}` | no |
| deletion_protection | Whether to enable delete protection for the service. | `bool` | `true` | no |
| enable_dead_letter_alerting | Whether to enable alerting for dead-lettered keys. | `bool` | `true` | no |
| labels | Labels to apply to the workqueue resources. | `map(string)` | `{}` | no |
| max-retry | The maximum number of retry attempts before a task is moved to the dead letter queue. Set this to 0 to have unlimited retries. | `number` | `100` | no |
| multi_regional_location | The multi-regional location for the global workqueue bucket (e.g., 'US', 'EU', 'ASIA'). Only used when scope='global'. | `string` | `"US"` | no |
| name | n/a | `string` | n/a | yes |
| notification_channels | List of notification channels to alert. | `list(string)` | n/a | yes |
| primary-region | The primary region for single-homed resources like the reenqueue job. Defaults to the first region in the regions map. | `string` | `null` | no |
| product | Product label to apply to the service. | `string` | `"unknown"` | no |
| project_id | n/a | `string` | n/a | yes |
| reconciler-service | The name of the reconciler service that the workqueue will dispatch work to. | `object({…})` | n/a | yes |
| regions | A map from region names to a network and subnetwork. A service will be created in each region configured to egress the specified traffic via the specified subnetwork. | `map(object({…}))` | n/a | yes |
| scope | The scope of the workqueue. Must be 'global' for a single multi-regional workqueue. | `string` | `"global"` | no |
| team | Team label to apply to resources (replaces deprecated 'squad'). | `string` | n/a | yes |
## Outputs
| Name | Description |
|---|---|
| dispatcher | n/a |
| receiver | n/a |