Documentation
¶
Index ¶
- type JobDBHandler
- func (r JobDBHandler) AddRetentionArchive(retention time.Duration) error
- func (r JobDBHandler) BatchInsertJobs(jobs []*model.Job) error
- func (r JobDBHandler) CheckTablesExistance() (bool, error)
- func (r JobDBHandler) CreateTable() error
- func (r JobDBHandler) DeleteJob(rid uuid.UUID) error
- func (r JobDBHandler) DropTables() error
- func (r JobDBHandler) InsertJob(job *model.Job) (*model.Job, error)
- func (r JobDBHandler) InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error)
- func (r JobDBHandler) RemoveRetentionArchive() error
- func (r JobDBHandler) SelectAllJobs(lastID int, entries int) ([]*model.Job, error)
- func (r JobDBHandler) SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error)
- func (r JobDBHandler) SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error)
- func (r JobDBHandler) SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error)
- func (r JobDBHandler) SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error)
- func (r JobDBHandler) SelectJob(rid uuid.UUID) (*model.Job, error)
- func (r JobDBHandler) SelectJobFromArchive(rid uuid.UUID) (*model.Job, error)
- func (r JobDBHandler) UpdateJobFinal(job *model.Job) (*model.Job, error)
- func (r JobDBHandler) UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error)
- func (r JobDBHandler) UpdateStaleJobs() (int, error)
- type JobDBHandlerFunctions
- type MasterDBHandler
- func (r MasterDBHandler) CheckTableExistance() (bool, error)
- func (r MasterDBHandler) CreateTable() error
- func (r MasterDBHandler) DropTable() error
- func (r MasterDBHandler) SelectMaster() (*model.Master, error)
- func (r MasterDBHandler) UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error)
- type MasterDBHandlerFunctions
- type QueuerListener
- type WorkerDBHandler
- func (r WorkerDBHandler) CheckTableExistance() (bool, error)
- func (r WorkerDBHandler) CreateTable() error
- func (r WorkerDBHandler) DeleteStaleWorkers(deleteThreshold time.Duration) (int, error)
- func (r WorkerDBHandler) DeleteWorker(rid uuid.UUID) error
- func (r WorkerDBHandler) DropTable() error
- func (r WorkerDBHandler) InsertWorker(worker *model.Worker) (*model.Worker, error)
- func (r WorkerDBHandler) SelectAllConnections() ([]*model.Connection, error)
- func (r WorkerDBHandler) SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error)
- func (r WorkerDBHandler) SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error)
- func (r WorkerDBHandler) SelectWorker(rid uuid.UUID) (*model.Worker, error)
- func (r WorkerDBHandler) UpdateStaleWorkers(staleThreshold time.Duration) (int, error)
- func (r WorkerDBHandler) UpdateWorker(worker *model.Worker) (*model.Worker, error)
- type WorkerDBHandlerFunctions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type JobDBHandler ¶
type JobDBHandler struct {
EncryptionKey string
// contains filtered or unexported fields
}
JobDBHandler implements JobDBHandlerFunctions and holds the database connection.
func NewJobDBHandler ¶
func NewJobDBHandler(dbConnection *helper.Database, withTableDrop bool, encryptionKey ...string) (*JobDBHandler, error)
NewJobDBHandler creates a new instance of JobDBHandler. It initializes the database connection and optionally drops existing tables. If withTableDrop is true, it will drop the existing job tables before creating new ones.
func (JobDBHandler) AddRetentionArchive ¶ added in v1.6.0
func (r JobDBHandler) AddRetentionArchive(retention time.Duration) error
AddRetentionArchive updates the retention archive settings for the job archive.
func (JobDBHandler) BatchInsertJobs ¶
func (r JobDBHandler) BatchInsertJobs(jobs []*model.Job) error
BatchInsertJobs inserts multiple job records into the database in a single transaction.
func (JobDBHandler) CheckTablesExistance ¶ added in v1.4.0
func (r JobDBHandler) CheckTablesExistance() (bool, error)
CheckTablesExistance checks if the 'job' and 'job_archive' tables exist in the database. It returns true if both tables exist, otherwise false.
func (JobDBHandler) CreateTable ¶
func (r JobDBHandler) CreateTable() error
CreateTable creates the 'job' and 'job_archive' tables in the database. If the tables already exist, it does not create them again. It also creates a trigger for notifying events on the table and all necessary indexes.
func (JobDBHandler) DeleteJob ¶
func (r JobDBHandler) DeleteJob(rid uuid.UUID) error
DeleteJob deletes a job record from the job archive based on its RID. We only delete jobs from the archive as queued and running jobs should be cancelled first. Cancelling a job will move it to the archive with CANCELLED status.
func (JobDBHandler) DropTables ¶
func (r JobDBHandler) DropTables() error
DropTables drops the 'job' and 'job_archive' tables from the database.
func (JobDBHandler) InsertJobTx ¶
InsertJobTx inserts a new job record into the database within a transaction.
func (JobDBHandler) RemoveRetentionArchive ¶ added in v1.6.0
func (r JobDBHandler) RemoveRetentionArchive() error
RemoveRetentionArchive removes the retention archive settings for the job archive.
func (JobDBHandler) SelectAllJobs ¶
SelectAllJobs retrieves a paginated list of jobs for all workers. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (JobDBHandler) SelectAllJobsBySearch ¶
func (r JobDBHandler) SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error)
SelectAllJobsBySearch retrieves a paginated list of jobs for a worker, filtered by search string.
It searches across 'rid', 'worker_id', and 'status' fields. The search is case-insensitive and uses ILIKE for partial matches.
It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (JobDBHandler) SelectAllJobsByWorkerRID ¶
func (r JobDBHandler) SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error)
SelectAllJobsByWorkerRID retrieves a paginated list of jobs for a specific worker, filtered by worker RID. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (JobDBHandler) SelectAllJobsFromArchive ¶
SelectAllJobsFromArchive retrieves a paginated list of archived jobs. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (JobDBHandler) SelectAllJobsFromArchiveBySearch ¶
func (r JobDBHandler) SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error)
SelectAllJobsFromArchiveBySearch retrieves a paginated list of archived jobs filtered by search string. It searches across 'rid', 'worker_id', 'task_name', and 'status' fields. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (JobDBHandler) SelectJob ¶
SelectJob retrieves a single job record from the database based on its RID.
func (JobDBHandler) SelectJobFromArchive ¶
SelectJobFromArchive retrieves a single archived job record from the database based on its RID.
func (JobDBHandler) UpdateJobFinal ¶
UpdateJobFinal updates an existing job record in the database to state 'FAILED' or 'SUCCEEDED'.
It deletes the job from the 'job' table and inserts it into the 'job_archive' table. The archived job will have the status set to the provided status, and it will include results and error information.
It returns the archived job record.
func (JobDBHandler) UpdateJobsInitial ¶
UpdateJobsInitial updates an existing queued, non-locked job record in the database.
Checks if the job is in 'QUEUED' or 'FAILED' status and if the worker can handle the task. The worker must have the task in its available tasks and the next interval must be available if set. If the job is scheduled it must be scheduled within the next 10 minutes. It updates the job to 'RUNNING' status, increments the schedule count and attempts, and sets the started_at timestamp. It uses the `FOR UPDATE SKIP LOCKED` clause to avoid locking issues with concurrent updates.
It returns the updated job records.
func (JobDBHandler) UpdateStaleJobs ¶ added in v1.9.0
func (r JobDBHandler) UpdateStaleJobs() (int, error)
UpdateStaleJobs updates all jobs to QUEUED status where the assigned worker is STOPPED so they can be picked up by available workers again. It returns the number of jobs that were updated. Jobs are considered stale if their assigned worker has STOPPED status.
type JobDBHandlerFunctions ¶
type JobDBHandlerFunctions interface {
CheckTablesExistance() (bool, error)
CreateTable() error
DropTables() error
InsertJob(job *model.Job) (*model.Job, error)
InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error)
BatchInsertJobs(jobs []*model.Job) error
UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error)
UpdateJobFinal(job *model.Job) (*model.Job, error)
UpdateStaleJobs() (int, error)
DeleteJob(rid uuid.UUID) error
SelectJob(rid uuid.UUID) (*model.Job, error)
SelectAllJobs(lastID int, entries int) ([]*model.Job, error)
SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error)
SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error)
// Job Archive
AddRetentionArchive(retention time.Duration) error
RemoveRetentionArchive() error
SelectJobFromArchive(rid uuid.UUID) (*model.Job, error)
SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error)
SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error)
}
JobDBHandlerFunctions defines the interface for Job database operations.
type MasterDBHandler ¶ added in v1.6.0
type MasterDBHandler struct {
// contains filtered or unexported fields
}
MasterDBHandler implements MasterDBHandlerFunctions and holds the database connection.
func NewMasterDBHandler ¶ added in v1.6.0
func NewMasterDBHandler(dbConnection *helper.Database, withTableDrop bool) (*MasterDBHandler, error)
NewMasterDBHandler creates a new instance of MasterDBHandler. It initializes the database connection and creates the master table if it does not exist.
func (MasterDBHandler) CheckTableExistance ¶ added in v1.6.0
func (r MasterDBHandler) CheckTableExistance() (bool, error)
CheckTableExistance checks if the 'master' table exists in the database. It returns true if the table exists, otherwise false.
func (MasterDBHandler) CreateTable ¶ added in v1.6.0
func (r MasterDBHandler) CreateTable() error
CreateTable creates the 'master' and 'master_archive' tables in the database. If the tables already exist, it does not create them again. It also creates a trigger for notifying events on the table and all necessary indexes.
func (MasterDBHandler) DropTable ¶ added in v1.6.0
func (r MasterDBHandler) DropTable() error
DropTable drops the 'master' and 'master_archive' tables from the database.
func (MasterDBHandler) SelectMaster ¶ added in v1.6.0
func (r MasterDBHandler) SelectMaster() (*model.Master, error)
SelectMaster retrieves the current master entry from the database.
func (MasterDBHandler) UpdateMaster ¶ added in v1.6.0
func (r MasterDBHandler) UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error)
UpdateMaster updates the master entry with the given worker's ID and settings. It locks the row for update to ensure that only one worker can update the master at a time. It returns the old master entry if it was successfully updated, or nil if no update was done.
type MasterDBHandlerFunctions ¶ added in v1.6.0
type MasterDBHandlerFunctions interface {
CheckTableExistance() (bool, error)
CreateTable() error
DropTable() error
UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error)
SelectMaster() (*model.Master, error)
}
MasterDBHandlerFunctions defines the interface for Master database operations.
type QueuerListener ¶
func NewQueuerDBListener ¶
func NewQueuerDBListener(dbConfig *helper.DatabaseConfiguration, channel string) (*QueuerListener, error)
NewQueuerDBListener creates a new QueuerListener instance. It initializes a PostgreSQL listener for the specified channel using the provided database configuration. The listener will automatically reconnect if the connection is lost, with a 10-second timeout and a 1-minute interval for reconnection attempts. If an error occurs during the creation of the listener, it returns an error. The listener will log any errors encountered during listening.
func (*QueuerListener) Listen ¶
func (l *QueuerListener) Listen(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string))
Listen listens for events on the specified channel and processes them. It takes a context for cancellation, a cancel function to stop listening, and a notifyFunction that will be called with the event data when an event is received. The listener will check the connection every 90 seconds and will cancel the context if an error occurs during the ping. The notifyFunction will be called in a separate goroutine to avoid blocking the listener. If the context is done, the listener will stop listening and return. It will log any errors encountered during the ping operation.
func (*QueuerListener) ListenWithTimeout ¶ added in v1.9.0
func (l *QueuerListener) ListenWithTimeout(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string), pingTimeout time.Duration)
ListenWithTimeout is similar to Listen but allows configuring the ping timeout interval. This is primarily used for testing to avoid waiting the full 90 seconds.
type WorkerDBHandler ¶
type WorkerDBHandler struct {
// contains filtered or unexported fields
}
WorkerDBHandler implements WorkerDBHandlerFunctions and holds the database connection.
func NewWorkerDBHandler ¶
func NewWorkerDBHandler(dbConnection *helper.Database, withTableDrop bool) (*WorkerDBHandler, error)
NewWorkerDBHandler creates a new instance of WorkerDBHandler. It initializes the database connection and optionally drops the existing worker table. If withTableDrop is true, it will drop the existing worker table before creating a new one.
func (WorkerDBHandler) CheckTableExistance ¶
func (r WorkerDBHandler) CheckTableExistance() (bool, error)
CheckTableExistance checks if the 'worker' table exists in the database. It returns true if the table exists, otherwise false.
func (WorkerDBHandler) CreateTable ¶
func (r WorkerDBHandler) CreateTable() error
CreateTable creates the 'worker' table in the database if it doesn't already exist. It defines the structure of the table with appropriate columns and types. If the table already exists, it will not create it again. It also creates necessary indexes for efficient querying.
func (WorkerDBHandler) DeleteStaleWorkers ¶ added in v1.24.0
func (r WorkerDBHandler) DeleteStaleWorkers(deleteThreshold time.Duration) (int, error)
DeleteStaleWorkers deletes workers that have been in STOPPED status for longer than the deleteThreshold. It returns the number of workers that were deleted.
func (WorkerDBHandler) DeleteWorker ¶
func (r WorkerDBHandler) DeleteWorker(rid uuid.UUID) error
DeleteWorker deletes a worker record from the database based on its RID. It removes the worker from the database and returns an error if the deletion fails.
func (WorkerDBHandler) DropTable ¶
func (r WorkerDBHandler) DropTable() error
DropTable drops the 'worker' table from the database. It will remove the table and all its data. This operation is irreversible, so it should be used with caution. It is used during testing or when resetting the database schema. If the table does not exist, it will not return an error.
func (WorkerDBHandler) InsertWorker ¶
InsertWorker inserts a new worker record with name, options and max concurrency into the database. It returns the newly created worker with an automatically generated RID. If the insertion fails, it returns an error.
func (WorkerDBHandler) SelectAllConnections ¶ added in v1.9.0
func (r WorkerDBHandler) SelectAllConnections() ([]*model.Connection, error)
SelectAllConnections retrieves all active connections from the database. It returns a slice of Connection records. If the query fails, it returns an error.
func (WorkerDBHandler) SelectAllWorkers ¶
SelectAllWorkers retrieves a paginated list of all workers. It returns a slice of worker records, ordered by creation date in descending order. It returns workers that were created before the specified lastID, or the newest workers if lastID is 0.
func (WorkerDBHandler) SelectAllWorkersBySearch ¶
func (r WorkerDBHandler) SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error)
SelectAllWorkersBySearch retrieves a paginated list of workers, filtered by search string. It searches across 'queue_name', 'name', and 'status' fields. The search is case-insensitive and uses ILIKE for partial matches. It returns a slice of worker records, ordered by creation date in descending order. It returns workers that were created before the specified lastID, or the newest workers if lastID is 0.
func (WorkerDBHandler) SelectWorker ¶
SelectWorker retrieves a single worker record from the database based on its RID. It returns the worker record. If the worker is not found or an error occurs during the query, it returns an error.
func (WorkerDBHandler) UpdateStaleWorkers ¶ added in v1.9.0
func (r WorkerDBHandler) UpdateStaleWorkers(staleThreshold time.Duration) (int, error)
UpdateStaleWorkers updates all stale workers to STOPPED status based on the provided threshold. It returns the number of workers that were updated. Workers are considered stale if they have READY or RUNNING status and their updated_at timestamp is older than the threshold.
func (WorkerDBHandler) UpdateWorker ¶
UpdateWorker updates an existing worker record in the database based on its RID. It updates the worker's name, options, available tasks, next interval functions, max concurrency, and status. It returns the updated worker record with an automatically updated updated_at timestamp. If the update fails, it returns an error.
type WorkerDBHandlerFunctions ¶
type WorkerDBHandlerFunctions interface {
CheckTableExistance() (bool, error)
CreateTable() error
DropTable() error
InsertWorker(worker *model.Worker) (*model.Worker, error)
UpdateWorker(worker *model.Worker) (*model.Worker, error)
UpdateStaleWorkers(staleThreshold time.Duration) (int, error)
DeleteWorker(rid uuid.UUID) error
DeleteStaleWorkers(deleteThreshold time.Duration) (int, error)
SelectWorker(rid uuid.UUID) (*model.Worker, error)
SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error)
SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error)
// Connections
SelectAllConnections() ([]*model.Connection, error)
}
WorkerDBHandlerFunctions defines the interface for Worker database operations.