edge

package
v1.16.0
Warning

This package is not in the latest version of its module.
Published: Mar 6, 2026 License: BSD-3-Clause Imports: 31 Imported by: 0

Documentation

Constants

const (
	// DefaultHeartbeatInterval is how often the client sends heartbeats
	DefaultHeartbeatInterval = 30 * time.Second
	// DefaultWriteTimeout is the timeout for write operations
	DefaultWriteTimeout = 10 * time.Second
	// DefaultRequestTimeout is the timeout for executing local requests
	DefaultRequestTimeout = 5 * time.Minute
)
const (
	// EdgeTransportWebSocket forces WebSocket tunnel transport.
	EdgeTransportWebSocket = "websocket"
	// EdgeTransportGRPC forces gRPC transport without WebSocket fallback.
	EdgeTransportGRPC = "grpc"
	// EdgeTransportAuto prefers gRPC and falls back to WebSocket automatically.
	EdgeTransportAuto = "auto"
)
const (
	// DefaultProxyTimeout is the default timeout for proxied requests
	DefaultProxyTimeout = 5 * time.Minute
)
const (
	// TunnelStaleTimeout is how long before a tunnel is considered stale.
	TunnelStaleTimeout = 2 * time.Minute
)

Variables

var DefaultEdgeAwareClient = NewEdgeAwareClient(30 * time.Second)

DefaultEdgeAwareClient is a shared, package-level client created with a 30-second timeout.

var ErrNoActiveAgentTunnel = errors.New("no active edge agent tunnel")

ErrNoActiveAgentTunnel is returned when no active agent tunnel exists for outbound event sync.

Functions

func DoRequest

func DoRequest(ctx context.Context, envID, method, path string, body []byte) (int, []byte, error)

DoRequest performs an HTTP request through the edge tunnel of the given environment. It is intended for service-level calls that must be routed through the tunnel. It returns the status code, the response body, and an error.

func GetActiveTunnelTransport

func GetActiveTunnelTransport(envID string) (string, bool)

GetActiveTunnelTransport returns the currently active tunnel transport for an environment.

func HasActiveTunnel

func HasActiveTunnel(envID string) bool

HasActiveTunnel checks if an environment has an active edge tunnel

func NormalizeEdgeTransport

func NormalizeEdgeTransport(value string) string

NormalizeEdgeTransport normalizes transport config and defaults to auto-negotiation.
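As a rough illustration of what "normalizes transport config and defaults to auto-negotiation" could look like, the sketch below trims and lowercases the value and maps anything unrecognized to "auto". The trimming, case folding, and fallback rule are assumptions rather than confirmed package behavior, and normalizeTransport is a hypothetical local stand-in, not the package function.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the documented transport constant values.
const (
	transportWebSocket = "websocket"
	transportGRPC      = "grpc"
	transportAuto      = "auto"
)

// normalizeTransport is a hypothetical stand-in for NormalizeEdgeTransport.
// Assumption: input is trimmed and lowercased, and any empty or unrecognized
// value falls back to auto-negotiation.
func normalizeTransport(value string) string {
	switch v := strings.ToLower(strings.TrimSpace(value)); v {
	case transportWebSocket, transportGRPC:
		return v
	default:
		return transportAuto
	}
}

func main() {
	for _, in := range []string{"gRPC", " websocket ", "", "quic"} {
		fmt.Printf("%q -> %q\n", in, normalizeTransport(in))
	}
}
```

Falling back to "auto" for unknown values keeps a misconfigured agent connectable, at the cost of hiding typos in the configuration.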

func ProxyHTTPRequest

func ProxyHTTPRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)

ProxyHTTPRequest is a helper that proxies a gin context through a tunnel

func ProxyRequest

func ProxyRequest(ctx context.Context, tunnel *AgentTunnel, method, path, query string, headers map[string]string, body []byte) (int, map[string]string, []byte, error)

ProxyRequest sends an HTTP request through an edge tunnel. It returns the response status, headers, and body.

func ProxyWebSocketRequest

func ProxyWebSocketRequest(c *gin.Context, tunnel *AgentTunnel, targetPath string)

ProxyWebSocketRequest proxies a WebSocket upgrade through an edge tunnel. This handles logs, stats, and other streaming endpoints.

func PublishEventToManager

func PublishEventToManager(event *TunnelEvent) error

PublishEventToManager sends an event from the active agent tunnel to the manager.

func SetDefaultRegistry

func SetDefaultRegistry(registry *TunnelRegistry)

SetDefaultRegistry replaces the process-wide default tunnel registry.

func StartTunnelClientWithErrors

func StartTunnelClientWithErrors(ctx context.Context, cfg *config.Config, handler http.Handler) (<-chan error, error)

StartTunnelClientWithErrors starts the tunnel client and returns a channel for connection errors.
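A caller typically has to handle two failure paths here: the immediate startup error and the errors that arrive later on the channel. The sketch below shows one way to consume such a channel. startClient is a hypothetical stand-in with the same return shape; whether the real channel is ever closed is not documented, so the close in the stand-in is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// startClient is a hypothetical stand-in with the same return shape as
// StartTunnelClientWithErrors: a receive-only error channel plus a startup error.
func startClient(ctx context.Context, failures []error) (<-chan error, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	errCh := make(chan error, len(failures))
	for _, err := range failures {
		errCh <- err
	}
	close(errCh) // assumption: this stand-in closes the channel when it stops
	return errCh, nil
}

// collectErrors reads connection errors until the channel closes or ctx ends.
func collectErrors(ctx context.Context, errCh <-chan error) []string {
	var seen []string
	for {
		select {
		case err, ok := <-errCh:
			if !ok {
				return seen
			}
			seen = append(seen, err.Error())
		case <-ctx.Done():
			return seen
		}
	}
}

// collectDemo wires the two helpers together with canned failures.
func collectDemo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	errCh, err := startClient(ctx, []error{errors.New("dial failed"), errors.New("stream reset")})
	if err != nil {
		return []string{"startup: " + err.Error()}
	}
	return collectErrors(ctx, errCh)
}

func main() {
	for _, msg := range collectDemo() {
		fmt.Println("connection error:", msg)
	}
}
```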

func UseGRPCEdgeTransport

func UseGRPCEdgeTransport(cfg *config.Config) bool

UseGRPCEdgeTransport reports whether gRPC should be attempted.

func UseWebSocketEdgeTransport

func UseWebSocketEdgeTransport(cfg *config.Config) bool

UseWebSocketEdgeTransport reports whether WebSocket transport is allowed.

Types

type AgentTunnel

type AgentTunnel struct {
	EnvironmentID string
	Conn          TunnelConnection
	Pending       sync.Map // map[string]*PendingRequest
	ConnectedAt   time.Time
	LastHeartbeat time.Time
	// contains filtered or unexported fields
}

AgentTunnel represents an active tunnel connection from an edge agent

func NewAgentTunnel

func NewAgentTunnel(envID string, conn *websocket.Conn) *AgentTunnel

NewAgentTunnel creates a new agent tunnel.

func NewAgentTunnelWithConn

func NewAgentTunnelWithConn(envID string, conn TunnelConnection) *AgentTunnel

NewAgentTunnelWithConn creates a new agent tunnel from a transport-agnostic connection.

func (*AgentTunnel) Close

func (t *AgentTunnel) Close() error

Close closes the tunnel connection

func (*AgentTunnel) GetLastHeartbeat

func (t *AgentTunnel) GetLastHeartbeat() time.Time

GetLastHeartbeat returns the last heartbeat time

func (*AgentTunnel) UpdateHeartbeat

func (t *AgentTunnel) UpdateHeartbeat()

UpdateHeartbeat updates the last heartbeat timestamp
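The heartbeat accessors suggest that the timestamp is read and written from different goroutines. The sketch below shows one way such bookkeeping could be guarded and then used for a staleness check. The tunnel type, the mutex, and the explicit now parameter are all assumptions made for illustration; the real AgentTunnel keeps its synchronization in unexported fields.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tunnel is a hypothetical stand-in for AgentTunnel's heartbeat bookkeeping.
// Assumption: the timestamp is guarded by a mutex because the receive loop
// writes it while other goroutines read it.
type tunnel struct {
	mu            sync.RWMutex
	lastHeartbeat time.Time
}

func (t *tunnel) updateHeartbeat(now time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.lastHeartbeat = now
}

func (t *tunnel) getLastHeartbeat() time.Time {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.lastHeartbeat
}

// isStale reports whether no heartbeat has been seen within maxAge.
func (t *tunnel) isStale(now time.Time, maxAge time.Duration) bool {
	return now.Sub(t.getLastHeartbeat()) > maxAge
}

// staleAfter records a heartbeat, advances a fake clock by silenceSeconds,
// and reports whether the tunnel would count as stale for maxAgeSeconds.
func staleAfter(silenceSeconds, maxAgeSeconds int) bool {
	start := time.Unix(0, 0)
	t := &tunnel{}
	t.updateHeartbeat(start)
	now := start.Add(time.Duration(silenceSeconds) * time.Second)
	return t.isStale(now, time.Duration(maxAgeSeconds)*time.Second)
}

func main() {
	// 30s mirrors DefaultHeartbeatInterval, 120s mirrors TunnelStaleTimeout.
	fmt.Println(staleAfter(30, 120))  // one interval of silence
	fmt.Println(staleAfter(150, 120)) // five missed heartbeats
}
```

With the documented defaults, a tunnel would have to miss roughly four consecutive 30-second heartbeats before a 2-minute stale timeout applies.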

type EdgeAwareClient

type EdgeAwareClient struct {
	// contains filtered or unexported fields
}

EdgeAwareClient is an HTTP client that automatically routes requests to edge environments through the active edge tunnel instead of direct HTTP. Works with both gRPC and WebSocket tunnel transports.

func NewEdgeAwareClient

func NewEdgeAwareClient(timeout time.Duration) *EdgeAwareClient

NewEdgeAwareClient creates a new edge-aware HTTP client

func (*EdgeAwareClient) DoForEnvironment

func (c *EdgeAwareClient) DoForEnvironment(
	ctx context.Context,
	envID string,
	isEdge bool,
	method string,
	url string,
	path string,
	headers map[string]string,
	body []byte,
) (*EdgeResponse, error)

DoForEnvironment makes an HTTP request, automatically routing through the edge tunnel if the environment is an edge environment with an active tunnel. Parameters:

  • ctx: request context
  • envID: environment ID (used to find tunnel for edge envs)
  • isEdge: whether this is an edge environment
  • method: HTTP method (GET, POST, etc.)
  • url: full URL for direct requests (ignored for edge, only path is used)
  • path: API path (e.g., "/api/health") - used for both edge and non-edge
  • headers: HTTP headers to include
  • body: request body (can be nil)

Returns EdgeResponse with status code, body bytes, and headers
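The parameter list above implies a routing decision between the tunnel (using path) and a direct call (using url). A minimal sketch of that decision follows. chooseRoute and the route type are hypothetical, and the fallback to a direct request when an edge environment has no active tunnel is an assumption; the package may return an error in that case instead.

```go
package main

import "fmt"

// route describes where a request would be sent.
type route struct {
	ViaTunnel bool
	Target    string
}

// chooseRoute is a hypothetical sketch of the decision DoForEnvironment
// describes. Assumption: without an active tunnel the request goes direct.
func chooseRoute(isEdge, hasTunnel bool, url, path string) route {
	if isEdge && hasTunnel {
		// Edge environments ignore url; only the API path travels through the tunnel.
		return route{ViaTunnel: true, Target: path}
	}
	return route{ViaTunnel: false, Target: url}
}

func main() {
	const url = "http://agent.example:3553/api/health" // illustrative URL
	const path = "/api/health"

	fmt.Printf("%+v\n", chooseRoute(true, true, url, path))
	fmt.Printf("%+v\n", chooseRoute(false, false, url, path))
}
```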

type EdgeResponse

type EdgeResponse struct {
	StatusCode int
	Body       []byte
	Headers    map[string]string
}

EdgeResponse wraps the response from either direct HTTP or tunnel request

func DoEdgeAwareRequest

func DoEdgeAwareRequest(
	ctx context.Context,
	envID string,
	isEdge bool,
	method string,
	url string,
	path string,
	headers map[string]string,
	body []byte,
) (*EdgeResponse, error)

DoEdgeAwareRequest is a convenience function for making edge-aware requests using the default client.

type EnvironmentResolver

type EnvironmentResolver func(ctx context.Context, token string) (environmentID string, err error)

EnvironmentResolver resolves an agent token to an environment ID.
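Because EnvironmentResolver is a plain function type, a resolver can be built as a closure over whatever lookup the manager uses. The sketch below uses an in-memory token table purely for illustration. newStaticResolver, errUnknownToken, and the token and environment values are hypothetical; a real resolver would presumably query a database.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// environmentResolver mirrors the documented EnvironmentResolver signature.
type environmentResolver func(ctx context.Context, token string) (environmentID string, err error)

// errUnknownToken is a hypothetical error for tokens that match no environment.
var errUnknownToken = errors.New("unknown agent token")

// newStaticResolver builds a resolver over a fixed token table (illustration only).
func newStaticResolver(tokens map[string]string) environmentResolver {
	return func(ctx context.Context, token string) (string, error) {
		if err := ctx.Err(); err != nil {
			return "", err
		}
		envID, ok := tokens[token]
		if !ok {
			return "", errUnknownToken
		}
		return envID, nil
	}
}

// resolveDemo resolves one token against a one-entry table.
func resolveDemo(token string) (string, bool) {
	resolve := newStaticResolver(map[string]string{"tok-123": "env-a"})
	envID, err := resolve(context.Background(), token)
	return envID, err == nil
}

func main() {
	fmt.Println(resolveDemo("tok-123"))
	fmt.Println(resolveDemo("bogus"))
}
```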

type EventCallback

type EventCallback func(ctx context.Context, environmentID string, event *TunnelEvent) error

EventCallback is called when an edge agent publishes an event.

type GRPCAgentTunnelConn

type GRPCAgentTunnelConn struct {
	// contains filtered or unexported fields
}

GRPCAgentTunnelConn wraps the agent-side gRPC tunnel stream.

func NewGRPCAgentTunnelConn

func NewGRPCAgentTunnelConn(stream grpcAgentStream) *GRPCAgentTunnelConn

NewGRPCAgentTunnelConn creates an agent-side gRPC tunnel wrapper.

func (*GRPCAgentTunnelConn) Close

func (t *GRPCAgentTunnelConn) Close() error

Close closes the client send stream.

func (*GRPCAgentTunnelConn) IsClosed

func (t *GRPCAgentTunnelConn) IsClosed() bool

IsClosed returns whether the stream is closed.

func (*GRPCAgentTunnelConn) IsExpectedReceiveError

func (t *GRPCAgentTunnelConn) IsExpectedReceiveError(err error) bool

IsExpectedReceiveError returns true for expected gRPC stream shutdown errors.

func (*GRPCAgentTunnelConn) Receive

func (t *GRPCAgentTunnelConn) Receive() (*TunnelMessage, error)

Receive receives a manager->agent tunnel message from gRPC.

func (*GRPCAgentTunnelConn) Send

func (t *GRPCAgentTunnelConn) Send(msg *TunnelMessage) error

Send sends an agent->manager tunnel message over gRPC.

func (*GRPCAgentTunnelConn) SendRequest

func (t *GRPCAgentTunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)

SendRequest sends a request and waits for response.

type GRPCManagerTunnelConn

type GRPCManagerTunnelConn struct {
	// contains filtered or unexported fields
}

GRPCManagerTunnelConn wraps the manager-side gRPC tunnel stream.

func NewGRPCManagerTunnelConn

func NewGRPCManagerTunnelConn(stream grpcManagerStream) *GRPCManagerTunnelConn

NewGRPCManagerTunnelConn creates a manager-side gRPC tunnel wrapper.

func (*GRPCManagerTunnelConn) Close

func (t *GRPCManagerTunnelConn) Close() error

Close marks the stream closed on manager side.

func (*GRPCManagerTunnelConn) IsClosed

func (t *GRPCManagerTunnelConn) IsClosed() bool

IsClosed returns whether the stream is closed.

func (*GRPCManagerTunnelConn) IsExpectedReceiveError

func (t *GRPCManagerTunnelConn) IsExpectedReceiveError(err error) bool

IsExpectedReceiveError returns true for expected gRPC stream shutdown errors.

func (*GRPCManagerTunnelConn) Receive

func (t *GRPCManagerTunnelConn) Receive() (*TunnelMessage, error)

Receive receives an agent->manager tunnel message from gRPC.

func (*GRPCManagerTunnelConn) Send

func (t *GRPCManagerTunnelConn) Send(msg *TunnelMessage) error

Send sends a manager->agent tunnel message over gRPC.

func (*GRPCManagerTunnelConn) SendRequest

func (t *GRPCManagerTunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)

SendRequest sends a request and waits for response.

type PendingRequest

type PendingRequest struct {
	ResponseCh chan *TunnelMessage
	CreatedAt  time.Time
}

PendingRequest tracks an in-flight request waiting for response.
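PendingRequest, together with the pending *sync.Map argument of SendRequest, points to a request/response correlation scheme keyed by message ID. The sketch below shows one common way to build that pattern. The message type, sendRequest, and deliver are local stand-ins, and the details (buffered channel, removal on return) are assumptions rather than the package's confirmed implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// message is a reduced stand-in for TunnelMessage.
type message struct {
	ID   string
	Body string
}

// pendingRequest mirrors the documented PendingRequest fields.
type pendingRequest struct {
	ResponseCh chan *message
	CreatedAt  time.Time
}

// sendRequest registers a pending entry under msg.ID, hands msg to send, and
// waits until deliver routes the reply or ctx ends.
func sendRequest(ctx context.Context, msg *message, pending *sync.Map, send func(*message) error) (*message, error) {
	req := &pendingRequest{ResponseCh: make(chan *message, 1), CreatedAt: time.Now()}
	pending.Store(msg.ID, req)
	defer pending.Delete(msg.ID)

	if err := send(msg); err != nil {
		return nil, err
	}
	select {
	case resp := <-req.ResponseCh:
		return resp, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// deliver is what a receive loop would call for each incoming response.
func deliver(pending *sync.Map, resp *message) bool {
	v, ok := pending.Load(resp.ID)
	if !ok {
		return false // nobody is waiting for this ID
	}
	select {
	case v.(*pendingRequest).ResponseCh <- resp:
		return true
	default:
		return false // a reply is already queued
	}
}

// roundTrip sends one message to a fake peer that echoes it asynchronously.
func roundTrip(id, body string) (string, error) {
	var pending sync.Map
	echo := func(m *message) error {
		go deliver(&pending, &message{ID: m.ID, Body: "echo:" + m.Body})
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	resp, err := sendRequest(ctx, &message{ID: id, Body: body}, &pending, echo)
	if err != nil {
		return "", err
	}
	return resp.Body, nil
}

func main() {
	fmt.Println(roundTrip("req-1", "ping"))
}
```

The one-slot buffer on ResponseCh lets the receive loop hand over a reply without blocking, even if the waiter has already given up.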

type StatusUpdateCallback

type StatusUpdateCallback func(ctx context.Context, environmentID string, connected bool)

StatusUpdateCallback is called when an edge agent connects or disconnects. The connected parameter is true on connect, false on disconnect.

type TunnelClient

type TunnelClient struct {
	// contains filtered or unexported fields
}

TunnelClient represents the agent-side tunnel client

func NewTunnelClient

func NewTunnelClient(cfg *config.Config, handler http.Handler) *TunnelClient

NewTunnelClient creates a new tunnel client

func (*TunnelClient) StartWithErrorChan

func (c *TunnelClient) StartWithErrorChan(ctx context.Context, errCh chan error)

StartWithErrorChan runs the tunnel client and optionally emits connection errors.

type TunnelConn

type TunnelConn struct {
	// contains filtered or unexported fields
}

TunnelConn wraps a WebSocket connection with send/receive helpers.

func NewTunnelConn

func NewTunnelConn(conn *websocket.Conn) *TunnelConn

NewTunnelConn creates a new WebSocket tunnel connection wrapper.

func (*TunnelConn) Close

func (t *TunnelConn) Close() error

Close closes the WebSocket tunnel connection.

func (*TunnelConn) IsClosed

func (t *TunnelConn) IsClosed() bool

IsClosed returns whether the connection is closed.

func (*TunnelConn) IsExpectedReceiveError

func (t *TunnelConn) IsExpectedReceiveError(err error) bool

IsExpectedReceiveError returns true for normal WebSocket close/teardown errors.

func (*TunnelConn) Receive

func (t *TunnelConn) Receive() (*TunnelMessage, error)

Receive receives a tunnel message from the WebSocket connection.

func (*TunnelConn) Send

func (t *TunnelConn) Send(msg *TunnelMessage) error

Send sends a tunnel message over the WebSocket connection.

func (*TunnelConn) SendRequest

func (t *TunnelConn) SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)

SendRequest sends a request and waits for response.

type TunnelConnection

type TunnelConnection interface {
	Send(msg *TunnelMessage) error
	Receive() (*TunnelMessage, error)
	IsExpectedReceiveError(err error) bool
	Close() error
	IsClosed() bool
	SendRequest(ctx context.Context, msg *TunnelMessage, pending *sync.Map) (*TunnelMessage, error)
}

TunnelConnection is the transport contract shared by WebSocket and gRPC wrappers.
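Since the contract is an interface, a test double can stand in for a real WebSocket or gRPC stream. The sketch below is a hypothetical in-memory loopback that satisfies a local copy of the method set. tunnelMessage, loopbackConn, and the choice of io.EOF as the "expected" shutdown error are assumptions for illustration only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
)

// tunnelMessage is a reduced stand-in for TunnelMessage.
type tunnelMessage struct {
	ID   string
	Type string
}

// tunnelConnection mirrors the documented TunnelConnection method set.
type tunnelConnection interface {
	Send(msg *tunnelMessage) error
	Receive() (*tunnelMessage, error)
	IsExpectedReceiveError(err error) bool
	Close() error
	IsClosed() bool
	SendRequest(ctx context.Context, msg *tunnelMessage, pending *sync.Map) (*tunnelMessage, error)
}

// loopbackConn is a hypothetical in-memory transport: sent messages are
// queued and handed back by Receive.
type loopbackConn struct {
	mu     sync.Mutex
	closed bool
	queue  chan *tunnelMessage
}

func newLoopbackConn(size int) *loopbackConn {
	return &loopbackConn{queue: make(chan *tunnelMessage, size)}
}

func (c *loopbackConn) Send(msg *tunnelMessage) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return io.ErrClosedPipe
	}
	select {
	case c.queue <- msg:
		return nil
	default:
		return errors.New("loopback queue full")
	}
}

func (c *loopbackConn) Receive() (*tunnelMessage, error) {
	msg, ok := <-c.queue
	if !ok {
		return nil, io.EOF // queue closed and drained
	}
	return msg, nil
}

func (c *loopbackConn) IsExpectedReceiveError(err error) bool {
	return errors.Is(err, io.EOF)
}

func (c *loopbackConn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.closed {
		c.closed = true
		close(c.queue)
	}
	return nil
}

func (c *loopbackConn) IsClosed() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closed
}

// SendRequest sends msg and returns the next queued message; the pending map
// is unused in this loopback sketch.
func (c *loopbackConn) SendRequest(ctx context.Context, msg *tunnelMessage, _ *sync.Map) (*tunnelMessage, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if err := c.Send(msg); err != nil {
		return nil, err
	}
	return c.Receive()
}

// loopbackDemo sends one message, reads it back, then closes the connection.
func loopbackDemo() (string, bool) {
	var conn tunnelConnection = newLoopbackConn(1)
	_ = conn.Send(&tunnelMessage{ID: "1", Type: "heartbeat"})
	msg, _ := conn.Receive()
	_ = conn.Close()
	_, err := conn.Receive()
	return msg.Type, conn.IsExpectedReceiveError(err)
}

func main() {
	fmt.Println(loopbackDemo())
}
```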

type TunnelEvent

type TunnelEvent struct {
	Type         string `json:"type"`
	Severity     string `json:"severity,omitempty"`
	Title        string `json:"title"`
	Description  string `json:"description,omitempty"`
	ResourceType string `json:"resource_type,omitempty"`
	ResourceID   string `json:"resource_id,omitempty"`
	ResourceName string `json:"resource_name,omitempty"`
	UserID       string `json:"user_id,omitempty"`
	Username     string `json:"username,omitempty"`
	MetadataJSON []byte `json:"metadata_json,omitempty"`
}

TunnelEvent is an event payload sent from an agent to the manager.
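The struct tags determine the wire shape of an event: fields tagged omitempty disappear when empty, and MetadataJSON, being a []byte, is base64-encoded by encoding/json. The sketch below copies the documented fields into a local type to show this. The event type and title values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tunnelEvent copies the documented TunnelEvent fields and JSON tags.
type tunnelEvent struct {
	Type         string `json:"type"`
	Severity     string `json:"severity,omitempty"`
	Title        string `json:"title"`
	Description  string `json:"description,omitempty"`
	ResourceType string `json:"resource_type,omitempty"`
	ResourceID   string `json:"resource_id,omitempty"`
	ResourceName string `json:"resource_name,omitempty"`
	UserID       string `json:"user_id,omitempty"`
	Username     string `json:"username,omitempty"`
	MetadataJSON []byte `json:"metadata_json,omitempty"`
}

// encodeEvent returns the JSON encoding of e, or an error description.
func encodeEvent(e tunnelEvent) string {
	b, err := json.Marshal(e)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// Only type and title are always present; everything else is omitted when empty.
	fmt.Println(encodeEvent(tunnelEvent{Type: "container.start", Title: "Container started"}))

	// A []byte field is emitted as a base64 string, not as nested JSON.
	fmt.Println(encodeEvent(tunnelEvent{
		Type:         "container.start",
		Title:        "Container started",
		MetadataJSON: []byte(`{"image":"nginx"}`),
	}))
}
```

A consumer that wants the metadata as structured JSON therefore has to base64-decode the field first and then unmarshal the result.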

type TunnelMessage

type TunnelMessage struct {
	ID            string            `json:"id"`                        // Unique request/stream ID
	Type          TunnelMessageType `json:"type"`                      // Message type
	Method        string            `json:"method,omitempty"`          // HTTP method for requests
	Path          string            `json:"path,omitempty"`            // Request path
	Query         string            `json:"query,omitempty"`           // Query string
	Headers       map[string]string `json:"headers,omitempty"`         // HTTP headers
	Body          []byte            `json:"body,omitempty"`            // Request/response body
	WSMessageType int               `json:"ws_message_type,omitempty"` // WebSocket message type
	Status        int               `json:"status,omitempty"`          // HTTP status for responses
	Accepted      bool              `json:"accepted,omitempty"`        // Registration accepted
	AgentToken    string            `json:"agent_token,omitempty"`     // Register request token
	EnvironmentID string            `json:"environment_id,omitempty"`  // Manager-resolved environment ID
	Error         string            `json:"error,omitempty"`           // Error field for register response
	Event         *TunnelEvent      `json:"event,omitempty"`           // Agent event payload
}

TunnelMessage represents a transport-agnostic edge tunnel message.

func (*TunnelMessage) MarshalJSON

func (m *TunnelMessage) MarshalJSON() ([]byte, error)

MarshalJSON is a custom marshaler that encodes a nil body as empty.
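The documentation does not say what "empty" means here. One plausible reading is that a nil Body is emitted as an empty string instead of JSON null. The sketch below shows the usual pattern for such a marshaler: a method-less alias type that avoids infinite recursion. It uses a reduced message struct whose body tag deliberately has no omitempty so the effect is visible; that struct and the nil-to-empty rule are assumptions, not the package's confirmed behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a reduced, hypothetical stand-in for TunnelMessage.
type message struct {
	ID   string `json:"id"`
	Body []byte `json:"body"`
}

// MarshalJSON replaces a nil Body with an empty slice before encoding, so the
// field is written as "" rather than null.
func (m *message) MarshalJSON() ([]byte, error) {
	type plain message // same fields, no methods, so json.Marshal does not recurse
	out := plain(*m)
	if out.Body == nil {
		out.Body = []byte{}
	}
	return json.Marshal(out)
}

// encodeMessage marshals through a pointer so the pointer-receiver method is used.
func encodeMessage(id string, body []byte) string {
	b, err := json.Marshal(&message{ID: id, Body: body})
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encodeMessage("1", nil))
	fmt.Println(encodeMessage("2", []byte("hi")))
}
```

Because the method has a pointer receiver, marshaling a non-addressable message value would bypass it; passing a pointer avoids that pitfall.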

type TunnelMessageType

type TunnelMessageType string

TunnelMessageType represents the type of message sent over the tunnel.

const (
	// MessageTypeRequest is sent from manager to agent to initiate a request.
	MessageTypeRequest TunnelMessageType = "request"
	// MessageTypeResponse is sent from agent to manager with the response.
	MessageTypeResponse TunnelMessageType = "response"
	// MessageTypeHeartbeat is sent by agents to keep the connection alive.
	MessageTypeHeartbeat TunnelMessageType = "heartbeat"
	// MessageTypeHeartbeatAck is sent by manager to acknowledge a heartbeat.
	MessageTypeHeartbeatAck TunnelMessageType = "heartbeat_ack"
	// MessageTypeStreamData is sent for streaming responses (logs, stats).
	MessageTypeStreamData TunnelMessageType = "stream_data"
	// MessageTypeStreamEnd indicates end of a stream.
	MessageTypeStreamEnd TunnelMessageType = "stream_end"
	// MessageTypeWebSocketStart starts a WebSocket stream for logs/stats.
	MessageTypeWebSocketStart TunnelMessageType = "ws_start"
	// MessageTypeWebSocketData is a WebSocket message in either direction.
	MessageTypeWebSocketData TunnelMessageType = "ws_data"
	// MessageTypeWebSocketClose closes a WebSocket stream.
	MessageTypeWebSocketClose TunnelMessageType = "ws_close"
	// MessageTypeRegister is the first message sent by the agent on gRPC transport.
	MessageTypeRegister TunnelMessageType = "register"
	// MessageTypeRegisterResponse is sent by manager after register validation.
	MessageTypeRegisterResponse TunnelMessageType = "register_response"
	// MessageTypeEvent carries an event emitted by an agent to the manager.
	MessageTypeEvent TunnelMessageType = "event"
)
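The comments on these constants describe which side sends each message type, which maps naturally onto a switch in the receive loop. The sketch below shows a manager-side dispatch over a subset of the values. managerAction and the action strings are hypothetical, and the grouping of cases is an assumption drawn from the comments above.

```go
package main

import "fmt"

// messageType mirrors TunnelMessageType; the values are copied from the
// documented constants.
type messageType string

const (
	typeResponse     messageType = "response"
	typeHeartbeat    messageType = "heartbeat"
	typeHeartbeatAck messageType = "heartbeat_ack"
	typeStreamData   messageType = "stream_data"
	typeStreamEnd    messageType = "stream_end"
	typeEvent        messageType = "event"
)

// managerAction is a hypothetical dispatch for messages arriving from an agent.
func managerAction(t messageType) string {
	switch t {
	case typeHeartbeat:
		// Heartbeats refresh the tunnel timestamp and are acknowledged.
		return "reply " + string(typeHeartbeatAck)
	case typeResponse, typeStreamData, typeStreamEnd:
		// Replies are matched to a waiting request by message ID.
		return "route to pending request"
	case typeEvent:
		return "invoke event callback"
	default:
		// Unknown types are ignored so newer agents do not break older managers.
		return "ignore"
	}
}

func main() {
	for _, t := range []messageType{typeHeartbeat, typeResponse, typeEvent, "unknown"} {
		fmt.Printf("%s: %s\n", t, managerAction(t))
	}
}
```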

type TunnelRegistry

type TunnelRegistry struct {
	// contains filtered or unexported fields
}

TunnelRegistry manages active edge agent tunnel connections

func GetRegistry

func GetRegistry() *TunnelRegistry

GetRegistry returns the global tunnel registry

func NewTunnelRegistry

func NewTunnelRegistry() *TunnelRegistry

NewTunnelRegistry creates a new tunnel registry

func (*TunnelRegistry) CleanupStale

func (r *TunnelRegistry) CleanupStale(maxAge time.Duration) int

CleanupStale removes tunnels that haven't had a heartbeat within the given duration

func (*TunnelRegistry) Get

func (r *TunnelRegistry) Get(envID string) (*AgentTunnel, bool)

Get retrieves a tunnel by environment ID

func (*TunnelRegistry) Register

func (r *TunnelRegistry) Register(envID string, tunnel *AgentTunnel)

Register adds a tunnel to the registry, closing any existing tunnel for the same environment

func (*TunnelRegistry) Unregister

func (r *TunnelRegistry) Unregister(envID string)

Unregister removes a tunnel from the registry
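The four registry methods describe a concurrent map keyed by environment ID with replace-on-register and age-based eviction. The sketch below shows one way to implement those semantics. The registry and tunnel types are local stand-ins, cleanupStale takes an explicit now for determinism (the documented method takes only maxAge), and closing evicted tunnels is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tunnel is a reduced stand-in for AgentTunnel.
type tunnel struct {
	lastHeartbeat time.Time
	closed        bool
}

// registry is a hypothetical stand-in for TunnelRegistry.
type registry struct {
	mu      sync.RWMutex
	tunnels map[string]*tunnel
}

func newRegistry() *registry {
	return &registry{tunnels: make(map[string]*tunnel)}
}

// register stores t under envID and closes any tunnel it replaces.
func (r *registry) register(envID string, t *tunnel) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if old, ok := r.tunnels[envID]; ok && old != t {
		old.closed = true
	}
	r.tunnels[envID] = t
}

func (r *registry) get(envID string) (*tunnel, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	t, ok := r.tunnels[envID]
	return t, ok
}

func (r *registry) unregister(envID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.tunnels, envID)
}

// cleanupStale closes and removes tunnels whose last heartbeat is older than
// maxAge and returns how many were removed.
func (r *registry) cleanupStale(now time.Time, maxAge time.Duration) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	removed := 0
	for envID, t := range r.tunnels {
		if now.Sub(t.lastHeartbeat) > maxAge {
			t.closed = true
			delete(r.tunnels, envID)
			removed++
		}
	}
	return removed
}

// registryDemo replaces one tunnel, evicts one stale tunnel, and reports the outcome.
func registryDemo() (replacedClosed bool, removed int, freshKept bool) {
	now := time.Unix(1000, 0)
	r := newRegistry()
	first := &tunnel{lastHeartbeat: now}
	r.register("env-a", first)
	r.register("env-a", &tunnel{lastHeartbeat: now}) // replaces first
	r.register("env-b", &tunnel{lastHeartbeat: now.Add(-5 * time.Minute)})
	removed = r.cleanupStale(now, 2*time.Minute) // 2 minutes mirrors TunnelStaleTimeout
	_, freshKept = r.get("env-a")
	return first.closed, removed, freshKept
}

func main() {
	fmt.Println(registryDemo())
}
```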

type TunnelRuntimeState

type TunnelRuntimeState struct {
	Transport     string
	ConnectedAt   *time.Time
	LastHeartbeat *time.Time
}

TunnelRuntimeState describes the live, in-memory state of an active edge tunnel.

func GetTunnelRuntimeState

func GetTunnelRuntimeState(envID string) (*TunnelRuntimeState, bool)

GetTunnelRuntimeState returns live metadata for an active tunnel.

type TunnelServer

type TunnelServer struct {
	// contains filtered or unexported fields
}

TunnelServer handles incoming edge agent connections on the manager side.

func NewTunnelServer

func NewTunnelServer(resolver EnvironmentResolver, statusCallback StatusUpdateCallback) *TunnelServer

NewTunnelServer creates a new tunnel server.

func NewTunnelServerWithRegistry

func NewTunnelServerWithRegistry(registry *TunnelRegistry, resolver EnvironmentResolver, statusCallback StatusUpdateCallback) *TunnelServer

NewTunnelServerWithRegistry creates a new tunnel server using an injected tunnel registry.

func (*TunnelServer) Connect

Connect is the gRPC bidi stream handler for edge agent connections.

func (*TunnelServer) GRPCServerOptions

func (s *TunnelServer) GRPCServerOptions(ctx context.Context) []grpc.ServerOption

GRPCServerOptions returns the stream interceptor chain used by the tunnel service.

func (*TunnelServer) HandleConnect

func (s *TunnelServer) HandleConnect(c *gin.Context)

HandleConnect is the WebSocket handler for edge agent connections. This is registered at /api/tunnel/connect.

func (*TunnelServer) SetEventCallback

func (s *TunnelServer) SetEventCallback(callback EventCallback)

SetEventCallback configures the manager callback invoked for agent events.

func (*TunnelServer) StartCleanupLoop

func (s *TunnelServer) StartCleanupLoop(ctx context.Context)

StartCleanupLoop periodically cleans up stale tunnels.

func (*TunnelServer) WaitForCleanupDone

func (s *TunnelServer) WaitForCleanupDone()

WaitForCleanupDone blocks until the cleanup loop has stopped.
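StartCleanupLoop and WaitForCleanupDone together suggest a background ticker loop that stops on context cancellation and signals completion. The sketch below shows that pattern in isolation. cleanupLoop, its methods, and the assumption that the loop runs in its own goroutine are hypothetical; the real server would presumably call CleanupStale with TunnelStaleTimeout on each tick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cleanupLoop is a hypothetical stand-in for the server's cleanup machinery.
type cleanupLoop struct {
	done chan struct{}
}

func newCleanupLoop() *cleanupLoop {
	return &cleanupLoop{done: make(chan struct{})}
}

// start runs cleanup on every tick until ctx is cancelled.
// Assumption: the loop runs in its own goroutine and closes done on exit.
func (l *cleanupLoop) start(ctx context.Context, interval time.Duration, cleanup func()) {
	go func() {
		defer close(l.done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				cleanup()
			case <-ctx.Done():
				return
			}
		}
	}()
}

// wait blocks until the loop goroutine has exited.
func (l *cleanupLoop) wait() {
	<-l.done
}

// cleanupDemo runs the loop until at least one cleanup pass has happened,
// then cancels it and returns the number of passes.
func cleanupDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	ran := make(chan struct{}, 1)
	count := 0 // written only by the loop goroutine, read after wait returns

	l := newCleanupLoop()
	l.start(ctx, time.Millisecond, func() {
		count++
		select {
		case ran <- struct{}{}:
		default:
		}
	})

	<-ran    // at least one pass completed
	cancel() // request shutdown
	l.wait() // safe to read count after this point
	return count
}

func main() {
	fmt.Println("cleanup passes:", cleanupDemo())
}
```

Waiting on the done channel before shutdown ensures no cleanup pass is still touching the registry when the process tears it down.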

Directories

Path Synopsis
proto
