iotsensor

Version: v1.0.0-alpha.6
Note: This package is not in the latest version of its module.
Published: Mar 6, 2026 | License: MIT | Imports: 18 | Imported by: 0

README

IoT Sensor Example Processor

This package demonstrates the correct pattern for implementing domain-specific processors in SemStreams. It shows how to create payloads that implement the Graphable interface with proper federated entity IDs and semantic predicates.

Why Domain Processors Matter

SemStreams exists to help developers transform incoming data into meaningful semantic graphs. The transformation step is where domain knowledge lives, and generic processors cannot automate it away.

Generic processors that automatically convert JSON to entities:

  • Make semantic decisions without domain understanding
  • Produce low-quality, auto-generated triples
  • Create entity IDs without federated structure
  • Derive predicates from JSON keys, not semantic meaning
  • Treat relationships as strings, not entity references

Domain processors encode your understanding of the data.

The Graphable Interface

Every domain payload must implement:

type Graphable interface {
    EntityID() string      // 6-part federated identifier
    Triples() []Triple     // Semantic facts about the entity
}
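
As a minimal sketch of what satisfying this interface looks like, the program below declares a local stand-in for Triple (the real type lives in the SemStreams message package; the three fields used here are an assumption) and a hypothetical Door payload that is not part of this package. The `var _ Graphable = (*Door)(nil)` line is a common Go idiom that makes the build fail if the type stops satisfying the interface.

```go
package main

import "fmt"

// Triple is a simplified local stand-in for the SemStreams triple type.
type Triple struct {
	Subject   string
	Predicate string
	Object    any
}

// Graphable mirrors the interface shown above.
type Graphable interface {
	EntityID() string
	Triples() []Triple
}

// Door is a hypothetical domain payload used only for illustration.
type Door struct {
	OrgID, Platform, ID string
	Open                bool
}

// Compile-time check that *Door implements Graphable.
var _ Graphable = (*Door)(nil)

// EntityID builds a 6-part ID; "facility.access.door" is an invented domain path.
func (d *Door) EntityID() string {
	return fmt.Sprintf("%s.%s.facility.access.door.%s", d.OrgID, d.Platform, d.ID)
}

// Triples returns one fact about the door, keyed by an invented predicate.
func (d *Door) Triples() []Triple {
	return []Triple{
		{Subject: d.EntityID(), Predicate: "access.door.open", Object: d.Open},
	}
}

func main() {
	d := &Door{OrgID: "acme", Platform: "logistics", ID: "dock-3", Open: true}
	fmt.Println(d.EntityID())
	fmt.Println(len(d.Triples()))
}
```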

Using This Example

1. Copy and Adapt

Copy this package to your domain repository:

cp -r examples/processors/iot_sensor/ your-repo/processors/your_domain/

2. Define Your Payload

Replace SensorReading with your domain entity:

type YourPayload struct {
    // Input fields from incoming data
    ID        string
    Type      string
    // ... domain-specific fields

    // Context fields (from processor config)
    OrgID    string
    Platform string
}

3. Implement EntityID

Return a deterministic 6-part federated ID:

func (p *YourPayload) EntityID() string {
    // {org}.{platform}.{domain}.{system}.{type}.{instance}
    return fmt.Sprintf("%s.%s.yourdomain.system.%s.%s",
        p.OrgID,
        p.Platform,
        p.Type,
        p.ID,
    )
}
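
Because the ID is dot-delimited, an empty part or a part that itself contains a dot would no longer yield exactly six segments. The sketch below shows one way to guard against that before formatting. The buildEntityID helper is hypothetical and not part of SemStreams; whether the real package performs such checks is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// buildEntityID is a hypothetical helper that joins six parts into a
// federated ID and rejects parts that would break the 6-part structure.
func buildEntityID(org, platform, domain, system, typ, instance string) (string, error) {
	parts := []string{org, platform, domain, system, typ, instance}
	for i, p := range parts {
		if p == "" {
			return "", fmt.Errorf("entity ID part %d is empty", i+1)
		}
		if strings.Contains(p, ".") {
			return "", fmt.Errorf("entity ID part %d (%q) contains a dot", i+1, p)
		}
	}
	return strings.Join(parts, "."), nil
}

func main() {
	id, err := buildEntityID("acme", "logistics", "environmental", "sensor", "temperature", "sensor-042")
	fmt.Println(id, err)

	_, err = buildEntityID("acme", "logistics", "environmental", "sensor", "temperature", "rack.4")
	fmt.Println(err)
}
```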

4. Implement Triples

Return semantic facts using registered predicates:

func (p *YourPayload) Triples() []message.Triple {
    entityID := p.EntityID()
    return []message.Triple{
        {
            Subject:   entityID,
            Predicate: "yourdomain.category.property",
            Object:    p.SomeValue,
            // ...
        },
        // Entity references, not strings!
        {
            Subject:   entityID,
            Predicate: "yourdomain.relationship.type",
            Object:    p.relatedEntityID(), // Another entity ID
        },
    }
}

5. Register Your Predicates

Create a vocabulary file with your domain predicates:

func RegisterVocabulary() {
    vocabulary.Register("yourdomain.category.property",
        vocabulary.WithDescription("Description of this predicate"),
        vocabulary.WithDataType("float64"),
    )
    // ... more predicates
}

Files in This Package

File               Purpose
payload.go         SensorReading and Zone implementing Graphable
payload_test.go    Tests verifying Graphable contract
processor.go       JSON transformation with domain logic
processor_test.go  Processor unit tests
vocabulary.go      IoT predicate registration
README.md          This file

Running Tests

go test -race ./examples/processors/iot_sensor/...

Key Patterns Demonstrated

Unit-Specific Predicates

// Instead of generic "value" + "unit" triples:
Predicate: fmt.Sprintf("sensor.measurement.%s", s.Unit)
// Produces: sensor.measurement.celsius, sensor.measurement.percent, etc.

Entity References

// Instead of location as string:
Predicate: "geo.location.zone"
Object:    s.zoneEntityID()  // Another 6-part entity ID

Classification Triples

// Add domain knowledge about the entity:
Predicate: "sensor.classification.type"
Object:    s.SensorType
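
The unit-specific predicate pattern above formats the unit straight into the predicate name, so an unexpected unit would produce a predicate that was never registered. One possible alternative, sketched here under that assumption, is to look the unit up against the four measurement constants this package exports. The measurementPredicate helper and the knownUnits table are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// Predicate constants copied from this package's constant block.
const (
	PredicateMeasurementCelsius    = "sensor.measurement.celsius"
	PredicateMeasurementFahrenheit = "sensor.measurement.fahrenheit"
	PredicateMeasurementPercent    = "sensor.measurement.percent"
	PredicateMeasurementHPA        = "sensor.measurement.hpa"
)

// knownUnits maps a normalized unit name to its predicate constant.
var knownUnits = map[string]string{
	"celsius":    PredicateMeasurementCelsius,
	"fahrenheit": PredicateMeasurementFahrenheit,
	"percent":    PredicateMeasurementPercent,
	"hpa":        PredicateMeasurementHPA,
}

// measurementPredicate is a hypothetical helper. It reports false when the
// unit has no matching predicate constant.
func measurementPredicate(unit string) (string, bool) {
	p, ok := knownUnits[strings.ToLower(strings.TrimSpace(unit))]
	return p, ok
}

func main() {
	for _, u := range []string{"celsius", "Percent", "furlongs"} {
		p, ok := measurementPredicate(u)
		fmt.Printf("%q -> %q (known: %v)\n", u, p, ok)
	}
}
```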

Documentation

Overview

Package iotsensor provides an example domain processor demonstrating the correct Graphable implementation pattern for SemStreams.

This package serves as a reference implementation showing how to:

  • Create domain-specific payloads that implement the Graphable interface
  • Generate federated 6-part entity IDs with organizational context
  • Produce semantic triples using registered vocabulary predicates
  • Transform incoming JSON into meaningful graph structures

IoT sensors are used as a neutral example domain that is:

  • Simple enough to understand quickly
  • Complex enough to demonstrate real patterns
  • Universally understood across industries
  • Not tied to any specific customer domain

For production use, copy this example and adapt it to your domain vocabulary.

Constants

const (
	// Sensor measurement predicates (unit-specific)
	PredicateMeasurementCelsius    = "sensor.measurement.celsius"
	PredicateMeasurementFahrenheit = "sensor.measurement.fahrenheit"
	PredicateMeasurementPercent    = "sensor.measurement.percent"
	PredicateMeasurementHPA        = "sensor.measurement.hpa"

	// Sensor classification predicates
	PredicateClassificationType    = "sensor.classification.type"
	PredicateClassificationAmbient = "sensor.classification.ambient"

	// Geo location predicates
	PredicateLocationZone      = "geo.location.zone"
	PredicateLocationLatitude  = "geo.location.latitude"
	PredicateLocationLongitude = "geo.location.longitude"

	// Time observation predicates
	PredicateObservationRecorded = "time.observation.recorded"
	PredicateObservationReceived = "time.observation.received"

	// Facility zone predicates
	PredicateZoneName = "facility.zone.name"
	PredicateZoneType = "facility.zone.type"

	// Sensor identity predicates (for ALIAS_INDEX)
	PredicateSensorSerial = "iot.sensor.serial"
)

Predicate constants for the IoT sensor domain. These follow the three-level dotted notation domain.category.property.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(
	rawConfig json.RawMessage, deps component.Dependencies,
) (component.Discoverable, error)

NewComponent creates a new IoT sensor processor component from configuration. This is the factory function registered with the component registry.

func ParseZoneEntityID

func ParseZoneEntityID(entityID string) (zoneType, zoneID string)

ParseZoneEntityID extracts zone type and zone ID from a full zone entity ID. It returns empty strings if the entity ID is not a valid zone format.

Zone entity ID format: org.platform.facility.zone.{zoneType}.{zoneID}

Example: "c360.logistics.facility.zone.area.cold-storage-1" -> ("area", "cold-storage-1")
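
The behavior described for ParseZoneEntityID can be approximated by splitting on dots and checking the fixed "facility.zone" segments. This is a sketch written from the description only; the lowercase parseZoneEntityID below is a local illustration, and the real function may treat edge cases differently.

```go
package main

import (
	"fmt"
	"strings"
)

// parseZoneEntityID is an illustrative reimplementation based on the
// documented format org.platform.facility.zone.{zoneType}.{zoneID}.
func parseZoneEntityID(entityID string) (zoneType, zoneID string) {
	parts := strings.Split(entityID, ".")
	// A zone ID must have exactly six parts with fixed domain and system.
	if len(parts) != 6 || parts[2] != "facility" || parts[3] != "zone" {
		return "", ""
	}
	if parts[4] == "" || parts[5] == "" {
		return "", ""
	}
	return parts[4], parts[5]
}

func main() {
	zt, id := parseZoneEntityID("c360.logistics.facility.zone.area.cold-storage-1")
	fmt.Println(zt, id) // prints: area cold-storage-1
}
```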

func Register

func Register(registry *component.Registry) error

Register registers the IoT sensor processor component with the given registry. This enables the component to be discovered and instantiated by the component management system.

The registration includes:

  • Component factory function for creating instances
  • Configuration schema for validation and UI generation
  • Type information (processor, domain: iot)
  • Protocol identifier for component routing
  • Version information for compatibility tracking

func RegisterVocabulary

func RegisterVocabulary()

RegisterVocabulary registers all IoT sensor domain predicates with the vocabulary system. This should be called during application initialization.

Example usage:

func init() {
    iotsensor.RegisterVocabulary()
}

func ZoneEntityID

func ZoneEntityID(orgID, platform, zoneType, zoneID string) string

ZoneEntityID generates a federated 6-part entity ID for a zone. This is the single source of truth for zone entity ID format, ensuring consistency between Zone.EntityID() and any references to zones.

Example: ZoneEntityID("acme", "logistics", "area", "warehouse-7") returns "acme.logistics.facility.zone.area.warehouse-7".
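
To illustrate the "single source of truth" idea, the sketch below routes both a zone's own EntityID and a reading's zone reference through one formatting function, so the two can never drift apart. The lowercase names (zoneEntityID, zone, zoneRefFor) are local stand-ins for illustration, not the package's actual internals.

```go
package main

import "fmt"

// zoneEntityID is the only place the zone ID layout is written down.
func zoneEntityID(orgID, platform, zoneType, zoneID string) string {
	return fmt.Sprintf("%s.%s.facility.zone.%s.%s", orgID, platform, zoneType, zoneID)
}

// zone is a simplified stand-in for the package's Zone type.
type zone struct {
	OrgID, Platform, ZoneType, ZoneID string
}

// EntityID delegates to zoneEntityID instead of formatting on its own.
func (z zone) EntityID() string {
	return zoneEntityID(z.OrgID, z.Platform, z.ZoneType, z.ZoneID)
}

// zoneRefFor is a hypothetical helper a processor might use to compute the
// zone reference stored on a sensor reading.
func zoneRefFor(orgID, platform, location string) string {
	return zoneEntityID(orgID, platform, "area", location)
}

func main() {
	z := zone{OrgID: "acme", Platform: "logistics", ZoneType: "area", ZoneID: "warehouse-7"}
	ref := zoneRefFor("acme", "logistics", "warehouse-7")
	fmt.Println(z.EntityID() == ref)
}
```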

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component wraps the domain-specific IoT sensor processor with component lifecycle. It bridges the gap between the stateless domain processor and the stateful component framework that handles NATS messaging and lifecycle management.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this processor.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics for this processor.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status of this processor.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component. It is a no-op for the IoT sensor processor.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns the NATS input ports this processor subscribes to.

func (*Component) IsStarted

func (c *Component) IsStarted() bool

IsStarted reports whether the component is running.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns metadata describing this processor component.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns the NATS output port for Graphable sensor readings.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing sensor messages.

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component.

type ComponentConfig

type ComponentConfig struct {
	// Ports defines NATS input/output subjects for message routing
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`

	// OrgID is the organization identifier for federated entity IDs
	OrgID string `json:"org_id" schema:"type:string,description:Organization identifier,category:basic,required:true"`

	// Platform is the platform/product identifier for federated entity IDs
	Platform string `json:"platform" schema:"type:string,description:Platform identifier,category:basic,required:true"`
}

ComponentConfig holds configuration for the IoT sensor processor component. This wraps the domain-specific processor configuration with port information required by the component framework.

func DefaultConfig

func DefaultConfig() ComponentConfig

DefaultConfig returns the default configuration for the IoT sensor processor.

type Config

type Config struct {
	// OrgID is the organization identifier (e.g., "acme")
	// This becomes the first part of federated entity IDs.
	OrgID string

	// Platform is the platform/product identifier (e.g., "logistics")
	// This becomes the second part of federated entity IDs.
	Platform string
}

Config holds the configuration for the IoT sensor processor. It provides the organizational context that is applied to all processed readings.

func (Config) Validate

func (c Config) Validate() error

Validate checks that the configuration is valid.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor transforms incoming JSON sensor data into Graphable payloads. It applies the organizational context from configuration and produces SensorReading instances with proper federated entity IDs and semantic triples.

This demonstrates the correct pattern for domain processors:

  • Configuration provides organizational context
  • Process method transforms data with domain understanding
  • Output is a Graphable payload, not generic JSON

func NewProcessor

func NewProcessor(config Config) *Processor

NewProcessor creates a new IoT sensor processor with the given configuration.

func (*Processor) Process

func (p *Processor) Process(input map[string]any) (*SensorReading, error)

Process transforms incoming JSON data into a SensorReading.

Expected JSON format:

{
  "device_id": "sensor-042",
  "type": "temperature",
  "reading": 23.5,
  "unit": "celsius",
  "location": "warehouse-7",
  "timestamp": "2025-11-26T10:30:00Z"
}

The processor:

  1. Extracts fields from the incoming JSON
  2. Applies organizational context from config
  3. Returns a SensorReading that implements Graphable

This method demonstrates domain-specific transformation logic:

  • Field extraction with proper type handling
  • Context enrichment from configuration
  • Validation of required fields
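
The field extraction step can be sketched as follows, using the JSON keys from the expected format above. This is not the package's implementation: the rawReading type and the extract and requireString helpers are hypothetical, and treating all six fields as required is an assumption. The float64 assertion relies on the fact that encoding/json decodes JSON numbers into float64 when the target is an empty interface.

```go
package main

import (
	"fmt"
	"time"
)

// rawReading is a hypothetical intermediate type; the real Process
// returns a *SensorReading.
type rawReading struct {
	DeviceID   string
	SensorType string
	Value      float64
	Unit       string
	Location   string
	ObservedAt time.Time
}

// requireString returns input[key] as a non-empty string or an error.
func requireString(input map[string]any, key string) (string, error) {
	s, ok := input[key].(string)
	if !ok || s == "" {
		return "", fmt.Errorf("field %q is missing or not a string", key)
	}
	return s, nil
}

// extract pulls typed fields out of a decoded JSON object.
func extract(input map[string]any) (*rawReading, error) {
	r := &rawReading{}
	// Pair each JSON key with the struct field it fills.
	fields := []struct {
		key string
		dst *string
	}{
		{"device_id", &r.DeviceID},
		{"type", &r.SensorType},
		{"unit", &r.Unit},
		{"location", &r.Location},
	}
	for _, f := range fields {
		s, err := requireString(input, f.key)
		if err != nil {
			return nil, err
		}
		*f.dst = s
	}

	v, ok := input["reading"].(float64)
	if !ok {
		return nil, fmt.Errorf("field %q is missing or not a number", "reading")
	}
	r.Value = v

	ts, err := requireString(input, "timestamp")
	if err != nil {
		return nil, err
	}
	if r.ObservedAt, err = time.Parse(time.RFC3339, ts); err != nil {
		return nil, fmt.Errorf("field %q is not RFC 3339: %w", "timestamp", err)
	}
	return r, nil
}

func main() {
	r, err := extract(map[string]any{
		"device_id": "sensor-042", "type": "temperature", "reading": 23.5,
		"unit": "celsius", "location": "warehouse-7", "timestamp": "2025-11-26T10:30:00Z",
	})
	fmt.Println(r, err)
}
```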

type SensorReading

type SensorReading struct {
	// Input fields (from incoming JSON)
	DeviceID   string    // e.g., "sensor-042"
	SensorType string    // e.g., "temperature", "humidity", "pressure"
	Value      float64   // e.g., 23.5
	Unit       string    // e.g., "celsius", "percent", "hpa"
	ObservedAt time.Time // When measurement was taken

	// Alias field (for ALIAS_INDEX testing)
	SerialNumber string // e.g., "SN-2025-001234" - manufacturer serial number

	// Geospatial fields (for SPATIAL_INDEX testing)
	Latitude  *float64 // e.g., 37.7749 (nil if not provided)
	Longitude *float64 // e.g., -122.4194 (nil if not provided)
	Altitude  *float64 // e.g., 10.0 meters (optional)

	// Entity reference fields (computed by processor)
	ZoneEntityID string // e.g., "acme.logistics.facility.zone.area.warehouse-7"

	// Context fields (set by processor from config)
	OrgID    string // e.g., "acme"
	Platform string // e.g., "logistics"
}

SensorReading represents an IoT sensor measurement. It implements the Graphable interface with federated entity IDs and semantic predicates.

This is an example of a domain-specific payload that encodes semantic understanding of the data, as opposed to generic processors that make semantic decisions without domain knowledge.

func (*SensorReading) EntityID

func (s *SensorReading) EntityID() string

EntityID returns a deterministic 6-part federated entity ID following the pattern: {org}.{platform}.{domain}.{system}.{type}.{instance}

Example: "acme.logistics.environmental.sensor.temperature.sensor-042"

The 6 parts provide:

  • org: Organization identifier (multi-tenancy)
  • platform: Platform/product within the organization
  • domain: Business domain (environmental, logistics, etc.)
  • system: System or subsystem (sensor, actuator, etc.)
  • type: Entity type within the system (temperature, humidity, etc.)
  • instance: Unique instance identifier

func (*SensorReading) MarshalJSON

func (s *SensorReading) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler for SensorReading. Uses alias pattern to avoid infinite recursion.

func (*SensorReading) Schema

func (s *SensorReading) Schema() message.Type

Schema returns the message type for sensor readings. This identifies the payload type for routing and processing.

Type format: domain.category.version, here iot.sensor.v1.

func (*SensorReading) Triples

func (s *SensorReading) Triples() []message.Triple

Triples returns semantic facts about this sensor reading using domain-appropriate predicates from the vocabulary system.

Each triple follows the Subject-Predicate-Object pattern where:

  • Subject: This entity's ID (self-reference)
  • Predicate: Semantic property using dotted notation (domain.category.property)
  • Object: The value (literal) or entity reference (another entity ID)

The triples produced demonstrate:

  • Unit-specific predicates (sensor.measurement.celsius vs generic "value")
  • Entity references (geo.location.zone points to Zone entity, not a string)
  • Classification triples (sensor.classification.type)
  • Temporal tracking (time.observation.recorded)

func (*SensorReading) UnmarshalJSON

func (s *SensorReading) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for SensorReading. Uses alias pattern to avoid infinite recursion.

func (*SensorReading) Validate

func (s *SensorReading) Validate() error

Validate checks that the sensor reading has all required fields.

type Zone

type Zone struct {
	ZoneID   string // e.g., "warehouse-7"
	ZoneType string // e.g., "warehouse", "office", "outdoor"
	Name     string // e.g., "Main Warehouse"

	// Context fields
	OrgID    string
	Platform string
}

Zone represents a location zone entity. It demonstrates how entity references work in triples: SensorReading references Zone by entity ID, not by string.

func (*Zone) EntityID

func (z *Zone) EntityID() string

EntityID returns a deterministic 6-part federated entity ID for the zone. Example: "acme.logistics.facility.zone.area.warehouse-7"

func (*Zone) MarshalJSON

func (z *Zone) MarshalJSON() ([]byte, error)

MarshalJSON implements custom JSON marshaling for Zone.

func (*Zone) Schema

func (z *Zone) Schema() message.Type

Schema returns the message type for Zone payloads.

func (*Zone) Triples

func (z *Zone) Triples() []message.Triple

Triples returns semantic facts about this zone.

func (*Zone) UnmarshalJSON

func (z *Zone) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling for Zone.

func (*Zone) Validate

func (z *Zone) Validate() error

Validate checks that the Zone has required fields.
