Documentation
ΒΆ
Index ΒΆ
- type Queuer
- func (q *Queuer) AddJob(task interface{}, parametersKeyed map[string]interface{}, ...) (*model.Job, error)
- func (q *Queuer) AddJobTx(tx *sql.Tx, task interface{}, parametersKeyed map[string]interface{}, ...) (*model.Job, error)
- func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, ...) (*model.Job, error)
- func (q *Queuer) AddJobWithOptionsTx(tx *sql.Tx, options *model.Options, task interface{}, ...) (*model.Job, error)
- func (q *Queuer) AddJobs(batchJobs []model.BatchJob) error
- func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker
- func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Worker
- func (q *Queuer) AddTask(task interface{}) *model.Task
- func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Task
- func (q *Queuer) CancelAllJobsByWorker(workerRid uuid.UUID, entries int) error
- func (q *Queuer) CancelJob(jobRid uuid.UUID) (*model.Job, error)
- func (q *Queuer) DeleteJob(jobRid uuid.UUID) error
- func (q *Queuer) GetConnections() ([]*model.Connection, error)
- func (q *Queuer) GetJob(jobRid uuid.UUID) (*model.Job, error)
- func (q *Queuer) GetJobEnded(jobRid uuid.UUID) (*model.Job, error)
- func (q *Queuer) GetJobs(lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetJobsBySearch(search string, lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetJobsEnded(lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetJobsEndedBySearch(search string, lastId int, entries int) ([]*model.Job, error)
- func (q *Queuer) GetWorker(workerRid uuid.UUID) (*model.Worker, error)
- func (q *Queuer) GetWorkers(lastId int, entries int) ([]*model.Worker, error)
- func (q *Queuer) GetWorkersBySearch(search string, lastId int, entries int) ([]*model.Worker, error)
- func (q *Queuer) ListenForJobDelete(notifyFunction func(data *model.Job)) error
- func (q *Queuer) ListenForJobUpdate(notifyFunction func(data *model.Job)) error
- func (q *Queuer) ReaddJobFromArchive(jobRid uuid.UUID) (*model.Job, error)
- func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, ...)
- func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, ...)
- func (q *Queuer) Stop() error
- func (q *Queuer) StopWorker(workerRid uuid.UUID) error
- func (q *Queuer) StopWorkerGracefully(workerRid uuid.UUID) error
- func (q *Queuer) WaitForJobAdded() *model.Job
- func (q *Queuer) WaitForJobFinished(jobRid uuid.UUID, timeout time.Duration) *model.Job
Constants ΒΆ
This section is empty.
Variables ΒΆ
This section is empty.
Functions ΒΆ
This section is empty.
Types ΒΆ
type Queuer ΒΆ
type Queuer struct {
DB *sql.DB
JobPollInterval time.Duration
WorkerPollInterval time.Duration
RetentionArchive time.Duration
// contains filtered or unexported fields
}
Queuer represents the main queuing system. It manages job scheduling, execution, and error handling. It provides methods to start, stop, and manage jobs and workers. It also handles database connections and listeners for job events.
func NewQueuer ΒΆ
NewQueuer creates a new Queuer instance with the given name and max concurrency. It wraps NewQueuerWithDB to initialize the queuer without an external db config and encryption key. The encryption key for the database is taken from an environment variable (QUEUER_ENCRYPTION_KEY), if not provided, it defaults to unencrypted results.
func NewQueuerWithDB ΒΆ
func NewQueuerWithDB(name string, maxConcurrency int, encryptionKey string, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuer
NewQueuerWithDB creates a new Queuer instance with the given name and max concurrency. It initializes the database connection and worker. If options are provided, it creates a worker with those options.
It takes the db configuration from environment variables if dbConfig is nil. - QUEUER_DB_HOST (required) - QUEUER_DB_PORT (required) - QUEUER_DB_DATABASE (required) - QUEUER_DB_USERNAME (required) - QUEUER_DB_PASSWORD (required) - QUEUER_DB_SCHEMA (required) - QUEUER_DB_SSLMODE (optional, defaults to "require")
If the encryption key is empty, it defaults to unencrypted results.
If any error occurs during initialization, it logs a panic error and exits the program. It returns a pointer to the newly created Queuer instance.
func NewStaticQueuer ΒΆ added in v1.9.0
func NewStaticQueuer(logLevel slog.Leveler, dbConfig *helper.DatabaseConfiguration) *Queuer
NewStaticQueuer creates a new Queuer instance without a worker. It initializes the database connection and other necessary components. If any error occurs during initialization, it logs a panic error and exits the program. It returns a pointer to the newly created Queuer instance. This queuer instance does not listen to the db nor does it run jobs. It is primarily used for static operations like adding jobs, getting job status etc.
func (*Queuer) AddJob ΒΆ
func (q *Queuer) AddJob(task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)
AddJob adds a job to the queue with the given task and parameters. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.
func (*Queuer) AddJobTx ΒΆ
func (q *Queuer) AddJobTx(tx *sql.Tx, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)
AddJobTx adds a job to the queue with the given task and parameters within a transaction. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.
func (*Queuer) AddJobWithOptions ΒΆ
func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)
AddJobWithOptions adds a job with the given task, options, and parameters. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you).
It returns the created job or an error if something goes wrong. The options parameter allows you to specify additional options for the job, such as scheduling, retry policies, and error handling. If options are nil, the worker's default options will be used.
Example usage:
func AddJobExample(queuer *Queuer, param1 string, param2 int) {
options := &model.Options{
OnError: &model.OnError{
Timeout: 5,
MaxRetries: 2, Runs 3 times, first is not a retry
RetryDelay: 1,
RetryBackoff: model.RETRY_BACKOFF_NONE,
},
Schedule: &model.Schedule{
Start: time.Now().Add(10 * time.Second),
Interval: 5 * time.Second,
MaxCount: 3,
},
}
job, err := queuer.AddJobWithOptions(options, myTaskFunction, param1, param2)
if err != nil {
log.Fatalf("Failed to add job: %v", err)
}
}
func (*Queuer) AddJobWithOptionsTx ΒΆ
func (q *Queuer) AddJobWithOptionsTx(tx *sql.Tx, options *model.Options, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)
AddJobWithOptionsTx adds a job with the given task, options, and parameters within a transaction. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.
func (*Queuer) AddJobs ΒΆ
AddJobs adds a batch of jobs to the queue. It takes a slice of BatchJob, which contains the task, options, and parameters for each job. It returns an error if something goes wrong during the process.
func (*Queuer) AddNextIntervalFunc ΒΆ
func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker
AddNextIntervalFunc adds a NextIntervalFunc to the worker's available next interval functions. It takes a NextIntervalFunc and adds it to the worker's AvailableNextIntervalFuncs. The function name is derived from the NextIntervalFunc interface using helper.GetTaskNameFromInterface. The function is meant to be used before starting the Queuer to ensure that the worker has access to the function. It checks if the function is nil or already exists in the worker's available next interval functions.
If the function is nil or already exists, it panics. It returns the updated worker after adding the function.
func (*Queuer) AddNextIntervalFuncWithName ΒΆ
AddNextIntervalFuncWithName adds a NextIntervalFunc to the worker's available next interval functions with a specific name. It takes a NextIntervalFunc and a name, checks if the function is nil or already exists in the worker's available next interval functions, and adds it to the worker's AvailableNextIntervalFuncs.
This function is useful when you want to add a NextIntervalFunc with a specific name that you control, rather than deriving it from the function itself. It ensures that the function is not nil and that the name does not already exist in the worker's available next interval functions.
If the function is nil or already exists, it panics. It returns the updated worker after adding the function with the specified name.
func (*Queuer) AddTask ΒΆ
AddTask adds a new task to the queuer. It creates a new task with the provided task interface, adds it to the worker's available tasks, and updates the worker in the database. The task name is automatically generated based on the task's function name (eg. main.TestTask).
If the task creation fails, it logs a panic error and exits the program. It returns the newly created task.
func (*Queuer) AddTaskWithName ΒΆ
AddTaskWithName adds a new task with a specific name to the queuer. It creates a new task with the provided task interface and name, adds it to the worker's available tasks, and updates the worker in the database.
If task creation fails, it logs a panic error and exits the program. It returns the newly created task.
func (*Queuer) CancelAllJobsByWorker ΒΆ
CancelAllJobsByWorker cancels all jobs assigned to a specific worker by its RID. It retrieves all jobs assigned to the worker and cancels each one. It returns an error if something goes wrong during the process.
func (*Queuer) CancelJob ΒΆ
CancelJob cancels a job with the given job RID. It retrieves the job from the database and cancels it. If the job is not found or already cancelled, it returns an error.
func (*Queuer) GetConnections ΒΆ added in v1.9.0
func (q *Queuer) GetConnections() ([]*model.Connection, error)
GetAllConnections retrieves all connections from the database.
func (*Queuer) GetJobEnded ΒΆ added in v1.9.0
GetJobEnded retrieves a job that has ended (succeeded, cancelled or failed) by its RID.
func (*Queuer) GetJobsBySearch ΒΆ added in v1.37.0
GetJobsBySearch retrieves jobs that match the given search term.
func (*Queuer) GetJobsByWorkerRID ΒΆ
func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error)
GetJobsByWorkerRID retrieves jobs assigned to a specific worker by its RID.
func (*Queuer) GetJobsEnded ΒΆ
GetJobsEnded retrieves all jobs that have ended (succeeded, cancelled or failed).
func (*Queuer) GetJobsEndedBySearch ΒΆ added in v1.37.0
GetJobsEndedBySearch retrieves ended jobs that match the given search term.
func (*Queuer) GetWorkers ΒΆ
GetWorkers retrieves a list of workers starting from the lastId and returning the specified number of entries.
func (*Queuer) GetWorkersBySearch ΒΆ added in v1.37.0
func (q *Queuer) GetWorkersBySearch(search string, lastId int, entries int) ([]*model.Worker, error)
GetWorkersBySearch retrieves workers that match the given search term.
func (*Queuer) ListenForJobDelete ΒΆ
ListenForJobInsert listens for job insert events and notifies the provided function when a job is added.
func (*Queuer) ListenForJobUpdate ΒΆ
ListenForJobInsert listens for job insert events and notifies the provided function when a job is added.
func (*Queuer) ReaddJobFromArchive ΒΆ
ReaddJobFromArchive readds a job from the archive back to the queue.
func (*Queuer) Start ΒΆ
func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, masterSettings ...*model.MasterSettings)
Start starts the queuer by initializing the job listeners and starting the job poll ticker. If masterSettings are provided, it also starts the master poll ticker. If masterSettings contain 0 values, they are set to default values. It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. It runs the job processing in a separate goroutine and listens for job events.
Detailed steps include: 1. Create job and job archive database listeners. 2. Create broadcasters for job insert, update, and delete events. 3. Start the job listeners to listen for job events. 4. Start the job poll ticker to periodically check for new jobs. 5. Wait for the queuer to be ready or log a panic error if it fails to start within 5 seconds.
func (*Queuer) StartWithoutWorker ΒΆ
func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, masterSettings ...*model.MasterSettings)
Start starts the queuer by initializing the job listeners and starting the job poll ticker. It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. It runs the job processing in a separate goroutine and listens for job events.
This version does not run the job processing, allowing the queuer to be started without a worker. Is is useful if you want to run a queuer instance in a seperate service without a worker, for example to handle listening to job events and providing a central frontend.
func (*Queuer) Stop ΒΆ
Stop stops the queuer by closing the job listeners, cancelling all queued and running jobs, and cancelling the context to stop the queuer.
func (*Queuer) StopWorker ΒΆ added in v1.46.0
StopWorkerGracefully sets the status of the specified worker to 'STOPPING' to cancel running jobs when stopping.
func (*Queuer) StopWorkerGracefully ΒΆ added in v1.46.0
StopWorkerGracefully sets the worker's status to STOPPING to allow it to finish current tasks before stopping.
func (*Queuer) WaitForJobAdded ΒΆ
WaitForJobAdded waits for any job to start and returns the job. It listens for job insert events and returns the job when it is added to the queue.
func (*Queuer) WaitForJobFinished ΒΆ
WaitForJobFinished waits for a job to finish and returns the job. It listens for job delete events and returns the job when it is finished. If timeout is reached before the job finishes, it returns nil.