storage

package
v0.2.0-beta.3
Published: May 3, 2026 License: MIT Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultTenantID = "default"

DefaultTenantID is the tenant assigned when no X-Tenant-ID header or tenant.id resource attribute is present on an OTLP request.

View Source
const MaxTenantIDLength = 128

MaxTenantIDLength caps the length of an accepted tenant ID. Tenant IDs are stored in a VARCHAR(64) column on every domain row and propagate into structured logs and Prometheus labels. The cap is a defense in depth against silent VARCHAR truncation at insert time and unbounded label cardinality from a hostile or misconfigured client.

View Source
const PartitioningModeDaily = "daily"

PartitioningModeDaily is the canonical opt-in value for daily partitioning.

Variables

View Source
var ErrLogNotFoundOrWrongTenant = errors.New("log not found or not accessible by current tenant")

ErrLogNotFoundOrWrongTenant is returned by UpdateLogInsight when the target row does not exist or belongs to a different tenant. Handlers should translate this to a 404 (to avoid confirming the existence of cross-tenant rows).

Functions

func AutoMigrateModels

func AutoMigrateModels(db *gorm.DB, driver string) error

AutoMigrateModels runs GORM auto-migration for all OtelContext models.

When DB_POSTGRES_PARTITIONING=daily, the `logs` table is provisioned as a declarative range-partitioned parent BEFORE GORM AutoMigrate runs, so AutoMigrate sees an existing table and skips it (GORM's IF NOT EXISTS behaviour). This is greenfield-only — see setupPostgresPartitionedLogs for the safety check.

func AutoMigrateModelsWithOptions

func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOptions) error

AutoMigrateModelsWithOptions is the option-driven variant of AutoMigrateModels. Existing callers should continue to use AutoMigrateModels — the options entry point is for new wiring (currently main.go) that needs to plumb the partitioning flag.

func ClampSearchWindowTo24h

func ClampSearchWindowTo24h(start, end, now time.Time) (time.Time, time.Time, error)

ClampSearchWindowTo24h enforces a 24-hour ceiling on log keyword search windows applied at the API/MCP boundary. With FTS5 disabled (the default), the only safety mechanism for unscoped LIKE substring scans is a hard time-range cap so the worst-case query is bounded in disk pages scanned.

Behaviour:

  • Both zero (no caller hint): defaults to (now-24h, now)
  • end zero: end = now
  • start zero: start = end - 24h
  • end > now: end clamped to now (don't reject — clock skew on the caller side is common)
  • start < now-24h with end >= now-24h: start clamped to now-24h (the window is shrunk to fit the cap, callers never see a silent truncation of an in-window query)
  • end < now-24h (window entirely outside the cap): rejected with error so the caller gets a deterministic signal instead of an empty result
  • start >= end: rejected with error (caller mistake)

The cap fires whenever a body keyword search is requested; pure filtered listings (no Search field) keep the full retention range and bypass this helper.

func DropExpiredLogsPartitions

func DropExpiredLogsPartitions(ctx context.Context, db *gorm.DB, cutoff time.Time) (int, error)

DropExpiredLogsPartitions drops every daily logs partition whose entire upper bound is older than `cutoff`. Returns the number of partitions dropped. Safe to call repeatedly (no-op when nothing matches).

We use the partition catalog (pg_partitioned_table + pg_inherits) instead of guessing names, so partitions created by earlier code paths or operator scripts are also covered.

func EnsureLogsLookahead

func EnsureLogsLookahead(db *gorm.DB, lookaheadDays int) (int, error)

EnsureLogsLookahead ensures partitions exist for the next `lookaheadDays` starting at "today" (UTC). Returns the count of partitions newly created for telemetry.

func EnsureLogsPartitionForDay

func EnsureLogsPartitionForDay(db *gorm.DB, day time.Time) error

EnsureLogsPartitionForDay creates the daily partition that covers `day` (UTC). Idempotent — uses CREATE TABLE IF NOT EXISTS PARTITION OF semantics so concurrent boots / scheduler ticks never collide.

func HasTenantContext

func HasTenantContext(ctx context.Context) bool

HasTenantContext reports whether ctx carries a tenant value set by WithTenantContext. It lets callers (e.g. OTLP ingest) distinguish "no tenant set at all" (fall through to other sources like gRPC metadata) from "explicitly set to default tenant" — a distinction TenantFromContext erases by returning DefaultTenantID for both cases.

func NewDatabase

func NewDatabase(driver, dsn string) (*gorm.DB, error)

NewDatabase creates a GORM database connection for any supported driver. Supported drivers: sqlite, postgres, mysql, sqlserver. Applies per-driver optimizations (WAL for SQLite, connection pooling for others).

func SanitizeTenantID

func SanitizeTenantID(s string) string

SanitizeTenantID validates and normalizes a tenant ID supplied by an HTTP header, gRPC metadata key, or OTLP resource attribute. It returns the empty string for any value the caller should reject (and substitute with their configured default), so the rejection contract is uniform across transports.

Rejection criteria:

  • empty after TrimSpace
  • length exceeds MaxTenantIDLength after trim
  • contains a Unicode control character (\n, \r, \t, NUL, escape codes)

On the happy path it returns the trimmed value verbatim — no case folding, no allowlist, since legitimate tenant IDs may be UUIDs, slugs, or organisation names in non-ASCII scripts.

func TenantFromContext

func TenantFromContext(ctx context.Context) string

TenantFromContext returns the tenant ID stashed on ctx by TenantMiddleware (or WithTenantContext). When the context carries no tenant, or a nil context is passed, it returns DefaultTenantID so single-tenant installs behave as before this feature landed.

func WithTenantContext

func WithTenantContext(ctx context.Context, tenant string) context.Context

WithTenantContext returns a copy of ctx that carries tenant as its tenant ID. Empty strings are coerced to DefaultTenantID so downstream queries always have a non-empty value to filter on.

Types

type CompressedText

type CompressedText string

CompressedText is a string type that is transparently compressed using zstd before being stored in the database. It implements sql.Scanner and driver.Valuer for GORM.

func (CompressedText) GormDBDataType

func (CompressedText) GormDBDataType(db *gorm.DB, _ *schema.Field) string

GormDBDataType returns the dialect-specific binary column type. Zstd-compressed bytes include non-text bytes (magic header 0x28 0xB5 0x2F 0xFD) so a TEXT column would corrupt data on Postgres; BYTEA is required.
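The magic header mentioned above is how a reader can tell a zstd frame from legacy plaintext bytes; a sketch of that check (the helper name is hypothetical, not part of the package API):

```go
package main

import "fmt"

// looksZstd reports whether b starts with the zstd frame magic
// (bytes 28 B5 2F FD, the little-endian magic number 0xFD2FB528).
func looksZstd(b []byte) bool {
	return len(b) >= 4 && b[0] == 0x28 && b[1] == 0xB5 && b[2] == 0x2F && b[3] == 0xFD
}

func main() {
	frame := []byte{0x28, 0xB5, 0x2F, 0xFD, 0x00}
	fmt.Println(looksZstd(frame), looksZstd([]byte("plain text")))
}
```

Any TEXT column that transcodes or validates encodings could mangle bytes like 0xB5, which is why the doc insists on BYTEA for Postgres.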

func (*CompressedText) Scan

func (ct *CompressedText) Scan(value any) error

func (CompressedText) Value

func (ct CompressedText) Value() (driver.Value, error)

type DashboardStats

type DashboardStats struct {
	TotalTraces        int64          `json:"total_traces"`
	TotalLogs          int64          `json:"total_logs"`
	TotalErrors        int64          `json:"total_errors"`
	AvgLatencyMs       float64        `json:"avg_latency_ms"`
	ErrorRate          float64        `json:"error_rate"`
	ActiveServices     int64          `json:"active_services"`
	P99Latency         int64          `json:"p99_latency"`
	TopFailingServices []ServiceError `json:"top_failing_services"`
}

DashboardStats represents aggregated metrics for the dashboard.

type LatencyPoint

type LatencyPoint struct {
	Timestamp time.Time `json:"timestamp"`
	Duration  int64     `json:"duration"` // Microseconds
}

LatencyPoint represents a data point for the latency heatmap.

type Log

type Log struct {
	ID             uint           `gorm:"primaryKey" json:"id"`
	TenantID       string         `` /* 177-byte string literal not displayed */
	TraceID        string         `gorm:"index;size:32" json:"trace_id"`
	SpanID         string         `gorm:"size:16" json:"span_id"`
	Severity       string         `gorm:"size:50;index:idx_logs_tenant_severity,priority:2" json:"severity"`
	Body           string         `gorm:"type:text" json:"body"`
	ServiceName    string         `gorm:"size:255;index:idx_logs_tenant_service,priority:2" json:"service_name"`
	AttributesJSON CompressedText `json:"attributes_json"`
	AIInsight      CompressedText `json:"ai_insight"`                                                 // Populated by AI analysis
	Timestamp      time.Time      `gorm:"index;index:idx_logs_tenant_ts,priority:2" json:"timestamp"` // standalone index for global retention sweeps
}

Log represents a log entry associated with a trace.

type LogFilter

type LogFilter struct {
	ServiceName string
	Severity    string
	Search      string
	TraceID     string
	StartTime   time.Time
	EndTime     time.Time
	Limit       int
	Offset      int
}

LogFilter defines criteria for searching logs.

type MetricBucket

type MetricBucket struct {
	ID             uint           `gorm:"primaryKey" json:"id"`
	TenantID       string         `` /* 157-byte string literal not displayed */
	Name           string         `gorm:"size:255;not null;index:idx_metrics_tenant_name_bucket,priority:2" json:"name"`
	ServiceName    string         `gorm:"size:255;not null;index:idx_metrics_tenant_service_bucket,priority:2" json:"service_name"`
	TimeBucket     time.Time      `` /* 139-byte string literal not displayed */
	Min            float64        `json:"min"`
	Max            float64        `json:"max"`
	Sum            float64        `json:"sum"`
	Count          int64          `json:"count"`
	AttributesJSON CompressedText `json:"attributes_json"` // Grouped attributes
}

MetricBucket represents aggregated metric data over a time window (e.g., 10s).

type MigrateOptions

type MigrateOptions struct {
	// PostgresPartitioning, when "daily", provisions `logs` as a partitioned
	// table. Any other value (including the empty string) keeps the legacy
	// unpartitioned schema.
	PostgresPartitioning string
	// PartitionLookaheadDays is the number of future daily partitions to
	// pre-create at boot. Defaults to 3 when zero.
	PartitionLookaheadDays int
	// Timeout, when > 0, bounds the AutoMigrate call. Without it,
	// db.AutoMigrate inherits no deadline and an ALTER TABLE waiting on a
	// Postgres relation lock can hang startup indefinitely. The timeout is
	// applied via db.WithContext to the AutoMigrate call only — pre/post
	// hooks (FTS5 triggers, legacy index drops) are not bounded since they
	// don't take long-held locks. Zero preserves legacy unbounded behaviour.
	Timeout time.Duration
}

MigrateOptions tunes AutoMigrateModelsWithOptions. The zero value preserves the legacy AutoMigrateModels behaviour.

type PartitionScheduler

type PartitionScheduler struct {
	// contains filtered or unexported fields
}

PartitionScheduler maintains daily logs partitions on Postgres when DB_POSTGRES_PARTITIONING=daily is enabled. Hourly it ensures the next `lookaheadDays` partitions exist; daily it drops partitions whose upper bound predates the retention cutoff. Both passes are idempotent so a stalled tick (or a parallel scheduler from a different replica) is safe.

The scheduler is independent of RetentionScheduler so the legacy DELETE path (used for SQLite/MySQL/MSSQL or non-partitioned Postgres) keeps running on its own loop. When partitioning is enabled, RetentionScheduler SHOULD skip logs — wire that up at construction time, not here.

func NewPartitionScheduler

func NewPartitionScheduler(repo *Repository, retentionDays, lookaheadDays int) *PartitionScheduler

NewPartitionScheduler constructs a scheduler. retentionDays must match the HOT_RETENTION_DAYS setting so DROP PARTITION is the moral equivalent of the hourly DELETE-by-age the RetentionScheduler runs for non-partitioned tables.

func (*PartitionScheduler) SetMetrics

func (s *PartitionScheduler) SetMetrics(onDrop, onKeep func(int))

SetMetrics wires telemetry callbacks. Both arguments may be nil.

func (*PartitionScheduler) Start

func (s *PartitionScheduler) Start(parent context.Context)

Start kicks off the background loop. It performs an initial ensure+drop pass synchronously so a fresh boot has the next-day partition staged before any ingest hits it.

One-shot lifecycle: Start is idempotent (a second call while running is a no-op), and a Start-Stop-Start sequence is NOT supported — Stop closes the internal `done` channel, and re-running Start would re-close it during shutdown of the second iteration. Construct a fresh PartitionScheduler if you need to restart.
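The one-shot lifecycle described above can be reduced to a small pattern: an atomic CAS makes Start idempotent, a sync.Once makes Stop repeatable, and nothing ever reopens the done channel, which is why restart requires a fresh value. A sketch of the pattern (not the scheduler's actual fields):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// oneShot sketches the Start/Stop contract documented for PartitionScheduler.
type oneShot struct {
	started  atomic.Bool
	stopOnce sync.Once
	done     chan struct{}
}

func newOneShot() *oneShot { return &oneShot{done: make(chan struct{})} }

func (o *oneShot) Start() bool {
	if !o.started.CompareAndSwap(false, true) {
		return false // second Start while running is a no-op
	}
	go func() {
		<-o.done // background loop elided; exits when Stop closes done
	}()
	return true
}

func (o *oneShot) Stop() {
	o.stopOnce.Do(func() { close(o.done) }) // close exactly once
}

func main() {
	s := newOneShot()
	fmt.Println(s.Start(), s.Start()) // true false
	s.Stop()
	s.Stop() // safe to repeat
}
```

A Start after Stop would pass the CAS only on a fresh value; on a stopped one it finds started already true and does nothing, while a hypothetical reset of the flag would lead the new loop to observe an already-closed done channel.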

func (*PartitionScheduler) Stop

func (s *PartitionScheduler) Stop()

Stop cancels the loop and waits for it to exit. Safe to call multiple times.

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository wraps the GORM database handle for all data access operations.

func NewRepository

func NewRepository(metrics *telemetry.Metrics) (*Repository, error)

NewRepository initializes the database connection using environment variables and migrates the schema.

func NewRepositoryFromDB

func NewRepositoryFromDB(db *gorm.DB, driver string) *Repository

NewRepositoryFromDB constructs a Repository from an existing *gorm.DB. Intended for tests and advanced wiring — production code should use NewRepository.

func (*Repository) BatchCreateAll

func (r *Repository) BatchCreateAll(traces []Trace, spans []Span, logs []Log) error

BatchCreateAll persists traces, spans, and logs in a single DB transaction. The async ingest pipeline uses this path so a failure (or panic) mid-batch rolls back any partial commit, preventing orphan FK rows from a worker that crashed between BatchCreateTraces and BatchCreateSpans.

Idempotency: traces and spans both collapse duplicates silently —

  • traces via idx_traces_tenant_trace_id on (tenant_id, trace_id)
  • spans via idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id)

so a DLQ replay of an already-persisted batch is a safe no-op for those signals. Logs do not yet have a unique key (OTLP logs lack a stable identifier) and a replay can still produce duplicate log rows; that is a separate idempotency concern out of scope for this method.

func (*Repository) BatchCreateLogs

func (r *Repository) BatchCreateLogs(logs []Log) error

BatchCreateLogs inserts multiple logs in batches.

func (*Repository) BatchCreateMetrics

func (r *Repository) BatchCreateMetrics(buckets []MetricBucket) error

BatchCreateMetrics inserts aggregated metrics in batches.

func (*Repository) BatchCreateSpans

func (r *Repository) BatchCreateSpans(spans []Span) error

BatchCreateSpans inserts multiple spans, skipping duplicates. Duplicate is defined per the composite uniqueIndex idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id): a (tenant, trace, span) clash is silently absorbed so DLQ replays (or any duplicate ingest) collapse to a no-op rather than double-inserting.

func (*Repository) BatchCreateTraces

func (r *Repository) BatchCreateTraces(traces []Trace) error

BatchCreateTraces inserts traces, skipping duplicates. Duplicate is defined per the composite uniqueIndex idx_traces_tenant_trace_id on (tenant_id, trace_id): a trace_id clash within the same tenant is ignored, while the same trace_id under a different tenant inserts cleanly.

func (*Repository) Close

func (r *Repository) Close() error

Close closes the underlying database connection.

func (*Repository) CreateTrace

func (r *Repository) CreateTrace(trace Trace) error

CreateTrace inserts a new trace, skipping if it already exists. Uniqueness is per idx_traces_tenant_trace_id (tenant_id, trace_id), so the same trace_id across tenants is allowed.

func (*Repository) DB

func (r *Repository) DB() *gorm.DB

DB returns the underlying gorm.DB for advanced queries.

func (*Repository) DropLogsFTS

func (r *Repository) DropLogsFTS(ctx context.Context) error

DropLogsFTS removes the FTS5 virtual table and its sync triggers, then runs VACUUM to reclaim freed pages. Used by /api/admin/drop_fts on existing SQLite deployments after LOG_FTS_ENABLED has been set to false, to recover the 30-40% of DB disk the inverted index occupied.

VACUUM blocks writes for ~10-60 minutes on a multi-GB DB and cannot run inside a transaction. Idempotent — safe to call when the FTS5 table or triggers are already absent.

func (*Repository) GetDashboardStats

func (r *Repository) GetDashboardStats(ctx context.Context, start, end time.Time, serviceNames []string) (*DashboardStats, error)

GetDashboardStats calculates high-level metrics for the dashboard, scoped to the tenant on ctx.

func (*Repository) GetLatencyHeatmap

func (r *Repository) GetLatencyHeatmap(ctx context.Context, start, end time.Time, serviceNames []string) ([]LatencyPoint, error)

GetLatencyHeatmap returns trace duration and timestamps for heatmap rendering, scoped to the tenant on ctx.

func (*Repository) GetLog

func (r *Repository) GetLog(ctx context.Context, id uint) (*Log, error)

GetLog returns a single log by ID, scoped to the tenant on ctx.

func (*Repository) GetLogContext

func (r *Repository) GetLogContext(ctx context.Context, targetTime time.Time) ([]Log, error)

GetLogContext returns logs surrounding a specific timestamp (+/- 1 minute), scoped to the tenant on ctx.

func (*Repository) GetLogsV2

func (r *Repository) GetLogsV2(ctx context.Context, filter LogFilter) ([]Log, int64, error)

GetLogsV2 performs advanced filtering and search on logs scoped to the tenant on ctx. COUNT and SELECT run in parallel via errgroup for reduced latency.

When `filter.Search` is set and the driver is SQLite, the query routes through the FTS5 virtual table (`logs_fts`) and results are ordered by BM25 relevance. Other drivers continue to use LIKE/ILIKE against logs.body and logs.trace_id.

func (*Repository) GetMetricBuckets

func (r *Repository) GetMetricBuckets(ctx context.Context, start, end time.Time, serviceName string, metricName string) ([]MetricBucket, error)

GetMetricBuckets returns aggregated metrics for a specific time range and service, scoped to the tenant on ctx.

func (*Repository) GetMetricNames

func (r *Repository) GetMetricNames(ctx context.Context, serviceName string) ([]string, error)

GetMetricNames returns a list of distinct metric names for the tenant on ctx, optionally filtered by service.

func (*Repository) GetRecentLogs

func (r *Repository) GetRecentLogs(ctx context.Context, limit int) ([]Log, error)

GetRecentLogs returns the most recent logs scoped to the tenant on ctx.

func (*Repository) GetServiceMapMetrics

func (r *Repository) GetServiceMapMetrics(ctx context.Context, start, end time.Time) (*ServiceMapMetrics, error)

GetServiceMapMetrics computes topology metrics from spans scoped to the tenant on ctx.

func (*Repository) GetServices

func (r *Repository) GetServices(ctx context.Context) ([]string, error)

GetServices returns a list of all distinct service names seen in traces for the tenant on ctx.

func (*Repository) GetSpansForGraph

func (r *Repository) GetSpansForGraph(since time.Time) ([]SpanGraphRow, error)

GetSpansForGraph returns a lightweight projection of recent spans used to rebuild the in-memory service dependency graph.

Duration is stored in microseconds; we convert to milliseconds here so the graph layer doesn't need to know the storage unit.

func (*Repository) GetStats

func (r *Repository) GetStats(ctx context.Context) (map[string]any, error)

GetStats returns high-level database stats scoped to the tenant carried on ctx. Unscoped aggregates (DB size, etc.) are not tenant-specific and are reported as-is.

func (*Repository) GetTrace

func (r *Repository) GetTrace(ctx context.Context, traceID string) (*Trace, error)

GetTrace returns a trace by ID with its spans and logs, scoped to the tenant on ctx. Trace uniqueness is composite (tenant_id, trace_id), so the same trace_id can legitimately exist in multiple tenants; the Preloaded Spans and Logs are filtered by tenant_id as defense-in-depth against cross-tenant child leakage.

func (*Repository) GetTracesFiltered

func (r *Repository) GetTracesFiltered(ctx context.Context, start, end time.Time, serviceNames []string, status, search string, limit, offset int, sortBy, orderBy string) (*TracesResponse, error)

GetTracesFiltered retrieves traces with filtering and pagination, scoped to the tenant on ctx. Spans are NOT eagerly loaded — a single batch summary query is used instead.

func (*Repository) GetTrafficMetrics

func (r *Repository) GetTrafficMetrics(ctx context.Context, start, end time.Time, serviceNames []string) ([]TrafficPoint, error)

GetTrafficMetrics returns request counts bucketed by minute (including error counts), scoped to the tenant on ctx.

func (*Repository) HotDBSizeBytes

func (r *Repository) HotDBSizeBytes() int64

HotDBSizeBytes returns an approximate size of the hot DB in bytes. For SQLite this reads the file size. For others it queries pg_database_size / information_schema.

func (*Repository) ListRecentHighSeverityLogsAllTenants

func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, severity string, since, until time.Time, limit int) ([]Log, error)

ListRecentHighSeverityLogsAllTenants returns recent logs of the given severity across EVERY tenant, each row carrying its own TenantID. This is an administrative read used exclusively by the vector index's startup hydration path, which fans rows out to per-tenant shards. It is not exposed on any tenant-scoped API surface — tenant isolation for read paths must otherwise be preserved via the context-driven WHERE clause.

func (*Repository) LogsForVectorReplay

func (r *Repository) LogsForVectorReplay(ctx context.Context, sinceID uint, limit int) ([]Log, error)

LogsForVectorReplay returns ERROR/WARN-family logs with id > sinceID, page-bounded by limit and ordered by id ASC. Used at startup by the vector-index tail-replay path to pick up DB rows inserted after the last snapshot. The id-ascending order lets the caller use the last row's id as the next page's sinceID — clean cursor pagination, no offset cost.

Cross-tenant by design: vectordb is a global index with per-doc tenant tags enforced at Search time. Not exposed on any tenant-scoped API.

Severity filter is intentionally narrow (ERROR / WARN / WARNING / FATAL / CRITICAL) so non-indexed rows don't waste page space; this matches vectordb.shouldIndex().
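The cursor loop a caller runs over this method can be sketched with a stand-in fetch function (names here are illustrative, not package API):

```go
package main

import "fmt"

// replayAll sketches the caller-side cursor loop over
// LogsForVectorReplay-style pagination; fetch stands in for the repository
// call and returns ids in ascending order.
func replayAll(fetch func(sinceID uint, limit int) []uint, limit int) []uint {
	var out []uint
	var cursor uint
	for {
		page := fetch(cursor, limit)
		out = append(out, page...)
		if len(page) < limit {
			return out // short page means the tail is drained
		}
		cursor = page[len(page)-1] // last id becomes the next sinceID
	}
}

func main() {
	ids := []uint{3, 5, 8, 13, 21, 34, 55}
	fetch := func(since uint, limit int) []uint {
		var page []uint
		for _, id := range ids {
			if id > since {
				page = append(page, id)
				if len(page) == limit {
					break
				}
			}
		}
		return page
	}
	fmt.Println(replayAll(fetch, 3)) // [3 5 8 13 21 34 55]
}
```

Because the cursor is a strict id lower bound rather than an offset, rows inserted behind the cursor during replay are never skipped or double-counted.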

func (*Repository) LogsPartitioned

func (r *Repository) LogsPartitioned() bool

LogsPartitioned reports whether the `logs` table is provisioned as a declarative partitioned parent. Used by RetentionScheduler to bypass the row-level DELETE path when partition-level DROP is in charge of retention.

func (*Repository) MarkLogsPartitioned

func (r *Repository) MarkLogsPartitioned()

MarkLogsPartitioned flips the partitioned flag. Called by the partitioning setup path (factory.go) once the partitioned schema is in place.

func (*Repository) PurgeLogs

func (r *Repository) PurgeLogs(olderThan time.Time) (int64, error)

PurgeLogs deletes logs older than the given timestamp in a single statement. Suitable for SQLite; for Postgres at large retention volumes prefer PurgeLogsBatched.

func (*Repository) PurgeLogsBatched

func (r *Repository) PurgeLogsBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)

PurgeLogsBatched deletes logs in bounded chunks to avoid long locks and bloat on Postgres/MySQL. On SQLite it falls through to a single-statement delete.

Tenant scope: this is a SYSTEM-WIDE retention operation and intentionally does NOT filter by tenant. All rows older than olderThan are purged across every tenant. Never expose this on a tenant-scoped API surface.

func (*Repository) PurgeMetricBucketsBatched

func (r *Repository) PurgeMetricBucketsBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)

PurgeMetricBucketsBatched deletes metric buckets older than the given timestamp in bounded chunks.

Tenant scope: this is a SYSTEM-WIDE retention operation and intentionally does NOT filter by tenant. Rows are deleted across every tenant. Never expose this on a tenant-scoped API surface.

func (*Repository) PurgeTraces

func (r *Repository) PurgeTraces(olderThan time.Time) (int64, error)

PurgeTraces deletes traces older than the given timestamp in a single statement. Uses Unscoped() for a hard DELETE (Trace has a soft-delete column that would otherwise leave rows present and block storage reclamation).

func (*Repository) PurgeTracesBatched

func (r *Repository) PurgeTracesBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)

PurgeTracesBatched deletes traces (and their spans) in bounded chunks. On SQLite it falls through to a single-statement delete.

Tenant scope: this is a SYSTEM-WIDE retention operation and intentionally does NOT filter by tenant. Rows are deleted across every tenant. Never expose this on a tenant-scoped API surface.

func (*Repository) RecentLogs

func (r *Repository) RecentLogs(ctx context.Context, limit int) ([]Log, error)

RecentLogs returns the most recent logs scoped to the tenant carried on ctx.

func (*Repository) RecentTraces

func (r *Repository) RecentTraces(ctx context.Context, limit int) ([]Trace, error)

RecentTraces returns the most recent traces scoped to the tenant carried on ctx.

func (*Repository) SearchLogs

func (r *Repository) SearchLogs(ctx context.Context, query string, limit int) ([]Log, error)

SearchLogs searches for logs based on query, scoped to the tenant carried on ctx.

On SQLite the search routes through the FTS5 virtual table (`logs_fts`) and orders by BM25 score for relevance. On Postgres / MySQL / others it falls back to LIKE/ILIKE against logs.body and logs.service_name (Postgres uses the pg_trgm GIN indexes built in AutoMigrateModels).

func (*Repository) UpdateLogInsight

func (r *Repository) UpdateLogInsight(ctx context.Context, logID uint, insight string) error

UpdateLogInsight updates the AI insight for a specific log. The update is scoped to the tenant derived from ctx — a caller that attempts to update a log belonging to another tenant gets ErrLogNotFoundOrWrongTenant (IDOR fix).

func (*Repository) VacuumDB

func (r *Repository) VacuumDB() error

VacuumDB runs VACUUM on the database (SQLite only, no-op for others).

type RetentionScheduler

type RetentionScheduler struct {
	// contains filtered or unexported fields
}

RetentionScheduler periodically enforces hot-DB retention and runs DB maintenance. On startup and hourly thereafter it deletes rows older than retentionDays. Daily it runs driver-appropriate maintenance (VACUUM ANALYZE / OPTIMIZE / VACUUM).

func NewRetentionScheduler

func NewRetentionScheduler(repo *Repository, retentionDays, batchSize int, batchSleep time.Duration) *RetentionScheduler

NewRetentionScheduler constructs a scheduler but does not start it. batchSize <= 0 defaults to 10_000; batchSleep < 0 defaults to 5ms.

func (*RetentionScheduler) SkippedRuns

func (r *RetentionScheduler) SkippedRuns() int64

SkippedRuns returns the number of purge/maintenance ticks that were dropped because a previous run was still executing. Intended for tests and telemetry.

func (*RetentionScheduler) Start

func (r *RetentionScheduler) Start(parent context.Context)

Start launches the scheduler goroutine. It runs an initial purge immediately. Idempotent and race-free: atomic CAS elects the first caller, and mu publishes cancel+done before any concurrent Stop can observe started=true.

func (*RetentionScheduler) Stop

func (r *RetentionScheduler) Stop()

Stop signals the scheduler to exit and waits for the loop to return. No-op if Start was never called. Safe to call concurrently / repeatedly.

type ServiceError

type ServiceError struct {
	ServiceName string  `json:"service_name"`
	ErrorCount  int64   `json:"error_count"`
	TotalCount  int64   `json:"total_count"`
	ErrorRate   float64 `json:"error_rate"`
}

ServiceError represents error counts per service.

type ServiceMapEdge

type ServiceMapEdge struct {
	Source       string  `json:"source"`
	Target       string  `json:"target"`
	CallCount    int64   `json:"call_count"`
	AvgLatencyMs float64 `json:"avg_latency_ms"`
	ErrorRate    float64 `json:"error_rate"`
}

ServiceMapEdge represents a connection between two services.

type ServiceMapMetrics

type ServiceMapMetrics struct {
	Nodes []ServiceMapNode `json:"nodes"`
	Edges []ServiceMapEdge `json:"edges"`
}

ServiceMapMetrics holds the complete service topology with metrics.

type ServiceMapNode

type ServiceMapNode struct {
	Name         string  `json:"name"`
	TotalTraces  int64   `json:"total_traces"`
	ErrorCount   int64   `json:"error_count"`
	AvgLatencyMs float64 `json:"avg_latency_ms"`
}

ServiceMapNode represents a single service node on the service map.

type Span

type Span struct {
	ID             uint           `gorm:"primaryKey" json:"id"`
	TenantID       string         `` /* 197-byte string literal not displayed */
	TraceID        string         `` /* 130-byte string literal not displayed */
	SpanID         string         `gorm:"size:16;not null;uniqueIndex:idx_spans_tenant_trace_span,priority:3" json:"span_id"`
	ParentSpanID   string         `gorm:"size:16" json:"parent_span_id"`
	OperationName  string         `gorm:"size:255;index" json:"operation_name"`
	StartTime      time.Time      `gorm:"index:idx_spans_tenant_service_start,priority:3" json:"start_time"`
	EndTime        time.Time      `json:"end_time"`
	Duration       int64          `json:"duration"`                                                                     // Microseconds
	ServiceName    string         `gorm:"size:255;index:idx_spans_tenant_service_start,priority:2" json:"service_name"` // Originating service
	Status         string         `gorm:"size:50;default:'STATUS_CODE_UNSET';index" json:"status"`                      // OTLP status code (e.g. STATUS_CODE_ERROR); drives GraphRAG error signal
	AttributesJSON CompressedText `json:"attributes_json"`                                                              // Compressed JSON string
}

Span represents a single operation within a trace.

Idempotency: the composite uniqueIndex idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id) ensures a span is written at most once per tenant. DLQ replay (or any duplicate ingest) collapses cleanly via OnConflict.DoNothing in BatchCreateAll/BatchCreateSpans rather than double-counting in downstream metrics or GraphRAG. The composite covers the legacy idx_spans_tenant_trace as a left-prefix; the legacy index is retained for query-plan stability across upgrades.

type SpanGraphRow

type SpanGraphRow struct {
	SpanID        string
	ParentSpanID  string
	ServiceName   string
	OperationName string
	DurationMs    float64
	IsError       bool
	Timestamp     time.Time
}

SpanGraphRow is the minimal projection needed by the in-memory service graph.

type Trace

type Trace struct {
	ID          uint    `gorm:"primaryKey" json:"id"`
	TenantID    string  `` /* 189-byte string literal not displayed */
	TraceID     string  `gorm:"size:32;not null;uniqueIndex:idx_traces_tenant_trace_id,priority:2" json:"trace_id"`
	ServiceName string  `gorm:"size:255;index:idx_traces_tenant_service,priority:2" json:"service_name"`
	Duration    int64   `gorm:"index" json:"duration"` // Microseconds
	DurationMs  float64 `gorm:"-" json:"duration_ms"`
	SpanCount   int     `gorm:"-" json:"span_count"`
	Operation   string  `gorm:"-" json:"operation"`
	Status      string  `gorm:"size:50" json:"status"`
	// Timestamp is both part of idx_traces_tenant_ts (composite) and retains a
	// standalone index so range scans on traces across all tenants (e.g.
	// retention sweeps) still use an index.
	Timestamp time.Time      `gorm:"index;index:idx_traces_tenant_ts,priority:2" json:"timestamp"`
	Spans     []Span         `gorm:"foreignKey:TraceID;references:TraceID;constraint:false" json:"spans,omitempty"`
	Logs      []Log          `gorm:"foreignKey:TraceID;references:TraceID;constraint:false" json:"logs,omitempty"`
	CreatedAt time.Time      `json:"-"`
	UpdatedAt time.Time      `json:"-"`
	DeletedAt gorm.DeletedAt `gorm:"index" json:"-"`
}

Trace represents a complete distributed trace.

Index strategy: single-column tenant_id is redundant — every tenant-scoped query joins tenant_id with another filter (timestamp, service_name). The leftmost column of a composite index satisfies single-column tenant lookups, so we only declare composites. TraceID uniqueness is scoped to (tenant_id, trace_id): distinct tenants may legitimately ingest identical trace_ids (RAN-21). The old standalone `uniqueIndex` on trace_id is dropped at migration time by dropLegacyTraceIDUniqueIndex.

type TracesResponse

type TracesResponse struct {
	Traces []Trace `json:"traces"`
	Total  int64   `json:"total"`
	Limit  int     `json:"limit"`
	Offset int     `json:"offset"`
}

TracesResponse represents the response for the traces endpoint with pagination.

type TrafficPoint

type TrafficPoint struct {
	Timestamp  time.Time `json:"timestamp"`
	Count      int64     `json:"count"`
	ErrorCount int64     `json:"error_count"`
}

TrafficPoint represents a data point for the traffic chart.
