http

package
v1.0.0-alpha.39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2026 License: MIT Imports: 18 Imported by: 0

README

HTTP Gateway Component

The HTTP Gateway component enables external HTTP/REST clients to query SemStreams via bidirectional NATS request/reply patterns.

Overview

Type: Gateway Protocol: HTTP/REST Pattern: Request/Reply (External ↔ NATS) Read-Only: Yes (default)

The HTTP Gateway bridges external HTTP requests to internal NATS request/reply subjects, enabling REST API access to SemStreams components like GraphProcessor without requiring direct NATS connections.

Component Type: Gateway

SemStreams has 5 component types:

Type Pattern Example
Input External → NATS UDP, WebSocket ingestion
Processor NATS → Transform → NATS JSONFilter, GraphProcessor
Output NATS → External File, HTTP POST, WebSocket push
Storage NATS → Persistent Store ObjectStore (JetStream)
Gateway External ↔ NATS HTTP request/reply (this component)

Key Difference from Output:

  • Output: Unidirectional push (NATS → External)
  • Gateway: Bidirectional request/reply (External ↔ NATS ↔ External)

Architecture

┌──────────────────┐
│  HTTP Client     │  POST /api-gateway/search/semantic
└────────┬─────────┘
         ↓ HTTP Request
┌────────────────────────────────────────┐
│  ServiceManager (Port 8080)            │
│  /api-gateway/* → HTTPGateway handlers │
└────────┬───────────────────────────────┘
         ↓ NATS Request/Reply
┌────────────────────────────────────────┐
│  graph-processor Component             │
│  Subscribed to graph.query.semantic    │
└────────┬───────────────────────────────┘
         ↓ NATS Reply
┌────────────────────────────────────────┐
│  ServiceManager                        │
│  Translate reply → HTTP Response       │
└────────┬───────────────────────────────┘
         ↓ HTTP Response
┌──────────────────┐
│  HTTP Client     │  Receives SearchResults JSON
└──────────────────┘

Configuration

Basic Configuration
{
  "components": {
    "api-gateway": {
      "type": "gateway",
      "name": "http",
      "enabled": true,
      "config": {
        "routes": [
          {
            "path": "/search/semantic",
            "method": "POST",
            "nats_subject": "graph.query.semantic",
            "timeout": "5s",
            "description": "Semantic similarity search"
          }
        ]
      }
    }
  }
}
Full Configuration
{
  "components": {
    "api-gateway": {
      "type": "gateway",
      "name": "http",
      "enabled": true,
      "config": {
        "enable_cors": true,
        "cors_origins": ["http://localhost:3000", "https://app.example.com"],
        "max_request_size": 1048576,
        "routes": [
          {
            "path": "/search/semantic",
            "method": "POST",
            "nats_subject": "graph.query.semantic",
            "timeout": "5s",
            "description": "Semantic similarity search across indexed entities"
          },
          {
            "path": "/entity/:id",
            "method": "GET",
            "nats_subject": "graph.query.entity",
            "timeout": "2s",
            "description": "Retrieve single entity by ID"
          }
        ]
      }
    }
  }
}

Configuration Schema

Gateway Config
Field Type Required Default Description
routes RouteMapping[] Yes - HTTP path to NATS subject mappings
enable_cors bool No false Enable CORS headers (requires explicit cors_origins)
cors_origins string[] No [] Allowed CORS origins
max_request_size int No 1048576 Max request body size (bytes)
Route Mapping
Field Type Required Default Description
path string Yes - HTTP route path (supports :param)
method string Yes - HTTP method (GET, POST, PUT, DELETE, PATCH)
nats_subject string Yes - NATS request/reply subject
timeout duration No 5s Request timeout (100ms-30s)
description string No - Route description (for OpenAPI docs)

Route Registration

Routes are automatically registered at startup:

Component Instance Name: "api-gateway"
URL Prefix: "/api-gateway/"

Route: "/search/semantic"
Full Path: "/api-gateway/search/semantic"

ServiceManager discovers gateway components via interface:

type Gateway interface {
    component.Discoverable
    RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
}

Mutation Control

HTTP methods (GET, POST, PUT, DELETE) don't directly map to mutation semantics. For example, POST is commonly used for complex queries like semantic search.

Design Principle: Mutation control should be enforced at the NATS subject/component level, not at the HTTP gateway layer. The gateway is protocol translation only.

CORS

CORS is disabled by default and requires explicit configuration for security:

Allow All Origins (Development)
{
  "enable_cors": true,
  "cors_origins": ["*"]
}
Restrict Origins (Production)
{
  "enable_cors": true,
  "cors_origins": [
    "https://app.example.com",
    "https://dashboard.example.com"
  ]
}
CORS Headers
Access-Control-Allow-Origin: https://app.example.com
Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS
Access-Control-Allow-Headers: Content-Type, Authorization
Access-Control-Max-Age: 3600

Error Handling

HTTP status codes are mapped from SemStreams error types:

Error Type HTTP Status Example
Invalid 400 Bad Request Malformed JSON, missing fields
Transient (timeout) 504 Gateway Timeout NATS request timeout
Transient (other) 503 Service Unavailable NATS connection down
Fatal 500 Internal Server Error Unexpected errors
Not Found 404 Not Found Entity doesn't exist
Unauthorized 403 Forbidden Permission denied

Error Response Format:

{
  "error": "entity not found",
  "status": 404
}

Request Size Limits

Prevent DoS attacks with request size limits:

{
  "max_request_size": 1048576  // 1MB (default)
}

Range: 0 to 100MB (104857600 bytes)

Requests exceeding the limit are truncated to max_request_size.

Timeouts

Per-route timeout configuration:

{
  "routes": [
    {
      "path": "/search/semantic",
      "timeout": "5s"   // Quick queries
    },
    {
      "path": "/entity/:id/path",
      "timeout": "30s"  // Complex graph traversal
    }
  ]
}

Range: 100ms to 30s

If NATS doesn't reply within timeout, returns 504 Gateway Timeout.

Metrics

Prometheus metrics exported at :9090/metrics:

Gateway Metrics
# Request totals
gateway_requests_total{component="api-gateway", route="/search/semantic"}

# Failed requests
gateway_requests_failed_total{component="api-gateway", route="/search/semantic"}

# Request latency
gateway_request_duration_seconds{component="api-gateway", route="/search/semantic"}
Component Health
# Gateway health
component_healthy{component="api-gateway", type="gateway"}

# Error count
component_errors_total{component="api-gateway"}

Usage Examples

curl -X POST http://localhost:8080/api-gateway/search/semantic \
  -H "Content-Type: application/json" \
  -d '{
    "query": "emergency alert system",
    "threshold": 0.3,
    "limit": 10
  }'

Response:

{
  "data": {
    "query": "emergency alert system",
    "threshold": 0.3,
    "hits": [
      {
        "entity_id": "alert-001",
        "score": 0.87,
        "entity_type": "alert",
        "properties": {
          "title": "Emergency Alert Test",
          "content": "Testing the emergency alert system"
        }
      }
    ]
  }
}
Entity Lookup
curl http://localhost:8080/api-gateway/entity/device-123
TypeScript Client
class SemStreamsClient {
  constructor(private baseURL = 'http://localhost:8080/api-gateway') {}

  async semanticSearch(query: string, limit = 10, threshold = 0.3) {
    const res = await fetch(`${this.baseURL}/search/semantic`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ query, limit, threshold })
    });

    if (!res.ok) {
      const error = await res.json();
      throw new Error(error.error);
    }

    return await res.json();
  }
}

const client = new SemStreamsClient();
const results = await client.semanticSearch('drone battery');

OpenAPI Integration

The HTTP gateway uses config-driven dynamic routes that are defined at runtime via YAML/JSON configuration. Because routes vary by deployment, they are not included in the static OpenAPI specification.

For gateways with well-defined endpoints (like graph-gateway), see their OpenAPI contributions in the generated spec.

Access OpenAPI:

  • JSON Spec: http://localhost:8080/openapi.json
  • Swagger UI: http://localhost:8080/docs

Note: The routes you configure for this gateway won't appear in the OpenAPI spec. Document your deployment-specific routes separately if needed.

Security Considerations

Production Deployment
  1. TLS: Deploy behind reverse proxy with TLS
server {
    listen 443 ssl;
    server_name api.example.com;

    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;

    location / {
        proxy_pass http://localhost:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
  1. Authentication: Add auth middleware in reverse proxy
location /api-gateway/ {
    auth_request /auth;
    proxy_pass http://localhost:8080;
}
  1. Rate Limiting: Prevent abuse
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

location /api-gateway/ {
    limit_req zone=api burst=20;
    proxy_pass http://localhost:8080;
}
  1. CORS: Restrict origins
{
  "cors_origins": ["https://app.example.com"]
}

Troubleshooting

"NATS connection not available" (503)

Cause: NATS server down or unreachable

Fix:

# Check NATS status
docker ps | grep nats

# Restart NATS
task integration:start
"timeout" errors (504)

Cause: Query exceeds route timeout

Fix: Increase timeout:

{
  "routes": [{
    "timeout": "30s"  // Increased from 5s
  }]
}
CORS errors in browser

Cause: Origin not in cors_origins list

Fix:

{
  "cors_origins": ["http://localhost:3000"]
}
Method not allowed (405)

Cause: Wrong HTTP method for route

Fix: Use correct method from route config (GET vs POST)

Documentation

Overview

Package http provides an HTTP gateway implementation for bridging REST APIs to NATS services.

Overview

The http package implements a Gateway component that maps HTTP routes to NATS request/reply subjects. It translates HTTP requests into NATS messages and forwards responses back to HTTP clients, enabling REST API access to internal NATS services.

Key features:

  • Route mapping from HTTP paths to NATS subjects
  • Configurable request timeouts per route
  • CORS support with configurable origins
  • Request size limits
  • Distributed tracing via X-Request-ID headers
  • Error sanitization (prevents internal details leaking to clients)

Architecture

┌────────────────────────────────────────────────────────────────────────┐
│                         HTTP Gateway                                   │
├────────────────────────────────────────────────────────────────────────┤
│  HTTP Mux  →  Route Handler  →  NATS Request  →  Service              │
│                                  ↓                  ↓                  │
│             ←  JSON Response ←  NATS Reply   ←  Response              │
└────────────────────────────────────────────────────────────────────────┘

Usage

Register the gateway and configure routes:

err := http.Register(registry)

Or configure in a flow definition:

{
    "type": "gateway",
    "name": "http",
    "config": {
        "routes": [
            {
                "path": "/api/v1/entities/{id}",
                "method": "GET",
                "nats_subject": "graph.query.entity",
                "timeout": "5s"
            },
            {
                "path": "/api/v1/search",
                "method": "POST",
                "nats_subject": "graph.query.globalSearch",
                "timeout": "30s"
            }
        ],
        "enable_cors": true,
        "cors_origins": ["https://example.com"],
        "max_request_size": 1048576
    }
}

Register handlers with an HTTP server:

gateway.RegisterHTTPHandlers("/api", mux)

Route Configuration

Each route mapping specifies:

  • path: HTTP path pattern
  • method: HTTP method (GET, POST, PUT, DELETE)
  • nats_subject: Target NATS subject for request/reply
  • timeout: Request timeout (default: from gateway config)

Request Processing

Request flow:

  1. Extract or generate X-Request-ID for distributed tracing
  2. Validate HTTP method against route configuration
  3. Apply CORS headers if enabled
  4. Read and validate request body (size limit enforced)
  5. Forward to NATS subject with timeout
  6. Return response with Content-Type: application/json

Error Handling

Errors are mapped to appropriate HTTP status codes:

  • Invalid errors → 400 Bad Request
  • Transient errors → 503 Service Unavailable or 504 Gateway Timeout
  • Fatal errors → 500 Internal Server Error
  • Pattern matching → 404 Not Found, 403 Forbidden

Error messages are sanitized to prevent information disclosure:

  • Internal details logged but not returned to clients
  • NATS subjects never exposed
  • Generic messages returned (e.g., "service temporarily unavailable")

Configuration

Gateway configuration options:

Routes:          []RouteMapping  # Route definitions
EnableCORS:      true            # Enable CORS headers
CORSOrigins:     ["*"]           # Allowed origins (use specific domains in production)
MaxRequestSize:  1048576         # Max request body size (1MB default)

Distributed Tracing

The gateway supports distributed tracing via request IDs:

  • Incoming X-Request-ID header is preserved if present
  • New request ID generated if not provided (crypto/rand based)
  • Request ID returned in response X-Request-ID header
  • Request ID can be propagated to downstream NATS services

Thread Safety

The Gateway is safe for concurrent use. Metrics use atomic operations, and state is protected by RWMutex where needed.

Metrics

The gateway tracks:

  • requestsTotal: Total HTTP requests received
  • requestsSuccess: Successful requests
  • requestsFailed: Failed requests
  • bytesReceived: Total bytes in request bodies
  • bytesSent: Total bytes in responses

DataFlow() returns calculated rates from these metrics.

See Also

Related packages:

Package http provides HTTP gateway implementation for SemStreams.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewGateway

func NewGateway(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewGateway creates a new HTTP gateway from configuration

func Register

func Register(registry *component.Registry) error

Register registers the HTTP gateway with the component registry

Types

type Gateway

type Gateway struct {
	// contains filtered or unexported fields
}

Gateway implements the Gateway interface for HTTP protocol

func (*Gateway) ConfigSchema

func (g *Gateway) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Gateway) DataFlow

func (g *Gateway) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Gateway) Health

func (g *Gateway) Health() component.HealthStatus

Health returns the current health status

func (*Gateway) Initialize

func (g *Gateway) Initialize() error

Initialize prepares the HTTP gateway

func (*Gateway) InputPorts

func (g *Gateway) InputPorts() []component.Port

InputPorts returns no input ports (gateway is request-driven)

func (*Gateway) Meta

func (g *Gateway) Meta() component.Metadata

Meta returns component metadata

func (*Gateway) OutputPorts

func (g *Gateway) OutputPorts() []component.Port

OutputPorts returns no output ports (gateway uses request/reply)

func (*Gateway) RegisterHTTPHandlers

func (g *Gateway) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)

RegisterHTTPHandlers registers gateway routes with the HTTP mux

func (*Gateway) Start

func (g *Gateway) Start(ctx context.Context) error

Start begins the HTTP gateway operation

func (*Gateway) Stop

func (g *Gateway) Stop(_ time.Duration) error

Stop gracefully stops the HTTP gateway

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL