Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct {
// > @3@4@5@6
// >
// > The name of kafka brokers to read from.
Brokers []string `json:"brokers" required:"true"` // *
// > @3@4@5@6
// >
// > The list of kafka topics to read from.
Topics []string `json:"topics" required:"true"` // *
// > @3@4@5@6
// >
// > The name of consumer group to use.
ConsumerGroup string `json:"consumer_group" default:"file-d"` // *
// > @3@4@5@6
// >
// > Kafka client ID.
ClientID string `json:"client_id" default:"file-d"` // *
// > @3@4@5@6
// >
// > The number of unprocessed messages in the buffer that are loaded in the background from kafka. (max.poll.records)
ChannelBufferSize int `json:"channel_buffer_size" default:"256"` // *
// > @3@4@5@6
// > MaxConcurrentConsumers sets the maximum number of consumers
// > Optimal value: number of topics * number of partitions of topic
// >
MaxConcurrentConsumers int `json:"max_concurrent_consumers" default:"5"` // *
// > @3@4@5@6
// >
// > MaxConcurrentFetches sets the maximum number of fetch requests to allow in
// > flight or buffered at once, overriding the unbounded (i.e. number of
// > brokers) default.
MaxConcurrentFetches int `json:"max_concurrent_fetches" default:"0"` // *
// > @3@4@5@6
// >
// > FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch
FetchMaxBytes cfg.Expression `json:"fetch_max_bytes" default:"52428800" parse:"expression"` // *
FetchMaxBytes_ int32
// > @3@4@5@6
// >
// > FetchMinBytes (fetch.min.bytes) sets the minimum amount of bytes a broker will try to send during a fetch
FetchMinBytes cfg.Expression `json:"fetch_min_bytes" default:"1" parse:"expression"` // *
FetchMinBytes_ int32
// > @3@4@5@6
// >
// > The newest and oldest values is used when a consumer starts but there is no committed offset for the assigned partition.
// > * *`newest`* - set offset to the newest message
// > * *`oldest`* - set offset to the oldest message
Offset string `json:"offset" default:"newest" options:"newest|oldest"` // *
Offset_ OffsetType
// > @3@4@5@6
// >
// > Algorithm used by Kafka to assign partitions to consumers in a group.
// > * *`round-robin`* - M0: [t0p0, t0p2, t1p1], M1: [t0p1, t1p0, t1p2]
// > * *`range`* - M0: [t0p0, t0p1, t1p0, t1p1], M1: [t0p2, t1p2]
// > * *`sticky`* - ensures minimal partition movement on group changes while also ensuring optimal balancing
// > * *`cooperative-sticky`* - performs the sticky balancing strategy, but additionally opts the consumer group into "cooperative" rebalancing
Balancer string `json:"balancer" default:"round-robin" options:"round-robin|range|sticky|cooperative-sticky"` // *
// > @3@4@5@6
// >
// > The maximum amount of time the consumer expects a message takes to process for the user. (Not used anymore!)
ConsumerMaxProcessingTime cfg.Duration `json:"consumer_max_processing_time" default:"200ms" parse:"duration"` // *
ConsumerMaxProcessingTime_ time.Duration
// > @3@4@5@6
// >
// > The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it returns fewer than that anyways. (fetch.max.wait.ms)
ConsumerMaxWaitTime cfg.Duration `json:"consumer_max_wait_time" default:"250ms" parse:"duration"` // *
ConsumerMaxWaitTime_ time.Duration
// > @3@4@5@6
// >
// > AutoCommitInterval sets how long to go between autocommits
AutoCommitInterval cfg.Duration `json:"auto_commit_interval" default:"1s" parse:"duration"` // *
AutoCommitInterval_ time.Duration
// > @3@4@5@6
// >
// > SessionTimeout sets how long a member in the group can go between heartbeats
SessionTimeout cfg.Duration `json:"session_timeout" default:"10s" parse:"duration"` // *
SessionTimeout_ time.Duration
// > @3@4@5@6
// >
// > HeartbeatInterval sets how long a group member goes between heartbeats to Kafka
HeartbeatInterval cfg.Duration `json:"heartbeat_interval" default:"3s" parse:"duration"` // *
HeartbeatInterval_ time.Duration
// > @3@4@5@6
// >
// > If set, the plugin will use SASL authentications mechanism.
SaslEnabled bool `json:"is_sasl_enabled" default:"false"` // *
// > @3@4@5@6
// >
// > SASL mechanism to use.
SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|AWS_MSK_IAM"` // *
// > @3@4@5@6
// >
// > SASL username.
SaslUsername string `json:"sasl_username" default:"user"` // *
// > @3@4@5@6
// >
// > SASL password.
SaslPassword string `json:"sasl_password" default:"password"` // *
// > @3@4@5@6
// >
// > If set, the plugin will use SSL/TLS connections method.
SslEnabled bool `json:"is_ssl_enabled" default:"false"` // *
// > @3@4@5@6
// >
// > If set, the plugin will skip SSL/TLS verification.
SslSkipVerify bool `json:"ssl_skip_verify" default:"false"` // *
// > @3@4@5@6
// >
// > Path or content of a PEM-encoded client certificate file.
ClientCert string `json:"client_cert"` // *
// > @3@4@5@6
// >
// > > Path or content of a PEM-encoded client key file.
ClientKey string `json:"client_key"` // *
// > @3@4@5@6
// >
// > Path or content of a PEM-encoded CA file.
CACert string `json:"ca_cert"` // *
// > @3@4@5@6
// >
// > Meta params
// >
// > Add meta information to an event (look at Meta params)
// > Use [go-template](https://pkg.go.dev/text/template) syntax
// >
// > Example: “`topic: '{{ .topic }}'“`
Meta cfg.MetaTemplates `json:"meta"` // *
}
! config-params ^ config-params
func (*Config) GetBrokers ¶ added in v0.29.0
func (*Config) GetClientID ¶ added in v0.29.0
func (*Config) GetSaslConfig ¶ added in v0.29.0
func (c *Config) GetSaslConfig() cfg.KafkaClientSaslConfig
func (*Config) GetSslConfig ¶ added in v0.29.0
func (c *Config) GetSslConfig() cfg.KafkaClientSslConfig
func (*Config) IsSaslEnabled ¶ added in v0.29.0
func (*Config) IsSslEnabled ¶ added in v0.29.0
type OffsetType ¶ added in v0.19.2
type OffsetType byte
const ( OffsetTypeNewest OffsetType = iota OffsetTypeOldest )
Click to show internal directories.
Click to hide internal directories.