Documentation ¶
Index ¶
- Variables
- type EventHandler
- type Opt
- type Phase
- type Task
- type TaskUpdateFunc
- type Tasker
- func (t *Tasker[T]) ClaimStalePending(ctx context.Context, phase Phase, group, consumer string, ...) error
- func (t *Tasker[T]) CleanupTask(ctx context.Context, id string, _ Phase) error
- func (t *Tasker[T]) CreateTask(ctx context.Context, id string, payload T) error
- func (t *Tasker[T]) EnsureStreamsAndGroup(ctx context.Context, group string) error
- func (t *Tasker[T]) GetTask(ctx context.Context, id string) (Task[T], error)
- func (t *Tasker[T]) On(phase Phase, handler EventHandler[T])
- func (t *Tasker[T]) ReconcileStartup(ctx context.Context) error
- func (t *Tasker[T]) SetTaskTTL(ctx context.Context, id string, ttl time.Duration) error
- func (t *Tasker[T]) StartGroupConsumers(ctx context.Context, group, consumer string, block time.Duration, count int64) error
- func (t *Tasker[T]) Transition(ctx context.Context, id string, to Phase) error
- func (t *Tasker[T]) TransitionWithUpdate(ctx context.Context, id string, to Phase, update TaskUpdateFunc[T]) error
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type EventHandler ¶
EventHandler 事件处理器
type Opt ¶
Opt 配置选项
func WithStreamTrimEnabled ¶
WithStreamTrimEnabled 设置是否启用 stream 裁剪
func WithStreamTrimInterval ¶
WithStreamTrimInterval 设置 stream 裁剪间隔
type Task ¶
type Task[T any] struct {
	ID        string    `json:"id"`
	Phase     Phase     `json:"phase"`
	Payload   T         `json:"payload,omitempty"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}
Task 泛型任务
type Tasker ¶
type Tasker[T any] struct {
	// contains filtered or unexported fields
}
Tasker 泛型 Redis 任务状态机
func (*Tasker[T]) ClaimStalePending ¶
func (t *Tasker[T]) ClaimStalePending(ctx context.Context, phase Phase, group, consumer string, minIdle time.Duration, count int64) error
ClaimStalePending 认领陈旧的待处理消息
func (*Tasker[T]) CleanupTask ¶
CleanupTask 清理任务
func (*Tasker[T]) CreateTask ¶
CreateTask 创建任务并触发 created 事件
func (*Tasker[T]) EnsureStreamsAndGroup ¶
EnsureStreamsAndGroup 确保各阶段事件流与消费组存在
func (*Tasker[T]) ReconcileStartup ¶
ReconcileStartup 启动期对账扫描
func (*Tasker[T]) SetTaskTTL ¶
SetTaskTTL 设置任务过期时间
func (*Tasker[T]) StartGroupConsumers ¶
func (t *Tasker[T]) StartGroupConsumers(ctx context.Context, group, consumer string, block time.Duration, count int64) error
StartGroupConsumers 启动消费者组阻塞消费
func (*Tasker[T]) Transition ¶
Transition 阶段转换
func (*Tasker[T]) TransitionWithUpdate ¶
func (t *Tasker[T]) TransitionWithUpdate(ctx context.Context, id string, to Phase, update TaskUpdateFunc[T]) error
TransitionWithUpdate 阶段转换并更新任务数据
Click to show internal directories.
Click to hide internal directories.