Documentation ¶
Index ¶
- Constants
- func WebsocketHandler(app *Node, fetchHeaders []string, config *WSConfig) http.Handler
- type AppNode
- type Controller
- type DisconnectQueue
- type DisconnectQueueConfig
- type Disconnector
- type Hub
- func (h *Hub) AddSession(s *Session)
- func (h *Hub) AddSubscription(sid string, identifier string, stream string)
- func (h *Hub) Broadcast(stream string, data string)
- func (h *Hub) BroadcastMessage(msg *common.StreamMessage)
- func (h *Hub) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
- func (h *Hub) RemoveAllSubscriptions(sid string, identifier string)
- func (h *Hub) RemoveSession(s *Session)
- func (h *Hub) RemoveSubscription(sid string, identifier string, stream string)
- func (h *Hub) Run()
- func (h *Hub) Shutdown()
- func (h *Hub) Size() int
- func (h *Hub) StreamsSize() int
- func (h *Hub) UniqSize() int
- type HubRegistration
- type HubSubscription
- type Node
- func (n *Node) Authenticate(s *Session) (err error)
- func (n *Node) Broadcast(msg *common.StreamMessage)
- func (n *Node) Disconnect(s *Session) error
- func (n *Node) DisconnectNow(s *Session) error
- func (n *Node) HandleCommand(s *Session, raw []byte) error
- func (n *Node) HandlePubSub(raw []byte)
- func (n *Node) Perform(s *Session, msg *common.Message) (err error)
- func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
- func (n *Node) SetDisconnector(d Disconnector)
- func (n *Node) Shutdown()
- func (n *Node) Start()
- func (n *Node) Subscribe(s *Session, msg *common.Message) (err error)
- func (n *Node) Unsubscribe(s *Session, msg *common.Message) (err error)
- type NoopDisconnectQueue
- type Session
- type WSConfig
Constants ¶
const (
	// CloseNormalClosure indicates normal closure
	CloseNormalClosure = websocket.CloseNormalClosure
	// CloseInternalServerErr indicates closure because of internal error
	CloseInternalServerErr = websocket.CloseInternalServerErr
	// CloseAbnormalClosure indicates abnormal close
	CloseAbnormalClosure = websocket.CloseAbnormalClosure
	// CloseGoingAway indicates closing because of server shuts down or client disconnects
	CloseGoingAway = websocket.CloseGoingAway
)
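The close codes above are re-exported from the websocket package. As a rough illustration of how they might be used for logging, the sketch below maps a code to a short description. It uses local stand-in constants with the numeric values RFC 6455 assigns to these codes so that it runs without external dependencies; `describeClose` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local stand-ins for the close codes listed above, using the numeric
// values defined by RFC 6455. Real code would use the package constants.
const (
	CloseNormalClosure     = 1000
	CloseGoingAway         = 1001
	CloseAbnormalClosure   = 1006
	CloseInternalServerErr = 1011
)

// describeClose is a hypothetical helper that turns a close code into a
// short human-readable reason, e.g. for a log line.
func describeClose(code int) string {
	switch code {
	case CloseNormalClosure:
		return "normal closure"
	case CloseGoingAway:
		return "going away"
	case CloseAbnormalClosure:
		return "abnormal closure"
	case CloseInternalServerErr:
		return "internal server error"
	default:
		return fmt.Sprintf("unknown close code %d", code)
	}
}

func main() {
	for _, code := range []int{CloseNormalClosure, CloseGoingAway, 4000} {
		fmt.Printf("%d: %s\n", code, describeClose(code))
	}
}
```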
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AppNode ¶ added in v1.0.1
type AppNode interface {
HandlePubSub(msg []byte)
}
AppNode describes a basic node interface
type Controller ¶
type Controller interface {
Shutdown() error
Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error)
Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error
}
Controller is an interface describing a business-logic handler (e.g. RPC)
type DisconnectQueue ¶
type DisconnectQueue struct {
// contains filtered or unexported fields
}
DisconnectQueue is a rate-limited executor
func NewDisconnectQueue ¶
func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue
NewDisconnectQueue builds a new queue with the specified rate (max calls per second)
func (*DisconnectQueue) Enqueue ¶
func (d *DisconnectQueue) Enqueue(s *Session) error
Enqueue adds a session to the disconnect queue
func (*DisconnectQueue) Shutdown ¶
func (d *DisconnectQueue) Shutdown() error
Shutdown stops throttling and makes requests one by one
func (*DisconnectQueue) Size ¶
func (d *DisconnectQueue) Size() int
Size returns the number of enqueued tasks
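To make the idea of a rate-limited executor concrete, here is a minimal sketch, assuming a ticker-driven loop and a buffered channel of tasks. It is not this package's implementation: `rateLimitedQueue`, its fields, and `newRateLimitedQueue` are hypothetical names, tasks are plain functions rather than sessions, and no shutdown timeout is modeled.

```go
package main

import (
	"fmt"
	"time"
)

// rateLimitedQueue is a hypothetical stand-in for a rate-limited executor.
type rateLimitedQueue struct {
	tasks    chan func()
	shutdown chan struct{}
	done     chan struct{}
	interval time.Duration
}

// newRateLimitedQueue starts a worker that runs at most `rate` tasks per
// second. rate must be positive; capacity bounds the number of queued tasks.
func newRateLimitedQueue(rate, capacity int) *rateLimitedQueue {
	q := &rateLimitedQueue{
		tasks:    make(chan func(), capacity),
		shutdown: make(chan struct{}),
		done:     make(chan struct{}),
		interval: time.Second / time.Duration(rate),
	}
	go q.run()
	return q
}

func (q *rateLimitedQueue) run() {
	defer close(q.done)
	ticker := time.NewTicker(q.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Run at most one task per tick.
			select {
			case task := <-q.tasks:
				task()
			default:
			}
		case <-q.shutdown:
			// Stop throttling: run the remaining tasks one by one.
			for {
				select {
				case task := <-q.tasks:
					task()
				default:
					return
				}
			}
		}
	}
}

// Enqueue adds a task; it blocks if the queue is full.
func (q *rateLimitedQueue) Enqueue(task func()) { q.tasks <- task }

// Size returns the number of tasks waiting to run.
func (q *rateLimitedQueue) Size() int { return len(q.tasks) }

// Shutdown drains the queue without throttling and waits for the worker.
func (q *rateLimitedQueue) Shutdown() {
	close(q.shutdown)
	<-q.done
}

func main() {
	q := newRateLimitedQueue(100, 8) // roughly 100 tasks per second
	executed := 0
	for i := 0; i < 5; i++ {
		q.Enqueue(func() { executed++ })
	}
	q.Shutdown() // runs whatever is still queued, unthrottled
	fmt.Println("executed:", executed)
}
```

All tasks run on the single worker goroutine, so the counter in `main` needs no lock; it is only read after `Shutdown` has returned.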
type DisconnectQueueConfig ¶ added in v1.0.1
type DisconnectQueueConfig struct {
// Limit the number of Disconnect RPC calls per second
Rate int
// How long to wait (in seconds) for all enqueued calls to be made at exit
ShutdownTimeout int
}
DisconnectQueueConfig contains DisconnectQueue configuration
func NewDisconnectQueueConfig ¶ added in v1.0.1
func NewDisconnectQueueConfig() DisconnectQueueConfig
NewDisconnectQueueConfig builds a new config
type Disconnector ¶ added in v1.0.1
Disconnector is an interface for disconnect queue implementations
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub stores all the sessions and the corresponding subscriptions info
func (*Hub) AddSession ¶ added in v1.0.4
AddSession enqueues session registration
func (*Hub) AddSubscription ¶ added in v1.0.4
AddSubscription enqueues adding a subscription for a session-identifier pair to the hub
func (*Hub) BroadcastMessage ¶ added in v1.0.4
func (h *Hub) BroadcastMessage(msg *common.StreamMessage)
BroadcastMessage enqueues broadcasting a pre-built StreamMessage
func (*Hub) RemoteDisconnect ¶ added in v1.0.4
func (h *Hub) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
RemoteDisconnect enqueues remote disconnect command
func (*Hub) RemoveAllSubscriptions ¶ added in v1.0.4
RemoveAllSubscriptions enqueues removing all subscriptions for a session-identifier pair from the hub
func (*Hub) RemoveSession ¶ added in v1.0.4
RemoveSession enqueues session un-registration
func (*Hub) RemoveSubscription ¶ added in v1.0.4
RemoveSubscription enqueues removing a subscription for a session-identifier pair from the hub
func (*Hub) StreamsSize ¶
StreamsSize returns the number of unique streams
type HubRegistration ¶ added in v1.0.4
type HubRegistration struct {
// contains filtered or unexported fields
}
HubRegistration represents registration event ("add" or "remove")
type HubSubscription ¶ added in v1.0.4
type HubSubscription struct {
// contains filtered or unexported fields
}
HubSubscription contains information about session-channel(-stream) subscription
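The Hub methods above are described as enqueueing work rather than applying it directly. One common way to structure such a hub is a single goroutine that owns the session and stream maps and receives events over channels. The sketch below illustrates that pattern under stated assumptions: the `hub`, `registration`, `subscription`, and `streamMessage` types and all their fields are hypothetical stand-ins, not the unexported fields of the real types.

```go
package main

import "fmt"

// registration is a hypothetical "add" or "remove" event for a session.
type registration struct {
	event string
	sid   string
	out   chan string // assumed per-session outbox
}

// subscription is a hypothetical "add" or "remove" event for a stream.
type subscription struct {
	event  string
	sid    string
	stream string
}

type streamMessage struct {
	stream string
	data   string
}

type hub struct {
	register  chan registration
	subscribe chan subscription
	broadcast chan streamMessage
	stop      chan struct{}
	done      chan struct{}

	sessions map[string]chan string     // sid -> outbox
	streams  map[string]map[string]bool // stream -> set of sids
}

func newHub() *hub {
	return &hub{
		register:  make(chan registration),
		subscribe: make(chan subscription),
		broadcast: make(chan streamMessage),
		stop:      make(chan struct{}),
		done:      make(chan struct{}),
		sessions:  make(map[string]chan string),
		streams:   make(map[string]map[string]bool),
	}
}

// run is the only goroutine that touches the maps, so no locks are needed.
func (h *hub) run() {
	defer close(h.done)
	for {
		select {
		case r := <-h.register:
			if r.event == "add" {
				h.sessions[r.sid] = r.out
			} else {
				delete(h.sessions, r.sid)
				for _, sids := range h.streams {
					delete(sids, r.sid)
				}
			}
		case s := <-h.subscribe:
			if s.event == "add" {
				if h.streams[s.stream] == nil {
					h.streams[s.stream] = make(map[string]bool)
				}
				h.streams[s.stream][s.sid] = true
			} else {
				delete(h.streams[s.stream], s.sid)
			}
		case m := <-h.broadcast:
			for sid := range h.streams[m.stream] {
				if out, ok := h.sessions[sid]; ok {
					select {
					case out <- m.data:
					default: // outbox full: drop rather than block the hub
					}
				}
			}
		case <-h.stop:
			return
		}
	}
}

func main() {
	h := newHub()
	go h.run()

	out := make(chan string, 1)
	h.register <- registration{event: "add", sid: "s1", out: out}
	h.subscribe <- subscription{event: "add", sid: "s1", stream: "chat"}
	h.broadcast <- streamMessage{stream: "chat", data: "hello"}

	close(h.stop)
	<-h.done
	fmt.Println(<-out)
}
```

Because the channels are unbuffered, events sent from one goroutine are handled in the order they were sent.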
type Node ¶
Node represents the whole application
func NewNode ¶
func NewNode(controller Controller, metrics *metrics.Metrics) *Node
NewNode builds a new Node struct
func (*Node) Authenticate ¶
Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.
func (*Node) Broadcast ¶
func (n *Node) Broadcast(msg *common.StreamMessage)
Broadcast broadcasts a message to a stream
func (*Node) Disconnect ¶
Disconnect adds the session to the disconnector queue and unregisters it from the hub
func (*Node) DisconnectNow ¶
DisconnectNow executes disconnect on the controller
func (*Node) HandleCommand ¶
HandleCommand parses an incoming message from the client and executes the command (if recognized)
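As an illustration of the parse-then-dispatch flow described for HandleCommand, the sketch below decodes a JSON payload and routes it by command name. Everything here is an assumption made for the example: the `message` struct stands in for common.Message, and the JSON field names and the command names ("subscribe", "unsubscribe", "message") are not confirmed by this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical stand-in for common.Message.
// The field names and JSON keys are assumptions.
type message struct {
	Command    string `json:"command"`
	Identifier string `json:"identifier"`
	Data       string `json:"data"`
}

// handleCommand parses raw and returns a description of the action it
// would take. A real node would call Subscribe, Unsubscribe, or Perform.
func handleCommand(raw []byte) (string, error) {
	var msg message
	if err := json.Unmarshal(raw, &msg); err != nil {
		return "", fmt.Errorf("malformed command: %w", err)
	}

	switch msg.Command {
	case "subscribe", "unsubscribe":
		return msg.Command + " " + msg.Identifier, nil
	case "message":
		return "perform " + msg.Identifier, nil
	default:
		// Unrecognized commands are reported instead of executed.
		return "", fmt.Errorf("unknown command: %q", msg.Command)
	}
}

func main() {
	inputs := []string{
		`{"command":"subscribe","identifier":"chat_1"}`,
		`{"command":"dance"}`,
	}
	for _, in := range inputs {
		action, err := handleCommand([]byte(in))
		fmt.Println(action, err)
	}
}
```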
func (*Node) HandlePubSub ¶ added in v1.0.1
HandlePubSub parses an incoming pubsub message and broadcasts it
func (*Node) RemoteDisconnect ¶ added in v1.0.1
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
RemoteDisconnect finds a session by identifier and closes it
func (*Node) SetDisconnector ¶ added in v1.0.1
func (n *Node) SetDisconnector(d Disconnector)
SetDisconnector sets the disconnector for the node
type NoopDisconnectQueue ¶ added in v1.0.1
type NoopDisconnectQueue struct{}
NoopDisconnectQueue is a no-op disconnect queue implementation
func NewNoopDisconnector ¶ added in v1.0.1
func NewNoopDisconnector() *NoopDisconnectQueue
NewNoopDisconnector returns a new NoopDisconnectQueue
func (*NoopDisconnectQueue) Enqueue ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Enqueue(s *Session) error
Enqueue does nothing
func (*NoopDisconnectQueue) Run ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Run() error
Run does nothing
func (*NoopDisconnectQueue) Shutdown ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Shutdown() error
Shutdown does nothing
func (*NoopDisconnectQueue) Size ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Size() int
Size returns 0
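A no-op implementation like this is useful when the caller should not have to check whether disconnect calls are enabled. The sketch below shows the pattern with local stand-ins. The `disconnector` interface is inferred from the four methods listed for NoopDisconnectQueue and is an assumption, since this page does not show the body of Disconnector; `session`, `node`, and `noopQueue` are likewise hypothetical.

```go
package main

import "fmt"

// session is a hypothetical stand-in for Session.
type session struct{ UID string }

// disconnector mirrors the method set shown for NoopDisconnectQueue.
// The real Disconnector interface may differ.
type disconnector interface {
	Run() error
	Shutdown() error
	Enqueue(s *session) error
	Size() int
}

// noopQueue accepts every call and does nothing.
type noopQueue struct{}

func (noopQueue) Run() error               { return nil }
func (noopQueue) Shutdown() error          { return nil }
func (noopQueue) Enqueue(s *session) error { return nil }
func (noopQueue) Size() int                { return 0 }

// Compile-time check that noopQueue satisfies the interface.
var _ disconnector = noopQueue{}

// node is a hypothetical owner of a disconnector.
type node struct{ disconnector disconnector }

// disconnect hands the session to whichever disconnector is configured.
func (n *node) disconnect(s *session) error {
	return n.disconnector.Enqueue(s)
}

func main() {
	n := &node{disconnector: noopQueue{}}
	err := n.disconnect(&session{UID: "42"})
	fmt.Println(err, n.disconnector.Size())
}
```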
type Session ¶
type Session struct {
UID string
Identifiers string
Log *log.Entry
// contains filtered or unexported fields
}
Session represents active client
func NewSession ¶
func NewSession(node *Node, ws *websocket.Conn, url string, headers map[string]string, uid string) (*Session, error)
NewSession builds a new Session struct from a ws connection and an HTTP request
func (*Session) Disconnect ¶
Disconnect enqueues an RPC disconnect request and closes the connection
func (*Session) ReadMessages ¶
func (s *Session) ReadMessages()
ReadMessages reads messages from the ws connection and sends them to the node
func (*Session) SendMessages ¶
func (s *Session) SendMessages()
SendMessages waits for incoming messages and sends them to the client connection