http

package
v0.18.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

HTTP Adapter

The HTTP Adapter exposes HTTP endpoints for publishing messages and WebSocket capabilities for publishing and subscribing to messages from SuperMQ channels. It authenticates clients via tokens or Basic auth, resolves domains/channels over gRPC, and forwards payloads to the message broker.

For more on SuperMQ, see the official documentation.

Configuration

Environment variables (unset values fall back to defaults):

Variable Description Default
SMQ_HTTP_ADAPTER_LOG_LEVEL Log level (debug, info, warn, error) debug
SMQ_HTTP_ADAPTER_HOST HTTP Adapter host http-adapter
SMQ_HTTP_ADAPTER_PORT HTTP Adapter port 8008
SMQ_HTTP_ADAPTER_SERVER_CERT Path to PEM-encoded server certificate (enables TLS) ""
SMQ_HTTP_ADAPTER_SERVER_KEY Path to PEM-encoded server key ""
SMQ_HTTP_ADAPTER_SERVER_CA_CERTS Trusted CA bundle for HTTPS server ""
SMQ_HTTP_ADAPTER_CLIENT_CA_CERTS Client CA bundle to require mTLS on HTTPS server ""
SMQ_HTTP_ADAPTER_CACHE_NUM_COUNTERS Cache counters for topic parsing 200000
SMQ_HTTP_ADAPTER_CACHE_MAX_COST Maximum cache size (bytes) 1048576
SMQ_HTTP_ADAPTER_CACHE_BUFFER_ITEMS Cache buffer items 64
SMQ_MESSAGE_BROKER_URL Message broker URL (publishing target) nats://nats:4222
SMQ_ES_URL Event store URL (publishing middleware) nats://nats:4222
SMQ_JAEGER_URL Jaeger tracing endpoint http://jaeger:4318/v1/traces
SMQ_JAEGER_TRACE_RATIO Trace sampling ratio 1.0
SMQ_SEND_TELEMETRY Send telemetry to SuperMQ call-home server true
SMQ_HTTP_ADAPTER_INSTANCE_ID Service instance ID (auto-generated when empty) ""
SMQ_CLIENTS_GRPC_URL Clients service gRPC URL clients:7006
SMQ_CLIENTS_GRPC_TIMEOUT Clients gRPC request timeout 300s
SMQ_CLIENTS_GRPC_CLIENT_CERT Clients gRPC client certificate ""
SMQ_CLIENTS_GRPC_CLIENT_KEY Clients gRPC client key ""
SMQ_CLIENTS_GRPC_SERVER_CA_CERTS Clients gRPC trusted CA bundle ""
SMQ_CHANNELS_GRPC_URL Channels service gRPC URL channels:7005
SMQ_CHANNELS_GRPC_TIMEOUT Channels gRPC request timeout 300s
SMQ_CHANNELS_GRPC_CLIENT_CERT Channels gRPC client certificate ""
SMQ_CHANNELS_GRPC_CLIENT_KEY Channels gRPC client key ""
SMQ_CHANNELS_GRPC_SERVER_CA_CERTS Channels gRPC trusted CA bundle ""
SMQ_DOMAINS_GRPC_URL Domains service gRPC URL domains:7003
SMQ_DOMAINS_GRPC_TIMEOUT Domains gRPC request timeout 300s
SMQ_DOMAINS_GRPC_CLIENT_CERT Domains gRPC client certificate ""
SMQ_DOMAINS_GRPC_CLIENT_KEY Domains gRPC client key ""
SMQ_DOMAINS_GRPC_SERVER_CA_CERTS Domains gRPC trusted CA bundle ""
SMQ_AUTH_GRPC_URL Auth service gRPC URL auth:7001
SMQ_AUTH_GRPC_TIMEOUT Auth service gRPC request timeout 300s
SMQ_AUTH_GRPC_CLIENT_CERT Auth gRPC client certificate ""
SMQ_AUTH_GRPC_CLIENT_KEY Auth gRPC client key ""
SMQ_AUTH_GRPC_SERVER_CA_CERTS Auth gRPC trusted CA bundle ""

Deployment

The adapter is shipped as a Docker container. See the http-adapter section of docker-compose.yaml for deployment details.

To build and run locally:

# download the latest version of the service
git clone https://github.com/absmach/supermq
cd supermq

# compile the http adapter
make http

# copy binary to $GOBIN
make install

# set the environment variables and run the service
SMQ_HTTP_ADAPTER_LOG_LEVEL=debug \
SMQ_HTTP_ADAPTER_HOST=http-adapter \
SMQ_HTTP_ADAPTER_PORT=8008 \
SMQ_HTTP_ADAPTER_SERVER_CERT="" \
SMQ_HTTP_ADAPTER_SERVER_KEY="" \
SMQ_HTTP_ADAPTER_CACHE_NUM_COUNTERS=200000 \
SMQ_HTTP_ADAPTER_CACHE_MAX_COST=1048576 \
SMQ_HTTP_ADAPTER_CACHE_BUFFER_ITEMS=64 \
SMQ_MESSAGE_BROKER_URL=nats://nats:4222 \
SMQ_ES_URL=nats://nats:4222 \
SMQ_JAEGER_URL=<http://jaeger:4318/v1/traces> \
SMQ_JAEGER_TRACE_RATIO=1.0 \
SMQ_CLIENTS_GRPC_URL=clients:7006 \
SMQ_CLIENTS_GRPC_TIMEOUT=300s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
SMQ_CLIENTS_GRPC_CLIENT_KEY="" \
SMQ_CLIENTS_GRPC_SERVER_CA_CERTS="" \
SMQ_CHANNELS_GRPC_URL=channels:7005 \
SMQ_CHANNELS_GRPC_TIMEOUT=300s \
SMQ_CHANNELS_GRPC_CLIENT_CERT="" \
SMQ_CHANNELS_GRPC_CLIENT_KEY="" \
SMQ_CHANNELS_GRPC_SERVER_CA_CERTS="" \
SMQ_DOMAINS_GRPC_URL=domains:7003 \
SMQ_DOMAINS_GRPC_TIMEOUT=300s \
SMQ_DOMAINS_GRPC_CLIENT_CERT="" \
SMQ_DOMAINS_GRPC_CLIENT_KEY="" \
SMQ_DOMAINS_GRPC_SERVER_CA_CERTS="" \
SMQ_AUTH_GRPC_URL=auth:7001 \
SMQ_AUTH_GRPC_TIMEOUT=300s \
SMQ_AUTH_GRPC_CLIENT_CERT="" \
SMQ_AUTH_GRPC_CLIENT_KEY="" \
SMQ_AUTH_GRPC_SERVER_CA_CERTS="" \
SMQ_SEND_TELEMETRY=true \
SMQ_HTTP_ADAPTER_INSTANCE_ID="" \
$GOBIN/supermq-http

TLS is enabled by setting SMQ_HTTP_ADAPTER_SERVER_CERT and SMQ_HTTP_ADAPTER_SERVER_KEY. mTLS is enabled when SMQ_HTTP_ADAPTER_CLIENT_CA_CERTS is provided. gRPC client TLS/mTLS is enabled by setting the corresponding client cert/key/CA variables.

Usage

Endpoints:

  • POST /m/{domain}/c/{channel} (and wildcard /m/{domain}/c/{channel}/*): publish a message.
  • POST /hc/{domain}: health-check message path (authenticated).
  • GET /health: service health probe.
  • GET /metrics: Prometheus metrics.

Authentication:

  • Bearer token in Authorization header, or
  • Basic auth where the password is the token (username ignored).

Supported content types: application/json, application/senml+json, application/senml+cbor.

Example publish:

curl -X POST http://localhost:8008/m/<domainID>/c/<channelID>/sub/topic \
  -H "Authorization: Bearer <client_token>" \
  -H "Content-Type: application/json" \
  -d '{ "temp": 22.5, "unit": "C" }'

Implementation Details

  • Publishes to the configured message broker (SMQ_MESSAGE_BROKER_URL) with optional event-store middleware (SMQ_ES_URL).
  • Resolves domains and channels over gRPC to validate/route topics; authenticates via Auth gRPC; validates client identity via Clients gRPC.
  • Topic parsing is cached (Ristretto) with configurable counters/cost/buffers to reduce resolver calls.
  • Observability: Jaeger tracing, Prometheus metrics at /metrics, service health at /health.
  • Optional call-home telemetry is enabled by default.

Best Practices

  • Use domain/channel routes consistently in publish paths; include subtopics to segment data.
  • Keep cache defaults unless load patterns require tuning; monitor /metrics for cache hit ratios.
  • Enable TLS/mTLS for production deployments (HTTP server and gRPC clients).
  • Reuse a single broker URL across services (often NATS) to simplify operations.

Versioning and Health Check

The adapter exposes /health with status and build metadata.

curl -X GET http://localhost:8008/health \
  -H "accept: application/health+json"

Example response:

{
  "status": "pass",
  "version": "0.18.0",
  "commit": "7d6f4dc4f7f0c1fa3dc24eddfb18bb5073ff4f62",
  "description": "http adapter",
  "build_time": "1970-01-01_00:00:00"
}

For endpoint details, see the HTTP Adapter API documentation.

Documentation

Overview

Package http contains the domain concept definitions needed to support SuperMQ HTTP Adapter functionality.

Index

Constants

View Source
const (
	LogInfoSubscribed   = "subscribed with client_id %s to topics %s"
	LogInfoConnected    = "connected with client_id %s"
	LogInfoDisconnected = "disconnected client_id %s and username %s"
	LogInfoPublished    = "published with client_id %s to the topic %s"
)

Log message formats.

Variables

View Source
var (
	// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
	ErrFailedSubscription = errors.New("failed to subscribe to a channel")
	// ErrFailedPublish indicates that client couldn't publish to specified channel.
	ErrFailedSubscribe = errors.New("failed to unsubscribe from topic")
	// ErrEmptyTopic indicate absence of clientKey in the request.
	ErrEmptyTopic = errors.New("empty topic")
)

Functions

func NewHandler

NewHandler creates new Handler entity.

Types

type Client added in v0.18.4

type Client struct {
	// contains filtered or unexported fields
}

Client handles messaging and websocket connection.

func NewClient added in v0.18.4

func NewClient(logger *slog.Logger, conn *websocket.Conn, sessionID string) *Client

NewClient returns a new websocket client.

func (*Client) Cancel added in v0.18.4

func (c *Client) Cancel() error

Cancel handles the websocket connection after unsubscribing.

func (*Client) Close added in v0.18.4

func (c *Client) Close() error

Close handles the websocket connection after unsubscribing.

func (*Client) Handle added in v0.18.4

func (c *Client) Handle(msg *messaging.Message) error

Handle handles the sending and receiving of messages via the broker.

func (*Client) SetCloseHandler added in v0.18.4

func (c *Client) SetCloseHandler(handler func(code int, text string) error)

SetCloseHandler sets a close handler for the WebSocket connection.

func (*Client) Start added in v0.18.4

func (c *Client) Start(ctx context.Context)

type Service added in v0.18.4

type Service interface {
	// Subscribe subscribes message from the broker using the clientKey for authorization,
	// the channelID for subscription and domainID specifies the domain for authorization.
	// Subtopic is optional.
	// If the subscription is successful, nil is returned otherwise error is returned.
	Subscribe(ctx context.Context, sessionID, username, password, domainID, chanID, subtopic string, topicType messaging.TopicType, client *Client) error

	Unsubscribe(ctx context.Context, sessionID, domainID, chanID, subtopic string, topicType messaging.TopicType) error
}

Service specifies web socket service API.

func NewService added in v0.18.4

NewService instantiates the HTTP adapter implementation.

Directories

Path Synopsis
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
Package middleware provides logging, metrics and tracing middleware for SuperMQ HTTP service.
Package middleware provides logging, metrics and tracing middleware for SuperMQ HTTP service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL