config

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxBufferSize caps task/group buffer sizes.
	MaxBufferSize = 1 << 20 // 1048576

	// DefaultMaxPollIntervalMs is the default Kafka max.poll.interval.ms
	// (see KafkaConfig.Properties.MaxPollInterval).
	DefaultMaxPollIntervalMs = 3600000 // 1 hour

	// DefaultDiscoveryIntervalSec is the default Discovery.CheckInterval, in seconds.
	DefaultDiscoveryIntervalSec = 60 // 1min
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Assignment

// Assignment records which consumer instance is responsible for which
// tasks, as published by the leader instance.
type Assignment struct {
	Version   int                 // assignment version; presumably incremented on each rebalance — confirm with writer
	UpdatedAt int64               // timestamp when created
	UpdatedBy string              // leader instance
	Map       map[string][]string // map instance to a list of task_name
}

type ClickHouseConfig

// ClickHouseConfig holds connection, retry, and insert settings for
// clickhouse-server. ClickHouseConfig configuration parameters.
type ClickHouseConfig struct {
	Cluster  string
	DB       string
	Hosts    [][]string // NOTE(review): nested slice suggests shard/replica grouping — confirm against usage
	Port     int
	Username string
	Password string
	Protocol string //native, http

	// Whether enable TLS encryption with clickhouse-server
	Secure bool
	// Whether skip verify clickhouse-server cert
	InsecureSkipVerify bool
	RetryTimes         int // <=0 means retry infinitely
	MaxOpenConns       int
	ReadTimeout        int
	AsyncInsert        bool // enable ClickHouse asynchronous inserts; tuned via AsyncSettings below
	AsyncSettings      struct {
		// refers to https://clickhouse.com/docs/en/operations/settings/settings#async-insert
		AsyncInsertMaxDataSize    int `json:"async_insert_max_data_size,omitempty"`
		AsyncInsertMaxQueryNumber int `json:"async_insert_max_query_number,omitempty"` // 450
		AsyncInsertBusyTimeoutMs  int `json:"async_insert_busy_timeout_ms,omitempty"`  // 200
		WaitforAsyncInsert        int `json:"wait_for_async_insert,omitempty"`
		WaitforAsyncInsertTimeout int `json:"wait_for_async_insert_timeout,omitempty"`
		AsyncInsertThreads        int `json:"async_insert_threads,omitempty"` // 16
		AsyncInsertDeduplicate    int `json:"async_insert_deduplicate,omitempty"`
	}
	// Ctx is excluded from JSON (de)serialization.
	Ctx context.Context `json:"-"`
}

ClickHouseConfig configuration parameters

type Config

// Config is the top-level configuration, aggregating the Kafka,
// schema-registry, ClickHouse, discovery, task, and logging sections.
type Config struct {
	Kafka                   KafkaConfig
	SchemaRegistry          SchemaRegistryConfig
	Clickhouse              ClickHouseConfig
	Discovery               Discovery
	Task                    *TaskConfig   // single-task form; Tasks below is the multi-task form
	Tasks                   []*TaskConfig
	Assignment              Assignment
	LogLevel                string
	LogTrace                bool
	RecordPoolSize          int64
	ReloadSeriesMapInterval int
	ActiveSeriesRange       int

	// Groups is derived state (tasks grouped by consumer group) and is
	// excluded from JSON (de)serialization.
	Groups map[string]*GroupConfig `json:"-"`
}

Config is the top-level struct aggregating all configuration sections

func ParseLocalCfgFile

func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error)

func (*Config) IsAssigned

func (cfg *Config) IsAssigned(instance, task string) (assigned bool)

func (*Config) Normallize

func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Credentials) (err error)

Normalize and validate configuration

type Discovery added in v0.7.0

// Discovery controls periodic task/instance discovery.
type Discovery struct {
	Enabled       bool
	CheckInterval int       // check period; presumably seconds (see DefaultDiscoveryIntervalSec) — confirm
	UpdatedBy     string    // instance that last updated this record
	UpdatedAt     time.Time // time of the last update
}

type GroupConfig added in v0.5.0

// GroupConfig groups the tasks that share one Kafka consumer group,
// along with the consuming parameters common to the group.
type GroupConfig struct {
	Name          string
	Topics        []string
	Earliest      bool
	FlushInterval int
	BufferSize    int
	Configs       map[string]*TaskConfig // tasks in this group; assumed keyed by task name — confirm
}

type KafkaConfig

// KafkaConfig configuration parameters: broker addresses, consumer
// properties, and TLS/SASL security settings.
type KafkaConfig struct {
	Brokers    string
	// Properties map directly to Kafka consumer configuration keys.
	Properties struct {
		HeartbeatInterval      int `json:"heartbeat.interval.ms"`
		SessionTimeout         int `json:"session.timeout.ms"`
		RebalanceTimeout       int `json:"rebalance.timeout.ms"`
		RequestTimeoutOverhead int `json:"request.timeout.ms"`
		MaxPollInterval        int `json:"max.poll.interval.ms"`
	}
	ResetSaslRealm bool
	Security       map[string]string
	TLS            struct {
		Enable         bool
		CaCertFiles    string // CA cert.pem with which Kafka brokers certs be signed.  Leave empty for certificates trusted by the OS
		ClientCertFile string // Required for client authentication. It's client cert.pem.
		ClientKeyFile  string // Required if and only if ClientCertFile is present. It's client key.pem.

		TrustStoreLocation string // JKS format of CA certificate, used to extract CA cert.pem.
		TrustStorePassword string
		KeystoreLocation   string // JKS format of client certificate and key, used to extract client cert.pem and key.pem.
		KeystorePassword   string
		EndpIdentAlgo      string // endpoint identification algorithm (assumed; cf. Kafka ssl.endpoint.identification.algorithm) — confirm
	}
	// simplified sarama.Config.Net.SASL to only support SASL/PLAIN and SASL/GSSAPI(Kerberos)
	Sasl struct {
		// Whether or not to use SASL authentication when connecting to the broker
		// (defaults to false).
		Enable bool
		// Mechanism is the name of the enabled SASL mechanism.
		// Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (defaults to PLAIN)
		Mechanism string
		// Username is the authentication identity (authcid) to present for
		// SASL/PLAIN or SASL/SCRAM authentication
		Username string
		// Password for SASL/PLAIN or SASL/SCRAM authentication
		Password string
		// GSSAPI carries the Kerberos settings used when Mechanism is GSSAPI.
		GSSAPI   struct {
			AuthType           int // 1. KRB5_USER_AUTH, 2. KRB5_KEYTAB_AUTH
			KeyTabPath         string
			KerberosConfigPath string
			ServiceName        string
			Username           string
			Password           string
			Realm              string
			DisablePAFXFAST    bool
		}
	}
	AssignInterval  int
	CalcLagInterval int
	RebalanceByLags bool // when true, assignment takes consumer lags into account — confirm with assigner
}

KafkaConfig configuration parameters

type SchemaRegistryConfig

// SchemaRegistryConfig configuration parameters for the schema registry.
type SchemaRegistryConfig struct {
	URL string // base URL of the schema-registry service
}

SchemaRegistryConfig configuration parameters

type TaskConfig

// TaskConfig parameters: describes one consuming task — its Kafka source,
// message parsing, target ClickHouse table, and buffering/flush behavior.
type TaskConfig struct {
	Name string

	Topic         string
	ConsumerGroup string

	// Earliest set to true to consume the message from oldest position
	Earliest bool
	Parser   string
	// the CSV column titles if Parser is csv
	CsvFormat []string
	Delimiter string

	TableName       string
	SeriesTableName string

	// AutoSchema will auto fetch the schema from clickhouse
	AutoSchema     bool
	ExcludeColumns []string
	// Dims lists the target columns and how each maps from the source message.
	Dims           []struct {
		Name       string
		Type       string
		SourceName string
		// Const is used to set column value to some constant from config.
		Const string
	} `json:"dims"`
	// DynamicSchema will add columns present in message to clickhouse. Requires AutoSchema be true.
	DynamicSchema struct {
		Enable      bool
		NotNullable bool
		MaxDims     int // the upper limit of dynamic columns number, <=0 means math.MaxInt16. protecting dirty data attack
		// A column is added for new key K if all following conditions are true:
		// - K isn't in ExcludeColumns
		// - number of existing columns doesn't reach MaxDims-1
		// - WhiteList is empty, or K matches WhiteList
		// - BlackList is empty, or K doesn't match BlackList
		WhiteList string // the regexp of white list
		BlackList string // the regexp of black list
	}
	// additional fields to be appended to each input message, should be a valid json string
	Fields string `json:"fields,omitempty"`
	// PrometheusSchema expects each message is a Prometheus metric(timestamp, value, metric name and a list of labels).
	PrometheusSchema bool
	// fields match PromLabelsBlackList are not considered as labels. Requires PrometheusSchema be true.
	PromLabelsBlackList string // the regexp of black list

	// ShardingKey is the column name to which sharding against
	ShardingKey string `json:"shardingKey,omitempty"`
	// ShardingStripe take effect if the sharding key is numerical
	ShardingStripe uint64 `json:"shardingStripe,omitempty"`

	FlushInterval int     `json:"flushInterval,omitempty"`
	BufferSize    int     `json:"bufferSize,omitempty"`
	TimeZone      string  `json:"timeZone"`
	TimeUnit      float64 `json:"timeUnit"`
}

TaskConfig parameters

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL