Documentation
¶
Overview ¶
Package config loads application configuration.
Index ¶
- func AddPrefix(prefix, key string) string
- func ConfigureAggregation(v *viper.Viper)
- func ConfigureApps(v *viper.Viper, flags *pflag.FlagSet)
- func ConfigureBalanceWorker(v *viper.Viper)
- func ConfigureBilling(v *viper.Viper, flags *pflag.FlagSet)
- func ConfigureBillingWorker(v *viper.Viper)
- func ConfigureConsumer(v *viper.Viper, prefix string)
- func ConfigureDedupe(v *viper.Viper)
- func ConfigureEntitlements(v *viper.Viper, flags *pflag.FlagSet)
- func ConfigureEvents(v *viper.Viper)
- func ConfigureIngest(v *viper.Viper)
- func ConfigureKafkaConfiguration(v *viper.Viper, prefix string)
- func ConfigureNamespace(v *viper.Viper)
- func ConfigureNotification(v *viper.Viper)
- func ConfigurePortal(v *viper.Viper)
- func ConfigurePostgres(v *viper.Viper)
- func ConfigureProductCatalog(v *viper.Viper)
- func ConfigureProgressManager(v *viper.Viper)
- func ConfigureSink(v *viper.Viper)
- func ConfigureTelemetry(v *viper.Viper, flags *pflag.FlagSet)
- func ConfigureTermination(v *viper.Viper, prefixes ...string)
- func ConfigureTopicProvisioner(v *viper.Viper, prefixes ...string)
- func DecodeHook() mapstructure.DecodeHookFunc
- func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet)
- type AggregationConfiguration
- type AppStripeConfiguration
- type AppsConfiguration
- type AutoMigrate
- type AutoProvisionConfiguration
- type BalanceWorkerConfiguration
- type BillingConfiguration
- type BillingWorkerConfiguration
- type ClickHouseAggregationConfiguration
- type Configuration
- type ConsumerConfiguration
- type DLQAutoProvisionConfiguration
- type DLQConfiguration
- type DedupeConfiguration
- type DedupeDriverConfiguration
- type DedupeDriverMemoryConfiguration
- type DedupeDriverRedisConfiguration
- type EntitlementsConfiguration
- type EventSubsystemConfiguration
- type EventsConfiguration
- type ExportersLogTelemetryConfig
- type ExportersMetricsTelemetryConfig
- type ExportersTraceTelemetryConfig
- type FileExportersLogTelemetryConfig
- type IngestConfiguration
- type IngestNotificationsConfiguration
- type KafkaConfig
- type KafkaConfiguration
- type KafkaIngestConfiguration
- type LogTelemetryConfig
- type MetricsTelemetryConfig
- type NamespaceConfiguration
- type NotificationConfiguration
- type OTLPExporterTelemetryConfig
- type OTLPExportersLogTelemetryConfig
- type OTLPExportersMetricsTelemetryConfig
- type OTLPExportersTraceTelemetryConfig
- type PortalCORSConfiguration
- type PortalConfiguration
- type PostgresConfig
- type ProductCatalogConfiguration
- type ProgressManagerConfiguration
- type PrometheusExportersMetricsTelemetryConfig
- type RetryConfiguration
- type SinkConfiguration
- type StdoutExportersLogTelemetryConfig
- type StorageConfiguration
- type SvixConfig
- type TelemetryConfig
- type TerminationConfig
- type TopicProvisionerConfig
- type TraceTelemetryConfig
- type ViperKeyPrefixer
- type WebhookConfiguration
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddPrefix ¶
AddPrefix returns string with "<prefix>." prepended to key. If returns key unmodified if prefix is empty or key already has the prefix added.
func ConfigureAggregation ¶
ConfigureAggregation configures some defaults in the Viper instance.
func ConfigureBalanceWorker ¶
func ConfigureBillingWorker ¶
func ConfigureConsumer ¶
func ConfigureDedupe ¶
ConfigureDedupe configures some defaults in the Viper instance.
func ConfigureEvents ¶
func ConfigureIngest ¶
Configure configures some defaults in the Viper instance.
func ConfigureKafkaConfiguration ¶
ConfigureKafkaConfiguration sets defaults in the Viper instance.
func ConfigureNamespace ¶
ConfigureNamespace configures some defaults in the Viper instance.
func ConfigureNotification ¶
func ConfigurePortal ¶
ConfigurePortal configures some defaults in the Viper instance.
func ConfigurePostgres ¶
func ConfigureProductCatalog ¶
func ConfigureProgressManager ¶
ConfigureProgressManager sets the default values for the progress manager configuration
func ConfigureSink ¶
ConfigureSink setup Sink specific configuration defaults for provided *viper.Viper instance.
func ConfigureTelemetry ¶
ConfigureTelemetry configures some defaults in the Viper instance.
func ConfigureTermination ¶
ConfigureTermination configures some defaults in the Viper instance.
func ConfigureTopicProvisioner ¶
ConfigureTopicProvisioner configures some defaults in the Viper instance.
func DecodeHook ¶
func DecodeHook() mapstructure.DecodeHookFunc
Types ¶
type AggregationConfiguration ¶
type AggregationConfiguration struct {
ClickHouse ClickHouseAggregationConfiguration
EventsTableName string
// Set true for ClickHouse first store the incoming inserts into an in-memory buffer
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool
// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
InsertQuerySettings map[string]string
}
func (AggregationConfiguration) Validate ¶
func (c AggregationConfiguration) Validate() error
Validate validates the configuration.
type AppStripeConfiguration ¶
type AppStripeConfiguration struct {
DisableWebhookRegistration bool `yaml:"disableWebhookRegistration"`
}
func (AppStripeConfiguration) Validate ¶
func (c AppStripeConfiguration) Validate() error
type AppsConfiguration ¶
type AppsConfiguration struct {
// BaseURL is the base URL for the Stripe webhook.
BaseURL string `yaml:"baseURL"`
Stripe AppStripeConfiguration `yaml:"stripe"`
}
func (AppsConfiguration) Validate ¶
func (c AppsConfiguration) Validate() error
type AutoMigrate ¶
type AutoMigrate string
const ( AutoMigrateEnt AutoMigrate = "ent" AutoMigrateMigration AutoMigrate = "migration" AutoMigrateOff AutoMigrate = "false" )
func (AutoMigrate) Enabled ¶
func (a AutoMigrate) Enabled() bool
func (AutoMigrate) Validate ¶
func (a AutoMigrate) Validate() error
type AutoProvisionConfiguration ¶
func (AutoProvisionConfiguration) Validate ¶
func (c AutoProvisionConfiguration) Validate() error
type BalanceWorkerConfiguration ¶
type BalanceWorkerConfiguration struct {
ConsumerConfiguration `mapstructure:",squash"`
}
func (BalanceWorkerConfiguration) Validate ¶
func (c BalanceWorkerConfiguration) Validate() error
type BillingConfiguration ¶
type BillingConfiguration struct {
AdvancementStrategy billing.AdvancementStrategy
Worker BillingWorkerConfiguration
}
func (BillingConfiguration) Validate ¶
func (c BillingConfiguration) Validate() error
type BillingWorkerConfiguration ¶
type BillingWorkerConfiguration struct {
ConsumerConfiguration `mapstructure:",squash"`
}
func (BillingWorkerConfiguration) Validate ¶
func (c BillingWorkerConfiguration) Validate() error
type ClickHouseAggregationConfiguration ¶
type ClickHouseAggregationConfiguration struct {
Address string
TLS bool
Username string
Password string
Database string
// ClickHouse connection options
DialTimeout time.Duration
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
BlockBufferSize uint8
}
func (ClickHouseAggregationConfiguration) GetClientOptions ¶
func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Options
func (ClickHouseAggregationConfiguration) Validate ¶
func (c ClickHouseAggregationConfiguration) Validate() error
type Configuration ¶
type Configuration struct {
Address string
Environment string
Telemetry TelemetryConfig
Termination TerminationConfig
Aggregation AggregationConfiguration
Entitlements EntitlementsConfiguration
Dedupe DedupeConfiguration
Events EventsConfiguration
Ingest IngestConfiguration
Meters []*meter.Meter
Namespace NamespaceConfiguration
Portal PortalConfiguration
Postgres PostgresConfig
Sink SinkConfiguration
BalanceWorker BalanceWorkerConfiguration
Notification NotificationConfiguration
ProductCatalog ProductCatalogConfiguration
ProgressManager ProgressManagerConfiguration
Billing BillingConfiguration
Apps AppsConfiguration
Svix SvixConfig
}
Configuration holds any kind of Configuration that comes from the outside world and is necessary for running the application.
func (Configuration) Validate ¶
func (c Configuration) Validate() error
Validate validates the configuration.
type ConsumerConfiguration ¶
type ConsumerConfiguration struct {
// ProcessingTimeout is the maximum time a message is allowed to be processed before it is considered failed. 0 disables the timeout.
ProcessingTimeout time.Duration
// Retry specifies how many times a message should be retried before it is sent to the DLQ.
Retry RetryConfiguration
// ConsumerGroupName is the name of the consumer group that the consumer belongs to.
ConsumerGroupName string
// DLQ specifies the configuration for the Dead Letter Queue.
DLQ DLQConfiguration
}
func (ConsumerConfiguration) Validate ¶
func (c ConsumerConfiguration) Validate() error
type DLQAutoProvisionConfiguration ¶
func (DLQAutoProvisionConfiguration) Validate ¶
func (c DLQAutoProvisionConfiguration) Validate() error
type DLQConfiguration ¶
type DLQConfiguration struct {
Enabled bool
Topic string
AutoProvision DLQAutoProvisionConfiguration
}
func (DLQConfiguration) Validate ¶
func (c DLQConfiguration) Validate() error
type DedupeConfiguration ¶
type DedupeConfiguration struct {
Enabled bool
DedupeDriverConfiguration
}
Requires mapstructurex.MapDecoderHookFunc to be high up in the decode hook chain.
func (*DedupeConfiguration) DecodeMap ¶
func (c *DedupeConfiguration) DecodeMap(v map[string]any) error
func (DedupeConfiguration) NewDeduplicator ¶
func (c DedupeConfiguration) NewDeduplicator() (dedupe.Deduplicator, error)
func (DedupeConfiguration) Validate ¶
func (c DedupeConfiguration) Validate() error
type DedupeDriverConfiguration ¶
type DedupeDriverConfiguration interface {
DriverName() string
NewDeduplicator() (dedupe.Deduplicator, error)
Validate() error
}
type DedupeDriverMemoryConfiguration ¶
Dedupe memory driver configuration
func (DedupeDriverMemoryConfiguration) DriverName ¶
func (DedupeDriverMemoryConfiguration) DriverName() string
func (DedupeDriverMemoryConfiguration) NewDeduplicator ¶
func (c DedupeDriverMemoryConfiguration) NewDeduplicator() (dedupe.Deduplicator, error)
func (DedupeDriverMemoryConfiguration) Validate ¶
func (c DedupeDriverMemoryConfiguration) Validate() error
type DedupeDriverRedisConfiguration ¶
type DedupeDriverRedisConfiguration struct {
redis.Config `mapstructure:",squash"`
Expiration time.Duration
}
Dedupe redis driver configuration
func (DedupeDriverRedisConfiguration) DriverName ¶
func (DedupeDriverRedisConfiguration) DriverName() string
func (DedupeDriverRedisConfiguration) NewDeduplicator ¶
func (c DedupeDriverRedisConfiguration) NewDeduplicator() (dedupe.Deduplicator, error)
func (DedupeDriverRedisConfiguration) Validate ¶
func (c DedupeDriverRedisConfiguration) Validate() error
type EntitlementsConfiguration ¶
func (*EntitlementsConfiguration) GetGracePeriod ¶
func (c *EntitlementsConfiguration) GetGracePeriod() isodate.Period
func (EntitlementsConfiguration) Validate ¶
func (c EntitlementsConfiguration) Validate() error
Validate validates the configuration.
type EventSubsystemConfiguration ¶
type EventSubsystemConfiguration struct {
Enabled bool
Topic string
AutoProvision AutoProvisionConfiguration
}
func (EventSubsystemConfiguration) Validate ¶
func (c EventSubsystemConfiguration) Validate() error
type EventsConfiguration ¶
type EventsConfiguration struct {
SystemEvents EventSubsystemConfiguration
IngestEvents EventSubsystemConfiguration
}
func (EventsConfiguration) EventBusTopicMapping ¶
func (c EventsConfiguration) EventBusTopicMapping() eventbus.TopicMapping
func (EventsConfiguration) Validate ¶
func (c EventsConfiguration) Validate() error
type ExportersLogTelemetryConfig ¶
type ExportersLogTelemetryConfig struct {
OTLP OTLPExportersLogTelemetryConfig
Stdout StdoutExportersLogTelemetryConfig
File FileExportersLogTelemetryConfig
}
func (ExportersLogTelemetryConfig) Validate ¶
func (c ExportersLogTelemetryConfig) Validate() error
Validate validates the configuration.
type ExportersMetricsTelemetryConfig ¶
type ExportersMetricsTelemetryConfig struct {
Prometheus PrometheusExportersMetricsTelemetryConfig
OTLP OTLPExportersMetricsTelemetryConfig
}
func (ExportersMetricsTelemetryConfig) Validate ¶
func (c ExportersMetricsTelemetryConfig) Validate() error
Validate validates the configuration.
type ExportersTraceTelemetryConfig ¶
type ExportersTraceTelemetryConfig struct {
OTLP OTLPExportersTraceTelemetryConfig
}
func (ExportersTraceTelemetryConfig) Validate ¶
func (c ExportersTraceTelemetryConfig) Validate() error
Validate validates the configuration.
type FileExportersLogTelemetryConfig ¶
FileExportersLogTelemetryConfig represents the configuration for the file log exporter.
func (FileExportersLogTelemetryConfig) NewExporter ¶
func (c FileExportersLogTelemetryConfig) NewExporter() (sdklog.Exporter, error)
NewExporter creates a new sdklog.Exporter.
func (FileExportersLogTelemetryConfig) Validate ¶
func (c FileExportersLogTelemetryConfig) Validate() error
Validate validates the configuration.
type IngestConfiguration ¶
type IngestConfiguration struct {
Kafka KafkaIngestConfiguration
}
func (IngestConfiguration) Validate ¶
func (c IngestConfiguration) Validate() error
Validate validates the configuration.
type IngestNotificationsConfiguration ¶
type IngestNotificationsConfiguration struct {
MaxEventsInBatch int
}
func (IngestNotificationsConfiguration) Validate ¶
func (c IngestNotificationsConfiguration) Validate() error
type KafkaConfig ¶
type KafkaConfig struct {
pkgkafka.CommonConfigParams `mapstructure:",squash"`
pkgkafka.ConsumerConfigParams `mapstructure:",squash"`
pkgkafka.ProducerConfigParams `mapstructure:",squash"`
}
func (KafkaConfig) AsAdminConfig ¶
func (c KafkaConfig) AsAdminConfig() pkgkafka.AdminConfig
func (KafkaConfig) AsConsumerConfig ¶
func (c KafkaConfig) AsConsumerConfig() pkgkafka.ConsumerConfig
func (KafkaConfig) AsProducerConfig ¶
func (c KafkaConfig) AsProducerConfig() pkgkafka.ProducerConfig
func (KafkaConfig) Validate ¶
func (c KafkaConfig) Validate() error
type KafkaConfiguration ¶
type KafkaConfiguration struct {
Broker string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string
StatsInterval pkgkafka.TimeDurationMilliSeconds
// BrokerAddressFamily defines the IP address family to be used for network communication with Kafka cluster
BrokerAddressFamily pkgkafka.BrokerAddressFamily
// SocketKeepAliveEnable defines if TCP socket keep-alive is enabled to prevent closing idle connections
// by Kafka brokers.
SocketKeepAliveEnabled bool
// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
// (brokers, topic, partitions, etc) from the Kafka cluster.
// The 5 minutes default value is appropriate for mostly static Kafka clusters, but needs to be lowered
// in case of large clusters where changes are more frequent.
// This value must not be set to value lower than 10s.
TopicMetadataRefreshInterval pkgkafka.TimeDurationMilliSeconds
// Enable contexts for extensive debugging of librdkafka.
// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
DebugContexts pkgkafka.DebugContexts
}
func (KafkaConfiguration) CreateKafkaConfig ¶
func (c KafkaConfiguration) CreateKafkaConfig() kafka.ConfigMap
CreateKafkaConfig creates a Kafka config map.
func (KafkaConfiguration) Validate ¶
func (c KafkaConfiguration) Validate() error
type KafkaIngestConfiguration ¶
type KafkaIngestConfiguration struct {
KafkaConfiguration `mapstructure:",squash"`
TopicProvisionerConfig `mapstructure:",squash"`
Partitions int
EventsTopicTemplate string
// NamespaceDeletionEnabled defines whether deleting namespaces are allowed or not.
NamespaceDeletionEnabled bool
}
func (KafkaIngestConfiguration) Validate ¶
func (c KafkaIngestConfiguration) Validate() error
Validate validates the configuration.
type LogTelemetryConfig ¶
type LogTelemetryConfig struct {
// Format specifies the output log format.
// Accepted values are: json, text
Format string
// Level is the minimum log level that should appear on the output.
//
// Requires [mapstructure.TextUnmarshallerHookFunc] to be high up in the decode hook chain.
Level slog.Level
Exporters ExportersLogTelemetryConfig
}
func (LogTelemetryConfig) NewHandler ¶
func (c LogTelemetryConfig) NewHandler(w io.Writer) slog.Handler
NewHandler creates a new slog.Handler.
func (LogTelemetryConfig) NewLoggerProvider ¶
func (c LogTelemetryConfig) NewLoggerProvider(ctx context.Context, res *resource.Resource) (*sdklog.LoggerProvider, error)
func (LogTelemetryConfig) Validate ¶
func (c LogTelemetryConfig) Validate() error
Validate validates the configuration.
type MetricsTelemetryConfig ¶
type MetricsTelemetryConfig struct {
Exporters ExportersMetricsTelemetryConfig
}
func (MetricsTelemetryConfig) NewMeterProvider ¶
func (c MetricsTelemetryConfig) NewMeterProvider(ctx context.Context, res *resource.Resource) (*sdkmetric.MeterProvider, error)
func (MetricsTelemetryConfig) Validate ¶
func (c MetricsTelemetryConfig) Validate() error
Validate validates the configuration.
type NamespaceConfiguration ¶
Namespace configuration
func (NamespaceConfiguration) Validate ¶
func (c NamespaceConfiguration) Validate() error
type NotificationConfiguration ¶
type NotificationConfiguration struct {
Consumer ConsumerConfiguration
Webhook WebhookConfiguration
}
func (NotificationConfiguration) Validate ¶
func (c NotificationConfiguration) Validate() error
type OTLPExporterTelemetryConfig ¶
type OTLPExporterTelemetryConfig struct {
Address string
}
func (OTLPExporterTelemetryConfig) DialExporter ¶
func (c OTLPExporterTelemetryConfig) DialExporter(ctx context.Context) (*grpc.ClientConn, error)
func (OTLPExporterTelemetryConfig) Validate ¶
func (c OTLPExporterTelemetryConfig) Validate() error
Validate validates the configuration.
type OTLPExportersLogTelemetryConfig ¶
type OTLPExportersLogTelemetryConfig struct {
Enabled bool
OTLPExporterTelemetryConfig `mapstructure:",squash"`
}
func (OTLPExportersLogTelemetryConfig) NewExporter ¶
NewExporter creates a new sdklog.Exporter.
func (OTLPExportersLogTelemetryConfig) Validate ¶
func (c OTLPExportersLogTelemetryConfig) Validate() error
Validate validates the configuration.
type OTLPExportersMetricsTelemetryConfig ¶
type OTLPExportersMetricsTelemetryConfig struct {
Enabled bool
OTLPExporterTelemetryConfig `mapstructure:",squash"`
}
func (OTLPExportersMetricsTelemetryConfig) NewExporter ¶
func (c OTLPExportersMetricsTelemetryConfig) NewExporter(ctx context.Context) (sdkmetric.Reader, error)
NewExporter creates a new sdkmetric.Reader.
func (OTLPExportersMetricsTelemetryConfig) Validate ¶
func (c OTLPExportersMetricsTelemetryConfig) Validate() error
Validate validates the configuration.
type OTLPExportersTraceTelemetryConfig ¶
type OTLPExportersTraceTelemetryConfig struct {
Enabled bool
OTLPExporterTelemetryConfig `mapstructure:",squash"`
}
func (OTLPExportersTraceTelemetryConfig) NewExporter ¶
func (c OTLPExportersTraceTelemetryConfig) NewExporter(ctx context.Context) (sdktrace.SpanExporter, error)
NewExporter creates a new sdktrace.SpanExporter.
func (OTLPExportersTraceTelemetryConfig) Validate ¶
func (c OTLPExportersTraceTelemetryConfig) Validate() error
Validate validates the configuration.
type PortalCORSConfiguration ¶
type PortalCORSConfiguration struct {
Enabled bool `mapstructure:"enabled"`
}
type PortalConfiguration ¶
type PortalConfiguration struct {
Enabled bool `mapstructure:"enabled"`
CORS PortalCORSConfiguration `mapstructure:"cors"`
TokenSecret string `mapstructure:"tokenSecret"`
TokenExpiration time.Duration `mapstructure:"tokenExpiration"`
}
func (PortalConfiguration) Validate ¶
func (c PortalConfiguration) Validate() error
Validate validates the configuration.
type PostgresConfig ¶
type PostgresConfig struct {
// URL is the PostgreSQL database connection URL.
URL string `yaml:"url"`
// AutoMigrate is a flag that indicates whether the database should be automatically migrated.
// Supported values are:
// - "false" to disable auto-migration at startup
// - "ent" to use ent Schema Upserts (the default value)
// - "migration" to use the migrations directory
AutoMigrate AutoMigrate `yaml:"autoMigrate"`
}
func (PostgresConfig) Validate ¶
func (c PostgresConfig) Validate() error
Validate validates the configuration.
type ProductCatalogConfiguration ¶
type ProductCatalogConfiguration struct{}
func (ProductCatalogConfiguration) Validate ¶
func (c ProductCatalogConfiguration) Validate() error
type ProgressManagerConfiguration ¶
type ProgressManagerConfiguration struct {
Enabled bool
Expiration time.Duration
Redis redis.Config
}
ProgressManagerConfiguration stores the configuration parameters for the progress manager
func (ProgressManagerConfiguration) Validate ¶
func (c ProgressManagerConfiguration) Validate() error
Validate checks if the configuration is valid
type PrometheusExportersMetricsTelemetryConfig ¶
type PrometheusExportersMetricsTelemetryConfig struct {
Enabled bool
}
func (PrometheusExportersMetricsTelemetryConfig) NewExporter ¶
func (c PrometheusExportersMetricsTelemetryConfig) NewExporter() (sdkmetric.Reader, error)
NewExporter creates a new sdkmetric.Reader.
func (PrometheusExportersMetricsTelemetryConfig) Validate ¶
func (c PrometheusExportersMetricsTelemetryConfig) Validate() error
Validate validates the configuration.
type RetryConfiguration ¶
type RetryConfiguration struct {
// MaxRetries is maximum number of times a retry will be attempted. Disabled if 0
MaxRetries int
// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
InitialInterval time.Duration
// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
MaxInterval time.Duration
// MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
MaxElapsedTime time.Duration
}
func (RetryConfiguration) Validate ¶
func (c RetryConfiguration) Validate() error
type SinkConfiguration ¶
type SinkConfiguration struct {
// FIXME(chrisgacsal): remove as it is deprecated by moving Kafka specific configuration to dedicated config params.
GroupId string
Dedupe DedupeConfiguration
MinCommitCount int
MaxCommitWait time.Duration
MaxPollTimeout time.Duration
NamespaceRefetch time.Duration
FlushSuccessTimeout time.Duration
DrainTimeout time.Duration
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
// TODO: remove, config moved to aggregation config
// Storage configuration
Storage StorageConfiguration
// NamespaceRefetchTimeout is the timeout for updating namespaces and consumer subscription.
// It must be less than NamespaceRefetch interval.
NamespaceRefetchTimeout time.Duration
// NamespaceTopicRegexp defines the regular expression to match/validate topic names the sink-worker needs to subscribe to.
NamespaceTopicRegexp string
// MeterRefetchInterval is the interval to refetch meters from the database
MeterRefetchInterval time.Duration
}
func (SinkConfiguration) Validate ¶
func (c SinkConfiguration) Validate() error
type StdoutExportersLogTelemetryConfig ¶
StdoutExportersLogTelemetryConfig represents the configuration for the stdout log exporter. See https://pkg.go.dev/go.opentelemetry.io/otel/exporters/stdout/stdoutlog
func (StdoutExportersLogTelemetryConfig) NewExporter ¶
func (c StdoutExportersLogTelemetryConfig) NewExporter() (sdklog.Exporter, error)
NewExporter creates a new sdklog.Exporter.
func (StdoutExportersLogTelemetryConfig) Validate ¶
func (c StdoutExportersLogTelemetryConfig) Validate() error
Validate validates the configuration.
type StorageConfiguration ¶
type StorageConfiguration struct {
// Set true for ClickHouse first store the incoming inserts into an in-memory buffer
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool
// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
QuerySettings map[string]string
}
func (StorageConfiguration) Validate ¶
func (c StorageConfiguration) Validate() error
type SvixConfig ¶
type SvixConfig = notificationwebhook.SvixConfig
type TelemetryConfig ¶
type TelemetryConfig struct {
// Telemetry HTTP server address
Address string
Trace TraceTelemetryConfig
Metrics MetricsTelemetryConfig
Log LogTelemetryConfig
}
func (TelemetryConfig) Validate ¶
func (c TelemetryConfig) Validate() error
Validate validates the configuration.
type TerminationConfig ¶
type TerminationConfig struct {
// CheckInterval defines the time period used for updating the readiness check based on the termination status
CheckInterval time.Duration
// GracefulShutdownTimeout defines the maximum time for the process to gracefully stop on receiving stop signal.
GracefulShutdownTimeout time.Duration
// PropagationTimeout defines how long to block the termination process in order
// to allow the termination event to be propagated to other systems. e.g. reverse proxy.
// Its value should be set higher than the failure threshold for readiness probe.
// In Kubernetes it should be: readiness.periodSeconds * (readiness.failureThreshold + 1) + CheckInterval
// PropagationTimeout must always less than GracefulShutdownTimeout.
PropagationTimeout time.Duration
}
func (TerminationConfig) Validate ¶
func (c TerminationConfig) Validate() error
type TopicProvisionerConfig ¶
type TopicProvisionerConfig struct {
// The maximum number of entries stored in topic cache at a time which after the least recently used is evicted.
// Setting size to 0 makes it unlimited
CacheSize int
// The maximum time an entries is kept in cache before being evicted
CacheTTL time.Duration
// ProtectedTopics defines a list of topics which are protected from deletion.
ProtectedTopics []string
}
TopicProvisionerConfig stores the configuration for TopicProvisioner
func (TopicProvisionerConfig) Validate ¶
func (c TopicProvisionerConfig) Validate() error
type TraceTelemetryConfig ¶
type TraceTelemetryConfig struct {
Sampler string
Exporters ExportersTraceTelemetryConfig
}
func (TraceTelemetryConfig) GetSampler ¶
func (c TraceTelemetryConfig) GetSampler() sdktrace.Sampler
func (TraceTelemetryConfig) NewTracerProvider ¶
func (c TraceTelemetryConfig) NewTracerProvider(ctx context.Context, res *resource.Resource) (*sdktrace.TracerProvider, error)
func (TraceTelemetryConfig) Validate ¶
func (c TraceTelemetryConfig) Validate() error
Validate validates the configuration.
type ViperKeyPrefixer ¶
ViperKeyPrefixer is a helper to prepend prefix to a key name.
func NewViperKeyPrefixer ¶
func NewViperKeyPrefixer(prefixes ...string) ViperKeyPrefixer
NewViperKeyPrefixer returns a new ViperKeyPrefixer which prepends a dot delimited prefix calculated by concatenating provided prefixes in the order they appear in prefixes list.
prefixer := NewViperKeyPrefixer("a", "b")
s := prefixer("c")
fmt.Println(s) // -> "a.b.c"