queue

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Retry, TTL, and claim configuration defaults for the job queue.
const (
	// DefaultMaxAttempts is the default number of retry attempts for jobs.
	DefaultMaxAttempts = 3

	// DefaultRetryBaseDelay is the base delay for exponential backoff.
	// Actual delay grows as min(baseDelay * 2^(attempt-1), DefaultRetryMaxDelay).
	DefaultRetryBaseDelay = 30 * time.Second

	// DefaultRetryMaxDelay is the maximum delay between retries.
	DefaultRetryMaxDelay = 30 * time.Minute

	// DefaultJobTTL is the default time-to-live for job data in Redis.
	// Jobs are automatically expired after this duration.
	DefaultJobTTL = 24 * time.Hour

	// DefaultJobClaimTTL is the default time a worker can hold a job claim.
	// If a worker doesn't heartbeat within this duration, the job can be reclaimed.
	DefaultJobClaimTTL = 5 * time.Minute
)

Retry configuration defaults.

Variables

View Source
// ErrJobAlreadyExists is returned when trying to enqueue a duplicate job.
var ErrJobAlreadyExists = errors.New("job already exists")

ErrJobAlreadyExists is returned when trying to enqueue a duplicate job.

Functions

func CalculateRetryDelay

func CalculateRetryDelay(attempt int) time.Duration

CalculateRetryDelay calculates the delay before the next retry using exponential backoff. Formula: min(baseDelay * 2^(attempt-1), maxDelay).

func Connect

func Connect(addr, password string, db int) (*redis.Client, error)

Connect creates a new Redis client.

func ConnectWithTLS

func ConnectWithTLS(addr, password string, db int, tlsCfg *TLSConfig) (*redis.Client, error)

ConnectWithTLS creates a new Redis client with optional TLS configuration.

Types

type BulkActionResult

// BulkActionResult represents the result of a bulk job operation.
type BulkActionResult struct {
	Processed int `json:"processed"` // Number of jobs processed.
	Succeeded int `json:"succeeded"` // Jobs for which the action succeeded.
	Failed    int `json:"failed"`    // Jobs for which the action failed.
}

BulkActionResult represents the result of a bulk job operation.

type CleanupPayload

// CleanupPayload is the payload for cleanup jobs
// (deleting Zoekt shards and repo files).
type CleanupPayload struct {
	RepositoryID   int64  `json:"repository_id"`
	RepositoryName string `json:"repository_name"`
	DataDir        string `json:"data_dir"` // Base data directory for repos and index
}

CleanupPayload is the payload for cleanup jobs (deleting Zoekt shards and repo files).

type CleanupResult

// CleanupResult contains the result of a cleanup operation.
type CleanupResult struct {
	DeletedCount int `json:"deleted_count"` // Number of items deleted.
	ScannedCount int `json:"scanned_count"` // Number of items scanned.
}

CleanupResult contains the result of a cleanup operation.

type IndexPayload

// IndexPayload is the payload for index jobs.
type IndexPayload struct {
	RepositoryID int64    `json:"repository_id"`
	ConnectionID int64    `json:"connection_id"`
	RepoName     string   `json:"repo_name"`
	CloneURL     string   `json:"clone_url"`
	Branch       string   `json:"branch"`             // Default/primary branch (backward compat)
	Branches     []string `json:"branches,omitempty"` // Additional branches to index
}

IndexPayload is the payload for index jobs.

type Job

// Job represents a queued job, including its status, payload,
// lifecycle timestamps, optional progress, and retry bookkeeping.
type Job struct {
	ID          string          `json:"id"`
	Type        JobType         `json:"type"`
	Status      JobStatus       `json:"status"`
	Payload     json.RawMessage `json:"payload"` // Type-specific payload (e.g. IndexPayload), decoded by the handler.
	Error       string          `json:"error,omitempty"`
	CreatedAt   time.Time       `json:"created_at"`
	UpdatedAt   time.Time       `json:"updated_at"`
	StartedAt   *time.Time      `json:"started_at,omitempty"`
	CompletedAt *time.Time      `json:"completed_at,omitempty"`
	Progress    *JobProgress    `json:"progress,omitempty"`
	// Retry fields
	Attempts    int        `json:"attempts"`               // Number of attempts so far
	MaxAttempts int        `json:"max_attempts,omitempty"` // Max attempts (0 = use default)
	NextRetryAt *time.Time `json:"next_retry_at,omitempty"`
	LastError   string     `json:"last_error,omitempty"` // Error from last attempt
}

Job represents a queued job.

func (*Job) ShouldRetry

func (j *Job) ShouldRetry() bool

ShouldRetry determines if a job should be retried based on its attempt count.

type JobListOptions

// JobListOptions represents options for listing jobs.
type JobListOptions struct {
	Type          JobType    // Filter by job type (empty = all)
	Status        JobStatus  // Filter by status (empty = all)
	ExcludeStatus JobStatus  // Exclude jobs with this status (empty = none)
	RepoName      string     // Filter by repo name (partial match, empty = all)
	CreatedAfter  *time.Time // Filter to jobs created after this time (nil = no filter)
	Limit         int        // Max results (default 50)
	Offset        int        // Pagination offset
}

JobListOptions represents options for listing jobs.

type JobListResult

// JobListResult represents the result of listing jobs.
type JobListResult struct {
	Jobs       []*Job `json:"jobs"`        // The page of jobs returned.
	TotalCount int    `json:"total_count"` // Total matching jobs across all pages.
	Limit      int    `json:"limit"`       // Limit that was applied.
	Offset     int    `json:"offset"`      // Offset that was applied.
	HasMore    bool   `json:"has_more"`    // Whether more results exist beyond this page.
}

JobListResult represents the result of listing jobs.

type JobProgress

// JobProgress tracks the progress of a running job.
type JobProgress struct {
	Current int    `json:"current"` // Units completed so far.
	Total   int    `json:"total"`   // Total units of work.
	Message string `json:"message,omitempty"`
}

JobProgress tracks the progress of a running job.

type JobStatus

// JobStatus represents the status of a job.
type JobStatus string

JobStatus represents the status of a job.

// Possible job status values.
const (
	JobStatusPending   JobStatus = "pending"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
)

type JobType

// JobType represents the type of job.
type JobType string

JobType represents the type of job.

// Supported job types.
const (
	JobTypeIndex   JobType = "index"
	JobTypeReplace JobType = "replace"
	JobTypeSync    JobType = "sync"
	JobTypeCleanup JobType = "cleanup"
)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue handles job queue operations.

func NewQueue

func NewQueue(client *redis.Client) *Queue

NewQueue creates a new job queue.

func (*Queue) BulkCancelJobs

func (q *Queue) BulkCancelJobs(
	ctx context.Context,
	jobType JobType,
	status JobStatus,
) (*BulkActionResult, error)

BulkCancelJobs cancels all jobs matching the given filters. Only pending and running jobs can be canceled. Uses sorted set indexes for efficient queries instead of SCAN.

func (*Queue) BulkDeleteJobs

func (q *Queue) BulkDeleteJobs(
	ctx context.Context,
	jobType JobType,
	status JobStatus,
) (*BulkActionResult, error)

BulkDeleteJobs deletes all jobs matching the given filters. Only completed and failed jobs can be deleted. Uses sorted set indexes for efficient queries instead of SCAN.

func (*Queue) CleanupOldJobs

func (q *Queue) CleanupOldJobs(ctx context.Context, maxAge time.Duration) (*CleanupResult, error)

CleanupOldJobs removes completed and failed jobs older than the specified duration. Uses sorted set indexes for efficient range queries instead of SCAN.

func (*Queue) Clear

func (q *Queue) Clear(ctx context.Context) error

Clear removes all jobs from the queue.

func (*Queue) Client

func (q *Queue) Client() *redis.Client

Client returns the underlying Redis client.

func (*Queue) DeleteAllJobs

func (q *Queue) DeleteAllJobs(ctx context.Context) (*BulkActionResult, error)

DeleteAllJobs deletes ALL jobs regardless of status (use with caution). Uses the main job index for efficient iteration instead of SCAN.

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)

Dequeue retrieves the next job from the queue. Priority queue (replace jobs) is checked first, then normal queue.

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, jobType JobType, payload any) (*Job, error)

Enqueue adds a job to the queue. Replace jobs are added to a high-priority queue that is processed first.

func (*Queue) EnqueueWithOptions

func (q *Queue) EnqueueWithOptions(
	ctx context.Context,
	jobType JobType,
	payload any,
	maxAttempts int,
) (*Job, error)

EnqueueWithOptions adds a job to the queue with custom retry settings. maxAttempts of 0 uses the default (DefaultMaxAttempts).

func (*Queue) FindPendingJob

func (q *Queue) FindPendingJob(
	ctx context.Context,
	jobType JobType,
	keyExtractor func(payload json.RawMessage) (string, bool),
) (*Job, error)

FindPendingJob searches for an existing pending or running job of the given type that matches the provided key extractor function. The keyExtractor is called for each job payload and should return a comparable key.

func (*Queue) GetActiveIndexRepos

func (q *Queue) GetActiveIndexRepos(ctx context.Context) ([]int64, error)

GetActiveIndexRepos returns all repo IDs currently in the active index jobs set.

func (*Queue) GetJob

func (q *Queue) GetJob(ctx context.Context, jobID string) (*Job, error)

GetJob retrieves a job by ID.

func (*Queue) GetRetryQueueLength

func (q *Queue) GetRetryQueueLength(ctx context.Context) (int64, error)

GetRetryQueueLength returns the number of jobs waiting for retry.

func (*Queue) HasActiveIndexJob

func (q *Queue) HasActiveIndexJob(ctx context.Context, repoID int64) (bool, error)

HasActiveIndexJob checks if a repo has any pending or running index job. This is used for recovery to detect orphaned active index markers.

func (*Queue) HasPendingCleanupJob

func (q *Queue) HasPendingCleanupJob(ctx context.Context, repoID int64) (bool, error)

HasPendingCleanupJob checks if there's already a pending/running cleanup job for the given repository.

func (*Queue) HasPendingIndexJob

func (q *Queue) HasPendingIndexJob(ctx context.Context, repoID int64) (bool, error)

HasPendingIndexJob checks if there's already a pending/running index job for the given repository. Uses O(1) Redis SET lookup instead of scanning all jobs.

func (*Queue) HasPendingSyncJob

func (q *Queue) HasPendingSyncJob(ctx context.Context, connectionID int64) (bool, error)

HasPendingSyncJob checks if there's already a pending/running sync job for the given connection. Uses O(1) Redis SET lookup instead of scanning all jobs.

func (*Queue) ListJobsWithOptions

func (q *Queue) ListJobsWithOptions(
	ctx context.Context,
	opts JobListOptions,
) (*JobListResult, error)

ListJobsWithOptions returns jobs with filtering and pagination. Uses sorted set indexes for efficient queries instead of SCAN.

func (*Queue) MarkCleanupJobActive

func (q *Queue) MarkCleanupJobActive(ctx context.Context, repoID int64) error

MarkCleanupJobActive adds a repo ID to the active cleanup jobs set. Deprecated: Use TryAcquireCleanupJob for race-free job acquisition.

func (*Queue) MarkCleanupJobInactive

func (q *Queue) MarkCleanupJobInactive(ctx context.Context, repoID int64) error

MarkCleanupJobInactive removes a repo ID from the active cleanup jobs set.

func (*Queue) MarkCompleted

func (q *Queue) MarkCompleted(ctx context.Context, jobID string) error

MarkCompleted marks a job as completed.

func (*Queue) MarkFailed

func (q *Queue) MarkFailed(ctx context.Context, jobID string, err error) error

MarkFailed marks a job as failed.

func (*Queue) MarkIndexJobActive

func (q *Queue) MarkIndexJobActive(ctx context.Context, repoID int64) error

MarkIndexJobActive adds a repo ID to the active index jobs set. Deprecated: Use TryAcquireIndexJob for race-free job acquisition.

func (*Queue) MarkIndexJobInactive

func (q *Queue) MarkIndexJobInactive(ctx context.Context, repoID int64) error

MarkIndexJobInactive removes a repo ID from the active index jobs set.

func (*Queue) MarkRunning

func (q *Queue) MarkRunning(ctx context.Context, jobID string) error

MarkRunning marks a job as running.

func (*Queue) MarkSyncJobActive

func (q *Queue) MarkSyncJobActive(ctx context.Context, connectionID int64) error

MarkSyncJobActive adds a connection ID to the active sync jobs set. Deprecated: Use TryAcquireSyncJob for race-free job acquisition.

func (*Queue) MarkSyncJobInactive

func (q *Queue) MarkSyncJobInactive(ctx context.Context, connectionID int64) error

MarkSyncJobInactive removes a connection ID from the active sync jobs set.

func (*Queue) ProcessRetryQueue

func (q *Queue) ProcessRetryQueue(ctx context.Context) (int, error)

ProcessRetryQueue moves jobs from the retry queue to the main queue if their retry time has passed. This should be called periodically (e.g., every 10 seconds).

func (*Queue) QueueLength

func (q *Queue) QueueLength(ctx context.Context) (int64, error)

QueueLength returns the number of pending jobs.

func (*Queue) RebuildJobIndexes

func (q *Queue) RebuildJobIndexes(ctx context.Context) (int, error)

RebuildJobIndexes scans all existing jobs and rebuilds the sorted set indexes. This is useful for fixing stale data from jobs created before the index feature or when indexes get out of sync due to bugs. Returns the number of jobs processed and any errors encountered.

func (*Queue) ScheduleRetry

func (q *Queue) ScheduleRetry(ctx context.Context, jobID string, jobErr error) (bool, error)

ScheduleRetry schedules a job for retry with exponential backoff. Returns true if the job was scheduled for retry, false if max attempts exceeded.

func (*Queue) TryAcquireCleanupJob

func (q *Queue) TryAcquireCleanupJob(ctx context.Context, repoID int64) (bool, error)

TryAcquireCleanupJob atomically checks and marks a repo as having an active cleanup job. Returns true if the job was acquired (repo was not already active), false if already active. This eliminates race conditions between HasPendingCleanupJob and MarkCleanupJobActive.

func (*Queue) TryAcquireIndexJob

func (q *Queue) TryAcquireIndexJob(ctx context.Context, repoID int64) (bool, error)

TryAcquireIndexJob atomically checks and marks a repo as having an active index job. Returns true if the job was acquired (repo was not already active), false if already active. This eliminates race conditions between HasPendingIndexJob and MarkIndexJobActive.

func (*Queue) TryAcquireSyncJob

func (q *Queue) TryAcquireSyncJob(ctx context.Context, connectionID int64) (bool, error)

TryAcquireSyncJob atomically checks and marks a connection as having an active sync job. Returns true if the job was acquired (connection was not already active), false if already active. This eliminates race conditions between HasPendingSyncJob and MarkSyncJobActive.

func (*Queue) UpdateProgress

func (q *Queue) UpdateProgress(
	ctx context.Context,
	jobID string,
	current, total int,
	message string,
) error

UpdateProgress updates the progress of a running job.

func (*Queue) UpdateStatus

func (q *Queue) UpdateStatus(
	ctx context.Context,
	jobID string,
	status JobStatus,
	errorMsg string,
) error

UpdateStatus updates the status of a job.

type ReplaceMatch

// ReplaceMatch represents a file match for replacement (from preview).
type ReplaceMatch struct {
	RepositoryID   int64  `json:"repository_id"`
	RepositoryName string `json:"repository_name"`
	FilePath       string `json:"file_path"`
}

ReplaceMatch represents a file match for replacement (from preview).

type ReplacePayload

// ReplacePayload is the payload for replace jobs.
type ReplacePayload struct {
	// Search parameters (kept for reference/logging, not used for search)
	SearchPattern string   `json:"search_pattern"`
	ReplaceWith   string   `json:"replace_with"`
	IsRegex       bool     `json:"is_regex"`
	CaseSensitive bool     `json:"case_sensitive"`
	FilePatterns  []string `json:"file_patterns,omitempty"`

	// Matches from preview (required - Execute uses these directly)
	Matches []ReplaceMatch `json:"matches"`

	// MR/PR options (MR is always created)
	BranchName    string `json:"branch_name,omitempty"`
	MRTitle       string `json:"mr_title,omitempty"`
	MRDescription string `json:"mr_description,omitempty"`

	// User-provided tokens for repos without server-side authentication
	// Map of connection_id (as string) -> token
	UserTokens map[string]string `json:"user_tokens,omitempty"`

	// ReposReadOnly indicates if the system is in read-only mode
	// When true, only user-provided tokens can be used (DB tokens are for indexing only)
	ReposReadOnly bool `json:"repos_readonly,omitempty"`
}

ReplacePayload is the payload for replace jobs.

type ShardedQueue

type ShardedQueue struct {
	*Queue
	// contains filtered or unexported fields
}

ShardedQueue extends Queue with shard-aware job processing.

func NewShardedQueue

func NewShardedQueue(client *redis.Client) *ShardedQueue

NewShardedQueue creates a shard-aware queue.

func (*ShardedQueue) DequeueForShard

func (sq *ShardedQueue) DequeueForShard(ctx context.Context, timeout time.Duration) (*Job, error)

DequeueForShard retrieves a job that belongs to this shard. It will skip jobs that belong to other shards (re-queuing them). Priority queue (replace jobs) is checked first, then normal queue.

func (*ShardedQueue) GetShardIndex

func (sq *ShardedQueue) GetShardIndex() int

GetShardIndex returns this worker's shard index.

func (*ShardedQueue) GetTotalShards

func (sq *ShardedQueue) GetTotalShards() int

GetTotalShards returns total number of shards.

func (*ShardedQueue) Heartbeat

func (sq *ShardedQueue) Heartbeat(ctx context.Context, jobID string) error

Heartbeat updates the TTL for a job we're processing. Call this periodically for long-running jobs.

func (*ShardedQueue) IsShardingEnabled

func (sq *ShardedQueue) IsShardingEnabled() bool

IsShardingEnabled returns whether sharding is enabled.

func (*ShardedQueue) MarkCompletedAndRelease

func (sq *ShardedQueue) MarkCompletedAndRelease(ctx context.Context, jobID string) error

MarkCompletedAndRelease marks a job complete and releases it.

func (*ShardedQueue) MarkFailedAndRelease

func (sq *ShardedQueue) MarkFailedAndRelease(
	ctx context.Context,
	jobID string,
	jobErr error,
) error

MarkFailedAndRelease marks a job failed and releases it. If the job has retries remaining, it will be scheduled for retry instead of being marked as permanently failed.

func (*ShardedQueue) RecoverStaleJobs

func (sq *ShardedQueue) RecoverStaleJobs(ctx context.Context) (int, error)

RecoverStaleJobs finds jobs that were being processed but whose worker died. Call this periodically from a leader/coordinator.

func (*ShardedQueue) RecoveryLoop

func (sq *ShardedQueue) RecoveryLoop(ctx context.Context, interval time.Duration)

RecoveryLoop runs periodic recovery of stale jobs.

func (*ShardedQueue) ReleaseJob

func (sq *ShardedQueue) ReleaseJob(ctx context.Context, jobID string) error

ReleaseJob releases a job from processing (on completion or failure).

type SyncPayload

// SyncPayload is the payload for sync jobs.
type SyncPayload struct {
	ConnectionID int64 `json:"connection_id"`
}

SyncPayload is the payload for sync jobs.

type TLSConfig

// TLSConfig holds TLS configuration options for Redis connections.
type TLSConfig struct {
	Enabled    bool   // Enable TLS
	SkipVerify bool   // Skip certificate verification (insecure)
	CertFile   string // Path to client certificate file (for mTLS)
	KeyFile    string // Path to client key file (for mTLS)
	CACertFile string // Path to CA certificate file
	ServerName string // Override server name for TLS verification
}

TLSConfig holds TLS configuration options for Redis connections.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL