Documentation
Overview
Package observability provides SQLite-native monitoring components that replace Prometheus, Loki, Consul health checks and Elasticsearch audit.
Each component writes to a shared observability database (separate from the application database to avoid write contention). Call Init() on the shared *sql.DB first, then pass it to the individual constructors.
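Persistence is asynchronous and non-blocking by default: buffer overflow silently drops datapoints rather than applying backpressure to the application. The audit logger is the exception: AuditLogger.Log always inserts synchronously, and AuditLogger.LogAsync falls back to a synchronous insert when its buffer is full.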
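Dropping on overflow comes down to a non-blocking channel send. The sketch below shows only that policy. It assumes the buffer is a Go channel, and the names Datapoint, dropQueue and Enqueue are hypothetical, not part of this package's API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Datapoint is a hypothetical stand-in for one buffered observability record.
type Datapoint struct {
	Name  string
	Value float64
}

// dropQueue never blocks its producer: whatever does not fit in the
// buffer is discarded and counted.
type dropQueue struct {
	ch      chan Datapoint
	dropped atomic.Int64
}

func newDropQueue(size int) *dropQueue {
	return &dropQueue{ch: make(chan Datapoint, size)}
}

// Enqueue performs a non-blocking send; a full buffer means the point is dropped.
func (q *dropQueue) Enqueue(d Datapoint) {
	select {
	case q.ch <- d:
	default:
		q.dropped.Add(1) // no backpressure: the caller returns immediately
	}
}

func main() {
	q := newDropQueue(2) // nothing consumes q.ch in this demo
	for i := 0; i < 5; i++ {
		q.Enqueue(Datapoint{Name: "task_processed_count", Value: float64(i)})
	}
	fmt.Println("buffered:", len(q.ch), "dropped:", q.dropped.Load())
	// buffered: 2 dropped: 3
}
```

Counting drops, as dropQueue does, is a cheap way to make silent loss visible, for example by exporting the counter as a metric of its own.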
Index
- Constants
- func Cleanup(ctx context.Context, db *sql.DB, cfg RetentionConfig) error
- func CleanupHeartbeats(ctx context.Context, db *sql.DB, retentionDays int) (int64, error)
- func Init(db *sql.DB) error
- type AuditEntry
- type AuditFilter
- type AuditLogger
- func (a *AuditLogger) Cleanup(ctx context.Context, retentionDays int) (int64, error)
- func (a *AuditLogger) Close() error
- func (a *AuditLogger) Log(ctx context.Context, entry *AuditEntry) error
- func (a *AuditLogger) LogAsync(entry *AuditEntry)
- func (a *AuditLogger) NewAuditEntry(component, operation string, params interface{}, result interface{}, err error, ...) *AuditEntry
- func (a *AuditLogger) Query(ctx context.Context, f *AuditFilter) ([]*AuditEntry, error)
- type AuditOption
- type BusinessEvent
- type EventLogger
- type EventLoggerOption
- type HeartbeatStatus
- type HeartbeatWriter
- type Metric
- type MetricsManager
- func (mm *MetricsManager) Cleanup(ctx context.Context, retentionDays int) (int64, error)
- func (mm *MetricsManager) Close() error
- func (mm *MetricsManager) Query(metricName string, startTime, endTime *time.Time, limit int) ([]*Metric, error)
- func (mm *MetricsManager) Record(m *Metric)
- func (mm *MetricsManager) RecordSimple(name string, value float64, unit string)
- type RetentionConfig
- type RuntimeMetrics
Constants
const (
	MetricCPUUsagePercent    = "cpu_usage_percent"
	MetricMemoryUsedBytes    = "memory_used_bytes"
	MetricMemoryAllocMB      = "memory_alloc_mb"
	MetricGoroutinesCount    = "goroutines_count"
	MetricGCCount            = "gc_count"
	MetricWorkflowDurationMs = "workflow_duration_ms"
	MetricTaskProcessedCount = "task_processed_count"
)
Standard metric name constants.
const Schema = `` /* 4445-byte string literal not displayed */
Schema contains the complete DDL for the observability tables. Call Init(db) to apply it, or embed this constant in your own schema-management code.
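Variables
This section is empty.
Functions
func Cleanup
func Cleanup(ctx context.Context, db *sql.DB, cfg RetentionConfig) error
Cleanup applies the per-table retention periods in cfg to the observability database.
func CleanupHeartbeats
func CleanupHeartbeats(ctx context.Context, db *sql.DB, retentionDays int) (int64, error)
CleanupHeartbeats deletes heartbeats older than retentionDays.
func Init
func Init(db *sql.DB) error
Init applies Schema to db, creating the observability tables.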
Types
type AuditEntry
type AuditEntry struct {
	EntryID       string
	Timestamp     time.Time
	ComponentName string // e.g. "orchestrator", "chunker", "embedder"
	OperationType string // e.g. "workflow_start", "task_dispatch"
	UserID        string
	SessionID     string
	RequestID     string
	Parameters    string // JSON
	Result        string // JSON
	ErrorCode     string
	ErrorMessage  string
	DurationMs    int64
	Status        string // "success", "error", "timeout", "cancelled"
	Metadata      string // free-form JSON
}
AuditEntry is a single operation record in the audit trail.
type AuditFilter
type AuditFilter struct {
	StartTime     *time.Time
	EndTime       *time.Time
	ComponentName *string
	OperationType *string
	Status        *string
	Limit         int    // default 100
	Offset        int
	OrderBy       string // "timestamp" or "duration_ms"
	OrderDir      string // "ASC" or "DESC"
}
AuditFilter controls query results from the audit log.
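Because every criterion is an optional pointer, a filter naturally maps onto a dynamically built, parameterised SELECT. The sketch below is one plausible translation, not the package's query. The table name audit_log and the snake_case column names are assumptions, as are treating Limit <= 0 as the documented default of 100 and defaulting to timestamp DESC.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// AuditFilter mirrors the exported struct documented above.
type AuditFilter struct {
	StartTime     *time.Time
	EndTime       *time.Time
	ComponentName *string
	OperationType *string
	Status        *string
	Limit         int
	Offset        int
	OrderBy       string
	OrderDir      string
}

// buildAuditQuery is a hypothetical helper: nil fields add no condition.
func buildAuditQuery(f *AuditFilter) (string, []any) {
	var where []string
	var args []any
	add := func(cond string, v any) {
		where = append(where, cond)
		args = append(args, v)
	}
	if f.StartTime != nil {
		add("timestamp >= ?", *f.StartTime)
	}
	if f.EndTime != nil {
		add("timestamp <= ?", *f.EndTime)
	}
	if f.ComponentName != nil {
		add("component_name = ?", *f.ComponentName)
	}
	if f.OperationType != nil {
		add("operation_type = ?", *f.OperationType)
	}
	if f.Status != nil {
		add("status = ?", *f.Status)
	}

	q := "SELECT * FROM audit_log"
	if len(where) > 0 {
		q += " WHERE " + strings.Join(where, " AND ")
	}

	// ORDER BY cannot be bound as a parameter, so only whitelisted values reach the SQL.
	orderBy := "timestamp"
	if f.OrderBy == "duration_ms" {
		orderBy = "duration_ms"
	}
	dir := "DESC"
	if strings.EqualFold(f.OrderDir, "ASC") {
		dir = "ASC"
	}
	limit := f.Limit
	if limit <= 0 {
		limit = 100
	}
	q += fmt.Sprintf(" ORDER BY %s %s LIMIT ? OFFSET ?", orderBy, dir)
	args = append(args, limit, f.Offset)
	return q, args
}

func main() {
	status := "error"
	q, args := buildAuditQuery(&AuditFilter{Status: &status, OrderDir: "asc"})
	fmt.Println(q)    // SELECT * FROM audit_log WHERE status = ? ORDER BY timestamp ASC LIMIT ? OFFSET ?
	fmt.Println(args) // [error 100 0]
}
```

Whitelisting OrderBy and OrderDir matters because they are the only caller-supplied strings that end up in the SQL text itself.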
type AuditLogger
type AuditLogger struct {
	// contains filtered or unexported fields
}
AuditLogger persists operation-level audit entries asynchronously.
func NewAuditLogger
func NewAuditLogger(db *sql.DB, bufferSize int, opts ...AuditOption) *AuditLogger
NewAuditLogger creates an async audit logger. Recommended bufferSize: 1000.
func (*AuditLogger) Close
func (a *AuditLogger) Close() error
Close drains the buffer and stops the flush goroutine.
func (*AuditLogger) Log
func (a *AuditLogger) Log(ctx context.Context, entry *AuditEntry) error
Log inserts an audit entry synchronously.
func (*AuditLogger) LogAsync
func (a *AuditLogger) LogAsync(entry *AuditEntry)
LogAsync queues an entry for async persistence. If the buffer is full, it falls back to a synchronous insert, so the entry is not dropped but the caller blocks until that insert completes.
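Unlike the drop-on-overflow policy used elsewhere, this fallback trades latency for completeness. A minimal sketch of the pattern follows. The names Entry, fallbackLogger and insertSync are hypothetical, and insertSync only appends to a slice where a real logger would execute an INSERT.

```go
package main

import (
	"fmt"
	"sync"
)

// Entry is a hypothetical stand-in for *AuditEntry.
type Entry struct{ ID string }

// fallbackLogger sketches "queue if there is room, otherwise write inline".
type fallbackLogger struct {
	queue   chan *Entry
	mu      sync.Mutex
	written []*Entry // entries stored by the synchronous path
}

// insertSync stands in for a blocking INSERT into the audit table.
func (l *fallbackLogger) insertSync(e *Entry) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.written = append(l.written, e)
}

// LogAsync does not drop entries: a full queue costs the caller one synchronous write.
func (l *fallbackLogger) LogAsync(e *Entry) {
	select {
	case l.queue <- e:
	default:
		l.insertSync(e)
	}
}

func main() {
	l := &fallbackLogger{queue: make(chan *Entry, 1)} // no consumer in this demo
	for _, id := range []string{"a", "b", "c"} {
		l.LogAsync(&Entry{ID: id})
	}
	fmt.Println("queued:", len(l.queue), "written inline:", len(l.written))
	// queued: 1 written inline: 2
}
```

The trade-off is deliberate for an audit trail: losing a metric sample is usually acceptable, losing an audit record usually is not.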
func (*AuditLogger) NewAuditEntry
func (a *AuditLogger) NewAuditEntry(component, operation string, params interface{}, result interface{}, err error, duration time.Duration) *AuditEntry
NewAuditEntry is a convenience factory that builds an AuditEntry from operation parameters, result and error. Params and result are marshalled to JSON.
func (*AuditLogger) Query
func (a *AuditLogger) Query(ctx context.Context, f *AuditFilter) ([]*AuditEntry, error)
Query retrieves audit entries matching the given filter.
type AuditOption
type AuditOption func(*AuditLogger)
AuditOption configures an AuditLogger.
func WithAuditIDGenerator
func WithAuditIDGenerator(gen idgen.Generator) AuditOption
WithAuditIDGenerator sets a custom ID generator for audit entry IDs.
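AuditOption and EventLoggerOption follow Go's functional-options pattern. A typical use is injecting a deterministic ID generator in tests. The sketch below reproduces the pattern on hypothetical types. In particular, the Generator interface and its NewID method are assumptions, since the method set of idgen.Generator is not documented here.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// Generator is a hypothetical stand-in for idgen.Generator.
type Generator interface {
	NewID() string
}

// counterGen yields predictable IDs, which keeps test assertions simple.
type counterGen struct{ n atomic.Int64 }

func (g *counterGen) NewID() string {
	return "audit-" + strconv.FormatInt(g.n.Add(1), 10)
}

// defaultGen is a placeholder; the package's real default generator is not documented.
type defaultGen struct{}

func (defaultGen) NewID() string { return "default" }

// auditLogger and auditOption mirror the AuditLogger/AuditOption shape.
type auditLogger struct {
	ids Generator
}

type auditOption func(*auditLogger)

func withIDGenerator(gen Generator) auditOption {
	return func(a *auditLogger) { a.ids = gen }
}

// newAuditLogger sets defaults first, then lets each option override them.
func newAuditLogger(opts ...auditOption) *auditLogger {
	a := &auditLogger{ids: defaultGen{}}
	for _, opt := range opts {
		opt(a)
	}
	return a
}

func main() {
	a := newAuditLogger(withIDGenerator(&counterGen{}))
	fmt.Println(a.ids.NewID(), a.ids.NewID()) // audit-1 audit-2
}
```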
type BusinessEvent
type BusinessEvent struct {
	EventType   string
	ServiceName string
	EntityType  string
	EntityID    string
	UserID      string
	Action      string
	Details     string // optional JSON
	Success     bool
}
BusinessEvent represents a domain-level event to record.
type EventLogger
type EventLogger struct {
	// contains filtered or unexported fields
}
EventLogger writes business events and manages retention cleanup.
func NewEventLogger
func NewEventLogger(db *sql.DB, opts ...EventLoggerOption) *EventLogger
NewEventLogger creates a logger backed by the given observability database.
func (*EventLogger) LogEvent
func (l *EventLogger) LogEvent(ctx context.Context, event BusinessEvent)
LogEvent records a business event. It is non-blocking and returns no error: failures are logged via slog instead of being propagated, so a failing observability store never blocks the application.
func (*EventLogger) LogHeartbeat
func (l *EventLogger) LogHeartbeat(ctx context.Context, workerName string, workerPID int, machineName string)
LogHeartbeat records a lightweight heartbeat row. It is meant for services that prefer the simpler Logger interface to a dedicated HeartbeatWriter.
type EventLoggerOption
type EventLoggerOption func(*EventLogger)
EventLoggerOption configures an EventLogger.
func WithEventIDGenerator
func WithEventIDGenerator(gen idgen.Generator) EventLoggerOption
WithEventIDGenerator sets a custom ID generator for event IDs.
type HeartbeatStatus
type HeartbeatStatus struct {
	WorkerName      string         `json:"worker_name"`
	Hostname        string         `json:"hostname"`
	PID             int            `json:"pid"`
	Timestamp       time.Time      `json:"timestamp"`
	GoroutinesCount int            `json:"goroutines_count"`
	MemoryAllocMB   float64        `json:"memory_alloc_mb"`
	MemorySysMB     float64        `json:"memory_sys_mb"`
	GCCount         int            `json:"gc_count"`
	Alive           bool           `json:"alive"`                 // true if last beat is within staleness threshold
	StaleSince      *time.Duration `json:"stale_since,omitempty"` // how long past the threshold
}
HeartbeatStatus is the latest heartbeat for a worker, enriched with a staleness check so callers don't have to compute it themselves.
func LatestHeartbeat
func LatestHeartbeat(ctx context.Context, db *sql.DB, workerName string, stalenessThreshold time.Duration) (*HeartbeatStatus, error)
LatestHeartbeat returns the most recent heartbeat for the given worker. stalenessThreshold controls the alive/stale boundary (typically 3× the heartbeat interval). Returns nil, nil if no heartbeat has been recorded yet.
type HeartbeatWriter
type HeartbeatWriter struct {
	// contains filtered or unexported fields
}
HeartbeatWriter writes periodic liveness probes to the worker_heartbeats table.
func NewHeartbeatWriter
NewHeartbeatWriter creates a writer. Recommended interval: 15s.
func (*HeartbeatWriter) Start
func (hw *HeartbeatWriter) Start(ctx context.Context)
Start launches the heartbeat goroutine. It writes one heartbeat immediately, then repeats at the configured interval until Stop or context cancellation.
func (*HeartbeatWriter) Stop
func (hw *HeartbeatWriter) Stop()
Stop signals the heartbeat goroutine to exit and waits for it.
func (*HeartbeatWriter) WriteHeartbeat
func (hw *HeartbeatWriter) WriteHeartbeat() error
WriteHeartbeat writes a single heartbeat row with current runtime metrics.
type Metric
type Metric struct {
	Name      string            // e.g. "cpu_usage_percent", "workflow_duration_ms"
	Timestamp time.Time
	Value     float64
	Labels    map[string]string // optional key/value pairs
	Unit      string            // "percent", "bytes", "milliseconds", "count"
}
Metric is a single timeseries datapoint.
type MetricsManager
type MetricsManager struct {
	// contains filtered or unexported fields
}
MetricsManager buffers metrics and flushes them to SQLite in batches.
func NewMetricsManager
NewMetricsManager creates a manager that flushes metrics in batches. Recommended defaults: bufferSize=100, flushInterval=5s.
func (*MetricsManager) Cleanup
func (mm *MetricsManager) Cleanup(ctx context.Context, retentionDays int) (int64, error)
Cleanup deletes metrics older than retentionDays and returns the number of rows removed.
func (*MetricsManager) Close
func (mm *MetricsManager) Close() error
Close flushes remaining metrics and stops the background goroutine.
func (*MetricsManager) Query
func (mm *MetricsManager) Query(metricName string, startTime, endTime *time.Time, limit int) ([]*Metric, error)
Query retrieves metrics filtered by name, time range and limit. Pass an empty metricName to match all metrics; a nil startTime or endTime leaves that end of the range unbounded.
func (*MetricsManager) Record
func (mm *MetricsManager) Record(m *Metric)
Record queues a metric for async persistence. Non-blocking.
func (*MetricsManager) RecordSimple
func (mm *MetricsManager) RecordSimple(name string, value float64, unit string)
RecordSimple is a convenience helper for metrics without labels.
type RetentionConfig
type RetentionConfig struct {
	HTTPLogsDays   int
	EventLogsDays  int
	HeartbeatsDays int
	RunVacuumAfter bool
}
RetentionConfig specifies per-table retention in days. Zero means no cleanup.
type RuntimeMetrics
type RuntimeMetrics struct {
	GoroutinesCount int
	MemoryAllocMB   float64
	MemorySysMB     float64
	GCCount         uint32
}
RuntimeMetrics captures Go process health at a point in time.
func CollectRuntimeMetrics
func CollectRuntimeMetrics() RuntimeMetrics
CollectRuntimeMetrics reads current Go runtime stats (~10µs overhead).
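The standard library exposes everything RuntimeMetrics holds through runtime.NumGoroutine and runtime.ReadMemStats. The sketch below is one plausible implementation, not necessarily the package's. It assumes MemoryAllocMB maps to MemStats.Alloc and MemorySysMB to MemStats.Sys, and it divides by 1024 × 1024, so the "MB" values are strictly mebibytes. ReadMemStats briefly stops the world, which is why it is worth calling once per heartbeat rather than on a hot path.

```go
package main

import (
	"fmt"
	"runtime"
)

type RuntimeMetrics struct {
	GoroutinesCount int
	MemoryAllocMB   float64
	MemorySysMB     float64
	GCCount         uint32
}

const bytesPerMB = 1024 * 1024

// collectRuntimeMetrics snapshots goroutine count, heap usage and GC cycles.
func collectRuntimeMetrics() RuntimeMetrics {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	return RuntimeMetrics{
		GoroutinesCount: runtime.NumGoroutine(),
		MemoryAllocMB:   float64(ms.Alloc) / bytesPerMB, // live heap objects
		MemorySysMB:     float64(ms.Sys) / bytesPerMB,   // total obtained from the OS
		GCCount:         ms.NumGC,                       // completed GC cycles
	}
}

func main() {
	m := collectRuntimeMetrics()
	fmt.Printf("goroutines=%d alloc=%.1fMB sys=%.1fMB gc=%d\n",
		m.GoroutinesCount, m.MemoryAllocMB, m.MemorySysMB, m.GCCount)
}
```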