Documentation
¶
Index ¶
- type Configuration
- type Job
- type Queue
- func (q *Queue) Close(mWG *sync.WaitGroup)
- func (q *Queue) Depth() (int, error)
- func (q *Queue) Dispatch(job Job) (string, error)
- func (q *Queue) DispatchIn(job Job, delay time.Duration) (string, error)
- func (q *Queue) GetCompletedJobs() (*[]Job, error)
- func (q *Queue) GetFailedJobs() (*[]Job, error)
- func (q *Queue) GetFailedJobsFromMemory() []string
- func (q *Queue) Listen()
- func (q *Queue) ReaperStats() ReaperStats
- func (q *Queue) RedisKeyspaceFormater(keyspace string) string
- func (q *Queue) RegisterHandler(name string, fn func(payload interface{}) error) error
- func (q *Queue) RegisterHandlerCtx(name string, fn func(ctx context.Context, payload interface{}) error) error
- func (q *Queue) UnmarshalPayload(cachedJob []byte) (*Job, error)
- type ReaperStats
- type Redis
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Configuration ¶
type Configuration struct {
Backend string `yaml:"Backend"` // "memory" or "redis" — default "memory"
WorkerCount int `yaml:"WorkerCount"` // number of worker goroutines — default 1
MaxAttempts int `yaml:"MaxAttempts"` // total handler executions before permanent failure, counting the initial attempt — default 3 (initial + 2 retries)
HighWaterMark int `yaml:"HighWaterMark"` // in-memory failed-jobs cache size before reset — default 10000
QueueChannels []string `yaml:"QueueChannels"` // named queue channels
QueueChannelDefault string `yaml:"QueueChannelDefault"` // default channel when Job.Queue is empty
Debug bool `yaml:"Debug"` // verbose logging
RedisPrefix string `yaml:"RedisPrefix"` // redis keyspace prefix
RedisScanInterval int `yaml:"RedisScanInterval"` // seconds between redis SCAN iterations — default 1
LockTimeout int `yaml:"LockTimeout"` // seconds; queue-wide default when Job.LockFor is 0
ReaperInterval int `yaml:"ReaperInterval"` // seconds; default 30
}
Configuration holds the queue server configuration loaded from queue.yml.
Example (queue.yml):
Backend: memory WorkerCount: 1 MaxAttempts: 3 HighWaterMark: 10000 QueueChannels: - job QueueChannelDefault: job Debug: false
type Job ¶
type Job struct {
ID string `db:"job_id" json:"id"`
Handler func(payload interface{}) error `json:"-" redis:"-"`
Name string `db:"name" json:"name"`
Payload []byte `db:"payload,type=TEXT"`
Retry bool `json:"-"`
RetryInSeconds int `json:"-"`
RetryCounter int `db:"attempts" json:"retryCounter"`
ReservedAt string `db:"-" json:"reservedAt"`
RetryAfter string `json:"-"`
// DispatchAt schedules the earliest moment the job may run, in RFC3339
// UTC format. A zero/empty value preserves immediate-dispatch behavior.
// Honored by both the memory and redis backends; the redis scheduler
// reuses the RetryAfter gate, the memory backend defers via goroutine.
DispatchAt string `json:"dispatchAt,omitempty"`
CompletedAt string `json:"completedAt"`
FailedAt string `json:"failedAt"`
LockedAt string `json:"lockedAt,omitempty"`
LockFor int `json:"lockFor,omitempty"`
Queue string `json:"queue"`
CreatedAt time.Time `db:"created_at" redis:"-" json:"-"`
UpdatedAt time.Time `db:"updated_at" redis:"-" json:"-"`
Exception string `db:"exception" json:"exception"`
Status string `db:"-" json:"status"`
}
Job is the representation of a unit of work added to the queue. The payload is marshaled and passed between the main application and the queue.
type Queue ¶
type Queue struct {
Backend string
Cache cache.Cache
Debug bool
FailedJobs struct {
// contains filtered or unexported fields
}
Jobs chan Job
Redis Redis
WorkerCount int
MaxAttempts int
HighWaterMark int
ErrorLog *log.Logger
InfoLog *log.Logger
QueueChannels []string
QueueChannelDefault string
DB up.Session
LockTimeout int
ReaperInterval int
// contains filtered or unexported fields
}
Queue is used to hold the jobs that are being processed by the workers.
func New ¶
New creates a new Queue instance with the given Adele application. It loads configuration from config/queue.yml (or the embedded default if no on-disk copy exists), applies sensible defaults, and wires the framework's loggers, cache, and database session onto the queue.
Example:
q, err := api.New(app)
if err != nil {
return err
}
q.Listen()
func NewWithConfig ¶
func NewWithConfig(a *adele.Adele, config Configuration) (*Queue, error)
NewWithConfig creates a new Queue instance with a caller-supplied configuration. Used primarily for testing or programmatic overrides.
Example:
cfg := api.Configuration{Backend: "memory", WorkerCount: 2}
q, err := api.NewWithConfig(app, cfg)
if err != nil {
return err
}
func (*Queue) Close ¶
Close closes the queue's job channel so that no more jobs can be added to it and waits for workers to complete before returning. When called with a non-nil waitgroup (redis backend shutdown case) it signals Done on behalf of the caller.
Close also cancels the queue lifecycle context, which is the context passed to handlers registered via RegisterHandlerCtx. Context-aware handlers that honor ctx.Done() can therefore unblock immediately on shutdown instead of running to completion against a torn-down system.
func (*Queue) Depth ¶
Depth returns the total number of jobs in pending state across every configured QueueChannel. Used by consumers that want to gate dispatch on queue backpressure (return 503 + Retry-After when the worker pool is over a high-water mark).
For the memory backend, returns the count of jobs that have been Dispatched but not yet fully processed by a worker (incremented on Dispatch, decremented after the handler returns). The Jobs channel is unbuffered, so len(q.Jobs) cannot be used as a backpressure signal — the queue maintains an atomic counter instead.
For the redis backend, SCANs queues:<channel>:pending:* across every channel in q.QueueChannels. The SCAN is cursor-based and bounded; on a healthy queue the call returns in single-digit milliseconds.
Returns an error if the redis pool is unavailable or the SCAN itself errors. Safe to call concurrently with Dispatch and Listen.
Note: the redis match pattern is built from the literal "queues:" prefix used by formatPending. q.Redis.Prefix is not currently woven into the pending-key format strings; if that wiring changes in the future, this method must follow.
func (*Queue) Dispatch ¶
Dispatch adds a job to the queue and returns the id of the job.
When Job.DispatchAt is a non-empty RFC3339 timestamp, dispatch is deferred until that moment: the redis backend seeds RetryAfter so the scanner gates the job, and the memory backend dispatches via a detached goroutine that sleeps until the target time. An unparseable DispatchAt returns an error.
func (*Queue) DispatchIn ¶
DispatchIn schedules a job to run no sooner than `delay` from now. Equivalent to setting Job.DispatchAt = now+delay (RFC3339) and calling Dispatch. A non-positive delay falls through to immediate dispatch.
func (*Queue) GetCompletedJobs ¶
GetCompletedJobs returns every completed job persisted to storage, ordered by ascending id.
func (*Queue) GetFailedJobs ¶
GetFailedJobs returns every failed job persisted to storage, ordered by ascending id.
func (*Queue) GetFailedJobsFromMemory ¶
GetFailedJobsFromMemory returns the ids of failed jobs currently held in the in-memory failure cache.
func (*Queue) Listen ¶
func (q *Queue) Listen()
Listen starts the configured number of worker goroutines, each of which processes jobs until the queue is closed.
func (*Queue) ReaperStats ¶
func (q *Queue) ReaperStats() ReaperStats
func (*Queue) RedisKeyspaceFormater ¶
RedisKeyspaceFormater normalizes a channel name for use in a redis keyspace.
func (*Queue) RegisterHandler ¶
RegisterHandler binds a handler function to a job name for the in-process dispatch registry used by the redis backend. Consumers call this at application bootstrap, once per unique Job.Name they will dispatch.
The memory backend does not use the registry for jobs that ship a Job.Handler inline; for those, registration is only required when the redis backend is active. Registering defensively in both cases is safe and recommended. When a name is registered, the registry takes precedence over Job.Handler in the memory dispatch path as well.
Returns an error on empty name, nil fn, or duplicate registration. A name previously registered via RegisterHandlerCtx is also rejected so the registry remains a single source of truth (one handler per name).
Example:
q.RegisterHandler("hello", helloHandler)
func (*Queue) RegisterHandlerCtx ¶
func (q *Queue) RegisterHandlerCtx(name string, fn func(ctx context.Context, payload interface{}) error) error
RegisterHandlerCtx is the context-aware variant of RegisterHandler. The handler receives a context that is cancelled when the queue is shutting down (Close called), so handlers can short-circuit long downstream calls instead of running to completion against a torn-down system.
Per-job timeouts can be applied via context.WithTimeout inside the handler.
Both RegisterHandler and RegisterHandlerCtx may coexist on the same Queue; each name registers exactly one handler regardless of which method was used.