queue

package
v1.6.2
Published: Nov 20, 2025 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Overview

Package queue provides a comprehensive task queue and background job processing system with advanced scheduling capabilities.

It supports in-memory, Redis, and RabbitMQ queues with features like:

  • Priority-based job scheduling
  • Worker pool management
  • Retry mechanisms with exponential backoff
  • Middleware support for logging, metrics, and recovery
  • Batch processing capabilities
  • Graceful shutdown handling
  • Job progress tracking
  • Comprehensive error handling
  • RabbitMQ support with persistent message delivery
  • Scheduled job execution
  • Cron-based scheduling (using enhanced cron expressions with optional seconds support)
  • Interval-based scheduling (using time.Duration)
  • Task registration and management
  • Error recovery and retries
  • Context-aware execution

Queue Manager Example:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/valentin-kaiser/go-core/queue"
)

func main() {
	// Create a new queue manager
	manager := queue.NewManager()

	// Register a job handler
	manager.RegisterHandler("email", func(ctx context.Context, job *queue.Job) error {
		fmt.Printf("Processing email job: %s\n", job.ID)
		return nil
	})

	// Start the manager
	if err := manager.Start(context.Background()); err != nil {
		panic(err)
	}
	defer manager.Stop()

	// Enqueue a job
	job := queue.NewJob("email").
		WithPayload(map[string]interface{}{
			"to":      "user@example.com",
			"subject": "Welcome!",
		}).
		Build()

	if err := manager.Enqueue(context.Background(), job); err != nil {
		panic(err)
	}

	// Wait for processing
	time.Sleep(time.Second)
}

Task Scheduler Example:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/valentin-kaiser/go-core/queue"
)

func main() {
	// Create a new task scheduler
	scheduler := queue.NewTaskScheduler()

	// Register a cron-based task (5 fields - traditional)
	scheduler.RegisterCronTask("cleanup", "0 0 * * *", func(ctx context.Context) error {
		fmt.Println("Running daily cleanup task")
		return nil
	})

	// Register a cron-based task with seconds (6 fields)
	scheduler.RegisterCronTask("frequent", "*/30 * * * * *", func(ctx context.Context) error {
		fmt.Println("Running every 30 seconds")
		return nil
	})

	// Register a predefined expression
	scheduler.RegisterCronTask("hourly", "@hourly", func(ctx context.Context) error {
		fmt.Println("Running every hour")
		return nil
	})

	// Register a task that uses named days (6 fields, including seconds)
	scheduler.RegisterCronTask("weekday", "0 0 9 * * MON-FRI", func(ctx context.Context) error {
		fmt.Println("Running on weekdays at 9 AM")
		return nil
	})

	// Register an interval-based task
	scheduler.RegisterIntervalTask("health-check", time.Minute*5, func(ctx context.Context) error {
		fmt.Println("Running health check")
		return nil
	})

	// Start the scheduler
	if err := scheduler.Start(context.Background()); err != nil {
		panic(err)
	}
	defer scheduler.Stop()

	// Keep the program running
	select {}
}

Constants

This section is empty.

Variables

var (
	// Presets are predefined cron expressions for common schedules
	Presets = map[string]string{
		"@yearly":   "0 0 0 1 1 *",
		"@annually": "0 0 0 1 1 *",
		"@monthly":  "0 0 0 1 * *",
		"@weekly":   "0 0 0 * * 0",
		"@daily":    "0 0 0 * * *",
		"@midnight": "0 0 0 * * *",
		"@hourly":   "0 0 * * * *",
	}
	// Months is a map of month names to their numeric values
	Months = map[string]int{
		"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
		"JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12,
	}
	// Days is a map of day names to their numeric values
	Days = map[string]int{
		"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6,
	}
)
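
The lookup tables above suggest how predefined expressions and three-letter names can be reduced to plain numeric cron fields. The following is a minimal sketch under that assumption, not the package's parser: expandPreset and replaceDayNames are hypothetical helpers, and the maps are local copies so the example runs without the package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Local copies of part of the package's tables, so the sketch is self-contained.
var presets = map[string]string{
	"@daily":  "0 0 0 * * *",
	"@hourly": "0 0 * * * *",
}

var days = map[string]int{
	"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6,
}

// expandPreset (hypothetical) returns the cron string behind a preset,
// or the input unchanged when it is not a known preset.
func expandPreset(spec string) string {
	if expanded, ok := presets[strings.ToLower(strings.TrimSpace(spec))]; ok {
		return expanded
	}
	return spec
}

// replaceDayNames (hypothetical) rewrites names such as MON-FRI to 1-5.
func replaceDayNames(field string) string {
	upper := strings.ToUpper(field)
	for name, num := range days {
		upper = strings.ReplaceAll(upper, name, strconv.Itoa(num))
	}
	return upper
}

func main() {
	fmt.Println(expandPreset("@hourly"))    // 0 0 * * * *
	fmt.Println(replaceDayNames("MON-FRI")) // 1-5
}
```

Note that every preset expands to a 6-field expression, so the first field is seconds.
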
var ErrNoJobAvailable = apperror.NewError("no job available")

ErrNoJobAvailable is returned when no job is available in the queue

Functions

func IsRetryable

func IsRetryable(err error) bool

IsRetryable checks if an error is a retryable error that should trigger job retry. Returns true if the error is of type RetryableError, which indicates the job should be retried according to the configured retry policy instead of being marked as failed.

Example usage:

if err := processJob(); err != nil {
	if queue.IsRetryable(err) {
		// Job will be automatically retried
	} else {
		// Job will be marked as failed
	}
}

Types

type Batch

type Batch struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	JobIDs      []string          `json:"job_ids"`
	Status      Status            `json:"status"`
	Total       int               `json:"total"`
	Pending     int               `json:"pending"`
	Running     int               `json:"running"`
	Completed   int               `json:"completed"`
	Failed      int               `json:"failed"`
	CreatedAt   time.Time         `json:"created_at"`
	CompletedAt time.Time         `json:"completed_at,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

Batch represents a batch of jobs

type BatchManager

type BatchManager struct {
	// contains filtered or unexported fields
}

BatchManager manages job batches

func NewBatchManager

func NewBatchManager(manager *Manager) *BatchManager

NewBatchManager creates a new batch manager for handling grouped job operations. Batch managers allow you to group related jobs together and track their collective progress, providing operations like creating batches, monitoring batch completion, and handling batch-level callbacks.

Example usage:

batchMgr := queue.NewBatchManager(manager)
jobs := []*queue.Job{queue.NewJob("email").Build()}
batch, err := batchMgr.CreateBatch(ctx, "user-emails", jobs)
if err != nil {
	// handle error
}

func (*BatchManager) CreateBatch

func (bm *BatchManager) CreateBatch(ctx context.Context, name string, jobs []*Job) (*Batch, error)

CreateBatch creates a new batch of jobs

func (*BatchManager) DeleteBatch

func (bm *BatchManager) DeleteBatch(ctx context.Context, id string) error

DeleteBatch removes a batch

func (*BatchManager) GetBatch

func (bm *BatchManager) GetBatch(id string) (*Batch, error)

GetBatch retrieves a batch by ID

func (*BatchManager) GetBatches

func (bm *BatchManager) GetBatches() []*Batch

GetBatches returns all batches

func (*BatchManager) UpdateBatchStatus

func (bm *BatchManager) UpdateBatchStatus(ctx context.Context, batchID string) error

UpdateBatchStatus updates the status of a batch based on its jobs

type CronExpression

type CronExpression struct {
	Second    *CronField // Optional seconds field (0-59)
	Minute    CronField  // Minute field (0-59)
	Hour      CronField  // Hour field (0-23)
	Day       CronField  // Day field (1-31)
	Month     CronField  // Month field (1-12)
	DayOfWeek CronField  // Day of week field (0-6, 0 = Sunday)
}

CronExpression represents a parsed cron expression

type CronField

type CronField struct {
	Min, Max int
	Values   []int
}

CronField represents a field in a cron expression
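
To illustrate how a textual field could be turned into the Values of a CronField, here is a sketch of a field expander. expandField is a hypothetical function; the supported syntax ("*", "*/n", "a-b", "a-b/n", single numbers and comma lists) is an assumption about typical cron dialects, not a statement of what this package accepts.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandField (hypothetical) expands one cron field into the sorted values
// it matches within [low, high].
func expandField(expr string, low, high int) ([]int, error) {
	seen := make(map[int]bool)
	for _, part := range strings.Split(expr, ",") {
		rangePart, step := part, 1
		if base, stepStr, ok := strings.Cut(part, "/"); ok {
			n, err := strconv.Atoi(stepStr)
			if err != nil || n <= 0 {
				return nil, fmt.Errorf("invalid step in %q", part)
			}
			rangePart, step = base, n
		}
		lo, hi := low, high
		if rangePart != "*" {
			loStr, hiStr, isRange := strings.Cut(rangePart, "-")
			var err error
			if lo, err = strconv.Atoi(loStr); err != nil {
				return nil, fmt.Errorf("invalid value in %q", part)
			}
			hi = lo
			if isRange {
				if hi, err = strconv.Atoi(hiStr); err != nil {
					return nil, fmt.Errorf("invalid range in %q", part)
				}
			}
		}
		if lo < low || hi > high || lo > hi {
			return nil, fmt.Errorf("%q is outside %d-%d", part, low, high)
		}
		for v := lo; v <= hi; v += step {
			seen[v] = true
		}
	}
	// Collect in ascending order.
	values := make([]int, 0, len(seen))
	for v := low; v <= high; v++ {
		if seen[v] {
			values = append(values, v)
		}
	}
	return values, nil
}

func main() {
	fmt.Println(expandField("*/30", 0, 59)) // [0 30] <nil>
	fmt.Println(expandField("1-5", 0, 6))   // [1 2 3 4 5] <nil>
}
```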

type Job

type Job struct {
	ID          string            `json:"id"`
	Type        string            `json:"type"`
	Priority    Priority          `json:"priority"`
	Status      Status            `json:"status"`
	Attempts    int               `json:"attempts"`
	MaxAttempts int               `json:"max_attempts"`
	Progress    float64           `json:"progress"`
	CreatedAt   time.Time         `json:"created_at"`
	UpdatedAt   time.Time         `json:"updated_at"`
	CompletedAt time.Time         `json:"completed_at,omitempty"`
	ScheduleAt  time.Time         `json:"schedule_at,omitempty"`
	RetryAt     time.Time         `json:"retry_at,omitempty"`
	Timeout     time.Duration     `json:"timeout"`
	Payload     json.RawMessage   `json:"payload,omitempty"`
	Results     json.RawMessage   `json:"results,omitempty"`
	Error       string            `json:"error,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

Job represents a job to be processed

func (*Job) IsExpired

func (j *Job) IsExpired() bool

IsExpired returns true if the job has exceeded its maximum attempts

func (*Job) IsScheduled

func (j *Job) IsScheduled() bool

IsScheduled returns true if the job is scheduled for a future time

type JobBuilder

type JobBuilder struct {
	// contains filtered or unexported fields
}

JobBuilder helps create jobs with a fluent API

func NewJob

func NewJob(jobType string) *JobBuilder

NewJob creates a new job builder

func (*JobBuilder) Build

func (jb *JobBuilder) Build() *Job

Build returns the constructed job

func (*JobBuilder) WithDelay

func (jb *JobBuilder) WithDelay(delay time.Duration) *JobBuilder

WithDelay schedules the job to run after a delay

func (*JobBuilder) WithID

func (jb *JobBuilder) WithID(id string) *JobBuilder

WithID sets the job ID

func (*JobBuilder) WithJSONPayload

func (jb *JobBuilder) WithJSONPayload(jsonData []byte) *JobBuilder

WithJSONPayload sets the job payload from JSON

func (*JobBuilder) WithMaxAttempts

func (jb *JobBuilder) WithMaxAttempts(maxAttempts int) *JobBuilder

WithMaxAttempts sets the maximum number of attempts

func (*JobBuilder) WithMetadata

func (jb *JobBuilder) WithMetadata(key, value string) *JobBuilder

WithMetadata sets job metadata

func (*JobBuilder) WithPayload

func (jb *JobBuilder) WithPayload(payload map[string]interface{}) *JobBuilder

WithPayload sets the job payload

func (*JobBuilder) WithPriority

func (jb *JobBuilder) WithPriority(priority Priority) *JobBuilder

WithPriority sets the job priority

func (*JobBuilder) WithScheduleAt

func (jb *JobBuilder) WithScheduleAt(scheduleAt time.Time) *JobBuilder

WithScheduleAt schedules the job for a specific time

type JobContext

type JobContext struct {
	Job *Job
}

JobContext provides context for job execution

func NewJobContext

func NewJobContext(_ context.Context, job *Job) *JobContext

NewJobContext creates a new job context that provides access to job information and progress reporting capabilities within job handlers. The context allows handlers to update job progress and access job metadata during execution.

Example usage in a job handler:

func myHandler(ctx context.Context, job *queue.Job) error {
	jc := queue.NewJobContext(ctx, job)
	jc.ReportProgress(0.5) // 50% complete
	// do work...
	jc.ReportProgress(1.0) // 100% complete
	return nil
}

func (*JobContext) GetMetadata

func (jc *JobContext) GetMetadata(key string) (string, bool)

GetMetadata gets a metadata value

func (*JobContext) GetPayload

func (jc *JobContext) GetPayload(key string) (interface{}, bool)

GetPayload gets a value from the job payload

func (*JobContext) GetPayloadBool

func (jc *JobContext) GetPayloadBool(key string) (bool, bool)

GetPayloadBool gets a bool value from the job payload

func (*JobContext) GetPayloadInt

func (jc *JobContext) GetPayloadInt(key string) (int, bool)

GetPayloadInt gets an int value from the job payload

func (*JobContext) GetPayloadString

func (jc *JobContext) GetPayloadString(key string) (string, bool)

GetPayloadString gets a string value from the job payload

func (*JobContext) ReportProgress

func (jc *JobContext) ReportProgress(progress float64)

ReportProgress reports job progress

type JobHandler

type JobHandler func(ctx context.Context, job *Job) error

JobHandler is a function that processes jobs

func LoggingMiddleware

func LoggingMiddleware(next JobHandler) JobHandler

LoggingMiddleware logs job execution

func MetricsMiddleware

func MetricsMiddleware(next JobHandler) JobHandler

MetricsMiddleware tracks job metrics

func MiddlewareChain

func MiddlewareChain(handler JobHandler, middlewares ...Middleware) JobHandler

MiddlewareChain applies multiple middlewares to a job handler

func RecoveryMiddleware

func RecoveryMiddleware(next JobHandler) JobHandler

RecoveryMiddleware recovers from panics in job handlers

type JobProgress

type JobProgress struct {
	JobID    string  `json:"job_id"`
	Progress float64 `json:"progress"`
	Message  string  `json:"message,omitempty"`
}

JobProgress represents job progress information

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the job queue and workers

func NewManager

func NewManager() *Manager

NewManager creates a new queue manager with default settings

func (*Manager) Enqueue

func (m *Manager) Enqueue(ctx context.Context, job *Job) error

Enqueue adds a job to the queue

func (*Manager) GetJob

func (m *Manager) GetJob(ctx context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*Manager) GetJobs

func (m *Manager) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*Manager) GetStats

func (m *Manager) GetStats() *Stats

GetStats returns current queue statistics

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns true if the manager is currently running

func (*Manager) RegisterHandler

func (m *Manager) RegisterHandler(jobType string, handler JobHandler)

RegisterHandler registers a job handler for a specific job type

func (*Manager) Schedule

func (m *Manager) Schedule(ctx context.Context, job *Job) error

Schedule adds a scheduled job to the queue

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the queue manager

func (*Manager) Stop

func (m *Manager) Stop() error

Stop stops the queue manager gracefully

func (*Manager) WithQueue

func (m *Manager) WithQueue(queue Queue) *Manager

WithQueue sets the queue implementation

func (*Manager) WithRabbitMQ

func (m *Manager) WithRabbitMQ(config RabbitMQConfig) *Manager

WithRabbitMQ sets the queue to use RabbitMQ with the given configuration

func (*Manager) WithRabbitMQFromURL

func (m *Manager) WithRabbitMQFromURL(url string) *Manager

WithRabbitMQFromURL sets the queue to use RabbitMQ with the given URL

func (*Manager) WithRetryAttempts

func (m *Manager) WithRetryAttempts(attempts int) *Manager

WithRetryAttempts sets the maximum number of retry attempts

func (*Manager) WithRetryBackoff

func (m *Manager) WithRetryBackoff(backoff float64) *Manager

WithRetryBackoff sets the retry backoff multiplier

func (*Manager) WithRetryDelay

func (m *Manager) WithRetryDelay(delay time.Duration) *Manager

WithRetryDelay sets the retry delay

func (*Manager) WithScheduleInterval

func (m *Manager) WithScheduleInterval(interval time.Duration) *Manager

WithScheduleInterval sets the interval for checking scheduled jobs

func (*Manager) WithWorkers

func (m *Manager) WithWorkers(workers int) *Manager

WithWorkers sets the number of workers

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue implements an in-memory job queue with priority support

func NewMemoryQueue

func NewMemoryQueue() *MemoryQueue

NewMemoryQueue creates and initializes a new in-memory job queue with priority support.

Thread-safety: The queue is thread-safe, utilizing a sync.RWMutex for concurrent access and a channel for notifications. This ensures safe operations in multi-threaded environments.

Intended use cases: The queue is suitable for managing jobs in memory, particularly in scenarios requiring priority-based scheduling and thread-safe operations. It is ideal for applications where job persistence is not required, and all jobs can be managed in memory.

func (*MemoryQueue) Close

func (mq *MemoryQueue) Close() error

Close closes the queue

func (*MemoryQueue) DeleteJob

func (mq *MemoryQueue) DeleteJob(_ context.Context, id string) error

DeleteJob removes a job from the queue

func (*MemoryQueue) Dequeue

func (mq *MemoryQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)

Dequeue removes and returns the next job from the queue

func (*MemoryQueue) Enqueue

func (mq *MemoryQueue) Enqueue(_ context.Context, job *Job) error

Enqueue adds a job to the queue

func (*MemoryQueue) GetJob

func (mq *MemoryQueue) GetJob(_ context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*MemoryQueue) GetJobs

func (mq *MemoryQueue) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*MemoryQueue) GetStats

func (mq *MemoryQueue) GetStats(_ context.Context) (*Stats, error)

GetStats returns queue statistics

func (*MemoryQueue) Schedule

func (mq *MemoryQueue) Schedule(_ context.Context, job *Job) error

Schedule adds a scheduled job to the queue

func (*MemoryQueue) UpdateJob

func (mq *MemoryQueue) UpdateJob(_ context.Context, job *Job) error

UpdateJob updates an existing job

type Middleware

type Middleware func(JobHandler) JobHandler

Middleware is a function that wraps a JobHandler

func TimeoutMiddleware

func TimeoutMiddleware(timeout time.Duration) Middleware

TimeoutMiddleware adds timeout to job execution

type Priority

type Priority int

Priority defines the priority levels for jobs

const (
	// PriorityLow represents the lowest priority level
	PriorityLow Priority = iota
	// PriorityNormal represents the normal priority level
	PriorityNormal
	// PriorityHigh represents the high priority level
	PriorityHigh
	// PriorityCritical represents the critical priority level
	PriorityCritical
)

func (Priority) String

func (p Priority) String() string

String returns the string representation of the priority

type Queue

type Queue interface {
	Enqueue(ctx context.Context, job *Job) error
	Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
	Schedule(ctx context.Context, job *Job) error
	UpdateJob(ctx context.Context, job *Job) error
	GetJob(ctx context.Context, id string) (*Job, error)
	GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)
	GetStats(ctx context.Context) (*Stats, error)
	DeleteJob(ctx context.Context, id string) error
	Close() error
}

Queue defines the interface for job queues

type RabbitMQ

type RabbitMQ struct {
	// contains filtered or unexported fields
}

RabbitMQ implements a RabbitMQ-backed job queue

func NewRabbitMQ

func NewRabbitMQ(config RabbitMQConfig) (*RabbitMQ, error)

NewRabbitMQ creates a new RabbitMQ-backed queue.

Configuration Parameters:

  • URL: The RabbitMQ server URL. This is required for establishing a connection.
  • QueueName: The name of the queue to use. This is required and must be unique.
  • ExchangeName: The name of the exchange to bind the queue to. This is required.
  • RoutingKey: The routing key for binding the queue to the exchange. This is required.
  • Durable: If true, the queue will survive a broker restart.
  • AutoDelete: If true, the queue will be deleted when no consumers are connected.
  • Exclusive: If true, the queue will be used by only one connection and deleted when the connection closes.
  • NoWait: If true, the queue declaration will not wait for a server response.
  • MaxPriority: The maximum priority level for the queue (0-255). Defaults to 10 if not specified.

Connection Setup: The function establishes a connection to the RabbitMQ server using the provided URL. It then declares a queue with the specified configuration parameters and binds it to the exchange.

Error Conditions:

  • Missing or empty URL, QueueName, ExchangeName, or RoutingKey will result in an error.
  • Connection failures (e.g., network issues, invalid URL) will return a wrapped error.
  • Invalid MaxPriority values (outside the range 0-255) may cause unexpected behavior.
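
Since out-of-range MaxPriority values are described as causing unexpected behavior rather than an error, callers may want to check a configuration before passing it on. The sketch below shows such a pre-check on a local stand-in struct. rabbitConfig and validate are hypothetical, and rejecting a bad MaxPriority is a choice made for this example, not something the package is documented to do.

```go
package main

import (
	"errors"
	"fmt"
)

// rabbitConfig is a local stand-in with the fields relevant to validation.
type rabbitConfig struct {
	URL          string
	QueueName    string
	ExchangeName string
	RoutingKey   string
	MaxPriority  int
}

// validate checks the required fields and applies the documented default of 10
// when MaxPriority is left at zero.
func (c *rabbitConfig) validate() error {
	switch {
	case c.URL == "":
		return errors.New("URL is required")
	case c.QueueName == "":
		return errors.New("QueueName is required")
	case c.ExchangeName == "":
		return errors.New("ExchangeName is required")
	case c.RoutingKey == "":
		return errors.New("RoutingKey is required")
	}
	if c.MaxPriority == 0 {
		c.MaxPriority = 10
	}
	if c.MaxPriority < 0 || c.MaxPriority > 255 {
		return fmt.Errorf("MaxPriority %d is outside 0-255", c.MaxPriority)
	}
	return nil
}

func main() {
	// The URL and names below are placeholder values.
	cfg := rabbitConfig{URL: "amqp://localhost", QueueName: "jobs", ExchangeName: "jobs", RoutingKey: "jobs"}
	fmt.Println(cfg.validate(), cfg.MaxPriority) // <nil> 10
}
```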

func NewRabbitMQFromURL

func NewRabbitMQFromURL(url string) (*RabbitMQ, error)

NewRabbitMQFromURL creates a new RabbitMQ queue with a simple URL

func (*RabbitMQ) Close

func (rq *RabbitMQ) Close() error

Close closes the RabbitMQ connection

func (*RabbitMQ) DeleteJob

func (rq *RabbitMQ) DeleteJob(_ context.Context, id string) error

DeleteJob removes a job from the queue

func (*RabbitMQ) Dequeue

func (rq *RabbitMQ) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)

Dequeue retrieves a job from the queue

func (*RabbitMQ) Enqueue

func (rq *RabbitMQ) Enqueue(ctx context.Context, job *Job) error

Enqueue adds a job to the queue

func (*RabbitMQ) GetJob

func (rq *RabbitMQ) GetJob(_ context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*RabbitMQ) GetJobs

func (rq *RabbitMQ) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*RabbitMQ) GetStats

func (rq *RabbitMQ) GetStats(_ context.Context) (*Stats, error)

GetStats returns queue statistics

func (*RabbitMQ) IsConnectionOpen

func (rq *RabbitMQ) IsConnectionOpen() bool

IsConnectionOpen checks if the RabbitMQ connection is open

func (*RabbitMQ) PurgeQueue

func (rq *RabbitMQ) PurgeQueue(_ context.Context) error

PurgeQueue removes all messages from the queue

func (*RabbitMQ) Reconnect

func (rq *RabbitMQ) Reconnect(config RabbitMQConfig) error

Reconnect attempts to reconnect to RabbitMQ

func (*RabbitMQ) Schedule

func (rq *RabbitMQ) Schedule(ctx context.Context, job *Job) error

Schedule adds a job to be processed at a specific time

func (*RabbitMQ) UpdateJob

func (rq *RabbitMQ) UpdateJob(_ context.Context, job *Job) error

UpdateJob updates a job's status

type RabbitMQConfig

type RabbitMQConfig struct {
	URL          string
	QueueName    string
	ExchangeName string
	RoutingKey   string
	Durable      bool
	AutoDelete   bool
	Exclusive    bool
	NoWait       bool
	MaxPriority  int // Maximum priority level for the queue (0-255)
}

RabbitMQConfig holds configuration for RabbitMQ queue

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue implements a Redis-backed job queue

func NewRedisQueue

func NewRedisQueue(client redis.Cmdable, keyPrefix string) *RedisQueue

NewRedisQueue creates a new Redis-backed queue.

Parameters:

  • client: A Redis client implementing the redis.Cmdable interface, used to interact with the Redis database.
  • keyPrefix: A string used as a prefix for all Redis keys created by the queue. If an empty string is provided, the default prefix "queue" is used.

Purpose: This function initializes a RedisQueue instance, which provides methods for enqueueing, dequeueing, and scheduling jobs in a Redis-backed job queue.

func (*RedisQueue) Close

func (rq *RedisQueue) Close() error

Close closes the queue

func (*RedisQueue) DeleteJob

func (rq *RedisQueue) DeleteJob(ctx context.Context, id string) error

DeleteJob removes a job from the queue

func (*RedisQueue) Dequeue

func (rq *RedisQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)

Dequeue removes and returns the next job from the queue

func (*RedisQueue) Enqueue

func (rq *RedisQueue) Enqueue(ctx context.Context, job *Job) error

Enqueue adds a job to the queue

func (*RedisQueue) GetJob

func (rq *RedisQueue) GetJob(ctx context.Context, id string) (*Job, error)

GetJob retrieves a job by ID

func (*RedisQueue) GetJobs

func (rq *RedisQueue) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)

GetJobs retrieves jobs by status

func (*RedisQueue) GetStats

func (rq *RedisQueue) GetStats(ctx context.Context) (*Stats, error)

GetStats returns queue statistics

func (*RedisQueue) MoveScheduledToPending

func (rq *RedisQueue) MoveScheduledToPending(ctx context.Context) error

MoveScheduledToPending moves scheduled jobs that are ready to be processed

func (*RedisQueue) Schedule

func (rq *RedisQueue) Schedule(ctx context.Context, job *Job) error

Schedule adds a scheduled job to the queue

func (*RedisQueue) UpdateJob

func (rq *RedisQueue) UpdateJob(ctx context.Context, job *Job) error

UpdateJob updates an existing job

type RetryableError

type RetryableError struct {
	Err error
}

RetryableError represents an error that should trigger a retry

func NewRetryableError

func NewRetryableError(err error) *RetryableError

NewRetryableError wraps an error to indicate that a job should be retried. When a job handler returns a RetryableError, the job manager will automatically retry the job according to the configured retry policy, instead of marking it as failed.

Example usage:

func myJobHandler(ctx context.Context, job *queue.Job) error {
	if err := doSomething(); err != nil {
		if isTemporaryError(err) {
			return queue.NewRetryableError(err) // Will retry
		}
		return err // Will fail permanently
	}
	return nil
}

func (*RetryableError) Error

func (e *RetryableError) Error() string

type Stats

type Stats struct {
	JobsProcessed int64 `json:"jobs_processed"`
	JobsFailed    int64 `json:"jobs_failed"`
	JobsRetried   int64 `json:"jobs_retried"`
	QueueSize     int64 `json:"queue_size"`
	WorkersActive int64 `json:"workers_active"`
	WorkersBusy   int64 `json:"workers_busy"`
	TotalJobs     int64 `json:"total_jobs"`
	Pending       int64 `json:"pending"`
	Running       int64 `json:"running"`
	Completed     int64 `json:"completed"`
	Failed        int64 `json:"failed"`
	Retrying      int64 `json:"retrying"`
	Scheduled     int64 `json:"scheduled"`
	DeadLetter    int64 `json:"dead_letter"`
}

Stats represents queue statistics

type Status

type Status int

Status represents the status of a job

const (
	// StatusPending indicates the job is pending execution
	StatusPending Status = iota
	// StatusRunning indicates the job is currently being executed
	StatusRunning
	// StatusCompleted indicates the job has completed successfully
	StatusCompleted
	// StatusFailed indicates the job has failed
	StatusFailed
	// StatusRetrying indicates the job is scheduled for retry
	StatusRetrying
	// StatusScheduled indicates the job is scheduled for future execution
	StatusScheduled
	// StatusDeadLetter indicates the job has been moved to dead letter queue
	StatusDeadLetter
)

func (Status) String

func (s Status) String() string

String returns the string representation of the status

type Task

type Task struct {
	ID                  string        `json:"id"`
	Name                string        `json:"name"`
	Type                TaskType      `json:"type"`
	CronSpec            string        `json:"cron_spec,omitempty"`
	Interval            time.Duration `json:"interval,omitempty"`
	Function            TaskFunc      `json:"-"`
	NextRun             time.Time     `json:"next_run"`
	LastRun             time.Time     `json:"last_run"`
	RunCount            int64         `json:"run_count"`
	ErrorCount          int64         `json:"error_count"`
	ConsecutiveFailures int64         `json:"consecutive_failures"`
	LastError           string        `json:"last_error,omitempty"`
	IsRunning           bool          `json:"is_running"`
	Quiet               bool          `json:"log_on_first_failure_only"`
	AllowConcurrent     bool          `json:"allow_concurrent"`
	MaxRetries          int           `json:"max_retries"`
	RetryDelay          time.Duration `json:"retry_delay"`
	Timeout             time.Duration `json:"timeout"`
	Enabled             bool          `json:"enabled"`
	CreatedAt           time.Time     `json:"created_at"`
	UpdatedAt           time.Time     `json:"updated_at"`
	// contains filtered or unexported fields
}

Task represents a scheduled task

type TaskFunc

type TaskFunc func(ctx context.Context) error

TaskFunc represents a task function that can be executed

type TaskOptions

type TaskOptions struct {
	// MaxRetries specifies the maximum number of retries for a task (default is 0, meaning no retries)
	MaxRetries int
	// RetryDelay specifies the delay between retries (default is 5 seconds)
	RetryDelay time.Duration
	// Timeout specifies the maximum duration for task execution (default is 5 minutes)
	Timeout time.Duration
	// Concurrent specifies whether the task can run concurrently (default is false)
	Concurrent bool
	// Immediately specifies whether the task should run immediately upon registration (default is false)
	Immediately bool
	// Quiet specifies whether to log only the first failure in a series of consecutive failures (default is false)
	Quiet bool
}

TaskOptions provides configuration options for tasks

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

TaskScheduler manages background tasks

func NewTaskScheduler

func NewTaskScheduler() *TaskScheduler

NewTaskScheduler creates a new task scheduler with default settings

func (*TaskScheduler) DisableTask

func (s *TaskScheduler) DisableTask(name string) error

DisableTask disables a task

func (*TaskScheduler) EnableTask

func (s *TaskScheduler) EnableTask(name string) error

EnableTask enables a task

func (*TaskScheduler) GetTask

func (s *TaskScheduler) GetTask(name string) (*Task, error)

GetTask returns a task by name

func (*TaskScheduler) GetTasks

func (s *TaskScheduler) GetTasks() map[string]*Task

GetTasks returns all registered tasks

func (*TaskScheduler) IsRunning

func (s *TaskScheduler) IsRunning() bool

IsRunning returns true if the scheduler is running

func (*TaskScheduler) ParseCronSpec

func (s *TaskScheduler) ParseCronSpec(cronSpec string) (*CronExpression, error)

ParseCronSpec parses a cron specification

func (*TaskScheduler) RegisterCronTask

func (s *TaskScheduler) RegisterCronTask(name, cronSpec string, fn TaskFunc) error

RegisterCronTask registers a new cron-based task

func (*TaskScheduler) RegisterCronTaskWithOptions

func (s *TaskScheduler) RegisterCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error

RegisterCronTaskWithOptions registers a new cron-based task with options

func (*TaskScheduler) RegisterIntervalTask

func (s *TaskScheduler) RegisterIntervalTask(name string, interval time.Duration, fn TaskFunc) error

RegisterIntervalTask registers a new interval-based task

func (*TaskScheduler) RegisterIntervalTaskWithOptions

func (s *TaskScheduler) RegisterIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error

RegisterIntervalTaskWithOptions registers a new interval-based task with options

func (*TaskScheduler) RegisterOrRescheduleCronTask

func (s *TaskScheduler) RegisterOrRescheduleCronTask(name, cronSpec string, fn TaskFunc) error

RegisterOrRescheduleCronTask registers a new cron-based task or reschedules an existing one

func (*TaskScheduler) RegisterOrRescheduleCronTaskWithOptions

func (s *TaskScheduler) RegisterOrRescheduleCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error

RegisterOrRescheduleCronTaskWithOptions registers a new cron-based task or reschedules an existing one with options

func (*TaskScheduler) RegisterOrRescheduleIntervalTask

func (s *TaskScheduler) RegisterOrRescheduleIntervalTask(name string, interval time.Duration, fn TaskFunc) error

RegisterOrRescheduleIntervalTask registers a new interval-based task or reschedules an existing one

func (*TaskScheduler) RegisterOrRescheduleIntervalTaskWithOptions

func (s *TaskScheduler) RegisterOrRescheduleIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error

RegisterOrRescheduleIntervalTaskWithOptions registers a new interval-based task or reschedules an existing one with options

func (*TaskScheduler) RemoveTask

func (s *TaskScheduler) RemoveTask(name string) error

RemoveTask removes a task from the scheduler

func (*TaskScheduler) RescheduleTaskWithCron

func (s *TaskScheduler) RescheduleTaskWithCron(name, cronSpec string) error

RescheduleTaskWithCron reschedules an existing task with a new cron specification

func (*TaskScheduler) RescheduleTaskWithInterval

func (s *TaskScheduler) RescheduleTaskWithInterval(name string, interval time.Duration) error

RescheduleTaskWithInterval reschedules an existing task with a new interval

func (*TaskScheduler) Start

func (s *TaskScheduler) Start(ctx context.Context) error

Start starts the task scheduler

func (*TaskScheduler) Stop

func (s *TaskScheduler) Stop()

Stop stops the task scheduler

func (*TaskScheduler) ValidateCronSpec

func (s *TaskScheduler) ValidateCronSpec(spec string) error

ValidateCronSpec validates a cron specification.

Purpose: This function checks whether the provided cron specification is valid. It supports both standard cron formats (5 or 6 fields) and predefined expressions.

Supported formats:

  • Standard cron expressions with 5 fields (minute, hour, day, month, day of week).
  • Extended cron expressions with 6 fields (second, minute, hour, day, month, day of week).
  • Predefined expressions such as "@yearly", "@daily", "@hourly", etc., which are mapped to standard cron strings.

Validation rules:

  • Ensures the cron syntax is correct.
  • Maps predefined expressions to their equivalent cron strings.
  • Delegates parsing and validation to the parseCronSpec function.
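
The difference between the 5-field and 6-field formats comes down to whether a leading seconds field is present. The sketch below separates that optional field from the rest. splitSpec is a hypothetical helper, and it assumes predefined expressions have already been expanded to plain cron strings.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitSpec (hypothetical) returns the optional seconds field and the five
// traditional fields (minute, hour, day, month, day of week).
func splitSpec(spec string) (second string, rest []string, err error) {
	fields := strings.Fields(spec)
	switch len(fields) {
	case 5:
		return "", fields, nil
	case 6:
		return fields[0], fields[1:], nil
	default:
		return "", nil, errors.New("cron spec must have 5 or 6 fields")
	}
}

func main() {
	second, rest, err := splitSpec("*/30 * * * * *")
	fmt.Printf("%q %v %v\n", second, rest, err) // "*/30" [* * * * *] <nil>

	second, rest, err = splitSpec("0 0 * * *")
	fmt.Printf("%q %v %v\n", second, rest, err) // "" [0 0 * * *] <nil>
}
```

An empty seconds value here plays the same role as a nil Second pointer in CronExpression.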

func (*TaskScheduler) WithCheckInterval

func (s *TaskScheduler) WithCheckInterval(interval time.Duration) *TaskScheduler

WithCheckInterval sets the interval for checking scheduled tasks

func (*TaskScheduler) WithDefaultTimeout

func (s *TaskScheduler) WithDefaultTimeout(timeout time.Duration) *TaskScheduler

WithDefaultTimeout sets the default timeout for task execution

func (*TaskScheduler) WithRetryDelay

func (s *TaskScheduler) WithRetryDelay(delay time.Duration) *TaskScheduler

WithRetryDelay sets the delay between retries

type TaskType

type TaskType int

TaskType represents the type of task scheduling

const (
	// TaskTypeCron represents cron-based task scheduling
	TaskTypeCron TaskType = iota
	// TaskTypeInterval represents interval-based task scheduling
	TaskTypeInterval
)

func (TaskType) String

func (t TaskType) String() string

String returns the string representation of the task type
