observability

package
v0.1.0
Published: Mar 1, 2026 License: MIT Imports: 11 Imported by: 0

README

observability — SQLite-native monitoring

observability replaces Prometheus, Grafana, and ELK with SQLite tables. Writers are designed not to add latency to the application: the audit and metrics writers buffer asynchronously, and even the synchronous event logger never propagates storage errors to the caller.

Components

Component       | Table                | Pattern
--------------- | -------------------- | -------------------------------------
AuditLogger     | audit_log            | Buffered channel + batch flush
MetricsManager  | metrics_timeseries   | Buffered batch (100 entries / 5 s)
EventLogger     | business_event_logs  | Synchronous single-insert
HeartbeatWriter | worker_heartbeats    | Periodic ticker with runtime metrics

Quick start

observability.Init(db)

audit := observability.NewAuditLogger(db, 1000,
    observability.WithAuditIDGenerator(idgen.Prefixed("aud_", idgen.Default)),
)
defer audit.Close()

metrics := observability.NewMetricsManager(db, 100, 5*time.Second)
defer metrics.Close()

hb := observability.NewHeartbeatWriter(db, "worker-1", 15*time.Second)
hb.Start(ctx)
defer hb.Stop()

Heartbeat with runtime metrics

Each heartbeat row captures Go runtime stats alongside liveness:

// Automatic: goroutine count, memory alloc/sys, GC count.
status, _ := observability.LatestHeartbeat(ctx, db, "worker-1", 30*time.Second)
// status.Alive, status.StaleSince

Retention cleanup

audit.Cleanup(ctx, 90)    // delete audit entries older than 90 days
metrics.Cleanup(ctx, 30)  // delete metrics older than 30 days
observability.CleanupHeartbeats(ctx, db, 7)  // heartbeats: 7 days

Schema

8 tables: worker_heartbeats, metrics_timeseries, metrics_metadata, audit_log, business_event_logs, system_alerts, http_request_logs, _observability_metadata.

Exported API

Symbol                                    | Description
----------------------------------------- | -------------------------------------
Init(db)                                  | Create all tables
AuditLogger                               | Async audit trail (buffered channel)
MetricsManager                            | Buffered time-series writer
EventLogger                               | Business event logger
HeartbeatWriter                           | Periodic liveness + runtime metrics
LatestHeartbeat(ctx, db, name, threshold) | Query worker health

Documentation

Overview

Package observability provides SQLite-native monitoring components that replace Prometheus, Loki, Consul health checks, and Elasticsearch audit logging.

Each component writes to a shared observability database (separate from the application database to avoid write contention). Call Init() on the shared *sql.DB first, then pass it to the individual constructors.

Persistence is designed to avoid applying backpressure to the application: the metrics writer silently drops datapoints when its buffer overflows, while the audit logger's LogAsync falls back to a synchronous insert when its buffer is full.

Constants

const (
	MetricCPUUsagePercent    = "cpu_usage_percent"
	MetricMemoryUsedBytes    = "memory_used_bytes"
	MetricMemoryAllocMB      = "memory_alloc_mb"
	MetricGoroutinesCount    = "goroutines_count"
	MetricGCCount            = "gc_count"
	MetricWorkflowDurationMs = "workflow_duration_ms"
	MetricTaskProcessedCount = "task_processed_count"
)

Standard metric name constants.

const Schema = `` /* 4445-byte string literal not displayed */

Schema contains the complete DDL for the observability tables. Call Init(db) to apply it, or use this constant to embed in your own schema management.

Variables

This section is empty.

Functions

func Cleanup

func Cleanup(ctx context.Context, db *sql.DB, cfg RetentionConfig) error

Cleanup deletes records exceeding the retention thresholds.

func CleanupHeartbeats

func CleanupHeartbeats(ctx context.Context, db *sql.DB, retentionDays int) (int64, error)

CleanupHeartbeats deletes heartbeats older than retentionDays.

func Init

func Init(db *sql.DB) error

Init applies the observability schema to the given database.

Types

type AuditEntry

type AuditEntry struct {
	EntryID       string
	Timestamp     time.Time
	ComponentName string // e.g. "orchestrator", "chunker", "embedder"
	OperationType string // e.g. "workflow_start", "task_dispatch"

	UserID    string
	SessionID string
	RequestID string

	Parameters   string // JSON
	Result       string // JSON
	ErrorCode    string
	ErrorMessage string
	DurationMs   int64

	Status   string // "success", "error", "timeout", "cancelled"
	Metadata string // free-form JSON
}

AuditEntry is a single operation record in the audit trail.

type AuditFilter

type AuditFilter struct {
	StartTime     *time.Time
	EndTime       *time.Time
	ComponentName *string
	OperationType *string
	Status        *string
	Limit         int // default 100
	Offset        int
	OrderBy       string // "timestamp" or "duration_ms"
	OrderDir      string // "ASC" or "DESC"
}

AuditFilter controls query results from the audit log.

type AuditLogger

type AuditLogger struct {
	// contains filtered or unexported fields
}

AuditLogger persists operation-level audit entries asynchronously.

func NewAuditLogger

func NewAuditLogger(db *sql.DB, bufferSize int, opts ...AuditOption) *AuditLogger

NewAuditLogger creates an async audit logger. Recommended bufferSize: 1000.

func (*AuditLogger) Cleanup

func (a *AuditLogger) Cleanup(ctx context.Context, retentionDays int) (int64, error)

Cleanup deletes audit entries older than retentionDays.

func (*AuditLogger) Close

func (a *AuditLogger) Close() error

Close drains the buffer and stops the flush goroutine.

func (*AuditLogger) Log

func (a *AuditLogger) Log(ctx context.Context, entry *AuditEntry) error

Log inserts an audit entry synchronously.

func (*AuditLogger) LogAsync

func (a *AuditLogger) LogAsync(entry *AuditEntry)

LogAsync queues an entry for async persistence. Falls back to synchronous insert if the buffer is full.

func (*AuditLogger) NewAuditEntry

func (a *AuditLogger) NewAuditEntry(component, operation string, params interface{}, result interface{}, err error, duration time.Duration) *AuditEntry

NewAuditEntry is a convenience factory that builds an AuditEntry from operation parameters, result and error. Params and result are marshalled to JSON.

func (*AuditLogger) Query

func (a *AuditLogger) Query(ctx context.Context, f *AuditFilter) ([]*AuditEntry, error)

Query retrieves audit entries matching the given filter.

type AuditOption

type AuditOption func(*AuditLogger)

AuditOption configures an AuditLogger.

func WithAuditIDGenerator

func WithAuditIDGenerator(gen idgen.Generator) AuditOption

WithAuditIDGenerator sets a custom ID generator for audit entry IDs.

type BusinessEvent

type BusinessEvent struct {
	EventType   string
	ServiceName string
	EntityType  string
	EntityID    string
	UserID      string
	Action      string
	Details     string // optional JSON
	Success     bool
}

BusinessEvent represents a domain-level event to record.

type EventLogger

type EventLogger struct {
	// contains filtered or unexported fields
}

EventLogger writes business events and manages retention cleanup.

func NewEventLogger

func NewEventLogger(db *sql.DB, opts ...EventLoggerOption) *EventLogger

NewEventLogger creates a logger backed by the given observability database.

func (*EventLogger) LogEvent

func (l *EventLogger) LogEvent(ctx context.Context, event BusinessEvent)

LogEvent records a business event. Non-blocking: errors are logged via slog but do not propagate, so a failing observability store never blocks the app.

func (*EventLogger) LogHeartbeat

func (l *EventLogger) LogHeartbeat(ctx context.Context, workerName string, workerPID int, machineName string)

LogHeartbeat records a lightweight heartbeat row (for services that prefer the simpler Logger interface instead of HeartbeatWriter).

type EventLoggerOption

type EventLoggerOption func(*EventLogger)

EventLoggerOption configures an EventLogger.

func WithEventIDGenerator

func WithEventIDGenerator(gen idgen.Generator) EventLoggerOption

WithEventIDGenerator sets a custom ID generator for event IDs.

type HeartbeatStatus

type HeartbeatStatus struct {
	WorkerName      string         `json:"worker_name"`
	Hostname        string         `json:"hostname"`
	PID             int            `json:"pid"`
	Timestamp       time.Time      `json:"timestamp"`
	GoroutinesCount int            `json:"goroutines_count"`
	MemoryAllocMB   float64        `json:"memory_alloc_mb"`
	MemorySysMB     float64        `json:"memory_sys_mb"`
	GCCount         int            `json:"gc_count"`
	Alive           bool           `json:"alive"`                 // true if last beat is within staleness threshold
	StaleSince      *time.Duration `json:"stale_since,omitempty"` // how long past the threshold
}

HeartbeatStatus is the latest heartbeat for a worker, enriched with a staleness check so callers don't have to compute it themselves.

func LatestHeartbeat

func LatestHeartbeat(ctx context.Context, db *sql.DB, workerName string, stalenessThreshold time.Duration) (*HeartbeatStatus, error)

LatestHeartbeat returns the most recent heartbeat for the given worker. stalenessThreshold controls the alive/stale boundary (typically 3× the heartbeat interval). Returns nil, nil if no heartbeat has been recorded yet.

type HeartbeatWriter

type HeartbeatWriter struct {
	// contains filtered or unexported fields
}

HeartbeatWriter writes periodic liveness probes to the worker_heartbeats table.

func NewHeartbeatWriter

func NewHeartbeatWriter(db *sql.DB, workerName string, interval time.Duration) *HeartbeatWriter

NewHeartbeatWriter creates a writer. Recommended interval: 15s.

func (*HeartbeatWriter) Start

func (hw *HeartbeatWriter) Start(ctx context.Context)

Start launches the heartbeat goroutine. It writes one heartbeat immediately, then repeats at the configured interval until Stop or context cancellation.

func (*HeartbeatWriter) Stop

func (hw *HeartbeatWriter) Stop()

Stop signals the heartbeat goroutine to exit and waits for it.

func (*HeartbeatWriter) WriteHeartbeat

func (hw *HeartbeatWriter) WriteHeartbeat() error

WriteHeartbeat writes a single heartbeat row with current runtime metrics.

type Metric

type Metric struct {
	Name      string // e.g. "cpu_usage_percent", "workflow_duration_ms"
	Timestamp time.Time
	Value     float64
	Labels    map[string]string // optional key/value pairs
	Unit      string            // "percent", "bytes", "milliseconds", "count"
}

Metric is a single timeseries datapoint.

type MetricsManager

type MetricsManager struct {
	// contains filtered or unexported fields
}

MetricsManager buffers metrics and flushes them to SQLite in batches.

func NewMetricsManager

func NewMetricsManager(db *sql.DB, bufferSize int, flushInterval time.Duration) *MetricsManager

NewMetricsManager creates a manager that flushes metrics in batches. Recommended defaults: bufferSize=100, flushInterval=5s.

func (*MetricsManager) Cleanup

func (mm *MetricsManager) Cleanup(ctx context.Context, retentionDays int) (int64, error)

Cleanup deletes metrics older than retentionDays and returns the count removed.

func (*MetricsManager) Close

func (mm *MetricsManager) Close() error

Close flushes remaining metrics and stops the background goroutine.

func (*MetricsManager) Query

func (mm *MetricsManager) Query(metricName string, startTime, endTime *time.Time, limit int) ([]*Metric, error)

Query retrieves metrics filtered by name, time range and limit. Pass empty metricName for all metrics. Nil time pointers mean unbounded.

func (*MetricsManager) Record

func (mm *MetricsManager) Record(m *Metric)

Record queues a metric for async persistence. Non-blocking.

func (*MetricsManager) RecordSimple

func (mm *MetricsManager) RecordSimple(name string, value float64, unit string)

RecordSimple is a convenience helper for metrics without labels.

type RetentionConfig

type RetentionConfig struct {
	HTTPLogsDays   int
	EventLogsDays  int
	HeartbeatsDays int
	RunVacuumAfter bool
}

RetentionConfig specifies per-table retention in days. Zero means no cleanup.

type RuntimeMetrics

type RuntimeMetrics struct {
	GoroutinesCount int
	MemoryAllocMB   float64
	MemorySysMB     float64
	GCCount         uint32
}

RuntimeMetrics captures Go process health at a point in time.

func CollectRuntimeMetrics

func CollectRuntimeMetrics() RuntimeMetrics

CollectRuntimeMetrics reads current Go runtime stats (~10µs overhead).
