cluster

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package cluster 处理多节点路由表管理。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager 维护当前路由表,并提供给 s-proxy 注入响应头。 同时接受外部更新(peer 注册/注销时调用)以触发版本递增。

func NewManager

func NewManager(
	logger *zap.Logger,
	balancer lb.Balancer,
	initialTargets []lb.Target,
	cacheDir string,
) *Manager

NewManager 创建 ClusterManager。 initialTargets: 初始目标列表(通常来自配置文件)。 cacheDir: 路由表缓存目录(空串表示不持久化)。

func (*Manager) CurrentTable

func (m *Manager) CurrentTable() RoutingTable

CurrentTable 返回当前路由表快照。

func (*Manager) InjectResponseHeaders

func (m *Manager) InjectResponseHeaders(headers interface {
	Set(key, value string)
}, clientVersion int64)

InjectResponseHeaders 将路由表版本注入响应头。 若 clientVersion < 当前版本,同时注入完整路由表(X-Routing-Update)。

func (*Manager) MarkHealthy

func (m *Manager) MarkHealthy(id string)

MarkHealthy 将节点标记为健康并更新路由表版本。

func (*Manager) MarkUnhealthy

func (m *Manager) MarkUnhealthy(id string)

MarkUnhealthy 将节点标记为不健康并更新路由表版本。

func (*Manager) UpdateTargets

func (m *Manager) UpdateTargets(targets []lb.Target)

UpdateTargets 原子更新目标列表,版本号递增,并持久化。

type PeerInfo

type PeerInfo struct {
	ID         string    `json:"id"`
	Addr       string    `json:"addr"`   // HTTP 地址,如 "http://sp-2:9000"
	Weight     int       `json:"weight"` // 负载权重(≥1)
	LastSeen   time.Time `json:"last_seen"`
	IsHealthy  bool      `json:"is_healthy"`
	SourceNode string    `json:"source_node"` // peer 自报的节点标识
}

PeerInfo 记录一个已注册 sp-2 节点的信息。

type PeerRegistry

type PeerRegistry struct {
	// contains filtered or unexported fields
}

PeerRegistry 管理已注册的 sp-2 节点。 sp-1 持有此 Registry;sp-2 通过 POST /api/internal/register 心跳注册。

func NewPeerRegistry

func NewPeerRegistry(logger *zap.Logger, manager *Manager) *PeerRegistry

NewPeerRegistry 创建 PeerRegistry。

func (*PeerRegistry) Deregister

func (pr *PeerRegistry) Deregister(id string)

Deregister 主动注销一个 peer(优雅下线时调用)。

func (*PeerRegistry) EvictStale

func (pr *PeerRegistry) EvictStale()

EvictStale 将超过 TTL 未心跳的 peer 标记为不健康,并从路由表中移除。 通常由后台 goroutine 定期调用。

func (*PeerRegistry) Peers

func (pr *PeerRegistry) Peers() []PeerInfo

Peers 返回所有 peer 的快照(含不健康的)。

func (*PeerRegistry) Register

func (pr *PeerRegistry) Register(id, addr, sourceNode string, weight int)

Register 注册或更新一个 peer(心跳调用)。

type RegisterPayload

type RegisterPayload struct {
	ID         string `json:"id"`
	Addr       string `json:"addr"`
	Weight     int    `json:"weight"`
	SourceNode string `json:"source_node"`
}

RegisterPayload 心跳注册请求体。

type Reporter

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter 运行在 sp-2 上,周期性地:

  1. 向 sp-1 发送心跳注册(POST /api/internal/register)
  2. 批量上报本地采集的 usage 记录(POST /api/internal/usage)

func NewReporter

func NewReporter(logger *zap.Logger, cfg ReporterConfig, usageRepo *db.UsageRepo) *Reporter

NewReporter 创建 Reporter。

func (*Reporter) ReportUsage

func (r *Reporter) ReportUsage(records []db.UsageRecord) error

ReportUsage 立即上报一批 usage 记录(供调用方手动调用或测试)。

func (*Reporter) Start

func (r *Reporter) Start(ctx context.Context)

Start 启动后台上报 goroutine。

type ReporterConfig

type ReporterConfig struct {
	SP1Addr      string
	SelfID       string
	SelfAddr     string
	SelfWeight   int
	Interval     time.Duration
	SharedSecret string // 内部 API 共享密钥(Bearer token)
}

ReporterConfig 配置 Reporter。

type RoutingEntry

type RoutingEntry struct {
	ID      string `json:"id"`
	Addr    string `json:"addr"`
	Weight  int    `json:"weight"`
	Healthy bool   `json:"healthy"`
}

RoutingEntry 单个 s-proxy 节点的路由信息。

type RoutingTable

type RoutingTable struct {
	Version int64          `json:"version"`
	Entries []RoutingEntry `json:"entries"`
}

RoutingTable 路由表(版本化)。 Version 单调递增;c-proxy 保存本地版本,响应头版本更大时则更新。

func DecodeRoutingTable

func DecodeRoutingTable(encoded string) (*RoutingTable, error)

DecodeRoutingTable 从 Base64+JSON 字符串解析路由表。

func LoadFromDir

func LoadFromDir(dir string) (*RoutingTable, error)

LoadFromDir 从指定目录下的 routing-cache.json 加载路由表。 文件不存在时返回 nil, nil(无缓存)。

func (*RoutingTable) Encode

func (rt *RoutingTable) Encode() (string, error)

Encode 将路由表序列化为 Base64+JSON 字符串(用于放入响应头)。

func (*RoutingTable) SaveToDir

func (rt *RoutingTable) SaveToDir(dir string) error

SaveToDir 将路由表持久化到指定目录下的 routing-cache.json。

type UsageReportPayload

type UsageReportPayload struct {
	SourceNode string           `json:"source_node"`
	Records    []db.UsageRecord `json:"records"`
}

UsageReportPayload 用量批量上报请求体。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL