Documentation
¶
Index ¶
- Constants
- Variables
- func Close()
- func CopyHeader(dst, src http.Header, protoMajor int)
- func CopyHeadersExceptSensitive(dst http.Header, src http.Header, protoMajor int)
- func CopyResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, ...) error
- func CopyTSWithCache(ctx context.Context, dst http.ResponseWriter, src io.Reader, key string) error
- func CopyWithContext(ctx context.Context, dst http.ResponseWriter, src io.Reader, buf []byte, ...) error
- func Copytext(ctx context.Context, dst io.Writer, src io.Reader, buf []byte, ...) error
- func GetTargetPath(r *http.Request) string
- func GetTargetURL(r *http.Request, targetPath string) string
- func HandleCopyError(r *http.Request, err error, proxyResp *http.Response)
- func HandleH264AacStream(ctx context.Context, w http.ResponseWriter, client *gortsplib.Client, ...) error
- func HandleMpegtsStream(ctx context.Context, w http.ResponseWriter, client *gortsplib.Client, ...) error
- func HandleProxyResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, targetURL string, ...)
- func InitOrUpdateTSCacheFromConfig()
- func InitTSCacheFromConfig()
- func IsSupportedContentType(contentType string) bool
- func IsTSRequest(rawURL string) bool
- func RemoveHTTPHub(key string)
- func RemoveHub(streamURL string)
- func RemoveHubIfEmpty(streamURL string, hub *StreamHubs)
- func UpdateAllHubsConfig(newConfig interface{})
- func UpdateHubConfig(streamURL string, newConfig interface{}) error
- type BufferPool
- type BufferRef
- type ChannelManager
- type ClientFccState
- type FccSession
- type HTTPHub
- func (h *HTTPHub) AddClient(w http.ResponseWriter, bufSize int) *HTTPHubClient
- func (h *HTTPHub) Broadcast(data []byte)
- func (h *HTTPHub) ClientCount() int
- func (h *HTTPHub) Close()
- func (h *HTTPHub) EnsureProducer(ctx context.Context, src io.Reader, buf []byte)
- func (h *HTTPHub) GetLastError() error
- func (h *HTTPHub) RemoveClient(c *HTTPHubClient)
- func (h *HTTPHub) SetError(err error)
- func (h *HTTPHub) SetPlaying()
- func (h *HTTPHub) SetStopped()
- func (h *HTTPHub) WaitForPlaying(ctx context.Context) bool
- type HTTPHubClient
- type MultiChannelHub
- func (m *MultiChannelHub) Close()
- func (m *MultiChannelHub) GetOrCreateHub(udpAddr string, ifaces []string) (*StreamHub, error)
- func (m *MultiChannelHub) HubKey(udpAddr string, ifaces []string) string
- func (m *MultiChannelHub) RemoveHub(udpAddr string)
- func (m *MultiChannelHub) RemoveHubEx(udpAddr string, ifaces []string)
- func (m *MultiChannelHub) RemoveHubSpecific(h *StreamHub, ifaces []string)
- type MulticastChannel
- type RingBuffer
- type StreamHub
- func (h *StreamHub) Close()
- func (h *StreamHub) EnableFCC(enabled bool)
- func (h *StreamHub) GetFccCacheSize() int
- func (h *StreamHub) GetFccPortMax() int
- func (h *StreamHub) GetFccPortMin() int
- func (h *StreamHub) GetFccServerAddr() *net.UDPAddr
- func (h *StreamHub) GetFccState() int
- func (h *StreamHub) GetFccStateInfo() map[string]interface{}
- func (h *StreamHub) GetFccType() string
- func (h *StreamHub) GetRejoinInterval() time.Duration
- func (h *StreamHub) IsClosed() bool
- func (h *StreamHub) IsFccActive() bool
- func (h *StreamHub) IsFccEnabled() bool
- func (h *StreamHub) ResetRejoinTimer()
- func (h *StreamHub) ServeHTTP(w http.ResponseWriter, r *http.Request, contentType string, ...)
- func (h *StreamHub) SetFccParams(cacheSize, portMin, portMax int)
- func (h *StreamHub) SetFccServerAddr(addr string) error
- func (h *StreamHub) SetFccType(fccType string)
- func (h *StreamHub) SetRejoinInterval(interval time.Duration)
- func (h *StreamHub) TransferClientsTo(newHub *StreamHub)
- func (h *StreamHub) UpdateInterfaces(ifaces []string) error
- func (h *StreamHub) UpdateRejoinTimer()
- func (h *StreamHub) WaitClosed()
- func (h *StreamHub) WaitForPlaying(ctx context.Context) bool
- type StreamHubs
- func (hub *StreamHubs) AddClient(ch *ringbuffer.RingBuffer)
- func (hub *StreamHubs) AddWG(n int)
- func (hub *StreamHubs) Broadcast(data []byte)
- func (h *StreamHubs) ClearMediaInfo()
- func (hub *StreamHubs) ClientCount() int
- func (hub *StreamHubs) Close()
- func (hub *StreamHubs) DoneWG()
- func (hub *StreamHubs) GetContext() context.Context
- func (hub *StreamHubs) GetLastError() error
- func (h *StreamHubs) GetMediaInfo() (*description.Media, interface{}, *description.Media, *format.MPEG4Audio)
- func (hub *StreamHubs) GetRtspClient() *gortsplib.Client
- func (h *StreamHubs) GetSetupLock()
- func (hub *StreamHubs) Go(f func())
- func (hub *StreamHubs) HasRtspClient() bool
- func (h *StreamHubs) ReleaseSetupLock()
- func (hub *StreamHubs) RemoveClient(ch *ringbuffer.RingBuffer)
- func (h *StreamHubs) SetAudioMediaInfo(media *description.Media, format *format.MPEG4Audio)
- func (hub *StreamHubs) SetError(err error)
- func (h *StreamHubs) SetMediaInfo(media *description.Media, format interface{})
- func (hub *StreamHubs) SetPlaying()
- func (hub *StreamHubs) SetRtspClient(client *gortsplib.Client)
- func (hub *StreamHubs) SetStopped()
- func (hub *StreamHubs) WaitForPlaying(ctx context.Context) bool
- type TSCache
- func (c *TSCache) Close()
- func (c *TSCache) Get(key string) (*tsCacheItem, bool)
- func (c *TSCache) GetOrCreate(key string) (*tsCacheItem, bool)
- func (c *TSCache) Remove(key string)
- func (c *TSCache) UpdateConfig(newMaxBytes int64, newTTL time.Duration)
- func (c *TSCache) WriteChunkWithByteTracking(item *tsCacheItem, data []byte)
- type TsRingBuffer
Constants ¶
const (
	FCC_TYPE_TELECOM = 0 // Telecom/ZTE/Fiberhome
	FCC_TYPE_HUAWEI = 1 // Huawei
)
FCC Protocol Type - Based on vendor and port
const (
	FCC_TIMEOUT_SIGNALING_MS = 80 // 信令阶段超时(FCC_STATE_REQUESTED或FCC_STATE_UNICAST_PENDING)
	FCC_TIMEOUT_UNICAST_SEC = 1.0 // 单播媒体包超时(FCC_STATE_UNICAST_ACTIVE)
	FCC_TIMEOUT_SYNC_WAIT_SEC = 15.0 // 等待服务器同步通知的最大时间
)
FCC Timeout Configuration
const (
	FCC_STATE_INIT = 0 // 初始状态
	FCC_STATE_REQUESTED = 1 // 已发送FCC请求,等待服务器响应
	FCC_STATE_UNICAST_PENDING = 2 // 服务器已接受,等待第一个单播包
	FCC_STATE_UNICAST_ACTIVE = 3 // 正在接收FCC单播流
	FCC_STATE_MCAST_REQUESTED = 4 // 通知服务器加入组播,正在转换中
	FCC_STATE_MCAST_ACTIVE = 5 // 已完全切换到组播接收
	FCC_STATE_ERROR = 6 // 错误状态
)
FCC State Machine - Based on Fast Channel Change Protocol
const (
	// Telecom FCC Packet Types
	FCC_FMT_TELECOM_REQ = 2 // 电信FCC请求包 (FMT 2)
	FCC_FMT_TELECOM_RESP = 3 // 电信FCC响应包 (FMT 3)
	FCC_FMT_TELECOM_SYNC = 4 // 电信FCC同步通知 (FMT 4)
	FCC_FMT_TELECOM_TERM = 5 // 电信FCC终止包 (FMT 5)
	// Huawei FCC Packet Types
	FCC_FMT_HUAWEI_REQ = 5 // 华为FCC请求包 (FMT 5)
	FCC_FMT_HUAWEI_RESP = 6 // 华为FCC响应包 (FMT 6)
	FCC_FMT_HUAWEI_NAT = 12 // 华为FCC NAT穿越包 (FMT 12)
	FCC_FMT_HUAWEI_SYNC = 8 // 华为FCC同步通知 (FMT 8)
	FCC_FMT_HUAWEI_TERM = 9 // 华为FCC终止包 (FMT 9)
)
FCC RTCP Packet Format Types
const ( StateStopped = iota StatePlaying StateError )
const ( SourceUnknown = iota SourceMulticast SourceUnicast )
const ( RTP_VERSION = 2 P_MPGA = 14 P_MPGV = 32 NULL_PID = 0x1FFF PAT_PID = 0x0000 PMT_PID = 0x1000 )
const ( CLIENT_STATE_FCC_INIT = iota CLIENT_STATE_FCC_REQUESTED CLIENT_STATE_FCC_UNICAST_PENDING CLIENT_STATE_FCC_UNICAST_ACTIVE CLIENT_STATE_FCC_MCAST_REQUESTED CLIENT_STATE_FCC_MCAST_ACTIVE CLIENT_STATE_ERROR )
定义客户端状态常量
Variables ¶
var ErrCacheClosed = errors.New("cache item closed")
var GlobalChannelManager = NewChannelManager()
var GlobalMultiChannelHub = NewMultiChannelHub()
Functions ¶
func CopyHeader ¶
CopyHeader 复制 HTTP 头部,按协议版本过滤不兼容字段
- dst: 目标 Header(客户端或后端)
- src: 来源 Header(后端响应或客户端请求)
- protoMajor: 客户端使用的协议版本,1 = HTTP/1.x, 2 = HTTP/2
func CopyHeadersExceptSensitive ¶
CopyHeadersExceptSensitive 复制 HTTP 头部,跳过敏感或特殊头部
func CopyResponse ¶
func CopyResponse( ctx context.Context, w http.ResponseWriter, r *http.Request, resp *http.Response, targetURL string, buf []byte, bufSize int, updateActive func(), statusCode int, ) error
CopyResponse 根据内容类型选择适当的复制方法
func CopyTSWithCache ¶
CopyTSWithCache 处理 TS 流缓存读取或从源读取写入响应
func CopyWithContext ¶
func CopyWithContext( ctx context.Context, dst http.ResponseWriter, src io.Reader, buf []byte, bufSize int, updateActive func(), backendKey string, ) error
CopyWithContext 支持 HTTP hub 模式:按后端URL为键,单上游广播到所有前端
func Copytext ¶
func Copytext(ctx context.Context, dst io.Writer, src io.Reader, buf []byte, updateActive func()) error
Copytext 流式复制 src -> dst,使用 buffer 池,bufio 内部缓存可控
func GetTargetURL ¶
GetTargetURL 获取完整的目标URL
func HandleH264AacStream ¶
func HandleH264AacStream( ctx context.Context, w http.ResponseWriter, client *gortsplib.Client, videoMedia *description.Media, videoFormat *format.H264, audioMedia *description.Media, audioFormat *format.MPEG4Audio, r *http.Request, rtspURL string, hub *StreamHubs, updateActive func(), ) error
func HandleMpegtsStream ¶
func HandleMpegtsStream( ctx context.Context, w http.ResponseWriter, client *gortsplib.Client, videoMedia *description.Media, mpegtsFormat *format.MPEGTS, r *http.Request, rtspURL string, hub *StreamHubs, updateActive func(), ) error
func HandleProxyResponse ¶
func HandleProxyResponse(ctx context.Context, w http.ResponseWriter, r *http.Request, targetURL string, resp *http.Response, updateActive func())
HandleProxyResponse 统一处理响应(重定向、特殊类型、普通内容)
func InitOrUpdateTSCacheFromConfig ¶
func InitOrUpdateTSCacheFromConfig()
func InitTSCacheFromConfig ¶
func InitTSCacheFromConfig()
func IsTSRequest ¶
func RemoveHTTPHub ¶
func RemoveHTTPHub(key string)
func RemoveHubIfEmpty ¶
func RemoveHubIfEmpty(streamURL string, hub *StreamHubs)
RemoveHubIfEmpty 只有在没有客户端时才移除 StreamHub
func UpdateAllHubsConfig ¶
func UpdateAllHubsConfig(newConfig interface{})
UpdateAllHubsConfig 动态更新所有 StreamHubs 的配置
func UpdateHubConfig ¶
UpdateHubConfig 动态更新指定 StreamHub 的配置
Types ¶
type BufferPool ¶
type BufferPool struct {
// contains filtered or unexported fields
}
BufferPool 是一个简单的内存池实现
func NewBufferPool ¶
func NewBufferPool() *BufferPool
func (*BufferPool) Get ¶
func (bp *BufferPool) Get() []byte
func (*BufferPool) Put ¶
func (bp *BufferPool) Put(buf []byte)
type BufferRef ¶
type BufferRef struct {
Source int // 数据来源: 0=未知, 1=多播, 2=单播(FCC)
// contains filtered or unexported fields
}
BufferRef 用于零拷贝缓冲区管理
func NewPooledBufferRef ¶
NewPooledBufferRef 创建绑定池的BufferRef实例(用于真零拷贝)
type ChannelManager ¶
func (*ChannelManager) Get ¶
func (cm *ChannelManager) Get(channel string) *MulticastChannel
func (*ChannelManager) GetOrCreate ¶
func (cm *ChannelManager) GetOrCreate(channel string) *MulticastChannel
func (*ChannelManager) StartCleaner ¶
func (cm *ChannelManager) StartCleaner()
type ClientFccState ¶
type ClientFccState struct {
// contains filtered or unexported fields
}
ClientFccState 存储每个客户端的FCC状态信息
type FccSession ¶
func NewFccSession ¶
func NewFccSession(connID, ch string, startSeq int64) *FccSession
func (*FccSession) Touch ¶
func (s *FccSession) Touch()
type HTTPHub ¶
type HTTPHub struct {
// contains filtered or unexported fields
}
func GetOrCreateHTTPHub ¶
func (*HTTPHub) AddClient ¶
func (h *HTTPHub) AddClient(w http.ResponseWriter, bufSize int) *HTTPHubClient
func (*HTTPHub) ClientCount ¶
func (*HTTPHub) EnsureProducer ¶
func (*HTTPHub) RemoveClient ¶
func (h *HTTPHub) RemoveClient(c *HTTPHubClient)
type HTTPHubClient ¶
type HTTPHubClient struct {
// contains filtered or unexported fields
}
type MultiChannelHub ¶
type MultiChannelHub struct {
Mu sync.RWMutex
Hubs map[string]*StreamHub
Wg tsync.WaitGroup // 等待监控 goroutine 结束
// contains filtered or unexported fields
}
==================== MultiChannelHub ====================
func NewMultiChannelHub ¶
func NewMultiChannelHub() *MultiChannelHub
func (*MultiChannelHub) GetOrCreateHub ¶
func (m *MultiChannelHub) GetOrCreateHub(udpAddr string, ifaces []string) (*StreamHub, error)
func (*MultiChannelHub) HubKey ¶
func (m *MultiChannelHub) HubKey(udpAddr string, ifaces []string) string
MD5(IP:Port@ifaces) 作为 Hub key
func (*MultiChannelHub) RemoveHub ¶
func (m *MultiChannelHub) RemoveHub(udpAddr string)
func (*MultiChannelHub) RemoveHubEx ¶
func (m *MultiChannelHub) RemoveHubEx(udpAddr string, ifaces []string)
func (*MultiChannelHub) RemoveHubSpecific ¶
func (m *MultiChannelHub) RemoveHubSpecific(h *StreamHub, ifaces []string)
type MulticastChannel ¶
type MulticastChannel struct {
ID string
Cache *TsRingBuffer
Sessions map[string]*FccSession
// contains filtered or unexported fields
}
func NewMulticastChannel ¶
func NewMulticastChannel(id string, cacheSize int) *MulticastChannel
func (*MulticastChannel) AddSession ¶
func (mc *MulticastChannel) AddSession(connID string) *FccSession
func (*MulticastChannel) AddTsPacket ¶
func (mc *MulticastChannel) AddTsPacket(pkt []byte)
func (*MulticastChannel) ReadForSession ¶
func (mc *MulticastChannel) ReadForSession(s *FccSession) [][]byte
func (*MulticastChannel) RefCount ¶
func (mc *MulticastChannel) RefCount() int32
func (*MulticastChannel) RemoveSession ¶
func (mc *MulticastChannel) RemoveSession(connID string)
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
==================== RingBuffer 环形缓冲区 ====================
func NewRingBuffer ¶
func NewRingBuffer(size int) *RingBuffer
func (*RingBuffer) GetAllRefs ¶
func (r *RingBuffer) GetAllRefs() []*BufferRef
func (*RingBuffer) Push ¶
func (r *RingBuffer) Push(item *BufferRef) (evicted *BufferRef)
func (*RingBuffer) PushWithReuse ¶
func (r *RingBuffer) PushWithReuse(item *BufferRef) (evicted *BufferRef)
优化版Push,支持预分配和重用
type StreamHub ¶
type StreamHub struct {
Mu sync.RWMutex
Clients map[string]*hubClient
AddCh chan *hubClient
RemoveCh chan string
UdpConns []*net.UDPConn
CacheBuffer *RingBuffer
Closed chan struct{}
BufPool *sync.Pool
AddrList []string
LastFrame *BufferRef
OnEmpty func(*StreamHub)
Wg tsync.WaitGroup // 等待所有后台 goroutine 结束
// contains filtered or unexported fields
}
StreamHub represents a multicast/unicast streaming hub
func NewStreamHub ¶
==================== 创建新 Hub ====================
func (*StreamHub) Close ¶
func (h *StreamHub) Close()
==================== 关闭 Hub ====================
func (*StreamHub) GetFccCacheSize ¶
GetFccCacheSize 获取FCC缓存大小
func (*StreamHub) GetFccPortMax ¶
GetFccPortMax 获取FCC监听端口最大值
func (*StreamHub) GetFccPortMin ¶
GetFccPortMin 获取FCC监听端口最小值
func (*StreamHub) GetFccServerAddr ¶
GetFccServerAddr 获取FCC服务器地址
func (*StreamHub) GetFccStateInfo ¶
GetFccStateInfo 获取FCC状态信息
func (*StreamHub) GetRejoinInterval ¶
GetRejoinInterval 获取重新加入间隔
func (*StreamHub) IsFccActive ¶
IsFccActive 检查FCC是否处于活动状态(已切换到多播)
func (*StreamHub) ResetRejoinTimer ¶
func (h *StreamHub) ResetRejoinTimer()
ResetRejoinTimer 重置重新加入定时器
func (*StreamHub) ServeHTTP ¶
func (h *StreamHub) ServeHTTP(w http.ResponseWriter, r *http.Request, contentType string, updateActive func())
==================== HTTP 播放 ====================
func (*StreamHub) SetFccParams ¶
SetFccParams 设置FCC参数
func (*StreamHub) SetFccServerAddr ¶
SetFccServerAddr 设置FCC服务器地址
func (*StreamHub) SetRejoinInterval ¶
SetRejoinInterval 设置重新加入间隔
func (*StreamHub) TransferClientsTo ¶
==================== 客户端迁移到新 Hub ====================
func (*StreamHub) UpdateInterfaces ¶
==================== 更新 Hub 的接口 ====================
func (*StreamHub) UpdateRejoinTimer ¶
func (h *StreamHub) UpdateRejoinTimer()
UpdateRejoinTimer 更新重新加入定时器
type StreamHubs ¶
type StreamHubs struct {
// contains filtered or unexported fields
}
func GetOrCreateHubs ¶
func GetOrCreateHubs(streamURL string) *StreamHubs
GetOrCreateHubs 获取或创建一个 StreamHubs
func NewStreamHubs ¶
func NewStreamHubs() *StreamHubs
func (*StreamHubs) AddClient ¶
func (hub *StreamHubs) AddClient(ch *ringbuffer.RingBuffer)
func (*StreamHubs) Broadcast ¶
func (hub *StreamHubs) Broadcast(data []byte)
func (*StreamHubs) ClientCount ¶
func (hub *StreamHubs) ClientCount() int
func (*StreamHubs) Close ¶
func (hub *StreamHubs) Close()
func (*StreamHubs) GetContext ¶
func (hub *StreamHubs) GetContext() context.Context
GetContext 获取 hub 的上下文
func (*StreamHubs) GetMediaInfo ¶
func (h *StreamHubs) GetMediaInfo() (*description.Media, interface{}, *description.Media, *format.MPEG4Audio)
GetMediaInfo retrieves the stored video media and format, together with the stored audio media and MPEG-4 audio format
func (*StreamHubs) GetRtspClient ¶
func (hub *StreamHubs) GetRtspClient() *gortsplib.Client
GetRtspClient 获取RTSP客户端
func (*StreamHubs) ReleaseSetupLock ¶
func (h *StreamHubs) ReleaseSetupLock()
ReleaseSetupLock 释放初始化锁
func (*StreamHubs) RemoveClient ¶
func (hub *StreamHubs) RemoveClient(ch *ringbuffer.RingBuffer)
func (*StreamHubs) SetAudioMediaInfo ¶
func (h *StreamHubs) SetAudioMediaInfo(media *description.Media, format *format.MPEG4Audio)
SetAudioMediaInfo stores the audio media and format for reuse
func (*StreamHubs) SetMediaInfo ¶
func (h *StreamHubs) SetMediaInfo(media *description.Media, format interface{})
SetMediaInfo stores the video media and format for reuse
func (*StreamHubs) SetRtspClient ¶
func (hub *StreamHubs) SetRtspClient(client *gortsplib.Client)
SetRtspClient 设置RTSP客户端
func (*StreamHubs) WaitForPlaying ¶
func (hub *StreamHubs) WaitForPlaying(ctx context.Context) bool
WaitForPlaying 等待流变为播放状态
type TSCache ¶
type TSCache struct {
// contains filtered or unexported fields
}
var GlobalTSCache *TSCache
func (*TSCache) GetOrCreate ¶
func (*TSCache) UpdateConfig ¶
UpdateConfig 更新缓存配置
func (*TSCache) WriteChunkWithByteTracking ¶
WriteChunkWithByteTracking 向缓存项写入数据块,并跟踪字节计数到父缓存
type TsRingBuffer ¶
type TsRingBuffer struct {
// contains filtered or unexported fields
}
func NewTsRingBuffer ¶
func NewTsRingBuffer(size int) *TsRingBuffer
func (*TsRingBuffer) ReadFrom ¶
func (rb *TsRingBuffer) ReadFrom(seq int64) (out [][]byte, nextSeq int64)
func (*TsRingBuffer) Reset ¶
func (rb *TsRingBuffer) Reset()
func (*TsRingBuffer) TailSeq ¶
func (rb *TsRingBuffer) TailSeq() int64
func (*TsRingBuffer) Write ¶
func (rb *TsRingBuffer) Write(pkt []byte)