daemon

package
v0.51.0
Published: Apr 8, 2026 License: MIT Imports: 52 Imported by: 0

Documentation

Constants

const MaxErrorLogEntries = 100

MaxErrorLogEntries is the maximum number of error log entries kept in memory

Variables

var DefaultPollInterval = 2 * time.Second

DefaultPollInterval is the default polling interval for WaitForReview. Tests can override this to speed up polling-based tests.

var MaxUnixPathLen = func() int {
	if runtime.GOOS == "darwin" {
		return 104
	}
	return 108
}()

MaxUnixPathLen is the platform socket path length limit. macOS/BSD: 104, Linux: 108.

Functions

func CleanJobLogs added in v0.35.0

func CleanJobLogs(maxAge time.Duration) int

CleanJobLogs removes log files older than maxAge and returns the number of files removed.

func CleanupZombieDaemons added in v0.16.3

func CleanupZombieDaemons(target DaemonEndpoint) int

CleanupZombieDaemons finds and kills all unresponsive daemons. Returns the number of zombies cleaned up.

func DefaultActivityLogPath added in v0.34.0

func DefaultActivityLogPath() string

DefaultActivityLogPath returns the default path for the activity log

func DefaultErrorLogPath

func DefaultErrorLogPath() string

DefaultErrorLogPath returns the default path for the error log

func DefaultSocketPath added in v0.49.0

func DefaultSocketPath() string

DefaultSocketPath returns the auto-generated socket path under os.TempDir(), or $XDG_RUNTIME_DIR when it is an absolute path to an existing directory and the resulting socket path fits within MaxUnixPathLen.

func DeleteCompactMetadata added in v0.33.0

func DeleteCompactMetadata(jobID int64) error

DeleteCompactMetadata removes the metadata file after processing

func ExactReposOnly added in v0.36.0

func ExactReposOnly(repos []string) []string

ExactReposOnly returns the entries of repos (typically ci.Repos) that are exact names, with no glob characters. It is used as a fallback when resolver expansion fails.

func FindAvailablePort

func FindAvailablePort(startAddr string) (string, int, error)

FindAvailablePort finds an available port starting from the configured port. After zombie cleanup, this should usually succeed on the first try. Falls back to searching if the port is still in use (e.g., by another service).

func IsDaemonAlive added in v0.16.3

func IsDaemonAlive(ep DaemonEndpoint) bool

IsDaemonAlive checks if a daemon at the given endpoint is actually responding. This is more reliable than checking PID and works cross-platform. Only allows loopback addresses (for TCP) to prevent SSRF via malicious runtime files. Uses retry logic to avoid misclassifying a slow or transiently failing daemon.
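A loopback-only guard of the kind mentioned above might look like the sketch below. The function name isLoopbackAddr and the decision to accept the literal host "localhost" are assumptions for illustration. The package's actual check is not shown in this documentation.

```go
package main

import (
	"fmt"
	"net"
)

// isLoopbackAddr reports whether a "host:port" address targets the local
// machine. Addresses without a port are rejected.
func isLoopbackAddr(addr string) bool {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false
	}
	if host == "localhost" { // assumption: treat the name as loopback
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

func main() {
	for _, addr := range []string{"127.0.0.1:7373", "[::1]:7373", "10.0.0.5:7373"} {
		fmt.Println(addr, isLoopbackAddr(addr))
	}
}
```

Rejecting non-loopback hosts before any request is made is what keeps a tampered runtime file from steering the probe at an arbitrary server.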

func IsValidCompactOutput added in v0.33.0

func IsValidCompactOutput(output string) bool

IsValidCompactOutput checks whether compact agent output looks like a real response (vs. empty or an obvious error/stack trace). Intentionally permissive — we don't try to parse the review content.

func JobLogDir added in v0.35.0

func JobLogDir() string

JobLogDir returns the directory for per-job log files.

func JobLogExists added in v0.35.0

func JobLogExists(jobID int64) bool

JobLogExists reports whether a log file exists for the given job.

func JobLogPath added in v0.35.0

func JobLogPath(jobID int64) string

JobLogPath returns the log file path for a given job ID.

func KillDaemon added in v0.16.3

func KillDaemon(info *RuntimeInfo) bool

KillDaemon attempts to shut down a daemon gracefully, then force-kills it if needed. It returns true if the daemon was killed or is no longer running. The runtime file is removed only if the daemon is confirmed dead.

func LegacyRuntimePath added in v0.16.3

func LegacyRuntimePath() string

LegacyRuntimePath returns the old daemon.json path for migration

func ParseJobIDFromLogName added in v0.35.0

func ParseJobIDFromLogName(name string) (int64, bool)

ParseJobIDFromLogName extracts the job ID from a log file name like "42.log". Returns 0, false if the name doesn't match.
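For readers who want to see the shape of such a parser, here is a minimal sketch. parseJobID is a hypothetical stand-in, and rejecting non-positive IDs is an assumption. The real function may accept or reject different inputs.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseJobID extracts the numeric stem from a name like "42.log".
// It returns 0, false for names without the ".log" suffix, stems that
// do not parse as an integer, and non-positive IDs.
func parseJobID(name string) (int64, bool) {
	if !strings.HasSuffix(name, ".log") {
		return 0, false
	}
	stem := strings.TrimSuffix(name, ".log")
	id, err := strconv.ParseInt(stem, 10, 64)
	if err != nil || id <= 0 {
		return 0, false
	}
	return id, true
}

func main() {
	for _, name := range []string{"42.log", "notes.txt", "abc.log"} {
		id, ok := parseJobID(name)
		fmt.Println(name, id, ok)
	}
}
```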

func ReadJobLog added in v0.35.0

func ReadJobLog(jobID int64) ([]byte, error)

ReadJobLog returns the contents of the log file for a job, or an error if the file doesn't exist.

func RemoveRuntime

func RemoveRuntime()

RemoveRuntime removes the runtime info file for the current process

func RemoveRuntimeForPID added in v0.16.3

func RemoveRuntimeForPID(pid int)

RemoveRuntimeForPID removes the runtime info file for a specific PID

func RuntimePath

func RuntimePath() string

RuntimePath returns the path to the runtime info file for the current process

func RuntimePathForPID added in v0.16.3

func RuntimePathForPID(pid int) string

RuntimePathForPID returns the path to the runtime info file for a specific PID

func WriteRuntime

func WriteRuntime(ep DaemonEndpoint, version string) error

WriteRuntime saves the daemon runtime info atomically. Uses write-to-temp-then-rename to prevent readers from seeing partial writes.

Types

type ActivityEntry added in v0.34.0

type ActivityEntry struct {
	Timestamp time.Time         `json:"ts"`
	Event     string            `json:"event"`
	Component string            `json:"component"`
	Message   string            `json:"message"`
	Details   map[string]string `json:"details,omitempty"`
}

ActivityEntry represents a single activity log entry

type ActivityLog added in v0.34.0

type ActivityLog struct {
	// contains filtered or unexported fields
}

ActivityLog manages activity logging to a JSONL file and maintains an in-memory ring buffer. Follows the same pattern as ErrorLog.

func NewActivityLog added in v0.34.0

func NewActivityLog(path string) (*ActivityLog, error)

NewActivityLog creates a new activity log writer. If the existing file exceeds maxActivityLogSize it is truncated.

func (*ActivityLog) Close added in v0.34.0

func (a *ActivityLog) Close() error

Close closes the activity log file

func (*ActivityLog) Log added in v0.34.0

func (a *ActivityLog) Log(
	event, component, message string,
	details map[string]string,
)

Log writes an activity entry to both file and in-memory buffer. The details map is copied; callers may safely mutate it after calling Log.

func (*ActivityLog) Recent added in v0.34.0

func (a *ActivityLog) Recent() []ActivityEntry

Recent returns all activity entries in the ring buffer (newest first)

func (*ActivityLog) RecentN added in v0.34.0

func (a *ActivityLog) RecentN(n int) []ActivityEntry

RecentN returns up to n most recent entries (newest first)
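The in-memory side of this design, a fixed-size ring buffer read newest first, can be illustrated with a short sketch. The ring type and its methods are hypothetical and store plain strings. The package's unexported fields and capacity are not shown in this documentation.

```go
package main

import "fmt"

// ring is a fixed-capacity buffer that overwrites its oldest entry when
// full. The capacity must be greater than zero.
type ring struct {
	buf   []string
	next  int // index where the next entry is written
	count int // number of valid entries, at most len(buf)
}

func newRing(capacity int) *ring {
	return &ring{buf: make([]string, capacity)}
}

func (r *ring) add(s string) {
	r.buf[r.next] = s
	r.next = (r.next + 1) % len(r.buf)
	if r.count < len(r.buf) {
		r.count++
	}
}

// recentN returns up to n entries, newest first.
func (r *ring) recentN(n int) []string {
	if n > r.count {
		n = r.count
	}
	if n < 0 {
		n = 0
	}
	out := make([]string, 0, n)
	for i := 1; i <= n; i++ {
		idx := (r.next - i + len(r.buf)) % len(r.buf)
		out = append(out, r.buf[idx])
	}
	return out
}

func main() {
	r := newRing(3)
	for _, s := range []string{"a", "b", "c", "d"} {
		r.add(s)
	}
	fmt.Println(r.recentN(2)) // [d c]
}
```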

type AddCommentInput added in v0.51.0

type AddCommentInput struct {
	Body AddCommentRequest
}

AddCommentInput is the request body for adding a comment.

type AddCommentOutput added in v0.51.0

type AddCommentOutput struct {
	Body *storage.Response
}

AddCommentOutput is the response for POST /api/comment.

type AddCommentRequest added in v0.17.0

type AddCommentRequest struct {
	SHA       string `json:"sha,omitempty"`    // Legacy: link to commit by SHA
	JobID     int64  `json:"job_id,omitempty"` // Preferred: link to job
	Commenter string `json:"commenter"`
	Comment   string `json:"comment"`
}

AddCommentRequest is the JSON body for POST /api/comment.

type Broadcaster

type Broadcaster interface {
	Subscribe(repoPath string) (int, <-chan Event)
	Unsubscribe(id int)
	Broadcast(event Event)
	SubscriberCount() int
}

Broadcaster interface manages event subscriptions and broadcasting

func NewBroadcaster

func NewBroadcaster() Broadcaster

NewBroadcaster creates a new event broadcaster

type CIPoller added in v0.26.0

type CIPoller struct {
	// contains filtered or unexported fields
}

CIPoller polls GitHub for open PRs and enqueues security reviews. It also listens for review.completed events and posts results as PR comments.

func NewCIPoller added in v0.26.0

func NewCIPoller(db *storage.DB, cfgGetter ConfigGetter, broadcaster Broadcaster) *CIPoller

NewCIPoller creates a new CI poller. If GitHub App is configured, it initializes a token provider so gh commands authenticate as the app bot instead of the user's personal account.

func (*CIPoller) HealthCheck added in v0.26.0

func (p *CIPoller) HealthCheck() (bool, string)

HealthCheck returns whether the CI poller is healthy

func (*CIPoller) Start added in v0.26.0

func (p *CIPoller) Start() error

Start begins polling for PRs

func (*CIPoller) Stop added in v0.26.0

func (p *CIPoller) Stop()

Stop gracefully shuts down the CI poller

type CancelJobInput added in v0.51.0

type CancelJobInput struct {
	Body CancelJobRequest
}

CancelJobInput is the request body for canceling a job.

type CancelJobOutput added in v0.51.0

type CancelJobOutput struct {
	Body struct {
		Success bool `json:"success"`
	}
}

CancelJobOutput is the response for POST /api/job/cancel.

type CancelJobRequest

type CancelJobRequest struct {
	JobID int64 `json:"job_id"`
}

CancelJobRequest is the JSON body for POST /api/job/cancel.

type Client

type Client interface {
	// GetReviewBySHA retrieves a review by commit SHA
	GetReviewBySHA(sha string) (*storage.Review, error)

	// GetReviewByJobID retrieves a review by job ID
	GetReviewByJobID(jobID int64) (*storage.Review, error)

	// MarkReviewClosed marks a review as closed by job ID
	MarkReviewClosed(jobID int64) error

	// AddComment adds a comment to a job
	AddComment(jobID int64, commenter, comment string) error

	// EnqueueReview enqueues a review job and returns the job ID
	EnqueueReview(repoPath, gitRef, agentName string) (int64, error)

	// WaitForReview waits for a job to complete and returns the review
	WaitForReview(jobID int64) (*storage.Review, error)

	// FindJobForCommit finds a job for a specific commit in a repo
	FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)

	// FindPendingJobForRef finds a queued or running job for any git ref
	FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)

	// GetCommentsForJob fetches comments for a job
	GetCommentsForJob(jobID int64) ([]storage.Response, error)

	// Remap updates git_ref for jobs whose commits were rewritten
	Remap(req RemapRequest) (*RemapResult, error)
}

Client provides an interface for interacting with the roborev daemon. This abstraction allows for easy mocking in tests.

type CloseReviewInput added in v0.51.0

type CloseReviewInput struct {
	Body CloseReviewRequest
}

CloseReviewInput is the request body for closing/reopening a review.

type CloseReviewOutput added in v0.51.0

type CloseReviewOutput struct {
	Body struct {
		Success bool `json:"success"`
	}
}

CloseReviewOutput is the response for POST /api/review/close.

type CloseReviewRequest added in v0.40.0

type CloseReviewRequest struct {
	JobID  int64 `json:"job_id"`
	Closed bool  `json:"closed"`
}

CloseReviewRequest is the JSON body for POST /api/review/close.

type CompactMetadata added in v0.33.0

type CompactMetadata struct {
	SourceJobIDs []int64 `json:"source_job_ids"`
}

CompactMetadata stores source job IDs for a compact job

func ReadCompactMetadata added in v0.33.0

func ReadCompactMetadata(jobID int64) (*CompactMetadata, error)

ReadCompactMetadata retrieves source job IDs for a compact job

type ConfigGetter added in v0.15.0

type ConfigGetter interface {
	Config() *config.Config
}

ConfigGetter provides access to the current config

type ConfigWatcher added in v0.15.0

type ConfigWatcher struct {
	// contains filtered or unexported fields
}

ConfigWatcher watches config.toml for changes and reloads configuration.

Hot-reloadable settings take effect immediately: default_agent, job_timeout, allow_unsafe_agents, anthropic_api_key, review_context_count.

Settings requiring restart: server_addr, max_workers, sync section. These are read at startup and the running values are preserved even if the config file changes. CLI flag overrides (--addr, --workers) only apply to restart-required settings, so they remain in effect for the daemon's lifetime. The config object may show file values after reload, but the actual running server address and worker pool size are fixed at startup.

Note: ConfigWatcher is not restart-safe. Once Stop() is called, Start() will return an error. Create a new ConfigWatcher instance if restart is needed.

func NewConfigWatcher added in v0.15.0

func NewConfigWatcher(configPath string, cfg *config.Config, broadcaster Broadcaster, activityLog *ActivityLog) *ConfigWatcher

NewConfigWatcher creates a new config watcher

func (*ConfigWatcher) Config added in v0.15.0

func (cw *ConfigWatcher) Config() *config.Config

Config returns the current config with read lock

func (*ConfigWatcher) LastReloadedAt added in v0.15.0

func (cw *ConfigWatcher) LastReloadedAt() time.Time

LastReloadedAt returns the time of the last successful config reload

func (*ConfigWatcher) ReloadCounter added in v0.15.1

func (cw *ConfigWatcher) ReloadCounter() uint64

ReloadCounter returns a monotonic counter incremented on each reload. Use this instead of timestamp comparison to detect reloads that happen within the same second.
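A caller might use the counter as in the sketch below. fakeWatcher is a hypothetical stand-in for *ConfigWatcher so the example runs on its own, and reloadedSince is an illustrative helper, not part of the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// reloadSource is the one method this sketch needs. *ConfigWatcher
// exposes a method with this signature.
type reloadSource interface {
	ReloadCounter() uint64
}

// fakeWatcher is a hypothetical stand-in used only for this example.
type fakeWatcher struct{ n atomic.Uint64 }

func (f *fakeWatcher) ReloadCounter() uint64 { return f.n.Load() }
func (f *fakeWatcher) reload()               { f.n.Add(1) }

// reloadedSince reports whether at least one reload happened after the
// counter value before was observed.
func reloadedSince(src reloadSource, before uint64) bool {
	return src.ReloadCounter() != before
}

func main() {
	w := &fakeWatcher{}
	before := w.ReloadCounter()
	fmt.Println(reloadedSince(w, before)) // false
	w.reload()
	fmt.Println(reloadedSince(w, before)) // true
}
```

Comparing counters avoids the case where two reloads land inside the same timestamp granularity and look like one.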

func (*ConfigWatcher) Start added in v0.15.0

func (cw *ConfigWatcher) Start(ctx context.Context) error

Start begins watching the config file for changes. Returns an error if the watcher has already been stopped.

func (*ConfigWatcher) Stop added in v0.15.0

func (cw *ConfigWatcher) Stop()

Stop stops the config watcher. Safe to call multiple times.

type DaemonEndpoint added in v0.49.0

type DaemonEndpoint struct {
	Network string // "tcp" or "unix"
	Address string // "127.0.0.1:7373" or "/tmp/roborev-1000/daemon.sock"
}

DaemonEndpoint encapsulates the transport type and address for the daemon.

func ParseEndpoint added in v0.49.0

func ParseEndpoint(serverAddr string) (DaemonEndpoint, error)

ParseEndpoint parses a server_addr config value into a DaemonEndpoint.

func (DaemonEndpoint) BaseURL added in v0.49.0

func (e DaemonEndpoint) BaseURL() string

BaseURL returns the HTTP base URL for constructing API requests.

func (DaemonEndpoint) ConfigAddr added in v0.49.0

func (e DaemonEndpoint) ConfigAddr() string

ConfigAddr returns a ParseEndpoint-compatible string suitable for persisting in config or runtime metadata files.

func (DaemonEndpoint) HTTPClient added in v0.49.0

func (e DaemonEndpoint) HTTPClient(timeout time.Duration) *http.Client

HTTPClient returns an http.Client configured for this endpoint's transport.

func (DaemonEndpoint) IsUnix added in v0.49.0

func (e DaemonEndpoint) IsUnix() bool

IsUnix returns true if this endpoint uses a Unix domain socket.

func (DaemonEndpoint) Listener added in v0.49.0

func (e DaemonEndpoint) Listener() (net.Listener, error)

Listener creates a net.Listener bound to this endpoint.

func (DaemonEndpoint) Port added in v0.49.0

func (e DaemonEndpoint) Port() int

Port returns the TCP port, or 0 for Unix sockets.

func (DaemonEndpoint) String added in v0.49.0

func (e DaemonEndpoint) String() string

String returns a human-readable representation for logging.

type EnqueueRequest

type EnqueueRequest struct {
	RepoPath     string `json:"repo_path"`
	CommitSHA    string `json:"commit_sha,omitempty"` // Single commit (for backwards compat)
	GitRef       string `json:"git_ref,omitempty"`    // Single commit, range like "abc..def", or "dirty"
	Branch       string `json:"branch,omitempty"`     // Branch name at time of job creation
	Since        string `json:"since,omitempty"`      // RFC3339 lower bound for insights datasets
	Agent        string `json:"agent,omitempty"`
	Model        string `json:"model,omitempty"`         // Model to use (for opencode: provider/model format)
	DiffContent  string `json:"diff_content,omitempty"`  // Pre-captured diff for dirty reviews
	Reasoning    string `json:"reasoning,omitempty"`     // Reasoning level: thorough, standard, fast
	ReviewType   string `json:"review_type,omitempty"`   // Review type (e.g., "security") — changes system prompt
	CustomPrompt string `json:"custom_prompt,omitempty"` // Custom prompt for ad-hoc agent work
	Agentic      bool   `json:"agentic,omitempty"`       // Enable agentic mode (allow file edits)
	OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output
	JobType      string `json:"job_type,omitempty"`      // Explicit job type (review/range/dirty/task/insights/compact/fix)
	Provider     string `json:"provider,omitempty"`      // Provider for pi agent (e.g., "anthropic")
	MinSeverity  string `json:"min_severity,omitempty"`  // Minimum severity filter: critical, high, medium, low
}

type ErrorEntry

type ErrorEntry struct {
	Timestamp time.Time `json:"ts"`
	Level     string    `json:"level"`     // "error", "warn"
	Component string    `json:"component"` // "worker", "sync", "server"
	Message   string    `json:"message"`
	JobID     int64     `json:"job_id,omitempty"`
}

ErrorEntry represents a single error log entry

type ErrorLog

type ErrorLog struct {
	// contains filtered or unexported fields
}

ErrorLog manages error logging to a file and maintains an in-memory ring buffer

func NewErrorLog

func NewErrorLog(path string) (*ErrorLog, error)

NewErrorLog creates a new error log writer

func (*ErrorLog) Close

func (e *ErrorLog) Close() error

Close closes the error log file

func (*ErrorLog) Count24h

func (e *ErrorLog) Count24h() int

Count24h returns the count of errors in the last 24 hours from the in-memory buffer. Note: This only counts up to maxRecent (100) entries. If error volume is high, the actual 24h count may be higher. For precise counts, parse the log file.

func (*ErrorLog) Log

func (e *ErrorLog) Log(level, component, message string, jobID int64)

Log writes an error entry to both file and in-memory buffer

func (*ErrorLog) LogError

func (e *ErrorLog) LogError(component, message string, jobID int64)

LogError is a convenience method for logging errors

func (*ErrorLog) LogWarn

func (e *ErrorLog) LogWarn(component, message string, jobID int64)

LogWarn is a convenience method for logging warnings

func (*ErrorLog) Recent

func (e *ErrorLog) Recent() []ErrorEntry

Recent returns the most recent error entries (newest first)

func (*ErrorLog) RecentN

func (e *ErrorLog) RecentN(n int) []ErrorEntry

RecentN returns up to n most recent error entries (newest first)

type ErrorResponse

type ErrorResponse struct {
	Error string `json:"error"`
}

type Event

type Event struct {
	Type         string    `json:"type"`
	TS           time.Time `json:"ts"`
	JobID        int64     `json:"job_id"`
	Repo         string    `json:"repo"`
	RepoName     string    `json:"repo_name"`
	SHA          string    `json:"sha"`
	Agent        string    `json:"agent,omitempty"`
	Verdict      string    `json:"verdict,omitempty"`
	Findings     string    `json:"findings,omitempty"`
	Error        string    `json:"error,omitempty"`
	WorktreePath string    `json:"worktree_path,omitempty"`
}

Event represents a review event that can be broadcast

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

MarshalJSON converts an Event to JSON for streaming

type EventBroadcaster

type EventBroadcaster struct {
	// contains filtered or unexported fields
}

EventBroadcaster implements the Broadcaster interface

func (*EventBroadcaster) Broadcast

func (b *EventBroadcaster) Broadcast(event Event)

Broadcast sends an event to all matching subscribers. It is non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber.
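The drop-on-full behavior is the standard Go idiom of a select with a default case. The sketch below shows it on plain string channels. The broadcast function and its dropped-count return value are assumptions for illustration, and repo filtering is omitted.

```go
package main

import "fmt"

// broadcast delivers ev to every subscriber channel without blocking and
// returns how many subscribers missed it because their buffer was full.
func broadcast(subs []chan string, ev string) (dropped int) {
	for _, ch := range subs {
		select {
		case ch <- ev:
			// delivered
		default:
			dropped++ // buffer full: skip this subscriber
		}
	}
	return dropped
}

func main() {
	fast := make(chan string, 2)
	slow := make(chan string, 1)
	subs := []chan string{fast, slow}

	fmt.Println(broadcast(subs, "review.started"))   // 0
	fmt.Println(broadcast(subs, "review.completed")) // 1
}
```

The trade-off is that a slow subscriber loses events instead of stalling every other subscriber, so consumers that need a complete history should not rely on the channel alone.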

func (*EventBroadcaster) Subscribe

func (b *EventBroadcaster) Subscribe(repoPath string) (int, <-chan Event)

Subscribe adds a new subscriber with an optional repo filter. It returns a subscriber ID and an event channel.

func (*EventBroadcaster) SubscriberCount

func (b *EventBroadcaster) SubscriberCount() int

SubscriberCount returns the current number of subscribers (for testing)

func (*EventBroadcaster) Unsubscribe

func (b *EventBroadcaster) Unsubscribe(id int)

Unsubscribe removes a subscriber and closes its channel

type GetReviewInput added in v0.51.0

type GetReviewInput struct {
	JobID int64  `query:"job_id" default:"-1" doc:"Look up review by job ID"`
	SHA   string `query:"sha" doc:"Look up review by commit SHA"`
}

GetReviewInput holds query parameters for fetching a review.

type GetReviewOutput added in v0.51.0

type GetReviewOutput struct {
	Body *storage.Review
}

GetReviewOutput is the response for GET /api/review.

type GetStatusInput added in v0.51.0

type GetStatusInput struct{}

GetStatusInput is an empty input for the status endpoint.

type GetStatusOutput added in v0.51.0

type GetStatusOutput struct {
	Body storage.DaemonStatus
}

GetStatusOutput is the response for GET /api/status.

type GetSummaryInput added in v0.51.0

type GetSummaryInput struct {
	Since  string `query:"since" doc:"Time window (e.g. 7d, 24h). Default: 7d"`
	Repo   string `query:"repo" doc:"Filter by repo root path"`
	Branch string `query:"branch" doc:"Filter by branch name"`
	All    string `query:"all" doc:"Include per-repo breakdown" enum:"true,false"`
}

GetSummaryInput holds query parameters for the summary endpoint.

type GetSummaryOutput added in v0.51.0

type GetSummaryOutput struct {
	Body *storage.Summary
}

GetSummaryOutput is the response for GET /api/summary.

type GitHubAppTokenProvider added in v0.26.0

type GitHubAppTokenProvider struct {
	// contains filtered or unexported fields
}

GitHubAppTokenProvider obtains GitHub installation access tokens using GitHub App JWT authentication. It caches tokens per installation and refreshes them when within 5 minutes of expiry. Thread-safe.

func NewGitHubAppTokenProvider added in v0.26.0

func NewGitHubAppTokenProvider(appID int64, pemData string) (*GitHubAppTokenProvider, error)

NewGitHubAppTokenProvider creates a token provider from the given PEM data. Supports both PKCS1 and PKCS8 private key formats.

func (*GitHubAppTokenProvider) TokenForInstallation added in v0.26.0

func (p *GitHubAppTokenProvider) TokenForInstallation(installationID int64) (string, error)

TokenForInstallation returns a valid access token for the given installation, refreshing if needed.

type HTTPClient

type HTTPClient struct {
	// contains filtered or unexported fields
}

HTTPClient is the default HTTP-based implementation of Client

func NewHTTPClient

func NewHTTPClient(ep DaemonEndpoint) *HTTPClient

NewHTTPClient creates a new HTTP daemon client

func NewHTTPClientFromRuntime

func NewHTTPClientFromRuntime() (*HTTPClient, error)

NewHTTPClientFromRuntime creates an HTTP client using daemon runtime info

func (*HTTPClient) AddComment added in v0.17.0

func (c *HTTPClient) AddComment(jobID int64, commenter, comment string) error

func (*HTTPClient) EnqueueReview

func (c *HTTPClient) EnqueueReview(repoPath, gitRef, agentName string) (int64, error)

func (*HTTPClient) FindJobForCommit

func (c *HTTPClient) FindJobForCommit(repoPath, sha string) (*storage.ReviewJob, error)

func (*HTTPClient) FindPendingJobForRef

func (c *HTTPClient) FindPendingJobForRef(repoPath, gitRef string) (*storage.ReviewJob, error)

func (*HTTPClient) GetAllCommentsForJob added in v0.51.0

func (c *HTTPClient) GetAllCommentsForJob(jobID, commitID int64, gitRef string) ([]storage.Response, error)

GetAllCommentsForJob fetches comments for a job, merging legacy commit-based comments via storage.MergeResponses. When commitID > 0, fetches legacy by commit ID. Otherwise, if gitRef looks like a SHA, fetches by SHA. Callers may pre-validate gitRef via git.LooksLikeSHA.

func (*HTTPClient) GetCommentsForJob added in v0.17.0

func (c *HTTPClient) GetCommentsForJob(jobID int64) ([]storage.Response, error)

func (*HTTPClient) GetReviewByJobID

func (c *HTTPClient) GetReviewByJobID(jobID int64) (*storage.Review, error)

func (*HTTPClient) GetReviewBySHA

func (c *HTTPClient) GetReviewBySHA(sha string) (*storage.Review, error)

func (*HTTPClient) MarkReviewClosed added in v0.40.0

func (c *HTTPClient) MarkReviewClosed(jobID int64) error

func (*HTTPClient) Remap added in v0.33.0

func (c *HTTPClient) Remap(req RemapRequest) (*RemapResult, error)

Remap sends rewritten commit mappings to the daemon so that review jobs are updated to point at the new SHAs.

func (*HTTPClient) SetPollInterval

func (c *HTTPClient) SetPollInterval(interval time.Duration)

SetPollInterval sets the polling interval for WaitForReview

func (*HTTPClient) WaitForReview

func (c *HTTPClient) WaitForReview(jobID int64) (*storage.Review, error)

type HookRunner added in v0.22.0

type HookRunner struct {
	// contains filtered or unexported fields
}

HookRunner listens for broadcaster events and runs configured hooks.

func NewHookRunner added in v0.22.0

func NewHookRunner(cfgGetter ConfigGetter, broadcaster Broadcaster, logger *log.Logger) *HookRunner

NewHookRunner creates a new HookRunner that subscribes to events from the broadcaster.

func (*HookRunner) Stop added in v0.22.0

func (hr *HookRunner) Stop()

Stop shuts down the hook runner, waits for in-flight hooks to finish, and unsubscribes from the broadcaster. Unsubscribe runs before Wait to prevent the broadcaster from blocking on a full channel after the event loop exits.

func (*HookRunner) WaitUntilIdle added in v0.37.0

func (hr *HookRunner) WaitUntilIdle()

WaitUntilIdle blocks until the currently queued events are drained and all in-flight hooks have finished. It is a point-in-time barrier: events arriving after the drain starts are handled on the next listener iteration.

type JobOutput added in v0.18.0

type JobOutput struct {
	// contains filtered or unexported fields
}

JobOutput stores output for a single job

type JobOutputResponse added in v0.18.0

type JobOutputResponse struct {
	JobID   int64        `json:"job_id"`
	Status  string       `json:"status"`
	Lines   []OutputLine `json:"lines"`
	HasMore bool         `json:"has_more"`
}

JobOutputResponse is the response for GET /api/job/output.

type ListBranchesInput added in v0.51.0

type ListBranchesInput struct {
	Repo []string `query:"repo,explode" doc:"Filter to branches in these repo paths"`
}

ListBranchesInput holds query parameters for listing branches.

type ListBranchesOutput added in v0.51.0

type ListBranchesOutput struct {
	Body struct {
		Branches       []storage.BranchWithCount `json:"branches"`
		TotalCount     int                       `json:"total_count"`
		NullsRemaining int                       `json:"nulls_remaining"`
	}
}

ListBranchesOutput is the response for GET /api/branches.

type ListCommentsInput added in v0.51.0

type ListCommentsInput struct {
	JobID    int64  `query:"job_id" default:"-1" doc:"List comments by job ID"`
	CommitID int64  `query:"commit_id" default:"-1" doc:"List comments by commit ID"`
	SHA      string `query:"sha" doc:"List comments by commit SHA"`
}

ListCommentsInput holds query parameters for listing comments.

type ListCommentsOutput added in v0.51.0

type ListCommentsOutput struct {
	Body struct {
		Responses []storage.Response `json:"responses"`
	}
}

ListCommentsOutput is the response for GET /api/comments.

type ListJobsInput added in v0.51.0

type ListJobsInput struct {
	ID                 int64  `query:"id" default:"-1" doc:"Return a single job by ID"`
	Status             string `query:"status" doc:"Filter by job status"`
	Repo               string `query:"repo" doc:"Filter by repo root path"`
	GitRef             string `query:"git_ref" doc:"Filter by git ref"`
	Branch             string `query:"branch" doc:"Filter by branch name"`
	BranchIncludeEmpty string `query:"branch_include_empty" doc:"Include jobs with no branch when filtering by branch" enum:"true,false,"`
	Closed             string `query:"closed" doc:"Filter by review closed state" enum:"true,false,"`
	JobType            string `query:"job_type" doc:"Filter by job type"`
	ExcludeJobType     string `query:"exclude_job_type" doc:"Exclude jobs of this type"`
	RepoPrefix         string `query:"repo_prefix" doc:"Filter repos by path prefix"`
	Limit              int    `query:"limit" default:"-999999" doc:"Max results (default 50, 0=unlimited, max 10000)"`
	Offset             int    `query:"offset" default:"-1" doc:"Skip N results (requires limit>0)"`
	Before             int64  `query:"before" default:"-1" doc:"Cursor: return jobs with ID < this value"`
}

ListJobsInput holds query parameters for listing jobs. Huma does not support pointer types for query parameters, so we use sentinel defaults to detect presence:

  • ID, Before: default -1 (valid IDs are always positive)
  • Limit: default limitNotProvided (so explicit negative values like -1 are treated as unlimited, matching legacy)
  • Offset: default -1 (negative offsets clamp to 0)
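The sentinel rules listed above could be resolved along these lines. This is a sketch under assumptions: the value of limitNotProvided is taken from the struct tag default (-999999), the default of 50 and the cap of 10000 come from the field's doc string, and clamping (rather than rejecting) values above the cap is a guess.

```go
package main

import "fmt"

const (
	limitNotProvided = -999999 // assumed to match the struct tag default
	defaultLimit     = 50
	maxLimit         = 10000
)

// resolveLimit maps the raw query value to an effective limit,
// where 0 means unlimited.
func resolveLimit(raw int) int {
	switch {
	case raw == limitNotProvided:
		return defaultLimit // parameter absent
	case raw <= 0:
		return 0 // explicit 0 or negative: unlimited
	case raw > maxLimit:
		return maxLimit // assumption: clamp instead of erroring
	default:
		return raw
	}
}

// resolveOffset clamps negative offsets, including the -1 sentinel, to 0.
func resolveOffset(raw int) int {
	if raw < 0 {
		return 0
	}
	return raw
}

func main() {
	fmt.Println(resolveLimit(limitNotProvided), resolveLimit(-1), resolveLimit(25))
	fmt.Println(resolveOffset(-1), resolveOffset(10))
}
```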

type ListJobsOutput added in v0.51.0

type ListJobsOutput struct {
	Body struct {
		Jobs    []storage.ReviewJob `json:"jobs"`
		HasMore bool                `json:"has_more"`
		Stats   *storage.JobStats   `json:"stats,omitempty"`
	}
}

ListJobsOutput is the response for GET /api/jobs.

type ListReposInput added in v0.51.0

type ListReposInput struct {
	Branch string `query:"branch" doc:"Filter to repos with jobs on this branch"`
	Prefix string `query:"prefix" doc:"Filter repos by path prefix"`
}

ListReposInput holds query parameters for listing repos.

type ListReposOutput added in v0.51.0

type ListReposOutput struct {
	Body struct {
		Repos      []storage.RepoWithCount `json:"repos"`
		TotalCount int                     `json:"total_count"`
	}
}

ListReposOutput is the response for GET /api/repos.

type OutputBuffer added in v0.18.0

type OutputBuffer struct {
	// contains filtered or unexported fields
}

OutputBuffer stores streaming output for running jobs with memory limits.

func NewOutputBuffer added in v0.18.0

func NewOutputBuffer(maxPerJob, maxTotal int) *OutputBuffer

NewOutputBuffer creates a new output buffer with the given limits.

func (*OutputBuffer) Append added in v0.18.0

func (ob *OutputBuffer) Append(jobID int64, line OutputLine)

Append adds a line to the job's output buffer.

func (*OutputBuffer) CloseJob added in v0.18.0

func (ob *OutputBuffer) CloseJob(jobID int64)

CloseJob marks a job as complete and removes its buffer.

func (*OutputBuffer) GetLines added in v0.18.0

func (ob *OutputBuffer) GetLines(jobID int64) []OutputLine

GetLines returns all lines for a job.

func (*OutputBuffer) IsActive added in v0.18.0

func (ob *OutputBuffer) IsActive(jobID int64) bool

IsActive returns true if there's an active buffer for this job.

func (*OutputBuffer) Subscribe added in v0.18.0

func (ob *OutputBuffer) Subscribe(jobID int64) ([]OutputLine, <-chan OutputLine, func())

Subscribe returns existing lines and a channel for new lines. Call the returned cancel function when done.

func (*OutputBuffer) Writer added in v0.18.0

func (ob *OutputBuffer) Writer(jobID int64, normalize OutputNormalizer) *outputWriter

Writer returns an io.Writer that normalizes and stores output for a job. Lines exceeding maxPerJob will be truncated to prevent unbounded buffer growth.

type OutputLine added in v0.18.0

type OutputLine struct {
	Timestamp time.Time `json:"ts"`
	Text      string    `json:"text"`
	Type      string    `json:"line_type"` // "text", "tool", "thinking", "error"
}

OutputLine represents a single line of normalized output

func NormalizeClaudeOutput added in v0.18.0

func NormalizeClaudeOutput(line string) *OutputLine

NormalizeClaudeOutput parses Claude's stream-json format and extracts readable content.

func NormalizeCodexOutput added in v0.29.0

func NormalizeCodexOutput(line string) *OutputLine

NormalizeCodexOutput parses codex's --json JSONL format and extracts readable content.

func NormalizeGenericOutput added in v0.18.0

func NormalizeGenericOutput(line string) *OutputLine

NormalizeGenericOutput is the default normalizer for other agents.

func NormalizeOpenCodeOutput added in v0.18.0

func NormalizeOpenCodeOutput(line string) *OutputLine

NormalizeOpenCodeOutput normalizes OpenCode output (plain text with ANSI codes).

type OutputNormalizer added in v0.18.0

type OutputNormalizer func(line string) *OutputLine

OutputNormalizer converts agent-specific output to normalized OutputLines.
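
A function with this signature might look like the sketch below. `stripANSINormalizer` is hypothetical and is not one of the package's normalizers. Returning nil for blank input is an assumption about how callers treat a nil result; the documentation above does not state it.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
	"time"
)

// Local copies of the documented types so the sketch compiles on its own.
type OutputLine struct {
	Timestamp time.Time `json:"ts"`
	Text      string    `json:"text"`
	Type      string    `json:"line_type"`
}

type OutputNormalizer func(line string) *OutputLine

// ansiCSI matches common ANSI CSI escape sequences such as "\x1b[31m".
var ansiCSI = regexp.MustCompile(`\x1b\[[0-9;]*[A-Za-z]`)

// stripANSINormalizer is a hypothetical normalizer: it removes ANSI color
// codes, trims whitespace, and tags whatever remains as plain text.
func stripANSINormalizer(line string) *OutputLine {
	text := strings.TrimSpace(ansiCSI.ReplaceAllString(line, ""))
	if text == "" {
		return nil // assumption: nil means there is nothing worth storing
	}
	return &OutputLine{Timestamp: time.Now(), Text: text, Type: "text"}
}

func main() {
	var n OutputNormalizer = stripANSINormalizer
	if l := n("\x1b[31mbuild failed\x1b[0m"); l != nil {
		fmt.Println(l.Text) // prints "build failed"
	}
	fmt.Println(n("   ") == nil) // prints "true"
}
```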

func GetNormalizer added in v0.18.0

func GetNormalizer(agentName string) OutputNormalizer

GetNormalizer returns the appropriate normalizer for an agent.

type PingInfo added in v0.44.0

type PingInfo struct {
	Service string `json:"service"`
	Version string `json:"version"`
	PID     int    `json:"pid,omitempty"`
}

PingInfo is the minimal daemon identity payload used for liveness probes.

func ProbeDaemon added in v0.44.0

func ProbeDaemon(ep DaemonEndpoint, timeout time.Duration) (*PingInfo, error)

ProbeDaemon validates that a daemon endpoint is serving the roborev daemon. It prefers the lightweight /api/ping endpoint and falls back to /api/status for older daemon versions that do not implement /api/ping yet.
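
The ping-then-status fallback can be illustrated with a small HTTP client. This is a sketch under stated assumptions, not ProbeDaemon itself: `probe`, `fetch`, and `oldDaemon` are hypothetical, the fallback is triggered by any non-200 response, and the /api/status payload is assumed to decode into the same fields as PingInfo, which the documentation does not confirm.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// PingInfo mirrors the exported struct documented above.
type PingInfo struct {
	Service string `json:"service"`
	Version string `json:"version"`
	PID     int    `json:"pid,omitempty"`
}

// probe tries /api/ping first, then /api/status, and reports which path answered.
func probe(baseURL string, timeout time.Duration) (*PingInfo, string, error) {
	client := &http.Client{Timeout: timeout}
	var lastErr error
	for _, path := range []string{"/api/ping", "/api/status"} {
		info, err := fetch(client, baseURL+path)
		if err == nil {
			return info, path, nil
		}
		lastErr = err
	}
	return nil, "", lastErr
}

// fetch performs one GET and decodes a 200 response body as PingInfo.
func fetch(client *http.Client, url string) (*PingInfo, error) {
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("%s: unexpected status %d", url, resp.StatusCode)
	}
	var info PingInfo
	if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
		return nil, err
	}
	return &info, nil
}

// oldDaemon simulates a daemon that predates /api/ping: only /api/status is
// registered, so the mux answers 404 for the ping path.
func oldDaemon() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
		json.NewEncoder(w).Encode(PingInfo{Version: "0.40.0"}) // illustrative version
	})
	return httptest.NewServer(mux)
}

// probeOld returns the path that finally answered for the simulated old daemon.
func probeOld() string {
	srv := oldDaemon()
	defer srv.Close()
	_, path, err := probe(srv.URL, time.Second)
	if err != nil {
		return "error: " + err.Error()
	}
	return path
}

func main() {
	fmt.Println(probeOld())
}
```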

type RemapMapping added in v0.33.0

type RemapMapping struct {
	OldSHA    string `json:"old_sha"`
	NewSHA    string `json:"new_sha"`
	PatchID   string `json:"patch_id"`
	Author    string `json:"author"`
	Subject   string `json:"subject"`
	Timestamp string `json:"timestamp"` // RFC3339
}

RemapMapping maps a pre-rewrite SHA to its post-rewrite replacement.

type RemapRequest added in v0.33.0

type RemapRequest struct {
	RepoPath string         `json:"repo_path"`
	Mappings []RemapMapping `json:"mappings"`
}

RemapRequest is the request body for POST /api/remap.

type RemapResult added in v0.33.0

type RemapResult struct {
	Remapped int `json:"remapped"`
	Skipped  int `json:"skipped"`
}

RemapResult is the response from POST /api/remap.
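
As a rough illustration of the wire format, the sketch below encodes a RemapRequest and decodes a RemapResult with encoding/json. The structs are copied locally; the repo path, SHAs, and other field values are placeholders, and `encodeRequest` and `decodeResult` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types so the sketch compiles on its own.
type RemapMapping struct {
	OldSHA    string `json:"old_sha"`
	NewSHA    string `json:"new_sha"`
	PatchID   string `json:"patch_id"`
	Author    string `json:"author"`
	Subject   string `json:"subject"`
	Timestamp string `json:"timestamp"` // RFC3339
}

type RemapRequest struct {
	RepoPath string         `json:"repo_path"`
	Mappings []RemapMapping `json:"mappings"`
}

type RemapResult struct {
	Remapped int `json:"remapped"`
	Skipped  int `json:"skipped"`
}

// encodeRequest builds the JSON body a client would POST to /api/remap.
func encodeRequest(req RemapRequest) (string, error) {
	b, err := json.Marshal(req)
	return string(b), err
}

// decodeResult parses the JSON response body.
func decodeResult(body string) (RemapResult, error) {
	var res RemapResult
	err := json.Unmarshal([]byte(body), &res)
	return res, err
}

func main() {
	body, err := encodeRequest(RemapRequest{
		RepoPath: "/home/dev/project", // placeholder path
		Mappings: []RemapMapping{{
			OldSHA:    "1111111", // placeholder SHAs
			NewSHA:    "2222222",
			PatchID:   "abc123",
			Author:    "Dev Example",
			Subject:   "fix typo",
			Timestamp: "2026-04-08T12:00:00Z",
		}},
	})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	fmt.Println(body)

	res, err := decodeResult(`{"remapped":1,"skipped":0}`)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Printf("remapped=%d skipped=%d\n", res.Remapped, res.Skipped)
}
```

None of the RemapMapping fields use omitempty, so every key appears in the body even when its value is empty.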

type RepoResolver added in v0.36.0

type RepoResolver struct {
	// contains filtered or unexported fields
}

RepoResolver expands wildcard patterns in CI repo config into concrete "owner/repo" entries by querying the GitHub API. Results are cached for the refresh interval and automatically invalidated when the config changes.

func (*RepoResolver) Resolve added in v0.36.0

func (r *RepoResolver) Resolve(ctx context.Context, ci *config.CIConfig, tokenFn githubTokenFn) ([]string, error)

Resolve returns the list of concrete "owner/repo" entries to poll. It uses a cached result when the TTL has not expired and the config has not changed, otherwise it re-expands wildcard patterns.
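
The caching rule described here (reuse while the TTL holds and the config is unchanged, otherwise re-expand) could be sketched as below. Everything in it is hypothetical: `cachingResolver`, its injected `now` and `expand` functions, and the use of the joined pattern list as a config fingerprint are assumptions made for illustration, and no GitHub API call is made.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

// cachingResolver is a hypothetical stand-in for RepoResolver.
type cachingResolver struct {
	ttl    time.Duration
	now    func() time.Time                          // injectable clock
	expand func(patterns []string) ([]string, error) // the expensive lookup

	mu      sync.Mutex
	valid   bool
	key     string
	expires time.Time
	repos   []string
}

// Resolve returns the cached list unless the patterns changed or the TTL expired.
func (c *cachingResolver) Resolve(patterns []string) ([]string, error) {
	key := strings.Join(patterns, "\x00") // assumed fingerprint of the config
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.valid && key == c.key && c.now().Before(c.expires) {
		return c.repos, nil
	}
	repos, err := c.expand(patterns)
	if err != nil {
		return nil, err
	}
	c.valid, c.key, c.repos, c.expires = true, key, repos, c.now().Add(c.ttl)
	return repos, nil
}

// countExpansions runs four lookups and reports how many reached expand.
func countExpansions() int {
	clock := time.Date(2026, 4, 8, 0, 0, 0, 0, time.UTC)
	calls := 0
	r := &cachingResolver{
		ttl: time.Hour,
		now: func() time.Time { return clock },
		expand: func(patterns []string) ([]string, error) {
			calls++
			return []string{"acme/api", "acme/web"}, nil // placeholder repos
		},
	}
	r.Resolve([]string{"acme/*"})            // miss: first expansion
	r.Resolve([]string{"acme/*"})            // hit: same config, TTL still valid
	r.Resolve([]string{"acme/*", "other/x"}) // miss: config changed
	clock = clock.Add(2 * time.Hour)
	r.Resolve([]string{"acme/*", "other/x"}) // miss: TTL expired
	return calls
}

func main() {
	fmt.Println(countExpansions()) // prints 3
}
```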

type RerunJobInput added in v0.51.0

type RerunJobInput struct {
	Body RerunJobRequest
}

RerunJobInput wraps the request body for POST /api/job/rerun.

type RerunJobOutput added in v0.51.0

type RerunJobOutput struct {
	Body struct {
		Success bool `json:"success"`
	}
}

RerunJobOutput is the response for POST /api/job/rerun.

type RerunJobRequest

type RerunJobRequest struct {
	JobID int64 `json:"job_id"`
}

RerunJobRequest is the JSON body for POST /api/job/rerun.

type RuntimeInfo

type RuntimeInfo struct {
	PID        int    `json:"pid"`
	Addr       string `json:"addr"`
	Port       int    `json:"port"`
	Network    string `json:"network"`
	Version    string `json:"version"`
	SourcePath string `json:"-"` // Path to the runtime file (not serialized, set by ListAllRuntimes)
}

RuntimeInfo stores daemon runtime state

func GetAnyRunningDaemon added in v0.16.3

func GetAnyRunningDaemon() (*RuntimeInfo, error)

GetAnyRunningDaemon returns info about a responsive daemon. Returns os.ErrNotExist if no responsive daemon is found.

func ListAllRuntimes added in v0.16.3

func ListAllRuntimes() ([]*RuntimeInfo, error)

ListAllRuntimes returns info for all daemon runtime files found. Sets SourcePath on each RuntimeInfo for proper cleanup. Continues scanning even if some files are unreadable (e.g., permission errors).

func ReadRuntime

func ReadRuntime() (*RuntimeInfo, error)

ReadRuntime reads the daemon runtime info for the current process

func ReadRuntimeForPID added in v0.16.3

func ReadRuntimeForPID(pid int) (*RuntimeInfo, error)

ReadRuntimeForPID reads the daemon runtime info for a specific PID

func (RuntimeInfo) Endpoint added in v0.49.0

func (r RuntimeInfo) Endpoint() DaemonEndpoint

Endpoint returns a DaemonEndpoint for this runtime. An empty Network defaults to "tcp" for backwards compatibility with old runtime files that predate the Network field.
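
The backwards-compatibility default can be seen by decoding a runtime file that lacks the network key. This sketch copies RuntimeInfo locally; `parseRuntime` and `networkOrDefault` are hypothetical helpers, the file contents are illustrative, and the fields of DaemonEndpoint are not shown on this page, so only the network string is modelled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RuntimeInfo mirrors the exported struct documented above.
type RuntimeInfo struct {
	PID        int    `json:"pid"`
	Addr       string `json:"addr"`
	Port       int    `json:"port"`
	Network    string `json:"network"`
	Version    string `json:"version"`
	SourcePath string `json:"-"`
}

// parseRuntime decodes the JSON contents of a runtime file.
func parseRuntime(data string) (RuntimeInfo, error) {
	var r RuntimeInfo
	err := json.Unmarshal([]byte(data), &r)
	return r, err
}

// networkOrDefault applies the documented rule: an empty Network means "tcp".
func networkOrDefault(r RuntimeInfo) string {
	if r.Network == "" {
		return "tcp"
	}
	return r.Network
}

func main() {
	// An old-style runtime file with no "network" key (values are illustrative).
	old, err := parseRuntime(`{"pid":4242,"addr":"127.0.0.1:7373","port":7373,"version":"0.16.3"}`)
	if err != nil {
		fmt.Println("parse:", err)
		return
	}
	fmt.Println(networkOrDefault(old)) // prints "tcp"
}
```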

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the HTTP API server for the daemon

func NewServer

func NewServer(db *storage.DB, cfg *config.Config, configPath string) *Server

NewServer creates a new daemon server

func (*Server) Broadcaster added in v0.26.0

func (s *Server) Broadcaster() Broadcaster

Broadcaster returns the server's event broadcaster (for use by external components)

func (*Server) Close added in v0.37.0

func (s *Server) Close() error

Close shuts down the server and releases its resources. It is primarily provided for ease of use in test cleanup.

func (*Server) ConfigWatcher added in v0.26.0

func (s *Server) ConfigWatcher() *ConfigWatcher

ConfigWatcher returns the server's config watcher (for use by external components)

func (*Server) SetCIPoller added in v0.26.0

func (s *Server) SetCIPoller(cp *CIPoller)

SetCIPoller sets the CI poller for status reporting and wires up the worker pool cancellation callback so the poller can kill running processes when superseding stale batches.

func (*Server) SetSyncWorker

func (s *Server) SetSyncWorker(sw *storage.SyncWorker)

SetSyncWorker sets the sync worker for triggering manual syncs

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the server and its worker pool.

func (*Server) Stop

func (s *Server) Stop() error

Stop gracefully shuts down the server

type StaticConfig added in v0.15.0

type StaticConfig struct {
	// contains filtered or unexported fields
}

StaticConfig wraps a config for use without hot-reloading (e.g., in tests)

func NewStaticConfig added in v0.15.0

func NewStaticConfig(cfg *config.Config) *StaticConfig

NewStaticConfig creates a ConfigGetter that always returns the same config

func (*StaticConfig) Config added in v0.15.0

func (sc *StaticConfig) Config() *config.Config

Config returns the static config

type Subscriber

type Subscriber struct {
	ID       int
	RepoPath string // Filter: only send events for this repo (empty = all)
	Ch       chan Event
}

Subscriber represents a client subscribed to events

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of review workers

func NewWorkerPool

func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broadcaster Broadcaster, errorLog *ErrorLog, activityLog *ActivityLog) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) ActiveWorkers

func (wp *WorkerPool) ActiveWorkers() int

ActiveWorkers returns the number of currently active workers

func (*WorkerPool) CancelJob

func (wp *WorkerPool) CancelJob(jobID int64) bool

CancelJob cancels a running job by its ID, killing the subprocess. Returns true if the job was canceled or marked for pending cancellation. Returns false only if the job doesn't exist or isn't in a cancellable state.

func (*WorkerPool) GetJobOutput added in v0.18.0

func (wp *WorkerPool) GetJobOutput(jobID int64) []OutputLine

GetJobOutput returns the current output lines for a job.

func (*WorkerPool) HasJobOutput added in v0.18.0

func (wp *WorkerPool) HasJobOutput(jobID int64) bool

HasJobOutput returns true if there's active output capture for a job.

func (*WorkerPool) IsJobPendingCancel added in v0.21.0

func (wp *WorkerPool) IsJobPendingCancel(jobID int64) bool

IsJobPendingCancel reports whether a job is in the pendingCancels set. This is intended for use in tests.

func (*WorkerPool) MaxWorkers added in v0.15.0

func (wp *WorkerPool) MaxWorkers() int

MaxWorkers returns the total number of workers in the pool

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start begins the worker pool. Safe to call multiple times; only the first call spawns workers.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop gracefully shuts down the worker pool. Safe to call multiple times; only the first call performs shutdown.
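
One common way to get the "safe to call multiple times" behavior of Start and Stop is sync.Once. The sketch below shows that technique on a hypothetical `pool` type; the documentation does not say how WorkerPool achieves it, so treat this as an assumption rather than a description of the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for WorkerPool.
type pool struct {
	numWorkers int
	startOnce  sync.Once
	stopOnce   sync.Once
	quit       chan struct{}
	wg         sync.WaitGroup
	spawned    int // goroutines launched; written only inside startOnce
}

func newPool(n int) *pool {
	return &pool{numWorkers: n, quit: make(chan struct{})}
}

// Start spawns the workers on the first call and is a no-op afterwards.
func (p *pool) Start() {
	p.startOnce.Do(func() {
		for i := 0; i < p.numWorkers; i++ {
			p.spawned++
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				<-p.quit // a real worker would process jobs until told to quit
			}()
		}
	})
}

// Stop signals the workers and waits for them on the first call only,
// so a second call cannot close the quit channel twice.
func (p *pool) Stop() {
	p.stopOnce.Do(func() {
		close(p.quit)
		p.wg.Wait()
	})
}

func main() {
	p := newPool(3)
	p.Start()
	p.Start() // no additional workers
	p.Stop()
	p.Stop() // no panic from a double close
	fmt.Println(p.spawned) // prints 3
}
```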

func (*WorkerPool) SubscribeJobOutput added in v0.18.0

func (wp *WorkerPool) SubscribeJobOutput(jobID int64) ([]OutputLine, <-chan OutputLine, func())

SubscribeJobOutput returns initial lines and a channel for new output. Call cancel when done to unsubscribe.
