Documentation ¶
Index ¶
- Variables
- func CleanupZombieDaemons() int
- func DefaultErrorLogPath() string
- func FindAvailablePort(startAddr string) (string, int, error)
- func IsDaemonAlive(addr string) bool
- func KillDaemon(info *RuntimeInfo) bool
- func LegacyRuntimePath() string
- func RemoveRuntime()
- func RemoveRuntimeForPID(pid int)
- func RuntimePath() string
- func RuntimePathForPID(pid int) string
- func WriteRuntime(addr string, port int, version string) error
- type AddCommentRequest
- type AddressReviewRequest
- type Broadcaster
- type CancelJobRequest
- type Client
- type ConfigGetter
- type ConfigWatcher
- type EnqueueRequest
- type ErrorEntry
- type ErrorLog
- func (e *ErrorLog) Close() error
- func (e *ErrorLog) Count24h() int
- func (e *ErrorLog) Log(level, component, message string, jobID int64)
- func (e *ErrorLog) LogError(component, message string, jobID int64)
- func (e *ErrorLog) LogWarn(component, message string, jobID int64)
- func (e *ErrorLog) Recent() []ErrorEntry
- func (e *ErrorLog) RecentN(n int) []ErrorEntry
- type ErrorResponse
- type Event
- type EventBroadcaster
- type HTTPClient
- func (c *HTTPClient) AddComment(jobID int64, commenter, comment string) error
- func (c *HTTPClient) EnqueueReview(repoPath, gitRef, agentName string) (int64, error)
- func (c *HTTPClient) FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)
- func (c *HTTPClient) FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)
- func (c *HTTPClient) GetCommentsForJob(jobID int64) ([]storage.Response, error)
- func (c *HTTPClient) GetReviewByJobID(jobID int64) (*storage.Review, error)
- func (c *HTTPClient) GetReviewBySHA(sha string) (*storage.Review, error)
- func (c *HTTPClient) MarkReviewAddressed(reviewID int64) error
- func (c *HTTPClient) SetPollInterval(interval time.Duration)
- func (c *HTTPClient) WaitForReview(jobID int64) (*storage.Review, error)
- type RerunJobRequest
- type RuntimeInfo
- type Server
- type StaticConfig
- type Subscriber
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
var DefaultPollInterval = 2 * time.Second
DefaultPollInterval is the default polling interval for WaitForReview. Tests can override this to speed up polling-based tests.
Functions ¶
func CleanupZombieDaemons ¶ added in v0.16.3
func CleanupZombieDaemons() int
CleanupZombieDaemons finds and kills all unresponsive daemons. Returns the number of zombies cleaned up.
func DefaultErrorLogPath ¶
func DefaultErrorLogPath() string
DefaultErrorLogPath returns the default path for the error log
func FindAvailablePort ¶
func FindAvailablePort(startAddr string) (string, int, error)
FindAvailablePort finds an available port starting from the configured port. After zombie cleanup, this should usually succeed on the first try. Falls back to searching if the port is still in use (e.g., by another service).
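The search described above can be sketched as a probe loop. This is a minimal illustration, not the package's implementation: findAvailablePort and its maxTries parameter are hypothetical, and port 7373 in main is an arbitrary example value.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// findAvailablePort is a hypothetical stand-in for FindAvailablePort.
// It tries the port in startAddr first, then probes the following ports,
// returning the first address that can be bound.
func findAvailablePort(startAddr string, maxTries int) (string, int, error) {
	host, portStr, err := net.SplitHostPort(startAddr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid address %q: %w", startAddr, err)
	}
	start, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid port %q: %w", portStr, err)
	}
	for port := start; port < start+maxTries && port <= 65535; port++ {
		addr := net.JoinHostPort(host, strconv.Itoa(port))
		ln, err := net.Listen("tcp", addr)
		if err != nil {
			continue // port is in use or cannot be bound; try the next one
		}
		ln.Close() // release the probe listener; the caller binds the port for real
		return addr, port, nil
	}
	return "", 0, fmt.Errorf("no available port in %d tries starting at %s", maxTries, startAddr)
}

func main() {
	addr, port, err := findAvailablePort("127.0.0.1:7373", 10)
	fmt.Println(addr, port, err)
}
```

Probing by binding and closing leaves a small window in which another process can take the port, so a caller would still need to handle a bind failure afterwards.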
func IsDaemonAlive ¶ added in v0.16.3
func IsDaemonAlive(addr string) bool
IsDaemonAlive checks if a daemon at the given address is actually responding. This is more reliable than checking PID and works cross-platform. Only allows loopback addresses to prevent SSRF via malicious runtime files. Uses retry logic to avoid misclassifying a slow or transiently failing daemon.
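A rough sketch of the two ideas in this description, the loopback-only guard and the retry loop, follows. The helper names, the /api/status path, and the retry parameters are assumptions made for illustration; the source does not state which endpoint or timings the daemon uses.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

// isLoopbackAddr reports whether addr ("host:port") points at the local
// machine. Rejecting everything else keeps a tampered runtime file from
// steering requests to an arbitrary host.
func isLoopbackAddr(addr string) bool {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false
	}
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

// isAlive is a hypothetical stand-in for IsDaemonAlive. It retries a
// short HTTP request so that one slow or failed attempt does not mark a
// healthy daemon as dead. The "/api/status" path is an assumption.
func isAlive(addr string, attempts int, delay time.Duration) bool {
	if !isLoopbackAddr(addr) {
		return false
	}
	client := &http.Client{Timeout: time.Second}
	for i := 0; i < attempts; i++ {
		resp, err := client.Get("http://" + addr + "/api/status")
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return true
			}
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return false
}

func main() {
	fmt.Println(isLoopbackAddr("127.0.0.1:7373")) // true
	fmt.Println(isLoopbackAddr("10.0.0.5:7373"))  // false
}
```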
func KillDaemon ¶ added in v0.16.3
func KillDaemon(info *RuntimeInfo) bool
KillDaemon attempts to gracefully shut down a daemon, then force kill if needed. Returns true if the daemon was killed or is no longer running. Only removes runtime file if the daemon is confirmed dead.
func LegacyRuntimePath ¶ added in v0.16.3
func LegacyRuntimePath() string
LegacyRuntimePath returns the old daemon.json path for migration
func RemoveRuntime ¶
func RemoveRuntime()
RemoveRuntime removes the runtime info file for the current process
func RemoveRuntimeForPID ¶ added in v0.16.3
func RemoveRuntimeForPID(pid int)
RemoveRuntimeForPID removes the runtime info file for a specific PID
func RuntimePath ¶
func RuntimePath() string
RuntimePath returns the path to the runtime info file for the current process
func RuntimePathForPID ¶ added in v0.16.3
func RuntimePathForPID(pid int) string
RuntimePathForPID returns the path to the runtime info file for a specific PID
Types ¶
type AddCommentRequest ¶ added in v0.17.0
type AddressReviewRequest ¶
type Broadcaster ¶
type Broadcaster interface {
	Subscribe(repoPath string) (int, <-chan Event)
	Unsubscribe(id int)
	Broadcast(event Event)
	SubscriberCount() int
}
Broadcaster interface manages event subscriptions and broadcasting
func NewBroadcaster ¶
func NewBroadcaster() Broadcaster
NewBroadcaster creates a new event broadcaster
type CancelJobRequest ¶
type CancelJobRequest struct {
	JobID int64 `json:"job_id"`
}
type Client ¶
type Client interface {
	// GetReviewBySHA retrieves a review by commit SHA
	GetReviewBySHA(sha string) (*storage.Review, error)
	// GetReviewByJobID retrieves a review by job ID
	GetReviewByJobID(jobID int64) (*storage.Review, error)
	// MarkReviewAddressed marks a review as addressed
	MarkReviewAddressed(reviewID int64) error
	// AddComment adds a comment to a job
	AddComment(jobID int64, commenter, comment string) error
	// EnqueueReview enqueues a review job and returns the job ID
	EnqueueReview(repoPath, gitRef, agentName string) (int64, error)
	// WaitForReview waits for a job to complete and returns the review
	WaitForReview(jobID int64) (*storage.Review, error)
	// FindJobForCommit finds a job for a specific commit in a repo
	FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)
	// FindPendingJobForRef finds a queued or running job for any git ref
	FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)
	// GetCommentsForJob fetches comments for a job
	GetCommentsForJob(jobID int64) ([]storage.Response, error)
}
Client provides an interface for interacting with the roborev daemon. This abstraction allows for easy mocking in tests.
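To illustrate the mocking use case, the sketch below defines an in-memory test double. It is self-contained, so Review and reviewFetcher are hypothetical, trimmed-down stand-ins for storage.Review and the full Client interface; reviewRef stands for whatever code would normally accept a Client.

```go
package main

import (
	"errors"
	"fmt"
)

// Review is a hypothetical stand-in for storage.Review.
type Review struct {
	JobID  int64
	Output string
}

// reviewFetcher is a hypothetical subset of Client with two methods.
type reviewFetcher interface {
	EnqueueReview(repoPath, gitRef, agentName string) (int64, error)
	WaitForReview(jobID int64) (*Review, error)
}

// fakeClient is an in-memory test double; it never talks to a daemon.
type fakeClient struct {
	nextID  int64
	reviews map[int64]*Review
}

func newFakeClient() *fakeClient {
	return &fakeClient{reviews: map[int64]*Review{}}
}

func (f *fakeClient) EnqueueReview(repoPath, gitRef, agentName string) (int64, error) {
	if repoPath == "" {
		return 0, errors.New("repo path required")
	}
	f.nextID++
	f.reviews[f.nextID] = &Review{JobID: f.nextID, Output: "reviewed " + gitRef + " with " + agentName}
	return f.nextID, nil
}

func (f *fakeClient) WaitForReview(jobID int64) (*Review, error) {
	r, ok := f.reviews[jobID]
	if !ok {
		return nil, fmt.Errorf("job %d not found", jobID)
	}
	return r, nil
}

// reviewRef is the code under test: it depends only on the interface,
// so a test can pass fakeClient where production code passes a real client.
func reviewRef(c reviewFetcher, repo, ref string) (string, error) {
	id, err := c.EnqueueReview(repo, ref, "test-agent")
	if err != nil {
		return "", err
	}
	r, err := c.WaitForReview(id)
	if err != nil {
		return "", err
	}
	return r.Output, nil
}

func main() {
	out, err := reviewRef(newFakeClient(), "/repo", "HEAD")
	fmt.Println(out, err) // reviewed HEAD with test-agent <nil>
}
```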
type ConfigGetter ¶ added in v0.15.0
ConfigGetter provides access to the current config
type ConfigWatcher ¶ added in v0.15.0
type ConfigWatcher struct {
	// contains filtered or unexported fields
}
ConfigWatcher watches config.toml for changes and reloads configuration.
Hot-reloadable settings take effect immediately: default_agent, job_timeout, allow_unsafe_agents, anthropic_api_key, review_context_count.
Settings requiring restart: server_addr, max_workers, sync section. These are read at startup and the running values are preserved even if the config file changes. CLI flag overrides (--addr, --workers) only apply to restart-required settings, so they remain in effect for the daemon's lifetime. The config object may show file values after reload, but the actual running server address and worker pool size are fixed at startup.
Note: ConfigWatcher is not restart-safe. Once Stop() is called, Start() will return an error. Create a new ConfigWatcher instance if restart is needed.
func NewConfigWatcher ¶ added in v0.15.0
func NewConfigWatcher(configPath string, cfg *config.Config, broadcaster Broadcaster) *ConfigWatcher
NewConfigWatcher creates a new config watcher
func (*ConfigWatcher) Config ¶ added in v0.15.0
func (cw *ConfigWatcher) Config() *config.Config
Config returns the current config with read lock
func (*ConfigWatcher) LastReloadedAt ¶ added in v0.15.0
func (cw *ConfigWatcher) LastReloadedAt() time.Time
LastReloadedAt returns the time of the last successful config reload
func (*ConfigWatcher) ReloadCounter ¶ added in v0.15.1
func (cw *ConfigWatcher) ReloadCounter() uint64
ReloadCounter returns a monotonic counter incremented on each reload. Use this instead of timestamp comparison to detect reloads that happen within the same second.
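The reason a counter is preferable to a timestamp can be shown with a small holder type. This is a sketch under assumptions: holder, settings, and reload are hypothetical names, and the real ConfigWatcher internals are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// settings is a hypothetical stand-in for config.Config.
type settings struct {
	DefaultAgent string
}

// holder guards the current config with a read/write lock and counts reloads.
type holder struct {
	mu      sync.RWMutex
	cfg     *settings
	reloads atomic.Uint64
}

// Config returns the current config under the read lock.
func (h *holder) Config() *settings {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.cfg
}

// reload swaps in a new config and bumps the counter.
func (h *holder) reload(next *settings) {
	h.mu.Lock()
	h.cfg = next
	h.mu.Unlock()
	h.reloads.Add(1)
}

// ReloadCounter returns how many reloads have happened so far.
func (h *holder) ReloadCounter() uint64 {
	return h.reloads.Load()
}

func main() {
	h := &holder{cfg: &settings{DefaultAgent: "a"}}
	before := h.ReloadCounter()
	// Two reloads in quick succession could share a one-second timestamp,
	// but the counter still advances once per reload.
	h.reload(&settings{DefaultAgent: "b"})
	h.reload(&settings{DefaultAgent: "c"})
	fmt.Println(h.ReloadCounter()-before, h.Config().DefaultAgent) // 2 c
}
```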
func (*ConfigWatcher) Start ¶ added in v0.15.0
func (cw *ConfigWatcher) Start(ctx context.Context) error
Start begins watching the config file for changes. Returns an error if the watcher has already been stopped.
func (*ConfigWatcher) Stop ¶ added in v0.15.0
func (cw *ConfigWatcher) Stop()
Stop stops the config watcher. Safe to call multiple times.
type EnqueueRequest ¶
type EnqueueRequest struct {
	RepoPath     string `json:"repo_path"`
	CommitSHA    string `json:"commit_sha,omitempty"`    // Single commit (for backwards compat)
	GitRef       string `json:"git_ref,omitempty"`       // Single commit, range like "abc..def", or "dirty"
	Agent        string `json:"agent,omitempty"`
	Model        string `json:"model,omitempty"`         // Model to use (for opencode: provider/model format)
	DiffContent  string `json:"diff_content,omitempty"`  // Pre-captured diff for dirty reviews
	Reasoning    string `json:"reasoning,omitempty"`     // Reasoning level: thorough, standard, fast
	CustomPrompt string `json:"custom_prompt,omitempty"` // Custom prompt for ad-hoc agent work
	Agentic      bool   `json:"agentic,omitempty"`       // Enable agentic mode (allow file edits)
}
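As an illustration of how the omitempty tags shape the request body, the sketch below copies the struct into a standalone program and marshals a range review. The field values are made-up examples, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EnqueueRequest is copied from the declaration above so the example compiles on its own.
type EnqueueRequest struct {
	RepoPath     string `json:"repo_path"`
	CommitSHA    string `json:"commit_sha,omitempty"`
	GitRef       string `json:"git_ref,omitempty"`
	Agent        string `json:"agent,omitempty"`
	Model        string `json:"model,omitempty"`
	DiffContent  string `json:"diff_content,omitempty"`
	Reasoning    string `json:"reasoning,omitempty"`
	CustomPrompt string `json:"custom_prompt,omitempty"`
	Agentic      bool   `json:"agentic,omitempty"`
}

// encode returns the JSON body for a request.
func encode(req EnqueueRequest) (string, error) {
	b, err := json.Marshal(req)
	return string(b), err
}

func main() {
	body, err := encode(EnqueueRequest{
		RepoPath:  "/path/to/repo",
		GitRef:    "abc..def",
		Reasoning: "fast",
	})
	if err != nil {
		panic(err)
	}
	// Unset optional fields are omitted entirely:
	// {"repo_path":"/path/to/repo","git_ref":"abc..def","reasoning":"fast"}
	fmt.Println(body)
}
```

Because Agentic is a bool with omitempty, an explicit false is indistinguishable from an unset field on the wire.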
type ErrorEntry ¶
type ErrorEntry struct {
	Timestamp time.Time `json:"ts"`
	Level     string    `json:"level"`     // "error", "warn"
	Component string    `json:"component"` // "worker", "sync", "server"
	Message   string    `json:"message"`
	JobID     int64     `json:"job_id,omitempty"`
}
ErrorEntry represents a single error log entry
type ErrorLog ¶
type ErrorLog struct {
	// contains filtered or unexported fields
}
ErrorLog manages error logging to a file and maintains an in-memory ring buffer
func NewErrorLog ¶
NewErrorLog creates a new error log writer
func (*ErrorLog) Count24h ¶
func (e *ErrorLog) Count24h() int
Count24h returns the count of errors in the last 24 hours from the in-memory buffer. Note: This only counts up to maxRecent (100) entries. If error volume is high, the actual 24h count may be higher. For precise counts, parse the log file.
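The undercounting caveat follows directly from the bounded buffer, as this sketch shows. The ring type here is a simple capped slice standing in for the real ring buffer, and it counts every buffered entry regardless of level; both are assumptions, since the unexported fields and filtering rules are not documented.

```go
package main

import (
	"fmt"
	"time"
)

const maxRecent = 100 // buffer capacity stated in the Count24h note

// entry is a hypothetical, reduced form of ErrorEntry.
type entry struct {
	ts time.Time
}

// ring keeps at most maxRecent entries, oldest first.
type ring struct {
	buf []entry
}

// add appends an entry and evicts the oldest one once the buffer is full.
func (r *ring) add(e entry) {
	r.buf = append(r.buf, e)
	if len(r.buf) > maxRecent {
		r.buf = r.buf[1:]
	}
}

// count24h counts buffered entries newer than 24 hours before now.
func (r *ring) count24h(now time.Time) int {
	cutoff := now.Add(-24 * time.Hour)
	n := 0
	for _, e := range r.buf {
		if e.ts.After(cutoff) {
			n++
		}
	}
	return n
}

// demo returns the count for a busy log (150 fresh entries) and for a
// quiet log (three entries, one of them 25 hours old).
func demo() (capped, recent int) {
	now := time.Now()
	busy := &ring{}
	for i := 0; i < 150; i++ {
		busy.add(entry{ts: now})
	}
	quiet := &ring{}
	quiet.add(entry{ts: now.Add(-25 * time.Hour)})
	quiet.add(entry{ts: now.Add(-time.Hour)})
	quiet.add(entry{ts: now})
	return busy.count24h(now), quiet.count24h(now)
}

func main() {
	capped, recent := demo()
	fmt.Println(capped, recent) // 100 2
}
```

The busy log reports 100 even though 150 entries were written in the window, which is the case where parsing the log file gives the precise figure.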
func (*ErrorLog) Recent ¶
func (e *ErrorLog) Recent() []ErrorEntry
Recent returns the most recent error entries (newest first)
func (*ErrorLog) RecentN ¶
func (e *ErrorLog) RecentN(n int) []ErrorEntry
RecentN returns up to n most recent error entries (newest first)
type ErrorResponse ¶
type ErrorResponse struct {
	Error string `json:"error"`
}
type Event ¶
type Event struct {
	Type     string    `json:"type"`
	TS       time.Time `json:"ts"`
	JobID    int64     `json:"job_id"`
	Repo     string    `json:"repo"`
	RepoName string    `json:"repo_name"`
	SHA      string    `json:"sha"`
	Agent    string    `json:"agent,omitempty"`
	Verdict  string    `json:"verdict,omitempty"`
	Error    string    `json:"error,omitempty"`
}
Event represents a review event that can be broadcast
func (Event) MarshalJSON ¶
MarshalJSON converts an Event to JSON for streaming
type EventBroadcaster ¶
type EventBroadcaster struct {
	// contains filtered or unexported fields
}
EventBroadcaster implements the Broadcaster interface
func (*EventBroadcaster) Broadcast ¶
func (b *EventBroadcaster) Broadcast(event Event)
Broadcast sends an event to all matching subscribers. It is non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber.
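The drop-on-full behavior is the standard select-with-default idiom. The sketch below is an assumption about how such a broadcaster could be written, using hypothetical lowercase types rather than the package's own Event and Subscriber; the repo filter follows the Subscriber.RepoPath comment (empty means all repos).

```go
package main

import "fmt"

// event is a hypothetical, reduced form of Event.
type event struct {
	Repo string
	Type string
}

// subscriber mirrors the idea of Subscriber: an optional repo filter and a channel.
type subscriber struct {
	repoPath string // empty = receive events for all repos
	ch       chan event
}

// broadcast offers ev to every matching subscriber without blocking and
// reports how many sends succeeded and how many were dropped.
func broadcast(subs []subscriber, ev event) (delivered, dropped int) {
	for _, s := range subs {
		if s.repoPath != "" && s.repoPath != ev.Repo {
			continue // filtered out; neither delivered nor dropped
		}
		select {
		case s.ch <- ev:
			delivered++
		default:
			dropped++ // channel buffer is full; skip this subscriber
		}
	}
	return delivered, dropped
}

func main() {
	subs := []subscriber{
		{repoPath: "", ch: make(chan event, 1)},
		{repoPath: "/a", ch: make(chan event, 1)},
		{repoPath: "/b", ch: make(chan event, 1)},
	}
	ev := event{Repo: "/a", Type: "example"}
	fmt.Println(broadcast(subs, ev)) // 2 0
	fmt.Println(broadcast(subs, ev)) // 0 2 (buffers still full, nobody has read)
}
```

A slow consumer therefore misses events instead of stalling the sender, so subscribers that need every event must drain their channel promptly.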
func (*EventBroadcaster) Subscribe ¶
func (b *EventBroadcaster) Subscribe(repoPath string) (int, <-chan Event)
Subscribe adds a new subscriber with an optional repo filter. Returns a subscriber ID and an event channel.
func (*EventBroadcaster) SubscriberCount ¶
func (b *EventBroadcaster) SubscriberCount() int
SubscriberCount returns the current number of subscribers (for testing)
func (*EventBroadcaster) Unsubscribe ¶
func (b *EventBroadcaster) Unsubscribe(id int)
Unsubscribe removes a subscriber and closes its channel
type HTTPClient ¶
type HTTPClient struct {
	// contains filtered or unexported fields
}
HTTPClient is the default HTTP-based implementation of Client
func NewHTTPClient ¶
func NewHTTPClient(addr string) *HTTPClient
NewHTTPClient creates a new HTTP daemon client
func NewHTTPClientFromRuntime ¶
func NewHTTPClientFromRuntime() (*HTTPClient, error)
NewHTTPClientFromRuntime creates an HTTP client using daemon runtime info
func (*HTTPClient) AddComment ¶ added in v0.17.0
func (c *HTTPClient) AddComment(jobID int64, commenter, comment string) error
func (*HTTPClient) EnqueueReview ¶
func (c *HTTPClient) EnqueueReview(repoPath, gitRef, agentName string) (int64, error)
func (*HTTPClient) FindJobForCommit ¶
func (c *HTTPClient) FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)
func (*HTTPClient) FindPendingJobForRef ¶
func (c *HTTPClient) FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)
func (*HTTPClient) GetCommentsForJob ¶ added in v0.17.0
func (c *HTTPClient) GetCommentsForJob(jobID int64) ([]storage.Response, error)
func (*HTTPClient) GetReviewByJobID ¶
func (c *HTTPClient) GetReviewByJobID(jobID int64) (*storage.Review, error)
func (*HTTPClient) GetReviewBySHA ¶
func (c *HTTPClient) GetReviewBySHA(sha string) (*storage.Review, error)
func (*HTTPClient) MarkReviewAddressed ¶
func (c *HTTPClient) MarkReviewAddressed(reviewID int64) error
func (*HTTPClient) SetPollInterval ¶
func (c *HTTPClient) SetPollInterval(interval time.Duration)
SetPollInterval sets the polling interval for WaitForReview
func (*HTTPClient) WaitForReview ¶
func (c *HTTPClient) WaitForReview(jobID int64) (*storage.Review, error)
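WaitForReview is documented elsewhere on this page as polling at DefaultPollInterval (2 seconds) unless SetPollInterval overrides it. A generic polling loop of that kind might look like the following sketch; pollUntil, its timeout parameter, and demoPoll are hypothetical, and whether the real method enforces a timeout is not stated in the source.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil calls check, then sleeps for interval, repeating until check
// reports completion, check fails, or timeout has elapsed.
// It returns how many times check was called.
func pollUntil(interval, timeout time.Duration, check func() (bool, error)) (int, error) {
	deadline := time.Now().Add(timeout)
	for calls := 1; ; calls++ {
		done, err := check()
		if err != nil {
			return calls, err
		}
		if done {
			return calls, nil
		}
		if time.Now().After(deadline) {
			return calls, errors.New("timed out waiting for job")
		}
		time.Sleep(interval)
	}
}

// demoPoll simulates a job that finishes on the third status check,
// using a 1ms interval the way a test might shorten the poll interval.
func demoPoll() (int, error) {
	remaining := 3
	return pollUntil(time.Millisecond, time.Second, func() (bool, error) {
		remaining--
		return remaining == 0, nil
	})
}

func main() {
	calls, err := demoPoll()
	fmt.Println(calls, err) // 3 <nil>
}
```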
type RerunJobRequest ¶
type RerunJobRequest struct {
	JobID int64 `json:"job_id"`
}
type RuntimeInfo ¶
type RuntimeInfo struct {
	PID        int    `json:"pid"`
	Addr       string `json:"addr"`
	Port       int    `json:"port"`
	Version    string `json:"version"`
	SourcePath string `json:"-"` // Path to the runtime file (not serialized, set by ListAllRuntimes)
}
RuntimeInfo stores daemon runtime state
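The effect of the `json:"-"` tag on SourcePath can be seen by round-tripping a value through encoding/json. The struct is copied here so the example compiles on its own; the field values and the roundTrip helper are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RuntimeInfo is copied from the declaration above.
type RuntimeInfo struct {
	PID        int    `json:"pid"`
	Addr       string `json:"addr"`
	Port       int    `json:"port"`
	Version    string `json:"version"`
	SourcePath string `json:"-"`
}

// roundTrip marshals info and decodes the result into a fresh value.
func roundTrip(info RuntimeInfo) (string, RuntimeInfo, error) {
	b, err := json.Marshal(info)
	if err != nil {
		return "", RuntimeInfo{}, err
	}
	var back RuntimeInfo
	err = json.Unmarshal(b, &back)
	return string(b), back, err
}

func main() {
	encoded, back, err := roundTrip(RuntimeInfo{
		PID:        4242,
		Addr:       "127.0.0.1:7373",
		Port:       7373,
		Version:    "v0.17.0",
		SourcePath: "/tmp/example/runtime.json", // hypothetical path
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)                // {"pid":4242,"addr":"127.0.0.1:7373","port":7373,"version":"v0.17.0"}
	fmt.Println(back.SourcePath == "") // true: the path never reaches the file
}
```

This is why a reader such as ListAllRuntimes has to set SourcePath itself after decoding each file.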
func GetAnyRunningDaemon ¶ added in v0.16.3
func GetAnyRunningDaemon() (*RuntimeInfo, error)
GetAnyRunningDaemon returns info about a responsive daemon. Returns os.ErrNotExist if no responsive daemon is found.
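Because the not-found case is reported as os.ErrNotExist, callers can branch on it with errors.Is. The sketch below uses a hypothetical findDaemon in place of GetAnyRunningDaemon so that it runs without a daemon.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// findDaemon is a hypothetical stand-in for GetAnyRunningDaemon: it
// returns the first candidate address, or os.ErrNotExist if there is none.
func findDaemon(candidates []string) (string, error) {
	if len(candidates) == 0 {
		return "", os.ErrNotExist
	}
	return candidates[0], nil
}

// describe separates "no daemon" from other failures.
func describe(candidates []string) string {
	addr, err := findDaemon(candidates)
	switch {
	case errors.Is(err, os.ErrNotExist):
		return "no daemon running"
	case err != nil:
		return "lookup failed: " + err.Error()
	default:
		return "daemon at " + addr
	}
}

func main() {
	fmt.Println(describe(nil))                        // no daemon running
	fmt.Println(describe([]string{"127.0.0.1:7373"})) // daemon at 127.0.0.1:7373
}
```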
func ListAllRuntimes ¶ added in v0.16.3
func ListAllRuntimes() ([]*RuntimeInfo, error)
ListAllRuntimes returns info for all daemon runtime files found. Sets SourcePath on each RuntimeInfo for proper cleanup. Continues scanning even if some files are unreadable (e.g., permission errors).
func ReadRuntime ¶
func ReadRuntime() (*RuntimeInfo, error)
ReadRuntime reads the daemon runtime info for the current process
func ReadRuntimeForPID ¶ added in v0.16.3
func ReadRuntimeForPID(pid int) (*RuntimeInfo, error)
ReadRuntimeForPID reads the daemon runtime info for a specific PID
type Server ¶
type Server struct {
	// contains filtered or unexported fields
}
Server is the HTTP API server for the daemon
func (*Server) SetSyncWorker ¶
func (s *Server) SetSyncWorker(sw *storage.SyncWorker)
SetSyncWorker sets the sync worker for triggering manual syncs
type StaticConfig ¶ added in v0.15.0
type StaticConfig struct {
	// contains filtered or unexported fields
}
StaticConfig wraps a config for use without hot-reloading (e.g., in tests)
func NewStaticConfig ¶ added in v0.15.0
func NewStaticConfig(cfg *config.Config) *StaticConfig
NewStaticConfig creates a ConfigGetter that always returns the same config
func (*StaticConfig) Config ¶ added in v0.15.0
func (sc *StaticConfig) Config() *config.Config
Config returns the static config
type Subscriber ¶
type Subscriber struct {
	ID       int
	RepoPath string // Filter: only send events for this repo (empty = all)
	Ch       chan Event
}
Subscriber represents a client subscribed to events
type WorkerPool ¶
type WorkerPool struct {
	// contains filtered or unexported fields
}
WorkerPool manages a pool of review workers
func NewWorkerPool ¶
func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broadcaster Broadcaster, errorLog *ErrorLog) *WorkerPool
NewWorkerPool creates a new worker pool
func (*WorkerPool) ActiveWorkers ¶
func (wp *WorkerPool) ActiveWorkers() int
ActiveWorkers returns the number of currently active workers
func (*WorkerPool) CancelJob ¶
func (wp *WorkerPool) CancelJob(jobID int64) bool
CancelJob cancels a running job by its ID, killing the subprocess. Returns true if the job was canceled or marked for pending cancellation. Returns false only if the job doesn't exist or isn't in a cancellable state.
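One way to obtain the return values described above is to track a cancel function per running job and a pending flag for jobs that have not started yet. Everything in this sketch (the pool type, its maps, enqueue, and start) is hypothetical; the real WorkerPool's bookkeeping is unexported and may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// pool is a hypothetical, minimal model of job cancellation state.
type pool struct {
	mu      sync.Mutex
	queued  map[int64]bool               // jobs waiting for a worker
	running map[int64]context.CancelFunc // jobs currently executing
	pending map[int64]bool               // queued jobs with a cancellation request
}

func newPool() *pool {
	return &pool{
		queued:  map[int64]bool{},
		running: map[int64]context.CancelFunc{},
		pending: map[int64]bool{},
	}
}

func (p *pool) enqueue(id int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.queued[id] = true
}

// start moves a job to running and returns its context. A subprocess
// launched with exec.CommandContext on this context is killed on cancel.
func (p *pool) start(id int64) context.Context {
	p.mu.Lock()
	defer p.mu.Unlock()
	ctx, cancel := context.WithCancel(context.Background())
	delete(p.queued, id)
	p.running[id] = cancel
	if p.pending[id] {
		delete(p.pending, id)
		cancel() // honor a cancellation requested while the job was queued
	}
	return ctx
}

// cancelJob returns true if the job was canceled or marked for pending
// cancellation, and false if the job is unknown.
func (p *pool) cancelJob(id int64) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if cancel, ok := p.running[id]; ok {
		cancel()
		return true
	}
	if p.queued[id] {
		p.pending[id] = true
		return true
	}
	return false
}

func main() {
	p := newPool()
	p.enqueue(1)
	ctx := p.start(1)
	fmt.Println(p.cancelJob(1), ctx.Err() != nil) // true true
	fmt.Println(p.cancelJob(99))                  // false
}
```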
func (*WorkerPool) MaxWorkers ¶ added in v0.15.0
func (wp *WorkerPool) MaxWorkers() int
MaxWorkers returns the total number of workers in the pool