grpc

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PluginKeyGateway    = "gateway"
	PluginKeyExtension  = "extension"
	PluginKeyMiddleware = "middleware"
)

PluginMap 插件类型到 go-plugin.Plugin 的映射键名

View Source
const MetadataRequestIDKey = "x-request-id"

MetadataRequestIDKey 是 gRPC metadata 中 request_id 的键。 gRPC metadata key 会被转为小写,这里直接用小写定义避免歧义。 用于 core→plugin 透传 request_id,让插件 server 端日志能跨进程串起来。

View Source
const PluginGRPCMaxMessageBytes = 64 * 1024 * 1024

PluginGRPCMaxMessageBytes 是插件 gRPC 服务端单条消息最大字节数(收/发同值)。 默认 4 MB 经常被大段 LLM 响应或翻译后的 SSE 事件击穿,统一抬到 64 MB; 必须与 core 侧 ClientConfig.GRPCDialOptions 中的上限保持一致。

Variables

View Source
var Handshake = plugin.HandshakeConfig{
	ProtocolVersion:  1,
	MagicCookieKey:   "AIRGATE_PLUGIN",
	MagicCookieValue: "airgate-v1",
}

Handshake 统一握手配置,核心和插件必须使用相同值

Functions

func LoggingStreamClientInterceptor added in v0.1.1

func LoggingStreamClientInterceptor() grpc.StreamClientInterceptor

LoggingStreamClientInterceptor 给 core→plugin 的 streaming RPC 加结构化日志(仅在建立 stream 时打点)。

func LoggingStreamServerInterceptor added in v0.1.1

func LoggingStreamServerInterceptor() grpc.StreamServerInterceptor

LoggingStreamServerInterceptor 给插件 server 端 streaming RPC 加结构化日志。

仅在 stream 结束(handler 返回)时打点:成功 Debug、失败 Error。 stream 内部的 chunk 收发由业务侧自行决定要不要打 trace 日志。

func LoggingUnaryClientInterceptor added in v0.1.1

func LoggingUnaryClientInterceptor() grpc.UnaryClientInterceptor

LoggingUnaryClientInterceptor 给 core→plugin 的 unary RPC 加结构化日志。

行为:

  • 进入时把 ctx 中的 request_id(若有)append 到 outgoing metadata,让插件 server 端能拿到
  • 进入时 Debug "grpc_call_start"
  • 失败时 Error "grpc_call_failed",带 grpc_code、duration_ms、error
  • 成功时 Debug "grpc_call_completed",带 duration_ms

高频路径(GetInfo / HealthCheck)只在 debug 级别输出,避免污染 info 流。 所有日志统一使用 sdk.LogFieldXxx 字段名,方便日志采集侧建索引。

func LoggingUnaryServerInterceptor added in v0.1.1

func LoggingUnaryServerInterceptor() grpc.UnaryServerInterceptor

LoggingUnaryServerInterceptor 给插件 server 端 unary RPC 加结构化日志。

行为:

  • 进入时从 incoming metadata 抽取 x-request-id(若有),写入 ctx 并派生带 request_id 的 logger
  • 失败时 Error "grpc_server_handle_failed"
  • 成功时 Debug "grpc_server_handle_completed"

注意:这里把 ctx 中的 logger 替换为派生 logger,业务 handler 用 sdk.LoggerFromContext(ctx) 即可拿到带 request_id 的 logger,自然形成跨进程链路。

func NewHostClient

func NewHostClient(c pb.HostServiceClient) sdk.Host

NewHostClient 用一个 grpc client 构造 sdk.Host。 一般由 grpcPluginContext.Host() lazy 调用,不建议插件直接构造。

func Serve

func Serve(impl interface{})

Serve 便捷函数:启动插件 gRPC 服务(插件的 main.go 中调用) 自动识别插件类型,注册对应的 gRPC 服务 自动初始化带 module=plugin.<ID> 前缀的全局日志

Types

type ExtensionGRPCClient

type ExtensionGRPCClient struct {
	// contains filtered or unexported fields
}

ExtensionGRPCClient 将 gRPC 客户端包装为 ExtensionPlugin 接口(核心侧使用)

func (*ExtensionGRPCClient) BackgroundTasks

func (c *ExtensionGRPCClient) BackgroundTasks() []sdk.BackgroundTask

func (*ExtensionGRPCClient) GetWebAssets

func (b *ExtensionGRPCClient) GetWebAssets() (map[string][]byte, error)

GetWebAssets 获取插件前端静态资源

func (*ExtensionGRPCClient) HandleHTTPRequest

func (c *ExtensionGRPCClient) HandleHTTPRequest(ctx context.Context, req *pb.HttpRequest) (*pb.HttpResponse, error)

HandleHTTPRequest 代理 HTTP 请求到插件(核心内部调用)

func (*ExtensionGRPCClient) HandleHTTPStreamRequest added in v0.1.1

HandleHTTPStreamRequest 代理流式 HTTP 请求到插件,返回 chunk 流。 调用方逐个 Recv() 读取 HttpResponseChunk,第一个 chunk 包含 status_code 和 headers。

func (*ExtensionGRPCClient) HealthCheck

func (b *ExtensionGRPCClient) HealthCheck(ctx context.Context) error

HealthCheck 健康检查(客户端侧调用)

func (*ExtensionGRPCClient) Info

func (b *ExtensionGRPCClient) Info() sdk.PluginInfo

Info 获取插件信息(带缓存)

func (*ExtensionGRPCClient) Init

func (b *ExtensionGRPCClient) Init(ctx sdk.PluginContext) error

Init 初始化插件

func (*ExtensionGRPCClient) InvalidateCache

func (c *ExtensionGRPCClient) InvalidateCache()

InvalidateCache 清除缓存的插件信息,下次调用时重新获取

func (*ExtensionGRPCClient) Migrate

func (c *ExtensionGRPCClient) Migrate() error

func (*ExtensionGRPCClient) RegisterRoutes

func (c *ExtensionGRPCClient) RegisterRoutes(_ sdk.RouteRegistrar)

func (*ExtensionGRPCClient) RunBackgroundTask

func (c *ExtensionGRPCClient) RunBackgroundTask(ctx context.Context, name string) error

RunBackgroundTask 让插件进程执行已声明的后台任务(由 Core 调度器调用)。 使用独立的 ctx(一般由调用方控制超时),不复用 withTimeout(),因为任务可能较慢。

func (*ExtensionGRPCClient) Start

func (b *ExtensionGRPCClient) Start(ctx context.Context) error

Start 启动插件

func (*ExtensionGRPCClient) Stop

func (b *ExtensionGRPCClient) Stop(ctx context.Context) error

Stop 停止插件

type ExtensionGRPCPlugin

type ExtensionGRPCPlugin struct {
	goplugin.Plugin
	Impl     sdk.ExtensionPlugin
	HostImpl pb.HostServiceServer
}

ExtensionGRPCPlugin 实现扩展插件的 go-plugin 接口

func (*ExtensionGRPCPlugin) GRPCClient

func (p *ExtensionGRPCPlugin) GRPCClient(_ context.Context, broker *goplugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error)

func (*ExtensionGRPCPlugin) GRPCServer

func (p *ExtensionGRPCPlugin) GRPCServer(broker *goplugin.GRPCBroker, s *grpc.Server) error

type ExtensionGRPCServer

type ExtensionGRPCServer struct {
	pb.UnimplementedExtensionServiceServer
	Impl sdk.ExtensionPlugin
	// contains filtered or unexported fields
}

ExtensionGRPCServer 将 ExtensionPlugin 包装为 gRPC 服务端

func (*ExtensionGRPCServer) GetBackgroundTasks

func (s *ExtensionGRPCServer) GetBackgroundTasks(_ context.Context, _ *pb.Empty) (*pb.BackgroundTasksResponse, error)

func (*ExtensionGRPCServer) HandleRequest

func (s *ExtensionGRPCServer) HandleRequest(ctx context.Context, req *pb.HttpRequest) (*pb.HttpResponse, error)

func (*ExtensionGRPCServer) HandleStreamRequest

func (*ExtensionGRPCServer) Migrate

func (s *ExtensionGRPCServer) Migrate(_ context.Context, _ *pb.Empty) (*pb.Empty, error)

func (*ExtensionGRPCServer) RunBackgroundTask

func (s *ExtensionGRPCServer) RunBackgroundTask(ctx context.Context, req *pb.RunBackgroundTaskRequest) (*pb.Empty, error)

RunBackgroundTask 由 Core 调度器周期触发,按名查表执行 Handler。

type GatewayGRPCClient

type GatewayGRPCClient struct {
	// contains filtered or unexported fields
}

GatewayGRPCClient 把 gRPC 客户端包装成 GatewayPlugin 接口,供 Core 消费。

func (*GatewayGRPCClient) Forward

func (*GatewayGRPCClient) GetWebAssets

func (b *GatewayGRPCClient) GetWebAssets() (map[string][]byte, error)

GetWebAssets 获取插件前端静态资源

func (*GatewayGRPCClient) HandleHTTPRequest

func (b *GatewayGRPCClient) HandleHTTPRequest(ctx context.Context, method, path, query string, headers http.Header, body []byte) (int, http.Header, []byte, error)

HandleHTTPRequest 通用请求代理,Core 透传请求给插件

func (*GatewayGRPCClient) HandleWebSocket

func (c *GatewayGRPCClient) HandleWebSocket(ctx context.Context, conn sdk.WebSocketConn) (sdk.ForwardOutcome, error)

HandleWebSocket 通过 gRPC 双向流处理 WebSocket(Core 侧调用)。

func (*GatewayGRPCClient) HealthCheck

func (b *GatewayGRPCClient) HealthCheck(ctx context.Context) error

HealthCheck 健康检查(客户端侧调用)

func (*GatewayGRPCClient) Info

func (b *GatewayGRPCClient) Info() sdk.PluginInfo

Info 获取插件信息(带缓存)

func (*GatewayGRPCClient) Init

func (b *GatewayGRPCClient) Init(ctx sdk.PluginContext) error

Init 初始化插件

func (*GatewayGRPCClient) InvalidateCache

func (c *GatewayGRPCClient) InvalidateCache()

InvalidateCache 清除所有元数据缓存,下次调用时重新从插件获取。 典型场景:ConfigWatcher.OnConfigUpdate 后调用。

func (*GatewayGRPCClient) Models

func (c *GatewayGRPCClient) Models() []sdk.ModelInfo

func (*GatewayGRPCClient) Platform

func (c *GatewayGRPCClient) Platform() string

func (*GatewayGRPCClient) QueryQuota

func (c *GatewayGRPCClient) QueryQuota(ctx context.Context, credentials map[string]string) (*sdk.QuotaInfo, error)

func (*GatewayGRPCClient) Routes

func (c *GatewayGRPCClient) Routes() []sdk.RouteDefinition

func (*GatewayGRPCClient) Start

func (b *GatewayGRPCClient) Start(ctx context.Context) error

Start 启动插件

func (*GatewayGRPCClient) Stop

func (b *GatewayGRPCClient) Stop(ctx context.Context) error

Stop 停止插件

func (*GatewayGRPCClient) ValidateAccount

func (c *GatewayGRPCClient) ValidateAccount(ctx context.Context, credentials map[string]string) error

type GatewayGRPCPlugin

type GatewayGRPCPlugin struct {
	goplugin.Plugin
	Impl     sdk.GatewayPlugin
	HostImpl pb.HostServiceServer // host 侧注入;plugin 侧为 nil
}

GatewayGRPCPlugin 实现 hashicorp/go-plugin.GRPCPlugin 接口

HostImpl 字段由 Core 在构造 ClientConfig 时注入。当 HostImpl 非 nil 时, GRPCClient 钩子会通过 GRPCBroker 启一条新的 stream,注册 HostService server, 把 stream id 通过 pluginBase.hostBrokerID 透传给后续 Init 调用。

插件进程构造 GRPCServer 时不会用到 HostImpl(HostImpl 只在 host 侧有值), 所以插件二进制 main.go 里 Serve(impl) 时不需要也不能填 HostImpl。

func (*GatewayGRPCPlugin) GRPCClient

func (p *GatewayGRPCPlugin) GRPCClient(_ context.Context, broker *goplugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error)

func (*GatewayGRPCPlugin) GRPCServer

func (p *GatewayGRPCPlugin) GRPCServer(broker *goplugin.GRPCBroker, s *grpc.Server) error

type GatewayGRPCServer

type GatewayGRPCServer struct {
	pb.UnimplementedGatewayServiceServer
	Impl sdk.GatewayPlugin
}

GatewayGRPCServer 将 GatewayPlugin 包装为 gRPC 服务端。

func (*GatewayGRPCServer) Forward

func (*GatewayGRPCServer) ForwardStream

func (*GatewayGRPCServer) GetModels

func (s *GatewayGRPCServer) GetModels(_ context.Context, _ *pb.Empty) (*pb.ModelsResponse, error)

func (*GatewayGRPCServer) GetPlatform

func (s *GatewayGRPCServer) GetPlatform(_ context.Context, _ *pb.Empty) (*pb.StringResponse, error)

func (*GatewayGRPCServer) GetRoutes

func (s *GatewayGRPCServer) GetRoutes(_ context.Context, _ *pb.Empty) (*pb.RoutesResponse, error)

func (*GatewayGRPCServer) HandleWebSocket

HandleWebSocket 处理核心发来的 WebSocket 双向流 将 gRPC 双向流包装为 sdk.WebSocketConn,传给插件的 HandleWebSocket()

func (*GatewayGRPCServer) QueryQuota

func (*GatewayGRPCServer) ValidateAccount

func (s *GatewayGRPCServer) ValidateAccount(ctx context.Context, req *pb.CredentialsRequest) (*pb.Empty, error)

type MiddlewareGRPCClient

type MiddlewareGRPCClient struct {
	// contains filtered or unexported fields
}

MiddlewareGRPCClient 把 pb.MiddlewareServiceClient 包装成给 core 用的 plain Go API。

嵌入 pluginBase 自动获得 Info / Init / Start / Stop / GetWebAssets / HealthCheck 等基础能力,与 GatewayGRPCClient / ExtensionGRPCClient 平行。

失败语义(ADR-0001 Decision 2):transport 层 error 在 core 侧由 manager 转化为 log warn + 跳过本次调用。本 client 只做 protobuf 序列化,不吞 error。

func (*MiddlewareGRPCClient) GetWebAssets

func (b *MiddlewareGRPCClient) GetWebAssets() (map[string][]byte, error)

GetWebAssets 获取插件前端静态资源

func (*MiddlewareGRPCClient) HandleHTTPRequest

func (b *MiddlewareGRPCClient) HandleHTTPRequest(ctx context.Context, method, path, query string, headers http.Header, body []byte) (int, http.Header, []byte, error)

HandleHTTPRequest 通用请求代理,Core 透传请求给插件

func (*MiddlewareGRPCClient) HealthCheck

func (b *MiddlewareGRPCClient) HealthCheck(ctx context.Context) error

HealthCheck 健康检查(客户端侧调用)

func (*MiddlewareGRPCClient) Info

func (b *MiddlewareGRPCClient) Info() sdk.PluginInfo

Info 获取插件信息(带缓存)

func (*MiddlewareGRPCClient) Init

func (b *MiddlewareGRPCClient) Init(ctx sdk.PluginContext) error

Init 初始化插件

func (*MiddlewareGRPCClient) OnForwardBegin

OnForwardBegin 调用插件的 OnForwardBegin RPC。

ctx 由 core 侧附 deadline;超时或 transport error 都会返回 error, 上层负责跳过本 middleware(不阻塞主流程)。

func (*MiddlewareGRPCClient) OnForwardEnd

func (c *MiddlewareGRPCClient) OnForwardEnd(ctx context.Context, evt *sdk.MiddlewareEvent) error

OnForwardEnd 调用插件的 OnForwardEnd RPC。

func (*MiddlewareGRPCClient) Start

func (b *MiddlewareGRPCClient) Start(ctx context.Context) error

Start 启动插件

func (*MiddlewareGRPCClient) Stop

func (b *MiddlewareGRPCClient) Stop(ctx context.Context) error

Stop 停止插件

type MiddlewareGRPCPlugin

type MiddlewareGRPCPlugin struct {
	goplugin.Plugin
	Impl     sdk.MiddlewarePlugin
	HostImpl pb.HostServiceServer
}

MiddlewareGRPCPlugin 实现中间件插件的 go-plugin 接口(ADR-0001 Decision 2)。

HostImpl 用法与 GatewayGRPCPlugin 相同:core 侧注入 HostService 实现, 在 GRPCClient 钩子里通过 GRPCBroker 启反向 stream。

func (*MiddlewareGRPCPlugin) GRPCClient

func (p *MiddlewareGRPCPlugin) GRPCClient(_ context.Context, broker *goplugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error)

func (*MiddlewareGRPCPlugin) GRPCServer

func (p *MiddlewareGRPCPlugin) GRPCServer(broker *goplugin.GRPCBroker, s *grpc.Server) error

type MiddlewareGRPCServer

type MiddlewareGRPCServer struct {
	pb.UnimplementedMiddlewareServiceServer
	Impl sdk.MiddlewarePlugin
}

MiddlewareGRPCServer 把 sdk.MiddlewarePlugin 实现包成 gRPC server。

失败语义:插件代码返回 error 时,server 仍然返回 (response, nil) 给 core。 这样 core 在 transport 层不会看到 error,由 core 自己根据 response 内容判断 (或在 core 侧做 deadline 超时控制)。这是 ADR-0001 Decision 2 "middleware 永远 不能 block 生产" 的落地点。

目前 server 端不主动吞 error;client 端会把 transport error 转化为 log warn 然后让 core 跳过这个 middleware。两层都有保护。

func (*MiddlewareGRPCServer) OnForwardBegin

func (*MiddlewareGRPCServer) OnForwardEnd

func (s *MiddlewareGRPCServer) OnForwardEnd(ctx context.Context, evt *pb.MiddlewareEvent) (*pb.Empty, error)

type PluginGRPCServer

type PluginGRPCServer struct {
	pb.UnimplementedPluginServiceServer
	Impl   sdk.Plugin
	Broker *goplugin.GRPCBroker
}

PluginGRPCServer 将 sdk.Plugin 实现包装为 gRPC 服务端

Broker 字段由 GatewayGRPCPlugin / ExtensionGRPCPlugin 在 GRPCServer 钩子里注入。 它代表插件进程侧的 hashicorp/go-plugin GRPCBroker,用于通过 broker.Dial() 拿到 Core 暴露的 HostService 反向连接。

func (*PluginGRPCServer) GetInfo

func (*PluginGRPCServer) GetWebAssets

func (s *PluginGRPCServer) GetWebAssets(_ context.Context, _ *pb.Empty) (*pb.WebAssetsResponse, error)

GetWebAssets 获取插件的前端静态资源

func (*PluginGRPCServer) HandleRequest

func (s *PluginGRPCServer) HandleRequest(ctx context.Context, req *pb.HttpRequest) (*pb.HttpResponse, error)

HandleRequest 通用请求代理,插件实现 RequestHandler 接口即可处理自定义请求

func (*PluginGRPCServer) HealthCheck

func (s *PluginGRPCServer) HealthCheck(ctx context.Context, _ *pb.Empty) (*pb.Empty, error)

HealthCheck 健康检查,如果插件实现了 HealthChecker 接口则调用,否则默认返回成功

func (*PluginGRPCServer) Init

func (s *PluginGRPCServer) Init(_ context.Context, req *pb.InitRequest) (*pb.Empty, error)

func (*PluginGRPCServer) Start

func (s *PluginGRPCServer) Start(ctx context.Context, _ *pb.Empty) (*pb.Empty, error)

func (*PluginGRPCServer) Stop

func (s *PluginGRPCServer) Stop(ctx context.Context, _ *pb.Empty) (*pb.Empty, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL