queue

package module
v0.0.0-...-70c877a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2025 License: GPL-3.0 Imports: 18 Imported by: 8

README

Queue 队列

一、说明

Queue队列为生产 -> 消费模型的简单实现,即:producer -> consumer(worker),一般分为生产端和消费端。

当前已实现以下三种驱动:

  • 开发测试用memory驱动
  • 可用于生产的redis类型驱动
  • 可用于生产的mysql类型驱动

⚠️ memory类型驱动仅可用于开发调试

由于多个独立进程间内存隔离,以及进程退出后进程所属内存销毁的原因,memory方案在进程退出后未消费的队列数据会丢失,故而仅能用于开发调试环境,且生产端和消费端只能在同一进程。

驱动特性对比

特性 Memory Redis MySQL
持久化
分布式
延迟队列
事务支持
性能
运维复杂度
适用场景 开发测试 高并发生产 一般生产

二、使用示例

完整使用示例查看 example 目录代码结构

step1、实现任务类

任务类即按任务类iface规则实现的结构体,也是队列投递任务和实际执行任务的单元。

package tasks

import (
    "fmt"
    "github.com/jjonline/go-mod-librar/queue"
)

// 定义的任务类struct,需完整实现 queue.TaskIFace
type TestTask struct {
    // 单个job最大执行时长、最大重试次数、多次重试之间间隔时长等设置
    // 这里使用默认设置,若需要自定义参数,自定义方法实现即可
    queue.DefaultTaskSetting
}

func (t TestTask) Name() string {
    return "test_task"
}

func (t TestTask) Execute(ctx context.Context, job *queue.RawBody) error {
    // 队列实际执行的入口方法,请注意处理 context.Context 内部用于超时控制 
    fmt.Println(job.ID)
    return nil
}
step2、消费者端注册启动
// 初始化队列Queue对象,生产者、消费者均通过该对象操作
// 重要:生产者、消费者均需要实例化
service := queue.New(
    queue.Redis, // 队列底层驱动器类型,支持:queue.Memory, queue.Redis, queue.MySQL
    redisClient, // 队列底层驱动client实例,Redis用*redis.Client,MySQL用*sql.DB
    logger, // 实现 queue.Logger 接口的日志实例,用于记录日志
    5, // 单个队列最大并发消费协程数
)

// 注册单个任务类
_ = service.BootstrapOne(&tasks.TestTask{})

// 也可以这样批量注册任务类
// _ = service.Bootstrap([]*queue.TaskIFace)

// 启动消费端进程,注意传递上下文context用于控制进程优雅控制
idleCloser := make(chan struct{})

// 接收退出信号
quitChan := make(chan os.Signal)
signal.Notify(
    quitChan,
    syscall.SIGINT,  // 用户发送INTR字符(Ctrl+C)触发
    syscall.SIGTERM, // 结束程序
    syscall.SIGHUP,  // 终端控制进程结束(终端连接断开)
    syscall.SIGQUIT, // 用户发送QUIT字符(Ctrl+/)触发
)

go func() {
    // wait exit signal
    <-quitChan

    logger.Info("receive exit signal")

    // shutdown worker daemon with timeout context
    timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // graceful shutdown by signal
    if err := queueService.ShutDown(timeoutCtx); nil != err {
        logger.Warn("violence shutdown by signal: " + err.Error())
    } else {
        logger.Info("graceful shutdown by signal")
    }

    // closer close
    close(idleCloser)
}()

// start worker daemon
if err := queueService.Start(); nil != err && err != queue.ErrQueueClosed {
    logger.Info("queue started failed: " + err.Error())
    close(idleCloser)
} else {
    logger.Info("queue worker started")
}

<-idleCloser
logger.Info("queue worker quit, daemon exited")
step3、生产者端投递job任务
// 初始化队列Queue对象,生产者、消费者均通过该对象操作
// 生产者&&消费者处于同一进程则可共用,不同进程则需要各自独立实例化
service := queue.New(
    queue.Redis, // 队列底层驱动器类型,支持:queue.Memory, queue.Redis, queue.MySQL
    redisClient, // 队列底层驱动client实例,Redis用*redis.Client,MySQL用*sql.DB
    logger, // 实现 queue.Logger 接口的日志实例,用于记录日志
)

