goque

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2025 License: MIT Imports: 19 Imported by: 0

README

Goque

pipeline Coverage

A robust, database-backed task queue system for Go with built-in worker pools, retry logic, and graceful shutdown support. Supports PostgreSQL, MySQL, and SQLite.

Features

Current Features
  • Multi-database support - Works with PostgreSQL, MySQL, and SQLite
  • Reliable persistence - Task storage with ACID guarantees
  • Worker pool management - Configurable concurrent task processing using goroutine pools
  • Automatic retry logic - Configurable retry attempts with custom backoff strategies
  • Task lifecycle management - Track task status through multiple states (new, processing, done, error, etc.)
  • Graceful shutdown - Clean worker shutdown with in-flight task handling
  • Task timeout handling - Per-task timeout configuration with context cancellation
  • Extensible hooks - Before/after processing hooks for custom logic (metrics, logging, tracing)
  • Type-safe queries - PostgreSQL/MySQL use go-jet for type-safe SQL query generation
  • External ID support - Associate tasks with external identifiers for idempotency
  • Built-in task healer - Automatically marks stuck tasks as errored for reprocessing
  • Multi-processor support - Manage multiple task types with a single queue manager
  • Structured logging - Built-in structured logging with log/slog
  • Production-ready example - Complete example service with web dashboard and API
  • Prometheus metrics - Built-in Prometheus metrics for monitoring task queue performance

Installation

go get github.com/ruko1202/goque

Quick Start

1. Prepare database

Goque supports three database backends: PostgreSQL, MySQL, and SQLite.

# Configure your database connection in .env.local
# For PostgreSQL:
echo 'DB_DRIVER postgres' > .env.local
echo 'DB_DSN postgres://postgres:postgres@localhost:5432/goque?sslmode=disable' >> .env.local

# For MySQL:
# echo 'DB_DRIVER mysql' > .env.local
# echo 'DB_DSN root:root@tcp(localhost:3306)/goque?parseTime=true&loc=UTC' >> .env.local

# For SQLite:
# echo 'DB_DRIVER sqlite3' > .env.local
# echo 'DB_DSN ./goque.db' >> .env.local

# Install database tools
make bin-deps-db

# Run migrations (works with any database)
make db-up

For detailed database setup instructions, see DATABASE.md.

2. Create a Task Processor

Implement the TaskProcessor interface to define how your tasks should be processed:

type EmailProcessor struct{}

func (p *EmailProcessor) ProcessTask(ctx context.Context, task *entity.Task) error {
    // Your task processing logic here
    return sendEmail(task.Payload)
}
package main

import (
    "context"
    "time"

	"github.com/ruko1202/xlog"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "github.com/ruko1202/goque"
    "github.com/ruko1202/goque/pkg/goquestorage"
)

func main() {
	ctx := context.Background()
    // Initialize database connection
    db, err := goquestorage.NewDBConn(ctx, &goquestorage.Config{
        DSN: "postgres://user:pass@localhost:5432/goque?sslmode=disable",
        Driver: goquestorage.PgDriver,
    })
    if err != nil {
        xlog.Panic(ctx, err.Error())
    }

    // Create task storage (works with any supported database)
    taskStorage, err := goque.NewStorage(db)
    if err != nil {
        panic(err)
    }

    // Optional: Configure metrics with service name
    goque.SetMetricsServiceName("my-service")

    // Create Goque manager (includes built-in healer processor)
    goq := goque.NewGoque(taskStorage)

    // Register your task processors
    goq.RegisterProcessor(
        "send_email",
        &EmailProcessor{},
        goque.WithWorkersCount(10),
        goque.WithTaskProcessingMaxAttempts(3),
        goque.WithTaskProcessingTimeout(30 * time.Second),
    )

    // Run all processors
    goq.Run(ctx)

    // Graceful shutdown
    defer goq.Stop()
}
4. Adding Tasks to the Queue
payload := `{"to": "user@example.com", "subject": "Hello"}`
// Create a new task
task := goque.NewTask("send_email", payload)

// Or with external ID for idempotency
task := goque.NewTaskWithExternalID("send_email", payload, "external-order-123")

// Add to queue using TaskQueueManager (recommended - includes metrics)
taskQueueManager := goque.NewTaskQueueManager(taskStorage)
err := taskQueueManager.AddTaskToQueue(ctx, task)

Example Application

A complete, production-ready example service demonstrating real-world Goque usage is available in the examples/service directory.

For detailed instructions and API documentation, see examples/service/README.md.

Configuration Options

The GoqueProcessor supports various configuration options:

  • WithWorkers(n int) - Set the number of concurrent workers (default: 1)
  • WithMaxAttempts(n int32) - Set maximum retry attempts (default: 3)
  • WithTaskTimeout(d time.Duration) - Set per-task timeout (default: 30s)
  • WithFetchMaxTasks(n int64) - Set maximum tasks to fetch per cycle (default: 10)
  • WithFetchTick(d time.Duration) - Set fetch interval (default: 1s)
  • WithNextAttemptAtFunc(f NextAttemptAtFunc) - Custom retry backoff strategy
  • WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) - Add pre-processing hooks
  • WithHooksAfterProcessing(hooks ...HookAfterProcessing) - Add post-processing hooks

Task Status Lifecycle

Tasks flow through the following states:

new → pending → processing → done
        ↑ ↓         ↓    ↓
        │ └──error──┘  canceled
        │     ↓
        │  attempts_left
        │
        └──(healer fixes stuck pending tasks)
Status Descriptions
  • new - Task created and ready to be picked up
  • pending - Task scheduled for future processing (via NextAttemptAt)
  • processing - Task currently being processed by a worker
  • done - Task completed successfully ✓ (terminal)
  • error - Task failed but has retry attempts remaining
  • attempts_left - Task failed and exhausted all retry attempts ✗ (terminal)
  • canceled - Task was manually canceled ✗ (terminal)
Valid State Transitions
Current Status Next Status Trigger
new pending Task scheduled for processing
pending processing Worker picks up task
pending error Healer marks stuck task (cure operation)
processing done Successful processing
processing error Failed processing with retries left
processing canceled Manual cancellation
error pending Retry logic schedules next attempt
error attempts_left No more retry attempts available
Terminal States

Tasks in these states will not be processed again:

  • done - Successfully completed
  • canceled - Manually canceled by user
  • attempts_left - Failed with no remaining retry attempts

Built-in Features

Task Healer

Goque includes a built-in healer processor that automatically monitors and fixes stuck tasks. Tasks that remain in the "pending" status for too long are automatically marked as errored, allowing them to be retried. The healer is automatically registered when you use internal.NewGoque().

You can configure the healer behavior:

goq.RegisterProcessor(
    "send_email",
    &EmailProcessor{},
    goque.WithWorkersCount(10),
    goque.WithTaskProcessingMaxAttempts(3),
    goque.WithTaskProcessingTimeout(30 * time.Second),

    goque.WithHealerPeriod(10*time.Minute),
    goque.WithHealerUpdatedAtTimeAgo(time.Hour),
    goque.WithHealerTimeout(30*time.Second),
)
Prometheus Metrics

Goque includes built-in Prometheus metrics for comprehensive monitoring of your task queue. Metrics are automatically collected during task processing operations.

Available Metrics
Metric Name Type Labels Description
goque_processed_tasks_total Counter task_type, status Total number of processed tasks by type and final status
goque_processed_tasks_with_error_total Counter task_type, task_processing_operations, task_error_type Tasks processed with errors, including error type details
goque_task_processing_duration_seconds Histogram task_type Task processing duration distribution in seconds
goque_task_payload_size_bytes Histogram task_type Task payload size distribution in bytes
Configuration
import (
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/ruko1202/goque"
    "net/http"
)

// Optional: Set service name for metrics labels
goque.SetMetricsServiceName("my-service")

// Expose metrics endpoint
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":9090", nil)
Task Processing Operations

Metrics track errors across different operations:

  • add_to_queue - Errors during task creation and queue insertion
  • processing - Errors during task execution
  • cleanup - Errors during task cleanup operations
  • health - Errors during healer operations
Example Queries
# Task processing rate by type
rate(goque_processed_tasks_total[5m])

# Task error rate
rate(goque_processed_tasks_with_error_total[5m])

# Average processing duration
rate(goque_task_processing_duration_seconds_sum[5m])
  / rate(goque_task_processing_duration_seconds_count[5m])

# 95th percentile processing time
histogram_quantile(0.95, goque_task_processing_duration_seconds_bucket)

# Tasks by status
sum by (status) (goque_processed_tasks_total)

For a complete example with metrics integration, see examples/service/.

Development

Prerequisites
  • Go 1.23 or higher
  • One of the supported databases:
    • PostgreSQL 12+
    • MySQL 8+
    • SQLite 3+
  • Make
Setup
# Install all dependencies and tools
make all

# Install only binary dependencies
make bin-deps

# Download Go modules
make deps
Running Tests
# Run all tests
make tloc

# Run tests with coverage
make test-cov
Code Quality
# Run linter
make lint

# Format code
make fmt
Database Operations

All database commands work with any supported database (PostgreSQL, MySQL, SQLite). Simply configure DB_DRIVER in .env.local and use universal commands:

# Show current database configuration
make db-info

# Create a new migration for current DB_DRIVER
make db-migrate-create name="your_migration_name"

# Check migration status
make db-status

# Apply migrations
make db-up

# Rollback last migration
make db-down

# Regenerate database models (PostgreSQL only)
make db-models

# Docker Compose commands for local development
make docker-up           # Start PostgreSQL and MySQL with Docker Compose
make docker-pg-up        # Start only PostgreSQL
make docker-mysql-up     # Start only MySQL
make docker-down         # Stop and remove all containers
make docker-down-volumes # Stop and remove containers with volumes
make docker-logs         # Show logs from all containers
make docker-ps           # Show status of containers

For complete database setup guide, migrations structure, and Docker Compose configuration, see DATABASE.md.

Generate Mocks
make mocks

Project Structure

.
├── goque_manager.go            # Public API - TaskQueueManager interface
├── internal/
│   ├── goque.go                # Main Goque implementation
│   ├── entity/                 # Domain entities and errors
│   │   ├── task.go             # Task entity
│   │   ├── errors.go           # Domain errors
│   │   └── operations.go       # Task operation constants
│   ├── queue_manager/          # Task queue manager implementation
│   ├── metrics/                # Prometheus metrics
│   │   ├── metrics.go          # Metrics collectors and functions
│   │   └── vars.go             # Metrics configuration
│   ├── processors/             # Task processing components
│   │   ├── queueprocessor/     # Main queue processor
│   │   └── internalprocessors/ # Built-in processors (healer, cleaner)
│   ├── storages/               # Data access layer (multi-database support)
│   │   ├── pg/task/            # PostgreSQL storage (go-jet)
│   │   ├── mysql/task/         # MySQL storage (raw SQL)
│   │   ├── sqlite/task/        # SQLite storage (raw SQL)
│   │   └── dbutils/            # Database utilities
│   └── pkg/
│       └── generated/          # Generated code (models, mocks)
├── migrations/                 # Database migrations
│   ├── pg/                     # PostgreSQL migrations
│   ├── mysql/                  # MySQL migrations
│   └── sqlite/                 # SQLite migrations
├── examples/service/           # Production-ready example service
├── DATABASE.md                 # Complete database setup guide
└── test/                       # Test utilities and fixtures

License

[Add your license here]

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Documentation

Overview

Package goque provides a distributed task queue manager with support for multiple processors and automatic task healing and cleaning.

Package goque provides a robust, SQL-backed task queue system for Go applications.

Index

Constants

