queue

package module
v2.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2025 License: MIT Imports: 15 Imported by: 6

README

Queue Module for Tinh Tinh

Tinh Tinh Logo

Overview

The Queue module provides a robust, Redis-based job queue for the Tinh Tinh framework, supporting job scheduling, rate limiting, retries, concurrency, delayed jobs, priorities, and more.

Install

go get -u github.com/tinh-tinh/queue/v2

Features

  • Redis-Based: Robust persistence and distributed processing.
  • Delayed Jobs: Schedule jobs to run after a delay.
  • Cron Scheduling: Schedule and repeat jobs using cron patterns.
  • Rate Limiting: Control job processing rate.
  • Retries: Automatic retry on failure.
  • Priority: Job prioritization.
  • Concurrency: Multiple workers per queue.
  • Pause/Resume: Temporarily stop and resume job processing.
  • Crash Recovery: Recovers jobs after process crashes.
  • Remove on Complete/Fail: Clean up jobs after handling.

Quick Start

1. Register the Module
import "github.com/tinh-tinh/queue/v2"

queueModule := queue.ForRoot(&queue.Options{
    Connect: &redis.Options{
        Addr: "localhost:6379",
        DB:   0,
    },
    Workers: 3,
    RetryFailures: 3,
})

Or via factory:

queueModule := queue.ForRootFactory(func(ref core.RefProvider) *queue.Options {
    return &queue.Options{ /* ... */ }
})
2. Register and Inject Queues
userQueueModule := queue.Register("user") // uses default/global options

// In your service or controller:
userQueue := queue.Inject(module, "user")

Contributing

We welcome contributions! Please feel free to submit a Pull Request.

Support

If you encounter any issues or need help, you can:

  • Open an issue in the GitHub repository
  • Check our documentation
  • Join our community discussions

Documentation

Index

Constants

View Source
const QUEUE core.Provide = "QUEUE"

Variables

This section is empty.

Functions

func ForRoot added in v2.0.1

func ForRoot(opt *Options) core.Modules

func ForRootFactory added in v2.1.0

func ForRootFactory(factory func(ref core.RefProvider) *Options) core.Modules

func Min

func Min(a int, b int) int

func Register

func Register(name string, opts ...*Options) core.Modules

Register registers a new queue module with the given name and options. The registered module creates a new queue with the given name and options, and exports the queue under the name "<name>Queue".

Types

type AddJobOptions

type AddJobOptions struct {
	Id       string
	Data     interface{}
	Priority int
}

type Callback

type Callback func() error

type Job

type Job struct {
	Id       string
	Data     interface{}
	Priority int
	Status   JobStatus

	ProcessedOn   time.Time
	FinishedOn    time.Time
	Stacktrace    []string
	FailedReason  string
	RetryFailures int
	// contains filtered or unexported fields
}

func (*Job) HandlerError added in v2.1.1

func (job *Job) HandlerError(reasonError string)

func (*Job) IsFinished

func (job *Job) IsFinished() bool

IsFinished returns true if the job has finished, either successfully or with an error.

func (*Job) IsReady

func (job *Job) IsReady() bool

IsReady returns true if the job is ready to be processed. If the job uses a scheduler, it will always be ready. Otherwise, the job is ready if it is waiting or active.

func (*Job) Process

func (job *Job) Process(cb Callback)

Process runs the given callback and updates the job's status accordingly. It also measures and logs the execution time. If the callback returns an error, the job is either retried or marked as failed.

type JobFnc

type JobFnc func(job *Job)

type JobStatus

type JobStatus string
const (
	WaitStatus      JobStatus = "wait"
	DelayedStatus   JobStatus = "delayed"
	ActiveStatus    JobStatus = "active"
	CompletedStatus JobStatus = "completed"
	FailedStatus    JobStatus = "failed"
)

type Logger added in v2.1.1

type Logger interface {
	Infof(msg string, metadata ...any)
	Warnf(msg string, metadata ...any)
	Errorf(msg string, metadata ...any)
	Fatalf(msg string, metadata ...any)
}

type LoggerType added in v2.0.1

type LoggerType string
const (
	LoggerDefault  LoggerType = "default"
	LoggerInfo     LoggerType = "info"
	LoggerWarn     LoggerType = "warn"
	LoggerError    LoggerType = "error"
	LoggerFatal    LoggerType = "fatal"
	LoggerDisabled LoggerType = "disabled"
)

type Options

type Options struct {
	Connect          *redis.Options
	Workers          int
	RetryFailures    int
	Limiter          *RateLimiter
	Pattern          string
	Logger           Logger
	DisableLog       bool
	RemoveOnComplete bool
	RemoveOnFail     bool
	Delay            time.Duration
	Timeout          time.Duration // Default: 1 minutes
	Prefix           string
}

type Processor added in v2.1.0

type Processor struct {
	core.DynamicProvider
	// contains filtered or unexported fields
}

func NewProcessor added in v2.1.0

func NewProcessor(name string, module core.Module) *Processor

func (*Processor) Process added in v2.1.0

func (p *Processor) Process(jobFnc JobFnc)

type Queue

type Queue struct {
	Name string

	Logger Logger
	// contains filtered or unexported fields
}

func Inject

func Inject(module core.RefProvider, name string) *Queue

InjectQueue injects a queue from the given module, using the given name. If the module does not contain a queue with the given name, or if the queue is not of type *Queue, InjectQueue returns nil.

func New

func New(name string, opt *Options) *Queue

New creates a new queue with the given name and options. The name is used to identify the queue in Redis, and the options are used to configure the queue behavior. The options are as follows:

- Connect: the Redis connection options - Workers: the number of workers to run concurrently - RetryFailures: the number of times to retry a failed job - Limiter: the rate limiter options - Pattern: the cron pattern to use for scheduling jobs

The returned queue is ready to use.

func (*Queue) AddJob

func (q *Queue) AddJob(opt AddJobOptions)

AddJob adds a new job to the queue. If the queue is currently rate limited, the job is delayed. Otherwise, the job is added to the waiting list and the queue is run.

func (*Queue) BulkAddJob

func (q *Queue) BulkAddJob(options []AddJobOptions)

BulkAddJob adds multiple jobs to the queue at once. If the queue is currently rate limited, the jobs are delayed. Otherwise, the jobs are added to the waiting list and the queue is run.

func (*Queue) CountJobs

func (q *Queue) CountJobs(status JobStatus) int

CountJobs returns the number of jobs in the queue that have the given status.

This can be used to monitor the queue, and to test the queue's behavior.

func (*Queue) IsLimit

func (q *Queue) IsLimit() bool

IsLimit returns true if the number of jobs in the queue has reached the maximum value set in the RateLimiter. It checks the current value of the counter in Redis and returns true if it is greater than or equal to the maximum value. If the counter does not exist or is less than the maximum, it increments the counter and returns false. If the increment fails, it panics.

func (*Queue) MarkJobFailedTimeout added in v2.1.0

func (q *Queue) MarkJobFailedTimeout(numberJobs []*Job)

func (*Queue) Pause

func (q *Queue) Pause()

Pause stops the queue from running. When paused, the queue will not accept new jobs and will not run any jobs in the queue. It will resume when Resume is called.

func (*Queue) Process

func (q *Queue) Process(jobFnc JobFnc)

Process sets the callback for the queue to process jobs. If the queue has a scheduler, it will be started with the given cron pattern. Otherwise, the callback is simply stored.

func (*Queue) Remove

func (q *Queue) Remove(key string)

Remove removes the job with the given key from the queue. It uses a linear search, so it has a time complexity of O(n), where n is the number of jobs in the queue.

func (*Queue) RemoveCompleted added in v2.0.1

func (q *Queue) RemoveCompleted()

func (*Queue) RemoveFailed added in v2.0.1

func (q *Queue) RemoveFailed()

func (*Queue) Resume

func (q *Queue) Resume()

Resume resumes the queue from a paused state. When resumed, the queue will accept new jobs and run any jobs in the queue.

func (*Queue) Retry

func (q *Queue) Retry()

func (*Queue) Run

func (q *Queue) Run()

Run runs all ready jobs in the queue. It locks the mutex, runs all ready jobs in parallel, and then unlocks the mutex. If the queue has a scheduler, it will be started with the given cron pattern. Otherwise, the callback is simply stored.

type RateLimiter

type RateLimiter struct {
	Max      int
	Duration time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL