Documentation ¶
Index ¶
- func MustMarshal(v interface{}) json.RawMessage
- func NewDelayedJob(delay time.Duration) *job.Job
- func NewEmailJob(to, subject string) *job.Job
- func NewFailingJob() *job.Job
- func NewHighPriorityJob() *job.Job
- func NewJobWithPayload(jobType string, payload interface{}) *job.Job
- func NewJobWithRetries(maxRetries int) *job.Job
- func NewJobWithTimeout(timeout time.Duration) *job.Job
- func NewLowPriorityJob() *job.Job
- func NewRandomTestJob(opts ...job.Option) *job.Job
- func NewTestJob(opts ...job.Option) *job.Job
- func NewTestJobWithID(id string, opts ...job.Option) *job.Job
- type EmailPayload
- type ImageResizePayload
- type JobAssertions
- type MockBroker
- func (m *MockBroker) Ack(ctx context.Context, j *job.Job) error
- func (m *MockBroker) Close() error
- func (m *MockBroker) DelayedJobs() []*job.Job
- func (m *MockBroker) DeleteJob(ctx context.Context, jobID string) error
- func (m *MockBroker) Dequeue(ctx context.Context, queues []string, timeout time.Duration) (*job.Job, error)
- func (m *MockBroker) Enqueue(ctx context.Context, j *job.Job) error
- func (m *MockBroker) FailedJobs() []*job.Job
- func (m *MockBroker) GetDelayedJobs(ctx context.Context, until time.Time, limit int64) ([]*job.Job, error)
- func (m *MockBroker) GetJob(ctx context.Context, jobID string) (*job.Job, error)
- func (m *MockBroker) GetPendingJobs(ctx context.Context, queue string, idleTime time.Duration) ([]*job.Job, error)
- func (m *MockBroker) GetQueueDepth(ctx context.Context, queue string) (int64, error)
- func (m *MockBroker) MoveDelayedToQueue(ctx context.Context, j *job.Job) error
- func (m *MockBroker) Nack(ctx context.Context, j *job.Job, err error) error
- func (m *MockBroker) Ping(ctx context.Context) error
- func (m *MockBroker) ProcessedJobs() []*job.Job
- func (m *MockBroker) QueuedJobs() []*job.Job
- func (m *MockBroker) RequeueStaleJob(ctx context.Context, j *job.Job) error
- func (m *MockBroker) Reset()
- func (m *MockBroker) Schedule(ctx context.Context, j *job.Job, at time.Time) error
- type MockDLQRepository
- func (m *MockDLQRepository) Add(ctx context.Context, j *job.Job, errMsg string) error
- func (m *MockDLQRepository) AddJob(dlj *repository.DeadLetterJob)
- func (m *MockDLQRepository) Count(ctx context.Context) (int64, error)
- func (m *MockDLQRepository) Delete(ctx context.Context, id string) error
- func (m *MockDLQRepository) GetByID(ctx context.Context, id string) (*repository.DeadLetterJob, error)
- func (m *MockDLQRepository) List(ctx context.Context, filter repository.DLQFilter) ([]*repository.DeadLetterJob, error)
- func (m *MockDLQRepository) MarkRequeued(ctx context.Context, id string) error
- type MockExecutionRepository
- func (m *MockExecutionRepository) AddExecution(exec *repository.JobExecution)
- func (m *MockExecutionRepository) Create(ctx context.Context, exec *repository.JobExecution) error
- func (m *MockExecutionRepository) GetByJobID(ctx context.Context, jobID string) ([]*repository.JobExecution, error)
- func (m *MockExecutionRepository) GetStats(ctx context.Context, fromDate, toDate time.Time) (*repository.ExecutionStats, error)
- func (m *MockExecutionRepository) List(ctx context.Context, filter repository.ExecutionFilter) ([]*repository.JobExecution, error)
- func (m *MockExecutionRepository) RecordExecution(ctx context.Context, j *job.Job, workerID string, execErr error) error
- type MockScheduleRepository
- func (m *MockScheduleRepository) AddSchedule(s *repository.JobSchedule)
- func (m *MockScheduleRepository) Create(ctx context.Context, input repository.CreateScheduleInput) (*repository.JobSchedule, error)
- func (m *MockScheduleRepository) Delete(ctx context.Context, id string) error
- func (m *MockScheduleRepository) GetByID(ctx context.Context, id string) (*repository.JobSchedule, error)
- func (m *MockScheduleRepository) ListActive(ctx context.Context) ([]*repository.JobSchedule, error)
- func (m *MockScheduleRepository) ListDue(ctx context.Context, until time.Time) ([]*repository.JobSchedule, error)
- func (m *MockScheduleRepository) RecordRun(ctx context.Context, id string, status string, nextRunAt time.Time) error
- func (m *MockScheduleRepository) SetActive(ctx context.Context, id string, active bool) error
- func (m *MockScheduleRepository) UpdateNextRun(ctx context.Context, id string, nextRunAt time.Time) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MustMarshal ¶
func MustMarshal(v interface{}) json.RawMessage
MustMarshal marshals to JSON or panics.
func NewDelayedJob ¶
func NewDelayedJob(delay time.Duration) *job.Job
NewDelayedJob creates a job scheduled for future execution.
func NewEmailJob ¶
func NewEmailJob(to, subject string) *job.Job
NewEmailJob creates a test email job.
func NewFailingJob ¶
func NewFailingJob() *job.Job
NewFailingJob creates a job that should fail (no retries).
func NewHighPriorityJob ¶
func NewHighPriorityJob() *job.Job
NewHighPriorityJob creates a high priority test job.
func NewJobWithPayload ¶
func NewJobWithPayload(jobType string, payload interface{}) *job.Job
NewJobWithPayload creates a job with a custom payload.
func NewJobWithRetries ¶
func NewJobWithRetries(maxRetries int) *job.Job
NewJobWithRetries creates a job with a specific max retries.
func NewJobWithTimeout ¶
func NewJobWithTimeout(timeout time.Duration) *job.Job
NewJobWithTimeout creates a job with a specific timeout.
func NewLowPriorityJob ¶
func NewLowPriorityJob() *job.Job
NewLowPriorityJob creates a low priority test job.
func NewRandomTestJob ¶
func NewRandomTestJob(opts ...job.Option) *job.Job
NewRandomTestJob creates a test job with a random ID.
func NewTestJob ¶
func NewTestJob(opts ...job.Option) *job.Job
NewTestJob creates a test job with default values.
Types ¶
type EmailPayload ¶
type EmailPayload struct {
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}
EmailPayload represents an email job payload.
type ImageResizePayload ¶
type ImageResizePayload struct {
ImageURL string `json:"image_url"`
Width int `json:"width"`
Height int `json:"height"`
Format string `json:"format"`
}
ImageResizePayload represents an image resize job payload.
type JobAssertions ¶
JobAssertions provides test assertions for jobs.
func Assert ¶
func Assert(j *job.Job) *JobAssertions
Assert creates a new JobAssertions for the given job.
func (*JobAssertions) HasRetryCount ¶
func (a *JobAssertions) HasRetryCount(expected int) bool
HasRetryCount checks the retry count.
func (*JobAssertions) HasState ¶
func (a *JobAssertions) HasState(expected job.State) bool
HasState checks if the job has the expected state.
func (*JobAssertions) IsCompleted ¶
func (a *JobAssertions) IsCompleted() bool
IsCompleted checks if the job is completed.
func (*JobAssertions) IsDead ¶
func (a *JobAssertions) IsDead() bool
IsDead checks if the job is dead (in the dead-letter queue, DLQ).
func (*JobAssertions) IsFailed ¶
func (a *JobAssertions) IsFailed() bool
IsFailed checks if the job is failed.
type MockBroker ¶
type MockBroker struct {
// Function overrides for custom behavior
EnqueueFunc func(ctx context.Context, j *job.Job) error
DequeueFunc func(ctx context.Context, queues []string, timeout time.Duration) (*job.Job, error)
AckFunc func(ctx context.Context, j *job.Job) error
NackFunc func(ctx context.Context, j *job.Job, err error) error
ScheduleFunc func(ctx context.Context, j *job.Job, at time.Time) error
GetJobFunc func(ctx context.Context, jobID string) (*job.Job, error)
DeleteJobFunc func(ctx context.Context, jobID string) error
GetQueueDepthFunc func(ctx context.Context, queue string) (int64, error)
GetDelayedJobsFunc func(ctx context.Context, until time.Time, limit int64) ([]*job.Job, error)
MoveDelayedToQueueFunc func(ctx context.Context, j *job.Job) error
GetPendingJobsFunc func(ctx context.Context, queue string, idleTime time.Duration) ([]*job.Job, error)
RequeueStaleJobFunc func(ctx context.Context, j *job.Job) error
PingFunc func(ctx context.Context) error
CloseFunc func() error
// contains filtered or unexported fields
}
MockBroker is a test double for the Broker interface.
func NewMockBroker ¶
func NewMockBroker() *MockBroker
NewMockBroker creates a new MockBroker with default behavior.
func (*MockBroker) DelayedJobs ¶
func (m *MockBroker) DelayedJobs() []*job.Job
DelayedJobs returns all delayed jobs.
func (*MockBroker) DeleteJob ¶
func (m *MockBroker) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes a job.
func (*MockBroker) Dequeue ¶
func (m *MockBroker) Dequeue(ctx context.Context, queues []string, timeout time.Duration) (*job.Job, error)
Dequeue returns the next job from the mock queue.
func (*MockBroker) FailedJobs ¶
func (m *MockBroker) FailedJobs() []*job.Job
FailedJobs returns all failed jobs.
func (*MockBroker) GetDelayedJobs ¶
func (m *MockBroker) GetDelayedJobs(ctx context.Context, until time.Time, limit int64) ([]*job.Job, error)
GetDelayedJobs returns delayed jobs ready for processing.
func (*MockBroker) GetPendingJobs ¶
func (m *MockBroker) GetPendingJobs(ctx context.Context, queue string, idleTime time.Duration) ([]*job.Job, error)
GetPendingJobs returns jobs that are being processed.
func (*MockBroker) GetQueueDepth ¶
func (m *MockBroker) GetQueueDepth(ctx context.Context, queue string) (int64, error)
GetQueueDepth returns the number of queued jobs.
func (*MockBroker) MoveDelayedToQueue ¶
func (m *MockBroker) MoveDelayedToQueue(ctx context.Context, j *job.Job) error
MoveDelayedToQueue moves a delayed job to the queue.
func (*MockBroker) Ping ¶
func (m *MockBroker) Ping(ctx context.Context) error
Ping checks the connection.
func (*MockBroker) ProcessedJobs ¶
func (m *MockBroker) ProcessedJobs() []*job.Job
ProcessedJobs returns all successfully processed jobs.
func (*MockBroker) QueuedJobs ¶
func (m *MockBroker) QueuedJobs() []*job.Job
QueuedJobs returns all queued jobs.
func (*MockBroker) RequeueStaleJob ¶
func (m *MockBroker) RequeueStaleJob(ctx context.Context, j *job.Job) error
RequeueStaleJob requeues a stale job.
type MockDLQRepository ¶ added in v0.5.0
type MockDLQRepository struct {
ListFunc func(ctx context.Context, filter repository.DLQFilter) ([]*repository.DeadLetterJob, error)
GetByIDFunc func(ctx context.Context, id string) (*repository.DeadLetterJob, error)
MarkRequeuedFunc func(ctx context.Context, id string) error
DeleteFunc func(ctx context.Context, id string) error
CountFunc func(ctx context.Context) (int64, error)
// contains filtered or unexported fields
}
MockDLQRepository is a test double for DLQRepository.
func NewMockDLQRepository ¶ added in v0.5.0
func NewMockDLQRepository() *MockDLQRepository
func (*MockDLQRepository) AddJob ¶ added in v0.5.0
func (m *MockDLQRepository) AddJob(dlj *repository.DeadLetterJob)
func (*MockDLQRepository) Count ¶ added in v0.5.0
func (m *MockDLQRepository) Count(ctx context.Context) (int64, error)
func (*MockDLQRepository) Delete ¶ added in v0.5.0
func (m *MockDLQRepository) Delete(ctx context.Context, id string) error
func (*MockDLQRepository) GetByID ¶ added in v0.5.0
func (m *MockDLQRepository) GetByID(ctx context.Context, id string) (*repository.DeadLetterJob, error)
func (*MockDLQRepository) List ¶ added in v0.5.0
func (m *MockDLQRepository) List(ctx context.Context, filter repository.DLQFilter) ([]*repository.DeadLetterJob, error)
func (*MockDLQRepository) MarkRequeued ¶ added in v0.5.0
func (m *MockDLQRepository) MarkRequeued(ctx context.Context, id string) error
type MockExecutionRepository ¶ added in v0.5.0
type MockExecutionRepository struct {
ListFunc func(ctx context.Context, filter repository.ExecutionFilter) ([]*repository.JobExecution, error)
GetByJobIDFunc func(ctx context.Context, jobID string) ([]*repository.JobExecution, error)
GetStatsFunc func(ctx context.Context, fromDate, toDate time.Time) (*repository.ExecutionStats, error)
// contains filtered or unexported fields
}
MockExecutionRepository is a test double for ExecutionRepository.
func NewMockExecutionRepository ¶ added in v0.5.0
func NewMockExecutionRepository() *MockExecutionRepository
func (*MockExecutionRepository) AddExecution ¶ added in v0.5.0
func (m *MockExecutionRepository) AddExecution(exec *repository.JobExecution)
func (*MockExecutionRepository) Create ¶ added in v0.5.0
func (m *MockExecutionRepository) Create(ctx context.Context, exec *repository.JobExecution) error
func (*MockExecutionRepository) GetByJobID ¶ added in v0.5.0
func (m *MockExecutionRepository) GetByJobID(ctx context.Context, jobID string) ([]*repository.JobExecution, error)
func (*MockExecutionRepository) GetStats ¶ added in v0.5.0
func (m *MockExecutionRepository) GetStats(ctx context.Context, fromDate, toDate time.Time) (*repository.ExecutionStats, error)
func (*MockExecutionRepository) List ¶ added in v0.5.0
func (m *MockExecutionRepository) List(ctx context.Context, filter repository.ExecutionFilter) ([]*repository.JobExecution, error)
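func (*MockExecutionRepository) RecordExecution ¶ added in v0.5.0
func (m *MockExecutionRepository) RecordExecution(ctx context.Context, j *job.Job, workerID string, execErr error) error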
type MockScheduleRepository ¶ added in v0.5.0
type MockScheduleRepository struct {
ListActiveFunc func(ctx context.Context) ([]*repository.JobSchedule, error)
ListDueFunc func(ctx context.Context, until time.Time) ([]*repository.JobSchedule, error)
UpdateNextRunFunc func(ctx context.Context, id string, nextRunAt time.Time) error
RecordRunFunc func(ctx context.Context, id string, status string, nextRunAt time.Time) error
// contains filtered or unexported fields
}
MockScheduleRepository is a test double for ScheduleRepository.
func NewMockScheduleRepository ¶ added in v0.5.0
func NewMockScheduleRepository() *MockScheduleRepository
func (*MockScheduleRepository) AddSchedule ¶ added in v0.5.0
func (m *MockScheduleRepository) AddSchedule(s *repository.JobSchedule)
func (*MockScheduleRepository) Create ¶ added in v0.5.0
func (m *MockScheduleRepository) Create(ctx context.Context, input repository.CreateScheduleInput) (*repository.JobSchedule, error)
func (*MockScheduleRepository) Delete ¶ added in v0.5.0
func (m *MockScheduleRepository) Delete(ctx context.Context, id string) error
func (*MockScheduleRepository) GetByID ¶ added in v0.5.0
func (m *MockScheduleRepository) GetByID(ctx context.Context, id string) (*repository.JobSchedule, error)
func (*MockScheduleRepository) ListActive ¶ added in v0.5.0
func (m *MockScheduleRepository) ListActive(ctx context.Context) ([]*repository.JobSchedule, error)
func (*MockScheduleRepository) ListDue ¶ added in v0.5.0
func (m *MockScheduleRepository) ListDue(ctx context.Context, until time.Time) ([]*repository.JobSchedule, error)