Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Controller ¶ added in v0.2.0
type Controller interface {
// Start starts a controller that reports errors on errCh and
// optionally stopped status on shutdownCh if the channel is provided.
// The controller blocks waiting on tasks in its queue until
// interrupted by context (ctx) or a Shutdown/Stop function call.
Start(ctx context.Context, errCh chan<- error, shutdownCh chan struct{})
// Enqueue adds a task to this controller's queue for workers to
// process. The function is non-blocking and can be interrupted by
// context.
Enqueue(ctx context.Context, task interface{})
// Stop signals a started controller to quit waiting continuously
// for tasks and to work on the queued tasks until its queue is drained, then exit.
// The shutdownCh parameter is an optional channel to notify when
// shutdown is complete.
Stop(shutdownCh chan struct{})
// Shutdown stops the controller and its workers, regardless of whether there
// are tasks in the queue awaiting processing.
Shutdown()
}
Controller is the functional interface of worker controllers for a work queue. It captures a controller's lifecycle from its start (Start), through adding tasks for workers (Enqueue), to its immediate (Shutdown) or graceful (Stop) end.
func NewController ¶ added in v0.2.0
func NewController(job *Job) Controller
NewController creates a new Controller instance for the given Job.
type Job ¶
type Job struct {
// ID is job identifier used in log messages
ID string
// MaxWorkers is the maximum number of workers processing a batch of tasks in parallel
MaxWorkers int
// MinWorkers is the minimum number of workers processing a batch of tasks in parallel
MinWorkers int
// Worker for processing tasks
Worker Worker
// FailFast controls the behavior of this Job upon errors. If set to true, it will quit
// further processing upon the first error that occurs. For fault tolerant applications
// use false.
FailFast bool
// Queue is the queue for tasks picked up by the workers in this Job. The Dispatch
// method feeds the elements of its tasks argument to the queue, and the queue may be fed
// from other sources in parallel, including the workers.
Queue WorkQueue
// IsWorkerExitsOnEmptyQueue controls whether a worker exits right after its Work function is
// done and no more tasks are available in the queue, or loops waiting for more tasks.
// Note that this flag does not prevent the worker from blocking while it waits for a task. That
// wait can be interrupted only by the work queue, with a task or a stop signal. However, after a
// task is processed, the flag is consulted to decide whether to continue or exit before blocking
// on another.
IsWorkerExitsOnEmptyQueue bool
}
Job enqueues assignments for parallel processing and synchronous response.
func (*Job) Dispatch ¶
func (j *Job) Dispatch(ctx context.Context, tasks []interface{}) *WorkerError
Dispatch spawns a set of workers that process the supplied tasks in parallel. If the context is cancelled or its deadline has passed, or if any other error occurs while processing tasks, a *WorkerError is returned as soon as possible, processing halts, and the workers are disposed of.
type WorkQueue ¶ added in v0.2.0
type WorkQueue interface {
// Get blocks waiting for an item from the work queue and
// returns it.
Get() interface{}
// Stop sends a stop signal to the work queue
Stop() bool
// Add adds a task to this workqueue
Add(task interface{})
// Count returns the current number of items in the queue
Count() int
}
WorkQueue encapsulates operations on a work queue.
func NewWorkQueue ¶ added in v0.2.0
NewWorkQueue creates a new WorkQueue implementation object.
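To make the WorkQueue contract concrete, here is a minimal in-memory sketch that satisfies the four documented methods. It is not the implementation returned by NewWorkQueue. The type memQueue and its constructor are hypothetical, and several behaviours are assumptions where the documentation is silent: Get returns nil once the queue is stopped and empty, Stop reports whether this call was the one that stopped the queue, and Add is ignored after Stop.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkQueue mirrors the documented interface so the sketch is self-contained.
type WorkQueue interface {
	Get() interface{}
	Stop() bool
	Add(task interface{})
	Count() int
}

// memQueue is a hypothetical unbounded FIFO guarded by a mutex and a condition variable.
type memQueue struct {
	mu      sync.Mutex
	cond    *sync.Cond
	items   []interface{}
	stopped bool
}

func newMemQueue() *memQueue {
	q := &memQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends a task and wakes one waiting Get. Ignored after Stop (assumption).
func (q *memQueue) Add(task interface{}) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.stopped {
		return
	}
	q.items = append(q.items, task)
	q.cond.Signal()
}

// Get blocks until an item is available or the queue is stopped.
// It returns nil when the queue is stopped and empty (assumption).
func (q *memQueue) Get() interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.stopped {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return nil
	}
	task := q.items[0]
	q.items = q.items[1:]
	return task
}

// Stop wakes all blocked Get calls. It returns false if already stopped (assumption).
func (q *memQueue) Stop() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.stopped {
		return false
	}
	q.stopped = true
	q.cond.Broadcast()
	return true
}

// Count returns the current number of items in the queue.
func (q *memQueue) Count() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

func main() {
	var wq WorkQueue = newMemQueue()
	wq.Add("a")
	wq.Add("b")
	fmt.Println(wq.Count()) // 2
	fmt.Println(wq.Get())   // a
	wq.Stop()
	fmt.Println(wq.Get()) // b (items queued before Stop are still returned)
	fmt.Println(wq.Get()) // <nil>
}
```

A condition variable is used rather than a buffered channel so that Add never blocks and Count can be read under the same lock. Either design could satisfy the interface.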
type Worker ¶
type Worker interface {
// Work processes the task within the given context.
Work(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError
}
Worker declares the functional interface of workers.
type WorkerError ¶
type WorkerError struct {
// contains filtered or unexported fields
}
WorkerError wraps an underlying error and adds an optional code to enrich the context of the error, e.g. with HTTP status codes.
func NewWorkerError ¶
func NewWorkerError(err error, code int) *WorkerError
NewWorkerError creates a WorkerError from the given error and code.
func (WorkerError) Is ¶
func (we WorkerError) Is(target error) bool
Is implements the contract for errors.Is (https://golang.org/pkg/errors/#Is)
func (WorkerError) Unwrap ¶ added in v0.2.0
func (we WorkerError) Unwrap() error
Unwrap implements the contract for errors.Unwrap (https://golang.org/pkg/errors/#Unwrap)
type WorkerFunc ¶
type WorkerFunc func(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError
The WorkerFunc type is an adapter to allow the use of ordinary functions as Workers. If f is a function with the appropriate signature, WorkerFunc(f) is a Worker object that calls f.
func (WorkerFunc) Work ¶
func (f WorkerFunc) Work(ctx context.Context, task interface{}, wq WorkQueue) *WorkerError
Work calls f(ctx, task, wq).