graphrag

package
v0.2.0-beta.4
Published: May 3, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package graphrag provides a layered in-memory graph for real-time observability retrieval — error chains, root cause analysis, impact analysis. It replaces the simpler internal/graph package with typed stores.

Index

Constants

const Wildcard = "<*>"

Wildcard is the placeholder used for variable positions within a template.

Variables

This section is empty.

Functions

func AutoMigrateGraphRAG

func AutoMigrateGraphRAG(db *gorm.DB) error

AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models and applies tenant backfill + drain_templates composite-PK promotion. Safe to call repeatedly.

func Preprocess

func Preprocess(line string) string

Preprocess masks common variable patterns in a raw log line prior to tokenization. This implements Drain's configurable regex-replacement stage.
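The concrete regex set is internal to the package; the sketch below only illustrates the Drain-style masking stage that Preprocess implements, with three hypothetical patterns (IPv4 addresses, hex literals, bare integers) collapsed to the Wildcard placeholder:

```go
package main

import (
	"fmt"
	"regexp"
)

// Hypothetical masking patterns — the real Preprocess uses the package's
// own configured regexes; these three are illustrative only.
var maskPatterns = []*regexp.Regexp{
	regexp.MustCompile(`\b\d{1,3}(\.\d{1,3}){3}\b`), // IPv4 addresses
	regexp.MustCompile(`\b0x[0-9a-fA-F]+\b`),        // hex literals
	regexp.MustCompile(`\b\d+\b`),                   // bare integers
}

const wildcard = "<*>"

// preprocess masks variable-looking substrings so that lines differing
// only in those positions tokenize to the same template.
func preprocess(line string) string {
	for _, re := range maskPatterns {
		line = re.ReplaceAllString(line, wildcard)
	}
	return line
}

func main() {
	fmt.Println(preprocess("conn from 10.0.0.1 retry 3")) // conn from <*> retry <*>
}
```

Pattern order matters: the IPv4 regex must run before the bare-integer one, or the address's octets would be masked piecemeal.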

func SaveDrainTemplates

func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error

SaveDrainTemplates upserts the given templates into the drain_templates table under the supplied tenant. Tokens are JSON-encoded. Uses GORM's clause.OnConflict upsert, which is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL).

The conflict target is the composite (tenant_id, id) primary key so the same Drain template hash can coexist across tenants — a future per-tenant Drain miner can rely on this to keep cluster IDs stable per tenant.

func SetPanicMetrics

func SetPanicMetrics(m *telemetry.Metrics)

SetPanicMetrics wires the telemetry metrics so GraphRAG worker recovery closures can increment OtelContext_panics_recovered_total{subsystem="graphrag"}. Safe to leave unset in tests.

Types

type AffectedEntry

type AffectedEntry struct {
	Service     string  `json:"service"`
	Depth       int     `json:"depth"`
	CallCount   int64   `json:"call_count"`
	ImpactScore float64 `json:"impact_score"`
}

AffectedEntry is a service affected by an upstream failure.

type AnomalyNode

type AnomalyNode struct {
	ID        string          `json:"id"`
	Type      AnomalyType     `json:"type"`
	Severity  AnomalySeverity `json:"severity"`
	Service   string          `json:"service"`
	Evidence  string          `json:"evidence"`
	Timestamp time.Time       `json:"timestamp"`
}

AnomalyNode represents a detected anomaly.

type AnomalySeverity

type AnomalySeverity string

AnomalySeverity indicates the severity of an anomaly.

const (
	SeverityCritical AnomalySeverity = "critical"
	SeverityWarning  AnomalySeverity = "warning"
	SeverityInfo     AnomalySeverity = "info"
)

type AnomalyStore

type AnomalyStore struct {
	Anomalies map[string]*AnomalyNode // key: anomaly ID
	Edges     map[string]*Edge        // key: type|from|to
	// contains filtered or unexported fields
}

AnomalyStore holds detected anomalies and their temporal correlations.

func (*AnomalyStore) AddAnomaly

func (as *AnomalyStore) AddAnomaly(anomaly AnomalyNode)

func (*AnomalyStore) AddPrecededByEdge

func (as *AnomalyStore) AddPrecededByEdge(anomalyID, precedingID string, ts time.Time)

func (*AnomalyStore) AnomaliesForService

func (as *AnomalyStore) AnomaliesForService(service string, since time.Time) []*AnomalyNode

func (*AnomalyStore) AnomaliesSince

func (as *AnomalyStore) AnomaliesSince(since time.Time) []*AnomalyNode

type AnomalyType

type AnomalyType string

AnomalyType indicates the kind of anomaly detected.

const (
	AnomalyErrorSpike   AnomalyType = "error_spike"
	AnomalyLatencySpike AnomalyType = "latency_spike"
	AnomalyMetricZScore AnomalyType = "metric_zscore"
)

type Config

type Config struct {
	TraceTTL      time.Duration
	RefreshEvery  time.Duration
	SnapshotEvery time.Duration
	AnomalyEvery  time.Duration
	WorkerCount   int
	ChannelSize   int
}

Config holds GraphRAG configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible defaults.

type CorrelatedSignalsResult

type CorrelatedSignalsResult struct {
	Service     string             `json:"service"`
	ErrorLogs   []LogClusterNode   `json:"error_logs"`
	Metrics     []MetricNode       `json:"metrics"`
	Anomalies   []AnomalyNode      `json:"anomalies"`
	ErrorChains []ErrorChainResult `json:"error_chains,omitempty"`
}

CorrelatedSignalsResult gathers all related signals for a service within a time range.

type Drain

type Drain struct {
	// contains filtered or unexported fields
}

Drain is a thread-safe log template miner.

func NewDrain

func NewDrain(opts ...DrainOption) *Drain

NewDrain constructs a Drain miner with the supplied options.

func (*Drain) Len

func (d *Drain) Len() int

Len returns the number of live templates.

func (*Drain) LoadTemplates

func (d *Drain) LoadTemplates(tpls []Template)

LoadTemplates restores templates from a previous snapshot. Existing state is cleared. Intended for startup recovery. The LRU list is rebuilt by inserting templates in LastSeen order (oldest first, each PushFront'd), so the most recently-seen template ends up at the front.

func (*Drain) Match

func (d *Drain) Match(logLine string, ts time.Time) *Template

Match preprocesses the log line, descends the prefix tree, and either merges into an existing template or creates a new one. Returns the resulting template. Returns nil only for empty input.

func (*Drain) Templates

func (d *Drain) Templates() []Template

Templates returns a snapshot copy of all currently known templates. Internal back-pointers (elem, leaf) are deliberately omitted from copies.

type DrainOption

type DrainOption func(*Drain)

DrainOption configures a Drain instance.

func WithDepth

func WithDepth(depth int) DrainOption

WithDepth sets the prefix tree depth (default 4). Minimum 2.

func WithMaxChildren

func WithMaxChildren(n int) DrainOption

WithMaxChildren sets the max distinct children per internal node (default 100). Beyond this, tokens collapse to Wildcard.

func WithMaxTemplates

func WithMaxTemplates(n int) DrainOption

WithMaxTemplates sets the total template cap (default 50000). When exceeded, the least-recently-seen template is evicted.

func WithSimilarityThreshold

func WithSimilarityThreshold(st float64) DrainOption

WithSimilarityThreshold sets the token-similarity threshold used when matching a log line against existing templates (default 0.4). Clamped to (0, 1].
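The package's exact scoring is internal, but Drain-style similarity is conventionally the fraction of positions where the template token matches the incoming token. A minimal sketch, assuming wildcard positions count as matches (the internal scoring may weight them differently):

```go
package main

import "fmt"

const wildcard = "<*>"

// seqSim returns the fraction of positions where the template token
// matches the incoming token. Equal lengths are required — Drain only
// compares token sequences of the same length.
func seqSim(template, tokens []string) float64 {
	if len(template) == 0 || len(template) != len(tokens) {
		return 0
	}
	same := 0
	for i, t := range template {
		if t == wildcard || t == tokens[i] {
			same++
		}
	}
	return float64(same) / float64(len(template))
}

func main() {
	tpl := []string{"user", wildcard, "login", "failed"}
	line := []string{"user", "alice", "login", "ok"}
	fmt.Println(seqSim(tpl, line)) // 0.75 — 3 of 4 positions match
}
```

A line whose score against the best candidate falls below the threshold starts a new template instead of merging.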

type DrainTemplateRow

type DrainTemplateRow struct {
	TenantID  string    `gorm:"primaryKey;size:64;default:'default';not null" json:"tenant_id"`
	ID        int64     `gorm:"primaryKey;autoIncrement:false" json:"id"` // int64(Template.ID)
	Tokens    string    `gorm:"type:text;not null" json:"tokens"`         // JSON-encoded []string
	Count     int       `json:"count"`
	FirstSeen time.Time `gorm:"index" json:"first_seen"`
	LastSeen  time.Time `gorm:"index" json:"last_seen"`
	Sample    string    `gorm:"type:text" json:"sample"`
}

DrainTemplateRow is the persisted GORM representation of a Drain log template. Tokens are JSON-encoded to stay schema-simple across SQLite/MySQL/PostgreSQL/MSSQL.

ID is stored as int64 (bit-reinterpretation of the uint64 FNV-64 hash): the standard SQL drivers reject uint64 values with the high bit set, and signed int64 carries the same 64 bits without loss. Conversion happens in the persistence helpers.

The primary key is composite (tenant_id, id): the same template tokens can legitimately recur across tenants, and we want the cluster ID to stay stable per tenant once the in-memory Drain miner is partitioned per-tenant. TenantID is declared first so it leads the PK index.
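The bit-reinterpretation described above is lossless in both directions. A self-contained sketch (the FNV-64a variant and the NUL token separator are assumptions for illustration; the round-trip property is the point):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// templateID hashes template tokens to a uint64 ID, as the in-memory
// Drain miner does. Variant and separator are illustrative assumptions.
func templateID(tokens []string) uint64 {
	h := fnv.New64a()
	for _, t := range tokens {
		h.Write([]byte(t))
		h.Write([]byte{0}) // separator so ["ab","c"] != ["a","bc"]
	}
	return h.Sum64()
}

func main() {
	id := templateID([]string{"user", "<*>", "login"})
	stored := int64(id)        // may print negative when the high bit is set
	restored := uint64(stored) // same 64 bits, no loss
	fmt.Println(id == restored) // true
}
```

Because the conversion only reinterprets bits, equality and per-tenant uniqueness are preserved; only the printed sign can differ.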

func (DrainTemplateRow) TableName

func (DrainTemplateRow) TableName() string

TableName overrides GORM's default table name.

type Edge

type Edge struct {
	Type       EdgeType  `json:"type"`
	FromID     string    `json:"from_id"`
	ToID       string    `json:"to_id"`
	Weight     float64   `json:"weight,omitempty"`
	CallCount  int64     `json:"call_count,omitempty"`
	ErrorRate  float64   `json:"error_rate,omitempty"`
	AvgMs      float64   `json:"avg_latency_ms,omitempty"`
	TotalMs    float64   `json:"-"`
	ErrorCount int64     `json:"-"`
	UpdatedAt  time.Time `json:"updated_at"`
}

Edge represents a directed relationship between two nodes.

type EdgeType

type EdgeType string

EdgeType distinguishes different relationship categories.

const (
	EdgeCalls        EdgeType = "CALLS"
	EdgeExposes      EdgeType = "EXPOSES"
	EdgeContains     EdgeType = "CONTAINS"
	EdgeChildOf      EdgeType = "CHILD_OF"
	EdgeEmittedBy    EdgeType = "EMITTED_BY"
	EdgeLoggedDuring EdgeType = "LOGGED_DURING"
	EdgeMeasuredBy   EdgeType = "MEASURED_BY"
	EdgePrecededBy   EdgeType = "PRECEDED_BY"
	EdgeTriggeredBy  EdgeType = "TRIGGERED_BY"
)

type ErrorChainResult

type ErrorChainResult struct {
	RootCause        *RootCauseInfo   `json:"root_cause"`
	SpanChain        []SpanNode       `json:"span_chain"`
	CorrelatedLogs   []LogClusterNode `json:"correlated_logs,omitempty"`
	AnomalousMetrics []MetricNode     `json:"anomalous_metrics,omitempty"`
	TraceID          string           `json:"trace_id"`
}

ErrorChainResult is the output of an error chain query.

type GraphRAG

type GraphRAG struct {
	// contains filtered or unexported fields
}

GraphRAG is the main coordinator for the layered graph system.

Every in-memory store is partitioned by tenant. The coordinator holds a map of tenant ID → *tenantStores and a reader/writer mutex that protects only the outer map; per-tenant stores keep their own RWMutexes for fine-grained concurrent access. All event ingestion and queries route through storesFor(ctx) / storesForTenant(tenant) — there is no "global" slice.
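The two-level locking pattern can be sketched in miniature. All names below are hypothetical stand-ins for the unexported coordinator internals; note the outer lock is held only long enough to resolve the map entry:

```go
package main

import (
	"fmt"
	"sync"
)

// tenantStores stands in for the per-tenant store bundle; it carries its
// own RWMutex for fine-grained access, independent of the coordinator's.
type tenantStores struct {
	mu       sync.RWMutex
	services map[string]int
}

// coordinator mirrors the pattern described above: the outer RWMutex
// guards only the tenant map, never the per-tenant data.
type coordinator struct {
	mu      sync.RWMutex
	tenants map[string]*tenantStores
}

func (c *coordinator) storesForTenant(tenant string) *tenantStores {
	c.mu.RLock()
	ts := c.tenants[tenant]
	c.mu.RUnlock()
	if ts != nil {
		return ts
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if ts = c.tenants[tenant]; ts == nil { // re-check under the write lock
		ts = &tenantStores{services: map[string]int{}}
		c.tenants[tenant] = ts
	}
	return ts
}

func main() {
	c := &coordinator{tenants: map[string]*tenantStores{}}
	ts := c.storesForTenant("acme")
	ts.mu.Lock()
	ts.services["checkout"]++
	ts.mu.Unlock()
	fmt.Println(len(c.tenants)) // 1
}
```

Keeping the outer lock narrow means two tenants' ingestion paths never contend once their store bundles exist.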

func New

func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggregator, ringBuf *tsdb.RingBuffer, cfg Config) *GraphRAG

New creates a new GraphRAG coordinator.

func (*GraphRAG) AllServiceEdges

func (g *GraphRAG) AllServiceEdges(ctx context.Context) []*Edge

AllServiceEdges returns every edge in the caller's tenant's ServiceStore. Kept as a narrow helper so API handlers do not need to traverse the tenantStores composite themselves.

func (*GraphRAG) AnomaliesForService

func (g *GraphRAG) AnomaliesForService(ctx context.Context, service string, since time.Time) []*AnomalyNode

AnomaliesForService is a tenant-aware read-through to the per-tenant AnomalyStore, exported so handlers outside this package never need to reach into the store maps directly.

func (*GraphRAG) AnomalyTimeline

func (g *GraphRAG) AnomalyTimeline(ctx context.Context, since time.Time) []*AnomalyNode

AnomalyTimeline returns recent anomalies sorted by time.

func (*GraphRAG) CorrelatedSignals

func (g *GraphRAG) CorrelatedSignals(ctx context.Context, service string, since time.Time) *CorrelatedSignalsResult

func (*GraphRAG) DependencyChain

func (g *GraphRAG) DependencyChain(ctx context.Context, traceID string) []SpanNode

DependencyChain returns the full span tree for a trace.

func (*GraphRAG) DroppedLogsCount

func (g *GraphRAG) DroppedLogsCount() int64

DroppedLogsCount reports the number of log events dropped because the ingestion channel was full.

func (*GraphRAG) DroppedMetricsCount

func (g *GraphRAG) DroppedMetricsCount() int64

DroppedMetricsCount reports the number of metric events dropped because the ingestion channel was full.

func (*GraphRAG) DroppedSpansCount

func (g *GraphRAG) DroppedSpansCount() int64

DroppedSpansCount reports the number of span events dropped because the ingestion channel was full. Exported for tests and readiness probes; atomic, safe from any goroutine.

func (*GraphRAG) ErrorChain

func (g *GraphRAG) ErrorChain(ctx context.Context, service string, since time.Time, limit int) []ErrorChainResult

ErrorChain traces error spans upstream to find the root cause service. The tenant slice is selected via ctx — callers without a tenant ctx collapse to storage.DefaultTenantID at the coordinator boundary.

func (*GraphRAG) EventBufferDepth

func (g *GraphRAG) EventBufferDepth() int

EventBufferDepth returns the current number of events queued in the ingestion channel. Exported for telemetry polling; never blocks.

func (*GraphRAG) GetGraphSnapshot

func (g *GraphRAG) GetGraphSnapshot(ctx context.Context, at time.Time) (*GraphSnapshot, error)

GetGraphSnapshot retrieves the snapshot closest to the requested time, scoped to the tenant carried by ctx. The composite (tenant_id, created_at) index supports the descending lookup.

func (*GraphRAG) GetInvestigation

func (g *GraphRAG) GetInvestigation(ctx context.Context, id string) (*Investigation, error)

GetInvestigation retrieves a single investigation by ID, scoped to the tenant carried by ctx. Returning ErrRecordNotFound for cross-tenant lookups prevents id-guessing from leaking another tenant's row.

func (*GraphRAG) GetInvestigations

func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error)

GetInvestigations queries persisted investigations scoped to the tenant carried by ctx. The composite (tenant_id, created_at) index supports the recency-ordered scan.

func (*GraphRAG) ImpactAnalysis

func (g *GraphRAG) ImpactAnalysis(ctx context.Context, service string, maxDepth int) *ImpactResult

ImpactAnalysis performs BFS downstream from a service to find affected services.
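The traversal reduces to a depth-tracking BFS over the service edges. A minimal sketch with a plain adjacency list standing in for the ServiceStore (the names and depth bookkeeping are illustrative, not the package's internals):

```go
package main

import "fmt"

// affected pairs a service with the BFS depth it was first reached at,
// echoing AffectedEntry's Service/Depth fields.
type affected struct {
	Service string
	Depth   int
}

// impact BFS-walks the edges out of root up to maxDepth, recording each
// service at the depth it is first reached.
func impact(edges map[string][]string, root string, maxDepth int) []affected {
	seen := map[string]bool{root: true}
	var out []affected
	frontier := []string{root}
	for depth := 1; depth <= maxDepth && len(frontier) > 0; depth++ {
		var next []string
		for _, svc := range frontier {
			for _, dep := range edges[svc] {
				if !seen[dep] {
					seen[dep] = true
					out = append(out, affected{dep, depth})
					next = append(next, dep)
				}
			}
		}
		frontier = next
	}
	return out
}

func main() {
	edges := map[string][]string{
		"db":       {"checkout", "inventory"},
		"checkout": {"frontend"},
	}
	fmt.Println(impact(edges, "db", 3))
}
```

The seen set guarantees each service is reported once, at its minimum depth, even when the call graph has cycles.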

func (*GraphRAG) InvestigationInsertCount

func (g *GraphRAG) InvestigationInsertCount() int64

InvestigationInsertCount reports cooldown-allowed PersistInvestigation calls. Semantics: this counter increments when the cooldown check passes, BEFORE the DB write — so a subsequent DB failure still increments this. It is NOT a strict DB insert count. Intended for tests to assert cooldown behavior without requiring a live repo.

func (*GraphRAG) IsRunning

func (g *GraphRAG) IsRunning() bool

IsRunning reports whether the coordinator's stop channel has not been closed. Used by readiness probes to confirm the background workers are still live.

func (*GraphRAG) OnLogIngested

func (g *GraphRAG) OnLogIngested(log storage.Log)

OnLogIngested is the callback wired into the log ingestion pipeline.

func (*GraphRAG) OnMetricIngested

func (g *GraphRAG) OnMetricIngested(metric tsdb.RawMetric)

OnMetricIngested is the callback wired into the metric ingestion pipeline. tsdb.RawMetric already carries a resolved TenantID (set in ingest/otlp.go Export), so we read it here instead of adding a second argument — keeping the metric callback signature identical across TSDB and GraphRAG.

func (*GraphRAG) OnSpanIngested

func (g *GraphRAG) OnSpanIngested(span storage.Span)

OnSpanIngested is the callback wired into the trace ingestion pipeline. Tenant is taken straight from the persisted Span (already resolved upstream by the OTLP Export handlers) and carried on the event — the callback signature is intentionally unchanged so external wiring stays trivial.

func (*GraphRAG) PersistInvestigation

func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode)

PersistInvestigation saves an investigation record from an error chain analysis. Tenant is accepted explicitly so the caller (the per-tenant anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice and so the persisted row carries its originating tenant_id.

func (*GraphRAG) RegisterAnomaly

func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode)

RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant. Mirrors PersistInvestigation's "tenant accepted explicitly" shape so out-of-band anomaly producers (synthetic detectors, integration tests, future external anomaly feeds) can land directly on the right tenant slice without going through the metric/error detection loops. Empty tenant collapses to storage.DefaultTenantID.

func (*GraphRAG) RootCauseAnalysis

func (g *GraphRAG) RootCauseAnalysis(ctx context.Context, service string, since time.Time) []RankedCause

RootCauseAnalysis combines ErrorChain with anomaly correlation to rank probable causes.

func (*GraphRAG) ServiceMap

func (g *GraphRAG) ServiceMap(ctx context.Context, depth int) []ServiceMapEntry

func (*GraphRAG) ServiceNames

func (g *GraphRAG) ServiceNames(ctx context.Context) []string

ServiceNames returns every service the caller's tenant has emitted any span for, sorted ascending. Reads from the in-memory ServiceStore — no DB scan. Used by /api/metadata/services so the dropdown matches the system map exactly (both are sourced from the same store).

func (*GraphRAG) SetMetrics

func (g *GraphRAG) SetMetrics(m *telemetry.Metrics)

SetMetrics wires the Prometheus registry so GraphRAG event drops are observable via otelcontext_graphrag_events_dropped_total. Safe to call before Start; pass nil to disable Prometheus recording (atomic counters still tick).

func (*GraphRAG) ShortestPath

func (g *GraphRAG) ShortestPath(ctx context.Context, from, to string) []string

ShortestPath finds the shortest path between two services using Dijkstra.
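A self-contained Dijkstra sketch over weighted edges, using container/heap. The adjacency map and weights stand in for the package's edge model; how ShortestPath derives weights from Edge fields is not specified here:

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a heap entry: a service and its tentative distance from start.
type item struct {
	svc  string
	dist float64
}

type pq []item

func (p pq) Len() int           { return len(p) }
func (p pq) Less(i, j int) bool { return p[i].dist < p[j].dist }
func (p pq) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p *pq) Push(x any)        { *p = append(*p, x.(item)) }
func (p *pq) Pop() any {
	old := *p
	n := len(old)
	it := old[n-1]
	*p = old[:n-1]
	return it
}

// shortestPath runs Dijkstra and returns the service sequence from..to,
// or nil if to is unreachable. Edge weights must be positive.
func shortestPath(edges map[string]map[string]float64, from, to string) []string {
	dist := map[string]float64{from: 0}
	prev := map[string]string{}
	h := &pq{{from, 0}}
	heap.Init(h)
	for h.Len() > 0 {
		cur := heap.Pop(h).(item)
		if cur.svc == to {
			break
		}
		if cur.dist > dist[cur.svc] {
			continue // stale heap entry, already relaxed via a shorter path
		}
		for next, w := range edges[cur.svc] {
			if d := cur.dist + w; func() bool { old, ok := dist[next]; return !ok || d < old }() {
				dist[next] = cur.dist + w
				prev[next] = cur.svc
				heap.Push(h, item{next, cur.dist + w})
			}
		}
	}
	if _, ok := dist[to]; !ok {
		return nil
	}
	var path []string
	for at := to; ; at = prev[at] {
		path = append([]string{at}, path...)
		if at == from {
			return path
		}
	}
}

func main() {
	edges := map[string]map[string]float64{
		"frontend": {"checkout": 1, "search": 1},
		"checkout": {"db": 1},
		"search":   {"db": 5},
	}
	fmt.Println(shortestPath(edges, "frontend", "db")) // [frontend checkout db]
}
```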

func (*GraphRAG) SimilarErrors

func (g *GraphRAG) SimilarErrors(ctx context.Context, clusterID string, k int) []LogClusterNode

SimilarErrors finds log clusters similar to a given cluster using the vector index, scoped to the tenant carried on ctx. Cross-tenant hits are impossible because the underlying vectordb partitions docs per tenant and this lookup resolves the SignalStore through storesFor(ctx).

func (*GraphRAG) Start

func (g *GraphRAG) Start(ctx context.Context)

Start begins background goroutines: workers, refresh, snapshot, anomaly detection. Each goroutine is wrapped in a panic recovery so one misbehaving event can't take down the whole subsystem.
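The recovery wrapper is a standard Go pattern. A minimal sketch of its shape (the real closures also increment the panic metrics wired by SetPanicMetrics):

```go
package main

import "fmt"

// runRecovered invokes fn and converts a panic into a returned value
// instead of crashing the process — the same shape as wrapping each
// background goroutine in a recovery closure.
func runRecovered(fn func()) (recovered any) {
	defer func() { recovered = recover() }()
	fn()
	return nil
}

func main() {
	r := runRecovered(func() { panic("bad event") })
	fmt.Println(r) // bad event
}
```

In Start, each worker loop would re-launch (or log and exit cleanly) after recovery, so one malformed event cannot take the subsystem down.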

func (*GraphRAG) Stop

func (g *GraphRAG) Stop()

Stop signals all goroutines to exit.

type GraphSnapshot

type GraphSnapshot struct {
	TenantID       string          `gorm:"size:64;default:'default';not null;index:idx_graph_snapshots_tenant_created,priority:1" json:"tenant_id"`
	ID             string          `gorm:"primaryKey;size:64" json:"id"`
	CreatedAt      time.Time       `gorm:"index:idx_graph_snapshots_tenant_created,priority:2" json:"created_at"`
	Nodes          json.RawMessage `gorm:"type:text" json:"nodes"`
	Edges          json.RawMessage `gorm:"type:text" json:"edges"`
	ServiceCount   int             `json:"service_count"`
	TotalCalls     int64           `json:"total_calls"`
	AvgHealthScore float64         `json:"avg_health_score"`
}

GraphSnapshot is a periodic snapshot of the service topology persisted to DB.

TenantID scopes the row to the tenant slice it was captured from. The composite (tenant_id, created_at) index supports the "most recent snapshot at-or-before T for tenant X" lookup that GetGraphSnapshot runs on every read.

func (GraphSnapshot) TableName

func (GraphSnapshot) TableName() string

TableName overrides GORM's default table name.

type ImpactResult

type ImpactResult struct {
	Service          string          `json:"service"`
	AffectedServices []AffectedEntry `json:"affected_services"`
	TotalDownstream  int             `json:"total_downstream"`
}

ImpactResult describes the blast radius of a service failure.

type Investigation

type Investigation struct {
	TenantID         string          `gorm:"size:64;default:'default';not null;index:idx_investigations_tenant_created,priority:1" json:"tenant_id"`
	ID               string          `gorm:"primaryKey;size:64" json:"id"`
	CreatedAt        time.Time       `gorm:"index:idx_investigations_tenant_created,priority:2" json:"created_at"`
	Status           string          `gorm:"size:20" json:"status"`   // detected, triaged, resolved
	Severity         string          `gorm:"size:20" json:"severity"` // critical, warning, info
	TriggerService   string          `gorm:"size:255;index" json:"trigger_service"`
	TriggerOperation string          `gorm:"size:255" json:"trigger_operation"`
	ErrorMessage     string          `gorm:"type:text" json:"error_message"`
	RootService      string          `gorm:"size:255" json:"root_service"`
	RootOperation    string          `gorm:"size:255" json:"root_operation"`
	CausalChain      json.RawMessage `gorm:"type:text" json:"causal_chain"`
	TraceIDs         json.RawMessage `gorm:"type:text" json:"trace_ids"`
	ErrorLogs        json.RawMessage `gorm:"type:text" json:"error_logs"`
	AnomalousMetrics json.RawMessage `gorm:"type:text" json:"anomalous_metrics"`
	AffectedServices json.RawMessage `gorm:"type:text" json:"affected_services"`
	SpanChain        json.RawMessage `gorm:"type:text" json:"span_chain"`
}

Investigation is a persisted record of an automated error investigation.

TenantID scopes the row to its originating tenant. The composite (tenant_id, created_at) index supports the recency-ordered "investigations for tenant X" query that GetInvestigations runs on every read.

func (Investigation) TableName

func (Investigation) TableName() string

TableName overrides GORM's default table name.

type LogClusterNode

type LogClusterNode struct {
	ID             string           `json:"id"` // service-scoped cluster id (stable)
	Template       string           `json:"template"`
	TemplateID     uint64           `json:"template_id,omitempty"`
	TemplateTokens []string         `json:"template_tokens,omitempty"`
	SampleLog      string           `json:"sample_log,omitempty"`
	Count          int64            `json:"count"`
	FirstSeen      time.Time        `json:"first_seen"`
	LastSeen       time.Time        `json:"last_seen"`
	SeverityDist   map[string]int64 `json:"severity_distribution"`
}

LogClusterNode groups similar log messages.

Clustering is performed by the Drain template miner. TemplateID is the stable FNV-64 hash of TemplateTokens; ID is the user-facing cluster identifier (service-scoped) that remains stable across Drain re-merges.

type MetricNode

type MetricNode struct {
	ID          string    `json:"id"` // metric_name + "|" + service
	MetricName  string    `json:"metric_name"`
	Service     string    `json:"service"`
	RollingMin  float64   `json:"rolling_min"`
	RollingMax  float64   `json:"rolling_max"`
	RollingAvg  float64   `json:"rolling_avg"`
	SampleCount int64     `json:"sample_count"`
	LastSeen    time.Time `json:"last_seen"`
}

MetricNode represents a metric series for a service.

type NodeType

type NodeType string

NodeType distinguishes different node categories in the graph.

const (
	NodeService    NodeType = "service"
	NodeOperation  NodeType = "operation"
	NodeTrace      NodeType = "trace"
	NodeSpan       NodeType = "span"
	NodeLogCluster NodeType = "log_cluster"
	NodeMetric     NodeType = "metric"
	NodeAnomaly    NodeType = "anomaly"
)

type OperationNode

type OperationNode struct {
	ID          string    `json:"id"` // service + "|" + operation
	Service     string    `json:"service"`
	Operation   string    `json:"operation"`
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	HealthScore float64   `json:"health_score"`

	CallCount  int64   `json:"call_count"`
	ErrorCount int64   `json:"error_count"`
	ErrorRate  float64 `json:"error_rate"`
	AvgLatency float64 `json:"avg_latency_ms"`
	P50Latency float64 `json:"p50_latency_ms"`
	P95Latency float64 `json:"p95_latency_ms"`
	P99Latency float64 `json:"p99_latency_ms"`
	TotalMs    float64 `json:"-"`
}

OperationNode represents an endpoint/RPC within a service.

type RankedCause

type RankedCause struct {
	Service    string        `json:"service"`
	Operation  string        `json:"operation"`
	Score      float64       `json:"score"`
	Evidence   []string      `json:"evidence"`
	ErrorChain []SpanNode    `json:"error_chain,omitempty"`
	Anomalies  []AnomalyNode `json:"anomalies,omitempty"`
}

RankedCause is a probable root cause with evidence.

type RootCauseInfo

type RootCauseInfo struct {
	Service      string `json:"service"`
	Operation    string `json:"operation"`
	ErrorMessage string `json:"error_message"`
	SpanID       string `json:"span_id"`
	TraceID      string `json:"trace_id"`
}

RootCauseInfo identifies the responsible service and operation.

type ServiceMapEntry

type ServiceMapEntry struct {
	Service    *ServiceNode     `json:"service"`
	Operations []*OperationNode `json:"operations,omitempty"`
	CallsTo    []*Edge          `json:"calls_to,omitempty"`
	CalledBy   []*Edge          `json:"called_by,omitempty"`
}

ServiceMapEntry is one entry in the service topology returned by ServiceMap, carrying a service's health scores and its inbound/outbound call edges for the API.

type ServiceNode

type ServiceNode struct {
	ID          string    `json:"id"`
	Name        string    `json:"name"`
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	HealthScore float64   `json:"health_score"` // 0.0–1.0

	CallCount  int64   `json:"call_count"`
	ErrorCount int64   `json:"error_count"`
	ErrorRate  float64 `json:"error_rate"`
	AvgLatency float64 `json:"avg_latency_ms"`
	TotalMs    float64 `json:"-"` // for computing avg
}

ServiceNode represents a microservice with aggregated health stats.

type ServiceStore

type ServiceStore struct {
	Services   map[string]*ServiceNode   // key: service name
	Operations map[string]*OperationNode // key: service|operation
	Edges      map[string]*Edge          // key: type|from|to
	// contains filtered or unexported fields
}

ServiceStore holds permanent service topology data.

func (*ServiceStore) AllEdges

func (s *ServiceStore) AllEdges() []*Edge

func (*ServiceStore) AllServices

func (s *ServiceStore) AllServices() []*ServiceNode

func (*ServiceStore) CallEdgesFrom

func (s *ServiceStore) CallEdgesFrom(service string) []*Edge

func (*ServiceStore) CallEdgesTo

func (s *ServiceStore) CallEdgesTo(service string) []*Edge

func (*ServiceStore) GetService

func (s *ServiceStore) GetService(name string) (*ServiceNode, bool)

func (*ServiceStore) UpsertCallEdge

func (s *ServiceStore) UpsertCallEdge(source, target string, durationMs float64, isError bool, ts time.Time)

func (*ServiceStore) UpsertOperation

func (s *ServiceStore) UpsertOperation(service, operation string, durationMs float64, isError bool, ts time.Time)

func (*ServiceStore) UpsertService

func (s *ServiceStore) UpsertService(name string, durationMs float64, isError bool, ts time.Time)

type SignalStore

type SignalStore struct {
	LogClusters map[string]*LogClusterNode // key: cluster ID
	Metrics     map[string]*MetricNode     // key: metric|service
	Edges       map[string]*Edge           // key: type|from|to
	// contains filtered or unexported fields
}

SignalStore holds log cluster and metric correlation data.

func (*SignalStore) AddLoggedDuringEdge

func (ss *SignalStore) AddLoggedDuringEdge(clusterID, spanID string, ts time.Time)

func (*SignalStore) LogClustersForService

func (ss *SignalStore) LogClustersForService(service string) []*LogClusterNode

func (*SignalStore) MetricsForService

func (ss *SignalStore) MetricsForService(service string) []*MetricNode

func (*SignalStore) UpsertLogCluster

func (ss *SignalStore) UpsertLogCluster(id, template, severity, service string, ts time.Time)

func (*SignalStore) UpsertLogClusterWithTemplate

func (ss *SignalStore) UpsertLogClusterWithTemplate(id, template, severity, service string, templateID uint64, tokens []string, sample string, ts time.Time)

UpsertLogClusterWithTemplate is the Drain-aware upsert. It stores the mined template tokens, the stable template ID, and a sample raw log. The older UpsertLogCluster is preserved for backward compatibility.

func (*SignalStore) UpsertMetric

func (ss *SignalStore) UpsertMetric(metricName, service string, value float64, ts time.Time)

type SpanNode

type SpanNode struct {
	ID           string    `json:"id"` // span_id
	TraceID      string    `json:"trace_id"`
	ParentSpanID string    `json:"parent_span_id"`
	Service      string    `json:"service"`
	Operation    string    `json:"operation"`
	Duration     float64   `json:"duration_ms"`
	StatusCode   string    `json:"status_code"`
	IsError      bool      `json:"is_error"`
	Timestamp    time.Time `json:"timestamp"`
}

SpanNode represents a single span within a trace.

type Template

type Template struct {
	ID        uint64    `json:"id"`
	Tokens    []string  `json:"tokens"`
	Count     int       `json:"count"`
	FirstSeen time.Time `json:"first_seen"`
	LastSeen  time.Time `json:"last_seen"`
	Sample    string    `json:"sample,omitempty"`
	// contains filtered or unexported fields
}

Template represents a mined log group.

func LoadDrainTemplates

func LoadDrainTemplates(db *gorm.DB, tenant string) ([]Template, error)

LoadDrainTemplates reads persisted Drain templates for the supplied tenant and returns them in a format ready to pass to Drain.LoadTemplates. Returns an empty slice (and nil error) if no rows match.

func (*Template) TemplateString

func (t *Template) TemplateString() string

TemplateString returns the template rendered as a single string.

type TraceNode

type TraceNode struct {
	ID          string    `json:"id"` // trace_id
	RootService string    `json:"root_service"`
	Duration    float64   `json:"duration_ms"`
	Status      string    `json:"status"`
	Timestamp   time.Time `json:"timestamp"`
	SpanCount   int       `json:"span_count"`
}

TraceNode represents a distributed trace.

type TraceStore

type TraceStore struct {
	Traces map[string]*TraceNode // key: trace_id
	Spans  map[string]*SpanNode  // key: span_id
	Edges  map[string]*Edge      // key: type|from|to
	TTL    time.Duration
	// contains filtered or unexported fields
}

TraceStore holds trace/span detail with TTL-based pruning.

func (*TraceStore) ErrorSpans

func (ts *TraceStore) ErrorSpans(service string, since time.Time) []*SpanNode

func (*TraceStore) GetSpan

func (ts *TraceStore) GetSpan(spanID string) (*SpanNode, bool)

func (*TraceStore) GetTrace

func (ts *TraceStore) GetTrace(traceID string) (*TraceNode, bool)

func (*TraceStore) Prune

func (ts *TraceStore) Prune() int

Prune removes spans and traces older than TTL.

func (*TraceStore) SpansForTrace

func (ts *TraceStore) SpansForTrace(traceID string) []*SpanNode

func (*TraceStore) UpsertSpan

func (ts *TraceStore) UpsertSpan(span SpanNode)

func (*TraceStore) UpsertTrace

func (ts *TraceStore) UpsertTrace(traceID, rootService, status string, durationMs float64, timestamp time.Time)
