pluginhost

package
v0.0.0-...-1a39f93 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

README

Plugin Host 系統 - 完整實作指南

Plugin Host 是 Detectviz 平台的核心組件,負責管理和執行各種工具插件。經過優化後的系統提供健康狀態管理、負載保護、熔斷機制和 Prometheus 監控整合,確保生產環境的高可用性。

📁 檔案結構說明

  • bridge_server.go:ToolBridge gRPC 伺服端,掛載 mTLS 與攔截器,分派請求至 Registry
  • registry.go:優化的插件註冊中心,支援健康狀態管理、負載保護、熔斷機制和 Prometheus 監控
  • metrics.go:Prometheus 監控整合,提供插件負載、健康狀態和調用統計指標
  • registry_test.go:完整的單元測試,涵蓋並發安全、資源釋放、生命週期管理
  • runtime.go:插件運行時管理,啟停控制、健康檢查
  • observability.go:可觀測性整合,OpenTelemetry 監控與追蹤
  • security.go:mTLS 憑證載入與安全邊界控制
  • interceptors.go:Metadata 攔截、追蹤上下文透傳

🔄 Plugin 生命週期管理

註冊階段
// 創建優化的註冊中心
config := &RegistryConfig{
    MaxLoad:        100,             // 最大負載閾值
    HealthInterval: 30 * time.Second, // 健康檢查間隔
}
registry := NewRegistryWithConfig(config)

// 啟動健康檢查
registry.StartHealthChecks()

// 註冊插件 - 不允許重複註冊
err := registry.Register("http_request", handler)
if err != nil {
    log.Fatal("插件註冊失敗:", err)
}

// 熱替換註冊 - 開發環境使用
err = registry.RegisterOrReplace("http_request", newHandler)
服務階段
  • 接收 gRPC 請求並路由到對應 Handler
  • 自動健康狀態檢查和熔斷保護
  • 負載保護機制,防止插件過載
  • 自動注入追蹤上下文和監控指標
  • 處理超時控制和錯誤回傳
  • Prometheus 指標自動收集和上報
關閉階段
// 優雅關機 - 三階段關閉流程
registry.Shutdown()
// 1. 停止健康檢查,標記所有插件為關閉中
// 2. 等待進行中請求完成(最多2秒)
// 3. 釋放所有插件資源

🛡️ 資源治理與安全

插件介面
// 基本處理器介面
type Handler interface {
    Invoke(ctx context.Context, req *v1.InvokeRequest) (*v1.InvokeResponse, error)
}

// 支援資源釋放的處理器
type ClosableHandler interface {
    Handler
    Close() error  // 實作資源釋放邏輯
}

// 支援健康檢查的處理器(推薦)
type HealthAwareHandler interface {
    ClosableHandler
    HealthCheck() error  // 自定義健康檢查邏輯
}
併發安全保證
  • 使用 sync.RWMutex 保證 Registry 操作的線程安全
  • 讀操作(Lookup, GetPluginStatus)允許並發
  • 寫操作(Register, Unregister)互斥執行
  • 原子操作管理插件負載計數器
健康狀態管理
  • 四級健康狀態:未知、正常、降級、異常、關閉中
  • 自動健康檢查:定期檢查插件狀態,支援自定義邏輯
  • 熔斷機制:異常插件自動熔斷,避免級聯故障
  • 負載保護:超載時自動拒絕新請求,防止系統過載
資源洩漏防護
  • 註冊替換時自動關閉舊 Handler
  • 三階段優雅關機,確保所有資源正確釋放
  • 等待進行中請求完成,避免數據丟失

🧪 測試覆蓋

測試涵蓋以下場景:

  • ✅ 基本註冊與查詢功能
  • ✅ 健康狀態管理和熔斷機制
  • ✅ 負載保護和過載防護
  • ✅ 熱替換與資源自動釋放
  • ✅ 併發操作安全性
  • ✅ 三階段優雅關機流程
  • ✅ Prometheus 監控指標
  • ✅ 錯誤處理與恢復
  • ✅ 性能基準測試

執行測試:

cd go-platform
go test -v ./internal/pluginhost/... -race

🔧 最佳實踐

Handler 實作建議
type MyHandler struct {
    client *http.Client
    wg     sync.WaitGroup  // 追蹤活躍請求
    closed atomic.Bool     // 關閉狀態標記
}

func (h *MyHandler) Invoke(ctx context.Context, req *v1.InvokeRequest) (*v1.InvokeResponse, error) {
    if h.closed.Load() {
        return nil, fmt.Errorf("handler is closed")
    }
    
    h.wg.Add(1)
    defer h.wg.Done()
    
    // 實作處理邏輯...
}

func (h *MyHandler) Close() error {
    h.closed.Store(true)
    h.wg.Wait()  // 等待所有請求完成
    if h.client != nil {
        h.client.CloseIdleConnections()
    }
    return nil
}

func (h *MyHandler) HealthCheck() error {
    if h.closed.Load() {
        return fmt.Errorf("handler is closed")
    }
    // 實作自定義健康檢查邏輯
    return nil
}
監控與狀態查詢
// 獲取插件狀態
status := registry.GetPluginStatus()
for name, pluginStatus := range status {
    statusMap := pluginStatus.(map[string]interface{})
    fmt.Printf("插件 %s: 健康=%s, 負載=%d\n", 
        name, statusMap["health_str"], statusMap["load"])
}

// 更新 Prometheus 指標
registry.UpdateMetrics()

// 獲取插件列表
pluginNames := registry.GetPluginNames()
pluginCount := registry.GetPluginCount()
錯誤處理模式
// 統一錯誤回傳格式
return &v1.InvokeResponse{
    Result: result,
    Status: &statuspb.Status{
        Code:    int32(codes.InvalidArgument),
        Message: "參數驗證失敗",
    },
}, nil  // gRPC 層面不報錯,錯誤信息在 Status 中

🔗 相關資源

  • 契約來源contracts/proto/adk_bridge.proto
  • 生成指令cd contracts && make gen
  • 健康檢查GET /contracts - 檢視契約版本信息
  • 監控端點:集成 OpenTelemetry,支援 traces 和 metrics

📝 維護提醒:任何對插件接口的修改都需要同步更新契約定義,並重新生成跨語言存根檔案。

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ListenAndServe

func ListenAndServe(addr string, tlsCfg *tls.Config, reg *Registry, unary ...grpc.UnaryServerInterceptor) error

監聽與啟動服務

func LoadTLSConfig

func LoadTLSConfig(certData, keyData, caData []byte) (*tls.Config, error)

LoadTLSConfig 由憑證內容載入 mTLS 設定。若任一為空,回傳 nil 代表使用明文(僅限開發)。

func NewGRPCServer

func NewGRPCServer(tlsCfg *tls.Config, unary ...grpc.UnaryServerInterceptor) *grpc.Server

建立 gRPC 伺服器(可掛載 mTLS 與攔截器)

func StartSpan

func StartSpan(ctx context.Context, name string) (context.Context, func())

StartSpan 創建 pluginhost 相關的追蹤 span

func UnaryMetaInterceptor

func UnaryMetaInterceptor() grpc.UnaryServerInterceptor

UnaryMetaInterceptor 取得與透傳必要 metadata(tenant/traceparent/owner 等)

Types

type ClosableHandler

type ClosableHandler interface {
	Handler
	Close() error
}

ClosableHandler 支援資源釋放的處理器

type EnhancedMonitoredHandler

type EnhancedMonitoredHandler struct {
	*MonitoredHandler
	// contains filtered or unexported fields
}

EnhancedMonitoredHandler 增強型監控處理器(支援資源感知)

func NewEnhancedMonitoredHandler

func NewEnhancedMonitoredHandler(pluginID string, handler ResourceAwareHandler, monitor *ResourceMonitor) *EnhancedMonitoredHandler

NewEnhancedMonitoredHandler 創建增強型監控處理器

func (*EnhancedMonitoredHandler) GetDetailedMetrics

func (emh *EnhancedMonitoredHandler) GetDetailedMetrics() map[string]interface{}

GetDetailedMetrics 獲取詳細的監控指標

func (*EnhancedMonitoredHandler) Invoke

Invoke 執行增強型監控請求

func (*EnhancedMonitoredHandler) SetResourceLimits

func (emh *EnhancedMonitoredHandler) SetResourceLimits(maxMemoryBytes int64, maxGoroutines int32, maxConnections int32) error

SetResourceLimits 設置資源限制

type Handler

type Handler interface {
	Invoke(ctx context.Context, req *v1.InvokeRequest) (*v1.InvokeResponse, error)
}

Handler 為每個插件的處理器介面

type HealthAwareHandler

type HealthAwareHandler interface {
	ClosableHandler
	HealthCheck() error
}

HealthAwareHandler 支援健康檢查的處理器

type MonitoredHandler

type MonitoredHandler struct {
	// contains filtered or unexported fields
}

MonitoredHandler 具備資源監控功能的處理器包裝器

func NewMonitoredHandler

func NewMonitoredHandler(pluginID string, handler Handler, monitor *ResourceMonitor) *MonitoredHandler

NewMonitoredHandler 創建新的監控處理器

func (*MonitoredHandler) Close

func (mh *MonitoredHandler) Close() error

Close 實作資源釋放

func (*MonitoredHandler) GetActiveRequests

func (mh *MonitoredHandler) GetActiveRequests() int64

GetActiveRequests 獲取活躍請求數量

func (*MonitoredHandler) Invoke

Invoke 執行請求並進行資源監控

type PluginHealth

type PluginHealth int

插件健康狀態枚舉

const (
	HealthUnknown      PluginHealth = iota // 0: 未知狀態
	HealthOk                               // 1: 正常運行
	HealthDegraded                         // 2: 部分功能可用
	HealthCritical                         // 3: 嚴重錯誤
	HealthShuttingDown                     // 4: 關閉中
)

func (PluginHealth) String

func (h PluginHealth) String() string

String 實作 Stringer 介面

type PluginResourceMetrics

type PluginResourceMetrics struct {
	// 基本統計
	TotalRequests  int64 // 總請求數
	ActiveRequests int64 // 活躍請求數
	TotalErrors    int64 // 總錯誤數
	TotalDuration  int64 // 總執行時間(毫秒)

	// 資源使用
	MemoryUsageBytes int64 // 記憶體使用量(位元組)
	GoroutineCount   int32 // Goroutine 數量
	ConnectionCount  int32 // 連線數量

	// 性能指標
	AvgResponseTimeMs float64 // 平均回應時間
	RequestsPerSecond float64 // 每秒請求數
	ErrorRate         float64 // 錯誤率

	// 時間戳
	LastUpdateTime int64 // 最後更新時間(Unix毫秒)
	StartTime      int64 // 插件啟動時間
}

PluginResourceMetrics 插件級別資源指標

type PluginWrapper

type PluginWrapper struct {
	// contains filtered or unexported fields
}

PluginWrapper 插件封裝結構(包含健康狀態和負載追蹤)

func NewPluginWrapper

func NewPluginWrapper(name string, handler ClosableHandler) *PluginWrapper

NewPluginWrapper 創建新的插件包裝器

func (*PluginWrapper) GetHealth

func (pw *PluginWrapper) GetHealth() PluginHealth

GetHealth 取得健康狀態

func (*PluginWrapper) GetLastCheck

func (pw *PluginWrapper) GetLastCheck() time.Time

GetLastCheck 取得最後檢查時間

func (*PluginWrapper) GetLoad

func (pw *PluginWrapper) GetLoad() int32

GetLoad 取得當前負載

func (*PluginWrapper) SetHealth

func (pw *PluginWrapper) SetHealth(health PluginHealth)

SetHealth 設定健康狀態

type ReadyFunc

type ReadyFunc func()

ReadyFunc 用於在 runtime 完成就緒時通知外部(例如健康檢查)

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry 插件註冊中心(核心數據結構)

func NewRegistry

func NewRegistry() *Registry

NewRegistry 創建新註冊中心

func NewRegistryWithConfig

func NewRegistryWithConfig(config *RegistryConfig) *Registry

NewRegistryWithConfig 使用配置創建註冊中心

func (*Registry) GetPluginCount

func (r *Registry) GetPluginCount() int

GetPluginCount 獲取插件數量

func (*Registry) GetPluginNames

func (r *Registry) GetPluginNames() []string

GetPluginNames 獲取所有插件名稱

func (*Registry) GetPluginStatus

func (r *Registry) GetPluginStatus() map[string]interface{}

GetPluginStatus 獲取插件狀態(用於監控)

func (*Registry) Invoke

func (r *Registry) Invoke(ctx context.Context, pluginName string, req *v1.InvokeRequest) (*v1.InvokeResponse, error)

Invoke 調用插件(含熔斷和負載保護)

func (*Registry) Lookup

func (r *Registry) Lookup(pluginName string) (Handler, bool)

Lookup 查找插件處理器(保持相容性)

func (*Registry) Register

func (r *Registry) Register(name string, handler ClosableHandler) error

Register 註冊插件(執行緒安全)

func (*Registry) RegisterOrReplace

func (r *Registry) RegisterOrReplace(name string, handler ClosableHandler) error

RegisterOrReplace 註冊或替換插件

func (*Registry) Shutdown

func (r *Registry) Shutdown()

Shutdown 優雅關閉流程

func (*Registry) StartHealthChecks

func (r *Registry) StartHealthChecks()

StartHealthChecks 啟動健康檢查協程

func (*Registry) Unregister

func (r *Registry) Unregister(name string) error

Unregister 取消註冊插件

func (*Registry) UpdateMetrics

func (r *Registry) UpdateMetrics()

UpdateMetrics 更新插件指標(由 Registry 定期調用)

type RegistryConfig

type RegistryConfig struct {
	MaxLoad        int32         `yaml:"max_load" json:"max_load"`               // 最大負載,預設 100
	HealthInterval time.Duration `yaml:"health_interval" json:"health_interval"` // 健康檢查間隔,預設 30s
}

RegistryConfig 註冊中心配置

func DefaultRegistryConfig

func DefaultRegistryConfig() *RegistryConfig

DefaultRegistryConfig 預設配置

type ResourceAwareHandler

type ResourceAwareHandler interface {
	Handler

	// GetResourceUsage 返回當前資源使用情況
	GetResourceUsage() (memoryBytes int64, goroutines int32, connections int32)

	// SetResourceLimits 設置資源限制
	SetResourceLimits(maxMemoryBytes int64, maxGoroutines int32, maxConnections int32) error
}

ResourceAwareHandler 支援資源感知的處理器接口

type ResourceMonitor

type ResourceMonitor struct {
	// contains filtered or unexported fields
}

ResourceMonitor 資源監控管理器

func NewResourceMonitor

func NewResourceMonitor(interval time.Duration) (*ResourceMonitor, error)

NewResourceMonitor 創建新的資源監控器

func (*ResourceMonitor) GetAllMetrics

func (rm *ResourceMonitor) GetAllMetrics() map[string]*PluginResourceMetrics

GetAllMetrics 獲取所有插件指標

func (*ResourceMonitor) GetHealthStatus

func (rm *ResourceMonitor) GetHealthStatus() map[string]interface{}

GetHealthStatus 獲取監控健康狀態

func (*ResourceMonitor) GetPluginMetrics

func (rm *ResourceMonitor) GetPluginMetrics(pluginID string) *PluginResourceMetrics

GetPluginMetrics 獲取插件指標

func (*ResourceMonitor) RecordRequest

func (rm *ResourceMonitor) RecordRequest(pluginID string)

RecordRequest 記錄請求開始

func (*ResourceMonitor) RecordRequestComplete

func (rm *ResourceMonitor) RecordRequestComplete(pluginID string, durationMs int64, isError bool)

RecordRequestComplete 記錄請求完成

func (*ResourceMonitor) RegisterPlugin

func (rm *ResourceMonitor) RegisterPlugin(pluginID string)

RegisterPlugin 註冊插件監控

func (*ResourceMonitor) Stop

func (rm *ResourceMonitor) Stop()

Stop 停止監控器

func (*ResourceMonitor) UnregisterPlugin

func (rm *ResourceMonitor) UnregisterPlugin(pluginID string)

UnregisterPlugin 取消註冊插件監控

func (*ResourceMonitor) UpdateResourceUsage

func (rm *ResourceMonitor) UpdateResourceUsage(pluginID string, memoryBytes int64, goroutines int32, connections int32)

UpdateResourceUsage 更新資源使用情況

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

func NewRuntime

func NewRuntime(addr string, tlsCfg *tls.Config, reg *Registry) *Runtime

func (*Runtime) SetOnReady

func (rt *Runtime) SetOnReady(f ReadyFunc)

SetOnReady 設置就緒回呼(可選)

func (*Runtime) Start

func (rt *Runtime) Start(ctx context.Context, unary ...grpc.UnaryServerInterceptor) error

Start 啟動 gRPC ToolBridge 服務(阻塞 Serve 放進 goroutine)

func (*Runtime) Stop

func (rt *Runtime) Stop(_ context.Context) error

type Server

type Server struct {
	v1.UnimplementedToolBridgeServiceServer
	// contains filtered or unexported fields
}

Server 實作 ToolBridge,將請求分派至已註冊的 Handler。

func NewServer

func NewServer(reg *Registry) *Server

func (*Server) Healthz

func (s *Server) Healthz(ctx context.Context, _ *v1.HealthzRequest) (*v1.HealthzResponse, error)

func (*Server) Invoke

func (s *Server) Invoke(ctx context.Context, req *v1.InvokeRequest) (*v1.InvokeResponse, error)

func (*Server) InvokeStream

Streaming 版(保留擴充)

Directories

Path Synopsis
plugins
knowledge
Package knowledge 實作知識庫管理插件
Package knowledge 實作知識庫管理插件
observability/health_aggregator
Package health_aggregator 實作健康指標聚合插件
Package health_aggregator 實作健康指標聚合插件

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL