daemon

package
v0.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultPollInterval = 2 * time.Second

DefaultPollInterval is the default polling interval for WaitForReview. Tests can override this to speed up polling-based tests.

Functions

func DefaultErrorLogPath

func DefaultErrorLogPath() string

DefaultErrorLogPath returns the default path for the error log

func FindAvailablePort

func FindAvailablePort(startAddr string) (string, int, error)

FindAvailablePort finds an available port starting from the given port

func RemoveRuntime

func RemoveRuntime()

RemoveRuntime removes the runtime info file

func RuntimePath

func RuntimePath() string

RuntimePath returns the path to the runtime info file

func WriteRuntime

func WriteRuntime(addr string, port int, version string) error

WriteRuntime saves the daemon runtime info

Types

type AddResponseRequest

type AddResponseRequest struct {
	SHA       string `json:"sha,omitempty"`    // Legacy: link to commit by SHA
	JobID     int64  `json:"job_id,omitempty"` // Preferred: link to job
	Responder string `json:"responder"`
	Response  string `json:"response"`
}

type AddressReviewRequest

type AddressReviewRequest struct {
	ReviewID  int64 `json:"review_id"`
	Addressed bool  `json:"addressed"`
}

type Broadcaster

type Broadcaster interface {
	Subscribe(repoPath string) (int, <-chan Event)
	Unsubscribe(id int)
	Broadcast(event Event)
	SubscriberCount() int
}

Broadcaster interface manages event subscriptions and broadcasting

func NewBroadcaster

func NewBroadcaster() Broadcaster

NewBroadcaster creates a new event broadcaster

type CancelJobRequest

type CancelJobRequest struct {
	JobID int64 `json:"job_id"`
}

type Client

type Client interface {
	// GetReviewBySHA retrieves a review by commit SHA
	GetReviewBySHA(sha string) (*storage.Review, error)

	// GetReviewByJobID retrieves a review by job ID
	GetReviewByJobID(jobID int64) (*storage.Review, error)

	// MarkReviewAddressed marks a review as addressed
	MarkReviewAddressed(reviewID int64) error

	// AddResponse adds a response to a job
	AddResponse(jobID int64, responder, response string) error

	// EnqueueReview enqueues a review job and returns the job ID
	EnqueueReview(repoPath, gitRef, agentName string) (int64, error)

	// WaitForReview waits for a job to complete and returns the review
	WaitForReview(jobID int64) (*storage.Review, error)

	// FindJobForCommit finds a job for a specific commit in a repo
	FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)

	// FindPendingJobForRef finds a queued or running job for any git ref
	FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)

	// GetResponsesForJob fetches responses for a job
	GetResponsesForJob(jobID int64) ([]storage.Response, error)
}

Client provides an interface for interacting with the roborev daemon. This abstraction allows for easy mocking in tests.

type ConfigGetter added in v0.15.0

type ConfigGetter interface {
	Config() *config.Config
}

ConfigGetter provides access to the current config

type ConfigWatcher added in v0.15.0

type ConfigWatcher struct {
	// contains filtered or unexported fields
}

ConfigWatcher watches config.toml for changes and reloads configuration.

Hot-reloadable settings take effect immediately: default_agent, job_timeout, allow_unsafe_agents, anthropic_api_key, review_context_count.

Settings requiring restart: server_addr, max_workers, sync section. These are read at startup and the running values are preserved even if the config file changes. CLI flag overrides (--addr, --workers) only apply to restart-required settings, so they remain in effect for the daemon's lifetime. The config object may show file values after reload, but the actual running server address and worker pool size are fixed at startup.

func NewConfigWatcher added in v0.15.0

func NewConfigWatcher(configPath string, cfg *config.Config, broadcaster Broadcaster) *ConfigWatcher

NewConfigWatcher creates a new config watcher

func (*ConfigWatcher) Config added in v0.15.0

func (cw *ConfigWatcher) Config() *config.Config

Config returns the current config with read lock

func (*ConfigWatcher) LastReloadedAt added in v0.15.0

func (cw *ConfigWatcher) LastReloadedAt() time.Time

LastReloadedAt returns the time of the last successful config reload

func (*ConfigWatcher) Start added in v0.15.0

func (cw *ConfigWatcher) Start(ctx context.Context) error

Start begins watching the config file for changes

func (*ConfigWatcher) Stop added in v0.15.0

func (cw *ConfigWatcher) Stop()

Stop stops the config watcher. Safe to call multiple times.

type EnqueueRequest

type EnqueueRequest struct {
	RepoPath     string `json:"repo_path"`
	CommitSHA    string `json:"commit_sha,omitempty"` // Single commit (for backwards compat)
	GitRef       string `json:"git_ref,omitempty"`    // Single commit, range like "abc..def", or "dirty"
	Agent        string `json:"agent,omitempty"`
	DiffContent  string `json:"diff_content,omitempty"`  // Pre-captured diff for dirty reviews
	Reasoning    string `json:"reasoning,omitempty"`     // Reasoning level: thorough, standard, fast
	CustomPrompt string `json:"custom_prompt,omitempty"` // Custom prompt for ad-hoc agent work
	Agentic      bool   `json:"agentic,omitempty"`       // Enable agentic mode (allow file edits)
}

type ErrorEntry

type ErrorEntry struct {
	Timestamp time.Time `json:"ts"`
	Level     string    `json:"level"`     // "error", "warn"
	Component string    `json:"component"` // "worker", "sync", "server"
	Message   string    `json:"message"`
	JobID     int64     `json:"job_id,omitempty"`
}

ErrorEntry represents a single error log entry

type ErrorLog

type ErrorLog struct {
	// contains filtered or unexported fields
}

ErrorLog manages error logging to a file and maintains an in-memory ring buffer

func NewErrorLog

func NewErrorLog(path string) (*ErrorLog, error)

NewErrorLog creates a new error log writer

func (*ErrorLog) Close

func (e *ErrorLog) Close() error

Close closes the error log file

func (*ErrorLog) Count24h

func (e *ErrorLog) Count24h() int

Count24h returns the count of errors in the last 24 hours from the in-memory buffer. Note: This only counts up to maxRecent (100) entries. If error volume is high, the actual 24h count may be higher. For precise counts, parse the log file.

func (*ErrorLog) Log

func (e *ErrorLog) Log(level, component, message string, jobID int64)

Log writes an error entry to both file and in-memory buffer

func (*ErrorLog) LogError

func (e *ErrorLog) LogError(component, message string, jobID int64)

LogError is a convenience method for logging errors

func (*ErrorLog) LogWarn

func (e *ErrorLog) LogWarn(component, message string, jobID int64)

LogWarn is a convenience method for logging warnings

func (*ErrorLog) Recent

func (e *ErrorLog) Recent() []ErrorEntry

Recent returns the most recent error entries (newest first)

func (*ErrorLog) RecentN

func (e *ErrorLog) RecentN(n int) []ErrorEntry

RecentN returns up to n most recent error entries (newest first)

type ErrorResponse

type ErrorResponse struct {
	Error string `json:"error"`
}

type Event

type Event struct {
	Type     string    `json:"type"`
	TS       time.Time `json:"ts"`
	JobID    int64     `json:"job_id"`
	Repo     string    `json:"repo"`
	RepoName string    `json:"repo_name"`
	SHA      string    `json:"sha"`
	Agent    string    `json:"agent,omitempty"`
	Verdict  string    `json:"verdict,omitempty"`
	Error    string    `json:"error,omitempty"`
}

Event represents a review event that can be broadcast

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

MarshalJSON converts an Event to JSON for streaming

type EventBroadcaster

type EventBroadcaster struct {
	// contains filtered or unexported fields
}

EventBroadcaster implements the Broadcaster interface

func (*EventBroadcaster) Broadcast

func (b *EventBroadcaster) Broadcast(event Event)

Broadcast sends an event to all matching subscribers Non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber

func (*EventBroadcaster) Subscribe

func (b *EventBroadcaster) Subscribe(repoPath string) (int, <-chan Event)

Subscribe adds a new subscriber with optional repo filter Returns a subscriber ID and event channel

func (*EventBroadcaster) SubscriberCount

func (b *EventBroadcaster) SubscriberCount() int

SubscriberCount returns the current number of subscribers (for testing)

func (*EventBroadcaster) Unsubscribe

func (b *EventBroadcaster) Unsubscribe(id int)

Unsubscribe removes a subscriber and closes its channel

type HTTPClient

type HTTPClient struct {
	// contains filtered or unexported fields
}

HTTPClient is the default HTTP-based implementation of Client

func NewHTTPClient

func NewHTTPClient(addr string) *HTTPClient

NewHTTPClient creates a new HTTP daemon client

func NewHTTPClientFromRuntime

func NewHTTPClientFromRuntime() (*HTTPClient, error)

NewHTTPClientFromRuntime creates an HTTP client using daemon runtime info

func (*HTTPClient) AddResponse

func (c *HTTPClient) AddResponse(jobID int64, responder, response string) error

func (*HTTPClient) EnqueueReview

func (c *HTTPClient) EnqueueReview(repoPath, gitRef, agentName string) (int64, error)

func (*HTTPClient) FindJobForCommit

func (c *HTTPClient) FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)

func (*HTTPClient) FindPendingJobForRef

func (c *HTTPClient) FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)

func (*HTTPClient) GetResponsesForJob

func (c *HTTPClient) GetResponsesForJob(jobID int64) ([]storage.Response, error)

func (*HTTPClient) GetReviewByJobID

func (c *HTTPClient) GetReviewByJobID(jobID int64) (*storage.Review, error)

func (*HTTPClient) GetReviewBySHA

func (c *HTTPClient) GetReviewBySHA(sha string) (*storage.Review, error)

func (*HTTPClient) MarkReviewAddressed

func (c *HTTPClient) MarkReviewAddressed(reviewID int64) error

func (*HTTPClient) SetPollInterval

func (c *HTTPClient) SetPollInterval(interval time.Duration)

SetPollInterval sets the polling interval for WaitForReview

func (*HTTPClient) WaitForReview

func (c *HTTPClient) WaitForReview(jobID int64) (*storage.Review, error)

type RerunJobRequest

type RerunJobRequest struct {
	JobID int64 `json:"job_id"`
}

type RuntimeInfo

type RuntimeInfo struct {
	PID     int    `json:"pid"`
	Addr    string `json:"addr"`
	Port    int    `json:"port"`
	Version string `json:"version"`
}

RuntimeInfo stores daemon runtime state

func ReadRuntime

func ReadRuntime() (*RuntimeInfo, error)

ReadRuntime reads the daemon runtime info

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the HTTP API server for the daemon

func NewServer

func NewServer(db *storage.DB, cfg *config.Config, configPath string) *Server

NewServer creates a new daemon server

func (*Server) SetSyncWorker

func (s *Server) SetSyncWorker(sw *storage.SyncWorker)

SetSyncWorker sets the sync worker for triggering manual syncs

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start begins the server and worker pool

func (*Server) Stop

func (s *Server) Stop() error

Stop gracefully shuts down the server

type StaticConfig added in v0.15.0

type StaticConfig struct {
	// contains filtered or unexported fields
}

StaticConfig wraps a config for use without hot-reloading (e.g., in tests)

func NewStaticConfig added in v0.15.0

func NewStaticConfig(cfg *config.Config) *StaticConfig

NewStaticConfig creates a ConfigGetter that always returns the same config

func (*StaticConfig) Config added in v0.15.0

func (sc *StaticConfig) Config() *config.Config

Config returns the static config

type Subscriber

type Subscriber struct {
	ID       int
	RepoPath string // Filter: only send events for this repo (empty = all)
	Ch       chan Event
}

Subscriber represents a client subscribed to events

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of review workers

func NewWorkerPool

func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broadcaster Broadcaster, errorLog *ErrorLog) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) ActiveWorkers

func (wp *WorkerPool) ActiveWorkers() int

ActiveWorkers returns the number of currently active workers

func (*WorkerPool) CancelJob

func (wp *WorkerPool) CancelJob(jobID int64) bool

CancelJob cancels a running job by its ID, killing the subprocess. Returns true if the job was canceled or marked for pending cancellation. Returns false only if the job doesn't exist or isn't in a cancellable state.

func (*WorkerPool) MaxWorkers added in v0.15.0

func (wp *WorkerPool) MaxWorkers() int

MaxWorkers returns the total number of workers in the pool

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start begins the worker pool

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop gracefully shuts down the worker pool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL