Documentation
¶
Index ¶
- Constants
- Variables
- type AppNode
- type AuthOption
- type AuthOptions
- type Config
- type Connection
- type Controller
- type DisconnectQueue
- type DisconnectQueueConfig
- type Disconnector
- type Executor
- type InlineDisconnector
- type Node
- func (n *Node) Authenticate(s *Session, options ...AuthOption) (*common.ConnectResult, error)
- func (n *Node) Authenticated(s *Session, ids string)
- func (n *Node) Broadcast(msg *common.StreamMessage)
- func (n *Node) Disconnect(s *Session) error
- func (n *Node) DisconnectNow(s *Session) error
- func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage)
- func (n *Node) HandleBroadcast(raw []byte)
- func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error)
- func (n *Node) HandlePubSub(raw []byte)
- func (n *Node) History(s *Session, msg *common.Message) error
- func (n *Node) ID() string
- func (n *Node) Instrumenter() metrics.Instrumenter
- func (n *Node) IsShuttingDown() bool
- func (n *Node) LookupSession(id string) *Session
- func (n *Node) Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
- func (n *Node) Presence(s *Session, msg *common.Message) error
- func (n *Node) PresenceJoin(s *Session, msg *common.Message) error
- func (n *Node) PresenceLeave(s *Session, msg *common.Message) error
- func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
- func (n *Node) SetBroker(b broker.Broker)
- func (n *Node) SetDisconnector(d Disconnector)
- func (n *Node) Shutdown(ctx context.Context) (err error)
- func (n *Node) Size() int
- func (n *Node) Start() error
- func (n *Node) Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
- func (n *Node) TryRestoreSession(s *Session) (restored bool)
- func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
- func (n *Node) Whisper(s *Session, msg *common.Message) error
- type NodeOption
- type NoopDisconnectQueue
- type NullController
- func (c *NullController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
- func (c *NullController) Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error
- func (c *NullController) Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error)
- func (c *NullController) Shutdown() (err error)
- func (c *NullController) Start() (err error)
- func (c *NullController) Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
- func (c *NullController) Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
- type Session
- func (s *Session) AuthenticateOnConnect() bool
- func (s *Session) Disconnect(reason string, code int)
- func (s *Session) DisconnectNow(reason string, code int)
- func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string)
- func (s *Session) GetEnv() *common.SessionEnv
- func (s *Session) GetID() string
- func (s *Session) GetIdentifiers() string
- func (s *Session) IsClosed() bool
- func (s *Session) IsConnected() bool
- func (s *Session) IsDisconnectable() bool
- func (s *Session) IsResumeable() bool
- func (s *Session) MarkDisconnectable(val bool)
- func (s *Session) MergeEnv(env *common.SessionEnv)
- func (s *Session) PrevSid() string
- func (s *Session) ReadInternalState(key string) (interface{}, bool)
- func (s *Session) ReadMessage(message []byte) error
- func (s *Session) RestoreFromCache(cached []byte) error
- func (s *Session) Send(msg encoders.EncodedMessage)
- func (s *Session) SendJSONTransmission(msg string)
- func (s *Session) SendMessages()
- func (s *Session) Serve(callback func()) error
- func (s *Session) SetEnv(env *common.SessionEnv)
- func (s *Session) SetID(id string)
- func (s *Session) SetIdentifiers(ids string)
- func (s *Session) String() string
- func (s *Session) ToCacheEntry() ([]byte, error)
- func (s *Session) UnderlyingConn() Connection
- func (s *Session) WriteInternalState(key string, val interface{})
- type SessionOption
- func WithEncoder(enc encoders.Encoder) SessionOption
- func WithExecutor(ex Executor) SessionOption
- func WithHandshakeMessageDeadline(deadline time.Time) SessionOption
- func WithMetrics(m metrics.Instrumenter) SessionOption
- func WithPingInterval(interval time.Duration) SessionOption
- func WithPingPrecision(val string) SessionOption
- func WithPongTimeout(timeout time.Duration) SessionOption
- func WithPrevSID(sid string) SessionOption
- func WithResumable(val bool) SessionOption
- type SubscriptionState
- func (st *SubscriptionState) AddChannel(id string)
- func (st *SubscriptionState) AddChannelStream(id string, stream string)
- func (st *SubscriptionState) Channels() []string
- func (st *SubscriptionState) HasChannel(id string) bool
- func (st *SubscriptionState) RemoveChannel(id string)
- func (st *SubscriptionState) RemoveChannelStream(id string, stream string)
- func (st *SubscriptionState) RemoveChannelStreams(id string) []string
- func (st *SubscriptionState) StreamsFor(id string) []string
- func (st *SubscriptionState) ToMap() map[string][]string
Constants ¶
const ( DISCONNECT_MODE_ALWAYS = "always" DISCONNECT_MODE_AUTO = "auto" DISCONNECT_MODE_NEVER = "never" )
Variables ¶
var DISCONNECT_MODES = []string{DISCONNECT_MODE_ALWAYS, DISCONNECT_MODE_AUTO, DISCONNECT_MODE_NEVER}
Functions ¶
This section is empty.
Types ¶
type AppNode ¶
type AppNode interface {
HandlePubSub(msg []byte)
LookupSession(id string) *Session
Authenticate(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
Authenticated(s *Session, identifiers string)
Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
Disconnect(s *Session) error
}
AppNode describes a basic node interface
type AuthOption ¶
type AuthOption = func(*AuthOptions)
func WithDisconnectOnFailure ¶
func WithDisconnectOnFailure(disconnect bool) AuthOption
type AuthOptions ¶
type AuthOptions struct {
DisconnectOnFailure bool
}
type Config ¶
type Config struct {
// Define when to invoke Disconnect callback
DisconnectMode string `toml:"disconnect_mode"`
// The number of goroutines to use for disconnect calls on shutdown
ShutdownDisconnectPoolSize int `toml:"shutdown_disconnect_gopool_size"`
// How often server should send Action Cable ping messages (seconds)
PingInterval int `toml:"ping_interval"`
// How often to refresh node stats (seconds)
StatsRefreshInterval int `toml:"stats_refresh_interval"`
// The max size of the goroutine pool for hub
HubGopoolSize int `toml:"broadcast_gopool_size"`
// How should ping message timestamp be formatted? ('s' => seconds, 'ms' => milliseconds, 'ns' => nanoseconds)
PingTimestampPrecision string `toml:"ping_timestamp_precision"`
// For how long to wait for pong message before disconnecting (seconds)
PongTimeout int `toml:"pong_timeout"`
// For how long to wait for disconnect callbacks to be processed before exiting (seconds)
ShutdownTimeout int `toml:"shutdown_timeout"`
}
Config contains general application/node settings
type Connection ¶
type Connection interface {
Write(msg []byte, deadline time.Time) error
WriteBinary(msg []byte, deadline time.Time) error
Read() ([]byte, error)
Close(code int, reason string)
}
Connection represents underlying connection
type Controller ¶
type Controller interface {
Start() error
Shutdown() error
Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error)
Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error
}
Controller is an interface describing business-logic handler (e.g. RPC)
type DisconnectQueue ¶
type DisconnectQueue struct {
// contains filtered or unexported fields
}
DisconnectQueue is a rate-limited executor
func NewDisconnectQueue ¶
func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig, l *slog.Logger) *DisconnectQueue
NewDisconnectQueue builds new queue with a specified rate (max calls per second)
func (*DisconnectQueue) Enqueue ¶
func (d *DisconnectQueue) Enqueue(s *Session) error
Enqueue adds session to the disconnect queue
func (*DisconnectQueue) Shutdown ¶
func (d *DisconnectQueue) Shutdown(ctx context.Context) error
Shutdown stops throttling and makes requests one by one
func (*DisconnectQueue) Size ¶
func (d *DisconnectQueue) Size() int
Size returns the number of enqueued tasks
type DisconnectQueueConfig ¶
type DisconnectQueueConfig struct {
// Limit the number of Disconnect RPC calls per second
Rate int
// The size of the channel's buffer for disconnect requests
Backlog int
// How long to wait for all enqueued calls to be made at exit (in seconds) [DEPRECATED]
ShutdownTimeout int
}
DisconnectQueueConfig contains DisconnectQueue configuration
func NewDisconnectQueueConfig ¶
func NewDisconnectQueueConfig() DisconnectQueueConfig
NewDisconnectQueueConfig builds a new config
func (DisconnectQueueConfig) ToToml ¶
func (c DisconnectQueueConfig) ToToml() string
type Disconnector ¶
type Disconnector interface {
Run() error
Shutdown(ctx context.Context) error
Enqueue(*Session) error
Size() int
}
Disconnector is an interface for disconnect queue implementation
type Executor ¶
type Executor interface {
HandleCommand(*Session, *common.Message) error
Disconnect(*Session) error
}
Executor handles incoming commands (messages)
type InlineDisconnector ¶
type InlineDisconnector struct {
// contains filtered or unexported fields
}
InlineDisconnector performs Disconnect calls synchronously
func NewInlineDisconnector ¶
func NewInlineDisconnector(n *Node) *InlineDisconnector
NewInlineDisconnector returns new InlineDisconnector
func (*InlineDisconnector) Enqueue ¶
func (d *InlineDisconnector) Enqueue(s *Session) error
Enqueue disconnects session immediately
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node represents the whole application
func NewNode ¶
func NewNode(config *Config, opts ...NodeOption) *Node
NewNode builds new node struct
func (*Node) Authenticate ¶
func (n *Node) Authenticate(s *Session, options ...AuthOption) (*common.ConnectResult, error)
Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.
func (*Node) Authenticated ¶
Mark session as authenticated and register it with a hub. Useful when you perform authentication manually, not using a controller.
func (*Node) Broadcast ¶
func (n *Node) Broadcast(msg *common.StreamMessage)
Broadcast message to stream (locally)
func (*Node) Disconnect ¶
Disconnect adds the session to the disconnector queue and unregisters the session from the hub
func (*Node) DisconnectNow ¶
DisconnectNow executes disconnect on the controller
func (*Node) ExecuteRemoteCommand ¶
func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage)
Execute remote command (locally)
func (*Node) HandleBroadcast ¶
HandleBroadcast parses an incoming broadcast message, records it and re-transmits it to other nodes
func (*Node) HandleCommand ¶
HandleCommand parses an incoming message from the client and executes the command (if recognized)
func (*Node) HandlePubSub ¶
HandlePubSub parses an incoming pubsub message and broadcasts it to all clients (w/o using a broker)
func (*Node) Instrumenter ¶
func (n *Node) Instrumenter() metrics.Instrumenter
Return current instrumenter for the node
func (*Node) IsShuttingDown ¶
func (*Node) LookupSession ¶
func (*Node) PresenceJoin ¶
PresenceJoin adds the session to the presence state for the specified identifier
func (*Node) PresenceLeave ¶
PresenceLeave removes the session from the presence state for the specified identifier
func (*Node) RemoteDisconnect ¶
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
RemoteDisconnect finds a session by identifier and closes it
func (*Node) SetDisconnector ¶
func (n *Node) SetDisconnector(d Disconnector)
SetDisconnector sets the disconnector for the node
func (*Node) TryRestoreSession ¶
func (*Node) Unsubscribe ¶
Unsubscribe unsubscribes session from a channel
type NodeOption ¶
type NodeOption = func(*Node)
func WithController ¶
func WithController(c Controller) NodeOption
func WithID ¶
func WithID(id string) NodeOption
func WithInstrumenter ¶
func WithInstrumenter(i metrics.Instrumenter) NodeOption
func WithLogger ¶
func WithLogger(l *slog.Logger) NodeOption
type NoopDisconnectQueue ¶
type NoopDisconnectQueue struct{}
NoopDisconnectQueue is non-operational disconnect queue implementation
func NewNoopDisconnector ¶
func NewNoopDisconnector() *NoopDisconnectQueue
NewNoopDisconnector returns new NoopDisconnectQueue
func (*NoopDisconnectQueue) Enqueue ¶
func (d *NoopDisconnectQueue) Enqueue(s *Session) error
Enqueue does nothing
type NullController ¶
type NullController struct {
// contains filtered or unexported fields
}
func NewNullController ¶
func NewNullController(l *slog.Logger) *NullController
func (*NullController) Authenticate ¶
func (c *NullController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
func (*NullController) Disconnect ¶
func (c *NullController) Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error
func (*NullController) Perform ¶
func (c *NullController) Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error)
func (*NullController) Shutdown ¶
func (c *NullController) Shutdown() (err error)
func (*NullController) Start ¶
func (c *NullController) Start() (err error)
func (*NullController) Subscribe ¶
func (c *NullController) Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
func (*NullController) Unsubscribe ¶
func (c *NullController) Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
type Session ¶
type Session struct {
Connected bool
// Could be used to store arbitrary data within a session
InternalState map[string]interface{}
Log *slog.Logger
// contains filtered or unexported fields
}
Session represents active client
func NewSession ¶
func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string, opts ...SessionOption) *Session
NewSession builds a new Session struct from a ws connection and an http request
func (*Session) AuthenticateOnConnect ¶
func (*Session) Disconnect ¶
Disconnect schedules connection disconnect
func (*Session) DisconnectNow ¶
func (*Session) DisconnectWithMessage ¶
func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string)
func (*Session) GetEnv ¶
func (s *Session) GetEnv() *common.SessionEnv
func (*Session) GetIdentifiers ¶
func (*Session) IsConnected ¶
func (*Session) IsDisconnectable ¶
func (*Session) IsResumeable ¶
func (*Session) MarkDisconnectable ¶
func (*Session) MergeEnv ¶
func (s *Session) MergeEnv(env *common.SessionEnv)
Merge connection and channel states into current env. This method locks the state for writing (so, goroutine-safe)
func (*Session) ReadInternalState ¶
ReadInternalState reads internal state value by key
func (*Session) ReadMessage ¶
ReadMessage reads messages from the ws connection and sends them to the node
func (*Session) RestoreFromCache ¶
func (*Session) Send ¶
func (s *Session) Send(msg encoders.EncodedMessage)
Send schedules a data transmission
func (*Session) SendJSONTransmission ¶
SendJSONTransmission is used to propagate the direct transmission to the client (from RPC call result)
func (*Session) SendMessages ¶
func (s *Session) SendMessages()
SendMessages waits for incoming messages and sends them to the client connection
func (*Session) SetEnv ¶
func (s *Session) SetEnv(env *common.SessionEnv)
func (*Session) SetIdentifiers ¶
func (*Session) String ¶
String returns session string representation (for %v in Printf-like functions)
func (*Session) ToCacheEntry ¶
func (*Session) UnderlyingConn ¶
func (s *Session) UnderlyingConn() Connection
func (*Session) WriteInternalState ¶
WriteInternalState writes internal state value by key
type SessionOption ¶
type SessionOption = func(*Session)
func WithEncoder ¶
func WithEncoder(enc encoders.Encoder) SessionOption
WithEncoder allows to set a custom encoder for a session
func WithExecutor ¶
func WithExecutor(ex Executor) SessionOption
WithExecutor allows to set a custom executor for a session
func WithHandshakeMessageDeadline ¶
func WithHandshakeMessageDeadline(deadline time.Time) SessionOption
WithHandshakeMessageDeadline allows to set a custom deadline for handshake messages. This option also indicates that we MUST NOT perform Authenticate on connect.
func WithMetrics ¶
func WithMetrics(m metrics.Instrumenter) SessionOption
WithMetrics allows to set a custom metrics instrumenter for a session
func WithPingInterval ¶
func WithPingInterval(interval time.Duration) SessionOption
WithPingInterval allows to set a custom ping interval for a session or disable pings altogether (by passing 0)
func WithPingPrecision ¶
func WithPingPrecision(val string) SessionOption
WithPingPrecision allows to configure precision for timestamps attached to pings
func WithPongTimeout ¶
func WithPongTimeout(timeout time.Duration) SessionOption
WithPongTimeout allows to set a custom pong timeout for a session
func WithPrevSID ¶
func WithPrevSID(sid string) SessionOption
WithPrevSID allows providing the previous session ID to restore from
func WithResumable ¶
func WithResumable(val bool) SessionOption
WithResumable allows marking session as resumable (so we store its state in cache)
type SubscriptionState ¶
type SubscriptionState struct {
// contains filtered or unexported fields
}
func NewSubscriptionState ¶
func NewSubscriptionState() *SubscriptionState
func (*SubscriptionState) AddChannel ¶
func (st *SubscriptionState) AddChannel(id string)
func (*SubscriptionState) AddChannelStream ¶
func (st *SubscriptionState) AddChannelStream(id string, stream string)
func (*SubscriptionState) Channels ¶
func (st *SubscriptionState) Channels() []string
func (*SubscriptionState) HasChannel ¶
func (st *SubscriptionState) HasChannel(id string) bool
func (*SubscriptionState) RemoveChannel ¶
func (st *SubscriptionState) RemoveChannel(id string)
func (*SubscriptionState) RemoveChannelStream ¶
func (st *SubscriptionState) RemoveChannelStream(id string, stream string)
func (*SubscriptionState) RemoveChannelStreams ¶
func (st *SubscriptionState) RemoveChannelStreams(id string) []string
func (*SubscriptionState) StreamsFor ¶
func (st *SubscriptionState) StreamsFor(id string) []string
func (*SubscriptionState) ToMap ¶
func (st *SubscriptionState) ToMap() map[string][]string