daemon

package
v0.20.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultPollInterval = 2 * time.Second

DefaultPollInterval is the default polling interval for WaitForReview. Tests can override this to speed up polling-based tests.

Functions

func CleanupZombieDaemons added in v0.16.3

func CleanupZombieDaemons() int

CleanupZombieDaemons finds and kills all unresponsive daemons. Returns the number of zombies cleaned up.

func DefaultErrorLogPath

func DefaultErrorLogPath() string

DefaultErrorLogPath returns the default path for the error log

func FindAvailablePort

func FindAvailablePort(startAddr string) (string, int, error)

FindAvailablePort finds an available port starting from the configured port. After zombie cleanup, this should usually succeed on the first try. Falls back to searching if the port is still in use (e.g., by another service).

func IsDaemonAlive added in v0.16.3

func IsDaemonAlive(addr string) bool

IsDaemonAlive checks if a daemon at the given address is actually responding. This is more reliable than checking PID and works cross-platform. Only allows loopback addresses to prevent SSRF via malicious runtime files. Uses retry logic to avoid misclassifying a slow or transiently failing daemon.

func KillDaemon added in v0.16.3

func KillDaemon(info *RuntimeInfo) bool

KillDaemon attempts to gracefully shut down a daemon, then force kill if needed. Returns true if the daemon was killed or is no longer running. Only removes runtime file if the daemon is confirmed dead.

func LegacyRuntimePath added in v0.16.3

func LegacyRuntimePath() string

LegacyRuntimePath returns the old daemon.json path for migration

func RemoveRuntime

func RemoveRuntime()

RemoveRuntime removes the runtime info file for the current process

func RemoveRuntimeForPID added in v0.16.3

func RemoveRuntimeForPID(pid int)

RemoveRuntimeForPID removes the runtime info file for a specific PID

func RuntimePath

func RuntimePath() string

RuntimePath returns the path to the runtime info file for the current process

func RuntimePathForPID added in v0.16.3

func RuntimePathForPID(pid int) string

RuntimePathForPID returns the path to the runtime info file for a specific PID

func WriteRuntime

func WriteRuntime(addr string, port int, version string) error

WriteRuntime saves the daemon runtime info atomically. Uses write-to-temp-then-rename to prevent readers from seeing partial writes.

Types

type AddCommentRequest added in v0.17.0

type AddCommentRequest struct {
	SHA       string `json:"sha,omitempty"`    // Legacy: link to commit by SHA
	JobID     int64  `json:"job_id,omitempty"` // Preferred: link to job
	Commenter string `json:"commenter"`
	Comment   string `json:"comment"`
}

type AddressReviewRequest

type AddressReviewRequest struct {
	JobID     int64 `json:"job_id"`
	Addressed bool  `json:"addressed"`
}

type Broadcaster

type Broadcaster interface {
	Subscribe(repoPath string) (int, <-chan Event)
	Unsubscribe(id int)
	Broadcast(event Event)
	SubscriberCount() int
}

Broadcaster interface manages event subscriptions and broadcasting

func NewBroadcaster

func NewBroadcaster() Broadcaster

NewBroadcaster creates a new event broadcaster

type CancelJobRequest

type CancelJobRequest struct {
	JobID int64 `json:"job_id"`
}

type Client

type Client interface {
	// GetReviewBySHA retrieves a review by commit SHA
	GetReviewBySHA(sha string) (*storage.Review, error)

	// GetReviewByJobID retrieves a review by job ID
	GetReviewByJobID(jobID int64) (*storage.Review, error)

	// MarkReviewAddressed marks a review as addressed by job ID
	MarkReviewAddressed(jobID int64) error

	// AddComment adds a comment to a job
	AddComment(jobID int64, commenter, comment string) error

	// EnqueueReview enqueues a review job and returns the job ID
	EnqueueReview(repoPath, gitRef, agentName string) (int64, error)

	// WaitForReview waits for a job to complete and returns the review
	WaitForReview(jobID int64) (*storage.Review, error)

	// FindJobForCommit finds a job for a specific commit in a repo
	FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)

	// FindPendingJobForRef finds a queued or running job for any git ref
	FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)

	// GetCommentsForJob fetches comments for a job
	GetCommentsForJob(jobID int64) ([]storage.Response, error)
}

Client provides an interface for interacting with the roborev daemon. This abstraction allows for easy mocking in tests.

type ConfigGetter added in v0.15.0

type ConfigGetter interface {
	Config() *config.Config
}

ConfigGetter provides access to the current config

type ConfigWatcher added in v0.15.0

type ConfigWatcher struct {
	// contains filtered or unexported fields
}

ConfigWatcher watches config.toml for changes and reloads configuration.

Hot-reloadable settings take effect immediately: default_agent, job_timeout, allow_unsafe_agents, anthropic_api_key, review_context_count.

Settings requiring restart: server_addr, max_workers, sync section. These are read at startup and the running values are preserved even if the config file changes. CLI flag overrides (--addr, --workers) only apply to restart-required settings, so they remain in effect for the daemon's lifetime. The config object may show file values after reload, but the actual running server address and worker pool size are fixed at startup.

Note: ConfigWatcher is not restart-safe. Once Stop() is called, Start() will return an error. Create a new ConfigWatcher instance if restart is needed.

func NewConfigWatcher added in v0.15.0

func NewConfigWatcher(configPath string, cfg *config.Config, broadcaster Broadcaster) *ConfigWatcher

NewConfigWatcher creates a new config watcher

func (*ConfigWatcher) Config added in v0.15.0

func (cw *ConfigWatcher) Config() *config.Config

Config returns the current config with read lock

func (*ConfigWatcher) LastReloadedAt added in v0.15.0

func (cw *ConfigWatcher) LastReloadedAt() time.Time

LastReloadedAt returns the time of the last successful config reload

func (*ConfigWatcher) ReloadCounter added in v0.15.1

func (cw *ConfigWatcher) ReloadCounter() uint64

ReloadCounter returns a monotonic counter incremented on each reload. Use this instead of timestamp comparison to detect reloads that happen within the same second.

func (*ConfigWatcher) Start added in v0.15.0

func (cw *ConfigWatcher) Start(ctx context.Context) error

Start begins watching the config file for changes. Returns an error if the watcher has already been stopped.

func (*ConfigWatcher) Stop added in v0.15.0

func (cw *ConfigWatcher) Stop()

Stop stops the config watcher. Safe to call multiple times.

type EnqueueRequest

type EnqueueRequest struct {
	RepoPath     string `json:"repo_path"`
	CommitSHA    string `json:"commit_sha,omitempty"` // Single commit (for backwards compat)
	GitRef       string `json:"git_ref,omitempty"`    // Single commit, range like "abc..def", or "dirty"
	Branch       string `json:"branch,omitempty"`     // Branch name at time of job creation
	Agent        string `json:"agent,omitempty"`
	Model        string `json:"model,omitempty"`         // Model to use (for opencode: provider/model format)
	DiffContent  string `json:"diff_content,omitempty"`  // Pre-captured diff for dirty reviews
	Reasoning    string `json:"reasoning,omitempty"`     // Reasoning level: thorough, standard, fast
	CustomPrompt string `json:"custom_prompt,omitempty"` // Custom prompt for ad-hoc agent work
	Agentic      bool   `json:"agentic,omitempty"`       // Enable agentic mode (allow file edits)
	OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output
}

type ErrorEntry

type ErrorEntry struct {
	Timestamp time.Time `json:"ts"`
	Level     string    `json:"level"`     // "error", "warn"
	Component string    `json:"component"` // "worker", "sync", "server"
	Message   string    `json:"message"`
	JobID     int64     `json:"job_id,omitempty"`
}

ErrorEntry represents a single error log entry

type ErrorLog

type ErrorLog struct {
	// contains filtered or unexported fields
}

ErrorLog manages error logging to a file and maintains an in-memory ring buffer

func NewErrorLog

func NewErrorLog(path string) (*ErrorLog, error)

NewErrorLog creates a new error log writer

func (*ErrorLog) Close

func (e *ErrorLog) Close() error

Close closes the error log file

func (*ErrorLog) Count24h

func (e *ErrorLog) Count24h() int

Count24h returns the count of errors in the last 24 hours from the in-memory buffer. Note: This only counts up to maxRecent (100) entries. If error volume is high, the actual 24h count may be higher. For precise counts, parse the log file.

func (*ErrorLog) Log

func (e *ErrorLog) Log(level, component, message string, jobID int64)

Log writes an error entry to both file and in-memory buffer

func (*ErrorLog) LogError

func (e *ErrorLog) LogError(component, message string, jobID int64)

LogError is a convenience method for logging errors

func (*ErrorLog) LogWarn

func (e *ErrorLog) LogWarn(component, message string, jobID int64)

LogWarn is a convenience method for logging warnings

func (*ErrorLog) Recent

func (e *ErrorLog) Recent() []ErrorEntry

Recent returns the most recent error entries (newest first)

func (*ErrorLog) RecentN

func (e *ErrorLog) RecentN(n int) []ErrorEntry

RecentN returns up to n most recent error entries (newest first)

type ErrorResponse

type ErrorResponse struct {
	Error string `json:"error"`
}

type Event

type Event struct {
	Type     string    `json:"type"`
	TS       time.Time `json:"ts"`
	JobID    int64     `json:"job_id"`
	Repo     string    `json:"repo"`
	RepoName string    `json:"repo_name"`
	SHA      string    `json:"sha"`
	Agent    string    `json:"agent,omitempty"`
	Verdict  string    `json:"verdict,omitempty"`
	Error    string    `json:"error,omitempty"`
}

Event represents a review event that can be broadcast

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

MarshalJSON converts an Event to JSON for streaming

type EventBroadcaster

type EventBroadcaster struct {
	// contains filtered or unexported fields
}

EventBroadcaster implements the Broadcaster interface

func (*EventBroadcaster) Broadcast

func (b *EventBroadcaster) Broadcast(event Event)

Broadcast sends an event to all matching subscribers Non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber

func (*EventBroadcaster) Subscribe

func (b *EventBroadcaster) Subscribe(repoPath string) (int, <-chan Event)

Subscribe adds a new subscriber with optional repo filter Returns a subscriber ID and event channel

func (*EventBroadcaster) SubscriberCount

func (b *EventBroadcaster) SubscriberCount() int

SubscriberCount returns the current number of subscribers (for testing)

func (*EventBroadcaster) Unsubscribe

func (b *EventBroadcaster) Unsubscribe(id int)

Unsubscribe removes a subscriber and closes its channel

type HTTPClient

type HTTPClient struct {
	// contains filtered or unexported fields
}

HTTPClient is the default HTTP-based implementation of Client

func NewHTTPClient

func NewHTTPClient(addr string) *HTTPClient

NewHTTPClient creates a new HTTP daemon client

func NewHTTPClientFromRuntime

func NewHTTPClientFromRuntime() (*HTTPClient, error)

NewHTTPClientFromRuntime creates an HTTP client using daemon runtime info

func (*HTTPClient) AddComment added in v0.17.0

func (c *HTTPClient) AddComment(jobID int64, commenter, comment string) error

func (*HTTPClient) EnqueueReview

func (c *HTTPClient) EnqueueReview(repoPath, gitRef, agentName string) (int64, error)

func (*HTTPClient) FindJobForCommit

func (c *HTTPClient) FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)

func (*HTTPClient) FindPendingJobForRef

func (c *HTTPClient) FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)

func (*HTTPClient) GetCommentsForJob added in v0.17.0

func (c *HTTPClient) GetCommentsForJob(jobID int64) ([]storage.Response, error)

func (*HTTPClient) GetReviewByJobID

func (c *HTTPClient) GetReviewByJobID(jobID int64) (*storage.Review, error)

func (*HTTPClient) GetReviewBySHA

func (c *HTTPClient) GetReviewBySHA(sha string) (*storage.Review, error)

func (*HTTPClient) MarkReviewAddressed

func (c *HTTPClient) MarkReviewAddressed(jobID int64) error

func (*HTTPClient) SetPollInterval

func (c *HTTPClient) SetPollInterval(interval time.Duration)

SetPollInterval sets the polling interval for WaitForReview

func (*HTTPClient) WaitForReview

func (c *HTTPClient) WaitForReview(jobID int64) (*storage.Review, error)

type JobOutput added in v0.18.0

type JobOutput struct {
	// contains filtered or unexported fields
}

JobOutput stores output for a single job

type JobOutputResponse added in v0.18.0

type JobOutputResponse struct {
	JobID   int64        `json:"job_id"`
	Status  string       `json:"status"`
	Lines   []OutputLine `json:"lines"`
	HasMore bool         `json:"has_more"`
}

JobOutputResponse is the response for /api/job/output

type OutputBuffer added in v0.18.0

type OutputBuffer struct {
	// contains filtered or unexported fields
}

OutputBuffer stores streaming output for running jobs with memory limits.

func NewOutputBuffer added in v0.18.0

func NewOutputBuffer(maxPerJob, maxTotal int) *OutputBuffer

NewOutputBuffer creates a new output buffer with the given limits.

func (*OutputBuffer) Append added in v0.18.0

func (ob *OutputBuffer) Append(jobID int64, line OutputLine)

Append adds a line to the job's output buffer.

func (*OutputBuffer) CloseJob added in v0.18.0

func (ob *OutputBuffer) CloseJob(jobID int64)

CloseJob marks a job as complete and removes its buffer.

func (*OutputBuffer) GetLines added in v0.18.0

func (ob *OutputBuffer) GetLines(jobID int64) []OutputLine

GetLines returns all lines for a job.

func (*OutputBuffer) IsActive added in v0.18.0

func (ob *OutputBuffer) IsActive(jobID int64) bool

IsActive returns true if there's an active buffer for this job.

func (*OutputBuffer) Subscribe added in v0.18.0

func (ob *OutputBuffer) Subscribe(jobID int64) ([]OutputLine, <-chan OutputLine, func())

Subscribe returns existing lines and a channel for new lines. Call the returned cancel function when done.

func (*OutputBuffer) Writer added in v0.18.0

func (ob *OutputBuffer) Writer(jobID int64, normalize OutputNormalizer) *outputWriter

Writer returns an io.Writer that normalizes and stores output for a job. Lines exceeding maxPerJob will be truncated to prevent unbounded buffer growth.

type OutputLine added in v0.18.0

type OutputLine struct {
	Timestamp time.Time `json:"ts"`
	Text      string    `json:"text"`
	Type      string    `json:"line_type"` // "text", "tool", "thinking", "error"
}

OutputLine represents a single line of normalized output

func NormalizeClaudeOutput added in v0.18.0

func NormalizeClaudeOutput(line string) *OutputLine

NormalizeClaudeOutput parses Claude's stream-json format and extracts readable content.

func NormalizeGenericOutput added in v0.18.0

func NormalizeGenericOutput(line string) *OutputLine

NormalizeGenericOutput is the default normalizer for other agents.

func NormalizeOpenCodeOutput added in v0.18.0

func NormalizeOpenCodeOutput(line string) *OutputLine

NormalizeOpenCodeOutput normalizes OpenCode output (plain text with ANSI codes).

type OutputNormalizer added in v0.18.0

type OutputNormalizer func(line string) *OutputLine

OutputNormalizer converts agent-specific output to normalized OutputLines.

func GetNormalizer added in v0.18.0

func GetNormalizer(agentName string) OutputNormalizer

GetNormalizer returns the appropriate normalizer for an agent.

type RerunJobRequest

type RerunJobRequest struct {
	JobID int64 `json:"job_id"`
}

type RuntimeInfo

type RuntimeInfo struct {
	PID        int    `json:"pid"`
	Addr       string `json:"addr"`
	Port       int    `json:"port"`
	Version    string `json:"version"`
	SourcePath string `json:"-"` // Path to the runtime file (not serialized, set by ListAllRuntimes)
}

RuntimeInfo stores daemon runtime state

func GetAnyRunningDaemon added in v0.16.3

func GetAnyRunningDaemon() (*RuntimeInfo, error)

GetAnyRunningDaemon returns info about a responsive daemon. Returns os.ErrNotExist if no responsive daemon is found.

func ListAllRuntimes added in v0.16.3

func ListAllRuntimes() ([]*RuntimeInfo, error)

ListAllRuntimes returns info for all daemon runtime files found. Sets SourcePath on each RuntimeInfo for proper cleanup. Continues scanning even if some files are unreadable (e.g., permission errors).

func ReadRuntime

func ReadRuntime() (*RuntimeInfo, error)

ReadRuntime reads the daemon runtime info for the current process

func ReadRuntimeForPID added in v0.16.3

func ReadRuntimeForPID(pid int) (*RuntimeInfo, error)

ReadRuntimeForPID reads the daemon runtime info for a specific PID

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the HTTP API server for the daemon

func NewServer

func NewServer(db *storage.DB, cfg *config.Config, configPath string) *Server

NewServer creates a new daemon server

func (*Server) SetSyncWorker

func (s *Server) SetSyncWorker(sw *storage.SyncWorker)

SetSyncWorker sets the sync worker for triggering manual syncs

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start begins the server and worker pool

func (*Server) Stop

func (s *Server) Stop() error

Stop gracefully shuts down the server

type StaticConfig added in v0.15.0

type StaticConfig struct {
	// contains filtered or unexported fields
}

StaticConfig wraps a config for use without hot-reloading (e.g., in tests)

func NewStaticConfig added in v0.15.0

func NewStaticConfig(cfg *config.Config) *StaticConfig

NewStaticConfig creates a ConfigGetter that always returns the same config

func (*StaticConfig) Config added in v0.15.0

func (sc *StaticConfig) Config() *config.Config

Config returns the static config

type Subscriber

type Subscriber struct {
	ID       int
	RepoPath string // Filter: only send events for this repo (empty = all)
	Ch       chan Event
}

Subscriber represents a client subscribed to events

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of review workers

func NewWorkerPool

func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broadcaster Broadcaster, errorLog *ErrorLog) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) ActiveWorkers

func (wp *WorkerPool) ActiveWorkers() int

ActiveWorkers returns the number of currently active workers

func (*WorkerPool) CancelJob

func (wp *WorkerPool) CancelJob(jobID int64) bool

CancelJob cancels a running job by its ID, killing the subprocess. Returns true if the job was canceled or marked for pending cancellation. Returns false only if the job doesn't exist or isn't in a cancellable state.

func (*WorkerPool) GetJobOutput added in v0.18.0

func (wp *WorkerPool) GetJobOutput(jobID int64) []OutputLine

GetJobOutput returns the current output lines for a job.

func (*WorkerPool) HasJobOutput added in v0.18.0

func (wp *WorkerPool) HasJobOutput(jobID int64) bool

HasJobOutput returns true if there's active output capture for a job.

func (*WorkerPool) MaxWorkers added in v0.15.0

func (wp *WorkerPool) MaxWorkers() int

MaxWorkers returns the total number of workers in the pool

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start begins the worker pool

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop gracefully shuts down the worker pool

func (*WorkerPool) SubscribeJobOutput added in v0.18.0

func (wp *WorkerPool) SubscribeJobOutput(jobID int64) ([]OutputLine, <-chan OutputLine, func())

SubscribeJobOutput returns initial lines and a channel for new output. Call cancel when done to unsubscribe.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL