tasker

package
v0.0.0-...-7086fdf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2026 License: AGPL-3.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTaskExists        = errors.New("task already exists")
	ErrTaskNotFound      = errors.New("task not found")
	ErrInvalidTransition = errors.New("invalid state transition")
)

Functions

This section is empty.

Types

type EventHandler

type EventHandler[T any] func(ctx context.Context, task Task[T])

EventHandler 事件处理器

type Opt

type Opt[T any] func(t *Tasker[T])

Opt 配置选项

func WithLogger

func WithLogger[T any](logger *slog.Logger) Opt[T]

WithLogger 设置日志

func WithPrefix

func WithPrefix[T any](prefix string) Opt[T]

WithPrefix 设置 Redis key 前缀

func WithStreamTrimEnabled

func WithStreamTrimEnabled[T any](enabled bool) Opt[T]

WithStreamTrimEnabled 设置是否启用 stream 裁剪

func WithStreamTrimInterval

func WithStreamTrimInterval[T any](interval time.Duration) Opt[T]

WithStreamTrimInterval 设置 stream 裁剪间隔

func WithStreamTrimWindow

func WithStreamTrimWindow[T any](window time.Duration) Opt[T]

WithStreamTrimWindow 设置 stream 裁剪窗口

type Phase

type Phase string

Phase 任务阶段

const (
	PhaseCreated  Phase = "created"
	PhaseStarted  Phase = "started"
	PhaseRunning  Phase = "running"
	PhaseFailed   Phase = "failed"
	PhaseFinished Phase = "finished"
)

type Task

type Task[T any] struct {
	ID        string    `json:"id"`
	Phase     Phase     `json:"phase"`
	Payload   T         `json:"payload,omitempty"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

Task 泛型任务

type TaskUpdateFunc

type TaskUpdateFunc[T any] func(task *Task[T]) error

TaskUpdateFunc 任务更新函数

type Tasker

type Tasker[T any] struct {
	// contains filtered or unexported fields
}

Tasker 泛型 Redis 任务状态机

func NewTasker

func NewTasker[T any](redis *redis.Client, opts ...Opt[T]) *Tasker[T]

NewTasker 创建任务状态机

func (*Tasker[T]) ClaimStalePending

func (t *Tasker[T]) ClaimStalePending(ctx context.Context, phase Phase, group, consumer string, minIdle time.Duration, count int64) error

ClaimStalePending 认领陈旧的待处理消息

func (*Tasker[T]) CleanupTask

func (t *Tasker[T]) CleanupTask(ctx context.Context, id string, _ Phase) error

CleanupTask 清理任务

func (*Tasker[T]) CreateTask

func (t *Tasker[T]) CreateTask(ctx context.Context, id string, payload T) error

CreateTask 创建任务并触发 created 事件

func (*Tasker[T]) EnsureStreamsAndGroup

func (t *Tasker[T]) EnsureStreamsAndGroup(ctx context.Context, group string) error

EnsureStreamsAndGroup 确保各阶段事件流与消费组存在

func (*Tasker[T]) GetTask

func (t *Tasker[T]) GetTask(ctx context.Context, id string) (Task[T], error)

GetTask 获取任务

func (*Tasker[T]) On

func (t *Tasker[T]) On(phase Phase, handler EventHandler[T])

On 注册阶段事件处理器

func (*Tasker[T]) ReconcileStartup

func (t *Tasker[T]) ReconcileStartup(ctx context.Context) error

ReconcileStartup 启动期对账扫描

func (*Tasker[T]) SetTaskTTL

func (t *Tasker[T]) SetTaskTTL(ctx context.Context, id string, ttl time.Duration) error

SetTaskTTL 设置任务过期时间

func (*Tasker[T]) StartGroupConsumers

func (t *Tasker[T]) StartGroupConsumers(ctx context.Context, group, consumer string, block time.Duration, count int64) error

StartGroupConsumers 启动消费者组阻塞消费

func (*Tasker[T]) Transition

func (t *Tasker[T]) Transition(ctx context.Context, id string, to Phase) error

Transition 阶段转换

func (*Tasker[T]) TransitionWithUpdate

func (t *Tasker[T]) TransitionWithUpdate(ctx context.Context, id string, to Phase, update TaskUpdateFunc[T]) error

TransitionWithUpdate 阶段转换并更新任务数据

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL