Documentation
Index
- Constants
- func DiscoverTopics(ctx context.Context, cfg *KafkaConfig) ([]string, error)
- func NewBenthosStream(log logrus.FieldLogger, logLevel string, kafkaConfig *KafkaConfig, ...) (*service.Stream, error)
- type GroupRetryConfig
- type KafkaConfig
- type LagMonitor
- type SASLConfig
- type TopicOverride
- type Writer
Constants
const (
	SASLMechanismPLAIN = "PLAIN"
	SASLMechanismSCRAMSHA256 = "SCRAM-SHA-256"
	SASLMechanismSCRAMSHA512 = "SCRAM-SHA-512"
	SASLMechanismOAUTHBEARER = "OAUTHBEARER"
)
Supported SASL mechanism values.
Functions
func DiscoverTopics added in v1.8.12
func DiscoverTopics(ctx context.Context, cfg *KafkaConfig) ([]string, error)
DiscoverTopics queries Kafka metadata and returns topic names that match at least one of the configured regex patterns, sorted alphabetically.
func NewBenthosStream
func NewBenthosStream(log logrus.FieldLogger, logLevel string, kafkaConfig *KafkaConfig, metrics *telemetry.Metrics, routeEngine *router.Engine, writer Writer, ownsWriter bool, groupRetry GroupRetryConfig) (*service.Stream, error)
NewBenthosStream creates a Benthos stream that consumes from Kafka and writes to ClickHouse via the custom xatu_clickhouse output plugin. When ownsWriter is true the output plugin manages the writer lifecycle (Start/Stop). When false the caller is responsible for the writer lifecycle, which is the case when multiple streams share a single writer.
Types
type GroupRetryConfig added in v1.8.12
GroupRetryConfig configures group-level retry behavior for partial table failures in processGroup. When a fanout flush partially fails (some tables succeed, others fail transiently), only the failed tables are retried with exponential backoff to limit write amplification.
type KafkaConfig
type KafkaConfig struct {
// Brokers is a list of Kafka broker addresses.
Brokers []string `yaml:"brokers"`
// Topics is a list of topic patterns to subscribe to (supports regex).
Topics []string `yaml:"topics"`
// ConsumerGroup is the Kafka consumer group ID.
ConsumerGroup string `yaml:"consumerGroup"`
// ClientID is the Kafka client ID sent to brokers. When set, all per-topic
// streams share this value, which allows broker-side quota enforcement
// (quota.consumer.default) to apply as a single budget across all streams.
// If empty, franz-go generates a unique ID per stream.
ClientID string `yaml:"clientId"`
// Encoding is the message encoding format ("json" or "protobuf").
Encoding string `yaml:"encoding" default:"json"`
// TLS configures TLS for the Kafka connection.
TLS xtls.Config `yaml:"tls"`
// SASLConfig is the SASL authentication configuration.
SASLConfig *SASLConfig `yaml:"sasl"`
// FetchMinBytes is the minimum number of bytes to fetch per request.
FetchMinBytes int32 `yaml:"fetchMinBytes" default:"1"`
// FetchWaitMaxMs is the maximum time to wait for fetch responses.
FetchWaitMaxMs int `yaml:"fetchWaitMaxMs" default:"250"`
// MaxPartitionFetchBytes is the max bytes per partition per request.
MaxPartitionFetchBytes int32 `yaml:"maxPartitionFetchBytes" default:"3145728"`
// FetchMaxBytes is the max total bytes per fetch request across all
// partitions from a single broker. With many independent consumers
// (one per topic) this is the primary lever for capping in-flight
// memory from Kafka fetch buffers. Default: 10 MiB.
FetchMaxBytes int32 `yaml:"fetchMaxBytes" default:"10485760"`
// SessionTimeoutMs is the consumer group session timeout.
SessionTimeoutMs int `yaml:"sessionTimeoutMs" default:"30000"`
// RebalanceTimeout is the maximum time group members are allowed to
// take when a rebalance has begun (finish work, commit offsets, rejoin).
// Lower values speed up partition reassignment when scaling. Default: 15s.
RebalanceTimeout time.Duration `yaml:"rebalanceTimeout" default:"15s"`
// OffsetDefault controls where to start consuming when no offset exists.
// Valid values: "earliest" or "latest".
OffsetDefault string `yaml:"offsetDefault" default:"earliest"`
// CommitInterval controls Kafka offset commit cadence for kafka_franz.
CommitInterval time.Duration `yaml:"commitInterval" default:"5s"`
// ShutdownTimeout is the maximum time the Benthos stream waits for
// in-flight messages to complete during graceful shutdown.
ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"30s"`
// RejectedTopic is an optional Kafka topic where permanently rejected
// messages are emitted as JSON envelopes.
RejectedTopic string `yaml:"rejectedTopic"`
// TopicRefreshInterval controls how often Kafka metadata is refreshed to
// discover new topics matching the configured regex patterns. Defaults to
// 60s. Set to 0 to disable periodic refresh (startup-only discovery).
TopicRefreshInterval time.Duration `yaml:"topicRefreshInterval" default:"60s"`
// LagPollInterval controls how often consumer lag is polled from Kafka.
// Set to 0 to disable lag monitoring. Default: 30s.
LagPollInterval time.Duration `yaml:"lagPollInterval" default:"30s"`
// ConnectTimeout is the maximum time a TCP dial to a broker will wait
// for a connection to complete. A reasonable value (e.g. 10s) prevents
// hung dials from generating noisy warnings when some brokers are
// temporarily unreachable. Default: 10s. Set to 0 to disable.
ConnectTimeout time.Duration `yaml:"connectTimeout" default:"10s"`
// OutputBatchCount is the number of messages Benthos accumulates before
// calling WriteBatch on the output plugin. Higher values increase INSERT
// throughput by writing more rows per ClickHouse INSERT. Set to 0 to
// disable count-based batching. Default: 10000.
OutputBatchCount int `yaml:"outputBatchCount" default:"10000"`
// OutputBatchPeriod is the maximum time Benthos waits to fill a batch
// before flushing a partial batch. Ensures low-volume topics still make
// progress. Default: 5s. Set to 0 to disable period-based flushing.
OutputBatchPeriod time.Duration `yaml:"outputBatchPeriod" default:"5s"`
// MaxInFlight is the maximum number of concurrent WriteBatch calls
// Benthos makes for each stream's output. Higher values increase
// throughput by allowing concurrent ClickHouse INSERTs and bigger
// natural batches. Default: 64.
MaxInFlight int `yaml:"maxInFlight" default:"64"`
// TopicOverrides contains per-topic batch settings keyed by exact topic name.
// Overrides are matched against discovered concrete topic names. Unset fields
// inherit the global defaults from this KafkaConfig.
TopicOverrides map[string]TopicOverride `yaml:"topicOverrides"`
}
KafkaConfig configures the Kafka consumer.
func (*KafkaConfig) ApplyTopicOverride added in v1.8.12
func (c *KafkaConfig) ApplyTopicOverride(topic string) KafkaConfig
ApplyTopicOverride returns a shallow copy with per-topic overrides merged in. Fields not set in the override keep the global default.
func (*KafkaConfig) Validate
func (c *KafkaConfig) Validate() error
Validate checks the Kafka configuration for errors.
type LagMonitor added in v1.8.12
type LagMonitor struct {
// contains filtered or unexported fields
}
LagMonitor periodically polls Kafka to compute consumer group lag and updates a Prometheus gauge. It reuses the same broker/SASL/TLS config as the main consumer.
func NewLagMonitor added in v1.8.12
func NewLagMonitor(log logrus.FieldLogger, cfg *KafkaConfig, consumerGroups []string, metrics *telemetry.Metrics) (*LagMonitor, error)
NewLagMonitor creates a new LagMonitor. Call Start to begin polling. The consumerGroups slice lists the consumer groups to monitor for lag, one group per topic.
func (*LagMonitor) Start added in v1.8.12
func (m *LagMonitor) Start(ctx context.Context) error
Start begins the periodic lag polling loop. It blocks until Stop is called or the context is cancelled.
func (*LagMonitor) Stop added in v1.8.12
func (m *LagMonitor) Stop() error
Stop signals the lag monitor to exit and waits for it to finish. It is safe to call multiple times; only the first call performs cleanup.
type SASLConfig
type SASLConfig struct {
// Mechanism is the SASL mechanism to use.
Mechanism string `yaml:"mechanism" default:"PLAIN"`
// User is the SASL username.
User string `yaml:"user"`
// Password is the SASL password.
Password string `yaml:"password"`
// PasswordFile is the path to a file containing the SASL password.
PasswordFile string `yaml:"passwordFile"`
}
SASLConfig configures SASL authentication for Kafka.
func (*SASLConfig) Validate
func (c *SASLConfig) Validate() error
Validate checks the SASL configuration for errors.
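The page does not list which checks Validate performs. One check it plausibly includes is that Mechanism is one of the four documented constants; the sketch below shows only that check. checkMechanism is a hypothetical helper, and exact, case-sensitive matching is an assumption.

```go
package main

import "fmt"

// Values copied from the Constants section of this page.
const (
	SASLMechanismPLAIN       = "PLAIN"
	SASLMechanismSCRAMSHA256 = "SCRAM-SHA-256"
	SASLMechanismSCRAMSHA512 = "SCRAM-SHA-512"
	SASLMechanismOAUTHBEARER = "OAUTHBEARER"
)

// checkMechanism returns an error unless m is one of the documented SASL
// mechanism values. Hypothetical helper; the real Validate may check more.
func checkMechanism(m string) error {
	switch m {
	case SASLMechanismPLAIN,
		SASLMechanismSCRAMSHA256,
		SASLMechanismSCRAMSHA512,
		SASLMechanismOAUTHBEARER:
		return nil
	default:
		return fmt.Errorf("unsupported SASL mechanism %q", m)
	}
}

func main() {
	for _, m := range []string{"SCRAM-SHA-512", "GSSAPI"} {
		fmt.Printf("%s: %v\n", m, checkMechanism(m))
	}
}
```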
type TopicOverride added in v1.8.12
type TopicOverride struct {
OutputBatchCount *int `yaml:"outputBatchCount"`
OutputBatchPeriod *time.Duration `yaml:"outputBatchPeriod"`
MaxInFlight *int `yaml:"maxInFlight"`
}
TopicOverride holds per-topic batch settings that override KafkaConfig defaults. Nil pointer fields inherit the global default.
func (*TopicOverride) Validate added in v1.8.12
func (o *TopicOverride) Validate(topic string) error
Validate checks the per-topic override for errors.
type Writer
type Writer interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
// FlushTableEvents writes the given events directly to their respective
// ClickHouse tables concurrently. The map keys are base table names
// (without suffix). Returns a FlushResult containing per-table errors
// and any invalid events that should be sent to the DLQ.
FlushTableEvents(ctx context.Context, tableEvents map[string][]*xatu.DecoratedEvent) *clickhouse.FlushResult
// Ping checks connectivity to the underlying datastore.
Ping(ctx context.Context) error
}
Writer writes flattened rows to ClickHouse.