queuer

v1.51.0
Published: Jan 8, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

README

queuer

Queueing package based on postgres written in Go.

💡 Goal of this package

This queuer is meant to be as easy as possible to use. Task functions need no specific signature (only an error as the last output parameter, if you want to give back an error), the setup is easy, and it is still fast.
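To illustrate the "error as the last output parameter" rule, here is a minimal sketch of how such a check could be done with reflection. It is not the package's implementation; `returnsError`, `callTask` and the two example tasks are hypothetical names used only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// returnsError reports whether fn is a function whose last
// return value implements error.
func returnsError(fn interface{}) bool {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func || t.NumOut() == 0 {
		return false
	}
	return t.Out(t.NumOut() - 1).Implements(errorType)
}

// callTask calls fn with args and returns the error found in the last
// return value, if the function declares one. It panics (like
// reflect.Value.Call) if fn is not a function or args do not match.
func callTask(fn interface{}, args ...interface{}) error {
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	out := reflect.ValueOf(fn).Call(in)
	if !returnsError(fn) {
		return nil
	}
	if err, ok := out[len(out)-1].Interface().(error); ok {
		return err
	}
	return nil
}

// Two hypothetical tasks with different signatures.
func add(a, b int) int { return a + b }

func parsePositive(n int) (int, error) {
	if n < 0 {
		return 0, errors.New("negative input")
	}
	return n, nil
}

func main() {
	fmt.Println(callTask(add, 1, 2))         // task without an error output
	fmt.Println(callTask(parsePositive, -1)) // task that reports an error
}
```

The point of the sketch is that both task shapes can be handled by the same caller, which is why no fixed function signature is needed.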

The job table contains only queued, scheduled and running tasks. The ended jobs (succeeded, cancelled, failed) are moved to a timescaleDB table.


🛠️ Installation

To integrate the queuer package into your Go project, use the standard go get command:

go get github.com/siherrmann/queuer

To use the package you also need a running postgres database with the timescaleDB extension. You can use the docker-compose.yml file in the example folder or start a Docker container with the timescale/timescaledb:latest-pg17 image.


🚀 Getting started

The full initialisation is (in the easiest case):

// Create a new queuer instance
q := queuer.NewQueuer("exampleWorker", 3)

// Add a task to the queuer
q.AddTask(ExampleTask)

// Start the queuer
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q.Start(ctx, cancel)

That's easy, right? Adding a job is just as easy:

// Add a job to the queue (nil: no keyed parameters; 5 and "12" are positional parameters)
_, err := q.AddJob(ExampleTask, nil, 5, "12")
if err != nil {
    log.Fatalf("Error adding job: %v", err)
}

During initialisation the queuer checks whether the necessary database tables exist and creates them if they don't. The database is configured with these environment variables:

QUEUER_DB_HOST=localhost
QUEUER_DB_PORT=5432
QUEUER_DB_DATABASE=postgres
QUEUER_DB_USERNAME=username
QUEUER_DB_PASSWORD=password1234
QUEUER_DB_SCHEMA=public
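As a rough illustration of how these variables could be consumed, the following sketch reads them and fails on the first missing one. The `dbConfig` struct and `loadDBConfig` function are hypothetical and are not the package's `helper.DatabaseConfiguration`; the sketch treats all six variables as required, as the NewQueuerWithDB documentation states.

```go
package main

import (
	"fmt"
	"os"
)

// dbConfig mirrors the environment variables listed above.
// It is a hypothetical struct used only for this illustration.
type dbConfig struct {
	Host, Port, Database, Username, Password, Schema string
}

// loadDBConfig reads the QUEUER_DB_* variables through getenv and
// returns an error naming the first variable that is missing.
func loadDBConfig(getenv func(string) string) (dbConfig, error) {
	var cfg dbConfig
	fields := []struct {
		key string
		dst *string
	}{
		{"QUEUER_DB_HOST", &cfg.Host},
		{"QUEUER_DB_PORT", &cfg.Port},
		{"QUEUER_DB_DATABASE", &cfg.Database},
		{"QUEUER_DB_USERNAME", &cfg.Username},
		{"QUEUER_DB_PASSWORD", &cfg.Password},
		{"QUEUER_DB_SCHEMA", &cfg.Schema},
	}
	for _, f := range fields {
		v := getenv(f.key)
		if v == "" {
			return dbConfig{}, fmt.Errorf("missing environment variable %s", f.key)
		}
		*f.dst = v
	}
	return cfg, nil
}

func main() {
	cfg, err := loadDBConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Printf("connecting to %s:%s/%s (schema %s)\n", cfg.Host, cfg.Port, cfg.Database, cfg.Schema)
}
```

Passing the lookup function as a parameter keeps the sketch easy to test with a plain map instead of the real process environment.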

You can find a full example (the same as above plus a more detailed example) in the example folder. In there you'll also find a docker-compose file with the timescaleDB/postgres service that is needed for running the queuer (it's just postgres with an extension).


NewQueuer

NewQueuer is a convenience constructor that creates a new Queuer instance using default database configuration derived from environment variables. It acts as a wrapper around NewQueuerWithDB. The encryption key for the database is taken from the QUEUER_ENCRYPTION_KEY environment variable; if not provided, it defaults to unencrypted results.

NewQueuerWithDB is the primary constructor for creating a new Queuer instance. It allows for explicit database configuration and encryption key specification, and initializes all necessary components, including database handlers, internal event listeners, and the worker.

func NewQueuer(name string, maxConcurrency int, options ...*model.OnError) *Queuer

func NewQueuerWithDB(name string, maxConcurrency int, encryptionKey string, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuer
  • name: A string identifier for this queuer instance.
  • maxConcurrency: An int specifying the maximum number of jobs this queuer can process concurrently.
  • encryptionKey: A string used for encrypting sensitive job data in the database. If empty, results will be stored unencrypted.
  • dbConfig: An optional *helper.DatabaseConfiguration. If nil, the configuration will be loaded from environment variables.
  • options: Optional OnError configurations to apply to the worker.

This function performs the following setup:

  • Initializes a logger.
  • Sets up the database connection using the provided dbConfig or environment variables.
  • Creates JobDBHandler, WorkerDBHandler, and MasterDBHandler instances for database interactions.
  • Initializes internal core.Listener instances for jobInsert, jobUpdate, and jobDelete events.
  • Creates and inserts a new model.Worker into the database based on the provided name, maxConcurrency, and options.
  • If any critical error occurs during this initialization (e.g., database connection failure, worker creation error), the function will log a panic error and exit the program. It returns a pointer to the newly configured Queuer instance.

Start

The Start method initiates the operational lifecycle of the Queuer. It sets up the main context, initializes database listeners, and begins the job processing and polling loops in a dedicated goroutine.

func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, masterSettings ...*model.MasterSettings)
  • ctx: The parent context.Context for the queuer's operations. This context will control the overall lifetime of the queuer.
  • cancel: The context.CancelFunc associated with the provided ctx. This function should be called to gracefully stop the queuer.
  • masterSettings: Optional central settings that are applied if the current worker becomes the master.

Upon calling Start:

  • It performs a basic check to ensure internal listeners are initialized.
  • DB listeners and broadcasters are created to listen to job events (inserts, updates, deletes).
  • It starts a poller to periodically poll the database for new jobs to process (5 minute interval).
  • It signals its readiness via an internal channel, ensuring the Start method returns only when the core loops are active.
  • If MasterSettings are given it sets the current worker as master if none is active. If the current worker is the master it starts a ticker that updates the master entry, else it starts a ticker that checks for a missing master. If no MasterSettings are given, no ticker gets started.

The method includes a timeout mechanism (5 seconds) to detect if the queuer fails to start its internal processes promptly, panicking if the timeout is exceeded. If the queuer is not properly initialized (created by calling NewQueuer), or if there's an error creating the database listeners, the function will panic.
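The readiness signal with a timeout can be pictured with a small channel-based sketch. This is only an illustration of the pattern, not the queuer's code: `waitReady` is a hypothetical helper, and it returns an error where Start would panic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitReady starts run in its own goroutine and blocks until run closes
// the ready channel or the timeout elapses.
func waitReady(timeout time.Duration, run func(ready chan<- struct{})) error {
	ready := make(chan struct{})
	go run(ready)

	select {
	case <-ready:
		return nil
	case <-time.After(timeout):
		return errors.New("not ready before timeout")
	}
}

func main() {
	err := waitReady(5*time.Second, func(ready chan<- struct{}) {
		// Listeners and loops would be set up here before signalling.
		close(ready)
	})
	fmt.Println("start error:", err)
}
```

Closing the channel rather than sending on it means the signal cannot block, even if the caller has already given up waiting.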


StartWithoutWorker

The StartWithoutWorker method provides a way to start the Queuer instance without an active worker. This is particularly useful for scenarios where you need to interact with the job queue (e.g., add jobs, check job status) but don't intend for this specific instance to actively process them. It also lets you have a single service that can become the master, so updating the MasterSettings only requires restarting this service. A further (very small) benefit is that the other services don't run a ticker for updating or becoming the master.

func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, masterSettings ...*model.MasterSettings)
  • ctx: The parent context.Context for the queuer's operations.
  • cancel: The context.CancelFunc associated with the provided ctx.
  • withoutListeners: A bool flag. If true, the database.NewQueuerDBListener instances for job and job_archive tables will not be created.
  • masterSettings: Optional central settings that are applied if the current worker becomes the master.

Stop

The Stop method gracefully shuts down the Queuer instance, releasing resources and ensuring ongoing operations are properly concluded.

func (q *Queuer) Stop() error

The Stop method cancels all jobs, closes db listeners and returns an error if any step of the stopping process encounters an issue. Note: This method can only be used to stop the current worker instance that the code is running in. To stop other workers, use StopWorker or StopWorkerGracefully.


StopWorker

The StopWorker method immediately stops a worker by setting its status to STOPPED. This will cancel all running jobs on that worker.

func (q *Queuer) StopWorker(workerRID uuid.UUID) error
  • workerRID: The uuid.UUID identifying the worker to stop.

When a worker is stopped:

  • The worker status is immediately set to STOPPED in the database
  • The heartbeat ticker detects the STOPPED status and calls Stop() on that worker
  • All running jobs on that worker are cancelled immediately
  • The worker will no longer accept new jobs

This method is useful for immediately shutting down a worker, for example in emergency situations or when you need to take a worker offline quickly.


StopWorkerGracefully

The StopWorkerGracefully method gracefully stops a worker by setting its status to STOPPING. This allows currently running jobs to complete before the worker shuts down.

func (q *Queuer) StopWorkerGracefully(workerRID uuid.UUID) error
  • workerRID: The uuid.UUID identifying the worker to stop gracefully.

When a worker is stopped gracefully:

  • The worker status is set to STOPPING in the database
  • The heartbeat ticker detects the STOPPING status and sets maxConcurrency to 0
  • Currently running jobs are allowed to complete normally
  • No new jobs will be accepted by this worker
  • Once all running jobs have finished, the worker status is automatically set to STOPPED and the worker shuts down

This method is ideal for maintenance scenarios where you want to ensure all in-progress work completes before shutting down the worker.
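The difference to an immediate stop can be sketched with an in-memory stand-in for a worker. Everything here is hypothetical (the `worker` type, `tryStart`, `stopGracefully`); the real queuer coordinates this through the worker status in the database and the heartbeat ticker.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical in-memory stand-in for a queuer worker.
type worker struct {
	mu      sync.Mutex
	status  string // "RUNNING", "STOPPING" or "STOPPED"
	running sync.WaitGroup
}

// tryStart runs job in a goroutine unless the worker is stopping or stopped.
func (w *worker) tryStart(job func()) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.status != "RUNNING" {
		return false
	}
	w.running.Add(1)
	go func() {
		defer w.running.Done()
		job()
	}()
	return true
}

// stopGracefully refuses new jobs, waits for the running ones to finish,
// and only then marks the worker as STOPPED.
func (w *worker) stopGracefully() {
	w.mu.Lock()
	w.status = "STOPPING"
	w.mu.Unlock()

	w.running.Wait()

	w.mu.Lock()
	w.status = "STOPPED"
	w.mu.Unlock()
}

func main() {
	w := &worker{status: "RUNNING"}
	release := make(chan struct{})
	finished := false

	w.tryStart(func() { <-release; finished = true })
	close(release) // let the running job complete
	w.stopGracefully()

	fmt.Println(w.status, finished, w.tryStart(func() {}))
}
```

An immediate stop would instead cancel the running jobs, for example through a context, rather than waiting for them.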


Add Task

The AddTask method registers a new job task with the queuer. A task is the actual function that will be executed when a job associated with it is processed.

func (q *Queuer) AddTask(task interface{}) *model.Task

func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Task
  • task: An interface{} representing the function that will serve as the job's executable logic. If no name is given, the queuer automatically derives a name for this task from its function name (e.g., main.MyTaskFunction). The derived name must be unique.
  • name: A string specifying the custom name for this task. This name must be unique within the queuer's tasks.

This method handles the registration of a task, making the worker able to pick up and execute a job of this task type. It also updates the worker's available tasks in the database. The task should be added before starting the queuer. If there's an issue during task creation or database update, the program will panic.
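How a name such as main.MyTaskFunction can be derived from a function value is shown in the sketch below, using only the standard reflect and runtime packages. This is an assumption about the general technique, not a copy of the package's helper; `funcName` and `ExampleTask` are hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// funcName returns the fully qualified name of fn,
// or an error if fn is not a function.
func funcName(fn interface{}) (string, error) {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		return "", fmt.Errorf("expected a function, got %T", fn)
	}
	f := runtime.FuncForPC(v.Pointer())
	if f == nil {
		return "", fmt.Errorf("cannot resolve function name")
	}
	return f.Name(), nil
}

// ExampleTask is a hypothetical task function.
func ExampleTask(a int, b string) (int, error) { return a + len(b), nil }

func main() {
	name, err := funcName(ExampleTask)
	// The prefix is the package path, so the exact value depends on
	// where the function is defined.
	fmt.Println(name, err)
}
```

Because the derived name contains the package path, moving a task function to another package changes its name, which is one reason to prefer AddTaskWithName for tasks that must keep a stable name.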


Add NextIntervalFunc

The AddNextIntervalFunc method registers a custom function that determines the next execution time for scheduled jobs. This is useful for implementing complex scheduling logic beyond simple fixed intervals.

func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker

func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Worker
  • nif: An instance of model.NextIntervalFunc, which is a function type defining custom logic for calculating the next interval. The queuer will automatically derive a name for this function. The derived name must be unique if no name is given.
  • name: A string specifying the custom name for this NextIntervalFunc. This name must be unique within the queuer's NextIntervalFuncs.

This method adds the provided NextIntervalFunc to the queuer's available functions, making it usable for jobs with custom scheduling requirements. It updates the worker's configuration in the database. If nif is nil, if the function name cannot be derived, or if a function with the same name already exists, the program will panic.
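As an illustration of custom scheduling logic, the sketch below computes "09:00 in a fixed time zone, one day later per run". The function shape (schedule start and current run count in, next execution time out) is an assumption made for this example; check the actual definition of model.NextIntervalFunc before using it. `nextIntervalFunc`, `dailyAtNine`, `exampleZone` and `exampleStart` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// nextIntervalFunc is an ASSUMED shape for a scheduling function:
// it receives the schedule start and the current run count and
// returns the time of the next execution.
type nextIntervalFunc func(start time.Time, currentCount int) time.Time

// dailyAtNine returns a function that schedules a run at 09:00 in loc
// on the day that lies currentCount days after start (seen in loc).
func dailyAtNine(loc *time.Location) nextIntervalFunc {
	return func(start time.Time, currentCount int) time.Time {
		day := start.In(loc).AddDate(0, 0, currentCount)
		return time.Date(day.Year(), day.Month(), day.Day(), 9, 0, 0, 0, loc)
	}
}

// Example values used by main.
var (
	exampleZone  = time.FixedZone("UTC+2", 2*60*60)
	exampleStart = time.Date(2026, time.January, 8, 22, 30, 0, 0, time.UTC)
)

func main() {
	next := dailyAtNine(exampleZone)
	for count := 1; count <= 3; count++ {
		fmt.Println(next(exampleStart, count).Format(time.RFC3339))
	}
}
```

A fixed Interval could not express this, because the wall-clock time of each run depends on the time zone rather than on a constant duration.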


Worker Options

The OnError struct defines how a worker should handle errors when processing a job. This allows for configurable retry behavior.

type OnError struct {
    Timeout      float64 `json:"timeout"`
    MaxRetries   int     `json:"max_retries"`
    RetryDelay   float64 `json:"retry_delay"`
    RetryBackoff string  `json:"retry_backoff"`
}
  • Timeout: The maximum time (in seconds) allowed for a single attempt of a job. If the job exceeds this duration, it's considered to have timed out.
  • MaxRetries: The maximum number of times a job will be retried after a failure.
  • RetryDelay: The initial delay (in seconds) before the first retry attempt. This delay can be modified by the RetryBackoff strategy.
  • RetryBackoff: Specifies the strategy used to increase the delay between subsequent retries.

Retry Backoff Strategies

The RetryBackoff constants define the available strategies for increasing retry delays:

const (
    RETRY_BACKOFF_NONE        = "none"
    RETRY_BACKOFF_LINEAR      = "linear"
    RETRY_BACKOFF_EXPONENTIAL = "exponential"
)
  • RETRY_BACKOFF_NONE: No backoff. The RetryDelay remains constant for all retries.
  • RETRY_BACKOFF_LINEAR: The retry delay increases linearly with each attempt (e.g., delay, 2*delay, 3*delay).
  • RETRY_BACKOFF_EXPONENTIAL: The retry delay increases exponentially with each attempt (e.g., delay, delay*2, delay*2*2).
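The three strategies can be written down as a small function. This is a sketch that follows the examples in the list above (delay, 2*delay, 3*delay and delay, delay*2, delay*2*2); the constant names and `retryDelay` are local to this example and the package may compute the delays differently in detail.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Local copies of the strategy values listed above.
const (
	backoffNone        = "none"
	backoffLinear      = "linear"
	backoffExponential = "exponential"
)

// retryDelay returns the wait before retry number attempt (1-based)
// for a base delay given in seconds.
func retryDelay(baseSeconds float64, backoff string, attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	factor := 1.0 // "none" and unknown strategies keep the delay constant
	switch backoff {
	case backoffLinear:
		factor = float64(attempt)
	case backoffExponential:
		factor = math.Pow(2, float64(attempt-1))
	}
	return time.Duration(baseSeconds * factor * float64(time.Second))
}

func main() {
	for _, strategy := range []string{backoffNone, backoffLinear, backoffExponential} {
		for attempt := 1; attempt <= 3; attempt++ {
			fmt.Printf("%-12s retry %d: %v\n", strategy, attempt, retryDelay(1, strategy, attempt))
		}
	}
}
```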

Job options

The Options struct allows you to define specific behaviors for individual jobs, overriding default worker settings where applicable.

type Options struct {
    OnError  *OnError
    Schedule *Schedule
}
  • OnError: An optional OnError configuration that will override the worker's default error handling for this specific job. This allows you to define unique retry logic per job.
  • Schedule: An optional Schedule configuration for jobs that need to be executed at recurring intervals.

OnError for jobs

The OnError struct for jobs is identical to the one used for worker options, allowing granular control over error handling for individual jobs.

type OnError struct {
    Timeout      float64 `json:"timeout"`
    MaxRetries   int     `json:"max_retries"`
    RetryDelay   float64 `json:"retry_delay"`
    RetryBackoff string  `json:"retry_backoff"`
}

Schedule

The Schedule struct is used to define recurring jobs.

type Schedule struct {
    Start        time.Time       `json:"start"`
    MaxCount     int             `json:"max_count"`
    Interval     time.Duration   `json:"interval"`
    NextInterval string          `json:"next_interval"`
}
  • Start: The initial time at which the scheduled job should first run.
  • MaxCount: The maximum number of times the job should be executed. A value of 0 indicates an indefinite number of repetitions (run forever).
  • Interval: The duration between consecutive executions of the scheduled job.
  • NextInterval: Function name of the NextIntervalFunc returning the time of the next execution of the scheduled job. Either Interval or NextInterval has to be set if MaxCount is 0 or greater than 1.
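The rule in the last bullet and the effect of a fixed Interval can be sketched as follows. The local `schedule` type copies the fields shown above; `validate` and `runTimes` are hypothetical helpers for illustration, not methods of model.Schedule.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// schedule copies the fields of the Schedule struct shown above.
type schedule struct {
	Start        time.Time
	MaxCount     int
	Interval     time.Duration
	NextInterval string
}

// validate applies the rule from the field list: a schedule that repeats
// (MaxCount 0 or greater than 1) needs Interval or NextInterval.
func (s schedule) validate() error {
	repeats := s.MaxCount == 0 || s.MaxCount > 1
	if repeats && s.Interval <= 0 && s.NextInterval == "" {
		return errors.New("repeating schedule needs Interval or NextInterval")
	}
	return nil
}

// runTimes returns the first n planned run times for a fixed Interval,
// capped at MaxCount when MaxCount is greater than 0.
func (s schedule) runTimes(n int) []time.Time {
	if n < 0 {
		n = 0
	}
	if s.MaxCount > 0 && n > s.MaxCount {
		n = s.MaxCount
	}
	times := make([]time.Time, 0, n)
	for i := 0; i < n; i++ {
		times = append(times, s.Start.Add(time.Duration(i)*s.Interval))
	}
	return times
}

func main() {
	s := schedule{Start: time.Now().Add(10 * time.Second), Interval: 5 * time.Second, MaxCount: 3}
	if err := s.validate(); err != nil {
		fmt.Println("invalid schedule:", err)
		return
	}
	for i, t := range s.runTimes(10) {
		fmt.Printf("run %d at %s\n", i+1, t.Format(time.RFC3339))
	}
}
```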

🖥️ CLI Tool

The queuer package includes a small command-line interface (CLI) tool for monitoring your job queues. The CLI provides easy access to view jobs, workers, connections, and archived data.

Installation

The CLI tool can be built from the cli directory:

cd cli
go build -o queuer .

Available Commands

The CLI tool supports the following main commands:

Core Commands

  • list - List queuer resources with pagination support

    • list job - List active jobs (queued, scheduled, running)
    • list worker - List registered workers and their status
    • list connection - List active database connections
    • list jobArchive - List completed/archived jobs
  • get - Get detailed information about a specific resource by RID

    • get job --rid <RID> - Get details of a specific job
    • get worker --rid <RID> - Get worker information and status
    • get jobArchive --rid <RID> - Get archived job details
  • cancel - Cancel operations on specific resources by RID

    • cancel job --rid <RID> - Cancel a running or queued job
    • cancel worker --rid <RID> - Cancel/shutdown a worker

Utility Commands

  • version - Display version information of the Queuer CLI
  • completion - Generate autocompletion scripts for various shells
  • help - Display help information for any command

Global Flags

  • -v, --verbose - Enable verbose output for detailed information
  • -h, --help - Show help for any command

Pagination Support

List commands support pagination through:

  • --lastId <int> - Last ID from previous call for pagination
  • --limit <int> - Maximum number of entries to return (default: 10)

Examples

# List workers with pagination
queuer list worker --limit 5

# Get specific job details
queuer get job --rid "550e8400-e29b-41d4-a716-446655440000"

# Cancel a running job
queuer cancel job --rid "550e8400-e29b-41d4-a716-446655440000"

# List jobs with pagination
queuer list job --lastId 100 --limit 20
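The --lastId and --limit flags describe cursor-style pagination: each call passes the last ID it has seen and receives the next batch. The sketch below shows the idea on an in-memory slice. The ascending ID order and the `job` and `page` names are assumptions for this illustration; the order used by the CLI and the database queries is not stated here.

```go
package main

import (
	"fmt"
	"sort"
)

// job is a hypothetical, minimal record with a numeric ID.
type job struct {
	ID   int
	Name string
}

// page returns up to limit jobs whose ID is greater than lastID,
// ordered by ascending ID. lastID 0 starts from the beginning.
func page(jobs []job, lastID, limit int) []job {
	sorted := append([]job(nil), jobs...) // copy so the input stays untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ID < sorted[j].ID })

	// Find the first entry after the cursor.
	start := sort.Search(len(sorted), func(i int) bool { return sorted[i].ID > lastID })
	if limit < 0 {
		limit = 0
	}
	end := start + limit
	if end > len(sorted) {
		end = len(sorted)
	}
	return sorted[start:end]
}

func main() {
	jobs := []job{{3, "c"}, {1, "a"}, {2, "b"}, {5, "e"}}
	lastID := 0
	for {
		p := page(jobs, lastID, 2)
		if len(p) == 0 {
			break
		}
		fmt.Println(p)
		lastID = p[len(p)-1].ID // cursor for the next call
	}
}
```

Compared to offset-based paging, a cursor keeps pages stable while new rows are inserted between calls.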

Usage

For detailed information about any command, use the built-in help:

queuer --help                    # General help
queuer list --help              # Help for list commands
queuer get job --help           # Help for specific subcommands

The CLI tool uses the same database configuration environment variables as the main queuer package.


⭐ Features

  • Insert job batches using the COPY FROM postgres feature.
  • Insert a job in a transaction to roll back if, e.g., the step after job insertion fails.
  • Panic recovery for all running jobs.
  • Error handling by checking last output parameter for error.
  • Multiple queuers can be started in different microservices while maintaining job start order and isolation.
  • Scheduled and periodic jobs.
  • Easy functions to get jobs and workers.
  • Listener functions for job updates and deletion (ended jobs).
  • Helper function to listen for a specific finished job.
  • Retry mechanism for ended jobs which creates a new job with the same parameters.
  • Custom NextInterval functions to address custom needs for scheduling (e.g. scheduling with timezone offset).
  • Automatic master worker that sets retention and other central settings. Automatic switch to a new master if the old worker stops.
  • Heartbeat for all workers and automatic stale worker detection and cancelation by the master.
  • Encryption support for sensitive job data stored in the database.
  • Command-line interface (CLI) tool for monitoring job queues.
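Panic recovery for running jobs can be pictured with a deferred recover that turns a panic into an ordinary error. This is a generic Go sketch of the technique, with a hypothetical `runRecovered` helper, not the package's own code.

```go
package main

import (
	"errors"
	"fmt"
)

// runRecovered calls task and converts a panic inside it into an error,
// so a crashing task cannot take down the calling goroutine.
func runRecovered(task func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	return task()
}

func main() {
	fmt.Println(runRecovered(func() error { return nil }))
	fmt.Println(runRecovered(func() error { return errors.New("regular failure") }))
	fmt.Println(runRecovered(func() error { panic("boom") }))
}
```

With this shape a panicking job can be treated like any other failed job, including retries according to its OnError settings.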

Documentation

Types

type Queuer

type Queuer struct {
	DB                 *sql.DB
	JobPollInterval    time.Duration
	WorkerPollInterval time.Duration
	RetentionArchive   time.Duration
	// contains filtered or unexported fields
}

Queuer represents the main queuing system. It manages job scheduling, execution, and error handling. It provides methods to start, stop, and manage jobs and workers. It also handles database connections and listeners for job events.

func NewQueuer

func NewQueuer(name string, maxConcurrency int, options ...*model.OnError) *Queuer

NewQueuer creates a new Queuer instance with the given name and max concurrency. It wraps NewQueuerWithDB to initialize the queuer without an external db config and encryption key. The encryption key for the database is taken from the QUEUER_ENCRYPTION_KEY environment variable. If it is not provided, results are stored unencrypted.

func NewQueuerWithDB

func NewQueuerWithDB(name string, maxConcurrency int, encryptionKey string, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuer

NewQueuerWithDB creates a new Queuer instance with the given name and max concurrency. It initializes the database connection and worker. If options are provided, it creates a worker with those options.

It takes the db configuration from environment variables if dbConfig is nil:

  • QUEUER_DB_HOST (required)
  • QUEUER_DB_PORT (required)
  • QUEUER_DB_DATABASE (required)
  • QUEUER_DB_USERNAME (required)
  • QUEUER_DB_PASSWORD (required)
  • QUEUER_DB_SCHEMA (required)
  • QUEUER_DB_SSLMODE (optional, defaults to "require")

If the encryption key is empty, it defaults to unencrypted results.

If any error occurs during initialization, it logs a panic error and exits the program. It returns a pointer to the newly created Queuer instance.

func NewStaticQueuer (added in v1.9.0)

func NewStaticQueuer(logLevel slog.Leveler, dbConfig *helper.DatabaseConfiguration) *Queuer

NewStaticQueuer creates a new Queuer instance without a worker. It initializes the database connection and other necessary components. If any error occurs during initialization, it logs a panic error and exits the program. It returns a pointer to the newly created Queuer instance. This queuer instance does not listen to the db nor does it run jobs. It is primarily used for static operations like adding jobs, getting job status etc.

func (*Queuer) AddJob

func (q *Queuer) AddJob(task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)

AddJob adds a job to the queue with the given task and parameters. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.

func (*Queuer) AddJobTx

func (q *Queuer) AddJobTx(tx *sql.Tx, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)

AddJobTx adds a job to the queue with the given task and parameters within a transaction. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.

func (*Queuer) AddJobWithOptions

func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)

AddJobWithOptions adds a job with the given task, options, and parameters. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you).

It returns the created job or an error if something goes wrong. The options parameter allows you to specify additional options for the job, such as scheduling, retry policies, and error handling. If options are nil, the worker's default options will be used.

Example usage:

func AddJobExample(queuer *Queuer, param1 string, param2 int) {
	options := &model.Options{
		OnError: &model.OnError{
			Timeout:      5,
			MaxRetries:   2, // Runs 3 times, the first run is not a retry
			RetryDelay:   1,
			RetryBackoff: model.RETRY_BACKOFF_NONE,
		},
		Schedule: &model.Schedule{
			Start:    time.Now().Add(10 * time.Second),
			Interval: 5 * time.Second,
			MaxCount: 3,
		},
	}
	// nil: no keyed parameters; param1 and param2 are passed positionally
	job, err := queuer.AddJobWithOptions(options, myTaskFunction, nil, param1, param2)
	if err != nil {
		log.Fatalf("Failed to add job: %v", err)
	}
	log.Printf("Added job: %+v", job)
}

func (*Queuer) AddJobWithOptionsTx

func (q *Queuer) AddJobWithOptionsTx(tx *sql.Tx, options *model.Options, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error)

AddJobWithOptionsTx adds a job with the given task, options, and parameters within a transaction. As a task you can either pass a function or a string with the task name (necessary if you want to use a task with a name set by you). It returns the created job or an error if something goes wrong.

func (*Queuer) AddJobs

func (q *Queuer) AddJobs(batchJobs []model.BatchJob) error

AddJobs adds a batch of jobs to the queue. It takes a slice of BatchJob, which contains the task, options, and parameters for each job. It returns an error if something goes wrong during the process.

func (*Queuer) AddNextIntervalFunc

func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker

AddNextIntervalFunc adds a NextIntervalFunc to the worker's available next interval functions. It takes a NextIntervalFunc and adds it to the worker's AvailableNextIntervalFuncs. The function name is derived from the NextIntervalFunc interface using helper.GetTaskNameFromInterface. The function is meant to be used before starting the Queuer to ensure that the worker has access to the function. It checks if the function is nil or already exists in the worker's available next interval functions.

If the function is nil or already exists, it panics. It returns the updated worker after adding the function.

func (*Queuer) AddNextIntervalFuncWithName

func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Worker

AddNextIntervalFuncWithName adds a NextIntervalFunc to the worker's available next interval functions with a specific name. It takes a NextIntervalFunc and a name, checks if the function is nil or already exists in the worker's available next interval functions, and adds it to the worker's AvailableNextIntervalFuncs.

This function is useful when you want to add a NextIntervalFunc with a specific name that you control, rather than deriving it from the function itself. It ensures that the function is not nil and that the name does not already exist in the worker's available next interval functions.

If the function is nil or already exists, it panics. It returns the updated worker after adding the function with the specified name.

func (*Queuer) AddTask

func (q *Queuer) AddTask(task interface{}) *model.Task

AddTask adds a new task to the queuer. It creates a new task with the provided task interface, adds it to the worker's available tasks, and updates the worker in the database. The task name is automatically generated based on the task's function name (eg. main.TestTask).

If the task creation fails, it logs a panic error and exits the program. It returns the newly created task.

func (*Queuer) AddTaskWithName

func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Task

AddTaskWithName adds a new task with a specific name to the queuer. It creates a new task with the provided task interface and name, adds it to the worker's available tasks, and updates the worker in the database.

If task creation fails, it logs a panic error and exits the program. It returns the newly created task.

func (*Queuer) CancelAllJobsByWorker

func (q *Queuer) CancelAllJobsByWorker(workerRid uuid.UUID, entries int) error

CancelAllJobsByWorker cancels all jobs assigned to a specific worker by its RID. It retrieves all jobs assigned to the worker and cancels each one. It returns an error if something goes wrong during the process.

func (*Queuer) CancelJob

func (q *Queuer) CancelJob(jobRid uuid.UUID) (*model.Job, error)

CancelJob cancels a job with the given job RID. It retrieves the job from the database and cancels it. If the job is not found or already cancelled, it returns an error.

func (*Queuer) DeleteJob (added in v1.24.0)

func (q *Queuer) DeleteJob(jobRid uuid.UUID) error

DeleteJob deletes a job by its RID.

func (*Queuer) GetConnections (added in v1.9.0)

func (q *Queuer) GetConnections() ([]*model.Connection, error)

GetConnections retrieves all connections from the database.

func (*Queuer) GetJob

func (q *Queuer) GetJob(jobRid uuid.UUID) (*model.Job, error)

GetJob retrieves a job by its RID.

func (*Queuer) GetJobEnded (added in v1.9.0)

func (q *Queuer) GetJobEnded(jobRid uuid.UUID) (*model.Job, error)

GetJobEnded retrieves a job that has ended (succeeded, cancelled or failed) by its RID.

func (*Queuer) GetJobs

func (q *Queuer) GetJobs(lastId int, entries int) ([]*model.Job, error)

GetJobs retrieves all jobs in the queue.

func (*Queuer) GetJobsBySearch (added in v1.37.0)

func (q *Queuer) GetJobsBySearch(search string, lastId int, entries int) ([]*model.Job, error)

GetJobsBySearch retrieves jobs that match the given search term.

func (*Queuer) GetJobsByWorkerRID

func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error)

GetJobsByWorkerRID retrieves jobs assigned to a specific worker by its RID.

func (*Queuer) GetJobsEnded

func (q *Queuer) GetJobsEnded(lastId int, entries int) ([]*model.Job, error)

GetJobsEnded retrieves all jobs that have ended (succeeded, cancelled or failed).

func (*Queuer) GetJobsEndedBySearch (added in v1.37.0)

func (q *Queuer) GetJobsEndedBySearch(search string, lastId int, entries int) ([]*model.Job, error)

GetJobsEndedBySearch retrieves ended jobs that match the given search term.

func (*Queuer) GetWorker

func (q *Queuer) GetWorker(workerRid uuid.UUID) (*model.Worker, error)

GetWorker retrieves a worker by its RID (Resource Identifier).

func (*Queuer) GetWorkers

func (q *Queuer) GetWorkers(lastId int, entries int) ([]*model.Worker, error)

GetWorkers retrieves a list of workers starting from the lastId and returning the specified number of entries.

func (*Queuer) GetWorkersBySearch (added in v1.37.0)

func (q *Queuer) GetWorkersBySearch(search string, lastId int, entries int) ([]*model.Worker, error)

GetWorkersBySearch retrieves workers that match the given search term.

func (*Queuer) ListenForJobDelete

func (q *Queuer) ListenForJobDelete(notifyFunction func(data *model.Job)) error

ListenForJobDelete listens for job delete events and notifies the provided function when a job is deleted from the job table (i.e. when it has ended).

func (*Queuer) ListenForJobUpdate

func (q *Queuer) ListenForJobUpdate(notifyFunction func(data *model.Job)) error

ListenForJobUpdate listens for job update events and notifies the provided function when a job is updated.

func (*Queuer) ReaddJobFromArchive

func (q *Queuer) ReaddJobFromArchive(jobRid uuid.UUID) (*model.Job, error)

ReaddJobFromArchive re-adds a job from the archive to the queue.

func (*Queuer) Start

func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, masterSettings ...*model.MasterSettings)

Start starts the queuer by initializing the job listeners and starting the job poll ticker. If masterSettings are provided, it also starts the master poll ticker. If masterSettings contain 0 values, they are set to default values. It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. It runs the job processing in a separate goroutine and listens for job events.

Detailed steps include:

  1. Create job and job archive database listeners.
  2. Create broadcasters for job insert, update, and delete events.
  3. Start the job listeners to listen for job events.
  4. Start the job poll ticker to periodically check for new jobs.
  5. Wait for the queuer to be ready or log a panic error if it fails to start within 5 seconds.

func (*Queuer) StartWithoutWorker

func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, masterSettings ...*model.MasterSettings)

StartWithoutWorker starts the queuer without a worker. It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. Unless withoutListeners is true, it creates the database listeners for job events.

This version does not run the job processing. It is useful if you want to run a queuer instance in a separate service without a worker, for example to handle listening to job events and providing a central frontend.

func (*Queuer) Stop

func (q *Queuer) Stop() error

Stop stops the queuer by closing the job listeners, cancelling all queued and running jobs, and cancelling the context to stop the queuer.

func (*Queuer) StopWorker (added in v1.46.0)

func (q *Queuer) StopWorker(workerRid uuid.UUID) error

StopWorker sets the status of the specified worker to STOPPED, which cancels all running jobs on that worker.

func (*Queuer) StopWorkerGracefully (added in v1.46.0)

func (q *Queuer) StopWorkerGracefully(workerRid uuid.UUID) error

StopWorkerGracefully sets the worker's status to STOPPING to allow it to finish current tasks before stopping.

func (*Queuer) WaitForJobAdded

func (q *Queuer) WaitForJobAdded() *model.Job

WaitForJobAdded waits for any job to be added and returns it. It listens for job insert events and returns the job when it is added to the queue.

func (*Queuer) WaitForJobFinished

func (q *Queuer) WaitForJobFinished(jobRid uuid.UUID, timeout time.Duration) *model.Job

WaitForJobFinished waits for a job to finish and returns the job. It listens for job delete events and returns the job when it is finished. If timeout is reached before the job finishes, it returns nil.
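The "wait for one specific job or give up after a timeout" behaviour can be sketched with a channel of events and a timer. The `jobEvent` type and `waitForJob` function are hypothetical stand-ins; the real method listens to database notifications and returns a *model.Job.

```go
package main

import (
	"fmt"
	"time"
)

// jobEvent is a hypothetical stand-in for a job delete notification.
type jobEvent struct {
	ID     string
	Status string
}

// waitForJob returns the first event for the given id, or nil if the
// timeout elapses or the events channel is closed first.
func waitForJob(events <-chan jobEvent, id string, timeout time.Duration) *jobEvent {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return nil
			}
			if ev.ID == id {
				return &ev
			}
			// Events for other jobs are ignored.
		case <-timer.C:
			return nil
		}
	}
}

func main() {
	events := make(chan jobEvent, 2)
	events <- jobEvent{ID: "a", Status: "SUCCEEDED"}
	events <- jobEvent{ID: "b", Status: "FAILED"}

	fmt.Println(waitForJob(events, "b", time.Second))
	fmt.Println(waitForJob(events, "c", 50*time.Millisecond))
}
```

As with the documented method, callers have to check the result for nil before using it.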

Directories

Path Synopsis
cli
cmd
