agent

package
v1.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2026 License: MIT Imports: 12 Imported by: 0

README

Baremetal Agent Bridge

go-micro/registry/agent 用于承接业务服务与本机 sidecar-agent 的联动能力。

定位

这个包是当前 Firefly 裸机版本的主路径

它不直接对接 Consul,也不直接承担服务发现职责。

它只负责:

  • 持有业务服务的标准注册描述
  • 对本机 sidecar-agent 发起 register
  • 在本地连接恢复后自动重放注册
  • 对外提供统一的 drain / deregister 入口

设计目标

  • 不让业务服务自己轮询 agent
  • 不让业务服务直接处理 agent 重启逻辑
  • 把“注册描述缓存 + 连接恢复后重放”收敛到核心库
  • 给后续 v2.2 的 agent lifecycle 机制提供统一接入点
  • 把业务服务和 consul / envoy 的耦合统一收口到 sidecar-agent

当前范围

当前仅提供通用模型、契约与控制器骨架:

  • RegisterRequest
  • DrainRequest
  • DeregisterRequest
  • DescriptorProvider
  • Client
  • Controller
  • ConnectionEvent
  • EventSource
  • Runner
  • JSONHTTPClient
  • WatchSource
  • LocalRuntime
  • ServiceRegistration
  • ServiceRegistrationProvider
  • GRPCDescriptorOptions
  • NewServiceRegistrationFromGRPC(...)
  • NewServiceLifecycleFromGRPC(...)
  • ServiceLifecycle
  • ManagedServer

暂不包含:

  • 与业务框架自动启动集成的完整实现
  • 与 sidecar-agent 的 lease / stream 协议实现

当前边界

当前默认假设:

业务服务
  → registry/agent
  → 本机 sidecar-agent
  → sidecar-agent 对接 consul / envoy

因此:

  • 业务服务不再直接对接 go-consul/registry
  • 业务服务不再直接感知 envoy
  • registry/agent 只处理本机 agent 生命周期,不处理云原生 mesh 语义

K8s 说明

这个包是裸机专用桥接层

在 K8s 中应视为:

  • 不进入业务服务主链
  • 不承担注册/摘流/注销职责
  • 不复用本机 sidecar-agent + consul + envoy 这一套运行模型

也就是说:

  • 裸机走 registry/agent
  • K8s 走 k8s + mesh + invocation

后续演进

后续可在此基础上继续补:

  • 更强约束的本地长连接协议
  • 连接状态订阅
  • 注册重放退避策略
  • 与 go-micro 启动钩子集成

建议接入方式

业务服务若已经在使用本地服务节点模型,可优先复用:

  • ServiceRegistration
  • ServiceRegistrationProvider
  • NewLocalRuntimeFromServiceRegistration(...)

这样可以直接把已有服务元信息映射成 sidecar-agent 的注册请求,减少重复拼装代码。

node := &agent.ServiceNode{
  Weight: 100,
  Methods: map[string]bool{
    "/acme.auth.v1.AuthService/Login": true,
  },
  Kernel: &agent.ServiceKernel{
    Language: "go",
    Version:  "go-micro/v1.12.0",
  },
  Meta: &agent.ServiceMeta{
    AppId: "10001",
  },
}

lifecycle, err := agent.NewServiceLifecycleFromServiceRegistration(agent.ServiceRegistration{
  ServiceName: "auth",
  Namespace:   "default",
  DNS:         "auth.default.svc.cluster.local",
  Env:         "prod",
  Port:        9090,
  Protocol:    "grpc",
  Version:     "v1.0.0",
  Node:        node,
}, agent.DefaultLocalRuntimeOptions(""), agent.LifecycleOptions{
  GracePeriod: "20s",
})
if err != nil {
  return err
}

errCh := lifecycle.Start(ctx)

go func() {
  for err := range errCh {
    logger.Error(err)
  }
}()

如果业务服务当前更接近 “grpc.ServiceDesc + agent.ServiceOptions” 这类输入,也可以直接使用:

  • GRPCDescriptorOptions
  • NewServiceLifecycleFromGRPC(...)

它会自动:

  • 解析 grpc.ServiceDesc 中的完整 method path
  • 复用 ServiceOptions 中的 weight / kernel / instance_id
  • 组装 sidecar-agent 所需的标准注册描述

如果你希望把“业务服务启动/退出”和“agent 注册/摘流/注销”统一收敛成一个入口,还可以继续使用:

  • ServiceLifecycle
  • ManagedServer

这样业务侧可以把:

  • 本地 agent 连接恢复后的自动重放 register
  • 退出时的 drain + deregister
  • 业务服务自己的 serve + shutdown

统一收敛到一个 Run(ctx) 入口。

Documentation

Index

Constants

View Source
const (
	// DefaultAdminBaseURL 表示业务服务默认访问的本机 sidecar-agent 管理地址。
	DefaultAdminBaseURL = "http://127.0.0.1:15010"
	// DefaultWatchPath 表示 sidecar-agent 提供的默认 watch 路径。
	DefaultWatchPath = "/watch"
	// DefaultRequestTimeout 表示 register、drain、deregister 的默认请求超时。
	DefaultRequestTimeout = 3 * time.Second
	// DefaultReconnectInterval 表示 watch 断开后的默认重连间隔。
	DefaultReconnectInterval = time.Second
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Register 负责把当前服务注册到本机 sidecar-agent。
	Register(ctx context.Context, request RegisterRequest) error
	// Drain 负责把当前服务切换到摘流状态。
	Drain(ctx context.Context, request DrainRequest) error
	// Deregister 负责把当前服务从本机 sidecar-agent 注销。
	Deregister(ctx context.Context, request DeregisterRequest) error
}

Client 抽象本机 sidecar-agent 的最小交互能力。

type ConnectionEvent

type ConnectionEvent struct {
	// Connected 表示当前事件是否意味着连接已建立。
	Connected bool
	// Err 保存连接断开或处理失败时的上下文错误。
	Err error
}

ConnectionEvent 描述本机 agent 连接状态变化事件。

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller 负责在业务服务侧缓存注册描述,并在连接恢复后重放 register。

func NewController

func NewController(client Client, provider DescriptorProvider) (*Controller, error)

NewController 创建一个新的业务侧 agent 联动控制器。

func (*Controller) Deregister

func (c *Controller) Deregister(ctx context.Context) error

Deregister 使用最近一次成功注册的服务描述发起注销。

func (*Controller) Drain

func (c *Controller) Drain(ctx context.Context, gracePeriod string) error

Drain 使用最近一次成功注册的服务描述发起摘流。

func (*Controller) OnConnected

func (c *Controller) OnConnected(ctx context.Context) error

OnConnected 在本机 agent 连接建立或恢复时重放 register。

func (*Controller) OnDisconnected

func (c *Controller) OnDisconnected()

OnDisconnected 在本机 agent 连接断开时把控制器状态标记为未连接。

func (*Controller) Status

func (c *Controller) Status() Status

Status 返回控制器的当前状态快照。

type DeregisterRequest

type DeregisterRequest struct {
	// Name 表示逻辑服务名。
	Name string `json:"name"`
	// Port 表示业务服务监听端口。
	Port int `json:"port"`
}

DeregisterRequest 表示业务服务向本机 sidecar-agent 发起的注销请求。

type DescriptorProvider

type DescriptorProvider interface {
	// BuildRegisterRequest 返回当前业务服务完整注册描述。
	BuildRegisterRequest(ctx context.Context) (RegisterRequest, error)
}

DescriptorProvider 抽象业务服务注册描述的构造能力。

type DrainRequest

type DrainRequest struct {
	// Name 表示逻辑服务名。
	Name string `json:"name"`
	// Port 表示业务服务监听端口。
	Port int `json:"port"`
	// GracePeriod 表示摘流宽限期。
	GracePeriod string `json:"grace_period"`
}

DrainRequest 表示业务服务向本机 sidecar-agent 发起的摘流请求。

type ErrorHandler

type ErrorHandler func(context.Context, error)

ErrorHandler 用于统一处理运行时中的非致命错误。

type EventSource

type EventSource interface {
	// Subscribe 返回一个持续输出连接事件的只读通道。
	Subscribe(ctx context.Context) (<-chan ConnectionEvent, error)
}

EventSource 抽象本地 agent 连接事件来源。

type GRPCDescriptorOptions

type GRPCDescriptorOptions struct {
	// AppID 表示应用标识。
	AppId string
	// AppName 表示应用名称;为空时回退到 ServiceName。
	AppName string
	// ServiceName 表示逻辑服务名。
	ServiceName string
	// Namespace 表示命名空间。
	Namespace string
	// DNS 表示统一服务域名。
	DNS string
	// Env 表示运行环境。
	Env string
	// Port 表示业务监听端口。
	Port int
	// Protocol 表示业务协议。
	Protocol string
	// Version 表示业务版本号。
	Version string
	// ServiceOptions 表示业务服务当前的裸机接入配置。
	ServiceOptions *ServiceOptions
	// RawServices 表示业务已注册的 gRPC ServiceDesc 集合。
	RawServices []*grpc.ServiceDesc
}

