sync

package
v0.16.0
Warning

This package is not in the latest version of its module.

Published: May 18, 2026 License: MIT Imports: 19 Imported by: 0

README

pkg/sync

Sync engine that orchestrates incremental transcript uploads to the backend. Handles file tracking, chunking, agent discovery, and chunk upload. Provider-specific behavior (metadata extraction, descendant discovery, root metadata attachment) lives entirely in pkg/provider; the engine dispatches through the provider.Provider interface (see CF-397).

Files

| File | Role |
|---|---|
| engine.go | Engine: orchestrates init, sync loop, agent discovery (BFS); dispatches provider behavior via InitTranscript/DiscoverDescendants/AnnotateChunk. Includes the chunkView adapter that satisfies provider.ChunkView |
| client.go | Client: HTTP API methods for init, chunk upload, events, summary updates, GitHub linking. Aliases provider.CodexRolloutMetadata as sync.CodexRolloutMetadata |
| tracker.go | FileTracker: tracks file state, reads chunks with byte-offset seeking, discovers agent files (Claude transitive discovery). Implements provider.TranscriptRegistrar (via *TrackedFile.SetCodexRollout) and provider.DescendantRegistrar (via *FileTracker.RegisterCodexRollout) so providers can register Codex rollouts |
| summary_link.go | Links child session summaries to parent sessions via leafUuid |

Three Components

Engine (orchestrator)

Engine.Init() registers the session with the backend, receiving the current sync state (last synced line per file), then calls provider.InitTranscript(transcript, ...) so the provider can attach root-level metadata (Codex attaches codex_rollout; Claude is a no-op).

Engine.SyncAll() performs a BFS traversal. It first calls provider.DiscoverDescendants(tracker, externalID) once per cycle (Codex walks the SQLite subtree; Claude is a no-op). Then, for each tracked file, it checks for changes, reads a chunk, dispatches provider.AnnotateChunk(chunkView, sentFirst, redact), uploads the chunk, and discovers new agent files via tracker.DiscoverNewFiles (Claude's transitive content-driven discovery). Codex descendants are registered as file_type=agent sidechain files under the root's backend session.

Client (API)

Thin wrapper around pkg/http.Client that marshals/unmarshals request types for the sync API endpoints: /api/v1/sync/init, /api/v1/sync/chunk, /api/v1/sync/event, and session-specific endpoints for summaries and GitHub links.

FileTracker (file I/O + state)

Manages the mapping between files on disk and their sync state. ReadChunk() seeks to the last known byte offset, reads new lines up to the chunk size limit, applies redaction, and extracts agent IDs. DiscoverNewFiles() finds new agent files both from collected agent IDs and by scanning the subagents directory.

Data Flow

SyncAll() loop:
  HasFileChanged? ──no──> skip
       │yes
       ▼
  ReadChunk(maxBytes)
    ├── seek to ByteOffset
    ├── read lines until maxBytes
    ├── extract agent IDs (pre-redaction)
    ├── extract metadata (pre-redaction)
    ├── apply redaction
    └── return Chunk
       │
       ▼
  UploadChunk (redacted lines + redacted metadata)
       │
       ▼
  UpdateAfterSync (update byte offset + line count)
       │
       ▼
  DiscoverNewFiles (from collected agent IDs + directory scan)
       │
       ▼
  Add new files to queue → repeat

How to Extend

Adding a new API endpoint: Add request/response types in client.go, add a method on Client, call it from the engine or command layer.

Adding new metadata extraction: Modify the appropriate provider's AnnotateChunk in pkg/provider/{claude,codex}.go. Metadata is extracted from raw lines before redaction, then the extracted values are redacted via the closure passed to AnnotateChunk before being attached to the chunk via the ChunkView setters.
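The extract-then-redact ordering described above can be sketched in isolation. This is a minimal illustration, not the provider's actual code: extractSummary, annotate, and the string-replacing redact closure are hypothetical stand-ins, and the real AnnotateChunk works through the ChunkView setters rather than returning a string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// extractSummary is a hypothetical extractor: it returns the "summary" field
// of the first raw (unredacted) line that carries one.
func extractSummary(rawLines []string) string {
	for _, line := range rawLines {
		var rec struct {
			Summary string `json:"summary"`
		}
		if err := json.Unmarshal([]byte(line), &rec); err == nil && rec.Summary != "" {
			return rec.Summary
		}
	}
	return ""
}

// annotate extracts from the raw lines first, then redacts the extracted
// value before it would be attached to the chunk.
func annotate(rawLines []string, redact func(string) string) string {
	return redact(extractSummary(rawLines))
}

func main() {
	// Stand-in redaction closure; the real one is supplied by the engine.
	redact := func(s string) string {
		return strings.ReplaceAll(s, "hunter2", "[REDACTED]")
	}
	raw := []string{`{"type":"user"}`, `{"summary":"rotate password hunter2"}`}
	fmt.Println(annotate(raw, redact)) // prints "rotate password [REDACTED]"
}
```

Extraction sees the original text, so the summary stays meaningful, while only the redacted value leaves the function.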

Tracking a new file type: Add discovery logic in DiscoverNewFiles() (for content-driven discovery) or in the provider's DiscoverDescendants (for external-state discovery). Set the file type in TrackedFile.Type. The rest of the pipeline (read, chunk, upload) is file-type agnostic.

Adding a new provider: Implement provider.Provider (including the three sync-loop methods InitTranscript, DiscoverDescendants, AnnotateChunk) and register it in pkg/provider/provider.go's registry. Zero changes required in pkg/sync/engine.go — the engine dispatches everything through the interface.

Invariants

  • Chunks must not exceed 14MB (DefaultMaxChunkBytes). The backend rejects payloads larger than 16MB; the client limit is 14MB to leave headroom for JSON encoding overhead.
  • Init() must be called before SyncAll(). The engine needs a backend session ID and initial sync state.
  • After upload failure, state must be refreshed from backend (refreshStateFromBackend). This handles the case where the server received and stored data but the client timed out before receiving the response. Without refresh, the client would re-upload duplicate lines. applyBackendFiles is the shared path for initial and refreshed backend file state.
  • Agent discovery uses BFS with cycle detection. The knownAgentIDs set prevents infinite loops when agents reference each other. Max 10 BFS iterations as a safety bound.
  • Redaction must happen in ReadChunk() before lines leave the tracker. Never upload unredacted content. The same call site covers Claude transcripts, Claude agent files, and Codex rollouts; redactor.RedactJSONLine is JSON-shape-agnostic, so no per-provider branching is needed.
  • Metadata is extracted before redaction, then redacted. Summaries and first user messages need the original text for meaningful extraction, but must be redacted before upload.
  • Byte offsets must be maintained accurately. ReadChunk returns NewOffset which is the byte position after the last line read. UpdateAfterSync stores this for the next read. Incorrect offsets cause duplicate or missing lines.
  • Directory scan in DiscoverNewFiles catches agents from already-synced lines. After a daemon restart, agent IDs from previously-synced lines are lost from memory. The directory scan recovers them.
  • codex_rollout metadata rides on first chunks only. provider.Codex.AnnotateChunk attaches ChunkMetadata.CodexRollout whenever c.FirstLine() == 1 and the tracked file carries a CodexRollout. On retry after a failed upload, FirstLine remains 1 so the metadata is automatically resent — the backend upsert is idempotent. InitFromBackendState preserves TrackedFile.CodexRollout across refreshStateFromBackend so retries don't lose the payload.
  • The engine has no provider-name branches. TestEngine_NoProviderNameLiterals in engine_dispatch_test.go scans engine.go for NameCodex / NameClaudeCode literals and fails CI if either appears. New provider-specific behavior must live in pkg/provider, not the engine.

Design Decisions

BFS for agent discovery. Agents can spawn sub-agents transitively (A references B, B references C). BFS ensures all transitive agents are discovered and synced, not just direct children. The iteration cap (10) prevents runaway discovery.

Byte-offset seeking instead of re-reading. For large transcripts (megabytes), seeking to the last read position is far more efficient than re-reading from the start and skipping lines.
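A minimal sketch of byte-offset seeking, assuming newline-terminated lines. The names readFrom and writeTemp are hypothetical, and two behaviors here are choices made for the sketch rather than documented facts about ReadChunk: a trailing line without a newline is left for the next read, and a single line larger than maxBytes is still returned so the reader cannot stall.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
)

// readFrom seeks to offset and reads complete lines until adding another
// line would exceed maxBytes. It returns the lines and the byte position
// after the last line consumed.
func readFrom(path string, offset int64, maxBytes int) ([]string, int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, offset, err
	}
	defer f.Close()
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, offset, err
	}
	r := bufio.NewReader(f)
	var lines []string
	used := 0
	for {
		line, err := r.ReadString('\n')
		if err == io.EOF {
			// Any partial trailing line is left for the next read.
			return lines, offset, nil
		}
		if err != nil {
			return lines, offset, err
		}
		if used+len(line) > maxBytes && len(lines) > 0 {
			return lines, offset, nil
		}
		lines = append(lines, line[:len(line)-1])
		used += len(line)
		offset += int64(len(line))
	}
}

// writeTemp is a helper for the demo: it writes content to a temp file.
func writeTemp(content string) string {
	f, err := os.CreateTemp("", "transcript-*.jsonl")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.WriteString(content); err != nil {
		panic(err)
	}
	return f.Name()
}

func main() {
	path := writeTemp("a\nbb\nccc\n")
	defer os.Remove(path)
	lines, next, _ := readFrom(path, 0, 4)
	fmt.Println(lines, next) // [a] 2
	lines, next, _ = readFrom(path, next, 100)
	fmt.Println(lines, next) // [bb ccc] 9
}
```

The second call starts at byte 2 and never touches the first line again, which is the saving the design decision refers to.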

refreshStateFromBackend after upload failure. When a chunk upload times out, the server may have stored the data. Without refreshing, the next SyncAll() would re-upload the same lines. The refresh call gets the server's actual LastSyncedLine and updates the tracker accordingly. Auth errors during refresh are propagated (can't recover without re-auth).
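The failure mode can be reproduced with a small simulation. This is a sketch under stated assumptions: fakeServer, upload, lastSyncedLine, and syncOnce are hypothetical names, and the server here simply appends whatever it receives.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeServer is a hypothetical backend that can store a chunk and still
// report a timeout, which is the failure mode described above.
type fakeServer struct {
	stored      []string
	timeoutNext bool
}

// upload stores the lines, then optionally simulates a lost response.
func (s *fakeServer) upload(lines []string) (int, error) {
	s.stored = append(s.stored, lines...)
	if s.timeoutNext {
		s.timeoutNext = false
		return 0, errors.New("timeout waiting for response")
	}
	return len(s.stored), nil
}

func (s *fakeServer) lastSyncedLine() int { return len(s.stored) }

// syncOnce uploads everything after lastSynced and returns the new position.
// On upload failure it refreshes the position from the server instead of
// trusting its own stale value.
func syncOnce(s *fakeServer, all []string, lastSynced int) int {
	if lastSynced >= len(all) {
		return lastSynced
	}
	n, err := s.upload(all[lastSynced:])
	if err != nil {
		return s.lastSyncedLine()
	}
	return n
}

func main() {
	s := &fakeServer{timeoutNext: true}
	all := []string{"l1", "l2", "l3"}
	pos := syncOnce(s, all, 0)   // stored on the server, but the call errors
	pos = syncOnce(s, all, pos)  // nothing left to send after the refresh
	fmt.Println(pos, len(s.stored)) // 3 3
}
```

Without the refresh, the second cycle would start again from line 0 and the server would end up with six lines instead of three.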

Summary link injection. When a transcript contains a summary with a leafUuid, it means this session is a continuation of a previous one. linkSummaryToPreviousSession finds the parent transcript by scanning other JSONL files for the matching UUID, then calls the backend to update the parent's summary. This is best-effort — failures are logged but don't block sync.

Testing

go test ./pkg/sync/...
  • NewWithBackend() allows injecting a mock backend/client for unit tests
  • engine_test.go / tracker_test.go — unit tests for incremental sync, agent discovery, byte offsets, chunking
  • integration_test.go — full engine lifecycle with mock HTTP backend: init, multi-cycle sync, agent discovery, error recovery, large files, chunk size limits

Dependencies

Uses: pkg/config, pkg/git, pkg/http, pkg/logger, pkg/provider, pkg/redactor, pkg/types, pkg/utils

Used by: pkg/daemon/ (sync loop), cmd/ (save command, post-tool-use linking)

Documentation

Constants

const DefaultMaxChunkBytes = 14 * 1024 * 1024 // 14MB

DefaultMaxChunkBytes is the default maximum size of a chunk in bytes. This is a backend-imposed limit: the server rejects chunks larger than 16MB. We use 14MB to leave headroom for JSON encoding overhead and compression. If the backend limit changes, this constant must be updated accordingly.
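The arithmetic behind the headroom, as a small sketch. backendLimitBytes and fitsInChunk are hypothetical names, and the sketch assumes the 16MB server limit uses the same 1024 * 1024 unit as the constant.

```go
package main

import "fmt"

const (
	DefaultMaxChunkBytes = 14 * 1024 * 1024 // as declared in the package
	backendLimitBytes    = 16 * 1024 * 1024 // assumed unit for the 16MB server limit
)

// fitsInChunk reports whether a line of lineLen bytes can be added to a
// chunk that already holds usedBytes without exceeding the client limit.
func fitsInChunk(usedBytes, lineLen int) bool {
	return usedBytes+lineLen <= DefaultMaxChunkBytes
}

func main() {
	fmt.Println(DefaultMaxChunkBytes)                     // 14680064
	fmt.Println(backendLimitBytes - DefaultMaxChunkBytes) // 2097152
	fmt.Println(fitsInChunk(DefaultMaxChunkBytes-10, 10)) // true
	fmt.Println(fitsInChunk(DefaultMaxChunkBytes-10, 11)) // false
}
```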

Variables

This section is empty.

Functions

func FindSessionByLeafUUID

func FindSessionByLeafUUID(transcriptDir, leafUUID, excludeFile string) string

FindSessionByLeafUUID searches transcript files in the given directory for a message with the specified uuid in the last N lines. Returns the session ID (filename without extension) if found, empty string otherwise.

Excludes the file specified by excludeFile from the search.

Types

type Backend added in v0.16.0

type Backend interface {
	Init(providerName, externalID, transcriptPath string, metadata *InitMetadata) (*InitResponse, error)
	UploadChunk(sessionID, fileName, fileType string, firstLine int, lines []string, metadata *ChunkMetadata) (int, error)
	SendEvent(sessionID, eventType string, timestamp time.Time, payload json.RawMessage) error
	UpdateSessionSummary(externalID, summary string) error
}

Backend is the sync transport used by Engine. The HTTP client implements this for provider-aware backend sync.
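Because Backend is an interface, a test can substitute an in-memory fake. The sketch below is self-contained, so it redeclares the interface with placeholder types instead of importing the package. memoryBackend is hypothetical, and its rule that a chunk must start right after the last stored line is an assumption made for the fake, not documented server behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Placeholder types standing in for the package's own definitions.
type (
	InitMetadata  struct{}
	ChunkMetadata struct{}
	FileState     struct{ LastSyncedLine int }
	InitResponse  struct {
		SessionID string
		Files     map[string]FileState
	}
)

// Backend mirrors the interface shown above.
type Backend interface {
	Init(providerName, externalID, transcriptPath string, metadata *InitMetadata) (*InitResponse, error)
	UploadChunk(sessionID, fileName, fileType string, firstLine int, lines []string, metadata *ChunkMetadata) (int, error)
	SendEvent(sessionID, eventType string, timestamp time.Time, payload json.RawMessage) error
	UpdateSessionSummary(externalID, summary string) error
}

// memoryBackend is a hypothetical fake that records uploaded lines per file.
type memoryBackend struct {
	files map[string][]string
}

func (m *memoryBackend) Init(providerName, externalID, transcriptPath string, metadata *InitMetadata) (*InitResponse, error) {
	state := make(map[string]FileState, len(m.files))
	for name, lines := range m.files {
		state[name] = FileState{LastSyncedLine: len(lines)}
	}
	return &InitResponse{SessionID: "session-" + externalID, Files: state}, nil
}

func (m *memoryBackend) UploadChunk(sessionID, fileName, fileType string, firstLine int, lines []string, metadata *ChunkMetadata) (int, error) {
	want := len(m.files[fileName]) + 1
	if firstLine != want {
		return 0, fmt.Errorf("expected first line %d, got %d", want, firstLine)
	}
	m.files[fileName] = append(m.files[fileName], lines...)
	return len(m.files[fileName]), nil
}

func (m *memoryBackend) SendEvent(string, string, time.Time, json.RawMessage) error { return nil }
func (m *memoryBackend) UpdateSessionSummary(string, string) error                 { return nil }

// Compile-time check that the fake satisfies the interface.
var _ Backend = (*memoryBackend)(nil)

func main() {
	b := &memoryBackend{files: map[string][]string{}}
	n, err := b.UploadChunk("s", "t.jsonl", "transcript", 1, []string{"a", "b"}, nil)
	fmt.Println(n, err) // 2 <nil>
}
```

A fake like this would be passed to NewWithBackend in place of the HTTP client.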

type Chunk

type Chunk struct {
	FileName  string         // Base name of the file
	FileType  string         // "transcript" or "agent"
	FirstLine int            // 1-based line number of first line
	Lines     []string       // The lines (redacted if applicable)
	NewOffset int64          // Byte offset after reading these lines
	Metadata  *ChunkMetadata // Metadata to send to backend
	AgentIDs  []string       // Agent IDs discovered (local use only, not sent to backend)
}

Chunk represents a range of lines read from a file with extracted metadata

type ChunkMetadata

type ChunkMetadata struct {
	GitInfo          *git.GitInfo          `json:"git_info,omitempty"`
	Summary          string                `json:"summary,omitempty"`
	FirstUserMessage string                `json:"first_user_message,omitempty"`
	CodexRollout     *CodexRolloutMetadata `json:"codex_rollout,omitempty"`
}

ChunkMetadata contains metadata sent to the backend with a chunk

type ChunkRequest

type ChunkRequest struct {
	SessionID string         `json:"session_id"`
	FileName  string         `json:"file_name"`
	FileType  string         `json:"file_type"`
	FirstLine int            `json:"first_line"`
	Lines     []string       `json:"lines"`
	Metadata  *ChunkMetadata `json:"metadata,omitempty"`
}

ChunkRequest is the request body for POST /api/v1/sync/chunk
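To make the wire format concrete, the sketch below marshals a ChunkRequest with encoding/json. The struct tags are copied from the definitions above; ChunkMetadata is trimmed to two string fields to keep the example self-contained, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChunkMetadata is trimmed to two of the documented fields for this sketch.
type ChunkMetadata struct {
	Summary          string `json:"summary,omitempty"`
	FirstUserMessage string `json:"first_user_message,omitempty"`
}

type ChunkRequest struct {
	SessionID string         `json:"session_id"`
	FileName  string         `json:"file_name"`
	FileType  string         `json:"file_type"`
	FirstLine int            `json:"first_line"`
	Lines     []string       `json:"lines"`
	Metadata  *ChunkMetadata `json:"metadata,omitempty"`
}

// encode returns the JSON body for a request.
func encode(req ChunkRequest) string {
	b, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	req := ChunkRequest{
		SessionID: "s1",
		FileName:  "t.jsonl",
		FileType:  "transcript",
		FirstLine: 1,
		Lines:     []string{"line one"},
	}
	// A nil Metadata pointer is omitted entirely because of omitempty.
	fmt.Println(encode(req))
	// {"session_id":"s1","file_name":"t.jsonl","file_type":"transcript","first_line":1,"lines":["line one"]}
}
```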

type ChunkResponse

type ChunkResponse struct {
	LastSyncedLine int `json:"last_synced_line"`
}

ChunkResponse is the response for POST /api/v1/sync/chunk

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client handles communication with the sync API endpoints

func NewClient

func NewClient(cfg *config.UploadConfig) (*Client, error)

NewClient creates a new sync API client

func (*Client) Init

func (c *Client) Init(providerName, externalID, transcriptPath string, metadata *InitMetadata) (*InitResponse, error)

Init initializes or resumes a sync session. Returns the session ID and current sync state for all files. The providerName must be a canonical provider name (callers via Engine.Init pass e.provider.Name(), which is always non-empty).

func (*Client) LinkGitHub

func (c *Client) LinkGitHub(sessionID string, req *GitHubLinkRequest) (*GitHubLinkResponse, error)

LinkGitHub creates a GitHub link for a session

func (*Client) SendEvent

func (c *Client) SendEvent(sessionID, eventType string, timestamp time.Time, payload json.RawMessage) error

SendEvent sends a session lifecycle event to the backend

func (*Client) UpdateSessionSummary

func (c *Client) UpdateSessionSummary(externalID, summary string) error

UpdateSessionSummary updates the summary for a session identified by its external_id

func (*Client) UploadChunk

func (c *Client) UploadChunk(sessionID, fileName, fileType string, firstLine int, lines []string, metadata *ChunkMetadata) (int, error)

UploadChunk uploads a chunk of lines for a file with optional metadata. Returns the new last synced line number.

type CodexRolloutMetadata added in v0.16.0

type CodexRolloutMetadata = provider.CodexRolloutMetadata

CodexRolloutMetadata is the per-rollout metadata transmitted on the FIRST chunk of a Codex rollout. The canonical definition lives in pkg/provider so the Codex implementation can construct one without an import cycle; pkg/sync re-exports it here as an alias so existing call sites that reference sync.CodexRolloutMetadata keep working. Wire format unchanged.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the core sync engine used by both daemon and manual save. It provides a unified interface for syncing session data to the backend.

func New

func New(uploadCfg *config.UploadConfig, engineCfg EngineConfig) (*Engine, error)

New creates a new sync engine with the given configuration. The engine is not connected to the backend until Init() is called.

func NewWithBackend added in v0.16.0

func NewWithBackend(backend Backend, r *redactor.Redactor, engineCfg EngineConfig) *Engine

NewWithBackend creates an engine with a preconfigured backend. Test-facing; an invalid Provider name falls back to ClaudeCode to keep historical behavior (default provider when unspecified) and avoid a second error path for callers that don't care.

func (*Engine) GetSyncStats

func (e *Engine) GetSyncStats() map[string]int

GetSyncStats returns current sync statistics (lines synced per file)

func (*Engine) Init

func (e *Engine) Init() error

Init initializes the sync session with the backend:

  • Creates the session if it does not exist, or resumes the existing one
  • Gets last_synced_line for all known files
  • Sends initial metadata (git info, hostname, username)

Must be called before SyncAll.

func (*Engine) IsInitialized

func (e *Engine) IsInitialized() bool

IsInitialized returns true if Init() has been called successfully

func (*Engine) Reset

func (e *Engine) Reset()

Reset clears the initialized state, allowing Init to be called again. This is useful when the backend returns an auth error and we need to re-authenticate and re-initialize.

func (*Engine) SendSessionEnd

func (e *Engine) SendSessionEnd(hookInput *types.ClaudeHookInput, timestamp time.Time) error

SendSessionEnd sends a session_end event to the backend

func (*Engine) SessionID

func (e *Engine) SessionID() string

SessionID returns the backend session ID (empty if not initialized)

func (*Engine) SyncAll

func (e *Engine) SyncAll() (int, error)

SyncAll syncs all tracked files to the backend using BFS traversal. It discovers agent files referenced in transcripts and syncs them transitively within a single call. Each file is processed at most once per call.

Algorithm:

  1. Start with all currently tracked files in the queue
  2. Process each file in queue (sync if changed, extract agent IDs)
  3. Discover new files from collected agent IDs
  4. Add only NEW files to the queue for next iteration
  5. Repeat until queue is empty (or max iterations reached)

Returns number of chunks uploaded and the first error encountered (if any). Continues syncing other files even if one file fails.
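The algorithm above can be sketched without any file I/O. In this illustration, refs is a hypothetical map from a file to the agent files it references, syncAll returns the processing order instead of uploading anything, and the cap of 10 is the documented safety bound.

```go
package main

import "fmt"

const maxIterations = 10 // the documented safety bound

// syncAll processes files breadth-first. The known set guarantees each file
// is queued at most once, so reference cycles cannot loop forever.
func syncAll(roots []string, refs map[string][]string) []string {
	known := make(map[string]bool)
	var queue, order []string
	for _, r := range roots {
		if !known[r] {
			known[r] = true
			queue = append(queue, r)
		}
	}
	for i := 0; i < maxIterations && len(queue) > 0; i++ {
		var next []string
		for _, file := range queue {
			order = append(order, file) // stands in for "sync if changed"
			for _, ref := range refs[file] {
				if !known[ref] {
					known[ref] = true
					next = append(next, ref) // only NEW files are queued
				}
			}
		}
		queue = next
	}
	return order
}

func main() {
	refs := map[string][]string{
		"transcript": {"agent-a"},
		"agent-a":    {"agent-b", "transcript"}, // cycle back to the root
		"agent-b":    {"agent-a"},
	}
	fmt.Println(syncAll([]string{"transcript"}, refs)) // [transcript agent-a agent-b]
}
```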

type EngineConfig

type EngineConfig struct {
	Provider       string
	ExternalID     string
	TranscriptPath string
	CWD            string
}

EngineConfig holds configuration for creating an Engine

type EventRequest

type EventRequest struct {
	SessionID string          `json:"session_id"`
	EventType string          `json:"event_type"`
	Timestamp time.Time       `json:"timestamp"`
	Payload   json.RawMessage `json:"payload"`
}

EventRequest is the request body for POST /api/v1/sync/event

type EventResponse

type EventResponse struct {
	Success bool `json:"success"`
}

EventResponse is the response for POST /api/v1/sync/event

type FileState

type FileState struct {
	LastSyncedLine int `json:"last_synced_line"`
}

FileState represents the sync state for a single file from the backend

type FileTracker

type FileTracker struct {
	// contains filtered or unexported fields
}

FileTracker tracks files and their sync state for a session

func NewFileTracker

func NewFileTracker(transcriptPath string) *FileTracker

NewFileTracker creates a new file tracker for a session

func (*FileTracker) AddCodexRollout added in v0.16.0

func (t *FileTracker) AddCodexRollout(path, fileName string, isRoot bool, meta CodexRolloutMetadata) *TrackedFile

AddCodexRollout registers a Codex rollout file in the tracker.

  • isRoot=true → file type "transcript" (the Codex root's primary rollout).
  • isRoot=false → file type "agent" (every descendant, at any depth).

All descendants sync as sidechain files under the root's backend session — the same primitive Claude Code uses for its `agent-*.jsonl` files — while the Codex thread tree is preserved separately in the backend's `codex_rollouts` table via `meta.ParentThreadUUID`.

Idempotent: a second call for an already-tracked path returns the existing TrackedFile without modifying it. The caller can use this to avoid maintaining a separate "already added" set.

func (*FileTracker) DiscoverNewFiles

func (t *FileTracker) DiscoverNewFiles(newAgentIDs []string) []*TrackedFile

DiscoverNewFiles checks for new agent files based on agent IDs discovered in previous chunk reads, and also scans the subagents directory for any agent files not already tracked. Returns newly discovered files.

func (*FileTracker) GetTrackedFiles

func (t *FileTracker) GetTrackedFiles() []*TrackedFile

GetTrackedFiles returns all currently tracked files

func (*FileTracker) GetTranscriptFile

func (t *FileTracker) GetTranscriptFile() *TrackedFile

GetTranscriptFile returns the transcript file being tracked

func (*FileTracker) HasFileChanged

func (t *FileTracker) HasFileChanged(file *TrackedFile) bool

HasFileChanged checks if a file has more data to sync. Returns true if:

  • The file has grown (more bytes than our last known offset)
  • The file has been modified (mod time changed)
  • We haven't read the file yet (no byte offset)
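The three conditions can be sketched with os.Stat. fileState, hasChanged, and the helper functions are hypothetical; treating an unreadable file as unchanged is a choice made for this sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// fileState holds the cached values the check compares against.
type fileState struct {
	ByteOffset  int64
	LastModTime time.Time
}

// hasChanged applies the three documented conditions in order:
// never read, grown past the offset, or modified since the last sync.
func hasChanged(path string, st fileState) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false // sketch choice: nothing to sync if the file is unreadable
	}
	return st.ByteOffset == 0 ||
		info.Size() > st.ByteOffset ||
		!info.ModTime().Equal(st.LastModTime)
}

// snapshot records the state a tracker would cache after a successful sync.
func snapshot(path string, offset int64) fileState {
	info, err := os.Stat(path)
	if err != nil {
		panic(err)
	}
	return fileState{ByteOffset: offset, LastModTime: info.ModTime()}
}

// writeTemp and appendTo are demo helpers.
func writeTemp(content string) string {
	f, err := os.CreateTemp("", "transcript-*.jsonl")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.WriteString(content); err != nil {
		panic(err)
	}
	return f.Name()
}

func appendTo(path, content string) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.WriteString(content); err != nil {
		panic(err)
	}
}

func main() {
	path := writeTemp("a\n")
	defer os.Remove(path)
	fmt.Println(hasChanged(path, fileState{})) // true: never read
	st := snapshot(path, 2)
	fmt.Println(hasChanged(path, st)) // false: offset and mod time match
	appendTo(path, "b\n")
	fmt.Println(hasChanged(path, st)) // true: file grew past the offset
}
```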

func (*FileTracker) InitFromBackendState

func (t *FileTracker) InitFromBackendState(backendFiles map[string]FileState)

InitFromBackendState initializes the tracker with state from the backend. This sets up tracking for the transcript and any files the backend knows about.

Called from both Engine.Init() (first time) and Engine.refreshStateFromBackend() (after a chunk-upload failure). On refresh, any per-file metadata that the engine has already set on a tracked file (notably Codex rollout metadata) must survive — otherwise a retried first chunk would lose its codex_rollout payload. We preserve CodexRollout for existing entries and only refresh the fields that can legitimately drift (sync position).

func (*FileTracker) IsTracked

func (t *FileTracker) IsTracked(fileName string) bool

IsTracked returns true if a file is already being tracked

func (*FileTracker) ReadChunk

func (t *FileTracker) ReadChunk(file *TrackedFile, r *redactor.Redactor, maxBytes int) (*Chunk, error)

ReadChunk reads new lines from a file starting after LastSyncedLine. Uses ByteOffset to seek directly to the right position if available. Applies redaction if a redactor is provided. Stops reading when accumulated bytes would exceed maxBytes (aligned to line boundary). Returns nil if there are no new lines.

func (*FileTracker) RegisterCodexRollout added in v0.16.0

func (t *FileTracker) RegisterCodexRollout(path, fileName string, isRoot bool, meta provider.CodexRolloutMetadata)

RegisterCodexRollout is the provider.DescendantRegistrar entry point: thin wrapper around AddCodexRollout that drops the *TrackedFile return (kept on AddCodexRollout for in-package callers that want it).

func (*FileTracker) UpdateAfterSync

func (t *FileTracker) UpdateAfterSync(file *TrackedFile, lastLine int, newOffset int64)

UpdateAfterSync updates the tracked file state after a successful sync. This updates both the sync position and the cached file stats (modtime/size) so HasFileChanged won't re-trigger until the file actually changes again.

type GitHubLinkRequest

type GitHubLinkRequest struct {
	URL    string `json:"url"`
	Title  string `json:"title,omitempty"`
	Source string `json:"source"` // "cli_hook" or "manual"
}

GitHubLinkRequest is the request body for POST /api/v1/sessions/{id}/github-links

type GitHubLinkResponse

type GitHubLinkResponse struct {
	ID       int64  `json:"id"`
	LinkType string `json:"link_type"` // "commit" or "pull_request"
	URL      string `json:"url"`
	Owner    string `json:"owner"`
	Repo     string `json:"repo"`
	Ref      string `json:"ref"`
}

GitHubLinkResponse is the response for POST /api/v1/sessions/{id}/github-links

type InitMetadata

type InitMetadata struct {
	CWD      string          `json:"cwd,omitempty"`
	GitInfo  json.RawMessage `json:"git_info,omitempty"`
	Hostname string          `json:"hostname,omitempty"`
	Username string          `json:"username,omitempty"`
}

InitMetadata contains optional metadata for session initialization

type InitRequest

type InitRequest struct {
	Provider       string        `json:"provider"`
	ExternalID     string        `json:"external_id"`
	TranscriptPath string        `json:"transcript_path"`
	Metadata       *InitMetadata `json:"metadata,omitempty"`
}

InitRequest is the request body for POST /api/v1/sync/init

type InitResponse

type InitResponse struct {
	SessionID string               `json:"session_id"`
	Files     map[string]FileState `json:"files"`
}

InitResponse is the response for POST /api/v1/sync/init

type TrackedFile

type TrackedFile struct {
	Path           string    // Full path to the file
	Name           string    // Base name of the file
	Type           string    // "transcript" or "agent"
	LastSyncedLine int       // Last line number synced to backend (1-based)
	ByteOffset     int64     // Byte position after LastSyncedLine (for seeking)
	LastModTime    time.Time // Last modification time (for change detection)
	LastSize       int64     // Last known size (for change detection)

	// CodexRollout, if non-nil, marks this tracked file as a Codex rollout
	// for which the engine should emit `codex_rollout` chunk metadata on
	// the FIRST chunk uploaded for this file. "First chunk" is detected
	// via chunk.FirstLine == 1; no separate state field is required.
	// Roots and descendants both carry this; only the engine's emission
	// gate (FirstLine==1) determines when it goes on the wire.
	CodexRollout *CodexRolloutMetadata
}

TrackedFile represents a file being synced
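The FirstLine == 1 emission gate described in the field comment can be sketched as follows. rolloutMeta, its ThreadUUID field, chunk, and annotate are hypothetical simplifications of the real types.

```go
package main

import "fmt"

// rolloutMeta is a placeholder; the ThreadUUID field is hypothetical.
type rolloutMeta struct{ ThreadUUID string }

type trackedFile struct{ CodexRollout *rolloutMeta }

type chunk struct {
	FirstLine int
	Rollout   *rolloutMeta
}

// annotate attaches rollout metadata only when the chunk starts at line 1
// and the tracked file carries metadata. No separate "already sent" flag
// is needed, so a retried first chunk gets the metadata again.
func annotate(f *trackedFile, c *chunk) {
	if c.FirstLine == 1 && f.CodexRollout != nil {
		c.Rollout = f.CodexRollout
	}
}

func main() {
	f := &trackedFile{CodexRollout: &rolloutMeta{ThreadUUID: "t-1"}}

	first := &chunk{FirstLine: 1}
	later := &chunk{FirstLine: 101}
	retry := &chunk{FirstLine: 1} // same first chunk after a failed upload

	annotate(f, first)
	annotate(f, later)
	annotate(f, retry)
	fmt.Println(first.Rollout != nil, later.Rollout != nil, retry.Rollout != nil) // true false true
}
```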

func (*TrackedFile) SetCodexRollout added in v0.16.0

func (f *TrackedFile) SetCodexRollout(meta *provider.CodexRolloutMetadata)

SetCodexRollout assigns Codex rollout metadata to this tracked file, used by provider.Codex.InitTranscript via the provider.TranscriptRegistrar interface. Separated from direct field assignment so the engine and providers don't need to share struct internals.

type UpdateSummaryRequest

type UpdateSummaryRequest struct {
	Summary string `json:"summary"`
}

UpdateSummaryRequest is the request body for PATCH /api/v1/sessions/{external_id}/summary

type UpdateSummaryResponse

type UpdateSummaryResponse struct {
	Status string `json:"status"`
}

UpdateSummaryResponse is the response for PATCH /api/v1/sessions/{external_id}/summary
