package sync
v0.13.0
Published: Mar 8, 2026 License: MIT Imports: 19 Imported by: 0

README

pkg/sync

Sync engine that orchestrates incremental transcript uploads to the backend. Handles file tracking, chunking, agent discovery, metadata extraction, and summary linking.

Files

File             Role
engine.go        Engine: orchestrates init, the sync loop, agent discovery (BFS), and metadata extraction
client.go        Client: HTTP API methods for init, chunk upload, events, summary updates, and GitHub linking
tracker.go       FileTracker: tracks file state, reads chunks with byte-offset seeking, and discovers agent files
summary_link.go  Links child session summaries to parent sessions via leafUuid

Three Components

Engine (orchestrator)

Engine.Init() registers the session with the backend, receiving the current sync state (last synced line per file). Engine.SyncAll() performs a BFS traversal: for each tracked file, it checks for changes, reads a chunk, extracts metadata, uploads, and discovers new agent files. New agent files are added to the queue for the next iteration.
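The call order described above can be sketched as follows. Because the package's import path is not shown on this page, the sketch does not use the real Engine: syncer is a hypothetical interface that mirrors the documented Init and SyncAll signatures, and fakeEngine and runOnce are stand-ins introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// syncer mirrors the two documented Engine methods used here.
// It is a hypothetical interface, not part of the package.
type syncer interface {
	Init() error
	SyncAll() (int, error)
}

// fakeEngine is a stand-in for the real Engine.
type fakeEngine struct {
	initialized bool
	pending     int // chunks waiting to be uploaded
}

func (f *fakeEngine) Init() error {
	f.initialized = true
	return nil
}

func (f *fakeEngine) SyncAll() (int, error) {
	if !f.initialized {
		return 0, errors.New("SyncAll called before Init")
	}
	n := f.pending
	f.pending = 0
	return n, nil
}

// runOnce performs one init-then-sync cycle and returns the chunk count.
func runOnce(s syncer) (int, error) {
	if err := s.Init(); err != nil {
		return 0, fmt.Errorf("init: %w", err)
	}
	return s.SyncAll()
}

func main() {
	n, err := runOnce(&fakeEngine{pending: 3})
	fmt.Println(n, err) // 3 <nil>
}
```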

Client (API)

Thin wrapper around pkg/http.Client that marshals/unmarshals request types for the sync API endpoints: /api/v1/sync/init, /api/v1/sync/chunk, /api/v1/sync/event, and session-specific endpoints for summaries and GitHub links.

FileTracker (file I/O + state)

Manages the mapping between files on disk and their sync state. ReadChunk() seeks to the last known byte offset, reads new lines up to the chunk size limit, applies redaction, and extracts agent IDs. DiscoverNewFiles() finds new agent files both from collected agent IDs and by scanning the subagents directory.
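A minimal sketch of reading by byte offset, under stated assumptions: readChunk and writeTemp are hypothetical helpers, not the package's ReadChunk. Redaction and agent-ID extraction are left out, a line longer than maxBytes is not handled, and an unterminated trailing line is assumed to be left for a later read.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
)

// readChunk seeks to offset, then reads whole lines until the next line would
// push the total past maxBytes. It returns the lines and the byte offset just
// after the last line read.
func readChunk(path string, offset int64, maxBytes int) ([]string, int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, offset, err
	}
	defer f.Close()
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, offset, err
	}
	r := bufio.NewReader(f)
	var lines []string
	used := 0
	for {
		line, err := r.ReadString('\n')
		if err == io.EOF {
			break // an unterminated trailing line is left for a later read
		}
		if err != nil {
			return nil, offset, err
		}
		if used+len(line) > maxBytes {
			break // keep the chunk aligned to a line boundary
		}
		used += len(line)
		lines = append(lines, strings.TrimSuffix(line, "\n"))
	}
	return lines, offset + int64(used), nil
}

// writeTemp writes content to a temporary file and returns its path.
func writeTemp(content string) string {
	f, err := os.CreateTemp("", "chunk-*.jsonl")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.WriteString(content); err != nil {
		panic(err)
	}
	return f.Name()
}

func main() {
	path := writeTemp("a\nbb\nccc\n")
	defer os.Remove(path)
	lines, next, _ := readChunk(path, 0, 5)
	fmt.Println(lines, next) // [a bb] 5
	lines, next, _ = readChunk(path, next, 5)
	fmt.Println(lines, next) // [ccc] 9
}
```

Each call resumes from the offset returned by the previous one, so no line is read twice and none is skipped.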

Data Flow

SyncAll() loop:
  HasFileChanged? ──no──> skip
       │yes
       ▼
  ReadChunk(maxBytes)
    ├── seek to ByteOffset
    ├── read lines until maxBytes
    ├── extract agent IDs (pre-redaction)
    ├── extract metadata (pre-redaction)
    ├── apply redaction
    └── return Chunk
       │
       ▼
  UploadChunk (redacted lines + redacted metadata)
       │
       ▼
  UpdateAfterSync (update byte offset + line count)
       │
       ▼
  DiscoverNewFiles (from collected agent IDs + directory scan)
       │
       ▼
  Add new files to queue → repeat

How to Extend

Adding a new API endpoint: Add request/response types in client.go, add a method on Client, call it from the engine or command layer.

Adding new metadata extraction: Modify the metadata extraction section in SyncAll() where ChunkMetadata is built. Metadata is extracted from raw lines before redaction, then the extracted values are redacted separately before upload.

Tracking a new file type: Add discovery logic in DiscoverNewFiles(). Set the file type in TrackedFile.Type. The rest of the pipeline (read, chunk, upload) is file-type agnostic.

Invariants

  • Chunks must not exceed 14MB (DefaultMaxChunkBytes). The backend rejects payloads larger than 16MB; the client limit is set to 14MB to leave headroom for JSON encoding overhead.
  • Init() must be called before SyncAll(). The engine needs a backend session ID and initial sync state.
  • After upload failure, state must be refreshed from backend (refreshStateFromBackend). This handles the case where the server received and stored data but the client timed out before receiving the response. Without refresh, the client would re-upload duplicate lines.
  • Agent discovery uses BFS with cycle detection. The knownAgentIDs set prevents infinite loops when agents reference each other. BFS is capped at 10 iterations as a safety bound.
  • Redaction must happen in ReadChunk() before lines leave the tracker. Never upload unredacted content.
  • Metadata is extracted before redaction, then redacted. Summaries and first user messages need the original text for meaningful extraction, but must be redacted before upload.
  • Byte offsets must be maintained accurately. ReadChunk returns NewOffset, the byte position after the last line read, and UpdateAfterSync stores it for the next read. Incorrect offsets cause duplicate or missing lines.
  • Directory scan in DiscoverNewFiles catches agents from already-synced lines. After a daemon restart, agent IDs from previously-synced lines are lost from memory. The directory scan recovers them.
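The extract-then-redact ordering from the invariants above can be sketched as follows. Everything here is an assumption made for illustration: the "user: " line prefix, the sk-... secret pattern, and the redact and prepare helpers are hypothetical and do not reflect the real transcript format or pkg/redactor.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// secretRe is a hypothetical secret pattern used only in this sketch.
var secretRe = regexp.MustCompile(`sk-[A-Za-z0-9]+`)

func redact(s string) string {
	return secretRe.ReplaceAllString(s, "[REDACTED]")
}

// prepare extracts the first user message from the raw lines, then redacts
// both the lines and the extracted value before returning them.
func prepare(raw []string) (lines []string, firstUser string) {
	for _, l := range raw {
		if firstUser == "" && strings.HasPrefix(l, "user: ") {
			firstUser = strings.TrimPrefix(l, "user: ")
		}
	}
	lines = make([]string, len(raw))
	for i, l := range raw {
		lines[i] = redact(l)
	}
	return lines, redact(firstUser)
}

func main() {
	lines, first := prepare([]string{"system: ready", "user: my key is sk-abc123"})
	fmt.Println(lines[1]) // user: my key is [REDACTED]
	fmt.Println(first)    // my key is [REDACTED]
}
```

Nothing unredacted leaves prepare: the metadata is read from the original text, but only redacted values are returned.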

Design Decisions

BFS for agent discovery. Agents can spawn sub-agents transitively (A references B, B references C). BFS ensures all transitive agents are discovered and synced, not just direct children. The iteration cap (10) prevents runaway discovery.

Byte-offset seeking instead of re-reading. For large transcripts (megabytes), seeking to the last read position is far more efficient than re-reading from the start and skipping lines.

refreshStateFromBackend after upload failure. When a chunk upload times out, the server may have stored the data. Without refreshing, the next SyncAll() would re-upload the same lines. The refresh call gets the server's actual LastSyncedLine and updates the tracker accordingly. Auth errors during refresh are propagated (can't recover without re-auth).
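The timeout case can be illustrated with a small simulation. This is a sketch under stated assumptions, not the package's code: fakeBackend, errTimeout, and syncOnce are hypothetical, and the fake server simply counts stored lines.

```go
package main

import (
	"errors"
	"fmt"
)

var errTimeout = errors.New("timeout")

// fakeBackend stores lines and can drop one response after storing the data.
type fakeBackend struct {
	stored    int  // last synced line held by the server
	dropReply bool // simulate: data stored, response lost
}

func (b *fakeBackend) upload(firstLine int, lines []string) (int, error) {
	if firstLine != b.stored+1 {
		return 0, fmt.Errorf("expected first line %d, got %d", b.stored+1, firstLine)
	}
	b.stored += len(lines)
	if b.dropReply {
		b.dropReply = false
		return 0, errTimeout
	}
	return b.stored, nil
}

func (b *fakeBackend) lastSyncedLine() int { return b.stored }

// syncOnce uploads every line after lastSynced. On failure it returns the
// server's position instead of the stale local one.
func syncOnce(b *fakeBackend, all []string, lastSynced int) (int, error) {
	if lastSynced >= len(all) {
		return lastSynced, nil
	}
	newLast, err := b.upload(lastSynced+1, all[lastSynced:])
	if err != nil {
		return b.lastSyncedLine(), err
	}
	return newLast, nil
}

func main() {
	b := &fakeBackend{dropReply: true}
	all := []string{"l1", "l2", "l3"}
	pos, err := syncOnce(b, all, 0)
	fmt.Println(pos, err) // 3 timeout
	pos, err = syncOnce(b, all, pos)
	fmt.Println(pos, err, b.stored) // 3 <nil> 3
}
```

Had the client kept its local position of 0 after the timeout, the second cycle would have tried to send lines 1 through 3 again.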

Summary link injection. When a transcript contains a summary with a leafUuid, it means this session is a continuation of a previous one. linkSummaryToPreviousSession finds the parent transcript by scanning other JSONL files for the matching UUID, then calls the backend to update the parent's summary. This is best-effort — failures are logged but don't block sync.

Testing

go test ./pkg/sync/...
  • NewWithClient() allows injecting a mock client for unit tests
  • engine_test.go / tracker_test.go — unit tests for incremental sync, agent discovery, byte offsets, chunking
  • integration_test.go — full engine lifecycle with mock HTTP backend: init, multi-cycle sync, agent discovery, error recovery, large files, chunk size limits

Dependencies

Uses: pkg/http, pkg/redactor, pkg/discovery, pkg/git, pkg/config, pkg/types, pkg/logger

Used by: pkg/daemon/ (sync loop), cmd/ (save command, post-tool-use linking)

Documentation

Constants

const DefaultMaxChunkBytes = 14 * 1024 * 1024 // 14MB

DefaultMaxChunkBytes is the default maximum size of a chunk in bytes. This is a backend-imposed limit: the server rejects chunks larger than 16MB. We use 14MB to leave headroom for JSON encoding overhead and compression. If the backend limit changes, this constant must be updated accordingly.
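For reference, the arithmetic behind the constant is worked out below. The lowercase names are local to this sketch: backendLimitBytes encodes the 16MB server limit stated above, and fitsInChunk is a hypothetical helper showing how such a limit is typically checked.

```go
package main

import "fmt"

const (
	defaultMaxChunkBytes = 14 * 1024 * 1024 // same value as DefaultMaxChunkBytes
	backendLimitBytes    = 16 * 1024 * 1024 // the 16MB server limit stated above
)

// fitsInChunk reports whether a line of lineLen bytes still fits in a chunk
// that already holds usedBytes bytes.
func fitsInChunk(usedBytes, lineLen int) bool {
	return usedBytes+lineLen <= defaultMaxChunkBytes
}

func main() {
	fmt.Println(defaultMaxChunkBytes)                     // 14680064
	fmt.Println(backendLimitBytes - defaultMaxChunkBytes) // 2097152
}
```

The difference, 2 MiB, is the headroom left for encoding overhead.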

Variables

This section is empty.

Functions

func FindSessionByLeafUUID

func FindSessionByLeafUUID(transcriptDir, leafUUID, excludeFile string) string

FindSessionByLeafUUID searches the transcript files in the given directory for a message with the specified UUID, checking only the last N lines of each file. It returns the session ID (the filename without its extension) if found, or an empty string otherwise.

Excludes the file specified by excludeFile from the search.
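One way such a search could work is sketched below. This is not the package's implementation: findByUUID, makeDir, the tailLines value of 10, the .jsonl extension filter, and the substring match on a compact "uuid" field are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

const tailLines = 10 // hypothetical value for N

// findByUUID returns the base name (without extension) of the first .jsonl
// file in dir, other than exclude, whose last tailLines lines mention uuid.
func findByUUID(dir, uuid, exclude string) string {
	paths, _ := filepath.Glob(filepath.Join(dir, "*.jsonl"))
	needle := `"uuid":"` + uuid + `"`
	for _, p := range paths {
		if filepath.Base(p) == filepath.Base(exclude) {
			continue
		}
		data, err := os.ReadFile(p)
		if err != nil {
			continue
		}
		lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
		if len(lines) > tailLines {
			lines = lines[len(lines)-tailLines:]
		}
		for _, l := range lines {
			if strings.Contains(l, needle) {
				return strings.TrimSuffix(filepath.Base(p), ".jsonl")
			}
		}
	}
	return ""
}

// makeDir creates a temporary directory holding the given files.
func makeDir(files map[string]string) string {
	dir, err := os.MkdirTemp("", "transcripts-")
	if err != nil {
		panic(err)
	}
	for name, content := range files {
		if err := os.WriteFile(filepath.Join(dir, name), []byte(content), 0o600); err != nil {
			panic(err)
		}
	}
	return dir
}

func main() {
	dir := makeDir(map[string]string{
		"parent.jsonl": "{\"uuid\":\"u1\"}\n",
		"child.jsonl":  "{\"leafUuid\":\"u1\"}\n",
	})
	defer os.RemoveAll(dir)
	fmt.Println(findByUUID(dir, "u1", "child.jsonl")) // parent
}
```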

Types

type Chunk

type Chunk struct {
	FileName  string         // Base name of the file
	FileType  string         // "transcript" or "agent"
	FirstLine int            // 1-based line number of first line
	Lines     []string       // The lines (redacted if applicable)
	NewOffset int64          // Byte offset after reading these lines
	Metadata  *ChunkMetadata // Metadata to send to backend
	AgentIDs  []string       // Agent IDs discovered (local use only, not sent to backend)
}

Chunk represents a range of lines read from a file with extracted metadata

type ChunkMetadata

type ChunkMetadata struct {
	GitInfo          *git.GitInfo `json:"git_info,omitempty"`
	Summary          string       `json:"summary,omitempty"`
	FirstUserMessage string       `json:"first_user_message,omitempty"`
}

ChunkMetadata contains metadata sent to the backend with a chunk

type ChunkRequest

type ChunkRequest struct {
	SessionID string         `json:"session_id"`
	FileName  string         `json:"file_name"`
	FileType  string         `json:"file_type"`
	FirstLine int            `json:"first_line"`
	Lines     []string       `json:"lines"`
	Metadata  *ChunkMetadata `json:"metadata,omitempty"`
}

ChunkRequest is the request body for POST /api/v1/sync/chunk

type ChunkResponse

type ChunkResponse struct {
	LastSyncedLine int `json:"last_synced_line"`
}

ChunkResponse is the response for POST /api/v1/sync/chunk
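The wire format implied by the struct tags above can be checked with a short round trip. The types are copied locally so the sketch is self-contained; ChunkMetadata is simplified by leaving out GitInfo, whose type lives in another package, and encodeRequest and decodeResponse are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChunkMetadata is a simplified local copy without the GitInfo field.
type ChunkMetadata struct {
	Summary          string `json:"summary,omitempty"`
	FirstUserMessage string `json:"first_user_message,omitempty"`
}

type ChunkRequest struct {
	SessionID string         `json:"session_id"`
	FileName  string         `json:"file_name"`
	FileType  string         `json:"file_type"`
	FirstLine int            `json:"first_line"`
	Lines     []string       `json:"lines"`
	Metadata  *ChunkMetadata `json:"metadata,omitempty"`
}

type ChunkResponse struct {
	LastSyncedLine int `json:"last_synced_line"`
}

// encodeRequest returns the JSON body for a chunk upload.
func encodeRequest(req ChunkRequest) (string, error) {
	b, err := json.Marshal(req)
	return string(b), err
}

// decodeResponse parses the JSON body of a chunk upload response.
func decodeResponse(body string) (ChunkResponse, error) {
	var resp ChunkResponse
	err := json.Unmarshal([]byte(body), &resp)
	return resp, err
}

func main() {
	body, _ := encodeRequest(ChunkRequest{
		SessionID: "s1", FileName: "t.jsonl", FileType: "transcript",
		FirstLine: 1, Lines: []string{"a"},
	})
	fmt.Println(body)
	resp, _ := decodeResponse(`{"last_synced_line":1}`)
	fmt.Println(resp.LastSyncedLine) // 1
}
```

Because Metadata is a pointer tagged omitempty, a nil value drops the metadata key from the request body entirely.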

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client handles communication with the sync API endpoints

func NewClient

func NewClient(cfg *config.UploadConfig) (*Client, error)

NewClient creates a new sync API client

func (*Client) Init

func (c *Client) Init(externalID, transcriptPath string, metadata *InitMetadata) (*InitResponse, error)

Init initializes or resumes a sync session. It returns the session ID and the current sync state for all files.

func (*Client) LinkGitHub

func (c *Client) LinkGitHub(sessionID string, req *GitHubLinkRequest) (*GitHubLinkResponse, error)

LinkGitHub creates a GitHub link for a session

func (*Client) SendEvent

func (c *Client) SendEvent(sessionID, eventType string, timestamp time.Time, payload json.RawMessage) error

SendEvent sends a session lifecycle event to the backend

func (*Client) UpdateSessionSummary

func (c *Client) UpdateSessionSummary(externalID, summary string) error

UpdateSessionSummary updates the summary for a session identified by its external_id

func (*Client) UploadChunk

func (c *Client) UploadChunk(sessionID, fileName, fileType string, firstLine int, lines []string, metadata *ChunkMetadata) (int, error)

UploadChunk uploads a chunk of lines for a file, with optional metadata. It returns the new last synced line number.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the core sync engine used by both daemon and manual save. It provides a unified interface for syncing session data to the backend.

func New

func New(uploadCfg *config.UploadConfig, engineCfg EngineConfig) (*Engine, error)

New creates a new sync engine with the given configuration. The engine is not connected to the backend until Init() is called.

func NewWithClient

func NewWithClient(client *Client, r *redactor.Redactor, engineCfg EngineConfig) *Engine

NewWithClient creates an engine with a pre-configured client. This is useful for testing with mock clients.

func (*Engine) GetSyncStats

func (e *Engine) GetSyncStats() map[string]int

GetSyncStats returns current sync statistics (lines synced per file)

func (*Engine) Init

func (e *Engine) Init() error

Init initializes the sync session with the backend:

  • Creates the session if it does not exist, or resumes the existing one
  • Gets last_synced_line for all known files
  • Sends initial metadata (git info, hostname, username)

Must be called before SyncAll.

func (*Engine) IsInitialized

func (e *Engine) IsInitialized() bool

IsInitialized returns true if Init() has been called successfully

func (*Engine) Reset

func (e *Engine) Reset()

Reset clears the initialized state, allowing Init to be called again. This is useful when the backend returns an auth error and we need to re-authenticate and re-initialize.
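A caller might combine Reset and Init roughly as sketched below. How the real engine reports auth errors is not documented on this page, so errAuth is a hypothetical sentinel, engine is a hypothetical interface mirroring the documented method signatures, and flakyEngine and syncWithReinit are stand-ins for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

var errAuth = errors.New("auth error") // hypothetical sentinel

// engine mirrors the documented Engine methods used in this sketch.
type engine interface {
	Init() error
	SyncAll() (int, error)
	Reset()
}

// flakyEngine fails its first SyncAll with errAuth. Stand-in only.
type flakyEngine struct {
	initialized bool
	failedOnce  bool
	inits       int
}

func (e *flakyEngine) Init() error {
	e.initialized = true
	e.inits++
	return nil
}

func (e *flakyEngine) Reset() { e.initialized = false }

func (e *flakyEngine) SyncAll() (int, error) {
	if !e.initialized {
		return 0, errors.New("not initialized")
	}
	if !e.failedOnce {
		e.failedOnce = true
		return 0, errAuth
	}
	return 2, nil
}

// syncWithReinit retries once after an auth error: reset, re-authenticate,
// re-initialize, then sync again.
func syncWithReinit(e engine, reauth func() error) (int, error) {
	n, err := e.SyncAll()
	if !errors.Is(err, errAuth) {
		return n, err
	}
	e.Reset()
	if err := reauth(); err != nil {
		return 0, err
	}
	if err := e.Init(); err != nil {
		return 0, err
	}
	return e.SyncAll()
}

func main() {
	e := &flakyEngine{}
	_ = e.Init()
	n, err := syncWithReinit(e, func() error { return nil })
	fmt.Println(n, err, e.inits) // 2 <nil> 2
}
```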

func (*Engine) SendSessionEnd

func (e *Engine) SendSessionEnd(hookInput *types.HookInput, timestamp time.Time) error

SendSessionEnd sends a session_end event to the backend

func (*Engine) SessionID

func (e *Engine) SessionID() string

SessionID returns the backend session ID (empty if not initialized)

func (*Engine) SyncAll

func (e *Engine) SyncAll() (int, error)

SyncAll syncs all tracked files to the backend using BFS traversal. It discovers agent files referenced in transcripts and syncs them transitively within a single call. Each file is processed at most once per call.

Algorithm:

  1. Start with all currently tracked files in the queue
  2. Process each file in queue (sync if changed, extract agent IDs)
  3. Discover new files from collected agent IDs
  4. Add only NEW files to the queue for next iteration
  5. Repeat until queue is empty (or max iterations reached)

Returns the number of chunks uploaded and the first error encountered (if any). It continues syncing other files even if one file fails.
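The traversal described in the algorithm above can be sketched in isolation. This is a simplified model, not the engine's code: discover is a hypothetical function, refs is a hypothetical in-memory map standing in for agent IDs found in files, and no uploading takes place.

```go
package main

import "fmt"

const maxIterations = 10 // mirrors the documented safety bound

// discover walks agent references breadth-first starting from root.
// refs maps a file ID to the agent IDs it mentions.
func discover(root string, refs map[string][]string) []string {
	known := map[string]bool{root: true} // cycle detection
	queue := []string{root}
	var order []string
	for iter := 0; len(queue) > 0 && iter < maxIterations; iter++ {
		var next []string
		for _, id := range queue {
			order = append(order, id) // each ID is processed at most once
			for _, child := range refs[id] {
				if !known[child] {
					known[child] = true
					next = append(next, child)
				}
			}
		}
		queue = next // only newly discovered IDs go to the next iteration
	}
	return order
}

func main() {
	refs := map[string][]string{"A": {"B"}, "B": {"C", "A"}, "C": {"A"}}
	fmt.Println(discover("A", refs)) // [A B C]
}
```

The references back to A do not cause a loop because A is already in the known set when B and C are processed.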

type EngineConfig

type EngineConfig struct {
	ExternalID     string
	TranscriptPath string
	CWD            string
}

EngineConfig holds configuration for creating an Engine

type EventRequest

type EventRequest struct {
	SessionID string          `json:"session_id"`
	EventType string          `json:"event_type"`
	Timestamp time.Time       `json:"timestamp"`
	Payload   json.RawMessage `json:"payload"`
}

EventRequest is the request body for POST /api/v1/sync/event

type EventResponse

type EventResponse struct {
	Success bool `json:"success"`
}

EventResponse is the response for POST /api/v1/sync/event

type FileState

type FileState struct {
	LastSyncedLine int `json:"last_synced_line"`
}

FileState represents the sync state for a single file from the backend

type FileTracker

type FileTracker struct {
	// contains filtered or unexported fields
}

FileTracker tracks files and their sync state for a session

func NewFileTracker

func NewFileTracker(transcriptPath string) *FileTracker

NewFileTracker creates a new file tracker for a session

func (*FileTracker) DiscoverNewFiles

func (t *FileTracker) DiscoverNewFiles(newAgentIDs []string) []*TrackedFile

DiscoverNewFiles checks for new agent files based on agent IDs discovered in previous chunk reads, and also scans the subagents directory for any agent files not already tracked. Returns newly discovered files.

func (*FileTracker) GetTrackedFiles

func (t *FileTracker) GetTrackedFiles() []*TrackedFile

GetTrackedFiles returns all currently tracked files

func (*FileTracker) GetTranscriptFile

func (t *FileTracker) GetTranscriptFile() *TrackedFile

GetTranscriptFile returns the transcript file being tracked

func (*FileTracker) HasFileChanged

func (t *FileTracker) HasFileChanged(file *TrackedFile) bool

HasFileChanged checks if a file has more data to sync. Returns true if:

  • The file has grown (more bytes than our last known offset)
  • The file has been modified (mod time changed)
  • We haven't read the file yet (no byte offset)

func (*FileTracker) InitFromBackendState

func (t *FileTracker) InitFromBackendState(backendFiles map[string]FileState)

InitFromBackendState initializes the tracker with state from the backend. This sets up tracking for the transcript and any files the backend knows about.

func (*FileTracker) IsTracked

func (t *FileTracker) IsTracked(fileName string) bool

IsTracked returns true if a file is already being tracked

func (*FileTracker) ReadChunk

func (t *FileTracker) ReadChunk(file *TrackedFile, r *redactor.Redactor, maxBytes int) (*Chunk, error)

ReadChunk reads new lines from a file starting after LastSyncedLine. Uses ByteOffset to seek directly to the right position if available. Applies redaction if a redactor is provided. Stops reading when accumulated bytes would exceed maxBytes (aligned to line boundary). Returns nil if there are no new lines.

func (*FileTracker) UpdateAfterSync

func (t *FileTracker) UpdateAfterSync(file *TrackedFile, lastLine int, newOffset int64)

UpdateAfterSync updates the tracked file state after a successful sync. This updates both the sync position and the cached file stats (modtime/size) so HasFileChanged won't re-trigger until the file actually changes again.
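How HasFileChanged and UpdateAfterSync interact can be sketched as follows. The tracked struct and both functions are hypothetical simplifications: a zero byte offset is assumed to mean "not read yet", and the current size and modification time are passed in rather than read from disk.

```go
package main

import (
	"fmt"
	"time"
)

// tracked is a reduced stand-in for TrackedFile.
type tracked struct {
	byteOffset  int64
	lastSize    int64
	lastModTime time.Time
}

// hasChanged applies the three documented conditions.
func hasChanged(f tracked, size int64, modTime time.Time) bool {
	if f.byteOffset == 0 {
		return true // not read yet
	}
	if size > f.byteOffset {
		return true // file has grown past the last read position
	}
	return !modTime.Equal(f.lastModTime) // modified in place
}

// updateAfterSync records the new position and the cached file stats.
func updateAfterSync(f *tracked, newOffset, size int64, modTime time.Time) {
	f.byteOffset = newOffset
	f.lastSize = size
	f.lastModTime = modTime
}

// Fixed timestamps used by the example.
var (
	t0 = time.Unix(100, 0)
	t1 = time.Unix(200, 0)
)

func main() {
	var f tracked
	fmt.Println(hasChanged(f, 50, t0)) // true
	updateAfterSync(&f, 50, 50, t0)
	fmt.Println(hasChanged(f, 50, t0)) // false
	fmt.Println(hasChanged(f, 80, t1)) // true
}
```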

type GitHubLinkRequest

type GitHubLinkRequest struct {
	URL    string `json:"url"`
	Title  string `json:"title,omitempty"`
	Source string `json:"source"` // "cli_hook" or "manual"
}

GitHubLinkRequest is the request body for POST /api/v1/sessions/{id}/github-links

type GitHubLinkResponse

type GitHubLinkResponse struct {
	ID       int64  `json:"id"`
	LinkType string `json:"link_type"` // "commit" or "pull_request"
	URL      string `json:"url"`
	Owner    string `json:"owner"`
	Repo     string `json:"repo"`
	Ref      string `json:"ref"`
}

GitHubLinkResponse is the response for POST /api/v1/sessions/{id}/github-links

type InitMetadata

type InitMetadata struct {
	CWD      string          `json:"cwd,omitempty"`
	GitInfo  json.RawMessage `json:"git_info,omitempty"`
	Hostname string          `json:"hostname,omitempty"`
	Username string          `json:"username,omitempty"`
}

InitMetadata contains optional metadata for session initialization

type InitRequest

type InitRequest struct {
	ExternalID     string        `json:"external_id"`
	TranscriptPath string        `json:"transcript_path"`
	Metadata       *InitMetadata `json:"metadata,omitempty"`
}

InitRequest is the request body for POST /api/v1/sync/init

type InitResponse

type InitResponse struct {
	SessionID string               `json:"session_id"`
	Files     map[string]FileState `json:"files"`
}

InitResponse is the response for POST /api/v1/sync/init
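A short sketch of decoding an init response into per-file resume positions follows. The types are copied locally so the block is self-contained; resumeLine and the sample JSON are hypothetical. A file missing from the map yields 0, which is assumed here to mean that nothing has been synced for it yet.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type FileState struct {
	LastSyncedLine int `json:"last_synced_line"`
}

type InitResponse struct {
	SessionID string               `json:"session_id"`
	Files     map[string]FileState `json:"files"`
}

// resumeLine returns the last synced line the backend reports for fileName.
func resumeLine(body []byte, fileName string) (int, error) {
	var resp InitResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return 0, err
	}
	return resp.Files[fileName].LastSyncedLine, nil
}

func main() {
	body := []byte(`{"session_id":"abc","files":{"transcript.jsonl":{"last_synced_line":42}}}`)
	n, _ := resumeLine(body, "transcript.jsonl")
	fmt.Println(n) // 42
	n, _ = resumeLine(body, "agent-1.jsonl")
	fmt.Println(n) // 0
}
```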

type TrackedFile

type TrackedFile struct {
	Path           string    // Full path to the file
	Name           string    // Base name of the file
	Type           string    // "transcript" or "agent"
	LastSyncedLine int       // Last line number synced to backend (1-based)
	ByteOffset     int64     // Byte position after LastSyncedLine (for seeking)
	LastModTime    time.Time // Last modification time (for change detection)
	LastSize       int64     // Last known size (for change detection)
}

TrackedFile represents a file being synced

type UpdateSummaryRequest

type UpdateSummaryRequest struct {
	Summary string `json:"summary"`
}

UpdateSummaryRequest is the request body for PATCH /api/v1/sessions/{external_id}/summary

type UpdateSummaryResponse

type UpdateSummaryResponse struct {
	Status string `json:"status"`
}

UpdateSummaryResponse is the response for PATCH /api/v1/sessions/{external_id}/summary
