routing

package
v1.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2025 License: Apache-2.0 Imports: 25 Imported by: 12

Documentation

Index

Constants

View Source
const (
	// hash of node_id => Node proto
	NodesKey = "nodes"

	// hash of room_name => node_id
	NodeRoomKey = "room_node_map"
)
View Source
const DefaultMessageChannelSize = 200

Variables

View Source
var (
	ErrNotFound             = errors.New("could not find object")
	ErrIPNotSet             = errors.New("ip address is required and not set")
	ErrHandlerNotDefined    = errors.New("handler not defined")
	ErrIncorrectRTCNode     = errors.New("current node isn't the RTC node for the room")
	ErrNodeNotFound         = errors.New("could not locate the node")
	ErrNodeLimitReached     = errors.New("reached configured limit for node")
	ErrInvalidRouterMessage = errors.New("invalid router message")
	ErrChannelClosed        = errors.New("channel closed")
	ErrChannelFull          = errors.New("channel is full")

	// errors when starting signal connection
	ErrRequestChannelClosed       = errors.New("request channel closed")
	ErrCouldNotMigrateParticipant = errors.New("could not migrate participant")
	ErrClientInfoNotSet           = errors.New("client info not set")
)
View Source
var ErrSignalMessageDropped = errors.New("signal message dropped")
View Source
var ErrSignalWriteFailed = errors.New("signal write failed")

Functions

func CopySignalStreamToMessageChannel added in v1.4.2

func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](
	stream psrpc.Stream[SendType, RecvType],
	ch *MessageChannel,
	reader SignalMessageReader[RecvType],
	config config.SignalRelayConfig,
	promSignalSuccess func(),
	promSignalFailure func(),
) error

Types

type LocalNode

type LocalNode interface {
	Clone() *livekit.Node
	SetNodeID(nodeID livekit.NodeID)
	NodeID() livekit.NodeID
	NodeType() livekit.NodeType
	NodeIP() string
	Region() string
	SetState(state livekit.NodeState)
	SetStats(stats *livekit.NodeStats)
	UpdateNodeStats() bool
	SecondsSinceNodeStatsUpdate() float64
}

type LocalNodeImpl added in v1.8.1

type LocalNodeImpl struct {
	// contains filtered or unexported fields
}

func NewLocalNode

func NewLocalNode(conf *config.Config) (*LocalNodeImpl, error)

func NewLocalNodeFromNodeProto added in v1.8.1

func NewLocalNodeFromNodeProto(node *livekit.Node) (*LocalNodeImpl, error)

func (*LocalNodeImpl) Clone added in v1.8.1

func (l *LocalNodeImpl) Clone() *livekit.Node

func (*LocalNodeImpl) NodeID added in v1.8.1

func (l *LocalNodeImpl) NodeID() livekit.NodeID

func (*LocalNodeImpl) NodeIP added in v1.8.1

func (l *LocalNodeImpl) NodeIP() string

func (*LocalNodeImpl) NodeType added in v1.8.1

func (l *LocalNodeImpl) NodeType() livekit.NodeType

func (*LocalNodeImpl) Region added in v1.8.1

func (l *LocalNodeImpl) Region() string

func (*LocalNodeImpl) SecondsSinceNodeStatsUpdate added in v1.8.1

func (l *LocalNodeImpl) SecondsSinceNodeStatsUpdate() float64

func (*LocalNodeImpl) SetNodeID added in v1.8.1

func (l *LocalNodeImpl) SetNodeID(nodeID livekit.NodeID)

for testing only

func (*LocalNodeImpl) SetState added in v1.8.1

func (l *LocalNodeImpl) SetState(state livekit.NodeState)

func (*LocalNodeImpl) SetStats added in v1.8.1

func (l *LocalNodeImpl) SetStats(stats *livekit.NodeStats)

for testing only

func (*LocalNodeImpl) UpdateNodeStats added in v1.8.1

func (l *LocalNodeImpl) UpdateNodeStats() bool

type LocalRouter

type LocalRouter struct {
	// contains filtered or unexported fields
}

a router of messages on the same node, basic implementation for local testing

func NewLocalRouter

func NewLocalRouter(
	currentNode LocalNode,
	signalClient SignalClient,
	roomManagerClient RoomManagerClient,
	nodeStatsConfig config.NodeStatsConfig,
) *LocalRouter

func (*LocalRouter) ClearRoomState

func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error

func (*LocalRouter) CreateRoom added in v1.8.0

func (r *LocalRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)

func (*LocalRouter) CreateRoomWithNodeID added in v1.8.0

func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.CreateRoomRequest, nodeID livekit.NodeID) (res *livekit.Room, err error)

func (*LocalRouter) Drain added in v0.13.5

func (r *LocalRouter) Drain()

func (*LocalRouter) GetNode

func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*LocalRouter) GetNodeForRoom

func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)

func (*LocalRouter) GetRegion added in v0.15.7

func (r *LocalRouter) GetRegion() string

func (*LocalRouter) ListNodes

func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)

func (*LocalRouter) RegisterNode

func (r *LocalRouter) RegisterNode() error

func (*LocalRouter) RemoveDeadNodes

func (r *LocalRouter) RemoveDeadNodes() error

func (*LocalRouter) SetNodeForRoom

func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error

func (*LocalRouter) Start

func (r *LocalRouter) Start() error

func (*LocalRouter) StartParticipantSignal

func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

func (*LocalRouter) StartParticipantSignalWithNodeID added in v1.4.0

func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error)

func (*LocalRouter) Stop

func (r *LocalRouter) Stop()

func (*LocalRouter) UnregisterNode

func (r *LocalRouter) UnregisterNode() error

type MessageChannel

type MessageChannel struct {
	// contains filtered or unexported fields
}

func NewDefaultMessageChannel added in v1.4.0

func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel

func NewMessageChannel

func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel

func (*MessageChannel) Close

func (m *MessageChannel) Close()

func (*MessageChannel) ConnectionID added in v1.4.4

func (m *MessageChannel) ConnectionID() livekit.ConnectionID

func (*MessageChannel) IsClosed added in v0.13.1

func (m *MessageChannel) IsClosed() bool

func (*MessageChannel) OnClose

func (m *MessageChannel) OnClose(f func())

func (*MessageChannel) ReadChan

func (m *MessageChannel) ReadChan() <-chan proto.Message

func (*MessageChannel) WriteMessage

func (m *MessageChannel) WriteMessage(msg proto.Message) error

type MessageRouter added in v0.14.2

type MessageRouter interface {
	// CreateRoom starts an rtc room
	CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)

	// StartParticipantSignal participant signal connection is ready to start
	StartParticipantSignal(
		ctx context.Context,
		roomName livekit.RoomName,
		pi ParticipantInit,
	) (res StartParticipantSignalResults, err error)
}

type MessageSink

type MessageSink interface {
	WriteMessage(msg proto.Message) error
	IsClosed() bool
	Close()
	ConnectionID() livekit.ConnectionID
}

MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport

func NewSignalMessageSink added in v1.4.2

func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink

type MessageSource

type MessageSource interface {
	// ReadChan exposes a one way channel to make it easier to use with select
	ReadChan() <-chan proto.Message
	IsClosed() bool
	Close()
	ConnectionID() livekit.ConnectionID
}

type NodeStats

type NodeStats struct {
	// contains filtered or unexported fields
}

func NewNodeStats added in v1.9.0

func NewNodeStats(conf *config.NodeStatsConfig, startedAt int64) *NodeStats

func (*NodeStats) UpdateAndGetNodeStats added in v1.9.0

func (n *NodeStats) UpdateAndGetNodeStats() (*livekit.NodeStats, error)

func (*NodeStats) UpdateConfig added in v1.9.0

func (n *NodeStats) UpdateConfig(conf *config.NodeStatsConfig)

type NullMessageSink added in v1.8.1

type NullMessageSink struct {
	// contains filtered or unexported fields
}

func NewNullMessageSink added in v1.8.1

func NewNullMessageSink(connID livekit.ConnectionID) *NullMessageSink

func (*NullMessageSink) Close added in v1.8.1

func (n *NullMessageSink) Close()

func (*NullMessageSink) ConnectionID added in v1.8.1

func (n *NullMessageSink) ConnectionID() livekit.ConnectionID

func (*NullMessageSink) IsClosed added in v1.8.1

func (n *NullMessageSink) IsClosed() bool

func (*NullMessageSink) WriteMessage added in v1.8.1

func (n *NullMessageSink) WriteMessage(_msg proto.Message) error

type NullMessageSource added in v1.8.1

type NullMessageSource struct {
	// contains filtered or unexported fields
}

func NewNullMessageSource added in v1.8.1

func NewNullMessageSource(connID livekit.ConnectionID) *NullMessageSource

func (*NullMessageSource) Close added in v1.8.1

func (n *NullMessageSource) Close()

func (*NullMessageSource) ConnectionID added in v1.8.1

func (n *NullMessageSource) ConnectionID() livekit.ConnectionID

func (*NullMessageSource) IsClosed added in v1.8.1

func (n *NullMessageSource) IsClosed() bool

func (*NullMessageSource) ReadChan added in v1.8.1

func (n *NullMessageSource) ReadChan() <-chan proto.Message

type ParticipantInit

type ParticipantInit struct {
	Identity                livekit.ParticipantIdentity
	Name                    livekit.ParticipantName
	Reconnect               bool
	ReconnectReason         livekit.ReconnectReason
	AutoSubscribe           bool
	Client                  *livekit.ClientInfo
	Grants                  *auth.ClaimGrants
	Region                  string
	AdaptiveStream          bool
	ID                      livekit.ParticipantID
	SubscriberAllowPause    *bool
	DisableICELite          bool
	CreateRoom              *livekit.CreateRoomRequest
	AddTrackRequests        []*livekit.AddTrackRequest
	PublisherOffer          *livekit.SessionDescription
	SyncState               *livekit.SyncState
	UseSinglePeerConnection bool
}

func ParticipantInitFromStartSession added in v0.15.7

func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)

func (*ParticipantInit) MarshalLogObject added in v1.8.1

func (pi *ParticipantInit) MarshalLogObject(e zapcore.ObjectEncoder) error

func (*ParticipantInit) ToStartSession added in v0.15.7

func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)

type RedisRouter

type RedisRouter struct {
	*LocalRouter
	// contains filtered or unexported fields
}

RedisRouter uses Redis pub/sub to route signaling messages across different nodes It relies on the RTC node to be the primary driver of the participant connection. Because

func (*RedisRouter) ClearRoomState

func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error

func (*RedisRouter) CreateRoom added in v1.8.0

func (r *RedisRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)

func (*RedisRouter) Drain added in v0.13.5

func (r *RedisRouter) Drain()

func (*RedisRouter) GetNode

func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*RedisRouter) GetNodeForRoom

func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)

GetNodeForRoom finds the node where the room is hosted at

func (*RedisRouter) ListNodes

func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)

func (*RedisRouter) RegisterNode

func (r *RedisRouter) RegisterNode() error

func (*RedisRouter) RemoveDeadNodes

func (r *RedisRouter) RemoveDeadNodes() error

func (*RedisRouter) SetNodeForRoom

func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error

func (*RedisRouter) Start

func (r *RedisRouter) Start() error

func (*RedisRouter) StartParticipantSignal

func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue

func (*RedisRouter) Stop

func (r *RedisRouter) Stop()

func (*RedisRouter) UnregisterNode

func (r *RedisRouter) UnregisterNode() error

type RelaySignalMessage added in v1.4.2

type RelaySignalMessage interface {
	proto.Message
	GetSeq() uint64
	GetClose() bool
}

type RoomManagerClient added in v1.8.0

type RoomManagerClient interface {
	rpc.TypedRoomManagerClient
}

func NewRoomManagerClient added in v1.8.0

func NewRoomManagerClient(clientParams rpc.ClientParams, config config.RoomConfig) (RoomManagerClient, error)

type Router

type Router interface {
	MessageRouter

	RegisterNode() error
	UnregisterNode() error
	RemoveDeadNodes() error

	ListNodes() ([]*livekit.Node, error)

	GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error)
	SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId livekit.NodeID) error
	ClearRoomState(ctx context.Context, roomName livekit.RoomName) error

	GetRegion() string

	Start() error
	Drain()
	Stop()
}

Router allows multiple nodes to coordinate the participant session

func CreateRouter added in v0.13.5

func CreateRouter(
	rc redis.UniversalClient,
	node LocalNode,
	signalClient SignalClient,
	roomManagerClient RoomManagerClient,
	kps rpc.KeepalivePubSub,
	nodeStatsConfig config.NodeStatsConfig,
) Router

type SignalClient added in v1.4.0

type SignalClient interface {
	ActiveCount() int
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
}

func NewSignalClient added in v1.4.0

func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error)

type SignalMessageReader added in v1.4.2

type SignalMessageReader[RecvType RelaySignalMessage] interface {
	Read(msg RecvType) ([]proto.Message, error)
}

type SignalMessageWriter added in v1.4.2

type SignalMessageWriter[SendType RelaySignalMessage] interface {
	Write(seq uint64, close bool, msgs []proto.Message) SendType
}

type SignalSinkParams added in v1.4.2

type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct {
	Stream         psrpc.Stream[SendType, RecvType]
	Logger         logger.Logger
	Config         config.SignalRelayConfig
	Writer         SignalMessageWriter[SendType]
	CloseOnFailure bool
	BlockOnClose   bool
	ConnectionID   livekit.ConnectionID
}

type StartParticipantSignalResults added in v1.5.2

type StartParticipantSignalResults struct {
	ConnectionID        livekit.ConnectionID
	RequestSink         MessageSink
	ResponseSource      MessageSource
	NodeID              livekit.NodeID
	NodeSelectionReason string
}

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL