node

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2021 License: MIT Imports: 11 Imported by: 9

Documentation

Index

Constants

View Source
const (
	// CloseNormalClosure indicates normal closure
	CloseNormalClosure = websocket.CloseNormalClosure

	// CloseInternalServerErr indicates closure because of internal error
	CloseInternalServerErr = websocket.CloseInternalServerErr

	// CloseAbnormalClosure indicates abnormal close
	CloseAbnormalClosure = websocket.CloseAbnormalClosure

	// CloseGoingAway indicates closing because of server shuts down or client disconnects
	CloseGoingAway = websocket.CloseGoingAway
)

Variables

This section is empty.

Functions

func WebsocketHandler added in v1.0.1

func WebsocketHandler(app *Node, fetchHeaders []string, config *WSConfig) http.Handler

WebsocketHandler generate a new http handler for WebSocket connections

Types

type AppNode added in v1.0.1

type AppNode interface {
	HandlePubSub(msg []byte)
}

AppNode describes a basic node interface

type Config added in v1.0.5

type Config struct {
	// How often server should send Action Cable ping messages (seconds)
	PingInterval int
	// How ofter to refresh node stats (seconds)
	StatsRefreshInterval int
}

Config contains general application/node settings

func NewConfig added in v1.0.5

func NewConfig() Config

NewConfig builds a new config

type Controller

type Controller interface {
	Shutdown() error
	Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
	Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error)
	Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error
}

Controller is an interface describing business-logic handler (e.g. RPC)

type DisconnectQueue

type DisconnectQueue struct {
	// contains filtered or unexported fields
}

DisconnectQueue is a rate-limited executor

func NewDisconnectQueue

func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue

NewDisconnectQueue builds new queue with a specified rate (max calls per second)

func (*DisconnectQueue) Enqueue

func (d *DisconnectQueue) Enqueue(s *Session) error

Enqueue adds session to the disconnect queue

func (*DisconnectQueue) Run

func (d *DisconnectQueue) Run() error

Run starts queue

func (*DisconnectQueue) Shutdown

func (d *DisconnectQueue) Shutdown() error

Shutdown stops throttling and makes requests one by one

func (*DisconnectQueue) Size

func (d *DisconnectQueue) Size() int

Size returns the number of enqueued tasks

type DisconnectQueueConfig added in v1.0.1

type DisconnectQueueConfig struct {
	// Limit the number of Disconnect RPC calls per second
	Rate int
	// How much time wait to call all enqueued calls at exit (in seconds)
	ShutdownTimeout int
}

DisconnectQueueConfig contains DisconnectQueue configuration

func NewDisconnectQueueConfig added in v1.0.1

func NewDisconnectQueueConfig() DisconnectQueueConfig

NewDisconnectQueueConfig builds a new config

type Disconnector added in v1.0.1

type Disconnector interface {
	Run() error
	Shutdown() error
	Enqueue(*Session) error
	Size() int
}

Disconnector is an interface for disconnect queue implementation

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub stores all the sessions and the corresponding subscriptions info

func NewHub

func NewHub() *Hub

NewHub builds new hub instance

func (*Hub) AddSession added in v1.0.4

func (h *Hub) AddSession(s *Session)

AddSession enqueues sessions registration

func (*Hub) AddSubscription added in v1.0.4

func (h *Hub) AddSubscription(sid string, identifier string, stream string)

AddSubscription enqueues adding a subscription for session-identifier pair to the hub

func (*Hub) Broadcast added in v1.0.4

func (h *Hub) Broadcast(stream string, data string)

Broadcast enqueues data broadcasting to a stream

func (*Hub) BroadcastMessage added in v1.0.4

func (h *Hub) BroadcastMessage(msg *common.StreamMessage)

BroadcastMessage enqueues broadcasting a pre-built StreamMessage

func (*Hub) RemoteDisconnect added in v1.0.4

func (h *Hub) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect enqueues remote disconnect command

func (*Hub) RemoveAllSubscriptions added in v1.0.4

func (h *Hub) RemoveAllSubscriptions(sid string, identifier string)

RemoveAllSubscriptions enqueues removing all subscription for session-identifier pair from the hub

func (*Hub) RemoveSession added in v1.0.4

func (h *Hub) RemoveSession(s *Session)

RemoveSession enqueues session un-registration

func (*Hub) RemoveSubscription added in v1.0.4

func (h *Hub) RemoveSubscription(sid string, identifier string, stream string)

RemoveSubscription enqueues removing a subscription for session-identifier pair from the hub

func (*Hub) Run

func (h *Hub) Run()

Run makes hub active

func (*Hub) Shutdown

func (h *Hub) Shutdown()

Shutdown sends shutdown command to hub

func (*Hub) Size

func (h *Hub) Size() int

Size returns a number of active sessions

func (*Hub) StreamsSize

func (h *Hub) StreamsSize() int

StreamsSize returns a number of uniq streams

func (*Hub) UniqSize

func (h *Hub) UniqSize() int

UniqSize returns a number of uniq identifiers

type HubRegistration added in v1.0.4

type HubRegistration struct {
	// contains filtered or unexported fields
}

HubRegistration represents registration event ("add" or "remove")

type HubSubscription added in v1.0.4

type HubSubscription struct {
	// contains filtered or unexported fields
}

HubSubscription contains information about session-channel(-stream) subscription

type Node

type Node struct {
	Metrics *metrics.Metrics
	// contains filtered or unexported fields
}

Node represents the whole application

func NewNode

func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node

NewNode builds new node struct

func (*Node) Authenticate

func (n *Node) Authenticate(s *Session) (err error)

Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.

func (*Node) Broadcast

func (n *Node) Broadcast(msg *common.StreamMessage)

Broadcast message to stream

func (*Node) Disconnect

func (n *Node) Disconnect(s *Session) error

Disconnect adds session to disconnector queue and unregister session from hub

func (*Node) DisconnectNow

func (n *Node) DisconnectNow(s *Session) error

DisconnectNow execute disconnect on controller

func (*Node) HandleCommand

func (n *Node) HandleCommand(s *Session, raw []byte) error

HandleCommand parses incoming message from client and execute the command (if recognized)

func (*Node) HandlePubSub added in v1.0.1

func (n *Node) HandlePubSub(raw []byte)

HandlePubSub parses incoming pubsub message and broadcast it

func (*Node) Perform

func (n *Node) Perform(s *Session, msg *common.Message) (err error)

Perform executes client command

func (*Node) RemoteDisconnect added in v1.0.1

func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect find a session by identifier and closes it

func (*Node) SetDisconnector added in v1.0.1

func (n *Node) SetDisconnector(d Disconnector)

SetDisconnector set disconnector for the node

func (*Node) Shutdown

func (n *Node) Shutdown()

Shutdown stops all services (hub, controller)

func (*Node) Start added in v1.0.1

func (n *Node) Start()

Start runs all the required goroutines

func (*Node) Subscribe

func (n *Node) Subscribe(s *Session, msg *common.Message) (err error)

Subscribe subscribes session to a channel

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(s *Session, msg *common.Message) (err error)

Unsubscribe unsubscribes session from a channel

type NoopDisconnectQueue added in v1.0.1

type NoopDisconnectQueue struct{}

NoopDisconnectQueue is non-operational disconnect queue implementation

func NewNoopDisconnector added in v1.0.1

func NewNoopDisconnector() *NoopDisconnectQueue

NewNoopDisconnector returns new NoopDisconnectQueue

func (*NoopDisconnectQueue) Enqueue added in v1.0.1

func (d *NoopDisconnectQueue) Enqueue(s *Session) error

Enqueue does nothing

func (*NoopDisconnectQueue) Run added in v1.0.1

func (d *NoopDisconnectQueue) Run() error

Run does nothing

func (*NoopDisconnectQueue) Shutdown added in v1.0.1

func (d *NoopDisconnectQueue) Shutdown() error

Shutdown does nothing

func (*NoopDisconnectQueue) Size added in v1.0.1

func (d *NoopDisconnectQueue) Size() int

Size returns 0

type Session

type Session struct {
	UID         string
	Identifiers string
	Log         *log.Entry
	// contains filtered or unexported fields
}

Session represents active client

func NewSession

func NewSession(node *Node, ws *websocket.Conn, url string, headers map[string]string, uid string) *Session

NewSession build a new Session struct from ws connetion and http request

func (*Session) Close

func (s *Session) Close(reason string, code int)

Close websocket connection with the specified reason

func (*Session) Disconnect

func (s *Session) Disconnect(reason string, code int)

Disconnect enqueues RPC disconnect request and closes the connection

func (*Session) ReadMessages

func (s *Session) ReadMessages()

ReadMessages reads messages from ws connection and send them to node

func (*Session) Send

func (s *Session) Send(msg []byte)

Send data to client connection

func (*Session) SendMessages

func (s *Session) SendMessages()

SendMessages waits for incoming messages and send them to the client connection

type WSConfig added in v1.0.1

type WSConfig struct {
	ReadBufferSize    int
	WriteBufferSize   int
	MaxMessageSize    int64
	EnableCompression bool
}

WSConfig contains WebSocket connection configuration.

func NewWSConfig added in v1.0.1

func NewWSConfig() WSConfig

NewWSConfig build a new WSConfig struct

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL