jsonmapprocessor

package
v1.0.0-alpha.29

This package is not in the latest version of its module.

Published: Mar 12, 2026 | License: MIT | Imports: 17 | Imported by: 0

README

JSON Map Processor

Protocol-layer processor for transforming GenericJSON message fields through mapping, adding, and removing operations.

Purpose

The JSON Map processor enables flexible field-level transformations of GenericJSON payloads without making semantic decisions. It subscribes to NATS subjects carrying GenericJSON messages (core.json.v1 interface), applies mapping rules to transform the data, and publishes the transformed messages to output subjects. This processor handles protocol-layer normalization such as field renaming, debug data removal, and metadata injection before semantic processing occurs.

Configuration

type: processor
component: json_map
config:
  ports:
    inputs:
      - name: input
        type: nats
        subject: sensor.raw
        interface: core.json.v1
        required: true
    outputs:
      - name: output
        type: nats
        subject: sensor.normalized
        interface: core.json.v1
        required: true
  mappings:
    - source_field: temp
      target_field: temperature
      transform: copy
    - source_field: stat
      target_field: status
      transform: lowercase
  add_fields:
    unit: celsius
    version: 2
    source: sensor-network
  remove_fields:
    - debug_timestamp
    - internal_id
Configuration Fields

Ports (required)

  • inputs: Array of input port definitions. Each port must specify interface: core.json.v1.
  • outputs: Array of output port definitions. Transformed messages use the same core.json.v1 interface.

Mappings (optional)

Array of field mapping rules. Each mapping contains:

  • source_field: Original field name to map from
  • target_field: New field name to map to
  • transform: Transformation type (copy, uppercase, lowercase, trim)

Source fields are removed after mapping unless source_field equals target_field.
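To make the rename-and-drop rule concrete, here is a minimal Go sketch operating on a plain `map[string]any` (the shape of the GenericJSON `data` field). `FieldMapping` and `applyMapping` are hypothetical local names chosen for illustration; this is not the processor's own implementation.

```go
package main

import "fmt"

// FieldMapping is a hypothetical local mirror of one mapping rule
// (source_field, target_field, transform); it is not the package's own type.
type FieldMapping struct {
	SourceField string
	TargetField string
	Transform   string
}

// applyMapping copies data[SourceField] into data[TargetField] and deletes
// the source key unless the two names are equal. Leaving data unchanged
// when the source key is absent is an assumption of this sketch.
func applyMapping(data map[string]any, m FieldMapping) {
	v, ok := data[m.SourceField]
	if !ok {
		return
	}
	data[m.TargetField] = v
	if m.SourceField != m.TargetField {
		delete(data, m.SourceField)
	}
}

func main() {
	data := map[string]any{"temp": 23.5, "location": "lab-1"}
	applyMapping(data, FieldMapping{SourceField: "temp", TargetField: "temperature", Transform: "copy"})
	fmt.Println(data) // map[location:lab-1 temperature:23.5]
}
```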

Add Fields (optional)

Map of constant fields to add to every message. Useful for injecting metadata, version tags, or configuration values.

Remove Fields (optional)

Array of field names to remove from payloads. Applied after mappings but before string transformations.
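A sketch of the removal step, assuming the payload data is a `map[string]any`; `removeFields` is a hypothetical helper. It relies on Go's built-in `delete` being a no-op for absent keys, so a remove list can safely name fields that only some messages carry.

```go
package main

import "fmt"

// removeFields deletes each listed key from data. delete is a no-op for
// keys that are not present, so missing fields are simply ignored.
func removeFields(data map[string]any, fields []string) {
	for _, f := range fields {
		delete(data, f)
	}
}

func main() {
	data := map[string]any{"temperature": 23.5, "debug_timestamp": 1234567890}
	removeFields(data, []string{"debug_timestamp", "internal_id"})
	fmt.Println(data) // map[temperature:23.5]
}
```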

Input/Output Ports

Input Ports
  • Type: NATS or JetStream
  • Interface: core.json.v1 (GenericJSON)
  • Wildcard Support: Yes (e.g., sensor.*.raw, data.>)
  • Required: Yes (at least one input port)
Output Ports
  • Type: NATS or JetStream
  • Interface: core.json.v1 (GenericJSON)
  • Required: No (processor can be used for side effects only)
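Subject wildcards are resolved by NATS itself, not by this processor. For reference, the matching rule behind patterns such as `sensor.*.raw` and `data.>` can be sketched as follows: `*` matches exactly one dot-separated token, and `>` (valid only as the last token) matches one or more remaining tokens. `subjectMatches` is a hypothetical helper and does not validate malformed subjects such as empty tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a pattern using
// NATS wildcard rules: "*" matches exactly one token, and ">" (only as the
// last token) matches one or more remaining tokens.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// ">" must be the final pattern token and needs at least one
			// subject token left to consume.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			return false
		}
		if p != "*" && p != st[i] {
			return false
		}
	}
	// Without ">", the token counts must match exactly.
	return len(pt) == len(st)
}

func main() {
	fmt.Println(subjectMatches("sensor.*.raw", "sensor.lab1.raw")) // true
	fmt.Println(subjectMatches("sensor.*.raw", "sensor.a.b.raw"))  // false
	fmt.Println(subjectMatches("data.>", "data.temp.lab1"))        // true
	fmt.Println(subjectMatches("data.>", "data"))                  // false
}
```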

Transformation Operations

The processor applies operations in this order:

  1. Field Mappings: Rename fields and remove source
  2. Add Fields: Inject constant values
  3. Remove Fields: Delete unwanted fields
  4. String Transformations: Apply each mapping's transform to the value of its target field
Field Mapping Example
// Input
{"data": {"temp": 23.5, "location": "lab-1"}}

// Mapping: {"source_field": "temp", "target_field": "temperature"}

// Output
{"data": {"temperature": 23.5, "location": "lab-1"}}
Transformation Types
  • copy: No transformation (default)
  • uppercase: Convert string to uppercase ("active" → "ACTIVE")
  • lowercase: Convert string to lowercase ("ACTIVE" → "active")
  • trim: Remove leading and trailing whitespace (" error " → "error")

Non-string values are passed through unchanged when string transforms are specified.
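The four transform types and the pass-through rule for non-strings can be sketched with the standard `strings` package. `applyTransform` is a hypothetical name, and treating an empty or unrecognized type as `copy` is an assumption of this sketch (the configuration schema only lists the four values).

```go
package main

import (
	"fmt"
	"strings"
)

// applyTransform implements the documented transform types. Values that are
// not strings are returned unchanged. Treating "" or an unknown type as
// "copy" is an assumption of this sketch.
func applyTransform(v any, kind string) any {
	s, ok := v.(string)
	if !ok {
		return v
	}
	switch kind {
	case "uppercase":
		return strings.ToUpper(s)
	case "lowercase":
		return strings.ToLower(s)
	case "trim":
		return strings.TrimSpace(s)
	default: // "copy"
		return s
	}
}

func main() {
	fmt.Printf("%q\n", applyTransform("active", "uppercase")) // "ACTIVE"
	fmt.Printf("%q\n", applyTransform("ACTIVE", "lowercase")) // "active"
	fmt.Printf("%q\n", applyTransform(" error ", "trim"))     // "error"
	fmt.Println(applyTransform(23.5, "uppercase"))            // 23.5
}
```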

Example Use Cases

Schema Migration

Migrate sensor data from version 1 to version 2 format.

mappings:
  - source_field: temp
    target_field: temperature
    transform: copy
  - source_field: loc
    target_field: location
    transform: copy
add_fields:
  schema_version: 2
remove_fields:
  - deprecated_field
Data Sanitization

Remove PII and debug information before publishing to external systems.

remove_fields:
  - email
  - phone
  - ssn
  - internal_id
  - debug_timestamp
add_fields:
  sanitized: true
  sanitized_at: 2026-02-11T00:00:00Z
Field Standardization

Normalize inconsistent field names and values from multiple data sources.

mappings:
  - source_field: temp
    target_field: temperature
    transform: copy
  - source_field: stat
    target_field: status
    transform: uppercase
  - source_field: msg
    target_field: message
    transform: trim
add_fields:
  standardized: true
  source: data-ingestion-pipeline
Enrichment

Add contextual metadata to raw sensor readings.

add_fields:
  facility: warehouse-a
  region: north-america
  timezone: America/New_York
  processed_by: json-map-v2

Pipeline Position

The JSON Map processor is a protocol-layer utility positioned before semantic processors:

GenericJSON → [json_map] → Transformed GenericJSON → [Domain Processor] → Graph
               ^^^^^^^^                               ^^^^^^^^^^^^^^^^
               Protocol layer                         Semantic layer

This processor does NOT:

  • Generate EntityIDs (no identity determination)
  • Create semantic triples (no Graphable implementation)
  • Interpret domain meaning (transformations are mechanical)

Use domain processors for semantic transformations like "classify this sensor reading as critical."

Performance Characteristics

  • Complexity: O(n + m + k) where n = fields to add, m = fields to remove, k = transformations
  • Throughput: 10,000+ messages/second per instance
  • Memory: Constant per message (creates new map for each transformation)
  • Concurrency: Thread-safe with goroutine-per-message processing
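The concurrency claim pairs with the new-map-per-message note: if each message produces its own output map and shared counters use atomic operations, goroutines never write to shared state. A minimal sketch of that pattern, assuming one shared read-only input map and a hypothetical `processed` counter (not the processor's actual metrics); `atomic.Int64` needs Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// handle copies the input into a new map before adding a field, so
// goroutines only read the shared map and each writes its own map.
// The added "unit" field stands in for the configured operations.
func handle(in map[string]any, processed *atomic.Int64) map[string]any {
	processed.Add(1)
	out := make(map[string]any, len(in)+1)
	for k, v := range in {
		out[k] = v
	}
	out["unit"] = "celsius"
	return out
}

// processAll runs handle once per message in its own goroutine and waits
// for all of them to finish.
func processAll(in map[string]any, n int) (int64, []map[string]any) {
	var processed atomic.Int64
	var wg sync.WaitGroup
	results := make([]map[string]any, n)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = handle(in, &processed) // each goroutine owns slot i
		}(i)
	}
	wg.Wait()
	return processed.Load(), results
}

func main() {
	shared := map[string]any{"temperature": 23.5}
	n, results := processAll(shared, 100)
	fmt.Println(n, len(shared), results[0]["unit"]) // 100 1 celsius
}
```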

Observability

Health Metrics
health := processor.Health()
// Healthy: true/false
// ErrorCount: Parse + transformation errors
// Uptime: Time since start
Data Flow Metrics
flow := processor.DataFlow()
// ErrorRate: errors / messages processed
// LastActivity: Timestamp of last message
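The `ErrorRate` formula above can be sketched as a small value type. `flowSnapshot` is a hypothetical stand-in, not `component.FlowMetrics`, and returning 0 before any message arrives (rather than dividing by zero) is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// flowSnapshot is a hypothetical stand-in for the values listed under
// DataFlow(); it is not component.FlowMetrics.
type flowSnapshot struct {
	Processed    int64
	Errors       int64
	LastActivity time.Time
}

// ErrorRate returns errors / messages processed, or 0 when nothing has been
// processed yet (the zero guard is an assumption of this sketch).
func (f flowSnapshot) ErrorRate() float64 {
	if f.Processed == 0 {
		return 0
	}
	return float64(f.Errors) / float64(f.Processed)
}

func main() {
	f := flowSnapshot{Processed: 200, Errors: 5, LastActivity: time.Now()}
	fmt.Println(f.ErrorRate())              // 0.025
	fmt.Println(flowSnapshot{}.ErrorRate()) // 0
}
```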
Prometheus Metrics

The processor exports Prometheus metrics when a registry is provided:

  • semstreams_json_map_messages_processed_total: Total messages received
  • semstreams_json_map_transformations_total: Successful transformations
  • semstreams_json_map_errors_total: Error count by type
  • semstreams_json_map_transformation_duration_seconds: Transformation latency

Limitations

Current version does not support:

  • Nested field mapping (e.g., position.lat → latitude)
  • Conditional transformations (transform based on field values)
  • Computed fields (combine multiple fields)
  • Custom transformation functions
  • Type conversions (string to number, etc.)

These features may be added in future versions based on requirements.

Testing

# Unit tests
go test ./processor/json_map -v

# Integration tests (requires Docker)
go test -tags=integration ./processor/json_map -v

# Race detection
go test -race ./processor/json_map -v

Documentation

Overview

Package jsonmapprocessor provides a processor for transforming GenericJSON messages through field mapping, adding, removing, and string transformations.

The JSON map processor enables flexible field-level transformations of GenericJSON payloads. It subscribes to NATS subjects carrying GenericJSON messages (core.json.v1 interface), applies mapping rules to transform the data, and publishes the transformed messages to output subjects.

Design Context: Protocol-Layer Processor

This processor is a **protocol-layer utility**: it handles data transformation without making semantic decisions. It does NOT:

  • Determine entity identities (no EntityID generation)
  • Create semantic triples (no Graphable implementation)
  • Interpret domain meaning (transformations are mechanical field operations)

Use this for pre-semantic normalization: rename fields to match expected schemas, remove debug data, or add routing metadata. Semantic transformation (e.g., "classify this sensor reading as critical") belongs in domain processors.

See docs/PROCESSOR-DESIGN-PHILOSOPHY.md for the full rationale.

**Pipeline Position:**

GenericJSON → [json_map] → Transformed GenericJSON → [Domain Processor] → Graph
               ^^^^^^^^                               ^^^^^^^^^^^^^^^^
               Protocol layer                         Semantic layer
               (this package)                         (your code)

Transformation Operations

The processor supports four types of transformations:

  • Mapping: Rename fields (source field is removed after mapping)
  • Adding: Create new fields with constant values
  • Removing: Delete specific fields from the payload
  • Transformations: Apply string transformations (uppercase, lowercase, trim), selected by each mapping's transform field

Quick Start

Rename "temp" to "temperature" and add units:

config := jsonmapprocessor.Config{
    Ports: &component.PortConfig{
        Inputs: []component.PortDefinition{
            {Name: "input", Type: "nats", Subject: "sensor.raw", Interface: "core.json.v1"},
        },
        Outputs: []component.PortDefinition{
            {Name: "output", Type: "nats", Subject: "sensor.normalized", Interface: "core.json.v1"},
        },
    },
    Mappings: []jsonmapprocessor.FieldMapping{
        {SourceField: "temp", TargetField: "temperature", Transform: "copy"},
    },
    AddFields: map[string]any{
        "unit":    "celsius",
        "version": 2,
    },
}

// Inside a function that returns error.
rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}
// deps is the component.Dependencies value supplied by the host runtime.
processor, err := jsonmapprocessor.NewProcessor(rawConfig, deps)
if err != nil {
    return err
}

Field Mapping

Field mapping renames fields and removes the source:

// Input
{"data": {"temp": 23.5, "location": "lab-1"}}

// Mapping rule
{SourceField: "temp", TargetField: "temperature"}

// Output
{"data": {"temperature": 23.5, "location": "lab-1"}}

The original "temp" field is removed after mapping.

Adding Fields

Add new fields with constant values:

AddFields: map[string]any{
    "version": 2,
    "source": "sensor-network-a",
    "processed": true,
}

These fields are added to every message. Useful for:

  • Adding metadata (version, source, timestamp)
  • Tagging data with processing stage
  • Injecting configuration values

Removing Fields

Remove unwanted fields from payloads:

RemoveFields: []string{"internal_id", "debug_info", "raw_buffer"}

Common use cases:

  • Data sanitization (remove PII before publishing)
  • Payload size reduction (remove debug fields)
  • Schema migration (remove deprecated fields)

String Transformations

Apply a string transformation by setting the Transform field of a mapping; it is applied to the value stored under the target field:

Mappings: []jsonmapprocessor.FieldMapping{
    {SourceField: "stat", TargetField: "status", Transform: "uppercase"}, // "active" → "ACTIVE"
    {SourceField: "name", TargetField: "name", Transform: "lowercase"},   // "Sensor-001" → "sensor-001"
    {SourceField: "msg", TargetField: "message", Transform: "trim"},      // "  error  " → "error"
}

Supported transformation types:

  • "copy": No transformation (default)
  • "uppercase": Convert string to uppercase
  • "lowercase": Convert string to lowercase
  • "trim": Remove leading/trailing whitespace

Non-string fields are skipped silently.

Complete Example

Normalize sensor data with multiple transformations:

// Input message
{
  "data": {
    "temp": 23.5,
    "stat": "ACTIVE",
    "loc": "lab-1",
    "debug_timestamp": 1234567890
  }
}

// Configuration
config := jsonmapprocessor.Config{
    Mappings: []jsonmapprocessor.FieldMapping{
        {SourceField: "temp", TargetField: "temperature", Transform: "copy"},
        {SourceField: "stat", TargetField: "status", Transform: "lowercase"},
        {SourceField: "loc", TargetField: "location", Transform: "copy"},
    },
    AddFields: map[string]any{
        "unit":   "celsius",
        "source": "sensor-network",
    },
    RemoveFields: []string{"debug_timestamp"},
}

// Output message
{
  "data": {
    "temperature": 23.5,
    "status": "active",
    "location": "lab-1",
    "unit": "celsius",
    "source": "sensor-network"
  }
}
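The complete example can be reproduced end to end with a standalone sketch that runs the four documented steps in order on a copy of the input. `Mapping`, `transformValue`, and `process` are hypothetical local names, not the package API; overwriting existing keys in the add step and looking up transforms by target name are assumptions consistent with the documented order.

```go
package main

import (
	"fmt"
	"strings"
)

// Mapping is a hypothetical local mirror of one mapping rule.
type Mapping struct {
	Source, Target, Transform string
}

// transformValue applies a string transform; non-string values pass through.
func transformValue(v any, kind string) any {
	s, ok := v.(string)
	if !ok {
		return v
	}
	switch kind {
	case "uppercase":
		return strings.ToUpper(s)
	case "lowercase":
		return strings.ToLower(s)
	case "trim":
		return strings.TrimSpace(s)
	}
	return s // "copy" (or empty) leaves the string as-is
}

// process applies the documented steps in order to a copy of in, so the
// caller's map is never modified.
func process(in map[string]any, maps []Mapping, add map[string]any, remove []string) map[string]any {
	out := make(map[string]any, len(in)+len(add))
	for k, v := range in {
		out[k] = v
	}
	// 1. Field mappings: rename, dropping the source unless the names match.
	for _, m := range maps {
		if v, ok := out[m.Source]; ok {
			out[m.Target] = v
			if m.Source != m.Target {
				delete(out, m.Source)
			}
		}
	}
	// 2. Add fields (overwriting an existing key is an assumption).
	for k, v := range add {
		out[k] = v
	}
	// 3. Remove fields.
	for _, k := range remove {
		delete(out, k)
	}
	// 4. String transformations, looked up by each mapping's target name.
	for _, m := range maps {
		if v, ok := out[m.Target]; ok {
			out[m.Target] = transformValue(v, m.Transform)
		}
	}
	return out
}

func main() {
	in := map[string]any{"temp": 23.5, "stat": "ACTIVE", "loc": "lab-1", "debug_timestamp": 1234567890}
	out := process(in,
		[]Mapping{{"temp", "temperature", "copy"}, {"stat", "status", "lowercase"}, {"loc", "location", "copy"}},
		map[string]any{"unit": "celsius", "source": "sensor-network"},
		[]string{"debug_timestamp"})
	fmt.Println(out)
	// map[location:lab-1 source:sensor-network status:active temperature:23.5 unit:celsius]
}
```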

Message Flow

Input Subject → GenericJSON → Apply Mappings → Add Fields → Remove Fields →
                Transform Strings → Output Subject

Transformation Order

Operations are applied in this order:

  1. Field Mappings (rename and remove source)
  2. Add Fields (inject new fields)
  3. Remove Fields (delete unwanted fields)
  4. String Transformations (apply string operations)

This order ensures:

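  • Mapped fields can be transformed
  • Removal runs after addition, so a key listed in both add_fields and remove_fields ends up removed; keep the two lists disjoint
  • Transformations apply to final (target) field names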

GenericJSON Interface

Input and output messages conform to core.json.v1:

type GenericJSONPayload struct {
    Data map[string]any `json:"data"`
}

All transformations operate on the Data field, preserving the GenericJSON structure.
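A sketch of the envelope handling, assuming only the `data` key matters: decode into a local copy of `GenericJSONPayload`, edit `Data`, and re-encode. The rename inside `transformEnvelope` (a hypothetical helper) stands in for the configured operations. This mirror struct drops any top-level keys other than `data`; whether the real processor preserves them is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GenericJSONPayload mirrors the documented core.json.v1 shape.
type GenericJSONPayload struct {
	Data map[string]any `json:"data"`
}

// transformEnvelope decodes the envelope, edits only Data, and re-encodes it.
// Reading a missing key from a nil Data map is safe and simply finds nothing.
func transformEnvelope(raw []byte) ([]byte, error) {
	var p GenericJSONPayload
	if err := json.Unmarshal(raw, &p); err != nil {
		return nil, err
	}
	if v, ok := p.Data["temp"]; ok {
		p.Data["temperature"] = v
		delete(p.Data, "temp")
	}
	return json.Marshal(p) // encoding/json writes map keys in sorted order
}

func main() {
	out, err := transformEnvelope([]byte(`{"data": {"temp": 23.5, "location": "lab-1"}}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"data":{"location":"lab-1","temperature":23.5}}
}
```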

Configuration Schema

{
  "ports": {
    "inputs": [
      {"name": "input", "type": "nats", "subject": "raw.>", "interface": "core.json.v1"}
    ],
    "outputs": [
      {"name": "output", "type": "nats", "subject": "mapped.messages", "interface": "core.json.v1"}
    ]
  },
  "mappings": [
    {"source_field": "old_field", "target_field": "new_field", "transform": "lowercase"}
  ],
  "add_fields": {
    "version": 2,
    "processed": true
  },
  "remove_fields": ["internal_id", "debug_info"]
}
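A sketch of decoding and checking this schema with `encoding/json`. The local `config` and `fieldMapping` structs are hypothetical mirrors that reuse the JSON tags from the package's `Config` and `FieldMapping` types (ports omitted); the checks follow the `required:true` and `enum:copy|uppercase|lowercase|trim` schema tags. The real processor reports such problems with `errs.WrapInvalid`; plain `fmt.Errorf` is used here instead, and accepting an empty transform as `copy` is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fieldMapping and config are hypothetical mirrors reusing the documented
// JSON tags; ports are omitted to keep the sketch self-contained.
type fieldMapping struct {
	SourceField string `json:"source_field"`
	TargetField string `json:"target_field"`
	Transform   string `json:"transform"`
}

type config struct {
	Mappings     []fieldMapping `json:"mappings"`
	AddFields    map[string]any `json:"add_fields"`
	RemoveFields []string       `json:"remove_fields"`
}

// validTransforms holds the schema enum; "" is accepted on the assumption
// that it defaults to "copy".
var validTransforms = map[string]bool{"": true, "copy": true, "uppercase": true, "lowercase": true, "trim": true}

// parseConfig decodes raw JSON and rejects mappings that break the schema.
func parseConfig(raw []byte) (config, error) {
	var c config
	if err := json.Unmarshal(raw, &c); err != nil {
		return config{}, fmt.Errorf("invalid config: %w", err)
	}
	for i, m := range c.Mappings {
		if m.SourceField == "" || m.TargetField == "" {
			return config{}, fmt.Errorf("mapping %d: source_field and target_field are required", i)
		}
		if !validTransforms[m.Transform] {
			return config{}, fmt.Errorf("mapping %d: unknown transform %q", i, m.Transform)
		}
	}
	return c, nil
}

func main() {
	c, err := parseConfig([]byte(`{"mappings":[{"source_field":"old_field","target_field":"new_field","transform":"lowercase"}],"remove_fields":["internal_id"]}`))
	fmt.Println(c.Mappings[0].TargetField, err) // new_field <nil>
	_, err = parseConfig([]byte(`{"mappings":[{"source_field":"a","target_field":"b","transform":"reverse"}]}`))
	fmt.Println(err) // mapping 0: unknown transform "reverse"
}
```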

Error Handling

The processor uses streamkit/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (bad configuration)
  • NATS errors: errs.WrapTransient (network issues, retryable)
  • Unmarshal errors: errs.WrapInvalid (malformed JSON payloads)

Messages that fail parsing are logged at Debug level and dropped.

Performance Considerations

  • Mapping: O(1) map operations per field
  • Adding: O(n) where n is number of fields to add
  • Removing: O(m) where m is number of fields to remove
  • Transformations: O(k) where k is number of transformations

Overall complexity: O(n + m + k) per message

Typical throughput: 10,000+ messages/second per processor instance.

Observability

The processor implements component.Discoverable for monitoring:

meta := processor.Meta()
// Name: json-map-processor
// Type: processor
// Description: Maps/transforms GenericJSON messages (core.json.v1)

dataFlow := processor.DataFlow()
// MessagesProcessed: Total messages received
// MessagesMapped: Messages successfully transformed
// ErrorsTotal: Parse errors + transformation errors

Use Cases

**Schema Migration:**

// Migrate from v1 to v2 schema
Mappings:     []jsonmapprocessor.FieldMapping{{SourceField: "temp", TargetField: "temperature", Transform: "copy"}},
AddFields:    map[string]any{"schema_version": 2},
RemoveFields: []string{"deprecated_field"},

**Data Sanitization:**

// Remove PII before publishing externally
RemoveFields: []string{"email", "phone", "ssn", "internal_id"},

**Standardization:**

// Normalize field names and values
Mappings: []jsonmapprocessor.FieldMapping{
    {SourceField: "temp", TargetField: "temperature", Transform: "copy"},
    {SourceField: "stat", TargetField: "status", Transform: "uppercase"},
},

**Enrichment:**

// Add context to raw sensor data
AddFields: map[string]any{
    "facility":     "warehouse-a",
    "region":       "north-america",
    "processed_by": "json-map-v2",
},

Limitations

Current version limitations:

  • No support for nested field mapping (e.g., "position.lat" → "latitude")
  • No conditional transformations (transform based on field values)
  • No computed fields (e.g., combine firstName + lastName)
  • No custom transformation functions

These may be addressed in future versions based on user requirements.

Testing

The package includes two kinds of tests:

  • Unit tests: Mapping logic, transformation functions, edge cases
  • Integration tests: End-to-end NATS message flows with testcontainers

Run tests:

go test ./processor/json_map -v                        # Unit tests
go test -tags=integration ./processor/json_map -v      # Integration tests


Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewProcessor

func NewProcessor(
	rawConfig json.RawMessage, deps component.Dependencies,
) (component.Discoverable, error)

NewProcessor creates a new JSON map processor from configuration.

func Register

func Register(registry *component.Registry) error

Register registers the JSON map processor component with the given registry.

Types

type Config

type Config struct {
	Ports        *component.PortConfig `json:"ports"         schema:"type:ports,description:Port configuration,category:basic"`
	Mappings     []FieldMapping        `json:"mappings"      schema:"type:array,description:Field mappings,category:basic"`
	AddFields    map[string]any        `json:"add_fields"    schema:"type:object,description:Static fields"`
	RemoveFields []string              `json:"remove_fields" schema:"type:array,description:Field removal"`
}

Config holds configuration for the JSON map processor.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration for the JSON map processor.

type FieldMapping

type FieldMapping struct {
	SourceField string `json:"source_field" schema:"type:string,description:Source field,required:true"`
	TargetField string `json:"target_field" schema:"type:string,description:Target field,required:true"`
	Transform   string `json:"transform"    schema:"type:enum,enum:copy|uppercase|lowercase|trim,description:Type"`
}

FieldMapping defines a single field transformation.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor implements a GenericJSON message field transformer.

func (*Processor) ConfigSchema

func (m *Processor) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this processor.

func (*Processor) DataFlow

func (m *Processor) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics for this processor.

func (*Processor) Health

func (m *Processor) Health() component.HealthStatus

Health returns the current health status of this processor.

func (*Processor) Initialize

func (m *Processor) Initialize() error

Initialize prepares the processor (a no-op for the JSON map processor).

func (*Processor) InputPorts

func (m *Processor) InputPorts() []component.Port

InputPorts returns the NATS input ports this processor subscribes to.

func (*Processor) Meta

func (m *Processor) Meta() component.Metadata

Meta returns metadata describing this processor component.

func (*Processor) OutputPorts

func (m *Processor) OutputPorts() []component.Port

OutputPorts returns the NATS output port for transformed messages.

func (*Processor) Start

func (m *Processor) Start(ctx context.Context) error

Start begins transforming messages.

func (*Processor) Stop

func (m *Processor) Stop(timeout time.Duration) error

Stop gracefully stops the processor.
