Documentation ¶
Overview ¶
Package otel provides an OpenTelemetry exporter for SemStreams agent telemetry.
The OTEL exporter subscribes to agent lifecycle events from NATS JetStream and converts them to OpenTelemetry spans and metrics for export to OTEL collectors.
Architecture ¶
The exporter follows the AGNTCY integration pattern for observability:
                                 ┌─────────────────────┐
Agent Events                     │    OTEL Exporter    │
─────────────────────────────────┤                     │
agent.loop.created ─────────────►│  SpanCollector      │──────► OTEL Collector
agent.loop.completed ───────────►│                     │
agent.task.* ───────────────────►│  MetricMapper       │──────► (Traces + Metrics)
agent.tool.* ───────────────────►│                     │
                                 └─────────────────────┘
Span Collection ¶
The SpanCollector converts agent lifecycle events to OTEL spans:
- loop.created → Root span start
- loop.completed/failed → Root span end
- task.started/completed/failed → Child span for task
- tool.started/completed/failed → Child span for tool execution
Spans are linked automatically through a trace ID derived from the loop ID, so each agent execution produces one complete trace hierarchy.
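The event-to-span mapping listed above can be sketched as a small dispatch function. This is an illustration only: spanAction and the action strings it returns are hypothetical names, not part of the package; only the event type strings come from the list above.

```go
package main

import (
	"fmt"
	"strings"
)

// spanAction is a hypothetical helper that classifies an event type
// according to the documented mapping. It is not part of the package API.
func spanAction(eventType string) string {
	switch {
	case eventType == "loop.created":
		return "start root span"
	case eventType == "loop.completed", eventType == "loop.failed":
		return "end root span"
	case strings.HasPrefix(eventType, "task."):
		return "child span (task)"
	case strings.HasPrefix(eventType, "tool."):
		return "child span (tool)"
	default:
		// Unknown event types are ignored in this sketch.
		return "ignore"
	}
}

func main() {
	for _, t := range []string{"loop.created", "task.started", "tool.failed", "loop.completed"} {
		fmt.Printf("%-16s -> %s\n", t, spanAction(t))
	}
}
```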
Metric Mapping ¶
The MetricMapper converts internal metrics to OTEL format:
- Counters (cumulative values)
- Gauges (instantaneous values)
- Histograms (distribution buckets)
- Summaries (quantile values)
Configuration ¶
Key configuration options:
{
"endpoint": "localhost:4317", // OTEL collector endpoint
"protocol": "grpc", // "grpc" or "http"
"service_name": "semstreams", // Service name for traces
"export_traces": true, // Enable trace export
"export_metrics": true, // Enable metric export
"batch_timeout": "5s", // Batch export interval
"sampling_rate": 1.0 // Trace sampling rate (0.0-1.0)
}
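As a rough illustration of how these options might be decoded and checked, the sketch below uses a local exporterConfig struct that mirrors a subset of the documented fields. The struct, parseConfig, and the validation rules are assumptions made for the example, not the package's own code. Note that the // comments in the listing above are annotations; encoding/json rejects them, so real configuration must omit them.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// exporterConfig mirrors a subset of the documented options (illustrative).
type exporterConfig struct {
	Endpoint     string  `json:"endpoint"`
	Protocol     string  `json:"protocol"`
	ServiceName  string  `json:"service_name"`
	BatchTimeout string  `json:"batch_timeout"`
	SamplingRate float64 `json:"sampling_rate"`
}

// parseConfig decodes raw JSON and applies hypothetical validation rules.
// It returns the config together with the parsed batch timeout.
func parseConfig(raw []byte) (exporterConfig, time.Duration, error) {
	var cfg exporterConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, 0, fmt.Errorf("decode config: %w", err)
	}
	if cfg.Protocol != "grpc" && cfg.Protocol != "http" {
		return cfg, 0, fmt.Errorf("unsupported protocol %q", cfg.Protocol)
	}
	if cfg.SamplingRate < 0 || cfg.SamplingRate > 1 {
		return cfg, 0, fmt.Errorf("sampling_rate %v outside [0.0, 1.0]", cfg.SamplingRate)
	}
	timeout, err := time.ParseDuration(cfg.BatchTimeout)
	if err != nil {
		return cfg, 0, fmt.Errorf("batch_timeout: %w", err)
	}
	return cfg, timeout, nil
}

func main() {
	raw := []byte(`{
		"endpoint": "localhost:4317",
		"protocol": "grpc",
		"service_name": "semstreams",
		"batch_timeout": "5s",
		"sampling_rate": 1.0
	}`)
	cfg, timeout, err := parseConfig(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(cfg.Endpoint, cfg.Protocol, timeout)
}
```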
NATS Subjects ¶
The exporter subscribes to agent events from JetStream:
| Stream         | Subject   | Purpose                    |
|----------------|-----------|----------------------------|
| AGENT_EVENTS   | agent.>   | All agent lifecycle events |
Integration with OTEL SDK ¶
This package provides the data collection and transformation layer. For actual export to OTEL collectors, integrate with the OTEL Go SDK:
import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
)
The Exporter interface allows plugging in real OTEL exporters or using the stub implementation for testing.
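To show what plugging in an exporter might look like, here is a minimal in-memory implementation of the interface. It is a sketch under assumptions: SpanData and MetricData are reduced local stand-ins for the package types, and memoryExporter and exportDemo are hypothetical names. A real exporter would forward to the OTEL SDK instead of appending to slices.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// SpanData and MetricData are minimal local stand-ins for the package types.
type SpanData struct{ TraceID, SpanID, Name string }
type MetricData struct{ Name string }

// Exporter repeats the interface documented under "type Exporter".
type Exporter interface {
	ExportSpans(ctx context.Context, spans []*SpanData) error
	ExportMetrics(ctx context.Context, metrics []*MetricData) error
	Shutdown(ctx context.Context) error
}

var errClosed = errors.New("exporter is shut down")

// memoryExporter is a hypothetical test double that records exports in memory.
type memoryExporter struct {
	mu      sync.Mutex
	spans   []*SpanData
	metrics []*MetricData
	closed  bool
}

func (m *memoryExporter) ExportSpans(ctx context.Context, spans []*SpanData) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return errClosed
	}
	m.spans = append(m.spans, spans...)
	return nil
}

func (m *memoryExporter) ExportMetrics(ctx context.Context, metrics []*MetricData) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return errClosed
	}
	m.metrics = append(m.metrics, metrics...)
	return nil
}

func (m *memoryExporter) Shutdown(context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

// exportDemo pushes one span and one metric through the interface and
// reports how many of each the stub recorded.
func exportDemo() (spans, metrics int, err error) {
	mem := &memoryExporter{}
	var exp Exporter = mem
	ctx := context.Background()
	if err = exp.ExportSpans(ctx, []*SpanData{{Name: "agent.loop"}}); err != nil {
		return 0, 0, err
	}
	if err = exp.ExportMetrics(ctx, []*MetricData{{Name: "agent.loops"}}); err != nil {
		return 0, 0, err
	}
	if err = exp.Shutdown(ctx); err != nil {
		return 0, 0, err
	}
	return len(mem.spans), len(mem.metrics), nil
}

func main() {
	s, m, err := exportDemo()
	fmt.Println(s, m, err)
}
```

A type like this, written against the package's own SpanData and MetricData, is the kind of value Component.SetExporter accepts.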
Example Flow Configuration ¶
components:
  - type: output
    name: otel-exporter
    config:
      endpoint: "localhost:4317"
      protocol: "grpc"
      service_name: "my-agent-system"
      export_traces: true
      export_metrics: true
      sampling_rate: 0.1 # Sample 10% of traces
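The documentation does not say how sampling_rate is applied. One common approach, sketched below purely as an assumption, is to decide per trace from the trace ID itself so that every span of a trace gets the same decision. The sampled function is a hypothetical name and is not part of the package.

```go
package main

import (
	"fmt"
	"strconv"
)

// sampled reports whether a trace should be kept at the given rate.
// It interprets the first 16 hex characters of the trace ID as a number
// and compares its position in [0, 1) against the rate. Hypothetical.
func sampled(traceID string, rate float64) bool {
	switch {
	case rate >= 1:
		return true
	case rate <= 0 || len(traceID) < 16:
		return false
	}
	v, err := strconv.ParseUint(traceID[:16], 16, 64)
	if err != nil {
		return false
	}
	// Keep the top 53 bits so the value converts to float64 exactly,
	// then scale it into [0, 1).
	fraction := float64(v>>11) / float64(1<<53)
	return fraction < rate
}

func main() {
	ids := []string{
		"00000000000000000000000000000001",
		"ffffffffffffffff0000000000000000",
	}
	for _, id := range ids {
		fmt.Println(id, sampled(id, 0.1))
	}
}
```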
Trace Correlation ¶
Traces are correlated using the agent loop ID as the trace seed:
TraceID = hash(loop_id)  → 32-character hex
SpanID  = hash(span_key) → 16-character hex
This ensures consistent trace IDs across distributed agent executions and allows correlating spans from multiple components.
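The derivation can be sketched as follows. The text above only says "hash", so the use of SHA-256 here is an assumption, as are the function names and the span key format. The output lengths match OTEL's 16-byte trace IDs and 8-byte span IDs.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// traceIDFromLoop derives a 32-character hex trace ID from a loop ID.
// SHA-256 is an assumed hash; the package may use a different one.
func traceIDFromLoop(loopID string) string {
	sum := sha256.Sum256([]byte(loopID))
	return hex.EncodeToString(sum[:16]) // 16 bytes -> 32 hex characters
}

// spanIDFromKey derives a 16-character hex span ID from a span key.
func spanIDFromKey(spanKey string) string {
	sum := sha256.Sum256([]byte(spanKey))
	return hex.EncodeToString(sum[:8]) // 8 bytes -> 16 hex characters
}

func main() {
	// The loop ID and span key values below are made up for the example.
	fmt.Println(traceIDFromLoop("loop-42"))
	fmt.Println(spanIDFromKey("loop-42/task/t1"))
}
```

Because the IDs are pure functions of their inputs, any component that knows the loop ID can compute the same trace ID without coordination.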
References ¶
- ADR-019: AGNTCY Integration
- OpenTelemetry Specification: https://opentelemetry.io/docs/specs/
- OTEL Go SDK: https://pkg.go.dev/go.opentelemetry.io/otel
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type AgentEvent
- type BucketCount
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) GetMetricMapper() *MetricMapper
- func (c *Component) GetSpanCollector() *SpanCollector
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) SetExporter(exp Exporter)
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(_ time.Duration) error
- type Config
- type DataPoint
- type Exporter
- type MetricData
- type MetricMapper
- func (mm *MetricMapper) FlushMetrics() []*MetricData
- func (mm *MetricMapper) MapFromPrometheus(prom *PrometheusMetric)
- func (mm *MetricMapper) RecordAgentMetrics(loopID, role string, stats map[string]int64)
- func (mm *MetricMapper) RecordCounter(name, description, unit string, value float64, attrs map[string]any)
- func (mm *MetricMapper) RecordGauge(name, description, unit string, value float64, attrs map[string]any)
- func (mm *MetricMapper) RecordHistogram(name, description, unit string, count uint64, sum float64, ...)
- func (mm *MetricMapper) RecordSummary(name, description, unit string, count uint64, sum float64, ...)
- func (mm *MetricMapper) Stats() map[string]int64
- type MetricType
- type PrometheusMetric
- type QuantileValue
- type RegistryInterface
- type SpanCollector
- type SpanData
- type SpanEvent
- type SpanLink
- type SpanStatus
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new OTEL exporter component.
func Register ¶
func Register(registry RegistryInterface) error
Register registers the OTEL exporter component with the given registry.
Types ¶
type AgentEvent ¶
type AgentEvent struct {
// Type is the event type (loop.created, loop.completed, loop.failed, etc.)
Type string `json:"type"`
// LoopID is the agent loop identifier.
LoopID string `json:"loop_id"`
// TaskID is the task identifier (for task events).
TaskID string `json:"task_id,omitempty"`
// ToolName is the tool name (for tool events).
ToolName string `json:"tool_name,omitempty"`
// Timestamp is when the event occurred.
Timestamp time.Time `json:"timestamp"`
// EntityID is the agent's entity ID.
EntityID string `json:"entity_id,omitempty"`
// Role is the agent's role.
Role string `json:"role,omitempty"`
// Error is the error message for failure events.
Error string `json:"error,omitempty"`
// Duration is the operation duration (for completion events).
Duration time.Duration `json:"duration,omitempty"`
// Metadata contains additional event metadata.
Metadata map[string]any `json:"metadata,omitempty"`
}
AgentEvent represents an agent lifecycle event from NATS.
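For orientation, the sketch below decodes a sample payload into a local copy of a few AgentEvent fields. The payload values are invented, and the example assumes standard encoding/json behavior, under which a time.Duration field is read as an integer number of nanoseconds and a time.Time field as an RFC 3339 string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// agentEvent copies a subset of the documented AgentEvent fields.
type agentEvent struct {
	Type      string        `json:"type"`
	LoopID    string        `json:"loop_id"`
	TaskID    string        `json:"task_id,omitempty"`
	Timestamp time.Time     `json:"timestamp"`
	Error     string        `json:"error,omitempty"`
	Duration  time.Duration `json:"duration,omitempty"`
}

// decodeEvent parses one JSON-encoded event and rejects events without
// a type or loop ID (a hypothetical minimum for building a span).
func decodeEvent(data []byte) (agentEvent, error) {
	var ev agentEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return ev, fmt.Errorf("decode event: %w", err)
	}
	if ev.Type == "" || ev.LoopID == "" {
		return ev, fmt.Errorf("event missing type or loop_id")
	}
	return ev, nil
}

func main() {
	payload := []byte(`{
		"type": "task.failed",
		"loop_id": "loop-42",
		"task_id": "t1",
		"timestamp": "2024-01-02T03:04:05Z",
		"error": "timeout",
		"duration": 1500000000
	}`)
	ev, err := decodeEvent(payload)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(ev.Type, ev.LoopID, ev.TaskID, ev.Duration, ev.Error)
}
```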
type BucketCount ¶
type BucketCount struct {
// UpperBound is the bucket upper bound.
UpperBound float64 `json:"upper_bound"`
// Count is the cumulative count.
Count uint64 `json:"count"`
}
BucketCount represents a histogram bucket.
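Since Count is cumulative, each bucket includes every observation at or below its upper bound. The sketch below builds such buckets from raw observations. cumulativeBuckets is a hypothetical helper, and the local BucketCount type omits the JSON tags of the real one.

```go
package main

import "fmt"

// BucketCount is a local copy of the documented type, without JSON tags.
type BucketCount struct {
	UpperBound float64
	Count      uint64
}

// cumulativeBuckets returns one bucket per upper bound, where each Count
// is the number of observations less than or equal to that bound.
// Observations above the last bound are not counted in any bucket.
func cumulativeBuckets(bounds, observations []float64) []BucketCount {
	out := make([]BucketCount, len(bounds))
	for i, bound := range bounds {
		out[i].UpperBound = bound
		for _, v := range observations {
			if v <= bound {
				out[i].Count++
			}
		}
	}
	return out
}

func main() {
	bounds := []float64{0.1, 0.5, 1.0}
	observations := []float64{0.05, 0.3, 0.4, 0.9, 2.0}
	for _, b := range cumulativeBuckets(bounds, observations) {
		fmt.Printf("le %.1f: %d\n", b.UpperBound, b.Count)
	}
}
```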
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the OTEL exporter component. It collects spans and metrics from agent events and exports them to OTEL collectors.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics.
func (*Component) GetMetricMapper ¶
func (c *Component) GetMetricMapper() *MetricMapper
GetMetricMapper returns the metric mapper (for testing).
func (*Component) GetSpanCollector ¶
func (c *Component) GetSpanCollector() *SpanCollector
GetSpanCollector returns the span collector (for testing).
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status.
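func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component.
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions.
func (*Component) SetExporter ¶
func (c *Component) SetExporter(exp Exporter)
SetExporter sets the OTEL exporter (for testing).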
type Config ¶
type Config struct {
// Ports defines the input/output port configuration.
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
// Endpoint is the OTEL collector endpoint.
Endpoint string `json:"endpoint" schema:"type:string,description:OTEL collector endpoint,category:basic,default:localhost:4317"`
// Protocol specifies the export protocol.
// Supported values: "grpc", "http"
Protocol string `json:"protocol" schema:"type:string,description:Export protocol,category:basic,default:grpc"`
// ServiceName is the service name for OTEL traces.
ServiceName string `json:"service_name" schema:"type:string,description:Service name for traces,category:basic,default:semstreams"`
// ServiceVersion is the service version for OTEL traces.
ServiceVersion string `json:"service_version" schema:"type:string,description:Service version,category:basic,default:1.0.0"`
// ExportTraces enables trace export.
ExportTraces bool `json:"export_traces" schema:"type:bool,description:Enable trace export,category:basic,default:true"`
// ExportMetrics enables metric export.
ExportMetrics bool `json:"export_metrics" schema:"type:bool,description:Enable metric export,category:basic,default:true"`
// ExportLogs enables log export.
ExportLogs bool `json:"export_logs" schema:"type:bool,description:Enable log export,category:basic,default:false"`
// BatchTimeout is the timeout for batching exports.
BatchTimeout string `json:"batch_timeout" schema:"type:string,description:Batch export timeout,category:advanced,default:5s"`
// MaxBatchSize is the maximum number of items per batch.
MaxBatchSize int `json:"max_batch_size" schema:"type:int,description:Maximum batch size,category:advanced,default:512"`
// MaxExportBatchSize is the maximum number of items per export.
MaxExportBatchSize int `json:"max_export_batch_size" schema:"type:int,description:Max export batch size,category:advanced,default:512"`
// ExportTimeout is the timeout for each export operation.
ExportTimeout string `json:"export_timeout" schema:"type:string,description:Export operation timeout,category:advanced,default:30s"`
// Insecure allows insecure connections to the collector.
Insecure bool `json:"insecure" schema:"type:bool,description:Allow insecure connections,category:security,default:true"`
// Headers are additional headers to send with exports.
Headers map[string]string `json:"headers" schema:"type:object,description:Additional export headers,category:advanced"`
// ResourceAttributes are additional resource attributes.
ResourceAttributes map[string]string `json:"resource_attributes" schema:"type:object,description:Resource attributes,category:advanced"`
// SamplingRate is the trace sampling rate (0.0 to 1.0).
SamplingRate float64 `json:"sampling_rate" schema:"type:float,description:Trace sampling rate,category:advanced,default:1.0"`
// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}
Config defines the configuration for the OTEL exporter component.
func (*Config) GetBatchTimeout ¶
GetBatchTimeout returns the batch timeout duration.
func (*Config) GetExportTimeout ¶
GetExportTimeout returns the export timeout duration.
type DataPoint ¶
type DataPoint struct {
// Timestamp is when the data point was recorded.
Timestamp time.Time `json:"timestamp"`
// Value is the metric value (for counter/gauge).
Value float64 `json:"value,omitempty"`
// Count is the count (for histogram/summary).
Count uint64 `json:"count,omitempty"`
// Sum is the sum (for histogram/summary).
Sum float64 `json:"sum,omitempty"`
// Buckets are histogram bucket counts.
Buckets []BucketCount `json:"buckets,omitempty"`
// Quantiles are summary quantile values.
Quantiles []QuantileValue `json:"quantiles,omitempty"`
// Attributes are data point attributes.
Attributes map[string]any `json:"attributes,omitempty"`
}
DataPoint represents a single metric data point.
type Exporter ¶
type Exporter interface {
// ExportSpans exports spans to the OTEL collector.
ExportSpans(ctx context.Context, spans []*SpanData) error
// ExportMetrics exports metrics to the OTEL collector.
ExportMetrics(ctx context.Context, metrics []*MetricData) error
// Shutdown gracefully shuts down the exporter.
Shutdown(ctx context.Context) error
}
Exporter defines the interface for OTEL export operations. The package ships only a stub implementation; a full implementation requires the OTEL SDK.
type MetricData ¶
type MetricData struct {
// Name is the metric name.
Name string `json:"name"`
// Description describes the metric.
Description string `json:"description,omitempty"`
// Unit is the metric unit.
Unit string `json:"unit,omitempty"`
// Type is the metric type.
Type MetricType `json:"type"`
// DataPoints contains the metric values.
DataPoints []DataPoint `json:"data_points"`
// Attributes are metric-level attributes.
Attributes map[string]any `json:"attributes,omitempty"`
}
MetricData represents a metric ready for export.
type MetricMapper ¶
type MetricMapper struct {
// contains filtered or unexported fields
}
MetricMapper maps internal metrics to OTEL format.
func NewMetricMapper ¶
func NewMetricMapper(serviceName, serviceVersion string) *MetricMapper
NewMetricMapper creates a new metric mapper.
func (*MetricMapper) FlushMetrics ¶
func (mm *MetricMapper) FlushMetrics() []*MetricData
FlushMetrics returns and clears all collected metrics.
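"Returns and clears" means each collected item is handed to the caller exactly once. A common way to get that behavior is to swap the internal slice under a lock, as in the sketch below. The metricBuffer type and the metric names are hypothetical; how MetricMapper implements this internally is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// metricBuffer is a hypothetical stand-in for a collector's internal state.
type metricBuffer struct {
	mu    sync.Mutex
	names []string
}

// record appends one item to the buffer.
func (b *metricBuffer) record(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.names = append(b.names, name)
}

// flush returns everything recorded so far and resets the buffer, so a
// second call without new records returns nothing.
func (b *metricBuffer) flush() []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.names
	b.names = nil
	return out
}

func main() {
	b := &metricBuffer{}
	b.record("agent.loops.total")
	b.record("agent.tasks.total")
	first := b.flush()
	second := b.flush()
	fmt.Println(len(first), len(second))
}
```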
func (*MetricMapper) MapFromPrometheus ¶
func (mm *MetricMapper) MapFromPrometheus(prom *PrometheusMetric)
MapFromPrometheus converts Prometheus metrics to OTEL format.
func (*MetricMapper) RecordAgentMetrics ¶
func (mm *MetricMapper) RecordAgentMetrics(loopID, role string, stats map[string]int64)
RecordAgentMetrics records standard agent metrics.
func (*MetricMapper) RecordCounter ¶
func (mm *MetricMapper) RecordCounter(name, description, unit string, value float64, attrs map[string]any)
RecordCounter records a counter metric.
func (*MetricMapper) RecordGauge ¶
func (mm *MetricMapper) RecordGauge(name, description, unit string, value float64, attrs map[string]any)
RecordGauge records a gauge metric.
func (*MetricMapper) RecordHistogram ¶
func (mm *MetricMapper) RecordHistogram(name, description, unit string, count uint64, sum float64, buckets []BucketCount, attrs map[string]any)
RecordHistogram records a histogram metric.
func (*MetricMapper) RecordSummary ¶
func (mm *MetricMapper) RecordSummary(name, description, unit string, count uint64, sum float64, quantiles []QuantileValue, attrs map[string]any)
RecordSummary records a summary metric.
func (*MetricMapper) Stats ¶
func (mm *MetricMapper) Stats() map[string]int64
Stats returns mapper statistics.
type MetricType ¶
type MetricType string
MetricType represents the type of metric.
const (
	MetricTypeCounter   MetricType = "counter"
	MetricTypeGauge     MetricType = "gauge"
	MetricTypeHistogram MetricType = "histogram"
	MetricTypeSummary   MetricType = "summary"
)
MetricType constants for supported OTEL metric types.
type PrometheusMetric ¶
type PrometheusMetric struct {
Name string
Help string
Type string
Labels map[string]string
Value float64
// For histograms
Buckets map[float64]uint64
Count uint64
Sum float64
// For summaries
Quantiles map[float64]float64
}
PrometheusMetric represents a Prometheus-style metric for mapping to OTEL format.
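PrometheusMetric stores histogram buckets in a map, and Go map iteration order is unspecified, so a conversion to an ordered []BucketCount needs an explicit sort. The sketch below shows one way to do that. sortedBuckets is a hypothetical helper and may differ from what MapFromPrometheus does.

```go
package main

import (
	"fmt"
	"sort"
)

// BucketCount is a local copy of the documented type, without JSON tags.
type BucketCount struct {
	UpperBound float64
	Count      uint64
}

// sortedBuckets converts a map of upper bound to count into a slice
// ordered by ascending upper bound.
func sortedBuckets(buckets map[float64]uint64) []BucketCount {
	out := make([]BucketCount, 0, len(buckets))
	for upperBound, count := range buckets {
		out = append(out, BucketCount{UpperBound: upperBound, Count: count})
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].UpperBound < out[j].UpperBound
	})
	return out
}

func main() {
	buckets := map[float64]uint64{1.0: 4, 0.1: 1, 0.5: 3}
	for _, b := range sortedBuckets(buckets) {
		fmt.Printf("le %.1f: %d\n", b.UpperBound, b.Count)
	}
}
```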
type QuantileValue ¶
type QuantileValue struct {
// Quantile is the quantile (e.g., 0.5, 0.9, 0.99).
Quantile float64 `json:"quantile"`
// Value is the quantile value.
Value float64 `json:"value"`
}
QuantileValue represents a summary quantile.
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(config component.RegistrationConfig) error
}
RegistryInterface defines the interface for component registration.
type SpanCollector ¶
type SpanCollector struct {
// contains filtered or unexported fields
}
SpanCollector collects spans from agent events.
func NewSpanCollector ¶
func NewSpanCollector(serviceName, serviceVersion string, samplingRate float64) *SpanCollector
NewSpanCollector creates a new span collector.
func (*SpanCollector) FlushCompleted ¶
func (sc *SpanCollector) FlushCompleted() []*SpanData
FlushCompleted returns and clears completed spans.
func (*SpanCollector) ProcessEvent ¶
func (sc *SpanCollector) ProcessEvent(_ context.Context, data []byte) error
ProcessEvent processes an agent event and creates/updates spans.
func (*SpanCollector) Stats ¶
func (sc *SpanCollector) Stats() map[string]int64
Stats returns collector statistics.
type SpanData ¶
type SpanData struct {
// TraceID is the trace identifier.
TraceID string `json:"trace_id"`
// SpanID is the span identifier.
SpanID string `json:"span_id"`
// ParentSpanID is the parent span identifier.
ParentSpanID string `json:"parent_span_id,omitempty"`
// Name is the span name.
Name string `json:"name"`
// Kind is the span kind (client, server, internal, producer, consumer).
Kind string `json:"kind"`
// StartTime is when the span started.
StartTime time.Time `json:"start_time"`
// EndTime is when the span ended.
EndTime time.Time `json:"end_time,omitempty"`
// Status indicates the span status.
Status SpanStatus `json:"status"`
// Attributes are span attributes.
Attributes map[string]any `json:"attributes,omitempty"`
// Events are span events.
Events []SpanEvent `json:"events,omitempty"`
// Links are span links.
Links []SpanLink `json:"links,omitempty"`
}
SpanData represents collected span information.
type SpanEvent ¶
type SpanEvent struct {
// Name is the event name.
Name string `json:"name"`
// Timestamp is when the event occurred.
Timestamp time.Time `json:"timestamp"`
// Attributes are event attributes.
Attributes map[string]any `json:"attributes,omitempty"`
}
SpanEvent represents an event within a span.
type SpanLink ¶
type SpanLink struct {
// TraceID is the linked trace ID.
TraceID string `json:"trace_id"`
// SpanID is the linked span ID.
SpanID string `json:"span_id"`
// Attributes are link attributes.
Attributes map[string]any `json:"attributes,omitempty"`
}
SpanLink represents a link to another span.
type SpanStatus ¶
type SpanStatus struct {
// Code is the status code (unset, ok, error).
Code string `json:"code"`
// Message is an optional status message.
Message string `json:"message,omitempty"`
}
SpanStatus represents the status of a span.