README
ΒΆ
go-mqtt
Enhanced Golang MQTT client wrapping github.com/eclipse/paho.mqtt.golang with automatic resubscription on reconnect.
CHINESE README
Main Features
π Automatic Resubscription: Solves Paho client's missing auto-resubscribe feature on reconnect
β‘ OnConnect Callbacks: Subscription management via callback pattern
π Repeat Logic: Built-in re-attempt mechanism handling connection operations with backoff
β±οΈ Token Helpers: Simplified token status checking with timeout support
π οΈ Mate Ecosystem: Integrated with erero, must, rese to handle errors
π Structured Logging: Customizable logging interface with zap integration
Installation
go get github.com/go-xlan/go-mqtt
Requirements:
- Go 1.23.0 and above
- MQTT broker (e.g., EMQX, Mosquitto)
Quick Start
Basic Publish/Subscribe
This example demonstrates the core feature: creating publishing and subscribing clients with automatic resubscription on reconnect.
package main
import (
"encoding/json"
"math/rand/v2"
"time"
"github.com/go-xlan/go-mqtt/internal/utils"
"github.com/go-xlan/go-mqtt/mqttgo"
"github.com/yyle88/must"
"github.com/yyle88/neatjson/neatjsons"
"github.com/yyle88/rese"
"github.com/yyle88/zaplog"
)
func main() {
const mqttTopic = "mqtt-go-demo1-topic"
config := &mqttgo.Config{
BrokerServer: "ws://127.0.0.1:8083/mqtt",
Username: "username",
Password: "password",
OrderMatters: false,
}
client1 := rese.V1(mqttgo.NewClientWithCallback(config, utils.NewUUID(), mqttgo.NewCallback().
OnConnect(func(c mqttgo.Client, retryTimes uint64) (mqttgo.CallbackState, error) {
return mqttgo.CallbackSuccess, nil
}),
))
defer client1.Disconnect(500)
client2 := rese.V1(mqttgo.NewClientWithCallback(config, utils.NewUUID(), mqttgo.NewCallback().
OnConnect(func(c mqttgo.Client, retryTimes uint64) (mqttgo.CallbackState, error) {
token := c.Subscribe(mqttTopic, 1, func(client mqttgo.Client, message mqttgo.Message) {
zaplog.SUG.Debugln("subscribe-msg:", neatjsons.SxB(message.Payload()))
})
must.Same(rese.C1(mqttgo.CheckToken(token, time.Minute)), mqttgo.TokenStateSuccess)
return mqttgo.CallbackSuccess, nil
}),
))
defer client2.Disconnect(500)
type MessageType struct {
A string
B int
C float64
}
for i := 0; i < 10; i++ {
msg := &MessageType{
A: time.Now().String(),
B: i,
C: rand.Float64(),
}
contentBytes := rese.A1(json.Marshal(msg))
zaplog.SUG.Debugln("publish-msg:", neatjsons.SxB(contentBytes))
token := client1.Publish(mqttTopic, 1, false, contentBytes)
must.Same(rese.C1(mqttgo.CheckToken(token, time.Second*3)), mqttgo.TokenStateSuccess)
time.Sleep(time.Second)
}
}
β¬οΈ Source: demo1x/main.go
With Repeat Logic
This example shows how to use callback states to manage re-attempt patterns on subscription issues.
package main
import (
"encoding/json"
"math/rand/v2"
"time"
"github.com/go-xlan/go-mqtt/internal/utils"
"github.com/go-xlan/go-mqtt/mqttgo"
"github.com/pkg/errors"
"github.com/yyle88/erero"
"github.com/yyle88/must"
"github.com/yyle88/neatjson/neatjsons"
"github.com/yyle88/rese"
"github.com/yyle88/zaplog"
)
func main() {
const mqttTopic = "mqtt-go-demo2-topic"
config := &mqttgo.Config{
BrokerServer: "ws://127.0.0.1:8083/mqtt",
Username: "username",
Password: "password",
OrderMatters: false,
}
client1 := rese.V1(mqttgo.NewClientWithCallback(config, utils.NewUUID(), mqttgo.NewCallback().
OnConnect(func(c mqttgo.Client, retryTimes uint64) (mqttgo.CallbackState, error) {
if retryTimes > 10 {
return mqttgo.CallbackTimeout, nil
}
if rand.IntN(100) >= 10 {
return mqttgo.CallbackUnknown, erero.New("random-rate-not-success")
}
return mqttgo.CallbackSuccess, nil
}),
))
defer client1.Disconnect(500)
client2 := rese.V1(mqttgo.NewClientWithCallback(config, utils.NewUUID(), mqttgo.NewCallback().
OnConnect(func(c mqttgo.Client, retryTimes uint64) (mqttgo.CallbackState, error) {
token := c.Subscribe(mqttTopic, 1, func(client mqttgo.Client, message mqttgo.Message) {
zaplog.SUG.Debugln("subscribe-msg:", neatjsons.SxB(message.Payload()))
})
tokenState, err := mqttgo.WaitToken(token)
if err != nil {
return mqttgo.CallbackRetries, errors.WithMessage(err, "subscribe-is-wrong")
}
must.Same(tokenState, mqttgo.TokenStateSuccess)
return mqttgo.CallbackSuccess, nil
}),
))
defer client2.Disconnect(500)
type MessageType struct {
A string
B int
C float64
}
for i := 0; i < 100; i++ {
msg := &MessageType{
A: time.Now().String(),
B: i,
C: rand.Float64(),
}
contentBytes := rese.A1(json.Marshal(msg))
zaplog.SUG.Debugln("publish-msg:", neatjsons.SxB(contentBytes))
token := client1.Publish(mqttTopic, 1, false, contentBytes)
must.Same(rese.C1(mqttgo.CheckToken(token, time.Second*3)), mqttgo.TokenStateSuccess)
time.Sleep(time.Second)
}
}
β¬οΈ Source: demo2x/main.go
API Reference
Core Functions
NewClientWithCallback(config *Config, clientID string, callback *Callback) (mqtt.Client, error)
Creates MQTT client with automatic resubscription support via OnConnect callbacks.
Parameters:
config: MQTT broker configurationclientID: Unique client IDcallback: OnConnect callback handlers managing subscriptions
Returns: Connected MQTT client / error
NewClient(opts *mqtt.ClientOptions) (mqtt.Client, error)
Creates basic MQTT client with given options. Waits on connection success.
NewCallback() *Callback
Creates callback collection handling connection events.
(*Callback).OnConnect(handler func(client mqtt.Client, retryTimes uint64) (CallbackState, error)) *Callback
Registers callback to run on connect/reconnect. Supports chaining multiple handlers.
Token Utilities
CheckToken(token mqtt.Token, timeout time.Duration) (TokenState, error)
Checks token completion with timeout.
Returns:
TokenStateSuccess: Operation completedTokenStateTimeout: Exceeded timeoutTokenStateUnknown: Error occurred
WaitToken(token mqtt.Token) (TokenState, error)
Waits on token completion without timeout.
Configuration
type Config struct {
BrokerServer string // MQTT broker URL (tcp:// or ws://)
Username string // Authentication username
Password string // Authentication password
OrderMatters bool // Whether message order matters
}
Callback States
const (
CallbackSuccess CallbackState = "success" // Operation succeeded
CallbackRetries CallbackState = "retries" // Retry requested
CallbackTimeout CallbackState = "timeout" // Stop retrying
CallbackUnknown CallbackState = "unknown" // Unknown error, retry
)
Examples
TCP Connection
config := &mqttgo.Config{
BrokerServer: "tcp://localhost:1883",
Username: "username",
Password: "password",
}
WebSocket Connection
config := &mqttgo.Config{
BrokerServer: "ws://127.0.0.1:8083/mqtt",
Username: "username",
Password: "password",
}
Multiple Subscription Topics
callback := mqttgo.NewCallback().
OnConnect(func(c mqttgo.Client, retryTimes uint64) (mqttgo.CallbackState, error) {
topics := map[string]byte{
"topic/one": 1,
"topic/two": 1,
"topic/three": 2,
}
token := c.SubscribeMultiple(topics, messageHandler)
state, err := mqttgo.WaitToken(token)
if err != nil || state != mqttgo.TokenStateSuccess {
return mqttgo.CallbackRetries, err
}
return mqttgo.CallbackSuccess, nil
})
Custom Retry Logic
Timeout past 10 retries:
OnConnect(func(c mqttgo.Client, retryTimes uint64) (mqttgo.CallbackState, error) {
if retryTimes > 10 {
return mqttgo.CallbackTimeout, nil
}
token := c.Subscribe("topic", 1, handler)
state, err := mqttgo.CheckToken(token, time.Second*5)
if err != nil {
return mqttgo.CallbackRetries, err
}
return mqttgo.CallbackSuccess, nil
})
Immediate retry on failures:
OnConnect(func(c mqttgo.Client, retryTimes uint64) (mqttgo.CallbackState, error) {
token := c.Subscribe("topic", 1, handler)
if state, err := mqttgo.WaitToken(token); err != nil {
return mqttgo.CallbackRetries, err
}
return mqttgo.CallbackSuccess, nil
})
Custom Logger
type CustomLogger struct{}
func (l *CustomLogger) ErrorLog(msg string, fields ...zap.Field) {
// Your error logging logic
}
func (l *CustomLogger) DebugLog(msg string, fields ...zap.Field) {
// Your debug logging logic
}
func main() {
mqttgo.SetLog(&CustomLogger{})
// Create clients...
}
What makes go-mqtt different?
The Eclipse Paho MQTT client does not auto-resubscribe to topics following reconnection (Issue #22). This package solves that problem:
- OnConnect Callbacks: Executes subscription logic on each connect/reconnect
- Retry Mechanism: Auto retries failed subscriptions with 100ms backoff
- State Management: Simple callback states to manage retry patterns
- Mate Integration: Leverages
rese,must,ereroto write clean code
Running EMQX to Test
See internal/sketches/sketch1/README.md about Docker setup instructions.
Running Embedded MQTT
See internal/sketches/sketch2/README.md about embedded MQTT setup using github.com/mochi-mqtt/server/v2.
π License
MIT License. See LICENSE.
π€ Contributing
Contributions are welcome! Report bugs, suggest features, and contribute code:
- π Found a mistake? Open an issue on GitHub with reproduction steps
- π‘ Have a feature idea? Create an issue to discuss the suggestion
- π Documentation confusing? Report it so we can improve
- π Need new features? Share the use cases to help us understand requirements
- β‘ Performance issue? Help us optimize through reporting slow operations
- π§ Configuration problem? Ask questions about complex setups
- π’ Follow project progress? Watch the repo to get new releases and features
- π Success stories? Share how this package improved the workflow
- π¬ Feedback? We welcome suggestions and comments
π§ Development
New code contributions, follow this process:
- Fork: Fork the repo on GitHub (using the webpage UI).
- Clone: Clone the forked project (
git clone https://github.com/yourname/repo-name.git). - Navigate: Navigate to the cloned project (
cd repo-name) - Branch: Create a feature branch (
git checkout -b feature/xxx). - Code: Implement the changes with comprehensive tests
- Testing: (Golang project) Ensure tests pass (
go test ./...) and follow Go code style conventions - Documentation: Update documentation to support client-facing changes and use significant commit messages
- Stage: Stage changes (
git add .) - Commit: Commit changes (
git commit -m "Add feature xxx") ensuring backward compatible code - Push: Push to the branch (
git push origin feature/xxx). - PR: Open a merge request on GitHub (on the GitHub webpage) with detailed description.
Please ensure tests pass and include relevant documentation updates.
π Support
Welcome to contribute to this project via submitting merge requests and reporting issues.
Project Support:
- β Give GitHub stars if this project helps you
- π€ Share with teammates and (golang) programming friends
- π Write tech blogs about development tools and workflows - we provide content writing support
- π Join the ecosystem - committed to supporting open source and the (golang) development scene
Have Fun Coding with this package! πππ
GitHub Stars
Directories
ΒΆ
| Path | Synopsis |
|---|---|
|
internal
|
|
|
demos/demo1x
command
|
|
|
demos/demo2x
command
|
|
|
sketches/sketch1
command
|
|
|
sketches/sketch2
command
|
|
|
Package mqttgo wraps Eclipse Paho MQTT Golang client with enhanced reconnection handling Provides simplified API for MQTT operations with mate ecosystem integration Solves automatic resubscription on reconnect using OnConnect callback pattern
|
Package mqttgo wraps Eclipse Paho MQTT Golang client with enhanced reconnection handling Provides simplified API for MQTT operations with mate ecosystem integration Solves automatic resubscription on reconnect using OnConnect callback pattern |