Documentation
¶
Index ¶
- Constants
- Variables
- func MarshalPayload(payload any) ([]byte, error)
- func UnmarshalPayload(task *QueueTask, payload any) error
- type CronTask
- type CronTaskDispatcher
- type QueueTask
- type QueueTaskStatus
- type Task
- type TaskDispatcher
- type TaskFunc
- type TaskQueueConfig
- type TaskQueueConsumer
- type TaskQueueHandler
- type TaskQueueHandlerFunc
- type TaskQueuePublisher
- type TaskStatus
Constants ¶
View Source
const DefaultInitialPoolSize = 1024
View Source
const DefaultMaxPoolSize = math.MaxInt32
View Source
const Lens scene.InfraName = "asynctask"
Variables ¶
View Source
var ( ErrInternal = _eg.CreateError(1, "internal error") ErrTaskNotFound = _eg.CreateError(2, "task not found") ErrInvalidQueueTask = _eg.CreateError(3, "invalid queue task") ErrTaskHandlerNotFound = _eg.CreateError(4, "task handler not found") ErrQueueConfigConflict = _eg.CreateError(5, "task queue config conflict") ErrInvalidQueueName = _eg.CreateError(6, "invalid queue name") ErrQueueNotRegistered = _eg.CreateError(7, "task queue not registered") ErrInvalidTaskType = _eg.CreateError(8, "invalid task type") ErrInvalidTaskHandler = _eg.CreateError(9, "invalid task handler") ErrInvalidQueuePayload = _eg.CreateError(10, "invalid queue payload") )
Functions ¶
func MarshalPayload ¶ added in v0.3.4
func UnmarshalPayload ¶ added in v0.3.4
Types ¶
type CronTask ¶
type CronTask struct {
Name string // Use Identifier method to access
Description string
Func TaskFunc
Total uint64
ErrCount uint64
}
func (*CronTask) Identifier ¶ added in v0.2.10
Identifier is the unique identifier getter, if this CronTask already set a name. the name will be the identifier
type CronTaskDispatcher ¶
type CronTaskDispatcher interface {
scene.Named
// Add will add task with a generated name common uuid
Add(spec string, cmd TaskFunc) (*CronTask, error)
// AddWithName will add task with specific name. name should be unique
AddWithName(spec string, name string, cmd TaskFunc) (*CronTask, error)
// AddTask is the underlying implementation for Add and AddWithName
AddTask(spec string, task *CronTask) error
// Cancel will cancel task have specific identifier
Cancel(id string) error
// GetTask will return the underlying task, not copy of the task info.
// which means user can modify TaskFunc if they need
GetTask(id string) (*CronTask, error)
}
CronTaskDispatcher is a service which will handle all cron task
type QueueTask ¶ added in v0.3.4
type QueueTask struct {
ID string `json:"id"`
// Queue is the logical queue name.
// It defines the consumption domain and queue-level runtime config.
Queue string `json:"queue"`
// Type is the task type inside a queue.
// Consumers use it to route the task to a specific handler.
Type string `json:"type"`
// Key is an optional business grouping key.
// It can be used by backends or future schedulers for partitioning,
// deduplication, or serializing tasks for the same business object.
Key string `json:"key,omitempty"`
// Payload is the serialized task body, usually JSON.
Payload []byte `json:"payload,omitempty"`
// Headers stores optional metadata for tracing or backend-specific routing.
Headers map[string]string `json:"headers,omitempty"`
// Delay requests delayed execution.
// The exact behavior depends on the backend implementation.
Delay time.Duration `json:"delay,omitempty"`
// Timeout is the handler execution timeout for this task.
Timeout time.Duration `json:"timeout,omitempty"`
// MaxRetry overrides the queue-level retry limit when greater than zero.
MaxRetry int `json:"max_retry,omitempty"`
// Attempt is the current retry attempt count.
Attempt int `json:"attempt,omitempty"`
Status QueueTaskStatus `json:"status,omitempty"`
LastError string `json:"last_error,omitempty"`
CreatedAt time.Time `json:"created_at"`
AvailableAt time.Time `json:"available_at"`
}
func (*QueueTask) Identifier ¶ added in v0.3.4
type QueueTaskStatus ¶ added in v0.3.4
type QueueTaskStatus string
const ( QueueTaskStatusPending QueueTaskStatus = "pending" QueueTaskStatusRunning QueueTaskStatus = "running" QueueTaskStatusRetrying QueueTaskStatus = "retrying" QueueTaskStatusSucceeded QueueTaskStatus = "succeeded" QueueTaskStatusFailed QueueTaskStatus = "failed" )
type Task ¶
func (*Task) Identifier ¶ added in v0.3.2
func (*Task) SetStatus ¶
func (t *Task) SetStatus(status TaskStatus)
func (*Task) Status ¶
func (t *Task) Status() TaskStatus
type TaskDispatcher ¶
type TaskQueueConfig ¶ added in v0.3.4
type TaskQueueConfig struct {
// Concurrency is the number of workers consuming the logical queue.
Concurrency int
// MaxRetry is the default retry limit for tasks in the queue.
MaxRetry int
// RetryDelay is the default delay before retrying a failed task.
RetryDelay time.Duration
// BufferSize is the in-memory queue buffer size.
// It is only meaningful for in-process backends such as memoryqueue.
BufferSize int
}
type TaskQueueConsumer ¶ added in v0.3.4
type TaskQueueConsumer interface {
scene.Named
// RegisterQueue defines the config of a logical queue.
// Re-registering the same queue requires an identical config.
RegisterQueue(queue string, config TaskQueueConfig) error
// RegisterHandler binds a task type to a logical queue.
// The queue must be registered before handlers are attached.
RegisterHandler(queue string, taskType string, handler TaskQueueHandler) error
}
TaskQueueConsumer registers queues and task handlers, then starts consuming from queues.
type TaskQueueHandler ¶ added in v0.3.4
TaskQueueHandler processes a single queue task delivered by a consumer.
type TaskQueueHandlerFunc ¶ added in v0.3.4
func (TaskQueueHandlerFunc) HandleTask ¶ added in v0.3.4
func (f TaskQueueHandlerFunc) HandleTask(ctx context.Context, task *QueueTask) error
type TaskQueuePublisher ¶ added in v0.3.4
type TaskQueuePublisher interface {
scene.Named
// Publish enqueues a task to the backend and returns the stored task model.
// The implementation may populate fields such as ID, CreatedAt and Status.
Publish(ctx context.Context, task *QueueTask) (*QueueTask, error)
}
TaskQueuePublisher publishes queue tasks into a backend implementation. Implementations should treat Queue and Type as required routing metadata.
type TaskStatus ¶
type TaskStatus int32
const ( TaskStatusQueue TaskStatus = iota TaskStatusRunning TaskStatusFinish )
Click to show internal directories.
Click to hide internal directories.