weatherstation

package
v1.0.0-beta.21
Warning

This package is not in the latest version of its module.

Published: Apr 28, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package weatherstation provides an example weather station processor demonstrating how to build a domain processor following the tutorial.

Index

Constants

const (
	// Weather measurement predicates (unit-specific)
	PredicateWeatherTempCelsius = "weather.temperature.celsius"
	PredicateWeatherWindKph     = "weather.wind.kph"
	PredicateWeatherHumidity    = "weather.humidity.percent"

	// Weather classification predicates
	PredicateWeatherCondition = "weather.classification.condition"

	// Location predicates
	PredicateLocationCity    = "geo.location.city"
	PredicateLocationCountry = "geo.location.country"

	// Time predicates
	PredicateObservationRecorded = "time.observation.recorded"
)

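Predicate constants for the weather station domain. Each follows the three-level dotted notation domain.category.property; for example, "weather.temperature.celsius" has domain "weather", category "temperature", and property "celsius".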
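The three-level notation can be checked mechanically. The following is a minimal sketch, not part of the package: the predicateParts type and splitPredicate helper are hypothetical names introduced only to illustrate how a predicate string breaks down.

```go
package main

import (
	"fmt"
	"strings"
)

// predicateParts holds the three levels of a dotted predicate.
// Hypothetical type; it does not exist in the weatherstation package.
type predicateParts struct {
	Domain, Category, Property string
}

// splitPredicate splits "domain.category.property" into its parts and
// rejects input that does not have exactly three non-empty segments.
func splitPredicate(p string) (predicateParts, error) {
	segs := strings.Split(p, ".")
	if len(segs) != 3 {
		return predicateParts{}, fmt.Errorf("predicate %q: want 3 segments, got %d", p, len(segs))
	}
	for _, s := range segs {
		if s == "" {
			return predicateParts{}, fmt.Errorf("predicate %q: empty segment", p)
		}
	}
	return predicateParts{Domain: segs[0], Category: segs[1], Property: segs[2]}, nil
}

func main() {
	// Value copied from PredicateWeatherTempCelsius above.
	parts, err := splitPredicate("weather.temperature.celsius")
	if err != nil {
		panic(err)
	}
	fmt.Println(parts.Domain, parts.Category, parts.Property)
}
```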

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(
	rawConfig json.RawMessage, deps component.Dependencies,
) (component.Discoverable, error)

NewComponent creates a new component from configuration.

func Register

func Register(registry *component.Registry) error

Register registers the component with the registry.

func RegisterPayloads

func RegisterPayloads(reg *payloadregistry.Registry) error

RegisterPayloads registers the WeatherReading payload type (weather.station.v1) with the supplied registry. Called by binaries that load this example processor.

func RegisterVocabulary

func RegisterVocabulary()

RegisterVocabulary registers all weather station predicates with the vocabulary system.

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component wraps the domain processor with component lifecycle.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this processor.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics for this processor.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status of this processor.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns the NATS input ports this processor subscribes to.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns metadata describing this processor component.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns the NATS output ports for weather readings.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing messages.

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component.

type ComponentConfig

type ComponentConfig struct {
	Ports    *component.PortConfig `json:"ports"`
	OrgID    string                `json:"org_id"`
	Platform string                `json:"platform"`
}

ComponentConfig holds configuration for the component.

func DefaultConfig

func DefaultConfig() ComponentConfig

DefaultConfig returns the default configuration.

type Config

type Config struct {
	OrgID    string
	Platform string
}

Config holds the configuration for the processor.

func (Config) Validate

func (c Config) Validate() error

Validate checks that the configuration is valid.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor transforms incoming JSON weather data into Graphable payloads.

func NewProcessor

func NewProcessor(config Config) *Processor

NewProcessor creates a new weather station processor with the given configuration.

func (*Processor) Process

func (p *Processor) Process(input map[string]any) (*WeatherReading, error)

Process transforms incoming JSON data into a WeatherReading.

Expected JSON format:

{
  "station_id": "ws-001",
  "temperature": 22.5,
  "humidity": 65.0,
  "wind_speed": 15.0,
  "condition": "sunny",
  "city": "San Francisco",
  "country": "USA",
  "timestamp": "2025-11-26T10:30:00Z"
}
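Process takes a map[string]any, so a caller holding raw JSON has to decode it first. The sketch below shows one way to do that with the standard library alone; decodeInput and observedAt are hypothetical helpers, and parsing "timestamp" as RFC 3339 is an assumption based on the sample value, not a documented guarantee. Note that encoding/json decodes every JSON number in such a map as float64.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// decodeInput turns a raw JSON document into the map[string]any shape
// that Process accepts. Hypothetical helper.
func decodeInput(raw []byte) (map[string]any, error) {
	var input map[string]any
	if err := json.Unmarshal(raw, &input); err != nil {
		return nil, fmt.Errorf("decode input: %w", err)
	}
	return input, nil
}

// observedAt reads the "timestamp" field and parses it as RFC 3339.
// Assumption: the real processor may accept other layouts.
func observedAt(input map[string]any) (time.Time, error) {
	s, ok := input["timestamp"].(string)
	if !ok {
		return time.Time{}, fmt.Errorf("timestamp: missing or not a string")
	}
	return time.Parse(time.RFC3339, s)
}

func main() {
	raw := []byte(`{
	  "station_id": "ws-001",
	  "temperature": 22.5,
	  "timestamp": "2025-11-26T10:30:00Z"
	}`)
	input, err := decodeInput(raw)
	if err != nil {
		panic(err)
	}
	ts, err := observedAt(input)
	if err != nil {
		panic(err)
	}
	fmt.Println(input["station_id"], input["temperature"], ts.UTC())
}
```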

type WeatherReading

type WeatherReading struct {
	// Input fields (from incoming JSON)
	StationID   string    `json:"station_id"`
	Temperature float64   `json:"temperature"`
	Humidity    float64   `json:"humidity"`
	WindSpeed   float64   `json:"wind_speed"`
	Condition   string    `json:"condition"`
	City        string    `json:"city"`
	Country     string    `json:"country"`
	ObservedAt  time.Time `json:"observed_at"`

	// Context fields (set by processor from config)
	OrgID    string `json:"org_id"`
	Platform string `json:"platform"`
}

WeatherReading represents a weather station measurement.

func (*WeatherReading) EntityID

func (w *WeatherReading) EntityID() string

EntityID returns a deterministic 6-part federated entity ID.

Format: {org}.{platform}.{domain}.{system}.{type}.{instance}

Example: "acme.weather.meteorology.station.outdoor.ws-001"
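The format above can be reproduced with a plain string join. This is a sketch under assumptions: the entityID helper is hypothetical, and the fixed middle segments ("meteorology", "station", "outdoor") are taken from the documented example rather than from the package source.

```go
package main

import (
	"fmt"
	"strings"
)

// entityID joins the six parts of a federated entity ID with dots.
// The domain, system, and type segments are copied from the documented
// example and are an assumption about what the package uses.
func entityID(org, platform, instance string) string {
	return strings.Join([]string{
		org,           // {org}
		platform,      // {platform}
		"meteorology", // {domain}
		"station",     // {system}
		"outdoor",     // {type}
		instance,      // {instance}
	}, ".")
}

func main() {
	fmt.Println(entityID("acme", "weather", "ws-001"))
}
```

Because the ID depends only on its inputs, the same organization, platform, and station always yield the same string, which is what makes it deterministic.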

func (*WeatherReading) MarshalJSON

func (w *WeatherReading) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*WeatherReading) Schema

func (w *WeatherReading) Schema() message.Type

Schema returns the message type for weather readings. This must match the PayloadRegistration in init().

func (*WeatherReading) Triples

func (w *WeatherReading) Triples() []message.Triple

Triples returns semantic facts about this weather reading.
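A semantic fact of this kind is typically a subject, a predicate, and an object. The sketch below illustrates the idea with a hypothetical triple struct; the field names of the real message.Triple are not shown on this page and may differ. The predicate strings are copied from the constants above, and using the entity ID as the subject is an assumption.

```go
package main

import "fmt"

// triple is a hypothetical stand-in for message.Triple.
type triple struct {
	Subject   string
	Predicate string
	Object    any
}

// Predicate values copied from the package constants.
const (
	predTempCelsius = "weather.temperature.celsius"
	predHumidity    = "weather.humidity.percent"
	predCity        = "geo.location.city"
)

// readingTriples builds one triple per measurement, with the entity ID
// as the subject (an assumption about the real implementation).
func readingTriples(entityID string, tempC, humidity float64, city string) []triple {
	return []triple{
		{Subject: entityID, Predicate: predTempCelsius, Object: tempC},
		{Subject: entityID, Predicate: predHumidity, Object: humidity},
		{Subject: entityID, Predicate: predCity, Object: city},
	}
}

func main() {
	id := "acme.weather.meteorology.station.outdoor.ws-001"
	for _, t := range readingTriples(id, 22.5, 65.0, "San Francisco") {
		fmt.Printf("%s %s %v\n", t.Subject, t.Predicate, t.Object)
	}
}
```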

func (*WeatherReading) UnmarshalJSON

func (w *WeatherReading) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*WeatherReading) Validate

func (w *WeatherReading) Validate() error

Validate checks that the weather reading has all required fields.
