Documentation
¶
Overview ¶
Package buffer implements the Buffer Hub — the central in-memory ring buffer. It is the single data pipeline between Ingestor and all consumers (Transcoder, Publisher, DVR). Each stream has its own ring buffer; consumers subscribe and get an independent read cursor.
Index ¶
- func BestRenditionIndex(rends []RenditionPlayout) int
- func PlaybackBufferID(code domain.StreamCode, tc *domain.TranscoderConfig) domain.StreamCode
- func RawIngestBufferID(code domain.StreamCode) domain.StreamCode
- func RenditionBufferID(code domain.StreamCode, slug string) domain.StreamCode
- func VideoTrackSlug(index int) string
- type Packet
- type RenditionPlayout
- type Service
- func (s *Service) Create(id domain.StreamCode)
- func (s *Service) Delete(id domain.StreamCode)
- func (s *Service) Subscribe(id domain.StreamCode) (*Subscriber, error)
- func (s *Service) Unsubscribe(id domain.StreamCode, sub *Subscriber)
- func (s *Service) UnsubscribeAll(id domain.StreamCode)
- func (s *Service) Write(id domain.StreamCode, pkt Packet) error
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BestRenditionIndex ¶
func BestRenditionIndex(rends []RenditionPlayout) int
BestRenditionIndex picks the highest resolution (then bitrate) for single-bitrate protocols.
func PlaybackBufferID ¶
func PlaybackBufferID(code domain.StreamCode, tc *domain.TranscoderConfig) domain.StreamCode
PlaybackBufferID returns the buffer subscribers should use for single-rendition outputs (RTSP, RTMP play, SRT, push, DVR) — best ladder rung when transcoding, else the main stream code.
func RawIngestBufferID ¶
func RawIngestBufferID(code domain.StreamCode) domain.StreamCode
RawIngestBufferID is the Buffer Hub slot where ingest writes MPEG-TS when a transcoder republishes to the main stream buffer. The prefix is not allowed in API stream codes (ValidateStreamCode), so it cannot collide with user codes.
func RenditionBufferID ¶
func RenditionBufferID(code domain.StreamCode, slug string) domain.StreamCode
RenditionBufferID is the Buffer Hub id for one transcoded video profile (ABR ladder). Slug is always VideoTrackSlug(i) for the profile index i in the ladder.
func VideoTrackSlug ¶
VideoTrackSlug returns the stable path segment for the profile at index (0-based): track_1, track_2, ….
Types ¶
type Packet ¶
Packet is the Buffer Hub wire format. Exactly one of TS or AV should be set per write:
- TS: raw MPEG-TS chunk (transcoder output, push ingest, or legacy passthrough).
- AV: one elementary-stream access unit (ingest pull after demux or native ES readers).
type RenditionPlayout ¶
type RenditionPlayout struct {
Slug string
BufferID domain.StreamCode
Width int
Height int
BitrateKbps int
}
RenditionPlayout describes one ladder rung for publisher routing.
func RenditionsForTranscoder ¶
func RenditionsForTranscoder(code domain.StreamCode, tc *domain.TranscoderConfig) []RenditionPlayout
RenditionsForTranscoder returns ladder entries when transcoding is active (non-external). When video.copy is true or video.profiles is empty, a single passthrough rendition is returned (one worker: copy from raw ingest, no ABR ladder).
func (RenditionPlayout) BandwidthBps ¶
func (r RenditionPlayout) BandwidthBps() int
BandwidthBps returns EXT-X-STREAM-INF BANDWIDTH (bits/s).
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages ring buffers for all active streams.
func NewServiceForTesting ¶
NewServiceForTesting creates a Service without DI, for use in unit tests.
func (*Service) Create ¶
func (s *Service) Create(id domain.StreamCode)
Create initialises a ring buffer for the given stream.
func (*Service) Delete ¶
func (s *Service) Delete(id domain.StreamCode)
Delete tears down the ring buffer for a stream (call when stream is stopped). Closes every subscriber's channel BEFORE removing the map entry so consumers observe a clean EOF (`pkt, ok := <-sub.Recv(); !ok`) and can react — the mixer / copy taps in particular rely on this signal to retry against the freshly-created buffer when an upstream stream restarts. Without the channel-close, taps stay blocked on a stale ringBuffer indefinitely while the new ringBuffer for the same id receives packets they never see.
func (*Service) Subscribe ¶
func (s *Service) Subscribe(id domain.StreamCode) (*Subscriber, error)
Subscribe registers a new consumer for the stream's buffer. The caller must call Unsubscribe when done to avoid a goroutine/channel leak.
func (*Service) Unsubscribe ¶
func (s *Service) Unsubscribe(id domain.StreamCode, sub *Subscriber)
Unsubscribe removes a consumer and closes its channel.
func (*Service) UnsubscribeAll ¶ added in v0.0.22
func (s *Service) UnsubscribeAll(id domain.StreamCode)
UnsubscribeAll closes every subscriber's channel for the given buffer. Subscribers' next Recv will see ok=false (a clean EOF signal). The buffer itself is left in place — call Delete after if you also want it removed. No-op when the buffer doesn't exist.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber is a read cursor into a stream's ring buffer.
func (*Subscriber) Recv ¶
func (s *Subscriber) Recv() <-chan Packet
Recv returns the channel from which the subscriber reads packets.