Documentation
¶
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) GetRedisConnOpt() asynq.RedisConnOpt
- func (c *Client) GetServer() *asynq.Server
- func (c *Client) RegisterHandler(taskType string, handler TaskHandler)
- func (c *Client) RegisterHandlerFunc(taskType string, handlerFunc TaskHandlerFunc)
- func (c *Client) Run() error
- func (c *Client) Shutdown()
- func (c *Client) Start() error
- type Config
- type DefaultTaskHandler
- type JobTaskHandler
- type PipelineTaskHandler
- type Server
- func (s *Server) Enqueue(payload *TaskPayload, queueName string) error
- func (s *Server) EnqueueDelayed(payload *TaskPayload, delay time.Duration, queueName string) error
- func (s *Server) EnqueueWithPriority(payload *TaskPayload, priorityWeight int) error
- func (s *Server) GetClient() *asynq.Client
- func (s *Server) GetRedisConnOpt() asynq.RedisConnOpt
- func (s *Server) Shutdown()
- type StepRunRecordManager
- func (m *StepRunRecordManager) RecordStepRunCompleted(payload *TaskPayload)
- func (m *StepRunRecordManager) RecordStepRunEnqueued(payload *TaskPayload, queueName string)
- func (m *StepRunRecordManager) RecordStepRunFailed(payload *TaskPayload, err error)
- func (m *StepRunRecordManager) RecordStepRunStarted(payload *TaskPayload)
- type StepTaskHandler
- type TaskExecutor
- type TaskHandler
- type TaskHandlerFunc
- type TaskManager
- func (m *TaskManager) CancelTask(ctx context.Context, taskID string) error
- func (m *TaskManager) EnqueueDelayedTask(ctx context.Context, payload *TaskPayload, delay time.Duration) error
- func (m *TaskManager) EnqueueTask(ctx context.Context, payload *TaskPayload, priorityWeight int) error
- func (m *TaskManager) GetQueueStats(ctx context.Context) (map[string]interface{}, error)
- func (m *TaskManager) GetTaskStatus(ctx context.Context, taskID string) (string, error)
- type TaskPayload
- type TaskQueue
- func (q *TaskQueue) Enqueue(payload *TaskPayload, queueName string) error
- func (q *TaskQueue) EnqueueDelayed(payload *TaskPayload, delay time.Duration, queueName string) error
- func (q *TaskQueue) EnqueueWithPriority(payload *TaskPayload, priorityWeight int) error
- func (q *TaskQueue) GetClient() *asynq.Client
- func (q *TaskQueue) GetRedisConnOpt() asynq.RedisConnOpt
- func (q *TaskQueue) GetServer() *asynq.Server
- func (q *TaskQueue) RegisterHandler(taskType string, handler TaskHandler)
- func (q *TaskQueue) RegisterHandlerFunc(taskType string, handlerFunc TaskHandlerFunc)
- func (q *TaskQueue) Run() error
- func (q *TaskQueue) Shutdown()
- func (q *TaskQueue) Start() error
- type TaskStatusUpdater
Constants ¶
const (
	TaskTypePipeline = "pipeline" // 流水线任务
	TaskTypeJob      = "job"      // 作业任务
	TaskTypeStep     = "step"     // 步骤任务
	TaskTypeCustom   = "custom"   // 自定义任务
)
任务类型常量
const (
	Critical = "critical" // 关键队列(优先级最高)
	Default  = "default"  // 默认队列
	Low      = "low"      // 低优先级队列
)
队列名称常量
const (
	TaskRecordStatusPending   = "pending"   // 等待中
	TaskRecordStatusRunning   = "running"   // 执行中
	TaskRecordStatusCompleted = "completed" // 已完成
	TaskRecordStatusFailed    = "failed"    // 失败
)
任务记录状态常量
Variables ¶
var AgentProviderSet = wire.NewSet( ProvideAgentConfig, ProvideQueueClient, ProvidePipelineTaskHandler, ProvideJobTaskHandler, ProvideStepTaskHandler, )
AgentProviderSet 提供 queue 相关的依赖(Agent 使用)
var ProviderSet = wire.NewSet( ProvideConfig, ProvideQueueServer, )
ProviderSet 提供 queue 相关的依赖(主程序使用)
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client 队列客户端(Agent 使用)。负责执行任务,不发布任务。
func NewQueueClient ¶
NewQueueClient 创建队列客户端(Agent 使用)
func ProvideQueueClient ¶
ProvideQueueClient 提供队列客户端(Agent 使用)
func (*Client) GetRedisConnOpt ¶
func (c *Client) GetRedisConnOpt() asynq.RedisConnOpt
GetRedisConnOpt 获取 Redis 连接选项(用于创建 Inspector)
func (*Client) RegisterHandler ¶
func (c *Client) RegisterHandler(taskType string, handler TaskHandler)
RegisterHandler 注册任务处理器
func (*Client) RegisterHandlerFunc ¶
func (c *Client) RegisterHandlerFunc(taskType string, handlerFunc TaskHandlerFunc)
RegisterHandlerFunc 注册任务处理器函数
type Config ¶
type Config struct {
RedisClient redis.UniversalClient // Redis 客户端(复用已有的客户端)
ClickHouse *gorm.DB // ClickHouse GORM 实例(用于任务记录)
Concurrency int // 并发处理数
StrictPriority bool // 是否严格优先级
Queues map[string]int // 队列配置:队列名 -> 优先级权重
DefaultQueue string // 默认队列名称
LogLevel string // 日志级别: debug, info, warn, error
ShutdownTimeout int // 关闭超时时间(秒)
GroupGracePeriod int // 组聚合宽限期(秒):聚合一组任务前等待新任务加入的时间
GroupMaxDelay int // 组最大延迟(秒)
GroupMaxSize int // 组最大大小
}
Config queue 配置
func ProvideAgentConfig ¶
func ProvideAgentConfig(agentConf *agentconfig.AgentConfig, cmdable redis.Cmdable) (*Config, error)
ProvideAgentConfig 提供 queue 配置(Agent 使用)
type DefaultTaskHandler ¶
type DefaultTaskHandler struct {
// contains filtered or unexported fields
}
DefaultTaskHandler 默认任务处理器(带状态更新和执行器)
func NewDefaultTaskHandler ¶
func NewDefaultTaskHandler(statusUpdater TaskStatusUpdater, executor TaskExecutor) *DefaultTaskHandler
func (*DefaultTaskHandler) HandleTask ¶
func (h *DefaultTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error
type JobTaskHandler ¶
type JobTaskHandler struct {
}
JobTaskHandler 作业任务处理器
func NewJobTaskHandler ¶
func NewJobTaskHandler() *JobTaskHandler
func ProvideJobTaskHandler ¶
func ProvideJobTaskHandler() *JobTaskHandler
ProvideJobTaskHandler 提供作业任务处理器
func (*JobTaskHandler) HandleTask ¶
func (h *JobTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error
type PipelineTaskHandler ¶
type PipelineTaskHandler struct {
}
PipelineTaskHandler 流水线任务处理器
func NewPipelineTaskHandler ¶
func NewPipelineTaskHandler() *PipelineTaskHandler
func ProvidePipelineTaskHandler ¶
func ProvidePipelineTaskHandler() *PipelineTaskHandler
ProvidePipelineTaskHandler 提供流水线任务处理器
func (*PipelineTaskHandler) HandleTask ¶
func (h *PipelineTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server 队列服务器(主程序使用)。负责任务发布和调度,不执行任务。
func NewQueueServer ¶
NewQueueServer 创建队列服务器(主程序使用)
func ProvideQueueServer ¶
ProvideQueueServer 提供队列服务器(主程序使用)
func (*Server) Enqueue ¶
func (s *Server) Enqueue(payload *TaskPayload, queueName string) error
Enqueue 入队任务
func (*Server) EnqueueDelayed ¶
func (s *Server) EnqueueDelayed(payload *TaskPayload, delay time.Duration, queueName string) error
EnqueueDelayed 延迟入队任务
func (*Server) EnqueueWithPriority ¶
func (s *Server) EnqueueWithPriority(payload *TaskPayload, priorityWeight int) error
EnqueueWithPriority 根据权重入队任务。priorityWeight: 任务权重,根据配置的队列权重自动选择最合适的队列。选择策略:在权重 >= priorityWeight 的队列中选择权重最小的一个(向上匹配);如果没有这样的队列,则使用权重最高的队列。
func (*Server) GetRedisConnOpt ¶
func (s *Server) GetRedisConnOpt() asynq.RedisConnOpt
GetRedisConnOpt 获取 Redis 连接选项(用于创建 Inspector)
type StepRunRecordManager ¶
type StepRunRecordManager struct {
// contains filtered or unexported fields
}
StepRunRecordManager 步骤执行记录管理器,负责将步骤执行状态写入 ClickHouse
func NewStepRunRecordManager ¶
func NewStepRunRecordManager(clickHouse *gorm.DB) (*StepRunRecordManager, error)
NewStepRunRecordManager 创建步骤执行记录管理器
func (*StepRunRecordManager) RecordStepRunCompleted ¶
func (m *StepRunRecordManager) RecordStepRunCompleted(payload *TaskPayload)
RecordStepRunCompleted 记录步骤执行完成
func (*StepRunRecordManager) RecordStepRunEnqueued ¶
func (m *StepRunRecordManager) RecordStepRunEnqueued(payload *TaskPayload, queueName string)
RecordStepRunEnqueued 记录步骤执行入队
func (*StepRunRecordManager) RecordStepRunFailed ¶
func (m *StepRunRecordManager) RecordStepRunFailed(payload *TaskPayload, err error)
RecordStepRunFailed 记录步骤执行失败
func (*StepRunRecordManager) RecordStepRunStarted ¶
func (m *StepRunRecordManager) RecordStepRunStarted(payload *TaskPayload)
RecordStepRunStarted 记录步骤执行开始
type StepTaskHandler ¶
type StepTaskHandler struct {
}
StepTaskHandler 步骤任务处理器
func NewStepTaskHandler ¶
func NewStepTaskHandler() *StepTaskHandler
func ProvideStepTaskHandler ¶
func ProvideStepTaskHandler() *StepTaskHandler
ProvideStepTaskHandler 提供步骤任务处理器
func (*StepTaskHandler) HandleTask ¶
func (h *StepTaskHandler) HandleTask(ctx context.Context, payload *TaskPayload) error
type TaskExecutor ¶
type TaskExecutor interface {
ExecuteTask(ctx context.Context, payload *TaskPayload) error
}
TaskExecutor 任务执行器接口
type TaskHandler ¶
type TaskHandler interface {
HandleTask(ctx context.Context, payload *TaskPayload) error
}
TaskHandler 任务处理器接口
type TaskHandlerFunc ¶
type TaskHandlerFunc func(ctx context.Context, payload *TaskPayload) error
TaskHandlerFunc 任务处理器函数类型
func (TaskHandlerFunc) HandleTask ¶
func (f TaskHandlerFunc) HandleTask(ctx context.Context, payload *TaskPayload) error
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager 任务管理器
func (*TaskManager) CancelTask ¶
func (m *TaskManager) CancelTask(ctx context.Context, taskID string) error
CancelTask 取消任务
func (*TaskManager) EnqueueDelayedTask ¶
func (m *TaskManager) EnqueueDelayedTask(ctx context.Context, payload *TaskPayload, delay time.Duration) error
EnqueueDelayedTask 入队延迟任务
func (*TaskManager) EnqueueTask ¶
func (m *TaskManager) EnqueueTask(ctx context.Context, payload *TaskPayload, priorityWeight int) error
EnqueueTask 根据权重入队任务。priorityWeight: 任务权重,根据配置的队列权重自动选择最合适的队列。
func (*TaskManager) GetQueueStats ¶
func (m *TaskManager) GetQueueStats(ctx context.Context) (map[string]interface{}, error)
GetQueueStats 获取队列统计信息
func (*TaskManager) GetTaskStatus ¶
func (m *TaskManager) GetTaskStatus(ctx context.Context, taskID string) (string, error)
GetTaskStatus 获取任务状态
type TaskPayload ¶
type TaskPayload struct {
TaskID string `json:"task_id"`
TaskType string `json:"task_type"`
Priority int `json:"priority"`
PipelineID string `json:"pipeline_id"`
PipelineRunID string `json:"pipeline_run_id"`
StageID string `json:"stage_id"`
Stage int `json:"stage"`
AgentID string `json:"agent_id"`
Name string `json:"name"`
Commands []string `json:"commands"`
Env map[string]string `json:"env"`
Workspace string `json:"workspace"`
Timeout int `json:"timeout"`
RetryCount int `json:"retry_count"`
LabelSelector map[string]any `json:"label_selector"`
Data map[string]any `json:"data"` // 扩展数据
}
TaskPayload 任务负载
type TaskQueue ¶
type TaskQueue struct {
// contains filtered or unexported fields
}
TaskQueue 基于 asynq 的分布式任务队列
func (*TaskQueue) Enqueue ¶
func (q *TaskQueue) Enqueue(payload *TaskPayload, queueName string) error
Enqueue 入队任务
func (*TaskQueue) EnqueueDelayed ¶
func (q *TaskQueue) EnqueueDelayed(payload *TaskPayload, delay time.Duration, queueName string) error
EnqueueDelayed 延迟入队任务
func (*TaskQueue) EnqueueWithPriority ¶
func (q *TaskQueue) EnqueueWithPriority(payload *TaskPayload, priorityWeight int) error
EnqueueWithPriority 按优先级(权重)入队任务。priorityWeight: 任务权重,根据配置的队列权重自动选择最合适的队列。选择策略:在权重 >= priorityWeight 的队列中选择权重最小的一个(向上匹配);如果没有这样的队列,则使用权重最高的队列。
func (*TaskQueue) GetRedisConnOpt ¶
func (q *TaskQueue) GetRedisConnOpt() asynq.RedisConnOpt
GetRedisConnOpt 获取 Redis 连接选项(用于创建 Inspector)
func (*TaskQueue) RegisterHandler ¶
func (q *TaskQueue) RegisterHandler(taskType string, handler TaskHandler)
RegisterHandler 注册任务处理器
func (*TaskQueue) RegisterHandlerFunc ¶
func (q *TaskQueue) RegisterHandlerFunc(taskType string, handlerFunc TaskHandlerFunc)
RegisterHandlerFunc 注册任务处理器函数