node

package
v1.4.0-rc.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2023 License: MIT Imports: 15 Imported by: 9

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AppNode added in v1.0.1

type AppNode interface {
	HandlePubSub(msg []byte)
	LookupSession(id string) *Session
	Authenticate(s *Session) (*common.ConnectResult, error)
	Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
	Disconnect(s *Session) error
}

AppNode describes a basic node interface

type AppNodeExt

type AppNodeExt interface {
	AppNode
	AuthenticateWithOptions(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
}

type AuthOption added in v1.4.0

type AuthOption = func(*authOptions)

func WithDisconnectOnFailure added in v1.4.0

func WithDisconnectOnFailure(disconnect bool) AuthOption

type Config added in v1.0.5

type Config struct {
	// How often server should send Action Cable ping messages (seconds)
	PingInterval int
	// How ofter to refresh node stats (seconds)
	StatsRefreshInterval int
	// The max size of the Go routines pool for hub
	HubGopoolSize int
	// How should ping message timestamp be formatted? ('s' => seconds, 'ms' => milli seconds, 'ns' => nano seconds)
	PingTimestampPrecision string
}

Config contains general application/node settings

func NewConfig added in v1.0.5

func NewConfig() Config

NewConfig builds a new config

type Connection added in v1.1.0

type Connection interface {
	Write(msg []byte, deadline time.Time) error
	WriteBinary(msg []byte, deadline time.Time) error
	Read() ([]byte, error)
	Close(code int, reason string)
}

Connection represents underlying connection

type Controller

type Controller interface {
	Start() error
	Shutdown() error
	Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
	Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error)
	Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error
}

Controller is an interface describing business-logic handler (e.g. RPC)

type DisconnectQueue

type DisconnectQueue struct {
	// contains filtered or unexported fields
}

DisconnectQueue is a rate-limited executor

func NewDisconnectQueue

func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue

NewDisconnectQueue builds new queue with a specified rate (max calls per second)

func (*DisconnectQueue) Enqueue

func (d *DisconnectQueue) Enqueue(s *Session) error

Enqueue adds session to the disconnect queue

func (*DisconnectQueue) Run

func (d *DisconnectQueue) Run() error

Run starts queue

func (*DisconnectQueue) Shutdown

func (d *DisconnectQueue) Shutdown() error

Shutdown stops throttling and makes requests one by one

func (*DisconnectQueue) Size

func (d *DisconnectQueue) Size() int

Size returns the number of enqueued tasks

type DisconnectQueueConfig added in v1.0.1

type DisconnectQueueConfig struct {
	// Limit the number of Disconnect RPC calls per second
	Rate int
	// How much time wait to call all enqueued calls at exit (in seconds)
	ShutdownTimeout int
}

DisconnectQueueConfig contains DisconnectQueue configuration

func NewDisconnectQueueConfig added in v1.0.1

func NewDisconnectQueueConfig() DisconnectQueueConfig

NewDisconnectQueueConfig builds a new config

type Disconnector added in v1.0.1

type Disconnector interface {
	Run() error
	Shutdown() error
	Enqueue(*Session) error
	Size() int
}

Disconnector is an interface for disconnect queue implementation

type Executor added in v1.1.0

type Executor interface {
	HandleCommand(*Session, *common.Message) error
	Disconnect(*Session) error
}

Executor handles incoming commands (messages)

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node represents the whole application

func NewNode

func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node

NewNode builds new node struct

func (*Node) Authenticate

func (n *Node) Authenticate(s *Session) (res *common.ConnectResult, err error)

Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.

func (*Node) AuthenticateWithOptions

func (n *Node) AuthenticateWithOptions(s *Session, options ...AuthOption) (res *common.ConnectResult, err error)

AuthenticateWithOptions provides more control on how authentication is performed.

func (*Node) Broadcast

func (n *Node) Broadcast(msg *common.StreamMessage)

Broadcast message to stream (locally)

func (*Node) Disconnect

func (n *Node) Disconnect(s *Session) error

Disconnect adds session to disconnector queue and unregister session from hub

func (*Node) DisconnectNow

func (n *Node) DisconnectNow(s *Session) error

DisconnectNow execute disconnect on controller

func (*Node) ExecuteRemoteCommand added in v1.4.0

func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage)

Execute remote command (locally)

func (*Node) HandleBroadcast added in v1.4.0

func (n *Node) HandleBroadcast(raw []byte)

HandleBroadcast parses incoming broadcast message, record it and re-transmit to other nodes

func (*Node) HandleCommand

func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error)

HandleCommand parses incoming message from client and execute the command (if recognized)

func (*Node) HandlePubSub added in v1.0.1

func (n *Node) HandlePubSub(raw []byte)

HandlePubSub parses incoming pubsub message and broadcast it to all clients (w/o using a broker)

func (*Node) History added in v1.4.0

func (n *Node) History(s *Session, msg *common.Message) (err error)

History fetches the stream history for the specified identifier

func (*Node) LookupSession added in v1.1.4

func (n *Node) LookupSession(id string) *Session

func (*Node) Perform

func (n *Node) Perform(s *Session, msg *common.Message) (res *common.CommandResult, err error)

Perform executes client command

func (*Node) RemoteDisconnect added in v1.0.1

func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect find a session by identifier and closes it

func (*Node) SetBroker added in v1.4.0

func (n *Node) SetBroker(b broker.Broker)

func (*Node) SetDisconnector added in v1.0.1

func (n *Node) SetDisconnector(d Disconnector)

SetDisconnector set disconnector for the node

func (*Node) Shutdown

func (n *Node) Shutdown() (err error)

Shutdown stops all services (hub, controller)

func (*Node) Start added in v1.0.1

func (n *Node) Start() error

Start runs all the required goroutines

func (*Node) Subscribe

func (n *Node) Subscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)

Subscribe subscribes session to a channel

func (*Node) TryRestoreSession added in v1.4.0

func (n *Node) TryRestoreSession(s *Session) (restored bool)

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)

Unsubscribe unsubscribes session from a channel

type NoopDisconnectQueue added in v1.0.1

type NoopDisconnectQueue struct{}

NoopDisconnectQueue is non-operational disconnect queue implementation

func NewNoopDisconnector added in v1.0.1

func NewNoopDisconnector() *NoopDisconnectQueue

NewNoopDisconnector returns new NoopDisconnectQueue

func (*NoopDisconnectQueue) Enqueue added in v1.0.1

func (d *NoopDisconnectQueue) Enqueue(s *Session) error

Enqueue does nothing

func (*NoopDisconnectQueue) Run added in v1.0.1

func (d *NoopDisconnectQueue) Run() error

Run does nothing

func (*NoopDisconnectQueue) Shutdown added in v1.0.1

func (d *NoopDisconnectQueue) Shutdown() error

Shutdown does nothing

func (*NoopDisconnectQueue) Size added in v1.0.1

func (d *NoopDisconnectQueue) Size() int

Size returns 0

type Session

type Session struct {
	Connected bool
	// Could be used to store arbitrary data within a session
	InternalState map[string]interface{}
	Log           *log.Entry
	// contains filtered or unexported fields
}

Session represents active client

func NewSession

func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string) *Session

NewSession build a new Session struct from ws connetion and http request

func (*Session) Disconnect

func (s *Session) Disconnect(reason string, code int)

Disconnect schedules connection disconnect

func (*Session) DisconnectWithMessage added in v1.2.3

func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string)

func (*Session) GetEnv added in v1.1.2

func (s *Session) GetEnv() *common.SessionEnv

func (*Session) GetID added in v1.2.3

func (s *Session) GetID() string

func (*Session) GetIdentifiers added in v1.2.3

func (s *Session) GetIdentifiers() string

func (*Session) MergeEnv added in v1.1.4

func (s *Session) MergeEnv(env *common.SessionEnv)

Merge connection and channel states into current env. This method locks the state for writing (so, goroutine-safe)

func (*Session) PrevSid added in v1.4.0

func (s *Session) PrevSid() (psid string)

func (*Session) ReadInternalState added in v1.4.0

func (s *Session) ReadInternalState(key string) (interface{}, bool)

ReadInternalState reads internal state value by key

func (*Session) ReadMessage added in v1.1.0

func (s *Session) ReadMessage(message []byte) error

ReadMessage reads messages from ws connection and send them to node

func (*Session) RestoreFromCache added in v1.4.0

func (s *Session) RestoreFromCache(cached []byte) error

func (*Session) Send

func (s *Session) Send(msg encoders.EncodedMessage)

Send schedules a data transmission

func (*Session) SendJSONTransmission added in v1.1.0

func (s *Session) SendJSONTransmission(msg string)

SendJSONTransmission is used to propagate the direct transmission to the client (from RPC call result)

func (*Session) SendMessages

func (s *Session) SendMessages()

SendMessages waits for incoming messages and send them to the client connection

func (*Session) Serve added in v1.1.0

func (s *Session) Serve(callback func()) error

Serve enters a loop to read incoming data

func (*Session) SetEncoder added in v1.1.0

func (s *Session) SetEncoder(enc encoders.Encoder)

func (*Session) SetEnv added in v1.1.2

func (s *Session) SetEnv(env *common.SessionEnv)

func (*Session) SetExecutor added in v1.1.0

func (s *Session) SetExecutor(ex Executor)

func (*Session) SetID added in v1.3.0

func (s *Session) SetID(id string)

func (*Session) SetIdentifiers added in v1.2.3

func (s *Session) SetIdentifiers(ids string)

func (*Session) SetIdleTimeout added in v1.2.3

func (s *Session) SetIdleTimeout(val time.Duration)

func (*Session) SetMetrics added in v1.2.2

func (s *Session) SetMetrics(m metrics.Instrumenter)

func (*Session) ToCacheEntry added in v1.4.0

func (s *Session) ToCacheEntry() ([]byte, error)

func (*Session) WriteInternalState added in v1.4.0

func (s *Session) WriteInternalState(key string, val interface{})

WriteInternalState

type SubscriptionState added in v1.2.3

type SubscriptionState struct {
	// contains filtered or unexported fields
}

func NewSubscriptionState added in v1.2.3

func NewSubscriptionState() *SubscriptionState

func (*SubscriptionState) AddChannel added in v1.2.3

func (st *SubscriptionState) AddChannel(id string)

func (*SubscriptionState) AddChannelStream added in v1.2.3

func (st *SubscriptionState) AddChannelStream(id string, stream string)

func (*SubscriptionState) Channels added in v1.2.3

func (st *SubscriptionState) Channels() []string

func (*SubscriptionState) HasChannel added in v1.2.3

func (st *SubscriptionState) HasChannel(id string) bool

func (*SubscriptionState) RemoveChannel added in v1.2.3

func (st *SubscriptionState) RemoveChannel(id string)

func (*SubscriptionState) RemoveChannelStream added in v1.2.3

func (st *SubscriptionState) RemoveChannelStream(id string, stream string)

func (*SubscriptionState) RemoveChannelStreams added in v1.2.3

func (st *SubscriptionState) RemoveChannelStreams(id string) []string

func (*SubscriptionState) StreamsFor added in v1.2.3

func (st *SubscriptionState) StreamsFor(id string) []string

func (*SubscriptionState) ToMap added in v1.2.3

func (st *SubscriptionState) ToMap() map[string][]string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL