handler

package
v1.67.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 8 Imported by: 0

README

Package handler

Пакет handler предоставляет инструменты для обработки результатов выполнения bgjob с использованием middleware.

Types

Sync

Структура Sync реализует обработчик фоновых задач с поддержкой middleware.

Methods:

NewSync(adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

Конструктор синхронного обработчика, принимающий на вход адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик.

Стандартные Middleware:

  • Metrics(storage MetricStorage) Middleware – middleware для сбора метрик, регистрирующая время обработки. Принимает на вход хранилище метрик, реализующее интерфейс MetricStorage.
  • Recovery() Middleware – предотвращает падение сервиса при панике в обработчике, преобразуя ее в ошибку.
  • RequestId() Middleware – обеспечивает трассировку, берёт requestId из job.RequestId.
(r Sync) Handle(ctx context.Context, job bgjob.Job) bgjob.Result

Выполняет обработку сообщения.

Mux

Структура Mux реализует мультиплексор фоновых задач.

Methods:

NewMux() *Mux

Конструктор мультиплексора.

(m *Mux) Register(jobType string, handler SyncHandlerAdapter) *Mux

Выполняет регистрацию обработчика задачи по ее типу в мультиплексоре.

(m *Mux) Handle(ctx context.Context, job bgjob.Job) Result

Выполняет вызов обработчика задачи в зависимости от типа задачи. Если обработчик не зарегистрирован, отправляет задачу в DLQ с ошибкой bgjob.ErrUnknownType.

Functions

Reschedule(by RescheduleBy, opts ...RescheduleOption) Result

Отдает результат перепланировки задачи:

Методы перепланировки задач:

ByAfterTime(after time.Duration, currentTime time.Time) AfterTime

Перепланировка задачи на дату спустя указанное время.

ByCron(cronExpression string, currentTime time.Time) Cron

Перепланировка задачи по cron-выражению.

Опции перепланирования:

WithArg(arg []byte) RescheduleOption

Опция для указания аргумента при перепланировании.

Usage

Custom handler
package main

import (
    "context"
    "time"

    "github.com/txix-open/bgjob"
    "github.com/txix-open/isp-kit/bgjobx"
    "github.com/txix-open/isp-kit/bgjobx/handler"
)

type customHandler struct{}

func (h customHandler) Handle(ctx context.Context, job bgjob.Job) handler.Result {
  /* put here business logic */
  return handler.Reschedule(handler.ByAfterTime(10*time.Minutes, time.Now()))
}

func main() {
  var (
    metricStorage = NewMetricStorage() /* MetricStorage interface implementation */
    adapter       customHandler
  )

  syncHandler := handler.NewSync(adapter, []handler.Middleware{
    handler.Metrics(metricStorage),
    handler.Recovery(),
    handler.RequestId(),
  }...)

  /* handler's call for example */
  job := new(bgjob.Job) /* placeholder for example */
  syncHandler.Handle(context.Background(), job)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AfterTime added in v1.63.0

type AfterTime struct {
	// contains filtered or unexported fields
}

AfterTime implements RescheduleBy for rescheduling after a fixed duration.

func ByAfterTime added in v1.63.0

func ByAfterTime(after time.Duration, currentTime time.Time) AfterTime

ByAfterTime creates an AfterTime reschedule option.

func (AfterTime) Reschedule added in v1.63.0

func (b AfterTime) Reschedule() (time.Duration, error)

Reschedule returns the fixed duration for rescheduling.

type Cron added in v1.63.0

type Cron struct {
	// contains filtered or unexported fields
}

Cron implements RescheduleBy using a cron expression to calculate the next run time.

func ByCron added in v1.63.0

func ByCron(cronExpression string, currentTime time.Time) Cron

ByCron creates a Cron reschedule option using a cron expression and current time.

func (Cron) Reschedule added in v1.63.0

func (b Cron) Reschedule() (time.Duration, error)

Reschedule calculates the duration until the next scheduled time based on the cron expression.

type MetricStorage

type MetricStorage interface {
	// ObserveExecuteDuration records the time taken to execute a job.
	ObserveExecuteDuration(queue string, jobType string, t time.Duration)
	// IncRetryCount increments the retry counter for a job type.
	IncRetryCount(queue string, jobType string)
	// IncDlqCount increments the dead letter queue counter for a job type.
	IncDlqCount(queue string, jobType string)
	// IncSuccessCount increments the success counter for a job type.
	IncSuccessCount(queue string, jobType string)
	// IncInternalErrorCount increments the internal worker error counter.
	IncInternalErrorCount()
}

MetricStorage defines the interface for recording job execution metrics. Implementations should track job performance and outcome statistics.

type Middleware

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

Middleware is a function that wraps a SyncHandlerAdapter with additional functionality. Middleware components can log, measure metrics, recover from panics, or modify the context before/after calling the next handler.

func Metrics added in v1.60.0

func Metrics(storage MetricStorage) Middleware

Metrics creates a middleware that records execution metrics. It measures the duration of job execution and reports success, retry, and DLQ events to the provided MetricStorage.

func Recovery

func Recovery() Middleware

Recovery creates a middleware that catches panics during job execution. When a panic occurs, the job is moved to the dead letter queue with the panic error. This prevents worker crashes from unhandled panics.

func RequestId added in v1.60.0

func RequestId() Middleware

RequestId creates a middleware that ensures request IDs are available in the handler context. If the job does not have a RequestId, it generates a new one. The request ID is added to the context for downstream logging and tracing.

type Mux added in v1.55.1

type Mux struct {
	// contains filtered or unexported fields
}

Mux routes jobs to different handlers based on their type. It maintains a registry of job types and their corresponding handlers, returning an error for unknown job types.

func NewMux added in v1.55.1

func NewMux() *Mux

NewMux creates a new empty Mux instance.

func (*Mux) Handle added in v1.55.1

func (m *Mux) Handle(ctx context.Context, job bgjob.Job) Result

Handle routes a job to the appropriate handler based on its type. If no handler is registered for the job type, it returns MoveToDlq with ErrUnknownType.

func (*Mux) Register added in v1.55.1

func (m *Mux) Register(jobType string, handler SyncHandlerAdapter) *Mux

Register associates a job type with its handler. Returns the Mux for method chaining.

type RescheduleBy added in v1.63.0

type RescheduleBy interface {
	Reschedule() (time.Duration, error)
}

RescheduleBy is an interface for types that can calculate a rescheduling delay.

type RescheduleOption added in v1.63.0

type RescheduleOption func(opt *rescheduleOptions)

RescheduleOption is a function type for configuring reschedule options.

func WithArg added in v1.63.0

func WithArg(arg []byte) RescheduleOption

WithArg configures a reschedule to include a new payload.

type Result

type Result struct {
	// Complete indicates the job was successfully processed.
	Complete bool
	// Err contains the error that occurred during processing.
	Err error
	// MoveToDlq indicates the job should be moved to the dead letter queue.
	MoveToDlq bool
	// Retry indicates the job should be retried.
	Retry bool
	// RetryDelay specifies the delay before retrying the job.
	RetryDelay time.Duration

	// Reschedule indicates the job should be rescheduled.
	Reschedule bool
	// RescheduleDelay specifies when to reschedule the job.
	RescheduleDelay time.Duration
	// RescheduleWithArg indicates the job should be rescheduled with a new argument.
	RescheduleWithArg bool
	// Arg contains the new payload for rescheduled jobs.
	Arg []byte
}

Result defines the outcome of job processing. One of the boolean flags (Complete, Retry, MoveToDlq, Reschedule, RescheduleWithArg) should be set to indicate the desired action.

func Complete

func Complete() Result

Complete returns a Result indicating successful job completion.

func MoveToDlq

func MoveToDlq(err error) Result

MoveToDlq returns a Result indicating the job should be moved to the dead letter queue.

func Reschedule

func Reschedule(by RescheduleBy, opts ...RescheduleOption) Result

Reschedule creates a Result for rescheduling a job at a future time. It uses the provided RescheduleBy implementation to calculate the delay. Optional RescheduleOption functions can be used to include a new payload. If the reschedule calculation fails, the job is moved to the DLQ.

func Retry

func Retry(after time.Duration, err error) Result

Retry returns a Result indicating the job should be retried after the specified delay.

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

Sync wraps a SyncHandlerAdapter with a chain of Middleware functions. It ensures that middleware is applied in the correct order (last to first).

func NewSync

func NewSync(adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

NewSync creates a new Sync instance with the provided adapter and middleware chain. Middleware is applied in reverse order, so the first middleware in the list will be the outermost wrapper.

func (Sync) Handle

func (r Sync) Handle(ctx context.Context, job bgjob.Job) bgjob.Result

Handle processes the job using the wrapped handler and middleware chain. It translates the Result into a bgjob.Result to control job lifecycle:

  • Complete: marks the job as successfully processed
  • Retry: schedules the job for retry after a delay
  • MoveToDlq: moves the job to the dead letter queue
  • Reschedule: reschedules the job for later execution

type SyncHandlerAdapter

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, job bgjob.Job) Result
}

SyncHandlerAdapter defines the interface for synchronous job handlers. Implementations must process a job and return a Result indicating the desired action (complete, retry, reschedule, or move to DLQ).

type SyncHandlerAdapterFunc

type SyncHandlerAdapterFunc func(ctx context.Context, job bgjob.Job) Result

SyncHandlerAdapterFunc is an adapter type that allows regular functions to be used as SyncHandlerAdapter implementations.

func (SyncHandlerAdapterFunc) Handle

Handle calls the underlying function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL