Digital Hub Serverless
Nuclio "Serverless"-based framework for Job/serverless executions compatible with DH Core. The product is a set of Python images that can be used to run:

- serverless jobs in a Kubernetes cluster (`job` trigger)
- functions as APIs in a Kubernetes cluster (based on the Nuclio `http` trigger)
- traffic processing tasks as extensions (`ext_proc`) to the Envoy proxy, as a service sidecar or standalone
Triggers
- `http` - HTTP trigger
- `job` - Job trigger
- `websocket` - WebSocket trigger
- `extproc` - Envoy proxy ExtProc trigger
- `openinference` - Open Inference v2 protocol trigger
- `rtsp` - RTSP trigger for audio (LPCM) and video content (MJPEG, H264)
- `mjpeg` - MJPEG trigger
Job Trigger Configuration
Trigger of type job extends the family of available triggers with the possibility to execute the container just once, with the predefined event defined in the trigger specification.
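For illustration, a job handler has the standard Nuclio signature: it is invoked once with the configured event, and the job completes when the handler returns. The payload handling below is a hypothetical sketch, not prescribed by the framework:

```python
def handler(context, event):
    # Invoked once with the event predefined in the trigger specification;
    # after the handler returns, the job completes.
    payload = event.body.decode("utf-8") if isinstance(event.body, bytes) else str(event.body)
    context.logger.info(f"job payload: {payload}")
    return context.Response(body="done", status_code=200)
```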
Extproc Trigger Configuration
Trigger of type extproc extends the family of available triggers with the functionality to handle Envoy ExtProc message processing. See the corresponding Envoy Proxy Specification for details of the integration configuration.
When exposed, Envoy proxy ProcessingRequest messages are handled by the specified runtime implementation. Depending on the processor pattern, the handler processes and transforms the incoming request or the outgoing response, and may even control whether processing is interrupted. More specifically, the following scenarios are available:
- `preprocessor` handler: receives the Event object representing the incoming HTTP message (with URL, body, headers, etc.) and returns the modified object to be passed to the upstream service. If a response with Status > 0 is returned, it is sent as an immediate response. In case of a processing error, an error is returned as an immediate response.
- `postprocessor` handler: receives the Event object representing the outgoing HTTP message (with URL, body, headers, etc.) and returns the modified object to be passed to the client. If a response with Status > 0 is returned, it is sent as a response with that status. In case of a processing error, an error is returned as an immediate response.
- `observeprocessor` handler: receives both the request and response objects and may perform some processing of them without altering the flow. The execution of the observe processor should be considered asynchronous, and its results are ignored.
- `wrapprocessor` handler: receives both the request and response objects and may perform some processing of their messages. If, upon a request event, it is necessary to prevent propagation to the upstream service, the wrap processor should return the result as a `nuclio.Response` with status > 0.
In order to see which processing phase is engaged, the request object is equipped with the processing-phase header with the following values:
- process request headers: 1
- process request body: 2
- process response headers: 4
- process response body: 5
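As a minimal sketch, a wrap processor handler might branch on this header as follows. Only the `processing-phase` values come from the list above; the `x-block` header and the blocking rule are hypothetical:

```python
def handler(context, event):
    # processing-phase: 1/2 = request headers/body, 4/5 = response headers/body
    phase = int(event.headers.get("processing-phase", 0))
    if phase in (1, 2):
        # Request side: returning a Response with status > 0
        # prevents propagation to the upstream service.
        if event.headers.get("x-block") == "true":
            return context.Response(body="blocked", status_code=403)
    # Otherwise return the (possibly modified) event to continue the flow.
    return event
```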
The trigger configuration is defined as follows:
myextproctrigger:
kind: extproc
attributes:
type: wrapprocessor
port: 5051
gracefulShutdownTimeout: 15
maxConcurrentStreams: 100
processingOptions:
logStream: false
logPhases: false
requestIdHeaderName: x-request-id
bufferStreamedBodies: false
perRequestBodyBufferBytes: 102400
decompressBodies: true
maxWorkers: 4
...
where
- `type` defines the processing pattern (required)
- `port` defines the gRPC port to expose (required)
- `gracefulShutdownTimeout` - timeout for the processor shutdown (default: 15 sec)
- `maxConcurrentStreams` - maximum concurrent streams for the gRPC server (default: 100)
- `logStream` and `logPhases` define whether to log processing information for debugging (default: false)
- `requestIdHeaderName` - the name of the request id header as defined by the Envoy proxy (default: x-request-id)
- `bufferStreamedBodies` - whether streamed bodies should be buffered, with `perRequestBodyBufferBytes` specifying the buffer size (defaults: false, 0)
- `decompressBodies` - whether to decompress the body for processing (default: true)
Testing
To test the extproc functionality, you can use the Docker Compose application, which includes the Envoy proxy with a predefined configuration and a simple upstream service. The Envoy configuration routes all traffic through the extproc gRPC server outside of the Compose application (host.docker.internal, port 5051).
To run or debug the extproc processor, use the predefined script test/extproc/run.sh. The application expects a preconfigured Python runtime with the Nuclio Python SDK installed.
Once you have the docker container and the application running, you can test it with the following curl command:
curl localhost:8080/resource -X POST -H 'Content-type: text/plain' -d 'hello' -s -vvv
HTTP Trigger Configuration
Trigger of type http exposes REST endpoints and is the most common way to serve APIs and lightweight inference endpoints.
Configuration example:
triggers:
myhttp:
kind: http
attributes:
port: 8080
maxWorkers: 4
Handler function:
def handler(context, event):
    # event.body contains the request body
    return context.Response(body='ok', status_code=200)
Testing:
curl http://localhost:8080/ -X POST -d 'hello'
WebSocket Trigger Configuration
Trigger of type websocket accepts websocket connections and forwards messages to the function handler. Useful for real-time bidirectional communication (streams, control channels).
Configuration example:
triggers:
myws:
kind: websocket
attributes:
port: 8080
maxWorkers: 4
Handler behavior:
- On connection the handler may receive connection events
- Received messages are delivered as events to the handler
- Handler can return responses which are sent back to the client
Example handler (simplified):
def handler(context, event):
    msg = event.body.decode('utf-8')
    # echo the message back to the client
    return context.Response(body=msg, status_code=200)
Testing (ws client):
websocat ws://localhost:8080/
RTSP Trigger Configuration
Trigger of type rtsp enables receiving and streaming media over RTSP, commonly used for camera feeds and audio streams. It integrates with sinks to re-stream or forward processed media.
Configuration example:
triggers:
myrtsp:
kind: rtsp
attributes:
url: "rtsp://camera.local:8554/stream"
maxWorkers: 4
Event structure: frames or audio packets depending on stream type. Handler receives binary payloads with metadata fields such as timestamp and frame_num.
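A pass-through handler sketch is shown below. The `event.get_field` access mirrors the MJPEG examples later in this README; which fields are actually present depends on the stream type:

```python
def handler(context, event):
    # event.body carries a binary media payload
    # (a video frame or an audio packet, depending on the stream)
    frame_num = event.get_field("frame_num")
    timestamp = event.get_field("timestamp")
    context.logger.info(f"frame {frame_num} at {timestamp}: {len(event.body)} bytes")
    # Return the payload unchanged to pass it downstream.
    return context.Response(body=event.body, status_code=200)
```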
Testing:
# Start processor and connect a client (e.g. ffplay)
ffplay rtsp://localhost:8554/stream
OpenInference Trigger Configuration
Trigger of type openinference extends the family of available triggers with the functionality to serve machine learning models using the OpenInference protocol. This trigger provides standardized REST and gRPC endpoints compatible with inference serving frameworks like NVIDIA Triton, KServe, and other OpenInference-compliant systems.
The trigger exposes the following endpoints:
REST Endpoints (HTTP/JSON)
- `GET /v2/health/live` - Server liveness check
- `GET /v2/health/ready` - Server readiness check
- `GET /v2/models/{model_name}/versions/{version}/ready` - Model readiness check
- `GET /v2/models/{model_name}/versions/{version}` - Model metadata (inputs/outputs schema)
- `POST /v2/models/{model_name}/versions/{version}/infer` - Perform inference
gRPC Endpoints
The trigger implements the GRPCInferenceService from the OpenInference protocol specification:
- `ServerLive` - Server liveness check
- `ServerReady` - Server readiness check
- `ModelReady` - Model readiness check
- `ServerMetadata` - Server metadata
- `ModelMetadata` - Model metadata with tensor definitions
- `ModelInfer` - Perform inference
Configuration
The trigger configuration is defined as follows:
triggers:
myopeninferencetrigger:
kind: openinference
attributes:
model_name: my-model # Model name (default: "model")
model_version: "1.0" # Model version (default: "1")
rest_port: 8080 # REST API port (default: 8080)
grpc_port: 9000 # gRPC port (default: 9000)
enable_rest: true # Enable REST endpoints (default: true)
enable_grpc: true # Enable gRPC endpoints (default: true)
input_tensors: # Input tensor definitions
- name: input
datatype: FP32
shape: [1, 3, 224, 224]
output_tensors: # Output tensor definitions
- name: output
datatype: FP32
shape: [1, 1000]
maxWorkers: 4
Configuration Parameters:
- `model_name` - Name of the model being served (required)
- `model_version` - Version identifier for the model (default: "1")
- `rest_port` - TCP port for REST API endpoints (default: 8080)
- `grpc_port` - TCP port for gRPC service (default: 9000)
- `enable_rest` - Enable REST API endpoints (default: true)
- `enable_grpc` - Enable gRPC service (default: true)
- `input_tensors` - Array of input tensor definitions with name, datatype, and shape
- `output_tensors` - Array of output tensor definitions with name, datatype, and shape
Supported Data Types:
BOOL, UINT8, UINT16, UINT32, UINT64, INT8, INT16, INT32, INT64, FP16, FP32, FP64, BYTES
Handler Function:
The handler function receives an event with the inference request and should return a response in the OpenInference format:
import json

def handler(context, event):
    # Parse input tensors from event.body
    request = json.loads(event.body)
    inputs = request["inputs"]

    # Perform inference
    # ... your model inference code producing output_data ...

    # Return output tensors
    return context.Response(
        body=json.dumps({
            "model_name": request["model_name"],
            "model_version": request["model_version"],
            "outputs": [
                {
                    "name": "output",
                    "datatype": "FP32",
                    "shape": [1, 1000],
                    "data": output_data
                }
            ]
        }),
        headers={},
        content_type="application/json",
        status_code=200
    )
Testing
To test the OpenInference trigger functionality, use the test suite in the test/openinference/ directory. The test suite includes:
- Python inference handler example
- REST API test client with comprehensive test scenarios
- gRPC client test examples
- Sample configuration
To run the test:
# Start the processor with the OpenInference trigger
./test/openinference/run.sh
# In another terminal, run the REST API tests
cd test/openinference
python3 test_rest_client.py
# Or run the gRPC tests
python3 test_grpc_client.py
Example REST API test:
# Check server liveness
curl http://localhost:8080/v2/health/live
# Get model metadata
curl http://localhost:8080/v2/models/test-model/versions/1.0
# Perform inference
curl -X POST http://localhost:8080/v2/models/test-model/versions/1.0/infer \
-H "Content-Type: application/json" \
-d '{
"inputs": [{
"name": "input",
"datatype": "FP32",
"shape": [1, 3],
"data": [1.0, 2.0, 3.0]
}]
}'
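The same inference call can be issued from Python using only the standard library. This sketch mirrors the curl example above; the host, model name, and payload are the ones used there:

```python
import json
import urllib.request

def infer_url(host: str, model: str, version: str) -> str:
    # Endpoint shape: POST /v2/models/{model_name}/versions/{version}/infer
    return f"http://{host}/v2/models/{model}/versions/{version}/infer"

def infer(host, model, version, inputs):
    # Build and send the JSON inference request.
    body = json.dumps({"inputs": inputs}).encode("utf-8")
    req = urllib.request.Request(
        infer_url(host, model, version),
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

# Usage (against a running processor):
# infer("localhost:8080", "test-model", "1.0",
#       [{"name": "input", "datatype": "FP32",
#         "shape": [1, 3], "data": [1.0, 2.0, 3.0]}])
```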
MJPEG Trigger Configuration
Trigger of type mjpeg enables processing of frames from Motion JPEG (MJPEG) video streams in real-time. MJPEG is a video compression format where each frame is individually compressed as a JPEG image and is commonly used in IP cameras, webcams, and video surveillance systems.
The MJPEG trigger connects to an MJPEG stream URL, extracts individual frames, and passes them to your function handler for processing. This enables use cases such as:
- Video Analytics: Real-time object detection, face recognition, motion detection
- Surveillance: Monitor camera feeds for security events
- Quality Control: Inspect manufacturing processes via camera feeds
- Traffic Monitoring: Analyze traffic patterns from road cameras
- Retail Analytics: Count customers, analyze behavior patterns
Configuration
The trigger configuration is defined as follows:
triggers:
mjpeg_stream:
kind: mjpeg
attributes:
url: "http://camera.example.com:8080/stream.mjpg" # MJPEG stream URL (required)
processing_factor: 1 # Frame sampling (default: 1)
sink: # Optional sink configuration
kind: rtsp # Sink type (rtsp, websocket, webhook, mjpeg)
attributes:
port: 8554
path: "/stream"
maxWorkers: 4
Configuration Parameters:
- `url` - URL of the MJPEG stream to connect to (required)
  - Example: `http://192.168.1.100:8080/video.mjpg`
- `processing_factor` - Controls frame sampling to reduce processing load (default: 1)
  - Value `1`: process every frame
  - Value `2`: process every 2nd frame (50% frame drop)
  - Value `5`: process every 5th frame (80% frame drop)
  - Must be >= 1
- `sink` - Optional sink configuration for output streaming (see Sink documentation)
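The sampling rule can be pictured with a small sketch. This is illustrative only, not the trigger's actual Go implementation, and frame numbering from 0 is an assumption:

```python
def should_process(frame_num: int, processing_factor: int) -> bool:
    # Keep every Nth frame, where N = processing_factor (must be >= 1).
    if processing_factor < 1:
        raise ValueError("processing_factor must be >= 1")
    return frame_num % processing_factor == 0

# processing_factor=5 keeps frames 0, 5, ... out of 0..9 (80% dropped)
kept = [n for n in range(10) if should_process(n, 5)]  # [0, 5]
```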
Event Structure
When a frame is processed, the handler receives an event with:
- Body: Raw JPEG image data (bytes)
- Content-Type: `image/jpeg`
- Fields:
  - `frame_num`: Sequential frame number (int64)
  - `url`: The source MJPEG stream URL
  - `timestamp`: Frame capture timestamp
Handler Function
Example Python handler for processing MJPEG frames:
def handler(context, event):
    # Get the JPEG frame data
    frame_data = event.body

    # Get frame metadata
    frame_num = event.get_field("frame_num")
    url = event.get_field("url")

    context.logger.info(f"Processing frame {frame_num} from {url}")
    context.logger.info(f"Frame size: {len(frame_data)} bytes")

    # Process the frame (e.g., save, analyze with a CV library)
    # ...

    return context.Response(
        body=frame_data,  # Return processed frame
        content_type="image/jpeg",
        status_code=200
    )
Example handler with image processing using OpenCV:
import cv2
import numpy as np

# Load a pre-trained Haar cascade for face detection
face_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

def handler(context, event):
    # Decode JPEG frame
    nparr = np.frombuffer(event.body, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    # Perform image processing (e.g., face detection)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.1, 4)

    # Draw rectangles around faces
    for (x, y, w, h) in faces:
        cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)

    frame_num = event.get_field("frame_num")
    context.logger.info(f"Detected {len(faces)} faces in frame {frame_num}")

    # Encode back to JPEG
    _, encoded = cv2.imencode('.jpg', img)
    return context.Response(
        body=encoded.tobytes(),
        content_type="image/jpeg",
        status_code=200
    )
Behavior
Stream Connection:
- The trigger automatically connects to the MJPEG stream on start
- If the connection is lost, it automatically retries every 5 seconds
- The trigger continues running until explicitly stopped
Frame Processing:
- Frames are extracted from the multipart MIME stream
- Each frame is wrapped as a Nuclio event
- The event is submitted to an available worker for processing
- If `processing_factor` > 1, frames are dropped to reduce load
Error Handling:
- Connection errors trigger automatic reconnection
- Frame parsing errors are logged but don't stop the stream
- Worker allocation failures are logged (event is dropped)
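For intuition, extracting frames from the multipart MIME stream amounts to splitting on the boundary and keeping each part's body after the header block. A simplified Python sketch follows; the real trigger is implemented in Go and parses the stream incrementally, and the boundary name `frame` is hypothetical:

```python
def extract_frames(stream: bytes, boundary: bytes) -> list[bytes]:
    # Split the multipart stream on the boundary and keep each part's
    # body (the bytes after the blank line ending the part headers).
    frames = []
    for part in stream.split(b"--" + boundary):
        header_end = part.find(b"\r\n\r\n")
        if header_end == -1:
            continue
        body = part[header_end + 4:].rstrip(b"\r\n-")
        if body:
            frames.append(body)
    return frames

# Two fake "JPEG" parts separated by a boundary:
stream = (b"--frame\r\nContent-Type: image/jpeg\r\n\r\nAAA\r\n"
          b"--frame\r\nContent-Type: image/jpeg\r\n\r\nBBB\r\n--frame--")
print(extract_frames(stream, b"frame"))  # [b'AAA', b'BBB']
```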
Sink Integration
The MJPEG trigger supports optional sink configuration for outputting processed frames. Available sinks include:
- RTSP: Stream to RTSP clients (native Go implementation using gortsplib)
- WebSocket: Stream to WebSocket clients
- Webhook: Send frames to HTTP endpoints
- MJPEG: Re-stream as MJPEG
Example with RTSP sink:
triggers:
mjpeg_camera:
kind: mjpeg
attributes:
url: "http://camera.local/stream.mjpg"
processing_factor: 1
sink:
kind: rtsp
attributes:
port: 8554
path: "/processed"
type: "video"
maxWorkers: 4
After starting the processor, clients can connect to the RTSP stream:
ffplay rtsp://localhost:8554/processed
# or
vlc rtsp://localhost:8554/processed
Testing
To test the MJPEG trigger functionality, use the test examples in the test/ directory:
- `test/mjpeg/` - Basic MJPEG stream processing
- `test/mjpeg-rtsp/` - MJPEG to RTSP streaming
- `test/mjpeg-webhook/` - MJPEG with webhook sink
- `test/mjpeg-websocket/` - MJPEG with WebSocket sink
Example test run:
# Start the MJPEG processor
./test/mjpeg-rtsp/run.sh
# In another terminal, connect to the output stream
ffplay rtsp://localhost:8554/processed
Performance Considerations
- Frame Rate: Use `processing_factor` to control CPU/memory usage
- Worker Pool: Configure adequate workers to handle the frame processing rate
- Network: Ensure a stable network connection to the MJPEG source
- Processing Time: If processing takes longer than the frame interval, increase `processing_factor`
Development
See CONTRIBUTING for contribution instructions.
Build container images
To build the container image, clone the repository and navigate to the digitalhub-serverless directory. The build process consists of the following steps:

1. Build the processor image (modify the `Makefile` to change the SERVERLESS_DOCKER_REPO and SERVERLESS_CACHE_REPO variables to your Docker repository, e.g., `docker.io/yourusername`):

   make processor

2. Build the base image (choose the Python 3 version from 9, 10, 11 or 12):

   docker build -t python-base-3-<ver> -f ./Dockerfile/Dockerfile-base-3-<ver> .

3. Build the onbuild image (modify the `Dockerfile/Dockerfile-onbuild-3-<ver>` file to change the SERVERLESS_DOCKER_REPO variable to your Docker repository, e.g., `docker.io/yourusername`):

   docker build -t python-onbuild-3-<ver> -f ./Dockerfile/Dockerfile-onbuild-3-<ver> .

4. Build the runtime image (modify the `Dockerfile/Dockerfile-handler-3-<ver>` file to change the NUCLIO_BASE_IMAGE and NUCLIO_ONBUILD_IMAGE variables to point to the base and onbuild images you just built, e.g., `python-onbuild-3-<ver>`):

   docker build -t python-runtime-3-<ver> -f ./Dockerfile/Dockerfile-handler-3-<ver> --build-arg GIT_TAG=<some-tag> .
Launch container
To run the container, use the following command:
docker run -e PROJECT_NAME=<project-name> -e RUN_ID=<run-id> python-runtime-3-<ver>
Required environment variables:
- `PROJECT_NAME`: The name of the project
- `RUN_ID`: The ID of the run to execute
Security Policy
The current release is the supported version. Security fixes are released together with all other fixes in each new release.
If you discover a security vulnerability in this project, please do not open a public issue.
Instead, report it privately by emailing us at digitalhub@fbk.eu. Include as much detail as possible to help us understand and address the issue quickly and responsibly.
Contributing
To report a bug or request a feature, please first check the existing issues to avoid duplicates. If none exist, open a new issue with a clear title and a detailed description, including any steps to reproduce if it's a bug.
To contribute code, start by forking the repository. Clone your fork locally and create a new branch for your changes. Make sure your commits follow the Conventional Commits v1.0 specification to keep history readable and consistent.
Once your changes are ready, push your branch to your fork and open a pull request against the main branch. Be sure to include a summary of what you changed and why. If your pull request addresses an issue, mention it in the description (e.g., “Closes #123”).
Please note that new contributors may be asked to sign a Contributor License Agreement (CLA) before their pull requests can be merged. This helps us ensure compliance with open source licensing standards.
We appreciate contributions and help in improving the project!
Authors
This project is developed and maintained by DSLab – Fondazione Bruno Kessler, with contributions from the open source community. A complete list of contributors is available in the project’s commit history and pull requests.
For questions or inquiries, please contact: digitalhub@fbk.eu
Copyright and license
Copyright © 2025 DSLab – Fondazione Bruno Kessler and individual contributors.
This project is licensed under the Apache License, Version 2.0. You may not use this file except in compliance with the License. Ownership of contributions remains with the original authors and is governed by the terms of the Apache 2.0 License, including the requirement to grant a license to the project.