kafka

package
v1.9.0
Published: May 7, 2026 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

const SinkType = "kafka"

Variables

This section is empty.

Functions

func InitSaramaConfig added in v1.8.10

func InitSaramaConfig(config *ProducerConfig, maxExportBatchSize int) (*sarama.Config, error)

InitSaramaConfig builds a *sarama.Config from ProducerConfig. maxExportBatchSize sets Producer.Flush.Messages so that Sarama's internal flush threshold stays aligned with the BatchItemProcessor batch size.

func NewSyncProducer

func NewSyncProducer(config *ProducerConfig, maxExportBatchSize int) (sarama.SyncProducer, error)

NewSyncProducer creates a new Sarama SyncProducer from ProducerConfig. maxExportBatchSize sets the Sarama flush message threshold so that Sarama and the BatchItemProcessor flush at the same boundary.

Types

type CompressionStrategy

type CompressionStrategy string

CompressionStrategy defines the compression codec for Kafka messages.

var (
	CompressionStrategyNone   CompressionStrategy = "none"
	CompressionStrategyGZIP   CompressionStrategy = "gzip"
	CompressionStrategySnappy CompressionStrategy = "snappy"
	CompressionStrategyLZ4    CompressionStrategy = "lz4"
	CompressionStrategyZSTD   CompressionStrategy = "zstd"
)

type Config

type Config struct {
	ProducerConfig     `yaml:",inline"`
	Topic              string        `yaml:"topic"`
	TopicPattern       string        `yaml:"topicPattern"`
	MaxQueueSize       int           `yaml:"maxQueueSize" default:"51200"`
	BatchTimeout       time.Duration `yaml:"batchTimeout" default:"5s"`
	MaxExportBatchSize int           `yaml:"maxExportBatchSize" default:"2048"`
	Workers            int           `yaml:"workers" default:"5"`
}

Config is the top-level configuration for the kafka output sink. It embeds ProducerConfig for Sarama producer settings and adds sink-specific fields like Topic/TopicPattern and batching parameters.

Exactly one of Topic or TopicPattern must be set. TopicPattern supports two template variables: ${EVENT_NAME} (the event name in SCREAMING_SNAKE_CASE) and ${event-name} (the event name in kebab-case).
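
As an illustration of how the two template variables might expand, here is a minimal sketch. resolveTopic is a hypothetical helper, not part of this package, and the event name MY_EVENT_NAME is made up; the sketch also assumes the event name is available in SCREAMING_SNAKE form and that the kebab form is derived by lowercasing and swapping underscores for hyphens.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveTopic is a hypothetical helper (not part of the kafka package).
// It expands the two documented template variables in a topic pattern.
// Assumption: eventName is supplied in SCREAMING_SNAKE form.
func resolveTopic(pattern, eventName string) string {
	// Derive the kebab form: lowercase, underscores become hyphens.
	kebab := strings.ToLower(strings.ReplaceAll(eventName, "_", "-"))

	out := strings.ReplaceAll(pattern, "${EVENT_NAME}", eventName)
	out = strings.ReplaceAll(out, "${event-name}", kebab)

	return out
}

func main() {
	// MY_EVENT_NAME is an illustrative event name only.
	fmt.Println(resolveTopic("events-${event-name}", "MY_EVENT_NAME"))
	fmt.Println(resolveTopic("events.${EVENT_NAME}", "MY_EVENT_NAME"))
}
```

With a pattern like this, each event type lands in its own topic, whereas a static Topic sends every event to one topic.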

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the Config for correctness.
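
The checks that Validate performs are not listed on this page. The sketch below only illustrates the "exactly one of Topic or TopicPattern" rule stated for Config; topicConfig and validateTopic are hypothetical stand-ins, and the error messages are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// topicConfig is a hypothetical stand-in that mirrors only the two
// topic-related fields of Config.
type topicConfig struct {
	Topic        string
	TopicPattern string
}

// validateTopic enforces the documented rule that exactly one of
// Topic or TopicPattern is set. The error texts are illustrative.
func validateTopic(c topicConfig) error {
	switch {
	case c.Topic == "" && c.TopicPattern == "":
		return errors.New("one of topic or topicPattern is required")
	case c.Topic != "" && c.TopicPattern != "":
		return errors.New("topic and topicPattern are mutually exclusive")
	}

	return nil
}

func main() {
	fmt.Println(validateTopic(topicConfig{Topic: "events"}))
	fmt.Println(validateTopic(topicConfig{}))
	fmt.Println(validateTopic(topicConfig{Topic: "events", TopicPattern: "events-${event-name}"}))
}
```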

type EncodingType added in v1.8.10

type EncodingType string

EncodingType defines the serialization format for Kafka message values.

var (
	EncodingTypeJSON     EncodingType = "json"
	EncodingTypeProtobuf EncodingType = "protobuf"
)

type ItemExporter

type ItemExporter struct {
	// contains filtered or unexported fields
}

ItemExporter sends batches of events to Kafka topics. It supports both a static Topic and a dynamic TopicPattern that resolves per event.

func NewItemExporter

func NewItemExporter(
	name string,
	config *Config,
	log logrus.FieldLogger,
	producer sarama.SyncProducer,
) ItemExporter

NewItemExporter creates a new ItemExporter using the provided SyncProducer. The caller is responsible for creating the producer (see NewSyncProducer).

func (ItemExporter) ExportItems

func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error

ExportItems sends a batch of decorated events to Kafka.

func (ItemExporter) Shutdown

func (e ItemExporter) Shutdown(_ context.Context) error

Shutdown closes the underlying Kafka producer, draining in-flight messages.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is a sink that writes decorated events to Apache Kafka.

func New

func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig, shippingMethod processor.ShippingMethod) (*Kafka, error)

New creates a new Kafka sink. It builds the Sarama SyncProducer and wires it into the ItemExporter used by the batch processor.

func (*Kafka) HandleNewDecoratedEvent

func (h *Kafka) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error

HandleNewDecoratedEvent processes a single event through the filter and batch processor.

func (*Kafka) HandleNewDecoratedEvents

func (h *Kafka) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error

HandleNewDecoratedEvents processes a batch of events through the filter and batch processor.

func (*Kafka) Name

func (h *Kafka) Name() string

Name returns the configured name of this sink.

func (*Kafka) Start

func (h *Kafka) Start(ctx context.Context) error

Start begins the batch processor.

func (*Kafka) Stop

func (h *Kafka) Stop(ctx context.Context) error

Stop shuts down the batch processor.

func (*Kafka) Type

func (h *Kafka) Type() string

Type returns the sink type identifier.

type PartitionStrategy

type PartitionStrategy string

PartitionStrategy defines the partitioning strategy for produced messages.

var (
	PartitionStrategyNone   PartitionStrategy = "none"
	PartitionStrategyRandom PartitionStrategy = "random"
)

type ProducerConfig added in v1.8.10

type ProducerConfig struct {
	Brokers         string           `yaml:"brokers"`
	TLS             bool             `yaml:"tls" default:"false"`
	TLSClientConfig *TLSClientConfig `yaml:"tlsClientConfig"`
	SASLConfig      *SASLConfig      `yaml:"sasl"`
	FlushFrequency  time.Duration    `yaml:"flushFrequency" default:"10s"`
	FlushBytes      int              `yaml:"flushBytes" default:"1000000"`
	// MaxMessageBytes is the maximum permitted size of a message produced
	// by this client. Must be set <= the broker's message.max.bytes. Default
	// 1000000 matches sarama's historical default; raise this when publishing
	// large payloads (e.g. blob-tx mempool events with sidecars).
	MaxMessageBytes int                 `yaml:"maxMessageBytes" default:"1000000"`
	MaxRetries      int                 `yaml:"maxRetries" default:"3"`
	Compression     CompressionStrategy `yaml:"compression" default:"none"`
	RequiredAcks    RequiredAcks        `yaml:"requiredAcks" default:"leader"`
	Partitioning    PartitionStrategy   `yaml:"partitioning" default:"none"`
	Encoding        EncodingType        `yaml:"encoding" default:"json"`
	Version         string              `yaml:"version"`
}

ProducerConfig contains the fields needed to build a Sarama producer. It is shared by all Kafka-based sinks.

func (*ProducerConfig) MarshalEvent added in v1.8.10

func (c *ProducerConfig) MarshalEvent(m proto.Message) ([]byte, error)

MarshalEvent serializes a proto message using the configured encoding.

func (*ProducerConfig) Validate added in v1.8.10

func (c *ProducerConfig) Validate() error

Validate checks the ProducerConfig for correctness.

type RequiredAcks

type RequiredAcks string

RequiredAcks defines the acknowledgment level for produced messages.

var (
	RequiredAcksLeader RequiredAcks = "leader"
	RequiredAcksAll    RequiredAcks = "all"
	RequiredAcksNone   RequiredAcks = "none"
)
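
For orientation, the sketch below maps these three names onto Kafka's numeric acks setting (0 for no acknowledgment, 1 for the partition leader only, -1 for all in-sync replicas). How this package translates the strings internally is not shown on this page, so the mapping is an assumption based on the names; requiredAcks and kafkaAcks are hypothetical local stand-ins.

```go
package main

import "fmt"

// requiredAcks is a local stand-in for the package's RequiredAcks type.
type requiredAcks string

const (
	acksLeader requiredAcks = "leader"
	acksAll    requiredAcks = "all"
	acksNone   requiredAcks = "none"
)

// kafkaAcks returns the Kafka protocol acks value for a level name.
// Assumption: the names follow Kafka's usual semantics.
func kafkaAcks(a requiredAcks) (int16, error) {
	switch a {
	case acksNone:
		return 0, nil // fire and forget, no broker acknowledgment
	case acksLeader:
		return 1, nil // the partition leader has written the message
	case acksAll:
		return -1, nil // all in-sync replicas have the message
	default:
		return 0, fmt.Errorf("unknown required acks %q", a)
	}
}

func main() {
	for _, level := range []requiredAcks{acksLeader, acksAll, acksNone} {
		v, err := kafkaAcks(level)
		fmt.Println(level, v, err)
	}
}
```

The trade-off is the usual one: "none" gives the lowest latency with no delivery guarantee, while "all" waits for replication and is the most durable.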

type SASLConfig added in v1.1.17

type SASLConfig struct {
	Mechanism    SASLMechanism `yaml:"mechanism" default:"PLAIN"`
	Version      int16         `yaml:"version" default:"1"`
	User         string        `yaml:"user"`
	Password     string        `yaml:"password"`
	PasswordFile string        `yaml:"passwordFile"`
}

SASLConfig holds SASL authentication configuration.
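
SASLConfig carries both a Password and a PasswordFile field, but this page does not say how the two interact. The sketch below shows one plausible resolution order, purely as an assumption: an inline password wins, otherwise the file is read and surrounding whitespace is trimmed. resolvePassword is a hypothetical helper, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// resolvePassword is a hypothetical helper (not part of the kafka package).
// Assumption: an inline password takes precedence over a password file,
// and the file contains nothing but the secret.
func resolvePassword(password, passwordFile string) (string, error) {
	if password != "" {
		return password, nil
	}

	if passwordFile == "" {
		return "", errors.New("neither password nor passwordFile is set")
	}

	raw, err := os.ReadFile(passwordFile)
	if err != nil {
		return "", fmt.Errorf("read password file: %w", err)
	}

	// Trim the trailing newline that editors and secret mounts often add.
	return strings.TrimSpace(string(raw)), nil
}

func main() {
	pw, err := resolvePassword("s3cret", "")
	fmt.Println(pw, err)
}
```

Reading the secret from a file keeps it out of the YAML configuration, which is useful when the config is committed or when the secret is mounted by an orchestrator.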

func (*SASLConfig) Validate added in v1.1.17

func (c *SASLConfig) Validate() error

Validate checks the SASLConfig for correctness.

type SASLMechanism added in v1.1.17

type SASLMechanism string

SASLMechanism defines the SASL authentication mechanism.

var (
	SASLTypeOAuth       SASLMechanism = "OAUTHBEARER"
	SASLTypePlaintext   SASLMechanism = "PLAIN"
	SASLTypeSCRAMSHA256 SASLMechanism = "SCRAM-SHA-256"
	SASLTypeSCRAMSHA512 SASLMechanism = "SCRAM-SHA-512"
	SASLTypeGSSAPI      SASLMechanism = "GSSAPI"
)

type TLSClientConfig added in v1.0.12

type TLSClientConfig struct {
	CertificatePath string `yaml:"certificatePath"`
	KeyPath         string `yaml:"keyPath"`
	CACertificate   string `yaml:"caCertificate"`
}

TLSClientConfig holds mTLS client certificate configuration.
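
A minimal sketch of how these three fields could be turned into a standard library tls.Config, using only crypto/tls and crypto/x509. buildTLSConfig is a hypothetical helper, and the sketch assumes that CACertificate is a path to a PEM file, which this page does not confirm.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical helper (not part of the kafka package).
// Assumption: all three arguments are file paths to PEM-encoded data.
func buildTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}

	// Client certificate and key for mutual TLS, if provided.
	if certPath != "" || keyPath != "" {
		cert, err := tls.LoadX509KeyPair(certPath, keyPath)
		if err != nil {
			return nil, fmt.Errorf("load client key pair: %w", err)
		}

		cfg.Certificates = []tls.Certificate{cert}
	}

	// Custom CA used to verify the broker certificate, if provided.
	if caPath != "" {
		pem, err := os.ReadFile(caPath)
		if err != nil {
			return nil, fmt.Errorf("read CA certificate: %w", err)
		}

		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no valid certificates found in CA file")
		}

		cfg.RootCAs = pool
	}

	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig("", "", "")
	fmt.Println(cfg != nil, err)
}
```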

func (*TLSClientConfig) Validate added in v1.0.12

func (c *TLSClientConfig) Validate() error

Validate checks the TLSClientConfig for correctness.
