Documentation
¶
Overview ¶
Package gmqtt provides an MQTT v3.1.1 server library.
Example ¶
see /examples for more details.
ln, err := net.Listen("tcp", ":1883")
if err != nil {
fmt.Println(err.Error())
return
}
ws := &WsServer{
Server: &http.Server{Addr: ":8080"},
Path: "/",
}
l, _ := zap.NewProduction()
srv := NewServer(
WithTCPListener(ln),
WithWebsocketServer(ws),
// add config
WithConfig(DefaultConfig),
// add plugins
// WithPlugin(prometheus.New(&http.Server{Addr: ":8082"}, "/metrics")),
// add Hook
WithHook(Hooks{
OnConnect: func(ctx context.Context, client Client) (code uint8) {
return packets.CodeAccepted
},
OnSubscribe: func(ctx context.Context, client Client, topic packets.Topic) (qos uint8) {
fmt.Println("register onSubscribe callback")
return packets.QOS_1
},
}),
// add logger
WithLogger(l),
)
srv.Run()
fmt.Println("started...")
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
srv.Stop(context.Background())
fmt.Println("stopped")
Index ¶
- Constants
- Variables
- func LoggerWithField(fields ...zap.Field) *zap.Logger
- func NewMessage(topic string, payload []byte, qos uint8, opts ...msgOptions) packets.Message
- func NewServer(opts ...Options) *server
- func Retained(retained bool) msgOptions
- type Client
- type ClientOptionsReader
- type ClientStats
- type Config
- type DeliveryMode
- type HookWrapper
- type Hooks
- type MessageStats
- type OnAccept
- type OnAcceptWrapper
- type OnAcked
- type OnAckedWrapper
- type OnClose
- type OnCloseWrapper
- type OnConnect
- type OnConnectWrapper
- type OnConnected
- type OnConnectedWrapper
- type OnDeliver
- type OnDeliverWrapper
- type OnMsgArrived
- type OnMsgArrivedWrapper
- type OnMsgDropped
- type OnMsgDroppedWrapper
- type OnSessionCreated
- type OnSessionCreatedWrapper
- type OnSessionResumed
- type OnSessionResumedWrapper
- type OnSessionTerminated
- type OnSessionTerminatedWrapper
- type OnStop
- type OnStopWrapper
- type OnSubscribe
- type OnSubscribeWrapper
- type OnSubscribed
- type OnSubscribedWrapper
- type OnUnsubscribed
- type OnUnsubscribedWrapper
- type Options
- type PacketBytes
- type PacketCount
- type PacketStats
- type Plugable
- type PublishService
- type Server
- type ServerStats
- type SessionStats
- type SessionStatsManager
- type SessionTerminatedReason
- type StatsManager
- type WsServer
Examples ¶
Constants ¶
const ( Connecting = iota Connected Switiching Disconnected )
Client status
const ( DefaultMsgRouterLen = 4096 DefaultRegisterLen = 2048 DefaultUnRegisterLen = 2048 )
Default configration
Variables ¶
var ( ErrInvalStatus = errors.New("invalid connection status") ErrConnectTimeOut = errors.New("connect time out") )
Error
var DefaultConfig = Config{ RetryInterval: 20 * time.Second, RetryCheckInterval: 20 * time.Second, SessionExpiryInterval: 0 * time.Second, SessionExpiryCheckInterval: 0 * time.Second, QueueQos0Messages: true, MaxInflight: 32, MaxAwaitRel: 100, MaxMsgQueue: 1000, DeliveryMode: OnlyOnce, MsgRouterLen: DefaultMsgRouterLen, RegisterLen: DefaultRegisterLen, UnregisterLen: DefaultUnRegisterLen, }
DefaultConfig default config used by NewServer()
var ( // ErrInvalWsMsgType [MQTT-6.0.0-1] ErrInvalWsMsgType = errors.New("invalid websocket message type") )
Functions ¶
func LoggerWithField ¶
LoggerWithField add fields to a new logger. Plugins can use this method to add plugin name field.
func NewMessage ¶
NewMessage creates a message for publish service.
Types ¶
type Client ¶
type Client interface {
// OptionsReader returns ClientOptionsReader for reading options data.
OptionsReader() ClientOptionsReader
// IsConnected returns whether the client is connected.
IsConnected() bool
// ConnectedAt returns the connected time
ConnectedAt() time.Time
// DisconnectedAt return the disconnected time
DisconnectedAt() time.Time
// Close closes the client connection. The returned channel will be closed after unregister process has been done
Close() <-chan struct{}
GetSessionStatsManager() SessionStatsManager
}
Client represent
type ClientOptionsReader ¶
type ClientOptionsReader interface {
ClientID() string
Username() string
Password() string
KeepAlive() uint16
CleanSession() bool
WillFlag() bool
WillRetain() bool
WillQos() uint8
WillTopic() string
WillPayload() []byte
LocalAddr() net.Addr
RemoteAddr() net.Addr
}
ClientOptionsReader is mainly used in callback functions.
type ClientStats ¶
type ClientStats struct {
ConnectedTotal uint64
DisconnectedTotal uint64
// ActiveCurrent is the number of current active session.
ActiveCurrent uint64
// InactiveCurrent is the number of current inactive session.
InactiveCurrent uint64
// ExpiredTotal is the number of expired session.
ExpiredTotal uint64
}
ClientStats provides the statistics of client connections.
type Config ¶
type Config struct {
RetryInterval time.Duration
RetryCheckInterval time.Duration
SessionExpiryInterval time.Duration
SessionExpiryCheckInterval time.Duration
QueueQos0Messages bool
MaxInflight int
MaxAwaitRel int
MaxMsgQueue int
DeliveryMode DeliveryMode
MsgRouterLen int
RegisterLen int
UnregisterLen int
}
type DeliveryMode ¶
type DeliveryMode int
const ( Overlap DeliveryMode = 0 OnlyOnce DeliveryMode = 1 )
type HookWrapper ¶
type HookWrapper struct {
OnConnectWrapper OnConnectWrapper
OnConnectedWrapper OnConnectedWrapper
OnSessionCreatedWrapper OnSessionCreatedWrapper
OnSessionResumedWrapper OnSessionResumedWrapper
OnSessionTerminatedWrapper OnSessionTerminatedWrapper
OnSubscribeWrapper OnSubscribeWrapper
OnSubscribedWrapper OnSubscribedWrapper
OnUnsubscribedWrapper OnUnsubscribedWrapper
OnMsgArrivedWrapper OnMsgArrivedWrapper
OnAckedWrapper OnAckedWrapper
OnMsgDroppedWrapper OnMsgDroppedWrapper
OnDeliverWrapper OnDeliverWrapper
OnCloseWrapper OnCloseWrapper
OnAcceptWrapper OnAcceptWrapper
OnStopWrapper OnStopWrapper
}
HookWrapper groups all hook wrappers function
type MessageStats ¶
type MessageStats struct {
Qos0 struct {
DroppedTotal uint64
ReceivedTotal uint64
SentTotal uint64
}
Qos1 struct {
DroppedTotal uint64
ReceivedTotal uint64
SentTotal uint64
}
Qos2 struct {
DroppedTotal uint64
ReceivedTotal uint64
SentTotal uint64
}
QueuedCurrent uint64
}
MessageStats represents the statistics of PUBLISH packet, separated by QOS.
type OnAccept ¶
OnAccept 会在新连接建立的时候调用,只在TCP server中有效。如果返回false,则会直接关闭连接
OnAccept will be called after a new connection established in TCP server. If returns false, the connection will be close directly.
type OnAcceptWrapper ¶
type OnAcked ¶
OnAcked 当客户端对qos1或qos2返回确认的时候调用
OnAcked will be called when receiving the ack packet for a published qos1 or qos2 message.
type OnAckedWrapper ¶
type OnClose ¶
OnClose tcp连接关闭之后触发
OnClose will be called after the tcp connection of the client has been closed
type OnCloseWrapper ¶
type OnConnect ¶
OnConnect 当合法的connect报文到达的时候触发,返回connack中响应码
OnConnect will be called when a valid connect packet is received. It returns the code of the connack packet
type OnConnectWrapper ¶
type OnConnected ¶
OnConnected 当客户端成功连接后触发
OnConnected will be called when a mqtt client connect successfully.
type OnConnectedWrapper ¶
type OnConnectedWrapper func(OnConnected) OnConnected
type OnDeliverWrapper ¶
type OnMsgArrived ¶
OnMsgArrived 返回接收到的publish报文是否允许转发,返回false则该报文不会被继续转发
OnMsgArrived returns whether the publish packet will be delivered or not. If returns false, the packet will not be delivered to any clients.
type OnMsgArrivedWrapper ¶
type OnMsgArrivedWrapper func(OnMsgArrived) OnMsgArrived
type OnMsgDroppedWrapper ¶
type OnMsgDroppedWrapper func(OnMsgDropped) OnMsgDropped
type OnSessionCreated ¶
OnSessionCreated 新建session时触发
OnSessionCreated will be called when session created.
type OnSessionCreatedWrapper ¶
type OnSessionCreatedWrapper func(OnSessionCreated) OnSessionCreated
type OnSessionResumed ¶
OnSessionResumed 恢复session时触发
OnSessionResumed will be called when session resumed.
type OnSessionResumedWrapper ¶
type OnSessionResumedWrapper func(OnSessionResumed) OnSessionResumed
type OnSessionTerminated ¶
type OnSessionTerminated func(ctx context.Context, client Client, reason SessionTerminatedReason)
OnSessionTerminated session 下线时触发
OnSessionTerminated will be called when session terminated.
type OnSessionTerminatedWrapper ¶
type OnSessionTerminatedWrapper func(OnSessionTerminated) OnSessionTerminated
type OnStopWrapper ¶
type OnSubscribe ¶
OnSubscribe 返回topic允许订阅的最高QoS等级
OnSubscribe returns the maximum available QoS for the topic:
0x00 - Success - Maximum QoS 0 0x01 - Success - Maximum QoS 1 0x02 - Success - Maximum QoS 2 0x80 - Failure
type OnSubscribeWrapper ¶
type OnSubscribeWrapper func(OnSubscribe) OnSubscribe
type OnSubscribed ¶
OnSubscribed will be called after the topic subscribe successfully
type OnSubscribedWrapper ¶
type OnSubscribedWrapper func(OnSubscribed) OnSubscribed
type OnUnsubscribed ¶
OnUnsubscribed will be called after the topic has been unsubscribed
type OnUnsubscribedWrapper ¶
type OnUnsubscribedWrapper func(OnUnsubscribed) OnUnsubscribed
type Options ¶
type Options func(srv *server)
func WithLogger ¶
func WithPlugin ¶
WithPlugin set plugin(s) of the server.
func WithTCPListener ¶
WithTCPListener set tcp listener(s) of the server. Default listen on :1883.
func WithWebsocketServer ¶
WithWebsocketServer set websocket server(s) of the server.
type PacketBytes ¶
type PacketBytes struct {
Connect uint64
Connack uint64
Disconnect uint64
Pingreq uint64
Pingresp uint64
Puback uint64
Pubcomp uint64
Publish uint64
Pubrec uint64
Pubrel uint64
Suback uint64
Subscribe uint64
Unsuback uint64
Unsubscribe uint64
}
PacketBytes represents total bytes of each packet type have been received or sent.
type PacketCount ¶
type PacketCount struct {
Connect uint64
Connack uint64
Disconnect uint64
Pingreq uint64
Pingresp uint64
Puback uint64
Pubcomp uint64
Publish uint64
Pubrec uint64
Pubrel uint64
Suback uint64
Subscribe uint64
Unsuback uint64
Unsubscribe uint64
}
PacketCount represents total number of each packet type have been received or sent.
type PacketStats ¶
type PacketStats struct {
BytesReceived *PacketBytes
ReceivedTotal *PacketCount
BytesSent *PacketBytes
SentTotal *PacketCount
}
PacketStats represents the statistics of MQTT Packet.
type Plugable ¶
type Plugable interface {
// Load will be called in server.Run(). If return error, the server will panic.
Load(service Server) error
// Unload will be called when the server is shutdown, the return error is only for logging
Unload() error
// HookWrapper returns all hook wrappers that used by the plugin.
// Return a empty wrapper if the plugin does not need any hooks
HookWrapper() HookWrapper
// Name return the plugin name
Name() string
}
Plugable is the interface need to be implemented for every plugins.
type PublishService ¶
type PublishService interface {
// Publish publish a message to broker.
// Calling this method will not trigger OnMsgArrived hook.
Publish(message packets.Message)
// PublishToClient publish a message to a specific client.
// If match sets to true, the message will send to the client
// only if the client is subscribed to a topic that matches the message.
// If match sets to false, the message will send to the client directly even
// there are no matched subscriptions.
// Calling this method will not trigger OnMsgArrived hook.
PublishToClient(clientID string, message packets.Message, match bool)
}
PublishService provides the ability to publish messages to the broker.
type Server ¶
type Server interface {
// SubscriptionStore returns the subscription.Store.
SubscriptionStore() subscription.Store
// RetainedStore returns the retained.Store.
RetainedStore() retained.Store
// PublishService returns the PublishService
PublishService() PublishService
// Client return the client specified by clientID.
Client(clientID string) Client
// GetConfig returns the config of the server
GetConfig() Config
// GetStatsManager returns StatsManager
GetStatsManager() StatsManager
}
Server interface represents a mqtt server instance.
type ServerStats ¶
type ServerStats struct {
PacketStats *PacketStats
ClientStats *ClientStats
MessageStats *MessageStats
SubscriptionStats *subscription.Stats
}
ServerStats is the collection of global statistics.
type SessionStats ¶
type SessionStats struct {
// InflightCurrent, the current length of the inflight queue.
InflightCurrent uint64
// AwaitRelCurrent, the current length of the awaitRel queue.
AwaitRelCurrent uint64
MessageStats
}
SessionStats the collection of statistics of each session.
type SessionStatsManager ¶
type SessionStatsManager interface {
// GetStats return the session statistics
GetStats() *SessionStats
// contains filtered or unexported methods
}
SessionStatsManager interface provides the ability to access the statistics of the session
type SessionTerminatedReason ¶
type SessionTerminatedReason byte
const ( NormalTermination SessionTerminatedReason = iota ConflictTermination ExpiredTermination )
type StatsManager ¶
type StatsManager interface {
// GetStats return the server statistics
GetStats() *ServerStats
// contains filtered or unexported methods
}
StatsManager interface provides the ability to access the statistics of the server