telemetry

package
v0.1.0-beta.9
Warning

This package is not in the latest version of its module.

Published: May 9, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package telemetry implements opt-in disk persistence for the three signal types gridctl already captures in memory: logs, metrics, and traces. All writers are async with respect to the request path — a failed disk write logs an error and continues so a telemetry persistence fault never breaks an MCP call.


Constants

const DefaultMetricsFlushInterval = 60 * time.Second

DefaultMetricsFlushInterval is the period at which the metrics flusher snapshots the accumulator and appends a per-server diff line. 60s matches the in-memory bucket granularity (1-minute buckets in metrics.Accumulator).

Variables

This section is empty.

Functions

func IsValidSignal

func IsValidSignal(sig string) bool

IsValidSignal reports whether sig names a known telemetry signal. Used by the DELETE handler to reject invalid query parameters before touching disk.

func Wipe

func Wipe(stackName, serverName, signal string) error

Wipe deletes telemetry files matching the requested scope. Empty serverName or signal acts as a wildcard. Both the active <sig>.jsonl and any rotated lumberjack siblings are removed.

Scope combinations:

serverName="", signal=""      → everything for this stack
serverName="X", signal=""     → all signals for server X
serverName="", signal="logs"  → logs across all servers in the stack
serverName="X", signal="logs" → logs for server X only

The operation is wrapped in state.WithLock so a wipe and a daemon reload/seed cannot interleave on the same stack file. Lumberjack keeps its active file open across rotations; on POSIX, removing the file while the writer holds it is safe (unlink-on-close), and lumberjack will transparently reopen on the next emit.

A missing stack directory is treated as a successful no-op; signal validation (against the known {logs, metrics, traces}) is the caller's responsibility.

Types

type InventoryRecord

type InventoryRecord struct {
	Server     string    `json:"server"`
	Signal     string    `json:"signal"`
	Path       string    `json:"path"`
	SizeBytes  int64     `json:"sizeBytes"`
	OldestTime time.Time `json:"oldestTime"`
	NewestTime time.Time `json:"newestTime"`
	FileCount  int       `json:"fileCount"`
}

InventoryRecord describes the on-disk footprint of a single (server, signal) pair. Sizes and times aggregate the active jsonl plus any rotated/compressed lumberjack siblings (e.g. `logs.jsonl`, `logs-2026-04-30T12-00-00.000.jsonl`, `logs-…jsonl.gz`). Records are only emitted when at least one matching file exists so the wipe modal can drive its enumeration without filtering empties.

func Inventory

func Inventory(stackName, serverName string) ([]InventoryRecord, error)

Inventory walks ~/.gridctl/telemetry/<stackName>/ and returns one record per (server, signal) pair where at least one file exists. When serverName is non-empty only that server's records are returned.

Records are sorted by server name then by the canonical signal order (logs, metrics, traces) so the API response is deterministic. A missing stack directory returns an empty slice without error — the daemon may legitimately have no persisted telemetry yet.

type LogOpts

type LogOpts struct {
	MaxSizeMB  int
	MaxBackups int
	MaxAgeDays int
}

LogOpts mirrors lumberjack rotation knobs and matches RetentionConfig in pkg/config. Zero values fall back to lumberjack defaults via logging.NewFileHandler.

type LogRouter

type LogRouter struct {
	// contains filtered or unexported fields
}

LogRouter is a slog.Handler that fans every record out to the inner handler (the existing buffer/redact chain that powers the UI) AND, when a record's resolved server attribute matches a configured server, to a per-server lumberjack file. The routing key is `component` when present and `server` otherwise — gridctl's older subsystems (e.g. pkg/mcp/gateway) tag per-server loggers with `.With("server", name)`, while newer code uses `.With("component", name)`. Both shapes route correctly; `component` wins when both are present in the same logger view.

Records without a routing-key match pass through to the inner handler only.

func NewLogRouter

func NewLogRouter(inner slog.Handler) *LogRouter

NewLogRouter wraps an existing slog.Handler chain. The router itself is a slog.Handler — install it in place of the existing chain to enable per-server fan-out. inner must not be nil; pass the existing redact/buffer chain that powers the in-memory ring buffer.

func (*LogRouter) AddServer

func (r *LogRouter) AddServer(name, path string, opts LogOpts) error

AddServer registers a per-server file handler. Subsequent records with a resolved component=name will be appended to path. AddServer is idempotent: calling it twice for the same server replaces the previous handler (the underlying lumberjack file fd persists until process exit). A failure to open the file is returned to the caller; gateway_builder logs and proceeds.

func (*LogRouter) Close

func (r *LogRouter) Close()

Close drops every per-server writer entry. The on-disk file fds remain open until process exit (lumberjack-owned). The router itself remains usable; records continue to flow to the inner handler.

func (*LogRouter) ConfiguredServers

func (r *LogRouter) ConfiguredServers() []string

ConfiguredServers returns the names currently persisting logs.

func (*LogRouter) Enabled

func (r *LogRouter) Enabled(ctx context.Context, level slog.Level) bool

Enabled implements slog.Handler. Returns true if either inner or any per-server handler is enabled at this level — keeps log emission cheap when everything is disabled.

func (*LogRouter) Handle

func (r *LogRouter) Handle(ctx context.Context, record slog.Record) error

Handle implements slog.Handler. Always forwards to inner; additionally forwards to the per-server file handler when the resolved component (from either accumulated handler-level attrs or this record's own attrs) matches a configured server. Errors from the per-server write are reported via SelfLogger and swallowed.

func (*LogRouter) RemoveServer

func (r *LogRouter) RemoveServer(name string)

RemoveServer stops persisting logs for a server. Safe to call for an unregistered server. The on-disk file fd is not closed here — lumberjack owns it, and POSIX append semantics make explicit close unnecessary.

func (*LogRouter) SetSelfLogger

func (r *LogRouter) SetSelfLogger(logger *slog.Logger)

SetSelfLogger configures where the router itself logs persistence errors. Pass a logger backed by the in-memory buffer so users see write failures surface in the UI. The setting propagates to all derived router views.

func (*LogRouter) WithAttrs

func (r *LogRouter) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs implements slog.Handler. Returns a derived router that carries the additional attrs alongside the inner-with-attrs handler. The shared per-server map is unchanged. Both `component` and `server` are recognized as routing keys; `component` takes precedence when both appear in the same attrs slice.

func (*LogRouter) WithGroup

func (r *LogRouter) WithGroup(name string) slog.Handler

WithGroup implements slog.Handler. Group nesting does not affect component resolution — component is conventionally a top-level attr.

type MetricsFlusher

type MetricsFlusher struct {
	// contains filtered or unexported fields
}

MetricsFlusher periodically serializes per-server token counters from a metrics.Accumulator and appends one NDJSON line per server with non-zero deltas. Single goroutine; one-shot Start/Stop pair (re-Starting after Stop is a no-op). Failed writes are logged via the self logger and do not crash the goroutine.

func NewMetricsFlusher

func NewMetricsFlusher(acc *metrics.Accumulator, interval time.Duration) *MetricsFlusher

NewMetricsFlusher creates a flusher with the given accumulator and per-flush interval. interval <= 0 falls back to DefaultMetricsFlushInterval.

func (*MetricsFlusher) AddServer

func (f *MetricsFlusher) AddServer(name, path string, opts LogOpts) error

AddServer registers a per-server output file. Idempotent: re-adding a server replaces the prior writer (the lumberjack handle is closed). The previous-snapshot tracking is preserved so re-adding does not synthesize a reset.

func (*MetricsFlusher) ConfiguredServers

func (f *MetricsFlusher) ConfiguredServers() []string

ConfiguredServers returns the names currently persisting metrics.

func (*MetricsFlusher) RemoveServer

func (f *MetricsFlusher) RemoveServer(name string)

RemoveServer stops persisting metrics for a server and closes its writer. The previous-snapshot tracking is dropped so re-adding produces a fresh reset line as the first entry.

func (*MetricsFlusher) SeedFromFile

func (f *MetricsFlusher) SeedFromFile(path string, n int) error

SeedFromFile reads up to the last n NDJSON entries from path and seeds four surfaces atomically: cumulative per-server token totals (via Restore), cumulative per-server cost totals (via RestoreCost), per-minute time-series ring buckets — both tokens and cost — (via ReplaySnapshot), and this flusher's previous-snapshot maps (prev + prevCost). The Token Usage Over Time and Cost Over Time charts are backed by the time-series ring; without the bucket replay each would show only a single post-restart point. The Cost KPI card is backed by the cumulative atomics; without RestoreCost it would silently read $0 even when pre-restart cost was non-zero.

On-disk format mirrors flushOnce's output: full MetricsSnapshotLine entries plus lighter reset sentinels ({reset, ts, server} only). Reset sentinels parse with a zero Total and are immediately followed by a full reset line whose Total carries the post-reset state — so taking the most recent Total per server yields the correct seed value in either case.

For time-series, only non-reset lines are replayed: a Reset line's Diff carries the carry-over from prior sessions (full snapshot), not a single minute's activity, so replaying it would create a synthetic spike at the reset boundary. The same skip applies to cost replay.

Legacy files predating cost persistence have no CostDiff / CostTotal fields; they unmarshal with nil pointers, the cost diff sums to zero, the cost replay no-ops, and RestoreCost is invoked with an empty map (which itself no-ops). Token state restores normally — the file remains fully readable with no warning.

Missing or empty files return nil (expected on first run with persistence enabled). Malformed lines are skipped without aborting; a single corrupt line should not lose the rest of the history.

func (*MetricsFlusher) SetLogger

func (f *MetricsFlusher) SetLogger(logger *slog.Logger)

SetLogger configures where flush errors are logged. Pass a logger backed by the in-memory buffer so users see write failures in the UI.

func (*MetricsFlusher) Start

func (f *MetricsFlusher) Start()

Start launches the flush goroutine. Safe to call once; subsequent calls are no-ops. The goroutine runs until Stop is called.

func (*MetricsFlusher) Stop

func (f *MetricsFlusher) Stop()

Stop signals the flush goroutine to exit and waits for it to drain — one final flush is performed before exit so the on-disk file reflects the last in-memory state. Safe to call multiple times concurrently; the stop-channel close is sync.Once-guarded so racing Stop() calls don't panic with a "close of closed channel".

type MetricsSnapshotLine

type MetricsSnapshotLine struct {
	Time      time.Time                   `json:"ts"`
	Server    string                      `json:"server"`
	Reset     bool                        `json:"reset,omitempty"`
	CostReset bool                        `json:"cost_reset,omitempty"`
	Diff      metrics.TokenCounts         `json:"diff"`
	Total     metrics.TokenCounts         `json:"total"`
	CostDiff  *metrics.CostMicroUSDCounts `json:"cost_diff,omitempty"`
	CostTotal *metrics.CostMicroUSDCounts `json:"cost_total,omitempty"`
}

MetricsSnapshotLine is the on-disk schema for one NDJSON entry in metrics.jsonl. Time, Server, and Diff are populated for every line; Reset is true on the first line written after a token counter reset (server restart, Accumulator.Clear) and the diff for that line is the *full* snapshot. CostReset signals an independent cost-side reset (e.g. Accumulator.ClearCost between flushes) — the two flags are independent so a cost-only clear does not invalidate the token diff on the same line. Cost fields are pointer + omitempty so token-only minutes emit lines byte-identical to the pre-cost-persistence schema.

type TracesFileClient

type TracesFileClient struct {
	// contains filtered or unexported fields
}

TracesFileClient implements otlptrace.Client by writing each batch as one OTLP-JSON line per configured server. The OTel SDK calls UploadTraces with a slice of *tracepb.ResourceSpans already converted from ReadOnlySpan, so this avoids re-implementing the SDK's transform code.

Each emitted line is a fully valid TracesData envelope, matching the OTLP File Exporter spec — `tail -f traces.jsonl | otelcol --config ... otlpjsonfilereceiver` ingests cleanly.

Spans without a resolvable server.name attribute (rare; the gateway stamps it on every tool-call span) are dropped. This is documented as an accepted Phase-2 limitation: per-server file routing is what users want, and a "miscellaneous" file would muddy the inventory model.

func NewTracesFileClient

func NewTracesFileClient() *TracesFileClient

NewTracesFileClient constructs an empty client. Add servers via AddServer before passing the client to otlptrace.NewUnstarted.

func (*TracesFileClient) AddServer

func (c *TracesFileClient) AddServer(name, path string, opts LogOpts) error

AddServer registers a per-server traces.jsonl file. Idempotent: re-adding a server replaces the prior writer (the lumberjack handle is closed).

func (*TracesFileClient) ConfiguredServers

func (c *TracesFileClient) ConfiguredServers() []string

ConfiguredServers returns the names currently persisting traces.

func (*TracesFileClient) RemoveServer

func (c *TracesFileClient) RemoveServer(name string)

RemoveServer stops persisting traces for a server and closes its writer.

func (*TracesFileClient) SetLogger

func (c *TracesFileClient) SetLogger(logger *slog.Logger)

SetLogger configures where the client logs persistence errors.

func (*TracesFileClient) Start

func (c *TracesFileClient) Start(_ context.Context) error

Start implements otlptrace.Client. No connection setup required for files.

func (*TracesFileClient) Stop

func (c *TracesFileClient) Stop(_ context.Context) error

Stop implements otlptrace.Client. Closes every per-server writer. Safe to call multiple times.

func (*TracesFileClient) UploadTraces

func (c *TracesFileClient) UploadTraces(_ context.Context, resourceSpans []*tracepb.ResourceSpans) error

UploadTraces implements otlptrace.Client. Re-routes spans by server.name attribute and writes one TracesData envelope per server per call. The per-server writer map is snapshotted under the lock; disk I/O happens outside the lock so a slow writer can't block AddServer/RemoveServer/Stop. Errors from individual writers are logged and swallowed — the SDK's batch processor must not stall because a disk write failed.
