Documentation
¶
Overview ¶
Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends.
Example (CustomRateLimit) ¶
package main
import (
"context"
"fmt"
"time"
"github.com/vmihailenco/taskq/memqueue/v4"
"github.com/vmihailenco/taskq/v4"
)
// RateLimitError is a string-backed error type used by the example handler;
// the string value is the error message.
type RateLimitError string

// Error implements the error interface by returning the underlying string.
func (msg RateLimitError) Error() string {
	text := string(msg)
	return text
}

// Delay reports a fixed three-second duration.
// NOTE(review): presumably this makes the type satisfy taskq's Delayer so the
// retry is postponed by the returned duration — confirm.
func (RateLimitError) Delay() time.Duration {
	const seconds = 3
	return seconds * time.Second
}
// main runs the custom rate-limit example: it registers a task whose handler
// always fails with a RateLimitError, enqueues one job on an in-memory queue,
// and closes the queue once processing has finished.
func main() {
	start := time.Now()

	q := memqueue.NewQueue(&taskq.QueueConfig{
		Name: "test",
	})

	task := taskq.RegisterTask("Example_customRateLimit", &taskq.TaskConfig{
		Handler: func() error {
			// NOTE(review): timeSince is not defined in this listing; presumably a
			// helper from the example's test file — confirm.
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	ctx := context.Background()
	// The enqueue error is deliberately discarded to keep the example short;
	// the blank assignment makes that explicit instead of silently dropping it.
	_ = q.AddJob(ctx, task.NewJob())

	// Wait for all messages to be processed.
	_ = q.Close()
}
Output:
retried in 0s
retried in 3s
Example (MessageDelay) ¶
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueConfig{
Name: "test",
})
task := taskq.RegisterTask("Example_messageDelay", &taskq.TaskConfig{
Handler: func() {
fmt.Println("processed with delay", timeSince(start))
},
})
ctx := context.Background()
msg := task.NewJob()
msg.Delay = time.Second
_ = q.AddJob(ctx, msg)
// Wait for all messages to be processed.
_ = q.Close()
Output: processed with delay 1s
Example (Once) ¶
q := memqueue.NewQueue(&taskq.QueueConfig{
Name: "test",
Redis: redisRing(),
RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask("Example_once", &taskq.TaskConfig{
Handler: func(name string) {
fmt.Println("hello", name)
},
})
ctx := context.Background()
for i := 0; i < 10; i++ {
msg := task.NewJob("world")
// Call once in a second.
msg.OnceInPeriod(time.Second)
_ = q.AddJob(ctx, msg)
}
// Wait for all messages to be processed.
_ = q.Close()
Output: hello world
Example (RateLimit) ¶
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueConfig{
Name: "test",
Redis: redisRing(),
RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask("Example_rateLimit", &taskq.TaskConfig{
Handler: func() {},
})
const n = 5
ctx := context.Background()
for i := 0; i < n; i++ {
_ = q.AddJob(ctx, task.NewJob())
}
// Wait for all messages to be processed.
_ = q.Close()
fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output: 1 msg/s
Example (RetryOnError) ¶
// Capture the start time so each attempt can report how long after the start
// it ran.
start := time.Now()

q := memqueue.NewQueue(&taskq.QueueConfig{
	Name: "test",
})

// The handler always fails, so the job is attempted until RetryLimit is hit.
task := taskq.RegisterTask("Example_retryOnError", &taskq.TaskConfig{
	Handler: func() error {
		// NOTE(review): timeSince is not defined in this listing; presumably a
		// helper from the example's test file — confirm.
		fmt.Println("retried in", timeSince(start))
		return errors.New("fake error")
	},
	RetryLimit: 3,
	MinBackoff: time.Second,
})

ctx := context.Background()
// The enqueue error is deliberately discarded to keep the example short;
// the blank assignment makes that explicit instead of silently dropping it.
_ = q.AddJob(ctx, task.NewJob())

// Wait for all messages to be processed.
_ = q.Close()
Output:
retried in 0s
retried in 1s
retried in 3s
Index ¶
- Variables
- func SetLogger(logger logr.Logger)
- func SetUnknownTaskConfig(opt *TaskConfig)
- func Version() string
- type Consumer
- func (c *Consumer) AddHook(hook ConsumerHook)
- func (c *Consumer) AddJob(ctx context.Context, job *Job) error
- func (c *Consumer) Len() int
- func (c *Consumer) Options() *QueueConfig
- func (c *Consumer) Process(ctx context.Context, job *Job) error
- func (c *Consumer) ProcessAll(ctx context.Context) error
- func (c *Consumer) ProcessOne(ctx context.Context) error
- func (c *Consumer) Purge(ctx context.Context) error
- func (c *Consumer) Put(ctx context.Context, job *Job)
- func (c *Consumer) Queue() Queue
- func (c *Consumer) Start(ctx context.Context) error
- func (c *Consumer) Stats() *ConsumerStats
- func (c *Consumer) Stop() error
- func (c *Consumer) StopTimeout(timeout time.Duration) error
- func (c *Consumer) String() string
- type ConsumerHook
- type ConsumerStats
- type Delayer
- type Factory
- type Handler
- type HandlerFunc
- type Job
- func (m *Job) MarshalArgs() ([]byte, error)
- func (m *Job) MarshalBinary() ([]byte, error)
- func (m *Job) OnceInPeriod(period time.Duration, args ...interface{})
- func (m *Job) OnceWithDelay(delay time.Duration)
- func (m *Job) OnceWithSchedule(tm time.Time)
- func (m *Job) SetDelay(delay time.Duration)
- func (m *Job) String() string
- func (m *Job) UnmarshalBinary(b []byte) error
- type ProcessJobEvent
- type Queue
- type QueueConfig
- type QueueConsumer
- type Redis
- type Storage
- type Task
- type TaskConfig
- type TaskMap
- func (r *TaskMap) Get(name string) *Task
- func (r *TaskMap) HandleJob(ctx context.Context, msg *Job) error
- func (r *TaskMap) Range(fn func(name string, task *Task) bool)
- func (r *TaskMap) Register(name string, opt *TaskConfig) (*Task, error)
- func (r *TaskMap) Reset()
- func (r *TaskMap) Unregister(task *Task)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsyncTask = errors.New("taskq: async task")
var ErrDuplicate = errors.New("taskq: message with such name already exists")
ErrDuplicate is returned when adding a duplicate message to the queue.
Functions ¶
func SetUnknownTaskConfig ¶
func SetUnknownTaskConfig(opt *TaskConfig)
Types ¶
type Consumer ¶
// Consumer reserves messages from the queue, processes them, and then either
// releases or deletes messages from the queue.
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewConsumer ¶
NewConsumer creates a new Consumer for the queue using the provided processing options.
func StartConsumer ¶
StartConsumer creates a new QueueConsumer and starts it.
func (*Consumer) AddHook ¶
func (c *Consumer) AddHook(hook ConsumerHook)
AddHook adds a hook into message processing.
func (*Consumer) Options ¶
func (c *Consumer) Options() *QueueConfig
func (*Consumer) Process ¶
Process is a low-level API to process a message, bypassing the internal queue.
func (*Consumer) ProcessAll ¶
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Consumer) ProcessOne ¶
ProcessOne processes at most one message in the queue.
func (*Consumer) StopTimeout ¶
StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.
type ConsumerHook ¶
// ConsumerHook is a hook into message processing; a hook is registered with a
// consumer's AddHook method.
type ConsumerHook interface {
// BeforeProcessJob receives a context and a ProcessJobEvent and returns a context.
// NOTE(review): presumably invoked before the job is handled, with the returned
// context used for the rest of the processing — confirm against the consumer.
BeforeProcessJob(context.Context, *ProcessJobEvent) context.Context
// AfterProcessJob receives a context and a ProcessJobEvent.
// NOTE(review): presumably invoked once the job has been handled — confirm.
AfterProcessJob(context.Context, *ProcessJobEvent)
}
type ConsumerStats ¶
type Factory ¶
// Factory is an interface that abstracts creation of new queues. It is
// implemented in subpackages memqueue, azsqs, and ironmq.
type Factory interface {
// RegisterQueue takes a QueueConfig and returns a Queue.
// NOTE(review): presumably creates the queue and remembers it in the factory — confirm.
RegisterQueue(*QueueConfig) Queue
// Range calls the given function with queues.
// NOTE(review): presumably iteration stops when the function returns false — confirm.
Range(func(Queue) bool)
// NOTE(review): presumably starts the consumers of every registered queue — confirm.
StartConsumers(context.Context) error
// NOTE(review): presumably stops the consumers of every registered queue — confirm.
StopConsumers() error
// NOTE(review): presumably closes every registered queue — confirm.
Close() error
}
Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
type Job ¶
// Job is used to create and retrieve messages from a queue.
//
// Fields tagged `msgpack:"-"` are excluded from the msgpack encoding; the
// numbered tags are the keys used for the encoded fields.
type Job struct {
// SQS/IronMQ message id.
ID string `msgpack:"1,omitempty,alias:ID"`
// Optional name for the message. Jobs with the same name
// are processed only once.
Name string `msgpack:"-"`
// Delay specifies the duration the queue must wait
// before executing the message.
Delay time.Duration `msgpack:"-"`
// Args passed to the handler.
Args []interface{} `msgpack:"-"`
// Binary representation of the args.
// NOTE(review): ArgsCompression presumably names the compression applied to
// ArgsBin — confirm.
ArgsCompression string `msgpack:"2,omitempty,alias:ArgsCompression"`
ArgsBin []byte `msgpack:"3,alias:ArgsBin"`
// SQS/IronMQ reservation id that is used to release/delete the message.
ReservationID string `msgpack:"-"`
// The number of times the message has been reserved or released.
ReservedCount int `msgpack:"4,omitempty,alias:ReservedCount"`
// NOTE(review): presumably the name of the registered Task that handles this
// job — confirm.
TaskName string `msgpack:"5,alias:TaskName"`
// NOTE(review): presumably the error produced while processing the job — confirm.
Err error `msgpack:"-"`
// contains filtered or unexported fields
}
Job is used to create and retrieve messages from a queue.
func (*Job) MarshalArgs ¶
func (*Job) MarshalBinary ¶
func (*Job) OnceInPeriod ¶
OnceInPeriod uses the period and the args to generate a message name such that a message with those args is added to the queue only once in the given period. If args are not provided, the message args are used instead.
func (*Job) OnceWithDelay ¶
func (*Job) OnceWithSchedule ¶
func (*Job) UnmarshalBinary ¶
type ProcessJobEvent ¶
type Queue ¶
// Queue is the interface to a message queue backend.
//
// NOTE(review): the per-method notes below are inferred from the method names
// and signatures only — confirm against a backend implementation.
type Queue interface {
fmt.Stringer
// Name returns a string; presumably the queue name from QueueConfig.Name.
Name() string
// Options returns the queue's QueueConfig.
Options() *QueueConfig
// Consumer returns the QueueConsumer associated with the queue.
Consumer() QueueConsumer
// Len returns an int and an error; presumably the number of queued messages.
Len(ctx context.Context) (int, error)
// AddJob adds the job to the queue.
AddJob(ctx context.Context, msg *Job) error
// ReserveN returns a slice of jobs; presumably it reserves up to n messages,
// waiting at most waitTimeout.
ReserveN(ctx context.Context, n int, waitTimeout time.Duration) ([]Job, error)
// Release presumably returns a reserved message to the queue.
Release(ctx context.Context, msg *Job) error
// Delete presumably removes a message from the queue.
Delete(ctx context.Context, msg *Job) error
// Purge presumably discards the messages in the queue.
Purge(ctx context.Context) error
// Close closes the queue; the examples call it to wait for all messages to
// be processed.
Close() error
// CloseTimeout is presumably Close bounded by the given timeout.
CloseTimeout(timeout time.Duration) error
}
type QueueConfig ¶
// QueueConfig holds the settings of a queue and of the consumer that
// processes it.
type QueueConfig struct {
// Queue name.
Name string
// NOTE(review): presumably the number of workers processing messages on this
// server; WorkerLimit overrides it — confirm the default.
NumWorker int
// Global limit of concurrently running workers across all servers.
// Overrides NumWorker.
WorkerLimit int
// Maximum number of goroutines fetching messages.
// Default is 8 * number of CPUs.
NumFetcher int
// Number of messages reserved by a fetcher in the queue in one request.
// Default is 10 messages.
ReservationSize int
// Time after which the reserved message is returned to the queue.
// Default is 5 minutes.
ReservationTimeout time.Duration
// Time that a long polling receive call waits for a message to become
// available before returning an empty response.
// Default is 10 seconds.
WaitTimeout time.Duration
// Size of the buffer where reserved messages are stored.
// Default is the same as ReservationSize.
BufferSize int
// Number of consecutive failures after which queue processing is paused.
// Default is 100 failures.
PauseErrorsThreshold int
// Processing rate limit.
RateLimit redis_rate.Limit
// Optional rate limiter. The default is to use Redis.
RateLimiter *redis_rate.Limiter
// Redis client that is used for storing metadata.
Redis Redis
// Optional storage interface. The default is to use Redis.
Storage Storage
// Optional message handler. The default is the global Tasks registry.
Handler Handler
// ConsumerIdleTimeout is the time after which the consumer needs to be deleted.
// Default is 6 hours.
ConsumerIdleTimeout time.Duration
// SchedulerBackoffTime is the backoff time for the scheduler.
// The scheduler was designed to clean up zombie consumers, requeue pending
// messages, and so on.
// Default is a random value between 1s and 1.5s.
// It can be set to a bigger value so that the scheduler does not slow down
// Redis when the Redis queue is used; the backoff will then be between
// SchedulerBackoffTime and SchedulerBackoffTime+250ms.
SchedulerBackoffTime time.Duration
// contains filtered or unexported fields
}
func (*QueueConfig) Init ¶
func (opt *QueueConfig) Init()
type QueueConsumer ¶
// QueueConsumer reserves messages from the queue, processes them, and then
// either releases or deletes messages from the queue.
type QueueConsumer interface {
// AddHook adds a hook into message processing.
AddHook(hook ConsumerHook)
// Queue returns the Queue associated with the consumer.
Queue() Queue
// Options returns the QueueConfig associated with the consumer.
Options() *QueueConfig
// NOTE(review): presumably the number of messages in the internal queue — confirm.
Len() int
// Stats returns processor stats.
Stats() *ConsumerStats
// NOTE(review): presumably adds the job for processing by this consumer — confirm.
AddJob(ctx context.Context, job *Job) error
// Start starts consuming messages in the queue.
Start(ctx context.Context) error
// Stop is StopTimeout with 30 seconds timeout.
Stop() error
// StopTimeout waits up to the timeout duration for workers to finish
// processing their current messages and then stops the workers.
StopTimeout(timeout time.Duration) error
// ProcessAll starts workers to process messages in the queue and then stops
// them when all messages are processed.
ProcessAll(ctx context.Context) error
// ProcessOne processes at most one message in the queue.
ProcessOne(ctx context.Context) error
// Process is a low-level API to process a message, bypassing the internal queue.
Process(ctx context.Context, msg *Job) error
// NOTE(review): presumably places the message into the internal queue — confirm.
Put(ctx context.Context, msg *Job)
// Purge discards messages from the internal queue.
Purge(ctx context.Context) error
String() string
}
QueueConsumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
type Storage ¶
func NewLocalStorage ¶
func NewLocalStorage() Storage
type Task ¶
// Task is the type returned by RegisterTask; the TaskConfig it was given is
// available through its Options method.
type Task struct {
// contains filtered or unexported fields
}
func RegisterTask ¶
func RegisterTask(name string, opt *TaskConfig) *Task
func (*Task) Options ¶
func (t *Task) Options() *TaskConfig
type TaskConfig ¶
// TaskConfig describes how the messages of a task are handled and retried.
type TaskConfig struct {
// Function called to process a message.
// There are three permitted types of signature:
// 1. A zero-argument function
// 2. A function whose arguments are assignable in type from those which are passed in the message
// 3. A function which takes a single `*Job` argument
// The handler function may also optionally take a Context as a first argument and may optionally return an error.
// If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to
// `StartConsumer`. If the handler returns a non-nil error, the message processing will fail and will be retried.
Handler interface{}
// Function called to process failed message after the specified number of retries have all failed.
// The FallbackHandler accepts the same types of function as the Handler.
FallbackHandler interface{}
// Optional function used by Consumer with defer statement
// to recover from panics.
DeferFunc func()
// Number of tries/releases after which the message fails permanently
// and is deleted.
// Default is 64 retries.
RetryLimit int
// Minimum backoff time between retries.
// Default is 30 seconds.
MinBackoff time.Duration
// Maximum backoff time between retries.
// Default is 30 minutes.
MaxBackoff time.Duration
// contains filtered or unexported fields
}
type TaskMap ¶
// TaskMap is a registry of tasks keyed by name: tasks are added with Register,
// looked up with Get, and removed with Unregister. The package-level Tasks
// variable is the global registry.
type TaskMap struct {
// contains filtered or unexported fields
}
var Tasks TaskMap
