workqueue

package
v1.0.0-b001 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2018 License: MIT Imports: 6 Imported by: 0

README

go-workqueue

Build Status

This library implements a simple work queue, driven by a configurable number of goroutine workers.

Example

wq := workqueue.New()
workQueue.Start() //two workers, i.e. two go routines
workQueue.Enqueue(func(v ...interface{}) error {
    fmt.Printf("Work Item: %#v\n", v)
    return nil
}, "hello world")
// output: "Work Item: hello world"

Documentation

Index

Constants

View Source
const (
	// DefaultMaxRetries is the maximum times a process queue item will be retried before being dropped.
	DefaultMaxRetries = 10

	// DefaultMaxWorkItems is the default entry buffer length.
	// Currently the default is 2^18 or 256k.
	// WorkItems maps to the initialized capacity of a buffered channel.
	// As a result it does not reflect actual memory consumed.
	DefaultMaxWorkItems = 1 << 18
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action func(args ...interface{}) error

Action is an action that can be dispatched by the process queue.

type Entry

type Entry struct {
	Action  Action
	Args    []interface{}
	Tries   int32
	Recover bool
}

Entry is an individual item of work.

func (Entry) Execute

func (e Entry) Execute() (err error)

Execute runs the work item.

func (Entry) String

func (e Entry) String() string

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the container for work items, it dispatches work to the workers.

func Default

func Default() *Queue

Default returns a singleton queue.

func New

func New() *Queue

New returns a new work queue.

func NewWithOptions

func NewWithOptions(numWorkers, retryCount, maxWorkItems int) *Queue

NewWithOptions returns a new queue with customizable options.

func NewWithWorkers

func NewWithWorkers(numWorkers int) *Queue

NewWithWorkers returns a new work queue with a given number of workers.

func (*Queue) Close

func (q *Queue) Close() error

Close drains the queue and stops the workers.

func (*Queue) Each

func (q *Queue) Each(visitor func(entry *Entry))

Each runs the consumer for each item in the queue.

func (*Queue) Enqueue

func (q *Queue) Enqueue(action Action, args ...interface{})

Enqueue adds a work item to the process queue.

func (*Queue) Len

func (q *Queue) Len() int

Len returns the number of items in the work queue.

func (*Queue) MaxRetries

func (q *Queue) MaxRetries() int

MaxRetries returns the maximum number of retries.

func (*Queue) MaxWorkItems

func (q *Queue) MaxWorkItems() int

MaxWorkItems returns the maximum length of the work item queue.

func (*Queue) NumWorkers

func (q *Queue) NumWorkers() int

NumWorkers returns the number of worker routines.

func (*Queue) Recover

func (q *Queue) Recover() bool

Recover returns if the queue is handling / recovering from panics.

func (*Queue) Running

func (q *Queue) Running() bool

Running returns if the queue has started or not.

func (*Queue) SetMaxRetries

func (q *Queue) SetMaxRetries(maxRetries int)

SetMaxRetries sets the maximum nummer of retries for a work item on error.

func (*Queue) SetMaxWorkItems

func (q *Queue) SetMaxWorkItems(workItems int)

SetMaxWorkItems sets the max work items.

func (*Queue) SetNumWorkers

func (q *Queue) SetNumWorkers(workers int)

SetNumWorkers lets you set the num workers.

func (*Queue) SetRecover

func (q *Queue) SetRecover(shouldRecover bool)

SetRecover sets if the queue workers should handle panics.

func (*Queue) Start

func (q *Queue) Start()

Start starts the dispatcher workers for the process quere.

func (*Queue) String

func (q *Queue) String() string

String returns a string representation of the queue.

func (*Queue) WithMaxRetries

func (q *Queue) WithMaxRetries(maxRetries int) *Queue

WithMaxRetries calls `SetMaxRetries` and returns a reference to the queue.

func (*Queue) WithMaxWorkItems

func (q *Queue) WithMaxWorkItems(workItems int) *Queue

WithMaxWorkItems calls `SetMaxWorkItems` and returns a reference to the queue.

func (*Queue) WithNumWorkers

func (q *Queue) WithNumWorkers(workers int) *Queue

WithNumWorkers calls `SetNumWorkers` and returns a reference to the queue.

func (*Queue) WithRecover

func (q *Queue) WithRecover(shouldRecover bool) *Queue

WithRecover sets if the queue should recover panics.

type Worker

type Worker struct {
	ID      int
	Work    chan *Entry
	Parent  *Queue
	Abort   chan bool
	Aborted chan bool
}

Worker is a consumer of the work queue.

func NewWorker

func NewWorker(id int, parent *Queue, maxItems int) *Worker

NewWorker creates a new worker.

func (*Worker) Close

func (w *Worker) Close() error

Close sends the stop signal to the worker.

func (*Worker) Start

func (w *Worker) Start()

Start starts the worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL