goque

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: MIT Imports: 13 Imported by: 0

README

Goque

pipeline Coverage

A robust, database-backed task queue system for Go with built-in worker pools, retry logic, and graceful shutdown support. Supports PostgreSQL, MySQL, and SQLite.

Features

  • Multi-database support - Works with PostgreSQL, MySQL, and SQLite
  • Reliable persistence - Task storage with ACID guarantees
  • Worker pool management - Configurable concurrent task processing using goroutine pools
  • Automatic retry logic - Configurable retry attempts with custom backoff strategies
  • Task lifecycle management - Track task status through multiple states (new, processing, done, error, etc.)
  • Graceful shutdown - Clean worker shutdown with in-flight task handling
  • Task timeout handling - Per-task timeout configuration with context cancellation
  • Extensible hooks - Before/after processing hooks for custom logic
  • Type-safe queries - PostgreSQL uses go-jet for type-safe SQL query generation
  • External ID support - Associate tasks with external identifiers for idempotency
  • Built-in task healer - Automatically marks stuck tasks as errored for reprocessing
  • Multi-processor support - Manage multiple task types with a single queue manager

Installation

go get github.com/ruko1202/goque

Quick Start

1. Prepare database

Goque supports three database backends: PostgreSQL, MySQL, and SQLite.

# Configure your database connection in .env.local
# For PostgreSQL:
echo 'DB_DRIVER postgres' > .env.local
echo 'DB_DSN postgres://postgres:postgres@localhost:5432/goque?sslmode=disable' >> .env.local

# For MySQL:
# echo 'DB_DRIVER mysql' > .env.local
# echo 'DB_DSN root:root@tcp(localhost:3306)/goque?parseTime=true&loc=UTC' >> .env.local

# For SQLite:
# echo 'DB_DRIVER sqlite3' > .env.local
# echo 'DB_DSN ./goque.db' >> .env.local

# Install database tools
make bin-deps-db

# Run migrations (works with any database)
make db-up

For detailed database setup instructions, see DATABASE.md.

2. Create a Task Processor

Implement the TaskProcessor interface to define how your tasks should be processed:

type EmailProcessor struct{}

func (p *EmailProcessor) ProcessTask(ctx context.Context, task *entity.Task) error {
    // Your task processing logic here
    return sendEmail(task.Payload)
}
package main

import (
    "context"
    "database/sql"
    "time"

    "github.com/ruko1202/goque/internal"
    "github.com/ruko1202/goque/internal/processor"
    "github.com/ruko1202/goque/internal/storages/task"
)

func main() {
    // Initialize database connection
    // For PostgreSQL: sql.Open("postgres", dsn)
    // For MySQL: sql.Open("mysql", dsn)
    // For SQLite: sql.Open("sqlite3", dsn)
    db, err := sql.Open("postgres", "your-connection-string")
    if err != nil {
        panic(err)
    }

    // Create task storage (works with any supported database)
    taskStorage := task.NewStorage(db)

    // Create Goque manager (includes built-in healer processor)
    goque := internal.NewGoque(taskStorage)

    // Register your task processors
    goque.RegisterProcessor(
        "send_email",
        &EmailProcessor{},
        processor.WithWorkers(10),
        processor.WithMaxAttempts(3),
        processor.WithTaskTimeout(30 * time.Second),
    )

    // Run all processors
    ctx := context.Background()
    goque.Run(ctx)

    // Graceful shutdown
    defer goque.Stop()
}
4. Adding Tasks to the Queue
import "github.com/ruko1202/goque/internal/entity"

// Create a new task
task := entity.NewTask("send_email", `{"to": "user@example.com", "subject": "Hello"}`)

// Or with external ID for idempotency
task := entity.NewTaskWithExternalID("send_email", payload, "order-123")

// Add to storage
err := taskStorage.AddTask(ctx, task)

Configuration Options

