gcs

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

GCS Workqueue Implementation

This package implements a Google Cloud Storage (GCS) backed workqueue that provides reliable, persistent task processing with state management.

Bucket Organization

The GCS workqueue uses object prefixes to organize tasks by their state within a single bucket:

Prefixes
  • queued/ - Tasks waiting to be processed
  • in-progress/ - Tasks currently being processed by a worker
  • dead-letter/ - Tasks that have failed after exceeding maximum retry attempts
State Transitions
queued/{key} → in-progress/{key} → [completed] (deleted)
                      ↓
                 dead-letter/{key} (on failure)
                      ↑
                 queued/{key} (on requeue)

Object Metadata

Each object stores metadata to track task state:

  • priority - Zero-padded 8-digit priority for lexicographic ordering (higher = processed first)
  • attempts - Number of processing attempts
  • lease-expiration - When the current lease expires (for in-progress tasks)
  • not-before - Earliest time the task should be processed (RFC3339 format)
  • failed-time - When the task was moved to dead letter queue (RFC3339 format)
  • last-attempted - Unix timestamp of last processing attempt

Key Features

  • Priority-based processing - Higher priority tasks processed first
  • Lease-based ownership - In-progress tasks have renewable leases to prevent multiple workers processing the same task
  • Automatic retry with backoff - Failed tasks automatically requeued with exponential backoff
  • Dead letter handling - Tasks exceeding retry limits moved to dead letter queue
  • Orphan detection - Detects and handles tasks with expired leases
  • Deduplication - Duplicate queue requests update priority/timing instead of creating duplicates

Metrics

The implementation exports Prometheus metrics for:

  • Queue sizes (queued, in-progress, dead-lettered)
  • Processing latency and wait times
  • Retry attempts and completion rates
  • Task priorities and attempt distributions

Documentation

Overview

Package gcs provides a Google Cloud Storage-backed workqueue implementation.

Keys are stored as GCS objects under queued/, in-progress/, and dead-letter/ prefixes. The implementation supports priority ordering, not-before delays, and automatic lease refresh to detect orphaned work.

Index

Examples

Constants

This section is empty.

Variables

View Source
var RefreshInterval = 5 * time.Minute

RefreshInterval is the period on which we refresh the lease of owned objects It is surfaced as a global, so that it can be mutated by tests and exposed as a flag by binaries wrapping this library. However, binary authors should use caution to pass consistent values to the key ingress and dispatchers, or they may see unexpected behavior. TODO(mattmoor): What's the right balance here?

View Source
var TrackWorkAttemptMinThreshold = 20

The minimum number of attempts before tracking work attempts. This is to minimize the cardinality of the metric.

Functions

func NewWorkQueue

func NewWorkQueue(client ClientInterface, limit int, opts ...Option) workqueue.Interface

NewWorkQueue creates a new GCS-backed workqueue.

Example

ExampleNewWorkQueue demonstrates constructing a GCS-backed workqueue.

package main

import (
	"fmt"
)

func main() {
	// In production, pass a real *storage.BucketHandle obtained from a
	// cloud.google.com/go/storage client.
	//
	//   client, err := storage.NewClient(ctx)
	//   bucket := client.Bucket("my-workqueue-bucket")
	//   wq := gcs.NewWorkQueue(bucket, 10)
	//
	// The limit parameter controls the maximum number of keys dequeued per
	// Enumerate call.
	fmt.Println("GCS workqueue limit:", 10)
}
Output:
GCS workqueue limit: 10

Types

type ClientInterface

type ClientInterface interface {
	Object(name string) *storage.ObjectHandle
	Objects(ctx context.Context, q *storage.Query) *storage.ObjectIterator
}

ClientInterface is an interface that abstracts the GCS client.

type Option added in v0.7.1

type Option func(*wq)

Option configures a GCS-backed workqueue created by NewWorkQueue.

func WithName added in v0.7.1

func WithName(name string) Option

WithName sets the queue_name label applied to every Prometheus metric this workqueue emits. Use it to disambiguate multiple workqueues running in the same Cloud Run service (which share K_SERVICE / K_REVISION). Defaults to "".

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL