Documentation
¶
Index ¶
- Constants
- Variables
- func CalculateRetryDelay(attempt int) time.Duration
- func Connect(addr, password string, db int) (*redis.Client, error)
- func ConnectWithTLS(addr, password string, db int, tlsCfg *TLSConfig) (*redis.Client, error)
- type BulkActionResult
- type CleanupPayload
- type CleanupResult
- type IndexPayload
- type Job
- type JobListOptions
- type JobListResult
- type JobProgress
- type JobStatus
- type JobType
- type Queue
- func (q *Queue) BulkCancelJobs(ctx context.Context, jobType JobType, status JobStatus) (*BulkActionResult, error)
- func (q *Queue) BulkDeleteJobs(ctx context.Context, jobType JobType, status JobStatus) (*BulkActionResult, error)
- func (q *Queue) CleanupOldJobs(ctx context.Context, maxAge time.Duration) (*CleanupResult, error)
- func (q *Queue) Clear(ctx context.Context) error
- func (q *Queue) Client() *redis.Client
- func (q *Queue) DeleteAllJobs(ctx context.Context) (*BulkActionResult, error)
- func (q *Queue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
- func (q *Queue) Enqueue(ctx context.Context, jobType JobType, payload any) (*Job, error)
- func (q *Queue) EnqueueWithOptions(ctx context.Context, jobType JobType, payload any, maxAttempts int) (*Job, error)
- func (q *Queue) FindPendingJob(ctx context.Context, jobType JobType, ...) (*Job, error)
- func (q *Queue) GetActiveIndexRepos(ctx context.Context) ([]int64, error)
- func (q *Queue) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (q *Queue) GetRetryQueueLength(ctx context.Context) (int64, error)
- func (q *Queue) HasActiveIndexJob(ctx context.Context, repoID int64) (bool, error)
- func (q *Queue) HasPendingCleanupJob(ctx context.Context, repoID int64) (bool, error)
- func (q *Queue) HasPendingIndexJob(ctx context.Context, repoID int64) (bool, error)
- func (q *Queue) HasPendingSyncJob(ctx context.Context, connectionID int64) (bool, error)
- func (q *Queue) ListJobsWithOptions(ctx context.Context, opts JobListOptions) (*JobListResult, error)
- func (q *Queue) MarkCleanupJobActive(ctx context.Context, repoID int64) error
- func (q *Queue) MarkCleanupJobInactive(ctx context.Context, repoID int64) error
- func (q *Queue) MarkCompleted(ctx context.Context, jobID string) error
- func (q *Queue) MarkFailed(ctx context.Context, jobID string, err error) error
- func (q *Queue) MarkIndexJobActive(ctx context.Context, repoID int64) error
- func (q *Queue) MarkIndexJobInactive(ctx context.Context, repoID int64) error
- func (q *Queue) MarkRunning(ctx context.Context, jobID string) error
- func (q *Queue) MarkSyncJobActive(ctx context.Context, connectionID int64) error
- func (q *Queue) MarkSyncJobInactive(ctx context.Context, connectionID int64) error
- func (q *Queue) ProcessRetryQueue(ctx context.Context) (int, error)
- func (q *Queue) QueueLength(ctx context.Context) (int64, error)
- func (q *Queue) RebuildJobIndexes(ctx context.Context) (int, error)
- func (q *Queue) ScheduleRetry(ctx context.Context, jobID string, jobErr error) (bool, error)
- func (q *Queue) TryAcquireCleanupJob(ctx context.Context, repoID int64) (bool, error)
- func (q *Queue) TryAcquireIndexJob(ctx context.Context, repoID int64) (bool, error)
- func (q *Queue) TryAcquireSyncJob(ctx context.Context, connectionID int64) (bool, error)
- func (q *Queue) UpdateProgress(ctx context.Context, jobID string, current, total int, message string) error
- func (q *Queue) UpdateStatus(ctx context.Context, jobID string, status JobStatus, errorMsg string) error
- type ReplaceMatch
- type ReplacePayload
- type ShardedQueue
- func (sq *ShardedQueue) DequeueForShard(ctx context.Context, timeout time.Duration) (*Job, error)
- func (sq *ShardedQueue) GetShardIndex() int
- func (sq *ShardedQueue) GetTotalShards() int
- func (sq *ShardedQueue) Heartbeat(ctx context.Context, jobID string) error
- func (sq *ShardedQueue) IsShardingEnabled() bool
- func (sq *ShardedQueue) MarkCompletedAndRelease(ctx context.Context, jobID string) error
- func (sq *ShardedQueue) MarkFailedAndRelease(ctx context.Context, jobID string, jobErr error) error
- func (sq *ShardedQueue) RecoverStaleJobs(ctx context.Context) (int, error)
- func (sq *ShardedQueue) RecoveryLoop(ctx context.Context, interval time.Duration)
- func (sq *ShardedQueue) ReleaseJob(ctx context.Context, jobID string) error
- type SyncPayload
- type TLSConfig
Constants ¶
const (
	// DefaultMaxAttempts is the default number of retry attempts for jobs.
	DefaultMaxAttempts = 3
	// DefaultRetryBaseDelay is the base delay for exponential backoff.
	DefaultRetryBaseDelay = 30 * time.Second
	// DefaultRetryMaxDelay is the maximum delay between retries.
	DefaultRetryMaxDelay = 30 * time.Minute
	// DefaultJobTTL is the default time-to-live for job data in Redis.
	// Jobs are automatically expired after this duration.
	DefaultJobTTL = 24 * time.Hour
	// DefaultJobClaimTTL is the default time a worker can hold a job claim.
	// If a worker doesn't heartbeat within this duration, the job can be reclaimed.
	DefaultJobClaimTTL = 5 * time.Minute
)
Retry configuration defaults.
Variables ¶
var ErrJobAlreadyExists = errors.New("job already exists")
ErrJobAlreadyExists is returned when trying to enqueue a duplicate job.
Functions ¶
func CalculateRetryDelay ¶
CalculateRetryDelay calculates the delay before the next retry using exponential backoff. Formula: min(baseDelay * 2^(attempt-1), maxDelay).
Types ¶
type BulkActionResult ¶
type BulkActionResult struct {
Processed int `json:"processed"`
Succeeded int `json:"succeeded"`
Failed int `json:"failed"`
}
BulkActionResult represents the result of a bulk job operation.
type CleanupPayload ¶
type CleanupPayload struct {
RepositoryID int64 `json:"repository_id"`
RepositoryName string `json:"repository_name"`
DataDir string `json:"data_dir"` // Base data directory for repos and index
}
CleanupPayload is the payload for cleanup jobs (deleting Zoekt shards and repo files).
type CleanupResult ¶
type CleanupResult struct {
DeletedCount int `json:"deleted_count"`
ScannedCount int `json:"scanned_count"`
}
CleanupResult contains the result of a cleanup operation.
type IndexPayload ¶
type IndexPayload struct {
RepositoryID int64 `json:"repository_id"`
ConnectionID int64 `json:"connection_id"`
RepoName string `json:"repo_name"`
CloneURL string `json:"clone_url"`
Branch string `json:"branch"` // Default/primary branch (backward compat)
Branches []string `json:"branches,omitempty"` // Additional branches to index
}
IndexPayload is the payload for index jobs.
type Job ¶
type Job struct {
ID string `json:"id"`
Type JobType `json:"type"`
Status JobStatus `json:"status"`
Payload json.RawMessage `json:"payload"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Progress *JobProgress `json:"progress,omitempty"`
// Retry fields
Attempts int `json:"attempts"` // Number of attempts so far
MaxAttempts int `json:"max_attempts,omitempty"` // Max attempts (0 = use default)
NextRetryAt *time.Time `json:"next_retry_at,omitempty"`
LastError string `json:"last_error,omitempty"` // Error from last attempt
}
Job represents a queued job.
func (*Job) ShouldRetry ¶
ShouldRetry determines if a job should be retried based on its attempt count.
type JobListOptions ¶
type JobListOptions struct {
Type JobType // Filter by job type (empty = all)
Status JobStatus // Filter by status (empty = all)
ExcludeStatus JobStatus // Exclude jobs with this status (empty = none)
RepoName string // Filter by repo name (partial match, empty = all)
CreatedAfter *time.Time // Filter to jobs created after this time (nil = no filter)
Limit int // Max results (default 50)
Offset int // Pagination offset
}
JobListOptions represents options for listing jobs.
type JobListResult ¶
type JobListResult struct {
Jobs []*Job `json:"jobs"`
TotalCount int `json:"total_count"`
Limit int `json:"limit"`
Offset int `json:"offset"`
HasMore bool `json:"has_more"`
}
JobListResult represents the result of listing jobs.
type JobProgress ¶
type JobProgress struct {
Current int `json:"current"`
Total int `json:"total"`
Message string `json:"message,omitempty"`
}
JobProgress tracks the progress of a running job.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue handles job queue operations.
func (*Queue) BulkCancelJobs ¶
func (q *Queue) BulkCancelJobs( ctx context.Context, jobType JobType, status JobStatus, ) (*BulkActionResult, error)
BulkCancelJobs cancels all jobs matching the given filters. Only pending and running jobs can be canceled. Uses sorted set indexes for efficient queries instead of SCAN.
func (*Queue) BulkDeleteJobs ¶
func (q *Queue) BulkDeleteJobs( ctx context.Context, jobType JobType, status JobStatus, ) (*BulkActionResult, error)
BulkDeleteJobs deletes all jobs matching the given filters. Only completed and failed jobs can be deleted. Uses sorted set indexes for efficient queries instead of SCAN.
func (*Queue) CleanupOldJobs ¶
CleanupOldJobs removes completed and failed jobs older than the specified duration. Uses sorted set indexes for efficient range queries instead of SCAN.
func (*Queue) DeleteAllJobs ¶
func (q *Queue) DeleteAllJobs(ctx context.Context) (*BulkActionResult, error)
DeleteAllJobs deletes ALL jobs regardless of status (use with caution). Uses the main job index for efficient iteration instead of SCAN.
func (*Queue) Dequeue ¶
Dequeue retrieves the next job from the queue. Priority queue (replace jobs) is checked first, then normal queue.
func (*Queue) Enqueue ¶
Enqueue adds a job to the queue. Replace jobs are added to a high-priority queue that is processed first.
func (*Queue) EnqueueWithOptions ¶
func (q *Queue) EnqueueWithOptions( ctx context.Context, jobType JobType, payload any, maxAttempts int, ) (*Job, error)
EnqueueWithOptions adds a job to the queue with custom retry settings. maxAttempts of 0 uses the default (DefaultMaxAttempts).
func (*Queue) FindPendingJob ¶
func (q *Queue) FindPendingJob( ctx context.Context, jobType JobType, keyExtractor func(payload json.RawMessage) (string, bool), ) (*Job, error)
FindPendingJob searches for an existing pending or running job of the given type that matches the provided key extractor function. The keyExtractor is called for each job payload and should return a comparable key.
func (*Queue) GetActiveIndexRepos ¶
GetActiveIndexRepos returns all repo IDs currently in the active index jobs set.
func (*Queue) GetRetryQueueLength ¶
GetRetryQueueLength returns the number of jobs waiting for retry.
func (*Queue) HasActiveIndexJob ¶
HasActiveIndexJob checks if a repo has any pending or running index job. This is used for recovery to detect orphaned active index markers.
func (*Queue) HasPendingCleanupJob ¶
HasPendingCleanupJob checks if there's already a pending/running cleanup job for the given repository.
func (*Queue) HasPendingIndexJob ¶
HasPendingIndexJob checks if there's already a pending/running index job for the given repository. Uses O(1) Redis SET lookup instead of scanning all jobs.
func (*Queue) HasPendingSyncJob ¶
HasPendingSyncJob checks if there's already a pending/running sync job for the given connection. Uses O(1) Redis SET lookup instead of scanning all jobs.
func (*Queue) ListJobsWithOptions ¶
func (q *Queue) ListJobsWithOptions( ctx context.Context, opts JobListOptions, ) (*JobListResult, error)
ListJobsWithOptions returns jobs with filtering and pagination. Uses sorted set indexes for efficient queries instead of SCAN.
func (*Queue) MarkCleanupJobActive ¶
MarkCleanupJobActive adds a repo ID to the active cleanup jobs set. Deprecated: Use TryAcquireCleanupJob for race-free job acquisition.
func (*Queue) MarkCleanupJobInactive ¶
MarkCleanupJobInactive removes a repo ID from the active cleanup jobs set.
func (*Queue) MarkCompleted ¶
MarkCompleted marks a job as completed.
func (*Queue) MarkFailed ¶
MarkFailed marks a job as failed.
func (*Queue) MarkIndexJobActive ¶
MarkIndexJobActive adds a repo ID to the active index jobs set. Deprecated: Use TryAcquireIndexJob for race-free job acquisition.
func (*Queue) MarkIndexJobInactive ¶
MarkIndexJobInactive removes a repo ID from the active index jobs set.
func (*Queue) MarkRunning ¶
MarkRunning marks a job as running.
func (*Queue) MarkSyncJobActive ¶
MarkSyncJobActive adds a connection ID to the active sync jobs set. Deprecated: Use TryAcquireSyncJob for race-free job acquisition.
func (*Queue) MarkSyncJobInactive ¶
MarkSyncJobInactive removes a connection ID from the active sync jobs set.
func (*Queue) ProcessRetryQueue ¶
ProcessRetryQueue moves jobs from the retry queue to the main queue if their retry time has passed. This should be called periodically (e.g., every 10 seconds).
func (*Queue) QueueLength ¶
QueueLength returns the number of pending jobs.
func (*Queue) RebuildJobIndexes ¶
RebuildJobIndexes scans all existing jobs and rebuilds the sorted set indexes. This is useful for fixing stale data from jobs created before the index feature or when indexes get out of sync due to bugs. Returns the number of jobs processed and any errors encountered.
func (*Queue) ScheduleRetry ¶
ScheduleRetry schedules a job for retry with exponential backoff. Returns true if the job was scheduled for retry, false if max attempts exceeded.
func (*Queue) TryAcquireCleanupJob ¶
TryAcquireCleanupJob atomically checks and marks a repo as having an active cleanup job. Returns true if the job was acquired (repo was not already active), false if already active. This eliminates race conditions between HasPendingCleanupJob and MarkCleanupJobActive.
func (*Queue) TryAcquireIndexJob ¶
TryAcquireIndexJob atomically checks and marks a repo as having an active index job. Returns true if the job was acquired (repo was not already active), false if already active. This eliminates race conditions between HasPendingIndexJob and MarkIndexJobActive.
func (*Queue) TryAcquireSyncJob ¶
TryAcquireSyncJob atomically checks and marks a connection as having an active sync job. Returns true if the job was acquired (connection was not already active), false if already active. This eliminates race conditions between HasPendingSyncJob and MarkSyncJobActive.
type ReplaceMatch ¶
type ReplaceMatch struct {
RepositoryID int64 `json:"repository_id"`
RepositoryName string `json:"repository_name"`
FilePath string `json:"file_path"`
}
ReplaceMatch represents a file match for replacement (from preview).
type ReplacePayload ¶
type ReplacePayload struct {
// Search parameters (kept for reference/logging, not used for search)
SearchPattern string `json:"search_pattern"`
ReplaceWith string `json:"replace_with"`
IsRegex bool `json:"is_regex"`
CaseSensitive bool `json:"case_sensitive"`
FilePatterns []string `json:"file_patterns,omitempty"`
// Matches from preview (required - Execute uses these directly)
Matches []ReplaceMatch `json:"matches"`
// MR/PR options (MR is always created)
BranchName string `json:"branch_name,omitempty"`
MRTitle string `json:"mr_title,omitempty"`
MRDescription string `json:"mr_description,omitempty"`
// User-provided tokens for repos without server-side authentication
// Map of connection_id (as string) -> token
UserTokens map[string]string `json:"user_tokens,omitempty"`
// ReposReadOnly indicates if the system is in read-only mode
// When true, only user-provided tokens can be used (DB tokens are for indexing only)
ReposReadOnly bool `json:"repos_readonly,omitempty"`
}
ReplacePayload is the payload for replace jobs.
type ShardedQueue ¶
type ShardedQueue struct {
*Queue
// contains filtered or unexported fields
}
ShardedQueue extends Queue with shard-aware job processing.
func NewShardedQueue ¶
func NewShardedQueue(client *redis.Client) *ShardedQueue
NewShardedQueue creates a shard-aware queue.
func (*ShardedQueue) DequeueForShard ¶
DequeueForShard retrieves a job that belongs to this shard. Jobs belonging to other shards are skipped and re-queued. Priority queue (replace jobs) is checked first, then normal queue.
func (*ShardedQueue) GetShardIndex ¶
func (sq *ShardedQueue) GetShardIndex() int
GetShardIndex returns this worker's shard index.
func (*ShardedQueue) GetTotalShards ¶
func (sq *ShardedQueue) GetTotalShards() int
GetTotalShards returns total number of shards.
func (*ShardedQueue) Heartbeat ¶
func (sq *ShardedQueue) Heartbeat(ctx context.Context, jobID string) error
Heartbeat updates the TTL for a job we're processing. Call this periodically for long-running jobs.
func (*ShardedQueue) IsShardingEnabled ¶
func (sq *ShardedQueue) IsShardingEnabled() bool
IsShardingEnabled returns whether sharding is enabled.
func (*ShardedQueue) MarkCompletedAndRelease ¶
func (sq *ShardedQueue) MarkCompletedAndRelease(ctx context.Context, jobID string) error
MarkCompletedAndRelease marks a job complete and releases it.
func (*ShardedQueue) MarkFailedAndRelease ¶
func (sq *ShardedQueue) MarkFailedAndRelease( ctx context.Context, jobID string, jobErr error, ) error
MarkFailedAndRelease marks a job failed and releases it. If the job has retries remaining, it will be scheduled for retry instead of being marked as permanently failed.
func (*ShardedQueue) RecoverStaleJobs ¶
func (sq *ShardedQueue) RecoverStaleJobs(ctx context.Context) (int, error)
RecoverStaleJobs finds jobs that were being processed but whose worker died. Call this periodically from a leader/coordinator.
func (*ShardedQueue) RecoveryLoop ¶
func (sq *ShardedQueue) RecoveryLoop(ctx context.Context, interval time.Duration)
RecoveryLoop runs periodic recovery of stale jobs.
func (*ShardedQueue) ReleaseJob ¶
func (sq *ShardedQueue) ReleaseJob(ctx context.Context, jobID string) error
ReleaseJob releases a job from processing (on completion or failure).
type SyncPayload ¶
type SyncPayload struct {
ConnectionID int64 `json:"connection_id"`
}
SyncPayload is the payload for sync jobs.
type TLSConfig ¶
type TLSConfig struct {
Enabled bool // Enable TLS
SkipVerify bool // Skip certificate verification (insecure)
CertFile string // Path to client certificate file (for mTLS)
KeyFile string // Path to client key file (for mTLS)
CACertFile string // Path to CA certificate file
ServerName string // Override server name for TLS verification
}
TLSConfig holds TLS configuration options for Redis connections.