worker

package
v1.0.9 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 19, 2025 License: LGPL-3.0 Imports: 22 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	ErrUuidNil                       = fmt.Errorf("uuid is empty")
	ErrRedisNil                      = fmt.Errorf("redis is empty")
	ErrRedisInvalid                  = fmt.Errorf("redis is invalid")
	ErrExprInvalid                   = fmt.Errorf("expr is invalid")
	ErrSaveCron                      = fmt.Errorf("save cron failed")
	ErrHttpCallbackInvalidStatusCode = fmt.Errorf("http callback invalid status code")
)

Functions

func CallMethod

func CallMethod(task *tasks.TaskJob) (err error)

CallMethod invokes the actual method of the task.

func UnmarshalTask

func UnmarshalTask(data []byte) (*tasks.TaskJob, error)

UnmarshalTask parses a TaskJob, using custom JSON decoding to handle type issues.

func WithCallback

func WithCallback(s string) func(*Options)

WithCallback sets the callback URL invoked after a task completes.

func WithClearArchived

func WithClearArchived(second int) func(*Options)

WithClearArchived sets the interval for clearing archived tasks. The default is 300 seconds.

func WithGroup

func WithGroup(s string) func(*Options)

WithGroup sets the group name of the task handler.

func WithHandler

func WithHandler(fun func(ctx context.Context, p Payload) error) func(*Options)

WithHandler sets the callback handler for tasks.

func WithHandlerNeedWorker

func WithHandlerNeedWorker(fun func(worker Worker, ctx context.Context, p Payload) error) func(*Options)

WithHandlerNeedWorker sets a task handler function that also receives a Worker argument.

func WithMaxRetry

func WithMaxRetry(count int) func(*Options)

WithMaxRetry sets the maximum number of retries when a task fails. The default is 3.

func WithRedisPeriodKey

func WithRedisPeriodKey(s string) func(*Options)

WithRedisPeriodKey sets the Redis key used for periodic tasks.

func WithRedisUri

func WithRedisUri(s string) func(*Options)

WithRedisUri sets the Redis connection URI. The default is redis://127.0.0.1:6379/0.

func WithRetention

func WithRetention(second int) func(*Options)

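WithRetention sets how long successful tasks are stored. The default is 60 seconds. If this option is provided, a task is stored as a completed task after it has been processed successfully.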

func WithRunAt

func WithRunAt(at time.Time) func(*RunOptions)

WithRunAt sets the time at which the task runs.

func WithRunCtx

func WithRunCtx(ctx context.Context) func(*RunOptions)

WithRunCtx sets the task context.

func WithRunExpr

func WithRunExpr(s string) func(*RunOptions)

WithRunExpr sets the cron expression. The smallest unit is 1 minute. See gorhill/cronexpr.

func WithRunGroup

func WithRunGroup(s string) func(*RunOptions)

WithRunGroup sets the group prefix. It defaults to the default group.

func WithRunIn

func WithRunIn(in time.Duration) func(*RunOptions)

WithRunIn delays task execution: the task runs after the given duration.

func WithRunMaxRetry

func WithRunMaxRetry(count int) func(*RunOptions)

WithRunMaxRetry sets the maximum number of retries. A task is retried when its callback returns an error. The default is 3.

func WithRunNow

func WithRunNow(flag bool) func(*RunOptions)

WithRunNow runs the task immediately.

func WithRunPayload

func WithRunPayload(s []byte) func(*RunOptions)

WithRunPayload sets the task payload, which is passed to the task callback.

func WithRunReplace

func WithRunReplace(flag bool) func(*RunOptions)

WithRunReplace controls duplicate handling: when the uid already exists, the old task is deleted and a new one is created.

func WithRunRetention

func WithRunRetention(second int) func(*RunOptions)

WithRunRetention sets the task expiration time. The default is 60 seconds.

func WithRunTimeout

func WithRunTimeout(second int) func(*RunOptions)

WithRunTimeout sets the task timeout. The default is 60 seconds.

func WithRunUuid

func WithRunUuid(s string) func(*RunOptions)

WithRunUuid sets the unique ID of the task.

func WithTimeout

func WithTimeout(second int) func(*Options)

WithTimeout sets the task timeout. The default is 10 seconds.
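The With* functions above all follow the functional options pattern: each returns a closure that mutates an options struct. A minimal sketch of how such options are typically applied, using a local stand-in for Options. The field names and the newOptions helper are assumptions, since the real struct's fields are unexported; only the signatures and the defaults come from the descriptions above.

```go
package main

import "fmt"

// Options is a local stand-in. The field names are assumptions.
type Options struct {
	redisUri string
	group    string
	maxRetry int
	timeout  int // seconds
}

// The three option functions mirror the documented signatures.
func WithGroup(s string) func(*Options) {
	return func(o *Options) { o.group = s }
}

func WithMaxRetry(count int) func(*Options) {
	return func(o *Options) { o.maxRetry = count }
}

func WithTimeout(second int) func(*Options) {
	return func(o *Options) { o.timeout = second }
}

// newOptions is a hypothetical helper: it starts from the documented
// defaults and then applies each option in order.
func newOptions(options ...func(*Options)) *Options {
	o := &Options{
		redisUri: "redis://127.0.0.1:6379/0", // documented default
		maxRetry: 3,                          // documented default
		timeout:  10,                         // documented default, in seconds
	}
	for _, apply := range options {
		apply(o)
	}
	return o
}

func main() {
	o := newOptions(WithGroup("billing"), WithMaxRetry(5))
	fmt.Printf("%+v\n", *o)
}
```

Options that are not passed keep their defaults, and later options override earlier ones because they are applied in order.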

Types

type Options

type Options struct {
	// contains filtered or unexported fields
}

type Payload

type Payload struct {
	Group   string `json:"group"`
	Uid     string `json:"uid"`
	Payload []byte `json:"payload"`
}

func (Payload) String

func (p Payload) String() (str string)
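Because the Payload field is a []byte, encoding/json serializes it as a base64 string rather than raw text. A short sketch using a local copy of the struct shown above; the encode helper is hypothetical and only wraps json.Marshal:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payload is a local copy of the documented struct.
type Payload struct {
	Group   string `json:"group"`
	Uid     string `json:"uid"`
	Payload []byte `json:"payload"`
}

// encode is a hypothetical helper that returns the JSON form of p.
func encode(p Payload) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	s, err := encode(Payload{Group: "default", Uid: "task-1", Payload: []byte("hello")})
	if err != nil {
		panic(err)
	}
	// prints {"group":"default","uid":"task-1","payload":"aGVsbG8="}
	fmt.Println(s)
}
```

Anything that reads the JSON outside of Go needs to base64-decode the payload field to recover the original bytes.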

type Process

type Process interface {
	GetTopic() string                                  // returns the topic to consume
	Handle(ctx context.Context, p Payload) (err error) // handles the task
}

Process is the interface for a task's concrete processing logic. Any type that implements it can be added to the task queue.

type RunOptions

type RunOptions struct {
	// contains filtered or unexported fields
}

type Scheduled

type Scheduled struct {
	// contains filtered or unexported fields
}

Scheduled is the task scheduler.

func RegisterProcess

func RegisterProcess(p Process) (s *Scheduled)

func (*Scheduled) Cron

func (s *Scheduled) Cron(ctx context.Context, topic, cronExpr string, data []byte) (err error)

Cron runs the task as a scheduled (cron) job.

func (*Scheduled) Push

func (s *Scheduled) Push(ctx context.Context, topic string, data []byte, timeout int) (err error)

Push runs the task through the message queue.

type Tasks

type Tasks struct {
	// contains filtered or unexported fields
}

func NewTasks

func NewTasks() (tk *Tasks, err error)

func TasksInstance

func TasksInstance() *Tasks

func (Tasks) CheckFuncName

func (tk Tasks) CheckFuncName(funcName string) (exists bool)

CheckFuncName reports whether the given method name exists.

func (Tasks) Cron

func (tk Tasks) Cron(options ...func(*RunOptions)) error

Cron registers a task to be run according to a cron expression.

func (Tasks) GetTaskJobNameList

func (tk Tasks) GetTaskJobNameList() (res map[string]string)

GetTaskJobNameList returns the list of method names available to tasks.

func (Tasks) Once

func (tk Tasks) Once(options ...func(*RunOptions)) error

Once registers a task to run once.

func (Tasks) ParseParameters

func (tk Tasks) ParseParameters(parseData string) (params []interface{}, err error)

ParseParameters parses the parameters.

func (Tasks) Remove

func (tk Tasks) Remove(ctx context.Context, uid string) error

Remove deletes a task from the task queue.

type Worker

type Worker struct {
	Error error
	// contains filtered or unexported fields
}

func New

func New(options ...func(*Options)) (tk *Worker)

New creates a new task handler.

func (Worker) Cron

func (wk Worker) Cron(options ...func(*RunOptions)) (err error)

Cron sets up a periodic task.

func (Worker) Once

func (wk Worker) Once(options ...func(*RunOptions)) (err error)

func (Worker) Remove

func (wk Worker) Remove(ctx context.Context, uid string) (err error)

Remove removes a task.
