Documentation
¶
Overview ¶
Package broker implements an extensible MQTT broker.
Example ¶
server, err := transport.Launch("tcp://localhost:8080")
if err != nil {
panic(err)
}
engine := NewEngine()
engine.Accept(server)
c := client.New()
wait := make(chan struct{})
c.Callback = func(msg *packet.Message, err error) error {
if err != nil {
panic(err)
}
fmt.Println(msg.String())
close(wait)
return nil
}
cf, err := c.Connect(client.NewConfig("tcp://localhost:8080"))
if err != nil {
panic(err)
}
cf.Wait(10 * time.Second)
sf, err := c.Subscribe("test", 0)
if err != nil {
panic(err)
}
sf.Wait(10 * time.Second)
pf, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
panic(err)
}
pf.Wait(10 * time.Second)
<-wait
err = c.Disconnect()
if err != nil {
panic(err)
}
err = server.Close()
if err != nil {
panic(err)
}
engine.Close()
Output: <Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>
Index ¶
- Variables
- func Run(engine *Engine, protocol string) (string, chan struct{}, chan struct{})
- type Backend
- type Client
- type Engine
- type LogEvent
- type Logger
- type MemoryBackend
- func (m *MemoryBackend) Authenticate(client *Client, user, password string) (bool, error)
- func (m *MemoryBackend) ClearRetained(client *Client, topic string) error
- func (m *MemoryBackend) Publish(client *Client, msg *packet.Message) error
- func (m *MemoryBackend) QueueOffline(client *Client) error
- func (m *MemoryBackend) QueueRetained(client *Client, topic string) error
- func (m *MemoryBackend) Setup(client *Client, id string) (Session, bool, error)
- func (m *MemoryBackend) StoreRetained(client *Client, msg *packet.Message) error
- func (m *MemoryBackend) Subscribe(client *Client, sub *packet.Subscription) error
- func (m *MemoryBackend) Terminate(client *Client) error
- func (m *MemoryBackend) Unsubscribe(client *Client, topic string) error
- type MessageQueue
- type Session
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrExpectedConnect = errors.New("expected a ConnectPacket as the first packet")
ErrExpectedConnect is returned when the first received packet is not a ConnectPacket.
Functions ¶
Types ¶
type Backend ¶
type Backend interface {
// Authenticate should authenticate the client using the user and password
// values and return true if the client is eligible to continue or false
// when the broker should terminate the connection.
Authenticate(client *Client, user, password string) (bool, error)
// Setup is called when a new client comes online and is successfully
// authenticated. Setup should return the already stored session for the
// supplied id or create and return a new one. If the supplied id has a zero
// length, a new temporary session should returned that is not stored
// further. The backend may also close any existing clients that use the
// same client id.
//
// Note: In this call the Backend may also allocate other resources and
// setup the client for further usage as the broker will acknowledge the
// connection when the call returns.
Setup(client *Client, id string) (Session, bool, error)
// QueueOffline is called after the clients stored subscriptions have been
// resubscribed. It should be used to trigger a background process that
// forwards all missed messages.
QueueOffline(*Client) error
// Subscribe should subscribe the passed client to the specified topic and
// call Publish with any incoming messages.
Subscribe(*Client, *packet.Subscription) error
// Unsubscribe should unsubscribe the passed client from the specified topic.
Unsubscribe(client *Client, topic string) error
// StoreRetained should store the specified message.
StoreRetained(*Client, *packet.Message) error
// ClearRetained should remove the stored messages for the given topic.
ClearRetained(client *Client, topic string) error
// QueueRetained is called after acknowledging a subscription and should be
// used to trigger a background process that forwards all retained messages.
QueueRetained(client *Client, topic string) error
// Publish should forward the passed message to all other clients that hold
// a subscription that matches the messages topic. It should also add the
// message to all sessions that have a matching offline subscription.
Publish(*Client, *packet.Message) error
// Terminate is called when the client goes offline. Terminate should
// unsubscribe the passed client from all previously subscribed topics. The
// backend may also convert a clients subscriptions to offline subscriptions.
//
// Note: The Backend may also cleanup previously allocated resources for
// that client as the broker will close the connection when the call
// returns.
Terminate(*Client) error
}
A Backend provides the effective brokering functionality to its clients.
type Client ¶
type Client struct {
// Ref can be used to store a custom reference to an object. This is usually
// used to attach a state object to client that is created in the Backend.
Ref interface{}
// contains filtered or unexported fields
}
A Client represents a remote client that is connected to the broker.
func (*Client) CleanSession ¶
CleanSession returns whether the client requested a clean session during connect.
func (*Client) Close ¶
Close will immediately close the connection. When clean=true the client will be marked as cleanly disconnected, and the will message will not get dispatched.
func (*Client) RemoteAddr ¶
RemoteAddr returns the client's remote net address from the underlying connection.
type Engine ¶
type Engine struct {
Backend Backend
Logger Logger
ConnectTimeout time.Duration
DefaultReadLimit int64
// contains filtered or unexported fields
}
The Engine handles incoming connections and connects them to the backend.
func NewEngine ¶
func NewEngine() *Engine
NewEngine returns a new Engine with a basic MemoryBackend.
func NewEngineWithBackend ¶
NewEngineWithBackend returns a new Engine with a custom Backend.
func (*Engine) Close ¶
func (e *Engine) Close()
Close will stop handling incoming connections and close all current clients. The call will block until all clients are properly closed.
Note: All passed servers to Accept must be closed before calling this method.
type LogEvent ¶
type LogEvent int
LogEvent are received by a Logger.
const ( // NewConnection is emitted when a client comes online. NewConnection LogEvent = iota // PacketReceived is emitted when a packet has been received. PacketReceived // MessagePublished is emitted after a message has been published. MessagePublished // MessageForwarded is emitted after a message has been forwarded. MessageForwarded // PacketSent is emitted when a packet has been sent. PacketSent // LostConnection is emitted when the connection has been terminated. LostConnection // TransportError is emitted when an underlying transport error occurs. TransportError // SessionError is emitted when a call to the session fails. SessionError // BackendError is emitted when a call to the backend fails. BackendError // ClientError is emitted when the client violates the protocol. ClientError )
type MemoryBackend ¶
type MemoryBackend struct {
Credentials map[string]string
// contains filtered or unexported fields
}
A MemoryBackend stores everything in memory.
func NewMemoryBackend ¶
func NewMemoryBackend() *MemoryBackend
NewMemoryBackend returns a new MemoryBackend.
func (*MemoryBackend) Authenticate ¶
func (m *MemoryBackend) Authenticate(client *Client, user, password string) (bool, error)
Authenticate authenticates a clients credentials by matching them to the saved Credentials map.
func (*MemoryBackend) ClearRetained ¶
func (m *MemoryBackend) ClearRetained(client *Client, topic string) error
ClearRetained will remove the stored messages for the given topic.
func (*MemoryBackend) Publish ¶
func (m *MemoryBackend) Publish(client *Client, msg *packet.Message) error
Publish will forward the passed message to all other subscribed clients. It will also add the message to all sessions that have a matching offline subscription.
func (*MemoryBackend) QueueOffline ¶
func (m *MemoryBackend) QueueOffline(client *Client) error
QueueOffline will begin with forwarding all missed messages in a separate goroutine.
func (*MemoryBackend) QueueRetained ¶
func (m *MemoryBackend) QueueRetained(client *Client, topic string) error
QueueRetained will queue all retained messages matching the given topic.
func (*MemoryBackend) Setup ¶
Setup returns the already stored session for the supplied id or creates and returns a new one. If the supplied id has a zero length, a new session is returned that is not stored further. Furthermore, it will disconnect any client connected with the same client id.
func (*MemoryBackend) StoreRetained ¶
func (m *MemoryBackend) StoreRetained(client *Client, msg *packet.Message) error
StoreRetained will store the specified message.
func (*MemoryBackend) Subscribe ¶
func (m *MemoryBackend) Subscribe(client *Client, sub *packet.Subscription) error
Subscribe will subscribe the passed client to the specified topic and begin to forward messages by calling the clients Publish method.
func (*MemoryBackend) Terminate ¶
func (m *MemoryBackend) Terminate(client *Client) error
Terminate will unsubscribe the passed client from all previously subscribed topics. If the client connect with clean=true it will also clean the session. Otherwise it will create offline subscriptions for all QOS 1 and QOS 2 subscriptions.
func (*MemoryBackend) Unsubscribe ¶
func (m *MemoryBackend) Unsubscribe(client *Client, topic string) error
Unsubscribe will unsubscribe the passed client from the specified topic.
type MessageQueue ¶ added in v0.5.0
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue is a basic FIFO queue for messages.
func NewMessageQueue ¶ added in v0.5.0
func NewMessageQueue(size int) *MessageQueue
NewMessageQueue returns a new MessageQueue. If size is greater than zero the queue will not grow more than the defined size.
func (*MessageQueue) Len ¶ added in v0.5.0
func (q *MessageQueue) Len() int
Len returns the length of the queue.
func (*MessageQueue) Pop ¶ added in v0.5.0
func (q *MessageQueue) Pop() *packet.Message
Pop removes and returns a message from the queue in first to last order.
func (*MessageQueue) Push ¶ added in v0.5.0
func (q *MessageQueue) Push(msg *packet.Message)
Push adds a message to the queue.
func (*MessageQueue) Range ¶ added in v0.5.0
func (q *MessageQueue) Range(fn func(*packet.Message) bool)
Range will call range with the contents of the queue. If fn returns false the operation is stopped immediately.
func (*MessageQueue) Reset ¶ added in v0.5.0
func (q *MessageQueue) Reset()
Reset returns and removes all messages from the queue.
type Session ¶
type Session interface {
// NextID should return the next id for outgoing packets.
NextID() packet.ID
// SavePacket should store a packet in the session. An eventual existing
// packet with the same id should be quietly overwritten.
SavePacket(session.Direction, packet.GenericPacket) error
// LookupPacket should retrieve a packet from the session using the packet id.
LookupPacket(session.Direction, packet.ID) (packet.GenericPacket, error)
// DeletePacket should remove a packet from the session. The method should
// not return an error if no packet with the specified id does exists.
DeletePacket(session.Direction, packet.ID) error
// AllPackets should return all packets currently saved in the session. This
// method is used to resend stored packets when the session is resumed.
AllPackets(session.Direction) ([]packet.GenericPacket, error)
// SaveSubscription should store the subscription in the session. An eventual
// subscription with the same topic should be quietly overwritten.
SaveSubscription(*packet.Subscription) error
// LookupSubscription should match a topic against the stored subscriptions
// and eventually return the first found subscription.
LookupSubscription(topic string) (*packet.Subscription, error)
// DeleteSubscription should remove the subscription from the session. The
// method should not return an error if no subscription with the specified
// topic does exist.
DeleteSubscription(topic string) error
// AllSubscriptions should return all subscriptions currently saved in the
// session. This method is used to restore a clients subscriptions when the
// session is resumed.
AllSubscriptions() ([]*packet.Subscription, error)
// SaveWill should store the will message.
SaveWill(*packet.Message) error
// LookupWill should retrieve the will message.
LookupWill() (*packet.Message, error)
// ClearWill should remove the will message from the store.
ClearWill() error
// Reset should completely reset the session.
Reset() error
}
A Session is used to persist incoming/outgoing packets, subscriptions and the will.