graphrag

package
v0.0.11-beta.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package graphrag provides a layered in-memory graph for real-time observability retrieval — error chains, root cause analysis, impact analysis. It replaces the simpler internal/graph package with typed stores.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AutoMigrateGraphRAG

func AutoMigrateGraphRAG(db *gorm.DB) error

AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models.

Types

type AffectedEntry

type AffectedEntry struct {
	Service     string  `json:"service"`
	Depth       int     `json:"depth"`
	CallCount   int64   `json:"call_count"`
	ImpactScore float64 `json:"impact_score"`
}

AffectedEntry is a service affected by an upstream failure.

type AnomalyNode

type AnomalyNode struct {
	ID        string          `json:"id"`
	Type      AnomalyType     `json:"type"`
	Severity  AnomalySeverity `json:"severity"`
	Service   string          `json:"service"`
	Evidence  string          `json:"evidence"`
	Timestamp time.Time       `json:"timestamp"`
}

AnomalyNode represents a detected anomaly.

type AnomalySeverity

type AnomalySeverity string

AnomalySeverity indicates the severity of an anomaly.

const (
	SeverityCritical AnomalySeverity = "critical"
	SeverityWarning  AnomalySeverity = "warning"
	SeverityInfo     AnomalySeverity = "info"
)

type AnomalyStore

type AnomalyStore struct {
	Anomalies map[string]*AnomalyNode // key: anomaly ID
	Edges     map[string]*Edge        // key: type|from|to
	// contains filtered or unexported fields
}

AnomalyStore holds detected anomalies and their temporal correlations.

func (*AnomalyStore) AddAnomaly

func (as *AnomalyStore) AddAnomaly(anomaly AnomalyNode)

func (*AnomalyStore) AddPrecededByEdge

func (as *AnomalyStore) AddPrecededByEdge(anomalyID, precedingID string, ts time.Time)

func (*AnomalyStore) AnomaliesForService

func (as *AnomalyStore) AnomaliesForService(service string, since time.Time) []*AnomalyNode

func (*AnomalyStore) AnomaliesSince

func (as *AnomalyStore) AnomaliesSince(since time.Time) []*AnomalyNode

type AnomalyType

type AnomalyType string

AnomalyType indicates the kind of anomaly detected.

const (
	AnomalyErrorSpike   AnomalyType = "error_spike"
	AnomalyLatencySpike AnomalyType = "latency_spike"
	AnomalyMetricZScore AnomalyType = "metric_zscore"
)

type Config

type Config struct {
	TraceTTL      time.Duration
	RefreshEvery  time.Duration
	SnapshotEvery time.Duration
	AnomalyEvery  time.Duration
	WorkerCount   int
	ChannelSize   int
}

Config holds GraphRAG configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible defaults.

type CorrelatedSignalsResult

type CorrelatedSignalsResult struct {
	Service     string             `json:"service"`
	ErrorLogs   []LogClusterNode   `json:"error_logs"`
	Metrics     []MetricNode       `json:"metrics"`
	Anomalies   []AnomalyNode      `json:"anomalies"`
	ErrorChains []ErrorChainResult `json:"error_chains,omitempty"`
}

CorrelatedSignalsResult is the output of CorrelatedSignals: the error logs, metrics, anomalies, and error chains related to a service within a time range.

type Edge

type Edge struct {
	Type       EdgeType  `json:"type"`
	FromID     string    `json:"from_id"`
	ToID       string    `json:"to_id"`
	Weight     float64   `json:"weight,omitempty"`
	CallCount  int64     `json:"call_count,omitempty"`
	ErrorRate  float64   `json:"error_rate,omitempty"`
	AvgMs      float64   `json:"avg_latency_ms,omitempty"`
	TotalMs    float64   `json:"-"`
	ErrorCount int64     `json:"-"`
	UpdatedAt  time.Time `json:"updated_at"`
}

Edge represents a directed relationship between two nodes.

type EdgeType

type EdgeType string

EdgeType distinguishes different relationship categories.

const (
	EdgeCalls        EdgeType = "CALLS"
	EdgeExposes      EdgeType = "EXPOSES"
	EdgeContains     EdgeType = "CONTAINS"
	EdgeChildOf      EdgeType = "CHILD_OF"
	EdgeEmittedBy    EdgeType = "EMITTED_BY"
	EdgeLoggedDuring EdgeType = "LOGGED_DURING"
	EdgeMeasuredBy   EdgeType = "MEASURED_BY"
	EdgePrecededBy   EdgeType = "PRECEDED_BY"
	EdgeTriggeredBy  EdgeType = "TRIGGERED_BY"
)

type ErrorChainResult

type ErrorChainResult struct {
	RootCause        *RootCauseInfo   `json:"root_cause"`
	SpanChain        []SpanNode       `json:"span_chain"`
	CorrelatedLogs   []LogClusterNode `json:"correlated_logs,omitempty"`
	AnomalousMetrics []MetricNode     `json:"anomalous_metrics,omitempty"`
	TraceID          string           `json:"trace_id"`
}

ErrorChainResult is the output of an error chain query.

type GraphRAG

type GraphRAG struct {
	ServiceStore *ServiceStore
	TraceStore   *TraceStore
	SignalStore  *SignalStore
	AnomalyStore *AnomalyStore
	// contains filtered or unexported fields
}

GraphRAG is the main coordinator for the layered graph system.

func New

func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggregator, ringBuf *tsdb.RingBuffer, cfg Config) *GraphRAG

New creates a new GraphRAG coordinator.

func (*GraphRAG) AnomalyTimeline

func (g *GraphRAG) AnomalyTimeline(since time.Time) []*AnomalyNode

AnomalyTimeline returns recent anomalies sorted by time.

func (*GraphRAG) CorrelatedSignals

func (g *GraphRAG) CorrelatedSignals(service string, since time.Time) *CorrelatedSignalsResult

func (*GraphRAG) DependencyChain

func (g *GraphRAG) DependencyChain(traceID string) []SpanNode

DependencyChain returns the full span tree for a trace.

func (*GraphRAG) ErrorChain

func (g *GraphRAG) ErrorChain(service string, since time.Time, limit int) []ErrorChainResult

ErrorChain traces error spans upstream to find the root cause service.

func (*GraphRAG) GetGraphSnapshot

func (g *GraphRAG) GetGraphSnapshot(at time.Time) (*GraphSnapshot, error)

GetGraphSnapshot retrieves the snapshot closest to the requested time.

func (*GraphRAG) GetInvestigation

func (g *GraphRAG) GetInvestigation(id string) (*Investigation, error)

GetInvestigation retrieves a single investigation by ID.

func (*GraphRAG) GetInvestigations

func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int) ([]Investigation, error)

GetInvestigations queries persisted investigations.

func (*GraphRAG) ImpactAnalysis

func (g *GraphRAG) ImpactAnalysis(service string, maxDepth int) *ImpactResult

ImpactAnalysis performs BFS downstream from a service to find affected services.

func (*GraphRAG) OnLogIngested

func (g *GraphRAG) OnLogIngested(log storage.Log)

OnLogIngested is the callback wired into the log ingestion pipeline.

func (*GraphRAG) OnMetricIngested

func (g *GraphRAG) OnMetricIngested(metric tsdb.RawMetric)

OnMetricIngested is the callback wired into the metric ingestion pipeline.

func (*GraphRAG) OnSpanIngested

func (g *GraphRAG) OnSpanIngested(span storage.Span)

OnSpanIngested is the callback wired into the trace ingestion pipeline.

func (*GraphRAG) PersistInvestigation

func (g *GraphRAG) PersistInvestigation(triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode)

PersistInvestigation saves an investigation record from an error chain analysis.

func (*GraphRAG) RootCauseAnalysis

func (g *GraphRAG) RootCauseAnalysis(service string, since time.Time) []RankedCause

RootCauseAnalysis combines ErrorChain with anomaly correlation to rank probable causes.

func (*GraphRAG) ServiceMap

func (g *GraphRAG) ServiceMap(depth int) []ServiceMapEntry

func (*GraphRAG) ShortestPath

func (g *GraphRAG) ShortestPath(from, to string) []string

ShortestPath finds the shortest path between two services using Dijkstra.

func (*GraphRAG) SimilarErrors

func (g *GraphRAG) SimilarErrors(clusterID string, k int) []LogClusterNode

SimilarErrors finds log clusters similar to a given cluster using the vector index.

func (*GraphRAG) Start

func (g *GraphRAG) Start(ctx context.Context)

Start begins background goroutines: workers, refresh, snapshot, anomaly detection.

func (*GraphRAG) Stop

func (g *GraphRAG) Stop()

Stop signals all goroutines to exit.

type GraphSnapshot

type GraphSnapshot struct {
	ID             string          `gorm:"primaryKey;size:64" json:"id"`
	CreatedAt      time.Time       `json:"created_at"`
	Nodes          json.RawMessage `gorm:"type:text" json:"nodes"`
	Edges          json.RawMessage `gorm:"type:text" json:"edges"`
	ServiceCount   int             `json:"service_count"`
	TotalCalls     int64           `json:"total_calls"`
	AvgHealthScore float64         `json:"avg_health_score"`
}

GraphSnapshot is a periodic snapshot of the service topology persisted to DB.

func (GraphSnapshot) TableName

func (GraphSnapshot) TableName() string

TableName overrides GORM's default table name.

type ImpactResult

type ImpactResult struct {
	Service          string          `json:"service"`
	AffectedServices []AffectedEntry `json:"affected_services"`
	TotalDownstream  int             `json:"total_downstream"`
}

ImpactResult describes the blast radius of a service failure.

type Investigation

type Investigation struct {
	ID               string          `gorm:"primaryKey;size:64" json:"id"`
	CreatedAt        time.Time       `json:"created_at"`
	Status           string          `gorm:"size:20" json:"status"`   // detected, triaged, resolved
	Severity         string          `gorm:"size:20" json:"severity"` // critical, warning, info
	TriggerService   string          `gorm:"size:255;index" json:"trigger_service"`
	TriggerOperation string          `gorm:"size:255" json:"trigger_operation"`
	ErrorMessage     string          `gorm:"type:text" json:"error_message"`
	RootService      string          `gorm:"size:255" json:"root_service"`
	RootOperation    string          `gorm:"size:255" json:"root_operation"`
	CausalChain      json.RawMessage `gorm:"type:text" json:"causal_chain"`
	TraceIDs         json.RawMessage `gorm:"type:text" json:"trace_ids"`
	ErrorLogs        json.RawMessage `gorm:"type:text" json:"error_logs"`
	AnomalousMetrics json.RawMessage `gorm:"type:text" json:"anomalous_metrics"`
	AffectedServices json.RawMessage `gorm:"type:text" json:"affected_services"`
	SpanChain        json.RawMessage `gorm:"type:text" json:"span_chain"`
}

Investigation is a persisted record of an automated error investigation.

func (Investigation) TableName

func (Investigation) TableName() string

TableName overrides GORM's default table name.

type LogClusterNode

type LogClusterNode struct {
	ID           string           `json:"id"` // template hash
	Template     string           `json:"template"`
	Count        int64            `json:"count"`
	FirstSeen    time.Time        `json:"first_seen"`
	LastSeen     time.Time        `json:"last_seen"`
	SeverityDist map[string]int64 `json:"severity_distribution"`
}

LogClusterNode groups similar log messages.

type MetricNode

type MetricNode struct {
	ID          string    `json:"id"` // metric_name + "|" + service
	MetricName  string    `json:"metric_name"`
	Service     string    `json:"service"`
	RollingMin  float64   `json:"rolling_min"`
	RollingMax  float64   `json:"rolling_max"`
	RollingAvg  float64   `json:"rolling_avg"`
	SampleCount int64     `json:"sample_count"`
	LastSeen    time.Time `json:"last_seen"`
}

MetricNode represents a metric series for a service.

type NodeType

type NodeType string

NodeType distinguishes different node categories in the graph.

const (
	NodeService    NodeType = "service"
	NodeOperation  NodeType = "operation"
	NodeTrace      NodeType = "trace"
	NodeSpan       NodeType = "span"
	NodeLogCluster NodeType = "log_cluster"
	NodeMetric     NodeType = "metric"
	NodeAnomaly    NodeType = "anomaly"
)

type OperationNode

type OperationNode struct {
	ID          string    `json:"id"` // service + "|" + operation
	Service     string    `json:"service"`
	Operation   string    `json:"operation"`
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	HealthScore float64   `json:"health_score"`

	CallCount  int64   `json:"call_count"`
	ErrorCount int64   `json:"error_count"`
	ErrorRate  float64 `json:"error_rate"`
	AvgLatency float64 `json:"avg_latency_ms"`
	P50Latency float64 `json:"p50_latency_ms"`
	P95Latency float64 `json:"p95_latency_ms"`
	P99Latency float64 `json:"p99_latency_ms"`
	TotalMs    float64 `json:"-"`
}

OperationNode represents an endpoint/RPC within a service.

type RankedCause

type RankedCause struct {
	Service    string        `json:"service"`
	Operation  string        `json:"operation"`
	Score      float64       `json:"score"`
	Evidence   []string      `json:"evidence"`
	ErrorChain []SpanNode    `json:"error_chain,omitempty"`
	Anomalies  []AnomalyNode `json:"anomalies,omitempty"`
}

RankedCause is a probable root cause with evidence.

type RootCauseInfo

type RootCauseInfo struct {
	Service      string `json:"service"`
	Operation    string `json:"operation"`
	ErrorMessage string `json:"error_message"`
	SpanID       string `json:"span_id"`
	TraceID      string `json:"trace_id"`
}

RootCauseInfo identifies the responsible service and operation.

type ServiceMapEntry

type ServiceMapEntry struct {
	Service    *ServiceNode     `json:"service"`
	Operations []*OperationNode `json:"operations,omitempty"`
	CallsTo    []*Edge          `json:"calls_to,omitempty"`
	CalledBy   []*Edge          `json:"called_by,omitempty"`
}

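ServiceMapEntry is one entry of the service topology that ServiceMap returns for the API: a service with its health score, its operations, and its outgoing and incoming call edges.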

type ServiceNode

type ServiceNode struct {
	ID          string    `json:"id"`
	Name        string    `json:"name"`
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	HealthScore float64   `json:"health_score"` // 0.0–1.0

	CallCount  int64   `json:"call_count"`
	ErrorCount int64   `json:"error_count"`
	ErrorRate  float64 `json:"error_rate"`
	AvgLatency float64 `json:"avg_latency_ms"`
	TotalMs    float64 `json:"-"` // for computing avg
}

ServiceNode represents a microservice with aggregated health stats.

type ServiceStore

type ServiceStore struct {
	Services   map[string]*ServiceNode   // key: service name
	Operations map[string]*OperationNode // key: service|operation
	Edges      map[string]*Edge          // key: type|from|to
	// contains filtered or unexported fields
}

ServiceStore holds permanent service topology data.

func (*ServiceStore) AllEdges

func (s *ServiceStore) AllEdges() []*Edge

func (*ServiceStore) AllServices

func (s *ServiceStore) AllServices() []*ServiceNode

func (*ServiceStore) CallEdgesFrom

func (s *ServiceStore) CallEdgesFrom(service string) []*Edge

func (*ServiceStore) CallEdgesTo

func (s *ServiceStore) CallEdgesTo(service string) []*Edge

func (*ServiceStore) GetService

func (s *ServiceStore) GetService(name string) (*ServiceNode, bool)

func (*ServiceStore) UpsertCallEdge

func (s *ServiceStore) UpsertCallEdge(source, target string, durationMs float64, isError bool, ts time.Time)

func (*ServiceStore) UpsertOperation

func (s *ServiceStore) UpsertOperation(service, operation string, durationMs float64, isError bool, ts time.Time)

func (*ServiceStore) UpsertService

func (s *ServiceStore) UpsertService(name string, durationMs float64, isError bool, ts time.Time)

type SignalStore

type SignalStore struct {
	LogClusters map[string]*LogClusterNode // key: cluster ID
	Metrics     map[string]*MetricNode     // key: metric|service
	Edges       map[string]*Edge           // key: type|from|to
	// contains filtered or unexported fields
}

SignalStore holds log cluster and metric correlation data.

func (*SignalStore) AddLoggedDuringEdge

func (ss *SignalStore) AddLoggedDuringEdge(clusterID, spanID string, ts time.Time)

func (*SignalStore) LogClustersForService

func (ss *SignalStore) LogClustersForService(service string) []*LogClusterNode

func (*SignalStore) MetricsForService

func (ss *SignalStore) MetricsForService(service string) []*MetricNode

func (*SignalStore) UpsertLogCluster

func (ss *SignalStore) UpsertLogCluster(id, template, severity, service string, ts time.Time)

func (*SignalStore) UpsertMetric

func (ss *SignalStore) UpsertMetric(metricName, service string, value float64, ts time.Time)

type SpanNode

type SpanNode struct {
	ID           string    `json:"id"` // span_id
	TraceID      string    `json:"trace_id"`
	ParentSpanID string    `json:"parent_span_id"`
	Service      string    `json:"service"`
	Operation    string    `json:"operation"`
	Duration     float64   `json:"duration_ms"`
	StatusCode   string    `json:"status_code"`
	IsError      bool      `json:"is_error"`
	Timestamp    time.Time `json:"timestamp"`
}

SpanNode represents a single span within a trace.

type TraceNode

type TraceNode struct {
	ID          string    `json:"id"` // trace_id
	RootService string    `json:"root_service"`
	Duration    float64   `json:"duration_ms"`
	Status      string    `json:"status"`
	Timestamp   time.Time `json:"timestamp"`
	SpanCount   int       `json:"span_count"`
}

TraceNode represents a distributed trace.

type TraceStore

type TraceStore struct {
	Traces map[string]*TraceNode // key: trace_id
	Spans  map[string]*SpanNode  // key: span_id
	Edges  map[string]*Edge      // key: type|from|to
	TTL    time.Duration
	// contains filtered or unexported fields
}

TraceStore holds trace/span detail with TTL-based pruning.

func (*TraceStore) ErrorSpans

func (ts *TraceStore) ErrorSpans(service string, since time.Time) []*SpanNode

func (*TraceStore) GetSpan

func (ts *TraceStore) GetSpan(spanID string) (*SpanNode, bool)

func (*TraceStore) GetTrace

func (ts *TraceStore) GetTrace(traceID string) (*TraceNode, bool)

func (*TraceStore) Prune

func (ts *TraceStore) Prune() int

Prune removes spans and traces older than TTL.

func (*TraceStore) SpansForTrace

func (ts *TraceStore) SpansForTrace(traceID string) []*SpanNode

func (*TraceStore) UpsertSpan

func (ts *TraceStore) UpsertSpan(span SpanNode)

func (*TraceStore) UpsertTrace

func (ts *TraceStore) UpsertTrace(traceID, rootService, status string, durationMs float64, timestamp time.Time)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL