Documentation
¶
Overview ¶
Package base defines foundational types and constants used in asynq package.
Index ¶
- Constants
- Variables
- func EncodeMessage(msg *TaskMessage) (string, error)
- func FailureKey(t time.Time) string
- func ProcessedKey(t time.Time) string
- func QueueKey(qname string) string
- func ServerInfoKey(hostname string, pid int, sid string) string
- func SetBasePrefix(prefix string)
- func WorkersKey(hostname string, pid int, sid string) string
- type Broker
- type Cancelations
- type ServerInfo
- type ServerStatus
- type ServerStatusValue
- type TaskMessage
- type WorkerInfo
- type Z
Constants ¶
const DefaultQueueName = "default"
DefaultQueueName is the queue name used if none are specified by user.
const Version = "0.10.0"
Version of asynq library and CLI.
Variables ¶
var ( AllServers = "asynq:servers" // ZSET AllWorkers = "asynq:workers" // ZSET QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname> AllQueues = "asynq:queues" // SET DefaultQueue = QueuePrefix + DefaultQueueName // LIST ScheduledQueue = "asynq:scheduled" // ZSET RetryQueue = "asynq:retry" // ZSET DeadQueue = "asynq:dead" // ZSET InProgressQueue = "asynq:in_progress" // LIST KeyDeadlines = "asynq:deadlines" // ZSET PausedQueues = "asynq:paused" // SET CancelChannel = "asynq:cancel" // PubSub channel )
Redis keys
Functions ¶
func EncodeMessage ¶
func EncodeMessage(msg *TaskMessage) (string, error)
EncodeMessage marshals the given task message in JSON and returns an encoded string.
func FailureKey ¶
FailureKey returns a redis key for failure count for the given day.
func ProcessedKey ¶
ProcessedKey returns a redis key for processed count for the given day.
func ServerInfoKey ¶
ServerInfoKey returns a redis key for process info.
func SetBasePrefix ¶
func SetBasePrefix(prefix string)
set base prefix only can set once before client or server launch
Types ¶
type Broker ¶
type Broker interface {
Ping() error
Enqueue(msg *TaskMessage) error
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
Done(msg *TaskMessage) error
Requeue(msg *TaskMessage) error
Schedule(msg *TaskMessage, processAt time.Time) error
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
Kill(msg *TaskMessage, errMsg string) error
CheckAndEnqueue() error
ListDeadlineExceeded(deadline time.Time) ([]*TaskMessage, error)
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
ClearServerState(host string, pid int, serverID string) error
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
PublishCancelation(id string) error
Close() error
}
Broker is a message broker that supports operations to manage task queues.
See rdb.RDB as a reference implementation.
type Cancelations ¶
type Cancelations struct {
// contains filtered or unexported fields
}
Cancelations is a collection that holds cancel functions for all in-progress tasks.
Cancelations are safe for concurrent use by multipel goroutines.
func NewCancelations ¶
func NewCancelations() *Cancelations
NewCancelations returns a Cancelations instance.
func (*Cancelations) Add ¶
func (c *Cancelations) Add(id string, fn context.CancelFunc)
Add adds a new cancel func to the collection.
func (*Cancelations) Delete ¶
func (c *Cancelations) Delete(id string)
Delete deletes a cancel func from the collection given an id.
func (*Cancelations) Get ¶
func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)
Get returns a cancel func given an id.
type ServerInfo ¶
type ServerInfo struct {
Host string
PID int
ServerID string
Concurrency int
Queues map[string]int
StrictPriority bool
Status string
Started time.Time
ActiveWorkerCount int
}
ServerInfo holds information about a running server.
type ServerStatus ¶
type ServerStatus struct {
// contains filtered or unexported fields
}
ServerStatus represents status of a server. ServerStatus methods are concurrency safe.
func NewServerStatus ¶
func NewServerStatus(v ServerStatusValue) *ServerStatus
NewServerStatus returns a new status instance given an initial value.
func (*ServerStatus) Get ¶
func (s *ServerStatus) Get() ServerStatusValue
Get returns the status value.
func (*ServerStatus) Set ¶
func (s *ServerStatus) Set(v ServerStatusValue)
Set sets the status value.
func (*ServerStatus) String ¶
func (s *ServerStatus) String() string
type ServerStatusValue ¶
type ServerStatusValue int
const ( // StatusIdle indicates the server is in idle state. StatusIdle ServerStatusValue = iota // StatusRunning indicates the servier is up and processing tasks. StatusRunning // StatusQuiet indicates the server is up but not processing new tasks. StatusQuiet // StatusStopped indicates the server server has been stopped. StatusStopped )
type TaskMessage ¶
type TaskMessage struct {
// Type indicates the kind of the task to be performed.
Type string
// Payload holds data needed to process the task.
Payload map[string]interface{}
// ID is a unique identifier for each task.
ID uuid.UUID
// Queue is a name this message should be enqueued to.
Queue string
// Retry is the max number of retry for this task.
Retry int
// Retried is the number of times we've retried this task so far.
Retried int
// ErrorMsg holds the error message from the last failure.
ErrorMsg string
// Timeout specifies timeout in seconds.
// If task processing doesn't complete within the timeout, the task will be retried
// if retry count is remaining. Otherwise it will be moved to the dead queue.
//
// Use zero to indicate no timeout.
Timeout int64
// Deadline specifies the deadline for the task in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
// If task processing doesn't complete before the deadline, the task will be retried
// if retry count is remaining. Otherwise it will be moved to the dead queue.
//
// Use zero to indicate no deadline.
Deadline int64
// UniqueKey holds the redis key used for uniqueness lock for this task.
//
// Empty string indicates that no uniqueness lock was used.
UniqueKey string
}
TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.
func DecodeMessage ¶
func DecodeMessage(s string) (*TaskMessage, error)
DecodeMessage unmarshals the given encoded string and returns a decoded task message.
type WorkerInfo ¶
type WorkerInfo struct {
Host string
PID int
ID string
Type string
Queue string
Payload map[string]interface{}
Started time.Time
}
WorkerInfo holds information about a running worker.
type Z ¶ added in v0.11.1
type Z struct {
Message *TaskMessage
Score int64
}
Z represents sorted set member.