delayqueue

package
v0.0.0-...-7086fdf
Warning

This package is not in the latest version of its module.

Published: Jun 1, 2026 License: AGPL-3.0 Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Handler

type Handler[T any] func(ctx context.Context, job *Job[T]) error

Handler processes a job (generic payload).

type Job

type Job[T any] struct {
	ID       string
	Queue    string
	Payload  T
	Attempts int
}

Job represents a delayed job (generic payload).
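A minimal sketch of what a handler could look like. Job and Handler are re-declared locally with the documented shapes so the snippet compiles on its own. The reminder payload, the handleReminder function, and the runOnce helper are hypothetical, and how the queue reacts to a non-nil error (retry or drop) is not stated on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job and Handler mirror the documented declarations so the sketch is self-contained.
type Job[T any] struct {
	ID       string
	Queue    string
	Payload  T
	Attempts int
}

type Handler[T any] func(ctx context.Context, job *Job[T]) error

// reminder is a hypothetical payload type used only for illustration.
type reminder struct {
	UserID string
}

// handleReminder rejects empty payloads and otherwise reports success.
func handleReminder(ctx context.Context, job *Job[reminder]) error {
	if err := ctx.Err(); err != nil {
		return err // stop early if the caller has already cancelled
	}
	if job.Payload.UserID == "" {
		return errors.New("reminder: empty user id")
	}
	return nil
}

// runOnce calls the handler directly and reports whether it returned nil.
func runOnce(userID string) bool {
	var h Handler[reminder] = handleReminder
	job := &Job[reminder]{ID: "job-1", Queue: "reminders", Payload: reminder{UserID: userID}}
	return h(context.Background(), job) == nil
}

func main() {
	fmt.Println(runOnce("u-42")) // true
	fmt.Println(runOnce(""))     // false
}
```

Because the payload is a type parameter, the handler reads job.Payload.UserID without a type assertion.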

type Option

type Option[T any] func(*RedisDelayQueue[T])

Option is a configuration option (generic payload).

func WithBatchSize

func WithBatchSize[T any](n int) Option[T]

func WithJobTTL

func WithJobTTL[T any](d time.Duration) Option[T]

func WithMaxAttempts

func WithMaxAttempts[T any](n int) Option[T]

func WithPollInterval

func WithPollInterval[T any](d time.Duration) Option[T]

func WithPrefix

func WithPrefix[T any](prefix string) Option[T]

func WithRequeueDelay

func WithRequeueDelay[T any](d time.Duration) Option[T]
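The With... functions follow the functional-options pattern. The sketch below shows how such options are typically applied. The queue struct, its fields, and the default values are assumptions, since the real fields of RedisDelayQueue are unexported and not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// queue stands in for RedisDelayQueue[T]; its fields are hypothetical.
type queue[T any] struct {
	prefix       string
	pollInterval time.Duration
	batchSize    int
}

// Option has the same shape as the documented Option[T].
type Option[T any] func(*queue[T])

func WithPrefix[T any](prefix string) Option[T] {
	return func(q *queue[T]) { q.prefix = prefix }
}

func WithPollInterval[T any](d time.Duration) Option[T] {
	return func(q *queue[T]) { q.pollInterval = d }
}

func WithBatchSize[T any](n int) Option[T] {
	return func(q *queue[T]) { q.batchSize = n }
}

// newQueue sets assumed defaults, then applies the options in order.
func newQueue[T any](opts ...Option[T]) *queue[T] {
	q := &queue[T]{prefix: "dq", pollInterval: time.Second, batchSize: 10}
	for _, opt := range opts {
		opt(q)
	}
	return q
}

// describe builds a queue with two overrides and summarises the result.
func describe() string {
	q := newQueue(WithPrefix[string]("vm"), WithBatchSize[string](50))
	return fmt.Sprintf("%s %s %d", q.prefix, q.pollInterval, q.batchSize)
}

func main() {
	fmt.Println(describe()) // vm 1s 50
}
```

The type argument has to be written explicitly on each option (WithPrefix[string]), because an argument such as a string prefix or an int gives the compiler nothing to infer T from.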

type RedisDelayQueue

type RedisDelayQueue[T any] struct {
	// contains filtered or unexported fields
}

RedisDelayQueue is a Redis-backed delay queue (generic payload).

func NewRedisDelayQueue

func NewRedisDelayQueue[T any](rdb *redis.Client, logger *slog.Logger, opts ...Option[T]) *RedisDelayQueue[T]

NewRedisDelayQueue creates a delay queue (generic).

func (*RedisDelayQueue[T]) Enqueue

func (q *RedisDelayQueue[T]) Enqueue(ctx context.Context, queue string, payload T, runAt time.Time, id string) (string, error)

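Enqueue adds a job to the queue. It always writes (or overwrites) the payload and updates the due time. As a result, even if pollOnce has deleted the old job key after handleJob completed, a new Enqueue call rebuilds the job data, which avoids orphaned entries in the ZSet.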
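The overwrite behaviour can be modelled in memory. This is a sketch of the idea only: the two maps stand in for the ZSet and the per-job keys that the comment mentions, and the store type, its methods, and the demo scenario are hypothetical rather than the package's actual Redis layout.

```go
package main

import (
	"fmt"
	"time"
)

// store models the two pieces of state the Enqueue comment refers to.
type store struct {
	due      map[string]time.Time // plays the role of the ZSet (id -> due time)
	payloads map[string]string    // plays the role of the per-job keys
}

func newStore() *store {
	return &store{due: map[string]time.Time{}, payloads: map[string]string{}}
}

// enqueue always writes both the payload and the due time.
func (s *store) enqueue(id, payload string, runAt time.Time) {
	s.payloads[id] = payload
	s.due[id] = runAt
}

// orphans lists ids that are scheduled but have no payload record.
func (s *store) orphans() []string {
	var out []string
	for id := range s.due {
		if _, ok := s.payloads[id]; !ok {
			out = append(out, id)
		}
	}
	return out
}

// demo deletes a job record while its schedule entry survives,
// then enqueues the same id again and counts orphans before and after.
func demo() (before, after int, payload string) {
	s := newStore()
	base := time.Unix(0, 0)
	s.enqueue("vm-1", "v1", base.Add(time.Minute))
	delete(s.payloads, "vm-1") // the job record was cleaned up
	before = len(s.orphans())
	s.enqueue("vm-1", "v2", base.Add(2*time.Minute)) // rebuilds the record
	return before, len(s.orphans()), s.payloads["vm-1"]
}

func main() {
	fmt.Println(demo()) // 1 0 v2
}
```

Had enqueue only updated the due time when the id was already scheduled, the orphan from the first step would have remained.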

func (*RedisDelayQueue[T]) ExtendTTL

func (q *RedisDelayQueue[T]) ExtendTTL(ctx context.Context, queue, id string, ttl time.Duration) error

ExtendTTL extends the storage TTL of a job.

func (*RedisDelayQueue[T]) GetJobInfo

func (q *RedisDelayQueue[T]) GetJobInfo(ctx context.Context, queue, id string) (*Job[T], time.Time, bool, error)

GetJobInfo looks up a job's information.

func (*RedisDelayQueue[T]) Remove

func (q *RedisDelayQueue[T]) Remove(ctx context.Context, queue, id string) error

Remove removes a job.

func (*RedisDelayQueue[T]) RemoveByPrefix

func (q *RedisDelayQueue[T]) RemoveByPrefix(ctx context.Context, queue, prefix string) (int, error)

RemoveByPrefix batch-removes the jobs whose IDs match a prefix.
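The returned int presumably counts the removed jobs. A small in-memory sketch of prefix matching under that assumption follows. The map, the ID scheme, and the removeByPrefix helper are hypothetical and say nothing about how the package scans Redis.

```go
package main

import (
	"fmt"
	"strings"
)

// removeByPrefix deletes every id that starts with prefix and returns the count removed.
func removeByPrefix(jobs map[string]string, prefix string) int {
	removed := 0
	for id := range jobs {
		if strings.HasPrefix(id, prefix) {
			delete(jobs, id) // deleting the current key while ranging is allowed in Go
			removed++
		}
	}
	return removed
}

// demo removes all jobs of one hypothetical VM and reports removed and remaining counts.
func demo() (removed, left int) {
	jobs := map[string]string{
		"vm:1:sleep":   "a",
		"vm:1:recycle": "b",
		"vm:2:sleep":   "c",
	}
	removed = removeByPrefix(jobs, "vm:1:")
	return removed, len(jobs)
}

func main() {
	fmt.Println(demo()) // 2 1
}
```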

func (*RedisDelayQueue[T]) StartConsumer

func (q *RedisDelayQueue[T]) StartConsumer(ctx context.Context, queue string, handler Handler[T]) error

StartConsumer starts the consume loop (blocking).
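A blocking consume loop is commonly built from a ticker and a context. The sketch below shows that general shape. The consume function, its poll callback, and the choice to return ctx.Err() on cancellation are assumptions, not the package's documented behaviour.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consume polls at a fixed interval until ctx is cancelled, then returns ctx.Err().
// poll is a hypothetical stand-in for one pass over the due jobs.
func consume(ctx context.Context, interval time.Duration, poll func()) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// select picks randomly when both cases are ready,
			// so re-check cancellation before doing more work.
			if err := ctx.Err(); err != nil {
				return err
			}
			poll()
		}
	}
}

// runThreePolls cancels the context during the third poll and reports the outcome.
func runThreePolls() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	polls := 0
	err := consume(ctx, time.Millisecond, func() {
		polls++
		if polls == 3 {
			cancel()
		}
	})
	return polls, err
}

func main() {
	polls, err := runThreePolls()
	fmt.Println(polls, err) // 3 context canceled
}
```

Because a loop of this kind blocks its caller, it is usually started in its own goroutine and stopped by cancelling the context.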

type TaskSummaryPayload

type TaskSummaryPayload struct {
	TaskID    string `json:"task_id"`
	CreatedAt int64  `json:"created_at"`
}

TaskSummaryPayload is the payload for task summary generation.
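The json struct tags determine the field names when the payload is encoded with encoding/json. The round trip below illustrates that. Whether the queue itself stores payloads as JSON is an assumption, and the sample values and the demo helper are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskSummaryPayload is copied from the documented declaration.
type TaskSummaryPayload struct {
	TaskID    string `json:"task_id"`
	CreatedAt int64  `json:"created_at"`
}

// demo encodes a payload, decodes it again, and returns the JSON text and decoded task ID.
func demo() (string, string, error) {
	raw, err := json.Marshal(TaskSummaryPayload{TaskID: "t-1", CreatedAt: 1700000000})
	if err != nil {
		return "", "", err
	}
	var back TaskSummaryPayload
	if err := json.Unmarshal(raw, &back); err != nil {
		return "", "", err
	}
	return string(raw), back.TaskID, nil
}

func main() {
	raw, id, err := demo()
	fmt.Println(raw, id, err) // {"task_id":"t-1","created_at":1700000000} t-1 <nil>
}
```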

type TaskSummaryQueue

type TaskSummaryQueue struct {
	*RedisDelayQueue[*TaskSummaryPayload]
}

TaskSummaryQueue is the delay queue for task summary generation.

func NewTaskSummaryQueue

func NewTaskSummaryQueue(redis *redis.Client, logger *slog.Logger) *TaskSummaryQueue

NewTaskSummaryQueue creates the delay queue for task summary generation.
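The typed queues on this page all embed a *RedisDelayQueue with a fixed payload type. A sketch of that wrapper pattern follows, using a hypothetical in-memory delayQueue in place of RedisDelayQueue. Its fields, the simplified Enqueue signature, the argument-free constructor, and the "task_summary" name are assumptions.

```go
package main

import "fmt"

// delayQueue is a hypothetical stand-in for RedisDelayQueue[T].
type delayQueue[T any] struct {
	name  string
	items []T
}

// Enqueue appends an item and returns the new length (simplified signature).
func (q *delayQueue[T]) Enqueue(item T) int {
	q.items = append(q.items, item)
	return len(q.items)
}

// TaskSummaryPayload is copied from the documented declaration.
type TaskSummaryPayload struct {
	TaskID    string `json:"task_id"`
	CreatedAt int64  `json:"created_at"`
}

// TaskSummaryQueue embeds the generic queue with the payload type pinned.
type TaskSummaryQueue struct {
	*delayQueue[*TaskSummaryPayload]
}

// NewTaskSummaryQueue fixes the type parameter so callers never spell it out.
func NewTaskSummaryQueue() *TaskSummaryQueue {
	return &TaskSummaryQueue{
		delayQueue: &delayQueue[*TaskSummaryPayload]{name: "task_summary"},
	}
}

// demo enqueues one payload through the promoted method.
func demo() (string, int) {
	q := NewTaskSummaryQueue()
	n := q.Enqueue(&TaskSummaryPayload{TaskID: "t-1"}) // method promoted from the embedded queue
	return q.name, n
}

func main() {
	fmt.Println(demo()) // task_summary 1
}
```

Embedding promotes the generic queue's methods onto the wrapper, so call sites get a concrete type name without having to repeat the type argument.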

type VMExpireQueue

type VMExpireQueue struct {
	*RedisDelayQueue[*domain.VmExpireInfo]
}

VMExpireQueue is the VM expiration queue.

func NewVMExpireQueue

func NewVMExpireQueue(rdb *redis.Client, logger *slog.Logger) *VMExpireQueue

type VMNotifyQueue

type VMNotifyQueue struct {
	*RedisDelayQueue[*domain.VmIdleInfo]
}

VMNotifyQueue is the queue for recycle-warning notifications.

func NewVMNotifyQueue

func NewVMNotifyQueue(rdb *redis.Client, logger *slog.Logger) *VMNotifyQueue

type VMRecycleQueue

type VMRecycleQueue struct {
	*RedisDelayQueue[*domain.VmIdleInfo]
}

VMRecycleQueue is the idle-recycle queue.

func NewVMRecycleQueue

func NewVMRecycleQueue(rdb *redis.Client, logger *slog.Logger) *VMRecycleQueue

type VMSleepQueue

type VMSleepQueue struct {
	*RedisDelayQueue[*domain.VmIdleInfo]
}

VMSleepQueue is the idle-sleep queue.

func NewVMSleepQueue

func NewVMSleepQueue(rdb *redis.Client, logger *slog.Logger) *VMSleepQueue
