model

package
v1.45.0
Published: Dec 23, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

const (
	// Job statuses before processing
	JobStatusQueued    = "QUEUED"
	JobStatusScheduled = "SCHEDULED"
	// Running status is used when the job is being processed by a worker.
	JobStatusRunning = "RUNNING"
	// Job statuses after processing
	JobStatusFailed    = "FAILED"
	JobStatusSucceeded = "SUCCEEDED"
	JobStatusCancelled = "CANCELLED"
)
const (
	RETRY_BACKOFF_NONE        = "none"
	RETRY_BACKOFF_LINEAR      = "linear"
	RETRY_BACKOFF_EXPONENTIAL = "exponential"
)
const (
	WorkerStatusReady   = "READY"
	WorkerStatusRunning = "RUNNING"
	WorkerStatusFailed  = "FAILED"
	WorkerStatusStopped = "STOPPED"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchJob

type BatchJob struct {
	Task            interface{}
	Parameters      []interface{}
	ParametersKeyed map[string]interface{}
	Options         *Options
}

type Connection added in v1.9.0

type Connection struct {
	PID             int    `json:"pid"`
	Database        string `json:"database"`
	Username        string `json:"username"`
	ApplicationName string `json:"application_name"`
	Query           string `json:"query"`
	State           string `json:"state"`
}

Connection represents a database connection with its details. It is used to monitor and manage active connections to the database.

type DBTime

type DBTime struct {
	time.Time
}

DBTime is a custom time type for handling database-specific time formats.

func (*DBTime) IsSet

func (ct *DBTime) IsSet() bool

func (DBTime) MarshalJSON

func (ct DBTime) MarshalJSON() ([]byte, error)

func (*DBTime) UnmarshalJSON

func (ct *DBTime) UnmarshalJSON(b []byte) error

type Job

type Job struct {
	ID              int             `json:"id"`
	RID             uuid.UUID       `json:"rid"`
	WorkerID        int             `json:"worker_id"`
	WorkerRID       uuid.UUID       `json:"worker_rid"`
	Options         *Options        `json:"options"`
	TaskName        string          `json:"task_name"`
	Parameters      Parameters      `json:"parameters"`
	ParametersKeyed ParametersKeyed `json:"parameters_keyed"`
	Status          string          `json:"status"`
	ScheduledAt     *time.Time      `json:"scheduled_at"`
	StartedAt       *time.Time      `json:"started_at"`
	ScheduleCount   int             `json:"schedule_count"`
	Attempts        int             `json:"attempts"`
	Results         Parameters      `json:"result"`
	Error           string          `json:"error"`
	CreatedAt       time.Time       `json:"created_at"`
	UpdatedAt       time.Time       `json:"updated_at"`
}

Job represents a task assigned to a worker. It contains all the information needed to execute the task, including the OnError and Schedule options, the parameters, and the status.

ID, RID, CreatedAt, and UpdatedAt are set automatically on creation.

Status, ScheduledAt, StartedAt, ScheduleCount, Attempts, Results, Error, CreatedAt, and UpdatedAt are set automatically on update.

func NewJob

func NewJob(task interface{}, options *Options, parametersKeyed map[string]interface{}, parameters ...interface{}) (*Job, error)

NewJob creates a new Job instance with the provided task, options, and parameters. It validates the task name and options, and initializes the job status and scheduled time if applicable. It returns a pointer to the new Job instance or an error if something is invalid.

type JobFromNotification

type JobFromNotification struct {
	ID              int             `json:"id"`
	RID             uuid.UUID       `json:"rid"`
	WorkerID        int             `json:"worker_id"`
	WorkerRID       uuid.UUID       `json:"worker_rid"`
	Options         *Options        `json:"options"`
	TaskName        string          `json:"task_name"`
	Parameters      Parameters      `json:"parameters"`
	ParametersKeyed ParametersKeyed `json:"parameters_keyed"`
	Status          string          `json:"status"`
	ScheduledAt     DBTime          `json:"scheduled_at"`
	StartedAt       DBTime          `json:"started_at"`
	Attempts        int             `json:"attempts"`
	Results         Parameters      `json:"result"`
	Error           string          `json:"error"`
	CreatedAt       DBTime          `json:"created_at"`
	UpdatedAt       DBTime          `json:"updated_at"`
}

JobFromNotification represents a job received from a notification. It mirrors the fields of Job (except ScheduleCount), but uses DBTime for the time fields to handle database-specific time formats.

func (*JobFromNotification) ToJob

func (jn *JobFromNotification) ToJob() *Job

ToJob converts a JobFromNotification to a Job instance.

type Master added in v1.6.0

type Master struct {
	ID        int            `json:"id"`
	Settings  MasterSettings `json:"settings"`
	WorkerID  int            `json:"worker_id"`
	WorkerRID uuid.UUID      `json:"worker_rid"`
	CreatedAt time.Time      `json:"created_at"`
	UpdatedAt time.Time      `json:"updated_at"`
}

type MasterSettings added in v1.6.0

type MasterSettings struct {
	// MasterLockTimeout is the duration after which a master lock
	// is considered stale and another worker can become master.
	MasterLockTimeout time.Duration `json:"master_lock_timeout"`
	// MasterPollInterval is the interval at which the master worker
	// updates the master entry to stay master.
	MasterPollInterval time.Duration `json:"master_poll_interval"`
	// JobDeleteThreshold is the duration for which archived data is retained.
	JobDeleteThreshold time.Duration `json:"retention_archive"`
	// WorkerStaleThreshold is the duration after which a worker
	// is considered stale if it hasn't updated its heartbeat
	// and gets updated to status STOPPED.
	WorkerStaleThreshold time.Duration `json:"worker_stale_threshold"`
	// WorkerDeleteThreshold is the duration after which a stale worker
	// is deleted from the database.
	WorkerDeleteThreshold time.Duration `json:"worker_delete_threshold"`
	// JobStaleThreshold is the duration after which a job
	// is considered stale if it hasn't been updated
	// and gets updated to status CANCELLED.
	JobStaleThreshold time.Duration `json:"job_stale_threshold"`
}

func (MasterSettings) Marshal added in v1.6.0

func (r MasterSettings) Marshal() ([]byte, error)

func (*MasterSettings) Scan added in v1.6.0

func (c *MasterSettings) Scan(value interface{}) error

func (*MasterSettings) SetDefault added in v1.24.0

func (r *MasterSettings) SetDefault()

func (*MasterSettings) Unmarshal added in v1.6.0

func (r *MasterSettings) Unmarshal(value interface{}) error

func (MasterSettings) Value added in v1.6.0

func (c MasterSettings) Value() (driver.Value, error)

type NextIntervalFunc

type NextIntervalFunc func(start time.Time, currentCount int) time.Time

NextIntervalFunc defines a function type that calculates the next interval based on the start time and the current count of executions.

type OnError

type OnError struct {
	Timeout      float64 `json:"timeout"`
	MaxRetries   int     `json:"max_retries"`
	RetryDelay   float64 `json:"retry_delay"`
	RetryBackoff string  `json:"retry_backoff"`
}

OnError represents the options to handle errors during job execution. It includes timeout, maximum retries, retry delay, and retry backoff strategy. It is used to define how the system should behave when a job fails.

Parameters:

  • Timeout is the maximum time in seconds to wait for a job to complete before considering it failed.
  • MaxRetries is the maximum number of retries to attempt if a job fails.
  • RetryDelay is the delay in seconds before retrying a failed job.
  • RetryBackoff is the strategy to use for retrying failed jobs.

RetryBackoff can be one of the following:

  • none: no retry, the job will fail immediately.
  • linear: retry with a fixed delay.
  • exponential: retry with an exponentially increasing delay.

If RetryBackoff is not provided, it defaults to "none".

func (*OnError) IsValid

func (c *OnError) IsValid() error

IsValid checks if the OnError options are valid. Timeout, MaxRetries, and RetryDelay must be non-negative. RetryBackoff must be one of the predefined strategies: none, linear, or exponential. It returns an error if any of the conditions are not met.
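
The rules in this description can be sketched as follows. The `onError` type is a local stand-in for the documented struct, and both the nil check and the acceptance of an empty RetryBackoff (on the basis that it defaults to "none") are assumptions; the error messages are invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// onError is a local stand-in for model.OnError.
type onError struct {
	Timeout      float64
	MaxRetries   int
	RetryDelay   float64
	RetryBackoff string
}

// isValid applies the documented rules: no negative numbers and a known
// backoff strategy. An empty strategy is accepted here (assumption).
func (o *onError) isValid() error {
	if o == nil {
		return errors.New("on-error options must not be nil")
	}
	if o.Timeout < 0 {
		return errors.New("timeout must be non-negative")
	}
	if o.MaxRetries < 0 {
		return errors.New("max retries must be non-negative")
	}
	if o.RetryDelay < 0 {
		return errors.New("retry delay must be non-negative")
	}
	switch o.RetryBackoff {
	case "", "none", "linear", "exponential":
		return nil
	default:
		return fmt.Errorf("unknown retry backoff %q", o.RetryBackoff)
	}
}

func main() {
	bad := &onError{MaxRetries: 3, RetryBackoff: "quadratic"}
	fmt.Println(bad.isValid())
}
```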

func (OnError) Marshal

func (r OnError) Marshal() ([]byte, error)

func (*OnError) Scan

func (c *OnError) Scan(value interface{}) error

func (*OnError) Unmarshal

func (r *OnError) Unmarshal(value interface{}) error

func (OnError) Value

func (c OnError) Value() (driver.Value, error)

type Options

type Options struct {
	OnError  *OnError  `json:"on_error,omitempty"`
	Schedule *Schedule `json:"schedule,omitempty"`
}

func (*Options) IsValid

func (c *Options) IsValid() error

func (Options) Marshal

func (r Options) Marshal() ([]byte, error)

func (*Options) Scan

func (c *Options) Scan(value interface{}) error

func (*Options) Unmarshal

func (r *Options) Unmarshal(value interface{}) error

func (Options) Value

func (c Options) Value() (driver.Value, error)

type Parameters

type Parameters []interface{}

func (Parameters) Marshal

func (r Parameters) Marshal() ([]byte, error)

func (*Parameters) Scan

func (c *Parameters) Scan(value interface{}) error

func (*Parameters) ToInterfaceSlice

func (r *Parameters) ToInterfaceSlice() []interface{}

func (*Parameters) ToReflectValues

func (r *Parameters) ToReflectValues() []reflect.Value

func (*Parameters) Unmarshal

func (r *Parameters) Unmarshal(value interface{}) error

func (Parameters) Value

func (c Parameters) Value() (driver.Value, error)

type ParametersKeyed added in v1.41.0

type ParametersKeyed map[string]interface{}

func (ParametersKeyed) Marshal added in v1.41.0

func (r ParametersKeyed) Marshal() ([]byte, error)

func (*ParametersKeyed) Scan added in v1.41.0

func (c *ParametersKeyed) Scan(value interface{}) error

func (*ParametersKeyed) Unmarshal added in v1.41.0

func (r *ParametersKeyed) Unmarshal(value interface{}) error

func (ParametersKeyed) Value added in v1.41.0

func (c ParametersKeyed) Value() (driver.Value, error)

type Schedule

type Schedule struct {
	Start        time.Time     `json:"start"`
	MaxCount     int           `json:"max_count"`
	Interval     time.Duration `json:"interval"`
	NextInterval string        `json:"next_interval,omitempty"`
}

Schedule represents the scheduling options for a job. It includes the start time, maximum count of executions, interval between executions, and an optional next interval function to calculate the next execution time. It is used to define how often a job should be executed.

Parameters:

  • Start is the time when the job should start executing.
  • MaxCount is the maximum number of executions. If MaxCount is 0, the job will run indefinitely. If MaxCount is greater than 0, the job will run MaxCount times.
  • Interval is the duration between executions. If MaxCount is greater than or equal to 0, Interval must be greater than zero or NextInterval must be provided.
  • NextInterval is optional. If MaxCount is 1, NextInterval can be provided to specify the next execution time.

func (*Schedule) IsValid

func (c *Schedule) IsValid() error

IsValid checks if the Schedule options are valid. Start must not be zero and MaxCount must be non-negative. If MaxCount is greater than 1, either Interval must be greater than zero or NextInterval must be provided.

func (Schedule) Marshal

func (r Schedule) Marshal() ([]byte, error)

func (*Schedule) Scan

func (c *Schedule) Scan(value interface{}) error

func (*Schedule) Unmarshal

func (r *Schedule) Unmarshal(value interface{}) error

func (Schedule) Value

func (c Schedule) Value() (driver.Value, error)

type Task

type Task struct {
	Task             interface{}
	Name             string
	InputParameters  []reflect.Type
	OutputParameters []reflect.Type
}

Task represents a job task with its function, name, input parameters, and output parameters. It is used to define a job that can be executed by the queuer system.

Parameters:

  • Task is the function that will be executed as a job.
  • Name is the name of the task, which should be unique and descriptive.
  • InputParameters is a slice of reflect.Type representing the types of input parameters for the task.
  • OutputParameters is a slice of reflect.Type representing the types of output parameters for the task.

func NewTask

func NewTask(task interface{}) (*Task, error)

func NewTaskWithName

func NewTaskWithName(task interface{}, taskName string) (*Task, error)

type Worker

type Worker struct {
	ID                         int       `json:"id"`
	RID                        uuid.UUID `json:"rid"`
	Name                       string    `json:"name"`
	Options                    *OnError  `json:"options,omitempty"`
	MaxConcurrency             int       `json:"max_concurrency"`
	AvailableTasks             []string  `json:"available_tasks,omitempty"`
	AvailableNextIntervalFuncs []string  `json:"available_next_interval,omitempty"`
	Status                     string    `json:"status"`
	CreatedAt                  time.Time `json:"created_at"`
	UpdatedAt                  time.Time `json:"updated_at"`
}

Worker represents a worker that can execute tasks. It includes the worker's ID, name, options for error handling, maximum concurrency, available tasks, and status.

ID, RID, Status, CreatedAt, and UpdatedAt are set automatically on creation.

Parameters:

  • Name is the name of the worker, which should be unique and descriptive.
  • Options is an optional field that can be used to specify error handling options. If the Job has options set, the Job options are used as primary options.
  • MaxConcurrency is the maximum number of tasks that can be executed concurrently by the worker.
  • AvailableTasks is a list of task names that the worker can execute.
  • AvailableNextIntervalFuncs is a list of the names of next interval functions that the worker can use.
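
The precedence rule for Options (job options win over worker options) can be sketched like this. The types are trimmed local stand-ins and `effectiveOnError` is a hypothetical helper; the page does not say whether the package merges individual fields or replaces the whole struct, so whole-struct replacement is assumed here.

```go
package main

import "fmt"

// onError and options are trimmed local stand-ins for the package types.
type onError struct {
	MaxRetries   int
	RetryBackoff string
}

type options struct {
	OnError *onError
}

// effectiveOnError returns the job-level error options when they are set
// and falls back to the worker-level options otherwise (may be nil).
func effectiveOnError(jobOptions *options, workerOptions *onError) *onError {
	if jobOptions != nil && jobOptions.OnError != nil {
		return jobOptions.OnError
	}
	return workerOptions
}

func main() {
	workerDefault := &onError{MaxRetries: 1, RetryBackoff: "none"}
	jobSpecific := &options{OnError: &onError{MaxRetries: 5, RetryBackoff: "exponential"}}

	fmt.Println(effectiveOnError(jobSpecific, workerDefault).MaxRetries)
	fmt.Println(effectiveOnError(nil, workerDefault).MaxRetries)
}
```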

func NewWorker

func NewWorker(name string, maxConcurrency int) (*Worker, error)

NewWorker creates a new Worker with the specified name and maximum concurrency. It validates the name and maximum concurrency, and initializes the worker status to running. It returns a pointer to the new Worker instance or an error if something is invalid.

func NewWorkerWithOptions

func NewWorkerWithOptions(name string, maxConcurrency int, options *OnError) (*Worker, error)

NewWorkerWithOptions creates a new Worker with the specified name, maximum concurrency, and error handling options. It validates the name, maximum concurrency, and options, and initializes the worker status to running. It returns a pointer to the new Worker instance or an error if something is invalid.
