mqtt

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2025 License: MIT Imports: 34 Imported by: 0

README

mqtt

| API Documentation | Release Notes |

Overview

This module provides an MQTTv5 client with automatic session, reconnection, and retry logic. It is designed to allow you to focus on application logic without needing to consider the underlying connection state.

This module is intended for use with the Azure IoT Operations MQTT Broker, but it is compatible with any MQTTv5 broker.

Simple Send and Receive

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/iot-operations-sdks/go/mqtt"
)

const (
	clientID = "aio_example_client"
	hostname = "localhost"
	port     = 1883
	topic    = "hello/mqtt"
)

func main() {
	ctx := context.Background()

	// Create a new session client with the above settings.
	client, _ := mqtt.NewSessionClient(
		clientID,
		mqtt.TCPConnection(hostname, port),
	)

	// Message handlers should be registered before calling Start unless using
	// mqtt.WithCleanStart(true), to handle messages from an existing session.
	done := client.RegisterMessageHandler(
		func(ctx context.Context, msg *mqtt.Message) {
			fmt.Printf("Received: %s\n", msg.Payload)
		},
	)
	defer done()

	// Note: Error handling omitted for simplicity. In addition to error return
	// values, client.RegisterFatalErrorHandler is recommended to handle any
	// unrecoverable errors encountered during connection attempts.

	client.Start()
	defer client.Stop()

	// Subscribe to the topic.
	client.Subscribe(ctx, topic, mqtt.WithQoS(1))
	defer client.Unsubscribe(ctx, topic)

	// Publish 10 messages, then exit.
	for i := range 10 {
		client.Publish(ctx, topic, []byte(fmt.Sprintf("Hello %d", i+1)))
		time.Sleep(time.Second)
	}
}

Documentation

Overview

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. Licensed under the MIT License.

Index

Constants

View Source
const AIOPersistence = "aio-persistence"

AIOPersistence is the user-property used to indicate to the AIO broker that it should persist messages to disk.

Variables

This section is empty.

Functions

func IsTopicFilterMatch added in v0.2.0

func IsTopicFilterMatch(topicFilter, topicName string) bool

IsTopicFilterMatch checks if a topic name matches a topic filter.

func RandomClientID added in v0.4.0

func RandomClientID() string

RandomClientID generates a random valid MQTT client ID. This should never be used in production (as it fully invalidates all session guarantees) but can be useful in testing to prevent parallel tests from conflicting.

Types

type AIOBrokerFeatureError added in v0.4.1

type AIOBrokerFeatureError struct {
	// contains filtered or unexported fields
}

AIOBrokerFeatureError indicates that a feature specific to the AIO Broker was used when AIO Broker features were explicitly disabled.

func (*AIOBrokerFeatureError) Error added in v0.4.1

func (e *AIOBrokerFeatureError) Error() string

type Ack added in v0.2.0

type Ack = mqtt.Ack

Ack contains values from PUBACK/SUBACK/UNSUBACK packets received from the MQTT server.

type ClientState added in v0.3.0

type ClientState byte

ClientState indicates the current state of the session client.

const (
	// The session client has not yet been started.
	NotStarted ClientState = iota

	// The session client has been started and has not yet been stopped by the
	// user or terminated due to a fatal error.
	Started

	// The session client has been stopped by the user or terminated due to a
	// fatal error.
	ShutDown
)

type ClientStateError added in v0.3.0

type ClientStateError struct {
	State ClientState
}

ClientStateError is returned when the operation cannot proceed due to the state of the session client.

func (*ClientStateError) Error added in v0.3.0

func (e *ClientStateError) Error() string

type ConnackError added in v0.3.0

type ConnackError struct {
	ReasonCode byte
}

ConnackError indicates that the session client received a CONNACK with a reason code that indicates an error but is not deemed to be fatal. It may appear as a fatal error if it is the final error returned once the session client has exhausted its connection retries.

func (*ConnackError) Error added in v0.3.0

func (e *ConnackError) Error() string

type ConnectEvent added in v0.2.0

type ConnectEvent = mqtt.ConnectEvent

ConnectEvent contains the relevent metadata provided to the handler when the MQTT client connects to the broker.

type ConnectEventHandler added in v0.2.0

type ConnectEventHandler = mqtt.ConnectEventHandler

ConnectEventHandler is a user-defined callback function used to respond to connection notifications from the MQTT client.

type ConnectionError added in v0.3.0

type ConnectionError struct {
	// contains filtered or unexported fields
}

ConnectionError indicates that the session client has terminated due to an issue opening the network connection to the MQTT server. It may wrap an underlying error using Go standard error wrapping.

func (*ConnectionError) Error added in v0.3.0

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap added in v0.3.0

func (e *ConnectionError) Unwrap() error

type ConnectionProvider added in v0.3.0

type ConnectionProvider func(context.Context) (net.Conn, error)

ConnectionProvider is a function that returns a net.Conn connected to an MQTT server that is ready to read to and write from. Note that the returned net.Conn must be thread-safe (i.e., concurrent Write calls must not interleave).

func TCPConnection added in v0.3.0

func TCPConnection(hostname string, port uint16) ConnectionProvider

TCPConnection is a connection provider that connects to an MQTT server over TCP.

func TLSConnection added in v0.3.0

func TLSConnection(
	hostname string,
	port uint16,
	opts ...TLSOption,
) ConnectionProvider

TLSConnection is a connection provider that connects to an MQTT server with TLS over TCP.

type DisconnectError added in v0.3.0

type DisconnectError struct {
	ReasonCode byte
}

DisconnectError indicates that the session client received a DISCONNECT packet from the server with a reason code that is not deemed to be fatal. It is primarily used for internal tracking and should not be expected by users except in rare cases in logs.

func (*DisconnectError) Error added in v0.3.0

func (e *DisconnectError) Error() string

type DisconnectEvent added in v0.2.0

type DisconnectEvent = mqtt.DisconnectEvent

DisconnectEvent contains the relevent metadata provided to the handler when the MQTT client disconnects from the broker.

type DisconnectEventHandler added in v0.2.0

type DisconnectEventHandler = mqtt.DisconnectEventHandler

DisconnectEventHandler is a user-defined callback function used to respond to disconnection notifications from the MQTT client.

type Env added in v0.4.0

type Env struct {
	ClientID           string
	ConnectionProvider ConnectionProvider
	*SessionClientOptions
}

Env provides all session client parameters parsed from well-known environment variables.

func SessionClientConfigFromEnv added in v0.3.0

func SessionClientConfigFromEnv() (*Env, error)

SessionClientConfigFromEnv parses a session client configuration from well-known environment variables. Note that this will only return an error if the environment variables parse incorrectly; it will not return an error if required parameters (e.g. for the connection provider) are missing, to allow optional parameters to be specified from environment independently.

type FatalConnackError added in v0.3.0

type FatalConnackError struct {
	ReasonCode byte
}

FatalConnackError indicates that the session client has terminated due to receiving a CONNACK with with a reason code that is deemed to be fatal.

func (*FatalConnackError) Error added in v0.3.0

func (e *FatalConnackError) Error() string

type FatalDisconnectError added in v0.3.0

type FatalDisconnectError struct {
	ReasonCode byte
}

FatalDisconnectError indicates that the session client has terminated due to receiving a DISCONNECT packet from the server with a reason code that is deemed to be fatal.

func (*FatalDisconnectError) Error added in v0.3.0

func (e *FatalDisconnectError) Error() string

type HandlerPanicError added in v0.4.1

type HandlerPanicError struct {
	// contains filtered or unexported fields
}

HandlerPanicError indicates that a user-provided handler panicked. This error will never be returned, only logged.

func (*HandlerPanicError) Error added in v0.4.1

func (e *HandlerPanicError) Error() string

type InvalidArgumentError added in v0.3.0

type InvalidArgumentError struct {
	// contains filtered or unexported fields
}

InvalidArgumentError indicates that the user has provided an invalid value for an option. It may wrap an underlying error using Go standard error wrapping.

func (*InvalidArgumentError) Error added in v0.3.0

func (e *InvalidArgumentError) Error() string

func (*InvalidArgumentError) Unwrap added in v0.3.0

func (e *InvalidArgumentError) Unwrap() error

type Message added in v0.2.0

type Message = mqtt.Message

Message represents a received message.

type MessageHandler added in v0.2.0

type MessageHandler = mqtt.MessageHandler

MessageHandler is a user-defined callback function used to handle messages received on the subscribed topic.

type PasswordProvider added in v0.3.0

type PasswordProvider func(context.Context) ([]byte, bool, error)

PasswordProvider is a function that returns an MQTT password and flag. Note that if the returned flag is false, the returned password is ignored.

func ConstantPassword added in v0.3.0

func ConstantPassword(password []byte) PasswordProvider

ConstantPassword is a PasswordProvider implementation that returns an unchanging password. This can be used if the password does not need to be updated between MQTT connections.

func FilePassword added in v0.3.0

func FilePassword(filename string) PasswordProvider

FilePassword is a PasswordProvider implementation that reads a password from a given filename for each MQTT connection.

type PublishOption added in v0.2.0

type PublishOption = mqtt.PublishOption

PublishOption represents a single publish option.

func WithPersist added in v0.4.1

func WithPersist() PublishOption

WithPersist is a convenience option to set the AIO persistence flag on a PUBLISH request.

type PublishOptions added in v0.2.0

type PublishOptions = mqtt.PublishOptions

PublishOptions are the resolved publish options.

type PublishQueueFullError added in v0.3.0

type PublishQueueFullError struct{}

PublishQueueFullError is returned if there are too many publishes enqueued and the session client is not accepting any more. This should very rarely occur, and if it does, it is a sign that either the connection is unstable or the application is sending messages at a faster rate than can be handled by the session client or server.

func (*PublishQueueFullError) Error added in v0.3.0

func (*PublishQueueFullError) Error() string

type SessionClient

type SessionClient struct {
	// contains filtered or unexported fields
}

SessionClient implements an MQTT session client supporting MQTT v5 with QoS 0 and QoS 1 support.

func NewSessionClient

func NewSessionClient(
	clientID string,
	connectionProvider ConnectionProvider,
	opts ...SessionClientOption,
) (*SessionClient, error)

NewSessionClient constructs a new session client with user options.

func NewSessionClientFromEnv

func NewSessionClientFromEnv(
	opt ...SessionClientOption,
) (*SessionClient, error)

NewSessionClientFromEnv is a shorthand for constructing a session client using SessionClientConfigFromEnv.

func (*SessionClient) ID added in v0.2.0

func (c *SessionClient) ID() string

ID returns the MQTT client ID for this session client.

func (*SessionClient) Publish

func (c *SessionClient) Publish(
	ctx context.Context,
	topic string,
	payload []byte,
	opts ...PublishOption,
) (*Ack, error)

Publish a MQTT message on the given topic.

func (*SessionClient) RegisterConnectEventHandler added in v0.2.0

func (c *SessionClient) RegisterConnectEventHandler(
	handler ConnectEventHandler,
) func()

RegisterConnectEventHandler registers a handler to a list of handlers that are called synchronously in registration order whenever the session client successfully establishes an MQTT connection. Note that since the handler gets called synchronously, handlers should not block for an extended period of time to avoid blocking the session client.

func (*SessionClient) RegisterDisconnectEventHandler added in v0.2.0

func (c *SessionClient) RegisterDisconnectEventHandler(
	handler DisconnectEventHandler,
) func()

RegisterDisconnectEventHandler registers a handler to a list of handlers that are called synchronously in registration order whenever the session client detects a disconnection from the MQTT server. Note that since the handler gets called synchronously, handlers should not block for an extended period of time to avoid blocking the session client.

func (*SessionClient) RegisterFatalErrorHandler added in v0.2.0

func (c *SessionClient) RegisterFatalErrorHandler(
	handler func(error),
) func()

RegisterFatalErrorHandler registers a handler that is called in a goroutine if the session client terminates due to a fatal error.

func (*SessionClient) RegisterMessageHandler added in v0.2.0

func (c *SessionClient) RegisterMessageHandler(handler MessageHandler) func()

RegisterMessageHandler registers a message handler on this client. Returns a callback to remove the message handler.

func (*SessionClient) Start added in v0.2.0

func (c *SessionClient) Start() error

Start the session client, spawning any necessary background goroutines. In order to terminate the session client and clean up any running goroutines, Stop() must be called after calling Start().

func (*SessionClient) Stop added in v0.2.0

func (c *SessionClient) Stop() error

Stop the session client, terminating any pending operations and cleaning up background goroutines.

func (*SessionClient) Subscribe

func (c *SessionClient) Subscribe(
	ctx context.Context,
	topic string,
	opts ...SubscribeOption,
) (*Ack, error)

Subscribe to the given topic.

func (*SessionClient) Unsubscribe added in v0.2.0

func (c *SessionClient) Unsubscribe(
	ctx context.Context,
	topic string,
	opts ...UnsubscribeOption,
) (*Ack, error)

Unsubscribe from the given topic.

type SessionClientOption

type SessionClientOption interface {
	// contains filtered or unexported methods
}

SessionClientOption represents a single option for the session client.

func WithAuth added in v0.3.0

func WithAuth(provider auth.Provider) SessionClientOption

WithAuth sets the enhanced authentication provider for the session client.

func WithConnectPersist added in v0.4.1

func WithConnectPersist() SessionClientOption

WithConnectPersist is a convenience option to set the AIO persistence flag on the CONNECT request.

func WithConnectionRetry added in v0.3.0

func WithConnectionRetry(policy retry.Policy) SessionClientOption

WithConnectionRetry sets the connection retry policy for the session client.

func WithLogger added in v0.2.0

func WithLogger(log *slog.Logger) SessionClientOption

WithLogger sets the logger for the session client.

type SessionClientOptions added in v0.3.0

type SessionClientOptions struct {
	CleanStart               bool
	KeepAlive                uint16
	SessionExpiry            uint32
	ReceiveMaximum           uint16
	ConnectUserProperties    map[string]string
	DisableAIOBrokerFeatures bool

	ConnectionRetry   retry.Policy
	ConnectionTimeout time.Duration

	Username UsernameProvider
	Password PasswordProvider
	Auth     auth.Provider

	Logger *slog.Logger
}

SessionClientOptions are the resolved options for the session client.

func (*SessionClientOptions) Apply added in v0.3.0

func (o *SessionClientOptions) Apply(
	opts []SessionClientOption,
	rest ...SessionClientOption,
)

Apply resolves the provided list of options.

type SessionLostError added in v0.3.0

type SessionLostError struct{}

SessionLostError indicates that the session client has terminated due to receiving a CONNACK with session present false when reconnecting.

func (*SessionLostError) Error added in v0.3.0

func (*SessionLostError) Error() string

type SubscribeOption added in v0.2.0

type SubscribeOption = mqtt.SubscribeOption

SubscribeOption represents a single subscribe option.

type SubscribeOptions added in v0.2.0

type SubscribeOptions = mqtt.SubscribeOptions

SubscribeOptions are the resolved subscribe options.

type TLSOption added in v0.3.0

type TLSOption func(context.Context, *tls.Config) error

TLSOption is a function that modifies a tls.Config to be used when opening a TLS connection to an MQTT server. More than one can be provided to TLSConnection; they will be executed in order, with the first passed the empty (default) TLS config. See tls.Config for more information on TLS configuration options.

func WithCA added in v0.3.0

func WithCA(caFile string) TLSOption

WithCA loads a CA certificate pool into the root CAs of the TLS configuration.

func WithEncryptedX509 added in v0.3.0

func WithEncryptedX509(certFile, keyFile, passFile string) TLSOption

WithEncryptedX509 appends an X509 certificate to the TLS certificates, using a password file to decrypt the certificate key.

func WithX509 added in v0.3.0

func WithX509(certFile, keyFile string) TLSOption

WithX509 appends an X509 certificate to the TLS certificates.

type UnsubscribeOption added in v0.2.0

type UnsubscribeOption = mqtt.UnsubscribeOption

UnsubscribeOption represents a single unsubscribe option.

type UnsubscribeOptions added in v0.2.0

type UnsubscribeOptions = mqtt.UnsubscribeOptions

UnsubscribeOptions are the resolve unsubscribe options.

type UsernameProvider added in v0.3.0

type UsernameProvider func(context.Context) (string, bool, error)

UsernameProvider is a function that returns an MQTT username and flag. Note that if the returned flag is false, the returned username is ignored.

func ConstantUsername added in v0.3.0

func ConstantUsername(username string) UsernameProvider

ConstantUsername is a UsernameProvider implementation that returns an unchanging username. This can be used if the username does not need to be updated between MQTT connections.

type WithCleanStart

type WithCleanStart bool

WithCleanStart sets whether the initial connection will be made without retaining any existing session state. This is by definition set to false for any reconnections.

type WithConnectUserProperties added in v0.3.0

type WithConnectUserProperties map[string]string

WithConnectUserProperties sets the user properties for the CONNECT packet.

type WithConnectionTimeout

type WithConnectionTimeout time.Duration

WithConnectionTimeout sets the connection timeout for a single connection attempt. If a timeout is desired for the entire connection process, it should be specified via the connection retry policy.

type WithContentType added in v0.2.0

type WithContentType = mqtt.WithContentType

WithContentType sets the content type for the publish.

type WithCorrelationData added in v0.2.0

type WithCorrelationData = mqtt.WithCorrelationData

WithCorrelationData sets the correlation data for the publish.

type WithDisableAIOBrokerFeatures added in v0.4.0

type WithDisableAIOBrokerFeatures bool

WithDisableAIOBrokerFeatures disables behavior specific to the AIO Broker. Only use this option if you are using another broker and encounter failures.

type WithKeepAlive

type WithKeepAlive uint16

WithKeepAlive sets the keep-alive interval (in seconds).

type WithMessageExpiry added in v0.2.0

type WithMessageExpiry = mqtt.WithMessageExpiry

WithMessageExpiry sets the message expiry interval for the publish.

type WithNoLocal added in v0.2.0

type WithNoLocal = mqtt.WithNoLocal

WithNoLocal sets the no local flag for the subscription.

type WithPassword

type WithPassword PasswordProvider

WithPassword sets the PasswordProvider that the session client uses to get the password for each connection.

type WithPayloadFormat added in v0.2.0

type WithPayloadFormat = mqtt.WithPayloadFormat

WithPayloadFormat sets the payload format indicator for the publish.

type WithQoS added in v0.2.0

type WithQoS = mqtt.WithQoS

WithQoS sets the QoS level for the publish or subscribe.

type WithReceiveMaximum

type WithReceiveMaximum uint16

WithReceiveMaximum sets the client-side receive maximum.

type WithResponseTopic added in v0.2.0

type WithResponseTopic = mqtt.WithResponseTopic

WithResponseTopic sets the response topic for the publish.

type WithRetain added in v0.2.0

type WithRetain = mqtt.WithRetain

WithRetain sets the retain flag for the publish or the retain-as-publish flag for the subscribe.

type WithRetainHandling added in v0.2.0

type WithRetainHandling = mqtt.WithRetainHandling

WithRetainHandling specifies the handling of retained messages on the subscribe.

type WithSessionExpiry

type WithSessionExpiry uint32

WithSessionExpiry sets the session expiry interval (in seconds).

type WithUserProperties added in v0.2.0

type WithUserProperties = mqtt.WithUserProperties

WithUserProperties sets the user properties for the publish or subscribe.

type WithUsername

type WithUsername UsernameProvider

WithUsername sets the UsernameProvider that the session client uses to get the username for each connection.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL