repository

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package repository provides data access for scheduler jobs.

Index

Constants

This section is empty.

Variables

View Source
// Common errors.
var (
	// ErrJobNotFound is a sentinel error with the message "job not found".
	// Compare against it with errors.Is rather than by message text.
	// NOTE(review): presumably returned by repository methods that take a
	// jobID when no such job exists — confirm against the implementations.
	ErrJobNotFound = errors.New("job not found")
)

Common errors.

Functions

This section is empty.

Types

type Job

// Job represents a scheduled job in the system.
//
// The struct tags define the JSON shape. Fields tagged omitempty are left out
// of the encoded output when they hold their zero value (nil pointer, empty
// string, empty map, or zero-length RawMessage); all other fields are always
// emitted.
type Job struct {
	// ID identifies the job; repository methods look jobs up by this value.
	ID             string          `json:"id"`
	// TaskType names the kind of task.
	// NOTE(review): presumably used to pick a handler — confirm with the scheduler.
	TaskType       string          `json:"task_type"`
	// Payload is raw, undecoded JSON; its schema is not defined in this package.
	Payload        json.RawMessage `json:"payload"`
	// Status is the job's current JobStatus.
	Status         JobStatus       `json:"status"`
	// Queue is the name of the queue the job belongs to.
	Queue          string          `json:"queue"`
	// ScheduledAt, StartedAt, CompletedAt and FailedAt are optional timestamps;
	// a nil pointer means the timestamp is unset and the key is omitted from JSON.
	// NOTE(review): presumably maintained by UpdateJobStatus — confirm.
	ScheduledAt    *time.Time      `json:"scheduled_at,omitempty"`
	StartedAt      *time.Time      `json:"started_at,omitempty"`
	CompletedAt    *time.Time      `json:"completed_at,omitempty"`
	FailedAt       *time.Time      `json:"failed_at,omitempty"`
	// RetryCount and MaxRetries are always encoded, even when zero.
	// NOTE(review): presumably attempts made so far and the allowed maximum — confirm.
	RetryCount     int             `json:"retry_count"`
	MaxRetries     int             `json:"max_retries"`
	// Error is a textual error description; omitted from JSON when empty.
	Error          string          `json:"error,omitempty"`
	// Result is raw, undecoded JSON; omitted from JSON when empty.
	Result         json.RawMessage `json:"result,omitempty"`
	// CronExpression and CronEntryID are set for recurring jobs and omitted
	// from JSON when empty.
	// NOTE(review): the cron syntax and the source of CronEntryID are not
	// visible here — confirm.
	CronExpression string          `json:"cron_expression,omitempty"`
	CronEntryID    string          `json:"cron_entry_id,omitempty"`
	// Timeout is a time.Duration with no custom JSON encoding, so it is
	// serialized as an integer number of nanoseconds.
	Timeout        time.Duration   `json:"timeout"`
	// CreatedAt and UpdatedAt are always encoded (no omitempty).
	CreatedAt      time.Time       `json:"created_at"`
	UpdatedAt      time.Time       `json:"updated_at"`
	// Metadata holds arbitrary key/value data; omitted from JSON when empty.
	Metadata       map[string]any  `json:"metadata,omitempty"`
}

Job represents a scheduled job in the system.

type JobFilter

// JobFilter contains filter options for listing jobs.
//
// It is the argument to both ListJobs and CountJobs. DefaultFilter returns a
// pre-populated value; the defaults it sets are not visible in this listing.
// NOTE(review): the notes below assume zero-valued fields (nil slice or
// pointer, empty string) mean "no constraint" — confirm against the
// implementations.
type JobFilter struct {
	// Status and TaskTypes each accept several values.
	// NOTE(review): presumably a job matches if it equals any listed value — confirm.
	Status          []JobStatus
	TaskTypes       []string
	// Queue restricts results to a single queue name.
	Queue           string
	// ScheduledBefore/ScheduledAfter and CreatedBefore/CreatedAfter are optional
	// time bounds.
	// NOTE(review): whether the bounds are inclusive or exclusive is not
	// visible here — confirm.
	ScheduledBefore *time.Time
	ScheduledAfter  *time.Time
	CreatedBefore   *time.Time
	CreatedAfter    *time.Time
	// WithCron is a tri-state flag: nil, true or false.
	// NOTE(review): presumably nil = don't filter, true = only jobs with a cron
	// expression, false = only jobs without one — confirm.
	WithCron        *bool
	// Limit and Offset are paging controls.
	// NOTE(review): treatment of zero or negative values is not visible — confirm.
	Limit           int
	Offset          int
	// OrderBy and OrderDirection select the sort order.
	// NOTE(review): accepted values are not visible here; if the SQL
	// implementation interpolates them into a query they must be validated
	// against an allow-list — confirm.
	OrderBy         string
	OrderDirection  string
}

JobFilter contains filter options for listing jobs.

func DefaultFilter

func DefaultFilter() JobFilter

DefaultFilter returns a JobFilter with sensible defaults.

type JobRepository

// JobRepository defines the interface for job persistence.
//
// This package provides two implementations: MemoryJobRepository (in-memory,
// described as useful for testing and development) and SQLJobRepository
// (backed by a *sql.DB). Every method takes a context.Context as its first
// argument.
type JobRepository interface {
	// CreateJob creates a new job record.
	CreateJob(ctx context.Context, job *Job) error

	// GetJob retrieves a job by ID.
	// NOTE(review): presumably returns ErrJobNotFound when no job has the
	// given ID — confirm against the implementations.
	GetJob(ctx context.Context, jobID string) (*Job, error)

	// UpdateJob updates an existing job.
	UpdateJob(ctx context.Context, job *Job) error

	// UpdateJobStatus updates only the status and related timestamps.
	// NOTE(review): the err parameter is an input, not a result; both
	// implementations name it jobErr, so it is presumably the job's failure
	// cause to be recorded — confirm.
	UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, err error) error

	// DeleteJob removes a job.
	DeleteJob(ctx context.Context, jobID string) error

	// ListJobs lists jobs based on filter criteria.
	ListJobs(ctx context.Context, filter JobFilter) ([]Job, error)

	// CountJobs counts jobs based on filter criteria.
	CountJobs(ctx context.Context, filter JobFilter) (int64, error)

	// GetJobsByStatus retrieves jobs by status with a limit.
	GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)

	// GetPendingJobs retrieves all pending jobs ready to be executed.
	// NOTE(review): the limit parameter suggests the result is capped rather
	// than "all"; confirm the wording and how limit <= 0 is treated.
	GetPendingJobs(ctx context.Context, limit int) ([]Job, error)

	// GetRecurringJobs retrieves all jobs with cron expressions.
	GetRecurringJobs(ctx context.Context) ([]Job, error)

	// SetJobResult stores the result of a job execution.
	// The result is passed as raw JSON and matches the type of Job.Result.
	SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error

	// IncrementRetryCount increments the retry count for a job.
	IncrementRetryCount(ctx context.Context, jobID string) error
}

JobRepository defines the interface for job persistence.

type JobStatus

// JobStatus represents the status of a job.
// Its underlying type is string, so a value encodes to JSON as a plain string.
type JobStatus string

JobStatus represents the status of a job.

// Job status constants.
//
// The string values are what appears in the "status" key of an encoded Job.
// NOTE(review): the allowed transitions between states are not defined in
// this listing — confirm with the scheduler before relying on any ordering.
const (
	JobStatusPending   JobStatus = "pending"
	JobStatusScheduled JobStatus = "scheduled"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
	JobStatusCancelled JobStatus = "cancelled"
	JobStatusRetrying  JobStatus = "retrying"
)

Job status constants.

type MemoryJobRepository

// MemoryJobRepository implements JobRepository using in-memory storage.
// Useful for testing and development. Create one with NewMemoryJobRepository.
// NOTE(review): the fields are unexported and not shown, so concurrency safety
// cannot be determined from this listing — confirm before sharing an instance
// across goroutines.
type MemoryJobRepository struct {
	// contains filtered or unexported fields
}

MemoryJobRepository implements JobRepository using in-memory storage. Useful for testing and development.

func NewMemoryJobRepository

func NewMemoryJobRepository() *MemoryJobRepository

NewMemoryJobRepository creates a new in-memory job repository.

func (*MemoryJobRepository) CountJobs

func (r *MemoryJobRepository) CountJobs(ctx context.Context, filter JobFilter) (int64, error)

CountJobs counts jobs based on filter criteria.

func (*MemoryJobRepository) CreateJob

func (r *MemoryJobRepository) CreateJob(ctx context.Context, job *Job) error

CreateJob creates a new job record.

func (*MemoryJobRepository) DeleteJob

func (r *MemoryJobRepository) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes a job.

func (*MemoryJobRepository) GetJob

func (r *MemoryJobRepository) GetJob(ctx context.Context, jobID string) (*Job, error)

GetJob retrieves a job by ID.

func (*MemoryJobRepository) GetJobsByStatus

func (r *MemoryJobRepository) GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)

GetJobsByStatus retrieves jobs by status with a limit.

func (*MemoryJobRepository) GetPendingJobs

func (r *MemoryJobRepository) GetPendingJobs(ctx context.Context, limit int) ([]Job, error)

GetPendingJobs retrieves all pending jobs ready to be executed.

func (*MemoryJobRepository) GetRecurringJobs

func (r *MemoryJobRepository) GetRecurringJobs(ctx context.Context) ([]Job, error)

GetRecurringJobs retrieves all jobs with cron expressions.

func (*MemoryJobRepository) IncrementRetryCount

func (r *MemoryJobRepository) IncrementRetryCount(ctx context.Context, jobID string) error

IncrementRetryCount increments the retry count for a job.

func (*MemoryJobRepository) ListJobs

func (r *MemoryJobRepository) ListJobs(ctx context.Context, filter JobFilter) ([]Job, error)

ListJobs lists jobs based on filter criteria.

func (*MemoryJobRepository) SetJobResult

func (r *MemoryJobRepository) SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error

SetJobResult stores the result of a job execution.

func (*MemoryJobRepository) UpdateJob

func (r *MemoryJobRepository) UpdateJob(ctx context.Context, job *Job) error

UpdateJob updates an existing job.

func (*MemoryJobRepository) UpdateJobStatus

func (r *MemoryJobRepository) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, jobErr error) error

UpdateJobStatus updates only the status and related timestamps.

type SQLJobRepository

// SQLJobRepository implements JobRepository using a SQL database.
// Create one with NewSQLJobRepository, which takes a *sql.DB. In addition to
// the JobRepository methods it offers CreateTable, which creates the
// scheduler_jobs table if it doesn't exist.
// NOTE(review): the SQL dialect and placeholder style it targets are not
// visible in this listing — confirm before pointing it at a given driver.
type SQLJobRepository struct {
	// contains filtered or unexported fields
}

SQLJobRepository implements JobRepository using a SQL database.

func NewSQLJobRepository

func NewSQLJobRepository(db *sql.DB) *SQLJobRepository

NewSQLJobRepository creates a new SQL-based job repository.

func (*SQLJobRepository) CountJobs

func (r *SQLJobRepository) CountJobs(ctx context.Context, filter JobFilter) (int64, error)

CountJobs counts jobs based on filter criteria.

func (*SQLJobRepository) CreateJob

func (r *SQLJobRepository) CreateJob(ctx context.Context, job *Job) error

CreateJob creates a new job record.

func (*SQLJobRepository) CreateTable

func (r *SQLJobRepository) CreateTable(ctx context.Context) error

CreateTable creates the scheduler_jobs table if it doesn't exist.

func (*SQLJobRepository) DeleteJob

func (r *SQLJobRepository) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes a job.

func (*SQLJobRepository) GetJob

func (r *SQLJobRepository) GetJob(ctx context.Context, jobID string) (*Job, error)

GetJob retrieves a job by ID.

func (*SQLJobRepository) GetJobsByStatus

func (r *SQLJobRepository) GetJobsByStatus(ctx context.Context, status JobStatus, limit int) ([]Job, error)

GetJobsByStatus retrieves jobs by status with a limit.

func (*SQLJobRepository) GetPendingJobs

func (r *SQLJobRepository) GetPendingJobs(ctx context.Context, limit int) ([]Job, error)

GetPendingJobs retrieves all pending jobs ready to be executed.

func (*SQLJobRepository) GetRecurringJobs

func (r *SQLJobRepository) GetRecurringJobs(ctx context.Context) ([]Job, error)

GetRecurringJobs retrieves all jobs with cron expressions.

func (*SQLJobRepository) IncrementRetryCount

func (r *SQLJobRepository) IncrementRetryCount(ctx context.Context, jobID string) error

IncrementRetryCount increments the retry count for a job.

func (*SQLJobRepository) ListJobs

func (r *SQLJobRepository) ListJobs(ctx context.Context, filter JobFilter) ([]Job, error)

ListJobs lists jobs based on filter criteria.

func (*SQLJobRepository) SetJobResult

func (r *SQLJobRepository) SetJobResult(ctx context.Context, jobID string, result json.RawMessage) error

SetJobResult stores the result of a job execution.

func (*SQLJobRepository) UpdateJob

func (r *SQLJobRepository) UpdateJob(ctx context.Context, job *Job) error

UpdateJob updates an existing job.

func (*SQLJobRepository) UpdateJobStatus

func (r *SQLJobRepository) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus, jobErr error) error

UpdateJobStatus updates only the status and related timestamps.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL