Documentation
¶
Index ¶
Constants ¶
View Source
// Delivery mode values, matching the "batch" and "message" settings
// documented on KafkaConfig.DeliveryMode.
const (
	// DeliveryModeBatch batches messages before flushing writes. This is the
	// highest-throughput mode but can replay cross-table rows on partial failure.
	DeliveryModeBatch = "batch"

	// DeliveryModeMessage flushes writes per message. This is safer under
	// failures but may reduce throughput due to smaller write batches.
	DeliveryModeMessage = "message"
)
Variables ¶
This section is empty.
Functions ¶
func NewBenthosStream ¶
func NewBenthosStream( log logrus.FieldLogger, logLevel string, kafkaConfig *KafkaConfig, metrics *telemetry.Metrics, routeEngine *chtransform.Engine, writer Writer, classifier WriteErrorClassifier, ) (*service.Stream, error)
NewBenthosStream creates a Benthos stream that consumes from Kafka and writes to ClickHouse via the custom xatu_clickhouse output plugin.
Types ¶
type KafkaConfig ¶
// KafkaConfig configures the Kafka consumer.
//
// Each field is keyed in YAML by its `yaml` tag. The `default` tags record the
// intended fallback values.
// NOTE(review): the code that applies the `default` tags is not visible in
// this section — confirm how defaults are populated.
type KafkaConfig struct {
// Brokers is a list of Kafka broker addresses.
Brokers []string `yaml:"brokers"`
// Topics is a list of topic patterns to subscribe to (supports regex).
Topics []string `yaml:"topics"`
// ConsumerGroup is the Kafka consumer group ID.
ConsumerGroup string `yaml:"consumerGroup"`
// Encoding is the message encoding format ("json" or "protobuf").
// The default tag is "json".
Encoding string `yaml:"encoding" default:"json"`
// TLS enables TLS for the Kafka connection. The default tag is "false".
TLS bool `yaml:"tls" default:"false"`
// SASLConfig is the SASL authentication configuration.
// It is a pointer, so it may be nil when the `sasl` key is absent.
SASLConfig *SASLConfig `yaml:"sasl"`
// FetchMinBytes is the minimum number of bytes to fetch per request.
// The default tag is 1.
FetchMinBytes int32 `yaml:"fetchMinBytes" default:"1"`
// FetchWaitMaxMs is the maximum time to wait for fetch responses,
// in milliseconds going by the Ms suffix. The default tag is 500.
FetchWaitMaxMs int `yaml:"fetchWaitMaxMs" default:"500"`
// MaxPartitionFetchBytes is the max bytes per partition per request.
// The default tag is 10485760 (10 MiB).
MaxPartitionFetchBytes int32 `yaml:"maxPartitionFetchBytes" default:"10485760"`
// SessionTimeoutMs is the consumer group session timeout,
// in milliseconds going by the Ms suffix. The default tag is 30000.
SessionTimeoutMs int `yaml:"sessionTimeoutMs" default:"30000"`
// HeartbeatIntervalMs is the consumer group heartbeat interval,
// in milliseconds going by the Ms suffix. The default tag is 3000.
HeartbeatIntervalMs int `yaml:"heartbeatIntervalMs" default:"3000"`
// OffsetDefault controls where to start consuming when no offset exists.
// Valid values: "newest" or "oldest". The default tag is "oldest".
OffsetDefault string `yaml:"offsetDefault" default:"oldest"`
// CommitInterval controls Kafka offset commit cadence for kafka_franz.
// The default tag is "5s".
CommitInterval time.Duration `yaml:"commitInterval" default:"5s"`
// DeliveryMode controls write boundary behavior.
// "batch" (DeliveryModeBatch) writes whole Benthos batches together.
// "message" (DeliveryModeMessage) flushes each message independently.
// The default tag is "batch".
DeliveryMode string `yaml:"deliveryMode" default:"batch"`
// RejectedTopic is an optional Kafka topic where permanently rejected
// messages are emitted as JSON envelopes.
RejectedTopic string `yaml:"rejectedTopic"`
}
KafkaConfig configures the Kafka consumer.
func (*KafkaConfig) Validate ¶
func (c *KafkaConfig) Validate() error
Validate checks the Kafka configuration for errors.
type SASLConfig ¶
// SASLConfig configures SASL authentication for Kafka.
//
// Each field is keyed in YAML by its `yaml` tag.
type SASLConfig struct {
// Mechanism is the SASL mechanism to use. The default tag is "PLAIN".
// NOTE(review): the set of accepted mechanisms is not visible here — see Validate.
Mechanism string `yaml:"mechanism" default:"PLAIN"`
// User is the SASL username.
User string `yaml:"user"`
// Password is the SASL password.
Password string `yaml:"password"`
// PasswordFile is the path to a file containing the SASL password.
// NOTE(review): precedence between Password and PasswordFile when both are
// set is not visible in this section — confirm against Validate.
PasswordFile string `yaml:"passwordFile"`
}
SASLConfig configures SASL authentication for Kafka.
func (*SASLConfig) Validate ¶
func (c *SASLConfig) Validate() error
Validate checks the SASL configuration for errors.
type WriteErrorClassifier ¶
WriteErrorClassifier classifies sink write errors for source-level retry and reject handling.
type Writer ¶
// Writer writes flattened rows to ClickHouse.
type Writer interface {
// Start takes a context and returns an error. What it initialises is up to
// the implementation and is not visible in this section.
Start(ctx context.Context) error
// Stop takes a context and returns an error. What it tears down is up to
// the implementation and is not visible in this section.
Stop(ctx context.Context) error
// Write hands one decorated event and its common metadata to the writer for
// the given table. It has no return value, so failures are not reported at
// this call.
// NOTE(review): presumably the row is buffered until FlushAll — confirm
// against the implementation.
Write(table string, event *xatu.DecoratedEvent, meta *metadata.CommonMetadata)
// FlushAll forces all table writers to drain their buffers and write
// to ClickHouse synchronously. Returns the first error encountered.
// On failure, unflushed events are preserved in the table writers for
// retry on the next cycle.
FlushAll(ctx context.Context) error
}
Writer writes flattened rows to ClickHouse.
Click to show internal directories.
Click to hide internal directories.