Documentation ¶
Index ¶
- func ResolveAgent(configured string) string
- type AgentProcess
- type AgentUsability
- type AgentWorkStats
- type AgentWorkStatus
- type ClaudeRunner
- type CodexRunner
- type GeminiRunner
- type Manager
- func (m *Manager) Enqueue(item *WorkItem) bool
- func (m *Manager) ForceDetect() int
- func (m *Manager) HasPendingWork() bool
- func (m *Manager) RegisterHandler(handler WorkHandler)
- func (m *Manager) SetOnComplete(fn func(result WorkResult))
- func (m *Manager) SetSessionWatcher(sw *SessionWatcherManager)
- func (m *Manager) Start(ctx context.Context)
- func (m *Manager) Status() AgentWorkStatus
- type MockRunner
- type RateLimiter
- type RunRequest
- type RunResult
- type Runner
- type SessionFinalizeHandler
- func (h *SessionFinalizeHandler) BuildPrompt(item *WorkItem) (RunRequest, error)
- func (h *SessionFinalizeHandler) Cleanup(ledgerPath string)
- func (h *SessionFinalizeHandler) Detect(ledgerPath string) ([]*WorkItem, error)
- func (h *SessionFinalizeHandler) DetectOrphanedForAgent(ledgerPath, agentID string, heartbeatPID int) []*WorkItem
- func (h *SessionFinalizeHandler) ProcessResult(item *WorkItem, result *RunResult) error
- func (h *SessionFinalizeHandler) SetLedgerMu(mu *sync.Mutex)
- func (h *SessionFinalizeHandler) SetPIDLookup(fn func(agentID string) int)
- func (h *SessionFinalizeHandler) SetProjectRoot(root string)
- func (h *SessionFinalizeHandler) SetQualityThresholds(upload, discard float64)
- func (h *SessionFinalizeHandler) Type() string
- type SessionFinalizePayload
- type SessionWatcherManager
- func (m *SessionWatcherManager) ActiveSessions() []string
- func (m *SessionWatcherManager) Cleanup()
- func (m *SessionWatcherManager) DetectAndRestart(ledgerPath string) int
- func (m *SessionWatcherManager) StartWatch(sessionName, sessionFile, adapterName, ledgerPath, cachePath string) error
- func (m *SessionWatcherManager) StopAll()
- func (m *SessionWatcherManager) StopWatch(sessionName string)
- type WorkHandler
- type WorkItem
- type WorkQueue
- func (q *WorkQueue) Complete(dedupKey string)
- func (q *WorkQueue) Dequeue() *WorkItem
- func (q *WorkQueue) Enqueue(item *WorkItem) bool
- func (q *WorkQueue) InProgress(dedupKey string) bool
- func (q *WorkQueue) InProgressCount() int
- func (q *WorkQueue) Len() int
- func (q *WorkQueue) Requeue(item *WorkItem) bool
- type WorkResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ResolveAgent ¶ added in v0.6.0
func ResolveAgent(configured string) string
ResolveAgent resolves the effective agent from a config value. If configured is "" (unconfigured), auto-detects by checking which agent CLIs are installed AND authenticated, in priority order: claude > codex. Returns "none" if no usable agent is found. If configured is a specific value ("none", "claude", "codex"), returns it as-is.
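The priority-order resolution described above can be sketched as follows. The `usable` callback is a stand-in for the real installed-and-authenticated probe (an assumption for illustration; the package's actual check is CheckAgentUsability):

```go
package main

import "fmt"

// resolveAgent mirrors the documented behavior: an explicit config value
// is returned as-is (including "none"); "" triggers auto-detection in
// priority order. usable stands in for the installed+authenticated probe.
func resolveAgent(configured string, usable func(string) bool) string {
	if configured != "" {
		return configured // explicit config wins
	}
	for _, agent := range []string{"claude", "codex"} { // priority order
		if usable(agent) {
			return agent
		}
	}
	return "none" // no usable agent found
}

func main() {
	onlyCodex := func(a string) bool { return a == "codex" }
	fmt.Println(resolveAgent("", onlyCodex))       // codex (auto-detected)
	fmt.Println(resolveAgent("claude", onlyCodex)) // claude (explicit)
}
```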
Types ¶
type AgentProcess ¶
type AgentProcess struct {
PID int `json:"pid"`
WorkType string `json:"work_type"`
Target string `json:"target"`
StartedAt time.Time `json:"started_at"`
Status string `json:"status"` // "running", "completed", "failed"
Duration time.Duration `json:"duration,omitempty"`
Error string `json:"error,omitempty"`
ExitCode int `json:"exit_code,omitempty"`
}
AgentProcess describes a single agent invocation (active or completed).
type AgentUsability ¶ added in v0.6.0
type AgentUsability struct {
Installed bool
Authenticated bool
AuthDetail string // human-readable auth status (e.g. "OAuth", "API key", "not logged in")
}
AgentUsability describes whether an agent CLI is installed and authenticated.
func CheckAgentUsability ¶ added in v0.6.0
func CheckAgentUsability(agent string) AgentUsability
CheckAgentUsability probes whether the given agent CLI is installed and authenticated. Designed to be fast (< 2s) for use in doctor checks and auto-detection.
type AgentWorkStats ¶
type AgentWorkStats struct {
TotalInvocations int `json:"total_invocations"`
Successes int `json:"successes"`
Failures int `json:"failures"`
}
AgentWorkStats tracks cumulative invocation metrics.
type AgentWorkStatus ¶
type AgentWorkStatus struct {
Enabled bool `json:"enabled"`
AgentType string `json:"agent_type"`
AgentAvail bool `json:"agent_available"`
QueueDepth int `json:"queue_depth"`
Active []AgentProcess `json:"active,omitempty"`
Recent []AgentProcess `json:"recent,omitempty"`
Stats AgentWorkStats `json:"stats"`
}
AgentWorkStatus is the IPC-friendly snapshot of Manager state.
type ClaudeRunner ¶
type ClaudeRunner struct {
// contains filtered or unexported fields
}
ClaudeRunner implements Runner using `claude --output-format stream-json`. It spawns Claude Code CLI in non-interactive mode. ClaudeRunner is safe for concurrent use — each Run() call is independent.
func NewClaudeRunner ¶
func NewClaudeRunner(logger *slog.Logger) *ClaudeRunner
NewClaudeRunner creates a ClaudeRunner by resolving the `claude` binary. If the binary is not found, the runner is still created but Available() returns false.
func (*ClaudeRunner) Available ¶
func (r *ClaudeRunner) Available() bool
Available reports whether the claude binary exists on disk.
func (*ClaudeRunner) Run ¶
func (r *ClaudeRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)
Run executes a claude invocation with the given request.
type CodexRunner ¶ added in v0.6.0
type CodexRunner struct {
// contains filtered or unexported fields
}
CodexRunner implements Runner using the OpenAI Codex CLI. It spawns Codex in non-interactive mode with --full-auto. CodexRunner is safe for concurrent use — each Run() call is independent.
func NewCodexRunner ¶ added in v0.6.0
func NewCodexRunner(logger *slog.Logger) *CodexRunner
NewCodexRunner creates a CodexRunner by resolving the `codex` binary. If the binary is not found, the runner is still created but Available() returns false.
func (*CodexRunner) Available ¶ added in v0.6.0
func (r *CodexRunner) Available() bool
Available reports whether the codex binary exists on disk.
func (*CodexRunner) Run ¶ added in v0.6.0
func (r *CodexRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)
Run executes a codex invocation with the given request.
type GeminiRunner ¶ added in v0.6.1
type GeminiRunner struct {
// contains filtered or unexported fields
}
GeminiRunner implements Runner using `gemini -p`. It spawns Gemini CLI in non-interactive (headless) mode. GeminiRunner is safe for concurrent use — each Run() call is independent.
func NewGeminiRunner ¶ added in v0.6.1
func NewGeminiRunner(logger *slog.Logger) *GeminiRunner
NewGeminiRunner creates a GeminiRunner by resolving the `gemini` binary. If the binary is not found, the runner is still created but Available() returns false.
func (*GeminiRunner) Available ¶ added in v0.6.1
func (r *GeminiRunner) Available() bool
Available reports whether the gemini binary exists on disk.
func (*GeminiRunner) Run ¶ added in v0.6.1
func (r *GeminiRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)
Run executes a gemini invocation with the given request. Gemini CLI outputs plain text (not JSONL), so parsing is simpler than Claude's.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates agent work items in the daemon.
func NewManager ¶
func NewManager(
	runner Runner,
	logger *slog.Logger,
	configLoader func() *config.AgentWorkerConfig,
	syncSignal <-chan struct{},
	ledgerPath string,
) *Manager
NewManager creates a Manager wired to the given runner and config loader. syncSignal is a channel that fires after each ledger sync to trigger immediate detection and processing. ledgerPath is passed to handler Detect calls.
func (*Manager) Enqueue ¶
func (m *Manager) Enqueue(item *WorkItem) bool
Enqueue adds a work item to the queue. Safe for external callers (e.g., IPC). Signals the main loop to process immediately so externally-enqueued items don't wait for syncSignal or doctorTicker.
func (*Manager) ForceDetect ¶
func (m *Manager) ForceDetect() int
ForceDetect runs detection on all enabled handlers, bypassing the cooldown. Returns the total number of work items enqueued. Safe to call from IPC handlers. Always runs session cleanup (ghost removal) regardless of agent_worker.enabled.
func (*Manager) HasPendingWork ¶ added in v0.6.1
func (m *Manager) HasPendingWork() bool
HasPendingWork returns true if there are queued or in-flight finalization items.
func (*Manager) RegisterHandler ¶
func (m *Manager) RegisterHandler(handler WorkHandler)
RegisterHandler adds a WorkHandler keyed by its Type().
func (*Manager) SetOnComplete ¶
func (m *Manager) SetOnComplete(fn func(result WorkResult))
SetOnComplete sets a callback invoked after each work item completes.
func (*Manager) SetSessionWatcher ¶ added in v0.6.1
func (m *Manager) SetSessionWatcher(sw *SessionWatcherManager)
SetSessionWatcher sets an optional SessionWatcherManager for periodic detection and cleanup of tail-mode session watchers. Called during daemon startup.
func (*Manager) Start ¶
func (m *Manager) Start(ctx context.Context)
Start runs the main loop until ctx is canceled.
Two triggers drive work:
- syncSignal: fires after a successful ledger pull. Only drains the existing queue — does NOT trigger detection scans (syncs are too frequent for doctor-style work).
- doctorTicker: fires every doctorInterval (5m). Runs detect+process. This is the sole trigger for detection scans, catching both newly-synced incomplete sessions and time-based conditions (e.g., stale recordings with dead parent PIDs).
func (*Manager) Status ¶
func (m *Manager) Status() AgentWorkStatus
Status returns a point-in-time snapshot of the manager for IPC status responses.
type MockRunner ¶
type MockRunner struct {
RunFunc func(ctx context.Context, req RunRequest) (*RunResult, error)
// contains filtered or unexported fields
}
MockRunner is a test double for the Runner interface.
func NewMockRunner ¶
func NewMockRunner(available bool) *MockRunner
NewMockRunner creates a MockRunner that reports itself as available.
func (*MockRunner) Available ¶
func (m *MockRunner) Available() bool
func (*MockRunner) Run ¶
func (m *MockRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements a simple token-bucket rate limiter that refills at the start of each window. It is safe for concurrent use.
func NewRateLimiter ¶
func NewRateLimiter(maxTokens int, window time.Duration) *RateLimiter
NewRateLimiter creates a rate limiter that allows maxTokens calls per window.
func (*RateLimiter) Allow ¶
func (r *RateLimiter) Allow() bool
Allow consumes one token and returns true, or returns false if the bucket is exhausted for the current window.
func (*RateLimiter) Remaining ¶
func (r *RateLimiter) Remaining() int
Remaining returns how many tokens are left in the current window. The window resets automatically if it has expired.
type RunRequest ¶
type RunRequest struct {
Prompt string
WorkDir string
TimeoutOverride time.Duration
// SkipLLM bypasses the LLM runner entirely. The manager calls
// ProcessResult directly with an empty RunResult. Use this when
// the work item has all the information it needs to proceed without
// generating a prompt (e.g., upload-only session recovery).
SkipLLM bool
}
RunRequest describes a single agent invocation.
type RunResult ¶
type RunResult struct {
Output string
Duration time.Duration
ExitCode int
TokensIn int // input tokens (from structured output if available)
TokensOut int // output tokens
}
RunResult captures the outcome of an agent invocation.
type Runner ¶
type Runner interface {
// Run executes an agent with the given request and returns the result.
Run(ctx context.Context, req RunRequest) (*RunResult, error)
// Available reports whether the runner's backing agent CLI is installed and reachable.
Available() bool
}
Runner spawns and manages agent CLI processes.
type SessionFinalizeHandler ¶
type SessionFinalizeHandler struct {
// contains filtered or unexported fields
}
SessionFinalizeHandler detects and finalizes incomplete sessions in the ledger. It generates missing artifacts: summary.md (via LLM), summary.json, and session.md (deterministic exports).
func NewSessionFinalizeHandler ¶
func NewSessionFinalizeHandler(logger *slog.Logger) *SessionFinalizeHandler
NewSessionFinalizeHandler creates a handler with the given logger.
func NewSessionFinalizeHandlerForTest ¶
func NewSessionFinalizeHandlerForTest(logger *slog.Logger) *SessionFinalizeHandler
NewSessionFinalizeHandlerForTest creates a handler with git and LFS operations disabled. Use in tests that don't have a real git repository or LFS server.
func (*SessionFinalizeHandler) BuildPrompt ¶
func (h *SessionFinalizeHandler) BuildPrompt(item *WorkItem) (RunRequest, error)
BuildPrompt reads the raw session and constructs a summarization prompt. For UploadOnly items, returns a SkipLLM request — no prompt needed.
func (*SessionFinalizeHandler) Cleanup ¶
func (h *SessionFinalizeHandler) Cleanup(ledgerPath string)
Cleanup performs lightweight filesystem housekeeping that doesn't require LLM processing: ghost session removal (dead PID + no data). Safe to run regardless of agent_worker.enabled. Does NOT clear .recording.json markers on sessions that have recoverable data — that's Detect()'s job when LLM processing follows.
Scans three locations where sessions can live:
- ledgerPath/.sageox/cache/sessions/ — primary ledger cache (StartRecording writes here)
- ledgerPath/sessions/ — git-tracked sessions (after finalization/upload)
- XDG cache paths — legacy and alternate cache locations
func (*SessionFinalizeHandler) Detect ¶
func (h *SessionFinalizeHandler) Detect(ledgerPath string) ([]*WorkItem, error)
Detect scans all locations where sessions can live for sessions missing artifacts. Sessions are initially written to ledgerPath/.sageox/cache/sessions/ (primary), and moved to ledgerPath/sessions/ after finalization/upload. XDG cache paths are also scanned for sessions written by older ox versions or different environments.
func (*SessionFinalizeHandler) DetectOrphanedForAgent ¶ added in v0.6.1
func (h *SessionFinalizeHandler) DetectOrphanedForAgent(ledgerPath, agentID string, heartbeatPID int) []*WorkItem
DetectOrphanedForAgent scans for recordings belonging to a specific agent and returns work items for immediate finalization. Unlike Detect(), this skips the normal stale-recording threshold and immediately considers any recording for the given agent as orphaned.
heartbeatPID is the PID from the heartbeat tracker. The function cross-checks it against the recording's ParentPID to avoid misclassifying live sessions whose heartbeat recorded a short-lived shell PID.
func (*SessionFinalizeHandler) ProcessResult ¶
func (h *SessionFinalizeHandler) ProcessResult(item *WorkItem, result *RunResult) error
ProcessResult writes the LLM output and generates deterministic artifacts.
NOTE: this method performs git add/commit/push from the daemon, which is an intentional exception to the Daemon-CLI Git Operations Split documented in CLAUDE.md. Session finalization writes to unique, timestamped session paths with near-zero conflict risk, and must run asynchronously (no CLI available). If this architecture is rejected, the alternative is an IPC endpoint that delegates writes back to the CLI.
Content files are uploaded to LFS (when projectRoot is set) before git commit. LFS upload is best-effort: on failure, content is committed as regular git blobs as a fallback to avoid losing the session.
func (*SessionFinalizeHandler) SetLedgerMu ¶ added in v0.6.0
func (h *SessionFinalizeHandler) SetLedgerMu(mu *sync.Mutex)
SetLedgerMu sets the shared ledger mutex for git operation serialization.
func (*SessionFinalizeHandler) SetPIDLookup ¶
func (h *SessionFinalizeHandler) SetPIDLookup(fn func(agentID string) int)
SetPIDLookup sets the function used to look up agent PIDs from daemon in-memory state. This enables PID-based liveness detection for recordings that predate the ParentPID field.
func (*SessionFinalizeHandler) SetProjectRoot ¶ added in v0.6.0
func (h *SessionFinalizeHandler) SetProjectRoot(root string)
SetProjectRoot sets the workspace root for endpoint/credential resolution. Required for LFS upload during session finalization.
func (*SessionFinalizeHandler) SetQualityThresholds ¶
func (h *SessionFinalizeHandler) SetQualityThresholds(upload, discard float64)
SetQualityThresholds configures the quality score thresholds for upload/discard.
func (*SessionFinalizeHandler) Type ¶
func (h *SessionFinalizeHandler) Type() string
Type implements WorkHandler.
type SessionFinalizePayload ¶
type SessionFinalizePayload struct {
SessionDir string `json:"session_dir"` // absolute path to session directory
RawPath string `json:"raw_path"` // path to raw.jsonl
Missing []string `json:"missing"` // which artifacts need generation
LedgerPath string `json:"ledger_path"` // ledger repo root (for git operations)
// UploadOnly skips LLM summarization. All artifacts already exist locally;
// the session just needs to be staged in ledger/sessions/ and pushed.
UploadOnly bool `json:"upload_only,omitempty"`
// contains filtered or unexported fields
}
SessionFinalizePayload carries per-item context through the pipeline.
type SessionWatcherManager ¶ added in v0.6.1
type SessionWatcherManager struct {
// contains filtered or unexported fields
}
SessionWatcherManager manages TailWatchers for active tail-mode recordings. It runs watchers that tail agent session files and write entries to raw.jsonl.
Two entry points:
- IPC (fast): CLI sends session_watch_start → StartWatch called directly
- Doctor (slow): DetectAndRestart scans for .recording.json files with WatchMode:"tail" that don't have an active watcher and restarts them (e.g., after a daemon restart)
Offset persistence: the watcher persists SourceOffset to .recording.json after each batch. On daemon restart, DetectAndRestart reads the persisted offset and does a catch-up read to recover entries written while the daemon was down.
func NewSessionWatcherManager ¶ added in v0.6.1
func NewSessionWatcherManager(logger *slog.Logger) *SessionWatcherManager
NewSessionWatcherManager creates a manager for tail-mode session watchers.
func (*SessionWatcherManager) ActiveSessions ¶ added in v0.6.1
func (m *SessionWatcherManager) ActiveSessions() []string
ActiveSessions returns the names of sessions currently being watched.
func (*SessionWatcherManager) Cleanup ¶ added in v0.6.1
func (m *SessionWatcherManager) Cleanup()
Cleanup stops watchers for sessions that have been stopped, whose .recording.json has been removed, or whose agent PID has died.
func (*SessionWatcherManager) DetectAndRestart ¶ added in v0.6.1
func (m *SessionWatcherManager) DetectAndRestart(ledgerPath string) int
DetectAndRestart scans for .recording.json files with WatchMode:"tail" that aren't currently being watched, and restarts their watchers. Called by the daemon's doctor interval to handle daemon restarts.
On restart, reads the persisted SourceOffset from .recording.json so the catch-up read recovers entries written while the daemon was down.
func (*SessionWatcherManager) StartWatch ¶ added in v0.6.1
func (m *SessionWatcherManager) StartWatch(
	sessionName, sessionFile, adapterName, ledgerPath, cachePath string,
) error
StartWatch begins tailing a session file and writing entries to raw.jsonl. Called directly from IPC handler (bypasses work queue for fast startup). Idempotent: if already watching this session, returns nil.
func (*SessionWatcherManager) StopAll ¶ added in v0.6.1
func (m *SessionWatcherManager) StopAll()
StopAll stops all active watchers and waits for goroutines to finish. Called during daemon shutdown.
func (*SessionWatcherManager) StopWatch ¶ added in v0.6.1
func (m *SessionWatcherManager) StopWatch(sessionName string)
StopWatch stops tailing a session.
type WorkHandler ¶
type WorkHandler interface {
// Type returns the work item type string this handler manages
// (e.g. "session-finalize").
Type() string
// Detect scans a ledger path and returns zero or more work items that
// need processing. Returning an empty slice is not an error.
Detect(ledgerPath string) ([]*WorkItem, error)
// BuildPrompt converts a work item into a RunRequest suitable for the
// Runner. The handler decides prompt text, working directory, token
// budget, and timeout.
BuildPrompt(item *WorkItem) (RunRequest, error)
// ProcessResult handles the agent's output after a successful run.
// Implementations typically persist derived artifacts back to the ledger.
ProcessResult(item *WorkItem, result *RunResult) error
}
WorkHandler defines how a specific work type is detected, prompted, and post-processed. Each handler is responsible for a single WorkItem.Type.
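A toy handler makes the three-phase contract concrete. The interface and supporting types are re-declared locally (trimmed) so the sketch is self-contained; `echoHandler` and its behavior are invented for illustration:

```go
package main

import "fmt"

// Local, trimmed copies of the package's shapes.
type WorkItem struct {
	Type    string
	Payload any
}
type RunRequest struct{ Prompt string }
type RunResult struct{ Output string }

type WorkHandler interface {
	Type() string
	Detect(ledgerPath string) ([]*WorkItem, error)
	BuildPrompt(item *WorkItem) (RunRequest, error)
	ProcessResult(item *WorkItem, result *RunResult) error
}

// echoHandler "detects" one fixed item per scan and records the agent
// output instead of persisting real artifacts.
type echoHandler struct{ last string }

func (h *echoHandler) Type() string { return "echo" }

func (h *echoHandler) Detect(ledgerPath string) ([]*WorkItem, error) {
	return []*WorkItem{{Type: "echo", Payload: ledgerPath}}, nil
}

func (h *echoHandler) BuildPrompt(item *WorkItem) (RunRequest, error) {
	return RunRequest{Prompt: fmt.Sprintf("summarize %v", item.Payload)}, nil
}

func (h *echoHandler) ProcessResult(item *WorkItem, result *RunResult) error {
	h.last = result.Output
	return nil
}

func main() {
	var h WorkHandler = &echoHandler{}
	items, _ := h.Detect("/tmp/ledger")
	req, _ := h.BuildPrompt(items[0])
	fmt.Println(h.Type(), req.Prompt) // echo summarize /tmp/ledger
}
```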
type WorkItem ¶
type WorkItem struct {
ID string // UUIDv7
Type string // e.g. "session-finalize"
Priority int // lower = higher priority
Payload any // type-specific data
CreatedAt time.Time
Attempts int
LastErr string
DedupKey string // only one item per dedup key at a time
// contains filtered or unexported fields
}
WorkItem represents a unit of background agent work.
type WorkQueue ¶
type WorkQueue struct {
// contains filtered or unexported fields
}
WorkQueue is a thread-safe, priority-ordered work queue with deduplication. It tracks both queued and in-progress items by dedup key so that the same logical work unit cannot be enqueued twice concurrently. The queue is capped at maxQueueDepth; excess items are rejected and re-detected on the next sync cycle.
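The dedup semantics above (one item per key across both the queued and in-progress sets, priority ordering, capped depth) can be sketched with a small self-contained type. Names and internals are illustrative, not the package's implementation:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type qItem struct {
	Priority int // lower = higher priority
	DedupKey string
}

// dedupQueue rejects duplicates by key, keeps the key blocked while an
// item is in progress, and rejects enqueues past maxDepth.
type dedupQueue struct {
	mu         sync.Mutex
	items      []*qItem
	queued     map[string]bool
	inProgress map[string]bool
	maxDepth   int
}

func newDedupQueue(maxDepth int) *dedupQueue {
	return &dedupQueue{queued: map[string]bool{}, inProgress: map[string]bool{}, maxDepth: maxDepth}
}

func (q *dedupQueue) Enqueue(it *qItem) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.queued[it.DedupKey] || q.inProgress[it.DedupKey] || len(q.items) >= q.maxDepth {
		return false // duplicate key or over capacity: rejected
	}
	q.items = append(q.items, it)
	q.queued[it.DedupKey] = true
	return true
}

func (q *dedupQueue) Dequeue() *qItem {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil
	}
	sort.SliceStable(q.items, func(i, j int) bool { return q.items[i].Priority < q.items[j].Priority })
	it := q.items[0]
	q.items = q.items[1:]
	delete(q.queued, it.DedupKey)
	q.inProgress[it.DedupKey] = true // key stays blocked until Complete
	return it
}

func (q *dedupQueue) Complete(dedupKey string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.inProgress, dedupKey)
}

func main() {
	q := newDedupQueue(8)
	fmt.Println(q.Enqueue(&qItem{Priority: 1, DedupKey: "s1"})) // true
	fmt.Println(q.Enqueue(&qItem{Priority: 0, DedupKey: "s1"})) // false: duplicate
	it := q.Dequeue()
	fmt.Println(q.Enqueue(&qItem{Priority: 1, DedupKey: it.DedupKey})) // false: in progress
	q.Complete(it.DedupKey)
	fmt.Println(q.Enqueue(&qItem{Priority: 1, DedupKey: "s1"})) // true again
}
```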
func NewWorkQueue ¶
NewWorkQueue creates a ready-to-use WorkQueue.
func (*WorkQueue) Complete ¶
func (q *WorkQueue) Complete(dedupKey string)
Complete marks a dedup key as no longer in-progress, allowing the same key to be enqueued again in the future.
func (*WorkQueue) Dequeue ¶
func (q *WorkQueue) Dequeue() *WorkItem
Dequeue removes and returns the highest-priority item, or nil if the queue is empty. The item's dedup key moves from the queued set to the in-progress set.
func (*WorkQueue) Enqueue ¶
func (q *WorkQueue) Enqueue(item *WorkItem) bool
Enqueue adds an item to the queue. It returns false without enqueuing if the dedup key is already present in the queue or in-progress set. The item's ID and CreatedAt are populated automatically if empty.
func (*WorkQueue) InProgress ¶
func (q *WorkQueue) InProgress(dedupKey string) bool
InProgress reports whether the given dedup key is currently being processed.
func (*WorkQueue) InProgressCount ¶ added in v0.6.1
func (q *WorkQueue) InProgressCount() int
InProgressCount returns the number of items currently being processed.