taskqueue

package
v0.0.0-...-e9fa201 (latest published version)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package taskqueue provides a durable task queue for background processing.

Supported backends:
- Database (Vitess/MySQL) — default; uses existing infrastructure
- In-memory — for testing only

Community use cases:
- Object/version cleanup
- Lifecycle transitions

Enterprise use cases (requires license):
- Cross-region replication (CRR)
- Audit log shipping
- Webhook delivery

Note: GC RefCount decrements are now handled by the centralized chunk_registry in the metadata database, eliminating the need for distributed task queues.

Index

Constants

View Source
const (
	DefaultPollInterval      = time.Second
	DefaultConcurrency       = 5
	DefaultVisibilityTimeout = 5 * time.Minute
	DefaultMaxRetries        = 3
)

Default configuration values

View Source
// Lifecycle action constants. These are the values carried in
// LifecyclePayload.Action to tell the lifecycle handler what to perform.
const (
	// LifecycleActionDelete deletes the object.
	LifecycleActionDelete = "delete"
	// LifecycleActionDeleteVersion deletes a specific object version.
	LifecycleActionDeleteVersion = "delete_version"
	// LifecycleActionTransition moves the object to another storage class.
	LifecycleActionTransition = "transition"
	// LifecycleActionAbortMPU aborts an incomplete multipart upload.
	LifecycleActionAbortMPU = "abort_mpu"
	// LifecycleActionPromote is the intelligent-tiering auto-promotion
	// performed when an archived object is accessed.
	LifecycleActionPromote = "promote"
)

Lifecycle action constants

View Source
// Restore tier constants. Values match the AWS S3 restore retrieval tiers.
const (
	// RestoreTierExpedited completes in roughly 1-5 minutes (Glacier only).
	RestoreTierExpedited = "Expedited"
	// RestoreTierStandard completes in roughly 3-5 hours for Glacier and
	// about 12 hours for Deep Archive.
	RestoreTierStandard = "Standard"
	// RestoreTierBulk completes in roughly 5-12 hours for Glacier and
	// about 48 hours for Deep Archive.
	RestoreTierBulk = "Bulk"
)

Restore tier constants (match AWS retrieval tiers)

Variables

View Source
// Prometheus collectors exported by the taskqueue package under the
// zapfs_taskqueue namespace/subsystem.
// NOTE(review): these are built with the New* constructors, so they are not
// auto-registered; registration presumably happens elsewhere — confirm.
var (
	// TasksProcessedTotal tracks total tasks processed by type and status
	TasksProcessedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "tasks_processed_total",
		Help:      "Total number of tasks processed",
	}, []string{"type", "status"}) // status: "completed", "failed", "no_handler"

	// TaskProcessingDuration tracks task processing time by type
	TaskProcessingDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "task_processing_duration_seconds",
		Help:      "Time spent processing tasks",
		// Buckets span 1ms to 60s to cover both quick metadata tasks and
		// long-running archive/restore work.
		Buckets:   []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60},
	}, []string{"type"})

	// TasksEnqueuedTotal tracks total tasks enqueued by type
	TasksEnqueuedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "tasks_enqueued_total",
		Help:      "Total number of tasks enqueued",
	}, []string{"type"})

	// TaskRetries tracks task retry counts
	TaskRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "task_retries_total",
		Help:      "Total number of task retries",
	}, []string{"type"})

	// QueueDepth tracks current queue depth by status
	QueueDepth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "queue_depth",
		Help:      "Current number of tasks in queue by status",
	}, []string{"status"}) // status: "pending", "running", "failed"

	// WorkerActive tracks number of active workers
	WorkerActive = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "workers_active",
		Help:      "Number of active worker goroutines",
	})

	// DequeueErrors tracks dequeue operation errors
	DequeueErrors = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "dequeue_errors_total",
		Help:      "Total number of dequeue errors",
	})

	// DeadlockRetries tracks deadlock retry attempts
	DeadlockRetries = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "deadlock_retries_total",
		Help:      "Total number of deadlock retry attempts",
	})
)
View Source
var (
	ErrTaskNotFound       = errors.New("task not found")
	ErrQueueFull          = errors.New("task queue is full")
	ErrQueueClosed        = errors.New("task queue is closed")
	ErrInvalidPayload     = errors.New("invalid task payload")
	ErrMaxRetriesExceeded = errors.New("max retries exceeded")
)

Common errors

Functions

func MarshalPayload

func MarshalPayload(v any) (json.RawMessage, error)

MarshalPayload is a helper to marshal a payload struct to JSON.

func UnmarshalPayload

func UnmarshalPayload[T any](payload json.RawMessage) (T, error)

UnmarshalPayload is a helper to unmarshal a JSON payload.

Types

type DBQueue

// DBQueue is a database-backed implementation of Queue. It supports
// MySQL/Vitess and PostgreSQL/CockroachDB for durable, distributed task
// storage, and multiple concurrent workers via FOR UPDATE SKIP LOCKED.
type DBQueue struct {
	// contains filtered or unexported fields
}

DBQueue is a database-backed implementation of Queue. Supports MySQL/Vitess and PostgreSQL/CockroachDB for durable, distributed task storage. Supports multiple concurrent workers via FOR UPDATE SKIP LOCKED.

func NewDBQueue

func NewDBQueue(cfg DBQueueConfig) (*DBQueue, error)

NewDBQueue creates a new database-backed queue.

func (*DBQueue) Cancel

func (q *DBQueue) Cancel(ctx context.Context, taskID string) error

func (*DBQueue) Cleanup

func (q *DBQueue) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

func (*DBQueue) Close

func (q *DBQueue) Close() error

func (*DBQueue) Complete

func (q *DBQueue) Complete(ctx context.Context, taskID string) error

func (*DBQueue) Dequeue

func (q *DBQueue) Dequeue(ctx context.Context, workerID string, taskTypes ...TaskType) (*Task, error)

func (*DBQueue) Enqueue

func (q *DBQueue) Enqueue(ctx context.Context, task *Task) error

func (*DBQueue) Fail

func (q *DBQueue) Fail(ctx context.Context, taskID string, taskErr error) error

func (*DBQueue) Get

func (q *DBQueue) Get(ctx context.Context, taskID string) (*Task, error)

func (*DBQueue) Heartbeat

func (q *DBQueue) Heartbeat(ctx context.Context, taskID string, workerID string) error

Heartbeat extends the visibility timeout for a running task. Workers should call this periodically to prevent the task from being reclaimed. Uses deadlock retry to ensure heartbeat succeeds even under contention.

func (*DBQueue) List

func (q *DBQueue) List(ctx context.Context, filter TaskFilter) ([]*Task, error)

func (*DBQueue) ReclaimStale

func (q *DBQueue) ReclaimStale(ctx context.Context) (int, error)

ReclaimStale finds tasks that have been running too long without a heartbeat and marks them as pending for retry. Returns the number of tasks reclaimed.

func (*DBQueue) Stats

func (q *DBQueue) Stats(ctx context.Context) (*QueueStats, error)

func (*DBQueue) VisibilityTimeout

func (q *DBQueue) VisibilityTimeout() time.Duration

VisibilityTimeout returns the configured visibility timeout.

type DBQueueConfig

// DBQueueConfig configures the database queue created by NewDBQueue.
type DBQueueConfig struct {
	// DB is the open database handle the queue operates on (required).
	DB                *sql.DB
	Driver            Driver        // Database driver (mysql, postgres). Defaults to mysql.
	TableName         string        // Defaults to "tasks"
	VisibilityTimeout time.Duration // How long before a running task is considered abandoned (default: 5m)
}

DBQueueConfig configures the database queue.

type Driver

type Driver string

Driver identifies a database driver type for the task queue.

// Supported Driver values.
const (
	// DriverMySQL uses MySQL/MariaDB/Vitess with ? placeholders
	DriverMySQL Driver = "mysql"
	// DriverPostgres uses PostgreSQL/CockroachDB with $N placeholders
	DriverPostgres Driver = "postgres"
)

type EventPayload

// EventPayload is the payload for TaskTypeEvent tasks. It carries S3 event
// metadata for delivery to notification destinations.
type EventPayload struct {
	// EventName is the S3 event type (e.g., "s3:ObjectCreated:Put").
	EventName string `json:"event_name"`

	// Bucket is the bucket name where the event occurred.
	Bucket string `json:"bucket"`

	// Key is the object key (empty for bucket-level events).
	Key string `json:"key"`

	// Size is the object size in bytes (for object events).
	Size int64 `json:"size"`

	// ETag is the object's entity tag.
	ETag string `json:"etag"`

	// VersionID is the object version (if versioning enabled).
	VersionID string `json:"version_id,omitempty"`

	// OwnerID is the bucket/object owner's canonical user ID.
	OwnerID string `json:"owner_id"`

	// RequestID is the original S3 request ID.
	RequestID string `json:"request_id"`

	// SourceIP is the client IP that made the request.
	SourceIP string `json:"source_ip"`

	// Timestamp is the event time in Unix milliseconds.
	Timestamp int64 `json:"timestamp"`

	// Sequencer is used for ordering events on the same object.
	Sequencer string `json:"sequencer"`

	// UserAgent is the client's user agent string.
	UserAgent string `json:"user_agent,omitempty"`
}

EventPayload is the payload for TaskTypeEvent tasks. Contains S3 event metadata for delivery to notification destinations.

type FederationDiscoveryPayload

// FederationDiscoveryPayload is the payload for TaskTypeFederationDiscovery
// tasks. It carries the state needed to discover (and resume discovering)
// objects in an external S3 bucket.
type FederationDiscoveryPayload struct {
	// Local bucket name (federated bucket)
	Bucket string `json:"bucket"`

	// Pagination state for resuming discovery
	StartAfter string `json:"start_after,omitempty"` // Continue from this key
	Prefix     string `json:"prefix,omitempty"`      // Optional prefix filter

	// Batch configuration
	BatchSize int `json:"batch_size"` // Max objects per batch (default: 1000)

	// Versioning
	IncludeVersions bool `json:"include_versions"` // Discover all versions (for versioned buckets)

	// Tracking
	StartedAt int64 `json:"started_at"` // Unix timestamp (nanos) when discovery started
}

FederationDiscoveryPayload is the payload for TaskTypeFederationDiscovery tasks. Contains metadata for discovering objects in an external S3 bucket.

type FederationIngestPayload

// FederationIngestPayload is the payload for TaskTypeFederationIngest tasks.
// It carries the metadata needed to ingest one object from an external S3
// bucket into local storage.
type FederationIngestPayload struct {
	// Local bucket name (where the object will be stored)
	Bucket string `json:"bucket"`

	// Object key
	Key string `json:"key"`

	// Version ID from external S3 (empty for non-versioned)
	VersionID string `json:"version_id,omitempty"`

	// Object metadata from external S3
	Size         int64  `json:"size"`
	ETag         string `json:"etag"`
	ContentType  string `json:"content_type,omitempty"`
	StorageClass string `json:"storage_class,omitempty"`
	LastModified int64  `json:"last_modified"` // Unix timestamp (nanos)

	// Tracking
	DiscoveredAt int64 `json:"discovered_at"` // Unix timestamp (nanos) when object was discovered
}

FederationIngestPayload is the payload for TaskTypeFederationIngest tasks. Contains metadata for ingesting an object from external S3 to local storage.

type Handler

