handler

package
v1.63.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2025 License: MIT Imports: 8 Imported by: 0

README

Package handler

Пакет handler предоставляет инструменты для обработки результатов выполнения bgjob с использованием middleware.

Types

Sync

Структура Sync реализует обработчик фоновых задач с поддержкой middleware.

Methods:

NewSync(adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

Конструктор синхронного обработчика, принимающий на вход адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик.

Стандартные Middleware:

  • Metrics(storage MetricStorage) Middleware – middleware для сбора метрик, регистрирующая время обработки. Принимает на вход хранилище метрик, реализующее интерфейс MetricStorage.
  • Recovery() Middleware – предотвращает падение сервиса при панике в обработчике, преобразуя ее в ошибку.
  • RequestId() Middleware – обеспечивает трассировку, берёт requestId из job.RequestId.
(r Sync) Handle(ctx context.Context, job bgjob.Job) bgjob.Result

Выполняет обработку сообщения.

Mux

Структура Mux реализует мультиплексор фоновых задач.

Methods:

NewMux() *Mux

Конструктор мультиплексора.

(m *Mux) Register(jobType string, handler SyncHandlerAdapter) *Mux

Выполняет регистрацию обработчика задачи по ее типу в мультиплексоре.

(m *Mux) Handle(ctx context.Context, job bgjob.Job) Result

Выполняет вызов обработчика задачи в зависимости от типа задачи. Если обработчик не зарегистрирован, отправляет задачу в DLQ с ошибкой bgjob.ErrUnknownType.

Functions

Reschedule(by RescheduleBy, opts ...RescheduleOption) Result

Отдает результат перепланировки задачи:

Методы перепланировки задач:

ByAfterTime(after time.Duration, currentTime time.Time) AfterTime

Перепланировка задачи на дату спустя указанное время.

ByCron(cronExpression string, currentTime time.Time) Cron

Перепланировка задачи по cron-выражению.

Опции перепланирования:

WithArg(arg []byte) RescheduleOption

Опция для указания аргумента при перепланировании.

Usage

Custom handler
package main

import (
    "context"
    "time"

    "github.com/txix-open/bgjob"
    "github.com/txix-open/isp-kit/bgjobx"
    "github.com/txix-open/isp-kit/bgjobx/handler"
)

type customHandler struct{}

func (h customHandler) Handle(ctx context.Context, job bgjob.Job) handler.Result {
  /* put here business logic */
  return handler.Reschedule(handler.ByAfterTime(10*time.Minutes, time.Now()))
}

func main() {
  var (
    metricStorage = NewMetricStorage() /* MetricStorage interface implementation */
    adapter       customHandler
  )

  syncHandler := handler.NewSync(adapter, []handler.Middleware{
    handler.Metrics(metricStorage),
    handler.Recovery(),
    handler.RequestId(),
  }...)

  /* handler's call for example */
  job := new(bgjob.Job) /* placeholder for example */
  syncHandler.Handle(context.Background(), job)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AfterTime added in v1.63.0

type AfterTime struct {
	// contains filtered or unexported fields
}

func ByAfterTime added in v1.63.0

func ByAfterTime(after time.Duration, currentTime time.Time) AfterTime

func (AfterTime) Reschedule added in v1.63.0

func (b AfterTime) Reschedule() (time.Time, error)

type Cron added in v1.63.0

type Cron struct {
	// contains filtered or unexported fields
}

func ByCron added in v1.63.0

func ByCron(cronExpression string, currentTime time.Time) Cron

func (Cron) Reschedule added in v1.63.0

func (b Cron) Reschedule() (time.Time, error)

type MetricStorage

type MetricStorage interface {
	ObserveExecuteDuration(queue string, jobType string, t time.Duration)
	IncRetryCount(queue string, jobType string)
	IncDlqCount(queue string, jobType string)
	IncSuccessCount(queue string, jobType string)
	IncInternalErrorCount()
}

type Middleware

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

func Metrics added in v1.60.0

func Metrics(storage MetricStorage) Middleware

func Recovery

func Recovery() Middleware

nolint:nonamedreturns

func RequestId added in v1.60.0

func RequestId() Middleware

type Mux added in v1.55.1

type Mux struct {
	// contains filtered or unexported fields
}

func NewMux added in v1.55.1

func NewMux() *Mux

func (*Mux) Handle added in v1.55.1

func (m *Mux) Handle(ctx context.Context, job bgjob.Job) Result

func (*Mux) Register added in v1.55.1

func (m *Mux) Register(jobType string, handler SyncHandlerAdapter) *Mux

type RescheduleBy added in v1.63.0

type RescheduleBy interface {
	Reschedule() (time.Time, error)
}

type RescheduleOption added in v1.63.0

type RescheduleOption func(opt *rescheduleOptions)

func WithArg added in v1.63.0

func WithArg(arg []byte) RescheduleOption

type Result

type Result struct {
	Complete   bool
	Err        error
	MoveToDlq  bool
	Retry      bool
	RetryDelay time.Duration

	Reschedule        bool
	RescheduleDelay   time.Duration
	RescheduleWithArg bool
	Arg               []byte
	NextRunAt         time.Time
}

func Complete

func Complete() Result

func MoveToDlq

func MoveToDlq(err error) Result

func Reschedule

func Reschedule(by RescheduleBy, opts ...RescheduleOption) Result

func Retry

func Retry(after time.Duration, err error) Result

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

func NewSync

func NewSync(adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

func (Sync) Handle

func (r Sync) Handle(ctx context.Context, job bgjob.Job) bgjob.Result

type SyncHandlerAdapter

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, job bgjob.Job) Result
}

type SyncHandlerAdapterFunc

type SyncHandlerAdapterFunc func(ctx context.Context, job bgjob.Job) Result

func (SyncHandlerAdapterFunc) Handle

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL