package message

v1.0.0-alpha.3 (not the latest version of its module)
Published: Mar 4, 2026 License: MIT Imports: 12 Imported by: 0

README

Message Package

Polymorphic message handling for SemStreams data flows with behavior-based processing and type-safe payload management.

Overview

The message package provides the foundation for type-safe, extensible message processing in SemStreams. Messages flow through components as polymorphic envelopes (BaseMessage) containing typed payloads that can be type-asserted to behavioral interfaces for specialized processing.

Key design principles:

  • Polymorphic by default: Messages deserialize to the correct concrete types automatically
  • Behavior-based processing: Type-assert to capabilities (Locatable, Timeable, etc.), not to concrete types
  • Registry-driven: A global payload registry lets new payload types plug in without changes to the message package
  • Type-safe: Compile-time type checking with runtime polymorphism
  • Immutable: Messages are read-only after creation (safe for concurrent access)

Installation

import "github.com/c360/semstreams/message"

Core Concepts

Message Structure

Every SemStreams message consists of:

  1. Envelope (BaseMessage): Polymorphic wrapper with type metadata
  2. Payload: Concrete data implementing the Payload interface
  3. Behavioral Interfaces: Optional capabilities for specialized processing
┌─ BaseMessage ────────────────────────────┐
│ Type: "core.json.v1"                     │
│ PayloadData: <raw JSON>                  │
│ ├─ payloadInstance ─────────────────┐    │
│ │  GenericJSONPayload               │    │
│ │  ├─ Data: map[string]any          │    │
│ │  ├─ Implements: Payload           │    │
│ │  └─ Optionally: Locatable,        │    │
│ │                 Timeable, etc.    │    │
│ └───────────────────────────────────┘    │
└──────────────────────────────────────────┘
Type System

Payloads are identified by "domain.category.version" strings:

  • core.json.v1 - Generic JSON (built-in SemStreams type)
  • robotics.mavlink.v1 - MAVLink messages (domain-specific)
  • iot.sensor.v2 - IoT sensor data (domain-specific)
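The identifier format can be checked mechanically. Below is a minimal sketch. It assumes the rule is exactly three non-empty, dot-separated segments, with a version segment that starts with "v". That rule is inferred from the examples above. `Type` and `ParseType` are local, hypothetical stand-ins, not the package's own API, and the real validation may be stricter.

```go
package main

import (
	"fmt"
	"strings"
)

// Type is a local stand-in for the package's domain/category/version triple.
type Type struct {
	Domain, Category, Version string
}

// Key renders the dotted "domain.category.version" form.
func (t Type) Key() string {
	return t.Domain + "." + t.Category + "." + t.Version
}

// ParseType splits an identifier such as "core.json.v1" into its parts.
// Assumption: exactly three non-empty segments, version prefixed with "v".
func ParseType(s string) (Type, error) {
	parts := strings.Split(s, ".")
	if len(parts) != 3 {
		return Type{}, fmt.Errorf("type %q: want domain.category.version, got %d segment(s)", s, len(parts))
	}
	for _, p := range parts {
		if p == "" {
			return Type{}, fmt.Errorf("type %q: empty segment", s)
		}
	}
	if !strings.HasPrefix(parts[2], "v") {
		return Type{}, fmt.Errorf("type %q: version %q should start with \"v\"", s, parts[2])
	}
	return Type{Domain: parts[0], Category: parts[1], Version: parts[2]}, nil
}

func main() {
	for _, s := range []string{"core.json.v1", "iot.sensor.v2", "core.json"} {
		t, err := ParseType(s)
		fmt.Println(t.Key(), err)
	}
}
```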

Components declare accepted types in their configuration, and the flow engine validates compatibility before deployment.

Behavioral Interfaces

Optional capabilities that payloads can implement:

Interface     Purpose                  Use case                   Methods
Locatable     Geographic coordinates   Mapping, geofencing        Location() (lat, lon float64)
Timeable      Temporal metadata        Time-series analysis       Timestamp() time.Time
Observable    Sensor observations      Sensor data processing     ObservedEntity(), ObservedProperty(), ObservedValue(), ObservedUnit()
Correlatable  Message correlation      Request/response matching  CorrelationID() string
Identifiable  Entity identification    Entity tracking            EntityID() string, EntityType() string
Targetable    Destination info         Routing decisions          TargetID() string, TargetType() string

Processors type-assert to the specific behaviors they need, making components reusable across different payload types.

Quick Start

Creating Messages
// Create a GenericJSON payload
payload := message.NewGenericJSON(map[string]any{
    "sensor_id": "temp-001",
    "temperature": 23.5,
    "unit": "celsius",
})

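// Wrap in a BaseMessage: message type, payload, and source name ("my-service" is an example)
msg := message.NewBaseMessage(payload.Schema(), payload, "my-service")

// Serialize to JSON
data, err := json.Marshal(msg)
if err != nil {
    log.Fatal(err)
}
// Result (abridged; the envelope also carries the message ID and metadata):
// {"type":"core.json.v1","payload":{"data":{"sensor_id":"temp-001"...}}}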
Deserializing Messages
// Automatic polymorphic reconstruction
var msg message.BaseMessage
if err := json.Unmarshal(data, &msg); err != nil {
    log.Fatal(err) // e.g. unknown payload type
}
// Payload automatically created based on "type" field

// Access the payload
payload := msg.Payload()
if genericJSON, ok := payload.(*message.GenericJSONPayload); ok {
    // JSON numbers decode as float64; comma-ok avoids a panic if the key is missing
    if temperature, ok := genericJSON.Data["temperature"].(float64); ok {
        fmt.Printf("Temperature: %.1f°C\n", temperature)
    }
}
Using Behavioral Interfaces
// Check if payload has location data
if locatable, ok := msg.Payload().(message.Locatable); ok {
    lat, lon := locatable.Location()
    fmt.Printf("Location: %.6f, %.6f\n", lat, lon)
}

// Check if payload has timestamp
if timeable, ok := msg.Payload().(message.Timeable); ok {
    ts := timeable.Timestamp()
    fmt.Printf("Recorded at: %s\n", ts.Format(time.RFC3339))
}

// Check for observation data
if obs, ok := msg.Payload().(message.Observable); ok {
    entity := obs.ObservedEntity()
    property := obs.ObservedProperty()
    value := obs.ObservedValue()
    unit := obs.ObservedUnit()
    fmt.Printf("%s.%s = %v %s\n", entity, property, value, unit)
}

Architecture

Message Flow
┌─────────────┐     ┌────────────────┐     ┌─────────────┐     ┌─────────────┐
│   Input     │────▶│   Processor    │────▶│  Processor  │────▶│   Output    │
│ (UDP/File)  │     │  (Filter/Map)  │     │(Transform)  │     │(File/WS)    │
└─────────────┘     └────────────────┘     └─────────────┘     └─────────────┘
      │                     │                      │                    │
      ▼                     ▼                      ▼                    ▼
 BaseMessage          BaseMessage            BaseMessage          BaseMessage
  (Payload)            (Payload)              (Payload)            (Payload)

Each component:

  1. Receives BaseMessage from NATS
  2. Checks the payload's type identifier
  3. Type-asserts to required behavioral interfaces
  4. Processes the payload
  5. Creates a new BaseMessage with the transformed payload
  6. Publishes it to the next component
Payload Registry

The global payload registry maps type identifiers to factory functions:

Registry: {
  "core.json.v1" → func() any { return &GenericJSONPayload{} }
  "robotics.position.v1" → func() any { return &PositionPayload{} }
  "iot.sensor.v2" → func() any { return &SensorPayload{} }
}

When BaseMessage.UnmarshalJSON() encounters "type":"core.json.v1", it:

  1. Looks up "core.json.v1" in the registry
  2. Calls the factory to create an empty payload instance
  3. Unmarshals the JSON data into that instance
  4. Stores the typed instance in payloadInstance

This enables polymorphic deserialization without a hand-written type switch or code generation (encoding/json itself still uses reflection to populate the instance).

Integration Points

NATS: Messages serialize to JSON for NATS pub/sub:

  • Subjects: sensors.temperature, robots.position, etc.
  • Payload: Complete BaseMessage JSON

Component Registry: Components declare type compatibility:

  • input_types: ["core.json.v1", "iot.sensor.v2"]
  • output_types: ["core.json.v1"]

Flow Engine: Validates type compatibility in flow graphs before deployment

Metrics: Message counts tracked by type (via Schema().String())

Usage Patterns

Creating Custom Payload Types
package myapp

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/c360/semstreams/component"
    "github.com/c360/semstreams/errors"
    "github.com/c360/semstreams/message"
)

// Define your payload. The time field is not named Timestamp because Go
// does not allow a field and a method with the same name on one type.
type RobotPositionPayload struct {
    RobotID    string    `json:"robot_id"`
    Latitude   float64   `json:"latitude"`
    Longitude  float64   `json:"longitude"`
    RecordedAt time.Time `json:"timestamp"`
}

// Implement the Payload interface.
// Schema() supplies the type identifier; Schema().String() yields "robotics.position.v1".
func (p *RobotPositionPayload) Schema() message.Type {
    return message.Type{Domain: "robotics", Category: "position", Version: "v1"}
}

func (p *RobotPositionPayload) Validate() error {
    // Structural validation: required fields
    if p.RobotID == "" {
        return errors.WrapInvalid(errors.ErrInvalidData, "RobotPositionPayload", "Validate", "robot_id is required")
    }

    // Semantic validation: value ranges
    if p.Latitude < -90 || p.Latitude > 90 {
        return errors.WrapInvalid(errors.ErrInvalidData, "RobotPositionPayload", "Validate",
            fmt.Sprintf("latitude must be between -90 and 90, got: %.6f", p.Latitude))
    }
    if p.Longitude < -180 || p.Longitude > 180 {
        return errors.WrapInvalid(errors.ErrInvalidData, "RobotPositionPayload", "Validate",
            fmt.Sprintf("longitude must be between -180 and 180, got: %.6f", p.Longitude))
    }
    return nil
}

// MarshalJSON and UnmarshalJSON use the alias pattern from Best Practices:
// the Alias type has no methods, so json.Marshal does not recurse forever.
func (p *RobotPositionPayload) MarshalJSON() ([]byte, error) {
    type Alias RobotPositionPayload
    return json.Marshal((*Alias)(p))
}

func (p *RobotPositionPayload) UnmarshalJSON(data []byte) error {
    type Alias RobotPositionPayload
    return json.Unmarshal(data, (*Alias)(p))
}

// Implement behavioral interfaces
func (p *RobotPositionPayload) Location() (float64, float64) {
    return p.Latitude, p.Longitude
}

func (p *RobotPositionPayload) Timestamp() time.Time {
    return p.RecordedAt
}

func (p *RobotPositionPayload) EntityID() string {
    return p.RobotID
}

func (p *RobotPositionPayload) EntityType() string {
    return "robot"
}

// Register with global registry
func init() {
    err := component.RegisterPayload(&component.PayloadRegistration{
        Domain:      "robotics",
        Category:    "position",
        Version:     "v1",
        Description: "Robot geographic position with timestamp",
        Factory: func() any {
            return &RobotPositionPayload{}
        },
        Example: RobotPositionPayload{
            RobotID:    "robot-001",
            Latitude:   40.7128,
            Longitude:  -74.0060,
            RecordedAt: time.Now(),
        },
    })
    if err != nil {
        panic(err)
    }
}
Writing Behavior-Based Processors
// Processor that works with ANY payload that is Locatable
type GeofenceProcessor struct {
    centerLat, centerLon float64 // geofence center in decimal degrees
    radius               float64 // same unit that calculateDistance returns
}

func (p *GeofenceProcessor) Process(msg *message.BaseMessage) (*message.BaseMessage, error) {
    // Type-assert to the Locatable behavior, not to a concrete type
    locatable, ok := msg.Payload().(message.Locatable)
    if !ok {
        return nil, fmt.Errorf("geofence: payload %T is not Locatable", msg.Payload())
    }

    lat, lon := locatable.Location()
    // calculateDistance is not shown; a great-circle (haversine) distance is a common choice
    distance := p.calculateDistance(lat, lon, p.centerLat, p.centerLon)

    if distance > p.radius {
        // Outside the geofence: create an alert payload
        alertPayload := message.NewGenericJSON(map[string]any{
            "alert_type": "geofence_breach",
            "distance":   distance,
            "threshold":  p.radius,
        })
        return message.NewBaseMessage(alertPayload.Schema(), alertPayload, "geofence-processor"), nil
    }

    // Inside the geofence: pass the original message through unchanged
    return msg, nil
}

This processor works with RobotPositionPayload, VehicleLocationPayload, or ANY payload implementing Locatable.

GenericJSON for Prototyping

Use GenericJSONPayload for rapid iteration:

// Quick prototype - no custom types needed
func createTestMessage() *message.BaseMessage {
    payload := message.NewGenericJSON(map[string]any{
        "test_id": "test-001",
        "status": "running",
        "metrics": map[string]any{
            "cpu": 45.2,
            "memory": 67.8,
        },
    })
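    return message.NewBaseMessage(payload.Schema(), payload, "test-harness")
}

// Process in a flow
func processTestData(msg *message.BaseMessage) {
    genericJSON, ok := msg.Payload().(*message.GenericJSONPayload)
    if !ok {
        return
    }
    // Nested objects are map[string]any and numbers are float64;
    // comma-ok assertions avoid panics when a field is missing
    metrics, ok := genericJSON.Data["metrics"].(map[string]any)
    if !ok {
        return
    }
    if cpu, ok := metrics["cpu"].(float64); ok && cpu > 80.0 {
        // Alert on high CPU
    }
}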

When to use GenericJSON:

  • ✅ Rapid prototyping and iteration
  • ✅ Integration tests with flexible schemas
  • ✅ ETL pipelines with varying structures
  • ✅ JSON transformation workflows

When NOT to use GenericJSON:

  • ❌ Production systems with strict schemas
  • ❌ Type-safe domain models
  • ❌ Performance-critical paths (use typed payloads)

Testing

Testing Behavioral Interfaces
// MockLocatable is a minimal payload for tests. Its methods are declared at
// package level because Go does not allow method declarations inside a function.
type MockLocatable struct {
    Lat, Lon float64
}

// Payload interface: Schema().String() returns "mock.locatable.v1"
func (m *MockLocatable) Schema() message.Type {
    return message.Type{Domain: "mock", Category: "locatable", Version: "v1"}
}
func (m *MockLocatable) Validate() error { return nil }

// Locatable behavior
func (m *MockLocatable) Location() (float64, float64) { return m.Lat, m.Lon }

func TestProcessorWithLocatable(t *testing.T) {
    // Register mock type
    err := component.RegisterPayload(&component.PayloadRegistration{
        Domain:   "mock",
        Category: "locatable",
        Version:  "v1",
        Factory:  func() any { return &MockLocatable{} },
    })
    require.NoError(t, err)

    // Test processor
    payload := &MockLocatable{Lat: 40.7, Lon: -74.0}
    msg := message.NewBaseMessage(payload.Schema(), payload, "test")

    processor := NewGeofenceProcessor(...) // constructor arguments elided
    result, err := processor.Process(msg)

    require.NoError(t, err)
    assert.NotNil(t, result)
}
Testing Round-Trip Serialization
func TestMessageRoundTrip(t *testing.T) {
    // Create original message
    original := message.NewGenericJSON(map[string]any{
        "test": "value",
        "number": 42,
    })
    originalMsg := message.NewBaseMessage(original.Schema(), original, "test")

    // Marshal to JSON
    data, err := json.Marshal(originalMsg)
    require.NoError(t, err)

    // Unmarshal back
    var reconstructed message.BaseMessage
    err = json.Unmarshal(data, &reconstructed)
    require.NoError(t, err)

    // Verify type and payload
    assert.Equal(t, "core.json.v1", reconstructed.Type().Key())
    payload, ok := reconstructed.Payload().(*message.GenericJSONPayload)
    require.True(t, ok, "payload should be *message.GenericJSONPayload")
    assert.Equal(t, "value", payload.Data["test"])
    // JSON numbers decode as float64, so the int 42 comes back as float64(42)
    assert.Equal(t, float64(42), payload.Data["number"])
}

Implementation Notes

Thread Safety

Messages are immutable after creation:

  • BaseMessage fields set once during construction or unmarshaling
  • Payload instances should be immutable or use copy-on-write
  • Safe for concurrent reads from multiple goroutines
  • NOT safe for concurrent modification

Global registry is thread-safe:

  • Payload registry uses internal synchronization
  • Safe to register types concurrently (registration normally happens in init(), which Go runs sequentially anyway)
  • Safe to create payloads concurrently
Performance

Polymorphic deserialization overhead:

  • Registry lookup: O(1) map access (~10ns)
  • Type assertion: A cheap runtime check, negligible next to JSON decoding
  • JSON unmarshaling: Standard library performance

For high-throughput scenarios (>10K msg/sec):

  • Use typed payloads (avoid map[string]any)
  • Pool BaseMessage instances if needed
  • Consider binary serialization (protobuf, msgpack)

Benchmarks (typical hardware):

BenchmarkBaseMessage_Marshal       1000000   1200 ns/op
BenchmarkBaseMessage_Unmarshal     800000    1500 ns/op
BenchmarkTypedPayload_Marshal      2000000    600 ns/op
BenchmarkTypedPayload_Unmarshal    1500000    900 ns/op
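Numbers like these depend heavily on hardware, Go version and payload shape, so it is worth measuring your own. The sketch below uses the standard library's `testing.Benchmark`, which runs outside `go test` once `testing.Init` has been called. It compares decoding the Quick Start payload into `map[string]any` (what GenericJSON holds) against decoding it into a typed struct.

It does not exercise `BaseMessage` itself. `sensorReading` is a hypothetical type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"testing"
)

// sensorReading is a typed mirror of the sample payload.
type sensorReading struct {
	SensorID    string  `json:"sensor_id"`
	Temperature float64 `json:"temperature"`
	Unit        string  `json:"unit"`
}

var sample = []byte(`{"sensor_id":"temp-001","temperature":23.5,"unit":"celsius"}`)

// benchGeneric decodes into map[string]any, as a GenericJSON payload would.
func benchGeneric(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var m map[string]any
		if err := json.Unmarshal(sample, &m); err != nil {
			b.Fatal(err)
		}
	}
}

// benchTyped decodes into a concrete struct, as a typed payload would.
func benchTyped(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var r sensorReading
		if err := json.Unmarshal(sample, &r); err != nil {
			b.Fatal(err)
		}
	}
}

func main() {
	testing.Init() // required before testing.Benchmark outside "go test"
	fmt.Println("map[string]any:", testing.Benchmark(benchGeneric))
	fmt.Println("typed struct:  ", testing.Benchmark(benchTyped))
}
```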
Error Handling

Uses SemStreams error classification:

import "github.com/c360/semstreams/errors"

// Invalid input (malformed JSON, unknown type)
errors.WrapInvalid(err, "BaseMessage", "UnmarshalJSON", "validate type format")

// Programming errors (factory returns wrong type)
errors.WrapFatal(err, "BaseMessage", "UnmarshalJSON", "factory contract violation")

// General errors (JSON marshaling)
errors.Wrap(err, "BaseMessage", "MarshalJSON", "marshal payload")

Error classification enables retry logic and proper error handling in components.

Security Considerations

Input Validation:

  • Payload types validated against "domain.category.version" format
  • Unknown types rejected (must be registered)
  • Factory contract enforced (must return Payload instance)
  • Payload-specific validation via Validate() method

GenericJSON Limitations:

  • No built-in size limits (enforce at transport layer)
  • No depth limits (enforce at transport layer)
  • No schema validation (use typed payloads for strict schemas)

Best Practices:

  • Validate payloads in components before processing
  • Enforce message size limits at NATS/transport level
  • Use typed payloads for production systems
  • Implement Validate() thoroughly for custom payloads
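GenericJSON imposes no size or depth limits, so a component can reject oversized or deeply nested input before decoding it. Below is a minimal sketch, assuming you choose your own limits. `checkLimits` is hypothetical. It streams tokens with `json.Decoder.Token`, so no `map[string]any` is built during the check.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// checkLimits rejects input that is too large or too deeply nested before it
// reaches a full json.Unmarshal into map[string]any. Truncated input may also
// end in io.EOF and pass this check; the later full decode still rejects it.
func checkLimits(data []byte, maxBytes, maxDepth int) error {
	if len(data) > maxBytes {
		return fmt.Errorf("message is %d bytes, limit is %d", len(data), maxBytes)
	}
	dec := json.NewDecoder(bytes.NewReader(data))
	depth := 0
	for {
		tok, err := dec.Token()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("malformed JSON: %w", err)
		}
		// Only object/array delimiters change the nesting depth.
		if d, ok := tok.(json.Delim); ok {
			switch d {
			case '{', '[':
				depth++
				if depth > maxDepth {
					return fmt.Errorf("nesting depth exceeds %d", maxDepth)
				}
			case '}', ']':
				depth--
			}
		}
	}
}

func main() {
	fmt.Println(checkLimits([]byte(`{"metrics":{"cpu":45.2}}`), 1<<20, 8))
	fmt.Println(checkLimits([]byte(`[[[[[[[[[1]]]]]]]]]`), 1<<20, 8))
}
```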

Common Patterns

Enrichment Pattern
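// Add metadata to an existing GenericJSON message
func enrichMessage(msg *message.BaseMessage) (*message.BaseMessage, error) {
    // Extract original data; a bare type assertion would panic on other payload types
    original, ok := msg.Payload().(*message.GenericJSONPayload)
    if !ok {
        return nil, fmt.Errorf("enrich: expected *message.GenericJSONPayload, got %T", msg.Payload())
    }

    // Create enriched payload (the original message is left untouched)
    enriched := message.NewGenericJSON(map[string]any{
        "original":           original.Data,
        "enriched_at":        time.Now(),
        "enrichment_version": "v1",
    })

    return message.NewBaseMessage(enriched.Schema(), enriched, "enricher"), nil
}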
Transformation Pattern
// Transform an observation into a different payload type.
// AlertPayload is a custom payload type defined elsewhere.
func transformToAlert(msg *message.BaseMessage, threshold float64) (*message.BaseMessage, error) {
    // Type-assert to the Observable behavior
    obs, ok := msg.Payload().(message.Observable)
    if !ok {
        return nil, nil // Not an observation
    }

    // ObservedValue() returns any, so assert to a numeric type before comparing
    value, ok := obs.ObservedValue().(float64)
    if !ok || value <= threshold {
        return nil, nil // No alert needed
    }

    alert := &AlertPayload{
        AlertType: "threshold_exceeded",
        Source:    obs.ObservedEntity(),
        Value:     value,
        Timestamp: time.Now(),
    }
    return message.NewBaseMessage(alert.Schema(), alert, "alerting"), nil
}
Filtering Pattern
// Filter messages based on behavioral capabilities
func filterLocatableOnly(msgs []*message.BaseMessage) []*message.BaseMessage {
    var result []*message.BaseMessage
    for _, msg := range msgs {
        if _, ok := msg.Payload().(message.Locatable); ok {
            result = append(result, msg)
        }
    }
    return result
}

Migration from Legacy Systems

If migrating from string-based message types:

// Old: Type in payload
type OldMessage struct {
    Type    string
    Payload json.RawMessage
}

// New: Use BaseMessage
func migrateOldMessage(old *OldMessage) (*message.BaseMessage, error) {
    // Map old types to new type identifiers
    typeMap := map[string]string{
        "temperature": "iot.sensor.v1",
        "position": "robotics.position.v1",
    }

    newType, ok := typeMap[old.Type]
    if !ok {
        // Fall back to GenericJSON for unmapped types
        var data map[string]any
        if err := json.Unmarshal(old.Payload, &data); err != nil {
            return nil, fmt.Errorf("migrate %q: %w", old.Type, err)
        }
        payload := message.NewGenericJSON(data)
        return message.NewBaseMessage(payload.Schema(), payload, "legacy-migration"), nil
    }

    // Reconstruct as BaseMessage (newType must be registered)
    envelope := fmt.Sprintf(`{"type":"%s","payload":%s}`, newType, old.Payload)
    var msg message.BaseMessage
    if err := json.Unmarshal([]byte(envelope), &msg); err != nil {
        return nil, err
    }
    return &msg, nil
}

Troubleshooting

"unknown payload type" Error

Problem: UnmarshalJSON fails with "unknown payload type: X.Y.Z"

Solution: Ensure the payload type is registered with the global registry:

func init() {
    err := component.RegisterPayload(&component.PayloadRegistration{
        Domain:   "X",
        Category: "Y",
        Version:  "Z",
        Factory:  func() any { return &MyPayload{} },
    })
    if err != nil {
        panic(err)
    }
}
Type Assertion Fails

Problem: Type assertion returns ok=false

Debugging:

// Check actual type
payload := msg.Payload()
fmt.Printf("Payload type: %T\n", payload)
fmt.Printf("Payload type ID: %s\n", payload.Schema().String())

// Check behavioral interface
if _, ok := payload.(message.Locatable); ok {
    fmt.Println("Payload IS Locatable")
} else {
    fmt.Println("Payload is NOT Locatable")
    // Assert each other behavioral interface (Timeable, Observable, ...) the same way
}
JSON Deserialization Fails

Problem: UnmarshalJSON succeeds but payload fields are zero-valued

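Cause: JSON keys don't match the struct's field names or json tags, or the fields are unexported

Solution: Export the fields and add json tags that match the JSON keys exactly:

// Wrong
type MyPayload struct {
    SensorID string // No json tag: "sensor_id" does not match "SensorID", even case-insensitively
    value    string // Unexported: encoding/json ignores it
}

// Correct
type MyPayload struct {
    SensorID string `json:"sensor_id"`
    Value    string `json:"value"`
}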

Examples

See the package's *_test.go files and source for comprehensive examples:

  • base_message_test.go - Round-trip serialization, behavioral interfaces
  • generic_json_test.go - GenericJSON usage, nested structures
  • behaviors.go - Behavioral interface definitions with documentation


Contributing

When adding new behavioral interfaces:

  1. Keep interfaces small (1-3 methods)
  2. Document use cases clearly
  3. Provide example implementations
  4. Add tests demonstrating usage
  5. Update this README with the new interface

When adding new payload types:

  1. Implement Payload interface
  2. Implement relevant behavioral interfaces
  3. Register in init() function
  4. Add comprehensive tests
  5. Document type identifier format

Documentation

Overview

Package message provides the core message infrastructure for the SemStreams platform. It defines interfaces and types for creating, validating, and processing messages that flow through the semantic event mesh.

Architecture

The package follows a clean, domain-agnostic design with three core concepts:

  1. Messages - Containers that combine typed payloads with metadata
  2. Payloads - Domain-specific data that may implement behavioral interfaces
  3. Metadata - Information about message lifecycle and origin

Message Structure

Every message consists of:

  • A unique ID for tracking and deduplication
  • A structured Type (domain, category, version)
  • A Payload containing the actual data
  • Metadata about creation time, source, etc.
  • A content-based hash for integrity

Behavioral Interfaces

The message package uses runtime capability discovery through optional interfaces. Payloads implement only the interfaces relevant to their domain, and services discover these capabilities dynamically through type assertions.

## Primary Entity Interface

Graphable: Declares entities and relationships for knowledge graph storage

  • EntityID() string - Returns federated entity identifier
  • Triples() []Triple - Returns semantic facts about the entity
  • Use when: Payload represents entities that should be stored in the graph
  • Example: Drone telemetry, sensor readings, IoT device states

## Spatial Interfaces

Locatable: Provides geographic coordinates for spatial indexing

  • Location() (lat, lon float64) - Returns coordinates in decimal degrees
  • Use when: Payload has geographic location data
  • Example: GPS coordinates, facility locations, vehicle positions

## Temporal Interfaces

Timeable: Provides event/observation timestamp for time-series analysis

  • Timestamp() time.Time - Returns event time (not message creation time)
  • Use when: Payload represents time-series data or historical events
  • Example: Sensor readings, log entries, financial trades

Expirable: Defines time-to-live for automatic cleanup

  • ExpiresAt() time.Time - Returns expiration timestamp
  • TTL() time.Duration - Returns time-to-live from creation
  • Use when: Payload should be automatically cleaned up after expiration
  • Example: Temporary cache entries, session data, ephemeral events

## Observational Interfaces

Observable: Represents observations of other entities

  • ObservedEntity() string - Returns ID of observed entity
  • ObservedProperty() string - Returns observed property name
  • ObservedValue() any - Returns observed value
  • ObservedUnit() string - Returns measurement unit
  • Use when: Payload is a sensor reading or observation
  • Example: Temperature sensor reading, pressure measurement

Measurable: Contains multiple measurements with units

  • Measurements() map[string]any - Returns all measurements
  • Unit(measurement string) string - Returns unit for specific measurement
  • Use when: Payload contains multiple related measurements
  • Example: Weather station data, health monitor readings

## Correlation Interfaces

Correlatable: Enables distributed tracing and request/response matching

  • CorrelationID() string - Returns correlation identifier
  • Use when: Messages need to be correlated across requests/responses
  • Example: Request/response pairs, distributed transactions

Traceable: Supports distributed tracing with spans (OpenTelemetry compatible)

  • TraceID() string - Returns trace identifier
  • SpanID() string - Returns span identifier
  • ParentSpanID() string - Returns parent span identifier
  • Use when: Integrating with distributed tracing systems
  • Example: Microservice calls, async processing chains

## Processing Interfaces

Processable: Specifies processing priority and deadlines

  • Priority() int - Returns priority (0-10, higher = more important)
  • Deadline() time.Time - Returns processing deadline
  • Use when: Messages need priority-based or deadline-aware processing
  • Example: Real-time alerts, time-sensitive commands

Deployable: Indicates deployment or collection membership

  • DeploymentID() string - Returns deployment identifier
  • Use when: Messages need to be grouped by deployment context
  • Example: Multi-tenant systems, A/B testing, staging environments

## Runtime Discovery Pattern

Services discover capabilities at runtime through type assertions:

// Check for location data
if locatable, ok := msg.Payload().(Locatable); ok {
    lat, lon := locatable.Location()
    // Index by location, build spatial queries, etc.
}

// Check for entity data
if graphable, ok := msg.Payload().(Graphable); ok {
    entityID := graphable.EntityID()
    triples := graphable.Triples()
    // Store in knowledge graph, build relationships, etc.
}

// Check for time-series data
if timeable, ok := msg.Payload().(Timeable); ok {
    timestamp := timeable.Timestamp()
    // Index by time, build time-series queries, etc.
}

This pattern enables services to process any message type without prior knowledge of the specific payload structure, discovering capabilities dynamically.

Type System Hierarchy

The message package uses three related but distinct type representations, each serving a specific purpose in the semantic event mesh architecture. All three implement the Keyable interface, providing consistent dotted notation for NATS routing, storage keys, and entity identification.

## 1. Type (Message Schema) - "domain.category.version"

Purpose: Identifies message schemas for routing, validation, and evolution. Format: Type{Domain: "sensors", Category: "gps", Version: "v1"} -> "sensors.gps.v1"

Use When:

  • Defining message schemas in payload implementations (Schema() method)
  • Configuring component input/output types
  • Routing messages through NATS subjects
  • Versioning payload schemas for evolution

Example:

type TemperaturePayload struct { /* fields */ }
func (t *TemperaturePayload) Schema() Type {
    return Type{Domain: "sensors", Category: "temperature", Version: "v1"}
}
// Used for: NATS subject "sensors.temperature.v1"

## 2. EntityType (Graph Classification) - "domain.type"

Purpose: Classifies entities in the property graph for querying and analysis. Format: EntityType{Domain: "robotics", Type: "drone"} -> "robotics.drone"

Use When:

  • Extracting type from EntityID: entityID.EntityType()
  • Querying the graph for entities by type
  • Classifying nodes in the knowledge graph
  • Building semantic relationships between entity types

Example:

// EntityType is derived from EntityID
entityID := EntityID{
    Org: "c360", Platform: "platform1",
    Domain: "robotics", Type: "drone",
    System: "mav1", Instance: "1",
}
entityType := entityID.EntityType()  // Returns EntityType{Domain: "robotics", Type: "drone"}
// Used for: Graph queries like "find all robotics.drone entities"

## 3. EntityID (Federated Identity) - "org.platform.domain.system.type.instance"

Purpose: Provides globally unique entity identifiers across federated platforms. Format: EntityID with 6 parts -> "c360.platform1.robotics.mav1.drone.1"

Use When:

  • Multi-platform deployments need unique entity identity
  • Merging data from multiple sources with overlapping IDs
  • Federating entities across organizational boundaries
  • Building distributed knowledge graphs

Example:

entityID := EntityID{
    Org:      "c360",      // Organization namespace
    Platform: "platform1", // Platform instance
    Domain:   "robotics",  // Data domain
    System:   "mav1",      // Message source system
    Type:     "drone",     // Entity type
    Instance: "1",         // Local instance ID
}
// Key(): "c360.platform1.robotics.mav1.drone.1"
// Used for: Federated entity resolution and deduplication

## Type Relationships

These types form a hierarchy:

EntityID (6 parts) contains → EntityType (2 parts)
  entityID.EntityType() returns EntityType{Domain: "robotics", Type: "drone"}

Type (3 parts) is independent → Used for message schemas, not entity identity

All three implement Keyable:

type Keyable interface {
    Key() string  // Returns dotted notation for NATS subjects and storage
}
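To make the three key formats concrete, here is a local re-implementation of the dotted keys. It assumes `Key()` simply joins the parts with dots in the documented order. These types are stand-ins, and the real ones may add validation or escaping.

```go
package main

import (
	"fmt"
	"strings"
)

// Keyable mirrors the documented interface.
type Keyable interface {
	Key() string
}

// Type: message schema, "domain.category.version".
type Type struct{ Domain, Category, Version string }

// EntityType: graph classification, "domain.type".
type EntityType struct{ Domain, Type string }

// EntityID: federated identity, "org.platform.domain.system.type.instance".
type EntityID struct {
	Org, Platform, Domain, System, Type, Instance string
}

func (t Type) Key() string {
	return strings.Join([]string{t.Domain, t.Category, t.Version}, ".")
}

func (e EntityType) Key() string { return e.Domain + "." + e.Type }

func (id EntityID) Key() string {
	return strings.Join([]string{id.Org, id.Platform, id.Domain, id.System, id.Type, id.Instance}, ".")
}

// EntityType keeps only the two classification parts of the identity.
func (id EntityID) EntityType() EntityType {
	return EntityType{Domain: id.Domain, Type: id.Type}
}

func main() {
	id := EntityID{Org: "c360", Platform: "platform1", Domain: "robotics", System: "mav1", Type: "drone", Instance: "1"}
	for _, k := range []Keyable{Type{"sensors", "gps", "v1"}, id.EntityType(), id} {
		fmt.Println(k.Key())
	}
}
```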

## Choosing the Right Type

Ask yourself:

  1. Defining a message schema? → Use Type (Schema() method)
  2. Providing entity identity? → Use EntityID (Graphable.EntityID() method)
  3. Extracting entity classification? → Use EntityType (derived from EntityID)

Most payloads only need Type (for Schema()). Only implement Graphable if your payload represents entities that should be stored in the knowledge graph. EntityType is typically not constructed directly - it's extracted from EntityID using the EntityType() method when querying or classifying graph entities.

Usage Example

// Define a payload type
type TemperaturePayload struct {
    SensorID    string    `json:"sensor_id"`
    Temperature float64   `json:"temperature"`
    Unit        string    `json:"unit"`
    RecordedAt  time.Time `json:"timestamp"` // not "Timestamp": Go forbids a field and a method with the same name
}

// Implement required Payload interface
func (t *TemperaturePayload) Schema() Type {
    return Type{
        Domain:   "sensors",
        Category: "temperature",
        Version:  "v1",
    }
}

func (t *TemperaturePayload) Validate() error {
    if t.SensorID == "" {
        return errors.New("sensor ID required")
    }
    return nil
}

func (t *TemperaturePayload) MarshalJSON() ([]byte, error) {
    // Use alias to avoid infinite recursion
    type Alias TemperaturePayload
    return json.Marshal((*Alias)(t))
}

func (t *TemperaturePayload) UnmarshalJSON(data []byte) error {
    // Use alias to avoid infinite recursion
    type Alias TemperaturePayload
    return json.Unmarshal(data, (*Alias)(t))
}

// Implement optional behavioral interfaces
func (t *TemperaturePayload) Timestamp() time.Time {
    return t.RecordedAt
}

// Create and use a message
payload := &TemperaturePayload{
    SensorID:    "temp-001",
    Temperature: 22.5,
    Unit:        "celsius",
    RecordedAt:  time.Now(),
}

msg := NewBaseMessage(
    payload.Schema(),
    payload,
    "temperature-monitor",
)

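// Services can discover capabilities. Graphable is the preferred way to expose
// entity data; TemperaturePayload above does not implement it, so this branch
// runs only for payloads that do.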
if graphable, ok := msg.Payload().(Graphable); ok {
    entityID := graphable.EntityID()
    triples := graphable.Triples()
    fmt.Printf("Entity: %s with %d triples\n", entityID, len(triples))
    for _, triple := range triples {
        fmt.Printf("  %s: %v\n", triple.Predicate, triple.Object)
    }
}

Message Lifecycle

Messages follow a predictable lifecycle from creation through processing:

## 1. Creation

Messages are created using NewBaseMessage with optional configuration:

// Simple message with current timestamp
msg := NewBaseMessage(payload.Schema(), payload, "my-service")

// Historical data with specific timestamp
historical := NewBaseMessage(payload.Schema(), payload, "my-service",
    WithTime(historicalTime))

// Federated message for multi-platform deployment
federated := NewBaseMessage(payload.Schema(), payload, "my-service",
    WithFederation(platformConfig))

Messages are immutable after creation - all fields are set during construction and cannot be modified. This ensures message integrity throughout processing.

## 2. Validation

Validation happens at two levels:

Structural Validation (Required):

  • Message type must be valid (domain, category, version all present)
  • Payload must not be nil
  • Metadata must not be nil

Payload Validation (Domain-specific):

  • Implemented by Payload.Validate() method
  • Checks domain-specific business rules
  • May be structural (required fields) or semantic (value ranges)

Example validation:

if err := msg.Validate(); err != nil {
    // Handle validation error
    // Check error type: Fatal, Transient, or Invalid
}
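Here is a sketch of the two validation levels. It assumes the structural checks run first and the payload's own `Validate()` runs last. The types are simplified stand-ins. Plain `errors.New` values are used where the package would return classified (Fatal, Transient or Invalid) errors.

```go
package main

import (
	"errors"
	"fmt"
)

type Type struct{ Domain, Category, Version string }

type Payload interface{ Validate() error }

type Metadata struct{ Source string }

type Message struct {
	Type     Type
	Payload  Payload
	Metadata *Metadata
}

// Validate runs structural checks first, then delegates to the payload.
func (m *Message) Validate() error {
	if m.Type.Domain == "" || m.Type.Category == "" || m.Type.Version == "" {
		return errors.New("message type: domain, category and version are required")
	}
	if m.Payload == nil {
		return errors.New("payload must not be nil")
	}
	if m.Metadata == nil {
		return errors.New("metadata must not be nil")
	}
	if err := m.Payload.Validate(); err != nil {
		return fmt.Errorf("payload: %w", err)
	}
	return nil
}

// temperature is an example payload with one domain rule.
type temperature struct{ SensorID string }

func (t *temperature) Validate() error {
	if t.SensorID == "" {
		return errors.New("sensor ID required")
	}
	return nil
}

func main() {
	ty := Type{"sensors", "temperature", "v1"}
	good := &Message{Type: ty, Payload: &temperature{SensorID: "temp-001"}, Metadata: &Metadata{Source: "my-service"}}
	bad := &Message{Type: ty, Payload: &temperature{}, Metadata: &Metadata{Source: "my-service"}}
	fmt.Println(good.Validate(), bad.Validate())
}
```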

## 3. Serialization

Messages are serialized to JSON for transmission over NATS:

// Serialize message
data, err := json.Marshal(msg)

// Deserialize message
var decoded BaseMessage
err = json.Unmarshal(data, &decoded)

Wire format preserves:

  • Message ID for deduplication and tracking
  • Type information for routing and schema validation
  • Payload data using Payload.MarshalJSON()
  • Metadata timestamps (millisecond precision) and source

Note: Deserialization requires payload types to be registered in the global PayloadRegistry. For generic JSON processing, use the well-known type "core.json.v1" (GenericJSONPayload).

## 4. Transmission

Messages are published to NATS subjects derived from their type:

subject := msg.Type().Key()  // e.g., "sensors.temperature.v1"
nc.Publish(subject, data)

This enables:

  • Type-based routing using NATS wildcards
  • Version-specific processing
  • Domain isolation and security policies
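These routing options rely on NATS wildcard semantics: `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. The function below is a simplified illustration of those rules for reasoning about subscriptions (it is not the NATS server's matcher and ignores edge cases such as empty tokens); `subjectMatches` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches approximates NATS wildcard matching: "*" matches exactly
// one token, ">" matches one or more trailing tokens and must be last.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the final token and needs at least one token left.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	subject := "sensors.temperature.v1" // what msg.Type().Key() would yield
	for _, pattern := range []string{
		"sensors.temperature.v1", // exact type
		"sensors.temperature.*",  // all versions
		"sensors.>",              // whole domain
		"sensors.*",              // does not match: "*" covers one token only
	} {
		fmt.Println(pattern, subjectMatches(pattern, subject))
	}
}
```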

## 5. Processing

Services receive and process messages based on their subscriptions:

// Subscribe to specific type
nc.Subscribe("sensors.temperature.v1", handler)

// Subscribe to all versions
nc.Subscribe("sensors.temperature.*", handler)

// Subscribe to entire domain
nc.Subscribe("sensors.>", handler)

Services discover payload capabilities at runtime:

func handler(m *nats.Msg) {
    var msg BaseMessage
    if err := json.Unmarshal(m.Data, &msg); err != nil {
        return // unregistered payload type or malformed data
    }

    // Discover capabilities
    if graphable, ok := msg.Payload().(Graphable); ok {
        // Process entity data
    }
    if locatable, ok := msg.Payload().(Locatable); ok {
        // Process location data
    }
}

Best Practices

## Payload Implementation

1. Implement Required Methods

  • Schema() Type - Return structured type information
  • Validate() error - Perform domain-specific validation
  • MarshalJSON() ([]byte, error) - Use alias pattern to avoid recursion
  • UnmarshalJSON([]byte) error - Use alias pattern to avoid recursion

2. Implement Optional Interfaces Thoughtfully

  • Only implement behavioral interfaces that make semantic sense
  • Don't implement Locatable just because you have two numbers
  • Don't implement Timeable if you only have message creation time
  • Consider whether your payload truly represents an entity before implementing Graphable

3. Validation Philosophy

  • Structural validation: Check required fields and basic types
  • Semantic validation: Check business rules and value ranges (optional)
  • Fail fast: Return first validation error encountered
  • Provide clear error messages with context

4. Use Type Constants

  • Define each message type once as a package-level variable (Go does not allow struct constants)

  • Don't inline type construction at call sites

  • Example:

    // Good: type defined once as a package-level variable
    var TemperatureType = message.Type{
        Domain:   "sensors",
        Category: "temperature",
        Version:  "v1",
    }

    // Bad: inline type construction at the call site
    msg := NewBaseMessage(
        message.Type{Domain: "sensors", Category: "temperature", Version: "v1"},
        payload, source)

## Message Creation

1. Use Functional Options for Configuration

  • Start with simple NewBaseMessage(type, payload, source)
  • Add options only when needed: WithTime(), WithFederation()
  • Don't create custom constructors - use options instead

2. Set Source Meaningfully

  • Use service name or component identifier
  • Be consistent across your application
  • Example: "temperature-monitor", "gps-processor", "user-service"

3. Timestamp Considerations

  • Default timestamp (time.Now()) is correct for most cases
  • Use WithTime() only for historical data or testing
  • For event time vs creation time: use Timeable interface

## Message Processing

1. Type Assertion Pattern

  • Use comma-ok idiom for capability discovery

  • Don't assume interfaces are implemented

  • Example:

    if graphable, ok := msg.Payload().(Graphable); ok {
        // Safe to use graphable methods
    } else {
        // Payload doesn't implement Graphable; skip or handle differently
    }

2. Error Handling

  • Always check validation errors before processing
  • Use errors.As() to detect error types (Fatal, Transient, Invalid)
  • Don't rely on error message strings for logic

3. Immutability

  • Don't modify message fields after creation
  • Create new messages for transformations
  • Payload data can be read but not written
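Because `GenericJSONPayload.Data` is an exported map, "read but not written" is a convention callers must follow. A transformation should build a new payload (and, in real code, wrap it in a new message via `NewBaseMessage`). This sketch uses a hypothetical stand-in `jsonPayload` and helper `withField`.

```go
package main

import "fmt"

// jsonPayload is a stand-in for GenericJSONPayload.
type jsonPayload struct{ Data map[string]any }

// withField returns a new payload with one field set, leaving the input
// untouched. The copy is shallow: nested maps or slices are still shared
// and must not be mutated either.
func withField(p *jsonPayload, key string, value any) *jsonPayload {
	out := make(map[string]any, len(p.Data)+1)
	for k, v := range p.Data {
		out[k] = v
	}
	out[key] = value
	return &jsonPayload{Data: out}
}

func main() {
	in := &jsonPayload{Data: map[string]any{"temperature": 23.5, "unit": "celsius"}}
	out := withField(in, "temperature_f", 23.5*9/5+32)
	fmt.Println(len(in.Data), len(out.Data))
}
```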

## Testing

1. Use Real Test Data

  • Create actual payload instances, don't use mocks
  • Test with both valid and invalid data
  • Test serialization round-trips

2. Test Behavioral Interfaces

  • Verify interface implementations return correct values
  • Test type assertions work as expected
  • Ensure optional interfaces are truly optional

3. Test Validation

  • Test both valid and invalid payloads
  • Verify error messages are clear
  • Test edge cases and boundary conditions
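A serialization round-trip check can be as small as the sketch below, which reuses the alias pattern from the Payload example with a trimmed, hypothetical `gpsPayload`. In a real `_test.go` file the same logic would report through `*testing.T`; `roundTrip` is an illustrative helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// gpsPayload follows the alias pattern, trimmed to three fields.
type gpsPayload struct {
	DeviceID  string  `json:"device_id"`
	Latitude  float64 `json:"latitude"`
	Longitude float64 `json:"longitude"`
}

func (p *gpsPayload) MarshalJSON() ([]byte, error) {
	type alias gpsPayload // alias has no methods, so no recursion
	return json.Marshal((*alias)(p))
}

func (p *gpsPayload) UnmarshalJSON(data []byte) error {
	type alias gpsPayload
	return json.Unmarshal(data, (*alias)(p))
}

// roundTrip marshals and unmarshals a payload and reports whether the
// result equals the input; a mismatch usually means a missing or
// misspelled struct tag.
func roundTrip(in gpsPayload) (gpsPayload, bool, error) {
	data, err := json.Marshal(&in) // pointer, so MarshalJSON is used
	if err != nil {
		return gpsPayload{}, false, err
	}
	var out gpsPayload
	if err := json.Unmarshal(data, &out); err != nil {
		return gpsPayload{}, false, err
	}
	return out, out == in, nil
}

func main() {
	out, equal, err := roundTrip(gpsPayload{DeviceID: "gps-7", Latitude: 37.77, Longitude: -122.42})
	fmt.Println(out.DeviceID, equal, err)
}
```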

Design Philosophy

The message package embodies several key design principles:

1. Separation of Concerns: Messages know nothing about routing, storage, or entity extraction. They are pure data containers.

2. Discoverability: Behavioral interfaces allow services to discover and utilize message capabilities at runtime without tight coupling.

3. Domain Agnosticism: The core package contains no domain-specific code. All domain logic lives in domain packages that use these interfaces.

4. Extensibility: New behavioral interfaces can be added without breaking existing code. Payloads can implement any combination of interfaces.

5. Type Safety: Structured Type provides compile-time safety while maintaining flexibility.


Index

Constants

const (
	// ContentRoleBody is the primary text content (full document text).
	// Embedding workers prioritize this role for text extraction.
	ContentRoleBody = "body"

	// ContentRoleAbstract is a brief summary or description.
	// Used when body is not available or for additional context.
	ContentRoleAbstract = "abstract"

	// ContentRoleTitle is the document or entity title.
	// Typically included in embeddings for context.
	ContentRoleTitle = "title"

	// ContentRoleMedia is the primary media content (video, image, audio).
	// Used for binary content that may be processed by multimodal embedders.
	ContentRoleMedia = "media"

	// ContentRoleThumbnail is a preview image for media content.
	ContentRoleThumbnail = "thumbnail"
)

ContentRole constants define standard semantic roles for ContentFields. Using these constants ensures consistency across implementations.

Variables

var ParseEntityID = types.ParseEntityID

ParseEntityID creates an EntityID from its dotted string format. It expects exactly 6 parts (org.platform.domain.system.type.instance) and returns an error if the format is invalid.

Functions

func GetPlatform

func GetPlatform(msg Message) (config.PlatformConfig, bool)

GetPlatform is a helper function to extract platform information from any message. It returns the platform config and true if the message has federation metadata, or an empty config and false otherwise.

This allows code to gracefully handle both federated and non-federated messages:

if platform, ok := GetPlatform(msg); ok {
    // Handle federated message with platform info
    globalID := BuildGlobalID(entityID, platform)
} else {
    // Handle non-federated message
    localID := entityID
}

func GetUID

func GetUID(msg Message) (uuid.UUID, bool)

GetUID is a helper function to extract the UID from any message. It returns the UID and true if the message has federation metadata, or a zero UUID and false otherwise.

func IsValidEntityID

func IsValidEntityID(s string) bool

IsValidEntityID checks whether a string conforms to the canonical 6-part EntityID format: organization.platform.domain.system.type.instance (e.g., "c360.platform1.robotics.mav1.drone.0"). This enables consistent entity identification across the system.
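The 6-part rule can be approximated as below. `isValidEntityID` is a hypothetical sketch that only checks for exactly six non-empty, dot-separated parts; the real IsValidEntityID may apply stricter rules (for example, allowed characters), so do not treat this as its implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// isValidEntityID approximates the documented format check: exactly six
// non-empty, dot-separated parts.
func isValidEntityID(s string) bool {
	parts := strings.Split(s, ".")
	if len(parts) != 6 {
		return false
	}
	for _, p := range parts {
		if p == "" {
			return false
		}
	}
	return true
}

func main() {
	for _, id := range []string{
		"c360.platform1.robotics.mav1.drone.0", // six parts
		"c360.platform1.robotics.drone.0",      // only five parts
		"c360..robotics.mav1.drone.0",          // empty platform part
	} {
		fmt.Println(id, isValidEntityID(id))
	}
}
```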

Types

type BaseMessage

type BaseMessage struct {
	// contains filtered or unexported fields
}

BaseMessage provides the standard implementation of the Message interface. It combines a typed payload with metadata to create a complete message ready for transmission through the semantic event mesh.

BaseMessage is immutable after creation - all fields are set during construction and cannot be modified. This ensures message integrity throughout the processing pipeline.

Construction using Functional Options:

NewBaseMessage uses the functional options pattern for clean, composable configuration:

// Simple message (most common)
msg := NewBaseMessage(msgType, payload, "my-service")

// With specific timestamp (testing/historical data)
msg := NewBaseMessage(msgType, payload, "my-service", WithTime(pastTime))

// With federation support (multi-platform)
msg := NewBaseMessage(msgType, payload, "my-service", WithFederation(platform))

// Composable options
msg := NewBaseMessage(msgType, payload, "my-service",
    WithFederation(platform),
    WithTime(pastTime))

func NewBaseMessage

func NewBaseMessage(msgType Type, payload Payload, source string, opts ...Option) *BaseMessage

NewBaseMessage creates a new BaseMessage with optional configuration.

Parameters:

  • msgType: Structured type information (domain, category, version)
  • payload: The message payload implementing the Payload interface
  • source: Identifier of the service or component creating this message
  • opts: Optional configuration functions

Examples:

// Simple message with current timestamp
msg := NewBaseMessage(msgType, payload, "my-service")

// Message with specific timestamp (for historical data)
msg := NewBaseMessage(msgType, payload, "my-service", WithTime(pastTime))

// Federated message for multi-platform deployment
msg := NewBaseMessage(msgType, payload, "my-service", WithFederation(platform))

// Federated message with specific timestamp
msg := NewBaseMessage(msgType, payload, "my-service",
    WithFederationAndTime(platform, pastTime))

func (*BaseMessage) Hash

func (m *BaseMessage) Hash() string

Hash returns a SHA256 hash of the message content. The hash includes the message type and payload data.
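A content hash over type and payload can be sketched as follows. This is an illustrative guess: the exact bytes BaseMessage.Hash feeds into SHA-256 are not documented here, and `contentHash` is a hypothetical helper. Following the documentation, it uses only the type and payload data, so two messages with identical content but different IDs or timestamps hash the same. This is also why the Payload interface asks for deterministic JSON output.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// contentHash hashes the type key and the serialized payload only.
func contentHash(typeKey string, payloadJSON []byte) string {
	h := sha256.New()
	h.Write([]byte(typeKey))
	h.Write([]byte{0}) // separator so "a"+"bc" and "ab"+"c" differ
	h.Write(payloadJSON)
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := contentHash("sensors.temperature.v1", []byte(`{"celsius":21}`))
	b := contentHash("sensors.temperature.v1", []byte(`{"celsius":21}`))
	c := contentHash("sensors.temperature.v1", []byte(`{"celsius":22}`))
	fmt.Println(a == b, a == c, len(a))
}
```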

func (*BaseMessage) ID

func (m *BaseMessage) ID() string

ID returns the unique message identifier.

func (*BaseMessage) MarshalJSON

func (m *BaseMessage) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler for BaseMessage. This allows BaseMessage to be serialized to JSON even though its fields are private.

Validation is performed before serialization to ensure invalid messages cannot be serialized and published to the message bus.

func (*BaseMessage) Meta

func (m *BaseMessage) Meta() Meta

Meta returns the message metadata.

func (*BaseMessage) Payload

func (m *BaseMessage) Payload() Payload

Payload returns the message payload.

func (*BaseMessage) Type

func (m *BaseMessage) Type() Type

Type returns the structured message type.

func (*BaseMessage) UnmarshalJSON

func (m *BaseMessage) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for BaseMessage. Requires payload types to be registered in the global PayloadRegistry. For generic JSON processing, use the well-known type "core.json.v1" (GenericJSONPayload).

func (*BaseMessage) Validate

func (m *BaseMessage) Validate() error

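Validate performs comprehensive message validation: it checks that the message type is valid and that the payload and metadata are present, then runs the payload's own Validate method.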

type BinaryContent

type BinaryContent struct {
	ContentType string // MIME type: "image/jpeg", "video/mp4", "application/pdf"
	Data        []byte // The actual binary data
}

BinaryContent describes a binary field for storage. Used by BinaryStorable implementations to provide binary data along with its content type for proper handling.

type BinaryStorable

type BinaryStorable interface {
	ContentStorable

	// BinaryFields returns binary content to store in ObjectStore.
	//
	// Each field is stored as a separate binary blob with its own key.
	// The returned map keys are field names that can be referenced in
	// ContentFields for semantic role mapping.
	//
	// Example return value:
	//   {"video": {ContentType: "video/mp4", Data: videoBytes},
	//    "thumbnail": {ContentType: "image/jpeg", Data: thumbBytes}}
	BinaryFields() map[string]BinaryContent
}

BinaryStorable extends ContentStorable for payloads with binary content.

This interface enables storage of images, videos, PDFs, and other binary files alongside text content. Binary data is stored directly in ObjectStore (no base64 encoding) with JSON metadata referencing the binary keys.

Example implementation:

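type VideoDocument struct {
    Title       string
    Description string
    VideoData   []byte
    Thumbnail   []byte
}

// EntityID, Triples and StorageRef (from Storable) are omitted for brevity.

func (v *VideoDocument) ContentFields() map[string]string {
    return map[string]string{
        ContentRoleTitle:     "title",
        ContentRoleAbstract:  "description",
        ContentRoleMedia:     "video",     // key in BinaryFields()
        ContentRoleThumbnail: "thumbnail", // key in BinaryFields()
    }
}

func (v *VideoDocument) RawContent() map[string]string {
    return map[string]string{
        "title":       v.Title,
        "description": v.Description,
    }
}

func (v *VideoDocument) BinaryFields() map[string]BinaryContent {
    return map[string]BinaryContent{
        "video":     {ContentType: "video/mp4", Data: v.VideoData},
        "thumbnail": {ContentType: "image/jpeg", Data: v.Thumbnail},
    }
}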

type ContentStorable

type ContentStorable interface {
	Storable // EntityID() + Triples() + StorageRef()

	// ContentFields returns semantic role → field name mapping.
	//
	// This tells consumers how to find content in the stored data without
	// hardcoding field names. Keys are semantic roles understood by consumers
	// (like embedding workers), values are field names in RawContent().
	//
	// Standard semantic roles:
	//   - "body":     Primary text content (full document text)
	//   - "abstract": Brief summary or description
	//   - "title":    Document title
	//
	// Example return value:
	//   {"body": "content", "abstract": "description", "title": "title"}
	//
	// The map should only include roles that have non-empty content.
	ContentFields() map[string]string

	// RawContent returns the content to store in ObjectStore.
	//
	// Field names in this map should match values in ContentFields().
	// This is what gets serialized and stored; consumers retrieve it
	// via StorageRef and use ContentFields to find specific content.
	//
	// Example return value:
	//   {"title": "Safety Manual", "content": "Full document text...", "description": "Brief summary"}
	RawContent() map[string]string
}

ContentStorable extends Storable for payloads with large content fields.

This interface enables the "process → store → graph" pattern where:

  1. Processor creates semantic understanding (triples with metadata only)
  2. ObjectStore stores raw content and returns StorageReference
  3. GraphProcessor receives ContentStorable with both semantics and content reference
  4. EmbeddingWorker uses ContentFields to find text for embedding

The key insight is that ContentStorable separates metadata (in triples) from content (in ObjectStore), while providing a semantic map of content structure via ContentFields. This avoids bloating triples with large text and enables efficient embedding extraction without hardcoded field name coupling.

Example implementation:

type Document struct {
    Title       string
    Description string
    Body        string
    entityID    string
    triples     []Triple // metadata-only triples; Body is NOT included
    storageRef  *StorageReference
}

func (d *Document) EntityID() string              { return d.entityID }
func (d *Document) Triples() []Triple             { return d.triples } // no body
func (d *Document) StorageRef() *StorageReference { return d.storageRef }

func (d *Document) ContentFields() map[string]string {
    return map[string]string{
        "body":     "body",        // semantic role → field name
        "abstract": "description",
        "title":    "title",
    }
}

func (d *Document) RawContent() map[string]string {
    return map[string]string{
        "title":       d.Title,
        "description": d.Description,
        "body":        d.Body,
    }
}
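On the consumer side, an embedding worker can locate text through ContentFields without hardcoding field names. In this sketch the body-then-abstract priority and the title prefix are assumptions inferred from the ContentRole constant comments, not a documented algorithm; `embeddingText` and `doc` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// contentStorable is reduced to the two methods an embedding consumer needs.
type contentStorable interface {
	ContentFields() map[string]string
	RawContent() map[string]string
}

// embeddingText resolves semantic roles to field names via ContentFields,
// prefers "body", falls back to "abstract", and prefixes the title.
func embeddingText(c contentStorable) string {
	fields, raw := c.ContentFields(), c.RawContent()
	lookup := func(role string) string {
		if name, ok := fields[role]; ok {
			return raw[name]
		}
		return ""
	}
	text := lookup("body")
	if text == "" {
		text = lookup("abstract")
	}
	var parts []string
	if title := lookup("title"); title != "" {
		parts = append(parts, title)
	}
	if text != "" {
		parts = append(parts, text)
	}
	return strings.Join(parts, "\n\n")
}

// doc mirrors the Document example's field naming.
type doc struct{ title, description, body string }

func (d doc) ContentFields() map[string]string {
	return map[string]string{"body": "body", "abstract": "description", "title": "title"}
}

func (d doc) RawContent() map[string]string {
	return map[string]string{"title": d.title, "description": d.description, "body": d.body}
}

func main() {
	fmt.Println(embeddingText(doc{title: "Safety Manual", description: "Brief summary"}))
}
```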

type Correlatable

type Correlatable interface {
	// CorrelationID returns a unique identifier for correlating related messages.
	// This enables request/response matching and distributed tracing.
	// Common patterns:
	//   - Request ID: Shared by request and response
	//   - Trace ID: Shared by all messages in a distributed operation
	//   - Causation ID: Links cause and effect messages
	CorrelationID() string
}

Correlatable enables distributed tracing and causality tracking. Payloads implementing this interface can be correlated across requests, responses, and distributed operations.

type DefaultFederationMeta

type DefaultFederationMeta struct {
	*DefaultMeta
	// contains filtered or unexported fields
}

DefaultFederationMeta provides the standard implementation of FederationMeta. It embeds DefaultMeta and adds federation-specific fields.

func NewFederationMeta

func NewFederationMeta(source string, platform config.PlatformConfig) *DefaultFederationMeta

NewFederationMeta creates a new DefaultFederationMeta with auto-generated UID.

Parameters:

  • source: The service or component creating this message
  • platform: The platform configuration identifying the origin instance

The UID is automatically generated as a new UUID, and timestamps are set to the current time.

func NewFederationMetaWithTime

func NewFederationMetaWithTime(
	source string,
	platform config.PlatformConfig,
	createdAt time.Time,
) *DefaultFederationMeta

NewFederationMetaWithTime creates a new DefaultFederationMeta with specific creation time. Useful for historical data import or testing.

func (*DefaultFederationMeta) Platform

func (m *DefaultFederationMeta) Platform() config.PlatformConfig

Platform returns the originating platform configuration.

func (*DefaultFederationMeta) UID

func (m *DefaultFederationMeta) UID() uuid.UUID

UID returns the global unique identifier for this message.

type DefaultMeta

type DefaultMeta struct {
	// contains filtered or unexported fields
}

DefaultMeta provides the standard implementation of the Meta interface. It tracks when an event occurred, when it was received by the system, and where it originated from.

func NewDefaultMeta

func NewDefaultMeta(createdAt time.Time, source string) *DefaultMeta

NewDefaultMeta creates a new DefaultMeta instance with the given creation time and source. The received time is automatically set to the current time.

func NewDefaultMetaWithReceivedAt

func NewDefaultMetaWithReceivedAt(createdAt, receivedAt time.Time, source string) *DefaultMeta

NewDefaultMetaWithReceivedAt creates a new DefaultMeta instance with explicit creation and received times. This is useful for testing or when importing historical data.

func (*DefaultMeta) CreatedAt

func (m *DefaultMeta) CreatedAt() time.Time

CreatedAt returns when the original event occurred.

func (*DefaultMeta) ReceivedAt

func (m *DefaultMeta) ReceivedAt() time.Time

ReceivedAt returns when the system received the message.

func (*DefaultMeta) Source

func (m *DefaultMeta) Source() string

Source returns the origin of the message.

type Deployable

type Deployable interface {
	// DeploymentID returns the identifier of the deployment or collection.
	// Example: "production-us-west", "staging-experiment-42"
	DeploymentID() string
}

Deployable indicates the payload belongs to a deployment or collection. This enables grouping messages by deployment context.

type EntityID

type EntityID = types.EntityID

EntityID represents a complete entity identifier with semantic structure. Follows the pattern: org.platform.domain.system.type.instance for federated entity management.

type EntityType

type EntityType = types.EntityType

EntityType represents a structured entity type identifier using dotted notation. Format: EntityType{Domain: "domain", Type: "type"} -> Key() returns "domain.type"

type Expirable

type Expirable interface {
	// ExpiresAt returns the time when this payload expires.
	// Returns zero time if payload never expires.
	ExpiresAt() time.Time

	// TTL returns the time-to-live duration from creation.
	// Returns 0 if payload never expires.
	TTL() time.Duration
}

Expirable defines time-to-live for automatic cleanup. Payloads implementing this interface can be automatically expired and cleaned up by storage systems.

type FederationMeta

type FederationMeta interface {
	Meta

	// UID returns the global unique identifier for this message.
	// This is a proper UUID that ensures uniqueness across all platforms.
	UID() uuid.UUID

	// Platform returns information about the originating platform.
	// This identifies which SemStreams instance created this message.
	Platform() config.PlatformConfig
}

FederationMeta extends Meta with federation support for multi-platform deployments. It adds a global unique identifier (UID) and platform information to enable entity correlation across distributed SemStreams instances.

The UID is a proper UUID that provides global uniqueness, while the Platform information identifies the specific instance/region where the message originated.

This enables several federation patterns:

  • Cross-platform entity correlation (same drone tracked by multiple stations)
  • Data aggregation from multiple regions
  • Entity resolution across federated deployments
  • Distributed graph queries

type GenericJSONPayload

type GenericJSONPayload struct {
	// Data contains the JSON payload as a map.
	// This supports arbitrary JSON structures while remaining type-safe
	// at the component level (components declare they work with core.json.v1).
	Data map[string]any `json:"data"`
}

GenericJSONPayload provides a simple, explicitly flexible payload type for testing, prototyping, and basic data processing flows.

This is an intentional, well-known type (core.json.v1) designed for:

  • Rapid prototyping of flows
  • Integration testing
  • Basic JSON data processing (filter, map, transform)
  • Simple ETL pipelines

Components that work with GenericJSON (JSONFilter, JSONMap) explicitly declare they require "core.json.v1" type, providing type safety while maintaining flexibility for arbitrary JSON structures.

Example usage:

payload := &GenericJSONPayload{
    Data: map[string]any{
        "sensor_id": "temp-001",
        "temperature": 23.5,
        "unit": "celsius",
    },
}

func NewGenericJSON

func NewGenericJSON(data map[string]any) *GenericJSONPayload

NewGenericJSON creates a new GenericJSON payload with the given data.

func (*GenericJSONPayload) MarshalJSON

func (g *GenericJSONPayload) MarshalJSON() ([]byte, error)

MarshalJSON serializes the GenericJSON payload to JSON format. The output format matches the input structure with a "data" wrapper.

func (*GenericJSONPayload) Schema

func (g *GenericJSONPayload) Schema() Type

Schema returns the payload type identifier for GenericJSON. Always returns core.json.v1 as this is the well-known type for generic JSON processing in StreamKit.

func (*GenericJSONPayload) UnmarshalJSON

func (g *GenericJSONPayload) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes JSON data into the GenericJSON payload.

func (*GenericJSONPayload) Validate

func (g *GenericJSONPayload) Validate() error

Validate performs basic validation on the GenericJSON payload. Ensures the data map is not nil.

type Keyable

type Keyable = types.Keyable

Keyable interface represents types that can be converted to semantic keys using dotted notation.

type Locatable

type Locatable interface {
	// Location returns geographic coordinates as (latitude, longitude).
	// Values should be in decimal degrees:
	//   - Latitude: -90.0 to +90.0 (negative = South, positive = North)
	//   - Longitude: -180.0 to +180.0 (negative = West, positive = East)
	Location() (lat, lon float64)
}

Locatable provides geographic coordinates for spatial indexing. Payloads implementing this interface can be indexed by location and queried spatially.

type Measurable

type Measurable interface {
	// Measurements returns a map of measurement names to values.
	// Each measurement may have different units.
	Measurements() map[string]any

	// Unit returns the unit for a specific measurement.
	// Returns empty string if measurement doesn't exist or is unitless.
	Unit(measurement string) string
}

Measurable contains multiple measurements with units. It complements Observable for payloads that carry several related measurements rather than a single observed value; note that the interface does not embed Observable.

type Message

type Message interface {
	// ID returns a unique identifier for this message instance.
	// Typically a UUID, this ID is immutable and globally unique.
	ID() string

	// Type returns structured type information used for routing and processing.
	// The type contains domain, category, and version information.
	Type() Type

	// Payload returns the message payload that may implement behavioral interfaces.
	// Payloads expose capabilities like Identifiable, Locatable, Observable, etc.
	Payload() Payload

	// Meta returns metadata about the message lifecycle and origin.
	// Includes creation time, receipt time, and source service information.
	Meta() Meta

	// Hash returns a content-based hash for deduplication and storage.
	// The hash is computed from the message type and payload data.
	Hash() string

	// Validate performs comprehensive validation of the message.
	// Checks message type validity, payload presence, and payload-specific validation.
	Validate() error
}

Message represents the core message interface for the SemStreams platform. Messages are the fundamental unit of data flow, carrying typed payloads with metadata through the semantic event mesh.

Design principles:

  • Infrastructure-agnostic: Messages contain only data, no routing or storage logic
  • Behavioral payloads: Payloads expose capabilities through -able interfaces
  • Flexible metadata: Meta interface allows different metadata implementations
  • Content-addressable: Hash method enables deduplication and referencing

Example:

msg := NewBaseMessage(
    Type{Domain: "sensors", Category: "gps", Version: "v1"},
    gpsPayload,
    "gps-reader-service", // trailing comma required before a closing paren on its own line
)

type Meta

type Meta interface {
	// CreatedAt returns when the original event or observation occurred.
	// For sensor data, this is the measurement time.
	// For business events, this is when the event happened.
	CreatedAt() time.Time

	// ReceivedAt returns when the message entered the processing system.
	// This helps track ingestion latency and message age.
	// May be the same as CreatedAt for real-time streams.
	ReceivedAt() time.Time

	// Source returns the identifier of the message originator.
	// Examples: "gps-reader-service", "sensor-12345", "order-processor"
	// Used for debugging, tracing, and access control.
	Source() string
}

Meta provides metadata about a message's lifecycle and origin. This interface enables tracking of when messages were created, when they entered the system, and where they originated.

Using an interface rather than a concrete type allows for:

  • Custom metadata implementations for specific domains
  • Extended metadata with additional fields when needed
  • Easier testing with mock implementations

type Observable

type Observable interface {
	// ObservedEntity returns the ID of the entity being observed.
	// Example: "sensor-123", "vehicle-456", "building-789"
	ObservedEntity() string

	// ObservedProperty returns the property being observed.
	// Example: "temperature", "pressure", "velocity", "position"
	ObservedProperty() string

	// ObservedValue returns the observed value as a generic interface.
	// The actual type depends on the property being observed.
	ObservedValue() any

	// ObservedUnit returns the unit of measurement.
	// Example: "celsius", "pascal", "m/s", "meters"
	// Returns empty string if unitless.
	ObservedUnit() string
}

Observable represents observations of other entities. This interface is used for sensor data, measurements, and observations that reference an external entity or property being observed.

type Option

type Option func(*BaseMessage)

Option is a functional option for configuring BaseMessage construction.

func WithFederation

func WithFederation(platform config.PlatformConfig) Option

WithFederation enables federation support by using FederationMeta. This adds global UIDs for cross-platform message correlation.

func WithFederationAndTime

func WithFederationAndTime(platform config.PlatformConfig, createdAt time.Time) Option

WithFederationAndTime combines federation support with a specific timestamp.

func WithMeta

func WithMeta(meta Meta) Option

WithMeta replaces the default metadata with a custom Meta implementation.

func WithTime

func WithTime(createdAt time.Time) Option

WithTime sets a specific creation timestamp instead of using time.Now(). Useful for historical data import or testing.

type Payload

type Payload interface {
	// Schema returns the Type that defines this payload's structure.
	// This enables type-safe routing and processing throughout the system.
	Schema() Type

	// Validate checks the payload data for correctness.
	// Returns nil if valid, or an error describing the validation failure.
	// Should validate:
	//   - Required fields are present
	//   - Values are within acceptable ranges
	//   - Business rules are satisfied
	Validate() error

	// JSON serialization using standard Go interfaces.
	// Payloads must implement json.Marshaler and json.Unmarshaler
	// for deterministic serialization. The same payload must always
	// produce the same JSON output.
	json.Marshaler
	json.Unmarshaler
}

Payload represents the data carried by a message. All message payloads must implement this interface to provide schema information, validation, and serialization capabilities.

Payloads may also implement behavioral interfaces (Graphable, Locatable, Observable, etc.) to expose additional capabilities that can be discovered and utilized at runtime.

Example implementation:

type GPSPayload struct {
    DeviceID  string    `json:"device_id"`
    Latitude  float64   `json:"latitude"`
    Longitude float64   `json:"longitude"`
    Timestamp time.Time `json:"timestamp"`
}

func (p *GPSPayload) Schema() Type {
    return Type{Domain: "sensors", Category: "gps", Version: "v1"}
}

func (p *GPSPayload) Validate() error {
    if p.DeviceID == "" {
        return errors.New("device ID is required")
    }
    if p.Latitude < -90 || p.Latitude > 90 {
        return errors.New("invalid latitude")
    }
    if p.Longitude < -180 || p.Longitude > 180 {
        return errors.New("invalid longitude")
    }
    return nil
}

func (p *GPSPayload) MarshalJSON() ([]byte, error) {
    // Use alias to avoid infinite recursion
    type Alias GPSPayload
    return json.Marshal((*Alias)(p))
}

func (p *GPSPayload) UnmarshalJSON(data []byte) error {
    // Use alias to avoid infinite recursion
    type Alias GPSPayload
    return json.Unmarshal(data, (*Alias)(p))
}

type Processable

type Processable interface {
	// Priority returns the processing priority (higher = more important).
	// Typical range: 0 (lowest) to 10 (highest).
	Priority() int

	// Deadline returns the time by which processing must complete.
	// Returns zero time if no deadline.
	Deadline() time.Time
}

Processable specifies processing priority and deadlines. This enables deadline-aware processing and priority queues.

type Storable

type Storable interface {
	// EntityID returns deterministic 6-part ID: org.platform.domain.system.type.instance
	// (duplicated from graph.Graphable to avoid import cycle)
	EntityID() string

	// Triples returns all facts about this entity
	// (duplicated from graph.Graphable to avoid import cycle)
	Triples() []Triple

	// StorageRef returns reference to where full data is stored.
	// May return nil if data is not stored externally.
	StorageRef() *StorageReference
}

Storable extends graph.Graphable with storage reference capability. Components that implement Storable can provide both semantic information (via Graphable) and a reference to their full data.

This interface enables the lightweight message pattern where:

  1. Domain processors create messages with semantic data
  2. ObjectStore stores full message and adds StorageReference
  3. GraphProcessor receives Storable with both semantics and reference
  4. Consumers can access full data via StorageReference when needed

NOTE: This interface duplicates graph.Graphable methods inline to avoid an import cycle (graph imports message for Triple, so message cannot import graph). Any type implementing this interface also implements graph.Graphable automatically due to identical method signatures.

Example implementation:

type StoredEntity struct {
    entityID string
    triples  []Triple
    storage  *StorageReference
}

func (s *StoredEntity) EntityID() string { return s.entityID }
func (s *StoredEntity) Triples() []Triple { return s.triples }
func (s *StoredEntity) StorageRef() *StorageReference { return s.storage }

type StorageReference

type StorageReference struct {
	// StorageInstance identifies which storage component holds the data.
	// This enables federation across multiple storage instances.
	// Examples: "message-store", "cache-1", "objectstore-primary"
	StorageInstance string `json:"storage_instance"`

	// Key is the storage-specific key to retrieve the data.
	// Format depends on the storage backend but typically includes
	// time-based partitioning for efficient retrieval.
	// Examples: "2025/01/13/14/msg_abc123", "robotics/drone/1/latest"
	Key string `json:"key"`

	// ContentType specifies the MIME type of the stored content.
	// This helps consumers understand how to process the data.
	// Examples: "application/json", "application/protobuf", "application/avro"
	ContentType string `json:"content_type"`

	// Size is an optional hint about the stored data size in bytes.
	// This helps consumers decide whether to fetch the full data.
	// A value of 0 indicates the size is unknown.
	Size int64 `json:"size,omitempty"`
}

StorageReference points to where the full message data is stored. This enables lightweight message passing where components can reference stored data without transmitting the full payload.

The StorageReference pattern supports the "store once, reference everywhere" architecture, reducing data duplication and enabling efficient processing of large messages.

Example usage:

  • ObjectStore stores full message and returns StorageReference
  • GraphProcessor receives Storable with reference to full data
  • Components can fetch full data only when needed
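A minimal sketch of the "fetch only when needed" decision, assuming a consumer-side byte budget. The struct copies the fields and JSON tags documented above; shouldFetch and encode are hypothetical helpers. Because Size uses omitempty, an unknown size (0) is left out of the encoded JSON entirely.

```go
package main

import "encoding/json"

// StorageReference copies the field set and JSON tags documented above.
type StorageReference struct {
	StorageInstance string `json:"storage_instance"`
	Key             string `json:"key"`
	ContentType     string `json:"content_type"`
	Size            int64  `json:"size,omitempty"`
}

// shouldFetch is a hypothetical policy: fetch eagerly only when the size is
// known and within maxBytes. An unknown size (0) defers the fetch.
func shouldFetch(ref *StorageReference, maxBytes int64) bool {
	return ref != nil && ref.Size > 0 && ref.Size <= maxBytes
}

// encode marshals a reference; Size is omitted from the output when it is 0.
func encode(ref StorageReference) (string, error) {
	b, err := json.Marshal(ref)
	return string(b), err
}
```

Whether an unknown size should mean "fetch" or "skip" is a judgment call per consumer; this sketch errs on the side of not pulling large objects unexpectedly.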

type Timeable

type Timeable interface {
	// Timestamp returns the observation or event time.
	// This should be the actual time of observation/event, not message
	// creation time (which is in BaseMessage metadata).
	Timestamp() time.Time
}

Timeable provides temporal information for time-series analysis. Payloads implementing this interface can be indexed by time and queried temporally.

type Traceable

type Traceable interface {
	// TraceID returns the trace identifier.
	TraceID() string

	// SpanID returns the span identifier for this message.
	SpanID() string

	// ParentSpanID returns the parent span identifier.
	// Returns empty string if this is a root span.
	ParentSpanID() string
}

Traceable supports distributed tracing with spans. This interface enables integration with distributed tracing systems like OpenTelemetry, Jaeger, or Zipkin.
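As an illustration of the OpenTelemetry integration mentioned above, this sketch propagates a Traceable payload as a W3C Trace Context traceparent header (version-traceid-parentid-flags). It assumes TraceID and SpanID return lowercase hex strings of 32 and 16 characters, which the interface itself does not guarantee; the span type and both helpers are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Traceable mirrors the package interface.
type Traceable interface {
	TraceID() string
	SpanID() string
	ParentSpanID() string
}

// span is a hypothetical payload for the example.
type span struct{ trace, id, parent string }

func (s span) TraceID() string      { return s.trace }
func (s span) SpanID() string       { return s.id }
func (s span) ParentSpanID() string { return s.parent }

// isRoot reports whether the message starts a trace (empty parent span).
func isRoot(t Traceable) bool { return t.ParentSpanID() == "" }

// isLowerHex reports whether s is n lowercase hex characters and not all
// zeros (W3C Trace Context forbids all-zero IDs).
func isLowerHex(s string, n int) bool {
	if len(s) != n || strings.Trim(s, "0") == "" {
		return false
	}
	for _, c := range s {
		if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
			return false
		}
	}
	return true
}

// traceparent builds the header for a downstream hop: this message's own
// span becomes the parent-id field. Assumes hex-encoded IDs.
func traceparent(t Traceable, sampled bool) (string, error) {
	if !isLowerHex(t.TraceID(), 32) || !isLowerHex(t.SpanID(), 16) {
		return "", errors.New("trace or span ID is not W3C-formatted hex")
	}
	flags := "00"
	if sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", t.TraceID(), t.SpanID(), flags), nil
}
```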

type Triple

type Triple struct {
	// Subject identifies the entity this triple describes.
	// Must use EntityID.Key() format for consistency with federated entity management.
	// Examples: "telemetry.robotics.drone.1", "gcs-alpha.robotics.drone.001"
	Subject string `json:"subject"`

	// Predicate identifies the semantic property using three-level dotted notation.
	// Format: domain.category.property (e.g., "robotics.battery.level")
	// This maintains consistency with the unified dotted notation from Alpha Week 1.
	// Predicates should be defined in the vocabulary package for consistency.
	Predicate string `json:"predicate"`

	// Object contains the property value or entity reference.
	// For literals: primitive types (float64, bool, string, int)
	// For entity references: entity ID strings using EntityID.Key() format
	// Complex objects should be flattened into multiple triples.
	Object any `json:"object"`

	// Source identifies where this assertion came from.
	// Examples: "mavlink_heartbeat", "gps_fix", "operator_input", "ai_inference"
	// Enables traceability and conflict resolution during entity merging.
	Source string `json:"source"`

	// Timestamp indicates when this assertion was made.
	// Should typically be the message timestamp or processing time.
	// Enables temporal queries and helps with entity state evolution.
	Timestamp time.Time `json:"timestamp"`

	// Confidence indicates the reliability of this assertion (0.0 to 1.0).
	// Higher values indicate greater certainty about the triple.
	//
	// Typical confidence levels:
	//   - 1.0: Direct telemetry or explicit data
	//   - 0.9: High-confidence sensor readings
	//   - 0.7: Calculated or derived values
	//   - 0.5: Inferred relationships
	//   - 0.0: Uncertain or placeholder data
	Confidence float64 `json:"confidence"`

	// Context provides correlation ID for message batches or request tracking.
	// This enables grouping related triples from the same processing batch
	// or correlating triples across distributed systems.
	// Examples: message ID, batch ID, correlation ID, request ID
	Context string `json:"context,omitempty"`

	// Datatype provides optional RDF datatype hint for the Object value.
	// This helps with type interpretation and validation in downstream systems.
	// Examples: "xsd:float", "xsd:dateTime", "geo:point", "xsd:boolean"
	// If omitted, the type is inferred from the Go type of Object.
	Datatype string `json:"datatype,omitempty"`

	// ExpiresAt indicates when this triple should be considered expired.
	// When nil, the triple never expires and remains valid indefinitely.
	// When set, the triple is considered expired after this timestamp.
	// This enables TTL-based triple expiration for temporal data management.
	ExpiresAt *time.Time `json:"expires_at,omitempty"`
}

Triple represents a semantic statement about an entity following the Subject-Predicate-Object pattern. This enables RDF-like knowledge graphs while maintaining simplicity for Go developers.

Triple design follows these principles:

  • Subject: Always an entity ID using EntityID.Key() format (e.g., "telemetry.robotics.drone.1")
  • Predicate: Semantic property using three-level dotted notation (e.g., "robotics.battery.level")
  • Object: Typed value (literals) or entity references (other entity IDs)
  • Metadata: Source, timestamp, and confidence for provenance tracking

Example triples from a drone heartbeat message:

  • ("telemetry.robotics.drone.1", "robotics.battery.level", 85.5)
  • ("telemetry.robotics.drone.1", "robotics.flight.armed", true)
  • ("telemetry.robotics.drone.1", "robotics.component.has", "telemetry.robotics.battery.1")

This structure enables:

  • NATS wildcard queries: "robotics.battery.*" finds all battery predicates
  • Entity relationship modeling: Objects can reference other entities
  • Temporal tracking: Each triple has timestamp and confidence
  • Provenance: Source field tracks where each assertion came from
  • Federation: Works with federated entity IDs from multiple sources

func (Triple) IsExpired

func (t Triple) IsExpired() bool

IsExpired reports whether the triple's expiration time has passed. It returns false when ExpiresAt is nil (the triple never expires). Otherwise it returns true only when time.Now() is strictly after ExpiresAt, so a triple whose expiration time equals the current instant is not yet expired.

func (Triple) IsRelationship

func (t Triple) IsRelationship() bool

IsRelationship checks if this triple represents a relationship between entities rather than a property with a literal value. Returns true if Object is a valid EntityID (4-part dotted notation).
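The check can be approximated as below. This is a sketch under an assumption: looksLikeEntityID stands in for the package's own EntityID validation and only tests for exactly four non-empty dot-separated parts, as the sentence above describes; the real validation may be stricter.

```go
package main

import "strings"

// looksLikeEntityID is a hypothetical stand-in for EntityID validation:
// exactly four non-empty, dot-separated parts.
func looksLikeEntityID(s string) bool {
	parts := strings.Split(s, ".")
	if len(parts) != 4 {
		return false
	}
	for _, p := range parts {
		if p == "" {
			return false
		}
	}
	return true
}

// isRelationship mirrors the documented rule: the Object must be a string
// that parses as an entity ID; numbers, bools, and other strings are literals.
func isRelationship(object any) bool {
	s, ok := object.(string)
	return ok && looksLikeEntityID(s)
}
```

One consequence of a purely syntactic rule is that a literal string value that happens to have four dotted parts is classified as a relationship, which is one reason to keep Subject and entity-reference Objects in EntityID.Key() format consistently.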

type TripleGenerator

type TripleGenerator interface {
	// Triples returns semantic triples extracted from this message.
	// Each triple represents a meaningful assertion about an entity,
	// using structured predicates and proper confidence scoring.
	Triples() []Triple
}

TripleGenerator enables messages to produce semantic triples for graph storage. This interface replaces the use of untyped Properties maps with structured semantic assertions that enable reasoning and complex queries.

Implementations should:

  • Generate triples for all meaningful properties from the message
  • Use vocabulary predicate constants for consistency
  • Include entity relationships as triples with entity reference objects
  • Set appropriate confidence levels based on data source quality
  • Use EntityID.Key() format for consistent subject identification

Example implementation for a drone battery payload:

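func (b *BatteryPayload) Triples() []Triple {
    // Derive the subject from the payload's system ID so every triple
    // about this drone shares the same EntityID.Key().
    entityID := EntityID{
        Source: "telemetry", Domain: "robotics", Type: "drone",
        Instance: fmt.Sprintf("%d", b.SystemID),
    }.Key()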

    return []Triple{{
        Subject: entityID,
        Predicate: vocabulary.ROBOTICS_BATTERY_LEVEL, // "robotics.battery.level"
        Object: float64(b.BatteryRemaining),
        Source: "mavlink_battery",
        Timestamp: time.Now(),
        Confidence: 1.0,
    }}
}

type Type

type Type = types.Type

Type provides structured type information for messages. It enables type-safe routing and processing by clearly identifying the domain, category, and version of each message.
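Because Type is an alias for types.Type, whose fields are not shown on this page, the sketch below uses a hypothetical msgType struct to illustrate the "domain.category.version" convention: parsing "core.json.v1" into its three parts and formatting it back. Field and function names here are assumptions, not the types package API.

```go
package main

import (
	"fmt"
	"strings"
)

// msgType is a hypothetical stand-in for types.Type.
type msgType struct {
	Domain, Category, Version string
}

// String formats the type as domain.category.version.
func (t msgType) String() string {
	return t.Domain + "." + t.Category + "." + t.Version
}

// parseType splits a "domain.category.version" string, rejecting strings
// with the wrong number of parts or empty parts.
func parseType(s string) (msgType, error) {
	parts := strings.Split(s, ".")
	if len(parts) != 3 {
		return msgType{}, fmt.Errorf("type %q: want domain.category.version", s)
	}
	for _, p := range parts {
		if p == "" {
			return msgType{}, fmt.Errorf("type %q: empty part", s)
		}
	}
	return msgType{Domain: parts[0], Category: parts[1], Version: parts[2]}, nil
}
```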
