Documentation
¶
Overview ¶
Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends.
Example (CustomRateLimit) ¶
package main
import (
"fmt"
"time"
"github.com/vmihailenco/taskq/v2"
"github.com/vmihailenco/taskq/v2/memqueue"
)
type RateLimitError string
func (e RateLimitError) Error() string {
return string(e)
}
func (RateLimitError) Delay() time.Duration {
return 3 * time.Second
}
func main() {
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
})
task := taskq.NewTask(&taskq.TaskOptions{
Name: "Example_customRateLimit",
Handler: func() error {
fmt.Println("retried in", timeSince(start))
return RateLimitError("calm down")
},
RetryLimit: 2,
MinBackoff: time.Millisecond,
})
q.Add(task.WithArgs())
// Wait for all messages to be processed.
_ = q.Close()
}
Output: retried in 0s retried in 3s
Example (MessageDelay) ¶
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
})
task := taskq.NewTask(&taskq.TaskOptions{
Name: "Example_messageDelay",
Handler: func() {
fmt.Println("processed with delay", timeSince(start))
},
})
msg := task.WithArgs()
msg.Delay = time.Second
_ = q.Add(msg)
// Wait for all messages to be processed.
_ = q.Close()
Output: processed with delay 1s
Example (Once) ¶
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Redis: redisRing(),
RateLimit: rate.Every(time.Second),
})
task := taskq.NewTask(&taskq.TaskOptions{
Name: "Example_once",
Handler: func(name string) {
fmt.Println("hello", name)
},
})
for i := 0; i < 10; i++ {
// Call once in a second.
_ = q.Add(task.OnceWithArgs(time.Second, "world"))
}
// Wait for all messages to be processed.
_ = q.Close()
Output: hello world
Example (RateLimit) ¶
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Redis: redisRing(),
RateLimit: rate.Every(time.Second),
})
task := taskq.NewTask(&taskq.TaskOptions{
Name: "Example_rateLimit",
Handler: func() {},
})
const n = 5
for i := 0; i < n; i++ {
_ = q.Add(task.WithArgs())
}
// Wait for all messages to be processed.
_ = q.Close()
fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output: 1 msg/s
Example (RetryOnError) ¶
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
})
task := taskq.NewTask(&taskq.TaskOptions{
Name: "Example_retryOnError",
Handler: func() error {
fmt.Println("retried in", timeSince(start))
return errors.New("fake error")
},
RetryLimit: 3,
MinBackoff: time.Second,
})
q.Add(task.WithArgs())
// Wait for all messages to be processed.
_ = q.Close()
Output: retried in 0s retried in 1s retried in 3s
Index ¶
- Variables
- func SetLogger(logger *log.Logger)
- func SetUnknownTaskOptions(opt *TaskOptions)
- type Consumer
- func (c *Consumer) Add(msg *Message) error
- func (c *Consumer) AddHook(hook ConsumerHook)
- func (c *Consumer) Len() int
- func (c *Consumer) Options() *QueueOptions
- func (c *Consumer) Process(msg *Message) error
- func (c *Consumer) ProcessAll() error
- func (c *Consumer) ProcessOne() error
- func (c *Consumer) Purge() error
- func (c *Consumer) Put(msg *Message)
- func (c *Consumer) Queue() Queuer
- func (c *Consumer) Start() error
- func (c *Consumer) Stats() *ConsumerStats
- func (c *Consumer) Stop() error
- func (c *Consumer) StopTimeout(timeout time.Duration) error
- func (c *Consumer) String() string
- type ConsumerHook
- type ConsumerStats
- type Delayer
- type Factory
- type Handler
- type HandlerFunc
- type Message
- type ProcessMessageEvent
- type Queue
- func (q *Queue) Add(msg *Message) error
- func (q *Queue) Bind(factory Factory)
- func (q *Queue) Close() error
- func (q *Queue) CloseTimeout(timeout time.Duration) error
- func (q *Queue) Consumer() *Consumer
- func (q *Queue) Delete(msg *Message) error
- func (q *Queue) Len() (int, error)
- func (q *Queue) Name() string
- func (q *Queue) Options() *QueueOptions
- func (q *Queue) Purge() error
- func (q *Queue) Release(msg *Message) error
- func (q *Queue) ReserveN(n int, waitTimeout time.Duration) ([]Message, error)
- type QueueOptions
- type Queuer
- type RateLimiter
- type Redis
- type Storage
- type Task
- func (t *Task) HandleMessage(msg *Message) error
- func (t *Task) Name() string
- func (t *Task) OnceWithArgs(period time.Duration, args ...interface{}) *Message
- func (t *Task) Options() *TaskOptions
- func (t *Task) String() string
- func (t *Task) WithArgs(args ...interface{}) *Message
- func (t *Task) WithMessage(msg *Message) *Message
- type TaskOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsyncTask = errors.New("taskq: async task")
var ErrDuplicate = errors.New("taskq: message with such name already exists")
ErrDuplicate is returned when adding a duplicate message to the queue.
var Queues queueRegistry
var Tasks taskRegistry
Functions ¶
func SetUnknownTaskOptions ¶
func SetUnknownTaskOptions(opt *TaskOptions)
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewConsumer ¶
NewConsumer creates a new Consumer for the queue using the provided processing options.
func StartConsumer ¶
StartConsumer creates a new Consumer and starts it.
func (*Consumer) AddHook ¶
func (c *Consumer) AddHook(hook ConsumerHook)
AddHook adds a hook into message processing.
func (*Consumer) Options ¶
func (c *Consumer) Options() *QueueOptions
func (*Consumer) Process ¶
Process is a low-level API to process a message, bypassing the internal queue.
func (*Consumer) ProcessAll ¶
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Consumer) ProcessOne ¶
ProcessOne processes at most one message in the queue.
func (*Consumer) StopTimeout ¶
StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.
type ConsumerHook ¶
type ConsumerHook interface {
BeforeProcessMessage(*ProcessMessageEvent) error
AfterProcessMessage(*ProcessMessageEvent) error
}
type ConsumerStats ¶
type Factory ¶
type Factory interface {
NewQueue(*QueueOptions) Queuer
Queues() []Queuer
StartConsumers() error
StopConsumers() error
Close() error
}
Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct {
Ctx context.Context `msgpack:"-"`
// SQS/IronMQ message id.
ID string `msgpack:",omitempty"`
// Optional name for the message. Messages with the same name
// are processed only once.
Name string `msgpack:"-"`
// Delay specifies the duration the queue must wait
// before executing the message.
Delay time.Duration `msgpack:"-"`
// Function args passed to the handler.
Args []interface{} `msgpack:"-"`
// Binary representation of the args.
ArgsCompression string `msgpack:",omitempty"`
ArgsBin []byte
// SQS/IronMQ reservation id that is used to release/delete the message.
ReservationID string `msgpack:"-"`
// The number of times the message has been reserved or released.
ReservedCount int
TaskName string
StickyErr error `msgpack:"-"`
// contains filtered or unexported fields
}
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func NewMessage(args ...interface{}) *Message
func (*Message) MarshalArgs ¶
func (*Message) MarshalBinary ¶
func (*Message) OnceWithArgs ¶
func (*Message) UnmarshalBinary ¶
type ProcessMessageEvent ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue(opt *QueueOptions) *Queue
func (*Queue) Options ¶
func (q *Queue) Options() *QueueOptions
type QueueOptions ¶
type QueueOptions struct {
// Queue name.
Name string
// Minimum number of goroutines processing messages.
// Default is 1.
MinWorkers int
// Maximum number of goroutines processing messages.
// Default is 32 * number of CPUs.
MaxWorkers int
// Global limit of concurrently running workers across all servers.
// Overrides MaxWorkers.
WorkerLimit int
// Maximum number of goroutines fetching messages.
// Default is 16 * number of CPUs.
MaxFetchers int
// Number of messages reserved by a fetcher in the queue in one request.
// Default is 10 messages.
ReservationSize int
// Time after which the reserved message is returned to the queue.
// Default is 5 minutes.
ReservationTimeout time.Duration
// Time that a long polling receive call waits for a message to become
// available before returning an empty response.
// Default is 10 seconds.
WaitTimeout time.Duration
// Size of the buffer where reserved messages are stored.
// Default is the same as ReservationSize.
BufferSize int
// Number of consecutive failures after which queue processing is paused.
// Default is 100 failures.
PauseErrorsThreshold int
// Processing rate limit.
RateLimit rate.Limit
// Optional rate limiter interface. The default is to use Redis.
RateLimiter RateLimiter
// Redis client that is used for storing metadata.
Redis Redis
// Optional storage interface. The default is to use Redis.
Storage Storage
// contains filtered or unexported fields
}
func (*QueueOptions) Init ¶
func (opt *QueueOptions) Init()
type Queuer ¶
type Queuer interface {
Name() string
Options() *QueueOptions
Consumer() *Consumer
Len() (int, error)
Add(msg *Message) error
ReserveN(n int, waitTimeout time.Duration) ([]Message, error)
Release(msg *Message) error
Delete(msg *Message) error
Purge() error
Close() error
CloseTimeout(timeout time.Duration) error
}
type RateLimiter ¶
type Redis ¶
type Redis interface {
Del(keys ...string) *redis.IntCmd
SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
// Required by redislock
Eval(script string, keys []string, args ...interface{}) *redis.Cmd
EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
ScriptExists(scripts ...string) *redis.BoolSliceCmd
ScriptLoad(script string) *redis.StringCmd
}
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func NewTask ¶
func NewTask(opt *TaskOptions) *Task
func (*Task) HandleMessage ¶
func (*Task) OnceWithArgs ¶
func (*Task) Options ¶
func (t *Task) Options() *TaskOptions
func (*Task) WithMessage ¶
type TaskOptions ¶
type TaskOptions struct {
// Task name.
Name string
// Function called to process a message.
Handler interface{}
// Function called to process failed message.
FallbackHandler interface{}
// Optional function used by Consumer with defer statement
// to recover from panics.
DeferFunc func()
// Number of tries/releases after which the message fails permanently
// and is deleted.
// Default is 64 retries.
RetryLimit int
// Minimum backoff time between retries.
// Default is 30 seconds.
MinBackoff time.Duration
// Maximum backoff time between retries.
// Default is 30 minutes.
MaxBackoff time.Duration
// contains filtered or unexported fields
}
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
api_worker/api
command
|
|
|
api_worker/worker
command
|
|
|
sqs_api_worker/api
command
|
|
|
sqs_api_worker/worker
command
|
|