Documentation
¶
Index ¶
- func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)
- func NewConsumerGroup(c *Config, l *zap.SugaredLogger) sarama.ConsumerGroup
- type Config
- type OffsetType
- type Plugin
- func (p *Plugin) Cleanup(sarama.ConsumerGroupSession) error
- func (p *Plugin) Commit(event *pipeline.Event)
- func (p *Plugin) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (p *Plugin) PassEvent(_ *pipeline.Event) bool
- func (p *Plugin) Setup(session sarama.ConsumerGroupSession) error
- func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginParams)
- func (p *Plugin) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewConsumerGroup ¶ added in v0.19.2
func NewConsumerGroup(c *Config, l *zap.SugaredLogger) sarama.ConsumerGroup
Types ¶
type Config ¶
type Config struct {
// > @3@4@5@6
// >
// > The name of kafka brokers to read from.
Brokers []string `json:"brokers" required:"true"` // *
// > @3@4@5@6
// >
// > The list of kafka topics to read from.
Topics []string `json:"topics" required:"true"` // *
// > @3@4@5@6
// >
// > The name of consumer group to use.
ConsumerGroup string `json:"consumer_group" default:"file-d"` // *
// > @3@4@5@6
// >
// > Kafka client ID.
ClientID string `json:"client_id" default:"file-d"` // *
// > @3@4@5@6
// >
// > The number of unprocessed messages in the buffer that are loaded in the background from kafka.
ChannelBufferSize int `json:"channel_buffer_size" default:"256"` // *
// > @3@4@5@6
// >
// > The newest and oldest values is used when a consumer starts but there is no committed offset for the assigned partition.
// > * *`newest`* - set offset to the newest message
// > * *`oldest`* - set offset to the oldest message
Offset string `json:"offset" default:"newest" options:"newest|oldest"` // *
Offset_ OffsetType
// > @3@4@5@6
// >
// > The maximum amount of time the consumer expects a message takes to process for the user.
ConsumerMaxProcessingTime cfg.Duration `json:"consumer_max_processing_time" default:"200ms" parse:"duration"` // *
ConsumerMaxProcessingTime_ time.Duration
// > @3@4@5@6
// >
// > The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it returns fewer than that anyways.
ConsumerMaxWaitTime cfg.Duration `json:"consumer_max_wait_time" default:"250ms" parse:"duration"` // *
ConsumerMaxWaitTime_ time.Duration
// > @3@4@5@6
// >
// > If set, the plugin will use SASL authentications mechanism.
SaslEnabled bool `json:"is_sasl_enabled" default:"false"` // *
// > @3@4@5@6
// >
// > SASL mechanism to use.
SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512"` // *
// > @3@4@5@6
// >
// > SASL username.
SaslUsername string `json:"sasl_username" default:"user"` // *
// > @3@4@5@6
// >
// > SASL password.
SaslPassword string `json:"sasl_password" default:"password"` // *
// > @3@4@5@6
// >
// > If set, the plugin will use SSL/TLS connections method.
SslEnabled bool `json:"is_ssl_enabled" default:"false"` // *
// > @3@4@5@6
// >
// > If set, the plugin will skip SSL/TLS verification.
SslSkipVerify bool `json:"ssl_skip_verify" default:"false"` // *
// > @3@4@5@6
// >
// > Path or content of a PEM-encoded client certificate file.
ClientCert string `json:"client_cert"` // *
// > @3@4@5@6
// >
// > > Path or content of a PEM-encoded client key file.
ClientKey string `json:"client_key"` // *
// > @3@4@5@6
// >
// > Path or content of a PEM-encoded CA file.
CACert string `json:"ca_cert"` // *
}
! config-params ^ config-params
type OffsetType ¶ added in v0.19.2
type OffsetType byte
const ( OffsetTypeNewest OffsetType = iota OffsetTypeOldest )
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
func (*Plugin) ConsumeClaim ¶
func (p *Plugin) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
Click to show internal directories.
Click to hide internal directories.