Documentation
ΒΆ
Index ΒΆ
- type Queuer
- func (q *Queuer) AddJob(task interface{}, parameters ...interface{}) (*model.Job, error)
- func (q *Queuer) AddJobTx(tx *sql.Tx, task interface{}, parameters ...interface{}) (*model.Job, error)
- func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, parameters ...interface{}) (*model.Job, error)
- func (q *Queuer) AddJobWithOptionsTx(tx *sql.Tx, options *model.Options, task interface{}, ...) (*model.Job, error)
- func (q *Queuer) AddJobs(batchJobs []model.BatchJob) error
- func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker
- func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Worker
- func (q *Queuer) AddTask(task interface{}) *model.Task
- func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Task
- func (q *Queuer) CancelAllJobsByWorker(workerRid uuid.UUID, entries int) error
- func (q *Queuer) CancelJob(jobRid uuid.UUID) (*model.Job, error)
- func (q *Queuer) GetJob(jobRid uuid.UUID) (*model.Job, error)
- func (q *Queuer) GetJobs(lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetJobsEnded(lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetWorker(workerRid uuid.UUID) (*model.Worker, error)
- func (q *Queuer) GetWorkers(lastId int, entries int) ([]*model.Worker, error)
- func (q *Queuer) ListenForJobDelete(notifyFunction func(data *model.Job)) error
- func (q *Queuer) ListenForJobUpdate(notifyFunction func(data *model.Job)) error
- func (q *Queuer) ReaddJobFromArchive(jobRid uuid.UUID) (*model.Job, error)
- func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc)
- func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool)
- func (q *Queuer) Stop() error
- func (q *Queuer) WaitForJobAdded() *model.Job
- func (q *Queuer) WaitForJobFinished(jobRid uuid.UUID) *model.Job
Constants ΒΆ
This section is empty.
Variables ΒΆ
This section is empty.
Functions ΒΆ
This section is empty.
Types ΒΆ
type Queuer ΒΆ
type Queuer struct {
// DBs
DB *sql.DB
JobPollInterval time.Duration
// Queuer options before starting
WithTableDrop bool
// contains filtered or unexported fields
}
Queuer represents the main queuing system. It manages job scheduling, execution, and error handling. It provides methods to start, stop, and manage jobs and workers. It also handles database connections and listeners for job events.
func NewQueuer ΒΆ
NewQueuer creates a new Queuer instance with the given name and max concurrency. It wraps NewQueuerWithDB to initialize the queuer without an external db config.
func NewQueuerWithDB ΒΆ
func NewQueuerWithDB(name string, maxConcurrency int, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuer
NewQueuer creates a new Queuer instance with the given name and max concurrency. It initializes the database connection and worker. If options are provided, it creates a worker with those options. If any error occurs during initialization, it logs a panic error and exits the program. It returns a pointer to the newly created Queuer instance.
func (*Queuer) AddJob ΒΆ
AddJob adds a job to the queue with the given task and parameters. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.
func (*Queuer) AddJobTx ΒΆ
func (q *Queuer) AddJobTx(tx *sql.Tx, task interface{}, parameters ...interface{}) (*model.Job, error)
AddJobTx adds a job to the queue with the given task and parameters within a transaction. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.
func (*Queuer) AddJobWithOptions ΒΆ
func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, parameters ...interface{}) (*model.Job, error)
AddJobWithOptions adds a job with the given task, options, and parameters. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you).
It returns the created job or an error if something goes wrong.
The options parameter allows you to specify additional options for the job, such as scheduling, retry policies, and error handling. If options are nil, the worker's default options will be used.
Example usage: ```go
options := &model.Options{
OnError: &model.OnError{
Timeout: 5,
MaxRetries: 2, // Runs 3 times, first is not a retry
RetryDelay: 1,
RetryBackoff: model.RETRY_BACKOFF_NONE,
},
Schedule: &model.Schedule{
Start: time.Now().Add(10 * time.Second),
Interval: 5 * time.Second,
MaxCount: 3,
},
}
job, err := queuer.AddJobWithOptions(options, myTaskFunction, param1, param2)
if err != nil {
log.Fatalf("Failed to add job: %v", err)
}
```
func (*Queuer) AddJobWithOptionsTx ΒΆ
func (q *Queuer) AddJobWithOptionsTx(tx *sql.Tx, options *model.Options, task interface{}, parameters ...interface{}) (*model.Job, error)
AddJobWithOptionsTx adds a job with the given task, options, and parameters within a transaction. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.
func (*Queuer) AddJobs ΒΆ
AddJobs adds a batch of jobs to the queue. It takes a slice of BatchJob, which contains the task, options, and parameters for each job. It returns an error if something goes wrong during the process.
func (*Queuer) AddNextIntervalFunc ΒΆ
func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker
AddNextIntervalFunc adds a NextIntervalFunc to the worker's available next interval functions. It takes a NextIntervalFunc and adds it to the worker's AvailableNextIntervalFuncs. The function name is derived from the NextIntervalFunc interface using helper.GetTaskNameFromInterface. The function is meant to be used before starting the Queuer to ensure that the worker has access to the function. It checks if the function is nil or already exists in the worker's available next interval functions.
If the function is nil or already exists, it panics. It returns the updated worker after adding the function.
func (*Queuer) AddNextIntervalFuncWithName ΒΆ
AddNextIntervalFuncWithName adds a NextIntervalFunc to the worker's available next interval functions with a specific name. It takes a NextIntervalFunc and a name, checks if the function is nil or already exists in the worker's available next interval functions, and adds it to the worker's AvailableNextIntervalFuncs.
This function is useful when you want to add a NextIntervalFunc with a specific name that you control, rather than deriving it from the function itself. It ensures that the function is not nil and that the name does not already exist in the worker's available next interval functions.
If the function is nil or already exists, it panics. It returns the updated worker after adding the function with the specified name.
func (*Queuer) AddTask ΒΆ
AddTask adds a new task to the queuer. It creates a new task with the provided task interface, adds it to the worker's available tasks, and updates the worker in the database. The task name is automatically generated based on the task's function name (eg. main.TestTask).
If the task creation fails, it logs a panic error and exits the program. It returns the newly created task.
func (*Queuer) AddTaskWithName ΒΆ
AddTaskWithName adds a new task with a specific name to the queuer. It creates a new task with the provided task interface and name, adds it to the worker's available tasks, and updates the worker in the database.
If task creation fails, it logs a panic error and exits the program. It returns the newly created task.
func (*Queuer) CancelAllJobsByWorker ΒΆ
CancelAllJobsByWorker cancels all jobs assigned to a specific worker by its RID. It retrieves all jobs assigned to the worker and cancels each one. It returns an error if something goes wrong during the process.
func (*Queuer) CancelJob ΒΆ
CancelJob cancels a job with the given job RID. It retrieves the job from the database and cancels it. If the job is not found or already cancelled, it returns an error.
func (*Queuer) GetJobsByWorkerRID ΒΆ
func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error)
GetJobsByWorkerRID retrieves jobs assigned to a specific worker by its RID.
func (*Queuer) GetJobsEnded ΒΆ
GetJobsEnded retrieves all jobs that have ended (succeeded, cancelled or failed).
func (*Queuer) GetWorker ΒΆ
GetWorker retrieves a worker by its RID (Resource Identifier). It returns the worker if found, or an error if not.
func (*Queuer) GetWorkers ΒΆ
GetWorkers retrieves a list of workers starting from the lastId and returning the specified number of entries. It returns a slice of workers and an error if any occurs.
func (*Queuer) ListenForJobDelete ΒΆ
ListenForJobInsert listens for job insert events and notifies the provided function when a job is added.
func (*Queuer) ListenForJobUpdate ΒΆ
ListenForJobInsert listens for job insert events and notifies the provided function when a job is added.
func (*Queuer) ReaddJobFromArchive ΒΆ
ReaddJobFromArchive readds a job from the archive back to the queue.
func (*Queuer) Start ΒΆ
func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc)
Start starts the queuer by initializing the job listeners and starting the job poll ticker. It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. It runs the job processing in a separate goroutine and listens for job events.
Detailed steps include: 1. Create job and job archive database listeners. 2. Create broadcasters for job insert, update, and delete events. 3. Start the job listeners to listen for job events. 4. Start the job poll ticker to periodically check for new jobs. 5. Wait for the queuer to be ready or log a panic error if it fails to start within 5 seconds.
func (*Queuer) StartWithoutWorker ΒΆ
func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool)
Start starts the queuer by initializing the job listeners and starting the job poll ticker. It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. It runs the job processing in a separate goroutine and listens for job events.
This version does not run the job processing, allowing the queuer to be started without a worker. Is is useful if you want to run a queuer instance in a seperate service without a worker, for example to handle listening to job events and providing a central frontend.
func (*Queuer) Stop ΒΆ
Stop stops the queuer by closing the job listeners, cancelling all queued and running jobs, and cancelling the context to stop the queuer.
func (*Queuer) WaitForJobAdded ΒΆ
WaitForJobAdded waits for any job to start and returns the job. It listens for job insert events and returns the job when it is added to the queue.