Documentation
¶
Index ¶
- Constants
- func NextRunAt(rule RecurrenceRuleInterface, now *carbon.Carbon) (*carbon.Carbon, error)
- type DayOfWeek
- type Frequency
- type MonthOfYear
- type NewStoreOptions
- type RecurrenceRuleInterface
- type ScheduleDefinition
- type ScheduleInterface
- type Store
- func (st *Store) AutoMigrate() error
- func (st *Store) EnableDebug(debugEnabled bool) StoreInterface
- func (store *Store) QueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
- func (store *Store) QueueRunDefault(ctx context.Context, processSeconds int, unstuckMinutes int)
- func (store *Store) QueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
- func (store *Store) QueueStop()
- func (store *Store) QueueStopByName(queueName string)
- func (store *Store) QueuedTaskForceFail(queuedTask TaskQueueInterface, waitMinutes int) error
- func (store *Store) QueuedTaskProcess(queuedTask TaskQueueInterface) (bool, error)
- func (store *Store) QueuedTaskProcessWithContext(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)
- func (st *Store) SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface
- func (st *Store) SqlCreateTaskDefinitionTable() string
- func (st *Store) SqlCreateTaskQueueTable() string
- func (store *Store) TaskDefinitionCount(options TaskDefinitionQueryInterface) (int64, error)
- func (store *Store) TaskDefinitionCreate(task TaskDefinitionInterface) error
- func (store *Store) TaskDefinitionDelete(task TaskDefinitionInterface) error
- func (store *Store) TaskDefinitionDeleteByID(id string) error
- func (store *Store) TaskDefinitionFindByAlias(alias string) (task TaskDefinitionInterface, err error)
- func (store *Store) TaskDefinitionFindByID(id string) (task TaskDefinitionInterface, err error)
- func (store *Store) TaskDefinitionList(query TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
- func (store *Store) TaskDefinitionSoftDelete(task TaskDefinitionInterface) error
- func (store *Store) TaskDefinitionSoftDeleteByID(id string) error
- func (store *Store) TaskDefinitionUpdate(task TaskDefinitionInterface) error
- func (st *Store) TaskEnqueueByAlias(taskAlias string, parameters map[string]interface{}) (TaskQueueInterface, error)
- func (store *Store) TaskExecuteCli(alias string, args []string) bool
- func (store *Store) TaskHandlerAdd(taskHandler TaskHandlerInterface, createIfMissing bool) error
- func (store *Store) TaskHandlerList() []TaskHandlerInterface
- func (store *Store) TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)
- func (store *Store) TaskQueueCount(options TaskQueueQueryInterface) (int64, error)
- func (store *Store) TaskQueueCreate(queue TaskQueueInterface) error
- func (store *Store) TaskQueueDelete(queue TaskQueueInterface) error
- func (st *Store) TaskQueueDeleteByID(id string) error
- func (st *Store) TaskQueueFail(queue TaskQueueInterface) error
- func (store *Store) TaskQueueFindByID(id string) (TaskQueueInterface, error)
- func (store *Store) TaskQueueFindNextQueuedTask() (TaskQueueInterface, error)
- func (store *Store) TaskQueueFindNextQueuedTaskByQueue(queueName string) (TaskQueueInterface, error)
- func (store *Store) TaskQueueFindRunning(limit int) []TaskQueueInterface
- func (store *Store) TaskQueueFindRunningByQueue(queueName string, limit int) []TaskQueueInterface
- func (store *Store) TaskQueueList(query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
- func (store *Store) TaskQueueProcessNext() error
- func (store *Store) TaskQueueProcessNextAsyncByQueue(queueName string) error
- func (store *Store) TaskQueueProcessNextByQueue(queueName string) error
- func (store *Store) TaskQueueSoftDelete(queue TaskQueueInterface) error
- func (store *Store) TaskQueueSoftDeleteByID(id string) error
- func (st *Store) TaskQueueSuccess(queue TaskQueueInterface) error
- func (store *Store) TaskQueueUnstuck(waitMinutes int)
- func (store *Store) TaskQueueUnstuckByQueue(queueName string, waitMinutes int)
- func (store *Store) TaskQueueUpdate(queue TaskQueueInterface) error
- type StoreInterface
- type TaskDefinitionInterface
- type TaskDefinitionQueryInterface
- type TaskHandlerBase
- func (handler *TaskHandlerBase) ErrorMessage() string
- func (handler *TaskHandlerBase) GetParam(paramName string) string
- func (handler *TaskHandlerBase) GetParamArray(paramName string) []string
- func (handler *TaskHandlerBase) HasQueuedTask() bool
- func (handler *TaskHandlerBase) InfoMessage() string
- func (handler *TaskHandlerBase) LogError(message string)
- func (handler *TaskHandlerBase) LogInfo(message string)
- func (handler *TaskHandlerBase) LogSuccess(message string)
- func (handler *TaskHandlerBase) Options() map[string]string
- func (handler *TaskHandlerBase) QueuedTask() TaskQueueInterface
- func (handler *TaskHandlerBase) SetOptions(options map[string]string)
- func (handler *TaskHandlerBase) SetQueuedTask(queuedTask TaskQueueInterface)
- func (handler *TaskHandlerBase) SuccessMessage() string
- type TaskHandlerInterface
- type TaskHandlerWithContext
- type TaskQueueInterface
- type TaskQueueQueryInterface
Constants ¶
const ASC = "asc"
const COLUMN_ALIAS = "alias"
const COLUMN_ATTEMPTS = "attempts"
const COLUMN_COMPLETED_AT = "completed_at"
const COLUMN_CREATED_AT = "created_at"
const COLUMN_DELETED_AT = "deleted_at"
const COLUMN_DESCRIPTION = "description"
const COLUMN_DETAILS = "details"
const COLUMN_ID = "id"
const COLUMN_IS_RECURRING = "is_recurring"
const COLUMN_MEMO = "memo"
const COLUMN_METAS = "metas"
const COLUMN_OUTPUT = "output"
const COLUMN_PARAMETERS = "parameters"
const COLUMN_QUEUE_NAME = "queue_name"
const COLUMN_RECURRENCE_RULE = "recurrence_rule"
const COLUMN_STARTED_AT = "started_at"
const COLUMN_STATUS = "status"
const COLUMN_TASK_ID = "task_id"
const COLUMN_TITLE = "title"
const COLUMN_UPDATED_AT = "updated_at"
const DESC = "desc"
const DefaultQueueName = "default"
const TaskDefinitionStatusActive = "active"
const TaskDefinitionStatusCanceled = "canceled"
const TaskQueueStatusCanceled = "canceled"
const TaskQueueStatusDeleted = "deleted"
const TaskQueueStatusFailed = "failed"
const TaskQueueStatusPaused = "paused"
const TaskQueueStatusQueued = "queued"
const TaskQueueStatusRunning = "running"
const TaskQueueStatusSuccess = "success"
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Frequency ¶
type Frequency string
Frequency is a string-based type (a defined type, not an alias) that names how often a recurrence rule repeats.
const ( FrequencyNone Frequency = "none" FrequencySecondly Frequency = "secondly" FrequencyMinutely Frequency = "minutely" FrequencyHourly Frequency = "hourly" FrequencyDaily Frequency = "daily" FrequencyWeekly Frequency = "weekly" FrequencyMonthly Frequency = "monthly" FrequencyYearly Frequency = "yearly" )
The supported Frequency values, defined as string constants.
type MonthOfYear ¶
type MonthOfYear string
const ( MonthOfYearJanuary MonthOfYear = "JANUARY" MonthOfYearFebruary MonthOfYear = "FEBRUARY" MonthOfYearMarch MonthOfYear = "MARCH" MonthOfYearApril MonthOfYear = "APRIL" MonthOfYearMay MonthOfYear = "MAY" MonthOfYearJune MonthOfYear = "JUNE" MonthOfYearJuly MonthOfYear = "JULY" MonthOfYearAugust MonthOfYear = "AUGUST" MonthOfYearSeptember MonthOfYear = "SEPTEMBER" MonthOfYearOctober MonthOfYear = "OCTOBER" MonthOfYearNovember MonthOfYear = "NOVEMBER" MonthOfYearDecember MonthOfYear = "DECEMBER" )
type NewStoreOptions ¶
type NewStoreOptions struct {
TaskDefinitionTableName string
TaskQueueTableName string
DB *sql.DB
DbDriverName string
AutomigrateEnabled bool
DebugEnabled bool
MaxConcurrency int // Max concurrent tasks (default: 10, 0 = unlimited)
ErrorHandler func(queueName, taskID string, err error) // Optional error callback
}
NewStoreOptions defines the options for creating a new task store.
type RecurrenceRuleInterface ¶ added in v1.10.0
type RecurrenceRuleInterface interface {
GetFrequency() Frequency
SetFrequency(Frequency) RecurrenceRuleInterface
GetStartsAt() string
SetStartsAt(dateTimeUTC string) RecurrenceRuleInterface
GetEndsAt() string
SetEndsAt(dateTimeUTC string) RecurrenceRuleInterface
GetInterval() int
SetInterval(int) RecurrenceRuleInterface
GetDaysOfWeek() []DayOfWeek
SetDaysOfWeek([]DayOfWeek) RecurrenceRuleInterface
GetDaysOfMonth() []int
SetDaysOfMonth([]int) RecurrenceRuleInterface
GetMonthsOfYear() []MonthOfYear
SetMonthsOfYear([]MonthOfYear) RecurrenceRuleInterface
}
func NewRecurrenceRule ¶
func NewRecurrenceRule() RecurrenceRuleInterface
type ScheduleDefinition ¶
type ScheduleDefinition interface {
GetID() string
SetID(string) ScheduleDefinition
GetRecurrenceRule() string
SetRecurrenceRule(dateTimeUtc string) ScheduleDefinition
GetStartsAt() string
SetStartsAt(dateTimeUtc string)
GetEndsAt() string
SetEndsAt(string)
IsValid() bool
GetNextRunTime(string) (string, error)
}
type ScheduleInterface ¶
type ScheduleInterface interface {
ScheduleDefinitionID() string
SetScheduleDefinition(ScheduleDefinition)
}
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store defines a task store
func NewStore ¶
func NewStore(opts NewStoreOptions) (*Store, error)
NewStore creates a new task store
func (*Store) EnableDebug ¶
func (st *Store) EnableDebug(debugEnabled bool) StoreInterface
EnableDebug - enables the debug option
func (*Store) QueueRunConcurrent ¶ added in v1.10.0
func (store *Store) QueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
QueueRunConcurrent starts a queue processor that handles multiple tasks concurrently. Tasks are processed in parallel up to the configured MaxConcurrency limit. The processor runs in a background goroutine and can be stopped via QueueStopByName.
func (*Store) QueueRunDefault ¶ added in v1.10.0
QueueRunDefault starts the queue processor for the default queue. Equivalent to calling QueueRunSerial with DefaultQueueName.
func (*Store) QueueRunSerial ¶ added in v1.10.0
func (store *Store) QueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
QueueRunSerial starts a queue processor that handles tasks one at a time (serially). Each task must complete before the next one starts. The processor runs in a background goroutine and can be stopped via QueueStopByName.
func (*Store) QueueStop ¶ added in v1.8.0
func (store *Store) QueueStop()
QueueStop stops the default queue processor. It blocks until the worker goroutine and all tasks have fully completed.
func (*Store) QueueStopByName ¶ added in v1.10.0
QueueStopByName stops the specified queue processor. It cancels the context, waits for the queue loop to exit, and waits for all in-flight tasks to complete.
func (*Store) QueuedTaskForceFail ¶
func (store *Store) QueuedTaskForceFail(queuedTask TaskQueueInterface, waitMinutes int) error
func (*Store) QueuedTaskProcess ¶
func (store *Store) QueuedTaskProcess(queuedTask TaskQueueInterface) (bool, error)
func (*Store) QueuedTaskProcessWithContext ¶ added in v1.10.0
func (store *Store) QueuedTaskProcessWithContext(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)
QueuedTaskProcessWithContext processes a queued task with context support. It checks if the handler implements TaskHandlerWithContext and uses that if available, otherwise falls back to the standard Handle() method for backward compatibility.
func (*Store) SetErrorHandler ¶ added in v1.10.0
func (st *Store) SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface
SetErrorHandler - sets a custom error handler for queue processing errors
func (*Store) SqlCreateTaskDefinitionTable ¶ added in v1.10.0
SqlCreateTaskDefinitionTable - returns, as a string, the SQL for creating the task definition table
func (*Store) SqlCreateTaskQueueTable ¶ added in v1.10.0
SqlCreateTaskQueueTable - returns, as a string, the SQL for creating the task queue table
func (*Store) TaskDefinitionCount ¶ added in v1.10.0
func (store *Store) TaskDefinitionCount(options TaskDefinitionQueryInterface) (int64, error)
func (*Store) TaskDefinitionCreate ¶ added in v1.10.0
func (store *Store) TaskDefinitionCreate(task TaskDefinitionInterface) error
func (*Store) TaskDefinitionDelete ¶ added in v1.10.0
func (store *Store) TaskDefinitionDelete(task TaskDefinitionInterface) error
func (*Store) TaskDefinitionDeleteByID ¶ added in v1.10.0
func (*Store) TaskDefinitionFindByAlias ¶ added in v1.10.0
func (store *Store) TaskDefinitionFindByAlias(alias string) (task TaskDefinitionInterface, err error)
func (*Store) TaskDefinitionFindByID ¶ added in v1.10.0
func (store *Store) TaskDefinitionFindByID(id string) (task TaskDefinitionInterface, err error)
func (*Store) TaskDefinitionList ¶ added in v1.10.0
func (store *Store) TaskDefinitionList(query TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
func (*Store) TaskDefinitionSoftDelete ¶ added in v1.10.0
func (store *Store) TaskDefinitionSoftDelete(task TaskDefinitionInterface) error
func (*Store) TaskDefinitionSoftDeleteByID ¶ added in v1.10.0
func (*Store) TaskDefinitionUpdate ¶ added in v1.10.0
func (store *Store) TaskDefinitionUpdate(task TaskDefinitionInterface) error
func (*Store) TaskEnqueueByAlias ¶
func (st *Store) TaskEnqueueByAlias(taskAlias string, parameters map[string]interface{}) (TaskQueueInterface, error)
TaskEnqueueByAlias finds a task by its alias and appends it to the queue
func (*Store) TaskExecuteCli ¶
TaskExecuteCli - CLI tool that finds a task by its alias and executes its handler. The alias "list" is reserved: it lists all the available commands.
func (*Store) TaskHandlerAdd ¶
func (store *Store) TaskHandlerAdd(taskHandler TaskHandlerInterface, createIfMissing bool) error
func (*Store) TaskHandlerList ¶
func (store *Store) TaskHandlerList() []TaskHandlerInterface
func (*Store) TaskQueueClaimNext ¶ added in v1.10.0
func (store *Store) TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)
TaskQueueClaimNext atomically claims the next queued task for processing. It uses SELECT FOR UPDATE within a transaction to prevent race conditions where multiple workers might try to process the same task.
Returns:
- TaskQueueInterface: The claimed task (status updated to "running")
- error: Any error that occurred during the operation
Returns (nil, nil) if no tasks are available to claim.
func (*Store) TaskQueueCount ¶ added in v1.10.0
func (store *Store) TaskQueueCount(options TaskQueueQueryInterface) (int64, error)
func (*Store) TaskQueueCreate ¶ added in v1.10.0
func (store *Store) TaskQueueCreate(queue TaskQueueInterface) error
TaskQueueCreate creates a queued task
func (*Store) TaskQueueDelete ¶ added in v1.10.0
func (store *Store) TaskQueueDelete(queue TaskQueueInterface) error
func (*Store) TaskQueueDeleteByID ¶ added in v1.10.0
func (*Store) TaskQueueFail ¶ added in v1.10.0
func (st *Store) TaskQueueFail(queue TaskQueueInterface) error
TaskQueueFail fails a queued task
func (*Store) TaskQueueFindByID ¶ added in v1.10.0
func (store *Store) TaskQueueFindByID(id string) (TaskQueueInterface, error)
TaskQueueFindByID finds a queued task by ID
func (*Store) TaskQueueFindNextQueuedTask ¶ added in v1.10.0
func (store *Store) TaskQueueFindNextQueuedTask() (TaskQueueInterface, error)
func (*Store) TaskQueueFindNextQueuedTaskByQueue ¶ added in v1.10.0
func (store *Store) TaskQueueFindNextQueuedTaskByQueue(queueName string) (TaskQueueInterface, error)
func (*Store) TaskQueueFindRunning ¶ added in v1.10.0
func (store *Store) TaskQueueFindRunning(limit int) []TaskQueueInterface
func (*Store) TaskQueueFindRunningByQueue ¶ added in v1.10.0
func (store *Store) TaskQueueFindRunningByQueue(queueName string, limit int) []TaskQueueInterface
func (*Store) TaskQueueList ¶ added in v1.10.0
func (store *Store) TaskQueueList(query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
func (*Store) TaskQueueProcessNext ¶ added in v1.10.0
func (*Store) TaskQueueProcessNextAsyncByQueue ¶ added in v1.10.0
func (*Store) TaskQueueProcessNextByQueue ¶ added in v1.10.0
func (*Store) TaskQueueSoftDelete ¶ added in v1.10.0
func (store *Store) TaskQueueSoftDelete(queue TaskQueueInterface) error
func (*Store) TaskQueueSoftDeleteByID ¶ added in v1.10.0
func (*Store) TaskQueueSuccess ¶ added in v1.10.0
func (st *Store) TaskQueueSuccess(queue TaskQueueInterface) error
TaskQueueSuccess completes a queued task successfully
func (*Store) TaskQueueUnstuck ¶ added in v1.10.0
TaskQueueUnstuck clears the queue of tasks that have been running for more than the specified wait time, as these have most probably exited abnormally (panicked) and are stopping the rest of the queue from being processed.
The tasks are marked as failed. However, if they are still running in the background and complete successfully, they will be marked as success.
================================================================= Business Logic 1. Checks if there are running tasks in progress 2. If a task has been running for more than the specified wait minutes, marks it as failed =================================================================
func (*Store) TaskQueueUnstuckByQueue ¶ added in v1.10.0
func (*Store) TaskQueueUpdate ¶ added in v1.10.0
func (store *Store) TaskQueueUpdate(queue TaskQueueInterface) error
TaskQueueUpdate updates a queued task
type StoreInterface ¶
type StoreInterface interface {
AutoMigrate() error
EnableDebug(debug bool) StoreInterface
SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface
TaskQueueCount(options TaskQueueQueryInterface) (int64, error)
TaskQueueCreate(TaskQueue TaskQueueInterface) error
TaskQueueDelete(TaskQueue TaskQueueInterface) error
TaskQueueDeleteByID(id string) error
TaskQueueFindByID(TaskQueueID string) (TaskQueueInterface, error)
TaskQueueList(query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
TaskQueueSoftDelete(TaskQueue TaskQueueInterface) error
TaskQueueSoftDeleteByID(id string) error
TaskQueueUpdate(TaskQueue TaskQueueInterface) error
QueueRunDefault(ctx context.Context, processSeconds int, unstuckMinutes int)
QueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
QueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
QueueStop()
QueueStopByName(queueName string)
QueuedTaskProcess(queuedTask TaskQueueInterface) (bool, error)
TaskEnqueueByAlias(alias string, parameters map[string]interface{}) (TaskQueueInterface, error)
TaskExecuteCli(alias string, args []string) bool
TaskDefinitionCount(options TaskDefinitionQueryInterface) (int64, error)
TaskDefinitionCreate(TaskDefinition TaskDefinitionInterface) error
TaskDefinitionDelete(TaskDefinition TaskDefinitionInterface) error
TaskDefinitionDeleteByID(id string) error
TaskDefinitionFindByAlias(alias string) (TaskDefinitionInterface, error)
TaskDefinitionFindByID(id string) (TaskDefinitionInterface, error)
TaskDefinitionList(options TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
TaskDefinitionSoftDelete(TaskDefinition TaskDefinitionInterface) error
TaskDefinitionSoftDeleteByID(id string) error
TaskDefinitionUpdate(TaskDefinition TaskDefinitionInterface) error
TaskHandlerList() []TaskHandlerInterface
TaskHandlerAdd(taskHandler TaskHandlerInterface, createIfMissing bool) error
}
type TaskDefinitionInterface ¶ added in v1.10.0
type TaskDefinitionInterface interface {
Data() map[string]string
DataChanged() map[string]string
MarkAsNotDirty()
IsActive() bool
IsCanceled() bool
IsSoftDeleted() bool
Alias() string
SetAlias(alias string) TaskDefinitionInterface
CreatedAt() string
CreatedAtCarbon() *carbon.Carbon
SetCreatedAt(createdAt string) TaskDefinitionInterface
Description() string
SetDescription(description string) TaskDefinitionInterface
ID() string
SetID(id string) TaskDefinitionInterface
Memo() string
SetMemo(memo string) TaskDefinitionInterface
SoftDeletedAt() string
SoftDeletedAtCarbon() *carbon.Carbon
SetSoftDeletedAt(deletedAt string) TaskDefinitionInterface
Status() string
SetStatus(status string) TaskDefinitionInterface
Title() string
SetTitle(title string) TaskDefinitionInterface
UpdatedAt() string
UpdatedAtCarbon() *carbon.Carbon
SetUpdatedAt(updatedAt string) TaskDefinitionInterface
}
func NewTaskDefinition ¶ added in v1.10.0
func NewTaskDefinition() TaskDefinitionInterface
func NewTaskDefinitionFromExistingData ¶ added in v1.10.0
func NewTaskDefinitionFromExistingData(data map[string]string) TaskDefinitionInterface
type TaskDefinitionQueryInterface ¶ added in v1.10.0
type TaskDefinitionQueryInterface interface {
Validate() error
Columns() []string
SetColumns(columns []string) TaskDefinitionQueryInterface
HasCountOnly() bool
IsCountOnly() bool
SetCountOnly(countOnly bool) TaskDefinitionQueryInterface
HasAlias() bool
Alias() string
SetAlias(alias string) TaskDefinitionQueryInterface
HasCreatedAtGte() bool
CreatedAtGte() string
SetCreatedAtGte(createdAtGte string) TaskDefinitionQueryInterface
HasCreatedAtLte() bool
CreatedAtLte() string
SetCreatedAtLte(createdAtLte string) TaskDefinitionQueryInterface
HasID() bool
ID() string
SetID(id string) TaskDefinitionQueryInterface
HasIDIn() bool
IDIn() []string
SetIDIn(idIn []string) TaskDefinitionQueryInterface
HasLimit() bool
Limit() int
SetLimit(limit int) TaskDefinitionQueryInterface
HasOffset() bool
Offset() int
SetOffset(offset int) TaskDefinitionQueryInterface
HasSortOrder() bool
SortOrder() string
SetSortOrder(sortOrder string) TaskDefinitionQueryInterface
HasOrderBy() bool
OrderBy() string
SetOrderBy(orderBy string) TaskDefinitionQueryInterface
HasSoftDeletedIncluded() bool
SoftDeletedIncluded() bool
SetSoftDeletedIncluded(withDeleted bool) TaskDefinitionQueryInterface
HasStatus() bool
Status() string
SetStatus(status string) TaskDefinitionQueryInterface
HasStatusIn() bool
StatusIn() []string
SetStatusIn(statusIn []string) TaskDefinitionQueryInterface
}
func TaskDefinitionQuery ¶ added in v1.10.0
func TaskDefinitionQuery() TaskDefinitionQueryInterface
type TaskHandlerBase ¶
type TaskHandlerBase struct {
// contains filtered or unexported fields
}
func (*TaskHandlerBase) ErrorMessage ¶
func (handler *TaskHandlerBase) ErrorMessage() string
func (*TaskHandlerBase) GetParam ¶
func (handler *TaskHandlerBase) GetParam(paramName string) string
func (*TaskHandlerBase) GetParamArray ¶
func (handler *TaskHandlerBase) GetParamArray(paramName string) []string
func (*TaskHandlerBase) HasQueuedTask ¶
func (handler *TaskHandlerBase) HasQueuedTask() bool
func (*TaskHandlerBase) InfoMessage ¶
func (handler *TaskHandlerBase) InfoMessage() string
func (*TaskHandlerBase) LogError ¶
func (handler *TaskHandlerBase) LogError(message string)
func (*TaskHandlerBase) LogInfo ¶
func (handler *TaskHandlerBase) LogInfo(message string)
func (*TaskHandlerBase) LogSuccess ¶
func (handler *TaskHandlerBase) LogSuccess(message string)
func (*TaskHandlerBase) Options ¶
func (handler *TaskHandlerBase) Options() map[string]string
func (*TaskHandlerBase) QueuedTask ¶
func (handler *TaskHandlerBase) QueuedTask() TaskQueueInterface
func (*TaskHandlerBase) SetOptions ¶
func (handler *TaskHandlerBase) SetOptions(options map[string]string)
func (*TaskHandlerBase) SetQueuedTask ¶
func (handler *TaskHandlerBase) SetQueuedTask(queuedTask TaskQueueInterface)
func (*TaskHandlerBase) SuccessMessage ¶
func (handler *TaskHandlerBase) SuccessMessage() string
type TaskHandlerInterface ¶
type TaskHandlerWithContext ¶ added in v1.10.0
type TaskHandlerWithContext interface {
TaskHandlerInterface
HandleWithContext(ctx context.Context) bool
}
TaskHandlerWithContext is an optional interface that task handlers can implement to receive context for cancellation support. This is backward compatible - handlers that don't implement this will continue to work using the standard Handle() method.
Example usage:
type MyHandler struct {
TaskHandlerBase
}
func (h *MyHandler) HandleWithContext(ctx context.Context) bool {
select {
case <-ctx.Done():
h.LogInfo("Task cancelled")
return false
case <-time.After(5 * time.Second):
h.LogSuccess("Task completed")
return true
}
}
type TaskQueueInterface ¶ added in v1.10.0
type TaskQueueInterface interface {
Data() map[string]string
DataChanged() map[string]string
MarkAsNotDirty()
IsCanceled() bool
IsDeleted() bool
IsFailed() bool
IsQueued() bool
IsPaused() bool
IsRunning() bool
IsSuccess() bool
IsSoftDeleted() bool
Attempts() int
SetAttempts(attempts int) TaskQueueInterface
CompletedAt() string
CompletedAtCarbon() *carbon.Carbon
SetCompletedAt(completedAt string) TaskQueueInterface
CreatedAt() string
CreatedAtCarbon() *carbon.Carbon
SetCreatedAt(createdAt string) TaskQueueInterface
Details() string
AppendDetails(details string) TaskQueueInterface
SetDetails(details string) TaskQueueInterface
ID() string
SetID(id string) TaskQueueInterface
Output() string
SetOutput(output string) TaskQueueInterface
Parameters() string
SetParameters(parameters string) TaskQueueInterface
ParametersMap() (map[string]string, error)
SetParametersMap(parameters map[string]string) (TaskQueueInterface, error)
SoftDeletedAt() string
SoftDeletedAtCarbon() *carbon.Carbon
SetSoftDeletedAt(deletedAt string) TaskQueueInterface
StartedAt() string
StartedAtCarbon() *carbon.Carbon
SetStartedAt(startedAt string) TaskQueueInterface
Status() string
SetStatus(status string) TaskQueueInterface
TaskID() string
SetTaskID(taskID string) TaskQueueInterface
UpdatedAt() string
UpdatedAtCarbon() *carbon.Carbon
SetUpdatedAt(updatedAt string) TaskQueueInterface
QueueName() string
SetQueueName(queueName string) TaskQueueInterface
}
func NewTaskQueue ¶ added in v1.10.0
func NewTaskQueue(queueName ...string) TaskQueueInterface
NewTaskQueue creates a new task queue. If a queue name is provided, it will be used; otherwise DefaultQueueName is used.
func NewTaskQueueFromExistingData ¶ added in v1.10.0
func NewTaskQueueFromExistingData(data map[string]string) TaskQueueInterface
type TaskQueueQueryInterface ¶ added in v1.10.0
type TaskQueueQueryInterface interface {
Validate() error
Columns() []string
SetColumns(columns []string) TaskQueueQueryInterface
HasCountOnly() bool
IsCountOnly() bool
SetCountOnly(countOnly bool) TaskQueueQueryInterface
HasCreatedAtGte() bool
CreatedAtGte() string
SetCreatedAtGte(createdAtGte string) TaskQueueQueryInterface
HasCreatedAtLte() bool
CreatedAtLte() string
SetCreatedAtLte(createdAtLte string) TaskQueueQueryInterface
HasID() bool
ID() string
SetID(id string) TaskQueueQueryInterface
HasIDIn() bool
IDIn() []string
SetIDIn(idIn []string) TaskQueueQueryInterface
HasLimit() bool
Limit() int
SetLimit(limit int) TaskQueueQueryInterface
HasOffset() bool
Offset() int
SetOffset(offset int) TaskQueueQueryInterface
HasSortOrder() bool
SortOrder() string
SetSortOrder(sortOrder string) TaskQueueQueryInterface
HasOrderBy() bool
OrderBy() string
SetOrderBy(orderBy string) TaskQueueQueryInterface
HasSoftDeletedIncluded() bool
SoftDeletedIncluded() bool
SetSoftDeletedIncluded(withDeleted bool) TaskQueueQueryInterface
HasStatus() bool
Status() string
SetStatus(status string) TaskQueueQueryInterface
HasStatusIn() bool
StatusIn() []string
SetStatusIn(statusIn []string) TaskQueueQueryInterface
HasTaskID() bool
TaskID() string
SetTaskID(taskID string) TaskQueueQueryInterface
HasQueueName() bool
QueueName() string
SetQueueName(queueName string) TaskQueueQueryInterface
}
func TaskQueueQuery ¶ added in v1.10.0
func TaskQueueQuery() TaskQueueQueryInterface
Source Files
¶
- consts.go
- dev.go
- recurrence_rule.go
- store_handler_methods.go
- store_implementation.go
- store_interface.go
- store_sqls.go
- store_task_definition_methods.go
- store_task_queue_methods.go
- task_definition_implementation.go
- task_definition_interface.go
- task_definition_query.go
- task_handler_base.go
- task_handler_interface.go
- task_handler_with_context.go
- task_queue_implementation.go
- task_queue_interfaces.go
- task_queue_query.go
- task_queue_query_interface.go