Documentation
¶
Index ¶
- Constants
- Variables
- func DoRequest(ctx context.Context, envID, method, path string, body []byte) (int, []byte, error)
- func GetActiveTunnelTransport(envID string) (string, bool)
- func HasActiveTunnel(envID string) bool
- func NormalizeEdgeTransport(value string) string
- func ProxyHTTPRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
- func ProxyRequest(ctx context.Context, tunnel *AgentTunnel, method, path, query string, ...) (int, map[string]string, []byte, error)
- func ProxyWebSocketRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
- func PublishEventToManager(event *TunnelEvent) error
- func SetDefaultRegistry(registry *TunnelRegistry)
- func StartTunnelClientWithErrors(ctx context.Context, cfg *config.Config, handler http.Handler) (<-chan error, error)
- func UseGRPCEdgeTransport(cfg *config.Config) bool
- func UseWebSocketEdgeTransport(cfg *config.Config) bool
- type AgentTunnel
- type EdgeAwareClient
- type EdgeResponse
- type EnvironmentResolver
- type EventCallback
- type GRPCAgentTunnelConn
- func (t *GRPCAgentTunnelConn) Close() error
- func (t *GRPCAgentTunnelConn) IsClosed() bool
- func (t *GRPCAgentTunnelConn) IsExpectedReceiveError(err error) bool
- func (t *GRPCAgentTunnelConn) Receive() (*TunnelMessage, error)
- func (t *GRPCAgentTunnelConn) Send(msg *TunnelMessage) error
- func (t *GRPCAgentTunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
- type GRPCManagerTunnelConn
- func (t *GRPCManagerTunnelConn) Close() error
- func (t *GRPCManagerTunnelConn) IsClosed() bool
- func (t *GRPCManagerTunnelConn) IsExpectedReceiveError(err error) bool
- func (t *GRPCManagerTunnelConn) Receive() (*TunnelMessage, error)
- func (t *GRPCManagerTunnelConn) Send(msg *TunnelMessage) error
- func (t *GRPCManagerTunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
- type PendingRequest
- type StatusUpdateCallback
- type TunnelClient
- type TunnelConn
- func (t *TunnelConn) Close() error
- func (t *TunnelConn) IsClosed() bool
- func (t *TunnelConn) IsExpectedReceiveError(err error) bool
- func (t *TunnelConn) Receive() (*TunnelMessage, error)
- func (t *TunnelConn) Send(msg *TunnelMessage) error
- func (t *TunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
- type TunnelConnection
- type TunnelEvent
- type TunnelMessage
- type TunnelMessageType
- type TunnelRegistry
- type TunnelRuntimeState
- type TunnelServer
- func (s *TunnelServer) Connect(...) error
- func (s *TunnelServer) GRPCServerOptions(ctx context.Context) []grpc.ServerOption
- func (s *TunnelServer) HandleConnect(c *gin.Context)
- func (s *TunnelServer) SetEventCallback(callback EventCallback)
- func (s *TunnelServer) StartCleanupLoop(ctx context.Context)
- func (s *TunnelServer) WaitForCleanupDone()
Constants ¶
const (
// DefaultHeartbeatInterval is how often the client sends heartbeats
DefaultHeartbeatInterval = 30 * time.Second
// DefaultWriteTimeout is the timeout for write operations
DefaultWriteTimeout = 10 * time.Second
// DefaultRequestTimeout is the timeout for executing local requests
DefaultRequestTimeout = 5 * time.Minute
)
const (
// EdgeTransportWebSocket forces WebSocket tunnel transport.
EdgeTransportWebSocket = "websocket"
// EdgeTransportGRPC forces gRPC transport without WebSocket fallback.
EdgeTransportGRPC = "grpc"
// EdgeTransportAuto prefers gRPC and falls back to WebSocket automatically.
EdgeTransportAuto = "auto"
)
const (
// DefaultProxyTimeout is the default timeout for proxied requests
DefaultProxyTimeout = 5 * time.Minute
)
const (
// TunnelStaleTimeout is how long before a tunnel is considered stale.
TunnelStaleTimeout = 2 * time.Minute
)
Variables ¶
var DefaultEdgeAwareClient = NewEdgeAwareClient(30 * time.Second)
DefaultEdgeAwareClient is a singleton client constructed with a 30-second timeout.
var ErrNoActiveAgentTunnel = errors.New("no active edge agent tunnel")
ErrNoActiveAgentTunnel is returned when no active agent tunnel exists for outbound event sync.
Functions ¶
func DoRequest ¶
DoRequest performs an HTTP request through an edge tunnel. This is for service-level calls that need to route through the tunnel. It returns (statusCode, responseBody, error).
func GetActiveTunnelTransport ¶
GetActiveTunnelTransport returns the currently active tunnel transport for an environment.
func HasActiveTunnel ¶
HasActiveTunnel checks if an environment has an active edge tunnel
func NormalizeEdgeTransport ¶
NormalizeEdgeTransport normalizes transport config and defaults to auto-negotiation.
func ProxyHTTPRequest ¶
func ProxyHTTPRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
ProxyHTTPRequest is a helper that proxies a gin context through a tunnel
func ProxyRequest ¶
func ProxyRequest(ctx context.Context, tunnel *AgentTunnel, method, path, query string, headers map[string]string, body []byte) (int, map[string]string, []byte, error)
ProxyRequest sends an HTTP request through an edge tunnel. It returns the response status, headers, and body.
func ProxyWebSocketRequest ¶
func ProxyWebSocketRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
ProxyWebSocketRequest proxies a WebSocket upgrade through an edge tunnel. This handles logs, stats, and other streaming endpoints.
func PublishEventToManager ¶
func PublishEventToManager(event *TunnelEvent) error
PublishEventToManager sends an event from the active agent tunnel to the manager.
func SetDefaultRegistry ¶
func SetDefaultRegistry(registry *TunnelRegistry)
SetDefaultRegistry replaces the process-wide default tunnel registry.
func StartTunnelClientWithErrors ¶
func StartTunnelClientWithErrors(ctx context.Context, cfg *config.Config, handler http.Handler) (<-chan error, error)
StartTunnelClientWithErrors starts the tunnel client and returns a channel for connection errors.
func UseGRPCEdgeTransport ¶
UseGRPCEdgeTransport reports whether gRPC should be attempted.
func UseWebSocketEdgeTransport ¶
UseWebSocketEdgeTransport reports whether WebSocket transport is allowed.
Types ¶
type AgentTunnel ¶
type AgentTunnel struct {
EnvironmentID string
Conn TunnelConnection
Pending sync.Map // map[string]*PendingRequest
ConnectedAt time.Time
LastHeartbeat time.Time
// contains filtered or unexported fields
}
AgentTunnel represents an active tunnel connection from an edge agent
func NewAgentTunnel ¶
func NewAgentTunnel(envID string, conn *websocket.Conn) *AgentTunnel
NewAgentTunnel creates a new agent tunnel.
func NewAgentTunnelWithConn ¶
func NewAgentTunnelWithConn(envID string, conn TunnelConnection) *AgentTunnel
NewAgentTunnelWithConn creates a new agent tunnel from a transport-agnostic connection.
func (*AgentTunnel) GetLastHeartbeat ¶
func (t *AgentTunnel) GetLastHeartbeat() time.Time
GetLastHeartbeat returns the last heartbeat time
func (*AgentTunnel) UpdateHeartbeat ¶
func (t *AgentTunnel) UpdateHeartbeat()
UpdateHeartbeat updates the last heartbeat timestamp
type EdgeAwareClient ¶
type EdgeAwareClient struct {
// contains filtered or unexported fields
}
EdgeAwareClient is an HTTP client that automatically routes requests to edge environments through the active edge tunnel instead of direct HTTP. Works with both gRPC and WebSocket tunnel transports.
func NewEdgeAwareClient ¶
func NewEdgeAwareClient(timeout time.Duration) *EdgeAwareClient
NewEdgeAwareClient creates a new edge-aware HTTP client
func (*EdgeAwareClient) DoForEnvironment ¶
func (c *EdgeAwareClient) DoForEnvironment(
ctx context.Context,
envID string,
isEdge bool,
method string,
url string,
path string,
headers map[string]string,
body []byte,
) (*EdgeResponse, error)
DoForEnvironment makes an HTTP request, automatically routing through the edge tunnel if the environment is an edge environment with an active tunnel. Parameters:
- ctx: request context
- envID: environment ID (used to find tunnel for edge envs)
- isEdge: whether this is an edge environment
- method: HTTP method (GET, POST, etc.)
- url: full URL for direct requests (ignored for edge, only path is used)
- path: API path (e.g., "/api/health") - used for both edge and non-edge
- headers: HTTP headers to include
- body: request body (can be nil)
Returns an EdgeResponse containing the status code, body bytes, and headers.
type EdgeResponse ¶
EdgeResponse wraps the response from either direct HTTP or tunnel request
type EnvironmentResolver ¶
EnvironmentResolver resolves an agent token to an environment ID.
type EventCallback ¶
type EventCallback func(ctx context.Context, environmentID string, event *TunnelEvent) error
EventCallback is called when an edge agent publishes an event.
type GRPCAgentTunnelConn ¶
type GRPCAgentTunnelConn struct {
// contains filtered or unexported fields
}
GRPCAgentTunnelConn wraps the agent-side gRPC tunnel stream.
func NewGRPCAgentTunnelConn ¶
func NewGRPCAgentTunnelConn(stream grpcAgentStream) *GRPCAgentTunnelConn
NewGRPCAgentTunnelConn creates an agent-side gRPC tunnel wrapper.
func (*GRPCAgentTunnelConn) Close ¶
func (t *GRPCAgentTunnelConn) Close() error
Close closes the client send stream.
func (*GRPCAgentTunnelConn) IsClosed ¶
func (t *GRPCAgentTunnelConn) IsClosed() bool
IsClosed returns whether the stream is closed.
func (*GRPCAgentTunnelConn) IsExpectedReceiveError ¶
func (t *GRPCAgentTunnelConn) IsExpectedReceiveError(err error) bool
IsExpectedReceiveError returns true for expected gRPC stream shutdown errors.
func (*GRPCAgentTunnelConn) Receive ¶
func (t *GRPCAgentTunnelConn) Receive() (*TunnelMessage, error)
Receive receives a manager->agent tunnel message from gRPC.
func (*GRPCAgentTunnelConn) Send ¶
func (t *GRPCAgentTunnelConn) Send(msg *TunnelMessage) error
Send sends an agent->manager tunnel message over gRPC.
func (*GRPCAgentTunnelConn) SendRequest ¶
func (t *GRPCAgentTunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
SendRequest sends a request and waits for response.
type GRPCManagerTunnelConn ¶
type GRPCManagerTunnelConn struct {
// contains filtered or unexported fields
}
GRPCManagerTunnelConn wraps the manager-side gRPC tunnel stream.
func NewGRPCManagerTunnelConn ¶
func NewGRPCManagerTunnelConn(stream grpcManagerStream) *GRPCManagerTunnelConn
NewGRPCManagerTunnelConn creates a manager-side gRPC tunnel wrapper.
func (*GRPCManagerTunnelConn) Close ¶
func (t *GRPCManagerTunnelConn) Close() error
Close marks the stream closed on manager side.
func (*GRPCManagerTunnelConn) IsClosed ¶
func (t *GRPCManagerTunnelConn) IsClosed() bool
IsClosed returns whether the stream is closed.
func (*GRPCManagerTunnelConn) IsExpectedReceiveError ¶
func (t *GRPCManagerTunnelConn) IsExpectedReceiveError(err error) bool
IsExpectedReceiveError returns true for expected gRPC stream shutdown errors.
func (*GRPCManagerTunnelConn) Receive ¶
func (t *GRPCManagerTunnelConn) Receive() (*TunnelMessage, error)
Receive receives an agent->manager tunnel message from gRPC.
func (*GRPCManagerTunnelConn) Send ¶
func (t *GRPCManagerTunnelConn) Send(msg *TunnelMessage) error
Send sends a manager->agent tunnel message over gRPC.
func (*GRPCManagerTunnelConn) SendRequest ¶
func (t *GRPCManagerTunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
SendRequest sends a request and waits for response.
type PendingRequest ¶
type PendingRequest struct {
ResponseCh chan *TunnelMessage
CreatedAt time.Time
}
PendingRequest tracks an in-flight request waiting for response.
type StatusUpdateCallback ¶
StatusUpdateCallback is called when an edge agent connects or disconnects. The connected parameter is true on connect, false on disconnect.
type TunnelClient ¶
type TunnelClient struct {
// contains filtered or unexported fields
}
TunnelClient represents the agent-side tunnel client
func NewTunnelClient ¶
func NewTunnelClient(cfg *config.Config, handler http.Handler) *TunnelClient
NewTunnelClient creates a new tunnel client
func (*TunnelClient) StartWithErrorChan ¶
func (c *TunnelClient) StartWithErrorChan(ctx context.Context, errCh chan error)
StartWithErrorChan runs the tunnel client and optionally emits connection errors.
type TunnelConn ¶
type TunnelConn struct {
// contains filtered or unexported fields
}
TunnelConn wraps a WebSocket connection with send/receive helpers.
func NewTunnelConn ¶
func NewTunnelConn(conn *websocket.Conn) *TunnelConn
NewTunnelConn creates a new WebSocket tunnel connection wrapper.
func (*TunnelConn) Close ¶
func (t *TunnelConn) Close() error
Close closes the WebSocket tunnel connection.
func (*TunnelConn) IsClosed ¶
func (t *TunnelConn) IsClosed() bool
IsClosed returns whether the connection is closed.
func (*TunnelConn) IsExpectedReceiveError ¶
func (t *TunnelConn) IsExpectedReceiveError(err error) bool
IsExpectedReceiveError returns true for normal WebSocket close/teardown errors.
func (*TunnelConn) Receive ¶
func (t *TunnelConn) Receive() (*TunnelMessage, error)
Receive receives a tunnel message from the WebSocket connection.
func (*TunnelConn) Send ¶
func (t *TunnelConn) Send(msg *TunnelMessage) error
Send sends a tunnel message over the WebSocket connection.
func (*TunnelConn) SendRequest ¶
func (t *TunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
SendRequest sends a request and waits for response.
type TunnelConnection ¶
type TunnelConnection interface {
Send(msg *TunnelMessage) error
Receive() (*TunnelMessage, error)
IsExpectedReceiveError(err error) bool
Close() error
IsClosed() bool
SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
}
TunnelConnection is the transport contract shared by WebSocket and gRPC wrappers.
type TunnelEvent ¶
type TunnelEvent struct {
Type string `json:"type"`
Severity string `json:"severity,omitempty"`
Title string `json:"title"`
Description string `json:"description,omitempty"`
ResourceType string `json:"resource_type,omitempty"`
ResourceID string `json:"resource_id,omitempty"`
ResourceName string `json:"resource_name,omitempty"`
UserID string `json:"user_id,omitempty"`
Username string `json:"username,omitempty"`
MetadataJSON []byte `json:"metadata_json,omitempty"`
}
TunnelEvent is an event payload sent from an agent to the manager.
type TunnelMessage ¶
type TunnelMessage struct {
ID string `json:"id"` // Unique request/stream ID
Type TunnelMessageType `json:"type"` // Message type
Method string `json:"method,omitempty"` // HTTP method for requests
Path string `json:"path,omitempty"` // Request path
Query string `json:"query,omitempty"` // Query string
Headers map[string]string `json:"headers,omitempty"` // HTTP headers
Body []byte `json:"body,omitempty"` // Request/response body
WSMessageType int `json:"ws_message_type,omitempty"` // WebSocket message type
Status int `json:"status,omitempty"` // HTTP status for responses
Accepted bool `json:"accepted,omitempty"` // Registration accepted
AgentToken string `json:"agent_token,omitempty"` // Register request token
EnvironmentID string `json:"environment_id,omitempty"` // Manager-resolved environment ID
Error string `json:"error,omitempty"` // Error field for register response
Event *TunnelEvent `json:"event,omitempty"` // Agent event payload
}
TunnelMessage represents a transport-agnostic edge tunnel message.
func (*TunnelMessage) MarshalJSON ¶
func (m *TunnelMessage) MarshalJSON() ([]byte, error)
MarshalJSON is a custom marshaler that encodes a nil body as empty.
type TunnelMessageType ¶
type TunnelMessageType string
TunnelMessageType represents the type of message sent over the tunnel.
const (
// MessageTypeRequest is sent from manager to agent to initiate a request.
MessageTypeRequest TunnelMessageType = "request"
// MessageTypeResponse is sent from agent to manager with the response.
MessageTypeResponse TunnelMessageType = "response"
// MessageTypeHeartbeat is sent by agents to keep the connection alive.
MessageTypeHeartbeat TunnelMessageType = "heartbeat"
// MessageTypeHeartbeatAck is sent by manager to acknowledge a heartbeat.
MessageTypeHeartbeatAck TunnelMessageType = "heartbeat_ack"
// MessageTypeStreamData is sent for streaming responses (logs, stats).
MessageTypeStreamData TunnelMessageType = "stream_data"
// MessageTypeStreamEnd indicates end of a stream.
MessageTypeStreamEnd TunnelMessageType = "stream_end"
// MessageTypeWebSocketStart starts a WebSocket stream for logs/stats.
MessageTypeWebSocketStart TunnelMessageType = "ws_start"
// MessageTypeWebSocketData is a WebSocket message in either direction.
MessageTypeWebSocketData TunnelMessageType = "ws_data"
// MessageTypeWebSocketClose closes a WebSocket stream.
MessageTypeWebSocketClose TunnelMessageType = "ws_close"
// MessageTypeRegister is the first message sent by the agent on gRPC transport.
MessageTypeRegister TunnelMessageType = "register"
// MessageTypeRegisterResponse is sent by manager after register validation.
MessageTypeRegisterResponse TunnelMessageType = "register_response"
// MessageTypeEvent carries an event emitted by an agent to the manager.
MessageTypeEvent TunnelMessageType = "event"
)
type TunnelRegistry ¶
type TunnelRegistry struct {
// contains filtered or unexported fields
}
TunnelRegistry manages active edge agent tunnel connections
func GetRegistry ¶
func GetRegistry() *TunnelRegistry
GetRegistry returns the global tunnel registry
func NewTunnelRegistry ¶
func NewTunnelRegistry() *TunnelRegistry
NewTunnelRegistry creates a new tunnel registry
func (*TunnelRegistry) CleanupStale ¶
func (r *TunnelRegistry) CleanupStale(maxAge time.Duration) int
CleanupStale removes tunnels that haven't had a heartbeat within the given duration
func (*TunnelRegistry) Get ¶
func (r *TunnelRegistry) Get(envID string) (*AgentTunnel, bool)
Get retrieves a tunnel by environment ID
func (*TunnelRegistry) Register ¶
func (r *TunnelRegistry) Register(envID string, tunnel *AgentTunnel)
Register adds a tunnel to the registry, closing any existing tunnel for the same environment.
func (*TunnelRegistry) Unregister ¶
func (r *TunnelRegistry) Unregister(envID string)
Unregister removes a tunnel from the registry
type TunnelRuntimeState ¶
TunnelRuntimeState describes the live, in-memory state of an active edge tunnel.
func GetTunnelRuntimeState ¶
func GetTunnelRuntimeState(envID string) (*TunnelRuntimeState, bool)
GetTunnelRuntimeState returns live metadata for an active tunnel.
type TunnelServer ¶
type TunnelServer struct {
// contains filtered or unexported fields
}
TunnelServer handles incoming edge agent connections on the manager side.
func NewTunnelServer ¶
func NewTunnelServer(resolver EnvironmentResolver, statusCallback StatusUpdateCallback) *TunnelServer
NewTunnelServer creates a new tunnel server.
func NewTunnelServerWithRegistry ¶
func NewTunnelServerWithRegistry(registry *TunnelRegistry, resolver EnvironmentResolver, statusCallback StatusUpdateCallback) *TunnelServer
NewTunnelServerWithRegistry creates a new tunnel server using an injected tunnel registry.
func (*TunnelServer) Connect ¶
func (s *TunnelServer) Connect(stream grpc.BidiStreamingServer[tunnelpb.AgentMessage, tunnelpb.ManagerMessage]) error
Connect is the gRPC bidi stream handler for edge agent connections.
func (*TunnelServer) GRPCServerOptions ¶
func (s *TunnelServer) GRPCServerOptions(ctx context.Context) []grpc.ServerOption
GRPCServerOptions returns the stream interceptor chain used by the tunnel service.
func (*TunnelServer) HandleConnect ¶
func (s *TunnelServer) HandleConnect(c *gin.Context)
HandleConnect is the WebSocket handler for edge agent connections. This is registered at /api/tunnel/connect.
func (*TunnelServer) SetEventCallback ¶
func (s *TunnelServer) SetEventCallback(callback EventCallback)
SetEventCallback configures the manager callback invoked for agent events.
func (*TunnelServer) StartCleanupLoop ¶
func (s *TunnelServer) StartCleanupLoop(ctx context.Context)
StartCleanupLoop periodically cleans up stale tunnels.
func (*TunnelServer) WaitForCleanupDone ¶
func (s *TunnelServer) WaitForCleanupDone()
WaitForCleanupDone blocks until the cleanup loop has stopped.