queues

package
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Fair

type Fair struct {
	// contains filtered or unexported fields
}

Fair implements a fair queue where tasks are distributed evenly across owners.

A queue with base key "foo" and owners "owner1" and "owner2" will have the following keys:

  • {foo}:queued - set of owners scored by number of queued tasks
  • {foo}:active - set of owners scored by number of active tasks
  • {foo}:paused - set of paused owners
  • {foo}:temp - used internally
  • {foo}:o:owner1/0 - e.g. list of tasks for owner1 with priority 0 (low)
  • {foo}:o:owner1/1 - e.g. list of tasks for owner1 with priority 1 (high)
  • {foo}:o:owner2/0 - e.g. list of tasks for owner2 with priority 0 (low)
  • {foo}:o:owner2/1 - e.g. list of tasks for owner2 with priority 1 (high)

Note: it would be nice if owner queues could use distict hash tags and so live on different nodes in a cluster, but our push and pop scripts require atomic changes to the queued/active sets and the task lists.

func NewFair

func NewFair(keyBase string, maxActivePerOwner int) *Fair

NewFair creates a new fair queue with the given key base.

func (*Fair) Done

func (q *Fair) Done(ctx context.Context, vc valkey.Conn, owner OwnerID) error

Done marks the passed in task as complete. Callers must call this in order to maintain fair workers across orgs

func (*Fair) Dump added in v0.20.0

func (q *Fair) Dump(ctx context.Context, vc valkey.Conn) ([]byte, error)

func (*Fair) Pause

func (q *Fair) Pause(ctx context.Context, vc valkey.Conn, owner OwnerID) error

Pause marks the given owner as paused, disabling processing of their tasks

func (*Fair) Paused

func (q *Fair) Paused(ctx context.Context, vc valkey.Conn) ([]OwnerID, error)

Paused returns the list of owners marked as paused

func (*Fair) Pop

func (q *Fair) Pop(ctx context.Context, vc valkey.Conn) (TaskID, OwnerID, []byte, error)

Pop pops the next task off our queue

func (*Fair) Push

func (q *Fair) Push(ctx context.Context, vc valkey.Conn, owner OwnerID, priority bool, task []byte) (TaskID, error)

Push adds the passed in task to our queue for execution

func (*Fair) Queued

func (q *Fair) Queued(ctx context.Context, vc valkey.Conn) ([]OwnerID, error)

Queued returns the list of owners with queued tasks

func (*Fair) Resume

func (q *Fair) Resume(ctx context.Context, vc valkey.Conn, owner OwnerID) error

Resume unmarks the given owner as paused, re-enabling processing of their tasks

func (*Fair) Size

func (q *Fair) Size(ctx context.Context, vc valkey.Conn, owner OwnerID) (int, error)

Size returns the number of queued tasks for the given owner

type OwnerID added in v0.19.0

type OwnerID string

OwnerID is the identifier for an owner of tasks in the queue.

type TaskID added in v0.19.0

type TaskID string

TaskID is the unique identifier for a task in the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL