goque

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: MIT Imports: 6 Imported by: 0

README

Goque

pipeline Coverage

A robust, PostgreSQL-backed task queue system for Go with built-in worker pools, retry logic, and graceful shutdown support.

Features

  • PostgreSQL-backed persistence - Reliable task storage with ACID guarantees
  • Worker pool management - Configurable concurrent task processing using goroutine pools
  • Automatic retry logic - Configurable retry attempts with custom backoff strategies
  • Task lifecycle management - Track task status through multiple states (new, processing, done, error, etc.)
  • Graceful shutdown - Clean worker shutdown with in-flight task handling
  • Task timeout handling - Per-task timeout configuration with context cancellation
  • Extensible hooks - Before/after processing hooks for custom logic
  • Type-safe queries - Using go-jet for type-safe SQL query generation
  • External ID support - Associate tasks with external identifiers for idempotency
  • Built-in task healer - Automatically marks stuck tasks as errored for reprocessing
  • Multi-processor support - Manage multiple task types with a single queue manager

Installation

go get github.com/ruko1202/goque

Quick Start

1. Prepare database

First, set up your PostgreSQL database and run migrations:

# Configure your database connection
echo 'DB_DSN "postgresql://user:password@localhost:5432/dbname?sslmode=disable"' > .env.local
echo 'DB_DRIVER "postgres"' >> .env.local

# Install database tools
make bin-deps-db

# Run migrations
make db-up
2. Create a Task Processor

Implement the TaskProcessor interface to define how your tasks should be processed:

type EmailProcessor struct{}

func (p *EmailProcessor) ProcessTask(ctx context.Context, task *entity.Task) error {
    // Your task processing logic here
    return sendEmail(task.Payload)
}
package main

import (
    "context"
    "database/sql"
    "time"

    "github.com/ruko1202/goque/internal"
    "github.com/ruko1202/goque/internal/processor"
    "github.com/ruko1202/goque/internal/storages/task"
)

func main() {
    // Initialize database connection
    db, err := sql.Open("postgres", "your-connection-string")
    if err != nil {
        panic(err)
    }

    // Create task storage
    taskStorage := task.NewStorage(db)

    // Create Goque manager (includes built-in healer processor)
    goque := internal.NewGoque(taskStorage)

    // Register your task processors
    goque.RegisterProcessor(
        "send_email",
        &EmailProcessor{},
        processor.WithWorkers(10),
        processor.WithMaxAttempts(3),
        processor.WithTaskTimeout(30 * time.Second),
    )

    // Run all processors
    ctx := context.Background()
    goque.Run(ctx)

    // Graceful shutdown
    defer goque.Stop()
}
4. Adding Tasks to the Queue
import "github.com/ruko1202/goque/internal/entity"

// Create a new task
task := entity.NewTask("send_email", `{"to": "user@example.com", "subject": "Hello"}`)

// Or with external ID for idempotency
task := entity.NewTaskWithExternalID("send_email", payload, "order-123")

// Add to storage
err := taskStorage.AddTask(ctx, task)

Configuration Options

The GoqueProcessor supports various configuration options:

  • WithWorkers(n int) - Set the number of concurrent workers (default: 1)
  • WithMaxAttempts(n int32) - Set maximum retry attempts (default: 3)
  • WithTaskTimeout(d time.Duration) - Set per-task timeout (default: 30s)
  • WithFetchMaxTasks(n int64) - Set maximum tasks to fetch per cycle (default: 10)
  • WithFetchTick(d time.Duration) - Set fetch interval (default: 1s)
  • WithNextAttemptAtFunc(f NextAttemptAtFunc) - Custom retry backoff strategy
  • WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) - Add pre-processing hooks
  • WithHooksAfterProcessing(hooks ...HookAfterProcessing) - Add post-processing hooks

Task Status Lifecycle

Tasks flow through the following states:

new → pending → processing → done
                 ↓       ↓
              canceled  error → attempts_left (no more retries)
                          ↓
                        (retry) → processing
  
  • new - Task created and ready to be picked up
  • pending - Task scheduled for future processing
  • processing - Task currently being processed
  • done - Task completed successfully
  • error - Task failed but has retry attempts remaining
  • attempts_left - Task failed and exhausted all retry attempts
  • canceled - Task was manually canceled

Built-in Features

Task Healer

Goque includes a built-in healer processor that automatically monitors and fixes stuck tasks. Tasks that remain in the "pending" status for too long are automatically marked as errored, allowing them to be retried. The healer is automatically registered when you use internal.NewGoque().

You can configure the healer behavior:

import internalprocessors "github.com/ruko1202/goque/internal/internal_processors"

goque := internal.NewGoque(
    taskStorage,
    internalprocessors.WithHealerUpdatedAtTimeAgo(5 * time.Minute),
    internalprocessors.WithHealerMaxTasks(200),
)

Development

Prerequisites
  • Go 1.23 or higher
  • PostgreSQL 12+
  • Make
Setup
# Install all dependencies and tools
make all

# Install only binary dependencies
make bin-deps

# Download Go modules
make deps
Running Tests
# Run all tests
make tloc

# Run tests with coverage
make test-cov
Code Quality
# Run linter
make lint

# Format code
make fmt
Database Operations
# Create a new migration
make db-migrate-create name="your_migration_name"

# Check migration status
make db-status

# Apply migrations
make db-up

# Rollback last migration
make db-down

# Regenerate database models
make db-models
Generate Mocks
make mocks

Project Structure

.
├── internal/
│   ├── goque.go                # Main queue manager
│   ├── entity/                 # Domain entities (Task, etc.)
│   ├── processor/              # Queue processor and task processor interfaces
│   ├── internal_processors/    # Built-in processors (healer, etc.)
│   ├── storages/               # Data access layer
│   │   ├── task/               # Task storage implementation
│   │   └── dbutils/            # Database utilities
│   └── pkg/
│       └── generated/          # Generated code (models, mocks)
├── migrations/                 # Database migrations
└── test/                       # Test utilities and fixtures

License

[Add your license here]

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Documentation

Overview

Package goque provides a distributed task queue manager with support for multiple processors and automatic task healing and cleaning.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Goque

type Goque struct {
	// contains filtered or unexported fields
}

Goque is the main task queue manager that coordinates multiple task processors.

func NewGoque

func NewGoque(taskStorage TaskStorage) *Goque

NewGoque creates a new Goque instance with the specified task storage.

func (*Goque) RegisterProcessor

func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor queueprocessor.TaskProcessor,
	opts ...queueprocessor.GoqueProcessorOpts,
)

RegisterProcessor registers a new task processor for a specific task type. Should be call before Run.

func (*Goque) Run

func (g *Goque) Run(ctx context.Context) error

Run starts all registered processors in separate goroutines.

func (*Goque) Stop

func (g *Goque) Stop()

Stop gracefully shuts down all registered processors and waits for them to finish.

type TaskStorage

TaskStorage combines all storage interfaces required by Goque for task processing, cleaning, and healing operations.

Directories

Path Synopsis
internal
entity
Package entity contains domain entities for the task queue system.
Package entity contains domain entities for the task queue system.
pkg/generated/mocks/mock_processors/queueprocessor
Package mock_queueprocessor is a generated GoMock package.
Package mock_queueprocessor is a generated GoMock package.
processors/internalprocessors
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
processors/queueprocessor
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
queuemngr
Package queuemngr provides queue management functionality for adding tasks to storage.
Package queuemngr provides queue management functionality for adding tasks to storage.
storages/sql/pg/task
Package task provides storage operations for task management in the queue system.
Package task provides storage operations for task management in the queue system.
storages/sql/pg/utils
Package sqlpgutils provides database utility functions and helpers.
Package sqlpgutils provides database utility functions and helpers.
storages/sql/utils
Package sqldbutils provides database utilities for transaction management and common database operations.
Package sqldbutils provides database utilities for transaction management and common database operations.
utils/xtime
Package xtime provides time-related utility functions.
Package xtime provides time-related utility functions.
pkg
utils/sqldb
Package sqldb provides utilities for establishing and managing SQL database connections.
Package sqldb provides utilities for establishing and managing SQL database connections.
scripts
dbmodels command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL