kafka

package
v0.1.0
Published: May 12, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package kafka provides a Kafka-backed implementation of source.Source via the franz-go client. Suitable for both Apache Kafka and Amazon MSK.

Semantics: at-least-once delivery with manual offset marking. Each record's Ack callback marks the underlying Kafka record for commit; AutoCommitMarks then periodically advances the committed offset. Records that are not Ack'd before a consumer-group rebalance or process death are redelivered to the next consumer, so pipelines must deduplicate by EventID.

By default, EventID is "<topic>:<partition>:<offset>", which is unique within a Kafka cluster. Set Config.EventID to derive the key from the decoded record instead.
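The redelivery semantics above mean a downstream stage may see the same EventID twice. The following is a minimal sketch of the default ID format and an in-memory dedup check. The names defaultEventID and seenSet are hypothetical and not part of this package; a production pipeline would more likely use a bounded or persistent store.

```go
package main

import (
	"fmt"
	"strconv"
)

// defaultEventID mirrors the documented default format
// "<topic>:<partition>:<offset>". The name is hypothetical; it is not
// part of the package API.
func defaultEventID(topic string, partition int32, offset int64) string {
	return topic + ":" +
		strconv.FormatInt(int64(partition), 10) + ":" +
		strconv.FormatInt(offset, 10)
}

// seenSet is a hypothetical in-memory dedup set keyed by EventID.
// It grows without bound, so it is only suitable for illustration.
type seenSet map[string]struct{}

// firstTime reports whether id has not been seen before, and records it.
func (s seenSet) firstTime(id string) bool {
	if _, ok := s[id]; ok {
		return false
	}
	s[id] = struct{}{}
	return true
}

func main() {
	seen := seenSet{}
	// Offset 42 appears twice, as it would after a redelivery.
	for _, off := range []int64{41, 42, 42, 43} {
		id := defaultEventID("orders", 3, off)
		if seen.firstTime(id) {
			fmt.Println("process", id)
		} else {
			fmt.Println("skip duplicate", id)
		}
	}
}
```

The check-then-record step is done in one call so that a caller cannot forget to register an ID after processing it.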

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	// Brokers is the seed broker list, e.g. {"localhost:9092"} or MSK bootstrap servers.
	Brokers []string

	// Topic is the Kafka topic to consume from.
	Topic string

	// ConsumerGroup is the Kafka consumer-group ID. All workers sharing this ID
	// cooperatively partition the topic between them.
	ConsumerGroup string

	// Decode converts a raw message value to T. Use JSONDecoder[T]() for JSON-encoded
	// records, or supply your own for Avro / Protobuf / etc.
	Decode Decoder[T]

	// OnDecodeError, if non-nil, is called for every message whose Decode returned
	// an error. The default behavior is to drop the record silently and advance —
	// fine for development but dangerous in production. Wire this to a DLQ producer
	// or a metrics.Recorder.RecordError to surface poison pills.
	OnDecodeError func(raw []byte, partition int32, offset int64, err error)

	// EventID, if non-nil, is called on every decoded record to compute the
	// at-least-once dedup key. The default is "<topic>:<partition>:<offset>"
	// — globally unique within the cluster's history but only deduplicates
	// Kafka-side redeliveries (rebalance after a crash before commit). For
	// CDC pipelines where the same logical change can be produced twice
	// (Debezium retransmits, dual-write fixers), set this to extract the
	// upstream identifier (e.g. Mongo `_id`, Postgres LSN) from the record.
	EventID func(T) string

	// OnFetchError, if non-nil, is called for partition-level fetch errors that the
	// client is going to retry internally (broker bounce, leader change, etc).
	// Default: drop. Wire to logging / metrics to surface persistent failures.
	OnFetchError func(topic string, partition int32, err error)

	// Extra lets callers append additional franz-go options (TLS, SASL, etc).
	Extra []kgo.Opt
}

Config configures a Kafka Source.
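As an illustration of the two callback fields above, here is a sketch of functions whose signatures match OnDecodeError and EventID as declared in Config. Everything else is an assumption for the example: poisonLog, changeEvent, and upstreamEventID are hypothetical names, and the mutex is there only because this documentation does not say whether callbacks may be invoked concurrently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// poisonLog is a hypothetical collector for records that failed to decode.
// In production this might forward to a dead-letter topic or a metrics sink.
type poisonLog struct {
	mu      sync.Mutex
	entries []string
}

// onDecodeError has the same signature as the Config.OnDecodeError field.
func (p *poisonLog) onDecodeError(raw []byte, partition int32, offset int64, err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.entries = append(p.entries,
		fmt.Sprintf("partition=%d offset=%d bytes=%d err=%v", partition, offset, len(raw), err))
}

// changeEvent is a hypothetical CDC payload type standing in for T.
type changeEvent struct {
	Collection string
	DocID      string
}

// upstreamEventID has the shape of Config.EventID (func(T) string). It keys
// on the upstream identifier, so the same logical change produced twice
// maps to the same dedup key.
func upstreamEventID(e changeEvent) string {
	return e.Collection + ":" + e.DocID
}

func main() {
	pl := &poisonLog{}
	pl.onDecodeError([]byte("{bad"), 0, 17, errors.New("unexpected end of JSON input"))
	fmt.Println(pl.entries[0])
	fmt.Println(upstreamEventID(changeEvent{Collection: "users", DocID: "abc123"}))
}
```

With the real package, these would presumably be assigned as OnDecodeError: pl.onDecodeError and EventID: upstreamEventID in a Config[changeEvent] literal.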

type Decoder

type Decoder[T any] func([]byte) (T, error)

Decoder converts a raw Kafka message value to a typed Record value.

func JSONDecoder

func JSONDecoder[T any]() Decoder[T]

JSONDecoder returns a Decoder that unmarshals JSON into T.
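For readers writing their own Decoder, the sketch below shows one plausible way a JSON decoder with this signature could be built on encoding/json. It is not the package's actual implementation: the Decoder type is redeclared locally so the example compiles on its own, and jsonDecoderSketch and order are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Decoder mirrors the documented type so this sketch is self-contained.
type Decoder[T any] func([]byte) (T, error)

// jsonDecoderSketch returns a Decoder that unmarshals JSON into a fresh T.
// On failure it returns whatever json.Unmarshal left in v plus the error.
func jsonDecoderSketch[T any]() Decoder[T] {
	return func(raw []byte) (T, error) {
		var v T
		err := json.Unmarshal(raw, &v)
		return v, err
	}
}

// order is a hypothetical record payload.
type order struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

func main() {
	decode := jsonDecoderSketch[order]()

	// A well-formed message decodes into the typed value.
	o, err := decode([]byte(`{"id":"o-1","total":250}`))
	fmt.Println(o.ID, o.Total, err)

	// A truncated message yields an error, which is the case
	// Config.OnDecodeError is meant to surface.
	_, err = decode([]byte(`{"id":`))
	fmt.Println("decode failed:", err != nil)
}
```

A decoder for Avro or Protobuf would follow the same shape: a closure that takes the raw bytes and returns a T and an error.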

type Source

type Source[T any] struct {
	// contains filtered or unexported fields
}

Source reads from a Kafka topic and yields source.Records.

func NewSource

func NewSource[T any](cfg Config[T]) (*Source[T], error)

NewSource constructs a Kafka Source. The returned Source owns the underlying franz-go client; call Close to shut down cleanly.

func (*Source[T]) Close

func (s *Source[T]) Close() error

Close commits any outstanding marked offsets and shuts down the client.

func (*Source[T]) Name

func (s *Source[T]) Name() string

Name returns "kafka:<topic>".

func (*Source[T]) Read

func (s *Source[T]) Read(ctx context.Context, out chan<- source.Record[T]) error

Read polls the consumer group and sends decoded records to out until ctx is canceled. It returns nil on graceful shutdown and a non-nil error only on a fatal client error.
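The calling pattern this signature implies can be sketched with a test double. The example below is built on assumptions: record is a hypothetical stand-in for source.Record[T] (its real field names are not shown on this page), fakeSource imitates the documented Read contract, and consume is an illustrative helper. It runs Read in a goroutine, acknowledges each record only after processing it, and then cancels the context to shut down.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// record is a hypothetical stand-in for source.Record[T].
type record[T any] struct {
	EventID string
	Value   T
	Ack     func()
}

// reader captures the documented shape of Source.Read.
type reader[T any] interface {
	Read(ctx context.Context, out chan<- record[T]) error
}

// fakeSource emits a fixed set of values, then blocks until ctx is
// canceled and returns nil, imitating a graceful shutdown.
type fakeSource struct {
	values []string
	acked  atomic.Int64 // number of Ack calls observed
}

func (f *fakeSource) Read(ctx context.Context, out chan<- record[string]) error {
	for i, v := range f.values {
		rec := record[string]{
			EventID: fmt.Sprintf("demo:0:%d", i),
			Value:   v,
			Ack:     func() { f.acked.Add(1) },
		}
		select {
		case out <- rec:
		case <-ctx.Done():
			return nil
		}
	}
	<-ctx.Done()
	return nil
}

// consume processes n records from src, then cancels the context and waits
// for Read to return. n must not exceed the number of records available.
func consume(src reader[string], n int) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := make(chan record[string])
	errc := make(chan error, 1)
	go func() { errc <- src.Read(ctx, out) }()

	var got []string
	for len(got) < n {
		select {
		case rec := <-out:
			got = append(got, rec.Value) // process first ...
			rec.Ack()                    // ... then acknowledge
		case err := <-errc:
			return got, err // Read stopped early
		}
	}
	cancel()
	return got, <-errc
}

func main() {
	src := &fakeSource{values: []string{"a", "b", "c"}}
	got, err := consume(src, 3)
	fmt.Println(got, err, src.acked.Load())
}
```

Acknowledging after processing, rather than on receipt, is what keeps the at-least-once guarantee: a crash between the two steps leads to a redelivery instead of a lost record.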
