Documentation
¶
Overview ¶
Package taskq implements task/job queue with Redis, SQS, IronMQ, and in-memory backends.
Example (CustomRateLimit) ¶
package main
import (
"context"
"fmt"
"time"
"github.com/frain-dev/taskq/v3"
"github.com/frain-dev/taskq/v3/memqueue"
)
type RateLimitError string
func (e RateLimitError) Error() string {
return string(e)
}
func (RateLimitError) Delay() time.Duration {
return 3 * time.Second
}
func main() {
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "Example_customRateLimit",
Handler: func() error {
fmt.Println("retried in", timeSince(start))
return RateLimitError("calm down")
},
RetryLimit: 2,
MinBackoff: time.Millisecond,
})
ctx := context.Background()
q.Add(task.WithArgs(ctx))
// Wait for all messages to be processed.
_ = q.Close()
}
Output: retried in 0s retried in 3s
Example (MessageDelay) ¶
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "Example_messageDelay",
Handler: func() {
fmt.Println("processed with delay", timeSince(start))
},
})
ctx := context.Background()
msg := task.WithArgs(ctx)
msg.Delay = time.Second
_ = q.Add(msg)
// Wait for all messages to be processed.
_ = q.Close()
Output: processed with delay 1s
Example (RetryOnError) ¶
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "Example_retryOnError",
Handler: func() error {
fmt.Println("retried in", timeSince(start))
return errors.New("fake error")
},
RetryLimit: 3,
MinBackoff: time.Second,
})
ctx := context.Background()
q.Add(task.WithArgs(ctx))
// Wait for all messages to be processed.
_ = q.Close()
Output: retried in 0s retried in 1s retried in 3s
Index ¶
- Variables
- func NewConsumerConfig(numFetcher, numWorker int32) *consumerConfig
- func SetLogger(logger *log.Logger)
- func SetUnknownTaskOptions(opt *TaskOptions)
- func Version() string
- type Consumer
- func (c *Consumer) Add(msg *Message) error
- func (c *Consumer) AddHook(hook ConsumerHook)
- func (c *Consumer) Len() int
- func (c *Consumer) Options() *QueueOptions
- func (c *Consumer) Process(msg *Message) error
- func (c *Consumer) ProcessAll(ctx context.Context) error
- func (c *Consumer) ProcessOne(ctx context.Context) error
- func (c *Consumer) Purge() error
- func (c *Consumer) Put(msg *Message)
- func (c *Consumer) Queue() Queue
- func (c *Consumer) Start(ctx context.Context) error
- func (c *Consumer) Stats() *ConsumerStats
- func (c *Consumer) Stop() error
- func (c *Consumer) StopTimeout(timeout time.Duration) error
- func (c *Consumer) String() string
- type ConsumerHook
- type ConsumerStats
- type Delayer
- type Factory
- type Handler
- type HandlerFunc
- type Message
- func (m *Message) MarshalArgs() ([]byte, error)
- func (m *Message) MarshalBinary() ([]byte, error)
- func (m *Message) OnceInPeriod(period time.Duration, args ...interface{})
- func (m *Message) OnceWithDelay(delay time.Duration)
- func (m *Message) OnceWithSchedule(tm time.Time)
- func (m *Message) SetDelay(delay time.Duration)
- func (m *Message) String() string
- func (m *Message) UnmarshalBinary(b []byte) error
- type ProcessMessageEvent
- type Queue
- type QueueConsumer
- type QueueOptions
- type Redis
- type Storage
- type Task
- type TaskMap
- type TaskOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsyncTask = errors.New("taskq: async task")
var ErrDuplicate = errors.New("taskq: message with such name already exists")
ErrDuplicate is returned when adding duplicate message to the queue.
Functions ¶
func NewConsumerConfig ¶ added in v3.2.10
func NewConsumerConfig(numFetcher, numWorker int32) *consumerConfig
func SetUnknownTaskOptions ¶
func SetUnknownTaskOptions(opt *TaskOptions)
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewConsumer ¶
NewConsumer creates new Consumer for the queue using provided processing options.
func StartConsumer ¶
StartConsumer creates new QueueConsumer and starts it.
func (*Consumer) AddHook ¶
func (c *Consumer) AddHook(hook ConsumerHook)
AddHook adds a hook into message processing.
func (*Consumer) Options ¶
func (c *Consumer) Options() *QueueOptions
func (*Consumer) Process ¶
Process is low-level API to process message bypassing the internal queue.
func (*Consumer) ProcessAll ¶
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Consumer) ProcessOne ¶
ProcessOne processes at most one message in the queue.
func (*Consumer) StopTimeout ¶
StopTimeout waits workers for timeout duration to finish processing current messages and stops workers.
type ConsumerHook ¶
type ConsumerHook interface {
BeforeProcessMessage(*ProcessMessageEvent) error
AfterProcessMessage(*ProcessMessageEvent) error
}
type ConsumerStats ¶
type Factory ¶
type Factory interface {
RegisterQueue(*QueueOptions) Queue
Range(func(Queue) bool)
StartConsumers(context.Context) error
StopConsumers() error
Close() error
}
Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct {
Ctx context.Context `msgpack:"-"`
// SQS/IronMQ message id.
ID string `msgpack:"1,omitempty,alias:ID"`
// Optional name for the message. Messages with the same name
// are processed only once.
Name string `msgpack:"-"`
// Delay specifies the duration the queue must wait
// before executing the message.
Delay time.Duration `msgpack:"-"`
// Args passed to the handler.
Args []interface{} `msgpack:"-"`
// Binary representation of the args.
ArgsCompression string `msgpack:"2,omitempty,alias:ArgsCompression"`
ArgsBin []byte `msgpack:"3,alias:ArgsBin"`
// SQS/IronMQ reservation id that is used to release/delete the message.
ReservationID string `msgpack:"-"`
// The number of times the message has been reserved or released.
ReservedCount int `msgpack:"4,omitempty,alias:ReservedCount"`
TaskName string `msgpack:"5,alias:TaskName"`
Err error `msgpack:"-"`
// contains filtered or unexported fields
}
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func (*Message) MarshalArgs ¶
func (*Message) MarshalBinary ¶
func (*Message) OnceInPeriod ¶
OnceInPeriod uses the period and the args to generate such a message name that message with such args is added to the queue once in a given period. If args are not provided then message args are used instead.
func (*Message) OnceWithDelay ¶
func (*Message) OnceWithSchedule ¶
func (*Message) UnmarshalBinary ¶
type ProcessMessageEvent ¶
type Queue ¶
type Queue interface {
fmt.Stringer
Name() string
Options() *QueueOptions
Consumer() QueueConsumer
Len() (int, error)
Add(msg *Message) error
ReserveN(ctx context.Context, n int, waitTimeout time.Duration) ([]Message, error)
Release(msg *Message) error
Delete(msg *Message) error
Purge() error
Close() error
CloseTimeout(timeout time.Duration) error
}
type QueueConsumer ¶
type QueueConsumer interface {
// AddHook adds a hook into message processing.
AddHook(hook ConsumerHook)
Queue() Queue
Options() *QueueOptions
Len() int
// Stats returns processor stats.
Stats() *ConsumerStats
Add(msg *Message) error
// Start starts consuming messages in the queue.
Start(ctx context.Context) error
// Stop is StopTimeout with 30 seconds timeout.
Stop() error
// StopTimeout waits workers for timeout duration to finish processing current
// messages and stops workers.
StopTimeout(timeout time.Duration) error
// ProcessAll starts workers to process messages in the queue and then stops
// them when all messages are processed.
ProcessAll(ctx context.Context) error
// ProcessOne processes at most one message in the queue.
ProcessOne(ctx context.Context) error
// Process is low-level API to process message bypassing the internal queue.
Process(msg *Message) error
Put(msg *Message)
// Purge discards messages from the internal queue.
Purge() error
String() string
}
QueueConsumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
type QueueOptions ¶
type QueueOptions struct {
// Queue name.
Name string
// Minimum number of goroutines processing messages.
// Default is 1.
MinNumWorker int32
// Maximum number of goroutines processing messages.
// Default is 32 * number of CPUs.
MaxNumWorker int32
// Global limit of concurrently running workers across all servers.
// Overrides MaxNumWorker.
WorkerLimit int32
// Maximum number of goroutines fetching messages.
// Default is 8 * number of CPUs.
MaxNumFetcher int32
// Number of messages reserved by a fetcher in the queue in one request.
// Default is 10 messages.
ReservationSize int
// Time after which the reserved message is returned to the queue.
// Default is 5 minutes.
ReservationTimeout time.Duration
// Time that a long polling receive call waits for a message to become
// available before returning an empty response.
// Default is 10 seconds.
WaitTimeout time.Duration
// Size of the buffer where reserved messages are stored.
// Default is the same as ReservationSize.
BufferSize int
// Number of consecutive failures after which queue processing is paused.
// Default is 100 failures.
PauseErrorsThreshold int
// Processing rate limit.
RateLimit redis_rate.Limit
// Optional rate limiter. The default is to use Redis.
RateLimiter *redis_rate.Limiter
// Redis client that is used for storing metadata.
Redis Redis
// Optional storage interface. The default is to use Redis.
Storage Storage
// Optional message handler. The default is the global Tasks registry.
Handler Handler
// ConsumerIdleTimeout Time after which the consumer need to be deleted.
// Default is 6 hour
ConsumerIdleTimeout time.Duration
// contains filtered or unexported fields
}
func (*QueueOptions) Init ¶
func (opt *QueueOptions) Init()
type Redis ¶
type Redis interface {
Del(ctx context.Context, keys ...string) *redis.IntCmd
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
// Eval Required by redislock
Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
ScriptLoad(ctx context.Context, script string) *redis.StringCmd
}
type Storage ¶
func NewLocalStorage ¶
func NewLocalStorage() Storage
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func RegisterTask ¶
func RegisterTask(opt *TaskOptions) *Task
func (*Task) HandleMessage ¶
func (*Task) Options ¶
func (t *Task) Options() *TaskOptions
type TaskMap ¶
type TaskMap struct {
// contains filtered or unexported fields
}
var Tasks TaskMap
func (*TaskMap) HandleMessage ¶
func (*TaskMap) Unregister ¶
type TaskOptions ¶
type TaskOptions struct {
// Task name.
Name string
// Function called to process a message.
// There are three permitted types of signature:
// 1. A zero-argument function
// 2. A function whose arguments are assignable in type from those which are passed in the message
// 3. A function which takes a single `*Message` argument
// The handler function may also optionally take a Context as a first argument and may optionally return an error.
// If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to
// `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried/.
Handler interface{}
// Function called to process failed message after the specified number of retries have all failed.
// The FallbackHandler accepts the same types of function as the Handler.
FallbackHandler interface{}
// Optional function used by Consumer with defer statement
// to recover from panics.
DeferFunc func()
// Number of tries/releases after which the message fails permanently
// and is deleted.
// Default is 64 retries.
RetryLimit int
// Minimum backoff time between retries.
// Default is 30 seconds.
MinBackoff time.Duration
// Maximum backoff time between retries.
// Default is 30 minutes.
MaxBackoff time.Duration
// contains filtered or unexported fields
}