jobs

package
v0.2.0
Published: Oct 12, 2020 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Controller added in v0.2.0

type Controller interface {
	// Start starts a controller that reports errors on errCh and,
	// if shutdownCh is provided, its stopped status on that channel.
	// The controller blocks waiting on tasks in its queue until
	// interrupted by the context (ctx) or a Shutdown/Stop function call.
	Start(ctx context.Context, errCh chan<- error, shutdownCh chan struct{})
	// Enqueue adds a task to this controller's queue for workers to
	// process. The function is non-blocking and can be interrupted by
	// context.
	Enqueue(ctx context.Context, task interface{})
	// Stop signals a started controller to stop waiting continuously
	// for tasks, work on tasks until its queue is drained, then exit.
	// The shutdownCh parameter is an optional channel to notify when
	// shutdown is complete.
	Stop(shutdownCh chan struct{})
	// Shutdown stops the controller and its workers immediately, regardless
	// of whether there are tasks in the queue still waiting to be processed.
	Shutdown()
}

Controller is the functional interface of worker controllers for workeque. It captures a controller's lifecycle from its start (Start), through adding tasks for workers (Enqueue), to its immediate (Shutdown) or graceful (Stop) end.
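The difference between the graceful and the immediate end can be illustrated with a small stand-alone sketch. The `miniController` type below is hypothetical and deliberately simplified: it has no context, error channel, or worker pool, and its method signatures do not match the Controller interface. It only shows one way a drain-then-exit signal can differ from an exit-now signal.

```go
package main

import (
	"fmt"
	"sync"
)

// miniController is a hypothetical, simplified stand-in; it is not the
// package's Controller implementation.
type miniController struct {
	queue     chan interface{}
	drain     chan struct{} // closed by Stop: finish queued tasks, then exit
	quit      chan struct{} // closed by Shutdown: exit without draining
	done      chan struct{} // closed when the processing loop has exited
	handle    func(interface{})
	drainOnce sync.Once
	quitOnce  sync.Once
}

func newMiniController(buffer int, handle func(interface{})) *miniController {
	return &miniController{
		queue:  make(chan interface{}, buffer),
		drain:  make(chan struct{}),
		quit:   make(chan struct{}),
		done:   make(chan struct{}),
		handle: handle,
	}
}

// Enqueue adds a task; in this sketch it blocks if the buffer is full.
func (c *miniController) Enqueue(task interface{}) { c.queue <- task }

// Stop requests a graceful end: queued tasks are still processed.
func (c *miniController) Stop() { c.drainOnce.Do(func() { close(c.drain) }) }

// Shutdown requests an immediate end: queued tasks are abandoned.
func (c *miniController) Shutdown() { c.quitOnce.Do(func() { close(c.quit) }) }

// Start runs the processing loop in its own goroutine.
func (c *miniController) Start() {
	go func() {
		defer close(c.done)
		for {
			// Give the immediate signal priority over pending work.
			select {
			case <-c.quit:
				return
			default:
			}
			select {
			case <-c.quit:
				return
			case t := <-c.queue:
				c.handle(t)
			case <-c.drain:
				// Graceful end: process what is already queued, then exit.
				for {
					select {
					case t := <-c.queue:
						c.handle(t)
					default:
						return
					}
				}
			}
		}
	}()
}

// run enqueues three tasks, signals the chosen kind of end, and returns
// how many tasks were processed before the loop exited.
func run(graceful bool) int {
	processed := 0
	c := newMiniController(8, func(interface{}) { processed++ })
	for i := 0; i < 3; i++ {
		c.Enqueue(i)
	}
	if graceful {
		c.Stop()
	} else {
		c.Shutdown()
	}
	c.Start()
	<-c.done
	return processed
}

func main() {
	fmt.Println("graceful:", run(true))   // all queued tasks are processed
	fmt.Println("immediate:", run(false)) // queued tasks are abandoned
}
```

In this sketch the signal is sent before the loop starts, which keeps the outcome deterministic; with a running loop, an immediate shutdown may still let a task that is already in progress finish.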

func NewController added in v0.2.0

func NewController(job *Job) Controller

NewController creates a new Controller instance for the given Job.

type Job

type Job struct {
	// ID is job identifier used in log messages
	ID string
	// MaxWorkers is the maximum number of workers processing a batch of tasks in parallel
	MaxWorkers int
	// MinWorkers is the minimum number of workers processing a batch of tasks in parallel
	MinWorkers int
	// Worker for processing tasks
	Worker Worker
	// FailFast controls the behavior of this Job upon errors. If set to true, it will quit
	// further processing upon the first error that occurs. For fault tolerant applications
	// use false.
	FailFast bool
	// Queue is the queue for tasks picked up by the workers in this Job. The Dispatch
	// method feeds the elements of its tasks argument to the queue, and the queue may
	// also be fed from other sources in parallel, including the workers.
	Queue WorkQueue
	// IsWorkerExitsOnEmptyQueue controls whether a worker exits right after its Work function is
	// done and no more tasks are available in the queue, or loops waiting for more tasks.
	// Note that this flag does not prevent the worker from blocking while it waits for a task;
	// only the work queue can interrupt that wait, with a task or a stop signal. However, after a
	// task is processed, the flag is consulted to decide whether to continue or exit before
	// blocking on another wait.
	IsWorkerExitsOnEmptyQueue bool
}

Job enqueues assignments for parallel processing and synchronous response.

func (*Job) Dispatch

func (j *Job) Dispatch(ctx context.Context, tasks []interface{}) *WorkerError

Dispatch spawns a set of workers that process the supplied tasks in parallel. If the context is cancelled or times out (if it is a timeout context), or if any other error occurs while processing tasks, Dispatch returns a *WorkerError as soon as possible, processing halts, and the workers are disposed of.
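The behaviour described above (parallel workers, early return on the first error or on context cancellation) can be sketched without the package. The `dispatch` function below is a hypothetical stand-in written for illustration; its signature, the plain `error` return, and the `sumEvens` helper are assumptions and do not reflect how Dispatch is implemented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// dispatch is a hypothetical fail-fast fan-out: it runs work over tasks
// with a fixed number of goroutines and returns the first error seen.
func dispatch(ctx context.Context, tasks []interface{}, workers int,
	work func(context.Context, interface{}) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Preload a buffered channel so workers can simply range over it.
	queue := make(chan interface{}, len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	close(queue)

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				if ctx.Err() != nil {
					return // cancelled, timed out, or another worker failed
				}
				if err := work(ctx, t); err != nil {
					// Record the first error and tell the other workers to stop.
					once.Do(func() { first = err; cancel() })
					return
				}
			}
		}()
	}
	wg.Wait()
	if first != nil {
		return first
	}
	return ctx.Err() // non-nil only if the caller's context ended
}

var errOdd = errors.New("odd task")

// sumEvens adds up even ints in parallel and fails on any odd one.
func sumEvens(tasks []interface{}) (int64, error) {
	var sum int64
	err := dispatch(context.Background(), tasks, 3,
		func(_ context.Context, t interface{}) error {
			n := t.(int)
			if n%2 != 0 {
				return fmt.Errorf("task %d: %w", n, errOdd)
			}
			atomic.AddInt64(&sum, int64(n))
			return nil
		})
	return sum, err
}

func main() {
	sum, err := sumEvens([]interface{}{2, 4, 6})
	fmt.Println(sum, err)

	_, err = sumEvens([]interface{}{2, 3, 4})
	fmt.Println(errors.Is(err, errOdd))
}
```

When a task fails, how many of the other tasks complete depends on scheduling, so only the returned error is predictable in the failing case.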

type WorkQueue added in v0.2.0

type WorkQueue interface {
	// Get blocks until an item is available in the work queue and
	// returns it.
	Get() interface{}
	// Stop sends a stop signal to the work queue
	Stop() bool
	// Add adds a task to this workqueue
	Add(task interface{})
	// Count returns the current number of items in the queue
	Count() int
}

WorkQueue encapsulates operations on a work queue.

func NewWorkQueue added in v0.2.0

func NewWorkQueue(buffer int) WorkQueue

NewWorkQueue creates a new WorkQueue implementation object.
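To make the four WorkQueue operations concrete, here is a minimal channel-backed sketch. The `chanQueue` type is hypothetical and is not the implementation returned by NewWorkQueue. In particular, the meaning of Stop's boolean result (true only for the call that actually stopped the queue) and Get returning nil after a stop are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// chanQueue is a hypothetical queue with the same method set as the
// WorkQueue interface documented above.
type chanQueue struct {
	items chan interface{}
	stop  chan struct{}
	once  sync.Once
}

func newChanQueue(buffer int) *chanQueue {
	return &chanQueue{
		items: make(chan interface{}, buffer),
		stop:  make(chan struct{}),
	}
}

// Add enqueues a task; it blocks while the buffer is full unless the
// queue has been stopped.
func (q *chanQueue) Add(task interface{}) {
	select {
	case q.items <- task:
	case <-q.stop:
	}
}

// Get blocks until an item or a stop signal arrives. After a stop it
// returns nil once no item is chosen (assumed convention).
func (q *chanQueue) Get() interface{} {
	select {
	case t := <-q.items:
		return t
	case <-q.stop:
		return nil
	}
}

// Stop signals the queue to stop. It reports true only for the call
// that performed the stop (assumed meaning of the result).
func (q *chanQueue) Stop() bool {
	stopped := false
	q.once.Do(func() {
		close(q.stop)
		stopped = true
	})
	return stopped
}

// Count returns the number of items currently buffered.
func (q *chanQueue) Count() int { return len(q.items) }

func main() {
	q := newChanQueue(2)
	q.Add("a")
	q.Add("b")
	fmt.Println(q.Count())      // 2
	fmt.Println(q.Get())        // a
	fmt.Println(q.Get())        // b
	fmt.Println(q.Stop())       // true
	fmt.Println(q.Stop())       // false
	fmt.Println(q.Get() == nil) // true
}
```

A blocked Get is released either by a new item or by the stop signal, which matches the note on IsWorkerExitsOnEmptyQueue that only the work queue can interrupt a waiting worker.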

type Worker

type Worker interface {
	// Work processes the task within the given context.
	Work(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError
}

Worker declares the functional interface of workers.

type WorkerError

type WorkerError struct {
	// contains filtered or unexported fields
}

WorkerError wraps an underlying error and adds an optional code to enrich the context of the error, e.g. with an HTTP status code.

func NewWorkerError

func NewWorkerError(err error, code int) *WorkerError

NewWorkerError creates a WorkerError that wraps err with the given code.

func (WorkerError) Is

func (we WorkerError) Is(target error) bool

Is implements the contract for errors.Is (https://golang.org/pkg/errors/#Is)

func (WorkerError) Unwrap added in v0.2.0

func (we WorkerError) Unwrap() error

Unwrap implements the contract for errors.Unwrap (https://golang.org/pkg/errors/#Unwrap)
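The sketch below shows how the standard library uses `Is` and `Unwrap` methods on a wrapping error type. Because WorkerError's fields are unexported and its matching rule is not documented here, the example uses a hypothetical `codedError` type; the rule that two values match when their codes are equal is an assumption for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError is a hypothetical stand-in for an error that carries a code.
type codedError struct {
	err  error
	code int
}

func (ce codedError) Error() string { return fmt.Sprintf("%d: %v", ce.code, ce.err) }

// Unwrap exposes the wrapped error so errors.Is can walk the chain.
func (ce codedError) Unwrap() error { return ce.err }

// Is reports a match when target is a codedError with the same code
// (assumed rule, chosen for this example).
func (ce codedError) Is(target error) bool {
	t, ok := target.(codedError)
	return ok && t.code == ce.code
}

var errNotFound = errors.New("not found")

// check returns the results of three errors.Is calls on one wrapped error.
func check() (viaUnwrap, sameCode, otherCode bool) {
	var err error = codedError{err: errNotFound, code: 404}
	viaUnwrap = errors.Is(err, errNotFound)           // found through Unwrap
	sameCode = errors.Is(err, codedError{code: 404})  // found through Is
	otherCode = errors.Is(err, codedError{code: 500}) // no match anywhere in the chain
	return
}

func main() {
	a, b, c := check()
	fmt.Println(a, b, c) // true true false
}
```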

type WorkerFunc

type WorkerFunc func(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError

The WorkerFunc type is an adapter to allow the use of ordinary functions as Workers. If f is a function with the appropriate signature, WorkerFunc(f) is a Worker object that calls f.

func (WorkerFunc) Work

func (f WorkerFunc) Work(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError

Work calls f(ctx, task, wq).
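The adapter can be seen in a stand-alone sketch. The `WorkQueue`, `WorkerError`, and `Worker` declarations below are local stand-ins that mirror the documented signatures so the example compiles without importing the package; the trimmed `WorkQueue`, the `msg` field, and the `describe` and `acceptStrings` helpers are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the documented signatures (not the package's
// own definitions).
type WorkQueue interface {
	Add(task interface{})
}

type WorkerError struct{ msg string }

type Worker interface {
	Work(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError
}

// WorkerFunc adapts an ordinary function to the Worker interface.
type WorkerFunc func(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError

// Work calls f(ctx, task, wq).
func (f WorkerFunc) Work(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError {
	return f(ctx, task, wq)
}

// acceptStrings is an ordinary function with the Worker signature; it
// rejects any task that is not a string.
func acceptStrings(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError {
	if _, ok := task.(string); !ok {
		return &WorkerError{msg: fmt.Sprintf("unexpected task type %T", task)}
	}
	return nil
}

// describe runs one task through any Worker and summarizes the result.
func describe(w Worker, task interface{}) string {
	if werr := w.Work(context.Background(), task, nil); werr != nil {
		return "failed: " + werr.msg
	}
	return "ok"
}

func main() {
	w := WorkerFunc(acceptStrings) // the type conversion is the whole adapter
	fmt.Println(describe(w, "hello")) // ok
	fmt.Println(describe(w, 42))      // failed: unexpected task type int
}
```

This is the same pattern as `http.HandlerFunc` in the standard library: a function type with a method lets a plain function satisfy an interface without declaring a struct.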
