Documentation
¶
Overview ¶
Package http provides an HTTP gateway implementation for bridging REST APIs to NATS services.
Overview ¶
The http package implements a Gateway component that maps HTTP routes to NATS request/reply subjects. It translates HTTP requests into NATS messages and forwards responses back to HTTP clients, enabling REST API access to internal NATS services.
Key features:
- Route mapping from HTTP paths to NATS subjects
- Configurable request timeouts per route
- CORS support with configurable origins
- Request size limits
- Distributed tracing via X-Request-ID headers
- Error sanitization (prevents internal details leaking to clients)
Architecture ¶
┌────────────────────────────────────────────────────────────────────────┐ │ HTTP Gateway │ ├────────────────────────────────────────────────────────────────────────┤ │ HTTP Mux → Route Handler → NATS Request → Service │ │ ↓ ↓ │ │ ← JSON Response ← NATS Reply ← Response │ └────────────────────────────────────────────────────────────────────────┘
Usage ¶
Register the gateway and configure routes:
err := http.Register(registry)
Or configure in a flow definition:
{
"type": "gateway",
"name": "http",
"config": {
"routes": [
{
"path": "/api/v1/entities/{id}",
"method": "GET",
"nats_subject": "graph.query.entity",
"timeout": "5s"
},
{
"path": "/api/v1/search",
"method": "POST",
"nats_subject": "graph.query.globalSearch",
"timeout": "30s"
}
],
"enable_cors": true,
"cors_origins": ["https://example.com"],
"max_request_size": 1048576
}
}
Register handlers with an HTTP server:
gateway.RegisterHTTPHandlers("/api", mux)
Route Configuration ¶
Each route mapping specifies:
- path: HTTP path pattern
- method: HTTP method (GET, POST, PUT, DELETE)
- nats_subject: Target NATS subject for request/reply
- timeout: Request timeout (default: from gateway config)
Request Processing ¶
Request flow:
- Extract or generate X-Request-ID for distributed tracing
- Validate HTTP method against route configuration
- Apply CORS headers if enabled
- Read and validate request body (size limit enforced)
- Forward to NATS subject with timeout
- Return response with Content-Type: application/json
Error Handling ¶
Errors are mapped to appropriate HTTP status codes:
- Invalid errors → 400 Bad Request
- Transient errors → 503 Service Unavailable or 504 Gateway Timeout
- Fatal errors → 500 Internal Server Error
- Pattern matching → 404 Not Found, 403 Forbidden
Error messages are sanitized to prevent information disclosure:
- Internal details logged but not returned to clients
- NATS subjects never exposed
- Generic messages returned (e.g., "service temporarily unavailable")
Configuration ¶
Gateway configuration options:
Routes: []RouteMapping # Route definitions EnableCORS: true # Enable CORS headers CORSOrigins: ["*"] # Allowed origins (use specific domains in production) MaxRequestSize: 1048576 # Max request body size (1MB default)
Distributed Tracing ¶
The gateway supports distributed tracing via request IDs:
- Incoming X-Request-ID header is preserved if present
- New request ID generated if not provided (crypto/rand based)
- Request ID returned in response X-Request-ID header
- Request ID can be propagated to downstream NATS services
Thread Safety ¶
The Gateway is safe for concurrent use. Metrics use atomic operations, and state is protected by RWMutex where needed.
Metrics ¶
The gateway tracks:
- requestsTotal: Total HTTP requests received
- requestsSuccess: Successful requests
- requestsFailed: Failed requests
- bytesReceived: Total bytes in request bodies
- bytesSent: Total bytes in responses
DataFlow() returns calculated rates from these metrics.
See Also ¶
Related packages:
- github.com/c360studio/semstreams/gateway: Gateway interface and Config types
- github.com/c360studio/semstreams/natsclient: NATS connection management
- github.com/c360studio/semstreams/component: Component lifecycle interface
Package http provides HTTP gateway implementation for SemStreams.
Index ¶
- func NewGateway(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type Gateway
- func (g *Gateway) ConfigSchema() component.ConfigSchema
- func (g *Gateway) DataFlow() component.FlowMetrics
- func (g *Gateway) Health() component.HealthStatus
- func (g *Gateway) Initialize() error
- func (g *Gateway) InputPorts() []component.Port
- func (g *Gateway) Meta() component.Metadata
- func (g *Gateway) OutputPorts() []component.Port
- func (g *Gateway) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
- func (g *Gateway) Start(ctx context.Context) error
- func (g *Gateway) Stop(_ time.Duration) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewGateway ¶
func NewGateway(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewGateway creates a new HTTP gateway from configuration
Types ¶
type Gateway ¶
type Gateway struct {
// contains filtered or unexported fields
}
Gateway implements the Gateway interface for HTTP protocol
func (*Gateway) ConfigSchema ¶
func (g *Gateway) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Gateway) DataFlow ¶
func (g *Gateway) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Gateway) Health ¶
func (g *Gateway) Health() component.HealthStatus
Health returns the current health status
func (*Gateway) Initialize ¶
Initialize prepares the HTTP gateway
func (*Gateway) InputPorts ¶
InputPorts returns no input ports (gateway is request-driven)
func (*Gateway) OutputPorts ¶
OutputPorts returns no output ports (gateway uses request/reply)
func (*Gateway) RegisterHTTPHandlers ¶
RegisterHTTPHandlers registers gateway routes with the HTTP mux