Documentation ¶
Index ¶
- Constants
- Variables
- func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](stream psrpc.Stream[SendType, RecvType], ch *MessageChannel, ...) error
- type LocalNode
- type LocalNodeImpl
- func (l *LocalNodeImpl) Clone() *livekit.Node
- func (l *LocalNodeImpl) NodeID() livekit.NodeID
- func (l *LocalNodeImpl) NodeIP() string
- func (l *LocalNodeImpl) NodeType() livekit.NodeType
- func (l *LocalNodeImpl) Region() string
- func (l *LocalNodeImpl) SecondsSinceNodeStatsUpdate() float64
- func (l *LocalNodeImpl) SetNodeID(nodeID livekit.NodeID)
- func (l *LocalNodeImpl) SetState(state livekit.NodeState)
- func (l *LocalNodeImpl) SetStats(stats *livekit.NodeStats)
- func (l *LocalNodeImpl) UpdateNodeStats() bool
- type LocalRouter
- func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error
- func (r *LocalRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
- func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.CreateRoomRequest, nodeID livekit.NodeID) (res *livekit.Room, err error)
- func (r *LocalRouter) Drain()
- func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)
- func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)
- func (r *LocalRouter) GetRegion() string
- func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)
- func (r *LocalRouter) RegisterNode() error
- func (r *LocalRouter) RemoveDeadNodes() error
- func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error
- func (r *LocalRouter) Start() error
- func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
- func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, ...) (res StartParticipantSignalResults, err error)
- func (r *LocalRouter) Stop()
- func (r *LocalRouter) UnregisterNode() error
- type MessageChannel
- type MessageRouter
- type MessageSink
- type MessageSource
- type NodeStats
- type NullMessageSink
- type NullMessageSource
- type ParticipantInit
- type RedisRouter
- func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error
- func (r *RedisRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
- func (r *RedisRouter) Drain()
- func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)
- func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)
- func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)
- func (r *RedisRouter) RegisterNode() error
- func (r *RedisRouter) RemoveDeadNodes() error
- func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error
- func (r *RedisRouter) Start() error
- func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
- func (r *RedisRouter) Stop()
- func (r *RedisRouter) UnregisterNode() error
- type RelaySignalMessage
- type RoomManagerClient
- type Router
- type SignalClient
- type SignalMessageReader
- type SignalMessageWriter
- type SignalSinkParams
- type StartParticipantSignalResults
Constants ¶
View Source
const (
// hash of node_id => Node proto
NodesKey = "nodes"
// hash of room_name => node_id
NodeRoomKey = "room_node_map"
)
View Source
const DefaultMessageChannelSize = 200
Variables ¶
View Source
var (
ErrNotFound = errors.New("could not find object")
ErrIPNotSet = errors.New("ip address is required and not set")
ErrHandlerNotDefined = errors.New("handler not defined")
ErrIncorrectRTCNode = errors.New("current node isn't the RTC node for the room")
ErrNodeNotFound = errors.New("could not locate the node")
ErrNodeLimitReached = errors.New("reached configured limit for node")
ErrInvalidRouterMessage = errors.New("invalid router message")
ErrChannelClosed = errors.New("channel closed")
ErrChannelFull = errors.New("channel is full")
// errors when starting signal connection
ErrRequestChannelClosed = errors.New("request channel closed")
ErrCouldNotMigrateParticipant = errors.New("could not migrate participant")
ErrClientInfoNotSet = errors.New("client info not set")
)
View Source
var ErrSignalMessageDropped = errors.New("signal message dropped")
View Source
var ErrSignalWriteFailed = errors.New("signal write failed")
Functions ¶
func CopySignalStreamToMessageChannel ¶ added in v1.4.2
func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( stream psrpc.Stream[SendType, RecvType], ch *MessageChannel, reader SignalMessageReader[RecvType], config config.SignalRelayConfig, promSignalSuccess func(), promSignalFailure func(), ) error
Types ¶
type LocalNodeImpl ¶ added in v1.8.1
type LocalNodeImpl struct {
// contains filtered or unexported fields
}
func NewLocalNode ¶
func NewLocalNode(conf *config.Config) (*LocalNodeImpl, error)
func NewLocalNodeFromNodeProto ¶ added in v1.8.1
func NewLocalNodeFromNodeProto(node *livekit.Node) (*LocalNodeImpl, error)
func (*LocalNodeImpl) Clone ¶ added in v1.8.1
func (l *LocalNodeImpl) Clone() *livekit.Node
func (*LocalNodeImpl) NodeID ¶ added in v1.8.1
func (l *LocalNodeImpl) NodeID() livekit.NodeID
func (*LocalNodeImpl) NodeIP ¶ added in v1.8.1
func (l *LocalNodeImpl) NodeIP() string
func (*LocalNodeImpl) NodeType ¶ added in v1.8.1
func (l *LocalNodeImpl) NodeType() livekit.NodeType
func (*LocalNodeImpl) Region ¶ added in v1.8.1
func (l *LocalNodeImpl) Region() string
func (*LocalNodeImpl) SecondsSinceNodeStatsUpdate ¶ added in v1.8.1
func (l *LocalNodeImpl) SecondsSinceNodeStatsUpdate() float64
func (*LocalNodeImpl) SetNodeID ¶ added in v1.8.1
func (l *LocalNodeImpl) SetNodeID(nodeID livekit.NodeID)
SetNodeID is for testing only.
func (*LocalNodeImpl) SetState ¶ added in v1.8.1
func (l *LocalNodeImpl) SetState(state livekit.NodeState)
func (*LocalNodeImpl) SetStats ¶ added in v1.8.1
func (l *LocalNodeImpl) SetStats(stats *livekit.NodeStats)
SetStats is for testing only.
func (*LocalNodeImpl) UpdateNodeStats ¶ added in v1.8.1
func (l *LocalNodeImpl) UpdateNodeStats() bool
type LocalRouter ¶
type LocalRouter struct {
// contains filtered or unexported fields
}
LocalRouter is a router of messages on the same node; it is a basic implementation for local testing.
func NewLocalRouter ¶
func NewLocalRouter( currentNode LocalNode, signalClient SignalClient, roomManagerClient RoomManagerClient, nodeStatsConfig config.NodeStatsConfig, ) *LocalRouter
func (*LocalRouter) ClearRoomState ¶
func (*LocalRouter) CreateRoom ¶ added in v1.8.0
func (r *LocalRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
func (*LocalRouter) CreateRoomWithNodeID ¶ added in v1.8.0
func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.CreateRoomRequest, nodeID livekit.NodeID) (res *livekit.Room, err error)
func (*LocalRouter) Drain ¶ added in v0.13.5
func (r *LocalRouter) Drain()
func (*LocalRouter) GetNodeForRoom ¶
func (*LocalRouter) GetRegion ¶ added in v0.15.7
func (r *LocalRouter) GetRegion() string
func (*LocalRouter) RegisterNode ¶
func (r *LocalRouter) RegisterNode() error
func (*LocalRouter) RemoveDeadNodes ¶
func (r *LocalRouter) RemoveDeadNodes() error
func (*LocalRouter) SetNodeForRoom ¶
func (*LocalRouter) Start ¶
func (r *LocalRouter) Start() error
func (*LocalRouter) StartParticipantSignal ¶
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
func (*LocalRouter) StartParticipantSignalWithNodeID ¶ added in v1.4.0
func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error)
func (*LocalRouter) Stop ¶
func (r *LocalRouter) Stop()
func (*LocalRouter) UnregisterNode ¶
func (r *LocalRouter) UnregisterNode() error
type MessageChannel ¶
type MessageChannel struct {
// contains filtered or unexported fields
}
func NewDefaultMessageChannel ¶ added in v1.4.0
func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel
func NewMessageChannel ¶
func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel
func (*MessageChannel) Close ¶
func (m *MessageChannel) Close()
func (*MessageChannel) ConnectionID ¶ added in v1.4.4
func (m *MessageChannel) ConnectionID() livekit.ConnectionID
func (*MessageChannel) IsClosed ¶ added in v0.13.1
func (m *MessageChannel) IsClosed() bool
func (*MessageChannel) OnClose ¶
func (m *MessageChannel) OnClose(f func())
func (*MessageChannel) ReadChan ¶
func (m *MessageChannel) ReadChan() <-chan proto.Message
func (*MessageChannel) WriteMessage ¶
func (m *MessageChannel) WriteMessage(msg proto.Message) error
type MessageRouter ¶ added in v0.14.2
type MessageRouter interface {
// CreateRoom starts an rtc room
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
// StartParticipantSignal participant signal connection is ready to start
StartParticipantSignal(
ctx context.Context,
roomName livekit.RoomName,
pi ParticipantInit,
) (res StartParticipantSignalResults, err error)
}
type MessageSink ¶
type MessageSink interface {
WriteMessage(msg proto.Message) error
IsClosed() bool
Close()
ConnectionID() livekit.ConnectionID
}
MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport
func NewSignalMessageSink ¶ added in v1.4.2
func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink
type MessageSource ¶
type MessageSource interface {
// ReadChan exposes a one way channel to make it easier to use with select
ReadChan() <-chan proto.Message
IsClosed() bool
Close()
ConnectionID() livekit.ConnectionID
}
type NodeStats ¶
type NodeStats struct {
// contains filtered or unexported fields
}
func NewNodeStats ¶ added in v1.9.0
func NewNodeStats(conf *config.NodeStatsConfig, startedAt int64) *NodeStats
func (*NodeStats) UpdateAndGetNodeStats ¶ added in v1.9.0
func (*NodeStats) UpdateConfig ¶ added in v1.9.0
func (n *NodeStats) UpdateConfig(conf *config.NodeStatsConfig)
type NullMessageSink ¶ added in v1.8.1
type NullMessageSink struct {
// contains filtered or unexported fields
}
func NewNullMessageSink ¶ added in v1.8.1
func NewNullMessageSink(connID livekit.ConnectionID) *NullMessageSink
func (*NullMessageSink) Close ¶ added in v1.8.1
func (n *NullMessageSink) Close()
func (*NullMessageSink) ConnectionID ¶ added in v1.8.1
func (n *NullMessageSink) ConnectionID() livekit.ConnectionID
func (*NullMessageSink) IsClosed ¶ added in v1.8.1
func (n *NullMessageSink) IsClosed() bool
func (*NullMessageSink) WriteMessage ¶ added in v1.8.1
func (n *NullMessageSink) WriteMessage(_msg proto.Message) error
type NullMessageSource ¶ added in v1.8.1
type NullMessageSource struct {
// contains filtered or unexported fields
}
func NewNullMessageSource ¶ added in v1.8.1
func NewNullMessageSource(connID livekit.ConnectionID) *NullMessageSource
func (*NullMessageSource) Close ¶ added in v1.8.1
func (n *NullMessageSource) Close()
func (*NullMessageSource) ConnectionID ¶ added in v1.8.1
func (n *NullMessageSource) ConnectionID() livekit.ConnectionID
func (*NullMessageSource) IsClosed ¶ added in v1.8.1
func (n *NullMessageSource) IsClosed() bool
func (*NullMessageSource) ReadChan ¶ added in v1.8.1
func (n *NullMessageSource) ReadChan() <-chan proto.Message
type ParticipantInit ¶
type ParticipantInit struct {
Identity livekit.ParticipantIdentity
Name livekit.ParticipantName
Reconnect bool
ReconnectReason livekit.ReconnectReason
AutoSubscribe bool
Client *livekit.ClientInfo
Grants *auth.ClaimGrants
Region string
AdaptiveStream bool
ID livekit.ParticipantID
SubscriberAllowPause *bool
DisableICELite bool
CreateRoom *livekit.CreateRoomRequest
AddTrackRequests []*livekit.AddTrackRequest
PublisherOffer *livekit.SessionDescription
SyncState *livekit.SyncState
UseSinglePeerConnection bool
}
func ParticipantInitFromStartSession ¶ added in v0.15.7
func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)
func (*ParticipantInit) MarshalLogObject ¶ added in v1.8.1
func (pi *ParticipantInit) MarshalLogObject(e zapcore.ObjectEncoder) error
func (*ParticipantInit) ToStartSession ¶ added in v0.15.7
func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)
type RedisRouter ¶
type RedisRouter struct {
*LocalRouter
// contains filtered or unexported fields
}
RedisRouter uses Redis pub/sub to route signaling messages across different nodes. It relies on the RTC node to be the primary driver of the participant connection.
func NewRedisRouter ¶
func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient, kps rpc.KeepalivePubSub) *RedisRouter
func (*RedisRouter) ClearRoomState ¶
func (*RedisRouter) CreateRoom ¶ added in v1.8.0
func (r *RedisRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
func (*RedisRouter) Drain ¶ added in v0.13.5
func (r *RedisRouter) Drain()
func (*RedisRouter) GetNodeForRoom ¶
func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)
GetNodeForRoom finds the node where the room is hosted.
func (*RedisRouter) RegisterNode ¶
func (r *RedisRouter) RegisterNode() error
func (*RedisRouter) RemoveDeadNodes ¶
func (r *RedisRouter) RemoveDeadNodes() error
func (*RedisRouter) SetNodeForRoom ¶
func (*RedisRouter) Start ¶
func (r *RedisRouter) Start() error
func (*RedisRouter) StartParticipantSignal ¶
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
StartParticipantSignal sets up the signal connection's paths to the RTC node and starts routing messages to that message queue.
func (*RedisRouter) Stop ¶
func (r *RedisRouter) Stop()
func (*RedisRouter) UnregisterNode ¶
func (r *RedisRouter) UnregisterNode() error
type RelaySignalMessage ¶ added in v1.4.2
type RoomManagerClient ¶ added in v1.8.0
type RoomManagerClient interface {
rpc.TypedRoomManagerClient
}
func NewRoomManagerClient ¶ added in v1.8.0
func NewRoomManagerClient(clientParams rpc.ClientParams, config config.RoomConfig) (RoomManagerClient, error)
type Router ¶
type Router interface {
MessageRouter
RegisterNode() error
UnregisterNode() error
RemoveDeadNodes() error
ListNodes() ([]*livekit.Node, error)
GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error)
SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId livekit.NodeID) error
ClearRoomState(ctx context.Context, roomName livekit.RoomName) error
GetRegion() string
Start() error
Drain()
Stop()
}
Router allows multiple nodes to coordinate the participant session
func CreateRouter ¶ added in v0.13.5
func CreateRouter( rc redis.UniversalClient, node LocalNode, signalClient SignalClient, roomManagerClient RoomManagerClient, kps rpc.KeepalivePubSub, nodeStatsConfig config.NodeStatsConfig, ) Router
type SignalClient ¶ added in v1.4.0
type SignalClient interface {
ActiveCount() int
StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
}
func NewSignalClient ¶ added in v1.4.0
func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error)
type SignalMessageReader ¶ added in v1.4.2
type SignalMessageReader[RecvType RelaySignalMessage] interface { Read(msg RecvType) ([]proto.Message, error) }
type SignalMessageWriter ¶ added in v1.4.2
type SignalMessageWriter[SendType RelaySignalMessage] interface { Write(seq uint64, close bool, msgs []proto.Message) SendType }
type SignalSinkParams ¶ added in v1.4.2
type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct { Stream psrpc.Stream[SendType, RecvType] Logger logger.Logger Config config.SignalRelayConfig Writer SignalMessageWriter[SendType] CloseOnFailure bool BlockOnClose bool ConnectionID livekit.ConnectionID }
type StartParticipantSignalResults ¶ added in v1.5.2
type StartParticipantSignalResults struct {
ConnectionID livekit.ConnectionID
RequestSink MessageSink
ResponseSource MessageSource
NodeID livekit.NodeID
NodeSelectionReason string
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.