otel

package
v1.0.0-alpha.13
Published: Mar 8, 2026 License: MIT Imports: 11 Imported by: 0

README

OTEL Exporter Component

The OTEL exporter component exports agent telemetry to OpenTelemetry collectors. It subscribes to agent lifecycle events from NATS JetStream and converts them into OpenTelemetry spans and metrics for observability and distributed tracing.

Overview

This component implements the AGNTCY integration pattern for observability, collecting telemetry from agentic workflows and making it available to standard OTEL tooling (Jaeger, Zipkin, Prometheus, etc.).

Key Features:

  • Automatic span collection from agent lifecycle events
  • Hierarchical trace construction (loop → task → tool)
  • Metric mapping from internal to OTEL format
  • Batched export with configurable intervals
  • Sampling support for high-volume scenarios
  • Protocol support for gRPC and HTTP

Architecture

flowchart TB
    subgraph NATS["NATS JetStream"]
        AS[AGENT_EVENTS Stream]
    end

    subgraph OTEL["OTEL Exporter Component"]
        SUB[Event Subscriber]
        SC[SpanCollector]
        MM[MetricMapper]
        EL[Export Loop]
    end

    subgraph Export["OTEL Collectors"]
        TC[Trace Collector]
        MC[Metric Collector]
    end

    AS -->|agent.>| SUB
    SUB --> SC
    SUB --> MM
    SC --> EL
    MM --> EL
    EL -->|gRPC/HTTP| TC
    EL -->|gRPC/HTTP| MC

    classDef natsStyle fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
    classDef otelStyle fill:#fff3e0,stroke:#ff9800,stroke-width:2px
    classDef exportStyle fill:#e8f5e9,stroke:#4caf50,stroke-width:2px

    class AS natsStyle
    class SUB,SC,MM,EL otelStyle
    class TC,MC exportStyle
Data Flow
  1. Event Ingestion: Subscribe to agent.> subject on AGENT_EVENTS stream
  2. Span Collection: Convert lifecycle events to OTEL spans with trace hierarchy
  3. Metric Mapping: Transform internal metrics to OTEL metric format
  4. Batch Export: Periodically export completed spans and metrics to collectors
  5. Trace Correlation: Link spans via trace ID derived from loop ID

Configuration

Basic Configuration
{
  "type": "output",
  "name": "otel-exporter",
  "config": {
    "endpoint": "localhost:4317",
    "protocol": "grpc",
    "service_name": "semstreams",
    "service_version": "1.0.0",
    "export_traces": true,
    "export_metrics": true
  }
}
Configuration Options
| Option                | Type   | Default        | Description                         |
|-----------------------|--------|----------------|-------------------------------------|
| endpoint              | string | localhost:4317 | OTEL collector endpoint address     |
| protocol              | string | grpc           | Export protocol (grpc or http)      |
| service_name          | string | semstreams     | Service name for traces and metrics |
| service_version       | string | 1.0.0          | Service version                     |
| export_traces         | bool   | true           | Enable trace export                 |
| export_metrics        | bool   | true           | Enable metric export                |
| export_logs           | bool   | false          | Enable log export (future)          |
| batch_timeout         | string | 5s             | Batch export interval               |
| max_batch_size        | int    | 512            | Maximum items per batch             |
| max_export_batch_size | int    | 512            | Maximum items per export            |
| export_timeout        | string | 30s            | Timeout for export operations       |
| insecure              | bool   | true           | Allow insecure connections          |
| headers               | object | {}             | Additional headers for export       |
| resource_attributes   | object | {}             | Additional resource attributes      |
| sampling_rate         | float  | 1.0            | Trace sampling rate (0.0-1.0)       |
Advanced Configuration
{
  "config": {
    "endpoint": "otel-collector.monitoring.svc.cluster.local:4317",
    "protocol": "grpc",
    "service_name": "agent-orchestrator",
    "service_version": "2.1.0",
    "export_traces": true,
    "export_metrics": true,
    "batch_timeout": "10s",
    "sampling_rate": 0.1,
    "headers": {
      "X-API-Key": "secret-key"
    },
    "resource_attributes": {
      "deployment.environment": "production",
      "service.namespace": "agents"
    }
  }
}

Span Collection

The SpanCollector converts agent lifecycle events into hierarchical OTEL spans with automatic trace linking.

Event to Span Mapping
flowchart TD
    subgraph Events["Agent Events"]
        LC[loop.created]
        LD[loop.completed/failed]
        TS[task.started]
        TD[task.completed/failed]
        TLS[tool.started]
        TLD[tool.completed/failed]
    end

    subgraph Spans["OTEL Spans"]
        RS[Root Span<br/>agent.loop]
        TSP[Task Span<br/>agent.task]
        TLSP[Tool Span<br/>agent.tool.X]
    end

    LC -->|Start| RS
    LD -->|End| RS
    TS -->|Start Child| TSP
    TD -->|End Child| TSP
    TLS -->|Start Child| TLSP
    TLD -->|End Child| TLSP

    RS --> TSP
    TSP --> TLSP

    classDef eventStyle fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef spanStyle fill:#fff3e0,stroke:#f57c00,stroke-width:2px

    class LC,LD,TS,TD,TLS,TLD eventStyle
    class RS,TSP,TLSP spanStyle
Span Hierarchy

Each agent execution creates a trace with the following structure:

Root Span (agent.loop)
├── Task Span 1 (agent.task)
│   ├── Tool Span A (agent.tool.query_graph)
│   └── Tool Span B (agent.tool.update_memory)
├── Task Span 2 (agent.task)
│   └── Tool Span C (agent.tool.call_llm)
└── Task Span 3 (agent.task)
Span Attributes

Each span includes contextual attributes based on the event type:

Root Span (agent.loop):

  • agent.loop_id: Loop identifier
  • agent.entity_id: Agent entity ID
  • agent.role: Agent role
  • service.name: Service name
  • service.version: Service version

Task Span (agent.task):

  • agent.loop_id: Parent loop ID
  • agent.task_id: Task identifier
  • task.duration_ms: Task duration
  • task.*: Additional task metadata

Tool Span (agent.tool.{name}):

  • agent.loop_id: Parent loop ID
  • agent.task_id: Parent task ID
  • tool.name: Tool name
  • tool.duration_ms: Tool execution duration
  • tool.*: Additional tool metadata
Trace Correlation

Traces are correlated using deterministic trace IDs derived from the loop ID:

TraceID = hashToHex(loop_id, 32)  // 32-character hex string
SpanID  = hashToHex(span_key, 16) // 16-character hex string

This ensures:

  • Consistent trace IDs across distributed components
  • Automatic span linking without explicit context propagation
  • Replay-friendly trace reconstruction
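
The implementation of hashToHex is not shown in this document. As a rough sketch of the idea, the following assumes a SHA-256 digest truncated to the requested number of hex characters; both the hash function and the span key format are assumptions made for illustration only.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashToHex returns the first n hex characters of the SHA-256 digest of key.
// SHA-256 is an assumption of this sketch, not necessarily what the package
// uses. n must be at most 64, the hex length of a SHA-256 digest.
func hashToHex(key string, n int) string {
	sum := sha256.Sum256([]byte(key))
	return hex.EncodeToString(sum[:])[:n]
}

func main() {
	loopID := "loop-123"
	traceID := hashToHex(loopID, 32)
	// The span key format "loopID/taskID" is hypothetical.
	spanID := hashToHex(loopID+"/task-1", 16)

	fmt.Println("trace:", traceID)
	fmt.Println("span: ", spanID)
	// The same loop ID always yields the same trace ID.
	fmt.Println("stable:", traceID == hashToHex(loopID, 32))
}
```

Because the IDs depend only on their inputs, any component that knows the loop ID can compute the same trace ID without receiving it over the wire.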

Metric Mapping

The MetricMapper converts internal metrics to OpenTelemetry format, supporting multiple metric types.

Supported Metric Types
flowchart LR
    subgraph Internal["Internal Metrics"]
        IC[Counters]
        IG[Gauges]
        IH[Histograms]
        IS[Summaries]
    end

    subgraph OTEL["OTEL Metrics"]
        OC[Counter<br/>cumulative]
        OG[Gauge<br/>instantaneous]
        OH[Histogram<br/>buckets]
        OS[Summary<br/>quantiles]
    end

    IC --> OC
    IG --> OG
    IH --> OH
    IS --> OS

    classDef internalStyle fill:#e8eaf6,stroke:#3f51b5,stroke-width:2px
    classDef otelStyle fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px

    class IC,IG,IH,IS internalStyle
    class OC,OG,OH,OS otelStyle
Metric Examples

Counter (Cumulative):

{
  "name": "agent.tasks_completed",
  "type": "counter",
  "value": 42,
  "attributes": {
    "agent.loop_id": "loop-123",
    "agent.role": "planner"
  }
}

Gauge (Instantaneous):

{
  "name": "agent.active_tasks",
  "type": "gauge",
  "value": 3,
  "attributes": {
    "agent.loop_id": "loop-123"
  }
}

Histogram (Distribution):

{
  "name": "agent.task_duration_ms",
  "type": "histogram",
  "count": 100,
  "sum": 5420.5,
  "buckets": [
    {"upper_bound": 10, "count": 5},
    {"upper_bound": 50, "count": 45},
    {"upper_bound": 100, "count": 85},
    {"upper_bound": 500, "count": 100}
  ]
}

Summary (Quantiles):

{
  "name": "agent.tool_latency_ms",
  "type": "summary",
  "count": 1000,
  "sum": 12500.0,
  "quantiles": [
    {"quantile": 0.5, "value": 10.2},
    {"quantile": 0.9, "value": 25.6},
    {"quantile": 0.99, "value": 48.3}
  ]
}
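
In the histogram example above the bucket counts are cumulative: each count includes every observation at or below its upper bound, which is why the last bucket equals the total count of 100. A minimal sketch of how such buckets can be derived from raw observations follows. BucketCount mirrors the package type; cumulativeBuckets is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// BucketCount mirrors the package's histogram bucket type.
type BucketCount struct {
	UpperBound float64
	Count      uint64
}

// cumulativeBuckets (hypothetical helper) counts, for each upper bound, how
// many observations are less than or equal to it. Observations above the
// largest bound are not counted in any bucket.
func cumulativeBuckets(bounds, observations []float64) []BucketCount {
	out := make([]BucketCount, 0, len(bounds))
	for _, b := range bounds {
		var n uint64
		for _, v := range observations {
			if v <= b {
				n++
			}
		}
		out = append(out, BucketCount{UpperBound: b, Count: n})
	}
	return out
}

func main() {
	durations := []float64{5, 20, 20, 75, 100} // invented sample data, in ms
	for _, b := range cumulativeBuckets([]float64{10, 50, 100}, durations) {
		fmt.Printf("le=%g count=%d\n", b.UpperBound, b.Count)
	}
}
```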
Prometheus Integration

The MetricMapper supports mapping from Prometheus-style metrics:

mapper.MapFromPrometheus(&PrometheusMetric{
    Name:   "http_requests_total",
    Help:   "Total HTTP requests",
    Type:   "counter",
    Labels: map[string]string{"method": "GET", "status": "200"},
    Value:  1523,
})

NATS Topology

Input Subjects

The exporter subscribes to agent events from JetStream:

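| Port Name    | Stream       | Subject | Type      | Description                |
|--------------|--------------|---------|-----------|----------------------------|
| agent_events | AGENT_EVENTS | agent.> | JetStream | All agent lifecycle events |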
Event Subject Hierarchy
agent.
├── loop.
│   ├── created
│   ├── completed
│   └── failed
├── task.
│   ├── started
│   ├── completed
│   └── failed
└── tool.
    ├── started
    ├── completed
    └── failed
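
The agent.> filter relies on NATS wildcard rules: * matches exactly one token and > matches one or more trailing tokens. The sketch below illustrates those rules for the subjects in the hierarchy above. It is a simplified matcher written for this document (subjectMatches is a hypothetical name), not code taken from NATS or from this package, and it does not validate subjects.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style filter.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the last filter token and needs at least one
			// subject token left to consume.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	for _, subj := range []string{"agent.loop.created", "agent.tool.failed", "agent", "metrics.loop.created"} {
		fmt.Printf("%-22s matches agent.>: %v\n", subj, subjectMatches("agent.>", subj))
	}
}
```

A narrower filter such as agent.*.failed would select only failure events across loops, tasks, and tools.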
Consumer Configuration

The exporter creates a durable consumer with the following configuration:

ConsumerConfig{
    Name:          "otel-exporter",
    Durable:       "otel-exporter",
    FilterSubject: "agent.>",
    AckPolicy:     jetstream.AckExplicitPolicy,
    DeliverPolicy: jetstream.DeliverNewPolicy,
}

Integration with OTEL Collectors

Collector Configuration

To receive data from the exporter, configure your OTEL collector with the appropriate receivers:

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 10s
    send_batch_size: 512

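exporters:
  # The dedicated jaeger exporter has been removed from current collector
  # releases. Jaeger accepts OTLP natively, so traces are sent over OTLP gRPC.
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true
  prometheus:
    endpoint: 0.0.0.0:8889

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp/jaeger]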
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]
Docker Compose Example
version: '3.8'
services:
  otel-collector:
    image: otel/opentelemetry-collector:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"  # OTLP gRPC
      - "4318:4318"  # OTLP HTTP
      - "8889:8889"  # Prometheus exporter

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # Jaeger UI
      - "14250:14250"  # Jaeger gRPC collector (model.proto)

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
Viewing Traces

After starting the collectors, access the Jaeger UI at http://localhost:16686 to view distributed traces:

  1. Select service: semstreams (or your configured service name)
  2. Find traces by operation: agent.loop, agent.task, agent.tool.*
  3. View trace timeline with parent-child relationships
  4. Inspect span attributes for debugging

Usage Example

Flow Configuration
components:
  - type: output
    name: agent-telemetry
    config:
      endpoint: "localhost:4317"
      protocol: "grpc"
      service_name: "agent-orchestrator"
      export_traces: true
      export_metrics: true
      sampling_rate: 1.0
      ports:
        inputs:
          - name: agent_events
            type: jetstream
            stream_name: AGENT_EVENTS
            subject: "agent.>"
Programmatic Usage
package main

import (
    "context"
    "encoding/json"
    "log/slog"
    "os"
    "time"

    "github.com/c360studio/semstreams/component"
    "github.com/c360studio/semstreams/output/otel"
)

func main() {
    logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

    // Create configuration
    cfg := otel.DefaultConfig()
    cfg.Endpoint = "localhost:4317"
    cfg.ServiceName = "my-agent-system"
    cfg.SamplingRate = 0.1 // Sample 10% of traces

    rawConfig, err := json.Marshal(cfg)
    if err != nil {
        panic(err)
    }

    // Create component.
    // natsClient must be an already-connected NATS client; creating it is not shown here.
    comp, err := otel.NewComponent(rawConfig, component.Dependencies{
        NATSClient: natsClient,
        GetLogger:  func() *slog.Logger { return logger },
    })
    if err != nil {
        panic(err)
    }

    // Initialize
    otelComp := comp.(*otel.Component)
    if err := otelComp.Initialize(); err != nil {
        panic(err)
    }

    // Start
    ctx := context.Background()
    if err := otelComp.Start(ctx); err != nil {
        panic(err)
    }

    // Run for duration
    time.Sleep(10 * time.Minute)

    // Stop
    if err := otelComp.Stop(30 * time.Second); err != nil {
        panic(err)
    }
}
Custom Exporter Implementation

For production use, implement the Exporter interface with the OTEL Go SDK:

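import (
    "context"
    "errors"

    "github.com/c360studio/semstreams/output/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
)

// OTELSDKExporter adapts OTEL Go SDK exporters to this package's Exporter interface.
type OTELSDKExporter struct {
    traceExporter  *otlptrace.Exporter      // as returned by otlptracegrpc.New
    metricExporter *otlpmetricgrpc.Exporter // as returned by otlpmetricgrpc.New
}

func (e *OTELSDKExporter) ExportSpans(ctx context.Context, spans []*otel.SpanData) error {
    // Convert SpanData to OTEL SDK spans and export.
    // convertSpans (not shown) must return []sdktrace.ReadOnlySpan.
    return e.traceExporter.ExportSpans(ctx, convertSpans(spans))
}

func (e *OTELSDKExporter) ExportMetrics(ctx context.Context, metrics []*otel.MetricData) error {
    // Convert MetricData to OTEL SDK metrics and export.
    // convertMetrics (not shown) must return *metricdata.ResourceMetrics;
    // the SDK metric exporter exposes Export rather than ExportMetrics.
    return e.metricExporter.Export(ctx, convertMetrics(metrics))
}

func (e *OTELSDKExporter) Shutdown(ctx context.Context) error {
    // Shut down both exporters even if the first one fails.
    return errors.Join(e.traceExporter.Shutdown(ctx), e.metricExporter.Shutdown(ctx))
}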

Testing

Unit Tests

Run unit tests for the component:

# Run all tests
task test

# Run with race detector
task test:race

# Run otel package tests specifically
go test -v ./output/otel/...
Integration Tests

The package includes integration tests that verify:

  • Configuration validation
  • Component lifecycle (Initialize, Start, Stop)
  • Span collection from agent events
  • Metric mapping
  • Export batch processing
  • Error handling

# Run integration tests
task test:integration
Mock Exporter

For testing, use the provided MockExporter:

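func TestWithMockExporter(t *testing.T) {
    // Create component (createComponent is a test helper, not shown)
    comp := createComponent(t)
    otelComp := comp.(*otel.Component)
    if err := otelComp.Initialize(); err != nil {
        t.Fatalf("initialize: %v", err)
    }

    // Set mock exporter
    mockExp := &otel.MockExporter{}
    otelComp.SetExporter(mockExp)

    // Start the component. Before asserting, publish agent events and wait
    // for at least one batch_timeout interval (not shown).
    ctx := context.Background()
    if err := otelComp.Start(ctx); err != nil {
        t.Fatalf("start: %v", err)
    }
    defer otelComp.Stop(5 * time.Second)

    // Verify exports
    spans := mockExp.GetExportedSpans()
    if len(spans) == 0 {
        t.Error("expected spans to be exported")
    }

    metrics := mockExp.GetExportedMetrics()
    if len(metrics) == 0 {
        t.Error("expected metrics to be exported")
    }
}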
Test Coverage

The package maintains high test coverage:

  • Configuration validation: 100%
  • Component lifecycle: 100%
  • Span collection: 95%
  • Metric mapping: 95%
  • Event processing: 90%

Performance Considerations

Batching

The exporter uses batched exports to optimize performance:

  • Default batch interval: 5 seconds
  • Max batch size: 512 items
  • Configurable via batch_timeout and max_batch_size

Tune these settings based on your traffic:

  • High volume: Reduce batch_timeout, increase max_batch_size
  • Low volume: Increase batch_timeout to reduce export overhead
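
To make the size limit concrete, the sketch below splits a backlog of pending items into batches no larger than a configured maximum. It assumes a flush that simply slices the backlog in order; whether the component's export loop works exactly this way is not confirmed by this document, and chunk is a hypothetical helper.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// The returned batches share the backing array of items.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var batches [][]T
	for len(items) > 0 {
		n := size
		if len(items) < n {
			n = len(items)
		}
		batches = append(batches, items[:n])
		items = items[n:]
	}
	return batches
}

func main() {
	pending := make([]int, 1200) // stand-in for 1200 completed spans
	for i, b := range chunk(pending, 512) {
		fmt.Printf("batch %d: %d items\n", i, len(b))
	}
}
```

With the default limit of 512, a backlog of 1200 items needs three export calls, which is one reason a shorter batch_timeout helps under sustained high volume.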
Sampling

For high-throughput scenarios, use sampling to reduce data volume. For example, the following setting samples 10% of traces:

{
  "sampling_rate": 0.1
}

Sampling is applied at the root span (loop) level, so each trace is either exported in full or dropped in full.
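
One common way to get an all-or-nothing decision per trace is to derive it from the loop ID, so every event of the same loop reaches the same verdict. The sketch below assumes an FNV-1a hash mapped onto [0, 1). The function name sampled and the choice of hash are illustrative assumptions, not the package's documented behaviour.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// sampled (hypothetical) maps loopID to a stable value in [0, 1) and keeps
// the trace when that value is below rate.
func sampled(loopID string, rate float64) bool {
	if rate >= 1 {
		return true
	}
	if rate <= 0 {
		return false
	}
	h := fnv.New64a()
	h.Write([]byte(loopID)) // hash.Hash.Write never returns an error
	// Use the top 53 bits so the fraction is exactly representable as a float64.
	frac := float64(h.Sum64()>>11) / float64(1<<53)
	return frac < rate
}

func main() {
	kept := 0
	for i := 0; i < 10000; i++ {
		if sampled(fmt.Sprintf("loop-%d", i), 0.1) {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000 loops\n", kept)
}
```

A hash-based decision also means that raising the rate only ever adds traces: a loop kept at 0.1 is still kept at 0.5.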

Memory Usage

The component maintains:

  • Active spans: In-memory until completion
  • Completed spans: Batched until export
  • Metrics: Aggregated per metric name

Adjust batch_timeout to control memory footprint for completed data.

Troubleshooting

No Traces in Collector
  1. Check collector endpoint: Verify endpoint matches your collector address
  2. Verify protocol: Ensure protocol is grpc or http as supported by collector
  3. Check network connectivity: Use telnet localhost 4317 to test connectivity
  4. Review logs: Enable debug logging to see export attempts
  5. Verify sampling: Temporarily set sampling_rate to 1.0 to rule out traces being dropped by sampling
Spans Not Linked
  1. Check event order: Ensure events arrive in order (created before completed)
  2. Verify loop IDs: Confirm events share the same loop_id
  3. Review span collector stats: Check active_spans and completed_spans counters
Export Failures
  1. Check timeout settings: Increase export_timeout if exports time out
  2. Verify collector health: Ensure collector is running and accepting data
  3. Review headers: Check if headers are required for authentication
  4. Enable insecure mode: Set insecure: true for development
High Memory Usage
  1. Reduce batch interval: Decrease batch_timeout to export more frequently
  2. Sample fewer traces: Lower sampling_rate to reduce span volume
  3. Monitor active spans: Check for stuck spans that never complete

Contributing

When contributing to the OTEL exporter:

  1. Maintain test coverage: Add tests for new features
  2. Follow OTEL conventions: Align with OpenTelemetry semantic conventions
  3. Update documentation: Keep this README in sync with code changes
  4. Performance test: Verify batching and export performance under load
  5. Validate integration: Test with real OTEL collectors (Jaeger, Zipkin, etc.)

License

Part of the SemStreams project. See project LICENSE for details.

Documentation

Overview

Package otel provides an OpenTelemetry exporter for SemStreams agent telemetry.

The OTEL exporter subscribes to agent lifecycle events from NATS JetStream and converts them to OpenTelemetry spans and metrics for export to OTEL collectors.

Architecture

The exporter follows the AGNTCY integration pattern for observability:

                                 ┌─────────────────────┐
Agent Events                     │   OTEL Exporter     │
─────────────────────────────────┤                     │
agent.loop.created ─────────────►│  SpanCollector      │──────► OTEL Collector
agent.loop.completed ───────────►│                     │
agent.task.* ───────────────────►│  MetricMapper       │──────► (Traces + Metrics)
agent.tool.* ───────────────────►│                     │
                                 └─────────────────────┘

Span Collection

The SpanCollector converts agent lifecycle events to OTEL spans:

  • loop.created → Root span start
  • loop.completed/failed → Root span end
  • task.started/completed/failed → Child span for task
  • tool.started/completed/failed → Child span for tool execution

Spans are automatically linked via trace ID derived from the loop ID, creating a complete trace hierarchy for each agent execution.
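
The mapping above can be sketched as a small classifier that turns an event type into a span action. The spanAction type and the classify function are hypothetical names introduced for illustration; the SpanCollector's internal logic is not shown in this document.

```go
package main

import (
	"fmt"
	"strings"
)

// spanAction (hypothetical) describes what an event does to a span.
type spanAction struct {
	Scope  string // "loop", "task", or "tool"
	Start  bool   // true when the event opens a span
	Failed bool   // true when the event closes a span with an error
}

// classify maps an event type such as "loop.created" to a span action.
// The second result is false for unknown event types.
func classify(eventType string) (spanAction, bool) {
	scope, phase, ok := strings.Cut(eventType, ".")
	if !ok {
		return spanAction{}, false
	}
	startPhase := "started"
	switch scope {
	case "loop":
		startPhase = "created" // loops open with "created" rather than "started"
	case "task", "tool":
	default:
		return spanAction{}, false
	}
	switch phase {
	case startPhase:
		return spanAction{Scope: scope, Start: true}, true
	case "completed":
		return spanAction{Scope: scope}, true
	case "failed":
		return spanAction{Scope: scope, Failed: true}, true
	}
	return spanAction{}, false
}

func main() {
	for _, t := range []string{"loop.created", "task.started", "tool.failed", "loop.paused"} {
		action, ok := classify(t)
		fmt.Printf("%-14s known=%v action=%+v\n", t, ok, action)
	}
}
```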

Metric Mapping

The MetricMapper converts internal metrics to OTEL format:

  • Counters (cumulative values)
  • Gauges (instantaneous values)
  • Histograms (distribution buckets)
  • Summaries (quantile values)

Configuration

Key configuration options:

{
  "endpoint": "localhost:4317",      // OTEL collector endpoint
  "protocol": "grpc",                // "grpc" or "http"
  "service_name": "semstreams",      // Service name for traces
  "export_traces": true,             // Enable trace export
  "export_metrics": true,            // Enable metric export
  "batch_timeout": "5s",             // Batch export interval
  "sampling_rate": 1.0               // Trace sampling rate (0.0-1.0)
}

NATS Subjects

The exporter subscribes to agent events from JetStream:

| Stream         | Subject   | Purpose                    |
|----------------|-----------|----------------------------|
| AGENT_EVENTS   | agent.>   | All agent lifecycle events |

Integration with OTEL SDK

This package provides the data collection and transformation layer. For actual export to OTEL collectors, integrate with the OTEL Go SDK:

import (
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
)

The Exporter interface allows plugging in real OTEL exporters or using the stub implementation for testing.

Example Flow Configuration

components:
  - type: output
    name: otel-exporter
    config:
      endpoint: "localhost:4317"
      protocol: "grpc"
      service_name: "my-agent-system"
      export_traces: true
      export_metrics: true
      sampling_rate: 0.1  # Sample 10% of traces

Trace Correlation

Traces are correlated using the agent loop ID as the trace seed:

TraceID = hash(loop_id)  → 32-character hex
SpanID = hash(span_key)  → 16-character hex

This ensures consistent trace IDs across distributed agent executions and allows correlating spans from multiple components.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new OTEL exporter component.

func Register

func Register(registry RegistryInterface) error

Register registers the OTEL exporter component with the given registry.

Types

type AgentEvent

type AgentEvent struct {
	// Type is the event type (loop.created, loop.completed, loop.failed, etc.)
	Type string `json:"type"`

	// LoopID is the agent loop identifier.
	LoopID string `json:"loop_id"`

	// TaskID is the task identifier (for task events).
	TaskID string `json:"task_id,omitempty"`

	// ToolName is the tool name (for tool events).
	ToolName string `json:"tool_name,omitempty"`

	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"timestamp"`

	// EntityID is the agent's entity ID.
	EntityID string `json:"entity_id,omitempty"`

	// Role is the agent's role.
	Role string `json:"role,omitempty"`

	// Error is the error message for failure events.
	Error string `json:"error,omitempty"`

	// Duration is the operation duration (for completion events).
	Duration time.Duration `json:"duration,omitempty"`

	// Metadata contains additional event metadata.
	Metadata map[string]any `json:"metadata,omitempty"`
}

AgentEvent represents an agent lifecycle event from NATS.

type BucketCount

type BucketCount struct {
	// UpperBound is the bucket upper bound.
	UpperBound float64 `json:"upper_bound"`

	// Count is the cumulative count.
	Count uint64 `json:"count"`
}

BucketCount represents a histogram bucket.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the OTEL exporter component. It collects spans and metrics from agent events and exports them to OTEL collectors.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics.

func (*Component) GetMetricMapper

func (c *Component) GetMetricMapper() *MetricMapper

GetMetricMapper returns the metric mapper (for testing).

func (*Component) GetSpanCollector

func (c *Component) GetSpanCollector() *SpanCollector

GetSpanCollector returns the span collector (for testing).

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions.

func (*Component) SetExporter

func (c *Component) SetExporter(exp Exporter)

SetExporter sets the OTEL exporter (for testing).

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing agent events and exporting OTEL data.

func (*Component) Stop

func (c *Component) Stop(_ time.Duration) error

Stop gracefully stops the component.

type Config

type Config struct {
	// Ports defines the input/output port configuration.
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`

	// Endpoint is the OTEL collector endpoint.
	Endpoint string `json:"endpoint" schema:"type:string,description:OTEL collector endpoint,category:basic,default:localhost:4317"`

	// Protocol specifies the export protocol.
	// Supported values: "grpc", "http"
	Protocol string `json:"protocol" schema:"type:string,description:Export protocol,category:basic,default:grpc"`

	// ServiceName is the service name for OTEL traces.
	ServiceName string `json:"service_name" schema:"type:string,description:Service name for traces,category:basic,default:semstreams"`

	// ServiceVersion is the service version for OTEL traces.
	ServiceVersion string `json:"service_version" schema:"type:string,description:Service version,category:basic,default:1.0.0"`

	// ExportTraces enables trace export.
	ExportTraces bool `json:"export_traces" schema:"type:bool,description:Enable trace export,category:basic,default:true"`

	// ExportMetrics enables metric export.
	ExportMetrics bool `json:"export_metrics" schema:"type:bool,description:Enable metric export,category:basic,default:true"`

	// ExportLogs enables log export.
	ExportLogs bool `json:"export_logs" schema:"type:bool,description:Enable log export,category:basic,default:false"`

	// BatchTimeout is the timeout for batching exports.
	BatchTimeout string `json:"batch_timeout" schema:"type:string,description:Batch export timeout,category:advanced,default:5s"`

	// MaxBatchSize is the maximum number of items per batch.
	MaxBatchSize int `json:"max_batch_size" schema:"type:int,description:Maximum batch size,category:advanced,default:512"`

	// MaxExportBatchSize is the maximum number of items per export.
	MaxExportBatchSize int `json:"max_export_batch_size" schema:"type:int,description:Max export batch size,category:advanced,default:512"`

	// ExportTimeout is the timeout for each export operation.
	ExportTimeout string `json:"export_timeout" schema:"type:string,description:Export operation timeout,category:advanced,default:30s"`

	// Insecure allows insecure connections to the collector.
	Insecure bool `json:"insecure" schema:"type:bool,description:Allow insecure connections,category:security,default:true"`

	// Headers are additional headers to send with exports.
	Headers map[string]string `json:"headers" schema:"type:object,description:Additional export headers,category:advanced"`

	// ResourceAttributes are additional resource attributes.
	ResourceAttributes map[string]string `json:"resource_attributes" schema:"type:object,description:Resource attributes,category:advanced"`

	// SamplingRate is the trace sampling rate (0.0 to 1.0).
	SamplingRate float64 `json:"sampling_rate" schema:"type:float,description:Trace sampling rate,category:advanced,default:1.0"`

	// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
	ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`

	// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
	DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}

Config defines the configuration for the OTEL exporter component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (*Config) GetBatchTimeout

func (c *Config) GetBatchTimeout() time.Duration

GetBatchTimeout returns the batch timeout duration.

func (*Config) GetExportTimeout

func (c *Config) GetExportTimeout() time.Duration

GetExportTimeout returns the export timeout duration.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type DataPoint

type DataPoint struct {
	// Timestamp is when the data point was recorded.
	Timestamp time.Time `json:"timestamp"`

	// Value is the metric value (for counter/gauge).
	Value float64 `json:"value,omitempty"`

	// Count is the count (for histogram/summary).
	Count uint64 `json:"count,omitempty"`

	// Sum is the sum (for histogram/summary).
	Sum float64 `json:"sum,omitempty"`

	// Buckets are histogram bucket counts.
	Buckets []BucketCount `json:"buckets,omitempty"`

	// Quantiles are summary quantile values.
	Quantiles []QuantileValue `json:"quantiles,omitempty"`

	// Attributes are data point attributes.
	Attributes map[string]any `json:"attributes,omitempty"`
}

DataPoint represents a single metric data point.

type Exporter

type Exporter interface {
	// ExportSpans exports spans to the OTEL collector.
	ExportSpans(ctx context.Context, spans []*SpanData) error

	// ExportMetrics exports metrics to the OTEL collector.
	ExportMetrics(ctx context.Context, metrics []*MetricData) error

	// Shutdown gracefully shuts down the exporter.
	Shutdown(ctx context.Context) error
}

Exporter defines the interface for OTEL export operations. This is a stub interface - full implementation requires OTEL SDK.

type MetricData

type MetricData struct {
	// Name is the metric name.
	Name string `json:"name"`

	// Description describes the metric.
	Description string `json:"description,omitempty"`

	// Unit is the metric unit.
	Unit string `json:"unit,omitempty"`

	// Type is the metric type.
	Type MetricType `json:"type"`

	// DataPoints contains the metric values.
	DataPoints []DataPoint `json:"data_points"`

	// Attributes are metric-level attributes.
	Attributes map[string]any `json:"attributes,omitempty"`
}

MetricData represents a metric ready for export.

type MetricMapper

type MetricMapper struct {
	// contains filtered or unexported fields
}

MetricMapper maps internal metrics to OTEL format.

func NewMetricMapper

func NewMetricMapper(serviceName, serviceVersion string) *MetricMapper

NewMetricMapper creates a new metric mapper.

func (*MetricMapper) FlushMetrics

func (mm *MetricMapper) FlushMetrics() []*MetricData

FlushMetrics returns and clears all collected metrics.

func (*MetricMapper) MapFromPrometheus

func (mm *MetricMapper) MapFromPrometheus(prom *PrometheusMetric)

MapFromPrometheus converts Prometheus metrics to OTEL format.

func (*MetricMapper) RecordAgentMetrics

func (mm *MetricMapper) RecordAgentMetrics(loopID, role string, stats map[string]int64)

RecordAgentMetrics records standard agent metrics.

func (*MetricMapper) RecordCounter

func (mm *MetricMapper) RecordCounter(name, description, unit string, value float64, attrs map[string]any)

RecordCounter records a counter metric.

func (*MetricMapper) RecordGauge

func (mm *MetricMapper) RecordGauge(name, description, unit string, value float64, attrs map[string]any)

RecordGauge records a gauge metric.

func (*MetricMapper) RecordHistogram

func (mm *MetricMapper) RecordHistogram(name, description, unit string, count uint64, sum float64, buckets []BucketCount, attrs map[string]any)

RecordHistogram records a histogram metric.

func (*MetricMapper) RecordSummary

func (mm *MetricMapper) RecordSummary(name, description, unit string, count uint64, sum float64, quantiles []QuantileValue, attrs map[string]any)

RecordSummary records a summary metric.

func (*MetricMapper) Stats

func (mm *MetricMapper) Stats() map[string]int64

Stats returns mapper statistics.

type MetricType

type MetricType string

MetricType represents the type of metric.

const (
	MetricTypeCounter   MetricType = "counter"
	MetricTypeGauge     MetricType = "gauge"
	MetricTypeHistogram MetricType = "histogram"
	MetricTypeSummary   MetricType = "summary"
)

MetricType constants for supported OTEL metric types.

type PrometheusMetric

type PrometheusMetric struct {
	Name   string
	Help   string
	Type   string
	Labels map[string]string
	Value  float64
	// For histograms
	Buckets map[float64]uint64
	Count   uint64
	Sum     float64
	// For summaries
	Quantiles map[float64]float64
}

PrometheusMetric represents a Prometheus-style metric for mapping to OTEL format.
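
The Buckets field is a map, and Go maps iterate in unspecified order, so histogram buckets need to be ordered by upper bound before they are turned into a bucket slice. The sketch below shows that step in isolation. BucketCount is a local copy of the package type and sortedBuckets is a hypothetical helper, not the implementation used by MapFromPrometheus.

```go
package main

import (
	"fmt"
	"sort"
)

// BucketCount is a local copy of the package's histogram bucket type.
type BucketCount struct {
	UpperBound float64
	Count      uint64
}

// sortedBuckets (hypothetical helper) converts a Prometheus-style bucket map
// into a slice ordered by ascending upper bound.
func sortedBuckets(buckets map[float64]uint64) []BucketCount {
	out := make([]BucketCount, 0, len(buckets))
	for bound, count := range buckets {
		out = append(out, BucketCount{UpperBound: bound, Count: count})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].UpperBound < out[j].UpperBound })
	return out
}

func main() {
	buckets := map[float64]uint64{500: 100, 10: 5, 100: 85, 50: 45}
	for _, b := range sortedBuckets(buckets) {
		fmt.Printf("le=%g count=%d\n", b.UpperBound, b.Count)
	}
}
```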

type QuantileValue

type QuantileValue struct {
	// Quantile is the quantile (e.g., 0.5, 0.9, 0.99).
	Quantile float64 `json:"quantile"`

	// Value is the quantile value.
	Value float64 `json:"value"`
}

QuantileValue represents a summary quantile.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(config component.RegistrationConfig) error
}

RegistryInterface defines the interface for component registration.

type SpanCollector

type SpanCollector struct {
	// contains filtered or unexported fields
}

SpanCollector collects spans from agent events.

func NewSpanCollector

func NewSpanCollector(serviceName, serviceVersion string, samplingRate float64) *SpanCollector

NewSpanCollector creates a new span collector.

func (*SpanCollector) FlushCompleted

func (sc *SpanCollector) FlushCompleted() []*SpanData

FlushCompleted returns and clears completed spans.

func (*SpanCollector) ProcessEvent

func (sc *SpanCollector) ProcessEvent(_ context.Context, data []byte) error

ProcessEvent processes an agent event and creates/updates spans.

func (*SpanCollector) Stats

func (sc *SpanCollector) Stats() map[string]int64

Stats returns collector statistics.

type SpanData

type SpanData struct {
	// TraceID is the trace identifier.
	TraceID string `json:"trace_id"`

	// SpanID is the span identifier.
	SpanID string `json:"span_id"`

	// ParentSpanID is the parent span identifier.
	ParentSpanID string `json:"parent_span_id,omitempty"`

	// Name is the span name.
	Name string `json:"name"`

	// Kind is the span kind (client, server, internal, producer, consumer).
	Kind string `json:"kind"`

	// StartTime is when the span started.
	StartTime time.Time `json:"start_time"`

	// EndTime is when the span ended.
	EndTime time.Time `json:"end_time,omitempty"`

	// Status indicates the span status.
	Status SpanStatus `json:"status"`

	// Attributes are span attributes.
	Attributes map[string]any `json:"attributes,omitempty"`

	// Events are span events.
	Events []SpanEvent `json:"events,omitempty"`

	// Links are span links.
	Links []SpanLink `json:"links,omitempty"`
}

SpanData represents collected span information.

type SpanEvent

type SpanEvent struct {
	// Name is the event name.
	Name string `json:"name"`

	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"timestamp"`

	// Attributes are event attributes.
	Attributes map[string]any `json:"attributes,omitempty"`
}

SpanEvent represents an event within a span.

type SpanLink

type SpanLink struct {
	// TraceID is the linked trace ID.
	TraceID string `json:"trace_id"`

	// SpanID is the linked span ID.
	SpanID string `json:"span_id"`

	// Attributes are link attributes.
	Attributes map[string]any `json:"attributes,omitempty"`
}

SpanLink represents a link to another span.

type SpanStatus

type SpanStatus struct {
	// Code is the status code (unset, ok, error).
	Code string `json:"code"`

	// Message is an optional status message.
	Message string `json:"message,omitempty"`
}

SpanStatus represents the status of a span.
