node

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2021 License: MIT Imports: 11 Imported by: 9

Documentation

Index

Constants

View Source
// WebSocket close codes re-exported from the websocket package.
const (
	// CloseNormalClosure indicates a normal closure.
	CloseNormalClosure = websocket.CloseNormalClosure

	// CloseInternalServerErr indicates closure because of an internal error.
	CloseInternalServerErr = websocket.CloseInternalServerErr

	// CloseAbnormalClosure indicates an abnormal closure.
	CloseAbnormalClosure = websocket.CloseAbnormalClosure

	// CloseGoingAway indicates closure because the server is shutting down
	// or the client has disconnected.
	CloseGoingAway = websocket.CloseGoingAway
)

Variables

This section is empty.

Functions

func WebsocketHandler added in v1.0.1

func WebsocketHandler(app *Node, fetchHeaders []string, config *WSConfig) http.Handler

WebsocketHandler generates a new HTTP handler for WebSocket connections.

Types

type AppNode added in v1.0.1

// AppNode describes a basic node interface.
type AppNode interface {
	// HandlePubSub is given a raw pubsub message as bytes.
	// NOTE(review): Node's implementation parses and broadcasts the message;
	// other implementations may behave differently.
	HandlePubSub(msg []byte)
}

AppNode describes a basic node interface

type Controller

// Controller is an interface describing a business-logic handler (e.g. RPC).
//
// NOTE(review): in the methods below sid is presumably the session UID, env
// the session environment, and id the session identifiers (cf.
// Session.Identifiers) — confirm against the implementations.
type Controller interface {
	// Shutdown stops the controller.
	Shutdown() error
	// Authenticate performs authentication for a session and returns the
	// connect result.
	Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
	// Subscribe subscribes a session to the given channel.
	Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	// Unsubscribe unsubscribes a session from the given channel.
	Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	// Perform executes a client command on the given channel; data is
	// presumably the command payload — TODO confirm.
	Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error)
	// Disconnect executes the disconnect for a session; subscriptions is
	// presumably the list of channels the session was subscribed to — TODO confirm.
	Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error
}

Controller is an interface describing business-logic handler (e.g. RPC)

type DisconnectQueue

type DisconnectQueue struct {
	// contains filtered or unexported fields
}

DisconnectQueue is a rate-limited executor

func NewDisconnectQueue

func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue

NewDisconnectQueue builds new queue with a specified rate (max calls per second)

func (*DisconnectQueue) Enqueue

func (d *DisconnectQueue) Enqueue(s *Session) error

Enqueue adds session to the disconnect queue

func (*DisconnectQueue) Run

func (d *DisconnectQueue) Run() error

Run starts queue

func (*DisconnectQueue) Shutdown

func (d *DisconnectQueue) Shutdown() error

Shutdown stops throttling and makes requests one by one

func (*DisconnectQueue) Size

func (d *DisconnectQueue) Size() int

Size returns the number of enqueued tasks

type DisconnectQueueConfig added in v1.0.1

// DisconnectQueueConfig contains DisconnectQueue configuration.
type DisconnectQueueConfig struct {
	// Rate limits the number of Disconnect RPC calls per second.
	Rate int
	// ShutdownTimeout is how long to wait (in seconds) at exit for all
	// enqueued calls to be made.
	ShutdownTimeout int
}

DisconnectQueueConfig contains DisconnectQueue configuration

func NewDisconnectQueueConfig added in v1.0.1

func NewDisconnectQueueConfig() DisconnectQueueConfig

NewDisconnectQueueConfig builds a new config

type Disconnector added in v1.0.1

// Disconnector is an interface for disconnect queue implementations.
// Implementations may be no-ops (see NoopDisconnectQueue).
type Disconnector interface {
	// Run starts the disconnector.
	Run() error
	// Shutdown stops the disconnector.
	Shutdown() error
	// Enqueue hands a session over to the disconnector.
	Enqueue(*Session) error
	// Size returns the number of enqueued tasks.
	Size() int
}

Disconnector is an interface for disconnect queue implementation

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub stores all the sessions and the corresponding subscriptions info

func NewHub

func NewHub() *Hub

NewHub builds new hub instance

func (*Hub) Run

func (h *Hub) Run()

Run makes hub active

func (*Hub) Shutdown

func (h *Hub) Shutdown()

Shutdown sends shutdown command to hub

func (*Hub) Size

func (h *Hub) Size() int

Size returns the number of active sessions

func (*Hub) StreamsSize

func (h *Hub) StreamsSize() int

StreamsSize returns the number of unique streams

func (*Hub) UniqSize

func (h *Hub) UniqSize() int

UniqSize returns the number of unique identifiers

type Node

// Node represents the whole application.
type Node struct {
	// Metrics is the node's metrics instance.
	// NOTE(review): presumably the value passed to NewNode — confirm.
	Metrics *metrics.Metrics
	// contains filtered or unexported fields
}

Node represents the whole application

func NewNode

func NewNode(controller Controller, metrics *metrics.Metrics) *Node

NewNode builds new node struct

func (*Node) Authenticate

func (n *Node) Authenticate(s *Session) (err error)

Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.

func (*Node) Broadcast

func (n *Node) Broadcast(msg *common.StreamMessage)

Broadcast message to stream

func (*Node) Disconnect

func (n *Node) Disconnect(s *Session) error

Disconnect adds the session to the disconnector queue and unregisters it from the hub

func (*Node) DisconnectNow

func (n *Node) DisconnectNow(s *Session) error

DisconnectNow executes the disconnect on the controller

func (*Node) HandleCommand

func (n *Node) HandleCommand(s *Session, raw []byte) error

HandleCommand parses an incoming message from the client and executes the command (if recognized)

func (*Node) HandlePubSub added in v1.0.1

func (n *Node) HandlePubSub(raw []byte)

HandlePubSub parses an incoming pubsub message and broadcasts it

func (*Node) Perform

func (n *Node) Perform(s *Session, msg *common.Message) (err error)

Perform executes client command

func (*Node) RemoteDisconnect added in v1.0.1

func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect finds a session by identifier and closes it

func (*Node) SetDisconnector added in v1.0.1

func (n *Node) SetDisconnector(d Disconnector)

SetDisconnector sets the disconnector for the node

func (*Node) Shutdown

func (n *Node) Shutdown()

Shutdown stops all services (hub, controller)

func (*Node) Start added in v1.0.1

func (n *Node) Start()

Start runs all the required goroutines

func (*Node) Subscribe

func (n *Node) Subscribe(s *Session, msg *common.Message) (err error)

Subscribe subscribes session to a channel

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(s *Session, msg *common.Message) (err error)

Unsubscribe unsubscribes session from a channel

type NoopDisconnectQueue added in v1.0.1

type NoopDisconnectQueue struct{}

NoopDisconnectQueue is non-operational disconnect queue implementation

func NewNoopDisconnector added in v1.0.1

func NewNoopDisconnector() *NoopDisconnectQueue

NewNoopDisconnector returns new NoopDisconnectQueue

func (*NoopDisconnectQueue) Enqueue added in v1.0.1

func (d *NoopDisconnectQueue) Enqueue(s *Session) error

Enqueue does nothing

func (*NoopDisconnectQueue) Run added in v1.0.1

func (d *NoopDisconnectQueue) Run() error

Run does nothing

func (*NoopDisconnectQueue) Shutdown added in v1.0.1

func (d *NoopDisconnectQueue) Shutdown() error

Shutdown does nothing

func (*NoopDisconnectQueue) Size added in v1.0.1

func (d *NoopDisconnectQueue) Size() int

Size returns 0

type Session

// Session represents an active client.
type Session struct {
	// UID is the session's unique ID.
	// NOTE(review): presumably the uid passed to NewSession — confirm.
	UID         string
	// Identifiers holds the session's identifiers as a single string.
	// NOTE(review): presumably set on successful authentication — confirm.
	Identifiers string
	// Log is the log entry used for this session's log output.
	Log         *log.Entry
	// contains filtered or unexported fields
}

Session represents active client

func NewSession

func NewSession(node *Node, ws *websocket.Conn, url string, headers map[string]string, uid string) (*Session, error)

NewSession builds a new Session struct from a WebSocket connection and an HTTP request

func (*Session) Close

func (s *Session) Close(reason string, code int)

Close websocket connection with the specified reason

func (*Session) Disconnect

func (s *Session) Disconnect(reason string, code int)

Disconnect enqueues RPC disconnect request and closes the connection

func (*Session) ReadMessages

func (s *Session) ReadMessages()

ReadMessages reads messages from the WebSocket connection and sends them to the node

func (*Session) Send

func (s *Session) Send(msg []byte)

Send data to client connection

func (*Session) SendMessages

func (s *Session) SendMessages()

SendMessages waits for incoming messages and sends them to the client connection

type SubscriptionInfo

type SubscriptionInfo struct {
	// contains filtered or unexported fields
}

SubscriptionInfo contains information about session-channel(-stream) subscription

type WSConfig added in v1.0.1

// WSConfig contains WebSocket connection configuration.
type WSConfig struct {
	// ReadBufferSize is the connection read buffer size
	// (presumably in bytes — TODO confirm).
	ReadBufferSize    int
	// WriteBufferSize is the connection write buffer size
	// (presumably in bytes — TODO confirm).
	WriteBufferSize   int
	// MaxMessageSize is the maximum size of an incoming message
	// (presumably in bytes — TODO confirm).
	MaxMessageSize    int64
	// EnableCompression toggles WebSocket compression.
	// NOTE(review): presumably per-message compression negotiated with the
	// client — confirm.
	EnableCompression bool
}

WSConfig contains WebSocket connection configuration.

func NewWSConfig added in v1.0.1

func NewWSConfig() WSConfig

NewWSConfig builds a new WSConfig struct

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL