udp

Version: v1.0.0-alpha.20
Published: Mar 9, 2026 License: MIT Imports: 18 Imported by: 0

README

UDP Input Component

High-performance UDP datagram receiver that publishes to NATS JetStream or core NATS subjects.

Purpose

The UDP input component enables receiving datagram messages over UDP with built-in buffer overflow handling, retry logic, and NATS integration for message distribution. It provides MAVLink reception capabilities and implements the SemStreams component interfaces for lifecycle management and observability. The component is designed for high-throughput scenarios with concurrent message processing and graceful degradation under load.

Configuration

Basic Configuration
ports:
  inputs:
    - name: "udp_socket"
      type: "network"
      subject: "udp://0.0.0.0:14550"
      required: true
      description: "UDP socket listening for incoming data"
  outputs:
    - name: "nats_output"
      type: "nats"
      subject: "input.udp.mavlink"
      required: true
      description: "NATS subject for publishing received UDP data"
JetStream Output Configuration
ports:
  inputs:
    - name: "udp_socket"
      type: "network"
      subject: "udp://0.0.0.0:5000"
      required: true
  outputs:
    - name: "stream_output"
      type: "jetstream"
      subject: "sensors.telemetry"
      required: true
Configuration Parameters
Parameter                Type    Default              Description
ports.inputs[].subject   string  udp://0.0.0.0:14550  UDP bind address in the format udp://host:port
ports.outputs[].subject  string  input.udp.mavlink    NATS subject for publishing received data
ports.outputs[].type     string  nats                 Output type: nats or jetstream
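
The udp://host:port format above can be split into a bind host and port with the Go standard library. The following is a minimal sketch under that assumption; parseUDPSubject is a hypothetical helper written for illustration and is not taken from this package.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// parseUDPSubject splits a "udp://host:port" subject into host and port.
// Hypothetical helper for illustration; not part of the udp package API.
func parseUDPSubject(subject string) (string, int, error) {
	u, err := url.Parse(subject)
	if err != nil {
		return "", 0, fmt.Errorf("parse subject %q: %w", subject, err)
	}
	if u.Scheme != "udp" {
		return "", 0, fmt.Errorf("subject %q: scheme must be udp, got %q", subject, u.Scheme)
	}
	// url.Parse only checks that the port is numeric, so range-check it here.
	port, err := strconv.Atoi(u.Port())
	if err != nil || port < 1 || port > 65535 {
		return "", 0, fmt.Errorf("subject %q: invalid port %q", subject, u.Port())
	}
	return u.Hostname(), port, nil
}

func main() {
	host, port, err := parseUDPSubject("udp://0.0.0.0:14550")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("bind host=%s port=%d\n", host, port)
}
```

Rejecting a missing or out-of-range port at parse time matches the "port out of range" case that the Error Handling section lists as an invalid configuration.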
Internal Defaults

The component uses the following non-configurable defaults:

  • Buffer Size: 5000 messages (circular buffer)
  • Max Datagram Size: 65536 bytes (large enough for any UDP datagram; the maximum IPv4 UDP payload is 65,507 bytes)
  • Overflow Policy: Drop oldest messages when buffer is full
  • Socket Buffer: 2MB OS-level receive buffer
  • Batch Size: 100 messages per processing cycle
  • Retry Policy: 3 attempts with exponential backoff

Input/Output Ports

Input Ports

udp_socket (network)

  • Direction: Input
  • Protocol: UDP
  • Configuration: udp://host:port format
  • Description: UDP socket listening for incoming datagrams
  • Behavior: Binds to specified address and port, receives datagrams up to 65536 bytes
Output Ports

nats_output (nats or jetstream)

  • Direction: Output
  • Configuration: NATS subject string
  • Description: Publishes received UDP data to NATS
  • Behavior:
    • Core NATS (type: nats): Fire-and-forget publish
    • JetStream (type: jetstream): Persistent stream publish with acknowledgment

Performance Considerations

Throughput Characteristics
  • Packet Rate: 10,000+ datagrams/second (8KB datagrams)
  • Latency: Sub-millisecond for buffered messages
  • CPU Overhead: Minimal (single goroutine for reception, batched processing)
  • Memory Usage: O(BufferSize) + per-datagram allocations
  • Concurrency: Single receiver goroutine, batch processing for NATS publishing
Buffer Management

The component uses a circular buffer with overflow protection:

// Buffer Configuration (internal)
Capacity: 5000 messages
Policy: Drop oldest when full
Batch Processing: 100 messages per cycle

Buffer Overflow Behavior:

  1. Overflow counter increments
  2. Oldest message dropped automatically
  3. Warning logged at first overflow
  4. Processing continues without disruption
  5. Metrics updated (semstreams_udp_packets_dropped_total)
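
The drop-oldest policy described above can be illustrated with a small fixed-capacity ring. This is a sketch only: the ring type and its methods are hypothetical names, and the component's real buffer is internal and may be implemented differently.

```go
package main

import "fmt"

// ring is a fixed-capacity FIFO that overwrites its oldest entry when full.
// Hypothetical type for illustration; it is not safe for concurrent use.
type ring struct {
	items   [][]byte
	head    int // index of the oldest item
	size    int
	dropped uint64
}

func newRing(capacity int) *ring {
	if capacity < 1 {
		capacity = 1
	}
	return &ring{items: make([][]byte, capacity)}
}

// push stores msg and reports whether an older message had to be dropped.
func (r *ring) push(msg []byte) bool {
	if r.size == len(r.items) {
		// Full: overwrite the oldest slot and advance head past it.
		r.items[r.head] = msg
		r.head = (r.head + 1) % len(r.items)
		r.dropped++
		return true
	}
	r.items[(r.head+r.size)%len(r.items)] = msg
	r.size++
	return false
}

// pop removes and returns the oldest message, if any.
func (r *ring) pop() ([]byte, bool) {
	if r.size == 0 {
		return nil, false
	}
	msg := r.items[r.head]
	r.items[r.head] = nil
	r.head = (r.head + 1) % len(r.items)
	r.size--
	return msg, true
}

func main() {
	r := newRing(3)
	for _, s := range []string{"a", "b", "c", "d"} {
		if r.push([]byte(s)) {
			fmt.Println("overflow while pushing", s)
		}
	}
	for msg, ok := r.pop(); ok; msg, ok = r.pop() {
		fmt.Println("popped", string(msg))
	}
	fmt.Println("dropped:", r.dropped)
}
```

A production buffer would guard push and pop with a mutex, as the Thread Safety section describes for the component's own buffer.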
Network Tuning

OS Socket Buffer: The component sets a 2MB receive buffer to prevent kernel-level drops during traffic bursts. If the system limits buffer size, a warning is logged but operation continues.

Read Deadline: 100ms read timeout allows periodic shutdown checks without blocking indefinitely.
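
As a rough illustration of the two settings above, the sketch below requests a 2 MB receive buffer and uses a read deadline so that a read returns periodically even when no traffic arrives. It uses only the Go standard library; readOnce and loopbackDemo are hypothetical helpers, not functions from this package.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// readOnce waits up to timeout for a single datagram. The second result is
// true when the deadline expired with nothing to read, which is the point
// where a read loop can check for a shutdown signal.
// Hypothetical helper; not part of the udp package API.
func readOnce(conn *net.UDPConn, buf []byte, timeout time.Duration) (int, bool, error) {
	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return 0, false, err
	}
	n, _, err := conn.ReadFromUDP(buf)
	if errors.Is(err, os.ErrDeadlineExceeded) {
		return 0, true, nil
	}
	return n, false, err
}

// loopbackDemo binds an ephemeral loopback port, performs one read that is
// expected to time out, then sends payload to itself and reads it back.
func loopbackDemo(payload string) (bool, string, error) {
	conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)})
	if err != nil {
		return false, "", err
	}
	defer conn.Close()

	// Request a 2 MB kernel receive buffer. The OS may cap the value, so a
	// failure is reported as a warning rather than treated as fatal.
	if err := conn.SetReadBuffer(2 << 20); err != nil {
		fmt.Println("warning: could not set receive buffer:", err)
	}

	buf := make([]byte, 65536)
	_, timedOut, err := readOnce(conn, buf, 100*time.Millisecond)
	if err != nil {
		return false, "", err
	}

	sender, err := net.DialUDP("udp4", nil, conn.LocalAddr().(*net.UDPAddr))
	if err != nil {
		return timedOut, "", err
	}
	defer sender.Close()
	if _, err := sender.Write([]byte(payload)); err != nil {
		return timedOut, "", err
	}
	n, _, err := readOnce(conn, buf, time.Second)
	if err != nil {
		return timedOut, "", err
	}
	return timedOut, string(buf[:n]), nil
}

func main() {
	timedOut, got, err := loopbackDemo("ping")
	fmt.Println("first read timed out:", timedOut, "received:", got, "err:", err)
}
```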

Retry Logic

Transient errors (network issues, NATS connection problems) trigger automatic retry with exponential backoff:

  • Max Attempts: 3
  • Initial Delay: 100ms
  • Max Delay: 5s
  • Backoff Multiplier: 2.0
  • Jitter: Enabled

Permanent errors (configuration issues) fail immediately without retry.
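
To make the policy values above concrete, the sketch below computes the wait before each retry. The backoff type and the half-range jitter scheme are assumptions chosen for illustration; the component's retry package may calculate delays differently.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff mirrors the documented policy values. The type is hypothetical.
type backoff struct {
	initial    time.Duration
	max        time.Duration
	multiplier float64
}

// delay returns the un-jittered wait before retry number n (0-based),
// capped at the configured maximum.
func (b backoff) delay(n int) time.Duration {
	d := float64(b.initial)
	for i := 0; i < n; i++ {
		d *= b.multiplier
		if d >= float64(b.max) {
			return b.max
		}
	}
	return time.Duration(d)
}

// withJitter picks a random duration in [d/2, d] so that many senders
// retrying at once do not stay synchronized. Other jitter schemes exist.
func withJitter(d time.Duration) time.Duration {
	half := d / 2
	return half + time.Duration(rand.Int63n(int64(d-half)+1))
}

func main() {
	b := backoff{initial: 100 * time.Millisecond, max: 5 * time.Second, multiplier: 2.0}
	for n := 0; n < 3; n++ {
		base := b.delay(n)
		fmt.Printf("retry %d: base delay %v, with jitter %v\n", n, base, withJitter(base))
	}
}
```

With an initial delay of 100ms and a multiplier of 2.0, the base delays are 100ms, 200ms, and 400ms; the 5s cap only takes effect if more retries are allowed.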

Metrics

The component exports Prometheus metrics for monitoring:

  • semstreams_udp_packets_received_total: Total UDP packets received
  • semstreams_udp_bytes_received_total: Total bytes received
  • semstreams_udp_packets_dropped_total: Packets dropped due to buffer overflow
  • semstreams_udp_buffer_utilization_ratio: Buffer usage (0-1)
  • semstreams_udp_batch_size: Distribution of processing batch sizes
  • semstreams_udp_publish_duration_seconds: NATS publish latency
  • semstreams_udp_socket_errors_total: Socket read errors encountered
  • semstreams_udp_last_activity_timestamp: Unix timestamp of last received packet

Example Use Cases

IoT Sensor Data Collection

Receive sensor datagrams from hundreds of devices with burst handling:

ports:
  inputs:
    - name: "sensor_socket"
      type: "network"
      subject: "udp://0.0.0.0:5000"
      required: true
  outputs:
    - name: "sensor_stream"
      type: "jetstream"
      subject: "iot.sensors.telemetry"
      required: true

Characteristics:

  • Small packets (1KB typical)
  • High burst traffic (hundreds of sensors reporting simultaneously)
  • 5000-message buffer handles concurrent sensor reports
  • JetStream persistence protects messages once they are published (datagrams dropped on buffer overflow are not recovered)
Network Monitoring (Syslog/NetFlow)

Receive syslog or NetFlow data, keeping the newest messages when the buffer overflows:

ports:
  inputs:
    - name: "syslog_socket"
      type: "network"
      subject: "udp://0.0.0.0:514"
      required: true
  outputs:
    - name: "log_stream"
      type: "jetstream"
      subject: "network.syslog"
      required: true

Characteristics:

  • Variable packet sizes (512-2048 bytes)
  • Continuous traffic with periodic spikes
  • Drop oldest policy keeps newest logs during overflow
  • JetStream enables log replay and analysis

MAVLink Telemetry

Receive MAVLink messages from drones or robotics systems:

ports:
  inputs:
    - name: "mavlink_socket"
      type: "network"
      subject: "udp://0.0.0.0:14550"
      required: true
  outputs:
    - name: "telemetry_output"
      type: "nats"
      subject: "input.udp.mavlink"
      required: true

Characteristics:

  • MAVLink standard port (14550)
  • Small packets (a MAVLink 2 frame is at most 280 bytes)
  • Real-time telemetry with low latency requirements
  • Core NATS for minimal overhead
Multicast Reception

Join a multicast group to receive group-addressed data:

ports:
  inputs:
    - name: "multicast_socket"
      type: "network"
      subject: "udp://239.0.0.1:9999"
      required: true
  outputs:
    - name: "broadcast_stream"
      type: "jetstream"
      subject: "multicast.data"
      required: true

Characteristics:

  • Multicast address (239.0.0.1)
  • Large packets (8KB+)
  • High bandwidth broadcast data
  • JetStream handles multiple subscribers efficiently
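
For readers unfamiliar with multicast sockets in Go, the sketch below shows one way the standard library distinguishes a multicast group address from a unicast bind address and joins the group. The isMulticastAddr and listen helpers are hypothetical; how the component itself opens the socket is not shown in this documentation.

```go
package main

import (
	"fmt"
	"net"
)

// isMulticastAddr reports whether addr ("host:port") names an IP multicast group.
// Hypothetical helper for illustration.
func isMulticastAddr(addr string) (bool, error) {
	udpAddr, err := net.ResolveUDPAddr("udp4", addr)
	if err != nil {
		return false, err
	}
	return udpAddr.IP.IsMulticast(), nil
}

// listen joins the group when addr is multicast and binds a plain UDP
// socket otherwise.
func listen(addr string) (*net.UDPConn, error) {
	udpAddr, err := net.ResolveUDPAddr("udp4", addr)
	if err != nil {
		return nil, err
	}
	if udpAddr.IP.IsMulticast() {
		// A nil interface lets the system pick the default multicast interface.
		return net.ListenMulticastUDP("udp4", nil, udpAddr)
	}
	return net.ListenUDP("udp4", udpAddr)
}

func main() {
	for _, addr := range []string{"239.0.0.1:9999", "0.0.0.0:5000"} {
		ok, err := isMulticastAddr(addr)
		fmt.Println(addr, "multicast:", ok, "err:", err)
	}
	conn, err := listen("239.0.0.1:9999")
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("listening on", conn.LocalAddr())
}
```

Joining a group can fail on hosts without a multicast-capable interface, so the sketch reports the error instead of assuming success.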

Thread Safety

The component is fully thread-safe with proper synchronization:

  • Start/Stop: Can be called from any goroutine (idempotent operations)
  • Metrics: Atomic operations for lock-free updates
  • Buffer Access: Internal mutex protection
  • Socket Operations: Protected by RWMutex during shutdown

Lifecycle Management

Initialization
input.Initialize() // Validates configuration, does not start receiving
Starting
input.Start(ctx) // Begins receiving datagrams and publishing to NATS

Startup Sequence:

  1. Bind UDP socket with retry
  2. Create NATS KV bucket for lifecycle reporting
  3. Start read loop goroutine
  4. Report "idle" stage to lifecycle tracker
Stopping
input.Stop(5 * time.Second) // Graceful shutdown with timeout

Shutdown Sequence:

  1. Signal shutdown to goroutines
  2. Close UDP socket (unblocks read loop)
  3. Wait for goroutine completion (with timeout)
  4. Process remaining buffered messages
  5. Clean up resources
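
The signal-then-wait part of the shutdown sequence above can be sketched with a done channel and a WaitGroup. The worker type is hypothetical and stands in for the component's read loop; socket handling and buffer draining are left out to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errStopTimeout = errors.New("timed out waiting for goroutines")

// worker is a hypothetical stand-in for a component with one background goroutine.
type worker struct {
	done     chan struct{}
	stopOnce sync.Once
	wg       sync.WaitGroup
}

func startWorker() *worker {
	w := &worker{done: make(chan struct{})}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-w.done:
				return // shutdown was signalled
			case <-ticker.C:
				// Stand-in for one read-and-publish cycle.
			}
		}
	}()
	return w
}

// stop signals shutdown and waits up to timeout for the goroutine to exit.
// sync.Once makes repeated calls safe.
func (w *worker) stop(timeout time.Duration) error {
	w.stopOnce.Do(func() { close(w.done) })

	finished := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(finished)
	}()

	select {
	case <-finished:
		return nil
	case <-time.After(timeout):
		return errStopTimeout
	}
}

func main() {
	w := startWorker()
	time.Sleep(50 * time.Millisecond)
	fmt.Println("first stop:", w.stop(5*time.Second))
	fmt.Println("second stop:", w.stop(5*time.Second))
}
```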

Error Handling

The component uses SemStreams error classification for consistent behavior:

  • Invalid Configuration: errs.WrapInvalid (port out of range, empty subject, nil client)
  • Network Errors: errs.WrapTransient (socket binding failures, read errors)
  • NATS Errors: errs.WrapTransient (connection issues, publish failures)

Error Behavior:

  • Configuration errors fail component initialization
  • Transient errors trigger retry with exponential backoff
  • Socket errors increment error counter but continue processing
  • Individual message publish failures don't stop the component
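
The split between retryable and non-retryable errors can be sketched with a small classified error type. Everything here is hypothetical: classifiedError, invalidf, transientf, and retry only imitate the behavior described above and do not reproduce the errs package.

```go
package main

import (
	"errors"
	"fmt"
)

type errClass int

const (
	classInvalid errClass = iota
	classTransient
)

// classifiedError is a hypothetical stand-in for the errs package wrappers.
type classifiedError struct {
	class errClass
	err   error
}

func (e *classifiedError) Error() string { return e.err.Error() }
func (e *classifiedError) Unwrap() error { return e.err }

func invalidf(format string, args ...any) error {
	return &classifiedError{class: classInvalid, err: fmt.Errorf(format, args...)}
}

func transientf(format string, args ...any) error {
	return &classifiedError{class: classTransient, err: fmt.Errorf(format, args...)}
}

// isTransient reports whether err, or any error it wraps, is transient.
func isTransient(err error) bool {
	var ce *classifiedError
	return errors.As(err, &ce) && ce.class == classTransient
}

// retry calls fn up to maxAttempts times. It stops early on success or on
// any error that is not transient. Delays between attempts are omitted.
func retry(maxAttempts int, fn func() error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil || !isTransient(err) {
			return attempt, err
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := retry(3, func() error {
		calls++
		if calls < 3 {
			return transientf("publish failed on call %d", calls)
		}
		return nil
	})
	fmt.Println("transient case: attempts =", attempts, "err =", err)

	attempts, err = retry(3, func() error { return invalidf("empty subject") })
	fmt.Println("invalid case: attempts =", attempts, "err =", err)
}
```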

Testing

Run the component test suites:

# Unit tests (fast, no external dependencies)
go test ./input/udp -v

# Integration tests (requires Docker for NATS testcontainer)
go test -tags=integration ./input/udp -v

# Race detector (concurrency validation)
go test ./input/udp -race

# Coverage report
go test ./input/udp -cover

Limitations

Current version constraints:

  • IPv4 Only: IPv6 support planned for future release
  • Single Socket: One UDP socket per component instance
  • No Deduplication: Application-level deduplication must be implemented separately
  • Unordered Delivery: UDP provides no ordering guarantees (consider sequence numbering in payload)
  • No Fragmentation Handling: Assumes each datagram fits in the 65536-byte receive buffer (the largest UDP payload over IPv4 is 65,507 bytes)

Health Monitoring

Query component health status:

health := input.Health()
// health.Healthy: true if running and socket connected
// health.ErrorCount: Total errors encountered since start
// health.Uptime: Duration since Start() was called

Query data flow metrics:

flow := input.DataFlow()
// flow.MessagesPerSecond: Current throughput rate
// flow.BytesPerSecond: Current byte rate
// flow.ErrorRate: Error percentage (errors / messages)
// flow.LastActivity: Timestamp of last received packet

Related Components

  • processor/graph: Process received data into knowledge graph triples
  • processor/jsonmap: Transform JSON payloads from UDP data
  • output/websocket: Forward processed data to WebSocket clients
  • gateway/http: Query knowledge graph built from UDP-ingested data

Documentation

Overview

Package udp provides a UDP input component for receiving data over UDP sockets.

The UDP input component enables receiving datagram messages over UDP, with built-in buffer overflow handling, retry logic, and NATS integration for message distribution. It implements the SemStreams component interfaces for lifecycle management and observability.

Quick Start

Create a UDP input listening on port 5000:

config := udp.InputConfig{
    Ports: &component.PortConfig{
        Outputs: []component.PortDefinition{
            {Name: "output", Type: "nats", Subject: "udp.messages", Required: true},
        },
    },
    Address:            "0.0.0.0:5000",
    MaxDatagramSize:    8192,
    BufferSize:         1000,
    BufferOverflowMode: "drop_oldest",
}

rawConfig, _ := json.Marshal(config)
// CreateInput accepts the raw JSON config; NewInput takes an InputDeps value instead.
input, err := udp.CreateInput(rawConfig, deps)

Configuration

The InputConfig struct controls all aspects of UDP reception:

  • Address: IP:Port to bind to (e.g., "0.0.0.0:5000", ":5000")
  • MaxDatagramSize: Maximum UDP datagram size in bytes (default: 8192)
  • BufferSize: Internal buffer size for handling bursts (default: 1000)
  • BufferOverflowMode: How to handle buffer overflow ("drop_oldest" or "drop_newest")
  • RetryPolicy: Retry configuration for transient errors

Buffer Overflow Handling

The component buffers incoming datagrams to absorb traffic bursts and applies a configurable drop policy when the buffer fills:

// Drop oldest messages when buffer is full
BufferOverflowMode: "drop_oldest"

// Or drop newest messages
BufferOverflowMode: "drop_newest"

When buffer overflow occurs:

  1. Overflow counter increments
  2. Message logged at Warn level
  3. Configured drop policy applied
  4. Processing continues

Retry Logic

Transient errors (network issues, NATS temporary failures) trigger automatic retry:

RetryPolicy: &retry.Config{
    MaxAttempts:  3,
    InitialDelay: 100 * time.Millisecond,
    MaxDelay:     5 * time.Second,
    Multiplier:   2.0,
    AddJitter:    true,
}

Message Flow

UDP Socket → Buffer → Processing Goroutine → NATS Subject
                ↓
        Overflow Handling (if buffer full)
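
The flow in the diagram can be approximated with a buffered channel between the receiver and a processing goroutine. In this sketch a full channel drops the incoming message, which corresponds to the "drop_newest" mode; the pipeline type and the publish callback are hypothetical and stand in for the real buffer and the NATS publish call.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline connects one receiver to one processing goroutine.
// Hypothetical type for illustration.
type pipeline struct {
	buf     chan []byte
	dropped int // only touched by the goroutine that calls enqueue
	wg      sync.WaitGroup
}

// newPipeline starts the processing goroutine, which hands every buffered
// message to publish until the buffer is closed and drained.
func newPipeline(size int, publish func([]byte)) *pipeline {
	p := &pipeline{buf: make(chan []byte, size)}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for msg := range p.buf {
			publish(msg)
		}
	}()
	return p
}

// enqueue is called by the single receiver. It never blocks: when the
// buffer is full the new message is dropped and counted.
func (p *pipeline) enqueue(msg []byte) bool {
	select {
	case p.buf <- msg:
		return true
	default:
		p.dropped++
		return false
	}
}

// close stops intake and waits until the remaining messages are published.
func (p *pipeline) close() {
	close(p.buf)
	p.wg.Wait()
}

func main() {
	published := 0
	p := newPipeline(4, func(msg []byte) { published++ })
	accepted := 0
	for i := 0; i < 100; i++ {
		if p.enqueue([]byte(fmt.Sprintf("datagram-%d", i))) {
			accepted++
		}
	}
	p.close()
	fmt.Println("accepted:", accepted, "published:", published, "dropped:", p.dropped)
}
```

The exact number of drops depends on scheduling, so the program prints counts rather than a fixed result. A drop-oldest variant needs a structure that can evict from the front, which a plain channel cannot do without a second goroutine or a lock.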

Lifecycle Management

The component implements proper lifecycle with graceful shutdown:

// Start receiving
input.Start(ctx)

// Graceful shutdown with timeout
input.Stop(5 * time.Second)

During shutdown:

  1. Stop accepting new datagrams
  2. Drain buffer (process remaining messages)
  3. Close UDP socket
  4. Wait for goroutines to complete (with timeout)

Observability

The component implements component.Discoverable for monitoring:

meta := input.Meta()
// Name: udp-input
// Type: input
// Description: UDP datagram input

health := input.Health()
// Healthy: true/false
// ErrorCount: Total errors encountered
// Uptime: Time since Start()

dataFlow := input.DataFlow()
// MessagesPerSecond: Current throughput
// BytesPerSecond: Current byte rate
// ErrorRate: Error percentage

Performance Characteristics

  • Throughput: 10,000+ datagrams/second (8KB datagrams)
  • Memory: O(BufferSize) + per-datagram allocations
  • Latency: Sub-millisecond for buffered messages
  • CPU: Minimal (single goroutine for reception)

Error Handling

The component uses semstreams/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (bad configuration)
  • Network errors: errs.WrapTransient (retryable)
  • NATS errors: errs.WrapTransient (connection issues)

Errors are logged and counted but don't stop the component unless fatal.

Common Use Cases

IoT Sensor Data:

// Receive sensor datagrams on port 5000
Address: "0.0.0.0:5000"
MaxDatagramSize: 1024
BufferSize: 5000  // Handle bursts from 100s of sensors

Network Monitoring:

// Receive syslog or netflow data
Address: "0.0.0.0:514"
MaxDatagramSize: 2048
BufferOverflowMode: "drop_oldest"  // Keep newest logs

Multicast Reception:

// Join multicast group
Address: "239.0.0.1:9999"
MaxDatagramSize: 8192

Thread Safety

The component is fully thread-safe:

  • Start/Stop can be called from any goroutine
  • Metrics updates use atomic operations
  • Buffer access protected by sync.Mutex

Testing

The package includes comprehensive test coverage:

  • Unit tests: Config validation, buffer overflow, error handling
  • Integration tests: Real UDP sockets with testcontainers NATS
  • Race tests: Concurrent Start/Stop, buffer access
  • Leak tests: Goroutine cleanup verification
  • Panic tests: Error recovery

Run tests:

go test ./input/udp -v                        # Unit tests
go test -tags=integration ./input/udp -v      # Integration tests
go test ./input/udp -race                     # Race detector

Limitations

Current version limitations:

  • IPv4 only (IPv6 support planned)
  • Single UDP socket per component instance
  • No built-in message deduplication
  • No message ordering guarantees (UDP is unordered)

Example: Complete Configuration

{
  "ports": {
    "outputs": [
      {"name": "output", "type": "nats", "subject": "sensors.udp", "required": true}
    ]
  },
  "address": "0.0.0.0:5000",
  "max_datagram_size": 8192,
  "buffer_size": 1000,
  "buffer_overflow_mode": "drop_oldest",
  "retry_policy": {
    "max_attempts": 3,
    "initial_delay": "100ms",
    "max_delay": "5s",
    "multiplier": 2.0,
    "add_jitter": true
  }
}
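
A configuration like the one above can be decoded with encoding/json, provided the duration strings are handled explicitly. The sketch below is an assumption about how such decoding could look: the config, retryPolicy, and duration types are hypothetical mirrors of the JSON keys and are not the package's own types. The ports block is omitted from the sample for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// duration lets JSON strings such as "100ms" decode into a time.Duration.
type duration time.Duration

func (d *duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	parsed, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	*d = duration(parsed)
	return nil
}

// retryPolicy and config are hypothetical mirrors of the JSON keys.
type retryPolicy struct {
	MaxAttempts  int      `json:"max_attempts"`
	InitialDelay duration `json:"initial_delay"`
	MaxDelay     duration `json:"max_delay"`
	Multiplier   float64  `json:"multiplier"`
	AddJitter    bool     `json:"add_jitter"`
}

type config struct {
	Address            string      `json:"address"`
	MaxDatagramSize    int         `json:"max_datagram_size"`
	BufferSize         int         `json:"buffer_size"`
	BufferOverflowMode string      `json:"buffer_overflow_mode"`
	RetryPolicy        retryPolicy `json:"retry_policy"`
}

const sample = `{
  "address": "0.0.0.0:5000",
  "max_datagram_size": 8192,
  "buffer_size": 1000,
  "buffer_overflow_mode": "drop_oldest",
  "retry_policy": {
    "max_attempts": 3,
    "initial_delay": "100ms",
    "max_delay": "5s",
    "multiplier": 2.0,
    "add_jitter": true
  }
}`

// decode parses raw JSON and rejects unknown overflow modes.
func decode(raw []byte) (config, error) {
	var cfg config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return config{}, err
	}
	switch cfg.BufferOverflowMode {
	case "drop_oldest", "drop_newest":
		return cfg, nil
	default:
		return config{}, fmt.Errorf("unsupported buffer_overflow_mode %q", cfg.BufferOverflowMode)
	}
}

func main() {
	cfg, err := decode([]byte(sample))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("address:", cfg.Address)
	fmt.Println("initial delay:", time.Duration(cfg.RetryPolicy.InitialDelay))
	fmt.Println("max delay:", time.Duration(cfg.RetryPolicy.MaxDelay))
}
```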

Package udp provides a UDP input component for receiving data from external sources.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateInput

func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateInput creates a UDP input component following service pattern

func Register

func Register(registry *component.Registry) error

Register registers the UDP input component with the given registry

Types

type Input

type Input struct {
	// contains filtered or unexported fields
}

Input implements a UDP listener that publishes received data to NATS. It is specifically designed for receiving MAVLink messages on port 14550.

func NewInput

func NewInput(deps InputDeps) (*Input, error)

NewInput creates a new UDP input component using idiomatic Go constructor pattern. Returns an error if buffer creation fails.

func (*Input) ConfigSchema

func (u *Input) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this component. It references the package-level udpSchema variable for efficient retrieval.

func (*Input) DataFlow

func (u *Input) DataFlow() component.FlowMetrics

DataFlow returns the current data flow metrics

func (*Input) Health

func (u *Input) Health() component.HealthStatus

Health returns the current health status of the component

func (*Input) Initialize

func (u *Input) Initialize() error

Initialize prepares the UDP input component but does not start listening

func (*Input) InputPorts

func (u *Input) InputPorts() []component.Port

InputPorts returns the input ports for this component

func (*Input) Meta

func (u *Input) Meta() component.Metadata

Meta returns the component metadata

func (*Input) OutputPorts

func (u *Input) OutputPorts() []component.Port

OutputPorts returns the output ports for this component

func (*Input) Start

func (u *Input) Start(ctx context.Context) error

Start begins listening for UDP packets and publishing to NATS

func (*Input) Stop

func (u *Input) Stop(timeout time.Duration) error

Stop gracefully stops the UDP listener with the specified timeout

func (*Input) StopWithTimeout

func (u *Input) StopWithTimeout(timeout time.Duration) error

StopWithTimeout gracefully stops the UDP listener with the specified timeout

type InputConfig

type InputConfig struct {
	// Port configuration for inputs and outputs
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
}

InputConfig holds configuration for UDP input component

func DefaultConfig

func DefaultConfig() InputConfig

DefaultConfig returns sensible defaults for UDP input

func (*InputConfig) Validate

func (c *InputConfig) Validate() error

Validate implements component.Validatable interface for secure config validation

type InputDeps

type InputDeps struct {
	Name            string                  // Instance name
	Config          InputConfig             // Business logic configuration
	NATSClient      *natsclient.Client      // Runtime dependency
	MetricsRegistry *metric.MetricsRegistry // Runtime dependency
	Logger          *slog.Logger            // Runtime dependency
}

InputDeps holds runtime dependencies for UDP input component

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for UDP input component
