Documentation ¶
Overview ¶
Package message provides the core message infrastructure for the SemStreams platform. It defines interfaces and types for creating, validating, and processing messages that flow through the semantic event mesh.
Architecture ¶
The package follows a clean, domain-agnostic design with three core concepts:
1. Messages - Containers that combine typed payloads with metadata
2. Payloads - Domain-specific data that may implement behavioral interfaces
3. Metadata - Information about message lifecycle and origin
Message Structure ¶
Every message consists of:
- A unique ID for tracking and deduplication
- A structured Type (domain, category, version)
- A Payload containing the actual data
- Metadata about creation time, source, etc.
- A content-based hash for integrity
Behavioral Interfaces ¶
The message package uses runtime capability discovery through optional interfaces. Payloads implement only the interfaces relevant to their domain, and services discover these capabilities dynamically through type assertions.
## Primary Entity Interface
Graphable: Declares entities and relationships for knowledge graph storage
- EntityID() string - Returns federated entity identifier
- Triples() []Triple - Returns semantic facts about the entity
- Use when: Payload represents entities that should be stored in the graph
- Example: Drone telemetry, sensor readings, IoT device states
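As a minimal sketch of a Graphable payload, the following defines local stand-ins for Graphable and Triple and a hypothetical DroneTelemetry type. The Subject/Predicate/Object shape of Triple is an assumption (the real type may carry more fields), and the predicate names are illustrative only.

```go
package main

import "fmt"

// Triple is a local stand-in for the package's Triple type.
// Only a Subject/Predicate/Object shape is assumed here.
type Triple struct {
	Subject   string
	Predicate string
	Object    any
}

// Graphable mirrors the two methods listed above.
type Graphable interface {
	EntityID() string
	Triples() []Triple
}

// DroneTelemetry is a hypothetical payload used for illustration.
type DroneTelemetry struct {
	ID       string
	Battery  float64
	Altitude float64
}

// EntityID returns the federated identifier of the drone.
func (d *DroneTelemetry) EntityID() string { return d.ID }

// Triples returns one fact per telemetry field.
func (d *DroneTelemetry) Triples() []Triple {
	return []Triple{
		{Subject: d.ID, Predicate: "battery.level", Object: d.Battery},
		{Subject: d.ID, Predicate: "altitude.meters", Object: d.Altitude},
	}
}

func main() {
	var g Graphable = &DroneTelemetry{
		ID:       "c360.platform1.robotics.mav1.drone.1",
		Battery:  0.82,
		Altitude: 120,
	}
	for _, tr := range g.Triples() {
		fmt.Printf("%s %s %v\n", tr.Subject, tr.Predicate, tr.Object)
	}
}
```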
## Spatial Interfaces
Locatable: Provides geographic coordinates for spatial indexing
- Location() (lat, lon float64) - Returns decimal degrees coordinates
- Use when: Payload has geographic location data
- Example: GPS coordinates, facility locations, vehicle positions
## Temporal Interfaces
Timeable: Provides event/observation timestamp for time-series analysis
- Timestamp() time.Time - Returns event time (not message creation time)
- Use when: Payload represents time-series data or historical events
- Example: Sensor readings, log entries, financial trades
Expirable: Defines time-to-live for automatic cleanup
- ExpiresAt() time.Time - Returns expiration timestamp
- TTL() time.Duration - Returns time-to-live from creation
- Use when: Payload should be automatically cleaned up after expiration
- Example: Temporary cache entries, session data, ephemeral events
## Observational Interfaces
Observable: Represents observations of other entities
- ObservedEntity() string - Returns ID of observed entity
- ObservedProperty() string - Returns observed property name
- ObservedValue() any - Returns observed value
- ObservedUnit() string - Returns measurement unit
- Use when: Payload is a sensor reading or observation
- Example: Temperature sensor reading, pressure measurement
Measurable: Contains multiple measurements with units
- Measurements() map[string]any - Returns all measurements
- Unit(measurement string) string - Returns unit for specific measurement
- Use when: Payload contains multiple related measurements
- Example: Weather station data, health monitor readings
## Correlation Interfaces
Correlatable: Enables distributed tracing and request/response matching
- CorrelationID() string - Returns correlation identifier
- Use when: Messages need to be correlated across requests/responses
- Example: Request/response pairs, distributed transactions
Traceable: Supports distributed tracing with spans (OpenTelemetry compatible)
- TraceID() string - Returns trace identifier
- SpanID() string - Returns span identifier
- ParentSpanID() string - Returns parent span identifier
- Use when: Integrating with distributed tracing systems
- Example: Microservice calls, async processing chains
## Processing Interfaces
Processable: Specifies processing priority and deadlines
- Priority() int - Returns priority (0-10, higher = more important)
- Deadline() time.Time - Returns processing deadline
- Use when: Messages need priority-based or deadline-aware processing
- Example: Real-time alerts, time-sensitive commands
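One way a consumer might use Processable is to order a batch by priority before handling it. The sketch below is an assumption about usage, not something the package prescribes: Alert, Note, and SortByPriority are hypothetical, and treating non-Processable payloads as priority 0 is a choice made here for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Processable mirrors the interface described above.
type Processable interface {
	Priority() int
	Deadline() time.Time
}

// Alert is a hypothetical payload that carries processing hints.
type Alert struct {
	Name string
	Prio int
	Due  time.Time
}

func (a Alert) Priority() int       { return a.Prio }
func (a Alert) Deadline() time.Time { return a.Due }

// Note is a hypothetical payload without processing hints.
type Note struct{ Text string }

// priorityOf returns the payload's priority, or 0 if it is not Processable.
func priorityOf(p any) int {
	if pr, ok := p.(Processable); ok {
		return pr.Priority()
	}
	return 0
}

// SortByPriority orders payloads from highest to lowest priority.
// The sort is stable, so equal priorities keep their arrival order.
func SortByPriority(payloads []any) {
	sort.SliceStable(payloads, func(i, j int) bool {
		return priorityOf(payloads[i]) > priorityOf(payloads[j])
	})
}

func main() {
	queue := []any{
		Note{Text: "fyi"},
		Alert{Name: "low", Prio: 2},
		Alert{Name: "critical", Prio: 9},
	}
	SortByPriority(queue)
	for _, p := range queue {
		fmt.Printf("%+v\n", p)
	}
}
```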
Deployable: Indicates deployment or collection membership
- DeploymentID() string - Returns deployment identifier
- Use when: Messages need to be grouped by deployment context
- Example: Multi-tenant systems, A/B testing, staging environments
## Runtime Discovery Pattern
Services discover capabilities at runtime through type assertions:
// Check for location data
if locatable, ok := msg.Payload().(Locatable); ok {
lat, lon := locatable.Location()
// Index by location, build spatial queries, etc.
}
// Check for entity data
if graphable, ok := msg.Payload().(Graphable); ok {
entityID := graphable.EntityID()
triples := graphable.Triples()
// Store in knowledge graph, build relationships, etc.
}
// Check for time-series data
if timeable, ok := msg.Payload().(Timeable); ok {
timestamp := timeable.Timestamp()
// Index by time, build time-series queries, etc.
}
This pattern enables services to process any message type without prior knowledge of the specific payload structure, discovering capabilities dynamically.
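The fragments above assume an existing msg value. For a version that compiles on its own, the following sketch re-declares Locatable and Timeable locally and probes two hypothetical payload types (GPSFix and Heartbeat); the Capabilities helper is likewise illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of two behavioral interfaces described above.
type Locatable interface {
	Location() (lat, lon float64)
}

type Timeable interface {
	Timestamp() time.Time
}

// GPSFix is a hypothetical payload with both location and event time.
type GPSFix struct {
	Lat, Lon float64
	At       time.Time
}

func (g GPSFix) Location() (lat, lon float64) { return g.Lat, g.Lon }
func (g GPSFix) Timestamp() time.Time         { return g.At }

// Heartbeat is a hypothetical payload with event time only.
type Heartbeat struct{ At time.Time }

func (h Heartbeat) Timestamp() time.Time { return h.At }

// Capabilities lists the behavioral interfaces a payload implements,
// using the comma-ok form of a type assertion for each one.
func Capabilities(payload any) []string {
	var caps []string
	if _, ok := payload.(Locatable); ok {
		caps = append(caps, "locatable")
	}
	if _, ok := payload.(Timeable); ok {
		caps = append(caps, "timeable")
	}
	return caps
}

func main() {
	now := time.Now()
	fmt.Println(Capabilities(GPSFix{Lat: 38.9, Lon: -77.0, At: now}))
	fmt.Println(Capabilities(Heartbeat{At: now}))
	fmt.Println(Capabilities("plain string"))
}
```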
Type System Hierarchy ¶
The message package uses three related but distinct type representations, each serving a specific purpose in the semantic event mesh architecture. All three implement the Keyable interface, providing consistent dotted notation for NATS routing, storage keys, and entity identification.
## 1. Type (Message Schema) - "domain.category.version"
Purpose: Identifies message schemas for routing, validation, and evolution. Format: Type{Domain: "sensors", Category: "gps", Version: "v1"} -> "sensors.gps.v1"
Use When:
- Defining message schemas in payload implementations (Schema() method)
- Configuring component input/output types
- Routing messages through NATS subjects
- Versioning payload schemas for evolution
Example:
type TemperaturePayload struct { /* fields */ }
func (t *TemperaturePayload) Schema() Type {
return Type{Domain: "sensors", Category: "temperature", Version: "v1"}
}
// Used for: NATS subject "sensors.temperature.v1"
## 2. EntityType (Graph Classification) - "domain.type"
Purpose: Classifies entities in the property graph for querying and analysis. Format: EntityType{Domain: "robotics", Type: "drone"} -> "robotics.drone"
Use When:
- Extracting type from EntityID: entityID.EntityType()
- Querying the graph for entities by type
- Classifying nodes in the knowledge graph
- Building semantic relationships between entity types
Example:
// EntityType is derived from EntityID
entityID := EntityID{
Org: "c360", Platform: "platform1",
Domain: "robotics", Type: "drone",
System: "mav1", Instance: "1",
}
entityType := entityID.EntityType() // Returns EntityType{Domain: "robotics", Type: "drone"}
// Used for: Graph queries like "find all robotics.drone entities"
## 3. EntityID (Federated Identity) - "org.platform.domain.system.type.instance"
Purpose: Provides globally unique entity identifiers across federated platforms. Format: EntityID with 6 parts -> "c360.platform1.robotics.mav1.drone.1"
Use When:
- Multi-platform deployments need unique entity identity
- Merging data from multiple sources with overlapping IDs
- Federating entities across organizational boundaries
- Building distributed knowledge graphs
Example:
entityID := EntityID{
Org: "c360", // Organization namespace
Platform: "platform1", // Platform instance
Domain: "robotics", // Data domain
System: "mav1", // Message source system
Type: "drone", // Entity type
Instance: "1", // Local instance ID
}
// Key(): "c360.platform1.robotics.mav1.drone.1"
// Used for: Federated entity resolution and deduplication
## Type Relationships
These types form a hierarchy:
EntityID (6 parts) contains → EntityType (2 parts)
entityID.EntityType() returns EntityType{Domain: "robotics", Type: "drone"}
Type (3 parts) is independent → Used for message schemas, not entity identity
All three implement Keyable:
type Keyable interface {
Key() string // Returns dotted notation for NATS subjects and storage
}
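To make the relationship concrete, here is a self-contained sketch that re-declares the three types locally and gives each a Key() method producing the dotted forms documented above. The real implementations live in this package (and its types dependency) and may validate or normalize parts; these bodies are assumptions for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Keyable mirrors the interface shown above.
type Keyable interface {
	Key() string
}

// Type identifies a message schema: domain.category.version.
type Type struct{ Domain, Category, Version string }

func (t Type) Key() string {
	return strings.Join([]string{t.Domain, t.Category, t.Version}, ".")
}

// EntityType classifies a graph entity: domain.type.
type EntityType struct{ Domain, Type string }

func (e EntityType) Key() string { return e.Domain + "." + e.Type }

// EntityID is the 6-part federated identifier.
type EntityID struct{ Org, Platform, Domain, System, Type, Instance string }

func (id EntityID) Key() string {
	return strings.Join([]string{
		id.Org, id.Platform, id.Domain, id.System, id.Type, id.Instance,
	}, ".")
}

// EntityType derives the 2-part classification from the 6-part ID.
func (id EntityID) EntityType() EntityType {
	return EntityType{Domain: id.Domain, Type: id.Type}
}

func main() {
	id := EntityID{
		Org: "c360", Platform: "platform1",
		Domain: "robotics", System: "mav1",
		Type: "drone", Instance: "1",
	}
	keys := []Keyable{
		Type{Domain: "sensors", Category: "gps", Version: "v1"},
		id.EntityType(),
		id,
	}
	for _, k := range keys {
		fmt.Println(k.Key())
	}
}
```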
## Choosing the Right Type
Ask yourself:
- Defining a message schema? → Use Type (Schema() method)
- Providing entity identity? → Use EntityID (Graphable.EntityID() method)
- Extracting entity classification? → Use EntityType (derived from EntityID)
Most payloads only need Type (for Schema()). Only implement Graphable if your payload represents entities that should be stored in the knowledge graph. EntityType is typically not constructed directly - it's extracted from EntityID using the EntityType() method when querying or classifying graph entities.
Usage Example ¶
// Define a payload type
type TemperaturePayload struct {
	SensorID    string  `json:"sensor_id"`
	Temperature float64 `json:"temperature"`
	Unit        string  `json:"unit"`
	// Named ObservedAt because Go does not allow a field and a method
	// (Timestamp, below) to share a name on the same type.
	ObservedAt time.Time `json:"timestamp"`
}
// Implement required Payload interface
func (t *TemperaturePayload) Schema() Type {
	return Type{
		Domain:   "sensors",
		Category: "temperature",
		Version:  "v1",
	}
}
func (t *TemperaturePayload) Validate() error {
	if t.SensorID == "" {
		return errors.New("sensor ID required")
	}
	return nil
}
func (t *TemperaturePayload) MarshalJSON() ([]byte, error) {
	// Use alias to avoid infinite recursion
	type Alias TemperaturePayload
	return json.Marshal((*Alias)(t))
}
func (t *TemperaturePayload) UnmarshalJSON(data []byte) error {
	// Use alias to avoid infinite recursion
	type Alias TemperaturePayload
	return json.Unmarshal(data, (*Alias)(t))
}
// Implement optional behavioral interfaces
func (t *TemperaturePayload) Timestamp() time.Time {
	return t.ObservedAt
}
// Create and use a message
payload := &TemperaturePayload{
	SensorID:    "temp-001",
	Temperature: 22.5,
	Unit:        "celsius",
	ObservedAt:  time.Now(),
}
msg := NewBaseMessage(
payload.Schema(),
payload,
"temperature-monitor",
)
// Services can discover capabilities
// Modern approach using Graphable (preferred)
if graphable, ok := msg.Payload().(Graphable); ok {
entityID := graphable.EntityID()
triples := graphable.Triples()
fmt.Printf("Entity: %s with %d triples\n", entityID, len(triples))
for _, triple := range triples {
fmt.Printf(" %s: %v\n", triple.Predicate, triple.Object)
}
}
Message Lifecycle ¶
Messages follow a predictable lifecycle from creation through processing:
## 1. Creation
Messages are created using NewBaseMessage with optional configuration:
// Simple message with current timestamp
msg := NewBaseMessage(payload.Schema(), payload, "my-service")
// Historical data with specific timestamp
msg := NewBaseMessage(payload.Schema(), payload, "my-service",
WithTime(historicalTime))
// Federated message for multi-platform deployment
msg := NewBaseMessage(payload.Schema(), payload, "my-service",
WithFederation(platformConfig))
Messages are immutable after creation - all fields are set during construction and cannot be modified. This ensures message integrity throughout processing.
## 2. Validation
Validation happens at two levels:
Structural Validation (Required):
- Message type must be valid (domain, category, version all present)
- Payload must not be nil
- Metadata must not be nil
Payload Validation (Domain-specific):
- Implemented by Payload.Validate() method
- Checks domain-specific business rules
- May be structural (required fields) or semantic (value ranges)
Example validation:
if err := msg.Validate(); err != nil {
// Handle validation error
// Check error type: Fatal, Transient, or Invalid
}
## 3. Serialization
Messages are serialized to JSON for transmission over NATS:
// Serialize message
data, err := json.Marshal(msg)
// Deserialize message
var decoded BaseMessage
err = json.Unmarshal(data, &decoded)
Wire format preserves:
- Message ID for deduplication and tracking
- Type information for routing and schema validation
- Payload data using Payload.MarshalJSON()
- Metadata timestamps (millisecond precision) and source
Note: Deserialization requires payload types to be registered in the global PayloadRegistry. For generic JSON processing, use the well-known type "core.json.v1" (GenericJSONPayload).
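The registry requirement exists because JSON alone does not say which Go type to allocate for the payload. The sketch below illustrates the general technique with a map from type key to factory function. It is not the package's PayloadRegistry API, which is not shown in this documentation: Registry, envelope, its "type" and "payload" field names, and Temperature are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Registry maps a type key such as "sensors.temperature.v1" to a
// factory that returns a pointer to an empty payload (hypothetical).
type Registry map[string]func() any

// envelope is an assumed wire shape: a type key plus raw payload bytes.
type envelope struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

// Temperature is a hypothetical payload type.
type Temperature struct {
	SensorID string  `json:"sensor_id"`
	Celsius  float64 `json:"celsius"`
}

// Decode looks up the factory for the envelope's type key and
// unmarshals the raw payload into the value the factory returns.
func (r Registry) Decode(data []byte) (any, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return nil, fmt.Errorf("decode envelope: %w", err)
	}
	factory, ok := r[env.Type]
	if !ok {
		return nil, fmt.Errorf("unregistered payload type %q", env.Type)
	}
	payload := factory()
	if err := json.Unmarshal(env.Payload, payload); err != nil {
		return nil, fmt.Errorf("decode payload: %w", err)
	}
	return payload, nil
}

func main() {
	reg := Registry{
		"sensors.temperature.v1": func() any { return &Temperature{} },
	}
	wire := []byte(`{"type":"sensors.temperature.v1","payload":{"sensor_id":"temp-001","celsius":22.5}}`)
	p, err := reg.Decode(wire)
	fmt.Printf("%+v %v\n", p, err)

	_, err = reg.Decode([]byte(`{"type":"unknown.thing.v1","payload":{}}`))
	fmt.Println(err)
}
```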
## 4. Transmission
Messages are published to NATS subjects derived from their type:
subject := msg.Type().Key() // e.g., "sensors.temperature.v1"
nc.Publish(subject, data)
This enables:
- Type-based routing using NATS wildcards
- Version-specific processing
- Domain isolation and security policies
## 5. Processing
Services receive and process messages based on their subscriptions:
// Subscribe to specific type
nc.Subscribe("sensors.temperature.v1", handler)
// Subscribe to all versions
nc.Subscribe("sensors.temperature.*", handler)
// Subscribe to entire domain
nc.Subscribe("sensors.>", handler)
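In NATS subjects, "*" matches exactly one token and ">" matches one or more trailing tokens. To show how the three subscriptions above select messages, here is a simplified matcher written for illustration; SubjectMatches is a hypothetical helper, not part of the NATS client, and it skips edge cases such as empty tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// SubjectMatches reports whether subject matches a NATS-style pattern.
// "*" matches exactly one token; ">" matches one or more remaining
// tokens and is only honored as the final pattern token.
func SubjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	subject := "sensors.temperature.v1"
	for _, pattern := range []string{
		"sensors.temperature.v1",
		"sensors.temperature.*",
		"sensors.>",
		"sensors.gps.*",
	} {
		fmt.Printf("%-24s %v\n", pattern, SubjectMatches(pattern, subject))
	}
}
```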
Services discover payload capabilities at runtime:
func handler(m *nats.Msg) {
	var msg BaseMessage
	if err := json.Unmarshal(m.Data, &msg); err != nil {
		// Malformed data or unregistered payload type: log and skip
		return
	}
	// Discover capabilities
	if graphable, ok := msg.Payload().(Graphable); ok {
		// Process entity data
	}
	if locatable, ok := msg.Payload().(Locatable); ok {
		// Process location data
	}
}
Best Practices ¶
## Payload Implementation
1. Implement Required Methods
- Schema() Type - Return structured type information
- Validate() error - Perform domain-specific validation
- MarshalJSON() ([]byte, error) - Use alias pattern to avoid recursion
- UnmarshalJSON([]byte) error - Use alias pattern to avoid recursion
2. Implement Optional Interfaces Thoughtfully
- Only implement behavioral interfaces that make semantic sense
- Don't implement Locatable just because you have two numbers
- Don't implement Timeable if you only have message creation time
- Consider whether your payload truly represents an entity before implementing Graphable
3. Validation Philosophy
- Structural validation: Check required fields and basic types
- Semantic validation: Check business rules and value ranges (optional)
- Fail fast: Return first validation error encountered
- Provide clear error messages with context
4. Use Type Constants
- Define each message type once as a package-level variable (Go struct values cannot be declared const)
- Don't inline type construction at call sites
Example:
// Good: type declared once at package level
var TemperatureType = message.Type{
	Domain:   "sensors",
	Category: "temperature",
	Version:  "v1",
}
// Bad: inline type construction
msg := NewBaseMessage(
	message.Type{Domain: "sensors", Category: "temperature", Version: "v1"},
	payload, source)
## Message Creation
1. Use Functional Options for Configuration
- Start with simple NewBaseMessage(type, payload, source)
- Add options only when needed: WithTime(), WithFederation()
- Don't create custom constructors - use options instead
2. Set Source Meaningfully
- Use service name or component identifier
- Be consistent across your application
- Example: "temperature-monitor", "gps-processor", "user-service"
3. Timestamp Considerations
- Default timestamp (time.Now()) is correct for most cases
- Use WithTime() only for historical data or testing
- For event time vs creation time: use Timeable interface
## Message Processing
1. Type Assertion Pattern
- Use comma-ok idiom for capability discovery
- Don't assume interfaces are implemented
Example:
if graphable, ok := msg.Payload().(Graphable); ok {
	// Safe to use graphable methods
} else {
	// Payload doesn't implement Graphable, skip or handle differently
}
2. Error Handling
- Always check validation errors before processing
- Use errors.As() to detect error types (Fatal, Transient, Invalid)
- Don't rely on error message strings for logic
3. Immutability
- Don't modify message fields after creation
- Create new messages for transformations
- Payload data can be read but not written
## Testing
1. Use Real Test Data
- Create actual payload instances, don't use mocks
- Test with both valid and invalid data
- Test serialization round-trips
2. Test Behavioral Interfaces
- Verify interface implementations return correct values
- Test type assertions work as expected
- Ensure optional interfaces are truly optional
3. Test Validation
- Test both valid and invalid payloads
- Verify error messages are clear
- Test edge cases and boundary conditions
Design Philosophy ¶
The message package embodies several key design principles:
1. Separation of Concerns: Messages know nothing about routing, storage, or entity extraction. They are pure data containers.
2. Discoverability: Behavioral interfaces allow services to discover and utilize message capabilities at runtime without tight coupling.
3. Domain Agnosticism: The core package contains no domain-specific code. All domain logic lives in domain packages that use these interfaces.
4. Extensibility: New behavioral interfaces can be added without breaking existing code. Payloads can implement any combination of interfaces.
5. Type Safety: The structured Type (domain, category, version) replaces free-form type strings, so the shape of a type is checked at compile time while its values remain flexible.
Package message provides the GenericJSON payload for StreamKit.
Index ¶
- Constants
- Variables
- func GetPlatform(msg Message) (config.PlatformConfig, bool)
- func GetUID(msg Message) (uuid.UUID, bool)
- func IsValidEntityID(s string) bool
- type BaseMessage
- func (m *BaseMessage) Hash() string
- func (m *BaseMessage) ID() string
- func (m *BaseMessage) MarshalJSON() ([]byte, error)
- func (m *BaseMessage) Meta() Meta
- func (m *BaseMessage) Payload() Payload
- func (m *BaseMessage) Type() Type
- func (m *BaseMessage) UnmarshalJSON(data []byte) error
- func (m *BaseMessage) Validate() error
- type BinaryContent
- type BinaryStorable
- type ContentStorable
- type Correlatable
- type DefaultFederationMeta
- type DefaultMeta
- type Deployable
- type EntityID
- type EntityType
- type Expirable
- type FederationMeta
- type GenericJSONPayload
- type Keyable
- type Locatable
- type Measurable
- type Message
- type Meta
- type Observable
- type Option
- type Payload
- type Processable
- type Storable
- type StorageReference
- type Timeable
- type Traceable
- type Triple
- type TripleGenerator
- type Type
Constants ¶
const (
	// ContentRoleBody is the primary text content (full document text).
	// Embedding workers prioritize this role for text extraction.
	ContentRoleBody = "body"
	// ContentRoleAbstract is a brief summary or description.
	// Used when body is not available or for additional context.
	ContentRoleAbstract = "abstract"
	// ContentRoleTitle is the document or entity title.
	// Typically included in embeddings for context.
	ContentRoleTitle = "title"
	// ContentRoleMedia is the primary media content (video, image, audio).
	// Used for binary content that may be processed by multimodal embedders.
	ContentRoleMedia = "media"
	// ContentRoleThumbnail is a preview image for media content.
	ContentRoleThumbnail = "thumbnail"
)
ContentRole constants define standard semantic roles for ContentFields. Using these constants ensures consistency across implementations.
Variables ¶
var ParseEntityID = types.ParseEntityID
ParseEntityID creates an EntityID from its dotted string format. It expects exactly 6 parts (org.platform.domain.system.type.instance) and returns an error if the format is invalid.
Functions ¶
func GetPlatform ¶
func GetPlatform(msg Message) (config.PlatformConfig, bool)
GetPlatform is a helper function to extract platform information from any message. It returns the platform config and true if the message has federation metadata, or an empty config and false otherwise.
This allows code to gracefully handle both federated and non-federated messages:
if platform, ok := GetPlatform(msg); ok {
// Handle federated message with platform info
globalID := BuildGlobalID(entityID, platform)
} else {
// Handle non-federated message
localID := entityID
}
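A helper like this can be built on the same type-assertion pattern used for payloads, applied to the message's metadata instead. The sketch below shows one plausible shape, not necessarily how the package implements it: every type here is a local stand-in, and PlatformConfig with a single ID field is an assumption, since config.PlatformConfig is not documented on this page.

```go
package main

import "fmt"

// PlatformConfig is a hypothetical stand-in for config.PlatformConfig.
type PlatformConfig struct{ ID string }

// Meta, FederationMeta, and Message are reduced local versions of
// the interfaces described in this package.
type Meta interface {
	Source() string
}

type FederationMeta interface {
	Meta
	Platform() PlatformConfig
}

type Message interface {
	Meta() Meta
}

type plainMeta struct{ source string }

func (m plainMeta) Source() string { return m.source }

type fedMeta struct {
	plainMeta
	platform PlatformConfig
}

func (m fedMeta) Platform() PlatformConfig { return m.platform }

type simpleMessage struct{ meta Meta }

func (m simpleMessage) Meta() Meta { return m.meta }

// platformOf returns the platform and true only when the message's
// metadata also implements FederationMeta.
func platformOf(m Message) (PlatformConfig, bool) {
	if fm, ok := m.Meta().(FederationMeta); ok {
		return fm.Platform(), true
	}
	return PlatformConfig{}, false
}

func main() {
	federated := simpleMessage{meta: fedMeta{
		plainMeta: plainMeta{source: "gps-processor"},
		platform:  PlatformConfig{ID: "platform1"},
	}}
	local := simpleMessage{meta: plainMeta{source: "gps-processor"}}

	fmt.Println(platformOf(federated))
	fmt.Println(platformOf(local))
}
```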
func GetUID ¶
func GetUID(msg Message) (uuid.UUID, bool)
GetUID is a helper function to extract the UID from any message. It returns the UID and true if the message has federation metadata, or a zero UUID and false otherwise.
func IsValidEntityID ¶
func IsValidEntityID(s string) bool
IsValidEntityID checks if a string conforms to the canonical 6-part EntityID format. Valid format: organization.platform.domain.system.type.instance (e.g., "c360.platform1.robotics.mav1.drone.0"). This enables consistent entity identification across the system.
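The 6-part rule can be illustrated with a small splitter. This is a simplified sketch: splitEntityID is a hypothetical helper that checks only the part count and that no part is empty, whereas the real IsValidEntityID and ParseEntityID may enforce additional constraints (for example on allowed characters).

```go
package main

import (
	"fmt"
	"strings"
)

// splitEntityID splits a dotted entity ID into its six parts:
// org, platform, domain, system, type, instance.
func splitEntityID(s string) ([]string, error) {
	parts := strings.Split(s, ".")
	if len(parts) != 6 {
		return nil, fmt.Errorf("entity ID %q: expected 6 parts, got %d", s, len(parts))
	}
	for i, p := range parts {
		if p == "" {
			return nil, fmt.Errorf("entity ID %q: part %d is empty", s, i+1)
		}
	}
	return parts, nil
}

func main() {
	for _, candidate := range []string{
		"c360.platform1.robotics.mav1.drone.0",
		"robotics.drone",
		"c360.platform1..mav1.drone.0",
	} {
		parts, err := splitEntityID(candidate)
		if err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		fmt.Println("valid, entity type:", parts[2]+"."+parts[4])
	}
}
```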
Types ¶
type BaseMessage ¶
type BaseMessage struct {
// contains filtered or unexported fields
}
BaseMessage provides the standard implementation of the Message interface. It combines a typed payload with metadata to create a complete message ready for transmission through the semantic event mesh.
BaseMessage is immutable after creation - all fields are set during construction and cannot be modified. This ensures message integrity throughout the processing pipeline.
Construction using Functional Options:
NewBaseMessage uses the functional options pattern for clean, composable configuration:
// Simple message (most common)
msg := NewBaseMessage(msgType, payload, "my-service")
// With specific timestamp (testing/historical data)
msg := NewBaseMessage(msgType, payload, "my-service", WithTime(pastTime))
// With federation support (multi-platform)
msg := NewBaseMessage(msgType, payload, "my-service", WithFederation(platform))
// Composable options
msg := NewBaseMessage(msgType, payload, "my-service",
WithFederation(platform),
WithTime(pastTime))
func NewBaseMessage ¶
func NewBaseMessage(msgType Type, payload Payload, source string, opts ...Option) *BaseMessage
NewBaseMessage creates a new BaseMessage with optional configuration.
Parameters:
- msgType: Structured type information (domain, category, version)
- payload: The message payload implementing the Payload interface
- source: Identifier of the service or component creating this message
- opts: Optional configuration functions
Examples:
// Simple message with current timestamp
msg := NewBaseMessage(msgType, payload, "my-service")
// Message with specific timestamp (for historical data)
msg := NewBaseMessage(msgType, payload, "my-service", WithTime(pastTime))
// Federated message for multi-platform deployment
msg := NewBaseMessage(msgType, payload, "my-service", WithFederation(platform))
// Federated message with specific timestamp
msg := NewBaseMessage(msgType, payload, "my-service",
WithFederationAndTime(platform, pastTime))
func (*BaseMessage) Hash ¶
func (m *BaseMessage) Hash() string
Hash returns a SHA256 hash of the message content. The hash includes the message type and payload data.
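A content hash over type and payload can be sketched as below. The exact bytes that BaseMessage.Hash feeds into SHA-256 are not documented here, so the encoding in ContentHash (type key, a zero-byte separator, then the payload's JSON) is an assumption and will not reproduce the package's hash values.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// ContentHash returns a hex-encoded SHA-256 digest of a type key and
// the JSON encoding of payload (hypothetical helper).
func ContentHash(typeKey string, payload any) (string, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("marshal payload: %w", err)
	}
	h := sha256.New()
	h.Write([]byte(typeKey))
	h.Write([]byte{0}) // separator so type and payload bytes cannot blend
	h.Write(body)
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	payload := map[string]any{"sensor_id": "temp-001", "temperature": 22.5}
	a, _ := ContentHash("sensors.temperature.v1", payload)
	b, _ := ContentHash("sensors.temperature.v1", payload)
	c, _ := ContentHash("sensors.temperature.v2", payload)
	fmt.Println("same content, same hash:", a == b)
	fmt.Println("different type, different hash:", a != c)
}
```

Identical content yields an identical digest, which is what makes such a hash useful for integrity checks and deduplication.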
func (*BaseMessage) ID ¶
func (m *BaseMessage) ID() string
ID returns the unique message identifier.
func (*BaseMessage) MarshalJSON ¶
func (m *BaseMessage) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler for BaseMessage. This allows BaseMessage to be serialized to JSON even though its fields are private.
Validation is performed before serialization to ensure invalid messages cannot be serialized and published to the message bus.
func (*BaseMessage) Payload ¶
func (m *BaseMessage) Payload() Payload
Payload returns the message payload.
func (*BaseMessage) Type ¶
func (m *BaseMessage) Type() Type
Type returns the structured message type.
func (*BaseMessage) UnmarshalJSON ¶
func (m *BaseMessage) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler for BaseMessage. Requires payload types to be registered in the global PayloadRegistry. For generic JSON processing, use the well-known type "core.json.v1" (GenericJSONPayload).
func (*BaseMessage) Validate ¶
func (m *BaseMessage) Validate() error
Validate performs comprehensive message validation.
type BinaryContent ¶
type BinaryContent struct {
ContentType string // MIME type: "image/jpeg", "video/mp4", "application/pdf"
Data []byte // The actual binary data
}
BinaryContent describes a binary field for storage. Used by BinaryStorable implementations to provide binary data along with its content type for proper handling.
type BinaryStorable ¶
type BinaryStorable interface {
ContentStorable
// BinaryFields returns binary content to store in ObjectStore.
//
// Each field is stored as a separate binary blob with its own key.
// The returned map keys are field names that can be referenced in
// ContentFields for semantic role mapping.
//
// Example return value:
// {"video": {ContentType: "video/mp4", Data: videoBytes},
// "thumbnail": {ContentType: "image/jpeg", Data: thumbBytes}}
BinaryFields() map[string]BinaryContent
}
BinaryStorable extends ContentStorable for payloads with binary content.
This interface enables storage of images, videos, PDFs, and other binary files alongside text content. Binary data is stored directly in ObjectStore (no base64 encoding) with JSON metadata referencing the binary keys.
Example implementation:
type VideoDocument struct {
Title string
Description string
VideoData []byte
Thumbnail []byte
}
func (v *VideoDocument) RawContent() map[string]string {
return map[string]string{
"title": v.Title,
"description": v.Description,
}
}
func (v *VideoDocument) BinaryFields() map[string]BinaryContent {
return map[string]BinaryContent{
"video": {ContentType: "video/mp4", Data: v.VideoData},
"thumbnail": {ContentType: "image/jpeg", Data: v.Thumbnail},
}
}
type ContentStorable ¶
type ContentStorable interface {
Storable // EntityID() + Triples() + StorageRef()
// ContentFields returns semantic role → field name mapping.
//
// This tells consumers how to find content in the stored data without
// hardcoding field names. Keys are semantic roles understood by consumers
// (like embedding workers), values are field names in RawContent().
//
// Standard semantic roles:
// - "body": Primary text content (full document text)
// - "abstract": Brief summary or description
// - "title": Document title
//
// Example return value:
// {"body": "content", "abstract": "description", "title": "title"}
//
// The map should only include roles that have non-empty content.
ContentFields() map[string]string
// RawContent returns the content to store in ObjectStore.
//
// Field names in this map should match values in ContentFields().
// This is what gets serialized and stored; consumers retrieve it
// via StorageRef and use ContentFields to find specific content.
//
// Example return value:
// {"title": "Safety Manual", "content": "Full document text...", "description": "Brief summary"}
RawContent() map[string]string
}
ContentStorable extends Storable for payloads with large content fields.
This interface enables the "process → store → graph" pattern where:
- Processor creates semantic understanding (triples with metadata only)
- ObjectStore stores raw content and returns StorageReference
- GraphProcessor receives ContentStorable with both semantics and content reference
- EmbeddingWorker uses ContentFields to find text for embedding
The key insight is that ContentStorable separates metadata (in triples) from content (in ObjectStore), while providing a semantic map of content structure via ContentFields. This avoids bloating triples with large text and enables efficient embedding extraction without hardcoded field name coupling.
Example implementation:
type Document struct {
	Title       string
	Description string
	Body        string
	entityID    string // returned by EntityID()
	storageRef  *StorageReference
}
func (d *Document) EntityID() string { return d.entityID }
func (d *Document) Triples() []Triple { return metadataTriples } // NO body
func (d *Document) StorageRef() *StorageReference { return d.storageRef }
func (d *Document) ContentFields() map[string]string {
return map[string]string{
"body": "body", // semantic role → field name
"abstract": "description",
"title": "title",
}
}
func (d *Document) RawContent() map[string]string {
return map[string]string{
"title": d.Title,
"description": d.Description,
"body": d.Body,
}
}
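On the consuming side, a worker can resolve text through the role map instead of hardcoding field names. The sketch below assumes the two maps have already been obtained from ContentFields() and RawContent(); TextForRole is a hypothetical helper, and trying "body" first follows the priority noted for ContentRoleBody.

```go
package main

import "fmt"

// TextForRole returns the first non-empty content found for the given
// semantic roles, tried in order. fields maps role -> field name and
// raw maps field name -> content.
func TextForRole(fields, raw map[string]string, roles ...string) (string, bool) {
	for _, role := range roles {
		name, ok := fields[role]
		if !ok {
			continue // payload does not declare this role
		}
		if text := raw[name]; text != "" {
			return text, true
		}
	}
	return "", false
}

func main() {
	fields := map[string]string{
		"body":     "content",
		"abstract": "description",
		"title":    "title",
	}
	raw := map[string]string{
		"title":       "Safety Manual",
		"content":     "Full document text...",
		"description": "Brief summary",
	}
	if text, ok := TextForRole(fields, raw, "body", "abstract", "title"); ok {
		fmt.Println("embedding input:", text)
	}
}
```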
type Correlatable ¶
type Correlatable interface {
// CorrelationID returns a unique identifier for correlating related messages.
// This enables request/response matching and distributed tracing.
// Common patterns:
// - Request ID: Shared by request and response
// - Trace ID: Shared by all messages in a distributed operation
// - Causation ID: Links cause and effect messages
CorrelationID() string
}
Correlatable enables distributed tracing and causality tracking. Payloads implementing this interface can be correlated across requests, responses, and distributed operations.
type DefaultFederationMeta ¶
type DefaultFederationMeta struct {
*DefaultMeta
// contains filtered or unexported fields
}
DefaultFederationMeta provides the standard implementation of FederationMeta. It embeds DefaultMeta and adds federation-specific fields.
func NewFederationMeta ¶
func NewFederationMeta(source string, platform config.PlatformConfig) *DefaultFederationMeta
NewFederationMeta creates a new DefaultFederationMeta with auto-generated UID.
Parameters:
- source: The service or component creating this message
- platform: The platform configuration identifying the origin instance
The UID is automatically generated as a new UUID, and timestamps are set to the current time.
func NewFederationMetaWithTime ¶
func NewFederationMetaWithTime(
	source string,
	platform config.PlatformConfig,
	createdAt time.Time,
) *DefaultFederationMeta
NewFederationMetaWithTime creates a new DefaultFederationMeta with specific creation time. Useful for historical data import or testing.
func (*DefaultFederationMeta) Platform ¶
func (m *DefaultFederationMeta) Platform() config.PlatformConfig
Platform returns the originating platform configuration.
func (*DefaultFederationMeta) UID ¶
func (m *DefaultFederationMeta) UID() uuid.UUID
UID returns the global unique identifier for this message.
type DefaultMeta ¶
type DefaultMeta struct {
// contains filtered or unexported fields
}
DefaultMeta provides the standard implementation of the Meta interface. It tracks when an event occurred, when it was received by the system, and where it originated from.
func NewDefaultMeta ¶
func NewDefaultMeta(createdAt time.Time, source string) *DefaultMeta
NewDefaultMeta creates a new DefaultMeta instance with the given creation time and source. The received time is automatically set to the current time.
func NewDefaultMetaWithReceivedAt ¶
func NewDefaultMetaWithReceivedAt(createdAt, receivedAt time.Time, source string) *DefaultMeta
NewDefaultMetaWithReceivedAt creates a new DefaultMeta instance with explicit creation and received times. This is useful for testing or when importing historical data.
func (*DefaultMeta) CreatedAt ¶
func (m *DefaultMeta) CreatedAt() time.Time
CreatedAt returns when the original event occurred.
func (*DefaultMeta) ReceivedAt ¶
func (m *DefaultMeta) ReceivedAt() time.Time
ReceivedAt returns when the system received the message.
func (*DefaultMeta) Source ¶
func (m *DefaultMeta) Source() string
Source returns the origin of the message.
type Deployable ¶
type Deployable interface {
// DeploymentID returns the identifier of the deployment or collection.
// Example: "production-us-west", "staging-experiment-42"
DeploymentID() string
}
Deployable indicates the payload belongs to a deployment or collection. This enables grouping messages by deployment context.
type EntityID ¶
EntityID represents a complete entity identifier with semantic structure. Follows the pattern: org.platform.domain.system.type.instance for federated entity management.
type EntityType ¶
type EntityType = types.EntityType
EntityType represents a structured entity type identifier using dotted notation. Format: EntityType{Domain: "domain", Type: "type"} -> Key() returns "domain.type"
type Expirable ¶
type Expirable interface {
// ExpiresAt returns the time when this payload expires.
// Returns zero time if payload never expires.
ExpiresAt() time.Time
// TTL returns the time-to-live duration from creation.
// Returns 0 if payload never expires.
TTL() time.Duration
}
Expirable defines time-to-live for automatic cleanup. Payloads implementing this interface can be automatically expired and cleaned up by storage systems.
type FederationMeta ¶
type FederationMeta interface {
Meta
// UID returns the global unique identifier for this message.
// This is a proper UUID that ensures uniqueness across all platforms.
UID() uuid.UUID
// Platform returns information about the originating platform.
// This identifies which SemStreams instance created this message.
Platform() config.PlatformConfig
}
FederationMeta extends Meta with federation support for multi-platform deployments. It adds a global unique identifier (UID) and platform information to enable entity correlation across distributed SemStreams instances.
The UID is a proper UUID that provides global uniqueness, while the Platform information identifies the specific instance/region where the message originated.
This enables several federation patterns:
- Cross-platform entity correlation (same drone tracked by multiple stations)
- Data aggregation from multiple regions
- Entity resolution across federated deployments
- Distributed graph queries
type GenericJSONPayload ¶
type GenericJSONPayload struct {
// Data contains the JSON payload as a map.
// This supports arbitrary JSON structures while remaining type-safe
// at the component level (components declare they work with core.json.v1).
Data map[string]any `json:"data"`
}
GenericJSONPayload provides a simple, explicitly flexible payload type for testing, prototyping, and basic data processing flows.
This is an intentional, well-known type (core.json.v1) designed for:
- Rapid prototyping of flows
- Integration testing
- Basic JSON data processing (filter, map, transform)
- Simple ETL pipelines
Components that work with GenericJSON (JSONFilter, JSONMap) explicitly declare that they require the "core.json.v1" type, which provides type safety at the component level while keeping the payload flexible enough for arbitrary JSON structures.
Example usage:
payload := &GenericJSONPayload{
	Data: map[string]any{
		"sensor_id":   "temp-001",
		"temperature": 23.5,
		"unit":        "celsius",
	},
}
func NewGenericJSON ¶
func NewGenericJSON(data map[string]any) *GenericJSONPayload
NewGenericJSON creates a new GenericJSON payload with the given data.
func (*GenericJSONPayload) MarshalJSON ¶
func (g *GenericJSONPayload) MarshalJSON() ([]byte, error)
MarshalJSON serializes the GenericJSON payload to JSON format. The output format matches the input structure with a "data" wrapper.
func (*GenericJSONPayload) Schema ¶
func (g *GenericJSONPayload) Schema() Type
Schema returns the payload type identifier for GenericJSON. Always returns core.json.v1 as this is the well-known type for generic JSON processing in StreamKit.
func (*GenericJSONPayload) UnmarshalJSON ¶
func (g *GenericJSONPayload) UnmarshalJSON(data []byte) error
UnmarshalJSON deserializes JSON data into the GenericJSON payload.
func (*GenericJSONPayload) Validate ¶
func (g *GenericJSONPayload) Validate() error
Validate performs basic validation on the GenericJSON payload. Ensures the data map is not nil.
type Keyable ¶
Keyable interface represents types that can be converted to semantic keys using dotted notation.
type Locatable ¶
type Locatable interface {
// Location returns geographic coordinates as (latitude, longitude).
// Values should be in decimal degrees:
// - Latitude: -90.0 to +90.0 (negative = South, positive = North)
// - Longitude: -180.0 to +180.0 (negative = West, positive = East)
Location() (lat, lon float64)
}
Locatable provides geographic coordinates for spatial indexing. Payloads implementing this interface can be indexed by location and queried spatially.
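A spatial indexer could discover this capability with a type assertion and reject coordinates outside the documented ranges. The sketch below assumes a hypothetical `vehiclePosition` payload and `checkLocation` helper; the interface is re-declared locally so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Locatable is re-declared here so the sketch is self-contained.
type Locatable interface {
	Location() (lat, lon float64)
}

// vehiclePosition is a hypothetical payload carrying decimal-degree coordinates.
type vehiclePosition struct{ lat, lon float64 }

func (v vehiclePosition) Location() (lat, lon float64) { return v.lat, v.lon }

// checkLocation is a hypothetical helper: it discovers Locatable at runtime
// and enforces the latitude/longitude ranges documented on the interface.
func checkLocation(payload any) (lat, lon float64, err error) {
	l, ok := payload.(Locatable)
	if !ok {
		return 0, 0, errors.New("payload is not Locatable")
	}
	lat, lon = l.Location()
	if lat < -90 || lat > 90 || lon < -180 || lon > 180 {
		return 0, 0, fmt.Errorf("coordinates out of range: %v, %v", lat, lon)
	}
	return lat, lon, nil
}

func main() {
	lat, lon, err := checkLocation(vehiclePosition{lat: 37.7749, lon: -122.4194})
	fmt.Println(lat, lon, err) // 37.7749 -122.4194 <nil>

	_, _, err = checkLocation(vehiclePosition{lat: 123, lon: 0})
	fmt.Println(err != nil) // true
}
```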
type Measurable ¶
type Measurable interface {
// Measurements returns a map of measurement names to values.
// Each measurement may have different units.
Measurements() map[string]any
// Unit returns the unit for a specific measurement.
// Returns empty string if measurement doesn't exist or is unitless.
Unit(measurement string) string
}
Measurable exposes multiple measurements, each with its own unit. It complements Observable for payloads that carry several related measurements rather than a single observed value.
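As a sketch of how the two methods work together, the example below pairs each measurement with its unit and treats an empty unit as unitless. `weatherSample` and `describe` are hypothetical names introduced for illustration; the interface is re-declared locally.

```go
package main

import (
	"fmt"
	"sort"
)

// Measurable is re-declared here so the sketch is self-contained.
type Measurable interface {
	Measurements() map[string]any
	Unit(measurement string) string
}

// weatherSample is a hypothetical payload with several related measurements.
type weatherSample struct {
	values map[string]any
	units  map[string]string
}

func (w weatherSample) Measurements() map[string]any { return w.values }

// Unit returns "" for unknown or unitless measurements (missing map key).
func (w weatherSample) Unit(measurement string) string { return w.units[measurement] }

// describe renders "name=value unit" lines in sorted name order.
func describe(m Measurable) []string {
	values := m.Measurements()
	names := make([]string, 0, len(values))
	for name := range values {
		names = append(names, name)
	}
	sort.Strings(names)

	lines := make([]string, 0, len(names))
	for _, name := range names {
		line := fmt.Sprintf("%s=%v", name, values[name])
		if u := m.Unit(name); u != "" {
			line += " " + u
		}
		lines = append(lines, line)
	}
	return lines
}

func main() {
	s := weatherSample{
		values: map[string]any{"temperature": 23.5, "humidity": 0.41, "count": 3},
		units:  map[string]string{"temperature": "celsius"},
	}
	for _, line := range describe(s) {
		fmt.Println(line)
	}
}
```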
type Message ¶
type Message interface {
// ID returns a unique identifier for this message instance.
// Typically a UUID, this ID is immutable and globally unique.
ID() string
// Type returns structured type information used for routing and processing.
// The type contains domain, category, and version information.
Type() Type
// Payload returns the message payload that may implement behavioral interfaces.
// Payloads expose capabilities like Identifiable, Locatable, Observable, etc.
Payload() Payload
// Meta returns metadata about the message lifecycle and origin.
// Includes creation time, receipt time, and source service information.
Meta() Meta
// Hash returns a content-based hash for deduplication and storage.
// The hash is computed from the message type and payload data.
Hash() string
// Validate performs comprehensive validation of the message.
// Checks message type validity, payload presence, and payload-specific validation.
Validate() error
}
Message represents the core message interface for the SemStreams platform. Messages are the fundamental unit of data flow, carrying typed payloads with metadata through the semantic event mesh.
Design principles:
- Infrastructure-agnostic: Messages contain only data, no routing or storage logic
- Behavioral payloads: Payloads expose capabilities through -able interfaces
- Flexible metadata: Meta interface allows different metadata implementations
- Content-addressable: Hash method enables deduplication and referencing
Example:
msg := NewBaseMessage(
	Type{Domain: "sensors", Category: "gps", Version: "v1"},
	gpsPayload,
	"gps-reader-service",
)
type Meta ¶
type Meta interface {
// CreatedAt returns when the original event or observation occurred.
// For sensor data, this is the measurement time.
// For business events, this is when the event happened.
CreatedAt() time.Time
// ReceivedAt returns when the message entered the processing system.
// This helps track ingestion latency and message age.
// May be the same as CreatedAt for real-time streams.
ReceivedAt() time.Time
// Source returns the identifier of the message originator.
// Examples: "gps-reader-service", "sensor-12345", "order-processor"
// Used for debugging, tracing, and access control.
Source() string
}
Meta provides metadata about a message's lifecycle and origin. This interface enables tracking of when messages were created, when they entered the system, and where they originated.
Using an interface rather than a concrete type allows for:
- Custom metadata implementations for specific domains
- Extended metadata with additional fields when needed
- Easier testing with mock implementations
type Observable ¶
type Observable interface {
// ObservedEntity returns the ID of the entity being observed.
// Example: "sensor-123", "vehicle-456", "building-789"
ObservedEntity() string
// ObservedProperty returns the property being observed.
// Example: "temperature", "pressure", "velocity", "position"
ObservedProperty() string
// ObservedValue returns the observed value as a generic interface.
// The actual type depends on the property being observed.
ObservedValue() any
// ObservedUnit returns the unit of measurement.
// Example: "celsius", "pascal", "m/s", "meters"
// Returns empty string if unitless.
ObservedUnit() string
}
Observable represents observations of other entities. This interface is used for sensor data, measurements, and observations that reference an external entity or property being observed.
type Option ¶
type Option func(*BaseMessage)
Option is a functional option for configuring BaseMessage construction.
func WithFederation ¶
func WithFederation(platform config.PlatformConfig) Option
WithFederation enables federation support by using FederationMeta. This adds global UIDs for cross-platform message correlation.
func WithFederationAndTime ¶
func WithFederationAndTime(platform config.PlatformConfig, createdAt time.Time) Option
WithFederationAndTime combines federation support with a specific timestamp.
type Payload ¶
type Payload interface {
// Schema returns the Type that defines this payload's structure.
// This enables type-safe routing and processing throughout the system.
Schema() Type
// Validate checks the payload data for correctness.
// Returns nil if valid, or an error describing the validation failure.
// Should validate:
// - Required fields are present
// - Values are within acceptable ranges
// - Business rules are satisfied
Validate() error
// JSON serialization using standard Go interfaces.
// Payloads must implement json.Marshaler and json.Unmarshaler
// for deterministic serialization. The same payload must always
// produce the same JSON output.
json.Marshaler
json.Unmarshaler
}
Payload represents the data carried by a message. All message payloads must implement this interface to provide schema information, validation, and serialization capabilities.
Payloads may also implement behavioral interfaces (Graphable, Locatable, Observable, etc.) to expose additional capabilities that can be discovered and utilized at runtime.
Example implementation:
type GPSPayload struct {
	DeviceID  string    `json:"device_id"`
	Latitude  float64   `json:"latitude"`
	Longitude float64   `json:"longitude"`
	Timestamp time.Time `json:"timestamp"`
}

func (p *GPSPayload) Schema() Type {
	return Type{Domain: "sensors", Category: "gps", Version: "v1"}
}

func (p *GPSPayload) Validate() error {
	if p.DeviceID == "" {
		return errors.New("device ID is required")
	}
	if p.Latitude < -90 || p.Latitude > 90 {
		return errors.New("invalid latitude")
	}
	if p.Longitude < -180 || p.Longitude > 180 {
		return errors.New("invalid longitude")
	}
	return nil
}

func (p *GPSPayload) MarshalJSON() ([]byte, error) {
	// Use alias to avoid infinite recursion
	type Alias GPSPayload
	return json.Marshal((*Alias)(p))
}

func (p *GPSPayload) UnmarshalJSON(data []byte) error {
	// Use alias to avoid infinite recursion
	type Alias GPSPayload
	return json.Unmarshal(data, (*Alias)(p))
}
type Processable ¶
type Processable interface {
// Priority returns the processing priority (higher = more important).
// Typical range: 0 (lowest) to 10 (highest).
Priority() int
// Deadline returns the time by which processing must complete.
// Returns zero time if no deadline.
Deadline() time.Time
}
Processable specifies processing priority and deadlines. This enables deadline-aware processing and priority queues.
type Storable ¶
type Storable interface {
// EntityID returns deterministic 6-part ID: org.platform.domain.system.type.instance
// (duplicated from graph.Graphable to avoid import cycle)
EntityID() string
// Triples returns all facts about this entity
// (duplicated from graph.Graphable to avoid import cycle)
Triples() []Triple
// StorageRef returns reference to where full data is stored.
// May return nil if data is not stored externally.
StorageRef() *StorageReference
}
Storable extends graph.Graphable with storage reference capability. Components that implement Storable can provide both semantic information (via Graphable) and a reference to their full data.
This interface enables the lightweight message pattern where:
- Domain processors create messages with semantic data
- ObjectStore stores full message and adds StorageReference
- GraphProcessor receives Storable with both semantics and reference
- Consumers can access full data via StorageReference when needed
NOTE: This interface duplicates graph.Graphable methods inline to avoid an import cycle (graph imports message for Triple, so message cannot import graph). Any type implementing this interface also implements graph.Graphable automatically due to identical method signatures.
Example implementation:
type StoredEntity struct {
	entityID string
	triples  []Triple
	storage  *StorageReference
}

func (s *StoredEntity) EntityID() string              { return s.entityID }
func (s *StoredEntity) Triples() []Triple             { return s.triples }
func (s *StoredEntity) StorageRef() *StorageReference { return s.storage }
type StorageReference ¶
type StorageReference struct {
// StorageInstance identifies which storage component holds the data.
// This enables federation across multiple storage instances.
// Examples: "message-store", "cache-1", "objectstore-primary"
StorageInstance string `json:"storage_instance"`
// Key is the storage-specific key to retrieve the data.
// Format depends on the storage backend but typically includes
// time-based partitioning for efficient retrieval.
// Examples: "2025/01/13/14/msg_abc123", "robotics/drone/1/latest"
Key string `json:"key"`
// ContentType specifies the MIME type of the stored content.
// This helps consumers understand how to process the data.
// Examples: "application/json", "application/protobuf", "application/avro"
ContentType string `json:"content_type"`
// Size is an optional hint about the stored data size in bytes.
// This helps consumers decide whether to fetch the full data.
// A value of 0 indicates the size is unknown.
Size int64 `json:"size,omitempty"`
}
StorageReference points to where the full message data is stored. This enables lightweight message passing where components can reference stored data without transmitting the full payload.
The StorageReference pattern supports the "store once, reference everywhere" architecture, reducing data duplication and enabling efficient processing of large messages.
Example usage:
- ObjectStore stores full message and returns StorageReference
- GraphProcessor receives Storable with reference to full data
- Components can fetch full data only when needed
type Timeable ¶
type Timeable interface {
// Timestamp returns the observation or event time.
// This should be the actual time of observation/event, not message
// creation time (which is in BaseMessage metadata).
Timestamp() time.Time
}
Timeable provides temporal information for time-series analysis. Payloads implementing this interface can be indexed by time and queried temporally.
type Traceable ¶
type Traceable interface {
// TraceID returns the trace identifier.
TraceID() string
// SpanID returns the span identifier for this message.
SpanID() string
// ParentSpanID returns the parent span identifier.
// Returns empty string if this is a root span.
ParentSpanID() string
}
Traceable supports distributed tracing with spans. This interface enables integration with distributed tracing systems like OpenTelemetry, Jaeger, or Zipkin.
type Triple ¶
type Triple struct {
// Subject identifies the entity this triple describes.
// Must use EntityID.Key() format for consistency with federated entity management.
// Examples: "telemetry.robotics.drone.1", "gcs-alpha.robotics.drone.001"
Subject string `json:"subject"`
// Predicate identifies the semantic property using three-level dotted notation.
// Format: domain.category.property (e.g., "robotics.battery.level")
// This maintains consistency with the unified dotted notation from Alpha Week 1.
// Predicates should be defined in the vocabulary package for consistency.
Predicate string `json:"predicate"`
// Object contains the property value or entity reference.
// For literals: primitive types (float64, bool, string, int)
// For entity references: entity ID strings using EntityID.Key() format
// Complex objects should be flattened into multiple triples.
Object any `json:"object"`
// Source identifies where this assertion came from.
// Examples: "mavlink_heartbeat", "gps_fix", "operator_input", "ai_inference"
// Enables traceability and conflict resolution during entity merging.
Source string `json:"source"`
// Timestamp indicates when this assertion was made.
// Should typically be the message timestamp or processing time.
// Enables temporal queries and helps with entity state evolution.
Timestamp time.Time `json:"timestamp"`
// Confidence indicates the reliability of this assertion (0.0 to 1.0).
// Higher values indicate greater certainty about the triple.
//
// Typical confidence levels:
// - 1.0: Direct telemetry or explicit data
// - 0.9: High-confidence sensor readings
// - 0.7: Calculated or derived values
// - 0.5: Inferred relationships
// - 0.0: Uncertain or placeholder data
Confidence float64 `json:"confidence"`
// Context provides correlation ID for message batches or request tracking.
// This enables grouping related triples from the same processing batch
// or correlating triples across distributed systems.
// Examples: message ID, batch ID, correlation ID, request ID
Context string `json:"context,omitempty"`
// Datatype provides optional RDF datatype hint for the Object value.
// This helps with type interpretation and validation in downstream systems.
// Examples: "xsd:float", "xsd:dateTime", "geo:point", "xsd:boolean"
// If omitted, the type is inferred from the Go type of Object.
Datatype string `json:"datatype,omitempty"`
// ExpiresAt indicates when this triple should be considered expired.
// When nil, the triple never expires and remains valid indefinitely.
// When set, the triple is considered expired after this timestamp.
// This enables TTL-based triple expiration for temporal data management.
ExpiresAt *time.Time `json:"expires_at,omitempty"`
}
Triple represents a semantic statement about an entity following the Subject-Predicate-Object pattern. This enables RDF-like knowledge graphs while maintaining simplicity for Go developers.
Triple design follows these principles:
- Subject: Always an entity ID using EntityID.Key() format (e.g., "telemetry.robotics.drone.1")
- Predicate: Semantic property using three-level dotted notation (e.g., "robotics.battery.level")
- Object: Typed value (literals) or entity references (other entity IDs)
- Metadata: Source, timestamp, and confidence for provenance tracking
Example triples from a drone heartbeat message:
- ("telemetry.robotics.drone.1", "robotics.battery.level", 85.5)
- ("telemetry.robotics.drone.1", "robotics.flight.armed", true)
- ("telemetry.robotics.drone.1", "robotics.component.has", "telemetry.robotics.battery.1")
This structure enables:
- NATS wildcard queries: "robotics.battery.*" finds all battery predicates
- Entity relationship modeling: Objects can reference other entities
- Temporal tracking: Each triple has timestamp and confidence
- Provenance: Source field tracks where each assertion came from
- Federation: Works with federated entity IDs from multiple sources
func (Triple) IsExpired ¶
IsExpired returns true if the triple has an expiration time that has passed. Returns false if ExpiresAt is nil (never expires) or if the expiration time has not yet been reached (including exact equality with current time). A triple is only considered expired when time.Now() is strictly after ExpiresAt.
func (Triple) IsRelationship ¶
IsRelationship checks if this triple represents a relationship between entities rather than a property with a literal value. Returns true if Object is a valid EntityID (4-part dotted notation).
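A rough approximation of this check, based only on the description above, is sketched here. `looksLikeEntityRef` is a hypothetical helper that accepts any string with exactly four non-empty dot-separated parts; the package's real validation may well be stricter.

```go
package main

import (
	"fmt"
	"strings"
)

// looksLikeEntityRef is a hypothetical approximation of the relationship test:
// the object must be a string with exactly four non-empty, dot-separated parts.
func looksLikeEntityRef(object any) bool {
	s, ok := object.(string)
	if !ok {
		return false
	}
	parts := strings.Split(s, ".")
	if len(parts) != 4 {
		return false
	}
	for _, p := range parts {
		if p == "" {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(looksLikeEntityRef(85.5))                           // false: numeric literal
	fmt.Println(looksLikeEntityRef(true))                           // false: boolean literal
	fmt.Println(looksLikeEntityRef("celsius"))                      // false: plain string
	fmt.Println(looksLikeEntityRef("telemetry.robotics.battery.1")) // true: entity reference
}
```

A purely structural test like this would also accept literal strings that happen to contain three dots (for example "1.2.3.4"), which is one reason a real implementation may validate more than the part count.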
type TripleGenerator ¶
type TripleGenerator interface {
// Triples returns semantic triples extracted from this message.
// Each triple represents a meaningful assertion about an entity,
// using structured predicates and proper confidence scoring.
Triples() []Triple
}
TripleGenerator enables messages to produce semantic triples for graph storage. This interface replaces the use of untyped Properties maps with structured semantic assertions that enable reasoning and complex queries.
Implementations should:
- Generate triples for all meaningful properties from the message
- Use vocabulary predicate constants for consistency
- Include entity relationships as triples with entity reference objects
- Set appropriate confidence levels based on data source quality
- Use EntityID.Key() format for consistent subject identification
Example implementation for a drone battery payload:
func (b *BatteryPayload) Triples() []Triple {
	entityID := EntityID{
		Source: "telemetry", Domain: "robotics", Type: "drone",
		Instance: fmt.Sprintf("%d", b.SystemID),
	}.Key()
	return []Triple{{
		Subject:    entityID,
		Predicate:  vocabulary.ROBOTICS_BATTERY_LEVEL, // "robotics.battery.level"
		Object:     float64(b.BatteryRemaining),
		Source:     "mavlink_battery",
		Timestamp:  time.Now(),
		Confidence: 1.0,
	}}
}