package sync

v0.17.3
Published: Jun 17, 2026 License: MIT Imports: 19 Imported by: 0

README

pkg/sync

Sync engine that orchestrates incremental transcript uploads to the backend. Handles file tracking, chunking, agent discovery, and chunk upload. Provider-specific behavior (metadata extraction, descendant discovery, root metadata attachment) lives entirely in pkg/provider; the engine dispatches through the provider.Provider interface (see CF-397).

The engine is fully file-based and provider-agnostic about its source: it reads whatever local file the daemon points transcriptPath at. For Claude/Codex that is the tool's own JSONL; for OpenCode the daemon materializes the session from OpenCode's local SQLite DB into ~/.confab/opencode/<id>/messages.jsonl and points transcriptPath there, so the materialized file is tracked, redacted (ReadChunk), chunked, and uploaded exactly like any transcript — no OpenCode-specific code in this package. Because TrackedFile separates local Path from backend Name, the backend file_name is just the base (messages.jsonl).

Files

File | Role
engine.go | Engine: orchestrates init, sync loop, agent discovery (BFS); dispatches provider behavior via InitTranscript/DiscoverDescendants/DiscoverWorkflowFiles/AnnotateChunk. Owns capability gating (resolveCaps, workflowFileTypeAllowed, OpencodeChildFilesAllowed). Exposes Tracker() and SetDescendantRegistrar() (CF-538) so the daemon can wrap the tracker for OpenCode child-collector spawn. Includes the chunkView adapter that satisfies provider.ChunkView.
client.go | Client: HTTP API methods for init, chunk upload, events, summary updates, GitHub linking, and the Capabilities() probe (GET /api/v1/capabilities). Defines the Capabilities struct (workflow_files, workflow_journal, opencode_subagent_files) and the ChunkMetadata wire struct (git_info, summary, first_user_message, codex_rollout, plus Cursor's latest_message_at (*time.Time, RFC3339) and model (spm9)); aliases provider.CodexRolloutMetadata as sync.CodexRolloutMetadata.
tracker.go | FileTracker: tracks file state, reads chunks with byte-offset seeking, discovers agent files (Claude transitive discovery). Implements provider.TranscriptRegistrar (via *TrackedFile.SetCodexRollout), provider.DescendantRegistrar (via *FileTracker.RegisterCodexRollout), provider.WorkflowRegistrar (via SubagentsDir + RegisterSidechainFile), and provider.RootTranscriptProvider (via RootTranscriptPath). RegisterSidechainFile (renamed from CF-533's RegisterWorkflowFile to generalize across CF-533 workflow files + CF-538 OpenCode children) registers a path-encoded backend file_name with a local disk Path; idempotent overwrite preserves sync position. RootTranscriptPath exposes the root transcript path so providers whose subagent layout differs from Claude's (Cursor, kata 2brd) derive their subagents dir from it rather than from SubagentsDir.
summary_link.go | Links child session summaries to parent sessions via leafUuid.

Three Components

Engine (orchestrator)

Engine.Init() registers the session with the backend, receiving the current sync state (last synced line per file), then calls provider.InitTranscript(transcript, ...) so the provider can attach root-level metadata (Codex attaches codex_rollout; Claude is a no-op). Engine.SyncAll() performs a BFS traversal: it first calls provider.DiscoverDescendants(tracker, externalID) once per cycle (Codex walks the SQLite subtree; Claude is a no-op) and provider.DiscoverWorkflowFiles(tracker, allow) (Claude scans subagents/workflows/; Codex is a no-op), then for each tracked file checks for changes, reads a chunk, dispatches provider.AnnotateChunk(chunkView, sentFirst, redact), uploads, and discovers new agent files via tracker.DiscoverNewFiles (Claude's transitive content-driven discovery). Codex descendants are registered as file_type=agent sidechain files under the root's backend session.

Workflow subagent files + capability gating (CF-533)

Claude's Workflow tool spawns subagents whose transcripts live at <session>/subagents/workflows/<runId>/agent-<id>.jsonl, plus a per-run journal.jsonl. These carry no agentId in the main transcript, so the Claude provider discovers them by scanning that directory (not via ExtractAgentIDsFromMessage) in provider.DiscoverWorkflowFiles, and registers them through FileTracker.RegisterSidechainFile with path-encoded backend file_names (forward slashes, written verbatim — the backend resolves <runId> from the path and <id> via path.Base):

  • subagents/workflows/<runId>/agent-<id>.jsonl → file_type=agent
  • subagents/workflows/<runId>/journal.jsonl → file_type=workflow_journal

agent-<id>.meta.json, wf_<runId>.json, and the script file are not uploaded. Once registered, workflow files flow through the ordinary ReadChunk → UploadChunk path: incremental byte-offset/line tracking, idempotent re-sync, and redaction of every line (including the journal's free-text result).

Capability gate. Because the CLI may be newer than a self-hosted backend, uploads are gated on the backend advertising support. The engine probes GET /api/v1/capabilities (body is {"workflow_files":bool,"workflow_journal":bool}, no wrapper) lazily — only when the provider actually finds a workflow run dir, so non-workflow sessions never probe. resolveCaps caches only definitive answers: a 404 (old backend → both false) or a clean 200 (parsed flags). A transient failure (network / timeout / 5xx / malformed body) is not cached, so the next cycle re-probes. workflowFileTypeAllowed gates per flag: agent files require workflow_files, the journal requires workflow_journal — a backend reporting only one uploads only that kind. Once the backend has definitively reported support for neither, the engine short-circuits the provider call so the directory is no longer scanned.
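
The caching rule above can be sketched as follows. This is a minimal illustration, not the engine's resolveCaps: capCache, errNotFound, errTransient, and the probe callback are hypothetical stand-ins, and allowing nothing for the current cycle after a transient error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels: errNotFound stands in for the backend's 404,
// errTransient for a network error, timeout, 5xx, or malformed body.
var (
	errNotFound  = errors.New("capabilities endpoint not found")
	errTransient = errors.New("probe timed out")
)

// caps mirrors the two workflow flags described above.
type caps struct {
	WorkflowFiles   bool
	WorkflowJournal bool
}

// capCache remembers only definitive probe results.
type capCache struct {
	probe    func() (caps, error) // hypothetical probe, e.g. an HTTP GET
	resolved bool
	value    caps
}

// resolve returns the cached flags, probing when no definitive answer exists.
func (c *capCache) resolve() caps {
	if c.resolved {
		return c.value
	}
	got, err := c.probe()
	switch {
	case err == nil:
		c.value, c.resolved = got, true // clean 200: cache the parsed flags
	case errors.Is(err, errNotFound):
		c.value, c.resolved = caps{}, true // 404: old backend, both false
	default:
		return caps{} // transient: allow nothing this cycle, cache nothing
	}
	return c.value
}

func main() {
	calls := 0
	c := &capCache{probe: func() (caps, error) {
		calls++
		if calls == 1 {
			return caps{}, errTransient
		}
		return caps{WorkflowFiles: true}, nil
	}}
	fmt.Println(c.resolve().WorkflowFiles) // transient failure, not cached
	fmt.Println(c.resolve().WorkflowFiles) // re-probed on the next cycle
	c.resolve()                            // served from the cache
	fmt.Println("probes:", calls)
}
```

Keeping the "resolved" bit separate from the flag values is what lets a definitive "both false" be cached without confusing it with "not yet known".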

journal.jsonl line schema (the CF-534/535 read contract)

The CLI uploads journal.jsonl verbatim and never parses it; the schema below is what is observed on disk and is owned by the Claude Workflow harness, so it may evolve — consumers (CF-534 Workflows card, CF-535 Agents-card rows) must tolerate unknown and missing fields. Each line is one JSON object:

// agent lifecycle markers, keyed by a content-addressed dedup key:
{"type": "started", "key": "v2:<hash>", "agentId": "<id>"}
{"type": "result",  "key": "v2:<hash>", "agentId": "<id>", "result": <string|object>}

Fields: type ("started" | "result"), key (dedup key), agentId (matches the run dir's agent-<id>.jsonl), and result (present only on result lines; an arbitrary JSON string or object — the subagent's return value). Observed line types are started/result; treat the set as open.
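
For consumers of the uploaded journal, a tolerant reader might look like the sketch below. The CLI itself never parses these lines; journalLine and parseJournalLine are hypothetical names, and the sample values are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// journalLine holds only the observed fields. encoding/json ignores unknown
// fields, and missing fields stay at their zero value.
type journalLine struct {
	Type    string          `json:"type"`
	Key     string          `json:"key"`
	AgentID string          `json:"agentId"`
	Result  json.RawMessage `json:"result,omitempty"` // string or object, kept undecoded
}

// parseJournalLine decodes one JSONL line without rejecting extra fields.
func parseJournalLine(line string) (journalLine, error) {
	var jl journalLine
	err := json.Unmarshal([]byte(line), &jl)
	return jl, err
}

func main() {
	lines := []string{
		`{"type":"started","key":"v2:abc","agentId":"a1"}`,
		`{"type":"result","key":"v2:abc","agentId":"a1","result":{"ok":true},"extra":1}`,
		`{"type":"future-kind"}`,
	}
	for _, l := range lines {
		jl, err := parseJournalLine(l)
		if err != nil {
			fmt.Println("skip malformed line:", err)
			continue
		}
		switch jl.Type {
		case "started":
			fmt.Println("started", jl.AgentID)
		case "result":
			fmt.Println("result", jl.AgentID, string(jl.Result))
		default:
			fmt.Println("ignored unknown type:", jl.Type) // the set is open
		}
	}
}
```

Leaving result as json.RawMessage defers the string-or-object decision to whoever renders it.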

Client (API)

Thin wrapper around pkg/http.Client that marshals/unmarshals request types for the sync API endpoints: /api/v1/sync/init, /api/v1/sync/chunk, /api/v1/sync/event, and session-specific endpoints for summaries and GitHub links.

FileTracker (file I/O + state)

Manages the mapping between files on disk and their sync state. ReadChunk() seeks to the last known byte offset, reads new lines up to the chunk size limit, applies redaction, and extracts agent IDs. DiscoverNewFiles() finds new agent files both from collected agent IDs and by scanning the subagents directory.

Per-chunk git_info extraction (CF-493) is provider-agnostic with two paths in ReadChunk, each guarded by the gitInfo == nil first-wins check:

  • gitInfoFromClaudeMessage — Claude transcript messages carry inline gitBranch + cwd; populates Branch, RepoURL, Remotes, TrackingRemote.
  • gitInfoFromCodexSessionMeta — Codex rollouts (both root transcripts and descendant agent files) begin with a session_meta line whose payload carries cwd; runs git.DetectBranch(cwd) and populates all four CF-494-resolver-required fields.

Data Flow

SyncAll() loop:
  HasFileChanged? ──no──> skip
       │yes
       ▼
  ReadChunk(maxBytes)
    ├── seek to ByteOffset
    ├── read lines until maxBytes
    ├── extract agent IDs (pre-redaction)
    ├── extract metadata (pre-redaction)
    ├── apply redaction
    └── return Chunk
       │
       ▼
  UploadChunk (redacted lines + redacted metadata)
       │
       ▼
  UpdateAfterSync (update byte offset + line count)
       │
       ▼
  DiscoverNewFiles (from collected agent IDs + directory scan)
       │
       ▼
  Add new files to queue → repeat

How to Extend

Adding a new API endpoint: Add request/response types in client.go, add a method on Client, call it from the engine or command layer.

Adding new metadata extraction: Modify the appropriate provider's AnnotateChunk in pkg/provider/{claude,codex,opencode}.go. Metadata is extracted from raw lines before redaction, then the extracted values are redacted via the closure passed to AnnotateChunk before being attached to the chunk via the ChunkView setters.

Tracking a new file type: Add discovery logic in DiscoverNewFiles() (for content-driven discovery), the provider's DiscoverDescendants (for external-state discovery), or DiscoverWorkflowFiles (for directory-scanned, capability-gated workflow files). Set the file type in TrackedFile.Type. The rest of the pipeline (read, chunk, upload) is file-type agnostic.

Gating behavior on backend support: Add a field to the Capabilities struct (client.go), have the backend advertise it via GET /api/v1/capabilities, and gate in the engine. Default any absent field to false (older backends omit the endpoint → 404); cache only definitive answers and re-probe on transient failures.

Adding a new provider: Implement provider.Provider (including the sync-loop methods InitTranscript, DiscoverDescendants, DiscoverWorkflowFiles, AnnotateChunk) and register it in pkg/provider/provider.go's registry. Zero changes required in pkg/sync/engine.go — the engine dispatches everything through the interface.

Driving per-discovery side effects from the daemon (OpenCode pattern, CF-538): When a provider's DiscoverDescendants needs to spawn goroutines or check capabilities (things the engine and FileTracker know nothing about), the daemon wraps the engine's FileTracker in a provider-specific registrar (implementing an extension of provider.DescendantRegistrar) and injects it via Engine.SetDescendantRegistrar. The provider type-asserts to the extension inside DiscoverDescendants; a missed assertion logs Warn and degrades gracefully. The engine remains provider-agnostic — the extension lives entirely in the daemon and provider packages.
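
The type-assertion pattern described above can be sketched as follows. All interfaces and types here are simplified, hypothetical stand-ins (the real provider.DescendantRegistrar has a different method set), and the warning is printed rather than logged.

```go
package main

import "fmt"

// descendantRegistrar is a simplified stand-in for the base registrar.
type descendantRegistrar interface {
	RegisterFile(path string)
}

// childRegistrar is a hypothetical extension that only the daemon's wrapper implements.
type childRegistrar interface {
	descendantRegistrar
	RegisterChild(id string) bool
}

// plainTracker plays the role of the engine's own tracker.
type plainTracker struct{ files []string }

func (t *plainTracker) RegisterFile(path string) { t.files = append(t.files, path) }

// daemonWrapper wraps the tracker and adds capability-gated child registration.
type daemonWrapper struct {
	*plainTracker
	allowed  func() bool
	children []string
}

func (w *daemonWrapper) RegisterChild(id string) bool {
	if !w.allowed() {
		return false
	}
	w.children = append(w.children, id)
	return true
}

// discoverDescendants type-asserts to the extension and degrades gracefully
// when the registrar it was handed does not implement it.
func discoverDescendants(reg descendantRegistrar, childIDs []string) int {
	ext, ok := reg.(childRegistrar)
	if !ok {
		fmt.Println("warn: registrar lacks child support; skipping children")
		return 0
	}
	registered := 0
	for _, id := range childIDs {
		if ext.RegisterChild(id) {
			registered++
		}
	}
	return registered
}

func main() {
	ids := []string{"c1", "c2"}
	fmt.Println(discoverDescendants(&plainTracker{}, ids)) // no extension available
	w := &daemonWrapper{plainTracker: &plainTracker{}, allowed: func() bool { return true }}
	fmt.Println(discoverDescendants(w, ids))
}
```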

Invariants

  • Chunks must not exceed 14MB (DefaultMaxChunkBytes). The backend rejects payloads larger than 16MB; the client limit is 14MB to leave headroom for JSON encoding overhead.
  • Init() must be called before SyncAll(). The engine needs a backend session ID and initial sync state.
  • After upload failure, state must be refreshed from backend (refreshStateFromBackend). This handles the case where the server received and stored data but the client timed out before receiving the response. Without refresh, the client would re-upload duplicate lines. applyBackendFiles is the shared path for initial and refreshed backend file state.
  • Agent discovery uses BFS with cycle detection. The knownAgentIDs set prevents infinite loops when agents reference each other. Max 10 BFS iterations as a safety bound.
  • Redaction must happen in ReadChunk() before lines leave the tracker. Never upload unredacted content. The same call site covers Claude transcripts, Claude agent files, and Codex rollouts; redactor.RedactJSONLine is JSON-shape-agnostic, so no per-provider branching is needed.
  • Metadata is extracted before redaction, then redacted. Summaries and first user messages need the original text for meaningful extraction, but must be redacted before upload.
  • Byte offsets must be maintained accurately. ReadChunk returns NewOffset which is the byte position after the last line read. UpdateAfterSync stores this for the next read. Incorrect offsets cause duplicate or missing lines.
  • Directory scan in DiscoverNewFiles catches agents from already-synced lines. After a daemon restart, agent IDs from previously-synced lines are lost from memory. The directory scan recovers them.
  • codex_rollout metadata rides on first chunks only. provider.Codex.AnnotateChunk attaches ChunkMetadata.CodexRollout whenever c.FirstLine() == 1 and the tracked file carries a CodexRollout. On retry after a failed upload, FirstLine remains 1 so the metadata is automatically resent — the backend upsert is idempotent. InitFromBackendState preserves TrackedFile.CodexRollout across refreshStateFromBackend so retries don't lose the payload.
  • Cursor session metadata (spm9). Cursor's transcript lines carry no per-line timestamp, so the backend opts Cursor out of timestamp extraction and feeds session.last_message_at solely from ChunkMetadata.LatestMessageAt, which provider.Cursor.AnnotateChunk sets from the transcript file mtime on transcript chunks. The session's model (Cursor's only model signal, sourced from the sessionStart hook) is session-constant, so it is plumbed via EngineConfig.Model → Engine.model and stamped onto transcript chunks engine-side (generic + omitempty: providers whose model is empty send nothing, so no provider branch lives in the engine). model is accepted on the wire but not yet persisted by the backend (forward-looking, pending a confab-web migration).
  • The engine has no provider-name branches. TestEngine_NoProviderNameLiterals in engine_dispatch_test.go scans engine.go for NameCodex / NameClaudeCode literals and fails CI if either appears. New provider-specific behavior must live in pkg/provider, not the engine.
  • Workflow uploads are capability-gated, and gating is per-flag. Never send workflow_journal files or path-encoded agent names to a backend that didn't advertise the matching flag — an older backend would silently mis-store them. The allow predicate is the single gate; the provider classifies, the engine decides.
  • Backend capability is assumed stable for a backend. CF-532 ships both flags permanently, so the engine caches a definitive answer for its lifetime and does not handle a mid-life downgrade (a daemon restart re-probes a fresh engine). Only transient probe failures re-probe within a session.
  • Workflow file_names are path-encoded with forward slashes and written verbatim. subagents/workflows/<runId>/... is load-bearing: the backend parses <runId> from the path and <id> via path.Base. Never flatten them. Redaction still applies to every workflow line, including the journal.
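
To illustrate the path-encoding invariant, the sketch below builds a forward-slash file_name from a local path and splits it the way the text says the backend does. Both helpers are hypothetical: backendName is not the tracker's code, and parseWorkflowName only mirrors the described backend behavior.

```go
package main

import (
	"fmt"
	"path"
	"path/filepath"
	"strings"
)

// backendName converts a local file path into a forward-slash name relative
// to the session directory, independent of the OS path separator.
func backendName(sessionDir, localPath string) (string, error) {
	rel, err := filepath.Rel(sessionDir, localPath)
	if err != nil {
		return "", err
	}
	return filepath.ToSlash(rel), nil
}

// parseWorkflowName extracts the run ID and the base file name from a
// path-encoded name. It reports false for names outside subagents/workflows/.
func parseWorkflowName(name string) (string, string, bool) {
	const prefix = "subagents/workflows/"
	if !strings.HasPrefix(name, prefix) {
		return "", "", false
	}
	run, _, found := strings.Cut(strings.TrimPrefix(name, prefix), "/")
	if !found || run == "" {
		return "", "", false
	}
	return run, path.Base(name), true
}

func main() {
	sessionDir := filepath.Join("home", "u", "session")
	local := filepath.Join(sessionDir, "subagents", "workflows", "run42", "agent-7.jsonl")
	name, err := backendName(sessionDir, local)
	if err != nil {
		fmt.Println(err)
		return
	}
	run, base, ok := parseWorkflowName(name)
	fmt.Println(name, run, base, ok)
}
```

A flattened name such as agent-7.jsonl fails the prefix check, which is why the directory structure has to survive in the file_name.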

Design Decisions

BFS for agent discovery. Agents can spawn sub-agents transitively (A references B, B references C). BFS ensures all transitive agents are discovered and synced, not just direct children. The iteration cap (10) prevents runaway discovery.
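
A minimal sketch of bounded BFS with a known-ID set follows. It is not the engine's SyncAll: discoverAll and the refs map (agent ID to the IDs it references) are hypothetical, and the real engine discovers references by reading files rather than from a map.

```go
package main

import "fmt"

const maxIterations = 10 // mirrors the safety bound named above

// discoverAll walks references breadth-first. The known set ensures each ID is
// queued at most once, so reference cycles terminate.
func discoverAll(roots []string, refs map[string][]string) []string {
	known := map[string]bool{}
	var queue, order []string
	for _, r := range roots {
		if !known[r] {
			known[r] = true
			queue = append(queue, r)
		}
	}
	for iter := 0; len(queue) > 0 && iter < maxIterations; iter++ {
		var next []string
		for _, id := range queue {
			order = append(order, id) // "process" the file
			for _, child := range refs[id] {
				if !known[child] {
					known[child] = true
					next = append(next, child) // only new IDs are queued
				}
			}
		}
		queue = next
	}
	return order
}

func main() {
	refs := map[string][]string{
		"A": {"B"},
		"B": {"C", "A"}, // B points back at A: a cycle
		"C": {"A"},
	}
	fmt.Println(discoverAll([]string{"A"}, refs))
}
```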

Byte-offset seeking instead of re-reading. For large transcripts (megabytes), seeking to the last read position is far more efficient than re-reading from the start and skipping lines.
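
The seek-and-read step can be sketched as below. This is an illustration under assumptions, not ReadChunk itself: readFrom and writeTemp are hypothetical helpers, a trailing line without a newline is left for the next read, and a single line larger than maxBytes is still returned so the reader cannot stall.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
)

// readFrom seeks to offset and returns whole lines until adding another line
// would exceed maxBytes, together with the byte offset after the last line.
func readFrom(path string, offset int64, maxBytes int) ([]string, int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, offset, err
	}
	defer f.Close()
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, offset, err
	}
	var lines []string
	used := 0
	r := bufio.NewReader(f)
	for {
		line, err := r.ReadString('\n')
		if err == io.EOF {
			return lines, offset, nil // partial trailing line waits for the next read
		}
		if err != nil {
			return lines, offset, err
		}
		if len(lines) > 0 && used+len(line) > maxBytes {
			return lines, offset, nil // budget reached; stop on a line boundary
		}
		used += len(line)
		offset += int64(len(line))
		lines = append(lines, strings.TrimSuffix(line, "\n"))
	}
}

// writeTemp writes content to a temporary file and returns its path.
func writeTemp(content string) (string, error) {
	f, err := os.CreateTemp("", "chunk-*.jsonl")
	if err != nil {
		return "", err
	}
	defer f.Close()
	_, err = f.WriteString(content)
	return f.Name(), err
}

func main() {
	path, err := writeTemp("a\nbb\nccc\n")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer os.Remove(path)
	first, off, _ := readFrom(path, 0, 4)
	rest, end, _ := readFrom(path, off, 100)
	fmt.Println(first, off, rest, end) // prints: [a] 2 [bb ccc] 9
}
```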

refreshStateFromBackend after upload failure. When a chunk upload times out, the server may have stored the data. Without refreshing, the next SyncAll() would re-upload the same lines. The refresh call gets the server's actual LastSyncedLine and updates the tracker accordingly. Auth errors during refresh are propagated (can't recover without re-auth).
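
The failure mode can be reproduced with a toy backend that stores the data but reports a timeout. Everything here is hypothetical (fakeBackend, syncOnce, errTimeout); it only illustrates why the client adopts the server's position after a failed upload.

```go
package main

import (
	"errors"
	"fmt"
)

var errTimeout = errors.New("upload timed out") // hypothetical transient error

// fakeBackend stores lines even when it reports a timeout, mimicking a
// response that was lost on the way back to the client.
type fakeBackend struct {
	stored   []string
	failNext bool
}

func (b *fakeBackend) upload(firstLine int, lines []string) (int, error) {
	if firstLine != len(b.stored)+1 {
		return len(b.stored), fmt.Errorf("unexpected first line %d", firstLine)
	}
	b.stored = append(b.stored, lines...)
	if b.failNext {
		b.failNext = false
		return 0, errTimeout // data was stored, but the client never learns it
	}
	return len(b.stored), nil
}

func (b *fakeBackend) lastSyncedLine() int { return len(b.stored) }

// syncOnce uploads unsent lines. On failure it returns the backend's actual
// position instead of the stale local one.
func syncOnce(b *fakeBackend, all []string, synced int) (int, error) {
	if synced >= len(all) {
		return synced, nil
	}
	n, err := b.upload(synced+1, all[synced:])
	if err != nil {
		return b.lastSyncedLine(), err // refresh from the backend
	}
	return n, nil
}

func main() {
	b := &fakeBackend{failNext: true}
	all := []string{"l1", "l2"}
	synced, err := syncOnce(b, all, 0)
	fmt.Println(synced, err)
	synced, err = syncOnce(b, all, synced) // nothing left to send
	fmt.Println(synced, err, len(b.stored))
}
```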

Summary link injection. When a transcript contains a summary with a leafUuid, it means this session is a continuation of a previous one. linkSummaryToPreviousSession finds the parent transcript by scanning other JSONL files for the matching UUID, then calls the backend to update the parent's summary. This is best-effort — failures are logged but don't block sync.

Testing

go test ./pkg/sync/...
  • NewWithBackend() allows injecting a mock backend/client for unit tests
  • engine_test.go / tracker_test.go — unit tests for incremental sync, agent discovery, byte offsets, chunking
  • integration_test.go — full engine lifecycle with mock HTTP backend: init, multi-cycle sync, agent discovery, error recovery, large files, chunk size limits

Dependencies

Uses: pkg/config, pkg/git, pkg/http, pkg/logger, pkg/provider, pkg/redactor, pkg/types, pkg/utils

Used by: pkg/daemon/ (sync loop), cmd/ (save command, post-tool-use linking)

Documentation

Constants

const DefaultMaxChunkBytes = 14 * 1024 * 1024 // 14MB

DefaultMaxChunkBytes is the default maximum size of a chunk in bytes. This is a backend-imposed limit: the server rejects chunks larger than 16MB. We use 14MB to leave headroom for JSON encoding overhead and compression. If the backend limit changes, this constant must be updated accordingly.

Functions

func FindSessionByLeafUUID

func FindSessionByLeafUUID(transcriptDir, leafUUID, excludeFile string) string

FindSessionByLeafUUID searches transcript files in the given directory for a message with the specified uuid in the last N lines. Returns the session ID (filename without extension) if found, empty string otherwise.

Excludes the file specified by excludeFile from the search.

Types

type Backend added in v0.16.0

type Backend interface {
	Init(providerName, externalID, transcriptPath string, metadata *InitMetadata) (*InitResponse, error)
	UploadChunk(sessionID, fileName, fileType string, firstLine int, lines []string, metadata *ChunkMetadata) (int, error)
	SendEvent(sessionID, eventType string, timestamp time.Time, payload json.RawMessage) error
	UpdateSessionSummary(externalID, summary string) error
	// Capabilities probes the backend's optional-feature signal (CF-533).
	// Returns an error (404 / network / parse) when the backend does not
	// advertise capabilities; the engine treats a 404 as a definitive
	// "unsupported" and other errors as transient.
	Capabilities() (Capabilities, error)
}

Backend is the sync transport used by Engine. The HTTP client implements this for provider-aware backend sync.

type Capabilities added in v0.16.2

type Capabilities struct {
	// WorkflowFiles reports that the backend resolves path-encoded workflow
	// subagent file_names (subagents/workflows/<runId>/agent-<id>.jsonl).
	WorkflowFiles bool `json:"workflow_files"`
	// WorkflowJournal reports that the backend accepts the workflow_journal
	// file_type (subagents/workflows/<runId>/journal.jsonl).
	WorkflowJournal bool `json:"workflow_journal"`
	// OpencodeSubagentFiles reports that the backend resolves path-encoded
	// OpenCode subagent file_names (opencode/<child-id>/messages.jsonl) and
	// stitches them into the root session's analytics (CF-538/CF-539).
	OpencodeSubagentFiles bool `json:"opencode_subagent_files"`
}

Capabilities is the backend's optional-feature signal (CF-533). The response body of GET /api/v1/capabilities IS this map (no outer wrapper). Absent fields default to false (zero value), so a backend that omits a field — or the whole endpoint (404) — is treated as not supporting it.
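
The zero-value default falls out of encoding/json directly, as this small sketch shows. The capabilities struct is a local copy of the fields above, and decodeCaps is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// capabilities is a local copy of the struct shown above.
type capabilities struct {
	WorkflowFiles         bool `json:"workflow_files"`
	WorkflowJournal       bool `json:"workflow_journal"`
	OpencodeSubagentFiles bool `json:"opencode_subagent_files"`
}

// decodeCaps parses a response body; fields the backend omits stay false.
func decodeCaps(body []byte) (capabilities, error) {
	var c capabilities
	err := json.Unmarshal(body, &c)
	return c, err
}

func main() {
	c, err := decodeCaps([]byte(`{"workflow_files":true}`))
	fmt.Println(c.WorkflowFiles, c.WorkflowJournal, c.OpencodeSubagentFiles, err)
}
```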

type Chunk

type Chunk struct {
	FileName  string         // Base name of the file
	FileType  string         // "transcript" or "agent"
	FirstLine int            // 1-based line number of first line
	Lines     []string       // The lines (redacted if applicable)
	NewOffset int64          // Byte offset after reading these lines
	Metadata  *ChunkMetadata // Metadata to send to backend
	AgentIDs  []string       // Agent IDs discovered (local use only, not sent to backend)
}

Chunk represents a range of lines read from a file with extracted metadata

type ChunkMetadata

type ChunkMetadata struct {
	GitInfo          *git.GitInfo          `json:"git_info,omitempty"`
	Summary          string                `json:"summary,omitempty"`
	FirstUserMessage string                `json:"first_user_message,omitempty"`
	CodexRollout     *CodexRolloutMetadata `json:"codex_rollout,omitempty"`

	// LatestMessageAt carries an explicit session timestamp for providers
	// whose transcript lines have no per-line timestamp (Cursor). The backend
	// feeds session.last_message_at from this on transcript chunks, since it
	// opts Cursor out of per-line timestamp extraction. Serialized RFC3339
	// (time.Time default) to match the backend's *time.Time reader; omitted
	// when nil. Cursor sets it from the transcript file mtime.
	LatestMessageAt *time.Time `json:"latest_message_at,omitempty"`

	// Model names the LLM that produced this session (Cursor only; sourced
	// from the sessionStart hook payload). Cursor JSONL carries no model field,
	// so this is the only model signal. Set engine-side from daemon config on
	// transcript chunks; omitted when empty, so other providers send nothing.
	// NOTE: accepted on the wire but not yet persisted by the backend (pending
	// a confab-web migration), so it is currently forward-looking/inert.
	Model string `json:"model,omitempty"`
}

ChunkMetadata contains metadata sent to the backend with a chunk
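
The effect of omitempty on the wire can be checked with a reduced local copy of the struct. chunkMeta, encode, sampleTime, and the model name "m1" are illustrative assumptions; only the JSON tags are taken from the definition above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// chunkMeta is a reduced local copy of ChunkMetadata for illustration.
type chunkMeta struct {
	Summary         string     `json:"summary,omitempty"`
	LatestMessageAt *time.Time `json:"latest_message_at,omitempty"`
	Model           string     `json:"model,omitempty"`
}

// sampleTime is an arbitrary timestamp used only by this example.
var sampleTime = time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)

// encode marshals the metadata, returning an empty string on error.
func encode(m chunkMeta) string {
	b, err := json.Marshal(m)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// A provider that sets nothing sends an empty object.
	fmt.Println(encode(chunkMeta{}))
	// A Cursor-style chunk carries the timestamp (RFC3339) and the model.
	fmt.Println(encode(chunkMeta{LatestMessageAt: &sampleTime, Model: "m1"}))
}
```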

type ChunkRequest

type ChunkRequest struct {
	SessionID string         `json:"session_id"`
	FileName  string         `json:"file_name"`
	FileType  string         `json:"file_type"`
	FirstLine int            `json:"first_line"`
	Lines     []string       `json:"lines"`
	Metadata  *ChunkMetadata `json:"metadata,omitempty"`
}

ChunkRequest is the request body for POST /api/v1/sync/chunk

type ChunkResponse

type ChunkResponse struct {
	LastSyncedLine int `json:"last_synced_line"`
}

ChunkResponse is the response for POST /api/v1/sync/chunk

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client handles communication with the sync API endpoints

func NewClient

func NewClient(cfg *config.UploadConfig) (*Client, error)

NewClient creates a new sync API client

func (*Client) Capabilities added in v0.16.2

func (c *Client) Capabilities() (Capabilities, error)

Capabilities probes GET /api/v1/capabilities (public, no auth). Any failure (a 404 from an older backend, a network error, or a malformed body) is returned to the caller, which treats it as "no capabilities". The engine caches a 404 as a definitive "unsupported" and re-probes after any other error. The 404 sentinel (http.ErrSessionNotFound) is preserved via %w so callers can match it with errors.Is.

func (*Client) Init

func (c *Client) Init(providerName, externalID, transcriptPath string, metadata *InitMetadata) (*InitResponse, error)

Init initializes or resumes a sync session. Returns the session ID and current sync state for all files. The providerName must be a canonical provider name (callers via Engine.Init pass e.provider.Name(), which is always non-empty).

func (*Client) LinkGitHub

func (c *Client) LinkGitHub(sessionID string, req *GitHubLinkRequest) (*GitHubLinkResponse, error)

LinkGitHub creates a GitHub link for a session

func (*Client) SendEvent

func (c *Client) SendEvent(sessionID, eventType string, timestamp time.Time, payload json.RawMessage) error

SendEvent sends a session lifecycle event to the backend

func (*Client) UpdateSessionSummary

func (c *Client) UpdateSessionSummary(externalID, summary string) error

UpdateSessionSummary updates the summary for a session identified by its external_id

func (*Client) UploadChunk

func (c *Client) UploadChunk(sessionID, fileName, fileType string, firstLine int, lines []string, metadata *ChunkMetadata) (int, error)

UploadChunk uploads a chunk of lines for a file with optional metadata. Returns the new last synced line number.

type CodexRolloutMetadata added in v0.16.0

type CodexRolloutMetadata = provider.CodexRolloutMetadata

CodexRolloutMetadata is the per-rollout metadata transmitted on the FIRST chunk of a Codex rollout. The canonical definition lives in pkg/provider so the Codex implementation can construct one without an import cycle; pkg/sync re-exports it here as an alias so existing call sites that reference sync.CodexRolloutMetadata keep working. Wire format unchanged.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the core sync engine used by both daemon and manual save. It provides a unified interface for syncing session data to the backend.

func New

func New(uploadCfg *config.UploadConfig, engineCfg EngineConfig) (*Engine, error)

New creates a new sync engine with the given configuration. The engine is not connected to the backend until Init() is called.

func NewWithBackend added in v0.16.0

func NewWithBackend(backend Backend, r *redactor.Redactor, engineCfg EngineConfig) (*Engine, error)

NewWithBackend creates an engine with a preconfigured backend. Test-facing; returns an error if the provider name is invalid.

func (*Engine) GetSyncStats

func (e *Engine) GetSyncStats() map[string]int

GetSyncStats returns current sync statistics (lines synced per file)

func (*Engine) Init

func (e *Engine) Init() error

Init initializes the sync session with the backend:

  • Creates the session if it does not exist, or resumes an existing one
  • Gets last_synced_line for all known files
  • Sends initial metadata (git info, hostname, username)

Must be called before SyncAll.

func (*Engine) IsInitialized

func (e *Engine) IsInitialized() bool

IsInitialized returns true if Init() has been called successfully

func (*Engine) OpencodeChildFilesAllowed added in v0.17.0

func (e *Engine) OpencodeChildFilesAllowed() bool

OpencodeChildFilesAllowed reports whether OpenCode subagent sidechain files may be uploaded to this backend, per its cached capabilities (CF-538/CF-539). Lazy-probes once per SyncAll cycle; cached definitive answers persist for the engine's lifetime. Exported because the daemon's opencodeRegistrar gates RegisterOpencodeChild on it.

func (*Engine) Reset

func (e *Engine) Reset()

Reset clears the initialized state, allowing Init to be called again. This is useful when the backend returns an auth error and we need to re-authenticate and re-initialize.

func (*Engine) SendSessionEnd

func (e *Engine) SendSessionEnd(hookInput *types.ClaudeHookInput, timestamp time.Time) error

SendSessionEnd sends a session_end event to the backend

func (*Engine) SessionID

func (e *Engine) SessionID() string

SessionID returns the backend session ID (empty if not initialized)

func (*Engine) SetDescendantRegistrar added in v0.17.0

func (e *Engine) SetDescendantRegistrar(reg provider.DescendantRegistrar)

SetDescendantRegistrar overrides the default DescendantRegistrar (the engine's own *FileTracker) that Engine.SyncAll passes to provider.DiscoverDescendants. Used by the daemon to inject a registrar that wraps the FileTracker with provider-specific behavior (OpenCode child collector spawn). Must be called before SyncAll.

The honest-but-mutable setter is the cleanest break of the cyclic dependency between the engine and an OpenCode registrar: the registrar needs the engine (for the capability gate); the engine needs the registrar (for SyncAll). Construction order is engine → registrar → setter; either constructor injection or hidden closures would just reshuffle the same mutation.

func (*Engine) SyncAll

func (e *Engine) SyncAll() (int, error)

SyncAll syncs all tracked files to the backend using BFS traversal. It discovers agent files referenced in transcripts and syncs them transitively within a single call. Each file is processed at most once per call.

Algorithm:

  1. Start with all currently tracked files in the queue
  2. Process each file in queue (sync if changed, extract agent IDs)
  3. Discover new files from collected agent IDs
  4. Add only NEW files to the queue for next iteration
  5. Repeat until queue is empty (or max iterations reached)

Returns number of chunks uploaded and the first error encountered (if any). Continues syncing other files even if one file fails.

func (*Engine) Tracker added in v0.17.0

func (e *Engine) Tracker() *FileTracker

Tracker returns the engine's internal FileTracker. Exposed so the daemon can wrap it in a provider-specific DescendantRegistrar (OpenCode) and then hand the wrapper back via SetDescendantRegistrar. Direct callers outside that wiring should not mutate the returned tracker.

type EngineConfig

type EngineConfig struct {
	Provider       string
	ExternalID     string
	TranscriptPath string
	CWD            string
	// Model is the session-constant LLM model name (Cursor only; sourced from
	// the sessionStart hook payload). Empty for other providers. The engine
	// stamps it onto transcript chunk metadata when non-empty.
	Model string
}

EngineConfig holds configuration for creating an Engine

type EventRequest

type EventRequest struct {
	SessionID string          `json:"session_id"`
	EventType string          `json:"event_type"`
	Timestamp time.Time       `json:"timestamp"`
	Payload   json.RawMessage `json:"payload"`
}

EventRequest is the request body for POST /api/v1/sync/event

type EventResponse

type EventResponse struct {
	Success bool `json:"success"`
}

EventResponse is the response for POST /api/v1/sync/event

type FileState

type FileState struct {
	LastSyncedLine int `json:"last_synced_line"`
}

FileState represents the sync state for a single file from the backend

type FileTracker

type FileTracker struct {
	// contains filtered or unexported fields
}

FileTracker tracks files and their sync state for a session

func NewFileTracker

func NewFileTracker(transcriptPath string) *FileTracker

NewFileTracker creates a new file tracker for a session

func (*FileTracker) AddCodexRollout added in v0.16.0

func (t *FileTracker) AddCodexRollout(path, fileName string, isRoot bool, meta CodexRolloutMetadata) *TrackedFile

AddCodexRollout registers a Codex rollout file in the tracker.

  • isRoot=true → file type "transcript" (the Codex root's primary rollout).
  • isRoot=false → file type "agent" (every descendant, at any depth).

All descendants sync as sidechain files under the root's backend session — the same primitive Claude Code uses for its `agent-*.jsonl` files — while the Codex thread tree is preserved separately in the backend's `codex_rollouts` table via `meta.ParentThreadUUID`.

Idempotent: a second call for an already-tracked path returns the existing TrackedFile without modifying it. The caller can use this to avoid maintaining a separate "already added" set.

func (*FileTracker) DiscoverNewFiles

func (t *FileTracker) DiscoverNewFiles(newAgentIDs []string) []*TrackedFile

DiscoverNewFiles checks for new agent files based on agent IDs discovered in previous chunk reads, and also scans the subagents directory for any agent files not already tracked. Returns newly discovered files.

func (*FileTracker) GetTrackedFiles

func (t *FileTracker) GetTrackedFiles() []*TrackedFile

GetTrackedFiles returns all currently tracked files

func (*FileTracker) GetTranscriptFile

func (t *FileTracker) GetTranscriptFile() *TrackedFile

GetTranscriptFile returns the transcript file being tracked

func (*FileTracker) HasFileChanged

func (t *FileTracker) HasFileChanged(file *TrackedFile) bool

HasFileChanged checks if a file has more data to sync. Returns true if:

  • The file has grown (more bytes than our last known offset)
  • The file has been modified (mod time changed)
  • We haven't read the file yet (no byte offset)

func (*FileTracker) InitFromBackendState

func (t *FileTracker) InitFromBackendState(backendFiles map[string]FileState)

InitFromBackendState initializes the tracker with state from the backend. This sets up tracking for the transcript and any files the backend knows about.

Called from both Engine.Init() (first time) and Engine.refreshStateFromBackend() (after a chunk-upload failure). On refresh, any per-file metadata that the engine has already set on a tracked file (notably Codex rollout metadata) must survive — otherwise a retried first chunk would lose its codex_rollout payload. We preserve CodexRollout for existing entries and only refresh the fields that can legitimately drift (sync position).

func (*FileTracker) IsTracked

func (t *FileTracker) IsTracked(fileName string) bool

IsTracked returns true if a file is already being tracked

func (*FileTracker) ReadChunk

func (t *FileTracker) ReadChunk(file *TrackedFile, r *redactor.Redactor, maxBytes int) (*Chunk, error)

ReadChunk reads new lines from a file starting after LastSyncedLine. Uses ByteOffset to seek directly to the right position if available. Applies redaction if a redactor is provided. Stops reading when accumulated bytes would exceed maxBytes (aligned to line boundary). Returns nil if there are no new lines.
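A rough sketch of the seek-and-accumulate loop, under stated assumptions: `chunk`, `readChunk`, and `sample` are hypothetical names, redaction is omitted, an in-memory reader stands in for the file, and always accepting the first line (even if it alone exceeds `maxBytes`) is a guess made so the sketch cannot stall.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// chunk is a hypothetical, reduced stand-in for the package's Chunk type.
type chunk struct {
	Lines     []string
	FirstLine int   // 1-based number of Lines[0]
	LastLine  int   // 1-based number of the final line
	NewOffset int64 // byte position just past the final line
}

// readChunk seeks to byteOffset and collects complete lines until the next
// line would push the total past maxBytes. It returns nil when there is no
// complete new line. Redaction is left out of this sketch.
func readChunk(src io.ReadSeeker, lastSyncedLine int, byteOffset int64, maxBytes int) (*chunk, error) {
	if _, err := src.Seek(byteOffset, io.SeekStart); err != nil {
		return nil, err
	}
	r := bufio.NewReader(src)
	c := &chunk{FirstLine: lastSyncedLine + 1, LastLine: lastSyncedLine, NewOffset: byteOffset}
	total := 0
	for {
		line, err := r.ReadString('\n')
		if err == io.EOF {
			break // a trailing partial line waits for the next call
		}
		if err != nil {
			return nil, err
		}
		// Assumption: the first line is always accepted so that a single
		// oversized line cannot stall the sync.
		if len(c.Lines) > 0 && total+len(line) > maxBytes {
			break
		}
		total += len(line)
		c.Lines = append(c.Lines, strings.TrimSuffix(line, "\n"))
		c.LastLine++
		c.NewOffset += int64(len(line))
	}
	if len(c.Lines) == 0 {
		return nil, nil
	}
	return c, nil
}

// sample returns an in-memory "file" with three complete lines.
func sample() io.ReadSeeker { return strings.NewReader("a\nbb\nccc\n") }

func main() {
	src := sample()
	first, _ := readChunk(src, 0, 0, 5)
	fmt.Println(first.Lines, first.LastLine, first.NewOffset) // [a bb] 2 5

	second, _ := readChunk(src, first.LastLine, first.NewOffset, 5)
	fmt.Println(second.Lines, second.FirstLine, second.NewOffset) // [ccc] 3 9

	third, _ := readChunk(src, second.LastLine, second.NewOffset, 5)
	fmt.Println(third == nil) // true
}
```

Feeding each call the `LastLine` and `NewOffset` of the previous chunk is what makes the reads incremental: no line is re-read, and the line numbering stays continuous across chunks.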

func (*FileTracker) RegisterCodexRollout added in v0.16.0

func (t *FileTracker) RegisterCodexRollout(path, fileName string, isRoot bool, meta provider.CodexRolloutMetadata)

RegisterCodexRollout is the provider.DescendantRegistrar entry point: thin wrapper around AddCodexRollout that drops the *TrackedFile return (kept on AddCodexRollout for in-package callers that want it).

func (*FileTracker) RegisterSidechainFile added in v0.17.0

func (t *FileTracker) RegisterSidechainFile(path, name, fileType string) bool

RegisterSidechainFile is the entry point for registering path-encoded sidechain files. CF-533 uses it for Claude workflow subagent transcripts + run journals; CF-538 uses it for OpenCode subagent messages. name is the forward-slash path-encoded backend file_name; path is its absolute location on disk; fileType is "agent" or provider.FileTypeWorkflowJournal.

Returns true when a new file is tracked. When the name is already tracked (e.g. a stale entry rebuilt from backend state on daemon restart), it overwrites Path+Type in place and returns false, preserving the sync position (LastSyncedLine/ByteOffset) so the file resumes incrementally.
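The return-value contract can be illustrated with a small map-backed sketch. The `tracker` and `trackedFile` types and the example paths are hypothetical; only the documented behavior (true for a new name, in-place overwrite of Path and Type otherwise, sync position preserved) is modeled.

```go
package main

import "fmt"

// trackedFile is a hypothetical reduction of TrackedFile.
type trackedFile struct {
	Path, Name, Type string
	LastSyncedLine   int
	ByteOffset       int64
}

// tracker keys files by their backend file_name.
type tracker struct{ files map[string]*trackedFile }

// registerSidechainFile reports whether name was newly tracked. For an
// existing name it refreshes Path and Type but leaves the sync position.
func (t *tracker) registerSidechainFile(path, name, fileType string) bool {
	if f, ok := t.files[name]; ok {
		f.Path, f.Type = path, fileType
		return false
	}
	t.files[name] = &trackedFile{Path: path, Name: name, Type: fileType}
	return true
}

func main() {
	t := &tracker{files: map[string]*trackedFile{
		// A stale entry, as if rebuilt from backend state after a restart.
		"workflows/r1/agent.jsonl": {Name: "workflows/r1/agent.jsonl", LastSyncedLine: 12, ByteOffset: 480},
	}}

	fmt.Println(t.registerSidechainFile("/s/subagents/workflows/r1/agent.jsonl", "workflows/r1/agent.jsonl", "agent")) // false
	fmt.Println(t.files["workflows/r1/agent.jsonl"].LastSyncedLine)                                                   // 12
	fmt.Println(t.registerSidechainFile("/s/subagents/workflows/r2/agent.jsonl", "workflows/r2/agent.jsonl", "agent")) // true
}
```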

func (*FileTracker) RootTranscriptPath added in v0.17.3

func (t *FileTracker) RootTranscriptPath() string

RootTranscriptPath returns the absolute path of the session's root transcript file. Part of provider.RootTranscriptProvider: providers whose subagent layout differs from Claude's (e.g. Cursor) derive their subagents directory from this path instead of from SubagentsDir().

func (*FileTracker) SubagentsDir added in v0.16.2

func (t *FileTracker) SubagentsDir() string

SubagentsDir returns the <session>/subagents directory for this tracker. Part of provider.WorkflowRegistrar: the Claude provider scans subagents/workflows/<runId>/ beneath it to discover workflow files.

NOTE: this is computed for Claude's layout (transcript at <project>/<session-id>.jsonl → subagents at <project>/<session-id>/subagents/). Cursor's subagents sit beside the transcript file (<dir>/agent-transcripts/<id>/subagents/, i.e. filepath.Dir(transcript)/subagents), so Cursor's DiscoverDescendants derives the dir from RootTranscriptPath() rather than relying on this value.
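The two layouts differ only in how the directory is derived from the transcript path. A minimal sketch, assuming Unix-style paths; `claudeSubagentsDir` and `cursorSubagentsDir` are hypothetical helper names, and the example file names are made up.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// claudeSubagentsDir maps <project>/<session-id>.jsonl to
// <project>/<session-id>/subagents.
func claudeSubagentsDir(transcript string) string {
	session := strings.TrimSuffix(transcript, filepath.Ext(transcript))
	return filepath.Join(session, "subagents")
}

// cursorSubagentsDir places subagents beside the transcript file:
// filepath.Dir(transcript)/subagents.
func cursorSubagentsDir(transcript string) string {
	return filepath.Join(filepath.Dir(transcript), "subagents")
}

func main() {
	fmt.Println(claudeSubagentsDir("/proj/abc.jsonl"))
	fmt.Println(cursorSubagentsDir("/d/agent-transcripts/xyz/xyz.jsonl"))
}
```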

func (*FileTracker) UpdateAfterSync

func (t *FileTracker) UpdateAfterSync(file *TrackedFile, lastLine int, newOffset int64)

UpdateAfterSync updates the tracked file state after a successful sync. This updates both the sync position and the cached file stats (modtime/size) so HasFileChanged won't re-trigger until the file actually changes again.

type GitHubLinkRequest

type GitHubLinkRequest struct {
	URL    string `json:"url"`
	Title  string `json:"title,omitempty"`
	Source string `json:"source"` // "cli_hook" or "manual"
}

GitHubLinkRequest is the request body for POST /api/v1/sessions/{id}/github-links.

type GitHubLinkResponse

type GitHubLinkResponse struct {
	ID       int64  `json:"id"`
	LinkType string `json:"link_type"` // "commit" or "pull_request"
	URL      string `json:"url"`
	Owner    string `json:"owner"`
	Repo     string `json:"repo"`
	Ref      string `json:"ref"`
}

GitHubLinkResponse is the response for POST /api/v1/sessions/{id}/github-links.
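To show how the struct tags shape the wire format, the sketch below round-trips both types through encoding/json. The struct definitions are copied from above; the helper names, the URL, and the response body are invented for illustration and do not come from a real backend.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type GitHubLinkRequest struct {
	URL    string `json:"url"`
	Title  string `json:"title,omitempty"`
	Source string `json:"source"` // "cli_hook" or "manual"
}

type GitHubLinkResponse struct {
	ID       int64  `json:"id"`
	LinkType string `json:"link_type"` // "commit" or "pull_request"
	URL      string `json:"url"`
	Owner    string `json:"owner"`
	Repo     string `json:"repo"`
	Ref      string `json:"ref"`
}

// encodeRequest returns the JSON body for a link request.
func encodeRequest(req GitHubLinkRequest) (string, error) {
	b, err := json.Marshal(req)
	return string(b), err
}

// decodeResponse parses a response body into GitHubLinkResponse.
func decodeResponse(body string) (GitHubLinkResponse, error) {
	var resp GitHubLinkResponse
	err := json.Unmarshal([]byte(body), &resp)
	return resp, err
}

func main() {
	// Title is empty, so omitempty drops it from the payload.
	body, _ := encodeRequest(GitHubLinkRequest{
		URL:    "https://github.com/acme/widgets/pull/7",
		Source: "manual",
	})
	fmt.Println(body) // {"url":"https://github.com/acme/widgets/pull/7","source":"manual"}

	resp, _ := decodeResponse(`{"id":1,"link_type":"pull_request","owner":"acme","repo":"widgets","ref":"7"}`)
	fmt.Println(resp.LinkType, resp.Owner, resp.Repo, resp.Ref) // pull_request acme widgets 7
}
```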

type InitMetadata

type InitMetadata struct {
	CWD      string          `json:"cwd,omitempty"`
	GitInfo  json.RawMessage `json:"git_info,omitempty"`
	Hostname string          `json:"hostname,omitempty"`
	Username string          `json:"username,omitempty"`
}

InitMetadata contains optional metadata for session initialization.

type InitRequest

type InitRequest struct {
	Provider       string        `json:"provider"`
	ExternalID     string        `json:"external_id"`
	TranscriptPath string        `json:"transcript_path"`
	Metadata       *InitMetadata `json:"metadata,omitempty"`
}

InitRequest is the request body for POST /api/v1/sync/init.
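As an illustration of the payload shape, the sketch below marshals an InitRequest with and without metadata. The struct definitions are copied from above; `encodeInit`, the provider string, the paths, and the `git_info` contents are placeholder assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type InitMetadata struct {
	CWD      string          `json:"cwd,omitempty"`
	GitInfo  json.RawMessage `json:"git_info,omitempty"`
	Hostname string          `json:"hostname,omitempty"`
	Username string          `json:"username,omitempty"`
}

type InitRequest struct {
	Provider       string        `json:"provider"`
	ExternalID     string        `json:"external_id"`
	TranscriptPath string        `json:"transcript_path"`
	Metadata       *InitMetadata `json:"metadata,omitempty"`
}

// encodeInit returns the JSON body for an init request.
func encodeInit(req InitRequest) (string, error) {
	b, err := json.Marshal(req)
	return string(b), err
}

func main() {
	req := InitRequest{Provider: "claude", ExternalID: "abc", TranscriptPath: "/tmp/abc.jsonl"}

	// A nil Metadata pointer is omitted entirely.
	bare, _ := encodeInit(req)
	fmt.Println(bare)

	// GitInfo is embedded verbatim as pre-encoded JSON; empty fields are dropped.
	req.Metadata = &InitMetadata{CWD: "/work", GitInfo: json.RawMessage(`{"branch":"main"}`)}
	full, _ := encodeInit(req)
	fmt.Println(full)
}
```

Keeping GitInfo as json.RawMessage lets the client forward whatever git payload it collected without this package needing to know its schema.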

type InitResponse

type InitResponse struct {
	SessionID string               `json:"session_id"`
	Files     map[string]FileState `json:"files"`
}

InitResponse is the response for POST /api/v1/sync/init.

type TrackedFile

type TrackedFile struct {
	Path           string    // Full path to the file
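	Name           string    // Backend file_name: base name, or a forward-slash path-encoded name for sidechain files
	Type           string    // "transcript", "agent", or provider.FileTypeWorkflowJournal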
	LastSyncedLine int       // Last line number synced to backend (1-based)
	ByteOffset     int64     // Byte position after LastSyncedLine (for seeking)
	LastModTime    time.Time // Last modification time (for change detection)
	LastSize       int64     // Last known size (for change detection)

	// CodexRollout, if non-nil, marks this tracked file as a Codex rollout
	// for which the engine should emit `codex_rollout` chunk metadata on
	// the FIRST chunk uploaded for this file. "First chunk" is detected
	// via chunk.FirstLine == 1; no separate state field is required.
	// Roots and descendants both carry this; only the engine's emission
	// gate (FirstLine==1) determines when it goes on the wire.
	CodexRollout *CodexRolloutMetadata
}

TrackedFile represents a file being synced.
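The first-chunk gate described in the CodexRollout field comment can be sketched as below. The reduced types and the `rolloutFor` helper are hypothetical; only the documented rule (emit when CodexRollout is non-nil and FirstLine == 1) is modeled.

```go
package main

import "fmt"

// rolloutMeta is a hypothetical stand-in for CodexRolloutMetadata.
type rolloutMeta struct{ ThreadID string }

// trackedFile and chunk keep only the fields the gate needs.
type trackedFile struct{ CodexRollout *rolloutMeta }

type chunk struct{ FirstLine int }

// rolloutFor returns the metadata to attach to c, or nil when c is not the
// first chunk of a Codex rollout file.
func rolloutFor(f *trackedFile, c *chunk) *rolloutMeta {
	if f.CodexRollout != nil && c.FirstLine == 1 {
		return f.CodexRollout
	}
	return nil
}

func main() {
	f := &trackedFile{CodexRollout: &rolloutMeta{ThreadID: "t-1"}}
	fmt.Println(rolloutFor(f, &chunk{FirstLine: 1}) != nil)              // true
	fmt.Println(rolloutFor(f, &chunk{FirstLine: 51}) != nil)             // false
	fmt.Println(rolloutFor(&trackedFile{}, &chunk{FirstLine: 1}) != nil) // false
}
```

Because the gate reads only the chunk's own FirstLine, a retried first chunk still carries the metadata as long as the field on the tracked file survives.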

func (*TrackedFile) SetCodexRollout added in v0.16.0

func (f *TrackedFile) SetCodexRollout(meta *provider.CodexRolloutMetadata)

SetCodexRollout assigns Codex rollout metadata to this tracked file, used by provider.Codex.InitTranscript via the provider.TranscriptRegistrar interface. Separated from direct field assignment so the engine and providers don't need to share struct internals.

type UpdateSummaryRequest

type UpdateSummaryRequest struct {
	Summary string `json:"summary"`
}

UpdateSummaryRequest is the request body for PATCH /api/v1/sessions/{external_id}/summary.

type UpdateSummaryResponse

type UpdateSummaryResponse struct {
	Status string `json:"status"`
}

UpdateSummaryResponse is the response for PATCH /api/v1/sessions/{external_id}/summary.
