taskq

package module
v2.0.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2019 License: BSD-2-Clause Imports: 21 Imported by: 4

README

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends

Build Status GoDoc

Installation

go get github.com/vmihailenco/taskq/v2

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
  • Global rate limiting.
  • Global limit of workers.
  • Call once - deduplicating messages with the same name.
  • Automatic retries with exponential backoffs.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
  • Automatic message compression using zstd.

Quickstart

I recommend splitting your app into two parts:

  • API that accepts requests from customers and adds tasks to the queues.
  • Worker that fetches tasks from the queues and processes them.

This way you can:

  • isolate API and worker from each other;
  • scale API and worker separately;
  • have different configs for API and worker (like timeouts).

There is an api_worker example that demonstrates this approach using Redis as backend:

cd examples/api_worker
go run worker/main.go
go run api/main.go

You start by choosing a backend to use - in our case, Redis:

var QueueFactory = redisq.NewFactory()

Using that factory, you create a queue that contains task(s):

var MainQueue = QueueFactory.NewQueue(&taskq.QueueOptions{
	Name:  "api-worker",
	Redis: Redis, // go-redis client
})

Using the queue, you create a task with a handler that does some useful work:

var CountTask = MainQueue.NewTask(&taskq.TaskOptions{
	Name: "counter",
	Handler: func() error {
		IncrLocalCounter()
		return nil
	},
})

Then, in the API, you use the task to add messages/jobs to the queues:

for {
	err := api_worker.CountTask.Call() // call task handler without any args
	if err != nil {
		log.Fatal(err)
	}
}

And in the worker, you start processing the queue:

err := api_worker.MainQueue.Consumer().Start()
if err != nil {
	log.Fatal(err)
}

API overview

t := myQueue.NewTask(&taskq.TaskOptions{
	Name:    "greeting",
	Handler: func(name string) error {
		fmt.Println("Hello", name)
		return nil
	},
})

// Say "Hello World".
t.Call("World")

// Same using Message API.
t.AddMessage(taskq.NewMessage("World"))

// Say "Hello World" with 1 hour delay.
msg := taskq.NewMessage("World")
msg.Delay = time.Hour
t.AddMessage(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := taskq.NewMessage("hello")
    msg.Name = "hello-world" // unique
    t.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := taskq.NewMessage("hello")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    t.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    t.CallOnce(time.Hour, "hello")
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := taskq.NewMessage("hello")
    msg.OnceWithArgs(time.Hour, "europe") // set delay and autogenerate message name
    t.Add(msg)
}

Custom message delay

If the error returned by the handler implements the Delay() time.Duration interface, then that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}

Documentation

Overview

Package taskq implements task/job queue with Redis, SQS, IronMQ, and in-memory backends.

Example (CustomRateLimit)
package main

import (
	"fmt"
	"time"

	"github.com/vmihailenco/taskq/v2"
	"github.com/vmihailenco/taskq/v2/memqueue"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&taskq.QueueOptions{
		Name: "test",
	})
	task := taskq.NewTask(&taskq.TaskOptions{
		Name: "Example_customRateLimit",
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	q.Add(task.WithArgs())

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

retried in 0s
retried in 3s
Example (MessageDelay)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.NewTask(&taskq.TaskOptions{
	Name: "Example_messageDelay",
	Handler: func() {
		fmt.Println("processed with delay", timeSince(start))
	},
})

msg := task.WithArgs()
msg.Delay = time.Second
_ = q.Add(msg)

// Wait for all messages to be processed.
_ = q.Close()
Output:

processed with delay 1s
Example (Once)
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
task := taskq.NewTask(&taskq.TaskOptions{
	Name: "Example_once",
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
})

for i := 0; i < 10; i++ {
	// Call once in a second.
	_ = q.Add(task.OnceWithArgs(time.Second, "world"))
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

hello world
Example (RateLimit)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
task := taskq.NewTask(&taskq.TaskOptions{
	Name:    "Example_rateLimit",
	Handler: func() {},
})

const n = 5
for i := 0; i < n; i++ {
	_ = q.Add(task.WithArgs())
}

// Wait for all messages to be processed.
_ = q.Close()

fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output:

1 msg/s
Example (RetryOnError)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.NewTask(&taskq.TaskOptions{
	Name: "Example_retryOnError",
	Handler: func() error {
		fmt.Println("retried in", timeSince(start))
		return errors.New("fake error")
	},
	RetryLimit: 3,
	MinBackoff: time.Second,
})

q.Add(task.WithArgs())

// Wait for all messages to be processed.
_ = q.Close()
Output:

retried in 0s
retried in 1s
retried in 3s

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrAsyncTask = errors.New("taskq: async task")
View Source
var ErrDuplicate = errors.New("taskq: message with such name already exists")

ErrDuplicate is returned when adding duplicate message to the queue.

View Source
var Queues queueRegistry
View Source
var Tasks taskRegistry

Functions

func SetLogger

func SetLogger(logger *log.Logger)

func SetUnknownTaskOptions

func SetUnknownTaskOptions(opt *TaskOptions)

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

func NewConsumer

func NewConsumer(q Queuer) *Consumer

NewConsumer creates a new Consumer for the queue using the provided processing options.

func StartConsumer

func StartConsumer(q Queuer) *Consumer

StartConsumer creates a new Consumer and starts it.

func (*Consumer) Add

func (c *Consumer) Add(msg *Message) error

func (*Consumer) AddHook

func (c *Consumer) AddHook(hook ConsumerHook)

AddHook adds a hook into message processing.

func (*Consumer) Len

func (c *Consumer) Len() int

func (*Consumer) Options

func (c *Consumer) Options() *QueueOptions

func (*Consumer) Process

func (c *Consumer) Process(msg *Message) error

Process is low-level API to process message bypassing the internal queue.

func (*Consumer) ProcessAll

func (c *Consumer) ProcessAll() error

ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.

func (*Consumer) ProcessOne

func (c *Consumer) ProcessOne() error

ProcessOne processes at most one message in the queue.

func (*Consumer) Purge

func (c *Consumer) Purge() error

Purge discards messages from the internal queue.

func (*Consumer) Put

func (c *Consumer) Put(msg *Message)

func (*Consumer) Queue

func (c *Consumer) Queue() Queuer

func (*Consumer) Start

func (c *Consumer) Start() error

Start starts consuming messages in the queue.

func (*Consumer) Stats

func (c *Consumer) Stats() *ConsumerStats

Stats returns processor stats.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop is StopTimeout with a 30-second timeout.

func (*Consumer) StopTimeout

func (c *Consumer) StopTimeout(timeout time.Duration) error

StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerHook

type ConsumerHook interface {
	BeforeProcessMessage(*ProcessMessageEvent) error
	AfterProcessMessage(*ProcessMessageEvent) error
}

type ConsumerStats

type ConsumerStats struct {
	WorkerNumber  uint32
	FetcherNumber uint32
	BufferSize    uint32
	Buffered      uint32
	InFlight      uint32
	Processed     uint32
	Retries       uint32
	Fails         uint32
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Factory

type Factory interface {
	NewQueue(*QueueOptions) Queuer
	Queues() []Queuer
	StartConsumers() error
	StopConsumers() error
	Close() error
}

Factory is an interface that abstracts the creation of new queues. It is implemented in the subpackages memqueue, azsqs, ironmq, and redisq.

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	Ctx context.Context `msgpack:"-"`

	// SQS/IronMQ message id.
	ID string `msgpack:",omitempty"`

	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string `msgpack:"-"`

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration `msgpack:"-"`

	// Function args passed to the handler.
	Args []interface{} `msgpack:"-"`

	// Binary representation of the args.
	ArgsCompression string `msgpack:",omitempty"`
	ArgsBin         []byte

	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationID string `msgpack:"-"`

	// The number of times the message has been reserved or released.
	ReservedCount int

	TaskName  string
	StickyErr error `msgpack:"-"`
	// contains filtered or unexported fields
}

Message is used to create and retrieve messages from a queue.

func NewMessage

func NewMessage(args ...interface{}) *Message

func (*Message) MarshalArgs

func (m *Message) MarshalArgs() ([]byte, error)

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) OnceWithArgs

func (m *Message) OnceWithArgs(period time.Duration, args ...interface{})

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(b []byte) error

type ProcessMessageEvent

type ProcessMessageEvent struct {
	Message   *Message
	StartTime time.Time
	Error     error

	Stash map[interface{}]interface{}
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(opt *QueueOptions) *Queue

func (*Queue) Add

func (q *Queue) Add(msg *Message) error

func (*Queue) Bind

func (q *Queue) Bind(factory Factory)

func (*Queue) Close

func (q *Queue) Close() error

func (*Queue) CloseTimeout

func (q *Queue) CloseTimeout(timeout time.Duration) error

func (*Queue) Consumer

func (q *Queue) Consumer() *Consumer

func (*Queue) Delete

func (q *Queue) Delete(msg *Message) error

func (*Queue) Len

func (q *Queue) Len() (int, error)

func (*Queue) Name

func (q *Queue) Name() string

func (*Queue) Options

func (q *Queue) Options() *QueueOptions

func (*Queue) Purge

func (q *Queue) Purge() error

func (*Queue) Release

func (q *Queue) Release(msg *Message) error

func (*Queue) ReserveN

func (q *Queue) ReserveN(n int, waitTimeout time.Duration) ([]Message, error)

type QueueOptions

type QueueOptions struct {
	// Queue name.
	Name string

	// Minimum number of goroutines processing messages.
	// Default is 1.
	MinWorkers int
	// Maximum number of goroutines processing messages.
	// Default is 32 * number of CPUs.
	MaxWorkers int
	// Global limit of concurrently running workers across all servers.
	// Overrides MaxWorkers.
	WorkerLimit int
	// Maximum number of goroutines fetching messages.
	// Default is 16 * number of CPUs.
	MaxFetchers int

	// Number of messages reserved by a fetcher in the queue in one request.
	// Default is 10 messages.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	// Default is 5 minutes.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 10 seconds.
	WaitTimeout time.Duration
	// Size of the buffer where reserved messages are stored.
	// Default is the same as ReservationSize.
	BufferSize int

	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int

	// Processing rate limit.
	RateLimit rate.Limit

	// Optional rate limiter interface. The default is to use Redis.
	RateLimiter RateLimiter

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage
	// contains filtered or unexported fields
}

func (*QueueOptions) Init

func (opt *QueueOptions) Init()

type Queuer

type Queuer interface {
	Name() string
	Options() *QueueOptions
	Consumer() *Consumer

	Len() (int, error)
	Add(msg *Message) error
	ReserveN(n int, waitTimeout time.Duration) ([]Message, error)
	Release(msg *Message) error
	Delete(msg *Message) error
	Purge() error
	Close() error
	CloseTimeout(timeout time.Duration) error
}

type RateLimiter

type RateLimiter interface {
	AllowRate(name string, limit rate.Limit) (delay time.Duration, allow bool)
}

type Redis

type Redis interface {
	Del(keys ...string) *redis.IntCmd
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	// Required by redislock
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(script string) *redis.StringCmd
}

type Storage

type Storage interface {
	Exists(key string) bool
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(opt *TaskOptions) *Task

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) OnceWithArgs

func (t *Task) OnceWithArgs(period time.Duration, args ...interface{}) *Message

func (*Task) Options

func (t *Task) Options() *TaskOptions

func (*Task) String

func (t *Task) String() string

func (*Task) WithArgs

func (t *Task) WithArgs(args ...interface{}) *Message

func (*Task) WithMessage

func (t *Task) WithMessage(msg *Message) *Message

type TaskOptions

type TaskOptions struct {
	// Task name.
	Name string

	// Function called to process a message.
	Handler interface{}
	// Function called to process failed message.
	FallbackHandler interface{}

	// Optional function used by Consumer with defer statement
	// to recover from panics.
	DeferFunc func()

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	// Default is 64 retries.
	RetryLimit int
	// Minimum backoff time between retries.
	// Default is 30 seconds.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	// Default is 30 minutes.
	MaxBackoff time.Duration
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples
api_worker/api command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL