handler

package
v1.67.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2026 License: MIT Imports: 5 Imported by: 0

README

Package handler

Пакет handler предоставляет инструменты для обработки сообщений Kafka с поддержкой middleware, управления результатами обработки (коммит/ретрай) и интеграции с метриками/логированием. Предназначен для использования в консумера пакета kafkax.

Types

Sync

Основная структура для синхронной обработки сообщений. Обеспечивает:

  • Применение цепочки middleware
  • Обработку результатов (коммит или повтор)
  • Централизованное логирование ошибок

Methods:

NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

Конструктор синхронного обработчика, принимающий на вход адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter.

Доступные middleware:

  • Metrics(metricStorage ConsumerMetricStorage) Middleware – сбор метрик времени обработки и размера сообщений, количества коммитов и ретраев.
  • Log(logger log.Logger) Middleware – логирование ключевых событий (успешные коммиты, отправка в ретрай с ошибкой).
  • Recovery() Middleware – предотвращает падение сервиса при панике в обработчике, преобразуя ее в ошибку.
(r Sync) Handle(ctx context.Context, delivery *consumer.Delivery)

Выполняет обработку сообщения. Автоматически:

  • Вызывает цепочку middleware
  • Обрабатывает результат (Commit, Retry или Nothing)
  • Логирует ошибки коммита

Usage

Default usage flow
package main

import (
	"context"
	"log"
	"time"

	"github.com/txix-open/isp-kit/kafkax/consumer"
	"github.com/txix-open/isp-kit/kafkax/handler"
	log2 "github.com/txix-open/isp-kit/log"
)

func processMessage(msg []byte) error {
	/* put here business logic */
	return nil
}

func noopHandler(ctx context.Context, d *consumer.Delivery) handler.Result {
	err := processMessage(d.Source().Value)
	if err != nil {
		return handler.Retry(5*time.Second, err)
	}
	return handler.Commit()
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	adapter := handler.SyncHandlerAdapterFunc(noopHandler)
	syncHandler := handler.NewSync(
		logger,
		adapter,
		handler.Log(logger),
	)

	/* handler's call for example */
	delivery := new(consumer.Delivery) /* placeholder for example */
	syncHandler.Handle(context.Background(), delivery)
}

Documentation

Overview

Package handler provides synchronous message processing with support for retry logic, commit handling, and middleware chains for cross-cutting concerns like logging, metrics, and panic recovery.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerMetricStorage

type ConsumerMetricStorage interface {
	ObserveConsumeDuration(consumerGroup, topic string, t time.Duration)
	ObserveConsumeMsgSize(consumerGroup, topic string, size int)
	IncCommitCount(consumerGroup, topic string)
	IncRetryCount(consumerGroup, topic string)
}

ConsumerMetricStorage defines the interface for consumer metrics storage.

type Middleware

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

Middleware is a function that wraps a SyncHandlerAdapter to add cross-cutting functionality such as logging, metrics, or panic recovery.

func Log

func Log(logger log.Logger) Middleware

Log creates a middleware that logs the outcome of message processing, including whether the message was committed, retried, or skipped.

func Metrics

func Metrics(metricStorage ConsumerMetricStorage) Middleware

Metrics creates a middleware that records metrics for message processing, including duration, message size, and commit/retry counts.

func Recovery added in v1.53.0

func Recovery() Middleware

Recovery creates a middleware that recovers from panics during message processing. When a panic occurs, the message is marked for retry after a 15-second delay. It is recommended to place this middleware early in the chain to ensure all handlers are protected.

type Result

type Result struct {
	Commit     bool
	Retry      bool
	RetryError error
	RetryAfter time.Duration
}

Result defines the outcome of message processing, indicating whether to commit the offset, retry processing, or skip the message.

func Commit

func Commit() Result

Commit returns a Result indicating the message should be committed.

func Nothing added in v1.66.1

func Nothing() Result

Nothing returns a Result indicating the message should be skipped without committing.

func Retry

func Retry(after time.Duration, err error) Result

Retry returns a Result indicating the message should be retried after the specified duration. The error will be logged for diagnostic purposes.

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

Sync wraps a SyncHandlerAdapter with middleware support and handles the message lifecycle including committing, retrying, or skipping based on the returned Result.

func NewSync

func NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

NewSync creates a new Sync instance with the provided logger, adapter, and middlewares. Middlewares are applied in the order they are provided.

func (Sync) Handle

func (r Sync) Handle(ctx context.Context, delivery *consumer.Delivery)

Handle processes a message by calling the configured handler and acting on the returned Result. It supports automatic retry with backoff and error logging during offset committing.

type SyncHandlerAdapter

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *consumer.Delivery) Result
}

SyncHandlerAdapter defines the interface for synchronous message processing. Implementations should return a Result indicating whether to commit, retry, or skip the message.

type SyncHandlerAdapterFunc

type SyncHandlerAdapterFunc func(ctx context.Context, delivery *consumer.Delivery) Result

SyncHandlerAdapterFunc is an adapter that allows a function to be used as a SyncHandlerAdapter.

func (SyncHandlerAdapterFunc) Handle

func (a SyncHandlerAdapterFunc) Handle(ctx context.Context, delivery *consumer.Delivery) Result

Handle implements the SyncHandlerAdapter interface by calling the underlying function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL