Documentation
¶
Index ¶
- Constants
- func RegistryTaskHandler(t task.TYPE, h agent.TaskHandler)
- type Client
- func (c *Client) Close(ctx context.Context)
- func (c *Client) CloseLogBuffer(taskId string) error
- func (c *Client) Connect() error
- func (c *Client) GetOrCreateLogBuffer(requestId, taskId string) *LogBuffer
- func (c *Client) GetRetryQueue() *RetryQueue
- func (c *Client) GetRetryQueueCleanupInterval() time.Duration
- func (c *Client) GetRetryQueueDiskTTL() time.Duration
- func (c *Client) GetRetryQueueLoadInterval() time.Duration
- func (c *Client) Init() error
- func (c *Client) IsConnected() bool
- func (c *Client) Name() string
- func (c *Client) SendTaskConfirm(requestId, taskId string) error
- func (c *Client) SendTaskLog(requestId, taskId, content string) error
- func (c *Client) SendTaskLogBuffered(requestId, taskId, content string) error
- func (c *Client) SendTaskResult(requestId string, resp *agent.RunTaskResponse) error
- type LogBuffer
- type RetryMessage
- type RetryQueue
Constants ¶
View Source
const ( // 默认缓冲区大小(字节) DefaultBufferSize = 4096 // 4KB // 默认刷新间隔 DefaultFlushInterval = 1 * time.Second )
View Source
const (
APP_NAME = "agent.connect"
)
Variables ¶
This section is empty.
Functions ¶
func RegistryTaskHandler ¶
func RegistryTaskHandler(t task.TYPE, h agent.TaskHandler)
Types ¶
type Client ¶
type Client struct {
ioc.ObjectImpl
ServerAddress string `json:"server_address" yaml:"server_address" toml:"server_address" env:"SERVER_ADDRESS" validate:"required,url"`
AgentId string `json:"agent_id" yaml:"agent_id" toml:"agent_id" env:"AGENT_ID" validate:"required"`
AgentKey string `json:"agent_key" yaml:"agent_key" toml:"agent_key" env:"AGENT_KEY" validate:"required"`
PingInterval int `json:"ping_interval" yaml:"ping_interval" toml:"ping_interval" env:"PING_INTERVAL" validate:"required"`
// 握手超时时间, 默认5秒
HandshakeTimeoutSecond int `` /* 146-byte string literal not displayed */
// 消息重试队列配置
// 重试队列存储目录,默认 data/retry-queue
RetryQueueDir string `json:"retry_queue_dir" yaml:"retry_queue_dir" toml:"retry_queue_dir" env:"RETRY_QUEUE_DIR"`
// 内存队列最大容量,默认1000
RetryQueueMemorySize int `json:"retry_queue_memory_size" yaml:"retry_queue_memory_size" toml:"retry_queue_memory_size" env:"RETRY_QUEUE_MEMORY_SIZE"`
// 磁盘队列最大文件数,默认10000
RetryQueueDiskSize int `json:"retry_queue_disk_size" yaml:"retry_queue_disk_size" toml:"retry_queue_disk_size" env:"RETRY_QUEUE_DISK_SIZE"`
// 单条消息最大重试次数,默认3
RetryQueueMaxRetries int `json:"retry_queue_max_retries" yaml:"retry_queue_max_retries" toml:"retry_queue_max_retries" env:"RETRY_QUEUE_MAX_RETRIES"`
// 磁盘文件保留时间,默认24小时
RetryQueueDiskTTL string `json:"retry_queue_disk_ttl" yaml:"retry_queue_disk_ttl" toml:"retry_queue_disk_ttl" env:"RETRY_QUEUE_DISK_TTL"`
// 磁盘队列告警阈值,默认5000
RetryQueueWarningThreshold int `` /* 146-byte string literal not displayed */
// 后台加载间隔,默认500ms
RetryQueueLoadInterval string `` /* 130-byte string literal not displayed */
// 单次加载最大消息数,默认100
RetryQueueBatchLoadSize int `` /* 138-byte string literal not displayed */
// 过期文件清理间隔,默认1小时
RetryQueueCleanupInterval string `` /* 142-byte string literal not displayed */
// contains filtered or unexported fields
}
并发模型说明: - mu: 统一互斥锁,保护连接状态、WebSocket 写入与最后活动时间。
- 重连/关闭/发送/心跳都通过该锁序列化,避免在连接切换期间写入;
- 所有 WebSocket 写操作必须在 mu 写锁下执行;
- 最后活动时间的读取用 RLock,更新在持有写锁时直接写入。
- 其他细粒度锁(logSequenceMu/logBuffersMu)分别用于独立的任务日志序列与缓冲管理,避免跨职责的大锁竞争。
func (*Client) CloseLogBuffer ¶
CloseLogBuffer 关闭并清理日志缓冲器
func (*Client) GetOrCreateLogBuffer ¶
GetOrCreateLogBuffer 获取或创建日志缓冲器
func (*Client) GetRetryQueueCleanupInterval ¶
GetRetryQueueCleanupInterval 获取清理间隔
func (*Client) GetRetryQueueDiskTTL ¶
GetRetryQueueDiskTTL 获取磁盘队列 TTL
func (*Client) GetRetryQueueLoadInterval ¶
GetRetryQueueLoadInterval 获取后台加载间隔
func (*Client) SendTaskConfirm ¶
SendTaskConfirm 发送任务调度确认 (使用Protocol V2)
func (*Client) SendTaskLog ¶
SendTaskLog 发送任务日志 (使用Protocol V2)
func (*Client) SendTaskLogBuffered ¶
SendTaskLogBuffered 通过缓冲器发送任务日志(推荐使用)
func (*Client) SendTaskResult ¶
func (c *Client) SendTaskResult(requestId string, resp *agent.RunTaskResponse) error
SendTaskResult 发送任务结果 (使用Protocol V2)
type LogBuffer ¶
type LogBuffer struct {
// contains filtered or unexported fields
}
LogBuffer 日志缓冲器,用于批量发送日志
func NewLogBuffer ¶
NewLogBuffer 创建日志缓冲器
type RetryMessage ¶
type RetryMessage struct {
Message *protocol.Message `json:"message"`
RetryAt time.Time `json:"retry_at"` // 下次重试时间
Attempt int `json:"attempt"` // 已重试次数
CreatedAt time.Time `json:"created_at"` // 创建时间
ID string `json:"id"` // 消息唯一ID
Status string `json:"status"` // 消息状态: pending, processing, completed
}
RetryMessage 待重发的消息
type RetryQueue ¶
type RetryQueue struct {
// contains filtered or unexported fields
}
RetryQueue 消息重发队列(混合存储:内存+磁盘)
func NewRetryQueue ¶
func NewRetryQueue(client *Client, diskDir string) (*RetryQueue, error)
NewRetryQueue 创建重试队列
func (*RetryQueue) Enqueue ¶
func (rq *RetryQueue) Enqueue(msg *protocol.Message) error
Enqueue 将消息加入重试队列
func (*RetryQueue) ProcessRetries ¶
func (rq *RetryQueue) ProcessRetries()
ProcessRetries 处理重试(连接恢复后调用)
Click to show internal directories.
Click to hide internal directories.