Documentation
¶
Overview ¶
Package dbjobqueue implements the interfaces in package jobqueue backed by a PostgreSQL database.
Data is stored non-redundantly. Any data structures necessary for efficient access (e.g., dependants) are kept in memory.
Index ¶
- type Config
- type DBJobQueue
- func (q *DBJobQueue) AllRootJobIDs(ctx context.Context) (rootJobs []uuid.UUID, err error)
- func (q *DBJobQueue) CancelJob(id uuid.UUID) error
- func (q *DBJobQueue) Close()
- func (q *DBJobQueue) DeleteJob(ctx context.Context, id uuid.UUID) error
- func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error
- func (q *DBJobQueue) Dequeue(ctx context.Context, workerID uuid.UUID, jobTypes, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
- func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
- func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error)
- func (q *DBJobQueue) FailJob(id uuid.UUID, result interface{}) error
- func (q *DBJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID)
- func (q *DBJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error)
- func (q *DBJobQueue) InsertWorker(channel, arch string) (uuid.UUID, error)
- func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, ...)
- func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, ...)
- func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID)
- func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error)
- func (q *DBJobQueue) UpdateJobResult(id uuid.UUID, result interface{}) error
- func (q *DBJobQueue) UpdateWorkerStatus(workerID uuid.UUID) error
- func (q *DBJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Logger is used for all logging of the queue. When not provided, the standard
// global logger (logrus) is used.
Logger jobqueue.SimpleLogger
}
Config allows more detailed customization of queue behavior
type DBJobQueue ¶
type DBJobQueue struct {
// contains filtered or unexported fields
}
func New ¶
func New(url string) (*DBJobQueue, error)
New creates a new DBJobQueue object for `url` with default configuration.
func NewWithConfig ¶
func NewWithConfig(url string, config Config) (*DBJobQueue, error)
NewWithConfig creates a new DBJobQueue object for `url` with specific configuration.
func (*DBJobQueue) AllRootJobIDs ¶
AllRootJobIDs returns a list of top-level job UUIDs that the worker knows about.
func (*DBJobQueue) Close ¶
func (q *DBJobQueue) Close()
func (*DBJobQueue) DeleteJob ¶
DeleteJob deletes a job and all of its dependencies from the database. If a dependency has multiple dependents, it will only remove the parent job from the dependents list for that dependency instead of removing the dependency itself.
This assumes that the jobs have been created correctly, and that they have no dependency loops. Shared Dependents are ok, but a job cannot have a dependency on any of its parents (this should never happen).
func (*DBJobQueue) DeleteWorker ¶
func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error
func (*DBJobQueue) DequeueByID ¶
func (*DBJobQueue) Heartbeats ¶
func (q *DBJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID)
Get a list of tokens which haven't been updated in the specified time frame
func (*DBJobQueue) IdFromToken ¶
Find job by token; this will return an error if the job hasn't been dequeued.
func (*DBJobQueue) InsertWorker ¶
func (q *DBJobQueue) InsertWorker(channel, arch string) (uuid.UUID, error)
func (*DBJobQueue) Job ¶
func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error)
Job returns all the parameters that define a job (everything provided during Enqueue).
func (*DBJobQueue) RefreshHeartbeat ¶
func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID)
Reset the last heartbeat time to time.Now()
func (*DBJobQueue) RequeueOrFinishJob ¶
func (*DBJobQueue) UpdateJobResult ¶
func (q *DBJobQueue) UpdateJobResult(id uuid.UUID, result interface{}) error
func (*DBJobQueue) UpdateWorkerStatus ¶
func (q *DBJobQueue) UpdateWorkerStatus(workerID uuid.UUID) error