View Source
const (
	TaskStatusNew          = entity.TaskStatusNew          // Task is ready to be picked up
	TaskStatusPending      = entity.TaskStatusPending      // Task is scheduled for future processing
	TaskStatusProcessing   = entity.TaskStatusProcessing   // Task is currently being processed
	TaskStatusDone         = entity.TaskStatusDone         // Task completed successfully
	TaskStatusCanceled     = entity.TaskStatusCanceled     // Task was manually canceled
	TaskStatusError        = entity.TaskStatusError        // Task failed but has retry attempts remaining
	TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries
)

Task status constants define the possible states a task can be in.

Variables

View Source
var (
	// WithValue adds a single key-value pair to the context for task metadata tracking.
	WithValue = goquectx.WithValue
	// WithValues adds multiple key-value pairs to the context for task metadata tracking.
	WithValues = goquectx.WithValues
	// ValueByKey retrieves stored value from the context by key.
	ValueByKey = goquectx.ValueByKey
	// Values retrieves all stored metadata values from the context.
	Values = goquectx.Values
)

Context value functions for storing and retrieving task metadata.

View Source
var (
	// NewTask creates a new task with the specified type and payload.
	NewTask = entity.NewTask
	// NewTaskWithExternalID creates a new task with an external identifier for idempotency.
	NewTaskWithExternalID = entity.NewTaskWithExternalID
)

Task creation functions for adding new tasks to the queue.

View Source
var (
	// WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch.
	WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks
	// WithTaskFetcherTick sets the interval between task fetch attempts.
	WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick
	// WithTaskFetcherTimeout sets the timeout for fetching tasks from storage.
	WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout
)

Task fetcher configuration options.

View Source
var (
	// WithWorkersCount sets the number of concurrent workers for processing tasks.
	WithWorkersCount = queueprocessor.WithWorkersCount
	// WithWorkersPanicHandler sets a custom panic handler for worker goroutines.
	WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler
	// WithTaskProcessingTimeout sets the timeout for processing a single task.
	WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout
	// WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
	WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts
	// WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
	WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc
)

Worker and task processing configuration options.

View Source
var (
	// WithHooksBeforeProcessing sets hooks to execute before processing each task.
	WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing
	// WithHooksAfterProcessing sets hooks to execute after processing each task.
	WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing
)

Hook configuration options for task processing.

View Source
var (
	// WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned.
	WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo
	// WithCleanerTimeout sets the timeout for the cleaner operation.
	WithCleanerTimeout = queueprocessor.WithCleanerTimeout
	// WithCleanerPeriod sets the interval between cleaner runs.
	WithCleanerPeriod = queueprocessor.WithCleanerPeriod
)

Cleaner configuration options for removing old tasks.

View Source
var (
	// WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed.
	WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo
	// WithHealerTimeout sets the timeout for the healer operation.
	WithHealerTimeout = queueprocessor.WithHealerTimeout
	// WithHealerPeriod sets the interval between healer runs.
	WithHealerPeriod = queueprocessor.WithHealerPeriod
)

Healer configuration options for fixing stuck tasks.

View Source
var NoopTaskProcessor = queueprocessor.NoopTaskProcessor

NoopTaskProcessor is a no-op task processor that does nothing and returns nil.

Functions

func SetMetricsServiceName added in v0.2.0

func SetMetricsServiceName(name string)

SetMetricsServiceName sets the service name label for Prometheus metrics. This should be called once during application initialization, before starting the queue manager.

Example:

goque.SetMetricsServiceName("my-service")

Types

type Goque

type Goque struct {
	// contains filtered or unexported fields
}

Goque is the main task queue manager that coordinates multiple task processors.

func NewGoque

func NewGoque(taskStorage TaskStorage) *Goque

NewGoque creates a new Goque instance with the specified task storage.

func (*Goque) RegisterProcessor

func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor queueprocessor.TaskProcessor,
	opts ...queueprocessor.GoqueProcessorOpts,
)

RegisterProcessor registers a new task processor for a specific task type. Should be call before Run.

func (*Goque) Run

func (g *Goque) Run(ctx context.Context) error

Run starts all registered processors in separate goroutines.

func (*Goque) Stop

func (g *Goque) Stop()

Stop gracefully shuts down all registered processors and waits for them to finish.

type Metadata added in v0.3.0

type Metadata = entity.Metadata

Metadata represents arbitrary key-value data associated with a task for tracking and context.

type Task added in v0.0.4

type Task = entity.Task

Task represents a unit of work to be processed by the queue system.

type TaskFilter added in v0.1.1

type TaskFilter = dbentity.GetTasksFilter

TaskFilter represents filtering criteria for querying tasks from the queue.

type TaskProcessorFunc added in v0.0.4

type TaskProcessorFunc = queueprocessor.TaskProcessorFunc

TaskProcessorFunc is a function type that implements the TaskProcessor interface.

type TaskQueueManager added in v0.1.1

type TaskQueueManager interface {
	AsyncAddTaskToQueue(ctx context.Context, task *Task)
	AddTaskToQueue(ctx context.Context, task *Task) error
	GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error)
	GetTasks(ctx context.Context, filter *TaskFilter, limit int64) ([]*Task, error)
	ResetAttempts(ctx context.Context, taskID uuid.UUID) error
}

TaskQueueManager provides operations for managing tasks in the queue. It offers both synchronous and asynchronous methods for adding tasks, as well as querying and managing existing tasks.

func NewTaskQueueManager added in v0.1.1

func NewTaskQueueManager(taskStorage TaskStorage) TaskQueueManager

NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.

type TaskStatus added in v0.0.4

type TaskStatus = entity.TaskStatus

TaskStatus represents the current status of a task in its lifecycle.

type TaskStorage

type TaskStorage = storages.Task

TaskStorage defines the interface for task persistence operations.

func NewStorage added in v0.0.4

func NewStorage(db *sqlx.DB) (TaskStorage, error)

NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL and MySQL databases.

type TaskType added in v0.0.4

type TaskType = entity.TaskType

TaskType represents the type identifier for tasks in the queue.

Directories

Path Synopsis
internal
entity
Package entity contains domain entities for the task queue system.
Package entity contains domain entities for the task queue system.
metrics
Package metrics provides Prometheus instrumentation for Goque task queue operations.
Package metrics provides Prometheus instrumentation for Goque task queue operations.
pkg/generated/mocks/mock_storages
Package mock_storages is a generated GoMock package.
Package mock_storages is a generated GoMock package.
processors/internalprocessors
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
processors/queueprocessor
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
queuemanager
Package queuemanager provides high-level task queue management operations.
Package queuemanager provides high-level task queue management operations.
storages
Package storages provides interfaces and implementations for task storage backends.
Package storages provides interfaces and implementations for task storage backends.
storages/dbentity
Package dbentity provides common database entities and filters for task storage implementations.
Package dbentity provides common database entities and filters for task storage implementations.
storages/dbutils
Package dbutils provides common database utilities for task storage implementations.
Package dbutils provides common database utilities for task storage implementations.
storages/mysql/task
Package mysqltask provides MySQL storage operations for task management in the queue system.
Package mysqltask provides MySQL storage operations for task management in the queue system.
storages/pg/task
Package task provides storage operations for task management in the queue system.
Package task provides storage operations for task management in the queue system.
storages/sqlite
Package sqlite provides SQLite storage operations for task management in the queue system.
Package sqlite provides SQLite storage operations for task management in the queue system.
utils/goquectx
Package goquectx provides utilities for managing task metadata within context values.
Package goquectx provides utilities for managing task metadata within context values.
utils/xtime
Package xtime provides time-related utility functions.
Package xtime provides time-related utility functions.
pkg
goquestorage
Package goquestorage provides task storage implementations for different database backends.
Package goquestorage provides task storage implementations for different database backends.
scripts
dbmodels command
test
testutils
Package testutils provides testing utilities for the goque project.
Package testutils provides testing utilities for the goque project.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL