source

package
v1.8.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: GPL-3.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DeliveryModeBatch batches messages before flushing writes. This is the
	// highest-throughput mode but can replay cross-table rows on partial failure.
	DeliveryModeBatch = "batch"
	// DeliveryModeMessage flushes writes per message. This is safer under
	// failures but may reduce throughput due to smaller write batches.
	DeliveryModeMessage = "message"
)

Variables

This section is empty.

Functions

func NewBenthosStream

func NewBenthosStream(
	log logrus.FieldLogger,
	logLevel string,
	kafkaConfig *KafkaConfig,
	metrics *telemetry.Metrics,
	routeEngine *chtransform.Engine,
	writer Writer,
	classifier WriteErrorClassifier,
) (*service.Stream, error)

NewBenthosStream creates a Benthos stream that consumes from Kafka and writes to ClickHouse via the custom xatu_clickhouse output plugin.

Types

type KafkaConfig

type KafkaConfig struct {
	// Brokers is a list of Kafka broker addresses.
	Brokers []string `yaml:"brokers"`
	// Topics is a list of topic patterns to subscribe to (supports regex).
	Topics []string `yaml:"topics"`
	// ConsumerGroup is the Kafka consumer group ID.
	ConsumerGroup string `yaml:"consumerGroup"`
	// Encoding is the message encoding format ("json" or "protobuf").
	Encoding string `yaml:"encoding" default:"json"`

	// TLS enables TLS for the Kafka connection.
	TLS bool `yaml:"tls" default:"false"`
	// SASLConfig is the SASL authentication configuration.
	SASLConfig *SASLConfig `yaml:"sasl"`

	// FetchMinBytes is the minimum number of bytes to fetch per request.
	FetchMinBytes int32 `yaml:"fetchMinBytes" default:"1"`
	// FetchWaitMaxMs is the maximum time to wait for fetch responses.
	FetchWaitMaxMs int `yaml:"fetchWaitMaxMs" default:"500"`
	// MaxPartitionFetchBytes is the max bytes per partition per request.
	MaxPartitionFetchBytes int32 `yaml:"maxPartitionFetchBytes" default:"10485760"`

	// SessionTimeoutMs is the consumer group session timeout.
	SessionTimeoutMs int `yaml:"sessionTimeoutMs" default:"30000"`
	// HeartbeatIntervalMs is the consumer group heartbeat interval.
	HeartbeatIntervalMs int `yaml:"heartbeatIntervalMs" default:"3000"`

	// OffsetDefault controls where to start consuming when no offset exists.
	// Valid values: "newest" or "oldest".
	OffsetDefault string `yaml:"offsetDefault" default:"oldest"`

	// CommitInterval controls Kafka offset commit cadence for kafka_franz.
	CommitInterval time.Duration `yaml:"commitInterval" default:"5s"`
	// DeliveryMode controls write boundary behavior.
	// "batch" writes whole Benthos batches together.
	// "message" flushes each message independently.
	DeliveryMode string `yaml:"deliveryMode" default:"batch"`
	// RejectedTopic is an optional Kafka topic where permanently rejected
	// messages are emitted as JSON envelopes.
	RejectedTopic string `yaml:"rejectedTopic"`
}

KafkaConfig configures the Kafka consumer.

func (*KafkaConfig) Validate

func (c *KafkaConfig) Validate() error

Validate checks the Kafka configuration for errors.

type SASLConfig

type SASLConfig struct {
	// Mechanism is the SASL mechanism to use.
	Mechanism string `yaml:"mechanism" default:"PLAIN"`
	// User is the SASL username.
	User string `yaml:"user"`
	// Password is the SASL password.
	Password string `yaml:"password"`
	// PasswordFile is the path to a file containing the SASL password.
	PasswordFile string `yaml:"passwordFile"`
}

SASLConfig configures SASL authentication for Kafka.

func (*SASLConfig) Validate

func (c *SASLConfig) Validate() error

Validate checks the SASL configuration for errors.

type WriteErrorClassifier

type WriteErrorClassifier interface {
	IsPermanent(err error) bool
	Table(err error) string
}

WriteErrorClassifier classifies sink write errors for source-level retry and reject handling.

type Writer

type Writer interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Write(table string, event *xatu.DecoratedEvent, meta *metadata.CommonMetadata)
	// FlushAll forces all table writers to drain their buffers and write
	// to ClickHouse synchronously. Returns the first error encountered.
	// On failure, unflushed events are preserved in the table writers for
	// retry on the next cycle.
	FlushAll(ctx context.Context) error
}

Writer writes flattened rows to ClickHouse.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL