taskstore

v1.14.0
Published: Nov 21, 2025 License: AGPL-3.0 Imports: 20 Imported by: 1

README

Task Store

TaskStore is a robust, asynchronous durable task queue package designed to offload time-consuming or resource-intensive operations from your main application.

By deferring tasks to the background, you can improve application responsiveness and prevent performance bottlenecks.

TaskStore leverages a durable database (SQLite, MySQL, or PostgreSQL) to ensure reliable persistence and fault tolerance.

License

This project is licensed under the GNU Affero General Public License v3.0 (AGPL-3.0). You can find a copy of the license at https://www.gnu.org/licenses/agpl-3.0.en.html

For commercial use, please use my contact page to obtain a commercial license.

Installation

go get github.com/dracory/taskstore

Queue Features

Atomic Task Claiming

Tasks are claimed atomically using database transactions with SELECT FOR UPDATE, preventing race conditions where multiple workers might process the same task simultaneously.
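
The guarantee described above can be illustrated without a database. The following is a minimal in-memory sketch, not the taskstore implementation: a mutex plays the role of the row lock that SELECT FOR UPDATE takes inside a transaction, and the names queuedTask, memoryQueue, claimNext, claimAll, and newQueue are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// queuedTask is a hypothetical in-memory stand-in for a row in the task queue table.
type queuedTask struct {
	ID     string
	Status string // "queued" or "running"
}

// memoryQueue guards its tasks with a mutex. The mutex stands in for the
// row lock that SELECT ... FOR UPDATE would take in the database.
type memoryQueue struct {
	mu    sync.Mutex
	tasks []*queuedTask
}

// claimNext finds the first queued task and flips it to "running" inside one
// critical section, so two callers can never claim the same task.
// It returns nil when nothing is left to claim.
func (q *memoryQueue) claimNext() *queuedTask {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, t := range q.tasks {
		if t.Status == "queued" {
			t.Status = "running"
			return t
		}
	}
	return nil
}

// claimAll races the given number of workers against the queue and counts
// how many times each task ID was claimed.
func claimAll(q *memoryQueue, workers int) map[string]int {
	var wg sync.WaitGroup
	var countsMu sync.Mutex
	counts := map[string]int{}
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := q.claimNext(); t != nil; t = q.claimNext() {
				countsMu.Lock()
				counts[t.ID]++
				countsMu.Unlock()
			}
		}()
	}
	wg.Wait()
	return counts
}

// newQueue builds a queue holding n tasks in the "queued" state.
func newQueue(n int) *memoryQueue {
	q := &memoryQueue{}
	for i := 0; i < n; i++ {
		q.tasks = append(q.tasks, &queuedTask{ID: fmt.Sprintf("task-%d", i), Status: "queued"})
	}
	return q
}

func main() {
	counts := claimAll(newQueue(100), 8)
	fmt.Println("distinct tasks claimed:", len(counts))
}
```

Because the status check and the status update happen under the same lock, every task ends up claimed exactly once regardless of the number of workers.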

Concurrency Control
  • Default limit: 10 concurrent tasks per queue
  • Configurable: Set via MaxConcurrency in NewStoreOptions
  • Semaphore-based: Automatic backpressure when limit is reached
store, err := taskstore.NewStore(taskstore.NewStoreOptions{
    DB:                      databaseInstance,
    TaskDefinitionTableName: "task_definition",
    TaskQueueTableName:      "task_queue",
    MaxConcurrency:          20, // Allow 20 concurrent tasks
})
Graceful Shutdown
  • TaskQueueStop() – Stop default queue and wait for all tasks to complete
  • TaskQueueStopByName(queueName) – Stop specific queue and wait for all tasks
  • Ensures no task goroutines are abandoned
// Start concurrent queue
store.TaskQueueRunConcurrent(ctx, "emails", 10, 1)

// Later: gracefully stop and wait for completion
store.TaskQueueStopByName("emails")
Error Handling

Configure custom error handlers for monitoring and alerting:

store.SetErrorHandler(func(queueName, taskID string, err error) {
    log.Printf("[ERROR] Queue: %s, Task: %s, Error: %v", queueName, taskID, err)
    // Send to monitoring system
    metrics.RecordTaskError(queueName, taskID)
})
Context Propagation (Optional)

Task handlers can optionally implement TaskHandlerWithContext to support cancellation:

func (h *EmailHandler) HandleWithContext(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        h.LogInfo("Task cancelled")
        return false
    case <-time.After(5 * time.Second):
        // Send email...
        h.LogSuccess("Email sent")
        return true
    }
}

Note: Existing handlers without HandleWithContext continue to work - this is fully backward compatible.

Setup

myTaskStore, err := taskstore.NewStore(taskstore.NewStoreOptions{
	DB:                      databaseInstance,
	TaskDefinitionTableName: "my_task_definition",
	TaskQueueTableName:      "my_task_queue",
	AutomigrateEnabled:      true,
	DebugEnabled:            false,
})
if err != nil {
	log.Fatal(err)
}

Task Definitions

The task definition specifies a unit of work to be completed. It can be performed immediately, or enqueued to the database and deferred for asynchronous processing, ensuring your application remains responsive.

Each task definition is uniquely identified by an alias, which allows the task to be called easily. A human-readable title and description give the user more information about the task definition.

To define a task definition, implement the TaskHandlerInterface and provide a Handle method that contains the task's logic.

Optionally, extend the TaskHandlerBase struct for additional features like parameter retrieval.

Task definitions can be executed directly from the command line (CLI) or as part of a background task queue.

The tasks placed in the task queue will be processed at a specified interval.

package tasks

import (
	"context"

	"github.com/dracory/taskstore"
)

func NewHelloWorldTask() *HelloWorldTask {
	return &HelloWorldTask{}
}

type HelloWorldTask struct {
	taskstore.TaskHandlerBase
}

var _ taskstore.TaskHandlerInterface = (*HelloWorldTask)(nil) // verify it implements the task handler interface

func (task *HelloWorldTask) Alias() string {
	return "HelloWorldTask"
}

func (task *HelloWorldTask) Title() string {
	return "Hello World"
}

func (task *HelloWorldTask) Description() string {
	return "Say hello world"
}

// Enqueue is an optional shortcut to quickly add this task to the task queue.
// myTaskStore is the store created in the Setup section.
func (task *HelloWorldTask) Enqueue(name string) (taskstore.TaskQueueInterface, error) {
	return myTaskStore.TaskDefinitionEnqueueByAlias(context.Background(), taskstore.DefaultQueueName, task.Alias(), map[string]any{
		"name": name,
	})
}

func (task *HelloWorldTask) Handle() bool {
	name := task.GetParam("name")

	// Optional: allows adding the task to the task queue manually. Useful during development.
	if !task.HasQueuedTask() && task.GetParam("enqueue") == "yes" {
		_, err := task.Enqueue(name)

		if err != nil {
			task.LogError("Error enqueuing task: " + err.Error())
		} else {
			task.LogSuccess("Task enqueued.")
		}

		return true
	}

	if name != "" {
		task.LogInfo("Hello " + name + "!")
	} else {
		task.LogInfo("Hello World!")
	}

	return true
}

Registering Task Definitions to the TaskStore

Registering the task definition to the task store will persist it in the database.

myTaskStore.TaskHandlerAdd(context.Background(), NewHelloWorldTask(), true)

Executing Task Definitions in the Terminal

To allow tasks to be executed from the terminal, add the following to your main function:

myTaskStore.TaskDefinitionExecuteCli(args[1], args[1:])

Example:

go run . HelloWorldTask --name="Tom Jones"
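
Flags such as --name end up as task parameters that the handler reads with GetParam. How the package parses them is not documented here, so the following is only a sketch of one plausible approach; parseArgs is a hypothetical helper, not a taskstore function.

```go
package main

import (
	"fmt"
	"strings"
)

// parseArgs turns arguments of the form --key=value into a parameter map.
// The shell removes the quotes, so --name="Tom Jones" arrives as
// the single argument --name=Tom Jones.
// Arguments without the -- prefix or without "=" are ignored in this sketch.
func parseArgs(args []string) map[string]string {
	params := map[string]string{}
	for _, arg := range args {
		if !strings.HasPrefix(arg, "--") {
			continue
		}
		key, value, found := strings.Cut(strings.TrimPrefix(arg, "--"), "=")
		if !found || key == "" {
			continue
		}
		params[key] = value
	}
	return params
}

func main() {
	params := parseArgs([]string{"HelloWorldTask", "--name=Tom Jones", "--enqueue=yes"})
	fmt.Println("name:", params["name"])
	fmt.Println("enqueue:", params["enqueue"])
}
```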

Adding the Task to the Task Queue

To add a task to the background task queue:

myTaskStore.TaskDefinitionEnqueueByAlias(context.Background(), taskstore.DefaultQueueName, NewHelloWorldTask().Alias(), map[string]any{
	"name": name,
})

Or, if you have defined an Enqueue method as in the example task above:

NewHelloWorldTask().Enqueue("Tom Jones")

Starting the Task Queue

To start the task queue, use one of the queue run methods:

ctx := context.Background()

// Option 1: Run default queue (serial processing)
myTaskStore.TaskQueueRunDefault(ctx, 10, 2) // every 10s, unstuck after 2 mins

// Option 2: Run named queue with serial processing
myTaskStore.TaskQueueRunSerial(ctx, "emails", 10, 2)

// Option 3: Run named queue with concurrent processing (respects MaxConcurrency)
myTaskStore.TaskQueueRunConcurrent(ctx, "emails", 10, 2)

Store Methods

  • AutoMigrate() error – automigrates (creates) the task definition and task queue tables
  • EnableDebug(debug bool) StoreInterface – enables / disables the debug option

Task Definition Methods

  • TaskDefinitionCreate(ctx context.Context, task TaskDefinitionInterface) error – creates a task definition
  • TaskDefinitionFindByAlias(ctx context.Context, alias string) (TaskDefinitionInterface, error) – finds a task definition by alias
  • TaskDefinitionFindByID(ctx context.Context, id string) (TaskDefinitionInterface, error) – finds a task definition by ID
  • TaskDefinitionList(ctx context.Context, options TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error) – lists task definitions
  • TaskDefinitionUpdate(ctx context.Context, task TaskDefinitionInterface) error – updates a task definition
  • TaskDefinitionSoftDelete(ctx context.Context, task TaskDefinitionInterface) error – soft deletes a task definition

Task Queue Methods

  • TaskQueueCreate(ctx context.Context, queue TaskQueueInterface) error – creates a new queued task
  • TaskQueueDeleteByID(ctx context.Context, id string) error – deletes a queued task by ID
  • TaskQueueFindByID(ctx context.Context, id string) (TaskQueueInterface, error) – finds a queued task by ID
  • TaskQueueFail(ctx context.Context, queue TaskQueueInterface) error – marks a queued task as failed
  • TaskQueueSoftDeleteByID(ctx context.Context, id string) error – soft deletes a queued task by ID (populates the soft_deleted_at field)
  • TaskQueueSuccess(ctx context.Context, queue TaskQueueInterface) error – completes a queued task successfully
  • TaskQueueList(ctx context.Context, options TaskQueueQueryInterface) ([]TaskQueueInterface, error) – lists the queued tasks
  • TaskQueueUpdate(ctx context.Context, queue TaskQueueInterface) error – updates a queued task

Frequently Asked Questions (FAQ)

1. What is TaskStore used for?

TaskStore is a versatile tool for offloading time-consuming or resource-intensive tasks from your main application. By deferring these tasks to the background, you can improve application responsiveness and prevent performance bottlenecks.

It's ideal for tasks like data processing, sending emails, generating reports, or performing batch operations.

2. How does TaskStore work?

TaskStore creates a durable queue in your database (SQLite, MySQL, or PostgreSQL) to store tasks. These tasks are then processed asynchronously by a background worker. You can define tasks using a simple interface and schedule them to be executed at specific intervals or on demand.

3. What are the benefits of using TaskStore?
  • Improved application performance: Offload time-consuming tasks to prevent performance bottlenecks.
  • Asynchronous processing: Execute tasks independently of your main application flow.
  • Reliability: Ensure tasks are completed even if your application crashes.
  • Flexibility: Schedule tasks to run at specific intervals or on demand.
  • Ease of use: Define tasks using a simple interface and integrate with your existing application.
4. How do I create a task definition in TaskStore?

To create a task definition, you'll need to implement the TaskHandlerInterface and provide a Handle method that contains the task's logic. You can also extend the TaskHandlerBase struct for additional features.

5. How do I schedule a task to run in the background?

Use the TaskDefinitionEnqueueByAlias method to add a task to the background task queue. The interval at which the queue is processed is set by the processSeconds argument of the queue run methods (TaskQueueRunDefault, TaskQueueRunSerial, or TaskQueueRunConcurrent).

6. Can I monitor the status of tasks?

Yes. TaskQueueList and TaskQueueFindByID return queued tasks, and each queued task exposes status checks such as IsQueued, IsRunning, IsSuccess, and IsFailed.

7. How does TaskStore handle task failures?

If a task fails, it can be retried automatically or marked as failed. You can customize the retry logic to suit your specific needs.

8. Is TaskStore suitable for large-scale applications?

Yes, TaskStore is designed to handle large volumes of tasks. It can be scaled horizontally by adding more worker processes.

9. Does TaskStore support different database systems?

Yes, TaskStore supports SQLite, MySQL, and PostgreSQL.

10. Can I customize TaskStore to fit my specific needs?

Yes, TaskStore is highly customizable. You can extend and modify the code to suit your requirements.

Documentation

Constants

const ASC = "asc"
const COLUMN_ALIAS = "alias"
const COLUMN_ATTEMPTS = "attempts"
const COLUMN_COMPLETED_AT = "completed_at"
const COLUMN_CREATED_AT = "created_at"
const COLUMN_DESCRIPTION = "description"
const COLUMN_DETAILS = "details"
const COLUMN_ID = "id"
const COLUMN_IS_RECURRING = "is_recurring"
const COLUMN_MEMO = "memo"
const COLUMN_METAS = "metas"
const COLUMN_OUTPUT = "output"
const COLUMN_PARAMETERS = "parameters"
const COLUMN_QUEUE_NAME = "queue_name"
const COLUMN_RECURRENCE_RULE = "recurrence_rule"
const COLUMN_SOFT_DELETED_AT = "soft_deleted_at"
const COLUMN_STARTED_AT = "started_at"
const COLUMN_STATUS = "status"
const COLUMN_TASK_ID = "task_id"
const COLUMN_TITLE = "title"
const COLUMN_UPDATED_AT = "updated_at"
const DESC = "desc"
const DefaultQueueName = "default"
const TaskDefinitionStatusActive = "active"
const TaskDefinitionStatusCanceled = "canceled"
const TaskQueueStatusCanceled = "canceled"
const TaskQueueStatusDeleted = "deleted"
const TaskQueueStatusFailed = "failed"
const TaskQueueStatusPaused = "paused"
const TaskQueueStatusQueued = "queued"
const TaskQueueStatusRunning = "running"
const TaskQueueStatusSuccess = "success"

Variables

This section is empty.

Functions

func NextRunAt

func NextRunAt(rule RecurrenceRuleInterface, now *carbon.Carbon) (*carbon.Carbon, error)

Types

type DayOfWeek

type DayOfWeek string
const (
	DayOfWeekMonday    DayOfWeek = "monday"
	DayOfWeekTuesday   DayOfWeek = "tuesday"
	DayOfWeekWednesday DayOfWeek = "wednesday"
	DayOfWeekThursday  DayOfWeek = "thursday"
	DayOfWeekFriday    DayOfWeek = "friday"
	DayOfWeekSaturday  DayOfWeek = "saturday"
	DayOfWeekSunday    DayOfWeek = "sunday"
)

type Frequency

type Frequency string

Frequency is a string type that names how often a recurrence repeats.

const (
	FrequencyNone     Frequency = "none"
	FrequencySecondly Frequency = "secondly"
	FrequencyMinutely Frequency = "minutely"
	FrequencyHourly   Frequency = "hourly"
	FrequencyDaily    Frequency = "daily"
	FrequencyWeekly   Frequency = "weekly"
	FrequencyMonthly  Frequency = "monthly"
	FrequencyYearly   Frequency = "yearly"
)

The frequency constants are defined as strings.

type MonthOfYear

type MonthOfYear string
const (
	MonthOfYearJanuary   MonthOfYear = "JANUARY"
	MonthOfYearFebruary  MonthOfYear = "FEBRUARY"
	MonthOfYearMarch     MonthOfYear = "MARCH"
	MonthOfYearApril     MonthOfYear = "APRIL"
	MonthOfYearMay       MonthOfYear = "MAY"
	MonthOfYearJune      MonthOfYear = "JUNE"
	MonthOfYearJuly      MonthOfYear = "JULY"
	MonthOfYearAugust    MonthOfYear = "AUGUST"
	MonthOfYearSeptember MonthOfYear = "SEPTEMBER"
	MonthOfYearOctober   MonthOfYear = "OCTOBER"
	MonthOfYearNovember  MonthOfYear = "NOVEMBER"
	MonthOfYearDecember  MonthOfYear = "DECEMBER"
)

type NewStoreOptions

type NewStoreOptions struct {
	TaskDefinitionTableName string
	TaskQueueTableName      string
	DB                      *sql.DB
	DbDriverName            string
	AutomigrateEnabled      bool
	DebugEnabled            bool
	MaxConcurrency          int                                       // Max concurrent tasks (default: 10, 0 = unlimited)
	ErrorHandler            func(queueName, taskID string, err error) // Optional error callback
}

NewStoreOptions defines the options for creating a new task store

type RecurrenceRuleInterface added in v1.10.0

type RecurrenceRuleInterface interface {
	GetFrequency() Frequency
	SetFrequency(Frequency) RecurrenceRuleInterface

	GetStartsAt() string
	SetStartsAt(dateTimeUTC string) RecurrenceRuleInterface

	GetEndsAt() string
	SetEndsAt(dateTimeUTC string) RecurrenceRuleInterface

	GetInterval() int
	SetInterval(int) RecurrenceRuleInterface

	GetDaysOfWeek() []DayOfWeek
	SetDaysOfWeek([]DayOfWeek) RecurrenceRuleInterface

	GetDaysOfMonth() []int
	SetDaysOfMonth([]int) RecurrenceRuleInterface

	GetMonthsOfYear() []MonthOfYear
	SetMonthsOfYear([]MonthOfYear) RecurrenceRuleInterface
}

func NewRecurrenceRule

func NewRecurrenceRule() RecurrenceRuleInterface

type ScheduleDefinition

type ScheduleDefinition interface {
	GetID() string
	SetID(string) ScheduleDefinition

	GetRecurrenceRule() string
	SetRecurrenceRule(dateTimeUtc string) ScheduleDefinition

	GetStartsAt() string
	SetStartsAt(dateTimeUtc string)

	GetEndsAt() string
	SetEndsAt(string)

	IsValid() bool
	GetNextRunTime(string) (string, error)
}

type ScheduleInterface

type ScheduleInterface interface {
	ScheduleDefinitionID() string
	SetScheduleDefinition(ScheduleDefinition)
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store defines a task store

func NewStore

func NewStore(opts NewStoreOptions) (*Store, error)

NewStore creates a new task store

func (*Store) AutoMigrate

func (st *Store) AutoMigrate() error

AutoMigrate migrates the tables

func (*Store) EnableDebug

func (st *Store) EnableDebug(debugEnabled bool) StoreInterface

EnableDebug - enables the debug option

func (*Store) QueuedTaskForceFail

func (store *Store) QueuedTaskForceFail(ctx context.Context, queuedTask TaskQueueInterface, waitMinutes int) error

func (*Store) QueuedTaskProcessWithContext added in v1.10.0

func (store *Store) QueuedTaskProcessWithContext(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)

QueuedTaskProcessWithContext processes a queued task with context support. It checks if the handler implements TaskHandlerWithContext and uses that if available, otherwise falls back to the standard Handle() method for backward compatibility.

func (*Store) SetErrorHandler added in v1.10.0

func (st *Store) SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface

SetErrorHandler - sets a custom error handler for queue processing errors

func (*Store) SqlCreateTaskDefinitionTable added in v1.10.0

func (st *Store) SqlCreateTaskDefinitionTable() string

SqlCreateTaskDefinitionTable - creates the task definition table

func (*Store) SqlCreateTaskQueueTable added in v1.10.0

func (st *Store) SqlCreateTaskQueueTable() string

SqlCreateTaskQueueTable - creates the task queue table

func (*Store) TaskDefinitionCount added in v1.10.0

func (store *Store) TaskDefinitionCount(ctx context.Context, options TaskDefinitionQueryInterface) (int64, error)

func (*Store) TaskDefinitionCreate added in v1.10.0

func (store *Store) TaskDefinitionCreate(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskDefinitionDelete added in v1.10.0

func (store *Store) TaskDefinitionDelete(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskDefinitionDeleteByID added in v1.10.0

func (store *Store) TaskDefinitionDeleteByID(ctx context.Context, id string) error

func (*Store) TaskDefinitionEnqueueByAlias added in v1.14.0

func (st *Store) TaskDefinitionEnqueueByAlias(
	ctx context.Context,
	queueName string,
	taskAlias string,
	parameters map[string]any,
) (TaskQueueInterface, error)

TaskDefinitionEnqueueByAlias finds a task by its alias and appends it to the queue

func (*Store) TaskDefinitionExecuteCli added in v1.14.0

func (store *Store) TaskDefinitionExecuteCli(alias string, args []string) bool

TaskDefinitionExecuteCli is a CLI tool that finds a task by its alias and executes its handler. The alias "list" is reserved: it lists all the available commands.

func (*Store) TaskDefinitionFindByAlias added in v1.10.0

func (store *Store) TaskDefinitionFindByAlias(ctx context.Context, alias string) (task TaskDefinitionInterface, err error)

func (*Store) TaskDefinitionFindByID added in v1.10.0

func (store *Store) TaskDefinitionFindByID(ctx context.Context, id string) (task TaskDefinitionInterface, err error)

func (*Store) TaskDefinitionList added in v1.10.0

func (store *Store) TaskDefinitionList(ctx context.Context, query TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)

func (*Store) TaskDefinitionSoftDelete added in v1.10.0

func (store *Store) TaskDefinitionSoftDelete(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskDefinitionSoftDeleteByID added in v1.10.0

func (store *Store) TaskDefinitionSoftDeleteByID(ctx context.Context, id string) error

func (*Store) TaskDefinitionUpdate added in v1.10.0

func (store *Store) TaskDefinitionUpdate(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskHandlerAdd

func (store *Store) TaskHandlerAdd(ctx context.Context, taskHandler TaskHandlerInterface, createIfMissing bool) error

func (*Store) TaskHandlerList

func (store *Store) TaskHandlerList() []TaskHandlerInterface

func (*Store) TaskQueueClaimNext added in v1.10.0

func (store *Store) TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)

TaskQueueClaimNext atomically claims the next queued task for processing. It uses SELECT FOR UPDATE within a transaction to prevent race conditions where multiple workers might try to process the same task.

Returns:

  • TaskQueueInterface: The claimed task (status updated to "running")
  • error: Any error that occurred during the operation

Returns (nil, nil) if no tasks are available to claim.

func (*Store) TaskQueueCount added in v1.10.0

func (store *Store) TaskQueueCount(ctx context.Context, options TaskQueueQueryInterface) (int64, error)

func (*Store) TaskQueueCreate added in v1.10.0

func (store *Store) TaskQueueCreate(ctx context.Context, queue TaskQueueInterface) error

TaskQueueCreate creates a queued task

func (*Store) TaskQueueDelete added in v1.10.0

func (store *Store) TaskQueueDelete(ctx context.Context, queue TaskQueueInterface) error

func (*Store) TaskQueueDeleteByID added in v1.10.0

func (st *Store) TaskQueueDeleteByID(ctx context.Context, id string) error

func (*Store) TaskQueueFail added in v1.10.0

func (st *Store) TaskQueueFail(ctx context.Context, queue TaskQueueInterface) error

TaskQueueFail fails a queued task

func (*Store) TaskQueueFindByID added in v1.10.0

func (store *Store) TaskQueueFindByID(ctx context.Context, id string) (TaskQueueInterface, error)

TaskQueueFindByID finds a queued task by ID

func (*Store) TaskQueueFindNextQueuedTask added in v1.10.0

func (store *Store) TaskQueueFindNextQueuedTask(ctx context.Context) (TaskQueueInterface, error)

func (*Store) TaskQueueFindNextQueuedTaskByQueue added in v1.10.0

func (store *Store) TaskQueueFindNextQueuedTaskByQueue(ctx context.Context, queueName string) (TaskQueueInterface, error)

func (*Store) TaskQueueFindRunning added in v1.10.0

func (store *Store) TaskQueueFindRunning(ctx context.Context, limit int) []TaskQueueInterface

func (*Store) TaskQueueFindRunningByQueue added in v1.10.0

func (store *Store) TaskQueueFindRunningByQueue(ctx context.Context, queueName string, limit int) []TaskQueueInterface

func (*Store) TaskQueueList added in v1.10.0

func (store *Store) TaskQueueList(ctx context.Context, query TaskQueueQueryInterface) ([]TaskQueueInterface, error)

func (*Store) TaskQueueProcessNext added in v1.10.0

func (store *Store) TaskQueueProcessNext(ctx context.Context) error

func (*Store) TaskQueueProcessNextAsyncByQueue added in v1.10.0

func (store *Store) TaskQueueProcessNextAsyncByQueue(ctx context.Context, queueName string) error

func (*Store) TaskQueueProcessNextByQueue added in v1.10.0

func (store *Store) TaskQueueProcessNextByQueue(ctx context.Context, queueName string) error

func (*Store) TaskQueueProcessTask added in v1.14.0

func (store *Store) TaskQueueProcessTask(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)

func (*Store) TaskQueueRunConcurrent added in v1.14.0

func (store *Store) TaskQueueRunConcurrent(
	ctx context.Context,
	queueName string,
	processSeconds int,
	unstuckMinutes int,
)

TaskQueueRunConcurrent starts a queue processor that handles multiple tasks concurrently. Tasks are processed in parallel up to the configured MaxConcurrency limit. The processor runs in a background goroutine and can be stopped via TaskQueueStopByName.

func (*Store) TaskQueueRunDefault added in v1.14.0

func (store *Store) TaskQueueRunDefault(
	ctx context.Context,
	processSeconds int,
	unstuckMinutes int,
)

TaskQueueRunDefault starts the queue processor for the default queue. Equivalent to calling TaskQueueRunSerial with DefaultQueueName.

func (*Store) TaskQueueRunSerial added in v1.14.0

func (store *Store) TaskQueueRunSerial(
	ctx context.Context,
	queueName string,
	processSeconds int,
	unstuckMinutes int,
)

TaskQueueRunSerial starts a queue processor that handles tasks one at a time (serially). Each task must complete before the next one starts. The processor runs in a background goroutine and can be stopped via TaskQueueStopByName.

func (*Store) TaskQueueSoftDelete added in v1.10.0

func (store *Store) TaskQueueSoftDelete(ctx context.Context, queue TaskQueueInterface) error

func (*Store) TaskQueueSoftDeleteByID added in v1.10.0

func (store *Store) TaskQueueSoftDeleteByID(ctx context.Context, id string) error

func (*Store) TaskQueueStop added in v1.14.0

func (store *Store) TaskQueueStop()

TaskQueueStop stops the default queue processor. It blocks until the worker goroutine and all tasks have fully completed.

func (*Store) TaskQueueStopByName added in v1.14.0

func (store *Store) TaskQueueStopByName(queueName string)

TaskQueueStopByName stops the specified queue processor. It cancels the context, waits for the queue loop to exit, and waits for all in-flight tasks to complete.

func (*Store) TaskQueueSuccess added in v1.10.0

func (st *Store) TaskQueueSuccess(ctx context.Context, queue TaskQueueInterface) error

TaskQueueSuccess completes a queued task successfully

func (*Store) TaskQueueUnstuck added in v1.10.0

func (store *Store) TaskQueueUnstuck(ctx context.Context, waitMinutes int)

TaskQueueUnstuck clears the queue of tasks that have been running for longer than the specified wait time. Such tasks have most probably exited abnormally (panicked) and would otherwise stop the rest of the queue from being processed.

The tasks are marked as failed. However, if they are still running in the background and complete successfully, they will be marked as success.

Business logic:

  1. Checks whether there are running tasks in progress.
  2. If a task has been running for more than the specified wait minutes, marks it as failed.

func (*Store) TaskQueueUnstuckByQueue added in v1.10.0

func (store *Store) TaskQueueUnstuckByQueue(ctx context.Context, queueName string, waitMinutes int)

func (*Store) TaskQueueUpdate added in v1.10.0

func (store *Store) TaskQueueUpdate(ctx context.Context, queue TaskQueueInterface) error

TaskQueueUpdate updates a queued task

type StoreInterface

type StoreInterface interface {
	AutoMigrate() error
	EnableDebug(debug bool) StoreInterface
	SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface

	TaskQueueCount(ctx context.Context, options TaskQueueQueryInterface) (int64, error)
	TaskQueueCreate(ctx context.Context, TaskQueue TaskQueueInterface) error
	TaskQueueDelete(ctx context.Context, TaskQueue TaskQueueInterface) error
	TaskQueueDeleteByID(ctx context.Context, id string) error
	TaskQueueFindByID(ctx context.Context, TaskQueueID string) (TaskQueueInterface, error)
	TaskQueueList(ctx context.Context, query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
	TaskQueueSoftDelete(ctx context.Context, TaskQueue TaskQueueInterface) error
	TaskQueueSoftDeleteByID(ctx context.Context, id string) error
	TaskQueueUpdate(ctx context.Context, TaskQueue TaskQueueInterface) error

	TaskQueueRunDefault(ctx context.Context, processSeconds int, unstuckMinutes int)
	TaskQueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
	TaskQueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
	TaskQueueStop()
	TaskQueueStopByName(queueName string)
	TaskQueueProcessTask(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)

	TaskDefinitionCount(ctx context.Context, options TaskDefinitionQueryInterface) (int64, error)
	TaskDefinitionCreate(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
	TaskDefinitionDelete(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
	TaskDefinitionDeleteByID(ctx context.Context, id string) error
	TaskDefinitionFindByAlias(ctx context.Context, alias string) (TaskDefinitionInterface, error)
	TaskDefinitionFindByID(ctx context.Context, id string) (TaskDefinitionInterface, error)
	TaskDefinitionList(ctx context.Context, options TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
	TaskDefinitionSoftDelete(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
	TaskDefinitionSoftDeleteByID(ctx context.Context, id string) error
	TaskDefinitionUpdate(ctx context.Context, TaskDefinition TaskDefinitionInterface) error

	// TaskDefinition Operations
	TaskDefinitionEnqueueByAlias(ctx context.Context, queueName string, alias string, parameters map[string]any) (TaskQueueInterface, error)
	TaskDefinitionExecuteCli(alias string, args []string) bool

	TaskHandlerList() []TaskHandlerInterface
	TaskHandlerAdd(ctx context.Context, taskHandler TaskHandlerInterface, createIfMissing bool) error
}

type TaskDefinitionInterface added in v1.10.0

type TaskDefinitionInterface interface {
	Data() map[string]string
	DataChanged() map[string]string
	MarkAsNotDirty()

	IsActive() bool
	IsCanceled() bool
	IsSoftDeleted() bool

	Alias() string
	SetAlias(alias string) TaskDefinitionInterface

	CreatedAt() string
	CreatedAtCarbon() *carbon.Carbon
	SetCreatedAt(createdAt string) TaskDefinitionInterface

	Description() string
	SetDescription(description string) TaskDefinitionInterface

	ID() string
	SetID(id string) TaskDefinitionInterface

	Memo() string
	SetMemo(memo string) TaskDefinitionInterface

	IsRecurring() int
	SetIsRecurring(isRecurring int) TaskDefinitionInterface

	RecurrenceRule() string
	SetRecurrenceRule(recurrenceRule string) TaskDefinitionInterface

	SoftDeletedAt() string
	SoftDeletedAtCarbon() *carbon.Carbon
	SetSoftDeletedAt(deletedAt string) TaskDefinitionInterface

	Status() string
	SetStatus(status string) TaskDefinitionInterface

	Title() string
	SetTitle(title string) TaskDefinitionInterface

	UpdatedAt() string
	UpdatedAtCarbon() *carbon.Carbon
	SetUpdatedAt(updatedAt string) TaskDefinitionInterface
}
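
The Data, DataChanged, and MarkAsNotDirty methods suggest a record that tracks which fields were modified, presumably so that an update only needs to write the changed columns. That reading is an assumption. The sketch below shows the general dirty-tracking pattern with a hypothetical record type; it is not the package's implementation.

```go
package main

import "fmt"

// record is a hypothetical entity that remembers which fields were changed.
type record struct {
	data    map[string]string
	changed map[string]string
}

// newRecordFromExistingData loads stored values without marking them as changed.
func newRecordFromExistingData(data map[string]string) *record {
	r := &record{data: map[string]string{}, changed: map[string]string{}}
	for k, v := range data {
		r.data[k] = v
	}
	return r
}

// Get returns the current value of a field.
func (r *record) Get(key string) string {
	return r.data[key]
}

// Set stores a value and records the field as changed if the value differs.
// It returns the record so calls can be chained.
func (r *record) Set(key, value string) *record {
	if r.data[key] == value {
		return r
	}
	r.data[key] = value
	r.changed[key] = value
	return r
}

// DataChanged returns a copy of the fields modified since the last reset.
func (r *record) DataChanged() map[string]string {
	out := make(map[string]string, len(r.changed))
	for k, v := range r.changed {
		out[k] = v
	}
	return out
}

// MarkAsNotDirty forgets all recorded changes, for example after a save.
func (r *record) MarkAsNotDirty() {
	r.changed = map[string]string{}
}

func main() {
	r := newRecordFromExistingData(map[string]string{"title": "Old title", "status": "active"})
	r.Set("title", "New title").Set("status", "active")
	fmt.Println("changed fields:", len(r.DataChanged()))

	r.MarkAsNotDirty()
	fmt.Println("changed fields after reset:", len(r.DataChanged()))
}
```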

func NewTaskDefinition added in v1.10.0

func NewTaskDefinition() TaskDefinitionInterface

func NewTaskDefinitionFromExistingData added in v1.10.0

func NewTaskDefinitionFromExistingData(data map[string]string) TaskDefinitionInterface

type TaskDefinitionQueryInterface added in v1.10.0

type TaskDefinitionQueryInterface interface {
	Validate() error

	Columns() []string
	SetColumns(columns []string) TaskDefinitionQueryInterface

	HasCountOnly() bool
	IsCountOnly() bool
	SetCountOnly(countOnly bool) TaskDefinitionQueryInterface

	HasAlias() bool
	Alias() string
	SetAlias(alias string) TaskDefinitionQueryInterface

	HasCreatedAtGte() bool
	CreatedAtGte() string
	SetCreatedAtGte(createdAtGte string) TaskDefinitionQueryInterface

	HasCreatedAtLte() bool
	CreatedAtLte() string
	SetCreatedAtLte(createdAtLte string) TaskDefinitionQueryInterface

	HasID() bool
	ID() string
	SetID(id string) TaskDefinitionQueryInterface

	HasIDIn() bool
	IDIn() []string
	SetIDIn(idIn []string) TaskDefinitionQueryInterface

	HasLimit() bool
	Limit() int
	SetLimit(limit int) TaskDefinitionQueryInterface

	HasOffset() bool
	Offset() int
	SetOffset(offset int) TaskDefinitionQueryInterface

	HasSortOrder() bool
	SortOrder() string
	SetSortOrder(sortOrder string) TaskDefinitionQueryInterface

	HasOrderBy() bool
	OrderBy() string
	SetOrderBy(orderBy string) TaskDefinitionQueryInterface

	HasSoftDeletedIncluded() bool
	SoftDeletedIncluded() bool
	SetSoftDeletedIncluded(withDeleted bool) TaskDefinitionQueryInterface

	HasStatus() bool
	Status() string
	SetStatus(status string) TaskDefinitionQueryInterface

	HasStatusIn() bool
	StatusIn() []string
	SetStatusIn(statusIn []string) TaskDefinitionQueryInterface
}

func TaskDefinitionQuery added in v1.10.0

func TaskDefinitionQuery() TaskDefinitionQueryInterface
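
The Has/getter/Set triples in the query interface follow a fluent builder pattern in which Has reports whether a filter was set at all. The sketch below shows how such a builder can drive SQL generation. Everything in it is hypothetical (taskQuery, newTaskQuery, toSQL, and the generated statement), and it makes no claim about the SQL that taskstore produces.

```go
package main

import "fmt"

// taskQuery is a hypothetical query builder that stores only the filters
// that were explicitly set.
type taskQuery struct {
	params map[string]any
}

func newTaskQuery() *taskQuery {
	return &taskQuery{params: map[string]any{}}
}

// SetStatus records a status filter and returns the query for chaining.
func (q *taskQuery) SetStatus(status string) *taskQuery {
	q.params["status"] = status
	return q
}

// HasStatus reports whether a status filter was set.
func (q *taskQuery) HasStatus() bool {
	_, ok := q.params["status"]
	return ok
}

// Status returns the status filter, or "" when none was set.
func (q *taskQuery) Status() string {
	s, _ := q.params["status"].(string)
	return s
}

// SetLimit records a row limit and returns the query for chaining.
func (q *taskQuery) SetLimit(limit int) *taskQuery {
	q.params["limit"] = limit
	return q
}

// HasLimit reports whether a row limit was set.
func (q *taskQuery) HasLimit() bool {
	_, ok := q.params["limit"]
	return ok
}

// Limit returns the row limit, or 0 when none was set.
func (q *taskQuery) Limit() int {
	n, _ := q.params["limit"].(int)
	return n
}

// toSQL adds a clause only for the filters that are present.
func toSQL(table string, q *taskQuery) (string, []any) {
	statement := "SELECT * FROM " + table
	var args []any
	if q.HasStatus() {
		statement += " WHERE status = ?"
		args = append(args, q.Status())
	}
	if q.HasLimit() {
		statement += fmt.Sprintf(" LIMIT %d", q.Limit())
	}
	return statement, args
}

func main() {
	statement, args := toSQL("task_definition", newTaskQuery().SetStatus("active").SetLimit(10))
	fmt.Println(statement, args)
}
```

Keeping "not set" distinct from the zero value is what lets a builder like this tell "no limit requested" apart from "limit of 0".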

type TaskHandlerBase

type TaskHandlerBase struct {
	// contains filtered or unexported fields
}

func (*TaskHandlerBase) ErrorMessage

func (handler *TaskHandlerBase) ErrorMessage() string

func (*TaskHandlerBase) GetParam

func (handler *TaskHandlerBase) GetParam(paramName string) string

func (*TaskHandlerBase) GetParamArray

func (handler *TaskHandlerBase) GetParamArray(paramName string) []string

func (*TaskHandlerBase) HasQueuedTask

func (handler *TaskHandlerBase) HasQueuedTask() bool

func (*TaskHandlerBase) InfoMessage

func (handler *TaskHandlerBase) InfoMessage() string

func (*TaskHandlerBase) LogError

func (handler *TaskHandlerBase) LogError(message string)

func (*TaskHandlerBase) LogInfo

func (handler *TaskHandlerBase) LogInfo(message string)

func (*TaskHandlerBase) LogSuccess

func (handler *TaskHandlerBase) LogSuccess(message string)

func (*TaskHandlerBase) Options

func (handler *TaskHandlerBase) Options() map[string]string

func (*TaskHandlerBase) QueuedTask

func (handler *TaskHandlerBase) QueuedTask() TaskQueueInterface

func (*TaskHandlerBase) SetOptions

func (handler *TaskHandlerBase) SetOptions(options map[string]string)

func (*TaskHandlerBase) SetQueuedTask

func (handler *TaskHandlerBase) SetQueuedTask(queuedTask TaskQueueInterface)

func (*TaskHandlerBase) SuccessMessage

func (handler *TaskHandlerBase) SuccessMessage() string

type TaskHandlerInterface

type TaskHandlerInterface interface {
	Alias() string

	Title() string

	Description() string

	Handle() bool

	SetQueuedTask(queuedTask TaskQueueInterface)

	SetOptions(options map[string]string)
}

type TaskHandlerWithContext added in v1.10.0

type TaskHandlerWithContext interface {
	TaskHandlerInterface
	HandleWithContext(ctx context.Context) bool
}

TaskHandlerWithContext is an optional interface that task handlers can implement to receive a context for cancellation support. It is backward compatible: handlers that do not implement it continue to work through the standard Handle() method.

Example usage:

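// MyHandler embeds TaskHandlerBase for logging and parameter helpers.
// Alias, Title, Description and Handle are omitted here for brevity;
// a real handler must still provide them to satisfy TaskHandlerInterface.
type MyHandler struct {
    TaskHandlerBase
}

// HandleWithContext stops early if the context is cancelled,
// otherwise it finishes after simulating five seconds of work.
func (h *MyHandler) HandleWithContext(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        h.LogInfo("Task cancelled")
        return false
    case <-time.After(5 * time.Second):
        h.LogSuccess("Task completed")
        return true
    }
}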

type TaskQueueInterface added in v1.10.0

type TaskQueueInterface interface {
	Data() map[string]string
	DataChanged() map[string]string
	MarkAsNotDirty()

	IsCanceled() bool
	IsDeleted() bool
	IsFailed() bool
	IsQueued() bool
	IsPaused() bool
	IsRunning() bool
	IsSuccess() bool
	IsSoftDeleted() bool

	Attempts() int
	SetAttempts(attempts int) TaskQueueInterface

	CompletedAt() string
	CompletedAtCarbon() *carbon.Carbon
	SetCompletedAt(completedAt string) TaskQueueInterface

	CreatedAt() string
	CreatedAtCarbon() *carbon.Carbon
	SetCreatedAt(createdAt string) TaskQueueInterface

	Details() string
	AppendDetails(details string) TaskQueueInterface
	SetDetails(details string) TaskQueueInterface

	ID() string
	SetID(id string) TaskQueueInterface

	Output() string
	SetOutput(output string) TaskQueueInterface

	Parameters() string
	SetParameters(parameters string) TaskQueueInterface
	ParametersMap() (map[string]string, error)
	SetParametersMap(parameters map[string]string) (TaskQueueInterface, error)

	SoftDeletedAt() string
	SoftDeletedAtCarbon() *carbon.Carbon
	SetSoftDeletedAt(deletedAt string) TaskQueueInterface

	StartedAt() string
	StartedAtCarbon() *carbon.Carbon
	SetStartedAt(startedAt string) TaskQueueInterface

	Status() string
	SetStatus(status string) TaskQueueInterface

	TaskID() string
	SetTaskID(taskID string) TaskQueueInterface

	UpdatedAt() string
	UpdatedAtCarbon() *carbon.Carbon
	SetUpdatedAt(updatedAt string) TaskQueueInterface

	QueueName() string
	SetQueueName(queueName string) TaskQueueInterface
}

func NewTaskQueue added in v1.10.0

func NewTaskQueue(queueName ...string) TaskQueueInterface

NewTaskQueue creates a new task queue. If a queue name is provided, it is used; otherwise DefaultQueueName is used.
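
The optional queue name follows the usual Go variadic-argument idiom. The sketch below shows that idiom in isolation. It is not the package's implementation: queue, newQueue and defaultQueueName are hypothetical stand-ins, and the value "default" is an assumption, since the real value of DefaultQueueName is not shown here.

```go
package main

import "fmt"

// defaultQueueName is a placeholder value chosen for this sketch only.
const defaultQueueName = "default"

// queue is a hypothetical stand-in for the value behind TaskQueueInterface.
type queue struct {
	name string
}

// newQueue uses the first supplied name, or the default when none is given.
func newQueue(queueName ...string) *queue {
	name := defaultQueueName
	if len(queueName) > 0 {
		name = queueName[0]
	}
	return &queue{name: name}
}

func main() {
	fmt.Println(newQueue().name)         // no name supplied
	fmt.Println(newQueue("emails").name) // explicit name
}
```

A variadic parameter keeps the zero-argument call valid, so callers that do not care about named queues need no changes.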

func NewTaskQueueFromExistingData added in v1.10.0

func NewTaskQueueFromExistingData(data map[string]string) TaskQueueInterface

type TaskQueueQueryInterface added in v1.10.0

type TaskQueueQueryInterface interface {
	Validate() error

	Columns() []string
	SetColumns(columns []string) TaskQueueQueryInterface

	HasCountOnly() bool
	IsCountOnly() bool
	SetCountOnly(countOnly bool) TaskQueueQueryInterface

	HasCreatedAtGte() bool
	CreatedAtGte() string
	SetCreatedAtGte(createdAtGte string) TaskQueueQueryInterface

	HasCreatedAtLte() bool
	CreatedAtLte() string
	SetCreatedAtLte(createdAtLte string) TaskQueueQueryInterface

	HasID() bool
	ID() string
	SetID(id string) TaskQueueQueryInterface

	HasIDIn() bool
	IDIn() []string
	SetIDIn(idIn []string) TaskQueueQueryInterface

	HasLimit() bool
	Limit() int
	SetLimit(limit int) TaskQueueQueryInterface

	HasOffset() bool
	Offset() int
	SetOffset(offset int) TaskQueueQueryInterface

	HasSortOrder() bool
	SortOrder() string
	SetSortOrder(sortOrder string) TaskQueueQueryInterface

	HasOrderBy() bool
	OrderBy() string
	SetOrderBy(orderBy string) TaskQueueQueryInterface

	HasSoftDeletedIncluded() bool
	SoftDeletedIncluded() bool
	SetSoftDeletedIncluded(withDeleted bool) TaskQueueQueryInterface

	HasStatus() bool
	Status() string
	SetStatus(status string) TaskQueueQueryInterface

	HasStatusIn() bool
	StatusIn() []string
	SetStatusIn(statusIn []string) TaskQueueQueryInterface

	HasTaskID() bool
	TaskID() string
	SetTaskID(taskID string) TaskQueueQueryInterface

	HasQueueName() bool
	QueueName() string
	SetQueueName(queueName string) TaskQueueQueryInterface
}

func TaskQueueQuery added in v1.10.0

func TaskQueueQuery() TaskQueueQueryInterface
