consumoor

package
v1.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2026 License: GPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// LoggingLevel is the logging level to use.
	LoggingLevel string `yaml:"logging" default:"info"`
	// MetricsAddr is the address to listen on for metrics.
	MetricsAddr string `yaml:"metricsAddr" default:":9090"`
	// PProfAddr is the address to listen on for pprof.
	PProfAddr *string `yaml:"pprofAddr"`

	// Kafka is the Kafka source configuration.
	Kafka source.KafkaConfig `yaml:"kafka"`
	// ClickHouse is the ClickHouse sink configuration.
	ClickHouse clickhouse.Config `yaml:"clickhouse"`

	// DisabledEvents is a list of event names to drop without processing.
	DisabledEvents []string `yaml:"disabledEvents"`

	// Tracing configuration
	Tracing observability.TracingConfig `yaml:"tracing"`
}

Config is the configuration for the consumoor service.

func (*Config) ApplyOverrides

func (c *Config) ApplyOverrides(o *Override)

ApplyOverrides applies CLI/env overrides to the configuration.

func (*Config) DisabledEventEnums

func (c *Config) DisabledEventEnums() ([]xatu.Event_Name, error)

DisabledEventEnums parses disabled event names into typed enum values. Unknown names are rejected to fail fast during startup.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors.

type Consumoor

type Consumoor struct {
	// contains filtered or unexported fields
}

Consumoor is the main service that consumes events from Kafka and writes them to ClickHouse. Each matched Kafka topic gets its own Benthos stream and consumer group while sharing a single ClickHouse writer for efficient connection reuse.

func New

func New(
	ctx context.Context,
	log observability.ContextualLogger, config *Config,
	overrides *Override,
) (*Consumoor, error)

New creates a new Consumoor service. Call Start() to run it.

func (*Consumoor) Start

func (c *Consumoor) Start(ctx context.Context) error

Start runs the consumoor service until the context is cancelled or a SIGINT/SIGTERM is received.

type Override

type Override struct {
	MetricsAddr struct {
		Enabled bool
		Value   string
	}
}

Override holds values that can be set via CLI flags or environment variables, overriding the config file.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL