Documentation
¶
Index ¶
- Constants
- Variables
- func DoRequest(ctx context.Context, envID, method, path string, body []byte) (int, []byte, error)
- func HasActiveTunnel(envID string) bool
- func ProxyHTTPRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
- func ProxyRequest(ctx context.Context, tunnel *AgentTunnel, method, path, query string, ...) (int, map[string]string, []byte, error)
- func ProxyWebSocketRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
- func StartTunnelClientWithErrors(ctx context.Context, cfg *config.Config, handler http.Handler) (<-chan error, error)
- type AgentTunnel
- type EdgeAwareClient
- type EdgeResponse
- type EnvironmentResolver
- type PendingRequest
- type StatusUpdateCallback
- type TunnelClient
- type TunnelConn
- type TunnelMessage
- type TunnelMessageType
- type TunnelRegistry
- type TunnelServer
Constants ¶
const (
	// DefaultHeartbeatInterval is how often the client sends heartbeats
	DefaultHeartbeatInterval = 30 * time.Second
	// DefaultWriteTimeout is the timeout for write operations
	DefaultWriteTimeout = 10 * time.Second
	// DefaultRequestTimeout is the timeout for executing local requests
	DefaultRequestTimeout = 5 * time.Minute
)
const (
	// DefaultProxyTimeout is the default timeout for proxied requests
	DefaultProxyTimeout = 5 * time.Minute
)
const (
	// TunnelStaleTimeout is how long before a tunnel is considered stale
	TunnelStaleTimeout = 2 * time.Minute
)
Variables ¶
var DefaultEdgeAwareClient = NewEdgeAwareClient(30 * time.Second)
DefaultEdgeAwareClient is a singleton client with reasonable defaults
Functions ¶
func DoRequest ¶
DoRequest performs an HTTP request through an edge tunnel. This is for service-level calls that need to route through the tunnel. Returns (statusCode, responseBody, error).
func HasActiveTunnel ¶
HasActiveTunnel checks if an environment has an active edge tunnel
func ProxyHTTPRequest ¶
func ProxyHTTPRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
ProxyHTTPRequest is a helper that proxies a gin context through a tunnel
func ProxyRequest ¶
func ProxyRequest(ctx context.Context, tunnel *AgentTunnel, method, path, query string, headers map[string]string, body []byte) (int, map[string]string, []byte, error)
ProxyRequest sends an HTTP request through an edge tunnel. Returns the response status, headers, and body.
func ProxyWebSocketRequest ¶
func ProxyWebSocketRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)
ProxyWebSocketRequest proxies a WebSocket upgrade through an edge tunnel. This handles logs, stats, and other streaming endpoints.
Types ¶
type AgentTunnel ¶
type AgentTunnel struct {
EnvironmentID string
Conn *TunnelConn
Pending sync.Map // map[string]*PendingRequest
ConnectedAt time.Time
LastHeartbeat time.Time
// contains filtered or unexported fields
}
AgentTunnel represents an active tunnel connection from an edge agent
func NewAgentTunnel ¶
func NewAgentTunnel(envID string, conn *websocket.Conn) *AgentTunnel
NewAgentTunnel creates a new agent tunnel
func (*AgentTunnel) GetLastHeartbeat ¶
func (t *AgentTunnel) GetLastHeartbeat() time.Time
GetLastHeartbeat returns the last heartbeat time
func (*AgentTunnel) UpdateHeartbeat ¶
func (t *AgentTunnel) UpdateHeartbeat()
UpdateHeartbeat updates the last heartbeat timestamp
type EdgeAwareClient ¶
type EdgeAwareClient struct {
// contains filtered or unexported fields
}
EdgeAwareClient is an HTTP client that automatically routes requests to edge environments through the WebSocket tunnel instead of direct HTTP.
func NewEdgeAwareClient ¶
func NewEdgeAwareClient(timeout time.Duration) *EdgeAwareClient
NewEdgeAwareClient creates a new edge-aware HTTP client
func (*EdgeAwareClient) DoForEnvironment ¶
func (c *EdgeAwareClient) DoForEnvironment(ctx context.Context, envID string, isEdge bool, method string, url string, path string, headers map[string]string, body []byte) (*EdgeResponse, error)
DoForEnvironment makes an HTTP request, automatically routing through the edge tunnel if the environment is an edge environment with an active tunnel. Parameters:
- ctx: request context
- envID: environment ID (used to find tunnel for edge envs)
- isEdge: whether this is an edge environment
- method: HTTP method (GET, POST, etc.)
- url: full URL for direct requests (ignored for edge, only path is used)
- path: API path (e.g., "/api/health") - used for both edge and non-edge
- headers: HTTP headers to include
- body: request body (can be nil)
Returns EdgeResponse with status code, body bytes, and headers
type EdgeResponse ¶
EdgeResponse wraps the response from either direct HTTP or tunnel request
type EnvironmentResolver ¶
EnvironmentResolver resolves an agent token to an environment ID
type PendingRequest ¶
type PendingRequest struct {
ResponseCh chan *TunnelMessage
CreatedAt time.Time
}
PendingRequest tracks an in-flight request waiting for response
type StatusUpdateCallback ¶
StatusUpdateCallback is called when an edge agent connects or disconnects. The connected parameter is true on connect, false on disconnect.
type TunnelClient ¶
type TunnelClient struct {
// contains filtered or unexported fields
}
TunnelClient represents the agent-side tunnel client
func NewTunnelClient ¶
func NewTunnelClient(cfg *config.Config, handler http.Handler) *TunnelClient
NewTunnelClient creates a new tunnel client
func (*TunnelClient) StartWithErrorChan ¶
func (c *TunnelClient) StartWithErrorChan(ctx context.Context, errCh chan error)
StartWithErrorChan runs the tunnel client and optionally emits connection errors.
type TunnelConn ¶
type TunnelConn struct {
// contains filtered or unexported fields
}
TunnelConn wraps a WebSocket connection with send/receive helpers
func NewTunnelConn ¶
func NewTunnelConn(conn *websocket.Conn) *TunnelConn
NewTunnelConn creates a new tunnel connection wrapper
func (*TunnelConn) IsClosed ¶
func (t *TunnelConn) IsClosed() bool
IsClosed returns whether the connection is closed
func (*TunnelConn) Receive ¶
func (t *TunnelConn) Receive() (*TunnelMessage, error)
Receive receives a tunnel message from the connection
func (*TunnelConn) Send ¶
func (t *TunnelConn) Send(msg *TunnelMessage) error
Send sends a tunnel message over the connection
func (*TunnelConn) SendRequest ¶
func (t *TunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
SendRequest is a helper for the manager side to send a request and wait for response
type TunnelMessage ¶
type TunnelMessage struct {
ID string `json:"id"` // Unique request/stream ID
Type TunnelMessageType `json:"type"` // Message type
Method string `json:"method,omitempty"` // HTTP method for requests
Path string `json:"path,omitempty"` // Request path
Query string `json:"query,omitempty"` // Query string
Headers map[string]string `json:"headers,omitempty"` // HTTP headers
Body []byte `json:"body,omitempty"` // Request/response body
WSMessageType int `json:"ws_message_type,omitempty"` // WebSocket message type
Status int `json:"status,omitempty"` // HTTP status for responses
}
TunnelMessage represents a message sent over the edge tunnel
func (*TunnelMessage) MarshalJSON ¶
func (m *TunnelMessage) MarshalJSON() ([]byte, error)
MarshalJSON is a custom marshaler that handles a nil body as empty.
type TunnelMessageType ¶
type TunnelMessageType string
TunnelMessageType represents the type of message sent over the tunnel
const (
	// MessageTypeRequest is sent from manager to agent to initiate a request
	MessageTypeRequest TunnelMessageType = "request"
	// MessageTypeResponse is sent from agent to manager with the response
	MessageTypeResponse TunnelMessageType = "response"
	// MessageTypeHeartbeat is sent by agents to keep the connection alive
	MessageTypeHeartbeat TunnelMessageType = "heartbeat"
	// MessageTypeHeartbeatAck is sent by manager to acknowledge a heartbeat
	MessageTypeHeartbeatAck TunnelMessageType = "heartbeat_ack"
	// MessageTypeStreamData is sent for streaming responses (logs, stats)
	MessageTypeStreamData TunnelMessageType = "stream_data"
	// MessageTypeStreamEnd indicates end of a stream
	MessageTypeStreamEnd TunnelMessageType = "stream_end"
	// MessageTypeWebSocketStart starts a WebSocket stream for logs/stats
	MessageTypeWebSocketStart TunnelMessageType = "ws_start"
	// MessageTypeWebSocketData is a WebSocket message in either direction
	MessageTypeWebSocketData TunnelMessageType = "ws_data"
	// MessageTypeWebSocketClose closes a WebSocket stream
	MessageTypeWebSocketClose TunnelMessageType = "ws_close"
)
type TunnelRegistry ¶
type TunnelRegistry struct {
// contains filtered or unexported fields
}
TunnelRegistry manages active edge agent tunnel connections
func GetRegistry ¶
func GetRegistry() *TunnelRegistry
GetRegistry returns the global tunnel registry
func NewTunnelRegistry ¶
func NewTunnelRegistry() *TunnelRegistry
NewTunnelRegistry creates a new tunnel registry
func (*TunnelRegistry) CleanupStale ¶
func (r *TunnelRegistry) CleanupStale(maxAge time.Duration) int
CleanupStale removes tunnels that haven't had a heartbeat within the given duration
func (*TunnelRegistry) Get ¶
func (r *TunnelRegistry) Get(envID string) (*AgentTunnel, bool)
Get retrieves a tunnel by environment ID
func (*TunnelRegistry) Register ¶
func (r *TunnelRegistry) Register(envID string, tunnel *AgentTunnel)
Register adds a tunnel to the registry, closing any existing tunnel for the same env
func (*TunnelRegistry) Unregister ¶
func (r *TunnelRegistry) Unregister(envID string)
Unregister removes a tunnel from the registry
type TunnelServer ¶
type TunnelServer struct {
// contains filtered or unexported fields
}
TunnelServer handles incoming edge agent connections on the manager side
func NewTunnelServer ¶
func NewTunnelServer(resolver EnvironmentResolver, statusCallback StatusUpdateCallback) *TunnelServer
NewTunnelServer creates a new tunnel server
func RegisterTunnelRoutes ¶
func RegisterTunnelRoutes(ctx context.Context, group *gin.RouterGroup, resolver EnvironmentResolver, statusCallback StatusUpdateCallback) *TunnelServer
RegisterTunnelRoutes registers the tunnel WebSocket endpoint and returns the server for graceful shutdown. Call server.WaitForCleanupDone() after canceling the context.
func (*TunnelServer) HandleConnect ¶
func (s *TunnelServer) HandleConnect(c *gin.Context)
HandleConnect is the WebSocket handler for edge agent connections. This is registered at /api/tunnel/connect.
func (*TunnelServer) StartCleanupLoop ¶
func (s *TunnelServer) StartCleanupLoop(ctx context.Context)
StartCleanupLoop periodically cleans up stale tunnels
func (*TunnelServer) WaitForCleanupDone ¶
func (s *TunnelServer) WaitForCleanupDone()
WaitForCleanupDone blocks until the cleanup loop has stopped