g_task

package module
v0.0.0-...-b15c4a3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2022 License: MIT Imports: 2 Imported by: 0

README

g_task 异步任务队列

特点

  • 简单,无侵入
  • 支持任务超时设置
  • 支持任务超时重试
  • 方便扩展broker backend,目前支持redis

QuickStart

server
package main

import (
	"fmt"
	"time"

	"github.com/YuleiGong/g_task"
	"github.com/YuleiGong/g_task/backend"
	"github.com/YuleiGong/g_task/broker"
	"github.com/YuleiGong/g_task/server"
)

var (
	url      = "127.0.0.1:6379"
	db       = 1
	poolSize = 50
	password = ""
)

func add(a, b int) (int, error) {

	return a + b, nil
}
func main() {
	//broker
	brokerCfg := broker.NewRedisConf(url, password, db)
	brokerCfg.SetPoolSize(poolSize)
	brokerCfg.SetExpireTime(1 * time.Hour)

	//backend
	backendCfg := backend.NewRedisConf(url, password, db)
	backendCfg.SetPoolSize(poolSize)
	backendCfg.SetExpireTime(1 * time.Hour)

	opts := []server.WorkerOpt{
		server.WithBroker(broker.NewRedis(brokerCfg)),
		server.WithBackend(backend.NewRedis(backendCfg)),
	}
	svr := g_task.Server(opts...)
	//函数注册
	svr.Reg("add", add)
	if err := svr.Run(10); err != nil {
		fmt.Println("%v", err)
	}
	defer svr.ShutDown()

}
client
package main

import (
	"fmt"
	"time"

	"github.com/YuleiGong/g_task"
	"github.com/YuleiGong/g_task/backend"
	"github.com/YuleiGong/g_task/broker"
	"github.com/YuleiGong/g_task/client"
)

var (
	url      = "127.0.0.1:6379"
	db       = 1
	poolSize = 50
	password = ""
)

func main() {
	var err error
	//broker
	brokerCfg := broker.NewRedisConf(url, password, db)
	brokerCfg.SetPoolSize(poolSize)
	brokerCfg.SetExpireTime(1 * time.Hour)
	//backend
	backendCfg := backend.NewRedisConf(url, password, db)
	backendCfg.SetPoolSize(poolSize)
	backendCfg.SetExpireTime(1 * time.Hour)
	opts := []client.ClientOpt{
		client.WithBroker(broker.NewRedis(brokerCfg)),
		client.WithBackend(backend.NewRedis(backendCfg)),
	} //实际使用中,不需要初始化broker broker, client会自动复用server的配置
	var cli *client.Client
	if cli, err = g_task.Client(opts...); err != nil {
		fmt.Printf("%v", err)
		return
	}

    sig := []message.Signature{
		{Type: message.Int, Val: 1},
		{Type: message.Int, Val: 2},
	}
	sendConf := client.NewSendConf("add")
	var taskID string
	if taskID, err = cli.Send(sendConf, sig...); err != nil {
		fmt.Printf("%s", err.Error())
		return
	}
	fmt.Printf("%s", taskID)

}
Example

example 目录下有更多的样例可供参考

Server

  • 初始化: 通过 Server 函数获取一个服务。
opts := []server.WorkerOpt{
	server.WithBroker(broker.NewRedis(brokerCfg)),
	server.WithBackend(backend.NewRedis(backendCfg)),
}
svr := g_task.Server(opts...)

  • 配置: 需要配置服务的 brokerbackend 。 详细见 broker backend 章节
//broker
brokerCfg := broker.NewRedisConf(url, password, db)
brokerCfg.SetPoolSize(poolSize)
brokerCfg.SetExpireTime(1 * time.Hour)

//backend
backendCfg := backend.NewRedisConf(url, password, db)
backendCfg.SetPoolSize(poolSize)
backendCfg.SetExpireTime(1 * time.Hour)

opts := []server.WorkerOpt{
	server.WithBroker(broker.NewRedis(brokerCfg)),
	server.WithBackend(backend.NewRedis(backendCfg)),
}
  • 任务注册: 一个任务,就是一个 函数 。注册后,可以作为异步任务执行。注册函数至少要有一个error 返回值
func add(a, b int) (int, error) {

	return a + b, nil
}
svr.Reg("add", add)

  • 启动和退出
if err := svr.Run(10); err != nil {
		fmt.Printf("%v", err)
	}
defer svr.ShutDown()

Client

  • 初始化: 通过Client 获取一个客户端。
var cli *client.Client
if cli, err = g_task.Client(opts...); err != nil {
	fmt.Printf("%v", err)
	return
}
  • 配置: 需要配客户端的 brokerbackend 。 broker和backend必须和server保持的一致。
brokerCfg := broker.NewRedisConf(url, password, db)
brokerCfg.SetPoolSize(poolSize)
brokerCfg.SetExpireTime(1 * time.Hour)
//backend
backendCfg := backend.NewRedisConf(url, password, db)
backendCfg.SetPoolSize(poolSize)
backendCfg.SetExpireTime(1 * time.Hour)
opts := []client.ClientOpt{
	client.WithBroker(broker.NewRedis(brokerCfg)),
	client.WithBackend(backend.NewRedis(backendCfg)),
}
  • 配置(实际使用): 实际使用中,不必初始化 brokerbackend 。client会自动选择server的broker/backend配置。
var cli *client.Client
if cli, err = g_task.Client(opts...); err != nil {
	fmt.Printf("%v", err)
	return
}
  • 发送任务: 发送任务,需要初始化一个 SendConf 配置和 Signature 参数签名配置。
sendConf := client.NewSendConf("add")
sig := []message.Signature{
		{Type: message.Int, Val: 1},
		{Type: message.Int, Val: 2},
}
if taskID, err = cli.Send(sendConf, sig...); err != nil {
    return
}
  • 支持的异步函数参数类型:
const (
	Bool = iota
	Int
	Int8
	Int16
	Int32
	Int64
	Uint
	Uint8
	Uint16
	Uint32
	Uint64
	Float32
	Float64
	String
)
  • 查看任务执行情况
func TaskStatus(task []string) {
	for _, t := range task {
		code, status := cli.Status(t)
		fmt.Printf("code %d status %s \n", code, status)
	}
}
func TaskResult(task []string) {
	var err error
	for _, t := range task {
		for !cli.IsFinish(t) {
			time.Sleep(1 * time.Second)
		}
		var res *message.MessageResult
		if res, err = cli.GetTaskResult(t); err != nil {
			fmt.Printf("err %v \n", err)
		}
		fmt.Printf("%+v \n", res)
	}
}

TimeoutTask

  • 支持为异步任务设置超时时间,在发送任务的时候,需要配置超时时间。默认情况下,无超时。
  • 任务超时后,可以在 backend 中查看任务状态。
func cfgWithTimeout() *client.sendConf {
	sendConf := client.NewSendConf("add")
	sendConf.SetTimeout(2 * time.Second)

	return sendConf
}

RetryTask

  • 支持为超时事件,设置重试,并设置重试次数。注意: 同时设置了Timeout 和 Retrynum ,才能触发RetryTask。
//超时重试
func cfgWithRetryNum() *client.sendConf {
	sendConf := client.NewSendConf("add")
	sendConf.SetTimeout(2 * time.Second)
	sendConf.SetRetryNum(2)

	return sendConf
}

Broker

  • 使用 Broker 与任务队列通信,目前支持的任务队列只有 redis
//初始化
brokerCfg := broker.NewRedisConf(url, password, db)
brokerCfg.SetPoolSize(poolSize)
brokerCfg.SetExpireTime(1 * time.Hour)//默认2H

  • 通过实现以下接口,可以自定义broker
type Broker interface {
	Clone() Broker
	Activate() error
	Push(taskID string, msg *message.Message) (err error)
	Pop() (taskID string, msg *message.Message, err error)
	Del(taskID string) (err error)
	Set(taskID string, msg *message.Message) (err error)
	Get(taskID string) (msg *message.Message, err error)
}

Backend

  • 使用 Backend 存储任务结果,目前支持的Backend只有 redis
//初始化
//backend
backendCfg := backend.NewRedisConf(url, password, db)
backendCfg.SetPoolSize(poolSize)
backendCfg.SetExpireTime(1 * time.Hour) //消息存放时间,默认2H
  • 通过实现以下接口,可以自定义backend
type Backend interface {
	SetResult(taskID string, msg *message.MessageResult) error
	GetResult(taskID string) (string, error)
	Activate() error
	Clone() Backend
}

任务状态标识

  • SUCCES int64 = 0 //成功
  • FAILURE int64 = -1 //失败
  • PENDING int64 = 1 //排队中/挂起
  • RETRY int64 = 2 //重试
  • STARTED int64 = 3 //任务开始执行

后续计划

  • CallBack
  • 任务参数校验

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Client

func Client(opts ...client.ClientOpt) (cli *client.Client, err error)

func Server

func Server(opts ...server.WorkerOpt) (svr *server.Server)

Types

This section is empty.

Directories

Path Synopsis
example
client command
server command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL