dbjobqueue

package
v0.0.0-...-30ddced Latest
Warning

This package is not in the latest version of its module.

Published: Jan 26, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package dbjobqueue implements the interfaces in package jobqueue, backed by a PostgreSQL database.

Data is stored non-redundantly. Any data structure necessary for efficient access (e.g., dependants) is kept in memory.
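
As an illustration of deriving an access structure instead of storing it, the sketch below builds a reverse index (job to dependants) from stored dependency lists. It is only a sketch under assumptions: `buildDependants` and the string job IDs are hypothetical and not part of dbjobqueue, which uses uuid.UUID and its own unexported structures.

```go
package main

import (
	"fmt"
	"sort"
)

// buildDependants derives, for every job, the list of jobs that depend on it.
// Only the forward edges (job -> dependencies) are treated as stored data.
// Hypothetical helper for illustration; not part of dbjobqueue.
func buildDependants(dependencies map[string][]string) map[string][]string {
	dependants := make(map[string][]string)
	for job, deps := range dependencies {
		for _, dep := range deps {
			dependants[dep] = append(dependants[dep], job)
		}
	}
	// Sort each list so the result does not depend on map iteration order.
	for dep := range dependants {
		sort.Strings(dependants[dep])
	}
	return dependants
}

func main() {
	stored := map[string][]string{
		"build":  {"fetch"},
		"test":   {"build"},
		"upload": {"build"},
	}
	fmt.Println(buildDependants(stored)["build"]) // prints "[test upload]"
}
```

The reverse index can be rebuilt at any time from the forward edges, which is why it does not need to be persisted.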

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Logger is used for all logging of the queue. When not provided, the standard
	// global logger (logrus) is used.
	Logger jobqueue.SimpleLogger
}

Config allows more detailed customization of queue behavior.

type DBJobQueue

type DBJobQueue struct {
	// contains filtered or unexported fields
}

func New

func New(url string) (*DBJobQueue, error)

New creates a new DBJobQueue object for `url` with default configuration.

func NewWithConfig

func NewWithConfig(url string, config Config) (*DBJobQueue, error)

NewWithConfig creates a new DBJobQueue object for `url` with specific configuration.

func (*DBJobQueue) AllRootJobIDs

func (q *DBJobQueue) AllRootJobIDs(ctx context.Context) (rootJobs []uuid.UUID, err error)

AllRootJobIDs returns a list of top-level job UUIDs that the worker knows about.

func (*DBJobQueue) CancelJob

func (q *DBJobQueue) CancelJob(id uuid.UUID) error

func (*DBJobQueue) Close

func (q *DBJobQueue) Close()

func (*DBJobQueue) DeleteJob

func (q *DBJobQueue) DeleteJob(ctx context.Context, id uuid.UUID) error

DeleteJob deletes a job and all of its dependencies from the database. If a dependency has multiple dependents, DeleteJob only removes the parent job from that dependency's dependents list instead of deleting the dependency.

This assumes that the jobs have been created correctly and that they have no dependency loops. Shared dependents are fine, but a job cannot have a dependency on any of its parents (this should never happen).
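
The deletion rule described above can be sketched on an in-memory graph. This is not the package's SQL-backed implementation; `job`, `graph`, `newGraph`, and `deleteJob` are hypothetical names, string IDs stand in for uuid.UUID, and the graph is assumed to be acyclic.

```go
package main

import "fmt"

// job is a hypothetical in-memory record of one queue entry.
type job struct {
	dependencies []string
	dependents   map[string]bool
}

// graph maps job IDs to their records.
type graph map[string]*job

// newGraph builds a graph from forward edges (job -> dependencies).
func newGraph(deps map[string][]string) graph {
	g := graph{}
	node := func(id string) *job {
		if g[id] == nil {
			g[id] = &job{dependents: map[string]bool{}}
		}
		return g[id]
	}
	for id, ds := range deps {
		node(id).dependencies = ds
		for _, d := range ds {
			node(d).dependents[id] = true
		}
	}
	return g
}

// deleteJob removes id and, recursively, every dependency that no other
// remaining job depends on. It assumes there are no dependency loops.
func (g graph) deleteJob(id string) {
	j, ok := g[id]
	if !ok {
		return
	}
	delete(g, id)
	for _, depID := range j.dependencies {
		dep, ok := g[depID]
		if !ok {
			continue
		}
		// Unlink the deleted parent from the dependency's dependents.
		delete(dep.dependents, id)
		// Delete the dependency only when nothing else depends on it.
		if len(dep.dependents) == 0 {
			g.deleteJob(depID)
		}
	}
}

func main() {
	g := newGraph(map[string][]string{
		"root1": {"shared", "only1"},
		"root2": {"shared"},
	})
	g.deleteJob("root1")
	// "only1" is gone; "shared" survives because "root2" still depends on it.
	fmt.Println(len(g), g["shared"] != nil, g["only1"] != nil) // prints "2 true false"
}
```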

func (*DBJobQueue) DeleteWorker

func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error

func (*DBJobQueue) Dequeue

func (q *DBJobQueue) Dequeue(ctx context.Context, workerID uuid.UUID, jobTypes, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error)

func (*DBJobQueue) DequeueByID

func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)

func (*DBJobQueue) Enqueue

func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error)

func (*DBJobQueue) FailJob

func (q *DBJobQueue) FailJob(id uuid.UUID, result interface{}) error

func (*DBJobQueue) Heartbeats

func (q *DBJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID)

Heartbeats returns a list of tokens which haven't been updated in the specified time frame.

func (*DBJobQueue) IdFromToken

func (q *DBJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error)

IdFromToken finds a job by its token. It returns an error if the job hasn't been dequeued.

func (*DBJobQueue) InsertWorker

func (q *DBJobQueue) InsertWorker(channel, arch string) (uuid.UUID, error)

func (*DBJobQueue) Job

func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error)

Job returns all the parameters that define a job (everything provided during Enqueue).

func (*DBJobQueue) JobStatus

func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error)

func (*DBJobQueue) RefreshHeartbeat

func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID)

RefreshHeartbeat resets the last heartbeat time to time.Now().

func (*DBJobQueue) RequeueOrFinishJob

func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error)

func (*DBJobQueue) UpdateJobResult

func (q *DBJobQueue) UpdateJobResult(id uuid.UUID, result interface{}) error

func (*DBJobQueue) UpdateWorkerStatus

func (q *DBJobQueue) UpdateWorkerStatus(workerID uuid.UUID) error

func (*DBJobQueue) Workers

func (q *DBJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error)
