jsonfilter

package
v1.0.0-alpha.8 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 6, 2026 License: MIT Imports: 17 Imported by: 0

README

JSON Filter Processor

Protocol-layer message filtering based on field values and comparison rules.

Purpose

The JSON filter processor enables field-based filtering of GenericJSON messages using flexible comparison operators. It evaluates filter rules against message data and publishes matching messages to output subjects, providing pre-semantic data routing and volume reduction before expensive domain processing.

Configuration

type: json_filter
config:
  ports:
    inputs:
      - name: nats_input
        type: nats
        subject: sensor.raw
        interface: core.json.v1
        required: true
    outputs:
      - name: nats_output
        type: nats
        subject: sensor.filtered
        interface: core.json.v1
        required: true
  rules:
    - field: temperature
      operator: gte
      value: 30
    - field: status
      operator: eq
      value: active
Configuration Fields

ports (required): Port configuration for input and output subjects

  • inputs: Array of input port definitions (must use the core.json.v1 interface)
  • outputs: Array of output port definitions (supports multiple outputs)

rules (required): Array of filter rules (evaluated as logical AND)

  • field: Field name to check (string, required)
  • operator: Comparison operator (enum, required)
  • value: Comparison value (any type, required)
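As a rough illustration of the required-field constraints above, the following sketch decodes a rules array and rejects unknown operators, empty field names, and missing values. The types `filterRule` and `rulesConfig` and the function `validateRules` are hypothetical local stand-ins that only mirror the documented JSON keys; treating an empty rules list as invalid is an assumption. The package's real validation may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// filterRule mirrors the documented rule keys (field, operator, value).
// Hypothetical stand-in, not the package's FilterRule type.
type filterRule struct {
	Field    string `json:"field"`
	Operator string `json:"operator"`
	Value    any    `json:"value"`
}

type rulesConfig struct {
	Rules []filterRule `json:"rules"`
}

// validOperators matches the enum in the FilterRule schema tag.
var validOperators = map[string]bool{
	"eq": true, "ne": true, "gt": true, "gte": true,
	"lt": true, "lte": true, "contains": true,
}

// validateRules decodes the rules array and enforces the required fields.
func validateRules(raw []byte) ([]filterRule, error) {
	var cfg rulesConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return nil, fmt.Errorf("invalid config: %w", err)
	}
	// Assumption: an empty or missing rules array is rejected.
	if len(cfg.Rules) == 0 {
		return nil, errors.New("invalid config: at least one rule is required")
	}
	for i, r := range cfg.Rules {
		switch {
		case r.Field == "":
			return nil, fmt.Errorf("invalid config: rule %d: field is required", i)
		case !validOperators[r.Operator]:
			return nil, fmt.Errorf("invalid config: rule %d: unknown operator %q", i, r.Operator)
		case r.Value == nil:
			return nil, fmt.Errorf("invalid config: rule %d: value is required", i)
		}
	}
	return cfg.Rules, nil
}

func main() {
	rules, err := validateRules([]byte(`{"rules":[{"field":"temperature","operator":"gte","value":30}]}`))
	fmt.Println(len(rules), err) // 1 <nil>

	_, err = validateRules([]byte(`{"rules":[{"field":"status","operator":"like","value":"x"}]}`))
	fmt.Println(err) // invalid config: rule 0: unknown operator "like"
}
```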
Supported Operators
Operator  Description            Example
eq        Equals                 {field: "status", operator: "eq", value: "active"}
ne        Not equals             {field: "status", operator: "ne", value: "inactive"}
gt        Greater than           {field: "temperature", operator: "gt", value: 100}
gte       Greater than or equal  {field: "altitude", operator: "gte", value: 1000}
lt        Less than              {field: "pressure", operator: "lt", value: 500}
lte       Less than or equal     {field: "humidity", operator: "lte", value: 80}
contains  Substring match        {field: "message", operator: "contains", value: "error"}

Input/Output Ports

Input Ports

Type: NATS or JetStream subscriptions

Interface: core.json.v1 (GenericJSON messages only)

Behavior: Subscribes to configured subjects and evaluates filter rules against each message

Output Ports

Type: NATS or JetStream publishers

Interface: core.json.v1 (passes through GenericJSON messages)

Behavior: Publishes messages that match all filter rules to configured output subjects

Message Flow
flowchart LR
    A[Input Subject] --> B[JSON Filter]
    B -->|Match| C[Output Subject]
    B -->|No Match| D[Dropped]

Rule Evaluation: All rules must match (AND logic) for a message to pass

Non-matching Messages: Dropped and logged at Debug level

Example Use Cases

High-Temperature Sensor Filtering

Filter sensor data to process only readings at or above a critical threshold:

rules:
  - field: temperature
    operator: gte
    value: 30
  - field: sensor_type
    operator: eq
    value: thermocouple

Input Message:

{
  "type": {"domain": "core", "category": "json", "version": "v1"},
  "payload": {
    "data": {
      "sensor_id": "temp-001",
      "temperature": 32.5,
      "sensor_type": "thermocouple",
      "location": "warehouse-a"
    }
  }
}

Result: Message passes (temperature >= 30 AND sensor_type == "thermocouple")

Status-Based Message Routing

Route messages based on operational status:

rules:
  - field: status
    operator: eq
    value: critical
  - field: priority
    operator: gte
    value: 5

Use Case: Separate critical high-priority messages for immediate processing

Error Log Filtering

Filter log messages containing specific error patterns:

rules:
  - field: level
    operator: eq
    value: error
  - field: message
    operator: contains
    value: connection

Use Case: Extract connection-related errors from application logs

Multi-Condition Data Validation

Ensure data meets multiple validity criteria:

rules:
  - field: altitude
    operator: gte
    value: 0
  - field: altitude
    operator: lte
    value: 10000
  - field: status
    operator: ne
    value: offline

Use Case: Filter out invalid or offline sensor readings before graph ingestion

Performance Characteristics

Throughput: typically 10,000+ messages/second per processor instance

Evaluation Complexity: O(n) where n is the number of rules

Field Lookup: O(1) map lookup for direct field access

Type Conversion: Minimal overhead for primitive types

Observability

Metrics

The processor exposes Prometheus metrics:

  • json_filter_messages_processed_total: Total messages received
  • json_filter_messages_passed_total: Messages matching all rules
  • json_filter_messages_filtered_total: Messages dropped
  • json_filter_evaluation_duration_seconds: Rule evaluation latency
  • json_filter_errors_total: Parsing and evaluation errors
Health Status

Health checks report:

  • Healthy: Processor is running and processing messages
  • ErrorCount: Total errors encountered
  • Uptime: Time since processor started
Data Flow Metrics

Flow metrics include:

  • ErrorRate: Ratio of errors to processed messages
  • LastActivity: Timestamp of most recent message processing

Limitations

Current Version Constraints:

  • No support for nested field access (e.g., position.lat)
  • No logical OR between rules (all rules are AND)
  • No regular expression matching (only exact/substring matching)
  • No custom comparison functions
  • Field lookup is limited to top-level fields in the data object
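To make the top-level restriction concrete: a lookup that uses the rule's field name as a single map key cannot reach nested objects, so a dotted name is simply a missing key. The helper `lookupField` below is a hypothetical illustration of that behavior, not the package's code.

```go
package main

import "fmt"

// lookupField performs one top-level map lookup. The field name is used as
// a literal key, so dotted paths such as "position.lat" are not traversed.
func lookupField(data map[string]any, field string) (any, bool) {
	v, ok := data[field]
	return v, ok
}

func main() {
	data := map[string]any{
		"altitude": 1200.0,
		"position": map[string]any{"lat": 37.7, "lon": -122.4},
	}

	v, ok := lookupField(data, "altitude")
	fmt.Println(v, ok) // 1200 true

	// "position.lat" is one key, and no such key exists at the top level.
	_, ok = lookupField(data, "position.lat")
	fmt.Println(ok) // false
}
```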

Documentation

Overview

Package jsonfilter provides a processor for filtering GenericJSON messages based on field values and comparison rules.

The JSON filter processor enables field-based filtering of GenericJSON payloads using flexible comparison operators. It subscribes to NATS subjects carrying GenericJSON messages (core.json.v1 interface), evaluates filter rules against the message data, and publishes matching messages to output subjects.

Design Context: Protocol-Layer Processor

This processor is a protocol-layer utility: it handles data routing without making semantic decisions. It does NOT:

  • Determine entity identities (no EntityID generation)
  • Create semantic triples (no Graphable implementation)
  • Interpret domain meaning (filtering is field-value comparison only)

Use this for pre-semantic filtering: drop invalid data, route by type, or reduce volume before expensive domain processing. Semantic filtering (e.g., "find all drones in fleet Alpha") belongs in domain processors or graph queries.

See docs/PROCESSOR-DESIGN-PHILOSOPHY.md for the full rationale.

Pipeline Position:

GenericJSON → [json_filter] → Filtered GenericJSON → [Domain Processor] → Graph
               ^^^^^^^^^^^^                          ^^^^^^^^^^^^^^^^
               Protocol layer                        Semantic layer
               (this package)                        (your code)

Supported Operators

The processor supports seven operators:

  • eq (equals): String or numeric equality
  • ne (not equals): String or numeric inequality
  • gt (greater than): Numeric comparison (field > value)
  • gte (greater than or equal): Numeric comparison (field >= value)
  • lt (less than): Numeric comparison (field < value)
  • lte (less than or equal): Numeric comparison (field <= value)
  • contains: Substring matching (case-sensitive)

Quick Start

Filter messages where altitude exceeds 1000 meters:

config := jsonfilter.Config{
    Ports: &component.PortConfig{
        Inputs: []component.PortDefinition{
            {Name: "input", Type: "nats", Subject: "sensor.data", Interface: "core.json.v1"},
        },
        Outputs: []component.PortDefinition{
            {Name: "output", Type: "nats", Subject: "sensor.high_altitude", Interface: "core.json.v1"},
        },
    },
    Rules: []jsonfilter.FilterRule{
        {Field: "altitude", Operator: "gt", Value: 1000},
    },
}

// NewProcessor takes the configuration as JSON; deps is the
// component.Dependencies value supplied by the host application.
rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}
processor, err := jsonfilter.NewProcessor(rawConfig, deps)
if err != nil {
    return err
}

Filter Rules

Rules are evaluated as logical AND - all rules must match for a message to pass.

String equality example:

{Field: "status", Operator: "eq", Value: "active"}

Numeric comparison example:

{Field: "temperature", Operator: "gte", Value: 20.5}

Substring matching example:

{Field: "message", Operator: "contains", Value: "error"}

Multiple Rules

Configure multiple rules for complex filtering:

Rules: []jsonfilter.FilterRule{
    {Field: "status", Operator: "eq", Value: "active"},
    {Field: "priority", Operator: "gte", Value: 5},
    {Field: "region", Operator: "contains", Value: "north"},
}

All three rules must match for the message to be published.

Message Flow

Input Subject → GenericJSON → Filter Rules → Matching Messages → Output Subject
                  ↓
           Non-matching messages dropped (logged at Debug level)

GenericJSON Interface

Input messages must conform to the core.json.v1 interface:

type GenericJSONPayload struct {
    Data map[string]any `json:"data"`
}

Example input message:

{
  "data": {
    "sensor_id": "temp-001",
    "value": 23.5,
    "unit": "celsius",
    "location": "warehouse-a"
  }
}

Filter rule: {Field: "value", Operator: "gt", Value: 20}

Result: Message passes (23.5 > 20)
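When the payload is decoded into the GenericJSONPayload struct shown above, `encoding/json` stores JSON numbers held in a `map[string]any` as `float64`, which fits the float64 conversion used by the numeric operators. A minimal sketch of decoding the example message and checking the rule; the `decodePayload` helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GenericJSONPayload copies the struct from the interface description.
type GenericJSONPayload struct {
	Data map[string]any `json:"data"`
}

// decodePayload unmarshals a core.json.v1 payload body.
func decodePayload(raw []byte) (GenericJSONPayload, error) {
	var p GenericJSONPayload
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	raw := []byte(`{"data":{"sensor_id":"temp-001","value":23.5,"unit":"celsius","location":"warehouse-a"}}`)
	p, err := decodePayload(raw)
	if err != nil {
		panic(err)
	}
	// JSON numbers decoded into an any-typed map value arrive as float64.
	v, ok := p.Data["value"].(float64)
	fmt.Println(v, ok, ok && v > 20) // 23.5 true true
}
```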

Type Handling

Field values are converted to appropriate types for comparison:

  • Numeric operators (gt, gte, lt, lte): Converts to float64
  • String operators (eq, ne, contains): Converts to string

If type conversion fails, the rule does not match (message is dropped).
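A compact sketch of these conversion rules, assuming that string operators compare `fmt.Sprint` renderings, that numeric strings such as `"7"` are accepted by the float64 conversion, and that a missing field counts as a non-match for every operator (including `ne`). The `rule` type and the `toFloat`, `matchRule`, and `matchAll` functions are hypothetical; the package's actual conversion code may differ. `matchAll` applies the AND semantics from Filter Rules, using the three rules from the Multiple Rules example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rule is a hypothetical stand-in for FilterRule.
type rule struct {
	Field    string
	Operator string
	Value    any
}

// toFloat converts common numeric representations to float64.
// Accepting numeric strings is an assumption of this sketch.
func toFloat(v any) (float64, bool) {
	switch n := v.(type) {
	case float64:
		return n, true
	case float32:
		return float64(n), true
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	case string:
		f, err := strconv.ParseFloat(n, 64)
		return f, err == nil
	default:
		return 0, false
	}
}

// matchRule evaluates one rule. A missing field, a failed conversion, or an
// unknown operator all count as "does not match".
func matchRule(data map[string]any, r rule) bool {
	v, ok := data[r.Field]
	if !ok {
		return false
	}
	switch r.Operator {
	case "eq":
		return fmt.Sprint(v) == fmt.Sprint(r.Value)
	case "ne":
		return fmt.Sprint(v) != fmt.Sprint(r.Value)
	case "contains":
		return strings.Contains(fmt.Sprint(v), fmt.Sprint(r.Value)) // case-sensitive
	case "gt", "gte", "lt", "lte":
		got, ok1 := toFloat(v)
		want, ok2 := toFloat(r.Value)
		if !ok1 || !ok2 {
			return false
		}
		switch r.Operator {
		case "gt":
			return got > want
		case "gte":
			return got >= want
		case "lt":
			return got < want
		default: // "lte"
			return got <= want
		}
	}
	return false
}

// matchAll applies AND semantics and stops at the first failing rule.
func matchAll(data map[string]any, rules []rule) bool {
	for _, r := range rules {
		if !matchRule(data, r) {
			return false
		}
	}
	return true
}

func main() {
	rules := []rule{
		{Field: "status", Operator: "eq", Value: "active"},
		{Field: "priority", Operator: "gte", Value: 5},
		{Field: "region", Operator: "contains", Value: "north"},
	}
	data := map[string]any{"status": "active", "priority": 7.0, "region": "northwest"}
	fmt.Println(matchAll(data, rules)) // true

	data["priority"] = "high" // not numeric: conversion fails, so the rule fails
	fmt.Println(matchAll(data, rules)) // false
}
```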

Configuration Schema

The processor uses component.PortConfig for flexible input/output configuration:

{
  "ports": {
    "inputs": [
      {"name": "input", "type": "nats", "subject": "raw.>", "interface": "core.json.v1"}
    ],
    "outputs": [
      {"name": "output", "type": "nats", "subject": "filtered.messages", "interface": "core.json.v1"}
    ]
  },
  "rules": [
    {"field": "value", "operator": "gt", "value": 100}
  ]
}

Error Handling

The processor uses semstreams/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (bad configuration, missing fields)
  • NATS errors: errs.WrapTransient (network issues, retryable)
  • Unmarshal errors: errs.WrapInvalid (malformed JSON payloads)

Messages that fail parsing or filtering are logged at Debug level and dropped.

Performance Considerations

  • Filter evaluation is O(n) where n is number of rules
  • Field extraction is O(1) map lookup
  • Type conversion overhead is minimal for primitive types
  • Concurrent message processing via goroutines

Typical throughput: 10,000+ messages/second per processor instance.
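The "concurrent message processing via goroutines" point can be illustrated with a small worker pool: messages are fanned out over a channel to a fixed number of goroutines, and a shared atomic counter tallies matches. This is a generic Go pattern offered as a sketch; `filterConcurrently` and the worker count are hypothetical, and nothing here reflects how the package actually schedules work or the throughput figure above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// filterConcurrently fans msgs out to workers goroutines, applies match to
// each message, and returns how many matched.
func filterConcurrently(msgs []map[string]any, workers int, match func(map[string]any) bool) int64 {
	if workers < 1 {
		workers = 1 // at least one consumer, otherwise sends would block forever
	}
	jobs := make(chan map[string]any)
	var matched atomic.Int64
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range jobs {
				if match(m) {
					matched.Add(1)
				}
			}
		}()
	}

	for _, m := range msgs {
		jobs <- m
	}
	close(jobs) // lets each worker's range loop finish
	wg.Wait()
	return matched.Load()
}

func main() {
	msgs := make([]map[string]any, 0, 1000)
	for i := 0; i < 1000; i++ {
		msgs = append(msgs, map[string]any{"temperature": float64(i % 50)})
	}
	hot := func(m map[string]any) bool {
		t, ok := m["temperature"].(float64)
		return ok && t >= 30
	}
	fmt.Println(filterConcurrently(msgs, 4, hot)) // 400
}
```

One consequence of this pattern is that messages finish in no particular order; if downstream consumers depend on ordering, a single worker preserves it.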

Observability

The processor implements component.Discoverable for monitoring:

meta := processor.Meta()
// Name: json-filter-processor
// Type: processor
// Description: Filters GenericJSON messages (core.json.v1) based on field rules

dataFlow := processor.DataFlow()
// MessagesProcessed: Total messages received
// MessagesFiltered: Messages that passed all rules
// ErrorsTotal: Parse errors + filter evaluation errors

Integration Example

Complete flow with sensor data filtering:

// Sensor publishes to "sensors.raw"
// sensor → NATS("sensors.raw") → json_filter processor → NATS("sensors.high_temp")

// Configuration
config := jsonfilter.Config{
    Ports: &component.PortConfig{
        Inputs: []component.PortDefinition{
            {Name: "input", Type: "nats", Subject: "sensors.raw", Interface: "core.json.v1"},
        },
        Outputs: []component.PortDefinition{
            {Name: "output", Type: "nats", Subject: "sensors.high_temp", Interface: "core.json.v1"},
        },
    },
    Rules: []jsonfilter.FilterRule{
        {Field: "temperature", Operator: "gte", Value: 30},
        {Field: "sensor_type", Operator: "eq", Value: "thermocouple"},
    },
}

Only thermocouples reading >= 30°C will be forwarded to "sensors.high_temp".

Limitations

Current version limitations:

  • No support for nested field access (e.g., "position.lat")
  • No logical OR between rules (all rules are AND)
  • No regular expression matching (only exact/substring matching)
  • No custom comparison functions

These may be addressed in future versions based on user requirements.

Testing

The package includes comprehensive test coverage:

  • Unit tests: Rule matching logic, operator behavior
  • Integration tests: End-to-end NATS message flows with testcontainers

Run tests:

go test ./processor/json_filter -v                        # Unit tests
go test -tags=integration ./processor/json_filter -v      # Integration tests


Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewProcessor

func NewProcessor(
	rawConfig json.RawMessage, deps component.Dependencies,
) (component.Discoverable, error)

NewProcessor creates a new JSON filter processor from configuration.

func Register

func Register(registry *component.Registry) error

Register registers the JSON filter processor component with the given registry.

Types

type Config

type Config struct {
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	Rules []FilterRule          `json:"rules" schema:"type:array,description:Filter rules,category:basic"`
}

Config holds the configuration for the JSON filter processor.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration for the JSON filter processor.

type FilterRule

type FilterRule struct {
	Field    string `json:"field"    schema:"type:string,description:Field path to check,required:true"`
	Operator string `json:"operator" schema:"type:enum,enum:eq|ne|gt|gte|lt|lte|contains,required:true"`
	Value    any    `json:"value"    schema:"type:string,description:Comparison value,required:true"`
}

FilterRule defines a single filter condition.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor implements a GenericJSON message filter.

func (*Processor) ConfigSchema

func (f *Processor) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this processor.

func (*Processor) DataFlow

func (f *Processor) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics for this processor.

func (*Processor) Health

func (f *Processor) Health() component.HealthStatus

Health returns the current health status of this processor.

func (*Processor) Initialize

func (f *Processor) Initialize() error

Initialize prepares the processor (a no-op for the JSON filter).

func (*Processor) InputPorts

func (f *Processor) InputPorts() []component.Port

InputPorts returns the NATS input ports this processor subscribes to.

func (*Processor) Meta

func (f *Processor) Meta() component.Metadata

Meta returns metadata describing this processor component.

func (*Processor) OutputPorts

func (f *Processor) OutputPorts() []component.Port

OutputPorts returns the NATS output port for filtered messages.

func (*Processor) Start

func (f *Processor) Start(ctx context.Context) error

Start begins filtering messages.

func (*Processor) Stop

func (f *Processor) Stop(timeout time.Duration) error

Stop gracefully stops the processor.
