api

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Configuration

type Configuration struct {
	Backend             string   `yaml:"Backend"`             // "memory" or "redis" — default "memory"
	WorkerCount         int      `yaml:"WorkerCount"`         // number of worker goroutines — default 1
	MaxAttempts         int      `yaml:"MaxAttempts"`         // total handler executions before permanent failure, counting the initial attempt — default 3 (initial + 2 retries)
	HighWaterMark       int      `yaml:"HighWaterMark"`       // in-memory failed-jobs cache size before reset — default 10000
	QueueChannels       []string `yaml:"QueueChannels"`       // named queue channels
	QueueChannelDefault string   `yaml:"QueueChannelDefault"` // default channel when Job.Queue is empty
	Debug               bool     `yaml:"Debug"`               // verbose logging
	RedisPrefix         string   `yaml:"RedisPrefix"`         // redis keyspace prefix
	RedisScanInterval   int      `yaml:"RedisScanInterval"`   // seconds between redis SCAN iterations — default 1
	LockTimeout         int      `yaml:"LockTimeout"`         // seconds; queue-wide default when Job.LockFor is 0
	ReaperInterval      int      `yaml:"ReaperInterval"`      // seconds; default 30
}

Configuration holds the queue server configuration loaded from queue.yml.

Example (queue.yml):

Backend: memory
WorkerCount: 1
MaxAttempts: 3
HighWaterMark: 10000
QueueChannels:
  - job
QueueChannelDefault: job
Debug: false

type Job

type Job struct {
	ID             string                          `db:"job_id" json:"id"`
	Handler        func(payload interface{}) error `json:"-" redis:"-"`
	Name           string                          `db:"name" json:"name"`
	Payload        []byte                          `db:"payload,type=TEXT"`
	Retry          bool                            `json:"-"`
	RetryInSeconds int                             `json:"-"`
	RetryCounter   int                             `db:"attempts" json:"retryCounter"`
	ReservedAt     string                          `db:"-" json:"reservedAt"`
	RetryAfter     string                          `json:"-"`
	// DispatchAt schedules the earliest moment the job may run, in RFC3339
	// UTC format. A zero/empty value preserves immediate-dispatch behavior.
	// Honored by both the memory and redis backends; the redis scheduler
	// reuses the RetryAfter gate, the memory backend defers via goroutine.
	DispatchAt  string    `json:"dispatchAt,omitempty"`
	CompletedAt string    `json:"completedAt"`
	FailedAt    string    `json:"failedAt"`
	LockedAt    string    `json:"lockedAt,omitempty"`
	LockFor     int       `json:"lockFor,omitempty"`
	Queue       string    `json:"queue"`
	CreatedAt   time.Time `db:"created_at" redis:"-" json:"-"`
	UpdatedAt   time.Time `db:"updated_at" redis:"-" json:"-"`
	Exception   string    `db:"exception" json:"exception"`
	Status      string    `db:"-" json:"status"`
}

Job is the representation of a unit of work added to the queue. The payload is marshaled and passed between the main application and the queue.

type Queue

type Queue struct {
	Backend    string
	Cache      cache.Cache
	Debug      bool
	FailedJobs struct {
		// contains filtered or unexported fields
	}
	Jobs                chan Job
	Redis               Redis
	WorkerCount         int
	MaxAttempts         int
	HighWaterMark       int
	ErrorLog            *log.Logger
	InfoLog             *log.Logger
	QueueChannels       []string
	QueueChannelDefault string
	DB                  up.Session
	LockTimeout         int
	ReaperInterval      int
	// contains filtered or unexported fields
}

Queue is used to hold the jobs that are being processed by the workers.

func New

func New(a *adele.Adele) (*Queue, error)

New creates a new Queue instance with the given Adele application. It loads configuration from config/queue.yml (or the embedded default if no on-disk copy exists), applies sensible defaults, and wires the framework's loggers, cache, and database session onto the queue.

Example:

q, err := api.New(app)
if err != nil {
    return err
}
q.Listen()

func NewWithConfig

func NewWithConfig(a *adele.Adele, config Configuration) (*Queue, error)

NewWithConfig creates a new Queue instance with a caller-supplied configuration. Used primarily for testing or programmatic overrides.

Example:

cfg := api.Configuration{Backend: "memory", WorkerCount: 2}
q, err := api.NewWithConfig(app, cfg)
if err != nil {
    return err
}

func (*Queue) Close

func (q *Queue) Close(mWG *sync.WaitGroup)

Close closes the queue's job channel so that no more jobs can be added to it and waits for workers to complete before returning. When called with a non-nil waitgroup (redis backend shutdown case) it signals Done on behalf of the caller.

Close also cancels the queue lifecycle context, which is the context passed to handlers registered via RegisterHandlerCtx. Context-aware handlers that honor ctx.Done() can therefore unblock immediately on shutdown instead of running to completion against a torn-down system.

func (*Queue) Depth

func (q *Queue) Depth() (int, error)

Depth returns the total number of jobs in pending state across every configured QueueChannel. Used by consumers that want to gate dispatch on queue backpressure (return 503 + Retry-After when the worker pool is over a high-water mark).

For the memory backend, returns the count of jobs that have been Dispatched but not yet fully processed by a worker (incremented on Dispatch, decremented after the handler returns). The Jobs channel is unbuffered, so len(q.Jobs) cannot be used as a backpressure signal — the queue maintains an atomic counter instead.

For the redis backend, SCANs queues:<channel>:pending:* across every channel in q.QueueChannels. The SCAN is cursor-based and bounded; on a healthy queue the call returns in single-digit milliseconds.

Returns an error if the redis pool is unavailable or the SCAN itself errors. Safe to call concurrently with Dispatch and Listen.

Note: the redis match pattern is built from the literal "queues:" prefix used by formatPending. q.Redis.Prefix is not currently woven into the pending-key format strings; if that wiring changes in the future, this method must follow.

func (*Queue) Dispatch

func (q *Queue) Dispatch(job Job) (string, error)

Dispatch adds a job to the queue and returns the id of the job.

When Job.DispatchAt is a non-empty RFC3339 timestamp, dispatch is deferred until that moment: the redis backend seeds RetryAfter so the scanner gates the job, and the memory backend dispatches via a detached goroutine that sleeps until the target time. An unparseable DispatchAt returns an error.

func (*Queue) DispatchIn

func (q *Queue) DispatchIn(job Job, delay time.Duration) (string, error)

DispatchIn schedules a job to run no sooner than `delay` from now. Equivalent to setting Job.DispatchAt = now+delay (RFC3339) and calling Dispatch. A non-positive delay falls through to immediate dispatch.

func (*Queue) GetCompletedJobs

func (q *Queue) GetCompletedJobs() (*[]Job, error)

GetCompletedJobs returns every completed job persisted to storage, ordered by ascending id.

func (*Queue) GetFailedJobs

func (q *Queue) GetFailedJobs() (*[]Job, error)

GetFailedJobs returns every failed job persisted to storage, ordered by ascending id.

func (*Queue) GetFailedJobsFromMemory

func (q *Queue) GetFailedJobsFromMemory() []string

GetFailedJobsFromMemory returns the ids of failed jobs currently held in the in-memory failure cache.

func (*Queue) Listen

func (q *Queue) Listen()

Listen starts the configured number of worker goroutines, each of which processes jobs until the queue is closed.

func (*Queue) ReaperStats

func (q *Queue) ReaperStats() ReaperStats

func (*Queue) RedisKeyspaceFormater

func (q *Queue) RedisKeyspaceFormater(keyspace string) string

RedisKeyspaceFormater normalizes a channel name for use in a redis keyspace.

func (*Queue) RegisterHandler

func (q *Queue) RegisterHandler(name string, fn func(payload interface{}) error) error

RegisterHandler binds a handler function to a job name for the in-process dispatch registry used by the redis backend. Consumers call this at application bootstrap, once per unique Job.Name they will dispatch.

The memory backend does not use the registry for jobs that ship a Job.Handler inline; for those, registration is only required when the redis backend is active. Registering defensively in both cases is safe and recommended. When a name is registered, the registry takes precedence over Job.Handler in the memory dispatch path as well.

Returns an error on empty name, nil fn, or duplicate registration. A name previously registered via RegisterHandlerCtx is also rejected so the registry remains a single source of truth (one handler per name).

Example:

q.RegisterHandler("hello", helloHandler)

func (*Queue) RegisterHandlerCtx

func (q *Queue) RegisterHandlerCtx(name string, fn func(ctx context.Context, payload interface{}) error) error

RegisterHandlerCtx is the context-aware variant of RegisterHandler. The handler receives a context that is cancelled when the queue is shutting down (Close called), so handlers can short-circuit long downstream calls instead of running to completion against a torn-down system.

Per-job timeouts can be applied via context.WithTimeout inside the handler.

Both RegisterHandler and RegisterHandlerCtx may coexist on the same Queue; each name registers exactly one handler regardless of which method was used.

func (*Queue) UnmarshalPayload

func (q *Queue) UnmarshalPayload(cachedJob []byte) (*Job, error)

UnmarshalPayload unmarshals a job from redis into a Job value suitable for processing by the adele RPC client.

type ReaperStats

type ReaperStats struct {
	Ticks              uint64
	ScannedKeys        uint64
	Requeued           uint64
	Permafailed        uint64
	SkippedRecent      uint64
	SkippedDupePending uint64
	Errors             uint64
}

ReaperStats returns a snapshot of reaper counters. Intended for tests, operator debugging, and metrics export.

type Redis

type Redis struct {
	Pool         *redis.Pool
	Prefix       string
	ScanInterval int
}

Redis holds the redis connection pool and keyspace-scan settings used by the redis backend.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL