camera

package
v1.0.1
Published: May 14, 2025 License: MIT Imports: 37 Imported by: 0

Documentation

Constants

const StreamSinkChanDefaultBufferSize = 10

There isn't much rhyme or reason behind this number. UPDATE: While running on an RPi5, a buffer size of 2 proved too small. Sends to various stream sinks blocked frequently; specifically, I saw blocking of up to 4ms on the 'LD Decode' and 'HD Ring' sinks. Blocking when sending to these sinks is very bad, because if it happens often enough, you end up losing incoming camera packets. I raised the size from 2 to 4, but 4 was not enough either, so I'm trying 10.

Variables

var AllCameraBrands []CameraBrands

AllCameraBrands is a slice of all camera brands, excluding CameraBrandUnknown.

Functions

func EstimateFPS

func EstimateFPS(frameIntervals []time.Duration) float64

Given a set of consecutive frame intervals, estimate the average frames per second. The value is a float64 because cameras can be configured for less than 1 FPS. The rates I've seen on Hikvision are 1/2, 1/4, 1/8, and 1/16 FPS.

func RunStandardStream

func RunStandardStream(stream *Stream, sink StandardStreamSink, ch StreamSinkChan)

A generic message loop that should cater for most streams
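As a rough illustration of what such a message loop might look like, here is a self-contained sketch. All of the types below (msgType, streamMsg, sinkChan, sink, countingSink) are local stand-ins that mirror the shapes documented on this page; they are not the package's own types, and the real RunStandardStream also takes the stream and removes the sink on exit.

```go
package main

import "fmt"

// Stand-ins for StreamMsgType, StreamMsg, StreamSinkChan and StandardStreamSink.
type msgType int

const (
	msgPacket msgType = iota // new camera packet
	msgClose                 // no further packets will arrive
)

type packet struct{ payload []byte }

type streamMsg struct {
	Type   msgType
	Packet *packet
}

type sinkChan chan streamMsg

type sink interface {
	OnPacketRTP(p *packet)
	Close()
}

// runLoop dispatches each message to the sink until a close message arrives.
func runLoop(s sink, ch sinkChan) {
	for msg := range ch {
		switch msg.Type {
		case msgPacket:
			s.OnPacketRTP(msg.Packet)
		case msgClose:
			s.Close()
			return
		}
	}
}

// countingSink records how many packets it saw and whether it was closed.
type countingSink struct {
	packets int
	closed  bool
}

func (c *countingSink) OnPacketRTP(p *packet) { c.packets++ }
func (c *countingSink) Close()               { c.closed = true }

// demo sends n packets followed by a close message and returns the sink state.
func demo(n int) (int, bool) {
	ch := make(sinkChan, 10)
	s := &countingSink{}
	done := make(chan struct{})
	go func() {
		runLoop(s, ch)
		close(done)
	}()
	for i := 0; i < n; i++ {
		ch <- streamMsg{Type: msgPacket, Packet: &packet{payload: []byte{byte(i)}}}
	}
	ch <- streamMsg{Type: msgClose}
	<-done // wait for the loop to exit before reading the sink state
	return s.packets, s.closed
}

func main() {
	n, closed := demo(3)
	fmt.Println(n, closed) // prints 3 true
}
```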

Types

type Camera

type Camera struct {
	// Copy from the config database.
	// The pointer can be modified in-place if the camera is reconfigured.
	// This is shared state, so do not modify the Config fields directly.
	// Instead, use Config.Store() to atomically store an entirely new config.
	Config atomic.Pointer[configdb.Camera]

	Log        logs.Log
	LowStream  *Stream
	HighStream *Stream
	HighDumper *VideoRingBuffer
	LowDecoder *VideoDecodeReader
	LowDumper  *VideoRingBuffer
	// contains filtered or unexported fields
}

Camera represents a single physical camera, with two streams (high and low res)
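The comment on Config asks callers to replace the whole config rather than mutate its fields. A minimal sketch of that pattern, using the standard library's atomic.Pointer (Go 1.19 or later), might look like the following. The cameraConfig type and the rename helper are hypothetical stand-ins for configdb.Camera and whatever code performs the reconfiguration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cameraConfig is a hypothetical stand-in for configdb.Camera.
type cameraConfig struct {
	Name string
	Host string
}

// camera holds its config behind an atomic pointer, like Camera.Config.
type camera struct {
	Config atomic.Pointer[cameraConfig]
}

// rename copies the current config, edits the copy, and stores it atomically.
// Readers that already loaded the old pointer keep a consistent snapshot.
func rename(c *camera, name string) {
	old := c.Config.Load()
	updated := *old // copy the struct; never write through old
	updated.Name = name
	c.Config.Store(&updated)
}

func main() {
	c := &camera{}
	c.Config.Store(&cameraConfig{Name: "Driveway", Host: "192.168.10.5"})

	snapshot := c.Config.Load() // a reader takes a snapshot
	rename(c, "Front Gate")

	fmt.Println(snapshot.Name, "->", c.Config.Load().Name) // prints Driveway -> Front Gate
}
```

Note that load, copy, store is not an atomic read-modify-write. If more than one goroutine can reconfigure the camera at once, a CompareAndSwap retry loop or an external lock around the update would be needed.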

func NewCamera

func NewCamera(log logs.Log, cfg configdb.Camera, ringBufferSizeBytes int) (*Camera, error)

func (*Camera) Close

func (c *Camera) Close(wg *sync.WaitGroup)

Close the camera. If wg is not nil, then you must use it to signal when all of your resources are closed.

func (*Camera) ExtractHighRes

func (c *Camera) ExtractHighRes(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)

Extract from <now - duration> until <now>. duration is a positive number.

func (*Camera) GetStream

func (c *Camera) GetStream(res defs.Resolution) *Stream

Get either the high or low resolution stream

func (*Camera) HighResRecordingStreamName

func (c *Camera) HighResRecordingStreamName() string

The name of the high res recording stream in the video archive

func (*Camera) ID

func (c *Camera) ID() int64

func (*Camera) LastPacketAt

func (c *Camera) LastPacketAt() time.Time

Return the time of the most recently received packet from the camera. But NOTE! We return the older of the two streams' most recent packet times. In other words, we return min(LowStream.LastPacketReceivedAt(), HighStream.LastPacketReceivedAt()). This is to work around an issue where I have a camera whose HD stream frequently fails to send any packets, but the LD stream works fine. I'm not sure if this is a bug in gortsplib, but this seems like a robust strategy to have anyway. This bug only manifests during startup. Once a stream starts receiving packets, it generally keeps going forever.

func (*Camera) LatestImage

func (c *Camera) LatestImage(contentType string) []byte

func (*Camera) LongLivedName

func (c *Camera) LongLivedName() string

func (*Camera) LowResRecordingStreamName

func (c *Camera) LowResRecordingStreamName() string

The name of the low res recording stream in the video archive

func (*Camera) Name

func (c *Camera) Name() string

func (*Camera) RecordingStreamName

func (c *Camera) RecordingStreamName(resolution defs.Resolution) string

The name of the high or low res recording stream in the video archive, depending on the given resolution

func (*Camera) Start

func (c *Camera) Start() error

type CameraBrands

type CameraBrands string
const (
	// SYNC-CAMERA-BRANDS
	CameraBrandUnknown      CameraBrands = ""
	CameraBrandHikVision    CameraBrands = "HikVision"
	CameraBrandReolink      CameraBrands = "Reolink"
	CameraBrandGenericRTSP  CameraBrands = "Generic RTSP"  // Used as a response from the port scanner to indicate that we can connect on RTSP, but we don't know anything else yet
	CameraBrandGenericONVIF CameraBrands = "Generic ONVIF" // Used as a response from OnvifGetDeviceInfo() to indicate a camera that supports ONVIF, but which we don't recognize
)

func IdentifyCameraFromHTTP

func IdentifyCameraFromHTTP(headers http.Header, body string) CameraBrands

Attempt to identify the camera from the HTTP response it sends when asked for its root page (e.g. http://192.168.10.5)

type CameraRTSPInfo

type CameraRTSPInfo struct {
	LowResURL               string
	HighResURL              string
	PacketsAreAnnexBEncoded bool
}

func GetCameraRTSP

func GetCameraRTSP(brand CameraBrands, host, username, password string, port int, lowResSuffix, highResSuffix string) (*CameraRTSPInfo, error)

type ExtractMethod

type ExtractMethod int
const (
	ExtractMethodShallowClone ExtractMethod = iota // Make a shallow copy of the packets, leaving the camera's buffer intact
	ExtractMethodDeepClone                         // Make a deep copy of the packet contents, leaving the camera's buffer intact
	ExtractMethodDrain                             // Drain the buffer, leaving the camera's buffer empty
)

A note on Shallow vs Deep clone: I think that initially when I was building this, I had forgotten that I was using a garbage collected language, and so I made the Clone extraction a deep clone, where I make a copy of the packet contents. Subsequently, I realized that this is a waste of effort. We should simply use shallow clones most of the time, and let the garbage collector handle the memory sweep. So if in doubt, just use a shallow clone. The reason I leave shallow and deep here explicitly is for future-proofing, in case we need to be stricter about our memory consumption, and take more careful accounting of memory use.
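To make the distinction concrete, here is a small sketch of the two kinds of copy. The packet type and both helper functions are hypothetical stand-ins; the real videox.VideoPacket and the extraction code may be structured differently.

```go
package main

import "fmt"

// packet is a hypothetical stand-in for videox.VideoPacket.
type packet struct {
	PTS     int64
	Payload []byte
}

// shallowClone copies the slice of pointers. The packets themselves are
// shared with the source, and the garbage collector keeps them alive.
func shallowClone(src []*packet) []*packet {
	out := make([]*packet, len(src))
	copy(out, src)
	return out
}

// deepClone copies every packet, including its payload bytes.
func deepClone(src []*packet) []*packet {
	out := make([]*packet, len(src))
	for i, p := range src {
		out[i] = &packet{PTS: p.PTS, Payload: append([]byte(nil), p.Payload...)}
	}
	return out
}

func main() {
	buf := []*packet{{PTS: 1, Payload: []byte{0xAA}}}
	shallow := shallowClone(buf)
	deep := deepClone(buf)

	buf[0].Payload[0] = 0xBB // mutate the original payload

	// The shallow clone sees the change; the deep clone does not.
	fmt.Printf("%X %X\n", shallow[0].Payload[0], deep[0].Payload[0]) // prints BB AA
}
```

The shallow clone is only safe as long as nobody mutates packet contents after they enter the buffer, which is the usual case for received video packets.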

type FullChannelPolicy

type FullChannelPolicy int

If we're sending to a channel, and it is full, what do we do?

const (
	FullChannelPolicyDrop  FullChannelPolicy = iota // If channel is full, drop packets
	FullChannelPolicyStall                          // If channel is full, write the packet anyway, knowing that we'll block
)
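The two policies map naturally onto Go's channel operations: a select with a default case for dropping, and a plain send for stalling. The sketch below uses local stand-in names (fullChannelPolicy, policyDrop, policyStall, send) and is only an illustration of the idea, not the package's own send path.

```go
package main

import "fmt"

// Stand-ins for FullChannelPolicy and its two values.
type fullChannelPolicy int

const (
	policyDrop  fullChannelPolicy = iota // drop the packet if the channel is full
	policyStall                          // block until there is room
)

// send delivers v according to the policy and reports whether it was delivered.
func send(ch chan int, v int, policy fullChannelPolicy) bool {
	if policy == policyStall {
		ch <- v // blocks until the receiver makes room
		return true
	}
	select {
	case ch <- v:
		return true
	default:
		return false // channel is full, so the value is dropped
	}
}

func main() {
	ch := make(chan int, 2)
	delivered := 0
	for i := 0; i < 5; i++ {
		if send(ch, i, policyDrop) {
			delivered++
		}
	}
	fmt.Println(delivered, len(ch)) // prints 2 2
}
```

Dropping keeps the producer responsive at the cost of gaps in the video; stalling preserves every packet but pushes back on whoever is feeding the channel.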

type OnvifDeviceInfo

type OnvifDeviceInfo struct {
	Brand         CameraBrands
	Model         string
	Firmware      string
	Serial        string
	MainStreamURL string
	SubStreamURL  string
}

Whatever we have discovered about the camera via ONVIF

func OnvifGetDeviceInfo

func OnvifGetDeviceInfo(host, username, password string) (*OnvifDeviceInfo, error)

Use ONVIF to discover whatever we need to know about the device

type RingBufferListener

type RingBufferListener struct {
	Name   string
	Chan   chan *videox.VideoPacket
	Policy FullChannelPolicy
	// contains filtered or unexported fields
}

RingBufferListener is a listener that receives video packets from the ring buffer via a channel. The name is used only for logging and debugging. The thing that uniquely identifies the listener is the channel.

type StandardStreamSink

type StandardStreamSink interface {
	// OnConnect is called by Stream.ConnectSinkAndRun().
	// You must return a channel to which all stream messages will be sent.
	OnConnect(stream *Stream) (StreamSinkChan, error)
	OnPacketRTP(packet *videox.VideoPacket) // Called by RunStandardStream(), when it receives a StreamMsgTypePacket
	Close()                                 // Called by RunStandardStream(), when it receives a StreamMsgTypeClose
}

StandardStreamSink allows you to implement an interface for receiving stream packets, instead of writing a select loop.

type Stream

type Stream struct {
	Log        logs.Log
	Client     *gortsplib.Client
	Ident      string // Just for logs. Simply CameraName.StreamName.
	CameraName string // Just for logs
	StreamName string // The stream name, such as "low" and "high"
	Codec      videox.Codec
	// contains filtered or unexported fields
}

Stream is a bridge between the RTSP library (gortsplib) and one or more "sink" objects. The stream understands just enough about RTSP and video codecs to be able to receive information from gortsplib, transform it into our own internal data structures, and pass it on to the sinks. For each camera, we create one stream to handle the high res video, and another stream for the low res video.

func NewStream

func NewStream(logger logs.Log, cameraName, streamName string, cameraSendsAnnexBEncoded bool) *Stream

func (*Stream) Close

func (s *Stream) Close(wg *sync.WaitGroup)

Close the stream. If wg is not nil, then you must call wg.Done() once all of your sinks have closed themselves.

func (*Stream) ConnectSink

func (s *Stream) ConnectSink(name string, sink StreamSinkChan)

Connect a sink.

Every call to ConnectSink must be accompanied by a call to RemoveSink. The usual time to do this is when receiving StreamMsgTypeClose.

This function will panic if you attempt to add the same sink twice.

func (*Stream) ConnectSinkAndRun

func (s *Stream) ConnectSinkAndRun(name string, sink StandardStreamSink) error

Connect a standard sink object and run it.

You don't need to call RemoveSink when using ConnectSinkAndRun. When RunStandardStream exits, it will call RemoveSink for you.

func (*Stream) Info

func (s *Stream) Info() *StreamInfo

Return the stream info, or nil if we have not yet encountered the necessary NALUs

func (*Stream) LastPacketReceivedAt

func (s *Stream) LastPacketReceivedAt() time.Time

Return the wall time of the most recently received packet

func (*Stream) Listen

func (s *Stream) Listen(address string) error

func (*Stream) RecentFrameStats

func (s *Stream) RecentFrameStats() StreamStats

Estimate the frame rate

func (*Stream) RecentFrameTimes

func (s *Stream) RecentFrameTimes() []float64

func (*Stream) RemoveSink

func (s *Stream) RemoveSink(sink StreamSinkChan)

Remove a sink

type StreamInfo

type StreamInfo struct {
	Width  int
	Height int
}

type StreamMsg

type StreamMsg struct {
	Type   StreamMsgType
	Stream *Stream
	Packet *videox.VideoPacket
}

StreamMsg is sent on a channel from the stream to a sink

type StreamMsgType

type StreamMsgType int
const (
	StreamMsgTypePacket StreamMsgType = iota // New camera packet
	StreamMsgTypeClose                       // Close yourself. There will be no further packets.
)

type StreamSinkChan

type StreamSinkChan chan StreamMsg

A stream sink is fundamentally just a channel

type StreamStats

type StreamStats struct {
	KeyframeInterval int     `json:"keyframeInterval"` // number of frames between keyframes
	FPS              float64 `json:"fps"`              // frames per second
	FrameSize        float64 `json:"frameSize"`        // average frame size in bytes
	KeyframeSize     float64 `json:"keyframeSize"`     // average key-frame size in bytes
	InterframeSize   float64 `json:"interframeSize"`   // average non-key-frame size in bytes
	FrameIntervalAvg float64 `json:"frameIntervalAvg"` // Average seconds between frames
	FrameIntervalVar float64 `json:"frameIntervalVar"` // Variance of seconds between frames
}

Average observed stats from a sample of recent frames

func (*StreamStats) FPSRounded

func (s *StreamStats) FPSRounded() int

func (*StreamStats) KeyframeIntervalDuration

func (s *StreamStats) KeyframeIntervalDuration() time.Duration

Time between keyframes

type VideoDecodeReader

type VideoDecodeReader struct {
	Log logs.Log
	//TrackID int
	//Track   *gortsplib.TrackH264
	Decoder *videox.VideoDecoder
	// contains filtered or unexported fields
}

VideoDecodeReader decodes the video stream and emits frames.

NOTE: Our lastImg is a copy of the most recent frame. This memcpy might be a substantial waste if you're decoding a high res stream, and only need access to the latest frame occasionally. Such a scenario might be better suited by a blocking call which waits for a new frame to be decoded, depending upon the acceptable latency.

func NewVideoDecodeReader

func NewVideoDecodeReader() *VideoDecodeReader

func (*VideoDecodeReader) Close

func (r *VideoDecodeReader) Close()

func (*VideoDecodeReader) GetLastImageIfDifferent

func (r *VideoDecodeReader) GetLastImageIfDifferent(ifNotEqualTo int64) (*accel.YUVImage, int64, time.Time)

Return a copy of the latest image and its ID, if it's different to the given ID

func (*VideoDecodeReader) LastImageCopy

func (r *VideoDecodeReader) LastImageCopy() (*accel.YUVImage, int64)

Return a copy of the most recently decoded frame (or nil, if there is none available yet), and the frame ID

func (*VideoDecodeReader) LastPacketAt

func (r *VideoDecodeReader) LastPacketAt() time.Time

Return the time when the last packet was received

func (*VideoDecodeReader) OnConnect

func (r *VideoDecodeReader) OnConnect(stream *Stream) (StreamSinkChan, error)

func (*VideoDecodeReader) OnPacketRTP

func (r *VideoDecodeReader) OnPacketRTP(packet *videox.VideoPacket)

type VideoRecorder

type VideoRecorder struct {
	Log logs.Log
	// contains filtered or unexported fields
}

VideoRecorder writes incoming packets to our 'fsv' video archive. We operate on top of VideoRingBuffer, so that we can start recording at any moment, and have some history of packets to write to the archive. For event-triggered recording modes, this is vital because you always want some history that preceded the moment of the event trigger. For continuous recording modes this is not important.

func StartVideoRecorder

func StartVideoRecorder(ringBuffer *VideoRingBuffer, streamName string, archive *fsv.Archive, includeHistory time.Duration) *VideoRecorder

Create a new video recorder and start recording. This function is expected to return very quickly. Specifically, the code inside LiveCameras that starts/stops recorders holds a lock while it performs this operation, assuming that this function will return very quickly.

func (*VideoRecorder) Stop

func (r *VideoRecorder) Stop()

Stop recording. Like StartVideoRecorder(), this function is expected to return immediately.

type VideoRingBuffer

type VideoRingBuffer struct {
	Log logs.Log

	BufferLock sync.Mutex // Guards all access to Buffer
	Buffer     ringbuffer.WeightedRingT[videox.VideoPacket]
	// contains filtered or unexported fields
}

VideoRingBuffer stores incoming packets in a fixed-size ring buffer, so that we always have a bit of video history to use. This is specifically useful when recordings are triggered by events such as motion or object detection. In such a case, you always want some seconds of prior history, from the moments before the event was detected.

If you need to extract some history, and then continue receiving packets and guarantee that there is no gap in between those two, then you do this:

1. BufferLock.Lock()
2. ExtractRawBufferNoLock()
3. AddPacketListener()
4. BufferLock.Unlock()

The above sequence is what videoRecorder uses when it starts recording.
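The reason the sequence is gap-free is that the history snapshot and the listener registration happen under one lock, so no packet can arrive in between. The sketch below demonstrates the idea with a toy buffer. The ring type and its methods are hypothetical, and the sketch assumes that incoming packets are appended and forwarded to listeners while holding the same lock.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a toy stand-in for VideoRingBuffer, storing ints instead of packets.
type ring struct {
	mu        sync.Mutex // plays the role of BufferLock
	packets   []int
	listeners []chan int
}

// add stores a packet and forwards it to every listener, all under the lock.
func (r *ring) add(p int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.packets = append(r.packets, p)
	for _, l := range r.listeners {
		l <- p
	}
}

// historyThenListen snapshots the history and registers the listener while
// holding the lock once, so no packet can fall between the two steps.
func (r *ring) historyThenListen(l chan int) []int {
	r.mu.Lock()
	defer r.mu.Unlock()
	history := append([]int(nil), r.packets...)
	r.listeners = append(r.listeners, l)
	return history
}

func main() {
	r := &ring{}
	r.add(1)
	r.add(2)

	live := make(chan int, 10)
	history := r.historyThenListen(live)
	r.add(3)

	fmt.Println(history, <-live) // prints [1 2] 3
}
```

If the snapshot and the registration each took the lock separately, a packet arriving between them would appear in neither the history nor the live channel.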

func NewVideoRingBuffer

func NewVideoRingBuffer(maxRingBufferBytes int) *VideoRingBuffer

func (*VideoRingBuffer) AddPacketListener

func (r *VideoRingBuffer) AddPacketListener(name string, c chan *videox.VideoPacket, policy FullChannelPolicy)

func (*VideoRingBuffer) Close

func (r *VideoRingBuffer) Close()

func (*VideoRingBuffer) ExtractRawBuffer

func (r *VideoRingBuffer) ExtractRawBuffer(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)

Take BufferLock, then call ExtractRawBufferNoLock

func (*VideoRingBuffer) ExtractRawBufferNoLock

func (r *VideoRingBuffer) ExtractRawBufferNoLock(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)

Extract from <video_end - duration> until <video_end>. video_end is the PTS of the most recently received packet. duration is a positive number. You must be holding BufferLock before calling this function.

func (*VideoRingBuffer) FindLatestIDRPacketNoLock

func (r *VideoRingBuffer) FindLatestIDRPacketNoLock() int

Scan backwards in the ring buffer to find the most recent packet containing an IDR frame. Assumes that you are holding BufferLock. Returns the index in the buffer, or -1 if none found.
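The backward scan can be sketched over a plain slice as follows. The packet type with its IsIDR flag and the findLatestIDR function are hypothetical; the real method walks the ring buffer and inspects the packet's NALUs, and the caller is expected to hold BufferLock.

```go
package main

import "fmt"

// packet is a hypothetical stand-in that only records whether it holds an IDR frame.
type packet struct{ IsIDR bool }

// findLatestIDR scans from newest to oldest and returns the index of the
// first IDR packet it meets, or -1 if the buffer contains none.
func findLatestIDR(buf []packet) int {
	for i := len(buf) - 1; i >= 0; i-- {
		if buf[i].IsIDR {
			return i
		}
	}
	return -1
}

func main() {
	buf := []packet{{true}, {false}, {true}, {false}}
	fmt.Println(findLatestIDR(buf), findLatestIDR(nil)) // prints 2 -1
}
```

Starting playback or a recording at the returned index means the decoder begins on a frame it can decode without any earlier packets.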

func (*VideoRingBuffer) OnConnect

func (r *VideoRingBuffer) OnConnect(stream *Stream) (StreamSinkChan, error)

func (*VideoRingBuffer) OnPacketRTP

func (r *VideoRingBuffer) OnPacketRTP(packet *videox.VideoPacket)

func (*VideoRingBuffer) RemovePacketListener

func (r *VideoRingBuffer) RemovePacketListener(c chan *videox.VideoPacket)
