Documentation ¶
Overview ¶
Package ingest builds the in-memory projections that the serve API reads from. The sessions projection mirrors ~/.config/ctm/sessions.json (decoded leniently — serve is a consumer; the CLI owns strictness via internal/jsonstrict) and exposes thread-safe accessors plus a tmux liveness probe with a short TTL cache.
Polling note: this initial implementation polls sessions.json's mtime on a 1 s ticker and re-reads the file when it changes. Step 5 of the ctm-serve plan will swap the polling loop for an fsnotify watcher. The public API of Projection (New / Run / All / Get / TmuxAlive) will not change when that swap happens.
Package ingest — subagent event shapes (V15) + team aggregation primitives (V16).
The Claude Code JSONL transcripts do not carry dedicated `subagent_start` / `subagent_stop` / `team_spawn` rows. Instead, each tool_call row optionally carries top-level `agent_id` + `agent_type` fields (appended by the PostToolUse hook when the tool call was dispatched via the Agent/Task tool). V15 treats the first occurrence of a given (session, agent_id) pair as the subagent's start and the last occurrence as its stop.
To keep the ingest path additive — existing tool_call parsing must continue to work untouched per the V15/V16 brief — subagent metadata is surfaced three ways:
- Tailer emits a sibling `subagent_start` hub event the first time a given agent_id is seen on a session. The payload carries enough context for the UI to wake up and refetch /api/sessions/{name}/subagents (it does NOT try to be a self-contained tree — computing the full tree from a stream of live events would duplicate the replay logic and drift).
- The /subagents REST handler (api.Subagents) replays the session JSONL and groups rows by agent_id to produce the forest.
- The /teams REST handler (api.Teams) groups subagents that start within a bounded time window (see teamWindow below) into a single "team" — the closest primitive the real-world JSONL supports to the V16 brief's "agent_team" concept.
Future work: if Claude Code starts emitting explicit `subagent_start` / `subagent_stop` rows (different `hook_event_name` or a dedicated field), update parseSubagentMeta to surface the lifecycle directly rather than infer it from first/last tool_call.
Index ¶
- type GlobalSnapshot
- type Projection
- func New(path string, tmux TmuxClient) *Projection
- func (p *Projection) All() []session.Session
- func (p *Projection) Get(name string) (session.Session, bool)
- func (p *Projection) Reload()
- func (p *Projection) Run(ctx context.Context) error
- func (p *Projection) TmuxAlive(name string) bool
- type QuotaIngester
- func NewQuotaIngester(dir string, proj *Projection, hub *events.Hub) *QuotaIngester
- func (q *QuotaIngester) ContextPct(name string) (int, bool)
- func (q *QuotaIngester) FiveHourPct() (float64, bool)
- func (q *QuotaIngester) PerSessionSnapshot(name string) (SessionTokenSnapshot, bool)
- func (q *QuotaIngester) Run(ctx context.Context) error
- func (q *QuotaIngester) Snapshot() GlobalSnapshot
- func (q *QuotaIngester) WeeklyPct() (float64, bool)
- type SessionTokenSnapshot
- type SubagentMeta
- type SubagentStartPayload
- type Tailer
- type TailerManager
- func NewTailerManager(logDir string, hub *events.Hub) *TailerManager
- func (m *TailerManager) Active() []string
- func (m *TailerManager) Start(ctx context.Context, name, uuid string)
- func (m *TailerManager) Stop(name string)
- func (m *TailerManager) StopAll()
- type TmuxClient
- type ToolCallPayload
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GlobalSnapshot ¶
type GlobalSnapshot struct {
    WeeklyPct       int
    FiveHourPct     int
    WeeklyResetsAt  time.Time
    FiveHourResetAt time.Time
    Known           bool
}
GlobalSnapshot is the point-in-time rate-limit view exposed to the REST /api/quota handler. Zero values indicate "unknown" (no dump has populated that field yet); callers should gate on Known before trusting the percentages.
type Projection ¶
type Projection struct {
    // contains filtered or unexported fields
}
Projection is an RWMutex-guarded in-memory snapshot of sessions.json plus a tiny TTL cache for tmux liveness probes.
func New ¶
func New(path string, tmux TmuxClient) *Projection
New constructs a Projection bound to path and the given tmux client. Run must be called to populate the snapshot and keep it fresh.
func (*Projection) All ¶
func (p *Projection) All() []session.Session
All returns a defensive copy of the current snapshot. Callers may mutate the returned slice without affecting the projection.
func (*Projection) Get ¶
func (p *Projection) Get(name string) (session.Session, bool)
Get returns the session with the given name, if known.
func (*Projection) Reload ¶
func (p *Projection) Reload()
Reload synchronously re-reads sessions.json. Call it once at startup, before iterating All(), so the snapshot is populated before any caller (e.g. serve's tailer-spawn loop) reads from it; otherwise callers race with Run's first refresh and may see an empty list.
func (*Projection) Run ¶
func (p *Projection) Run(ctx context.Context) error
Run loads the initial snapshot and then polls path's mtime every pollInterval, re-reading on change. Returns nil when ctx is cancelled.
func (*Projection) TmuxAlive ¶
func (p *Projection) TmuxAlive(name string) bool
TmuxAlive reports whether tmux currently has a session matching name. Results are cached for tmuxAliveTTL to avoid fork/exec on every API request.
type QuotaIngester ¶
type QuotaIngester struct {
    // contains filtered or unexported fields
}
QuotaIngester watches the directory where `cmd statusline` drops per-session JSON snapshots (path templated by `{uuid}` in CTM_STATUSLINE_DUMP) and republishes the aggregated quota state on the hub.
Two flavours of `quota_update` event are emitted:
- Global (`Session == ""`) carrying weekly + 5-hour rate limit percentages. Re-emitted whenever any dump file changes — these fields are not session-scoped, but they're easiest to refresh from the same payload.
- Per-session (`Session == <name>`) carrying the per-session `context_pct` derived from the `context_window.used_percentage` field. UUID → session name resolution goes through the Projection.
QuotaIngester also implements the surface that the `SessionEnricher` adapter in server.go needs to populate the `context_pct` field on the /api/sessions view.
func NewQuotaIngester ¶
func NewQuotaIngester(dir string, proj *Projection, hub *events.Hub) *QuotaIngester
NewQuotaIngester constructs an ingester rooted at dir. proj is used to resolve UUID → session name; if proj is nil, per-session events are not published (global rate-limit events still are).
func (*QuotaIngester) ContextPct ¶
func (q *QuotaIngester) ContextPct(name string) (int, bool)
ContextPct returns the latest context_window.used_percentage seen for the named session, rounded to a whole percent.
func (*QuotaIngester) FiveHourPct ¶
func (q *QuotaIngester) FiveHourPct() (float64, bool)
FiveHourPct returns the latest 5-hour rate limit percentage, if known.
func (*QuotaIngester) PerSessionSnapshot ¶
func (q *QuotaIngester) PerSessionSnapshot(name string) (SessionTokenSnapshot, bool)
PerSessionSnapshot returns the live token view for a session, or (_, false) if no statusline dump has been ingested for it yet. Called by the api.SessionEnricher adapter so REST /api/sessions renders token counts on first paint.
func (*QuotaIngester) Run ¶
func (q *QuotaIngester) Run(ctx context.Context) error
Run blocks until ctx is cancelled. Re-scans every file in dir on startup so a freshly-spawned serve picks up state Claude wrote minutes earlier; then watches for file events.
func (*QuotaIngester) Snapshot ¶
func (q *QuotaIngester) Snapshot() GlobalSnapshot
Snapshot returns the current global rate-limit state under a single read lock so the REST response is consistent (vs. calling WeeklyPct/FiveHourPct separately and risking a torn read between a concurrent ingest).
func (*QuotaIngester) WeeklyPct ¶
func (q *QuotaIngester) WeeklyPct() (float64, bool)
WeeklyPct returns the latest 7-day rate limit percentage, if known.
type SessionTokenSnapshot ¶
type SessionTokenSnapshot struct {
    ContextPct   int
    InputTokens  int
    OutputTokens int
    CacheTokens  int
}
SessionTokenSnapshot is the live per-session token view exposed to the REST /api/sessions handler. Zero values indicate "unknown" — callers gate on the bool from PerSessionSnapshot.
type SubagentMeta ¶
type SubagentMeta struct {
    AgentID   string    `json:"agent_id"`
    AgentType string    `json:"agent_type"`
    Tool      string    `json:"tool"`
    Input     string    `json:"input,omitempty"`
    IsError   bool      `json:"is_error"`
    TS        time.Time `json:"ts"`
}
SubagentMeta is the minimal envelope a single JSONL tool_call row contributes to the subagent forest. All fields are best-effort; ok=false from parseSubagentMeta signals "this row does not belong to any subagent".
type SubagentStartPayload ¶
type SubagentStartPayload struct {
    Session   string    `json:"session"`
    AgentID   string    `json:"agent_id"`
    AgentType string    `json:"agent_type"`
    TS        time.Time `json:"ts"`
}
SubagentStartPayload is the hub-event payload emitted once per session+agent_id when the tailer first observes a new subagent. The UI treats it purely as a wake-up signal — the full tree is fetched via the REST endpoint.
type Tailer ¶
Tailer watches the JSONL log file for a single Claude session and publishes `tool_call` events to the hub for each appended line.
Per the design spec (§4 Tailers), log files are keyed on Claude's session UUID, not the human session name. The human name is stamped into outgoing events so the UI can route by a stable, user-friendly key while the file on disk follows Claude's identifier.
func NewTailer ¶
NewTailer constructs a tailer for the given session. The log file is assumed to live at `<logDir>/<sessionUUID>.jsonl`.
func (*Tailer) Run ¶
Run blocks until ctx is cancelled or a fatal fsnotify error occurs. On startup it scans the file to EOF (if it already exists), then waits for WRITE / CREATE / RENAME / REMOVE events on the parent directory and reacts per spec §7 "Error handling per layer":
- WRITE: re-scan from last offset (not just "tail new bytes") — fsnotify can coalesce writes, so always catch up to EOF.
- RENAME / REMOVE: close fd; wait for CREATE to reopen.
- CREATE (after rotation or first appearance): reopen at offset 0.
Parse errors on individual lines are logged and skipped — never fatal.
type TailerManager ¶
type TailerManager struct {
    // contains filtered or unexported fields
}
TailerManager owns the lifecycle of one Tailer goroutine per active session. Hooks (`session_new` → Start, `session_killed` → Stop) and the startup reconciliation sweep both call into this manager.
func NewTailerManager ¶
func NewTailerManager(logDir string, hub *events.Hub) *TailerManager
NewTailerManager constructs a manager rooted at logDir. logDir is typically `~/.config/ctm/logs/`; tests pass a t.TempDir().
func (*TailerManager) Active ¶
func (m *TailerManager) Active() []string
Active reports the names of currently-running tailers (test helper / debug aid).
func (*TailerManager) Start ¶
func (m *TailerManager) Start(ctx context.Context, name, uuid string)
Start spawns a tailer for (name, uuid) if one isn't already running for that name. Re-calling with the same name and the same uuid is a no-op; calling with a different uuid implicitly stops the prior tailer first (rare — happens on uuid drift after recreation).
func (*TailerManager) Stop ¶
func (m *TailerManager) Stop(name string)
Stop terminates the tailer for the named session, if any. Blocks until the goroutine exits (ensures no late publishes after shutdown).
func (*TailerManager) StopAll ¶
func (m *TailerManager) StopAll()
StopAll terminates every running tailer. Used during graceful shutdown of the serve daemon.
type TmuxClient ¶
TmuxClient is the narrow surface of *tmux.Client that the projection depends on. Defined here so tests can supply a fake without spinning up tmux.
type ToolCallPayload ¶
type ToolCallPayload struct {
    Session string    `json:"session"`
    Tool    string    `json:"tool"`
    Input   string    `json:"input,omitempty"`
    Summary string    `json:"summary,omitempty"`
    IsError bool      `json:"is_error"`
    TS      time.Time `json:"ts"`
}
ToolCallPayload is the JSON envelope published on the hub for each tool invocation, matching §6 of the design spec exactly.