Documentation
¶
Index ¶
- Constants
- func JECancelJob(ctx context.Context, jobID uuid.UUID, reason error) errorx.Error
- func JEDeleteJob(ctx context.Context, jobID uuid.UUID, reason error) errorx.Error
- func JEResumeJob(ctx context.Context, jobID uuid.UUID) errorx.Error
- func JEScheduleJob(ctx context.Context, job *JobObj) errorx.Error
- func JESuspendJob(ctx context.Context, jobID uuid.UUID) errorx.Error
- func JEWaitJob(ctx context.Context, jobID uuid.UUID, timeoutSec time.Duration) errorx.Error
- func RegisterJobGroup(group *JobGroup)
- func ResumeJobsOnServerStart(ctx context.Context) error
- type Job
- type JobAPI
- func (j *JobAPI) APICancelJob(ctx context.Context, input ...) (*JobAPIResponse, error)
- func (j *JobAPI) APIDeleteJob(ctx context.Context, input ...) (*struct{}, error)
- func (j *JobAPI) APIGetJob(ctx context.Context, input ...) (*JobAPIResponse, error)
- func (j *JobAPI) APIGetJobGroup(ctx context.Context, input ...) (*JobGroupAPIResponse, error)
- func (j *JobAPI) APIGetJobType(ctx context.Context, input ...) (*JobTypeAPIResponse, error)
- func (j *JobAPI) APIListJobGroups(ctx context.Context, input *ListJobGroupsAPIRequest) (*ListJobGroupsAPIResponse, error)
- func (j *JobAPI) APIListJobTypes(ctx context.Context, request *api.PageAPIRequest) (*ListJobTypesAPIResponse, error)
- func (j *JobAPI) APIListJobs(ctx context.Context, input *ListJobsAPIRequest) (*ListJobsAPIResponse, error)
- func (j *JobAPI) APIResumeJob(ctx context.Context, input ...) (*JobAPIResponse, error)
- func (j *JobAPI) APIScheduleJob(ctx context.Context, input ...) (*JobAPIResponse, error)
- func (j *JobAPI) APISuspendJob(ctx context.Context, input ...) (*JobAPIResponse, error)
- func (j *JobAPI) JobGroupToAPIResponse(jobGroup *JobGroup) *JobGroupAPIResponse
- func (j *JobAPI) JobToAPIResponse(job *JobObj) *JobAPIResponse
- func (j *JobAPI) JobTypeToAPIResponse(jobType *JobType) *JobTypeAPIResponse
- type JobAPIResponse
- type JobAPIResponseItem
- type JobGroup
- type JobGroupAPIResponse
- type JobGroupAPIResponseItem
- type JobObj
- func JEGetJob(ctx context.Context, jobID uuid.UUID) (*JobObj, errorx.Error)
- func JEGetJobByIdempotencyKey(ctx context.Context, idempotencyKey uuid.UUID) (*JobObj, errorx.Error)
- func ListJobs(ctx context.Context, pageRequest *api.PageAPIRequest, status string) ([]*JobObj, error)
- func NewJob(ctx context.Context, idempotencyKey uuid.UUID, jobType *JobType, ...) (*JobObj, error)
- func (j *JobObj) CheckStatusTransition(desiredStatus JobStatus) (bool, error)
- func (j *JobObj) GetETA() int64
- func (j *JobObj) GetError() string
- func (j *JobObj) GetIdempotencyKey() uuid.UUID
- func (j *JobObj) GetJobID() uuid.UUID
- func (j *JobObj) GetLockedBy() uuid.UUID
- func (j *JobObj) GetMaxRetries() int
- func (j *JobObj) GetParams() string
- func (j *JobObj) GetParamsPtr() interface{}
- func (j *JobObj) GetProgress() float32
- func (j *JobObj) GetQueue() JobQueueName
- func (j *JobObj) GetRetries() int
- func (j *JobObj) GetRetryDelay() time.Duration
- func (j *JobObj) GetScheduledAt() int64
- func (j *JobObj) GetStartedAt() int64
- func (j *JobObj) GetStatus() JobStatus
- func (j *JobObj) GetStatusErrorProgressSuccess() (string, string, float32, bool)
- func (j *JobObj) GetTenantID() uuid.UUID
- func (j *JobObj) GetTimeoutSec() time.Duration
- func (j *JobObj) GetType() string
- func (j *JobObj) GetTypePtr() *JobType
- func (j *JobObj) GetUserID() uuid.UUID
- func (j *JobObj) Log(level logging.Level, msg string, args ...interface{})
- func (j *JobObj) LogDebug(msg string, args ...interface{})
- func (j *JobObj) LogError(msg string, args ...interface{})
- func (j *JobObj) LogInfo(msg string, args ...interface{})
- func (j *JobObj) LogTrace(msg string, args ...interface{})
- func (j *JobObj) LogWarn(msg string, args ...interface{})
- func (j *JobObj) SetLockedBy(ctx context.Context, lockedBy uuid.UUID) error
- func (j *JobObj) SetProgress(ctx context.Context, progress float32) error
- func (j *JobObj) SetResult(ctx context.Context, result interface{}) error
- func (j *JobObj) SetRetryPolicy(retryDelay time.Duration, maxRetries int, timeout time.Duration) error
- func (j *JobObj) SetSkipped(ctx context.Context, reason string) error
- func (j *JobObj) SetUnlocked(ctx context.Context) error
- type JobQueue
- type JobQueueName
- type JobStatus
- type JobType
- type JobTypeAPIResponse
- type JobTypeAPIResponseItem
- type JobTypeParams
- type JobWorker
- type JobWorkerExecutionCallback
- type JobWorkerInitCallback
- type JobWorkerResumeCallback
- type JobWorkerStateUpdateCallback
- type JobWorkerSuspendCallback
- type JobsExecutor
- type ListJobGroupsAPIRequest
- type ListJobGroupsAPIResponse
- type ListJobTypesAPIResponse
- type ListJobsAPIRequest
- type ListJobsAPIResponse
Constants ¶
const ( // Initial states StatusInit = "initializing" StatusWaiting = "waiting" StatusResuming = "resuming" // Running states StatusRunning = "running" StatusCanceling = "canceling" // still running, but will be canceled StatusSuspending = "suspending" // still running, but will be suspended StatusLocked = "locked" // almost like running, but just indicates it's locked by another job // Intermediate states StatusSuspended = "suspended" // dequeued from the 'running' queue // Final immutable states StatusSkipped = "skipped" StatusCanceled = "canceled" StatusFailed = "failed" StatusTimedOut = "timeout" StatusCompleted = "completed" )
const JobGroupSeparator = ":"
Variables ¶
This section is empty.
Functions ¶
func JECancelJob ¶
func JEDeleteJob ¶
func RegisterJobGroup ¶
func RegisterJobGroup(group *JobGroup)
func ResumeJobsOnServerStart ¶
ResumeJobsOnServerStart finds all jobs in initializing, waiting, running, and locked states and resumes them. This function is intended to be called during server startup to catch up any jobs that might have been left in an inconsistent state due to a previous server shutdown. Jobs are processed in batches of 50 to avoid loading too many jobs into memory at once.
Types ¶
type Job ¶
type Job struct {
JobID uuid.UUID `json:"id" gorm:"index"`
TenantID uuid.UUID `json:"tenant_id" gorm:"index"`
UserID uuid.UUID `json:"user_id" gorm:"index"`
IdempotencyKey uuid.UUID `json:"idempotency_key" gorm:"index"`
Type string `json:"type" gorm:"index"`
TypePtr *JobType `json:"-" gorm:"-"`
ScheduledAtMs int64 `json:"scheduled_at" gorm:"index" doc:"unix timestamp in milliseconds"`
UpdatedAtMs int64 `json:"updated_at" gorm:"index" doc:"unix timestamp in milliseconds"`
StartedAtMs int64 `json:"started_at" gorm:"index" doc:"unix timestamp in milliseconds"`
ETAMs int64 `json:"eta" gorm:"index" readOnly:"true" doc:"unix timestamp in milliseconds"`
LockedBy uuid.UUID `json:"locked_by" gorm:"index" readOnly:"true"`
Progress float32 `json:"progress" readOnly:"true"`
ProgressDetails string `json:"progress_details" readOnly:"true"`
Status JobStatus `json:"status" gorm:"index" readOnly:"true"`
Details string `json:"details"`
TimeoutSec int `json:"timeout_sec,omitempty"` // Default is taken from JobType
MaxRetries int `json:"max_retries,omitempty"` // Default is taken from JobType
RetryDelaySec int `json:"retry_delay_sec,omitempty"`
Retries int `json:"retries" readOnly:"true"` // Track current retry count
Error string `json:"error,omitempty" readOnly:"true"`
Result string `json:"result,omitempty" readOnly:"true"`
Params string `json:"params,omitempty"` // job parameters as a JSON string
ParamsPtr interface{} `json:"-" gorm:"-"` // job parameters as a struct
}
JobIface is used for API and DB interfaces, it has exported fields for proper serialization
type JobAPI ¶
type JobAPI struct{}
func (*JobAPI) APICancelJob ¶
func (*JobAPI) APIDeleteJob ¶
func (*JobAPI) APIGetJobGroup ¶
func (*JobAPI) APIGetJobType ¶
func (*JobAPI) APIListJobGroups ¶
func (j *JobAPI) APIListJobGroups(ctx context.Context, input *ListJobGroupsAPIRequest) (*ListJobGroupsAPIResponse, error)
func (*JobAPI) APIListJobTypes ¶
func (j *JobAPI) APIListJobTypes(ctx context.Context, request *api.PageAPIRequest) (*ListJobTypesAPIResponse, error)
func (*JobAPI) APIListJobs ¶
func (j *JobAPI) APIListJobs(ctx context.Context, input *ListJobsAPIRequest) (*ListJobsAPIResponse, error)
func (*JobAPI) APIResumeJob ¶
func (*JobAPI) APIScheduleJob ¶
func (*JobAPI) APISuspendJob ¶
func (*JobAPI) JobGroupToAPIResponse ¶
func (j *JobAPI) JobGroupToAPIResponse(jobGroup *JobGroup) *JobGroupAPIResponse
func (*JobAPI) JobToAPIResponse ¶
func (j *JobAPI) JobToAPIResponse(job *JobObj) *JobAPIResponse
func (*JobAPI) JobTypeToAPIResponse ¶
func (j *JobAPI) JobTypeToAPIResponse(jobType *JobType) *JobTypeAPIResponse
type JobAPIResponse ¶
type JobAPIResponse struct {
Body JobAPIResponseItem `json:"body"`
}
type JobAPIResponseItem ¶
type JobAPIResponseItem Job
type JobGroup ¶
type JobGroup struct {
Name string `json:"name" gorm:"primaryKey"`
Queue JobQueueName `json:"queue_name"`
Description string `json:"description"`
}
func GetJobGroup ¶
func GetJobGroups ¶
func GetJobGroups(ctx context.Context, pageRequest *api.PageAPIRequest) []*JobGroup
type JobGroupAPIResponse ¶
type JobGroupAPIResponse struct {
Body JobGroupAPIResponseItem `json:"body"`
}
type JobGroupAPIResponseItem ¶
type JobGroupAPIResponseItem JobGroup
type JobObj ¶
type JobObj struct {
// contains filtered or unexported fields
}
JobObj has unexported fields and getter/setter APIs It is used for internal job operations and object safety
func (*JobObj) CheckStatusTransition ¶
CheckStatusTransition implements safe job transition checks for concurrent job operations. Returns true/false if we need to update status and error in case of false
func (*JobObj) GetIdempotencyKey ¶
func (*JobObj) GetLockedBy ¶
func (*JobObj) GetMaxRetries ¶
func (*JobObj) GetParamsPtr ¶
func (j *JobObj) GetParamsPtr() interface{}
func (*JobObj) GetProgress ¶
func (*JobObj) GetQueue ¶
func (j *JobObj) GetQueue() JobQueueName
func (*JobObj) GetRetries ¶
func (*JobObj) GetRetryDelay ¶
func (*JobObj) GetScheduledAt ¶
func (*JobObj) GetStartedAt ¶
func (*JobObj) GetStatusErrorProgressSuccess ¶
func (*JobObj) GetTenantID ¶
func (*JobObj) GetTimeoutSec ¶
func (*JobObj) GetTypePtr ¶
func (*JobObj) SetLockedBy ¶
func (*JobObj) SetProgress ¶
func (*JobObj) SetRetryPolicy ¶
type JobQueue ¶
type JobQueue struct {
// contains filtered or unexported fields
}
---------------------------------------------------------------------- JobQueue encapsulates all queue‐specific logic.
func GetJobQueue ¶
func GetJobQueue(queueName JobQueueName) (*JobQueue, error)
func NewJobQueue ¶
func NewJobQueue(name JobQueueName, capacity int) *JobQueue
NewJobQueue creates a new JobQueue.
func RegisterJobQueue ¶
func RegisterJobQueue(queueName JobQueueName, capacity int) (*JobQueue, error)
func (*JobQueue) AddRunningJob ¶
AddRunningJob records that a job is now running.
func (*JobQueue) GetNextJob ¶
GetNextJob returns the next waitingjob. If no jobs are waiting, returns nil.
func (*JobQueue) RemoveRunningJob ¶
RemoveRunningJob removes a job from the running map.
type JobQueueName ¶
type JobQueueName string
---------------------------------------------------------------------- Define JobQueueName type (string alias) for queues.
const ( JobQueueCompute JobQueueName = "compute" JobQueueMaintenance JobQueueName = "maintenance" JobQueueDownload JobQueueName = "download" )
type JobStatus ¶
type JobStatus string
JobStatus represents the current status of a job
const ( // Initial states JobStatusInit JobStatus = JobStatus(StatusInit) JobStatusWaiting JobStatus = JobStatus(StatusWaiting) JobStatusResuming JobStatus = JobStatus(StatusResuming) // Running states JobStatusRunning JobStatus = JobStatus(StatusRunning) JobStatusCanceling JobStatus = JobStatus(StatusCanceling) JobStatusSuspending JobStatus = JobStatus(StatusSuspending) JobStatusLocked JobStatus = JobStatus(StatusLocked) // Intermediate states JobStatusSuspended JobStatus = JobStatus(StatusSuspended) // Final immutable states JobStatusSkipped JobStatus = JobStatus(StatusSkipped) JobStatusCanceled JobStatus = JobStatus(StatusCanceled) JobStatusFailed JobStatus = JobStatus(StatusFailed) JobStatusTimedOut JobStatus = JobStatus(StatusTimedOut) JobStatusCompleted JobStatus = JobStatus(StatusCompleted) )
type JobType ¶
type JobType struct {
TypeID string `json:"type_id" gorm:"primaryKey"`
Description string `json:"description"`
Group string `json:"group"`
GroupPtr *JobGroup `json:"-" gorm:"foreignKey:Group;references:Name"`
Name string `json:"name"`
TimeoutSec int `json:"timeout_sec"`
MaxRetries int `json:"max_retries"` // Maximum allowed retries
RetryDelaySec int `json:"retry_delay_sec"` // Delay between retries
Params interface{} `json:"params" gorm:"-"`
ParamsSchema string `json:"params_schema" gorm:"-"`
WorkerInitCallback JobWorkerInitCallback `json:"-" gorm:"-"` // called within synchronous API call for job creation
WorkerExecutionCallback JobWorkerExecutionCallback `json:"-" gorm:"-"` // called in a go-routine as asynchronous job worker
WorkerStateUpdateCallback JobWorkerStateUpdateCallback `json:"-" gorm:"-"` // optional callback for status update, called from job execution worker
WorkerSuspendCallback JobWorkerSuspendCallback `json:"-" gorm:"-"` // optional callback for job suspension, called synchronously via API
WorkerResumeCallback JobWorkerResumeCallback `json:"-" gorm:"-"` // optional callback for job resume, called synchronously via API
WorkerIsSuspendable bool `json:"suspendable" gorm:"-"` // whether the job can be suspended/resumed
}
func GetJobType ¶
func GetJobTypes ¶
func GetJobTypes(ctx context.Context, pageRequest *api.PageAPIRequest) []*JobType
func RegisterJobType ¶
func RegisterJobType(params JobTypeParams) *JobType
type JobTypeAPIResponse ¶
type JobTypeAPIResponse struct {
Body JobTypeAPIResponseItem `json:"body"`
}
type JobTypeAPIResponseItem ¶
type JobTypeAPIResponseItem JobType
type JobTypeParams ¶
type JobTypeParams struct {
Group *JobGroup
Name string
Description string
Params interface{}
WorkerInitCallback JobWorkerInitCallback // Initial job initialisation callback that is called on job start or resume
WorkerExecutionCallback JobWorkerExecutionCallback // main worker callback that is called on job start or after resume
WorkerStateUpdateCallback JobWorkerStateUpdateCallback // optional callback for linked objects state updates
WorkerSuspendCallback JobWorkerSuspendCallback // optional callback called before job suspend
WorkerResumeCallback JobWorkerResumeCallback // optional callback called after job resume, but before InitCallback
WorkerIsSuspendable bool // if true, the job can be suspended and resumed
Timeout time.Duration
RetryDelay time.Duration
MaxRetries int
}
type JobWorkerExecutionCallback ¶
type JobWorkerExecutionCallback func(ctx context.Context, worker JobWorker, progress chan<- float32) error
JobWorkerExecutionCallback is a callback function for target worker execution. It run asynchronously in a dedicated go-routine, if error is returned then the job will be marked as failed. Otherwise it will be marked as completed successfully. This callback is called when the job is first started and again after resume. The progress channel is used to report progress updates to the job executor. On resume, the worker is responsible for setting the job progress to the last reported value.
type JobWorkerInitCallback ¶
JobWorkerInitCallback is a callback function for job paramemeter initialisation. The function returns target worker object associated with current job and an error if any. It's being called from the synchronous API handler creating the job, so if error is returned, the job creation will fail
type JobWorkerResumeCallback ¶
A callback function for job resume after suspension, called from syncrhonous API handler If error is returned the resume operation will fail.
type JobWorkerStateUpdateCallback ¶
JobWorkerStateUpdateCallback is a callback function that updates target worker properties, such as Progress, Status, Error and Success
type JobWorkerSuspendCallback ¶
A callback function for job suspension, called from syncrhonous API handler. If error is returned the suspend operation will fail.
type JobsExecutor ¶
type JobsExecutor struct {
// contains filtered or unexported fields
}
---------------------------------------------------------------------- JobsExecutor simply routes jobs into their proper queue.
type ListJobGroupsAPIRequest ¶
type ListJobGroupsAPIRequest struct {
api.PageAPIRequest
}
type ListJobGroupsAPIResponse ¶
type ListJobGroupsAPIResponse struct {
Body struct {
api.PageAPIResponse
JobGroups []JobGroupAPIResponseItem `json:"job_groups"`
} `json:"body"`
}
type ListJobTypesAPIResponse ¶
type ListJobTypesAPIResponse struct {
Body struct {
api.PageAPIResponse
JobTypes []JobTypeAPIResponseItem `json:"job_types"`
} `json:"body"`
}
type ListJobsAPIRequest ¶
type ListJobsAPIRequest struct {
api.PageAPIRequest
Status string `query:"status" doc:"Filter by status, comma separated list of statuses is supported"`
}
type ListJobsAPIResponse ¶
type ListJobsAPIResponse struct {
Body struct {
api.PageAPIResponse
Jobs []JobAPIResponseItem `json:"jobs"`
} `json:"body"`
}