Documentation
¶
Index ¶
- Constants
- func NextRunAt(rule RecurrenceRuleInterface, now *carbon.Carbon) (*carbon.Carbon, error)
- type DayOfWeek
- type Frequency
- type MonthOfYear
- type NewStoreOptions
- type RecurrenceRuleInterface
- type ScheduleInterface
- type ScheduleQuery
- func (q *ScheduleQuery) ID() string
- func (q *ScheduleQuery) Limit() int
- func (q *ScheduleQuery) Name() string
- func (q *ScheduleQuery) Offset() int
- func (q *ScheduleQuery) QueueName() string
- func (q *ScheduleQuery) SetID(id string) ScheduleQueryInterface
- func (q *ScheduleQuery) SetLimit(limit int) ScheduleQueryInterface
- func (q *ScheduleQuery) SetName(name string) ScheduleQueryInterface
- func (q *ScheduleQuery) SetOffset(offset int) ScheduleQueryInterface
- func (q *ScheduleQuery) SetQueueName(queueName string) ScheduleQueryInterface
- func (q *ScheduleQuery) SetStatus(status string) ScheduleQueryInterface
- func (q *ScheduleQuery) SetTaskDefinitionID(taskDefinitionID string) ScheduleQueryInterface
- func (q *ScheduleQuery) Status() string
- func (q *ScheduleQuery) TaskDefinitionID() string
- type ScheduleQueryInterface
- type ScheduleRunnerInterface
- type ScheduleRunnerOptions
- type Store
- func (st *Store) AutoMigrate() error
- func (st *Store) EnableDebug(debugEnabled bool) StoreInterface
- func (store *Store) QueuedTaskForceFail(ctx context.Context, queuedTask TaskQueueInterface, waitMinutes int) error
- func (store *Store) QueuedTaskProcessWithContext(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)
- func (store *Store) ScheduleCount(ctx context.Context, options ScheduleQueryInterface) (int64, error)
- func (store *Store) ScheduleCreate(ctx context.Context, schedule ScheduleInterface) error
- func (store *Store) ScheduleDelete(ctx context.Context, schedule ScheduleInterface) error
- func (store *Store) ScheduleDeleteByID(ctx context.Context, id string) error
- func (store *Store) ScheduleFindByID(ctx context.Context, id string) (ScheduleInterface, error)
- func (store *Store) ScheduleList(ctx context.Context, options ScheduleQueryInterface) ([]ScheduleInterface, error)
- func (store *Store) ScheduleRun(ctx context.Context) error
- func (store *Store) ScheduleSoftDelete(ctx context.Context, schedule ScheduleInterface) error
- func (store *Store) ScheduleSoftDeleteByID(ctx context.Context, id string) error
- func (store *Store) ScheduleUpdate(ctx context.Context, schedule ScheduleInterface) error
- func (st *Store) SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface
- func (st *Store) SqlCreateScheduleTable() string
- func (st *Store) SqlCreateTaskDefinitionTable() string
- func (st *Store) SqlCreateTaskQueueTable() string
- func (store *Store) TaskDefinitionCount(ctx context.Context, options TaskDefinitionQueryInterface) (int64, error)
- func (store *Store) TaskDefinitionCreate(ctx context.Context, task TaskDefinitionInterface) error
- func (store *Store) TaskDefinitionDelete(ctx context.Context, task TaskDefinitionInterface) error
- func (store *Store) TaskDefinitionDeleteByID(ctx context.Context, id string) error
- func (st *Store) TaskDefinitionEnqueueByAlias(ctx context.Context, queueName string, taskAlias string, ...) (TaskQueueInterface, error)
- func (store *Store) TaskDefinitionExecuteCli(alias string, args []string) bool
- func (store *Store) TaskDefinitionFindByAlias(ctx context.Context, alias string) (task TaskDefinitionInterface, err error)
- func (store *Store) TaskDefinitionFindByID(ctx context.Context, id string) (task TaskDefinitionInterface, err error)
- func (store *Store) TaskDefinitionList(ctx context.Context, query TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
- func (store *Store) TaskDefinitionSoftDelete(ctx context.Context, task TaskDefinitionInterface) error
- func (store *Store) TaskDefinitionSoftDeleteByID(ctx context.Context, id string) error
- func (store *Store) TaskDefinitionUpdate(ctx context.Context, task TaskDefinitionInterface) error
- func (store *Store) TaskHandlerAdd(ctx context.Context, taskHandler TaskHandlerInterface, createIfMissing bool) error
- func (store *Store) TaskHandlerList() []TaskHandlerInterface
- func (store *Store) TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)
- func (store *Store) TaskQueueCount(ctx context.Context, options TaskQueueQueryInterface) (int64, error)
- func (store *Store) TaskQueueCreate(ctx context.Context, queue TaskQueueInterface) error
- func (store *Store) TaskQueueDelete(ctx context.Context, queue TaskQueueInterface) error
- func (st *Store) TaskQueueDeleteByID(ctx context.Context, id string) error
- func (st *Store) TaskQueueFail(ctx context.Context, queue TaskQueueInterface) error
- func (store *Store) TaskQueueFindByID(ctx context.Context, id string) (TaskQueueInterface, error)
- func (store *Store) TaskQueueFindNextQueuedTask(ctx context.Context) (TaskQueueInterface, error)
- func (store *Store) TaskQueueFindNextQueuedTaskByQueue(ctx context.Context, queueName string) (TaskQueueInterface, error)
- func (store *Store) TaskQueueFindRunning(ctx context.Context, limit int) []TaskQueueInterface
- func (store *Store) TaskQueueFindRunningByQueue(ctx context.Context, queueName string, limit int) []TaskQueueInterface
- func (store *Store) TaskQueueList(ctx context.Context, query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
- func (store *Store) TaskQueueProcessNext(ctx context.Context) error
- func (store *Store) TaskQueueProcessNextAsyncByQueue(ctx context.Context, queueName string) error
- func (store *Store) TaskQueueProcessNextByQueue(ctx context.Context, queueName string) error
- func (store *Store) TaskQueueProcessTask(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)
- func (store *Store) TaskQueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
- func (store *Store) TaskQueueRunDefault(ctx context.Context, processSeconds int, unstuckMinutes int)
- func (store *Store) TaskQueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
- func (store *Store) TaskQueueSoftDelete(ctx context.Context, queue TaskQueueInterface) error
- func (store *Store) TaskQueueSoftDeleteByID(ctx context.Context, id string) error
- func (store *Store) TaskQueueStop()
- func (store *Store) TaskQueueStopByName(queueName string)
- func (st *Store) TaskQueueSuccess(ctx context.Context, queue TaskQueueInterface) error
- func (store *Store) TaskQueueUnstuck(ctx context.Context, waitMinutes int)
- func (store *Store) TaskQueueUnstuckByQueue(ctx context.Context, queueName string, waitMinutes int)
- func (store *Store) TaskQueueUpdate(ctx context.Context, queue TaskQueueInterface) error
- type StoreInterface
- type TaskDefinitionInterface
- type TaskDefinitionQueryInterface
- type TaskHandlerBase
- func (handler *TaskHandlerBase) ErrorMessage() string
- func (handler *TaskHandlerBase) GetParam(paramName string) string
- func (handler *TaskHandlerBase) GetParamArray(paramName string) []string
- func (handler *TaskHandlerBase) HasQueuedTask() bool
- func (handler *TaskHandlerBase) InfoMessage() string
- func (handler *TaskHandlerBase) LogError(message string)
- func (handler *TaskHandlerBase) LogInfo(message string)
- func (handler *TaskHandlerBase) LogSuccess(message string)
- func (handler *TaskHandlerBase) Options() map[string]string
- func (handler *TaskHandlerBase) QueuedTask() TaskQueueInterface
- func (handler *TaskHandlerBase) SetOptions(options map[string]string)
- func (handler *TaskHandlerBase) SetQueuedTask(queuedTask TaskQueueInterface)
- func (handler *TaskHandlerBase) SuccessMessage() string
- type TaskHandlerInterface
- type TaskHandlerWithContext
- type TaskQueueInterface
- type TaskQueueQueryInterface
- type TaskQueueRunnerInterface
- type TaskQueueRunnerOptions
Constants ¶
const ASC = "asc"
const COLUMN_ALIAS = "alias"
const COLUMN_ATTEMPTS = "attempts"
const COLUMN_COMPLETED_AT = "completed_at"
const COLUMN_CREATED_AT = "created_at"
const COLUMN_DESCRIPTION = "description"
const COLUMN_DETAILS = "details"
const COLUMN_END_AT = "end_at"
const COLUMN_EXECUTION_COUNT = "execution_count"
const COLUMN_ID = "id"
const COLUMN_IS_RECURRING = "is_recurring"
const COLUMN_LAST_RUN_AT = "last_run_at"
const COLUMN_MAX_EXECUTION_COUNT = "max_execution_count"
const COLUMN_MEMO = "memo"
const COLUMN_METAS = "metas"
const COLUMN_NAME = "name"
const COLUMN_NEXT_RUN_AT = "next_run_at"
const COLUMN_OUTPUT = "output"
const COLUMN_PARAMETERS = "parameters"
const COLUMN_QUEUE_NAME = "queue_name"
const COLUMN_RECURRENCE_RULE = "recurrence_rule"
const COLUMN_SOFT_DELETED_AT = "soft_deleted_at"
const COLUMN_STARTED_AT = "started_at"
const COLUMN_START_AT = "start_at"
const COLUMN_STATUS = "status"
const COLUMN_TASK_DEFINITION_ID = "task_definition_id"
const COLUMN_TASK_ID = "task_id"
const COLUMN_TITLE = "title"
const COLUMN_UPDATED_AT = "updated_at"
const DESC = "desc"
const DefaultQueueName = "default"
const TaskDefinitionStatusActive = "active"
const TaskDefinitionStatusCanceled = "canceled"
const TaskQueueStatusCanceled = "canceled"
const TaskQueueStatusDeleted = "deleted"
const TaskQueueStatusFailed = "failed"
const TaskQueueStatusPaused = "paused"
const TaskQueueStatusQueued = "queued"
const TaskQueueStatusRunning = "running"
const TaskQueueStatusSuccess = "success"
Variables ¶
This section is empty.
Functions ¶
Types ¶
type DayOfWeek ¶
type DayOfWeek string
DayOfWeek represents a day of the week used in weekly recurrence rules.
type Frequency ¶
type Frequency string
Define a string type alias Frequency represents how often a schedule recurs (daily, weekly, etc.). It is a string-based alias compatible with rrule-go frequencies.
const ( FrequencyNone Frequency = "none" FrequencySecondly Frequency = "secondly" FrequencyMinutely Frequency = "minutely" FrequencyHourly Frequency = "hourly" FrequencyDaily Frequency = "daily" FrequencyWeekly Frequency = "weekly" FrequencyMonthly Frequency = "monthly" FrequencyYearly Frequency = "yearly" )
Define the constants as strings
type MonthOfYear ¶
type MonthOfYear string
MonthOfYear represents a month used in yearly or monthly recurrence rules.
const ( MonthOfYearJanuary MonthOfYear = "JANUARY" MonthOfYearFebruary MonthOfYear = "FEBRUARY" MonthOfYearMarch MonthOfYear = "MARCH" MonthOfYearApril MonthOfYear = "APRIL" MonthOfYearMay MonthOfYear = "MAY" MonthOfYearJune MonthOfYear = "JUNE" MonthOfYearJuly MonthOfYear = "JULY" MonthOfYearAugust MonthOfYear = "AUGUST" MonthOfYearSeptember MonthOfYear = "SEPTEMBER" MonthOfYearOctober MonthOfYear = "OCTOBER" MonthOfYearNovember MonthOfYear = "NOVEMBER" MonthOfYearDecember MonthOfYear = "DECEMBER" )
type NewStoreOptions ¶
type NewStoreOptions struct {
TaskDefinitionTableName string
TaskQueueTableName string
ScheduleTableName string
DB *sql.DB
DbDriverName string
AutomigrateEnabled bool
DebugEnabled bool
MaxConcurrency int // Max concurrent tasks (default: 10, 0 = unlimited)
ErrorHandler func(queueName, taskID string, err error) // Optional error callback
}
NewStoreOptions define the options for creating a new task store
type RecurrenceRuleInterface ¶ added in v1.10.0
type RecurrenceRuleInterface interface {
// GetFrequency returns how often the rule recurs (e.g. daily, weekly).
GetFrequency() Frequency
// SetFrequency sets how often the rule recurs.
SetFrequency(Frequency) RecurrenceRuleInterface
// GetStartsAt returns the UTC datetime when the rule becomes active.
GetStartsAt() string
// SetStartsAt sets the UTC datetime when the rule becomes active.
SetStartsAt(dateTimeUTC string) RecurrenceRuleInterface
// GetEndsAt returns the UTC datetime when the rule stops producing occurrences.
GetEndsAt() string
// SetEndsAt sets the UTC datetime when the rule stops producing occurrences.
SetEndsAt(dateTimeUTC string) RecurrenceRuleInterface
// GetInterval returns the step interval between occurrences (e.g. every N days).
GetInterval() int
// SetInterval sets the step interval between occurrences.
SetInterval(int) RecurrenceRuleInterface
// GetDaysOfWeek returns the days of the week the rule applies to (for weekly rules).
GetDaysOfWeek() []DayOfWeek
// SetDaysOfWeek sets the days of the week the rule applies to (for weekly rules).
SetDaysOfWeek([]DayOfWeek) RecurrenceRuleInterface
// GetDaysOfMonth returns the days of the month the rule applies to.
GetDaysOfMonth() []int
// SetDaysOfMonth sets the days of the month the rule applies to.
SetDaysOfMonth([]int) RecurrenceRuleInterface
// GetMonthsOfYear returns the months of the year the rule applies to.
GetMonthsOfYear() []MonthOfYear
// SetMonthsOfYear sets the months of the year the rule applies to.
SetMonthsOfYear([]MonthOfYear) RecurrenceRuleInterface
}
RecurrenceRuleInterface defines the contract for recurrence rules used by schedules. It exposes frequency, start/end times, interval, and optional day/month filters.
func NewRecurrenceRule ¶
func NewRecurrenceRule() RecurrenceRuleInterface
NewRecurrenceRule creates a new recurrence rule with default values. By default, it has no end time (MAX_DATETIME) and an interval of 1.
type ScheduleInterface ¶
type ScheduleInterface interface {
// ID the unique identifier of the schedule
ID() string
// SetID sets the unique identifier of the schedule
SetID(string) ScheduleInterface
// Name the name of the schedule
Name() string
// SetName sets the name of the schedule
SetName(string) ScheduleInterface
// Description the description of the schedule
Description() string
// SetDescription sets the description of the schedule
SetDescription(string) ScheduleInterface
// Status the status of the schedule
// Valid values are "draft" (default), "active", "inactive"
Status() string
// SetStatus sets the status of the schedule
SetStatus(string) ScheduleInterface
// RecurrenceRule the recurrence rule that defines when the schedule should run
RecurrenceRule() RecurrenceRuleInterface
// SetRecurrenceRule sets the recurrence rule that defines when the schedule should run
SetRecurrenceRule(RecurrenceRuleInterface) ScheduleInterface
// QueueName the name of the queue that this schedule is associated with
QueueName() string
// SetQueueName sets the name of the queue that this schedule is associated with
SetQueueName(string) ScheduleInterface
// TaskDefinitionID the unique identifier of the task definition
// that this schedule is associated with
TaskDefinitionID() string
// SetTaskDefinitionID sets the unique identifier of the task definition
// that this schedule is associated with
SetTaskDefinitionID(string) ScheduleInterface
// TaskParameters the parameters to be passed to the task definition
// when it is executed
TaskParameters() map[string]any
// SetTaskParameters sets the parameters to be passed to the task definition
// when it is executed
SetTaskParameters(map[string]any) ScheduleInterface
// StartAt the start date and time of the schedule
StartAt() string
// SetStartAt sets the start date and time of the schedule
// If startAt is not set, the schedule will start at the current time
SetStartAt(string) ScheduleInterface
// EndAt the end date and time of the schedule
// The default value is the maximum datetime (never expires)
EndAt() string
// SetEndAt sets the end date and time of the schedule
SetEndAt(string) ScheduleInterface
// ExecutionCount the number of times the schedule has been executed
ExecutionCount() int
// SetExecutionCount sets the number of times the schedule has been executed
SetExecutionCount(int) ScheduleInterface
// MaxExecutionCount the maximum number of times the schedule is allowed to be executed
// The default value is int max (no limit)
// To execute only once, set maxExecutionCount to 1
MaxExecutionCount() int
// SetMaxExecutionCount sets the maximum number of times the schedule is allowed to be executed
SetMaxExecutionCount(int) ScheduleInterface
// LastRunAt the last date and time the schedule was executed
LastRunAt() string
// SetLastRunAt sets the last date and time the schedule was executed
SetLastRunAt(string) ScheduleInterface
// NextRunAt the next date and time the schedule is scheduled to run
NextRunAt() string
// SetNextRunAt sets the next date and time the schedule is scheduled to run
SetNextRunAt(string) ScheduleInterface
// CreatedAt the date and time the schedule was created
CreatedAt() string
// SetCreatedAt sets the date and time the schedule was created
SetCreatedAt(string) ScheduleInterface
// UpdatedAt the date and time the schedule was last updated
UpdatedAt() string
// SetUpdatedAt sets the date and time the schedule was last updated
SetUpdatedAt(string) ScheduleInterface
// SoftDeletedAt the date and time the schedule was soft deleted
// The default value is max datetime (not soft deleted, 9999-12-31 23:59:59)
// To soft delete a schedule, set softDeletedAt to the current time
// To unsoft delete a schedule, set softDeletedAt to max datetime
// A soft deleted schedule is when its in the past
SoftDeletedAt() string
// SetSoftDeletedAt sets the date and time the schedule was soft deleted
SetSoftDeletedAt(string) ScheduleInterface
// HasReachedEndDate returns true if the schedule has reached its end date
HasReachedEndDate() bool
// HasReachedMaxExecutions returns true if the schedule has reached its maximum number of executions
HasReachedMaxExecutions() bool
// GetNextOccurrence returns the next occurrence of the schedule
// if invalid recurrence rule, returns error
GetNextOccurrence() (string, error)
// IncrementExecutionCount increments the execution count of the schedule by one
IncrementExecutionCount() ScheduleInterface
// UpdateNextRunAt calculates the next run at of the schedule and updates it
UpdateNextRunAt() ScheduleInterface
// UpdateLastRunAt updates the last run at of the schedule with current time
UpdateLastRunAt() ScheduleInterface
// IsDue returns true if the schedule is due to run
IsDue() bool
}
ScheduleInterface defines the contract for a schedule, including its identity, metadata, recurrence rule, timing fields, execution limits, soft-delete semantics, and helper methods for evaluating schedule state.
func NewSchedule ¶ added in v1.15.0
func NewSchedule() ScheduleInterface
NewSchedule creates a new schedule with default values and a new recurrence rule.
type ScheduleQuery ¶ added in v1.15.0
type ScheduleQuery struct {
// contains filtered or unexported fields
}
func (*ScheduleQuery) ID ¶ added in v1.15.0
func (q *ScheduleQuery) ID() string
func (*ScheduleQuery) Limit ¶ added in v1.15.0
func (q *ScheduleQuery) Limit() int
func (*ScheduleQuery) Name ¶ added in v1.15.0
func (q *ScheduleQuery) Name() string
func (*ScheduleQuery) Offset ¶ added in v1.15.0
func (q *ScheduleQuery) Offset() int
func (*ScheduleQuery) QueueName ¶ added in v1.15.0
func (q *ScheduleQuery) QueueName() string
func (*ScheduleQuery) SetID ¶ added in v1.15.0
func (q *ScheduleQuery) SetID(id string) ScheduleQueryInterface
func (*ScheduleQuery) SetLimit ¶ added in v1.15.0
func (q *ScheduleQuery) SetLimit(limit int) ScheduleQueryInterface
func (*ScheduleQuery) SetName ¶ added in v1.15.0
func (q *ScheduleQuery) SetName(name string) ScheduleQueryInterface
func (*ScheduleQuery) SetOffset ¶ added in v1.15.0
func (q *ScheduleQuery) SetOffset(offset int) ScheduleQueryInterface
func (*ScheduleQuery) SetQueueName ¶ added in v1.15.0
func (q *ScheduleQuery) SetQueueName(queueName string) ScheduleQueryInterface
func (*ScheduleQuery) SetStatus ¶ added in v1.15.0
func (q *ScheduleQuery) SetStatus(status string) ScheduleQueryInterface
func (*ScheduleQuery) SetTaskDefinitionID ¶ added in v1.15.0
func (q *ScheduleQuery) SetTaskDefinitionID(taskDefinitionID string) ScheduleQueryInterface
func (*ScheduleQuery) Status ¶ added in v1.15.0
func (q *ScheduleQuery) Status() string
func (*ScheduleQuery) TaskDefinitionID ¶ added in v1.15.0
func (q *ScheduleQuery) TaskDefinitionID() string
type ScheduleQueryInterface ¶ added in v1.15.0
type ScheduleQueryInterface interface {
// ID the unique identifier of the schedule to filter by
ID() string
// SetID sets the unique identifier of the schedule to filter by
SetID(string) ScheduleQueryInterface
// Name the name of the schedule to filter by
Name() string
// SetName sets the name of the schedule to filter by
SetName(string) ScheduleQueryInterface
// Status the status of the schedule to filter by
Status() string
// SetStatus sets the status of the schedule to filter by
SetStatus(string) ScheduleQueryInterface
// QueueName the name of the queue that schedules are associated with to filter by
QueueName() string
// SetQueueName sets the name of the queue that schedules are associated with to filter by
SetQueueName(string) ScheduleQueryInterface
// TaskDefinitionID the unique identifier of the task definition that schedules are associated with to filter by
TaskDefinitionID() string
// SetTaskDefinitionID sets the unique identifier of the task definition that schedules are associated with to filter by
SetTaskDefinitionID(string) ScheduleQueryInterface
// Limit the maximum number of schedules to return
Limit() int
// SetLimit sets the maximum number of schedules to return
SetLimit(int) ScheduleQueryInterface
// Offset the number of schedules to skip before starting to return results
Offset() int
// SetOffset sets the number of schedules to skip before starting to return results
SetOffset(int) ScheduleQueryInterface
}
ScheduleQueryInterface defines the query parameters used to filter and paginate schedules when listing or counting them.
func NewScheduleQuery ¶ added in v1.15.0
func NewScheduleQuery() ScheduleQueryInterface
type ScheduleRunnerInterface ¶ added in v1.15.0
type ScheduleRunnerInterface interface {
Start(ctx context.Context)
Stop()
IsRunning() bool
RunOnce(ctx context.Context) error
SetInitialRuns(ctx context.Context) error
}
func NewScheduleRunner ¶ added in v1.15.0
func NewScheduleRunner(store StoreInterface, opts ScheduleRunnerOptions) ScheduleRunnerInterface
type ScheduleRunnerOptions ¶ added in v1.15.0
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store defines a session store
func NewStore ¶
func NewStore(opts NewStoreOptions) (*Store, error)
NewStore creates a new task store
func (*Store) EnableDebug ¶
func (st *Store) EnableDebug(debugEnabled bool) StoreInterface
EnableDebug - enables the debug option
func (*Store) QueuedTaskForceFail ¶
func (*Store) QueuedTaskProcessWithContext ¶ added in v1.10.0
func (store *Store) QueuedTaskProcessWithContext(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)
QueuedTaskProcessWithContext processes a queued task with context support. It checks if the handler implements TaskHandlerWithContext and uses that if available, otherwise falls back to the standard Handle() method for backward compatibility.
func (*Store) ScheduleCount ¶ added in v1.15.0
func (*Store) ScheduleCreate ¶ added in v1.15.0
func (store *Store) ScheduleCreate(ctx context.Context, schedule ScheduleInterface) error
func (*Store) ScheduleDelete ¶ added in v1.15.0
func (store *Store) ScheduleDelete(ctx context.Context, schedule ScheduleInterface) error
func (*Store) ScheduleDeleteByID ¶ added in v1.15.0
func (*Store) ScheduleFindByID ¶ added in v1.15.0
func (*Store) ScheduleList ¶ added in v1.15.0
func (store *Store) ScheduleList(ctx context.Context, options ScheduleQueryInterface) ([]ScheduleInterface, error)
func (*Store) ScheduleRun ¶ added in v1.15.0
func (*Store) ScheduleSoftDelete ¶ added in v1.15.0
func (store *Store) ScheduleSoftDelete(ctx context.Context, schedule ScheduleInterface) error
func (*Store) ScheduleSoftDeleteByID ¶ added in v1.15.0
func (*Store) ScheduleUpdate ¶ added in v1.15.0
func (store *Store) ScheduleUpdate(ctx context.Context, schedule ScheduleInterface) error
func (*Store) SetErrorHandler ¶ added in v1.10.0
func (st *Store) SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface
SetErrorHandler - sets a custom error handler for queue processing errors
func (*Store) SqlCreateScheduleTable ¶ added in v1.15.0
SqlCreateScheduleTable - creates the schedule table
func (*Store) SqlCreateTaskDefinitionTable ¶ added in v1.10.0
SqlCreateTaskDefinitionTable - creates the task definition table
func (*Store) SqlCreateTaskQueueTable ¶ added in v1.10.0
SqlCreateTaskQueueTable - creates the task queue table
func (*Store) TaskDefinitionCount ¶ added in v1.10.0
func (*Store) TaskDefinitionCreate ¶ added in v1.10.0
func (store *Store) TaskDefinitionCreate(ctx context.Context, task TaskDefinitionInterface) error
func (*Store) TaskDefinitionDelete ¶ added in v1.10.0
func (store *Store) TaskDefinitionDelete(ctx context.Context, task TaskDefinitionInterface) error
func (*Store) TaskDefinitionDeleteByID ¶ added in v1.10.0
func (*Store) TaskDefinitionEnqueueByAlias ¶ added in v1.14.0
func (st *Store) TaskDefinitionEnqueueByAlias( ctx context.Context, queueName string, taskAlias string, parameters map[string]any, ) (TaskQueueInterface, error)
TaskDefinitionEnqueueByAlias finds a task by its alias and appends it to the queue
func (*Store) TaskDefinitionExecuteCli ¶ added in v1.14.0
TaskDefinitionExecuteCli - CLI tool to find a task by its alias and execute its handler - alias "list" is reserved. it lists all the available commands
func (*Store) TaskDefinitionFindByAlias ¶ added in v1.10.0
func (*Store) TaskDefinitionFindByID ¶ added in v1.10.0
func (*Store) TaskDefinitionList ¶ added in v1.10.0
func (store *Store) TaskDefinitionList(ctx context.Context, query TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
func (*Store) TaskDefinitionSoftDelete ¶ added in v1.10.0
func (store *Store) TaskDefinitionSoftDelete(ctx context.Context, task TaskDefinitionInterface) error
func (*Store) TaskDefinitionSoftDeleteByID ¶ added in v1.10.0
func (*Store) TaskDefinitionUpdate ¶ added in v1.10.0
func (store *Store) TaskDefinitionUpdate(ctx context.Context, task TaskDefinitionInterface) error
func (*Store) TaskHandlerAdd ¶
func (*Store) TaskHandlerList ¶
func (store *Store) TaskHandlerList() []TaskHandlerInterface
func (*Store) TaskQueueClaimNext ¶ added in v1.10.0
func (store *Store) TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)
TaskQueueClaimNext atomically claims the next queued task for processing. It uses SELECT FOR UPDATE within a transaction to prevent race conditions where multiple workers might try to process the same task.
Returns:
- TaskQueueInterface: The claimed task (status updated to "running")
- error: Any error that occurred during the operation
Returns (nil, nil) if no tasks are available to claim.
func (*Store) TaskQueueCount ¶ added in v1.10.0
func (*Store) TaskQueueCreate ¶ added in v1.10.0
func (store *Store) TaskQueueCreate(ctx context.Context, queue TaskQueueInterface) error
TaskQueueCreate creates a queued task
func (*Store) TaskQueueDelete ¶ added in v1.10.0
func (store *Store) TaskQueueDelete(ctx context.Context, queue TaskQueueInterface) error
func (*Store) TaskQueueDeleteByID ¶ added in v1.10.0
func (*Store) TaskQueueFail ¶ added in v1.10.0
func (st *Store) TaskQueueFail(ctx context.Context, queue TaskQueueInterface) error
TaskQueueFail fails a queued task
func (*Store) TaskQueueFindByID ¶ added in v1.10.0
TaskQueueFindByID finds a Queue by ID
func (*Store) TaskQueueFindNextQueuedTask ¶ added in v1.10.0
func (store *Store) TaskQueueFindNextQueuedTask(ctx context.Context) (TaskQueueInterface, error)
func (*Store) TaskQueueFindNextQueuedTaskByQueue ¶ added in v1.10.0
func (*Store) TaskQueueFindRunning ¶ added in v1.10.0
func (store *Store) TaskQueueFindRunning(ctx context.Context, limit int) []TaskQueueInterface
func (*Store) TaskQueueFindRunningByQueue ¶ added in v1.10.0
func (*Store) TaskQueueList ¶ added in v1.10.0
func (store *Store) TaskQueueList(ctx context.Context, query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
func (*Store) TaskQueueProcessNext ¶ added in v1.10.0
func (*Store) TaskQueueProcessNextAsyncByQueue ¶ added in v1.10.0
func (*Store) TaskQueueProcessNextByQueue ¶ added in v1.10.0
func (*Store) TaskQueueProcessTask ¶ added in v1.14.0
func (*Store) TaskQueueRunConcurrent ¶ added in v1.14.0
func (store *Store) TaskQueueRunConcurrent( ctx context.Context, queueName string, processSeconds int, unstuckMinutes int, )
TaskQueueRunConcurrent starts a queue processor that handles multiple tasks concurrently. Tasks are processed in parallel up to the configured MaxConcurrency limit. The processor runs in a background goroutine and can be stopped via TaskQueueStopByName.
func (*Store) TaskQueueRunDefault ¶ added in v1.14.0
func (store *Store) TaskQueueRunDefault( ctx context.Context, processSeconds int, unstuckMinutes int, )
TaskQueueRunDefault starts the queue processor for the default queue. Equivalent to calling TaskQueueRunSerial with DefaultQueueName.
func (*Store) TaskQueueRunSerial ¶ added in v1.14.0
func (store *Store) TaskQueueRunSerial( ctx context.Context, queueName string, processSeconds int, unstuckMinutes int, )
TaskQueueRunSerial starts a queue processor that handles tasks one at a time (serially). Each task must complete before the next one starts. The processor runs in a background goroutine and can be stopped via TaskQueueStopByName.
func (*Store) TaskQueueSoftDelete ¶ added in v1.10.0
func (store *Store) TaskQueueSoftDelete(ctx context.Context, queue TaskQueueInterface) error
func (*Store) TaskQueueSoftDeleteByID ¶ added in v1.10.0
func (*Store) TaskQueueStop ¶ added in v1.14.0
func (store *Store) TaskQueueStop()
TaskQueueStop stops the default queue processor. It blocks until the worker goroutine and all tasks have fully completed.
func (*Store) TaskQueueStopByName ¶ added in v1.14.0
TaskQueueStopByName stops the specified queue processor. It cancels the context, waits for the queue loop to exit, and waits for all in-flight tasks to complete.
func (*Store) TaskQueueSuccess ¶ added in v1.10.0
func (st *Store) TaskQueueSuccess(ctx context.Context, queue TaskQueueInterface) error
TaskQueueSuccess completes a queued task successfully
func (*Store) TaskQueueUnstuck ¶ added in v1.10.0
TaskQueueUnstuck clears the queue of tasks running for more than the specified wait time as most probably these have abnormally exited (panicked) and stop the rest of the queue from being processed
The tasks are marked as failed. However, if they are still running in the background and they are successfully completed, they will be marked as success
================================================================= Business Logic 1. Checks is there are running tasks in progress 2. If running for more than the specified wait minutes mark as failed =================================================================
func (*Store) TaskQueueUnstuckByQueue ¶ added in v1.10.0
func (*Store) TaskQueueUpdate ¶ added in v1.10.0
func (store *Store) TaskQueueUpdate(ctx context.Context, queue TaskQueueInterface) error
TaskQueueUpdate creates a Queue
type StoreInterface ¶
type StoreInterface interface {
AutoMigrate() error
EnableDebug(debug bool) StoreInterface
SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface
TaskQueueCount(ctx context.Context, options TaskQueueQueryInterface) (int64, error)
TaskQueueCreate(ctx context.Context, TaskQueue TaskQueueInterface) error
TaskQueueDelete(ctx context.Context, TaskQueue TaskQueueInterface) error
TaskQueueDeleteByID(ctx context.Context, id string) error
TaskQueueFindByID(ctx context.Context, TaskQueueID string) (TaskQueueInterface, error)
TaskQueueList(ctx context.Context, query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
TaskQueueSoftDelete(ctx context.Context, TaskQueue TaskQueueInterface) error
TaskQueueSoftDeleteByID(ctx context.Context, id string) error
TaskQueueUpdate(ctx context.Context, TaskQueue TaskQueueInterface) error
TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)
TaskQueueRunDefault(ctx context.Context, processSeconds int, unstuckMinutes int)
TaskQueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
TaskQueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
TaskQueueStop()
TaskQueueStopByName(queueName string)
TaskQueueProcessTask(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)
TaskDefinitionCount(ctx context.Context, options TaskDefinitionQueryInterface) (int64, error)
TaskDefinitionCreate(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
TaskDefinitionDelete(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
TaskDefinitionDeleteByID(ctx context.Context, id string) error
TaskDefinitionFindByAlias(ctx context.Context, alias string) (TaskDefinitionInterface, error)
TaskDefinitionFindByID(ctx context.Context, id string) (TaskDefinitionInterface, error)
TaskDefinitionList(ctx context.Context, options TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
TaskDefinitionSoftDelete(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
TaskDefinitionSoftDeleteByID(ctx context.Context, id string) error
TaskDefinitionUpdate(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
// TaskDefinition Operations
TaskDefinitionEnqueueByAlias(ctx context.Context, queueName string, alias string, parameters map[string]any) (TaskQueueInterface, error)
TaskDefinitionExecuteCli(alias string, args []string) bool
TaskHandlerList() []TaskHandlerInterface
TaskHandlerAdd(ctx context.Context, taskHandler TaskHandlerInterface, createIfMissing bool) error
ScheduleCount(ctx context.Context, options ScheduleQueryInterface) (int64, error)
ScheduleCreate(ctx context.Context, schedule ScheduleInterface) error
ScheduleDelete(ctx context.Context, schedule ScheduleInterface) error
ScheduleDeleteByID(ctx context.Context, id string) error
ScheduleFindByID(ctx context.Context, id string) (ScheduleInterface, error)
ScheduleList(ctx context.Context, options ScheduleQueryInterface) ([]ScheduleInterface, error)
ScheduleSoftDelete(ctx context.Context, schedule ScheduleInterface) error
ScheduleSoftDeleteByID(ctx context.Context, id string) error
ScheduleUpdate(ctx context.Context, schedule ScheduleInterface) error
ScheduleRun(ctx context.Context) error
}
type TaskDefinitionInterface ¶ added in v1.10.0
type TaskDefinitionInterface interface {
Data() map[string]string
DataChanged() map[string]string
MarkAsNotDirty()
IsActive() bool
IsCanceled() bool
IsSoftDeleted() bool
Alias() string
SetAlias(alias string) TaskDefinitionInterface
CreatedAt() string
CreatedAtCarbon() *carbon.Carbon
SetCreatedAt(createdAt string) TaskDefinitionInterface
Description() string
SetDescription(description string) TaskDefinitionInterface
ID() string
SetID(id string) TaskDefinitionInterface
Memo() string
SetMemo(memo string) TaskDefinitionInterface
IsRecurring() int
SetIsRecurring(isRecurring int) TaskDefinitionInterface
RecurrenceRule() string
SetRecurrenceRule(recurrenceRule string) TaskDefinitionInterface
SoftDeletedAt() string
SoftDeletedAtCarbon() *carbon.Carbon
SetSoftDeletedAt(deletedAt string) TaskDefinitionInterface
Status() string
SetStatus(status string) TaskDefinitionInterface
Title() string
SetTitle(title string) TaskDefinitionInterface
UpdatedAt() string
UpdatedAtCarbon() *carbon.Carbon
SetUpdatedAt(updatedAt string) TaskDefinitionInterface
}
func NewTaskDefinition ¶ added in v1.10.0
func NewTaskDefinition() TaskDefinitionInterface
func NewTaskDefinitionFromExistingData ¶ added in v1.10.0
func NewTaskDefinitionFromExistingData(data map[string]string) TaskDefinitionInterface
type TaskDefinitionQueryInterface ¶ added in v1.10.0
type TaskDefinitionQueryInterface interface {
Validate() error
Columns() []string
SetColumns(columns []string) TaskDefinitionQueryInterface
HasCountOnly() bool
IsCountOnly() bool
SetCountOnly(countOnly bool) TaskDefinitionQueryInterface
HasAlias() bool
Alias() string
SetAlias(alias string) TaskDefinitionQueryInterface
HasCreatedAtGte() bool
CreatedAtGte() string
SetCreatedAtGte(createdAtGte string) TaskDefinitionQueryInterface
HasCreatedAtLte() bool
CreatedAtLte() string
SetCreatedAtLte(createdAtLte string) TaskDefinitionQueryInterface
HasID() bool
ID() string
SetID(id string) TaskDefinitionQueryInterface
HasIDIn() bool
IDIn() []string
SetIDIn(idIn []string) TaskDefinitionQueryInterface
HasLimit() bool
Limit() int
SetLimit(limit int) TaskDefinitionQueryInterface
HasOffset() bool
Offset() int
SetOffset(offset int) TaskDefinitionQueryInterface
HasSortOrder() bool
SortOrder() string
SetSortOrder(sortOrder string) TaskDefinitionQueryInterface
HasOrderBy() bool
OrderBy() string
SetOrderBy(orderBy string) TaskDefinitionQueryInterface
HasSoftDeletedIncluded() bool
SoftDeletedIncluded() bool
SetSoftDeletedIncluded(withDeleted bool) TaskDefinitionQueryInterface
HasStatus() bool
Status() string
SetStatus(status string) TaskDefinitionQueryInterface
HasStatusIn() bool
StatusIn() []string
SetStatusIn(statusIn []string) TaskDefinitionQueryInterface
}
func TaskDefinitionQuery ¶ added in v1.10.0
func TaskDefinitionQuery() TaskDefinitionQueryInterface
type TaskHandlerBase ¶
type TaskHandlerBase struct {
// contains filtered or unexported fields
}
func (*TaskHandlerBase) ErrorMessage ¶
func (handler *TaskHandlerBase) ErrorMessage() string
func (*TaskHandlerBase) GetParam ¶
func (handler *TaskHandlerBase) GetParam(paramName string) string
func (*TaskHandlerBase) GetParamArray ¶
func (handler *TaskHandlerBase) GetParamArray(paramName string) []string
func (*TaskHandlerBase) HasQueuedTask ¶
func (handler *TaskHandlerBase) HasQueuedTask() bool
func (*TaskHandlerBase) InfoMessage ¶
func (handler *TaskHandlerBase) InfoMessage() string
func (*TaskHandlerBase) LogError ¶
func (handler *TaskHandlerBase) LogError(message string)
func (*TaskHandlerBase) LogInfo ¶
func (handler *TaskHandlerBase) LogInfo(message string)
func (*TaskHandlerBase) LogSuccess ¶
func (handler *TaskHandlerBase) LogSuccess(message string)
func (*TaskHandlerBase) Options ¶
func (handler *TaskHandlerBase) Options() map[string]string
func (*TaskHandlerBase) QueuedTask ¶
func (handler *TaskHandlerBase) QueuedTask() TaskQueueInterface
func (*TaskHandlerBase) SetOptions ¶
func (handler *TaskHandlerBase) SetOptions(options map[string]string)
func (*TaskHandlerBase) SetQueuedTask ¶
func (handler *TaskHandlerBase) SetQueuedTask(queuedTask TaskQueueInterface)
func (*TaskHandlerBase) SuccessMessage ¶
func (handler *TaskHandlerBase) SuccessMessage() string
type TaskHandlerInterface ¶
type TaskHandlerWithContext ¶ added in v1.10.0
type TaskHandlerWithContext interface {
TaskHandlerInterface
HandleWithContext(ctx context.Context) bool
}
TaskHandlerWithContext is an optional interface that task handlers can implement to receive context for cancellation support. This is backward compatible - handlers that don't implement this will continue to work using the standard Handle() method.
Example usage:
type MyHandler struct {
TaskHandlerBase
}
func (h *MyHandler) HandleWithContext(ctx context.Context) bool {
select {
case <-ctx.Done():
h.LogInfo("Task cancelled")
return false
case <-time.After(5 * time.Second):
h.LogSuccess("Task completed")
return true
}
}
type TaskQueueInterface ¶ added in v1.10.0
type TaskQueueInterface interface {
Data() map[string]string
DataChanged() map[string]string
MarkAsNotDirty()
IsCanceled() bool
IsDeleted() bool
IsFailed() bool
IsQueued() bool
IsPaused() bool
IsRunning() bool
IsSuccess() bool
IsSoftDeleted() bool
Attempts() int
SetAttempts(attempts int) TaskQueueInterface
CompletedAt() string
CompletedAtCarbon() *carbon.Carbon
SetCompletedAt(completedAt string) TaskQueueInterface
CreatedAt() string
CreatedAtCarbon() *carbon.Carbon
SetCreatedAt(createdAt string) TaskQueueInterface
Details() string
AppendDetails(details string) TaskQueueInterface
SetDetails(details string) TaskQueueInterface
ID() string
SetID(id string) TaskQueueInterface
Output() string
SetOutput(output string) TaskQueueInterface
Parameters() string
SetParameters(parameters string) TaskQueueInterface
ParametersMap() (map[string]string, error)
SetParametersMap(parameters map[string]string) (TaskQueueInterface, error)
SoftDeletedAt() string
SoftDeletedAtCarbon() *carbon.Carbon
SetSoftDeletedAt(deletedAt string) TaskQueueInterface
StartedAt() string
StartedAtCarbon() *carbon.Carbon
SetStartedAt(startedAt string) TaskQueueInterface
Status() string
SetStatus(status string) TaskQueueInterface
TaskID() string
SetTaskID(taskID string) TaskQueueInterface
UpdatedAt() string
UpdatedAtCarbon() *carbon.Carbon
SetUpdatedAt(updatedAt string) TaskQueueInterface
QueueName() string
SetQueueName(queueName string) TaskQueueInterface
}
func NewTaskQueue ¶ added in v1.10.0
func NewTaskQueue(queueName ...string) TaskQueueInterface
NewTaskQueue creates a new task queue If a queue name is provided, it will be used; otherwise DefaultQueueName is used.
func NewTaskQueueFromExistingData ¶ added in v1.10.0
func NewTaskQueueFromExistingData(data map[string]string) TaskQueueInterface
type TaskQueueQueryInterface ¶ added in v1.10.0
type TaskQueueQueryInterface interface {
Validate() error
Columns() []string
SetColumns(columns []string) TaskQueueQueryInterface
HasCountOnly() bool
IsCountOnly() bool
SetCountOnly(countOnly bool) TaskQueueQueryInterface
HasCreatedAtGte() bool
CreatedAtGte() string
SetCreatedAtGte(createdAtGte string) TaskQueueQueryInterface
HasCreatedAtLte() bool
CreatedAtLte() string
SetCreatedAtLte(createdAtLte string) TaskQueueQueryInterface
HasID() bool
ID() string
SetID(id string) TaskQueueQueryInterface
HasIDIn() bool
IDIn() []string
SetIDIn(idIn []string) TaskQueueQueryInterface
HasLimit() bool
Limit() int
SetLimit(limit int) TaskQueueQueryInterface
HasOffset() bool
Offset() int
SetOffset(offset int) TaskQueueQueryInterface
HasSortOrder() bool
SortOrder() string
SetSortOrder(sortOrder string) TaskQueueQueryInterface
HasOrderBy() bool
OrderBy() string
SetOrderBy(orderBy string) TaskQueueQueryInterface
HasSoftDeletedIncluded() bool
SoftDeletedIncluded() bool
SetSoftDeletedIncluded(withDeleted bool) TaskQueueQueryInterface
HasStatus() bool
Status() string
SetStatus(status string) TaskQueueQueryInterface
HasStatusIn() bool
StatusIn() []string
SetStatusIn(statusIn []string) TaskQueueQueryInterface
HasTaskID() bool
TaskID() string
SetTaskID(taskID string) TaskQueueQueryInterface
HasQueueName() bool
QueueName() string
SetQueueName(queueName string) TaskQueueQueryInterface
}
func TaskQueueQuery ¶ added in v1.10.0
func TaskQueueQuery() TaskQueueQueryInterface
type TaskQueueRunnerInterface ¶ added in v1.15.0
type TaskQueueRunnerInterface interface {
Start(ctx context.Context)
Stop()
IsRunning() bool
RunOnce(ctx context.Context) error
}
func NewTaskQueueRunner ¶ added in v1.15.0
func NewTaskQueueRunner(store StoreInterface, opts TaskQueueRunnerOptions) TaskQueueRunnerInterface
Source Files
¶
- constants.go
- dev.go
- recurrence_rule.go
- schedule_implementation.go
- schedule_interface.go
- schedule_runner.go
- store_handler_methods.go
- store_implementation.go
- store_interface.go
- store_schedule_methods.go
- store_sqls.go
- store_task_definition_methods.go
- store_task_queue_methods.go
- task_definition_implementation.go
- task_definition_interface.go
- task_definition_query.go
- task_handler_base.go
- task_handler_interface.go
- task_handler_with_context.go
- task_queue_implementation.go
- task_queue_interfaces.go
- task_queue_query.go
- task_queue_query_interface.go
- task_queue_runner.go