Documentation
¶
Overview ¶
Package taskqueue provides a durable task queue for background processing.
Supported backends:
  - Database (Vitess/MySQL) - default, uses existing infrastructure
  - In-memory - for testing only
Community use cases:
  - Object/version cleanup
  - Lifecycle transitions
Enterprise use cases (requires license):
  - Cross-region replication (CRR)
  - Audit log shipping
  - Webhook delivery
Note: GC RefCount decrements are now handled by the centralized chunk_registry in the metadata database, eliminating the need for distributed task queues.
Index ¶
- Constants
- Variables
- func MarshalPayload(v any) (json.RawMessage, error)
- func UnmarshalPayload[T any](payload json.RawMessage) (T, error)
- type DBQueue
- func (q *DBQueue) Cancel(ctx context.Context, taskID string) error
- func (q *DBQueue) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (q *DBQueue) Close() error
- func (q *DBQueue) Complete(ctx context.Context, taskID string) error
- func (q *DBQueue) Dequeue(ctx context.Context, workerID string, taskTypes ...TaskType) (*Task, error)
- func (q *DBQueue) Enqueue(ctx context.Context, task *Task) error
- func (q *DBQueue) Fail(ctx context.Context, taskID string, taskErr error) error
- func (q *DBQueue) Get(ctx context.Context, taskID string) (*Task, error)
- func (q *DBQueue) Heartbeat(ctx context.Context, taskID string, workerID string) error
- func (q *DBQueue) List(ctx context.Context, filter TaskFilter) ([]*Task, error)
- func (q *DBQueue) ReclaimStale(ctx context.Context) (int, error)
- func (q *DBQueue) Stats(ctx context.Context) (*QueueStats, error)
- func (q *DBQueue) VisibilityTimeout() time.Duration
- type DBQueueConfig
- type Driver
- type EventPayload
- type FederationDiscoveryPayload
- type FederationIngestPayload
- type Handler
- type LifecyclePayload
- type MemoryQueue
- func (q *MemoryQueue) Cancel(ctx context.Context, taskID string) error
- func (q *MemoryQueue) Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
- func (q *MemoryQueue) Close() error
- func (q *MemoryQueue) Complete(ctx context.Context, taskID string) error
- func (q *MemoryQueue) Dequeue(ctx context.Context, workerID string, taskTypes ...TaskType) (*Task, error)
- func (q *MemoryQueue) Enqueue(ctx context.Context, task *Task) error
- func (q *MemoryQueue) Fail(ctx context.Context, taskID string, err error) error
- func (q *MemoryQueue) Get(ctx context.Context, taskID string) (*Task, error)
- func (q *MemoryQueue) Heartbeat(ctx context.Context, taskID string, workerID string) error
- func (q *MemoryQueue) List(ctx context.Context, filter TaskFilter) ([]*Task, error)
- func (q *MemoryQueue) Stats(ctx context.Context) (*QueueStats, error)
- type Queue
- type QueueStats
- type RestorePayload
- type Task
- type TaskFilter
- type TaskPriority
- type TaskStatus
- type TaskType
- type Worker
- type WorkerConfig
Constants ¶
const (
	DefaultPollInterval      = time.Second
	DefaultConcurrency       = 5
	DefaultVisibilityTimeout = 5 * time.Minute
	DefaultMaxRetries        = 3
)
Default configuration values
const (
	LifecycleActionDelete        = "delete"
	LifecycleActionDeleteVersion = "delete_version"
	LifecycleActionTransition    = "transition"
	LifecycleActionAbortMPU      = "abort_mpu"
	LifecycleActionPromote       = "promote" // Intelligent tiering auto-promotion on access
)
Lifecycle action constants
const (
	RestoreTierExpedited = "Expedited" // 1-5 minutes (only Glacier)
	RestoreTierStandard  = "Standard"  // 3-5 hours (Glacier), 12 hours (Deep Archive)
	RestoreTierBulk      = "Bulk"      // 5-12 hours (Glacier), 48 hours (Deep Archive)
)
Restore tier constants (match AWS retrieval tiers)
Variables ¶
var (
	// TasksProcessedTotal tracks total tasks processed by type and status
	TasksProcessedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "tasks_processed_total",
		Help:      "Total number of tasks processed",
	}, []string{"type", "status"}) // status: "completed", "failed", "no_handler"

	// TaskProcessingDuration tracks task processing time by type
	TaskProcessingDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "task_processing_duration_seconds",
		Help:      "Time spent processing tasks",
		Buckets:   []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60},
	}, []string{"type"})

	// TasksEnqueuedTotal tracks total tasks enqueued by type
	TasksEnqueuedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "tasks_enqueued_total",
		Help:      "Total number of tasks enqueued",
	}, []string{"type"})

	// TaskRetries tracks task retry counts
	TaskRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "task_retries_total",
		Help:      "Total number of task retries",
	}, []string{"type"})

	// QueueDepth tracks current queue depth by status
	QueueDepth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "queue_depth",
		Help:      "Current number of tasks in queue by status",
	}, []string{"status"}) // status: "pending", "running", "failed"

	// WorkerActive tracks number of active workers
	WorkerActive = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "workers_active",
		Help:      "Number of active worker goroutines",
	})

	// DequeueErrors tracks dequeue operation errors
	DequeueErrors = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "dequeue_errors_total",
		Help:      "Total number of dequeue errors",
	})

	// DeadlockRetries tracks deadlock retry attempts
	DeadlockRetries = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "taskqueue",
		Name:      "deadlock_retries_total",
		Help:      "Total number of deadlock retry attempts",
	})
)
var (
	ErrTaskNotFound       = errors.New("task not found")
	ErrQueueFull          = errors.New("task queue is full")
	ErrQueueClosed        = errors.New("task queue is closed")
	ErrInvalidPayload     = errors.New("invalid task payload")
	ErrMaxRetriesExceeded = errors.New("max retries exceeded")
)
Common errors
Functions ¶
func MarshalPayload ¶
func MarshalPayload(v any) (json.RawMessage, error)
MarshalPayload is a helper to marshal a payload struct to JSON.
func UnmarshalPayload ¶
func UnmarshalPayload[T any](payload json.RawMessage) (T, error)
UnmarshalPayload is a helper to unmarshal a JSON payload.
Types ¶
type DBQueue ¶
type DBQueue struct {
// contains filtered or unexported fields
}
DBQueue is a database-backed implementation of Queue. Supports MySQL/Vitess and PostgreSQL/CockroachDB for durable, distributed task storage. Supports multiple concurrent workers via FOR UPDATE SKIP LOCKED.
func NewDBQueue ¶
func NewDBQueue(cfg DBQueueConfig) (*DBQueue, error)
NewDBQueue creates a new database-backed queue.
func (*DBQueue) Heartbeat ¶
Heartbeat extends the visibility timeout for a running task. Workers should call this periodically to prevent the task from being reclaimed. Uses deadlock retry to ensure heartbeat succeeds even under contention.
func (*DBQueue) ReclaimStale ¶
ReclaimStale finds tasks that have been running too long without a heartbeat and marks them as pending for retry. Returns the number of tasks reclaimed.
func (*DBQueue) VisibilityTimeout ¶
VisibilityTimeout returns the configured visibility timeout.
type DBQueueConfig ¶
type DBQueueConfig struct {
DB *sql.DB
Driver Driver // Database driver (mysql, postgres). Defaults to mysql.
TableName string // Defaults to "tasks"
VisibilityTimeout time.Duration // How long before a running task is considered abandoned (default: 5m)
}
DBQueueConfig configures the database queue.
type EventPayload ¶
type EventPayload struct {
// EventName is the S3 event type (e.g., "s3:ObjectCreated:Put").
EventName string `json:"event_name"`
// Bucket is the bucket name where the event occurred.
Bucket string `json:"bucket"`
// Key is the object key (empty for bucket-level events).
Key string `json:"key"`
// Size is the object size in bytes (for object events).
Size int64 `json:"size"`
// ETag is the object's entity tag.
ETag string `json:"etag"`
// VersionID is the object version (if versioning enabled).
VersionID string `json:"version_id,omitempty"`
// OwnerID is the bucket/object owner's canonical user ID.
OwnerID string `json:"owner_id"`
// RequestID is the original S3 request ID.
RequestID string `json:"request_id"`
// SourceIP is the client IP that made the request.
SourceIP string `json:"source_ip"`
// Timestamp is the event time in Unix milliseconds.
Timestamp int64 `json:"timestamp"`
// Sequencer is used for ordering events on the same object.
Sequencer string `json:"sequencer"`
// UserAgent is the client's user agent string.
UserAgent string `json:"user_agent,omitempty"`
}
EventPayload is the payload for TaskTypeEvent tasks. Contains S3 event metadata for delivery to notification destinations.
type FederationDiscoveryPayload ¶
type FederationDiscoveryPayload struct {
// Local bucket name (federated bucket)
Bucket string `json:"bucket"`
// Pagination state for resuming discovery
StartAfter string `json:"start_after,omitempty"` // Continue from this key
Prefix string `json:"prefix,omitempty"` // Optional prefix filter
// Batch configuration
BatchSize int `json:"batch_size"` // Max objects per batch (default: 1000)
// Versioning
IncludeVersions bool `json:"include_versions"` // Discover all versions (for versioned buckets)
// Tracking
StartedAt int64 `json:"started_at"` // Unix timestamp (nanos) when discovery started
}
FederationDiscoveryPayload is the payload for TaskTypeFederationDiscovery tasks. Contains metadata for discovering objects in an external S3 bucket.
type FederationIngestPayload ¶
type FederationIngestPayload struct {
// Local bucket name (where the object will be stored)
Bucket string `json:"bucket"`
// Object key
Key string `json:"key"`
// Version ID from external S3 (empty for non-versioned)
VersionID string `json:"version_id,omitempty"`
// Object metadata from external S3
Size int64 `json:"size"`
ETag string `json:"etag"`
ContentType string `json:"content_type,omitempty"`
StorageClass string `json:"storage_class,omitempty"`
LastModified int64 `json:"last_modified"` // Unix timestamp (nanos)
// Tracking
DiscoveredAt int64 `json:"discovered_at"` // Unix timestamp (nanos) when object was discovered
}
FederationIngestPayload is the payload for TaskTypeFederationIngest tasks. Contains metadata for ingesting an object from external S3 to local storage.
type Handler ¶
type Handler interface {
// Type returns the task type this handler processes.
Type() TaskType
// Handle processes the task and returns an error if it failed.
Handle(ctx context.Context, task *Task) error
}
Handler processes tasks of a specific type.
type LifecyclePayload ¶
type LifecyclePayload struct {
// Object identification
Bucket string `json:"bucket"`
Key string `json:"key"`
VersionID string `json:"version_id,omitempty"`
// Action to take: "delete", "delete_version", "transition", "abort_mpu", "promote"
Action string `json:"action"`
StorageClass string `json:"storage_class,omitempty"` // For transitions
// Tracking
RuleID string `json:"rule_id"`
EvaluatedAt int64 `json:"evaluated_at"` // Unix timestamp (nanos)
// Verification (to ensure object hasn't changed since evaluation)
ExpectedETag string `json:"expected_etag,omitempty"`
ExpectedModTime int64 `json:"expected_mod_time,omitempty"` // Unix timestamp (nanos)
// For multipart uploads
UploadID string `json:"upload_id,omitempty"`
}
LifecyclePayload is the payload for TaskTypeLifecycle tasks. Contains object metadata and action to perform.
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
MemoryQueue is an in-memory implementation of Queue for testing. NOT for production use - tasks are not persisted.
func (*MemoryQueue) Close ¶
func (q *MemoryQueue) Close() error
func (*MemoryQueue) Complete ¶
func (q *MemoryQueue) Complete(ctx context.Context, taskID string) error
func (*MemoryQueue) List ¶
func (q *MemoryQueue) List(ctx context.Context, filter TaskFilter) ([]*Task, error)
func (*MemoryQueue) Stats ¶
func (q *MemoryQueue) Stats(ctx context.Context) (*QueueStats, error)
type Queue ¶
type Queue interface {
// Enqueue adds a task to the queue.
Enqueue(ctx context.Context, task *Task) error
// Dequeue retrieves the next available task for processing.
// Returns nil if no tasks are available.
// Uses FOR UPDATE SKIP LOCKED for concurrent worker support.
Dequeue(ctx context.Context, workerID string, taskTypes ...TaskType) (*Task, error)
// Complete marks a task as successfully completed.
Complete(ctx context.Context, taskID string) error
// Fail marks a task as failed with an error message.
// If retries remain, the task will be requeued.
Fail(ctx context.Context, taskID string, err error) error
// Cancel marks a task as cancelled.
Cancel(ctx context.Context, taskID string) error
// Heartbeat extends the visibility timeout for a running task.
// Workers should call this periodically for long-running tasks.
Heartbeat(ctx context.Context, taskID string, workerID string) error
// Get retrieves a task by ID.
Get(ctx context.Context, taskID string) (*Task, error)
// List returns tasks matching the filter.
List(ctx context.Context, filter TaskFilter) ([]*Task, error)
// Stats returns queue statistics.
Stats(ctx context.Context) (*QueueStats, error)
// Cleanup removes old completed/failed tasks.
Cleanup(ctx context.Context, olderThan time.Duration) (int, error)
// Close shuts down the queue.
Close() error
}
Queue defines the interface for task queue operations.
type QueueStats ¶
type QueueStats struct {
Pending int64 `json:"pending"`
Running int64 `json:"running"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
DeadLetter int64 `json:"dead_letter"`
// By type
ByType map[TaskType]int64 `json:"by_type"`
// Performance
AvgProcessingTime time.Duration `json:"avg_processing_time_ms"`
OldestPending *time.Time `json:"oldest_pending,omitempty"`
}
QueueStats provides queue metrics.
type RestorePayload ¶
type RestorePayload struct {
// Object identification
ObjectID string `json:"object_id"` // UUID of the object
Bucket string `json:"bucket"` // Bucket name
Key string `json:"key"` // Object key
VersionID string `json:"version_id,omitempty"` // Version ID if versioned
// Restore configuration
Days int `json:"days"` // Number of days to keep restored copy
Tier string `json:"tier"` // Retrieval tier: "Expedited", "Standard", "Bulk"
// Source location
StorageClass string `json:"storage_class"` // Current storage class (GLACIER, DEEP_ARCHIVE)
TransitionedRef string `json:"transitioned_ref,omitempty"` // Reference to archived data
// Tracking
RequestedAt int64 `json:"requested_at"` // Unix nano when restore was initiated
RequestID string `json:"request_id"` // Original S3 request ID
}
RestorePayload is the payload for TaskTypeRestore tasks. Contains metadata for restoring an archived object.
type Task ¶
type Task struct {
// Identification
ID string `json:"id" db:"id"`
Type TaskType `json:"type" db:"type"`
Status TaskStatus `json:"status" db:"status"`
Priority TaskPriority `json:"priority" db:"priority"`
// Payload - JSON encoded task-specific data
Payload json.RawMessage `json:"payload" db:"payload"`
// Scheduling
ScheduledAt time.Time `json:"scheduled_at" db:"scheduled_at"`
StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`
// Retry handling
Attempts int `json:"attempts" db:"attempts"`
MaxRetries int `json:"max_retries" db:"max_retries"`
RetryAfter time.Time `json:"retry_after,omitempty" db:"retry_after"`
// Error tracking
LastError string `json:"last_error,omitempty" db:"last_error"`
// Metadata
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
Region string `json:"region,omitempty" db:"region"` // Source region
WorkerID string `json:"worker_id,omitempty" db:"worker_id"`
}
Task represents a unit of work to be processed.
type TaskFilter ¶
type TaskFilter struct {
Type TaskType `json:"type,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Region string `json:"region,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
TaskFilter for querying tasks.
type TaskPriority ¶
type TaskPriority int
TaskPriority allows urgent tasks to be processed first.
const (
	PriorityLow    TaskPriority = 0
	PriorityNormal TaskPriority = 5
	PriorityHigh   TaskPriority = 10
	PriorityUrgent TaskPriority = 20
)
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the current state of a task.
const (
	StatusPending    TaskStatus = "pending"     // Waiting to be picked up
	StatusRunning    TaskStatus = "running"     // Currently being processed
	StatusCompleted  TaskStatus = "completed"   // Successfully finished
	StatusFailed     TaskStatus = "failed"      // Failed, may retry
	StatusDeadLetter TaskStatus = "dead_letter" // Failed permanently
	StatusCancelled  TaskStatus = "cancelled"   // Cancelled by user/system
)
type TaskType ¶
type TaskType string
TaskType identifies the type of task for routing to handlers.
const (
	TaskTypeCleanup   TaskType = "cleanup"   // Object/version cleanup
	TaskTypeLifecycle TaskType = "lifecycle" // Lifecycle transitions
	TaskTypeEvent     TaskType = "event"     // S3 event notification
	TaskTypeRestore   TaskType = "restore"   // Object restore from archive

	// Federation task types (S3 passthrough/migration)
	TaskTypeFederationIngest    TaskType = "federation_ingest"    // Ingest object from external S3 to local
	TaskTypeFederationDiscovery TaskType = "federation_discovery" // Discover objects in external S3 bucket
)
Community task types
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker polls the queue and executes tasks.
func (*Worker) HandlerTypes ¶
HandlerTypes returns the task types this worker handles.
func (*Worker) RegisterHandler ¶
RegisterHandler registers a handler for a task type.