buffer

package
v0.0.91 Latest
Warning

This package is not in the latest version of its module.

Published: May 6, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package buffer implements the Buffer Hub — the central in-memory ring buffer. It is the single data pipeline between Ingestor and all consumers (Transcoder, Publisher, DVR). Each stream has its own ring buffer; consumers subscribe and get an independent read cursor.


Constants

This section is empty.

Variables

This section is empty.

Functions

func BestRenditionIndex

func BestRenditionIndex(rends []RenditionPlayout) int

BestRenditionIndex picks the highest resolution (then bitrate) for single-bitrate protocols.
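The selection rule can be sketched as follows. This is an illustrative stand-in, not the package's code: bestRenditionIndex is a hypothetical name, the struct is a local copy with BufferID simplified to a string, "highest resolution" is assumed to mean the largest Width*Height, and returning -1 for an empty slice is also an assumption.

```go
package main

import "fmt"

// RenditionPlayout is a local copy of the documented struct.
// BufferID is a plain string here; the real field is domain.StreamCode.
type RenditionPlayout struct {
	Slug        string
	BufferID    string
	Width       int
	Height      int
	BitrateKbps int
}

// bestRenditionIndex is a hypothetical stand-in for BestRenditionIndex.
// Assumptions: resolution is compared by pixel area, bitrate breaks ties,
// and an empty slice yields -1.
func bestRenditionIndex(rends []RenditionPlayout) int {
	best := -1
	for i, r := range rends {
		if best < 0 {
			best = i
			continue
		}
		b := rends[best]
		area, bestArea := r.Width*r.Height, b.Width*b.Height
		if area > bestArea || (area == bestArea && r.BitrateKbps > b.BitrateKbps) {
			best = i
		}
	}
	return best
}

func main() {
	ladder := []RenditionPlayout{
		{Slug: "track_1", Width: 1280, Height: 720, BitrateKbps: 2500},
		{Slug: "track_2", Width: 1920, Height: 1080, BitrateKbps: 4500},
		{Slug: "track_3", Width: 1920, Height: 1080, BitrateKbps: 6000},
	}
	fmt.Println(bestRenditionIndex(ladder)) // prints 2
}
```

The two 1080p rungs tie on resolution, so the higher bitrate decides.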

func PlaybackBufferID

func PlaybackBufferID(code domain.StreamCode, tc *domain.TranscoderConfig) domain.StreamCode

PlaybackBufferID returns the buffer subscribers should use for single-rendition outputs (RTSP, RTMP play, SRT, push, DVR) — best ladder rung when transcoding, else the main stream code.
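A minimal sketch of that fallback, under stated assumptions: playbackBufferID is a hypothetical helper that takes an already-built ladder instead of a *domain.TranscoderConfig, an empty ladder stands for "not transcoding", resolution is compared by pixel area, and the buffer ids are made-up placeholders that do not reflect the real id format.

```go
package main

import "fmt"

// RenditionPlayout is a local copy of the documented struct with
// BufferID simplified to a string.
type RenditionPlayout struct {
	Slug        string
	BufferID    string
	Width       int
	Height      int
	BitrateKbps int
}

// playbackBufferID is a hypothetical illustration: with no ladder it
// returns the main stream code, otherwise the buffer id of the best rung.
func playbackBufferID(code string, rends []RenditionPlayout) string {
	if len(rends) == 0 {
		return code
	}
	best := rends[0]
	for _, r := range rends[1:] {
		area, bestArea := r.Width*r.Height, best.Width*best.Height
		if area > bestArea || (area == bestArea && r.BitrateKbps > best.BitrateKbps) {
			best = r
		}
	}
	return best.BufferID
}

func main() {
	ladder := []RenditionPlayout{
		{BufferID: "rendition-A", Width: 1280, Height: 720, BitrateKbps: 2500},
		{BufferID: "rendition-B", Width: 1920, Height: 1080, BitrateKbps: 4500},
	}
	fmt.Println(playbackBufferID("news", nil))    // prints news
	fmt.Println(playbackBufferID("news", ladder)) // prints rendition-B
}
```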

func RawIngestBufferID

func RawIngestBufferID(code domain.StreamCode) domain.StreamCode

RawIngestBufferID is the Buffer Hub slot where ingest writes MPEG-TS when a transcoder republishes to the main stream buffer. The returned id carries a prefix that ValidateStreamCode rejects in API stream codes, so it cannot collide with user-supplied codes.

func RenditionBufferID

func RenditionBufferID(code domain.StreamCode, slug string) domain.StreamCode

RenditionBufferID is the Buffer Hub id for one transcoded video profile (ABR ladder). Slug is always VideoTrackSlug(i) for the profile index i in the ladder.

func VideoTrackSlug

func VideoTrackSlug(index int) string

VideoTrackSlug returns the stable path segment for the profile at the given 0-based index. Slugs are numbered from 1: index 0 yields track_1, index 1 yields track_2, and so on.
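The index-to-slug mapping is easy to get off by one, so here is a small sketch. videoTrackSlug is a hypothetical re-implementation that assumes the slug is simply "track_" followed by index+1; the package's actual formatting may differ.

```go
package main

import "fmt"

// videoTrackSlug is a hypothetical stand-in for VideoTrackSlug.
// Assumption: the slug is "track_" plus the 1-based position.
func videoTrackSlug(index int) string {
	return fmt.Sprintf("track_%d", index+1)
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(i, videoTrackSlug(i))
	}
}
```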

Types

type Packet

type Packet struct {
	TS []byte
	AV *domain.AVPacket
}

Packet is the Buffer Hub wire format. Exactly one of TS or AV should be set per write:

  • TS: raw MPEG-TS chunk (transcoder output, push ingest, or legacy passthrough).
  • AV: one elementary-stream access unit (ingest pull after demux or native ES readers).
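The "exactly one of" rule could be checked before writing, along these lines. This is a sketch only: validate and errBadPacket are hypothetical names, AVPacket is a local stand-in for domain.AVPacket, and treating an empty TS slice as "not set" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// AVPacket is a local stand-in for domain.AVPacket.
type AVPacket struct {
	Data []byte
}

// Packet is a local copy of the documented struct.
type Packet struct {
	TS []byte
	AV *AVPacket
}

var errBadPacket = errors.New("packet must carry exactly one of TS or AV")

// validate is a hypothetical helper that rejects packets with both
// fields set or neither field set.
func validate(p Packet) error {
	hasTS := len(p.TS) > 0
	hasAV := p.AV != nil
	if hasTS == hasAV {
		return errBadPacket
	}
	return nil
}

func main() {
	fmt.Println(validate(Packet{TS: []byte{0x47}}))                  // prints <nil>
	fmt.Println(validate(Packet{}))                                  // prints the error
	fmt.Println(validate(Packet{TS: []byte{0x47}, AV: &AVPacket{}})) // prints the error
}
```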

func TSPacket

func TSPacket(b []byte) Packet

TSPacket wraps a raw MPEG-TS chunk.

type RenditionPlayout

type RenditionPlayout struct {
	Slug        string
	BufferID    domain.StreamCode
	Width       int
	Height      int
	BitrateKbps int
}

RenditionPlayout describes one ladder rung for publisher routing.

func RenditionsForTranscoder

func RenditionsForTranscoder(code domain.StreamCode, tc *domain.TranscoderConfig) []RenditionPlayout

RenditionsForTranscoder returns ladder entries when transcoding is active (non-external). When video.copy is true or video.profiles is empty, a single passthrough rendition is returned (one worker: copy from raw ingest, no ABR ladder).

func (RenditionPlayout) BandwidthBps

func (r RenditionPlayout) BandwidthBps() int

BandwidthBps returns EXT-X-STREAM-INF BANDWIDTH (bits/s).
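As a rough illustration of how such a value might feed an HLS master playlist entry: the sketch below assumes the conversion is simply BitrateKbps*1000 (the real method may add overhead or audio bitrate), and streamInf is a hypothetical helper that is not part of this package.

```go
package main

import "fmt"

// RenditionPlayout is a local copy of the documented struct, trimmed to
// the fields this sketch needs.
type RenditionPlayout struct {
	Width       int
	Height      int
	BitrateKbps int
}

// bandwidthBps is a hypothetical stand-in for BandwidthBps.
// Assumption: a plain kbit/s to bit/s conversion.
func (r RenditionPlayout) bandwidthBps() int {
	return r.BitrateKbps * 1000
}

// streamInf is a hypothetical helper that formats one variant line.
func streamInf(r RenditionPlayout) string {
	return fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d",
		r.bandwidthBps(), r.Width, r.Height)
}

func main() {
	r := RenditionPlayout{Width: 1280, Height: 720, BitrateKbps: 2500}
	fmt.Println(streamInf(r))
}
```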

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages ring buffers for all active streams.

func New

func New(i do.Injector) (*Service, error)

New creates a Service and registers it with the DI injector.

func NewServiceForTesting

func NewServiceForTesting(capacity int) *Service

NewServiceForTesting creates a Service without DI, for use in unit tests.

func (*Service) Create

func (s *Service) Create(id domain.StreamCode)

Create initialises a ring buffer for the given stream.

func (*Service) Delete

func (s *Service) Delete(id domain.StreamCode)

Delete tears down the ring buffer for a stream; call it when the stream is stopped. It closes every subscriber's channel before removing the map entry, so consumers observe a clean EOF (`pkt, ok := <-sub.Recv(); !ok`) and can react. The mixer and copy taps in particular rely on this signal to retry against the freshly created buffer when an upstream stream restarts. Without the channel close, taps would stay blocked on a stale ringBuffer indefinitely while the new ringBuffer for the same id receives packets they never see.
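On the consumer side, the EOF signal is the ordinary closed-channel receive. The sketch below uses a plain Go channel in place of a real Subscriber so that it runs on its own; drain and the local Packet type are illustrative, and a real tap would resubscribe where drain returns.

```go
package main

import "fmt"

// Packet is a trimmed local copy of the documented struct.
type Packet struct {
	TS []byte
}

// drain reads until the channel is closed and reports how many packets
// it saw. A real consumer would retry Subscribe at the point of return.
func drain(recv <-chan Packet) int {
	n := 0
	for {
		_, ok := <-recv
		if !ok {
			return n // channel closed: clean EOF
		}
		n++
	}
}

func main() {
	ch := make(chan Packet, 4)
	ch <- Packet{TS: []byte{0x47}}
	ch <- Packet{TS: []byte{0x47}}
	close(ch) // stands in for what Delete does to each subscriber channel
	fmt.Println(drain(ch)) // prints 2
}
```

Packets already buffered in the channel are still delivered before the receive reports ok=false.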

func (*Service) Subscribe

func (s *Service) Subscribe(id domain.StreamCode) (*Subscriber, error)

Subscribe registers a new consumer for the stream's buffer. The caller must call Unsubscribe when done to avoid a goroutine/channel leak.

func (*Service) Unsubscribe

func (s *Service) Unsubscribe(id domain.StreamCode, sub *Subscriber)

Unsubscribe removes a consumer and closes its channel.

func (*Service) UnsubscribeAll added in v0.0.22

func (s *Service) UnsubscribeAll(id domain.StreamCode)

UnsubscribeAll closes every subscriber's channel for the given buffer. Each subscriber's next receive from Recv will see ok=false (a clean EOF signal). The buffer itself is left in place; call Delete afterwards if you also want it removed. It is a no-op when the buffer doesn't exist.

func (*Service) Write

func (s *Service) Write(id domain.StreamCode, pkt Packet) error

Write pushes a packet into the stream's ring buffer (deep-copied for subscribers). Only the active Ingestor goroutine for this stream should call Write.
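To show why the deep copy matters, here is a toy fan-out hub. It is not the package's implementation: hub, its methods, the per-subscriber channel size, the drop-on-full behaviour, and the error returned for a missing buffer are all assumptions made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Packet is a trimmed local copy of the documented struct.
type Packet struct {
	TS []byte
}

var errNoBuffer = errors.New("buffer not found")

// hub is a hypothetical, much simplified stand-in for Service.
type hub struct {
	mu   sync.Mutex
	subs map[string][]chan Packet
}

func newHub() *hub { return &hub{subs: make(map[string][]chan Packet)} }

func (h *hub) create(id string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.subs[id]; !ok {
		h.subs[id] = nil
	}
}

func (h *hub) subscribe(id string) (<-chan Packet, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.subs[id]; !ok {
		return nil, errNoBuffer
	}
	ch := make(chan Packet, 16)
	h.subs[id] = append(h.subs[id], ch)
	return ch, nil
}

// write hands every subscriber its own copy of the payload, so neither
// the writer nor another subscriber can mutate what a consumer received.
func (h *hub) write(id string, pkt Packet) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	subs, ok := h.subs[id]
	if !ok {
		return errNoBuffer
	}
	for _, ch := range subs {
		cp := Packet{TS: append([]byte(nil), pkt.TS...)}
		select {
		case ch <- cp:
		default: // assumption: drop for a slow consumer rather than block
		}
	}
	return nil
}

func main() {
	h := newHub()
	h.create("news")
	a, _ := h.subscribe("news")
	src := []byte{1, 2, 3}
	_ = h.write("news", Packet{TS: src})
	src[0] = 9 // the writer reuses its slice after Write returns
	fmt.Println((<-a).TS) // prints [1 2 3]
}
```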

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber is a read cursor into a stream's ring buffer.

func (*Subscriber) Recv

func (s *Subscriber) Recv() <-chan Packet

Recv returns the channel from which the subscriber reads packets.
