Documentation
¶
Index ¶
- Constants
- Variables
- func FakeUniqueID() string
- func IFaceToString(value interface{}) string
- type Config
- type DefaultTaskSetting
- type DefaultTaskSettingWithoutTimeout
- type FailedJobHandler
- type JobIFace
- type JobMemory
- func (job *JobMemory) Attempts() (attempt int64)
- func (job *JobMemory) Delete() (err error)
- func (job *JobMemory) Failed(err error)
- func (job *JobMemory) GetName() (queueName string)
- func (job *JobMemory) HasFailed() (hasFail bool)
- func (job *JobMemory) IsDeleted() (deleted bool)
- func (job *JobMemory) IsReleased() (released bool)
- func (job *JobMemory) MarkAsFailed()
- func (job *JobMemory) Payload() (payload *Payload)
- func (job *JobMemory) PopTime() (time time.Time)
- func (job *JobMemory) Queue() (queue QueueIFace)
- func (job *JobMemory) Release(delay int64) (err error)
- func (job *JobMemory) Timeout() (time time.Duration)
- func (job *JobMemory) TimeoutAt() (time time.Time)
- type JobMySQL
- func (job *JobMySQL) Attempts() (attempt int64)
- func (job *JobMySQL) Delete() (err error)
- func (job *JobMySQL) Failed(err error)
- func (job *JobMySQL) GetName() (queueName string)
- func (job *JobMySQL) HasFailed() (hasFail bool)
- func (job *JobMySQL) IsDeleted() (deleted bool)
- func (job *JobMySQL) IsReleased() (released bool)
- func (job *JobMySQL) MarkAsFailed()
- func (job *JobMySQL) Payload() (payload *Payload)
- func (job *JobMySQL) PopTime() (time time.Time)
- func (job *JobMySQL) Queue() (queue QueueIFace)
- func (job *JobMySQL) Release(delay int64) (err error)
- func (job *JobMySQL) Timeout() (time time.Duration)
- func (job *JobMySQL) TimeoutAt() (time time.Time)
- type JobRedis
- func (job *JobRedis) Attempts() (attempt int64)
- func (job *JobRedis) Delete() (err error)
- func (job *JobRedis) Failed(err error)
- func (job *JobRedis) GetName() (queueName string)
- func (job *JobRedis) HasFailed() (hasFail bool)
- func (job *JobRedis) IsDeleted() (deleted bool)
- func (job *JobRedis) IsReleased() (released bool)
- func (job *JobRedis) MarkAsFailed()
- func (job *JobRedis) Payload() (payload *Payload)
- func (job *JobRedis) PopTime() (time time.Time)
- func (job *JobRedis) Queue() (queue QueueIFace)
- func (job *JobRedis) Release(delay int64) (err error)
- func (job *JobRedis) Timeout() (time time.Duration)
- func (job *JobRedis) TimeoutAt() (time time.Time)
- type JobStatistics
- type Logger
- type MemoryStatistics
- type Payload
- type Queue
- func (q *Queue) AutoScaleWorkers() error
- func (q *Queue) Bootstrap(tasks []TaskIFace) error
- func (q *Queue) BootstrapOne(task TaskIFace) error
- func (q *Queue) Delay(task TaskIFace, payload interface{}, duration time.Duration) error
- func (q *Queue) DelayAt(task TaskIFace, payload interface{}, delay time.Time) error
- func (q *Queue) DelayAtByName(name string, payload interface{}, delay time.Time) error
- func (q *Queue) DelayByName(name string, payload interface{}, duration time.Duration) error
- func (q *Queue) Dispatch(task TaskIFace, payload interface{}) error
- func (q *Queue) DispatchByName(name string, payload interface{}) error
- func (q *Queue) GetStatistics() Statistics
- func (q *Queue) SetAllowTasks(taskNames ...string)
- func (q *Queue) SetExcludeTasks(taskNames ...string)
- func (q *Queue) SetFailedJobHandler(failedJobHandler FailedJobHandler)
- func (q *Queue) ShutDown(ctx context.Context) error
- func (q *Queue) Size(task TaskIFace) int64
- func (q *Queue) Start() error
- type QueueIFace
- type RawBody
- type Statistics
- type TaskIFace
- type WorkerStatistics
Constants ¶
const ( DefaultMaxExecuteDuration = 900 * time.Second // job任务执行时长极限预警值:15分钟 DefaultMaxTries = 2 // 默认最大重试次数:2次<发版出现异常时兜底> DefaultRetryInterval = 60 // 默认下次任务重试间隔:1分钟<即可多次执行任务失败后下一次尝试是在60秒后> DefaultMaxConcurrency = 3 // 默认单个task最大并发数 DefaultAutoScaleInterval = 5 * time.Minute // 默认自动扩缩容监测时长间隔 DefaultAutoScaleJobThreshold = 1000 // 默认自动扩容job堆积数阈值 )
定义常量
const ( Redis = "redis" Memory = "memory" MySQL = "mysql" )
queue队列支持的底层驱动名称常量 后续扩充mq、sqs、db等在此添加常量并实现 QueueIFace 接口予以关联
Variables ¶
var ( // ErrQueueClosed 队列处于优雅关闭或关闭状态错误 ErrQueueClosed = errors.New("queue.error.queue.closed") // ErrMaxAttemptsExceeded 尝试执行次数超限 ErrMaxAttemptsExceeded = errors.New("queue.max.execute.attempts") // ErrAbortForWaitingPrevJobFinish 等待上一次任务执行结束退出 ErrAbortForWaitingPrevJobFinish = errors.New("queue.abort.for.waiting.prev.job.finish") )
Functions ¶
func FakeUniqueID ¶
func FakeUniqueID() string
FakeUniqueID 生成一个V4版本的uuid字符串,生成失败返回时间戳纳秒 UUID单机足以保障唯一,生成失败场景下纳秒时间戳也可以一定程度上保障单机唯一
Types ¶
type Config ¶
type Config struct {
// MaxConcurrency 单个task的最大并发处理数量,默认为3
// 最大worker数 = 可运行的task数 * MaxConcurrency + 1
MaxConcurrency uint8
// TablePrefix 当使用MySQL驱动时数据表的前缀,默认为空
TablePrefix string
// AutoScale 是否开启自动扩缩容,默认不开启
AutoScale bool
// AutoScaleInterval 开启自动扩缩容时,自动监测是否需扩容数间隔时间间隔
// 默认 5 * time.Minute
AutoScaleInterval time.Duration
// AutoScaleJobThreshold 开启自动扩缩容时
// 当个task累计堆积job数大于等于该值时触发扩容,默认值:1000
// 自动扩缩容主要在于扩容worker,指标是job堆积数量(等待执行的job数据)
// 而自动缩容则是当堆积job小于等于可运行的task数时自动降低worker数至
// 扩容worker最大值为:最大worker数,参照 MaxConcurrency 的说明
AutoScaleJobThreshold int64
}
Config 队列配置
type DefaultTaskSetting ¶
type DefaultTaskSetting struct{}
DefaultTaskSetting 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长
func (*DefaultTaskSetting) MaxTries ¶
func (task *DefaultTaskSetting) MaxTries() int64
MaxTries 默认最大尝试次数1,即投递的任务仅执行1次
func (*DefaultTaskSetting) RetryInterval ¶
func (task *DefaultTaskSetting) RetryInterval() int64
RetryInterval 当任务执行失败后再次尝试执行的间隔时长,默认立即重试,即间隔时长为0秒
func (*DefaultTaskSetting) Timeout ¶
func (task *DefaultTaskSetting) Timeout() time.Duration
Timeout 任务最大执行超时时长:默认超时时长为900秒
type DefaultTaskSettingWithoutTimeout ¶
type DefaultTaskSettingWithoutTimeout struct{}
DefaultTaskSettingWithoutTimeout 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长
func (*DefaultTaskSettingWithoutTimeout) MaxTries ¶
func (task *DefaultTaskSettingWithoutTimeout) MaxTries() int64
MaxTries 默认最大尝试次数1,即投递的任务仅执行1次
func (*DefaultTaskSettingWithoutTimeout) RetryInterval ¶
func (task *DefaultTaskSettingWithoutTimeout) RetryInterval() int64
RetryInterval 当任务执行失败后再次尝试执行的间隔时长,默认立即重试,即间隔时长为0秒
type FailedJobHandler ¶
FailedJobHandler 失败任务记录|处理回调方法 @param *Payload 失败job的对象信息 @param error job任务失败的error报错信息
type JobIFace ¶
type JobIFace interface {
Release(delay int64) (err error) // 释放任务:将任务重新放入队列
Delete() (err error) // 删除任务:任务不再执行
IsDeleted() (deleted bool) // 检查任务是否已删除
IsReleased() (released bool) // 检查任务是否已释放
Attempts() (attempt int64) // 获取任务已尝试执行过的次数
PopTime() (time time.Time) // 获取任务首次被pop取出的时刻
Timeout() (time time.Duration) // 任务超时时长
TimeoutAt() (time time.Time) // 任务执行超时的时刻
HasFailed() (hasFail bool) // 检测当前job任务执行是否出现了错误
MarkAsFailed() // 设置当前job任务执行出现了错误
Failed(err error) // 设置任务执行失败
Queue() (queue QueueIFace) // 获取job任务所属队列queue句柄
GetName() (queueName string) // 获取job所属队列名称
Payload() (payload *Payload) // 获取任务执行参数payload
}
JobIFace 基于不同技术栈的队列任务Job实现契约
type JobMemory ¶
type JobMemory struct {
// contains filtered or unexported fields
}
func (*JobMemory) IsReleased ¶
func (*JobMemory) MarkAsFailed ¶
func (job *JobMemory) MarkAsFailed()
func (*JobMemory) Queue ¶
func (job *JobMemory) Queue() (queue QueueIFace)
type JobMySQL ¶
type JobMySQL struct {
// contains filtered or unexported fields
}
func (*JobMySQL) IsReleased ¶
func (*JobMySQL) MarkAsFailed ¶
func (job *JobMySQL) MarkAsFailed()
func (*JobMySQL) Queue ¶
func (job *JobMySQL) Queue() (queue QueueIFace)
type JobRedis ¶
type JobRedis struct {
// contains filtered or unexported fields
}
func (*JobRedis) IsReleased ¶
func (*JobRedis) MarkAsFailed ¶
func (job *JobRedis) MarkAsFailed()
func (*JobRedis) Queue ¶
func (job *JobRedis) Queue() (queue QueueIFace)
type JobStatistics ¶
type JobStatistics struct {
TotalJobs int64 `json:"total_jobs"` // 待消费的job总数
JobsStatistics map[string]int64 `json:"jobs_statistics"` // job和待消费数map
}
JobStatistics job任务统计结构
type Logger ¶
type Logger interface {
// Debug debug级别输出的日志
// - msg 日志消息文本描述
// - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
Debug(msg string, keyValue ...any)
// Info info级别输出的日志
// - msg 日志消息文本描述
// - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
Info(msg string, keyValue ...any)
// Warn warn级别输出的日志
// - msg 日志消息文本描述
// - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
Warn(msg string, keyValue ...any)
// Error error级别输出的日志
// - msg 日志消息文本描述
// - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
Error(msg string, keyValue ...any)
}
Logger 日志接口定义
type MemoryStatistics ¶
type MemoryStatistics struct {
SysMemoryTotal uint64 `json:"sys_memory_total"` // 系统内存总数(单位:字节)
SysMemoryUsed uint64 `json:"sys_memory_used"` // 系统内存已使用数(单位:字节)
SysMemoryAvailable uint64 `json:"sys_memory_available"` // 系统内存可供申请使用的内存数(单位:字节)
SysMemoryUsedPercent float64 `json:"sys_memory_used_percent"` // 系统内存已使用比例(0~1之间的小数)
GoMemoryTotal uint64 `json:"go_memory_total"` // go程序从系统申请的内存总数(单位:字节)
GoMemoryAlloc uint64 `json:"go_memory_alloc"` // go程序当前正在使用的内存数(单位:字节)
GoMemoryUsedPercent float64 `json:"go_memory_used_percent"` // go程序申请的内存与系统总内存的比例(0~1之间的小数)
}
MemoryStatistics 内存统计
type Payload ¶
type Payload struct {
Name string `json:"Name"` // 队列名称
ID string `json:"ID"` // 任务ID
MaxTries int64 `json:"MaxTries"` // 任务最大尝试次数,默认1
RetryInterval int64 `json:"RetryInterval"` // 当任务最大允许尝试次数大于0时,下次尝试之前的间隔时长,单位:秒
Attempts int64 `json:"Attempts"` // 任务已被尝试执行的的次数
Payload []byte `json:"Payload"` // 任务参数比特字面量,可decode成具体job被execute时的类型
PopTime int64 `json:"PopTime"` // 任务首次被取出执行的时间戳,取出的时候才去设置
Timeout int64 `json:"Timeout"` // 任务最大执行超时时长,单位:秒
TimeoutAt int64 `json:"TimeoutAt"` // 任务超时时刻时间戳,被执行时刻才会去设置
}
Payload 存储于队列中的job任务结构
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue 队列struct
func New ¶
New 初始化一个队列
@param driver 队列实现底层驱动,可选值见上方14行附近位置的常量
@param conn driver对应底层驱动连接器句柄,具体类型参考 QueueIFace 实体类
@param logger 实现 Logger 接口的结构体实例的指针对象
@param config 配置
func (*Queue) AutoScaleWorkers ¶
AutoScaleWorkers 自动扩缩容消费进程的Worker 内部自动依据待消费job和内存情况决定是扩容还是缩容 注意:只有当前进程是消费者进程(即调用了Start方法的)才能扩容
func (*Queue) DelayAtByName ¶
DelayAtByName 按任务name投递一个延迟队列Job任务
- 投递一个异步延迟执行的任务
- 重要提示:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用DelayAt方法
func (*Queue) DelayByName ¶
DelayByName 按任务name投递一个将来时刻执行的延迟队列Job任务
- 投递一个异步延迟执行的任务
- 重要提示:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用Delay方法
func (*Queue) DispatchByName ¶
DispatchByName 按任务name投递一个队列Job任务
- 投递一个异步立即执行的任务
- 重要:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用Dispatch方法
func (*Queue) GetStatistics ¶
func (q *Queue) GetStatistics() Statistics
GetStatistics 获取统计信息,请勿频繁调用,底层统计内存会STW
- 返回包含当前worker数、最大worker数、活跃worker数、内存使用情况等信息
func (*Queue) SetAllowTasks ¶
SetAllowTasks 指定可以运行的任务
func (*Queue) SetExcludeTasks ¶
SetExcludeTasks 指定不可运行的任务
func (*Queue) SetFailedJobHandler ¶
func (q *Queue) SetFailedJobHandler(failedJobHandler FailedJobHandler)
SetFailedJobHandler 设置失败任务的收尾处理器 1、尝试了指定的最大尝试次数后仍然失败的任务善后方法 2、此时通过此处设置的处理器可记录到底哪个任务失败了以及失败任务的payload参数情况 3、以及后续的重试等逻辑等
type QueueIFace ¶
type QueueIFace interface {
// Size 获取当前队列长度方法
// @param queue 队列的名称
Size(queue string) (size int64)
// Push 投递一条任务到队列方法
// @param queue 队列的名称
// @param payload 投递进队列的参数负载
Push(queue string, payload interface{}) (err error)
// Later 投递一条指定延长时长的延迟任务到队列的方法
// @param queue 延迟队列的名称
// @param durationTo 相对于投递任务时刻延迟的时长
// @param payload 投递进队列的多个参数负载
Later(queue string, durationTo time.Duration, payload interface{}) (err error)
// LaterAt 投递一条指定执行时间的延迟任务到队列的方法
// @param queue 延迟队列的名称
// @param timeAt 延迟执行的时刻
// @param payload 投递进队列的多个参数负载
LaterAt(queue string, timeAt time.Time, payload interface{}) (err error)
// Pop 从队尾取出一条任务的方法
// @param queue 队列的名称
Pop(queue string) (job JobIFace, exist bool)
// SetConnection 设置队列底层连接器
// @param connection 底层连接器实例
SetConnection(connection interface{}) (err error)
// GetConnection 获取队列底层连接器
GetConnection() (connection interface{}, err error)
}
QueueIFace 基于不同技术栈的队列实现契约
type RawBody ¶
type RawBody struct {
ID string // 队列内部唯一标识符ID
// contains filtered or unexported fields
}
RawBody 队列execute执行时传递给执行方法的参数Raw结构:job任务参数的包装器
- ID 内部标记队列任务的唯一ID,使用UUID生成
type Statistics ¶
type Statistics struct {
StatisticsTime int64 `json:"statistics_time"` // 统计时间戳
MemoryStatistics MemoryStatistics `json:"memory_statistics"` // 内存情况统计
WorkerStatistics WorkerStatistics `json:"worker_statistics"` // worker情况统计
JobStatistics JobStatistics `json:"job_statistics"` // job情况统计
}
Statistics 统计信息
type TaskIFace ¶
type TaskIFace interface {
MaxTries() int64 // 定义队列任务最大尝试次数:任务执行的最大尝试次数
RetryInterval() int64 // 定义队列任务最大尝试间隔:当任务执行失败后再次尝试执行的间隔时长,单位:秒
Timeout() time.Duration // 定义队列超时方法:返回超时时长
Name() string // 定义队列名称方法:返回队列名称
Execute(ctx context.Context, job *RawBody) error // 定义队列任务执行时的方法:执行成功返回nil,执行失败返回error
Remark() string // 队列任务說明
}
TaskIFace 定义队列Job任务执行逻辑的契约(队列任务执行类)