queue

package
v0.0.0-...-627253d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// MaxTaskRetries is the maximum number of times a task can be retried before failing.
const MaxTaskRetries = 3

Variables

View Source
// Sentinel errors exported by the queue package.
//
// NOTE(review): none of these messages contain format verbs, so errors.New
// would be the conventional constructor; the fmt.Errorf calls are kept as-is
// here because the import set of the package is not visible.
var (
	// ErrNotExist indicates that the task does not exist.
	ErrNotExist           = fmt.Errorf("task does not exist")
	// ErrNotRunning indicates that the task is not running.
	ErrNotRunning         = fmt.Errorf("task is not running")
	// ErrTaskCanceled indicates that the task was canceled.
	ErrTaskCanceled       = fmt.Errorf("task was canceled")
	// ErrContextCanceled indicates that the dequeue context timed out or was canceled.
	ErrContextCanceled    = fmt.Errorf("dequeue context timed out or was canceled")
	// ErrRowsNotAffected indicates that no rows were affected
	// (presumably by a database statement; confirm against the callers).
	ErrRowsNotAffected    = fmt.Errorf("no rows were affected")
	// ErrMaxRetriesExceeded indicates that the task has exceeded the maximum
	// amount of retries.
	ErrMaxRetriesExceeded = fmt.Errorf("task has exceeded the maximum amount of retries")
	// ErrNotCancellable indicates that the task is not cancellable.
	ErrNotCancellable     = fmt.Errorf("task not cancellable")
)

Functions

func NewPgxPool

func NewPgxPool(ctx context.Context, url string) (*pgxpool.Pool, error)

Types

type Connection

// Connection is a Transaction that additionally exposes the underlying
// *pgx.Conn and a Release method.
type Connection interface {
	// Transaction contributes the Begin, Exec, Query and QueryRow methods.
	Transaction
	// Conn returns the underlying *pgx.Conn.
	Conn() *pgx.Conn
	// Release releases the connection.
	// NOTE(review): presumably hands it back to the pool it was acquired
	// from; confirm against the implementations.
	Release()
}

type FakePgxPoolWrapper

type FakePgxPoolWrapper struct {
	// contains filtered or unexported fields
}

FakePgxPoolWrapper is used for testing. It provides a Pool interface implementation that does not actually use a pool, so that all work is wrapped in a transaction. It should not be used in production, as it is based on a single transaction.

func (*FakePgxPoolWrapper) Acquire

func (*FakePgxPoolWrapper) Begin

func (p *FakePgxPoolWrapper) Begin(ctx context.Context) (pgx.Tx, error)

func (*FakePgxPoolWrapper) Close

func (p *FakePgxPoolWrapper) Close()

func (*FakePgxPoolWrapper) Conn

func (p *FakePgxPoolWrapper) Conn() *pgx.Conn

func (*FakePgxPoolWrapper) Exec

func (p *FakePgxPoolWrapper) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

func (*FakePgxPoolWrapper) Query

func (p *FakePgxPoolWrapper) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

func (*FakePgxPoolWrapper) QueryRow

func (p *FakePgxPoolWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

func (*FakePgxPoolWrapper) Release

func (p *FakePgxPoolWrapper) Release()

type MockQueue

type MockQueue struct {
	mock.Mock
}

MockQueue is an autogenerated mock type for the Queue type

func NewMockQueue

func NewMockQueue(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockQueue

NewMockQueue creates a new instance of MockQueue. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockQueue) Cancel

func (_mock *MockQueue) Cancel(ctx context.Context, taskId uuid.UUID) error

Cancel provides a mock function for the type MockQueue

func (*MockQueue) Dequeue

func (_mock *MockQueue) Dequeue(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)

Dequeue provides a mock function for the type MockQueue

func (*MockQueue) EXPECT

func (_m *MockQueue) EXPECT() *MockQueue_Expecter

func (*MockQueue) Enqueue

func (_mock *MockQueue) Enqueue(task *Task) (uuid.UUID, error)

Enqueue provides a mock function for the type MockQueue

func (*MockQueue) Finish

func (_mock *MockQueue) Finish(taskId uuid.UUID, taskError error) error

Finish provides a mock function for the type MockQueue

func (*MockQueue) Heartbeats

func (_mock *MockQueue) Heartbeats(olderThan time.Duration) []uuid.UUID

Heartbeats provides a mock function for the type MockQueue

func (*MockQueue) IdFromToken

func (_mock *MockQueue) IdFromToken(token uuid.UUID) (uuid.UUID, bool, error)

IdFromToken provides a mock function for the type MockQueue

func (*MockQueue) ListenForCanceledTask

func (_mock *MockQueue) ListenForCanceledTask(ctx context.Context) (uuid.UUID, error)

ListenForCanceledTask provides a mock function for the type MockQueue

func (*MockQueue) RefreshHeartbeat

func (_mock *MockQueue) RefreshHeartbeat(token uuid.UUID) error

RefreshHeartbeat provides a mock function for the type MockQueue

func (*MockQueue) Requeue

func (_mock *MockQueue) Requeue(taskId uuid.UUID) error

Requeue provides a mock function for the type MockQueue

func (*MockQueue) RequeueFailedTasks

func (_mock *MockQueue) RequeueFailedTasks(taskTypes []string) error

RequeueFailedTasks provides a mock function for the type MockQueue

func (*MockQueue) Status

func (_mock *MockQueue) Status(taskId uuid.UUID) (*models.TaskInfo, error)

Status provides a mock function for the type MockQueue

func (*MockQueue) UpdatePayload

func (_mock *MockQueue) UpdatePayload(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)

UpdatePayload provides a mock function for the type MockQueue

type MockQueue_Cancel_Call

type MockQueue_Cancel_Call struct {
	*mock.Call
}

MockQueue_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel'

func (*MockQueue_Cancel_Call) Return

func (*MockQueue_Cancel_Call) Run

func (_c *MockQueue_Cancel_Call) Run(run func(ctx context.Context, taskId uuid.UUID)) *MockQueue_Cancel_Call

func (*MockQueue_Cancel_Call) RunAndReturn

func (_c *MockQueue_Cancel_Call) RunAndReturn(run func(ctx context.Context, taskId uuid.UUID) error) *MockQueue_Cancel_Call

type MockQueue_Dequeue_Call

type MockQueue_Dequeue_Call struct {
	*mock.Call
}

MockQueue_Dequeue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Dequeue'

func (*MockQueue_Dequeue_Call) Return

func (*MockQueue_Dequeue_Call) Run

func (_c *MockQueue_Dequeue_Call) Run(run func(ctx context.Context, taskTypes []string)) *MockQueue_Dequeue_Call

func (*MockQueue_Dequeue_Call) RunAndReturn

func (_c *MockQueue_Dequeue_Call) RunAndReturn(run func(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)) *MockQueue_Dequeue_Call

type MockQueue_Enqueue_Call

type MockQueue_Enqueue_Call struct {
	*mock.Call
}

MockQueue_Enqueue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Enqueue'

func (*MockQueue_Enqueue_Call) Return

func (*MockQueue_Enqueue_Call) Run

func (_c *MockQueue_Enqueue_Call) Run(run func(task *Task)) *MockQueue_Enqueue_Call

func (*MockQueue_Enqueue_Call) RunAndReturn

func (_c *MockQueue_Enqueue_Call) RunAndReturn(run func(task *Task) (uuid.UUID, error)) *MockQueue_Enqueue_Call

type MockQueue_Expecter

type MockQueue_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockQueue_Expecter) Cancel

func (_e *MockQueue_Expecter) Cancel(ctx interface{}, taskId interface{}) *MockQueue_Cancel_Call

Cancel is a helper method to define mock.On call

  • ctx context.Context
  • taskId uuid.UUID

func (*MockQueue_Expecter) Dequeue

func (_e *MockQueue_Expecter) Dequeue(ctx interface{}, taskTypes interface{}) *MockQueue_Dequeue_Call

Dequeue is a helper method to define mock.On call

  • ctx context.Context
  • taskTypes []string

func (*MockQueue_Expecter) Enqueue

func (_e *MockQueue_Expecter) Enqueue(task interface{}) *MockQueue_Enqueue_Call

Enqueue is a helper method to define mock.On call

  • task *Task

func (*MockQueue_Expecter) Finish

func (_e *MockQueue_Expecter) Finish(taskId interface{}, taskError interface{}) *MockQueue_Finish_Call

Finish is a helper method to define mock.On call

  • taskId uuid.UUID
  • taskError error

func (*MockQueue_Expecter) Heartbeats

func (_e *MockQueue_Expecter) Heartbeats(olderThan interface{}) *MockQueue_Heartbeats_Call

Heartbeats is a helper method to define mock.On call

  • olderThan time.Duration

func (*MockQueue_Expecter) IdFromToken

func (_e *MockQueue_Expecter) IdFromToken(token interface{}) *MockQueue_IdFromToken_Call

IdFromToken is a helper method to define mock.On call

  • token uuid.UUID

func (*MockQueue_Expecter) ListenForCanceledTask

func (_e *MockQueue_Expecter) ListenForCanceledTask(ctx interface{}) *MockQueue_ListenForCanceledTask_Call

ListenForCanceledTask is a helper method to define mock.On call

  • ctx context.Context

func (*MockQueue_Expecter) RefreshHeartbeat

func (_e *MockQueue_Expecter) RefreshHeartbeat(token interface{}) *MockQueue_RefreshHeartbeat_Call

RefreshHeartbeat is a helper method to define mock.On call

  • token uuid.UUID

func (*MockQueue_Expecter) Requeue

func (_e *MockQueue_Expecter) Requeue(taskId interface{}) *MockQueue_Requeue_Call

Requeue is a helper method to define mock.On call

  • taskId uuid.UUID

func (*MockQueue_Expecter) RequeueFailedTasks

func (_e *MockQueue_Expecter) RequeueFailedTasks(taskTypes interface{}) *MockQueue_RequeueFailedTasks_Call

RequeueFailedTasks is a helper method to define mock.On call

  • taskTypes []string

func (*MockQueue_Expecter) Status

func (_e *MockQueue_Expecter) Status(taskId interface{}) *MockQueue_Status_Call

Status is a helper method to define mock.On call

  • taskId uuid.UUID

func (*MockQueue_Expecter) UpdatePayload

func (_e *MockQueue_Expecter) UpdatePayload(task interface{}, payload interface{}) *MockQueue_UpdatePayload_Call

UpdatePayload is a helper method to define mock.On call

  • task *models.TaskInfo
  • payload interface{}

type MockQueue_Finish_Call

type MockQueue_Finish_Call struct {
	*mock.Call
}

MockQueue_Finish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Finish'

func (*MockQueue_Finish_Call) Return

func (*MockQueue_Finish_Call) Run

func (_c *MockQueue_Finish_Call) Run(run func(taskId uuid.UUID, taskError error)) *MockQueue_Finish_Call

func (*MockQueue_Finish_Call) RunAndReturn

func (_c *MockQueue_Finish_Call) RunAndReturn(run func(taskId uuid.UUID, taskError error) error) *MockQueue_Finish_Call

type MockQueue_Heartbeats_Call

type MockQueue_Heartbeats_Call struct {
	*mock.Call
}

MockQueue_Heartbeats_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Heartbeats'

func (*MockQueue_Heartbeats_Call) Return

func (*MockQueue_Heartbeats_Call) Run

func (_c *MockQueue_Heartbeats_Call) Run(run func(olderThan time.Duration)) *MockQueue_Heartbeats_Call

func (*MockQueue_Heartbeats_Call) RunAndReturn

func (_c *MockQueue_Heartbeats_Call) RunAndReturn(run func(olderThan time.Duration) []uuid.UUID) *MockQueue_Heartbeats_Call

type MockQueue_IdFromToken_Call

type MockQueue_IdFromToken_Call struct {
	*mock.Call
}

MockQueue_IdFromToken_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IdFromToken'

func (*MockQueue_IdFromToken_Call) Return

func (_c *MockQueue_IdFromToken_Call) Return(id uuid.UUID, isRunning bool, err error) *MockQueue_IdFromToken_Call

func (*MockQueue_IdFromToken_Call) Run

func (*MockQueue_IdFromToken_Call) RunAndReturn

func (_c *MockQueue_IdFromToken_Call) RunAndReturn(run func(token uuid.UUID) (uuid.UUID, bool, error)) *MockQueue_IdFromToken_Call

type MockQueue_ListenForCanceledTask_Call

type MockQueue_ListenForCanceledTask_Call struct {
	*mock.Call
}

MockQueue_ListenForCanceledTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListenForCanceledTask'

func (*MockQueue_ListenForCanceledTask_Call) Return

func (*MockQueue_ListenForCanceledTask_Call) Run

func (*MockQueue_ListenForCanceledTask_Call) RunAndReturn

type MockQueue_RefreshHeartbeat_Call

type MockQueue_RefreshHeartbeat_Call struct {
	*mock.Call
}

MockQueue_RefreshHeartbeat_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RefreshHeartbeat'

func (*MockQueue_RefreshHeartbeat_Call) Return

func (*MockQueue_RefreshHeartbeat_Call) Run

func (*MockQueue_RefreshHeartbeat_Call) RunAndReturn

type MockQueue_RequeueFailedTasks_Call

type MockQueue_RequeueFailedTasks_Call struct {
	*mock.Call
}

MockQueue_RequeueFailedTasks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RequeueFailedTasks'

func (*MockQueue_RequeueFailedTasks_Call) Return

func (*MockQueue_RequeueFailedTasks_Call) Run

func (*MockQueue_RequeueFailedTasks_Call) RunAndReturn

func (_c *MockQueue_RequeueFailedTasks_Call) RunAndReturn(run func(taskTypes []string) error) *MockQueue_RequeueFailedTasks_Call

type MockQueue_Requeue_Call

type MockQueue_Requeue_Call struct {
	*mock.Call
}

MockQueue_Requeue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Requeue'

func (*MockQueue_Requeue_Call) Return

func (*MockQueue_Requeue_Call) Run

func (_c *MockQueue_Requeue_Call) Run(run func(taskId uuid.UUID)) *MockQueue_Requeue_Call

func (*MockQueue_Requeue_Call) RunAndReturn

func (_c *MockQueue_Requeue_Call) RunAndReturn(run func(taskId uuid.UUID) error) *MockQueue_Requeue_Call

type MockQueue_Status_Call

type MockQueue_Status_Call struct {
	*mock.Call
}

MockQueue_Status_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Status'

func (*MockQueue_Status_Call) Return

func (_c *MockQueue_Status_Call) Return(taskInfo *models.TaskInfo, err error) *MockQueue_Status_Call

func (*MockQueue_Status_Call) Run

func (_c *MockQueue_Status_Call) Run(run func(taskId uuid.UUID)) *MockQueue_Status_Call

func (*MockQueue_Status_Call) RunAndReturn

func (_c *MockQueue_Status_Call) RunAndReturn(run func(taskId uuid.UUID) (*models.TaskInfo, error)) *MockQueue_Status_Call

type MockQueue_UpdatePayload_Call

type MockQueue_UpdatePayload_Call struct {
	*mock.Call
}

MockQueue_UpdatePayload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdatePayload'

func (*MockQueue_UpdatePayload_Call) Return

func (*MockQueue_UpdatePayload_Call) Run

func (_c *MockQueue_UpdatePayload_Call) Run(run func(task *models.TaskInfo, payload interface{})) *MockQueue_UpdatePayload_Call

func (*MockQueue_UpdatePayload_Call) RunAndReturn

func (_c *MockQueue_UpdatePayload_Call) RunAndReturn(run func(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)) *MockQueue_UpdatePayload_Call

type PgListener

type PgListener struct {
	// contains filtered or unexported fields
}

PgListener is a helper that manages a persistent LISTEN connection in postgres. A persistent connection avoids missing notifications between LISTEN and UNLISTEN calls. One instance of PgListener should only be used and closed by one goroutine.

func NewPgListener

func NewPgListener(pool Pool, channelName string) *PgListener

func (*PgListener) Close

func (p *PgListener) Close(ctx context.Context) error

Close unlistens from the channel and releases the persistent connection.

func (*PgListener) WaitForNotification

func (p *PgListener) WaitForNotification(ctx context.Context) (string, error)

WaitForNotification creates a connection to listen on the channel. If a connection has already been created, it skips straight to waiting for a notification.

type PgQueue

type PgQueue struct {
	Pool Pool
	// contains filtered or unexported fields
}

PgQueue is a task queue backed by postgres. It uses pgxpool.Pool through a wrapper (PgxPoolWrapper) that implements the Pool interface.

func NewPgQueue

func NewPgQueue(ctx context.Context, url string) (PgQueue, error)

func (*PgQueue) Cancel

func (p *PgQueue) Cancel(ctx context.Context, taskId uuid.UUID) error

func (*PgQueue) Close

func (p *PgQueue) Close()

func (*PgQueue) Dequeue

func (p *PgQueue) Dequeue(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)

func (*PgQueue) Enqueue

func (p *PgQueue) Enqueue(task *Task) (uuid.UUID, error)

func (*PgQueue) Finish

func (p *PgQueue) Finish(taskId uuid.UUID, taskError error) error

func (*PgQueue) Heartbeats

func (p *PgQueue) Heartbeats(olderThan time.Duration) []uuid.UUID

func (*PgQueue) IdFromToken

func (p *PgQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, isRunning bool, err error)

func (*PgQueue) ListenForCanceledTask

func (p *PgQueue) ListenForCanceledTask(ctx context.Context) (uuid.UUID, error)

func (*PgQueue) RefreshHeartbeat

func (p *PgQueue) RefreshHeartbeat(token uuid.UUID) error

RefreshHeartbeat resets the last heartbeat time to time.Now().

func (*PgQueue) RemoveAllTasks

func (p *PgQueue) RemoveAllTasks() error

RemoveAllTasks is used in tests, along with testTx, to clear the tables before the tests run.

func (*PgQueue) Requeue

func (p *PgQueue) Requeue(taskId uuid.UUID) error

func (*PgQueue) RequeueFailedTasks

func (p *PgQueue) RequeueFailedTasks(taskTypes []string) error

func (*PgQueue) Status

func (p *PgQueue) Status(taskID uuid.UUID) (*models.TaskInfo, error)

func (*PgQueue) UpdatePayload

func (p *PgQueue) UpdatePayload(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)

type PgxConnWrapper

type PgxConnWrapper struct {
	// contains filtered or unexported fields
}

PgxConnWrapper wraps a pgxpool Conn in a generic interface to allow for alternative implementations, such as the FakePgxPoolWrapper

func (*PgxConnWrapper) Begin

func (p *PgxConnWrapper) Begin(ctx context.Context) (pgx.Tx, error)

func (*PgxConnWrapper) Conn

func (p *PgxConnWrapper) Conn() *pgx.Conn

func (*PgxConnWrapper) Exec

func (p *PgxConnWrapper) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

func (*PgxConnWrapper) Query

func (p *PgxConnWrapper) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

func (*PgxConnWrapper) QueryRow

func (p *PgxConnWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

func (*PgxConnWrapper) Release

func (p *PgxConnWrapper) Release()

type PgxPoolWrapper

type PgxPoolWrapper struct {
	// contains filtered or unexported fields
}

PgxPoolWrapper wraps a pgx Pool in a generic interface to allow for alternative implementations, such as the FakePgxPoolWrapper

func (*PgxPoolWrapper) Acquire

func (p *PgxPoolWrapper) Acquire(ctx context.Context) (Connection, error)

func (*PgxPoolWrapper) Begin

func (p *PgxPoolWrapper) Begin(ctx context.Context) (pgx.Tx, error)

func (*PgxPoolWrapper) Close

func (p *PgxPoolWrapper) Close()

func (*PgxPoolWrapper) Exec

func (p *PgxPoolWrapper) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

func (*PgxPoolWrapper) Query

func (p *PgxPoolWrapper) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

func (*PgxPoolWrapper) QueryRow

func (p *PgxPoolWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

type Pool

type Pool interface {
	// Transaction contributes the Begin, Exec, Query and QueryRow methods.
	Transaction
	// Acquire returns a Connection, or an error if one could not be obtained.
	Acquire(ctx context.Context) (Connection, error)
	// Close closes the pool.
	Close()
}

Pool matches the pgxpool.Pool struct

type Queue

// Queue is the task queue API: enqueueing and dequeueing tasks, tracking their
// status and heartbeats, and requeueing or canceling them.
type Queue interface {
	// Enqueue enqueues a task and returns a UUID for it.
	Enqueue(task *Task) (uuid.UUID, error)
	// Dequeue dequeues a task whose type is in taskTypes, blocking until one
	// is available.
	Dequeue(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)
	// Status returns the status of the given task.
	Status(taskId uuid.UUID) (*models.TaskInfo, error)
	// Finish finishes the given task, setting its status to completed, or to
	// failed if taskError is not nil.
	Finish(taskId uuid.UUID, taskError error) error
	// Requeue requeues the given task.
	Requeue(taskId uuid.UUID) error
	// Heartbeats returns the tokens of all tasks older than the given duration.
	Heartbeats(olderThan time.Duration) []uuid.UUID
	// IdFromToken returns a task's ID given its token, together with whether
	// the task is running.
	IdFromToken(token uuid.UUID) (id uuid.UUID, isRunning bool, err error)
	// RefreshHeartbeat refreshes the heartbeat of the task identified by token.
	RefreshHeartbeat(token uuid.UUID) error
	// UpdatePayload updates the payload on a task.
	UpdatePayload(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)
	// Cancel sends a notification to cancel the given task and sets the task
	// state to canceled.
	Cancel(ctx context.Context, taskId uuid.UUID) error
	// RequeueFailedTasks requeues all failed tasks of the given taskTypes.
	RequeueFailedTasks(taskTypes []string) error
	// ListenForCanceledTask listens on the cancel_tasks channel for a
	// notification and returns the ID of the task to cancel.
	ListenForCanceledTask(ctx context.Context) (taskID uuid.UUID, err error)
}

type Task

// Task is the value passed to Queue.Enqueue to describe the work being queued.
//
// NOTE(review): the field notes below are inferred from names and types only;
// confirm them against the Enqueue implementation.
type Task struct {
	// Typename is the task's type name (presumably matched against the
	// taskTypes given to Dequeue; confirm).
	Typename     string
	// Payload is an arbitrary value carried with the task.
	Payload      interface{}
	// Dependencies is a list of UUIDs (presumably IDs of tasks this one
	// depends on; confirm).
	Dependencies []uuid.UUID
	// OrgId and AccountId are string identifiers attached to the task.
	OrgId        string
	AccountId    string
	// ObjectUUID and ObjectType are pointers and therefore may be nil.
	ObjectUUID   *string
	ObjectType   *string
	// RequestID is a string identifier (presumably of the originating
	// request; confirm).
	RequestID    string
	// Priority is an integer priority; whether higher or lower values are
	// served first is not visible here.
	Priority     int
}

type Transaction

type Transaction interface {
	// Begin returns a pgx.Tx, or an error.
	Begin(ctx context.Context) (pgx.Tx, error)
	// Exec takes a SQL string with optional arguments and returns the
	// resulting pgconn.CommandTag, or an error.
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	// Query takes a SQL string with optional arguments and returns the
	// resulting pgx.Rows, or an error.
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	// QueryRow takes a SQL string with optional arguments and returns a
	// pgx.Row; it has no separate error return.
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
}

Transaction mimics the pgx.Tx struct

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL