queue

package
v1.1.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueNotFound      = errors.New("queue: 队列不存在")
	ErrQueueAlreadyExists = errors.New("queue: 队列已存在")
	ErrJobNotFound        = errors.New("queue: 任务不存在")
	ErrInvalidJobID       = errors.New("queue: 无效的任务ID")
	ErrQueueFull          = errors.New("queue: 队列已满")
	ErrInvalidPayload     = errors.New("queue: 无效的任务负载")
)

常见错误定义

View Source
var (
	ErrDefaultQueueNotSet = errors.New("未设置默认队列")
)

队列管理器特有的错误定义

Functions

This section is empty.

Types

type Handler

type Handler func(ctx context.Context, job *Job) error

Handler 表示任务处理器

func ApplyMiddleware

func ApplyMiddleware(handler Handler, middlewares ...QueueMiddleware) Handler

ApplyMiddleware 应用中间件到任务处理器

type Job

type Job struct {
	ID          string                 `json:"id"`                     // 任务唯一标识
	Queue       string                 `json:"queue"`                  // 所属队列
	Name        string                 `json:"name"`                   // 任务名称
	Payload     map[string]interface{} `json:"payload"`                // 任务负载数据
	Attempts    int                    `json:"attempts"`               // 尝试次数
	MaxRetries  int                    `json:"max_retries"`            // 最大重试次数
	Status      JobStatus              `json:"status"`                 // 任务状态
	CreatedAt   time.Time              `json:"created_at"`             // 创建时间
	UpdatedAt   time.Time              `json:"updated_at"`             // 更新时间
	ScheduledAt *time.Time             `json:"scheduled_at,omitempty"` // 计划执行时间
	StartedAt   *time.Time             `json:"started_at,omitempty"`   // 开始执行时间
	FinishedAt  *time.Time             `json:"finished_at,omitempty"`  // 完成时间
	Error       string                 `json:"error,omitempty"`        // 错误信息
}

Job 表示队列中的一个任务

func (*Job) GetPayload

func (j *Job) GetPayload(v interface{}) error

GetPayload 将任务负载解析为指定类型

func (*Job) GetPayloadValue

func (j *Job) GetPayloadValue(key string) (interface{}, bool)

GetPayloadValue 从任务负载中获取指定键的值,返回值和是否存在

type JobStatus

type JobStatus string

JobStatus 表示任务的状态

const (
	JobStatusPending   JobStatus = "pending"   // 等待执行
	JobStatusScheduled JobStatus = "scheduled" // 已计划执行
	JobStatusRunning   JobStatus = "running"   // 执行中
	JobStatusCompleted JobStatus = "completed" // 已完成
	JobStatusFailed    JobStatus = "failed"    // 执行失败
	JobStatusRetrying  JobStatus = "retrying"  // 等待重试
	JobStatusCancelled JobStatus = "cancelled" // 已取消
)

type Queue

type Queue interface {
	// Push 将任务推送到队列
	Push(ctx context.Context, queueName string, jobName string, payload map[string]interface{}) (string, error)

	// PushWithDelay 将任务推送到队列,延迟指定时间后执行
	PushWithDelay(ctx context.Context, queueName string, jobName string, payload map[string]interface{}, delay time.Duration) (string, error)

	// Schedule 计划一个任务在指定时间执行
	Schedule(ctx context.Context, queueName string, jobName string, payload map[string]interface{}, scheduledAt time.Time) (string, error)

	// Get 获取任务信息
	Get(ctx context.Context, queueName string, jobID string) (*Job, error)

	// Delete 删除任务
	Delete(ctx context.Context, queueName string, jobID string) error

	// Clear 清空队列
	Clear(ctx context.Context, queueName string) error

	// Size 获取队列大小
	Size(ctx context.Context, queueName string) (int, error)

	// Register 注册任务处理器
	Register(jobName string, handler Handler)

	// ProcessNext 处理队列中的下一个任务
	ProcessNext(ctx context.Context, queueName string) error

	// StartWorker 启动工作进程处理任务
	StartWorker(ctx context.Context, queueName string, concurrency int) error

	// StopWorker 停止工作进程
	StopWorker(ctx context.Context, queueName string) error

	// Retry 重试失败的任务
	Retry(ctx context.Context, queueName string, jobID string) error
}

Queue 表示一个任务队列的抽象接口

type QueueEventListener

type QueueEventListener struct {
	// contains filtered or unexported fields
}

QueueEventListener 队列事件监听器,将事件转换为队列任务

func NewQueueEventListener

func NewQueueEventListener(manager *QueueManager, queueName string) *QueueEventListener

NewQueueEventListener 创建一个新的队列事件监听器

func (*QueueEventListener) Handle

func (l *QueueEventListener) Handle(evt event.Event) error

Handle 处理事件,将其转换为队列任务

func (*QueueEventListener) RegisterEventJob

func (l *QueueEventListener) RegisterEventJob(eventName, jobName string)

RegisterEventJob 注册事件到任务的映射

func (*QueueEventListener) ShouldHandle

func (l *QueueEventListener) ShouldHandle(evt event.Event) bool

ShouldHandle 判断是否应该处理此事件

type QueueEventProvider

type QueueEventProvider struct {
	// contains filtered or unexported fields
}

QueueEventProvider 将队列任务状态变化转换为事件

func NewQueueEventProvider

func NewQueueEventProvider(dispatcher event.Dispatcher) *QueueEventProvider

NewQueueEventProvider 创建一个新的队列事件提供者

func (*QueueEventProvider) JobStatusChanged

func (p *QueueEventProvider) JobStatusChanged(job *Job, oldStatus JobStatus)

JobStatusChanged 当任务状态改变时触发事件

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager 队列管理器实现

func NewQueueManager

func NewQueueManager() *QueueManager

NewQueueManager 创建一个新的队列管理器

func (*QueueManager) AddQueue

func (m *QueueManager) AddQueue(name string, queue Queue) error

AddQueue 添加队列到管理器

func (*QueueManager) DefaultQueue

func (m *QueueManager) DefaultQueue() Queue

DefaultQueue 获取默认队列

func (*QueueManager) GetDefaultQueue

func (m *QueueManager) GetDefaultQueue() (Queue, error)

GetDefaultQueue 获取默认队列

func (*QueueManager) GetDefaultQueueName

func (m *QueueManager) GetDefaultQueueName() (string, error)

GetDefaultQueueName 获取默认队列名称

func (*QueueManager) GetQueue

func (m *QueueManager) GetQueue(name string) (Queue, error)

GetQueue 获取指定队列

func (*QueueManager) HasQueue

func (m *QueueManager) HasQueue(name string) bool

HasQueue 检查队列是否存在

func (*QueueManager) ListQueues

func (m *QueueManager) ListQueues() []string

ListQueues 列出所有队列名称

func (*QueueManager) Push

func (m *QueueManager) Push(ctx context.Context, jobName string, payload map[string]interface{}) (string, error)

Push 使用默认队列推送任务

func (*QueueManager) PushWithDelay

func (m *QueueManager) PushWithDelay(ctx context.Context, jobName string, payload map[string]interface{}, delay time.Duration) (string, error)

PushWithDelay 使用默认队列延迟推送任务

func (*QueueManager) Register

func (m *QueueManager) Register(jobName string, handler Handler)

Register 为所有队列注册同一个处理器

func (*QueueManager) RegisterWithMiddleware

func (m *QueueManager) RegisterWithMiddleware(jobName string, handler Handler, middlewares ...QueueMiddleware)

RegisterWithMiddleware 使用中间件注册任务处理器

func (*QueueManager) RemoveQueue

func (m *QueueManager) RemoveQueue(name string) error

RemoveQueue 从管理器删除队列

func (*QueueManager) Schedule

func (m *QueueManager) Schedule(ctx context.Context, jobName string, payload map[string]interface{}, scheduledAt time.Time) (string, error)

Schedule 使用默认队列计划任务

func (*QueueManager) SetDefaultQueue

func (m *QueueManager) SetDefaultQueue(name string) error

SetDefaultQueue 设置默认队列

type QueueMiddleware

type QueueMiddleware func(next Handler) Handler

QueueMiddleware 队列中间件,为任务处理添加额外功能

func LoggingMiddleware

func LoggingMiddleware() QueueMiddleware

LoggingMiddleware 创建一个记录日志的中间件

func RetryMiddleware

func RetryMiddleware(maxRetries int) QueueMiddleware

RetryMiddleware 创建一个处理重试的中间件

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL