Documentation
¶
Overview ¶
Package kafka provides a Kafka-backed Publisher for the outbox dispatcher.
This package wraps github.com/segmentio/kafka-go and is the only place in the codebase that imports Kafka client code. All other packages depend only on the dispatch.Publisher interface.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaConfig ¶
type KafkaConfig struct {
// Brokers is the list of seed broker addresses in "host:port" form.
// At least one address is required.
Brokers []string
// ClientID is an optional string sent to the broker on every connection
// for observability (visible in broker logs and metrics). Applied to the
// kafka-go Transport.ClientID field.
ClientID string
// RequiredAcks controls the acknowledgement level:
// -1 = all in-sync replicas (strongest, default)
// 0 = no acknowledgement (fire and forget; not recommended)
// 1 = leader only
//
// For outbox delivery RequiredAcksAll (-1) is recommended because it
// provides the strongest durability guarantee before MarkPublished is called.
RequiredAcks RequiredAcks
// WriteTimeout bounds each WriteMessages call. Zero means no per-write
// timeout; context cancellation remains the only deadline in that case.
WriteTimeout time.Duration
}
KafkaConfig holds the broker-level configuration for KafkaPublisher.
func DefaultKafkaConfig ¶
func DefaultKafkaConfig() KafkaConfig
DefaultKafkaConfig returns production-safe defaults.
type KafkaPublisher ¶
type KafkaPublisher struct {
// contains filtered or unexported fields
}
KafkaPublisher implements dispatch.Publisher using segmentio/kafka-go.
Each Publish call writes one message to the configured Kafka topic using the writer's internal batching. The writer is created per-topic by kafka-go's transport layer; callers do not need to manage per-topic writers explicitly.
Publish blocks until the broker acknowledges receipt according to the RequiredAcks setting or until ctx is cancelled.
func NewKafkaPublisher ¶
func NewKafkaPublisher(cfg KafkaConfig) (*KafkaPublisher, error)
NewKafkaPublisher constructs a KafkaPublisher from cfg. cfg.Brokers must be non-empty; all other fields are optional.
Config field mapping to kafka-go v0.4.50:
- Brokers → kafka.Writer.Addr (via kafka.TCP)
- RequiredAcks → kafka.Writer.RequiredAcks
- WriteTimeout → kafka.Writer.WriteTimeout (applied when > 0)
- ClientID → kafka.Transport.ClientID (Transport is set on the Writer)
The returned publisher is safe for concurrent use. Call Close when the publisher is no longer needed to flush pending writes and release resources.
func (*KafkaPublisher) Close ¶
func (p *KafkaPublisher) Close() error
Close flushes any pending writes and closes the underlying Kafka connection. It should be called when the publisher is no longer needed.
func (*KafkaPublisher) Publish ¶
Publish sends msg to the Kafka topic named in msg.Topic. It blocks until the broker acknowledges receipt or ctx is cancelled.
A non-nil error means the message may or may not have been delivered. The dispatcher will leave the outbox row unpublished and retry on the next cycle.
type RequiredAcks ¶
type RequiredAcks int
RequiredAcks mirrors the kafka-go RequiredAcks type with named constants.
const ( // RequiredAcksNone instructs the broker to not acknowledge the write. // Messages may be lost on leader failure. Not recommended for outbox use. RequiredAcksNone RequiredAcks = 0 // RequiredAcksLeader instructs the broker to acknowledge after the leader // has written to its log. Provides reasonable durability without full ISR sync. RequiredAcksLeader RequiredAcks = 1 // RequiredAcksAll instructs the broker to acknowledge only after all // in-sync replicas have written the message. Strongest durability guarantee. RequiredAcksAll RequiredAcks = -1 )