job

package
v0.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 9, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Contains added in v0.0.2

func Contains(arr []int, val int) bool

Contains checks whether a slice of integers contains a specified integer value.

Parameters:

  • arr: Slice of integers.
  • val: Integer to search for.

Returns:

  • true if the integer is found in the slice; false otherwise.

func New

func New(jobDTO domain.JobDTO, ctx context.Context, mon domain.Monitoring) (domain.Job, error)

New creates and initializes a new Job instance from the given JobDTO and context.

Steps:

  • Validates configuration (ID, Fn, scheduling exclusivity).
  • Sets default values for Name, StartAt, and EndAt.
  • Parses Cron expression if provided.
  • Initializes state, channels, and internal control object.

Parameters:

  • jobDTO: Configuration settings for the job.
  • ctx: Parent context inherited from the scheduler.
  • mon: Monitoring implementation used for metrics reporting.

Returns:

  • A new Job instance, returned as domain.Job.
  • Error if validation fails or initialization is inconsistent.

func ParseField added in v0.0.2

func ParseField(field string, min int, max int) ([]int, error)

ParseField parses an individual cron expression field (e.g., minutes, hours) and returns a slice of integers representing the allowed values.

Supported syntax:

  • "*": wildcard, matches all values within the range.
  • "*/N": step values, matches every Nth value in the range.
  • "X-Y": range values, matches all values between X and Y inclusive.
  • "X,Y,Z": specific values, matches only listed values.

Parameters:

  • field: Cron field string to parse (e.g., "*/5", "1,2,3", "10-15").
  • min, max: Allowed range for this cron field.

Returns:

  • A slice of parsed integer values.
  • An error if parsing fails due to invalid syntax or values out of range.
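The four supported forms can be sketched as below. This is a minimal illustration, not the package's implementation: parseField and span are hypothetical names, and the sketch assumes each field uses exactly one form (no mixing of lists and ranges).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// span returns from, from+step, ... up to and including to.
func span(from, to, step int) []int {
	var out []int
	for v := from; v <= to; v += step {
		out = append(out, v)
	}
	return out
}

// parseField is a hypothetical stand-in for ParseField. It accepts exactly
// one of the four documented forms per field.
func parseField(field string, minVal, maxVal int) ([]int, error) {
	switch {
	case field == "*":
		return span(minVal, maxVal, 1), nil
	case strings.HasPrefix(field, "*/"):
		step, err := strconv.Atoi(strings.TrimPrefix(field, "*/"))
		if err != nil || step <= 0 {
			return nil, fmt.Errorf("invalid step in %q", field)
		}
		return span(minVal, maxVal, step), nil
	case strings.Contains(field, "-"):
		parts := strings.SplitN(field, "-", 2)
		lo, errLo := strconv.Atoi(parts[0])
		hi, errHi := strconv.Atoi(parts[1])
		if errLo != nil || errHi != nil || lo < minVal || hi > maxVal || lo > hi {
			return nil, fmt.Errorf("invalid range %q", field)
		}
		return span(lo, hi, 1), nil
	}
	// Otherwise treat the field as a comma-separated list
	// (a single value is a list of one).
	var out []int
	for _, part := range strings.Split(field, ",") {
		v, err := strconv.Atoi(part)
		if err != nil || v < minVal || v > maxVal {
			return nil, fmt.Errorf("invalid value %q in %q", part, field)
		}
		out = append(out, v)
	}
	return out, nil
}

func main() {
	fmt.Println(parseField("*/15", 0, 59)) // [0 15 30 45] <nil>
	fmt.Println(parseField("10-15", 0, 59))
	fmt.Println(parseField("61", 0, 59)) // out of range, so an error is returned
}
```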

Types

type CronSchedule

type CronSchedule struct {
	Minutes  []int // Allowed minute values (0-59)
	Hours    []int // Allowed hour values (0-23)
	Days     []int // Allowed day-of-month values (1-31)
	Months   []int // Allowed month values (1-12)
	Weekdays []int // Allowed weekday values (0-6, where 0 = Sunday)
}

CronSchedule represents a parsed cron expression. It defines allowed scheduling times using slices of integers for each cron field (minutes, hours, days, months, weekdays).

func ParseCron

func ParseCron(cronExpr string) (*CronSchedule, error)

ParseCron parses a cron expression string and returns a CronSchedule instance. The cron expression must contain exactly five fields separated by spaces:

Minutes (0-59), Hours (0-23), Day-of-month (1-31), Month (1-12), Weekday (0-6, Sunday = 0).

Parameters:

  • cronExpr: The cron expression string (e.g., "*/5 * * * *").

Returns:

  • A pointer to a CronSchedule with parsed fields.
  • An error if the cron expression syntax is invalid.
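The five-field layout can be illustrated with a small splitter. This is a sketch under assumptions: splitCron, fieldSpec, and cronFields are hypothetical names, and splitting on any run of whitespace (strings.Fields) is a choice made here rather than confirmed package behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldSpec names one cron field and its documented bounds.
type fieldSpec struct {
	name     string
	min, max int
}

// cronFields lists the five fields in the order ParseCron documents them.
var cronFields = []fieldSpec{
	{"minute", 0, 59},
	{"hour", 0, 23},
	{"day-of-month", 1, 31},
	{"month", 1, 12},
	{"weekday", 0, 6}, // 0 = Sunday
}

// splitCron is a hypothetical helper: it splits expr on whitespace and
// rejects anything that does not have exactly five fields.
func splitCron(expr string) (map[string]string, error) {
	parts := strings.Fields(expr)
	if len(parts) != len(cronFields) {
		return nil, fmt.Errorf("expected %d fields, got %d", len(cronFields), len(parts))
	}
	out := make(map[string]string, len(parts))
	for i, spec := range cronFields {
		out[spec.name] = parts[i]
	}
	return out, nil
}

func main() {
	fields, err := splitCron("*/5 * * * *")
	fmt.Println(fields["minute"], err)

	_, err = splitCron("* * *")
	fmt.Println(err)
}
```

Each raw field would then be handed to a per-field parser together with its bounds.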

func (*CronSchedule) Matches added in v0.0.2

func (cs *CronSchedule) Matches(t time.Time) bool

Matches determines if a given time satisfies the cron schedule conditions.

Parameters:

  • t: Time value to evaluate.

Returns:

  • true if the provided time matches all cron fields; false otherwise.
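A minimal sketch of the matching rule, assuming (as stated above) that all five fields must match. The schedule type and the helpers has, seq, and at are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// schedule mirrors the shape of CronSchedule for this sketch.
type schedule struct {
	Minutes, Hours, Days, Months, Weekdays []int
}

// has reports whether v occurs in arr.
func has(arr []int, v int) bool {
	for _, x := range arr {
		if x == v {
			return true
		}
	}
	return false
}

// seq returns the integers from..to inclusive.
func seq(from, to int) []int {
	out := make([]int, 0, to-from+1)
	for v := from; v <= to; v++ {
		out = append(out, v)
	}
	return out
}

// matches returns true only when every field of t is allowed.
// time.Weekday uses 0 for Sunday, which lines up with the cron convention.
func (s schedule) matches(t time.Time) bool {
	return has(s.Minutes, t.Minute()) &&
		has(s.Hours, t.Hour()) &&
		has(s.Days, t.Day()) &&
		has(s.Months, int(t.Month())) &&
		has(s.Weekdays, int(t.Weekday()))
}

// at builds a UTC timestamp with zero seconds.
func at(year, month, day, hour, minute int) time.Time {
	return time.Date(year, time.Month(month), day, hour, minute, 0, 0, time.UTC)
}

func main() {
	// 09:00 and 09:30, Monday through Friday.
	s := schedule{
		Minutes:  []int{0, 30},
		Hours:    []int{9},
		Days:     seq(1, 31),
		Months:   seq(1, 12),
		Weekdays: seq(1, 5),
	}
	fmt.Println(s.matches(at(2025, 5, 9, 9, 30)))  // a Friday
	fmt.Println(s.matches(at(2025, 5, 10, 9, 30))) // a Saturday
}
```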

func (*CronSchedule) NextRun

func (cs *CronSchedule) NextRun() time.Time

NextRun calculates and returns the next scheduled execution time based on the cron schedule. It efficiently skips unsuitable dates and times to find the next match, looking ahead up to one year.

Returns:

  • The next scheduled execution time (time.Time).
  • Zero-value time if no valid execution time is found within a year (unlikely scenario).
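The one-year lookahead and zero-value fallback can be sketched with a deliberately naive minute-by-minute scan. This is not the package's algorithm, which is described as skipping unsuitable dates; nextRunAfter, schedule, and the helpers are hypothetical, and the sketch takes an explicit start time so that it is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// schedule mirrors the shape of CronSchedule for this sketch.
type schedule struct {
	Minutes, Hours, Days, Months, Weekdays []int
}

func has(arr []int, v int) bool {
	for _, x := range arr {
		if x == v {
			return true
		}
	}
	return false
}

func seq(from, to int) []int {
	out := make([]int, 0, to-from+1)
	for v := from; v <= to; v++ {
		out = append(out, v)
	}
	return out
}

func (s schedule) matches(t time.Time) bool {
	return has(s.Minutes, t.Minute()) && has(s.Hours, t.Hour()) &&
		has(s.Days, t.Day()) && has(s.Months, int(t.Month())) &&
		has(s.Weekdays, int(t.Weekday()))
}

// nextRunAfter returns the first matching minute strictly after from,
// or the zero time if nothing matches within one year.
func (s schedule) nextRunAfter(from time.Time) time.Time {
	limit := from.AddDate(1, 0, 0)
	start := from.Truncate(time.Minute).Add(time.Minute)
	for t := start; t.Before(limit); t = t.Add(time.Minute) {
		if s.matches(t) {
			return t
		}
	}
	return time.Time{}
}

// at builds a UTC timestamp with zero seconds.
func at(year, month, day, hour, minute int) time.Time {
	return time.Date(year, time.Month(month), day, hour, minute, 0, 0, time.UTC)
}

func main() {
	noon := schedule{Minutes: []int{0}, Hours: []int{12},
		Days: seq(1, 31), Months: seq(1, 12), Weekdays: seq(0, 6)}
	fmt.Println(noon.nextRunAfter(at(2025, 5, 9, 9, 30)))

	// February 31 never occurs, so the scan gives up after a year.
	never := schedule{Minutes: []int{0}, Hours: []int{0},
		Days: []int{31}, Months: []int{2}, Weekdays: seq(0, 6)}
	fmt.Println(never.nextRunAfter(at(2025, 1, 1, 0, 0)).IsZero())
}
```

Callers should check the result with IsZero before scheduling against it.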

type Fn

type Fn func(ctrl FnControl) error

Fn defines the signature of a job's main execution function.

This function is invoked by the scheduler at runtime and represents the core logic of a scheduled task.

Parameters:

  • ctrl: FnControl instance providing access to execution context, pause/resume signaling, and runtime metadata storage.

Returns:

  • An error if the job execution fails, or nil on successful completion.

type FnControl

type FnControl struct {
	// contains filtered or unexported fields
}

FnControl provides the job's execution function (Fn) with control mechanisms, lifecycle coordination, and runtime metadata handling.

It exposes capabilities for:

  • Context management (cancellation, timeout).
  • Pause/resume signaling support.
  • Saving runtime key-value data for inspection or lifecycle hooks.

func (*FnControl) Context

func (ctrl *FnControl) Context() context.Context

Context returns the execution context associated with this job.

The context is used to monitor cancellation or timeout events triggered by scheduler shutdown or job configuration limits.

Returns:

  • context.Context instance for cancellation awareness.

func (*FnControl) GetData

func (ctrl *FnControl) GetData() map[string]interface{}

GetData returns a copy of the job's current saved runtime data.

This method allows the job to access the latest key-value pairs previously stored via SaveData. It can be used inside the main function (Fn) or any lifecycle hook (OnStart, OnSuccess, etc.).

Returns:

  • map[string]interface{}: A snapshot of the user-defined runtime data.

func (*FnControl) PauseChan

func (ctrl *FnControl) PauseChan() <-chan struct{}

PauseChan exposes the channel used to notify a job that it should pause.

Job functions should monitor this channel and enter a paused state upon receiving a signal (e.g., via select-case).

Returns:

  • <-chan struct{}: read-only pause signal channel.

func (*FnControl) ResumeChan

func (ctrl *FnControl) ResumeChan() <-chan struct{}

ResumeChan provides a channel that signals when a paused job should resume.

This wrapper around the internal resume channel ensures that if the context is canceled while the job is paused, the resume channel will also be closed to prevent goroutine leaks.

Returns:

  • <-chan struct{}: read-only channel for resume signaling.
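A job function that cooperates with pause and resume might be structured as in the sketch below. The control interface, fakeControl, and runSteps are hypothetical; fakeControl only imitates the three FnControl accessors so the example can run without the scheduler.

```go
package main

import (
	"context"
	"fmt"
)

// control lists the three FnControl methods this sketch relies on.
type control interface {
	Context() context.Context
	PauseChan() <-chan struct{}
	ResumeChan() <-chan struct{}
}

// fakeControl is a hypothetical stand-in for FnControl.
type fakeControl struct {
	ctx    context.Context
	cancel context.CancelFunc
	pause  chan struct{}
	resume chan struct{}
}

func newFakeControl() *fakeControl {
	ctx, cancel := context.WithCancel(context.Background())
	return &fakeControl{
		ctx:    ctx,
		cancel: cancel,
		pause:  make(chan struct{}, 1),
		resume: make(chan struct{}, 1),
	}
}

func (c *fakeControl) Context() context.Context    { return c.ctx }
func (c *fakeControl) PauseChan() <-chan struct{}  { return c.pause }
func (c *fakeControl) ResumeChan() <-chan struct{} { return c.resume }

// runSteps performs up to steps units of work. Between units it checks for
// cancellation and for a pause signal; while paused it blocks until a
// resume signal arrives or the context is cancelled.
func runSteps(ctrl control, steps int) (int, error) {
	ctx := ctrl.Context()
	done := 0
	for done < steps {
		select {
		case <-ctx.Done():
			return done, ctx.Err()
		case <-ctrl.PauseChan():
			select {
			case <-ctrl.ResumeChan():
			case <-ctx.Done():
				return done, ctx.Err()
			}
		default:
			done++ // one unit of real work would go here
		}
	}
	return done, nil
}

func main() {
	c := newFakeControl()
	defer c.cancel()
	c.pause <- struct{}{}  // request a pause before the first step
	c.resume <- struct{}{} // and queue the matching resume
	fmt.Println(runSteps(c, 3))
}
```

Reading from the pause channel is what acknowledges the pause, so the worker should check it at every safe point in its loop.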

func (*FnControl) SaveData

func (ctrl *FnControl) SaveData(data map[string]interface{})

SaveData stores custom key-value data generated during job execution.

This metadata becomes part of the job's execution state and is retained across lifecycle hooks and monitoring systems.

Parameters:

  • data: Map of user-defined runtime data to persist.

type Job

type Job struct {
	domain.JobDTO // Embedded job configuration provided during creation.

	PoolID string // Optional: pool identifier if needed for grouping or routing.
	// contains filtered or unexported fields
}

Job represents a scheduled task managed and executed by the scheduler.

It encapsulates the job's execution function (Fn), configuration, lifecycle state, context cancellation, pause/resume signaling, retry logic, and runtime metrics.

func (*Job) CanExecute

func (j *Job) CanExecute() error

CanExecute evaluates whether the job is eligible for execution based on scheduling constraints and current status.

Validation checks include:

  • The job must be in the Waiting state.
  • The current time must be after StartAt and before EndAt.

Returns:

  • nil if the job is eligible for execution.
  • A wrapped error indicating the reason why execution is not allowed.

func (*Job) CloseChannels

func (j *Job) CloseChannels()

CloseChannels safely closes all internal job channels and cancels the job context.

Behavior:

  • Attempts to close all coordination channels (pause, resume, done, process).
  • Recovers from any panics due to closing already-closed channels.
  • Cancels the associated job context.

Warning:

  • The implementation attempts to close processCh twice. Calling CloseChannels more than once is unsafe and should be avoided.

func (*Job) Execute

func (j *Job) Execute() (err error)

Execute runs the job's main function, managing the full lifecycle including pre-execution checks, hook invocations, error propagation, and cleanup logic.

Execution flow:

  1. Checks that the job is not already running by verifying the `doneCh` channel. Returns ErrJobStillRunning if the job is currently in progress.
  2. Ensures the `Finally` hook is always executed after job logic (deferred).
  3. Executes the `OnStart` hook. If it fails and `IgnoreError` is false, job execution stops.
  4. Executes the user-defined function (`Fn`). If it returns an error, the `OnError` hook is run. The error is wrapped and passed to the job's error handler.
  5. If execution is successful, runs the `OnSuccess` hook.
  6. Any error from `Finally` overrides prior ones.

Returns:

  • The resulting error from the main function or any hook, or nil on success.
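The hook ordering above can be sketched with a deferred Finally and a named return value. This is a simplified illustration: the hooks struct and execute function are hypothetical, and the doneCh check and the IgnoreError option are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a sample failure used by the demo.
var errBoom = errors.New("boom")

// hooks is a hypothetical bundle of optional lifecycle callbacks.
type hooks struct {
	OnStart   func() error
	OnSuccess func() error
	OnError   func(error)
	Finally   func() error
}

// execute runs fn between the hooks. Finally is deferred so it always runs,
// and a non-nil error from Finally replaces any earlier error.
func execute(fn func() error, h hooks) (err error) {
	defer func() {
		if h.Finally == nil {
			return
		}
		if ferr := h.Finally(); ferr != nil {
			err = ferr
		}
	}()

	if h.OnStart != nil {
		if serr := h.OnStart(); serr != nil {
			return fmt.Errorf("on start: %w", serr)
		}
	}
	if ferr := fn(); ferr != nil {
		if h.OnError != nil {
			h.OnError(ferr)
		}
		return fmt.Errorf("job failed: %w", ferr)
	}
	if h.OnSuccess != nil {
		return h.OnSuccess()
	}
	return nil
}

func main() {
	h := hooks{
		OnError: func(e error) { fmt.Println("OnError:", e) },
		Finally: func() error { fmt.Println("Finally"); return nil },
	}
	fmt.Println("result:", execute(func() error { return errBoom }, h))
}
```

The named return value is what lets the deferred function overwrite the error after the main body has already returned.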

func (*Job) GetMetadata

func (j *Job) GetMetadata() domain.JobDTO

GetMetadata returns the original job configuration (JobDTO).

func (*Job) GetNextRun

func (j *Job) GetNextRun() time.Time

GetNextRun retrieves the scheduled time for the job's next execution.

Returns:

  • time.Time: The timestamp of the next planned run.

func (*Job) GetState

func (j *Job) GetState() *domain.StateDTO

GetState returns a snapshot of the job's current execution state.

func (*Job) GetStateData

func (j *Job) GetStateData() map[string]interface{}

GetStateData returns a copy of the job's current runtime data.

This method allows accessing the latest key-value pairs previously stored via SaveData. It guarantees that modifications to the returned map will not affect the internal state.

Returns:

  • map[string]interface{}: A copy of the user-defined Data field.
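The copy-on-read guarantee can be sketched with a mutex-guarded map. The dataStore type is hypothetical, merging on save is an assumption made here, and the copy is shallow: nested maps or slices stored as values would still be shared.

```go
package main

import (
	"fmt"
	"sync"
)

// dataStore is a hypothetical holder for a job's runtime data.
type dataStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

// save merges the given key-value pairs into the store.
func (s *dataStore) save(in map[string]interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data == nil {
		s.data = make(map[string]interface{}, len(in))
	}
	for k, v := range in {
		s.data[k] = v
	}
}

// snapshot returns a shallow copy, so callers cannot add, remove, or
// replace keys in the internal map.
func (s *dataStore) snapshot() map[string]interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]interface{}, len(s.data))
	for k, v := range s.data {
		out[k] = v
	}
	return out
}

func main() {
	s := &dataStore{}
	s.save(map[string]interface{}{"rows": 10})

	snap := s.snapshot()
	snap["rows"] = 99 // changes only the copy

	fmt.Println(s.snapshot()["rows"])
}
```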

func (*Job) GetStatus

func (j *Job) GetStatus() domain.JobStatus

GetStatus retrieves the job's current status (e.g., Waiting, Running).

func (*Job) LockJob

func (j *Job) LockJob() bool

LockJob attempts to acquire an internal processing lock for lifecycle transitions. Returns true if lock was acquired.

func (*Job) Pause

func (j *Job) Pause(timeout time.Duration) error

Pause attempts to pause the currently running job by sending a non-blocking signal to pauseCh.

Once the pause signal is sent, a timeout watcher is started in a separate goroutine. If the job fails to acknowledge the pause (by reading from pauseCh) within the specified timeout, the job's status is automatically reverted back to Running.

This mechanism ensures that jobs which do not support pause/resume don't get stuck in the Paused state.

Parameters:

  • timeout: Duration to wait for pause acknowledgment. If zero, DEFAULT_PAUSE_TIMEOUT is used.

Returns:

  • An error if the job is not in the Running state, or if the pause signal could not be delivered because the job has already received one.

func (*Job) ProcessEnd

func (j *Job) ProcessEnd(status domain.JobStatus, err error)

ProcessEnd finalizes the job after execution, saving timing and status metadata.

This method performs:

  • Records EndAt timestamp and calculates ExecutionTime.
  • Sets the final status (Completed, Error, Ended).
  • Increments Success or Failure counter.
  • Stores execution error if any.
  • Triggers graceful cleanup if status is Ended.

Parameters:

  • status: Final job status (Completed, Error, or Ended).
  • err: Error encountered during execution, if any.

func (*Job) ProcessError

func (j *Job) ProcessError() error

ProcessError applies retry logic after a job execution failure.

Behavior:

  • If retries are available, the job is rescheduled by setting its status to Completed.
  • If retries are exhausted or disabled, the job is finalized (Ended) and removed from the pool. (Note: In this case, execution errors are not stored in the final state.)

Returns:

  • An error if no more retries are allowed (indicating job termination).
  • nil if the job will be retried.

func (*Job) ProcessRun

func (j *Job) ProcessRun() error

ProcessRun monitors job execution time to detect timeouts.

This method should be called during execution (typically inside the pool), and ensures the job hasn't exceeded its configured Timeout.

Returns:

  • ErrJobTimeout if execution exceeds Timeout.
  • nil if within time limit.

func (*Job) ProcessStart

func (j *Job) ProcessStart()

ProcessStart initializes the job's internal state at the beginning of execution.

This method performs:

  • Records the current timestamp as StartAt.
  • Resets execution-related fields (EndAt, ExecutionTime, Data).
  • Sets the job status to Running.
  • Prepares the state for clean metrics collection and data accumulation.

func (*Job) Resume

func (j *Job) Resume(ctx context.Context) error

Resume sends a resume signal to a paused or stopped job, allowing it to continue execution.

If the job was Paused, it sends a resume signal via resumeCh, sets status to Running, and triggers the OnResume hook. If the job was Stopped, it transitions the status to Waiting (preparing for re-execution) and triggers the OnResume hook.

Returns:

  • An error if the job is not in the Paused or Stopped state, or if the resume signal cannot be delivered (e.g., if the channel is full).

func (*Job) Retry

func (j *Job) Retry() error

Retry increments the job's retry counter and evaluates retry eligibility.

Logic:

  • If retrying is disabled, returns ErrRetryFlagNotActive.
  • If max retry count is reached, returns ErrJobRetryLimit.
  • If unlimited retries (Count = 0), allows retry indefinitely.

Returns:

  • nil if retry is permitted.
  • Error if retrying is disallowed or exhausted.
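The three rules above can be sketched as a small counter. The type and field names (retryPolicy, Active, Count, retryState) and the lowercase error variables are hypothetical, and checking the limit before incrementing is an assumption about ordering.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical counterparts of ErrRetryFlagNotActive and ErrJobRetryLimit.
var (
	errRetryFlagNotActive = errors.New("retry flag not active")
	errJobRetryLimit      = errors.New("retry limit reached")
)

// retryPolicy is a hypothetical retry configuration.
type retryPolicy struct {
	Active bool
	Count  int // 0 means unlimited
}

// retryState tracks how many retries have been granted so far.
type retryState struct {
	policy   retryPolicy
	attempts int
}

// retry grants one more attempt, or reports why none is allowed.
func (r *retryState) retry() error {
	if !r.policy.Active {
		return errRetryFlagNotActive
	}
	if r.policy.Count > 0 && r.attempts >= r.policy.Count {
		return errJobRetryLimit
	}
	r.attempts++
	return nil
}

func main() {
	r := &retryState{policy: retryPolicy{Active: true, Count: 2}}
	for i := 0; i < 3; i++ {
		fmt.Println(r.retry())
	}
}
```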

func (*Job) SetNextRun

func (j *Job) SetNextRun(startTime time.Time) time.Time

func (*Job) SetStatus

func (j *Job) SetStatus(status domain.JobStatus)

SetStatus forcefully sets the job's current status.

func (*Job) SetTimeout

func (j *Job) SetTimeout(timeout time.Duration)

SetTimeout updates the job's execution timeout. If exceeded, the job may be canceled during ProcessRun.

func (*Job) Stop

func (j *Job) Stop() error

Stop immediately cancels the job's execution context and updates its status to Stopped.

For Running, Paused, or Waiting jobs, it forcibly marks them as Stopped and records the EndAt timestamp. For Completed or Error jobs, it updates the status to Stopped without altering timestamps.

This method is idempotent and can be safely called multiple times.

If the OnStop hook is defined, it is executed after updating the state.

func (*Job) TrySetStatus

func (j *Job) TrySetStatus(allowed []domain.JobStatus, status domain.JobStatus) bool

TrySetStatus attempts to transition to a new status if the current status is within the allowed list. Returns true if the transition succeeds.

func (*Job) UnlockJob

func (j *Job) UnlockJob()

UnlockJob releases the internal lifecycle lock.

func (*Job) UpdateState

func (j *Job) UpdateState(state domain.StateDTO)

UpdateState applies a partial update to the job's internal state. Only non-zero/non-nil fields in the provided StateDTO will be applied.

func (*Job) UpdateStateStrict

func (j *Job) UpdateStateStrict(state domain.StateDTO)

UpdateStateStrict performs a full overwrite of the job's current state using all fields from the provided StateDTO.

type State added in v0.0.2

type State struct {
	domain.StateDTO // Embedded State DTO for simplified access.
	// contains filtered or unexported fields
}

State represents the internal, thread-safe runtime state of a scheduled job.

It manages execution timestamps, status transitions, retry attempts, custom job data, and error tracking. All operations are guarded by mutexes to ensure concurrency safety.

func NewState added in v0.0.2

func NewState(jobId string, mon domain.Monitoring) *State

NewState initializes a new job state with default values.

Behavior:

  • Sets Status to Waiting.
  • Initializes an empty Data map.
  • Links the provided Monitoring backend for metric tracking.

Parameters:

  • jobId: Unique job identifier.
  • mon: Monitoring implementation for metrics reporting.

Returns:

  • A pointer to a fully initialized State.

func (*State) GetNextRun added in v0.0.2

func (s *State) GetNextRun() time.Time

func (*State) GetState added in v0.0.2

func (s *State) GetState() *domain.StateDTO

GetState returns the current job state.

Warning:

  • The returned pointer refers to the internal State (not a deep clone).
  • External code must treat this object as read-only to avoid race conditions.

Returns:

  • A pointer to the current StateDTO.

func (*State) GetStatus added in v0.0.2

func (s *State) GetStatus() domain.JobStatus

GetStatus retrieves the current job status in a thread-safe manner.

func (*State) SetEndState added in v0.0.2

func (s *State) SetEndState(resOnSuccess bool, status domain.JobStatus, err error)

SetEndState finalizes the execution state of a job after it finishes running, applying post-execution metadata such as status, error information, and execution duration.

Behavior:

  • Updates the job’s EndAt timestamp and total ExecutionTime.
  • Determines the correct final status based on the current status:
      • If the status was Running, Waiting, or Paused: sets the new status from input.
      • If the status was Stopped, Ended, or Error: preserves the current status.
      • For unknown statuses: applies the provided final status defensively.
  • Records any job execution error for diagnostics.
  • Increments Success or Failure counters.
  • Resets the retry counter if the execution was successful and reset-on-success is enabled.
  • Saves the final state to the monitoring backend.

Parameters:

  • resOnSuccess: If true, resets retry counter after a successful execution.
  • status: The target status to assign if eligible.
  • err: The execution error encountered, or nil if the job completed successfully.
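The status-resolution rule can be written as a small pure function. This is a sketch: the status type, its constants, and resolveFinal are hypothetical names, and only the resolution step is shown (timestamps, counters, and monitoring are omitted).

```go
package main

import "fmt"

// status is a hypothetical stand-in for domain.JobStatus.
type status string

const (
	waiting   status = "Waiting"
	running   status = "Running"
	paused    status = "Paused"
	stopped   status = "Stopped"
	ended     status = "Ended"
	failed    status = "Error"
	completed status = "Completed"
)

// resolveFinal picks the status to keep once a run has finished.
// Stopped, Ended, and Error are preserved; every other current status,
// including unknown ones, yields the requested status.
func resolveFinal(current, requested status) status {
	switch current {
	case stopped, ended, failed:
		return current
	default:
		return requested
	}
}

func main() {
	fmt.Println(resolveFinal(running, completed))
	fmt.Println(resolveFinal(stopped, completed))
}
```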

func (*State) SetStatus added in v0.0.2

func (s *State) SetStatus(status domain.JobStatus)

SetStatus updates the job's execution status.

Behavior:

  • Updates the Status field.
  • Immediately saves the updated state to the Monitoring backend.

func (*State) TrySetStatus added in v0.0.2

func (s *State) TrySetStatus(allowed []domain.JobStatus, status domain.JobStatus) bool

TrySetStatus attempts to change the status only if the current status is in the allowed list.

Parameters:

  • allowed: List of acceptable current statuses.
  • status: New status to set if transition is allowed.

Returns:

  • true if the status was successfully updated.
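A guarded transition of this kind is essentially a check-and-set under a lock. The sketch below uses hypothetical names (status, state, trySetStatus) and leaves out the monitoring update.

```go
package main

import (
	"fmt"
	"sync"
)

// status is a hypothetical stand-in for domain.JobStatus.
type status string

const (
	waiting status = "Waiting"
	running status = "Running"
	paused  status = "Paused"
)

// state holds the current status behind a mutex.
type state struct {
	mu     sync.Mutex
	status status
}

// trySetStatus moves to next only if the current status is in allowed.
// The check and the write happen under one lock, so two goroutines cannot
// both succeed in claiming the same transition.
func (s *state) trySetStatus(allowed []status, next status) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, a := range allowed {
		if s.status == a {
			s.status = next
			return true
		}
	}
	return false
}

func main() {
	s := &state{status: waiting}
	fmt.Println(s.trySetStatus([]status{waiting}, running)) // Waiting -> Running
	fmt.Println(s.trySetStatus([]status{waiting}, running)) // already Running
}
```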

func (*State) Update added in v0.0.2

func (s *State) Update(State domain.StateDTO, strict bool)

Update applies a partial or full state update from the provided DTO.

Behavior:

  • In strict mode: All fields are forcefully overwritten. Metrics are not updated automatically (caller is responsible).
  • In non-strict mode: Only non-zero or non-empty fields are merged selectively. Metrics are updated immediately.

Parameters:

  • State: New values to apply to the job state.
  • strict: Whether to fully overwrite (true) or merge selectively (false).
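The difference between the two modes can be sketched on a reduced struct. The stateDTO fields shown here are assumptions chosen for illustration (the real domain.StateDTO is not listed on this page), applyUpdate is a hypothetical name, and metric updates are omitted.

```go
package main

import "fmt"

// stateDTO is a reduced, hypothetical version of domain.StateDTO.
type stateDTO struct {
	Status        string
	ExecutionTime int64
	Data          map[string]interface{}
}

// applyUpdate overwrites cur entirely in strict mode, and otherwise copies
// only the fields of in that are non-zero or non-empty.
func applyUpdate(cur *stateDTO, in stateDTO, strict bool) {
	if strict {
		*cur = in
		return
	}
	if in.Status != "" {
		cur.Status = in.Status
	}
	if in.ExecutionTime != 0 {
		cur.ExecutionTime = in.ExecutionTime
	}
	if len(in.Data) > 0 {
		cur.Data = in.Data
	}
}

func main() {
	cur := stateDTO{Status: "Running", ExecutionTime: 42}

	applyUpdate(&cur, stateDTO{Status: "Paused"}, false)
	fmt.Println(cur.Status, cur.ExecutionTime) // ExecutionTime is kept

	applyUpdate(&cur, stateDTO{Status: "Waiting"}, true)
	fmt.Println(cur.Status, cur.ExecutionTime) // ExecutionTime is reset to zero
}
```

A consequence of the non-strict rule is that a field cannot be reset to its zero value through a partial update; that requires the strict mode.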

func (*State) UpdateData added in v0.0.2

func (s *State) UpdateData(data map[string]interface{})

UpdateData applies key-value pairs to the job's custom metadata.

Also triggers metric storage after modification.

func (*State) UpdateExecutionTime added in v0.0.2

func (s *State) UpdateExecutionTime() int64

UpdateExecutionTime calculates and updates execution duration since StartAt.

Also pushes the updated state to the monitoring system.
