Documentation
¶
Overview ¶
Package config loads application configuration.
Index ¶
- func Configure(v *viper.Viper, flags *pflag.FlagSet)
- func DecodeHook() mapstructure.DecodeHookFunc
- type ClickHouseKafkaConnectSinkConfiguration
- type ClickHouseProcessorConfiguration
- type Configuration
- type DeadLetterQueueKafkaConnectSinkConfiguration
- type DedupeConfiguration
- type DedupeDriverConfiguration
- type DedupeDriverMemoryConfiguration
- type DedupeDriverRedisConfiguration
- type ExporterTraceTelemetryConfig
- type IngestConfiguration
- type KafkaConnectSinkConfiguration
- type KafkaIngestConfiguration
- type LogTelemetryConfiguration
- type NamespaceConfiguration
- type ProcessorConfiguration
- type SinkConfiguration
- type TelemetryConfig
- type TraceTelemetryConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DecodeHook ¶
func DecodeHook() mapstructure.DecodeHookFunc
Types ¶
type ClickHouseKafkaConnectSinkConfiguration ¶
type ClickHouseKafkaConnectSinkConfiguration struct {
Hostname string
Port int
SSL bool
Username string
Password string
Database string
}
Clickhouse configuration This may feel repetative but clikhouse sink and processor configs can be different, for example Kafka Connect ClickHouse plugin uses 8123 HTTP port while client uses native protocol's 9000 port. Hostname can be also different, as Kafka Connect and ClickHouse communicates inside the docker compose network. This why we default hostname in config to `clickhouse`.
func (ClickHouseKafkaConnectSinkConfiguration) Validate ¶
func (c ClickHouseKafkaConnectSinkConfiguration) Validate() error
type ClickHouseProcessorConfiguration ¶
type ClickHouseProcessorConfiguration struct {
Address string
TLS bool
Username string
Password string
Database string
}
func (ClickHouseProcessorConfiguration) Validate ¶
func (c ClickHouseProcessorConfiguration) Validate() error
type Configuration ¶
type Configuration struct {
Address string
Environment string
Telemetry TelemetryConfig
Namespace NamespaceConfiguration
Ingest IngestConfiguration
Processor ProcessorConfiguration
Sink SinkConfiguration
Dedupe DedupeConfiguration
Meters []*models.Meter
}
Configuration holds any kind of Configuration that comes from the outside world and is necessary for running the application.
func (Configuration) Validate ¶
func (c Configuration) Validate() error
Validate validates the configuration.
type DeadLetterQueueKafkaConnectSinkConfiguration ¶
type DeadLetterQueueKafkaConnectSinkConfiguration struct {
TopicName string
ReplicationFactor int
ContextHeaders bool
}
Clickhouse configuration See: https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html
func (DeadLetterQueueKafkaConnectSinkConfiguration) Validate ¶
func (c DeadLetterQueueKafkaConnectSinkConfiguration) Validate() error
type DedupeConfiguration ¶
type DedupeConfiguration struct {
Enabled bool
DedupeDriverConfiguration
}
Requires mapstructurex.MapDecoderHookFunc to be high up in the decode hook chain.
func (*DedupeConfiguration) DecodeMap ¶
func (c *DedupeConfiguration) DecodeMap(v map[string]any) error
func (DedupeConfiguration) NewDeduplicator ¶
func (c DedupeConfiguration) NewDeduplicator() (ingest.Deduplicator, error)
func (DedupeConfiguration) Validate ¶
func (c DedupeConfiguration) Validate() error
type DedupeDriverConfiguration ¶
type DedupeDriverConfiguration interface {
DriverName() string
NewDeduplicator() (ingest.Deduplicator, error)
Validate() error
}
type DedupeDriverMemoryConfiguration ¶
Dedupe memory driver configuration
func (DedupeDriverMemoryConfiguration) DriverName ¶
func (DedupeDriverMemoryConfiguration) DriverName() string
func (DedupeDriverMemoryConfiguration) NewDeduplicator ¶
func (c DedupeDriverMemoryConfiguration) NewDeduplicator() (ingest.Deduplicator, error)
func (DedupeDriverMemoryConfiguration) Validate ¶
func (c DedupeDriverMemoryConfiguration) Validate() error
type DedupeDriverRedisConfiguration ¶
type DedupeDriverRedisConfiguration struct {
Address string
Database int
Username string
Password string
Expiration time.Duration
Sentinel struct {
Enabled bool
MasterName string
}
TLS struct {
Enabled bool
InsecureSkipVerify bool
}
}
Dedupe redis driver configuration
func (DedupeDriverRedisConfiguration) DriverName ¶
func (DedupeDriverRedisConfiguration) DriverName() string
func (DedupeDriverRedisConfiguration) NewDeduplicator ¶
func (c DedupeDriverRedisConfiguration) NewDeduplicator() (ingest.Deduplicator, error)
func (DedupeDriverRedisConfiguration) Validate ¶
func (c DedupeDriverRedisConfiguration) Validate() error
type ExporterTraceTelemetryConfig ¶
func (ExporterTraceTelemetryConfig) GetExporter ¶
func (c ExporterTraceTelemetryConfig) GetExporter() (sdktrace.SpanExporter, error)
func (ExporterTraceTelemetryConfig) Validate ¶
func (c ExporterTraceTelemetryConfig) Validate() error
Validate validates the configuration.
type IngestConfiguration ¶
type IngestConfiguration struct {
Kafka KafkaIngestConfiguration
}
func (IngestConfiguration) Validate ¶
func (c IngestConfiguration) Validate() error
Validate validates the configuration.
type KafkaConnectSinkConfiguration ¶
type KafkaConnectSinkConfiguration struct {
Enabled bool
URL string
DeadLetterQueue DeadLetterQueueKafkaConnectSinkConfiguration
ClickHouse ClickHouseKafkaConnectSinkConfiguration
}
func (KafkaConnectSinkConfiguration) Validate ¶
func (c KafkaConnectSinkConfiguration) Validate() error
type KafkaIngestConfiguration ¶
type KafkaIngestConfiguration struct {
Broker string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string
Partitions int
EventsTopicTemplate string
}
func (KafkaIngestConfiguration) CreateKafkaConfig ¶
func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap
CreateKafkaConfig creates a Kafka config map.
func (KafkaIngestConfiguration) Validate ¶
func (c KafkaIngestConfiguration) Validate() error
Validate validates the configuration.
type LogTelemetryConfiguration ¶
type LogTelemetryConfiguration struct {
// Format specifies the output log format.
// Accepted values are: json, text
Format string
// Level is the minimum log level that should appear on the output.
//
// Requires [mapstructure.TextUnmarshallerHookFunc] to be high up in the decode hook chain.
Level slog.Level
}
func (LogTelemetryConfiguration) NewHandler ¶
func (c LogTelemetryConfiguration) NewHandler(w io.Writer) slog.Handler
NewHandler creates a new slog.Handler.
func (LogTelemetryConfiguration) Validate ¶
func (c LogTelemetryConfiguration) Validate() error
Validate validates the configuration.
type NamespaceConfiguration ¶
Namespace configuration
func (NamespaceConfiguration) Validate ¶
func (c NamespaceConfiguration) Validate() error
type ProcessorConfiguration ¶
type ProcessorConfiguration struct {
ClickHouse ClickHouseProcessorConfiguration
}
func (ProcessorConfiguration) Validate ¶
func (c ProcessorConfiguration) Validate() error
Validate validates the configuration.
type SinkConfiguration ¶
type SinkConfiguration struct {
KafkaConnect KafkaConnectSinkConfiguration
}
func (SinkConfiguration) Validate ¶
func (c SinkConfiguration) Validate() error
type TelemetryConfig ¶
type TelemetryConfig struct {
// Telemetry HTTP server address
Address string
Trace TraceTelemetryConfig
Log LogTelemetryConfiguration
}
func (TelemetryConfig) Validate ¶
func (c TelemetryConfig) Validate() error
Validate validates the configuration.
type TraceTelemetryConfig ¶
type TraceTelemetryConfig struct {
Exporter ExporterTraceTelemetryConfig
Sampler string
}
func (TraceTelemetryConfig) GetSampler ¶
func (c TraceTelemetryConfig) GetSampler() sdktrace.Sampler
func (TraceTelemetryConfig) Validate ¶
func (c TraceTelemetryConfig) Validate() error
Validate validates the configuration.