Documentation
¶
Index ¶
- Constants
- Variables
- func EstimateFPS(frameIntervals []time.Duration) float64
- func RunStandardStream(stream *Stream, sink StandardStreamSink, ch StreamSinkChan)
- type Camera
- func (c *Camera) Close(wg *sync.WaitGroup)
- func (c *Camera) ExtractHighRes(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
- func (c *Camera) GetStream(res defs.Resolution) *Stream
- func (c *Camera) HighResRecordingStreamName() string
- func (c *Camera) ID() int64
- func (c *Camera) LastPacketAt() time.Time
- func (c *Camera) LatestImage(contentType string) []byte
- func (c *Camera) LongLivedName() string
- func (c *Camera) LowResRecordingStreamName() string
- func (c *Camera) Name() string
- func (c *Camera) RecordingStreamName(resolution defs.Resolution) string
- func (c *Camera) Start() error
- type CameraBrands
- type CameraRTSPInfo
- type ExtractMethod
- type FullChannelPolicy
- type OnvifDeviceInfo
- type RingBufferListener
- type StandardStreamSink
- type Stream
- func (s *Stream) Close(wg *sync.WaitGroup)
- func (s *Stream) ConnectSink(name string, sink StreamSinkChan)
- func (s *Stream) ConnectSinkAndRun(name string, sink StandardStreamSink) error
- func (s *Stream) Info() *StreamInfo
- func (s *Stream) LastPacketReceivedAt() time.Time
- func (s *Stream) Listen(address string) error
- func (s *Stream) RecentFrameStats() StreamStats
- func (s *Stream) RecentFrameTimes() []float64
- func (s *Stream) RemoveSink(sink StreamSinkChan)
- type StreamInfo
- type StreamMsg
- type StreamMsgType
- type StreamSinkChan
- type StreamStats
- type VideoDecodeReader
- func (r *VideoDecodeReader) Close()
- func (r *VideoDecodeReader) GetLastImageIfDifferent(ifNotEqualTo int64) (*accel.YUVImage, int64, time.Time)
- func (r *VideoDecodeReader) LastImageCopy() (*accel.YUVImage, int64)
- func (r *VideoDecodeReader) LastPacketAt() time.Time
- func (r *VideoDecodeReader) OnConnect(stream *Stream) (StreamSinkChan, error)
- func (r *VideoDecodeReader) OnPacketRTP(packet *videox.VideoPacket)
- type VideoRecorder
- type VideoRingBuffer
- func (r *VideoRingBuffer) AddPacketListener(name string, c chan *videox.VideoPacket, policy FullChannelPolicy)
- func (r *VideoRingBuffer) Close()
- func (r *VideoRingBuffer) ExtractRawBuffer(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
- func (r *VideoRingBuffer) ExtractRawBufferNoLock(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
- func (r *VideoRingBuffer) FindLatestIDRPacketNoLock() int
- func (r *VideoRingBuffer) OnConnect(stream *Stream) (StreamSinkChan, error)
- func (r *VideoRingBuffer) OnPacketRTP(packet *videox.VideoPacket)
- func (r *VideoRingBuffer) RemovePacketListener(c chan *videox.VideoPacket)
Constants ¶
const StreamSinkChanDefaultBufferSize = 10
There isn't much rhyme or reason behind this number UPDATE: While running on RPi5, 2 seems to be too small. We frequently get blocking on various stream sinks. Specifically, I'm seeing blocking of up to 4ms on these sinks: 'LD Decode', 'HD Ring'. So I'm raising the size of this buffer from 2 to 4. Blocking when sending to these sinks is very bad, because if you do it enough, you end up losing incoming camera packets. 4 is not enough. Trying 10.
Variables ¶
var AllCameraBrands []CameraBrands
AllCameraBrands is an array of all camera model names, excluding "Unknown"
Functions ¶
func EstimateFPS ¶
Given a set of consecutive frame intervals, estimate the average frames per second. The value is a float64 because cameras can be configured for less than 1 FPS. The numbers I've seen on Hikvision are 1/2, 1/4, 1/8, 1/16
func RunStandardStream ¶
func RunStandardStream(stream *Stream, sink StandardStreamSink, ch StreamSinkChan)
A generic message loop that should cater for most streams
Types ¶
type Camera ¶
type Camera struct {
// Copy from the config database.
// The pointer can be modified in-place if the camera is reconfigured.
// This is shared state, so do not modify the Config fields directly.
// Instead, use Config.Store() to atomically store an entirely new config.
Config atomic.Pointer[configdb.Camera]
Log logs.Log
LowStream *Stream
HighStream *Stream
HighDumper *VideoRingBuffer
LowDecoder *VideoDecodeReader
LowDumper *VideoRingBuffer
// contains filtered or unexported fields
}
Camera represents a single physical camera, with two streams (high and low res)
func (*Camera) Close ¶
Close the camera. If wg is not nil, then you must use it to signal when all of your resources are closed.
func (*Camera) ExtractHighRes ¶
func (c *Camera) ExtractHighRes(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
Extract from <now - duration> until <now>. duration is a positive number.
func (*Camera) GetStream ¶
func (c *Camera) GetStream(res defs.Resolution) *Stream
Get either the high or low resolution stream
func (*Camera) HighResRecordingStreamName ¶
The name of the high res recording stream in the video archive
func (*Camera) LastPacketAt ¶
Return the time of the most recently received packet from the camera. But NOTE! We return the oldest such packet of the two streams. In other words, we return min(LowStream.LastPacketReceivedAt(), HighStream.LastPacketReceivedAt()). This is to work around an issue where I have a camera who's HD stream frequently fails to send any packets, but the LD stream works fine. I'm not sure if this is a bug in gortsplib, but this seems like a robust strategy to have anyway. This bug only manifests during startup. Once you start receiving packets, it generally keeps going forever.
func (*Camera) LatestImage ¶
func (*Camera) LongLivedName ¶
func (*Camera) LowResRecordingStreamName ¶
The name of the low res recording stream in the video archive
func (*Camera) RecordingStreamName ¶
func (c *Camera) RecordingStreamName(resolution defs.Resolution) string
The name of high/low res recording stream in the video archive
type CameraBrands ¶
type CameraBrands string
const ( // SYNC-CAMERA-BRANDS CameraBrandUnknown CameraBrands = "" CameraBrandHikVision CameraBrands = "HikVision" CameraBrandReolink CameraBrands = "Reolink" CameraBrandGenericRTSP CameraBrands = "Generic RTSP" // Used as a response from the port scanner to indicate that we can connect on RTSP, but we don't know anything else yet CameraBrandGenericONVIF CameraBrands = "Generic ONVIF" // Used as a response from OnvifGetDeviceInfo() to indicate a camera that supports ONVIF, but which we don't recognize )
func IdentifyCameraFromHTTP ¶
func IdentifyCameraFromHTTP(headers http.Header, body string) CameraBrands
Attempt to identify the camera from the HTTP response it sends when asked for it's root page (eg http://192.168.10.5)
type CameraRTSPInfo ¶
func GetCameraRTSP ¶
func GetCameraRTSP(brand CameraBrands, host, username, password string, port int, lowResSuffix, highResSuffix string) (*CameraRTSPInfo, error)
type ExtractMethod ¶
type ExtractMethod int
const ( ExtractMethodShallowClone ExtractMethod = iota // Make a shallow copy of the packets, leaving the camera's buffer intact ExtractMethodDeepClone // Make a deep copy of the packet contents, leaving the camera's buffer intact ExtractMethodDrain // Drain the buffer, leaving the camera's buffer empty )
A note on Shallow vs Deep clone: I think that initially when I was building this, I had forgotten that I was using a garbage collected language, and so I made the Clone extraction a deep clone, where I make a copy of the packet contents. Subsequently, I realized that this is a waste of effort, we should simply use shallow clones most of the time, and let the garbage collector handle the memory sweep. So if in doubt, just use a shallow clone. The reason I leave shallow and deep here explicitly, is for a future proof, in case we need to be stricter about our memory consumption, and take more careful accounting of memory use.
type FullChannelPolicy ¶
type FullChannelPolicy int
If we're sending to a channel, and it is full, what do we do?
const ( FullChannelPolicyDrop FullChannelPolicy = iota // If channel is full, drop packets FullChannelPolicyStall // If channel is full, write the packet anyway, knowing that we'll block )
type OnvifDeviceInfo ¶
type OnvifDeviceInfo struct {
Brand CameraBrands
Model string
Firmware string
Serial string
MainStreamURL string
SubStreamURL string
}
Whatever we have discovered about the camera via ONVIF
func OnvifGetDeviceInfo ¶
func OnvifGetDeviceInfo(host, username, password string) (*OnvifDeviceInfo, error)
Use ONVIF to discover whatever we need to know about the device
type RingBufferListener ¶
type RingBufferListener struct {
Name string
Chan chan *videox.VideoPacket
Policy FullChannelPolicy
// contains filtered or unexported fields
}
RingBufferListener is a listener that receives video packets from the ring buffer via a channel. The name is used only for logging and debugging. The thing that uniquely identifies the listener is the channel.
type StandardStreamSink ¶
type StandardStreamSink interface {
// OnConnect is called by Stream.ConnectSinkAndRun().
// You must return a channel to which all stream messages will be sent.
OnConnect(stream *Stream) (StreamSinkChan, error)
OnPacketRTP(packet *videox.VideoPacket) // Called by RunStandardStream(), when it receives a StreamMsgTypePacket
Close() // Called by RunStandardStream(), when it receives a StreamMsgTypeClose
}
StandardStreamSink allows you to implement an interface for receiving stream packets, instead of writing a select loop.
type Stream ¶
type Stream struct {
Log logs.Log
Client *gortsplib.Client
Ident string // Just for logs. Simply CameraName.StreamName.
CameraName string // Just for logs
StreamName string // The stream name, such as "low" and "high"
Codec videox.Codec
// contains filtered or unexported fields
}
Stream is a bridge between the RTSP library (gortsplib) and one or more "sink" objects. The stream understands just enough about RTSP and video codecs to be able to receive information from gortsplib, transform them into our own internal data structures, and pass them onto the sinks. For each camera, we create one stream to handle the high res video, and another stream for the low res video.
func (*Stream) Close ¶
Close the stream. If wg is not nil, then you must call wg.Done() once all of your sinks have closed themselves.
func (*Stream) ConnectSink ¶
func (s *Stream) ConnectSink(name string, sink StreamSinkChan)
Connect a sink.
Every call to ConnectSink must be accompanied by a call to RemoveSink. The usual time to do this is when receiving StreamMsgTypeClose.
This function will panic if you attempt to add the same sink twice.
func (*Stream) ConnectSinkAndRun ¶
func (s *Stream) ConnectSinkAndRun(name string, sink StandardStreamSink) error
Connect a standard sink object and run it.
You don't need to call RemoveSink when using ConnectSinkAndRun. When RunStandardStream exits, it will call RemoveSink for you.
func (*Stream) Info ¶
func (s *Stream) Info() *StreamInfo
Return the stream info, or nil if we have not yet encountered the necessary NALUs
func (*Stream) LastPacketReceivedAt ¶
Return the wall time of the most recently received packet
func (*Stream) RecentFrameStats ¶
func (s *Stream) RecentFrameStats() StreamStats
Estimate the frame rate
func (*Stream) RecentFrameTimes ¶
type StreamInfo ¶
type StreamMsg ¶
type StreamMsg struct {
Type StreamMsgType
Stream *Stream
Packet *videox.VideoPacket
}
StreamMsg is sent on a channel from the stream to a sink
type StreamMsgType ¶
type StreamMsgType int
const ( StreamMsgTypePacket StreamMsgType = iota // New camera packet StreamMsgTypeClose // Close yourself. There will be no further packets. )
type StreamSinkChan ¶
type StreamSinkChan chan StreamMsg
A stream sink is fundamentally just a channel
type StreamStats ¶
type StreamStats struct {
KeyframeInterval int `json:"keyframeInterval"` // number of frames between keyframes
FPS float64 `json:"fps"` // frames per second
FrameSize float64 `json:"frameSize"` // average frame size in bytes
KeyframeSize float64 `json:"keyframeSize"` // average key-frame size in bytes
InterframeSize float64 `json:"interframeSize"` // average non-key-frame size in bytes
FrameIntervalAvg float64 `json:"frameIntervalAvg"` // Average seconds between frames
FrameIntervalVar float64 `json:"frameIntervalVar"` // Variance of seconds between frames
}
Average observed stats from a sample of recent frames
func (*StreamStats) FPSRounded ¶
func (s *StreamStats) FPSRounded() int
func (*StreamStats) KeyframeIntervalDuration ¶
func (s *StreamStats) KeyframeIntervalDuration() time.Duration
Time between keyframes
type VideoDecodeReader ¶
type VideoDecodeReader struct {
Log logs.Log
//TrackID int
//Track *gortsplib.TrackH264
Decoder *videox.VideoDecoder
// contains filtered or unexported fields
}
VideoDecodeReader decodes the video stream and emits frames NOTE: Our lastImg is a copy of the most recent frame. This memcpy might be a substantial waste if you're decoding a high res stream, and only need access to the latest frame occasionally. Such a scenario might be better suited by a blocking call which waits for a new frame to be decoded, depending upon the acceptable latency.
func NewVideoDecodeReader ¶
func NewVideoDecodeReader() *VideoDecodeReader
func (*VideoDecodeReader) Close ¶
func (r *VideoDecodeReader) Close()
func (*VideoDecodeReader) GetLastImageIfDifferent ¶
func (r *VideoDecodeReader) GetLastImageIfDifferent(ifNotEqualTo int64) (*accel.YUVImage, int64, time.Time)
Return a copy of the latest image and its ID, if it's different to the given ID
func (*VideoDecodeReader) LastImageCopy ¶
func (r *VideoDecodeReader) LastImageCopy() (*accel.YUVImage, int64)
Return a copy of the most recently decoded frame (or nil, if there is none available yet), and the frame ID
func (*VideoDecodeReader) LastPacketAt ¶
func (r *VideoDecodeReader) LastPacketAt() time.Time
Return the time when the last packet was received
func (*VideoDecodeReader) OnConnect ¶
func (r *VideoDecodeReader) OnConnect(stream *Stream) (StreamSinkChan, error)
func (*VideoDecodeReader) OnPacketRTP ¶
func (r *VideoDecodeReader) OnPacketRTP(packet *videox.VideoPacket)
type VideoRecorder ¶
VideoRecorder writes incoming packets to our 'fsv' video archive We operate on top of VideoRingBuffer, so that we can start recording at any moment, and have some history of packets to write to the archive. For event-triggered recording modes, this is vital because you always want some history that preceded the moment of the event trigger. For continuous recording modes this is not important.
func StartVideoRecorder ¶
func StartVideoRecorder(ringBuffer *VideoRingBuffer, streamName string, archive *fsv.Archive, includeHistory time.Duration) *VideoRecorder
Create a new video recorder and start recording. This function is expected to return very quickly. Specifically, the code inside LiveCameras that starts/stops recorders holds a lock while it performs this operation, assuming that this function will return very quickly.
func (*VideoRecorder) Stop ¶
func (r *VideoRecorder) Stop()
Stop recording. Like StartVideoRecorder(), this function is expected to return immediately.
type VideoRingBuffer ¶
type VideoRingBuffer struct {
Log logs.Log
BufferLock sync.Mutex // Guards all access to Buffer
Buffer ringbuffer.WeightedRingT[videox.VideoPacket]
// contains filtered or unexported fields
}
VideoRingBuffer stores incoming packets in a fixed-size ring buffer, so that we always have a bit of video history to use. This is specifically useful when recordings are triggered by events such as motion or object detection. In such a case, you always want some seconds of prior history, from the moments before the event was detected.
If you need to extract some history, and then continue receiving packets and guarantee that there is no gap in between those two, then you do this: 1. BufferLock.Lock() 2. ExtractRawBufferNoLock() 3. AddPacketListener() 4. BufferLock.Unlock()
The above sequence is what videoRecorder uses when it starts recording.
func NewVideoRingBuffer ¶
func NewVideoRingBuffer(maxRingBufferBytes int) *VideoRingBuffer
func (*VideoRingBuffer) AddPacketListener ¶
func (r *VideoRingBuffer) AddPacketListener(name string, c chan *videox.VideoPacket, policy FullChannelPolicy)
func (*VideoRingBuffer) Close ¶
func (r *VideoRingBuffer) Close()
func (*VideoRingBuffer) ExtractRawBuffer ¶
func (r *VideoRingBuffer) ExtractRawBuffer(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
Take BufferLock, then call ExtractRawBufferNoLock
func (*VideoRingBuffer) ExtractRawBufferNoLock ¶
func (r *VideoRingBuffer) ExtractRawBufferNoLock(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
Extract from <video_end - duration> until <video_end>. video_end is the PTS of the most recently received packet. duration is a positive number. You must be holding BufferLock before calling this function.
func (*VideoRingBuffer) FindLatestIDRPacketNoLock ¶
func (r *VideoRingBuffer) FindLatestIDRPacketNoLock() int
Scan backwards in the ring buffer to find the most recent packet containing an IDR frame Assumes that you are holding BufferLock Returns the index in the buffer, or -1 if none found
func (*VideoRingBuffer) OnConnect ¶
func (r *VideoRingBuffer) OnConnect(stream *Stream) (StreamSinkChan, error)
func (*VideoRingBuffer) OnPacketRTP ¶
func (r *VideoRingBuffer) OnPacketRTP(packet *videox.VideoPacket)
func (*VideoRingBuffer) RemovePacketListener ¶
func (r *VideoRingBuffer) RemovePacketListener(c chan *videox.VideoPacket)