cloudevents-workqueue/

directory
v0.8.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2025 License: Apache-2.0

README

CloudEvents Workqueue Module

This Terraform module provisions a service that subscribes to CloudEvents from a broker and enqueues work items to a workqueue based on a specified CloudEvent extension attribute.

Architecture

The module creates a Subscriber Service that:

  • Subscribes to specific CloudEvent types from a broker (Pub/Sub topic)
  • Extracts a workqueue key from a specified CloudEvent extension
  • Enqueues work items to a workqueue service for processing

This module is designed to work with github-events module, which publishes GitHub webhook events as CloudEvents with extensions like pullrequesturl and issueurl.

Key Features

  • Flexible Event Filtering: Support multiple Knative Trigger-style filters with OR logic
  • Extension-based Routing: Use any CloudEvent extension as the workqueue key
  • Reliable Delivery: Built-in retry logic and error handling
  • Cloud-native: Runs on Cloud Run with automatic scaling

Setup Instructions

Step 1: Deploy Prerequisites

You need:

  1. A CloudEvent broker (from cloudevent-broker module)
  2. A workqueue service (from workqueue module)
  3. A source of CloudEvents (e.g., github-events module)
Step 2: Deploy the CloudEvents Workqueue
module "github_pr_processor" {
  source = "../../modules/cloudevents-workqueue"

  project_id            = var.project_id
  name                  = "github-pr-processor"
  regions               = var.regions
  notification_channels = var.notification_channels

  # Subscribe to the broker
  broker = module.cloudevent-broker.broker

  # Subscribe to specific CloudEvent types using filters
  filters = [
    { "type" = "dev.chainguard.github.pull_request" },
    { "type" = "dev.chainguard.github.pull_request_review" },
    { "type" = "dev.chainguard.github.pull_request_review_comment" },
    { "type" = "dev.chainguard.github.issue_comment" },  # For PR comments
  ]

  # Use the pullrequesturl extension as the workqueue key
  extension_key = "pullrequesturl"

  # Send to workqueue
  workqueue = {
    name = module.workqueue.dispatcher.name
  }
}
Step 3: Process Work Items

Implement a workqueue consumer that processes the URLs:

package main

import (
    "context"
    "log"

    "github.com/chainguard-dev/terraform-infra-common/pkg/workqueue"
)

func main() {
    ctx := context.Background()

    client, err := workqueue.NewWorkqueueClient(ctx, os.Getenv("WORKQUEUE_SERVICE"))
    if err != nil {
        log.Fatal(err)
    }

    for {
        item, err := client.Dequeue(ctx)
        if err != nil {
            log.Printf("Error dequeuing: %v", err)
            continue
        }

        // item.Key will be the PR URL (e.g., https://github.com/owner/repo/pull/123)
        if err := processPullRequest(ctx, item.Key); err != nil {
            log.Printf("Error processing %s: %v", item.Key, err)
        }
    }
}

Use Cases

GitHub Pull Request Processing

Subscribe to PR-related events and process them using the PR URL:

module "pr_processor" {
  source = "../../modules/cloudevents-workqueue"

  # ... base configuration ...

  # List all PR-related events
  filters = [
    { "type" = "dev.chainguard.github.pull_request" },
    { "type" = "dev.chainguard.github.pull_request_review" },
    { "type" = "dev.chainguard.github.pull_request_review_comment" },
    { "type" = "dev.chainguard.github.check_run" },
    { "type" = "dev.chainguard.github.check_suite" }
  ]

  extension_key = "pullrequesturl"
}
Advanced Filtering Examples

Filter only opened PRs:

filters = [
  {
    "type"   = "dev.chainguard.github.pull_request"
    "action" = "opened"
  }
]

Filter only merged PRs:

filters = [
  {
    "type"   = "dev.chainguard.github.pull_request"
    "action" = "closed"
    "merged" = "true"
  }
]

Filter multiple specific event types:

filters = [
  { "type" = "dev.chainguard.github.pull_request" },
  { "type" = "dev.chainguard.github.issues" },
  { "type" = "dev.chainguard.github.check_run" },
  { "type" = "dev.chainguard.github.check_suite" }
]
GitHub Issue Processing

Subscribe to issue-related events and process them using the issue URL:

module "issue_processor" {
  source = "../../modules/cloudevents-workqueue"

  # ... base configuration ...

  # List all issue-related events
  filters = [
    { "type" = "dev.chainguard.github.issues" },
    { "type" = "dev.chainguard.github.issue_comment" }
  ]

  extension_key = "issueurl"
}
Custom CloudEvents

This module works with any CloudEvents that have the appropriate extension:

module "custom_processor" {
  source = "../../modules/cloudevents-workqueue"

  # ... base configuration ...

  # List specific user events
  filters = [
    { "type" = "com.example.user.created" },
    { "type" = "com.example.user.updated" },
    { "type" = "com.example.user.deleted" }
  ]

  # Use a custom extension
  extension_key = "userid"
}

How It Works

  1. Event Reception: The subscriber service receives CloudEvents via HTTP POST from Pub/Sub
  2. Extension Extraction: The service looks for the specified extension in the CloudEvent
  3. Workqueue Enqueue: If the extension exists and has a non-empty string value, it's enqueued
  4. Error Handling:
    • Missing or invalid extensions are logged and acknowledged (no retry)
    • Workqueue errors trigger retries via Pub/Sub redelivery

Requirements

No requirements.

Providers

Name Version
google n/a

Modules

Name Source Version
subscriber ../regional-go-service n/a
subscriber-calls-workqueue ../authorize-private-service n/a
trigger ../cloudevent-trigger n/a

Resources

Name Type
google_service_account.subscriber resource

Inputs

Name Description Type Default Required
ack_deadline_seconds The deadline for acking a message. number 300 no
broker A map from each of the input region names to the name of the Broker topic in that region. map(string) n/a yes
deletion_protection Whether to enable deletion protection for resources bool true no
extension_key The CloudEvent extension attribute to use as the workqueue key (e.g., pullrequesturl or issueurl) string n/a yes
filters A list of Knative Trigger-style filters over cloud event attributes.

Each filter is a map of attribute key-value pairs that must match exactly.
Multiple filters are combined with OR logic (any filter can match).

Examples:
# Single event type
filters = [
{ "type" = "dev.chainguard.github.pull_request" }
]

# Multiple event types
filters = [
{ "type" = "dev.chainguard.github.pull_request" },
{ "type" = "dev.chainguard.github.pull_request_review" }
]

# Filter by type and action
filters = [
{
"type" = "dev.chainguard.github.pull_request"
"action" = "opened"
}
]
list(map(string)) [] no
max_delivery_attempts The maximum number of delivery attempts for any event. number 20 no
maximum_backoff The maximum delay between consecutive deliveries of a given message. number 600 no
minimum_backoff The minimum delay between consecutive deliveries of a given message. number 10 no
name The base name for resources string n/a yes
notification_channels List of notification channels for alerts list(string) n/a yes
product Product label to apply to the service. string "unknown" no
project_id The GCP project ID string n/a yes
regions A map of regions to launch services in (see regional-go-service module for format)
map(object({
network = string
subnet = string
}))
n/a yes
team Team label to apply to resources (replaces deprecated 'squad'). string "unknown" no
workqueue The workqueue to send events to
object({
name = string
})
n/a yes

Outputs

Name Description
subscriber n/a

Directories

Path Synopsis
cmd
subscriber command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL