Documentation ¶
Overview ¶
Package telemetry implements opt-in disk persistence for the three signal types gridctl already captures in memory: logs, metrics, and traces. All writers are async with respect to the request path — a failed disk write logs an error and continues so a telemetry persistence fault never breaks an MCP call.
Index ¶
- Constants
- func IsValidSignal(sig string) bool
- func Wipe(stackName, serverName, signal string) error
- type InventoryRecord
- type LogOpts
- type LogRouter
- func (r *LogRouter) AddServer(name, path string, opts LogOpts) error
- func (r *LogRouter) Close()
- func (r *LogRouter) ConfiguredServers() []string
- func (r *LogRouter) Enabled(ctx context.Context, level slog.Level) bool
- func (r *LogRouter) Handle(ctx context.Context, record slog.Record) error
- func (r *LogRouter) RemoveServer(name string)
- func (r *LogRouter) SetSelfLogger(logger *slog.Logger)
- func (r *LogRouter) WithAttrs(attrs []slog.Attr) slog.Handler
- func (r *LogRouter) WithGroup(name string) slog.Handler
- type MetricsFlusher
- func (f *MetricsFlusher) AddServer(name, path string, opts LogOpts) error
- func (f *MetricsFlusher) ConfiguredServers() []string
- func (f *MetricsFlusher) RemoveServer(name string)
- func (f *MetricsFlusher) SeedFromFile(path string, n int) error
- func (f *MetricsFlusher) SetLogger(logger *slog.Logger)
- func (f *MetricsFlusher) Start()
- func (f *MetricsFlusher) Stop()
- type MetricsSnapshotLine
- type TracesFileClient
- func (c *TracesFileClient) AddServer(name, path string, opts LogOpts) error
- func (c *TracesFileClient) ConfiguredServers() []string
- func (c *TracesFileClient) RemoveServer(name string)
- func (c *TracesFileClient) SetLogger(logger *slog.Logger)
- func (c *TracesFileClient) Start(_ context.Context) error
- func (c *TracesFileClient) Stop(_ context.Context) error
- func (c *TracesFileClient) UploadTraces(_ context.Context, resourceSpans []*tracepb.ResourceSpans) error
Constants ¶
const DefaultMetricsFlushInterval = 60 * time.Second
DefaultMetricsFlushInterval is the period at which the metrics flusher snapshots the accumulator and appends a per-server diff line. 60s matches the in-memory bucket granularity (1-minute buckets in metrics.Accumulator).
Variables ¶
This section is empty.
Functions ¶
func IsValidSignal ¶
func IsValidSignal(sig string) bool
IsValidSignal reports whether sig names a known telemetry signal. Used by the DELETE handler to reject invalid query parameters before touching disk.
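The documented contract (exactly the three known signal names are valid) can be sketched as below. This is an illustrative re-implementation, not the package's actual code:

```go
package main

import "fmt"

// isValidSignal mirrors the documented behavior of telemetry.IsValidSignal:
// only the three known signal names are accepted. Hypothetical sketch.
func isValidSignal(sig string) bool {
	switch sig {
	case "logs", "metrics", "traces":
		return true
	}
	return false
}

func main() {
	fmt.Println(isValidSignal("logs"))  // true
	fmt.Println(isValidSignal("spans")) // false
}
```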
func Wipe ¶
func Wipe(stackName, serverName, signal string) error
Wipe deletes telemetry files matching the requested scope. Empty serverName or signal acts as a wildcard. Both the active <sig>.jsonl and any rotated lumberjack siblings are removed.
Scope combinations:
serverName="", signal=""      → everything for this stack
serverName="X", signal=""     → all signals for server X
serverName="", signal="logs"  → logs across all servers in the stack
serverName="X", signal="logs" → logs for server X only
The operation is wrapped in state.WithLock so a wipe and a daemon reload/seed cannot interleave on the same stack file. Lumberjack keeps its active file open across rotations; on POSIX, removing the file while the writer holds it is safe (the unlinked inode persists until the last open fd closes), and lumberjack will transparently reopen on the next emit.
A missing stack directory is treated as a successful no-op; signal validation (against the known {logs, metrics, traces}) is the caller's responsibility.
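The wildcard scope rules above reduce to a small matcher. The helper below is hypothetical, written only to illustrate the documented semantics:

```go
package main

import "fmt"

// matchesScope reports whether an on-disk (server, signal) pair falls inside
// the requested wipe scope. Empty serverName or signal acts as a wildcard,
// matching the scope table documented for Wipe. Illustrative only.
func matchesScope(fileServer, fileSignal, serverName, signal string) bool {
	serverOK := serverName == "" || fileServer == serverName
	signalOK := signal == "" || fileSignal == signal
	return serverOK && signalOK
}

func main() {
	// serverName="", signal="logs" → logs across all servers.
	fmt.Println(matchesScope("api", "logs", "", "logs"))    // true
	fmt.Println(matchesScope("api", "metrics", "", "logs")) // false
	// serverName="", signal="" → everything for this stack.
	fmt.Println(matchesScope("db", "traces", "", "")) // true
}
```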
Types ¶
type InventoryRecord ¶
type InventoryRecord struct {
Server string `json:"server"`
Signal string `json:"signal"`
Path string `json:"path"`
SizeBytes int64 `json:"sizeBytes"`
OldestTime time.Time `json:"oldestTime"`
NewestTime time.Time `json:"newestTime"`
FileCount int `json:"fileCount"`
}
InventoryRecord describes the on-disk footprint of a single (server, signal) pair. Sizes and times aggregate the active jsonl plus any rotated/compressed lumberjack siblings (e.g. `logs.jsonl`, `logs-2026-04-30T12-00-00.000.jsonl`, `logs-…jsonl.gz`). Records are only emitted when at least one matching file exists so the wipe modal can drive its enumeration without filtering empties.
func Inventory ¶
func Inventory(stackName, serverName string) ([]InventoryRecord, error)
Inventory walks ~/.gridctl/telemetry/<stackName>/ and returns one record per (server, signal) pair where at least one file exists. When serverName is non-empty only that server's records are returned.
Records are sorted by server name then by the canonical signal order (logs, metrics, traces) so the API response is deterministic. A missing stack directory returns an empty slice without error — the daemon may legitimately have no persisted telemetry yet.
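The deterministic ordering (server name first, then the canonical signal order) can be sketched with a comparator over simplified record types. The `record` type and `sortRecords` helper here are hypothetical stand-ins, not the package's actual code:

```go
package main

import (
	"fmt"
	"sort"
)

// signalRank encodes the canonical signal order documented for Inventory.
var signalRank = map[string]int{"logs": 0, "metrics": 1, "traces": 2}

// record is a simplified stand-in for InventoryRecord.
type record struct{ Server, Signal string }

// sortRecords orders records by server name, then canonical signal order,
// so the API response is deterministic. Illustrative sketch only.
func sortRecords(recs []record) {
	sort.Slice(recs, func(i, j int) bool {
		if recs[i].Server != recs[j].Server {
			return recs[i].Server < recs[j].Server
		}
		return signalRank[recs[i].Signal] < signalRank[recs[j].Signal]
	})
}

func main() {
	recs := []record{{"b", "logs"}, {"a", "traces"}, {"a", "logs"}}
	sortRecords(recs)
	fmt.Println(recs) // [{a logs} {a traces} {b logs}]
}
```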
type LogOpts ¶
LogOpts mirrors lumberjack rotation knobs and matches RetentionConfig in pkg/config. Zero values fall back to lumberjack defaults via logging.NewFileHandler.
type LogRouter ¶
type LogRouter struct {
// contains filtered or unexported fields
}
LogRouter is a slog.Handler that fans every record out to the inner handler (the existing buffer/redact chain that powers the UI) AND, when a record's resolved server attribute matches a configured server, to a per-server lumberjack file. The routing key is `component` when present and `server` otherwise — gridctl's older subsystems (e.g. pkg/mcp/gateway) tag per-server loggers with `.With("server", name)`, while newer code uses `.With("component", name)`. Both shapes route correctly; `component` wins when both are present in the same logger view.
Records without a routing-key match pass through to the inner handler only.
func NewLogRouter ¶
NewLogRouter wraps an existing slog.Handler chain. The router itself is a slog.Handler — install it in place of the existing chain to enable per-server fan-out. inner must not be nil; pass the existing redact/buffer chain that powers the in-memory ring buffer.
func (*LogRouter) AddServer ¶
func (r *LogRouter) AddServer(name, path string, opts LogOpts) error
AddServer registers a per-server file handler. Subsequent records whose resolved routing key matches name will be appended to path. AddServer is idempotent: calling it twice for the same server replaces the previous handler (the underlying lumberjack file fd persists until process exit). A failure to open the file is returned to the caller; gateway_builder logs and proceeds.
func (*LogRouter) Close ¶
func (r *LogRouter) Close()
Close drops every per-server writer entry. The on-disk file fds remain open until process exit (lumberjack-owned). The router itself remains usable; records continue to flow to the inner handler.
func (*LogRouter) ConfiguredServers ¶
func (r *LogRouter) ConfiguredServers() []string
ConfiguredServers returns the names currently persisting logs.
func (*LogRouter) Enabled ¶
func (r *LogRouter) Enabled(ctx context.Context, level slog.Level) bool
Enabled implements slog.Handler. Returns true if either inner or any per-server handler is enabled at this level — keeps log emission cheap when everything is disabled.
func (*LogRouter) Handle ¶
func (r *LogRouter) Handle(ctx context.Context, record slog.Record) error
Handle implements slog.Handler. Always forwards to inner; additionally forwards to the per-server file handler when the resolved routing key (from either accumulated handler-level attrs or this record's own attrs) matches a configured server. Errors from the per-server write are reported via the self logger and swallowed.
func (*LogRouter) RemoveServer ¶
func (r *LogRouter) RemoveServer(name string)
RemoveServer stops persisting logs for a server. Safe to call for an unregistered server. The on-disk file fd is not closed here — lumberjack owns it, and POSIX append semantics make explicit close unnecessary.
func (*LogRouter) SetSelfLogger ¶
func (r *LogRouter) SetSelfLogger(logger *slog.Logger)
SetSelfLogger configures where the router itself logs persistence errors. Pass a logger backed by the in-memory buffer so users see write failures surface in the UI. The setting propagates to all derived router views.
func (*LogRouter) WithAttrs ¶
func (r *LogRouter) WithAttrs(attrs []slog.Attr) slog.Handler
WithAttrs implements slog.Handler. Returns a derived router that carries the additional attrs alongside the inner-with-attrs handler. The shared per-server map is unchanged. Both `component` and `server` are recognized as routing keys; `component` takes precedence when both appear in the same attrs slice.
type MetricsFlusher ¶
type MetricsFlusher struct {
// contains filtered or unexported fields
}
MetricsFlusher periodically serializes per-server token counters from a metrics.Accumulator and appends one NDJSON line per server with non-zero deltas. Single goroutine; one-shot Start/Stop pair (re-Starting after Stop is a no-op). Failed writes are logged via the self logger and do not crash the goroutine.
func NewMetricsFlusher ¶
func NewMetricsFlusher(acc *metrics.Accumulator, interval time.Duration) *MetricsFlusher
NewMetricsFlusher creates a flusher with the given accumulator and per-flush interval. interval <= 0 falls back to DefaultMetricsFlushInterval.
func (*MetricsFlusher) AddServer ¶
func (f *MetricsFlusher) AddServer(name, path string, opts LogOpts) error
AddServer registers a per-server output file. Idempotent: re-adding a server replaces the prior writer (the lumberjack handle is closed). The previous-snapshot tracking is preserved so re-adding does not synthesize a reset.
func (*MetricsFlusher) ConfiguredServers ¶
func (f *MetricsFlusher) ConfiguredServers() []string
ConfiguredServers returns the names currently persisting metrics.
func (*MetricsFlusher) RemoveServer ¶
func (f *MetricsFlusher) RemoveServer(name string)
RemoveServer stops persisting metrics for a server and closes its writer. The previous-snapshot tracking is dropped so re-adding produces a fresh reset line as the first entry.
func (*MetricsFlusher) SeedFromFile ¶
func (f *MetricsFlusher) SeedFromFile(path string, n int) error
SeedFromFile reads up to the last n NDJSON entries from path and seeds four surfaces atomically:
- cumulative per-server token totals (via Restore)
- cumulative per-server cost totals (via RestoreCost)
- per-minute time-series ring buckets, both tokens and cost (via ReplaySnapshot)
- this flusher's previous-snapshot maps (prev + prevCost)
The Token Usage Over Time and Cost Over Time charts are backed by the time-series ring; without the bucket replay each would show only a single post-restart point. The Cost KPI card is backed by the cumulative atomics; without RestoreCost it would silently read $0 even when pre-restart cost was non-zero.
On-disk format mirrors flushOnce's output: full MetricsSnapshotLine entries plus lighter reset sentinels ({reset, ts, server} only). Reset sentinels parse with a zero Total and are immediately followed by a full reset line whose Total carries the post-reset state — so taking the most recent Total per server yields the correct seed value in either case.
For time-series, only non-reset lines are replayed: a Reset line's Diff carries the carry-over from prior sessions (full snapshot), not a single minute's activity, so replaying it would create a synthetic spike at the reset boundary. The same skip applies to cost replay.
Legacy files predating cost persistence have no CostDiff / CostTotal fields; they unmarshal with nil pointers, the cost diff sums to zero, the cost replay no-ops, and RestoreCost is invoked with an empty map (which itself no-ops). Token state restores normally — the file remains fully readable with no warning.
Missing or empty files return nil (expected on first run with persistence enabled). Malformed lines are skipped without aborting; a single corrupt line should not lose the rest of the history.
func (*MetricsFlusher) SetLogger ¶
func (f *MetricsFlusher) SetLogger(logger *slog.Logger)
SetLogger configures where flush errors are logged. Pass a logger backed by the in-memory buffer so users see write failures in the UI.
func (*MetricsFlusher) Start ¶
func (f *MetricsFlusher) Start()
Start launches the flush goroutine. Safe to call once; subsequent calls are no-ops. The goroutine runs until Stop is called.
func (*MetricsFlusher) Stop ¶
func (f *MetricsFlusher) Stop()
Stop signals the flush goroutine to exit and waits for it to drain — one final flush is performed before exit so the on-disk file reflects the last in-memory state. Safe to call multiple times concurrently; the stop-channel close is sync.Once-guarded so racing Stop() calls don't panic with a "close of closed channel".
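The Once-guarded stop channel described above is a standard Go shutdown pattern; a minimal self-contained sketch (hypothetical `flusher` type, not the package's actual code):

```go
package main

import (
	"fmt"
	"sync"
)

// flusher sketches the documented Stop contract: the stop channel is closed
// through sync.Once so concurrent Stop calls cannot panic with
// "close of closed channel". Illustrative only.
type flusher struct {
	stopOnce sync.Once
	done     chan struct{}
	wg       sync.WaitGroup
}

func newFlusher() *flusher {
	f := &flusher{done: make(chan struct{})}
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		<-f.done
		// a final flush would happen here before exit
	}()
	return f
}

func (f *flusher) Stop() {
	f.stopOnce.Do(func() { close(f.done) })
	f.wg.Wait() // drain: wait for the goroutine's final flush
}

func main() {
	f := newFlusher()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); f.Stop() }() // racing Stops are safe
	}
	wg.Wait()
	fmt.Println("stopped without panic")
}
```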
type MetricsSnapshotLine ¶
type MetricsSnapshotLine struct {
Time time.Time `json:"ts"`
Server string `json:"server"`
Reset bool `json:"reset,omitempty"`
CostReset bool `json:"cost_reset,omitempty"`
Diff metrics.TokenCounts `json:"diff"`
Total metrics.TokenCounts `json:"total"`
CostDiff *metrics.CostMicroUSDCounts `json:"cost_diff,omitempty"`
CostTotal *metrics.CostMicroUSDCounts `json:"cost_total,omitempty"`
}
MetricsSnapshotLine is the on-disk schema for one NDJSON entry in metrics.jsonl. Time, Server, and Diff are populated for every line; Reset is true on the first line written after a token counter reset (server restart, Accumulator.Clear) and the diff for that line is the *full* snapshot. CostReset signals an independent cost-side reset (e.g. Accumulator.ClearCost between flushes) — the two flags are independent so a cost-only clear does not invalidate the token diff on the same line. Cost fields are pointer + omitempty so token-only minutes emit lines byte-identical to the pre-cost-persistence schema.
type TracesFileClient ¶
type TracesFileClient struct {
// contains filtered or unexported fields
}
TracesFileClient implements otlptrace.Client by writing each batch as one OTLP-JSON line per configured server. The OTel SDK calls UploadTraces with a slice of *tracepb.ResourceSpans already converted from ReadOnlySpan, so this avoids re-implementing the SDK's transform code.
Each emitted line is a fully valid TracesData envelope, matching the OTLP File Exporter spec — `tail -f traces.jsonl | otelcol --config ... otlpjsonfilereceiver` ingests cleanly.
Spans without a resolvable server.name attribute (rare; the gateway stamps it on every tool-call span) are dropped. This is documented as an accepted Phase-2 limitation: per-server file routing is what users want, and a "miscellaneous" file would muddy the inventory model.
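The per-server routing and drop behavior can be sketched with simplified types. The `span` struct below is a hypothetical stand-in for a *tracepb.ResourceSpans span with its resource's server.name already resolved:

```go
package main

import "fmt"

// span stands in for a flattened OTLP span. Illustrative types only.
type span struct {
	ServerName string // empty when server.name could not be resolved
	Name       string
}

// routeSpans groups spans by server.name, dropping unroutable ones,
// mirroring TracesFileClient's documented behavior. Each group would then
// be serialized as one TracesData envelope per server per call.
func routeSpans(spans []span) map[string][]span {
	out := map[string][]span{}
	for _, s := range spans {
		if s.ServerName == "" {
			continue // accepted limitation: no "miscellaneous" file
		}
		out[s.ServerName] = append(out[s.ServerName], s)
	}
	return out
}

func main() {
	groups := routeSpans([]span{
		{ServerName: "api", Name: "tools/call"},
		{ServerName: "", Name: "orphan"},
		{ServerName: "api", Name: "tools/list"},
	})
	fmt.Println(len(groups["api"])) // 2
}
```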
func NewTracesFileClient ¶
func NewTracesFileClient() *TracesFileClient
NewTracesFileClient constructs an empty client. Add servers via AddServer before passing the client to otlptrace.NewUnstarted.
func (*TracesFileClient) AddServer ¶
func (c *TracesFileClient) AddServer(name, path string, opts LogOpts) error
AddServer registers a per-server traces.jsonl file. Idempotent: re-adding a server replaces the prior writer (the lumberjack handle is closed).
func (*TracesFileClient) ConfiguredServers ¶
func (c *TracesFileClient) ConfiguredServers() []string
ConfiguredServers returns the names currently persisting traces.
func (*TracesFileClient) RemoveServer ¶
func (c *TracesFileClient) RemoveServer(name string)
RemoveServer stops persisting traces for a server and closes its writer.
func (*TracesFileClient) SetLogger ¶
func (c *TracesFileClient) SetLogger(logger *slog.Logger)
SetLogger configures where the client logs persistence errors.
func (*TracesFileClient) Start ¶
func (c *TracesFileClient) Start(_ context.Context) error
Start implements otlptrace.Client. No connection setup required for files.
func (*TracesFileClient) Stop ¶
func (c *TracesFileClient) Stop(_ context.Context) error
Stop implements otlptrace.Client. Closes every per-server writer. Safe to call multiple times.
func (*TracesFileClient) UploadTraces ¶
func (c *TracesFileClient) UploadTraces(_ context.Context, resourceSpans []*tracepb.ResourceSpans) error
UploadTraces implements otlptrace.Client. Re-routes spans by server.name attribute and writes one TracesData envelope per server per call. The per-server writer map is snapshotted under the lock; disk I/O happens outside the lock so a slow writer can't block AddServer/RemoveServer/Stop. Errors from individual writers are logged and swallowed — the SDK's batch processor must not stall because a disk write failed.