Documentation
¶
Index ¶
Constants ¶
const (
	// MaintenanceTaskQueue is the task queue name used for all the periodic
	// maintenance jobs. Tasks on this queue are internal to the queue itself.
	MaintenanceTaskQueue string = "queue-maintenance"
	// RetentionTask is the task type for the cleanup of finished tasks.
	RetentionTask queue.TaskType = "retention"
)
Variables ¶
var (
	// ErrScheduleQueueIsEmpty occurs when there is no schedule whose
	// `next_execution_time` is in the past, i.e. there is nothing to
	// schedule right now.
	ErrScheduleQueueIsEmpty = errors.New("nothing to schedule")
)
// TaskQueueMetrics provides access to the Prometheus metric objects for the
// task queue workers. Every metric is created through promauto at package
// initialization and uses the "hub" namespace; the Prometheus client joins
// namespace, subsystem and name with underscores to form the exported name.
var TaskQueueMetrics = struct {
	// Labels is set to queueMetricLabels (defined elsewhere in the package).
	// Note that the vector metrics below are created with taskMetricLabels.
	Labels []string
	// TaskDuration observes the duration of a task in seconds.
	TaskDuration *prometheus.HistogramVec
	// TaskWaiting observes how long a task waited before it started.
	TaskWaiting *prometheus.HistogramVec
	// DequeueDuration observes how long a worker spent waiting, in seconds.
	DequeueDuration prometheus.Histogram
	// WorkerGauge is the count of initialized workers.
	WorkerGauge prometheus.Gauge
	// WorkerWaiting counts workers waiting for a task.
	WorkerWaiting prometheus.Counter
	// WorkerWorking counts workers working on a task.
	WorkerWorking *prometheus.CounterVec
	// WorkerWorkingGauge is the count of workers currently working.
	WorkerWorkingGauge *prometheus.GaugeVec
	// WorkerTask counts tasks seen by the worker.
	WorkerTask *prometheus.CounterVec
	// WorkerErrors counts errors seen by the worker.
	WorkerErrors *prometheus.CounterVec
	// QueueErrors counts dequeue errors seen by the worker.
	QueueErrors prometheus.Counter
}{
	Labels: queueMetricLabels,
	// Exported as hub_worker_duration_s.
	TaskDuration: promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name: "duration_s",
			Help: "duration of a task in seconds by queue",
			Buckets: durationSBuckets,
		},
		taskMetricLabels,
	),
	// Exported as hub_queue_waiting_duration_s; this is the only metric in
	// this struct that uses the "queue" subsystem.
	TaskWaiting: promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "queue",
			Name: "waiting_duration_s",
			Help: "duration of a task waiting to start",
			Buckets: durationSBuckets,
		},
		taskMetricLabels,
	),
	// Exported as hub_worker_wait_duration_s.
	DequeueDuration: promauto.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name: "wait_duration_s",
			Help: "duration in seconds a worker spent waiting",
			Buckets: durationSBuckets,
		},
	),
	// Exported as hub_worker_count.
	WorkerGauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name: "count",
		Help: "count of initialized workers",
	}),
	// Exported as hub_worker_waiting_count.
	WorkerWaiting: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name: "waiting_count",
		Help: "count of workers waiting for a task",
	}),
	// Exported as hub_worker_working_total.
	WorkerWorking: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name: "working_total",
			Help: "count of workers working on a task",
		},
		taskMetricLabels,
	),
	// Exported as hub_worker_working.
	WorkerWorkingGauge: promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name: "working",
			Help: "count of working workers",
		},
		taskMetricLabels,
	),
	// Exported as hub_worker_task_total.
	WorkerTask: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name: "task_total",
			Help: "count of tasks seen by the worker",
		},
		taskMetricLabels,
	),
	// Exported as hub_worker_error_total.
	WorkerErrors: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name: "error_total",
			Help: "count of errors seen by the worker",
		},
		taskMetricLabels,
	),
	// Exported as hub_worker_queue_error_total.
	QueueErrors: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name: "queue_error_total",
		Help: "count of dequeue errors seen by the worker",
	}),
}
TaskQueueMetrics provides access to the prometheus metric objects for the task queue
// TaskSchedulingMetrics provides access to the Prometheus metric objects for
// task scheduling. Every metric is created through promauto at package
// initialization under namespace "hub" and subsystem "scheduling".
var TaskSchedulingMetrics = struct {
	// Labels is set to queueMetricLabels (defined elsewhere in the package).
	Labels []string
	// IterationDuration observes the duration of a scheduling iteration in
	// milliseconds.
	IterationDuration prometheus.Histogram
	// WorkerGauge is the count of initialized task scheduling workers;
	// WorkerWorkingGauge is the count of those currently working.
	WorkerGauge, WorkerWorkingGauge prometheus.Gauge
	// WorkerWaiting counts workers waiting for a task to schedule,
	// WorkerWorking counts workers working on a task scheduling, and
	// WorkerErrors counts task scheduling errors seen by the worker.
	WorkerWaiting, WorkerWorking, WorkerErrors prometheus.Counter
	// WorkerTaskScheduled counts tasks scheduled by the worker.
	WorkerTaskScheduled *prometheus.CounterVec
}{
	Labels: queueMetricLabels,
	// Exported as hub_scheduling_iteration_duration_ms.
	IterationDuration: promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name: "iteration_duration_ms",
		Help: "duration of the scheduling iteration in ms",
		Buckets: durationMsBuckets,
	}),
	// Exported as hub_scheduling_count.
	WorkerGauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name: "count",
		Help: "count of initialized task scheduling workers",
	}),
	// Exported as hub_scheduling_working_count.
	WorkerWorkingGauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name: "working_count",
		Help: "count of working task scheduling workers",
	}),
	// Exported as hub_scheduling_waiting_total.
	WorkerWaiting: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name: "waiting_total",
		Help: "count of workers waiting for a task to schedule",
	}),
	// Exported as hub_scheduling_working_total.
	WorkerWorking: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name: "working_total",
		Help: "count of workers working on a task scheduling",
	}),
	// Exported as hub_scheduling_error_total.
	WorkerErrors: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name: "error_total",
		Help: "count of task scheduling errors seen by the worker",
	}),
	// Exported as hub_scheduling_task_total.
	WorkerTaskScheduled: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name: "task_total",
			Help: "count of tasks scheduled by the worker",
		},
		taskMetricLabels,
	),
}
TaskSchedulingMetrics provides access to the prometheus metric objects for task scheduling
Functions ¶
func AssertRetentionSchedule ¶ added in v1.10.0
func AssertRetentionSchedule(ctx context.Context, scheduler queue.Scheduler, queueName string, taskType queue.TaskType, status queue.TaskStatus, filter squirrel.Sqlizer, age time.Duration) error
AssertRetentionSchedule creates a new queue retention task for the supplied queue. Finished tasks matching the supplied parameters will be deleted.
Types ¶
type SQLExecTaskSpec ¶ added in v1.10.0
// SQLExecTaskSpec is the spec of a task that executes a single SQL statement.
type SQLExecTaskSpec struct {
	// SQL is the actual SQL statement that will be run.
	// It is serialized under the JSON key "sql".
	SQL string `json:"sql"`
}
SQLExecTaskSpec defines a task that simply executes a single SQL statement. This can be used for simple CRON cleanup tasks, for example.
type SQLTaskProgress ¶ added in v1.10.0
// SQLTaskProgress contains the generic progress information for a SQL task.
// All fields are optional pointers and are omitted from the JSON encoding
// when nil.
type SQLTaskProgress struct {
	// Duration in milliseconds.
	// NOTE(review): the comment here previously read "Duration of the HTTP
	// request", which looks copied from another handler; presumably this is
	// the duration of the SQL execution - confirm.
	Duration *int64 `json:"duration,omitempty"`
	// RowsAffected - presumably the number of rows affected by the executed
	// statement; verify against the handler that fills it in.
	RowsAffected *int64 `json:"rowsAffected,omitempty"`
	// ErrorMessage contains an error message string if an error occurs
	// while the task is being processed.
	ErrorMessage *string `json:"errorMessage,omitempty"`
}
SQLTaskProgress contains the generic progress information for a sql task
type TaskHandler ¶
// TaskHandler is the interface implemented by anything that can process a
// queue task.
type TaskHandler interface {
	// Process implements the task-specific parsing and processing logic.
	//
	// ctx is the context for the processing call, task is the task to
	// process, and heartbeats is a send-only channel of queue.Progress
	// values (presumably used to report progress while the task runs -
	// confirm against the worker implementation). A non-nil err reports a
	// processing failure.
	Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error)
}
TaskHandler is an interface whose Process method parses a task and returns any processing errors
func NewRetentionHandler ¶ added in v1.10.0
func NewRetentionHandler(db *sql.DB) TaskHandler
NewRetentionHandler creates a task handler that will clean up old finished tasks
func NewSQLTaskHandler ¶ added in v1.10.0
func NewSQLTaskHandler(name string, db *sql.DB) TaskHandler
NewSQLTaskHandler creates a sqlTaskHandler instance with the given tracing name
type TaskHandlerFunc ¶
TaskHandlerFunc is an adapter that allows the use of a normal function as a TaskHandler
type Worker ¶
// Worker provides methods to do some kind of work.
type Worker interface {
	// Work is responsible for getting and processing tasks.
	// It should run continuously or until the context is cancelled.
	// The returned error reports why the worker stopped.
	Work(context.Context) error
}
Worker provides methods to do some kind of work
func NewScheduleWorker ¶
NewScheduleWorker creates a new task scheduling worker
func NewTaskWorker ¶
func NewTaskWorker(dequeuer queue.Dequeuer, handler TaskHandler) Worker
NewTaskWorker creates a new Task Worker instance