Documentation
      ¶
    
    
  
    
  
    Overview ¶
Package base defines foundational types and constants used in asynq package.
Index ¶
- Constants
 - Variables
 - func ActiveKey(qname string) string
 - func AggregationSetKey(qname, gname, setID string) string
 - func AllAggregationSets(qname string) string
 - func AllGroups(qname string) string
 - func ArchivedKey(qname string) string
 - func CompletedKey(qname string) string
 - func EncodeMessage(msg *TaskMessage) ([]byte, error)
 - func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error)
 - func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error)
 - func EncodeServerInfo(info *ServerInfo) ([]byte, error)
 - func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error)
 - func FailedKey(qname string, t time.Time) string
 - func FailedTotalKey(qname string) string
 - func GroupKey(qname, gkey string) string
 - func GroupKeyPrefix(qname string) string
 - func LeaseKey(qname string) string
 - func PausedKey(qname string) string
 - func PendingKey(qname string) string
 - func ProcessedKey(qname string, t time.Time) string
 - func ProcessedTotalKey(qname string) string
 - func QueueKeyPrefix(qname string) string
 - func RetryKey(qname string) string
 - func ScheduledKey(qname string) string
 - func SchedulerEntriesKey(schedulerID string) string
 - func SchedulerHistoryKey(entryID string) string
 - func ServerInfoKey(hostname string, pid int, serverID string) string
 - func TaskKey(qname, id string) string
 - func TaskKeyPrefix(qname string) string
 - func UniqueKey(qname, tasktype string, payload []byte) string
 - func ValidateQueueName(qname string) error
 - func WorkersKey(hostname string, pid int, serverID string) string
 - type Broker
 - type Cancelations
 - type Lease
 - type SchedulerEnqueueEvent
 - type SchedulerEntry
 - type ServerInfo
 - type TaskInfo
 - type TaskMessage
 - type TaskState
 - type WorkerInfo
 - type Z
 
Constants ¶
const ( AllServers = "asynq:servers" // ZSET AllWorkers = "asynq:workers" // ZSET AllSchedulers = "asynq:schedulers" // ZSET AllQueues = "asynq:queues" // SET CancelChannel = "asynq:cancel" // PubSub channel )
Global Redis keys.
const DefaultQueueName = "default"
    DefaultQueueName is the queue name used if none are specified by user.
const Version = "0.23.0"
    Version of asynq library and CLI.
Variables ¶
var DefaultQueue = PendingKey(DefaultQueueName)
    DefaultQueue is the redis key for the default queue.
Functions ¶
func AggregationSetKey ¶
AggregationSetKey returns a redis key used for an aggregation set.
func AllAggregationSets ¶
AllAggregationSets returns a redis key used to store all aggregation sets (set of tasks staged to be aggregated) in a given queue.
func ArchivedKey ¶
ArchivedKey returns a redis key for the archived tasks.
func CompletedKey ¶
func EncodeMessage ¶
func EncodeMessage(msg *TaskMessage) ([]byte, error)
EncodeMessage marshals the given task message and returns an encoded bytes.
func EncodeSchedulerEnqueueEvent ¶
func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error)
EncodeSchedulerEnqueueEvent marshals the given event and returns an encoded bytes.
func EncodeSchedulerEntry ¶
func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error)
EncodeSchedulerEntry marshals the given entry and returns an encoded bytes.
func EncodeServerInfo ¶
func EncodeServerInfo(info *ServerInfo) ([]byte, error)
EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes.
func EncodeWorkerInfo ¶
func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error)
EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes.
func FailedTotalKey ¶
FailedTotalKey returns a redis key for total failure count for the given queue.
func GroupKeyPrefix ¶
GroupKeyPrefix returns a prefix for group key.
func PendingKey ¶
PendingKey returns a redis key for the given queue name.
func ProcessedKey ¶
ProcessedKey returns a redis key for processed count for the given day for the queue.
func ProcessedTotalKey ¶
ProcessedTotalKey returns a redis key for total processed count for the given queue.
func QueueKeyPrefix ¶
QueueKeyPrefix returns a prefix for all keys in the given queue.
func ScheduledKey ¶
ScheduledKey returns a redis key for the scheduled tasks.
func SchedulerEntriesKey ¶
SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID.
func SchedulerHistoryKey ¶
SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.
func ServerInfoKey ¶
ServerInfoKey returns a redis key for process info.
func TaskKeyPrefix ¶
TaskKeyPrefix returns a prefix for task key.
func ValidateQueueName ¶
ValidateQueueName validates a given qname to be used as a queue name. Returns nil if valid, otherwise returns non-nil error.
Types ¶
type Broker ¶
type Broker interface {
	Ping() error
	Close() error
	Enqueue(ctx context.Context, msg *TaskMessage) error
	EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	Done(ctx context.Context, msg *TaskMessage) error
	MarkAsComplete(ctx context.Context, msg *TaskMessage) error
	Requeue(ctx context.Context, msg *TaskMessage) error
	Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
	ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
	Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
	ForwardIfReady(qnames ...string) error
	// Group aggregation related methods
	AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
	AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
	ListGroups(qname string) ([]string, error)
	AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
	ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
	DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
	ReclaimStaleAggregationSets(qname string) error
	// Task retention related method
	DeleteExpiredCompletedTasks(qname string) error
	// Lease related methods
	ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
	ExtendLease(qname string, ids ...string) (time.Time, error)
	// State snapshot related methods
	WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	ClearServerState(host string, pid int, serverID string) error
	// Cancelation related methods
	CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
	PublishCancelation(id string) error
	WriteResult(qname, id string, data []byte) (n int, err error)
}
    Broker is a message broker that supports operations to manage task queues.
See rdb.RDB as a reference implementation.
type Cancelations ¶
type Cancelations struct {
	// contains filtered or unexported fields
}
    Cancelations is a collection that holds cancel functions for all active tasks.
Cancelations are safe for concurrent use by multipel goroutines.
func NewCancelations ¶
func NewCancelations() *Cancelations
NewCancelations returns a Cancelations instance.
func (*Cancelations) Add ¶
func (c *Cancelations) Add(id string, fn context.CancelFunc)
Add adds a new cancel func to the collection.
func (*Cancelations) Delete ¶
func (c *Cancelations) Delete(id string)
Delete deletes a cancel func from the collection given an id.
func (*Cancelations) Get ¶
func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)
Get returns a cancel func given an id.
type Lease ¶
Lease is a time bound lease for worker to process task. It provides a communication channel between lessor and lessee about lease expiration.
func (*Lease) Done ¶
func (l *Lease) Done() <-chan struct{}
Done returns a communication channel from which the lessee can read to get notified when lessor notifies about lease expiration.
func (*Lease) IsValid ¶
IsValid returns true if the lease's expieration time is in the future or equals to the current time, returns false otherwise.
func (*Lease) NotifyExpiration ¶
Sends a notification to lessee about expired lease Returns true if notification was sent, returns false if the lease is still valid and notification was not sent.
type SchedulerEnqueueEvent ¶
type SchedulerEnqueueEvent struct {
	// ID of the task that was enqueued.
	TaskID string
	// Time the task was enqueued.
	EnqueuedAt time.Time
}
    SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
func DecodeSchedulerEnqueueEvent ¶
func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error)
DecodeSchedulerEnqueueEvent unmarshals the given bytes and returns a decoded SchedulerEnqueueEvent.
type SchedulerEntry ¶
type SchedulerEntry struct {
	// Identifier of this entry.
	ID string
	// Spec describes the schedule of this entry.
	Spec string
	// Type is the task type of the periodic task.
	Type string
	// Payload is the payload of the periodic task.
	Payload []byte
	// Opts is the options for the periodic task.
	Opts []string
	// Next shows the next time the task will be enqueued.
	Next time.Time
	// Prev shows the last time the task was enqueued.
	// Zero time if task was never enqueued.
	Prev time.Time
}
    SchedulerEntry holds information about a periodic task registered with a scheduler.
func DecodeSchedulerEntry ¶
func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error)
DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry.
type ServerInfo ¶
type ServerInfo struct {
	Host              string
	PID               int
	ServerID          string
	Concurrency       int
	Queues            map[string]int
	StrictPriority    bool
	Status            string
	Started           time.Time
	ActiveWorkerCount int
}
    ServerInfo holds information about a running server.
func DecodeServerInfo ¶
func DecodeServerInfo(b []byte) (*ServerInfo, error)
DecodeServerInfo decodes the given bytes into ServerInfo.
type TaskInfo ¶
type TaskInfo struct {
	Message       *TaskMessage
	State         TaskState
	NextProcessAt time.Time
	Result        []byte
}
    TaskInfo describes a task message and its metadata.
type TaskMessage ¶
type TaskMessage struct {
	// Type indicates the kind of the task to be performed.
	Type string
	// Payload holds data needed to process the task.
	Payload []byte
	// ID is a unique identifier for each task.
	ID string
	// Queue is a name this message should be enqueued to.
	Queue string
	// Retry is the max number of retry for this task.
	Retry int
	// Retried is the number of times we've retried this task so far.
	Retried int
	// ErrorMsg holds the error message from the last failure.
	ErrorMsg string
	// Time of last failure in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no last failure
	LastFailedAt int64
	// Timeout specifies timeout in seconds.
	// If task processing doesn't complete within the timeout, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no timeout.
	Timeout int64
	// Deadline specifies the deadline for the task in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	// If task processing doesn't complete before the deadline, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no deadline.
	Deadline int64
	// UniqueKey holds the redis key used for uniqueness lock for this task.
	//
	// Empty string indicates that no uniqueness lock was used.
	UniqueKey string
	// GroupKey holds the group key used for task aggregation.
	//
	// Empty string indicates no aggregation is used for this task.
	GroupKey string
	// Retention specifies the number of seconds the task should be retained after completion.
	Retention int64
	// CompletedAt is the time the task was processed successfully in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no value.
	CompletedAt int64
}
    TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.
func DecodeMessage ¶
func DecodeMessage(data []byte) (*TaskMessage, error)
DecodeMessage unmarshals the given bytes and returns a decoded task message.
type TaskState ¶
type TaskState int
TaskState denotes the state of a task.
func TaskStateFromString ¶
type WorkerInfo ¶
type WorkerInfo struct {
	Host     string
	PID      int
	ServerID string
	ID       string
	Type     string
	Payload  []byte
	Queue    string
	Started  time.Time
	Deadline time.Time
}
    WorkerInfo holds information about a running worker.
func DecodeWorkerInfo ¶
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error)
DecodeWorkerInfo decodes the given bytes into WorkerInfo.