task

package
v0.0.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 7, 2025 License: MIT Imports: 12 Imported by: 6

README

异步任务管理模块

Documentation

Index

Constants

View Source
const (
	APP_NAME = "tasks"
)
View Source
const (
	EVENT_LABLE_TASK_ID = "task_id"
)
View Source
const (
	PRIORITY = 699
)
View Source
const (
	RUN_PARAM_TYPE_JSON = "json"
)

Variables

View Source
var (
	STATUS_MAP = map[STATUS]string{
		STATUS_PENDDING: "PENDDING",
		STATUS_QUEUED:   "QUEUED",
		STATUS_RUNNING:  "RUNNING",
		STATUS_SUCCESS:  "SUCCESS",
		STATUS_FAILED:   "FAILED",
		STATUS_CANCELED: "CANCELED",
	}
	STATUS_COMPLETE = []STATUS{
		STATUS_SUCCESS,
		STATUS_FAILED,
		STATUS_CANCELED,
	}
)
View Source
var (
	DEFAULT_TIMEOUT = time.Second * 30
)

Functions

func ListAsyncRunner added in v0.0.21

func ListAsyncRunner() (ns []string)

func ListSyncRunner added in v0.0.21

func ListSyncRunner() (ns []string)

func NewDebugEvent added in v0.0.22

func NewDebugEvent(taskId string, format string, a ...any) *event.EventSpec

func NewErrorEvent

func NewErrorEvent(taskId string, format string, a ...any) *event.EventSpec

func NewInfoEvent

func NewInfoEvent(taskId string, format string, a ...any) *event.EventSpec

func NewWarnEvent added in v0.0.22

func NewWarnEvent(taskId string, format string, a ...any) *event.EventSpec

func RegistryAsyncRunner added in v0.0.21

func RegistryAsyncRunner(name string, runner AsyncRunner)

func RegistrySyncRunner added in v0.0.21

func RegistrySyncRunner(name string, runner SyncRunner)

func StatusCompleteString

func StatusCompleteString() []string

Types

type AsyncRunner added in v0.0.21

type AsyncRunner interface {
	// 触发异步执行
	Start(ctx context.Context, task *Task) error
	// 获取任务状态
	GetStatus(ctx context.Context, taskID string) (*TaskStatus, error)
	// 取消任务
	Cancel(ctx context.Context, taskID string) error
}

func GetAsyncRunner added in v0.0.21

func GetAsyncRunner(name string) AsyncRunner

type CancelRequest added in v0.0.21

type CancelRequest struct {
	DescribeTaskRequest
}

type DescribeTaskRequest

type DescribeTaskRequest struct {
	TaskId string `json:"task_id"`
}

func NewDescribeTaskRequest

func NewDescribeTaskRequest(taskId string) *DescribeTaskRequest

type QUEUE_EVENT_TYPE added in v0.0.21

type QUEUE_EVENT_TYPE string
const (
	// 任务运行事件
	QUEUE_EVENT_TYPE_RUN QUEUE_EVENT_TYPE = "run"
	// 任务取消事件
	QUEUE_EVENT_TYPE_CANCEL QUEUE_EVENT_TYPE = "cancel"
)

type QueryTaskRequest

type QueryTaskRequest struct {
	request.PageRequest

	Label map[string]string `json:"label"`
}

func NewQueryTaskRequest added in v0.0.14

func NewQueryTaskRequest() *QueryTaskRequest

func (*QueryTaskRequest) SetLabel added in v0.0.21

func (r *QueryTaskRequest) SetLabel(key, value string) *QueryTaskRequest

type QueueEvent added in v0.0.21

type QueueEvent struct {
	// 事件类型
	Type QUEUE_EVENT_TYPE `json:"type"`
	// 事件值
	TaskId string `json:"task_id"`
}

func NewQueueEvent added in v0.0.21

func NewQueueEvent() *QueueEvent

func (*QueueEvent) Load added in v0.0.21

func (e *QueueEvent) Load(v []byte) error

func (*QueueEvent) String added in v0.0.21

func (e *QueueEvent) String() string

type RUN_PARAM_TYPE added in v0.0.21

type RUN_PARAM_TYPE string

type RunParam added in v0.0.21

type RunParam struct {
	// 参数类型
	Type RUN_PARAM_TYPE `json:"type" gorm:"column:run_param_type;type:varchar(60)" description:"参数类型"`
	// 参数值
	Value string `json:"value" gorm:"column:run_param_value;type:text" description:"参数值"`
}

func NewJsonRunParam added in v0.0.21

func NewJsonRunParam(jsonStr string) *RunParam

func (*RunParam) Load added in v0.0.21

func (r *RunParam) Load(v any) error

type STATUS

type STATUS int
const (
	// 任务处于挂起, 等待排队
	STATUS_PENDDING STATUS = iota
	// 队列中
	STATUS_QUEUED
	// 任务正在运行
	STATUS_RUNNING
	// 取消中
	STATUS_CANCELING
	// 任务已完成
	STATUS_SUCCESS
	// 任务失败
	STATUS_FAILED
	// 任务已取消
	STATUS_CANCELED
	// 忽略执行
	STATUS_SKIPPED
)

func (STATUS) String

func (s STATUS) String() string

type Service

type Service interface {
	// 创建任务
	CreateTask(context.Context, *TaskSpec) (*Task, error)
	// 任务执行
	Run(context.Context, *TaskSpec) (*Task, error)
	// 任务取消
	Cancel(context.Context, *CancelRequest) (*Task, error)
	// 查询任务列表
	QueryTask(context.Context, *QueryTaskRequest) (*types.Set[*Task], error)
	// 查询任务详情
	DescribeTask(context.Context, *DescribeTaskRequest) (*Task, error)
}

func GetService

func GetService() Service

type SyncRunner added in v0.0.21

type SyncRunner interface {
	// 同步执行,通过ctx控制取消
	Run(ctx context.Context, task *Task) error
}

func GetSyncRunner added in v0.0.21

func GetSyncRunner(name string) SyncRunner

type Task

type Task struct {
	// 任务Id
	Id string `json:"id" gorm:"column:id;type:string;primary_key;" unique:"true" description:"Id"`
	// 创建时间
	CreatedAt time.Time `` /* 126-byte string literal not displayed */
	// 任务定义
	TaskSpec
	// 任务状态
	TaskStatus

	// 执行过程中的事件, 执行日志
	Events []*event.Event `json:"events" gorm:"-" description:"执行过程中的事件"`
}

func NewTask

func NewTask(spec TaskSpec) *Task

func (*Task) BuildTimeoutCtx

func (t *Task) BuildTimeoutCtx() context.Context

注入上下文当中

func (*Task) Cancel added in v0.0.14

func (t *Task) Cancel()

func (*Task) Failed

func (t *Task) Failed(msg string) *Task

func (*Task) IsCompleted added in v0.0.21

func (t *Task) IsCompleted() bool

func (*Task) IsRunning added in v0.0.21

func (t *Task) IsRunning() bool

func (*Task) Running

func (t *Task) Running() *Task

func (*Task) SetMessage added in v0.0.21

func (t *Task) SetMessage(msg string) *Task

func (*Task) String

func (t *Task) String() string

func (*Task) Success

func (t *Task) Success() *Task

func (*Task) TableName

func (t *Task) TableName() string

type TaskFunc

type TaskFunc func(ctx context.Context, req any) error

type TaskSpec

type TaskSpec struct {
	// 任务Id
	TaskId string `json:"task_id" gorm:"-" description:"任务Id, 如果是任务Id,则查询任务执行"`
	// 异步执行时的超时时间
	Timeout string `json:"timeout" gorm:"column:timeout;" description:"异步执行时的超时时间"`
	// 尝试执行,用于做执行前检查
	Async bool `json:"async" gorm:"column:async;type:bool;" description:"是否是异步任务"`
	// 执行器名称
	Runner string `json:"runner" gorm:"column:type;type:varchar(60);" description:"执行器名称"`
	// 执行器参数
	Params *RunParam `json:"params" gorm:"embedded" description:"任务参数"`
	// 任务名称
	Name string `json:"name" gorm:"column:name;type:varchar(200);" description:"任务名称"`
	// 任务名称
	Description string `json:"description" gorm:"column:description;type:text;" description:"任务描述"`
	// 尝试执行,用于做执行前检查
	DryRun bool `json:"dryrun" gorm:"column:dryrun;type:bool;" description:"尝试执行,用于做执行前检查"`
	// 事件所属资源
	Resource string `json:"resource" gorm:"column:resource;type:varchar(120);" description:"事件所属资源"`
	// 任务标签
	Label map[string]string `json:"label" bson:"label" gorm:"column:label;serializer:json;type:json" description:"任务标签" optional:"true"`

	// 任务执行结束回调
	WebHooks []*webhook.WebHook `` /* 138-byte string literal not displayed */
}

func NewTaskSpec added in v0.0.21

func NewTaskSpec(runner string, param *RunParam) *TaskSpec

func (*TaskSpec) AddWebHook

func (t *TaskSpec) AddWebHook(hs ...*webhook.WebHook) *TaskSpec

func (*TaskSpec) SetDescription added in v0.0.14

func (t *TaskSpec) SetDescription(desc string) *TaskSpec

func (*TaskSpec) SetLabel

func (t *TaskSpec) SetLabel(key, value string) *TaskSpec

func (*TaskSpec) SetName added in v0.0.14

func (t *TaskSpec) SetName(name string) *TaskSpec

type TaskStatus

type TaskStatus struct {
	// 开始执行时间
	StartAt *time.Time `json:"start_at" gorm:"column:start_at;type:timestamp;" description:"开始执行时间"`
	// 执行结束的时间
	EndAt *time.Time `json:"end_at" gorm:"column:end_at;type:timestamp;" description:"执行结束的时间"`
	// 任务状态更新时间
	UpdateAt *time.Time `json:"update_at" gorm:"column:update_at;type:timestamp;" description:"任务状态更新时间"`
	// 任务执行状态
	Status STATUS `json:"status" gorm:"column:status;type:tinyint(2);" description:"任务执行状态"`
	// 失败信息
	Message string `json:"message" gorm:"column:message;type:text;" description:"失败信息"`
	// 失败信息
	Detail string `json:"detail" gorm:"column:detail;type:text;" description:"详情内容"`
	// contains filtered or unexported fields
}

func NewTaskStatus

func NewTaskStatus() *TaskStatus

func (*TaskStatus) SetEndAt

func (s *TaskStatus) SetEndAt(t time.Time)

func (*TaskStatus) SetStartAt

func (s *TaskStatus) SetStartAt(t time.Time)

func (*TaskStatus) SetUpdateAt

func (s *TaskStatus) SetUpdateAt(t time.Time)

func (*TaskStatus) TableName added in v0.0.21

func (s *TaskStatus) TableName() string

type UnimplementedAsyncRunner added in v0.0.21

type UnimplementedAsyncRunner struct{}

func (*UnimplementedAsyncRunner) Cancel added in v0.0.21

func (r *UnimplementedAsyncRunner) Cancel(ctx context.Context, taskID string) error

func (*UnimplementedAsyncRunner) GetStatus added in v0.0.21

func (r *UnimplementedAsyncRunner) GetStatus(ctx context.Context, taskID string) (*TaskStatus, error)

func (*UnimplementedAsyncRunner) Start added in v0.0.21

type UnimplementedSyncRunner added in v0.0.21

type UnimplementedSyncRunner struct{}

func (*UnimplementedSyncRunner) Start added in v0.0.21

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL