queue

package
v0.9.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CleanupJob added in v0.9.2

func CleanupJob(wrapper *JobWrapper)

CleanupJob removes a job from the in-memory store after processing. Call this after a job has been processed (success or failure) to prevent memory leaks.

func IsSigningEnabled added in v0.9.2

func IsSigningEnabled() bool

IsSigningEnabled returns whether payload signing is active

func Register

func Register(jobType string, handler func([]byte) (Job, error))

Register registers a job type for deserialization

func SetSigningKey added in v0.9.2

func SetSigningKey(key []byte)

SetSigningKey configures the HMAC key for queue payload signing. Pass nil or empty to disable signing.

Types

type DatabaseDriver

type DatabaseDriver struct {
	// contains filtered or unexported fields
}

DatabaseDriver implements the Driver interface using database

func NewDatabaseDriver

func NewDatabaseDriver(db *sql.DB) *DatabaseDriver

NewDatabaseDriver creates a new database queue driver with an injected *sql.DB.

func (*DatabaseDriver) Clear

func (d *DatabaseDriver) Clear(queueName string) error

Clear removes all jobs from a queue

func (*DatabaseDriver) Close added in v0.9.6

func (d *DatabaseDriver) Close() error

Close is a no-op for the database driver; the underlying DB connection is owned by the ORM and closed separately.

func (*DatabaseDriver) Failed

func (d *DatabaseDriver) Failed(job Job, err error, queueName string) error

Failed marks a job as failed

func (*DatabaseDriver) GetDelayedJobs

func (d *DatabaseDriver) GetDelayedJobs(queueName string) (int64, error)

GetDelayedJobs returns the number of delayed jobs

func (*DatabaseDriver) Pop

func (d *DatabaseDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes a job from the queue

func (*DatabaseDriver) ProcessDelayedJobs

func (d *DatabaseDriver) ProcessDelayedJobs(queueName string) error

ProcessDelayedJobs moves ready delayed jobs to the main queue

func (*DatabaseDriver) Push

func (d *DatabaseDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*DatabaseDriver) PushDelayed

func (d *DatabaseDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a delayed job to the queue

func (*DatabaseDriver) SetEventDispatcher added in v0.9.11

func (d *DatabaseDriver) SetEventDispatcher(fn func(event interface{}) error)

SetEventDispatcher sets the function used to dispatch events.

func (*DatabaseDriver) Size

func (d *DatabaseDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Driver

type Driver interface {
	// Push adds a job to the queue
	Push(job Job, queue ...string) error

	// PushDelayed adds a job to the queue with a delay
	PushDelayed(job Job, delay time.Duration, queue ...string) error

	// Pop retrieves and removes the next job from the queue
	Pop(queue string) (Job, error)

	// Size returns the number of jobs in the queue
	Size(queue string) (int64, error)

	// Clear removes all jobs from the queue
	Clear(queue string) error

	// Failed moves a job to the failed queue
	Failed(job Job, err error, queue string) error

	// Close gracefully shuts down the driver, releasing resources.
	Close() error
}

Driver defines the interface for queue drivers

func NewQueue added in v0.9.5

func NewQueue(config QueueConfig) (Driver, error)

NewQueue creates a new queue driver from the given configuration.

type FailedJobRecord

type FailedJobRecord struct {
	ID        uint `orm:"primaryKey;autoIncrement" json:"id"`
	Queue     string
	Payload   string    `orm:"type:text"`
	Exception string    `orm:"type:text"`
	CreatedAt time.Time `orm:"autoCreateTime" json:"created_at"`
	UpdatedAt time.Time `orm:"autoUpdateTime" json:"updated_at"`
}

FailedJobRecord represents a failed job

type GenericJob

type GenericJob struct {
	Payload *Payload
}

GenericJob is a wrapper for jobs in memory driver

func (*GenericJob) Failed

func (g *GenericJob) Failed(err error)

func (*GenericJob) Handle

func (g *GenericJob) Handle() error

func (*GenericJob) MarshalJSON

func (g *GenericJob) MarshalJSON() ([]byte, error)

MarshalJSON for GenericJob

type Job

type Job interface {
	Handle() error
	Failed(error)
}

Job represents a queue job

func GetJobFromWrapper

func GetJobFromWrapper(wrapper *JobWrapper) Job

GetJobFromWrapper retrieves the job from a wrapper

type JobFailed added in v0.8.0

type JobFailed struct {
	Context    context.Context
	JobType    string
	Queue      string
	Error      string
	DurationMs int64
	TraceID    string
	SpanID     string
	ParentID   string
}

JobFailed is dispatched when a job fails

func (*JobFailed) Name added in v0.8.0

func (e *JobFailed) Name() string

Name returns the event name

type JobProcessed added in v0.8.0

type JobProcessed struct {
	Context    context.Context
	JobType    string
	Queue      string
	DurationMs int64
	TraceID    string
	SpanID     string
	ParentID   string
}

JobProcessed is dispatched when a job completes successfully

func (*JobProcessed) Name added in v0.8.0

func (e *JobProcessed) Name() string

Name returns the event name

type JobProcessing added in v0.8.0

type JobProcessing struct {
	Context  context.Context
	JobType  string
	Queue    string
	TraceID  string
	SpanID   string
	ParentID string
}

JobProcessing is dispatched when a worker starts processing a job

func (*JobProcessing) Name added in v0.8.0

func (e *JobProcessing) Name() string

Name returns the event name

type JobQueued added in v0.8.0

type JobQueued struct {
	Context  context.Context
	JobType  string
	Queue    string
	Delayed  bool
	DelayMs  int64
	TraceID  string
	SpanID   string
	ParentID string
}

JobQueued is dispatched when a job is pushed to the queue

func (*JobQueued) Name added in v0.8.0

func (e *JobQueued) Name() string

Name returns the event name

type JobRecord

type JobRecord struct {
	ID           uint      `orm:"primaryKey;autoIncrement" json:"id"`
	Queue        string    `orm:"index"`
	Payload      string    `orm:"type:text"`
	Attempts     int       `orm:"default:0"`
	ScheduledAt  time.Time `orm:"index"`
	ReservedAt   *time.Time
	ReservedBy   *string
	FailedAt     *time.Time
	FailedReason *string
	CreatedAt    time.Time `orm:"autoCreateTime" json:"created_at"`
	UpdatedAt    time.Time `orm:"autoUpdateTime" json:"updated_at"`
}

JobRecord represents a job in the database

func (JobRecord) TableName

func (JobRecord) TableName() string

type JobRegistry

type JobRegistry struct {
	// contains filtered or unexported fields
}

JobRegistry for deserializing jobs

func (*JobRegistry) Deserialize

func (r *JobRegistry) Deserialize(payload *Payload) (Job, error)

Deserialize converts a payload back to a Job

type JobWrapper

type JobWrapper struct {
	Job     Job             `json:"-"` // The actual job instance
	Payload *Payload        `json:"payload"`
	RawData json.RawMessage `json:"raw_data"`
}

JobWrapper wraps a job with its metadata for internal storage

func CreateJobWrapper

func CreateJobWrapper(job Job, queueName string) (*JobWrapper, error)

CreateJobWrapper creates a wrapper for a job

type MemoryDriver

type MemoryDriver struct {
	// contains filtered or unexported fields
}

MemoryDriver implements Queue interface using in-memory storage

func NewMemoryDriver

func NewMemoryDriver() *MemoryDriver

NewMemoryDriver creates a new memory queue driver

func (*MemoryDriver) Clear

func (m *MemoryDriver) Clear(queueName string) error

Clear removes all jobs from the queue

func (*MemoryDriver) Close

func (m *MemoryDriver) Close() error

Close gracefully shuts down the driver

func (*MemoryDriver) Failed

func (m *MemoryDriver) Failed(job Job, err error, queueName string) error

Failed moves a job to the failed queue

func (*MemoryDriver) GetFailed

func (m *MemoryDriver) GetFailed(queueName string) ([]*failedJob, error)

GetFailed returns all failed jobs for a queue

func (*MemoryDriver) Pop

func (m *MemoryDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes the next job from the queue

func (*MemoryDriver) Push

func (m *MemoryDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*MemoryDriver) PushDelayed

func (m *MemoryDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a job to the queue with a delay

func (*MemoryDriver) SetEventDispatcher added in v0.9.11

func (m *MemoryDriver) SetEventDispatcher(fn func(event interface{}) error)

SetEventDispatcher sets the function used to dispatch events.

func (*MemoryDriver) Size

func (m *MemoryDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Payload

type Payload struct {
	Type       string          `json:"type"`
	Data       json.RawMessage `json:"data"`
	Queue      string          `json:"queue"`
	Attempts   int             `json:"attempts"`
	CreatedAt  time.Time       `json:"created_at"`
	Signature  string          `json:"signature,omitempty"` // HMAC-SHA256 integrity signature
	DatabaseID int64           `json:"-"`                   // Internal use for database driver
}

Payload represents a serialized job

func SerializeJob

func SerializeJob(job Job, queueName string) (*Payload, error)

SerializeJob converts a job to a payload

type Queue

type Queue = Driver

Queue is an alias for Driver interface for backward compatibility

type QueueConfig added in v0.9.5

type QueueConfig struct {
	// Driver is the queue driver to use: "memory", "redis", or "database".
	Driver string

	// Redis holds Redis-specific configuration. Required when Driver is "redis".
	Redis RedisConfig

	// DB holds a *sql.DB for the database driver. Required when Driver is "database".
	DB *sql.DB
}

QueueConfig holds configuration for creating a queue driver.

type RedisConfig

type RedisConfig struct {
	Host     string
	Port     string
	Password string // SENSITIVE: do not log
	DB       string
}

RedisConfig holds Redis connection configuration. The Password field contains sensitive credentials and must not be logged.

func (RedisConfig) String added in v0.9.2

func (c RedisConfig) String() string

String returns a safe representation with credentials redacted.

type RedisDriver

type RedisDriver struct {
	// contains filtered or unexported fields
}

RedisDriver implements Queue interface using Redis

func NewRedisDriver

func NewRedisDriver(config RedisConfig) (*RedisDriver, error)

NewRedisDriver creates a new Redis queue driver. Set REDIS_TLS=true environment variable to enable TLS connections.

func (*RedisDriver) Clear

func (r *RedisDriver) Clear(queueName string) error

Clear removes all jobs from the queue

func (*RedisDriver) Close

func (r *RedisDriver) Close() error

Close closes the Redis connection

func (*RedisDriver) Failed

func (r *RedisDriver) Failed(job Job, err error, queueName string) error

Failed moves a job to the failed queue

func (*RedisDriver) Pop

func (r *RedisDriver) Pop(queueName string) (Job, error)

Pop retrieves and removes the next job from the queue

func (*RedisDriver) Push

func (r *RedisDriver) Push(job Job, queueName ...string) error

Push adds a job to the queue

func (*RedisDriver) PushDelayed

func (r *RedisDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error

PushDelayed adds a job to the queue with a delay

func (*RedisDriver) SetEventDispatcher added in v0.9.11

func (r *RedisDriver) SetEventDispatcher(fn func(event interface{}) error)

SetEventDispatcher sets the function used to dispatch events.

func (*RedisDriver) Size

func (r *RedisDriver) Size(queueName string) (int64, error)

Size returns the number of jobs in the queue

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes jobs from a queue

func NewWorker

func NewWorker(queue Driver, queueName string, handler func(Job) error, opts ...WorkerOption) *Worker

NewWorker creates a new queue worker

func (*Worker) SetEventDispatcher added in v0.9.11

func (w *Worker) SetEventDispatcher(fn func(event interface{}) error)

SetEventDispatcher sets the function used to dispatch events.

func (*Worker) Start

func (w *Worker) Start()

Start begins processing jobs

func (*Worker) Stop

func (w *Worker) Stop()

Stop gracefully stops the worker

type WorkerLogger added in v0.9.12

type WorkerLogger interface {
	Info(msg string, kvs ...any)
	Error(msg string, kvs ...any)
}

WorkerLogger is the logging interface used by Worker. It is intentionally minimal to avoid coupling the queue package to a specific logging implementation. Any structured logger (e.g. velocity's log.Logger) satisfies this interface.

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption configures a worker

func WithConcurrency

func WithConcurrency(n int) WorkerOption

WithConcurrency sets the number of concurrent workers

func WithInterval

func WithInterval(d time.Duration) WorkerOption

WithInterval sets the polling interval

func WithMaxRetries

func WithMaxRetries(n int) WorkerOption

WithMaxRetries sets the maximum number of retries

func WithTimeout added in v0.9.0

func WithTimeout(d time.Duration) WorkerOption

WithTimeout sets the job processing timeout

func WithWorkerLogger added in v0.9.12

func WithWorkerLogger(l WorkerLogger) WorkerOption

WithWorkerLogger sets the logger for the worker. If not set, the worker uses Go's standard log package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL