Documentation ¶
Overview ¶
Package telemetry exposes the aggregated /parsec/metrics view that the upgrade spec calls for. It is the operator-facing snapshot of channel health, envelope rates, presence, history utilization, token issuance, and cache hit rates — pulled from every running Parsec component and rendered in one JSON response.
The package is a small aggregator, not a metrics store. Each Source is plugged in at construction; the snapshot is computed on demand. For Prometheus exposition the existing internal/metrics registry stays the source of truth — this surface is a higher-level dashboard view.
Index ¶
- func ValidateAlertRules(rules []AlertRule) error
- type Aggregator
- type AlertRule
- type CacheSource
- func (s *CacheSource) Cache(context.Context) CacheStats
- func (s *CacheSource) Channels(context.Context) ChannelStats
- func (s *CacheSource) Envelopes(context.Context) EnvelopeStats
- func (s *CacheSource) History(context.Context) HistoryStats
- func (s *CacheSource) Presence(context.Context) PresenceStats
- func (s *CacheSource) Tokens(context.Context) TokenStats
- type CacheStats
- type CacheStatter
- type ChannelEntry
- type ChannelLister
- type ChannelSource
- func (ChannelSource) Cache(context.Context) CacheStats
- func (c ChannelSource) Channels(_ context.Context) ChannelStats
- func (ChannelSource) Envelopes(context.Context) EnvelopeStats
- func (ChannelSource) History(context.Context) HistoryStats
- func (ChannelSource) Presence(context.Context) PresenceStats
- func (ChannelSource) Tokens(context.Context) TokenStats
- type ChannelStats
- type EnvelopeCounters
- type EnvelopeStats
- type FiringAlert
- type HistoryStats
- type PresenceStats
- type Severity
- type Snapshot
- type Source
- type TokenCounters
- type TokenStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ValidateAlertRules ¶ added in v0.3.0
func ValidateAlertRules(rules []AlertRule) error
ValidateAlertRules checks the rule list for shape problems before the Aggregator starts using it. Empty names, unknown severities, nil Condition closures, and duplicate names are all rejected. The embedder calls this at boot so misconfigured deployments fail loud rather than silently dropping firings.
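The four rejection cases listed above can be sketched as a single pass over the rule list. This is a minimal illustration using local stand-in types rather than the package's own; the helper name validateRules, the always closure, the rule name, and the error wording are assumptions, and the order in which the real function checks its cases is not documented.

```go
package main

import "fmt"

// Local stand-ins that mirror the documented types; the real package defines its own.
type Severity string

const (
	SeverityInfo     Severity = "info"
	SeverityWarning  Severity = "warning"
	SeverityCritical Severity = "critical"
)

// Snapshot is left empty here because the checks never inspect it.
type Snapshot struct{}

type AlertRule struct {
	Name      string
	Severity  Severity
	Condition func(Snapshot) bool
}

// validateRules (hypothetical name) rejects empty names, unknown severities,
// nil Condition closures, and duplicate names. Error wording is illustrative.
func validateRules(rules []AlertRule) error {
	seen := make(map[string]bool, len(rules))
	for i, r := range rules {
		switch {
		case r.Name == "":
			return fmt.Errorf("rule %d: empty name", i)
		case r.Severity != SeverityInfo && r.Severity != SeverityWarning && r.Severity != SeverityCritical:
			return fmt.Errorf("rule %q: unknown severity %q", r.Name, r.Severity)
		case r.Condition == nil:
			return fmt.Errorf("rule %q: nil Condition", r.Name)
		case seen[r.Name]:
			return fmt.Errorf("rule %q: duplicate name", r.Name)
		}
		seen[r.Name] = true
	}
	return nil
}

// always is a trivial Condition used only by the demo.
func always(Snapshot) bool { return true }

func main() {
	good := []AlertRule{{Name: "cache_cold", Severity: SeverityWarning, Condition: always}}
	dup := []AlertRule{good[0], good[0]}
	fmt.Println("good:", validateRules(good))
	fmt.Println("dup: ", validateRules(dup))
}
```

Calling a check like this once at boot, before the rules reach the Aggregator, is what lets a misconfigured deployment fail at startup instead of at the first scrape.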
Types ¶
type Aggregator ¶
type Aggregator struct {
Sources []Source
// Rules, when non-empty, are evaluated against every Snapshot. The
// list is validated by Aggregator.WithAlerts; assigning Rules
// directly skips that check.
Rules []AlertRule
}
Aggregator composes Source(s) into a single Snapshot.
func New ¶
func New(sources ...Source) *Aggregator
New constructs an Aggregator over the supplied sources.
func (*Aggregator) Handler ¶
func (a *Aggregator) Handler() http.Handler
Handler returns an http.Handler that serves the current Snapshot as JSON. The handler does not choose its own route; mount it at /parsec/metrics.
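Mounting can be sketched with the standard library alone. The example below does not import this package: snapshotHandler is a hypothetical stand-in for the handler that (*Aggregator).Handler returns, and the snapshot type is trimmed to a single field. Only the /parsec/metrics path and the total_active JSON key come from this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// snapshot is a trimmed stand-in for the documented Snapshot type.
type snapshot struct {
	Channels struct {
		TotalActive int64 `json:"total_active"`
	} `json:"channels"`
}

// fixed returns a compute function that always reports n active channels.
func fixed(n int64) func() snapshot {
	return func() snapshot {
		var s snapshot
		s.Channels.TotalActive = n
		return s
	}
}

// snapshotHandler (hypothetical) recomputes the snapshot on every request
// and writes it as JSON.
func snapshotHandler(compute func() snapshot) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(compute())
	})
}

// newMux mounts the handler at the documented path.
func newMux(h http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/parsec/metrics", h)
	return mux
}

// get performs an in-process GET and returns the status code and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := get(newMux(snapshotHandler(fixed(3))), "/parsec/metrics")
	fmt.Println(code, body)
}
```

With the real package the embedder would pass the value returned by Handler to mux.Handle in place of snapshotHandler.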
func (*Aggregator) PrometheusHandler ¶ added in v0.3.0
func (a *Aggregator) PrometheusHandler() http.Handler
PrometheusHandler returns an http.Handler that writes the Snapshot as Prometheus text exposition (version 0.0.4). The handler re-aggregates on every scrape so the values are always current.
Mount it under a path the operator's Prometheus is scraping — for example `/parsec/telemetry/metrics` (kept separate from the existing `/metrics` collector registry to avoid metric-name collisions). The embedder is responsible for the mount; this package only returns the handler.
Cardinality budget: only bounded label sets escape — `pattern` is a configured channel grammar pattern, `aspect` is the envelope kind supplied by the embedder, `alert` is the rule name, `severity` is one of info/warning/critical. Operators who plumb a high-cardinality pattern or aspect into a Source are responsible for the bytes — the handler does not truncate.
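To make the label discussion concrete, the following sketch renders firing alerts in the Prometheus text format. Only the metric name parsec_alerts_firing and its alert and severity labels come from this documentation. The TYPE line, the sample value of 1, the helper names, and the rule name cache_cold are assumptions about what the real handler might emit.

```go
package main

import (
	"fmt"
	"strings"
)

// firingAlert is a trimmed stand-in for the documented FiringAlert type.
type firingAlert struct {
	Name     string
	Severity string
}

// labelEscaper applies the text-format escaping rules for label values:
// backslash, double quote, and newline.
var labelEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)

// renderAlerts (hypothetical) writes one gauge sample per firing alert.
// Nothing is truncated, so the output grows with the number of distinct
// label values the caller supplies.
func renderAlerts(alerts []firingAlert) string {
	var b strings.Builder
	b.WriteString("# TYPE parsec_alerts_firing gauge\n")
	for _, a := range alerts {
		fmt.Fprintf(&b, "parsec_alerts_firing{alert=\"%s\",severity=\"%s\"} 1\n",
			labelEscaper.Replace(a.Name), labelEscaper.Replace(a.Severity))
	}
	return b.String()
}

func main() {
	fmt.Print(renderAlerts([]firingAlert{{Name: "cache_cold", Severity: "warning"}}))
}
```

Because rule names are validated for uniqueness and severities come from a fixed set, this label pair stays bounded by the size of the rule list.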
func (*Aggregator) Snapshot ¶
func (a *Aggregator) Snapshot(ctx context.Context) Snapshot
Snapshot computes the current aggregate. Sources are queried in sequence. Counters and sizes are summed field by field, while rates and utilization percentages are averaged across sources.
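The sum-versus-average split can be illustrated on CacheStats alone. The sketch below uses a local copy of the struct and a hypothetical mergeCache helper. It takes a plain mean over every part it is given; whether the real Aggregator excludes sources that report zero values from the average is not stated here, so treat that as an assumption.

```go
package main

import "fmt"

// CacheStats is a local copy of the documented struct (SizeEntries omitted).
type CacheStats struct {
	HitRatePct        float64
	SizeBytes         int64
	EvictionsLastHour int64
}

// mergeCache (hypothetical) sums the counter-like fields and averages the
// percentage field across all parts.
func mergeCache(parts []CacheStats) CacheStats {
	var out CacheStats
	if len(parts) == 0 {
		return out
	}
	for _, p := range parts {
		out.SizeBytes += p.SizeBytes
		out.EvictionsLastHour += p.EvictionsLastHour
		out.HitRatePct += p.HitRatePct
	}
	out.HitRatePct /= float64(len(parts))
	return out
}

func main() {
	merged := mergeCache([]CacheStats{
		{HitRatePct: 80, SizeBytes: 100, EvictionsLastHour: 2},
		{HitRatePct: 60, SizeBytes: 300, EvictionsLastHour: 5},
	})
	fmt.Printf("%+v\n", merged)
}
```

Under this plain-mean assumption, a source that returns the zero value for a percentage pulls the average down, which is worth keeping in mind when combining single-purpose sources.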
func (*Aggregator) WithAlerts ¶ added in v0.3.0
func (a *Aggregator) WithAlerts(rules []AlertRule) (*Aggregator, error)
WithAlerts validates rules and attaches them to the Aggregator. The receiver is returned so callers can chain it onto New for fluent construction. Returns an error if any rule fails ValidateAlertRules.
type AlertRule ¶ added in v0.3.0
type AlertRule struct {
// Name is a stable identifier used in JSON output, Prometheus
// labels, and operator runbooks. Must be unique within the rule
// list; duplicates are rejected by ValidateAlertRules.
Name string `json:"name"`
// Severity classifies the firing. See Severity for the levels.
Severity Severity `json:"severity"`
// Description is the operator-facing one-liner explaining what the
// rule guards against. Shown in dashboards next to the firing
// state.
Description string `json:"description,omitempty"`
// Condition returns true when the rule should fire for snap.
Condition func(snap Snapshot) bool `json:"-"`
}
AlertRule is a declarative gate on Snapshot fields. The embedder supplies a list of rules to the Aggregator; on every Snapshot the rules are evaluated and any that return true are surfaced as FiringAlert entries in Snapshot.Alerts (and, when the Prometheus handler is mounted, exposed as `parsec_alerts_firing{alert,severity}` gauges).
The Condition closure receives the Snapshot pre-Alerts, so a rule cannot reference its own firing. Side effects in Condition are strongly discouraged — the function may be called multiple times per scrape window.
type CacheSource ¶
type CacheSource struct {
Stats func() cacheStats
// contains filtered or unexported fields
}
CacheSource implements Source against a CacheStatter. The window for "evictions last hour" is approximate: it diffs the eviction counter against the value observed an hour ago.
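One way to obtain an approximate "last hour" figure from a monotonically increasing counter is to keep timestamped samples and subtract the newest sample that is at least an hour old. The sketch below shows that idea with hypothetical names (evictionWindow, observe, at). It is an assumption about the technique and not a description of the unexported fields of CacheSource.

```go
package main

import (
	"fmt"
	"time"
)

// sample pairs a counter reading with the time it was taken.
type sample struct {
	at    time.Time
	count int64
}

// evictionWindow (hypothetical) keeps readings oldest first.
type evictionWindow struct {
	window  time.Duration
	samples []sample
}

func newEvictionWindow() *evictionWindow {
	return &evictionWindow{window: time.Hour}
}

// observe records the current counter value and returns the increase since
// the baseline. The baseline is the newest sample at or before now-window,
// or the oldest sample seen if none is that old yet, which is why the
// result is only approximate.
func (w *evictionWindow) observe(now time.Time, counter int64) int64 {
	w.samples = append(w.samples, sample{at: now, count: counter})
	cutoff := now.Add(-w.window)
	// Drop the head while the following sample is also old enough to serve
	// as the baseline.
	for len(w.samples) > 1 && !w.samples[1].at.After(cutoff) {
		w.samples = w.samples[1:]
	}
	return counter - w.samples[0].count
}

// at returns a fixed reference time plus the given number of minutes.
func at(minutes int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(minutes) * time.Minute)
}

func main() {
	w := newEvictionWindow()
	fmt.Println(w.observe(at(0), 10))
	fmt.Println(w.observe(at(30), 25))
	fmt.Println(w.observe(at(90), 40))
}
```

The precision of this approach depends on how often the counter is sampled: with sparse snapshots the baseline can be noticeably older than one hour.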
func NewCacheSource ¶
func NewCacheSource(stats func() cacheStats) *CacheSource
NewCacheSource constructs a CacheSource over the supplied stat func.
func NewCacheSourceFromCache ¶ added in v0.3.0
func NewCacheSourceFromCache(c cache.Cache) *CacheSource
NewCacheSourceFromCache adapts a cache.Cache to a Source by wiring its Stats() call into the CacheSource closure. The embedder shares one cache instance across the library and the telemetry aggregator without writing the adapter by hand.
Returns nil when c is nil so callers can compose blindly. With other declared as a []Source:
agg := telemetry.New(append(other, telemetry.NewCacheSourceFromCache(p.Cache()))...)
will silently drop the cache source when the cache is disabled.
func (*CacheSource) Cache ¶
func (s *CacheSource) Cache(context.Context) CacheStats
func (*CacheSource) Channels ¶
func (s *CacheSource) Channels(context.Context) ChannelStats
func (*CacheSource) Envelopes ¶
func (s *CacheSource) Envelopes(context.Context) EnvelopeStats
func (*CacheSource) History ¶
func (s *CacheSource) History(context.Context) HistoryStats
func (*CacheSource) Presence ¶
func (s *CacheSource) Presence(context.Context) PresenceStats
func (*CacheSource) Tokens ¶
func (s *CacheSource) Tokens(context.Context) TokenStats
type CacheStats ¶
type CacheStats struct {
HitRatePct float64 `json:"hit_rate_pct"`
SizeBytes int64 `json:"size_bytes"`
SizeEntries int64 `json:"size_entries,omitempty"`
EvictionsLastHour int64 `json:"evictions_last_hour"`
}
CacheStats is the cache aggregate.
type CacheStatter ¶
type CacheStatter interface {
Stats() cacheStats
}
CacheStatter is satisfied by both cache.MemoryCache and cache.RedisCache.
type ChannelEntry ¶
ChannelEntry is the minimal shape ChannelSource consumes.
type ChannelLister ¶
type ChannelLister interface {
List() []ChannelEntry
}
ChannelLister returns the current set of channels for ChannelSource.
type ChannelSource ¶
type ChannelSource struct{ Lister ChannelLister }
ChannelSource is a Source that reports the channel aggregate from a ChannelLister. The other source methods return zero values.
func (ChannelSource) Cache ¶
func (ChannelSource) Cache(context.Context) CacheStats
func (ChannelSource) Channels ¶
func (c ChannelSource) Channels(_ context.Context) ChannelStats
func (ChannelSource) Envelopes ¶
func (ChannelSource) Envelopes(context.Context) EnvelopeStats
func (ChannelSource) History ¶
func (ChannelSource) History(context.Context) HistoryStats
func (ChannelSource) Presence ¶
func (ChannelSource) Presence(context.Context) PresenceStats
func (ChannelSource) Tokens ¶
func (ChannelSource) Tokens(context.Context) TokenStats
type ChannelStats ¶
type ChannelStats struct {
TotalActive int64 `json:"total_active"`
ByPattern map[string]int64 `json:"by_pattern,omitempty"`
}
ChannelStats is the per-pattern aggregate.
type EnvelopeCounters ¶
type EnvelopeCounters struct {
// contains filtered or unexported fields
}
EnvelopeCounters tracks the publish rate and per-aspect counts.
func NewEnvelopeCounters ¶
func NewEnvelopeCounters() *EnvelopeCounters
NewEnvelopeCounters constructs a counter with a 60-second rate window.
func (*EnvelopeCounters) AsSource ¶
func (e *EnvelopeCounters) AsSource() Source
AsSource adapts the counter to a Source.
func (*EnvelopeCounters) Observe ¶
func (e *EnvelopeCounters) Observe(t time.Time, aspect string)
Observe records one publish at time t for the named aspect.
func (*EnvelopeCounters) Snapshot ¶
func (e *EnvelopeCounters) Snapshot() EnvelopeStats
Snapshot returns the current EnvelopeStats.
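A 60-second rate window of this kind can be sketched as a list of publish timestamps that is pruned on read. The type below is a hypothetical stand-in, not the package's implementation. It takes the current time as an argument so the example is deterministic, it assumes observations arrive in non-decreasing time order, and it treats the per-aspect counts as cumulative, which this documentation does not specify.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// EnvelopeStats is a local copy of the documented struct without JSON tags.
type EnvelopeStats struct {
	RatePerSecond float64
	ByAspect      map[string]int64
}

// envelopeCounter is a hypothetical stand-in for EnvelopeCounters.
type envelopeCounter struct {
	mu       sync.Mutex
	window   time.Duration
	events   []time.Time // publish times, oldest first
	byAspect map[string]int64
}

func newEnvelopeCounter() *envelopeCounter {
	return &envelopeCounter{window: 60 * time.Second, byAspect: make(map[string]int64)}
}

// observe records one publish at time t for the named aspect.
func (e *envelopeCounter) observe(t time.Time, aspect string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.events = append(e.events, t)
	e.byAspect[aspect]++
}

// statsAt prunes events at or before now-window and reports the rate over
// the window together with a copy of the per-aspect counts.
func (e *envelopeCounter) statsAt(now time.Time) EnvelopeStats {
	e.mu.Lock()
	defer e.mu.Unlock()
	cutoff := now.Add(-e.window)
	for len(e.events) > 0 && !e.events[0].After(cutoff) {
		e.events = e.events[1:]
	}
	byAspect := make(map[string]int64, len(e.byAspect))
	for k, v := range e.byAspect {
		byAspect[k] = v
	}
	return EnvelopeStats{
		RatePerSecond: float64(len(e.events)) / e.window.Seconds(),
		ByAspect:      byAspect,
	}
}

// sec returns a fixed reference time plus n seconds.
func sec(n int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(n) * time.Second)
}

func main() {
	c := newEnvelopeCounter()
	for i := 1; i <= 30; i++ {
		c.observe(sec(i), "presence")
	}
	fmt.Println(c.statsAt(sec(60)).RatePerSecond)
}
```

Storing one timestamp per publish is simple but grows with traffic; a bucketed counter would bound memory at the cost of coarser resolution.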
type EnvelopeStats ¶
type EnvelopeStats struct {
RatePerSecond float64 `json:"rate_per_second"`
ByAspect map[string]int64 `json:"by_aspect,omitempty"`
}
EnvelopeStats is the publish-rate aggregate.
type FiringAlert ¶ added in v0.3.0
type FiringAlert struct {
Name string `json:"name"`
Severity Severity `json:"severity"`
Description string `json:"description,omitempty"`
}
FiringAlert is the per-snapshot record of a rule that evaluated true. It is the JSON-friendly subset of AlertRule (no Condition closure). The struct as shown carries no timestamp of its own; to correlate firings with dashboard time ranges, consumers can use the At field of the enclosing Snapshot.
func EvaluateAlerts ¶ added in v0.3.0
func EvaluateAlerts(snap Snapshot, rules []AlertRule) []FiringAlert
EvaluateAlerts walks rules and returns the FiringAlert list for the snapshot, sorted by Name for stable ordering. A nil rule list returns nil.
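The evaluation described above amounts to a filter followed by a sort. The sketch below re-creates it with local stand-in types so that it runs on its own. The function name evaluate, the two rule names, and their thresholds are hypothetical examples, and skipping rules with a nil Condition is a defensive assumption that this documentation does not state.

```go
package main

import (
	"fmt"
	"sort"
)

// Trimmed stand-ins for the documented types.
type Severity string

type Snapshot struct {
	History struct{ BufferUtilizationPct float64 }
	Cache   struct{ HitRatePct float64 }
}

type AlertRule struct {
	Name        string
	Severity    Severity
	Description string
	Condition   func(snap Snapshot) bool
}

type FiringAlert struct {
	Name        string
	Severity    Severity
	Description string
}

// evaluate (hypothetical) returns the rules that fire for snap, sorted by Name.
func evaluate(snap Snapshot, rules []AlertRule) []FiringAlert {
	if rules == nil {
		return nil
	}
	var out []FiringAlert
	for _, r := range rules {
		if r.Condition != nil && r.Condition(snap) {
			out = append(out, FiringAlert{Name: r.Name, Severity: r.Severity, Description: r.Description})
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

// exampleRules returns two illustrative rules with pure Condition closures.
func exampleRules() []AlertRule {
	return []AlertRule{
		{
			Name:      "history_buffer_high",
			Severity:  "critical",
			Condition: func(s Snapshot) bool { return s.History.BufferUtilizationPct > 90 },
		},
		{
			Name:      "cache_cold",
			Severity:  "warning",
			Condition: func(s Snapshot) bool { return s.Cache.HitRatePct < 50 },
		},
	}
}

func main() {
	var snap Snapshot
	snap.History.BufferUtilizationPct = 95
	snap.Cache.HitRatePct = 40
	for _, a := range evaluate(snap, exampleRules()) {
		fmt.Println(a.Name, a.Severity)
	}
}
```

Both closures only read from the snapshot they are given, which keeps them safe to call more than once per scrape window.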
type HistoryStats ¶
type HistoryStats struct {
TotalEnvelopesBuffered int64 `json:"total_envelopes_buffered"`
BufferUtilizationPct float64 `json:"buffer_utilization_pct"`
}
HistoryStats is the history-buffer aggregate.
type PresenceStats ¶
type PresenceStats struct {
TotalUsers int64 `json:"total_users"`
TotalAgents int64 `json:"total_agents"`
AveragePerChannel float64 `json:"average_per_channel"`
}
PresenceStats is the presence aggregate.
type Severity ¶ added in v0.3.0
type Severity string
Severity classifies an alert firing. The string values are stable and surface verbatim in JSON output, Prometheus labels, and operator dashboards.
const (
// SeverityInfo is informational; the condition is unusual but does
// not require operator action. Use for capacity warnings well
// below their hard ceiling.
SeverityInfo Severity = "info"
// SeverityWarning means the condition warrants investigation. The
// system is still serving traffic but a metric has crossed a
// threshold that historically precedes degradation.
SeverityWarning Severity = "warning"
// SeverityCritical means immediate action: the condition either
// already degrades user experience or will within minutes.
SeverityCritical Severity = "critical"
)
type Snapshot ¶
type Snapshot struct {
Channels ChannelStats `json:"channels"`
Envelopes EnvelopeStats `json:"envelopes"`
Presence PresenceStats `json:"presence"`
History HistoryStats `json:"history"`
Tokens TokenStats `json:"tokens"`
Cache CacheStats `json:"cache"`
At time.Time `json:"at"`
// Alerts lists the AlertRule(s) that evaluated true on this snapshot.
// Empty when no rules are configured or none fired. Populated by the
// Aggregator after summing sources so a rule can reference the
// aggregate (e.g. "total active > N").
Alerts []FiringAlert `json:"alerts,omitempty"`
}
Snapshot is the aggregated /parsec/metrics response shape.
type Source ¶
type Source interface {
Channels(ctx context.Context) ChannelStats
Envelopes(ctx context.Context) EnvelopeStats
Presence(ctx context.Context) PresenceStats
History(ctx context.Context) HistoryStats
Tokens(ctx context.Context) TokenStats
Cache(ctx context.Context) CacheStats
}
Source is the contract every telemetry input satisfies. Each method is invoked on every snapshot — if a metric is unavailable, return the zero value (the field will simply read as 0 in the JSON).
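A custom input that only knows about one metric can satisfy the contract by embedding a type that returns zero values everywhere and overriding a single method. The sketch below uses local copies of the interface and trimmed stats structs; zeroSource, presenceSource, and query are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local copies of the documented stats types.
type ChannelStats struct{ TotalActive int64 }
type EnvelopeStats struct{ RatePerSecond float64 }
type PresenceStats struct{ TotalUsers, TotalAgents int64 }
type HistoryStats struct{ TotalEnvelopesBuffered int64 }
type TokenStats struct{ ActiveCount int64 }
type CacheStats struct{ SizeBytes int64 }

// Source mirrors the documented interface.
type Source interface {
	Channels(ctx context.Context) ChannelStats
	Envelopes(ctx context.Context) EnvelopeStats
	Presence(ctx context.Context) PresenceStats
	History(ctx context.Context) HistoryStats
	Tokens(ctx context.Context) TokenStats
	Cache(ctx context.Context) CacheStats
}

// zeroSource (hypothetical) reports every metric as unavailable.
type zeroSource struct{}

func (zeroSource) Channels(context.Context) ChannelStats   { return ChannelStats{} }
func (zeroSource) Envelopes(context.Context) EnvelopeStats { return EnvelopeStats{} }
func (zeroSource) Presence(context.Context) PresenceStats  { return PresenceStats{} }
func (zeroSource) History(context.Context) HistoryStats    { return HistoryStats{} }
func (zeroSource) Tokens(context.Context) TokenStats       { return TokenStats{} }
func (zeroSource) Cache(context.Context) CacheStats        { return CacheStats{} }

// presenceSource (hypothetical) overrides only Presence.
type presenceSource struct {
	zeroSource
	users, agents int64
}

func (p presenceSource) Presence(context.Context) PresenceStats {
	return PresenceStats{TotalUsers: p.users, TotalAgents: p.agents}
}

// Compile-time check that the embedding satisfies the interface.
var _ Source = presenceSource{}

// query reads two of the six metrics from a source.
func query(s Source) (PresenceStats, TokenStats) {
	ctx := context.Background()
	return s.Presence(ctx), s.Tokens(ctx)
}

func main() {
	p, t := query(presenceSource{users: 4, agents: 2})
	fmt.Println(p.TotalUsers, p.TotalAgents, t.ActiveCount)
}
```

This is the same shape ChannelSource is described as having: one populated aggregate and zero values for the rest.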
type TokenCounters ¶
type TokenCounters struct {
// contains filtered or unexported fields
}
TokenCounters tracks token issuance over a sliding window. The token broker calls Issued / Revoked on each event; the source reads back hour-window aggregates.
func NewTokenCounters ¶
func NewTokenCounters() *TokenCounters
NewTokenCounters constructs a counter with a one-hour window.
func (*TokenCounters) AsSource ¶
func (c *TokenCounters) AsSource() Source
AsSource adapts the counter to a Source whose Tokens method returns the current Snapshot.
func (*TokenCounters) Expired ¶
func (c *TokenCounters) Expired()
Expired removes one token from the active set without recording a revocation.
func (*TokenCounters) Issued ¶
func (c *TokenCounters) Issued(t time.Time)
Issued records one issued token at time t.
func (*TokenCounters) Revoked ¶
func (c *TokenCounters) Revoked(t time.Time)
Revoked records one revoked token at time t.
func (*TokenCounters) Snapshot ¶
func (c *TokenCounters) Snapshot() TokenStats
Snapshot returns the current TokenStats.
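The interplay of Issued, Revoked, and Expired can be sketched as follows. The type is a hypothetical stand-in for TokenCounters. It assumes that a revocation also removes a token from the active set, that the active count never goes below zero, and that the current time is passed in explicitly for determinism; none of these details is confirmed by this documentation. For brevity the sketch never prunes old timestamps, which a long-lived counter would need to do.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TokenStats is a local copy of the documented struct without JSON tags.
type TokenStats struct {
	IssuedLastHour  int64
	RevokedLastHour int64
	ActiveCount     int64
}

// tokenCounter is a hypothetical stand-in for TokenCounters.
type tokenCounter struct {
	mu      sync.Mutex
	window  time.Duration
	issued  []time.Time
	revoked []time.Time
	active  int64
}

func newTokenCounter() *tokenCounter {
	return &tokenCounter{window: time.Hour}
}

// issuedAt records one issued token and grows the active set.
func (c *tokenCounter) issuedAt(t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.issued = append(c.issued, t)
	c.active++
}

// revokedAt records a revocation and shrinks the active set (assumption).
func (c *tokenCounter) revokedAt(t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.revoked = append(c.revoked, t)
	c.dropActive()
}

// expired shrinks the active set without recording a revocation.
func (c *tokenCounter) expired() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.dropActive()
}

// dropActive decrements the active count, clamping at zero. Callers hold mu.
func (c *tokenCounter) dropActive() {
	if c.active > 0 {
		c.active--
	}
}

// countAfter counts the timestamps strictly after cutoff.
func countAfter(ts []time.Time, cutoff time.Time) int64 {
	var n int64
	for _, t := range ts {
		if t.After(cutoff) {
			n++
		}
	}
	return n
}

// statsAt reports the hour-window aggregates as of now.
func (c *tokenCounter) statsAt(now time.Time) TokenStats {
	c.mu.Lock()
	defer c.mu.Unlock()
	cutoff := now.Add(-c.window)
	return TokenStats{
		IssuedLastHour:  countAfter(c.issued, cutoff),
		RevokedLastHour: countAfter(c.revoked, cutoff),
		ActiveCount:     c.active,
	}
}

// minute returns a fixed reference time plus n minutes.
func minute(n int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(n) * time.Minute)
}

func main() {
	c := newTokenCounter()
	c.issuedAt(minute(0))
	c.issuedAt(minute(10))
	c.issuedAt(minute(50))
	c.revokedAt(minute(20))
	c.expired()
	fmt.Printf("%+v\n", c.statsAt(minute(65)))
}
```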
type TokenStats ¶
type TokenStats struct {
IssuedLastHour int64 `json:"issued_last_hour"`
RevokedLastHour int64 `json:"revoked_last_hour"`
ActiveCount int64 `json:"active_count"`
}
TokenStats is the token issuance aggregate.