mqttspec

package
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 30, 2025 License: MIT Imports: 20 Imported by: 0

README

MQTTSpec - MQTT-based Database Query Framework

MQTTSpec is an MQTT-based database query framework that enables real-time database operations and subscriptions over the MQTT protocol. It mirrors the functionality of WebSocketSpec but uses MQTT as the transport layer, which suits IoT applications, mobile apps on unreliable networks, and distributed systems that need QoS guarantees.

Features

  • Dual Broker Support: Embedded broker (Mochi MQTT) or external broker connection (Paho MQTT)
  • QoS 1 (At-least-once delivery): Reliable message delivery for all operations
  • Full CRUD Operations: Create, Read, Update, Delete with hooks
  • Real-time Subscriptions: Subscribe to entity changes with filtering
  • Database Agnostic: GORM and Bun ORM support
  • Lifecycle Hooks: 16 hooks (8 Before/After pairs) for authentication, authorization, validation, and auditing
  • Multi-tenancy Support: Built-in tenant isolation via hooks
  • Thread-safe: Proper concurrency handling throughout

Installation

go get github.com/bitechdev/ResolveSpec/pkg/mqttspec

Quick Start

Embedded Broker (Default)
package main

import (
    "github.com/bitechdev/ResolveSpec/pkg/mqttspec"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

type User struct {
    ID     uint   `json:"id" gorm:"primaryKey"`
    Name   string `json:"name"`
    Email  string `json:"email"`
    Status string `json:"status"`
}

func main() {
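    // Connect to database and fail fast if the connection or migration fails
    db, err := gorm.Open(postgres.Open("postgres://..."), &gorm.Config{})
    if err != nil {
        panic(err)
    }
    if err := db.AutoMigrate(&User{}); err != nil {
        panic(err)
    }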

    // Create MQTT handler with embedded broker
    handler, err := mqttspec.NewHandlerWithGORM(db)
    if err != nil {
        panic(err)
    }

    // Register models
    handler.Registry().RegisterModel("public.users", &User{})

    // Start handler (starts embedded broker on localhost:1883)
    if err := handler.Start(); err != nil {
        panic(err)
    }

    // Handler is now listening for MQTT messages
    select {} // Keep running
}
External Broker
handler, err := mqttspec.NewHandlerWithGORM(db,
    mqttspec.WithExternalBroker(mqttspec.ExternalBrokerConfig{
        BrokerURL:      "tcp://mqtt.example.com:1883",
        ClientID:       "mqttspec-server",
        Username:       "admin",
        Password:       "secret",
        ConnectTimeout: 10 * time.Second,
    }),
)
Custom Port (Embedded Broker)
handler, err := mqttspec.NewHandlerWithGORM(db,
    mqttspec.WithEmbeddedBroker(mqttspec.BrokerConfig{
        Host: "0.0.0.0",
        Port: 1884,
    }),
)

Topic Structure

MQTTSpec uses a client-based topic hierarchy:

spec/{client_id}/request         # Client publishes requests
spec/{client_id}/response        # Server publishes responses
spec/{client_id}/notify/{sub_id} # Server publishes notifications
Wildcard Subscriptions
  • Server: spec/+/request (receives all client requests)
  • Client: spec/{client_id}/response + spec/{client_id}/notify/+

Message Protocol

MQTTSpec uses the same JSON message structure as WebSocketSpec and ResolveSpec for consistency.

Request Message
{
  "id": "msg-123",
  "type": "request",
  "operation": "read",
  "schema": "public",
  "entity": "users",
  "options": {
    "filters": [
      {"column": "status", "operator": "eq", "value": "active"}
    ],
    "sort": [{"column": "created_at", "direction": "desc"}],
    "limit": 10
  }
}
Response Message
{
  "id": "msg-123",
  "type": "response",
  "success": true,
  "data": [
    {"id": 1, "name": "John Doe", "email": "john@example.com", "status": "active"},
    {"id": 2, "name": "Jane Smith", "email": "jane@example.com", "status": "active"}
  ],
  "metadata": {
    "total": 50,
    "count": 2
  }
}
Notification Message
{
  "type": "notification",
  "operation": "create",
  "subscription_id": "sub-xyz",
  "schema": "public",
  "entity": "users",
  "data": {
    "id": 3,
    "name": "New User",
    "email": "new@example.com",
    "status": "active"
  }
}
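
On the client side, the three message shapes above can be told apart by their "type" field. The sketch below assumes only the JSON field names shown in the examples; the `Envelope` type and `Describe` function are hypothetical and are not the package's `Message` type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope holds the fields that appear in the JSON examples above.
// Hypothetical client-side type, not mqttspec.Message.
type Envelope struct {
	ID             string          `json:"id,omitempty"`
	Type           string          `json:"type"`
	Operation      string          `json:"operation,omitempty"`
	SubscriptionID string          `json:"subscription_id,omitempty"`
	Schema         string          `json:"schema,omitempty"`
	Entity         string          `json:"entity,omitempty"`
	Success        bool            `json:"success,omitempty"`
	Data           json.RawMessage `json:"data,omitempty"` // decoded later, once the shape is known
}

// Describe decodes a payload and summarises it according to its "type".
func Describe(payload []byte) (string, error) {
	var env Envelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return "", fmt.Errorf("decode message: %w", err)
	}
	switch env.Type {
	case "response":
		return fmt.Sprintf("response to %s (success=%t)", env.ID, env.Success), nil
	case "notification":
		return fmt.Sprintf("%s on %s.%s via %s",
			env.Operation, env.Schema, env.Entity, env.SubscriptionID), nil
	default:
		return "", fmt.Errorf("unexpected message type %q", env.Type)
	}
}

func main() {
	payload := []byte(`{"type":"notification","operation":"create",` +
		`"subscription_id":"sub-xyz","schema":"public","entity":"users","data":{"id":3}}`)
	summary, err := Describe(payload)
	fmt.Println(summary, err)
}
```

Leaving `data` as `json.RawMessage` defers decoding until the caller knows whether to expect a single record or a list.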

CRUD Operations

Read (Single Record)

MQTT Client Publishes to: spec/{client_id}/request

{
  "id": "msg-1",
  "type": "request",
  "operation": "read",
  "schema": "public",
  "entity": "users",
  "data": {"id": 1}
}

Server Publishes Response to: spec/{client_id}/response

{
  "id": "msg-1",
  "success": true,
  "data": {"id": 1, "name": "John Doe", "email": "john@example.com"}
}
Read (Multiple Records with Filtering)
{
  "id": "msg-2",
  "type": "request",
  "operation": "read",
  "schema": "public",
  "entity": "users",
  "options": {
    "filters": [
      {"column": "status", "operator": "eq", "value": "active"}
    ],
    "sort": [{"column": "name", "direction": "asc"}],
    "limit": 20,
    "offset": 0
  }
}
Create
{
  "id": "msg-3",
  "type": "request",
  "operation": "create",
  "schema": "public",
  "entity": "users",
  "data": {
    "name": "Alice Brown",
    "email": "alice@example.com",
    "status": "active"
  }
}
Update
{
  "id": "msg-4",
  "type": "request",
  "operation": "update",
  "schema": "public",
  "entity": "users",
  "data": {
    "id": 1,
    "status": "inactive"
  }
}
Delete
{
  "id": "msg-5",
  "type": "request",
  "operation": "delete",
  "schema": "public",
  "entity": "users",
  "data": {"id": 1}
}
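
Because requests and responses travel on separate topics, a client usually needs to pair each response with the request that caused it. One way to do that, sketched here under the assumption that the response echoes the request "id" as in the examples above, is a small table of pending requests. The `Pending` type is hypothetical and not part of mqttspec.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Pending tracks in-flight requests by "id" so that a payload arriving on
// spec/{client_id}/response can be routed to the caller that sent the request.
type Pending struct {
	mu      sync.Mutex
	waiters map[string]chan json.RawMessage
}

// NewPending returns an empty table of pending requests.
func NewPending() *Pending {
	return &Pending{waiters: make(map[string]chan json.RawMessage)}
}

// Expect registers a request ID and returns the channel its response arrives on.
func (p *Pending) Expect(id string) <-chan json.RawMessage {
	ch := make(chan json.RawMessage, 1) // buffered so Resolve never blocks
	p.mu.Lock()
	p.waiters[id] = ch
	p.mu.Unlock()
	return ch
}

// Resolve hands a response payload to whoever is waiting for its ID.
// It reports false when the ID is unknown, for example a duplicate delivery.
func (p *Pending) Resolve(payload []byte) bool {
	var head struct {
		ID string `json:"id"`
	}
	if err := json.Unmarshal(payload, &head); err != nil || head.ID == "" {
		return false
	}
	p.mu.Lock()
	ch, ok := p.waiters[head.ID]
	delete(p.waiters, head.ID)
	p.mu.Unlock()
	if !ok {
		return false
	}
	ch <- json.RawMessage(payload)
	return true
}

func main() {
	p := NewPending()
	reply := p.Expect("msg-5")

	// In a real client this payload would arrive in the MQTT message callback.
	p.Resolve([]byte(`{"id":"msg-5","success":true}`))
	fmt.Println(string(<-reply))
}
```

Dropping unknown IDs also absorbs the duplicate deliveries that QoS 1 (at-least-once) permits.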

Real-time Subscriptions

Subscribe to Entity Changes

Client Publishes to: spec/{client_id}/request

{
  "id": "msg-6",
  "type": "subscription",
  "operation": "subscribe",
  "schema": "public",
  "entity": "users",
  "options": {
    "filters": [
      {"column": "status", "operator": "eq", "value": "active"}
    ]
  }
}

Server Response (published to spec/{client_id}/response):

{
  "id": "msg-6",
  "success": true,
  "data": {
    "subscription_id": "sub-abc123",
    "notify_topic": "spec/{client_id}/notify/sub-abc123"
  }
}

The client then subscribes to the MQTT topic returned in notify_topic: spec/{client_id}/notify/sub-abc123

Receiving Notifications

When any client creates/updates/deletes a user matching the subscription filters, the subscriber receives:

{
  "type": "notification",
  "operation": "create",
  "subscription_id": "sub-abc123",
  "schema": "public",
  "entity": "users",
  "data": {
    "id": 10,
    "name": "New User",
    "email": "newuser@example.com",
    "status": "active"
  }
}
Unsubscribe
{
  "id": "msg-7",
  "type": "subscription",
  "operation": "unsubscribe",
  "data": {
    "subscription_id": "sub-abc123"
  }
}
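
To make the idea of "matching the subscription filters" concrete, here is a minimal sketch of evaluating a filter list against a changed record. It is an illustration only, not the package's matching logic: the `Filter` type and `Matches` function are hypothetical, and only the "eq" operator shown in this README is handled.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Filter mirrors one entry of the "filters" array in a subscription request.
type Filter struct {
	Column   string      `json:"column"`
	Operator string      `json:"operator"`
	Value    interface{} `json:"value"`
}

// Matches reports whether a record satisfies every filter.
// Operators other than "eq" are reported as errors rather than guessed at.
// Note that encoding/json decodes all numbers as float64, so numeric values
// compare equal only when both sides were decoded from JSON.
func Matches(record map[string]interface{}, filters []Filter) (bool, error) {
	for _, f := range filters {
		if f.Operator != "eq" {
			return false, fmt.Errorf("operator %q not handled in this sketch", f.Operator)
		}
		got, ok := record[f.Column]
		if !ok || !reflect.DeepEqual(got, f.Value) {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	var filters []Filter
	if err := json.Unmarshal([]byte(`[{"column":"status","operator":"eq","value":"active"}]`), &filters); err != nil {
		panic(err)
	}

	var record map[string]interface{}
	if err := json.Unmarshal([]byte(`{"id":10,"name":"New User","status":"active"}`), &record); err != nil {
		panic(err)
	}

	ok, err := Matches(record, filters)
	fmt.Println(ok, err)
}
```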

Lifecycle Hooks

MQTTSpec provides 16 lifecycle hooks (8 Before/After pairs) for implementing cross-cutting concerns:

Hook Types
  • BeforeConnect / AfterConnect - Connection lifecycle
  • BeforeDisconnect / AfterDisconnect - Disconnection lifecycle
  • BeforeRead / AfterRead - Read operations
  • BeforeCreate / AfterCreate - Create operations
  • BeforeUpdate / AfterUpdate - Update operations
  • BeforeDelete / AfterDelete - Delete operations
  • BeforeSubscribe / AfterSubscribe - Subscription creation
  • BeforeUnsubscribe / AfterUnsubscribe - Subscription removal
Authentication Example (JWT)
handler.Hooks().Register(mqttspec.BeforeConnect, func(ctx *mqttspec.HookContext) error {
    client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)

    // MQTT username carries the JWT. jwt.Validate is a placeholder for
    // your own token validation.
    token := client.Username
    claims, err := jwt.Validate(token)
    if err != nil {
        return fmt.Errorf("invalid token: %w", err)
    }

    // Store user info in client metadata for later use
    client.SetMetadata("user_id", claims.UserID)
    client.SetMetadata("tenant_id", claims.TenantID)
    client.SetMetadata("roles", claims.Roles)

    logger.Info("Client authenticated: user_id=%d, tenant=%s", claims.UserID, claims.TenantID)
    return nil
})
Multi-tenancy Example
// Auto-inject tenant filter for all read operations
handler.Hooks().Register(mqttspec.BeforeRead, func(ctx *mqttspec.HookContext) error {
    client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
    tenantID, _ := client.GetMetadata("tenant_id")

    // Add tenant filter to ensure users only see their own data
    ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{
        Column:   "tenant_id",
        Operator: "eq",
        Value:    tenantID,
    })

    return nil
})

// Auto-set tenant_id for all create operations
handler.Hooks().Register(mqttspec.BeforeCreate, func(ctx *mqttspec.HookContext) error {
    client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
    tenantID, _ := client.GetMetadata("tenant_id")

    // Inject tenant_id into new records
    if dataMap, ok := ctx.Data.(map[string]interface{}); ok {
        dataMap["tenant_id"] = tenantID
    }

    return nil
})
Role-based Access Control (RBAC)
handler.Hooks().Register(mqttspec.BeforeDelete, func(ctx *mqttspec.HookContext) error {
    client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
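    roles, _ := client.GetMetadata("roles")

    // Deny by default when roles are missing or have an unexpected type;
    // an unchecked assertion would panic here.
    roleList, ok := roles.([]string)
    if !ok {
        return fmt.Errorf("permission denied: no roles found for client")
    }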
    hasAdminRole := false
    for _, role := range roleList {
        if role == "admin" {
            hasAdminRole = true
            break
        }
    }

    if !hasAdminRole {
        return fmt.Errorf("permission denied: delete requires admin role")
    }

    return nil
})
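
Role values stored as metadata may not always be a `[]string`. If claims are decoded from JSON into generic maps, for instance, they arrive as `[]interface{}`. The helper below is a hypothetical sketch (the name `hasRole` is not part of mqttspec) that accepts both shapes and denies access for anything else.

```go
package main

import "fmt"

// hasRole reports whether value, as returned by a metadata lookup, contains role.
// It accepts []string as well as []interface{}, which is what encoding/json
// produces when a JSON array is decoded into an interface{}.
func hasRole(value interface{}, role string) bool {
	switch roles := value.(type) {
	case []string:
		for _, r := range roles {
			if r == role {
				return true
			}
		}
	case []interface{}:
		for _, r := range roles {
			if s, ok := r.(string); ok && s == role {
				return true
			}
		}
	}
	// nil, missing, or an unexpected type: deny by default.
	return false
}

func main() {
	fmt.Println(hasRole([]string{"viewer", "admin"}, "admin"))
	fmt.Println(hasRole([]interface{}{"viewer"}, "admin"))
	fmt.Println(hasRole(nil, "admin"))
}
```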
Audit Logging Example
handler.Hooks().Register(mqttspec.AfterCreate, func(ctx *mqttspec.HookContext) error {
    client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
    userID, _ := client.GetMetadata("user_id")

    logger.Info("Audit: user %d created %s.%s record: %+v",
        userID, ctx.Schema, ctx.Entity, ctx.Result)

    // Could also write to audit log table
    return nil
})

Client Examples

JavaScript (MQTT.js)
const mqtt = require('mqtt');

// Connect to MQTT broker
const client = mqtt.connect('mqtt://localhost:1883', {
  clientId: 'client-abc123',
  username: 'your-jwt-token',
  password: '',  // JWT in username, password can be empty
});

client.on('connect', () => {
  console.log('Connected to MQTT broker');

  // Subscribe to responses
  client.subscribe('spec/client-abc123/response');

  // Read users
  const readMsg = {
    id: 'msg-1',
    type: 'request',
    operation: 'read',
    schema: 'public',
    entity: 'users',
    options: {
      filters: [
        { column: 'status', operator: 'eq', value: 'active' }
      ]
    }
  };

  client.publish('spec/client-abc123/request', JSON.stringify(readMsg));
});

client.on('message', (topic, payload) => {
  const message = JSON.parse(payload.toString());
  console.log('Received:', message);

  if (message.type === 'response') {
    console.log('Response data:', message.data);
  } else if (message.type === 'notification') {
    console.log('Notification:', message.operation, message.data);
  }
});
Python (paho-mqtt)
import paho.mqtt.client as mqtt
import json

client_id = 'client-python-123'

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

    # Subscribe to responses
    client.subscribe(f"spec/{client_id}/response")

    # Create a user
    create_msg = {
        'id': 'msg-create-1',
        'type': 'request',
        'operation': 'create',
        'schema': 'public',
        'entity': 'users',
        'data': {
            'name': 'Python User',
            'email': 'python@example.com',
            'status': 'active'
        }
    }

    client.publish(f"spec/{client_id}/request", json.dumps(create_msg))

def on_message(client, userdata, msg):
    message = json.loads(msg.payload.decode())
    print(f"Received on {msg.topic}: {message}")

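# paho-mqtt 1.x constructor. With paho-mqtt 2.x, pass
# mqtt.CallbackAPIVersion.VERSION1 as the first argument to keep these callback signatures.
client = mqtt.Client(client_id=client_id)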
client.username_pw_set('your-jwt-token', '')
client.on_connect = on_connect
client.on_message = on_message

client.connect('localhost', 1883, 60)
client.loop_forever()
Go (paho.mqtt.golang)
package main

import (
    "encoding/json"
    "fmt"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    clientID := "client-go-123"

    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://localhost:1883")
    opts.SetClientID(clientID)
    opts.SetUsername("your-jwt-token")
    opts.SetPassword("")

    opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
        var message map[string]interface{}
        if err := json.Unmarshal(msg.Payload(), &message); err != nil {
            fmt.Printf("Invalid JSON on %s: %v\n", msg.Topic(), err)
            return
        }
        fmt.Printf("Received on %s: %+v\n", msg.Topic(), message)
    })

    opts.OnConnect = func(client mqtt.Client) {
        fmt.Println("Connected to MQTT broker")

        // Subscribe to responses
        client.Subscribe(fmt.Sprintf("spec/%s/response", clientID), 1, nil)

        // Read users
        readMsg := map[string]interface{}{
            "id":        "msg-1",
            "type":      "request",
            "operation": "read",
            "schema":    "public",
            "entity":    "users",
            "options": map[string]interface{}{
                "filters": []map[string]interface{}{
                    {"column": "status", "operator": "eq", "value": "active"},
                },
            },
        }

        payload, _ := json.Marshal(readMsg)
        client.Publish(fmt.Sprintf("spec/%s/request", clientID), 1, false, payload)
    }

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    // Keep running
    select {}
}

Configuration Options

BrokerConfig (Embedded Broker)
type BrokerConfig struct {
    Host            string        // Default: "localhost"
    Port            int           // Default: 1883
    EnableWebSocket bool          // Enable WebSocket listener
    WSPort          int           // WebSocket port (default: 8883)
    MaxConnections  int           // Max concurrent connections
    KeepAlive       time.Duration // MQTT keep-alive interval
    EnableAuth      bool          // Enable authentication
}
ExternalBrokerConfig
type ExternalBrokerConfig struct {
    BrokerURL      string         // MQTT broker URL (tcp://host:port)
    ClientID       string         // MQTT client ID
    Username       string         // MQTT username
    Password       string         // MQTT password
    CleanSession   bool           // Clean session flag
    KeepAlive      time.Duration  // Keep-alive interval
    ConnectTimeout time.Duration  // Connection timeout
    ReconnectDelay time.Duration  // Auto-reconnect delay
    MaxReconnect   int            // Max reconnect attempts
    TLSConfig      *tls.Config    // TLS configuration
}
QoS Configuration
handler, err := mqttspec.NewHandlerWithGORM(db,
    mqttspec.WithQoS(1, 1, 1), // Request, Response, Notification
)
Topic Prefix
handler, err := mqttspec.NewHandlerWithGORM(db,
    mqttspec.WithTopicPrefix("myapp"), // Changes topics to myapp/{client_id}/...
)
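
The options above follow Go's functional options pattern (the package documents `Option` as `func(*Handler) error`). The sketch below illustrates how such options can validate their input before it is applied. The `settings` type and the lowercase option constructors are hypothetical stand-ins; whether the real `WithQoS` rejects out-of-range values is an assumption, not something this README states.

```go
package main

import "fmt"

// settings stands in for the handler configuration in this sketch.
type settings struct {
	prefix    string
	reqQoS    byte
	respQoS   byte
	notifyQoS byte
}

// option mirrors the shape of mqttspec.Option, applied to settings instead.
type option func(*settings) error

func withTopicPrefix(prefix string) option {
	return func(s *settings) error {
		if prefix == "" {
			return fmt.Errorf("topic prefix must not be empty")
		}
		s.prefix = prefix
		return nil
	}
}

func withQoS(request, response, notification byte) option {
	return func(s *settings) error {
		for _, q := range []byte{request, response, notification} {
			if q > 2 { // MQTT defines QoS 0, 1 and 2 only
				return fmt.Errorf("invalid QoS level %d", q)
			}
		}
		s.reqQoS, s.respQoS, s.notifyQoS = request, response, notification
		return nil
	}
}

// newSettings starts from the documented defaults and applies options in order.
func newSettings(opts ...option) (*settings, error) {
	s := &settings{prefix: "spec", reqQoS: 1, respQoS: 1, notifyQoS: 1}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newSettings(withTopicPrefix("myapp"), withQoS(1, 1, 0))
	if err != nil {
		panic(err)
	}
	fmt.Println(s.prefix, s.reqQoS, s.respQoS, s.notifyQoS)

	_, err = newSettings(withQoS(3, 1, 1))
	fmt.Println(err)
}
```

Returning an error from the option lets the constructor surface a bad configuration immediately instead of at first publish.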

Documentation References

  • ResolveSpec JSON Protocol: See /pkg/resolvespec/README.md for the full message protocol specification
  • WebSocketSpec Documentation: See /pkg/websocketspec/README.md for similar WebSocket-based implementation
  • Common Interfaces: See /pkg/common/types.go for database adapter interfaces and query options
  • Model Registry: See /pkg/modelregistry/README.md for model registration and reflection
  • Hooks Reference: See /pkg/websocketspec/hooks.go for hook types (same as MQTTSpec)
  • Subscription Management: See /pkg/websocketspec/subscription.go for subscription filtering

Comparison: MQTTSpec vs WebSocketSpec

| Feature | MQTTSpec | WebSocketSpec |
|---|---|---|
| Transport | MQTT (pub/sub broker) | WebSocket (direct connection) |
| Connection Model | Broker-mediated | Direct client-server |
| QoS Levels | QoS 0, 1, 2 support | No built-in QoS |
| Offline Messages | Yes (with QoS 1+ and a persistent session) | No |
| Auto-reconnect | Yes (built into MQTT) | Manual implementation needed |
| Network Efficiency | Better for unreliable networks | Better for low-latency |
| Best For | IoT, mobile apps, distributed systems | Web applications, real-time dashboards |
| Message Protocol | Same JSON structure | Same JSON structure |
| Hooks | Same 16 hooks | Same 16 hooks |
| CRUD Operations | Identical | Identical |
| Subscriptions | Identical (via MQTT topics) | Identical (via app-level) |

Use Cases

IoT Sensor Data
// Sensors publish data, backend stores and notifies subscribers
handler.Registry().RegisterModel("public.sensor_readings", &SensorReading{})

// Auto-set device_id from client metadata
handler.Hooks().Register(mqttspec.BeforeCreate, func(ctx *mqttspec.HookContext) error {
    client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
    deviceID, _ := client.GetMetadata("device_id")

    if ctx.Entity == "sensor_readings" {
        if dataMap, ok := ctx.Data.(map[string]interface{}); ok {
            dataMap["device_id"] = deviceID
            dataMap["timestamp"] = time.Now()
        }
    }
    return nil
})
Mobile App with Offline Support

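With QoS 1 and a persistent session (clean session disabled), the broker queues messages for a client that temporarily disconnects and delivers them when it reconnects. QoS 1 is at-least-once delivery, so clients should tolerate occasional duplicates.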

Distributed Microservices

Multiple services can subscribe to entity changes and react accordingly.

Testing

Run unit tests:

go test -v ./pkg/mqttspec

Run with race detection:

go test -race -v ./pkg/mqttspec

License

This package is part of the ResolveSpec project.

Contributing

Contributions are welcome! Please ensure:

  • All tests pass (go test ./pkg/mqttspec)
  • No race conditions (go test -race ./pkg/mqttspec)
  • Documentation is updated
  • Examples are provided for new features

Support

For issues, questions, or feature requests, please open an issue in the ResolveSpec repository.

Documentation


Constants

const (
	// CRUD operation hooks
	BeforeRead   = websocketspec.BeforeRead
	AfterRead    = websocketspec.AfterRead
	BeforeCreate = websocketspec.BeforeCreate
	AfterCreate  = websocketspec.AfterCreate
	BeforeUpdate = websocketspec.BeforeUpdate
	AfterUpdate  = websocketspec.AfterUpdate
	BeforeDelete = websocketspec.BeforeDelete
	AfterDelete  = websocketspec.AfterDelete

	// Subscription hooks
	BeforeSubscribe   = websocketspec.BeforeSubscribe
	AfterSubscribe    = websocketspec.AfterSubscribe
	BeforeUnsubscribe = websocketspec.BeforeUnsubscribe
	AfterUnsubscribe  = websocketspec.AfterUnsubscribe

	// Connection hooks
	BeforeConnect    = websocketspec.BeforeConnect
	AfterConnect     = websocketspec.AfterConnect
	BeforeDisconnect = websocketspec.BeforeDisconnect
	AfterDisconnect  = websocketspec.AfterDisconnect
)

Hook type constants - all 16 lifecycle hooks

const (
	MessageTypeRequest      = websocketspec.MessageTypeRequest
	MessageTypeResponse     = websocketspec.MessageTypeResponse
	MessageTypeNotification = websocketspec.MessageTypeNotification
	MessageTypeSubscription = websocketspec.MessageTypeSubscription
	MessageTypeError        = websocketspec.MessageTypeError
	MessageTypePing         = websocketspec.MessageTypePing
	MessageTypePong         = websocketspec.MessageTypePong
)

Message type constants

const (
	OperationRead        = websocketspec.OperationRead
	OperationCreate      = websocketspec.OperationCreate
	OperationUpdate      = websocketspec.OperationUpdate
	OperationDelete      = websocketspec.OperationDelete
	OperationSubscribe   = websocketspec.OperationSubscribe
	OperationUnsubscribe = websocketspec.OperationUnsubscribe
	OperationMeta        = websocketspec.OperationMeta
)

Operation type constants

Variables

var (
	// NewResponseMessage creates a new response message
	NewResponseMessage = websocketspec.NewResponseMessage

	// NewErrorResponse creates an error response
	NewErrorResponse = websocketspec.NewErrorResponse

	// NewNotificationMessage creates a notification message
	NewNotificationMessage = websocketspec.NewNotificationMessage

	// ParseMessage parses a JSON message into a Message struct
	ParseMessage = websocketspec.ParseMessage
)

Helper functions from websocketspec

Functions

This section is empty.

Types

type AuthConfig

type AuthConfig struct {
	// ValidateCredentials is called to validate username/password for embedded broker
	// Return true if credentials are valid, false otherwise
	ValidateCredentials func(username, password string) bool
}

AuthConfig for MQTT-level authentication

type BrokerConfig

type BrokerConfig struct {
	// Host to bind to (default: "localhost")
	Host string

	// Port to listen on (default: 1883)
	Port int

	// EnableWebSocket enables WebSocket support
	EnableWebSocket bool

	// WSPort is the WebSocket port (default: 8883)
	WSPort int

	// MaxConnections limits concurrent client connections
	MaxConnections int

	// KeepAlive is the client keepalive interval
	KeepAlive time.Duration

	// EnableAuth enables username/password authentication
	EnableAuth bool
}

BrokerConfig configures the embedded Mochi MQTT broker

type BrokerInterface

type BrokerInterface interface {
	// Start initializes the broker/client connection
	Start(ctx context.Context) error

	// Stop gracefully shuts down the broker/client
	Stop(ctx context.Context) error

	// Publish sends a message to a topic
	Publish(topic string, qos byte, payload []byte) error

	// Subscribe subscribes to a topic pattern with callback
	Subscribe(topicFilter string, qos byte, callback MessageCallback) error

	// Unsubscribe removes subscription
	Unsubscribe(topicFilter string) error

	// IsConnected returns connection status
	IsConnected() bool

	// GetClientManager returns the client manager
	GetClientManager() *ClientManager

	// SetHandler sets the handler reference (needed for hooks)
	SetHandler(handler *Handler)
}

BrokerInterface abstracts MQTT broker operations
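
For unit tests it can help to have an in-process stand-in for the publish/subscribe part of this interface. The sketch below is hypothetical and implements only `Publish`, `Subscribe` and `Unsubscribe` with the signatures listed above; it is not the package's embedded broker. Topic filters follow the standard MQTT rules, where `+` matches exactly one level and a trailing `#` matches all remaining levels. Special handling of `$`-prefixed topics is left out.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// MessageCallback matches the callback signature documented for this package.
type MessageCallback func(topic string, payload []byte)

// memoryBroker is a hypothetical in-process stand-in for tests.
// The qos arguments are accepted for signature compatibility and ignored.
type memoryBroker struct {
	mu   sync.RWMutex
	subs map[string]MessageCallback
}

func newMemoryBroker() *memoryBroker {
	return &memoryBroker{subs: make(map[string]MessageCallback)}
}

func (b *memoryBroker) Subscribe(topicFilter string, qos byte, callback MessageCallback) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topicFilter] = callback
	return nil
}

func (b *memoryBroker) Unsubscribe(topicFilter string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.subs, topicFilter)
	return nil
}

func (b *memoryBroker) Publish(topic string, qos byte, payload []byte) error {
	b.mu.RLock()
	var matched []MessageCallback
	for filter, cb := range b.subs {
		if topicMatches(filter, topic) {
			matched = append(matched, cb)
		}
	}
	b.mu.RUnlock()
	for _, cb := range matched { // invoke callbacks outside the lock
		cb(topic, payload)
	}
	return nil
}

// topicMatches applies MQTT filter matching for "+" and a trailing "#".
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1 // "#" is only valid as the last level
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	b := newMemoryBroker()
	_ = b.Subscribe("spec/+/request", 1, func(topic string, payload []byte) {
		fmt.Printf("%s: %s\n", topic, payload)
	})
	_ = b.Publish("spec/client-1/request", 1, []byte(`{"id":"msg-1"}`))
	_ = b.Publish("spec/client-1/response", 1, []byte(`ignored`))
}
```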

type BrokerMode

type BrokerMode string

BrokerMode specifies how to connect to MQTT

const (
	// BrokerModeEmbedded runs Mochi MQTT broker in-process
	BrokerModeEmbedded BrokerMode = "embedded"
	// BrokerModeExternal connects to external MQTT broker as client
	BrokerModeExternal BrokerMode = "external"
)

type Client

type Client struct {
	// ID is the MQTT client ID (unique per connection)
	ID string

	// Username from MQTT CONNECT packet
	Username string

	// ConnectedAt is when the client connected
	ConnectedAt time.Time
	// contains filtered or unexported fields
}

Client represents an MQTT client connection

func NewClient

func NewClient(id, username string, handler *Handler) *Client

NewClient creates a new MQTT client

func (*Client) AddSubscription

func (c *Client) AddSubscription(sub *Subscription)

AddSubscription adds a subscription to this client

func (*Client) Close

func (c *Client) Close()

Close cleans up the client

func (*Client) GetMetadata

func (c *Client) GetMetadata(key string) (interface{}, bool)

GetMetadata retrieves metadata for this client

func (*Client) GetSubscription

func (c *Client) GetSubscription(subID string) (*Subscription, bool)

GetSubscription retrieves a subscription by ID

func (*Client) RemoveSubscription

func (c *Client) RemoveSubscription(subID string)

RemoveSubscription removes a subscription from this client

func (*Client) SetMetadata

func (c *Client) SetMetadata(key string, value interface{})

SetMetadata sets metadata for this client
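
Because `GetMetadata` returns an `interface{}` and a found flag, callers generally want a checked lookup rather than a bare type assertion. The sketch below shows one way to do that on top of a mutex-guarded map. `metadataStore` and `tenantOf` are hypothetical names; how `Client` stores its metadata internally is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// metadataStore is a hypothetical stand-in with the same method signatures
// as Client.SetMetadata and Client.GetMetadata.
type metadataStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func (m *metadataStore) SetMetadata(key string, value interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.data == nil {
		m.data = make(map[string]interface{})
	}
	m.data[key] = value
}

func (m *metadataStore) GetMetadata(key string) (interface{}, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[key]
	return v, ok
}

// tenantOf performs the checked lookup a hook would typically need:
// it distinguishes a missing key from a value of the wrong type.
func tenantOf(m *metadataStore) (string, error) {
	v, ok := m.GetMetadata("tenant_id")
	if !ok {
		return "", fmt.Errorf("tenant_id not set")
	}
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("tenant_id has type %T, want string", v)
	}
	return s, nil
}

func main() {
	var m metadataStore

	// Concurrent writers are safe because every access takes the lock.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			m.SetMetadata(fmt.Sprintf("key-%d", n), n)
		}(i)
	}
	wg.Wait()

	m.SetMetadata("tenant_id", "acme")
	fmt.Println(tenantOf(&m))
}
```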

type ClientManager

type ClientManager struct {
	// contains filtered or unexported fields
}

ClientManager manages all MQTT client connections

func NewClientManager

func NewClientManager(ctx context.Context) *ClientManager

NewClientManager creates a new client manager

func (*ClientManager) Count

func (cm *ClientManager) Count() int

Count returns the number of active clients

func (*ClientManager) GetClient

func (cm *ClientManager) GetClient(clientID string) (*Client, bool)

GetClient retrieves a client by ID

func (*ClientManager) Register

func (cm *ClientManager) Register(clientID, username string, handler *Handler) *Client

Register registers a new MQTT client

func (*ClientManager) Shutdown

func (cm *ClientManager) Shutdown()

Shutdown gracefully shuts down the client manager

func (*ClientManager) Unregister

func (cm *ClientManager) Unregister(clientID string)

Unregister removes a client

type Config

type Config struct {
	// BrokerMode determines whether to use embedded or external broker
	BrokerMode BrokerMode

	// Broker configuration for embedded mode
	Broker BrokerConfig

	// ExternalBroker configuration for external client mode
	ExternalBroker ExternalBrokerConfig

	// Topics configuration
	Topics TopicConfig

	// QoS configuration for different message types
	QoS QoSConfig

	// Auth configuration
	Auth AuthConfig

	// Timeouts for various operations
	Timeouts TimeoutConfig
}

Config holds all mqttspec configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a configuration with sensible defaults

type EmbeddedBroker

type EmbeddedBroker struct {
	// contains filtered or unexported fields
}

EmbeddedBroker wraps Mochi MQTT server

func NewEmbeddedBroker

func NewEmbeddedBroker(config BrokerConfig, clientManager *ClientManager) *EmbeddedBroker

NewEmbeddedBroker creates a new embedded broker

func (*EmbeddedBroker) GetClientManager

func (eb *EmbeddedBroker) GetClientManager() *ClientManager

GetClientManager returns the client manager

func (*EmbeddedBroker) IsConnected

func (eb *EmbeddedBroker) IsConnected() bool

IsConnected returns whether the broker is running

func (*EmbeddedBroker) Publish

func (eb *EmbeddedBroker) Publish(topic string, qos byte, payload []byte) error

Publish publishes a message to a topic

func (*EmbeddedBroker) SetHandler

func (eb *EmbeddedBroker) SetHandler(handler *Handler)

SetHandler sets the handler reference

func (*EmbeddedBroker) Start

func (eb *EmbeddedBroker) Start(ctx context.Context) error

Start starts the embedded MQTT broker

func (*EmbeddedBroker) Stop

func (eb *EmbeddedBroker) Stop(ctx context.Context) error

Stop stops the embedded broker

func (*EmbeddedBroker) Subscribe

func (eb *EmbeddedBroker) Subscribe(topicFilter string, qos byte, callback MessageCallback) error

Subscribe subscribes to a topic

func (*EmbeddedBroker) Unsubscribe

func (eb *EmbeddedBroker) Unsubscribe(topicFilter string) error

Unsubscribe unsubscribes from a topic

type ErrorInfo

type ErrorInfo = websocketspec.ErrorInfo

ErrorInfo contains error details

type ExternalBrokerClient

type ExternalBrokerClient struct {
	// contains filtered or unexported fields
}

ExternalBrokerClient wraps Paho MQTT client

func NewExternalBrokerClient

func NewExternalBrokerClient(config ExternalBrokerConfig, clientManager *ClientManager) *ExternalBrokerClient

NewExternalBrokerClient creates a new external broker client

func (*ExternalBrokerClient) GetClientManager

func (ebc *ExternalBrokerClient) GetClientManager() *ClientManager

GetClientManager returns the client manager

func (*ExternalBrokerClient) IsConnected

func (ebc *ExternalBrokerClient) IsConnected() bool

IsConnected returns connection status

func (*ExternalBrokerClient) Publish

func (ebc *ExternalBrokerClient) Publish(topic string, qos byte, payload []byte) error

Publish publishes a message to a topic

func (*ExternalBrokerClient) SetHandler

func (ebc *ExternalBrokerClient) SetHandler(handler *Handler)

SetHandler sets the handler reference

func (*ExternalBrokerClient) Start

func (ebc *ExternalBrokerClient) Start(ctx context.Context) error

Start connects to the external MQTT broker

func (*ExternalBrokerClient) Stop

func (ebc *ExternalBrokerClient) Stop(ctx context.Context) error

Stop disconnects from the external broker

func (*ExternalBrokerClient) Subscribe

func (ebc *ExternalBrokerClient) Subscribe(topicFilter string, qos byte, callback MessageCallback) error

Subscribe subscribes to a topic

func (*ExternalBrokerClient) Unsubscribe

func (ebc *ExternalBrokerClient) Unsubscribe(topicFilter string) error

Unsubscribe unsubscribes from a topic

type ExternalBrokerConfig

type ExternalBrokerConfig struct {
	// BrokerURL is the broker address (e.g., tcp://host:port or ssl://host:port)
	BrokerURL string

	// ClientID is a unique identifier for this handler instance
	ClientID string

	// Username for MQTT authentication
	Username string

	// Password for MQTT authentication
	Password string

	// CleanSession flag (default: true)
	CleanSession bool

	// KeepAlive interval (default: 60s)
	KeepAlive time.Duration

	// ConnectTimeout for initial connection (default: 30s)
	ConnectTimeout time.Duration

	// ReconnectDelay between reconnection attempts (default: 5s)
	ReconnectDelay time.Duration

	// MaxReconnect attempts (0 = unlimited, default: 0)
	MaxReconnect int

	// TLSConfig for SSL/TLS connections
	TLSConfig *tls.Config
}

ExternalBrokerConfig for connecting as a client to external broker
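
A broker URL can be sanity-checked before it is handed to the handler. The function below is a hypothetical sketch using only the standard library; it accepts just the two schemes mentioned in the field comment (`tcp` and `ssl`), although the underlying client may accept others.

```go
package main

import (
	"fmt"
	"net/url"
)

// validateBrokerURL checks that raw looks like tcp://host:port or ssl://host:port.
func validateBrokerURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse broker URL: %w", err)
	}
	switch u.Scheme {
	case "tcp", "ssl":
		// schemes named in the ExternalBrokerConfig field comment
	default:
		return fmt.Errorf("unsupported scheme %q in broker URL", u.Scheme)
	}
	if u.Hostname() == "" || u.Port() == "" {
		return fmt.Errorf("broker URL %q needs both host and port", raw)
	}
	return nil
}

func main() {
	for _, raw := range []string{
		"tcp://mqtt.example.com:1883",
		"ssl://mqtt.example.com:8883",
		"tcp://mqtt.example.com",
		"http://mqtt.example.com:1883",
	} {
		fmt.Printf("%-32s %v\n", raw, validateBrokerURL(raw))
	}
}
```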

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler handles MQTT messages and operations

func NewHandler

func NewHandler(db common.Database, registry common.ModelRegistry, config *Config) (*Handler, error)

NewHandler creates a new MQTT handler

func NewHandlerWithBun

func NewHandlerWithBun(db *bun.DB, opts ...Option) (*Handler, error)

NewHandlerWithBun creates an MQTT handler with Bun database adapter

func NewHandlerWithDatabase

func NewHandlerWithDatabase(db common.Database, registry common.ModelRegistry, opts ...Option) (*Handler, error)

NewHandlerWithDatabase creates an MQTT handler with a custom database adapter

func NewHandlerWithGORM

func NewHandlerWithGORM(db *gorm.DB, opts ...Option) (*Handler, error)

NewHandlerWithGORM creates an MQTT handler with GORM database adapter

func (*Handler) GetDatabase

func (h *Handler) GetDatabase() common.Database

GetDatabase returns the database adapter

func (*Handler) GetRelationshipInfo

func (h *Handler) GetRelationshipInfo(modelType reflect.Type, relationName string) *common.RelationshipInfo

GetRelationshipInfo is a placeholder for relationship detection

func (*Handler) Hooks

func (h *Handler) Hooks() *HookRegistry

Hooks returns the hook registry

func (*Handler) Registry

func (h *Handler) Registry() common.ModelRegistry

Registry returns the model registry

func (*Handler) Shutdown

func (h *Handler) Shutdown() error

Shutdown gracefully shuts down the handler

func (*Handler) Start

func (h *Handler) Start() error

Start initializes and starts the handler

type HookContext

type HookContext = websocketspec.HookContext

HookContext contains all context for hook execution. Note: For MQTT, the Client is stored in Metadata["mqtt_client"].

type HookFunc

type HookFunc = websocketspec.HookFunc

HookFunc is a function that executes during a lifecycle hook

type HookRegistry

type HookRegistry = websocketspec.HookRegistry

HookRegistry manages all registered hooks

func NewHookRegistry

func NewHookRegistry() *HookRegistry

NewHookRegistry creates a new hook registry

type HookType

type HookType = websocketspec.HookType

HookType defines the type of lifecycle hook

type Message

type Message = websocketspec.Message

Message represents an MQTT message (identical to WebSocket message protocol)

type MessageCallback

type MessageCallback func(topic string, payload []byte)

MessageCallback is called when a message arrives

type MessageType

type MessageType = websocketspec.MessageType

MessageType defines the type of message

type NotificationMessage

type NotificationMessage = websocketspec.NotificationMessage

NotificationMessage is sent to subscribers when data changes

type OperationType

type OperationType = websocketspec.OperationType

OperationType defines the operation to perform

type Option

type Option func(*Handler) error

Option is a functional option for configuring the handler

func WithEmbeddedBroker

func WithEmbeddedBroker(config BrokerConfig) Option

WithEmbeddedBroker configures the handler to use an embedded MQTT broker

func WithExternalBroker

func WithExternalBroker(config ExternalBrokerConfig) Option

WithExternalBroker configures the handler to connect to an external MQTT broker

func WithHooks

func WithHooks(hooks *HookRegistry) Option

WithHooks sets a pre-configured hook registry

func WithQoS

func WithQoS(request, response, notification byte) Option

WithQoS sets custom QoS levels for different message types

func WithTopicPrefix

func WithTopicPrefix(prefix string) Option

WithTopicPrefix sets a custom topic prefix (default: "spec")

type QoSConfig

type QoSConfig struct {
	// Request messages QoS (default: 1 - at-least-once)
	Request byte

	// Response messages QoS (default: 1 - at-least-once)
	Response byte

	// Notification messages QoS (default: 1 - at-least-once)
	Notification byte
}

QoSConfig defines quality of service levels for different message types

type ResponseMessage

type ResponseMessage = websocketspec.ResponseMessage

ResponseMessage is sent back to clients after processing requests

type Subscription

type Subscription = websocketspec.Subscription

Subscription represents an active subscription to entity changes. The key difference for MQTT: notifications are delivered via MQTT publish to spec/{client_id}/notify/{subscription_id} instead of WebSocket send.

type SubscriptionManager

type SubscriptionManager = websocketspec.SubscriptionManager

SubscriptionManager manages all active subscriptions

func NewSubscriptionManager

func NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new subscription manager

type TimeoutConfig

type TimeoutConfig struct {
	// Connect timeout for MQTT connection (default: 30s)
	Connect time.Duration

	// Publish timeout for publishing messages (default: 5s)
	Publish time.Duration

	// Disconnect timeout for graceful shutdown (default: 10s)
	Disconnect time.Duration
}

TimeoutConfig defines timeouts for various operations

type TopicConfig

type TopicConfig struct {
	// Prefix for all topics (default: "spec")
	// Topics will be: {Prefix}/{client_id}/request|response|notify/{sub_id}
	Prefix string
}

TopicConfig defines the MQTT topic structure
