tracing

package
v1.0.1
Warning

This package is not in the latest version of its module.

Published: Nov 22, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

README

StreamBus Distributed Tracing

OpenTelemetry-based distributed tracing for StreamBus.

Features

  • OpenTelemetry integration
  • Multiple exporters (Jaeger, OTLP)
  • Context propagation across services
  • HTTP middleware for automatic request tracing
  • Instrumentation helpers for common operations
  • Configurable sampling rates
  • Custom attributes and events

Quick Start

Basic Setup
import (
    "context"
    "log"

    "github.com/shawntherrien/streambus/pkg/tracing"
)

// Create tracer
tracer, err := tracing.New(&tracing.Config{
    Enabled:        true,
    ServiceName:    "streambus-broker",
    ServiceVersion: "1.0.0",
    Environment:    "production",
    Exporter: tracing.ExporterConfig{
        Type: tracing.ExporterTypeJaeger,
        Jaeger: tracing.JaegerConfig{
            CollectorEndpoint: "http://localhost:14268/api/traces",
        },
    },
    Sampling: tracing.SamplingConfig{
        SamplingRate: 1.0, // 100% sampling
    },
})
if err != nil {
    log.Fatal(err)
}
defer tracer.Shutdown(context.Background())
Tracing Operations
// Start a span
ctx, span := tracer.Start(ctx, "my-operation")
defer span.End()

// Add attributes
tracer.SetAttributes(ctx, map[string]interface{}{
    "user_id": 123,
    "action":  "create",
})

// Add events
tracer.AddEvent(ctx, "validation-complete", map[string]interface{}{
    "valid": true,
})

// Record errors
if err != nil {
    tracer.RecordError(ctx, err, map[string]interface{}{
        "error_code": "VALIDATION_FAILED",
    })
}
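SetAttributes, AddEvent and RecordError take a map[string]interface{}, while OpenTelemetry attributes are typed. The following is a minimal sketch of how such a map could be converted; it is not the package's implementation. KeyValue and toKeyValues are hypothetical stand-ins for attribute.KeyValue and the package's internal conversion, and falling back to a string for unsupported types is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// KeyValue is a hypothetical stand-in for OpenTelemetry's attribute.KeyValue,
// so this sketch needs only the standard library.
type KeyValue struct {
	Key   string
	Type  string // "string", "int64", "float64" or "bool"
	Value interface{}
}

// toKeyValues converts an untyped attribute map into typed key/values.
// Keys are sorted because Go map iteration order is random.
func toKeyValues(attrs map[string]interface{}) []KeyValue {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	out := make([]KeyValue, 0, len(keys))
	for _, k := range keys {
		switch v := attrs[k].(type) {
		case string:
			out = append(out, KeyValue{k, "string", v})
		case int:
			out = append(out, KeyValue{k, "int64", int64(v)})
		case int64:
			out = append(out, KeyValue{k, "int64", v})
		case float64:
			out = append(out, KeyValue{k, "float64", v})
		case bool:
			out = append(out, KeyValue{k, "bool", v})
		default:
			// Assumption: unsupported types fall back to their string form.
			out = append(out, KeyValue{k, "string", fmt.Sprint(v)})
		}
	}
	return out
}

func main() {
	attrs := toKeyValues(map[string]interface{}{"user_id": 123, "action": "create"})
	for _, kv := range attrs {
		fmt.Printf("%s (%s) = %v\n", kv.Key, kv.Type, kv.Value)
	}
}
```

Sorting the keys is a deliberate choice: it makes the attribute order stable across runs, which keeps traces and tests reproducible.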
Instrumented Operations
// Automatically trace with error handling
err := tracer.InstrumentedOperation(ctx, "process-message", func(ctx context.Context) error {
    return processMessage(ctx, msg)
})
Message Tracing
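// Trace producer: use the same topic/partition for the span and the send
ctx, span := tracer.TraceProduceMessage(ctx, topic, partition, key, value)
defer span.End()

err := producer.Send(ctx, topic, partition, key, value)

// Trace consumer (usually a separate code path; distinct names avoid
// redeclaring ctx and span in the same scope)
fetchCtx, fetchSpan := tracer.TraceConsumeMessage(ctx, topic, partition, offset)
defer fetchSpan.End()

messages, err := consumer.Fetch(fetchCtx, offset, maxBytes)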
HTTP Middleware
// Add tracing to HTTP handlers
http.Handle("/api/", tracer.HTTPMiddleware(apiHandler))
Retry Tracing
// Trace operations with retries
err := tracer.TraceWithRetry(ctx, "send-request", 3, func(ctx context.Context, attempt int) error {
    return sendRequest(ctx, req)
})

Configuration

Jaeger
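tracer, err := tracing.New(&tracing.Config{
    Enabled:     true,
    ServiceName: "streambus-broker",
    Exporter: tracing.ExporterConfig{
        Type: tracing.ExporterTypeJaeger,
        Jaeger: tracing.JaegerConfig{
            CollectorEndpoint: "http://localhost:14268/api/traces",
        },
    },
    Sampling: tracing.SamplingConfig{
        SamplingRate: 0.1, // 10% sampling
    },
})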
OTLP (OpenTelemetry Protocol)
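tracer, err := tracing.New(&tracing.Config{
    Enabled:     true,
    ServiceName: "streambus-broker",
    Exporter: tracing.ExporterConfig{
        Type: tracing.ExporterTypeOTLP,
        OTLP: tracing.OTLPConfig{
            Endpoint: "localhost:4317",
            Insecure: true, // no TLS, e.g. for a local collector
        },
    },
    Sampling: tracing.SamplingConfig{SamplingRate: 1.0},
})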
Custom Attributes
tracer, err := tracing.New(&tracing.Config{
    Enabled:     true,
    ServiceName: "streambus-broker",
    ResourceAttributes: map[string]string{
        "cluster":    "us-west-1",
        "datacenter": "dc1",
        "version":    "1.0.0",
    },
})
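Custom attributes end up as resource attributes alongside the service identity (see Config.ToResourceAttributes in the API reference). A sketch of that merge, assuming the OpenTelemetry semantic-convention keys service.name, service.version and deployment.environment; resourceAttributes is hypothetical, and the real method may use other keys or ordering.

```go
package main

import (
	"fmt"
	"sort"
)

// resourceAttributes (hypothetical) returns the service identity under
// semantic-convention keys, then the custom attributes in sorted key order.
// Empty version/environment values are skipped.
func resourceAttributes(name, version, env string, custom map[string]string) [][2]string {
	out := [][2]string{{"service.name", name}}
	if version != "" {
		out = append(out, [2]string{"service.version", version})
	}
	if env != "" {
		out = append(out, [2]string{"deployment.environment", env})
	}

	keys := make([]string, 0, len(custom))
	for k := range custom {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order; map iteration is random
	for _, k := range keys {
		out = append(out, [2]string{k, custom[k]})
	}
	return out
}

func main() {
	for _, kv := range resourceAttributes("streambus-broker", "1.0.0", "", map[string]string{
		"cluster":    "us-west-1",
		"datacenter": "dc1",
	}) {
		fmt.Printf("%s=%s\n", kv[0], kv[1])
	}
}
```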

Context Propagation

Extract from HTTP Headers
// Extract trace context from incoming request
ctx := tracer.Extract(r.Context(), propagation.HeaderCarrier(r.Header))

ctx, span := tracer.Start(ctx, "handle-request")
defer span.End()
Inject into HTTP Headers
// Inject trace context into outgoing request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
    return err
}
tracer.Inject(ctx, propagation.HeaderCarrier(req.Header))
Message Headers
// Inject into message headers
headers := make(map[string]string)
tracer.InjectTraceContext(ctx, headers)

// Extract from message headers
ctx = tracer.ExtractTraceContext(ctx, headers)

Deployment

With Jaeger
# Run Jaeger all-in-one
docker run -d --name jaeger \
  -p 16686:16686 \
  -p 14268:14268 \
  jaegertracing/all-in-one:latest

# Access UI at http://localhost:16686
With OpenTelemetry Collector
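# collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        # Listen on all interfaces so the published container port is reachable
        endpoint: 0.0.0.0:4317

exporters:
  # Recent Collector releases no longer ship a dedicated jaeger exporter;
  # Jaeger accepts OTLP directly on its gRPC port 4317.
  otlp:
    endpoint: jaeger:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp]

# Run collector (the hostname "jaeger" must resolve from this container,
# e.g. both containers attached to the same user-defined Docker network)
docker run -d --name otel-collector \
  -p 4317:4317 \
  -v "$(pwd)/collector-config.yaml:/etc/otel-collector-config.yaml" \
  otel/opentelemetry-collector:latest \
  --config=/etc/otel-collector-config.yaml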

Performance Considerations

Sampling

Use sampling to reduce overhead in production:

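// 10% sampling
Sampling: tracing.SamplingConfig{
    SamplingRate: 0.1,
    ParentBased:  true, // honor the caller's decision when a parent span exists
},

// Tagging important operations. This only marks the span for querying:
// head-based sampling is decided when the span starts, so setting an
// attribute afterwards does not keep an unsampled span.
if isImportantOperation(op) {
    span.SetAttributes(attribute.Bool("important", true))
}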
Async Export

Traces are exported asynchronously in batches to minimize performance impact.

Overhead

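Typical overhead with tracing enabled (approximate; it varies with workload, span volume, and exporter, so measure in your own environment):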

  • Latency: < 1ms per span
  • Memory: ~1KB per span
  • CPU: < 5% with 10% sampling

Best Practices

  1. Use meaningful span names: "producer.send" not "send"
  2. Add relevant attributes: Include IDs, types, sizes
  3. Record errors: Always use RecordError() for failures
  4. Propagate context: Pass context through all function calls
  5. Close spans: Always defer span.End()
  6. Sample intelligently: Use lower rates in production
  7. Monitor overhead: Track tracing performance impact

Troubleshooting

No traces appearing
  1. Check tracer is enabled: tracer.IsEnabled()
  2. Verify exporter endpoint is reachable
  3. Check sampling rate (use 1.0 for testing)
  4. Review logs for export errors
High overhead
  1. Reduce sampling rate
  2. Limit span attributes
  3. Avoid tracing hot paths
  4. Use batch export (default)
Context not propagating
  1. Ensure context is passed through call chain
  2. Verify Inject() and Extract() are called
  3. Check header propagation in HTTP/gRPC

Examples

See examples/tracing for complete examples:

  • Basic tracing
  • HTTP service tracing
  • Message producer/consumer tracing
  • Multi-service distributed tracing

Documentation

Index

Constants

const (
	// Broker attributes
	AttrBrokerID   = attribute.Key("streambus.broker.id")
	AttrBrokerHost = attribute.Key("streambus.broker.host")
	AttrBrokerPort = attribute.Key("streambus.broker.port")

	// Topic attributes
	AttrTopicName       = attribute.Key("streambus.topic.name")
	AttrTopicPartitions = attribute.Key("streambus.topic.partitions")
	AttrTopicReplicas   = attribute.Key("streambus.topic.replicas")

	// Partition attributes
	AttrPartitionID     = attribute.Key("streambus.partition.id")
	AttrPartitionOffset = attribute.Key("streambus.partition.offset")

	// Message attributes
	AttrMessageKey    = attribute.Key("streambus.message.key")
	AttrMessageSize   = attribute.Key("streambus.message.size")
	AttrMessageCount  = attribute.Key("streambus.message.count")
	AttrMessageOffset = attribute.Key("streambus.message.offset")
	AttrMessageBatch  = attribute.Key("streambus.message.batch")

	// Consumer attributes
	AttrConsumerGroup  = attribute.Key("streambus.consumer.group")
	AttrConsumerID     = attribute.Key("streambus.consumer.id")
	AttrConsumerMember = attribute.Key("streambus.consumer.member")

	// Transaction attributes
	AttrTransactionID     = attribute.Key("streambus.transaction.id")
	AttrTransactionStatus = attribute.Key("streambus.transaction.status")

	// Request attributes
	AttrRequestID   = attribute.Key("streambus.request.id")
	AttrRequestType = attribute.Key("streambus.request.type")

	// Schema attributes
	AttrSchemaID      = attribute.Key("streambus.schema.id")
	AttrSchemaVersion = attribute.Key("streambus.schema.version")
	AttrSchemaType    = attribute.Key("streambus.schema.type")

	// Error attributes
	AttrErrorType    = attribute.Key("streambus.error.type")
	AttrErrorMessage = attribute.Key("streambus.error.message")
	AttrErrorCode    = attribute.Key("streambus.error.code")
)

Common attribute keys for StreamBus operations

Variables

var (
	// ErrTracerNotInitialized is returned when tracer is not initialized
	ErrTracerNotInitialized = errors.New("tracer not initialized")

	// ErrInvalidServiceName is returned when service name is invalid
	ErrInvalidServiceName = errors.New("invalid service name")

	// ErrInvalidSamplingRate is returned when sampling rate is invalid
	ErrInvalidSamplingRate = errors.New("invalid sampling rate: must be between 0.0 and 1.0")

	// ErrInvalidExporterType is returned when exporter type is invalid
	ErrInvalidExporterType = errors.New("invalid exporter type")

	// ErrInvalidOTLPEndpoint is returned when OTLP endpoint is invalid
	ErrInvalidOTLPEndpoint = errors.New("invalid OTLP endpoint")

	// ErrInvalidJaegerEndpoint is returned when Jaeger endpoint is invalid
	ErrInvalidJaegerEndpoint = errors.New("invalid Jaeger endpoint: must specify agent or collector endpoint")

	// ErrInvalidZipkinEndpoint is returned when Zipkin endpoint is invalid
	ErrInvalidZipkinEndpoint = errors.New("invalid Zipkin endpoint")

	// ErrExporterShutdownFailed is returned when exporter shutdown fails
	ErrExporterShutdownFailed = errors.New("exporter shutdown failed")

	// ErrTracerProviderShutdownFailed is returned when tracer provider shutdown fails
	ErrTracerProviderShutdownFailed = errors.New("tracer provider shutdown failed")
)

Functions

func AddSpanEvent

func AddSpanEvent(span trace.Span, name string, attrs ...attribute.KeyValue)

AddSpanEvent adds an event to a span

func BrokerAttributes

func BrokerAttributes(brokerID uint64, host string, port uint16) []attribute.KeyValue

BrokerAttributes creates broker-related attributes

func ConsumerAttributes

func ConsumerAttributes(groupID, consumerID string) []attribute.KeyValue

ConsumerAttributes creates consumer-related attributes

func ContextWithSpan

func ContextWithSpan(ctx context.Context, span trace.Span) context.Context

ContextWithSpan returns a context with the given span

func ErrorAttributes

func ErrorAttributes(errorType, message string, code int) []attribute.KeyValue

ErrorAttributes creates error-related attributes

func HeaderCarrierFromMap

func HeaderCarrierFromMap(m map[string]string) propagation.TextMapCarrier

HeaderCarrierFromMap creates a propagation carrier from a map

func MessageAttributes

func MessageAttributes(key string, size int, offset int64) []attribute.KeyValue

MessageAttributes creates message-related attributes

func PartitionAttributes

func PartitionAttributes(partitionID uint32, offset int64) []attribute.KeyValue

PartitionAttributes creates partition-related attributes

func RecordError

func RecordError(span trace.Span, err error)

RecordError records an error on the span

func RequestAttributes

func RequestAttributes(requestID uint64, requestType string) []attribute.KeyValue

RequestAttributes creates request-related attributes

func SchemaAttributes

func SchemaAttributes(schemaID uint64, version uint32, schemaType string) []attribute.KeyValue

SchemaAttributes creates schema-related attributes

func SetSpanAttributes

func SetSpanAttributes(span trace.Span, attrs ...attribute.KeyValue)

SetSpanAttributes sets multiple attributes on a span

func SetSpanStatus

func SetSpanStatus(span trace.Span, code codes.Code, description string)

SetSpanStatus sets the status of a span

func SpanContextFromContext

func SpanContextFromContext(ctx context.Context) trace.SpanContext

SpanContextFromContext returns the span context from the context

func SpanFromContext

func SpanFromContext(ctx context.Context) trace.Span

SpanFromContext returns the span from the context

func StartSpan

func StartSpan(ctx context.Context, tracer trace.Tracer, name string, opts ...SpanOption) (context.Context, trace.Span)

StartSpan starts a new span with the given name and options

func TopicAttributes

func TopicAttributes(name string, partitions, replicas uint32) []attribute.KeyValue

TopicAttributes creates topic-related attributes

func TransactionAttributes

func TransactionAttributes(txID, status string) []attribute.KeyValue

TransactionAttributes creates transaction-related attributes

Types

type Config

type Config struct {
	// Enable tracing
	Enabled bool

	// Service name for traces
	ServiceName string

	// Service version
	ServiceVersion string

	// Environment (dev, staging, prod)
	Environment string

	// Exporter configuration
	Exporter ExporterConfig

	// Sampling configuration
	Sampling SamplingConfig

	// Resource attributes
	ResourceAttributes map[string]string

	// Propagators (tracecontext, baggage, b3, jaeger, etc.)
	Propagators []string
}

Config holds tracing configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns default tracing configuration

func (*Config) ToResourceAttributes

func (c *Config) ToResourceAttributes() []attribute.KeyValue

ToResourceAttributes converts resource attributes to OpenTelemetry attributes

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type ExporterConfig

type ExporterConfig struct {
	// Exporter type (otlp, jaeger, zipkin, stdout)
	Type ExporterType

	// OTLP configuration
	OTLP OTLPConfig

	// Jaeger configuration
	Jaeger JaegerConfig

	// Zipkin configuration
	Zipkin ZipkinConfig
}

ExporterConfig holds exporter configuration

type ExporterType

type ExporterType string

ExporterType represents the type of trace exporter

const (
	// ExporterTypeOTLP exports to OTLP endpoint (OpenTelemetry Collector)
	ExporterTypeOTLP ExporterType = "otlp"

	// ExporterTypeJaeger exports to Jaeger
	ExporterTypeJaeger ExporterType = "jaeger"

	// ExporterTypeZipkin exports to Zipkin
	ExporterTypeZipkin ExporterType = "zipkin"

	// ExporterTypeStdout exports to stdout (for debugging)
	ExporterTypeStdout ExporterType = "stdout"

	// ExporterTypeNone disables exporting
	ExporterTypeNone ExporterType = "none"
)

type JaegerConfig

type JaegerConfig struct {
	// Agent endpoint (e.g., "localhost:6831")
	AgentEndpoint string

	// Collector endpoint (e.g., "http://localhost:14268/api/traces")
	CollectorEndpoint string

	// Username for authentication
	Username string

	// Password for authentication
	Password string
}

JaegerConfig holds Jaeger exporter configuration

type OTLPConfig

type OTLPConfig struct {
	// Endpoint (e.g., "localhost:4317")
	Endpoint string

	// Use insecure connection
	Insecure bool

	// Headers to send with requests
	Headers map[string]string

	// Timeout for exporting
	Timeout time.Duration

	// Compression (gzip, none)
	Compression string
}

OTLPConfig holds OTLP exporter configuration

type SamplingConfig

type SamplingConfig struct {
	// Sampling rate (0.0 to 1.0)
	// 1.0 = sample all traces, 0.1 = sample 10% of traces
	SamplingRate float64

	// Parent-based sampling
	ParentBased bool
}

SamplingConfig holds sampling configuration

type SpanOption

type SpanOption func(*spanConfig)

SpanOption is a functional option for configuring spans

func WithAttributes

func WithAttributes(attrs ...attribute.KeyValue) SpanOption

WithAttributes adds attributes to a span

func WithSpanKind

func WithSpanKind(kind trace.SpanKind) SpanOption

WithSpanKind sets the span kind
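SpanOption follows Go's functional-options pattern. A stdlib-only sketch of how options like WithSpanKind and WithAttributes compose. spanConfig's fields, the plain-string kind, the map-based withAttributes signature, and the "internal" default kind are assumptions made so the example runs without OpenTelemetry; withSpanKind and withAttributes are hypothetical analogues, not the package's functions.

```go
package main

import "fmt"

// spanConfig is a hypothetical stand-in for the package's unexported spanConfig.
type spanConfig struct {
	kind       string
	attributes map[string]string
}

// spanOption mirrors the shape of SpanOption: a function that mutates the config.
type spanOption func(*spanConfig)

func withSpanKind(kind string) spanOption {
	return func(c *spanConfig) { c.kind = kind }
}

// withAttributes merges attrs into the config, overwriting existing keys.
func withAttributes(attrs map[string]string) spanOption {
	return func(c *spanConfig) {
		for k, v := range attrs {
			c.attributes[k] = v
		}
	}
}

// newSpanConfig applies options in order over the defaults, so later options win.
func newSpanConfig(opts ...spanOption) spanConfig {
	c := spanConfig{kind: "internal", attributes: map[string]string{}}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	cfg := newSpanConfig(
		withSpanKind("producer"),
		withAttributes(map[string]string{"streambus.topic.name": "events"}),
	)
	fmt.Println(cfg.kind, cfg.attributes["streambus.topic.name"])
}
```

The pattern keeps StartSpan's signature stable: new options can be added later without breaking existing callers.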

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

Tracer wraps OpenTelemetry tracer

func New

func New(cfg *Config) (*Tracer, error)

New creates a new tracer with the given configuration

func (*Tracer) AddEvent

func (t *Tracer) AddEvent(ctx context.Context, name string, attrs map[string]interface{})

AddEvent adds an event to the current span

func (*Tracer) Extract

func (t *Tracer) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context

Extract extracts the trace context from a carrier

func (*Tracer) ExtractTraceContext

func (t *Tracer) ExtractTraceContext(ctx context.Context, headers map[string]string) context.Context

ExtractTraceContext extracts trace context from a map (from message headers)

func (*Tracer) HTTPMiddleware

func (t *Tracer) HTTPMiddleware(next http.Handler) http.Handler

HTTPMiddleware returns an HTTP middleware that traces requests

func (*Tracer) Inject

func (t *Tracer) Inject(ctx context.Context, carrier propagation.TextMapCarrier)

Inject injects the trace context into a carrier

func (*Tracer) InjectTraceContext

func (t *Tracer) InjectTraceContext(ctx context.Context, headers map[string]string)

InjectTraceContext injects trace context into a map (for message headers)

func (*Tracer) InstrumentedOperation

func (t *Tracer) InstrumentedOperation(ctx context.Context, operationName string, fn func(context.Context) error) error

InstrumentedOperation runs an operation with tracing

func (*Tracer) IsEnabled

func (t *Tracer) IsEnabled() bool

IsEnabled returns whether tracing is enabled

func (*Tracer) MeasureDuration

func (t *Tracer) MeasureDuration(ctx context.Context, operation string, fn func() error) error

MeasureDuration measures operation duration and records it

func (*Tracer) RecordError

func (t *Tracer) RecordError(ctx context.Context, err error, attrs map[string]interface{})

RecordError records an error in the current span

func (*Tracer) RecordMetric

func (t *Tracer) RecordMetric(ctx context.Context, metricName string, value interface{}, unit string)

RecordMetric records a metric as a span event

func (*Tracer) SetAttributes

func (t *Tracer) SetAttributes(ctx context.Context, attrs map[string]interface{})

SetAttributes sets attributes on the current span

func (*Tracer) Shutdown

func (t *Tracer) Shutdown(ctx context.Context) error

Shutdown shuts down the tracer provider

func (*Tracer) Start

func (t *Tracer) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span)

Start starts a new span

func (*Tracer) StartWithAttributes

func (t *Tracer) StartWithAttributes(ctx context.Context, spanName string, attrs map[string]interface{}) (context.Context, trace.Span)

StartWithAttributes starts a new span with attributes

func (*Tracer) TraceConsumeMessage

func (t *Tracer) TraceConsumeMessage(ctx context.Context, topic string, partition uint32, offset int64) (context.Context, trace.Span)

TraceConsumeMessage traces message consumption

func (*Tracer) TraceNetworkRequest

func (t *Tracer) TraceNetworkRequest(ctx context.Context, operation, targetBroker string) (context.Context, trace.Span)

TraceNetworkRequest traces network requests between brokers

func (*Tracer) TraceProduceMessage

func (t *Tracer) TraceProduceMessage(ctx context.Context, topic string, partition uint32, key, value []byte) (context.Context, trace.Span)

TraceProduceMessage traces message production

func (*Tracer) TraceRaftOperation

func (t *Tracer) TraceRaftOperation(ctx context.Context, operation string, term, index int64) (context.Context, trace.Span)

TraceRaftOperation traces Raft consensus operations

func (*Tracer) TraceStorageOperation

func (t *Tracer) TraceStorageOperation(ctx context.Context, operation, component string) (context.Context, trace.Span)

TraceStorageOperation traces storage layer operations

func (*Tracer) TraceWithRetry

func (t *Tracer) TraceWithRetry(ctx context.Context, operation string, maxRetries int, fn func(ctx context.Context, attempt int) error) error

TraceWithRetry traces an operation with retry logic

type ZipkinConfig

type ZipkinConfig struct {
	// Endpoint (e.g., "http://localhost:9411/api/v2/spans")
	Endpoint string

	// Timeout for exporting
	Timeout time.Duration
}

ZipkinConfig holds Zipkin exporter configuration
