Documentation
¶
Index ¶
- type AgentServiceImpl
- func (i *AgentServiceImpl) AddConn(conn *agent.Connection)
- func (i *AgentServiceImpl) CheckBalanceBeforeRegister(ctx context.Context, agentId string) (*BalanceDecision, error)
- func (i *AgentServiceImpl) Close(ctx context.Context)
- func (i *AgentServiceImpl) CloseConnection(ctx context.Context, in *agent.CloseConnectionRequest) error
- func (i *AgentServiceImpl) ConnExists(agentId string) bool
- func (i *AgentServiceImpl) CreateAgent(ctx context.Context, in *agent.CreateAgentRequest) (*agent.Agent, error)
- func (i *AgentServiceImpl) DecrementAgentCount(ctx context.Context) error
- func (i *AgentServiceImpl) DelConn(agentId string)
- func (i *AgentServiceImpl) DescribeAgent(ctx context.Context, in *agent.DescribeAgentRequest) (*agent.Agent, error)
- func (i *AgentServiceImpl) GetAndRemoveConn(agentId string) *agent.Connection
- func (i *AgentServiceImpl) GetConnection(ctx context.Context, in *agent.GetConnectionRequest) (*agent.Connection, error)
- func (i *AgentServiceImpl) GetCurrentNodeLoad(ctx context.Context) (*NodeLoadInfo, error)
- func (i *AgentServiceImpl) GetHeartbeatBufferSize() int
- func (i *AgentServiceImpl) HanleTaskResponse(ctx context.Context) error
- func (i *AgentServiceImpl) HeartbeatReport(ctx context.Context, in *agent.HeartbeatReportRequest) (*agent.HeartbeatReportResponse, error)
- func (i *AgentServiceImpl) IncrementAgentCount(ctx context.Context) error
- func (i *AgentServiceImpl) Init() error
- func (i *AgentServiceImpl) Name() string
- func (s *AgentServiceImpl) QueryAgent(ctx context.Context, in *agent.QueryAgentRequest) (*types.Set[*agent.Agent], error)
- func (i *AgentServiceImpl) RegisterAgent(ctx context.Context, in *agent.AgentRegistryRequest) (*agent.Connection, error)
- func (i *AgentServiceImpl) RunTask(ctx context.Context, in *task.Task) error
- func (i *AgentServiceImpl) ScheduledAgent(ctx context.Context, in *task.Task) (*agent.Agent, error)
- func (i *AgentServiceImpl) UpdateAgent(ctx context.Context, in *agent.UpdateAgentRequest) (*agent.Agent, error)
- func (i *AgentServiceImpl) WithConn(agentId string, fn func(conn *agent.Connection) error) error
- type BalanceDecision
- type HeartbeatEvent
- type MetricResponseHandler
- type NodeLoadInfo
- type TaskMessageDispatcher
- type TaskResponseHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentServiceImpl ¶
type AgentServiceImpl struct {
ioc.ObjectImpl
// Agent 任务运行广播主题
TaskRunTopic string `json:"task_run_topic" toml:"task_run_topic" yaml:"task_run_topic"`
// Agent调度确认超时时间,单位秒
ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" toml:"scheduled_confirm_ttl" yaml:"scheduled_confirm_ttl"`
// 负载均衡配置
// Redis Key 前缀 - 存储各节点的 Agent 数量
RedisKeyNodeAgents string `json:"redis_key_node_agents" toml:"redis_key_node_agents" yaml:"redis_key_node_agents"`
// Redis Key 前缀 - 存储活跃节点列表
RedisKeyNodeActive string `json:"redis_key_node_active" toml:"redis_key_node_active" yaml:"redis_key_node_active"`
// Redis Key 前缀 - 节点最后心跳时间
RedisKeyNodeLastHeartbeat string `json:"redis_key_node_last_heartbeat" toml:"redis_key_node_last_heartbeat" yaml:"redis_key_node_last_heartbeat"`
// 允许的不均衡阈值(当前节点 Agent 数量 > 平均 Agent 数量 * 阈值时拒绝注册)
BalanceThreshold float64 `json:"balance_threshold" toml:"balance_threshold" yaml:"balance_threshold"`
// 最小均衡 Agent 数量,低于该数量不进行均衡检查
BalanceMinAgents int `json:"balance_min_agents" toml:"balance_min_agents" yaml:"balance_min_agents"`
// 节点超时时间(秒),超过此时间未心跳的节点视为离线
NodeTimeout int64 `json:"node_timeout" toml:"node_timeout" yaml:"node_timeout"`
// 节点名称
NodeName string `json:"node_name" toml:"node_name" yaml:"node_name"`
// 注册锁超时时间(秒)
RegistryLockTimeoutSeconds int64 `json:"registry_lock_timeout_second" toml:"registry_lock_timeout_second" yaml:"registry_lock_timeout_second"`
// 性能优化:异步批量心跳更新(基于消息总线)
// 心跳事件发布Topic
HeartbeatTopic string `json:"heartbeat_topic" toml:"heartbeat_topic" yaml:"heartbeat_topic"`
// 心跳批量处理间隔(默认1秒)
HeartbeatBatchInterval time.Duration `json:"heartbeat_batch_interval" toml:"heartbeat_batch_interval" yaml:"heartbeat_batch_interval"`
// 心跳批量大小(达到该数量即触发处理)
HeartbeatBatchSize int `json:"heartbeat_batch_size" toml:"heartbeat_batch_size" yaml:"heartbeat_batch_size"`
// contains filtered or unexported fields
}
func (*AgentServiceImpl) AddConn ¶
func (i *AgentServiceImpl) AddConn(conn *agent.Connection)
func (*AgentServiceImpl) CheckBalanceBeforeRegister ¶
func (i *AgentServiceImpl) CheckBalanceBeforeRegister(ctx context.Context, agentId string) (*BalanceDecision, error)
CheckBalanceBeforeRegister 注册前检查负载均衡
func (*AgentServiceImpl) Close ¶
func (i *AgentServiceImpl) Close(ctx context.Context)
func (*AgentServiceImpl) CloseConnection ¶
func (i *AgentServiceImpl) CloseConnection(ctx context.Context, in *agent.CloseConnectionRequest) error
CloseConnection implements agent.Register.
func (*AgentServiceImpl) ConnExists ¶
func (i *AgentServiceImpl) ConnExists(agentId string) bool
检查连接是否存在
func (*AgentServiceImpl) CreateAgent ¶
func (i *AgentServiceImpl) CreateAgent(ctx context.Context, in *agent.CreateAgentRequest) (*agent.Agent, error)
CreateAgent implements agent.Service.
func (*AgentServiceImpl) DecrementAgentCount ¶
func (i *AgentServiceImpl) DecrementAgentCount(ctx context.Context) error
DecrementAgentCount 减少节点的 Agent 计数(断开连接后调用)
func (*AgentServiceImpl) DelConn ¶
func (i *AgentServiceImpl) DelConn(agentId string)
func (*AgentServiceImpl) DescribeAgent ¶
func (i *AgentServiceImpl) DescribeAgent(ctx context.Context, in *agent.DescribeAgentRequest) (*agent.Agent, error)
DescribeAgent implements agent.Service. 使用缓存优化频繁查询,减少数据库压力
func (*AgentServiceImpl) GetAndRemoveConn ¶
func (i *AgentServiceImpl) GetAndRemoveConn(agentId string) *agent.Connection
原子性地获取并删除连接
func (*AgentServiceImpl) GetConnection ¶
func (i *AgentServiceImpl) GetConnection(ctx context.Context, in *agent.GetConnectionRequest) (*agent.Connection, error)
GetConnection implements agent.Register.
func (*AgentServiceImpl) GetCurrentNodeLoad ¶
func (i *AgentServiceImpl) GetCurrentNodeLoad(ctx context.Context) (*NodeLoadInfo, error)
GetCurrentNodeLoad 获取当前节点的负载信息(用于监控)
func (*AgentServiceImpl) GetHeartbeatBufferSize ¶
func (i *AgentServiceImpl) GetHeartbeatBufferSize() int
GetHeartbeatBufferSize 获取当前心跳缓冲区大小(用于监控)
func (*AgentServiceImpl) HanleTaskResponse ¶
func (i *AgentServiceImpl) HanleTaskResponse(ctx context.Context) error
HanleTaskResponse 处理其他节点广播过来的Agent运行任务
func (*AgentServiceImpl) HeartbeatReport ¶
func (i *AgentServiceImpl) HeartbeatReport(ctx context.Context, in *agent.HeartbeatReportRequest) (*agent.HeartbeatReportResponse, error)
心跳上报
func (*AgentServiceImpl) IncrementAgentCount ¶
func (i *AgentServiceImpl) IncrementAgentCount(ctx context.Context) error
IncrementAgentCount 增加节点的 Agent 计数(注册成功后调用)
func (*AgentServiceImpl) Init ¶
func (i *AgentServiceImpl) Init() error
func (*AgentServiceImpl) Name ¶
func (i *AgentServiceImpl) Name() string
func (*AgentServiceImpl) QueryAgent ¶
func (s *AgentServiceImpl) QueryAgent(ctx context.Context, in *agent.QueryAgentRequest) (*types.Set[*agent.Agent], error)
QueryAgent implements agent.Service.
func (*AgentServiceImpl) RegisterAgent ¶
func (i *AgentServiceImpl) RegisterAgent(ctx context.Context, in *agent.AgentRegistryRequest) (*agent.Connection, error)
RegisterAgent implements agent.Service.
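func (*AgentServiceImpl) RunTask ¶
func (i *AgentServiceImpl) RunTask(ctx context.Context, in *task.Task) error
func (*AgentServiceImpl) ScheduledAgent ¶
func (i *AgentServiceImpl) ScheduledAgent(ctx context.Context, in *task.Task) (*agent.Agent, error)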
func (*AgentServiceImpl) UpdateAgent ¶
func (i *AgentServiceImpl) UpdateAgent(ctx context.Context, in *agent.UpdateAgentRequest) (*agent.Agent, error)
UpdateAgent implements agent.Service.
func (*AgentServiceImpl) WithConn ¶
func (i *AgentServiceImpl) WithConn(agentId string, fn func(conn *agent.Connection) error) error
在连接上执行操作(线程安全)
type BalanceDecision ¶
type BalanceDecision struct {
// 是否允许注册
Allowed bool
// 原因说明
Reason string
// 建议注册的节点(如果当前节点不允许)
SuggestedNode string
// 当前节点 Agent 数量
CurrentCount int64
// 平均 Agent 数量
AverageCount float64
}
BalanceDecision 均衡决策结果
type HeartbeatEvent ¶
type HeartbeatEvent struct {
AgentId string `json:"agent_id"`
Timestamp time.Time `json:"timestamp"`
Status *agent.AgentStatus `json:"status"`
}
HeartbeatEvent 心跳事件(用于bus消息总线)
type MetricResponseHandler ¶
type MetricResponseHandler struct {
// contains filtered or unexported fields
}
MetricResponseHandler 指标上报处理器
func NewMetricResponseHandler ¶
func NewMetricResponseHandler(logger *zerolog.Logger) *MetricResponseHandler
NewMetricResponseHandler 创建指标上报处理器
func (*MetricResponseHandler) HandleResponse ¶
func (h *MetricResponseHandler) HandleResponse(ctx context.Context, msg *agent.AgentMessage) error
HandleResponse 处理来自 Agent 的指标上报消息
type NodeLoadInfo ¶
NodeLoadInfo 节点负载信息
type TaskMessageDispatcher ¶
type TaskMessageDispatcher struct {
// contains filtered or unexported fields
}
TaskMessageDispatcher 任务消息分发器
func NewTaskMessageDispatcher ¶
func NewTaskMessageDispatcher(logger *zerolog.Logger) *TaskMessageDispatcher
NewTaskMessageDispatcher 创建任务消息分发器
func (*TaskMessageDispatcher) HandleResponse ¶
func (d *TaskMessageDispatcher) HandleResponse(ctx context.Context, msg *agent.AgentMessage) error
HandleResponse 处理任务消息
func (*TaskMessageDispatcher) Register ¶
func (d *TaskMessageDispatcher) Register(requestId string, handler agent.ResponseHandler)
Register 注册任务响应处理器
func (*TaskMessageDispatcher) Unregister ¶
func (d *TaskMessageDispatcher) Unregister(requestId string)
Unregister 取消注册任务响应处理器
type TaskResponseHandler ¶
type TaskResponseHandler struct {
// contains filtered or unexported fields
}
TaskResponseHandler 任务响应处理器
func NewTaskResponseHandler ¶
func NewTaskResponseHandler(taskId string, logger *zerolog.Logger) *TaskResponseHandler
NewTaskResponseHandler 创建任务响应处理器
func (*TaskResponseHandler) HandleResponse ¶
func (h *TaskResponseHandler) HandleResponse(ctx context.Context, msg *agent.AgentMessage) error
HandleResponse 处理来自 Agent 的响应消息