pool

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package pool implements the core orchestration engine responsible for executing and managing scheduled jobs.

It is designed as an isolated execution unit that handles:

  • Concurrent job execution with worker limit control.
  • Job state transitions (Waiting, Running, Completed, Error).
  • Periodic polling and scheduling based on intervals or cron rules.
  • Graceful cancellation and shutdown of the execution engine.
  • Integration with pluggable monitoring and external job factories.

The pool is initialized with a context, lifecycle configuration, monitoring implementation, and a JobFactory — a function responsible for creating job instances from raw configuration (JobDTO).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type JobFactory added in v0.0.2

type JobFactory func(jobDTO domain.JobDTO, ctx context.Context, mon domain.Monitoring) (domain.Job, error)

JobFactory defines a function signature responsible for converting a JobDTO into a fully initialized and executable Job instance.

This abstraction allows the Pool to remain decoupled from any specific job implementation, following Dependency Inversion and facilitating testing or customization.

type Pool

type Pool struct {
	domain.Pool // Embeds basic configuration: MaxWorkers, CheckInterval, IdleTimeout.
	// contains filtered or unexported fields
}

Pool manages the lifecycle and scheduling of jobs, including their concurrent execution, state transitions, and shutdown behavior.

Jobs are stored in a concurrent map and executed periodically based on their scheduling configuration. The pool uses a worker semaphore to enforce concurrency limits and relies on the JobFactory to create jobs.

func New

func New(ctx context.Context, cfg domain.Pool, mon domain.Monitoring, jf JobFactory) (*Pool, error)

New constructs and initializes a new Pool instance based on the provided configuration and job factory.

It sets default values for MaxWorkers, CheckInterval, and IdleTimeout if they are unset (zero values).

Parameters:

  • ctx: Parent context used for cancellation and lifetime control.
  • cfg: PoolConfig with concurrency and timing settings.
  • mon: Monitoring interface used for collecting job metrics.
  • jf: JobFactory used to create executable Job instances from configuration.

Returns:

  • A fully initialized Pool instance ready to run jobs.
  • An error if the JobFactory is nil.

func (*Pool) AddJob

func (p *Pool) AddJob(cfg domain.JobDTO) error

AddJob adds a new job to the scheduler pool for future execution.

The method performs the following validations:

  • Ensures the Job ID is unique within the pool.
  • Checks that the Job status is Waiting, indicating readiness for execution.

Parameters:

  • job: The Job instance to add to the scheduler.

Returns:

  • An error (ErrIDExists, ErrJobWrongStatus) if validation fails.
  • nil if the job is successfully added to the pool.

func (*Pool) Execute added in v0.0.2

func (p *Pool) Execute(job domain.Job, sem chan struct{}, wg *sync.WaitGroup)

Execute schedules and runs a job inside a separate goroutine, assuming a semaphore slot is already acquired.

Execution flow:

  1. Validates job readiness via CanExecute(): - If the job is not yet due (ErrJobExecTooEarly), execution is skipped. - If the job is otherwise invalid, execution is skipped or job status is updated.
  2. Registers the job execution in the WaitGroup to track active jobs.
  3. Launches a new goroutine: - Calls job.Execute(). - Updates final status based on execution result. - Calls ProcessEnd() to finalize the job state. - Releases the semaphore slot. - Marks the job as done in the WaitGroup.

Parameters:

  • job: Job to execute.
  • sem: Semaphore channel that controls maximum concurrency.
  • wg: WaitGroup to synchronize the completion of active jobs.

func (*Pool) GetJobByID added in v0.0.2

func (p *Pool) GetJobByID(id string) (domain.Job, error)

GetJobByID looks up a job in the pool by its unique ID.

Returns the job instance if found, or ErrJobNotFound otherwise.

func (*Pool) GetMetrics

func (p *Pool) GetMetrics() map[string]interface{}

GetMetrics returns a snapshot of all job metrics collected by the monitoring implementation.

This includes runtime data such as execution time, success/failure counts, current state, etc.

func (*Pool) Kill

func (p *Pool) Kill()

Kill immediately shuts down the entire scheduler pool.

This method:

  • Cancels the execution context of the pool, stopping all running, waiting, and scheduled jobs.
  • Updates all jobs' states to Stopped with an ErrPoolShutdown error.
  • Deletes all jobs from internal storage.
  • Marks the pool as permanently shut down; it cannot be restarted afterward.

Use this method with caution, as the pool becomes unusable after calling Kill.

func (*Pool) PauseJob

func (p *Pool) PauseJob(id string, timeout time.Duration) error

PauseJob temporarily pauses execution of the specified job.

If no timeout is explicitly provided (timeout = 0), the default pause timeout (DEFAULT_PAUSE_TIMEOUT) is applied.

Parameters:

  • id: Unique identifier of the job to pause.
  • timeout: Duration to wait for job acknowledgement of pause request.

Returns:

  • An error (ErrJobNotFound, ErrJobPaused, ErrJobNotRunning) if the job does not exist, is already paused, or cannot be paused.
  • nil if the pause request is successful.

func (*Pool) ProcessCompleted added in v0.0.2

func (p *Pool) ProcessCompleted(job domain.Job)

ProcessCompleted handles the state of a job marked as "Completed".

It checks if the job has future scheduled executions. If another execution is pending, the job state is reset to "Waiting". Otherwise, the job is marked as "Ended", indicating no further executions are planned.

Parameters:

  • job: The Job instance that has completed its execution.

func (*Pool) ProcessError added in v0.0.2

func (p *Pool) ProcessError(job domain.Job) error

ProcessError manages jobs that have encountered an error during execution.

Behavior:

  • Attempts to retry the job execution if retries are still allowed.
  • If retries are exhausted, the job is removed from the pool.
  • If retry is allowed, the job is reset to "Waiting" for another execution attempt.

Parameters:

  • job: The Job instance currently in an Error state.

Returns:

  • An error if the job could not be removed from the pool.
  • nil otherwise.

func (*Pool) ProcessRunning added in v0.0.2

func (p *Pool) ProcessRunning(job domain.Job)

ProcessRunning monitors a job currently in the "Running" state, checking for execution timeouts or runtime errors.

If the job exceeds its configured timeout, it is marked as Error, triggering its finalization and metric recording.

Parameters:

  • job: The Job instance currently executing.

func (*Pool) ProcessWaiting added in v0.0.2

func (p *Pool) ProcessWaiting(job domain.Job, sem chan struct{}, wg *sync.WaitGroup)

ProcessWaiting checks if a job in the "Waiting" state is ready to execute.

Behavior:

  • If the current time is past the job's scheduled next run time, the job is dispatched for execution.
  • Acquires a semaphore slot with timeout protection to avoid deadlocks.
  • If execution is not possible (e.g., wrong status, not yet time), updates job state accordingly.
  • If the job was not executed after acquiring the slot, the semaphore slot is manually released.

Parameters:

  • job: The Job instance currently in the Waiting state.
  • sem: Semaphore used to limit concurrent execution based on Pool configuration.
  • wg: WaitGroup to synchronize the execution lifecycle of active jobs.

func (*Pool) RemoveJob

func (p *Pool) RemoveJob(id string) error

RemoveJob deletes an existing job from the scheduler pool.

The method:

  • Removes the job entry from the pool's internal storage.
  • It does not actively stop a running job; jobs are stopped separately via StopJob.

Parameters:

  • id: Unique identifier of the job to remove.

Returns:

  • An error (ErrJobNotFound) if the specified job does not exist.
  • nil if the job is successfully removed.

func (*Pool) ResumeJob

func (p *Pool) ResumeJob(id string) error

ResumeJob resumes a paused or stopped job, allowing it to continue execution.

Parameters:

  • id: Unique identifier of the job to resume.

Returns:

  • An error (ErrJobNotFound, ErrJobNotPausedOrStopped) if the job does not exist, or is in an invalid state for resumption.
  • nil if the resume operation is successful.

func (*Pool) Run

func (p *Pool) Run() (err error)

Run launches the main scheduler loop that monitors job states and dispatches jobs for execution.

Behavior:

  • Executes jobs based on their status and readiness (Waiting, Running, Completed, Error).
  • Enforces MaxWorkers using a semaphore.
  • Periodically polls jobs at the configured CheckInterval.
  • Gracefully handles shutdown via context cancellation:
  • Waits for in-progress jobs to complete.
  • Updates all jobs to Stopped state with a shutdown error.
  • Clears internal job storage and marks the pool as killed.

Returns:

  • ErrPoolShutdown if the pool was already stopped or after termination.

func (*Pool) StopJob

func (p *Pool) StopJob(id string) error

StopJob terminates execution of the specified job.

The method triggers immediate job cancellation via its execution context.

Parameters:

  • id: Unique identifier of the job to stop.

Returns:

  • An error (ErrJobNotFound) if the specified job does not exist.
  • nil if the job is successfully stopped.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL