connect

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// 默认缓冲区大小(字节)
	DefaultBufferSize = 4096 // 4KB
	// 默认刷新间隔
	DefaultFlushInterval = 1 * time.Second
)
View Source
const (
	APP_NAME = "agent.connect"
)

Variables

This section is empty.

Functions

func RegistryTaskHandler

func RegistryTaskHandler(t task.TYPE, h agent.TaskHandler)

Types

type Client

type Client struct {
	ioc.ObjectImpl

	ServerAddress string `json:"server_address" yaml:"server_address" toml:"server_address" env:"SERVER_ADDRESS" validate:"required,url"`
	AgentId       string `json:"agent_id" yaml:"agent_id" toml:"agent_id" env:"AGENT_ID" validate:"required"`
	AgentKey      string `json:"agent_key" yaml:"agent_key" toml:"agent_key" env:"AGENT_KEY" validate:"required"`
	PingInterval  int    `json:"ping_interval" yaml:"ping_interval" toml:"ping_interval" env:"PING_INTERVAL" validate:"required"`
	// 握手超时时间, 默认5秒
	HandshakeTimeoutSecond int `` /* 146-byte string literal not displayed */

	// 消息重试队列配置
	// 重试队列存储目录,默认 data/retry-queue
	RetryQueueDir string `json:"retry_queue_dir" yaml:"retry_queue_dir" toml:"retry_queue_dir" env:"RETRY_QUEUE_DIR"`
	// 内存队列最大容量,默认1000
	RetryQueueMemorySize int `json:"retry_queue_memory_size" yaml:"retry_queue_memory_size" toml:"retry_queue_memory_size" env:"RETRY_QUEUE_MEMORY_SIZE"`
	// 磁盘队列最大文件数,默认10000
	RetryQueueDiskSize int `json:"retry_queue_disk_size" yaml:"retry_queue_disk_size" toml:"retry_queue_disk_size" env:"RETRY_QUEUE_DISK_SIZE"`
	// 单条消息最大重试次数,默认3
	RetryQueueMaxRetries int `json:"retry_queue_max_retries" yaml:"retry_queue_max_retries" toml:"retry_queue_max_retries" env:"RETRY_QUEUE_MAX_RETRIES"`
	// 磁盘文件保留时间,默认24小时
	RetryQueueDiskTTL string `json:"retry_queue_disk_ttl" yaml:"retry_queue_disk_ttl" toml:"retry_queue_disk_ttl" env:"RETRY_QUEUE_DISK_TTL"`
	// 磁盘队列告警阈值,默认5000
	RetryQueueWarningThreshold int `` /* 146-byte string literal not displayed */
	// 后台加载间隔,默认500ms
	RetryQueueLoadInterval string `` /* 130-byte string literal not displayed */
	// 单次加载最大消息数,默认100
	RetryQueueBatchLoadSize int `` /* 138-byte string literal not displayed */
	// 过期文件清理间隔,默认1小时
	RetryQueueCleanupInterval string `` /* 142-byte string literal not displayed */
	// contains filtered or unexported fields
}

并发模型说明: - mu: 统一互斥锁,保护连接状态、WebSocket 写入与最后活动时间。

  • 重连/关闭/发送/心跳都通过该锁序列化,避免在连接切换期间写入;
  • 所有 WebSocket 写操作必须在 mu 写锁下执行;
  • 最后活动时间的读取用 RLock,更新在持有写锁时直接写入。

- 其他细粒度锁(logSequenceMu/logBuffersMu)分别用于独立的任务日志序列与缓冲管理,避免跨职责的大锁竞争。

func Get

func Get() *Client

func (*Client) Close

func (c *Client) Close(ctx context.Context)

关闭客户端

func (*Client) CloseLogBuffer

func (c *Client) CloseLogBuffer(taskId string) error

CloseLogBuffer 关闭并清理日志缓冲器

func (*Client) Connect

func (c *Client) Connect() error

