Documentation
¶
Overview ¶
Package daemon provides background sync operations for the ledger.
Package daemon provides heartbeat file management for daemon health monitoring.
Heartbeat Storage Strategy ¶
Heartbeats are stored in different locations depending on what they're monitoring:
- WORKSPACE heartbeats: ~/.cache/sageox/<endpoint>/heartbeats/<workspace_id>.jsonl
- LEDGER heartbeats: ~/.cache/sageox/<endpoint>/heartbeats/<workspace_id>_ledger.jsonl
- TEAM heartbeats: ~/.cache/sageox/<endpoint>/heartbeats/<team_id>.jsonl
Why workspace_id (not repo_id) for workspace/ledger heartbeats? ¶
CRITICAL: workspace_id is a hash of the project root PATH, not the repo identity. This prevents collisions when users have multiple git worktrees of the same repo.
Scenario that breaks with repo_id:
- User has repo "foo" with repo_id="repo_abc123"
- User creates worktrees: ~/work/foo-main, ~/work/foo-feature1, ~/work/foo-feature2
- Each worktree has its own daemon instance (one per project root)
- All three daemons would write to the SAME heartbeat file: repo_abc123.jsonl
- Result: Race conditions, confusing PID/workspace data, can't track per-worktree health
By using workspace_id (hash of ~/work/foo-main vs ~/work/foo-feature1):
- Each worktree gets its own heartbeat file: a1b2c3d4.jsonl, e5f6g7h8.jsonl, etc.
- Each daemon's heartbeat is isolated
- Can track health of each worktree independently
Why global cache (not in-repo .sageox/cache/)? ¶
Originally heartbeats were written to .sageox/cache/heartbeats.jsonl inside each repo. This worked fine for workspaces (each worktree has its own .sageox/), but caused problems for ledgers and team contexts:
- Ledgers are SHARED git repos (in sibling dirs like project_sageox/)
- Team contexts are SHARED git repos (in ~/.local/share/sageox/<endpoint>/teams/)
- Writing .sageox/ directories into shared repos pollutes them with machine-specific data
- Even with .gitignore, the directories shouldn't exist in shared repos
Solution: Global cache for all heartbeats, using appropriate identifiers:
- workspace_id: Unique per daemon instance (solves worktree problem)
- team_id: Shared across workspaces (team contexts are multi-project)
Package daemon implements the background sync daemon for ledger and team contexts.
The daemon performs git pull (read) operations for ledger and team context sync. The CLI handles add/commit/push (write) operations via the session upload pipeline. Exception: GitHubSyncManager also performs add/commit/push for data/github/ files, since these are idempotent and last-write-wins safe (accept-theirs conflict resolution).
NETWORK DISCONNECTION HANDLING ¶
The daemon operates normally when the internet is disconnected. This is NOT a failure mode - developers frequently work offline (planes, cafes, etc.).
Design principles:
- Network failures are expected and handled gracefully
- Logs should NOT fill up during disconnection (use Warn, not Error)
- Operations retry on the next sync interval when connectivity returns
- The daemon should return to normal operation automatically when reconnected
SageOx is multiplayer, but the underlying git repos work fine offline. Only API calls and git fetch require daemon connectivity; push is CLI-side.
Index ¶
- Constants
- Variables
- func CurrentWorkspaceID() string
- func EnsureDaemon() error
- func EnsureDaemonAttached() error
- func ErrorStorePath() string
- func ErrorStorePathForWorkspace(workspaceID string) string
- func FormatDaemonList(daemons []DaemonInfo) string
- func FormatNotRunning(inProject bool) string
- func FormatStarting() string
- func FormatStatus(status *StatusData, cliVersion string) string
- func FormatStatusVerbose(status *StatusData, history []SyncEvent, cliVersion string) string
- func FormatStatusWithSparkline(status *StatusData, history []SyncEvent, cliVersion string) string
- func FormatStuck() string
- func HasConfirmRequired(issues []DaemonIssue) bool
- func IsDaemonDisabled() bool
- func IsHealthy() error
- func IsRunning() bool
- func IsStarting() bool
- func KillStaleDaemon(workspaceID string) error
- func KillStaleDaemonForCurrentWorkspace() error
- func LegacyWorkspaceID() string
- func LogPath() string
- func LogPathForWorkspace(repoID, workspaceID string) string
- func MaxIssueSeverity(issues []DaemonIssue) string
- func PidPath() string
- func PidPathForWorkspace(workspaceID string) string
- func RegisterDaemon(workspacePath, version string) error
- func RegistryPath() string
- func RepoBasedWorkspaceID(projectRoot string) string
- func SaveSyncState(workspacePath string, state *SyncState) error
- func ShouldUseDaemon() bool
- func SocketPath() string
- func SocketPathForWorkspace(workspaceID string) string
- func StabilizeCWD()
- func UnregisterDaemon() error
- func UserHeartbeatPath(ep, repoID, workspaceID string) string
- func UserLedgerHeartbeatPath(ep, repoID, workspaceID string) string
- func UserTeamHeartbeatPath(ep, teamID string) string
- func Version() string
- func WaitForProcessExit(pid int, timeout time.Duration) bool
- func WorkspaceID(workspacePath string) string
- func WriteHeartbeatToPath(heartbeatPath string, entry HeartbeatEntry) error
- type ActiveAgentResolver
- type ActivityEntry
- type ActivitySummary
- type ActivitySummarySource
- type ActivityTracker
- func (t *ActivityTracker) Clear(key string)
- func (t *ActivityTracker) Count(key string) int
- func (t *ActivityTracker) Get(key string) []time.Time
- func (t *ActivityTracker) Has(key string) bool
- func (t *ActivityTracker) Keys() []string
- func (t *ActivityTracker) Last(key string) time.Time
- func (t *ActivityTracker) Record(key string)
- func (t *ActivityTracker) RecordAt(key string, ts time.Time)
- func (t *ActivityTracker) Reset()
- type AgentContextStats
- type AgentSession
- type AuthenticatedUser
- type CallbackService
- func (c *CallbackService) Activity()
- func (c *CallbackService) Checkout(payload CheckoutPayload, progress *ProgressWriter) (*CheckoutResult, error)
- func (c *CallbackService) CodeIndex(payload CodeIndexPayload, progress *ProgressWriter) (*CodeIndexResult, error)
- func (c *CallbackService) CodeStatus() *CodeDBStats
- func (c *CallbackService) Doctor() *DoctorResponse
- func (c *CallbackService) Friction(payload FrictionPayload)
- func (c *CallbackService) GetErrors() []StoredError
- func (c *CallbackService) Heartbeat(callerID string, payload json.RawMessage)
- func (c *CallbackService) Instances() []InstanceInfo
- func (c *CallbackService) MarkErrors(ids []string)
- func (c *CallbackService) PauseMurmuring(agentID string)
- func (c *CallbackService) PublishMurmur(payload MurmurPayload)
- func (c *CallbackService) ResumeMurmuring(agentID string)
- func (c *CallbackService) SessionFinalize(payload SessionFinalizeIPCPayload)
- func (c *CallbackService) SessionWatchStart(payload SessionWatchStartPayload)
- func (c *CallbackService) SessionWatchStop(payload SessionWatchStopPayload)
- func (c *CallbackService) Sessions() []AgentSession
- func (c *CallbackService) Status() *StatusData
- func (c *CallbackService) Stop()
- func (c *CallbackService) Sync() error
- func (c *CallbackService) SyncHistory() []SyncEvent
- func (c *CallbackService) SyncWithProgress(progress *ProgressWriter) error
- func (c *CallbackService) TeamSync(progress *ProgressWriter) error
- func (c *CallbackService) Telemetry(payload json.RawMessage)
- func (c *CallbackService) TriggerGC() *TriggerGCResponse
- func (c *CallbackService) WhisperHistory(agentID string, before time.Time, limit int) (*WhisperHistoryResponse, error)
- func (c *CallbackService) Whispers(agentID string, attention whisperstore.Attention, topics []string) ([]whisperstore.WhisperEntry, error)
- type CallerInfo
- type ChangeAccumulator
- type ChangeType
- type CheckoutPayload
- type CheckoutProgress
- type CheckoutResult
- type Client
- func NewClientForCurrentRepo() *Client
- func NewClientForCurrentRepoWithTimeout(timeout time.Duration) *Client
- func NewClientWithSocket(socketPath string) *Client
- func NewClientWithSocketAndTimeout(socketPath string, timeout time.Duration) *Client
- func TryConnect() *Client
- func TryConnectForCheckout() *Client
- func TryConnectForCheckoutWithRetry(maxRetries int, initialDelay time.Duration) *Client
- func TryConnectForSync() *Client
- func TryConnectForSyncWithRetry(maxRetries int, initialDelay time.Duration) *Client
- func TryConnectOrDirect() *Client
- func TryConnectOrDirectForCheckout() *Client
- func TryConnectOrDirectForSync() *Client
- func TryConnectWithRetry(maxRetries int, initialDelay time.Duration) *Client
- func (c *Client) Checkout(payload CheckoutPayload, onProgress ProgressCallback) (*CheckoutResult, error)
- func (c *Client) CodeIndex(payload CodeIndexPayload, onProgress ProgressCallback) (*CodeIndexResult, error)
- func (c *Client) CodeStatus() (*CodeDBStats, error)
- func (c *Client) Connect() (net.Conn, error)
- func (c *Client) Doctor() (*DoctorResponse, error)
- func (c *Client) GetUnviewedErrors() ([]StoredError, error)
- func (c *Client) Instances() ([]InstanceInfo, error)
- func (c *Client) MarkErrorsViewed(ids []string) error
- func (c *Client) Murmur(payload MurmurPayload) error
- func (c *Client) MurmurPause(agentID string) error
- func (c *Client) MurmurResume(agentID string) error
- func (c *Client) Ping() error
- func (c *Client) RequestSync() error
- func (c *Client) SendOneWay(msg Message) error
- func (c *Client) SessionFinalize(payload SessionFinalizeIPCPayload) error
- func (c *Client) SessionWatchStart(payload SessionWatchStartPayload) error
- func (c *Client) SessionWatchStop(payload SessionWatchStopPayload) error
- func (c *Client) Sessions() ([]AgentSession, error)
- func (c *Client) Status() (*StatusData, error)
- func (c *Client) Stop() error
- func (c *Client) SyncHistory() ([]SyncEvent, error)
- func (c *Client) SyncWithProgress(onProgress ProgressCallback) error
- func (c *Client) TeamSyncWithProgress(onProgress ProgressCallback) error
- func (c *Client) TriggerGC() (*TriggerGCResponse, error)
- func (c *Client) WhisperHistory(agentID string, before time.Time, limit int) (*WhisperHistoryResponse, error)
- func (c *Client) Whispers(agentID string, attention string, topics []string) (*WhispersResponse, error)
- type CodeDBMaintainer
- type CodeDBManager
- func (m *CodeDBManager) BuildLedgerIndex(ctx context.Context, ledgerPath string)
- func (m *CodeDBManager) CheckFreshness(ctx context.Context)
- func (m *CodeDBManager) Index(ctx context.Context, payload CodeIndexPayload, pw *ProgressWriter) (*CodeIndexResult, error)
- func (m *CodeDBManager) RefreshDirtyOverlay(ctx context.Context)
- func (m *CodeDBManager) SetIssueTracker(tracker *IssueTracker)
- func (m *CodeDBManager) SetLedgerPath(path string)
- func (m *CodeDBManager) Stats() CodeDBStats
- func (m *CodeDBManager) UpdateProjectRoot(path string)
- type CodeDBStats
- type CodeIndexPayload
- type CodeIndexResult
- type Config
- type CredentialProvider
- type DBMaintainer
- type DBMaintenanceResult
- type DBMaintenanceScheduler
- func (s *DBMaintenanceScheduler) Names() []string
- func (s *DBMaintenanceScheduler) Register(m DBMaintainer)
- func (s *DBMaintenanceScheduler) RunAll(ctx context.Context) []DBMaintenanceResult
- func (s *DBMaintenanceScheduler) RunOne(ctx context.Context, name string) *DBMaintenanceResult
- func (s *DBMaintenanceScheduler) Start(ctx context.Context, wg *sync.WaitGroup, interval time.Duration)
- type Daemon
- type DaemonInfo
- type DaemonIssue
- type DaemonService
- type DaemonState
- type DirtyOverlayDebouncer
- type DoctorResponse
- type ErrorStore
- func (e *ErrorStore) Add(err StoredError)
- func (e *ErrorStore) Cleanup(maxAge time.Duration)
- func (e *ErrorStore) Clear()
- func (e *ErrorStore) Count() int
- func (e *ErrorStore) GetAll() []StoredError
- func (e *ErrorStore) GetUnviewed() []StoredError
- func (e *ErrorStore) Load() error
- func (e *ErrorStore) MarkAllViewed()
- func (e *ErrorStore) MarkViewed(id string)
- func (e *ErrorStore) Save() error
- func (e *ErrorStore) UnviewedCount() int
- type ExtendedStatus
- type FileChange
- type FileChangeMurmurPublisher
- type FileSystem
- type FileSystemWatcher
- type FlushThrottle
- type FrictionCollector
- func (f *FrictionCollector) IsEnabled() bool
- func (f *FrictionCollector) Record(event friction.FrictionEvent)
- func (f *FrictionCollector) RecordFromIPC(payload FrictionPayload)
- func (f *FrictionCollector) SetAuthTokenGetter(cb func() string)
- func (f *FrictionCollector) Start()
- func (f *FrictionCollector) Stats() FrictionStats
- func (f *FrictionCollector) Stop()
- type FrictionPayload
- type FrictionStats
- type GitHubSyncManager
- type GitHubSyncStats
- type GitTrackedMatcher
- type HandlerResult
- type HealthStatus
- type HeartbeatCreds
- type HeartbeatEntry
- type HeartbeatHandler
- func (h *HeartbeatHandler) CleanupStaleAgents(activeIDs []string)
- func (h *HeartbeatHandler) GetActivitySummary() ActivitySummary
- func (h *HeartbeatHandler) GetAgentActivity() *ActivityTracker
- func (h *HeartbeatHandler) GetAgentContextStats(agentID string) AgentContextStats
- func (h *HeartbeatHandler) GetAgentLastWhisper(agentID string) time.Time
- func (h *HeartbeatHandler) GetAgentPID(agentID string) int
- func (h *HeartbeatHandler) GetAgentParentID(agentID string) string
- func (h *HeartbeatHandler) GetAgentType(agentID string) string
- func (h *HeartbeatHandler) GetAuthToken() string
- func (h *HeartbeatHandler) GetAuthenticatedUser() *AuthenticatedUser
- func (h *HeartbeatHandler) GetCallers() []CallerInfo
- func (h *HeartbeatHandler) GetCredentials() (*HeartbeatCreds, time.Time)
- func (h *HeartbeatHandler) GetRepoActivity() *ActivityTracker
- func (h *HeartbeatHandler) GetTeamActivity() *ActivityTracker
- func (h *HeartbeatHandler) GetWorkspaceActivity() *ActivityTracker
- func (h *HeartbeatHandler) Handle(callerID string, payload json.RawMessage)
- func (h *HeartbeatHandler) HasValidCredentials() bool
- func (h *HeartbeatHandler) LastCallerPath() string
- func (h *HeartbeatHandler) RecordWhisperDelivery(agentID string)
- func (h *HeartbeatHandler) SetActivityCallback(cb func())
- func (h *HeartbeatHandler) SetAgentHeartbeatCallback(cb func(agentID string))
- func (h *HeartbeatHandler) SetCallerPathCallback(cb func(path string))
- func (h *HeartbeatHandler) SetInitialCredentials(creds *HeartbeatCreds)
- func (h *HeartbeatHandler) SetTeamNeededCallback(cb func(teamID string))
- func (h *HeartbeatHandler) SetVersionMismatchCallback(cb func(cliVersion, daemonVersion string))
- type HeartbeatPayload
- type Instance
- type InstanceInfo
- type InstanceStore
- func (s *InstanceStore) ActiveCount() int
- func (s *InstanceStore) Count() int
- func (s *InstanceStore) Deregister(id string)
- func (s *InstanceStore) Get(id string) *Instance
- func (s *InstanceStore) GetActive() []*Instance
- func (s *InstanceStore) GetAll() []*Instance
- func (s *InstanceStore) GetStale(threshold time.Duration) []*Instance
- func (s *InstanceStore) Heartbeat(id string)
- func (s *InstanceStore) Register(inst *Instance)
- func (s *InstanceStore) StartCleanup()
- func (s *InstanceStore) Stop()
- type InstancesResponse
- type IssueTracker
- func (t *IssueTracker) Clear()
- func (t *IssueTracker) ClearIssue(issueType, repo string)
- func (t *IssueTracker) ClearRepo(repo string)
- func (t *IssueTracker) Count() int
- func (t *IssueTracker) GetIssues() []DaemonIssue
- func (t *IssueTracker) MaxSeverity() string
- func (t *IssueTracker) NeedsHelp() bool
- func (t *IssueTracker) SetIssue(issue DaemonIssue)
- type ManagedRepoPullOpts
- type ManagedRepoPullResult
- type MarkErrorsPayload
- type Message
- type MessageHandler
- type MessageRouter
- type MockFileSystem
- func (m *MockFileSystem) AddDir(path string, entries []string)
- func (m *MockFileSystem) AddFile(path string, size int64, mode fs.FileMode)
- func (m *MockFileSystem) ReadDir(name string) ([]fs.DirEntry, error)
- func (m *MockFileSystem) SetReadDirError(path string, err error)
- func (m *MockFileSystem) SetStatError(path string, err error)
- func (m *MockFileSystem) Stat(name string) (fs.FileInfo, error)
- type MockFileSystemWatcher
- func (m *MockFileSystemWatcher) Add(path string) error
- func (m *MockFileSystemWatcher) AddedPaths() []string
- func (m *MockFileSystemWatcher) Close() error
- func (m *MockFileSystemWatcher) CloseErrors()
- func (m *MockFileSystemWatcher) CloseEvents()
- func (m *MockFileSystemWatcher) Errors() <-chan error
- func (m *MockFileSystemWatcher) Events() <-chan fsnotify.Event
- func (m *MockFileSystemWatcher) SendError(err error)
- func (m *MockFileSystemWatcher) SendEvent(event fsnotify.Event)
- func (m *MockFileSystemWatcher) SetAddError(err error)
- func (m *MockFileSystemWatcher) SetCloseError(err error)
- type MurmurNudgeSource
- func (s *MurmurNudgeSource) Interval() time.Duration
- func (s *MurmurNudgeSource) IsAgentPaused(agentID string) bool
- func (s *MurmurNudgeSource) Name() string
- func (s *MurmurNudgeSource) PauseAgent(agentID string)
- func (s *MurmurNudgeSource) Produce(_ context.Context) []whisperstore.WhisperEntry
- func (s *MurmurNudgeSource) ResumeAgent(agentID string)
- type MurmurPausePayload
- type MurmurPayload
- type MurmurPublisher
- type MurmurRelay
- type ProgressCallback
- type ProgressResponse
- type ProgressWriter
- type ProjectWatcher
- type RealFileSystem
- type RealFileSystemWatcher
- type Registry
- func (r *Registry) FindByRepoID(repoID string) *DaemonInfo
- func (r *Registry) FindByWorkspaceID(workspaceID string) *DaemonInfo
- func (r *Registry) List() []DaemonInfo
- func (r *Registry) Register(info DaemonInfo) error
- func (r *Registry) Save() error
- func (r *Registry) Unregister(workspaceID string) error
- type RepoStats
- type Response
- type RingBuffer
- type Server
- func (s *Server) SetActivityCallback(cb func())
- func (s *Server) SetCheckoutHandler(...)
- func (s *Server) SetCodeIndexHandler(...)
- func (s *Server) SetCodeStatusHandler(cb func() *CodeDBStats)
- func (s *Server) SetDoctorHandler(handler func() *DoctorResponse)
- func (s *Server) SetErrorsHandler(onGet func() []StoredError, onMark func(ids []string))
- func (s *Server) SetFrictionHandler(cb func(payload FrictionPayload))
- func (s *Server) SetHandlers(onSync func() error, onStop func(), onStatus func() *StatusData)
- func (s *Server) SetHeartbeatHandler(cb func(callerID string, payload json.RawMessage))
- func (s *Server) SetInstancesHandler(cb func() []InstanceInfo)
- func (s *Server) SetMurmurHandler(fn func(payload MurmurPayload))
- func (s *Server) SetPauseMurmuringHandler(fn func(agentID string))
- func (s *Server) SetResumeMurmuringHandler(fn func(agentID string))
- func (s *Server) SetSessionFinalizeHandler(fn func(payload SessionFinalizeIPCPayload))
- func (s *Server) SetSessionWatchStartHandler(fn func(payload SessionWatchStartPayload))
- func (s *Server) SetSessionWatchStopHandler(fn func(payload SessionWatchStopPayload))
- func (s *Server) SetSessionsHandler(cb func() []AgentSession)
- func (s *Server) SetSyncHandler(cb func(progress *ProgressWriter) error)
- func (s *Server) SetSyncHistoryHandler(handler func() []SyncEvent)
- func (s *Server) SetTeamSyncHandler(cb func(progress *ProgressWriter) error)
- func (s *Server) SetTelemetryHandler(cb func(payload json.RawMessage))
- func (s *Server) SetTriggerGCHandler(handler func() *TriggerGCResponse)
- func (s *Server) SetWhisperHistoryHandler(...)
- func (s *Server) SetWhispersHandler(...)
- func (s *Server) Start(ctx context.Context) error
- type SessionFinalizeIPCPayload
- type SessionWatchStartPayload
- type SessionWatchStopPayload
- type SessionsResponse
- type StatusData
- type StatusJSON
- type StoredError
- type SyncEvent
- type SyncMetrics
- func (m *SyncMetrics) RecordConflict()
- func (m *SyncMetrics) RecordDivergence()
- func (m *SyncMetrics) RecordPullFailure()
- func (m *SyncMetrics) RecordPullSuccess(duration time.Duration)
- func (m *SyncMetrics) RecordTeamSync()
- func (m *SyncMetrics) RecordTeamSyncError()
- func (m *SyncMetrics) Snapshot() SyncMetricsSnapshot
- type SyncMetricsSnapshot
- type SyncOption
- type SyncScheduler
- func (s *SyncScheduler) Checkout(payload CheckoutPayload, progress *ProgressWriter) (*CheckoutResult, error)
- func (s *SyncScheduler) LastError() (string, time.Time)
- func (s *SyncScheduler) LastRemoteChange(repoPath string) time.Time
- func (s *SyncScheduler) LastSync() time.Time
- func (s *SyncScheduler) LedgerMu() *sync.Mutex
- func (s *SyncScheduler) Metrics() *SyncMetrics
- func (s *SyncScheduler) RecentErrorCount() int
- func (s *SyncScheduler) RemoteChangeActivity() *ActivityTracker
- func (s *SyncScheduler) SetActivityCallback(cb func())
- func (s *SyncScheduler) SetAgentWorkSignal(ch chan<- struct{})
- func (s *SyncScheduler) SetAuthTokenGetter(cb func() string)
- func (s *SyncScheduler) SetCodeDBManager(m *CodeDBManager)
- func (s *SyncScheduler) SetGitHubSyncManager(m *GitHubSyncManager)
- func (s *SyncScheduler) SetIssueTracker(tracker *IssueTracker)
- func (s *SyncScheduler) SetMurmurRelay(r *MurmurRelay)
- func (s *SyncScheduler) SetTelemetryCallback(cb func(syncType, operation, status string, duration time.Duration))
- func (s *SyncScheduler) SetWhisperRegistry(r *WhisperRegistry)
- func (s *SyncScheduler) Start(ctx context.Context)
- func (s *SyncScheduler) Sync() error
- func (s *SyncScheduler) SyncHistory() []SyncEvent
- func (s *SyncScheduler) SyncStats() SyncStatistics
- func (s *SyncScheduler) SyncWithProgress(progress *ProgressWriter) error
- func (s *SyncScheduler) TeamContextStatus() []TeamContextSyncStatus
- func (s *SyncScheduler) TeamSync(progress *ProgressWriter) error
- func (s *SyncScheduler) TriggerAntiEntropy()
- func (s *SyncScheduler) TriggerGC(ctx context.Context) *TriggerGCResponse
- func (s *SyncScheduler) TriggerSync()
- func (s *SyncScheduler) WorkspaceRegistry() *WorkspaceRegistry
- type SyncState
- type SyncStatistics
- type TeamContextSyncStatus
- type TelemetryCollector
- func (c *TelemetryCollector) IsEnabled() bool
- func (c *TelemetryCollector) Record(event string, props map[string]any)
- func (c *TelemetryCollector) RecordCodeIndexComplete(result *CodeIndexResult, status string)
- func (c *TelemetryCollector) RecordDaemonCrash(uptime time.Duration, errType, errMsg string)
- func (c *TelemetryCollector) RecordDaemonShutdown(uptime time.Duration, reason string)
- func (c *TelemetryCollector) RecordDaemonStartup()
- func (c *TelemetryCollector) RecordFromIPC(event string, props map[string]any)
- func (c *TelemetryCollector) RecordSyncComplete(syncType, operation, status string, duration time.Duration, recordsCount int)
- func (c *TelemetryCollector) Start()
- func (c *TelemetryCollector) Stats() TelemetryStats
- func (c *TelemetryCollector) Stop()
- type TelemetryEvent
- type TelemetryPayload
- type TelemetryStats
- type TimeBasedSource
- type TriggerGCResponse
- type VersionCache
- type VersionCacheData
- type Watcher
- type WatcherFactory
- type WhisperDBMaintainer
- type WhisperHistoryPayload
- type WhisperHistoryResponse
- type WhisperRegistry
- func (r *WhisperRegistry) Add(scope string, entries ...whisperstore.WhisperEntry) error
- func (r *WhisperRegistry) AddTeamStore(teamID string, store *whisperstore.Store)
- func (r *WhisperRegistry) Close() error
- func (r *WhisperRegistry) EnforceMaxSize(maxBytes int64)
- func (r *WhisperRegistry) GetAllWhispers(agentID string) ([]whisperstore.WhisperEntry, error)
- func (r *WhisperRegistry) GetCursor(agentID string) (time.Time, error)
- func (r *WhisperRegistry) GetWhispers(agentID string, attention whisperstore.Attention, topics []string) ([]whisperstore.WhisperEntry, error)
- func (r *WhisperRegistry) GetWhispersPage(agentID string, before time.Time, limit int) ([]whisperstore.WhisperEntry, bool, error)
- func (r *WhisperRegistry) HasTeamStore(teamID string) bool
- func (r *WhisperRegistry) IsRelayed(murmurID, scope string) (bool, error)
- func (r *WhisperRegistry) LedgerStore() *whisperstore.Store
- func (r *WhisperRegistry) MarkRelayed(murmurID, scope string) error
- func (r *WhisperRegistry) Prune(retention time.Duration)
- func (r *WhisperRegistry) RemoveCursor(agentID string)
- func (r *WhisperRegistry) ReopenLedgerStore(dbPath string) error
- type WhisperScheduler
- type WhispersPayload
- type WhispersResponse
- type WorkspaceRegistry
- func (r *WorkspaceRegistry) CleanupRevokedTeamContexts(currentTeamIDs map[string]bool)
- func (r *WorkspaceRegistry) ClearCloneRetry(id string)
- func (r *WorkspaceRegistry) ClearSyncFailures(id string)
- func (r *WorkspaceRegistry) ClearWorkspaceError(id string)
- func (r *WorkspaceRegistry) GetAllWorkspaces() []WorkspaceState
- func (r *WorkspaceRegistry) GetCloneRetryInfo(id string) (attempts int, nextAttempt time.Time)
- func (r *WorkspaceRegistry) GetEndpoint() string
- func (r *WorkspaceRegistry) GetGCInterval(path string) int
- func (r *WorkspaceRegistry) GetLastGCTime(id string) time.Time
- func (r *WorkspaceRegistry) GetLedger() *WorkspaceState
- func (r *WorkspaceRegistry) GetLedgerPath() string
- func (r *WorkspaceRegistry) GetRepoID() string
- func (r *WorkspaceRegistry) GetSyncIntervalMin(path string) int
- func (r *WorkspaceRegistry) GetSyncRetryInfo(id string) (failures int, nextAttempt time.Time)
- func (r *WorkspaceRegistry) GetTeamContextStatus() []TeamContextSyncStatus
- func (r *WorkspaceRegistry) GetTeamContexts() []WorkspaceState
- func (r *WorkspaceRegistry) GetWorkspace(id string) *WorkspaceState
- func (r *WorkspaceRegistry) HasFetchHead(id string) (exists bool, mtime time.Time)
- func (r *WorkspaceRegistry) InitializeLedger(cloneURL, projectRoot string)
- func (r *WorkspaceRegistry) InvalidateConfigCache()
- func (r *WorkspaceRegistry) LoadFromConfig() error
- func (r *WorkspaceRegistry) PersistLedgerPath(path string) error
- func (r *WorkspaceRegistry) ProjectTeamID() string
- func (r *WorkspaceRegistry) RecordSyncAttempt(id string)
- func (r *WorkspaceRegistry) RecordSyncFailure(id string)
- func (r *WorkspaceRegistry) RefreshExists()
- func (r *WorkspaceRegistry) RegisterTeamContextsFromAPI(teamContexts []api.RepoDetailTeamContext)
- func (r *WorkspaceRegistry) SetCloneRetry(id string, attempts int, nextAttempt time.Time)
- func (r *WorkspaceRegistry) SetGCInterval(path string, days int)
- func (r *WorkspaceRegistry) SetLedgerCloneURL(cloneURL string) bool
- func (r *WorkspaceRegistry) SetSyncInProgress(id string, inProgress bool)
- func (r *WorkspaceRegistry) SetSyncIntervalMin(path string, minutes int)
- func (r *WorkspaceRegistry) SetWorkspaceError(id, errMsg string)
- func (r *WorkspaceRegistry) ShouldRetryClone(id string) bool
- func (r *WorkspaceRegistry) ShouldSync(id string) bool
- func (r *WorkspaceRegistry) UpdateConfigLastSync(id string) error
- func (r *WorkspaceRegistry) UpdateLastGC(id string)
- type WorkspaceState
- type WorkspaceSyncStatus
- type WorkspaceType
Constants ¶
const ( ErrorCodeSyncFailed = "sync_failed" ErrorCodeAuthExpired = "auth_expired" ErrorCodeGitConflict = "git_conflict" ErrorCodeNetworkError = "network_error" ErrorCodeDiskFull = "disk_full" ErrorCodePermissionDeny = "permission_denied" )
Error codes for common daemon errors.
const ( StatusActive = "active" // recently received heartbeat StatusIdle = "idle" // no heartbeat within idle threshold StatusStale = "stale" // no heartbeat within stale threshold StatusExited = "exited" // parent process confirmed dead via kill(pid, 0) )
Instance status constants.
const ( // IdleThreshold is how long without heartbeat before instance is "idle". // Hook-based heartbeats fire per prompt turn, so gaps of several minutes are // normal during active coding. 5 minutes allows for a few long tool-call turns. IdleThreshold = 5 * time.Minute // StaleThreshold is how long without heartbeat before instance is "stale". // PID liveness checks keep known-alive agents visible beyond this threshold, // so 15 minutes is sufficient for agents without a trackable PID. StaleThreshold = 15 * time.Minute // MaxAge is the maximum age of an instance before auto-cleanup. // Prevents abandoned instances from accumulating indefinitely. MaxAge = 24 * time.Hour // CleanupInterval is how often to run the cleanup routine. CleanupInterval = 1 * time.Minute // MaxInstances caps the number of tracked instances to prevent unbounded growth. // When exceeded, oldest stale instances are evicted first. MaxInstances = 100 )
Default instance timing thresholds.
const ( MsgTypeStatus = "status" MsgTypeSync = "sync" MsgTypeTeamSync = "team_sync" // on-demand team context sync MsgTypePing = "ping" MsgTypeStop = "stop" MsgTypeVersion = "version" MsgTypeSyncHistory = "sync_history" MsgTypeHeartbeat = "heartbeat" // one-way, no response expected MsgTypeCheckout = "checkout" // synchronous git clone operation MsgTypeTelemetry = "telemetry" // one-way, no response expected MsgTypeFriction = "friction" // one-way, friction event for analytics MsgTypeGetErrors = "get_errors" // retrieve unviewed daemon errors MsgTypeMarkErrors = "mark_errors" // mark errors as viewed MsgTypeSessions = "sessions" // get active agent sessions (deprecated: use instances) MsgTypeInstances = "instances" // get active agent instances MsgTypeDoctor = "doctor" // trigger daemon health checks (anti-entropy, etc.) MsgTypeTriggerGC = "trigger_gc" // force GC reclone for team contexts MsgTypeCodeIndex = "code_index" // index local code with progress MsgTypeCodeStatus = "code_status" // get code index status/stats MsgTypeWhispers = "whispers" // query whisper entries for an agent MsgTypeWhisperHistory = "whisper_history" // query all whispers (pending + delivered) without advancing cursor MsgTypeSessionFinalize = "session_finalize" // one-way, trigger async session upload+finalization MsgTypeMurmur = "murmur" // one-way, write+commit a murmur file in ledger/team context MsgTypeMurmurPause = "murmur_pause" // one-way, pause murmur nudging for an agent MsgTypeMurmurResume = "murmur_resume" // one-way, resume murmur nudging for an agent MsgTypeSessionWatchStart = "session_watch_start" // one-way, start tailing a hookless agent session MsgTypeSessionWatchStop = "session_watch_stop" // one-way, stop tailing a session )
Message types for IPC communication.
const ( SeverityWarning = "warning" SeverityError = "error" SeverityCritical = "critical" )
Severity constants for DaemonIssue. No "info" level - if the daemon needs help, it's at least a warning.
const ( IssueTypeMergeConflict = "merge_conflict" IssueTypeMissingScaffolding = "missing_scaffolding" IssueTypeDiverged = "diverged" IssueTypeAuthExpiring = "auth_expiring" IssueTypeGitLock = "git_lock" IssueTypeCloneFailed = "clone_failed" IssueTypeSyncBackoff = "sync_backoff" IssueTypeDirtyWorkspace = "dirty_workspace" IssueTypeCodeDBCacheWiped = "codedb_cache_wiped" IssueTypeDirtyOverlayFailed = "dirty_overlay_failed" IssueTypeRebaseStuck = "rebase_stuck" )
Issue type constants.
const BootstrapGracePeriod = 3 * time.Minute
BootstrapGracePeriod is the window after daemon start during which zero syncs is not considered a problem — the daemon is still performing its first pull.
const ( // DefaultStalenessThreshold is the duration after which sync data is considered stale. DefaultStalenessThreshold = 24 * time.Hour )
const SessionStaleThreshold = StaleThreshold
Backward compatibility aliases for SessionStaleThreshold TODO: Remove after updating all consumers
Variables ¶
var ErrCloneSemaphoreTimeout = errors.New("clone semaphore timeout")
ErrCloneSemaphoreTimeout indicates all clone slots were busy and the wait timed out. This is a transient error that should be retried on the next sync cycle without exponential backoff — the slots will free up when in-progress clones finish.
var ErrInvalidRepoPath = errors.New("invalid repo path: path traversal or unsafe location detected")
ErrInvalidRepoPath indicates the repo path failed security validation.
var ErrNotRunning = errors.New("daemon not running")
ErrNotRunning indicates the daemon is not running.
var ErrShutdownTimeout = errors.New("shutdown timeout: goroutines did not finish in time")
ErrShutdownTimeout indicates goroutines did not finish within the timeout.
Functions ¶
func CurrentWorkspaceID ¶
func CurrentWorkspaceID() string
CurrentWorkspaceID returns the ID for the current working directory. Prefers repo_id-based identity so multiple clones/worktrees of the same repo share one daemon. Falls back to path-based ID for non-initialized repos. The result is cached on first call so the daemon continues to use the correct workspace ID even if its CWD is later deleted (e.g. macOS tmpdir cleanup while the daemon is running long-term).
Note: This uses raw os.Getwd() for the direct socket path. Subdirectory normalization happens in resolveSocketPath() (registry fallback) and findProjectRootForDaemon() (daemon startup CWD), not here, because the sync.Once caching makes it unsafe to depend on walk-up discovery in tests.
func EnsureDaemon ¶
func EnsureDaemon() error
EnsureDaemon ensures the daemon is running, starting it if necessary. Claude manages the daemon process lifecycle (launching and killing), so setsid/detach is no longer needed. The daemon relies on its inactivity timeout to self-exit when no heartbeats arrive. Returns nil on success (daemon is running), or an error if it couldn't be started. This is a no-op if daemon is already running or disabled via SAGEOX_DAEMON=false.
The function waits up to 2 seconds for the daemon to become available after starting.
func EnsureDaemonAttached ¶
func EnsureDaemonAttached() error
EnsureDaemonAttached is an alias for EnsureDaemon. Previously started the daemon without setsid (attached to caller's process group). Now that Claude manages the daemon lifecycle, setsid is removed entirely and both functions behave identically.
func ErrorStorePath ¶
func ErrorStorePath() string
ErrorStorePath returns the default path to the error store file.
func ErrorStorePathForWorkspace ¶
ErrorStorePathForWorkspace returns the error store path for a specific workspace.
func FormatDaemonList ¶
func FormatDaemonList(daemons []DaemonInfo) string
FormatDaemonList formats a list of daemons for display.
func FormatNotRunning ¶ added in v0.3.0
FormatNotRunning renders the "daemon not running" state with context. inProject indicates whether the user is inside an initialized SageOx project.
func FormatStarting ¶ added in v0.3.0
func FormatStarting() string
FormatStarting renders the daemon status when the process exists but IPC isn't ready yet.
func FormatStatus ¶
func FormatStatus(status *StatusData, cliVersion string) string
FormatStatus renders compact daemon status (Tufte-inspired: maximize data-ink ratio). cliVersion is the current CLI version for match comparison.
func FormatStatusVerbose ¶
func FormatStatusVerbose(status *StatusData, history []SyncEvent, cliVersion string) string
FormatStatusVerbose includes sparkline, internals, and sync history table.
func FormatStatusWithSparkline ¶ added in v0.3.0
func FormatStatusWithSparkline(status *StatusData, history []SyncEvent, cliVersion string) string
FormatStatusWithSparkline adds a 4h activity sparkline to the status output.
func FormatStuck ¶ added in v0.6.0
func FormatStuck() string
FormatStuck renders the daemon status when the process is alive but has exceeded the startup window without accepting IPC connections — indicating it is hung.
func HasConfirmRequired ¶
func HasConfirmRequired(issues []DaemonIssue) bool
HasConfirmRequired returns true if any issue in the slice requires confirmation.
func IsDaemonDisabled ¶
func IsDaemonDisabled() bool
IsDaemonDisabled returns true if the daemon has been explicitly disabled via the SAGEOX_DAEMON=false environment variable.
func IsHealthy ¶
func IsHealthy() error
IsHealthy checks if the daemon is running AND responsive. Returns nil if healthy, error describing the failure mode otherwise.
Uses a 100ms timeout - plenty for localhost IPC. If you need custom timeouts, use NewClientForCurrentRepoWithTimeout(t).Ping() directly.
func IsRunning ¶
func IsRunning() bool
IsRunning checks if the daemon is fully running and responsive to IPC.
func IsStarting ¶ added in v0.3.0
func IsStarting() bool
IsStarting checks if a daemon process exists but is not yet responding to IPC. Returns true for both Starting and Stuck states — callers that need to distinguish them should call GetState() directly.
func KillStaleDaemon ¶ added in v0.6.0
KillStaleDaemon kills any existing daemon for the given workspace before starting a new one. Escalation: IPC stop (graceful) → SIGTERM (forceful) → error. Returns nil if the stale daemon was successfully stopped or none existed. Returns an error if the stale daemon could not be stopped (caller should not start a new one). KillStaleDaemon stops a daemon by workspace ID using the full cleanup flow: IPC stop → wait → SIGTERM (with PID-reuse guard) → wait → cleanup registry/files.
func KillStaleDaemonForCurrentWorkspace ¶ added in v0.6.0
func KillStaleDaemonForCurrentWorkspace() error
KillStaleDaemonForCurrentWorkspace is the public entry point for pre-start kill. Called by `ox daemon start` CLI command.
func LegacyWorkspaceID ¶ added in v0.5.0
func LegacyWorkspaceID() string
LegacyWorkspaceID returns the old path-based workspace ID for the current working directory. Needed for migration: stopping daemons that were started under the old path-hash scheme. Cached separately from CurrentWorkspaceID to avoid interference.
func LogPath ¶
func LogPath() string
LogPath returns the path to the daemon log file for the current workspace. Requires project to be initialized with repo_id.
func LogPathForWorkspace ¶
LogPathForWorkspace returns the log path for a specific workspace and repo.
func MaxIssueSeverity ¶
func MaxIssueSeverity(issues []DaemonIssue) string
MaxIssueSeverity returns the highest severity among the given issues. Returns empty string if the slice is empty.
func PidPath ¶
func PidPath() string
PidPath returns the path to the daemon PID file for the current workspace. Note: PID files are NOT used for liveness detection - use file locks instead.
func PidPathForWorkspace ¶
PidPathForWorkspace returns the PID path for a specific workspace.
func RegisterDaemon ¶
RegisterDaemon registers the current daemon in the registry.
func RegistryPath ¶
func RegistryPath() string
RegistryPath returns the path to the daemon registry file.
func RepoBasedWorkspaceID ¶ added in v0.5.0
RepoBasedWorkspaceID returns a workspace ID derived from repo_id in .sageox/config.json. Multiple clones or worktrees of the same repo produce the same ID, so they share a single daemon. Falls back to path-based WorkspaceID if repo_id is unavailable.
func SaveSyncState ¶ added in v0.3.0
SaveSyncState writes sync state to .sageox/cache/sync-state.json within workspacePath. Writes to .sageox/cache/ which is gitignored — local-only, never committed to the ledger.
func ShouldUseDaemon ¶
func ShouldUseDaemon() bool
ShouldUseDaemon returns true if we should attempt to use the daemon. Returns true if the daemon is currently running.
func SocketPath ¶
func SocketPath() string
SocketPath returns the path to the daemon Unix socket for the current workspace.
func SocketPathForWorkspace ¶
SocketPathForWorkspace returns the socket path for a specific workspace.
func StabilizeCWD ¶ added in v0.3.0
func StabilizeCWD()
StabilizeCWD moves the daemon's working directory to $HOME so that git commands don't fail if the original CWD is deleted (e.g. tmpdir cleanup). Must be called AFTER CurrentWorkspaceID() has cached the workspace ID.
func UnregisterDaemon ¶
func UnregisterDaemon() error
UnregisterDaemon removes the current daemon from the registry. Uses cached workspace ID since CWD may have been stabilized to $HOME.
func UserHeartbeatPath ¶
UserHeartbeatPath returns the global cache path for workspace heartbeat.
CRITICAL: Uses BOTH repo_id AND workspace_id in filename.
Why both?
- workspace_id (hash of path) prevents collisions between worktrees
- repo_id makes debugging easier - you can see which repo the heartbeat belongs to
Format: ~/.cache/sageox/<endpoint>/heartbeats/<repo_id>_<workspace_id>.jsonl
Example with multiple worktrees of repo "foo" (repo_id=repo_abc123):
- Worktree ~/work/foo-main → workspace_id=a1b2c3d4 → repo_abc123_a1b2c3d4.jsonl
- Worktree ~/work/foo-fix123 → workspace_id=e5f6g7h8 → repo_abc123_e5f6g7h8.jsonl
Both have same repo_id but different workspace_ids → no collision, easy to identify.
func UserLedgerHeartbeatPath ¶
UserLedgerHeartbeatPath returns the global cache path for ledger heartbeat.
CRITICAL: Uses BOTH repo_id AND workspace_id because each worktree has its own ledger. Ledgers use the sibling directory pattern: <project>_sageox/<endpoint>/ledger So different worktrees → different ledger paths → need different heartbeat files.
Format: ~/.cache/sageox/<endpoint>/heartbeats/<repo_id>_<workspace_id>_ledger.jsonl
Example:
- Worktree ~/work/foo-main has ledger ~/work/foo-main_sageox/sageox.ai/ledger → repo_abc123_a1b2c3d4_ledger.jsonl
- Worktree ~/work/foo-fix has ledger ~/work/foo-fix_sageox/sageox.ai/ledger → repo_abc123_e5f6g7h8_ledger.jsonl
These are DIFFERENT ledger repos → separate heartbeats, but same repo_id for grouping.
func UserTeamHeartbeatPath ¶
UserTeamHeartbeatPath returns the global cache path for team context heartbeat.
Uses team_id (NOT workspace_id) because team contexts are shared across projects. A team context repo at ~/.local/share/sageox/<endpoint>/teams/<team_id>/ may be used by multiple projects/worktrees simultaneously. All daemons monitoring this team context write to the same heartbeat file (last-write-wins is acceptable for monitoring data - we just care that SOME daemon is syncing it).
~/.cache/sageox/<endpoint>/heartbeats/<team_id>.jsonl
Example:
- Team "engineering" (team_id=team_abc123) is used by projects A, B, C
- All three daemons write to the same heartbeat: team_abc123.jsonl
- Doctor sees "team synced 2m ago" - doesn't matter which daemon did it
func Version ¶
func Version() string
Version returns the daemon version including build timestamp. Used for heartbeat version comparison to detect when CLI has been rebuilt. Includes BuildDate so dirty rebuilds (same git hash) still trigger restart.
func WaitForProcessExit ¶ added in v0.6.0
WaitForProcessExit polls signal 0 until the process exits or timeout is reached. Returns true if the process exited, false if it's still alive after timeout.
func WorkspaceID ¶
WorkspaceID generates a stable identifier for a workspace path. Uses SHA256 of the real (symlink-resolved) absolute path, truncated to 8 chars. This is the legacy path-based ID, still used for non-initialized repos.
func WriteHeartbeatToPath ¶
func WriteHeartbeatToPath(heartbeatPath string, entry HeartbeatEntry) error
WriteHeartbeatToPath writes a heartbeat entry to an explicit path (for global cache). Maintains a rolling history of the last N heartbeats. Used for writing to ~/.cache/sageox/<endpoint>/heartbeats/<id>.jsonl
Types ¶
type ActiveAgentResolver ¶ added in v0.6.1
type ActiveAgentResolver interface {
ActiveAgentIDs() []string
}
ActiveAgentResolver returns IDs of active AI coworker instances.
type ActivityEntry ¶
type ActivityEntry struct {
Key string `json:"key"`
Count int `json:"count"`
Last time.Time `json:"last"`
Timestamps []time.Time `json:"timestamps,omitempty"` // for sparkline
}
ActivityEntry represents activity for a single key.
type ActivitySummary ¶
type ActivitySummary struct {
Repos []ActivityEntry `json:"repos,omitempty"`
Teams []ActivityEntry `json:"teams,omitempty"`
Workspaces []ActivityEntry `json:"workspaces,omitempty"`
Agents []ActivityEntry `json:"agents,omitempty"` // connected agent sessions
}
ActivitySummary returns a summary of all activity for status display.
type ActivitySummarySource ¶ added in v0.6.0
type ActivitySummarySource struct {
// contains filtered or unexported fields
}
ActivitySummarySource produces periodic activity whispers. Reports active coworker count and last sync time.
func NewActivitySummarySource ¶ added in v0.6.0
func NewActivitySummarySource(heartbeat *HeartbeatHandler, scheduler *SyncScheduler) *ActivitySummarySource
NewActivitySummarySource creates a source that periodically summarizes coworker activity and sync status as ambient whispers.
func (*ActivitySummarySource) Interval ¶ added in v0.6.0
func (s *ActivitySummarySource) Interval() time.Duration
func (*ActivitySummarySource) Name ¶ added in v0.6.0
func (s *ActivitySummarySource) Name() string
func (*ActivitySummarySource) Produce ¶ added in v0.6.0
func (s *ActivitySummarySource) Produce(_ context.Context) []whisperstore.WhisperEntry
type ActivityTracker ¶
type ActivityTracker struct {
// contains filtered or unexported fields
}
ActivityTracker stores recent timestamps for any named key. Thread-safe. Capped to N entries per key for memory efficiency. Also limits total number of keys to prevent unbounded memory growth. Useful for tracking heartbeats, syncs, and other events for sparkline display.
func NewActivityTracker ¶
func NewActivityTracker(capacity int) *ActivityTracker
NewActivityTracker creates a new activity tracker with the given capacity per key.
func NewActivityTrackerWithMaxKeys ¶
func NewActivityTrackerWithMaxKeys(capacity, maxKeys int) *ActivityTracker
NewActivityTrackerWithMaxKeys creates a tracker with custom capacity and key limit.
func (*ActivityTracker) Clear ¶
func (t *ActivityTracker) Clear(key string)
Clear removes all entries for the given key.
func (*ActivityTracker) Count ¶
func (t *ActivityTracker) Count(key string) int
Count returns the number of recorded events for the key.
func (*ActivityTracker) Get ¶
func (t *ActivityTracker) Get(key string) []time.Time
Get returns recent timestamps for the key (oldest first). Returns nil if key not found.
func (*ActivityTracker) Has ¶ added in v0.3.0
func (t *ActivityTracker) Has(key string) bool
Has returns true if the key has been recorded at least once.
func (*ActivityTracker) Keys ¶
func (t *ActivityTracker) Keys() []string
Keys returns all tracked keys.
func (*ActivityTracker) Last ¶
func (t *ActivityTracker) Last(key string) time.Time
Last returns the most recent timestamp for the key. Returns zero time if key not found or no entries.
func (*ActivityTracker) Record ¶
func (t *ActivityTracker) Record(key string)
Record adds a timestamp for the given key.
type AgentContextStats ¶ added in v0.3.0
type AgentContextStats struct {
ContextTokens int64 `json:"context_tokens"`
CommandCount int `json:"command_count"`
}
AgentContextStats holds cumulative context consumption for an agent.
type AgentSession ¶
type AgentSession struct {
// AgentID is the short agent identifier (e.g., "Oxa7b3").
AgentID string `json:"agent_id"`
// WorkspacePath is the workspace/repo the agent is working in.
WorkspacePath string `json:"workspace_path"`
// LastHeartbeat is when the agent last sent a heartbeat.
LastHeartbeat time.Time `json:"last_heartbeat"`
// HeartbeatCount is the number of heartbeats received from this agent.
HeartbeatCount int `json:"heartbeat_count"`
// Status is "active" (recent heartbeat) or "idle" (stale heartbeat).
Status string `json:"status"`
}
AgentSession represents an active agent session from a daemon. Used by the sessions IPC message to report connected agents.
func GetAllSessions ¶
func GetAllSessions() ([]AgentSession, error)
GetAllSessions queries all running daemons and aggregates their agent sessions. Returns sessions from all workspaces, sorted by last heartbeat (most recent first). Deprecated: Use GetAllInstances instead.
type AuthenticatedUser ¶
type AuthenticatedUser struct {
Email string `json:"email,omitempty"`
ID string `json:"id,omitempty"`
}
AuthenticatedUser holds info about the authenticated user.
type CallbackService ¶ added in v0.6.0
type CallbackService struct {
// contains filtered or unexported fields
}
CallbackService implements DaemonService using individual callback functions. It lets callers wire handlers incrementally via Set*Handler methods, which is useful in tests and during staged daemon startup.
func (*CallbackService) Activity ¶ added in v0.6.0
func (c *CallbackService) Activity()
func (*CallbackService) Checkout ¶ added in v0.6.0
func (c *CallbackService) Checkout(payload CheckoutPayload, progress *ProgressWriter) (*CheckoutResult, error)
func (*CallbackService) CodeIndex ¶ added in v0.6.0
func (c *CallbackService) CodeIndex(payload CodeIndexPayload, progress *ProgressWriter) (*CodeIndexResult, error)
func (*CallbackService) CodeStatus ¶ added in v0.6.0
func (c *CallbackService) CodeStatus() *CodeDBStats
func (*CallbackService) Doctor ¶ added in v0.6.0
func (c *CallbackService) Doctor() *DoctorResponse
func (*CallbackService) Friction ¶ added in v0.6.0
func (c *CallbackService) Friction(payload FrictionPayload)
func (*CallbackService) GetErrors ¶ added in v0.6.0
func (c *CallbackService) GetErrors() []StoredError
func (*CallbackService) Heartbeat ¶ added in v0.6.0
func (c *CallbackService) Heartbeat(callerID string, payload json.RawMessage)
func (*CallbackService) Instances ¶ added in v0.6.0
func (c *CallbackService) Instances() []InstanceInfo
func (*CallbackService) MarkErrors ¶ added in v0.6.0
func (c *CallbackService) MarkErrors(ids []string)
func (*CallbackService) PauseMurmuring ¶ added in v0.6.0
func (c *CallbackService) PauseMurmuring(agentID string)
func (*CallbackService) PublishMurmur ¶ added in v0.6.0
func (c *CallbackService) PublishMurmur(payload MurmurPayload)
func (*CallbackService) ResumeMurmuring ¶ added in v0.6.0
func (c *CallbackService) ResumeMurmuring(agentID string)
func (*CallbackService) SessionFinalize ¶ added in v0.6.0
func (c *CallbackService) SessionFinalize(payload SessionFinalizeIPCPayload)
func (*CallbackService) SessionWatchStart ¶ added in v0.6.1
func (c *CallbackService) SessionWatchStart(payload SessionWatchStartPayload)
func (*CallbackService) SessionWatchStop ¶ added in v0.6.1
func (c *CallbackService) SessionWatchStop(payload SessionWatchStopPayload)
func (*CallbackService) Sessions ¶ added in v0.6.0
func (c *CallbackService) Sessions() []AgentSession
func (*CallbackService) Status ¶ added in v0.6.0
func (c *CallbackService) Status() *StatusData
func (*CallbackService) Stop ¶ added in v0.6.0
func (c *CallbackService) Stop()
func (*CallbackService) Sync ¶ added in v0.6.0
func (c *CallbackService) Sync() error
func (*CallbackService) SyncHistory ¶ added in v0.6.0
func (c *CallbackService) SyncHistory() []SyncEvent
func (*CallbackService) SyncWithProgress ¶ added in v0.6.0
func (c *CallbackService) SyncWithProgress(progress *ProgressWriter) error
func (*CallbackService) TeamSync ¶ added in v0.6.0
func (c *CallbackService) TeamSync(progress *ProgressWriter) error
func (*CallbackService) Telemetry ¶ added in v0.6.0
func (c *CallbackService) Telemetry(payload json.RawMessage)
func (*CallbackService) TriggerGC ¶ added in v0.6.0
func (c *CallbackService) TriggerGC() *TriggerGCResponse
func (*CallbackService) WhisperHistory ¶ added in v0.6.0
func (c *CallbackService) WhisperHistory(agentID string, before time.Time, limit int) (*WhisperHistoryResponse, error)
func (*CallbackService) Whispers ¶ added in v0.6.0
func (c *CallbackService) Whispers(agentID string, attention whisperstore.Attention, topics []string) ([]whisperstore.WhisperEntry, error)
type CallerInfo ¶ added in v0.5.0
type CallerInfo struct {
ID string `json:"id"` // CallerID (path-based hash)
Path string `json:"path"` // absolute path of the clone/worktree
LastSeen time.Time `json:"last_seen"` // last heartbeat received
AgentID string `json:"agent_id,omitempty"` // last known agent in this clone
}
CallerInfo tracks a connected clone/worktree.
type ChangeAccumulator ¶ added in v0.6.1
type ChangeAccumulator struct {
// contains filtered or unexported fields
}
ChangeAccumulator batches raw fsnotify events into settled change sets. Inspired by Watchman's PendingCollection + settle mechanism.
func NewChangeAccumulator ¶ added in v0.6.1
func NewChangeAccumulator(settlePeriod time.Duration) *ChangeAccumulator
NewChangeAccumulator creates an accumulator with the given settle period.
func (*ChangeAccumulator) AddEvent ¶ added in v0.6.1
func (a *ChangeAccumulator) AddEvent(relPath string, op fsnotify.Op, isDir bool)
AddEvent adds a filesystem event, collapsing with existing pending changes.
func (*ChangeAccumulator) DrainSettled ¶ added in v0.6.1
func (a *ChangeAccumulator) DrainSettled() []FileChange
DrainSettled returns and clears all settled changes.
func (*ChangeAccumulator) PendingCount ¶ added in v0.6.1
func (a *ChangeAccumulator) PendingCount() int
PendingCount returns the number of pending (unsettled) changes.
func (*ChangeAccumulator) SetOnSettled ¶ added in v0.6.1
func (a *ChangeAccumulator) SetOnSettled(fn func())
SetOnSettled registers a callback invoked (in a goroutine) each time pending changes settle. Used by the dirty overlay debouncer to trigger index rebuilds.
func (*ChangeAccumulator) Stop ¶ added in v0.6.1
func (a *ChangeAccumulator) Stop()
Stop prevents further timer callbacks.
type ChangeType ¶ added in v0.6.1
type ChangeType string
ChangeType categorizes a filesystem change.
const ( ChangeCreated ChangeType = "created" ChangeModified ChangeType = "modified" ChangeDeleted ChangeType = "deleted" ChangeRenamed ChangeType = "renamed" )
type CheckoutPayload ¶
type CheckoutPayload struct {
RepoPath string `json:"repo_path"` // target path for clone
CloneURL string `json:"clone_url"` // git clone URL
RepoType string `json:"repo_type"` // "ledger" or "team_context"
}
CheckoutPayload is the payload for checkout requests.
type CheckoutProgress ¶
type CheckoutProgress struct {
Stage string `json:"stage"` // "connecting", "cloning", "verifying"
Percent *int `json:"percent,omitempty"` // 0-100, nil if unknown
Message string `json:"message"` // human-readable progress message
}
CheckoutProgress is sent during long-running checkout operations.
type CheckoutResult ¶
type CheckoutResult struct {
Path string `json:"path"` // actual path where repo exists
AlreadyExists bool `json:"already_exists"` // true if repo already existed
Cloned bool `json:"cloned"` // true if we performed a clone
}
CheckoutResult is the result of a checkout operation.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client provides IPC communication with the daemon.
func NewClientForCurrentRepo ¶ added in v0.6.0
func NewClientForCurrentRepo() *Client
NewClientForCurrentRepo creates an IPC client that uses the registry to find the daemon for the current repo, even if its workspace ID differs from what the current binary computes (e.g., after a workspace ID format change). Use this in status/stop/restart commands where you need to reach the daemon for the project in the current directory regardless of workspace ID drift.
func NewClientForCurrentRepoWithTimeout ¶ added in v0.6.0
NewClientForCurrentRepoWithTimeout is like NewClientForCurrentRepo but with a custom timeout.
func NewClientWithSocket ¶
NewClientWithSocket creates an IPC client for a specific socket path. Used when connecting to daemons for other workspaces.
func NewClientWithSocketAndTimeout ¶ added in v0.6.0
NewClientWithSocketAndTimeout creates an IPC client for a specific socket path with custom timeout. Use longer timeouts for stop operations where the daemon may be busy.
func TryConnect ¶
func TryConnect() *Client
TryConnect attempts to connect to the daemon. Returns the client if connected, nil otherwise.
func TryConnectForCheckout ¶
func TryConnectForCheckout() *Client
TryConnectForCheckout attempts to connect for checkout operations. Uses a long timeout since clones can take time.
func TryConnectForCheckoutWithRetry ¶
TryConnectForCheckoutWithRetry is like TryConnectWithRetry but uses longer timeouts for checkout operations.
func TryConnectForSync ¶
func TryConnectForSync() *Client
TryConnectForSync attempts to connect for sync operations. Uses a longer timeout since syncs can take time.
func TryConnectForSyncWithRetry ¶
TryConnectForSyncWithRetry is like TryConnectWithRetry but uses longer timeouts for sync operations.
func TryConnectOrDirect ¶
func TryConnectOrDirect() *Client
TryConnectOrDirect attempts to connect to the daemon. Returns nil if daemon is not running or unreachable. On transient connection failures, retries once with exponential backoff.
func TryConnectOrDirectForCheckout ¶
func TryConnectOrDirectForCheckout() *Client
TryConnectOrDirectForCheckout is like TryConnectOrDirect but uses longer timeouts for checkout operations.
func TryConnectOrDirectForSync ¶
func TryConnectOrDirectForSync() *Client
TryConnectOrDirectForSync is like TryConnectOrDirect but uses longer timeouts for sync operations.
func TryConnectWithRetry ¶
TryConnectWithRetry attempts to connect to the daemon with retry logic. On transient failures, retries up to maxRetries times with exponential backoff. initialDelay is the delay before the first retry.
func (*Client) Checkout ¶
func (c *Client) Checkout(payload CheckoutPayload, onProgress ProgressCallback) (*CheckoutResult, error)
Checkout requests the daemon to clone a repository. The onProgress callback is called for each progress update (may be nil). Uses a long timeout (60s) since clones can take time.
func (*Client) CodeIndex ¶ added in v0.4.0
func (c *Client) CodeIndex(payload CodeIndexPayload, onProgress ProgressCallback) (*CodeIndexResult, error)
CodeIndex requests the daemon to index code with progress updates. Uses an idle timeout: the deadline resets on each progress message, so the connection stays alive as long as the daemon is making progress. Indexing large repos can take many minutes but emits frequent progress updates.
func (*Client) CodeStatus ¶ added in v0.4.0
func (c *Client) CodeStatus() (*CodeDBStats, error)
CodeStatus requests the current code index status from the daemon.
func (*Client) Connect ¶
Connect attempts to connect to the daemon. Returns error if daemon is not running.
func (*Client) Doctor ¶
func (c *Client) Doctor() (*DoctorResponse, error)
Doctor triggers daemon health checks including anti-entropy (self-healing). Returns the results of the health checks.
func (*Client) GetUnviewedErrors ¶
func (c *Client) GetUnviewedErrors() ([]StoredError, error)
GetUnviewedErrors retrieves unviewed daemon errors.
func (*Client) Instances ¶
func (c *Client) Instances() ([]InstanceInfo, error)
Instances gets active agent instances from this daemon.
func (*Client) MarkErrorsViewed ¶
MarkErrorsViewed marks errors as viewed. If ids is empty, marks all errors as viewed.
func (*Client) Murmur ¶ added in v0.6.0
func (c *Client) Murmur(payload MurmurPayload) error
Murmur sends a murmur write+commit request to the daemon (fire-and-forget). The daemon writes the murmur file and commits it; the CLI returns immediately.
func (*Client) MurmurPause ¶ added in v0.6.0
MurmurPause sends a fire-and-forget request to pause murmur nudging for an agent.
func (*Client) MurmurResume ¶ added in v0.6.0
MurmurResume sends a fire-and-forget request to resume murmur nudging for an agent.
func (*Client) RequestSync ¶
RequestSync requests the daemon to perform a sync.
func (*Client) SendOneWay ¶
SendOneWay sends a message without waiting for response. Connect, write, close immediately - truly fire-and-forget at IPC layer. Used for heartbeats and other non-blocking notifications.
func (*Client) SessionFinalize ¶ added in v0.5.0
func (c *Client) SessionFinalize(payload SessionFinalizeIPCPayload) error
SessionFinalize sends a fire-and-forget request to finalize a session. The daemon will upload to LFS, commit, push, and generate summary artifacts.
func (*Client) SessionWatchStart ¶ added in v0.6.1
func (c *Client) SessionWatchStart(payload SessionWatchStartPayload) error
SessionWatchStart sends a fire-and-forget request to start tailing a session file. The daemon begins tailing the agent's native session file and writing to raw.jsonl.
func (*Client) SessionWatchStop ¶ added in v0.6.1
func (c *Client) SessionWatchStop(payload SessionWatchStopPayload) error
SessionWatchStop sends a fire-and-forget request to stop tailing a session.
func (*Client) Sessions ¶
func (c *Client) Sessions() ([]AgentSession, error)
Sessions gets active agent sessions from this daemon. Deprecated: Use Instances() instead.
func (*Client) Status ¶
func (c *Client) Status() (*StatusData, error)
Status gets the daemon status.
func (*Client) SyncHistory ¶
SyncHistory gets the recent sync history.
func (*Client) SyncWithProgress ¶
func (c *Client) SyncWithProgress(onProgress ProgressCallback) error
SyncWithProgress requests the daemon to perform a sync with progress updates. The onProgress callback is called for each progress update (may be nil). Uses an idle timeout: the deadline resets on each progress message, so the connection stays alive as long as the daemon is making progress.
func (*Client) TeamSyncWithProgress ¶
func (c *Client) TeamSyncWithProgress(onProgress ProgressCallback) error
TeamSyncWithProgress requests the daemon to sync all team contexts with progress updates. The onProgress callback is called for each progress update (may be nil). Uses an idle timeout: the deadline resets on each progress message, so the connection stays alive as long as the daemon is making progress.
func (*Client) TriggerGC ¶ added in v0.3.0
func (c *Client) TriggerGC() (*TriggerGCResponse, error)
TriggerGC requests the daemon to force a GC reclone of team contexts.
func (*Client) WhisperHistory ¶ added in v0.6.0
func (c *Client) WhisperHistory(agentID string, before time.Time, limit int) (*WhisperHistoryResponse, error)
WhisperHistory queries a page of whispers for an agent without advancing the cursor. Used for inspection — shows what has been or will be whispered to an agent. Pass before=zero and limit=0 to get the first page (most recent 50 entries). Use resp.NextCursor as before in subsequent calls when resp.HasMore is true.
type CodeDBMaintainer ¶ added in v0.6.0
type CodeDBMaintainer struct {
// contains filtered or unexported fields
}
CodeDBMaintainer wraps a CodeDBManager for the DBMaintainer interface. Opens the store on demand for maintenance since CodeDBManager doesn't hold a persistent handle.
func NewCodeDBMaintainer ¶ added in v0.6.0
func NewCodeDBMaintainer(name string, manager *CodeDBManager) *CodeDBMaintainer
NewCodeDBMaintainer creates a maintainer for a codedb store.
func (*CodeDBMaintainer) Maintain ¶ added in v0.6.0
func (m *CodeDBMaintainer) Maintain(ctx context.Context) DBMaintenanceResult
func (*CodeDBMaintainer) Name ¶ added in v0.6.0
func (m *CodeDBMaintainer) Name() string
type CodeDBManager ¶ added in v0.4.0
type CodeDBManager struct {
// contains filtered or unexported fields
}
CodeDBManager manages CodeDB indexing in the daemon. It ensures only one indexing operation runs at a time and tracks index status.
Concurrency note: The in-process mutex prevents concurrent indexing within a single daemon, but today multiple daemons can exist for the same repo (one per worktree). Cross-process safety currently relies on SQLite WAL mode with busy_timeout(5000ms) — concurrent readers are fine, but two daemons indexing simultaneously could contend on SQLite/Bleve write locks. This is a known short-term limitation. When the daemon model moves to one-per-repo (shared across worktrees), the in-process mutex will be sufficient and no flock will be needed.
func NewCodeDBManager ¶ added in v0.4.0
func NewCodeDBManager(projectRoot string, logger *slog.Logger, telemetry *TelemetryCollector) *CodeDBManager
NewCodeDBManager creates a new CodeDB manager for the given project root. Resolves the shared CodeDB path via project config (ledger cache). Falls back to the legacy per-worktree path if project config is unavailable.
func (*CodeDBManager) BuildLedgerIndex ¶ added in v0.6.1
func (m *CodeDBManager) BuildLedgerIndex(ctx context.Context, ledgerPath string)
BuildLedgerIndex builds or refreshes the ledger index from the ledger's main branch. This is independent of the worktree index — the ledger index provides committed content search even when no worktree is active. Non-blocking: if a ledger index build is already in progress, returns immediately.
func (*CodeDBManager) CheckFreshness ¶ added in v0.4.0
func (m *CodeDBManager) CheckFreshness(ctx context.Context)
CheckFreshness checks if the index needs refreshing and triggers a background re-index if needed. If no index exists yet, creates the initial index. This is non-blocking and safe to call from the scheduler or daemon startup.
Cheap pre-check: compares the current HEAD fingerprint (filesystem read, ~0.1ms) against the cached value from the last successful index. If unchanged, skips the entire doIndex pipeline (which opens SQLite, Bleve, go-git even for a no-op). On daemon startup the cache is empty, so the first call always runs doIndex to catch any changes that happened while the daemon was offline.
func (*CodeDBManager) Index ¶ added in v0.4.0
func (m *CodeDBManager) Index(ctx context.Context, payload CodeIndexPayload, pw *ProgressWriter) (*CodeIndexResult, error)
Index runs indexing with progress reporting. Only one indexing operation runs at a time. If indexing is already in progress, returns an error immediately.
TODO: When multiple daemons share the same CodeDB (worktree scenario), add a filesystem flock on the data dir to prevent concurrent write contention across processes. Until then, busy_timeout(5000ms) on SQLite provides best-effort protection but Bleve's bolt backend only allows one writer at a time and will error if two daemons index simultaneously.
func (*CodeDBManager) RefreshDirtyOverlay ¶ added in v0.6.1
func (m *CodeDBManager) RefreshDirtyOverlay(ctx context.Context)
RefreshDirtyOverlay rebuilds only the dirty file overlay index (uncommitted files). Non-blocking: if a dirty refresh or full index is already running, returns immediately. This is much cheaper than CheckFreshness — no git history scan, no symbol/comment parsing. Called by the DirtyOverlayDebouncer when project files change via fsnotify.
func (*CodeDBManager) SetIssueTracker ¶ added in v0.6.0
func (m *CodeDBManager) SetIssueTracker(tracker *IssueTracker)
SetIssueTracker wires the daemon's issue tracker so doIndex can emit structured issues when the cache directory is missing.
func (*CodeDBManager) SetLedgerPath ¶ added in v0.5.0
func (m *CodeDBManager) SetLedgerPath(path string)
SetLedgerPath sets the ledger checkout path for GitHub data indexing. Called by the daemon when the ledger workspace is discovered.
func (*CodeDBManager) Stats ¶ added in v0.4.0
func (m *CodeDBManager) Stats() CodeDBStats
Stats returns current index statistics. Returns cached stats from the last index run to avoid blocking on SQLite during active indexing. Only queries the DB on cold start (no cached stats and not currently indexing).
func (*CodeDBManager) UpdateProjectRoot ¶ added in v0.6.0
func (m *CodeDBManager) UpdateProjectRoot(path string)
UpdateProjectRoot updates the project root path used for indexing. Called when a heartbeat arrives from a different workspace (e.g., Conductor creates a new workspace after deleting the old one).
dataDir is intentionally NOT reset here: all worktrees of the same repo share the same dataDir (keyed by repo ID + endpoint). Resetting it on every heartbeat from a different worktree causes repeated config re-resolution on repos with many active Conductor workspaces, producing log spam and unnecessary I/O.
type CodeDBStats ¶ added in v0.4.0
type CodeDBStats struct {
Commits int `json:"commits"`
Blobs int `json:"blobs"`
Symbols int `json:"symbols"`
Comments int `json:"comments"`
PRs int `json:"prs"`
Issues int `json:"issues"`
Repos []RepoStats `json:"repos,omitempty"`
LastIndexed time.Time `json:"last_indexed,omitempty"`
IndexingNow bool `json:"indexing_now"`
LastError string `json:"last_error,omitempty"`
DataDir string `json:"data_dir"`
IndexExists bool `json:"index_exists"`
// ledger index fields (ledger main branch, worktree-independent)
LedgerExists bool `json:"ledger_exists"`
LedgerCommits int `json:"ledger_commits"`
LedgerIndexingNow bool `json:"ledger_indexing_now"`
}
CodeDBStats tracks index statistics.
type CodeIndexPayload ¶ added in v0.4.0
type CodeIndexPayload struct {
// URL is an optional remote git URL to index. If empty, indexes the local repo.
URL string `json:"url,omitempty"`
// Full wipes the existing index before rebuilding. Used by 'ox index --full'.
Full bool `json:"full,omitempty"`
}
CodeIndexPayload is the IPC payload for code_index requests.
type CodeIndexResult ¶ added in v0.4.0
type CodeIndexResult struct {
BlobsParsed uint64 `json:"blobs_parsed"`
SymbolsExtracted uint64 `json:"symbols_extracted"`
CommentsExtracted uint64 `json:"comments_extracted"`
// Per-stage timing in milliseconds
IndexDurationMs int64 `json:"index_duration_ms"`
SymbolDurationMs int64 `json:"symbol_duration_ms"`
CommentDurationMs int64 `json:"comment_duration_ms"`
TotalDurationMs int64 `json:"total_duration_ms"`
}
CodeIndexResult is the result of a code_index operation.
type Config ¶
type Config struct {
// SyncIntervalRead is how often to pull changes from remote.
SyncIntervalRead time.Duration
// TeamContextSyncInterval is how often to sync team context repos.
TeamContextSyncInterval time.Duration
// DebounceWindow batches rapid changes before committing.
DebounceWindow time.Duration
// InactivityTimeout is how long the daemon waits without activity before exiting.
// Zero means never exit due to inactivity.
InactivityTimeout time.Duration
// VersionCheckInterval is how often to check GitHub for new releases.
VersionCheckInterval time.Duration
// GCCheckInterval is how often to check if any workspace needs a reclone GC.
// The actual GC cadence is per-workspace from gc_interval_days in the manifest.
GCCheckInterval time.Duration
// DistillInterval is how often to trigger memory distillation.
// Zero disables automatic distillation.
DistillInterval time.Duration
// CodeDBCheckInterval is how often to run CheckFreshness to detect new commits
// (branch switches, manual commits, pulled history). Decoupled from git pull
// cadence because the dirty overlay (via fsnotify) handles uncommitted file
// search with ~5s latency — full reindex only needs to catch new commits.
// Zero disables automatic codedb freshness checks.
CodeDBCheckInterval time.Duration
// LedgerCheckInterval is how often to check if the codedb ledger index
// needs rebuilding (ledger HEAD changed). Independent of ledger pull cadence
// so ledger index rebuilds don't scale with sync frequency.
// Zero disables automatic ledger index checks.
LedgerCheckInterval time.Duration
// GitHubSyncInterval is how often to sync PRs/issues from GitHub.
// Zero disables automatic GitHub sync.
GitHubSyncInterval time.Duration
// MurmurNudgeInterval is how often to nudge agents to self-report
// what they're working on via ox murmur. Minimum 10 minutes.
// Zero disables nudging (used when murmuring config is "manual").
MurmurNudgeInterval time.Duration
// SocketCheckInterval is how often the daemon checks the registry to see if
// its PID is still registered. If a different PID is registered, the daemon
// assumes it has been superseded and exits gracefully.
// Zero disables the check.
SocketCheckInterval time.Duration
// PendingWorkGracePeriod is the maximum time the daemon stays alive
// solely for pending work after inactivity timeout is reached.
// Prevents stuck daemons when finalization hangs.
PendingWorkGracePeriod time.Duration
// AutoStart starts daemon on first ox command if true.
AutoStart bool
// LedgerPath is the path to the ledger repository.
LedgerPath string
// ProjectRoot is the path to the project root (for loading team contexts).
ProjectRoot string
}
Config holds daemon configuration settings.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns the default daemon configuration.
type CredentialProvider ¶ added in v0.6.0
type CredentialProvider interface {
LoadCredentialsForEndpoint(endpoint string) (*gitserver.GitCredentials, error)
}
CredentialProvider abstracts git credential loading for testability.
type DBMaintainer ¶ added in v0.6.0
type DBMaintainer interface {
// Name returns a short identifier (e.g., "whisper-ledger", "codedb").
Name() string
// Maintain runs pruning, vacuum, and integrity checks.
Maintain(ctx context.Context) DBMaintenanceResult
}
DBMaintainer is implemented by any SQLite database that needs periodic maintenance.
type DBMaintenanceResult ¶ added in v0.6.0
type DBMaintenanceResult struct {
Name string
Pruned int64 // entries/rows removed
SizeBefore int64 // file size before (bytes), -1 if unknown
SizeAfter int64 // file size after (bytes), -1 if unknown
Vacuumed bool
Healthy bool // integrity check passed
Healed bool // corruption detected and auto-fixed
Duration time.Duration
Error error
}
DBMaintenanceResult captures what happened during a single DB maintenance cycle.
type DBMaintenanceScheduler ¶ added in v0.6.0
type DBMaintenanceScheduler struct {
// contains filtered or unexported fields
}
DBMaintenanceScheduler coordinates periodic maintenance across all registered databases. Each database's maintenance is independent — one failing doesn't block others.
func NewDBMaintenanceScheduler ¶ added in v0.6.0
func NewDBMaintenanceScheduler(logger *slog.Logger) *DBMaintenanceScheduler
NewDBMaintenanceScheduler creates a new scheduler.
func (*DBMaintenanceScheduler) Names ¶ added in v0.6.0
func (s *DBMaintenanceScheduler) Names() []string
Names returns the names of all registered maintainers.
func (*DBMaintenanceScheduler) Register ¶ added in v0.6.0
func (s *DBMaintenanceScheduler) Register(m DBMaintainer)
Register adds a maintainer. Must be called before Start.
func (*DBMaintenanceScheduler) RunAll ¶ added in v0.6.0
func (s *DBMaintenanceScheduler) RunAll(ctx context.Context) []DBMaintenanceResult
RunAll runs maintenance on all registered databases sequentially. Returns results for each. Errors in one DB don't affect others.
func (*DBMaintenanceScheduler) RunOne ¶ added in v0.6.0
func (s *DBMaintenanceScheduler) RunOne(ctx context.Context, name string) *DBMaintenanceResult
RunOne runs maintenance on a single named database. Returns nil if the name is not found.
type Daemon ¶
type Daemon struct {
// contains filtered or unexported fields
}
Daemon manages background ledger sync operations.
func (*Daemon) RestartRequested ¶ added in v0.5.0
RestartRequested returns true if the daemon stopped due to a version mismatch and should be re-executed with the updated binary.
type DaemonInfo ¶
type DaemonInfo struct {
WorkspaceID string `json:"workspace_id"`
WorkspacePath string `json:"workspace_path"`
RepoID string `json:"repo_id,omitempty"` // identifies repo-scoped daemons across clones
SocketPath string `json:"socket_path"`
PID int `json:"pid"`
Version string `json:"version"`
StartedAt time.Time `json:"started_at"`
}
DaemonInfo represents information about a running daemon.
func FindDaemonForRepo ¶ added in v0.5.0
func FindDaemonForRepo(repoID string) *DaemonInfo
FindDaemonForRepo scans the registry for a daemon with a matching repo_id. Returns the DaemonInfo if found, nil otherwise.
func KillAllDaemons ¶
func KillAllDaemons() ([]DaemonInfo, error)
KillAllDaemons stops all running daemons gracefully.
func ListRunningDaemons ¶
func ListRunningDaemons() ([]DaemonInfo, error)
ListRunningDaemons returns all daemons that are actually running (socket responsive). Prunes stale entries from the registry.
type DaemonIssue ¶
type DaemonIssue struct {
// Type categorizes the issue.
// Examples: "merge_conflict", "missing_scaffolding", "diverged", "auth_expiring"
Type string `json:"type"`
// Severity indicates urgency. No "info" level exists.
// - "warning": should address soon, not blocking operations
// - "error": blocking normal operation, agent should fix now
// - "critical": data at risk, urgent attention required
Severity string `json:"severity"`
// Repo identifies which repository has the issue.
// Examples: "ledger", "team-context-abc123"
// Empty string for global issues (e.g., auth expiring).
Repo string `json:"repo,omitempty"`
// Summary is a human-readable one-liner for display.
// The LLM will investigate the repo directly to understand details.
Summary string `json:"summary"`
// Since tracks when the issue was first detected.
// Useful for understanding how long an issue has been outstanding.
Since time.Time `json:"since"`
// RequiresConfirm indicates the resolution needs human approval before execution.
// When true, the agent should propose a fix and wait for user confirmation.
// When false, the agent can attempt to resolve automatically.
// This separates urgency (Severity) from authority (who decides).
RequiresConfirm bool `json:"requires_confirm,omitempty"`
}
DaemonIssue represents something the daemon cannot resolve with deterministic code. If the daemon could fix it programmatically, it already would have. These issues require LLM reasoning or human judgment to resolve.
Design principles:
- Issue granularity is (Type, Repo), not file-level. The daemon flags that a repo has a problem; the LLM investigates the repo to understand details.
- No "info" severity level. If something is just informational, the daemon doesn't need help - it's a notification, not a request for reasoning.
- Severity drives CLI behavior: warning=mention, error=fix now, critical=urgent.
- RequiresConfirm separates urgency from authority: some issues need human approval even if the agent could technically attempt a fix.
func (DaemonIssue) FormatLine ¶
func (i DaemonIssue) FormatLine(includeSeverity bool) string
FormatLine returns a formatted single-line representation of the issue. If includeSeverity is true, includes the severity tag in brackets.
type DaemonService ¶ added in v0.6.0
type DaemonService interface {
// sync operations
Sync() error
SyncWithProgress(progress *ProgressWriter) error
TeamSync(progress *ProgressWriter) error
SyncHistory() []SyncEvent
// status / query operations
Status() *StatusData
GetErrors() []StoredError
Sessions() []AgentSession // deprecated: use Instances
Instances() []InstanceInfo
Whispers(agentID string, attention whisperstore.Attention, topics []string) ([]whisperstore.WhisperEntry, error)
WhisperHistory(agentID string, before time.Time, limit int) (*WhisperHistoryResponse, error)
CodeStatus() *CodeDBStats
// mutation operations
Stop()
Checkout(payload CheckoutPayload, progress *ProgressWriter) (*CheckoutResult, error)
MarkErrors(ids []string)
TriggerGC() *TriggerGCResponse
CodeIndex(payload CodeIndexPayload, progress *ProgressWriter) (*CodeIndexResult, error)
Doctor() *DoctorResponse
SessionFinalize(payload SessionFinalizeIPCPayload)
SessionWatchStart(payload SessionWatchStartPayload)
SessionWatchStop(payload SessionWatchStopPayload)
// fire-and-forget operations
Activity()
Heartbeat(callerID string, payload json.RawMessage)
Telemetry(payload json.RawMessage)
Friction(payload FrictionPayload)
PublishMurmur(payload MurmurPayload)
PauseMurmuring(agentID string)
ResumeMurmuring(agentID string)
}
DaemonService is the interface the IPC server calls for all daemon operations. Grouping methods by concern makes it easy to see what each new IPC message needs.
type DaemonState ¶ added in v0.6.0
type DaemonState int
DaemonState describes the lifecycle state of the daemon process.
const ( // DaemonStateStopped: no PID file, dead process, or unreadable PID. DaemonStateStopped DaemonState = iota // DaemonStateStarting: process is alive but IPC not yet ready. // Normal during throttled restarts (up to 2 min) or fast initial startup. DaemonStateStarting // DaemonStateStuck: process alive, IPC unreachable, past the startup window. // PID file is older than startupStuckThreshold — the process likely hung in init. DaemonStateStuck // DaemonStateRunning: IPC socket is up and responding to pings. DaemonStateRunning )
func GetState ¶ added in v0.6.0
func GetState() DaemonState
GetState returns the current lifecycle state of the daemon. This is the canonical way to check daemon status — prefer it over the boolean helpers IsRunning/IsStarting, which are thin wrappers.
type DirtyOverlayDebouncer ¶ added in v0.6.1
type DirtyOverlayDebouncer struct {
// contains filtered or unexported fields
}
DirtyOverlayDebouncer bridges filesystem change events from the ChangeAccumulator to CodeDB dirty overlay rebuilds. It applies a debounce window (5s) and a minimum interval (30s) to avoid thrashing during active coding sessions.
func NewDirtyOverlayDebouncer ¶ added in v0.6.1
func NewDirtyOverlayDebouncer(codedb *CodeDBManager, logger *slog.Logger) *DirtyOverlayDebouncer
NewDirtyOverlayDebouncer creates a debouncer that triggers dirty overlay rebuilds after filesystem changes settle.
func (*DirtyOverlayDebouncer) OnSettled ¶ added in v0.6.1
func (d *DirtyOverlayDebouncer) OnSettled()
OnSettled is the callback wired to ChangeAccumulator.SetOnSettled(). Each call resets the debounce timer. If the minimum interval hasn't elapsed since the last rebuild, the timer is extended to fire at the next allowed time.
func (*DirtyOverlayDebouncer) Start ¶ added in v0.6.1
func (d *DirtyOverlayDebouncer) Start(ctx context.Context)
Start sets the context for dirty overlay refreshes.
func (*DirtyOverlayDebouncer) Stop ¶ added in v0.6.1
func (d *DirtyOverlayDebouncer) Stop()
Stop cancels any pending timer and prevents in-flight callbacks from executing.
type DoctorResponse ¶
type DoctorResponse struct {
AntiEntropyTriggered bool `json:"anti_entropy_triggered"`
ClonesTriggered int `json:"clones_triggered"`
SessionFinalizeTriggered bool `json:"session_finalize_triggered"`
SessionFinalizeQueued int `json:"session_finalize_queued"`
Errors []string `json:"errors,omitempty"`
}
DoctorResponse is the response for the doctor IPC message.
type ErrorStore ¶
type ErrorStore struct {
// contains filtered or unexported fields
}
ErrorStore manages daemon errors for user notification. Errors are persisted to disk and survive daemon restarts.
Thread safety: RWMutex allows concurrent reads from IPC handlers.
func NewErrorStore ¶
func NewErrorStore(path string) *ErrorStore
NewErrorStore creates a new error store at the given path. If path is empty, uses the default daemon state directory.
func (*ErrorStore) Add ¶
func (e *ErrorStore) Add(err StoredError)
Add adds an error to the store. If an error with the same code already exists, it updates the existing entry. Automatically saves to disk.
func (*ErrorStore) Cleanup ¶
func (e *ErrorStore) Cleanup(maxAge time.Duration)
Cleanup removes errors older than maxAge. Automatically saves to disk if any errors were removed.
func (*ErrorStore) Clear ¶
func (e *ErrorStore) Clear()
Clear removes all errors. Automatically saves to disk.
func (*ErrorStore) Count ¶
func (e *ErrorStore) Count() int
Count returns the total number of errors.
func (*ErrorStore) GetAll ¶
func (e *ErrorStore) GetAll() []StoredError
GetAll returns all errors (viewed and unviewed). Returns a copy sorted by timestamp (most recent first).
func (*ErrorStore) GetUnviewed ¶
func (e *ErrorStore) GetUnviewed() []StoredError
GetUnviewed returns all errors that haven't been viewed. Returns a copy sorted by timestamp (most recent first).
func (*ErrorStore) MarkAllViewed ¶
func (e *ErrorStore) MarkAllViewed()
MarkAllViewed marks all errors as viewed. Automatically saves to disk.
func (*ErrorStore) MarkViewed ¶
func (e *ErrorStore) MarkViewed(id string)
MarkViewed marks an error as viewed by its ID. Automatically saves to disk.
func (*ErrorStore) UnviewedCount ¶
func (e *ErrorStore) UnviewedCount() int
UnviewedCount returns the number of unviewed errors.
type ExtendedStatus ¶
ExtendedStatus provides additional status info for diagnostics.
func GetExtendedStatus ¶
func GetExtendedStatus(s *StatusData) (ExtendedStatus, bool)
GetExtendedStatus extracts extended status from StatusData. Returns the extended status and true if available.
type FileChange ¶ added in v0.6.1
type FileChange struct {
Path string // relative to project root
ChangeType ChangeType
IsDir bool
Timestamp time.Time
}
FileChange represents a single observed filesystem change.
type FileChangeMurmurPublisher ¶ added in v0.6.1
type FileChangeMurmurPublisher struct {
// contains filtered or unexported fields
}
FileChangeMurmurPublisher drains the ChangeAccumulator frequently and batches changes into a murmur every 10-15 minutes. Only reports changes since the last murmur. On startup, caps at the last 30 minutes.
func NewFileChangeMurmurPublisher ¶ added in v0.6.1
func NewFileChangeMurmurPublisher( accumulator *ChangeAccumulator, publisher MurmurPublisher, ledgerPath string, projectRoot string, logger *slog.Logger, ) *FileChangeMurmurPublisher
NewFileChangeMurmurPublisher creates a publisher that batches file changes and emits murmurs at a regular interval (default 10 minutes).
func (*FileChangeMurmurPublisher) PendingCount ¶ added in v0.6.1
func (p *FileChangeMurmurPublisher) PendingCount() int
PendingCount returns the number of buffered changes (for testing).
func (*FileChangeMurmurPublisher) SetAgentResolver ¶ added in v0.6.1
func (p *FileChangeMurmurPublisher) SetAgentResolver(r ActiveAgentResolver)
SetAgentResolver sets the resolver used to look up active AI coworker IDs.
func (*FileChangeMurmurPublisher) Start ¶ added in v0.6.1
func (p *FileChangeMurmurPublisher) Start(ctx context.Context)
Start drains the accumulator and publishes murmurs. Blocks until ctx is canceled.
type FileSystem ¶
type FileSystem interface {
// Stat returns file info for the given path.
Stat(name string) (fs.FileInfo, error)
// ReadDir reads a directory and returns its entries.
ReadDir(name string) ([]fs.DirEntry, error)
}
FileSystem abstracts filesystem operations for testability.
type FileSystemWatcher ¶
type FileSystemWatcher interface {
// Add adds a path to the watch list.
Add(path string) error
// Events returns the channel for receiving file system events.
Events() <-chan fsnotify.Event
// Errors returns the channel for receiving watcher errors.
Errors() <-chan error
// Close stops the watcher and releases resources.
Close() error
}
FileSystemWatcher abstracts filesystem watching operations for testability. This interface allows tests to inject mock implementations that simulate file events without requiring actual filesystem operations.
func DefaultWatcherFactory ¶
func DefaultWatcherFactory() (FileSystemWatcher, error)
DefaultWatcherFactory creates real filesystem watchers.
type FlushThrottle ¶ added in v0.3.0
type FlushThrottle struct {
// contains filtered or unexported fields
}
FlushThrottle prevents thundering herd flushes by enforcing a minimum cooldown between flush operations. Without throttling, every Record() call above a batch threshold can spawn a new flush goroutine, creating unbounded HTTP POSTs.
Usage: call TryFlush() before spawning a flush goroutine. It atomically claims a flush slot if the cooldown has elapsed. Call RecordFlush() from the flush function itself (e.g., ticker-triggered flushes that bypass TryFlush).
func NewFlushThrottle ¶ added in v0.3.0
func NewFlushThrottle(cooldown time.Duration) *FlushThrottle
NewFlushThrottle creates a throttle with the given minimum interval between flushes.
func (*FlushThrottle) RecordFlush ¶ added in v0.3.0
func (ft *FlushThrottle) RecordFlush()
RecordFlush updates the last flush timestamp. Use this for flushes triggered by the background ticker (which bypass TryFlush).
func (*FlushThrottle) TryFlush ¶ added in v0.3.0
func (ft *FlushThrottle) TryFlush() bool
TryFlush atomically checks if the cooldown has elapsed and claims the flush slot. Returns true if the caller should proceed with flushing.
type FrictionCollector ¶
type FrictionCollector struct {
// contains filtered or unexported fields
}
FrictionCollector manages friction event buffering and transmission to the cloud. It delegates to a frictionax.Friction instance which handles the ring buffer, background flush loop, rate limiting, and catalog caching internally.
func NewFrictionCollector ¶
func NewFrictionCollector(logger *slog.Logger, projectEndpoint string) *FrictionCollector
NewFrictionCollector creates a new friction event collector. If friction is disabled via settings, the collector operates as a no-op.
projectEndpoint is the project's configured endpoint (e.g., "https://test.sageox.ai"). If empty, falls back to the default production endpoint. SAGEOX_FRICTION_ENDPOINT env var always takes precedence when set.
func (*FrictionCollector) IsEnabled ¶
func (f *FrictionCollector) IsEnabled() bool
IsEnabled returns whether friction collection is enabled.
func (*FrictionCollector) Record ¶
func (f *FrictionCollector) Record(event friction.FrictionEvent)
Record adds a friction event to the buffer. This is non-blocking and safe for concurrent use.
func (*FrictionCollector) RecordFromIPC ¶
func (f *FrictionCollector) RecordFromIPC(payload FrictionPayload)
RecordFromIPC adds a friction event from an IPC payload.
func (*FrictionCollector) SetAuthTokenGetter ¶
func (f *FrictionCollector) SetAuthTokenGetter(cb func() string)
SetAuthTokenGetter sets the callback to get auth token from heartbeat cache. Friction events are only accepted by the server from authenticated users.
func (*FrictionCollector) Start ¶
func (f *FrictionCollector) Start()
Start begins background processing of friction events. frictionax starts its background sender automatically in New(), so this is retained for API compatibility but is effectively a no-op.
func (*FrictionCollector) Stats ¶
func (f *FrictionCollector) Stats() FrictionStats
Stats returns current friction stats for status display.
func (*FrictionCollector) Stop ¶
func (f *FrictionCollector) Stop()
Stop gracefully shuts down the friction collector. Performs a final flush before returning. Safe to call multiple times.
type FrictionPayload ¶
type FrictionPayload struct {
// Timestamp in ISO8601 format (RFC3339 UTC).
Timestamp string `json:"ts"`
// Kind categorizes the failure type (unknown-command, unknown-flag, invalid-arg, parse-error).
Kind string `json:"kind"`
// Command is the top-level command.
Command string `json:"command,omitempty"`
// Subcommand is the subcommand if applicable.
Subcommand string `json:"subcommand,omitempty"`
// Actor identifies who ran the command (human or agent).
Actor string `json:"actor"`
// AgentType is the specific agent type when Actor is "agent" (e.g., "claude-code").
AgentType string `json:"agent_type,omitempty"`
// PathBucket categorizes the working directory (home, repo, other).
PathBucket string `json:"path_bucket"`
// Input is the redacted command input (max 500 chars).
Input string `json:"input"`
// ErrorMsg is the redacted, truncated error message (max 200 chars).
ErrorMsg string `json:"error_msg"`
}
FrictionPayload is the payload for friction events from CLI. These events capture CLI usage friction (unknown commands, typos, etc.) and are forwarded to the friction analytics service.
type FrictionStats ¶
type FrictionStats struct {
Enabled bool `json:"enabled"`
BufferCount int `json:"buffer_count"`
BufferSize int `json:"buffer_size"`
SampleRate float64 `json:"sample_rate"`
CatalogVersion string `json:"catalog_version,omitempty"`
}
FrictionStats holds friction statistics for status display.
type GitHubSyncManager ¶ added in v0.5.0
type GitHubSyncManager struct {
// contains filtered or unexported fields
}
GitHubSyncManager handles automatic GitHub PR/issue sync in the daemon. It fetches from the GitHub API, writes JSON files to the ledger, and commits+pushes so the data is available for distillation and cross-coworker search.
func NewGitHubSyncManager ¶ added in v0.5.0
func NewGitHubSyncManager(projectRoot string, ledgerMu *sync.Mutex, logger *slog.Logger) *GitHubSyncManager
NewGitHubSyncManager creates a new sync manager.
func (*GitHubSyncManager) CheckAndSync ¶ added in v0.5.0
func (m *GitHubSyncManager) CheckAndSync(ctx context.Context, ledgerPath string)
CheckAndSync runs a non-blocking GitHub sync if conditions are met.
func (*GitHubSyncManager) SetCodeDBManager ¶ added in v0.5.0
func (m *GitHubSyncManager) SetCodeDBManager(codedb *CodeDBManager)
SetCodeDBManager sets the CodeDB manager so indexing is triggered immediately after extraction (rather than waiting for the next 5m cycle).
func (*GitHubSyncManager) SetIssueTracker ¶ added in v0.5.0
func (m *GitHubSyncManager) SetIssueTracker(tracker *IssueTracker)
SetIssueTracker sets the issue tracker for reporting auth/sync problems.
func (*GitHubSyncManager) Status ¶ added in v0.5.0
func (m *GitHubSyncManager) Status() GitHubSyncStats
Status returns current sync status.
type GitHubSyncStats ¶ added in v0.5.0
type GitHubSyncStats struct {
LastSync time.Time `json:"last_sync,omitempty"`
LastError string `json:"last_error,omitempty"`
Syncing bool `json:"syncing"`
Owner string `json:"owner,omitempty"`
Repo string `json:"repo,omitempty"`
}
GitHubSyncStats exposes sync status for ox status / IPC.
type GitTrackedMatcher ¶ added in v0.6.1
type GitTrackedMatcher struct {
// contains filtered or unexported fields
}
GitTrackedMatcher determines whether a filesystem path should be watched by checking if it is tracked by git. Only git-versioned directories and files are watched — this is always correct and never drifts from reality.
func NewGitTrackedMatcher ¶ added in v0.6.1
func NewGitTrackedMatcher(projectRoot string, logger *slog.Logger) *GitTrackedMatcher
NewGitTrackedMatcher creates a matcher that watches only git-tracked paths. Runs git ls-files at construction to build the initial set.
func (*GitTrackedMatcher) IsTrackedDir ¶ added in v0.6.1
func (m *GitTrackedMatcher) IsTrackedDir(relPath string) bool
IsTrackedDir returns true if the relative directory contains tracked files. The project root (relPath == ".") is always tracked.
func (*GitTrackedMatcher) IsTrackedFile ¶ added in v0.6.1
func (m *GitTrackedMatcher) IsTrackedFile(relPath string) bool
IsTrackedFile returns true if the relative file path is tracked by git.
func (*GitTrackedMatcher) Refresh ¶ added in v0.6.1
func (m *GitTrackedMatcher) Refresh()
Refresh re-reads the tracked file set from git.
func (*GitTrackedMatcher) TrackedDirs ¶ added in v0.6.1
func (m *GitTrackedMatcher) TrackedDirs() []string
TrackedDirs returns a sorted list of tracked directory paths.
type HandlerResult ¶
type HandlerResult struct {
Response *Response // response to send (nil = no response)
SkipDefault bool // if true, don't send the default response
}
HandlerResult represents the result of a message handler.
type HealthStatus ¶
type HealthStatus int
HealthStatus represents overall daemon health.
const ( HealthHealthy HealthStatus = iota HealthWarning HealthCritical )
type HeartbeatCreds ¶
type HeartbeatCreds struct {
// Token is the git PAT (Personal Access Token) for clone/push/pull.
// Issued by the SageOx git server, expires periodically (see ExpiresAt).
Token string `json:"token"`
// ServerURL is the git server base URL (e.g., "https://git.sageox.io").
// Used to construct clone URLs for ledger and team context repos.
ServerURL string `json:"server_url"`
// ExpiresAt is when the git token expires.
// Daemon uses this to trigger credential refresh before expiry.
ExpiresAt time.Time `json:"expires_at"`
// AuthToken is the OAuth access token for REST API calls.
// Used to call endpoints like GET /api/v1/cli/repos to refresh git credentials.
// This is separate from Token because git and API auth are different systems.
AuthToken string `json:"auth_token"`
// UserEmail is the authenticated user's email address.
// Used for logging auth events and displaying in `ox status`.
UserEmail string `json:"user_email"`
// UserID is the authenticated user's unique identifier.
// Used for telemetry and audit logging.
UserID string `json:"user_id"`
}
HeartbeatCreds contains credentials for the daemon. Passed in heartbeats to keep daemon credentials fresh.
Two credential types are passed:
- Git credentials (Token/ServerURL): for clone/fetch/push operations
- Auth credentials (AuthToken): for REST API calls (e.g., GET /api/v1/cli/repos)
User identity (UserEmail/UserID) is included so daemon can:
- Log authentication events (login, logout, user switch)
- Include user context in telemetry
- Display authenticated user in `ox status`
func (*HeartbeatCreds) Copy ¶
func (c *HeartbeatCreds) Copy() *HeartbeatCreds
Copy returns a deep copy of HeartbeatCreds. Used to prevent races when storing credentials from external sources.
type HeartbeatEntry ¶
type HeartbeatEntry struct {
Timestamp time.Time `json:"ts"`
DaemonPID int `json:"pid"`
DaemonVersion string `json:"version"`
Workspace string `json:"workspace"`
LastSync time.Time `json:"last_sync,omitempty"`
Status string `json:"status"` // "healthy", "error", "starting"
ErrorCount int `json:"error_count,omitempty"`
}
HeartbeatEntry represents a single heartbeat written to a repo.
func ReadLastHeartbeatFromPath ¶
func ReadLastHeartbeatFromPath(heartbeatPath string) (*HeartbeatEntry, error)
ReadLastHeartbeatFromPath reads the most recent heartbeat from an explicit path. Used for reading from ~/.cache/sageox/<endpoint>/heartbeats/<id>.jsonl
type HeartbeatHandler ¶
type HeartbeatHandler struct {
// contains filtered or unexported fields
}
HeartbeatHandler processes incoming heartbeats from CLI commands.
func NewHeartbeatHandler ¶
func NewHeartbeatHandler(logger *slog.Logger) *HeartbeatHandler
NewHeartbeatHandler creates a new heartbeat handler.
func (*HeartbeatHandler) CleanupStaleAgents ¶ added in v0.5.1
func (h *HeartbeatHandler) CleanupStaleAgents(activeIDs []string)
CleanupStaleAgents removes entries from all agent-keyed maps for agents not in the active set. Call periodically (e.g., after liveness checks) to prevent unbounded growth of context and metadata maps.
func (*HeartbeatHandler) GetActivitySummary ¶
func (h *HeartbeatHandler) GetActivitySummary() ActivitySummary
GetActivitySummary returns a summary of all tracked activity.
func (*HeartbeatHandler) GetAgentActivity ¶
func (h *HeartbeatHandler) GetAgentActivity() *ActivityTracker
GetAgentActivity returns the activity tracker for connected agents.
func (*HeartbeatHandler) GetAgentContextStats ¶ added in v0.3.0
func (h *HeartbeatHandler) GetAgentContextStats(agentID string) AgentContextStats
GetAgentContextStats returns the cumulative context consumption for a given agent.
func (*HeartbeatHandler) GetAgentLastWhisper ¶ added in v0.6.0
func (h *HeartbeatHandler) GetAgentLastWhisper(agentID string) time.Time
GetAgentLastWhisper returns when whispers were last delivered to the agent. Returns zero time if no whispers have been delivered.
func (*HeartbeatHandler) GetAgentPID ¶ added in v0.5.0
func (h *HeartbeatHandler) GetAgentPID(agentID string) int
GetAgentPID returns the parent process ID for a given agent. Returns 0 if not known.
func (*HeartbeatHandler) GetAgentParentID ¶ added in v0.5.0
func (h *HeartbeatHandler) GetAgentParentID(agentID string) string
GetAgentParentID returns the parent agent ID for a given agent. Returns empty string if no parent is known.
func (*HeartbeatHandler) GetAgentType ¶ added in v0.5.0
func (h *HeartbeatHandler) GetAgentType(agentID string) string
GetAgentType returns the agent type for a given agent. Returns empty string if no type is known.
func (*HeartbeatHandler) GetAuthToken ¶
func (h *HeartbeatHandler) GetAuthToken() string
GetAuthToken returns the cached auth token for API calls. Returns empty string if no auth token is available.
func (*HeartbeatHandler) GetAuthenticatedUser ¶
func (h *HeartbeatHandler) GetAuthenticatedUser() *AuthenticatedUser
GetAuthenticatedUser returns info about the currently authenticated user. Returns nil if no user is authenticated.
func (*HeartbeatHandler) GetCallers ¶ added in v0.5.0
func (h *HeartbeatHandler) GetCallers() []CallerInfo
GetCallers returns all known callers (clones/worktrees) that have sent heartbeats.
func (*HeartbeatHandler) GetCredentials ¶
func (h *HeartbeatHandler) GetCredentials() (*HeartbeatCreds, time.Time)
GetCredentials returns the current credentials and their freshness. Returns nil if no credentials have been received.
func (*HeartbeatHandler) GetRepoActivity ¶
func (h *HeartbeatHandler) GetRepoActivity() *ActivityTracker
GetRepoActivity returns the activity tracker for repos.
func (*HeartbeatHandler) GetTeamActivity ¶
func (h *HeartbeatHandler) GetTeamActivity() *ActivityTracker
GetTeamActivity returns the activity tracker for teams.
func (*HeartbeatHandler) GetWorkspaceActivity ¶
func (h *HeartbeatHandler) GetWorkspaceActivity() *ActivityTracker
GetWorkspaceActivity returns the activity tracker for workspaces.
func (*HeartbeatHandler) Handle ¶
func (h *HeartbeatHandler) Handle(callerID string, payload json.RawMessage)
Handle processes an incoming heartbeat message. callerID identifies the clone/worktree that sent the heartbeat (path-based hash).
func (*HeartbeatHandler) HasValidCredentials ¶
func (h *HeartbeatHandler) HasValidCredentials() bool
HasValidCredentials returns true if we have non-expired credentials.
func (*HeartbeatHandler) LastCallerPath ¶ added in v0.5.0
func (h *HeartbeatHandler) LastCallerPath() string
LastCallerPath returns the most recent caller clone/worktree path from heartbeats. Returns empty string if no heartbeat with CallerPath has been received.
func (*HeartbeatHandler) RecordWhisperDelivery ¶ added in v0.6.0
func (h *HeartbeatHandler) RecordWhisperDelivery(agentID string)
RecordWhisperDelivery records that whispers were delivered to the agent right now.
func (*HeartbeatHandler) SetActivityCallback ¶
func (h *HeartbeatHandler) SetActivityCallback(cb func())
SetActivityCallback sets the callback for any heartbeat activity.
func (*HeartbeatHandler) SetAgentHeartbeatCallback ¶ added in v0.6.0
func (h *HeartbeatHandler) SetAgentHeartbeatCallback(cb func(agentID string))
SetAgentHeartbeatCallback registers a callback fired when an agent heartbeat arrives.
func (*HeartbeatHandler) SetCallerPathCallback ¶ added in v0.6.0
func (h *HeartbeatHandler) SetCallerPathCallback(cb func(path string))
SetCallerPathCallback sets the callback for when a heartbeat arrives with a CallerPath. Used to update components (e.g., CodeDBManager) when the active workspace changes (common with Conductor which creates new workspace dirs).
func (*HeartbeatHandler) SetInitialCredentials ¶
func (h *HeartbeatHandler) SetInitialCredentials(creds *HeartbeatCreds)
SetInitialCredentials pre-populates credentials (e.g., from credential store on startup). This allows daemon to have credentials immediately without waiting for first heartbeat.
func (*HeartbeatHandler) SetTeamNeededCallback ¶
func (h *HeartbeatHandler) SetTeamNeededCallback(cb func(teamID string))
SetTeamNeededCallback sets the callback for when a team context is needed.
func (*HeartbeatHandler) SetVersionMismatchCallback ¶
func (h *HeartbeatHandler) SetVersionMismatchCallback(cb func(cliVersion, daemonVersion string))
SetVersionMismatchCallback sets the callback for CLI/daemon version mismatch. Called when CLI version differs from daemon version, typically triggers restart.
type HeartbeatPayload ¶
type HeartbeatPayload struct {
// RepoPath identifies which git repository the CLI is operating in.
// Used for activity tracking sparklines and to prioritize sync for active repos.
RepoPath string `json:"repo_path,omitempty"`
// WorkspaceID identifies the workspace context (derived from repo path hash).
// Enables multi-workspace daemon support and request routing.
WorkspaceID string `json:"workspace_id,omitempty"`
// CallerPath is the absolute path of the clone/worktree sending this heartbeat.
// With per-repo daemons, multiple clones share one daemon — CallerPath lets the
// daemon know which clone paths are alive and keeps registry.workspace_path fresh.
CallerPath string `json:"caller_path,omitempty"`
// AgentID identifies the agent session (e.g., "Oxa7b3").
// Used for tracking which agents are actively connected to the daemon.
// Empty for non-agent CLI commands.
AgentID string `json:"agent_id,omitempty"`
// TeamIDs lists team contexts referenced in the current operation.
// Triggers lazy loading of team context repos when daemon sees new team IDs.
TeamIDs []string `json:"team_ids,omitempty"`
// Credentials contains tokens for git operations and API calls.
// CLI pushes credentials so daemon doesn't need filesystem access to token stores.
// This is the primary mechanism for keeping daemon authenticated.
Credentials *HeartbeatCreds `json:"credentials,omitempty"`
// Timestamp is when the heartbeat was generated.
// Used for staleness detection and activity timeline.
Timestamp time.Time `json:"timestamp"`
// CLIVersion is the version of the CLI sending the heartbeat.
// Daemon compares this to its own version; mismatch triggers daemon restart
// to ensure CLI and daemon stay in sync after upgrades.
CLIVersion string `json:"cli_version,omitempty"`
// ContextTokens is the estimated token count of context this command produced.
// Accumulated per-agent by the daemon for visibility into context budget usage.
// Zero means no context tracking for this heartbeat (e.g., non-agent commands).
ContextTokens int64 `json:"context_tokens,omitempty"`
// CommandName identifies which ox subcommand produced this context (e.g., "prime",
// "team-ctx", "session list"). Used for per-command breakdown (ox-aw0).
CommandName string `json:"command_name,omitempty"`
// ParentAgentID is the agent ID of the parent that spawned this agent (e.g., "Oxa7b3").
// Empty for top-level agents. Used for tree structure in `ox agent list`.
ParentAgentID string `json:"parent_agent_id,omitempty"`
// AgentType identifies the kind of agent (e.g., "claude-code", "explore").
// Used for display in `ox agent list`.
AgentType string `json:"agent_type,omitempty"`
// ParentPID is the process ID of the parent agent process (e.g., Claude Code).
// Captured via os.Getppid() in the CLI. Used by the daemon for instant liveness
// detection via kill(pid, 0) instead of waiting for heartbeat timeout.
ParentPID int `json:"parent_pid,omitempty"`
}
HeartbeatPayload is sent by CLI commands to the daemon. All fields are optional - commands send what context they have.
The heartbeat serves multiple purposes:
- Activity tracking: lets daemon know CLI is active (prevents inactivity shutdown)
- Context awareness: daemon learns which repos/teams/workspaces/agents are in use
- Credential refresh: CLI pushes fresh tokens so daemon can make API calls
- Version sync: ensures daemon and CLI are compatible
type Instance ¶
type Instance struct {
// ID is the unique instance identifier (e.g., "Oxa7b3").
// Generated by the agent on startup.
ID string `json:"id"`
// AgentType identifies the type of agent (e.g., "claude-code", "cursor").
AgentType string `json:"agent_type"`
// StartTime is when the instance was registered.
StartTime time.Time `json:"start_time"`
// LastHeartbeat is when the last heartbeat was received.
LastHeartbeat time.Time `json:"last_heartbeat"`
// WorkspacePath is the workspace directory the agent is operating in.
WorkspacePath string `json:"workspace_path"`
// Status is the computed instance status: "active", "idle", or "stale".
// Computed from LastHeartbeat relative to current time.
Status string `json:"status"`
// ParentPID is the process ID of the parent agent process.
// Used for instant liveness detection via kill(pid, 0).
ParentPID int `json:"parent_pid,omitempty"`
}
Instance represents an active agent instance. Instances are registered when agents connect and tracked via heartbeats.
func (*Instance) IsProcessAlive ¶ added in v0.5.0
IsProcessAlive checks if the parent agent process is still running. Uses kill(pid, 0) which checks existence without sending a signal. Returns false if no PID was recorded or the process is gone.
type InstanceInfo ¶
type InstanceInfo struct {
// AgentID is the short agent identifier (e.g., "Oxa7b3").
AgentID string `json:"agent_id"`
// WorkspacePath is the workspace/repo the agent is working in.
WorkspacePath string `json:"workspace_path"`
// LastHeartbeat is when the agent last sent a heartbeat.
LastHeartbeat time.Time `json:"last_heartbeat"`
// HeartbeatCount is the number of heartbeats received from this agent.
HeartbeatCount int `json:"heartbeat_count"`
// Status is "active" (recent heartbeat) or "idle" (stale heartbeat).
Status string `json:"status"`
// CumulativeContextTokens is the estimated total tokens of context this agent consumed from ox commands.
CumulativeContextTokens int64 `json:"cumulative_context_tokens,omitempty"`
// CommandCount is the number of ox commands that produced context output for this agent.
CommandCount int `json:"command_count,omitempty"`
// ParentAgentID is the parent agent that spawned this agent (empty for top-level agents).
// Populated from heartbeat tracking, enabling cross-worktree tree display.
ParentAgentID string `json:"parent_agent_id,omitempty"`
// AgentType identifies the kind of agent (e.g., "claude-code", "explore").
// Populated from heartbeat tracking, enabling cross-worktree type display.
AgentType string `json:"agent_type,omitempty"`
// ParentPID is the parent process ID of the agent.
// Enables instant liveness detection without heartbeat timeout.
ParentPID int `json:"parent_pid,omitempty"`
// LastWhisper is when whispers were last delivered to this agent.
// Zero if no whispers have been delivered in the current daemon session.
LastWhisper time.Time `json:"last_whisper,omitempty"`
}
InstanceInfo represents an active agent instance from a daemon. Used by the instances IPC message to report connected agents.
func GetAllInstances ¶
func GetAllInstances() ([]InstanceInfo, error)
GetAllInstances queries all running daemons and aggregates their agent instances. Returns instances from all workspaces, sorted by last heartbeat (most recent first).
type InstanceStore ¶
type InstanceStore struct {
// contains filtered or unexported fields
}
InstanceStore manages active agent instances.
Design principles:
- Thread-safe: sync loop and IPC handlers may access concurrently
- Bounded: MaxInstances cap prevents unbounded memory growth
- Self-cleaning: stale instances are automatically removed
- Fast reads: GetActive() is O(n) where n = active instances (typically < 10)
The daemon uses InstanceStore to:
- Track which agents are connected (for `ox status`)
- Coordinate work across multiple agents
- Clean up resources when agents disconnect
func NewInstanceStore ¶
func NewInstanceStore(logger *slog.Logger) *InstanceStore
NewInstanceStore creates a new instance store.
func (*InstanceStore) ActiveCount ¶
func (s *InstanceStore) ActiveCount() int
ActiveCount returns the number of active (non-stale) instances.
func (*InstanceStore) Count ¶
func (s *InstanceStore) Count() int
Count returns the total number of tracked instances.
func (*InstanceStore) Deregister ¶
func (s *InstanceStore) Deregister(id string)
Deregister removes an instance. No-op if the instance doesn't exist.
func (*InstanceStore) Get ¶
func (s *InstanceStore) Get(id string) *Instance
Get retrieves an instance by ID. Returns nil if not found.
func (*InstanceStore) GetActive ¶
func (s *InstanceStore) GetActive() []*Instance
GetActive returns all instances that are not stale. Includes both "active" and "idle" instances. Returns a copy sorted by last heartbeat (most recent first).
func (*InstanceStore) GetAll ¶
func (s *InstanceStore) GetAll() []*Instance
GetAll returns all instances regardless of status. Returns a copy sorted by start time (oldest first).
func (*InstanceStore) GetStale ¶
func (s *InstanceStore) GetStale(threshold time.Duration) []*Instance
GetStale returns instances that haven't received a heartbeat within the threshold. These are candidates for cleanup. Returns a copy sorted by last heartbeat (oldest first).
func (*InstanceStore) Heartbeat ¶
func (s *InstanceStore) Heartbeat(id string)
Heartbeat updates the last heartbeat time for an instance. If the instance doesn't exist, this is a no-op. Use Register() to create new instances.
func (*InstanceStore) Register ¶
func (s *InstanceStore) Register(inst *Instance)
Register adds or updates an instance. If an instance with the same ID exists, it is updated.
func (*InstanceStore) StartCleanup ¶
func (s *InstanceStore) StartCleanup()
StartCleanup starts the background cleanup routine. Call Stop() to stop the cleanup routine.
func (*InstanceStore) Stop ¶
func (s *InstanceStore) Stop()
Stop stops the background cleanup routine.
type InstancesResponse ¶
type InstancesResponse struct {
Instances []InstanceInfo `json:"instances"`
}
InstancesResponse is the response for the instances IPC message.
type IssueTracker ¶
type IssueTracker struct {
// contains filtered or unexported fields
}
IssueTracker maintains the daemon's issue cache.
Design: The daemon detects issues during sync operations and caches them in memory. CLI reads are O(1) memory access - no blocking on git operations. This is critical because CLI commands block the agent's event loop and must be fast (< 1ms).
Thread safety: Sync loop writes, IPC handlers read. RWMutex allows concurrent reads.
Deduplication: Only one issue per (Type, Repo) combination. If the same issue is set again, it updates the existing entry (e.g., severity might change).
func NewIssueTracker ¶
func NewIssueTracker() *IssueTracker
NewIssueTracker creates a new issue tracker.
func (*IssueTracker) ClearIssue ¶
func (t *IssueTracker) ClearIssue(issueType, repo string)
ClearIssue removes an issue by type and repo. No-op if the issue doesn't exist.
func (*IssueTracker) ClearRepo ¶
func (t *IssueTracker) ClearRepo(repo string)
ClearRepo removes all issues for a specific repo. Useful when a repo is removed or all its issues are resolved.
func (*IssueTracker) GetIssues ¶
func (t *IssueTracker) GetIssues() []DaemonIssue
GetIssues returns a copy of all issues, sorted by severity (critical first). Returns a copy to prevent races with concurrent modifications.
func (*IssueTracker) MaxSeverity ¶
func (t *IssueTracker) MaxSeverity() string
MaxSeverity returns the highest severity among all issues. Returns empty string if no issues exist.
func (*IssueTracker) NeedsHelp ¶
func (t *IssueTracker) NeedsHelp() bool
NeedsHelp returns true if any issues exist. This is the fast-path check for CLI - just reading a length.
func (*IssueTracker) SetIssue ¶
func (t *IssueTracker) SetIssue(issue DaemonIssue)
SetIssue adds or updates an issue. Deduplicates by (Type, Repo) - only one issue per combination exists. If an issue with the same (Type, Repo) exists, it is updated.
type ManagedRepoPullOpts ¶ added in v0.6.0
type ManagedRepoPullOpts struct {
// RepoPath is the local filesystem path to the git repo.
RepoPath string
// RepoName identifies the repo in issues and logs (e.g., "ledger", "team-context-foo").
RepoName string
// ProjectRoot is the user's project root, used to resolve the endpoint
// for credential refresh.
ProjectRoot string
// SyncInterval is the base sync interval. Used to compute FETCH_HEAD
// dedup threshold (SyncInterval / 2).
SyncInterval time.Duration
// MinFetchAge overrides the minimum FETCH_HEAD age for dedup.
// Zero means use gitutil.MinFetchHeadAge.
MinFetchAge time.Duration
// ValidateIntegrity runs isValidGitRepo before pulling. If false (or repo
// is corrupt), returns a CorruptRepo result so the caller can handle reclone.
ValidateIntegrity bool
// DetectDivergence runs rev-list divergence check after fetch, before pull.
// Purely informational — pull --rebase handles it either way.
DetectDivergence bool
// ResolveRules maps path prefixes to conflict resolution modes.
// Empty means no auto-resolve — conflicts become errors.
// Most specific prefix wins: `resolve none data/proprietary/` overrides
// `resolve auto data/` for files under data/proprietary/.
ResolveRules []manifest.ResolveRule
// Logger for structured logging. Required.
Logger *slog.Logger
}
ManagedRepoPullOpts configures how pullManagedRepo behaves for a given repo. Both ledger and team context repos use this — behavioral differences are expressed through these options, not through separate code paths.
type ManagedRepoPullResult ¶ added in v0.6.0
type ManagedRepoPullResult struct {
// Skipped is true if the pull was skipped (repo up-to-date, in rebase, locked, etc).
Skipped bool
// SkipReason explains why the pull was skipped.
SkipReason string
// CorruptRepo is true if ValidateIntegrity detected a corrupt repo.
// The caller should handle reclone.
CorruptRepo bool
// Diverged is true if branches were diverged before the pull.
Diverged bool
// AutoResolved is true if rebase conflicts were auto-resolved.
AutoResolved bool
// FetchHeadTime is the FETCH_HEAD mtime after fetch (zero if not fetched).
FetchHeadTime time.Time
// Error from fetch or pull. Nil on success or skip.
Err error
// Issue to report (nil if none). The caller decides how to persist it.
Issue *DaemonIssue
}
ManagedRepoPullResult describes what happened during pullManagedRepo.
type MarkErrorsPayload ¶
type MarkErrorsPayload struct {
// IDs to mark as viewed. If empty, marks all errors as viewed.
IDs []string `json:"ids,omitempty"`
}
MarkErrorsPayload is the payload for marking errors as viewed.
type Message ¶
type Message struct {
Type string `json:"type"`
WorkspaceID string `json:"workspace_id,omitempty"` // repo-scoped daemon identity
CallerID string `json:"caller_id,omitempty"` // identifies calling clone/worktree (path-based hash)
Payload json.RawMessage `json:"payload,omitempty"`
}
Message represents an IPC message.
type MessageHandler ¶
type MessageHandler func(s *Server, msg Message, conn net.Conn) HandlerResult
MessageHandler handles a specific message type. It receives the server (for accessing callbacks), the message, and the connection. Returns the handler result.
type MessageRouter ¶
type MessageRouter struct {
// contains filtered or unexported fields
}
MessageRouter routes messages to their handlers.
func NewMessageRouter ¶
func NewMessageRouter(logger *slog.Logger) *MessageRouter
NewMessageRouter creates a new message router.
func (*MessageRouter) Handle ¶
func (r *MessageRouter) Handle(s *Server, msg Message, conn net.Conn) (HandlerResult, bool)
Handle routes a message to its handler. Returns the handler result and whether a handler was found.
func (*MessageRouter) Register ¶
func (r *MessageRouter) Register(msgType string, handler MessageHandler)
Register registers a handler for a message type.
type MockFileSystem ¶
type MockFileSystem struct {
// contains filtered or unexported fields
}
MockFileSystem implements FileSystem for testing.
func NewMockFileSystem ¶
func NewMockFileSystem() *MockFileSystem
NewMockFileSystem creates a new mock filesystem for testing.
func (*MockFileSystem) AddDir ¶
func (m *MockFileSystem) AddDir(path string, entries []string)
AddDir adds a mock directory to the filesystem.
func (*MockFileSystem) AddFile ¶
func (m *MockFileSystem) AddFile(path string, size int64, mode fs.FileMode)
AddFile adds a mock file to the filesystem.
func (*MockFileSystem) ReadDir ¶
func (m *MockFileSystem) ReadDir(name string) ([]fs.DirEntry, error)
ReadDir reads a directory and returns its entries.
func (*MockFileSystem) SetReadDirError ¶
func (m *MockFileSystem) SetReadDirError(path string, err error)
SetReadDirError sets the error to return for ReadDir on a specific path.
func (*MockFileSystem) SetStatError ¶
func (m *MockFileSystem) SetStatError(path string, err error)
SetStatError sets the error to return for Stat on a specific path.
type MockFileSystemWatcher ¶
type MockFileSystemWatcher struct {
// contains filtered or unexported fields
}
MockFileSystemWatcher implements FileSystemWatcher for testing. It allows tests to inject events and errors without real filesystem operations.
func NewMockFileSystemWatcher ¶
func NewMockFileSystemWatcher() *MockFileSystemWatcher
NewMockFileSystemWatcher creates a new mock watcher for testing.
func (*MockFileSystemWatcher) Add ¶
func (m *MockFileSystemWatcher) Add(path string) error
Add records the path and returns the configured error (if any).
func (*MockFileSystemWatcher) AddedPaths ¶
func (m *MockFileSystemWatcher) AddedPaths() []string
AddedPaths returns the paths that were added to the watcher.
func (*MockFileSystemWatcher) Close ¶
func (m *MockFileSystemWatcher) Close() error
Close closes the channels and returns the configured error (if any).
func (*MockFileSystemWatcher) CloseErrors ¶
func (m *MockFileSystemWatcher) CloseErrors()
CloseErrors closes the errors channel.
func (*MockFileSystemWatcher) CloseEvents ¶
func (m *MockFileSystemWatcher) CloseEvents()
CloseEvents closes the events channel to signal completion.
func (*MockFileSystemWatcher) Errors ¶
func (m *MockFileSystemWatcher) Errors() <-chan error
Errors returns the errors channel.
func (*MockFileSystemWatcher) Events ¶
func (m *MockFileSystemWatcher) Events() <-chan fsnotify.Event
Events returns the events channel.
func (*MockFileSystemWatcher) SendError ¶
func (m *MockFileSystemWatcher) SendError(err error)
SendError sends an error to the watcher.
func (*MockFileSystemWatcher) SendEvent ¶
func (m *MockFileSystemWatcher) SendEvent(event fsnotify.Event)
SendEvent sends an event to the watcher.
func (*MockFileSystemWatcher) SetAddError ¶
func (m *MockFileSystemWatcher) SetAddError(err error)
SetAddError configures the error to return from Add().
func (*MockFileSystemWatcher) SetCloseError ¶
func (m *MockFileSystemWatcher) SetCloseError(err error)
SetCloseError configures the error to return from Close().
type MurmurNudgeSource ¶ added in v0.6.0
type MurmurNudgeSource struct {
// contains filtered or unexported fields
}
MurmurNudgeSource periodically checks if active agents should be nudged to self-report what they're working on via ox murmur. Produces a whisper entry for each agent that hasn't murmured within the configured interval.
Uses the whisper store as source of truth for nudge/murmur history, so state survives daemon restarts without needing in-memory tracking.
func NewMurmurNudgeSource ¶ added in v0.6.0
func NewMurmurNudgeSource(store *whisperstore.Store, heartbeat *HeartbeatHandler, interval time.Duration, projectRoot string) *MurmurNudgeSource
NewMurmurNudgeSource creates a source that nudges agents to self-report.
func (*MurmurNudgeSource) Interval ¶ added in v0.6.0
func (s *MurmurNudgeSource) Interval() time.Duration
Interval returns 1 minute — the source runs frequently but only produces whispers when agents actually need nudging (first nudge after ~1min, then every configured interval).
func (*MurmurNudgeSource) IsAgentPaused ¶ added in v0.6.0
func (s *MurmurNudgeSource) IsAgentPaused(agentID string) bool
IsAgentPaused returns true if the agent has murmuring paused.
func (*MurmurNudgeSource) Name ¶ added in v0.6.0
func (s *MurmurNudgeSource) Name() string
func (*MurmurNudgeSource) PauseAgent ¶ added in v0.6.0
func (s *MurmurNudgeSource) PauseAgent(agentID string)
PauseAgent pauses murmur nudging for the given agent.
func (*MurmurNudgeSource) Produce ¶ added in v0.6.0
func (s *MurmurNudgeSource) Produce(_ context.Context) []whisperstore.WhisperEntry
func (*MurmurNudgeSource) ResumeAgent ¶ added in v0.6.0
func (s *MurmurNudgeSource) ResumeAgent(agentID string)
ResumeAgent resumes murmur nudging for the given agent.
type MurmurPausePayload ¶ added in v0.6.0
type MurmurPausePayload struct {
AgentID string `json:"agent_id"`
}
MurmurPausePayload carries the agent ID for pause/resume murmur nudging.
type MurmurPayload ¶ added in v0.6.0
type MurmurPayload struct {
TargetDir string `json:"target_dir"` // ledger or team context repo path
Content string `json:"content"` // murmur content (for commit message summary)
RelPath string `json:"rel_path"` // relative path to write within TargetDir
MurmurJSON []byte `json:"murmur_json"` // serialized ledger.MurmurFile to write at RelPath
}
MurmurPayload carries all data the daemon needs to write and commit a murmur. The CLI passes the full MurmurFile JSON so the daemon owns all disk I/O — no temp file is written by the CLI when the daemon is available.
type MurmurPublisher ¶ added in v0.6.1
type MurmurPublisher interface {
PublishMurmur(payload MurmurPayload)
}
MurmurPublisher writes a murmur to the ledger via the daemon service.
type MurmurRelay ¶ added in v0.6.0
type MurmurRelay struct {
// contains filtered or unexported fields
}
MurmurRelay detects new murmur files in ledger and team context repos after git pull, converts them to whisper entries, and feeds them to the WhisperRegistry. Uses the relayed_murmurs table for dedup across restarts.
func NewMurmurRelay ¶ added in v0.6.0
func NewMurmurRelay(registry *WhisperRegistry, projectRoot string, logger *slog.Logger) *MurmurRelay
NewMurmurRelay creates a relay that scans murmur files and converts them to whisper entries in the registry.
func (*MurmurRelay) RelayFromPath ¶ added in v0.6.0
func (r *MurmurRelay) RelayFromPath(baseDir, scope string) int
RelayFromPath scans a directory for murmur files within the default window and relays any new ones into the whisper registry. Called after git pull on a ledger or team context repo. scope must be "ledger" or "team". Returns the number of murmurs successfully relayed.
func (*MurmurRelay) SetLocalAgentIDs ¶ added in v0.6.0
func (r *MurmurRelay) SetLocalAgentIDs(ids []string)
SetLocalAgentIDs sets the agent IDs running on this machine. Murmurs authored by these agents are filtered out to avoid echo.
type ProgressCallback ¶
ProgressCallback is called for each progress update during long operations. Percent is nil when unknown.
type ProgressResponse ¶
type ProgressResponse struct {
Progress *CheckoutProgress `json:"progress,omitempty"` // non-nil = still in progress
Success bool `json:"success"` // final result
Error string `json:"error,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}
ProgressResponse is a response that indicates ongoing progress.
type ProgressWriter ¶
type ProgressWriter struct {
// contains filtered or unexported fields
}
ProgressWriter allows handlers to send progress updates during long operations.
func (*ProgressWriter) WriteMessage ¶
func (pw *ProgressWriter) WriteMessage(message string) error
WriteMessage sends a progress update with just a message (no stage or percent).
func (*ProgressWriter) WriteProgress ¶
func (pw *ProgressWriter) WriteProgress(stage string, percent int, message string) error
WriteProgress sends a progress update with known percentage.
func (*ProgressWriter) WriteStage ¶
func (pw *ProgressWriter) WriteStage(stage string, message string) error
WriteStage sends a progress update with stage and message (no percent).
type ProjectWatcher ¶ added in v0.6.1
type ProjectWatcher struct {
// contains filtered or unexported fields
}
ProjectWatcher watches a project directory recursively for file changes and feeds events through a ChangeAccumulator for settled batch processing. Only watches directories that contain git-tracked files.
func NewProjectWatcher ¶ added in v0.6.1
func NewProjectWatcher( projectRoot string, logger *slog.Logger, watcherFactory WatcherFactory, fileSystem FileSystem, accumulator *ChangeAccumulator, tracker *GitTrackedMatcher, ) *ProjectWatcher
NewProjectWatcher creates a new project watcher.
func (*ProjectWatcher) Accumulator ¶ added in v0.6.1
func (pw *ProjectWatcher) Accumulator() *ChangeAccumulator
Accumulator returns the underlying change accumulator.
func (*ProjectWatcher) Start ¶ added in v0.6.1
func (pw *ProjectWatcher) Start(ctx context.Context)
Start begins watching the project directory. Blocks until ctx is canceled.
func (*ProjectWatcher) WatchedDirCount ¶ added in v0.6.1
func (pw *ProjectWatcher) WatchedDirCount() int
WatchedDirCount returns the number of directories being watched.
type RealFileSystem ¶
type RealFileSystem struct{}
RealFileSystem implements FileSystem using actual OS calls.
type RealFileSystemWatcher ¶
type RealFileSystemWatcher struct {
// contains filtered or unexported fields
}
RealFileSystemWatcher implements FileSystemWatcher using fsnotify.
func NewRealFileSystemWatcher ¶
func NewRealFileSystemWatcher() (*RealFileSystemWatcher, error)
NewRealFileSystemWatcher creates a new real filesystem watcher.
func (*RealFileSystemWatcher) Add ¶
func (r *RealFileSystemWatcher) Add(path string) error
Add adds a path to the watch list.
func (*RealFileSystemWatcher) Close ¶
func (r *RealFileSystemWatcher) Close() error
Close stops the watcher and releases resources.
func (*RealFileSystemWatcher) Errors ¶
func (r *RealFileSystemWatcher) Errors() <-chan error
Errors returns the channel for receiving watcher errors.
func (*RealFileSystemWatcher) Events ¶
func (r *RealFileSystemWatcher) Events() <-chan fsnotify.Event
Events returns the channel for receiving file system events.
type Registry ¶
type Registry struct {
Daemons map[string]DaemonInfo `json:"daemons"`
// contains filtered or unexported fields
}
Registry tracks all running ox daemons on the host.
func LoadRegistry ¶
LoadRegistry loads the daemon registry from disk.
func (*Registry) FindByRepoID ¶ added in v0.5.0
func (r *Registry) FindByRepoID(repoID string) *DaemonInfo
FindByRepoID returns the first daemon entry matching the given repo_id, or nil if no match is found. Empty repoID always returns nil.
func (*Registry) FindByWorkspaceID ¶ added in v0.6.0
func (r *Registry) FindByWorkspaceID(workspaceID string) *DaemonInfo
FindByWorkspaceID returns the daemon entry for the given workspace ID, or nil if no match is found. Empty workspaceID always returns nil.
func (*Registry) Register ¶
func (r *Registry) Register(info DaemonInfo) error
Register adds or updates a daemon entry.
Known race: there is a TOCTOU window between the in-memory map update (protected by r.mu) and the disk write (protected by the package-level registryMu). Two daemon processes calling Register simultaneously could each read a stale on-disk snapshot, causing one entry to be lost. This is acceptable because the registry is best-effort: a lost entry re-registers on the next heartbeat, and adding flock-based file locking would introduce disproportionate complexity for this failure mode.
func (*Registry) Unregister ¶
Unregister removes a daemon entry.
type RepoStats ¶ added in v0.4.0
type RepoStats struct {
Name string `json:"name"`
Path string `json:"path"`
Commits int `json:"commits"`
Blobs int `json:"blobs"`
}
RepoStats tracks per-repo statistics within the index.
type Response ¶
type Response struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}
Response represents an IPC response.
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer stores N most recent timestamps in a circular buffer.
func (*RingBuffer) Add ¶
func (r *RingBuffer) Add(t time.Time)
Add adds a timestamp to the ring buffer.
func (*RingBuffer) Count ¶
func (r *RingBuffer) Count() int
Count returns the number of entries in the buffer.
func (*RingBuffer) Slice ¶
func (r *RingBuffer) Slice() []time.Time
Slice returns all timestamps in chronological order (oldest first).
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server handles IPC requests from clients.
func NewServer ¶
NewServer creates a new IPC server with a mutable CallbackService. Handlers are wired incrementally via Set*Handler methods.
func NewServerWithService ¶ added in v0.6.0
func NewServerWithService(logger *slog.Logger, service DaemonService) *Server
NewServerWithService creates an IPC server backed by an explicit DaemonService. Use this when all operations are available upfront (e.g., in daemon.go).
func (*Server) SetActivityCallback ¶
func (s *Server) SetActivityCallback(cb func())
SetActivityCallback sets the callback for activity tracking.
func (*Server) SetCheckoutHandler ¶
func (s *Server) SetCheckoutHandler(cb func(payload CheckoutPayload, progress *ProgressWriter) (*CheckoutResult, error))
SetCheckoutHandler sets the handler for checkout requests. The handler receives a ProgressWriter to send progress updates during long operations.
func (*Server) SetCodeIndexHandler ¶ added in v0.4.0
func (s *Server) SetCodeIndexHandler(cb func(payload CodeIndexPayload, progress *ProgressWriter) (*CodeIndexResult, error))
SetCodeIndexHandler sets the handler for code indexing requests. The handler receives a ProgressWriter to send progress updates during indexing.
func (*Server) SetCodeStatusHandler ¶ added in v0.4.0
func (s *Server) SetCodeStatusHandler(cb func() *CodeDBStats)
SetCodeStatusHandler sets the handler for code index status requests.
func (*Server) SetDoctorHandler ¶
func (s *Server) SetDoctorHandler(handler func() *DoctorResponse)
SetDoctorHandler sets the doctor (health check) handler.
func (*Server) SetErrorsHandler ¶
func (s *Server) SetErrorsHandler(onGet func() []StoredError, onMark func(ids []string))
SetErrorsHandler sets the handler for retrieving unviewed errors.
func (*Server) SetFrictionHandler ¶
func (s *Server) SetFrictionHandler(cb func(payload FrictionPayload))
SetFrictionHandler sets the handler for friction messages. Friction events are fire-and-forget - no response is sent.
func (*Server) SetHandlers ¶
func (s *Server) SetHandlers(onSync func() error, onStop func(), onStatus func() *StatusData)
SetHandlers sets the core sync/stop/status handlers on the CallbackService. Panics if the server was created via NewServerWithService (no mutable adapter).
func (*Server) SetHeartbeatHandler ¶
func (s *Server) SetHeartbeatHandler(cb func(callerID string, payload json.RawMessage))
SetHeartbeatHandler sets the handler for heartbeat messages. callerID identifies which clone/worktree sent the heartbeat (path-based hash).
func (*Server) SetInstancesHandler ¶
func (s *Server) SetInstancesHandler(cb func() []InstanceInfo)
SetInstancesHandler sets the handler for retrieving active agent instances.
func (*Server) SetMurmurHandler ¶ added in v0.6.0
func (s *Server) SetMurmurHandler(fn func(payload MurmurPayload))
SetMurmurHandler sets the handler for murmur write+commit messages. Murmur events are fire-and-forget - no response is sent.
func (*Server) SetPauseMurmuringHandler ¶ added in v0.6.0
SetPauseMurmuringHandler sets the handler for pausing murmur nudging. Murmur pause events are fire-and-forget - no response is sent.
func (*Server) SetResumeMurmuringHandler ¶ added in v0.6.0
SetResumeMurmuringHandler sets the handler for resuming murmur nudging. Murmur resume events are fire-and-forget - no response is sent.
func (*Server) SetSessionFinalizeHandler ¶ added in v0.5.0
func (s *Server) SetSessionFinalizeHandler(fn func(payload SessionFinalizeIPCPayload))
SetSessionFinalizeHandler sets the handler for session finalize messages. Session finalize events are fire-and-forget - no response is sent.
func (*Server) SetSessionWatchStartHandler ¶ added in v0.6.1
func (s *Server) SetSessionWatchStartHandler(fn func(payload SessionWatchStartPayload))
SetSessionWatchStartHandler sets the handler for session watch start messages. Session watch start events are fire-and-forget - no response is sent.
func (*Server) SetSessionWatchStopHandler ¶ added in v0.6.1
func (s *Server) SetSessionWatchStopHandler(fn func(payload SessionWatchStopPayload))
SetSessionWatchStopHandler sets the handler for session watch stop messages. Session watch stop events are fire-and-forget - no response is sent.
func (*Server) SetSessionsHandler ¶
func (s *Server) SetSessionsHandler(cb func() []AgentSession)
SetSessionsHandler sets the handler for retrieving active agent sessions. Deprecated: Use SetInstancesHandler instead.
func (*Server) SetSyncHandler ¶
func (s *Server) SetSyncHandler(cb func(progress *ProgressWriter) error)
SetSyncHandler sets the sync handler with progress support. This supersedes the onSync callback set in SetHandlers.
func (*Server) SetSyncHistoryHandler ¶
SetSyncHistoryHandler sets the sync history handler.
func (*Server) SetTeamSyncHandler ¶
func (s *Server) SetTeamSyncHandler(cb func(progress *ProgressWriter) error)
SetTeamSyncHandler sets the team context sync handler with progress support.
func (*Server) SetTelemetryHandler ¶
func (s *Server) SetTelemetryHandler(cb func(payload json.RawMessage))
SetTelemetryHandler sets the handler for telemetry messages. Telemetry is fire-and-forget - no response is sent.
func (*Server) SetTriggerGCHandler ¶ added in v0.3.0
func (s *Server) SetTriggerGCHandler(handler func() *TriggerGCResponse)
SetTriggerGCHandler sets the handler for forced GC reclone.
func (*Server) SetWhisperHistoryHandler ¶ added in v0.6.0
func (s *Server) SetWhisperHistoryHandler(cb func(agentID string, before time.Time, limit int) (*WhisperHistoryResponse, error))
SetWhisperHistoryHandler sets the handler for whisper history (inspection) queries.
func (*Server) SetWhispersHandler ¶ added in v0.6.0
func (s *Server) SetWhispersHandler(cb func(agentID string, attention whisperstore.Attention, topics []string) ([]whisperstore.WhisperEntry, error))
SetWhispersHandler sets the handler for whisper queries.
type SessionFinalizeIPCPayload ¶ added in v0.5.0
type SessionFinalizeIPCPayload struct {
SessionName string `json:"session_name"` // e.g. "2026-03-12T11-09-ryan-OxTndR"
LedgerPath string `json:"ledger_path"` // ledger repo root
CachePath string `json:"cache_path"` // local cache session dir (source files)
ProjectRoot string `json:"project_root"` // for endpoint/auth resolution
}
SessionFinalizeIPCPayload carries the minimum info needed for the daemon to upload and finalize a session that was saved locally by the CLI.
type SessionWatchStartPayload ¶ added in v0.6.1
type SessionWatchStartPayload struct {
SessionName string `json:"session_name"` // session folder name (e.g. "2026-03-12T11-09-ryan-OxTndR")
SessionFile string `json:"session_file"` // path to agent's native session file (e.g. ~/.codex/sessions/...jsonl)
AdapterName string `json:"adapter_name"` // "codex", "claude-code", etc.
}
SessionWatchStartPayload carries info for the daemon to start tailing a hookless agent's session file and writing entries to raw.jsonl.
type SessionWatchStopPayload ¶ added in v0.6.1
type SessionWatchStopPayload struct {
SessionName string `json:"session_name"`
}
SessionWatchStopPayload signals the daemon to stop tailing a session.
type SessionsResponse ¶
type SessionsResponse struct {
Sessions []AgentSession `json:"sessions"`
}
SessionsResponse is the response for the sessions IPC message. Deprecated: Use InstancesResponse instead.
type StatusData ¶
type StatusData struct {
Running bool `json:"running"`
Pid int `json:"pid"`
Version string `json:"version"`
Uptime time.Duration `json:"uptime"`
WorkspacePath string `json:"workspace_path,omitempty"`
LedgerPath string `json:"ledger_path"`
LastSync time.Time `json:"last_sync"`
SyncIntervalRead time.Duration `json:"sync_interval_read"`
// error tracking
RecentErrorCount int `json:"recent_error_count,omitempty"`
LastError string `json:"last_error,omitempty"`
LastErrorTime string `json:"last_error_time,omitempty"`
// sync insights
TotalSyncs int `json:"total_syncs,omitempty"`
SyncsLastHour int `json:"syncs_last_hour,omitempty"`
AvgSyncTime time.Duration `json:"avg_sync_time,omitempty"`
// workspaces being synced, keyed by type ("ledger", "team-context")
// each type maps to a list of workspaces of that type (ledger has 1, team-context may have many)
Workspaces map[string][]WorkspaceSyncStatus `json:"workspaces,omitempty"`
ProjectTeamID string `json:"project_team_id,omitempty"` // primary team for this project
// team context sync (deprecated: use Workspaces["team-context"] instead)
TeamContexts []TeamContextSyncStatus `json:"team_contexts,omitempty"`
// inactivity tracking
InactivityTimeout time.Duration `json:"inactivity_timeout,omitempty"`
TimeSinceActivity time.Duration `json:"time_since_activity,omitempty"`
// heartbeat activity tracking (for sparklines)
Activity *ActivitySummary `json:"activity,omitempty"`
// authenticated user (from heartbeat credentials)
AuthenticatedUser *AuthenticatedUser `json:"authenticated_user,omitempty"`
// NeedsHelp is true when the daemon has issues requiring LLM reasoning.
// If the daemon could solve it with deterministic code, it already would have.
// This is the fast-path check for CLI - just reading a boolean.
NeedsHelp bool `json:"needs_help"`
// Issues contains problems the daemon cannot resolve alone.
// Keyed by (Type, Repo) - only one issue per combination.
// The LLM inspects repos directly to understand details; daemon just flags repo-level issues.
// Severity levels: "warning" (address soon), "error" (blocking), "critical" (urgent).
// No "info" level - if daemon needs help, it's at least a warning.
Issues []DaemonIssue `json:"issues,omitempty"`
// UnviewedErrorCount is the number of persisted errors that haven't been viewed.
// These are errors that persist across daemon restarts for user notification.
UnviewedErrorCount int `json:"unviewed_error_count,omitempty"`
// startup timing (how long the daemon took to start)
StartupDurationMs int64 `json:"startup_duration_ms,omitempty"`
ThrottleDurationMs int64 `json:"throttle_duration_ms,omitempty"`
// code index status
CodeDB *CodeDBStats `json:"code_db,omitempty"`
// agent work manager status
AgentWork *agentwork.AgentWorkStatus `json:"agent_work,omitempty"`
// connected clones/worktrees that have sent heartbeats
Callers []CallerInfo `json:"callers,omitempty"`
}
StatusData represents daemon status information.
func (*StatusData) HasConfiguredRepos ¶ added in v0.6.0
func (s *StatusData) HasConfiguredRepos() bool
HasConfiguredRepos reports whether the daemon has repos it should be syncing.
func (*StatusData) IsBootstrapping ¶ added in v0.6.0
func (s *StatusData) IsBootstrapping() bool
IsBootstrapping reports true when the daemon just started and hasn't completed its first sync yet. Callers use this to soften warnings during the grace period. Only true when repos are configured — a daemon with nothing to sync is never bootstrapping.
func (*StatusData) LastSyncForPath ¶ added in v0.3.0
func (s *StatusData) LastSyncForPath(path string) (time.Time, bool)
LastSyncForPath returns the last sync time for a workspace at the given path. Returns (time, true) if found and non-zero, (zero, false) otherwise. Safe to call on nil receiver.
type StatusJSON ¶ added in v0.6.0
type StatusJSON struct {
Running bool `json:"running"`
Health string `json:"health"`
Pid int `json:"pid"`
Version string `json:"version"`
Uptime string `json:"uptime"`
// backward-compatible top-level aliases (deprecated; use project.ledger)
LedgerPath string `json:"ledger_path,omitempty"`
LastSync *time.Time `json:"last_sync,omitempty"`
Sync statusSyncJSON `json:"sync"`
Project statusProjectGroupJSON `json:"project"`
OtherTeams []statusTeamContextJSON `json:"other_teams,omitempty"`
Issues []statusIssueJSON `json:"issues,omitempty"`
AutoExitIn string `json:"auto_exit_in,omitempty"`
}
StatusJSON is the top-level JSON output for `ox daemon status --json`. It mirrors the information shown in the human-readable output.
func BuildStatusJSON ¶ added in v0.6.0
func BuildStatusJSON(status *StatusData, cliVersion string) *StatusJSON
BuildStatusJSON constructs the curated JSON output that mirrors the human-readable status.
type StoredError ¶
type StoredError struct {
ID string `json:"id"`
Message string `json:"message"`
Code string `json:"code"`
Timestamp time.Time `json:"timestamp"`
Viewed bool `json:"viewed"`
Severity string `json:"severity"` // "warning", "error"
}
StoredError represents a daemon error that needs user attention. These are persisted to disk so they survive daemon restarts.
func NewStoredError ¶
func NewStoredError(code, message string, severity string) StoredError
NewStoredError creates a new StoredError with the given code and message.
type SyncEvent ¶
type SyncEvent struct {
Time time.Time `json:"time"`
Type string `json:"type"` // "pull", "push", "full", "team_context"
WorkspaceID string `json:"workspace_id,omitempty"` // workspace that was synced (e.g., "ledger", team_id)
Duration time.Duration `json:"duration"`
FilesChanged int `json:"files_changed"`
}
SyncEvent tracks a successful sync with metadata.
type SyncMetrics ¶
type SyncMetrics struct {
// contains filtered or unexported fields
}
SyncMetrics tracks observability counters and timing for sync operations. Counters and timestamps use lock-free atomics; only pullDurations needs a mutex.
func NewSyncMetrics ¶
func NewSyncMetrics() *SyncMetrics
NewSyncMetrics creates a new SyncMetrics instance.
func (*SyncMetrics) RecordConflict ¶
func (m *SyncMetrics) RecordConflict()
RecordConflict records a merge conflict detection.
func (*SyncMetrics) RecordDivergence ¶ added in v0.6.0
func (m *SyncMetrics) RecordDivergence()
RecordDivergence records a branch divergence detection.
func (*SyncMetrics) RecordPullFailure ¶
func (m *SyncMetrics) RecordPullFailure()
RecordPullFailure records a failed pull operation.
func (*SyncMetrics) RecordPullSuccess ¶
func (m *SyncMetrics) RecordPullSuccess(duration time.Duration)
RecordPullSuccess records a successful pull operation.
func (*SyncMetrics) RecordTeamSync ¶
func (m *SyncMetrics) RecordTeamSync()
RecordTeamSync records a successful team context sync.
func (*SyncMetrics) RecordTeamSyncError ¶
func (m *SyncMetrics) RecordTeamSyncError()
RecordTeamSyncError records a failed team context sync.
func (*SyncMetrics) Snapshot ¶
func (m *SyncMetrics) Snapshot() SyncMetricsSnapshot
Snapshot returns a point-in-time copy of metrics for reporting.
type SyncMetricsSnapshot ¶
type SyncMetricsSnapshot struct {
PullSuccessCount int64 `json:"pull_success_count"`
PullFailureCount int64 `json:"pull_failure_count"`
ConflictCount int64 `json:"conflict_count"`
DivergenceCount int64 `json:"divergence_count"`
TeamSyncCount int64 `json:"team_sync_count"`
TeamSyncErrorCount int64 `json:"team_sync_error_count"`
LastPullSuccess time.Time `json:"last_pull_success,omitempty"`
LastPullFailure time.Time `json:"last_pull_failure,omitempty"`
LastConflict time.Time `json:"last_conflict,omitempty"`
AvgPullDuration time.Duration `json:"avg_pull_duration"`
P95PullDuration time.Duration `json:"p95_pull_duration"`
}
SyncMetricsSnapshot is a point-in-time copy of sync metrics for reporting.
type SyncOption ¶ added in v0.6.0
type SyncOption func(*SyncScheduler)
SyncOption configures optional SyncScheduler dependencies.
func WithCredentialProvider ¶ added in v0.6.0
func WithCredentialProvider(c CredentialProvider) SyncOption
WithCredentialProvider overrides the default credential loader.
func WithGitRunner ¶ added in v0.6.0
func WithGitRunner(g gitutil.GitRunner) SyncOption
WithGitRunner overrides the default git command runner.
type SyncScheduler ¶
type SyncScheduler struct {
// contains filtered or unexported fields
}
SyncScheduler manages periodic sync operations.
func NewSyncScheduler ¶
func NewSyncScheduler(cfg *Config, logger *slog.Logger, opts ...SyncOption) *SyncScheduler
NewSyncScheduler creates a new sync scheduler. Optional SyncOption values override default dependencies for testing.
func (*SyncScheduler) Checkout ¶
func (s *SyncScheduler) Checkout(payload CheckoutPayload, progress *ProgressWriter) (*CheckoutResult, error)
Checkout clones a repository if it doesn't exist. Sends progress updates via ProgressWriter during long operations. Uses cloneSem to bound concurrent clone operations (blocks until a slot is available). After successful clone of ledger/team-context repos, creates AGENTS.md. Checkout clones a repository to the specified path.
┌─────────────────────────────────────────────────────────────────────────────┐ │ DAEMON IPC HANDLER: checkout │ │ Classification: CRITICAL PATH WITH FALLBACK │ │ (see docs/ai/specs/ipc-architecture.md) │ │ │ │ Clone is CRITICAL for product functionality - without it, SageOx cannot │ │ be initialized at all. However, IPC to this handler is NOT strictly │ │ required because the CLI has a FALLBACK: │ │ │ │ cmd/ox/doctor_git_repos.go:cloneViaDaemon() │ │ → Falls back to gitserver.CloneFromURLWithEndpoint() when daemon unavailable │ │ │ │ This handler is PREFERRED over direct clone because it provides: │ │ - Centralized credential handling │ │ - Progress streaming to CLI │ │ - Consistent locking for concurrent operations │ │ - AGENTS.md creation after clone │ │ - Workspace registry cache invalidation │ └─────────────────────────────────────────────────────────────────────────────┘
func (*SyncScheduler) LastError ¶
func (s *SyncScheduler) LastError() (string, time.Time)
LastError returns the most recent error message and time.
func (*SyncScheduler) LastRemoteChange ¶
func (s *SyncScheduler) LastRemoteChange(repoPath string) time.Time
LastRemoteChange returns the most recent FETCH_HEAD mtime for a repo. Returns zero time if no remote changes have been observed.
func (*SyncScheduler) LastSync ¶
func (s *SyncScheduler) LastSync() time.Time
LastSync returns the timestamp of the last successful sync.
func (*SyncScheduler) LedgerMu ¶ added in v0.5.0
func (s *SyncScheduler) LedgerMu() *sync.Mutex
LedgerMu returns the shared ledger mutex for git operations.
func (*SyncScheduler) Metrics ¶
func (s *SyncScheduler) Metrics() *SyncMetrics
Metrics returns the sync metrics for observability.
func (*SyncScheduler) RecentErrorCount ¶
func (s *SyncScheduler) RecentErrorCount() int
RecentErrorCount returns the count of recent errors (last hour).
func (*SyncScheduler) RemoteChangeActivity ¶
func (s *SyncScheduler) RemoteChangeActivity() *ActivityTracker
RemoteChangeActivity returns the remote change tracker for status display.
func (*SyncScheduler) SetActivityCallback ¶
func (s *SyncScheduler) SetActivityCallback(cb func())
SetActivityCallback sets the callback for activity tracking.
func (*SyncScheduler) SetAgentWorkSignal ¶ added in v0.5.0
func (s *SyncScheduler) SetAgentWorkSignal(ch chan<- struct{})
SetAgentWorkSignal sets the channel used to notify the agent work manager after a successful ledger pull.
func (*SyncScheduler) SetAuthTokenGetter ¶
func (s *SyncScheduler) SetAuthTokenGetter(cb func() string)
SetAuthTokenGetter sets the callback to get auth token from heartbeat cache. Used for lazy credential refresh via /api/v1/cli/repos.
func (*SyncScheduler) SetCodeDBManager ¶ added in v0.4.0
func (s *SyncScheduler) SetCodeDBManager(m *CodeDBManager)
SetCodeDBManager sets the CodeDB manager for periodic freshness checks.
func (*SyncScheduler) SetGitHubSyncManager ¶ added in v0.5.0
func (s *SyncScheduler) SetGitHubSyncManager(m *GitHubSyncManager)
SetGitHubSyncManager sets the GitHub sync manager for periodic PR/issue sync.
func (*SyncScheduler) SetIssueTracker ¶
func (s *SyncScheduler) SetIssueTracker(tracker *IssueTracker)
SetIssueTracker sets the issue tracker for reporting sync issues. Issues are reported when the daemon encounters problems it cannot resolve with deterministic code (e.g., merge conflicts requiring LLM reasoning).
func (*SyncScheduler) SetMurmurRelay ¶ added in v0.6.0
func (s *SyncScheduler) SetMurmurRelay(r *MurmurRelay)
SetMurmurRelay sets the murmur relay for converting murmur files to whisper entries.
func (*SyncScheduler) SetTelemetryCallback ¶
func (s *SyncScheduler) SetTelemetryCallback(cb func(syncType, operation, status string, duration time.Duration))
SetTelemetryCallback sets the callback for telemetry events. Called when sync operations complete with syncType, operation, status, and duration.
func (*SyncScheduler) SetWhisperRegistry ¶ added in v0.6.0
func (s *SyncScheduler) SetWhisperRegistry(r *WhisperRegistry)
SetWhisperRegistry sets the whisper registry for trigger whispers on sync events.
func (*SyncScheduler) Start ¶
func (s *SyncScheduler) Start(ctx context.Context)
Start starts the sync scheduler.
func (*SyncScheduler) Sync ¶
func (s *SyncScheduler) Sync() error
Sync performs an immediate full sync. Used for manual requests via IPC.
func (*SyncScheduler) SyncHistory ¶
func (s *SyncScheduler) SyncHistory() []SyncEvent
SyncHistory returns recent sync events for display.
func (*SyncScheduler) SyncStats ¶
func (s *SyncScheduler) SyncStats() SyncStatistics
SyncStats returns aggregate statistics about recent syncs.
func (*SyncScheduler) SyncWithProgress ¶
func (s *SyncScheduler) SyncWithProgress(progress *ProgressWriter) error
SyncWithProgress performs a full sync with progress updates. If progress is nil, no progress updates are sent. Returns an error if the ledger sync fails (surfaced to CLI via IPC).
func (*SyncScheduler) TeamContextStatus ¶
func (s *SyncScheduler) TeamContextStatus() []TeamContextSyncStatus
TeamContextStatus returns the current team context sync status. Uses the WorkspaceRegistry for a unified view of workspace state.
func (*SyncScheduler) TeamSync ¶
func (s *SyncScheduler) TeamSync(progress *ProgressWriter) error
TeamSync performs an on-demand sync of all team contexts with progress updates.
func (*SyncScheduler) TriggerAntiEntropy ¶
func (s *SyncScheduler) TriggerAntiEntropy()
TriggerAntiEntropy triggers self-healing checks for missing workspaces. This is called by IPC when doctor or other commands want to ensure ledgers and team contexts are cloned.
func (*SyncScheduler) TriggerGC ¶ added in v0.3.0
func (s *SyncScheduler) TriggerGC(ctx context.Context) *TriggerGCResponse
TriggerGC forces a GC reclone of all eligible team contexts, bypassing the interval check. Returns immediately if GC is already in progress. Runs synchronously.
func (*SyncScheduler) TriggerSync ¶
func (s *SyncScheduler) TriggerSync()
TriggerSync triggers an immediate sync (debounced by watcher).
func (*SyncScheduler) WorkspaceRegistry ¶
func (s *SyncScheduler) WorkspaceRegistry() *WorkspaceRegistry
WorkspaceRegistry returns the workspace registry for status queries.
type SyncState ¶ added in v0.3.0
type SyncState struct {
LastSync time.Time `json:"last_sync"`
LastSyncCommit string `json:"last_sync_commit"`
ConsecutiveFailures int `json:"consecutive_failures"`
}
SyncState tracks the sync health of a workspace (team context or ledger). Stored in .sageox/cache/sync-state.json within the workspace directory.
This is local-only machine state — never committed or pushed to the remote. Protected by .sageox/.gitignore (cache/ is gitignored).
func LoadSyncState ¶ added in v0.3.0
LoadSyncState reads sync state from .sageox/cache/sync-state.json within workspacePath. Returns empty SyncState (not error) if the file is missing or corrupt.
func (*SyncState) IsStale ¶ added in v0.3.0
IsStale returns true if the last successful sync exceeds the given threshold. A zero LastSync (never synced) is always stale.
func (*SyncState) RecordFailure ¶ added in v0.3.0
func (s *SyncState) RecordFailure()
RecordFailure increments the consecutive failure count.
func (*SyncState) RecordSuccess ¶ added in v0.3.0
RecordSuccess updates state after a successful sync.
func (*SyncState) StaleDuration ¶ added in v0.3.0
StaleDuration returns how long since the last successful sync. Returns 0 if never synced.
type SyncStatistics ¶
type SyncStatistics struct {
TotalSyncs int
SyncsLastHour int
AvgDuration time.Duration
OldestSync time.Time
NewestSync time.Time
}
SyncStatistics holds aggregate sync metrics.
type TeamContextSyncStatus ¶
type TeamContextSyncStatus struct {
TeamID string `json:"team_id"`
TeamName string `json:"team_name"`
Path string `json:"path"`
CloneURL string `json:"clone_url,omitempty"` // git remote URL
LastSync time.Time `json:"last_sync"`
LastErr string `json:"last_error,omitempty"`
Exists bool `json:"exists"` // whether the local path exists
}
TeamContextSyncStatus tracks sync status for a team context repo.
type TelemetryCollector ¶
type TelemetryCollector struct {
// contains filtered or unexported fields
}
TelemetryCollector manages event buffering and transmission to the cloud. It uses a ring buffer for bounded memory usage and supports server-controlled throttling via X-SageOx-Interval header.
func NewTelemetryCollector ¶
func NewTelemetryCollector(logger *slog.Logger) *TelemetryCollector
NewTelemetryCollector creates a new telemetry collector. It loads or generates a persistent client ID and checks opt-out settings.
func (*TelemetryCollector) IsEnabled ¶
func (c *TelemetryCollector) IsEnabled() bool
IsEnabled returns whether telemetry collection is enabled.
func (*TelemetryCollector) Record ¶
func (c *TelemetryCollector) Record(event string, props map[string]any)
Record adds an event to the ring buffer. This is non-blocking and safe for concurrent use.
func (*TelemetryCollector) RecordCodeIndexComplete ¶ added in v0.5.0
func (c *TelemetryCollector) RecordCodeIndexComplete(result *CodeIndexResult, status string)
RecordCodeIndexComplete records a code index completion event with per-stage timing.
func (*TelemetryCollector) RecordDaemonCrash ¶
func (c *TelemetryCollector) RecordDaemonCrash(uptime time.Duration, errType, errMsg string)
RecordDaemonCrash records a daemon crash/panic event.
func (*TelemetryCollector) RecordDaemonShutdown ¶
func (c *TelemetryCollector) RecordDaemonShutdown(uptime time.Duration, reason string)
RecordDaemonShutdown records the daemon shutdown event.
func (*TelemetryCollector) RecordDaemonStartup ¶
func (c *TelemetryCollector) RecordDaemonStartup()
RecordDaemonStartup records the daemon startup event.
func (*TelemetryCollector) RecordFromIPC ¶
func (c *TelemetryCollector) RecordFromIPC(event string, props map[string]any)
RecordFromIPC records an event received via IPC from CLI. The props should already contain app_type from the CLI.
func (*TelemetryCollector) RecordSyncComplete ¶
func (c *TelemetryCollector) RecordSyncComplete(syncType, operation, status string, duration time.Duration, recordsCount int)
RecordSyncComplete records a sync completion event.
func (*TelemetryCollector) Start ¶
func (c *TelemetryCollector) Start()
Start begins background processing of telemetry events.
func (*TelemetryCollector) Stats ¶
func (c *TelemetryCollector) Stats() TelemetryStats
Stats returns current telemetry stats for status display.
func (*TelemetryCollector) Stop ¶
func (c *TelemetryCollector) Stop()
Stop gracefully shuts down the telemetry collector. Performs a final flush before returning. Safe to call multiple times.
type TelemetryEvent ¶
type TelemetryEvent struct {
UUID string `json:"uuid"`
TS int64 `json:"ts"`
TSLocal string `json:"tslocal"`
Event string `json:"event"`
Props map[string]any `json:"props"`
}
TelemetryEvent matches the spec format for telemetry events.
type TelemetryPayload ¶
type TelemetryPayload struct {
Event string `json:"event"` // event name (e.g., "sync:complete")
Props map[string]any `json:"props"` // event properties
}
TelemetryPayload is the payload for telemetry events from CLI.
type TelemetryStats ¶
type TelemetryStats struct {
Enabled bool `json:"enabled"`
BufferCount int `json:"buffer_count"`
BufferSize int `json:"buffer_size"`
SendInterval time.Duration `json:"send_interval"`
LastSend time.Time `json:"last_send"`
ClientID string `json:"client_id"`
}
TelemetryStats holds telemetry statistics for status display.
type TimeBasedSource ¶ added in v0.6.0
type TimeBasedSource interface {
Name() string
Interval() time.Duration
Produce(ctx context.Context) []whisperstore.WhisperEntry
}
TimeBasedSource produces whisper entries on a periodic interval.
type TriggerGCResponse ¶ added in v0.3.0
type TriggerGCResponse struct {
Triggered int `json:"triggered"`
Skipped int `json:"skipped,omitempty"`
LedgerTriggered bool `json:"ledger_triggered,omitempty"`
Errors []string `json:"errors,omitempty"`
}
TriggerGCResponse is the response for trigger_gc requests. Errors include both failures (clone/validation errors) and skips due to uncommitted changes — GC is a disk-space optimization and must never destroy user work.
type VersionCache ¶ added in v0.2.0
type VersionCache struct {
// contains filtered or unexported fields
}
VersionCache manages the GitHub release version cache on disk. Thread-safe for concurrent access.
func NewVersionCache ¶ added in v0.2.0
func NewVersionCache(log *slog.Logger) *VersionCache
NewVersionCache creates a new version cache. The cache file is stored at ~/.cache/sageox/version-check.json (or XDG equivalent).
func (*VersionCache) CheckAndUpdate ¶ added in v0.2.0
func (v *VersionCache) CheckAndUpdate(ctx context.Context) error
CheckAndUpdate fetches the latest release from GitHub and updates the cache. Uses ETag conditional requests to avoid unnecessary data transfer. Safe to call concurrently.
func (*VersionCache) Data ¶ added in v0.2.0
func (v *VersionCache) Data() *VersionCacheData
Data returns a copy of the cached version data. Returns nil if no data is cached. Safe to call concurrently.
func (*VersionCache) Load ¶ added in v0.2.0
func (v *VersionCache) Load() error
Load reads the version cache from disk if it exists. Returns nil error if file doesn't exist (empty cache is valid). Safe to call concurrently.
func (*VersionCache) Save ¶ added in v0.2.0
func (v *VersionCache) Save(data *VersionCacheData) error
Save writes the version cache to disk atomically. Creates the cache directory if it doesn't exist. Safe to call concurrently.
type VersionCacheData ¶ added in v0.2.0
type VersionCacheData struct {
LatestVersion string `json:"latest_version"`
CheckedAt time.Time `json:"checked_at"`
ETag string `json:"etag,omitempty"`
}
VersionCacheData holds the cached latest release version from GitHub.
type Watcher ¶
type Watcher struct {
// contains filtered or unexported fields
}
Watcher monitors the ledger directory for changes.
func NewWatcher ¶
NewWatcher creates a new file watcher.
func NewWatcherWithFS ¶
func NewWatcherWithFS( path string, debounceWindow time.Duration, logger *slog.Logger, watcherFactory WatcherFactory, fileSystem FileSystem, ) *Watcher
NewWatcherWithFS creates a new file watcher with injectable dependencies. This constructor is primarily for testing, allowing injection of mock filesystem and watcher implementations.
type WatcherFactory ¶
type WatcherFactory func() (FileSystemWatcher, error)
WatcherFactory creates FileSystemWatcher instances. This abstraction allows tests to inject mock watchers.
type WhisperDBMaintainer ¶ added in v0.6.0
type WhisperDBMaintainer struct {
// contains filtered or unexported fields
}
WhisperDBMaintainer wraps a WhisperRegistry for the DBMaintainer interface. It runs pruning and size enforcement on ALL stores (ledger + team) via the registry's Prune() and EnforceMaxSize() methods. Integrity checks and size measurement use the ledger store specifically.
func NewWhisperDBMaintainer ¶ added in v0.6.0
func NewWhisperDBMaintainer(name string, registry *WhisperRegistry, retention time.Duration, maxBytes int64) *WhisperDBMaintainer
NewWhisperDBMaintainer creates a maintainer that covers all whisper stores (ledger + team) via the registry.
func (*WhisperDBMaintainer) Maintain ¶ added in v0.6.0
func (m *WhisperDBMaintainer) Maintain(_ context.Context) DBMaintenanceResult
func (*WhisperDBMaintainer) Name ¶ added in v0.6.0
func (m *WhisperDBMaintainer) Name() string
type WhisperHistoryPayload ¶ added in v0.6.0
type WhisperHistoryPayload struct {
AgentID string `json:"agent_id"` // empty = all agents
Before time.Time `json:"before,omitempty"` // cursor: only return entries older than this
Limit int `json:"limit,omitempty"` // max entries per page; 0 = default (50), max 200
}
WhisperHistoryPayload is the payload for whisper history queries.
type WhisperHistoryResponse ¶ added in v0.6.0
type WhisperHistoryResponse struct {
Entries []whisperstore.WhisperEntry `json:"entries"`
Cursor time.Time `json:"cursor"` // agent's delivery cursor (entries at/before this are "delivered")
HasCursor bool `json:"has_cursor"` // false if agent has never received whispers
HasMore bool `json:"has_more,omitempty"` // true if more entries exist beyond this page
NextCursor time.Time `json:"next_cursor,omitempty"` // pass as Before in next request to get the next page
}
WhisperHistoryResponse returns a page of whispers with delivery status.
type WhisperRegistry ¶ added in v0.6.0
type WhisperRegistry struct {
// contains filtered or unexported fields
}
WhisperRegistry aggregates whisper stores across scopes (ledger + team) and provides a unified API for adding and querying whispers.
The daemon creates one WhisperRegistry at startup. It wraps:
- One ledger whisper store (single-writer, owned by this daemon)
- Zero or more team whisper stores (shared across daemons via WAL)
func NewWhisperRegistry ¶ added in v0.6.0
func NewWhisperRegistry(ledgerStore *whisperstore.Store, logger *slog.Logger) *WhisperRegistry
NewWhisperRegistry creates a new registry with the given ledger store. Team stores can be added later via AddTeamStore.
func (*WhisperRegistry) Add ¶ added in v0.6.0
func (r *WhisperRegistry) Add(scope string, entries ...whisperstore.WhisperEntry) error
Add routes entries to the correct store based on scope.
func (*WhisperRegistry) AddTeamStore ¶ added in v0.6.0
func (r *WhisperRegistry) AddTeamStore(teamID string, store *whisperstore.Store)
AddTeamStore registers a team whisper store.
func (*WhisperRegistry) Close ¶ added in v0.6.0
func (r *WhisperRegistry) Close() error
Close closes all stores.
func (*WhisperRegistry) EnforceMaxSize ¶ added in v0.6.0
func (r *WhisperRegistry) EnforceMaxSize(maxBytes int64)
EnforceMaxSize runs size enforcement on all stores.
func (*WhisperRegistry) GetAllWhispers ¶ added in v0.6.0
func (r *WhisperRegistry) GetAllWhispers(agentID string) ([]whisperstore.WhisperEntry, error)
GetAllWhispers queries ALL stores and merges all whispers without advancing cursors. Used for inspection — shows pending and already-delivered whispers.
func (*WhisperRegistry) GetCursor ¶ added in v0.6.0
func (r *WhisperRegistry) GetCursor(agentID string) (time.Time, error)
GetCursor returns the agent's cursor from the ledger store (earliest cursor wins for display).
func (*WhisperRegistry) GetWhispers ¶ added in v0.6.0
func (r *WhisperRegistry) GetWhispers(agentID string, attention whisperstore.Attention, topics []string) ([]whisperstore.WhisperEntry, error)
GetWhispers queries ALL stores and merges results, sorted by importance then time.
func (*WhisperRegistry) GetWhispersPage ¶ added in v0.6.0
func (r *WhisperRegistry) GetWhispersPage(agentID string, before time.Time, limit int) ([]whisperstore.WhisperEntry, bool, error)
GetWhispersPage returns a paginated view of all whispers across all stores. before: if non-zero, only entries with created_at strictly before this time. limit: max entries to return; 0 uses the store's default (50). Merges results from ledger and all team stores, sorts by created_at DESC, and truncates. Returns (entries, hasMore, error).
func (*WhisperRegistry) HasTeamStore ¶ added in v0.6.0
func (r *WhisperRegistry) HasTeamStore(teamID string) bool
HasTeamStore returns true if a team whisper store is already registered.
func (*WhisperRegistry) IsRelayed ¶ added in v0.6.0
func (r *WhisperRegistry) IsRelayed(murmurID, scope string) (bool, error)
IsRelayed checks if a murmur has been relayed in the appropriate store.
func (*WhisperRegistry) LedgerStore ¶ added in v0.6.0
func (r *WhisperRegistry) LedgerStore() *whisperstore.Store
LedgerStore returns the underlying ledger whisper store. Used by DB maintenance to run pruning directly on the store.
func (*WhisperRegistry) MarkRelayed ¶ added in v0.6.0
func (r *WhisperRegistry) MarkRelayed(murmurID, scope string) error
MarkRelayed records that a murmur has been relayed.
func (*WhisperRegistry) Prune ¶ added in v0.6.0
func (r *WhisperRegistry) Prune(retention time.Duration)
Prune runs cleanup on all stores.
func (*WhisperRegistry) RemoveCursor ¶ added in v0.6.0
func (r *WhisperRegistry) RemoveCursor(agentID string)
RemoveCursor removes an agent's cursor from all stores.
func (*WhisperRegistry) ReopenLedgerStore ¶ added in v0.6.0
func (r *WhisperRegistry) ReopenLedgerStore(dbPath string) error
ReopenLedgerStore closes the current ledger store and opens a new one at dbPath. Called after GC reclone swaps the ledger directory — the old sql.DB handle is stale because the underlying inode was deleted during the rename-swap.
type WhisperScheduler ¶ added in v0.6.0
type WhisperScheduler struct {
// contains filtered or unexported fields
}
WhisperScheduler runs periodic whisper sources and feeds entries to the registry.
func NewWhisperScheduler ¶ added in v0.6.0
func NewWhisperScheduler(registry *WhisperRegistry, logger *slog.Logger) *WhisperScheduler
NewWhisperScheduler creates a scheduler that feeds periodic whisper entries to the registry.
func (*WhisperScheduler) RegisterSource ¶ added in v0.6.0
func (s *WhisperScheduler) RegisterSource(source TimeBasedSource)
RegisterSource adds a time-based source. Must be called before Start.
type WhispersPayload ¶ added in v0.6.0
type WhispersPayload struct {
AgentID string `json:"agent_id"`
Attention string `json:"attention,omitempty"` // "all", "normal", "focused" (default: "normal")
Topics []string `json:"topics,omitempty"` // nil = all topics
}
WhispersPayload is the payload for whisper queries.
type WhispersResponse ¶ added in v0.6.0
type WhispersResponse struct {
Entries []whisperstore.WhisperEntry `json:"entries"`
}
WhispersResponse is the response for whisper queries.
type WorkspaceRegistry ¶
type WorkspaceRegistry struct {
// contains filtered or unexported fields
}
WorkspaceRegistry tracks all workspaces (ledger + team contexts) for a daemon. Provides a unified view of workspace state for both daemon lifecycle and sync operations.
Design rationale: - Single source of truth for workspace state (replaces duplicate tracking in sync.go) - Caches loaded config to avoid repeated disk reads - Adds runtime state (Exists, LastErr, SyncInProgress) on top of config - Thread-safe for concurrent access from daemon goroutines
INVARIANT: WorkspaceRegistry is the sole writer to config.local.toml within the daemon. All config writes must go through registry methods (UpdateConfigLastSync, PersistLedgerPath, etc.) to prevent cache/disk divergence. Never call config.SaveLocalConfig directly from sync.go or other daemon code.
func NewWorkspaceRegistry ¶
func NewWorkspaceRegistry(projectRoot, repoName string) *WorkspaceRegistry
NewWorkspaceRegistry creates a new workspace registry for the given project.
func (*WorkspaceRegistry) CleanupRevokedTeamContexts ¶
func (r *WorkspaceRegistry) CleanupRevokedTeamContexts(currentTeamIDs map[string]bool)
CleanupRevokedTeamContexts removes team contexts that are no longer in the repo detail API response. Only removes team contexts that were discovered via the detail API (public TCs for non-members). Team contexts discovered from user credentials (member teams) are never removed by this cleanup, since the detail API is project-scoped and doesn't include all user teams.
func (*WorkspaceRegistry) ClearCloneRetry ¶
func (r *WorkspaceRegistry) ClearCloneRetry(id string)
ClearCloneRetry clears the clone retry state after a successful clone.
func (*WorkspaceRegistry) ClearSyncFailures ¶ added in v0.1.1
func (r *WorkspaceRegistry) ClearSyncFailures(id string)
ClearSyncFailures resets sync retry state after a successful sync.
func (*WorkspaceRegistry) ClearWorkspaceError ¶
func (r *WorkspaceRegistry) ClearWorkspaceError(id string)
ClearWorkspaceError clears the error for a workspace.
func (*WorkspaceRegistry) GetAllWorkspaces ¶
func (r *WorkspaceRegistry) GetAllWorkspaces() []WorkspaceState
GetAllWorkspaces returns copies of all workspaces (ledger + team contexts).
func (*WorkspaceRegistry) GetCloneRetryInfo ¶
func (r *WorkspaceRegistry) GetCloneRetryInfo(id string) (attempts int, nextAttempt time.Time)
GetCloneRetryInfo returns the current clone retry state for a workspace. Returns attempts=0 if no retry state exists.
func (*WorkspaceRegistry) GetEndpoint ¶
func (r *WorkspaceRegistry) GetEndpoint() string
GetEndpoint returns the API endpoint from project config.
func (*WorkspaceRegistry) GetGCInterval ¶ added in v0.3.0
func (r *WorkspaceRegistry) GetGCInterval(path string) int
GetGCInterval returns the GC interval for a workspace. Returns manifest.DefaultGCIntervalDays (7) if not set or workspace not found.
func (*WorkspaceRegistry) GetLastGCTime ¶ added in v0.3.0
func (r *WorkspaceRegistry) GetLastGCTime(id string) time.Time
GetLastGCTime returns when GC last ran for a workspace.
func (*WorkspaceRegistry) GetLedger ¶
func (r *WorkspaceRegistry) GetLedger() *WorkspaceState
GetLedger returns a copy of the ledger workspace state. Returns nil if no ledger workspace exists.
func (*WorkspaceRegistry) GetLedgerPath ¶
func (r *WorkspaceRegistry) GetLedgerPath() string
GetLedgerPath returns the ledger path for quick access.
func (*WorkspaceRegistry) GetRepoID ¶
func (r *WorkspaceRegistry) GetRepoID() string
GetRepoID returns the repo ID from project config. Used for API calls like GetLedgerStatus.
func (*WorkspaceRegistry) GetSyncIntervalMin ¶ added in v0.3.0
func (r *WorkspaceRegistry) GetSyncIntervalMin(path string) int
GetSyncIntervalMin returns the manifest-derived sync interval for a workspace identified by its local path. Returns 0 if not set or workspace not found.
func (*WorkspaceRegistry) GetSyncRetryInfo ¶ added in v0.1.1
func (r *WorkspaceRegistry) GetSyncRetryInfo(id string) (failures int, nextAttempt time.Time)
GetSyncRetryInfo returns the current sync retry state for a workspace.
func (*WorkspaceRegistry) GetTeamContextStatus ¶
func (r *WorkspaceRegistry) GetTeamContextStatus() []TeamContextSyncStatus
GetTeamContextStatus returns team context status in the legacy format. This provides backward compatibility with existing status display code.
func (*WorkspaceRegistry) GetTeamContexts ¶
func (r *WorkspaceRegistry) GetTeamContexts() []WorkspaceState
GetTeamContexts returns copies of all team context workspaces.
func (*WorkspaceRegistry) GetWorkspace ¶
func (r *WorkspaceRegistry) GetWorkspace(id string) *WorkspaceState
GetWorkspace returns a copy of a workspace by ID. Returns nil if the workspace doesn't exist.
func (*WorkspaceRegistry) HasFetchHead ¶
func (r *WorkspaceRegistry) HasFetchHead(id string) (exists bool, mtime time.Time)
HasFetchHead checks if the workspace has a .git/FETCH_HEAD file and returns its modification time.
func (*WorkspaceRegistry) InitializeLedger ¶
func (r *WorkspaceRegistry) InitializeLedger(cloneURL, projectRoot string)
InitializeLedger creates a ledger workspace from API-fetched URL. Called when ledger URL is fetched from API but no ledger workspace exists yet.
Path is computed via config.DefaultLedgerPath (user directory) for consistency.
func (*WorkspaceRegistry) InvalidateConfigCache ¶
func (r *WorkspaceRegistry) InvalidateConfigCache()
InvalidateConfigCache forces the next LoadFromConfig to reload from disk.
func (*WorkspaceRegistry) LoadFromConfig ¶
func (r *WorkspaceRegistry) LoadFromConfig() error
LoadFromConfig loads workspace state from config.local.toml. Uses cached config if recently loaded, otherwise reloads from disk.
func (*WorkspaceRegistry) PersistLedgerPath ¶ added in v0.1.1
func (r *WorkspaceRegistry) PersistLedgerPath(path string) error
PersistLedgerPath saves the ledger path to the config cache and disk. This keeps the cache in sync so UpdateConfigLastSync doesn't overwrite it.
func (*WorkspaceRegistry) ProjectTeamID ¶ added in v0.3.0
func (r *WorkspaceRegistry) ProjectTeamID() string
ProjectTeamID returns the project's primary team ID.
func (*WorkspaceRegistry) RecordSyncAttempt ¶
func (r *WorkspaceRegistry) RecordSyncAttempt(id string)
RecordSyncAttempt records that a sync was attempted for a workspace.
func (*WorkspaceRegistry) RecordSyncFailure ¶ added in v0.1.1
func (r *WorkspaceRegistry) RecordSyncFailure(id string)
RecordSyncFailure increments the consecutive failure count and sets backoff. Backoff: 1min, 2min, 4min, 8min, 16min, 32min→capped to 30min. Creates a minimal workspace entry if one doesn't exist yet.
func (*WorkspaceRegistry) RefreshExists ¶
func (r *WorkspaceRegistry) RefreshExists()
RefreshExists updates the Exists field for all workspaces.
func (*WorkspaceRegistry) RegisterTeamContextsFromAPI ¶
func (r *WorkspaceRegistry) RegisterTeamContextsFromAPI(teamContexts []api.RepoDetailTeamContext)
RegisterTeamContextsFromAPI registers team contexts discovered from the repo detail API. This is the third discovery source: public team contexts visible to non-members. Only adds new team contexts that aren't already tracked (from config or credentials).
func (*WorkspaceRegistry) SetCloneRetry ¶
func (r *WorkspaceRegistry) SetCloneRetry(id string, attempts int, nextAttempt time.Time)
SetCloneRetry records a failed clone attempt with exponential backoff. attempts is the total number of failed attempts (1-based). nextAttempt is when the next clone should be attempted.
func (*WorkspaceRegistry) SetGCInterval ¶ added in v0.3.0
func (r *WorkspaceRegistry) SetGCInterval(path string, days int)
SetGCInterval stores the manifest-derived GC interval for a workspace.
func (*WorkspaceRegistry) SetLedgerCloneURL ¶
func (r *WorkspaceRegistry) SetLedgerCloneURL(cloneURL string) bool
SetLedgerCloneURL sets the clone URL for the ledger workspace. Called when ledger URL is fetched from API. Returns false if no ledger workspace exists (caller should ensure ledger is initialized first).
func (*WorkspaceRegistry) SetSyncInProgress ¶
func (r *WorkspaceRegistry) SetSyncInProgress(id string, inProgress bool)
SetSyncInProgress marks a workspace as syncing.
func (*WorkspaceRegistry) SetSyncIntervalMin ¶ added in v0.3.0
func (r *WorkspaceRegistry) SetSyncIntervalMin(path string, minutes int)
SetSyncIntervalMin stores the manifest-derived sync interval for a workspace identified by its local path. Uses path lookup since callers may not have the workspace ID readily available.
func (*WorkspaceRegistry) SetWorkspaceError ¶
func (r *WorkspaceRegistry) SetWorkspaceError(id, errMsg string)
SetWorkspaceError records an error for a workspace.
func (*WorkspaceRegistry) ShouldRetryClone ¶
func (r *WorkspaceRegistry) ShouldRetryClone(id string) bool
ShouldRetryClone checks if enough time has passed to retry a failed clone. Returns true if: - No previous clone attempts (first try) - Current time is after NextCloneAttempt (backoff expired)
func (*WorkspaceRegistry) ShouldSync ¶ added in v0.1.1
func (r *WorkspaceRegistry) ShouldSync(id string) bool
ShouldSync checks if enough time has passed since the last sync failure to retry. Returns true if no previous failures or backoff has expired.
func (*WorkspaceRegistry) UpdateConfigLastSync ¶
func (r *WorkspaceRegistry) UpdateConfigLastSync(id string) error
UpdateConfigLastSync updates the last sync time in both registry and config file. This should be called after a successful sync.
func (*WorkspaceRegistry) UpdateLastGC ¶ added in v0.3.0
func (r *WorkspaceRegistry) UpdateLastGC(id string)
UpdateLastGC records that GC completed for a workspace.
type WorkspaceState ¶
type WorkspaceState struct {
// identity
ID string `json:"id"` // workspace ID (ledger=path hash, team=team_id)
Type WorkspaceType `json:"type"` // ledger or team_context
Path string `json:"path"` // local path to the git repo
// team-specific (only for team_context type)
TeamID string `json:"team_id,omitempty"`
TeamName string `json:"team_name,omitempty"`
TeamSlug string `json:"team_slug,omitempty"` // kebab-case slug for CLI identifiers
CloneURL string `json:"clone_url,omitempty"` // git clone URL (from credentials)
// config (from config.local.toml)
Endpoint string `json:"endpoint,omitempty"`
ConfigLastSync time.Time `json:"config_last_sync,omitempty"` // last sync recorded in config
// runtime state
Exists bool `json:"exists"` // whether the local path exists
LastErr string `json:"last_error,omitempty"` // last error during sync
LastSyncAttempt time.Time `json:"last_sync_attempt,omitempty"` // last time we tried to sync
SyncInProgress bool `json:"sync_in_progress,omitempty"` // currently syncing
// clone retry state (for background clones that fail)
CloneAttempts int `json:"clone_attempts,omitempty"` // number of failed clone attempts
NextCloneAttempt time.Time `json:"next_clone_attempt,omitempty"` // when to retry clone (exponential backoff)
// sync (fetch/pull) retry state — backoff on consecutive failures
SyncFailures int `json:"sync_failures,omitempty"` // consecutive failed sync attempts
NextSyncAttempt time.Time `json:"next_sync_attempt,omitempty"` // when to retry sync (exponential backoff)
// manifest-derived settings (from .sageox/sync.manifest in the team context repo)
SyncIntervalMin int `json:"sync_interval_min,omitempty"` // minutes between syncs (0 = use default)
GCIntervalDays int `json:"gc_interval_days,omitempty"` // days between reclones (0 = default 7)
LastGCTime time.Time `json:"last_gc_time,omitempty"` // when last GC reclone completed
}
WorkspaceState represents the runtime state of a workspace (repo). Combines config data with runtime sync status.
type WorkspaceSyncStatus ¶
type WorkspaceSyncStatus struct {
ID string `json:"id"` // workspace ID (e.g., "ledger", team_id)
Type string `json:"type"` // "ledger" or "team_context"
Path string `json:"path"` // local filesystem path
CloneURL string `json:"clone_url,omitempty"` // git remote URL
Exists bool `json:"exists"` // whether path exists locally
TeamID string `json:"team_id,omitempty"` // team ID (for team contexts)
TeamName string `json:"team_name,omitempty"` // team name (for team contexts)
TeamSlug string `json:"team_slug,omitempty"` // kebab-case team slug
LastSync time.Time `json:"last_sync,omitempty"` // last successful sync
LastErr string `json:"last_error,omitempty"` // last error message
Syncing bool `json:"syncing,omitempty"` // currently syncing
LastGCTime time.Time `json:"last_gc_time,omitempty"` // last successful GC reclone
GCIntervalDays int `json:"gc_interval_days,omitempty"` // configured GC cadence (0 = default 7)
}
WorkspaceSyncStatus represents the sync status of a workspace (ledger or team context). Provides a unified view of all repos the daemon is syncing.
type WorkspaceType ¶
type WorkspaceType string
WorkspaceType identifies the type of workspace.
const ( WorkspaceTypeLedger WorkspaceType = "ledger" WorkspaceTypeTeamContext WorkspaceType = "team_context" )
Source Files
¶
- activity.go
- codedb.go
- codedb_dirty_watcher.go
- config.go
- daemon.go
- db_maintenance.go
- db_maintenance_codedb.go
- db_maintenance_whisper.go
- errors.go
- fallback.go
- fallback_unix.go
- file_change_source.go
- flush_throttle.go
- friction.go
- github_sync.go
- heartbeat.go
- heartbeat_file.go
- instances.go
- ipc.go
- ipc_handlers.go
- ipc_unix.go
- issues.go
- jitter.go
- murmur_relay.go
- project_watcher.go
- registry.go
- signals_unix.go
- status_display.go
- sync.go
- sync_clone.go
- sync_deps.go
- sync_discovery.go
- sync_gc.go
- sync_heartbeat.go
- sync_managed.go
- sync_metrics.go
- sync_state.go
- sync_team.go
- telemetry.go
- version_cache.go
- watcher.go
- whisper_registry.go
- whisper_scheduler.go
- whisper_sources.go
- workspace_registry.go