Documentation
¶
Index ¶
- Constants
- Variables
- func GetService[T startstop.Service](maintainer *QueueMaintainer) T
- type DefaultReindexerSchedule
- type InsertFunc
- type JobCleaner
- type JobCleanerConfig
- type JobCleanerTestSignals
- type JobRescuer
- type JobRescuerConfig
- type JobRescuerTestSignals
- type JobScheduler
- type JobSchedulerConfig
- type JobSchedulerTestSignals
- type NotifyInsertFunc
- type PeriodicJob
- type PeriodicJobEnqueuer
- func (s *PeriodicJobEnqueuer) AddManySafely(periodicJobs []*PeriodicJob) ([]rivertype.PeriodicJobHandle, error)
- func (s *PeriodicJobEnqueuer) AddSafely(periodicJob *PeriodicJob) (rivertype.PeriodicJobHandle, error)
- func (s *PeriodicJobEnqueuer) Clear()
- func (s *PeriodicJobEnqueuer) Remove(periodicJobHandle rivertype.PeriodicJobHandle)
- func (s *PeriodicJobEnqueuer) RemoveByID(id string) bool
- func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.PeriodicJobHandle)
- func (s *PeriodicJobEnqueuer) RemoveManyByID(ids []string)
- func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error
- type PeriodicJobEnqueuerConfig
- type PeriodicJobEnqueuerTestSignals
- type QueueCleaner
- type QueueCleanerConfig
- type QueueCleanerTestSignals
- type QueueMaintainer
- type Reindexer
- type ReindexerConfig
- type ReindexerTestSignals
Constants ¶
const ( JobRescuerRescueAfterDefault = time.Hour JobRescuerIntervalDefault = 30 * time.Second )
const ( ReindexerIntervalDefault = 24 * time.Hour // We've had user reports of builds taking 45 seconds on large tables, so // set a timeout of that plus a little margin. Use of `CONCURRENTLY` should // prevent index operations that run a little long from impacting work from // an operational standpoint. // // https://github.com/riverqueue/river/issues/909#issuecomment-2909949466 ReindexerTimeoutDefault = 1 * time.Minute )
const (
JobSchedulerIntervalDefault = 5 * time.Second
)
const (
QueueRetentionPeriodDefault = 24 * time.Hour
)
Variables ¶
var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")
ErrNoJobToInsert can be returned by a PeriodicJob's JobToInsertFunc to signal that there's no job to insert at this time.
Functions ¶
func GetService ¶
func GetService[T startstop.Service](maintainer *QueueMaintainer) T
GetService is a convenience method for getting a service by name and casting it to the desired type. It should only be used in tests due to its use of reflection and potential for panics.
Types ¶
type DefaultReindexerSchedule ¶ added in v0.16.0
type DefaultReindexerSchedule struct{}
DefaultReindexerSchedule is a default schedule for the reindexer job which runs at midnight UTC daily.
type InsertFunc ¶ added in v0.14.2
type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error)
type JobCleaner ¶
type JobCleaner struct {
riversharedmaintenance.QueueMaintainerServiceBase
startstop.BaseStartStop
// exported for test purposes
Config *JobCleanerConfig
TestSignals JobCleanerTestSignals
// contains filtered or unexported fields
}
JobCleaner periodically removes finalized jobs that are cancelled, completed, or discarded. Each state's retention time can be configured individually.
func NewJobCleaner ¶
func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner
type JobCleanerConfig ¶
type JobCleanerConfig struct {
riversharedmaintenance.BatchSizes
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of cancelled jobs.
CancelledJobRetentionPeriod time.Duration
// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of completed jobs.
CompletedJobRetentionPeriod time.Duration
// DiscardedJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
// The special value -1 disables deletion of discarded jobs.
DiscardedJobRetentionPeriod time.Duration
// Interval is the amount of time to wait between runs of the cleaner.
Interval time.Duration
// QueuesExcluded are queues that'll be excluded from cleaning.
QueuesExcluded []string
// Schema where River tables are located. Empty string omits schema, causing
// Postgres to default to `search_path`.
Schema string
// Timeout of the individual queries in the job cleaner.
Timeout time.Duration
}
type JobCleanerTestSignals ¶
type JobCleanerTestSignals struct {
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Test-only properties.
func (*JobCleanerTestSignals) Init ¶
func (ts *JobCleanerTestSignals) Init(tb testutil.TestingTB)
type JobRescuer ¶ added in v0.0.23
type JobRescuer struct {
riversharedmaintenance.QueueMaintainerServiceBase
startstop.BaseStartStop
// exported for test purposes
Config *JobRescuerConfig
TestSignals JobRescuerTestSignals
// contains filtered or unexported fields
}
JobRescuer periodically rescues jobs that have been executing for too long and are considered to be "stuck".
func NewRescuer ¶ added in v0.0.6
func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec riverdriver.Executor) *JobRescuer
type JobRescuerConfig ¶ added in v0.0.23
type JobRescuerConfig struct {
riversharedmaintenance.BatchSizes
// ClientRetryPolicy is the default retry policy to use for workers that don't
// override NextRetry.
ClientRetryPolicy jobexecutor.ClientRetryPolicy
// Interval is the amount of time to wait between runs of the rescuer.
Interval time.Duration
// RescueAfter is the amount of time for a job to be active before it is
// considered stuck and should be rescued.
RescueAfter time.Duration
// Schema where River tables are located. Empty string omits schema, causing
// Postgres to default to `search_path`.
Schema string
WorkUnitFactoryFunc func(kind string) workunit.WorkUnitFactory
}
type JobRescuerTestSignals ¶ added in v0.0.23
type JobRescuerTestSignals struct {
FetchedBatch testsignal.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
UpdatedBatch testsignal.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch
}
Test-only properties.
func (*JobRescuerTestSignals) Init ¶ added in v0.0.23
func (ts *JobRescuerTestSignals) Init(tb testutil.TestingTB)
type JobScheduler ¶ added in v0.0.23
type JobScheduler struct {
riversharedmaintenance.QueueMaintainerServiceBase
startstop.BaseStartStop
// exported for test purposes
TestSignals JobSchedulerTestSignals
// contains filtered or unexported fields
}
JobScheduler periodically moves jobs in `scheduled` or `retryable` state and which are ready to run over to `available` so that they're eligible to be worked.
func NewJobScheduler ¶ added in v0.5.0
func NewJobScheduler(archetype *baseservice.Archetype, config *JobSchedulerConfig, exec riverdriver.Executor) *JobScheduler
type JobSchedulerConfig ¶ added in v0.0.23
type JobSchedulerConfig struct {
riversharedmaintenance.BatchSizes
// Interval is the amount of time between periodic checks for jobs to
// be moved from "scheduled" to "available".
Interval time.Duration
// NotifyInsert is a function to call to emit notifications for queues
// where jobs were scheduled.
NotifyInsert NotifyInsertFunc
// Schema where River tables are located. Empty string omits schema, causing
// Postgres to default to `search_path`.
Schema string
}
type JobSchedulerTestSignals ¶ added in v0.0.23
type JobSchedulerTestSignals struct {
NotifiedQueues testsignal.TestSignal[[]string] // notifies when queues are sent an insert notification
ScheduledBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Test-only properties.
func (*JobSchedulerTestSignals) Init ¶ added in v0.0.23
func (ts *JobSchedulerTestSignals) Init(tb testutil.TestingTB)
type NotifyInsertFunc ¶ added in v0.5.0
type NotifyInsertFunc func(ctx context.Context, execTx riverdriver.ExecutorTx, queues []string) error
NotifyInsert is a function to call to emit notifications for queues where jobs were scheduled.
type PeriodicJob ¶
type PeriodicJob struct {
ID string
ConstructorFunc func() (*rivertype.JobInsertParams, error)
RunOnStart bool
ScheduleFunc func(time.Time) time.Time
// contains filtered or unexported fields
}
PeriodicJob is a periodic job to be run. It's similar to the top-level river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a subpackage.
type PeriodicJobEnqueuer ¶
type PeriodicJobEnqueuer struct {
riversharedmaintenance.QueueMaintainerServiceBase
startstop.BaseStartStop
// exported for test purposes
Config *PeriodicJobEnqueuerConfig
TestSignals PeriodicJobEnqueuerTestSignals
// contains filtered or unexported fields
}
PeriodicJobEnqueuer inserts jobs configured to run periodically as unique jobs to make sure they'll run as frequently as their period dictates.
func NewPeriodicJobEnqueuer ¶
func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJobEnqueuerConfig, exec riverdriver.Executor) (*PeriodicJobEnqueuer, error)
func (*PeriodicJobEnqueuer) AddManySafely ¶ added in v0.23.0
func (s *PeriodicJobEnqueuer) AddManySafely(periodicJobs []*PeriodicJob) ([]rivertype.PeriodicJobHandle, error)
AddManySafely adds many new periodic job to the enqueuer. The service's run loop is woken immediately so that the job is scheduled appropriately, and inserted if any RunOnStart flags are set to true.
func (*PeriodicJobEnqueuer) AddSafely ¶ added in v0.23.0
func (s *PeriodicJobEnqueuer) AddSafely(periodicJob *PeriodicJob) (rivertype.PeriodicJobHandle, error)
AddSafely adds a new periodic job to the enqueuer. The service's run loop is woken immediately so that the job is scheduled appropriately, and inserted if its RunOnStart flag is set to true.
func (*PeriodicJobEnqueuer) Clear ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) Clear()
Clear clears all periodic jobs from the enqueuer.
func (*PeriodicJobEnqueuer) Remove ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) Remove(periodicJobHandle rivertype.PeriodicJobHandle)
Remove removes a periodic job from the enqueuer. Its current target run time and all future runs are cancelled.
func (*PeriodicJobEnqueuer) RemoveByID ¶ added in v0.27.0
func (s *PeriodicJobEnqueuer) RemoveByID(id string) bool
Remove removes a periodic job from the enqueuer by ID. Its current target run time and all future runs are cancelled.
func (*PeriodicJobEnqueuer) RemoveMany ¶ added in v0.2.0
func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.PeriodicJobHandle)
RemoveMany removes many periodic jobs from the enqueuer. Their current target run time and all future runs are cancelled.
func (*PeriodicJobEnqueuer) RemoveManyByID ¶ added in v0.27.0
func (s *PeriodicJobEnqueuer) RemoveManyByID(ids []string)
RemoveMany removes many periodic jobs from the enqueuer by ID. Their current target run time and all future runs are cancelled.
type PeriodicJobEnqueuerConfig ¶
type PeriodicJobEnqueuerConfig struct {
AdvisoryLockPrefix int32
// Insert is the function to call to insert jobs into the database.
Insert InsertFunc
// PeriodicJobs are the periodic jobs with which to configure the enqueuer.
PeriodicJobs []*PeriodicJob
// Pilot is a plugin module providing additional non-standard functionality.
Pilot riverpilot.PilotPeriodicJob
// Schema where River tables are located. Empty string omits schema, causing
// Postgres to default to `search_path`.
Schema string
}
type PeriodicJobEnqueuerTestSignals ¶
type PeriodicJobEnqueuerTestSignals struct {
EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted
PeriodicJobKeepAliveAndReap testsignal.TestSignal[struct{}] // notifies when the background services that runs keep alive and reap on periodic jobs ticks
PeriodicJobUpserted testsignal.TestSignal[struct{}] // notifies when a batch of periodic job records are upserted to pilot
SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
}
Test-only properties.
func (*PeriodicJobEnqueuerTestSignals) Init ¶
func (ts *PeriodicJobEnqueuerTestSignals) Init(tb testutil.TestingTB)
type QueueCleaner ¶ added in v0.5.0
type QueueCleaner struct {
riversharedmaintenance.QueueMaintainerServiceBase
startstop.BaseStartStop
// exported for test purposes
Config *QueueCleanerConfig
TestSignals QueueCleanerTestSignals
// contains filtered or unexported fields
}
QueueCleaner periodically removes queues from the river_queue table that have not been updated in a while, indicating that they are no longer active.
func NewQueueCleaner ¶ added in v0.5.0
func NewQueueCleaner(archetype *baseservice.Archetype, config *QueueCleanerConfig, exec riverdriver.Executor) *QueueCleaner
type QueueCleanerConfig ¶ added in v0.5.0
type QueueCleanerConfig struct {
riversharedmaintenance.BatchSizes
// Interval is the amount of time to wait between runs of the cleaner.
Interval time.Duration
// RetentionPeriod is the amount of time to keep queues around before they're
// removed.
RetentionPeriod time.Duration
// Schema where River tables are located. Empty string omits schema, causing
// Postgres to default to `search_path`.
Schema string
}
type QueueCleanerTestSignals ¶ added in v0.5.0
type QueueCleanerTestSignals struct {
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Test-only properties.
func (*QueueCleanerTestSignals) Init ¶ added in v0.5.0
func (ts *QueueCleanerTestSignals) Init(tb testutil.TestingTB)
type QueueMaintainer ¶
type QueueMaintainer struct {
baseservice.BaseService
startstop.BaseStartStop
// contains filtered or unexported fields
}
QueueMaintainer runs regular maintenance operations against job queues, like pruning completed jobs. It runs only on the client which has been elected leader at any given time.
Its methods are not safe for concurrent usage.
func NewQueueMaintainer ¶
func NewQueueMaintainer(archetype *baseservice.Archetype, services []startstop.Service) *QueueMaintainer
func (*QueueMaintainer) StaggerStartupDisable ¶ added in v0.2.0
func (m *QueueMaintainer) StaggerStartupDisable(disabled bool)
StaggerStartupDisable sets whether the short staggered sleep on start up is disabled. This is useful in tests where the extra sleep involved in a staggered start up is not helpful for test run time.
type Reindexer ¶
type Reindexer struct {
riversharedmaintenance.QueueMaintainerServiceBase
startstop.BaseStartStop
// exported for test purposes
Config *ReindexerConfig
TestSignals ReindexerTestSignals
// contains filtered or unexported fields
}
Reindexer periodically executes a REINDEX command on the important job indexes to rebuild them and fix bloat issues.
func NewReindexer ¶
func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exec riverdriver.Executor) *Reindexer
type ReindexerConfig ¶
type ReindexerConfig struct {
// IndexNames is a list of indexes to reindex on each run.
IndexNames []string
// ScheduleFunc returns the next scheduled run time for the reindexer given the
// current time.
ScheduleFunc func(time.Time) time.Time
// Schema where River tables are located. Empty string omits schema, causing
// Postgres to default to `search_path`.
Schema string
// Timeout is the amount of time to wait for a single reindex query to run
// before cancelling it via context.
Timeout time.Duration
}
type ReindexerTestSignals ¶
type ReindexerTestSignals struct {
Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
}
Test-only properties.
func (*ReindexerTestSignals) Init ¶
func (ts *ReindexerTestSignals) Init(tb testutil.TestingTB)