queue

package
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterHandler

func RegisterHandler(typeName string, handler HandlerFunc)

RegisterHandler maps a type name to a handler function.

func ResetHandlers

func ResetHandlers()

ResetHandlers clears the registry. For testing only.

Types

type DatabaseDriver

type DatabaseDriver struct {
	// contains filtered or unexported fields
}

DatabaseDriver implements Driver using GORM.

func NewDatabaseDriver

func NewDatabaseDriver(db *gorm.DB, table, failedTable string) *DatabaseDriver

NewDatabaseDriver creates a database-backed queue driver.

func (*DatabaseDriver) Delete

func (d *DatabaseDriver) Delete(_ context.Context, job *Job) error

func (*DatabaseDriver) Fail

func (d *DatabaseDriver) Fail(_ context.Context, job *Job, jobErr error) error

func (*DatabaseDriver) Pop

func (d *DatabaseDriver) Pop(_ context.Context, queue string) (*Job, error)

func (*DatabaseDriver) Push

func (d *DatabaseDriver) Push(_ context.Context, job *Job) error

func (*DatabaseDriver) Release

func (d *DatabaseDriver) Release(_ context.Context, job *Job, delay time.Duration) error

func (*DatabaseDriver) Size

func (d *DatabaseDriver) Size(_ context.Context, queue string) (int64, error)

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher is the public API for dispatching jobs.

func NewDispatcher

func NewDispatcher(driver Driver) *Dispatcher

NewDispatcher creates a dispatcher with the given driver.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(ctx context.Context, queue, typeName string, payload interface{}) error

Dispatch pushes a job onto the named queue.

func (*Dispatcher) DispatchDelayed

func (d *Dispatcher) DispatchDelayed(ctx context.Context, queue, typeName string, payload interface{}, delay time.Duration) error

DispatchDelayed pushes a job onto the queue with a delay.

func (*Dispatcher) DispatchWithBackoff

func (d *Dispatcher) DispatchWithBackoff(ctx context.Context, queue, typeName string, payload interface{}, backoffSeconds []uint) error

DispatchWithBackoff pushes a job with per-attempt retry delays. backoffSeconds defines the delay (in seconds) before each retry attempt. For example, []uint{5, 30, 120} means: 5s before retry 1, 30s before retry 2, 120s before retry 3. MaxAttempts is set to len(backoffSeconds)+1.

func (*Dispatcher) Driver

func (d *Dispatcher) Driver() Driver

Driver returns the underlying driver.

type Driver

type Driver interface {
	// Push adds a job to the queue.
	Push(ctx context.Context, job *Job) error

	// Pop retrieves and reserves the next available job from the given queue.
	// Returns nil, nil if no job is available.
	Pop(ctx context.Context, queue string) (*Job, error)

	// Delete removes a completed job.
	Delete(ctx context.Context, job *Job) error

	// Release puts a reserved job back into the queue for retry.
	Release(ctx context.Context, job *Job, delay time.Duration) error

	// Fail moves a job to the failed jobs storage.
	Fail(ctx context.Context, job *Job, jobErr error) error

	// Size returns the number of pending jobs in a queue.
	Size(ctx context.Context, queue string) (int64, error)
}

Driver is the storage backend for the queue system.

type HandlerFunc

type HandlerFunc func(ctx context.Context, payload json.RawMessage) error

HandlerFunc processes a job. Receives the raw JSON payload.

func ResolveHandler

func ResolveHandler(typeName string) HandlerFunc

ResolveHandler returns the handler for a type name, or nil.

type Job

type Job struct {
	ID             uint64
	Queue          string
	Type           string
	Payload        json.RawMessage
	Attempts       uint
	MaxAttempts    uint
	BackoffSeconds []uint // per-attempt retry delays in seconds; falls back to WorkerConfig.RetryDelay
	AvailableAt    time.Time
	ReservedAt     *time.Time
	CreatedAt      time.Time
}

Job represents a unit of work to be processed by a worker.

type MemoryDriver

type MemoryDriver struct {
	// contains filtered or unexported fields
}

MemoryDriver implements Driver using in-process slices. No persistence — useful for testing and development.

func NewMemoryDriver

func NewMemoryDriver() *MemoryDriver

NewMemoryDriver creates an in-memory queue driver.

func (*MemoryDriver) Delete

func (d *MemoryDriver) Delete(_ context.Context, job *Job) error

func (*MemoryDriver) Fail

func (d *MemoryDriver) Fail(_ context.Context, job *Job, _ error) error

func (*MemoryDriver) Pop

func (d *MemoryDriver) Pop(_ context.Context, queue string) (*Job, error)

func (*MemoryDriver) Push

func (d *MemoryDriver) Push(_ context.Context, job *Job) error

func (*MemoryDriver) Release

func (d *MemoryDriver) Release(_ context.Context, job *Job, delay time.Duration) error

func (*MemoryDriver) Size

func (d *MemoryDriver) Size(_ context.Context, queue string) (int64, error)

type RedisDriver

type RedisDriver struct {
	// contains filtered or unexported fields
}

RedisDriver implements Driver using Redis lists.

func NewRedisDriver

func NewRedisDriver(client *redis.Client) *RedisDriver

NewRedisDriver creates a Redis-backed queue driver.

func (*RedisDriver) Delete

func (d *RedisDriver) Delete(_ context.Context, _ *Job) error

func (*RedisDriver) Fail

func (d *RedisDriver) Fail(ctx context.Context, job *Job, _ error) error

func (*RedisDriver) Pop

func (d *RedisDriver) Pop(ctx context.Context, queue string) (*Job, error)

func (*RedisDriver) Push

func (d *RedisDriver) Push(ctx context.Context, job *Job) error

func (*RedisDriver) Release

func (d *RedisDriver) Release(ctx context.Context, job *Job, delay time.Duration) error

func (*RedisDriver) Size

func (d *RedisDriver) Size(ctx context.Context, queue string) (int64, error)

type SyncDriver

type SyncDriver struct{}

SyncDriver executes jobs immediately when dispatched. No worker needed. Useful for local development and testing.

func NewSyncDriver

func NewSyncDriver() *SyncDriver

NewSyncDriver creates a synchronous queue driver.

func (*SyncDriver) Delete

func (d *SyncDriver) Delete(_ context.Context, _ *Job) error

func (*SyncDriver) Fail

func (d *SyncDriver) Fail(_ context.Context, _ *Job, _ error) error

func (*SyncDriver) Pop

func (d *SyncDriver) Pop(_ context.Context, _ string) (*Job, error)

func (*SyncDriver) Push

func (d *SyncDriver) Push(ctx context.Context, job *Job) error

func (*SyncDriver) Release

func (d *SyncDriver) Release(_ context.Context, _ *Job, _ time.Duration) error

func (*SyncDriver) Size

func (d *SyncDriver) Size(_ context.Context, _ string) (int64, error)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker manages a pool of goroutines that process jobs.

func NewWorker

func NewWorker(driver Driver, config WorkerConfig, log *slog.Logger) *Worker

NewWorker creates a worker with the given driver and config.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run starts the worker pool and blocks until ctx is cancelled.

type WorkerConfig

type WorkerConfig struct {
	Queues       []string
	Concurrency  int
	PollInterval time.Duration
	MaxAttempts  uint
	RetryDelay   time.Duration
	Timeout      time.Duration
}

WorkerConfig configures the worker pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL