goque

v0.6.0
Published: May 21, 2026 License: MIT Imports: 22 Imported by: 0

README

Goque

A robust, database-backed task queue system for Go with built-in worker pools, retry logic, and graceful shutdown support. Supports PostgreSQL, MySQL, and SQLite.

Features

Current Features
  • Multi-database support - Works with PostgreSQL, MySQL, and SQLite
  • Reliable persistence - Task storage with ACID guarantees
  • Worker pool management - Configurable concurrent task processing using goroutine pools
  • Automatic retry logic - Configurable retry attempts with custom backoff strategies
  • Task lifecycle management - Track task status through multiple states (new, processing, done, error, etc.)
  • Graceful shutdown - Clean worker shutdown with in-flight task handling
  • Task timeout handling - Per-task timeout configuration with context cancellation
  • Extensible hooks - Before/after processing hooks for custom logic (metrics, logging, tracing)
  • Type-safe queries - PostgreSQL/MySQL use go-jet for type-safe SQL query generation
  • External ID support - Associate tasks with external identifiers for idempotency
  • Built-in task healer - Automatically marks stuck tasks as errored for reprocessing
  • Multi-processor support - Manage multiple task types with a single queue manager
  • Periodic jobs - Schedule recurring task creation with cron expressions or custom schedulers
  • Structured logging - Built-in structured logging with xlog (supports zap, slog, and custom adapters)
  • Production-ready example - Complete example service with web dashboard and API
  • Prometheus metrics - Built-in Prometheus metrics for monitoring task queue performance

Installation

go get github.com/ruko1202/goque

Database Support

Goque supports three database backends with different performance characteristics:

| Database | Latency | Memory/op | Best For | Production Ready |
|---|---|---|---|---|
| PostgreSQL | 0.97 ms | 35.4 KB | High-throughput production systems | Recommended |
| MySQL | 1.59 ms | 33.4 KB | Memory-constrained environments | ✅ Yes |
| SQLite | 3.24 ms | 38.9 KB | Local development, testing, CI/CD | ⚠️ Dev/Test only |

Key findings:

  • PostgreSQL is about 3.3x faster than SQLite and has 39% lower latency than MySQL (based on the latencies in the table above)
  • MySQL uses 6% less memory than PostgreSQL
  • SQLite has file-level locking and is not recommended for production
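
The comparisons above can be re-derived from the table. The following is a small arithmetic sketch that uses only the rounded table values; the helper names ratio and percentLower are illustrative and not part of Goque. Figures derived from unrounded benchmark data may differ slightly.

```go
package main

import "fmt"

// ratio returns how many times larger a is than b.
func ratio(a, b float64) float64 { return a / b }

// percentLower returns by what percentage a is lower than b.
func percentLower(a, b float64) float64 { return (1 - a/b) * 100 }

func main() {
	// Rounded values copied from the database comparison table.
	const pgMs, mysqlMs, sqliteMs = 0.97, 1.59, 3.24
	const pgKB, mysqlKB = 35.4, 33.4

	fmt.Printf("SQLite latency / PostgreSQL latency: %.2fx\n", ratio(sqliteMs, pgMs))
	fmt.Printf("PostgreSQL latency lower than MySQL by: %.0f%%\n", percentLower(pgMs, mysqlMs))
	fmt.Printf("MySQL memory lower than PostgreSQL by: %.1f%%\n", percentLower(mysqlKB, pgKB))
}
```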

Quick Start

1. Prepare database

Choose the database backend that fits your deployment requirements.

Production

For production deployments, apply migrations from the migrations/ directory to your database:

# Install goose migration tool
go install github.com/pressly/goose/v3/cmd/goose@latest

# Apply PostgreSQL migrations
goose -dir migrations/pg postgres "your-production-dsn" up

# Or for MySQL
goose -dir migrations/mysql mysql "your-production-dsn" up

Local Development

For local development, use Docker Compose and Make commands:

# Start PostgreSQL and MySQL with Docker Compose
make docker-up

# Configure your database connection in .env.local
# For PostgreSQL (recommended):
echo 'DB_DRIVER=postgres' > .env.local
echo 'DB_DSN=postgres://postgres:postgres@localhost:5432/goque?sslmode=disable' >> .env.local

# For MySQL:
# echo 'DB_DRIVER=mysql' > .env.local
# echo 'DB_DSN=root:root@tcp(localhost:3306)/goque?parseTime=true&loc=UTC' >> .env.local

# For SQLite (dev/test only):
# echo 'DB_DRIVER=sqlite3' > .env.local
# echo 'DB_DSN=./goque.db' >> .env.local

# Install database tools (goose)
make bin-deps-db

# Apply migrations
make db-up

For detailed database setup instructions, see DATABASE.md.

2. Create a Task Processor

Implement the TaskProcessor interface to define how your tasks should be processed:

type EmailProcessor struct{}

func (p *EmailProcessor) ProcessTask(ctx context.Context, task *goque.Task) error {
    // Your task processing logic here
    return sendEmail(task.Payload)
}

Use a typed processor when you want Goque to decode JSON payloads before your processing logic runs:

type EmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

processor := goque.NewTypedTaskProcessor(
    goque.TypedTaskProcessorFunc[EmailPayload](func(ctx context.Context, task *goque.TypedTask[EmailPayload]) error {
        return sendEmail(task.Payload.To, task.Payload.Subject)
    }),
    goque.WithCancelTaskWhenPayloadDecodeError[EmailPayload](),
)

If payload decoding fails, the typed processor is not called. The decode error is returned through the normal task processing flow, recorded in goque_payload_decode_errors_total, written to task.Errors, and passed to WithHooksAfterProcessing. Pass WithCancelTaskWhenPayloadDecodeError to the typed processor, as above, to cancel tasks that fail to decode instead of retrying them. An after-processing hook can detect decode failures:

goq.RegisterProcessor(
    "send_email",
    processor,
    goque.WithHooksAfterProcessing(func(ctx context.Context, task *goque.Task, err error) {
        if errors.Is(err, goque.ErrPayloadUnmarshal) {
            // alert or inspect invalid payload
        }
    }),
)

3. Initialize and Run the Queue Manager (Recommended)

package main

import (
    "context"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "github.com/ruko1202/goque"
    "github.com/ruko1202/goque/pkg/goquestorage"
    "github.com/ruko1202/xlog"
    "go.uber.org/zap"
)

func main() {
    ctx := context.Background()

    // Configure structured logging with xlog
    logger := xlog.NewZapAdapter(zap.Must(zap.NewProduction()))
    xlog.ReplaceGlobalLogger(logger)
    ctx = xlog.ContextWithLogger(ctx, logger)

    // Optional: Configure OpenTelemetry tracing
    // goque.SetTracerProvider(tracerProvider) // See "Observability" section

    // Initialize database connection
    db, err := goquestorage.NewDBConn(ctx, &goquestorage.Config{
        DSN: "postgres://user:pass@localhost:5432/goque?sslmode=disable",
        Driver: goquestorage.PgDriver,
    })
    if err != nil {
        xlog.Panic(ctx, err.Error())
    }

    // Create task storage (works with any supported database)
    taskStorage, err := goque.NewStorage(db)
    if err != nil {
        panic(err)
    }

    // Optional: Configure metrics with service name
    goque.SetMetricsServiceName("my-service")

    // Create Goque manager (includes built-in healer processor)
    goq := goque.NewGoque(taskStorage)

    // Register your task processors
    goq.RegisterProcessor(
        "send_email",
        &EmailProcessor{},
        goque.WithWorkersCount(10),
        goque.WithTaskProcessingMaxAttempts(3),
        goque.WithTaskProcessingTimeout(30 * time.Second),
    )

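    // Run all processors; Run returns an error that should be checked
    if err := goq.Run(ctx); err != nil {
        xlog.Panic(ctx, err.Error())
    }

    // Graceful shutdown
    defer goq.Stop()
}

4. Adding Tasks to the Queue
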
payload := `{"to": "user@example.com", "subject": "Hello"}`

// Create a new task
task := goque.NewTask("send_email", payload)

// Or with external ID for idempotency
task = goque.NewTaskWithExternalID("send_email", payload, "external-order-123")

// Or marshal a typed payload as JSON
task, err := goque.NewTaskWithPayload("send_email", EmailPayload{
    To:      "user@example.com",
    Subject: "Hello",
})
if err != nil {
    return err
}

// Add to queue using TaskQueueManager (recommended - includes metrics)
taskQueueManager := goque.NewTaskQueueManager(taskStorage)
if err := taskQueueManager.AddTaskToQueue(ctx, task); err != nil {
    return err
}

Example Application

A complete, production-ready example service demonstrating real-world Goque usage is available in the examples/service directory.

For detailed instructions and API documentation, see examples/service/README.md.

Configuration Options

The GoqueProcessor supports various configuration options:

  • WithWorkersCount(n int) - Set the number of concurrent workers (default: 1)
  • WithWorkersPanicHandler(handler func(context.Context) func(any)) - Set a custom worker panic handler
  • WithTaskProcessingMaxAttempts(n int32) - Set maximum retry attempts (default: 3)
  • WithTaskProcessingTimeout(d time.Duration) - Set per-task timeout (default: 30s)
  • WithTaskProcessingNextAttemptAtFunc(f) - Custom retry backoff strategy
  • WithTaskFetcherMaxTasks(n int64) - Set maximum tasks to fetch per cycle (default: 10)
  • WithTaskFetcherTick(d time.Duration) - Set fetch interval (default: 1s)
  • WithTaskFetcherTimeout(d time.Duration) - Set timeout for fetching tasks from storage
  • WithHooksBeforeProcessing(hooks ...HookBeforeProcessing) - Add pre-processing hooks
  • WithHooksAfterProcessing(hooks ...HookAfterProcessing) - Add post-processing hooks
  • WithCleanerPeriod(d time.Duration) - Set the cleaner run interval
  • WithCleanerUpdatedAtTimeAgo(d time.Duration) - Set the completed-task age threshold for cleanup
  • WithCleanerTimeout(d time.Duration) - Set the cleaner operation timeout
  • WithHealerPeriod(d time.Duration) - Set the healer run interval
  • WithHealerUpdatedAtTimeAgo(d time.Duration) - Set the stuck-task age threshold for healing
  • WithHealerTimeout(d time.Duration) - Set the healer operation timeout

Periodic Jobs

Periodic jobs run in separate scheduler processors. On each schedule tick, Goque calls the job factory and inserts the returned task into the queue through TaskQueueManager. The inserted task is a normal one-shot task: successful tasks still become done, failed tasks still use the normal retry flow, and cancellation is handled by the queue manager.

Use NewCronJob for a standard 5-field cron expression:

periodicJob, err := goque.NewCronJob(
    "daily_report_schedule",
    "0 3 * * *",
    time.UTC,
    func(ctx context.Context) (*goque.Task, error) {
        return goque.NewTask(
            "daily_report",
            `{"report":"sales"}`,
        ), nil
    },
    goque.WithPeriodicJobRunOnStart(),
)
if err != nil {
    return err
}

goq := goque.NewGoque(taskStorage)
goq.RegisterPeriodicJob(periodicJob)

WithPeriodicJobRunOnStart() is optional. When enabled, the periodic job enqueues one task immediately when Goque starts, then continues with its normal schedule.

Register a normal processor for the task type produced by the periodic factory:

goq.RegisterProcessor(
    "daily_report",
    &DailyReportProcessor{},
)

For non-cron schedules, use NewPeriodicJob with PeriodicSchedulerFunc or any type that implements PeriodicSchedule:

periodicJob, err := goque.NewPeriodicJob(
    "quarter_hour_report_schedule",
    goque.PeriodicSchedulerFunc(func(t time.Time) time.Time {
        return t.Add(15 * time.Minute)
    }),
    func(ctx context.Context) (*goque.Task, error) {
        return goque.NewTask(
            "quarter_hour_report",
            `{"report":"sales"}`,
        ), nil
    },
)
if err != nil {
    return err
}

Use external_id when the periodic job must be idempotent for a schedule slot:

slot := time.Now().UTC().Truncate(15 * time.Minute)
externalID := fmt.Sprintf("quarter_hour_report:%s", slot.Format(time.RFC3339))
task := goque.NewTaskWithExternalID("quarter_hour_report", payload, externalID)

Task Status Lifecycle

Tasks flow through the following states:

new → pending → processing → done
        ↑ ↓         ↓    ↓
        │ └──error──┘  canceled
        │     ↓
        │  attempts_left
        │
        └──(healer fixes stuck pending tasks)

Status Descriptions

  • new - Task created and ready to be picked up
  • pending - Task scheduled for future processing (via NextAttemptAt)
  • processing - Task currently being processed by a worker
  • done - Task completed successfully ✓ (terminal)
  • error - Task failed but has retry attempts remaining
  • attempts_left - Task failed and exhausted all retry attempts ✗ (terminal)
  • canceled - Task was manually canceled ✗ (terminal)

Valid State Transitions

| Current Status | Next Status | Trigger |
|---|---|---|
| new | pending | Task scheduled for processing |
| pending | processing | Worker picks up task |
| pending | error | Healer marks stuck task (cure operation) |
| processing | done | Successful processing |
| processing | error | Failed processing with retries left |
| processing | canceled | Manual cancellation |
| error | pending | Retry logic schedules next attempt |
| error | attempts_left | No more retry attempts available |

Terminal States

Tasks in these states will not be processed again:

  • done - Successfully completed
  • canceled - Manually canceled by user
  • attempts_left - Failed with no remaining retry attempts
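
The transition rules listed in this section can be expressed as a small lookup table. The sketch below encodes exactly the documented transitions; the type and function names are illustrative and are not taken from the Goque source.

```go
package main

import "fmt"

type status string

const (
	statusNew          status = "new"
	statusPending      status = "pending"
	statusProcessing   status = "processing"
	statusDone         status = "done"
	statusError        status = "error"
	statusAttemptsLeft status = "attempts_left"
	statusCanceled     status = "canceled"
)

// validTransitions lists the documented next statuses for each status.
var validTransitions = map[status][]status{
	statusNew:        {statusPending},
	statusPending:    {statusProcessing, statusError},
	statusProcessing: {statusDone, statusError, statusCanceled},
	statusError:      {statusPending, statusAttemptsLeft},
}

// canTransition reports whether moving from one status to another is documented as valid.
func canTransition(from, to status) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// isTerminal reports whether a status has no outgoing transitions.
func isTerminal(s status) bool { return len(validTransitions[s]) == 0 }

func main() {
	fmt.Println(canTransition(statusProcessing, statusDone)) // a documented transition
	fmt.Println(canTransition(statusDone, statusPending))    // done is terminal
	fmt.Println(isTerminal(statusAttemptsLeft))
}
```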

Built-in Features

Task Healer

Goque includes a built-in healer processor that automatically monitors and fixes stuck tasks. Tasks that remain in the "pending" status for too long are marked as errored so that they can be retried. The healer is registered automatically when you create the manager with goque.NewGoque().

You can configure the healer behavior:

goq.RegisterProcessor(
    "send_email",
    &EmailProcessor{},
    goque.WithWorkersCount(10),
    goque.WithTaskProcessingMaxAttempts(3),
    goque.WithTaskProcessingTimeout(30 * time.Second),

    goque.WithHealerPeriod(10*time.Minute),
    goque.WithHealerUpdatedAtTimeAgo(time.Hour),
    goque.WithHealerTimeout(30*time.Second),
)

Observability

Prometheus Metrics

Goque includes built-in Prometheus metrics for comprehensive monitoring of your task queue. Metrics are automatically collected during task processing operations.

Available Metrics

| Metric Name | Type | Labels | Description |
|---|---|---|---|
| goque_processed_tasks_total | Counter | task_type, status | Total number of processed tasks by type and final status |
| goque_processed_tasks_with_error_total | Counter | task_type, task_processing_operations, task_error_type | Tasks processed with errors, including error type details |
| goque_task_processing_duration_seconds | Histogram | task_type | Task processing duration distribution in seconds |
| goque_task_payload_size_bytes | Histogram | task_type | Task payload size distribution in bytes |
| goque_payload_decode_errors_total | Counter | task_type | Typed task payload JSON decode errors by task type |

Configuration

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/ruko1202/goque"
)

// Optional: Set service name for metrics labels
goque.SetMetricsServiceName("my-service")

// Expose metrics endpoint
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":9090", nil)

Task Processing Operations

Metrics track errors across different operations:

  • add_to_queue - Errors during task creation and queue insertion
  • processing - Errors during task execution
  • cleanup - Errors during task cleanup operations
  • health - Errors during healer operations

Example Queries

# Task processing rate by type
rate(goque_processed_tasks_total[5m])

# Task error rate
rate(goque_processed_tasks_with_error_total[5m])

# Typed payload decode errors by task type
sum by (task_type) (rate(goque_payload_decode_errors_total[5m]))

# Average processing duration
rate(goque_task_processing_duration_seconds_sum[5m])
  / rate(goque_task_processing_duration_seconds_count[5m])

# 95th percentile processing time
histogram_quantile(0.95, sum by (le) (rate(goque_task_processing_duration_seconds_bucket[5m])))

# Tasks by status
sum by (status) (goque_processed_tasks_total)

For a complete example with metrics integration, see examples/service/.

Structured Logging (xlog)

Goque uses xlog for structured logging with support for multiple backends (Zap, Slog, custom adapters). See the Quick Start example above for configuration.

What gets logged:

  • Task lifecycle events (creation, processing, completion)
  • Error events and retry attempts
  • Database operations and transaction management
  • Worker pool events and task distribution

Distributed Tracing (OpenTelemetry)

Goque supports OpenTelemetry for distributed tracing. By default, it uses a noop tracer (zero overhead). See the Quick Start example above for enabling tracing.

Traced operations:

  • Task processing loop and fetching
  • Hook execution (before/after)
  • Task queue operations (add, get)

Performance impact:

  • Noop (default): 0% overhead
  • With 1% sampling: <0.1% memory overhead
  • Without sampling: ~6% memory overhead

⚠️ Important: Call goque.SetTracerProvider() before creating Goque instances.

Development

Prerequisites
  • Go 1.23 or higher
  • One of the supported databases:
    • PostgreSQL 12+
    • MySQL 8+
    • SQLite 3+
  • Make

Setup

# Install all dependencies and tools
make all

# Install only binary dependencies
make bin-deps

# Download Go modules
make deps

Running Tests

# Run all tests
make tloc

# Run tests with coverage
make test-cov

Code Quality

# Run linter
make lint

# Format code
make fmt

Database Operations

All database commands work with any supported database (PostgreSQL, MySQL, SQLite). Simply configure DB_DRIVER in .env.local and use universal commands:

# Show current database configuration
make db-info

# Create a new migration for current DB_DRIVER
make db-migrate-create name="your_migration_name"

# Check migration status
make db-status

# Apply migrations
make db-up

# Rollback last migration
make db-down

# Regenerate database models (PostgreSQL only)
make db-models

# Docker Compose commands for local development
make docker-up           # Start PostgreSQL and MySQL with Docker Compose
make docker-pg-up        # Start only PostgreSQL
make docker-mysql-up     # Start only MySQL
make docker-down         # Stop and remove all containers
make docker-down-volumes # Stop and remove containers with volumes
make docker-logs         # Show logs from all containers
make docker-ps           # Show status of containers

For complete database setup guide, migrations structure, and Docker Compose configuration, see DATABASE.md.

Generate Mocks

make mocks

Project Structure

.
├── goque_manager.go            # Public API - TaskQueueManager interface
├── internal/
│   ├── goque.go                # Main Goque implementation
│   ├── entity/                 # Domain entities and errors
│   │   ├── task.go             # Task entity
│   │   ├── errors.go           # Domain errors
│   │   └── operations.go       # Task operation constants
│   ├── queue_manager/          # Task queue manager implementation
│   ├── metrics/                # Prometheus metrics
│   │   ├── metrics.go          # Metrics collectors and functions
│   │   └── vars.go             # Metrics configuration
│   ├── processors/             # Task processing components
│   │   ├── queueprocessor/     # Main queue processor
│   │   └── internalprocessors/ # Built-in processors (healer, cleaner)
│   ├── storages/               # Data access layer (multi-database support)
│   │   ├── pg/task/            # PostgreSQL storage (go-jet)
│   │   ├── mysql/task/         # MySQL storage (raw SQL)
│   │   ├── sqlite/task/        # SQLite storage (raw SQL)
│   │   └── dbutils/            # Database utilities
│   └── pkg/
│       └── generated/          # Generated code (models, mocks)
├── migrations/                 # Database migrations
│   ├── pg/                     # PostgreSQL migrations
│   ├── mysql/                  # MySQL migrations
│   └── sqlite/                 # SQLite migrations
├── examples/service/           # Production-ready example service
├── DATABASE.md                 # Complete database setup guide
└── test/                       # Test utilities and fixtures

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package goque provides a distributed task queue manager with support for multiple processors and automatic task healing and cleaning.

Package goque provides a robust, SQL-backed task queue system for Go applications.

Index

Constants

const (
	TaskStatusNew          = entity.TaskStatusNew          // Task is ready to be picked up
	TaskStatusPending      = entity.TaskStatusPending      // Task is scheduled for future processing
	TaskStatusProcessing   = entity.TaskStatusProcessing   // Task is currently being processed
	TaskStatusDone         = entity.TaskStatusDone         // Task completed successfully
	TaskStatusCanceled     = entity.TaskStatusCanceled     // Task was manually canceled
	TaskStatusError        = entity.TaskStatusError        // Task failed but has retry attempts remaining
	TaskStatusAttemptsLeft = entity.TaskStatusAttemptsLeft // Task failed and exhausted all retries
)

Task status constants define the possible states a task can be in.

Variables

var (
	// WithValue adds a single key-value pair to the context for task metadata tracking.
	WithValue = goquectx.WithValue
	// WithValues adds multiple key-value pairs to the context for task metadata tracking.
	WithValues = goquectx.WithValues
	// ValueByKey retrieves stored value from the context by key.
	ValueByKey = goquectx.ValueByKey
	// Values retrieves all stored metadata values from the context.
	Values = goquectx.Values
)

Context value functions for storing and retrieving task metadata.

var (
	// NoTaskPayload represents an empty task payload.
	NoTaskPayload = entity.NoTaskPayload
	// NewTask creates a new task with the specified type and payload.
	NewTask = entity.NewTask
	// NewTaskWithExternalID creates a new task with an external identifier for idempotency.
	NewTaskWithExternalID = entity.NewTaskWithExternalID
)

Task creation functions for adding new tasks to the queue.

var (
	// ErrDuplicateTask is returned when attempting to insert a task with a duplicate external ID.
	ErrDuplicateTask = entity.ErrDuplicateTask
	// ErrInvalidPayloadFormat is returned when the task payload is not valid JSON.
	ErrInvalidPayloadFormat = entity.ErrInvalidPayloadFormat
	// ErrPayloadMarshal is returned when a typed task payload cannot be marshaled to JSON.
	ErrPayloadMarshal = entity.ErrPayloadMarshal
	// ErrPayloadUnmarshal is returned when a typed task payload cannot be unmarshaled from JSON.
	ErrPayloadUnmarshal = entity.ErrPayloadUnmarshal
	// ErrTaskCancel is returned when a task is canceled during processing.
	ErrTaskCancel = entity.ErrTaskCancel
	// ErrTaskTimeout is returned when task processing exceeds the timeout limit.
	ErrTaskTimeout = entity.ErrTaskTimeout
)

var (
	// NewPeriodicJob creates a periodic job from a schedule and task factory.
	NewPeriodicJob = periodicprocessor.NewJob
	// NewCronJob creates a periodic job from a standard 5-field cron spec.
	NewCronJob = periodicprocessor.NewCronJob
	// CronSchedule creates a schedule from a standard 5-field cron spec.
	CronSchedule = periodicprocessor.CronSchedule
	// WithPeriodicJobRunOnStart makes a periodic job enqueue one task when the scheduler starts.
	WithPeriodicJobRunOnStart = periodicprocessor.WithRunOnStart
)

var (
	// WithTaskFetcherMaxTasks sets the maximum number of tasks to fetch in a single batch.
	WithTaskFetcherMaxTasks = queueprocessor.WithTaskFetcherMaxTasks
	// WithTaskFetcherTick sets the interval between task fetch attempts.
	WithTaskFetcherTick = queueprocessor.WithTaskFetcherTick
	// WithTaskFetcherTimeout sets the timeout for fetching tasks from storage.
	WithTaskFetcherTimeout = queueprocessor.WithTaskFetcherTimeout
)

Task fetcher configuration options.

var (
	// WithWorkersCount sets the number of concurrent workers for processing tasks.
	WithWorkersCount = queueprocessor.WithWorkersCount
	// WithWorkersPanicHandler sets a custom panic handler for worker goroutines.
	WithWorkersPanicHandler = queueprocessor.WithWorkersPanicHandler
	// WithTaskProcessingTimeout sets the timeout for processing a single task.
	WithTaskProcessingTimeout = queueprocessor.WithTaskProcessingTimeout
	// WithTaskProcessingMaxAttempts sets the maximum number of retry attempts for failed tasks.
	WithTaskProcessingMaxAttempts = queueprocessor.WithTaskProcessingMaxAttempts
	// WithTaskProcessingNextAttemptAtFunc sets a custom function to calculate the next retry time.
	WithTaskProcessingNextAttemptAtFunc = queueprocessor.WithTaskProcessingNextAttemptAtFunc
)

Worker and task processing configuration options.

var (
	// WithHooksBeforeProcessing sets hooks to execute before processing each task.
	WithHooksBeforeProcessing = queueprocessor.WithHooksBeforeProcessing
	// WithHooksAfterProcessing sets hooks to execute after processing each task.
	WithHooksAfterProcessing = queueprocessor.WithHooksAfterProcessing
)

Hook configuration options for task processing.

var (
	// WithCleanerUpdatedAtTimeAgo sets the age threshold for tasks to be cleaned.
	WithCleanerUpdatedAtTimeAgo = queueprocessor.WithCleanerUpdatedAtTimeAgo
	// WithCleanerTimeout sets the timeout for the cleaner operation.
	WithCleanerTimeout = queueprocessor.WithCleanerTimeout
	// WithCleanerPeriod sets the interval between cleaner runs.
	WithCleanerPeriod = queueprocessor.WithCleanerPeriod
)

Cleaner configuration options for removing old tasks.

var (
	// WithHealerUpdatedAtTimeAgo sets the age threshold for tasks to be healed.
	WithHealerUpdatedAtTimeAgo = queueprocessor.WithHealerUpdatedAtTimeAgo
	// WithHealerTimeout sets the timeout for the healer operation.
	WithHealerTimeout = queueprocessor.WithHealerTimeout
	// WithHealerPeriod sets the interval between healer runs.
	WithHealerPeriod = queueprocessor.WithHealerPeriod
)

Healer configuration options for fixing stuck tasks.

var NoopTaskProcessor = queueprocessor.NoopTaskProcessor

NoopTaskProcessor is a no-op task processor that does nothing and returns nil.

Functions

func SetMetricsServiceName added in v0.2.0

func SetMetricsServiceName(name string)

SetMetricsServiceName sets the service name label for Prometheus metrics. This should be called once during application initialization, before starting the queue manager.

Example:

goque.SetMetricsServiceName("my-service")

func SetTracerProvider added in v0.4.0

func SetTracerProvider(tp trace.TracerProvider)

SetTracerProvider configures the TracerProvider used by Goque for distributed tracing.

This function must be called BEFORE creating any Goque instances or TaskQueueManager instances, as they capture the tracer at creation time.

By default, Goque uses a noop tracer (zero overhead). Call this function to enable OpenTelemetry distributed tracing in production environments.

Example usage:

import (
    "github.com/ruko1202/goque"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Initialize TracerProvider
tracerProvider := sdktrace.NewTracerProvider(
    sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.01)), // 1% sampling
    sdktrace.WithBatcher(exporter),
)
defer tracerProvider.Shutdown(ctx)

// Configure Goque (BEFORE creating instances)
goque.SetTracerProvider(tracerProvider)

// Now create Goque instances
goq := goque.NewGoque(taskStorage)

Types

type Goque

type Goque struct {
	// contains filtered or unexported fields
}

Goque is the main task queue manager that coordinates multiple task processors.

func NewGoque

func NewGoque(taskStorage TaskStorage) *Goque

NewGoque creates a new Goque instance with the specified task storage.

func (*Goque) RegisterPeriodicJob added in v0.5.0

func (g *Goque) RegisterPeriodicJob(job *PeriodicJob)

RegisterPeriodicJob registers a periodic job processor. Should be called before Run.

func (*Goque) RegisterProcessor

func (g *Goque) RegisterProcessor(
	processorType string,
	taskProcessor TaskProcessor,
	opts ...ProcessorOpts,
)

RegisterProcessor registers a new task processor for a specific task type. Should be called before Run.

func (*Goque) Run

func (g *Goque) Run(ctx context.Context) error

Run starts all registered processors in separate goroutines.

func (*Goque) Stop

func (g *Goque) Stop()

Stop gracefully shuts down all registered processors and waits for them to finish.

type Metadata added in v0.3.0

type Metadata = entity.Metadata

Metadata represents arbitrary key-value data associated with a task for tracking and context.

type PeriodicJob added in v0.5.0

type PeriodicJob = periodicprocessor.Job

PeriodicJob describes a producer that periodically inserts regular queue tasks.

type PeriodicJobFactory added in v0.5.0

type PeriodicJobFactory = periodicprocessor.TaskFactory

PeriodicJobFactory creates a task for a scheduled run.

type PeriodicJobOpts added in v0.5.0

type PeriodicJobOpts = periodicprocessor.JobOptions

PeriodicJobOpts configures a periodic job.

type PeriodicSchedule added in v0.5.0

type PeriodicSchedule = periodicprocessor.Scheduler

PeriodicSchedule calculates the next run time for a periodic job.

type PeriodicSchedulerFunc added in v0.5.0

type PeriodicSchedulerFunc = periodicprocessor.SchedulerFunc

PeriodicSchedulerFunc adapts a plain function so that it can be used as a PeriodicSchedule.

type ProcessorOpts added in v0.5.0

type ProcessorOpts = queueprocessor.GoqueProcessorOpts

ProcessorOpts is a function type for configuring GoqueProcessor options.

type Task added in v0.0.4

type Task = entity.Task

Task represents a unit of work to be processed by the queue system.

func NewTaskWithPayload added in v0.6.0

func NewTaskWithPayload[T any](taskType TaskType, payload T) (*Task, error)

NewTaskWithPayload creates a new task with a typed payload marshaled as JSON.

func NewTaskWithPayloadAndExternalID added in v0.6.0

func NewTaskWithPayloadAndExternalID[T any](taskType TaskType, payload T, externalID string) (*Task, error)

NewTaskWithPayloadAndExternalID creates a new task with a typed payload and custom external ID.

type TaskFilter added in v0.1.1

type TaskFilter = dbentity.GetTasksFilter

TaskFilter represents filtering criteria for querying tasks from the queue.

type TaskProcessor added in v0.5.0

type TaskProcessor = queueprocessor.TaskProcessor

TaskProcessor defines the interface for processing individual tasks.

func NewTypedTaskProcessor added in v0.6.0

func NewTypedTaskProcessor[T any](processor TypedTaskProcessor[T], opts ...TypedTaskProcessorOpt[T]) TaskProcessor

NewTypedTaskProcessor wraps a typed task processor for use with RegisterProcessor.

type TaskProcessorFunc added in v0.0.4

type TaskProcessorFunc = queueprocessor.TaskProcessorFunc

TaskProcessorFunc is a function type that implements the TaskProcessor interface.

type TaskQueueManager added in v0.1.1

type TaskQueueManager interface {
	AsyncAddTaskToQueue(ctx context.Context, task *Task)
	AddTaskToQueue(ctx context.Context, task *Task) error
	GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error)
	GetTasks(ctx context.Context, filter *TaskFilter, limit int64) ([]*Task, error)
	ResetAttempts(ctx context.Context, taskID uuid.UUID) error
	CancelTask(ctx context.Context, taskID uuid.UUID) error
}

TaskQueueManager provides operations for managing tasks in the queue. It offers both synchronous and asynchronous methods for adding tasks, as well as querying and managing existing tasks.

func NewTaskQueueManager added in v0.1.1

func NewTaskQueueManager(taskStorage TaskStorage) TaskQueueManager

NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.

type TaskStatus added in v0.0.4

type TaskStatus = entity.TaskStatus

TaskStatus represents the current status of a task in its lifecycle.

type TaskStorage

type TaskStorage = storages.Task

TaskStorage defines the interface for task persistence operations.

func NewStorage added in v0.0.4

func NewStorage(db *sqlx.DB) (TaskStorage, error)

NewStorage creates a new task storage instance based on the database driver. Supports PostgreSQL, MySQL, and SQLite databases.

type TaskType added in v0.0.4

type TaskType = entity.TaskType

TaskType represents the type identifier for tasks in the queue.

type TypedTask added in v0.6.0

type TypedTask[T any] = entity.TypedTask[T]

TypedTask represents a task with a payload decoded into the expected Go type.

type TypedTaskProcessor added in v0.6.0

type TypedTaskProcessor[T any] = queueprocessor.TypedTaskProcessor[T]

TypedTaskProcessor defines the interface for processing typed task payloads.

type TypedTaskProcessorFunc added in v0.6.0

type TypedTaskProcessorFunc[T any] = queueprocessor.TypedTaskProcessorFunc[T]

TypedTaskProcessorFunc is a function type that implements the TypedTaskProcessor interface.

type TypedTaskProcessorOpt added in v0.6.0

type TypedTaskProcessorOpt[T any] = queueprocessor.GoqueTypedProcessorOpts[T]

TypedTaskProcessorOpt configures a typed task processor adapter.

func WithCancelTaskWhenPayloadDecodeError added in v0.6.0

func WithCancelTaskWhenPayloadDecodeError[T any]() TypedTaskProcessorOpt[T]

WithCancelTaskWhenPayloadDecodeError cancels typed tasks when payload decoding fails instead of retrying them.

Directories

Path Synopsis
internal
entity
Package entity contains domain entities for the task queue system.
metrics
Package metrics provides Prometheus instrumentation for Goque task queue operations.
pkg/generated/mocks/mock_periodicprocessor
Package mock_periodicprocessor is a generated GoMock package.
pkg/generated/mocks/mock_storages
Package mock_storages is a generated GoMock package.
processors/internalprocessors
Package internalprocessors provides internal task processors for queue management including cleaning and healing operations.
processors/periodicprocessor
Package periodicprocessor provides cron-based periodic job scheduling.
processors/queueprocessor
Package queueprocessor provides task queue processing functionality with configurable workers and retry logic.
queuemanager
Package queuemanager provides high-level task queue management operations.
storages
Package storages provides interfaces and implementations for task storage backends.
storages/dbentity
Package dbentity provides common database entities and filters for task storage implementations.
storages/dbutils
Package dbutils provides common database utilities for task storage implementations.
storages/mysql/task
Package mysqltask provides MySQL storage operations for task management in the queue system.
storages/pg/task
Package task provides storage operations for task management in the queue system.
storages/sqlite
Package sqlite provides SQLite storage operations for task management in the queue system.
utils/goquectx
Package goquectx provides utilities for managing task metadata within context values.
utils/xcollections
Package xcollections provides thread-safe collection types.
utils/xpool
Package xpool provides a buffer pool for efficient memory reuse.
utils/xtime
Package xtime provides time-related utility functions.
utils/xtracer
Package xtracer provides OpenTelemetry tracing utilities for Goque.
pkg
goquestorage
Package goquestorage provides task storage implementations for different database backends.
scripts
dbmodels command
test
testutils
Package testutils provides testing utilities for the goque project.
