routing

package
v1.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2023 License: Apache-2.0 Imports: 25 Imported by: 12

Documentation

Index

Constants

View Source
// Key names used by the routing package.
// NOTE(review): presumably Redis keys used by RedisRouter — confirm against the implementation.
const (
	// NodesKey names a hash of node_id => Node proto.
	NodesKey = "nodes"

	// NodeRoomKey names a hash of room_name => node_id.
	NodeRoomKey = "room_node_map"
)
View Source
// DefaultMessageChannelSize is the default size for a message channel.
// NOTE(review): presumably the size NewDefaultMessageChannel passes to NewMessageChannel — confirm.
const DefaultMessageChannelSize = 200

Variables

View Source
// Sentinel errors exported by the routing package. Each is created with
// errors.New, so callers can compare against them with errors.Is.
var (
	ErrNotFound             = errors.New("could not find object")
	ErrIPNotSet             = errors.New("ip address is required and not set")
	ErrHandlerNotDefined    = errors.New("handler not defined")
	ErrIncorrectRTCNode     = errors.New("current node isn't the RTC node for the room")
	ErrNodeNotFound         = errors.New("could not locate the node")
	ErrNodeLimitReached     = errors.New("reached configured limit for node")
	ErrInvalidRouterMessage = errors.New("invalid router message")
	ErrChannelClosed        = errors.New("channel closed")
	ErrChannelFull          = errors.New("channel is full")

	// Errors when starting a signal connection.
	ErrRequestChannelClosed       = errors.New("request channel closed")
	ErrCouldNotMigrateParticipant = errors.New("could not migrate participant")
	ErrClientInfoNotSet           = errors.New("client info not set")
)
View Source
// ErrSignalMessageDropped is a sentinel error indicating that a signal message was dropped.
var ErrSignalMessageDropped = errors.New("signal message dropped")
View Source
// ErrSignalWriteFailed is a sentinel error indicating that a signal write failed.
var ErrSignalWriteFailed = errors.New("signal write failed")

Functions

func CopySignalStreamToMessageChannel added in v1.4.2

func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](
	stream psrpc.Stream[SendType, RecvType],
	ch *MessageChannel,
	reader SignalMessageReader[RecvType],
	config config.SignalRelayConfig,
) error

func ParticipantKey added in v0.11.1

func ParticipantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey

func ParticipantKeyLegacy added in v1.4.1

func ParticipantKeyLegacy(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey

Types

type LocalNode

// LocalNode is a named type whose underlying type is *livekit.Node.
// NOTE(review): presumably it denotes the node the current process runs on — confirm.
type LocalNode *livekit.Node

func NewLocalNode

func NewLocalNode(conf *config.Config) (LocalNode, error)

type LocalRouter

type LocalRouter struct {
	// contains filtered or unexported fields
}

LocalRouter is a router of messages on the same node; it is a basic implementation for local testing.

func NewLocalRouter

func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRouter

func (*LocalRouter) ClearRoomState

func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error

func (*LocalRouter) Drain added in v0.13.5

func (r *LocalRouter) Drain()

func (*LocalRouter) GetNode

func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*LocalRouter) GetNodeForRoom

func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)

func (*LocalRouter) GetRegion added in v0.15.7

func (r *LocalRouter) GetRegion() string

func (*LocalRouter) ListNodes

func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)

func (*LocalRouter) OnNewParticipantRTC

func (r *LocalRouter) OnNewParticipantRTC(callback NewParticipantCallback)

func (*LocalRouter) OnRTCMessage

func (r *LocalRouter) OnRTCMessage(callback RTCMessageCallback)

func (*LocalRouter) RegisterNode

func (r *LocalRouter) RegisterNode() error

func (*LocalRouter) RemoveDeadNodes

func (r *LocalRouter) RemoveDeadNodes() error

func (*LocalRouter) SetNodeForRoom

func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error

func (*LocalRouter) Start

func (r *LocalRouter) Start() error

func (*LocalRouter) StartParticipantSignal

func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

func (*LocalRouter) StartParticipantSignalWithNodeID added in v1.4.0

func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error)

func (*LocalRouter) Stop

func (r *LocalRouter) Stop()

func (*LocalRouter) UnregisterNode

func (r *LocalRouter) UnregisterNode() error

func (*LocalRouter) WriteNodeRTC added in v0.14.2

func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error

func (*LocalRouter) WriteParticipantRTC added in v0.14.2

func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error

func (*LocalRouter) WriteRoomRTC added in v0.14.2

func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error

type MessageChannel

type MessageChannel struct {
	// contains filtered or unexported fields
}

func NewDefaultMessageChannel added in v1.4.0

func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel

func NewMessageChannel

func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel

func (*MessageChannel) Close

func (m *MessageChannel) Close()

func (*MessageChannel) ConnectionID added in v1.4.4

func (m *MessageChannel) ConnectionID() livekit.ConnectionID

func (*MessageChannel) IsClosed added in v0.13.1

func (m *MessageChannel) IsClosed() bool

func (*MessageChannel) OnClose

func (m *MessageChannel) OnClose(f func())

func (*MessageChannel) ReadChan

func (m *MessageChannel) ReadChan() <-chan proto.Message

func (*MessageChannel) WriteMessage

func (m *MessageChannel) WriteMessage(msg proto.Message) error

type MessageRouter added in v0.14.2

// MessageRouter starts participant signal connections and writes RTC node
// messages to a participant or to a room.
type MessageRouter interface {
	// StartParticipantSignal is called when a participant signal connection is ready to start.
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

	// WriteParticipantRTC writes msg to the participant identified by roomName and identity.
	WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
	// WriteRoomRTC writes msg to the room identified by roomName.
	WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
}

type MessageSink

// MessageSink is the write side of a message path: protobuf messages written
// here are read by a MessageSource.
type MessageSink interface {
	// WriteMessage writes a protobuf message to the sink.
	WriteMessage(msg proto.Message) error
	// IsClosed reports whether the sink is closed.
	IsClosed() bool
	// Close closes the sink.
	Close()
	// ConnectionID returns the connection ID associated with the sink.
	ConnectionID() livekit.ConnectionID
}

MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport.

func NewSignalMessageSink added in v1.4.2

func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink

type MessageSource

// MessageSource is the read side of a message path; it exposes received
// protobuf messages through a receive-only channel.
type MessageSource interface {
	// ReadChan exposes a one way channel to make it easier to use with select
	ReadChan() <-chan proto.Message
	// IsClosed reports whether the source is closed.
	IsClosed() bool
	// Close closes the source.
	Close()
	// ConnectionID returns the connection ID associated with the source.
	ConnectionID() livekit.ConnectionID
}

type NewParticipantCallback

// NewParticipantCallback is the callback type accepted by OnNewParticipantRTC,
// which is called to start a new participant's RTC connection.
// NOTE(review): presumably requests are read from requestSource and responses
// are written to responseSink — confirm against the implementation.
type NewParticipantCallback func(
	ctx context.Context,
	roomName livekit.RoomName,
	pi ParticipantInit,
	requestSource MessageSource,
	responseSink MessageSink,
) error

type ParticipantInit

// ParticipantInit holds the participant parameters passed to
// StartParticipantSignal. It converts to and from livekit.StartSession via
// ToStartSession and ParticipantInitFromStartSession.
type ParticipantInit struct {
	Identity             livekit.ParticipantIdentity
	Name                 livekit.ParticipantName
	Reconnect            bool
	ReconnectReason      livekit.ReconnectReason
	AutoSubscribe        bool
	Client               *livekit.ClientInfo
	Grants               *auth.ClaimGrants
	Region               string
	AdaptiveStream       bool
	ID                   livekit.ParticipantID
	SubscriberAllowPause *bool
}

func ParticipantInitFromStartSession added in v0.15.7

func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)

func (*ParticipantInit) ToStartSession added in v0.15.7

func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)

type RTCMessageCallback

// RTCMessageCallback is the callback type accepted by OnRTCMessage, which is
// called to execute actions on the RTC node. It receives the room name, the
// participant identity, and the RTC node message, and returns nothing.
type RTCMessageCallback func(
	ctx context.Context,
	roomName livekit.RoomName,
	identity livekit.ParticipantIdentity,
	msg *livekit.RTCNodeMessage,
)

type RTCNodeSink

type RTCNodeSink struct {
	// contains filtered or unexported fields
}

func NewRTCNodeSink

func NewRTCNodeSink(
	rc redis.UniversalClient,
	nodeID livekit.NodeID,
	connectionID livekit.ConnectionID,
	participantKey livekit.ParticipantKey,
	participantKeyB62 livekit.ParticipantKey,
) *RTCNodeSink

func (*RTCNodeSink) Close

func (s *RTCNodeSink) Close()

func (*RTCNodeSink) ConnectionID added in v1.4.4

func (s *RTCNodeSink) ConnectionID() livekit.ConnectionID

func (*RTCNodeSink) IsClosed added in v1.3.4

func (s *RTCNodeSink) IsClosed() bool

func (*RTCNodeSink) OnClose

func (s *RTCNodeSink) OnClose(f func())

func (*RTCNodeSink) WriteMessage

func (s *RTCNodeSink) WriteMessage(msg proto.Message) error

type RedisRouter

type RedisRouter struct {
	*LocalRouter
	// contains filtered or unexported fields
}

RedisRouter uses Redis pub/sub to route signaling messages across different nodes. It relies on the RTC node to be the primary driver of the participant connection.

func NewRedisRouter

func NewRedisRouter(config *config.Config, lr *LocalRouter, rc redis.UniversalClient) *RedisRouter

func (*RedisRouter) ClearRoomState

func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error

func (*RedisRouter) Drain added in v0.13.5

func (r *RedisRouter) Drain()

func (*RedisRouter) GetNode

func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*RedisRouter) GetNodeForRoom

func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)

func (*RedisRouter) ListNodes

func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)

func (*RedisRouter) RegisterNode

func (r *RedisRouter) RegisterNode() error

func (*RedisRouter) RemoveDeadNodes

func (r *RedisRouter) RemoveDeadNodes() error

func (*RedisRouter) SetNodeForRoom

func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error

func (*RedisRouter) SetParticipantRTCNode added in v1.4.1

func (r *RedisRouter) SetParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error

func (*RedisRouter) Start

func (r *RedisRouter) Start() error

func (*RedisRouter) StartParticipantSignal

func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

StartParticipantSignal sets up the signal connection's paths to the RTC node and starts routing messages to that message queue.

func (*RedisRouter) Stop

func (r *RedisRouter) Stop()

func (*RedisRouter) UnregisterNode

func (r *RedisRouter) UnregisterNode() error

func (*RedisRouter) WriteNodeRTC added in v0.14.2

func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error

func (*RedisRouter) WriteParticipantRTC added in v0.14.2

func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error

func (*RedisRouter) WriteRoomRTC added in v0.14.2

func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error

type RelaySignalMessage added in v1.4.2

// RelaySignalMessage is the type constraint for the SendType and RecvType
// parameters of the signal relay generics (for example SignalSinkParams and
// CopySignalStreamToMessageChannel): a proto.Message that also exposes a
// sequence number and a close flag.
type RelaySignalMessage interface {
	proto.Message
	// GetSeq returns the message's sequence number.
	GetSeq() uint64
	// GetClose returns the message's close flag.
	GetClose() bool
}

type Router

// Router extends MessageRouter with node registration, room-to-node
// assignment, lifecycle control, and callback registration.
type Router interface {
	MessageRouter

	// Node registration: register, unregister, and remove dead nodes.
	RegisterNode() error
	UnregisterNode() error
	RemoveDeadNodes() error

	// ListNodes returns the nodes known to the router.
	ListNodes() ([]*livekit.Node, error)

	// Room-to-node assignment: look up, set, and clear the node for a room.
	GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error)
	SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId livekit.NodeID) error
	ClearRoomState(ctx context.Context, roomName livekit.RoomName) error

	// GetRegion returns the router's region.
	GetRegion() string

	// Lifecycle control.
	Start() error
	Drain()
	Stop()

	// OnNewParticipantRTC is called to start a new participant's RTC connection
	OnNewParticipantRTC(callback NewParticipantCallback)

	// OnRTCMessage is called to execute actions on the RTC node
	OnRTCMessage(callback RTCMessageCallback)
}

Router allows multiple nodes to coordinate the participant session.

func CreateRouter added in v0.13.5

func CreateRouter(config *config.Config, rc redis.UniversalClient, node LocalNode, signalClient SignalClient) Router

type SignalClient added in v1.4.0

// SignalClient starts participant signal connections toward a given node.
type SignalClient interface {
	// ActiveCount returns a count of active items.
	// NOTE(review): presumably the number of active signal connections — confirm.
	ActiveCount() int
	// StartParticipantSignal starts a signal connection for the participant
	// described by pi in roomName, targeting nodeID. On success it returns the
	// connection ID, a sink for requests, and a source for responses.
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
}

func NewSignalClient added in v1.4.0

func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error)

type SignalMessageReader added in v1.4.2

// SignalMessageReader converts received relay messages into protobuf messages.
type SignalMessageReader[RecvType RelaySignalMessage] interface {
	// Read converts one received relay message into a slice of protobuf
	// messages, or returns an error.
	Read(msg RecvType) ([]proto.Message, error)
}

type SignalMessageWriter added in v1.4.2

// SignalMessageWriter builds relay messages to send from protobuf messages.
type SignalMessageWriter[SendType RelaySignalMessage] interface {
	// Write builds a SendType relay message from a sequence number, a close
	// flag, and a batch of protobuf messages.
	Write(seq uint64, close bool, msgs []proto.Message) SendType
}

type SignalNodeSink

type SignalNodeSink struct {
	// contains filtered or unexported fields
}

func NewSignalNodeSink

func NewSignalNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, connectionID livekit.ConnectionID) *SignalNodeSink

func (*SignalNodeSink) Close

func (s *SignalNodeSink) Close()

func (*SignalNodeSink) ConnectionID added in v1.4.4

func (s *SignalNodeSink) ConnectionID() livekit.ConnectionID

func (*SignalNodeSink) IsClosed added in v1.3.4

func (s *SignalNodeSink) IsClosed() bool

func (*SignalNodeSink) OnClose

func (s *SignalNodeSink) OnClose(f func())

func (*SignalNodeSink) WriteMessage

func (s *SignalNodeSink) WriteMessage(msg proto.Message) error

type SignalSinkParams added in v1.4.2

// SignalSinkParams is the parameter struct accepted by NewSignalMessageSink.
// NOTE(review): the exact effect of CloseOnFailure and BlockOnClose is not
// visible here — confirm against the sink implementation.
type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct {
	Stream         psrpc.Stream[SendType, RecvType]
	Logger         logger.Logger
	Config         config.SignalRelayConfig
	Writer         SignalMessageWriter[SendType]
	CloseOnFailure bool
	BlockOnClose   bool
	ConnectionID   livekit.ConnectionID
}

type StartParticipantSignalResults added in v1.5.2

// StartParticipantSignalResults is the result type returned by
// StartParticipantSignal and StartParticipantSignalWithNodeID: the connection
// ID, the sink for requests, the source for responses, and a node ID.
// NOTE(review): NodeID is presumably the node handling the participant — confirm.
type StartParticipantSignalResults struct {
	ConnectionID   livekit.ConnectionID
	RequestSink    MessageSink
	ResponseSource MessageSource
	NodeID         livekit.NodeID
}

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL