Documentation
¶
Overview ¶
Package graphrag provides a layered in-memory graph for real-time observability retrieval — error chains, root cause analysis, impact analysis. It replaces the simpler internal/graph package with typed stores.
Index ¶
- Constants
- func AutoMigrateGraphRAG(db *gorm.DB) error
- func Preprocess(line string) string
- func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error
- func SetPanicMetrics(m *telemetry.Metrics)
- type AffectedEntry
- type AnomalyNode
- type AnomalySeverity
- type AnomalyStore
- func (as *AnomalyStore) AddAnomaly(anomaly AnomalyNode)
- func (as *AnomalyStore) AddPrecededByEdge(anomalyID, precedingID string, ts time.Time)
- func (as *AnomalyStore) AnomaliesForService(service string, since time.Time) []*AnomalyNode
- func (as *AnomalyStore) AnomaliesSince(since time.Time) []*AnomalyNode
- type AnomalyType
- type Config
- type CorrelatedSignalsResult
- type Drain
- type DrainOption
- type DrainTemplateRow
- type Edge
- type EdgeType
- type ErrorChainResult
- type GraphRAG
- func (g *GraphRAG) AllServiceEdges(ctx context.Context) []*Edge
- func (g *GraphRAG) AnomaliesForService(ctx context.Context, service string, since time.Time) []*AnomalyNode
- func (g *GraphRAG) AnomalyTimeline(ctx context.Context, since time.Time) []*AnomalyNode
- func (g *GraphRAG) CorrelatedSignals(ctx context.Context, service string, since time.Time) *CorrelatedSignalsResult
- func (g *GraphRAG) DependencyChain(ctx context.Context, traceID string) []SpanNode
- func (g *GraphRAG) DroppedLogsCount() int64
- func (g *GraphRAG) DroppedMetricsCount() int64
- func (g *GraphRAG) DroppedSpansCount() int64
- func (g *GraphRAG) ErrorChain(ctx context.Context, service string, since time.Time, limit int) []ErrorChainResult
- func (g *GraphRAG) EventBufferDepth() int
- func (g *GraphRAG) GetGraphSnapshot(ctx context.Context, at time.Time) (*GraphSnapshot, error)
- func (g *GraphRAG) GetInvestigation(ctx context.Context, id string) (*Investigation, error)
- func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error)
- func (g *GraphRAG) ImpactAnalysis(ctx context.Context, service string, maxDepth int) *ImpactResult
- func (g *GraphRAG) InvestigationInsertCount() int64
- func (g *GraphRAG) IsRunning() bool
- func (g *GraphRAG) OnLogIngested(log storage.Log)
- func (g *GraphRAG) OnMetricIngested(metric tsdb.RawMetric)
- func (g *GraphRAG) OnSpanIngested(span storage.Span)
- func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, ...)
- func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode)
- func (g *GraphRAG) RootCauseAnalysis(ctx context.Context, service string, since time.Time) []RankedCause
- func (g *GraphRAG) ServiceMap(ctx context.Context, depth int) []ServiceMapEntry
- func (g *GraphRAG) ServiceNames(ctx context.Context) []string
- func (g *GraphRAG) SetMetrics(m *telemetry.Metrics)
- func (g *GraphRAG) ShortestPath(ctx context.Context, from, to string) []string
- func (g *GraphRAG) SimilarErrors(ctx context.Context, clusterID string, k int) []LogClusterNode
- func (g *GraphRAG) Start(ctx context.Context)
- func (g *GraphRAG) Stop()
- type GraphSnapshot
- type ImpactResult
- type Investigation
- type LogClusterNode
- type MetricNode
- type NodeType
- type OperationNode
- type RankedCause
- type RootCauseInfo
- type ServiceMapEntry
- type ServiceNode
- type ServiceStore
- func (s *ServiceStore) AllEdges() []*Edge
- func (s *ServiceStore) AllServices() []*ServiceNode
- func (s *ServiceStore) CallEdgesFrom(service string) []*Edge
- func (s *ServiceStore) CallEdgesTo(service string) []*Edge
- func (s *ServiceStore) GetService(name string) (*ServiceNode, bool)
- func (s *ServiceStore) UpsertCallEdge(source, target string, durationMs float64, isError bool, ts time.Time)
- func (s *ServiceStore) UpsertOperation(service, operation string, durationMs float64, isError bool, ts time.Time)
- func (s *ServiceStore) UpsertService(name string, durationMs float64, isError bool, ts time.Time)
- type SignalStore
- func (ss *SignalStore) AddLoggedDuringEdge(clusterID, spanID string, ts time.Time)
- func (ss *SignalStore) LogClustersForService(service string) []*LogClusterNode
- func (ss *SignalStore) MetricsForService(service string) []*MetricNode
- func (ss *SignalStore) UpsertLogCluster(id, template, severity, service string, ts time.Time)
- func (ss *SignalStore) UpsertLogClusterWithTemplate(id, template, severity, service string, templateID uint64, tokens []string, ...)
- func (ss *SignalStore) UpsertMetric(metricName, service string, value float64, ts time.Time)
- type SpanNode
- type Template
- type TraceNode
- type TraceStore
- func (ts *TraceStore) ErrorSpans(service string, since time.Time) []*SpanNode
- func (ts *TraceStore) GetSpan(spanID string) (*SpanNode, bool)
- func (ts *TraceStore) GetTrace(traceID string) (*TraceNode, bool)
- func (ts *TraceStore) Prune() int
- func (ts *TraceStore) SpansForTrace(traceID string) []*SpanNode
- func (ts *TraceStore) UpsertSpan(span SpanNode)
- func (ts *TraceStore) UpsertTrace(traceID, rootService, status string, durationMs float64, timestamp time.Time)
Constants ¶
const Wildcard = "<*>"
Wildcard is the placeholder used for variable positions within a template.
Variables ¶
This section is empty.
Functions ¶
func AutoMigrateGraphRAG ¶
AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models and applies tenant backfill + drain_templates composite-PK promotion. Safe to call repeatedly.
func Preprocess ¶
Preprocess masks common variable patterns in a raw log line prior to tokenization. This implements Drain's configurable regex-replacement stage.
func SaveDrainTemplates ¶
SaveDrainTemplates upserts the given templates into the drain_templates table under the supplied tenant. Tokens are JSON-encoded. Uses GORM's clause.OnConflict upsert which is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL).
The conflict target is the composite (tenant_id, id) primary key so the same Drain template hash can coexist across tenants — a future per-tenant Drain miner can rely on this to keep cluster IDs stable per tenant.
func SetPanicMetrics ¶
SetPanicMetrics wires the telemetry metrics so GraphRAG worker recovery closures can increment OtelContext_panics_recovered_total{subsystem="graphrag"}. Safe to leave unset in tests.
Types ¶
type AffectedEntry ¶
type AffectedEntry struct {
Service string `json:"service"`
Depth int `json:"depth"`
CallCount int64 `json:"call_count"`
ImpactScore float64 `json:"impact_score"`
}
AffectedEntry is a service affected by an upstream failure.
type AnomalyNode ¶
type AnomalyNode struct {
ID string `json:"id"`
Type AnomalyType `json:"type"`
Severity AnomalySeverity `json:"severity"`
Service string `json:"service"`
Evidence string `json:"evidence"`
Timestamp time.Time `json:"timestamp"`
}
AnomalyNode represents a detected anomaly.
type AnomalySeverity ¶
type AnomalySeverity string
AnomalySeverity indicates the severity of an anomaly.
const ( SeverityCritical AnomalySeverity = "critical" SeverityWarning AnomalySeverity = "warning" SeverityInfo AnomalySeverity = "info" )
type AnomalyStore ¶
type AnomalyStore struct {
Anomalies map[string]*AnomalyNode // key: anomaly ID
Edges map[string]*Edge // key: type|from|to
// contains filtered or unexported fields
}
AnomalyStore holds detected anomalies and their temporal correlations.
func (*AnomalyStore) AddAnomaly ¶
func (as *AnomalyStore) AddAnomaly(anomaly AnomalyNode)
func (*AnomalyStore) AddPrecededByEdge ¶
func (as *AnomalyStore) AddPrecededByEdge(anomalyID, precedingID string, ts time.Time)
func (*AnomalyStore) AnomaliesForService ¶
func (as *AnomalyStore) AnomaliesForService(service string, since time.Time) []*AnomalyNode
func (*AnomalyStore) AnomaliesSince ¶
func (as *AnomalyStore) AnomaliesSince(since time.Time) []*AnomalyNode
type AnomalyType ¶
type AnomalyType string
AnomalyType indicates the kind of anomaly detected.
const ( AnomalyErrorSpike AnomalyType = "error_spike" AnomalyLatencySpike AnomalyType = "latency_spike" AnomalyMetricZScore AnomalyType = "metric_zscore" )
type Config ¶
type Config struct {
TraceTTL time.Duration
RefreshEvery time.Duration
SnapshotEvery time.Duration
AnomalyEvery time.Duration
WorkerCount int
ChannelSize int
}
Config holds GraphRAG configuration.
type CorrelatedSignalsResult ¶
type CorrelatedSignalsResult struct {
}
CorrelatedSignalsResult holds all related signals gathered for a service within a time range.
type Drain ¶
type Drain struct {
// contains filtered or unexported fields
}
Drain is a thread-safe log template miner.
func NewDrain ¶
func NewDrain(opts ...DrainOption) *Drain
NewDrain constructs a Drain miner with the supplied options.
func (*Drain) LoadTemplates ¶
LoadTemplates restores templates from a previous snapshot. Existing state is cleared. Intended for startup recovery. The LRU list is rebuilt by inserting templates in LastSeen order (oldest first, each PushFront'd), so the most recently-seen template ends up at the front.
type DrainOption ¶
type DrainOption func(*Drain)
DrainOption configures a Drain instance.
func WithDepth ¶
func WithDepth(depth int) DrainOption
WithDepth sets the prefix tree depth (default 4). Minimum 2.
func WithMaxChildren ¶
func WithMaxChildren(n int) DrainOption
WithMaxChildren sets the max distinct children per internal node (default 100). Beyond this, tokens collapse to Wildcard.
func WithMaxTemplates ¶
func WithMaxTemplates(n int) DrainOption
WithMaxTemplates sets the total template cap (default 50000). When exceeded, the least-recently-seen template is evicted.
func WithSimilarityThreshold ¶
func WithSimilarityThreshold(st float64) DrainOption
WithSimilarityThreshold sets the token-similarity threshold st used for template matching (default 0.4). Clamped to (0, 1].
type DrainTemplateRow ¶
type DrainTemplateRow struct {
TenantID string `gorm:"primaryKey;size:64;default:'default';not null" json:"tenant_id"`
ID int64 `gorm:"primaryKey;autoIncrement:false" json:"id"` // int64(Template.ID)
Tokens string `gorm:"type:text;not null" json:"tokens"` // JSON-encoded []string
Count int `json:"count"`
FirstSeen time.Time `gorm:"index" json:"first_seen"`
LastSeen time.Time `gorm:"index" json:"last_seen"`
Sample string `gorm:"type:text" json:"sample"`
}
DrainTemplateRow is the persisted GORM representation of a Drain log template. Tokens are JSON-encoded to stay schema-simple across SQLite/MySQL/PostgreSQL/MSSQL.
ID is stored as int64 (bit-reinterpretation of the uint64 FNV-64 hash): the standard SQL drivers reject uint64 values with the high bit set, and signed int64 carries the same 64 bits without loss. Conversion happens in the persistence helpers.
The primary key is composite (tenant_id, id): the same template tokens can legitimately recur across tenants, and we want the cluster ID to stay stable per tenant once the in-memory Drain miner is partitioned per-tenant. TenantID is declared first so it leads the PK index.
func (DrainTemplateRow) TableName ¶
func (DrainTemplateRow) TableName() string
TableName overrides GORM's default table name.
type Edge ¶
type Edge struct {
Type EdgeType `json:"type"`
FromID string `json:"from_id"`
ToID string `json:"to_id"`
Weight float64 `json:"weight,omitempty"`
CallCount int64 `json:"call_count,omitempty"`
ErrorRate float64 `json:"error_rate,omitempty"`
AvgMs float64 `json:"avg_latency_ms,omitempty"`
TotalMs float64 `json:"-"`
ErrorCount int64 `json:"-"`
UpdatedAt time.Time `json:"updated_at"`
}
Edge represents a directed relationship between two nodes.
type EdgeType ¶
type EdgeType string
EdgeType distinguishes different relationship categories.
const ( EdgeCalls EdgeType = "CALLS" EdgeExposes EdgeType = "EXPOSES" EdgeContains EdgeType = "CONTAINS" EdgeChildOf EdgeType = "CHILD_OF" EdgeEmittedBy EdgeType = "EMITTED_BY" EdgeLoggedDuring EdgeType = "LOGGED_DURING" EdgeMeasuredBy EdgeType = "MEASURED_BY" EdgePrecededBy EdgeType = "PRECEDED_BY" EdgeTriggeredBy EdgeType = "TRIGGERED_BY" )
type ErrorChainResult ¶
type ErrorChainResult struct {
RootCause *RootCauseInfo `json:"root_cause"`
SpanChain []SpanNode `json:"span_chain"`
AnomalousMetrics []MetricNode `json:"anomalous_metrics,omitempty"`
TraceID string `json:"trace_id"`
}
ErrorChainResult is the output of an error chain query.
type GraphRAG ¶
type GraphRAG struct {
// contains filtered or unexported fields
}
GraphRAG is the main coordinator for the layered graph system.
Every in-memory store is partitioned by tenant. The coordinator holds a map of tenant ID → *tenantStores and a reader/writer mutex that protects only the outer map; per-tenant stores keep their own RWMutexes for fine-grained concurrent access. All event ingestion and queries route through storesFor(ctx) / storesForTenant(tenant) — there is no "global" slice.
func New ¶
func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggregator, ringBuf *tsdb.RingBuffer, cfg Config) *GraphRAG
New creates a new GraphRAG coordinator.
func (*GraphRAG) AllServiceEdges ¶
AllServiceEdges returns every edge in the caller's tenant's ServiceStore. Kept as a narrow helper so API handlers do not need to traverse the tenantStores composite themselves.
func (*GraphRAG) AnomaliesForService ¶
func (g *GraphRAG) AnomaliesForService(ctx context.Context, service string, since time.Time) []*AnomalyNode
AnomaliesForService is a tenant-aware read-through to the per-tenant AnomalyStore, exported so handlers outside this package never need to reach into the store maps directly.
func (*GraphRAG) AnomalyTimeline ¶
AnomalyTimeline returns recent anomalies sorted by time.
func (*GraphRAG) CorrelatedSignals ¶
func (*GraphRAG) DependencyChain ¶
DependencyChain returns the full span tree for a trace.
func (*GraphRAG) DroppedLogsCount ¶
DroppedLogsCount reports the number of log events dropped because the ingestion channel was full.
func (*GraphRAG) DroppedMetricsCount ¶
DroppedMetricsCount reports the number of metric events dropped because the ingestion channel was full.
func (*GraphRAG) DroppedSpansCount ¶
DroppedSpansCount reports the number of span events dropped because the ingestion channel was full. Exported for tests and readiness probes; atomic, safe from any goroutine.
func (*GraphRAG) ErrorChain ¶
func (g *GraphRAG) ErrorChain(ctx context.Context, service string, since time.Time, limit int) []ErrorChainResult
ErrorChain traces error spans upstream to find the root cause service. The tenant slice is selected via ctx — callers without a tenant ctx collapse to storage.DefaultTenantID at the coordinator boundary.
func (*GraphRAG) EventBufferDepth ¶
EventBufferDepth returns the current number of events queued in the ingestion channel. Exported for telemetry polling; never blocks.
func (*GraphRAG) GetGraphSnapshot ¶
GetGraphSnapshot retrieves the snapshot closest to the requested time, scoped to the tenant carried by ctx. The composite (tenant_id, created_at) index supports the descending lookup.
func (*GraphRAG) GetInvestigation ¶
GetInvestigation retrieves a single investigation by ID, scoped to the tenant carried by ctx. Returning ErrRecordNotFound for cross-tenant lookups prevents id-guessing from leaking another tenant's row.
func (*GraphRAG) GetInvestigations ¶
func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error)
GetInvestigations queries persisted investigations scoped to the tenant carried by ctx. The composite (tenant_id, created_at) index supports the recency-ordered scan.
func (*GraphRAG) ImpactAnalysis ¶
ImpactAnalysis performs BFS downstream from a service to find affected services.
func (*GraphRAG) InvestigationInsertCount ¶
InvestigationInsertCount reports cooldown-allowed PersistInvestigation calls. Semantics: this counter increments when the cooldown check passes, BEFORE the DB write — so a subsequent DB failure still increments this. It is NOT a strict DB insert count. Intended for tests to assert cooldown behavior without requiring a live repo.
func (*GraphRAG) IsRunning ¶
IsRunning reports whether the coordinator's stop channel has not been closed. Used by readiness probes to confirm the background workers are still live.
func (*GraphRAG) OnLogIngested ¶
OnLogIngested is the callback wired into the log ingestion pipeline.
func (*GraphRAG) OnMetricIngested ¶
OnMetricIngested is the callback wired into the metric ingestion pipeline. tsdb.RawMetric already carries a resolved TenantID (set in ingest/otlp.go Export), so we read it here instead of adding a second argument — keeping the metric callback signature identical across TSDB and GraphRAG.
func (*GraphRAG) OnSpanIngested ¶
OnSpanIngested is the callback wired into the trace ingestion pipeline. Tenant is taken straight from the persisted Span (already resolved upstream by the OTLP Export handlers) and carried on the event — the callback signature is intentionally unchanged so external wiring stays trivial.
func (*GraphRAG) PersistInvestigation ¶
func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode)
PersistInvestigation saves an investigation record from an error chain analysis. Tenant is accepted explicitly so the caller (the per-tenant anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice and so the persisted row carries its originating tenant_id.
func (*GraphRAG) RegisterAnomaly ¶
func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode)
RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant. Mirrors PersistInvestigation's "tenant accepted explicitly" shape so out-of-band anomaly producers (synthetic detectors, integration tests, future external anomaly feeds) can land directly on the right tenant slice without going through the metric/error detection loops. Empty tenant collapses to storage.DefaultTenantID.
func (*GraphRAG) RootCauseAnalysis ¶
func (g *GraphRAG) RootCauseAnalysis(ctx context.Context, service string, since time.Time) []RankedCause
RootCauseAnalysis combines ErrorChain with anomaly correlation to rank probable causes.
func (*GraphRAG) ServiceMap ¶
func (g *GraphRAG) ServiceMap(ctx context.Context, depth int) []ServiceMapEntry
func (*GraphRAG) ServiceNames ¶
ServiceNames returns every service the caller's tenant has emitted any span for, sorted ascending. Reads from the in-memory ServiceStore — no DB scan. Used by /api/metadata/services so the dropdown matches the system map exactly (both are sourced from the same store).
func (*GraphRAG) SetMetrics ¶
SetMetrics wires the Prometheus registry so GraphRAG event drops are observable via otelcontext_graphrag_events_dropped_total. Safe to call before Start; pass nil to disable Prometheus recording (atomic counters still tick).
func (*GraphRAG) ShortestPath ¶
ShortestPath finds the shortest path between two services using Dijkstra.
func (*GraphRAG) SimilarErrors ¶
SimilarErrors finds log clusters similar to a given cluster using the vector index, scoped to the tenant carried on ctx. Cross-tenant hits are impossible because the underlying vectordb partitions docs per tenant and this lookup resolves the SignalStore through storesFor(ctx).
type GraphSnapshot ¶
type GraphSnapshot struct {
TenantID string `gorm:"size:64;default:'default';not null;index:idx_graph_snapshots_tenant_created,priority:1" json:"tenant_id"`
ID string `gorm:"primaryKey;size:64" json:"id"`
CreatedAt time.Time `gorm:"index:idx_graph_snapshots_tenant_created,priority:2" json:"created_at"`
Nodes json.RawMessage `gorm:"type:text" json:"nodes"`
Edges json.RawMessage `gorm:"type:text" json:"edges"`
ServiceCount int `json:"service_count"`
TotalCalls int64 `json:"total_calls"`
AvgHealthScore float64 `json:"avg_health_score"`
}
GraphSnapshot is a periodic snapshot of the service topology persisted to DB.
TenantID scopes the row to the tenant slice it was captured from. The composite (tenant_id, created_at) index supports the "most recent snapshot at-or-before T for tenant X" lookup that GetGraphSnapshot runs on every read.
func (GraphSnapshot) TableName ¶
func (GraphSnapshot) TableName() string
TableName overrides GORM's default table name.
type ImpactResult ¶
type ImpactResult struct {
Service string `json:"service"`
AffectedServices []AffectedEntry `json:"affected_services"`
TotalDownstream int `json:"total_downstream"`
}
ImpactResult describes the blast radius of a service failure.
type Investigation ¶
type Investigation struct {
TenantID string `gorm:"size:64;default:'default';not null;index:idx_investigations_tenant_created,priority:1" json:"tenant_id"`
ID string `gorm:"primaryKey;size:64" json:"id"`
CreatedAt time.Time `gorm:"index:idx_investigations_tenant_created,priority:2" json:"created_at"`
Status string `gorm:"size:20" json:"status"` // detected, triaged, resolved
Severity string `gorm:"size:20" json:"severity"` // critical, warning, info
TriggerService string `gorm:"size:255;index" json:"trigger_service"`
TriggerOperation string `gorm:"size:255" json:"trigger_operation"`
ErrorMessage string `gorm:"type:text" json:"error_message"`
RootService string `gorm:"size:255" json:"root_service"`
RootOperation string `gorm:"size:255" json:"root_operation"`
CausalChain json.RawMessage `gorm:"type:text" json:"causal_chain"`
TraceIDs json.RawMessage `gorm:"type:text" json:"trace_ids"`
ErrorLogs json.RawMessage `gorm:"type:text" json:"error_logs"`
AnomalousMetrics json.RawMessage `gorm:"type:text" json:"anomalous_metrics"`
AffectedServices json.RawMessage `gorm:"type:text" json:"affected_services"`
SpanChain json.RawMessage `gorm:"type:text" json:"span_chain"`
}
Investigation is a persisted record of an automated error investigation.
TenantID scopes the row to its originating tenant. The composite (tenant_id, created_at) index supports the recency-ordered "investigations for tenant X" query that GetInvestigations runs on every read.
func (Investigation) TableName ¶
func (Investigation) TableName() string
TableName overrides GORM's default table name.
type LogClusterNode ¶
type LogClusterNode struct {
ID string `json:"id"` // service-scoped cluster id (stable)
Template string `json:"template"`
TemplateID uint64 `json:"template_id,omitempty"`
TemplateTokens []string `json:"template_tokens,omitempty"`
SampleLog string `json:"sample_log,omitempty"`
Count int64 `json:"count"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
SeverityDist map[string]int64 `json:"severity_distribution"`
}
LogClusterNode groups similar log messages.
Clustering is performed by the Drain template miner. TemplateID is the stable FNV-64 hash of TemplateTokens; ID is the user-facing cluster identifier (service-scoped) that remains stable across Drain re-merges.
type MetricNode ¶
type MetricNode struct {
ID string `json:"id"` // metric_name + "|" + service
MetricName string `json:"metric_name"`
Service string `json:"service"`
RollingMin float64 `json:"rolling_min"`
RollingMax float64 `json:"rolling_max"`
RollingAvg float64 `json:"rolling_avg"`
SampleCount int64 `json:"sample_count"`
LastSeen time.Time `json:"last_seen"`
}
MetricNode represents a metric series for a service.
type OperationNode ¶
type OperationNode struct {
ID string `json:"id"` // service + "|" + operation
Service string `json:"service"`
Operation string `json:"operation"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
HealthScore float64 `json:"health_score"`
CallCount int64 `json:"call_count"`
ErrorCount int64 `json:"error_count"`
ErrorRate float64 `json:"error_rate"`
AvgLatency float64 `json:"avg_latency_ms"`
P50Latency float64 `json:"p50_latency_ms"`
P95Latency float64 `json:"p95_latency_ms"`
P99Latency float64 `json:"p99_latency_ms"`
TotalMs float64 `json:"-"`
}
OperationNode represents an endpoint/RPC within a service.
type RankedCause ¶
type RankedCause struct {
Service string `json:"service"`
Operation string `json:"operation"`
Score float64 `json:"score"`
Evidence []string `json:"evidence"`
ErrorChain []SpanNode `json:"error_chain,omitempty"`
Anomalies []AnomalyNode `json:"anomalies,omitempty"`
}
RankedCause is a probable root cause with evidence.
type RootCauseInfo ¶
type RootCauseInfo struct {
Service string `json:"service"`
Operation string `json:"operation"`
ErrorMessage string `json:"error_message"`
SpanID string `json:"span_id"`
TraceID string `json:"trace_id"`
}
RootCauseInfo identifies the responsible service and operation.
type ServiceMapEntry ¶
type ServiceMapEntry struct {
Service *ServiceNode `json:"service"`
Operations []*OperationNode `json:"operations,omitempty"`
CallsTo []*Edge `json:"calls_to,omitempty"`
CalledBy []*Edge `json:"called_by,omitempty"`
}
ServiceMapEntry is one entry of the service topology returned by ServiceMap, pairing a service node with its operations and call edges, including health scores for the API.
type ServiceNode ¶
type ServiceNode struct {
ID string `json:"id"`
Name string `json:"name"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
HealthScore float64 `json:"health_score"` // 0.0–1.0
CallCount int64 `json:"call_count"`
ErrorCount int64 `json:"error_count"`
ErrorRate float64 `json:"error_rate"`
AvgLatency float64 `json:"avg_latency_ms"`
TotalMs float64 `json:"-"` // for computing avg
}
ServiceNode represents a microservice with aggregated health stats.
type ServiceStore ¶
type ServiceStore struct {
Services map[string]*ServiceNode // key: service name
Operations map[string]*OperationNode // key: service|operation
Edges map[string]*Edge // key: type|from|to
// contains filtered or unexported fields
}
ServiceStore holds permanent service topology data.
func (*ServiceStore) AllEdges ¶
func (s *ServiceStore) AllEdges() []*Edge
func (*ServiceStore) AllServices ¶
func (s *ServiceStore) AllServices() []*ServiceNode
func (*ServiceStore) CallEdgesFrom ¶
func (s *ServiceStore) CallEdgesFrom(service string) []*Edge
func (*ServiceStore) CallEdgesTo ¶
func (s *ServiceStore) CallEdgesTo(service string) []*Edge
func (*ServiceStore) GetService ¶
func (s *ServiceStore) GetService(name string) (*ServiceNode, bool)
func (*ServiceStore) UpsertCallEdge ¶
func (*ServiceStore) UpsertOperation ¶
func (*ServiceStore) UpsertService ¶
type SignalStore ¶
type SignalStore struct {
LogClusters map[string]*LogClusterNode // key: cluster ID
Metrics map[string]*MetricNode // key: metric|service
Edges map[string]*Edge // key: type|from|to
// contains filtered or unexported fields
}
SignalStore holds log cluster and metric correlation data.
func (*SignalStore) AddLoggedDuringEdge ¶
func (ss *SignalStore) AddLoggedDuringEdge(clusterID, spanID string, ts time.Time)
func (*SignalStore) LogClustersForService ¶
func (ss *SignalStore) LogClustersForService(service string) []*LogClusterNode
func (*SignalStore) MetricsForService ¶
func (ss *SignalStore) MetricsForService(service string) []*MetricNode
func (*SignalStore) UpsertLogCluster ¶
func (ss *SignalStore) UpsertLogCluster(id, template, severity, service string, ts time.Time)
func (*SignalStore) UpsertLogClusterWithTemplate ¶
func (ss *SignalStore) UpsertLogClusterWithTemplate(id, template, severity, service string, templateID uint64, tokens []string, sample string, ts time.Time)
UpsertLogClusterWithTemplate is the Drain-aware upsert. It stores the mined template tokens, the stable template ID, and a sample raw log. The older UpsertLogCluster is preserved for backward compatibility.
func (*SignalStore) UpsertMetric ¶
func (ss *SignalStore) UpsertMetric(metricName, service string, value float64, ts time.Time)
type SpanNode ¶
type SpanNode struct {
ID string `json:"id"` // span_id
TraceID string `json:"trace_id"`
ParentSpanID string `json:"parent_span_id"`
Service string `json:"service"`
Operation string `json:"operation"`
Duration float64 `json:"duration_ms"`
StatusCode string `json:"status_code"`
IsError bool `json:"is_error"`
Timestamp time.Time `json:"timestamp"`
}
SpanNode represents a single span within a trace.
type Template ¶
type Template struct {
ID uint64 `json:"id"`
Tokens []string `json:"tokens"`
Count int `json:"count"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
Sample string `json:"sample,omitempty"`
// contains filtered or unexported fields
}
Template represents a mined log group.
func LoadDrainTemplates ¶
LoadDrainTemplates reads persisted Drain templates for the supplied tenant and returns them in a format ready to pass to Drain.LoadTemplates. Returns an empty slice (and nil error) if no rows match.
func (*Template) TemplateString ¶
TemplateString returns the template rendered as a single string.
type TraceNode ¶
type TraceNode struct {
ID string `json:"id"` // trace_id
RootService string `json:"root_service"`
Duration float64 `json:"duration_ms"`
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
SpanCount int `json:"span_count"`
}
TraceNode represents a distributed trace.
type TraceStore ¶
type TraceStore struct {
Traces map[string]*TraceNode // key: trace_id
Spans map[string]*SpanNode // key: span_id
Edges map[string]*Edge // key: type|from|to
TTL time.Duration
// contains filtered or unexported fields
}
TraceStore holds trace/span detail with TTL-based pruning.
func (*TraceStore) ErrorSpans ¶
func (ts *TraceStore) ErrorSpans(service string, since time.Time) []*SpanNode
func (*TraceStore) Prune ¶
func (ts *TraceStore) Prune() int
Prune removes spans and traces older than TTL.
func (*TraceStore) SpansForTrace ¶
func (ts *TraceStore) SpansForTrace(traceID string) []*SpanNode
func (*TraceStore) UpsertSpan ¶
func (ts *TraceStore) UpsertSpan(span SpanNode)
func (*TraceStore) UpsertTrace ¶
func (ts *TraceStore) UpsertTrace(traceID, rootService, status string, durationMs float64, timestamp time.Time)