stream

package
v0.0.0-...-69fac63 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2026 License: MPL-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
// FCC protocol types, based on vendor and port.
const (
	FCC_TYPE_TELECOM = 0 // Telecom/ZTE/Fiberhome
	FCC_TYPE_HUAWEI  = 1 // Huawei
)

FCC Protocol Type - Based on vendor and port

View Source
// FCC timeout configuration. The unit of each value is given by its name
// suffix (_MS or _SEC).
const (
	FCC_TIMEOUT_SIGNALING_MS  = 80   // Signaling-phase timeout (FCC_STATE_REQUESTED or FCC_STATE_UNICAST_PENDING)
	FCC_TIMEOUT_UNICAST_SEC   = 1.0  // Unicast media packet timeout (FCC_STATE_UNICAST_ACTIVE)
	FCC_TIMEOUT_SYNC_WAIT_SEC = 15.0 // Maximum time to wait for the server's sync notification
)

FCC Timeout Configuration

View Source
// FCC state machine states, based on the Fast Channel Change protocol.
const (
	FCC_STATE_INIT            = 0 // Initial state
	FCC_STATE_REQUESTED       = 1 // FCC request sent, waiting for the server's response
	FCC_STATE_UNICAST_PENDING = 2 // Server accepted, waiting for the first unicast packet
	FCC_STATE_UNICAST_ACTIVE  = 3 // Receiving the FCC unicast stream
	FCC_STATE_MCAST_REQUESTED = 4 // Server notified about joining multicast, transition in progress
	FCC_STATE_MCAST_ACTIVE    = 5 // Fully switched over to multicast reception
	FCC_STATE_ERROR           = 6 // Error state
)

FCC State Machine - Based on Fast Channel Change Protocol

View Source
// FCC RTCP packet format (FMT) types, grouped by vendor protocol.
const (
	// Telecom FCC Packet Types
	FCC_FMT_TELECOM_REQ  = 2 // Telecom FCC request packet (FMT 2)
	FCC_FMT_TELECOM_RESP = 3 // Telecom FCC response packet (FMT 3)
	FCC_FMT_TELECOM_SYNC = 4 // Telecom FCC sync notification (FMT 4)
	FCC_FMT_TELECOM_TERM = 5 // Telecom FCC termination packet (FMT 5)

	// Huawei FCC Packet Types
	FCC_FMT_HUAWEI_REQ  = 5  // Huawei FCC request packet (FMT 5)
	FCC_FMT_HUAWEI_RESP = 6  // Huawei FCC response packet (FMT 6)
	FCC_FMT_HUAWEI_NAT  = 12 // Huawei FCC NAT traversal packet (FMT 12)
	FCC_FMT_HUAWEI_SYNC = 8  // Huawei FCC sync notification (FMT 8)
	FCC_FMT_HUAWEI_TERM = 9  // Huawei FCC termination packet (FMT 9)
)

FCC RTCP Packet Format Types

View Source
// Stream states, numbered 0 (stopped), 1 (playing) and 2 (error) via iota.
// NOTE(review): presumably the states that the hubs' SetStopped, SetPlaying
// and SetError methods switch between — confirm against the implementation.
const (
	StateStopped = iota
	StatePlaying
	StateError
)
View Source
// Data-source tags, numbered 0 (unknown), 1 (multicast) and 2 (unicast) via
// iota.
// NOTE(review): the values match the legend documented on BufferRef.Source,
// where 2 is described as FCC unicast — confirm that field is where these
// constants are used.
const (
	SourceUnknown = iota
	SourceMulticast
	SourceUnicast
)
View Source
// RTP and MPEG-TS related constants.
// NOTE(review): the meanings below are inferred from the names and from the
// well-known standard values; confirm against the code that uses them.
const (
	// Presumably the RTP protocol version expected in packet headers.
	RTP_VERSION = 2
	// Presumably the RTP payload type for MPEG audio (14 in the RTP A/V profile).
	P_MPGA      = 14
	// Presumably the RTP payload type for MPEG video (32 in the RTP A/V profile).
	P_MPGV      = 32
	// Presumably the MPEG-TS null-packet PID.
	NULL_PID    = 0x1FFF
	// Presumably the MPEG-TS Program Association Table PID.
	PAT_PID     = 0x0000
	// Presumably the PID this package uses for the Program Map Table; this
	// value is not fixed by the MPEG-TS standard — TODO confirm.
	PMT_PID     = 0x1000
)
View Source
// Client state constants, numbered 0 through 6 via iota.
// NOTE(review): the names and numeric values parallel the FCC_STATE_*
// constants; presumably these are the per-client states tracked by
// ClientFccState — confirm.
const (
	CLIENT_STATE_FCC_INIT = iota
	CLIENT_STATE_FCC_REQUESTED
	CLIENT_STATE_FCC_UNICAST_PENDING
	CLIENT_STATE_FCC_UNICAST_ACTIVE
	CLIENT_STATE_FCC_MCAST_REQUESTED
	CLIENT_STATE_FCC_MCAST_ACTIVE
	CLIENT_STATE_ERROR
)

定义客户端状态常量

Variables

View Source
// ErrCacheClosed is a sentinel error carrying the message "cache item closed".
var ErrCacheClosed = errors.New("cache item closed")
View Source
// GlobalChannelManager is the package-level ChannelManager, created with
// NewChannelManager when the package is initialized.
var GlobalChannelManager = NewChannelManager()
View Source
// GlobalMultiChannelHub is the package-level MultiChannelHub, created with
// NewMultiChannelHub when the package is initialized.
var GlobalMultiChannelHub = NewMultiChannelHub()

Functions

func Close

func Close()

Close closes the global MultiChannelHub

func CopyHeader

func CopyHeader(dst, src http.Header, protoMajor int)

CopyHeader 复制 HTTP 头部,按协议版本过滤不兼容字段。参数:dst 为目标 Header(客户端或后端);src 为来源 Header(后端响应或客户端请求);protoMajor 为客户端使用的协议版本,1 = HTTP/1.x,2 = HTTP/2。

func CopyHeadersExceptSensitive

func CopyHeadersExceptSensitive(dst http.Header, src http.Header, protoMajor int)

CopyHeadersExceptSensitive 复制 HTTP 头部,跳过敏感或特殊头部

func CopyResponse

func CopyResponse(
	ctx context.Context,
	w http.ResponseWriter,
	r *http.Request,
	resp *http.Response,
	targetURL string,
	buf []byte,
	bufSize int,
	updateActive func(),
	statusCode int,
) error

CopyResponse 根据内容类型选择适当的复制方法

func CopyTSWithCache

func CopyTSWithCache(ctx context.Context, dst http.ResponseWriter, src io.Reader, key string) error

CopyTSWithCache 处理 TS 流缓存读取或从源读取写入响应

func CopyWithContext

func CopyWithContext(
	ctx context.Context,
	dst http.ResponseWriter,
	src io.Reader,
	buf []byte,
	bufSize int,
	updateActive func(),
	backendKey string,
) error

CopyWithContext 支持 HTTP hub 模式:按后端URL为键,单上游广播到所有前端

func Copytext

func Copytext(ctx context.Context, dst io.Writer, src io.Reader, buf []byte, updateActive func()) error

Copytext 流式复制 src -> dst,使用 buffer 池,bufio 内部缓存可控

func GetTargetPath

func GetTargetPath(r *http.Request) string

GetTargetPath 获取目标路径

func GetTargetURL

func GetTargetURL(r *http.Request, targetPath string) string

GetTargetURL 获取完整的目标URL

func HandleCopyError

func HandleCopyError(r *http.Request, err error, proxyResp *http.Response)

func HandleH264AacStream

func HandleH264AacStream(
	ctx context.Context,
	w http.ResponseWriter,
	client *gortsplib.Client,
	videoMedia *description.Media,
	videoFormat *format.H264,
	audioMedia *description.Media,
	audioFormat *format.MPEG4Audio,
	r *http.Request,
	rtspURL string,
	hub *StreamHubs,
	updateActive func(),
) error

func HandleMpegtsStream

func HandleMpegtsStream(
	ctx context.Context,
	w http.ResponseWriter,
	client *gortsplib.Client,
	videoMedia *description.Media,
	mpegtsFormat *format.MPEGTS,
	r *http.Request,
	rtspURL string,
	hub *StreamHubs,
	updateActive func(),
) error

func HandleProxyResponse

func HandleProxyResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, targetURL string, resp *http.Response, updateActive func())

统一处理响应(重定向、特殊类型、普通内容)

func InitOrUpdateTSCacheFromConfig

func InitOrUpdateTSCacheFromConfig()

func InitTSCacheFromConfig

func InitTSCacheFromConfig()

func IsSupportedContentType

func IsSupportedContentType(contentType string) bool

检查内容类型是否受支持

func IsTSRequest

func IsTSRequest(rawURL string) bool

func RemoveHTTPHub

func RemoveHTTPHub(key string)

func RemoveHub

func RemoveHub(streamURL string)

RemoveHub 强制移除一个 StreamHub

func RemoveHubIfEmpty

func RemoveHubIfEmpty(streamURL string, hub *StreamHubs)

RemoveHubIfEmpty 只有在没有客户端时才移除 StreamHub

func UpdateAllHubsConfig

func UpdateAllHubsConfig(newConfig interface{})

UpdateAllHubsConfig 动态更新所有 StreamHubs 的配置

func UpdateHubConfig

func UpdateHubConfig(streamURL string, newConfig interface{}) error

UpdateHubConfig 动态更新指定 StreamHub 的配置

Types

type BufferPool

type BufferPool struct {
	// contains filtered or unexported fields
}

添加一个简单的内存池实现

func NewBufferPool

func NewBufferPool() *BufferPool

func (*BufferPool) Get

func (bp *BufferPool) Get() []byte

func (*BufferPool) Put

func (bp *BufferPool) Put(buf []byte)

type BufferRef

// BufferRef is used for zero-copy buffer management.
type BufferRef struct {
	Source int // Data source: 0 = unknown, 1 = multicast, 2 = unicast (FCC)
	// contains filtered or unexported fields
}

BufferRef 用于零拷贝缓冲区管理

func NewBufferRef

func NewBufferRef(data []byte) *BufferRef

NewBufferRef 创建新的BufferRef实例

func NewPooledBufferRef

func NewPooledBufferRef(backing []byte, view []byte, pool *sync.Pool) *BufferRef

NewPooledBufferRef 创建绑定池的BufferRef实例(用于真零拷贝)

func (*BufferRef) Get

func (b *BufferRef) Get()

Get 增加引用数

func (*BufferRef) Put

func (b *BufferRef) Put()

Put 减少引用数,当引用数为0时可以回收内存

type ChannelManager

// ChannelManager is the channel manager type returned by NewChannelManager.
// NOTE(review): Wg presumably tracks the goroutine started by StartCleaner so
// that Stop can wait for it — confirm against the implementation.
type ChannelManager struct {
	Wg tsync.WaitGroup
	// contains filtered or unexported fields
}

func NewChannelManager

func NewChannelManager() *ChannelManager

NewChannelManager 创建新的频道管理器

func (*ChannelManager) Get

func (cm *ChannelManager) Get(channel string) *MulticastChannel

func (*ChannelManager) GetOrCreate

func (cm *ChannelManager) GetOrCreate(channel string) *MulticastChannel

func (*ChannelManager) StartCleaner

func (cm *ChannelManager) StartCleaner()

func (*ChannelManager) Stop

func (cm *ChannelManager) Stop()

Stop 停止清理器

type ClientFccState

type ClientFccState struct {
	// contains filtered or unexported fields
}

ClientFccState 存储每个客户端的FCC状态信息

func NewClientFccState

func NewClientFccState() *ClientFccState

NewClientFccState 创建新的客户端FCC状态

type FccSession

// FccSession holds the per-connection state of one FCC session.
// NOTE(review): field meanings below are inferred from the names and from the
// NewFccSession(connID, ch, startSeq) signature — confirm.
type FccSession struct {
	// Presumably the identifier of the client connection.
	ConnID     string
	// Presumably the identifier of the channel the session belongs to.
	ChannelID  string
	// Presumably the next sequence number this session will read.
	ReadSeq    int64
	// Presumably the time of the session's last activity (see Touch).
	LastActive time.Time
}

func NewFccSession

func NewFccSession(connID, ch string, startSeq int64) *FccSession

func (*FccSession) Touch

func (s *FccSession) Touch()

type HTTPHub

type HTTPHub struct {
	// contains filtered or unexported fields
}

func GetOrCreateHTTPHub

func GetOrCreateHTTPHub(rawURL string) *HTTPHub

func (*HTTPHub) AddClient

func (h *HTTPHub) AddClient(w http.ResponseWriter, bufSize int) *HTTPHubClient

func (*HTTPHub) Broadcast

func (h *HTTPHub) Broadcast(data []byte)

func (*HTTPHub) ClientCount

func (h *HTTPHub) ClientCount() int

func (*HTTPHub) Close

func (h *HTTPHub) Close()

func (*HTTPHub) EnsureProducer

func (h *HTTPHub) EnsureProducer(ctx context.Context, src io.Reader, buf []byte)

func (*HTTPHub) GetLastError

func (h *HTTPHub) GetLastError() error

新增方法:获取最后的错误

func (*HTTPHub) RemoveClient

func (h *HTTPHub) RemoveClient(c *HTTPHubClient)

func (*HTTPHub) SetError

func (h *HTTPHub) SetError(err error)

新增方法:设置流为错误状态

func (*HTTPHub) SetPlaying

func (h *HTTPHub) SetPlaying()

新增方法:设置流为播放状态

func (*HTTPHub) SetStopped

func (h *HTTPHub) SetStopped()

新增方法:设置流为停止状态

func (*HTTPHub) WaitForPlaying

func (h *HTTPHub) WaitForPlaying(ctx context.Context) bool

新增方法:等待流变为播放状态

type HTTPHubClient

type HTTPHubClient struct {
	// contains filtered or unexported fields
}

func (*HTTPHubClient) WriteLoop

func (c *HTTPHubClient) WriteLoop(ctx context.Context, updateActive func()) error

type MultiChannelHub

// MultiChannelHub holds a set of StreamHub instances in a string-keyed map.
// NOTE(review): Mu presumably guards Hubs, and the map keys are presumably
// the values produced by HubKey — confirm against the implementation.
type MultiChannelHub struct {
	Mu   sync.RWMutex
	Hubs map[string]*StreamHub

	Wg tsync.WaitGroup // Waits for the monitoring goroutine to finish
	// contains filtered or unexported fields
}

==================== MultiChannelHub ====================

func NewMultiChannelHub

func NewMultiChannelHub() *MultiChannelHub

func (*MultiChannelHub) Close

func (m *MultiChannelHub) Close()

Close 关闭 MultiChannelHub 并释放所有资源

func (*MultiChannelHub) GetOrCreateHub

func (m *MultiChannelHub) GetOrCreateHub(udpAddr string, ifaces []string) (*StreamHub, error)

func (*MultiChannelHub) HubKey

func (m *MultiChannelHub) HubKey(udpAddr string, ifaces []string) string

MD5(IP:Port@ifaces) 作为 Hub key

func (*MultiChannelHub) RemoveHub

func (m *MultiChannelHub) RemoveHub(udpAddr string)

func (*MultiChannelHub) RemoveHubEx

func (m *MultiChannelHub) RemoveHubEx(udpAddr string, ifaces []string)

func (*MultiChannelHub) RemoveHubSpecific

func (m *MultiChannelHub) RemoveHubSpecific(h *StreamHub, ifaces []string)

type MulticastChannel

// MulticastChannel groups a channel's ID, its TS ring-buffer cache and its
// FCC sessions.
// NOTE(review): Sessions is presumably keyed by connection ID, matching the
// connID parameter of AddSession and RemoveSession — confirm.
type MulticastChannel struct {
	ID       string
	Cache    *TsRingBuffer
	Sessions map[string]*FccSession
	// contains filtered or unexported fields
}

func NewMulticastChannel

func NewMulticastChannel(id string, cacheSize int) *MulticastChannel

func (*MulticastChannel) AddSession

func (mc *MulticastChannel) AddSession(connID string) *FccSession

func (*MulticastChannel) AddTsPacket

func (mc *MulticastChannel) AddTsPacket(pkt []byte)

func (*MulticastChannel) ReadForSession

func (mc *MulticastChannel) ReadForSession(s *FccSession) [][]byte

func (*MulticastChannel) RefCount

func (mc *MulticastChannel) RefCount() int32

func (*MulticastChannel) RemoveSession

func (mc *MulticastChannel) RemoveSession(connID string)

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

==================== RingBuffer 环形缓冲区 ====================

func NewRingBuffer

func NewRingBuffer(size int) *RingBuffer

func (*RingBuffer) GetAllRefs

func (r *RingBuffer) GetAllRefs() []*BufferRef

func (*RingBuffer) GetCount

func (r *RingBuffer) GetCount() int

GetCount 返回当前缓冲区中的元素数量

func (*RingBuffer) Push

func (r *RingBuffer) Push(item *BufferRef) (evicted *BufferRef)

func (*RingBuffer) PushWithReuse

func (r *RingBuffer) PushWithReuse(item *BufferRef) (evicted *BufferRef)

优化版Push,支持预分配和重用

func (*RingBuffer) Reset

func (r *RingBuffer) Reset()

Reset clears the ring buffer

type StreamHub

// StreamHub represents a multicast/unicast streaming hub.
// NOTE(review): the roles of the exported fields are not documented on this
// page; Mu presumably guards Clients, and AddCh/RemoveCh presumably carry
// client registrations and removals — confirm against the implementation.
type StreamHub struct {
	Mu          sync.RWMutex
	Clients     map[string]*hubClient
	AddCh       chan *hubClient
	RemoveCh    chan string
	UdpConns    []*net.UDPConn
	CacheBuffer *RingBuffer
	Closed      chan struct{}
	BufPool     *sync.Pool
	AddrList    []string

	LastFrame *BufferRef
	OnEmpty   func(*StreamHub)

	Wg tsync.WaitGroup // Waits for all background goroutines to finish
	// contains filtered or unexported fields
}

StreamHub represents a multicast/unicast streaming hub

func NewStreamHub

func NewStreamHub(addrs []string, ifaces []string) (*StreamHub, error)

==================== 创建新 Hub ====================

func (*StreamHub) Close

func (h *StreamHub) Close()

==================== 关闭 Hub ====================

func (*StreamHub) EnableFCC

func (h *StreamHub) EnableFCC(enabled bool)

EnableFCC 启用或禁用FCC功能

func (*StreamHub) GetFccCacheSize

func (h *StreamHub) GetFccCacheSize() int

GetFccCacheSize 获取FCC缓存大小

func (*StreamHub) GetFccPortMax

func (h *StreamHub) GetFccPortMax() int

GetFccPortMax 获取FCC监听端口最大值

func (*StreamHub) GetFccPortMin

func (h *StreamHub) GetFccPortMin() int

GetFccPortMin 获取FCC监听端口最小值

func (*StreamHub) GetFccServerAddr

func (h *StreamHub) GetFccServerAddr() *net.UDPAddr

GetFccServerAddr 获取FCC服务器地址

func (*StreamHub) GetFccState

func (h *StreamHub) GetFccState() int

GetFccState 获取FCC状态

func (*StreamHub) GetFccStateInfo

func (h *StreamHub) GetFccStateInfo() map[string]interface{}

GetFccStateInfo 获取FCC状态信息

func (*StreamHub) GetFccType

func (h *StreamHub) GetFccType() string

GetFccType 获取FCC类型

func (*StreamHub) GetRejoinInterval

func (h *StreamHub) GetRejoinInterval() time.Duration

GetRejoinInterval 获取重新加入间隔

func (*StreamHub) IsClosed

func (h *StreamHub) IsClosed() bool

==================== 判断 Hub 是否关闭 ====================

func (*StreamHub) IsFccActive

func (h *StreamHub) IsFccActive() bool

IsFccActive 检查FCC是否处于活动状态(已切换到多播)

func (*StreamHub) IsFccEnabled

func (h *StreamHub) IsFccEnabled() bool

IsFccEnabled 检查FCC是否启用

func (*StreamHub) ResetRejoinTimer

func (h *StreamHub) ResetRejoinTimer()

ResetRejoinTimer 重置重新加入定时器

func (*StreamHub) ServeHTTP

func (h *StreamHub) ServeHTTP(w http.ResponseWriter, r *http.Request, contentType string, updateActive func())

==================== HTTP 播放 ====================

func (*StreamHub) SetFccParams

func (h *StreamHub) SetFccParams(cacheSize, portMin, portMax int)

SetFccParams 设置FCC参数

func (*StreamHub) SetFccServerAddr

func (h *StreamHub) SetFccServerAddr(addr string) error

SetFccServerAddr 设置FCC服务器地址

func (*StreamHub) SetFccType

func (h *StreamHub) SetFccType(fccType string)

SetFccType 设置FCC类型

func (*StreamHub) SetRejoinInterval

func (h *StreamHub) SetRejoinInterval(interval time.Duration)

SetRejoinInterval 设置重新加入间隔

func (*StreamHub) TransferClientsTo

func (h *StreamHub) TransferClientsTo(newHub *StreamHub)

==================== 客户端迁移到新 Hub ====================

func (*StreamHub) UpdateInterfaces

func (h *StreamHub) UpdateInterfaces(ifaces []string) error

==================== 更新 Hub 的接口 ====================

func (*StreamHub) UpdateRejoinTimer

func (h *StreamHub) UpdateRejoinTimer()

UpdateRejoinTimer 更新重新加入定时器

func (*StreamHub) WaitClosed

func (h *StreamHub) WaitClosed()

WaitClosed 等待 Hub 完全关闭并释放所有资源

func (*StreamHub) WaitForPlaying

func (h *StreamHub) WaitForPlaying(ctx context.Context) bool

==================== 等待播放状态 ====================

type StreamHubs

type StreamHubs struct {
	// contains filtered or unexported fields
}

func GetOrCreateHubs

func GetOrCreateHubs(streamURL string) *StreamHubs

GetOrCreateHubs 获取或创建一个 StreamHubs

func NewStreamHubs

func NewStreamHubs() *StreamHubs

func (*StreamHubs) AddClient

func (hub *StreamHubs) AddClient(ch *ringbuffer.RingBuffer)

func (*StreamHubs) AddWG

func (hub *StreamHubs) AddWG(n int)

AddWG 为 hub 添加等待组计数

func (*StreamHubs) Broadcast

func (hub *StreamHubs) Broadcast(data []byte)

func (*StreamHubs) ClearMediaInfo

func (h *StreamHubs) ClearMediaInfo()

ClearMediaInfo 清除媒体信息

func (*StreamHubs) ClientCount

func (hub *StreamHubs) ClientCount() int

func (*StreamHubs) Close

func (hub *StreamHubs) Close()

func (*StreamHubs) DoneWG

func (hub *StreamHubs) DoneWG()

DoneWG 减少 hub 等待组计数

func (*StreamHubs) GetContext

func (hub *StreamHubs) GetContext() context.Context

GetContext 获取 hub 的上下文

func (*StreamHubs) GetLastError

func (hub *StreamHubs) GetLastError() error

新增方法:获取最后的错误

func (*StreamHubs) GetMediaInfo

func (h *StreamHubs) GetMediaInfo() (*description.Media, interface{}, *description.Media, *format.MPEG4Audio)

GetMediaInfo retrieves the stored video and audio media and formats

func (*StreamHubs) GetRtspClient

func (hub *StreamHubs) GetRtspClient() *gortsplib.Client

新增方法:获取RTSP客户端

func (*StreamHubs) GetSetupLock

func (h *StreamHubs) GetSetupLock()

GetSetupLock 获取初始化锁

func (*StreamHubs) Go

func (hub *StreamHubs) Go(f func())

Go 启动一个协程并自动管理 WaitGroup 的计数

func (*StreamHubs) HasRtspClient

func (hub *StreamHubs) HasRtspClient() bool

新增方法:检查RTSP客户端是否存在

func (*StreamHubs) ReleaseSetupLock

func (h *StreamHubs) ReleaseSetupLock()

ReleaseSetupLock 释放初始化锁

func (*StreamHubs) RemoveClient

func (hub *StreamHubs) RemoveClient(ch *ringbuffer.RingBuffer)

func (*StreamHubs) SetAudioMediaInfo

func (h *StreamHubs) SetAudioMediaInfo(media *description.Media, format *format.MPEG4Audio)

SetAudioMediaInfo stores the audio media and format for reuse

func (*StreamHubs) SetError

func (hub *StreamHubs) SetError(err error)

SetError 设置流为错误状态

func (*StreamHubs) SetMediaInfo

func (h *StreamHubs) SetMediaInfo(media *description.Media, format interface{})

SetMediaInfo stores the video media and format for reuse

func (*StreamHubs) SetPlaying

func (hub *StreamHubs) SetPlaying()

新增方法:设置流为播放状态

func (*StreamHubs) SetRtspClient

func (hub *StreamHubs) SetRtspClient(client *gortsplib.Client)

新增方法:设置RTSP客户端

func (*StreamHubs) SetStopped

func (hub *StreamHubs) SetStopped()

新增方法:设置流为停止状态

func (*StreamHubs) WaitForPlaying

func (hub *StreamHubs) WaitForPlaying(ctx context.Context) bool

新增方法:等待流变为播放状态

type TSCache

type TSCache struct {
	// contains filtered or unexported fields
}
var GlobalTSCache *TSCache

func NewTSCache

func NewTSCache(maxBytes int64, ttl time.Duration) *TSCache

func (*TSCache) Close

func (c *TSCache) Close()

func (*TSCache) Get

func (c *TSCache) Get(key string) (*tsCacheItem, bool)

func (*TSCache) GetOrCreate

func (c *TSCache) GetOrCreate(key string) (*tsCacheItem, bool)

func (*TSCache) Remove

func (c *TSCache) Remove(key string)

func (*TSCache) UpdateConfig

func (c *TSCache) UpdateConfig(newMaxBytes int64, newTTL time.Duration)

UpdateConfig 更新缓存配置

func (*TSCache) WriteChunkWithByteTracking

func (c *TSCache) WriteChunkWithByteTracking(item *tsCacheItem, data []byte)

WriteChunkWithByteTracking 向缓存项写入数据块,并跟踪字节计数到父缓存

type TsRingBuffer

type TsRingBuffer struct {
	// contains filtered or unexported fields
}

func NewTsRingBuffer

func NewTsRingBuffer(size int) *TsRingBuffer

func (*TsRingBuffer) ReadFrom

func (rb *TsRingBuffer) ReadFrom(seq int64) (out [][]byte, nextSeq int64)

func (*TsRingBuffer) Reset

func (rb *TsRingBuffer) Reset()

func (*TsRingBuffer) TailSeq

func (rb *TsRingBuffer) TailSeq() int64

func (*TsRingBuffer) Write

func (rb *TsRingBuffer) Write(pkt []byte)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL