Documentation
¶
Index ¶
- func Contains(arr []int, val int) bool
- func New(jobDTO domain.JobDTO, ctx context.Context, mon domain.Monitoring) (domain.Job, error)
- func ParseField(field string, min int, max int) ([]int, error)
- type CronSchedule
- type Fn
- type FnControl
- type Job
- func (j *Job) CanExecute() error
- func (j *Job) CloseChannels()
- func (j *Job) Execute() (err error)
- func (j *Job) GetMetadata() domain.JobDTO
- func (j *Job) GetNextRun() time.Time
- func (j *Job) GetState() *domain.StateDTO
- func (j *Job) GetStateData() map[string]interface{}
- func (j *Job) GetStatus() domain.JobStatus
- func (j *Job) LockJob() bool
- func (j *Job) Pause(timeout time.Duration) error
- func (j *Job) ProcessEnd(status domain.JobStatus, err error)
- func (j *Job) ProcessError() error
- func (j *Job) ProcessRun() error
- func (j *Job) ProcessStart()
- func (j *Job) Resume(ctx context.Context) error
- func (j *Job) Retry() error
- func (j *Job) SetStatus(status domain.JobStatus)
- func (j *Job) SetTimeout(timeout time.Duration)
- func (j *Job) Stop() error
- func (j *Job) TrySetStatus(allowed []domain.JobStatus, status domain.JobStatus) bool
- func (j *Job) UnlockJob()
- func (j *Job) UpdateState(state domain.StateDTO)
- func (j *Job) UpdateStateStrict(state domain.StateDTO)
- type State
- func (s *State) GetNextRun() time.Time
- func (s *State) GetState() *domain.StateDTO
- func (s *State) GetStatus() domain.JobStatus
- func (s *State) SetEndState(resOnSuccess bool, status domain.JobStatus, err error, interval time.Duration)
- func (s *State) SetNextRun(startTime time.Time, interval time.Duration)
- func (s *State) SetStatus(status domain.JobStatus)
- func (s *State) TrySetStatus(allowed []domain.JobStatus, status domain.JobStatus) bool
- func (s *State) Update(State domain.StateDTO, strict bool)
- func (s *State) UpdateData(data map[string]interface{})
- func (s *State) UpdateExecutionTime() int64
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Contains ¶ added in v0.0.2
Contains checks whether a slice of integers contains a specified integer value.
Parameters:
- arr: Slice of integers.
- val: Integer to search for.
Returns:
- true if the integer is found in the slice; false otherwise.
func New ¶
New creates and initializes a new Job instance from the given JobDTO and context.
Steps:
- Validates configuration (ID, Fn, scheduling exclusivity).
- Sets default values for Name, StartAt, and EndAt.
- Parses Cron expression if provided.
- Initializes state, channels, and internal control object.
Parameters:
- jobDTO: Configuration settings for the job.
- ctx: Parent context inherited from the scheduler.
- mon: Monitoring implementation used for metrics reporting.
Returns:
- Pointer to a new Job instance.
- Error if validation fails or initialization is inconsistent.
func ParseField ¶ added in v0.0.2
ParseField parses an individual cron expression field (e.g., minutes, hours) and returns a slice of integers representing the allowed values.
Supported syntax:
- "*": wildcard, matches all values within the range.
- "*/N": step values, matches every Nth value in the range.
- "X-Y": range values, matches all values between X and Y inclusive.
- "X,Y,Z": specific values, matches only listed values.
Parameters:
- field: Cron field string to parse (e.g., "*/5", "1,2,3", "10-15").
- min, max: Allowed range for this cron field.
Returns:
- A slice of parsed integer values.
- An error if parsing fails due to invalid syntax or values out of range.
Types ¶
type CronSchedule ¶
type CronSchedule struct {
Minutes []int // Allowed minute values (0-59)
Hours []int // Allowed hour values (0-23)
Days []int // Allowed day-of-month values (1-31)
Months []int // Allowed month values (1-12)
Weekdays []int // Allowed weekday values (0-6, where 0 = Sunday)
}
CronSchedule represents a parsed cron expression. It defines allowed scheduling times using slices of integers for each cron field (minutes, hours, days, months, weekdays).
func ParseCron ¶
func ParseCron(cronExpr string) (*CronSchedule, error)
ParseCron parses a cron expression string and returns a CronSchedule instance. The cron expression must contain exactly five fields separated by spaces:
Minutes (0-59), Hours (0-23), Day-of-month (1-31), Month (1-12), Weekday (0-6, Sunday = 0).
Parameters:
- cronExpr: The cron expression string (e.g., "*/5 * * * *").
Returns:
- A pointer to a CronSchedule with parsed fields.
- An error if the cron expression syntax is invalid.
func (*CronSchedule) Matches ¶ added in v0.0.2
func (cs *CronSchedule) Matches(t time.Time) bool
Matches determines if a given time satisfies the cron schedule conditions.
Parameters:
- t: Time value to evaluate.
Returns:
- true if the provided time matches all cron fields; false otherwise.
func (*CronSchedule) NextRun ¶
func (cs *CronSchedule) NextRun(startTime time.Time) time.Time
NextRun returns the next scheduled execution time that matches the cron pattern.
The method starts from the provided `startTime`, rounded up to the next minute, and searches forward minute by minute for a valid match. It checks the schedule components in the following order: Month, Day, Hour, Minute, Weekday. If a component does not match, it skips forward efficiently to the next possible candidate.
To prevent infinite loops on malformed cron definitions, the search is limited to 1 year ahead.
Parameters:
- startTime: The reference point from which to begin the search.
Returns:
- time.Time: The next valid execution time.
- Zero time (time.Time{}) if no match is found within 1 year (should not occur in normal use).
type Fn ¶
Fn defines the signature of a job's main execution function.
This function is invoked by the scheduler at runtime and represents the core logic of a scheduled task.
Parameters:
- ctrl: FnControl instance providing access to execution context, pause/resume signaling, and runtime metadata storage.
Returns:
- An error if the job execution fails, or nil on successful completion.
type FnControl ¶
type FnControl struct {
// contains filtered or unexported fields
}
FnControl provides the job's execution function (Fn) with control mechanisms, lifecycle coordination, and runtime metadata handling.
It exposes capabilities for:
- Context management (cancellation, timeout).
- Pause/resume signaling support.
- Saving runtime key-value data for inspection or lifecycle hooks.
func (*FnControl) Context ¶
Context returns the execution context associated with this job.
The context is used to monitor cancellation or timeout events triggered by scheduler shutdown or job configuration limits.
Returns:
- context.Context instance for cancellation awareness.
func (*FnControl) GetData ¶
GetData returns a copy of the job's current saved runtime data.
This method allows the job to access the latest key-value pairs previously stored via SaveData. It can be used inside the main function (Fn) or any lifecycle hook (OnStart, OnSuccess, etc.).
Returns:
- map[string]interface{}: A snapshot of the user-defined runtime data.
func (*FnControl) PauseChan ¶
func (ctrl *FnControl) PauseChan() <-chan struct{}
PauseChan exposes the channel used to notify a job that it should pause.
Job functions should monitor this channel and enter a paused state upon receiving a signal (e.g., via select-case).
Returns:
- <-chan struct{}: read-only pause signal channel.
func (*FnControl) ResumeChan ¶
func (ctrl *FnControl) ResumeChan() <-chan struct{}
ResumeChan provides a channel that signals when a paused job should resume.
This wrapper around the internal resume channel ensures that if the context is canceled while the job is paused, the resume channel will also be closed to prevent goroutine leaks.
Returns:
- <-chan struct{}: read-only channel for resume signaling.
type Job ¶
type Job struct {
domain.JobDTO // Embedded job configuration provided during creation.
PoolID string // Optional: pool identifier if needed for grouping or routing.
// contains filtered or unexported fields
}
Job represents a scheduled task managed and executed by the scheduler.
It encapsulates the job's execution function (Fn), configuration, lifecycle state, context cancellation, pause/resume signaling, retry logic, and runtime metrics.
func (*Job) CanExecute ¶
CanExecute evaluates whether the job is eligible for execution based on scheduling constraints and current status.
Validation checks include:
- The job must be in the Waiting state.
- The current time must be after StartAt and before EndAt.
Returns:
- nil if the job is eligible for execution.
- A wrapped error indicating the reason why execution is not allowed.
func (*Job) CloseChannels ¶
func (j *Job) CloseChannels()
CloseChannels safely closes all internal job channels and cancels the job context.
Behavior:
- Attempts to close all coordination channels (pause, resume, done, process).
- Recovers from any panics due to closing already-closed channels.
- Cancels the associated job context.
Warning:
- processCh is mistakenly attempted to be closed twice — calling CloseChannels more than once is unsafe and should be avoided.
func (*Job) Execute ¶
Execute runs the job's main function, managing the full lifecycle including pre-execution checks, hook invocations, error propagation, and cleanup logic.
Execution flow:
- Checks that the job is not already running by verifying the `doneCh` channel. Returns ErrJobStillRunning if the job is currently in progress.
- Ensures the `Finally` hook is always executed after job logic (deferred).
- Executes the `OnStart` hook. If it fails and `IgnoreError` is false, job execution stops.
- Executes the user-defined function (`Fn`). If it returns an error, the `OnError` hook is run. The error is wrapped and passed to the job's error handler.
- If execution is successful, runs the `OnSuccess` hook.
- Any error from `Finally` overrides prior ones.
Returns:
- The resulting error from the main function or any hook, or nil on success.
func (*Job) GetMetadata ¶
GetMetadata returns the original job configuration (JobDTO).
func (*Job) GetNextRun ¶
GetNextRun retrieves the scheduled time for the job's next execution.
Returns:
- time.Time: The timestamp of the next planned run.
func (*Job) GetStateData ¶
GetStateData returns a copy of the job's current runtime data.
This method allows accessing the latest key-value pairs previously stored via SaveData. It guarantees that modifications to the returned map will not affect the internal state.
Returns:
- map[string]interface{}: A copy of the user-defined Data field.
func (*Job) LockJob ¶
LockJob attempts to acquire an internal processing lock for lifecycle transitions. Returns true if lock was acquired.
func (*Job) Pause ¶
Pause attempts to pause the currently running job by sending a non-blocking signal to pauseCh.
Once the pause signal is sent, a timeout watcher is started in a separate goroutine. If the job fails to acknowledge the pause (by reading from pauseCh) within the specified timeout, the job's status is automatically reverted back to Running.
This mechanism ensures that jobs which do not support pause/resume don't get stuck in the Paused state.
Parameters:
- timeout: Duration to wait for pause acknowledgment. If zero, DEFAULT_PAUSE_TIMEOUT is used.
Returns:
- An error if the job is not in the Running state, or if the pause signal could not be delivered because the job has already received one.
func (*Job) ProcessEnd ¶
ProcessEnd finalizes the job after execution, saving timing and status metadata.
This method performs:
- Records EndAt timestamp and calculates ExecutionTime.
- Sets the final status (Completed, Error, Ended).
- Increments Success or Failure counter.
- Stores execution error if any.
- Triggers graceful cleanup if status is Ended.
Parameters:
- status: Final job status (Completed, Error, or Ended).
- err: Error encountered during execution, if any.
func (*Job) ProcessError ¶
ProcessError applies retry logic after a job execution failure.
Behavior:
- If retries are available, the job is rescheduled by setting status to complete.
- If retries are exhausted or disabled, the job is finalized (Ended) and removed from the pool. (Note: In this case, execution errors are not stored in the final state.)
Returns:
- An error if no more retries are allowed (indicating job termination).
- nil if the job will be retried.
func (*Job) ProcessRun ¶
ProcessRun monitors job execution time to detect timeouts.
This method should be called during execution (typically inside the pool), and ensures the job hasn't exceeded its configured Timeout.
Returns:
- ErrJobTimeout if execution exceeds Timeout.
- nil if within time limit.
func (*Job) ProcessStart ¶
func (j *Job) ProcessStart()
ProcessStart initializes the job's internal state at the beginning of execution.
This method performs:
- Records the current timestamp as StartAt.
- Resets execution-related fields (EndAt, ExecutionTime, Data).
- Sets the job status to Running.
- Prepares the state for clean metrics collection and data accumulation.
func (*Job) Resume ¶
Resume sends a resume signal to a paused or stopped job, allowing it to continue execution.
If the job was Paused, it sends a resume signal via resumeCh, sets status to Running, and triggers the OnResume hook. If the job was Stopped, it transitions the status to Waiting (preparing for re-execution) and triggers the OnResume hook.
Returns:
- An error if the job is not in the Paused or Stopped state, or if the resume signal cannot be delivered (e.g., if the channel is full).
func (*Job) Retry ¶
Retry increments the job's retry counter and evaluates retry eligibility.
Logic:
- If retrying is disabled, returns ErrRetryFlagNotActive.
- If max retry count is reached, returns ErrJobRetryLimit.
- If unlimited retries (Count = 0), allows retry indefinitely.
Returns:
- nil if retry is permitted.
- Error if retrying is disallowed or exhausted.
func (*Job) SetTimeout ¶
SetTimeout updates the job's execution timeout. If exceeded, the job may be canceled during ProcessRun.
func (*Job) Stop ¶
Stop immediately cancels the job's execution context and updates its status to Stopped.
For Running, Paused, or Waiting jobs — it forcibly marks them as Stopped and records the EndAt timestamp. For Completed or Error jobs — it updates the status to Stopped without altering timestamps.
This method is idempotent and can be safely called multiple times.
If the OnStop hook is defined, it is executed after updating the state.
func (*Job) TrySetStatus ¶
TrySetStatus attempts to transition to a new status if the current status is within the allowed list. Returns true if the transition succeeds.
func (*Job) UpdateState ¶
UpdateState applies a partial update to the job's internal state. Only non-zero/non-nil fields in the provided StateDTO will be applied.
func (*Job) UpdateStateStrict ¶
UpdateStateStrict performs a full overwrite of the job's current state using all fields from the provided StateDTO.
type State ¶ added in v0.0.2
type State struct {
domain.StateDTO // Embedded State DTO for simplified access.
// contains filtered or unexported fields
}
State represents the internal, thread-safe runtime State of a scheduled job.
It manages execution timestamps, status transitions, retry attempts, custom job data, and error tracking. All operations are guarded by mutexes to ensure concurrency safety.
func NewState ¶ added in v0.0.2
func NewState(jobId string, cron *CronSchedule, mon domain.Monitoring) *State
NewState initializes a new job State with default values.
Behavior:
- Sets Status to Waiting.
- Initializes an empty Data map.
- Links the provided Monitoring backend for metric tracking.
Parameters:
- jobId: Unique job identifier.
- mon: Monitoring implementation for metrics reporting.
Returns:
- A pointer to a fully initialized State.
func (*State) GetNextRun ¶ added in v0.0.2
GetNextRun returns the next scheduled run time of the job, based on its current state.
Behavior:
- If the job has never been executed (i.e., both Success and Failure counters are zero), the initially assigned NextRun time is returned.
- If the job has run at least once but EndAt is still unset, it is treated as a running job, and a far-future date (January 1, 9999) is returned as a sentinel value.
- Otherwise, returns the current value of NextRun.
This method uses a read lock to safely access state in concurrent environments.
func (*State) GetState ¶ added in v0.0.2
GetState returns the current job State.
Warning:
- The returned pointer refers to the internal State (not a deep clone).
- External code must treat this object as read-only to avoid race conditions.
Returns:
- A pointer to the current StateDTO.
func (*State) GetStatus ¶ added in v0.0.2
GetStatus retrieves the current job status in a thread-safe manner.
func (*State) SetEndState ¶ added in v0.0.2
func (s *State) SetEndState(resOnSuccess bool, status domain.JobStatus, err error, interval time.Duration)
SetEndState finalizes the job's runtime state after an execution attempt completes.
This method records execution metadata such as end time, duration, outcome, retry counters, and the next scheduled run. It also determines whether to update the current job status or preserve it based on the prior lifecycle state.
Behavior:
- Sets EndAt to the current time.
- Calculates ExecutionTime as the duration since StartAt.
- If current status is Paused, Running, or Waiting: overrides with the provided `status` and sets JobError.
- If current status is Stopped, Ended, or Error: preserves existing status and skips JobError update.
- Increments Success or Failure counters based on the presence of `err`.
- Resets retry counter if execution was successful and `resOnSuccess` is true.
- Computes and sets the next run time using the provided `interval`.
- Saves the updated state to the monitoring system.
Parameters:
- resOnSuccess: If true, resets the retry counter on success.
- status: The target status to apply if the current state allows it.
- err: The error returned by job execution, or nil on success.
- interval: The repeat interval used to schedule the next execution.
func (*State) SetNextRun ¶ added in v0.0.3
SetNextRun updates the job's NextRun field based on the specified scheduling configuration.
Behavior:
- If the job is cron-based, computes the next run using the cron schedule and the provided startTime.
- If interval-based:
- On the first execution (i.e., Success and Failure are zero), NextRun is set to startTime.
- On subsequent executions, NextRun is set to EndAt + interval.
Parameters:
- startTime: Reference time used for computing the next run (typically StartAt or EndAt).
- interval: Fixed delay to apply when computing the next run for interval-based jobs.
func (*State) SetStatus ¶ added in v0.0.2
SetStatus updates the job's execution status.
Behavior:
- Updates the Status field.
- Immediately saves the updated State into the Monitoring backend.
func (*State) TrySetStatus ¶ added in v0.0.2
TrySetStatus attempts to change the status only if the current status is in the allowed list.
Parameters:
- allowed: List of acceptable current statuses.
- status: New status to set if transition is allowed.
Returns:
- true if the status was successfully updated.
func (*State) Update ¶ added in v0.0.2
Update applies a partial or full update to the job's runtime state using the provided StateDTO.
This method supports two modes:
- Strict mode (`strict = true`): All fields in the current state are forcibly overwritten with the values from the provided StateDTO, regardless of whether they are zero-valued. Metrics (e.g., Success, Failure) are NOT automatically recorded in this mode.
- Non-strict mode (`strict = false`): Only non-zero or non-nil fields from the DTO are merged into the existing state. This allows incremental updates without wiping unrelated values. Metrics are saved immediately after the merge.
Error handling:
- JobError and HookError fields are merged independently.
- Individual hook errors (e.g., OnStart, OnError) are only updated if non-nil.
- The state lock ensures thread-safe updates.
Parameters:
- State: The StateDTO containing new values to apply.
- strict: Whether to overwrite all fields (`true`) or only merge meaningful changes (`false`).
func (*State) UpdateData ¶ added in v0.0.2
UpdateData applies key-value pairs to the job's custom metadata.
Also triggers metric storage after modification.
func (*State) UpdateExecutionTime ¶ added in v0.0.2
UpdateExecutionTime calculates and updates execution duration since StartAt.
Also pushes the updated State to the monitoring system.