Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MatchesFilter ¶ added in v1.3.0
func MatchesFilter(msg *sarama.ConsumerMessage, filter []byte) bool
MatchesFilter reports whether a Kafka message matches the given filter bytes. If filter is empty, all messages match. Otherwise, it performs an exact byte-substring search on the raw binary message value.
func NewHandler ¶
func NewHandler( changesProvider ChangesProvider, ) libhttp.WithError
Types ¶
type ChangesProvider ¶
type ChangesProvider interface {
Changes(
ctx context.Context,
topic libkafka.Topic,
partition libkafka.Partition,
offset libkafka.Offset,
limit uint64,
filter []byte,
) (Records, error)
}
func NewChangesProvider ¶
func NewChangesProvider( sentryClient sentry.Client, saramaClient libkafka.SaramaClient, converter Converter, logSamplerFactory log.SamplerFactory, ) ChangesProvider
type Converter ¶
type Converter interface {
Convert(ctx context.Context, msg *sarama.ConsumerMessage) (*Record, error)
}
func NewConverter ¶
Click to show internal directories.
Click to hide internal directories.