Overview ¶
Package iotsensor provides an example domain processor demonstrating the correct Graphable implementation pattern for SemStreams.
This package serves as a reference implementation showing how to:
- Create domain-specific payloads that implement the Graphable interface
- Generate federated 6-part entity IDs with organizational context
- Produce semantic triples using registered vocabulary predicates
- Transform incoming JSON into meaningful graph structures
IoT sensors are used as a neutral example domain that is:
- Simple enough to understand quickly
- Complex enough to demonstrate real patterns
- Universally understood across industries
- Not tied to any specific customer domain
For production use, copy this example and adapt it to your domain vocabulary.
Index ¶
- Constants
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func ParseZoneEntityID(entityID string) (zoneType, zoneID string)
- func Register(registry *component.Registry) error
- func RegisterVocabulary()
- func ZoneEntityID(orgID, platform, zoneType, zoneID string) string
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) IsStarted() bool
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type ComponentConfig
- type Config
- type Processor
- type SensorReading
- type Zone
Constants ¶
const (
	// Sensor measurement predicates (unit-specific)
	PredicateMeasurementCelsius    = "sensor.measurement.celsius"
	PredicateMeasurementFahrenheit = "sensor.measurement.fahrenheit"
	PredicateMeasurementPercent    = "sensor.measurement.percent"
	PredicateMeasurementHPA        = "sensor.measurement.hpa"

	// Sensor classification predicates
	PredicateClassificationType    = "sensor.classification.type"
	PredicateClassificationAmbient = "sensor.classification.ambient"

	// Geo location predicates
	PredicateLocationZone      = "geo.location.zone"
	PredicateLocationLatitude  = "geo.location.latitude"
	PredicateLocationLongitude = "geo.location.longitude"

	// Time observation predicates
	PredicateObservationRecorded = "time.observation.recorded"
	PredicateObservationReceived = "time.observation.received"

	// Facility zone predicates
	PredicateZoneName = "facility.zone.name"
	PredicateZoneType = "facility.zone.type"

	// Sensor identity predicates (for ALIAS_INDEX)
	PredicateSensorSerial = "iot.sensor.serial"
)
Predicate constants for the IoT sensor domain. These follow the three-level dotted notation: domain.category.property
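The three-level notation can be checked mechanically. The following is a minimal sketch, assuming a predicate is valid when it has exactly three non-empty dot-separated parts; splitPredicate is a hypothetical helper written for this example and is not part of the iotsensor package or the vocabulary system.

```go
package main

import (
	"fmt"
	"strings"
)

// splitPredicate is a hypothetical helper (not part of iotsensor). It splits a
// predicate into domain, category, and property, and rejects any string that
// does not have exactly three non-empty dot-separated parts.
func splitPredicate(p string) (domain, category, property string, err error) {
	parts := strings.Split(p, ".")
	if len(parts) != 3 {
		return "", "", "", fmt.Errorf("predicate %q: want 3 parts, got %d", p, len(parts))
	}
	for _, part := range parts {
		if part == "" {
			return "", "", "", fmt.Errorf("predicate %q: empty part", p)
		}
	}
	return parts[0], parts[1], parts[2], nil
}

func main() {
	domain, category, property, err := splitPredicate("sensor.measurement.celsius")
	fmt.Println(domain, category, property, err)
}
```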
Variables ¶
This section is empty.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new IoT sensor processor component from configuration. This is the factory function registered with the component registry.
func ParseZoneEntityID ¶
func ParseZoneEntityID(entityID string) (zoneType, zoneID string)
ParseZoneEntityID extracts the zone type and zone ID from a full zone entity ID. It returns empty strings if the entity ID is not in a valid zone format.
Zone entity ID format: org.platform.facility.zone.{zoneType}.{zoneID}
Example: "c360.logistics.facility.zone.area.cold-storage-1" -> ("area", "cold-storage-1")
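The documented behavior can be approximated as follows. This is an illustrative re-implementation, not the package's source: parseZoneSketch is a hypothetical name, and the sketch assumes that no individual part of the ID contains a dot.

```go
package main

import (
	"fmt"
	"strings"
)

// parseZoneSketch mimics the documented contract of ParseZoneEntityID.
// Assumption: parts never contain dots, so a valid ID splits into exactly 6.
func parseZoneSketch(entityID string) (zoneType, zoneID string) {
	parts := strings.Split(entityID, ".")
	// Positions 3 and 4 must be the fixed "facility" and "zone" segments.
	if len(parts) != 6 || parts[2] != "facility" || parts[3] != "zone" {
		return "", ""
	}
	if parts[4] == "" || parts[5] == "" {
		return "", ""
	}
	return parts[4], parts[5]
}

func main() {
	zoneType, zoneID := parseZoneSketch("c360.logistics.facility.zone.area.cold-storage-1")
	fmt.Println(zoneType, zoneID)

	// A sensor entity ID is not a zone ID, so both results are empty.
	zoneType, zoneID = parseZoneSketch("acme.logistics.environmental.sensor.temperature.sensor-042")
	fmt.Printf("%q %q\n", zoneType, zoneID)
}
```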
func Register ¶
func Register(registry *component.Registry) error
Register registers the IoT sensor processor component with the given registry. This enables the component to be discovered and instantiated by the component management system.
The registration includes:
- Component factory function for creating instances
- Configuration schema for validation and UI generation
- Type information (processor, domain: iot)
- Protocol identifier for component routing
- Version information for compatibility tracking
func RegisterVocabulary ¶
func RegisterVocabulary()
RegisterVocabulary registers all IoT sensor domain predicates with the vocabulary system. This should be called during application initialization.
Example usage:
func init() {
	iotsensor.RegisterVocabulary()
}
func ZoneEntityID ¶
func ZoneEntityID(orgID, platform, zoneType, zoneID string) string
ZoneEntityID generates a federated 6-part entity ID for a zone. This is the single source of truth for the zone entity ID format, ensuring consistency between Zone.EntityID() and any references to zones.
Example: ZoneEntityID("acme", "logistics", "area", "warehouse-7")
Returns: "acme.logistics.facility.zone.area.warehouse-7"
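The "single source of truth" idea can be sketched as one builder function that both the entity's own EntityID method and any referencing code call. The names zoneEntityIDSketch and sketchZone are hypothetical stand-ins for this example; the fixed "facility" and "zone" segments are taken from the documented format.

```go
package main

import (
	"fmt"
	"strings"
)

// zoneEntityIDSketch builds the documented 6-part zone ID format:
// org.platform.facility.zone.{zoneType}.{zoneID}
func zoneEntityIDSketch(orgID, platform, zoneType, zoneID string) string {
	return strings.Join([]string{orgID, platform, "facility", "zone", zoneType, zoneID}, ".")
}

// sketchZone is a hypothetical stand-in for the Zone type.
type sketchZone struct {
	OrgID, Platform, ZoneType, ZoneID string
}

// EntityID delegates to the shared builder, so the zone's own ID and any
// reference to it are always formatted by the same code path.
func (z sketchZone) EntityID() string {
	return zoneEntityIDSketch(z.OrgID, z.Platform, z.ZoneType, z.ZoneID)
}

func main() {
	z := sketchZone{OrgID: "acme", Platform: "logistics", ZoneType: "area", ZoneID: "warehouse-7"}
	ref := zoneEntityIDSketch("acme", "logistics", "area", "warehouse-7")
	fmt.Println(z.EntityID() == ref, ref)
}
```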
Types ¶
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component wraps the domain-specific IoT sensor processor with component lifecycle. It bridges the gap between the stateless domain processor and the stateful component framework that handles NATS messaging and lifecycle management.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema for this processor.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics for this processor.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status of this processor.
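func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component. It is a no-op for the IoT sensor processor.
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns the NATS input ports this processor subscribes to.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns the NATS output port for Graphable sensor readings.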
type ComponentConfig ¶
type ComponentConfig struct {
	// Ports defines NATS input/output subjects for message routing
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	// OrgID is the organization identifier for federated entity IDs
	OrgID string `json:"org_id" schema:"type:string,description:Organization identifier,category:basic,required:true"`
	// Platform is the platform/product identifier for federated entity IDs
	Platform string `json:"platform" schema:"type:string,description:Platform identifier,category:basic,required:true"`
}
ComponentConfig holds configuration for the IoT sensor processor component. This wraps the domain-specific processor configuration with port information required by the component framework.
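As a rough illustration of how the raw JSON configuration maps onto these fields, the sketch below decodes a config document and enforces the two fields marked required in the schema tags. sketchConfig and parseConfig are hypothetical names; the Ports field is left out because the shape of component.PortConfig is not shown here, and the real component may validate differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// sketchConfig mirrors only the org_id and platform fields of ComponentConfig.
// Ports is omitted because component.PortConfig is not documented here.
type sketchConfig struct {
	OrgID    string `json:"org_id"`
	Platform string `json:"platform"`
}

// parseConfig decodes raw JSON and rejects configs missing a required field.
func parseConfig(raw json.RawMessage) (sketchConfig, error) {
	var cfg sketchConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return sketchConfig{}, fmt.Errorf("decode config: %w", err)
	}
	if cfg.OrgID == "" {
		return sketchConfig{}, errors.New("org_id is required")
	}
	if cfg.Platform == "" {
		return sketchConfig{}, errors.New("platform is required")
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig(json.RawMessage(`{"org_id":"acme","platform":"logistics"}`))
	fmt.Println(cfg.OrgID, cfg.Platform, err)
}
```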
func DefaultConfig ¶
func DefaultConfig() ComponentConfig
DefaultConfig returns the default configuration for the IoT sensor processor.
type Config ¶
type Config struct {
	// OrgID is the organization identifier (e.g., "acme")
	// This becomes the first part of federated entity IDs.
	OrgID string
	// Platform is the platform/product identifier (e.g., "logistics")
	// This becomes the second part of federated entity IDs.
	Platform string
}
Config holds the configuration for the IoT sensor processor. It provides the organizational context that is applied to all processed readings.
type Processor ¶
type Processor struct {
	// contains filtered or unexported fields
}
Processor transforms incoming JSON sensor data into Graphable payloads. It applies the organizational context from configuration and produces SensorReading instances with proper federated entity IDs and semantic triples.
This demonstrates the correct pattern for domain processors:
- Configuration provides organizational context
- Process method transforms data with domain understanding
- Output is a Graphable payload, not generic JSON
func NewProcessor ¶
NewProcessor creates a new IoT sensor processor with the given configuration.
func (*Processor) Process ¶
func (p *Processor) Process(input map[string]any) (*SensorReading, error)
Process transforms incoming JSON data into a SensorReading.
Expected JSON format:
{
  "device_id": "sensor-042",
  "type": "temperature",
  "reading": 23.5,
  "unit": "celsius",
  "location": "warehouse-7",
  "timestamp": "2025-11-26T10:30:00Z"
}
The processor:
- Extracts fields from the incoming JSON
- Applies organizational context from config
- Returns a SensorReading that implements Graphable
This method demonstrates domain-specific transformation logic:
- Field extraction with proper type handling
- Context enrichment from configuration
- Validation of required fields
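The extract, enrich, and validate steps listed above might look roughly like the sketch below. It is not the package's Process implementation: the reading type is a trimmed-down stand-in for SensorReading, requireString and process are hypothetical names, and treating every field of the documented JSON as required is an assumption made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// reading is a trimmed-down, hypothetical stand-in for SensorReading.
type reading struct {
	DeviceID, SensorType, Unit, Location string
	Value                                float64
	ObservedAt                           time.Time
	OrgID, Platform                      string
}

// requireString fetches a non-empty string field or reports which key failed.
func requireString(input map[string]any, key string) (string, error) {
	s, ok := input[key].(string)
	if !ok || s == "" {
		return "", fmt.Errorf("field %q is missing or not a string", key)
	}
	return s, nil
}

// process extracts fields, applies org/platform context, and validates.
// Assumption: every field in the documented JSON format is required.
func process(orgID, platform string, input map[string]any) (*reading, error) {
	r := &reading{OrgID: orgID, Platform: platform} // context enrichment
	var err error
	fields := []struct {
		key string
		dst *string
	}{
		{"device_id", &r.DeviceID}, {"type", &r.SensorType},
		{"unit", &r.Unit}, {"location", &r.Location},
	}
	for _, f := range fields {
		if *f.dst, err = requireString(input, f.key); err != nil {
			return nil, err
		}
	}
	// encoding/json decodes JSON numbers into float64 for map[string]any.
	value, ok := input["reading"].(float64)
	if !ok {
		return nil, fmt.Errorf("field %q is missing or not a number", "reading")
	}
	r.Value = value
	ts, err := requireString(input, "timestamp")
	if err != nil {
		return nil, err
	}
	if r.ObservedAt, err = time.Parse(time.RFC3339, ts); err != nil {
		return nil, fmt.Errorf("field %q: %w", "timestamp", err)
	}
	return r, nil
}

func main() {
	raw := []byte(`{"device_id":"sensor-042","type":"temperature","reading":23.5,
		"unit":"celsius","location":"warehouse-7","timestamp":"2025-11-26T10:30:00Z"}`)
	var input map[string]any
	if err := json.Unmarshal(raw, &input); err != nil {
		panic(err)
	}
	r, err := process("acme", "logistics", input)
	fmt.Println(r, err)
}
```

Returning a typed struct rather than the original map is the point of the pattern: downstream code receives fields with known types instead of re-asserting them from generic JSON.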
type SensorReading ¶
type SensorReading struct {
	// Input fields (from incoming JSON)
	DeviceID   string    // e.g., "sensor-042"
	SensorType string    // e.g., "temperature", "humidity", "pressure"
	Value      float64   // e.g., 23.5
	Unit       string    // e.g., "celsius", "percent", "hpa"
	ObservedAt time.Time // When measurement was taken

	// Alias field (for ALIAS_INDEX testing)
	SerialNumber string // e.g., "SN-2025-001234" - manufacturer serial number

	// Geospatial fields (for SPATIAL_INDEX testing)
	Latitude  *float64 // e.g., 37.7749 (nil if not provided)
	Longitude *float64 // e.g., -122.4194 (nil if not provided)
	Altitude  *float64 // e.g., 10.0 meters (optional)

	// Entity reference fields (computed by processor)
	ZoneEntityID string // e.g., "acme.logistics.facility.zone.area.warehouse-7"

	// Context fields (set by processor from config)
	OrgID    string // e.g., "acme"
	Platform string // e.g., "logistics"
}
SensorReading represents an IoT sensor measurement. It implements the Graphable interface with federated entity IDs and semantic predicates.
This is an example of a domain-specific payload that encodes semantic understanding of the data, as opposed to generic processors that make semantic decisions without domain knowledge.
func (*SensorReading) EntityID ¶
func (s *SensorReading) EntityID() string
EntityID returns a deterministic 6-part federated entity ID following the pattern: {org}.{platform}.{domain}.{system}.{type}.{instance}
Example: "acme.logistics.environmental.sensor.temperature.sensor-042"
The 6 parts provide:
- org: Organization identifier (multi-tenancy)
- platform: Platform/product within the organization
- domain: Business domain (environmental, logistics, etc.)
- system: System or subsystem (sensor, actuator, etc.)
- type: Entity type within the system (temperature, humidity, etc.)
- instance: Unique instance identifier
func (*SensorReading) MarshalJSON ¶
func (s *SensorReading) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler for SensorReading. Uses alias pattern to avoid infinite recursion.
func (*SensorReading) Schema ¶
func (s *SensorReading) Schema() message.Type
Schema returns the message type for sensor readings. This identifies the payload type for routing and processing. Type format: domain.category.version → iot.sensor.v1
func (*SensorReading) Triples ¶
func (s *SensorReading) Triples() []message.Triple
Triples returns semantic facts about this sensor reading using domain-appropriate predicates from the vocabulary system.
Each triple follows the Subject-Predicate-Object pattern where:
- Subject: This entity's ID (self-reference)
- Predicate: Semantic property using dotted notation (domain.category.property)
- Object: The value (literal) or entity reference (another entity ID)
The triples produced demonstrate:
- Unit-specific predicates (sensor.measurement.celsius vs generic "value")
- Entity references (geo.location.zone points to Zone entity, not a string)
- Classification triples (sensor.classification.type)
- Temporal tracking (time.observation.recorded)
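A minimal sketch of the triple shapes described above follows. The triple struct, the reading struct, and sketchTriples are hypothetical stand-ins, since the fields of message.Triple are not shown in this documentation; only the predicate strings come from the package constants. The real Triples method may emit more or different facts.

```go
package main

import (
	"fmt"
	"time"
)

// triple is a hypothetical stand-in for message.Triple.
type triple struct {
	Subject   string
	Predicate string
	Object    any
}

// reading is a trimmed-down, hypothetical stand-in for SensorReading.
type reading struct {
	EntityID, SensorType, Unit, ZoneEntityID string
	Value                                    float64
	ObservedAt                               time.Time
}

// measurementPredicates maps a unit to its unit-specific predicate.
var measurementPredicates = map[string]string{
	"celsius":    "sensor.measurement.celsius",
	"fahrenheit": "sensor.measurement.fahrenheit",
	"percent":    "sensor.measurement.percent",
	"hpa":        "sensor.measurement.hpa",
}

// sketchTriples emits classification, temporal, measurement, and zone facts.
func sketchTriples(r reading) []triple {
	triples := []triple{
		{r.EntityID, "sensor.classification.type", r.SensorType},
		{r.EntityID, "time.observation.recorded", r.ObservedAt},
	}
	// Unit-specific predicate instead of a generic "value" predicate.
	if pred, ok := measurementPredicates[r.Unit]; ok {
		triples = append(triples, triple{r.EntityID, pred, r.Value})
	}
	// The object is another entity's ID, which makes this an entity reference.
	if r.ZoneEntityID != "" {
		triples = append(triples, triple{r.EntityID, "geo.location.zone", r.ZoneEntityID})
	}
	return triples
}

func main() {
	r := reading{
		EntityID:     "acme.logistics.environmental.sensor.temperature.sensor-042",
		SensorType:   "temperature",
		Unit:         "celsius",
		Value:        23.5,
		ZoneEntityID: "acme.logistics.facility.zone.area.warehouse-7",
		ObservedAt:   time.Date(2025, 11, 26, 10, 30, 0, 0, time.UTC),
	}
	for _, t := range sketchTriples(r) {
		fmt.Printf("%s | %s | %v\n", t.Subject, t.Predicate, t.Object)
	}
}
```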
func (*SensorReading) UnmarshalJSON ¶
func (s *SensorReading) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler for SensorReading. Uses alias pattern to avoid infinite recursion.
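The alias pattern mentioned for MarshalJSON and UnmarshalJSON is a common Go idiom: a locally defined type with the same fields but no methods is handed to encoding/json, so the encoder does not call the custom method again. The sketch below shows the idiom on a hypothetical two-field reading type; the field names, JSON tags, and the extra "kind" field are assumptions for illustration and do not reflect the actual wire format of SensorReading.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reading is a hypothetical two-field stand-in for SensorReading.
type reading struct {
	DeviceID string  `json:"device_id"`
	Value    float64 `json:"value"`
}

// MarshalJSON adds a "kind" field. Alias has reading's fields but none of its
// methods, so json.Marshal does not recurse back into this method.
func (r *reading) MarshalJSON() ([]byte, error) {
	type Alias reading
	return json.Marshal(&struct {
		Kind string `json:"kind"`
		*Alias
	}{Kind: "reading", Alias: (*Alias)(r)})
}

// UnmarshalJSON decodes through the same alias, writing directly into r.
func (r *reading) UnmarshalJSON(data []byte) error {
	type Alias reading
	aux := &struct{ *Alias }{Alias: (*Alias)(r)}
	return json.Unmarshal(data, aux)
}

func main() {
	out, err := json.Marshal(&reading{DeviceID: "sensor-042", Value: 23.5})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))

	var back reading
	if err := json.Unmarshal(out, &back); err != nil {
		panic(err)
	}
	fmt.Println(back.DeviceID, back.Value)
}
```

Calling json.Marshal on the original type inside its own MarshalJSON would invoke the method again without end, which is the recursion the alias avoids.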
func (*SensorReading) Validate ¶
func (s *SensorReading) Validate() error
Validate checks that the sensor reading has all required fields.
type Zone ¶
type Zone struct {
	ZoneID   string // e.g., "warehouse-7"
	ZoneType string // e.g., "warehouse", "office", "outdoor"
	Name     string // e.g., "Main Warehouse"

	// Context fields
	OrgID    string
	Platform string
}
Zone represents a location zone entity. It demonstrates how entity references work in triples - SensorReading references Zone by entity ID, not by string.
func (*Zone) EntityID ¶
EntityID returns a deterministic 6-part federated entity ID for the zone. Example: "acme.logistics.facility.zone.area.warehouse-7"
func (*Zone) MarshalJSON ¶
MarshalJSON implements custom JSON marshaling for Zone.
func (*Zone) UnmarshalJSON ¶
UnmarshalJSON implements custom JSON unmarshaling for Zone.