impl

package
v0.0.10 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentServiceImpl

// AgentServiceImpl carries the exported configuration of the agent service
// implementation. Its methods are documented as implementing agent.Service
// and agent.Register. Every exported field can be set from JSON, TOML or
// YAML through the snake_case key given in its struct tags.
type AgentServiceImpl struct {
	ioc.ObjectImpl

	// TaskRunTopic is the broadcast topic for Agent task runs.
	TaskRunTopic string `json:"task_run_topic" toml:"task_run_topic" yaml:"task_run_topic"`
	// ScheduledConfirmTTL is the Agent scheduling confirmation timeout, in seconds.
	ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" toml:"scheduled_confirm_ttl" yaml:"scheduled_confirm_ttl"`

	// Load-balancing configuration.
	// RedisKeyNodeAgents is the Redis key prefix under which each node's Agent count is stored.
	RedisKeyNodeAgents string `json:"redis_key_node_agents" toml:"redis_key_node_agents" yaml:"redis_key_node_agents"`
	// RedisKeyNodeActive is the Redis key prefix under which the list of active nodes is stored.
	RedisKeyNodeActive string `json:"redis_key_node_active" toml:"redis_key_node_active" yaml:"redis_key_node_active"`
	// RedisKeyNodeLastHeartbeat is the Redis key prefix for a node's last heartbeat time.
	RedisKeyNodeLastHeartbeat string `json:"redis_key_node_last_heartbeat" toml:"redis_key_node_last_heartbeat" yaml:"redis_key_node_last_heartbeat"`
	// BalanceThreshold is the tolerated imbalance factor: registration is rejected
	// when the current node's count exceeds average * threshold.
	BalanceThreshold float64 `json:"balance_threshold" toml:"balance_threshold" yaml:"balance_threshold"`
	// BalanceMinAgents is the minimum count below which no balance check is performed.
	// NOTE(review): the original comment calls this a "node count" while the field
	// name says agents; confirm which quantity is compared.
	BalanceMinAgents int `json:"balance_min_agents" toml:"balance_min_agents" yaml:"balance_min_agents"`
	// NodeTimeout is the node timeout in seconds; a node that has not sent a
	// heartbeat for longer than this is considered offline.
	NodeTimeout int64 `json:"node_timeout" toml:"node_timeout" yaml:"node_timeout"`
	// NodeName is the name of the node.
	NodeName string `json:"node_name" toml:"node_name" yaml:"node_name"`
	// RegistryLockTimeoutSeconds is the registration lock timeout, in seconds.
	// Note that the configuration key is singular ("registry_lock_timeout_second").
	RegistryLockTimeoutSeconds int64 `json:"registry_lock_timeout_second" toml:"registry_lock_timeout_second" yaml:"registry_lock_timeout_second"`

	// Performance optimization: asynchronous, batched heartbeat updates (via the message bus).
	// HeartbeatTopic is the topic on which heartbeat events are published.
	HeartbeatTopic string `json:"heartbeat_topic" toml:"heartbeat_topic" yaml:"heartbeat_topic"`
	// HeartbeatBatchInterval is the interval between heartbeat batch runs (documented default: 1 second).
	HeartbeatBatchInterval time.Duration `json:"heartbeat_batch_interval" toml:"heartbeat_batch_interval" yaml:"heartbeat_batch_interval"`
	// HeartbeatBatchSize is the heartbeat batch size; processing is triggered once this many have accumulated.
	HeartbeatBatchSize int `json:"heartbeat_batch_size" toml:"heartbeat_batch_size" yaml:"heartbeat_batch_size"`
	// contains filtered or unexported fields
}

func (*AgentServiceImpl) AddConn

func (i *AgentServiceImpl) AddConn(conn *agent.Connection)

func (*AgentServiceImpl) CheckBalanceBeforeRegister

func (i *AgentServiceImpl) CheckBalanceBeforeRegister(ctx context.Context, agentId string) (*BalanceDecision, error)

CheckBalanceBeforeRegister 注册前检查负载均衡

func (*AgentServiceImpl) Close

func (i *AgentServiceImpl) Close(ctx context.Context)

func (*AgentServiceImpl) CloseConnection

func (i *AgentServiceImpl) CloseConnection(ctx context.Context, in *agent.CloseConnectionRequest) error

CloseConnection implements agent.Register.

func (*AgentServiceImpl) ConnExists

func (i *AgentServiceImpl) ConnExists(agentId string) bool

检查连接是否存在

func (*AgentServiceImpl) CreateAgent

CreateAgent implements agent.Service.

func (*AgentServiceImpl) DecrementAgentCount

func (i *AgentServiceImpl) DecrementAgentCount(ctx context.Context) error

DecrementAgentCount 减少节点的 Agent 计数(断开连接后调用)

func (*AgentServiceImpl) DelConn

func (i *AgentServiceImpl) DelConn(agentId string)

func (*AgentServiceImpl) DescribeAgent

func (i *AgentServiceImpl) DescribeAgent(ctx context.Context, in *agent.DescribeAgentRequest) (*agent.Agent, error)

DescribeAgent implements agent.Service. 使用缓存优化频繁查询,减少数据库压力

func (*AgentServiceImpl) GetAndRemoveConn

func (i *AgentServiceImpl) GetAndRemoveConn(agentId string) *agent.Connection

原子性地获取并删除连接

func (*AgentServiceImpl) GetConnection

GetConnection implements agent.Register.

func (*AgentServiceImpl) GetCurrentNodeLoad

func (i *AgentServiceImpl) GetCurrentNodeLoad(ctx context.Context) (*NodeLoadInfo, error)

GetCurrentNodeLoad 获取当前节点的负载信息(用于监控)

func (*AgentServiceImpl) GetHeartbeatBufferSize

func (i *AgentServiceImpl) GetHeartbeatBufferSize() int

GetHeartbeatBufferSize 获取当前心跳缓冲区大小(用于监控)

func (*AgentServiceImpl) HanleTaskResponse

func (i *AgentServiceImpl) HanleTaskResponse(ctx context.Context) error

HanleTaskResponse 处理其他节点广播过来的Agent运行任务

func (*AgentServiceImpl) HeartbeatReport

心跳上报

func (*AgentServiceImpl) IncrementAgentCount

func (i *AgentServiceImpl) IncrementAgentCount(ctx context.Context) error

IncrementAgentCount 增加节点的 Agent 计数(注册成功后调用)

func (*AgentServiceImpl) Init

func (i *AgentServiceImpl) Init() error

func (*AgentServiceImpl) Name

func (i *AgentServiceImpl) Name() string

func (*AgentServiceImpl) QueryAgent

QueryAgent implements agent.Service.

func (*AgentServiceImpl) RegisterAgent

RegisterAgent implements agent.Service.

func (*AgentServiceImpl) RunTask

func (i *AgentServiceImpl) RunTask(ctx context.Context, in *task.Task) error

RunTask implements agent.Service.

func (*AgentServiceImpl) ScheduledAgent

func (i *AgentServiceImpl) ScheduledAgent(ctx context.Context, in *task.Task) (*agent.Agent, error)

func (*AgentServiceImpl) UpdateAgent

UpdateAgent implements agent.Service.

func (*AgentServiceImpl) WithConn

func (i *AgentServiceImpl) WithConn(agentId string, fn func(conn *agent.Connection) error) error

在连接上执行操作(线程安全)

type BalanceDecision

// BalanceDecision is the result of a load-balancing decision, as returned by
// AgentServiceImpl.CheckBalanceBeforeRegister.
type BalanceDecision struct {
	// Allowed reports whether registration is allowed.
	Allowed bool
	// Reason explains the decision.
	Reason string
	// SuggestedNode is the node suggested for registration (when the current node does not allow it).
	SuggestedNode string
	// CurrentCount is the number of Agents on the current node.
	CurrentCount int64
	// AverageCount is the average number of Agents.
	AverageCount float64
}

BalanceDecision 均衡决策结果

type HeartbeatEvent

// HeartbeatEvent is a heartbeat event carried on the message bus. It is
// serialized to JSON with the keys agent_id, timestamp and status.
type HeartbeatEvent struct {
	AgentId   string             `json:"agent_id"`
	Timestamp time.Time          `json:"timestamp"`
	Status    *agent.AgentStatus `json:"status"`
}

HeartbeatEvent 心跳事件(用于bus消息总线)

type MetricResponseHandler

// MetricResponseHandler is the handler for metric reports; its HandleResponse
// method processes metric report messages coming from an Agent. Create one
// with NewMetricResponseHandler.
type MetricResponseHandler struct {
	// contains filtered or unexported fields
}

MetricResponseHandler 指标上报处理器

func NewMetricResponseHandler

func NewMetricResponseHandler(logger *zerolog.Logger) *MetricResponseHandler

NewMetricResponseHandler 创建指标上报处理器

func (*MetricResponseHandler) HandleResponse

func (h *MetricResponseHandler) HandleResponse(ctx context.Context, msg *agent.AgentMessage) error

HandleResponse 处理来自 Agent 的指标上报消息

type NodeLoadInfo

// NodeLoadInfo holds load information for a node, as returned by
// AgentServiceImpl.GetCurrentNodeLoad for monitoring.
// NOTE(review): the unit of LastHeartbeat is not visible here (presumably a
// Unix timestamp); confirm against the implementation.
type NodeLoadInfo struct {
	NodeName      string
	AgentCount    int64
	LastHeartbeat int64
	IsActive      bool
}

NodeLoadInfo 节点负载信息

type TaskMessageDispatcher

// TaskMessageDispatcher is the dispatcher for task messages. Response handlers
// are registered and unregistered per request ID through Register and
// Unregister. Create one with NewTaskMessageDispatcher.
type TaskMessageDispatcher struct {
	// contains filtered or unexported fields
}

TaskMessageDispatcher 任务消息分发器

func NewTaskMessageDispatcher

func NewTaskMessageDispatcher(logger *zerolog.Logger) *TaskMessageDispatcher

NewTaskMessageDispatcher 创建任务消息分发器

func (*TaskMessageDispatcher) HandleResponse

func (d *TaskMessageDispatcher) HandleResponse(ctx context.Context, msg *agent.AgentMessage) error

HandleResponse 处理任务消息

func (*TaskMessageDispatcher) Register

func (d *TaskMessageDispatcher) Register(requestId string, handler agent.ResponseHandler)

Register 注册任务响应处理器

func (*TaskMessageDispatcher) Unregister

func (d *TaskMessageDispatcher) Unregister(requestId string)

Unregister 取消注册任务响应处理器

type TaskResponseHandler

// TaskResponseHandler is the handler for task responses; its HandleResponse
// method processes response messages coming from an Agent. Create one with
// NewTaskResponseHandler, which takes the task ID and a logger.
type TaskResponseHandler struct {
	// contains filtered or unexported fields
}

TaskResponseHandler 任务响应处理器

func NewTaskResponseHandler

func NewTaskResponseHandler(taskId string, logger *zerolog.Logger) *TaskResponseHandler

NewTaskResponseHandler 创建任务响应处理器

func (*TaskResponseHandler) HandleResponse

func (h *TaskResponseHandler) HandleResponse(ctx context.Context, msg *agent.AgentMessage) error

HandleResponse 处理来自 Agent 的响应消息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL