Documentation ¶
Overview ¶
Package kafka provides a Kafka-backed implementation of source.Source via the franz-go client. Suitable for both Apache Kafka and Amazon MSK.
Semantics: at-least-once with manual offset marking. Each record's Ack callback marks the underlying Kafka record for commit; AutoCommitMarks then periodically advances the committed offset. Records that are not Ack'd before a consumer-group rebalance or process death are redelivered to the next consumer, so pipelines must deduplicate by EventID.
By default, EventID is "<topic>:<partition>:<offset>", which is globally unique within a Kafka cluster. Set Config.EventID to derive the key from the record instead.
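The deduplication requirement can be illustrated with a minimal sketch. It does not use this package: `eventID`, `seenSet`, and `firstTime` are hypothetical names, and the in-memory map stands in for whatever durable store a real pipeline would use. It only shows the default key format and how a redelivered offset would be skipped.

```go
package main

import "fmt"

// eventID builds a key in the default "<topic>:<partition>:<offset>" format.
// Hypothetical helper for illustration; not part of the package API.
func eventID(topic string, partition int32, offset int64) string {
	return fmt.Sprintf("%s:%d:%d", topic, partition, offset)
}

// seenSet is a hypothetical in-memory dedup store. A real pipeline would
// likely need a durable, bounded store so duplicates are caught after a restart.
type seenSet struct {
	seen map[string]struct{}
}

func newSeenSet() *seenSet {
	return &seenSet{seen: make(map[string]struct{})}
}

// firstTime records id and reports whether it had not been observed before.
func (s *seenSet) firstTime(id string) bool {
	if _, ok := s.seen[id]; ok {
		return false
	}
	s.seen[id] = struct{}{}
	return true
}

func main() {
	dedup := newSeenSet()

	// Simulate a redelivery: offset 42 arrives twice, as it could after a
	// rebalance that happens before the offset was committed.
	ids := []string{
		eventID("orders", 0, 41),
		eventID("orders", 0, 42),
		eventID("orders", 0, 42),
	}
	for _, id := range ids {
		if dedup.firstTime(id) {
			fmt.Println("process", id)
		} else {
			fmt.Println("skip duplicate", id)
		}
	}
}
```

In a real consumer the check would run before the record's side effects, and Ack would be called only after processing succeeds.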
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config[T any] struct {
	// Brokers is the seed broker list, e.g. {"localhost:9092"} or MSK bootstrap servers.
	Brokers []string

	// Topic is the Kafka topic to consume from.
	Topic string

	// ConsumerGroup is the Kafka consumer-group ID. All workers sharing this ID
	// cooperatively partition the topic between them.
	ConsumerGroup string

	// Decode converts a raw message value to T. Use JSONDecoder[T]() for JSON-encoded
	// records, or supply your own for Avro / Protobuf / etc.
	Decode Decoder[T]

	// OnDecodeError, if non-nil, is called for every message whose Decode returned
	// an error. The default behavior is to drop the record silently and advance,
	// which is fine for development but dangerous in production. Wire this to a DLQ
	// producer or a metrics.Recorder.RecordError to surface poison pills.
	OnDecodeError func(raw []byte, partition int32, offset int64, err error)

	// EventID, if non-nil, is called on every decoded record to compute the
	// at-least-once dedup key. The default is "<topic>:<partition>:<offset>",
	// which is globally unique within the cluster's history but only deduplicates
	// Kafka-side redeliveries (rebalance after a crash before commit). For
	// CDC pipelines where the same logical change can be produced twice
	// (Debezium retransmits, dual-write fixers), set this to extract the
	// upstream identifier (e.g. Mongo `_id`, Postgres LSN) from the record.
	EventID func(T) string

	// OnFetchError, if non-nil, is called for partition-level fetch errors that the
	// client is going to retry internally (broker bounce, leader change, etc).
	// Default: drop. Wire to logging / metrics to surface persistent failures.
	OnFetchError func(topic string, partition int32, err error)

	// Extra lets callers append additional franz-go options (TLS, SASL, etc).
	Extra []kgo.Opt
}
Config configures a Kafka Source.
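As a rough sketch of the two hooks most likely to matter in production, the functions below match the documented shapes of Config.EventID (`func(T) string`) and Config.OnDecodeError. Everything else is an assumption for illustration: `ChangeEvent`, its fields, the `table@lsn` key layout, and the counter are hypothetical, and the snippet deliberately avoids importing this package so that it runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// ChangeEvent is a hypothetical decoded CDC record. The fields are
// illustrative and do not come from this package.
type ChangeEvent struct {
	Table string
	LSN   uint64 // upstream log sequence number carried in the payload
}

// upstreamEventID has the shape of Config.EventID for T = ChangeEvent.
// It keys on the upstream identifier, so a change that is produced twice
// to Kafka (at different offsets) still maps to the same dedup key.
func upstreamEventID(ev ChangeEvent) string {
	return fmt.Sprintf("%s@%d", ev.Table, ev.LSN)
}

// decodeErrors counts undecodable records. A real handler might publish
// the raw bytes to a dead-letter topic instead of only counting.
var decodeErrors atomic.Int64

// onDecodeError has the shape of Config.OnDecodeError.
func onDecodeError(raw []byte, partition int32, offset int64, err error) {
	decodeErrors.Add(1)
	fmt.Printf("decode error: partition=%d offset=%d bytes=%d err=%v\n",
		partition, offset, len(raw), err)
}

func main() {
	// Two deliveries of the same logical change share one key.
	a := upstreamEventID(ChangeEvent{Table: "users", LSN: 1007})
	b := upstreamEventID(ChangeEvent{Table: "users", LSN: 1007})
	fmt.Println(a, a == b)

	// Simulate the source reporting a poison pill.
	onDecodeError([]byte("{broken"), 2, 311, errors.New("unexpected end of JSON input"))
	fmt.Println("decode errors so far:", decodeErrors.Load())
}
```

With the real package, these functions would presumably be assigned to the EventID and OnDecodeError fields of a `Config[ChangeEvent]` value.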
type Decoder ¶
Decoder converts a raw Kafka message value to a typed Record value.
func JSONDecoder ¶
JSONDecoder returns a Decoder that unmarshals JSON into T.
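The definition of Decoder is not shown on this page, so the following is only a sketch under a stated assumption: that a decoder is a function from raw bytes to `(T, error)`. The lowercase names `decoder` and `jsonDecoder`, and the `order` type, are hypothetical stand-ins, not the package's own declarations. The sketch shows how a generic JSON decoder of that shape could be written and how a malformed payload surfaces as an error.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decoder is an ASSUMED shape for a typed decoder: raw bytes in, T or an
// error out. The package's real Decoder[T] may be defined differently.
type decoder[T any] func(raw []byte) (T, error)

// jsonDecoder returns a decoder that unmarshals JSON into a fresh T.
func jsonDecoder[T any]() decoder[T] {
	return func(raw []byte) (T, error) {
		var v T
		err := json.Unmarshal(raw, &v)
		return v, err
	}
}

// order is a hypothetical record type used only for this example.
type order struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

func main() {
	dec := jsonDecoder[order]()

	// A well-formed payload decodes into the typed value.
	o, err := dec([]byte(`{"id":"a1","total":3}`))
	fmt.Println(o.ID, o.Total, err)

	// A malformed payload returns an error; in the real Source this is the
	// case that would reach Config.OnDecodeError.
	_, err = dec([]byte(`not json`))
	fmt.Println("decode failed:", err != nil)
}
```

A decoder for Avro or Protobuf would follow the same pattern, swapping `json.Unmarshal` for the corresponding library call.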
type Source ¶
type Source[T any] struct { // contains filtered or unexported fields }
Source reads from a Kafka topic and yields source.Records.
func NewSource ¶
NewSource constructs a Kafka Source. The returned Source owns the underlying franz-go client; call Close to shut down cleanly.