bgjobx

package
v1.53.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2025 License: MIT Imports: 10 Imported by: 0

README

Package bgjobx

Пакет bgjobx предоставляет инструменты для работы с фоновыми задачами. Основные компоненты: клиент для работы с задачами, конфигурация воркеров, сбор метрик и логирование.

Данный пакет использует PostgreSQL для организации очередей задач. Для работы клиента необходимо применить к БД sql-файл.

Types

Client

Клиент для взаимодействия с фоновыми задачами. Воркер обнаруживает новую задачу и вызывает функцию-обработчик. При успехе – задание помечается выполненным. При ошибке – ретраи с экспоненциальной задержкой. После исчерпания попыток – перемещение в DLQ.

Методы:

NewClient(db DBProvider, logger log.Logger) *Client

Конструктор клиента

Upgrade(ctx context.Context, workerConfigs []WorkerConfig) error

Обновить конфигурацию воркеров. Остановить старые воркеры и запустить новые согласно переданным конфигурациям

Enqueue(ctx context.Context, req bgjob.EnqueueRequest) error

Добавить задачу в очередь

BulkEnqueue(ctx context.Context, list []bgjob.EnqueueRequest) error

Добавить список задач в очередь

Close()

Остановить все воркеры

WorkerConfig

Конфигурация обработчика задач

Поля:

  • Queue - имя очереди
  • Concurrency - количество параллельных обработчиков
  • PollInterval - интервал опроса очереди
  • Handle - функция-обработчик задачи

Метрики

  • Время выполнения задачи
  • Количество успешных выполнений
  • Количество ретраев
  • Количество перемещений в DLQ
  • Количество ошибок воркеров

Стандартный обработчик

NewDefaultHandler(adapter handler.SyncHandlerAdapter, metricStorage handler.MetricStorage) handler.Sync

Используется для добавления стандартных middleware в функцию-обработчик каждого воркера при создании воркеров.

Usage

Default usage flow
package main

import (
	"context"
	"log"
	"time"

	"github.com/txix-open/bgjob"
	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/bgjobx"
	"github.com/txix-open/isp-kit/bgjobx/handler"
	"github.com/txix-open/isp-kit/dbrx"
	"github.com/txix-open/isp-kit/dbx"
)

func main() {
	application, err := app.New()
	if err != nil {
		log.Fatal(err)
	}

	db := dbrx.New(application.Logger())
	err = db.Upgrade(application.Context(), dbx.Config{
		Host:     "127.0.0.1",
		Port:     "5432",
		Database: "test",
		Username: "test",
		Password: "test",
	})
	if err != nil {
		log.Fatal(err)
	}

	cli := bgjobx.NewClient(db, application.Logger())
	worker := bgjobx.WorkerConfig{
		Queue:        "test",
		Concurrency:  5,
		PollInterval: 1 * time.Second,
		Handle: handler.SyncHandlerAdapterFunc(func(ctx context.Context, job bgjob.Job) handler.Result {
			/* do some work */
			return handler.Complete()
		}),
	}
	err = cli.Upgrade(application.Context(), []bgjobx.WorkerConfig{worker})
	if err != nil {
		log.Fatal(err)
	}

	/* enqueue task */
	err = cli.Enqueue(application.Context(), bgjob.EnqueueRequest{
		Queue: "test",
		Type:  "some-type",
		Arg:   []byte(`{"hello": "world"}`),
	})
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewDefaultHandler added in v1.53.0

func NewDefaultHandler(adapter handler.SyncHandlerAdapter, metricStorage handler.MetricStorage) handler.Sync

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db DBProvider, logger log.Logger) *Client

func (*Client) BulkEnqueue

func (c *Client) BulkEnqueue(ctx context.Context, list []bgjob.EnqueueRequest) error

func (*Client) Close

func (c *Client) Close()

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, req bgjob.EnqueueRequest) error

func (*Client) Upgrade

func (c *Client) Upgrade(ctx context.Context, workerConfigs []WorkerConfig) error

type DBProvider

type DBProvider interface {
	DB() (*dbx.Client, error)
}

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

func (Observer) JobCompleted

func (o Observer) JobCompleted(ctx context.Context, job bgjob.Job)

func (Observer) JobMovedToDlq

func (o Observer) JobMovedToDlq(ctx context.Context, job bgjob.Job, err error)

func (Observer) JobRescheduled

func (o Observer) JobRescheduled(ctx context.Context, job bgjob.Job, after time.Duration)

func (Observer) JobStarted

func (o Observer) JobStarted(ctx context.Context, job bgjob.Job)

func (Observer) JobWillBeRetried

func (o Observer) JobWillBeRetried(ctx context.Context, job bgjob.Job, after time.Duration, err error)

func (Observer) QueueIsEmpty

func (o Observer) QueueIsEmpty(ctx context.Context)

func (Observer) WorkerError

func (o Observer) WorkerError(ctx context.Context, err error)

type WorkerConfig

type WorkerConfig struct {
	Queue        string
	Concurrency  int
	PollInterval time.Duration
	Handle       handler.SyncHandlerAdapter
}

func (WorkerConfig) GetConcurrency

func (c WorkerConfig) GetConcurrency() int

func (WorkerConfig) GetPollInterval

func (c WorkerConfig) GetPollInterval() time.Duration

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL