workqueue/ directory, v0.10.1
Published: Feb 10, 2026 License: Apache-2.0

workqueue

This module provisions a regionalized workqueue backed by Google Cloud Storage. It implements a Kubernetes-like workqueue abstraction for processing work with concurrency control in an otherwise stateless fashion (using Google Cloud Run).

Keys are put into the queue and processed from the queue using a symmetrical gRPC service definition in ./pkg/workqueue/workqueue.proto. This module takes the name of a service that implements this proto service, and exposes the name of a service into which work can be enqueued.

module "workqueue" {
  source = "chainguard-dev/common/infra//modules/workqueue"

  project_id = var.project_id
  name       = "${var.name}-workqueue"
  regions    = var.regions

  // The number of keys to process concurrently.
  concurrent-work = 10

  // Maximum number of retry attempts before a task is moved to the dead letter queue.
  // Set to 0 for unlimited retries (the Inputs table lists the default as 100).
  max-retry = 5

  // Optionally disable DLQ alerting while phasing in a queue
  enable_dead_letter_alerting = true

  // It is recommended that folks use a "global" scoped workqueue to get the
  // most accurate deduplication and concurrency control.  The "regional" scope
  // offers regionalized deduplication and concurrency control, but cannot
  // guarantee that receivers and dispatchers in other regions will not process
  // the same key concurrently or redundantly.
  scope = "global"

  // The name of a service that implements the workqueue gRPC service above.
  reconciler-service = {
    name = "foo"
  }

  notification_channels = var.notification_channels
}

// Authorize the bar service to queue keys in our workqueue.
module "bar-queues-keys" {
  for_each = var.regions

  source = "chainguard-dev/common/infra//modules/authorize-private-service"

  project_id = var.project_id
  region     = each.key
  name       = module.workqueue.receiver.name

  service-account = google_service_account.fanout.email
}

// Stand up the bar service in each of our regions.
module "bar-service" {
  source = "chainguard-dev/common/infra//modules/regional-go-service"
  ...
      regional-env = [{
        name  = "WORKQUEUE_SERVICE"
        value = { for k, v in module.bar-queues-keys : k => v.uri }
      }]
  ...
}

The "bar" service then initializes a client for the workqueue gRPC service, pointing at WORKQUEUE_SERVICE with Cloud Run authentication (see the workqueue.NewWorkqueueClient helper), and queues keys, e.g.

	// Set up the client
	client, err := workqueue.NewWorkqueueClient(ctx, os.Getenv("WORKQUEUE_SERVICE"))
	if err != nil {
		log.Panicf("failed to create client: %v", err)
	}
	defer client.Close()

	// Process a key!
	if _, err := client.Process(ctx, &workqueue.ProcessRequest{
		Key: key,
	}); err != nil {
		log.Panicf("failed to process key: %v", err)
	}
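
To make the deduplication and concurrency-control semantics described above more concrete, here is a minimal in-memory sketch in Go. It is not the module's implementation, which is backed by Google Cloud Storage. The queue type and its add, dispatch, and done methods are hypothetical names invented for this illustration.

```go
package main

import "fmt"

// queue is a hypothetical in-memory model of a deduplicating workqueue with a
// concurrency limit. It only illustrates the semantics; it is not the real API.
type queue struct {
	limit      int             // maximum keys processed at once (cf. concurrent-work)
	pending    []string        // keys waiting to be dispatched, in arrival order
	queued     map[string]bool // set view of pending, used for deduplication
	inProgress map[string]bool // keys currently being processed
}

func newQueue(limit int) *queue {
	return &queue{limit: limit, queued: map[string]bool{}, inProgress: map[string]bool{}}
}

// add enqueues key unless an identical key is already waiting.
func (q *queue) add(key string) {
	if q.queued[key] {
		return
	}
	q.queued[key] = true
	q.pending = append(q.pending, key)
}

// dispatch starts as many waiting keys as the concurrency limit allows,
// skipping keys that are already being processed.
func (q *queue) dispatch() []string {
	var started, waiting []string
	for _, key := range q.pending {
		if len(q.inProgress) < q.limit && !q.inProgress[key] {
			q.inProgress[key] = true
			delete(q.queued, key)
			started = append(started, key)
		} else {
			waiting = append(waiting, key)
		}
	}
	q.pending = waiting
	return started
}

// done marks key as finished, freeing a concurrency slot.
func (q *queue) done(key string) { delete(q.inProgress, key) }

func main() {
	q := newQueue(2)
	for _, k := range []string{"a", "b", "a", "c"} {
		q.add(k) // the second "a" is deduplicated
	}
	fmt.Println(q.dispatch()) // first pass: limited to two keys
	q.done("a")
	fmt.Println(q.dispatch()) // second pass: the freed slot goes to the next key
}
```

In this sketch a key that is already in progress stays queued until the running instance finishes, which mirrors the goal of never processing the same key concurrently.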

Dashboard

A separate dashboard module is available for monitoring your workqueue. The dashboard provides comprehensive visibility into queue metrics, processing latency, retry patterns, and system health.

// Deploy the workqueue dashboard separately
module "workqueue-dashboard" {
  source = "chainguard-dev/common/infra//modules/dashboard/workqueue"

  // Pass the same configuration used for the workqueue
  name            = var.name
  max_retry       = var.max-retry
  concurrent_work = var.concurrent-work
  scope           = var.scope

  // Optional: Add alert policy IDs
  alerts = {
    "high-retry-alert" = google_monitoring_alert_policy.high_retry.id
  }
}

The dashboard includes:

  • Queue state visualization (work in progress, queued, added)
  • Processing and wait latency metrics
  • Retry analytics and completion patterns
  • Dead letter queue monitoring (when max-retry is configured)
  • Service logs for receiver and dispatcher

See modules/dashboard/workqueue for more details.

Maximum Retry and Dead Letter Queue

The workqueue system supports a maximum retry limit for tasks through the max-retry variable. When a task fails and gets requeued, the system tracks the number of attempts. Once the maximum retry limit is reached, the task is moved to a dead letter queue instead of being requeued.

  • Setting max-retry = 0 means unlimited retries (the Inputs table lists the default as 100)
  • Setting max-retry = 5 will move a task to the dead letter queue after 5 failed attempts
  • Setting enable_dead_letter_alerting = false temporarily suppresses the DLQ alert while you phase in a queue
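
The retry rule above can be sketched as a small Go function. This is an illustrative model under stated assumptions, not the dispatcher's code: nextStep is a hypothetical name, and the exact boundary (dead-lettering once the number of failed attempts reaches max-retry) is inferred from the "after 5 failed attempts" wording.

```go
package main

import "fmt"

// nextStep reports where a key goes after a failed attempt.
// maxRetry == 0 means unlimited retries, so the key is always requeued.
// The ">=" boundary is an assumption based on the README wording.
func nextStep(failedAttempts, maxRetry int) string {
	if maxRetry > 0 && failedAttempts >= maxRetry {
		return "dead-letter"
	}
	return "requeue"
}

func main() {
	for attempts := 4; attempts <= 5; attempts++ {
		fmt.Printf("max-retry=5 attempts=%d -> %s\n", attempts, nextStep(attempts, 5))
	}
	fmt.Printf("max-retry=0 attempts=1000 -> %s\n", nextStep(1000, 0))
}
```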

Tasks in the dead letter queue are stored with their original metadata plus:

  • A timestamp in the key name to prevent collisions
  • A failed-time metadata field indicating when the task was moved to the dead letter queue

Dead-lettered tasks can be inspected using standard GCS tools. They are stored in the workqueue bucket under the dead-letter/ prefix.

module "workqueue" {
  source = "chainguard-dev/common/infra//modules/workqueue"

  # ... other configuration ...

  // Maximum retry limit (5 attempts before moving to dead letter queue)
  max-retry = 5

  // Optional: disable DLQ alerting during phased rollout
  enable_dead_letter_alerting = false
}

Reenqueuing Dead-Lettered Keys

A Cloud Run Job is automatically created for each workqueue to reenqueue dead-lettered keys. This is useful after deploying a fix for an issue that caused keys to fail repeatedly.

To run the reenqueue job:

gcloud run jobs execute <workqueue-name>-reenqueue --region <region>

The job will:

  1. Enumerate all dead-lettered keys in the workqueue
  2. Re-queue each key with a fresh attempt counter (preserving the original priority)
  3. When the requeued key succeeds, the dead-letter entry is automatically cleaned up
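
The three steps above can be modeled in memory as follows. This Go sketch is only an illustration of the described behavior: the task and store types and the reenqueue and succeed methods are hypothetical stand-ins for the GCS-backed bucket and the job, not the module's actual code.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a hypothetical in-memory stand-in for a queued key.
type task struct {
	Key      string
	Priority int
	Attempts int
}

// store is a hypothetical stand-in for the workqueue bucket.
type store struct {
	queued     map[string]task
	deadLetter map[string]task
}

// reenqueue enumerates every dead-lettered key (step 1) and queues it again
// with a fresh attempt counter and its original priority (step 2). The
// dead-letter entries are left in place until the key succeeds.
func (s *store) reenqueue() []string {
	keys := make([]string, 0, len(s.deadLetter))
	for k := range s.deadLetter {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the example
	for _, k := range keys {
		dl := s.deadLetter[k]
		s.queued[k] = task{Key: k, Priority: dl.Priority, Attempts: 0}
	}
	return keys
}

// succeed marks a key as processed and cleans up its dead-letter entry (step 3).
func (s *store) succeed(key string) {
	delete(s.queued, key)
	delete(s.deadLetter, key)
}

func main() {
	s := &store{
		queued:     map[string]task{},
		deadLetter: map[string]task{"foo": {Key: "foo", Priority: 7, Attempts: 5}},
	}
	fmt.Println(s.reenqueue())
	fmt.Println(s.queued["foo"].Priority, s.queued["foo"].Attempts)
	s.succeed("foo")
	fmt.Println(len(s.deadLetter))
}
```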

Requirements

No requirements.

Providers

Name Version
google n/a
google-beta n/a
random n/a

Modules

Name Source Version
change-trigger-calls-dispatcher ../authorize-private-service n/a
cron-trigger-calls-dispatcher ../authorize-private-service n/a
dispatcher-calls-target ../authorize-private-service n/a
dispatcher-service ../regional-go-service n/a
receiver-service ../regional-go-service n/a
reenqueue ../cron n/a

Resources

Name Type
google-beta_google_project_service_identity.pubsub resource
google_cloud_scheduler_job.cron resource
google_monitoring_alert_policy.dead_letter_queue resource
google_pubsub_subscription.global-this resource
google_pubsub_topic.global-object-change-notifications resource
google_pubsub_topic_iam_binding.global-gcs-publishes-to-topic resource
google_service_account.change-trigger resource
google_service_account.cron-trigger resource
google_service_account.dispatcher resource
google_service_account.receiver resource
google_service_account.reenqueue resource
google_service_account_iam_binding.allow-pubsub-to-mint-tokens resource
google_storage_bucket.global-workqueue resource
google_storage_bucket_iam_binding.global-authorize-access resource
google_storage_bucket_iam_member.reenqueue-bucket-access resource
google_storage_notification.global-object-change-notifications resource
random_string.bucket_suffix resource
random_string.change-trigger resource
random_string.cron-trigger resource
random_string.dispatcher resource
random_string.receiver resource
random_string.reenqueue resource
google_storage_project_service_account.gcs_account data source

Inputs

Name Description Type Default Required
batch-size Optional cap on how much work to launch per dispatcher pass. Defaults to ceil(concurrent-work / number of regions) when unset. number null no
concurrent-work The amount of concurrent work to dispatch at a given time. number n/a yes
cpu_idle Set to false for a region in order to use instance-based billing. Defaults to true. map(map(bool)) { "dispatcher": {}, "receiver": {} } no
deletion_protection Whether to enable delete protection for the service. bool true no
enable_dead_letter_alerting Whether to enable alerting for dead-lettered keys. bool true no
labels Labels to apply to the workqueue resources. map(string) {} no
max-retry The maximum number of retry attempts before a task is moved to the dead letter queue. Set this to 0 to have unlimited retries. number 100 no
multi_regional_location The multi-regional location for the global workqueue bucket (e.g., 'US', 'EU', 'ASIA'). Only used when scope='global'. string "US" no
name n/a string n/a yes
notification_channels List of notification channels to alert. list(string) n/a yes
primary-region The primary region for single-homed resources like the reenqueue job. Defaults to the first region in the regions map. string null no
product Product label to apply to the service. string "unknown" no
project_id n/a string n/a yes
reconciler-service The name of the reconciler service that the workqueue will dispatch work to. object({ name = string }) n/a yes
regions A map from region names to a network and subnetwork. A service will be created in each region configured to egress the specified traffic via the specified subnetwork. map(object({ network = string, subnet = string })) n/a yes
scope The scope of the workqueue. Must be 'global' for a single multi-regional workqueue. string "global" no
team Team label to apply to resources (replaces deprecated 'squad'). string n/a yes
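
The batch-size default listed above, ceil(concurrent-work / number of regions), can be worked through with a short Go sketch. The defaultBatchSize function is a hypothetical helper written for this example; it is not taken from the module.

```go
package main

import "fmt"

// defaultBatchSize mirrors the documented fallback for batch-size:
// ceil(concurrentWork / regions). Integer arithmetic avoids float rounding.
// With no regions it falls back to concurrentWork (an assumption of this sketch).
func defaultBatchSize(concurrentWork, regions int) int {
	if regions <= 0 {
		return concurrentWork
	}
	return (concurrentWork + regions - 1) / regions
}

func main() {
	// concurrent-work = 10 spread over 3 regions
	fmt.Println(defaultBatchSize(10, 3))
}
```

With concurrent-work = 10 and three regions, each dispatcher pass would launch at most 4 keys under this formula.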

Outputs

Name Description
dispatcher n/a
receiver n/a

Directories

Path Synopsis
cmd
  dispatcher command
  inmem command
  receiver command
  reenqueue command
  send command
  test command
hyperqueue
  cmd/hyperqueue command
