queue

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2025 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SerializeJob

func SerializeJob(job *Job) ([]byte, error)

SerializeJob serializes a job to JSON

Types

type FailedJob

type FailedJob struct {
	Job   *Job
	Error string
	Time  time.Time
}

type Handler

type Handler interface {
	Handle(ctx context.Context, job *Job) error
}

Handler processes jobs

type Job

type Job struct {
	ID        string                 `json:"id"`
	Queue     string                 `json:"queue"`
	Handler   string                 `json:"handler"`
	Payload   map[string]interface{} `json:"payload"`
	Attempts  int                    `json:"attempts"`
	MaxTries  int                    `json:"max_tries"`
	Timeout   time.Duration          `json:"timeout"`
	CreatedAt time.Time              `json:"created_at"`
	StartedAt *time.Time             `json:"started_at,omitempty"`
}

Job represents a queued job

func DeserializeJob

func DeserializeJob(data []byte) (*Job, error)

DeserializeJob deserializes a job from JSON

type JobError

type JobError struct {
	Message string
}

JobError represents a job processing error

func (*JobError) Error

func (e *JobError) Error() string

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages job handlers and queue operations

func NewManager

func NewManager(queue Queue) *Manager

NewManager creates a new queue manager

func (*Manager) Dispatch

func (m *Manager) Dispatch(ctx context.Context, handlerName string, payload map[string]interface{}) error

Dispatch pushes a job to the queue

func (*Manager) Register

func (m *Manager) Register(name string, handler Handler)

Register registers a job handler

func (*Manager) Work

func (m *Manager) Work(ctx context.Context, queueName string) error

Work starts processing jobs from the queue

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

MemoryQueue implements Queue using in-memory channels (for testing/dev)

func NewMemoryQueue

func NewMemoryQueue() *MemoryQueue

NewMemoryQueue creates a new in-memory queue

func (*MemoryQueue) Ack

func (m *MemoryQueue) Ack(ctx context.Context, job *Job) error

func (*MemoryQueue) Close

func (m *MemoryQueue) Close() error

func (*MemoryQueue) Fail

func (m *MemoryQueue) Fail(ctx context.Context, job *Job, err error) error

func (*MemoryQueue) Failed

func (m *MemoryQueue) Failed(ctx context.Context, limit int) ([]*Job, error)

func (*MemoryQueue) Pop

func (m *MemoryQueue) Pop(ctx context.Context, queue string) (*Job, error)

func (*MemoryQueue) Push

func (m *MemoryQueue) Push(ctx context.Context, job *Job) error

func (*MemoryQueue) Retry

func (m *MemoryQueue) Retry(ctx context.Context, jobID string) error

type Queue

type Queue interface {
	// Push adds a job to the queue
	Push(ctx context.Context, job *Job) error
	// Pop retrieves a job from the queue
	Pop(ctx context.Context, queue string) (*Job, error)
	// Ack acknowledges job completion
	Ack(ctx context.Context, job *Job) error
	// Fail marks a job as failed
	Fail(ctx context.Context, job *Job, err error) error
	// Retry requeues a failed job
	Retry(ctx context.Context, jobID string) error
	// Failed returns failed jobs
	Failed(ctx context.Context, limit int) ([]*Job, error)
	// Close closes the queue connection
	Close() error
}

Queue interface for different queue backends

type RedisClient

type RedisClient interface {
	RPush(ctx context.Context, key string, values ...interface{}) error
	BLPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error)
	LPush(ctx context.Context, key string, values ...interface{}) error
	LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
	Del(ctx context.Context, keys ...string) error
	Close() error
}

RedisClient interface for Redis operations

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue implements Queue using Redis lists

func NewRedisQueue

func NewRedisQueue(client RedisClient, prefix string) *RedisQueue

NewRedisQueue creates a new Redis queue

func (*RedisQueue) Ack

func (r *RedisQueue) Ack(ctx context.Context, job *Job) error

func (*RedisQueue) Close

func (r *RedisQueue) Close() error

func (*RedisQueue) Fail

func (r *RedisQueue) Fail(ctx context.Context, job *Job, err error) error

func (*RedisQueue) Failed

func (r *RedisQueue) Failed(ctx context.Context, limit int) ([]*Job, error)

func (*RedisQueue) Pop

func (r *RedisQueue) Pop(ctx context.Context, queue string) (*Job, error)

func (*RedisQueue) Push

func (r *RedisQueue) Push(ctx context.Context, job *Job) error

func (*RedisQueue) Retry

func (r *RedisQueue) Retry(ctx context.Context, jobID string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL