asynctask

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultInitialPoolSize = 1024
View Source
const DefaultMaxPoolSize = math.MaxInt32
View Source
const Lens scene.InfraName = "asynctask"

Variables

View Source
var (
	ErrInternal            = _eg.CreateError(1, "internal error")
	ErrTaskNotFound        = _eg.CreateError(2, "task not found")
	ErrInvalidQueueTask    = _eg.CreateError(3, "invalid queue task")
	ErrTaskHandlerNotFound = _eg.CreateError(4, "task handler not found")
	ErrQueueConfigConflict = _eg.CreateError(5, "task queue config conflict")
	ErrInvalidQueueName    = _eg.CreateError(6, "invalid queue name")
	ErrQueueNotRegistered  = _eg.CreateError(7, "task queue not registered")
	ErrInvalidTaskType     = _eg.CreateError(8, "invalid task type")
	ErrInvalidTaskHandler  = _eg.CreateError(9, "invalid task handler")
	ErrInvalidQueuePayload = _eg.CreateError(10, "invalid queue payload")
)

Functions

func MarshalPayload added in v0.3.4

func MarshalPayload(payload any) ([]byte, error)

func UnmarshalPayload added in v0.3.4

func UnmarshalPayload(task *QueueTask, payload any) error

Types

type CronTask

type CronTask struct {
	Name        string // Use Identifier method to access
	Description string
	Func        TaskFunc
	Total       uint64
	ErrCount    uint64
}

func (*CronTask) Identifier added in v0.2.10

func (t *CronTask) Identifier() string

Identifier is the unique identifier getter, if this CronTask already set a name. the name will be the identifier

type CronTaskDispatcher

type CronTaskDispatcher interface {
	scene.Named
	// Add will add task with a generated name common uuid
	Add(spec string, cmd TaskFunc) (*CronTask, error)
	// AddWithName will add task with specific name. name should be unique
	AddWithName(spec string, name string, cmd TaskFunc) (*CronTask, error)
	// AddTask is the underlying implementation for Add and AddWithName
	AddTask(spec string, task *CronTask) error
	// Cancel will cancel task have specific identifier
	Cancel(id string) error
	// GetTask will return the underlying task, not copy of the task info.
	// which means user can modify TaskFunc if they need
	GetTask(id string) (*CronTask, error)
}

CronTaskDispatcher is a service which will handle all cron task

type QueueTask added in v0.3.4

type QueueTask struct {
	ID string `json:"id"`
	// Queue is the logical queue name.
	// It defines the consumption domain and queue-level runtime config.
	Queue string `json:"queue"`
	// Type is the task type inside a queue.
	// Consumers use it to route the task to a specific handler.
	Type string `json:"type"`
	// Key is an optional business grouping key.
	// It can be used by backends or future schedulers for partitioning,
	// deduplication, or serializing tasks for the same business object.
	Key string `json:"key,omitempty"`
	// Payload is the serialized task body, usually JSON.
	Payload []byte `json:"payload,omitempty"`
	// Headers stores optional metadata for tracing or backend-specific routing.
	Headers map[string]string `json:"headers,omitempty"`
	// Delay requests delayed execution.
	// The exact behavior depends on the backend implementation.
	Delay time.Duration `json:"delay,omitempty"`
	// Timeout is the handler execution timeout for this task.
	Timeout time.Duration `json:"timeout,omitempty"`
	// MaxRetry overrides the queue-level retry limit when greater than zero.
	MaxRetry int `json:"max_retry,omitempty"`
	// Attempt is the current retry attempt count.
	Attempt     int             `json:"attempt,omitempty"`
	Status      QueueTaskStatus `json:"status,omitempty"`
	LastError   string          `json:"last_error,omitempty"`
	CreatedAt   time.Time       `json:"created_at"`
	AvailableAt time.Time       `json:"available_at"`
}

func (*QueueTask) Identifier added in v0.3.4

func (t *QueueTask) Identifier() string

type QueueTaskStatus added in v0.3.4

type QueueTaskStatus string
const (
	QueueTaskStatusPending   QueueTaskStatus = "pending"
	QueueTaskStatusRunning   QueueTaskStatus = "running"
	QueueTaskStatusRetrying  QueueTaskStatus = "retrying"
	QueueTaskStatusSucceeded QueueTaskStatus = "succeeded"
	QueueTaskStatusFailed    QueueTaskStatus = "failed"
)

type Task

type Task struct {
	Name string
	Func TaskFunc
	Err  error
	// contains filtered or unexported fields
}

func (*Task) Identifier added in v0.3.2

func (t *Task) Identifier() string

func (*Task) SetStatus

func (t *Task) SetStatus(status TaskStatus)

func (*Task) Status

func (t *Task) Status() TaskStatus

type TaskDispatcher

type TaskDispatcher interface {
	scene.Named
	// Run run task once, the task should be ended in expected time
	Run(task TaskFunc) *Task
	// RunTask run task once, the task should be ended in expected time
	RunTask(task *Task)
}

type TaskFunc

type TaskFunc func() error

type TaskQueueConfig added in v0.3.4

type TaskQueueConfig struct {
	// Concurrency is the number of workers consuming the logical queue.
	Concurrency int
	// MaxRetry is the default retry limit for tasks in the queue.
	MaxRetry int
	// RetryDelay is the default delay before retrying a failed task.
	RetryDelay time.Duration
	// BufferSize is the in-memory queue buffer size.
	// It is only meaningful for in-process backends such as memoryqueue.
	BufferSize int
}

type TaskQueueConsumer added in v0.3.4

type TaskQueueConsumer interface {
	scene.Named
	// RegisterQueue defines the config of a logical queue.
	// Re-registering the same queue requires an identical config.
	RegisterQueue(queue string, config TaskQueueConfig) error
	// RegisterHandler binds a task type to a logical queue.
	// The queue must be registered before handlers are attached.
	RegisterHandler(queue string, taskType string, handler TaskQueueHandler) error
}

TaskQueueConsumer registers queues and task handlers, then starts consuming from queues.

type TaskQueueHandler added in v0.3.4

type TaskQueueHandler interface {
	HandleTask(ctx context.Context, task *QueueTask) error
}

TaskQueueHandler processes a single queue task delivered by a consumer.

type TaskQueueHandlerFunc added in v0.3.4

type TaskQueueHandlerFunc func(ctx context.Context, task *QueueTask) error

func (TaskQueueHandlerFunc) HandleTask added in v0.3.4

func (f TaskQueueHandlerFunc) HandleTask(ctx context.Context, task *QueueTask) error

type TaskQueuePublisher added in v0.3.4

type TaskQueuePublisher interface {
	scene.Named
	// Publish enqueues a task to the backend and returns the stored task model.
	// The implementation may populate fields such as ID, CreatedAt and Status.
	Publish(ctx context.Context, task *QueueTask) (*QueueTask, error)
}

TaskQueuePublisher publishes queue tasks into a backend implementation. Implementations should treat Queue and Type as required routing metadata.

type TaskStatus

type TaskStatus int32
const (
	TaskStatusQueue TaskStatus = iota
	TaskStatusRunning
	TaskStatusFinish
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL