Documentation
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Config
type Config struct {
	// LoggingLevel is the logging level to use.
	LoggingLevel string `yaml:"logging" default:"info"`
	// MetricsAddr is the address to listen on for metrics.
	MetricsAddr string `yaml:"metricsAddr" default:":9090"`
	// PProfAddr is the address to listen on for pprof.
	PProfAddr *string `yaml:"pprofAddr"`
	// Kafka is the Kafka source configuration.
	Kafka source.KafkaConfig `yaml:"kafka"`
	// ClickHouse is the ClickHouse sink configuration.
	ClickHouse clickhouse.Config `yaml:"clickhouse"`
	// DisabledEvents is a list of event names to drop without processing.
	DisabledEvents []string `yaml:"disabledEvents"`
	// Tracing configuration
	Tracing observability.TracingConfig `yaml:"tracing"`
}
Config is the configuration for the consumoor service.
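The `default` struct tags on LoggingLevel and MetricsAddr suggest that unset fields fall back to "info" and ":9090". The source does not say which mechanism reads those tags, so the following is only a minimal sketch of tag-driven defaults using the standard reflect package. The names exampleConfig and applyDefaults are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"reflect"
)

// exampleConfig is a hypothetical stand-in that mirrors only the scalar
// fields of Config. It is not the real type.
type exampleConfig struct {
	LoggingLevel string  `yaml:"logging" default:"info"`
	MetricsAddr  string  `yaml:"metricsAddr" default:":9090"`
	PProfAddr    *string `yaml:"pprofAddr"`
}

// applyDefaults fills every empty string field that carries a `default`
// tag. Fields without the tag (such as PProfAddr) are left untouched.
// This is an illustration, not the mechanism the service actually uses.
func applyDefaults(cfg any) error {
	v := reflect.ValueOf(cfg)
	if v.Kind() != reflect.Ptr || v.IsNil() || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("applyDefaults: want non-nil pointer to struct, got %T", cfg)
	}

	v = v.Elem()
	t := v.Type()

	for i := 0; i < t.NumField(); i++ {
		def, ok := t.Field(i).Tag.Lookup("default")
		if !ok {
			continue
		}

		f := v.Field(i)
		if f.Kind() == reflect.String && f.String() == "" {
			f.SetString(def)
		}
	}

	return nil
}

func main() {
	// MetricsAddr is set explicitly, so only LoggingLevel gets its default.
	cfg := exampleConfig{MetricsAddr: ":9100"}
	if err := applyDefaults(&cfg); err != nil {
		panic(err)
	}

	fmt.Println(cfg.LoggingLevel, cfg.MetricsAddr, cfg.PProfAddr == nil)
	// prints: info :9100 true
}
```

In this sketch PProfAddr stays nil because it has no `default` tag, which is consistent with it being declared as an optional pointer in Config.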
func (*Config) ApplyOverrides
ApplyOverrides applies CLI/env overrides to the configuration.
func (*Config) DisabledEventEnums
func (c *Config) DisabledEventEnums() ([]xatu.Event_Name, error)
DisabledEventEnums parses disabled event names into typed enum values. Unknown names are rejected to fail fast during startup.
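The fail-fast behaviour described above can be sketched roughly as follows. This is not the package's implementation: eventName stands in for xatu.Event_Name, and eventNameValue imitates the name-to-value map that protobuf-generated Go code typically provides for an enum. The event names in the map are invented for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// eventName is a hypothetical stand-in for xatu.Event_Name.
type eventName int32

// eventNameValue imitates a generated name-to-value lookup table.
// The entries are made up and do not correspond to real event names.
var eventNameValue = map[string]eventName{
	"EXAMPLE_EVENT_A": 1,
	"EXAMPLE_EVENT_B": 2,
}

// parseDisabledEvents converts names to typed enum values. If any name is
// unknown it returns an error listing all of them, so a typo in the
// configuration is caught at startup instead of being silently ignored.
func parseDisabledEvents(names []string) ([]eventName, error) {
	out := make([]eventName, 0, len(names))

	var unknown []string

	for _, name := range names {
		v, ok := eventNameValue[name]
		if !ok {
			unknown = append(unknown, name)

			continue
		}

		out = append(out, v)
	}

	if len(unknown) > 0 {
		sort.Strings(unknown)

		return nil, fmt.Errorf("unknown event name(s): %s", strings.Join(unknown, ", "))
	}

	return out, nil
}

func main() {
	enums, err := parseDisabledEvents([]string{"EXAMPLE_EVENT_B", "EXAMPLE_EVENT_A"})
	fmt.Println(enums, err)

	_, err = parseDisabledEvents([]string{"EXAMPLE_EVENT_A", "TYPO_EVENT"})
	fmt.Println(err)
}
```

Collecting every unknown name before returning, rather than stopping at the first one, is a design choice of this sketch; the source only states that unknown names are rejected.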
type Consumoor
type Consumoor struct {
// contains filtered or unexported fields
}
Consumoor is the main service that consumes events from Kafka and writes them to ClickHouse. Each matched Kafka topic gets its own Benthos stream and consumer group while sharing a single ClickHouse writer for efficient connection reuse.
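The "one stream per topic, one shared writer" layout can be pictured with a small concurrency sketch. It makes no claim about how Consumoor is built internally: goroutines stand in for the per-topic Benthos streams, and sharedWriter is a hypothetical placeholder for the single ClickHouse writer. No Kafka, Benthos, or ClickHouse APIs are used.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedWriter is a hypothetical stand-in for the single ClickHouse writer.
// A mutex makes it safe to call from several streams at once.
type sharedWriter struct {
	mu   sync.Mutex
	rows map[string]int // rows written, keyed by topic
}

// Write records n rows for the given topic.
func (w *sharedWriter) Write(topic string, n int) {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.rows[topic] += n
}

// Rows returns the number of rows recorded for a topic.
func (w *sharedWriter) Rows(topic string) int {
	w.mu.Lock()
	defer w.mu.Unlock()

	return w.rows[topic]
}

// runStreams starts one goroutine per topic, each writing a fixed number
// of single-row batches to the same writer, and waits for all to finish.
func runStreams(topics []string, batches int, w *sharedWriter) {
	var wg sync.WaitGroup

	for _, topic := range topics {
		wg.Add(1)

		go func(topic string) {
			defer wg.Done()

			for i := 0; i < batches; i++ {
				w.Write(topic, 1)
			}
		}(topic)
	}

	wg.Wait()
}

func main() {
	w := &sharedWriter{rows: make(map[string]int)}
	topics := []string{"topic-a", "topic-b", "topic-c"} // made-up topic names

	runStreams(topics, 100, w)

	for _, topic := range topics {
		fmt.Println(topic, w.Rows(topic))
	}
}
```

The point of the sketch is the ownership split: each stream progresses independently, while the writer and its connections exist only once.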
func New
func New(
	ctx context.Context,
	log observability.ContextualLogger,
	config *Config,
	overrides *Override,
) (*Consumoor, error)
New creates a new Consumoor service. Call Start() to run it.