CloudEvents Workqueue Module
This Terraform module provisions a service that subscribes to CloudEvents from a broker and enqueues work items to a workqueue based on a specified CloudEvent extension attribute.
Architecture
The module creates a Subscriber Service that:
- Subscribes to specific CloudEvent types from a broker (Pub/Sub topic)
- Extracts a workqueue key from a specified CloudEvent extension
- Enqueues work items to a workqueue service for processing
This module is designed to work with the github-events module, which publishes GitHub webhook events as CloudEvents with extensions such as pullrequesturl and issueurl.
Key Features
- Flexible Event Filtering: Supports multiple Knative Trigger-style filters, combined with OR logic
- Extension-based Routing: Use any CloudEvent extension as the workqueue key
- Reliable Delivery: Workqueue errors are retried through Pub/Sub redelivery, with configurable backoff and a maximum number of delivery attempts
- Cloud-native: Runs on Cloud Run with automatic scaling
Setup Instructions
Step 1: Deploy Prerequisites
You need:
- A CloudEvent broker (from the cloudevent-broker module)
- A workqueue service (from the workqueue module)
- A source of CloudEvents (e.g., the github-events module)
Step 2: Deploy the CloudEvents Workqueue
module "github_pr_processor" {
  source = "../../modules/cloudevents-workqueue"

  project_id            = var.project_id
  name                  = "github-pr-processor"
  regions               = var.regions
  notification_channels = var.notification_channels

  # Required input (see Inputs)
  team = var.team

  # Subscribe to the broker
  broker = module.cloudevent-broker.broker

  # Subscribe to specific CloudEvent types using filters
  filters = [
    { "type" = "dev.chainguard.github.pull_request" },
    { "type" = "dev.chainguard.github.pull_request_review" },
    { "type" = "dev.chainguard.github.pull_request_review_comment" },
    { "type" = "dev.chainguard.github.issue_comment" }, # For PR comments
  ]

  # Use the pullrequesturl extension as the workqueue key
  extension_key = "pullrequesturl"

  # Send to workqueue
  workqueue = {
    name = module.workqueue.dispatcher.name
  }
}
Step 3: Process Work Items
Implement a workqueue consumer that processes the enqueued keys (here, pull request URLs):
package main

import (
	"context"
	"log"
	"os"

	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue"
)

func main() {
	ctx := context.Background()

	client, err := workqueue.NewWorkqueueClient(ctx, os.Getenv("WORKQUEUE_SERVICE"))
	if err != nil {
		log.Fatal(err)
	}

	for {
		item, err := client.Dequeue(ctx)
		if err != nil {
			log.Printf("Error dequeuing: %v", err)
			continue
		}

		// item.Key will be the PR URL (e.g., https://github.com/owner/repo/pull/123)
		// processPullRequest is the caller's own handler; it is not defined in this snippet.
		if err := processPullRequest(ctx, item.Key); err != nil {
			log.Printf("Error processing %s: %v", item.Key, err)
		}
	}
}
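The consumer above leaves processPullRequest undefined. As a minimal sketch of a first step such a handler might take, the following splits the workqueue key into owner, repository, and pull request number. The pullRequestRef type and parsePullRequestURL function are hypothetical names introduced for illustration; they are not part of the workqueue package.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// pullRequestRef identifies a pull request parsed from a workqueue key.
// Hypothetical type, not part of the workqueue package.
type pullRequestRef struct {
	Owner  string
	Repo   string
	Number int
}

// parsePullRequestURL splits a key such as
// https://github.com/owner/repo/pull/123 into its components.
func parsePullRequestURL(key string) (pullRequestRef, error) {
	u, err := url.Parse(key)
	if err != nil {
		return pullRequestRef{}, fmt.Errorf("parsing %q: %w", key, err)
	}
	// Expected path shape: /<owner>/<repo>/pull/<number>
	parts := strings.Split(strings.Trim(u.Path, "/"), "/")
	if len(parts) != 4 || parts[2] != "pull" {
		return pullRequestRef{}, fmt.Errorf("unexpected pull request path %q", u.Path)
	}
	n, err := strconv.Atoi(parts[3])
	if err != nil {
		return pullRequestRef{}, fmt.Errorf("invalid pull request number %q: %w", parts[3], err)
	}
	return pullRequestRef{Owner: parts[0], Repo: parts[1], Number: n}, nil
}

func main() {
	ref, err := parsePullRequestURL("https://github.com/owner/repo/pull/123")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s/%s#%d\n", ref.Owner, ref.Repo, ref.Number)
}
```

Rejecting keys that do not match the expected path shape keeps a malformed key from reaching later API calls.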
Use Cases
GitHub Pull Request Processing
Subscribe to PR-related events and process them using the PR URL:
module "pr_processor" {
  source = "../../modules/cloudevents-workqueue"
  # ... base configuration ...

  # List all PR-related events
  filters = [
    { "type" = "dev.chainguard.github.pull_request" },
    { "type" = "dev.chainguard.github.pull_request_review" },
    { "type" = "dev.chainguard.github.pull_request_review_comment" },
    { "type" = "dev.chainguard.github.check_run" },
    { "type" = "dev.chainguard.github.check_suite" }
  ]

  extension_key = "pullrequesturl"
}
Advanced Filtering Examples
Filter only opened PRs:
filters = [
  {
    "type"   = "dev.chainguard.github.pull_request"
    "action" = "opened"
  }
]
Filter only merged PRs:
filters = [
  {
    "type"   = "dev.chainguard.github.pull_request"
    "action" = "closed"
    "merged" = "true"
  }
]
Filter multiple specific event types:
filters = [
  { "type" = "dev.chainguard.github.pull_request" },
  { "type" = "dev.chainguard.github.issues" },
  { "type" = "dev.chainguard.github.check_run" },
  { "type" = "dev.chainguard.github.check_suite" }
]
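The matching rule behind these filters (every key in a single filter must match exactly, and any one matching filter is enough) can be sketched in Go. This is an illustration of the semantics only, not the module's implementation; matchesAny is a hypothetical helper, and treating an empty filter list as "match everything" is an assumption based on the [] default.

```go
package main

import "fmt"

// matchesAny reports whether attrs satisfies at least one filter.
// Within a filter, every key/value pair must match exactly (AND).
// Across filters, a single match is sufficient (OR).
func matchesAny(filters []map[string]string, attrs map[string]string) bool {
	// Assumption: no filters means every event is accepted.
	if len(filters) == 0 {
		return true
	}
	for _, filter := range filters {
		matched := true
		for key, want := range filter {
			got, ok := attrs[key]
			if !ok || got != want {
				matched = false
				break
			}
		}
		if matched {
			return true
		}
	}
	return false
}

func main() {
	filters := []map[string]string{
		{"type": "dev.chainguard.github.pull_request", "action": "opened"},
		{"type": "dev.chainguard.github.issues"},
	}
	event := map[string]string{
		"type":   "dev.chainguard.github.pull_request",
		"action": "closed",
	}
	fmt.Println(matchesAny(filters, event))
}
```

In this sketch a closed pull request is rejected by the first filter because its action differs, and by the second because its type differs, so the event would not be delivered.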
GitHub Issue Processing
Subscribe to issue-related events and process them using the issue URL:
module "issue_processor" {
  source = "../../modules/cloudevents-workqueue"
  # ... base configuration ...

  # List all issue-related events
  filters = [
    { "type" = "dev.chainguard.github.issues" },
    { "type" = "dev.chainguard.github.issue_comment" }
  ]

  extension_key = "issueurl"
}
Custom CloudEvents
This module works with any CloudEvents that carry the extension named in extension_key:
module "custom_processor" {
  source = "../../modules/cloudevents-workqueue"
  # ... base configuration ...

  # List specific user events
  filters = [
    { "type" = "com.example.user.created" },
    { "type" = "com.example.user.updated" },
    { "type" = "com.example.user.deleted" }
  ]

  # Use a custom extension
  extension_key = "userid"
}
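When choosing a custom extension name, note that the CloudEvents specification restricts attribute names to lowercase ASCII letters and digits, which is why names like pullrequesturl and userid contain no separators. A minimal sketch of that check follows; validExtensionName is a hypothetical helper, not something this module exposes.

```go
package main

import "fmt"

// validExtensionName reports whether name uses only the characters the
// CloudEvents specification allows in attribute names: 'a'-'z' and '0'-'9'.
func validExtensionName(name string) bool {
	if name == "" {
		return false
	}
	for _, r := range name {
		isLower := r >= 'a' && r <= 'z'
		isDigit := r >= '0' && r <= '9'
		if !isLower && !isDigit {
			return false
		}
	}
	return true
}

func main() {
	for _, name := range []string{"userid", "user_id", "userId"} {
		fmt.Printf("%s: %t\n", name, validExtensionName(name))
	}
}
```

The specification also recommends keeping attribute names to 20 characters or fewer; the sketch does not enforce that recommendation.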
How It Works
- Event Reception: The subscriber service receives CloudEvents via HTTP POST from Pub/Sub
- Extension Extraction: The service looks for the specified extension in the CloudEvent
- Workqueue Enqueue: If the extension exists and has a non-empty string value, it's enqueued
- Error Handling:
  - Missing or invalid extensions are logged and acknowledged (no retry)
  - Workqueue errors trigger retries via Pub/Sub redelivery
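The steps above can be sketched as a single decision function. This is not the subscriber's actual code: the handle function, the enqueue callback, and the specific status codes are assumptions chosen for illustration. It relies only on the documented Pub/Sub push behavior that a success status acknowledges the message and an error status leads to redelivery.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// errUnavailable simulates a failing workqueue in the example below.
var errUnavailable = errors.New("workqueue unavailable")

// handle returns the HTTP status a push endpoint could reply with.
// extensions holds the event's extension attributes; enqueue is a
// hypothetical stand-in for the call to the workqueue service.
func handle(extensions map[string]any, extensionKey string, enqueue func(key string) error) int {
	key, ok := extensions[extensionKey].(string)
	if !ok || key == "" {
		// Missing, non-string, or empty extension: acknowledge so the
		// message is not redelivered.
		return http.StatusOK
	}
	if err := enqueue(key); err != nil {
		// Workqueue failure: a non-success status causes Pub/Sub to retry.
		return http.StatusInternalServerError
	}
	return http.StatusOK
}

func main() {
	exts := map[string]any{"pullrequesturl": "https://github.com/owner/repo/pull/123"}
	okEnqueue := func(key string) error { return nil }
	failEnqueue := func(key string) error { return errUnavailable }

	fmt.Println(handle(exts, "pullrequesturl", okEnqueue))
	fmt.Println(handle(map[string]any{}, "pullrequesturl", okEnqueue))
	fmt.Println(handle(exts, "pullrequesturl", failEnqueue))
}
```

Acknowledging events that lack the extension avoids redelivering messages that can never succeed, while workqueue failures remain retryable.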
Requirements
No requirements.
Providers
| Name | Version |
|---|---|
| n/a |
Modules
| Name | Source | Version |
|---|---|---|
| subscriber | ../regional-go-service | n/a |
| subscriber-calls-workqueue | ../authorize-private-service | n/a |
| trigger | ../cloudevent-trigger | n/a |
Resources
| Name | Type |
|---|---|
| google_service_account.subscriber | resource |
Inputs
| Name | Description | Type | Default | Required |
|---|---|---|---|---|
| ack_deadline_seconds | The deadline for acking a message. | number | 300 | no |
| broker | A map from each of the input region names to the name of the Broker topic in that region. | map(string) | n/a | yes |
| deletion_protection | Whether to enable deletion protection for resources | bool | true | no |
| extension_key | The CloudEvent extension attribute to use as the workqueue key (e.g., pullrequesturl or issueurl) | string | n/a | yes |
| filters | A list of Knative Trigger-style filters over cloud event attributes. Each filter is a map of attribute key-value pairs that must match exactly. Multiple filters are combined with OR logic (any filter can match). Examples: # Single event type filters = [ { "type" = "dev.chainguard.github.pull_request" } ] # Multiple event types filters = [ { "type" = "dev.chainguard.github.pull_request" }, { "type" = "dev.chainguard.github.pull_request_review" } ] # Filter by type and action filters = [ { "type" = "dev.chainguard.github.pull_request" "action" = "opened" } ] | list(map(string)) | [] | no |
| max_delivery_attempts | The maximum number of delivery attempts for any event. | number | 20 | no |
| maximum_backoff | The maximum delay between consecutive deliveries of a given message. | number | 600 | no |
| minimum_backoff | The minimum delay between consecutive deliveries of a given message. | number | 10 | no |
| name | The base name for resources | string | n/a | yes |
| notification_channels | List of notification channels for alerts | list(string) | n/a | yes |
| priority | Priority for workqueue items (higher values = higher priority) | number | 0 | no |
| product | Product label to apply to the service. | string | "unknown" | no |
| project_id | The GCP project ID | string | n/a | yes |
| regions | A map of regions to launch services in (see regional-go-service module for format) | map(object({ | n/a | yes |
| team | Team label to apply to resources (replaces deprecated 'squad'). | string | n/a | yes |
| workqueue | The workqueue to send events to | object({ | n/a | yes |
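As a rough illustration of how minimum_backoff and maximum_backoff bound the delay between redeliveries, the sketch below assumes the delay doubles after each failed attempt and is clamped to the maximum. The doubling factor and the treatment of both values as seconds are assumptions; Pub/Sub does not promise an exact schedule, and backoffDelay is a hypothetical helper.

```go
package main

import "fmt"

// backoffDelay returns an illustrative delay before the next delivery
// after the given attempt number (1-based), starting at minBackoff and
// doubling until it reaches maxBackoff.
func backoffDelay(attempt, minBackoff, maxBackoff int) int {
	delay := minBackoff
	if delay >= maxBackoff {
		return maxBackoff
	}
	for i := 1; i < attempt; i++ {
		delay *= 2
		if delay >= maxBackoff {
			return maxBackoff
		}
	}
	return delay
}

func main() {
	// Module defaults: minimum_backoff = 10, maximum_backoff = 600.
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %d\n", attempt, backoffDelay(attempt, 10, 600))
	}
}
```

Under these assumptions and the module defaults, the delay starts at 10, reaches the 600 cap at the seventh attempt, and stays there until max_delivery_attempts is exhausted.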
Outputs
| Name | Description |
|---|---|
| subscriber | n/a |