Documentation ¶
Overview ¶
Package gcs implements a Google Cloud Storage backed workqueue that provides reliable, persistent task processing with state management.
The GCS workqueue uses object prefixes to organize tasks by their state within a single bucket. Tasks flow through three states: queued, in-progress, and dead-letter. The implementation provides priority-based processing, lease-based ownership, automatic retry with backoff, and dead letter handling.
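The flow between states can be pictured as a small transition table. The sketch below is illustrative only: the State type, the Done pseudo-state (a completed task is modelled here as simply leaving the queue), and the exact set of allowed transitions are assumptions made for this example, not names or rules taken from the package.

```go
package main

import "fmt"

// State is a hypothetical name for the task states described above.
type State string

const (
	Queued     State = "queued"
	InProgress State = "in-progress"
	DeadLetter State = "dead-letter"
	// Done is not a stored state in this sketch: it models a completed
	// task leaving the queue (an assumption).
	Done State = "done"
)

// next lists the transitions this sketch assumes are allowed.
var next = map[State][]State{
	Queued:     {InProgress},
	InProgress: {Done, Queued, DeadLetter},
}

// CanMove reports whether a task may move directly from one state to another.
func CanMove(from, to State) bool {
	for _, s := range next[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanMove(Queued, InProgress))     // a worker starts the task
	fmt.Println(CanMove(InProgress, Queued))     // a failed task is requeued
	fmt.Println(CanMove(InProgress, DeadLetter)) // retries are exhausted
	fmt.Println(CanMove(Queued, DeadLetter))     // not a direct move in this sketch
}
```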
Bucket Organization ¶
Tasks are organized using the following prefixes:
- queued/ - Tasks waiting to be processed
- in-progress/ - Tasks currently being processed by a worker
- dead-letter/ - Tasks that have failed after exceeding maximum retry attempts
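As a rough illustration of the prefix scheme, the sketch below maps a task key to an object name and back. The three prefixes come from the list above; the assumption that an object name is exactly prefix plus key, and the helper names objectName and splitObject, are hypothetical and may not match the real layout.

```go
package main

import (
	"fmt"
	"strings"
)

// Prefixes taken from the list above.
const (
	queuedPrefix     = "queued/"
	inProgressPrefix = "in-progress/"
	deadLetterPrefix = "dead-letter/"
)

// objectName joins a state prefix and a task key.
// Assumption: the object name is simply prefix + key.
func objectName(prefix, key string) string {
	return prefix + key
}

// splitObject returns the state prefix and task key for an object name,
// or ok=false if the name carries none of the known prefixes.
func splitObject(name string) (prefix, key string, ok bool) {
	for _, p := range []string{queuedPrefix, inProgressPrefix, deadLetterPrefix} {
		if strings.HasPrefix(name, p) {
			return p, strings.TrimPrefix(name, p), true
		}
	}
	return "", "", false
}

func main() {
	name := objectName(queuedPrefix, "task-123")
	prefix, key, ok := splitObject(name)
	fmt.Println(name, prefix, key, ok)
}
```

Keeping the state in the object name means a single prefix listing is enough to find every task in a given state.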
Features ¶
- Priority-based processing: Higher priority tasks are processed first
- Lease-based ownership: In-progress tasks have renewable leases to prevent multiple workers from processing the same task
- Automatic retry with backoff: Failed tasks are automatically requeued with exponential backoff
- Dead letter handling: Tasks exceeding retry limits are moved to a dead letter queue
- Orphan detection: Detects and handles tasks with expired leases
- Deduplication: Duplicate queue requests update priority/timing instead of creating duplicates
Usage ¶
Create a new workqueue using NewWorkQueue with a GCS bucket handle:
client, err := storage.NewClient(ctx)
if err != nil {
	log.Fatal(err)
}
wq := gcs.NewWorkQueue(client.Bucket("my-bucket"), 100)
Queue a task:
if err := wq.Queue(ctx, "task-key", workqueue.Options{
	Priority: 10,
}); err != nil {
	log.Fatal(err)
}
Enumerate and process tasks:
// Only the queued keys are needed here, so the other results are discarded.
_, queued, _, err := wq.Enumerate(ctx)
if err != nil {
	log.Fatal(err)
}
for _, key := range queued {
	owned, err := key.Start(ctx)
	if err != nil {
		log.Printf("failed to start %s: %v", key.Name(), err)
		continue
	}
	// Process the task...
	if err := owned.Complete(ctx); err != nil {
		log.Printf("failed to complete %s: %v", key.Name(), err)
	}
}
Configuration ¶
The package exposes the following configuration variables:
- RefreshInterval: How often the lease on an owned task is refreshed (default: 5 minutes)
- TrackWorkAttemptMinThreshold: Minimum number of attempts a task must reach before its attempts are tracked in metrics, which keeps metric cardinality low (default: 20)
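Because these are package-level variables, a wrapping binary can bind one to a command-line flag. The sketch below shows one way to do that with the standard flag package. To stay self-contained it uses a local refreshInterval variable as a stand-in for gcs.RefreshInterval, and the flag name -refresh-interval is hypothetical.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// refreshInterval stands in for gcs.RefreshInterval so this sketch compiles
// without the package; a real binary would assign to that variable instead.
var refreshInterval = 5 * time.Minute

// parseInterval parses args and returns the interval requested through the
// hypothetical -refresh-interval flag, or the default when the flag is absent.
func parseInterval(args []string) (time.Duration, error) {
	fs := flag.NewFlagSet("worker", flag.ContinueOnError)
	interval := refreshInterval
	fs.DurationVar(&interval, "refresh-interval", refreshInterval,
		"period on which leases are refreshed")
	if err := fs.Parse(args); err != nil {
		return 0, err
	}
	return interval, nil
}

func main() {
	d, err := parseInterval([]string{"-refresh-interval=30s"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	refreshInterval = d // set once at startup, before any work begins
	fmt.Println(refreshInterval)
}
```

If several binaries share one bucket, they should be started with the same value, since the package documentation warns that inconsistent values can cause unexpected behavior.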
Metrics ¶
The implementation exports Prometheus metrics for monitoring:
- workqueue_in_progress_keys: Number of keys currently being processed
- workqueue_queued_keys: Number of keys in the backlog
- workqueue_notbefore_keys: Number of keys waiting on a 'not before' time
- workqueue_dead_lettered_keys: Number of keys in the dead letter queue
- workqueue_process_latency_seconds: Duration taken to process a key
- workqueue_wait_latency_seconds: Duration the key waited to start
- workqueue_added_keys: Total number of queue requests
- workqueue_deduped_keys: Total number of deduplicated keys
- workqueue_max_attempts: Maximum attempts for any queued or in-progress task
- workqueue_time_to_completion_seconds: Time from first queue to final outcome
All metrics include service_name and revision_name labels derived from K_SERVICE and K_REVISION environment variables.
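The label derivation can be sketched as a small lookup over the environment. The function name labels and the "unknown" fallback for unset variables are assumptions made for this example; only the label names and environment variable names come from the text above.

```go
package main

import (
	"fmt"
	"os"
)

// labels builds the two label values from an environment lookup function.
// Assumption: an unset variable falls back to "unknown".
func labels(getenv func(string) string) map[string]string {
	get := func(name string) string {
		if v := getenv(name); v != "" {
			return v
		}
		return "unknown"
	}
	return map[string]string{
		"service_name":  get("K_SERVICE"),
		"revision_name": get("K_REVISION"),
	}
}

func main() {
	// On Cloud Run, K_SERVICE and K_REVISION are set by the platform.
	fmt.Println(labels(os.Getenv))
}
```

Passing the lookup function as a parameter keeps the helper easy to exercise without touching the real process environment.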
Thread Safety ¶
The workqueue implementation is safe for concurrent use. Multiple goroutines can queue, enumerate, and process tasks simultaneously. The lease-based ownership model ensures that only one worker processes a given task at a time.
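The single-owner guarantee can be illustrated with an in-memory analogy: several goroutines race to claim the same key and exactly one succeeds. This is a sketch only. The claims type and tryClaim function are hypothetical, and a mutex-guarded map stands in for whatever mechanism the package uses against GCS.

```go
package main

import (
	"fmt"
	"sync"
)

// claims is an in-memory stand-in for the set of in-progress tasks.
type claims struct {
	mu    sync.Mutex
	owner map[string]int
}

// tryClaim records worker as the owner of key unless it is already owned.
func (c *claims) tryClaim(key string, worker int) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, taken := c.owner[key]; taken {
		return false
	}
	c.owner[key] = worker
	return true
}

// race starts n workers that all try to claim the same key and returns
// how many of them succeeded.
func race(n int, key string) int {
	c := &claims{owner: make(map[string]int)}
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			if c.tryClaim(key, worker) {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}(w)
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println(race(8, "task-123")) // prints 1
}
```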
Example (GetTaskState) ¶
Example_getTaskState demonstrates retrieving the state of a specific task.
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()
	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)
	// Get the state of a specific task
	state, err := wq.Get(ctx, "task-123")
	if err != nil {
		log.Printf("Task not found: %v", err)
		return
	}
	fmt.Printf("Task %s: status=%v, attempts=%d, priority=%d\n",
		state.Key, state.Status, state.Attempts, state.Priority)
}
Example (HandleOrphanedTasks) ¶
Example_handleOrphanedTasks demonstrates detecting and handling orphaned tasks.
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()
	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)
	// Enumerate tasks
	inProgress, _, _, err := wq.Enumerate(ctx)
	if err != nil {
		log.Print(err)
		return
	}
	// Check for orphaned tasks (tasks with expired leases)
	for _, key := range inProgress {
		if key.IsOrphaned() {
			fmt.Printf("Found orphaned task: %s\n", key.Name())
			// Requeue the orphaned task
			if err := key.Requeue(ctx); err != nil {
				log.Printf("Failed to requeue orphaned task: %v", err)
			}
		}
	}
}
Example (ProcessTasks) ¶
Example_processTasks demonstrates the typical workflow for processing tasks from the workqueue.
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()
	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)
	// Enumerate tasks
	inProgress, queued, deadLettered, err := wq.Enumerate(ctx)
	if err != nil {
		log.Print(err)
		return
	}
	fmt.Printf("In-progress: %d, Queued: %d, Dead-lettered: %d\n",
		len(inProgress), len(queued), len(deadLettered))
	// Process queued tasks
	for _, key := range queued {
		// Start processing the task
		owned, err := key.Start(ctx)
		if err != nil {
			log.Printf("Failed to start task %s: %v", key.Name(), err)
			continue
		}
		// Process the task using the owned context
		// The context is cancelled if the lease is lost
		select {
		case <-owned.Context().Done():
			log.Printf("Lost ownership of task %s", key.Name())
			continue
		default:
			// Do work here...
		}
		// Mark the task as complete
		if err := owned.Complete(ctx); err != nil {
			log.Printf("Failed to complete task %s: %v", key.Name(), err)
		}
	}
}
Example (RequeueTask) ¶
Example_requeueTask demonstrates how to requeue a task for later processing.
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()
	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	// Create a workqueue
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)
	// Enumerate and get a queued task
	_, queued, _, err := wq.Enumerate(ctx)
	if err != nil {
		log.Print(err)
		return
	}
	if len(queued) == 0 {
		fmt.Println("No tasks to process")
		return
	}
	// Start the task
	owned, err := queued[0].Start(ctx)
	if err != nil {
		log.Print(err)
		return
	}
	// Suppose processing failed here: requeue the task so it is retried later
	if err := owned.Requeue(ctx); err != nil {
		log.Printf("Failed to requeue: %v", err)
		return // do not report success if the requeue itself failed
	}
	fmt.Println("Task requeued for retry")
}
Variables ¶
var RefreshInterval = 5 * time.Minute
RefreshInterval is the period on which we refresh the lease of owned objects. It is surfaced as a global so that it can be mutated by tests and exposed as a flag by binaries wrapping this library. However, binary authors should take care to pass consistent values to the key ingress and dispatchers, or they may see unexpected behavior. TODO(mattmoor): What's the right balance here?
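A periodic lease refresh of this kind is commonly written as a ticker loop that cancels a derived context when a refresh fails. The sketch below shows that general pattern under stated assumptions: keepLease and the renew callback are hypothetical names, and this is not the package's actual implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// keepLease calls renew every interval until the returned context is done.
// If renew fails, the returned context is cancelled so the worker can stop.
func keepLease(ctx context.Context, interval time.Duration, renew func() error) context.Context {
	owned, cancel := context.WithCancel(ctx)
	go func() {
		defer cancel()
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-owned.Done():
				return
			case <-t.C:
				if err := renew(); err != nil {
					return // lease lost: the deferred cancel fires
				}
			}
		}
	}()
	return owned
}

// renewalsBeforeLoss simulates a lease that is lost on the failAt-th renewal
// and returns how many times renew was called before the context was cancelled.
func renewalsBeforeLoss(failAt int) int {
	calls := 0
	renew := func() error {
		calls++
		if calls >= failAt {
			return errors.New("lease lost")
		}
		return nil
	}
	owned := keepLease(context.Background(), time.Millisecond, renew)
	<-owned.Done() // blocks until the renewal loop gives up
	return calls
}

func main() {
	fmt.Println(renewalsBeforeLoss(3)) // prints 3
}
```

A worker would watch the returned context's Done channel in the same way the ProcessTasks example watches owned.Context().Done().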
var TrackWorkAttemptMinThreshold = 20
TrackWorkAttemptMinThreshold is the minimum number of attempts a task must reach before its work attempts are tracked. This keeps the cardinality of the metric low.
Functions ¶
func NewWorkQueue ¶
func NewWorkQueue(client ClientInterface, limit int) workqueue.Interface
NewWorkQueue creates a new GCS-backed workqueue.
Example ¶
ExampleNewWorkQueue demonstrates creating a new GCS-backed workqueue.
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue"
	"github.com/chainguard-dev/terraform-infra-common/pkg/workqueue/gcs"
)

func main() {
	ctx := context.Background()
	// Create a GCS client
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	// Create a workqueue with a limit of 100 items
	wq := gcs.NewWorkQueue(client.Bucket("my-workqueue-bucket"), 100)
	// Queue a task with priority
	if err := wq.Queue(ctx, "task-123", workqueue.Options{
		Priority: 10,
	}); err != nil {
		log.Print(err)
		return
	}
	fmt.Println("Task queued successfully")
}
Types ¶
type ClientInterface ¶
type ClientInterface interface {
	Object(name string) *storage.ObjectHandle
	Objects(ctx context.Context, q *storage.Query) *storage.ObjectIterator
}
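ClientInterface is an interface that abstracts the GCS client. *storage.BucketHandle provides both methods, which is why the examples pass client.Bucket(...) to NewWorkQueue.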