Documentation
¶
Index ¶
- type Handler
- type Job
- type Option
- type RedisDelayQueue
- func (q *RedisDelayQueue[T]) Enqueue(ctx context.Context, queue string, payload T, runAt time.Time, id string) (string, error)
- func (q *RedisDelayQueue[T]) ExtendTTL(ctx context.Context, queue, id string, ttl time.Duration) error
- func (q *RedisDelayQueue[T]) GetJobInfo(ctx context.Context, queue, id string) (*Job[T], time.Time, bool, error)
- func (q *RedisDelayQueue[T]) Remove(ctx context.Context, queue, id string) error
- func (q *RedisDelayQueue[T]) RemoveByPrefix(ctx context.Context, queue, prefix string) (int, error)
- func (q *RedisDelayQueue[T]) StartConsumer(ctx context.Context, queue string, handler Handler[T]) error
- type TaskSummaryPayload
- type TaskSummaryQueue
- type VMExpireQueue
- type VMNotifyQueue
- type VMRecycleQueue
- type VMSleepQueue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option[T any] func(*RedisDelayQueue[T])
Option 可选项(泛型载荷)
func WithBatchSize ¶
func WithMaxAttempts ¶
func WithPrefix ¶
type RedisDelayQueue ¶
type RedisDelayQueue[T any] struct {
	// contains filtered or unexported fields
}
RedisDelayQueue 基于 Redis 的延迟队列(泛型载荷)
func NewRedisDelayQueue ¶
func NewRedisDelayQueue[T any](rdb *redis.Client, logger *slog.Logger, opts ...Option[T]) *RedisDelayQueue[T]
NewRedisDelayQueue 创建延迟队列(泛型)
func (*RedisDelayQueue[T]) Enqueue ¶
func (q *RedisDelayQueue[T]) Enqueue(ctx context.Context, queue string, payload T, runAt time.Time, id string) (string, error)
Enqueue 入队:始终写入/覆盖 payload 并更新到期时间。这样即使 pollOnce 在 handleJob 完成后删除了旧 job key,新的 Enqueue 调用也能重建 job 数据,避免 ZSet 中存在孤立条目。
func (*RedisDelayQueue[T]) ExtendTTL ¶
func (q *RedisDelayQueue[T]) ExtendTTL(ctx context.Context, queue, id string, ttl time.Duration) error
ExtendTTL 续期任务存储 TTL
func (*RedisDelayQueue[T]) GetJobInfo ¶
func (q *RedisDelayQueue[T]) GetJobInfo(ctx context.Context, queue, id string) (*Job[T], time.Time, bool, error)
GetJobInfo 查询任务信息
func (*RedisDelayQueue[T]) Remove ¶
func (q *RedisDelayQueue[T]) Remove(ctx context.Context, queue, id string) error
Remove 移除任务
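func (*RedisDelayQueue[T]) RemoveByPrefix ¶
func (q *RedisDelayQueue[T]) RemoveByPrefix(ctx context.Context, queue, prefix string) (int, error)
RemoveByPrefix 批量移除匹配前缀的任务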
func (*RedisDelayQueue[T]) StartConsumer ¶
func (q *RedisDelayQueue[T]) StartConsumer(ctx context.Context, queue string, handler Handler[T]) error
StartConsumer 启动消费循环(阻塞)
type TaskSummaryPayload ¶
type TaskSummaryPayload struct {
TaskID string `json:"task_id"`
CreatedAt int64 `json:"created_at"`
}
TaskSummaryPayload 任务摘要生成的载荷
type TaskSummaryQueue ¶
type TaskSummaryQueue struct {
*RedisDelayQueue[*TaskSummaryPayload]
}
TaskSummaryQueue 任务摘要生成延时队列
func NewTaskSummaryQueue ¶
func NewTaskSummaryQueue(redis *redis.Client, logger *slog.Logger) *TaskSummaryQueue
NewTaskSummaryQueue 创建任务摘要生成延时队列
type VMExpireQueue ¶
type VMExpireQueue struct {
*RedisDelayQueue[*domain.VmExpireInfo]
}
VMExpireQueue VM 过期队列
func NewVMExpireQueue ¶
func NewVMExpireQueue(rdb *redis.Client, logger *slog.Logger) *VMExpireQueue
NewVMExpireQueue 创建 VM 过期队列
type VMNotifyQueue ¶
type VMNotifyQueue struct {
*RedisDelayQueue[*domain.VmIdleInfo]
}
VMNotifyQueue 回收预警通知队列
func NewVMNotifyQueue ¶
func NewVMNotifyQueue(rdb *redis.Client, logger *slog.Logger) *VMNotifyQueue
NewVMNotifyQueue 创建回收预警通知队列
type VMRecycleQueue ¶
type VMRecycleQueue struct {
*RedisDelayQueue[*domain.VmIdleInfo]
}
VMRecycleQueue 空闲回收队列
func NewVMRecycleQueue ¶
func NewVMRecycleQueue(rdb *redis.Client, logger *slog.Logger) *VMRecycleQueue
NewVMRecycleQueue 创建空闲回收队列
type VMSleepQueue ¶
type VMSleepQueue struct {
*RedisDelayQueue[*domain.VmIdleInfo]
}
VMSleepQueue 空闲休眠队列
func NewVMSleepQueue ¶
func NewVMSleepQueue(rdb *redis.Client, logger *slog.Logger) *VMSleepQueue
NewVMSleepQueue 创建空闲休眠队列
Click to show internal directories.
Click to hide internal directories.