pool

package
v0.0.3 Latest
Warning

This package is not in the latest version of its module.

Published: May 11, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package pool implements the core orchestration engine responsible for executing and managing scheduled jobs.

It is designed as an isolated execution unit that handles:

  • Concurrent job execution with worker limit control.
  • Job state transitions (Waiting, Running, Completed, Error).
  • Periodic polling and scheduling based on intervals or cron rules.
  • Graceful cancellation and shutdown of the execution engine.
  • Integration with pluggable monitoring and external job factories.

The pool is initialized with a context, a lifecycle configuration, a monitoring implementation, and a JobFactory, which is a function that creates job instances from raw configuration (JobDTO).

Types

type JobFactory added in v0.0.2

type JobFactory func(jobDTO domain.JobDTO, ctx context.Context, mon domain.Monitoring) (domain.Job, error)

JobFactory defines a function signature responsible for converting a JobDTO into a fully initialized and executable Job instance.

This abstraction allows the Pool to remain decoupled from any specific job implementation, following Dependency Inversion and facilitating testing or customization.
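
A factory with this shape can be sketched as follows. This is only an illustration: JobDTO, Monitoring, and Job are local stand-ins for the domain package's types, and their fields and methods (ID, Kind) are assumptions, as are echoJob, echoFactory, and buildJob.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// JobDTO, Monitoring, and Job are local stand-ins for the domain package's
// types. Their fields and methods are assumptions made for this sketch.
type JobDTO struct {
	ID   string
	Kind string
}

type Monitoring interface{}

type Job interface {
	ID() string
}

// JobFactory mirrors the parameter order of the documented signature.
type JobFactory func(dto JobDTO, ctx context.Context, mon Monitoring) (Job, error)

// echoJob is a hypothetical job implementation.
type echoJob struct{ id string }

func (j echoJob) ID() string { return j.id }

// echoFactory rejects anything it cannot turn into an echoJob.
func echoFactory(dto JobDTO, ctx context.Context, mon Monitoring) (Job, error) {
	if dto.ID == "" {
		return nil, errors.New("job ID must not be empty")
	}
	if dto.Kind != "echo" {
		return nil, fmt.Errorf("unsupported job kind %q", dto.Kind)
	}
	return echoJob{id: dto.ID}, nil
}

// buildJob calls a factory the way a pool might when a job is added.
func buildJob(jf JobFactory, id, kind string) (Job, error) {
	return jf(JobDTO{ID: id, Kind: kind}, context.Background(), nil)
}

func main() {
	job, err := buildJob(echoFactory, "job-1", "echo")
	if err != nil {
		fmt.Println("factory error:", err)
		return
	}
	fmt.Println("created job", job.ID())
}
```

Because the caller depends only on the function type, a test can pass a factory that returns a stub job without touching any real job implementation.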

type Pool

type Pool struct {
	domain.Pool // Embeds basic configuration: MaxWorkers, CheckInterval, IdleTimeout.
	// contains filtered or unexported fields
}

Pool manages the lifecycle and scheduling of jobs, including their concurrent execution, state transitions, and shutdown behavior.

Jobs are stored in a concurrent map and executed periodically based on their scheduling configuration. The pool uses a worker semaphore to enforce concurrency limits and relies on the JobFactory to create jobs.

func New

func New(ctx context.Context, cfg domain.Pool, mon domain.Monitoring, jf JobFactory) (*Pool, error)

New constructs and initializes a new Pool instance based on the provided configuration and job factory.

It sets default values for MaxWorkers, CheckInterval, and IdleTimeout if they are unset (zero values).

Parameters:

  • ctx: Parent context used for cancellation and lifetime control.
  • cfg: Pool configuration (domain.Pool) with concurrency and timing settings.
  • mon: Monitoring interface used for collecting job metrics.
  • jf: JobFactory used to create executable Job instances from configuration.

Returns:

  • A fully initialized Pool instance ready to run jobs.
  • An error if the JobFactory is nil.
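
The defaulting and nil-factory check described above might look roughly like this. PoolConfig is a local stand-in for domain.Pool with assumed field types, the default values are invented for the example (the package's real defaults are not stated here), and the factory is simplified to a plain func().

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// PoolConfig is a local stand-in for domain.Pool. The field names come from
// the documentation; the field types are assumed.
type PoolConfig struct {
	MaxWorkers    int
	CheckInterval time.Duration
	IdleTimeout   time.Duration
}

// Hypothetical defaults; the package's actual default values are not stated.
const (
	defaultMaxWorkers    = 4
	defaultCheckInterval = time.Second
	defaultIdleTimeout   = time.Minute
)

var errNilFactory = errors.New("job factory must not be nil")

// newConfig validates the factory and fills zero-valued fields with defaults.
// The factory type is simplified to func() for this sketch.
func newConfig(cfg PoolConfig, factory func()) (PoolConfig, error) {
	if factory == nil {
		return PoolConfig{}, errNilFactory
	}
	if cfg.MaxWorkers == 0 {
		cfg.MaxWorkers = defaultMaxWorkers
	}
	if cfg.CheckInterval == 0 {
		cfg.CheckInterval = defaultCheckInterval
	}
	if cfg.IdleTimeout == 0 {
		cfg.IdleTimeout = defaultIdleTimeout
	}
	return cfg, nil
}

func main() {
	cfg, err := newConfig(PoolConfig{MaxWorkers: 8}, func() {})
	fmt.Println(cfg.MaxWorkers, cfg.CheckInterval, cfg.IdleTimeout, err)
}
```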

func (*Pool) AddJob

func (p *Pool) AddJob(cfg domain.JobDTO) error

AddJob adds a new job to the scheduler pool for future execution.

The method performs the following validations:

  • Ensures the Job ID is unique within the pool.
  • Checks that the Job status is Waiting, indicating readiness for execution.

Parameters:

  • cfg: The job configuration (JobDTO) describing the job to add to the scheduler.

Returns:

  • An error (ErrIDExists, ErrJobWrongStatus) if validation fails.
  • nil if the job is successfully added to the pool.
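
The two validations can be sketched with a sync.Map, which the description of Pool suggests but does not confirm as the storage type. Everything here is a local stand-in: the job struct, the Status constants, and the error values merely reuse the documented names. Note that this sketch checks the status before the ID so that an invalid job is never stored, which may differ from the order used by the package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins for the package's sentinel errors of the same names.
var (
	ErrIDExists       = errors.New("job ID already exists")
	ErrJobWrongStatus = errors.New("job status is not Waiting")
)

type Status int

const (
	Waiting Status = iota
	Running
)

// job is a hypothetical, minimal job record.
type job struct {
	ID     string
	Status Status
}

// store keeps jobs in a concurrent map keyed by ID.
type store struct{ jobs sync.Map }

// add checks the status first so that an invalid job is never stored, then
// relies on LoadOrStore to make the uniqueness check and the insert atomic.
func (s *store) add(j *job) error {
	if j.Status != Waiting {
		return ErrJobWrongStatus
	}
	if _, loaded := s.jobs.LoadOrStore(j.ID, j); loaded {
		return ErrIDExists
	}
	return nil
}

func main() {
	s := &store{}
	fmt.Println(s.add(&job{ID: "a", Status: Waiting})) // accepted
	fmt.Println(s.add(&job{ID: "a", Status: Waiting})) // duplicate ID
	fmt.Println(s.add(&job{ID: "b", Status: Running})) // wrong status
}
```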

func (*Pool) Execute added in v0.0.2

func (p *Pool) Execute(job domain.Job, sem chan struct{}, wg *sync.WaitGroup)

Execute schedules and runs a job in a separate goroutine, assuming that a worker slot in the provided semaphore has already been acquired.

This method ensures that job execution is non-blocking and fully managed, including error propagation, lifecycle tracking, and proper synchronization with the worker pool via the semaphore and WaitGroup.

Execution flow:

  1. Validates job readiness using CanExecute():
       - If the job is not yet due (ErrJobExecTooEarly), execution is skipped silently.
       - If the job is otherwise invalid, it is skipped and may be marked accordingly.
  2. Registers the job in the WaitGroup to ensure controlled shutdown.
  3. Launches a goroutine that:
       - Calls job.Execute() to run the job logic.
       - Sets final status to Completed or Error based on the result.
       - Calls ProcessEnd() to finalize the job state (e.g., update metrics, reset retry).
       - Releases the semaphore slot (freeing concurrency for another job).
       - Signals job completion to the WaitGroup.

Parameters:

  • job: The job instance to execute.
  • sem: A buffered channel used as a semaphore to limit concurrent execution.
  • wg: A WaitGroup used to track when all jobs have completed.
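
The goroutine, semaphore, and WaitGroup interplay in the execution flow can be sketched as below. This is a simplified illustration, not the package's code: jobs are reduced to a func() error, the CanExecute and ProcessEnd steps are omitted, and execute, runDemo, and the record callback are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// execute assumes the caller has already placed a token in sem.
// It registers with wg, runs the job in a goroutine, records the final
// status, then releases the slot and signals completion.
func execute(id string, run func() error, sem chan struct{}, wg *sync.WaitGroup, record func(id, status string)) {
	wg.Add(1)
	go func() {
		defer wg.Done()          // runs last: signal completion
		defer func() { <-sem }() // runs first: free the worker slot
		if err := run(); err != nil {
			record(id, "Error")
			return
		}
		record(id, "Completed")
	}()
}

// runDemo executes three jobs with at most two running at once and returns
// the recorded statuses plus the number of slots still held afterwards.
func runDemo() (map[string]string, int) {
	sem := make(chan struct{}, 2)
	var wg sync.WaitGroup
	var mu sync.Mutex
	statuses := make(map[string]string)
	record := func(id, status string) {
		mu.Lock()
		defer mu.Unlock()
		statuses[id] = status
	}
	jobs := []struct {
		id  string
		run func() error
	}{
		{"ok", func() error { return nil }},
		{"bad", func() error { return errors.New("boom") }},
		{"ok-2", func() error { return nil }},
	}
	for _, j := range jobs {
		sem <- struct{}{} // acquire a slot before calling execute
		execute(j.id, j.run, sem, &wg, record)
	}
	wg.Wait()
	return statuses, len(sem)
}

func main() {
	statuses, held := runDemo()
	fmt.Println(statuses, held)
}
```

Acquiring the slot in the caller and releasing it in the goroutine keeps the dispatch loop from starting more goroutines than the semaphore capacity allows.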

func (*Pool) GetJobByID added in v0.0.2

func (p *Pool) GetJobByID(id string) (domain.Job, error)

GetJobByID looks up a job in the pool by its unique ID.

Returns the job instance if found, or ErrJobNotFound otherwise.

func (*Pool) GetMetrics

func (p *Pool) GetMetrics() map[string]interface{}

GetMetrics returns a snapshot of all job metrics collected by the monitoring implementation.

This includes runtime data such as execution time, success/failure counts, current state, etc.

func (*Pool) Kill

func (p *Pool) Kill()

Kill immediately shuts down the entire scheduler pool.

This method:

  • Cancels the execution context of the pool, stopping all running, waiting, and scheduled jobs.
  • Updates all jobs' states to Stopped with an ErrPoolShutdown error.
  • Deletes all jobs from internal storage.
  • Marks the pool as permanently shut down; it cannot be restarted afterward.

Use this method with caution, as the pool becomes unusable after calling Kill.

func (*Pool) PauseJob

func (p *Pool) PauseJob(id string, timeout time.Duration) error

PauseJob temporarily pauses execution of the specified job.

If no timeout is explicitly provided (timeout = 0), the default pause timeout (DEFAULT_PAUSE_TIMEOUT) is applied.

Parameters:

  • id: Unique identifier of the job to pause.
  • timeout: Duration to wait for job acknowledgement of pause request.

Returns:

  • An error (ErrJobNotFound, ErrJobPaused, ErrJobNotRunning) if the job does not exist, is already paused, or cannot be paused.
  • nil if the pause request is successful.

func (*Pool) ProcessCompleted added in v0.0.2

func (p *Pool) ProcessCompleted(job domain.Job)

ProcessCompleted handles the state of a job marked as "Completed".

It checks if the job has future scheduled executions. If another execution is pending, the job state is reset to "Waiting". Otherwise, the job is marked as "Ended", indicating no further executions are planned.

Parameters:

  • job: The Job instance that has completed its execution.
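
The transition can be illustrated with a small state function. The job struct, the string statuses, and the convention that a zero NextRun means "no further run" are assumptions made for this sketch; the package decides this from the job's scheduling configuration.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical, minimal job record.
type job struct {
	Status  string
	NextRun time.Time // zero value means no further run is planned (assumption)
}

// processCompleted resets a completed job to Waiting when another run is
// scheduled, and marks it Ended otherwise.
func processCompleted(j *job) {
	if j.Status != "Completed" {
		return
	}
	if j.NextRun.IsZero() {
		j.Status = "Ended"
		return
	}
	j.Status = "Waiting"
}

// completedStatus builds a completed job, with or without a future run,
// and returns its status after processing.
func completedStatus(hasNext bool) string {
	j := &job{Status: "Completed"}
	if hasNext {
		j.NextRun = time.Now().Add(time.Minute)
	}
	processCompleted(j)
	return j.Status
}

func main() {
	fmt.Println(completedStatus(true), completedStatus(false))
}
```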

func (*Pool) ProcessError added in v0.0.2

func (p *Pool) ProcessError(job domain.Job) error

ProcessError manages jobs that have encountered an error during execution.

Behavior:

  • If retries are still allowed, the job is reset to "Waiting" for another execution attempt.
  • If retries are exhausted, the job is removed from the pool.

Parameters:

  • job: The Job instance currently in an Error state.

Returns:

  • An error if the job could not be removed from the pool.
  • nil otherwise.
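
A minimal sketch of this retry-or-remove decision follows. The Retries and MaxRetries counters, the plain map storage, and the pool and job types are assumptions for illustration; ErrJobNotFound is a local stand-in for the package's error of the same name.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrJobNotFound is a local stand-in for the package's error of the same name.
var ErrJobNotFound = errors.New("job not found")

// job is a hypothetical job record with assumed retry counters.
type job struct {
	ID         string
	Status     string
	Retries    int // retries already used
	MaxRetries int
}

type pool struct{ jobs map[string]*job }

// remove deletes a job and reports ErrJobNotFound if it is not stored.
func (p *pool) remove(id string) error {
	if _, ok := p.jobs[id]; !ok {
		return ErrJobNotFound
	}
	delete(p.jobs, id)
	return nil
}

// processError resets the job to Waiting while retries remain and removes
// it from the pool once they are exhausted.
func (p *pool) processError(j *job) error {
	if j.Retries < j.MaxRetries {
		j.Retries++
		j.Status = "Waiting"
		return nil
	}
	return p.remove(j.ID)
}

func main() {
	j := &job{ID: "job-1", Status: "Error", MaxRetries: 1}
	p := &pool{jobs: map[string]*job{j.ID: j}}

	err := p.processError(j) // first failure: retried
	fmt.Println(err, j.Status, len(p.jobs))

	j.Status = "Error"
	err = p.processError(j) // retries exhausted: removed
	fmt.Println(err, len(p.jobs))
}
```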

func (*Pool) ProcessRunning added in v0.0.2

func (p *Pool) ProcessRunning(job domain.Job)

ProcessRunning monitors a job currently in the "Running" state, checking for execution timeouts or runtime errors.

If the job exceeds its configured timeout, it is marked as Error, triggering its finalization and metric recording.

Parameters:

  • job: The Job instance currently executing.
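
The timeout check can be sketched as a comparison between elapsed run time and the configured limit. The StartedAt and Timeout fields, and the rule that a zero Timeout disables the check, are assumptions for this example.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical job record.
type job struct {
	Status    string
	StartedAt time.Time
	Timeout   time.Duration // zero means no timeout (assumption)
}

// processRunning marks a running job as Error once it has exceeded its timeout.
func processRunning(j *job, now time.Time) {
	if j.Status != "Running" || j.Timeout <= 0 {
		return
	}
	if now.Sub(j.StartedAt) > j.Timeout {
		j.Status = "Error"
	}
}

// runningStatus reports the status of a job that has been running for
// elapsed with the given timeout.
func runningStatus(elapsed, timeout time.Duration) string {
	now := time.Now()
	j := &job{Status: "Running", StartedAt: now.Add(-elapsed), Timeout: timeout}
	processRunning(j, now)
	return j.Status
}

func main() {
	fmt.Println(runningStatus(2*time.Second, 5*time.Second))  // within the limit
	fmt.Println(runningStatus(10*time.Second, 5*time.Second)) // over the limit
}
```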

func (*Pool) ProcessWaiting added in v0.0.2

func (p *Pool) ProcessWaiting(job domain.Job, sem chan struct{}, wg *sync.WaitGroup)

ProcessWaiting evaluates whether a job in the "Waiting" state is ready for execution, and dispatches it if all conditions are met.

This method is intended to be called periodically by the pool scheduler to identify and start due jobs. It ensures exclusive job processing via locking and limits concurrency using a semaphore with timeout protection.

Behavior:

  • Acquires an internal lock to ensure exclusive handling of the job.
  • Checks whether the current time is equal to or after the job's NextRun time.
  • Evaluates the job's eligibility for execution using CanExecute().
  • If execution is not yet due or is otherwise invalid, the job is skipped.
  • Attempts to acquire a semaphore slot within a timeout window (default: 5s).
  • If slot acquisition fails, marks the job as errored (ErrTooManyJobs).
  • If the job's CanExecute returns execution-blocking errors (e.g., expired, bad status), marks the job as Ended or Error accordingly.
  • If eligible and a slot is available, schedules the job for execution via Execute().

Parameters:

  • job: The job instance currently in the "Waiting" state.
  • sem: A buffered channel used as a semaphore to limit concurrent execution.
  • wg: A WaitGroup used to track active job executions and support graceful shutdown.
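
Acquiring a semaphore slot within a timeout window is commonly written as a select over a channel send and a timer. The sketch below shows that pattern only; acquire and release are hypothetical helpers, and ErrTooManyJobs is a local stand-in for the package's error of the same name.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTooManyJobs is a local stand-in for the package's error of the same name.
var ErrTooManyJobs = errors.New("too many jobs")

// acquire tries to take a worker slot and gives up after timeout.
func acquire(sem chan struct{}, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case sem <- struct{}{}:
		return nil
	case <-timer.C:
		return ErrTooManyJobs
	}
}

// release frees a slot previously taken by acquire.
func release(sem chan struct{}) { <-sem }

func main() {
	sem := make(chan struct{}, 1) // capacity plays the role of MaxWorkers

	fmt.Println(acquire(sem, 50*time.Millisecond)) // slot is free
	fmt.Println(acquire(sem, 50*time.Millisecond)) // slot is busy, times out
	release(sem)
	fmt.Println(acquire(sem, 50*time.Millisecond)) // slot is free again
}
```

Bounding the wait keeps one saturated polling cycle from blocking the scheduler indefinitely; the cost is that a job can be marked as errored purely because the pool was busy.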

func (*Pool) RemoveJob

func (p *Pool) RemoveJob(id string) error

RemoveJob deletes an existing job from the scheduler pool.

The method:

  • Removes the job entry from the pool's internal storage.
  • Does not actively stop a running job; jobs are stopped separately via StopJob.

Parameters:

  • id: Unique identifier of the job to remove.

Returns:

  • An error (ErrJobNotFound) if the specified job does not exist.
  • nil if the job is successfully removed.

func (*Pool) ResumeJob

func (p *Pool) ResumeJob(id string) error

ResumeJob resumes a paused or stopped job, allowing it to continue execution.

Parameters:

  • id: Unique identifier of the job to resume.

Returns:

  • An error (ErrJobNotFound, ErrJobNotPausedOrStopped) if the job does not exist, or is in an invalid state for resumption.
  • nil if the resume operation is successful.

func (*Pool) Run

func (p *Pool) Run() (err error)

Run launches the main scheduler loop that monitors job states and dispatches jobs for execution.

Behavior:

  • Executes jobs based on their status and readiness (Waiting, Running, Completed, Error).
  • Enforces MaxWorkers using a semaphore.
  • Periodically polls jobs at the configured CheckInterval.
  • Gracefully handles shutdown via context cancellation:
      - Waits for in-progress jobs to complete.
      - Updates all jobs to Stopped state with a shutdown error.
      - Clears internal job storage and marks the pool as killed.

Returns:

  • ErrPoolShutdown if the pool was already shut down when Run was called, or once the scheduler loop has terminated.
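
The overall shape of such a loop, a ticker for polling plus context cancellation for shutdown, can be sketched as follows. This is not the package's implementation: run, runFor, and the poll callback are hypothetical, job state updates are left out, and ErrPoolShutdown is a local stand-in for the package's error of the same name.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrPoolShutdown is a local stand-in for the package's error of the same name.
var ErrPoolShutdown = errors.New("pool shut down")

// run calls poll at every tick until ctx is cancelled, then waits for
// in-flight work tracked by wg before returning.
func run(ctx context.Context, interval time.Duration, wg *sync.WaitGroup, poll func()) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			wg.Wait() // let in-progress jobs finish before returning
			return ErrPoolShutdown
		case <-ticker.C:
			poll()
		}
	}
}

// runFor runs the loop for roughly d and reports how many polls happened.
func runFor(d time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	var wg sync.WaitGroup
	polls := 0
	err := run(ctx, 5*time.Millisecond, &wg, func() { polls++ })
	return polls, err
}

func main() {
	polls, err := runFor(60 * time.Millisecond)
	fmt.Println(polls > 0, err)
}
```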

func (*Pool) StopJob

func (p *Pool) StopJob(id string) error

StopJob terminates execution of the specified job.

The method triggers immediate job cancellation via its execution context.

Parameters:

  • id: Unique identifier of the job to stop.

Returns:

  • An error (ErrJobNotFound) if the specified job does not exist.
  • nil if the job is successfully stopped.
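
Cancellation through a per-job execution context can be sketched as below. The pool and runningJob types and the start, stop, and stopDemo helpers are hypothetical; the point is only that stop calls the job's cancel function, and the job itself must watch ctx.Done() to exit.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrJobNotFound is a local stand-in for the package's error of the same name.
var ErrJobNotFound = errors.New("job not found")

type runningJob struct {
	cancel context.CancelFunc
	done   chan struct{} // closed when the job goroutine returns
}

type pool struct {
	mu   sync.Mutex
	jobs map[string]*runningJob
}

// start launches work with its own cancellable context derived from parent.
func (p *pool) start(parent context.Context, id string, work func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(parent)
	j := &runningJob{cancel: cancel, done: make(chan struct{})}
	p.mu.Lock()
	p.jobs[id] = j
	p.mu.Unlock()
	go func() {
		defer close(j.done)
		work(ctx)
	}()
}

// stop cancels the job's context; the job must observe ctx.Done() to exit.
func (p *pool) stop(id string) error {
	p.mu.Lock()
	j, ok := p.jobs[id]
	p.mu.Unlock()
	if !ok {
		return ErrJobNotFound
	}
	j.cancel()
	return nil
}

// stopDemo starts a job that blocks until cancelled and reports whether it
// exited within a second of being stopped.
func stopDemo() (bool, error) {
	p := &pool{jobs: make(map[string]*runningJob)}
	p.start(context.Background(), "job-1", func(ctx context.Context) { <-ctx.Done() })
	if err := p.stop("job-1"); err != nil {
		return false, err
	}
	select {
	case <-p.jobs["job-1"].done:
		return true, nil
	case <-time.After(time.Second):
		return false, nil
	}
}

func main() {
	stopped, err := stopDemo()
	fmt.Println(stopped, err)
}
```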
