Documentation
¶
Overview ¶
Package queue provides a comprehensive task queue and background job processing system with advanced scheduling capabilities.
It supports in-memory, Redis, and RabbitMQ queues with features like:
- Priority-based job scheduling
- Worker pool management
- Retry mechanisms with exponential backoff
- Middleware support for logging, metrics, and recovery
- Batch processing capabilities
- Graceful shutdown handling
- Job progress tracking
- Comprehensive error handling
- RabbitMQ support with persistent message delivery
- Scheduled job execution
- Cron-based scheduling (using enhanced cron expressions with optional seconds support)
- Interval-based scheduling (using time.Duration)
- Task registration and management
- Error recovery and retries
- Context-aware execution
Queue Manager Example:
package main
import (
"context"
"fmt"
"time"
"github.com/valentin-kaiser/go-core/queue"
)
func main() {
// Create a new queue manager
manager := queue.NewManager()
// Register a job handler
manager.RegisterHandler("email", func(ctx context.Context, job *queue.Job) error {
fmt.Printf("Processing email job: %s\n", job.ID)
return nil
})
// Start the manager
if err := manager.Start(context.Background()); err != nil {
panic(err)
}
defer manager.Stop()
// Enqueue a job
job := queue.NewJob("email").
WithPayload(map[string]interface{}{
"to": "user@example.com",
"subject": "Welcome!",
}).
Build()
if err := manager.Enqueue(context.Background(), job); err != nil {
panic(err)
}
// Wait for processing
time.Sleep(time.Second)
}
Task Scheduler Example:
package main
import (
"context"
"fmt"
"time"
"github.com/valentin-kaiser/go-core/queue"
)
func main() {
// Create a new task scheduler
scheduler := queue.NewTaskScheduler()
// Register a cron-based task (5 fields - traditional)
scheduler.RegisterCronTask("cleanup", "0 0 * * *", func(ctx context.Context) error {
fmt.Println("Running daily cleanup task")
return nil
})
// Register a cron-based task with seconds (6 fields)
scheduler.RegisterCronTask("frequent", "*/30 * * * * *", func(ctx context.Context) error {
fmt.Println("Running every 30 seconds")
return nil
})
// Register a predefined expression
scheduler.RegisterCronTask("hourly", "@hourly", func(ctx context.Context) error {
fmt.Println("Running every hour")
return nil
})
// Register a task with named months and days
scheduler.RegisterCronTask("weekday", "0 0 9 * * MON-FRI", func(ctx context.Context) error {
fmt.Println("Running on weekdays at 9 AM")
return nil
})
// Register an interval-based task
scheduler.RegisterIntervalTask("health-check", time.Minute*5, func(ctx context.Context) error {
fmt.Println("Running health check")
return nil
})
// Start the scheduler
if err := scheduler.Start(context.Background()); err != nil {
panic(err)
}
defer scheduler.Stop()
// Keep the program running
select {}
}
Index ¶
- Variables
- func IsRetryable(err error) bool
- type Batch
- type BatchManager
- func (bm *BatchManager) CreateBatch(ctx context.Context, name string, jobs []*Job) (*Batch, error)
- func (bm *BatchManager) DeleteBatch(ctx context.Context, id string) error
- func (bm *BatchManager) GetBatch(id string) (*Batch, error)
- func (bm *BatchManager) GetBatches() []*Batch
- func (bm *BatchManager) UpdateBatchStatus(ctx context.Context, batchID string) error
- type CronExpression
- type CronField
- type Job
- type JobBuilder
- func (jb *JobBuilder) Build() *Job
- func (jb *JobBuilder) WithDelay(delay time.Duration) *JobBuilder
- func (jb *JobBuilder) WithID(id string) *JobBuilder
- func (jb *JobBuilder) WithJSONPayload(jsonData []byte) *JobBuilder
- func (jb *JobBuilder) WithMaxAttempts(maxAttempts int) *JobBuilder
- func (jb *JobBuilder) WithMetadata(key, value string) *JobBuilder
- func (jb *JobBuilder) WithPayload(payload map[string]interface{}) *JobBuilder
- func (jb *JobBuilder) WithPriority(priority Priority) *JobBuilder
- func (jb *JobBuilder) WithScheduleAt(scheduleAt time.Time) *JobBuilder
- type JobContext
- func (jc *JobContext) GetMetadata(key string) (string, bool)
- func (jc *JobContext) GetPayload(key string) (interface{}, bool)
- func (jc *JobContext) GetPayloadBool(key string) (bool, bool)
- func (jc *JobContext) GetPayloadInt(key string) (int, bool)
- func (jc *JobContext) GetPayloadString(key string) (string, bool)
- func (jc *JobContext) ReportProgress(progress float64)
- type JobHandler
- type JobProgress
- type Manager
- func (m *Manager) Enqueue(ctx context.Context, job *Job) error
- func (m *Manager) GetJob(ctx context.Context, id string) (*Job, error)
- func (m *Manager) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)
- func (m *Manager) GetStats() *Stats
- func (m *Manager) IsRunning() bool
- func (m *Manager) RegisterHandler(jobType string, handler JobHandler)
- func (m *Manager) Schedule(ctx context.Context, job *Job) error
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop() error
- func (m *Manager) WithQueue(queue Queue) *Manager
- func (m *Manager) WithRabbitMQ(config RabbitMQConfig) *Manager
- func (m *Manager) WithRabbitMQFromURL(url string) *Manager
- func (m *Manager) WithRetryAttempts(attempts int) *Manager
- func (m *Manager) WithRetryBackoff(backoff float64) *Manager
- func (m *Manager) WithRetryDelay(delay time.Duration) *Manager
- func (m *Manager) WithScheduleInterval(interval time.Duration) *Manager
- func (m *Manager) WithWorkers(workers int) *Manager
- type MemoryQueue
- func (mq *MemoryQueue) Close() error
- func (mq *MemoryQueue) DeleteJob(_ context.Context, id string) error
- func (mq *MemoryQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
- func (mq *MemoryQueue) Enqueue(_ context.Context, job *Job) error
- func (mq *MemoryQueue) GetJob(_ context.Context, id string) (*Job, error)
- func (mq *MemoryQueue) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)
- func (mq *MemoryQueue) GetStats(_ context.Context) (*Stats, error)
- func (mq *MemoryQueue) Schedule(_ context.Context, job *Job) error
- func (mq *MemoryQueue) UpdateJob(_ context.Context, job *Job) error
- type Middleware
- type Priority
- type Queue
- type RabbitMQ
- func (rq *RabbitMQ) Close() error
- func (rq *RabbitMQ) DeleteJob(_ context.Context, id string) error
- func (rq *RabbitMQ) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
- func (rq *RabbitMQ) Enqueue(ctx context.Context, job *Job) error
- func (rq *RabbitMQ) GetJob(_ context.Context, id string) (*Job, error)
- func (rq *RabbitMQ) GetJobs(_ context.Context, status Status, limit int) ([]*Job, error)
- func (rq *RabbitMQ) GetStats(_ context.Context) (*Stats, error)
- func (rq *RabbitMQ) IsConnectionOpen() bool
- func (rq *RabbitMQ) PurgeQueue(_ context.Context) error
- func (rq *RabbitMQ) Reconnect(config RabbitMQConfig) error
- func (rq *RabbitMQ) Schedule(ctx context.Context, job *Job) error
- func (rq *RabbitMQ) UpdateJob(_ context.Context, job *Job) error
- type RabbitMQConfig
- type RedisQueue
- func (rq *RedisQueue) Close() error
- func (rq *RedisQueue) DeleteJob(ctx context.Context, id string) error
- func (rq *RedisQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
- func (rq *RedisQueue) Enqueue(ctx context.Context, job *Job) error
- func (rq *RedisQueue) GetJob(ctx context.Context, id string) (*Job, error)
- func (rq *RedisQueue) GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)
- func (rq *RedisQueue) GetStats(ctx context.Context) (*Stats, error)
- func (rq *RedisQueue) MoveScheduledToPending(ctx context.Context) error
- func (rq *RedisQueue) Schedule(ctx context.Context, job *Job) error
- func (rq *RedisQueue) UpdateJob(ctx context.Context, job *Job) error
- type RetryableError
- type Stats
- type Status
- type Task
- type TaskFunc
- type TaskOptions
- type TaskScheduler
- func (s *TaskScheduler) DisableTask(name string) error
- func (s *TaskScheduler) EnableTask(name string) error
- func (s *TaskScheduler) GetTask(name string) (*Task, error)
- func (s *TaskScheduler) GetTasks() map[string]*Task
- func (s *TaskScheduler) IsRunning() bool
- func (s *TaskScheduler) ParseCronSpec(cronSpec string) (*CronExpression, error)
- func (s *TaskScheduler) RegisterCronTask(name, cronSpec string, fn TaskFunc) error
- func (s *TaskScheduler) RegisterCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error
- func (s *TaskScheduler) RegisterIntervalTask(name string, interval time.Duration, fn TaskFunc) error
- func (s *TaskScheduler) RegisterIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error
- func (s *TaskScheduler) RegisterOrRescheduleCronTask(name, cronSpec string, fn TaskFunc) error
- func (s *TaskScheduler) RegisterOrRescheduleCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error
- func (s *TaskScheduler) RegisterOrRescheduleIntervalTask(name string, interval time.Duration, fn TaskFunc) error
- func (s *TaskScheduler) RegisterOrRescheduleIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error
- func (s *TaskScheduler) RemoveTask(name string) error
- func (s *TaskScheduler) RescheduleTaskWithCron(name, cronSpec string) error
- func (s *TaskScheduler) RescheduleTaskWithInterval(name string, interval time.Duration) error
- func (s *TaskScheduler) Start(ctx context.Context) error
- func (s *TaskScheduler) Stop()
- func (s *TaskScheduler) ValidateCronSpec(spec string) error
- func (s *TaskScheduler) WithCheckInterval(interval time.Duration) *TaskScheduler
- func (s *TaskScheduler) WithDefaultTimeout(timeout time.Duration) *TaskScheduler
- func (s *TaskScheduler) WithRetryDelay(delay time.Duration) *TaskScheduler
- type TaskType
Constants ¶
This section is empty.
Variables ¶
var ( // Presets are predefined cron expressions for common schedules Presets = map[string]string{ "@yearly": "0 0 0 1 1 *", "@annually": "0 0 0 1 1 *", "@monthly": "0 0 0 1 * *", "@weekly": "0 0 0 * * 0", "@daily": "0 0 0 * * *", "@midnight": "0 0 0 * * *", "@hourly": "0 0 * * * *", } // Months is a map of month names to their numeric values Months = map[string]int{ "JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6, "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12, } // Days is a map of day names to their numeric values Days = map[string]int{ "SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6, } )
var ErrNoJobAvailable = apperror.NewError("no job available")
ErrNoJobAvailable is returned when no job is available in the queue
Functions ¶
func IsRetryable ¶
IsRetryable checks if an error is a retryable error that should trigger job retry. Returns true if the error is of type RetryableError, which indicates the job should be retried according to the configured retry policy instead of being marked as failed.
Example usage:
if err := processJob(); err != nil {
if queue.IsRetryable(err) {
// Job will be automatically retried
} else {
// Job will be marked as failed
}
}
Types ¶
type Batch ¶
type Batch struct {
ID string `json:"id"`
Name string `json:"name"`
JobIDs []string `json:"job_ids"`
Status Status `json:"status"`
Total int `json:"total"`
Pending int `json:"pending"`
Running int `json:"running"`
Completed int `json:"completed"`
Failed int `json:"failed"`
CreatedAt time.Time `json:"created_at"`
CompletedAt time.Time `json:"completed_at,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
Batch represents a batch of jobs
type BatchManager ¶
type BatchManager struct {
// contains filtered or unexported fields
}
BatchManager manages job batches
func NewBatchManager ¶
func NewBatchManager(manager *Manager) *BatchManager
NewBatchManager creates a new batch manager for handling grouped job operations. Batch managers allow you to group related jobs together and track their collective progress, providing operations like creating batches, monitoring batch completion, and handling batch-level callbacks.
Example usage:
batchMgr := queue.NewBatchManager(manager)
batch := batchMgr.CreateBatch("user-emails")
// Add jobs to batch...
func (*BatchManager) CreateBatch ¶
CreateBatch creates a new batch of jobs
func (*BatchManager) DeleteBatch ¶
func (bm *BatchManager) DeleteBatch(ctx context.Context, id string) error
DeleteBatch removes a batch
func (*BatchManager) GetBatch ¶
func (bm *BatchManager) GetBatch(id string) (*Batch, error)
GetBatch retrieves a batch by ID
func (*BatchManager) GetBatches ¶
func (bm *BatchManager) GetBatches() []*Batch
GetBatches returns all batches
func (*BatchManager) UpdateBatchStatus ¶
func (bm *BatchManager) UpdateBatchStatus(ctx context.Context, batchID string) error
UpdateBatchStatus updates the status of a batch based on its jobs
type CronExpression ¶
type CronExpression struct {
Second *CronField // Optional seconds field (0-59)
Minute CronField // Minute field (0-59)
Hour CronField // Hour field (0-23)
Day CronField // Day field (1-31)
Month CronField // Month field (1-12)
DayOfWeek CronField // Day of week field (0-6, 0 = Sunday)
}
CronExpression represents a parsed cron expression
type Job ¶
type Job struct {
ID string `json:"id"`
Type string `json:"type"`
Priority Priority `json:"priority"`
Status Status `json:"status"`
Attempts int `json:"attempts"`
MaxAttempts int `json:"max_attempts"`
Progress float64 `json:"progress"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt time.Time `json:"completed_at,omitempty"`
ScheduleAt time.Time `json:"schedule_at,omitempty"`
RetryAt time.Time `json:"retry_at,omitempty"`
Timeout time.Duration `json:"timeout"`
Payload json.RawMessage `json:"payload,omitempty"`
Results json.RawMessage `json:"results,omitempty"`
Error string `json:"error,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
Job represents a job to be processed
func (*Job) IsScheduled ¶
IsScheduled returns true if the job is scheduled for a future time
type JobBuilder ¶
type JobBuilder struct {
// contains filtered or unexported fields
}
JobBuilder helps create jobs with a fluent API
func (*JobBuilder) WithDelay ¶
func (jb *JobBuilder) WithDelay(delay time.Duration) *JobBuilder
WithDelay schedules the job to run after a delay
func (*JobBuilder) WithID ¶
func (jb *JobBuilder) WithID(id string) *JobBuilder
WithID sets the job ID
func (*JobBuilder) WithJSONPayload ¶
func (jb *JobBuilder) WithJSONPayload(jsonData []byte) *JobBuilder
WithJSONPayload sets the job payload from JSON
func (*JobBuilder) WithMaxAttempts ¶
func (jb *JobBuilder) WithMaxAttempts(maxAttempts int) *JobBuilder
WithMaxAttempts sets the maximum number of attempts
func (*JobBuilder) WithMetadata ¶
func (jb *JobBuilder) WithMetadata(key, value string) *JobBuilder
WithMetadata sets job metadata
func (*JobBuilder) WithPayload ¶
func (jb *JobBuilder) WithPayload(payload map[string]interface{}) *JobBuilder
WithPayload sets the job payload
func (*JobBuilder) WithPriority ¶
func (jb *JobBuilder) WithPriority(priority Priority) *JobBuilder
WithPriority sets the job priority
func (*JobBuilder) WithScheduleAt ¶
func (jb *JobBuilder) WithScheduleAt(scheduleAt time.Time) *JobBuilder
WithScheduleAt schedules the job for a specific time
type JobContext ¶
type JobContext struct {
Job *Job
}
JobContext provides context for job execution
func NewJobContext ¶
func NewJobContext(_ context.Context, job *Job) *JobContext
NewJobContext creates a new job context that provides access to job information and progress reporting capabilities within job handlers. The context allows handlers to update job progress and access job metadata during execution.
Example usage in a job handler:
func myHandler(ctx *queue.JobContext) error {
ctx.ReportProgress(0.5) // 50% complete
// do work...
ctx.ReportProgress(1.0) // 100% complete
return nil
}
func (*JobContext) GetMetadata ¶
func (jc *JobContext) GetMetadata(key string) (string, bool)
GetMetadata gets a metadata value
func (*JobContext) GetPayload ¶
func (jc *JobContext) GetPayload(key string) (interface{}, bool)
GetPayload gets a value from the job payload
func (*JobContext) GetPayloadBool ¶
func (jc *JobContext) GetPayloadBool(key string) (bool, bool)
GetPayloadBool gets a bool value from the job payload
func (*JobContext) GetPayloadInt ¶
func (jc *JobContext) GetPayloadInt(key string) (int, bool)
GetPayloadInt gets an int value from the job payload
func (*JobContext) GetPayloadString ¶
func (jc *JobContext) GetPayloadString(key string) (string, bool)
GetPayloadString gets a string value from the job payload
func (*JobContext) ReportProgress ¶
func (jc *JobContext) ReportProgress(progress float64)
ReportProgress reports job progress
type JobHandler ¶
JobHandler is a function that processes jobs
func LoggingMiddleware ¶
func LoggingMiddleware(next JobHandler) JobHandler
LoggingMiddleware logs job execution
func MetricsMiddleware ¶
func MetricsMiddleware(next JobHandler) JobHandler
MetricsMiddleware tracks job metrics
func MiddlewareChain ¶
func MiddlewareChain(handler JobHandler, middlewares ...Middleware) JobHandler
MiddlewareChain applies multiple middlewares to a job handler
func RecoveryMiddleware ¶
func RecoveryMiddleware(next JobHandler) JobHandler
RecoveryMiddleware recovers from panics in job handlers
type JobProgress ¶
type JobProgress struct {
JobID string `json:"job_id"`
Progress float64 `json:"progress"`
Message string `json:"message,omitempty"`
}
JobProgress represents job progress information
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the job queue and workers
func NewManager ¶
func NewManager() *Manager
NewManager creates a new queue manager with default settings
func (*Manager) RegisterHandler ¶
func (m *Manager) RegisterHandler(jobType string, handler JobHandler)
RegisterHandler registers a job handler for a specific job type
func (*Manager) WithRabbitMQ ¶
func (m *Manager) WithRabbitMQ(config RabbitMQConfig) *Manager
WithRabbitMQ sets the queue to use RabbitMQ with the given configuration
func (*Manager) WithRabbitMQFromURL ¶
WithRabbitMQFromURL sets the queue to use RabbitMQ with the given URL
func (*Manager) WithRetryAttempts ¶
WithRetryAttempts sets the maximum number of retry attempts
func (*Manager) WithRetryBackoff ¶
WithRetryBackoff sets the retry backoff multiplier
func (*Manager) WithRetryDelay ¶
WithRetryDelay sets the retry delay
func (*Manager) WithScheduleInterval ¶
WithScheduleInterval sets the interval for checking scheduled jobs
func (*Manager) WithWorkers ¶
WithWorkers sets the number of workers
type MemoryQueue ¶
type MemoryQueue struct {
// contains filtered or unexported fields
}
MemoryQueue implements an in-memory job queue with priority support
func NewMemoryQueue ¶
func NewMemoryQueue() *MemoryQueue
NewMemoryQueue creates and initializes a new in-memory job queue with priority support.
Thread-safety: The queue is thread-safe, utilizing a sync.RWMutex for concurrent access and a channel for notifications. This ensures safe operations in multi-threaded environments.
Intended use cases: The queue is suitable for managing jobs in memory, particularly in scenarios requiring priority-based scheduling and thread-safe operations. It is ideal for applications where job persistence is not required, and all jobs can be managed in memory.
func (*MemoryQueue) DeleteJob ¶
func (mq *MemoryQueue) DeleteJob(_ context.Context, id string) error
DeleteJob removes a job from the queue
func (*MemoryQueue) Enqueue ¶
func (mq *MemoryQueue) Enqueue(_ context.Context, job *Job) error
Enqueue adds a job to the queue
func (*MemoryQueue) GetStats ¶
func (mq *MemoryQueue) GetStats(_ context.Context) (*Stats, error)
GetStats returns queue statistics
type Middleware ¶
type Middleware func(JobHandler) JobHandler
Middleware is a function that wraps a JobHandler
func TimeoutMiddleware ¶
func TimeoutMiddleware(timeout time.Duration) Middleware
TimeoutMiddleware adds timeout to job execution
type Queue ¶
type Queue interface {
Enqueue(ctx context.Context, job *Job) error
Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
Schedule(ctx context.Context, job *Job) error
UpdateJob(ctx context.Context, job *Job) error
GetJob(ctx context.Context, id string) (*Job, error)
GetJobs(ctx context.Context, status Status, limit int) ([]*Job, error)
GetStats(ctx context.Context) (*Stats, error)
DeleteJob(ctx context.Context, id string) error
Close() error
}
Queue defines the interface for job queues
type RabbitMQ ¶
type RabbitMQ struct {
// contains filtered or unexported fields
}
RabbitMQ implements a RabbitMQ-backed job queue
func NewRabbitMQ ¶
func NewRabbitMQ(config RabbitMQConfig) (*RabbitMQ, error)
NewRabbitMQ creates a new RabbitMQ-backed queue.
Configuration Parameters: - URL: The RabbitMQ server URL. This is required for establishing a connection. - QueueName: The name of the queue to use. This is required and must be unique. - ExchangeName: The name of the exchange to bind the queue to. This is required. - RoutingKey: The routing key for binding the queue to the exchange. This is required. - Durable: If true, the queue will survive a broker restart. - AutoDelete: If true, the queue will be deleted when no consumers are connected. - Exclusive: If true, the queue will be used by only one connection and deleted when the connection closes. - NoWait: If true, the queue declaration will not wait for a server response. - MaxPriority: The maximum priority level for the queue (0-255). Defaults to 10 if not specified.
Connection Setup: The function establishes a connection to the RabbitMQ server using the provided URL. It then declares a queue with the specified configuration parameters and binds it to the exchange.
Error Conditions: - Missing or empty URL, QueueName, ExchangeName, or RoutingKey will result in an error. - Connection failures (e.g., network issues, invalid URL) will return a wrapped error. - Invalid MaxPriority values (outside the range 0-255) may cause unexpected behavior.
func NewRabbitMQFromURL ¶
NewRabbitMQFromURL creates a new RabbitMQ queue with a simple URL
func (*RabbitMQ) IsConnectionOpen ¶
IsConnectionOpen checks if the RabbitMQ connection is open
func (*RabbitMQ) PurgeQueue ¶
PurgeQueue removes all messages from the queue
func (*RabbitMQ) Reconnect ¶
func (rq *RabbitMQ) Reconnect(config RabbitMQConfig) error
Reconnect attempts to reconnect to RabbitMQ
type RabbitMQConfig ¶
type RabbitMQConfig struct {
URL string
QueueName string
ExchangeName string
RoutingKey string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
MaxPriority int // Maximum priority level for the queue (0-255)
}
RabbitMQConfig holds configuration for RabbitMQ queue
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue implements a Redis-backed job queue
func NewRedisQueue ¶
func NewRedisQueue(client redis.Cmdable, keyPrefix string) *RedisQueue
NewRedisQueue creates a new Redis-backed queue.
Parameters: - client: A Redis client implementing the redis.Cmdable interface, used to interact with the Redis database. - keyPrefix: A string used as a prefix for all Redis keys created by the queue. If an empty string is provided, the default prefix "queue" is used.
Purpose: This function initializes a RedisQueue instance, which provides methods for enqueueing, dequeueing, and scheduling jobs in a Redis-backed job queue.
func (*RedisQueue) DeleteJob ¶
func (rq *RedisQueue) DeleteJob(ctx context.Context, id string) error
DeleteJob removes a job from the queue
func (*RedisQueue) Enqueue ¶
func (rq *RedisQueue) Enqueue(ctx context.Context, job *Job) error
Enqueue adds a job to the queue
func (*RedisQueue) GetStats ¶
func (rq *RedisQueue) GetStats(ctx context.Context) (*Stats, error)
GetStats returns queue statistics
func (*RedisQueue) MoveScheduledToPending ¶
func (rq *RedisQueue) MoveScheduledToPending(ctx context.Context) error
MoveScheduledToPending moves scheduled jobs that are ready to be processed
type RetryableError ¶
type RetryableError struct {
Err error
}
RetryableError represents an error that should trigger a retry
func NewRetryableError ¶
func NewRetryableError(err error) *RetryableError
NewRetryableError wraps an error to indicate that a job should be retried. When a job handler returns a RetryableError, the job manager will automatically retry the job according to the configured retry policy, instead of marking it as failed.
Example usage:
func myJobHandler(job *queue.Job) error {
if err := doSomething(); err != nil {
if isTemporaryError(err) {
return queue.NewRetryableError(err) // Will retry
}
return err // Will fail permanently
}
return nil
}
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
type Stats ¶
type Stats struct {
JobsProcessed int64 `json:"jobs_processed"`
JobsFailed int64 `json:"jobs_failed"`
JobsRetried int64 `json:"jobs_retried"`
QueueSize int64 `json:"queue_size"`
WorkersActive int64 `json:"workers_active"`
WorkersBusy int64 `json:"workers_busy"`
TotalJobs int64 `json:"total_jobs"`
Pending int64 `json:"pending"`
Running int64 `json:"running"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
Retrying int64 `json:"retrying"`
Scheduled int64 `json:"scheduled"`
DeadLetter int64 `json:"dead_letter"`
}
Stats represents queue statistics
type Status ¶
type Status int
Status represents the status of a job
const ( // StatusPending indicates the job is pending execution StatusPending Status = iota // StatusRunning indicates the job is currently being executed StatusRunning // StatusCompleted indicates the job has completed successfully StatusCompleted // StatusFailed indicates the job has failed StatusFailed // StatusRetrying indicates the job is scheduled for retry StatusRetrying // StatusScheduled indicates the job is scheduled for future execution StatusScheduled // StatusDeadLetter indicates the job has been moved to dead letter queue StatusDeadLetter )
type Task ¶
type Task struct {
ID string `json:"id"`
Name string `json:"name"`
Type TaskType `json:"type"`
CronSpec string `json:"cron_spec,omitempty"`
Interval time.Duration `json:"interval,omitempty"`
Function TaskFunc `json:"-"`
NextRun time.Time `json:"next_run"`
LastRun time.Time `json:"last_run"`
RunCount int64 `json:"run_count"`
ErrorCount int64 `json:"error_count"`
ConsecutiveFailures int64 `json:"consecutive_failures"`
LastError string `json:"last_error,omitempty"`
IsRunning bool `json:"is_running"`
Quiet bool `json:"log_on_first_failure_only"`
AllowConcurrent bool `json:"allow_concurrent"`
MaxRetries int `json:"max_retries"`
RetryDelay time.Duration `json:"retry_delay"`
Timeout time.Duration `json:"timeout"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
// contains filtered or unexported fields
}
Task represents a scheduled task
type TaskOptions ¶
type TaskOptions struct {
// MaxRetries specifies the maximum number of retries for a task default is 0 (no retries)
MaxRetries int
// RetryDelay specifies the delay between retries (default is 5 seconds)
RetryDelay time.Duration
// Timeout specifies the maximum duration for task execution (default is 5 minutes)
Timeout time.Duration
// Concurrent specifies whether the task can run concurrently (default is false)
Concurrent bool
// Immediately specifies whether the task should run immediately upon registration (default is false)
Immediately bool
// Quiet specifies whether to log only the first failure in a series of consecutive failures (default is false)
Quiet bool
}
TaskOptions provides configuration options for tasks
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
TaskScheduler manages background tasks
func NewTaskScheduler ¶
func NewTaskScheduler() *TaskScheduler
NewTaskScheduler creates a new task scheduler with default settings
func (*TaskScheduler) DisableTask ¶
func (s *TaskScheduler) DisableTask(name string) error
DisableTask disables a task
func (*TaskScheduler) EnableTask ¶
func (s *TaskScheduler) EnableTask(name string) error
EnableTask enables a task
func (*TaskScheduler) GetTask ¶
func (s *TaskScheduler) GetTask(name string) (*Task, error)
GetTask returns a task by name
func (*TaskScheduler) GetTasks ¶
func (s *TaskScheduler) GetTasks() map[string]*Task
GetTasks returns all registered tasks
func (*TaskScheduler) IsRunning ¶
func (s *TaskScheduler) IsRunning() bool
IsRunning returns true if the scheduler is running
func (*TaskScheduler) ParseCronSpec ¶
func (s *TaskScheduler) ParseCronSpec(cronSpec string) (*CronExpression, error)
ParseCronSpec parses a cron specification
func (*TaskScheduler) RegisterCronTask ¶
func (s *TaskScheduler) RegisterCronTask(name, cronSpec string, fn TaskFunc) error
RegisterCronTask registers a new cron-based task
func (*TaskScheduler) RegisterCronTaskWithOptions ¶
func (s *TaskScheduler) RegisterCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error
RegisterCronTaskWithOptions registers a new cron-based task with options
func (*TaskScheduler) RegisterIntervalTask ¶
func (s *TaskScheduler) RegisterIntervalTask(name string, interval time.Duration, fn TaskFunc) error
RegisterIntervalTask registers a new interval-based task
func (*TaskScheduler) RegisterIntervalTaskWithOptions ¶
func (s *TaskScheduler) RegisterIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error
RegisterIntervalTaskWithOptions registers a new interval-based task with options
func (*TaskScheduler) RegisterOrRescheduleCronTask ¶
func (s *TaskScheduler) RegisterOrRescheduleCronTask(name, cronSpec string, fn TaskFunc) error
RegisterOrRescheduleCronTask registers a new cron-based task or reschedules an existing one
func (*TaskScheduler) RegisterOrRescheduleCronTaskWithOptions ¶
func (s *TaskScheduler) RegisterOrRescheduleCronTaskWithOptions(name, cronSpec string, fn TaskFunc, options TaskOptions) error
RegisterOrRescheduleCronTaskWithOptions registers a new cron-based task or reschedules an existing one with options
func (*TaskScheduler) RegisterOrRescheduleIntervalTask ¶
func (s *TaskScheduler) RegisterOrRescheduleIntervalTask(name string, interval time.Duration, fn TaskFunc) error
RegisterOrRescheduleIntervalTask registers a new interval-based task or reschedules an existing one
func (*TaskScheduler) RegisterOrRescheduleIntervalTaskWithOptions ¶
func (s *TaskScheduler) RegisterOrRescheduleIntervalTaskWithOptions(name string, interval time.Duration, fn TaskFunc, options TaskOptions) error
RegisterOrRescheduleIntervalTaskWithOptions registers a new interval-based task or reschedules an existing one with options
func (*TaskScheduler) RemoveTask ¶
func (s *TaskScheduler) RemoveTask(name string) error
RemoveTask removes a task from the scheduler
func (*TaskScheduler) RescheduleTaskWithCron ¶
func (s *TaskScheduler) RescheduleTaskWithCron(name, cronSpec string) error
RescheduleTaskWithCron reschedules an existing task with a new cron specification
func (*TaskScheduler) RescheduleTaskWithInterval ¶
func (s *TaskScheduler) RescheduleTaskWithInterval(name string, interval time.Duration) error
RescheduleTaskWithInterval reschedules an existing task with a new interval
func (*TaskScheduler) Start ¶
func (s *TaskScheduler) Start(ctx context.Context) error
Start starts the task scheduler
func (*TaskScheduler) ValidateCronSpec ¶
func (s *TaskScheduler) ValidateCronSpec(spec string) error
ValidateCronSpec validates a cron specification.
Purpose: This function checks whether the provided cron specification is valid. It supports both standard cron formats (5 or 6 fields) and predefined expressions.
Supported formats: - Standard cron expressions with 5 fields (minute, hour, day, month, day of week). - Extended cron expressions with 6 fields (second, minute, hour, day, month, day of week). - Predefined expressions such as "@yearly", "@daily", "@hourly", etc., which are mapped to standard cron strings.
Validation rules: - Ensures the cron syntax is correct. - Maps predefined expressions to their equivalent cron strings. - Delegates parsing and validation to the parseCronSpec function.
func (*TaskScheduler) WithCheckInterval ¶
func (s *TaskScheduler) WithCheckInterval(interval time.Duration) *TaskScheduler
WithCheckInterval sets the interval for checking scheduled tasks
func (*TaskScheduler) WithDefaultTimeout ¶
func (s *TaskScheduler) WithDefaultTimeout(timeout time.Duration) *TaskScheduler
WithDefaultTimeout sets the default timeout for task execution
func (*TaskScheduler) WithRetryDelay ¶
func (s *TaskScheduler) WithRetryDelay(delay time.Duration) *TaskScheduler
WithRetryDelay sets the delay between retries