queue

package
v0.0.0-...-d68a214 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskTypePipeline = "pipeline" // 流水线任务
	TaskTypeJob      = "job"      // 作业任务
	TaskTypeStep     = "step"     // 步骤任务
	TaskTypeCustom   = "custom"   // 自定义任务
)

任务类型常量

View Source
const (
	Critical = "critical" // 关键队列(优先级最高)
	Default  = "default"  // 默认队列
	Low      = "low"      // 低优先级队列
)

队列名称常量

View Source
const (
	TaskRecordStatusPending   = "pending"   // 等待中
	TaskRecordStatusRunning   = "running"   // 执行中
	TaskRecordStatusCompleted = "completed" // 已完成
	TaskRecordStatusFailed    = "failed"    // 失败
)

任务记录状态常量

Variables

AgentProviderSet 提供 queue 相关的依赖(Agent 使用)

ProviderSet 提供 queue 相关的依赖(主程序使用)

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 队列客户端(Agent 使用) 负责执行任务,不发布任务

func NewQueueClient

func NewQueueClient(cfg *Config) (*Client, error)

NewQueueClient 创建队列客户端(Agent 使用)

func ProvideQueueClient

func ProvideQueueClient(taskQueueConfig *Config) (*Client, error)

ProvideQueueClient 提供队列客户端(Agent 使用)

func (*Client) GetRedisConnOpt

func (c *Client) GetRedisConnOpt() asynq.RedisConnOpt

GetRedisConnOpt 获取 Redis 连接选项(用于创建 Inspector)

func (*Client) GetServer

func (c *Client) GetServer() *asynq.Server

GetServer 获取 asynq 服务器

func (*Client) RegisterHandler

func (c *Client) RegisterHandler(taskType string, handler TaskHandler)

RegisterHandler 注册任务处理器

func (*Client) RegisterHandlerFunc

func (c *Client) RegisterHandlerFunc(taskType string, handlerFunc TaskHandlerFunc)

RegisterHandlerFunc 注册任务处理器函数

func (*Client) Run

func (c *Client) Run() error

Run 启动任务队列客户端并阻塞等待信号 此方法会阻塞直到收到退出信号,然后自动关闭服务器

func (*Client) Shutdown

func (c *Client) Shutdown()

Shutdown 关闭任务队列客户端

func (*Client) Start

func (c *Client) Start() error

Start 启动任务队列客户端 注意:Start() 方法会立即返回,不会阻塞。如果需要阻塞等待,请使用 Run() 方法。

type Config

type Config struct {
	RedisClient      redis.UniversalClient // Redis 客户端(复用已有的客户端)
	ClickHouse       *gorm.DB              // ClickHouse GORM 实例(用于任务记录)
	Concurrency      int                   // 并发处理数
	StrictPriority   bool                  // 是否严格优先级
	Queues           map[string]int        // 队列配置:队列名 -> 优先级权重
	DefaultQueue     string                // 默认队列名称
	LogLevel         string                // 日志级别: debug, info, warn, error
	ShutdownTimeout  int                   // 关闭超时时间(秒)
	GroupGracePeriod int                   // 组优雅关闭周期(秒)
	GroupMaxDelay    int                   // 组最大延迟(秒)
	GroupMaxSize     int                   // 组最大大小
}

Config queue 配置

func ProvideAgentConfig

func ProvideAgentConfig(agentConf *agentconfig.AgentConfig, cmdable redis.Cmdable) (*Config, error)

ProvideAgentConfig 提供 queue 配置(Agent 使用)

func ProvideConfig

func ProvideConfig(appConf *config.AppConfig, clickHouse *gorm.DB, cmdable redis.Cmdable) (*Config, error)

ProvideConfig 提供 queue 配置(主程序使用)

type DefaultTaskHandler

type DefaultTaskHandler struct {
	// contains filtered or unexported fields
}

DefaultTaskHandler 默认任务处理器(带状态更新和执行器)

func NewDefaultTaskHandler

func NewDefaultTaskHandler(statusUpdater TaskStatusUpdater, executor TaskExecutor) *DefaultTaskHandler

func (*DefaultTaskHandler) HandleTask

func (h *DefaultTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error

type JobTaskHandler

type JobTaskHandler struct {
}

JobTaskHandler 作业任务处理器

func NewJobTaskHandler

func NewJobTaskHandler() *JobTaskHandler

func ProvideJobTaskHandler

func ProvideJobTaskHandler() *JobTaskHandler

ProvideJobTaskHandler 提供作业任务处理器

func (*JobTaskHandler) HandleTask

func (h *JobTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error

type PipelineTaskHandler

type PipelineTaskHandler struct {
}

PipelineTaskHandler 流水线任务处理器

func NewPipelineTaskHandler

func NewPipelineTaskHandler() *PipelineTaskHandler

func ProvidePipelineTaskHandler

func ProvidePipelineTaskHandler() *PipelineTaskHandler

ProvidePipelineTaskHandler 提供流水线任务处理器

func (*PipelineTaskHandler) HandleTask

func (h *PipelineTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server 队列服务器(主程序使用) 负责任务发布和调度,不执行任务

func NewQueueServer

func NewQueueServer(cfg *Config) (*Server, error)

NewQueueServer 创建队列服务器(主程序使用)

func ProvideQueueServer

func ProvideQueueServer(taskQueueConfig *Config) (*Server, error)

ProvideQueueServer 提供队列服务器(主程序使用)

func (*Server) Enqueue

func (s *Server) Enqueue(payload *TaskPayload, queueName string) error

Enqueue 入队任务

func (*Server) EnqueueDelayed

func (s *Server) EnqueueDelayed(payload *TaskPayload, delay time.Duration, queueName string) error

EnqueueDelayed 延迟入队任务

func (*Server) EnqueueWithPriority

func (s *Server) EnqueueWithPriority(payload *TaskPayload, priorityWeight int) error

EnqueueWithPriority 根据权重入队任务 priorityWeight: 任务权重,根据配置的队列权重自动选择最合适的队列 选择策略:找到权重 >= priorityWeight 的最小队列(向上匹配),如果没有则使用权重最高的队列

func (*Server) GetClient

func (s *Server) GetClient() *asynq.Client

GetClient 获取 asynq 客户端

func (*Server) GetRedisConnOpt

func (s *Server) GetRedisConnOpt() asynq.RedisConnOpt

GetRedisConnOpt 获取 Redis 连接选项(用于创建 Inspector)

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown 关闭队列服务器

type StepRunRecordManager

type StepRunRecordManager struct {
	// contains filtered or unexported fields
}

StepRunRecordManager 步骤执行记录管理器,负责将步骤执行状态写入 ClickHouse

func NewStepRunRecordManager

func NewStepRunRecordManager(clickHouse *gorm.DB) (*StepRunRecordManager, error)

NewStepRunRecordManager 创建步骤执行记录管理器

func (*StepRunRecordManager) RecordStepRunCompleted

func (m *StepRunRecordManager) RecordStepRunCompleted(payload *TaskPayload)

RecordStepRunCompleted 记录步骤执行完成

func (*StepRunRecordManager) RecordStepRunEnqueued

func (m *StepRunRecordManager) RecordStepRunEnqueued(payload *TaskPayload, queueName string)

RecordStepRunEnqueued 记录步骤执行入队

func (*StepRunRecordManager) RecordStepRunFailed

func (m *StepRunRecordManager) RecordStepRunFailed(payload *TaskPayload, err error)

RecordStepRunFailed 记录步骤执行失败

func (*StepRunRecordManager) RecordStepRunStarted

func (m *StepRunRecordManager) RecordStepRunStarted(payload *TaskPayload)

RecordStepRunStarted 记录步骤执行开始

type StepTaskHandler

type StepTaskHandler struct {
}

StepTaskHandler 步骤任务处理器

func NewStepTaskHandler

func NewStepTaskHandler() *StepTaskHandler

func ProvideStepTaskHandler

func ProvideStepTaskHandler() *StepTaskHandler

ProvideStepTaskHandler 提供步骤任务处理器

func (*StepTaskHandler) HandleTask

func (h *StepTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error

type TaskExecutor

type TaskExecutor interface {
	ExecuteTask(ctx context.Context, payload *TaskPayload) error
}

TaskExecutor 任务执行器接口

type TaskHandler

type TaskHandler interface {
	HandleTask(ctx context.Context, payload *TaskPayload) error
}

TaskHandler 任务处理器接口

type TaskHandlerFunc

type TaskHandlerFunc func(ctx context.Context, payload *TaskPayload) error

TaskHandlerFunc 任务处理器函数类型

func (TaskHandlerFunc) HandleTask

func (f TaskHandlerFunc) HandleTask(ctx context.Context, payload *TaskPayload) error

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager 任务管理器

func NewTaskManager

func NewTaskManager(server *Server) *TaskManager

NewTaskManager 创建任务管理器

func (*TaskManager) CancelTask

func (m *TaskManager) CancelTask(ctx context.Context, taskID string) error

CancelTask 取消任务

func (*TaskManager) EnqueueDelayedTask

func (m *TaskManager) EnqueueDelayedTask(ctx context.Context, payload *TaskPayload, delay time.Duration) error

EnqueueDelayedTask 入队延迟任务

func (*TaskManager) EnqueueTask

func (m *TaskManager) EnqueueTask(ctx context.Context, payload *TaskPayload, priorityWeight int) error

EnqueueTask 根据权重入队任务 priorityWeight: 任务权重,根据配置的队列权重自动选择最合适的队列

func (*TaskManager) GetQueueStats

func (m *TaskManager) GetQueueStats(ctx context.Context) (map[string]interface{}, error)

GetQueueStats 获取队列统计信息

func (*TaskManager) GetTaskStatus

func (m *TaskManager) GetTaskStatus(ctx context.Context, taskID string) (string, error)

GetTaskStatus 获取任务状态

type TaskPayload

type TaskPayload struct {
	TaskID        string            `json:"task_id"`
	TaskType      string            `json:"task_type"`
	Priority      int               `json:"priority"`
	PipelineID    string            `json:"pipeline_id"`
	PipelineRunID string            `json:"pipeline_run_id"`
	StageID       string            `json:"stage_id"`
	Stage         int               `json:"stage"`
	AgentID       string            `json:"agent_id"`
	Name          string            `json:"name"`
	Commands      []string          `json:"commands"`
	Env           map[string]string `json:"env"`
	Workspace     string            `json:"workspace"`
	Timeout       int               `json:"timeout"`
	RetryCount    int               `json:"retry_count"`
	LabelSelector map[string]any    `json:"label_selector"`
	Data          map[string]any    `json:"data"` // 扩展数据
}

TaskPayload 任务负载

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue 基于 asynq 的分布式任务队列

func NewTaskQueue

func NewTaskQueue(cfg *Config) (*TaskQueue, error)

NewTaskQueue 创建任务队列

func (*TaskQueue) Enqueue

func (q *TaskQueue) Enqueue(payload *TaskPayload, queueName string) error

Enqueue 入队任务

func (*TaskQueue) EnqueueDelayed

func (q *TaskQueue) EnqueueDelayed(payload *TaskPayload, delay time.Duration, queueName string) error

EnqueueDelayed 延迟入队任务

func (*TaskQueue) EnqueueWithPriority

func (q *TaskQueue) EnqueueWithPriority(payload *TaskPayload, priorityWeight int) error

EnqueueWithPriority 按优先级入队任务 EnqueueWithPriority 根据权重入队任务 priorityWeight: 任务权重,根据配置的队列权重自动选择最合适的队列 选择策略:找到权重 >= priorityWeight 的最小队列(向上匹配),如果没有则使用权重最高的队列

func (*TaskQueue) GetClient

func (q *TaskQueue) GetClient() *asynq.Client

GetClient 获取 asynq 客户端

func (*TaskQueue) GetRedisConnOpt

func (q *TaskQueue) GetRedisConnOpt() asynq.RedisConnOpt

GetRedisConnOpt 获取 Redis 连接选项(用于创建 Inspector)

func (*TaskQueue) GetServer

func (q *TaskQueue) GetServer() *asynq.Server

GetServer 获取 asynq 服务器

func (*TaskQueue) RegisterHandler

func (q *TaskQueue) RegisterHandler(taskType string, handler TaskHandler)

RegisterHandler 注册任务处理器

func (*TaskQueue) RegisterHandlerFunc

func (q *TaskQueue) RegisterHandlerFunc(taskType string, handlerFunc TaskHandlerFunc)

RegisterHandlerFunc 注册任务处理器函数

func (*TaskQueue) Run

func (q *TaskQueue) Run() error

Run 启动任务队列服务器并阻塞等待信号 此方法会阻塞直到收到退出信号,然后自动关闭服务器

func (*TaskQueue) Shutdown

func (q *TaskQueue) Shutdown()

Shutdown 关闭任务队列服务器

func (*TaskQueue) Start

func (q *TaskQueue) Start() error

Start 启动任务队列服务器 注意:Start() 方法会立即返回,不会阻塞。如果需要阻塞等待,请使用 Run() 方法。

type TaskStatusUpdater

type TaskStatusUpdater interface {
	UpdateTaskStatus(ctx context.Context, taskID string, status int, result map[string]interface{}) error
}

TaskStatusUpdater 任务状态更新器接口

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL