domain

package
v1.0.1
Published: May 12, 2025 | License: MIT | Imports: 2 | Imported by: 0

Documentation

Constants

const (
	// DEFAULT_NUM_WORKERS specifies the default number of workers the scheduler
	// uses to execute jobs concurrently when no explicit limit is provided.
	// Adjust it according to the workload and the available system resources.
	DEFAULT_NUM_WORKERS = 1000
	// DEFAULT_CHECK_INTERVAL sets the default time interval between consecutive job state checks in the scheduler.
	// The scheduler evaluates job statuses and conditions every interval defined by this constant.
	// A shorter interval provides faster reaction time but can increase CPU usage.
	DEFAULT_CHECK_INTERVAL = 100 * time.Millisecond
	// DEFAULT_IDLE_TIMEOUT specifies the default duration after which an idle job (not running or scheduled to run soon)
	// will be considered inactive or ready for removal.
	// This timeout prevents resource leaks by removing or disabling long-idle jobs.
	DEFAULT_IDLE_TIMEOUT = 100 * time.Hour
	// DEFAULT_PAUSE_TIMEOUT is the default timeout applied when pausing a job without an explicitly provided duration.
	// It determines how long a job will remain paused before automatically resuming execution.
	DEFAULT_PAUSE_TIMEOUT = 1 * time.Second
)

Variables

var (
	// MAX_END_AT represents the maximum allowable date-time for job scheduling.
	// It serves as a predefined maximum execution boundary, allowing tasks to run indefinitely
	// unless explicitly set otherwise. Used to avoid zero-value date confusion or invalid scheduling intervals.
	MAX_END_AT = time.Date(9999, 12, 31, 23, 59, 59, 0, time.UTC)
)
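
As a minimal sketch of how a far-future sentinel avoids zero-value confusion, the snippet below falls back to a local copy of MAX_END_AT whenever EndAt is unset. The names maxEndAt, noEnd, effectiveEndAt, and inWindow are hypothetical helpers written for this illustration; they are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// maxEndAt mirrors the documented MAX_END_AT value.
var maxEndAt = time.Date(9999, 12, 31, 23, 59, 59, 0, time.UTC)

// noEnd is the zero time.Time, used here to mean "EndAt was not set".
var noEnd time.Time

// effectiveEndAt is a hypothetical helper: a zero EndAt falls back to maxEndAt.
func effectiveEndAt(endAt time.Time) time.Time {
	if endAt.IsZero() {
		return maxEndAt
	}
	return endAt
}

// inWindow reports whether now lies within [startAt, effective EndAt].
func inWindow(now, startAt, endAt time.Time) bool {
	return !now.Before(startAt) && !now.After(effectiveEndAt(endAt))
}

func main() {
	now := time.Date(2025, 5, 12, 0, 0, 0, 0, time.UTC)
	fmt.Println(inWindow(now, noEnd, noEnd))               // open-ended window
	fmt.Println(inWindow(now, noEnd, now.Add(-time.Hour))) // window already closed
}
```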

Functions

This section is empty.

Types

type Fn

type Fn func(ctrl FnControl) error

Fn represents the main function executed as a scheduled job by the scheduler.

The function receives a FnControl instance that exposes mechanisms for:

  • Saving execution-related data (for monitoring or hooks).
  • Accessing cancellation context and pause/resume signals.

The function should return nil if execution completes successfully, or an error if execution fails.
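
A job function in this style might look like the sketch below. FnControl and Fn are copied locally so the example compiles without the package, and fakeControl, newFakeControl, and countItems are hypothetical names used only for illustration. Whether SaveData merges or replaces previously saved keys is not stated in the documentation; the fake assumes merging.

```go
package main

import (
	"context"
	"fmt"
)

// FnControl and Fn are copied from the documentation so the sketch compiles on its own.
type FnControl interface {
	SaveData(data map[string]interface{})
	GetData() map[string]interface{}
	Context() context.Context
	PauseChan() <-chan struct{}
	ResumeChan() <-chan struct{}
}

type Fn func(ctrl FnControl) error

// fakeControl is a hypothetical in-memory FnControl used only to drive the example.
type fakeControl struct {
	ctx  context.Context
	data map[string]interface{}
}

// newFakeControl builds a fakeControl whose context is optionally already canceled.
func newFakeControl(canceled bool) *fakeControl {
	ctx := context.Background()
	if canceled {
		c, cancel := context.WithCancel(ctx)
		cancel()
		ctx = c
	}
	return &fakeControl{ctx: ctx, data: map[string]interface{}{}}
}

// SaveData merges keys into the stored data (an assumption, see the lead-in).
func (f *fakeControl) SaveData(d map[string]interface{}) {
	for k, v := range d {
		f.data[k] = v
	}
}

// GetData returns a copy of the stored data.
func (f *fakeControl) GetData() map[string]interface{} {
	out := make(map[string]interface{}, len(f.data))
	for k, v := range f.data {
		out[k] = v
	}
	return out
}

func (f *fakeControl) Context() context.Context    { return f.ctx }
func (f *fakeControl) PauseChan() <-chan struct{}  { return nil }
func (f *fakeControl) ResumeChan() <-chan struct{} { return nil }

// countItems is an example Fn: it fails if already canceled, otherwise records a result.
var countItems Fn = func(ctrl FnControl) error {
	if err := ctrl.Context().Err(); err != nil {
		return fmt.Errorf("job canceled before start: %w", err)
	}
	ctrl.SaveData(map[string]interface{}{"items": 3})
	return nil
}

func main() {
	ctrl := newFakeControl(false)
	fmt.Println(countItems(ctrl), ctrl.GetData()["items"])
	fmt.Println(countItems(newFakeControl(true)))
}
```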

type FnControl

type FnControl interface {
	// SaveData stores arbitrary key-value pairs generated during the execution of a job.
	// This data can later be accessed for monitoring, logging, or analysis.
	SaveData(data map[string]interface{})

	// GetData returns a copy of the job's current saved runtime data.
	//
	// This method allows the job to access the latest key-value pairs previously stored via SaveData.
	// It can be used inside the main function (Fn) or any lifecycle hook (OnStart, OnSuccess, etc.).
	//
	// Returns:
	//   - map[string]interface{}: A snapshot of the user-defined runtime data.
	GetData() map[string]interface{}

	// Context returns the job's execution context.
	// It is canceled when the job is stopped, times out, or the pool is shut down.
	Context() context.Context

	// PauseChan returns a channel that is used to signal when the job should pause.
	// The job implementation is expected to read from this channel and enter a paused state when needed.
	PauseChan() <-chan struct{}

	// ResumeChan returns a channel that signals when a paused job should resume execution.
	// The job should wait on this channel to continue processing after a pause.
	ResumeChan() <-chan struct{}
}

FnControl provides controlled access to a job's execution environment, including lifecycle control, runtime metadata storage, and scheduler interaction.

This interface is passed into each job's execution function (Fn), enabling the job to:

  • Save and share state or metrics via SaveData.
  • React to cancellation or timeouts via Context.
  • Support manual pausing and resuming through PauseChan and ResumeChan.
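
One way a job body could cooperate with these signals is sketched below. The control struct stands in for FnControl and carries only the three signals involved; newControl and processSteps are hypothetical names, and the buffered channels exist only so the demo can queue signals before the loop starts.

```go
package main

import (
	"context"
	"fmt"
)

// control carries only the signals this sketch needs; in the package they
// would come from FnControl's Context, PauseChan, and ResumeChan methods.
type control struct {
	ctx    context.Context
	cancel context.CancelFunc
	pause  chan struct{}
	resume chan struct{}
}

// newControl returns a control with buffered channels so signals can be queued up front.
func newControl() control {
	ctx, cancel := context.WithCancel(context.Background())
	return control{
		ctx:    ctx,
		cancel: cancel,
		pause:  make(chan struct{}, 1),
		resume: make(chan struct{}, 1),
	}
}

// processSteps runs n units of work. Before each unit it checks for
// cancellation and for a pause signal without blocking.
func processSteps(c control, n int) (int, error) {
	done := 0
	for i := 0; i < n; i++ {
		select {
		case <-c.ctx.Done():
			return done, c.ctx.Err()
		case <-c.pause:
			// Paused: block until resumed or canceled.
			select {
			case <-c.resume:
			case <-c.ctx.Done():
				return done, c.ctx.Err()
			}
		default:
			// No signal pending; continue with the work.
		}
		done++ // one unit of work
	}
	return done, nil
}

func main() {
	c := newControl()
	defer c.cancel()
	c.pause <- struct{}{}  // request a pause before the first step
	c.resume <- struct{}{} // and queue the matching resume
	done, err := processSteps(c, 3)
	fmt.Println(done, err)
}
```

The non-blocking outer select keeps the hot path cheap, while the inner select guarantees that a paused job still reacts to cancellation.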

type Hook

type Hook struct {
	// Fn is the function to be executed during the lifecycle stage.
	// It receives the current FnControl and, where applicable (for example in the OnError hook), the error to handle.
	// It should return an error if the hook fails.
	Fn func(ctrl FnControl, err error) error

	// IgnoreError determines whether hook execution errors should be ignored.
	// If true, the job will proceed even if the hook fails.
	// If false, the job will halt and treat the hook error as fatal.
	IgnoreError bool
}

Hook defines a lifecycle callback that is executed at specific stages of job execution.

Each hook provides optional behavior such as logging, metrics, notifications, etc., and can either allow or block job execution depending on its result.
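
The effect of IgnoreError can be sketched as follows. This is an illustration under assumptions, not the package's executor: the local Hook type drops the FnControl argument for brevity, and runHook, failingHook, and errNotify are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Hook follows the documented struct, except that the FnControl argument
// is omitted to keep the sketch short.
type Hook struct {
	Fn          func(err error) error
	IgnoreError bool
}

var errNotify = errors.New("notification failed")

// failingHook always fails, standing in for a hook that cannot reach a notifier.
func failingHook(error) error { return errNotify }

// runHook is a hypothetical helper. It returns the hook's error and whether
// the job should halt: only a failed hook with IgnoreError == false halts it.
func runHook(h Hook, jobErr error) (hookErr error, halt bool) {
	if h.Fn == nil {
		return nil, false // no hook configured
	}
	hookErr = h.Fn(jobErr)
	return hookErr, hookErr != nil && !h.IgnoreError
}

func main() {
	_, halt := runHook(Hook{Fn: failingHook, IgnoreError: true}, nil)
	fmt.Println("ignored failure halts job:", halt)
	_, halt = runHook(Hook{Fn: failingHook}, nil)
	fmt.Println("fatal failure halts job:", halt)
}
```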

type HookError

type HookError struct {
	// OnStart captures errors raised by the OnStart hook.
	OnStart error

	// OnStop captures errors raised by the OnStop hook.
	OnStop error

	// OnError captures errors raised by the OnError hook,
	// which is typically executed after a failed job function.
	OnError error

	// OnSuccess captures errors raised by the OnSuccess hook,
	// which is called after the job function completes successfully.
	OnSuccess error

	// OnPause captures errors raised by the OnPause hook,
	// which is triggered when the job is paused manually.
	OnPause error

	// OnResume captures errors raised by the OnResume hook,
	// which is triggered when a paused job is resumed.
	OnResume error

	// Finally captures errors raised by the Finally hook,
	// which always runs after execution, regardless of outcome.
	Finally error
}

HookError captures individual errors encountered in job lifecycle hooks.

Each field corresponds to a specific lifecycle phase where a hook may be executed.

type Hooks

type Hooks struct {
	// OnStart is executed right before the job's main function (Fn) begins execution.
	// Returning an error from OnStart prevents the job from running.
	OnStart Hook

	// OnStop is executed when a running job receives a stop signal.
	// It allows for graceful termination, cleanup, or state preservation before stopping the job completely.
	OnStop Hook

	// OnError is executed whenever the job's main function (Fn) encounters an error.
	// This hook provides an opportunity for error handling, logging, notifications, or triggering recovery actions.
	OnError Hook

	// OnSuccess is executed immediately after successful completion of the job's main function (Fn).
	// If OnSuccess returns an error, the job's final status will reflect the failure of this hook.
	OnSuccess Hook

	// OnPause is triggered when a job transitions into a paused state.
	// It can be used for resource management, state tracking, or logging.
	// If OnPause returns an error, the job's final status will reflect the failure of this hook.
	OnPause Hook

	// OnResume is executed when a paused job is resumed.
	// It allows reinitializing resources or state restoration required for the continuation of job execution.
	// If OnResume returns an error, the job's final status will reflect the failure of this hook.
	OnResume Hook

	// Finally is always executed after job completion, regardless of success, error, or any interruptions.
	// Ideal for guaranteed cleanup operations, final resource releases, or logging outcomes.
	// If Finally returns an error, it overrides previous results and marks the job's final status as Error.
	Finally Hook
}

Hooks represents lifecycle callback functions that can be optionally provided to manage and respond to various job execution events in the scheduler.
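
The ordering implied by the field comments can be sketched as a trace of stage names. The lifecycle function below is hypothetical and leaves out pause, resume, and stop handling. It also assumes that Finally still runs when OnStart blocks the job, which the documentation suggests but does not state outright.

```go
package main

import (
	"errors"
	"fmt"
)

// lifecycle returns the names of the stages that would run, in order,
// for the given OnStart hook and job function.
func lifecycle(onStart, fn func() error) []string {
	trace := []string{"OnStart"}
	if onStart() != nil {
		// Assumption: Finally still runs when OnStart prevents the job from running.
		return append(trace, "Finally")
	}
	trace = append(trace, "Fn")
	if fn() != nil {
		trace = append(trace, "OnError")
	} else {
		trace = append(trace, "OnSuccess")
	}
	return append(trace, "Finally")
}

func ok() error   { return nil }
func fail() error { return errors.New("boom") }

func main() {
	fmt.Println(lifecycle(ok, ok))   // [OnStart Fn OnSuccess Finally]
	fmt.Println(lifecycle(ok, fail)) // [OnStart Fn OnError Finally]
	fmt.Println(lifecycle(fail, ok)) // [OnStart Finally]
}
```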

type Interval

type Interval struct {
	// Time specifies the duration between consecutive executions of the job.
	// For example, an interval of 1 hour executes the job every hour.
	// If Time is set, CronExpr must be empty.
	Time time.Duration

	// CronExpr specifies a cron expression that defines precise scheduling times (e.g., "0 0 * * *" for daily execution at midnight).
	// Supports standard cron syntax with minute, hour, day-of-month, month, and day-of-week fields.
	// If CronExpr is set, Time must be zero.
	CronExpr string
}

Interval defines how frequently or at what specific times a job should be executed. It supports two scheduling methods: interval-based scheduling and cron-based scheduling. Only one scheduling method should be used per job configuration to avoid conflicts.
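
The "only one method" rule could be enforced with a check like the one below. The validate function and its error values are hypothetical; the documentation does not say whether the package performs this validation itself.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Interval is a local copy of the documented struct.
type Interval struct {
	Time     time.Duration
	CronExpr string
}

var (
	errBothSet  = errors.New("interval: set either Time or CronExpr, not both")
	errNegative = errors.New("interval: Time must not be negative")
)

// validate is a hypothetical check that at most one scheduling method is used.
// A zero Interval is accepted here and treated as "run once".
func validate(iv Interval) error {
	if iv.Time != 0 && iv.CronExpr != "" {
		return errBothSet
	}
	if iv.Time < 0 {
		return errNegative
	}
	return nil
}

func main() {
	fmt.Println(validate(Interval{Time: time.Hour}))
	fmt.Println(validate(Interval{CronExpr: "0 0 * * *"}))
	fmt.Println(validate(Interval{Time: time.Hour, CronExpr: "0 0 * * *"}))
}
```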

type Job added in v0.0.2

type Job interface {
	// GetMetadata returns the job's configuration metadata.
	GetMetadata() JobDTO

	// SetTimeout sets the max allowed job execution time; 0 disables timeout tracking.
	SetTimeout(timeout time.Duration)

	// GetStatus returns the current execution status of the job.
	GetStatus() JobStatus

	// SetStatus forcefully sets the job's current status.
	SetStatus(status JobStatus)

	TrySetStatus(allowed []JobStatus, status JobStatus) bool

	// UpdateState partially updates the job's state with non-zero fields from the provided DTO.
	UpdateState(state StateDTO)

	GetState() *StateDTO

	// GetNextRun calculates and returns the next scheduled execution time for the job.
	GetNextRun() time.Time

	// ProcessStart marks the job's state as "Running", initializing execution metrics and metadata.
	ProcessStart()

	// ProcessRun monitors the job during execution, checking for timeout conditions.
	// Returns ErrJobTimeout if the job exceeds its configured timeout.
	ProcessRun() error

	// ProcessEnd finalizes the job state after execution completes, recording metrics and handling errors.
	ProcessEnd(status JobStatus, err error)

	// ProcessError performs retry logic and updates the job state in case of failure.
	ProcessError() error

	// CanExecute checks if the job meets conditions required to start execution immediately.
	// Returns an error indicating why execution is not allowed, or nil if eligible.
	CanExecute() error

	// Execute performs the job's main execution logic, handling internal lifecycle hooks and error handling.
	Execute() error

	// Stop forcibly stops job execution and updates its state accordingly.
	Stop() error

	// Pause attempts to pause job execution with a specified timeout.
	Pause(timeout time.Duration) error

	// Resume resumes execution of a paused job, if applicable.
	Resume(ctx context.Context) error

	// LockJob acquires exclusive execution access to the job if it is available.
	LockJob() bool

	// UnlockJob releases exclusive execution access to the job.
	UnlockJob()
}

Job represents an executable task managed by the scheduler. It encapsulates lifecycle management methods, status updates, execution logic, and retry mechanisms.

Each Job implementation must provide thread-safe access to internal state and logic, ensuring safe concurrent interactions within the Pool environment.
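
A minimal sketch of thread-safe status handling is shown below. The statusBox type is hypothetical, and the compare-and-set behavior given to TrySetStatus is an assumption inferred from its signature, since the method carries no doc comment.

```go
package main

import (
	"fmt"
	"sync"
)

// JobStatus and the constants are local copies of a subset of the documented values.
type JobStatus string

const (
	Waiting JobStatus = "waiting"
	Running JobStatus = "running"
	Paused  JobStatus = "paused"
)

// statusBox is a hypothetical holder that guards the status with a mutex.
type statusBox struct {
	mu     sync.Mutex
	status JobStatus
}

// GetStatus returns the current status under the lock.
func (b *statusBox) GetStatus() JobStatus {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.status
}

// TrySetStatus assumes compare-and-set semantics: the status changes only if
// the current status is one of the allowed values, and the result reports
// whether the change happened.
func (b *statusBox) TrySetStatus(allowed []JobStatus, status JobStatus) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, a := range allowed {
		if b.status == a {
			b.status = status
			return true
		}
	}
	return false
}

func main() {
	b := &statusBox{status: Waiting}
	fmt.Println(b.TrySetStatus([]JobStatus{Waiting}, Running)) // transition applied
	fmt.Println(b.TrySetStatus([]JobStatus{Waiting}, Paused))  // rejected: status is Running
	fmt.Println(b.GetStatus())
}
```

Doing the check and the write under one lock is what prevents two goroutines from both moving the same job out of Waiting.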

type JobDTO

type JobDTO struct {
	// ID is a required unique identifier for the job.
	ID string

	// Name is an optional human-readable label for the job.
	// If omitted, it defaults to the value of ID.
	Name string

	// Fn is the function that encapsulates the job's execution logic.
	// It receives a FnControl object that provides control over execution flow and data storage.
	Fn Fn

	// Interval specifies how the job repeats: after a fixed delay or according to a cron expression.
	// If left at its zero value, the job runs only once.
	Interval Interval

	// Timeout defines the maximum duration allowed for a single execution of the job.
	// If exceeded, the job will be terminated.
	Timeout time.Duration

	// StartAt defines the earliest point in time the job is allowed to run.
	// Jobs scheduled before this timestamp will remain in Waiting state.
	StartAt time.Time

	// EndAt defines the latest point in time the job is allowed to run.
	// Jobs scheduled after this timestamp will be considered expired and marked as Stopped.
	EndAt time.Time

	// Retry specifies retry behavior in case of job execution failure.
	Retry Retry

	// Hooks defines callback functions triggered at key lifecycle events
	// such as OnStart, OnSuccess, OnError, OnStop, etc.
	Hooks Hooks
}

JobDTO defines the configuration and execution parameters of a scheduled job.

It serves as the main input structure used to register jobs with the scheduler.

type JobStatus

type JobStatus string

JobStatus represents the current execution state of a job within the scheduler.

It is used to track and manage the lifecycle and transitions of scheduled tasks. Possible values include:

  • Waiting: The job is waiting to be executed.
  • Running: The job is currently executing.
  • Completed: The job has finished execution successfully.
  • Paused: The job is temporarily paused.
  • Stopped: The job has been explicitly stopped and will not run unless restarted.
  • Ended: The job reached its defined end condition (e.g., end time or retry limit).
  • Error: The job encountered an error during execution.

const (
	// Waiting indicates the job is scheduled and ready to run.
	// The scheduler will pick up the job when the execution conditions are met.
	// If the job has an interval, it will wait for the next scheduled run.
	Waiting JobStatus = "waiting"

	// Running indicates the job is currently executing.
	// The scheduler actively processes the job and updates its execution time.
	// If the job exceeds the timeout, it will be marked as an error and terminated.
	Running JobStatus = "running"

	// Stopped indicates the job execution has been manually stopped.
	// The job will not run again unless explicitly restarted.
	// This status is usually set when the job's execution window has expired.
	Stopped JobStatus = "stopped"

	// Paused indicates the job is temporarily paused and can be resumed later.
	// While paused, the job retains its execution state but does not progress.
	// The scheduler does not execute paused jobs until they are resumed.
	Paused JobStatus = "paused"

	// Completed indicates the job has finished execution successfully.
	// If the job has an interval, it will be scheduled to run again after the delay.
	// Otherwise, it remains in a completed state indefinitely.
	Completed JobStatus = "completed"

	// Error indicates the job has encountered an error during execution.
	// The job may retry execution if retries are enabled and attempts remain.
	// If retries are exhausted, the job remains in the error state.
	Error JobStatus = "error"

	// Ended indicates the job has finished execution.
	// This status is usually set when the job's execution window has expired.
	// The job will not run again unless explicitly restarted.
	Ended JobStatus = "ended"
)

type Monitoring

type Monitoring interface {
	// SaveMetrics stores execution metrics derived from a job's execution state (StateDTO).
	//
	// Implementations typically capture metrics like execution timestamps, duration,
	// final status, encountered errors, and user-defined runtime metadata.
	//
	// Parameters:
	//   - dto: StateDTO instance containing the job's execution details and metadata.
	SaveMetrics(dto StateDTO)

	// GetMetrics retrieves all stored execution metrics.
	//
	// Returns:
	//   - A map keyed by JobID, where each entry contains the StateDTO representing
	//     execution metrics for a specific job.
	GetMetrics() map[string]interface{}
}

Monitoring defines an interface for collecting, storing, and retrieving metrics related to job execution.

Implementations of this interface can persist metrics in various ways, such as:

  • In-memory storage for simple debugging and development purposes.
  • Real-time logging for operational monitoring.
  • External systems like dashboards, time-series databases, or analytics platforms.
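
An in-memory implementation, the simplest of the options above, might look like this sketch. memoryMonitoring and newMemoryMonitoring are hypothetical names, and StateDTO is a trimmed local copy whose Status field is a plain string rather than JobStatus.

```go
package main

import (
	"fmt"
	"sync"
)

// StateDTO is trimmed to three fields; Status is a plain string here rather than JobStatus.
type StateDTO struct {
	JobID   string
	Status  string
	Success int
}

// memoryMonitoring is a hypothetical in-memory implementation of the
// documented SaveMetrics/GetMetrics pair.
type memoryMonitoring struct {
	mu      sync.RWMutex
	metrics map[string]StateDTO
}

func newMemoryMonitoring() *memoryMonitoring {
	return &memoryMonitoring{metrics: make(map[string]StateDTO)}
}

// SaveMetrics keeps only the latest state per job ID.
func (m *memoryMonitoring) SaveMetrics(dto StateDTO) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.metrics[dto.JobID] = dto
}

// GetMetrics returns a copy so callers cannot mutate the internal map.
func (m *memoryMonitoring) GetMetrics() map[string]interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make(map[string]interface{}, len(m.metrics))
	for id, dto := range m.metrics {
		out[id] = dto
	}
	return out
}

func main() {
	mon := newMemoryMonitoring()
	mon.SaveMetrics(StateDTO{JobID: "backup", Status: "running"})
	mon.SaveMetrics(StateDTO{JobID: "backup", Status: "completed", Success: 1})
	metrics := mon.GetMetrics()
	fmt.Println(len(metrics), metrics["backup"].(StateDTO).Status)
}
```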

type Pool

type Pool struct {
	// IdleTimeout specifies the duration after which a job that remains idle
	// (not executed or scheduled for immediate execution) will be marked as inactive.
	// This helps optimize resource usage and prevents accumulation of stale tasks.
	IdleTimeout time.Duration

	// MaxWorkers sets the maximum number of concurrent workers allowed to execute jobs simultaneously.
	// Higher values can improve throughput for CPU-bound or I/O-bound tasks, but might consume more system resources.
	MaxWorkers int

	// CheckInterval defines how frequently the pool checks for jobs that are ready for execution or require status updates.
	// Short intervals result in more responsive job execution at the expense of slightly increased CPU utilization.
	CheckInterval time.Duration
}

Pool represents configuration settings for managing the execution environment of scheduled jobs. It controls worker concurrency, resource usage, and job-checking frequency.
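
The sketch below fills unset Pool fields from the documented default constants. The withDefaults helper and the lower-case constant names are hypothetical, and the documentation does not confirm that the package applies defaults in exactly this way.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented default values.
const (
	defaultNumWorkers    = 1000
	defaultCheckInterval = 100 * time.Millisecond
	defaultIdleTimeout   = 100 * time.Hour
)

// Pool is a local copy of the documented struct.
type Pool struct {
	IdleTimeout   time.Duration
	MaxWorkers    int
	CheckInterval time.Duration
}

// withDefaults is a hypothetical helper that replaces zero or negative
// fields with the defaults and leaves explicit values untouched.
func withDefaults(p Pool) Pool {
	if p.MaxWorkers <= 0 {
		p.MaxWorkers = defaultNumWorkers
	}
	if p.CheckInterval <= 0 {
		p.CheckInterval = defaultCheckInterval
	}
	if p.IdleTimeout <= 0 {
		p.IdleTimeout = defaultIdleTimeout
	}
	return p
}

func main() {
	p := withDefaults(Pool{MaxWorkers: 8})
	fmt.Println(p.MaxWorkers, p.CheckInterval, p.IdleTimeout)
}
```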

type Retry

type Retry struct {
	// Active specifies whether retrying is enabled for the job.
	Active bool

	// Count is the number of allowed retry attempts before marking the job as failed.
	// Each time a job fails, this counter decreases. When it reaches zero, the job stops retrying.
	Count int

	// ResetOnSuccess determines whether the retry counter should be reset after a successful execution.
	// If true, a successful execution resets the retry count to its initial value.
	ResetOnSuccess bool
}

Retry defines the retry policy for a job execution.
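
The counter behavior described in the field comments can be sketched as follows. retryState and its methods are hypothetical; they only model the documented rules (decrement on failure, optional reset on success) and are not the package's implementation.

```go
package main

import "fmt"

// Retry is a local copy of the documented struct.
type Retry struct {
	Active         bool
	Count          int
	ResetOnSuccess bool
}

// retryState is a hypothetical tracker of the remaining attempts for one job.
type retryState struct {
	policy    Retry
	remaining int
}

func newRetryState(p Retry) *retryState {
	return &retryState{policy: p, remaining: p.Count}
}

// onFailure consumes one attempt and reports whether the job should retry.
func (s *retryState) onFailure() bool {
	if !s.policy.Active || s.remaining <= 0 {
		return false
	}
	s.remaining--
	return true
}

// onSuccess restores the counter when ResetOnSuccess is set.
func (s *retryState) onSuccess() {
	if s.policy.ResetOnSuccess {
		s.remaining = s.policy.Count
	}
}

func main() {
	s := newRetryState(Retry{Active: true, Count: 2, ResetOnSuccess: true})
	fmt.Println(s.onFailure(), s.onFailure(), s.onFailure()) // third failure has no attempts left
	s.onSuccess()
	fmt.Println(s.onFailure()) // counter was reset by the success
}
```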

type StateDTO

type StateDTO struct {
	// JobID is the unique identifier of the job to which this state belongs.
	JobID string

	// StartAt is the timestamp marking when the job execution began.
	// It is zero if the job has not started yet.
	StartAt time.Time

	// EndAt is the timestamp marking when the job finished execution.
	// It is zero if the job is currently running or has not yet started.
	EndAt time.Time

	// Error captures errors encountered during job execution or in lifecycle hooks.
	// If no errors occurred, this field remains empty.
	Error StateError

	// Status indicates the current execution state of the job (e.g., Waiting, Running, Completed, Error).
	Status JobStatus

	// ExecutionTime is the duration in nanoseconds that the job took to complete.
	// It is zero if the job hasn't completed or hasn't started yet.
	ExecutionTime int64

	// Data holds user-defined key-value pairs collected during job execution.
	// This is commonly used for computed results, metrics, or side-channel information.
	Data map[string]interface{}

	// Success tracks how many times the job has completed successfully.
	Success int

	// Failure tracks how many times the job has failed.
	Failure int

	// NextRun indicates the estimated time of the next scheduled execution.
	// For one-time jobs, this will remain zero after execution.
	NextRun time.Time
}

StateDTO is a data transfer object representing the runtime state of a job.

It encapsulates execution metadata, status flags, timing information, user-defined data, and error traces without exposing internal synchronization or logic.

type StateError

type StateError struct {
	// JobError is the error returned by the main job function (Fn), if any.
	JobError error

	// HookError captures individual errors raised by lifecycle hooks.
	HookError
}

StateError groups together execution errors encountered during job runtime or in hook callbacks.

It distinguishes between core job execution errors and hook-specific failures.

func (StateError) IsEmpty

func (e StateError) IsEmpty() bool

IsEmpty checks whether the error state is effectively clean, i.e., no job or hook errors occurred.

Returns:

  • true if both JobError and all HookError fields are nil; false otherwise.
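
A check that matches this description can be sketched as below. The types are local copies of the documented structs, and the method body is one possible implementation rather than the package's source; errCleanup is a hypothetical error used for the demo.

```go
package main

import (
	"errors"
	"fmt"
)

// HookError and StateError are local copies of the documented structs.
type HookError struct {
	OnStart, OnStop, OnError, OnSuccess, OnPause, OnResume, Finally error
}

type StateError struct {
	JobError error
	HookError
}

// IsEmpty reports whether neither the job nor any hook produced an error.
// Fields are checked one by one instead of comparing whole structs, because
// comparing interface values with non-comparable dynamic types panics.
func (e StateError) IsEmpty() bool {
	return e.JobError == nil &&
		e.OnStart == nil && e.OnStop == nil && e.OnError == nil &&
		e.OnSuccess == nil && e.OnPause == nil && e.OnResume == nil &&
		e.Finally == nil
}

var errCleanup = errors.New("cleanup failed")

func main() {
	clean := StateError{}
	dirty := StateError{HookError: HookError{Finally: errCleanup}}
	fmt.Println(clean.IsEmpty(), dirty.IsEmpty())
}
```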
