Documentation
¶
Index ¶
- Constants
- func InitSaramaConfig(config *ProducerConfig, maxExportBatchSize int) (*sarama.Config, error)
- func NewSyncProducer(config *ProducerConfig, maxExportBatchSize int) (sarama.SyncProducer, error)
- type CompressionStrategy
- type Config
- type EncodingType
- type ItemExporter
- type Kafka
- func (h *Kafka) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
- func (h *Kafka) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
- func (h *Kafka) Name() string
- func (h *Kafka) Start(ctx context.Context) error
- func (h *Kafka) Stop(ctx context.Context) error
- func (h *Kafka) Type() string
- type PartitionStrategy
- type ProducerConfig
- type RequiredAcks
- type SASLConfig
- type SASLMechanism
- type TLSClientConfig
Constants ¶
const SinkType = "kafka"
Variables ¶
This section is empty.
Functions ¶
func InitSaramaConfig ¶ added in v1.8.10
func InitSaramaConfig(config *ProducerConfig, maxExportBatchSize int) (*sarama.Config, error)
InitSaramaConfig builds a *sarama.Config from ProducerConfig. maxExportBatchSize sets Producer.Flush.Messages so that Sarama's internal flush threshold stays aligned with the BatchItemProcessor batch size.
func NewSyncProducer ¶
func NewSyncProducer(config *ProducerConfig, maxExportBatchSize int) (sarama.SyncProducer, error)
NewSyncProducer creates a new Sarama SyncProducer from ProducerConfig. maxExportBatchSize is used to set the Sarama flush message threshold so that both layers flush at the same boundary.
Types ¶
type CompressionStrategy ¶
type CompressionStrategy string
CompressionStrategy defines the compression codec for Kafka messages.
var ( CompressionStrategyNone CompressionStrategy = "none" CompressionStrategyGZIP CompressionStrategy = "gzip" CompressionStrategySnappy CompressionStrategy = "snappy" CompressionStrategyLZ4 CompressionStrategy = "lz4" CompressionStrategyZSTD CompressionStrategy = "zstd" )
type Config ¶
type Config struct {
ProducerConfig `yaml:",inline"`
Topic string `yaml:"topic"`
TopicPattern string `yaml:"topicPattern"`
MaxQueueSize int `yaml:"maxQueueSize" default:"51200"`
BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"`
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"2048"`
Workers int `yaml:"workers" default:"5"`
}
Config is the top-level configuration for the kafka output sink. It embeds ProducerConfig for Sarama producer settings and adds sink-specific fields like Topic/TopicPattern and batching parameters.
Exactly one of Topic or TopicPattern must be set. TopicPattern supports template variables ${EVENT_NAME} (SCREAMING_SNAKE) and ${event-name} (kebab).
type EncodingType ¶ added in v1.8.10
type EncodingType string
EncodingType defines the serialization format for Kafka message values.
var ( EncodingTypeJSON EncodingType = "json" EncodingTypeProtobuf EncodingType = "protobuf" )
type ItemExporter ¶
type ItemExporter struct {
// contains filtered or unexported fields
}
ItemExporter sends batches of events to Kafka topics. It supports both a static Topic and a dynamic TopicPattern that resolves per event.
func NewItemExporter ¶
func NewItemExporter( name string, config *Config, log logrus.FieldLogger, producer sarama.SyncProducer, ) ItemExporter
NewItemExporter creates a new ItemExporter using the provided SyncProducer. The caller is responsible for creating the producer (see NewSyncProducer).
func (ItemExporter) ExportItems ¶
func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error
ExportItems sends a batch of decorated events to Kafka.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka is a sink that writes decorated events to Apache Kafka.
func New ¶
func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig, shippingMethod processor.ShippingMethod) (*Kafka, error)
New creates a new Kafka sink. It builds the Sarama SyncProducer and wires it into the ItemExporter used by the batch processor.
func (*Kafka) HandleNewDecoratedEvent ¶
HandleNewDecoratedEvent processes a single event through the filter and batch processor.
func (*Kafka) HandleNewDecoratedEvents ¶
HandleNewDecoratedEvents processes a batch of events through the filter and batch processor.
type PartitionStrategy ¶
type PartitionStrategy string
PartitionStrategy defines the partitioning strategy for produced messages.
var ( PartitionStrategyNone PartitionStrategy = "none" PartitionStrategyRandom PartitionStrategy = "random" )
type ProducerConfig ¶ added in v1.8.10
type ProducerConfig struct {
Brokers string `yaml:"brokers"`
TLS bool `yaml:"tls" default:"false"`
TLSClientConfig *TLSClientConfig `yaml:"tlsClientConfig"`
SASLConfig *SASLConfig `yaml:"sasl"`
FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"`
FlushBytes int `yaml:"flushBytes" default:"1000000"`
// MaxMessageBytes is the maximum permitted size of a message produced
// by this client. Must be set <= the broker's message.max.bytes. Default
// 1000000 matches sarama's historical default; raise this when publishing
// large payloads (e.g. blob-tx mempool events with sidecars).
MaxMessageBytes int `yaml:"maxMessageBytes" default:"1000000"`
MaxRetries int `yaml:"maxRetries" default:"3"`
Compression CompressionStrategy `yaml:"compression" default:"none"`
RequiredAcks RequiredAcks `yaml:"requiredAcks" default:"leader"`
Partitioning PartitionStrategy `yaml:"partitioning" default:"none"`
Encoding EncodingType `yaml:"encoding" default:"json"`
Version string `yaml:"version"`
}
ProducerConfig contains the fields needed to build a Sarama producer. It is shared by all Kafka-based sinks.
func (*ProducerConfig) MarshalEvent ¶ added in v1.8.10
func (c *ProducerConfig) MarshalEvent(m proto.Message) ([]byte, error)
MarshalEvent serializes a proto message using the configured encoding.
func (*ProducerConfig) Validate ¶ added in v1.8.10
func (c *ProducerConfig) Validate() error
Validate checks the ProducerConfig for correctness.
type RequiredAcks ¶
type RequiredAcks string
RequiredAcks defines the acknowledgment level for produced messages.
var ( RequiredAcksLeader RequiredAcks = "leader" RequiredAcksAll RequiredAcks = "all" RequiredAcksNone RequiredAcks = "none" )
type SASLConfig ¶ added in v1.1.17
type SASLConfig struct {
Mechanism SASLMechanism `yaml:"mechanism" default:"PLAIN"`
Version int16 `yaml:"version" default:"1"`
User string `yaml:"user"`
Password string `yaml:"password"`
PasswordFile string `yaml:"passwordFile"`
}
SASLConfig holds SASL authentication configuration.
func (*SASLConfig) Validate ¶ added in v1.1.17
func (c *SASLConfig) Validate() error
Validate checks the SASLConfig for correctness.
type SASLMechanism ¶ added in v1.1.17
type SASLMechanism string
SASLMechanism defines the SASL authentication mechanism.
var ( SASLTypeOAuth SASLMechanism = "OAUTHBEARER" SASLTypePlaintext SASLMechanism = "PLAIN" SASLTypeSCRAMSHA256 SASLMechanism = "SCRAM-SHA-256" SASLTypeSCRAMSHA512 SASLMechanism = "SCRAM-SHA-512" SASLTypeGSSAPI SASLMechanism = "GSSAPI" )
type TLSClientConfig ¶ added in v1.0.12
type TLSClientConfig struct {
CertificatePath string `yaml:"certificatePath"`
KeyPath string `yaml:"keyPath"`
CACertificate string `yaml:"caCertificate"`
}
TLSClientConfig holds mTLS client certificate configuration.
func (*TLSClientConfig) Validate ¶ added in v1.0.12
func (c *TLSClientConfig) Validate() error
Validate checks the TLSClientConfig for correctness.