Documentation ¶
Overview ¶
Package handler provides synchronous message processing with support for retry logic, commit handling, and middleware chains for cross-cutting concerns like logging, metrics, and panic recovery.
Types ¶
type ConsumerMetricStorage ¶
type ConsumerMetricStorage interface {
	ObserveConsumeDuration(consumerGroup, topic string, t time.Duration)
	ObserveConsumeMsgSize(consumerGroup, topic string, size int)
	IncCommitCount(consumerGroup, topic string)
	IncRetryCount(consumerGroup, topic string)
}
ConsumerMetricStorage defines the interface for consumer metrics storage.
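As an illustration, the interface can be satisfied by a small in-memory type, which may be handy in tests. This is only a sketch: memoryMetrics, newMemoryMetrics, and key are hypothetical names, and the interface is copied locally so that the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ConsumerMetricStorage is copied from the documentation above so that the
// sketch compiles on its own.
type ConsumerMetricStorage interface {
	ObserveConsumeDuration(consumerGroup, topic string, t time.Duration)
	ObserveConsumeMsgSize(consumerGroup, topic string, size int)
	IncCommitCount(consumerGroup, topic string)
	IncRetryCount(consumerGroup, topic string)
}

// memoryMetrics is a hypothetical in-memory implementation.
type memoryMetrics struct {
	mu        sync.Mutex
	durations map[string]time.Duration // total processing time per group/topic
	bytes     map[string]int           // total message bytes per group/topic
	commits   map[string]int
	retries   map[string]int
}

func newMemoryMetrics() *memoryMetrics {
	return &memoryMetrics{
		durations: map[string]time.Duration{},
		bytes:     map[string]int{},
		commits:   map[string]int{},
		retries:   map[string]int{},
	}
}

// key joins the two labels into a single map key.
func key(consumerGroup, topic string) string { return consumerGroup + "/" + topic }

func (m *memoryMetrics) ObserveConsumeDuration(consumerGroup, topic string, t time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.durations[key(consumerGroup, topic)] += t
}

func (m *memoryMetrics) ObserveConsumeMsgSize(consumerGroup, topic string, size int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.bytes[key(consumerGroup, topic)] += size
}

func (m *memoryMetrics) IncCommitCount(consumerGroup, topic string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.commits[key(consumerGroup, topic)]++
}

func (m *memoryMetrics) IncRetryCount(consumerGroup, topic string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.retries[key(consumerGroup, topic)]++
}

// Compile-time check that memoryMetrics satisfies the interface.
var _ ConsumerMetricStorage = (*memoryMetrics)(nil)

func main() {
	m := newMemoryMetrics()
	m.ObserveConsumeDuration("billing", "orders", 20*time.Millisecond)
	m.ObserveConsumeMsgSize("billing", "orders", 512)
	m.IncCommitCount("billing", "orders")
	fmt.Println(m.commits[key("billing", "orders")], m.bytes[key("billing", "orders")])
}
```

A production implementation would more likely forward these calls to a metrics library; the mutex here only guards the maps against concurrent consumers.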
type Middleware ¶
type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter
Middleware is a function that wraps a SyncHandlerAdapter to add cross-cutting functionality such as logging, metrics, or panic recovery.
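A custom middleware follows the same shape: it receives the next adapter and returns a new one that decides whether and how to delegate. The sketch below uses local stand-ins for Delivery and Result, whose real definitions are not shown on this page; MaxSize, handlerFunc, commitAll, and handle are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins so the sketch compiles on its own. The shapes of Delivery and
// Result are assumptions, not the package's real definitions.
type Delivery struct{ Value []byte }
type Result string

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *Delivery) Result
}

// handlerFunc lets a plain function satisfy SyncHandlerAdapter.
type handlerFunc func(ctx context.Context, delivery *Delivery) Result

func (f handlerFunc) Handle(ctx context.Context, d *Delivery) Result { return f(ctx, d) }

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

// MaxSize is a hypothetical middleware: it skips oversized messages without
// calling the wrapped handler and passes everything else through.
func MaxSize(limit int) Middleware {
	return func(next SyncHandlerAdapter) SyncHandlerAdapter {
		return handlerFunc(func(ctx context.Context, d *Delivery) Result {
			if len(d.Value) > limit {
				return Result("skip")
			}
			return next.Handle(ctx, d)
		})
	}
}

// commitAll is a trivial handler that commits every message.
var commitAll SyncHandlerAdapter = handlerFunc(func(context.Context, *Delivery) Result {
	return Result("commit")
})

// handle runs h against a payload with a background context.
func handle(h SyncHandlerAdapter, payload string) Result {
	return h.Handle(context.Background(), &Delivery{Value: []byte(payload)})
}

func main() {
	h := MaxSize(4)(commitAll)
	fmt.Println(handle(h, "ok"), handle(h, "too large"))
}
```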
func Log ¶
func Log(logger log.Logger) Middleware
Log creates a middleware that logs the outcome of message processing, including whether the message was committed, retried, or skipped.
func Metrics ¶
func Metrics(metricStorage ConsumerMetricStorage) Middleware
Metrics creates a middleware that records metrics for message processing, including duration, message size, and commit/retry counts.
func Recovery ¶ added in v1.53.0
func Recovery() Middleware
Recovery creates a middleware that recovers from panics during message processing. When a panic occurs, the message is marked for retry after a 15-second delay. It is recommended to place this middleware early in the chain to ensure all handlers are protected.
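The behaviour described above can be approximated with a deferred recover that overwrites a named return value. This is a sketch of the general technique, not the package's implementation: the Result struct with Action and RetryAfter fields is an assumed stand-in, and recovery, handlerFunc, panicky, and run are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-ins so the sketch compiles on its own.
type Delivery struct{}

// Result is a hypothetical shape; the real type's fields are not documented here.
type Result struct {
	Action     string
	RetryAfter time.Duration
}

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *Delivery) Result
}

type handlerFunc func(ctx context.Context, delivery *Delivery) Result

func (f handlerFunc) Handle(ctx context.Context, d *Delivery) Result { return f(ctx, d) }

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

// recovery converts a panic in the wrapped handler into a delayed retry.
func recovery() Middleware {
	return func(next SyncHandlerAdapter) SyncHandlerAdapter {
		return handlerFunc(func(ctx context.Context, d *Delivery) (res Result) {
			defer func() {
				if r := recover(); r != nil {
					// Overwrite the named result so the caller sees a retry.
					res = Result{Action: "retry", RetryAfter: 15 * time.Second}
				}
			}()
			return next.Handle(ctx, d)
		})
	}
}

// panicky is a handler that always panics.
var panicky SyncHandlerAdapter = handlerFunc(func(context.Context, *Delivery) Result {
	panic("boom")
})

// run invokes h with a background context and an empty delivery.
func run(h SyncHandlerAdapter) Result {
	return h.Handle(context.Background(), &Delivery{})
}

func main() {
	res := run(recovery()(panicky))
	fmt.Println(res.Action, res.RetryAfter)
}
```

Because recover only catches panics raised below the deferred call, a recovery layer protects only the middlewares and handler it wraps, which is why its position in the chain matters.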
type Result ¶
Result defines the outcome of message processing, indicating whether to commit the offset, retry processing, or skip the message.
func Commit ¶
func Commit() Result
Commit returns a Result indicating the message should be committed.
type Sync ¶
type Sync struct {
	// contains filtered or unexported fields
}
Sync wraps a SyncHandlerAdapter with middleware support and handles the message lifecycle including committing, retrying, or skipping based on the returned Result.
func NewSync ¶
func NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync
NewSync creates a new Sync instance with the provided logger, adapter, and middlewares. Middlewares are applied in the order they are provided.
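To make the effect of middleware ordering concrete, the sketch below composes a chain by hand and records the order in which each layer runs. It assumes that the first middleware passed becomes the outermost wrapper, which is consistent with the advice to place Recovery early; the actual composition inside NewSync is not shown on this page and may differ. chain, trace, and callOrder are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Stand-ins so the sketch compiles on its own.
type Delivery struct{}
type Result string

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *Delivery) Result
}

type handlerFunc func(ctx context.Context, delivery *Delivery) Result

func (f handlerFunc) Handle(ctx context.Context, d *Delivery) Result { return f(ctx, d) }

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

// chain wraps h so that mws[0] is the outermost layer (an assumed ordering).
func chain(h SyncHandlerAdapter, mws ...Middleware) SyncHandlerAdapter {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace returns a middleware that records its name before delegating.
func trace(name string, calls *[]string) Middleware {
	return func(next SyncHandlerAdapter) SyncHandlerAdapter {
		return handlerFunc(func(ctx context.Context, d *Delivery) Result {
			*calls = append(*calls, name)
			return next.Handle(ctx, d)
		})
	}
}

// callOrder runs a two-middleware chain and reports the order of execution.
func callOrder() string {
	var calls []string
	h := handlerFunc(func(context.Context, *Delivery) Result {
		calls = append(calls, "handler")
		return Result("commit")
	})
	wrapped := chain(h, trace("first", &calls), trace("second", &calls))
	wrapped.Handle(context.Background(), &Delivery{})
	return strings.Join(calls, " > ")
}

func main() {
	fmt.Println(callOrder()) // prints "first > second > handler"
}
```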
type SyncHandlerAdapter ¶
type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *consumer.Delivery) Result
}
SyncHandlerAdapter defines the interface for synchronous message processing. Implementations should return a Result indicating whether to commit, retry, or skip the message.
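One way to think about the three outcomes is shown in the sketch below: a malformed payload is skipped because retrying cannot fix it, a failing downstream call is retried, and everything else is committed. Only Commit is documented on this page, so the commit, retry, and skip constants are hypothetical stand-ins, as are Delivery.Value, order, orderHandler, and handle.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Stand-ins so the sketch compiles on its own; the real Delivery and Result
// types may look different.
type Delivery struct{ Value []byte }
type Result string

const (
	commit Result = "commit"
	retry  Result = "retry"
	skip   Result = "skip"
)

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *Delivery) Result
}

type order struct {
	ID string `json:"id"`
}

// orderHandler is a hypothetical adapter that persists orders via save.
type orderHandler struct {
	save func(ctx context.Context, o order) error
}

func (h orderHandler) Handle(ctx context.Context, d *Delivery) Result {
	var o order
	if err := json.Unmarshal(d.Value, &o); err != nil {
		return skip // a malformed payload will never succeed, so do not retry
	}
	if err := h.save(ctx, o); err != nil {
		return retry // possibly transient, e.g. the store is unavailable
	}
	return commit
}

// Compile-time check that orderHandler satisfies the interface.
var _ SyncHandlerAdapter = orderHandler{}

// handle feeds payload to a handler whose save step fails when saveFails is true.
func handle(payload string, saveFails bool) Result {
	h := orderHandler{save: func(context.Context, order) error {
		if saveFails {
			return errors.New("store unavailable")
		}
		return nil
	}}
	return h.Handle(context.Background(), &Delivery{Value: []byte(payload)})
}

func main() {
	fmt.Println(handle(`{"id":"42"}`, false), handle(`not json`, false), handle(`{"id":"42"}`, true))
}
```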
type SyncHandlerAdapterFunc ¶
SyncHandlerAdapterFunc is an adapter that allows a function to be used as a SyncHandlerAdapter.
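The declaration of SyncHandlerAdapterFunc is not shown on this page. A plausible shape, assuming it follows the same pattern as http.HandlerFunc in the standard library, is a function type with a Handle method that calls itself. The sketch below uses local stand-ins for Delivery and Result; process and commitNonEmpty are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins so the sketch compiles on its own.
type Delivery struct{ Value []byte }
type Result string

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *Delivery) Result
}

// SyncHandlerAdapterFunc is an assumed declaration modelled on http.HandlerFunc.
type SyncHandlerAdapterFunc func(ctx context.Context, delivery *Delivery) Result

// Handle calls f(ctx, delivery), which makes the function type satisfy
// SyncHandlerAdapter.
func (f SyncHandlerAdapterFunc) Handle(ctx context.Context, delivery *Delivery) Result {
	return f(ctx, delivery)
}

// commitNonEmpty is an ordinary function converted into an adapter.
var commitNonEmpty SyncHandlerAdapter = SyncHandlerAdapterFunc(
	func(_ context.Context, d *Delivery) Result {
		if len(d.Value) == 0 {
			return Result("skip")
		}
		return Result("commit")
	},
)

// process accepts any SyncHandlerAdapter, including converted functions.
func process(h SyncHandlerAdapter, payload string) Result {
	return h.Handle(context.Background(), &Delivery{Value: []byte(payload)})
}

func main() {
	fmt.Println(process(commitNonEmpty, "hello"), process(commitNonEmpty, ""))
}
```

The benefit of this pattern is that small handlers need no dedicated struct type: a closure is enough.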