storage

package
v0.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 26, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SyncStateMachineID        = "machine_id"
	SyncStateLastJobCursor    = "last_job_cursor"    // ID of last synced job
	SyncStateLastReviewCursor = "last_review_cursor" // Composite cursor for reviews (updated_at,id)
	SyncStateLastResponseID   = "last_response_id"   // ID of last synced response
	SyncStateSyncTargetID     = "sync_target_id"     // Database ID of last synced Postgres
)

Sync state keys

Variables

View Source
var ErrAmbiguousCommit = sql.ErrNoRows // Use sql.ErrNoRows for API compatibility; callers can check message

ErrAmbiguousCommit is returned when a SHA lookup matches multiple repos

View Source
var ErrRepoHasJobs = errors.New("repository has existing jobs; use cascade to delete them")

ErrRepoHasJobs is returned when trying to delete a repo with jobs without cascade

Functions

func DefaultDBPath

func DefaultDBPath() string

DefaultDBPath returns the default database path

func ExtractRepoNameFromIdentity added in v0.18.0

func ExtractRepoNameFromIdentity(identity string) string

ExtractRepoNameFromIdentity extracts a human-readable name from a git identity. Examples:

func GenerateUUID

func GenerateUUID() string

GenerateUUID generates a random UUID v4 string. Uses crypto/rand for secure random generation.

func ParseVerdict

func ParseVerdict(output string) string

ParseVerdict extracts P (pass) or F (fail) from review output. Returns "P" only if a clear pass indicator appears at the start of a line. Rejects lines containing caveats like "but", "however", "except".

Types

type Commit

type Commit struct {
	ID        int64     `json:"id"`
	RepoID    int64     `json:"repo_id"`
	SHA       string    `json:"sha"`
	Author    string    `json:"author"`
	Subject   string    `json:"subject"`
	Timestamp time.Time `json:"timestamp"`
	CreatedAt time.Time `json:"created_at"`
}

type ComponentHealth

type ComponentHealth struct {
	Name    string `json:"name"`
	Healthy bool   `json:"healthy"`
	Message string `json:"message,omitempty"`
}

ComponentHealth represents the health of a single component

type DB

type DB struct {
	*sql.DB
}

func Open

func Open(dbPath string) (*DB, error)

Open opens or creates the database at the given path

func (*DB) AddComment added in v0.17.0

func (db *DB) AddComment(commitID int64, responder, response string) (*Response, error)

AddComment adds a comment to a commit (legacy - use AddCommentToJob for new code)

func (*DB) AddCommentToJob added in v0.17.0

func (db *DB) AddCommentToJob(jobID int64, responder, response string) (*Response, error)

AddCommentToJob adds a comment linked to a job/review

func (*DB) BackfillRepoIdentities

func (db *DB) BackfillRepoIdentities() (int, error)

BackfillRepoIdentities computes and sets identity for repos that don't have one. Uses config.ResolveRepoIdentity to ensure consistency with new repo creation. Returns the number of repos backfilled.

func (*DB) BackfillSourceMachineID

func (db *DB) BackfillSourceMachineID() error

BackfillSourceMachineID sets source_machine_id on existing rows that don't have one. This should be called when sync is first enabled.

func (*DB) CancelJob

func (db *DB) CancelJob(jobID int64) error

CancelJob marks a running or queued job as canceled

func (*DB) ClaimJob

func (db *DB) ClaimJob(workerID string) (*ReviewJob, error)

ClaimJob atomically claims the next queued job for a worker

func (*DB) ClearAllSyncedAt

func (db *DB) ClearAllSyncedAt() error

ClearAllSyncedAt clears all synced_at timestamps in the database. This is used when syncing to a new Postgres database to ensure all data gets re-synced.

func (*DB) CompleteJob

func (db *DB) CompleteJob(jobID int64, agent, prompt, output string) error

CompleteJob marks a job as done and stores the review. Only updates if job is still in 'running' state (respects cancellation).

func (*DB) CountStalledJobs

func (db *DB) CountStalledJobs(threshold time.Duration) (int, error)

CountStalledJobs returns the number of jobs that have been running longer than the threshold

func (*DB) DeleteRepo

func (db *DB) DeleteRepo(repoID int64, cascade bool) error

DeleteRepo deletes a repo and optionally its associated data If cascade is true, also deletes all jobs, reviews, and responses for the repo If cascade is false and jobs exist, returns ErrRepoHasJobs

func (*DB) EnqueueDirtyJob

func (db *DB) EnqueueDirtyJob(repoID int64, gitRef, agent, model, reasoning, diffContent string) (*ReviewJob, error)

EnqueueDirtyJob creates a new review job for uncommitted (dirty) changes. The diff is captured at enqueue time since the working tree may change.

func (*DB) EnqueueJob

func (db *DB) EnqueueJob(repoID, commitID int64, gitRef, agent, model, reasoning string) (*ReviewJob, error)

EnqueueJob creates a new review job for a single commit

func (*DB) EnqueuePromptJob

func (db *DB) EnqueuePromptJob(repoID int64, agent, model, reasoning, customPrompt string, agentic bool) (*ReviewJob, error)

EnqueuePromptJob creates a new job with a custom prompt (not a git review). The prompt is stored at enqueue time and used directly by the worker. If agentic is true, the agent will be allowed to edit files and run commands.

func (*DB) EnqueueRangeJob

func (db *DB) EnqueueRangeJob(repoID int64, gitRef, agent, model, reasoning string) (*ReviewJob, error)

EnqueueRangeJob creates a new review job for a commit range

func (*DB) FailJob

func (db *DB) FailJob(jobID int64, errorMsg string) error

FailJob marks a job as failed with an error message. Only updates if job is still in 'running' state (respects cancellation).

func (*DB) FindRepo

func (db *DB) FindRepo(identifier string) (*Repo, error)

FindRepo finds a repo by path or name (tries path first, then name)

func (*DB) GetAllReviewsForGitRef added in v0.14.0

func (db *DB) GetAllReviewsForGitRef(gitRef string) ([]Review, error)

GetAllReviewsForGitRef returns all reviews for a git ref (commit SHA or range) for re-review context

func (*DB) GetCommentsForCommit added in v0.17.0

func (db *DB) GetCommentsForCommit(commitID int64) ([]Response, error)

GetCommentsForCommit returns all comments for a commit

func (*DB) GetCommentsForCommitSHA added in v0.17.0

func (db *DB) GetCommentsForCommitSHA(sha string) ([]Response, error)

GetCommentsForCommitSHA returns all comments for a commit by SHA

func (*DB) GetCommentsForJob added in v0.17.0

func (db *DB) GetCommentsForJob(jobID int64) ([]Response, error)

GetCommentsForJob returns all comments linked to a job

func (*DB) GetCommentsToSync added in v0.17.0

func (db *DB) GetCommentsToSync(machineID string, limit int) ([]SyncableResponse, error)

GetCommentsToSync returns comments created locally that need to be pushed. Only returns comments whose parent job has already been synced.

func (*DB) GetCommitByID

func (db *DB) GetCommitByID(id int64) (*Commit, error)

GetCommitByID returns a commit by its ID

func (*DB) GetCommitByRepoAndSHA

func (db *DB) GetCommitByRepoAndSHA(repoID int64, sha string) (*Commit, error)

GetCommitByRepoAndSHA returns a commit by repo ID and SHA

func (*DB) GetCommitBySHA

func (db *DB) GetCommitBySHA(sha string) (*Commit, error)

GetCommitBySHA returns a commit by its SHA. DEPRECATED: This is a legacy API that doesn't handle the same SHA in different repos. Returns sql.ErrNoRows if no commit found, or if multiple repos have this SHA (ambiguous). Prefer using GetCommitByRepoAndSHA or job-based lookups instead.

func (*DB) GetJobByID

func (db *DB) GetJobByID(id int64) (*ReviewJob, error)

GetJobByID returns a job by ID with joined fields

func (*DB) GetJobCounts

func (db *DB) GetJobCounts() (queued, running, done, failed, canceled int, err error)

GetJobCounts returns counts of jobs by status

func (*DB) GetJobRetryCount

func (db *DB) GetJobRetryCount(jobID int64) (int, error)

GetJobRetryCount returns the retry count for a job

func (*DB) GetJobsToSync

func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error)

GetJobsToSync returns terminal jobs that need to be pushed to PostgreSQL. These are jobs created locally that haven't been synced or were updated since last sync.

func (*DB) GetKnownJobUUIDs

func (db *DB) GetKnownJobUUIDs() ([]string, error)

GetKnownJobUUIDs returns UUIDs of all jobs that have a UUID. Used to filter reviews when pulling from PostgreSQL.

func (*DB) GetMachineID

func (db *DB) GetMachineID() (string, error)

GetMachineID returns this machine's unique identifier, creating one if it doesn't exist. Uses INSERT OR IGNORE + SELECT to ensure concurrency-safe behavior. Treats empty values as missing and regenerates.

func (*DB) GetOrCreateCommit

func (db *DB) GetOrCreateCommit(repoID int64, sha, author, subject string, timestamp time.Time) (*Commit, error)

GetOrCreateCommit finds or creates a commit record. Lookups are by (repo_id, sha) to handle the same SHA in different repos.

func (*DB) GetOrCreateCommitByRepoAndSHA

func (db *DB) GetOrCreateCommitByRepoAndSHA(repoID int64, sha, author, subject string, timestamp time.Time) (int64, error)

GetOrCreateCommitByRepoAndSHA finds or creates a commit.

func (*DB) GetOrCreateRepo

func (db *DB) GetOrCreateRepo(rootPath string, identity ...string) (*Repo, error)

GetOrCreateRepo finds or creates a repo by its root path. If identity is provided, it will be stored; otherwise the identity field remains NULL.

func (*DB) GetOrCreateRepoByIdentity

func (db *DB) GetOrCreateRepoByIdentity(identity string) (int64, error)

GetOrCreateRepoByIdentity finds or creates a repo for syncing by identity. The logic is:

  1. If exactly one local repo has this identity, use it (always preferred)
  2. If a placeholder repo exists (root_path == identity), use it
  3. If 0 or 2+ local repos have this identity, create a placeholder

This ensures synced jobs attach to the right repo:

  • Single clone: jobs attach directly to the local repo
  • Multiple clones: jobs attach to a neutral placeholder
  • No local clone: placeholder serves as a sync-only repo

Note: Single local repos are always preferred, even if a placeholder exists from a previous sync (e.g., when there were 0 or 2+ clones before).

func (*DB) GetRecentReviewsForRepo

func (db *DB) GetRecentReviewsForRepo(repoID int64, limit int) ([]Review, error)

GetRecentReviewsForRepo returns the N most recent reviews for a repo

func (*DB) GetRepoByID

func (db *DB) GetRepoByID(id int64) (*Repo, error)

GetRepoByID returns a repo by its ID

func (*DB) GetRepoByIdentity

func (db *DB) GetRepoByIdentity(identity string) (*Repo, error)

GetRepoByIdentity finds a repo by its identity. Returns nil if not found, error if duplicates exist.

func (*DB) GetRepoByName

func (db *DB) GetRepoByName(name string) (*Repo, error)

GetRepoByName returns a repo by its display name

func (*DB) GetRepoByPath

func (db *DB) GetRepoByPath(rootPath string) (*Repo, error)

GetRepoByPath returns a repo by its path

func (*DB) GetRepoStats

func (db *DB) GetRepoStats(repoID int64) (*RepoStats, error)

GetRepoStats returns detailed statistics for a repo

func (*DB) GetReviewByCommitSHA

func (db *DB) GetReviewByCommitSHA(sha string) (*Review, error)

GetReviewByCommitSHA finds the most recent review by commit SHA (searches git_ref field)

func (*DB) GetReviewByID

func (db *DB) GetReviewByID(reviewID int64) (*Review, error)

GetReviewByID finds a review by its ID

func (*DB) GetReviewByJobID

func (db *DB) GetReviewByJobID(jobID int64) (*Review, error)

GetReviewByJobID finds a review by its job ID

func (*DB) GetReviewsToSync

func (db *DB) GetReviewsToSync(machineID string, limit int) ([]SyncableReview, error)

GetReviewsToSync returns reviews modified locally that need to be pushed. Only returns reviews whose parent job has already been synced.

func (*DB) GetSyncState

func (db *DB) GetSyncState(key string) (string, error)

GetSyncState retrieves a value from the sync_state table. Returns empty string if key doesn't exist.

func (*DB) ListJobs

func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int, gitRefFilter ...string) ([]ReviewJob, error)

ListJobs returns jobs with optional status and repo filters

func (*DB) ListRepos

func (db *DB) ListRepos() ([]Repo, error)

ListRepos returns all repos in the database

func (*DB) ListReposWithReviewCounts

func (db *DB) ListReposWithReviewCounts() ([]RepoWithCount, int, error)

ListReposWithReviewCounts returns all repos with their total job counts

func (*DB) MarkCommentSynced added in v0.17.0

func (db *DB) MarkCommentSynced(responseID int64) error

MarkCommentSynced updates the synced_at timestamp for a comment

func (*DB) MarkCommentsSynced added in v0.17.0

func (db *DB) MarkCommentsSynced(responseIDs []int64) error

MarkCommentsSynced updates the synced_at timestamp for multiple comments

func (*DB) MarkJobSynced

func (db *DB) MarkJobSynced(jobID int64) error

MarkJobSynced updates the synced_at timestamp for a job

func (*DB) MarkJobsSynced

func (db *DB) MarkJobsSynced(jobIDs []int64) error

MarkJobsSynced updates the synced_at timestamp for multiple jobs

func (*DB) MarkReviewAddressed

func (db *DB) MarkReviewAddressed(reviewID int64, addressed bool) error

MarkReviewAddressed marks a review as addressed (or unaddressed) by review ID

func (*DB) MarkReviewAddressedByJobID added in v0.18.0

func (db *DB) MarkReviewAddressedByJobID(jobID int64, addressed bool) error

MarkReviewAddressedByJobID marks a review as addressed (or unaddressed) by job ID

func (*DB) MarkReviewSynced

func (db *DB) MarkReviewSynced(reviewID int64) error

MarkReviewSynced updates the synced_at timestamp for a review

func (*DB) MarkReviewsSynced

func (db *DB) MarkReviewsSynced(reviewIDs []int64) error

MarkReviewsSynced updates the synced_at timestamp for multiple reviews

func (*DB) MergeRepos

func (db *DB) MergeRepos(sourceRepoID, targetRepoID int64) (int64, error)

MergeRepos moves all jobs and commits from sourceRepoID to targetRepoID, then deletes the source repo

func (*DB) ReenqueueJob

func (db *DB) ReenqueueJob(jobID int64) error

ReenqueueJob resets a completed, failed, or canceled job back to queued status. This allows manual re-running of jobs to get a fresh review. For done jobs, the existing review is deleted to avoid unique constraint violations.

func (*DB) RenameRepo

func (db *DB) RenameRepo(identifier, newName string) (int64, error)

RenameRepo updates the display name of a repo identified by its path or current name

func (*DB) ResetStaleJobs

func (db *DB) ResetStaleJobs() error

ResetStaleJobs marks all running jobs as queued (for daemon restart)

func (*DB) RetryJob

func (db *DB) RetryJob(jobID int64, maxRetries int) (bool, error)

RetryJob atomically resets a running job to queued for retry. Returns false if max retries reached or job is not in running state. maxRetries is the number of retries allowed (e.g., 3 means up to 4 total attempts).

func (*DB) SaveJobPrompt

func (db *DB) SaveJobPrompt(jobID int64, prompt string) error

SaveJobPrompt stores the prompt for a running job

func (*DB) SetRepoIdentity

func (db *DB) SetRepoIdentity(repoID int64, identity string) error

SetRepoIdentity sets the identity for a repo.

func (*DB) SetSyncState

func (db *DB) SetSyncState(key, value string) error

SetSyncState sets a value in the sync_state table (upsert).

func (*DB) UpsertPulledJob

func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error

UpsertPulledJob inserts or updates a job from PostgreSQL into SQLite. Sets synced_at to prevent re-pushing. Requires repo to exist.

func (*DB) UpsertPulledResponse

func (db *DB) UpsertPulledResponse(r PulledResponse) error

UpsertPulledResponse inserts a response from PostgreSQL into SQLite.

func (*DB) UpsertPulledReview

func (db *DB) UpsertPulledReview(r PulledReview) error

UpsertPulledReview inserts or updates a review from PostgreSQL into SQLite.

type DaemonStatus

type DaemonStatus struct {
	Version             string `json:"version"`
	QueuedJobs          int    `json:"queued_jobs"`
	RunningJobs         int    `json:"running_jobs"`
	CompletedJobs       int    `json:"completed_jobs"`
	FailedJobs          int    `json:"failed_jobs"`
	CanceledJobs        int    `json:"canceled_jobs"`
	ActiveWorkers       int    `json:"active_workers"`
	MaxWorkers          int    `json:"max_workers"`
	MachineID           string `json:"machine_id,omitempty"`            // Local machine ID for remote job detection
	ConfigReloadedAt    string `json:"config_reloaded_at,omitempty"`    // Last config reload timestamp (RFC3339Nano)
	ConfigReloadCounter uint64 `json:"config_reload_counter,omitempty"` // Monotonic reload counter (for sub-second detection)
}

type ErrorEntry

type ErrorEntry struct {
	Timestamp time.Time `json:"ts"`
	Level     string    `json:"level"`
	Component string    `json:"component"`
	Message   string    `json:"message"`
	JobID     int64     `json:"job_id,omitempty"`
}

ErrorEntry represents a single error log entry (mirrors daemon.ErrorEntry for API)

type HealthStatus

type HealthStatus struct {
	Healthy      bool              `json:"healthy"`
	Uptime       string            `json:"uptime"`
	Version      string            `json:"version"`
	Components   []ComponentHealth `json:"components"`
	RecentErrors []ErrorEntry      `json:"recent_errors"`
	ErrorCount   int               `json:"error_count_24h"`
}

HealthStatus represents the overall daemon health

type JobStatus

type JobStatus string
const (
	JobStatusQueued   JobStatus = "queued"
	JobStatusRunning  JobStatus = "running"
	JobStatusDone     JobStatus = "done"
	JobStatusFailed   JobStatus = "failed"
	JobStatusCanceled JobStatus = "canceled"
)

type JobWithPgIDs

type JobWithPgIDs struct {
	Job        SyncableJob
	PgRepoID   int64
	PgCommitID *int64
}

JobWithPgIDs represents a job with its resolved PostgreSQL repo and commit IDs

type PgPool

type PgPool struct {
	// contains filtered or unexported fields
}

PgPool wraps a pgx connection pool with reconnection logic

func NewPgPool

func NewPgPool(ctx context.Context, connString string, cfg PgPoolConfig) (*PgPool, error)

NewPgPool creates a new PostgreSQL connection pool. The connection string should be a PostgreSQL URL like: postgres://user:pass@host:port/dbname?sslmode=disable

func (*PgPool) BatchInsertResponses

func (p *PgPool) BatchInsertResponses(ctx context.Context, responses []SyncableResponse) ([]bool, error)

BatchInsertResponses inserts multiple responses in a single batch operation. Returns a boolean slice indicating success/failure for each item at the corresponding index.

func (*PgPool) BatchUpsertJobs

func (p *PgPool) BatchUpsertJobs(ctx context.Context, jobs []JobWithPgIDs) ([]bool, error)

BatchUpsertJobs inserts or updates multiple jobs in a single batch operation. The jobs must have their PgRepoID and PgCommitID already resolved. Returns a boolean slice indicating success/failure for each item at the corresponding index.

func (*PgPool) BatchUpsertReviews

func (p *PgPool) BatchUpsertReviews(ctx context.Context, reviews []SyncableReview) ([]bool, error)

BatchUpsertReviews inserts or updates multiple reviews in a single batch operation. Returns a boolean slice indicating success/failure for each item at the corresponding index.

func (*PgPool) Close

func (p *PgPool) Close()

Close closes the connection pool

func (*PgPool) EnsureSchema

func (p *PgPool) EnsureSchema(ctx context.Context) error

EnsureSchema creates the schema if it doesn't exist and checks version. If legacy tables exist in the public schema, they are migrated to roborev.

func (*PgPool) GetDatabaseID

func (p *PgPool) GetDatabaseID(ctx context.Context) (string, error)

GetDatabaseID returns the unique ID for this Postgres database. Creates one if it doesn't exist. This ID is used to detect when a client is syncing to a different database than before.

func (*PgPool) GetOrCreateCommit

func (p *PgPool) GetOrCreateCommit(ctx context.Context, repoID int64, sha, author, subject string, timestamp time.Time) (int64, error)

GetOrCreateCommit finds or creates a commit, returns the PostgreSQL ID

func (*PgPool) GetOrCreateRepo

func (p *PgPool) GetOrCreateRepo(ctx context.Context, identity string) (int64, error)

GetOrCreateRepo finds or creates a repo by identity, returns the PostgreSQL ID

func (*PgPool) InsertResponse

func (p *PgPool) InsertResponse(ctx context.Context, r SyncableResponse) error

InsertResponse inserts a response in PostgreSQL (append-only, no updates)

func (*PgPool) Ping

func (p *PgPool) Ping(ctx context.Context) error

Ping checks if the connection is alive

func (*PgPool) Pool

func (p *PgPool) Pool() *pgxpool.Pool

Pool returns the underlying pgxpool.Pool for direct access

func (*PgPool) PullJobs

func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor string, limit int) ([]PulledJob, string, error)

PullJobs fetches jobs from PostgreSQL updated after the given cursor. Cursor format: "updated_at id" (space-separated) or empty for first pull. Returns jobs not from the given machineID (to avoid echo).

func (*PgPool) PullResponses

func (p *PgPool) PullResponses(ctx context.Context, excludeMachineID string, afterID int64, limit int) ([]PulledResponse, int64, error)

PullResponses fetches responses from PostgreSQL created after the given ID cursor.

func (*PgPool) PullReviews

func (p *PgPool) PullReviews(ctx context.Context, excludeMachineID string, knownJobUUIDs []string, cursor string, limit int) ([]PulledReview, string, error)

PullReviews fetches reviews from PostgreSQL updated after the given cursor. Only fetches reviews for jobs in knownJobUUIDs to avoid cursor advancement past unknown jobs.

func (*PgPool) RegisterMachine

func (p *PgPool) RegisterMachine(ctx context.Context, machineID, name string) error

RegisterMachine registers or updates this machine in the machines table

func (*PgPool) Tx

func (p *PgPool) Tx(ctx context.Context, fn func(tx pgx.Tx) error) error

Tx runs a function within a transaction

func (*PgPool) UpsertJob

func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, pgCommitID *int64) error

UpsertJob inserts or updates a job in PostgreSQL

func (*PgPool) UpsertReview

func (p *PgPool) UpsertReview(ctx context.Context, r SyncableReview) error

UpsertReview inserts or updates a review in PostgreSQL

type PgPoolConfig

type PgPoolConfig struct {
	// ConnectTimeout is the timeout for initial connection (default: 5s)
	ConnectTimeout time.Duration
	// MaxConns is the maximum number of connections (default: 4)
	MaxConns int32
	// MinConns is the minimum number of connections (default: 0)
	MinConns int32
	// MaxConnLifetime is the maximum lifetime of a connection (default: 1h)
	MaxConnLifetime time.Duration
	// MaxConnIdleTime is the maximum idle time before closing (default: 30m)
	MaxConnIdleTime time.Duration
}

PgPoolConfig configures the PostgreSQL connection pool

func DefaultPgPoolConfig

func DefaultPgPoolConfig() PgPoolConfig

DefaultPgPoolConfig returns sensible defaults for the connection pool

type PulledJob

type PulledJob struct {
	UUID            string
	RepoIdentity    string
	CommitSHA       string
	CommitAuthor    string
	CommitSubject   string
	CommitTimestamp time.Time
	GitRef          string
	Agent           string
	Model           string
	Reasoning       string
	Status          string
	Agentic         bool
	EnqueuedAt      time.Time
	StartedAt       *time.Time
	FinishedAt      *time.Time
	Prompt          string
	DiffContent     *string
	Error           string
	SourceMachineID string
	UpdatedAt       time.Time
}

PulledJob represents a job pulled from PostgreSQL

type PulledResponse

type PulledResponse struct {
	UUID            string
	JobUUID         string
	Responder       string
	Response        string
	SourceMachineID string
	CreatedAt       time.Time
}

PulledResponse represents a response pulled from PostgreSQL

type PulledReview

type PulledReview struct {
	UUID               string
	JobUUID            string
	Agent              string
	Prompt             string
	Output             string
	Addressed          bool
	UpdatedByMachineID string
	CreatedAt          time.Time
	UpdatedAt          time.Time
}

PulledReview represents a review pulled from PostgreSQL

type Repo

type Repo struct {
	ID        int64     `json:"id"`
	RootPath  string    `json:"root_path"`
	Name      string    `json:"name"`
	CreatedAt time.Time `json:"created_at"`
	Identity  string    `json:"identity,omitempty"` // Unique identity for sync (git remote URL, .roborev-id, or local path)
}

type RepoStats

type RepoStats struct {
	Repo          *Repo
	TotalJobs     int
	QueuedJobs    int
	RunningJobs   int
	CompletedJobs int
	FailedJobs    int
	PassedReviews int
	FailedReviews int
}

RepoStats contains statistics for a single repo

type RepoWithCount

type RepoWithCount struct {
	Name     string `json:"name"`
	RootPath string `json:"root_path"`
	Count    int    `json:"count"`
}

RepoWithCount represents a repo with its total job count

type Response

type Response struct {
	ID        int64     `json:"id"`
	CommitID  *int64    `json:"commit_id,omitempty"` // For commit-based responses (legacy)
	JobID     *int64    `json:"job_id,omitempty"`    // For job/review-based responses
	Responder string    `json:"responder"`
	Response  string    `json:"response"`
	CreatedAt time.Time `json:"created_at"`

	// Sync fields
	UUID            string     `json:"uuid,omitempty"`              // Globally unique identifier for sync
	SourceMachineID string     `json:"source_machine_id,omitempty"` // Machine that created this response
	SyncedAt        *time.Time `json:"synced_at,omitempty"`         // Last sync time
}

type Review

type Review struct {
	ID        int64     `json:"id"`
	JobID     int64     `json:"job_id"`
	Agent     string    `json:"agent"`
	Prompt    string    `json:"prompt"`
	Output    string    `json:"output"`
	CreatedAt time.Time `json:"created_at"`
	Addressed bool      `json:"addressed"`

	// Sync fields
	UUID               string     `json:"uuid,omitempty"`                  // Globally unique identifier for sync
	UpdatedAt          *time.Time `json:"updated_at,omitempty"`            // Last modification time
	UpdatedByMachineID string     `json:"updated_by_machine_id,omitempty"` // Machine that last modified this review
	SyncedAt           *time.Time `json:"synced_at,omitempty"`             // Last sync time

	// Joined fields
	Job *ReviewJob `json:"job,omitempty"`
}

type ReviewJob

type ReviewJob struct {
	ID          int64      `json:"id"`
	RepoID      int64      `json:"repo_id"`
	CommitID    *int64     `json:"commit_id,omitempty"` // nil for ranges
	GitRef      string     `json:"git_ref"`             // SHA or "start..end" for ranges
	Agent       string     `json:"agent"`
	Model       string     `json:"model,omitempty"`     // Model to use (for opencode: provider/model format)
	Reasoning   string     `json:"reasoning,omitempty"` // thorough, standard, fast (default: thorough)
	Status      JobStatus  `json:"status"`
	EnqueuedAt  time.Time  `json:"enqueued_at"`
	StartedAt   *time.Time `json:"started_at,omitempty"`
	FinishedAt  *time.Time `json:"finished_at,omitempty"`
	WorkerID    string     `json:"worker_id,omitempty"`
	Error       string     `json:"error,omitempty"`
	Prompt      string     `json:"prompt,omitempty"`
	RetryCount  int        `json:"retry_count"`
	DiffContent *string    `json:"diff_content,omitempty"` // For dirty reviews (uncommitted changes)
	Agentic     bool       `json:"agentic"`                // Enable agentic mode (allow file edits)

	// Sync fields
	UUID            string     `json:"uuid,omitempty"`              // Globally unique identifier for sync
	SourceMachineID string     `json:"source_machine_id,omitempty"` // Machine that created this job
	UpdatedAt       *time.Time `json:"updated_at,omitempty"`        // Last modification time
	SyncedAt        *time.Time `json:"synced_at,omitempty"`         // Last sync time

	// Joined fields for convenience
	RepoPath      string  `json:"repo_path,omitempty"`
	RepoName      string  `json:"repo_name,omitempty"`
	CommitSubject string  `json:"commit_subject,omitempty"` // empty for ranges
	Addressed     *bool   `json:"addressed,omitempty"`      // nil if no review yet
	Verdict       *string `json:"verdict,omitempty"`        // P/F parsed from review output
}

type SyncProgress

type SyncProgress struct {
	Phase      string // "push" or "pull"
	BatchNum   int
	BatchJobs  int
	BatchRevs  int
	BatchResps int
	TotalJobs  int
	TotalRevs  int
	TotalResps int
}

SyncProgress reports progress during a sync operation

type SyncStats

type SyncStats struct {
	PushedJobs      int
	PushedReviews   int
	PushedResponses int
	PulledJobs      int
	PulledReviews   int
	PulledResponses int
}

SyncStats contains statistics from a sync operation

type SyncWorker

type SyncWorker struct {
	// contains filtered or unexported fields
}

SyncWorker handles background synchronization between SQLite and PostgreSQL

func NewSyncWorker

func NewSyncWorker(db *DB, cfg config.SyncConfig) *SyncWorker

NewSyncWorker creates a new sync worker

func (*SyncWorker) FinalPush

func (w *SyncWorker) FinalPush() error

FinalPush performs a push-only sync for graceful shutdown. This ensures all local changes are pushed before the daemon exits. Loops until all pending items are synced (not just one batch). Does not pull changes since we're shutting down.

func (*SyncWorker) HealthCheck

func (w *SyncWorker) HealthCheck() (healthy bool, message string)

HealthCheck returns the health status of the sync worker

func (*SyncWorker) Start

func (w *SyncWorker) Start() error

Start begins the sync worker in a background goroutine. The worker can be stopped with Stop() and restarted with Start().

func (*SyncWorker) Stop

func (w *SyncWorker) Stop()

Stop gracefully stops the sync worker

func (*SyncWorker) SyncNow

func (w *SyncWorker) SyncNow() (*SyncStats, error)

SyncNow triggers an immediate sync cycle and returns statistics. Returns an error if the worker is not running or not connected. Loops until all pending items are pushed (not just one batch).

func (*SyncWorker) SyncNowWithProgress

func (w *SyncWorker) SyncNowWithProgress(progressFn func(SyncProgress)) (*SyncStats, error)

SyncNowWithProgress is like SyncNow but calls progressFn after each batch. If not connected, attempts to connect first.

type SyncableJob

type SyncableJob struct {
	ID              int64
	UUID            string
	RepoID          int64
	RepoIdentity    string
	CommitID        *int64
	CommitSHA       string
	CommitAuthor    string
	CommitSubject   string
	CommitTimestamp time.Time
	GitRef          string
	Agent           string
	Model           string
	Reasoning       string
	Status          string
	Agentic         bool
	EnqueuedAt      time.Time
	StartedAt       *time.Time
	FinishedAt      *time.Time
	Prompt          string
	DiffContent     *string
	Error           string
	SourceMachineID string
	UpdatedAt       time.Time
}

SyncableJob contains job data needed for sync

type SyncableResponse

type SyncableResponse struct {
	ID              int64
	UUID            string
	JobID           int64
	JobUUID         string
	Responder       string
	Response        string
	SourceMachineID string
	CreatedAt       time.Time
}

SyncableResponse contains response data needed for sync

type SyncableReview

type SyncableReview struct {
	ID                 int64
	UUID               string
	JobID              int64
	JobUUID            string
	Agent              string
	Prompt             string
	Output             string
	Addressed          bool
	UpdatedByMachineID string
	CreatedAt          time.Time
	UpdatedAt          time.Time
}

SyncableReview contains review data needed for sync

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL