Documentation ¶
Index ¶
- Constants
- func ForRoot(opt *Options) core.Modules
- func ForRootFactory(factory func(ref core.RefProvider) *Options) core.Modules
- func Min(a int, b int) int
- func Register(name string, opts ...*Options) core.Modules
- type AddJobOptions
- type Callback
- type Job
- type JobFnc
- type JobStatus
- type Logger
- type LoggerType
- type Options
- type Processor
- type Queue
- func (q *Queue) AddJob(opt AddJobOptions)
- func (q *Queue) BulkAddJob(options []AddJobOptions)
- func (q *Queue) ClearFailedJobs() error
- func (q *Queue) CountJobs(status JobStatus) int
- func (q *Queue) GetFailedJob(jobId string) (string, error)
- func (q *Queue) GetFailedJobs() ([]Job, error)
- func (q *Queue) IsLimit() bool
- func (q *Queue) MarkJobFailedTimeout(numberJobs []*Job)
- func (q *Queue) Pause()
- func (q *Queue) Process(jobFnc JobFnc)
- func (q *Queue) Remove(key string)
- func (q *Queue) RemoveCompleted()
- func (q *Queue) RemoveFailed()
- func (q *Queue) Resume()
- func (q *Queue) Retry()
- func (q *Queue) Run()
- type RateLimiter
Constants ¶
const QUEUE core.Provide = "QUEUE"
Variables ¶
This section is empty.
Functions ¶
func ForRootFactory ¶ added in v2.1.0
func ForRootFactory(factory func(ref core.RefProvider) *Options) core.Modules
Types ¶
type AddJobOptions ¶
type Job ¶
type Job struct {
	Id            string
	Data          interface{}
	Priority      int
	Status        JobStatus
	ProcessedOn   time.Time
	FinishedOn    time.Time
	Stacktrace    []string
	FailedReason  string
	RetryFailures int
	// contains filtered or unexported fields
}
func (*Job) HandlerError ¶ added in v2.1.1
func (*Job) IsFinished ¶
IsFinished returns true if the job has finished, either successfully or with an error.
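A minimal sketch of the check described above, assuming a job is finished once its status is one of two terminal values. The status constants and the `job` type here are hypothetical stand-ins; the library's actual JobStatus values are not shown in this page.

```go
package main

import "fmt"

// JobStatus mirrors the package's type name. The values below are
// hypothetical placeholders, not the library's own constants.
type JobStatus string

const (
	statusWaiting   JobStatus = "waiting"
	statusCompleted JobStatus = "completed"
	statusFailed    JobStatus = "failed"
)

// job is a local stand-in carrying only the field this sketch needs.
type job struct {
	Status JobStatus
}

// isFinished reports whether the job reached a terminal state,
// either successfully or with an error.
func (j *job) isFinished() bool {
	return j.Status == statusCompleted || j.Status == statusFailed
}

func main() {
	for _, s := range []JobStatus{statusWaiting, statusCompleted, statusFailed} {
		j := &job{Status: s}
		fmt.Printf("%s finished=%v\n", s, j.isFinished())
	}
}
```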
type LoggerType ¶ added in v2.0.1
type LoggerType string
const (
	LoggerDefault  LoggerType = "default"
	LoggerInfo     LoggerType = "info"
	LoggerWarn     LoggerType = "warn"
	LoggerError    LoggerType = "error"
	LoggerFatal    LoggerType = "fatal"
	LoggerDisabled LoggerType = "disabled"
)
type Processor ¶ added in v2.1.0
type Processor struct {
	core.DynamicProvider
	// contains filtered or unexported fields
}
type Queue ¶
func Inject ¶
func Inject(module core.RefProvider, name string) *Queue
Inject returns the queue registered under the given name in the given module. If the module does not contain a queue with that name, or if the registered value is not of type *Queue, Inject returns nil.
func New ¶
New creates a new queue with the given name and options. The name is used to identify the queue in Redis, and the options are used to configure the queue behavior. The options are as follows:
- Connect: the Redis connection options
- Workers: the number of workers to run concurrently
- RetryFailures: the number of times to retry a failed job
- Limiter: the rate limiter options
- Pattern: the cron pattern to use for scheduling jobs
The returned queue is ready to use.
func (*Queue) AddJob ¶
func (q *Queue) AddJob(opt AddJobOptions)
AddJob adds a new job to the queue. If the queue is currently rate limited, the job is delayed. Otherwise, the job is added to the waiting list and the queue is run.
func (*Queue) BulkAddJob ¶
func (q *Queue) BulkAddJob(options []AddJobOptions)
BulkAddJob adds multiple jobs to the queue at once. If the queue is currently rate limited, the jobs are delayed. Otherwise, the jobs are added to the waiting list and the queue is run.
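The branching described for AddJob and BulkAddJob can be sketched with an in-memory stand-in. Everything here (`sketchQueue`, its fields, and the `limited` flag that replaces the real rate limiter check) is hypothetical and only illustrates the "delay when limited, otherwise enqueue and run" flow; it is not the library's implementation.

```go
package main

import "fmt"

// sketchQueue is a hypothetical in-memory stand-in for the real queue.
type sketchQueue struct {
	limited bool     // stands in for the rate limiter check
	waiting []string // job IDs ready to be processed
	delayed []string // job IDs held back while rate limited
	runs    int      // how many times run was triggered
}

// run stands in for starting the processing loop.
func (q *sketchQueue) run() { q.runs++ }

// addJob delays the job when rate limited; otherwise it is queued and run.
func (q *sketchQueue) addJob(id string) {
	if q.limited {
		q.delayed = append(q.delayed, id)
		return
	}
	q.waiting = append(q.waiting, id)
	q.run()
}

// bulkAddJob applies the same rule to a batch and triggers a single run.
func (q *sketchQueue) bulkAddJob(ids []string) {
	if q.limited {
		q.delayed = append(q.delayed, ids...)
		return
	}
	q.waiting = append(q.waiting, ids...)
	q.run()
}

func main() {
	q := &sketchQueue{}
	q.addJob("a")
	q.bulkAddJob([]string{"b", "c"})
	q.limited = true
	q.addJob("d")
	fmt.Println(len(q.waiting), len(q.delayed), q.runs)
}
```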
func (*Queue) ClearFailedJobs ¶ added in v2.1.2
ClearFailedJobs removes all failed job records from Redis for this queue. Returns an error if the Redis operation fails.
func (*Queue) CountJobs ¶
CountJobs returns the number of jobs in the queue that have the given status.
This can be used to monitor the queue, and to test the queue's behavior.
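As a rough illustration of counting by status, the sketch below filters a slice of jobs. The `job` type and the status strings are assumptions made for this example; the real method operates on the queue's own internal job list.

```go
package main

import "fmt"

// JobStatus mirrors the package's type name; the values used in main
// are hypothetical.
type JobStatus string

// job is a local stand-in with only the fields needed here.
type job struct {
	Id     string
	Status JobStatus
}

// countJobs returns how many jobs currently have the given status.
func countJobs(jobs []job, status JobStatus) int {
	n := 0
	for _, j := range jobs {
		if j.Status == status {
			n++
		}
	}
	return n
}

func main() {
	jobs := []job{
		{Id: "1", Status: "waiting"},
		{Id: "2", Status: "failed"},
		{Id: "3", Status: "waiting"},
	}
	fmt.Println(countJobs(jobs, "waiting"), countJobs(jobs, "failed"))
}
```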
func (*Queue) GetFailedJob ¶ added in v2.1.2
GetFailedJob retrieves the failure reason for a specific job by its ID. Returns the failure reason string or an error if the job is not found or if the Redis operation fails.
func (*Queue) GetFailedJobs ¶ added in v2.1.2
GetFailedJobs retrieves all failed jobs stored in Redis for this queue. It returns a slice of Job with Id, FailedReason, and Status populated. Other fields (Data, Priority, etc.) are not available as only the failure reason is stored in Redis. Returns an error if the Redis operation fails.
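To show why only Id, FailedReason, and Status are populated, the sketch below rebuilds job records from an ID-to-reason map, which is assumed here to resemble what is stored in Redis. The `failedJob` type, the `"failed"` status string, and the sorted output order are all choices made for this example, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// failedJob is a local stand-in holding only the fields that can be
// recovered when just the failure reason is stored per job ID.
type failedJob struct {
	Id           string
	FailedReason string
	Status       string // hypothetical status value
}

// failedJobsFrom converts an ID-to-reason map into job records.
// IDs are sorted so the output order is deterministic in this sketch.
func failedJobsFrom(reasons map[string]string) []failedJob {
	ids := make([]string, 0, len(reasons))
	for id := range reasons {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	jobs := make([]failedJob, 0, len(ids))
	for _, id := range ids {
		jobs = append(jobs, failedJob{Id: id, FailedReason: reasons[id], Status: "failed"})
	}
	return jobs
}

func main() {
	jobs := failedJobsFrom(map[string]string{
		"job-2": "timeout",
		"job-1": "connection refused",
	})
	for _, j := range jobs {
		fmt.Printf("%s %s: %s\n", j.Id, j.Status, j.FailedReason)
	}
}
```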
func (*Queue) IsLimit ¶
IsLimit returns true if the number of jobs in the queue has reached the maximum value set in the RateLimiter. It checks the current value of the counter in Redis and returns true if it is greater than or equal to the maximum value. If the counter does not exist or is less than the maximum, it increments the counter and returns false. If the increment fails, it panics.
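The counter logic can be sketched as follows, with a plain map standing in for the Redis counter. The function name, the key, and the map are hypothetical, and the panic on a failed increment is omitted because a map write cannot fail.

```go
package main

import "fmt"

// isLimit reports whether the counter stored under key has reached max.
// When it has not, the counter is incremented and false is returned.
// A missing key reads as zero, matching the "counter does not exist" case.
func isLimit(counters map[string]int, key string, max int) bool {
	if counters[key] >= max {
		return true
	}
	counters[key]++
	return false
}

func main() {
	counters := map[string]int{}
	for i := 0; i < 4; i++ {
		fmt.Println(isLimit(counters, "queue:limiter", 2))
	}
}
```

Note that a check of this shape has a side effect: each call that returns false also consumes one unit of the limit.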
func (*Queue) MarkJobFailedTimeout ¶ added in v2.1.0
func (*Queue) Pause ¶
func (q *Queue) Pause()
Pause stops the queue from running. When paused, the queue will not accept new jobs and will not run any jobs in the queue. It will resume when Resume is called.
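A small sketch of the pause behavior described above, assuming a boolean flag guarded by a mutex. The `pausableQueue` type and its methods are hypothetical; how the library rejects or defers jobs while paused is not specified in this page, so the sketch simply reports whether a job was accepted.

```go
package main

import (
	"fmt"
	"sync"
)

// pausableQueue is a hypothetical stand-in showing a pause flag.
type pausableQueue struct {
	mu      sync.Mutex
	paused  bool
	waiting []string
}

// pause stops the queue from accepting jobs.
func (q *pausableQueue) pause() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.paused = true
}

// resume lets the queue accept jobs again.
func (q *pausableQueue) resume() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.paused = false
}

// add reports whether the job was accepted; paused queues reject it.
func (q *pausableQueue) add(id string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.paused {
		return false
	}
	q.waiting = append(q.waiting, id)
	return true
}

func main() {
	q := &pausableQueue{}
	fmt.Println(q.add("a"))
	q.pause()
	fmt.Println(q.add("b"))
	q.resume()
	fmt.Println(q.add("c"))
}
```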
func (*Queue) Process ¶
Process sets the callback for the queue to process jobs. If the queue has a scheduler, it will be started with the given cron pattern. Otherwise, the callback is simply stored.
func (*Queue) Remove ¶
Remove removes the job with the given key from the queue. It uses a linear search, so it has a time complexity of O(n), where n is the number of jobs in the queue.
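The O(n) removal can be illustrated with a linear scan over a slice. The `job` type and the `remove` helper are hypothetical, and matching the key against the job's Id is an assumption of this sketch.

```go
package main

import "fmt"

// job is a local stand-in with only the identifier field.
type job struct {
	Id string
}

// remove scans the slice once and drops the first job whose Id equals key.
// If no job matches, the slice is returned unchanged.
func remove(jobs []job, key string) []job {
	for i, j := range jobs {
		if j.Id == key {
			return append(jobs[:i], jobs[i+1:]...)
		}
	}
	return jobs
}

func main() {
	jobs := []job{{Id: "a"}, {Id: "b"}, {Id: "c"}}
	jobs = remove(jobs, "b")
	fmt.Println(len(jobs), jobs[0].Id, jobs[1].Id)
}
```

If removals by key are frequent on large queues, an index from key to position would avoid the scan, at the cost of keeping that index in sync.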
func (*Queue) RemoveCompleted ¶ added in v2.0.1
func (q *Queue) RemoveCompleted()
func (*Queue) RemoveFailed ¶ added in v2.0.1
func (q *Queue) RemoveFailed()