Documentation
¶
Overview ¶
Package rdb encapsulates the interactions with redis.
Index ¶
- Variables
- type DailyStats
- type ErrQueueNotEmpty
- type ErrQueueNotFound
- type Pagination
- type Queue
- type RDB
- func (r *RDB) CancelationPubSub() (*redis.PubSub, error)
- func (r *RDB) CheckAndEnqueue() (err error)
- func (r *RDB) ClearServerState(host string, pid int, serverID string) error
- func (r *RDB) Close() error
- func (r *RDB) CurrentStats() (*Stats, error)
- func (r *RDB) DeleteAllDeadTasks() (int64, error)
- func (r *RDB) DeleteAllRetryTasks() (int64, error)
- func (r *RDB) DeleteAllScheduledTasks() (int64, error)
- func (r *RDB) DeleteDeadTask(id uuid.UUID, score int64) error
- func (r *RDB) DeleteRetryTask(id uuid.UUID, score int64) error
- func (r *RDB) DeleteScheduledTask(id uuid.UUID, score int64) error
- func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error)
- func (r *RDB) Done(msg *base.TaskMessage) error
- func (r *RDB) Enqueue(msg *base.TaskMessage) error
- func (r *RDB) EnqueueAllDeadTasks() (int64, error)
- func (r *RDB) EnqueueAllRetryTasks() (int64, error)
- func (r *RDB) EnqueueAllScheduledTasks() (int64, error)
- func (r *RDB) EnqueueDeadTask(id uuid.UUID, score int64) error
- func (r *RDB) EnqueueRetryTask(id uuid.UUID, score int64) error
- func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error
- func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error
- func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)
- func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
- func (r *RDB) KillAllRetryTasks() (int64, error)
- func (r *RDB) KillAllScheduledTasks() (int64, error)
- func (r *RDB) KillRetryTask(id uuid.UUID, score int64) error
- func (r *RDB) KillScheduledTask(id uuid.UUID, score int64) error
- func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error)
- func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error)
- func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error)
- func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListServers() ([]*base.ServerInfo, error)
- func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
- func (r *RDB) Pause(qname string) error
- func (r *RDB) Ping() error
- func (r *RDB) PublishCancelation(id string) error
- func (r *RDB) RedisInfo() (map[string]string, error)
- func (r *RDB) RemoveQueue(qname string, force bool) error
- func (r *RDB) Requeue(msg *base.TaskMessage) error
- func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error
- func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error
- func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error
- func (r *RDB) Unpause(qname string) error
- func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
- type Stats
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
	ErrNoProcessableTask = errors.New("no tasks are ready for processing")

	// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
	ErrTaskNotFound = errors.New("could not find a task")

	// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
	ErrDuplicateTask = errors.New("task already exists")
)
Functions ¶
This section is empty.
Types ¶
type DailyStats ¶
DailyStats holds aggregate data for a given day.
type ErrQueueNotEmpty ¶
type ErrQueueNotEmpty struct {
// contains filtered or unexported fields
}
ErrQueueNotEmpty indicates specified queue is not empty.
func (*ErrQueueNotEmpty) Error ¶
func (e *ErrQueueNotEmpty) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
// contains filtered or unexported fields
}
ErrQueueNotFound indicates specified queue does not exist.
func (*ErrQueueNotFound) Error ¶
func (e *ErrQueueNotFound) Error() string
type Pagination ¶
type Pagination struct {
// Number of items in the page.
Size int
// Page number starting from zero.
Page int
}
Pagination specifies the page size and page number for the list operation.
type Queue ¶
type Queue struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Size is the number of tasks in the queue.
Size int
}
Queue represents a task queue.
type RDB ¶
type RDB struct {
// contains filtered or unexported fields
}
RDB is a client interface to query and mutate task queues.
func (*RDB) CancelationPubSub ¶
CancelationPubSub returns a pubsub for cancelation messages.
func (*RDB) CheckAndEnqueue ¶
CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that are ready to be processed.
func (*RDB) ClearServerState ¶
ClearServerState deletes server state data from redis.
func (*RDB) CurrentStats ¶
CurrentStats returns the current state of the queues.
func (*RDB) DeleteAllDeadTasks ¶
DeleteAllDeadTasks deletes all tasks from the dead queue and returns the number of tasks deleted.
func (*RDB) DeleteAllRetryTasks ¶
DeleteAllRetryTasks deletes all tasks from the retry queue and returns the number of tasks deleted.
func (*RDB) DeleteAllScheduledTasks ¶
DeleteAllScheduledTasks deletes all tasks from the scheduled queue and returns the number of tasks deleted.
func (*RDB) DeleteDeadTask ¶
DeleteDeadTask finds a task that matches the given id and score from dead queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteRetryTask ¶
DeleteRetryTask finds a task that matches the given id and score from retry queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteScheduledTask ¶
DeleteScheduledTask finds a task that matches the given id and score from scheduled queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) Dequeue ¶
Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and deadline. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.
func (*RDB) Done ¶
func (r *RDB) Done(msg *base.TaskMessage) error
Done removes the task from in-progress queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.
func (*RDB) Enqueue ¶
func (r *RDB) Enqueue(msg *base.TaskMessage) error
Enqueue inserts the given task to the tail of the queue.
func (*RDB) EnqueueAllDeadTasks ¶
EnqueueAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.
func (*RDB) EnqueueAllRetryTasks ¶
EnqueueAllRetryTasks enqueues all tasks from retry queue and returns the number of tasks enqueued.
func (*RDB) EnqueueAllScheduledTasks ¶
EnqueueAllScheduledTasks enqueues all tasks from scheduled queue and returns the number of tasks enqueued.
func (*RDB) EnqueueDeadTask ¶
EnqueueDeadTask finds a task that matches the given id and score from dead queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueRetryTask ¶
EnqueueRetryTask finds a task that matches the given id and score from retry queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueScheduledTask ¶
EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) EnqueueUnique ¶
EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) HistoricalStats ¶
func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)
HistoricalStats returns a list of stats from the last n days.
func (*RDB) Kill ¶
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
Kill sends the task to "dead" queue from in-progress queue, assigning the error message to the task. It also trims the set by timestamp and set size.
func (*RDB) KillAllRetryTasks ¶
KillAllRetryTasks moves all tasks from retry queue to dead queue and returns the number of tasks that were moved.
func (*RDB) KillAllScheduledTasks ¶
KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and returns the number of tasks that were moved.
func (*RDB) KillRetryTask ¶
KillRetryTask finds a task that matches the given id and score from retry queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) KillScheduledTask ¶
KillScheduledTask finds a task that matches the given id and score from scheduled queue and moves it to dead queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) ListDead ¶
func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error)
ListDead returns all tasks that have exhausted their retry limit.
func (*RDB) ListDeadlineExceeded ¶
ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline.
func (*RDB) ListEnqueued ¶
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error)
ListEnqueued returns enqueued tasks that are ready to be processed.
func (*RDB) ListInProgress ¶
func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error)
ListInProgress returns all tasks that are currently being processed.
func (*RDB) ListRetry ¶
func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error)
ListRetry returns all tasks that have failed before and will be retried in the future.
func (*RDB) ListScheduled ¶
func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error)
ListScheduled returns all tasks that are scheduled to be processed in the future.
func (*RDB) ListServers ¶
func (r *RDB) ListServers() ([]*base.ServerInfo, error)
ListServers returns the list of server info.
func (*RDB) ListWorkers ¶
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
ListWorkers returns the list of worker stats.
func (*RDB) PublishCancelation ¶
PublishCancelation publishes a cancelation message to all subscribers. The message is the ID for the task to be canceled.
func (*RDB) RemoveQueue ¶
RemoveQueue removes the specified queue.
If force is set to true, it will remove the queue regardless of whether the queue is empty. If force is set to false, it will only remove the queue if it is empty.
func (*RDB) Requeue ¶
func (r *RDB) Requeue(msg *base.TaskMessage) error
Requeue moves the task from in-progress queue to the specified queue.
func (*RDB) Retry ¶
Retry moves the task from in-progress to retry queue, incrementing retry count and assigning error message to the task message.
func (*RDB) ScheduleUnique ¶
ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) WriteServerState ¶
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
WriteServerState writes server state data to redis with expiration set to the value ttl.