Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrDeliveryAlreadyHandled = errors.New("delivery already handled")
)
Functions ¶
This section is empty.
Types ¶
type Delivery ¶
type Delivery struct {
// contains filtered or unexported fields
}
func NewDelivery ¶
func (*Delivery) ConsumerGroupId ¶
type HandlerFunc ¶
type LogObserver ¶
type LogObserver struct {
NoopObserver
// contains filtered or unexported fields
}
Linter directive: nolint:containedctx
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
func (LogObserver) BeginConsuming ¶
func (l LogObserver) BeginConsuming()
func (LogObserver) CloseDone ¶
func (l LogObserver) CloseDone()
func (LogObserver) CloseStart ¶
func (l LogObserver) CloseStart()
func (LogObserver) ConsumerError ¶
func (l LogObserver) ConsumerError(err error)
type MetricStorage ¶ added in v1.47.0
type MetricStorage interface {
ObserveConsumerDials(dials int64)
ObserveConsumerFetches(fetches int64)
ObserveConsumerMessages(messages int64)
ObserveConsumerMessageBytes(messageBytes int64)
ObserveConsumerRebalances(rebalances int64)
ObserveConsumerTimeouts(timeouts int64)
ObserveConsumerError(errors int64)
ObserveConsumerDialTime(dialTime kafka.DurationStats)
ObserveConsumerReadTime(readTime kafka.DurationStats)
ObserveConsumerWaitTime(waitTime kafka.DurationStats)
ObserveConsumerFetchSize(fetchSize kafka.SummaryStats)
ObserveConsumerFetchBytes(fetchBytes kafka.SummaryStats)
ObserveConsumerOffset(offset int64)
ObserveConsumerLag(lag int64)
ObserveConsumerQueueLength(queueLength int64)
ObserveConsumerQueueCapacity(queueCapacity int64)
}
Linter directive: nolint:interfacebloat
type Metrics ¶ added in v1.47.0
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶ added in v1.47.0
func (*Metrics) Send ¶ added in v1.47.0
func (m *Metrics) Send(stats kafka.ReaderStats)
type Middleware ¶
type NoopObserver ¶
type NoopObserver struct{}
func (NoopObserver) BeginConsuming ¶
func (n NoopObserver) BeginConsuming()
func (NoopObserver) CloseDone ¶
func (n NoopObserver) CloseDone()
func (NoopObserver) CloseStart ¶
func (n NoopObserver) CloseStart()
func (NoopObserver) ConsumerError ¶
func (n NoopObserver) ConsumerError(err error)
type Observer ¶
type Observer interface {
ConsumerError(err error)
BeginConsuming()
CloseStart()
CloseDone()
}
type Option ¶
type Option func(p *Consumer)
func WithMiddlewares ¶
func WithMiddlewares(mws ...Middleware) Option
func WithObserver ¶
Click to show internal directories.
Click to hide internal directories.