// Handler processes tasks of a specific type. Register handlers on a Worker
// via RegisterHandler; the worker routes each dequeued task to the handler
// whose Type matches the task's Type.
type Handler interface {
	// Type returns the task type this handler processes.
	Type() TaskType

	// Handle processes the task and returns an error if it failed.
	Handle(ctx context.Context, task *Task) error
}

Handler processes tasks of a specific type.

type LifecyclePayload

// LifecyclePayload is the payload for TaskTypeLifecycle tasks. It carries
// the object identity and the action to perform on it.
type LifecyclePayload struct {
	// Object identification
	Bucket    string `json:"bucket"`
	Key       string `json:"key"`
	VersionID string `json:"version_id,omitempty"`

	// Action to take: one of the LifecycleAction* constants
	// ("delete", "delete_version", "transition", "abort_mpu", "promote")
	Action       string `json:"action"`
	StorageClass string `json:"storage_class,omitempty"` // For transitions

	// Tracking
	RuleID      string `json:"rule_id"`
	EvaluatedAt int64  `json:"evaluated_at"` // Unix timestamp (nanos)

	// Verification (to ensure object hasn't changed since evaluation)
	ExpectedETag    string `json:"expected_etag,omitempty"`
	ExpectedModTime int64  `json:"expected_mod_time,omitempty"` // Unix timestamp (nanos)

	// For multipart uploads
	UploadID string `json:"upload_id,omitempty"`
}

LifecyclePayload is the payload for TaskTypeLifecycle tasks. Contains object metadata and action to perform.

type MemoryQueue

// MemoryQueue is an in-memory implementation of Queue for testing.
// NOT for production use - tasks are not persisted.
type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue is an in-memory implementation of Queue for testing. NOT for production use - tasks are not persisted.

func (*MemoryQueue) Cancel

func (q *MemoryQueue) Cancel(ctx context.Context, taskID string) error

func (*MemoryQueue) Cleanup

func (q *MemoryQueue) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

func (*MemoryQueue) Close

func (q *MemoryQueue) Close() error

func (*MemoryQueue) Complete

func (q *MemoryQueue) Complete(ctx context.Context, taskID string) error

func (*MemoryQueue) Dequeue

func (q *MemoryQueue) Dequeue(ctx context.Context, workerID string, taskTypes ...TaskType) (*Task, error)

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(ctx context.Context, task *Task) error

func (*MemoryQueue) Fail

func (q *MemoryQueue) Fail(ctx context.Context, taskID string, err error) error

func (*MemoryQueue) Get

func (q *MemoryQueue) Get(ctx context.Context, taskID string) (*Task, error)

func (*MemoryQueue) Heartbeat

func (q *MemoryQueue) Heartbeat(ctx context.Context, taskID string, workerID string) error

func (*MemoryQueue) List

func (q *MemoryQueue) List(ctx context.Context, filter TaskFilter) ([]*Task, error)

func (*MemoryQueue) Stats

func (q *MemoryQueue) Stats(ctx context.Context) (*QueueStats, error)

type Queue

// Queue defines the interface for task queue operations. DBQueue and
// MemoryQueue are the provided implementations.
type Queue interface {
	// Enqueue adds a task to the queue.
	Enqueue(ctx context.Context, task *Task) error

	// Dequeue retrieves the next available task for processing.
	// Returns nil if no tasks are available.
	// Uses FOR UPDATE SKIP LOCKED for concurrent worker support.
	Dequeue(ctx context.Context, workerID string, taskTypes ...TaskType) (*Task, error)

	// Complete marks a task as successfully completed.
	Complete(ctx context.Context, taskID string) error

	// Fail marks a task as failed with an error message.
	// If retries remain, the task will be requeued.
	Fail(ctx context.Context, taskID string, err error) error

	// Cancel marks a task as cancelled.
	Cancel(ctx context.Context, taskID string) error

	// Heartbeat extends the visibility timeout for a running task.
	// Workers should call this periodically for long-running tasks.
	Heartbeat(ctx context.Context, taskID string, workerID string) error

	// Get retrieves a task by ID.
	Get(ctx context.Context, taskID string) (*Task, error)

	// List returns tasks matching the filter.
	List(ctx context.Context, filter TaskFilter) ([]*Task, error)

	// Stats returns queue statistics.
	Stats(ctx context.Context) (*QueueStats, error)

	// Cleanup removes old completed/failed tasks.
	Cleanup(ctx context.Context, olderThan time.Duration) (int, error)

	// Close shuts down the queue.
	Close() error
}

Queue defines the interface for task queue operations.

func NewMemoryQueue

func NewMemoryQueue() Queue

NewMemoryQueue creates a new in-memory queue.

type QueueStats

// QueueStats provides queue metrics: per-status counts, per-type counts,
// and basic performance figures.
type QueueStats struct {
	Pending    int64 `json:"pending"`
	Running    int64 `json:"running"`
	Completed  int64 `json:"completed"`
	Failed     int64 `json:"failed"`
	DeadLetter int64 `json:"dead_letter"`

	// By type
	ByType map[TaskType]int64 `json:"by_type"`

	// Performance
	// NOTE(review): the json tag says "_ms" but encoding/json marshals a
	// time.Duration as nanoseconds — confirm which unit consumers expect.
	AvgProcessingTime time.Duration `json:"avg_processing_time_ms"`
	OldestPending     *time.Time    `json:"oldest_pending,omitempty"`
}

QueueStats provides queue metrics.

type RestorePayload

// RestorePayload is the payload for TaskTypeRestore tasks. It carries the
// metadata needed to restore an archived object (GLACIER/DEEP_ARCHIVE).
type RestorePayload struct {
	// Object identification
	ObjectID  string `json:"object_id"`            // UUID of the object
	Bucket    string `json:"bucket"`               // Bucket name
	Key       string `json:"key"`                  // Object key
	VersionID string `json:"version_id,omitempty"` // Version ID if versioned

	// Restore configuration
	Days int    `json:"days"` // Number of days to keep restored copy
	Tier string `json:"tier"` // Retrieval tier: "Expedited", "Standard", "Bulk" (see RestoreTier* constants)

	// Source location
	StorageClass    string `json:"storage_class"`              // Current storage class (GLACIER, DEEP_ARCHIVE)
	TransitionedRef string `json:"transitioned_ref,omitempty"` // Reference to archived data
	// Tracking
	RequestedAt int64  `json:"requested_at"` // Unix nano when restore was initiated
	RequestID   string `json:"request_id"`   // Original S3 request ID
}

RestorePayload is the payload for TaskTypeRestore tasks. Contains metadata for restoring an archived object.

type Task

