Documentation
¶
Overview ¶
Package repository provides data access for scheduler jobs.
Index ¶
- Variables
- type Job
- type JobFilter
- func DefaultFilter() JobFilter
- type JobRepository
- type JobStatus
- type MemoryJobRepository
- func NewMemoryJobRepository() *MemoryJobRepository
- func (r *MemoryJobRepository) CountJobs(ctx context.Context, filter JobFilter) (int64, error)
- func (r *MemoryJobRepository) CreateJob(ctx context.Context, job *Job) error
- func (r *MemoryJobRepository) DeleteJob(ctx context.Context, jobID string) error
- func (r *MemoryJobRepository) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (r *MemoryJobRepository) GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)
- func (r *MemoryJobRepository) GetPendingJobs(ctx context.Context, limit int) ([]Job, error)
- func (r *MemoryJobRepository) GetRecurringJobs(ctx context.Context) ([]Job, error)
- func (r *MemoryJobRepository) IncrementRetryCount(ctx context.Context, jobID string) error
- func (r *MemoryJobRepository) ListJobs(ctx context.Context, filter JobFilter) ([]Job, error)
- func (r *MemoryJobRepository) SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error
- func (r *MemoryJobRepository) UpdateJob(ctx context.Context, job *Job) error
- func (r *MemoryJobRepository) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, jobErr error) error
- type SQLJobRepository
- func NewSQLJobRepository(db *sql.DB) *SQLJobRepository
- func (r *SQLJobRepository) CountJobs(ctx context.Context, filter JobFilter) (int64, error)
- func (r *SQLJobRepository) CreateJob(ctx context.Context, job *Job) error
- func (r *SQLJobRepository) CreateTable(ctx context.Context) error
- func (r *SQLJobRepository) DeleteJob(ctx context.Context, jobID string) error
- func (r *SQLJobRepository) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (r *SQLJobRepository) GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)
- func (r *SQLJobRepository) GetPendingJobs(ctx context.Context, limit int) ([]Job, error)
- func (r *SQLJobRepository) GetRecurringJobs(ctx context.Context) ([]Job, error)
- func (r *SQLJobRepository) IncrementRetryCount(ctx context.Context, jobID string) error
- func (r *SQLJobRepository) ListJobs(ctx context.Context, filter JobFilter) ([]Job, error)
- func (r *SQLJobRepository) SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error
- func (r *SQLJobRepository) UpdateJob(ctx context.Context, job *Job) error
- func (r *SQLJobRepository) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, jobErr error) error
Constants ¶
This section is empty.
Variables ¶
var (
ErrJobNotFound = errors.New("job not found")
)
Common errors.
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job struct {
ID string `json:"id"`
TaskType string `json:"task_type"`
Payload json.RawMessage `json:"payload"`
Status JobStatus `json:"status"`
Queue string `json:"queue"`
ScheduledAt *time.Time `json:"scheduled_at,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
FailedAt *time.Time `json:"failed_at,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
Error string `json:"error,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
CronExpression string `json:"cron_expression,omitempty"`
CronEntryID string `json:"cron_entry_id,omitempty"`
Timeout time.Duration `json:"timeout"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Metadata map[string]any `json:"metadata,omitempty"`
}
Job represents a scheduled job in the system.
type JobFilter ¶
type JobFilter struct {
Status []JobStatus
TaskTypes []string
Queue string
ScheduledBefore *time.Time
ScheduledAfter *time.Time
CreatedBefore *time.Time
CreatedAfter *time.Time
WithCron *bool
Limit int
Offset int
OrderBy string
OrderDirection string
}
JobFilter contains filter options for listing jobs.
func DefaultFilter ¶
func DefaultFilter() JobFilter
DefaultFilter returns a JobFilter with sensible defaults.
type JobRepository ¶
type JobRepository interface {
// CreateJob creates a new job record.
CreateJob(ctx context.Context, job *Job) error
// GetJob retrieves a job by ID.
GetJob(ctx context.Context, jobID string) (*Job, error)
// UpdateJob updates an existing job.
UpdateJob(ctx context.Context, job *Job) error
// UpdateJobStatus updates only the status and related timestamps.
UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, err error) error
// DeleteJob removes a job.
DeleteJob(ctx context.Context, jobID string) error
// ListJobs lists jobs based on filter criteria.
ListJobs(ctx context.Context, filter JobFilter) ([]Job, error)
// CountJobs counts jobs based on filter criteria.
CountJobs(ctx context.Context, filter JobFilter) (int64, error)
// GetJobsByStatus retrieves jobs by status with a limit.
GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)
// GetPendingJobs retrieves all pending jobs ready to be executed.
GetPendingJobs(ctx context.Context, limit int) ([]Job, error)
// GetRecurringJobs retrieves all jobs with cron expressions.
GetRecurringJobs(ctx context.Context) ([]Job, error)
// SetJobResult stores the result of a job execution.
SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error
// IncrementRetryCount increments the retry count for a job.
IncrementRetryCount(ctx context.Context, jobID string) error
}
JobRepository defines the interface for job persistence.
type JobStatus ¶
type JobStatus string
JobStatus represents the status of a job.
const (
JobStatusPending JobStatus = "pending"
JobStatusScheduled JobStatus = "scheduled"
JobStatusRunning JobStatus = "running"
JobStatusCompleted JobStatus = "completed"
JobStatusFailed JobStatus = "failed"
JobStatusCancelled JobStatus = "cancelled"
JobStatusRetrying JobStatus = "retrying"
)
Job status constants.
type MemoryJobRepository ¶
type MemoryJobRepository struct {
// contains filtered or unexported fields
}
MemoryJobRepository implements JobRepository using in-memory storage. Useful for testing and development.
func NewMemoryJobRepository ¶
func NewMemoryJobRepository() *MemoryJobRepository
NewMemoryJobRepository creates a new in-memory job repository.
func (*MemoryJobRepository) CreateJob ¶
func (r *MemoryJobRepository) CreateJob(ctx context.Context, job *Job) error
CreateJob creates a new job record.
func (*MemoryJobRepository) DeleteJob ¶
func (r *MemoryJobRepository) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes a job.
func (*MemoryJobRepository) GetJobsByStatus ¶
func (r *MemoryJobRepository) GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)
GetJobsByStatus retrieves jobs by status with a limit.
func (*MemoryJobRepository) GetPendingJobs ¶
func (r *MemoryJobRepository) GetPendingJobs(ctx context.Context, limit int) ([]Job, error)
GetPendingJobs retrieves all pending jobs ready to be executed.
func (*MemoryJobRepository) GetRecurringJobs ¶
func (r *MemoryJobRepository) GetRecurringJobs(ctx context.Context) ([]Job, error)
GetRecurringJobs retrieves all jobs with cron expressions.
func (*MemoryJobRepository) IncrementRetryCount ¶
func (r *MemoryJobRepository) IncrementRetryCount(ctx context.Context, jobID string) error
IncrementRetryCount increments the retry count for a job.
func (*MemoryJobRepository) SetJobResult ¶
func (r *MemoryJobRepository) SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error
SetJobResult stores the result of a job execution.
func (*MemoryJobRepository) UpdateJob ¶
func (r *MemoryJobRepository) UpdateJob(ctx context.Context, job *Job) error
UpdateJob updates an existing job.
func (*MemoryJobRepository) UpdateJobStatus ¶
func (r *MemoryJobRepository) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, jobErr error) error
UpdateJobStatus updates only the status and related timestamps.
type SQLJobRepository ¶
type SQLJobRepository struct {
// contains filtered or unexported fields
}
SQLJobRepository implements JobRepository using SQL database.
func NewSQLJobRepository ¶
func NewSQLJobRepository(db *sql.DB) *SQLJobRepository
NewSQLJobRepository creates a new SQL-based job repository.
func (*SQLJobRepository) CreateJob ¶
func (r *SQLJobRepository) CreateJob(ctx context.Context, job *Job) error
CreateJob creates a new job record.
func (*SQLJobRepository) CreateTable ¶
func (r *SQLJobRepository) CreateTable(ctx context.Context) error
CreateTable creates the scheduler_jobs table if it doesn't exist.
func (*SQLJobRepository) DeleteJob ¶
func (r *SQLJobRepository) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes a job.
func (*SQLJobRepository) GetJobsByStatus ¶
func (r *SQLJobRepository) GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)
GetJobsByStatus retrieves jobs by status with a limit.
func (*SQLJobRepository) GetPendingJobs ¶
func (r *SQLJobRepository) GetPendingJobs(ctx context.Context, limit int) ([]Job, error)
GetPendingJobs retrieves all pending jobs ready to be executed.
func (*SQLJobRepository) GetRecurringJobs ¶
func (r *SQLJobRepository) GetRecurringJobs(ctx context.Context) ([]Job, error)
GetRecurringJobs retrieves all jobs with cron expressions.
func (*SQLJobRepository) IncrementRetryCount ¶
func (r *SQLJobRepository) IncrementRetryCount(ctx context.Context, jobID string) error
IncrementRetryCount increments the retry count for a job.
func (*SQLJobRepository) SetJobResult ¶
func (r *SQLJobRepository) SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error
SetJobResult stores the result of a job execution.
func (*SQLJobRepository) UpdateJob ¶
func (r *SQLJobRepository) UpdateJob(ctx context.Context, job *Job) error
UpdateJob updates an existing job.
func (*SQLJobRepository) UpdateJobStatus ¶
func (r *SQLJobRepository) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, jobErr error) error
UpdateJobStatus updates only the status and related timestamps.