// 单个任务类:若生产者端和消费者端分处不同进程,生产者端任务类也需要执行注册

// 投递一条普通队列任务
service.Dispatch(&tasks.TestTask{}, "job执行时的参数")

// 投递一条延迟队列任务(指定执行时刻)
// 指定执行时刻,如果时刻是过去则立即执行
service.DelayAt(&tasks.TestTask{}, "job执行时的参数", time.Time类型的延迟到将来时刻)

// 投递一条延迟队列任务(指定相对于当前的延迟时长)
// 指定相对于投递时刻需要延迟的时长
service.Delay(&tasks.TestTask{}, "job执行时的参数", time.Duration类型的时长)

四、重试次数 & 重试间隔 & 超时

队列保证每个job至少能被执行1次

3.1、重试次数

任务类定义实现的 MaxTries() int64 方法指定单个job能被重试的次数

注意:返回值若小于等于1则仅被执行1次

执行任务类失败或异常会触发重试

3.2、重试间隔

当任务类允许多次重试时,下一次重试可以并不是立即执行,通过RetryInterval() int64方法设置重试之前的等待时长间隔,单位:秒

注意:返回值若小于等于0则取值0表示立即重试

重试间隔 是配合 重试次数 起作用的,仅可多次重试的任务有效

3.3、超时

因goroutine无法从外部kill掉,超时控制通过context.Context上下文实现,需任务类自主实现超时控制的退出机制!

任务类通过嵌入DefaultTaskSetting则设置的最大超时时长为900秒,可通过任务类Timeout方法自定义超时时间。

3.4、约定
  1. 重试次数若小于等于1则取值1
  2. 重试间隔若小于等于0则取值0,0表示没有重试间隔
  3. 任务执行成功:Execute(job *RawBody) error返回nil
  4. 任务执行失败:Execute(job *RawBody) error返回error
  5. 任务执行异常:Execute(job *RawBody) error发生了panic
  • 提供有默认设置最大超时时间、最大重试次数、重试间隔的可嵌入结构体 queue.DefaultTaskSetting
  • 提供有默认设置最大重试次数、重试间隔而不设置超时时间可自定义超时的可嵌入结构体 queue.DefaultTaskSettingWithoutTimeout
  • 当然你也可以完全自定义任务类而不嵌入任何默认构件结构体

Documentation

Index

Constants

View Source
const (
	DefaultMaxExecuteDuration    = 900 * time.Second // job任务执行时长极限预警值:15分钟
	DefaultMaxTries              = 2                 // 默认最大重试次数:2次<发版出现异常时兜底>
	DefaultRetryInterval         = 60                // 默认下次任务重试间隔:1分钟<即可多次执行任务失败后下一次尝试是在60秒后>
	DefaultMaxConcurrency        = 3                 // 默认单个task最大并发数
	DefaultAutoScaleInterval     = 5 * time.Minute   // 默认自动扩缩容监测时长间隔
	DefaultAutoScaleJobThreshold = 1000              // 默认自动扩容job堆积数阈值
)

定义常量

View Source
const (
	Redis  = "redis"
	Memory = "memory"
	MySQL  = "mysql"
)

queue队列支持的底层驱动名称常量 后续扩充mq、sqs、db等在此添加常量并实现 QueueIFace 接口予以关联

Variables

View Source
var (
	// ErrQueueClosed 队列处于优雅关闭或关闭状态错误
	ErrQueueClosed = errors.New("queue.error.queue.closed")
	// ErrMaxAttemptsExceeded 尝试执行次数超限
	ErrMaxAttemptsExceeded = errors.New("queue.max.execute.attempts")
	// ErrAbortForWaitingPrevJobFinish 等待上一次任务执行结束退出
	ErrAbortForWaitingPrevJobFinish = errors.New("queue.abort.for.waiting.prev.job.finish")
)

Functions

func FakeUniqueID

func FakeUniqueID() string

FakeUniqueID 生成一个V4版本的uuid字符串,生成失败返回时间戳纳秒 UUID单机足以保障唯一,生成失败场景下纳秒时间戳也可以一定程度上保障单机唯一

func IFaceToString

func IFaceToString(value interface{}) string

IFaceToString interface类型转string

Types

type Config

type Config struct {
	// MaxConcurrency 单个task的最大并发处理数量,默认为3
	// 最大worker数 = 可运行的task数 * MaxConcurrency + 1
	MaxConcurrency uint8
	// TablePrefix 当使用MySQL驱动时数据表的前缀,默认为空
	TablePrefix string
	// AutoScale 是否开启自动扩缩容,默认不开启
	AutoScale bool
	// AutoScaleInterval 开启自动扩缩容时,自动监测是否需扩容数间隔时间间隔
	// 默认 5 * time.Minute
	AutoScaleInterval time.Duration
	// AutoScaleJobThreshold 开启自动扩缩容时
	// 当个task累计堆积job数大于等于该值时触发扩容,默认值:1000
	// 自动扩缩容主要在于扩容worker,指标是job堆积数量(等待执行的job数据)
	// 而自动缩容则是当堆积job小于等于可运行的task数时自动降低worker数至
	// 扩容worker最大值为:最大worker数,参照 MaxConcurrency 的说明
	AutoScaleJobThreshold int64
}

Config 队列配置

type DefaultTaskSetting

type DefaultTaskSetting struct{}

DefaultTaskSetting 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长

func (*DefaultTaskSetting) MaxTries

func (task *DefaultTaskSetting) MaxTries() int64

MaxTries 默认最大尝试次数1,即投递的任务仅执行1次

func (*DefaultTaskSetting) RetryInterval

func (task *DefaultTaskSetting) RetryInterval() int64

RetryInterval 当任务执行失败后再次尝试执行的间隔时长,默认立即重试,即间隔时长为0秒

func (*DefaultTaskSetting) Timeout

func (task *DefaultTaskSetting) Timeout() time.Duration

Timeout 任务最大执行超时时长:默认超时时长为900秒

type DefaultTaskSettingWithoutTimeout

type DefaultTaskSettingWithoutTimeout struct{}

DefaultTaskSettingWithoutTimeout 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长

func (*DefaultTaskSettingWithoutTimeout) MaxTries

func (task *DefaultTaskSettingWithoutTimeout) MaxTries() int64

MaxTries 默认最大尝试次数1,即投递的任务仅执行1次

func (*DefaultTaskSettingWithoutTimeout) RetryInterval

func (task *DefaultTaskSettingWithoutTimeout) RetryInterval() int64

RetryInterval 当任务执行失败后再次尝试执行的间隔时长,默认立即重试,即间隔时长为0秒

type FailedJobHandler

type FailedJobHandler func(payload *Payload, err error) error

FailedJobHandler 失败任务记录|处理回调方法 @param *Payload 失败job的对象信息 @param error job任务失败的error报错信息

type JobIFace

type JobIFace interface {
	Release(delay int64) (err error) // 释放任务:将任务重新放入队列
	Delete() (err error)             // 删除任务:任务不再执行
	IsDeleted() (deleted bool)       // 检查任务是否已删除
	IsReleased() (released bool)     // 检查任务是否已释放
	Attempts() (attempt int64)       // 获取任务已尝试执行过的次数
	PopTime() (time time.Time)       // 获取任务首次被pop取出的时刻
	Timeout() (time time.Duration)   // 任务超时时长
	TimeoutAt() (time time.Time)     // 任务执行超时的时刻
	HasFailed() (hasFail bool)       // 检测当前job任务执行是否出现了错误
	MarkAsFailed()                   // 设置当前job任务执行出现了错误
	Failed(err error)                // 设置任务执行失败
	Queue() (queue QueueIFace)       // 获取job任务所属队列queue句柄
	GetName() (queueName string)     // 获取job所属队列名称
	Payload() (payload *Payload)     // 获取任务执行参数payload
}

JobIFace 基于不同技术栈的队列任务Job实现契约

type JobMemory

type JobMemory struct {
	// contains filtered or unexported fields
}

func (*JobMemory) Attempts

func (job *JobMemory) Attempts() (attempt int64)

func (*JobMemory) Delete

func (job *JobMemory) Delete() (err error)

func (*JobMemory) Failed

func (job *JobMemory) Failed(err error)

func (*JobMemory) GetName

func (job *JobMemory) GetName() (queueName string)

func (*JobMemory) HasFailed

func (job *JobMemory) HasFailed() (hasFail bool)

func (*JobMemory) IsDeleted

func (job *JobMemory) IsDeleted() (deleted bool)

func (*JobMemory) IsReleased

func (job *JobMemory) IsReleased() (released bool)

func (*JobMemory) MarkAsFailed

func (job *JobMemory) MarkAsFailed()

func (*JobMemory) Payload

func (job *JobMemory) Payload() (payload *Payload)

func (*JobMemory) PopTime

func (job *JobMemory) PopTime() (time time.Time)

func (*JobMemory) Queue

func (job *JobMemory) Queue() (queue QueueIFace)

func (*JobMemory) Release

func (job *JobMemory) Release(delay int64) (err error)

func (*JobMemory) Timeout

func (job *JobMemory) Timeout() (time time.Duration)

Timeout 任务超时时长

func (*JobMemory) TimeoutAt

func (job *JobMemory) TimeoutAt() (time time.Time)

TimeoutAt 任务job执行超时的时刻

type JobMySQL

type JobMySQL struct {
	// contains filtered or unexported fields
}

func (*JobMySQL) Attempts

func (job *JobMySQL) Attempts() (attempt int64)

Attempts 获取当前job已被尝试执行的次数

func (*JobMySQL) Delete

func (job *JobMySQL) Delete() (err error)

Delete 删除任务job:任务不再执行--从数据库删除记录

func (*JobMySQL) Failed

func (job *JobMySQL) Failed(err error)

func (*JobMySQL) GetName

func (job *JobMySQL) GetName() (queueName string)

func (*JobMySQL) HasFailed

func (job *JobMySQL) HasFailed() (hasFail bool)

func (*JobMySQL) IsDeleted

func (job *JobMySQL) IsDeleted() (deleted bool)

func (*JobMySQL) IsReleased

func (job *JobMySQL) IsReleased() (released bool)

func (*JobMySQL) MarkAsFailed

func (job *JobMySQL) MarkAsFailed()

func (*JobMySQL) Payload

func (job *JobMySQL) Payload() (payload *Payload)

func (*JobMySQL) PopTime

func (job *JobMySQL) PopTime() (time time.Time)

PopTime 任务job首次被执行的时刻

func (*JobMySQL) Queue

func (job *JobMySQL) Queue() (queue QueueIFace)

func (*JobMySQL) Release

func (job *JobMySQL) Release(delay int64) (err error)

Release 释放任务job:job重新再试--清除reserved_at标记,设置新的available_at延迟时间

func (*JobMySQL) Timeout

func (job *JobMySQL) Timeout() (time time.Duration)

Timeout 任务超时时长

func (*JobMySQL) TimeoutAt

func (job *JobMySQL) TimeoutAt() (time time.Time)

TimeoutAt 任务job执行超时的时刻

type JobRedis

type JobRedis struct {
	// contains filtered or unexported fields
}

func (*JobRedis) Attempts

func (job *JobRedis) Attempts() (attempt int64)

Attempts 获取当前job已被尝试执行的次数

func (*JobRedis) Delete

func (job *JobRedis) Delete() (err error)

Delete 删除任务job:任务不再执行--从reserved有序集合删除

func (*JobRedis) Failed

func (job *JobRedis) Failed(err error)

func (*JobRedis) GetName

func (job *JobRedis) GetName() (queueName string)

func (*JobRedis) HasFailed

func (job *JobRedis) HasFailed() (hasFail bool)

func (*JobRedis) IsDeleted

func (job *JobRedis) IsDeleted() (deleted bool)

func (*JobRedis) IsReleased

func (job *JobRedis) IsReleased() (released bool)

func (*JobRedis) MarkAsFailed

func (job *JobRedis) MarkAsFailed()

func (*JobRedis) Payload

func (job *JobRedis) Payload() (payload *Payload)

func (*JobRedis) PopTime

func (job *JobRedis) PopTime() (time time.Time)

PopTime 任务job首次被执行的时刻

func (*JobRedis) Queue

func (job *JobRedis) Queue() (queue QueueIFace)

func (*JobRedis) Release

func (job *JobRedis) Release(delay int64) (err error)

Release 释放任务job:job重新再试--从reserved有序集合丢到delayed延迟有序集合

func (*JobRedis) Timeout

func (job *JobRedis) Timeout() (time time.Duration)

Timeout 任务超时时长

func (*JobRedis) TimeoutAt

func (job *JobRedis) TimeoutAt() (time time.Time)

TimeoutAt 任务job执行超时的时刻

type JobStatistics

type JobStatistics struct {
	TotalJobs      int64            `json:"total_jobs"`      // 待消费的job总数
	JobsStatistics map[string]int64 `json:"jobs_statistics"` // job和待消费数map
}

JobStatistics job任务统计结构

type Logger

type Logger interface {
	// Debug debug级别输出的日志
	//   - msg 日志消息文本描述
	//   - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
	Debug(msg string, keyValue ...any)
	// Info info级别输出的日志
	//   - msg 日志消息文本描述
	//   - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
	Info(msg string, keyValue ...any)
	// Warn warn级别输出的日志
	//   - msg 日志消息文本描述
	//   - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
	Warn(msg string, keyValue ...any)
	// Error error级别输出的日志
	//   - msg 日志消息文本描述
	//   - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数>
	Error(msg string, keyValue ...any)
}

Logger 日志接口定义

type MemoryStatistics

type MemoryStatistics struct {
	SysMemoryTotal       uint64  `json:"sys_memory_total"`        // 系统内存总数(单位:字节)
	SysMemoryUsed        uint64  `json:"sys_memory_used"`         // 系统内存已使用数(单位:字节)
	SysMemoryAvailable   uint64  `json:"sys_memory_available"`    // 系统内存可供申请使用的内存数(单位:字节)
	SysMemoryUsedPercent float64 `json:"sys_memory_used_percent"` // 系统内存已使用比例(0~1之间的小数)
	GoMemoryTotal        uint64  `json:"go_memory_total"`         // go程序从系统申请的内存总数(单位:字节)
	GoMemoryAlloc        uint64  `json:"go_memory_alloc"`         // go程序当前正在使用的内存数(单位:字节)
	GoMemoryUsedPercent  float64 `json:"go_memory_used_percent"`  // go程序申请的内存与系统总内存的比例(0~1之间的小数)
}

MemoryStatistics 内存统计

type Payload

type Payload struct {
	Name          string `json:"Name"`          // 队列名称
	ID            string `json:"ID"`            // 任务ID
	MaxTries      int64  `json:"MaxTries"`      // 任务最大尝试次数,默认1
	RetryInterval int64  `json:"RetryInterval"` // 当任务最大允许尝试次数大于0时,下次尝试之前的间隔时长,单位:秒
	Attempts      int64  `json:"Attempts"`      // 任务已被尝试执行的的次数
	Payload       []byte `json:"Payload"`       // 任务参数比特字面量,可decode成具体job被execute时的类型
	PopTime       int64  `json:"PopTime"`       // 任务首次被取出执行的时间戳,取出的时候才去设置
	Timeout       int64  `json:"Timeout"`       // 任务最大执行超时时长,单位:秒
	TimeoutAt     int64  `json:"TimeoutAt"`     // 任务超时时刻时间戳,被执行时刻才会去设置
}

Payload 存储于队列中的job任务结构

func (*Payload) RawBody

func (payload *Payload) RawBody() *RawBody

RawBody PayLoad结构体获取载体实体

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue 队列struct

func New

func New(driver string, conn interface{}, logger Logger, config Config) *Queue

New 初始化一个队列

	@param driver     队列实现底层驱动,可选值见上方14行附近位置的常量
	@param conn       driver对应底层驱动连接器句柄,具体类型参考 QueueIFace 实体类
	@param logger     实现 Logger 接口的结构体实例的指针对象
    @param config     配置

func (*Queue) AutoScaleWorkers

func (q *Queue) AutoScaleWorkers() error

AutoScaleWorkers 自动扩缩容消费进程的Worker 内部自动依据待消费job和内存情况决定是扩容还是缩容 注意:只有当前进程是消费者进程(即调用了Start方法的)才能扩容

func (*Queue) Bootstrap

func (q *Queue) Bootstrap(tasks []TaskIFace) error

Bootstrap boot注册载入多个队列任务

@tasks 任务类实例指针切片

func (*Queue) BootstrapOne

func (q *Queue) BootstrapOne(task TaskIFace) error

BootstrapOne boot注册载入一个队列任务

@param task 任务类实例指针

func (*Queue) Delay

func (q *Queue) Delay(task TaskIFace, payload interface{}, duration time.Duration) error

Delay 投递一个指定延迟时长的延迟队列Job任务

func (*Queue) DelayAt

func (q *Queue) DelayAt(task TaskIFace, payload interface{}, delay time.Time) error

DelayAt 投递一个指定的将来时刻执行的延迟队列Job任务

func (*Queue) DelayAtByName

func (q *Queue) DelayAtByName(name string, payload interface{}, delay time.Time) error

DelayAtByName 按任务name投递一个延迟队列Job任务

  • 投递一个异步延迟执行的任务
  • 重要提示:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用DelayAt方法

func (*Queue) DelayByName

func (q *Queue) DelayByName(name string, payload interface{}, duration time.Duration) error

DelayByName 按任务name投递一个将来时刻执行的延迟队列Job任务

  • 投递一个异步延迟执行的任务
  • 重要提示:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用Delay方法

func (*Queue) Dispatch

func (q *Queue) Dispatch(task TaskIFace, payload interface{}) error

Dispatch 投递一个队列Job任务

func (*Queue) DispatchByName

func (q *Queue) DispatchByName(name string, payload interface{}) error

DispatchByName 按任务name投递一个队列Job任务

  • 投递一个异步立即执行的任务
  • 重要:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用Dispatch方法

func (*Queue) GetStatistics

func (q *Queue) GetStatistics() Statistics

GetStatistics 获取统计信息,请勿频繁调用,底层统计内存会STW

  • 返回包含当前worker数、最大worker数、活跃worker数、内存使用情况等信息

func (*Queue) SetAllowTasks

func (q *Queue) SetAllowTasks(taskNames ...string)

SetAllowTasks 指定可以运行的任务

func (*Queue) SetExcludeTasks

func (q *Queue) SetExcludeTasks(taskNames ...string)

SetExcludeTasks 指定不可运行的任务

func (*Queue) SetFailedJobHandler

func (q *Queue) SetFailedJobHandler(failedJobHandler FailedJobHandler)

SetFailedJobHandler 设置失败任务的收尾处理器 1、尝试了指定的最大尝试次数后仍然失败的任务善后方法 2、此时通过此处设置的处理器可记录到底哪个任务失败了以及失败任务的payload参数情况 3、以及后续的重试等逻辑等

func (*Queue) ShutDown

func (q *Queue) ShutDown(ctx context.Context) error

ShutDown graceful shut down

func (*Queue) Size

func (q *Queue) Size(task TaskIFace) int64

Size 获取指定队列当前长度

func (*Queue) Start

func (q *Queue) Start() error

Start 守护进程启动队列消费者

type QueueIFace

type QueueIFace interface {
	// Size 获取当前队列长度方法
	// @param queue 队列的名称
	Size(queue string) (size int64)
	// Push 投递一条任务到队列方法
	// @param queue 队列的名称
	// @param payload 投递进队列的参数负载
	Push(queue string, payload interface{}) (err error)
	// Later 投递一条指定延长时长的延迟任务到队列的方法
	// @param queue 延迟队列的名称
	// @param durationTo 相对于投递任务时刻延迟的时长
	// @param payload 投递进队列的多个参数负载
	Later(queue string, durationTo time.Duration, payload interface{}) (err error)
	// LaterAt 投递一条指定执行时间的延迟任务到队列的方法
	// @param queue 延迟队列的名称
	// @param timeAt 延迟执行的时刻
	// @param payload 投递进队列的多个参数负载
	LaterAt(queue string, timeAt time.Time, payload interface{}) (err error)
	// Pop 从队尾取出一条任务的方法
	// @param queue 队列的名称
	Pop(queue string) (job JobIFace, exist bool)
	// SetConnection 设置队列底层连接器
	// @param connection 底层连接器实例
	SetConnection(connection interface{}) (err error)
	// GetConnection 获取队列底层连接器
	GetConnection() (connection interface{}, err error)
}

QueueIFace 基于不同技术栈的队列实现契约

type RawBody

type RawBody struct {
	ID string // 队列内部唯一标识符ID
	// contains filtered or unexported fields
}

RawBody 队列execute执行时传递给执行方法的参数Raw结构:job任务参数的包装器

  • ID 内部标记队列任务的唯一ID,使用UUID生成

func MakeRawBody

func MakeRawBody(queue, uniqueId string, payload []byte) *RawBody

MakeRawBody 构造RawBody结构方法

  • queue 队列标识符、名称
  • uniqueId 唯一id
  • payload 队列载体,字节数组

func (*RawBody) Bytes

func (rawBody *RawBody) Bytes() []byte

Bytes 任务参数转[]byte

如果投递的任务参数为[]byte型标量参数,使用该方法获取传参

func (*RawBody) Int

func (rawBody *RawBody) Int() int

Int 任务参数数据转int

如果投递的任务参数为int型标量参数,使用该方法获取传参

func (*RawBody) Int64

func (rawBody *RawBody) Int64() int64

Int64 任务参数转int64

如果投递的任务参数为int64型标量参数,使用该方法获取传参

func (*RawBody) String

func (rawBody *RawBody) String() string

String 任务参数转string

如果投递的任务参数为string型标量参数,使用该方法获取传参

func (*RawBody) Unmarshal

func (rawBody *RawBody) Unmarshal(result interface{}) error

Unmarshal 任务参数Unmarshal为投递调度任务时的结构类型

  • 传参为基础类型的不要使用该方法转换而是使用 Int String Bytes 等method
  • result 具体类型的指针引用变量,转换成功将自动填充
  • 转换成功填充result返回nil,转换失败时返回error

type Statistics

type Statistics struct {
	StatisticsTime   int64            `json:"statistics_time"`   // 统计时间戳
	MemoryStatistics MemoryStatistics `json:"memory_statistics"` // 内存情况统计
	WorkerStatistics WorkerStatistics `json:"worker_statistics"` // worker情况统计
	JobStatistics    JobStatistics    `json:"job_statistics"`    // job情况统计
}

Statistics 统计信息

type TaskIFace

type TaskIFace interface {
	MaxTries() int64                                 // 定义队列任务最大尝试次数:任务执行的最大尝试次数
	RetryInterval() int64                            // 定义队列任务最大尝试间隔:当任务执行失败后再次尝试执行的间隔时长,单位:秒
	Timeout() time.Duration                          // 定义队列超时方法:返回超时时长
	Name() string                                    // 定义队列名称方法:返回队列名称
	Execute(ctx context.Context, job *RawBody) error // 定义队列任务执行时的方法:执行成功返回nil,执行失败返回error
	Remark() string                                  // 队列任务說明
}

TaskIFace 定义队列Job任务执行逻辑的契约(队列任务执行类)

type WorkerStatistics

type WorkerStatistics struct {
	ActiveWorkers int64          `json:"active_workers"` // 正在处理任务的worker数量
	TotalWorkers  int64          `json:"total_workers"`  // 实际存在的worker总数
	WorkerState   map[int64]bool `json:"worker_state"`   // 每个worker的状态映射
}

WorkerStatistics 工作进程统计结构

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL