queue

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Clear

func Clear(queue string) error

Clear removes all jobs from the queue

func Later

func Later(delay time.Duration, job Job, queue ...string) error

Later adds a delayed job to the queue

func Push

func Push(job Job, queue ...string) error

Push adds a job to the default or specified queue

func Register

func Register(jobType string, handler func([]byte) (Job, error))

Register registers a job type for deserialization

func Reinitialize

func Reinitialize() error

Reinitialize reinitializes the queue driver from environment

func SetDefault

func SetDefault(d Driver)

SetDefault sets the global queue instance

func Size

func Size(queue string) (int64, error)

Size returns the number of jobs in the queue

Types

type DatabaseDriver

type DatabaseDriver struct {
	// contains filtered or unexported fields
}

DatabaseDriver implements the Driver interface using database

func (*DatabaseDriver) Clear

func (d *DatabaseDriver) Clear(queueName string) error

Clear removes all jobs from a queue

func (*DatabaseDriver) Failed

func (d *DatabaseDriver) Failed(job Job, err error, queueName string) error

Failed marks a job as failed

func (*DatabaseDriver) GetDelayedJobs

func (d *DatabaseDriver) GetDelayedJobs(queueName string) (int64, error)

GetDelayedJobs returns the number of delayed jobs

func (*DatabaseDriver) Pop

func (d *DatabaseDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes a job from the queue

func (*DatabaseDriver) ProcessDelayedJobs

func (d *DatabaseDriver) ProcessDelayedJobs(queueName string) error

ProcessDelayedJobs moves ready delayed jobs to the main queue

func (*DatabaseDriver) Push

func (d *DatabaseDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*DatabaseDriver) PushDelayed

func (d *DatabaseDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a delayed job to the queue

func (*DatabaseDriver) Size

func (d *DatabaseDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Driver

type Driver interface {
	// Push adds a job to the queue
	Push(job Job, queue ...string) error

	// PushDelayed adds a job to the queue with a delay
	PushDelayed(job Job, delay time.Duration, queue ...string) error

	// Pop retrieves and removes the next job from the queue
	Pop(queue string) (Job, error)

	// Size returns the number of jobs in the queue
	Size(queue string) (int64, error)

	// Clear removes all jobs from the queue
	Clear(queue string) error

	// Failed moves a job to the failed queue
	Failed(job Job, err error, queue string) error
}

Driver defines the interface for queue drivers

func NewDatabaseDriver

func NewDatabaseDriver() Driver

NewDatabaseDriver creates a new database queue driver

func NewDatabaseQueue

func NewDatabaseQueue() (Driver, error)

NewDatabaseQueue creates a database queue from environment config

func NewMemoryQueue

func NewMemoryQueue() Driver

NewMemoryQueue creates an in-memory queue

func NewRedisQueue

func NewRedisQueue() (Driver, error)

NewRedisQueue creates a Redis queue from environment config

type FailedJobRecord

type FailedJobRecord struct {
	ID        uint `orm:"primaryKey;autoIncrement" json:"id"`
	Queue     string
	Payload   string    `orm:"type:text"`
	Exception string    `orm:"type:text"`
	CreatedAt time.Time `orm:"autoCreateTime" json:"created_at"`
	UpdatedAt time.Time `orm:"autoUpdateTime" json:"updated_at"`
}

FailedJobRecord represents a failed job

type GenericJob

type GenericJob struct {
	Payload *Payload
}

GenericJob is a wrapper for jobs in memory driver

func (*GenericJob) Failed

func (g *GenericJob) Failed(err error)

func (*GenericJob) Handle

func (g *GenericJob) Handle() error

func (*GenericJob) MarshalJSON

func (g *GenericJob) MarshalJSON() ([]byte, error)

MarshalJSON for GenericJob

type Job

type Job interface {
	Handle() error
	Failed(error)
}

Job represents a queue job

func GetJobFromWrapper

func GetJobFromWrapper(wrapper *JobWrapper) Job

GetJobFromWrapper retrieves the job from a wrapper

func Pop

func Pop(queue string) (Job, error)

Pop retrieves the next job from the queue

type JobRecord

type JobRecord struct {
	ID           uint      `orm:"primaryKey;autoIncrement" json:"id"`
	Queue        string    `orm:"index"`
	Payload      string    `orm:"type:text"`
	Attempts     int       `orm:"default:0"`
	ScheduledAt  time.Time `orm:"index"`
	ReservedAt   *time.Time
	ReservedBy   *string
	FailedAt     *time.Time
	FailedReason *string
	CreatedAt    time.Time `orm:"autoCreateTime" json:"created_at"`
	UpdatedAt    time.Time `orm:"autoUpdateTime" json:"updated_at"`
}

JobRecord represents a job in the database

func (JobRecord) TableName

func (JobRecord) TableName() string

type JobRegistry

type JobRegistry struct {
	// contains filtered or unexported fields
}

JobRegistry for deserializing jobs

func (*JobRegistry) Deserialize

func (r *JobRegistry) Deserialize(payload *Payload) (Job, error)

Deserialize converts a payload back to a Job

type JobWrapper

type JobWrapper struct {
	Job     Job             `json:"-"` // The actual job instance
	Payload *Payload        `json:"payload"`
	RawData json.RawMessage `json:"raw_data"`
}

JobWrapper wraps a job with its metadata for internal storage

func CreateJobWrapper

func CreateJobWrapper(job Job, queueName string) (*JobWrapper, error)

CreateJobWrapper creates a wrapper for a job

type MemoryDriver

type MemoryDriver struct {
	// contains filtered or unexported fields
}

MemoryDriver implements Queue interface using in-memory storage

func NewMemoryDriver

func NewMemoryDriver() *MemoryDriver

NewMemoryDriver creates a new memory queue driver

func (*MemoryDriver) Clear

func (m *MemoryDriver) Clear(queueName string) error

Clear removes all jobs from the queue

func (*MemoryDriver) Close

func (m *MemoryDriver) Close() error

Close gracefully shuts down the driver

func (*MemoryDriver) Failed

func (m *MemoryDriver) Failed(job Job, err error, queueName string) error

Failed moves a job to the failed queue

func (*MemoryDriver) GetFailed

func (m *MemoryDriver) GetFailed(queueName string) ([]*failedJob, error)

GetFailed returns all failed jobs for a queue

func (*MemoryDriver) Pop

func (m *MemoryDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes the next job from the queue

func (*MemoryDriver) Push

func (m *MemoryDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*MemoryDriver) PushDelayed

func (m *MemoryDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a job to the queue with a delay

func (*MemoryDriver) Size

func (m *MemoryDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Payload

type Payload struct {
	Type       string          `json:"type"`
	Data       json.RawMessage `json:"data"`
	Queue      string          `json:"queue"`
	Attempts   int             `json:"attempts"`
	CreatedAt  time.Time       `json:"created_at"`
	DatabaseID int64           `json:"-"` // Internal use for database driver
}

Payload represents a serialized job

func SerializeJob

func SerializeJob(job Job, queueName string) (*Payload, error)

SerializeJob converts a job to a payload

type Queue

type Queue = Driver

Queue is an alias for Driver interface for backward compatibility

type RedisConfig

type RedisConfig struct {
	Host     string
	Port     string
	Password string
	DB       string
}

RedisConfig holds Redis connection configuration

type RedisDriver

type RedisDriver struct {
	// contains filtered or unexported fields
}

RedisDriver implements Queue interface using Redis

func NewRedisDriver

func NewRedisDriver(config RedisConfig) (*RedisDriver, error)

NewRedisDriver creates a new Redis queue driver

func (*RedisDriver) Clear

func (r *RedisDriver) Clear(queueName string) error

Clear removes all jobs from the queue

func (*RedisDriver) Close

func (r *RedisDriver) Close() error

Close closes the Redis connection

func (*RedisDriver) Failed

func (r *RedisDriver) Failed(job Job, err error, queueName string) error

Failed moves a job to the failed queue

func (*RedisDriver) Pop

func (r *RedisDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes the next job from the queue

func (*RedisDriver) Push

func (r *RedisDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*RedisDriver) PushDelayed

func (r *RedisDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a job to the queue with a delay

func (*RedisDriver) Size

func (r *RedisDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes jobs from a queue

func NewWorker

func NewWorker(queue Driver, queueName string, handler func(Job) error, opts ...WorkerOption) *Worker

NewWorker creates a new queue worker

func Work

func Work(queueName string, handler func(Job) error, opts ...WorkerOption) *Worker

Work is a global helper to start a worker

func (*Worker) Start

func (w *Worker) Start()

Start begins processing jobs

func (*Worker) Stop

func (w *Worker) Stop()

Stop gracefully stops the worker

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption configures a worker

func WithConcurrency

func WithConcurrency(n int) WorkerOption

WithConcurrency sets the number of concurrent workers

func WithInterval

func WithInterval(d time.Duration) WorkerOption

WithInterval sets the polling interval

func WithMaxRetries

func WithMaxRetries(n int) WorkerOption

WithMaxRetries sets the maximum number of retries

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL