queue

package
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Clear

func Clear(queue string) error

Clear removes all jobs from the queue

func Later

func Later(delay time.Duration, job Job, queue ...string) error

Later adds a delayed job to the queue

func Push

func Push(job Job, queue ...string) error

Push adds a job to the default or specified queue

func Register

func Register(jobType string, handler func([]byte) (Job, error))

Register registers a job type for deserialization

func Reinitialize

func Reinitialize() error

Reinitialize reinitializes the queue driver from environment

func SetDefault

func SetDefault(d Driver)

SetDefault sets the global queue instance

func SetEventDispatcher added in v0.8.0

func SetEventDispatcher(fn func(event interface{}) error)

SetEventDispatcher sets the function used to dispatch events. This is called by the events package to wire up event dispatching.

func Size

func Size(queue string) (int64, error)

Size returns the number of jobs in the queue

Types

type DatabaseDriver

type DatabaseDriver struct {
	// contains filtered or unexported fields
}

DatabaseDriver implements the Driver interface using database

func (*DatabaseDriver) Clear

func (d *DatabaseDriver) Clear(queueName string) error

Clear removes all jobs from a queue

func (*DatabaseDriver) Failed

func (d *DatabaseDriver) Failed(job Job, err error, queueName string) error

Failed marks a job as failed

func (*DatabaseDriver) GetDelayedJobs

func (d *DatabaseDriver) GetDelayedJobs(queueName string) (int64, error)

GetDelayedJobs returns the number of delayed jobs

func (*DatabaseDriver) Pop

func (d *DatabaseDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes a job from the queue

func (*DatabaseDriver) ProcessDelayedJobs

func (d *DatabaseDriver) ProcessDelayedJobs(queueName string) error

ProcessDelayedJobs moves ready delayed jobs to the main queue

func (*DatabaseDriver) Push

func (d *DatabaseDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*DatabaseDriver) PushDelayed

func (d *DatabaseDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a delayed job to the queue

func (*DatabaseDriver) Size

func (d *DatabaseDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Driver

type Driver interface {
	// Push adds a job to the queue
	Push(job Job, queue ...string) error

	// PushDelayed adds a job to the queue with a delay
	PushDelayed(job Job, delay time.Duration, queue ...string) error

	// Pop retrieves and removes the next job from the queue
	Pop(queue string) (Job, error)

	// Size returns the number of jobs in the queue
	Size(queue string) (int64, error)

	// Clear removes all jobs from the queue
	Clear(queue string) error

	// Failed moves a job to the failed queue
	Failed(job Job, err error, queue string) error
}

Driver defines the interface for queue drivers

func NewDatabaseDriver

func NewDatabaseDriver() Driver

NewDatabaseDriver creates a new database queue driver

func NewDatabaseQueue

func NewDatabaseQueue() (Driver, error)

NewDatabaseQueue creates a database queue from environment config

func NewMemoryQueue

func NewMemoryQueue() Driver

NewMemoryQueue creates an in-memory queue

func NewRedisQueue

func NewRedisQueue() (Driver, error)

NewRedisQueue creates a Redis queue from environment config

type FailedJobRecord

type FailedJobRecord struct {
	ID        uint `orm:"primaryKey;autoIncrement" json:"id"`
	Queue     string
	Payload   string    `orm:"type:text"`
	Exception string    `orm:"type:text"`
	CreatedAt time.Time `orm:"autoCreateTime" json:"created_at"`
	UpdatedAt time.Time `orm:"autoUpdateTime" json:"updated_at"`
}

FailedJobRecord represents a failed job

type GenericJob

type GenericJob struct {
	Payload *Payload
}

GenericJob is a wrapper for jobs in memory driver

func (*GenericJob) Failed

func (g *GenericJob) Failed(err error)

func (*GenericJob) Handle

func (g *GenericJob) Handle() error

func (*GenericJob) MarshalJSON

func (g *GenericJob) MarshalJSON() ([]byte, error)

MarshalJSON for GenericJob

type Job

type Job interface {
	Handle() error
	Failed(error)
}

Job represents a queue job

func GetJobFromWrapper

func GetJobFromWrapper(wrapper *JobWrapper) Job

GetJobFromWrapper retrieves the job from a wrapper

func Pop

func Pop(queue string) (Job, error)

Pop retrieves the next job from the queue

type JobFailed added in v0.8.0

type JobFailed struct {
	Context    context.Context
	JobType    string
	Queue      string
	Error      string
	DurationMs int64
	TraceID    string
	SpanID     string
	ParentID   string
}

JobFailed is dispatched when a job fails

func (*JobFailed) Name added in v0.8.0

func (e *JobFailed) Name() string

Name returns the event name

type JobProcessed added in v0.8.0

type JobProcessed struct {
	Context    context.Context
	JobType    string
	Queue      string
	DurationMs int64
	TraceID    string
	SpanID     string
	ParentID   string
}

JobProcessed is dispatched when a job completes successfully

func (*JobProcessed) Name added in v0.8.0

func (e *JobProcessed) Name() string

Name returns the event name

type JobProcessing added in v0.8.0

type JobProcessing struct {
	Context  context.Context
	JobType  string
	Queue    string
	TraceID  string
	SpanID   string
	ParentID string
}

JobProcessing is dispatched when a worker starts processing a job

func (*JobProcessing) Name added in v0.8.0

func (e *JobProcessing) Name() string

Name returns the event name

type JobQueued added in v0.8.0

type JobQueued struct {
	Context  context.Context
	JobType  string
	Queue    string
	Delayed  bool
	DelayMs  int64
	TraceID  string
	SpanID   string
	ParentID string
}

JobQueued is dispatched when a job is pushed to the queue

func (*JobQueued) Name added in v0.8.0

func (e *JobQueued) Name() string

Name returns the event name

type JobRecord

type JobRecord struct {
	ID           uint      `orm:"primaryKey;autoIncrement" json:"id"`
	Queue        string    `orm:"index"`
	Payload      string    `orm:"type:text"`
	Attempts     int       `orm:"default:0"`
	ScheduledAt  time.Time `orm:"index"`
	ReservedAt   *time.Time
	ReservedBy   *string
	FailedAt     *time.Time
	FailedReason *string
	CreatedAt    time.Time `orm:"autoCreateTime" json:"created_at"`
	UpdatedAt    time.Time `orm:"autoUpdateTime" json:"updated_at"`
}

JobRecord represents a job in the database

func (JobRecord) TableName

func (JobRecord) TableName() string

type JobRegistry

type JobRegistry struct {
	// contains filtered or unexported fields
}

JobRegistry for deserializing jobs

func (*JobRegistry) Deserialize

func (r *JobRegistry) Deserialize(payload *Payload) (Job, error)

Deserialize converts a payload back to a Job

type JobWrapper

type JobWrapper struct {
	Job     Job             `json:"-"` // The actual job instance
	Payload *Payload        `json:"payload"`
	RawData json.RawMessage `json:"raw_data"`
}

JobWrapper wraps a job with its metadata for internal storage

func CreateJobWrapper

func CreateJobWrapper(job Job, queueName string) (*JobWrapper, error)

CreateJobWrapper creates a wrapper for a job

type MemoryDriver

type MemoryDriver struct {
	// contains filtered or unexported fields
}

MemoryDriver implements Queue interface using in-memory storage

func NewMemoryDriver

func NewMemoryDriver() *MemoryDriver

NewMemoryDriver creates a new memory queue driver

func (*MemoryDriver) Clear

func (m *MemoryDriver) Clear(queueName string) error

Clear removes all jobs from the queue

func (*MemoryDriver) Close

func (m *MemoryDriver) Close() error

Close gracefully shuts down the driver

func (*MemoryDriver) Failed

func (m *MemoryDriver) Failed(job Job, err error, queueName string) error

Failed moves a job to the failed queue

func (*MemoryDriver) GetFailed

func (m *MemoryDriver) GetFailed(queueName string) ([]*failedJob, error)

GetFailed returns all failed jobs for a queue

func (*MemoryDriver) Pop

func (m *MemoryDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes the next job from the queue

func (*MemoryDriver) Push

func (m *MemoryDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*MemoryDriver) PushDelayed

func (m *MemoryDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a job to the queue with a delay

func (*MemoryDriver) Size

func (m *MemoryDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Payload

type Payload struct {
	Type       string          `json:"type"`
	Data       json.RawMessage `json:"data"`
	Queue      string          `json:"queue"`
	Attempts   int             `json:"attempts"`
	CreatedAt  time.Time       `json:"created_at"`
	DatabaseID int64           `json:"-"` // Internal use for database driver
}

Payload represents a serialized job

func SerializeJob

func SerializeJob(job Job, queueName string) (*Payload, error)

SerializeJob converts a job to a payload

type Queue

type Queue = Driver

Queue is an alias for Driver interface for backward compatibility

type RedisConfig

type RedisConfig struct {
	Host     string
	Port     string
	Password string
	DB       string
}

RedisConfig holds Redis connection configuration

type RedisDriver

type RedisDriver struct {
	// contains filtered or unexported fields
}

RedisDriver implements Queue interface using Redis

func NewRedisDriver

func NewRedisDriver(config RedisConfig) (*RedisDriver, error)

NewRedisDriver creates a new Redis queue driver

func (*RedisDriver) Clear

func (r *RedisDriver) Clear(queueName string) error

Clear removes all jobs from the queue

func (*RedisDriver) Close

func (r *RedisDriver) Close() error

Close closes the Redis connection

func (*RedisDriver) Failed

func (r *RedisDriver) Failed(job Job, err error, queueName string) error

Failed moves a job to the failed queue

func (*RedisDriver) Pop

func (r *RedisDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes the next job from the queue

func (*RedisDriver) Push

func (r *RedisDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*RedisDriver) PushDelayed

func (r *RedisDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a job to the queue with a delay

func (*RedisDriver) Size

func (r *RedisDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes jobs from a queue

func NewWorker

func NewWorker(queue Driver, queueName string, handler func(Job) error, opts ...WorkerOption) *Worker

NewWorker creates a new queue worker

func Work

func Work(queueName string, handler func(Job) error, opts ...WorkerOption) *Worker

Work is a global helper to start a worker

func (*Worker) Start

func (w *Worker) Start()

Start begins processing jobs

func (*Worker) Stop

func (w *Worker) Stop()

Stop gracefully stops the worker

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption configures a worker

func WithConcurrency

func WithConcurrency(n int) WorkerOption

WithConcurrency sets the number of concurrent workers

func WithInterval

func WithInterval(d time.Duration) WorkerOption

WithInterval sets the polling interval

func WithMaxRetries

func WithMaxRetries(n int) WorkerOption

WithMaxRetries sets the maximum number of retries

func WithTimeout added in v0.9.0

func WithTimeout(d time.Duration) WorkerOption

WithTimeout sets the job processing timeout

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL