Documentation
¶
Overview ¶
Package pool provides the core scheduling and orchestration engine for job execution.
It is responsible for:
- Concurrent execution of scheduled jobs with respect to MaxWorkers.
- Lifecycle management of jobs (waiting, running, completed, error).
- Periodic polling of job states at configurable intervals.
- Graceful shutdown and cleanup of job state and resources.
Each pool operates as an independent execution unit, isolated via context for cancellation and monitoring support.
Index ¶
- type Job
- type Pool
- func (p *Pool) AddJob(job Job) error
- func (p *Pool) GetMetrics() map[string]interface{}
- func (p *Pool) Kill()
- func (p *Pool) PauseJob(id string, timeout time.Duration) error
- func (p *Pool) RemoveJob(id string) error
- func (p *Pool) ResumeJob(id string) error
- func (p *Pool) Run() (err error)
- func (p *Pool) StopJob(id string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job interface {
// GetMetadata returns the job's configuration metadata.
GetMetadata() domain.JobDTO
// SetTimeout sets the max allowed job execution time; 0 disables timeout tracking.
SetTimeout(timeout time.Duration)
// GetStatus returns the current execution status of the job.
GetStatus() domain.JobStatus
// UpdateState partially updates the job's state with non-zero fields from the provided DTO.
UpdateState(state domain.StateDTO)
// GetNextRun calculates and returns the next scheduled execution time for the job.
GetNextRun() time.Time
// ProcessStart marks the job's state as "Running", initializing execution metrics and metadata.
ProcessStart()
// ProcessRun monitors the job during execution, checking for timeout conditions.
// Returns ErrJobTimeout if the job exceeds its configured timeout.
ProcessRun() error
// ProcessEnd finalizes the job state after execution completes, recording metrics and handling errors.
ProcessEnd(status domain.JobStatus, err error)
// ProcessError performs retry logic and updates the job state in case of failure.
ProcessError() error
// CanExecute checks if the job meets conditions required to start execution immediately.
// Returns an error indicating why execution is not allowed, or nil if eligible.
CanExecute() error
// Execute performs the job's main execution logic, handling internal lifecycle hooks and error handling.
Execute() error
// Stop forcibly stops job execution and updates its state accordingly.
Stop() error
// Pause attempts to pause job execution with a specified timeout.
Pause(timeout time.Duration) error
// Resume resumes execution of a paused job, if applicable.
Resume(ctx context.Context) error
// LockJob acquires exclusive execution access to the job if it is available.
LockJob() bool
// UnlockJob releases exclusive execution access to the job.
UnlockJob()
}
Job represents an executable task managed by the scheduler. It encapsulates lifecycle management methods, status updates, execution logic, and retry mechanisms.
Each Job implementation must provide thread-safe access to internal state and logic, ensuring safe concurrent interactions within the Pool environment.
type Pool ¶
type Pool struct {
domain.Pool // Configuration settings (max workers, check intervals, idle timeouts).
Mon domain.Monitoring // Monitoring implementation for capturing execution metrics.
Ctx context.Context // Execution context for the scheduler pool.
// contains filtered or unexported fields
}
Pool manages scheduling, execution, lifecycle control, and concurrency of multiple jobs.
Pool utilizes a background worker loop, controlled by context cancellation, to continuously check and execute jobs based on their defined schedules and states.
func New ¶
New initializes and configures a new Pool instance with provided settings.
It sets default values for configuration parameters if they are not explicitly defined.
Parameters:
- ctx: Parent context for cancellation and graceful shutdown control.
- cfg: Pool configuration specifying worker limits, intervals, and timeouts.
- mon: Monitoring implementation for capturing job execution metrics.
Returns:
- A fully initialized Pool instance ready for execution.
func (*Pool) AddJob ¶
AddJob adds a new job to the scheduler pool for future execution.
The method performs the following validations:
- Ensures the Job ID is unique within the pool.
- Checks that the Job status is Waiting, indicating readiness for execution.
Parameters:
- job: The Job instance to add to the scheduler.
Returns:
- An error (ErrIDExists, ErrJobWrongStatus) if validation fails.
- nil if the job is successfully added to the pool.
func (*Pool) GetMetrics ¶
GetMetrics retrieves all job metrics currently tracked by the Pool.
This method returns the result of Mon.GetMetrics(), where Mon is the monitoring system provided during pool creation (either custom or the default implementation).
The returned map contains job IDs as keys and job state or metric objects as values. Use this method to collect runtime metrics for all jobs in the pool.
func (*Pool) Kill ¶
func (p *Pool) Kill()
Kill immediately shuts down the entire scheduler pool.
This method:
- Cancels the execution context of the pool, stopping all running, waiting, and scheduled jobs.
- Updates all jobs' states to Stopped with an ErrPoolShutdown error.
- Deletes all jobs from internal storage.
- Marks the pool as permanently shut down; it cannot be restarted afterward.
Use this method with caution, as the pool becomes unusable after calling Kill.
func (*Pool) PauseJob ¶
PauseJob temporarily pauses execution of the specified job.
If no timeout is explicitly provided (timeout = 0), the default pause timeout (DEFAULT_PAUSE_TIMEOUT) is applied.
Parameters:
- id: Unique identifier of the job to pause.
- timeout: Duration to wait for job acknowledgement of pause request.
Returns:
- An error (ErrJobNotFound, ErrJobPaused, ErrJobNotRunning) if the job does not exist, is already paused, or cannot be paused.
- nil if the pause request is successful.
func (*Pool) RemoveJob ¶
RemoveJob deletes an existing job from the scheduler pool.
The method:
- Removes the job entry from the pool's internal storage.
- It does not actively stop a running job; jobs are stopped separately via StopJob.
Parameters:
- id: Unique identifier of the job to remove.
Returns:
- An error (ErrJobNotFound) if the specified job does not exist.
- nil if the job is successfully removed.
func (*Pool) ResumeJob ¶
ResumeJob resumes a paused or stopped job, allowing it to continue execution.
Parameters:
- id: Unique identifier of the job to resume.
Returns:
- An error (ErrJobNotFound, ErrJobNotPausedOrStopped) if the job does not exist, or is in an invalid state for resumption.
- nil if the resume operation is successful.
func (*Pool) Run ¶
Run initiates the scheduler's main execution loop, periodically checking and managing jobs based on their states.
Execution flow:
- Runs continuously until the Pool's context is canceled.
- Checks job states at regular intervals defined by CheckInterval.
- Manages job lifecycle transitions (waiting, running, completed, error).
- Controls concurrency using a semaphore to enforce MaxWorkers limits.
- Upon shutdown:
- Waits for all active job executions to complete.
- Updates all remaining jobs to Stopped state with an ErrPoolShutdown error.
- Removes all jobs from internal storage.
- Marks the Pool as permanently killed (cannot be restarted).
Returns:
- An error (ErrPoolShutdown) if the pool was previously terminated.
func (*Pool) StopJob ¶
StopJob terminates execution of the specified job.
The method triggers immediate job cancellation via its execution context.
Parameters:
- id: Unique identifier of the job to stop.
Returns:
- An error (ErrJobNotFound) if the specified job does not exist.
- nil if the job is successfully stopped.