ingest

package
v0.1.6 Latest
Warning

This package is not in the latest version of its module.
Published: May 1, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package ingest builds the in-memory projections that the serve API reads from. The sessions projection mirrors ~/.config/ctm/sessions.json (decoded leniently — serve is a consumer; the CLI owns strictness via internal/jsonstrict) and exposes thread-safe accessors plus a tmux liveness probe with a short TTL cache.

Polling note: this initial implementation re-reads sessions.json on a 1 s ticker whenever the file's mtime changes. Step 5 of the ctm-serve plan will swap the polling loop for an fsnotify watcher. The public API of Projection (New / Run / All / Get / TmuxAlive) will not change when that swap happens.

Package ingest — subagent event shapes (V15) + team aggregation primitives (V16).

The Claude Code JSONL transcripts do not carry dedicated `subagent_start` / `subagent_stop` / `team_spawn` rows. Instead, each tool_call row optionally carries top-level `agent_id` + `agent_type` fields (appended by the PostToolUse hook when the tool call was dispatched via the Agent/Task tool). V15 treats the first occurrence of a given (session, agent_id) pair as the subagent's start and the last occurrence as its stop.

To keep the ingest path additive — existing tool_call parsing must continue to work untouched per the V15/V16 brief — subagent metadata is surfaced three ways:

  1. Tailer emits a sibling `subagent_start` hub event the first time a given agent_id is seen on a session. The payload carries enough context for the UI to wake up and refetch /api/sessions/{name}/subagents (it does NOT try to be a self-contained tree — computing the full tree from a stream of live events would duplicate the replay logic and drift).
  2. The /subagents REST handler (api.Subagents) replays the session JSONL and groups rows by agent_id to produce the forest.
  3. The /teams REST handler (api.Teams) groups subagents that start within a bounded time window (see teamWindow below) into a single "team" — the closest primitive the real-world JSONL supports to the V16 brief's "agent_team" concept.

Future work: if Claude Code starts emitting explicit `subagent_start` / `subagent_stop` rows (different `hook_event_name` or a dedicated field), update parseSubagentMeta to surface the lifecycle directly rather than infer it from first/last tool_call.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GlobalSnapshot

type GlobalSnapshot struct {
	WeeklyPct       int
	FiveHourPct     int
	WeeklyResetsAt  time.Time
	FiveHourResetAt time.Time
	Known           bool
}

GlobalSnapshot is the point-in-time rate-limit view exposed to the REST /api/quota handler. Zero values indicate "unknown" (no dump has populated that field yet); callers should gate on Known before trusting the percentages.

type Projection

type Projection struct {
	// contains filtered or unexported fields
}

Projection is an RWMutex-guarded in-memory snapshot of sessions.json plus a tiny TTL cache for tmux liveness probes.

func New

func New(path string, tmux TmuxClient) *Projection

New constructs a Projection bound to path and the given tmux client. Run must be called to populate the snapshot and keep it fresh.

func (*Projection) All

func (p *Projection) All() []session.Session

All returns a defensive copy of the current snapshot. Callers may mutate the returned slice without affecting the projection.

func (*Projection) Get

func (p *Projection) Get(name string) (session.Session, bool)

Get returns the session with the given name, if known.

func (*Projection) Reload

func (p *Projection) Reload()

Reload synchronously re-reads sessions.json. Call once at startup before iterating All() so the snapshot is populated before any caller (e.g. serve's tailer-spawn loop) reads from it; otherwise they race with Run's first refresh and see an empty list.

func (*Projection) Run

func (p *Projection) Run(ctx context.Context) error

Run loads the initial snapshot and then polls path's mtime every pollInterval, re-reading on change. Returns nil when ctx is cancelled.

func (*Projection) TmuxAlive

func (p *Projection) TmuxAlive(name string) bool

TmuxAlive reports whether tmux currently has a session matching name. Results are cached for tmuxAliveTTL to avoid fork/exec on every API request.

type QuotaIngester

type QuotaIngester struct {
	// contains filtered or unexported fields
}

QuotaIngester watches the directory where `ctm statusline` drops per-session JSON snapshots (path templated by `{uuid}` in CTM_STATUSLINE_DUMP) and republishes the aggregated quota state on the hub.

Two flavours of `quota_update` event are emitted:

  • Global (`Session == ""`) carrying weekly + 5-hour rate limit percentages. Re-emitted whenever any dump file changes — these fields are not session-scoped, but they're easiest to refresh from the same payload.
  • Per-session (`Session == <name>`) carrying the per-session `context_pct` derived from the `context_window.used_percentage` field. UUID → session name resolution goes through the Projection.

QuotaIngester also implements the interface that the `SessionEnricher` adapter in server.go uses to populate the `context_pct` field on the /api/sessions view.

func NewQuotaIngester

func NewQuotaIngester(dir string, proj *Projection, hub *events.Hub) *QuotaIngester

NewQuotaIngester constructs an ingester rooted at dir. proj is used to resolve UUID → session name; if proj is nil, per-session events are not published (global rate-limit events still are).

func (*QuotaIngester) ContextPct

func (q *QuotaIngester) ContextPct(name string) (int, bool)

ContextPct returns the latest context_window.used_percentage seen for the named session, rounded to a whole percent.

func (*QuotaIngester) FiveHourPct

func (q *QuotaIngester) FiveHourPct() (float64, bool)

FiveHourPct returns the latest 5-hour rate limit percentage, if known.

func (*QuotaIngester) PerSessionSnapshot

func (q *QuotaIngester) PerSessionSnapshot(name string) (SessionTokenSnapshot, bool)

PerSessionSnapshot returns the live token view for a session, or (_, false) if no statusline dump has been ingested for it yet. Called by the api.SessionEnricher adapter so REST /api/sessions renders token counts on first paint.

func (*QuotaIngester) Run

func (q *QuotaIngester) Run(ctx context.Context) error

Run blocks until ctx is cancelled. Re-scans every file in dir on startup so a freshly-spawned serve picks up state Claude wrote minutes earlier; then watches for file events.

func (*QuotaIngester) Snapshot

func (q *QuotaIngester) Snapshot() GlobalSnapshot

Snapshot returns the current global rate-limit state under a single read lock so the REST response is consistent (vs. calling WeeklyPct/FiveHourPct separately and risking a torn read between a concurrent ingest).
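The torn-read hazard Snapshot avoids looks like this in miniature; the `quota`/`snap` types here are illustrative stand-ins for the unexported state in QuotaIngester.

```go
package main

import (
	"fmt"
	"sync"
)

// quota holds both percentages under one mutex. Snapshot copies them
// under a single lock acquisition, so a concurrent ingest cannot land
// between two separate getter calls (the "torn read").
type quota struct {
	mu       sync.RWMutex
	weekly   float64
	fiveHour float64
}

type snap struct{ Weekly, FiveHour float64 }

func (q *quota) Snapshot() snap {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return snap{q.weekly, q.fiveHour}
}

func main() {
	q := &quota{weekly: 41.5, fiveHour: 12}
	fmt.Println(q.Snapshot()) // both fields from the same instant
}
```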

func (*QuotaIngester) WeeklyPct

func (q *QuotaIngester) WeeklyPct() (float64, bool)

WeeklyPct returns the latest 7-day rate limit percentage, if known.

type SessionTokenSnapshot

type SessionTokenSnapshot struct {
	ContextPct   int
	InputTokens  int
	OutputTokens int
	CacheTokens  int
}

SessionTokenSnapshot is the live per-session token view exposed to the REST /api/sessions handler. Zero values indicate "unknown" — callers gate on the bool from PerSessionSnapshot.

type SubagentMeta

type SubagentMeta struct {
	AgentID   string    `json:"agent_id"`
	AgentType string    `json:"agent_type"`
	Tool      string    `json:"tool"`
	Input     string    `json:"input,omitempty"`
	IsError   bool      `json:"is_error"`
	TS        time.Time `json:"ts"`
}

SubagentMeta is the minimal envelope a single JSONL tool_call row contributes to the subagent forest. All fields are best-effort; ok=false from parseSubagentMeta signals "this row does not belong to any subagent".

type SubagentStartPayload

type SubagentStartPayload struct {
	Session   string    `json:"session"`
	AgentID   string    `json:"agent_id"`
	AgentType string    `json:"agent_type"`
	TS        time.Time `json:"ts"`
}

SubagentStartPayload is the hub-event payload emitted once per session+agent_id when the tailer first observes a new subagent. The UI treats it purely as a wake-up signal — the full tree is fetched via the REST endpoint.
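The once-per-(session, agent_id) emission condition reduces to a first-seen check over a set. A minimal sketch, with hypothetical names (the tailer's actual bookkeeping is unexported):

```go
package main

import "fmt"

// seenKey identifies a subagent within a session.
type seenKey struct{ session, agentID string }

// firstSeen returns true exactly once per (session, agent_id) pair —
// the condition under which the tailer emits subagent_start.
func firstSeen(seen map[seenKey]bool, session, agentID string) bool {
	k := seenKey{session, agentID}
	if seen[k] {
		return false
	}
	seen[k] = true
	return true
}

func main() {
	seen := map[seenKey]bool{}
	fmt.Println(firstSeen(seen, "dev", "a1")) // true: emit subagent_start
	fmt.Println(firstSeen(seen, "dev", "a1")) // false: already announced
	fmt.Println(firstSeen(seen, "dev", "a2")) // true: new agent_id
}
```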

type Tailer

type Tailer struct {
	SessionName string
	SessionUUID string
	LogPath     string
	Hub         *events.Hub
}

Tailer watches the JSONL log file for a single Claude session and publishes `tool_call` events to the hub for each appended line.

Per the design spec (§4 Tailers), log files are keyed on Claude's session UUID, not the human session name. The human name is stamped into outgoing events so the UI can route by a stable, user-friendly key while the file on disk follows Claude's identifier.

func NewTailer

func NewTailer(sessionName, sessionUUID, logDir string, hub *events.Hub) *Tailer

NewTailer constructs a tailer for the given session. The log file is assumed to live at `<logDir>/<sessionUUID>.jsonl`.

func (*Tailer) Run

func (t *Tailer) Run(ctx context.Context) error

Run blocks until ctx is cancelled or a fatal fsnotify error occurs. On startup it scans the file to EOF (if it already exists), then waits for WRITE / CREATE / RENAME / REMOVE events on the parent directory and reacts per spec §7 "Error handling per layer":

  • WRITE: re-scan from last offset (not just "tail new bytes") — fsnotify can coalesce writes, so always catch up to EOF.
  • RENAME / REMOVE: close fd; wait for CREATE to reopen.
  • CREATE (after rotation or first appearance): reopen at offset 0.

Parse errors on individual lines are logged and skipped — never fatal.

type TailerManager

type TailerManager struct {
	// contains filtered or unexported fields
}

TailerManager owns the lifecycle of one Tailer goroutine per active session. Hooks (`session_new` → Start, `session_killed` → Stop) and the startup reconciliation sweep both call into this manager.

func NewTailerManager

func NewTailerManager(logDir string, hub *events.Hub) *TailerManager

NewTailerManager constructs a manager rooted at logDir. logDir is typically `~/.config/ctm/logs/`; tests pass a t.TempDir().

func (*TailerManager) Active

func (m *TailerManager) Active() []string

Active reports the names of currently-running tailers (test helper / debug aid).

func (*TailerManager) Start

func (m *TailerManager) Start(ctx context.Context, name, uuid string)

Start spawns a tailer for (name, uuid) if one isn't already running for that name. Re-calling with the same name and the same uuid is a no-op; calling with a different uuid implicitly stops the prior tailer first (rare — happens on uuid drift after recreation).

func (*TailerManager) Stop

func (m *TailerManager) Stop(name string)

Stop terminates the tailer for the named session, if any. Blocks until the goroutine exits (ensures no late publishes after shutdown).

func (*TailerManager) StopAll

func (m *TailerManager) StopAll()

StopAll terminates every running tailer. Used during graceful shutdown of the serve daemon.

type TmuxClient

type TmuxClient interface {
	HasSession(name string) bool
}

TmuxClient is the narrow surface of *tmux.Client that the projection depends on. Defined here so tests can supply a fake without spinning up tmux.

type ToolCallPayload

type ToolCallPayload struct {
	Session string    `json:"session"`
	Tool    string    `json:"tool"`
	Input   string    `json:"input,omitempty"`
	Summary string    `json:"summary,omitempty"`
	IsError bool      `json:"is_error"`
	TS      time.Time `json:"ts"`
}

ToolCallPayload is the JSON envelope published on the hub for each tool invocation, matching §6 of the design spec exactly.
