Documentation
¶
Index ¶
- type AgentProcess
- type AgentWorkStats
- type AgentWorkStatus
- type ClaudeRunner
- type Manager
- type MockRunner
- type RateLimiter
- type RunRequest
- type RunResult
- type Runner
- type SessionFinalizeHandler
- func (h *SessionFinalizeHandler) BuildPrompt(item *WorkItem) (RunRequest, error)
- func (h *SessionFinalizeHandler) Cleanup(ledgerPath string)
- func (h *SessionFinalizeHandler) Detect(ledgerPath string) ([]*WorkItem, error)
- func (h *SessionFinalizeHandler) ProcessResult(item *WorkItem, result *RunResult) error
- func (h *SessionFinalizeHandler) SetPIDLookup(fn func(agentID string) int)
- func (h *SessionFinalizeHandler) SetQualityThresholds(upload, discard float64)
- func (h *SessionFinalizeHandler) Type() string
- type SessionFinalizePayload
- type WorkHandler
- type WorkItem
- type WorkQueue
- type WorkResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentProcess ¶
type AgentProcess struct {
PID int `json:"pid"`
WorkType string `json:"work_type"`
Target string `json:"target"`
StartedAt time.Time `json:"started_at"`
Status string `json:"status"` // "running", "completed", "failed"
Duration time.Duration `json:"duration,omitempty"`
Error string `json:"error,omitempty"`
ExitCode int `json:"exit_code,omitempty"`
}
AgentProcess describes a single agent invocation (active or completed).
type AgentWorkStats ¶
type AgentWorkStats struct {
TotalInvocations int `json:"total_invocations"`
Successes int `json:"successes"`
Failures int `json:"failures"`
}
AgentWorkStats tracks cumulative invocation metrics.
type AgentWorkStatus ¶
type AgentWorkStatus struct {
Enabled bool `json:"enabled"`
AgentType string `json:"agent_type"`
AgentAvail bool `json:"agent_available"`
QueueDepth int `json:"queue_depth"`
Active []AgentProcess `json:"active,omitempty"`
Recent []AgentProcess `json:"recent,omitempty"`
Stats AgentWorkStats `json:"stats"`
}
AgentWorkStatus is the IPC-friendly snapshot of Manager state.
type ClaudeRunner ¶
type ClaudeRunner struct {
// contains filtered or unexported fields
}
ClaudeRunner implements Runner using `claude --output-format stream-json`. It spawns Claude Code CLI in non-interactive mode. ClaudeRunner is safe for concurrent use — each Run() call is independent.
func NewClaudeRunner ¶
func NewClaudeRunner(logger *slog.Logger) *ClaudeRunner
NewClaudeRunner creates a ClaudeRunner by resolving the `claude` binary. If the binary is not found, the runner is still created but Available() returns false.
func (*ClaudeRunner) Available ¶
func (r *ClaudeRunner) Available() bool
Available reports whether the claude binary exists on disk.
func (*ClaudeRunner) Run ¶
func (r *ClaudeRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)
Run executes a claude invocation with the given request.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates agent work items in the daemon.
func NewManager ¶
func NewManager( runner Runner, logger *slog.Logger, configLoader func() *config.AgentWorkerConfig, syncSignal <-chan struct{}, ledgerPath string, ) *Manager
NewManager creates a Manager wired to the given runner and config loader. syncSignal is a channel that fires after each ledger sync to trigger immediate detection and processing. ledgerPath is passed to handler Detect calls.
func (*Manager) Enqueue ¶
Enqueue adds a work item to the queue. Safe for external callers (e.g., IPC). Signals the main loop to process immediately so externally-enqueued items (e.g., from IPC) don't wait for syncSignal or doctorTicker.
func (*Manager) ForceDetect ¶
ForceDetect runs detection on all enabled handlers, bypassing the cooldown. Returns the total number of work items enqueued. Safe to call from IPC handlers. Always runs session cleanup (ghost removal) regardless of agent_worker.enabled.
func (*Manager) RegisterHandler ¶
func (m *Manager) RegisterHandler(handler WorkHandler)
RegisterHandler adds a WorkHandler keyed by its Type().
func (*Manager) SetOnComplete ¶
func (m *Manager) SetOnComplete(fn func(result WorkResult))
SetOnComplete sets a callback invoked after each work item completes.
func (*Manager) Start ¶
Start runs the main loop until ctx is canceled.
Two triggers drive work:
- syncSignal: fires after a successful ledger pull. Only drains the existing queue — does NOT trigger detection scans (syncs are too frequent for doctor-style work).
- doctorTicker: fires every doctorInterval (5m). Runs detect+process. This is the sole trigger for detection scans, catching both newly-synced incomplete sessions and time-based conditions (e.g., stale recordings with dead parent PIDs).
func (*Manager) Status ¶
func (m *Manager) Status() AgentWorkStatus
Status returns a point-in-time snapshot of the manager for IPC status responses.
type MockRunner ¶
type MockRunner struct {
RunFunc func(ctx context.Context, req RunRequest) (*RunResult, error)
// contains filtered or unexported fields
}
MockRunner is a test double for the Runner interface.
func NewMockRunner ¶
func NewMockRunner(available bool) *MockRunner
NewMockRunner creates a MockRunner that reports itself as available.
func (*MockRunner) Available ¶
func (m *MockRunner) Available() bool
func (*MockRunner) Run ¶
func (m *MockRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements a simple token-bucket rate limiter that refills at the start of each window. It is safe for concurrent use.
func NewRateLimiter ¶
func NewRateLimiter(maxTokens int, window time.Duration) *RateLimiter
NewRateLimiter creates a rate limiter that allows maxTokens calls per window.
func (*RateLimiter) Allow ¶
func (r *RateLimiter) Allow() bool
Allow consumes one token and returns true, or returns false if the bucket is exhausted for the current window.
func (*RateLimiter) Remaining ¶
func (r *RateLimiter) Remaining() int
Remaining returns how many tokens are left in the current window. The window resets automatically if it has expired.
type RunRequest ¶
RunRequest describes a single agent invocation.
type RunResult ¶
type RunResult struct {
Output string
Duration time.Duration
ExitCode int
TokensIn int // input tokens (from structured output if available)
TokensOut int // output tokens
}
RunResult captures the outcome of an agent invocation.
type Runner ¶
type Runner interface {
// Run executes an agent with the given request and returns the result.
Run(ctx context.Context, req RunRequest) (*RunResult, error)
// Available reports whether the runner's backing agent CLI is installed and reachable.
Available() bool
}
Runner spawns and manages agent CLI processes.
type SessionFinalizeHandler ¶
type SessionFinalizeHandler struct {
// contains filtered or unexported fields
}
SessionFinalizeHandler detects and finalizes incomplete sessions in the ledger. It generates missing artifacts: summary.md (via LLM), summary.json, session.html, and session.md (deterministic exports).
func NewSessionFinalizeHandler ¶
func NewSessionFinalizeHandler(logger *slog.Logger) *SessionFinalizeHandler
NewSessionFinalizeHandler creates a handler with the given logger.
func NewSessionFinalizeHandlerForTest ¶
func NewSessionFinalizeHandlerForTest(logger *slog.Logger) *SessionFinalizeHandler
NewSessionFinalizeHandlerForTest creates a handler with git operations disabled. Use in tests that don't have a real git repository.
func (*SessionFinalizeHandler) BuildPrompt ¶
func (h *SessionFinalizeHandler) BuildPrompt(item *WorkItem) (RunRequest, error)
BuildPrompt reads the raw session and constructs a summarization prompt.
func (*SessionFinalizeHandler) Cleanup ¶
func (h *SessionFinalizeHandler) Cleanup(ledgerPath string)
Cleanup performs lightweight filesystem housekeeping that doesn't require LLM processing: ghost session removal (dead PID + no data). Safe to run regardless of agent_worker.enabled. Does NOT clear .recording.json markers on sessions that have recoverable data — that's Detect()'s job when LLM processing follows.
Cleans both the ledger sessions dir and the session cache dir (where ox session list reads from). The cache dir is derived from the repoID in the ledger path.
func (*SessionFinalizeHandler) Detect ¶
func (h *SessionFinalizeHandler) Detect(ledgerPath string) ([]*WorkItem, error)
Detect scans both the ledger and session cache for sessions missing artifacts. The cache dir is where sessions are initially recorded; the ledger is where they are pushed after finalization. Both must be scanned to catch orphans.
func (*SessionFinalizeHandler) ProcessResult ¶
func (h *SessionFinalizeHandler) ProcessResult(item *WorkItem, result *RunResult) error
ProcessResult writes the LLM output and generates deterministic artifacts.
NOTE: this method performs git add/commit/push from the daemon, which is an intentional exception to the Daemon-CLI Git Operations Split documented in CLAUDE.md. Session finalization writes to unique, timestamped session paths with near-zero conflict risk, and must run asynchronously (no CLI available). If this architecture is rejected, the alternative is an IPC endpoint that delegates writes back to the CLI.
When triggered via async session upload (SAGEOX_ASYNC_SESSION_UPLOAD=1), content files are committed as regular files (not LFS pointers). This is acceptable for the initial rollout; LFS upload will be added to this handler as a follow-up to avoid committing large blobs to git.
func (*SessionFinalizeHandler) SetPIDLookup ¶
func (h *SessionFinalizeHandler) SetPIDLookup(fn func(agentID string) int)
SetPIDLookup sets the function used to look up agent PIDs from daemon in-memory state. This enables PID-based liveness detection for recordings that predate the ParentPID field.
func (*SessionFinalizeHandler) SetQualityThresholds ¶
func (h *SessionFinalizeHandler) SetQualityThresholds(upload, discard float64)
SetQualityThresholds configures the quality score thresholds for upload/discard.
func (*SessionFinalizeHandler) Type ¶
func (h *SessionFinalizeHandler) Type() string
Type implements WorkHandler.
type SessionFinalizePayload ¶
type SessionFinalizePayload struct {
SessionDir string `json:"session_dir"` // absolute path to session directory
RawPath string `json:"raw_path"` // path to raw.jsonl
Missing []string `json:"missing"` // which artifacts need generation
LedgerPath string `json:"ledger_path"` // ledger repo root (for git operations)
// contains filtered or unexported fields
}
SessionFinalizePayload carries per-item context through the pipeline.
type WorkHandler ¶
type WorkHandler interface {
// Type returns the work item type string this handler manages
// (e.g. "session-finalize").
Type() string
// Detect scans a ledger path and returns zero or more work items that
// need processing. Returning an empty slice is not an error.
Detect(ledgerPath string) ([]*WorkItem, error)
// BuildPrompt converts a work item into a RunRequest suitable for the
// Runner. The handler decides prompt text, working directory, token
// budget, and timeout.
BuildPrompt(item *WorkItem) (RunRequest, error)
// ProcessResult handles the agent's output after a successful run.
// Implementations typically persist derived artifacts back to the ledger.
ProcessResult(item *WorkItem, result *RunResult) error
}
WorkHandler defines how a specific work type is detected, prompted, and post-processed. Each handler is responsible for a single WorkItem.Type.
type WorkItem ¶
type WorkItem struct {
ID string // UUIDv7
Type string // e.g. "session-finalize"
Priority int // lower = higher priority
Payload any // type-specific data
CreatedAt time.Time
Attempts int
LastErr string
DedupKey string // only one item per dedup key at a time
// contains filtered or unexported fields
}
WorkItem represents a unit of background agent work.
type WorkQueue ¶
type WorkQueue struct {
// contains filtered or unexported fields
}
WorkQueue is a thread-safe, priority-ordered work queue with deduplication. It tracks both queued and in-progress items by dedup key so that the same logical work unit cannot be enqueued twice concurrently. The queue is capped at maxQueueDepth; excess items are rejected and re-detected on the next sync cycle.
func NewWorkQueue ¶
NewWorkQueue creates a ready-to-use WorkQueue.
func (*WorkQueue) Complete ¶
Complete marks a dedup key as no longer in-progress, allowing the same key to be enqueued again in the future.
func (*WorkQueue) Dequeue ¶
Dequeue removes and returns the highest-priority item, or nil if the queue is empty. The item's dedup key moves from the queued set to the in-progress set.
func (*WorkQueue) Enqueue ¶
Enqueue adds an item to the queue. It returns false without enqueuing if the dedup key is already present in the queue or in-progress set. The item's ID and CreatedAt are populated automatically if empty.
func (*WorkQueue) InProgress ¶
InProgress reports whether the given dedup key is currently being processed.