Documentation
¶
Overview ¶
Package consumer provides a high-level abstraction for consuming messages from Apache Kafka topics. It wraps the franz-go client and supports concurrent message processing with middleware chains for cross-cutting concerns like logging, metrics, and request IDs.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrDeliveryAlreadyHandled is returned when attempting to commit a delivery // that has already been handled. ErrDeliveryAlreadyHandled = errors.New("delivery already handled") )
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer handles consuming messages from Kafka topics with configurable concurrency and middleware support. It manages offset committing and provides lifecycle hooks through the observer pattern.
func New ¶
func New(client *kgo.Client, consumerGroupId string, handler Handler, concurrency int, opts ...Option) *Consumer
New creates a new Consumer instance with the provided Kafka client, consumer group ID, handler, and concurrency level. Options can be used to configure middlewares and the observer.
func (*Consumer) Close ¶
Close gracefully shuts down the consumer, waits for pending message processing to complete, and releases the underlying Kafka client connection.
func (*Consumer) Healthcheck ¶
Healthcheck returns nil if the consumer is healthy and able to fetch messages, or an error if it has encountered issues.
type Delivery ¶
type Delivery struct {
// contains filtered or unexported fields
}
Delivery represents a consumed message from Kafka. It provides methods to access the message source, commit offsets, and signal completion.
func NewDelivery ¶
func NewDelivery(donner Donner, client *kgo.Client, source *kgo.Record, consumerGroupId string) *Delivery
NewDelivery creates a new Delivery instance with the provided configuration.
func (*Delivery) Commit ¶
Commit commits the message offset to Kafka and signals completion. Returns an error if the delivery has already been handled. Only one of Commit or Done should be called per delivery.
func (*Delivery) ConsumerGroupId ¶
ConsumerGroupId returns the consumer group ID associated with this delivery.
type Donner ¶
type Donner interface {
Done()
}
Donner defines an interface for signaling when message processing is complete.
type HandlerFunc ¶
HandlerFunc is an adapter that allows a function to be used as a Handler.
type LogObserver ¶
type LogObserver struct {
NoopObserver
// contains filtered or unexported fields
}
LogObserver is an Observer implementation that logs consumer lifecycle events to the provided logger.
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
NewLogObserver creates a new LogObserver with the provided context and logger.
func (LogObserver) BeginConsuming ¶
func (l LogObserver) BeginConsuming()
BeginConsuming logs that the consumer has started processing messages.
func (LogObserver) CloseDone ¶
func (l LogObserver) CloseDone()
CloseDone logs that the consumer shutdown process has completed.
func (LogObserver) CloseStart ¶
func (l LogObserver) CloseStart()
CloseStart logs that the consumer shutdown process has begun.
func (LogObserver) ConsumerError ¶
func (l LogObserver) ConsumerError(err error)
ConsumerError logs an unexpected consumer error.
type Middleware ¶
Middleware is a function that wraps a Handler to add cross-cutting functionality such as logging, metrics, or request ID propagation.
type NoopObserver ¶
type NoopObserver struct{}
NoopObserver is a no-op implementation of the Observer interface that ignores all events.
func (NoopObserver) BeginConsuming ¶
func (n NoopObserver) BeginConsuming()
BeginConsuming does nothing.
func (NoopObserver) ConsumerError ¶
func (n NoopObserver) ConsumerError(err error)
ConsumerError does nothing.
type Observer ¶
type Observer interface {
ConsumerError(err error)
BeginConsuming()
CloseStart()
CloseDone()
}
Observer defines an interface for receiving lifecycle events from the consumer, such as error notifications and shutdown progress.
type Option ¶
type Option func(p *Consumer)
Option is a function that configures a Consumer instance.
func WithMiddlewares ¶
func WithMiddlewares(mws ...Middleware) Option
WithMiddlewares configures the consumer with the provided middlewares. Middlewares are applied in the order they are provided.
func WithObserver ¶
WithObserver configures the consumer with the provided observer for lifecycle event notifications.