gcs

package
v0.10.3 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 5, 2026 | License: Apache-2.0 | Imports: 19 | Imported by: 0

README

GCS Workqueue Implementation

This package implements a Google Cloud Storage (GCS) backed workqueue that provides reliable, persistent task processing with state management.

Bucket Organization

The GCS workqueue uses object prefixes to organize tasks by their state within a single bucket:

Prefixes
  • queued/ - Tasks waiting to be processed
  • in-progress/ - Tasks currently being processed by a worker
  • dead-letter/ - Tasks that have failed after exceeding maximum retry attempts

State Transitions

queued/{key} → in-progress/{key} → [completed] (deleted)
                      ↓
                 dead-letter/{key} (on failure)
                      ↑
                 queued/{key} (on requeue)
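
The prefix layout above can be sketched as a small helper that maps a task state to its object name. This is an illustrative sketch rather than the package's code: the taskState type and the objectName function are hypothetical names, and only the three prefixes come from the description above. A completed task has no object at all, because its object is deleted.

```go
package main

import "fmt"

// taskState is a hypothetical type naming the three states described above.
type taskState string

const (
	stateQueued     taskState = "queued"
	stateInProgress taskState = "in-progress"
	stateDeadLetter taskState = "dead-letter"
)

// objectName returns the object name a task with the given key would have
// in the given state: the state prefix, a slash, then the key.
func objectName(s taskState, key string) string {
	return string(s) + "/" + key
}

func main() {
	for _, s := range []taskState{stateQueued, stateInProgress, stateDeadLetter} {
		fmt.Println(objectName(s, "task-123"))
	}
}
```

Because the state lives in the object name, listing a single prefix is enough to enumerate every task in that state.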

Object Metadata

Each object stores metadata to track task state:

  • priority - Zero-padded 8-digit priority for lexicographic ordering (higher = processed first)
  • attempts - Number of processing attempts
  • lease-expiration - When the current lease expires (for in-progress tasks)
  • not-before - Earliest time the task should be processed (RFC3339 format)
  • failed-time - When the task was moved to dead letter queue (RFC3339 format)
  • last-attempted - Unix timestamp of last processing attempt

Key Features

  • Priority-based processing - Higher priority tasks processed first
  • Lease-based ownership - In-progress tasks have renewable leases to prevent multiple workers from processing the same task
  • Automatic retry with backoff - Failed tasks automatically requeued with exponential backoff
  • Dead letter handling - Tasks exceeding retry limits moved to dead letter queue
  • Orphan detection - Detects and handles tasks with expired leases
  • Deduplication - Duplicate queue requests update priority/timing instead of creating duplicates
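
The retry behavior can be pictured with a generic exponential backoff. The sketch below is an assumption-laden illustration, not the package's schedule: the backoff function, the one-second base, and the one-minute cap are all hypothetical, since the text above does not state the actual parameters.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns base doubled once per prior attempt, capped at maxDelay.
// Both base and maxDelay are illustrative parameters, not package values.
func backoff(attempts int, base, maxDelay time.Duration) time.Duration {
	d := base
	// Stop doubling once the cap is reached to avoid overflow.
	for i := 1; i < attempts && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

func main() {
	for attempts := 1; attempts <= 8; attempts++ {
		fmt.Printf("attempt %d: wait %v\n", attempts, backoff(attempts, time.Second, time.Minute))
	}
}
```

A requeue under this scheme would add the returned delay to the current time and store the result as the task's not-before value.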

Metrics

The implementation exports Prometheus metrics for:

  • Queue sizes (queued, in-progress, dead-lettered)
  • Processing latency and wait times
  • Retry attempts and completion rates
  • Task priorities and attempt distributions

Documentation

Overview

Package gcs implements a Google Cloud Storage backed workqueue that provides reliable, persistent task processing with state management.

The GCS workqueue uses object prefixes to organize tasks by their state within a single bucket. Tasks flow through three states: queued, in-progress, and dead-letter. The implementation provides priority-based processing, lease-based ownership, automatic retry with backoff, and dead letter handling.

Bucket Organization

Tasks are organized using the following prefixes:

  • queued/ - Tasks waiting to be processed
  • in-progress/ - Tasks currently being processed by a worker
  • dead-letter/ - Tasks that have failed after exceeding maximum retry attempts

Features

  • Priority-based processing: Higher priority tasks are processed first
  • Lease-based ownership: In-progress tasks have renewable leases to prevent multiple workers from processing the same task
  • Automatic retry with backoff: Failed tasks are automatically requeued with exponential backoff
  • Dead letter handling: Tasks exceeding retry limits are moved to a dead letter queue
  • Orphan detection: Detects and handles tasks with expired leases
  • Deduplication: Duplicate queue requests update priority/timing instead of creating duplicates
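
Orphan detection comes down to comparing a lease expiration with the current time. The sketch below shows that comparison under stated assumptions: isOrphaned and mustParse are hypothetical helpers, and the lease expiration is assumed to be stored as an RFC3339 string, which the text above does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

// mustParse parses an RFC3339 timestamp and panics on malformed input.
// It exists only to keep the example short.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

// isOrphaned reports whether the lease has already expired at time now.
// The RFC3339 encoding of leaseExpiration is an assumption of this sketch.
func isOrphaned(leaseExpiration string, now time.Time) (bool, error) {
	exp, err := time.Parse(time.RFC3339, leaseExpiration)
	if err != nil {
		return false, fmt.Errorf("parsing lease expiration %q: %w", leaseExpiration, err)
	}
	return now.After(exp), nil
}

func main() {
	now := mustParse("2026-03-05T12:10:00Z")
	for _, lease := range []string{"2026-03-05T12:05:00Z", "2026-03-05T12:15:00Z"} {
		orphaned, err := isOrphaned(lease, now)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("lease %s orphaned=%t\n", lease, orphaned)
	}
}
```

A worker that keeps renewing its lease pushes the expiration forward, so only tasks whose worker has stopped renewing are reported as orphaned.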

Usage

Create a new workqueue using NewWorkQueue with a GCS bucket handle:

client, err := storage.NewClient(ctx)
if err != nil {
    log.Fatal(err)
}
wq := gcs.NewWorkQueue(client.Bucket("my-bucket"), 100)

Queue a task:

if err := wq.Queue(ctx, "task-key", workqueue.Options{
    Priority: 10,
}); err != nil {
    log.Fatal(err)
}

Enumerate and process tasks:

_, queued, _, err := wq.Enumerate(ctx)
if err != nil {
    log.Fatal(err)
}
for _, key := range queued {
    owned, err := key.Start(ctx)
    if err != nil {
        continue
    }
    // Process the task...
    if err := owned.Complete(ctx); err != nil {
        log.Printf("failed to complete: %v", err)
    }
}

Configuration

The package exposes the following configuration variables:

  • RefreshInterval: The interval at which leases on owned tasks are refreshed (default: 5 minutes)
  • TrackWorkAttemptMinThreshold: Minimum number of attempts before a task's work attempts are tracked in metrics, which limits metric cardinality (default: 20)
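
One way a binary could expose such a variable as a flag is sketched below. To stay self-contained, the sketch uses a local refreshInterval variable as a stand-in for gcs.RefreshInterval; the parseFlags function and the refresh-interval flag name are hypothetical. In a real binary the flag would be bound to the package variable instead.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// refreshInterval stands in for gcs.RefreshInterval so that this sketch
// compiles without the module. It starts at the documented default.
var refreshInterval = 5 * time.Minute

// parseFlags binds the hypothetical -refresh-interval flag to the variable
// and parses the given arguments.
func parseFlags(args []string) error {
	fs := flag.NewFlagSet("worker", flag.ContinueOnError)
	fs.DurationVar(&refreshInterval, "refresh-interval", refreshInterval,
		"interval at which task leases are refreshed")
	return fs.Parse(args)
}

func main() {
	if err := parseFlags([]string{"-refresh-interval=2m"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(refreshInterval) // 2m0s
}
```

Every binary that shares a queue should receive the same value, since the variable's documentation warns that inconsistent values can cause unexpected behavior.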

Metrics

The implementation exports Prometheus metrics for monitoring:

  • workqueue_in_progress_keys: Number of keys currently being processed
  • workqueue_queued_keys: Number of keys in the backlog
  • workqueue_notbefore_keys: Number of keys waiting on a 'not before' time
  • workqueue_dead_lettered_keys: Number of keys in the dead letter queue
  • workqueue_process_latency_seconds: Duration taken to process a key
  • workqueue_wait_latency_seconds: Duration the key waited to start
  • workqueue_added_keys: Total number of queue requests
  • workqueue_deduped_keys: Total number of deduplicated keys
  • workqueue_max_attempts: Maximum attempts for any queued or in-progress task
  • workqueue_time_to_completion_seconds: Time from first queue to final outcome

All metrics include service_name and revision_name labels derived from K_SERVICE and K_REVISION environment variables.
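
The label derivation can be sketched with the standard library alone. This is a hedged illustration, not the package's metrics code: metricLabels is a hypothetical helper, and the environment lookup is injected as a function so that the mapping is easy to test. K_SERVICE and K_REVISION are the variables Cloud Run sets for a running service.

```go
package main

import (
	"fmt"
	"os"
)

// metricLabels builds the two label values from an environment lookup.
// Passing the lookup as a function keeps the helper testable.
func metricLabels(getenv func(string) string) map[string]string {
	return map[string]string{
		"service_name":  getenv("K_SERVICE"),
		"revision_name": getenv("K_REVISION"),
	}
}

func main() {
	labels := metricLabels(os.Getenv)
	fmt.Printf("service_name=%q revision_name=%q\n",
		labels["service_name"], labels["revision_name"])
}
```

Outside an environment that sets these variables, both labels are empty strings in this sketch.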

Thread Safety

The workqueue implementation is safe for concurrent use. Multiple goroutines can queue, enumerate, and process tasks simultaneously. The lease-based ownership model ensures that only one worker processes a given task at a time.

Example (GetTaskState)

Example_getTaskState demonstrates retrieving the state of a specific task.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"

	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()

	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)

	// Get the state of a specific task
	state, err := wq.Get(ctx, "task-123")
	if err != nil {
		log.Printf("Failed to get task state: %v", err)
		return
	}

	fmt.Printf("Task %s: status=%v, attempts=%d, priority=%d\n",
		state.Key, state.Status, state.Attempts, state.Priority)
}

Example (HandleOrphanedTasks)

Example_handleOrphanedTasks demonstrates detecting and handling orphaned tasks.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"

	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()

	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)

	// Enumerate tasks
	inProgress, _, _, err := wq.Enumerate(ctx)
	if err != nil {
		log.Print(err)
		return
	}

	// Check for orphaned tasks (tasks with expired leases)
	for _, key := range inProgress {
		if key.IsOrphaned() {
			fmt.Printf("Found orphaned task: %s\n", key.Name())
			// Requeue the orphaned task
			if err := key.Requeue(ctx); err != nil {
				log.Printf("Failed to requeue orphaned task: %v", err)
			}
		}
	}
}

Example (ProcessTasks)

Example_processTasks demonstrates the typical workflow for processing tasks from the workqueue.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"

	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()

	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)

	// Enumerate tasks
	inProgress, queued, deadLettered, err := wq.Enumerate(ctx)
	if err != nil {
		log.Print(err)
		return
	}

	fmt.Printf("In-progress: %d, Queued: %d, Dead-lettered: %d\n",
		len(inProgress), len(queued), len(deadLettered))

	// Process queued tasks
	for _, key := range queued {
		// Start processing the task
		owned, err := key.Start(ctx)
		if err != nil {
			log.Printf("Failed to start task %s: %v", key.Name(), err)
			continue
		}

		// Process the task using the owned context
		// The context is cancelled if the lease is lost
		select {
		case <-owned.Context().Done():
			log.Printf("Lost ownership of task %s", key.Name())
			continue
		default:
			// Do work here...
		}

		// Mark the task as complete
		if err := owned.Complete(ctx); err != nil {
			log.Printf("Failed to complete task %s: %v", key.Name(), err)
		}
	}
}

Example (RequeueTask)

Example_requeueTask demonstrates how to requeue a task for later processing.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"

	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()

	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)

	// Enumerate and get a queued task
	_, queued, _, err := wq.Enumerate(ctx)
	if err != nil {
		log.Print(err)
		return
	}

	if len(queued) == 0 {
		fmt.Println("No tasks to process")
		return
	}

	// Start the task
	owned, err := queued[0].Start(ctx)
	if err != nil {
		log.Print(err)
		return
	}

	// Processing is assumed to have failed here, so requeue the task
	if err := owned.Requeue(ctx); err != nil {
		log.Printf("Failed to requeue: %v", err)
		return
	}

	fmt.Println("Task requeued for retry")
}

Variables

var RefreshInterval = 5 * time.Minute

RefreshInterval is the period on which we refresh the lease of owned objects. It is surfaced as a global so that it can be mutated by tests and exposed as a flag by binaries wrapping this library. However, binary authors should take care to pass consistent values to the key ingress and dispatchers, or they may see unexpected behavior. TODO(mattmoor): What's the right balance here?

var TrackWorkAttemptMinThreshold = 20

TrackWorkAttemptMinThreshold is the minimum number of attempts a task must reach before its work attempts are tracked. This keeps the cardinality of the metric low.

Functions

func NewWorkQueue

func NewWorkQueue(client ClientInterface, limit int) workqueue.Interface

NewWorkQueue creates a new GCS-backed workqueue.

Example

ExampleNewWorkQueue demonstrates creating a new GCS-backed workqueue.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"

	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue"
	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()

	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create a workqueue with a limit of 100 items
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)

	// Queue a task with priority
	if err := wq.Queue(ctx, "task-123", workqueue.Options{
		Priority: 10,
	}); err != nil {
		log.Print(err)
		return
	}

	fmt.Println("Task queued successfully")
}

Types

type ClientInterface

type ClientInterface interface {
	Object(name string) *storage.ObjectHandle
	Objects(ctx context.Context, q *storage.Query) *storage.ObjectIterator
}

ClientInterface is an interface that abstracts the GCS client.