// Task represents a unit of work to be processed. Type selects the Handler,
// Payload carries type-specific JSON data (see the *Payload types), and the
// remaining fields track scheduling, retries, and ownership.
type Task struct {
	// Identification
	ID       string       `json:"id" db:"id"`
	Type     TaskType     `json:"type" db:"type"`
	Status   TaskStatus   `json:"status" db:"status"`
	Priority TaskPriority `json:"priority" db:"priority"`

	// Payload - JSON encoded task-specific data
	Payload json.RawMessage `json:"payload" db:"payload"`

	// Scheduling
	ScheduledAt time.Time  `json:"scheduled_at" db:"scheduled_at"`
	StartedAt   *time.Time `json:"started_at,omitempty" db:"started_at"`
	CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`

	// Retry handling
	Attempts   int       `json:"attempts" db:"attempts"`
	MaxRetries int       `json:"max_retries" db:"max_retries"`
	// NOTE(review): omitempty has no effect on a non-pointer time.Time, so
	// the zero time is still emitted — confirm whether *time.Time was intended.
	RetryAfter time.Time `json:"retry_after,omitempty" db:"retry_after"`

	// Error tracking
	LastError string `json:"last_error,omitempty" db:"last_error"`

	// Metadata
	CreatedAt time.Time `json:"created_at" db:"created_at"`
	UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
	Region    string    `json:"region,omitempty" db:"region"` // Source region
	WorkerID  string    `json:"worker_id,omitempty" db:"worker_id"`
}

Task represents a unit of work to be processed.

type TaskFilter

// TaskFilter selects tasks for Queue.List. Zero-value fields presumably
// mean "no filtering on this field" — verify against the implementations.
type TaskFilter struct {
	Type   TaskType   `json:"type,omitempty"`
	Status TaskStatus `json:"status,omitempty"`
	Region string     `json:"region,omitempty"`
	Limit  int        `json:"limit,omitempty"`
	Offset int        `json:"offset,omitempty"`
}

TaskFilter for querying tasks.

type TaskPriority

type TaskPriority int

TaskPriority allows urgent tasks to be processed first.

// Predefined priority levels, lowest to highest.
const (
	PriorityLow    TaskPriority = 0
	PriorityNormal TaskPriority = 5
	PriorityHigh   TaskPriority = 10
	PriorityUrgent TaskPriority = 20
)

type TaskStatus

type TaskStatus string

TaskStatus represents the current state of a task.

// Task lifecycle states.
const (
	StatusPending    TaskStatus = "pending"     // Waiting to be picked up
	StatusRunning    TaskStatus = "running"     // Currently being processed
	StatusCompleted  TaskStatus = "completed"   // Successfully finished
	StatusFailed     TaskStatus = "failed"      // Failed, may retry
	StatusDeadLetter TaskStatus = "dead_letter" // Failed permanently
	StatusCancelled  TaskStatus = "cancelled"   // Cancelled by user/system
)

type TaskType

type TaskType string

TaskType identifies the type of task for routing to handlers.

// Community task types.
const (
	TaskTypeCleanup   TaskType = "cleanup"   // Object/version cleanup
	TaskTypeLifecycle TaskType = "lifecycle" // Lifecycle transitions
	TaskTypeEvent     TaskType = "event"     // S3 event notification
	TaskTypeRestore   TaskType = "restore"   // Object restore from archive

	// Federation task types (S3 passthrough/migration)
	TaskTypeFederationIngest    TaskType = "federation_ingest"    // Ingest object from external S3 to local
	TaskTypeFederationDiscovery TaskType = "federation_discovery" // Discover objects in external S3 bucket
)

Community task types

type Worker

// Worker polls the queue and executes tasks by dispatching each dequeued
// task to the Handler registered for its type.
type Worker struct {
	// contains filtered or unexported fields
}

Worker polls the queue and executes tasks.

func NewWorker

func NewWorker(cfg WorkerConfig) *Worker

NewWorker creates a new task worker.

func (*Worker) HandlerTypes

func (w *Worker) HandlerTypes() []TaskType

HandlerTypes returns the task types this worker handles.

func (*Worker) Queue

func (w *Worker) Queue() Queue

Queue returns the underlying queue (for testing/metrics).

func (*Worker) RegisterHandler

func (w *Worker) RegisterHandler(h Handler)

RegisterHandler registers a handler for a task type.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context)

Start begins processing tasks.

func (*Worker) Stop

func (w *Worker) Stop()

Stop gracefully shuts down the worker.

type WorkerConfig

// WorkerConfig configures the task worker created by NewWorker.
type WorkerConfig struct {
	ID           string        // Worker identifier (presumably passed as workerID to Dequeue/Heartbeat — confirm)
	Queue        Queue         // Queue the worker polls
	PollInterval time.Duration // How often to poll (presumably DefaultPollInterval when zero — confirm)
	Concurrency  int           // Concurrent task executions (presumably DefaultConcurrency when zero — confirm)
}

WorkerConfig configures the task worker.

Directories

Path Synopsis
handlers — Package handlers provides task handlers for the task queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL