workers

package
v1.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2020 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaintenanceTaskQueue is the task queue name used for all the periodic
	// maintenance jobs. The tasks on this queue are the queue's own internal tasks.
	MaintenanceTaskQueue string = "queue-maintenance"

	// RetentionTask is the task type used for the cleanup of finished tasks.
	RetentionTask queue.TaskType = "retention"
)

Variables

View Source
// ErrScheduleQueueIsEmpty is the sentinel error reported when no schedule
// has a `next_execution_time` that lies in the past, i.e. there is
// currently nothing to schedule.
var ErrScheduleQueueIsEmpty = errors.New("nothing to schedule")
View Source
// TaskQueueMetrics groups the prometheus metric objects for the task queue in
// a single anonymous struct value. Every metric is created through promauto
// as part of this package-level initializer and uses the "hub" namespace.
var TaskQueueMetrics = struct {
	Labels             []string
	TaskDuration       *prometheus.HistogramVec
	TaskWaiting        *prometheus.HistogramVec
	DequeueDuration    prometheus.Histogram
	WorkerGauge        prometheus.Gauge
	WorkerWaiting      prometheus.Counter
	WorkerWorking      *prometheus.CounterVec
	WorkerWorkingGauge *prometheus.GaugeVec
	WorkerTask         *prometheus.CounterVec
	WorkerErrors       *prometheus.CounterVec
	QueueErrors        prometheus.Counter
}{
	// NOTE(review): Labels is set to queueMetricLabels while every vector below
	// is created with taskMetricLabels; both are defined elsewhere — confirm
	// that this difference is intended.
	Labels: queueMetricLabels,
	// Histogram (per label set) of how long a task took, in seconds.
	TaskDuration: promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "duration_s",
			Help:      "duration of a task in seconds by queue",
			Buckets:   durationSBuckets,
		},
		taskMetricLabels,
	),
	// Histogram (per label set) of how long a task waited before it started.
	// This is the only metric here that uses the "queue" subsystem.
	TaskWaiting: promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "queue",
			Name:      "waiting_duration_s",
			Help:      "duration of a task waiting to start",
			Buckets:   durationSBuckets,
		},
		taskMetricLabels,
	),
	// Unlabelled histogram of the time, in seconds, a worker spent waiting.
	DequeueDuration: promauto.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "wait_duration_s",
			Help:      "duration in seconds a worker spent waiting",
			Buckets:   durationSBuckets,
		},
	),
	// Gauge of initialized workers.
	WorkerGauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name:      "count",
		Help:      "count of initialized workers",
	}),
	// Counter of workers waiting for a task. Note that the metric name ends in
	// "_count" even though the metric is a counter.
	WorkerWaiting: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name:      "waiting_count",
		Help:      "count of workers waiting for a task",
	}),
	// Counter (per label set) of workers working on a task.
	WorkerWorking: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "working_total",
			Help:      "count of workers working on a task",
		},
		taskMetricLabels,
	),
	// Gauge (per label set) of workers that are working.
	WorkerWorkingGauge: promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "working",
			Help:      "count of working workers",
		},
		taskMetricLabels,
	),
	// Counter (per label set) of tasks seen by the worker.
	WorkerTask: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "task_total",
			Help:      "count of tasks seen by the worker",
		},
		taskMetricLabels,
	),
	// Counter (per label set) of errors seen by the worker.
	WorkerErrors: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "error_total",
			Help:      "count of errors seen by the worker",
		},
		taskMetricLabels,
	),
	// Unlabelled counter of dequeue errors seen by the worker.
	QueueErrors: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name:      "queue_error_total",
		Help:      "count of dequeue errors seen by the worker",
	}),
}

TaskQueueMetrics provides access to the prometheus metric objects for the task queue

View Source
// TaskSchedulingMetrics bundles the prometheus metric objects for task
// scheduling in one anonymous struct value. All metrics are created through
// promauto in this package-level initializer and share the "hub" namespace
// and the "scheduling" subsystem.
var TaskSchedulingMetrics = struct {
	Labels              []string
	IterationDuration   prometheus.Histogram
	WorkerGauge         prometheus.Gauge
	WorkerWorkingGauge  prometheus.Gauge
	WorkerWaiting       prometheus.Counter
	WorkerWorking       prometheus.Counter
	WorkerErrors        prometheus.Counter
	WorkerTaskScheduled *prometheus.CounterVec
}{
	// NOTE(review): Labels holds queueMetricLabels, whereas the only vector
	// below is built with taskMetricLabels; both are defined elsewhere —
	// confirm that this difference is intended.
	Labels: queueMetricLabels,

	// Histogram of the duration of one scheduling iteration, in milliseconds.
	IterationDuration: promauto.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "iteration_duration_ms",
			Help:      "duration of the scheduling iteration in ms",
			Buckets:   durationMsBuckets,
		},
	),

	// Gauge of initialized task scheduling workers.
	WorkerGauge: promauto.NewGauge(
		prometheus.GaugeOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "count",
			Help:      "count of initialized task scheduling workers",
		},
	),

	// Gauge of task scheduling workers that are working.
	WorkerWorkingGauge: promauto.NewGauge(
		prometheus.GaugeOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "working_count",
			Help:      "count of working task scheduling workers",
		},
	),

	// Counter of workers waiting for a task to schedule.
	WorkerWaiting: promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "waiting_total",
			Help:      "count of workers waiting for a task to schedule",
		},
	),

	// Counter of workers working on a task scheduling.
	WorkerWorking: promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "working_total",
			Help:      "count of workers working on a task scheduling",
		},
	),

	// Counter of task scheduling errors seen by the worker.
	WorkerErrors: promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "error_total",
			Help:      "count of task scheduling errors seen by the worker",
		},
	),

	// Counter (per label set) of tasks scheduled by the worker.
	WorkerTaskScheduled: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "task_total",
			Help:      "count of tasks scheduled by the worker",
		},
		taskMetricLabels,
	),
}

TaskSchedulingMetrics provides access to the prometheus metric objects for task scheduling

Functions

func AssertRetentionSchedule added in v1.10.0

func AssertRetentionSchedule(ctx context.Context, scheduler queue.Scheduler, queueName string, taskType queue.TaskType, status queue.TaskStatus, filter squirrel.Sqlizer, age time.Duration) error

AssertRetentionSchedule creates a new queue retention task for the supplied queue; finished tasks matching the supplied parameters will be deleted.

Types

type SQLExecTaskSpec added in v1.10.0

// SQLExecTaskSpec is the spec of a task that executes a single SQL statement.
type SQLExecTaskSpec struct {
	// SQL is the actual SQL statement that will be run; it is carried under
	// the JSON key "sql".
	SQL string `json:"sql"`
}

SQLExecTaskSpec defines a task that simply executes a single SQL statement. This can be used for simple CRON cleanup tasks, for example.

type SQLTaskProgress added in v1.10.0

// SQLTaskProgress holds the progress information for a SQL task. All fields
// are pointers tagged with omitempty, so a nil field is left out of the JSON
// encoding.
type SQLTaskProgress struct {
	// Duration is a duration in milliseconds.
	// NOTE(review): the previous comment said "Duration of the HTTP request",
	// which looks like a copy-paste leftover on a SQL task — presumably this
	// is the duration of the SQL execution; confirm against the handler.
	Duration *int64 `json:"duration,omitempty"`
	// RowsAffected is presumably the number of rows affected by the executed
	// statement — confirm against the handler.
	RowsAffected *int64 `json:"rowsAffected,omitempty"`
	// ErrorMessage contains an error message string if an error occurs during the update process
	ErrorMessage *string `json:"errorMessage,omitempty"`
}

SQLTaskProgress contains the generic progress information for a sql task

type TaskHandler

// TaskHandler is the interface implemented by types that handle a task.
type TaskHandler interface {
	// Process implements the specific Task parsing logic. It receives the
	// context, the task to handle and a send-only heartbeats channel of
	// queue.Progress values, and returns any processing error.
	// NOTE(review): presumably heartbeats is used to report progress while the
	// task is being processed — confirm against the worker implementation.
	Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error)
}

TaskHandler is the interface implemented by types that parse a task and return any processing errors

func NewRetentionHandler added in v1.10.0

func NewRetentionHandler(db *sql.DB) TaskHandler

NewRetentionHandler creates a task handler that will clean up old finished tasks

func NewSQLTaskHandler added in v1.10.0

func NewSQLTaskHandler(name string, db *sql.DB) TaskHandler

NewSQLTaskHandler creates a sqlTaskHandler instance with the given tracing name

type TaskHandlerFunc

// TaskHandlerFunc is a function type with the same parameters and result as
// TaskHandler.Process, so that an ordinary function can be used as a
// TaskHandler.
type TaskHandlerFunc func(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) error

TaskHandlerFunc is an adapter that allows the use of a normal function as a TaskHandler

func (TaskHandlerFunc) Process

func (f TaskHandlerFunc) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) error

Process implements the specific Task parsing logic

type Worker

// Worker is the interface implemented by types that do some kind of work.
type Worker interface {
	// Work is responsible for getting and processing tasks.
	// It should run continuously, or until the context is cancelled.
	// The returned error reports why the worker stopped.
	Work(context.Context) error
}

Worker provides methods to do some kind of work

func NewScheduleWorker

func NewScheduleWorker(db *sql.DB, queue queue.Queuer, interval time.Duration) Worker

NewScheduleWorker creates a new task scheduling worker

func NewTaskWorker

func NewTaskWorker(dequeuer queue.Dequeuer, handler TaskHandler) Worker

NewTaskWorker creates a new Task Worker instance

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL