trace

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 20, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package trace implements OpenTelemetry span ingestion and runtime confidence scoring.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildProvenance

func BuildProvenance(traceIDs []string) string

BuildProvenance returns the provenance string for runtime edges. Format: otel_trace:{"trace_ids":["id1","id2"]} (max 5 trace IDs). If traceIDs is empty, returns "otel_trace".
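The documented format can be reproduced with a minimal sketch. The helper name buildProvenance is hypothetical and is not the package's implementation. How the real function chooses which IDs to keep when more than five are supplied is not documented; keeping the first five is an assumption made here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxTraceIDs mirrors the documented cap of five trace IDs.
const maxTraceIDs = 5

// buildProvenance is a hypothetical stand-in for BuildProvenance.
// Assumption: when more than five IDs are given, the first five are kept.
func buildProvenance(traceIDs []string) string {
	if len(traceIDs) == 0 {
		return "otel_trace"
	}
	if len(traceIDs) > maxTraceIDs {
		traceIDs = traceIDs[:maxTraceIDs]
	}
	payload, err := json.Marshal(map[string][]string{"trace_ids": traceIDs})
	if err != nil {
		// Marshalling a map of string slices is not expected to fail;
		// fall back to the bare prefix if it ever does.
		return "otel_trace"
	}
	return "otel_trace:" + string(payload)
}

func main() {
	fmt.Println(buildProvenance(nil))
	fmt.Println(buildProvenance([]string{"id1", "id2"}))
}
```

Using encoding/json rather than string concatenation keeps the payload valid JSON even if an ID contains characters that need escaping.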

func ComputeConfidence

func ComputeConfidence(observationCount int, daysSinceLastObserved int) float64

ComputeConfidence combines observation volume and recency to produce a confidence score for a runtime edge.

func ConfidenceFromCount

func ConfidenceFromCount(count int) float64

ConfidenceFromCount computes a confidence score from the observation count within the last 7 days, per the design doc scoring table.

func DecayBracket

func DecayBracket(daysSinceLastObserved int) string

DecayBracket returns a human-readable decay bracket for diagnostics.

func ShouldGarbageCollect

func ShouldGarbageCollect(lastObserved int64, gcThresholdDays int) bool

ShouldGarbageCollect returns true if the edge has not been observed within the given threshold (in days).
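The threshold check might look roughly like the sketch below. It is not the package's code. It assumes that lastObserved is a Unix timestamp in seconds, that an edge observed exactly at the threshold is still kept, and it takes the current time as an explicit parameter (the hypothetical nowUnix) so the result is deterministic.

```go
package main

import "fmt"

const secondsPerDay = 24 * 60 * 60

// shouldGC is a hypothetical stand-in for ShouldGarbageCollect.
// Assumptions: lastObserved and nowUnix are Unix timestamps in seconds,
// and only edges strictly older than the threshold are collected.
func shouldGC(lastObserved int64, gcThresholdDays int, nowUnix int64) bool {
	age := nowUnix - lastObserved
	return age > int64(gcThresholdDays)*secondsPerDay
}

func main() {
	now := int64(1_800_000_000) // arbitrary fixed "now" for the example
	fresh := now - 10*secondsPerDay
	stale := now - 120*secondsPerDay

	fmt.Println("10 days old, 90 day threshold:", shouldGC(fresh, 90, now))
	fmt.Println("120 days old, 90 day threshold:", shouldGC(stale, 90, now))
}
```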

Types

type HTTPLogEntry

type HTTPLogEntry struct {
	Timestamp   time.Time
	Method      string
	Path        string
	StatusCode  int
	ServiceName string
	ClientIP    string
	Duration    time.Duration
}

HTTPLogEntry represents a single HTTP access log line.

type HealthState

type HealthState string

HealthState represents the connection health of the trace ingestor.

const (
	HealthConnected    HealthState = "CONNECTED"
	HealthReconnecting HealthState = "RECONNECTING"
	HealthDisconnected HealthState = "DISCONNECTED"
	HealthDisabled     HealthState = "DISABLED"
)

type IngestResult

type IngestResult struct {
	Created int
	Updated int
}

IngestResult reports the outcome of a batch ingestion.

type Ingestor

type Ingestor struct {
	// contains filtered or unexported fields
}

Ingestor processes trace spans and HTTP log entries into graph edges. It implements the TraceIngestor interface.

func NewIngestor

func NewIngestor(db *sql.DB, resolver *SymbolResolver, config TraceIngestConfig) *Ingestor

NewIngestor creates a new Ingestor backed by the given database, symbol resolver, and configuration.

func (*Ingestor) AddToBatch

func (ing *Ingestor) AddToBatch(span TraceSpan)

AddToBatch appends a span to the pending batch. If the batch reaches the configured BatchSize, FlushBatch is triggered automatically.
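A size-triggered batch of this kind can be sketched as follows. The batcher type, its fields, and the reduced span type are all hypothetical; the real Ingestor keeps its state in unexported fields and flushes to the database. This sketch is also not safe for concurrent use, which a real receiver path would need to handle.

```go
package main

import "fmt"

// span is a reduced, hypothetical stand-in for TraceSpan.
type span struct {
	TraceID string
	SpanID  string
}

// batcher buffers spans and hands them to flush once size is reached.
// It is not safe for concurrent use.
type batcher struct {
	size    int
	pending []span
	flush   func(batch []span)
}

// add appends a span and flushes automatically when the batch is full.
func (b *batcher) add(s span) {
	b.pending = append(b.pending, s)
	if len(b.pending) >= b.size {
		b.flushNow()
	}
}

// flushNow hands all pending spans to flush and clears the batch.
func (b *batcher) flushNow() {
	if len(b.pending) == 0 {
		return
	}
	batch := b.pending
	b.pending = nil
	b.flush(batch)
}

func main() {
	b := &batcher{size: 2, flush: func(batch []span) {
		fmt.Println("flushed", len(batch), "spans")
	}}
	b.add(span{TraceID: "t1", SpanID: "a"})
	b.add(span{TraceID: "t1", SpanID: "b"}) // reaches size, triggers a flush
	b.add(span{TraceID: "t2", SpanID: "c"}) // stays pending
	b.flushNow()                            // explicit flush of the remainder
}
```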

func (*Ingestor) DecayConfidence

func (ing *Ingestor) DecayConfidence(ctx context.Context) (int, error)

DecayConfidence reduces confidence on stale runtime edges. Edges derived from OTel traces that have not been observed in 30+ days get their confidence set to 0.2. Returns the count of updated edges.
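The decay rule can be illustrated in memory, without the database. The edge type and decay function below are hypothetical. Whether the real method counts edges that already sit at 0.2 is not documented; this sketch assumes it does not, and it treats an age of exactly 30 days as stale.

```go
package main

import "fmt"

const (
	staleAfterDays  = 30  // documented staleness cutoff
	staleConfidence = 0.2 // documented confidence for stale edges
)

// edge is a hypothetical in-memory stand-in for a runtime edge row.
type edge struct {
	Confidence            float64
	DaysSinceLastObserved int
}

// decay sets the confidence of stale edges to staleConfidence and returns
// how many edges it changed. Assumption: edges already at staleConfidence
// are left alone and not counted.
func decay(edges []edge) int {
	updated := 0
	for i := range edges {
		if edges[i].DaysSinceLastObserved < staleAfterDays {
			continue
		}
		if edges[i].Confidence == staleConfidence {
			continue
		}
		edges[i].Confidence = staleConfidence
		updated++
	}
	return updated
}

func main() {
	edges := []edge{
		{Confidence: 0.9, DaysSinceLastObserved: 5},
		{Confidence: 0.8, DaysSinceLastObserved: 45},
	}
	fmt.Println("updated:", decay(edges))
	fmt.Println("edges:", edges)
}
```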

func (*Ingestor) FlushBatch

func (ing *Ingestor) FlushBatch(ctx context.Context) error

FlushBatch ingests all pending spans and clears the batch.

func (*Ingestor) IngestHTTPLogs

func (ing *Ingestor) IngestHTTPLogs(ctx context.Context, entries []HTTPLogEntry) (IngestResult, error)

IngestHTTPLogs converts HTTP log entries to trace spans and delegates to IngestSpans.

func (*Ingestor) IngestSpans

func (ing *Ingestor) IngestSpans(ctx context.Context, spans []TraceSpan) (IngestResult, error)

IngestSpans processes a batch of trace spans, creating or updating graph edges for each span. For each span it resolves source/target node hashes, determines edge type from span attributes, and upserts the edge.

func (*Ingestor) RuntimeEdgeStats

func (ing *Ingestor) RuntimeEdgeStats(ctx context.Context, _ types.Hash) (*RuntimeStats, error)

RuntimeEdgeStats returns aggregated statistics for runtime-derived edges. The snapshot parameter is accepted for interface compatibility but is not currently used for filtering.

type OTLPReceiver

type OTLPReceiver struct {
	coltracepb.UnimplementedTraceServiceServer
	// contains filtered or unexported fields
}

OTLPReceiver is a gRPC server that implements the OTLP trace receiver protocol. It receives ExportTraceServiceRequest messages, normalizes the spans to TraceSpan, and passes them to the TraceIngestor via batching.

func NewOTLPReceiver

func NewOTLPReceiver(addr string, ingestor TraceIngestor) *OTLPReceiver

NewOTLPReceiver creates a new OTLPReceiver that will listen on addr and forward spans to the given ingestor.

func (*OTLPReceiver) Addr

func (r *OTLPReceiver) Addr() string

Addr returns the configured listen address.

func (*OTLPReceiver) Export

Export implements coltracepb.TraceServiceServer. It accepts real OTLP ExportTraceServiceRequest messages, converts ResourceSpans to TraceSpan structs, and passes them to the ingestor via AddToBatch.

func (*OTLPReceiver) ExportSpans

func (r *OTLPReceiver) ExportSpans(ctx context.Context, spans []TraceSpan) error

ExportSpans processes a batch of pre-converted TraceSpan values. Kept for backward compatibility with existing tests.

func (*OTLPReceiver) Health

func (r *OTLPReceiver) Health() HealthState

Health returns the current health state of the receiver.

func (*OTLPReceiver) ListenAddr

func (r *OTLPReceiver) ListenAddr() string

ListenAddr returns the actual address the server is listening on. This is useful when Start was called with port :0 (random port).
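The difference between a configured address and the actual one shows up with the standard library alone. The sketch below does not use OTLPReceiver; the helper listenEphemeral is hypothetical and only demonstrates how a listener bound to port 0 reports the port the operating system picked.

```go
package main

import (
	"fmt"
	"net"
)

// listenEphemeral binds to port 0 on loopback, letting the OS choose a free
// port, and returns the listener together with its resolved address.
func listenEphemeral() (net.Listener, string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, "", err
	}
	return ln, ln.Addr().String(), nil
}

func main() {
	ln, addr, err := listenEphemeral()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()

	fmt.Println("configured address: 127.0.0.1:0")
	fmt.Println("actual address:    ", addr)
}
```

This is the usual reason tests bind to port 0 and then read the address back before dialing.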

func (*OTLPReceiver) Start

func (r *OTLPReceiver) Start(_ context.Context) error

Start begins accepting OTLP trace data. It creates a gRPC server, registers the OTLP trace service, and listens on the configured address.

func (*OTLPReceiver) Stop

func (r *OTLPReceiver) Stop() error

Stop gracefully shuts down the OTLP receiver.

type RouteMapping

type RouteMapping struct {
	ServiceName  string
	RoutePattern string
	NodeHash     types.Hash
	MappingType  string // "http_route", "grpc_method", "queue_topic"
}

RouteMapping represents a mapping from a runtime identifier (HTTP route, gRPC method, queue topic) to a graph node.

type RuntimeStats

type RuntimeStats struct {
	TotalEdges   int
	BySourceType map[string]int
	ActiveCount  int // edges with observations in last 7 days
	StaleCount   int // edges with no observations in last 30 days
	GCEligible   int // edges with no observations in last 90 days
}

RuntimeStats holds aggregated statistics for runtime edges.
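Read literally, the field comments imply that an edge unobserved for more than 90 days counts as both stale and GC-eligible. The sketch below follows that reading. The summarize function and the reduced runtimeStats type are hypothetical, and the exact boundary handling (7 days inclusive, 30 and 90 days exclusive) is an assumption.

```go
package main

import "fmt"

// runtimeStats is a reduced, hypothetical copy of RuntimeStats without
// the BySourceType breakdown.
type runtimeStats struct {
	TotalEdges  int
	ActiveCount int // observed within the last 7 days
	StaleCount  int // not observed within the last 30 days
	GCEligible  int // not observed within the last 90 days
}

// summarize buckets edges by the number of days since their last
// observation. The buckets are not mutually exclusive: an edge older than
// 90 days counts as both stale and GC-eligible.
func summarize(daysSinceLastObserved []int) runtimeStats {
	var s runtimeStats
	for _, age := range daysSinceLastObserved {
		s.TotalEdges++
		if age <= 7 {
			s.ActiveCount++
		}
		if age > 30 {
			s.StaleCount++
		}
		if age > 90 {
			s.GCEligible++
		}
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", summarize([]int{1, 10, 45, 120}))
}
```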

type SymbolResolver

type SymbolResolver struct {
	// contains filtered or unexported fields
}

SymbolResolver resolves runtime identifiers (HTTP routes, gRPC methods, queue topics) to graph node hashes using the route_symbols table.

func NewSymbolResolver

func NewSymbolResolver(db *sql.DB) *SymbolResolver

NewSymbolResolver creates a new SymbolResolver backed by the given database.

func (*SymbolResolver) Resolve

func (r *SymbolResolver) Resolve(ctx context.Context, serviceName, routePattern, mappingType string) (types.Hash, float64, error)

Resolve looks up a route symbol by exact match on (service_name, route_pattern, mapping_type). If found, returns the node hash with confidence 1.0. If not found, returns a synthetic unresolved node hash with confidence 0.3.
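The exact-match-or-fallback behavior can be sketched with an in-memory map in place of the route_symbols table. Everything here is hypothetical: the hash type stands in for types.Hash, and deriving the synthetic unresolved hash from a SHA-256 of the lookup key is an assumption, since the page does not say how the real hash is built.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// hash is a hypothetical stand-in for types.Hash.
type hash [32]byte

// routeKey mirrors the documented lookup tuple.
type routeKey struct {
	service     string
	pattern     string
	mappingType string
}

// resolver holds route mappings in memory instead of the route_symbols table.
type resolver struct {
	routes map[routeKey]hash
}

// resolve returns the mapped hash with confidence 1.0 on an exact match,
// otherwise a synthetic hash with confidence 0.3.
func (r *resolver) resolve(service, pattern, mappingType string) (hash, float64) {
	key := routeKey{service: service, pattern: pattern, mappingType: mappingType}
	if h, ok := r.routes[key]; ok {
		return h, 1.0
	}
	// Assumption: the synthetic hash is derived deterministically from the key.
	synthetic := sha256.Sum256([]byte("unresolved:" + service + ":" + mappingType + ":" + pattern))
	return hash(synthetic), 0.3
}

func main() {
	known := hash{1}
	r := &resolver{routes: map[routeKey]hash{
		{service: "checkout", pattern: "/orders/{id}", mappingType: "http_route"}: known,
	}}

	_, conf := r.resolve("checkout", "/orders/{id}", "http_route")
	fmt.Println("exact match confidence:", conf)

	_, conf = r.resolve("checkout", "/unknown", "http_route")
	fmt.Println("unresolved confidence:", conf)
}
```

A deterministic fallback hash means repeated sightings of the same unmapped route land on the same placeholder node rather than creating a new one each time.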

func (*SymbolResolver) ResolveSpan

func (r *SymbolResolver) ResolveSpan(ctx context.Context, span TraceSpan) (sourceHash, targetHash types.Hash, confidence float64, err error)

ResolveSpan extracts runtime identifiers from a TraceSpan's attributes and resolves them to source and target node hashes.

type TraceIngestConfig

type TraceIngestConfig struct {
	Enabled                 bool
	OTLPEndpoint            string
	BatchSize               int
	BatchInterval           time.Duration
	ConfidenceDecayInterval time.Duration
	GCThresholdDays         int
	ServiceMap              map[string]string // OTel name -> knowing repo name
}

TraceIngestConfig holds configuration for the trace ingestion pipeline.

type TraceIngestor

type TraceIngestor interface {
	IngestSpans(ctx context.Context, spans []TraceSpan) (IngestResult, error)
	IngestHTTPLogs(ctx context.Context, entries []HTTPLogEntry) (IngestResult, error)
	RuntimeEdgeStats(ctx context.Context, snapshot types.Hash) (*RuntimeStats, error)
	DecayConfidence(ctx context.Context) (updated int, err error)
}

TraceIngestor defines the interface for converting raw observability data into graph edges. Implemented by the ingestor in internal/trace/ingestor.go.

type TraceSpan

type TraceSpan struct {
	TraceID       string
	SpanID        string
	ParentSpanID  string
	ServiceName   string
	OperationName string
	PeerService   string
	Attributes    map[string]string
	StartTime     time.Time
	Duration      time.Duration
}

TraceSpan is a normalized representation of a single span from any tracing system.
