Documentation
¶
Index ¶
- Constants
- Variables
- func AutoMigrateModels(db *gorm.DB, driver string) error
- func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOptions) error
- func ClampSearchWindowTo24h(start, end, now time.Time) (time.Time, time.Time, error)
- func DropExpiredLogsPartitions(ctx context.Context, db *gorm.DB, cutoff time.Time) (int, error)
- func EnsureLogsLookahead(db *gorm.DB, lookaheadDays int) (int, error)
- func EnsureLogsPartitionForDay(db *gorm.DB, day time.Time) error
- func HasTenantContext(ctx context.Context) bool
- func NewDatabase(driver, dsn string) (*gorm.DB, error)
- func SanitizeTenantID(s string) string
- func TenantFromContext(ctx context.Context) string
- func WithTenantContext(ctx context.Context, tenant string) context.Context
- type CompressedText
- type DashboardStats
- type LatencyPoint
- type Log
- type LogFilter
- type MetricBucket
- type MigrateOptions
- type PartitionScheduler
- type Repository
- func (r *Repository) BatchCreateAll(traces []Trace, spans []Span, logs []Log) error
- func (r *Repository) BatchCreateLogs(logs []Log) error
- func (r *Repository) BatchCreateMetrics(buckets []MetricBucket) error
- func (r *Repository) BatchCreateSpans(spans []Span) error
- func (r *Repository) BatchCreateTraces(traces []Trace) error
- func (r *Repository) Close() error
- func (r *Repository) CreateTrace(trace Trace) error
- func (r *Repository) DB() *gorm.DB
- func (r *Repository) DropLogsFTS(ctx context.Context) error
- func (r *Repository) GetDashboardStats(ctx context.Context, start, end time.Time, serviceNames []string) (*DashboardStats, error)
- func (r *Repository) GetLatencyHeatmap(ctx context.Context, start, end time.Time, serviceNames []string) ([]LatencyPoint, error)
- func (r *Repository) GetLog(ctx context.Context, id uint) (*Log, error)
- func (r *Repository) GetLogContext(ctx context.Context, targetTime time.Time) ([]Log, error)
- func (r *Repository) GetLogsV2(ctx context.Context, filter LogFilter) ([]Log, int64, error)
- func (r *Repository) GetMetricBuckets(ctx context.Context, start, end time.Time, serviceName string, ...) ([]MetricBucket, error)
- func (r *Repository) GetMetricNames(ctx context.Context, serviceName string) ([]string, error)
- func (r *Repository) GetRecentLogs(ctx context.Context, limit int) ([]Log, error)
- func (r *Repository) GetServiceMapMetrics(ctx context.Context, start, end time.Time) (*ServiceMapMetrics, error)
- func (r *Repository) GetServices(ctx context.Context) ([]string, error)
- func (r *Repository) GetSpansForGraph(since time.Time) ([]SpanGraphRow, error)
- func (r *Repository) GetStats(ctx context.Context) (map[string]any, error)
- func (r *Repository) GetTrace(ctx context.Context, traceID string) (*Trace, error)
- func (r *Repository) GetTracesFiltered(ctx context.Context, start, end time.Time, serviceNames []string, ...) (*TracesResponse, error)
- func (r *Repository) GetTrafficMetrics(ctx context.Context, start, end time.Time, serviceNames []string) ([]TrafficPoint, error)
- func (r *Repository) HotDBSizeBytes() int64
- func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, severity string, since, until time.Time, limit int) ([]Log, error)
- func (r *Repository) LogsForVectorReplay(ctx context.Context, sinceID uint, limit int) ([]Log, error)
- func (r *Repository) LogsPartitioned() bool
- func (r *Repository) MarkLogsPartitioned()
- func (r *Repository) PurgeLogs(olderThan time.Time) (int64, error)
- func (r *Repository) PurgeLogsBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)
- func (r *Repository) PurgeMetricBucketsBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)
- func (r *Repository) PurgeTraces(olderThan time.Time) (int64, error)
- func (r *Repository) PurgeTracesBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)
- func (r *Repository) RecentLogs(ctx context.Context, limit int) ([]Log, error)
- func (r *Repository) RecentTraces(ctx context.Context, limit int) ([]Trace, error)
- func (r *Repository) SearchLogs(ctx context.Context, query string, limit int) ([]Log, error)
- func (r *Repository) UpdateLogInsight(ctx context.Context, logID uint, insight string) error
- func (r *Repository) VacuumDB() error
- type RetentionScheduler
- type ServiceError
- type ServiceMapEdge
- type ServiceMapMetrics
- type ServiceMapNode
- type Span
- type SpanGraphRow
- type Trace
- type TracesResponse
- type TrafficPoint
Constants ¶
const DefaultTenantID = "default"
DefaultTenantID is the tenant assigned when no X-Tenant-ID header or tenant.id resource attribute is present on an OTLP request.
const MaxTenantIDLength = 128
MaxTenantIDLength caps the length of an accepted tenant ID. Tenant IDs are stored in a VARCHAR(64) column on every domain row plus propagate into structured logs and Prometheus labels. (NOTE(review): a 128-character cap exceeds a VARCHAR(64) column — IDs of 65-128 characters would still be silently truncated at insert time; confirm the actual column width or tighten the cap to 64.) The cap is a defense-in-depth against silent VARCHAR truncation at insert time and unbounded label cardinality from a hostile or misconfigured client.
const PartitioningModeDaily = "daily"
PartitioningModeDaily is the canonical opt-in value for daily partitioning.
Variables ¶
var ErrLogNotFoundOrWrongTenant = errors.New("log not found or not accessible by current tenant")
ErrLogNotFoundOrWrongTenant is returned by UpdateLogInsight when the target row does not exist or belongs to a different tenant. Handlers should translate this to a 404 (to avoid confirming the existence of cross-tenant rows).
Functions ¶
func AutoMigrateModels ¶
AutoMigrateModels runs GORM auto-migration for all OtelContext models.
When DB_POSTGRES_PARTITIONING=daily, the `logs` table is provisioned as a declarative range-partitioned parent BEFORE GORM AutoMigrate runs, so AutoMigrate sees an existing table and skips it (GORM's IF NOT EXISTS behaviour). This is greenfield-only — see setupPostgresPartitionedLogs for the safety check.
func AutoMigrateModelsWithOptions ¶
func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOptions) error
AutoMigrateModelsWithOptions is the option-driven variant of AutoMigrateModels. Existing callers should continue to use AutoMigrateModels — the options entry point is for new wiring (currently main.go) that needs to plumb the partitioning flag.
func ClampSearchWindowTo24h ¶
ClampSearchWindowTo24h enforces a 24-hour ceiling on log keyword search windows applied at the API/MCP boundary. With FTS5 disabled (the default), the only safety mechanism for unscoped LIKE substring scans is a hard time-range cap so the worst-case query is bounded in disk pages scanned.
Behaviour:
- Both zero (no caller hint): defaults to (now-24h, now)
- end zero: end = now
- start zero: start = end - 24h
- end > now: end clamped to now (don't reject — clock skew on the caller side is common)
- start < now-24h with end >= now-24h: start clamped to now-24h (the window is shrunk to fit the cap rather than rejected; only the portion older than the cap is dropped, so the in-cap part of the query is still served)
- end < now-24h (window entirely outside the cap): rejected with error so the caller gets a deterministic signal instead of an empty result
- start >= end: rejected with error (caller mistake)
The cap fires whenever a body keyword search is requested; pure filtered listings (no Search field) keep the full retention range and bypass this helper.
func DropExpiredLogsPartitions ¶
DropExpiredLogsPartitions drops every daily logs partition whose upper bound is older than `cutoff` — i.e. the partition's entire range predates the cutoff. Returns the number of partitions dropped. Safe to call repeatedly (no-op when nothing matches).
We use the partition catalog (pg_partitioned_table + pg_inherits) instead of guessing names, so partitions created by earlier code paths or operator scripts are also covered.
func EnsureLogsLookahead ¶
EnsureLogsLookahead ensures partitions exist for the next `lookaheadDays` starting at "today" (UTC). Returns, for telemetry purposes, the count of partitions newly created by this call.
func EnsureLogsPartitionForDay ¶
EnsureLogsPartitionForDay creates the daily partition that covers `day` (UTC). Idempotent — uses CREATE TABLE IF NOT EXISTS PARTITION OF semantics so concurrent boots / scheduler ticks never collide.
func HasTenantContext ¶
HasTenantContext reports whether ctx carries a tenant value set by WithTenantContext. It lets callers (e.g. OTLP ingest) distinguish "no tenant set at all" (fall through to other sources like gRPC metadata) from "explicitly set to default tenant" — a distinction TenantFromContext erases by returning DefaultTenantID for both cases.
func NewDatabase ¶
NewDatabase creates a GORM database connection for any supported driver. Supported drivers: sqlite, postgres, mysql, sqlserver. Applies per-driver optimizations (WAL for SQLite, connection pooling for others).
func SanitizeTenantID ¶
SanitizeTenantID validates and normalizes a tenant ID supplied by an HTTP header, gRPC metadata key, or OTLP resource attribute. It returns the empty string for any value the caller should reject (and substitute with their configured default), so the rejection contract is uniform across transports.
Rejection criteria:
- empty after TrimSpace
- length exceeds MaxTenantIDLength after trim
- contains a Unicode control character (\n, \r, \t, NUL, escape codes)
On the happy path it returns the trimmed value verbatim — no case folding, no allowlist, since legitimate tenant IDs may be UUIDs, slugs, or organisation names in non-ASCII scripts.
func TenantFromContext ¶
TenantFromContext returns the tenant ID stashed on ctx by TenantMiddleware (or WithTenantContext). When the context carries no tenant, or a nil context is passed, it returns DefaultTenantID so single-tenant installs behave as before this feature landed.
Types ¶
type CompressedText ¶
type CompressedText string
CompressedText is a string type that is transparently compressed using zstd before being stored in the database. It implements sql.Scanner and driver.Valuer for GORM.
func (CompressedText) GormDBDataType ¶
GormDBDataType returns the dialect-specific binary column type. Zstd-compressed bytes include non-text bytes (magic header 0x28 0xB5 0x2F 0xFD) so a TEXT column would corrupt data on Postgres; BYTEA is required.
func (*CompressedText) Scan ¶
func (ct *CompressedText) Scan(value any) error
type DashboardStats ¶
type DashboardStats struct {
TotalTraces int64 `json:"total_traces"`
TotalLogs int64 `json:"total_logs"`
TotalErrors int64 `json:"total_errors"`
AvgLatencyMs float64 `json:"avg_latency_ms"`
ErrorRate float64 `json:"error_rate"`
ActiveServices int64 `json:"active_services"`
P99Latency int64 `json:"p99_latency"`
TopFailingServices []ServiceError `json:"top_failing_services"`
}
DashboardStats represents aggregated metrics for the dashboard.
type LatencyPoint ¶
type LatencyPoint struct {
Timestamp time.Time `json:"timestamp"`
Duration int64 `json:"duration"` // Microseconds
}
LatencyPoint represents a data point for the latency heatmap.
type Log ¶
type Log struct {
ID uint `gorm:"primaryKey" json:"id"`
TenantID string `` /* 177-byte string literal not displayed */
TraceID string `gorm:"index;size:32" json:"trace_id"`
SpanID string `gorm:"size:16" json:"span_id"`
Severity string `gorm:"size:50;index:idx_logs_tenant_severity,priority:2" json:"severity"`
Body string `gorm:"type:text" json:"body"`
ServiceName string `gorm:"size:255;index:idx_logs_tenant_service,priority:2" json:"service_name"`
AttributesJSON CompressedText `json:"attributes_json"`
AIInsight CompressedText `json:"ai_insight"` // Populated by AI analysis
Timestamp time.Time `gorm:"index;index:idx_logs_tenant_ts,priority:2" json:"timestamp"` // standalone index for global retention sweeps
}
Log represents a log entry associated with a trace.
type LogFilter ¶
type LogFilter struct {
ServiceName string
Severity string
Search string
TraceID string
StartTime time.Time
EndTime time.Time
Limit int
Offset int
}
LogFilter defines criteria for searching logs.
type MetricBucket ¶
type MetricBucket struct {
ID uint `gorm:"primaryKey" json:"id"`
TenantID string `` /* 157-byte string literal not displayed */
Name string `gorm:"size:255;not null;index:idx_metrics_tenant_name_bucket,priority:2" json:"name"`
ServiceName string `gorm:"size:255;not null;index:idx_metrics_tenant_service_bucket,priority:2" json:"service_name"`
TimeBucket time.Time `` /* 139-byte string literal not displayed */
Min float64 `json:"min"`
Max float64 `json:"max"`
Sum float64 `json:"sum"`
Count int64 `json:"count"`
AttributesJSON CompressedText `json:"attributes_json"` // Grouped attributes
}
MetricBucket represents aggregated metric data over a time window (e.g., 10s).
type MigrateOptions ¶
type MigrateOptions struct {
// PostgresPartitioning, when "daily", provisions `logs` as a partitioned
// table. Any other value (including the empty string) keeps the legacy
// unpartitioned schema.
PostgresPartitioning string
// PartitionLookaheadDays is the number of future daily partitions to
// pre-create at boot. Defaults to 3 when zero.
PartitionLookaheadDays int
// Timeout, when > 0, bounds the AutoMigrate call. Without it,
// db.AutoMigrate inherits no deadline and an ALTER TABLE waiting on a
// Postgres relation lock can hang startup indefinitely. The timeout is
// applied via db.WithContext to the AutoMigrate call only — pre/post
// hooks (FTS5 triggers, legacy index drops) are not bounded since they
// don't take long-held locks. Zero preserves legacy unbounded behaviour.
Timeout time.Duration
}
MigrateOptions tunes AutoMigrateModelsWithOptions. Empty zero value preserves the legacy AutoMigrateModels behaviour.
type PartitionScheduler ¶
type PartitionScheduler struct {
// contains filtered or unexported fields
}
PartitionScheduler maintains daily logs partitions on Postgres when DB_POSTGRES_PARTITIONING=daily is enabled. Hourly it ensures the next `lookaheadDays` partitions exist; daily it drops partitions whose upper bound predates the retention cutoff. Both passes are idempotent so a stalled tick (or a parallel scheduler from a different replica) is safe.
The scheduler is independent of RetentionScheduler so the legacy DELETE path (used for SQLite/MySQL/MSSQL or non-partitioned Postgres) keeps running on its own loop. When partitioning is enabled, RetentionScheduler SHOULD skip logs — wire that up at construction time, not here.
func NewPartitionScheduler ¶
func NewPartitionScheduler(repo *Repository, retentionDays, lookaheadDays int) *PartitionScheduler
NewPartitionScheduler constructs a scheduler. retentionDays must match the HOT_RETENTION_DAYS setting so DROP PARTITION is the moral equivalent of the hourly DELETE-by-age the RetentionScheduler runs for non-partitioned tables.
func (*PartitionScheduler) SetMetrics ¶
func (s *PartitionScheduler) SetMetrics(onDrop, onKeep func(int))
SetMetrics wires telemetry callbacks. Both arguments may be nil.
func (*PartitionScheduler) Start ¶
func (s *PartitionScheduler) Start(parent context.Context)
Start kicks off the background loop. It performs an initial ensure+drop pass synchronously so a fresh boot has the next-day partition staged before any ingest hits it.
One-shot lifecycle: Start is idempotent (a second call while running is a no-op), but a Start-Stop-Start sequence is NOT supported — Stop closes the internal `done` channel, so a restarted run's shutdown would attempt to close the already-closed channel and panic. Construct a fresh PartitionScheduler if you need to restart.
func (*PartitionScheduler) Stop ¶
func (s *PartitionScheduler) Stop()
Stop cancels the loop and waits for it to exit. Safe to call multiple times.
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository wraps the GORM database handle for all data access operations.
func NewRepository ¶
func NewRepository(metrics *telemetry.Metrics) (*Repository, error)
NewRepository initializes the database connection using environment variables and migrates the schema.
func NewRepositoryFromDB ¶
func NewRepositoryFromDB(db *gorm.DB, driver string) *Repository
NewRepositoryFromDB constructs a Repository from an existing *gorm.DB. Intended for tests and advanced wiring — production code should use NewRepository.
func (*Repository) BatchCreateAll ¶
func (r *Repository) BatchCreateAll(traces []Trace, spans []Span, logs []Log) error
BatchCreateAll persists traces, spans, and logs in a single DB transaction. The async ingest pipeline uses this path so a failure (or panic) mid-batch rolls back any partial commit, preventing orphan FK rows from a worker that crashed between BatchCreateTraces and BatchCreateSpans.
Idempotency: traces and spans both collapse duplicates silently —
- traces via idx_traces_tenant_trace_id on (tenant_id, trace_id)
- spans via idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id)
so a DLQ replay of an already-persisted batch is a safe no-op for those signals. Logs do not yet have a unique key (OTLP logs lack a stable identifier) and a replay can still produce duplicate log rows; that is a separate idempotency concern out of scope for this method.
func (*Repository) BatchCreateLogs ¶
func (r *Repository) BatchCreateLogs(logs []Log) error
BatchCreateLogs inserts multiple logs in batches.
func (*Repository) BatchCreateMetrics ¶
func (r *Repository) BatchCreateMetrics(buckets []MetricBucket) error
BatchCreateMetrics inserts aggregated metrics in batches.
func (*Repository) BatchCreateSpans ¶
func (r *Repository) BatchCreateSpans(spans []Span) error
BatchCreateSpans inserts multiple spans, skipping duplicates. Duplicate is defined per the composite uniqueIndex idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id): a (tenant, trace, span) clash is silently absorbed so DLQ replays (or any duplicate ingest) collapse to a no-op rather than double-inserting.
func (*Repository) BatchCreateTraces ¶
func (r *Repository) BatchCreateTraces(traces []Trace) error
BatchCreateTraces inserts traces, skipping duplicates. Duplicate is defined per the composite uniqueIndex idx_traces_tenant_trace_id on (tenant_id, trace_id): a trace_id clash within the same tenant is ignored, while the same trace_id under a different tenant inserts cleanly.
func (*Repository) Close ¶
func (r *Repository) Close() error
Close closes the underlying database connection.
func (*Repository) CreateTrace ¶
func (r *Repository) CreateTrace(trace Trace) error
CreateTrace inserts a new trace, skipping if it already exists. Uniqueness is per idx_traces_tenant_trace_id (tenant_id, trace_id), so the same trace_id across tenants is allowed.
func (*Repository) DB ¶
func (r *Repository) DB() *gorm.DB
DB returns the underlying gorm.DB for advanced queries.
func (*Repository) DropLogsFTS ¶
func (r *Repository) DropLogsFTS(ctx context.Context) error
DropLogsFTS removes the FTS5 virtual table and its sync triggers, then runs VACUUM to reclaim freed pages. Used by /api/admin/drop_fts on existing SQLite deployments after LOG_FTS_ENABLED has been set to false, to recover the 30-40% of DB disk the inverted index occupied.
VACUUM blocks writes for ~10-60 minutes on a multi-GB DB and cannot run inside a transaction. Idempotent — safe to call when the FTS5 table or triggers are already absent.
func (*Repository) GetDashboardStats ¶
func (r *Repository) GetDashboardStats(ctx context.Context, start, end time.Time, serviceNames []string) (*DashboardStats, error)
GetDashboardStats calculates high-level metrics for the dashboard, scoped to the tenant on ctx.
func (*Repository) GetLatencyHeatmap ¶
func (r *Repository) GetLatencyHeatmap(ctx context.Context, start, end time.Time, serviceNames []string) ([]LatencyPoint, error)
GetLatencyHeatmap returns trace duration and timestamps for heatmap rendering, scoped to the tenant on ctx.
func (*Repository) GetLogContext ¶
GetLogContext returns logs surrounding a specific timestamp (+/- 1 minute), scoped to the tenant on ctx.
func (*Repository) GetLogsV2 ¶
GetLogsV2 performs advanced filtering and search on logs scoped to the tenant on ctx. COUNT and SELECT run in parallel via errgroup for reduced latency.
When `filter.Search` is set and the driver is SQLite, the query routes through the FTS5 virtual table (`logs_fts`) and results are ordered by BM25 relevance. Other drivers continue to use LIKE/ILIKE against logs.body and logs.trace_id.
func (*Repository) GetMetricBuckets ¶
func (r *Repository) GetMetricBuckets(ctx context.Context, start, end time.Time, serviceName string, metricName string) ([]MetricBucket, error)
GetMetricBuckets returns aggregated metrics for a specific time range and service, scoped to the tenant on ctx.
func (*Repository) GetMetricNames ¶
GetMetricNames returns a list of distinct metric names for the tenant on ctx, optionally filtered by service.
func (*Repository) GetRecentLogs ¶
GetRecentLogs returns the most recent logs scoped to the tenant on ctx.
func (*Repository) GetServiceMapMetrics ¶
func (r *Repository) GetServiceMapMetrics(ctx context.Context, start, end time.Time) (*ServiceMapMetrics, error)
GetServiceMapMetrics computes topology metrics from spans scoped to the tenant on ctx.
func (*Repository) GetServices ¶
func (r *Repository) GetServices(ctx context.Context) ([]string, error)
GetServices returns a list of all distinct service names seen in traces for the tenant on ctx.
func (*Repository) GetSpansForGraph ¶
func (r *Repository) GetSpansForGraph(since time.Time) ([]SpanGraphRow, error)
GetSpansForGraph returns a lightweight projection of recent spans used to rebuild the in-memory service dependency graph.
Duration is stored in microseconds; we convert to milliseconds here so the graph layer doesn't need to know the storage unit.
func (*Repository) GetStats ¶
GetStats returns high-level database stats scoped to the tenant carried on ctx. Unscoped aggregates (DB size, etc.) are not tenant-specific and are reported as-is.
func (*Repository) GetTrace ¶
GetTrace returns a trace by ID with its spans and logs, scoped to the tenant on ctx. Trace uniqueness is composite (tenant_id, trace_id), so the same trace_id can legitimately exist in multiple tenants; the preloaded Spans and Logs are filtered by tenant_id as defense-in-depth against cross-tenant child leakage.
func (*Repository) GetTracesFiltered ¶
func (r *Repository) GetTracesFiltered(ctx context.Context, start, end time.Time, serviceNames []string, status, search string, limit, offset int, sortBy, orderBy string) (*TracesResponse, error)
GetTracesFiltered retrieves traces with filtering and pagination, scoped to the tenant on ctx. Spans are NOT eagerly loaded — a single batch summary query is used instead.
func (*Repository) GetTrafficMetrics ¶
func (r *Repository) GetTrafficMetrics(ctx context.Context, start, end time.Time, serviceNames []string) ([]TrafficPoint, error)
GetTrafficMetrics returns request counts bucketed by minute (including error counts), scoped to the tenant on ctx.
func (*Repository) HotDBSizeBytes ¶
func (r *Repository) HotDBSizeBytes() int64
HotDBSizeBytes returns an approximate size of the hot DB in bytes. For SQLite this reads the file size. For others it queries pg_database_size / information_schema.
func (*Repository) ListRecentHighSeverityLogsAllTenants ¶
func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, severity string, since, until time.Time, limit int) ([]Log, error)
ListRecentHighSeverityLogsAllTenants returns recent logs of the given severity across EVERY tenant, each row carrying its own TenantID. This is an administrative read used exclusively by the vector index's startup hydration path, which fans rows out to per-tenant shards. It is not exposed on any tenant-scoped API surface — tenant isolation for read paths must otherwise be preserved via the context-driven WHERE clause.
func (*Repository) LogsForVectorReplay ¶
func (r *Repository) LogsForVectorReplay(ctx context.Context, sinceID uint, limit int) ([]Log, error)
LogsForVectorReplay returns ERROR/WARN-family logs with id > sinceID, page-bounded by limit and ordered by id ASC. Used at startup by the vector-index tail-replay path to pick up DB rows inserted after the last snapshot. The id-ascending order lets the caller use the last row's id as the next page's sinceID — clean cursor pagination, no offset cost.
Cross-tenant by design: vectordb is a global index with per-doc tenant tags enforced at Search time. Not exposed on any tenant-scoped API.
Severity filter is intentionally narrow (ERROR / WARN / WARNING / FATAL / CRITICAL) so non-indexed rows don't waste page space; this matches vectordb.shouldIndex().
func (*Repository) LogsPartitioned ¶
func (r *Repository) LogsPartitioned() bool
LogsPartitioned reports whether the `logs` table is provisioned as a declarative partitioned parent. Used by RetentionScheduler to bypass the row-level DELETE path when partition-level DROP is in charge of retention.
func (*Repository) MarkLogsPartitioned ¶
func (r *Repository) MarkLogsPartitioned()
MarkLogsPartitioned flips the partitioned flag. Called by the partitioning setup path (factory.go) once the partitioned schema is in place.
func (*Repository) PurgeLogs ¶
func (r *Repository) PurgeLogs(olderThan time.Time) (int64, error)
PurgeLogs deletes logs older than the given timestamp in a single statement. Suitable for SQLite; for Postgres at large retention volumes prefer PurgeLogsBatched.
func (*Repository) PurgeLogsBatched ¶
func (r *Repository) PurgeLogsBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)
PurgeLogsBatched deletes logs in bounded chunks to avoid long locks and bloat on Postgres/MySQL. On SQLite it falls through to a single-statement delete.
Tenant scope: this is a SYSTEM-WIDE retention operation and intentionally does NOT filter by tenant. All rows older than olderThan are purged across every tenant. Never expose this on a tenant-scoped API surface.
func (*Repository) PurgeMetricBucketsBatched ¶
func (r *Repository) PurgeMetricBucketsBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)
PurgeMetricBucketsBatched deletes metric buckets older than the given timestamp in bounded chunks.
Tenant scope: this is a SYSTEM-WIDE retention operation and intentionally does NOT filter by tenant. Rows are deleted across every tenant. Never expose this on a tenant-scoped API surface.
func (*Repository) PurgeTraces ¶
func (r *Repository) PurgeTraces(olderThan time.Time) (int64, error)
PurgeTraces deletes traces older than the given timestamp in a single statement. Uses Unscoped() for a hard DELETE (Trace has a soft-delete column that would otherwise leave rows present and block storage reclamation).
func (*Repository) PurgeTracesBatched ¶
func (r *Repository) PurgeTracesBatched(ctx context.Context, olderThan time.Time, batchSize int, sleep time.Duration) (int64, error)
PurgeTracesBatched deletes traces (and their spans) in bounded chunks. On SQLite it falls through to a single-statement delete.
Tenant scope: this is a SYSTEM-WIDE retention operation and intentionally does NOT filter by tenant. Rows are deleted across every tenant. Never expose this on a tenant-scoped API surface.
func (*Repository) RecentLogs ¶
RecentLogs returns the most recent logs scoped to the tenant carried on ctx.
func (*Repository) RecentTraces ¶
RecentTraces returns the most recent traces scoped to the tenant carried on ctx.
func (*Repository) SearchLogs ¶
SearchLogs searches for logs based on query, scoped to the tenant carried on ctx.
On SQLite the search routes through the FTS5 virtual table (`logs_fts`) and orders by BM25 score for relevance. On Postgres / MySQL / others it falls back to LIKE/ILIKE against logs.body and logs.service_name (Postgres uses the pg_trgm GIN indexes built in AutoMigrateModels).
func (*Repository) UpdateLogInsight ¶
UpdateLogInsight updates the AI insight for a specific log. The update is scoped to the tenant derived from ctx — a caller that attempts to update a log belonging to another tenant gets ErrLogNotFoundOrWrongTenant (IDOR fix).
func (*Repository) VacuumDB ¶
func (r *Repository) VacuumDB() error
VacuumDB runs VACUUM on the database (SQLite only, no-op for others).
type RetentionScheduler ¶
type RetentionScheduler struct {
// contains filtered or unexported fields
}
RetentionScheduler periodically enforces hot-DB retention and runs DB maintenance. On startup and hourly thereafter it deletes rows older than retentionDays. Daily it runs driver-appropriate maintenance (VACUUM ANALYZE / OPTIMIZE / VACUUM).
func NewRetentionScheduler ¶
func NewRetentionScheduler(repo *Repository, retentionDays, batchSize int, batchSleep time.Duration) *RetentionScheduler
NewRetentionScheduler constructs a scheduler but does not start it. batchSize <= 0 defaults to 10_000; batchSleep < 0 defaults to 5ms.
func (*RetentionScheduler) SkippedRuns ¶
func (r *RetentionScheduler) SkippedRuns() int64
SkippedRuns returns the number of purge/maintenance ticks that were dropped because a previous run was still executing. Intended for tests and telemetry.
func (*RetentionScheduler) Start ¶
func (r *RetentionScheduler) Start(parent context.Context)
Start launches the scheduler goroutine. It runs an initial purge immediately. Idempotent and race-free: atomic CAS elects the first caller, and mu publishes cancel+done before any concurrent Stop can observe started=true.
func (*RetentionScheduler) Stop ¶
func (r *RetentionScheduler) Stop()
Stop signals the scheduler to exit and waits for the loop to return. No-op if Start was never called. Safe to call concurrently / repeatedly.
type ServiceError ¶
type ServiceError struct {
ServiceName string `json:"service_name"`
ErrorCount int64 `json:"error_count"`
TotalCount int64 `json:"total_count"`
ErrorRate float64 `json:"error_rate"`
}
ServiceError represents error counts per service.
type ServiceMapEdge ¶
type ServiceMapEdge struct {
Source string `json:"source"`
Target string `json:"target"`
CallCount int64 `json:"call_count"`
AvgLatencyMs float64 `json:"avg_latency_ms"`
ErrorRate float64 `json:"error_rate"`
}
ServiceMapEdge represents a connection between two services.
type ServiceMapMetrics ¶
type ServiceMapMetrics struct {
Nodes []ServiceMapNode `json:"nodes"`
Edges []ServiceMapEdge `json:"edges"`
}
ServiceMapMetrics holds the complete service topology with metrics.
type ServiceMapNode ¶
type ServiceMapNode struct {
Name string `json:"name"`
TotalTraces int64 `json:"total_traces"`
ErrorCount int64 `json:"error_count"`
AvgLatencyMs float64 `json:"avg_latency_ms"`
}
ServiceMapNode represents a single service node on the service map.
type Span ¶
type Span struct {
ID uint `gorm:"primaryKey" json:"id"`
TenantID string `` /* 197-byte string literal not displayed */
TraceID string `` /* 130-byte string literal not displayed */
SpanID string `gorm:"size:16;not null;uniqueIndex:idx_spans_tenant_trace_span,priority:3" json:"span_id"`
ParentSpanID string `gorm:"size:16" json:"parent_span_id"`
OperationName string `gorm:"size:255;index" json:"operation_name"`
StartTime time.Time `gorm:"index:idx_spans_tenant_service_start,priority:3" json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration int64 `json:"duration"` // Microseconds
ServiceName string `gorm:"size:255;index:idx_spans_tenant_service_start,priority:2" json:"service_name"` // Originating service
Status string `gorm:"size:50;default:'STATUS_CODE_UNSET';index" json:"status"` // OTLP status code (e.g. STATUS_CODE_ERROR); drives GraphRAG error signal
AttributesJSON CompressedText `json:"attributes_json"` // Compressed JSON string
}
Span represents a single operation within a trace.
Idempotency: the composite uniqueIndex idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id) ensures a span is written at most once per tenant. DLQ replay (or any duplicate ingest) collapses cleanly via OnConflict.DoNothing in BatchCreateAll/BatchCreateSpans rather than double-counting in downstream metrics or GraphRAG. The composite covers the legacy idx_spans_tenant_trace as a left-prefix; the legacy index is retained for query-plan stability across upgrades.
type SpanGraphRow ¶
type SpanGraphRow struct {
SpanID string
ParentSpanID string
ServiceName string
OperationName string
DurationMs float64
IsError bool
Timestamp time.Time
}
SpanGraphRow is the minimal projection needed by the in-memory service graph.
type Trace ¶
type Trace struct {
ID uint `gorm:"primaryKey" json:"id"`
TenantID string `` /* 189-byte string literal not displayed */
TraceID string `gorm:"size:32;not null;uniqueIndex:idx_traces_tenant_trace_id,priority:2" json:"trace_id"`
ServiceName string `gorm:"size:255;index:idx_traces_tenant_service,priority:2" json:"service_name"`
Duration int64 `gorm:"index" json:"duration"` // Microseconds
DurationMs float64 `gorm:"-" json:"duration_ms"`
SpanCount int `gorm:"-" json:"span_count"`
Operation string `gorm:"-" json:"operation"`
Status string `gorm:"size:50" json:"status"`
// Timestamp is both part of idx_traces_tenant_ts (composite) and retains a
// standalone index so range scans on traces across all tenants (e.g.
// retention sweeps) still use an index.
Timestamp time.Time `gorm:"index;index:idx_traces_tenant_ts,priority:2" json:"timestamp"`
Spans []Span `gorm:"foreignKey:TraceID;references:TraceID;constraint:false" json:"spans,omitempty"`
Logs []Log `gorm:"foreignKey:TraceID;references:TraceID;constraint:false" json:"logs,omitempty"`
CreatedAt time.Time `json:"-"`
UpdatedAt time.Time `json:"-"`
DeletedAt gorm.DeletedAt `gorm:"index" json:"-"`
}
Trace represents a complete distributed trace.
Index strategy: single-column tenant_id is redundant — every tenant-scoped query joins tenant_id with another filter (timestamp, service_name). The leftmost column of a composite index satisfies single-column tenant lookups, so we only declare composites. TraceID uniqueness is scoped to (tenant_id, trace_id): distinct tenants may legitimately ingest identical trace_ids (RAN-21). The old standalone `uniqueIndex` on trace_id is dropped at migration time by dropLegacyTraceIDUniqueIndex.