Documentation
¶
Index ¶
Constants ¶
View Source
// Buffer size limit and default interval values.
//
// The declaration is written one constant per line: when it is collapsed onto
// a single line, the first line comment hides the remaining constants and the
// closing parenthesis.
const (
	MaxBufferSize               = 1 << 20 // 1048576
	DefaultMaxPollIntervalMs    = 3600000 // 1 hour, in milliseconds
	DefaultDiscoveryIntervalSec = 60      // 1 minute, in seconds
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Assignment ¶
type ClickHouseConfig ¶
// ClickHouseConfig holds the ClickHouse configuration parameters.
type ClickHouseConfig struct {
	Cluster  string
	DB       string
	Hosts    [][]string
	Port     int
	Username string
	Password string
	Protocol string // one of: native, http

	// Secure tells whether to enable TLS encryption with clickhouse-server.
	Secure bool
	// InsecureSkipVerify tells whether to skip verification of the
	// clickhouse-server certificate.
	InsecureSkipVerify bool

	RetryTimes   int // <=0 means retry infinitely
	MaxOpenConns int
	ReadTimeout  int // NOTE(review): unit is not visible in this declaration; confirm
	AsyncInsert  bool

	// AsyncSettings mirrors the ClickHouse async-insert settings; see
	// https://clickhouse.com/docs/en/operations/settings/settings#async-insert
	// Every member is dropped from the JSON output when zero (omitempty).
	AsyncSettings struct {
		AsyncInsertMaxDataSize    int `json:"async_insert_max_data_size,omitempty"`
		AsyncInsertMaxQueryNumber int `json:"async_insert_max_query_number,omitempty"` // 450
		AsyncInsertBusyTimeoutMs  int `json:"async_insert_busy_timeout_ms,omitempty"`  // 200
		WaitforAsyncInsert        int `json:"wait_for_async_insert,omitempty"`
		WaitforAsyncInsertTimeout int `json:"wait_for_async_insert_timeout,omitempty"`
		AsyncInsertThreads        int `json:"async_insert_threads,omitempty"` // 16
		AsyncInsertDeduplicate    int `json:"async_insert_deduplicate,omitempty"`
	}

	// Ctx is never serialized to JSON (tag "-").
	Ctx context.Context `json:"-"`
}
ClickHouseConfig configuration parameters
type Config ¶
// Config is the struct that holds the different configuration sections.
type Config struct {
	// Per-subsystem sections.
	Kafka          KafkaConfig
	SchemaRegistry SchemaRegistryConfig
	Clickhouse     ClickHouseConfig
	Discovery      Discovery

	// Task definitions: a single task and/or a list of tasks, plus the Assignment.
	Task       *TaskConfig
	Tasks      []*TaskConfig
	Assignment Assignment

	// Logging.
	LogLevel string
	LogTrace bool

	RecordPoolSize          int64
	ReloadSeriesMapInterval int // NOTE(review): unit is not visible in this declaration; confirm
	ActiveSeriesRange       int // NOTE(review): unit is not visible in this declaration; confirm

	// Groups is never serialized to JSON (tag "-").
	Groups map[string]*GroupConfig `json:"-"`
}
Config is the struct that holds the different configuration sections.
func ParseLocalCfgFile ¶
func (*Config) IsAssigned ¶
func (*Config) Normallize ¶
func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Credentials) (err error)
Normallize normalizes and validates the configuration.
type GroupConfig ¶ added in v0.5.0
type KafkaConfig ¶
// KafkaConfig holds the Kafka configuration parameters.
type KafkaConfig struct {
	Brokers string

	// Properties are keyed in JSON by the dotted property names given in the tags.
	Properties struct {
		HeartbeatInterval      int `json:"heartbeat.interval.ms"`
		SessionTimeout         int `json:"session.timeout.ms"`
		RebalanceTimeout       int `json:"rebalance.timeout.ms"`
		RequestTimeoutOverhead int `json:"request.timeout.ms"`
		MaxPollInterval        int `json:"max.poll.interval.ms"`
	}

	ResetSaslRealm bool
	Security       map[string]string

	// TLS settings. Certificates can be given as PEM files or as JKS stores.
	TLS struct {
		Enable             bool
		CaCertFiles        string // CA cert.pem with which Kafka brokers certs be signed. Leave empty for certificates trusted by the OS
		ClientCertFile     string // Required for client authentication. It's client cert.pem.
		ClientKeyFile      string // Required if and only if ClientCertFile is present. It's client key.pem.
		TrustStoreLocation string // JKS format of CA certificate, used to extract CA cert.pem.
		TrustStorePassword string
		KeystoreLocation   string // JKS format of client certificate and key, used to extract client cert.pem and key.pem.
		KeystorePassword   string
		EndpIdentAlgo      string
	}

	// Sasl is a simplified sarama.Config.Net.SASL that only supports SASL/PLAIN
	// and SASL/GSSAPI (Kerberos).
	// NOTE(review): Mechanism below also lists SCRAM-SHA-256 and SCRAM-SHA-512;
	// confirm which mechanisms are actually supported.
	Sasl struct {
		// Enable tells whether to use SASL authentication when connecting to
		// the broker (defaults to false).
		Enable bool
		// Mechanism is the name of the enabled SASL mechanism.
		// Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (defaults to PLAIN)
		Mechanism string
		// Username is the authentication identity (authcid) to present for
		// SASL/PLAIN or SASL/SCRAM authentication.
		Username string
		// Password is used for SASL/PLAIN or SASL/SCRAM authentication.
		Password string

		// GSSAPI groups the Kerberos-related settings.
		GSSAPI struct {
			AuthType           int // 1. KRB5_USER_AUTH, 2. KRB5_KEYTAB_AUTH
			KeyTabPath         string
			KerberosConfigPath string
			ServiceName        string
			Username           string
			Password           string
			Realm              string
			DisablePAFXFAST    bool
		}
	}

	AssignInterval  int // NOTE(review): unit is not visible in this declaration; confirm
	CalcLagInterval int // NOTE(review): unit is not visible in this declaration; confirm
	RebalanceByLags bool
}
KafkaConfig configuration parameters
type SchemaRegistryConfig ¶
// SchemaRegistryConfig holds the schema registry configuration parameters.
type SchemaRegistryConfig struct {
	// URL of the schema registry.
	URL string
}
SchemaRegistryConfig configuration parameters
type TaskConfig ¶
// TaskConfig holds the parameters of a task.
type TaskConfig struct {
	Name          string
	Topic         string
	ConsumerGroup string

	// Earliest, when true, consumes messages from the oldest position.
	Earliest bool
	Parser   string
	// CsvFormat lists the csv column titles; used if Parser is csv.
	CsvFormat       []string
	Delimiter       string
	TableName       string
	SeriesTableName string

	// AutoSchema fetches the schema from clickhouse automatically.
	AutoSchema     bool
	ExcludeColumns []string

	Dims []struct {
		Name       string
		Type       string
		SourceName string
		// Const sets the column value to a constant taken from the config.
		Const string
	} `json:"dims"`

	// DynamicSchema adds columns present in the message to clickhouse.
	// Requires AutoSchema be true.
	DynamicSchema struct {
		Enable      bool
		NotNullable bool
		MaxDims     int // the upper limit of dynamic columns number, <=0 means math.MaxInt16. protecting dirty data attack
		// A column is added for new key K if all following conditions are true:
		// - K isn't in ExcludeColumns
		// - number of existing columns doesn't reach MaxDims-1
		// - WhiteList is empty, or K matches WhiteList
		// - BlackList is empty, or K doesn't match BlackList
		WhiteList string // the regexp of white list
		BlackList string // the regexp of black list
	}

	// Fields are additional fields appended to each input message; should be
	// a valid json string.
	Fields string `json:"fields,omitempty"`

	// PrometheusSchema expects each message to be a Prometheus metric
	// (timestamp, value, metric name and a list of labels).
	PrometheusSchema bool
	// PromLabelsBlackList: fields matching it are not considered labels.
	// Requires PrometheusSchema be true.
	PromLabelsBlackList string // the regexp of black list

	// ShardingKey is the name of the column to shard against.
	ShardingKey string `json:"shardingKey,omitempty"`
	// ShardingStripe takes effect if the sharding key is numerical.
	ShardingStripe uint64  `json:"shardingStripe,omitempty"`
	FlushInterval  int     `json:"flushInterval,omitempty"`
	BufferSize     int     `json:"bufferSize,omitempty"`
	TimeZone       string  `json:"timeZone"`
	TimeUnit       float64 `json:"timeUnit"`
}
TaskConfig parameters
Click to show internal directories.
Click to hide internal directories.