graphrag

package
v0.3.1
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package graphrag provides a layered in-memory graph for real-time observability retrieval — error chains, root cause analysis, impact analysis. It replaces the simpler internal/graph package with typed stores.


Constants

const Wildcard = "<*>"

Wildcard is the placeholder used for variable positions within a template.

Variables

This section is empty.

Functions

func AutoMigrateGraphRAG

func AutoMigrateGraphRAG(db *gorm.DB) error

AutoMigrateGraphRAG runs GORM auto-migration for the GraphRAG models, applies the tenant backfill, and promotes drain_templates to its composite (tenant_id, id) primary key. It is safe to call repeatedly.

func Preprocess

func Preprocess(line string) string

Preprocess masks common variable patterns in a raw log line prior to tokenization. This implements Drain's configurable regex-replacement stage.
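The masking stage can be sketched with Go's `regexp` package. This is a minimal illustration, not the package's implementation: the rule set in `maskPatterns` and the helper `preprocessSketch` are hypothetical, since the actual patterns `Preprocess` applies are not listed here. What it shows is that specific shapes (UUIDs, IP addresses, hex literals) must be masked before a generic number rule, and that each masked position becomes the `Wildcard` token Drain later treats as a variable.

```go
package main

import (
	"fmt"
	"regexp"
)

// Wildcard mirrors the package constant documented above.
const Wildcard = "<*>"

// maskPatterns is a hypothetical rule set, not the package's real one.
// Order matters: specific shapes run before the generic number rule so
// each is masked as a single token.
var maskPatterns = []*regexp.Regexp{
	regexp.MustCompile(`\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b`), // UUID
	regexp.MustCompile(`\b\d{1,3}(?:\.\d{1,3}){3}(?::\d+)?\b`),                                      // IPv4 with optional port
	regexp.MustCompile(`\b0x[0-9a-fA-F]+\b`),                                                      // hex literal
	regexp.MustCompile(`\b\d+(?:\.\d+)?(?:ms|s)?\b`),                                               // integers, decimals, durations
}

// preprocessSketch replaces every match of each pattern with Wildcard.
func preprocessSketch(line string) string {
	for _, re := range maskPatterns {
		line = re.ReplaceAllString(line, Wildcard)
	}
	return line
}

func main() {
	fmt.Println(preprocessSketch("connect to 10.0.0.12:5432 failed after 350ms"))
	fmt.Println(preprocessSketch("ptr=0x1f3a user42 id=123e4567-e89b-12d3-a456-426614174000"))
}
```

Because `\b` requires a word boundary, an identifier such as `user42` survives untouched, so literal names are not collapsed into `<*>` along with genuine numeric values.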

func SaveDrainTemplates

func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error

SaveDrainTemplates upserts the given templates into the drain_templates table under the supplied tenant. Tokens are JSON-encoded. It uses GORM's clause.OnConflict upsert, which is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL).

The conflict target is the composite (tenant_id, id) primary key so the same Drain template hash can coexist across tenants — a future per-tenant Drain miner can rely on this to keep cluster IDs stable per tenant.

func SetPanicMetrics

func SetPanicMetrics(m *telemetry.Metrics)

SetPanicMetrics wires the telemetry metrics so GraphRAG worker recovery closures can increment OtelContext_panics_recovered_total{subsystem="graphrag"}. Safe to leave unset in tests.

Types

type AffectedEntry

type AffectedEntry struct {
	Service     string  `json:"service"`
	Depth       int     `json:"depth"`
	CallCount   int64   `json:"call_count"`
	ImpactScore float64 `json:"impact_score"`
}

AffectedEntry is a service affected by an upstream failure.

type AnomalyNode

type AnomalyNode struct {
	ID        string          `json:"id"`
	Type      AnomalyType     `json:"type"`
	Severity  AnomalySeverity `json:"severity"`
	Service   string          `json:"service"`
	Evidence  string          `json:"evidence"`
	Timestamp time.Time       `json:"timestamp"`
}

AnomalyNode represents a detected anomaly.

type AnomalySeverity

type AnomalySeverity string

AnomalySeverity indicates the severity of an anomaly.

const (
	SeverityCritical AnomalySeverity = "critical"
	SeverityWarning  AnomalySeverity = "warning"
	SeverityInfo     AnomalySeverity = "info"
)

type AnomalyStore

type AnomalyStore struct {
	Anomalies map[string]*AnomalyNode // key: anomaly ID
	Edges     map[string]*Edge        // key: type|from|to
	// contains filtered or unexported fields
}

AnomalyStore holds detected anomalies and their temporal correlations.

func (*AnomalyStore) AddAnomaly

func (as *AnomalyStore) AddAnomaly(anomaly AnomalyNode)

func (*AnomalyStore) AddPrecededByEdge

func (as *AnomalyStore) AddPrecededByEdge(anomalyID, precedingID string, ts time.Time)

func (*AnomalyStore) AnomaliesForService

func (as *AnomalyStore) AnomaliesForService(service string, since time.Time) []*AnomalyNode

func (*AnomalyStore) AnomaliesSince

func (as *AnomalyStore) AnomaliesSince(since time.Time) []*AnomalyNode

func (*AnomalyStore) AnomaliesSinceLimit

func (as *AnomalyStore) AnomaliesSinceLimit(since time.Time, n int) []*AnomalyNode

AnomaliesSinceLimit is AnomaliesSince with a result cap (n <= 0 means unlimited). correlateWithRecent walks this on every detection tick, so the cap keeps a pathological anomaly backlog from turning each tick into an O(N) scan plus O(N) edge fan-out. Selection past the cap follows map iteration order — correlation is best-effort by design.

type AnomalyType

type AnomalyType string

AnomalyType indicates the kind of anomaly detected.

const (
	AnomalyErrorSpike   AnomalyType = "error_spike"
	AnomalyLatencySpike AnomalyType = "latency_spike"
	AnomalyMetricZScore AnomalyType = "metric_zscore"
)

type Config

type Config struct {
	TraceTTL      time.Duration
	RefreshEvery  time.Duration
	SnapshotEvery time.Duration
	AnomalyEvery  time.Duration
	WorkerCount   int
	ChannelSize   int
	// MaxSpansPerTenant caps each tenant's in-memory TraceStore span map.
	// 0 = defaultMaxSpansPerTenant; negative disables the cap.
	MaxSpansPerTenant int
	// TenantIdleTTL evicts a tenant's store slice after this much time
	// without any ingest event or query. 0 = defaultTenantIdleTTL;
	// negative disables eviction.
	TenantIdleTTL time.Duration
}

Config holds GraphRAG configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible defaults.

type CorrelatedSignalsResult

type CorrelatedSignalsResult struct {
	Service     string             `json:"service"`
	ErrorLogs   []LogClusterNode   `json:"error_logs"`
	Metrics     []MetricNode       `json:"metrics"`
	Anomalies   []AnomalyNode      `json:"anomalies"`
	ErrorChains []ErrorChainResult `json:"error_chains,omitempty"`
}

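CorrelatedSignalsResult is the output of CorrelatedSignals, which gathers all related signals (error logs, metrics, anomalies, and error chains) for a service since the given time.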

type Drain

type Drain struct {
	// contains filtered or unexported fields
}

Drain is a thread-safe log template miner.

func NewDrain

func NewDrain(opts ...DrainOption) *Drain

NewDrain constructs a Drain miner with the supplied options.

func (*Drain) Len

func (d *Drain) Len() int

Len returns the number of live templates.

func (*Drain) LoadTemplates

func (d *Drain) LoadTemplates(tpls []Template)

LoadTemplates restores templates from a previous snapshot, clearing any existing state; it is intended for startup recovery. The LRU list is rebuilt by pushing each template to the front in ascending LastSeen order (oldest first), so the most recently seen template ends up at the front.

func (*Drain) Match

func (d *Drain) Match(logLine string, ts time.Time) *Template

Match preprocesses the log line, descends the prefix tree, and either merges into an existing template or creates a new one. Returns the resulting template. Returns nil only for empty input.

func (*Drain) TemplateCount

func (d *Drain) TemplateCount() int

TemplateCount reports the number of live templates under the read lock.

func (*Drain) Templates

func (d *Drain) Templates() []Template

Templates returns a snapshot copy of all currently known templates. Internal back-pointers (elem, leaf) are deliberately omitted from copies.

type DrainOption

type DrainOption func(*Drain)

DrainOption configures a Drain instance.

func WithDepth

func WithDepth(depth int) DrainOption

WithDepth sets the prefix tree depth (default 4). Minimum 2.

func WithMaxChildren

func WithMaxChildren(n int) DrainOption

WithMaxChildren sets the max distinct children per internal node (default 100). Beyond this, tokens collapse to Wildcard.

func WithMaxTemplates

func WithMaxTemplates(n int) DrainOption

WithMaxTemplates sets the total template cap (default 50000). When exceeded, the least-recently-seen template is evicted.

func WithSimilarityThreshold

func WithSimilarityThreshold(st float64) DrainOption

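WithSimilarityThreshold sets the similarity threshold st (default 0.4) that a log line must reach to merge into an existing template. The value is clamped to (0, 1].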
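`DrainOption` follows Go's functional-options pattern. The sketch below shows how the documented defaults and bounds (depth at least 2, threshold clamped to (0, 1]) could be applied; `drainSketch`, its fields, and the lower-case option helpers are hypothetical. How the real option treats a non-positive threshold is not stated, and (0, 1] has no closed lower bound to clamp to, so this sketch keeps the current value and says so in a comment.

```go
package main

import "fmt"

// drainSketch holds a subset of Drain's tunables; field names are hypothetical.
type drainSketch struct {
	depth        int
	maxChildren  int
	maxTemplates int
	simThreshold float64
}

// option mirrors the DrainOption shape: a function that mutates the miner.
type option func(*drainSketch)

func withDepth(depth int) option {
	return func(d *drainSketch) {
		v := depth
		if v < 2 {
			v = 2 // documented minimum
		}
		d.depth = v
	}
}

func withSimilarityThreshold(st float64) option {
	return func(d *drainSketch) {
		switch {
		case st > 1:
			d.simThreshold = 1
		case st <= 0:
			// (0, 1] is open at 0, so there is no in-range value to clamp
			// to; keeping the current value is this sketch's assumption.
		default:
			d.simThreshold = st
		}
	}
}

// newDrainSketch applies the documented defaults, then each option in order.
func newDrainSketch(opts ...option) *drainSketch {
	d := &drainSketch{depth: 4, maxChildren: 100, maxTemplates: 50000, simThreshold: 0.4}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

func main() {
	d := newDrainSketch(withDepth(1), withSimilarityThreshold(1.5))
	fmt.Printf("%+v\n", *d)
}
```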

type DrainTemplateRow

type DrainTemplateRow struct {
	TenantID  string    `gorm:"primaryKey;size:64;default:'default';not null" json:"tenant_id"`
	ID        int64     `gorm:"primaryKey;autoIncrement:false" json:"id"` // int64(Template.ID)
	Tokens    string    `gorm:"type:text;not null" json:"tokens"`         // JSON-encoded []string
	Count     int       `json:"count"`
	FirstSeen time.Time `gorm:"index" json:"first_seen"`
	LastSeen  time.Time `gorm:"index" json:"last_seen"`
	Sample    string    `gorm:"type:text" json:"sample"`
}

DrainTemplateRow is the persisted GORM representation of a Drain log template. Tokens are JSON-encoded to keep the schema simple across SQLite/MySQL/PostgreSQL/MSSQL.

ID is stored as int64 (bit-reinterpretation of the uint64 FNV-64 hash): the standard SQL drivers reject uint64 values with the high bit set, and signed int64 carries the same 64 bits without loss. Conversion happens in the persistence helpers.
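The int64 storage trick relies on Go's same-width integer conversions preserving the bit pattern. A minimal sketch follows; `templateHash` is a hypothetical stand-in, because the exact FNV-64 variant (FNV-1 or FNV-1a) and the token separator used for `Template.ID` are not documented here. `toRowID` and `fromRowID` play the role of the unnamed persistence helpers.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// templateHash is a hypothetical stand-in for Template.ID: FNV-1a 64 over
// the tokens joined by single spaces. The package's exact FNV variant and
// token separator are not documented here.
func templateHash(tokens []string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(strings.Join(tokens, " "))) // hash.Hash writes never return an error
	return h.Sum64()
}

// toRowID and fromRowID convert between same-width integer types, which in
// Go keeps the bit pattern unchanged, so the round trip is lossless even
// when the high bit is set and the int64 comes out negative.
func toRowID(id uint64) int64   { return int64(id) }
func fromRowID(id int64) uint64 { return uint64(id) }

func main() {
	id := templateHash([]string{"user", "<*>", "logged", "in"})
	row := toRowID(id)
	fmt.Println(id, row, fromRowID(row) == id)
}
```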

The primary key is composite (tenant_id, id): the same template tokens can legitimately recur across tenants, and we want the cluster ID to stay stable per tenant once the in-memory Drain miner is partitioned per-tenant. TenantID is declared first so it leads the PK index.

func (DrainTemplateRow) TableName

func (DrainTemplateRow) TableName() string

TableName overrides GORM's default table name.

type Edge

type Edge struct {
	Type       EdgeType  `json:"type"`
	FromID     string    `json:"from_id"`
	ToID       string    `json:"to_id"`
	Weight     float64   `json:"weight,omitempty"`
	CallCount  int64     `json:"call_count,omitempty"`
	ErrorRate  float64   `json:"error_rate,omitempty"`
	AvgMs      float64   `json:"avg_latency_ms,omitempty"`
	TotalMs    float64   `json:"-"`
	ErrorCount int64     `json:"-"`
	UpdatedAt  time.Time `json:"updated_at"`
}

Edge represents a directed relationship between two nodes.

type EdgeType

type EdgeType string

EdgeType distinguishes different relationship categories.

const (
	EdgeCalls        EdgeType = "CALLS"
	EdgeExposes      EdgeType = "EXPOSES"
	EdgeContains     EdgeType = "CONTAINS"
	EdgeChildOf      EdgeType = "CHILD_OF"
	EdgeEmittedBy    EdgeType = "EMITTED_BY"
	EdgeLoggedDuring EdgeType = "LOGGED_DURING"
	EdgeMeasuredBy   EdgeType = "MEASURED_BY"
	EdgePrecededBy   EdgeType = "PRECEDED_BY"
	EdgeTriggeredBy  EdgeType = "TRIGGERED_BY"
)

type ErrorChainResult

type ErrorChainResult struct {
	RootCause        *RootCauseInfo   `json:"root_cause"`
	SpanChain        []SpanNode       `json:"span_chain"`
	CorrelatedLogs   []LogClusterNode `json:"correlated_logs,omitempty"`
	AnomalousMetrics []MetricNode     `json:"anomalous_metrics,omitempty"`
	TraceID          string           `json:"trace_id"`
}

ErrorChainResult is the output of an error chain query.

type GraphRAG

type GraphRAG struct {
	// contains filtered or unexported fields
}

GraphRAG is the main coordinator for the layered graph system.

Every in-memory store is partitioned by tenant. The coordinator holds a map of tenant ID → *tenantStores and a reader/writer mutex that protects only the outer map; per-tenant stores keep their own RWMutexes for fine-grained concurrent access. All event ingestion and queries route through storesFor(ctx) / storesForTenant(tenant) — there is no "global" slice.
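A sketch of the two-level locking described above, assuming a drastically simplified `tenantStores` that holds a single map. The outer `RWMutex` guards only map membership, and a double-checked slow path creates a tenant slice exactly once even under concurrent first use. `defaultTenantID` stands in for `storage.DefaultTenantID`; its value here is an assumption, and `coordinator` is a hypothetical reduction of `GraphRAG`.

```go
package main

import (
	"fmt"
	"sync"
)

// defaultTenantID stands in for storage.DefaultTenantID (value assumed).
const defaultTenantID = "default"

// tenantStores is a placeholder for the per-tenant store bundle. Each real
// store carries its own RWMutex, so the coordinator lock below guards only
// membership of the outer map.
type tenantStores struct {
	mu       sync.RWMutex
	services map[string]int
}

type coordinator struct {
	mu      sync.RWMutex
	tenants map[string]*tenantStores
}

// storesForTenant returns the tenant's slice, creating it on first use.
// The fast path takes only the read lock; the slow path re-checks under the
// write lock so goroutines racing on a new tenant end up sharing one slice.
func (c *coordinator) storesForTenant(tenant string) *tenantStores {
	if tenant == "" {
		tenant = defaultTenantID
	}
	c.mu.RLock()
	ts, ok := c.tenants[tenant]
	c.mu.RUnlock()
	if ok {
		return ts
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if ts, ok = c.tenants[tenant]; !ok {
		ts = &tenantStores{services: make(map[string]int)}
		c.tenants[tenant] = ts
	}
	return ts
}

func main() {
	c := &coordinator{tenants: make(map[string]*tenantStores)}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s := c.storesForTenant("acme")
			s.mu.Lock() // per-tenant lock, independent of the outer one
			s.services["checkout"]++
			s.mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(len(c.tenants), c.storesForTenant("acme").services["checkout"])
}
```

The read-locked fast path matters because every ingest event and query resolves its tenant first; taking the write lock only on a miss keeps steady-state lookups from serializing on one another.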

func New

func New(repo *storage.Repository, tsdbAgg *tsdb.Aggregator, ringBuf *tsdb.RingBuffer, cfg Config) *GraphRAG

New creates a new GraphRAG coordinator.

The vectordb-backed semantic similarity path was removed on 2026-05-24 along with the find_similar_logs MCP tool — log clustering now relies solely on the Drain template miner (see drain.go).

func (*GraphRAG) AllServiceEdges

func (g *GraphRAG) AllServiceEdges(ctx context.Context) []*Edge

AllServiceEdges returns every edge in the caller's tenant's ServiceStore. Kept as a narrow helper so API handlers do not need to traverse the tenantStores composite themselves.

func (*GraphRAG) AnomaliesForService

func (g *GraphRAG) AnomaliesForService(ctx context.Context, service string, since time.Time) []*AnomalyNode

AnomaliesForService is a tenant-aware read-through to the per-tenant AnomalyStore, exported so handlers outside this package never need to reach into the store maps directly.

func (*GraphRAG) AnomalyTimeline

func (g *GraphRAG) AnomalyTimeline(ctx context.Context, since time.Time) []*AnomalyNode

AnomalyTimeline returns recent anomalies sorted by time.

func (*GraphRAG) CorrelatedSignals

func (g *GraphRAG) CorrelatedSignals(ctx context.Context, service string, since time.Time) *CorrelatedSignalsResult

func (*GraphRAG) DependencyChain

func (g *GraphRAG) DependencyChain(ctx context.Context, traceID string) []SpanNode

DependencyChain returns the full span tree for a trace.

func (*GraphRAG) DrainTemplateCount

func (g *GraphRAG) DrainTemplateCount() int

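DrainTemplateCount reports the number of live Drain templates. The count is bounded by the miner's template cap (maxTemplates, set via WithMaxTemplates); exporting it as a gauge lets operators confirm in production that the bound holds.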

func (*GraphRAG) DroppedLogsCount

func (g *GraphRAG) DroppedLogsCount() int64

DroppedLogsCount reports the number of log events dropped because the ingestion channel was full.

func (*GraphRAG) DroppedMetricsCount

func (g *GraphRAG) DroppedMetricsCount() int64

DroppedMetricsCount reports the number of metric events dropped because the ingestion channel was full.

func (*GraphRAG) DroppedSpansCount

func (g *GraphRAG) DroppedSpansCount() int64

DroppedSpansCount reports the number of span events dropped because the ingestion channel was full. Exported for tests and readiness probes; atomic, safe from any goroutine.
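The drop counters imply a non-blocking send into a bounded channel. Here is a minimal sketch of that pattern with a hypothetical `ingester` type: `select` with a `default` branch drops the event instead of blocking when the buffer is full, and `len` on a channel gives the queue depth that `EventBufferDepth` reports without blocking. `atomic.Int64` requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type event struct{ kind string }

// ingester is a hypothetical reduction of the coordinator's ingest path.
type ingester struct {
	events       chan event
	droppedSpans atomic.Int64
}

// enqueueSpan never blocks the ingest pipeline: when the buffered channel
// is full the event is dropped and counted instead.
func (in *ingester) enqueueSpan(e event) {
	select {
	case in.events <- e:
	default:
		in.droppedSpans.Add(1)
	}
}

// bufferDepth mirrors EventBufferDepth: len on a channel never blocks.
func (in *ingester) bufferDepth() int { return len(in.events) }

func main() {
	in := &ingester{events: make(chan event, 2)}
	for i := 0; i < 5; i++ {
		in.enqueueSpan(event{kind: "span"})
	}
	fmt.Println(in.bufferDepth(), in.droppedSpans.Load()) // 2 3
}
```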

func (*GraphRAG) ErrorChain

func (g *GraphRAG) ErrorChain(ctx context.Context, service string, since time.Time, limit int) []ErrorChainResult

ErrorChain traces error spans upstream to find the root cause service. The tenant slice is selected via ctx — callers without a tenant ctx collapse to storage.DefaultTenantID at the coordinator boundary.

func (*GraphRAG) EventBufferDepth

func (g *GraphRAG) EventBufferDepth() int

EventBufferDepth returns the current number of events queued in the ingestion channel. Exported for telemetry polling; never blocks.

func (*GraphRAG) GetInvestigation

func (g *GraphRAG) GetInvestigation(ctx context.Context, id string) (*Investigation, error)

GetInvestigation retrieves a single investigation by ID, scoped to the tenant carried by ctx. Returning ErrRecordNotFound for cross-tenant lookups prevents id-guessing from leaking another tenant's row.

func (*GraphRAG) GetInvestigations

func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error)

GetInvestigations queries persisted investigations scoped to the tenant carried by ctx. The composite (tenant_id, created_at) index supports the recency-ordered scan.

func (*GraphRAG) ImpactAnalysis

func (g *GraphRAG) ImpactAnalysis(ctx context.Context, service string, maxDepth int) *ImpactResult

ImpactAnalysis performs BFS downstream from a service to find affected services.

func (*GraphRAG) InvestigationInsertCount

func (g *GraphRAG) InvestigationInsertCount() int64

InvestigationInsertCount reports cooldown-allowed PersistInvestigation calls. Semantics: this counter increments when the cooldown check passes, BEFORE the DB write — so a subsequent DB failure still increments this. It is NOT a strict DB insert count. Intended for tests to assert cooldown behavior without requiring a live repo.

func (*GraphRAG) IsRunning

func (g *GraphRAG) IsRunning() bool

IsRunning reports whether the coordinator's stop channel has not been closed. Used by readiness probes to confirm the background workers are still live.

func (*GraphRAG) ObserveSpanTopology added in v0.3.1

func (g *GraphRAG) ObserveSpanTopology(tenant, traceID, spanID, parentSpanID, service string)

ObserveSpanTopology records one received span's cross-service call topology INDEPENDENT of the sampler. The OTLP trace receiver calls this for every span BEFORE the sampler's keep/drop decision, so the service map retains flow direction even when sampling drops the spans that would have formed the edge.

tenant is the resolved tenant (empty collapses to DefaultTenantID, matching the rest of GraphRAG). traceID is accepted for symmetry with the event path and possible future per-trace scoping; the current implementation keys solely on span IDs, which are treated as unique within a tenant. Safe to call from many goroutines.
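The pre-sample observer can be sketched as a span-ID-to-service map plus an edge set keyed `CALLS|from|to` (the key shape documented for `ServiceStore.Edges`). Everything here is hypothetical: `topology` and `observe` are illustrative names, the sketch never evicts span IDs, and it records no edge when a child span arrives before its parent. A real implementation has to bound the map and may handle out-of-order arrival differently.

```go
package main

import (
	"fmt"
	"sync"
)

// topology is a hypothetical sketch of sampler-independent edge discovery:
// it remembers which service emitted each span ID and records a
// parent-service -> child-service CALLS edge when the two differ.
type topology struct {
	mu          sync.Mutex
	spanService map[string]string   // spanID -> service (unbounded here)
	edges       map[string]struct{} // key: CALLS|from|to
}

func newTopology() *topology {
	return &topology{spanService: map[string]string{}, edges: map[string]struct{}{}}
}

func (tp *topology) observe(spanID, parentSpanID, service string) {
	tp.mu.Lock()
	defer tp.mu.Unlock()
	tp.spanService[spanID] = service
	if parentSpanID == "" {
		return // root span: no caller
	}
	// Only cross-service parent links form an edge; an unseen parent is skipped.
	if parentSvc, ok := tp.spanService[parentSpanID]; ok && parentSvc != service {
		tp.edges["CALLS|"+parentSvc+"|"+service] = struct{}{}
	}
}

func main() {
	topo := newTopology()
	topo.observe("s1", "", "frontend")
	topo.observe("s2", "s1", "checkout")
	topo.observe("s3", "s2", "checkout") // intra-service: no edge
	for k := range topo.edges {
		fmt.Println(k)
	}
}
```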

func (*GraphRAG) OnLogIngested

func (g *GraphRAG) OnLogIngested(log storage.Log)

OnLogIngested is the callback wired into the log ingestion pipeline.

func (*GraphRAG) OnMetricIngested

func (g *GraphRAG) OnMetricIngested(metric tsdb.RawMetric)

OnMetricIngested is the callback wired into the metric ingestion pipeline. tsdb.RawMetric already carries a resolved TenantID (set in ingest/otlp.go Export), so we read it here instead of adding a second argument — keeping the metric callback signature identical across TSDB and GraphRAG.

func (*GraphRAG) OnSpanIngested

func (g *GraphRAG) OnSpanIngested(span storage.Span)

OnSpanIngested is the callback wired into the trace ingestion pipeline. Tenant is taken straight from the persisted Span (already resolved upstream by the OTLP Export handlers) and carried on the event — the callback signature is intentionally unchanged so external wiring stays trivial.

func (*GraphRAG) PersistInvestigation

func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode)

PersistInvestigation saves an investigation record from an error chain analysis. Tenant is accepted explicitly so the caller (the per-tenant anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice and so the persisted row carries its originating tenant_id.

func (*GraphRAG) RegisterAnomaly

func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode)

RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant. Mirrors PersistInvestigation's "tenant accepted explicitly" shape so out-of-band anomaly producers (synthetic detectors, integration tests, future external anomaly feeds) can land directly on the right tenant slice without going through the metric/error detection loops. Empty tenant collapses to storage.DefaultTenantID.

func (*GraphRAG) RootCauseAnalysis

func (g *GraphRAG) RootCauseAnalysis(ctx context.Context, service string, since time.Time) []RankedCause

RootCauseAnalysis combines ErrorChain with anomaly correlation to rank probable causes.

func (*GraphRAG) ServiceMap

func (g *GraphRAG) ServiceMap(ctx context.Context, depth int) []ServiceMapEntry

func (*GraphRAG) ServiceNames

func (g *GraphRAG) ServiceNames(ctx context.Context) []string

ServiceNames returns every service the caller's tenant has emitted any span for, sorted ascending. Reads from the in-memory ServiceStore — no DB scan. Used by /api/metadata/services so the dropdown matches the system map exactly (both are sourced from the same store).

func (*GraphRAG) SetMetrics

func (g *GraphRAG) SetMetrics(m *telemetry.Metrics)

SetMetrics wires the Prometheus registry so GraphRAG event drops are observable via otelcontext_graphrag_events_dropped_total. Safe to call before Start; pass nil to disable Prometheus recording (atomic counters still tick).

func (*GraphRAG) ShortestPath

func (g *GraphRAG) ShortestPath(ctx context.Context, from, to string) []string

ShortestPath finds the shortest path between two services using Dijkstra's algorithm.

func (*GraphRAG) SpanCapacityDropsCount

func (g *GraphRAG) SpanCapacityDropsCount() int64

SpanCapacityDropsCount reports the number of spans skipped because the tenant's TraceStore was at its MaxSpans cap. Atomic, safe from any goroutine; exported for tests and readiness probes.

func (*GraphRAG) Start

func (g *GraphRAG) Start(ctx context.Context)

Start begins background goroutines: workers, refresh, snapshot, anomaly detection. Each goroutine is wrapped in a panic recovery so one misbehaving event can't take down the whole subsystem.

func (*GraphRAG) Stop

func (g *GraphRAG) Stop()

Stop signals all goroutines to exit.

func (*GraphRAG) StoreCounts

func (g *GraphRAG) StoreCounts() StoreCounts

StoreCounts returns a census of the in-memory stores across all tenants. It walks a snapshot of the tenant map and takes len() of each store under that store's read lock, building no slices, so it is cheap enough for a periodic sampler.

func (*GraphRAG) TenantsEvictedCount

func (g *GraphRAG) TenantsEvictedCount() int64

TenantsEvictedCount reports the number of tenant store slices evicted for exceeding the idle TTL since startup.

type ImpactResult

type ImpactResult struct {
	Service          string          `json:"service"`
	AffectedServices []AffectedEntry `json:"affected_services"`
	TotalDownstream  int             `json:"total_downstream"`
}

ImpactResult describes the blast radius of a service failure.

type Investigation

type Investigation struct {
	TenantID         string          `gorm:"size:64;default:'default';not null;index:idx_investigations_tenant_created,priority:1" json:"tenant_id"`
	ID               string          `gorm:"primaryKey;size:64" json:"id"`
	CreatedAt        time.Time       `gorm:"index:idx_investigations_tenant_created,priority:2" json:"created_at"`
	Status           string          `gorm:"size:20" json:"status"`   // detected, triaged, resolved
	Severity         string          `gorm:"size:20" json:"severity"` // critical, warning, info
	TriggerService   string          `gorm:"size:255;index" json:"trigger_service"`
	TriggerOperation string          `gorm:"size:255" json:"trigger_operation"`
	ErrorMessage     string          `gorm:"type:text" json:"error_message"`
	RootService      string          `gorm:"size:255" json:"root_service"`
	RootOperation    string          `gorm:"size:255" json:"root_operation"`
	CausalChain      json.RawMessage `gorm:"type:text" json:"causal_chain"`
	TraceIDs         json.RawMessage `gorm:"type:text" json:"trace_ids"`
	ErrorLogs        json.RawMessage `gorm:"type:text" json:"error_logs"`
	AnomalousMetrics json.RawMessage `gorm:"type:text" json:"anomalous_metrics"`
	AffectedServices json.RawMessage `gorm:"type:text" json:"affected_services"`
	SpanChain        json.RawMessage `gorm:"type:text" json:"span_chain"`
}

Investigation is a persisted record of an automated error investigation.

TenantID scopes the row to its originating tenant. The composite (tenant_id, created_at) index supports the recency-ordered "investigations for tenant X" query that GetInvestigations runs on every read.

func (Investigation) TableName

func (Investigation) TableName() string

TableName overrides GORM's default table name.

type LogClusterNode

type LogClusterNode struct {
	ID             string           `json:"id"` // service-scoped cluster id (stable)
	Template       string           `json:"template"`
	TemplateID     uint64           `json:"template_id,omitempty"`
	TemplateTokens []string         `json:"template_tokens,omitempty"`
	SampleLog      string           `json:"sample_log,omitempty"`
	Count          int64            `json:"count"`
	FirstSeen      time.Time        `json:"first_seen"`
	LastSeen       time.Time        `json:"last_seen"`
	SeverityDist   map[string]int64 `json:"severity_distribution"`
}

LogClusterNode groups similar log messages.

Clustering is performed by the Drain template miner. TemplateID is the stable FNV-64 hash of TemplateTokens; ID is the user-facing cluster identifier (service-scoped) that remains stable across Drain re-merges.

type MetricNode

type MetricNode struct {
	ID          string    `json:"id"` // metric_name + "|" + service
	MetricName  string    `json:"metric_name"`
	Service     string    `json:"service"`
	RollingMin  float64   `json:"rolling_min"`
	RollingMax  float64   `json:"rolling_max"`
	RollingAvg  float64   `json:"rolling_avg"`
	SampleCount int64     `json:"sample_count"`
	LastSeen    time.Time `json:"last_seen"`
}

MetricNode represents a metric series for a service.

type NodeType

type NodeType string

NodeType distinguishes different node categories in the graph.

const (
	NodeService    NodeType = "service"
	NodeOperation  NodeType = "operation"
	NodeTrace      NodeType = "trace"
	NodeSpan       NodeType = "span"
	NodeLogCluster NodeType = "log_cluster"
	NodeMetric     NodeType = "metric"
	NodeAnomaly    NodeType = "anomaly"
)

type OperationNode

type OperationNode struct {
	ID          string    `json:"id"` // service + "|" + operation
	Service     string    `json:"service"`
	Operation   string    `json:"operation"`
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	HealthScore float64   `json:"health_score"`

	CallCount  int64   `json:"call_count"`
	ErrorCount int64   `json:"error_count"`
	ErrorRate  float64 `json:"error_rate"`
	AvgLatency float64 `json:"avg_latency_ms"`
	P50Latency float64 `json:"p50_latency_ms"`
	P95Latency float64 `json:"p95_latency_ms"`
	P99Latency float64 `json:"p99_latency_ms"`
	TotalMs    float64 `json:"-"`
}

OperationNode represents an endpoint/RPC within a service.

type RankedCause

type RankedCause struct {
	Service    string        `json:"service"`
	Operation  string        `json:"operation"`
	Score      float64       `json:"score"`
	Evidence   []string      `json:"evidence"`
	ErrorChain []SpanNode    `json:"error_chain,omitempty"`
	Anomalies  []AnomalyNode `json:"anomalies,omitempty"`
}

RankedCause is a probable root cause with evidence.

type RootCauseInfo

type RootCauseInfo struct {
	Service      string `json:"service"`
	Operation    string `json:"operation"`
	ErrorMessage string `json:"error_message"`
	SpanID       string `json:"span_id"`
	TraceID      string `json:"trace_id"`
}

RootCauseInfo identifies the responsible service and operation.

type ServiceMapEntry

type ServiceMapEntry struct {
	Service    *ServiceNode     `json:"service"`
	Operations []*OperationNode `json:"operations,omitempty"`
	CallsTo    []*Edge          `json:"calls_to,omitempty"`
	CalledBy   []*Edge          `json:"called_by,omitempty"`
}

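ServiceMapEntry is one service in the topology returned by ServiceMap, together with its operations and its outgoing (CallsTo) and incoming (CalledBy) CALLS edges. ServiceMap returns the service topology with health scores for the API.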

type ServiceNode

type ServiceNode struct {
	ID          string    `json:"id"`
	Name        string    `json:"name"`
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	HealthScore float64   `json:"health_score"` // 0.0–1.0

	CallCount  int64   `json:"call_count"`
	ErrorCount int64   `json:"error_count"`
	ErrorRate  float64 `json:"error_rate"`
	AvgLatency float64 `json:"avg_latency_ms"`
	TotalMs    float64 `json:"-"` // for computing avg
}

ServiceNode represents a microservice with aggregated health stats.

type ServiceStore

type ServiceStore struct {
	Services   map[string]*ServiceNode   // key: service name
	Operations map[string]*OperationNode // key: service|operation
	Edges      map[string]*Edge          // key: type|from|to
	// contains filtered or unexported fields
}

ServiceStore holds permanent service topology data.

func (*ServiceStore) AllEdges

func (s *ServiceStore) AllEdges() []*Edge

func (*ServiceStore) AllServices

func (s *ServiceStore) AllServices() []*ServiceNode

func (*ServiceStore) CallEdgesFrom

func (s *ServiceStore) CallEdgesFrom(service string) []*Edge

func (*ServiceStore) CallEdgesTo

func (s *ServiceStore) CallEdgesTo(service string) []*Edge

func (*ServiceStore) EnsureCallEdge added in v0.3.1

func (s *ServiceStore) EnsureCallEdge(source, target string, ts time.Time) bool

EnsureCallEdge guarantees a CALLS edge source→target EXISTS without touching its aggregates. If the edge is absent it is created with zeroed CallCount/latency/error stats (UpdatedAt = ts); if it already exists this is a no-op. The pre-sample topology observer uses this so the service map shows flow direction even when sampling dropped every span that would have formed the edge — while the sampled path (UpsertCallEdge) remains the sole source of CallCount/latency/error-rate aggregates. Returns true if a new edge was created.

func (*ServiceStore) EnsureService added in v0.3.1

func (s *ServiceStore) EnsureService(name string, ts time.Time) bool

EnsureService guarantees a ServiceNode for name EXISTS without touching its aggregates. Absent → created with zeroed call/error stats (FirstSeen/LastSeen = ts); present → no-op. The pre-sample topology observer uses this so the service map has a row to hang flow-direction edges on even when sampling dropped every span for the service; the sampled path (UpsertService) and the 60s DB rebuild remain the sole source of call/error/latency aggregates. Returns true if a new node was created.

func (*ServiceStore) GetService

func (s *ServiceStore) GetService(name string) (*ServiceNode, bool)

func (*ServiceStore) UpsertCallEdge

func (s *ServiceStore) UpsertCallEdge(source, target string, durationMs float64, isError bool, ts time.Time)

func (*ServiceStore) UpsertOperation

func (s *ServiceStore) UpsertOperation(service, operation string, durationMs float64, isError bool, ts time.Time)

func (*ServiceStore) UpsertService

func (s *ServiceStore) UpsertService(name string, durationMs float64, isError bool, ts time.Time)

type SignalStore

type SignalStore struct {
	LogClusters map[string]*LogClusterNode // key: cluster ID
	Metrics     map[string]*MetricNode     // key: metric|service
	Edges       map[string]*Edge           // key: type|from|to
	// contains filtered or unexported fields
}

SignalStore holds log cluster and metric correlation data.

func (*SignalStore) AddLoggedDuringEdge

func (ss *SignalStore) AddLoggedDuringEdge(clusterID, spanID string, ts time.Time)

func (*SignalStore) LogClustersForService

func (ss *SignalStore) LogClustersForService(service string) []*LogClusterNode

func (*SignalStore) MetricsForService

func (ss *SignalStore) MetricsForService(service string) []*MetricNode

func (*SignalStore) Prune

func (ss *SignalStore) Prune(cutoff time.Time, maxMetrics, maxLogClusters int) int

Prune bounds the SignalStore (modeled on TraceStore.Prune): MetricNodes and LogClusterNodes whose LastSeen predates cutoff are removed; if either map still exceeds its cap (maxMetrics / maxLogClusters; <=0 = uncapped) the oldest-LastSeen overflow is evicted. Each removed node takes its edges with it — metric MEASURED_BY edges inline, evicted-cluster EMITTED_BY / LOGGED_DURING edges in the final sweep — and any edge whose UpdatedAt predates cutoff is swept too (upsert paths refresh edge timestamps, so live correlations survive). Returns the number of nodes (metrics + clusters) removed.

func (*SignalStore) UpsertLogCluster

func (ss *SignalStore) UpsertLogCluster(id, template, severity, service string, ts time.Time)

func (*SignalStore) UpsertLogClusterWithTemplate

func (ss *SignalStore) UpsertLogClusterWithTemplate(id, template, severity, service string, templateID uint64, tokens []string, sample string, ts time.Time)

UpsertLogClusterWithTemplate is the Drain-aware upsert. It stores the mined template tokens, the stable template ID, and a sample raw log. The older UpsertLogCluster is preserved for backward compatibility.

func (*SignalStore) UpsertMetric

func (ss *SignalStore) UpsertMetric(metricName, service string, value float64, ts time.Time)

type SpanNode

type SpanNode struct {
	ID           string    `json:"id"` // span_id
	TraceID      string    `json:"trace_id"`
	ParentSpanID string    `json:"parent_span_id"`
	Service      string    `json:"service"`
	Operation    string    `json:"operation"`
	Duration     float64   `json:"duration_ms"`
	StatusCode   string    `json:"status_code"`
	IsError      bool      `json:"is_error"`
	Timestamp    time.Time `json:"timestamp"`
}

SpanNode represents a single span within a trace.

type StoreCounts

type StoreCounts struct {
	Tenants      int
	Services     int
	Operations   int
	Traces       int
	Spans        int
	LogClusters  int
	Metrics      int
	Anomalies    int
	ServiceEdges int
	TraceEdges   int
	SignalEdges  int
	AnomalyEdges int
}

StoreCounts is a point-in-time census of every long-lived in-memory structure the coordinator owns, aggregated across tenants. It feeds the otelcontext_graphrag_* gauges so operators can attribute RSS growth to a specific store before reaching for a heap profile.

type Template

type Template struct {
	ID        uint64    `json:"id"`
	Tokens    []string  `json:"tokens"`
	Count     int       `json:"count"`
	FirstSeen time.Time `json:"first_seen"`
	LastSeen  time.Time `json:"last_seen"`
	Sample    string    `json:"sample,omitempty"`
	// contains filtered or unexported fields
}

Template represents a mined log group.

func LoadDrainTemplates

func LoadDrainTemplates(db *gorm.DB, tenant string) ([]Template, error)

LoadDrainTemplates reads persisted Drain templates for the supplied tenant and returns them in a format ready to pass to Drain.LoadTemplates. Returns an empty slice (and nil error) if no rows match.

func (*Template) TemplateString

func (t *Template) TemplateString() string

TemplateString returns the template rendered as a single string.

type TraceNode

type TraceNode struct {
	ID          string    `json:"id"` // trace_id
	RootService string    `json:"root_service"`
	Duration    float64   `json:"duration_ms"`
	Status      string    `json:"status"`
	Timestamp   time.Time `json:"timestamp"`
	SpanCount   int       `json:"span_count"`
}

TraceNode represents a distributed trace.

type TraceStore

type TraceStore struct {
	Traces map[string]*TraceNode // key: trace_id
	Spans  map[string]*SpanNode  // key: span_id
	Edges  map[string]*Edge      // key: type|from|to
	TTL    time.Duration
	// MaxSpans hard-caps the Spans map: at the cap, NEW span IDs are
	// skipped (UpsertSpan returns false) while updates to resident IDs
	// still apply. <=0 disables the cap.
	MaxSpans int
	// contains filtered or unexported fields
}

TraceStore holds trace/span detail with TTL-based pruning.

func (*TraceStore) ErrorSpans

func (ts *TraceStore) ErrorSpans(service string, since time.Time) []*SpanNode

func (*TraceStore) GetSpan

func (ts *TraceStore) GetSpan(spanID string) (*SpanNode, bool)

func (*TraceStore) GetTrace

func (ts *TraceStore) GetTrace(traceID string) (*TraceNode, bool)

func (*TraceStore) Prune

func (ts *TraceStore) Prune() int

Prune removes spans and traces older than TTL.

func (*TraceStore) SpansForTrace

func (ts *TraceStore) SpansForTrace(traceID string) []*SpanNode

func (*TraceStore) UpsertSpan

func (ts *TraceStore) UpsertSpan(span SpanNode) bool

UpsertSpan inserts or updates a span node and its CONTAINS/CHILD_OF edges. Returns false when the span is NEW and MaxSpans is already reached — the span is skipped entirely (the graph is best-effort; the DB is the source of truth, same doctrine as the event-channel overflow in builder.go).
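A minimal sketch of the capped upsert, with locking omitted and hypothetical `traceStore` and `span` types: resident IDs always update in place, while a new ID at the cap returns false so the caller can count the skip (the role `SpanCapacityDropsCount` plays).

```go
package main

import "fmt"

type span struct {
	ID      string
	IsError bool
}

type traceStore struct {
	spans    map[string]*span
	maxSpans int // <= 0 disables the cap
}

// upsertSpan updates a resident span unconditionally but refuses a NEW ID
// once the map is at maxSpans, returning false for the caller to count.
func (t *traceStore) upsertSpan(s span) bool {
	if existing, ok := t.spans[s.ID]; ok {
		*existing = s
		return true
	}
	if t.maxSpans > 0 && len(t.spans) >= t.maxSpans {
		return false
	}
	c := s
	t.spans[s.ID] = &c
	return true
}

func main() {
	st := &traceStore{spans: map[string]*span{}, maxSpans: 2}
	fmt.Println(st.upsertSpan(span{ID: "a"}), st.upsertSpan(span{ID: "b"}), st.upsertSpan(span{ID: "c"}))
	fmt.Println(st.upsertSpan(span{ID: "a", IsError: true}), st.spans["a"].IsError)
}
```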

func (*TraceStore) UpsertTrace

func (ts *TraceStore) UpsertTrace(traceID, rootService, status string, durationMs float64, timestamp time.Time)