The GoqueProcessor supports various configuration options:

  • WithWorkers(n int) - Set the number of concurrent workers (default: 1)
  • WithMaxAttempts(n int32) - Set maximum retry attempts (default: 3)
  • WithTaskTimeout(d time.Duration) - Set per-task timeout (default: 30s)
  • WithFetchMaxTasks(n int64) - Set maximum tasks to fetch per cycle (default: 10)
  • WithFetchTick(d time.Duration) - Set fetch interval (default: 1s)
  • WithNextAttemptAtFunc(f NextAttemptAtFunc) - Custom retry backoff strategy
  • WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) - Add pre-processing hooks
  • WithHooksAfterProcessing(hooks ...HookAfterProcessing) - Add post-processing hooks

Task Status Lifecycle

Tasks flow through the following states:

new → pending → processing → done
        ↑ ↓         ↓    ↓
        │ └──error──┘  canceled
        │     ↓
        │  attempts_left
        │
        └──(healer fixes stuck pending tasks)
Status Descriptions
  • new - Task created and ready to be picked up
  • pending - Task scheduled for future processing (via NextAttemptAt)
  • processing - Task currently being processed by a worker
  • done - Task completed successfully ✓ (terminal)
  • error - Task failed but has retry attempts remaining
  • attempts_left - Task failed and exhausted all retry attempts ✗ (terminal)
  • canceled - Task was manually canceled ✗ (terminal)
Valid State Transitions
Current Status Next Status Trigger
new pending Task scheduled for processing
pending processing Worker picks up task
pending error Healer marks stuck task (cure operation)
processing done Successful processing
processing error Failed processing with retries left
processing canceled Manual cancellation
error pending Retry logic schedules next attempt
error attempts_left No more retry attempts available
Terminal States

Tasks in these states will not be processed again:

  • done - Successfully completed
  • canceled - Manually canceled by user
  • attempts_left - Failed with no remaining retry attempts

Built-in Features

Task Healer

Goque includes a built-in healer processor that automatically monitors and fixes stuck tasks. Tasks that remain in the "pending" status for too long are automatically marked as errored, allowing them to be retried. The healer is automatically registered when you use internal.NewGoque().

You can configure the healer behavior:

import internalprocessors "github.com/ruko1202/goque/internal/internal_processors"

goque := internal.NewGoque(
    taskStorage,
    internalprocessors.WithHealerUpdatedAtTimeAgo(5 * time.Minute),
    internalprocessors.WithHealerMaxTasks(200),
)

Development

Prerequisites
  • Go 1.23 or higher
  • One of the supported databases:
    • PostgreSQL 12+
    • MySQL 8+
    • SQLite 3+
  • Make
Setup
# Install all dependencies and tools
make all

# Install only binary dependencies
make bin-deps

# Download Go modules
make deps
Running Tests
# Run all tests
make tloc

# Run tests with coverage
make test-cov
Code Quality
# Run linter
make lint

# Format code
make fmt
Database Operations

All database commands work with any supported database (PostgreSQL, MySQL, SQLite). Simply configure DB_DRIVER in .env.local and use universal commands:

# Show current database configuration
make db-info

# Create a new migration for current DB_DRIVER
make db-migrate-create name="your_migration_name"

# Check migration status
make db-status

# Apply migrations
make db-up

# Rollback last migration
make db-down

# Regenerate database models (PostgreSQL only)
make db-models

# Docker Compose commands for local development
make docker-up           # Start PostgreSQL and MySQL with Docker Compose
make docker-pg-up        # Start only PostgreSQL
make docker-mysql-up     # Start only MySQL
make docker-down         # Stop and remove all containers
make docker-down-volumes # Stop and remove containers with volumes
make docker-logs         # Show logs from all containers
make docker-ps           # Show status of containers

For complete database setup guide, migrations structure, and Docker Compose configuration, see DATABASE.md.

Generate Mocks
make mocks

Project Structure

.
├── internal/
│   ├── goque.go                # Main queue manager
│   ├── entity/                 # Domain entities (Task, etc.)
│   ├── processor/              # Queue processor and task processor interfaces
│   ├── internal_processors/    # Built-in processors (healer, etc.)
│   ├── storages/               # Data access layer (multi-database support)
│   │   ├── pg/task/            # PostgreSQL storage (go-jet)
│   │   ├── mysql/task/         # MySQL storage (raw SQL)
│   │   ├── sqlite/task/        # SQLite storage (raw SQL)
│   │   └── dbutils/            # Database utilities
│   └── pkg/
│       └── generated/          # Generated code (models, mocks)
├── migrations/                 # Database migrations
│   ├── pg/                     # PostgreSQL migrations
│   ├── mysql/                  # MySQL migrations
│   └── sqlite/                 # SQLite migrations
├── DATABASE.md                 # Complete database setup guide
└── test/                       # Test utilities and fixtures

License

[Add your license here]

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Documentation

Overview

Package goque provides a distributed task queue manager with support for multiple processors and automatic task healing and cleaning.

Package goque provides a robust, PostgreSQL-backed task queue system for Go applications.

Index

Constants

View Source
const (
	TaskStatusNew          = entity.TaskStatusNew          // Task is ready to be picked up
	TaskStatusPending      = entity.TaskStatusPending      // Task is scheduled for future processing
	TaskStatusProcessing   = entity.TaskStatusProcessing   // Task is currently being processed
	TaskStatusDone         = entity.TaskStatusDone         // Task completed successfully
	TaskStatusCanceled     = entity.TaskStatusCanceled     // Task was manually canceled
	TaskStatusError        = entity.TaskStatusError        // Task failed but has retry attempts remaining
	TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries
)

Task status constants define the possible states a task can be in.

Variables

View Source
var (
	// NewTask creates a new task with the specified type and payload.
	NewTask = entity.NewTask
	// NewTaskWithExternalID creates a new task with an external identifier for idempotency.
	NewTaskWithExternalID = entity.NewTaskWithExternalID
)

Task creation functions for adding new tasks to the queue.

View Source
var (
	// WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch.
	WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks
	// WithTaskFetcherTick sets the interval between task fetch attempts.
	WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick
	// WithTaskFetcherTimeout sets the timeout for fetching tasks from storage.
	WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout
)

Task fetcher configuration options.

View Source
var (
	// WithWorkersCount sets the number of concurrent workers for processing tasks.
	WithWorkersCount = queueprocessor.WithWorkersCount
	// WithWorkersPanicHandler sets a custom panic handler for worker goroutines.
	WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler
	// WithTaskProcessingTimeout sets the timeout for processing a single task.
	WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout
	// WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
	WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts
	// WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
	WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc
)

Worker and task processing configuration options.

View Source
var (
	// WithHooksBeforeProcessing sets hooks to execute before processing each task.
	WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing
	// WithHooksAfterProcessing sets hooks to execute after processing each task.
	WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing
)

Hook configuration options for task processing.

View Source
var (
	// WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned.
	WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo
	// WithCleanerTimeout sets the timeout for the cleaner operation.
	WithCleanerTimeout = queueprocessor.WithCleanerTimeout
	// WithCleanerPeriod sets the interval between cleaner runs.
	WithCleanerPeriod = queueprocessor.WithCleanerPeriod
)

Cleaner configuration options for removing old tasks.

View Source
var (
	// WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed.
	WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo
	// WithHealerTimeout sets the timeout for the healer operation.
	WithHealerTimeout = queueprocessor.WithHealerTimeout
	// WithHealerPeriod sets the interval between healer runs.
	WithHealerPeriod = queueprocessor.WithHealerPeriod
)

Healer configuration options for fixing stuck tasks.

View Source
var NoopTaskProcessor = queueprocessor.NoopTaskProcessor

NoopTaskProcessor is a no-op task processor that does nothing and returns nil.

Functions

This section is empty.

Types

type Goque

type Goque struct {
	// contains filtered or unexported fields
}

Goque is the main task queue manager that coordinates multiple task processors.

func NewGoque

func NewGoque(taskStorage TaskStorage) *Goque

NewGoque creates a new Goque instance with the specified task storage.

func (*Goque) RegisterProcessor

func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor queueprocessor.TaskProcessor,
	opts ...queueprocessor.GoqueProcessorOpts,
)

RegisterProcessor registers a new task processor for a specific task type. Should be call before Run.

func (*Goque) Run

func (g *Goque) Run(ctx context.Context) error

Run starts all registered processors in separate goroutines.

func (*Goque) Stop

func (g *Goque) Stop()

Stop gracefully shuts down all registered processors and waits for them to finish.

type Task added in v0.0.4

type Task = entity.Task

Task represents a unit of work to be processed by the queue system.

type TaskProcessorFunc added in v0.0.4

type TaskProcessorFunc = queueprocessor.TaskProcessorFunc

TaskProcessorFunc is a function type that implements the TaskProcessor interface.

type TaskPusher added in v0.0.4

type TaskPusher struct {
	// contains filtered or unexported fields
}

TaskPusher provides functionality for adding tasks to the queue storage.

func NewTaskPusher added in v0.0.4

func NewTaskPusher(taskStorage TaskStorage) *TaskPusher

NewTaskPusher creates a new TaskPusher instance with the specified task storage.

func (*TaskPusher) AddTaskToQueue added in v0.0.4

func (q *TaskPusher) AddTaskToQueue(ctx context.Context, task *entity.Task) error

AddTaskToQueue adds a task to the queue and returns an error if the operation fails.

func (*TaskPusher) AsyncAddTaskToQueue added in v0.0.4

func (q *TaskPusher) AsyncAddTaskToQueue(ctx context.Context, task *entity.Task)

AsyncAddTaskToQueue adds a task to the queue asynchronously without waiting for completion.

type TaskStatus added in v0.0.4

type TaskStatus = entity.TaskStatus

TaskStatus represents the current status of a task in its lifecycle.

type TaskStorage

type TaskStorage = storages.Task

TaskStorage defines the interface for task persistence operations.

func NewStorage added in v0.0.4

func NewStorage(db *sqlx.DB) (TaskStorage, error)

NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL and MySQL databases.

type TaskType added in v0.0.4

type TaskType = entity.TaskType

TaskType represents the type identifier for tasks in the queue.

Directories

Path Synopsis
internal
entity
Package entity contains domain entities for the task queue system.
Package entity contains domain entities for the task queue system.
pkg/generated/mocks/mock_storages
Package mock_storages is a generated GoMock package.
Package mock_storages is a generated GoMock package.
processors/internalprocessors
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
processors/queueprocessor
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
storages
Package storages provides interfaces and implementations for task storage backends.
Package storages provides interfaces and implementations for task storage backends.
storages/dbentity
Package dbentity provides common database entities and filters for task storage implementations.
Package dbentity provides common database entities and filters for task storage implementations.
storages/dbutils
Package dbutils provides common database utilities for task storage implementations.
Package dbutils provides common database utilities for task storage implementations.
storages/mysql/task
Package mysqltask provides MySQL storage operations for task management in the queue system.
Package mysqltask provides MySQL storage operations for task management in the queue system.
storages/pg/task
Package task provides storage operations for task management in the queue system.
Package task provides storage operations for task management in the queue system.
storages/sqlite
Package sqlite provides SQLite storage operations for task management in the queue system.
Package sqlite provides SQLite storage operations for task management in the queue system.
utils/xtime
Package xtime provides time-related utility functions.
Package xtime provides time-related utility functions.
pkg
goquestorage
Package goquestorage provides task storage implementations for different database backends.
Package goquestorage provides task storage implementations for different database backends.
scripts
dbmodels command
test
testutils
Package testutils provides testing utilities for the goque project.
Package testutils provides testing utilities for the goque project.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL