Documentation
¶
Overview ¶
Package jsonmapprocessor provides a processor for transforming GenericJSON messages through field mapping, adding, removing, and string transformations.
Overview ¶
The JSON map processor enables flexible field-level transformations of GenericJSON payloads. It subscribes to NATS subjects carrying GenericJSON messages (core .json.v1 interface), applies mapping rules to transform the data, and publishes the transformed messages to output subjects.
Design Context: Protocol-Layer Processor ¶
This processor is a **protocol-layer utility** - it handles data transformation without making semantic decisions. It does NOT:
- Determine entity identities (no EntityID generation)
- Create semantic triples (no Graphable implementation)
- Interpret domain meaning (transformations are mechanical field operations)
Use this for pre-semantic normalization: rename fields to match expected schemas, remove debug data, or add routing metadata. Semantic transformation (e.g., "classify this sensor reading as critical") belongs in domain processors.
See docs/PROCESSOR-DESIGN-PHILOSOPHY.md for the full rationale.
**Pipeline Position:**
GenericJSON → [json_map] → Transformed GenericJSON → [Domain Processor] → Graph
^^^^^^^^ ^^^^^^^^^^^^^^^^
Protocol layer Semantic layer
(this package) (your code)
Transformation Operations ¶
The processor supports four types of transformations:
- Mapping: Rename fields (source field is removed after mapping)
- Adding: Create new fields with constant values
- Removing: Delete specific fields from the payload
- Transformations: Apply string transformations (uppercase, lowercase, trim)
Quick Start ¶
Rename "temp" to "temperature" and add units:
config := jsonmap.JSONMapConfig{
Ports: &component.PortConfig{
Inputs: []component.PortDefinition{
{Name: "input", Type: "nats", Subject: "sensor.raw", Interface: "core .json.v1"},
},
Outputs: []component.PortDefinition{
{Name: "output", Type: "nats", Subject: "sensor.normalized", Interface: "core .json.v1"},
},
},
Mappings: []jsonmap.FieldMapping{
{Source: "temp", Target: "temperature"},
},
AddFields: map[string]any{
"unit": "celsius",
"version": 2,
},
}
rawConfig, _ := json.Marshal(config)
processor, err := jsonmap.NewJSONMapProcessor(rawConfig, deps)
Field Mapping ¶
Field mapping renames fields and removes the source:
// Input
{"data": {"temp": 23.5, "location": "lab-1"}}
// Mapping rule
{Source: "temp", Target: "temperature"}
// Output
{"data": {"temperature": 23.5, "location": "lab-1"}}
The original "temp" field is removed after mapping.
Adding Fields ¶
Add new fields with constant values:
AddFields: map[string]any{
"version": 2,
"source": "sensor-network-a",
"processed": true,
}
These fields are added to every message. Useful for:
- Adding metadata (version, source, timestamp)
- Tagging data with processing stage
- Injecting configuration values
Removing Fields ¶
Remove unwanted fields from payloads:
RemoveFields: []string{"internal_id", "debug_info", "raw_buffer"}
Common use cases:
- Data sanitization (remove PII before publishing)
- Payload size reduction (remove debug fields)
- Schema migration (remove deprecated fields)
String Transformations ¶
Apply string transformations to specific fields:
Transformations: []jsonmap.FieldTransformation{
{Field: "status", Type: "uppercase"}, // "active" → "ACTIVE"
{Field: "name", Type: "lowercase"}, // "Sensor-001" → "sensor-001"
{Field: "message", Type: "trim"}, // " error " → "error"
}
Supported transformation types:
- "uppercase": Convert string to uppercase
- "lowercase": Convert string to lowercase
- "trim": Remove leading/trailing whitespace
Non-string fields are skipped silently.
Complete Example ¶
Normalize sensor data with multiple transformations:
// Input message
{
"data": {
"temp": 23.5,
"stat": "ACTIVE",
"loc": "lab-1",
"debug_timestamp": 1234567890
}
}
// Configuration
config := jsonmap.JSONMapConfig{
Mappings: []jsonmap.FieldMapping{
{Source: "temp", Target: "temperature"},
{Source: "stat", Target: "status"},
{Source: "loc", Target: "location"},
},
AddFields: map[string]any{
"unit": "celsius",
"source": "sensor-network",
},
RemoveFields: []string{"debug_timestamp"},
Transformations: []jsonmap.FieldTransformation{
{Field: "status", Type: "lowercase"},
},
}
// Output message
{
"data": {
"temperature": 23.5,
"status": "active",
"location": "lab-1",
"unit": "celsius",
"source": "sensor-network"
}
}
Message Flow ¶
Input Subject → GenericJSON → Apply Mappings → Add Fields → Remove Fields →
Transform Strings → Output Subject
Transformation Order ¶
Operations are applied in this order:
- Field Mappings (rename and remove source)
- Add Fields (inject new fields)
- Remove Fields (delete unwanted fields)
- String Transformations (apply string operations)
This order ensures:
- Mapped fields can be transformed
- Added fields won't be removed
- Transformations apply to final field names
GenericJSON Interface ¶
Input and output messages conform to core .json.v1:
type GenericJSONPayload struct {
Data map[string]any `json:"data"`
}
All transformations operate on the Data field, preserving the GenericJSON structure.
Configuration Schema ¶
{
"ports": {
"inputs": [
{"name": "input", "type": "nats", "subject": "raw.>", "interface": "core .json.v1"}
],
"outputs": [
{"name": "output", "type": "nats", "subject": "mapped.messages", "interface": "core .json.v1"}
]
},
"mappings": [
{"source": "old_field", "target": "new_field"}
],
"add_fields": {
"version": 2,
"processed": true
},
"remove_fields": ["internal_id", "debug_info"],
"transformations": [
{"field": "status", "type": "lowercase"}
]
}
Error Handling ¶
The processor uses streamkit/errors for consistent error classification:
- Invalid config: errs.WrapInvalid (bad configuration)
- NATS errors: errs.WrapTransient (network issues, retryable)
- Unmarshal errors: errs.WrapInvalid (malformed JSON payloads)
Messages that fail parsing are logged at Debug level and dropped.
Performance Considerations ¶
- Mapping: O(1) map operations per field
- Adding: O(n) where n is number of fields to add
- Removing: O(m) where m is number of fields to remove
- Transformations: O(k) where k is number of transformations
Overall complexity: O(n + m + k) per message
Typical throughput: 10,000+ messages/second per processor instance.
Observability ¶
The processor implements component.Discoverable for monitoring:
meta := processor.Meta() // Name: json-map-processor // Type: processor // Description: Maps/transforms GenericJSON messages (core .json.v1) dataFlow := processor.DataFlow() // MessagesProcessed: Total messages received // MessagesMapped: Messages successfully transformed // ErrorsTotal: Parse errors + transformation errors
Use Cases ¶
**Schema Migration:**
// Migrate from v1 to v2 schema
Mappings: [{Source: "temp", Target: "temperature"}]
AddFields: {"schema_version": 2}
RemoveFields: ["deprecated_field"]
**Data Sanitization:**
// Remove PII before publishing externally RemoveFields: ["email", "phone", "ssn", "internal_id"]
**Standardization:**
// Normalize field names and values
Mappings: [
{Source: "temp", Target: "temperature"},
{Source: "stat", Target: "status"},
]
Transformations: [
{Field: "status", Type: "uppercase"},
]
**Enrichment:**
// Add context to raw sensor data
AddFields: {
"facility": "warehouse-a",
"region": "north-america",
"processed_by": "json-map-v2",
}
Limitations ¶
Current version limitations:
- No support for nested field mapping (e.g., "position.lat" → "latitude")
- No conditional transformations (transform based on field values)
- No computed fields (e.g., combine firstName + lastName)
- No custom transformation functions
These may be addressed in future versions based on user requirements.
Testing ¶
The package includes comprehensive test coverage:
- Unit tests: Mapping logic, transformation functions, edge cases
- Integration tests: End-to-end NATS message flows with testcontainers
Run tests:
go test ./processor/json_map -v # Unit tests go test -tags=integration ./processor/json_map -v # Integration tests
Package jsonmapprocessor provides a core processor for transforming GenericJSON message fields
Index ¶
- func NewProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type Config
- type FieldMapping
- type Processor
- func (m *Processor) ConfigSchema() component.ConfigSchema
- func (m *Processor) DataFlow() component.FlowMetrics
- func (m *Processor) Health() component.HealthStatus
- func (m *Processor) Initialize() error
- func (m *Processor) InputPorts() []component.Port
- func (m *Processor) Meta() component.Metadata
- func (m *Processor) OutputPorts() []component.Port
- func (m *Processor) Start(ctx context.Context) error
- func (m *Processor) Stop(timeout time.Duration) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewProcessor ¶
func NewProcessor( rawConfig json.RawMessage, deps component.Dependencies, ) (component.Discoverable, error)
NewProcessor creates a new JSON map processor from configuration
Types ¶
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
Mappings []FieldMapping `json:"mappings" schema:"type:array,description:Field mappings,category:basic"`
AddFields map[string]any `json:"add_fields" schema:"type:object,description:Static fields"`
RemoveFields []string `json:"remove_fields" schema:"type:array,description:Field removal"`
}
Config holds configuration for JSON map processor
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns the default configuration for JSON map processor
type FieldMapping ¶
type FieldMapping struct {
SourceField string `json:"source_field" schema:"type:string,description:Source field,required:true"`
TargetField string `json:"target_field" schema:"type:string,description:Target field,required:true"`
Transform string `json:"transform" schema:"type:enum,enum:copy|uppercase|lowercase|trim,description:Type"`
}
FieldMapping defines a single field transformation
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor implements a GenericJSON message field transformer
func (*Processor) ConfigSchema ¶
func (m *Processor) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema for this processor.
func (*Processor) DataFlow ¶
func (m *Processor) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics for this processor.
func (*Processor) Health ¶
func (m *Processor) Health() component.HealthStatus
Health returns the current health status of this processor.
func (*Processor) Initialize ¶
Initialize prepares the processor (no-op for JSON map)
func (*Processor) InputPorts ¶
InputPorts returns the NATS input ports this processor subscribes to.
func (*Processor) OutputPorts ¶
OutputPorts returns the NATS output port for transformed messages.