Documentation
¶
Overview ¶
Package batch_handler provides batch message processing for RabbitMQ consumers.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchItem ¶
type BatchItem struct {
// Context contains the request context.
Context context.Context
// Delivery contains the raw message delivery from the consumer.
Delivery *consumer.Delivery
// Result stores the processing outcome.
Result Result
}
BatchItem represents a single message in a batch with its processing result.
func (*BatchItem) Ack ¶
func (b *BatchItem) Ack()
Ack sets the result to indicate successful message acknowledgment.
type BatchItems ¶
type BatchItems []*BatchItem
BatchItems is a slice of batch items.
func (BatchItems) MoveToDlqAll ¶
func (bs BatchItems) MoveToDlqAll(err error)
MoveToDlqAll sets all items to be moved to the DLQ.
func (BatchItems) RetryAll ¶
func (bs BatchItems) RetryAll(err error)
RetryAll sets all items to be retried.
type ConsumerMetricStorage ¶
type ConsumerMetricStorage interface {
ObserveConsumeDuration(exchange string, routingKey string, t time.Duration)
ObserveConsumeMsgSize(exchange string, routingKey string, size int)
IncDlqCount(exchange string, routingKey string)
IncSuccessCount(exchange string, routingKey string)
IncRetryCount(exchange string, routingKey string)
}
ConsumerMetricStorage defines an interface for consumer metrics storage.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler accumulates messages and processes them in batches. It triggers processing when the batch reaches maxSize or when the purgeInterval elapses.
func New ¶
func New(adapter SyncHandlerAdapter, purgeInterval time.Duration, maxSize int) *Handler
New creates a new batch handler with the specified adapter, purge interval, and max batch size.
type Middleware ¶
type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter
Middleware is a function that wraps a SyncHandlerAdapter.
func Log ¶
func Log(logger log.Logger) Middleware
Log creates a middleware that logs batch message processing results with appropriate log levels based on the outcome (Ack, Retry, or MoveToDlq).
func Metrics ¶
func Metrics(metricStorage ConsumerMetricStorage) Middleware
Metrics creates a middleware that collects consumer metrics for batch processing, including message processing duration, message size, and result counts (success, retry, DLQ).
func Recovery ¶
func Recovery(logger log.Logger) Middleware
Recovery creates a middleware that recovers from panics during batch message processing. On panic, the error is logged but message acknowledgment state remains unchanged.
type Result ¶
type Result struct {
// Ack indicates the message should be acknowledged (successfully processed).
Ack bool
// Retry indicates the message should be retried using the retry policy.
Retry bool
// MoveToDlq indicates the message should be moved to the dead letter queue.
MoveToDlq bool
// Err contains the error that occurred during processing.
Err error
}
Result represents the outcome of batch message processing.
type Sync ¶
type Sync struct {
// contains filtered or unexported fields
}
Sync wraps a handler with middleware and manages batch message acknowledgment.
func NewSync ¶
func NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync
NewSync creates a new Sync handler with the specified logger, adapter, and middleware. Middleware functions are applied in reverse order (last to first).
func (Sync) Handle ¶
func (r Sync) Handle(batch BatchItems)
Handle processes a batch of messages and performs the appropriate action for each message based on its Result (Ack, Retry, or MoveToDlq).
type SyncHandlerAdapter ¶
type SyncHandlerAdapter interface {
Handle(batch BatchItems)
}
SyncHandlerAdapter defines the interface for synchronous batch message handlers.
type SyncHandlerAdapterFunc ¶
type SyncHandlerAdapterFunc func(batch BatchItems)
SyncHandlerAdapterFunc is an adapter that allows using functions as SyncHandlerAdapters.
func (SyncHandlerAdapterFunc) Handle ¶
func (a SyncHandlerAdapterFunc) Handle(batch BatchItems)
Handle handles a batch of messages using the function.