pkg

package
v1.6.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: BSD-2-Clause Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MatchesFilter added in v1.3.0

func MatchesFilter(msg *sarama.ConsumerMessage, filter []byte) bool

MatchesFilter reports whether a Kafka message matches the given filter bytes. If filter is empty, all messages match. Otherwise, it performs an exact byte-substring search on the raw binary message value.

func NewHandler

func NewHandler(
	changesProvider ChangesProvider,
) libhttp.WithError

Types

type ChangesProvider

// ChangesProvider is implemented by types that can return Records for a
// given topic, partition and offset.
type ChangesProvider interface {
	// Changes returns Records for the given topic and partition, or an error.
	//
	// NOTE(review): offset presumably marks where reading starts, limit
	// presumably caps the number of returned records, and filter is
	// presumably matched against message values (see MatchesFilter) —
	// confirm against the implementation.
	Changes(
		ctx context.Context,
		topic libkafka.Topic,
		partition libkafka.Partition,
		offset libkafka.Offset,
		limit uint64,
		filter []byte,
	) (Records, error)
}

func NewChangesProvider

func NewChangesProvider(
	sentryClient sentry.Client,
	saramaClient libkafka.SaramaClient,
	converter Converter,
	logSamplerFactory log.SamplerFactory,
) ChangesProvider

type Converter

// Converter turns a consumed Kafka message into a Record.
type Converter interface {
	// Convert maps msg to a *Record, or returns an error.
	Convert(ctx context.Context, msg *sarama.ConsumerMessage) (*Record, error)
}

func NewConverter

func NewConverter(errorPreviewContentLength int) Converter

type Page

// Page is a JSON-serializable batch of Records together with an optional
// offset.
type Page struct {
	// NextOffset is optional: a nil pointer is left out of the JSON output
	// (omitempty).
	// NOTE(review): presumably the offset to request for the following
	// page — confirm against the handler.
	NextOffset *libkafka.Offset `json:"nextOffset,omitempty"`
	// Records holds the records of this page, encoded under "records".
	Records    Records          `json:"records"`
}

type Record

// Record is the JSON representation of a single message: its key, value and
// header plus its position (topic, partition, offset).
type Record struct {
	Key       string             `json:"key"`
	// Value is untyped.
	// NOTE(review): presumably the decoded message payload produced by a
	// Converter — confirm against the Converter implementation.
	Value     interface{}        `json:"value"`
	Offset    libkafka.Offset    `json:"offset"`
	Partition libkafka.Partition `json:"partition"`
	Topic     libkafka.Topic     `json:"topic"`
	Header    libkafka.Header    `json:"header"`
}

type Records

// Records is a list of Record values.
type Records []Record

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL