Documentation
¶
Index ¶
- func CleanupJob(wrapper *JobWrapper)
- func IsSigningEnabled() bool
- func Register(jobType string, handler func([]byte) (Job, error))
- func SetSigningKey(key []byte)
- type DatabaseDriver
- func (d *DatabaseDriver) Clear(queueName string) error
- func (d *DatabaseDriver) Close() error
- func (d *DatabaseDriver) Failed(job Job, err error, queueName string) error
- func (d *DatabaseDriver) GetDelayedJobs(queueName string) (int64, error)
- func (d *DatabaseDriver) Pop(queueName string) (Job, error)
- func (d *DatabaseDriver) ProcessDelayedJobs(queueName string) error
- func (d *DatabaseDriver) Push(job Job, queueName ...string) error
- func (d *DatabaseDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error
- func (d *DatabaseDriver) SetEventDispatcher(fn func(event interface{}) error)
- func (d *DatabaseDriver) Size(queueName string) (int64, error)
- type Driver
- type FailedJobRecord
- type GenericJob
- type Job
- type JobFailed
- type JobProcessed
- type JobProcessing
- type JobQueued
- type JobRecord
- type JobRegistry
- type JobWrapper
- type MemoryDriver
- func (m *MemoryDriver) Clear(queueName string) error
- func (m *MemoryDriver) Close() error
- func (m *MemoryDriver) Failed(job Job, err error, queueName string) error
- func (m *MemoryDriver) GetFailed(queueName string) ([]*failedJob, error)
- func (m *MemoryDriver) Pop(queueName string) (Job, error)
- func (m *MemoryDriver) Push(job Job, queueName ...string) error
- func (m *MemoryDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error
- func (m *MemoryDriver) SetEventDispatcher(fn func(event interface{}) error)
- func (m *MemoryDriver) Size(queueName string) (int64, error)
- type Payload
- type Queue
- type QueueConfig
- type RedisConfig
- type RedisDriver
- func (r *RedisDriver) Clear(queueName string) error
- func (r *RedisDriver) Close() error
- func (r *RedisDriver) Failed(job Job, err error, queueName string) error
- func (r *RedisDriver) Pop(queueName string) (Job, error)
- func (r *RedisDriver) Push(job Job, queueName ...string) error
- func (r *RedisDriver) PushDelayed(job Job, delay time.Duration, queueName ...string) error
- func (r *RedisDriver) SetEventDispatcher(fn func(event interface{}) error)
- func (r *RedisDriver) Size(queueName string) (int64, error)
- type Worker
- type WorkerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CleanupJob ¶ added in v0.9.2
func CleanupJob(wrapper *JobWrapper)
CleanupJob removes a job from the in-memory store after processing. Call this after a job has been processed (success or failure) to prevent memory leaks.
func IsSigningEnabled ¶ added in v0.9.2
func IsSigningEnabled() bool
IsSigningEnabled returns whether payload signing is active
func SetSigningKey ¶ added in v0.9.2
func SetSigningKey(key []byte)
SetSigningKey configures the HMAC key for queue payload signing. Pass nil or empty to disable signing.
Types ¶
type DatabaseDriver ¶
type DatabaseDriver struct {
// contains filtered or unexported fields
}
DatabaseDriver implements the Driver interface using database
func NewDatabaseDriver ¶
func NewDatabaseDriver(db *sql.DB) *DatabaseDriver
NewDatabaseDriver creates a new database queue driver with an injected *sql.DB.
func (*DatabaseDriver) Clear ¶
func (d *DatabaseDriver) Clear(queueName string) error
Clear removes all jobs from a queue
func (*DatabaseDriver) Close ¶ added in v0.9.6
func (d *DatabaseDriver) Close() error
Close is a no-op for the database driver; the underlying DB connection is owned by the ORM and closed separately.
func (*DatabaseDriver) Failed ¶
func (d *DatabaseDriver) Failed(job Job, err error, queueName string) error
Failed marks a job as failed
func (*DatabaseDriver) GetDelayedJobs ¶
func (d *DatabaseDriver) GetDelayedJobs(queueName string) (int64, error)
GetDelayedJobs returns the number of delayed jobs
func (*DatabaseDriver) Pop ¶
func (d *DatabaseDriver) Pop(queueName string) (Job, error)
Pop retrieves and removes a job from the queue
func (*DatabaseDriver) ProcessDelayedJobs ¶
func (d *DatabaseDriver) ProcessDelayedJobs(queueName string) error
ProcessDelayedJobs moves ready delayed jobs to the main queue
func (*DatabaseDriver) Push ¶
func (d *DatabaseDriver) Push(job Job, queueName ...string) error
Push adds a job to the queue
func (*DatabaseDriver) PushDelayed ¶
PushDelayed adds a delayed job to the queue
func (*DatabaseDriver) SetEventDispatcher ¶ added in v0.9.11
func (d *DatabaseDriver) SetEventDispatcher(fn func(event interface{}) error)
SetEventDispatcher sets the function used to dispatch events.
type Driver ¶
type Driver interface {
// Push adds a job to the queue
Push(job Job, queue ...string) error
// PushDelayed adds a job to the queue with a delay
PushDelayed(job Job, delay time.Duration, queue ...string) error
// Pop retrieves and removes the next job from the queue
Pop(queue string) (Job, error)
// Size returns the number of jobs in the queue
Size(queue string) (int64, error)
// Clear removes all jobs from the queue
Clear(queue string) error
// Failed moves a job to the failed queue
Failed(job Job, err error, queue string) error
// Close gracefully shuts down the driver, releasing resources.
Close() error
}
Driver defines the interface for queue drivers
func NewQueue ¶ added in v0.9.5
func NewQueue(config QueueConfig) (Driver, error)
NewQueue creates a new queue driver from the given configuration.
type FailedJobRecord ¶
type FailedJobRecord struct {
ID uint `orm:"primaryKey;autoIncrement" json:"id"`
Queue string
Payload string `orm:"type:text"`
Exception string `orm:"type:text"`
CreatedAt time.Time `orm:"autoCreateTime" json:"created_at"`
UpdatedAt time.Time `orm:"autoUpdateTime" json:"updated_at"`
}
FailedJobRecord represents a failed job
type GenericJob ¶
type GenericJob struct {
Payload *Payload
}
GenericJob is a wrapper for jobs in memory driver
func (*GenericJob) Failed ¶
func (g *GenericJob) Failed(err error)
func (*GenericJob) Handle ¶
func (g *GenericJob) Handle() error
func (*GenericJob) MarshalJSON ¶
func (g *GenericJob) MarshalJSON() ([]byte, error)
MarshalJSON for GenericJob
type Job ¶
Job represents a queue job
func GetJobFromWrapper ¶
func GetJobFromWrapper(wrapper *JobWrapper) Job
GetJobFromWrapper retrieves the job from a wrapper
type JobFailed ¶ added in v0.8.0
type JobFailed struct {
Context context.Context
JobType string
Queue string
Error string
DurationMs int64
TraceID string
SpanID string
ParentID string
}
JobFailed is dispatched when a job fails
type JobProcessed ¶ added in v0.8.0
type JobProcessed struct {
Context context.Context
JobType string
Queue string
DurationMs int64
TraceID string
SpanID string
ParentID string
}
JobProcessed is dispatched when a job completes successfully
func (*JobProcessed) Name ¶ added in v0.8.0
func (e *JobProcessed) Name() string
Name returns the event name
type JobProcessing ¶ added in v0.8.0
type JobProcessing struct {
Context context.Context
JobType string
Queue string
TraceID string
SpanID string
ParentID string
}
JobProcessing is dispatched when a worker starts processing a job
func (*JobProcessing) Name ¶ added in v0.8.0
func (e *JobProcessing) Name() string
Name returns the event name
type JobQueued ¶ added in v0.8.0
type JobQueued struct {
Context context.Context
JobType string
Queue string
Delayed bool
DelayMs int64
TraceID string
SpanID string
ParentID string
}
JobQueued is dispatched when a job is pushed to the queue
type JobRecord ¶
type JobRecord struct {
ID uint `orm:"primaryKey;autoIncrement" json:"id"`
Queue string `orm:"index"`
Payload string `orm:"type:text"`
Attempts int `orm:"default:0"`
ScheduledAt time.Time `orm:"index"`
ReservedAt *time.Time
ReservedBy *string
FailedAt *time.Time
FailedReason *string
CreatedAt time.Time `orm:"autoCreateTime" json:"created_at"`
UpdatedAt time.Time `orm:"autoUpdateTime" json:"updated_at"`
}
JobRecord represents a job in the database
type JobRegistry ¶
type JobRegistry struct {
// contains filtered or unexported fields
}
JobRegistry for deserializing jobs
func (*JobRegistry) Deserialize ¶
func (r *JobRegistry) Deserialize(payload *Payload) (Job, error)
Deserialize converts a payload back to a Job
type JobWrapper ¶
type JobWrapper struct {
Job Job `json:"-"` // The actual job instance
Payload *Payload `json:"payload"`
RawData json.RawMessage `json:"raw_data"`
}
JobWrapper wraps a job with its metadata for internal storage
func CreateJobWrapper ¶
func CreateJobWrapper(job Job, queueName string) (*JobWrapper, error)
CreateJobWrapper creates a wrapper for a job
type MemoryDriver ¶
type MemoryDriver struct {
// contains filtered or unexported fields
}
MemoryDriver implements Queue interface using in-memory storage
func NewMemoryDriver ¶
func NewMemoryDriver() *MemoryDriver
NewMemoryDriver creates a new memory queue driver
func (*MemoryDriver) Clear ¶
func (m *MemoryDriver) Clear(queueName string) error
Clear removes all jobs from the queue
func (*MemoryDriver) Close ¶
func (m *MemoryDriver) Close() error
Close gracefully shuts down the driver
func (*MemoryDriver) Failed ¶
func (m *MemoryDriver) Failed(job Job, err error, queueName string) error
Failed moves a job to the failed queue
func (*MemoryDriver) GetFailed ¶
func (m *MemoryDriver) GetFailed(queueName string) ([]*failedJob, error)
GetFailed returns all failed jobs for a queue
func (*MemoryDriver) Pop ¶
func (m *MemoryDriver) Pop(queueName string) (Job, error)
Pop retrieves and removes the next job from the queue
func (*MemoryDriver) Push ¶
func (m *MemoryDriver) Push(job Job, queueName ...string) error
Push adds a job to the queue
func (*MemoryDriver) PushDelayed ¶
PushDelayed adds a job to the queue with a delay
func (*MemoryDriver) SetEventDispatcher ¶ added in v0.9.11
func (m *MemoryDriver) SetEventDispatcher(fn func(event interface{}) error)
SetEventDispatcher sets the function used to dispatch events.
type Payload ¶
type Payload struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Queue string `json:"queue"`
Attempts int `json:"attempts"`
CreatedAt time.Time `json:"created_at"`
Signature string `json:"signature,omitempty"` // HMAC-SHA256 integrity signature
DatabaseID int64 `json:"-"` // Internal use for database driver
}
Payload represents a serialized job
type QueueConfig ¶ added in v0.9.5
type QueueConfig struct {
// Driver is the queue driver to use: "memory", "redis", or "database".
Driver string
// Redis holds Redis-specific configuration. Required when Driver is "redis".
Redis RedisConfig
// DB holds a *sql.DB for the database driver. Required when Driver is "database".
DB *sql.DB
}
QueueConfig holds configuration for creating a queue driver.
type RedisConfig ¶
type RedisConfig struct {
Host string
Port string
Password string // SENSITIVE: do not log
DB string
}
RedisConfig holds Redis connection configuration. The Password field contains sensitive credentials and must not be logged.
func (RedisConfig) String ¶ added in v0.9.2
func (c RedisConfig) String() string
String returns a safe representation with credentials redacted.
type RedisDriver ¶
type RedisDriver struct {
// contains filtered or unexported fields
}
RedisDriver implements Queue interface using Redis
func NewRedisDriver ¶
func NewRedisDriver(config RedisConfig) (*RedisDriver, error)
NewRedisDriver creates a new Redis queue driver. Set REDIS_TLS=true environment variable to enable TLS connections.
func (*RedisDriver) Clear ¶
func (r *RedisDriver) Clear(queueName string) error
Clear removes all jobs from the queue
func (*RedisDriver) Failed ¶
func (r *RedisDriver) Failed(job Job, err error, queueName string) error
Failed moves a job to the failed queue
func (*RedisDriver) Pop ¶
func (r *RedisDriver) Pop(queueName string) (Job, error)
Pop retrieves and removes the next job from the queue
func (*RedisDriver) Push ¶
func (r *RedisDriver) Push(job Job, queueName ...string) error
Push adds a job to the queue
func (*RedisDriver) PushDelayed ¶
PushDelayed adds a job to the queue with a delay
func (*RedisDriver) SetEventDispatcher ¶ added in v0.9.11
func (r *RedisDriver) SetEventDispatcher(fn func(event interface{}) error)
SetEventDispatcher sets the function used to dispatch events.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes jobs from a queue
func NewWorker ¶
func NewWorker(queue Driver, queueName string, handler func(Job) error, opts ...WorkerOption) *Worker
NewWorker creates a new queue worker
func (*Worker) SetEventDispatcher ¶ added in v0.9.11
SetEventDispatcher sets the function used to dispatch events.
type WorkerOption ¶
type WorkerOption func(*Worker)
WorkerOption configures a worker
func WithConcurrency ¶
func WithConcurrency(n int) WorkerOption
WithConcurrency sets the number of concurrent workers
func WithInterval ¶
func WithInterval(d time.Duration) WorkerOption
WithInterval sets the polling interval
func WithMaxRetries ¶
func WithMaxRetries(n int) WorkerOption
WithMaxRetries sets the maximum number of retries
func WithTimeout ¶ added in v0.9.0
func WithTimeout(d time.Duration) WorkerOption
WithTimeout sets the job processing timeout