v1.9.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 7, 2026 | License: GPL-3.0 | Imports: 30 | Imported by: 0

Documentation


Constants

const (
	SASLMechanismPLAIN       = "PLAIN"
	SASLMechanismSCRAMSHA256 = "SCRAM-SHA-256"
	SASLMechanismSCRAMSHA512 = "SCRAM-SHA-512"
	SASLMechanismOAUTHBEARER = "OAUTHBEARER"
)

Supported SASL mechanism values.
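A configuration loader would typically reject any mechanism outside this set. The sketch below assumes exact, case-sensitive matching against the four constants; `checkMechanism` is a hypothetical helper, not part of this package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// Copies of the documented constants so the example is self-contained.
const (
	SASLMechanismPLAIN       = "PLAIN"
	SASLMechanismSCRAMSHA256 = "SCRAM-SHA-256"
	SASLMechanismSCRAMSHA512 = "SCRAM-SHA-512"
	SASLMechanismOAUTHBEARER = "OAUTHBEARER"
)

var supported = []string{
	SASLMechanismPLAIN, SASLMechanismSCRAMSHA256,
	SASLMechanismSCRAMSHA512, SASLMechanismOAUTHBEARER,
}

// checkMechanism (hypothetical) accepts only the documented values.
// Case-sensitive comparison is an assumption.
func checkMechanism(m string) error {
	for _, s := range supported {
		if m == s {
			return nil
		}
	}
	return fmt.Errorf("unsupported SASL mechanism %q (want one of %s)",
		m, strings.Join(supported, ", "))
}

func main() {
	for _, m := range []string{"SCRAM-SHA-512", "scram-sha-512", "GSSAPI"} {
		fmt.Printf("%-14s -> %v\n", m, checkMechanism(m))
	}
}
```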

Variables

This section is empty.

Functions

func DiscoverTopics added in v1.8.12

func DiscoverTopics(
	ctx context.Context,
	cfg *KafkaConfig,
) ([]string, error)

DiscoverTopics queries Kafka metadata and returns topic names that match at least one of the configured regex patterns, sorted alphabetically.
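The filtering step can be sketched independently of the Kafka metadata call. This assumes unanchored `regexp.MatchString` semantics (so a pattern must carry its own `^`/`$` to anchor); the real function may anchor patterns or drop internal topics differently. `matchTopics` and the topic names are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// matchTopics (hypothetical) keeps names matching at least one pattern
// and returns them sorted alphabetically.
func matchTopics(names, patterns []string) ([]string, error) {
	res := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			return nil, fmt.Errorf("invalid topic pattern %q: %w", p, err)
		}
		res = append(res, re)
	}
	var out []string
	for _, n := range names {
		for _, re := range res {
			if re.MatchString(n) {
				out = append(out, n)
				break // one matching pattern is enough
			}
		}
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	names := []string{"general-beacon-block", "mempool-transaction", "general-attestation", "__consumer_offsets"}
	got, err := matchTopics(names, []string{"^general-.+"})
	fmt.Println(got, err)
}
```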

func NewBenthosStream

func NewBenthosStream(
	log logrus.FieldLogger,
	logLevel string,
	kafkaConfig *KafkaConfig,
	metrics *telemetry.Metrics,
	routeEngine *router.Engine,
	writer Writer,
	ownsWriter bool,
	groupRetry GroupRetryConfig,
) (*service.Stream, error)

NewBenthosStream creates a Benthos stream that consumes from Kafka and writes to ClickHouse via the custom xatu_clickhouse output plugin. When ownsWriter is true, the output plugin manages the writer lifecycle (Start/Stop). When it is false, the caller is responsible for the writer lifecycle; this is the case when multiple streams share a single writer.

Types

type GroupRetryConfig added in v1.8.12

type GroupRetryConfig struct {
	MaxAttempts int
	BaseDelay   time.Duration
	MaxDelay    time.Duration
}

GroupRetryConfig configures group-level retry behavior for partial table failures in processGroup. When a fanout flush partially fails (some tables succeed, others fail transiently), only the failed tables are retried with exponential backoff to limit write amplification.

type KafkaConfig

type KafkaConfig struct {
	// Brokers is a list of Kafka broker addresses.
	Brokers []string `yaml:"brokers"`
	// Topics is a list of topic patterns to subscribe to (supports regex).
	Topics []string `yaml:"topics"`
	// ConsumerGroup is the Kafka consumer group ID.
	ConsumerGroup string `yaml:"consumerGroup"`
	// ClientID is the Kafka client ID sent to brokers. When set, all per-topic
	// streams share this value, which allows broker-side quota enforcement
	// (quota.consumer.default) to apply as a single budget across all streams.
	// If empty, franz-go generates a unique ID per stream.
	ClientID string `yaml:"clientId"`
	// Encoding is the message encoding format ("json" or "protobuf").
	Encoding string `yaml:"encoding" default:"json"`

	// TLS configures TLS for the Kafka connection.
	TLS xtls.Config `yaml:"tls"`
	// SASLConfig is the SASL authentication configuration.
	SASLConfig *SASLConfig `yaml:"sasl"`

	// FetchMinBytes is the minimum number of bytes to fetch per request.
	FetchMinBytes int32 `yaml:"fetchMinBytes" default:"1"`
	// FetchWaitMaxMs is the maximum time, in milliseconds, to wait for fetch responses.
	FetchWaitMaxMs int `yaml:"fetchWaitMaxMs" default:"250"`
	// MaxPartitionFetchBytes is the max bytes per partition per request.
	MaxPartitionFetchBytes int32 `yaml:"maxPartitionFetchBytes" default:"3145728"`
	// FetchMaxBytes is the max total bytes per fetch request across all
	// partitions from a single broker. With many independent consumers
	// (one per topic) this is the primary lever for capping in-flight
	// memory from Kafka fetch buffers. Default: 10 MiB.
	FetchMaxBytes int32 `yaml:"fetchMaxBytes" default:"10485760"`

	// SessionTimeoutMs is the consumer group session timeout, in milliseconds.
	SessionTimeoutMs int `yaml:"sessionTimeoutMs" default:"30000"`
	// RebalanceTimeout is the maximum time group members are allowed to
	// take when a rebalance has begun (finish work, commit offsets, rejoin).
	// Lower values speed up partition reassignment when scaling. Default: 15s.
	RebalanceTimeout time.Duration `yaml:"rebalanceTimeout" default:"15s"`

	// OffsetDefault controls where to start consuming when no offset exists.
	// Valid values: "earliest" or "latest".
	OffsetDefault string `yaml:"offsetDefault" default:"earliest"`

	// CommitInterval controls Kafka offset commit cadence for kafka_franz.
	CommitInterval time.Duration `yaml:"commitInterval" default:"5s"`
	// ShutdownTimeout is the maximum time the Benthos stream waits for
	// in-flight messages to complete during graceful shutdown.
	ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"30s"`
	// RejectedTopic is an optional Kafka topic where permanently rejected
	// messages are emitted as JSON envelopes.
	RejectedTopic string `yaml:"rejectedTopic"`

	// TopicRefreshInterval controls how often Kafka metadata is refreshed to
	// discover new topics matching the configured regex patterns. Defaults to
	// 60s. Set to 0 to disable periodic refresh (startup-only discovery).
	TopicRefreshInterval time.Duration `yaml:"topicRefreshInterval" default:"60s"`
	// LagPollInterval controls how often consumer lag is polled from Kafka.
	// Set to 0 to disable lag monitoring. Default: 30s.
	LagPollInterval time.Duration `yaml:"lagPollInterval" default:"30s"`
	// ConnectTimeout is the maximum time a TCP dial to a broker will wait
	// for a connection to complete. A reasonable value (e.g. 10s) prevents
	// hung dials from generating noisy warnings when some brokers are
	// temporarily unreachable. Default: 10s. Set to 0 to disable.
	ConnectTimeout time.Duration `yaml:"connectTimeout" default:"10s"`

	// OutputBatchCount is the number of messages Benthos accumulates before
	// calling WriteBatch on the output plugin. Higher values increase INSERT
	// throughput by writing more rows per ClickHouse INSERT. Set to 0 to
	// disable count-based batching. Default: 10000.
	OutputBatchCount int `yaml:"outputBatchCount" default:"10000"`
	// OutputBatchPeriod is the maximum time Benthos waits to fill a batch
	// before flushing a partial batch. Ensures low-volume topics still make
	// progress. Default: 5s. Set to 0 to disable period-based flushing.
	OutputBatchPeriod time.Duration `yaml:"outputBatchPeriod" default:"5s"`

	// MaxInFlight is the maximum number of concurrent WriteBatch calls
	// Benthos makes for each stream's output. Higher values increase
	// throughput by allowing concurrent ClickHouse INSERTs and bigger
	// natural batches. Default: 64.
	MaxInFlight int `yaml:"maxInFlight" default:"64"`

	// TopicOverrides contains per-topic batch settings keyed by exact topic name.
	// Overrides are matched against discovered concrete topic names. Unset fields
	// inherit the global defaults from this KafkaConfig.
	TopicOverrides map[string]TopicOverride `yaml:"topicOverrides"`
}

KafkaConfig configures the Kafka consumer.

func (*KafkaConfig) ApplyTopicOverride added in v1.8.12

func (c *KafkaConfig) ApplyTopicOverride(topic string) KafkaConfig

ApplyTopicOverride returns a shallow copy of the configuration with the per-topic overrides for topic merged in. Fields not set in the override keep the global default.
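The merge can be sketched with trimmed stand-in types: copy the struct by value, then overwrite only fields whose override pointer is non-nil. Because the copy is shallow, reference fields such as the TopicOverrides map are still shared with the original. `applyTopicOverride` and the topic name are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed stand-ins holding only the fields an override can touch.
type TopicOverride struct {
	OutputBatchCount  *int
	OutputBatchPeriod *time.Duration
	MaxInFlight       *int
}

type KafkaConfig struct {
	OutputBatchCount  int
	OutputBatchPeriod time.Duration
	MaxInFlight       int
	TopicOverrides    map[string]TopicOverride
}

// applyTopicOverride (hypothetical) returns a merged shallow copy.
func (c *KafkaConfig) applyTopicOverride(topic string) KafkaConfig {
	out := *c // shallow copy: the TopicOverrides map is still shared
	o, ok := c.TopicOverrides[topic]
	if !ok {
		return out // no override: global defaults unchanged
	}
	if o.OutputBatchCount != nil {
		out.OutputBatchCount = *o.OutputBatchCount
	}
	if o.OutputBatchPeriod != nil {
		out.OutputBatchPeriod = *o.OutputBatchPeriod
	}
	if o.MaxInFlight != nil {
		out.MaxInFlight = *o.MaxInFlight
	}
	return out
}

func main() {
	count := 500
	cfg := &KafkaConfig{
		OutputBatchCount: 10000, OutputBatchPeriod: 5 * time.Second, MaxInFlight: 64,
		TopicOverrides: map[string]TopicOverride{"low-volume-topic": {OutputBatchCount: &count}},
	}
	o := cfg.applyTopicOverride("low-volume-topic")
	fmt.Println(o.OutputBatchCount, o.OutputBatchPeriod, o.MaxInFlight) // 500 5s 64
}
```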

func (*KafkaConfig) Validate

func (c *KafkaConfig) Validate() error

Validate checks the Kafka configuration for errors.
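The permitted Encoding ("json", "protobuf") and OffsetDefault ("earliest", "latest") values come from the field documentation. The other checks below (non-empty brokers, topics and consumer group; compilable topic regexes) are assumptions about what Validate enforces. `kafkaConfig` is a trimmed, hypothetical stand-in, and defaults are not applied here, so an empty Encoding fails. `errors.Join` requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// kafkaConfig is a trimmed stand-in with only the fields checked here.
type kafkaConfig struct {
	Brokers       []string
	Topics        []string
	ConsumerGroup string
	Encoding      string
	OffsetDefault string
}

// validate collects every problem instead of stopping at the first.
func (c *kafkaConfig) validate() error {
	var errs []error
	if len(c.Brokers) == 0 {
		errs = append(errs, errors.New("brokers: at least one broker is required"))
	}
	if len(c.Topics) == 0 {
		errs = append(errs, errors.New("topics: at least one topic pattern is required"))
	}
	for _, t := range c.Topics {
		if _, err := regexp.Compile(t); err != nil {
			errs = append(errs, fmt.Errorf("topics: invalid pattern %q: %w", t, err))
		}
	}
	if c.ConsumerGroup == "" {
		errs = append(errs, errors.New("consumerGroup: required"))
	}
	switch c.Encoding {
	case "json", "protobuf":
	default:
		errs = append(errs, fmt.Errorf("encoding: %q is not \"json\" or \"protobuf\"", c.Encoding))
	}
	switch c.OffsetDefault {
	case "earliest", "latest":
	default:
		errs = append(errs, fmt.Errorf("offsetDefault: %q is not \"earliest\" or \"latest\"", c.OffsetDefault))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	cfg := &kafkaConfig{Brokers: []string{"kafka:9092"}, Topics: []string{"^general-.+"},
		ConsumerGroup: "xatu-consumer", Encoding: "json", OffsetDefault: "earliest"}
	fmt.Println(cfg.validate()) // <nil>
	cfg.OffsetDefault = "newest"
	fmt.Println(cfg.validate())
}
```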

type LagMonitor added in v1.8.12

type LagMonitor struct {
	// contains filtered or unexported fields
}

LagMonitor periodically polls Kafka to compute consumer group lag and updates a Prometheus gauge. It reuses the same broker/SASL/TLS config as the main consumer.
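Consumer lag for a group is conventionally the sum, over its partitions, of the high watermark minus the committed offset. This sketch assumes a partition with no commit (offset -1) is measured from its log start offset and that negative values are clamped to 0; how LagMonitor actually handles these cases, and whether it reports per-partition or aggregate lag, is not stated above. `partitionOffsets` and `groupLag` are hypothetical.

```go
package main

import "fmt"

// partitionOffsets holds the numbers lag is derived from for one partition.
type partitionOffsets struct {
	LogStart      int64 // earliest offset still retained
	HighWatermark int64 // offset of the next message to be written
	Committed     int64 // group's committed offset; -1 if none
}

// groupLag sums per-partition lag for one consumer group.
func groupLag(parts map[int32]partitionOffsets) int64 {
	var total int64
	for _, p := range parts {
		from := p.Committed
		if from < 0 {
			from = p.LogStart // no commit yet: everything retained is unconsumed
		}
		if lag := p.HighWatermark - from; lag > 0 {
			total += lag // clamp negatives (e.g. stale watermark) to 0
		}
	}
	return total
}

func main() {
	parts := map[int32]partitionOffsets{
		0: {LogStart: 0, HighWatermark: 1500, Committed: 1200},
		1: {LogStart: 100, HighWatermark: 900, Committed: 900},
		2: {LogStart: 400, HighWatermark: 1000, Committed: -1},
	}
	fmt.Println(groupLag(parts)) // 300 + 0 + 600 = 900
}
```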

func NewLagMonitor added in v1.8.12

func NewLagMonitor(
	log logrus.FieldLogger,
	cfg *KafkaConfig,
	consumerGroups []string,
	metrics *telemetry.Metrics,
) (*LagMonitor, error)

NewLagMonitor creates a new LagMonitor. Call Start to begin polling. The consumerGroups slice lists the consumer groups to monitor for lag (one consumer group per topic).

func (*LagMonitor) Start added in v1.8.12

func (m *LagMonitor) Start(ctx context.Context) error

Start begins the periodic lag polling loop. It blocks until Stop is called or the context is cancelled.

func (*LagMonitor) Stop added in v1.8.12

func (m *LagMonitor) Stop() error

Stop signals the lag monitor to exit and waits for it to finish. It is safe to call multiple times; only the first call performs cleanup.

type SASLConfig

type SASLConfig struct {
	// Mechanism is the SASL mechanism to use.
	Mechanism string `yaml:"mechanism" default:"PLAIN"`
	// User is the SASL username.
	User string `yaml:"user"`
	// Password is the SASL password.
	Password string `yaml:"password"`
	// PasswordFile is the path to a file containing the SASL password.
	PasswordFile string `yaml:"passwordFile"`
}

SASLConfig configures SASL authentication for Kafka.

func (*SASLConfig) Validate

func (c *SASLConfig) Validate() error

Validate checks the SASL configuration for errors.
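SASLConfig offers two password sources, Password and PasswordFile. The sketch below shows one plausible way to resolve them. Rejecting both at once and trimming a trailing newline from the file are assumptions, not documented behavior; `saslConfig` and `resolvePassword` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// saslConfig is a stand-in mirroring the documented fields.
type saslConfig struct {
	Mechanism    string
	User         string
	Password     string
	PasswordFile string
}

// resolvePassword returns the inline password or the file's contents.
func (c *saslConfig) resolvePassword() (string, error) {
	switch {
	case c.Password != "" && c.PasswordFile != "":
		return "", errors.New("sasl: set password or passwordFile, not both")
	case c.PasswordFile != "":
		b, err := os.ReadFile(c.PasswordFile)
		if err != nil {
			return "", fmt.Errorf("sasl: reading passwordFile: %w", err)
		}
		// Secret files often end with a newline that is not part of the password.
		return strings.TrimRight(string(b), "\r\n"), nil
	default:
		return c.Password, nil
	}
}

func main() {
	dir, err := os.MkdirTemp("", "sasl")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "password")
	if err := os.WriteFile(path, []byte("s3cret\n"), 0o600); err != nil {
		panic(err)
	}
	c := &saslConfig{Mechanism: "SCRAM-SHA-512", User: "xatu", PasswordFile: path}
	pw, err := c.resolvePassword()
	fmt.Println(pw, err) // s3cret <nil>
}
```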

type TopicOverride added in v1.8.12

type TopicOverride struct {
	OutputBatchCount  *int           `yaml:"outputBatchCount"`
	OutputBatchPeriod *time.Duration `yaml:"outputBatchPeriod"`
	MaxInFlight       *int           `yaml:"maxInFlight"`
}

TopicOverride holds per-topic batch settings that override KafkaConfig defaults. Nil pointer fields inherit the global default.

func (*TopicOverride) Validate added in v1.8.12

func (o *TopicOverride) Validate(topic string) error

Validate checks the per-topic override for errors.

type Writer

type Writer interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	// FlushTableEvents writes the given events directly to their respective
	// ClickHouse tables concurrently. The map keys are base table names
	// (without suffix). Returns a FlushResult containing per-table errors
	// and any invalid events that should be sent to the DLQ.
	FlushTableEvents(ctx context.Context, tableEvents map[string][]*xatu.DecoratedEvent) *clickhouse.FlushResult
	// Ping checks connectivity to the underlying datastore.
	Ping(ctx context.Context) error
}

Writer writes flattened rows to ClickHouse.
