Documentation
¶
Overview ¶
Package graphrag provides a layered in-memory graph for real-time observability retrieval — error chains, root cause analysis, impact analysis. It replaces the simpler internal/graph package with typed stores.
Index ¶
- func AutoMigrateGraphRAG(db *gorm.DB) error
- type AffectedEntry
- type AnomalyNode
- type AnomalySeverity
- type AnomalyStore
- func (as *AnomalyStore) AddAnomaly(anomaly AnomalyNode)
- func (as *AnomalyStore) AddPrecededByEdge(anomalyID, precedingID string, ts time.Time)
- func (as *AnomalyStore) AnomaliesForService(service string, since time.Time) []*AnomalyNode
- func (as *AnomalyStore) AnomaliesSince(since time.Time) []*AnomalyNode
- type AnomalyType
- type Config
- type CorrelatedSignalsResult
- type Edge
- type EdgeType
- type ErrorChainResult
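- type GraphRAG
- func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggregator, ringBuf *tsdb.RingBuffer, cfg Config) *GraphRAG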
- func (g *GraphRAG) AnomalyTimeline(since time.Time) []*AnomalyNode
- func (g *GraphRAG) CorrelatedSignals(service string, since time.Time) *CorrelatedSignalsResult
- func (g *GraphRAG) DependencyChain(traceID string) []SpanNode
- func (g *GraphRAG) ErrorChain(service string, since time.Time, limit int) []ErrorChainResult
- func (g *GraphRAG) GetGraphSnapshot(at time.Time) (*GraphSnapshot, error)
- func (g *GraphRAG) GetInvestigation(id string) (*Investigation, error)
- func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int) ([]Investigation, error)
- func (g *GraphRAG) ImpactAnalysis(service string, maxDepth int) *ImpactResult
- func (g *GraphRAG) OnLogIngested(log storage.Log)
- func (g *GraphRAG) OnMetricIngested(metric tsdb.RawMetric)
- func (g *GraphRAG) OnSpanIngested(span storage.Span)
- func (g *GraphRAG) PersistInvestigation(triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode)
- func (g *GraphRAG) RootCauseAnalysis(service string, since time.Time) []RankedCause
- func (g *GraphRAG) ServiceMap(depth int) []ServiceMapEntry
- func (g *GraphRAG) ShortestPath(from, to string) []string
- func (g *GraphRAG) SimilarErrors(clusterID string, k int) []LogClusterNode
- func (g *GraphRAG) Start(ctx context.Context)
- func (g *GraphRAG) Stop()
- type GraphSnapshot
- func (GraphSnapshot) TableName() string
- type ImpactResult
- type Investigation
- func (Investigation) TableName() string
- type LogClusterNode
- type MetricNode
- type NodeType
- type OperationNode
- type RankedCause
- type RootCauseInfo
- type ServiceMapEntry
- type ServiceNode
- type ServiceStore
- func (s *ServiceStore) AllEdges() []*Edge
- func (s *ServiceStore) AllServices() []*ServiceNode
- func (s *ServiceStore) CallEdgesFrom(service string) []*Edge
- func (s *ServiceStore) CallEdgesTo(service string) []*Edge
- func (s *ServiceStore) GetService(name string) (*ServiceNode, bool)
- func (s *ServiceStore) UpsertCallEdge(source, target string, durationMs float64, isError bool, ts time.Time)
- func (s *ServiceStore) UpsertOperation(service, operation string, durationMs float64, isError bool, ts time.Time)
- func (s *ServiceStore) UpsertService(name string, durationMs float64, isError bool, ts time.Time)
- type SignalStore
- func (ss *SignalStore) AddLoggedDuringEdge(clusterID, spanID string, ts time.Time)
- func (ss *SignalStore) LogClustersForService(service string) []*LogClusterNode
- func (ss *SignalStore) MetricsForService(service string) []*MetricNode
- func (ss *SignalStore) UpsertLogCluster(id, template, severity, service string, ts time.Time)
- func (ss *SignalStore) UpsertMetric(metricName, service string, value float64, ts time.Time)
- type SpanNode
- type TraceNode
- type TraceStore
- func (ts *TraceStore) ErrorSpans(service string, since time.Time) []*SpanNode
- func (ts *TraceStore) GetSpan(spanID string) (*SpanNode, bool)
- func (ts *TraceStore) GetTrace(traceID string) (*TraceNode, bool)
- func (ts *TraceStore) Prune() int
- func (ts *TraceStore) SpansForTrace(traceID string) []*SpanNode
- func (ts *TraceStore) UpsertSpan(span SpanNode)
- func (ts *TraceStore) UpsertTrace(traceID, rootService, status string, durationMs float64, timestamp time.Time)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AutoMigrateGraphRAG ¶
func AutoMigrateGraphRAG(db *gorm.DB) error
AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models.
Types ¶
type AffectedEntry ¶
type AffectedEntry struct {
Service string `json:"service"`
Depth int `json:"depth"`
CallCount int64 `json:"call_count"`
ImpactScore float64 `json:"impact_score"`
}
AffectedEntry is a service affected by an upstream failure.
type AnomalyNode ¶
type AnomalyNode struct {
ID string `json:"id"`
Type AnomalyType `json:"type"`
Severity AnomalySeverity `json:"severity"`
Service string `json:"service"`
Evidence string `json:"evidence"`
Timestamp time.Time `json:"timestamp"`
}
AnomalyNode represents a detected anomaly.
type AnomalySeverity ¶
type AnomalySeverity string
AnomalySeverity indicates the severity of an anomaly.
const (
SeverityCritical AnomalySeverity = "critical"
SeverityWarning  AnomalySeverity = "warning"
SeverityInfo     AnomalySeverity = "info"
)
type AnomalyStore ¶
type AnomalyStore struct {
Anomalies map[string]*AnomalyNode // key: anomaly ID
Edges map[string]*Edge // key: type|from|to
// contains filtered or unexported fields
}
AnomalyStore holds detected anomalies and their temporal correlations.
func (*AnomalyStore) AddAnomaly ¶
func (as *AnomalyStore) AddAnomaly(anomaly AnomalyNode)
func (*AnomalyStore) AddPrecededByEdge ¶
func (as *AnomalyStore) AddPrecededByEdge(anomalyID, precedingID string, ts time.Time)
func (*AnomalyStore) AnomaliesForService ¶
func (as *AnomalyStore) AnomaliesForService(service string, since time.Time) []*AnomalyNode
func (*AnomalyStore) AnomaliesSince ¶
func (as *AnomalyStore) AnomaliesSince(since time.Time) []*AnomalyNode
type AnomalyType ¶
type AnomalyType string
AnomalyType indicates the kind of anomaly detected.
const (
AnomalyErrorSpike   AnomalyType = "error_spike"
AnomalyLatencySpike AnomalyType = "latency_spike"
AnomalyMetricZScore AnomalyType = "metric_zscore"
)
type Config ¶
type Config struct {
TraceTTL time.Duration
RefreshEvery time.Duration
SnapshotEvery time.Duration
AnomalyEvery time.Duration
WorkerCount int
ChannelSize int
}
Config holds GraphRAG configuration.
type CorrelatedSignalsResult ¶
type CorrelatedSignalsResult struct {
}
CorrelatedSignalsResult is the output of GraphRAG.CorrelatedSignals, which gathers all related signals for a service within a time range.
type Edge ¶
type Edge struct {
Type EdgeType `json:"type"`
FromID string `json:"from_id"`
ToID string `json:"to_id"`
Weight float64 `json:"weight,omitempty"`
CallCount int64 `json:"call_count,omitempty"`
ErrorRate float64 `json:"error_rate,omitempty"`
AvgMs float64 `json:"avg_latency_ms,omitempty"`
TotalMs float64 `json:"-"`
ErrorCount int64 `json:"-"`
UpdatedAt time.Time `json:"updated_at"`
}
Edge represents a directed relationship between two nodes.
type EdgeType ¶
type EdgeType string
EdgeType distinguishes different relationship categories.
const (
EdgeCalls        EdgeType = "CALLS"
EdgeExposes      EdgeType = "EXPOSES"
EdgeContains     EdgeType = "CONTAINS"
EdgeChildOf      EdgeType = "CHILD_OF"
EdgeEmittedBy    EdgeType = "EMITTED_BY"
EdgeLoggedDuring EdgeType = "LOGGED_DURING"
EdgeMeasuredBy   EdgeType = "MEASURED_BY"
EdgePrecededBy   EdgeType = "PRECEDED_BY"
EdgeTriggeredBy  EdgeType = "TRIGGERED_BY"
)
type ErrorChainResult ¶
type ErrorChainResult struct {
RootCause *RootCauseInfo `json:"root_cause"`
SpanChain []SpanNode `json:"span_chain"`
AnomalousMetrics []MetricNode `json:"anomalous_metrics,omitempty"`
TraceID string `json:"trace_id"`
}
ErrorChainResult is the output of an error chain query.
type GraphRAG ¶
type GraphRAG struct {
ServiceStore *ServiceStore
TraceStore *TraceStore
SignalStore *SignalStore
AnomalyStore *AnomalyStore
// contains filtered or unexported fields
}
GraphRAG is the main coordinator for the layered graph system.
func New ¶
func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggregator, ringBuf *tsdb.RingBuffer, cfg Config) *GraphRAG
New creates a new GraphRAG coordinator.
func (*GraphRAG) AnomalyTimeline ¶
func (g *GraphRAG) AnomalyTimeline(since time.Time) []*AnomalyNode
AnomalyTimeline returns recent anomalies sorted by time.
func (*GraphRAG) CorrelatedSignals ¶
func (g *GraphRAG) CorrelatedSignals(service string, since time.Time) *CorrelatedSignalsResult
CorrelatedSignals gathers all related signals for a service within a time range.
func (*GraphRAG) DependencyChain ¶
func (g *GraphRAG) DependencyChain(traceID string) []SpanNode
DependencyChain returns the full span tree for a trace.
func (*GraphRAG) ErrorChain ¶
func (g *GraphRAG) ErrorChain(service string, since time.Time, limit int) []ErrorChainResult
ErrorChain traces error spans upstream to find the root cause service.
func (*GraphRAG) GetGraphSnapshot ¶
func (g *GraphRAG) GetGraphSnapshot(at time.Time) (*GraphSnapshot, error)
GetGraphSnapshot retrieves the snapshot closest to the requested time.
func (*GraphRAG) GetInvestigation ¶
func (g *GraphRAG) GetInvestigation(id string) (*Investigation, error)
GetInvestigation retrieves a single investigation by ID.
func (*GraphRAG) GetInvestigations ¶
func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int) ([]Investigation, error)
GetInvestigations queries persisted investigations.
func (*GraphRAG) ImpactAnalysis ¶
func (g *GraphRAG) ImpactAnalysis(service string, maxDepth int) *ImpactResult
ImpactAnalysis performs BFS downstream from a service to find affected services.
func (*GraphRAG) OnLogIngested ¶
func (g *GraphRAG) OnLogIngested(log storage.Log)
OnLogIngested is the callback wired into the log ingestion pipeline.
func (*GraphRAG) OnMetricIngested ¶
func (g *GraphRAG) OnMetricIngested(metric tsdb.RawMetric)
OnMetricIngested is the callback wired into the metric ingestion pipeline.
func (*GraphRAG) OnSpanIngested ¶
func (g *GraphRAG) OnSpanIngested(span storage.Span)
OnSpanIngested is the callback wired into the trace ingestion pipeline.
func (*GraphRAG) PersistInvestigation ¶
func (g *GraphRAG) PersistInvestigation(triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode)
PersistInvestigation saves an investigation record from an error chain analysis.
func (*GraphRAG) RootCauseAnalysis ¶
func (g *GraphRAG) RootCauseAnalysis(service string, since time.Time) []RankedCause
RootCauseAnalysis combines ErrorChain with anomaly correlation to rank probable causes.
func (*GraphRAG) ServiceMap ¶
func (g *GraphRAG) ServiceMap(depth int) []ServiceMapEntry
ServiceMap returns the service topology with health scores for the API.
func (*GraphRAG) ShortestPath ¶
func (g *GraphRAG) ShortestPath(from, to string) []string
ShortestPath finds the shortest path between two services using Dijkstra.
func (*GraphRAG) SimilarErrors ¶
func (g *GraphRAG) SimilarErrors(clusterID string, k int) []LogClusterNode
SimilarErrors finds log clusters similar to a given cluster using the vector index.
type GraphSnapshot ¶
type GraphSnapshot struct {
ID string `gorm:"primaryKey;size:64" json:"id"`
CreatedAt time.Time `json:"created_at"`
Nodes json.RawMessage `gorm:"type:text" json:"nodes"`
Edges json.RawMessage `gorm:"type:text" json:"edges"`
ServiceCount int `json:"service_count"`
TotalCalls int64 `json:"total_calls"`
AvgHealthScore float64 `json:"avg_health_score"`
}
GraphSnapshot is a periodic snapshot of the service topology persisted to DB.
func (GraphSnapshot) TableName ¶
func (GraphSnapshot) TableName() string
TableName overrides GORM's default table name.
type ImpactResult ¶
type ImpactResult struct {
Service string `json:"service"`
AffectedServices []AffectedEntry `json:"affected_services"`
TotalDownstream int `json:"total_downstream"`
}
ImpactResult describes the blast radius of a service failure.
type Investigation ¶
type Investigation struct {
ID string `gorm:"primaryKey;size:64" json:"id"`
CreatedAt time.Time `json:"created_at"`
Status string `gorm:"size:20" json:"status"` // detected, triaged, resolved
Severity string `gorm:"size:20" json:"severity"` // critical, warning, info
TriggerService string `gorm:"size:255;index" json:"trigger_service"`
TriggerOperation string `gorm:"size:255" json:"trigger_operation"`
ErrorMessage string `gorm:"type:text" json:"error_message"`
RootService string `gorm:"size:255" json:"root_service"`
RootOperation string `gorm:"size:255" json:"root_operation"`
CausalChain json.RawMessage `gorm:"type:text" json:"causal_chain"`
TraceIDs json.RawMessage `gorm:"type:text" json:"trace_ids"`
ErrorLogs json.RawMessage `gorm:"type:text" json:"error_logs"`
AnomalousMetrics json.RawMessage `gorm:"type:text" json:"anomalous_metrics"`
AffectedServices json.RawMessage `gorm:"type:text" json:"affected_services"`
SpanChain json.RawMessage `gorm:"type:text" json:"span_chain"`
}
Investigation is a persisted record of an automated error investigation.
func (Investigation) TableName ¶
func (Investigation) TableName() string
TableName overrides GORM's default table name.
type LogClusterNode ¶
type LogClusterNode struct {
ID string `json:"id"` // template hash
Template string `json:"template"`
Count int64 `json:"count"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
SeverityDist map[string]int64 `json:"severity_distribution"`
}
LogClusterNode groups similar log messages.
type MetricNode ¶
type MetricNode struct {
ID string `json:"id"` // metric_name + "|" + service
MetricName string `json:"metric_name"`
Service string `json:"service"`
RollingMin float64 `json:"rolling_min"`
RollingMax float64 `json:"rolling_max"`
RollingAvg float64 `json:"rolling_avg"`
SampleCount int64 `json:"sample_count"`
LastSeen time.Time `json:"last_seen"`
}
MetricNode represents a metric series for a service.
type OperationNode ¶
type OperationNode struct {
ID string `json:"id"` // service + "|" + operation
Service string `json:"service"`
Operation string `json:"operation"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
HealthScore float64 `json:"health_score"`
CallCount int64 `json:"call_count"`
ErrorCount int64 `json:"error_count"`
ErrorRate float64 `json:"error_rate"`
AvgLatency float64 `json:"avg_latency_ms"`
P50Latency float64 `json:"p50_latency_ms"`
P95Latency float64 `json:"p95_latency_ms"`
P99Latency float64 `json:"p99_latency_ms"`
TotalMs float64 `json:"-"`
}
OperationNode represents an endpoint/RPC within a service.
type RankedCause ¶
type RankedCause struct {
Service string `json:"service"`
Operation string `json:"operation"`
Score float64 `json:"score"`
Evidence []string `json:"evidence"`
ErrorChain []SpanNode `json:"error_chain,omitempty"`
Anomalies []AnomalyNode `json:"anomalies,omitempty"`
}
RankedCause is a probable root cause with evidence.
type RootCauseInfo ¶
type RootCauseInfo struct {
Service string `json:"service"`
Operation string `json:"operation"`
ErrorMessage string `json:"error_message"`
SpanID string `json:"span_id"`
TraceID string `json:"trace_id"`
}
RootCauseInfo identifies the responsible service and operation.
type ServiceMapEntry ¶
type ServiceMapEntry struct {
Service *ServiceNode `json:"service"`
Operations []*OperationNode `json:"operations,omitempty"`
CallsTo []*Edge `json:"calls_to,omitempty"`
CalledBy []*Edge `json:"called_by,omitempty"`
}
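ServiceMapEntry is one service in the topology returned by GraphRAG.ServiceMap for the API: the service node (with its health score), its operations, and the call edges to and from it.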
type ServiceNode ¶
type ServiceNode struct {
ID string `json:"id"`
Name string `json:"name"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
HealthScore float64 `json:"health_score"` // 0.0–1.0
CallCount int64 `json:"call_count"`
ErrorCount int64 `json:"error_count"`
ErrorRate float64 `json:"error_rate"`
AvgLatency float64 `json:"avg_latency_ms"`
TotalMs float64 `json:"-"` // for computing avg
}
ServiceNode represents a microservice with aggregated health stats.
type ServiceStore ¶
type ServiceStore struct {
Services map[string]*ServiceNode // key: service name
Operations map[string]*OperationNode // key: service|operation
Edges map[string]*Edge // key: type|from|to
// contains filtered or unexported fields
}
ServiceStore holds permanent service topology data.
func (*ServiceStore) AllEdges ¶
func (s *ServiceStore) AllEdges() []*Edge
func (*ServiceStore) AllServices ¶
func (s *ServiceStore) AllServices() []*ServiceNode
func (*ServiceStore) CallEdgesFrom ¶
func (s *ServiceStore) CallEdgesFrom(service string) []*Edge
func (*ServiceStore) CallEdgesTo ¶
func (s *ServiceStore) CallEdgesTo(service string) []*Edge
func (*ServiceStore) GetService ¶
func (s *ServiceStore) GetService(name string) (*ServiceNode, bool)
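func (*ServiceStore) UpsertCallEdge ¶
func (s *ServiceStore) UpsertCallEdge(source, target string, durationMs float64, isError bool, ts time.Time)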
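func (*ServiceStore) UpsertOperation ¶
func (s *ServiceStore) UpsertOperation(service, operation string, durationMs float64, isError bool, ts time.Time)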
func (*ServiceStore) UpsertService ¶
func (s *ServiceStore) UpsertService(name string, durationMs float64, isError bool, ts time.Time)
type SignalStore ¶
type SignalStore struct {
LogClusters map[string]*LogClusterNode // key: cluster ID
Metrics map[string]*MetricNode // key: metric|service
Edges map[string]*Edge // key: type|from|to
// contains filtered or unexported fields
}
SignalStore holds log cluster and metric correlation data.
func (*SignalStore) AddLoggedDuringEdge ¶
func (ss *SignalStore) AddLoggedDuringEdge(clusterID, spanID string, ts time.Time)
func (*SignalStore) LogClustersForService ¶
func (ss *SignalStore) LogClustersForService(service string) []*LogClusterNode
func (*SignalStore) MetricsForService ¶
func (ss *SignalStore) MetricsForService(service string) []*MetricNode
func (*SignalStore) UpsertLogCluster ¶
func (ss *SignalStore) UpsertLogCluster(id, template, severity, service string, ts time.Time)
func (*SignalStore) UpsertMetric ¶
func (ss *SignalStore) UpsertMetric(metricName, service string, value float64, ts time.Time)
type SpanNode ¶
type SpanNode struct {
ID string `json:"id"` // span_id
TraceID string `json:"trace_id"`
ParentSpanID string `json:"parent_span_id"`
Service string `json:"service"`
Operation string `json:"operation"`
Duration float64 `json:"duration_ms"`
StatusCode string `json:"status_code"`
IsError bool `json:"is_error"`
Timestamp time.Time `json:"timestamp"`
}
SpanNode represents a single span within a trace.
type TraceNode ¶
type TraceNode struct {
ID string `json:"id"` // trace_id
RootService string `json:"root_service"`
Duration float64 `json:"duration_ms"`
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
SpanCount int `json:"span_count"`
}
TraceNode represents a distributed trace.
type TraceStore ¶
type TraceStore struct {
Traces map[string]*TraceNode // key: trace_id
Spans map[string]*SpanNode // key: span_id
Edges map[string]*Edge // key: type|from|to
TTL time.Duration
// contains filtered or unexported fields
}
TraceStore holds trace/span detail with TTL-based pruning.
func (*TraceStore) ErrorSpans ¶
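func (ts *TraceStore) ErrorSpans(service string, since time.Time) []*SpanNode
func (*TraceStore) GetSpan ¶
func (ts *TraceStore) GetSpan(spanID string) (*SpanNode, bool)
func (*TraceStore) GetTrace ¶
func (ts *TraceStore) GetTrace(traceID string) (*TraceNode, bool)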
func (*TraceStore) Prune ¶
func (ts *TraceStore) Prune() int
Prune removes spans and traces older than TTL.
func (*TraceStore) SpansForTrace ¶
func (ts *TraceStore) SpansForTrace(traceID string) []*SpanNode
func (*TraceStore) UpsertSpan ¶
func (ts *TraceStore) UpsertSpan(span SpanNode)
func (*TraceStore) UpsertTrace ¶
func (ts *TraceStore) UpsertTrace(traceID, rootService, status string, durationMs float64, timestamp time.Time)