Documentation
¶
Index ¶
- Variables
- type FrameKind
- type PlayFunc
- type PlayInfo
- type RTMPFrameWriter
- type RTMPServer
- func (s *RTMPServer) ClearStreamCallbacks(streamID domain.StreamCode)
- func (s *RTMPServer) OnDelRtmpPubSession(session *rtmp.ServerSession)
- func (s *RTMPServer) OnDelRtmpSubSession(session *rtmp.ServerSession)
- func (s *RTMPServer) OnNewRtmpPubSession(session *rtmp.ServerSession) error
- func (s *RTMPServer) OnNewRtmpSubSession(session *rtmp.ServerSession) error
- func (s *RTMPServer) OnRtmpConnect(session *rtmp.ServerSession, _ rtmp.ObjectPairArray)
- func (s *RTMPServer) Run(ctx context.Context) error
- func (s *RTMPServer) SetPlayFunc(fn PlayFunc)
- func (s *RTMPServer) SetStreamCallbacks(streamID domain.StreamCode, cb StreamCallbacks)
- type Registry
- type StreamCallbacks
Constants ¶
This section is empty.
Variables ¶
var ErrStreamAlreadyActive = errors.New("ingestor: stream already has an active pusher")
ErrStreamAlreadyActive is returned by Registry.Acquire when another pusher is already connected to the requested stream slot.
Functions ¶
This section is empty.
Types ¶
type FrameKind ¶ added in v0.0.74
type FrameKind int
FrameKind classifies an access unit so the writer doesn't depend on the source TS demuxer's codec enum.
type PlayFunc ¶
type PlayFunc func(ctx context.Context, key string, info PlayInfo, session *rtmp.ServerSession) error
PlayFunc is invoked when an external play client connects to the RTMP server. Implementations should subscribe to the Buffer Hub for `key` and feed the lal session frames until ctx is cancelled or the session disconnects.
Returning a non-nil error causes the session to be torn down with a "stream not found" status code.
type PlayInfo ¶ added in v0.0.42
type PlayInfo struct {
// RemoteAddr is the peer's "ip:port" string from the underlying TCP
// connection. Useful for the play-sessions tracker / abuse mitigation.
RemoteAddr string
}
PlayInfo describes the remote peer of an external RTMP play client.
type RTMPFrameWriter ¶ added in v0.0.74
type RTMPFrameWriter struct {
// contains filtered or unexported fields
}
RTMPFrameWriter sends one access unit at a time to a lal ServerSession, emitting onMetaData and AVC / AAC sequence headers automatically on the first frame of each codec.
func NewRTMPFrameWriter ¶ added in v0.0.74
func NewRTMPFrameWriter(session *rtmp.ServerSession) *RTMPFrameWriter
NewRTMPFrameWriter wraps session for per-frame writes.
func (*RTMPFrameWriter) WriteFrame ¶ added in v0.0.74
func (w *RTMPFrameWriter) WriteFrame(kind FrameKind, data []byte, pts, dts uint32) error
WriteFrame sends one access unit. dts/pts are RTMP wire timestamps in milliseconds. Returns an error if the underlying TCP write fails.
Audio frames received before the AVC sequence header (and onMetaData) have been sent are dropped: strict players parse onMetaData first to pick the audio decoder, and tags arriving before it are at best ignored, at worst cause re-buffering / resync. The publisher already holds audio until the first video keyframe so this drop is rare.
type RTMPServer ¶
type RTMPServer struct {
// contains filtered or unexported fields
}
RTMPServer accepts RTMP push connections from encoders, validates them against the push Registry, and writes decoded AVPackets into the Buffer Hub. External RTMP play clients are served via the optional PlayFunc.
func NewRTMPServer ¶
func NewRTMPServer(addr string, registry Registry) (*RTMPServer, error)
NewRTMPServer creates an RTMPServer. `addr` is the TCP bind address (e.g. ":1935"). `registry` resolves push keys to buffer hub targets.
func (*RTMPServer) ClearStreamCallbacks ¶ added in v0.0.75
func (s *RTMPServer) ClearStreamCallbacks(streamID domain.StreamCode)
ClearStreamCallbacks removes the callbacks for streamID. Safe to call for an unregistered stream; subsequent push sessions for streamID will simply have no observer hooks (the buffer-write path is unaffected).
func (*RTMPServer) OnDelRtmpPubSession ¶ added in v0.0.74
func (s *RTMPServer) OnDelRtmpPubSession(session *rtmp.ServerSession)
OnDelRtmpPubSession is invoked when a publish session ends (encoder disconnect, network error, server shutdown). Releases the registry slot so the next pusher can claim the key, and tells the manager the input has gone away so it can fail over.
func (*RTMPServer) OnDelRtmpSubSession ¶ added in v0.0.74
func (s *RTMPServer) OnDelRtmpSubSession(session *rtmp.ServerSession)
OnDelRtmpSubSession is invoked when a play session ends. Cancels the PlayFunc goroutine so it stops reading from the buffer hub.
func (*RTMPServer) OnNewRtmpPubSession ¶ added in v0.0.74
func (s *RTMPServer) OnNewRtmpPubSession(session *rtmp.ServerSession) error
OnNewRtmpPubSession is invoked when an encoder issues a `publish` command. We acquire the registry slot for the stream key and wire the session's AV-message observer to feed our converter.
Returning an error rejects the session — lal sends a NetStream.Publish.BadName status code and tears down the connection.
func (*RTMPServer) OnNewRtmpSubSession ¶ added in v0.0.74
func (s *RTMPServer) OnNewRtmpSubSession(session *rtmp.ServerSession) error
OnNewRtmpSubSession is invoked when a play client connects. We delegate to the registered PlayFunc (publisher side). If no PlayFunc is wired, reject — playback isn't available without the publisher being present.
func (*RTMPServer) OnRtmpConnect ¶ added in v0.0.74
func (s *RTMPServer) OnRtmpConnect(session *rtmp.ServerSession, _ rtmp.ObjectPairArray)
OnRtmpConnect is invoked once per accepted TCP connection at the end of the RTMP `connect` command. We log the remote app/tcUrl combo for ops and don't reject anything here — rejections happen at OnNewRtmpPubSession / OnNewRtmpSubSession when we know whether the key is registered.
func (*RTMPServer) Run ¶
func (s *RTMPServer) Run(ctx context.Context) error
Run binds the TCP listener and accepts RTMP connections until ctx is cancelled. lal's RunLoop blocks until the listener closes; we close the listener via Dispose() when ctx fires so RunLoop returns nil.
func (*RTMPServer) SetPlayFunc ¶
func (s *RTMPServer) SetPlayFunc(fn PlayFunc)
SetPlayFunc registers a handler for external RTMP play clients. Safe to call concurrently with Run.
func (*RTMPServer) SetStreamCallbacks ¶ added in v0.0.75
func (s *RTMPServer) SetStreamCallbacks(streamID domain.StreamCode, cb StreamCallbacks)
SetStreamCallbacks installs the per-session callback set for streamID. The ingestor.Service calls this once it has registered the routing slot (see startPushRegistration). Without callbacks the Stream Manager never sees that an encoder connected and the stream stays Exhausted.
Pass an empty StreamCallbacks{} to detach (or call ClearStreamCallbacks). Replaces any previous callbacks for the same stream.
type Registry ¶
type Registry interface {
// Lookup checks whether key is registered and returns its targets.
// It does NOT change the active-pusher state; use it for pre-flight
// checks (e.g. SRT HandleConnect) where you can't yet commit.
Lookup(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, buf *buffer.Service, err error)
// Acquire atomically checks that key is registered and that no other
// pusher is currently active, then marks the slot as occupied.
// Returns ErrStreamAlreadyActive if another pusher holds the slot.
Acquire(key string) (bufferWriteID domain.StreamCode, streamID domain.StreamCode, buf *buffer.Service, err error)
// Release clears the active-pusher mark so the next encoder can Acquire.
// It is safe to call Release even if Acquire was never called for key.
Release(key string)
}
Registry maps a stream key (the stream code) to the stream's buffer hub slot. Push servers (RTMP, SRT) use this to route incoming encoder connections.
A stream is eligible to receive a push connection only if it has been registered via ingestor.Service.Start with a publish:// input URL. Only one active pusher per stream is allowed at a time; subsequent connection attempts are rejected until the current pusher disconnects.
type StreamCallbacks ¶ added in v0.0.75
type StreamCallbacks struct {
// OnConnect fires once when the encoder finishes the publish handshake.
OnConnect func()
// OnPacket fires for every RTMP AV message — drives the manager's
// Active-status / clear-Exhausted logic.
OnPacket func()
// OnPacketBytes fires for every RTMP AV message with the payload byte
// count, for ingest bytes / packets metrics.
OnPacketBytes func(n int)
// OnMedia fires for every emitted AVPacket — drives the manager's
// per-track codec / bitrate / resolution panel.
OnMedia func(p *domain.AVPacket)
// OnDisconnect fires when the publish session ends (encoder closed,
// network error, server shutdown). The manager uses this to mark
// the input degraded and trigger failover.
OnDisconnect func(err error)
}
StreamCallbacks receives per-publish-session events for a single stream. The ingestor.Service registers these via SetStreamCallbacks at the same time it registers the routing slot — closures capture the input priority (and any other manager-side context) so the push server stays agnostic of those concerns. Without these the Stream Manager never sees that an encoder is connected and the stream stays "Exhausted" forever (Path B has no loopback pull worker to surface packets).
Any field may be nil. Callbacks fire on lal's read goroutine and must be cheap; defer heavy work.