Agent注册 /ws/devops/v1/agents

func (*Client) GetOrCreateLogBuffer

func (c *Client) GetOrCreateLogBuffer(requestId, taskId string) *LogBuffer

GetOrCreateLogBuffer 获取或创建日志缓冲器

func (*Client) GetRetryQueue

func (c *Client) GetRetryQueue() *RetryQueue

GetRetryQueue 获取重试队列

func (*Client) GetRetryQueueCleanupInterval

func (c *Client) GetRetryQueueCleanupInterval() time.Duration

GetRetryQueueCleanupInterval 获取清理间隔

func (*Client) GetRetryQueueDiskTTL

func (c *Client) GetRetryQueueDiskTTL() time.Duration

GetRetryQueueDiskTTL 获取磁盘队列 TTL

func (*Client) GetRetryQueueLoadInterval

func (c *Client) GetRetryQueueLoadInterval() time.Duration

GetRetryQueueLoadInterval 获取后台加载间隔

func (*Client) Init

func (c *Client) Init() error

func (*Client) IsConnected

func (c *Client) IsConnected() bool

检查连接状态

func (*Client) Name

func (c *Client) Name() string

func (*Client) SendTaskConfirm

func (c *Client) SendTaskConfirm(requestId, taskId string) error

SendTaskConfirm 发送任务调度确认 (使用Protocol V2)

func (*Client) SendTaskLog

func (c *Client) SendTaskLog(requestId, taskId, content string) error

SendTaskLog 发送任务日志 (使用Protocol V2)

func (*Client) SendTaskLogBuffered

func (c *Client) SendTaskLogBuffered(requestId, taskId, content string) error

SendTaskLogBuffered 通过缓冲器发送任务日志(推荐使用)

func (*Client) SendTaskResult

func (c *Client) SendTaskResult(requestId string, resp *agent.RunTaskResponse) error

SendTaskResult 发送任务结果 (使用Protocol V2)

type LogBuffer

type LogBuffer struct {
	// contains filtered or unexported fields
}

LogBuffer 日志缓冲器,用于批量发送日志

func NewLogBuffer

func NewLogBuffer(client *Client, requestId, taskId string) *LogBuffer

NewLogBuffer 创建日志缓冲器

func (*LogBuffer) Close

func (lb *LogBuffer) Close() error

Close 关闭缓冲器,刷新剩余内容

func (*LogBuffer) Flush

func (lb *LogBuffer) Flush() error

Flush 手动刷新缓冲区

func (*LogBuffer) Write

func (lb *LogBuffer) Write(content string) error

Write 写入日志内容

type RetryMessage

type RetryMessage struct {
	Message   *protocol.Message `json:"message"`
	RetryAt   time.Time         `json:"retry_at"`   // 下次重试时间
	Attempt   int               `json:"attempt"`    // 已重试次数
	CreatedAt time.Time         `json:"created_at"` // 创建时间
	ID        string            `json:"id"`         // 消息唯一ID
	Status    string            `json:"status"`     // 消息状态: pending, processing, completed
}

RetryMessage 待重发的消息

type RetryQueue

type RetryQueue struct {
	// contains filtered or unexported fields
}

RetryQueue 消息重发队列(混合存储:内存+磁盘)

func NewRetryQueue

func NewRetryQueue(client *Client, diskDir string) (*RetryQueue, error)

NewRetryQueue 创建重试队列

func (*RetryQueue) Close

func (rq *RetryQueue) Close() error

Close 关闭重试队列(停止后台 goroutine)

func (*RetryQueue) Enqueue

func (rq *RetryQueue) Enqueue(msg *protocol.Message) error

Enqueue 将消息加入重试队列

func (*RetryQueue) GetStats

func (rq *RetryQueue) GetStats() map[string]any

GetStats 获取统计信息

func (*RetryQueue) ProcessRetries

func (rq *RetryQueue) ProcessRetries()

ProcessRetries 处理重试(连接恢复后调用)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL