Documentation
¶
Index ¶
- Constants
- type DelayedTask
- type FailedTaskHandler
- type FlushFunc
- type LongQueueHook
- type Stats
- type Task
- type TaskAction
- type TaskExecutionError
- type TaskMessage
- type TaskRunner
- func (t *TaskRunner) ConsumerGroup() string
- func (t *TaskRunner) Dispatch(ctx context.Context, taskName string, payload any) error
- func (t *TaskRunner) DispatchDelayed(ctx context.Context, taskName string, payload any, d time.Duration) error
- func (t *TaskRunner) ErrorChannel() chan error
- func (t *TaskRunner) GetNumberOfReplications() (int, error)
- func (t *TaskRunner) GetQueue() contracts.MessageQueue
- func (t *TaskRunner) GetTimingStatistics() (Stats, error)
- func (t *TaskRunner) IsLeader() bool
- func (t *TaskRunner) RegisterTask(task *Task)
- func (t *TaskRunner) ScheduleFor(ctx context.Context, taskName string, payload any, executionTime time.Time) error
- func (t *TaskRunner) Start(ctx context.Context) error
- func (t *TaskRunner) StartDelayedSchedule(ctx context.Context, batchSize int) error
- func (t *TaskRunner) StartElection(ctx context.Context)
- type TaskRunnerConfig
- type TaskRunnerError
- type TimingBulkWriter
- type UniqueKeyFunc
Constants ¶
View Source
const ( ErrTaskRunnerAlreadyStarted = TaskRunnerError("ErrTaskRunnerAlreadyStarted") ErrRaceOccuredOnStart = TaskRunnerError("ErrRaceOccuredOnStart") ErrTaskNotFound = TaskRunnerError("TaskNotFound") ErrInvalidTaskPayload = TaskRunnerError("ErrInvalidTaskPayload") ErrTaskMaxRetryExceed = TaskRunnerError("ErrTaskMaxRetryExceed") ErrUniqueForIsRequired = TaskRunnerError("ErrUniqueForIsRequired") ErrDurationIsSmallerThanCheckCycle = TaskRunnerError("ErrDurationIsSmallerThanCheckCycle") // It will happen when task is setted to be Unique and another task with same name and unique key dispached ErrTaskAlreadyDispatched = TaskRunnerError("ErrTaskAlreadyDispatched") )
View Source
const ErrTaskUnlockFailed = TaskRunnerError("ErrTaskUnlockFailed")
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DelayedTask ¶ added in v0.3.0
type FailedTaskHandler ¶
type FailedTaskHandler func(ctx context.Context, task TaskMessage, err error) error
type LongQueueHook ¶
type LongQueueHook func(Stats)
type Task ¶
type Task struct {
Name string
MaxRetry int
// When a task takes more than reservation time, woker will send a HeartBeat to queue, to prevent reClaim by
ReservationTimeout time.Duration
Action TaskAction `json:"-"`
// If 'unique' is set to true, the task will not be dispatched if another instance of same task is already in the queue and has not finished processing.
Unique bool
// If 'unique' is set to true and a 'UniqueKey' is specified, the task with the same 'UniqueKey' will not be dispatched if another instance of it with the same 'UniqueKey' is already in the queue and has not finished processing.
UniqueKey UniqueKeyFunc
// If `UniqueFor` seconds have passed since a task with the same name and UniqueKey was enqueued and it has not finished processing yet, the new task with the same name and UniqueKey can now be dispatched.
// this option is required when Unique is true
UniqueFor int64
}
func (*Task) CreateMessage ¶
func (t *Task) CreateMessage(paylaod any) TaskMessage
type TaskExecutionError ¶ added in v0.2.0
type TaskExecutionError struct {
// contains filtered or unexported fields
}
func NewTaskExecutionError ¶ added in v0.2.0
func NewTaskExecutionError(task string, err error) TaskExecutionError
func (TaskExecutionError) Error ¶ added in v0.2.0
func (t TaskExecutionError) Error() string
func (TaskExecutionError) GetError ¶ added in v0.2.0
func (t TaskExecutionError) GetError() error
func (TaskExecutionError) GetTaskName ¶ added in v0.2.0
func (t TaskExecutionError) GetTaskName() string
type TaskMessage ¶
type TaskMessage struct {
ID string `json:"-"`
Unique bool `json:"unique"`
UniqueFor int64 `json:"unique_for"`
UniqueKey string `json:"unique_key"`
UniqueLockValue string `json:"unique_lock_value"`
ReservationTimeout int64 `json:"reservation_timeout"`
TaskName string `json:"task_name"`
Payload any `json:"payload"`
}
type TaskRunner ¶
type TaskRunner struct {
// contains filtered or unexported fields
}
func NewTaskRunner ¶
func NewTaskRunner(cfg TaskRunnerConfig, client *redis.Client, queue contracts.MessageQueue) *TaskRunner
func (*TaskRunner) ConsumerGroup ¶ added in v0.3.0
func (t *TaskRunner) ConsumerGroup() string
func (*TaskRunner) DispatchDelayed ¶ added in v0.3.0
func (t *TaskRunner) DispatchDelayed(ctx context.Context, taskName string, payload any, d time.Duration) error
DispatchDelayed dispatches the task with delay Note: delay is not exact and may have 1-5 seconds error
func (*TaskRunner) ErrorChannel ¶
func (t *TaskRunner) ErrorChannel() chan error
func (*TaskRunner) GetNumberOfReplications ¶ added in v0.3.1
func (t *TaskRunner) GetNumberOfReplications() (int, error)
func (*TaskRunner) GetQueue ¶
func (t *TaskRunner) GetQueue() contracts.MessageQueue
func (*TaskRunner) GetTimingStatistics ¶ added in v0.3.0
func (t *TaskRunner) GetTimingStatistics() (Stats, error)
GetTimingStatistics return PerTaskTiming and other estimated statistics of the queue
func (*TaskRunner) IsLeader ¶ added in v0.3.0
func (t *TaskRunner) IsLeader() bool
func (*TaskRunner) RegisterTask ¶
func (t *TaskRunner) RegisterTask(task *Task)
func (*TaskRunner) ScheduleFor ¶ added in v0.3.0
func (*TaskRunner) StartDelayedSchedule ¶ added in v0.3.0
func (t *TaskRunner) StartDelayedSchedule(ctx context.Context, batchSize int) error
func (*TaskRunner) StartElection ¶ added in v0.3.0
func (t *TaskRunner) StartElection(ctx context.Context)
type TaskRunnerConfig ¶
type TaskRunnerConfig struct {
// Optional
Host string
BatchSize int
ConsumerGroup string
ConsumersPrefix string
NumWorkers int
NumFetchers int
// ReplicationFactor Number of pod replicas configured, affecting metric calculations
// Let T_avg be the average execution time of task, Q_len be the length of the queue, and W_num be the number of workers
// The total execution time for the queue is estimated as (T_avg * Q_len) / (W_num * ReplicationFactor).
// Deprecated: taskrunner automaticaly handles
ReplicationFactor int
FailedTaskHandler FailedTaskHandler
LongQueueHook LongQueueHook
LongQueueThreshold time.Duration
BlockDuration time.Duration
// MetricsResetInterval defines how often timing metrics (sum and count) are reset.
// If zero or negative, a default of 24 hours is used.
MetricsResetInterval time.Duration
}
type TaskRunnerError ¶
type TaskRunnerError string
func (TaskRunnerError) Error ¶
func (tre TaskRunnerError) Error() string
type TimingBulkWriter ¶
type TimingBulkWriter struct {
// contains filtered or unexported fields
}
func NewBulkWriter ¶
func NewBulkWriter(flushInterval time.Duration, flushFunc FlushFunc) *TimingBulkWriter
type UniqueKeyFunc ¶
Click to show internal directories.
Click to hide internal directories.