GRPCDescriptorOptions 描述如何从 gRPC 服务描述直接构造 agent 注册信息。

type JSONHTTPClient

type JSONHTTPClient struct {
	// contains filtered or unexported fields
}

JSONHTTPClient 提供一个基于 JSON over HTTP 的本地 agent client 实现。

func NewJSONHTTPClient

func NewJSONHTTPClient(baseURL string, timeout time.Duration) *JSONHTTPClient

NewJSONHTTPClient 创建一个新的本地 HTTP JSON client。

func (*JSONHTTPClient) Deregister

func (c *JSONHTTPClient) Deregister(ctx context.Context, request DeregisterRequest) error

Deregister 通过本机管理接口发起注销。

func (*JSONHTTPClient) Drain

func (c *JSONHTTPClient) Drain(ctx context.Context, request DrainRequest) error

Drain 通过本机管理接口发起摘流。

func (*JSONHTTPClient) Register

func (c *JSONHTTPClient) Register(ctx context.Context, request RegisterRequest) error

Register 通过本机管理接口注册当前服务。

type KernelInfo

type KernelInfo struct {
	// Language 表示业务服务使用的开发语言。
	Language string `json:"language"`
	// Version 表示业务服务运行时或框架版本。
	Version string `json:"version"`
}

KernelInfo 描述业务服务运行时信息。

type LifecycleOptions

type LifecycleOptions struct {
	// Runtime 表示已经组装完成的本地 agent 运行时。
	Runtime *LocalRuntime
	// GracePeriod 表示业务服务优雅下线时使用的默认摘流宽限期。
	GracePeriod string
}

LifecycleOptions 描述业务服务接入 agent 生命周期桥接时的最小参数。

type LocalRuntime

type LocalRuntime struct {
	// Client 负责发起 register / drain / deregister 请求。
	Client *JSONHTTPClient
	// Source 负责把本地 `/watch` 连接转换成连接事件流。
	Source *WatchSource
	// Controller 负责缓存注册描述并在连接恢复时重放 register。
	Controller *Controller
	// Runner 负责驱动事件流与 register 重放过程。
	Runner *Runner
}

LocalRuntime 把本地 HTTP client、watch 事件源、控制器与运行器组装成一个整体。

func NewLocalRuntime

func NewLocalRuntime(provider DescriptorProvider, options LocalRuntimeOptions) (*LocalRuntime, error)

NewLocalRuntime 使用最小参数组装一套可直接接入业务服务的本地运行时。

func NewLocalRuntimeFromServiceRegistration

func NewLocalRuntimeFromServiceRegistration(descriptor ServiceRegistration, options LocalRuntimeOptions) (*LocalRuntime, error)

NewLocalRuntimeFromServiceRegistration 使用 go-micro 服务描述直接组装本地 agent 运行时。

func (*LocalRuntime) Deregister

func (r *LocalRuntime) Deregister(ctx context.Context) error

Deregister 通过控制器复用最近一次注册描述发起注销。

func (*LocalRuntime) Drain

func (r *LocalRuntime) Drain(ctx context.Context, gracePeriod string) error

Drain 通过控制器复用最近一次注册描述发起摘流。

func (*LocalRuntime) Run

func (r *LocalRuntime) Run(ctx context.Context) error

Run 启动本地 watch 事件循环,并在连接恢复时自动重放 register。

func (*LocalRuntime) Status

func (r *LocalRuntime) Status() Status

Status 返回当前本地运行时的状态快照。

type LocalRuntimeOptions

type LocalRuntimeOptions struct {
	// BaseURL 表示本机 sidecar-agent 管理接口前缀。
	BaseURL string
	// WatchURL 表示本机 sidecar-agent 的长连接 watch 地址。
	WatchURL string
	// RequestTimeout 表示 register / drain / deregister 请求超时。
	RequestTimeout time.Duration
	// ReconnectInterval 表示 watch 断开后的重连间隔。
	ReconnectInterval time.Duration
	// OnError 用于统一处理 watch 与 register 重放过程中的错误。
	OnError ErrorHandler
}

LocalRuntimeOptions 描述业务服务接入本机 sidecar-agent 的最小参数。

func DefaultLocalRuntimeOptions

func DefaultLocalRuntimeOptions(baseURL string) LocalRuntimeOptions

DefaultLocalRuntimeOptions 返回一份可直接接入业务服务的默认本地运行时参数。

type ManagedServer

type ManagedServer struct {
	// contains filtered or unexported fields
}

ManagedServer 把业务服务运行逻辑与 sidecar-agent 生命周期收敛成一个统一入口。

func NewManagedServer

func NewManagedServer(options ManagedServerOptions) (*ManagedServer, error)

NewManagedServer 创建一个新的业务服务托管器。

func (*ManagedServer) Run

func (s *ManagedServer) Run(ctx context.Context) error

Run 启动业务服务,并在退出时统一执行 agent 注销与业务优雅关闭。

type ManagedServerOptions

type ManagedServerOptions struct {
	// Lifecycle 表示 sidecar-agent 生命周期桥接对象。
	Lifecycle *ServiceLifecycle
	// Serve 表示业务服务真正的阻塞运行入口。
	Serve ServeFunc
	// Shutdown 表示业务服务优雅关闭入口;可为空。
	Shutdown ShutdownFunc
}

ManagedServerOptions 描述如何把业务服务运行逻辑与 agent 生命周期桥接到一起。

type RegisterRequest

type RegisterRequest struct {
	// AppID 表示应用标识。
	AppID string `json:"app_id"`
	// AppName 表示应用名称。
	AppName string `json:"app_name"`
	// Name 表示逻辑服务名。
	Name string `json:"name"`
	// Namespace 表示命名空间。
	Namespace string `json:"namespace"`
	// Port 表示业务服务监听端口。
	Port int `json:"port"`
	// DNS 表示业务服务统一域名。
	DNS string `json:"dns"`
	// Env 表示业务服务所属环境。
	Env string `json:"env"`
	// Weight 表示实例权重。
	Weight int `json:"weight"`
	// Protocol 表示服务协议。
	Protocol string `json:"protocol"`
	// Kernel 表示业务服务运行时信息。
	Kernel KernelInfo `json:"kernel"`
	// Methods 表示业务服务暴露的方法列表。
	Methods []string `json:"methods"`
	// Version 表示业务版本号。
	Version string `json:"version"`
}

RegisterRequest 表示业务服务向本机 sidecar-agent 发起的注册请求。

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner 负责把连接事件转换成 register 重放动作。

func NewRunner

func NewRunner(source EventSource, controller *Controller, onError ErrorHandler) (*Runner, error)

NewRunner 创建一个新的连接事件驱动运行器。

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run 持续消费连接事件,并在连接恢复后自动重放 register。

type ServeFunc

type ServeFunc func(context.Context) error

ServeFunc 抽象业务服务真正的阻塞运行入口。

type ServiceKernel

type ServiceKernel struct {
	Language string `json:"language"`
	Version  string `json:"version"`
}

ServiceKernel 描述业务服务运行时内核信息。

func (*ServiceKernel) Bootstrap

func (k *ServiceKernel) Bootstrap()

Bootstrap 补齐内核默认值。

type ServiceLifecycle

type ServiceLifecycle struct {
	// contains filtered or unexported fields
}

ServiceLifecycle 把本地 agent 运行时包装成更适合业务启动与退出阶段接入的对象。

func NewServiceLifecycle

func NewServiceLifecycle(options LifecycleOptions) (*ServiceLifecycle, error)

NewServiceLifecycle 基于已组装好的本地运行时创建业务生命周期桥接对象。

func NewServiceLifecycleFromGRPC

func NewServiceLifecycleFromGRPC(descriptorOptions GRPCDescriptorOptions, runtimeOptions LocalRuntimeOptions, lifecycleOptions LifecycleOptions) (*ServiceLifecycle, error)

NewServiceLifecycleFromGRPC 使用 gRPC ServiceDesc 与 ServiceOptions 直接创建业务生命周期桥接对象。

func NewServiceLifecycleFromServiceRegistration

func NewServiceLifecycleFromServiceRegistration(descriptor ServiceRegistration, runtimeOptions LocalRuntimeOptions, lifecycleOptions LifecycleOptions) (*ServiceLifecycle, error)

NewServiceLifecycleFromServiceRegistration 允许业务直接基于 go-micro 服务描述组装生命周期桥接对象。

func (*ServiceLifecycle) Deregister

func (l *ServiceLifecycle) Deregister(ctx context.Context) error

Deregister 对当前服务发起注销。

func (*ServiceLifecycle) Drain

func (l *ServiceLifecycle) Drain(ctx context.Context) error

Drain 使用默认宽限期对当前服务发起摘流。

func (*ServiceLifecycle) Shutdown

func (l *ServiceLifecycle) Shutdown(ctx context.Context) error

Shutdown 先摘流再注销,适合业务服务优雅退出路径直接调用。

func (*ServiceLifecycle) Start

func (l *ServiceLifecycle) Start(ctx context.Context) <-chan error

Start 在后台启动本地 watch 运行循环,并返回异步错误通道。

func (*ServiceLifecycle) Status

func (l *ServiceLifecycle) Status() Status

Status 返回当前生命周期桥接对象的最新状态快照。

type ServiceMeta

type ServiceMeta struct {
	AppId      string `json:"app_id"`
	InstanceId string `json:"instance_id"`
	Version    string `json:"version"`
	Env        string `json:"env"`
}

ServiceMeta 描述业务服务在裸机场景下的最小身份信息。

type ServiceNode

type ServiceNode struct {
	ProtoCount int             `json:"proto_count"`
	Weight     int             `json:"weight"`
	RunDate    string          `json:"run_date"`
	Methods    map[string]bool `json:"methods"`
	Kernel     *ServiceKernel  `json:"kernel"`
	Meta       *ServiceMeta    `json:"meta"`
}

ServiceNode 描述业务服务在裸机场景下的最小注册节点信息。

type ServiceOptions

type ServiceOptions struct {
	InstanceId string         `json:"instance_id"`
	Namespace  string         `json:"namespace"`
	Kernel     *ServiceKernel `json:"kernel"`
	MaxRetry   uint32         `json:"max_retry"`
	TTL        uint32         `json:"ttl"`
	Weight     int            `json:"weight"`
}

ServiceOptions 描述业务服务接入本机 sidecar-agent 时的基础选项。

func (*ServiceOptions) Bootstrap

func (o *ServiceOptions) Bootstrap()

Bootstrap 补齐裸机服务接入时的默认值。

type ServiceRegistration

type ServiceRegistration struct {
	// AppId 表示应用标识;为空时优先回退到 Node.Meta.AppID。
	AppId string
	// AppName 表示应用名称;为空时回退到 ServiceName。
	AppName string
	// ServiceName 表示逻辑服务名。
	ServiceName string
	// Namespace 表示命名空间。
	Namespace string
	// DNS 表示统一服务域名。
	DNS string
	// Env 表示运行环境。
	Env string
	// Port 表示业务监听端口。
	Port int
	// Protocol 表示当前服务协议。
	Protocol string
	// Version 表示业务版本号。
	Version string
	// Weight 表示实例权重;为空时优先回退到 Node.Weight。
	Weight int
	// Node 表示业务服务当前的注册节点信息。
	Node *ServiceNode
}

ServiceRegistration 描述如何把 go-micro 的服务信息映射成 sidecar-agent register 请求。

func NewServiceRegistrationFromGRPC

func NewServiceRegistrationFromGRPC(options GRPCDescriptorOptions) (ServiceRegistration, error)

NewServiceRegistrationFromGRPC 使用 gRPC ServiceDesc 与 ServiceOptions 构造服务注册描述。

type ServiceRegistrationProvider

type ServiceRegistrationProvider struct {
	// contains filtered or unexported fields
}

ServiceRegistrationProvider 负责把 go-micro 的服务节点信息转换成 agent 注册描述。

func NewServiceRegistrationProvider

func NewServiceRegistrationProvider(descriptor ServiceRegistration) (*ServiceRegistrationProvider, error)

NewServiceRegistrationProvider 创建一个基于 go-micro 服务描述的 provider。

func (*ServiceRegistrationProvider) BuildRegisterRequest

func (p *ServiceRegistrationProvider) BuildRegisterRequest(ctx context.Context) (RegisterRequest, error)

BuildRegisterRequest 生成 sidecar-agent 所需的标准注册请求。

type ShutdownFunc

type ShutdownFunc func(context.Context) error

ShutdownFunc 抽象业务服务优雅关闭入口。

type Status

type Status struct {
	// Connected 表示当前是否与本机 sidecar-agent 保持连接。
	Connected bool
	// Registered 表示最近一次 register 是否成功完成。
	Registered bool
	// LastServiceName 表示最近一次成功注册的服务名。
	LastServiceName string
	// LastServicePort 表示最近一次成功注册的服务端口。
	LastServicePort int
}

Status 描述当前 agent 联动控制器的最新状态。

type WatchSource

type WatchSource struct {
	// contains filtered or unexported fields
}

WatchSource 基于 sidecar-agent 的 `/watch` 长连接接口输出连接事件。

func NewWatchSource

func NewWatchSource(watchURL string, reconnectInterval time.Duration) *WatchSource

NewWatchSource 创建一个新的长连接事件源。

func (*WatchSource) Subscribe

func (s *WatchSource) Subscribe(ctx context.Context) (<-chan ConnectionEvent, error)

Subscribe 启动后台重连循环,并把连接状态变化转换成事件流。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL