Documentation ¶
Index ¶
- func ComputeFileHash(path string) (string, error)
- func ComputeHash(r io.Reader) (string, error)
- type Emitter
- type Engine
- func (e *Engine) FindSourceFile(sessionID string) string
- func (e *Engine) InjectSkipCache(entries map[string]int64)
- func (e *Engine) LastSync() time.Time
- func (e *Engine) LastSyncStartedAt() time.Time
- func (e *Engine) LastSyncStats() SyncStats
- func (e *Engine) RecomputeSignals(ctx context.Context, sessionID string) error
- func (e *Engine) ResyncAll(ctx context.Context, onProgress ProgressFunc) (stats SyncStats)
- func (e *Engine) SnapshotSkipCache() map[string]int64
- func (e *Engine) SyncAll(ctx context.Context, onProgress ProgressFunc) (stats SyncStats)
- func (e *Engine) SyncAllSince(ctx context.Context, since time.Time, onProgress ProgressFunc) (stats SyncStats)
- func (e *Engine) SyncPaths(paths []string)
- func (e *Engine) SyncSingleSession(sessionID string) (err error)
- type EngineConfig
- type Phase
- type Progress
- type ProgressFunc
- type SyncResult
- type SyncStats
- type Watcher
Functions ¶
func ComputeFileHash ¶
func ComputeFileHash(path string) (string, error)
ComputeFileHash returns the SHA-256 hex digest of the file at path.
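As an illustration of what a streaming SHA-256 hex digest looks like, here is a minimal sketch using only the standard library. The helper names hashReader, hashFile, and hashString are hypothetical and are not part of this package; the package's own implementation may differ.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"strings"
)

// hashReader streams r through SHA-256 and returns the lowercase hex digest.
// Hypothetical helper, not this package's implementation.
func hashReader(r io.Reader) (string, error) {
	h := sha256.New()
	if _, err := io.Copy(h, r); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

// hashFile hashes the file at path without loading it fully into memory.
func hashFile(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()
	return hashReader(f)
}

// hashString is a small convenience wrapper used by the demo below.
func hashString(s string) (string, error) {
	return hashReader(strings.NewReader(s))
}

func main() {
	sum, err := hashString("abc")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sum)
}
```

Streaming through io.Copy keeps memory use flat regardless of file size, which matters when session files grow large.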
Types ¶
type Emitter ¶ added in v0.23.0
type Emitter interface {
	Emit(scope string)
}
Emitter is notified after a sync pass writes data. Implementations must be thread-safe; Emit is called from whatever goroutine runs the sync pass (e.g., the file watcher, a periodic timer, or a handler goroutine triggered by POST /api/v1/sync).
Emit must not block. A slow implementation can delay the sync pipeline; see server.Broadcaster for the production implementation, which drops events on full per-subscriber buffers.
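The non-blocking requirement can be met with a buffered channel and a select with a default branch. The sketch below is one possible shape, not the server.Broadcaster implementation; chanEmitter, its buffer size, and the "sessions" scope value are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Emitter mirrors the interface documented above.
type Emitter interface {
	Emit(scope string)
}

// chanEmitter is a hypothetical Emitter that queues scopes on a buffered
// channel and drops them when the buffer is full.
type chanEmitter struct {
	events  chan string
	dropped atomic.Int64
}

func newChanEmitter(buffer int) *chanEmitter {
	return &chanEmitter{events: make(chan string, buffer)}
}

// Emit never blocks: if the buffer is full, the event is counted as dropped.
// Channel sends and the atomic counter are both safe for concurrent use.
func (c *chanEmitter) Emit(scope string) {
	select {
	case c.events <- scope:
	default:
		c.dropped.Add(1)
	}
}

// Dropped reports how many events were discarded so far.
func (c *chanEmitter) Dropped() int64 { return c.dropped.Load() }

// Compile-time check that chanEmitter satisfies Emitter.
var _ Emitter = (*chanEmitter)(nil)

func main() {
	em := newChanEmitter(2)
	for i := 0; i < 5; i++ {
		em.Emit("sessions") // scope value is an assumption
	}
	fmt.Println("queued:", len(em.events), "dropped:", em.Dropped())
}
```

Dropping on a full buffer trades completeness for latency: the sync pass is never held up by a slow consumer.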
type Engine ¶
type Engine struct {
	// contains filtered or unexported fields
}
Engine orchestrates session file discovery and sync.
func NewEngine ¶
func NewEngine(database *db.DB, cfg EngineConfig) *Engine
NewEngine creates a sync engine. It pre-populates the in-memory skip cache from the database so that files skipped in a prior run are not re-parsed on startup, and migrates legacy codex_exec skip entries on first run under the new bulk-sync behavior.
func (*Engine) FindSourceFile ¶
func (e *Engine) FindSourceFile(sessionID string) string
FindSourceFile locates the original source file for a session ID. It first checks the stored file_path from the database (handles cases where filename differs from session ID, e.g. Zencoder header ID vs filename), then falls back to agent-specific path reconstruction.
func (*Engine) InjectSkipCache ¶ added in v0.23.0
func (e *Engine) InjectSkipCache(entries map[string]int64)
InjectSkipCache merges entries into the in-memory skip cache. Used by remote sync to pre-populate with translated paths.
func (*Engine) LastSyncStartedAt ¶ added in v0.20.0
func (e *Engine) LastSyncStartedAt() time.Time
LastSyncStartedAt returns the recorded start time of the most recent sync, or the zero time if no sync has run. Use this as the mtime cutoff for quick incremental syncs: anything modified at or after this time must be re-evaluated.
func (*Engine) LastSyncStats ¶
func (e *Engine) LastSyncStats() SyncStats
LastSyncStats returns statistics from the last sync.
func (*Engine) RecomputeSignals ¶ added in v0.23.0
func (e *Engine) RecomputeSignals(ctx context.Context, sessionID string) error
RecomputeSignals recomputes signals for a single session from existing DB data. It returns nil on success, including when the session no longer exists. It returns an error when the recompute could not complete; BackfillSignals uses that error to keep the one-shot completion marker unset so the next startup can retry.
func (*Engine) ResyncAll ¶ added in v0.6.0
func (e *Engine) ResyncAll(ctx context.Context, onProgress ProgressFunc) (stats SyncStats)
ResyncAll builds a fresh database from scratch, syncs all sessions into it, copies insights from the old DB, then atomically swaps the files and reopens the original DB handle. This avoids the per-row trigger overhead of bulk deleting hundreds of thousands of messages in place.
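The build-then-swap idea can be illustrated with a generic write-to-temp-then-rename pattern. This is only a sketch of the general technique under simplifying assumptions (a single plain file, no open handles, no companion files); it is not the engine's actual swap logic, and rebuildAndSwap and the sessions.db filename are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// rebuildAndSwap builds fresh contents in a temp file next to target, then
// renames it over target so readers see either the old or the new file.
// Hypothetical helper illustrating the pattern only.
func rebuildAndSwap(target string, build func(f *os.File) error) error {
	tmp, err := os.CreateTemp(filepath.Dir(target), filepath.Base(target)+".rebuild-*")
	if err != nil {
		return err
	}
	tmpPath := tmp.Name()
	defer os.Remove(tmpPath) // fails harmlessly once the rename has succeeded

	if err := build(tmp); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmpPath, target)
}

// demo replaces an "old" file with freshly built "new" contents.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "swap-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	target := filepath.Join(dir, "sessions.db") // filename is an assumption
	if err := os.WriteFile(target, []byte("old"), 0o600); err != nil {
		return "", err
	}
	err = rebuildAndSwap(target, func(f *os.File) error {
		_, werr := f.WriteString("new")
		return werr
	})
	if err != nil {
		return "", err
	}
	b, err := os.ReadFile(target)
	return string(b), err
}

func main() {
	fmt.Println(demo())
}
```

Creating the temp file in the same directory as the target keeps the rename on one filesystem, which is what makes the swap a single step.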
func (*Engine) SnapshotSkipCache ¶ added in v0.23.0
func (e *Engine) SnapshotSkipCache() map[string]int64
SnapshotSkipCache returns a copy of the in-memory skip cache.
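The merge and copy semantics of InjectSkipCache and SnapshotSkipCache can be pictured with a mutex-guarded map. The skipCache type below is a hypothetical stand-in; the docs above do not say what the int64 value represents or how conflicting keys are resolved, so the overwrite-on-conflict behavior here is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// skipCache is a hypothetical stand-in for the engine's in-memory skip cache.
type skipCache struct {
	mu      sync.RWMutex
	entries map[string]int64
}

func newSkipCache() *skipCache {
	return &skipCache{entries: make(map[string]int64)}
}

// Inject merges entries into the cache. Assumption: incoming values
// overwrite existing ones for the same key.
func (c *skipCache) Inject(entries map[string]int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k, v := range entries {
		c.entries[k] = v
	}
}

// Snapshot returns a copy, so callers can modify it without touching the cache.
func (c *skipCache) Snapshot() map[string]int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make(map[string]int64, len(c.entries))
	for k, v := range c.entries {
		out[k] = v
	}
	return out
}

func main() {
	c := newSkipCache()
	c.Inject(map[string]int64{"/a.jsonl": 100})
	snap := c.Snapshot()
	snap["/b.jsonl"] = 200 // mutates only the copy
	fmt.Println(len(snap), len(c.Snapshot())) // prints "2 1"
}
```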
func (*Engine) SyncAll ¶
func (e *Engine) SyncAll(ctx context.Context, onProgress ProgressFunc) (stats SyncStats)
SyncAll discovers and syncs all session files from all agents.
func (*Engine) SyncAllSince ¶ added in v0.20.0
func (e *Engine) SyncAllSince(ctx context.Context, since time.Time, onProgress ProgressFunc) (stats SyncStats)
SyncAllSince syncs only files whose mtime is at or after the given cutoff time. Use a zero time to sync everything (equivalent to SyncAll). The cutoff is applied after discovery; directory traversal still walks all session directories. Typical callers pass a small safety margin behind the last successful sync start to avoid missing files that were being written during a prior sync.
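The cutoff rule described here ("at or after", zero time means everything, with a safety margin behind the last sync start) can be sketched as a small filter. The candidate type, filterSince, cutoffFrom, the 10-second margin, and the file names are all hypothetical; the engine's internal filtering may be structured differently.

```go
package main

import (
	"fmt"
	"time"
)

// candidate is a hypothetical discovered file with its modification time.
type candidate struct {
	Path  string
	Mtime time.Time
}

// filterSince keeps files whose mtime is at or after since.
// A zero since keeps everything.
func filterSince(files []candidate, since time.Time) []candidate {
	if since.IsZero() {
		return files
	}
	var out []candidate
	for _, f := range files {
		if !f.Mtime.Before(since) { // "at or after" the cutoff
			out = append(out, f)
		}
	}
	return out
}

// cutoffFrom steps back from the last sync start by margin. A zero start
// yields a zero cutoff, meaning "sync everything".
func cutoffFrom(lastStart time.Time, margin time.Duration) time.Time {
	if lastStart.IsZero() {
		return time.Time{}
	}
	return lastStart.Add(-margin)
}

// demo returns the paths selected with a 10-second safety margin.
func demo() []string {
	lastStart := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	files := []candidate{
		{"old.jsonl", lastStart.Add(-time.Hour)},
		{"edge.jsonl", lastStart.Add(-5 * time.Second)},
		{"new.jsonl", lastStart.Add(time.Minute)},
	}
	cutoff := cutoffFrom(lastStart, 10*time.Second)
	var paths []string
	for _, f := range filterSince(files, cutoff) {
		paths = append(paths, f.Path)
	}
	return paths
}

func main() {
	fmt.Println(demo()) // prints "[edge.jsonl new.jsonl]"
}
```

In this sketch, edge.jsonl was modified five seconds before the previous sync started; without the margin it would be excluded even though it might have been mid-write during that sync.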
func (*Engine) SyncPaths ¶ added in v0.3.0
func (e *Engine) SyncPaths(paths []string)
SyncPaths syncs only the specified changed file paths instead of discovering and hashing all session files. Paths that don't match known session file patterns are silently ignored.
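The "silently ignored" behavior amounts to filtering the input before any work is done. The sketch below assumes a single hypothetical pattern, "*.jsonl"; the real set of session file patterns is agent-specific and not listed in these docs.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// sessionPatterns is a hypothetical pattern list, used only for illustration.
var sessionPatterns = []string{"*.jsonl"}

// matchesSession reports whether the file name of path matches any pattern.
func matchesSession(path string) bool {
	base := filepath.Base(path)
	for _, pat := range sessionPatterns {
		if ok, err := filepath.Match(pat, base); err == nil && ok {
			return true
		}
	}
	return false
}

// filterSessionPaths drops non-matching paths without reporting an error.
func filterSessionPaths(paths []string) []string {
	var out []string
	for _, p := range paths {
		if matchesSession(p) {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	in := []string{"/x/a.jsonl", "/x/notes.txt", "/x/.DS_Store"}
	fmt.Println(filterSessionPaths(in)) // prints "[/x/a.jsonl]"
}
```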
func (*Engine) SyncSingleSession ¶
func (e *Engine) SyncSingleSession(sessionID string) (err error)
SyncSingleSession re-syncs a single session by its ID and uses the existing DB project as fallback where applicable.
type EngineConfig ¶ added in v0.10.0
type EngineConfig struct {
	AgentDirs               map[parser.AgentType][]string
	Machine                 string
	BlockedResultCategories []string
	// IDPrefix is prepended to all session IDs. Used by
	// remote sync to namespace IDs by host (e.g. "host~").
	IDPrefix string
	// PathRewriter transforms file paths before storage.
	// Used by remote sync to replace temp paths with
	// "host:/remote/path" references.
	PathRewriter func(string) string
	// Ephemeral disables sync-state persistence (timestamps
	// and skip cache) so remote sync does not interfere with
	// local sync watermarks or pollute the skipped_files table
	// with temp-dir paths.
	Ephemeral bool
	// Emitter, when non-nil, is called once after each sync pass
	// that wrote data. Safe to leave nil (e.g., in PG serve mode
	// where the engine is not run).
	Emitter Emitter
}
EngineConfig holds the configuration needed by the sync engine, replacing per-agent positional parameters.
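To make the IDPrefix and PathRewriter fields concrete, here is a sketch of values a remote-sync caller might supply. The helper names newPathRewriter and prefixID, the host name "devbox", and all paths are hypothetical; only the "host~" prefix and "host:/remote/path" shapes come from the field comments above.

```go
package main

import (
	"fmt"
	"strings"
)

// newPathRewriter returns a func(string) string suitable for a PathRewriter
// field. It maps paths under tmpRoot to "host:/remote/path" references and
// leaves all other paths unchanged. Hypothetical helper.
func newPathRewriter(host, tmpRoot, remoteRoot string) func(string) string {
	return func(p string) string {
		rest := strings.TrimPrefix(p, tmpRoot)
		// Require a real directory boundary so "/tmp/sync-1" does not
		// match "/tmp/sync-10/...".
		if rest == p || (rest != "" && !strings.HasPrefix(rest, "/")) {
			return p
		}
		return host + ":" + remoteRoot + rest
	}
}

// prefixID shows how an IDPrefix such as "devbox~" namespaces a session ID.
func prefixID(prefix, id string) string {
	return prefix + id
}

func main() {
	rewrite := newPathRewriter("devbox", "/tmp/sync-123", "/home/me/sessions")
	fmt.Println(rewrite("/tmp/sync-123/projects/a.jsonl"))
	fmt.Println(rewrite("/var/other/b.jsonl"))
	fmt.Println(prefixID("devbox~", "abc123"))
}
```

The returned closure can be assigned directly to EngineConfig.PathRewriter, since it has the func(string) string type that field expects.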
type Progress ¶
type Progress struct {
	Phase           Phase  `json:"phase"`
	CurrentProject  string `json:"current_project,omitempty"`
	ProjectsTotal   int    `json:"projects_total"`
	ProjectsDone    int    `json:"projects_done"`
	SessionsTotal   int    `json:"sessions_total"`
	SessionsDone    int    `json:"sessions_done"`
	MessagesIndexed int    `json:"messages_indexed"`
}
Progress reports sync progress to listeners.
type ProgressFunc ¶
type ProgressFunc func(Progress)
ProgressFunc is called with progress updates during sync.
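A callback passed as onProgress might turn these updates into a percentage line. The sketch below redeclares a trimmed-down Progress and ProgressFunc locally so that it compiles on its own; percent and newReporter are hypothetical helpers. The closure is not safe for concurrent calls, and whether the engine invokes the callback from multiple goroutines is not stated in these docs.

```go
package main

import "fmt"

// Progress copies only the two session counters from the type above.
type Progress struct {
	SessionsTotal int
	SessionsDone  int
}

// ProgressFunc matches the callback shape documented above.
type ProgressFunc func(Progress)

// percent returns completed sessions as a whole percentage, guarding
// against a zero total at the start of a sync.
func percent(p Progress) int {
	if p.SessionsTotal <= 0 {
		return 0
	}
	return p.SessionsDone * 100 / p.SessionsTotal
}

// newReporter returns a ProgressFunc that writes a line only when the
// percentage changes, to avoid flooding the output.
func newReporter(write func(line string)) ProgressFunc {
	last := -1
	return func(p Progress) {
		pct := percent(p)
		if pct == last {
			return
		}
		last = pct
		write(fmt.Sprintf("synced %d/%d sessions (%d%%)", p.SessionsDone, p.SessionsTotal, pct))
	}
}

func main() {
	report := newReporter(func(line string) { fmt.Println(line) })
	report(Progress{SessionsTotal: 4, SessionsDone: 1})
	report(Progress{SessionsTotal: 4, SessionsDone: 1}) // same percentage, suppressed
	report(Progress{SessionsTotal: 4, SessionsDone: 4})
}
```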
type SyncResult ¶
type SyncResult struct {
	SessionID string `json:"session_id"`
	Project   string `json:"project"`
	Skipped   bool   `json:"skipped"`
	Messages  int    `json:"messages"`
}
SyncResult describes the outcome of syncing a single session.
type SyncStats ¶
type SyncStats struct {
	TotalSessions  int      `json:"total_sessions"`
	Synced         int      `json:"synced"`
	Skipped        int      `json:"skipped"`
	Failed         int      `json:"failed"`
	OrphanedCopied int      `json:"orphaned_copied,omitempty"`
	Warnings       []string `json:"warnings,omitempty"`
	Aborted        bool     `json:"aborted,omitempty"`
	// contains filtered or unexported fields
}
SyncStats summarizes a full sync run.
TotalSessions counts discovered files plus OpenCode sessions. Synced counts sessions; one file can produce several through fork detection, and OpenCode adds sessions directly. Failed counts files with hard parse or stat errors. The unexported filesOK field counts files that produced at least one session; ResyncAll compares it against Failed because both are counted per file.
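The JSON tags determine which fields appear when stats are serialized: fields tagged omitempty vanish at their zero value, while the four core counters are always present. The sketch below uses a local copy of the exported fields (syncStats) so that it runs without importing this package; encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// syncStats copies the exported fields and JSON tags of SyncStats shown above.
type syncStats struct {
	TotalSessions  int      `json:"total_sessions"`
	Synced         int      `json:"synced"`
	Skipped        int      `json:"skipped"`
	Failed         int      `json:"failed"`
	OrphanedCopied int      `json:"orphaned_copied,omitempty"`
	Warnings       []string `json:"warnings,omitempty"`
	Aborted        bool     `json:"aborted,omitempty"`
}

// encode marshals s to a JSON string, returning "" on error.
func encode(s syncStats) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// A clean run: the omitempty fields are left out.
	fmt.Println(encode(syncStats{TotalSessions: 3, Synced: 2, Skipped: 1}))
	// prints {"total_sessions":3,"synced":2,"skipped":1,"failed":0}

	// An aborted run with a warning: the optional fields appear.
	fmt.Println(encode(syncStats{
		TotalSessions: 1,
		Failed:        1,
		Warnings:      []string{"parse error"},
		Aborted:       true,
	}))
	// prints {"total_sessions":1,"synced":0,"skipped":0,"failed":1,"warnings":["parse error"],"aborted":true}
}
```

Clients reading this JSON should therefore treat a missing aborted, warnings, or orphaned_copied key as false, empty, or zero respectively.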
func (*SyncStats) RecordFailed ¶ added in v0.8.0
func (s *SyncStats) RecordFailed()
RecordFailed increments the hard-failure counter.
func (*SyncStats) RecordSkip ¶
func (s *SyncStats) RecordSkip()
RecordSkip increments the skipped session counter.
func (*SyncStats) RecordSynced ¶
RecordSynced adds n to the synced session counter.
type Watcher ¶
type Watcher struct {
	// contains filtered or unexported fields
}
Watcher uses fsnotify to watch session directories for changes and triggers a callback with debouncing.
func NewWatcher ¶
func NewWatcher(debounce time.Duration, onChange func(paths []string), excludes []string) (*Watcher, error)
NewWatcher creates a file watcher that calls onChange once the debounce period has elapsed after files are modified.
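Debouncing here means collapsing a burst of file events into one callback. The sketch below shows a trailing-edge debouncer that restarts its timer on every event and delivers the deduplicated paths once things go quiet. It is an assumption about the general technique, not the Watcher's actual code; the debouncer type and demo function are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// debouncer collects changed paths and flushes them after a quiet period.
type debouncer struct {
	mu       sync.Mutex
	delay    time.Duration
	pending  map[string]struct{}
	timer    *time.Timer
	onChange func(paths []string)
}

func newDebouncer(delay time.Duration, onChange func([]string)) *debouncer {
	return &debouncer{delay: delay, pending: make(map[string]struct{}), onChange: onChange}
}

// Add records a changed path and restarts the quiet-period timer.
func (d *debouncer) Add(path string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.pending[path] = struct{}{}
	if d.timer != nil {
		d.timer.Stop()
	}
	d.timer = time.AfterFunc(d.delay, d.flush)
}

// flush hands the accumulated, deduplicated paths to onChange.
func (d *debouncer) flush() {
	d.mu.Lock()
	paths := make([]string, 0, len(d.pending))
	for p := range d.pending {
		paths = append(paths, p)
	}
	d.pending = make(map[string]struct{})
	d.mu.Unlock()
	if len(paths) == 0 {
		return
	}
	sort.Strings(paths)
	d.onChange(paths) // called outside the lock so it may call Add safely
}

// demo fires three rapid events and returns the batches delivered.
func demo() [][]string {
	var mu sync.Mutex
	var batches [][]string
	d := newDebouncer(50*time.Millisecond, func(p []string) {
		mu.Lock()
		batches = append(batches, p)
		mu.Unlock()
	})
	d.Add("a.jsonl")
	d.Add("b.jsonl")
	d.Add("a.jsonl") // duplicate within the same burst
	time.Sleep(400 * time.Millisecond)
	mu.Lock()
	defer mu.Unlock()
	return batches
}

func main() {
	fmt.Println(demo())
}
```

A callback with this shape pairs naturally with Engine.SyncPaths, which accepts the same []string of changed paths.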
func (*Watcher) Start ¶
func (w *Watcher) Start()
Start begins processing file events in a goroutine.
func (*Watcher) WatchRecursive ¶
WatchRecursive walks a directory tree and adds all subdirectories to the watch list. It returns the number of directories watched and the number that could not be added (unwatched).
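The walk-and-count behavior can be sketched with filepath.WalkDir and a pluggable add function. In real code, add would typically be a watcher's Add method (fsnotify's Watcher.Add has the same func(string) error shape); here it is a stub so the example runs without external dependencies. The watchTree and demo names are hypothetical, and the actual method may handle errors differently.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// watchTree calls add for root and every directory below it, counting
// successes (watched) and failures (unwatched).
func watchTree(root string, add func(dir string) error) (watched, unwatched int, err error) {
	err = filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			return nil // unreadable entry: skip it and keep walking
		}
		if !d.IsDir() {
			return nil
		}
		if addErr := add(path); addErr != nil {
			unwatched++
			return nil
		}
		watched++
		return nil
	})
	return watched, unwatched, err
}

// demo builds a small tree and simulates one directory failing to add.
func demo() (int, int, error) {
	root, err := os.MkdirTemp("", "watch-demo")
	if err != nil {
		return 0, 0, err
	}
	defer os.RemoveAll(root)
	for _, dir := range []string{"a/b", "c"} {
		full := filepath.Join(root, filepath.FromSlash(dir))
		if err := os.MkdirAll(full, 0o755); err != nil {
			return 0, 0, err
		}
	}
	// add is a stub standing in for a real watcher; it rejects "c".
	add := func(dir string) error {
		if filepath.Base(dir) == "c" {
			return errors.New("simulated add failure")
		}
		return nil
	}
	return watchTree(root, add)
}

func main() {
	watched, unwatched, err := demo()
	fmt.Println(watched, unwatched, err) // prints "3 1 <nil>"
}
```

In the demo, the root, a, and a/b are counted as watched, and c is counted as unwatched.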
func (*Watcher) WatchShallow ¶ added in v0.20.0
WatchShallow adds only the root directory to the watch list, without recursing into subdirectories. Use this for directories with many subdirectories where periodic sync handles changes. Returns true if the directory was successfully watched.