bgjobx

package
v1.67.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 11 Imported by: 0

README

Package bgjobx

Пакет bgjobx предоставляет инструменты для работы с фоновыми задачами. Основные компоненты: клиент для работы с задачами, конфигурация воркеров, сбор метрик и логирование.

Данный пакет использует PostgreSQL для организации очередей задач. Для работы клиента необходимо применить к БД sql-файл.

Types

Client

Клиент для взаимодействия с фоновыми задачами. Воркер обнаруживает новую задачу и вызывает функцию-обработчик. При успехе – задание помечается выполненным. При ошибке – ретраи с экспоненциальной задержкой. После исчерпания попыток – перемещение в DLQ.

Методы:

NewClient(db DBProvider, logger log.Logger) *Client

Конструктор клиента

Upgrade(ctx context.Context, workerConfigs []WorkerConfig) error

Обновить конфигурацию воркеров. Остановить старые воркеры и запустить новые согласно переданным конфигурациям

Enqueue(ctx context.Context, req bgjob.EnqueueRequest) error

Добавить задачу в очередь

BulkEnqueue(ctx context.Context, list []bgjob.EnqueueRequest) error

Добавить список задач в очередь

Close()

Остановить все воркеры

WorkerConfig

Конфигурация обработчика задач

Поля:

  • Queue - имя очереди
  • Concurrency - количество параллельных обработчиков
  • PollInterval - интервал опроса очереди
  • Handle - функция-обработчик задачи

Метрики

  • Время выполнения задачи
  • Количество успешных выполнений
  • Количество ретраев
  • Количество перемещений в DLQ
  • Количество ошибок воркеров

Стандартный обработчик

NewDefaultHandler(adapter handler.SyncHandlerAdapter, metricStorage handler.MetricStorage) handler.Sync

Используется для добавления стандартных middleware в функцию-обработчик каждого воркера при создании воркеров.

Usage

Default usage flow
package main

import (
	"context"
	"log"
	"time"

	"github.com/txix-open/bgjob"
	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/bgjobx"
	"github.com/txix-open/isp-kit/bgjobx/handler"
	"github.com/txix-open/isp-kit/dbrx"
	"github.com/txix-open/isp-kit/dbx"
)

func main() {
	application, err := app.New()
	if err != nil {
		log.Fatal(err)
	}

	db := dbrx.New(application.Logger())
	err = db.Upgrade(application.Context(), dbx.Config{
		Host:     "127.0.0.1",
		Port:     5432,
		Database: "test",
		Username: "test",
		Password: "test",
	})
	if err != nil {
		log.Fatal(err)
	}

	cli := bgjobx.NewClient(db, application.Logger())
	worker := bgjobx.WorkerConfig{
		Queue:        "test",
		Concurrency:  5,
		PollInterval: 1 * time.Second,
		Handle: handler.SyncHandlerAdapterFunc(func(ctx context.Context, job bgjob.Job) handler.Result {
			/* do some work */
			return handler.Complete()
		}),
	}
	err = cli.Upgrade(application.Context(), []bgjobx.WorkerConfig{worker})
	if err != nil {
		log.Fatal(err)
	}

	/* enqueue task */
	err = cli.Enqueue(application.Context(), bgjob.EnqueueRequest{
		Queue: "test",
		Type:  "some-type",
		Arg:   []byte(`{"hello": "world"}`),
	})
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewDefaultHandler added in v1.53.0

func NewDefaultHandler(adapter handler.SyncHandlerAdapter, metricStorage handler.MetricStorage) handler.Sync

NewDefaultHandler creates a handler with standard middleware applied. It wraps the provided adapter with metrics collection, panic recovery, and request ID propagation middleware.

The middleware stack is applied in the following order:

  1. RequestId - propagates request IDs to the context
  2. Recovery - catches panics and moves jobs to DLQ
  3. Metrics - records execution duration and job outcomes

Returns a Sync handler ready to be used with workers.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages background job workers and provides functionality for enqueuing and processing jobs. It handles worker lifecycle, including startup, graceful shutdown, and configuration updates.

The Client is safe for concurrent use by multiple goroutines.

func NewClient

func NewClient(db DBProvider, logger log.Logger) *Client

NewClient creates a new Client instance with the provided database provider and logger. The client is ready to be configured with workers via Upgrade.

func (*Client) BulkEnqueue

func (c *Client) BulkEnqueue(ctx context.Context, list []bgjob.EnqueueRequest) error

BulkEnqueue adds multiple jobs to the background job queue in a single operation. If any request in the list does not contain a RequestId, it inherits the main RequestId from the context (or generates one if not present).

Returns an error if the database connection cannot be established.

func (*Client) Close

func (c *Client) Close()

Close gracefully shuts down all running workers. This method should be called when the application is stopping to ensure in-flight jobs are properly handled.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, req bgjob.EnqueueRequest) error

Enqueue adds a single job to the background job queue. If the request does not contain a RequestId, it attempts to extract one from the context, or generates a new one if none is present.

Returns an error if the database connection cannot be established.

func (*Client) Upgrade

func (c *Client) Upgrade(ctx context.Context, workerConfigs []WorkerConfig) error

Upgrade configures and starts workers based on the provided worker configurations. It gracefully stops any previously running workers before starting the new ones. Each worker polls its designated queue for jobs and processes them using the configured handler.

Returns an error if the database connection cannot be established or if the job store cannot be created.

type DBProvider

type DBProvider interface {
	DB() (*dbx.Client, error)
}

DBProvider defines an interface for obtaining a database client.

type Observer

type Observer struct {
	bgjob.NoopObserver
	// contains filtered or unexported fields
}

Observer implements the bgjob.Observer interface to provide logging and metrics collection for job lifecycle events. It records job state changes such as completion, retries, and failures to the dead letter queue.

func (Observer) JobCompleted

func (o Observer) JobCompleted(ctx context.Context, job bgjob.Job)

JobCompleted is called when a job finishes successfully. It logs the completion and increments the success counter for the queue and job type.

func (Observer) JobMovedToDlq

func (o Observer) JobMovedToDlq(ctx context.Context, job bgjob.Job, err error)

JobMovedToDlq is called when a job is moved to the dead letter queue after exhausting all retry attempts. It logs the error and increments the DLQ counter.

func (Observer) JobRescheduled

func (o Observer) JobRescheduled(ctx context.Context, job bgjob.Job, after time.Duration)

JobRescheduled is called when a job is rescheduled for future execution. It logs the rescheduled time for the job.

func (Observer) JobStarted

func (o Observer) JobStarted(ctx context.Context, job bgjob.Job)

JobStarted is called when a job begins processing. It logs the job ID, request ID, and job type at debug level.

func (Observer) JobWillBeRetried

func (o Observer) JobWillBeRetried(ctx context.Context, job bgjob.Job, after time.Duration, err error)

JobWillBeRetried is called when a job fails and is scheduled for retry. It logs the error and the retry delay, and increments the retry counter.

func (Observer) WorkerError

func (o Observer) WorkerError(ctx context.Context, err error)

WorkerError is called when an unexpected error occurs in the worker. It logs the error and increments the internal error counter.

type WorkerConfig

type WorkerConfig struct {
	Queue        string
	Concurrency  int
	PollInterval time.Duration
	Handle       handler.SyncHandlerAdapter
}

WorkerConfig defines the configuration for a background job worker. It specifies the queue to poll, concurrency level, polling interval, and the handler that processes jobs.

func (WorkerConfig) GetConcurrency

func (c WorkerConfig) GetConcurrency() int

GetConcurrency returns the configured concurrency level. If Concurrency is not set or is less than or equal to zero, it returns 1.

func (WorkerConfig) GetPollInterval

func (c WorkerConfig) GetPollInterval() time.Duration

GetPollInterval returns the configured polling interval. If PollInterval is not set or is less than or equal to zero, it returns 1 second.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL