package watcher

Version: v0.19.4

Warning: This package is not in the latest version of its module.

Published: Apr 18, 2026 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Overview

Package watcher provides the alerting engine.

Watch conditions are expressed as a JSON tree that an AI agent can generate to describe complex alert rules. Each leaf node is a typed condition (threshold, relative, delta, or count), and nodes can be combined with the all (AND), any (OR), and not (NOT) operators.

Examples:

Simple threshold:
  {"type":"threshold","metric":"error_rate","op":"gt","value":0.05,"service":"api"}

Compound AND:
  {"all":[
    {"type":"threshold","metric":"error_rate","op":"gt","value":0.05,"service":"api"},
    {"type":"threshold","metric":"response_time","op":"gt","value":500,"service":"api"}
  ]}

Relative to baseline:
  {"type":"relative","metric":"error_rate","op":"gt","baseline_multiple":2.0,"service":"api"}

Rate of change:
  {"type":"delta","metric":"error_rate","compare_window":"1h","op":"gt","change_pct":50}

Count distinct:
  {"type":"count","query":"level:error","field":"error_fingerprint","distinct":true,"op":"gt","value":10,"window":"1h"}
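To show how the JSON examples map onto Go struct fields through their json tags, here is a minimal decoding sketch. The local condition type is hypothetical: it mirrors only a subset of this package's Condition fields and uses plain strings for Metric and Op instead of store.WatchMetric and store.WatchOperator, so it compiles without the store package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// condition is a hypothetical, trimmed-down mirror of watcher.Condition.
// Metric and Op are plain strings here (assumption) rather than the
// store.WatchMetric and store.WatchOperator types.
type condition struct {
	All              []*condition `json:"all,omitempty"`
	Any              []*condition `json:"any,omitempty"`
	Not              *condition   `json:"not,omitempty"`
	Type             string       `json:"type,omitempty"`
	Metric           string       `json:"metric,omitempty"`
	Op               string       `json:"op,omitempty"`
	Service          string       `json:"service,omitempty"`
	Value            float64      `json:"value,omitempty"`
	BaselineMultiple float64      `json:"baseline_multiple,omitempty"`
}

// decode unmarshals one JSON condition tree into the mirror struct.
// Fields not declared above (e.g. compare_window) are silently ignored,
// which is encoding/json's default behaviour.
func decode(raw string) (*condition, error) {
	var c condition
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return nil, fmt.Errorf("decode condition: %w", err)
	}
	return &c, nil
}
```

Decoding the compound AND example yields a root with an empty Type and two leaf children in All.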

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CaptureBaseline

func CaptureBaseline(ctx context.Context, logStore store.LogStore, metrics *WatchMetrics, w *store.Watch) (*store.WatchBaseline, error)

CaptureBaseline takes a snapshot of current metrics for a watch.

func NotifyAllWatchAlert

func NotifyAllWatchAlert(ctx context.Context, notifiers []WatchAlertNotifier, alert *store.WatchAlert, watch *store.Watch)

NotifyAllWatchAlert sends a watch alert to all notifiers concurrently. Errors are logged rather than returned, so a failing notifier does not block the others.

Types

type Condition

type Condition struct {
	// Combinators — at most one should be set.
	All []*Condition `json:"all,omitempty"`
	Any []*Condition `json:"any,omitempty"`
	Not *Condition   `json:"not,omitempty"`

	// Leaf condition fields.
	Type string `json:"type,omitempty"` // "threshold", "relative", "delta", "count"

	// Common fields (used by threshold, relative, delta).
	Metric   store.WatchMetric   `json:"metric,omitempty"`
	Op       store.WatchOperator `json:"op,omitempty"`
	Service  string              `json:"service,omitempty"`
	Endpoint string              `json:"endpoint,omitempty"`

	// threshold: metric <op> value
	Value float64 `json:"value,omitempty"`

	// relative: metric <op> baseline * baseline_multiple
	BaselineMultiple float64 `json:"baseline_multiple,omitempty"`

	// delta: metric changed by <op> change_pct% vs compare_window ago
	ChangePct     float64 `json:"change_pct,omitempty"`
	CompareWindow string  `json:"compare_window,omitempty"` // e.g. "1h", "30m"

	// count: count (distinct) <field> where <query> <op> value
	Field    string `json:"field,omitempty"`    // e.g. "error_fingerprint", "user_id"
	Distinct bool   `json:"distinct,omitempty"` // COUNT DISTINCT vs COUNT
	Query    string `json:"query,omitempty"`    // log search query filter (e.g. "level:error")
	Window   string `json:"window,omitempty"`   // time window for count
}

Condition is the JSON-serializable tree node for watch conditions. It is either a leaf (Type is set) or a combinator (All/Any/Not is set).
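The leaf comments in the Condition struct define each condition as a comparison. The sketch below spells that arithmetic out for threshold, relative, and delta leaves. Only the "gt" operator appears in this page's examples; "gte", "lt", and "lte" are assumed spellings, and computing delta as a signed percentage change against the earlier value (with a zero earlier value treated as an error) is also an assumption.

```go
package main

import (
	"fmt"
	"math"
)

// compare applies a comparison operator. "gt" is documented by example;
// the other spellings are assumptions for illustration.
func compare(op string, lhs, rhs float64) (bool, error) {
	switch op {
	case "gt":
		return lhs > rhs, nil
	case "gte":
		return lhs >= rhs, nil
	case "lt":
		return lhs < rhs, nil
	case "lte":
		return lhs <= rhs, nil
	}
	return false, fmt.Errorf("unknown operator %q", op)
}

// threshold: metric <op> value
func threshold(op string, current, value float64) (bool, error) {
	return compare(op, current, value)
}

// relative: metric <op> baseline * baseline_multiple
func relative(op string, current, baseline, multiple float64) (bool, error) {
	return compare(op, current, baseline*multiple)
}

// delta: percentage change versus the value compare_window ago, compared
// against change_pct. Signed change and zero handling are assumptions.
func delta(op string, current, previous, changePct float64) (bool, error) {
	if previous == 0 {
		return false, fmt.Errorf("delta undefined: previous value is zero")
	}
	pct := (current - previous) / math.Abs(previous) * 100
	return compare(op, pct, changePct)
}
```

For example, with the rate-of-change example's change_pct of 50, a metric that moves from 100 to 160 is a 60% rise and breaches "gt", while a move to exactly 150 does not.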

func ParseCondition

func ParseCondition(raw json.RawMessage) (*Condition, error)

ParseCondition parses a JSON condition tree.

type ConditionResult

type ConditionResult struct {
	Breached bool    `json:"breached"`
	Summary  string  `json:"summary"`
	Value    float64 `json:"value,omitempty"`
}

ConditionResult holds the evaluation result of a single condition node.

func EvaluateCondition

func EvaluateCondition(ctx context.Context, c *Condition, metrics *WatchMetrics, baseline *store.WatchBaseline, environment string, checkWindow time.Duration) (*ConditionResult, error)

EvaluateCondition recursively evaluates a condition tree. environment scopes every underlying metric query; callers should pass the owning watch's Environment so metrics are computed against that env's traffic only.
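The recursion EvaluateCondition performs can be illustrated with a reduced tree: all breaches only when every child breaches, any when at least one does, and not inverts its child. In this sketch the cond and result types are hypothetical mirrors of Condition and ConditionResult, leaves are "gt" thresholds only, and the lookup function stands in for the environment- and window-scoped metric queries the real evaluator runs. The summary format is invented for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// cond is a hypothetical mirror of watcher.Condition, reduced to "gt"
// threshold leaves so the recursion is easy to follow.
type cond struct {
	All    []*cond
	Any    []*cond
	Not    *cond
	Metric string
	Value  float64
}

// result mirrors the Breached and Summary fields of ConditionResult.
type result struct {
	Breached bool
	Summary  string
}

// eval walks the tree depth-first. lookup returns the current metric value.
func eval(c *cond, lookup func(metric string) float64) result {
	switch {
	case len(c.All) > 0:
		return combine(c.All, lookup, "AND", true)
	case len(c.Any) > 0:
		return combine(c.Any, lookup, "OR", false)
	case c.Not != nil:
		r := eval(c.Not, lookup)
		return result{Breached: !r.Breached, Summary: "NOT (" + r.Summary + ")"}
	}
	v := lookup(c.Metric)
	return result{
		Breached: v > c.Value,
		Summary:  fmt.Sprintf("%s=%g (threshold %g)", c.Metric, v, c.Value),
	}
}

// combine folds child results with AND (requireAll) or OR semantics.
func combine(children []*cond, lookup func(string) float64, joiner string, requireAll bool) result {
	breached := requireAll
	parts := make([]string, 0, len(children))
	for _, ch := range children {
		r := eval(ch, lookup)
		parts = append(parts, r.Summary)
		if requireAll {
			breached = breached && r.Breached
		} else {
			breached = breached || r.Breached
		}
	}
	return result{Breached: breached, Summary: "(" + strings.Join(parts, " "+joiner+" ") + ")"}
}
```

With error_rate at 0.07 and response_time at 300, the compound AND example from the overview does not breach, while the same two leaves under any do.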

type WatchAlertNotifier

type WatchAlertNotifier interface {
	// NotifyWatchAlert sends a notification for a watch alert.
	// Implementations must be safe for concurrent use.
	NotifyWatchAlert(ctx context.Context, alert *store.WatchAlert, watch *store.Watch) error
}

WatchAlertNotifier sends notifications for watch alerts.

type WatchEvalResult

type WatchEvalResult struct {
	Value    float64
	Breached bool
	HasAlert bool
	Summary  string
}

WatchEvalResult holds the result of evaluating a single watch.

type WatchEvaluator

type WatchEvaluator struct {
	// contains filtered or unexported fields
}

WatchEvaluator evaluates watches against their metric thresholds.

func NewWatchEvaluator

func NewWatchEvaluator(metrics *WatchMetrics, watchStore store.WatchStore) *WatchEvaluator

NewWatchEvaluator creates a new WatchEvaluator.

func (*WatchEvaluator) Evaluate

func (e *WatchEvaluator) Evaluate(ctx context.Context, w *store.Watch) (*WatchEvalResult, error)

Evaluate measures the watch's conditions and determines if an alert should fire. If the watch has a JSON conditions tree, it uses the tree evaluator. Otherwise, it falls back to the flat metric/operator/threshold fields.

type WatchEvidenceBuilder

type WatchEvidenceBuilder struct {
	// contains filtered or unexported fields
}

WatchEvidenceBuilder collects evidence when a watch alert fires.

func NewWatchEvidenceBuilder

func NewWatchEvidenceBuilder(logStore store.LogStore, metrics *WatchMetrics) *WatchEvidenceBuilder

NewWatchEvidenceBuilder creates a new evidence builder.

func (*WatchEvidenceBuilder) Build

Build collects all evidence for a triggered watch.

type WatchLogNotifier

type WatchLogNotifier struct{}

WatchLogNotifier logs watch alerts to slog (always-on fallback).

func (*WatchLogNotifier) NotifyWatchAlert

func (n *WatchLogNotifier) NotifyWatchAlert(_ context.Context, alert *store.WatchAlert, watch *store.Watch) error

type WatchMetrics

type WatchMetrics struct {
	// contains filtered or unexported fields
}

WatchMetrics computes metric values from existing LogStore methods.

func NewWatchMetrics

func NewWatchMetrics(logStore store.LogStore) *WatchMetrics

NewWatchMetrics creates a new WatchMetrics helper.

func (*WatchMetrics) Measure

func (m *WatchMetrics) Measure(ctx context.Context, metric store.WatchMetric, service, endpoint, environment string, window time.Duration) (float64, error)

Measure computes the given metric for the specified service, endpoint, and environment over the time window. environment scopes log-store queries to a single environment; pass "" to match every environment (used only in tests and legacy paths).

type WatchScheduler

type WatchScheduler struct {
	// contains filtered or unexported fields
}

WatchScheduler polls for due watches and evaluates them.

func NewWatchScheduler

func NewWatchScheduler(opts WatchSchedulerOpts) *WatchScheduler

NewWatchScheduler creates a new watch scheduler.

func (*WatchScheduler) Start

func (s *WatchScheduler) Start(ctx context.Context)

Start begins the background polling loop and returns immediately.

func (*WatchScheduler) Stop

func (s *WatchScheduler) Stop()

Stop gracefully shuts down the scheduler.

type WatchSchedulerOpts

type WatchSchedulerOpts struct {
	WatchStore      store.WatchStore
	LogStore        store.LogStore
	Evaluator       *WatchEvaluator
	EvidenceBuilder *WatchEvidenceBuilder
	SessionManager  *WatchSessionManager
	Notifiers       []WatchAlertNotifier
	PollInterval    time.Duration
}

WatchSchedulerOpts configures the watch scheduler.

type WatchSessionManager

type WatchSessionManager struct {
	// contains filtered or unexported fields
}

WatchSessionManager handles auto-resolve and cleanup of watches.

func NewWatchSessionManager

func NewWatchSessionManager(watchStore store.WatchStore, metrics *WatchMetrics) *WatchSessionManager

NewWatchSessionManager creates a new session manager.

func (*WatchSessionManager) CheckAutoResolve

func (m *WatchSessionManager) CheckAutoResolve(ctx context.Context, w *store.Watch) error

CheckAutoResolve checks if a triggered watch should auto-resolve because the metric has returned to within 10% of the baseline value.
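The 10% auto-resolve rule reduces to a tolerance-band check. In this sketch the withinTolerance function is hypothetical; measuring the band symmetrically around the baseline (so recovery from either direction counts) and treating a zero baseline as resolved only when the current value is also zero are assumptions not stated by the package.

```go
package main

import "math"

// withinTolerance reports whether current lies within tol (a fraction,
// e.g. 0.10 for 10%) of baseline. The symmetric band and the zero-baseline
// rule are assumptions of this sketch.
func withinTolerance(current, baseline, tol float64) bool {
	if baseline == 0 {
		return current == 0
	}
	return math.Abs(current-baseline) <= tol*math.Abs(baseline)
}
```

With a baseline of 100, values from 90 through 110 would resolve the watch, while 120 would keep it triggered.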

func (*WatchSessionManager) Cleanup

func (m *WatchSessionManager) Cleanup(ctx context.Context)

Cleanup expires watches past their expiry and runs any housekeeping.

type WatchStreamEvaluator

type WatchStreamEvaluator struct {
	// contains filtered or unexported fields
}

WatchStreamEvaluator reactively checks watches when new logs arrive. It coordinates with the WatchScheduler through the database: both paths use last_checked_at as the single source of truth to prevent duplicate evaluations.

func NewWatchStreamEvaluator

func NewWatchStreamEvaluator(
	ctx context.Context,
	watchStore store.WatchStore,
	evaluator *WatchEvaluator,
	evidenceBuilder *WatchEvidenceBuilder,
	notifiers []WatchAlertNotifier,
) *WatchStreamEvaluator

NewWatchStreamEvaluator creates a reactive stream evaluator.

func (*WatchStreamEvaluator) OnLogsReceived

func (s *WatchStreamEvaluator) OnLogsReceived(entries []store.LogEntry)

OnLogsReceived is called after new logs are ingested. It checks if any active watch matches the incoming data and triggers async evaluation. This method is non-blocking.
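A non-blocking ingest hook usually hands work to a background worker through a buffered channel and uses select with a default case so ingestion never waits. This sketch assumes that design and matches watches by service name only; the entry, watch, and streamEvaluator types are hypothetical, and whether the real OnLogsReceived drops, queues, or spawns goroutines when busy is not documented.

```go
package main

// entry and watch are hypothetical stand-ins for store.LogEntry and
// store.Watch, reduced to the service name used for matching.
type entry struct{ Service string }
type watch struct{ ID, Service string }

// streamEvaluator offers matching watch IDs to a buffered queue that a
// background worker (not shown) would drain and evaluate.
type streamEvaluator struct {
	watches []watch
	queue   chan string
}

// onLogsReceived never blocks: if the queue is full, the match is skipped.
func (s *streamEvaluator) onLogsReceived(entries []entry) {
	seen := make(map[string]bool)
	for _, e := range entries {
		seen[e.Service] = true
	}
	for _, w := range s.watches {
		if !seen[w.Service] {
			continue
		}
		select {
		case s.queue <- w.ID: // handed off for async evaluation
		default: // queue full: skip rather than block ingestion
		}
	}
}
```

Dropping on a full queue trades reactivity for ingest latency; under this design, a skipped watch would still be picked up by the scheduler's regular poll.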

type WatchWebhookNotifier

type WatchWebhookNotifier struct {
	URL    string
	Client *http.Client
}

WatchWebhookNotifier sends watch alerts to a webhook URL.

func NewWatchWebhookNotifier

func NewWatchWebhookNotifier(url string) *WatchWebhookNotifier

NewWatchWebhookNotifier creates a new webhook notifier for watch alerts.

func (*WatchWebhookNotifier) NotifyWatchAlert

func (n *WatchWebhookNotifier) NotifyWatchAlert(ctx context.Context, alert *store.WatchAlert, watch *store.Watch) error

type WatchWebhookPayload

type WatchWebhookPayload struct {
	AlertID        string  `json:"alert_id"`
	WatchID        string  `json:"watch_id"`
	Metric         string  `json:"metric"`
	Urgency        string  `json:"urgency"`
	Summary        string  `json:"summary"`
	TriggerValue   float64 `json:"trigger_value"`
	ThresholdValue float64 `json:"threshold_value"`
	Service        string  `json:"service,omitempty"`
	Environment    string  `json:"environment,omitempty"`
	Timestamp      string  `json:"timestamp"`
}

WatchWebhookPayload is the JSON body sent for watch alerts.
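The payload above is a plain JSON body, so delivering it needs only encoding/json and net/http. The sketch reuses the documented WatchWebhookPayload struct; the postPayload and roundTrip helpers are hypothetical, and treating any non-2xx status as an error, sending Content-Type: application/json, and using an RFC 3339 Timestamp in the example are assumptions about the real notifier.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// WatchWebhookPayload is copied from the package documentation.
type WatchWebhookPayload struct {
	AlertID        string  `json:"alert_id"`
	WatchID        string  `json:"watch_id"`
	Metric         string  `json:"metric"`
	Urgency        string  `json:"urgency"`
	Summary        string  `json:"summary"`
	TriggerValue   float64 `json:"trigger_value"`
	ThresholdValue float64 `json:"threshold_value"`
	Service        string  `json:"service,omitempty"`
	Environment    string  `json:"environment,omitempty"`
	Timestamp      string  `json:"timestamp"`
}

// postPayload marshals p and POSTs it to url as JSON.
func postPayload(ctx context.Context, client *http.Client, url string, p WatchWebhookPayload) error {
	body, err := json.Marshal(p)
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("webhook returned %s", resp.Status)
	}
	return nil
}

// roundTrip is a usage demo: it posts p to a local test server and returns
// the payload the server decoded.
func roundTrip(p WatchWebhookPayload) (WatchWebhookPayload, error) {
	received := make(chan WatchWebhookPayload, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var got WatchWebhookPayload
		if err := json.NewDecoder(r.Body).Decode(&got); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		received <- got
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()
	if err := postPayload(context.Background(), srv.Client(), srv.URL, p); err != nil {
		return WatchWebhookPayload{}, err
	}
	return <-received, nil
}
```

Because Service and Environment use omitempty, they are left out of the JSON body entirely when empty, so receivers should treat them as optional.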
