mjpeg

package
v0.0.0-...-cfd2245
Published: Apr 1, 2026 | License: Apache-2.0 | Imports: 18 | Imported by: 0

README

MJPEG Trigger

The MJPEG trigger enables Nuclio functions to process frames from Motion JPEG (MJPEG) video streams in real time.

Overview

Motion JPEG is a video compression format where each frame is individually compressed as a JPEG image. MJPEG streams are commonly used in:

  • IP cameras and webcams
  • Video surveillance systems
  • Live streaming applications

The MJPEG trigger connects to an MJPEG stream URL, extracts individual frames, and passes them to your function for processing.

Configuration

Required Attributes
  • url (string): The URL of the MJPEG stream to connect to
    • Example: http://camera.example.com:8080/stream.mjpg
Optional Attributes
  • processing_factor (int): Controls frame sampling to reduce processing load
    • Default: 1 (process every frame)
    • Value 2: process every 2nd frame (drop 50% of frames)
    • Value 3: process every 3rd frame (drop about 67% of frames)
    • Must be >= 1
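
The sampling arithmetic can be checked with a short sketch. It assumes frames are numbered from 1 and that the frame whose number is a multiple of processing_factor is the one kept; the source does not say which frame in each group survives, so that rule and both helper names are hypothetical.

```python
def should_process(frame_num: int, processing_factor: int) -> bool:
    """Return True when a frame would be kept under the assumed sampling rule."""
    if processing_factor < 1:
        raise ValueError("processing_factor must be >= 1")
    # Assumption: frames are numbered from 1 and every Nth frame is kept.
    return frame_num % processing_factor == 0


def drop_rate(processing_factor: int) -> float:
    """Fraction of frames dropped for a given processing_factor."""
    if processing_factor < 1:
        raise ValueError("processing_factor must be >= 1")
    return 1.0 - 1.0 / processing_factor


kept = [n for n in range(1, 11) if should_process(n, 5)]
print(kept)                   # [5, 10]
print(f"{drop_rate(5):.0%}")  # 80%
```

With processing_factor set to 1 the drop rate is 0, which matches the default of processing every frame.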

Example Configuration

Process Every Frame
triggers:
  mjpeg_stream:
    kind: mjpeg
    attributes:
      url: "http://192.168.1.100:8080/video.mjpg"
      processing_factor: 1
Process Every 5th Frame (80% frame drop rate)
triggers:
  mjpeg_stream:
    kind: mjpeg
    attributes:
      url: "http://camera.local/stream.mjpg"
      processing_factor: 5

Event Structure

When a frame is processed, your function receives an event with:

  • Body: Raw JPEG image data (bytes)
  • Content-Type: image/jpeg
  • Fields:
    • frame_num: Sequential frame number (int64)
    • url: The source MJPEG stream URL
    • timestamp: Frame capture timestamp
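
Because the body is a complete JPEG image, a handler can run a cheap sanity check before handing the bytes to a decoder. The sketch below relies only on the standard JPEG start-of-image (FF D8) and end-of-image (FF D9) markers. The helper name looks_like_jpeg is hypothetical and not part of Nuclio, and the check is a heuristic, not a full validation.

```python
JPEG_SOI = b"\xff\xd8"  # start-of-image marker
JPEG_EOI = b"\xff\xd9"  # end-of-image marker


def looks_like_jpeg(data: bytes) -> bool:
    """Cheap structural check: SOI at the start and EOI at the end."""
    return (
        len(data) >= 4
        and data.startswith(JPEG_SOI)
        and data.endswith(JPEG_EOI)
    )


print(looks_like_jpeg(b"\xff\xd8payload\xff\xd9"))  # True
print(looks_like_jpeg(b"GIF89a"))                   # False
```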
Example Handler (Python)
def handler(context, event):
    # Get the JPEG frame data
    frame_data = event.body
    
    # Get frame metadata
    frame_num = event.get_field("frame_num")
    url = event.get_field("url")
    
    context.logger.info(f"Processing frame {frame_num} from {url}")
    context.logger.info(f"Frame size: {len(frame_data)} bytes")
    
    # Process the frame (e.g., save to disk, analyze with CV library, etc.)
    # ...
    
    return context.Response(
        body=f"Processed frame {frame_num}",
        status_code=200
    )
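
To exercise a handler like the one above without a running Nuclio deployment, one option is to call it with small stand-in objects. Everything named Fake* and make_context below is hypothetical test scaffolding that mimics only the attributes the example uses (event.body, event.get_field, context.logger.info, context.Response); it is not the Nuclio SDK.

```python
from types import SimpleNamespace


class FakeEvent:
    """Hypothetical stand-in exposing only what the example handler uses."""

    def __init__(self, body, fields):
        self.body = body
        self.content_type = "image/jpeg"
        self._fields = fields

    def get_field(self, key):
        return self._fields.get(key)


class FakeLogger:
    """Collects log lines so a test can inspect them."""

    def __init__(self):
        self.lines = []

    def info(self, message):
        self.lines.append(message)


def make_context():
    # Response mimics the context.Response(body=..., status_code=...) call.
    def response(body=None, status_code=200):
        return SimpleNamespace(body=body, status_code=status_code)

    return SimpleNamespace(logger=FakeLogger(), Response=response)


def handler(context, event):
    # Trimmed copy of the example handler, kept here so the sketch runs alone.
    frame_num = event.get_field("frame_num")
    context.logger.info(f"Frame {frame_num}: {len(event.body)} bytes")
    return context.Response(body=f"Processed frame {frame_num}", status_code=200)


context = make_context()
event = FakeEvent(
    b"\xff\xd8fake\xff\xd9",
    {"frame_num": 7, "url": "http://camera.local/stream.mjpg"},
)
result = handler(context, event)
print(result.status_code, result.body)  # 200 Processed frame 7
```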
Example Handler with Image Processing (Python)
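import cv2
import numpy as np

# Load the face detector once at import time, not on every frame.
# Assumes the opencv-python package, which ships the Haar cascade files.
face_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
)

def handler(context, event):
    # Decode JPEG frame
    nparr = np.frombuffer(event.body, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        # imdecode returns None for corrupt or truncated JPEG data
        return context.Response(body="Invalid JPEG frame", status_code=400)
    
    # Detect faces on the grayscale image (scaleFactor=1.1, minNeighbors=4)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.1, 4)
    
    frame_num = event.get_field("frame_num")
    context.logger.info(f"Detected {len(faces)} faces in frame {frame_num}")
    
    return context.Response(
        body=f"Faces detected: {len(faces)}",
        status_code=200
    )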

Behavior

Stream Connection
  • The trigger automatically connects to the MJPEG stream on start
  • If the connection is lost, it automatically retries every 5 seconds
  • The trigger continues running until explicitly stopped
Frame Processing
  • Frames are extracted from the multipart MIME stream
  • Each frame is wrapped as a Nuclio event
  • The event is submitted to an available worker for processing
  • If processing_factor is N > 1, only every Nth frame is submitted; the remaining frames are dropped to reduce load
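
The multipart framing can be illustrated with a small parser. The trigger itself is written in Go, so the Python below is only a sketch of the wire format, not the trigger's code. It assumes the boundary string has already been taken from the stream's Content-Type header (typically multipart/x-mixed-replace with a boundary parameter), that each part carries a Content-Length header (many cameras send one, some do not), and that the input is a buffered binary stream. The function name iter_mjpeg_frames is hypothetical.

```python
import io
from typing import BinaryIO, Iterator


def iter_mjpeg_frames(stream: BinaryIO, boundary: str) -> Iterator[bytes]:
    """Yield the payload of each part in a multipart MJPEG byte stream."""
    delimiter = b"--" + boundary.encode("ascii")
    while True:
        line = stream.readline()
        if not line:
            return  # end of stream
        if line.strip() != delimiter:
            continue  # skip preamble and the blank line after each body

        # Read part headers up to the empty line that precedes the body.
        headers = {}
        while True:
            header_line = stream.readline()
            if not header_line:
                return  # stream ended in the middle of the headers
            header_line = header_line.strip()
            if not header_line:
                break
            name, _, value = header_line.partition(b":")
            headers[name.strip().lower()] = value.strip()

        length = headers.get(b"content-length")
        if length is None:
            continue  # sketch limitation: a part without a length is skipped
        size = int(length)
        frame = stream.read(size)
        if len(frame) < size:
            return  # truncated frame, the connection probably dropped
        yield frame


raw = (
    b"--frame\r\nContent-Type: image/jpeg\r\nContent-Length: 4\r\n\r\nAAAA\r\n"
    b"--frame\r\nContent-Type: image/jpeg\r\nContent-Length: 2\r\n\r\nBB\r\n"
    b"--frame--\r\n"
)
frames = list(iter_mjpeg_frames(io.BytesIO(raw), "frame"))
print(frames)  # [b'AAAA', b'BB']
```

Reading exactly Content-Length bytes avoids scanning the JPEG payload for the boundary, which is why the sketch skips parts that omit the header instead of guessing where they end.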
Error Handling
  • Connection errors trigger automatic reconnection
  • Frame parsing errors are logged but don't stop the stream
  • Worker allocation failures are logged (event is dropped)
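
The reconnection behavior can be pictured as a loop with a fixed delay. This is a minimal sketch, assuming a caller-supplied consume_stream function that blocks while the stream is healthy and raises OSError (which includes ConnectionError) when it fails; all names are hypothetical and the trigger's actual Go implementation may differ.

```python
import time
from typing import Callable, Optional


def run_with_reconnect(
    consume_stream: Callable[[], None],
    should_stop: Callable[[], bool],
    retry_interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    on_error: Optional[Callable[[Exception], None]] = None,
) -> int:
    """Call consume_stream repeatedly until should_stop() returns True.

    Returns the number of connection attempts made.
    """
    attempts = 0
    while not should_stop():
        attempts += 1
        try:
            consume_stream()  # blocks while the stream is healthy
        except OSError as err:
            # Connection failures are reported, then retried.
            if on_error is not None:
                on_error(err)
        if should_stop():
            break
        sleep(retry_interval)  # fixed delay between attempts
    return attempts


# Demo: fail twice, succeed on the third attempt, record sleeps instead of waiting.
calls = {"n": 0}
slept = []


def flaky_stream():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("connection refused")


attempts = run_with_reconnect(flaky_stream, lambda: calls["n"] >= 3, sleep=slept.append)
print(attempts, slept)  # 3 [5.0, 5.0]
```

Injecting the sleep function keeps the loop testable without real five-second waits.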

Use Cases

  1. Video Analytics: Real-time object detection, face recognition, motion detection
  2. Surveillance: Monitor camera feeds for security events
  3. Quality Control: Inspect manufacturing processes via camera feeds
  4. Traffic Monitoring: Analyze traffic patterns from road cameras
  5. Retail Analytics: Count customers, analyze behavior patterns

Performance Considerations

  • Frame Rate: Use processing_factor to control CPU/memory usage
  • Worker Pool: Configure adequate workers to handle frame processing rate
  • Network: Ensure stable network connection to the MJPEG source
  • Processing Time: If processing a frame takes longer than the interval between frames, consider increasing processing_factor
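
As a rough rule of thumb, the workers can absorb about workers / seconds_per_frame frames per second, so processing_factor should be at least the stream frame rate divided by that capacity. The helper below is a back-of-the-envelope sketch under that assumption (steady frame rate, uniform processing time); it is not something the trigger computes for you, and the name suggest_processing_factor is hypothetical.

```python
import math


def suggest_processing_factor(
    stream_fps: float, seconds_per_frame: float, workers: int = 1
) -> int:
    """Smallest processing_factor that keeps the sampled rate within capacity."""
    if stream_fps <= 0 or seconds_per_frame <= 0 or workers < 1:
        raise ValueError("stream_fps and seconds_per_frame must be > 0, workers >= 1")
    capacity_fps = workers / seconds_per_frame  # frames/s the pool can handle
    return max(1, math.ceil(stream_fps / capacity_fps))


# 25 fps stream, 100 ms per frame, one worker: capacity is 10 fps.
print(suggest_processing_factor(25, 0.1, workers=1))  # 3
```

Treat the result as a starting point and adjust it after measuring real processing times.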

Limitations

  • Only MJPEG streams served over HTTP are supported (not H.264 video, RTSP sources, etc.)
  • One trigger instance connects to one stream URL
  • Frames are processed asynchronously; ordering is not guaranteed when the worker pool has more than one worker
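
When ordering matters, a downstream consumer of the results can restore it using the frame_num field. The sketch below is one possible approach, not part of the trigger: a hypothetical FrameReorderBuffer that holds out-of-order results until the next expected frame number arrives. It assumes the first expected number and the step between processed frames (for example, processing_factor) are known to the consumer; a dropped frame would stall it, so real code would also need a timeout or skip policy.

```python
class FrameReorderBuffer:
    """Release results in frame_num order, buffering any that arrive early."""

    def __init__(self, first_frame_num=1, step=1):
        self._next = first_frame_num  # next frame number to release
        self._step = step             # gap between consecutive processed frames
        self._pending = {}            # frame_num -> result, waiting for its turn

    def push(self, frame_num, result):
        """Store a result and return every (frame_num, result) now in order."""
        self._pending[frame_num] = result
        ready = []
        while self._next in self._pending:
            ready.append((self._next, self._pending.pop(self._next)))
            self._next += self._step
        return ready


buffer = FrameReorderBuffer()
print(buffer.push(2, "b"))  # []
print(buffer.push(1, "a"))  # [(1, 'a'), (2, 'b')]
print(buffer.push(3, "c"))  # [(3, 'c')]
```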

Implementation Details

Files
  • factory.go: Trigger factory implementation for registration
  • event.go: Event structure wrapping MJPEG frames
  • trigger.go: Main trigger logic for stream connection and frame processing
  • types.go: Configuration structure and validation
Key Components
  1. Stream Connection: Uses http.Client to connect to MJPEG URL
  2. Frame Extraction: Parses multipart MIME boundary and headers
  3. Frame Processing: Submits frames to worker pool with configured sampling factor
  4. Error Recovery: Automatic reconnection on connection failures

Documentation

Constants

const (
	DefaultProcessingFactor = 1 // Process every frame by default
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Configuration

type Configuration struct {
	trigger.Configuration
	URL              string             `mapstructure:"url"`
	ProcessingFactor int                `mapstructure:"processing_factor"`
	Sink             *SinkConfiguration `mapstructure:"sink"`
}

Configuration holds the MJPEG trigger configuration

func NewConfiguration

func NewConfiguration(id string,
	triggerConfiguration *functionconfig.Trigger,
	runtimeConfiguration *runtime.Configuration) (*Configuration, error)

NewConfiguration creates a new MJPEG trigger configuration

type Event

type Event struct {
	nuclio.AbstractEvent
	// contains filtered or unexported fields
}

Event contains the data for an MJPEG frame event

func (*Event) GetBody

func (e *Event) GetBody() []byte

GetBody returns the frame data (JPEG image bytes)

func (*Event) GetBodyObject

func (e *Event) GetBodyObject() interface{}

GetBodyObject returns nil since MJPEG events don't have a body object

func (*Event) GetBodyString

func (e *Event) GetBodyString() string

GetBodyString returns the frame data as a string (not recommended for binary data)

func (*Event) GetContentType

func (e *Event) GetContentType() string

GetContentType returns the content type for JPEG images

func (*Event) GetField

func (e *Event) GetField(key string) interface{}

GetField returns a specific event field

func (*Event) GetFieldByteSlice

func (e *Event) GetFieldByteSlice(key string) []byte

GetFieldByteSlice returns an event field as a byte slice

func (*Event) GetFieldInt

func (e *Event) GetFieldInt(key string) (int, error)

GetFieldInt returns an event field as an int

func (*Event) GetFieldString

func (e *Event) GetFieldString(key string) string

GetFieldString returns a specific event field as a string

func (*Event) GetFields

func (e *Event) GetFields() map[string]interface{}

GetFields returns the event fields

func (*Event) GetHeader

func (e *Event) GetHeader(key string) interface{}

GetHeader returns nil since MJPEG events don't have headers

func (*Event) GetHeaderByteSlice

func (e *Event) GetHeaderByteSlice(key string) []byte

GetHeaderByteSlice returns nil since MJPEG events don't have headers

func (*Event) GetHeaderInt

func (e *Event) GetHeaderInt(key string) (int, error)

GetHeaderInt returns 0 since MJPEG events don't have headers

func (*Event) GetHeaderString

func (e *Event) GetHeaderString(key string) string

GetHeaderString returns empty string since MJPEG events don't have headers

func (*Event) GetHeaders

func (e *Event) GetHeaders() map[string]interface{}

GetHeaders returns nil since MJPEG events don't have headers

func (*Event) GetMethod

func (e *Event) GetMethod() string

GetMethod returns an empty string since MJPEG events don't have a method

func (*Event) GetPath

func (e *Event) GetPath() string

GetPath returns the URL path of the MJPEG stream

func (*Event) GetShardID

func (e *Event) GetShardID() int

GetShardID returns 0 since MJPEG events don't have a shard ID

func (*Event) GetSize

func (e *Event) GetSize() int

GetSize returns the size of the frame data

func (*Event) GetTimestamp

func (e *Event) GetTimestamp() time.Time

GetTimestamp returns the event timestamp

func (*Event) GetType

func (e *Event) GetType() string

GetType returns "mjpeg" as the type

func (*Event) GetTypeVersion

func (e *Event) GetTypeVersion() string

GetTypeVersion returns an empty string since MJPEG events don't have a type version

func (*Event) GetURL

func (e *Event) GetURL() string

GetURL returns the full URL of the MJPEG stream

type SinkConfiguration

type SinkConfiguration struct {
	Kind       string                 `mapstructure:"kind"`
	Attributes map[string]interface{} `mapstructure:"attributes"`
}

SinkConfiguration holds the sink configuration for the trigger
