Documentation ¶
Index ¶
- func AddJobIndexes(catalog *coal.Catalog, removeAfter time.Duration)
- func Backoff(min, max time.Duration, factor float64, attempt int) time.Duration
- func Cancel(ctx context.Context, store *coal.Store, id coal.ID, reason string) error
- func Complete(ctx context.Context, store *coal.Store, id coal.ID, result coal.Map) error
- func Fail(ctx context.Context, store *coal.Store, id coal.ID, reason string, ...) error
- type Blueprint
- type Context
- type Error
- type Job
- type Model
- type Queue
- func (q *Queue) Action(methods []string, cb func(ctx *fire.Context) Blueprint) *fire.Action
- func (q *Queue) Add(task *Task)
- func (q *Queue) Callback(matcher fire.Matcher, cb func(ctx *fire.Context) Blueprint) *fire.Callback
- func (q *Queue) Close()
- func (q *Queue) Enqueue(bp Blueprint) (*Job, error)
- func (q *Queue) Run()
- type Status
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddJobIndexes ¶
AddJobIndexes will add job indexes to the specified catalog. If a duration is specified, completed and cancelled jobs are automatically removed when their finished timestamp falls behind the specified duration.
func Backoff ¶ added in v0.21.0
Backoff is a simple backoff algorithm that calculates the delay of a job.
func Cancel ¶ added in v0.21.0
Cancel will cancel the job with the specified id and the specified reason. Only jobs in the "dequeued" state can be cancelled.
Types ¶
type Blueprint ¶ added in v0.22.1
type Blueprint struct {
// The task name.
Name string
// The job label. If given, the job will only be enqueued if no other job is
// available with the same name and label.
Label string
// The job model. If given, data is overridden with the marshaled model.
Model Model
// The job data.
Data coal.Map
// The initial delay. If specified the job will not be dequeued until the
// specified time has passed.
Delay time.Duration
// The job period. If given, and a label is present, the job will only be
// enqueued if no job has been finished in the specified duration.
Period time.Duration
}
Blueprint describes a job to be enqueued.
type Context ¶ added in v0.20.0
type Context struct {
// The context that is cancelled when the task timeout has been reached.
//
// Values: opentracing.Span, *cinder.Trace
context.Context
// The model carried by the job.
Model Model
// The custom result of the job.
Result coal.Map
// The task that processes this job.
//
// Usage: Read Only
Task *Task
// The queue this job was dequeued from.
//
// Usage: Read Only
Queue *Queue
// The current trace.
//
// Usage: Read Only
Trace *cinder.Trace
}
Context holds and stores contextual data.
type Job ¶
type Job struct {
coal.Base `json:"-" bson:",inline" coal:"jobs"`
// The name of the job.
Name string `json:"name"`
// The custom job label.
Label string `json:"label"`
// The data that has been supplied on creation.
Data coal.Map `json:"data"`
// The current status of the job.
Status Status `json:"status"`
// The time when the job was created.
Created time.Time `json:"created-at" bson:"created_at"`
// The time when the job is available for execution.
Available time.Time `json:"available-at" bson:"available_at"`
// The time when the job was last dequeued.
Started *time.Time `json:"started-at" bson:"started_at"`
// The time when the last attempt ended (completed, failed or cancelled).
Ended *time.Time `json:"ended-at" bson:"ended_at"`
// The time when the job was finished (completed or cancelled).
Finished *time.Time `json:"finished-at" bson:"finished_at"`
// Attempts is incremented with each execution attempt.
Attempts int `json:"attempts"`
// The result submitted during completion.
Result coal.Map `json:"result"`
// The last message submitted when the job was failed or cancelled.
Reason string `json:"reason"`
}
Job is a single job managed by a queue.
func Dequeue ¶ added in v0.21.0
func Dequeue(ctx context.Context, store *coal.Store, id coal.ID, timeout time.Duration) (*Job, error)
Dequeue will dequeue the job with the specified id. The provided timeout is recorded so that the job can be dequeued again if the process fails to set its status. Only jobs in the "enqueued", "dequeued" (with an elapsed timeout) or "failed" state are dequeued.
type Queue ¶
type Queue struct {
// MaxLag defines the maximum amount of lag that should be applied to every
// dequeue attempt.
//
// By default multiple processes compete with each other when getting jobs
// from the same queue. An artificial lag prevents multiple simultaneous
// dequeue attempts and allows the worker with the smallest lag to dequeue
// the job and inform the other processes to prevent another dequeue attempt.
//
// Default: 100ms.
MaxLag time.Duration
// DequeueInterval defines the time after which a worker might try to
// dequeue a job again that has not yet been associated.
//
// Default: 2s.
DequeueInterval time.Duration
// contains filtered or unexported fields
}
Queue manages the queueing of jobs.
func (*Queue) Action ¶ added in v0.23.0
Action is a factory to create an action that can be used to enqueue jobs.
func (*Queue) Callback ¶
Callback is a factory to create callbacks that can be used to enqueue jobs during request processing.
type Status ¶
type Status string
Status defines the allowed statuses of a job.
const (
// StatusEnqueued is used as the initial status when jobs are created.
StatusEnqueued Status = "enqueued"
// StatusDequeued is set when a job has been successfully dequeued.
StatusDequeued Status = "dequeued"
// StatusCompleted is set when a job has been successfully executed.
StatusCompleted Status = "completed"
// StatusFailed is set when an execution of a job failed.
StatusFailed Status = "failed"
// StatusCancelled is set when a job has been cancelled.
StatusCancelled Status = "cancelled"
)
The available job statuses.
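The Job field comments distinguish between an attempt that has ended (completed, failed or cancelled) and a job that is finished (completed or cancelled). A minimal sketch of that distinction follows. The `status` type and the `ended` and `finished` helpers are hypothetical and are not part of the package.

```go
package main

import "fmt"

// status is a local stand-in for the package's Status type.
type status string

const (
	statusEnqueued  status = "enqueued"
	statusDequeued  status = "dequeued"
	statusCompleted status = "completed"
	statusFailed    status = "failed"
	statusCancelled status = "cancelled"
)

// ended reports whether the last attempt has ended, following the comment on
// the Ended field of Job.
func ended(s status) bool {
	return s == statusCompleted || s == statusFailed || s == statusCancelled
}

// finished reports whether the job is done for good, following the comment on
// the Finished field of Job. A failed job is not finished because it may be
// retried.
func finished(s status) bool {
	return s == statusCompleted || s == statusCancelled
}

func main() {
	all := []status{statusEnqueued, statusDequeued, statusCompleted, statusFailed, statusCancelled}
	for _, s := range all {
		fmt.Println(s, "ended:", ended(s), "finished:", finished(s))
	}
}
```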
type Task ¶
type Task struct {
// Name is the unique name of the task.
Name string
// Model is the model that holds task related data.
Model Model
// Handler is the callback called with jobs for processing. The handler
// should return errors formatted with E to properly indicate the status of
// the job. If a task execution is successful the handler may return some
// data that is attached to the job.
Handler func(*Context) error
// Workers defines the number of spawned workers that dequeue and execute
// jobs in parallel.
//
// Default: 2.
Workers int
// MaxAttempts defines the maximum attempts to complete a task. Zero means
// that the job is retried forever. The error retry field will take
// precedence over this setting and allow retries beyond the configured maximum.
//
// Default: 0
MaxAttempts int
// Interval defines the rate at which the worker will request a job from the
// queue.
//
// Default: 100ms.
Interval time.Duration
// MinDelay is the minimal time after which a failed task is retried.
//
// Default: 1s.
MinDelay time.Duration
// MaxDelay is the maximal time after which a failed task is retried.
//
// Default: 10m.
MaxDelay time.Duration
// DelayFactor defines the exponential increase of the delay after individual
// attempts.
//
// Default: 2.
DelayFactor float64
// Timeout is the time after which a task can be dequeued again in case the
// worker was not able to set its status.
//
// Default: 10m.
Timeout time.Duration
// Lifetime is the time after which the context of a job is cancelled and
// the execution should be stopped. Should be several minutes less than
// timeout to prevent race conditions.
//
// Default: 5m.
Lifetime time.Duration
// Periodically may be set to let the system enqueue a job automatically
// at the given interval.
//
// Default: 0.
Periodically time.Duration
// PeriodicJob is the blueprint of the job that is periodically enqueued.
//
// Default: Blueprint{Name: Task.Name}.
PeriodicJob Blueprint
}
Task describes work that is managed using a job queue.