Documentation ¶
Overview ¶
Package mqtt is a thread safe and context controlled MQTT 3.1.1 client library.
Index ¶
- Variables
- func KeepAlive(ctx context.Context, cli Client, interval, timeout time.Duration) error
- type BaseClient
- func (c *BaseClient) Close() error
- func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
- func (c *BaseClient) Disconnect(ctx context.Context) error
- func (c *BaseClient) Done() <-chan struct{}
- func (c *BaseClient) Err() error
- func (c *BaseClient) Handle(handler Handler)
- func (c *BaseClient) Ping(ctx context.Context) error
- func (c *BaseClient) Publish(ctx context.Context, message *Message) error
- func (c *BaseClient) SetErrorOnce(err error)
- func (c *BaseClient) Stats() BaseStats
- func (c *BaseClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
- func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error
- func (c *BaseClient) ValidateMessage(message *Message) error
- type BaseClientStoreDialer
- type BaseStats
- type Client
- type ClientCloser
- type Closer
- type ConnState
- type ConnectOption
- type ConnectOptions
- type ConnectionError
- type ConnectionReturnCode
- type DialOption
- type DialOptions
- type Dialer
- type DialerFunc
- type Error
- type ErrorWithRetry
- type Handler
- type HandlerFunc
- type Message
- type NoContextDialer
- type NoContextDialerIface
- type ProtocolLevel
- type QoS
- type ReconnectClient
- type ReconnectOption
- type ReconnectOptions
- type RequestTimeoutError
- type RetryClient
- func (c *RetryClient) Client() *BaseClient
- func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
- func (c *RetryClient) Disconnect(ctx context.Context) error
- func (c *RetryClient) Handle(handler Handler)
- func (c *RetryClient) Ping(ctx context.Context) error
- func (c *RetryClient) Publish(ctx context.Context, message *Message) error
- func (c *RetryClient) Resubscribe(ctx context.Context)
- func (c *RetryClient) Retry(ctx context.Context)
- func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient)
- func (c *RetryClient) Stats() RetryStats
- func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
- func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error
- type RetryStats
- type Retryer
- type ServeAsync
- type ServeMux
- type Subscription
- type URLDialer
Constants ¶
This section is empty.
Variables ¶
var ErrClosedClient = errors.New("operation on closed client")
ErrClosedClient means an operation was requested on a closed client.
var ErrClosedTransport = errors.New("read/write on closed transport")
ErrClosedTransport means that the underlying connection is closed.
var ErrConnectionFailed = errors.New("connection failed")
ErrConnectionFailed means the connection is not established.
var ErrInvalidPacket = errors.New("invalid packet")
ErrInvalidPacket means that an invalid packet has arrived from the broker.
var ErrInvalidPacketLength = errors.New("invalid packet length")
ErrInvalidPacketLength means that a packet with an invalid length has arrived.
var ErrInvalidQoS = errors.New("invalid QoS")
ErrInvalidQoS means the QoS value is not allowed.
var ErrInvalidRune = errors.New("invalid rune in UTF-8 string")
ErrInvalidRune means that the string has a rune not allowed in MQTT.
var ErrInvalidSubAck = errors.New("invalid SUBACK")
ErrInvalidSubAck means that the incoming SUBACK packet is inconsistent with the request.
var ErrInvalidTopicFilter = errors.New("invalid topic filter")
ErrInvalidTopicFilter means that the topic filter string is invalid.
var ErrKeepAliveDisabled = errors.New("keep alive disabled")
ErrKeepAliveDisabled is returned if KeepAlive is run on a connection with keep alive disabled.
var ErrNotConnected = errors.New("not connected")
ErrNotConnected is returned if a function is called before Connect.
var ErrPayloadLenExceeded = errors.New("payload length exceeded")
ErrPayloadLenExceeded means the payload length is too large.
var ErrPingTimeout = errors.New("ping timeout")
ErrPingTimeout is returned on ping response timeout.
var ErrUnsupportedProtocol = errors.New("unsupported protocol")
ErrUnsupportedProtocol means that the specified scheme in the URL is not supported.
Functions ¶
Types ¶
type BaseClient ¶
type BaseClient struct {
// Transport is an underlying connection. Typically net.Conn.
Transport io.ReadWriteCloser
// ConnState is called if the connection state is changed.
ConnState func(ConnState, error)
// MaxPayloadLen is a maximum allowed length of message payload.
// 0 means unlimited. (It will panic if it exceeds the protocol maximum message length (256MB).)
MaxPayloadLen int
// contains filtered or unexported fields
}
BaseClient is a low layer MQTT client. A zero value with a valid underlying Transport is a valid BaseClient.
func DialContext ¶ added in v0.14.0
func DialContext(ctx context.Context, urlStr string, opts ...DialOption) (*BaseClient, error)
DialContext creates MQTT client using URL string.
func (*BaseClient) Connect ¶
func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
Connect to the broker.
func (*BaseClient) Disconnect ¶
func (c *BaseClient) Disconnect(ctx context.Context) error
Disconnect from the broker.
func (*BaseClient) Done ¶ added in v0.1.0
func (c *BaseClient) Done() <-chan struct{}
Done is a channel to signal connection close.
func (*BaseClient) Err ¶ added in v0.1.0
func (c *BaseClient) Err() error
Err returns connection error.
func (*BaseClient) Handle ¶ added in v0.1.0
func (c *BaseClient) Handle(handler Handler)
Handle registers the message handler.
func (*BaseClient) Publish ¶
func (c *BaseClient) Publish(ctx context.Context, message *Message) error
Publish a message to the broker. ID field of the message is filled if zero.
func (*BaseClient) SetErrorOnce ¶ added in v0.11.1
func (c *BaseClient) SetErrorOnce(err error)
SetErrorOnce sets client error value if not yet set.
func (*BaseClient) Stats ¶ added in v0.19.0
func (c *BaseClient) Stats() BaseStats
Stats returns base client stats.
func (*BaseClient) Subscribe ¶
func (c *BaseClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
Subscribe topics.
func (*BaseClient) Unsubscribe ¶
func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error
Unsubscribe topics.
func (*BaseClient) ValidateMessage ¶ added in v0.10.0
func (c *BaseClient) ValidateMessage(message *Message) error
ValidateMessage validates given message.
type BaseClientStoreDialer ¶ added in v0.13.0
type BaseClientStoreDialer struct {
// Dialer is a wrapped dialer. Valid Dialer must be set before use.
Dialer
// contains filtered or unexported fields
}
BaseClientStoreDialer is a dialer wrapper which stores the latest BaseClient.
Example ¶
dialer := &BaseClientStoreDialer{Dialer: &URLDialer{URL: "mqtt://localhost:1883"}}
cli, err := NewReconnectClient(dialer)
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := cli.Connect(ctx, "TestClient"); err != nil {
panic(err)
}
defer cli.Disconnect(context.Background())
// Publish asynchronously
if err := cli.Publish(
ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("async")},
); err != nil {
panic(err)
}
// Publish synchronously
if err := dialer.BaseClient().Publish(
ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("sync")},
); err != nil {
panic(err)
}
func (*BaseClientStoreDialer) BaseClient ¶ added in v0.13.0
func (d *BaseClientStoreDialer) BaseClient() *BaseClient
BaseClient returns latest BaseClient created by the dialer. It returns nil before the first call of Dial.
func (*BaseClientStoreDialer) DialContext ¶ added in v0.14.0
func (d *BaseClientStoreDialer) DialContext(ctx context.Context) (*BaseClient, error)
DialContext creates a new BaseClient.
type BaseStats ¶ added in v0.19.0
type BaseStats struct {
// Recent ping delay.
PingDelayRecent time.Duration
// Maximum ping delay.
PingDelayMax time.Duration
// Minimum ping delay.
PingDelayMin time.Duration
// Count of ping error.
CountPingError int
}
BaseStats stores base client statistics.
type Client ¶
type Client interface {
Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
Disconnect(ctx context.Context) error
Publish(ctx context.Context, message *Message) error
Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
Unsubscribe(ctx context.Context, subs ...string) error
Ping(ctx context.Context) error
Handle(Handler)
}
Client is the interface of MQTT client.
type ClientCloser ¶ added in v0.1.0
ClientCloser groups the Client and Closer interfaces.
type ConnState ¶
type ConnState int
ConnState represents the status of MQTT connection.
type ConnectOption ¶
type ConnectOption func(*ConnectOptions) error
ConnectOption sets option for Connect.
func WithCleanSession ¶
func WithCleanSession(cleanSession bool) ConnectOption
WithCleanSession sets clean session flag.
func WithKeepAlive ¶ added in v0.1.0
func WithKeepAlive(interval uint16) ConnectOption
WithKeepAlive sets keep alive interval in seconds.
func WithProtocolLevel ¶ added in v0.3.0
func WithProtocolLevel(level ProtocolLevel) ConnectOption
WithProtocolLevel sets protocol level.
func WithUserNamePassword ¶
func WithUserNamePassword(userName, password string) ConnectOption
WithUserNamePassword sets plain text auth information used in Connect.
type ConnectOptions ¶
type ConnectOptions struct {
UserName string
Password string
CleanSession bool
KeepAlive uint16
Will *Message
ProtocolLevel ProtocolLevel
}
ConnectOptions represents options for Connect.
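The ConnectOption functions above follow the functional options pattern: each option is a function that mutates a ConnectOptions value and may return an error. The following is a minimal, standalone sketch of that pattern, not the package's implementation. The lowercase types, the applyOptions helper, and the empty user name check are hypothetical stand-ins introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// connectOptions is a local stand-in mirroring some ConnectOptions fields.
type connectOptions struct {
	UserName     string
	Password     string
	CleanSession bool
	KeepAlive    uint16
}

// connectOption mirrors the shape of ConnectOption: func(*ConnectOptions) error.
type connectOption func(*connectOptions) error

// errEmptyUserName is a hypothetical validation error used only in this sketch.
var errEmptyUserName = errors.New("empty user name")

func withCleanSession(cleanSession bool) connectOption {
	return func(o *connectOptions) error {
		o.CleanSession = cleanSession
		return nil
	}
}

func withKeepAlive(interval uint16) connectOption {
	return func(o *connectOptions) error {
		o.KeepAlive = interval // interval in seconds
		return nil
	}
}

func withUserNamePassword(userName, password string) connectOption {
	return func(o *connectOptions) error {
		if userName == "" {
			// Assumption for illustration: the real option may not validate.
			return errEmptyUserName
		}
		o.UserName, o.Password = userName, password
		return nil
	}
}

// applyOptions applies the options in order and stops at the first error.
func applyOptions(opts ...connectOption) (*connectOptions, error) {
	o := &connectOptions{}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := applyOptions(withCleanSession(true), withKeepAlive(30))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *o)
}
```

Because every option returns an error, a caller that receives options as a variadic argument can reject invalid settings before any packet is sent.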
type ConnectionError ¶ added in v0.4.4
type ConnectionError struct {
Err error
Code ConnectionReturnCode
}
ConnectionError is an error storing the connection return code.
func (*ConnectionError) Error ¶ added in v0.4.4
func (e *ConnectionError) Error() string
func (*ConnectionError) Unwrap ¶ added in v0.4.4
func (e *ConnectionError) Unwrap() error
Unwrap returns base error of ConnectionError. (for Go1.13 error unwrapping.)
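Because ConnectionError exposes Unwrap, callers can inspect it with errors.As and errors.Is. The sketch below uses local stand-in types with the same fields so that it runs without a broker. The assumption that the wrapped base error is a sentinel such as ErrConnectionFailed, and the refusedCode helper, are illustrative and not confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// connectionReturnCode and connectionError are local stand-ins that mirror
// ConnectionReturnCode and ConnectionError from this package.
type connectionReturnCode byte

const notAuthorized connectionReturnCode = 5

// errConnectionFailed stands in for the ErrConnectionFailed sentinel.
var errConnectionFailed = errors.New("connection failed")

type connectionError struct {
	Err  error
	Code connectionReturnCode
}

func (e *connectionError) Error() string {
	return fmt.Sprintf("%v: return code %d", e.Err, e.Code)
}

// Unwrap exposes the base error to errors.Is and errors.As.
func (e *connectionError) Unwrap() error { return e.Err }

// refusedCode is a hypothetical helper: it reports the return code if err
// is, or wraps, a *connectionError.
func refusedCode(err error) (connectionReturnCode, bool) {
	var ce *connectionError
	if errors.As(err, &ce) {
		return ce.Code, true
	}
	return 0, false
}

func main() {
	// Simulate an error returned by Connect and wrapped by the caller.
	err := fmt.Errorf("connect: %w", &connectionError{
		Err:  errConnectionFailed,
		Code: notAuthorized,
	})

	if code, ok := refusedCode(err); ok {
		fmt.Println("refused with code", code)
	}
	if errors.Is(err, errConnectionFailed) {
		fmt.Println("base error is the connection failed sentinel")
	}
}
```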
type ConnectionReturnCode ¶
type ConnectionReturnCode byte
ConnectionReturnCode represents return code of connect request.
const (
ConnectionAccepted ConnectionReturnCode = 0
UnacceptableProtocolVersion ConnectionReturnCode = 1
IdentifierRejected ConnectionReturnCode = 2
BadUserNameOrPassword ConnectionReturnCode = 4
NotAuthorized ConnectionReturnCode = 5
)
Connection acceptance/rejection code.
func (ConnectionReturnCode) String ¶
func (c ConnectionReturnCode) String() string
type DialOption ¶
type DialOption func(*DialOptions) error
DialOption sets option for Dial.
func WithConnStateHandler ¶ added in v0.6.0
func WithConnStateHandler(handler func(ConnState, error)) DialOption
WithConnStateHandler sets connection state change handler.
func WithMaxPayloadLen ¶ added in v0.10.0
func WithMaxPayloadLen(l int) DialOption
WithMaxPayloadLen sets maximum payload length of the BaseClient.
func WithTLSCertFiles ¶ added in v0.9.0
func WithTLSCertFiles(host, caFile, certFile, privateKeyFile string) DialOption
WithTLSCertFiles loads certificate files
func WithTLSConfig ¶
func WithTLSConfig(config *tls.Config) DialOption
WithTLSConfig sets TLS configuration.
type DialOptions ¶
type DialOptions struct {
Dialer *net.Dialer
TLSConfig *tls.Config
ConnState func(ConnState, error)
MaxPayloadLen int
}
DialOptions stores options for Dial.
type Dialer ¶ added in v0.1.0
type Dialer interface {
DialContext(context.Context) (*BaseClient, error)
}
Dialer is an interface to create connection.
type DialerFunc ¶ added in v0.4.1
type DialerFunc func(ctx context.Context) (*BaseClient, error)
DialerFunc type is an adapter to use functions as MQTT connection dialer.
func (DialerFunc) DialContext ¶ added in v0.14.0
func (d DialerFunc) DialContext(ctx context.Context) (*BaseClient, error)
DialContext calls d().
type Error ¶ added in v0.6.0
Error records a failed parsing.
type ErrorWithRetry ¶ added in v0.12.0
type ErrorWithRetry interface {
error
Retry(context.Context, *BaseClient) error
}
ErrorWithRetry is an error with packets which should be retransmitted.
type HandlerFunc ¶
type HandlerFunc func(*Message)
HandlerFunc type is an adapter to use functions as MQTT message handler.
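The adapter idea is the same as http.HandlerFunc: a function type gets a method so that a plain function satisfies the handler interface. The sketch below is standalone and uses local stand-in types. It assumes the handler interface has a single Serve(*Message) method, as suggested by ServeAsync.Serve on this page.

```go
package main

import "fmt"

// message is a local stand-in with a subset of Message fields.
type message struct {
	Topic   string
	Payload []byte
}

// handler is assumed to mirror Handler: one Serve method taking a message.
type handler interface {
	Serve(*message)
}

// handlerFunc adapts an ordinary function to the handler interface.
type handlerFunc func(*message)

// Serve calls the underlying function.
func (f handlerFunc) Serve(m *message) { f(m) }

func main() {
	var h handler = handlerFunc(func(m *message) {
		fmt.Printf("received %q on %s\n", m.Payload, m.Topic)
	})
	h.Serve(&message{Topic: "test", Payload: []byte("hello")})
}
```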
type NoContextDialer ¶ added in v0.14.0
type NoContextDialer struct {
NoContextDialerIface
}
NoContextDialer is a wrapper to use a Dialer of mqtt-go <v0.14 as a Dialer of mqtt-go >=v0.14.
WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.
Example ¶
d := oldDialer()
cli, err := NewReconnectClient(&NoContextDialer{d})
if err != nil {
fmt.Println("error:", err.Error())
return
}
cli.Handle(HandlerFunc(func(*Message) {}))
fmt.Println("ok")
Output: ok
func (*NoContextDialer) DialContext ¶ added in v0.14.0
func (d *NoContextDialer) DialContext(context.Context) (*BaseClient, error)
DialContext wraps Dial without context.
WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.
type NoContextDialerIface ¶ added in v0.14.0
type NoContextDialerIface interface {
Dial() (*BaseClient, error)
}
NoContextDialerIface is the Dialer interface of mqtt-go <v0.14.
type ProtocolLevel ¶ added in v0.3.0
type ProtocolLevel byte
ProtocolLevel represents MQTT protocol level.
const (
ProtocolLevel3 ProtocolLevel = 0x03 // MQTT 3.1
ProtocolLevel4 ProtocolLevel = 0x04 // MQTT 3.1.1 (default)
)
ProtocolLevel values.
type ReconnectClient ¶ added in v0.15.0
ReconnectClient is a Client with reconnect and retry features.
func NewReconnectClient ¶ added in v0.1.0
func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (ReconnectClient, error)
NewReconnectClient creates an MQTT client with re-connect/re-publish/re-subscribe features.
type ReconnectOption ¶ added in v0.4.0
type ReconnectOption func(*ReconnectOptions) error
ReconnectOption sets option for Connect.
func WithAlwaysResubscribe ¶ added in v0.18.0
func WithAlwaysResubscribe(always bool) ReconnectOption
WithAlwaysResubscribe enables or disables re-subscribe on reconnect. Default value is false. This option can be used to ensure all subscriptions are restored even if the server is buggy.
func WithPingInterval ¶ added in v0.4.0
func WithPingInterval(interval time.Duration) ReconnectOption
WithPingInterval sets ping request interval. Default value is KeepAlive value set by ConnectOption.
func WithReconnectWait ¶ added in v0.4.0
func WithReconnectWait(base, max time.Duration) ReconnectOption
WithReconnectWait sets parameters of incremental reconnect wait.
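This page does not state how the wait grows between base and max. One common reading of "incremental reconnect wait" is a wait that starts at base and doubles after each failed attempt until it reaches max. The sketch below illustrates that scheme only. The doubling rule and the nextWait and waitSchedule helpers are assumptions, not the package's documented behavior.

```go
package main

import (
	"fmt"
	"time"
)

// nextWait returns the wait to use after one more failed attempt.
// Assumption: the wait doubles each time and is capped at max.
func nextWait(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		return max
	}
	return next
}

// waitSchedule lists the waits used for n consecutive failed attempts,
// starting from base and never exceeding max.
func waitSchedule(base, max time.Duration, n int) []time.Duration {
	waits := make([]time.Duration, 0, n)
	w := base
	if w > max {
		w = max
	}
	for i := 0; i < n; i++ {
		waits = append(waits, w)
		w = nextWait(w, max)
	}
	return waits
}

func main() {
	for i, w := range waitSchedule(time.Second, 10*time.Second, 6) {
		fmt.Printf("attempt %d: wait %v\n", i+1, w)
	}
}
```

Capping the wait keeps the client from backing off indefinitely during a long outage, while the small initial wait keeps recovery fast after a brief disconnect.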
func WithRetryClient ¶ added in v0.16.0
func WithRetryClient(cli *RetryClient) ReconnectOption
WithRetryClient sets RetryClient. Default value is zero RetryClient.
func WithTimeout ¶ added in v0.4.0
func WithTimeout(timeout time.Duration) ReconnectOption
WithTimeout sets timeout duration of server response. Default value is PingInterval.
type ReconnectOptions ¶ added in v0.4.0
type ReconnectOptions struct {
ConnectOptions []ConnectOption
Timeout time.Duration
ReconnectWaitBase time.Duration
ReconnectWaitMax time.Duration
PingInterval time.Duration
RetryClient *RetryClient
AlwaysResubscribe bool
}
ReconnectOptions represents options for Connect.
type RequestTimeoutError ¶ added in v0.16.0
type RequestTimeoutError struct {
// contains filtered or unexported fields
}
RequestTimeoutError is a context deadline exceeded error caused by RetryClient.ResponseTimeout.
func (*RequestTimeoutError) Error ¶ added in v0.16.0
func (e *RequestTimeoutError) Error() string
Error implements error.
type RetryClient ¶ added in v0.1.0
type RetryClient struct {
// Maximum duration to wait for an acknowledgement response.
// Messages with QoS1 and QoS2 will be retried.
ResponseTimeout time.Duration
// Directly publish QoS0 messages without queuing.
// Messages may then be delivered out of order, but performance may improve.
DirectlyPublishQoS0 bool
// Callback to receive background errors on raw message publish/subscribe operations.
OnError func(error)
// contains filtered or unexported fields
}
RetryClient queues unacknowledged messages and retries them on reconnect.
func (*RetryClient) Client ¶ added in v0.11.1
func (c *RetryClient) Client() *BaseClient
Client returns the base client.
func (*RetryClient) Connect ¶ added in v0.1.0
func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
Connect to the broker.
func (*RetryClient) Disconnect ¶ added in v0.4.3
func (c *RetryClient) Disconnect(ctx context.Context) error
Disconnect from the broker.
func (*RetryClient) Handle ¶ added in v0.1.0
func (c *RetryClient) Handle(handler Handler)
Handle registers the message handler.
func (*RetryClient) Ping ¶ added in v0.4.3
func (c *RetryClient) Ping(ctx context.Context) error
Ping to the broker.
func (*RetryClient) Publish ¶ added in v0.1.0
func (c *RetryClient) Publish(ctx context.Context, message *Message) error
Publish tries to publish the message and returns immediately. If the publish is not acknowledged, the message will be queued.
func (*RetryClient) Resubscribe ¶ added in v0.1.0
func (c *RetryClient) Resubscribe(ctx context.Context)
Resubscribe subscribes all established subscriptions.
func (*RetryClient) Retry ¶ added in v0.4.7
func (c *RetryClient) Retry(ctx context.Context)
Retry all queued publish/subscribe requests.
func (*RetryClient) SetClient ¶ added in v0.1.0
func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient)
SetClient sets the new BaseClient. Call Retry() and Resubscribe() to process queued messages and subscriptions. The BaseClient must be unconnected when it is passed to the RetryClient.
func (*RetryClient) Stats ¶ added in v0.15.0
func (c *RetryClient) Stats() RetryStats
Stats returns retry stats.
func (*RetryClient) Subscribe ¶ added in v0.1.0
func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
Subscribe tries to subscribe to the topics and immediately returns nil. If the subscription is not acknowledged, the request will be queued. The first return value ([]Subscription) is always nil.
func (*RetryClient) Unsubscribe ¶ added in v0.4.3
func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error
Unsubscribe tries to unsubscribe from the topics and immediately returns nil. If the unsubscription is not acknowledged, the request will be queued.
type RetryStats ¶ added in v0.15.0
type RetryStats struct {
// Number of queued tasks.
QueuedTasks int
// Number of queued messages and subscriptions.
QueuedRetries int
// Total number of processed tasks.
TotalTasks int
// Total number of retries.
TotalRetries int
// Count of SetClient.
CountSetClient int
// Count of Connect.
CountConnect int
// Count of error on Connect.
CountConnectError int
}
RetryStats stores retry statistics.
type Retryer ¶ added in v0.15.0
type Retryer interface {
// SetClient sets the new BaseClient.
// Call Retry() and Resubscribe() to process queued messages and subscriptions.
// The BaseClient must be unconnected when it is passed to the RetryClient.
SetClient(ctx context.Context, cli *BaseClient)
// Client returns the base client.
Client() *BaseClient
// Resubscribe subscribes all established subscriptions.
Resubscribe(ctx context.Context)
// Retry all queued publish/subscribe requests.
Retry(ctx context.Context)
// Stats returns retry stats.
Stats() RetryStats
}
Retryer is an interface to control message retrying.
type ServeAsync ¶ added in v0.7.0
type ServeAsync struct {
// Handler is an underlying handler.
// Handler.Serve() will be called asynchronously.
Handler
}
ServeAsync is an MQTT message handler to process messages asynchronously.
func (*ServeAsync) Serve ¶ added in v0.7.0
func (m *ServeAsync) Serve(message *Message)
Serve the message in a new goroutine.
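The effect of wrapping a handler this way can be sketched with local stand-in types: Serve returns immediately and the wrapped handler runs in its own goroutine, so a slow handler does not block the caller. This is an illustration of the idea under those assumptions, not the package's source.

```go
package main

import "fmt"

// message, handler, and handlerFunc are local stand-ins for the package types.
type message struct{ Topic string }

type handler interface{ Serve(*message) }

type handlerFunc func(*message)

func (f handlerFunc) Serve(m *message) { f(m) }

// serveAsync wraps a handler and runs it in a new goroutine per message.
type serveAsync struct {
	handler
}

// Serve starts the wrapped handler in a new goroutine and returns at once.
func (s *serveAsync) Serve(m *message) {
	go s.handler.Serve(m)
}

func main() {
	done := make(chan string)
	h := &serveAsync{handlerFunc(func(m *message) {
		// The channel send blocks this goroutine until main receives,
		// but the caller of serveAsync.Serve has already returned.
		done <- m.Topic
	})}

	h.Serve(&message{Topic: "test"})
	fmt.Println("handled:", <-done)
}
```

Since each message is handled in its own goroutine, messages may be processed concurrently and out of arrival order, so the wrapped handler needs its own synchronization for shared state.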
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
ServeMux is an MQTT message handler multiplexer. The idea is very similar to http.ServeMux.
func (*ServeMux) HandleFunc ¶
HandleFunc registers the handler function for the given pattern.
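As a rough illustration of pattern-based dispatch, the sketch below matches topics against filters using the MQTT 3.1.1 wildcard rules ("+" for one level, "#" for all remaining levels) and calls every matching function. The HandleFunc signature is not shown on this page, so the mux type, its method signatures, and the matchFilter helper are hypothetical, and the package's own matching may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// matchFilter reports whether topic matches an MQTT 3.1.1 topic filter.
func matchFilter(filter, topic string) bool {
	fs := strings.Split(filter, "/")
	ts := strings.Split(topic, "/")
	// A leading wildcard does not match topics beginning with "$".
	if strings.HasPrefix(topic, "$") && (fs[0] == "#" || fs[0] == "+") {
		return false
	}
	for i, f := range fs {
		if f == "#" {
			// Multi-level wildcard: valid only as the last level, and it
			// also matches the parent level itself.
			return i == len(fs)-1
		}
		if i >= len(ts) {
			return false
		}
		if f != "+" && f != ts[i] {
			return false
		}
	}
	return len(fs) == len(ts)
}

// route pairs a topic filter with a function. Hypothetical shape.
type route struct {
	filter string
	fn     func(topic string, payload []byte)
}

// mux is a minimal multiplexer used only for this sketch.
type mux struct{ routes []route }

// HandleFunc registers fn for topics matching filter.
func (m *mux) HandleFunc(filter string, fn func(topic string, payload []byte)) {
	m.routes = append(m.routes, route{filter: filter, fn: fn})
}

// Serve calls every registered function whose filter matches the topic
// and returns how many were called.
func (m *mux) Serve(topic string, payload []byte) int {
	n := 0
	for _, r := range m.routes {
		if matchFilter(r.filter, topic) {
			r.fn(topic, payload)
			n++
		}
	}
	return n
}

func main() {
	m := &mux{}
	m.HandleFunc("sensors/+/temp", func(topic string, payload []byte) {
		fmt.Printf("temperature on %s: %s\n", topic, payload)
	})
	m.HandleFunc("sensors/#", func(topic string, payload []byte) {
		fmt.Printf("any sensor message on %s\n", topic)
	})
	m.Serve("sensors/kitchen/temp", []byte("21.5"))
}
```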
type Subscription ¶
Subscription represents MQTT subscription target.
type URLDialer ¶ added in v0.1.0
type URLDialer struct {
URL string
Options []DialOption
}
URLDialer is a Dialer using URL string.
func (*URLDialer) DialContext ¶ added in v0.14.0
func (d *URLDialer) DialContext(ctx context.Context) (*BaseClient, error)
DialContext creates connection using its values.
Source Files ¶
- atomic.go
- client.go
- conn.go
- connack.go
- connect.go
- dialer.go
- disconnect.go
- error.go
- filter.go
- keepalive.go
- message.go
- mqtt.go
- packet.go
- pingreq.go
- pingresp.go
- puback.go
- pubcomp.go
- publish.go
- pubrec.go
- pubrel.go
- reconnclient.go
- retryclient.go
- serve.go
- serveasync.go
- servemux.go
- suback.go
- subscribe.go
- subscriptions.go
- uniqid.go
- unsuback.go
- unsubscribe.go
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| examples/mqtts-client-cert (command) | |
| examples/wss-presign-url (command) | |
| internal | |
| internal/filteredpipe | Package filteredpipe provides pipes with interceptor for testing. |
| | Package mockmqtt provides simple standalone mock of mqtt.Client. |
| paho (module) | |