Versions in this module Expand all Collapse all v1 v1.0.0 Mar 6, 2026 Changes in this version + const DefaultJobClaimTTL + const DefaultJobTTL + const DefaultMaxAttempts + const DefaultRetryBaseDelay + const DefaultRetryMaxDelay + var ErrJobAlreadyExists = errors.New("job already exists") + func CalculateRetryDelay(attempt int) time.Duration + func Connect(addr, password string, db int) (*redis.Client, error) + func ConnectWithTLS(addr, password string, db int, tlsCfg *TLSConfig) (*redis.Client, error) + type BulkActionResult struct + Failed int + Processed int + Succeeded int + type CleanupPayload struct + DataDir string + RepositoryID int64 + RepositoryName string + type CleanupResult struct + DeletedCount int + ScannedCount int + type IndexPayload struct + Branch string + Branches []string + CloneURL string + ConnectionID int64 + RepoName string + RepositoryID int64 + type Job struct + Attempts int + CompletedAt *time.Time + CreatedAt time.Time + Error string + ID string + LastError string + MaxAttempts int + NextRetryAt *time.Time + Payload json.RawMessage + Progress *JobProgress + StartedAt *time.Time + Status JobStatus + Type JobType + UpdatedAt time.Time + func (j *Job) ShouldRetry() bool + type JobListOptions struct + CreatedAfter *time.Time + ExcludeStatus JobStatus + Limit int + Offset int + RepoName string + Status JobStatus + Type JobType + type JobListResult struct + HasMore bool + Jobs []*Job + Limit int + Offset int + TotalCount int + type JobProgress struct + Current int + Message string + Total int + type JobStatus string + const JobStatusCompleted + const JobStatusFailed + const JobStatusPending + const JobStatusRunning + type JobType string + const JobTypeCleanup + const JobTypeIndex + const JobTypeReplace + const JobTypeSync + type Queue struct + func NewQueue(client *redis.Client) *Queue + func (q *Queue) BulkCancelJobs(ctx context.Context, jobType JobType, status JobStatus) (*BulkActionResult, error) + func (q *Queue) BulkDeleteJobs(ctx context.Context, jobType JobType, status JobStatus) (*BulkActionResult, error) + func (q *Queue) CleanupOldJobs(ctx context.Context, maxAge time.Duration) (*CleanupResult, error) + func (q *Queue) Clear(ctx context.Context) error + func (q *Queue) Client() *redis.Client + func (q *Queue) DeleteAllJobs(ctx context.Context) (*BulkActionResult, error) + func (q *Queue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error) + func (q *Queue) Enqueue(ctx context.Context, jobType JobType, payload any) (*Job, error) + func (q *Queue) EnqueueWithOptions(ctx context.Context, jobType JobType, payload any, maxAttempts int) (*Job, error) + func (q *Queue) FindPendingJob(ctx context.Context, jobType JobType, ...) (*Job, error) + func (q *Queue) GetActiveIndexRepos(ctx context.Context) ([]int64, error) + func (q *Queue) GetJob(ctx context.Context, jobID string) (*Job, error) + func (q *Queue) GetRetryQueueLength(ctx context.Context) (int64, error) + func (q *Queue) HasActiveIndexJob(ctx context.Context, repoID int64) (bool, error) + func (q *Queue) HasPendingCleanupJob(ctx context.Context, repoID int64) (bool, error) + func (q *Queue) HasPendingIndexJob(ctx context.Context, repoID int64) (bool, error) + func (q *Queue) HasPendingSyncJob(ctx context.Context, connectionID int64) (bool, error) + func (q *Queue) ListJobsWithOptions(ctx context.Context, opts JobListOptions) (*JobListResult, error) + func (q *Queue) MarkCleanupJobActive(ctx context.Context, repoID int64) error + func (q *Queue) MarkCleanupJobInactive(ctx context.Context, repoID int64) error + func (q *Queue) MarkCompleted(ctx context.Context, jobID string) error + func (q *Queue) MarkFailed(ctx context.Context, jobID string, err error) error + func (q *Queue) MarkIndexJobActive(ctx context.Context, repoID int64) error + func (q *Queue) MarkIndexJobInactive(ctx context.Context, repoID int64) error + func (q *Queue) MarkRunning(ctx context.Context, jobID string) error + func (q *Queue) MarkSyncJobActive(ctx context.Context, connectionID int64) error + func (q *Queue) MarkSyncJobInactive(ctx context.Context, connectionID int64) error + func (q *Queue) ProcessRetryQueue(ctx context.Context) (int, error) + func (q *Queue) QueueLength(ctx context.Context) (int64, error) + func (q *Queue) RebuildJobIndexes(ctx context.Context) (int, error) + func (q *Queue) ScheduleRetry(ctx context.Context, jobID string, jobErr error) (bool, error) + func (q *Queue) TryAcquireCleanupJob(ctx context.Context, repoID int64) (bool, error) + func (q *Queue) TryAcquireIndexJob(ctx context.Context, repoID int64) (bool, error) + func (q *Queue) TryAcquireSyncJob(ctx context.Context, connectionID int64) (bool, error) + func (q *Queue) UpdateProgress(ctx context.Context, jobID string, current, total int, message string) error + func (q *Queue) UpdateStatus(ctx context.Context, jobID string, status JobStatus, errorMsg string) error + type ReplaceMatch struct + FilePath string + RepositoryID int64 + RepositoryName string + type ReplacePayload struct + BranchName string + CaseSensitive bool + FilePatterns []string + IsRegex bool + MRDescription string + MRTitle string + Matches []ReplaceMatch + ReplaceWith string + ReposReadOnly bool + SearchPattern string + UserTokens map[string]string + type ShardedQueue struct + func NewShardedQueue(client *redis.Client) *ShardedQueue + func (sq *ShardedQueue) DequeueForShard(ctx context.Context, timeout time.Duration) (*Job, error) + func (sq *ShardedQueue) GetShardIndex() int + func (sq *ShardedQueue) GetTotalShards() int + func (sq *ShardedQueue) Heartbeat(ctx context.Context, jobID string) error + func (sq *ShardedQueue) IsShardingEnabled() bool + func (sq *ShardedQueue) MarkCompletedAndRelease(ctx context.Context, jobID string) error + func (sq *ShardedQueue) MarkFailedAndRelease(ctx context.Context, jobID string, jobErr error) error + func (sq *ShardedQueue) RecoverStaleJobs(ctx context.Context) (int, error) + func (sq *ShardedQueue) RecoveryLoop(ctx context.Context, interval time.Duration) + func (sq *ShardedQueue) ReleaseJob(ctx context.Context, jobID string) error + type SyncPayload struct + ConnectionID int64 + type TLSConfig struct + CACertFile string + CertFile string + Enabled bool + KeyFile string + ServerName string + SkipVerify bool