Documentation
¶
Index ¶
- Constants
- Variables
- func DefaultDBPath() string
- func ExtractRepoNameFromIdentity(identity string) string
- func GenerateUUID() string
- func ParseVerdict(output string) string
- type BranchListResult
- type BranchWithCount
- type Commit
- type ComponentHealth
- type DB
- func (db *DB) AddComment(commitID int64, responder, response string) (*Response, error)
- func (db *DB) AddCommentToJob(jobID int64, responder, response string) (*Response, error)
- func (db *DB) BackfillRepoIdentities() (int, error)
- func (db *DB) BackfillSourceMachineID() error
- func (db *DB) CancelJob(jobID int64) error
- func (db *DB) ClaimJob(workerID string) (*ReviewJob, error)
- func (db *DB) ClearAllSyncedAt() error
- func (db *DB) CompleteJob(jobID int64, agent, prompt, output string) error
- func (db *DB) CountStalledJobs(threshold time.Duration) (int, error)
- func (db *DB) DeleteRepo(repoID int64, cascade bool) error
- func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, branch, agent, model, reasoning, diffContent string) (*ReviewJob, error)
- func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, branch, agent, model, reasoning string) (*ReviewJob, error)
- func (db *DB) EnqueuePromptJob(opts PromptJobOptions) (*ReviewJob, error)
- func (db *DB) EnqueueRangeJob(repoID int64, gitRef, branch, agent, model, reasoning string) (*ReviewJob, error)
- func (db *DB) FailJob(jobID int64, errorMsg string) error
- func (db *DB) FindRepo(identifier string) (*Repo, error)
- func (db *DB) GetAllReviewsForGitRef(gitRef string) ([]Review, error)
- func (db *DB) GetCommentsForCommit(commitID int64) ([]Response, error)
- func (db *DB) GetCommentsForCommitSHA(sha string) ([]Response, error)
- func (db *DB) GetCommentsForJob(jobID int64) ([]Response, error)
- func (db *DB) GetCommentsToSync(machineID string, limit int) ([]SyncableResponse, error)
- func (db *DB) GetCommitByID(id int64) (*Commit, error)
- func (db *DB) GetCommitByRepoAndSHA(repoID int64, sha string) (*Commit, error)
- func (db *DB) GetCommitBySHA(sha string) (*Commit, error)
- func (db *DB) GetJobByID(id int64) (*ReviewJob, error)
- func (db *DB) GetJobCounts() (queued, running, done, failed, canceled int, err error)
- func (db *DB) GetJobRetryCount(jobID int64) (int, error)
- func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error)
- func (db *DB) GetKnownJobUUIDs() ([]string, error)
- func (db *DB) GetMachineID() (string, error)
- func (db *DB) GetOrCreateCommit(repoID int64, sha, author, subject string, timestamp time.Time) (*Commit, error)
- func (db *DB) GetOrCreateCommitByRepoAndSHA(repoID int64, sha, author, subject string, timestamp time.Time) (int64, error)
- func (db *DB) GetOrCreateRepo(rootPath string, identity ...string) (*Repo, error)
- func (db *DB) GetOrCreateRepoByIdentity(identity string) (int64, error)
- func (db *DB) GetRecentReviewsForRepo(repoID int64, limit int) ([]Review, error)
- func (db *DB) GetRepoByID(id int64) (*Repo, error)
- func (db *DB) GetRepoByIdentity(identity string) (*Repo, error)
- func (db *DB) GetRepoByName(name string) (*Repo, error)
- func (db *DB) GetRepoByPath(rootPath string) (*Repo, error)
- func (db *DB) GetRepoStats(repoID int64) (*RepoStats, error)
- func (db *DB) GetReviewByCommitSHA(sha string) (*Review, error)
- func (db *DB) GetReviewByID(reviewID int64) (*Review, error)
- func (db *DB) GetReviewByJobID(jobID int64) (*Review, error)
- func (db *DB) GetReviewsToSync(machineID string, limit int) ([]SyncableReview, error)
- func (db *DB) GetSyncState(key string) (string, error)
- func (db *DB) ListBranchesWithCounts(repoPaths []string) (*BranchListResult, error)
- func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int, ...) ([]ReviewJob, error)
- func (db *DB) ListRepos() ([]Repo, error)
- func (db *DB) ListReposWithReviewCounts() ([]RepoWithCount, int, error)
- func (db *DB) ListReposWithReviewCountsByBranch(branch string) ([]RepoWithCount, int, error)
- func (db *DB) MarkCommentSynced(responseID int64) error
- func (db *DB) MarkCommentsSynced(responseIDs []int64) error
- func (db *DB) MarkJobSynced(jobID int64) error
- func (db *DB) MarkJobsSynced(jobIDs []int64) error
- func (db *DB) MarkReviewAddressed(reviewID int64, addressed bool) error
- func (db *DB) MarkReviewAddressedByJobID(jobID int64, addressed bool) error
- func (db *DB) MarkReviewSynced(reviewID int64) error
- func (db *DB) MarkReviewsSynced(reviewIDs []int64) error
- func (db *DB) MergeRepos(sourceRepoID, targetRepoID int64) (int64, error)
- func (db *DB) ReenqueueJob(jobID int64) error
- func (db *DB) RenameRepo(identifier, newName string) (int64, error)
- func (db *DB) ResetStaleJobs() error
- func (db *DB) RetryJob(jobID int64, maxRetries int) (bool, error)
- func (db *DB) SaveJobPrompt(jobID int64, prompt string) error
- func (db *DB) SetRepoIdentity(repoID int64, identity string) error
- func (db *DB) SetSyncState(key, value string) error
- func (db *DB) UpdateJobBranch(jobID int64, branch string) (int64, error)
- func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error
- func (db *DB) UpsertPulledResponse(r PulledResponse) error
- func (db *DB) UpsertPulledReview(r PulledReview) error
- type DaemonStatus
- type ErrorEntry
- type HealthStatus
- type JobStatus
- type JobWithPgIDs
- type ListJobsOption
- type PgPool
- func (p *PgPool) BatchInsertResponses(ctx context.Context, responses []SyncableResponse) ([]bool, error)
- func (p *PgPool) BatchUpsertJobs(ctx context.Context, jobs []JobWithPgIDs) ([]bool, error)
- func (p *PgPool) BatchUpsertReviews(ctx context.Context, reviews []SyncableReview) ([]bool, error)
- func (p *PgPool) Close()
- func (p *PgPool) EnsureSchema(ctx context.Context) error
- func (p *PgPool) GetDatabaseID(ctx context.Context) (string, error)
- func (p *PgPool) GetOrCreateCommit(ctx context.Context, repoID int64, sha, author, subject string, ...) (int64, error)
- func (p *PgPool) GetOrCreateRepo(ctx context.Context, identity string) (int64, error)
- func (p *PgPool) InsertResponse(ctx context.Context, r SyncableResponse) error
- func (p *PgPool) Ping(ctx context.Context) error
- func (p *PgPool) Pool() *pgxpool.Pool
- func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor string, limit int) ([]PulledJob, string, error)
- func (p *PgPool) PullResponses(ctx context.Context, excludeMachineID string, afterID int64, limit int) ([]PulledResponse, int64, error)
- func (p *PgPool) PullReviews(ctx context.Context, excludeMachineID string, knownJobUUIDs []string, ...) ([]PulledReview, string, error)
- func (p *PgPool) RegisterMachine(ctx context.Context, machineID, name string) error
- func (p *PgPool) Tx(ctx context.Context, fn func(tx pgx.Tx) error) error
- func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, pgCommitID *int64) error
- func (p *PgPool) UpsertReview(ctx context.Context, r SyncableReview) error
- type PgPoolConfig
- type PromptJobOptions
- type PulledJob
- type PulledResponse
- type PulledReview
- type Repo
- type RepoStats
- type RepoWithCount
- type Response
- type Review
- type ReviewJob
- type SyncProgress
- type SyncStats
- type SyncWorker
- func (w *SyncWorker) FinalPush() error
- func (w *SyncWorker) HealthCheck() (healthy bool, message string)
- func (w *SyncWorker) Start() error
- func (w *SyncWorker) Stop()
- func (w *SyncWorker) SyncNow() (*SyncStats, error)
- func (w *SyncWorker) SyncNowWithProgress(progressFn func(SyncProgress)) (*SyncStats, error)
- type SyncableJob
- type SyncableResponse
- type SyncableReview
Constants ¶
const ( SyncStateMachineID = "machine_id" SyncStateLastJobCursor = "last_job_cursor" // ID of last synced job SyncStateLastReviewCursor = "last_review_cursor" // Composite cursor for reviews (updated_at,id) SyncStateLastResponseID = "last_response_id" // ID of last synced response SyncStateSyncTargetID = "sync_target_id" // Database ID of last synced Postgres )
Sync state keys
Variables ¶
var ErrAmbiguousCommit = sql.ErrNoRows // Use sql.ErrNoRows for API compatibility; callers can check message
ErrAmbiguousCommit is returned when a SHA lookup matches multiple repos
var ErrRepoHasJobs = errors.New("repository has existing jobs; use cascade to delete them")
ErrRepoHasJobs is returned when trying to delete a repo with jobs without cascade
Functions ¶
func ExtractRepoNameFromIdentity ¶ added in v0.18.0
ExtractRepoNameFromIdentity extracts a human-readable name from a git identity. Examples:
- "git@github.com:org/repo.git" -> "repo"
- "https://github.com/org/my-project.git" -> "my-project"
- "https://github.com/org/repo" -> "repo"
- "" -> "unknown"
func GenerateUUID ¶
func GenerateUUID() string
GenerateUUID generates a random UUID v4 string. Uses crypto/rand for secure random generation.
func ParseVerdict ¶
ParseVerdict extracts P (pass) or F (fail) from review output. Returns "P" only if a clear pass indicator appears at the start of a line. Rejects lines containing caveats like "but", "however", "except". Also fails if severity labels (Critical/High/Medium/Low) indicate findings.
Types ¶
type BranchListResult ¶ added in v0.19.0
type BranchListResult struct {
Branches []BranchWithCount
TotalCount int
NullsRemaining int // Number of jobs with NULL/empty branch (for backfill tracking)
}
BranchListResult contains branches with counts and metadata
type BranchWithCount ¶ added in v0.19.0
BranchWithCount represents a branch with its total job count
type ComponentHealth ¶
type ComponentHealth struct {
Name string `json:"name"`
Healthy bool `json:"healthy"`
Message string `json:"message,omitempty"`
}
ComponentHealth represents the health of a single component
type DB ¶
func (*DB) AddComment ¶ added in v0.17.0
AddComment adds a comment to a commit (legacy - use AddCommentToJob for new code)
func (*DB) AddCommentToJob ¶ added in v0.17.0
AddCommentToJob adds a comment linked to a job/review
func (*DB) BackfillRepoIdentities ¶
BackfillRepoIdentities computes and sets identity for repos that don't have one. Uses config.ResolveRepoIdentity to ensure consistency with new repo creation. Returns the number of repos backfilled.
func (*DB) BackfillSourceMachineID ¶
BackfillSourceMachineID sets source_machine_id on existing rows that don't have one. This should be called when sync is first enabled.
func (*DB) ClearAllSyncedAt ¶
ClearAllSyncedAt clears all synced_at timestamps in the database. This is used when syncing to a new Postgres database to ensure all data gets re-synced.
func (*DB) CompleteJob ¶
CompleteJob marks a job as done and stores the review. Only updates if job is still in 'running' state (respects cancellation). If the job has an output_prefix, it will be prepended to the output.
func (*DB) CountStalledJobs ¶
CountStalledJobs returns the number of jobs that have been running longer than the threshold
func (*DB) DeleteRepo ¶
DeleteRepo deletes a repo and optionally its associated data If cascade is true, also deletes all jobs, reviews, and responses for the repo If cascade is false and jobs exist, returns ErrRepoHasJobs
func (*DB) EnqueueDirtyJob ¶
func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, branch, agent, model, reasoning, diffContent string) (*ReviewJob, error)
EnqueueDirtyJob creates a new review job for uncommitted (dirty) changes. The diff is captured at enqueue time since the working tree may change.
func (*DB) EnqueueJob ¶
func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, branch, agent, model, reasoning string) (*ReviewJob, error)
EnqueueJob creates a new review job for a single commit
func (*DB) EnqueuePromptJob ¶
func (db *DB) EnqueuePromptJob(opts PromptJobOptions) (*ReviewJob, error)
EnqueuePromptJob creates a new job with a custom prompt (not a git review). The prompt is stored at enqueue time and used directly by the worker.
func (*DB) EnqueueRangeJob ¶
func (db *DB) EnqueueRangeJob(repoID int64, gitRef, branch, agent, model, reasoning string) (*ReviewJob, error)
EnqueueRangeJob creates a new review job for a commit range
func (*DB) FailJob ¶
FailJob marks a job as failed with an error message. Only updates if job is still in 'running' state (respects cancellation).
func (*DB) GetAllReviewsForGitRef ¶ added in v0.14.0
GetAllReviewsForGitRef returns all reviews for a git ref (commit SHA or range) for re-review context
func (*DB) GetCommentsForCommit ¶ added in v0.17.0
GetCommentsForCommit returns all comments for a commit
func (*DB) GetCommentsForCommitSHA ¶ added in v0.17.0
GetCommentsForCommitSHA returns all comments for a commit by SHA
func (*DB) GetCommentsForJob ¶ added in v0.17.0
GetCommentsForJob returns all comments linked to a job
func (*DB) GetCommentsToSync ¶ added in v0.17.0
func (db *DB) GetCommentsToSync(machineID string, limit int) ([]SyncableResponse, error)
GetCommentsToSync returns comments created locally that need to be pushed. Only returns comments whose parent job has already been synced.
func (*DB) GetCommitByID ¶
GetCommitByID returns a commit by its ID
func (*DB) GetCommitByRepoAndSHA ¶
GetCommitByRepoAndSHA returns a commit by repo ID and SHA
func (*DB) GetCommitBySHA ¶
GetCommitBySHA returns a commit by its SHA. DEPRECATED: This is a legacy API that doesn't handle the same SHA in different repos. Returns sql.ErrNoRows if no commit found, or if multiple repos have this SHA (ambiguous). Prefer using GetCommitByRepoAndSHA or job-based lookups instead.
func (*DB) GetJobByID ¶
GetJobByID returns a job by ID with joined fields
func (*DB) GetJobCounts ¶
GetJobCounts returns counts of jobs by status
func (*DB) GetJobRetryCount ¶
GetJobRetryCount returns the retry count for a job
func (*DB) GetJobsToSync ¶
func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error)
GetJobsToSync returns terminal jobs that need to be pushed to PostgreSQL. These are jobs created locally that haven't been synced or were updated since last sync.
func (*DB) GetKnownJobUUIDs ¶
GetKnownJobUUIDs returns UUIDs of all jobs that have a UUID. Used to filter reviews when pulling from PostgreSQL.
func (*DB) GetMachineID ¶
GetMachineID returns this machine's unique identifier, creating one if it doesn't exist. Uses INSERT OR IGNORE + SELECT to ensure concurrency-safe behavior. Treats empty values as missing and regenerates.
func (*DB) GetOrCreateCommit ¶
func (db *DB) GetOrCreateCommit(repoID int64, sha, author, subject string, timestamp time.Time) (*Commit, error)
GetOrCreateCommit finds or creates a commit record. Lookups are by (repo_id, sha) to handle the same SHA in different repos.
func (*DB) GetOrCreateCommitByRepoAndSHA ¶
func (db *DB) GetOrCreateCommitByRepoAndSHA(repoID int64, sha, author, subject string, timestamp time.Time) (int64, error)
GetOrCreateCommitByRepoAndSHA finds or creates a commit.
func (*DB) GetOrCreateRepo ¶
GetOrCreateRepo finds or creates a repo by its root path. If identity is provided, it will be stored; otherwise the identity field remains NULL.
func (*DB) GetOrCreateRepoByIdentity ¶
GetOrCreateRepoByIdentity finds or creates a repo for syncing by identity. The logic is:
- If exactly one local repo has this identity, use it (always preferred)
- If a placeholder repo exists (root_path == identity), use it
- If 0 or 2+ local repos have this identity, create a placeholder
This ensures synced jobs attach to the right repo:
- Single clone: jobs attach directly to the local repo
- Multiple clones: jobs attach to a neutral placeholder
- No local clone: placeholder serves as a sync-only repo
Note: Single local repos are always preferred, even if a placeholder exists from a previous sync (e.g., when there were 0 or 2+ clones before).
func (*DB) GetRecentReviewsForRepo ¶
GetRecentReviewsForRepo returns the N most recent reviews for a repo
func (*DB) GetRepoByID ¶
GetRepoByID returns a repo by its ID
func (*DB) GetRepoByIdentity ¶
GetRepoByIdentity finds a repo by its identity. Returns nil if not found, error if duplicates exist.
func (*DB) GetRepoByName ¶
GetRepoByName returns a repo by its display name
func (*DB) GetRepoByPath ¶
GetRepoByPath returns a repo by its path
func (*DB) GetRepoStats ¶
GetRepoStats returns detailed statistics for a repo
func (*DB) GetReviewByCommitSHA ¶
GetReviewByCommitSHA finds the most recent review by commit SHA (searches git_ref field)
func (*DB) GetReviewByID ¶
GetReviewByID finds a review by its ID
func (*DB) GetReviewByJobID ¶
GetReviewByJobID finds a review by its job ID
func (*DB) GetReviewsToSync ¶
func (db *DB) GetReviewsToSync(machineID string, limit int) ([]SyncableReview, error)
GetReviewsToSync returns reviews modified locally that need to be pushed. Only returns reviews whose parent job has already been synced.
func (*DB) GetSyncState ¶
GetSyncState retrieves a value from the sync_state table. Returns empty string if key doesn't exist.
func (*DB) ListBranchesWithCounts ¶ added in v0.19.0
func (db *DB) ListBranchesWithCounts(repoPaths []string) (*BranchListResult, error)
ListBranchesWithCounts returns all branches with their job counts If repoPaths is non-empty, filters to jobs in those repos only
func (*DB) ListJobs ¶
func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int, opts ...ListJobsOption) ([]ReviewJob, error)
ListJobs returns jobs with optional status, repo, branch, and addressed filters. addressedFilter: nil = no filter, non-nil bool = filter by addressed state.
func (*DB) ListReposWithReviewCounts ¶
func (db *DB) ListReposWithReviewCounts() ([]RepoWithCount, int, error)
ListReposWithReviewCounts returns all repos with their total job counts
func (*DB) ListReposWithReviewCountsByBranch ¶ added in v0.19.0
func (db *DB) ListReposWithReviewCountsByBranch(branch string) ([]RepoWithCount, int, error)
ListReposWithReviewCountsByBranch returns repos filtered by branch with their job counts If branch is empty, returns all repos. Use "(none)" to filter for jobs without a branch.
func (*DB) MarkCommentSynced ¶ added in v0.17.0
MarkCommentSynced updates the synced_at timestamp for a comment
func (*DB) MarkCommentsSynced ¶ added in v0.17.0
MarkCommentsSynced updates the synced_at timestamp for multiple comments
func (*DB) MarkJobSynced ¶
MarkJobSynced updates the synced_at timestamp for a job
func (*DB) MarkJobsSynced ¶
MarkJobsSynced updates the synced_at timestamp for multiple jobs
func (*DB) MarkReviewAddressed ¶
MarkReviewAddressed marks a review as addressed (or unaddressed) by review ID
func (*DB) MarkReviewAddressedByJobID ¶ added in v0.18.0
MarkReviewAddressedByJobID marks a review as addressed (or unaddressed) by job ID
func (*DB) MarkReviewSynced ¶
MarkReviewSynced updates the synced_at timestamp for a review
func (*DB) MarkReviewsSynced ¶
MarkReviewsSynced updates the synced_at timestamp for multiple reviews
func (*DB) MergeRepos ¶
MergeRepos moves all jobs and commits from sourceRepoID to targetRepoID, then deletes the source repo
func (*DB) ReenqueueJob ¶
ReenqueueJob resets a completed, failed, or canceled job back to queued status. This allows manual re-running of jobs to get a fresh review. For done jobs, the existing review is deleted to avoid unique constraint violations.
func (*DB) RenameRepo ¶
RenameRepo updates the display name of a repo identified by its path or current name
func (*DB) ResetStaleJobs ¶
ResetStaleJobs marks all running jobs as queued (for daemon restart)
func (*DB) RetryJob ¶
RetryJob atomically resets a running job to queued for retry. Returns false if max retries reached or job is not in running state. maxRetries is the number of retries allowed (e.g., 3 means up to 4 total attempts).
func (*DB) SaveJobPrompt ¶
SaveJobPrompt stores the prompt for a running job
func (*DB) SetRepoIdentity ¶
SetRepoIdentity sets the identity for a repo.
func (*DB) SetSyncState ¶
SetSyncState sets a value in the sync_state table (upsert).
func (*DB) UpdateJobBranch ¶ added in v0.19.0
UpdateJobBranch sets the branch field for a job that doesn't have one. This is used to backfill the branch when it's derived from git. Only updates if the current branch is NULL or empty. Returns the number of rows affected (0 if branch was already set or job not found, 1 if updated).
func (*DB) UpsertPulledJob ¶
UpsertPulledJob inserts or updates a job from PostgreSQL into SQLite. Sets synced_at to prevent re-pushing. Requires repo to exist.
func (*DB) UpsertPulledResponse ¶
func (db *DB) UpsertPulledResponse(r PulledResponse) error
UpsertPulledResponse inserts a response from PostgreSQL into SQLite.
func (*DB) UpsertPulledReview ¶
func (db *DB) UpsertPulledReview(r PulledReview) error
UpsertPulledReview inserts or updates a review from PostgreSQL into SQLite.
type DaemonStatus ¶
type DaemonStatus struct {
Version string `json:"version"`
QueuedJobs int `json:"queued_jobs"`
RunningJobs int `json:"running_jobs"`
CompletedJobs int `json:"completed_jobs"`
FailedJobs int `json:"failed_jobs"`
CanceledJobs int `json:"canceled_jobs"`
ActiveWorkers int `json:"active_workers"`
MaxWorkers int `json:"max_workers"`
MachineID string `json:"machine_id,omitempty"` // Local machine ID for remote job detection
ConfigReloadedAt string `json:"config_reloaded_at,omitempty"` // Last config reload timestamp (RFC3339Nano)
ConfigReloadCounter uint64 `json:"config_reload_counter,omitempty"` // Monotonic reload counter (for sub-second detection)
}
type ErrorEntry ¶
type ErrorEntry struct {
Timestamp time.Time `json:"ts"`
Level string `json:"level"`
Component string `json:"component"`
Message string `json:"message"`
JobID int64 `json:"job_id,omitempty"`
}
ErrorEntry represents a single error log entry (mirrors daemon.ErrorEntry for API)
type HealthStatus ¶
type HealthStatus struct {
Healthy bool `json:"healthy"`
Uptime string `json:"uptime"`
Version string `json:"version"`
Components []ComponentHealth `json:"components"`
RecentErrors []ErrorEntry `json:"recent_errors"`
ErrorCount int `json:"error_count_24h"`
}
HealthStatus represents the overall daemon health
type JobWithPgIDs ¶
type JobWithPgIDs struct {
Job SyncableJob
PgRepoID int64
PgCommitID *int64
}
JobWithPgIDs represents a job with its resolved PostgreSQL repo and commit IDs
type ListJobsOption ¶ added in v0.21.0
type ListJobsOption func(*listJobsOptions)
ListJobsOption configures optional filters for ListJobs.
func WithAddressed ¶ added in v0.21.0
func WithAddressed(addressed bool) ListJobsOption
WithAddressed filters jobs by addressed state (true/false).
func WithBranch ¶ added in v0.21.0
func WithBranch(branch string) ListJobsOption
WithBranch filters jobs by branch name.
func WithGitRef ¶ added in v0.21.0
func WithGitRef(ref string) ListJobsOption
WithGitRef filters jobs by git ref.
type PgPool ¶
type PgPool struct {
// contains filtered or unexported fields
}
PgPool wraps a pgx connection pool with reconnection logic
func NewPgPool ¶
NewPgPool creates a new PostgreSQL connection pool. The connection string should be a PostgreSQL URL like: postgres://user:pass@host:port/dbname?sslmode=disable
func (*PgPool) BatchInsertResponses ¶
func (p *PgPool) BatchInsertResponses(ctx context.Context, responses []SyncableResponse) ([]bool, error)
BatchInsertResponses inserts multiple responses in a single batch operation. Returns a boolean slice indicating success/failure for each item at the corresponding index.
func (*PgPool) BatchUpsertJobs ¶
BatchUpsertJobs inserts or updates multiple jobs in a single batch operation. The jobs must have their PgRepoID and PgCommitID already resolved. Returns a boolean slice indicating success/failure for each item at the corresponding index.
func (*PgPool) BatchUpsertReviews ¶
BatchUpsertReviews inserts or updates multiple reviews in a single batch operation. Returns a boolean slice indicating success/failure for each item at the corresponding index.
func (*PgPool) EnsureSchema ¶
EnsureSchema creates the schema if it doesn't exist and checks version. If legacy tables exist in the public schema, they are migrated to roborev.
func (*PgPool) GetDatabaseID ¶
GetDatabaseID returns the unique ID for this Postgres database. Creates one if it doesn't exist. This ID is used to detect when a client is syncing to a different database than before.
func (*PgPool) GetOrCreateCommit ¶
func (p *PgPool) GetOrCreateCommit(ctx context.Context, repoID int64, sha, author, subject string, timestamp time.Time) (int64, error)
GetOrCreateCommit finds or creates a commit, returns the PostgreSQL ID
func (*PgPool) GetOrCreateRepo ¶
GetOrCreateRepo finds or creates a repo by identity, returns the PostgreSQL ID
func (*PgPool) InsertResponse ¶
func (p *PgPool) InsertResponse(ctx context.Context, r SyncableResponse) error
InsertResponse inserts a response in PostgreSQL (append-only, no updates)
func (*PgPool) PullJobs ¶
func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor string, limit int) ([]PulledJob, string, error)
PullJobs fetches jobs from PostgreSQL updated after the given cursor. Cursor format: "updated_at id" (space-separated) or empty for first pull. Returns jobs not from the given machineID (to avoid echo).
func (*PgPool) PullResponses ¶
func (p *PgPool) PullResponses(ctx context.Context, excludeMachineID string, afterID int64, limit int) ([]PulledResponse, int64, error)
PullResponses fetches responses from PostgreSQL created after the given ID cursor.
func (*PgPool) PullReviews ¶
func (p *PgPool) PullReviews(ctx context.Context, excludeMachineID string, knownJobUUIDs []string, cursor string, limit int) ([]PulledReview, string, error)
PullReviews fetches reviews from PostgreSQL updated after the given cursor. Only fetches reviews for jobs in knownJobUUIDs to avoid cursor advancement past unknown jobs.
func (*PgPool) RegisterMachine ¶
RegisterMachine registers or updates this machine in the machines table
func (*PgPool) UpsertJob ¶
func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, pgCommitID *int64) error
UpsertJob inserts or updates a job in PostgreSQL
func (*PgPool) UpsertReview ¶
func (p *PgPool) UpsertReview(ctx context.Context, r SyncableReview) error
UpsertReview inserts or updates a review in PostgreSQL
type PgPoolConfig ¶
type PgPoolConfig struct {
// ConnectTimeout is the timeout for initial connection (default: 5s)
ConnectTimeout time.Duration
// MaxConns is the maximum number of connections (default: 4)
MaxConns int32
// MinConns is the minimum number of connections (default: 0)
MinConns int32
// MaxConnLifetime is the maximum lifetime of a connection (default: 1h)
MaxConnLifetime time.Duration
// MaxConnIdleTime is the maximum idle time before closing (default: 30m)
MaxConnIdleTime time.Duration
}
PgPoolConfig configures the PostgreSQL connection pool
func DefaultPgPoolConfig ¶
func DefaultPgPoolConfig() PgPoolConfig
DefaultPgPoolConfig returns sensible defaults for the connection pool
type PromptJobOptions ¶ added in v0.20.0
type PromptJobOptions struct {
RepoID int64
Branch string
Agent string
Model string
Reasoning string
Prompt string
OutputPrefix string // Prefix to prepend to review output (e.g., file paths)
Agentic bool // Allow file edits and command execution
Label string // Display label in TUI (default: "prompt")
}
PromptJobOptions contains options for creating a prompt-based job.
type PulledJob ¶
type PulledJob struct {
UUID string
RepoIdentity string
CommitSHA string
CommitAuthor string
CommitSubject string
CommitTimestamp time.Time
GitRef string
Agent string
Model string
Reasoning string
Status string
Agentic bool
EnqueuedAt time.Time
StartedAt *time.Time
FinishedAt *time.Time
Prompt string
DiffContent *string
Error string
SourceMachineID string
UpdatedAt time.Time
}
PulledJob represents a job pulled from PostgreSQL
type PulledResponse ¶
type PulledResponse struct {
UUID string
JobUUID string
Responder string
Response string
SourceMachineID string
CreatedAt time.Time
}
PulledResponse represents a response pulled from PostgreSQL
type PulledReview ¶
type PulledReview struct {
UUID string
JobUUID string
Agent string
Prompt string
Output string
Addressed bool
UpdatedByMachineID string
CreatedAt time.Time
UpdatedAt time.Time
}
PulledReview represents a review pulled from PostgreSQL
type RepoStats ¶
type RepoStats struct {
Repo *Repo
TotalJobs int
QueuedJobs int
RunningJobs int
CompletedJobs int
FailedJobs int
PassedReviews int
FailedReviews int
AddressedReviews int
UnaddressedReviews int
}
RepoStats contains statistics for a single repo
type RepoWithCount ¶
type RepoWithCount struct {
Name string `json:"name"`
RootPath string `json:"root_path"`
Count int `json:"count"`
}
RepoWithCount represents a repo with its total job count
type Response ¶
type Response struct {
ID int64 `json:"id"`
CommitID *int64 `json:"commit_id,omitempty"` // For commit-based responses (legacy)
JobID *int64 `json:"job_id,omitempty"` // For job/review-based responses
Responder string `json:"responder"`
Response string `json:"response"`
CreatedAt time.Time `json:"created_at"`
// Sync fields
UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync
SourceMachineID string `json:"source_machine_id,omitempty"` // Machine that created this response
SyncedAt *time.Time `json:"synced_at,omitempty"` // Last sync time
}
type Review ¶
type Review struct {
ID int64 `json:"id"`
JobID int64 `json:"job_id"`
Agent string `json:"agent"`
Prompt string `json:"prompt"`
Output string `json:"output"`
CreatedAt time.Time `json:"created_at"`
Addressed bool `json:"addressed"`
// Sync fields
UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync
UpdatedAt *time.Time `json:"updated_at,omitempty"` // Last modification time
UpdatedByMachineID string `json:"updated_by_machine_id,omitempty"` // Machine that last modified this review
SyncedAt *time.Time `json:"synced_at,omitempty"` // Last sync time
// Joined fields
Job *ReviewJob `json:"job,omitempty"`
}
type ReviewJob ¶
type ReviewJob struct {
ID int64 `json:"id"`
RepoID int64 `json:"repo_id"`
CommitID *int64 `json:"commit_id,omitempty"` // nil for ranges
GitRef string `json:"git_ref"` // SHA or "start..end" for ranges
Branch string `json:"branch,omitempty"` // Branch name at time of job creation
Agent string `json:"agent"`
Model string `json:"model,omitempty"` // Model to use (for opencode: provider/model format)
Reasoning string `json:"reasoning,omitempty"` // thorough, standard, fast (default: thorough)
Status JobStatus `json:"status"`
EnqueuedAt time.Time `json:"enqueued_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
Error string `json:"error,omitempty"`
Prompt string `json:"prompt,omitempty"`
RetryCount int `json:"retry_count"`
DiffContent *string `json:"diff_content,omitempty"` // For dirty reviews (uncommitted changes)
Agentic bool `json:"agentic"` // Enable agentic mode (allow file edits)
OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output
// Sync fields
UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync
SourceMachineID string `json:"source_machine_id,omitempty"` // Machine that created this job
UpdatedAt *time.Time `json:"updated_at,omitempty"` // Last modification time
SyncedAt *time.Time `json:"synced_at,omitempty"` // Last sync time
// Joined fields for convenience
RepoPath string `json:"repo_path,omitempty"`
RepoName string `json:"repo_name,omitempty"`
CommitSubject string `json:"commit_subject,omitempty"` // empty for ranges
Addressed *bool `json:"addressed,omitempty"` // nil if no review yet
Verdict *string `json:"verdict,omitempty"` // P/F parsed from review output
}
type SyncProgress ¶
type SyncProgress struct {
Phase string // "push" or "pull"
BatchNum int
BatchJobs int
BatchRevs int
BatchResps int
TotalJobs int
TotalRevs int
TotalResps int
}
SyncProgress reports progress during a sync operation
type SyncStats ¶
type SyncStats struct {
PushedJobs int
PushedReviews int
PushedResponses int
PulledJobs int
PulledReviews int
PulledResponses int
}
SyncStats contains statistics from a sync operation
type SyncWorker ¶
type SyncWorker struct {
// contains filtered or unexported fields
}
SyncWorker handles background synchronization between SQLite and PostgreSQL
func NewSyncWorker ¶
func NewSyncWorker(db *DB, cfg config.SyncConfig) *SyncWorker
NewSyncWorker creates a new sync worker
func (*SyncWorker) FinalPush ¶
func (w *SyncWorker) FinalPush() error
FinalPush performs a push-only sync for graceful shutdown. This ensures all local changes are pushed before the daemon exits. Loops until all pending items are synced (not just one batch). Does not pull changes since we're shutting down.
func (*SyncWorker) HealthCheck ¶
func (w *SyncWorker) HealthCheck() (healthy bool, message string)
HealthCheck returns the health status of the sync worker
func (*SyncWorker) Start ¶
func (w *SyncWorker) Start() error
Start begins the sync worker in a background goroutine. The worker can be stopped with Stop() and restarted with Start().
func (*SyncWorker) SyncNow ¶
func (w *SyncWorker) SyncNow() (*SyncStats, error)
SyncNow triggers an immediate sync cycle and returns statistics. Returns an error if the worker is not running or not connected. Loops until all pending items are pushed (not just one batch).
func (*SyncWorker) SyncNowWithProgress ¶
func (w *SyncWorker) SyncNowWithProgress(progressFn func(SyncProgress)) (*SyncStats, error)
SyncNowWithProgress is like SyncNow but calls progressFn after each batch. If not connected, attempts to connect first.
type SyncableJob ¶
type SyncableJob struct {
ID int64
UUID string
RepoID int64
RepoIdentity string
CommitID *int64
CommitSHA string
CommitAuthor string
CommitSubject string
CommitTimestamp time.Time
GitRef string
Agent string
Model string
Reasoning string
Status string
Agentic bool
EnqueuedAt time.Time
StartedAt *time.Time
FinishedAt *time.Time
Prompt string
DiffContent *string
Error string
SourceMachineID string
UpdatedAt time.Time
}
SyncableJob contains job data needed for sync