telemetry

package
v0.1.0
Published: May 27, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package telemetry exposes the aggregated /parsec/metrics view that the upgrade spec calls for. It is the operator-facing snapshot of channel health, envelope rates, presence, history utilization, token issuance, and cache hit rates — pulled from every running Parsec component and rendered in one JSON response.

The package is a small aggregator, not a metrics store. Each Source is plugged in at construction; the snapshot is computed on demand. For Prometheus exposition the existing internal/metrics registry stays the source of truth — this surface is a higher-level dashboard view.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator struct {
	Sources []Source
}

Aggregator composes one or more Sources into a single Snapshot.

func New

func New(sources ...Source) *Aggregator

New constructs an Aggregator over the supplied sources.

func (*Aggregator) Handler

func (a *Aggregator) Handler() http.Handler

Handler returns an http.Handler that serves the snapshot as JSON. The intended mount path is /parsec/metrics.

func (*Aggregator) Snapshot

func (a *Aggregator) Snapshot(ctx context.Context) Snapshot

Snapshot computes the current aggregate. Sources are queried in sequence and their stats are summed field by field, except that rates and utilization percentages are averaged across sources.
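The sum-versus-average rule can be illustrated for the cache fields. This is a sketch under stated assumptions: mergeCache is a hypothetical helper, CacheStats is redeclared locally so the block compiles on its own, and the average is taken over every input, which may differ from how the package treats sources that report zero values.

```go
package main

import "fmt"

// CacheStats mirrors the documented struct; it is redeclared here so the
// sketch is self-contained.
type CacheStats struct {
	HitRatePct        float64
	SizeBytes         int64
	SizeEntries       int64
	EvictionsLastHour int64
}

// mergeCache is a hypothetical helper: counters are summed, and the
// hit-rate percentage is averaged over the number of inputs.
func mergeCache(parts []CacheStats) CacheStats {
	var out CacheStats
	if len(parts) == 0 {
		return out
	}
	for _, p := range parts {
		out.SizeBytes += p.SizeBytes
		out.SizeEntries += p.SizeEntries
		out.EvictionsLastHour += p.EvictionsLastHour
		out.HitRatePct += p.HitRatePct // accumulated here, divided below
	}
	out.HitRatePct /= float64(len(parts))
	return out
}

func main() {
	merged := mergeCache([]CacheStats{
		{HitRatePct: 90, SizeBytes: 1024, EvictionsLastHour: 3},
		{HitRatePct: 70, SizeBytes: 2048, EvictionsLastHour: 1},
	})
	fmt.Printf("%+v\n", merged)
}
```

Averaging over all inputs means a source that returns a zero CacheStats pulls the percentage down; whether the real Aggregator excludes such sources is not stated in this documentation.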

type CacheSource

type CacheSource struct {
	Stats func() cacheStats
	// contains filtered or unexported fields
}

CacheSource implements Source against a CacheStatter. The EvictionsLastHour window is approximate: the value is the difference between the current eviction counter and the value observed an hour ago.

func NewCacheSource

func NewCacheSource(stats func() cacheStats) *CacheSource

NewCacheSource constructs a CacheSource over the supplied stat func.

func (*CacheSource) Cache

func (*CacheSource) Channels

func (s *CacheSource) Channels(context.Context) ChannelStats

func (*CacheSource) Envelopes

func (s *CacheSource) Envelopes(context.Context) EnvelopeStats

func (*CacheSource) History

func (*CacheSource) Presence

func (*CacheSource) Tokens

type CacheStats

type CacheStats struct {
	HitRatePct        float64 `json:"hit_rate_pct"`
	SizeBytes         int64   `json:"size_bytes"`
	SizeEntries       int64   `json:"size_entries,omitempty"`
	EvictionsLastHour int64   `json:"evictions_last_hour"`
}

CacheStats is the cache aggregate.

type CacheStatter

type CacheStatter interface {
	Stats() cacheStats
}

CacheStatter is satisfied by both cache.MemoryCache and cache.RedisCache.

type ChannelEntry

type ChannelEntry struct {
	Name    string
	Pattern string
}

ChannelEntry is the minimal shape ChannelSource consumes.

type ChannelLister

type ChannelLister interface {
	List() []ChannelEntry
}

ChannelLister returns the current set of channels for ChannelSource.

type ChannelSource

type ChannelSource struct{ Lister ChannelLister }

ChannelSource is a Source that reports the channel aggregate from a ChannelLister. The other source methods return zero values.
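A plausible shape for the channel aggregate is sketched below. It assumes that every entry returned by List counts as active and that ByPattern is a per-Pattern tally; staticLister and channelStats are hypothetical names, and the types are redeclared locally so the block stands alone.

```go
package main

import "fmt"

// ChannelEntry and ChannelStats mirror the documented types.
type ChannelEntry struct {
	Name    string
	Pattern string
}

type ChannelStats struct {
	TotalActive int64
	ByPattern   map[string]int64
}

// staticLister is a hypothetical ChannelLister backed by a fixed slice.
type staticLister []ChannelEntry

func (s staticLister) List() []ChannelEntry { return s }

// channelStats is an assumed aggregation: each listed channel counts once
// toward TotalActive and once toward its pattern's bucket.
func channelStats(l interface{ List() []ChannelEntry }) ChannelStats {
	stats := ChannelStats{ByPattern: make(map[string]int64)}
	for _, e := range l.List() {
		stats.TotalActive++
		stats.ByPattern[e.Pattern]++
	}
	return stats
}

func main() {
	stats := channelStats(staticLister{
		{Name: "room.1", Pattern: "room.*"},
		{Name: "room.2", Pattern: "room.*"},
		{Name: "alerts", Pattern: "alerts"},
	})
	fmt.Println(stats.TotalActive, stats.ByPattern["room.*"], stats.ByPattern["alerts"])
}
```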

func (ChannelSource) Cache

func (ChannelSource) Channels

func (c ChannelSource) Channels(_ context.Context) ChannelStats

func (ChannelSource) Envelopes

func (ChannelSource) History

func (ChannelSource) Presence

func (ChannelSource) Tokens

type ChannelStats

type ChannelStats struct {
	TotalActive int64            `json:"total_active"`
	ByPattern   map[string]int64 `json:"by_pattern,omitempty"`
}

ChannelStats is the per-pattern aggregate.

type EnvelopeCounters

type EnvelopeCounters struct {
	// contains filtered or unexported fields
}

EnvelopeCounters tracks the publish rate and per-aspect counts.

func NewEnvelopeCounters

func NewEnvelopeCounters() *EnvelopeCounters

NewEnvelopeCounters constructs a counter with a 60-second rate window.

func (*EnvelopeCounters) AsSource

func (e *EnvelopeCounters) AsSource() Source

AsSource adapts the counter to a Source.

func (*EnvelopeCounters) Observe

func (e *EnvelopeCounters) Observe(t time.Time, aspect string)

Observe records one publish at time t for the named aspect.

func (*EnvelopeCounters) Snapshot

func (e *EnvelopeCounters) Snapshot() EnvelopeStats

Snapshot returns the current EnvelopeStats.

type EnvelopeStats

type EnvelopeStats struct {
	RatePerSecond float64          `json:"rate_per_second"`
	ByAspect      map[string]int64 `json:"by_aspect,omitempty"`
}

EnvelopeStats is the publish-rate aggregate.

type HistoryStats

type HistoryStats struct {
	TotalEnvelopesBuffered int64   `json:"total_envelopes_buffered"`
	BufferUtilizationPct   float64 `json:"buffer_utilization_pct"`
}

HistoryStats is the history-buffer aggregate.

type PresenceStats

type PresenceStats struct {
	TotalUsers        int64   `json:"total_users"`
	TotalAgents       int64   `json:"total_agents"`
	AveragePerChannel float64 `json:"average_per_channel"`
}

PresenceStats is the presence aggregate.

type Snapshot

type Snapshot struct {
	Channels  ChannelStats  `json:"channels"`
	Envelopes EnvelopeStats `json:"envelopes"`
	Presence  PresenceStats `json:"presence"`
	History   HistoryStats  `json:"history"`
	Tokens    TokenStats    `json:"tokens"`
	Cache     CacheStats    `json:"cache"`
	At        time.Time     `json:"at"`
}

Snapshot is the aggregated /parsec/metrics response shape.

type Source

type Source interface {
	Channels(ctx context.Context) ChannelStats
	Envelopes(ctx context.Context) EnvelopeStats
	Presence(ctx context.Context) PresenceStats
	History(ctx context.Context) HistoryStats
	Tokens(ctx context.Context) TokenStats
	Cache(ctx context.Context) CacheStats
}

Source is the contract every telemetry input satisfies. Each method is invoked on every snapshot. If a metric is unavailable, return the zero value; the field then reads as 0 in the JSON, or is omitted where its struct tag specifies omitempty.

type TokenCounters

type TokenCounters struct {
	// contains filtered or unexported fields
}

TokenCounters tracks token issuance over a sliding window. The token broker calls Issued or Revoked on each event; the source reads back aggregates over the one-hour window.

func NewTokenCounters

func NewTokenCounters() *TokenCounters

NewTokenCounters constructs a counter with a one-hour window.

func (*TokenCounters) AsSource

func (c *TokenCounters) AsSource() Source

AsSource adapts the counter to a Source whose Tokens method returns the result of Snapshot.

func (*TokenCounters) Expired

func (c *TokenCounters) Expired()

Expired drops one from the active set without recording a revocation.

func (*TokenCounters) Issued

func (c *TokenCounters) Issued(t time.Time)

Issued records one issued token at time t.

func (*TokenCounters) Revoked

func (c *TokenCounters) Revoked(t time.Time)

Revoked records one revoked token at time t.

func (*TokenCounters) Snapshot

func (c *TokenCounters) Snapshot() TokenStats

Snapshot returns the current TokenStats.

type TokenStats

type TokenStats struct {
	IssuedLastHour  int64 `json:"issued_last_hour"`
	RevokedLastHour int64 `json:"revoked_last_hour"`
	ActiveCount     int64 `json:"active_count"`
}

TokenStats is the token issuance aggregate.
