Documentation
¶
Index ¶
- Variables
- func FanOutTask(ctx context.Context, asyncClient Client, parentTask *asynq.Task, ...) error
- func TenantFanOut(ctx context.Context, task *asynq.Task, f ProcessFunc) error
- func TracingMiddleware(cfg config.Config) func(next asynq.HandlerFunc) asynq.HandlerFunc
- type App
- func (a *App) Client() *asynq.Client
- func (a *App) Enqueue(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)
- func (a *App) Inspector() *asynq.Inspector
- func (a *App) RegisterTasks(ctx context.Context, handlers []TaskHandler)
- func (a *App) RunScheduler() error
- func (a *App) RunWorker(ctx context.Context, r repo.Repo) error
- func (a *App) Shutdown(ctx context.Context) error
- type BatchProcessor
- type BatchProcessorOptions
- type ChildTaskHandler
- type Client
- type FanOutFunc
- type FanOutHandler
- type Middleware
- type MockClient
- type ProcessFunc
- type ScheduledTaskConfigProvider
- type TaskHandler
- type TaskOption
- type TenantTaskHandler
Constants ¶
This section is empty.
Variables ¶
var ( ErrLoadingDatabaseHost = errors.New("error loading task queue host") ErrMTLSRedisClientOpt = errors.New("error redis client opt") ErrSecretTypeQueue = errors.New("unsupported secret type for task queue") ErrACLNotEnabled = errors.New("ACL is not enabled for task queue") ErrACLPassword = errors.New("ACL is not load password for redis client") ErrACLUsername = errors.New("ACL is not load username for redis client") )
var ( ErrEnqueueingTask = errors.New("enqueue task") ErrClientShutdown = errors.New("client shutdown") ErrStartingWorker = errors.New("starting worker") ErrCreatingScheduler = errors.New("creating scheduler") ErrRunningScheduler = errors.New("running scheduler") ErrReadingConfig = errors.New("error reading scheduler task config file") ErrInvalidConfig = errors.New("invalid scheduler task config") ErrNilTask = errors.New("task is nil") )
Functions ¶
func FanOutTask ¶ added in v0.8.0
func FanOutTask( ctx context.Context, asyncClient Client, parentTask *asynq.Task, payload asyncUtils.TaskPayload, opts ...asynq.Option, ) error
FanOutTask enqueues a child task for a task This is used to allow parallelism on tasks running in a loop manner Example of it's usage is for example in the ProcessTenantsInBatch to spawn a task for each tenant
func TenantFanOut ¶ added in v0.8.0
TenantFanOut extracts tenant from payload and injects it into context before executing
func TracingMiddleware ¶ added in v0.7.0
func TracingMiddleware(cfg config.Config) func(next asynq.HandlerFunc) asynq.HandlerFunc
Types ¶
type App ¶
type App struct {
// contains filtered or unexported fields
}
func (*App) Enqueue ¶
func (a *App) Enqueue( ctx context.Context, task *asynq.Task, opts ...asynq.Option, ) (*asynq.TaskInfo, error)
Enqueue is used to run tasks
func (*App) RegisterTasks ¶
func (a *App) RegisterTasks(ctx context.Context, handlers []TaskHandler)
RegisterTasks registers multiple task handlers
func (*App) RunScheduler ¶
RunScheduler starts the cron job scheduling It starts the cron related tasks defined in the schedulerTasksConfig
type BatchProcessor ¶ added in v0.8.0
type BatchProcessor struct {
// contains filtered or unexported fields
}
func NewBatchProcessor ¶ added in v0.8.0
func NewBatchProcessor(r repo.Repo, opts ...BatchProcessorOptions) *BatchProcessor
func (*BatchProcessor) ProcessTenantsInBatch ¶ added in v0.8.0
func (bp *BatchProcessor) ProcessTenantsInBatch( ctx context.Context, asynqTask *asynq.Task, processTenant func(ctx context.Context, task *asynq.Task) error, ) error
ProcessTenantsInBatch iterates through tenants in batches and applies the process function It tracks the total tenant count, logs batch progress, and logs task completion In fan-out mode, it enqueues child tasks instead of processing inline
type BatchProcessorOptions ¶ added in v0.8.0
type BatchProcessorOptions func(*BatchProcessor)
func WithFanOutTenants ¶ added in v0.8.0
func WithFanOutTenants(asyncClient Client, opts ...asynq.Option) BatchProcessorOptions
func WithTenantQuery ¶ added in v0.8.0
func WithTenantQuery(q *repo.Query) BatchProcessorOptions
type ChildTaskHandler ¶ added in v0.8.0
type ChildTaskHandler struct {
// contains filtered or unexported fields
}
ChildTaskHandler wraps a parent handler and executes its ProcessTask with custom fanout logic
func NewFanOutHandler ¶ added in v0.8.0
func NewFanOutHandler( parent TaskHandler, fanOutFunc func(ctx context.Context, task *asynq.Task, f ProcessFunc) error, ) *ChildTaskHandler
func (*ChildTaskHandler) IsFanOutEnabled ¶ added in v0.8.0
func (c *ChildTaskHandler) IsFanOutEnabled() bool
IsFanOutEnabled returns false for child tasks
func (*ChildTaskHandler) ProcessTask ¶ added in v0.8.0
func (*ChildTaskHandler) TaskType ¶ added in v0.8.0
func (c *ChildTaskHandler) TaskType() string
type FanOutFunc ¶ added in v0.8.0
type FanOutHandler ¶ added in v0.8.0
type FanOutHandler interface {
TaskHandler
FanOutFunc() FanOutFunc
}
FanOutHandler task into child tasks per tenant
type Middleware ¶ added in v0.7.0
type Middleware func(asynq.HandlerFunc) asynq.HandlerFunc
type MockClient ¶
MockClient implements the AsyncClient interface for testing
func (*MockClient) Close ¶
func (m *MockClient) Close() error
func (*MockClient) EnqueueContext ¶
func (*MockClient) Ping ¶
func (m *MockClient) Ping() error
type ProcessFunc ¶ added in v0.8.0
type ScheduledTaskConfigProvider ¶
ScheduledTaskConfigProvider implements asynq PeriodicTaskConfigProvider interface.
func (*ScheduledTaskConfigProvider) GetConfigs ¶
func (p *ScheduledTaskConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error)
GetConfigs Parses the yaml file and return a list of PeriodicTaskConfigs.
type TaskHandler ¶
type TaskHandler interface {
ProcessTask(ctx context.Context, task *asynq.Task) error
TaskType() string
}
TaskHandler defines the interface for handling async
type TaskOption ¶ added in v0.8.0
type TaskOption func(TenantTaskHandler)
type TenantTaskHandler ¶ added in v0.8.0
type TenantTaskHandler interface {
TaskHandler
FanOutHandler
TenantQuery() *repo.Query
}
TenantTaskHandler is a task that is run for a selection of tenants