runner

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2025 License: GPL-3.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrTaskRunnerAlreadyStarted = TaskRunnerError("ErrTaskRunnerAlreadyStarted")
	ErrRaceOccuredOnStart       = TaskRunnerError("ErrRaceOccuredOnStart")
	ErrTaskNotFound             = TaskRunnerError("TaskNotFound")
	ErrInvalidTaskPayload       = TaskRunnerError("ErrInvalidTaskPayload")

	ErrTaskMaxRetryExceed              = TaskRunnerError("ErrTaskMaxRetryExceed")
	ErrUniqueForIsRequired             = TaskRunnerError("ErrUniqueForIsRequired")
	ErrDurationIsSmallerThanCheckCycle = TaskRunnerError("ErrDurationIsSmallerThanCheckCycle")

	// It will happen when task is setted to be Unique and another task with same name and unique key dispached
	ErrTaskAlreadyDispatched = TaskRunnerError("ErrTaskAlreadyDispatched")
)
View Source
const ErrTaskUnlockFailed = TaskRunnerError("ErrTaskUnlockFailed")

Variables

This section is empty.

Functions

This section is empty.

Types

type DelayedTask added in v0.3.0

type DelayedTask struct {
	Id      string `json:"id"`
	Task    string `json:"task"`
	Time    int64  `json:"time"`
	Payload any    `json:"payload"`
}

type FailedTaskHandler

type FailedTaskHandler func(ctx context.Context, task TaskMessage, err error) error

type FlushFunc

type FlushFunc func(data []timingDto) error

type LongQueueHook

type LongQueueHook func(Stats)

type Stats

type Stats struct {
	PerTaskTiming     map[string]int64
	PredictedWaitTime float64
	AvgTiming         time.Duration
	AvgScheduleTiming int64
	// Estimated TPS, it's not actual tps
	TPS float64
}

type Task

type Task struct {
	Name     string
	MaxRetry int

	// When a task takes more than reservation time, woker will send a HeartBeat to queue, to prevent reClaim by
	ReservationTimeout time.Duration

	Action TaskAction `json:"-"`

	// If 'unique' is set to true, the task will not be dispatched if another instance of same task is already in the queue and has not finished processing.
	Unique bool

	// If 'unique' is set to true and a 'UniqueKey' is specified, the task with the same 'UniqueKey' will not be dispatched if another instance of it with the same 'UniqueKey' is already in the queue and has not finished processing.
	UniqueKey UniqueKeyFunc

	// If `UniqueFor` seconds have passed since a task with the same name and UniqueKey was enqueued and it has not finished processing yet, the new task with the same name and UniqueKey can now be dispatched.
	// this option is required when Unique is true
	UniqueFor int64
}

func (*Task) CreateMessage

func (t *Task) CreateMessage(paylaod any) TaskMessage

type TaskAction

type TaskAction func(ctx context.Context, payload any) error

type TaskExecutionError added in v0.2.0

type TaskExecutionError struct {
	// contains filtered or unexported fields
}

func NewTaskExecutionError added in v0.2.0

func NewTaskExecutionError(task string, err error) TaskExecutionError

func (TaskExecutionError) Error added in v0.2.0

func (t TaskExecutionError) Error() string

func (TaskExecutionError) GetError added in v0.2.0

func (t TaskExecutionError) GetError() error

func (TaskExecutionError) GetTaskName added in v0.2.0

func (t TaskExecutionError) GetTaskName() string

type TaskMessage

type TaskMessage struct {
	ID                 string `json:"-"`
	Unique             bool   `json:"unique"`
	UniqueFor          int64  `json:"unique_for"`
	UniqueKey          string `json:"unique_key"`
	UniqueLockValue    string `json:"unique_lock_value"`
	ReservationTimeout int64  `json:"reservation_timeout"`
	TaskName           string `json:"task_name"`
	Payload            any    `json:"payload"`
}

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

func NewTaskRunner

func NewTaskRunner(cfg TaskRunnerConfig, client *redis.Client, queue contracts.MessageQueue) *TaskRunner

func (*TaskRunner) ConsumerGroup added in v0.3.0

func (t *TaskRunner) ConsumerGroup() string

func (*TaskRunner) Dispatch

func (t *TaskRunner) Dispatch(ctx context.Context, taskName string, payload any) error

func (*TaskRunner) DispatchDelayed added in v0.3.0

func (t *TaskRunner) DispatchDelayed(ctx context.Context, taskName string, payload any, d time.Duration) error

DispatchDelayed dispatches the task with delay Note: delay is not exact and may have 1-5 seconds error

func (*TaskRunner) ErrorChannel

func (t *TaskRunner) ErrorChannel() chan error

func (*TaskRunner) GetNumberOfReplications added in v0.3.1

func (t *TaskRunner) GetNumberOfReplications() (int, error)

func (*TaskRunner) GetQueue

func (t *TaskRunner) GetQueue() contracts.MessageQueue

func (*TaskRunner) GetTimingStatistics added in v0.3.0

func (t *TaskRunner) GetTimingStatistics() (Stats, error)

GetTimingStatistics return PerTaskTiming and other estimated statistics of the queue

func (*TaskRunner) IsLeader added in v0.3.0

func (t *TaskRunner) IsLeader() bool

func (*TaskRunner) RegisterTask

func (t *TaskRunner) RegisterTask(task *Task)

func (*TaskRunner) ScheduleFor added in v0.3.0

func (t *TaskRunner) ScheduleFor(ctx context.Context, taskName string, payload any, executionTime time.Time) error

func (*TaskRunner) Start

func (t *TaskRunner) Start(ctx context.Context) error

func (*TaskRunner) StartDelayedSchedule added in v0.3.0

func (t *TaskRunner) StartDelayedSchedule(ctx context.Context, batchSize int) error

func (*TaskRunner) StartElection added in v0.3.0

func (t *TaskRunner) StartElection(ctx context.Context)

type TaskRunnerConfig

type TaskRunnerConfig struct {
	// Optional
	Host string

	BatchSize       int
	ConsumerGroup   string
	ConsumersPrefix string

	NumWorkers  int
	NumFetchers int

	// ReplicationFactor Number of pod replicas configured, affecting metric calculations
	// Let T_avg be the average execution time of task, Q_len be the length of the queue, and W_num be the number of workers
	// The total execution time for the queue is estimated as (T_avg * Q_len) / (W_num * ReplicationFactor).
	// Deprecated: taskrunner automaticaly handles
	ReplicationFactor int

	FailedTaskHandler FailedTaskHandler

	LongQueueHook      LongQueueHook
	LongQueueThreshold time.Duration

	BlockDuration time.Duration

	// MetricsResetInterval defines how often timing metrics (sum and count) are reset.
	// If zero or negative, a default of 24 hours is used.
	MetricsResetInterval time.Duration
}

type TaskRunnerError

type TaskRunnerError string

func (TaskRunnerError) Error

func (tre TaskRunnerError) Error() string

type TimingBulkWriter

type TimingBulkWriter struct {
	// contains filtered or unexported fields
}

func NewBulkWriter

func NewBulkWriter(flushInterval time.Duration, flushFunc FlushFunc) *TimingBulkWriter

type UniqueKeyFunc

type UniqueKeyFunc func(payload any) string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL