mqtt

package module
v0.0.0-...-25d3f34 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

README

MQTT Extension

The MQTT extension provides a production-ready MQTT client with support for pub/sub operations, QoS levels, and connection management.

Features

  • Pub/Sub Support: Publish and subscribe to MQTT topics
  • QoS Levels: Support for QoS 0, 1, and 2
  • TLS/SSL: Secure connections with mTLS support
  • Auto-Reconnect: Automatic reconnection on connection loss
  • Last Will and Testament: LWT message support
  • Message Persistence: Memory and file-based message stores
  • Metrics & Tracing: Built-in observability

Installation

go get github.com/eclipse/paho.mqtt.golang

Basic Usage

package main

import (
    "context"
    "log"

    mqttclient "github.com/eclipse/paho.mqtt.golang"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/mqtt"
)

// main wires the MQTT extension into a Forge application, subscribes to a
// topic, and publishes one message to that same topic.
func main() {
    app := forge.New("my-app")

    // Add MQTT extension
    app.AddExtension(mqtt.NewExtension(
        mqtt.WithBroker("tcp://localhost:1883"),
        mqtt.WithClientID("my-app"),
    ))

    // Start application
    if err := app.Start(context.Background()); err != nil {
        log.Fatal(err)
    }

    // Get MQTT client
    var client mqtt.MQTT
    app.Container().Resolve(&client)
    if client == nil {
        // Calling a method on a nil interface panics, so stop with a clear
        // message if the container did not provide a client.
        log.Fatal("mqtt client could not be resolved from the container")
    }

    // Subscribe to topic. The handler types come from the paho client
    // package, imported above under the mqttclient alias.
    onMessage := func(c mqttclient.Client, msg mqttclient.Message) {
        log.Printf("Received: %s", string(msg.Payload()))
    }
    if err := client.Subscribe("sensors/temperature", 1, onMessage); err != nil {
        log.Fatalf("subscribe: %v", err)
    }

    // Publish message
    if err := client.Publish("sensors/temperature", 1, false, "25.5"); err != nil {
        log.Fatalf("publish: %v", err)
    }

    app.Wait()
}

Configuration

YAML Configuration
mqtt:
  broker: tcp://localhost:1883
  client_id: my-app
  username: user
  password: pass
  clean_session: true
  keep_alive: 60s
  
  # TLS settings
  enable_tls: true
  tls_cert_file: /path/to/cert.pem
  tls_key_file: /path/to/key.pem
  tls_ca_file: /path/to/ca.pem
  
  # QoS and reliability
  default_qos: 1
  auto_reconnect: true
  max_reconnect_delay: 10m
  
  # Last Will and Testament
  will_enabled: true
  will_topic: clients/status
  will_payload: offline
  will_qos: 1
  will_retained: true
  
  # Message handling
  message_store: file
  store_directory: /var/mqtt/store
Programmatic Configuration
app.AddExtension(mqtt.NewExtension(
    mqtt.WithBroker("tcp://localhost:1883"),
    mqtt.WithClientID("my-app"),
    mqtt.WithCredentials("user", "pass"),
    mqtt.WithTLS("cert.pem", "key.pem", "ca.pem", false),
    mqtt.WithQoS(1),
    mqtt.WithWill("clients/status", "offline", 1, true),
    mqtt.WithAutoReconnect(true),
))

Publishing

Synchronous Publishing
err := client.Publish(
    "sensors/temperature",
    1,     // QoS
    false, // retained
    "25.5",
)
Asynchronous Publishing
err := client.PublishAsync(
    "sensors/temperature",
    1,
    false,
    "25.5",
)
Publishing JSON
type SensorData struct {
    Temperature float64 `json:"temperature"`
    Humidity    float64 `json:"humidity"`
}

data := SensorData{Temperature: 25.5, Humidity: 60.0}
err := client.Publish("sensors/data", 1, false, data)

Subscribing

Single Topic
err := client.Subscribe("sensors/+/temperature", 1, func(c mqttclient.Client, msg mqttclient.Message) {
    log.Printf("Topic: %s, Payload: %s", msg.Topic(), string(msg.Payload()))
})
Multiple Topics
filters := map[string]byte{
    "sensors/+/temperature": 1,
    "sensors/+/humidity":    2,
}

err := client.SubscribeMultiple(filters, func(c mqttclient.Client, msg mqttclient.Message) {
    log.Printf("Received from %s: %s", msg.Topic(), string(msg.Payload()))
})
Unsubscribe
err := client.Unsubscribe("sensors/+/temperature")

QoS Levels

MQTT supports three Quality of Service levels:

  • QoS 0: At most once (fire and forget)
  • QoS 1: At least once (acknowledged delivery)
  • QoS 2: Exactly once (assured delivery)
// QoS 0 - Fire and forget
client.Publish("topic", 0, false, "message")

// QoS 1 - At least once
client.Publish("topic", 1, false, "message")

// QoS 2 - Exactly once
client.Publish("topic", 2, false, "message")

Topic Wildcards

MQTT supports two wildcards:

  • +: Single level wildcard
  • #: Multi-level wildcard
// Subscribe to temperature readings from every sensor (+ matches exactly one level)
client.Subscribe("sensors/+/temperature", 1, handler)

// Subscribe to all messages under sensors
client.Subscribe("sensors/#", 1, handler)

Retained Messages

// Publish retained message
client.Publish("sensors/temperature", 1, true, "25.5")

// New subscribers will immediately receive the last retained message

Last Will and Testament

app.AddExtension(mqtt.NewExtension(
    mqtt.WithWill(
        "clients/status",  // topic
        "offline",         // payload
        1,                 // qos
        true,              // retained
    ),
))

TLS/SSL Configuration

Basic TLS
app.AddExtension(mqtt.NewExtension(
    mqtt.WithBroker("ssl://localhost:8883"),
    mqtt.WithTLS("", "", "ca.pem", false),
))
Mutual TLS (mTLS)
app.AddExtension(mqtt.NewExtension(
    mqtt.WithBroker("ssl://localhost:8883"),
    mqtt.WithTLS(
        "client-cert.pem",
        "client-key.pem",
        "ca.pem",
        false, // skipVerify
    ),
))

Connection Management

Connection Status
if client.IsConnected() {
    log.Println("Connected to broker")
}
Manual Reconnect
if err := client.Reconnect(); err != nil {
    log.Printf("Reconnect failed: %v", err)
}
Disconnect
if err := client.Disconnect(ctx); err != nil {
    log.Printf("Disconnect failed: %v", err)
}

Observability

Metrics

The extension automatically tracks:

  • mqtt.connections - Connection status counter
  • mqtt.messages.published - Messages published counter
  • mqtt.messages.received - Messages received counter
  • mqtt.reconnects - Reconnection attempts counter
Client Statistics
stats := client.GetStats()
log.Printf("Connected: %v", stats.Connected)
log.Printf("Messages sent: %d", stats.MessagesSent)
log.Printf("Messages received: %d", stats.MessagesReceived)
log.Printf("Reconnects: %d", stats.Reconnects)
Subscriptions
subs := client.GetSubscriptions()
for _, sub := range subs {
    log.Printf("Topic: %s, QoS: %d", sub.Topic, sub.QoS)
}

Best Practices

  1. Session Persistence: Set clean_session: false when you need a persistent session; leave it true for clients that should start fresh on every connect
  2. QoS Selection: Use QoS 1 for most use cases (balance between reliability and performance)
  3. Topic Design: Use hierarchical topics (e.g., building/floor/room/sensor)
  4. Retained Messages: Use for status messages that new subscribers should receive
  5. LWT Configuration: Always configure Last Will and Testament for clients
  6. Wildcard Subscriptions: Be careful with the # wildcard; it matches every topic below its prefix and can deliver a very large number of messages

Message Store

Memory Store (Default)
app.AddExtension(mqtt.NewExtension(
    mqtt.WithConfig(mqtt.Config{
        MessageStore: "memory",
    }),
))
File Store
app.AddExtension(mqtt.NewExtension(
    mqtt.WithConfig(mqtt.Config{
        MessageStore:   "file",
        StoreDirectory: "/var/mqtt/store",
    }),
))

Error Handling

// Note: this snippet needs the standard library "errors" package imported.
if err := client.Publish(topic, qos, retained, payload); err != nil {
    // errors.Is also matches a sentinel that has been wrapped with extra
    // context, which a plain == comparison in `switch err` would miss.
    switch {
    case errors.Is(err, mqtt.ErrNotConnected):
        log.Println("Not connected to broker")
    case errors.Is(err, mqtt.ErrPublishFailed):
        log.Println("Publish failed")
    default:
        log.Printf("Error: %v", err)
    }
}

Testing

// TestMQTTIntegration starts a Forge app with the MQTT extension pointed at
// tcp://localhost:1883, resolves the client from the container, and fails
// if a single publish to "test/topic" returns an error.
func TestMQTTIntegration(t *testing.T) {
    app := forge.New("test-app")
    extension := mqtt.NewExtension(mqtt.WithBroker("tcp://localhost:1883"))
    app.AddExtension(extension)

    ctx := context.Background()
    if startErr := app.Start(ctx); startErr != nil {
        t.Fatal(startErr)
    }
    // Shut the app down when the test returns.
    defer app.Stop(ctx)

    var client mqtt.MQTT
    app.Container().Resolve(&client)

    // Test publish
    if pubErr := client.Publish("test/topic", 1, false, "test message"); pubErr != nil {
        t.Fatalf("Publish failed: %v", pubErr)
    }
}

License

Part of the Forge framework.

Documentation

Index

Constants

View Source
const (
	// ServiceKey is the DI key for the MQTT service.
	ServiceKey = "mqtt"
)

DI container keys for MQTT extension services.

Variables

View Source
// Sentinel errors exposed by the MQTT extension. Every message carries the
// "mqtt: " prefix.
var (
	// ErrNotConnected is returned when an operation requires a connection.
	ErrNotConnected = errors.New("mqtt: not connected")

	// ErrAlreadyConnected is returned when the client is already connected.
	ErrAlreadyConnected = errors.New("mqtt: already connected")

	// ErrConnectionFailed is returned when a connection attempt fails.
	ErrConnectionFailed = errors.New("mqtt: connection failed")

	// ErrPublishFailed is returned when a publish fails.
	ErrPublishFailed = errors.New("mqtt: publish failed")

	// ErrSubscribeFailed is returned when a subscription fails.
	ErrSubscribeFailed = errors.New("mqtt: subscribe failed")

	// ErrUnsubscribeFailed is returned when an unsubscribe fails.
	ErrUnsubscribeFailed = errors.New("mqtt: unsubscribe failed")

	// ErrInvalidQoS is returned when a QoS value is invalid.
	ErrInvalidQoS = errors.New("mqtt: invalid QoS value")

	// ErrInvalidTopic is returned when a topic is invalid.
	ErrInvalidTopic = errors.New("mqtt: invalid topic")

	// ErrTimeout is returned when an operation times out.
	ErrTimeout = errors.New("mqtt: operation timeout")
)

Functions

func NewExtension

func NewExtension(opts ...ConfigOption) forge.Extension

NewExtension creates a new MQTT extension

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new MQTT extension with a complete config

Types

type ClientStats

type ClientStats struct {
	Connected        bool
	ConnectTime      time.Time
	LastMessageTime  time.Time
	MessagesReceived int64
	MessagesSent     int64
	Subscriptions    int
	Reconnects       int64
}

ClientStats contains client statistics

type Config

type Config struct {
	// Connection settings
	Broker            string        `json:"broker" yaml:"broker" mapstructure:"broker"`
	ClientID          string        `json:"client_id" yaml:"client_id" mapstructure:"client_id"`
	Username          string        `json:"username,omitempty" yaml:"username,omitempty" mapstructure:"username"`
	Password          string        `json:"password,omitempty" yaml:"password,omitempty" mapstructure:"password"`
	CleanSession      bool          `json:"clean_session" yaml:"clean_session" mapstructure:"clean_session"`
	ConnectTimeout    time.Duration `json:"connect_timeout" yaml:"connect_timeout" mapstructure:"connect_timeout"`
	KeepAlive         time.Duration `json:"keep_alive" yaml:"keep_alive" mapstructure:"keep_alive"`
	PingTimeout       time.Duration `json:"ping_timeout" yaml:"ping_timeout" mapstructure:"ping_timeout"`
	MaxReconnectDelay time.Duration `json:"max_reconnect_delay" yaml:"max_reconnect_delay" mapstructure:"max_reconnect_delay"`

	// TLS/SSL
	EnableTLS     bool   `json:"enable_tls" yaml:"enable_tls" mapstructure:"enable_tls"`
	TLSCertFile   string `json:"tls_cert_file,omitempty" yaml:"tls_cert_file,omitempty" mapstructure:"tls_cert_file"`
	TLSKeyFile    string `json:"tls_key_file,omitempty" yaml:"tls_key_file,omitempty" mapstructure:"tls_key_file"`
	TLSCAFile     string `json:"tls_ca_file,omitempty" yaml:"tls_ca_file,omitempty" mapstructure:"tls_ca_file"`
	TLSSkipVerify bool   `json:"tls_skip_verify" yaml:"tls_skip_verify" mapstructure:"tls_skip_verify"`

	// QoS settings
	DefaultQoS byte `json:"default_qos" yaml:"default_qos" mapstructure:"default_qos"`

	// Retry and reliability
	AutoReconnect        bool          `json:"auto_reconnect" yaml:"auto_reconnect" mapstructure:"auto_reconnect"`
	ResumeSubs           bool          `json:"resume_subs" yaml:"resume_subs" mapstructure:"resume_subs"`
	MaxReconnectAttempts int           `json:"max_reconnect_attempts" yaml:"max_reconnect_attempts" mapstructure:"max_reconnect_attempts"`
	WriteTimeout         time.Duration `json:"write_timeout" yaml:"write_timeout" mapstructure:"write_timeout"`

	// Message handling
	MessageChannelDepth uint   `json:"message_channel_depth" yaml:"message_channel_depth" mapstructure:"message_channel_depth"`
	OrderMatters        bool   `json:"order_matters" yaml:"order_matters" mapstructure:"order_matters"`
	MessageStore        string `json:"message_store" yaml:"message_store" mapstructure:"message_store"` // "memory", "file"
	StoreDirectory      string `json:"store_directory,omitempty" yaml:"store_directory,omitempty" mapstructure:"store_directory"`

	// Last Will and Testament
	WillEnabled  bool   `json:"will_enabled" yaml:"will_enabled" mapstructure:"will_enabled"`
	WillTopic    string `json:"will_topic,omitempty" yaml:"will_topic,omitempty" mapstructure:"will_topic"`
	WillPayload  string `json:"will_payload,omitempty" yaml:"will_payload,omitempty" mapstructure:"will_payload"`
	WillQoS      byte   `json:"will_qos" yaml:"will_qos" mapstructure:"will_qos"`
	WillRetained bool   `json:"will_retained" yaml:"will_retained" mapstructure:"will_retained"`

	// Observability
	EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics" mapstructure:"enable_metrics"`
	EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing" mapstructure:"enable_tracing"`
	EnableLogging bool `json:"enable_logging" yaml:"enable_logging" mapstructure:"enable_logging"`

	// Config loading flags
	RequireConfig bool `json:"-" yaml:"-" mapstructure:"-"`
}

Config contains configuration for the MQTT extension

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default MQTT configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type ConfigOption

type ConfigOption func(*Config)

ConfigOption is a functional option for Config

func WithAutoReconnect

func WithAutoReconnect(enable bool) ConfigOption

func WithBroker

func WithBroker(broker string) ConfigOption

func WithCleanSession

func WithCleanSession(clean bool) ConfigOption

func WithClientID

func WithClientID(clientID string) ConfigOption

func WithConfig

func WithConfig(config Config) ConfigOption

func WithCredentials

func WithCredentials(username, password string) ConfigOption

func WithKeepAlive

func WithKeepAlive(duration time.Duration) ConfigOption

func WithMetrics

func WithMetrics(enable bool) ConfigOption

func WithQoS

func WithQoS(qos byte) ConfigOption

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

func WithTLS

func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption

func WithTracing

func WithTracing(enable bool) ConfigOption

func WithWill

func WithWill(topic, payload string, qos byte, retained bool) ConfigOption

type ConnectHandler

type ConnectHandler = mqttclient.OnConnectHandler

ConnectHandler is called when connection is established

type ConnectionLostHandler

type ConnectionLostHandler = mqttclient.ConnectionLostHandler

ConnectionLostHandler is called when connection is lost

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension for MQTT functionality. The extension is now a lightweight facade that loads config and registers services.

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks the extension health. Service health is managed by Vessel through MQTTService.Health().

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the MQTT extension with the app

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start marks the extension as started. The actual client is started by Vessel calling MQTTService.Start().

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop marks the extension as stopped. The actual client is stopped by Vessel calling MQTTService.Stop().

type MQTT

type MQTT interface {
	// Connection management
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	IsConnected() bool
	Reconnect() error

	// Publishing
	Publish(topic string, qos byte, retained bool, payload interface{}) error
	PublishAsync(topic string, qos byte, retained bool, payload interface{}) error

	// Subscribing
	Subscribe(topic string, qos byte, handler MessageHandler) error
	SubscribeMultiple(filters map[string]byte, handler MessageHandler) error
	Unsubscribe(topics ...string) error

	// Message handling
	AddRoute(topic string, handler MessageHandler)
	SetDefaultHandler(handler MessageHandler)
	SetOnConnectHandler(handler ConnectHandler)
	SetConnectionLostHandler(handler ConnectionLostHandler)
	SetReconnectingHandler(handler ReconnectingHandler)

	// Client info
	GetClient() mqttclient.Client
	GetStats() ClientStats
	GetSubscriptions() []SubscriptionInfo

	// Health
	Ping(ctx context.Context) error
}

MQTT represents a unified MQTT client interface

func NewMQTTClient

func NewMQTTClient(config Config, logger forge.Logger, metrics forge.Metrics) MQTT

NewMQTTClient creates a new MQTT client

type MQTTService

type MQTTService struct {
	// contains filtered or unexported fields
}

MQTTService wraps an MQTT client and provides lifecycle management. It implements vessel's di.Service interface so Vessel can manage its lifecycle.

func NewMQTTService

func NewMQTTService(config Config, logger forge.Logger, metrics forge.Metrics) (*MQTTService, error)

NewMQTTService creates a new MQTT service with the given configuration. This is the constructor that will be registered with the DI container.

func (*MQTTService) Client

func (s *MQTTService) Client() MQTT

Client returns the underlying MQTT client.

func (*MQTTService) Connect

func (s *MQTTService) Connect(ctx context.Context) error

func (*MQTTService) Disconnect

func (s *MQTTService) Disconnect(ctx context.Context) error

func (*MQTTService) Health

func (s *MQTTService) Health(ctx context.Context) error

Health checks if the MQTT service is healthy.

func (*MQTTService) IsConnected

func (s *MQTTService) IsConnected() bool

func (*MQTTService) Name

func (s *MQTTService) Name() string

Name returns the service name for Vessel's lifecycle management.

func (*MQTTService) Ping

func (s *MQTTService) Ping(ctx context.Context) error

func (*MQTTService) Publish

func (s *MQTTService) Publish(ctx context.Context, topic string, payload []byte, opts ...PublishOption) error

func (*MQTTService) Start

func (s *MQTTService) Start(ctx context.Context) error

Start starts the MQTT service by connecting to broker. This is called automatically by Vessel during container.Start().

func (*MQTTService) Stop

func (s *MQTTService) Stop(ctx context.Context) error

Stop stops the MQTT service by disconnecting from broker. This is called automatically by Vessel during container.Stop().

func (*MQTTService) Subscribe

func (s *MQTTService) Subscribe(ctx context.Context, topic string, handler MessageHandler, opts ...SubscribeOption) error

func (*MQTTService) Unsubscribe

func (s *MQTTService) Unsubscribe(ctx context.Context, topic string) error

type MessageHandler

type MessageHandler = mqttclient.MessageHandler

MessageHandler processes incoming MQTT messages. It is an alias for the paho MQTT MessageHandler, kept for compatibility.

type ReconnectingHandler

type ReconnectingHandler = mqttclient.ReconnectHandler

ReconnectingHandler is called when client is reconnecting

type SubscriptionInfo

type SubscriptionInfo struct {
	Topic   string
	QoS     byte
	Handler string // Handler function name
}

SubscriptionInfo contains subscription metadata

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL