rdb

package
v0.11.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2020 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package rdb encapsulates the interactions with redis.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
	ErrNoProcessableTask = errors.New("no tasks are ready for processing")

	// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
	ErrTaskNotFound = errors.New("could not find a task")

	// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
	ErrDuplicateTask = errors.New("task already exists")
)

Functions

This section is empty.

Types

type DailyStats

type DailyStats struct {
	Processed int
	Failed    int
	Time      time.Time
}

DailyStats holds aggregate data for a given day.

type ErrQueueNotEmpty

type ErrQueueNotEmpty struct {
	// contains filtered or unexported fields
}

ErrQueueNotEmpty indicates specified queue is not empty.

func (*ErrQueueNotEmpty) Error

func (e *ErrQueueNotEmpty) Error() string

type ErrQueueNotFound

type ErrQueueNotFound struct {
	// contains filtered or unexported fields
}

ErrQueueNotFound indicates specified queue does not exist.

func (*ErrQueueNotFound) Error

func (e *ErrQueueNotFound) Error() string

type Pagination

type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero.
	Page int
}

Pagination specifies the page size and page number for the list operation.

type Queue

type Queue struct {
	// Name of the queue (e.g. "default", "critical").
	// Note: It doesn't include the prefix "asynq:queues:".
	Name string

	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue should not be processed.
	Paused bool

	// Size is the number of tasks in the queue.
	Size int
}

Queue represents a task queue.

type RDB

type RDB struct {
	// contains filtered or unexported fields
}

RDB is a client interface to query and mutate task queues.

func NewRDB

func NewRDB(client *redis.Client) *RDB

NewRDB returns a new instance of RDB.

func (*RDB) CancelationPubSub

func (r *RDB) CancelationPubSub() (*redis.PubSub, error)

CancelationPubSub returns a pubsub for cancelation messages.

func (*RDB) CheckAndEnqueue

func (r *RDB) CheckAndEnqueue() (err error)

CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that are ready to be processed.

func (*RDB) ClearServerState

func (r *RDB) ClearServerState(host string, pid int, serverID string) error

ClearServerState deletes server state data from redis.

func (*RDB) Close

func (r *RDB) Close() error

Close closes the connection with redis server.

func (*RDB) CurrentStats

func (r *RDB) CurrentStats() (*Stats, error)

CurrentStats returns a current state of the queues.

func (*RDB) DeleteAllDeadTasks

func (r *RDB) DeleteAllDeadTasks() (int64, error)

DeleteAllDeadTasks deletes all tasks from the dead queue and returns the number of tasks deleted.

func (*RDB) DeleteAllRetryTasks

func (r *RDB) DeleteAllRetryTasks() (int64, error)

DeleteAllRetryTasks deletes all tasks from the dead queue and returns the number of tasks deleted.

func (*RDB) DeleteAllScheduledTasks

func (r *RDB) DeleteAllScheduledTasks() (int64, error)

DeleteAllScheduledTasks deletes all tasks from the dead queue and returns the number of tasks deleted.

func (*RDB) DeleteDeadTask

func (r *RDB) DeleteDeadTask(id uuid.UUID, score int64) error

DeleteDeadTask finds a task that matches the given id and score from dead queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteRetryTask

func (r *RDB) DeleteRetryTask(id uuid.UUID, score int64) error

DeleteRetryTask finds a task that matches the given id and score from retry queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteScheduledTask

func (r *RDB) DeleteScheduledTask(id uuid.UUID, score int64) error

DeleteScheduledTask finds a task that matches the given id and score from scheduled queue and deletes it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) Dequeue

func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error)

Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and deadline. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.

func (*RDB) Done

func (r *RDB) Done(msg *base.TaskMessage) error

Done removes the task from in-progress queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Enqueue

func (r *RDB) Enqueue(msg *base.TaskMessage) error

Enqueue inserts the given task to the tail of the queue.

func (*RDB) EnqueueAllDeadTasks

func (r *RDB) EnqueueAllDeadTasks() (int64, error)

EnqueueAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.

func (*RDB) EnqueueAllRetryTasks

func (r *RDB) EnqueueAllRetryTasks() (int64, error)

EnqueueAllRetryTasks enqueues all tasks from retry queue and returns the number of tasks enqueued.

func (*RDB) EnqueueAllScheduledTasks

func (r *RDB) EnqueueAllScheduledTasks() (int64, error)

EnqueueAllScheduledTasks enqueues all tasks from scheduled queue and returns the number of tasks enqueued.

func (*RDB) EnqueueDeadTask

func (r *RDB) EnqueueDeadTask(id uuid.UUID, score int64) error

EnqueueDeadTask finds a task that matches the given id and score from dead queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueRetryTask

func (r *RDB) EnqueueRetryTask(id uuid.UUID, score int64) error

EnqueueRetryTask finds a task that matches the given id and score from retry queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueScheduledTask

func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error

EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) EnqueueUnique

func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error

EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) HistoricalStats

func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error)

HistoricalStats returns a list of stats from the last n days.

func (*RDB) Kill

func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error

Kill sends the task to "dead" queue from in-progress queue, assigning the error message to the task. It also trims the set by timestamp and set size.

func (*RDB) KillAllRetryTasks

func (r *RDB) KillAllRetryTasks() (int64, error)

KillAllRetryTasks moves all tasks from retry queue to dead queue and returns the number of tasks that were moved.

func (*RDB) KillAllScheduledTasks

func (r *RDB) KillAllScheduledTasks() (int64, error)

KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and returns the number of tasks that were moved.

func (*RDB) KillRetryTask

func (r *RDB) KillRetryTask(id uuid.UUID, score int64) error

KillRetryTask finds a task that matches the given id and score from retry queue and moves it to dead queue. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) KillScheduledTask

func (r *RDB) KillScheduledTask(id uuid.UUID, score int64) error

KillScheduledTask finds a task that matches the given id and score from scheduled queue and moves it to dead queue. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) ListDead

func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error)

ListDead returns all tasks that have exhausted its retry limit.

func (*RDB) ListDeadlineExceeded

func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error)

ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline.

func (*RDB) ListEnqueued

func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error)

ListEnqueued returns enqueued tasks that are ready to be processed.

func (*RDB) ListInProgress

func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error)

ListInProgress returns all tasks that are currently being processed.

func (*RDB) ListRetry

func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error)

ListRetry returns all tasks that have failed before and willl be retried in the future.

func (*RDB) ListScheduled

func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error)

ListScheduled returns all tasks that are scheduled to be processed in the future.

func (*RDB) ListServers

func (r *RDB) ListServers() ([]*base.ServerInfo, error)

ListServers returns the list of server info.

func (*RDB) ListWorkers

func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)

ListWorkers returns the list of worker stats.

func (*RDB) Pause

func (r *RDB) Pause(qname string) error

Pause pauses processing of tasks from the given queue.

func (*RDB) Ping added in v0.11.1

func (r *RDB) Ping() error

Ping checks the connection with redis server.

func (*RDB) PublishCancelation

func (r *RDB) PublishCancelation(id string) error

PublishCancelation publish cancelation message to all subscribers. The message is the ID for the task to be canceled.

func (*RDB) RedisInfo

func (r *RDB) RedisInfo() (map[string]string, error)

RedisInfo returns a map of redis info.

func (*RDB) RemoveQueue

func (r *RDB) RemoveQueue(qname string, force bool) error

RemoveQueue removes the specified queue.

If force is set to true, it will remove the queue regardless of whether the queue is empty. If force is set to false, it will only remove the queue if it is empty.

func (*RDB) Requeue

func (r *RDB) Requeue(msg *base.TaskMessage) error

Requeue moves the task from in-progress queue to the specified queue.

func (*RDB) Retry

func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error

Retry moves the task from in-progress to retry queue, incrementing retry count and assigning error message to the task message.

func (*RDB) Schedule

func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error

Schedule adds the task to the backlog queue to be processed in the future.

func (*RDB) ScheduleUnique

func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error

ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) Unpause

func (r *RDB) Unpause(qname string) error

Unpause resumes processing of tasks from the given queue.

func (*RDB) WriteServerState

func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error

WriteServerState writes server state data to redis with expiration set to the value ttl.

type Stats

type Stats struct {
	Enqueued   int
	InProgress int
	Scheduled  int
	Retry      int
	Dead       int
	Processed  int
	Failed     int
	Queues     []*Queue
	Timestamp  time.Time
}

Stats represents a state of queues at a certain time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL