jsongeneric

package
v1.0.0-alpha.2
Warning

This package is not in the latest version of its module.

Published: Mar 4, 2026 License: MIT Imports: 15 Imported by: 0

README

JSON Generic Processor

A protocol-layer processor that wraps plain JSON into GenericJSON (core.json.v1) format for integration with SemStreams pipelines.

Purpose

The JSON generic processor acts as an ingestion adapter, converting external JSON data into the SemStreams GenericJSON message format. It subscribes to NATS subjects carrying plain JSON, wraps the data in a GenericJSONPayload structure, and publishes to output subjects for downstream processing. This processor is a protocol-layer utility that handles data plumbing without making semantic decisions about entity identity, graph triples, or domain meaning.

Configuration

The processor requires port configuration specifying input subjects (plain JSON) and output subjects (GenericJSON format).

YAML Example
components:
  - name: json-wrapper
    type: json_generic
    ports:
      inputs:
        - name: nats_input
          type: nats
          subject: external.sensors.>
          required: true
          description: NATS subjects with plain JSON data
      outputs:
        - name: nats_output
          type: nats
          subject: internal.sensors
          interface: core.json.v1
          required: true
          description: NATS subject for GenericJSON wrapped messages

JetStream Support

The processor supports both core NATS and JetStream for inputs and outputs:

components:
  - name: json-wrapper-jetstream
    type: json_generic
    ports:
      inputs:
        - name: jetstream_input
          type: jetstream
          subject: raw.weather.*
          stream_name: RAW
          required: true
      outputs:
        - name: jetstream_output
          type: jetstream
          subject: generic.weather
          interface: core.json.v1
          required: true

Default Configuration

If no configuration is provided, the processor uses these defaults:

  • Input: raw.> (all subjects under raw namespace)
  • Output: generic.messages (wrapped GenericJSON messages)

Input/Output Ports

Input Ports
  • Type: nats or jetstream
  • Format: Plain JSON objects
  • Required: At least one input port must be configured
  • Wildcards: Supports NATS wildcard subscriptions (>, *)
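
The wildcard behavior can be illustrated with a small matcher. This is a minimal sketch of the standard NATS subject rules (`*` matches exactly one token, `>` matches one or more trailing tokens); `subjectMatches` is a hypothetical helper written for illustration and is not part of this package or of the NATS client.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a subscription
// pattern. "*" matches exactly one token; ">" matches one or more trailing
// tokens and is only honored as the last token of the pattern.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be last and must have at least one token to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("external.sensors.>", "external.sensors.temp.room1"))
	fmt.Println(subjectMatches("raw.weather.*", "raw.weather.nyc"))
	fmt.Println(subjectMatches("raw.weather.*", "raw.weather.nyc.hourly"))
}
```

Under these rules the default input `raw.>` would receive `raw.weather.nyc` but not the bare subject `raw`.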

Example Input (Plain JSON):

{
  "sensor_id": "temp-001",
  "value": 23.5,
  "unit": "celsius",
  "timestamp": 1234567890
}

Output Ports
  • Type: nats or jetstream
  • Interface: core.json.v1 (GenericJSON format)
  • Format: BaseMessage wrapper containing GenericJSONPayload
  • Required: Typically one output port

Example Output (GenericJSON):

{
  "type": {
    "domain": "core",
    "category": "json",
    "version": "v1"
  },
  "payload": {
    "data": {
      "sensor_id": "temp-001",
      "value": 23.5,
      "unit": "celsius",
      "timestamp": 1234567890
    }
  },
  "source": "json-generic-processor"
}
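
The transformation from the example input to the example output can be sketched with the standard library alone. The types below (`messageType`, `payload`, `envelope`) and the function `wrapObject` are illustrative local definitions that mirror the JSON shown above; they are assumptions, not the package's own BaseMessage or GenericJSONPayload types.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// messageType, payload and envelope mirror the example output above.
// They are local stand-ins, not the package's real types.
type messageType struct {
	Domain   string `json:"domain"`
	Category string `json:"category"`
	Version  string `json:"version"`
}

type payload struct {
	Data map[string]any `json:"data"`
}

type envelope struct {
	Type    messageType `json:"type"`
	Payload payload     `json:"payload"`
	Source  string      `json:"source"`
}

// wrapObject parses raw as a JSON object and places it under payload.data.
func wrapObject(raw []byte) (envelope, error) {
	var data map[string]any
	if err := json.Unmarshal(raw, &data); err != nil {
		return envelope{}, fmt.Errorf("parse message as JSON: %w", err)
	}
	if data == nil {
		// json.Unmarshal accepts the literal null for a map target.
		return envelope{}, errors.New("input is JSON null, not an object")
	}
	return envelope{
		Type:    messageType{Domain: "core", Category: "json", Version: "v1"},
		Payload: payload{Data: data},
		Source:  "json-generic-processor",
	}, nil
}

func main() {
	env, err := wrapObject([]byte(`{"sensor_id": "temp-001", "value": 23.5}`))
	if err != nil {
		fmt.Println("dropped:", err)
		return
	}
	out, err := json.MarshalIndent(env, "", "  ")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(out))
}
```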

Message Flow

flowchart LR
    A[External System] -->|Plain JSON| B[NATS: raw.>]
    B --> C[JSON Generic Processor]
    C -->|Wrap in GenericJSON| D[NATS: generic.messages]
    D --> E[Downstream Processors]

    style C fill:#e1f5ff
    style D fill:#fff5e1

Example Use Cases

External API Integration

Ingest third-party JSON APIs into SemStreams pipelines:

# Weather API publishes plain JSON to raw.weather
# JSON Generic wraps it for processing pipeline
components:
  - name: weather-wrapper
    type: json_generic
    ports:
      inputs:
        - name: api_input
          type: nats
          subject: raw.weather
      outputs:
        - name: wrapped_output
          type: nats
          subject: internal.weather
          interface: core.json.v1

Legacy System Migration

Wrap legacy JSON formats for modern pipeline compatibility:

# Legacy system emits plain JSON
# Wrap for processing by json_filter and json_map
components:
  - name: legacy-wrapper
    type: json_generic
    ports:
      inputs:
        - name: legacy_input
          type: nats
          subject: legacy.data
      outputs:
        - name: modern_output
          type: nats
          subject: modern.pipeline
          interface: core.json.v1

Data Normalization

Standardize multiple JSON sources into unified GenericJSON format:

# Multiple sources feed into single wrapper
# Output goes to unified processing pipeline
components:
  - name: multi-source-wrapper
    type: json_generic
    ports:
      inputs:
        - name: source_a
          type: nats
          subject: source.a.data
        - name: source_b
          type: nats
          subject: source.b.data
        - name: source_c
          type: nats
          subject: source.c.data
      outputs:
        - name: unified_output
          type: nats
          subject: unified.data
          interface: core.json.v1

Pipeline Entry Point

Convert raw JSON to pipeline-compatible format for filtering and transformation:

# Raw input → Wrap → Filter → Map → Domain processor
components:
  - name: entry-wrapper
    type: json_generic
    ports:
      inputs:
        - name: raw_input
          type: nats
          subject: raw.input
      outputs:
        - name: validated_output
          type: nats
          subject: validated.input
          interface: core.json.v1

  - name: filter-step
    type: json_filter
    ports:
      inputs:
        - name: filter_input
          type: nats
          subject: validated.input
          interface: core.json.v1
      outputs:
        - name: filtered_output
          type: nats
          subject: filtered.data
          interface: core.json.v1

Error Handling

The processor handles errors gracefully to maintain pipeline resilience:

Invalid JSON

Messages that fail JSON parsing are logged at Debug level and dropped:

Input: {this is not valid json}
Log: "Failed to parse message as JSON"
Action: Message dropped, error counter incremented
Impact: No output published, processing continues

NATS Publish Failures

Network issues during publish are logged as errors with full context:

Error: Failed to publish wrapped message
Logged Fields: component, output_subject, error
Action: Error counter incremented
Impact: Message lost (no retry), processing continues

Validation Failures

If the wrapped payload fails validation (rare; indicates an internal issue):

Error: Wrapped payload validation failed
Action: Error counter incremented
Impact: Message dropped, processing continues
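
The drop-and-continue policy described in this section can be sketched as a single message handler. Everything here is an assumption made for illustration: `counters` stands in for the processor's internal metrics, `publishFunc` abstracts the NATS publish call, and the payload validation step is omitted because its rules are not documented here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// counters is a hypothetical stand-in for the processor's metrics.
type counters struct {
	Processed, Wrapped, Errors int
}

// publishFunc abstracts the publish call so the sketch has no NATS dependency.
type publishFunc func(subject string, data []byte) error

// errPublish simulates a network failure during publish.
var errPublish = errors.New("connection closed")

// handle processes one message: parse, wrap under "data", publish.
// Any failure increments Errors and drops the message; nothing is retried.
func handle(raw []byte, subject string, publish publishFunc, c *counters) {
	c.Processed++
	var data map[string]any
	if err := json.Unmarshal(raw, &data); err != nil {
		c.Errors++ // invalid JSON: dropped, no output published
		return
	}
	out, err := json.Marshal(map[string]any{"data": data})
	if err != nil {
		c.Errors++
		return
	}
	if err := publish(subject, out); err != nil {
		c.Errors++ // publish failure: message lost, processing continues
		return
	}
	c.Wrapped++
}

func main() {
	var c counters
	ok := func(string, []byte) error { return nil }
	failing := func(string, []byte) error { return errPublish }

	handle([]byte(`{"value": 23.5}`), "generic.messages", ok, &c)
	handle([]byte(`{this is not valid json}`), "generic.messages", ok, &c)
	handle([]byte(`{"value": 1}`), "generic.messages", failing, &c)
	fmt.Printf("%+v\n", c) // {Processed:3 Wrapped:1 Errors:2}
}
```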

Performance

Typical throughput: 15,000+ messages/second per processor instance

Complexity:

  • Wrapping: O(1) - Single map allocation per message
  • Validation: O(n) - Validates payload structure (minimal overhead)
  • Marshaling: O(n) - JSON serialization of wrapped payload

Resource Usage:

  • Memory: ~1KB per message in flight
  • CPU: Minimal, dominated by JSON marshaling
  • Network: Adds ~150 bytes overhead per message (BaseMessage wrapper)
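
As a rough worked figure, assuming the ~150-byte wrapper overhead and the 15,000 messages/second throughput quoted above apply at the same time, the extra network load from wrapping comes to about 2.25 MB/s. The helper name `overheadBytesPerSecond` is made up for this sketch.

```go
package main

import "fmt"

// overheadBytesPerSecond multiplies message rate by per-message overhead.
func overheadBytesPerSecond(msgsPerSec, overheadBytes int) int {
	return msgsPerSec * overheadBytes
}

func main() {
	b := overheadBytesPerSecond(15000, 150)
	fmt.Printf("%d bytes/s (%.2f MB/s)\n", b, float64(b)/1e6) // 2250000 bytes/s (2.25 MB/s)
}
```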

Observability

The processor implements the Discoverable interface for comprehensive monitoring:

Metadata
meta := processor.Meta()
// Name: json-generic-processor
// Type: processor
// Description: Wraps plain JSON into GenericJSON (core.json.v1) format
// Version: 0.1.0

Metrics
dataFlow := processor.DataFlow()
// ErrorRate: JSON parse errors / Messages processed
// LastActivity: Timestamp of last message processed

Health Status
health := processor.Health()
// Healthy: true if processor is running
// ErrorCount: Total errors (parse + publish)
// Uptime: Time since processor started

Key Metrics
  • MessagesProcessed: Total messages received (valid + invalid JSON)
  • MessagesWrapped: Successfully wrapped and published messages
  • Errors: JSON parse errors + NATS publish errors

Quality Indicator:

Error Rate = Errors / MessagesProcessed

< 0.01 (1%): Good input quality
0.01-0.05: Investigate data sources
> 0.05: Significant data quality issues
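
The quality bands above translate directly into a small classifier. This is a sketch: `classifyErrorRate` is a hypothetical helper, the treatment of a rate of exactly 0.05 as "investigate" follows the ranges as written, and the "no data" result for zero processed messages is an assumption added to avoid dividing by zero.

```go
package main

import "fmt"

// classifyErrorRate maps Errors / MessagesProcessed onto the three bands.
func classifyErrorRate(errCount, processed uint64) string {
	if processed == 0 {
		return "no data" // assumption: nothing processed yet
	}
	rate := float64(errCount) / float64(processed)
	switch {
	case rate < 0.01:
		return "good input quality"
	case rate <= 0.05:
		return "investigate data sources"
	default:
		return "significant data quality issues"
	}
}

func main() {
	fmt.Println(classifyErrorRate(5, 1000))
	fmt.Println(classifyErrorRate(30, 1000))
	fmt.Println(classifyErrorRate(100, 1000))
}
```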

Design Philosophy

The JSON generic processor follows SemStreams protocol-layer design principles:

What It Does NOT Do
  • No EntityID Generation: Does not determine entity identities
  • No Triple Creation: Does not create semantic graph relationships
  • No Domain Interpretation: Does not classify or interpret field meanings

These responsibilities belong to domain processors that understand your data semantics.

Pipeline Position
External JSON → [json_generic] → GenericJSON → [json_filter/map] → [Domain Processor] → Graph
                 ^^^^^^^^^^^^                                       ^^^^^^^^^^^^^^^^
                 Protocol layer                                     Semantic layer
                 (this package)                                     (your code)

When to Use

Use json_generic when:

  • Ingesting data from external systems that emit plain JSON
  • Converting raw JSON to GenericJSON for use with json_filter or json_map
  • Normalizing heterogeneous JSON sources into standard format
  • Adding GenericJSON interface compatibility to legacy data sources

Do NOT use when:

  • Input is already in GenericJSON format (use json_filter or json_map directly)
  • You need custom wrapping structure (extend or create custom processor)
  • Schema validation is required (add json_filter downstream)

Comparison with Other Processors

vs JSONFilterProcessor
  • json_generic: Wraps plain JSON → GenericJSON (no filtering)
  • json_filter: Filters GenericJSON → GenericJSON (no wrapping)

vs JSONMapProcessor
  • json_generic: Wraps plain JSON → GenericJSON (no transformation)
  • json_map: Transforms GenericJSON → GenericJSON (no wrapping)

Typical Pipeline
raw.json → [json_generic] → wrapped.json → [json_filter] → filtered.json → [json_map] → mapped.json

Limitations

Current version limitations:

  • No schema validation of input JSON (accepts any valid JSON object)
  • No custom wrapping structure (always uses "data" field in GenericJSONPayload)
  • No metadata injection (timestamps, source tags, etc.)
  • Invalid JSON messages are dropped (no dead letter queue or retry)
  • Single output subject (no routing based on content)

These may be addressed in future versions based on user requirements.

Testing

Run the test suite:

# Unit tests
task test -- ./processor/json_generic -v

# With race detection
task test:race -- ./processor/json_generic -v

# Integration tests (when available)
task test:integration -- ./processor/json_generic -v

Documentation

Overview

Package jsongeneric provides a processor for wrapping plain JSON into GenericJSON (core.json.v1) format for integration with StreamKit pipelines.

The JSON generic processor acts as an ingestion adapter, converting external JSON data into the StreamKit GenericJSON message format. It subscribes to NATS subjects carrying plain JSON, wraps the data in a GenericJSONPayload structure, and publishes to output subjects for downstream processing.

Design Context: Protocol-Layer Processor

This processor is a **protocol-layer utility** - it handles data plumbing without making semantic decisions. It does NOT:

  • Determine entity identities (no EntityID generation)
  • Create semantic triples (no Graphable implementation)
  • Interpret domain meaning (no field classification)

These responsibilities belong to **domain processors** that understand your data. See docs/PROCESSOR-DESIGN-PHILOSOPHY.md for the full rationale.

**Pipeline Position:**

External JSON → [json_generic] → GenericJSON → [json_filter/map] → [Domain Processor] → Graph
                 ^^^^^^^^^^^^                                       ^^^^^^^^^^^^^^^^
                 Protocol layer                                     Semantic layer
                 (this package)                                     (your code)

Purpose

Use the JSON generic processor when:

  • Ingesting data from external systems that emit plain JSON
  • Converting raw JSON to GenericJSON for use with json_filter or json_map
  • Normalizing heterogeneous JSON sources into a standard format
  • Adding GenericJSON interface compatibility to legacy data sources

Quick Start

Wrap plain JSON sensor data into GenericJSON format:

// Config and NewProcessor are the exported names listed in the index below.
config := jsongeneric.Config{
    Ports: &component.PortConfig{
        Inputs: []component.PortDefinition{
            {Name: "input", Type: "nats", Subject: "external.sensors.>", Required: true},
        },
        Outputs: []component.PortDefinition{
            {Name: "output", Type: "nats", Subject: "internal.sensors",
             Interface: "core.json.v1", Required: true},
        },
    },
}

rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}

// deps is a component.Dependencies value supplied by the caller.
processor, err := jsongeneric.NewProcessor(rawConfig, deps)
if err != nil {
    return err
}

Message Transformation

**Input (Plain JSON):**

{
  "sensor_id": "temp-001",
  "value": 23.5,
  "unit": "celsius",
  "timestamp": 1234567890
}

**Output (GenericJSON):**

{
  "data": {
    "sensor_id": "temp-001",
    "value": 23.5,
    "unit": "celsius",
    "timestamp": 1234567890
  }
}

The processor wraps the original JSON object in a "data" field, conforming to the GenericJSONPayload structure required by the core.json.v1 interface.

Message Flow

External System → Plain JSON → NATS(raw.>) → JSONGenericProcessor →
                  GenericJSON → NATS(generic.messages) → Downstream Processors

GenericJSON Interface

Output messages conform to core.json.v1:

type GenericJSONPayload struct {
    Data map[string]any `json:"data"`
}

This enables integration with other StreamKit processors:

  • json_filter: Filter wrapped messages by field values
  • json_map: Transform wrapped message fields
  • Custom processors: Process standardized GenericJSON format

Configuration Schema

{
  "ports": {
    "inputs": [
      {
        "name": "nats_input",
        "type": "nats",
        "subject": "raw.>",
        "required": true,
        "description": "NATS subjects with plain JSON data"
      }
    ],
    "outputs": [
      {
        "name": "nats_output",
        "type": "nats",
        "subject": "generic.messages",
        "interface": "core.json.v1",
        "required": true,
        "description": "NATS subject for GenericJSON wrapped messages"
      }
    ]
  }
}
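
The schema above can be decoded with encoding/json. The sketch below uses local mirror types (`portDefinition`, `ports`, `config`) so that it compiles on its own; the real types live in the component package, and their exact field set is an assumption here. The fallback values are the documented defaults (`raw.>` and `generic.messages`); the real DefaultConfig may populate additional fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// portDefinition, ports and config are local mirrors of the JSON schema,
// declared only for this sketch.
type portDefinition struct {
	Name        string `json:"name"`
	Type        string `json:"type"`
	Subject     string `json:"subject"`
	Interface   string `json:"interface,omitempty"`
	Required    bool   `json:"required"`
	Description string `json:"description,omitempty"`
}

type ports struct {
	Inputs  []portDefinition `json:"inputs"`
	Outputs []portDefinition `json:"outputs"`
}

type config struct {
	Ports *ports `json:"ports"`
}

// parseConfig decodes raw and falls back to the documented defaults when
// no port configuration is present.
func parseConfig(raw []byte) (config, error) {
	var cfg config
	if len(raw) > 0 {
		if err := json.Unmarshal(raw, &cfg); err != nil {
			return config{}, fmt.Errorf("invalid config: %w", err)
		}
	}
	if cfg.Ports == nil {
		cfg.Ports = &ports{
			Inputs: []portDefinition{
				{Name: "nats_input", Type: "nats", Subject: "raw.>", Required: true},
			},
			Outputs: []portDefinition{
				{Name: "nats_output", Type: "nats", Subject: "generic.messages",
					Interface: "core.json.v1", Required: true},
			},
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig(nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(cfg.Ports.Inputs[0].Subject, "->", cfg.Ports.Outputs[0].Subject)
}
```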

Error Handling

The processor uses streamkit/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (bad configuration)
  • NATS errors: errs.WrapTransient (network issues, retryable)
  • Unmarshal errors: errs.WrapInvalid (malformed JSON input)

**Invalid JSON Handling:**

Messages that fail JSON parsing are logged at Debug level and dropped:

// Invalid JSON input
{this is not valid json}

// Logged: "Failed to parse message as JSON"
// Action: Message dropped, error counter incremented
// Impact: No output published, processing continues

This prevents downstream processors from receiving malformed data while maintaining system resilience.

Complete Integration Example

**Scenario:** External weather API publishes plain JSON to NATS, StreamKit pipeline filters for high temperatures.

// External API publishes plain JSON
weatherAPI → NATS("weather.raw")

// JSONGenericProcessor wraps to GenericJSON
weather.raw → JSONGenericProcessor → generic.weather (core.json.v1)

// JSONFilterProcessor filters high temperatures
generic.weather → JSONFilterProcessor → weather.high_temp

// Configuration: JSONGenericProcessor
{
  "ports": {
    "inputs": [{"subject": "weather.raw"}],
    "outputs": [{"subject": "generic.weather", "interface": "core.json.v1"}]
  }
}

// Configuration: JSONFilterProcessor
{
  "ports": {
    "inputs": [{"subject": "generic.weather", "interface": "core.json.v1"}],
    "outputs": [{"subject": "weather.high_temp", "interface": "core.json.v1"}]
  },
  "rules": [{"field": "temperature", "operator": "gt", "value": 30}]
}
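
To make the filter rule concrete, here is a sketch of how a rule such as `{"field": "temperature", "operator": "gt", "value": 30}` might be evaluated against a wrapped payload. How json_filter actually evaluates rules is not documented here, so `rule`, `genericJSONPayload` and `matches` are local assumptions, and only the `gt` operator from the example is handled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rule mirrors one entry of the "rules" array in the filter configuration.
type rule struct {
	Field    string  `json:"field"`
	Operator string  `json:"operator"`
	Value    float64 `json:"value"`
}

// genericJSONPayload mirrors the wrapped payload shape ({"data": {...}}).
type genericJSONPayload struct {
	Data map[string]any `json:"data"`
}

// matches reports whether the payload satisfies the rule. Unknown
// operators, missing fields and non-numeric values do not match.
func matches(p genericJSONPayload, r rule) bool {
	// encoding/json decodes JSON numbers into float64 for `any` targets.
	v, ok := p.Data[r.Field].(float64)
	if !ok {
		return false
	}
	return r.Operator == "gt" && v > r.Value
}

func main() {
	var p genericJSONPayload
	if err := json.Unmarshal([]byte(`{"data": {"temperature": 35.2}}`), &p); err != nil {
		fmt.Println(err)
		return
	}
	r := rule{Field: "temperature", Operator: "gt", Value: 30}
	fmt.Println(matches(p, r))
}
```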

Performance Considerations

  • Wrapping: O(1) - Single map allocation per message
  • Validation: O(n) - Validates payload structure (minimal overhead)
  • Marshaling: O(n) - JSON serialization of wrapped payload

Typical throughput: 15,000+ messages/second per processor instance.

Observability

The processor implements component.Discoverable for monitoring:

meta := processor.Meta()
// Name: json-generic-processor
// Type: processor
// Description: Wraps plain JSON into GenericJSON (core.json.v1) format

dataFlow := processor.DataFlow()
// MessagesProcessed: Total messages received (valid + invalid JSON)
// MessagesWrapped: Successfully wrapped messages
// ErrorsTotal: JSON parse errors + NATS publish errors

Metrics help identify:

  • Input data quality (ErrorsTotal / MessagesProcessed)
  • Processing rate (MessagesWrapped / time)
  • System health (NATS publish errors)

Use Cases

**External API Integration:**

// Ingest third-party JSON APIs
external.api.weather → JSONGenericProcessor → internal.weather (GenericJSON)

**Legacy System Migration:**

// Wrap legacy JSON formats
legacy.system.data → JSONGenericProcessor → modern.pipeline (GenericJSON)

**Data Normalization:**

// Standardize multiple JSON sources
source.a.data → JSONGenericProcessor ┐
source.b.data → JSONGenericProcessor ├→ unified.data (GenericJSON)
source.c.data → JSONGenericProcessor ┘

**Pipeline Entry Point:**

// Convert raw JSON to pipeline-compatible format
raw.input → JSONGenericProcessor → validated.input → JSONFilterProcessor → JSONMapProcessor

Comparison with Other Processors

**JSONGenericProcessor vs JSONFilterProcessor:**

  • JSONGenericProcessor: Wraps plain JSON → GenericJSON (no filtering)
  • JSONFilterProcessor: Filters GenericJSON → GenericJSON (no wrapping)

**JSONGenericProcessor vs JSONMapProcessor:**

  • JSONGenericProcessor: Wraps plain JSON → GenericJSON (no transformation)
  • JSONMapProcessor: Transforms GenericJSON → GenericJSON (no wrapping)

**When to use JSONGenericProcessor:**

Use when input is plain JSON that needs GenericJSON wrapping. Do not use when input is already GenericJSON format.

Limitations

Current version limitations:

  • No schema validation of input JSON
  • No custom wrapping structure (always uses "data" field)
  • No metadata injection (timestamps, source tags, etc.)
  • Invalid JSON messages are dropped (no DLQ/retry)

These may be addressed in future versions based on user requirements.

Testing

The package includes test coverage:

  • Unit tests: Creation, configuration, port handling, metadata
  • Integration tests: TBD (end-to-end NATS message flows)

Run tests:

go test ./processor/json_generic -v                        # Unit tests
go test -tags=integration ./processor/json_generic -v      # Integration tests (when available)

Design Decisions

**Why separate JSONGenericProcessor from parsers:**

  • The parser package provides stateless utility functions
  • JSONGenericProcessor is a stateful component with lifecycle management
  • Separation enables parser reuse in other contexts

**Why drop invalid JSON instead of returning an error:**

  • Resilience: One bad message shouldn't stop processing
  • Observability: Errors are counted and logged
  • Simplicity: No complex error recovery needed

**Why no schema validation:**

  • Performance: Validation adds overhead
  • Flexibility: Accepts any valid JSON structure
  • Downstream: Schema validation can be added in pipeline

For questions or contributions, see the StreamKit repository.

Package jsongeneric provides a core processor for wrapping plain JSON into GenericJSONPayload

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewProcessor

func NewProcessor(
	rawConfig json.RawMessage, deps component.Dependencies,
) (component.Discoverable, error)

NewProcessor creates a new JSON generic processor from configuration

func Register

func Register(registry *component.Registry) error

Register registers the JSON generic processor component with the given registry

Types

type Config

type Config struct {
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
}

Config holds configuration for JSON generic processor

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration for JSON generic processor

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor wraps plain JSON into GenericJSONPayload

func (*Processor) ConfigSchema

func (p *Processor) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this processor.

func (*Processor) DataFlow

func (p *Processor) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics for this processor.

func (*Processor) Health

func (p *Processor) Health() component.HealthStatus

Health returns the current health status of this processor.

func (*Processor) Initialize

func (p *Processor) Initialize() error

Initialize prepares the processor (no-op for JSON generic)

func (*Processor) InputPorts

func (p *Processor) InputPorts() []component.Port

InputPorts returns the NATS input ports this processor subscribes to.

func (*Processor) IsStarted

func (p *Processor) IsStarted() bool

IsStarted returns whether the processor is running

func (*Processor) Meta

func (p *Processor) Meta() component.Metadata

Meta returns metadata describing this processor component.

func (*Processor) OutputPorts

func (p *Processor) OutputPorts() []component.Port

OutputPorts returns the NATS output port for wrapped GenericJSON messages.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start begins wrapping messages

func (*Processor) Stop

func (p *Processor) Stop(timeout time.Duration) error

Stop gracefully stops the processor
