agentwork

package
v0.5.0
This package is not in the latest version of its module.
Published: Mar 15, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentProcess

type AgentProcess struct {
	PID       int           `json:"pid"`
	WorkType  string        `json:"work_type"`
	Target    string        `json:"target"`
	StartedAt time.Time     `json:"started_at"`
	Status    string        `json:"status"` // "running", "completed", "failed"
	Duration  time.Duration `json:"duration,omitempty"`
	Error     string        `json:"error,omitempty"`
	ExitCode  int           `json:"exit_code,omitempty"`
}

AgentProcess describes a single agent invocation (active or completed).
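As an illustration of how the `omitempty` tags shape the JSON form, the sketch below marshals a local copy of the struct (the package import path is not shown on this page, so the type is mirrored here). The PID, target, and error values are made up. Note that with `omitempty` a zero `ExitCode` and a zero `Duration` are dropped from the output, and `time.Duration` is encoded as an integer count of nanoseconds.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// AgentProcess is a local mirror of the documented struct so that this
// example compiles on its own.
type AgentProcess struct {
	PID       int           `json:"pid"`
	WorkType  string        `json:"work_type"`
	Target    string        `json:"target"`
	StartedAt time.Time     `json:"started_at"`
	Status    string        `json:"status"`
	Duration  time.Duration `json:"duration,omitempty"`
	Error     string        `json:"error,omitempty"`
	ExitCode  int           `json:"exit_code,omitempty"`
}

// encodeProcess marshals p to its JSON text form.
func encodeProcess(p AgentProcess) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	started := time.Date(2026, 3, 15, 12, 0, 0, 0, time.UTC)

	// A running process: duration, error, and exit_code are all omitted.
	running := AgentProcess{PID: 4242, WorkType: "session-finalize",
		Target: "example-session", StartedAt: started, Status: "running"}

	// A failed process: the optional fields are now present.
	failed := running
	failed.Status = "failed"
	failed.Duration = 3 * time.Second
	failed.Error = "example error text"
	failed.ExitCode = 1

	for _, p := range []AgentProcess{running, failed} {
		out, err := encodeProcess(p)
		if err != nil {
			panic(err)
		}
		fmt.Println(out)
	}
}
```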

type AgentWorkStats

type AgentWorkStats struct {
	TotalInvocations int `json:"total_invocations"`
	Successes        int `json:"successes"`
	Failures         int `json:"failures"`
}

AgentWorkStats tracks cumulative invocation metrics.

type AgentWorkStatus

type AgentWorkStatus struct {
	Enabled    bool           `json:"enabled"`
	AgentType  string         `json:"agent_type"`
	AgentAvail bool           `json:"agent_available"`
	QueueDepth int            `json:"queue_depth"`
	Active     []AgentProcess `json:"active,omitempty"`
	Recent     []AgentProcess `json:"recent,omitempty"`
	Stats      AgentWorkStats `json:"stats"`
}

AgentWorkStatus is the IPC-friendly snapshot of Manager state.
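A client on the other side of the IPC channel might decode the snapshot along these lines. This is a sketch only: the structs are local mirrors trimmed to a few fields (Active and Recent are left out for brevity), the payload is invented, and `"claude"` as an `agent_type` value is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types, trimmed for the example.
type AgentWorkStats struct {
	TotalInvocations int `json:"total_invocations"`
	Successes        int `json:"successes"`
	Failures         int `json:"failures"`
}

type AgentWorkStatus struct {
	Enabled    bool           `json:"enabled"`
	AgentType  string         `json:"agent_type"`
	AgentAvail bool           `json:"agent_available"`
	QueueDepth int            `json:"queue_depth"`
	Stats      AgentWorkStats `json:"stats"`
}

// decodeStatus parses a JSON status payload. Fields that are absent from
// the payload keep their zero values.
func decodeStatus(payload string) (AgentWorkStatus, error) {
	var s AgentWorkStatus
	err := json.Unmarshal([]byte(payload), &s)
	return s, err
}

func main() {
	// Hypothetical payload; the values are illustrative only.
	payload := `{"enabled":true,"agent_type":"claude","agent_available":true,` +
		`"queue_depth":2,"stats":{"total_invocations":5,"successes":4,"failures":1}}`

	s, err := decodeStatus(payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("enabled=%t queue=%d ok=%d failed=%d\n",
		s.Enabled, s.QueueDepth, s.Stats.Successes, s.Stats.Failures)
}
```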

type ClaudeRunner

type ClaudeRunner struct {
	// contains filtered or unexported fields
}

ClaudeRunner implements Runner using `claude --output-format stream-json`. It spawns Claude Code CLI in non-interactive mode. ClaudeRunner is safe for concurrent use — each Run() call is independent.

func NewClaudeRunner

func NewClaudeRunner(logger *slog.Logger) *ClaudeRunner

NewClaudeRunner creates a ClaudeRunner by resolving the `claude` binary. If the binary is not found, the runner is still created but Available() returns false.
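The "create anyway, report unavailable" behavior can be sketched with the standard library's `exec.LookPath`. Whether ClaudeRunner actually resolves the binary this way is an assumption, and `cliRunner` and `newCLIRunner` are hypothetical names. This sketch caches the lookup at construction time, whereas the real Available() may check the disk on each call.

```go
package main

import (
	"fmt"
	"os/exec"
)

// cliRunner is a hypothetical stand-in that remembers where a CLI binary
// was found, if it was found at all.
type cliRunner struct {
	binPath string // empty when the binary could not be resolved
}

// newCLIRunner always returns a usable value; a failed lookup only means
// that Available() will report false.
func newCLIRunner(name string) *cliRunner {
	path, err := exec.LookPath(name)
	if err != nil {
		return &cliRunner{}
	}
	return &cliRunner{binPath: path}
}

// Available reports whether the binary was resolved at construction time.
func (r *cliRunner) Available() bool { return r.binPath != "" }

func main() {
	r := newCLIRunner("claude")
	fmt.Println("claude available:", r.Available())
}
```

Callers can then gate work on Available() instead of handling a constructor error.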

func (*ClaudeRunner) Available

func (r *ClaudeRunner) Available() bool

Available reports whether the claude binary exists on disk.

func (*ClaudeRunner) Run

func (r *ClaudeRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)

Run executes a claude invocation with the given request.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates agent work items in the daemon.

func NewManager

func NewManager(
	runner Runner,
	logger *slog.Logger,
	configLoader func() *config.AgentWorkerConfig,
	syncSignal <-chan struct{},
	ledgerPath string,
) *Manager

NewManager creates a Manager wired to the given runner and config loader. syncSignal is a channel that fires after each ledger sync and prompts the main loop to process the existing queue; as described under Start, detection scans run only on the doctor ticker. ledgerPath is passed to handler Detect calls.

func (*Manager) Enqueue

func (m *Manager) Enqueue(item *WorkItem) bool

Enqueue adds a work item to the queue and signals the main loop to process it immediately, so externally enqueued items (e.g., from IPC) do not wait for syncSignal or doctorTicker. Safe for external callers.

func (*Manager) ForceDetect

func (m *Manager) ForceDetect() int

ForceDetect runs detection on all enabled handlers, bypassing the cooldown. Returns the total number of work items enqueued. Safe to call from IPC handlers. Always runs session cleanup (ghost removal) regardless of agent_worker.enabled.

func (*Manager) RegisterHandler

func (m *Manager) RegisterHandler(handler WorkHandler)

RegisterHandler adds a WorkHandler keyed by its Type().

func (*Manager) SetOnComplete

func (m *Manager) SetOnComplete(fn func(result WorkResult))

SetOnComplete sets a callback invoked after each work item completes.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context)

Start runs the main loop until ctx is canceled.

Two triggers drive work:

  • syncSignal: fires after a successful ledger pull. Only drains the existing queue — does NOT trigger detection scans (syncs are too frequent for doctor-style work).
  • doctorTicker: fires every doctorInterval (5m). Runs detect+process. This is the sole trigger for detection scans, catching both newly-synced incomplete sessions and time-based conditions (e.g., stale recordings with dead parent PIDs).
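The two triggers above can be sketched as a `select` loop. This is a reduction under stated assumptions, not the Manager's actual code: `runLoop`, `drain`, and `detect` are hypothetical names, and in a real daemon the tick channel would come from something like `time.NewTicker(5 * time.Minute).C`. The simulation uses unbuffered channels so each event is fully handled before the next one is delivered.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop reacts to the two documented triggers until ctx is canceled.
func runLoop(ctx context.Context, syncSignal <-chan struct{},
	doctorTick <-chan time.Time, drain, detect func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-syncSignal:
			drain() // sync: process what is already queued, no detection scan
		case <-doctorTick:
			detect() // doctor tick: scan for new work first
			drain()  // then process whatever the scan enqueued
		}
	}
}

// simulate delivers one sync signal and one doctor tick, then cancels,
// and returns the order in which the callbacks ran.
func simulate() []string {
	var log []string
	syncSignal := make(chan struct{})
	doctorTick := make(chan time.Time)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})

	go func() {
		defer close(done)
		runLoop(ctx, syncSignal, doctorTick,
			func() { log = append(log, "drain") },
			func() { log = append(log, "detect") })
	}()

	syncSignal <- struct{}{}
	doctorTick <- time.Now()
	cancel()
	<-done // the loop has exited, so reading log is safe
	return log
}

func main() {
	fmt.Println(simulate()) // [drain detect drain]
}
```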

func (*Manager) Status

func (m *Manager) Status() AgentWorkStatus

Status returns a point-in-time snapshot of the manager for IPC status responses.

type MockRunner

type MockRunner struct {
	RunFunc func(ctx context.Context, req RunRequest) (*RunResult, error)
	// contains filtered or unexported fields
}

MockRunner is a test double for the Runner interface.

func NewMockRunner

func NewMockRunner(available bool) *MockRunner

NewMockRunner creates a MockRunner whose Available() reports the given available value.

func (*MockRunner) Available

func (m *MockRunner) Available() bool

func (*MockRunner) Run

func (m *MockRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error)
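A function-field test double of this shape can be sketched as follows. `fakeRunner` is a hypothetical local stand-in, not MockRunner itself: the call counter, the behavior when `RunFunc` is nil, and the helper names are all assumptions, and the counter is not synchronized for concurrent use.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local mirrors of the documented request and result types.
type RunRequest struct {
	Prompt          string
	WorkDir         string
	TimeoutOverride time.Duration
}

type RunResult struct {
	Output   string
	ExitCode int
}

// fakeRunner delegates Run to RunFunc when it is set.
type fakeRunner struct {
	RunFunc   func(ctx context.Context, req RunRequest) (*RunResult, error)
	available bool
	calls     int
}

func (f *fakeRunner) Available() bool { return f.available }

func (f *fakeRunner) Run(ctx context.Context, req RunRequest) (*RunResult, error) {
	f.calls++
	if f.RunFunc == nil {
		// Assumption: with no RunFunc, succeed with an empty result.
		return &RunResult{}, nil
	}
	return f.RunFunc(ctx, req)
}

// newFailingRunner returns a runner whose every Run call fails with msg.
func newFailingRunner(msg string) *fakeRunner {
	return &fakeRunner{
		available: true,
		RunFunc: func(context.Context, RunRequest) (*RunResult, error) {
			return nil, errors.New(msg)
		},
	}
}

// runPrompt is a small helper that runs one prompt with a background context.
func runPrompt(r *fakeRunner, prompt string) (*RunResult, error) {
	return r.Run(context.Background(), RunRequest{Prompt: prompt})
}

func main() {
	r := newFailingRunner("simulated agent failure")
	_, err := runPrompt(r, "summarize")
	fmt.Println(r.Available(), r.calls, err) // true 1 simulated agent failure
}
```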

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements a simple token-bucket rate limiter that refills at the start of each window. It is safe for concurrent use.

func NewRateLimiter

func NewRateLimiter(maxTokens int, window time.Duration) *RateLimiter

NewRateLimiter creates a rate limiter that allows maxTokens calls per window.

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

Allow consumes one token and returns true, or returns false if the bucket is exhausted for the current window.

func (*RateLimiter) Remaining

func (r *RateLimiter) Remaining() int

Remaining returns how many tokens are left in the current window. The window resets automatically if it has expired.
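The window-reset behavior described for Allow and Remaining can be illustrated with a small fixed-window limiter. This is a sketch, not the package's implementation: `windowLimiter` is a hypothetical type, and the injectable clock exists only so the example is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// windowLimiter allows maxTokens calls per window and refills in full
// once the current window has expired.
type windowLimiter struct {
	mu        sync.Mutex
	maxTokens int
	window    time.Duration
	remaining int
	windowEnd time.Time
	now       func() time.Time // injectable clock (an assumption, for testing)
}

func newWindowLimiter(maxTokens int, window time.Duration, now func() time.Time) *windowLimiter {
	return &windowLimiter{
		maxTokens: maxTokens,
		window:    window,
		remaining: maxTokens,
		windowEnd: now().Add(window),
		now:       now,
	}
}

// refillLocked starts a new window if the current one has expired.
// The caller must hold l.mu.
func (l *windowLimiter) refillLocked() {
	if t := l.now(); !t.Before(l.windowEnd) {
		l.remaining = l.maxTokens
		l.windowEnd = t.Add(l.window)
	}
}

// Allow consumes one token if any are left in the current window.
func (l *windowLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.refillLocked()
	if l.remaining <= 0 {
		return false
	}
	l.remaining--
	return true
}

// Remaining reports the tokens left, resetting the window first if needed.
func (l *windowLimiter) Remaining() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.refillLocked()
	return l.remaining
}

// demo makes three calls in one window, then one call a minute later.
func demo() []bool {
	clock := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	l := newWindowLimiter(2, time.Minute, func() time.Time { return clock })
	got := []bool{l.Allow(), l.Allow(), l.Allow()}
	clock = clock.Add(time.Minute) // move past the end of the window
	return append(got, l.Allow())
}

func main() {
	fmt.Println(demo()) // [true true false true]
}
```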

type RunRequest

type RunRequest struct {
	Prompt          string
	WorkDir         string
	TimeoutOverride time.Duration
}

RunRequest describes a single agent invocation.

type RunResult

type RunResult struct {
	Output    string
	Duration  time.Duration
	ExitCode  int
	TokensIn  int // input tokens (from structured output if available)
	TokensOut int // output tokens
}

RunResult captures the outcome of an agent invocation.

type Runner

type Runner interface {
	// Run executes an agent with the given request and returns the result.
	Run(ctx context.Context, req RunRequest) (*RunResult, error)
	// Available reports whether the runner's backing agent CLI is installed and reachable.
	Available() bool
}

Runner spawns and manages agent CLI processes.

type SessionFinalizeHandler

type SessionFinalizeHandler struct {
	// contains filtered or unexported fields
}

SessionFinalizeHandler detects and finalizes incomplete sessions in the ledger. It generates missing artifacts: summary.md (via LLM), summary.json, session.html, and session.md (deterministic exports).

func NewSessionFinalizeHandler

func NewSessionFinalizeHandler(logger *slog.Logger) *SessionFinalizeHandler

NewSessionFinalizeHandler creates a handler with the given logger.

func NewSessionFinalizeHandlerForTest

func NewSessionFinalizeHandlerForTest(logger *slog.Logger) *SessionFinalizeHandler

NewSessionFinalizeHandlerForTest creates a handler with git operations disabled. Use in tests that don't have a real git repository.

func (*SessionFinalizeHandler) BuildPrompt

func (h *SessionFinalizeHandler) BuildPrompt(item *WorkItem) (RunRequest, error)

BuildPrompt reads the raw session and constructs a summarization prompt.

func (*SessionFinalizeHandler) Cleanup

func (h *SessionFinalizeHandler) Cleanup(ledgerPath string)

Cleanup performs lightweight filesystem housekeeping that doesn't require LLM processing: ghost session removal (dead PID + no data). Safe to run regardless of agent_worker.enabled. Does NOT clear .recording.json markers on sessions that have recoverable data — that's Detect()'s job when LLM processing follows.

Cleans both the ledger sessions dir and the session cache dir (where ox session list reads from). The cache dir is derived from the repoID in the ledger path.

func (*SessionFinalizeHandler) Detect

func (h *SessionFinalizeHandler) Detect(ledgerPath string) ([]*WorkItem, error)

Detect scans both the ledger and session cache for sessions missing artifacts. The cache dir is where sessions are initially recorded; the ledger is where they are pushed after finalization. Both must be scanned to catch orphans.
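The "missing artifacts" check for a single session directory might look like the sketch below. The four file names come from the SessionFinalizeHandler description; everything else (`missingArtifacts`, the flat directory layout, the temporary directory in `demo`) is an assumption, and the real Detect also walks the ledger and cache trees and consults recording markers.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// artifacts lists the files a finalized session is documented to contain.
var artifacts = []string{"summary.md", "summary.json", "session.html", "session.md"}

// missingArtifacts returns the artifact names that do not exist in sessionDir.
func missingArtifacts(sessionDir string) []string {
	var missing []string
	for _, name := range artifacts {
		_, err := os.Stat(filepath.Join(sessionDir, name))
		if errors.Is(err, os.ErrNotExist) {
			missing = append(missing, name)
		}
	}
	return missing
}

// demo builds a throwaway session directory containing only summary.md
// and reports what is still missing.
func demo() ([]string, error) {
	dir, err := os.MkdirTemp("", "session-example-")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	summary := filepath.Join(dir, "summary.md")
	if err := os.WriteFile(summary, []byte("# Summary\n"), 0o644); err != nil {
		return nil, err
	}
	return missingArtifacts(dir), nil
}

func main() {
	missing, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(missing) // [summary.json session.html session.md]
}
```

A list like this is what the Missing field of SessionFinalizePayload would carry for each detected session.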

func (*SessionFinalizeHandler) ProcessResult

func (h *SessionFinalizeHandler) ProcessResult(item *WorkItem, result *RunResult) error

ProcessResult writes the LLM output and generates deterministic artifacts.

NOTE: this method performs git add/commit/push from the daemon, which is an intentional exception to the Daemon-CLI Git Operations Split documented in CLAUDE.md. Session finalization writes to unique, timestamped session paths with near-zero conflict risk, and must run asynchronously (no CLI available). If this architecture is rejected, the alternative is an IPC endpoint that delegates writes back to the CLI.

When triggered via async session upload (SAGEOX_ASYNC_SESSION_UPLOAD=1), content files are committed as regular files (not LFS pointers). This is acceptable for the initial rollout; LFS upload will be added to this handler as a follow-up to avoid committing large blobs to git.

func (*SessionFinalizeHandler) SetPIDLookup

func (h *SessionFinalizeHandler) SetPIDLookup(fn func(agentID string) int)

SetPIDLookup sets the function used to look up agent PIDs from daemon in-memory state. This enables PID-based liveness detection for recordings that predate the ParentPID field.

func (*SessionFinalizeHandler) SetQualityThresholds

func (h *SessionFinalizeHandler) SetQualityThresholds(upload, discard float64)

SetQualityThresholds configures the quality score thresholds for upload/discard.

func (*SessionFinalizeHandler) Type

func (h *SessionFinalizeHandler) Type() string

Type implements WorkHandler.

type SessionFinalizePayload

type SessionFinalizePayload struct {
	SessionDir string   `json:"session_dir"` // absolute path to session directory
	RawPath    string   `json:"raw_path"`    // path to raw.jsonl
	Missing    []string `json:"missing"`     // which artifacts need generation
	LedgerPath string   `json:"ledger_path"` // ledger repo root (for git operations)
	// contains filtered or unexported fields
}

SessionFinalizePayload carries per-item context through the pipeline.

type WorkHandler

type WorkHandler interface {
	// Type returns the work item type string this handler manages
	// (e.g. "session-finalize").
	Type() string

	// Detect scans a ledger path and returns zero or more work items that
	// need processing. Returning an empty slice is not an error.
	Detect(ledgerPath string) ([]*WorkItem, error)

	// BuildPrompt converts a work item into a RunRequest suitable for the
	// Runner. The handler decides prompt text, working directory, token
	// budget, and timeout.
	BuildPrompt(item *WorkItem) (RunRequest, error)

	// ProcessResult handles the agent's output after a successful run.
	// Implementations typically persist derived artifacts back to the ledger.
	ProcessResult(item *WorkItem, result *RunResult) error
}

WorkHandler defines how a specific work type is detected, prompted, and post-processed. Each handler is responsible for a single WorkItem.Type.
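To make the contract concrete, here is a minimal handler sketch together with one detect, prompt, run, process cycle. The types are local mirrors trimmed to the fields used; `exampleHandler`, the `"example-task"` type string, and `runOnce` are hypothetical, and the stub agent simply echoes the prompt instead of going through a Runner.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local mirrors of the documented types, trimmed to the fields used here.
type WorkItem struct {
	Type     string
	Priority int
	Payload  any
	DedupKey string
}

type RunRequest struct {
	Prompt          string
	WorkDir         string
	TimeoutOverride time.Duration
}

type RunResult struct {
	Output   string
	ExitCode int
}

type WorkHandler interface {
	Type() string
	Detect(ledgerPath string) ([]*WorkItem, error)
	BuildPrompt(item *WorkItem) (RunRequest, error)
	ProcessResult(item *WorkItem, result *RunResult) error
}

// exampleHandler is hypothetical; "example-task" is not a real work type.
type exampleHandler struct {
	processed []string
}

func (h *exampleHandler) Type() string { return "example-task" }

// Detect emits one item per ledger path, keyed so duplicates can be rejected.
func (h *exampleHandler) Detect(ledgerPath string) ([]*WorkItem, error) {
	if ledgerPath == "" {
		return nil, errors.New("empty ledger path")
	}
	key := h.Type() + ":" + ledgerPath
	return []*WorkItem{{Type: h.Type(), Priority: 10, Payload: ledgerPath, DedupKey: key}}, nil
}

// BuildPrompt recovers the typed payload and decides prompt, dir, and timeout.
func (h *exampleHandler) BuildPrompt(item *WorkItem) (RunRequest, error) {
	dir, ok := item.Payload.(string)
	if !ok {
		return RunRequest{}, fmt.Errorf("unexpected payload type %T", item.Payload)
	}
	return RunRequest{Prompt: "Summarize this directory.", WorkDir: dir,
		TimeoutOverride: 2 * time.Minute}, nil
}

// ProcessResult records the output; a real handler would persist artifacts.
func (h *exampleHandler) ProcessResult(item *WorkItem, result *RunResult) error {
	h.processed = append(h.processed, item.DedupKey+" => "+result.Output)
	return nil
}

// runOnce walks one detect, prompt, run, process cycle with a stub agent.
func runOnce(h WorkHandler, ledgerPath string) (int, error) {
	items, err := h.Detect(ledgerPath)
	if err != nil {
		return 0, err
	}
	for _, item := range items {
		req, err := h.BuildPrompt(item)
		if err != nil {
			return 0, err
		}
		result := &RunResult{Output: "echo: " + req.Prompt} // stub agent
		if err := h.ProcessResult(item, result); err != nil {
			return 0, err
		}
	}
	return len(items), nil
}

func main() {
	h := &exampleHandler{}
	n, err := runOnce(h, "/tmp/example-ledger")
	fmt.Println(n, err, h.processed)
}
```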

type WorkItem

type WorkItem struct {
	ID        string // UUIDv7
	Type      string // e.g. "session-finalize"
	Priority  int    // lower = higher priority
	Payload   any    // type-specific data
	CreatedAt time.Time
	Attempts  int
	LastErr   string
	DedupKey  string // only one item per dedup key at a time
	// contains filtered or unexported fields
}

WorkItem represents a unit of background agent work.

type WorkQueue

type WorkQueue struct {
	// contains filtered or unexported fields
}

WorkQueue is a thread-safe, priority-ordered work queue with deduplication. It tracks both queued and in-progress items by dedup key so that the same logical work unit cannot be enqueued twice concurrently. The queue is capped at maxQueueDepth; excess items are rejected and re-detected on the next detection cycle.

func NewWorkQueue

func NewWorkQueue(logger *slog.Logger) *WorkQueue

NewWorkQueue creates a ready-to-use WorkQueue.

func (*WorkQueue) Complete

func (q *WorkQueue) Complete(dedupKey string)

Complete marks a dedup key as no longer in-progress, allowing the same key to be enqueued again in the future.

func (*WorkQueue) Dequeue

func (q *WorkQueue) Dequeue() *WorkItem

Dequeue removes and returns the highest-priority item, or nil if the queue is empty. The item's dedup key moves from the queued set to the in-progress set.

func (*WorkQueue) Enqueue

func (q *WorkQueue) Enqueue(item *WorkItem) bool

Enqueue adds an item to the queue. It returns false without enqueuing if the dedup key is already present in the queue or in-progress set. The item's ID and CreatedAt are populated automatically if empty.

func (*WorkQueue) InProgress

func (q *WorkQueue) InProgress(dedupKey string) bool

InProgress reports whether the given dedup key is currently being processed.

func (*WorkQueue) Len

func (q *WorkQueue) Len() int

Len returns the number of items waiting in the queue (excludes in-progress).

func (*WorkQueue) Requeue

func (q *WorkQueue) Requeue(item *WorkItem) bool

Requeue atomically moves a dedup key from in-progress back to queued. This prevents a race where a concurrent detectAndEnqueue could see the key absent from both sets and enqueue a duplicate.
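The dedup-key lifecycle across Enqueue, Dequeue, Complete, and Requeue can be sketched with two sets guarded by one mutex. This is an illustration under assumptions, not the package's code: `dedupQueue` and `item` are hypothetical, the maxQueueDepth cap is left out because its value is not documented here, and returning false from Requeue when the key is not in progress is a guess at the meaning of its bool result.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type item struct {
	Priority int // lower = higher priority
	DedupKey string
}

// dedupQueue tracks queued and in-progress keys under a single lock.
type dedupQueue struct {
	mu         sync.Mutex
	items      []*item
	queued     map[string]bool
	inProgress map[string]bool
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{queued: map[string]bool{}, inProgress: map[string]bool{}}
}

// insertLocked adds it in priority order. The caller must hold q.mu.
func (q *dedupQueue) insertLocked(it *item) {
	q.items = append(q.items, it)
	sort.SliceStable(q.items, func(i, j int) bool {
		return q.items[i].Priority < q.items[j].Priority
	})
	q.queued[it.DedupKey] = true
}

// Enqueue rejects keys that are already queued or being processed.
func (q *dedupQueue) Enqueue(it *item) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.queued[it.DedupKey] || q.inProgress[it.DedupKey] {
		return false
	}
	q.insertLocked(it)
	return true
}

// Dequeue moves the best item's key from queued to in-progress.
func (q *dedupQueue) Dequeue() *item {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil
	}
	it := q.items[0]
	q.items = q.items[1:]
	delete(q.queued, it.DedupKey)
	q.inProgress[it.DedupKey] = true
	return it
}

// Complete frees the key so it can be enqueued again later.
func (q *dedupQueue) Complete(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.inProgress, key)
}

// Requeue moves a key from in-progress back to queued in one critical
// section, so no concurrent Enqueue can observe it absent from both sets.
func (q *dedupQueue) Requeue(it *item) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.inProgress[it.DedupKey] {
		return false
	}
	delete(q.inProgress, it.DedupKey)
	q.insertLocked(it)
	return true
}

func main() {
	q := newDedupQueue()
	fmt.Println(q.Enqueue(&item{Priority: 5, DedupKey: "a"})) // true
	fmt.Println(q.Enqueue(&item{Priority: 1, DedupKey: "b"})) // true
	fmt.Println(q.Enqueue(&item{Priority: 9, DedupKey: "a"})) // false: already queued
	first := q.Dequeue()
	fmt.Println(first.DedupKey)                               // b
	fmt.Println(q.Enqueue(&item{Priority: 1, DedupKey: "b"})) // false: in progress
	fmt.Println(q.Requeue(first))                             // true
}
```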

type WorkResult

type WorkResult struct {
	Item     *WorkItem
	Success  bool
	Output   string
	Duration time.Duration
	Error    string
}

WorkResult captures the outcome of processing a work item.
