telemetry

package
v0.3.0 Latest
Published: Jun 8, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package telemetry exposes the aggregated /parsec/metrics view that the upgrade spec calls for. It is the operator-facing snapshot of channel health, envelope rates, presence, history utilization, token issuance, and cache hit rates — pulled from every running Parsec component and rendered in one JSON response.

The package is a small aggregator, not a metrics store. Each Source is plugged in at construction; the snapshot is computed on demand. For Prometheus exposition the existing internal/metrics registry stays the source of truth — this surface is a higher-level dashboard view.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ValidateAlertRules added in v0.3.0

func ValidateAlertRules(rules []AlertRule) error

ValidateAlertRules checks the rule list for shape problems before the Aggregator starts using it. Empty names, unknown severities, nil Condition closures, and duplicate names are all rejected. The embedder calls this at boot so misconfigured deployments fail loudly rather than silently dropping firings.
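The four documented checks can be sketched as follows. This is a hypothetical re-implementation over local stand-in types (`Snapshot`, `AlertRule`, and `validateRules` are declared here for illustration), not the package's own code; the error wording is an assumption.

```go
package main

import "fmt"

// Local stand-ins that mirror the documented shapes; not the package's types.
type Severity string

type Snapshot struct{}

type AlertRule struct {
	Name      string
	Severity  Severity
	Condition func(Snapshot) bool
}

// validSeverity accepts only the three documented levels.
func validSeverity(s Severity) bool {
	switch s {
	case "info", "warning", "critical":
		return true
	}
	return false
}

// validateRules is a hypothetical version of the documented checks:
// empty name, unknown severity, nil Condition, duplicate name.
func validateRules(rules []AlertRule) error {
	seen := make(map[string]struct{}, len(rules))
	for i, r := range rules {
		switch {
		case r.Name == "":
			return fmt.Errorf("rule %d: empty name", i)
		case !validSeverity(r.Severity):
			return fmt.Errorf("rule %q: unknown severity %q", r.Name, r.Severity)
		case r.Condition == nil:
			return fmt.Errorf("rule %q: nil condition", r.Name)
		}
		if _, dup := seen[r.Name]; dup {
			return fmt.Errorf("rule %q: duplicate name", r.Name)
		}
		seen[r.Name] = struct{}{}
	}
	return nil
}

func main() {
	always := func(Snapshot) bool { return true }
	rules := []AlertRule{
		{Name: "hot", Severity: "warning", Condition: always},
		{Name: "hot", Severity: "critical", Condition: always},
	}
	// The second rule reuses the name "hot", so validation fails.
	fmt.Println(validateRules(rules))
}
```

Returning on the first problem keeps the boot-time error short; collecting every problem into one error would be an equally reasonable choice.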

Types

type Aggregator

type Aggregator struct {
	Sources []Source
	// Rules, when non-empty, are evaluated against every Snapshot. The
	// list is validated by Aggregator.WithAlerts; assigning Rules
	// directly skips that check.
	Rules []AlertRule
}

Aggregator composes Source(s) into a single Snapshot.

func New

func New(sources ...Source) *Aggregator

New constructs an Aggregator over the supplied sources.

func (*Aggregator) Handler

func (a *Aggregator) Handler() http.Handler

Handler returns an http.Handler that serves the JSON snapshot at the mount path. Mount it at /parsec/metrics.
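A minimal mounting sketch, assuming a standard `net/http` mux. Because the package import path is not shown here, `snapshotHandler` is a stand-in for the value `Handler` would return, and its JSON body is illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// snapshotHandler is a stand-in for the handler returned by Aggregator.Handler.
// The payload is a made-up fragment, not the real Snapshot encoding.
func snapshotHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]any{
			"channels": map[string]int64{"total_active": 3},
		})
	})
}

// newMux mounts the handler at the documented path.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/parsec/metrics", snapshotHandler())
	return mux
}

// fetch issues an in-memory GET against the mux and returns status and body.
func fetch(path string) (int, string) {
	rec := httptest.NewRecorder()
	newMux().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := fetch("/parsec/metrics")
	fmt.Print(code, " ", body)
}
```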

func (*Aggregator) PrometheusHandler added in v0.3.0

func (a *Aggregator) PrometheusHandler() http.Handler

PrometheusHandler returns an http.Handler that writes the Snapshot as Prometheus text exposition (version 0.0.4). The handler re-aggregates on every scrape so the values are always current.

Mount it under a path the operator's Prometheus is scraping — for example `/parsec/telemetry/metrics` (kept separate from the existing `/metrics` collector registry to avoid metric-name collisions). The embedder is responsible for the mount; this package only returns the handler.

Cardinality budget: only bounded label sets escape — `pattern` is a configured channel grammar pattern, `aspect` is the envelope kind supplied by the embedder, `alert` is the rule name, `severity` is one of info/warning/critical. Operators who plumb a high-cardinality pattern or aspect into a Source are responsible for the bytes — the handler does not truncate.
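To make the label discussion concrete, here is a sketch of how one gauge sample could be rendered in Prometheus text exposition. The metric name `parsec_alerts_firing` and its `alert` and `severity` labels come from the AlertRule documentation; `gaugeLine`, the alert name `cache_cold`, and the sorted-label ordering are assumptions made for illustration, not the handler's actual code.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// labelEscaper applies the escaping the text format requires in label values:
// backslash, double quote, and line feed.
var labelEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)

// gaugeLine renders one sample line with labels in sorted key order so the
// output is stable between scrapes.
func gaugeLine(name string, labels map[string]string, value float64) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+`="`+labelEscaper.Replace(labels[k])+`"`)
	}
	return name + "{" + strings.Join(pairs, ",") + "} " +
		strconv.FormatFloat(value, 'g', -1, 64)
}

func main() {
	fmt.Println("# TYPE parsec_alerts_firing gauge")
	fmt.Println(gaugeLine("parsec_alerts_firing",
		map[string]string{"alert": "cache_cold", "severity": "warning"}, 1))
}
```

Each distinct label combination becomes its own time series, which is why an unbounded `pattern` or `aspect` value grows the scrape size.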

func (*Aggregator) Snapshot

func (a *Aggregator) Snapshot(ctx context.Context) Snapshot

Snapshot computes the current aggregate. Sources are queried in sequence. Counts are summed field by field; rates and utilization percentages are averaged across sources.
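The difference between summed and averaged fields can be sketched with a reduced stand-in struct. `stats` and `merge` are hypothetical names, and dividing by the total number of sources (including ones that report zero) is an assumption; the documentation does not say whether zero-valued sources are skipped.

```go
package main

import "fmt"

// stats is a reduced stand-in with one count and one percentage.
type stats struct {
	TotalActive int64   // a count: summed across sources
	HitRatePct  float64 // a percentage: averaged across sources
}

// merge sums counts and averages percentages over all parts.
func merge(parts []stats) stats {
	var out stats
	if len(parts) == 0 {
		return out
	}
	for _, p := range parts {
		out.TotalActive += p.TotalActive
		out.HitRatePct += p.HitRatePct
	}
	out.HitRatePct /= float64(len(parts))
	return out
}

func main() {
	fmt.Println(merge([]stats{
		{TotalActive: 2, HitRatePct: 80},
		{TotalActive: 3, HitRatePct: 60},
	})) // prints {5 70}
}
```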

func (*Aggregator) WithAlerts added in v0.3.0

func (a *Aggregator) WithAlerts(rules []AlertRule) (*Aggregator, error)

WithAlerts validates rules and attaches them to the Aggregator. The receiver is returned so callers can chain it onto New for fluent construction. Returns an error if any rule fails ValidateAlertRules.

type AlertRule added in v0.3.0

type AlertRule struct {
	// Name is a stable identifier used in JSON output, Prometheus
	// labels, and operator runbooks. Must be unique within the rule
	// list; duplicates are rejected by ValidateAlertRules.
	Name string `json:"name"`
	// Severity classifies the firing. See Severity for the levels.
	Severity Severity `json:"severity"`
	// Description is the operator-facing one-liner explaining what the
	// rule guards against. Shown in dashboards next to the firing
	// state.
	Description string `json:"description,omitempty"`
	// Condition returns true when the rule should fire for snap.
	Condition func(snap Snapshot) bool `json:"-"`
}

AlertRule is a declarative gate on Snapshot fields. The embedder supplies a list of rules to the Aggregator; on every Snapshot the rules are evaluated and any that return true are surfaced as FiringAlert entries in Snapshot.Alerts (and, when the Prometheus handler is mounted, exposed as `parsec_alerts_firing{alert,severity}` gauges).

The Condition closure receives the Snapshot pre-Alerts, so a rule cannot reference its own firing. Side effects in Condition are strongly discouraged — the function may be called multiple times per scrape window.
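A rule whose Condition is a pure function of the Snapshot might look like the sketch below. The types are local stand-ins trimmed to the fields used, and `cacheColdRule`, the rule name, and the threshold are hypothetical.

```go
package main

import "fmt"

// Local stand-ins trimmed to what the example needs.
type Severity string

type CacheStats struct{ HitRatePct float64 }

type Snapshot struct{ Cache CacheStats }

type AlertRule struct {
	Name        string
	Severity    Severity
	Description string
	Condition   func(snap Snapshot) bool
}

// cacheColdRule builds a rule that fires when the cache hit rate is below
// minPct. The closure only reads snap, so repeated calls are harmless.
func cacheColdRule(minPct float64) AlertRule {
	return AlertRule{
		Name:        "cache_cold",
		Severity:    "warning",
		Description: "cache hit rate below threshold",
		Condition: func(snap Snapshot) bool {
			return snap.Cache.HitRatePct < minPct
		},
	}
}

func main() {
	rule := cacheColdRule(50)
	fmt.Println(rule.Condition(Snapshot{Cache: CacheStats{HitRatePct: 40}})) // true
	fmt.Println(rule.Condition(Snapshot{Cache: CacheStats{HitRatePct: 90}})) // false
}
```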

type CacheSource

type CacheSource struct {
	Stats func() cacheStats
	// contains filtered or unexported fields
}

CacheSource implements Source against a CacheStatter. The window for "evictions last hour" is approximate: it diffs the eviction counter against the value observed an hour ago.
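One way to implement the "diff against the value observed an hour ago" idea is to keep timestamped samples of the monotonic eviction counter and use the newest sample at or before the cutoff as the baseline. This is a sketch under that assumption; `evictionWindow`, `observe`, and `at` are hypothetical names, and the real CacheSource may track its baseline differently.

```go
package main

import (
	"fmt"
	"time"
)

// sample is one reading of the monotonic eviction counter.
type sample struct {
	at    time.Time
	total int64
}

// evictionWindow keeps readings oldest first. It is not safe for concurrent use.
type evictionWindow struct {
	window  time.Duration
	samples []sample
}

func newEvictionWindow() *evictionWindow {
	return &evictionWindow{window: time.Hour}
}

// observe records the current counter and returns the increase since the
// newest sample taken at or before now-window. The result is approximate
// because the baseline is only as fresh as the sampling interval.
func (w *evictionWindow) observe(now time.Time, total int64) int64 {
	w.samples = append(w.samples, sample{at: now, total: total})
	cutoff := now.Add(-w.window)
	// Drop the head while the next sample is also at or before the cutoff.
	for len(w.samples) > 1 && !w.samples[1].at.After(cutoff) {
		w.samples = w.samples[1:]
	}
	return total - w.samples[0].total
}

// at returns an arbitrary fixed base time plus the given number of minutes.
func at(minutes int) time.Time {
	base := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(minutes) * time.Minute)
}

func main() {
	w := newEvictionWindow()
	fmt.Println(w.observe(at(0), 100))  // 0: first reading is its own baseline
	fmt.Println(w.observe(at(30), 130)) // 30: baseline is the reading at minute 0
	fmt.Println(w.observe(at(90), 150)) // 20: baseline moved to minute 30
}
```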

func NewCacheSource

func NewCacheSource(stats func() cacheStats) *CacheSource

NewCacheSource constructs a CacheSource over the supplied stat func.

func NewCacheSourceFromCache added in v0.3.0

func NewCacheSourceFromCache(c cache.Cache) *CacheSource

NewCacheSourceFromCache adapts a cache.Cache to a Source by wiring its Stats() call into the CacheSource closure. The embedder shares one cache instance across the library and the telemetry aggregator without writing the adapter by hand.

Returns nil when c is nil so callers can compose blindly:

agg := telemetry.New(append(other, telemetry.NewCacheSourceFromCache(p.Cache()))...)

The call silently drops the cache source when the cache is disabled.

func (*CacheSource) Cache

func (*CacheSource) Channels

func (s *CacheSource) Channels(context.Context) ChannelStats

func (*CacheSource) Envelopes

func (s *CacheSource) Envelopes(context.Context) EnvelopeStats

func (*CacheSource) History

func (*CacheSource) Presence

func (*CacheSource) Tokens

type CacheStats

type CacheStats struct {
	HitRatePct        float64 `json:"hit_rate_pct"`
	SizeBytes         int64   `json:"size_bytes"`
	SizeEntries       int64   `json:"size_entries,omitempty"`
	EvictionsLastHour int64   `json:"evictions_last_hour"`
}

CacheStats is the cache aggregate.

type CacheStatter

type CacheStatter interface {
	Stats() cacheStats
}

CacheStatter is satisfied by both cache.MemoryCache and cache.RedisCache.

type ChannelEntry

type ChannelEntry struct {
	Name    string
	Pattern string
}

ChannelEntry is the minimal shape ChannelSource consumes.

type ChannelLister

type ChannelLister interface {
	List() []ChannelEntry
}

ChannelLister returns the current set of channels for ChannelSource.

type ChannelSource

type ChannelSource struct{ Lister ChannelLister }

ChannelSource is a Source that reports the channel aggregate from a ChannelLister. The other source methods return zero values.
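The channel aggregate can plausibly be derived by counting lister entries per pattern. The sketch below assumes exactly that; `channelStats` and the channel names and patterns are hypothetical, and the types are local copies of the documented shapes.

```go
package main

import "fmt"

// Local copies of the documented shapes.
type ChannelEntry struct {
	Name    string
	Pattern string
}

type ChannelStats struct {
	TotalActive int64
	ByPattern   map[string]int64
}

// channelStats counts every entry once in the total and once under its pattern.
func channelStats(entries []ChannelEntry) ChannelStats {
	st := ChannelStats{ByPattern: make(map[string]int64)}
	for _, e := range entries {
		st.TotalActive++
		st.ByPattern[e.Pattern]++
	}
	return st
}

func main() {
	st := channelStats([]ChannelEntry{
		{Name: "room.1", Pattern: "room.{id}"},
		{Name: "room.2", Pattern: "room.{id}"},
		{Name: "lobby", Pattern: "lobby"},
	})
	fmt.Println(st.TotalActive, st.ByPattern["room.{id}"], st.ByPattern["lobby"])
}
```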

func (ChannelSource) Cache

func (ChannelSource) Channels

func (c ChannelSource) Channels(_ context.Context) ChannelStats

func (ChannelSource) Envelopes

func (ChannelSource) History

func (ChannelSource) Presence

func (ChannelSource) Tokens

type ChannelStats

type ChannelStats struct {
	TotalActive int64            `json:"total_active"`
	ByPattern   map[string]int64 `json:"by_pattern,omitempty"`
}

ChannelStats is the channel aggregate: the total number of active channels plus a per-pattern breakdown.

type EnvelopeCounters

type EnvelopeCounters struct {
	// contains filtered or unexported fields
}

EnvelopeCounters tracks the publish rate and per-aspect counts.

func NewEnvelopeCounters

func NewEnvelopeCounters() *EnvelopeCounters

NewEnvelopeCounters constructs a counter with a 60-second rate window.

func (*EnvelopeCounters) AsSource

func (e *EnvelopeCounters) AsSource() Source

AsSource adapts the counter to a Source.

func (*EnvelopeCounters) Observe

func (e *EnvelopeCounters) Observe(t time.Time, aspect string)

Observe records one publish at time t for the named aspect.

func (*EnvelopeCounters) Snapshot

func (e *EnvelopeCounters) Snapshot() EnvelopeStats

Snapshot returns the current EnvelopeStats.
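A 60-second rate window can be sketched by keeping publish timestamps and dividing the number still inside the window by its length. Everything named here (`envelopeWindow`, `observe`, `ratePerSecond`, `at`) is hypothetical, and treating the per-aspect counts as lifetime totals rather than windowed ones is an assumption the documentation does not settle.

```go
package main

import (
	"fmt"
	"time"
)

// envelopeWindow is a stand-in counter. It is not safe for concurrent use.
type envelopeWindow struct {
	window   time.Duration
	events   []time.Time
	byAspect map[string]int64 // assumed to be lifetime counts
}

func newEnvelopeWindow() *envelopeWindow {
	return &envelopeWindow{window: 60 * time.Second, byAspect: make(map[string]int64)}
}

// observe records one publish at time t for the named aspect.
func (w *envelopeWindow) observe(t time.Time, aspect string) {
	w.events = append(w.events, t)
	w.byAspect[aspect]++
}

// ratePerSecond prunes events at or before now-window and averages the rest
// over the window length. Filtering handles out-of-order timestamps.
func (w *envelopeWindow) ratePerSecond(now time.Time) float64 {
	cutoff := now.Add(-w.window)
	kept := w.events[:0]
	for _, t := range w.events {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	w.events = kept
	return float64(len(kept)) / w.window.Seconds()
}

// at returns an arbitrary fixed base time plus the given number of seconds.
func at(seconds int) time.Time {
	base := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(seconds) * time.Second)
}

func main() {
	w := newEnvelopeWindow()
	for i := 0; i < 120; i++ {
		w.observe(at(0), "chat")
	}
	fmt.Println(w.ratePerSecond(at(30)), w.ratePerSecond(at(61)), w.byAspect["chat"])
}
```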

type EnvelopeStats

type EnvelopeStats struct {
	RatePerSecond float64          `json:"rate_per_second"`
	ByAspect      map[string]int64 `json:"by_aspect,omitempty"`
}

EnvelopeStats is the publish-rate aggregate.

type FiringAlert added in v0.3.0

type FiringAlert struct {
	Name        string   `json:"name"`
	Severity    Severity `json:"severity"`
	Description string   `json:"description,omitempty"`
}

FiringAlert is the per-snapshot record of a rule that evaluated true. It is the JSON-friendly subset of AlertRule (no Condition closure). The struct carries no timestamp of its own; consumers can correlate firings with dashboard time ranges through the At field of the enclosing Snapshot.

func EvaluateAlerts added in v0.3.0

func EvaluateAlerts(snap Snapshot, rules []AlertRule) []FiringAlert

EvaluateAlerts walks rules and returns the FiringAlert list for the snapshot, sorted by Name for stable ordering. A nil rule list returns nil.
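The documented behavior (nil in, nil out; results sorted by Name) can be sketched over local stand-in types. `evaluate` is a hypothetical re-implementation, and skipping rules with a nil Condition is an assumption made so the sketch cannot panic.

```go
package main

import (
	"fmt"
	"sort"
)

// Local stand-ins trimmed to what the example needs.
type Severity string

type Snapshot struct{ TotalActive int64 }

type AlertRule struct {
	Name      string
	Severity  Severity
	Condition func(Snapshot) bool
}

type FiringAlert struct {
	Name     string
	Severity Severity
}

// evaluate returns the rules that fire for snap, sorted by Name.
func evaluate(snap Snapshot, rules []AlertRule) []FiringAlert {
	if rules == nil {
		return nil
	}
	var firing []FiringAlert
	for _, r := range rules {
		if r.Condition != nil && r.Condition(snap) {
			firing = append(firing, FiringAlert{Name: r.Name, Severity: r.Severity})
		}
	}
	sort.Slice(firing, func(i, j int) bool { return firing[i].Name < firing[j].Name })
	return firing
}

// over builds a condition that fires when TotalActive exceeds n.
func over(n int64) func(Snapshot) bool {
	return func(s Snapshot) bool { return s.TotalActive > n }
}

func main() {
	rules := []AlertRule{
		{Name: "z_many", Severity: "warning", Condition: over(10)},
		{Name: "m_huge", Severity: "critical", Condition: over(1000)},
		{Name: "a_some", Severity: "info", Condition: over(1)},
	}
	for _, f := range evaluate(Snapshot{TotalActive: 50}, rules) {
		fmt.Println(f.Name, f.Severity)
	}
}
```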

type HistoryStats

type HistoryStats struct {
	TotalEnvelopesBuffered int64   `json:"total_envelopes_buffered"`
	BufferUtilizationPct   float64 `json:"buffer_utilization_pct"`
}

HistoryStats is the history-buffer aggregate.

type PresenceStats

type PresenceStats struct {
	TotalUsers        int64   `json:"total_users"`
	TotalAgents       int64   `json:"total_agents"`
	AveragePerChannel float64 `json:"average_per_channel"`
}

PresenceStats is the presence aggregate.

type Severity added in v0.3.0

type Severity string

Severity classifies an alert firing. The string values are stable and surface verbatim in JSON output, Prometheus labels, and operator dashboards.

const (
	// SeverityInfo is informational; the condition is unusual but does
	// not require operator action. Use for capacity warnings well
	// below their hard ceiling.
	SeverityInfo Severity = "info"
	// SeverityWarning means the condition warrants investigation. The
	// system is still serving traffic but a metric has crossed a
	// threshold that historically precedes degradation.
	SeverityWarning Severity = "warning"
	// SeverityCritical means immediate action — the condition either
	// already degrades user experience or will within minutes.
	SeverityCritical Severity = "critical"
)

func (Severity) Valid added in v0.3.0

func (s Severity) Valid() bool

Valid reports whether s is a recognized severity.

type Snapshot

type Snapshot struct {
	Channels  ChannelStats  `json:"channels"`
	Envelopes EnvelopeStats `json:"envelopes"`
	Presence  PresenceStats `json:"presence"`
	History   HistoryStats  `json:"history"`
	Tokens    TokenStats    `json:"tokens"`
	Cache     CacheStats    `json:"cache"`
	At        time.Time     `json:"at"`
	// Alerts lists the AlertRule(s) that evaluated true on this snapshot.
	// Empty when no rules are configured or none fired. Populated by the
	// Aggregator after summing sources so a rule can reference the
	// aggregate (e.g. "total active > N").
	Alerts []FiringAlert `json:"alerts,omitempty"`
}

Snapshot is the aggregated /parsec/metrics response shape.

type Source

type Source interface {
	Channels(ctx context.Context) ChannelStats
	Envelopes(ctx context.Context) EnvelopeStats
	Presence(ctx context.Context) PresenceStats
	History(ctx context.Context) HistoryStats
	Tokens(ctx context.Context) TokenStats
	Cache(ctx context.Context) CacheStats
}

Source is the contract every telemetry input satisfies. Each method is invoked on every snapshot — if a metric is unavailable, return the zero value (the field will simply read as 0 in the JSON).
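A source that only knows one metric can embed a zero-returning base and override a single method. The sketch below uses trimmed local copies of the stats types; `zeroSource` and `presenceOnly` are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local copies of the documented stats types.
type (
	ChannelStats  struct{ TotalActive int64 }
	EnvelopeStats struct{ RatePerSecond float64 }
	PresenceStats struct{ TotalUsers int64 }
	HistoryStats  struct{ TotalEnvelopesBuffered int64 }
	TokenStats    struct{ ActiveCount int64 }
	CacheStats    struct{ SizeBytes int64 }
)

// Source mirrors the documented interface.
type Source interface {
	Channels(ctx context.Context) ChannelStats
	Envelopes(ctx context.Context) EnvelopeStats
	Presence(ctx context.Context) PresenceStats
	History(ctx context.Context) HistoryStats
	Tokens(ctx context.Context) TokenStats
	Cache(ctx context.Context) CacheStats
}

// zeroSource answers every method with the zero value.
type zeroSource struct{}

func (zeroSource) Channels(context.Context) ChannelStats   { return ChannelStats{} }
func (zeroSource) Envelopes(context.Context) EnvelopeStats { return EnvelopeStats{} }
func (zeroSource) Presence(context.Context) PresenceStats  { return PresenceStats{} }
func (zeroSource) History(context.Context) HistoryStats    { return HistoryStats{} }
func (zeroSource) Tokens(context.Context) TokenStats       { return TokenStats{} }
func (zeroSource) Cache(context.Context) CacheStats        { return CacheStats{} }

// presenceOnly overrides Presence and inherits zero values for the rest.
type presenceOnly struct {
	zeroSource
	users int64
}

func (p presenceOnly) Presence(context.Context) PresenceStats {
	return PresenceStats{TotalUsers: p.users}
}

var ctx = context.Background()

func main() {
	var s Source = presenceOnly{users: 7}
	fmt.Println(s.Presence(ctx).TotalUsers, s.Tokens(ctx).ActiveCount) // prints 7 0
}
```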

type TokenCounters

type TokenCounters struct {
	// contains filtered or unexported fields
}

TokenCounters tracks token issuance over a sliding window. The token broker calls Issued / Revoked on each event; the source reads back hour-window aggregates.

func NewTokenCounters

func NewTokenCounters() *TokenCounters

NewTokenCounters constructs a counter with a one-hour window.

func (*TokenCounters) AsSource

func (c *TokenCounters) AsSource() Source

AsSource adapts the counter to a Source whose Tokens method returns the current Snapshot.

func (*TokenCounters) Expired

func (c *TokenCounters) Expired()

Expired removes one token from the active set without recording a revocation.

func (*TokenCounters) Issued

func (c *TokenCounters) Issued(t time.Time)

Issued records one issued token at time t.

func (*TokenCounters) Revoked

func (c *TokenCounters) Revoked(t time.Time)

Revoked records one revoked token at time t.

func (*TokenCounters) Snapshot

func (c *TokenCounters) Snapshot() TokenStats

Snapshot returns the current TokenStats.
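The interplay of Issued, Revoked, and Expired can be sketched with timestamp slices and a running active count. `tokenWindow` and its helpers are hypothetical, and having Revoked also decrement the active count is an assumption inferred from the Expired description.

```go
package main

import (
	"fmt"
	"time"
)

// tokenStats mirrors the documented TokenStats fields.
type tokenStats struct {
	IssuedLastHour  int64
	RevokedLastHour int64
	ActiveCount     int64
}

// tokenWindow is a stand-in counter. It is not safe for concurrent use and
// never prunes old timestamps, which a long-running process would need.
type tokenWindow struct {
	window  time.Duration
	issued  []time.Time
	revoked []time.Time
	active  int64
}

func newTokenWindow() *tokenWindow { return &tokenWindow{window: time.Hour} }

func (c *tokenWindow) Issued(t time.Time)  { c.issued = append(c.issued, t); c.active++ }
func (c *tokenWindow) Revoked(t time.Time) { c.revoked = append(c.revoked, t); c.active-- }
func (c *tokenWindow) Expired()            { c.active-- }

// countSince counts events strictly after cutoff.
func countSince(events []time.Time, cutoff time.Time) int64 {
	var n int64
	for _, t := range events {
		if t.After(cutoff) {
			n++
		}
	}
	return n
}

// stats reports the hour-window aggregates as of now.
func (c *tokenWindow) stats(now time.Time) tokenStats {
	cutoff := now.Add(-c.window)
	return tokenStats{
		IssuedLastHour:  countSince(c.issued, cutoff),
		RevokedLastHour: countSince(c.revoked, cutoff),
		ActiveCount:     c.active,
	}
}

// at returns an arbitrary fixed base time plus the given number of minutes.
func at(minutes int) time.Time {
	base := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(minutes) * time.Minute)
}

func main() {
	c := newTokenWindow()
	c.Issued(at(0))
	c.Issued(at(10))
	c.Issued(at(50))
	c.Revoked(at(20))
	c.Expired()
	fmt.Println(c.stats(at(65))) // prints {2 1 1}
}
```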

type TokenStats

type TokenStats struct {
	IssuedLastHour  int64 `json:"issued_last_hour"`
	RevokedLastHour int64 `json:"revoked_last_hour"`
	ActiveCount     int64 `json:"active_count"`
}

TokenStats is the token issuance aggregate.
