Documentation ¶
Index ¶
- type ConsumerGroup
- type ConsumerGroupCreator
- type ConsumerGroupHandler
- func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
- func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
- func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error
- func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
- type KafkaConsumer
- type Message
- type SaramaCreator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶ added in v1.14.0
type ConsumerGroupCreator ¶ added in v1.14.0
type ConsumerGroupHandler ¶ added in v1.14.0
type ConsumerGroupHandler struct {
	MaxMessageLen         int
	TopicTag              string
	MsgHeadersToTags      map[string]bool
	MsgHeaderToMetricName string
	// contains filtered or unexported fields
}
ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
func NewConsumerGroupHandler ¶ added in v1.14.0
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *ConsumerGroupHandler
func (*ConsumerGroupHandler) Cleanup ¶ added in v1.14.0
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.
func (*ConsumerGroupHandler) ConsumeClaim ¶ added in v1.14.0
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim is called once for each claim, in its own goroutine, and must be thread-safe. It should run until the claim is closed.
func (*ConsumerGroupHandler) Handle ¶ added in v1.14.0
func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
Handle processes a message and, if successful, saves it so that it can be acknowledged after delivery.
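One way to picture "save now, acknowledge after delivery" is a small tracker keyed by a tracking ID. This is only a sketch under stated assumptions: the `tracker` type, its fields, and the `handle` and `onDelivery` methods are hypothetical, and the empty-payload check stands in for real parsing.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// tracker is a hypothetical stand-in for the handler's bookkeeping of
// messages that were processed but not yet delivered.
type tracker struct {
	mu          sync.Mutex
	nextID      int
	undelivered map[int]int64 // tracking ID -> Kafka offset awaiting delivery
	acked       []int64       // offsets acknowledged so far
}

func newTracker() *tracker {
	return &tracker{undelivered: make(map[int]int64)}
}

// handle checks the payload and, only on success, saves the offset so it
// can be acknowledged later. It returns the tracking ID.
func (t *tracker) handle(offset int64, payload []byte) (int, error) {
	if len(payload) == 0 {
		return 0, errors.New("empty payload")
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	t.nextID++
	t.undelivered[t.nextID] = offset
	return t.nextID, nil
}

// onDelivery acknowledges the offset saved under id and reports whether
// the id was known.
func (t *tracker) onDelivery(id int) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	offset, ok := t.undelivered[id]
	if !ok {
		return false
	}
	delete(t.undelivered, id)
	t.acked = append(t.acked, offset)
	return true
}

func main() {
	tr := newTracker()
	id, err := tr.handle(42, []byte("cpu value=1"))
	fmt.Println(id, err, len(tr.undelivered))

	// A failed message is never saved, so it is never acknowledged.
	_, err = tr.handle(43, nil)
	fmt.Println(err, len(tr.undelivered))

	fmt.Println(tr.onDelivery(id), tr.acked, len(tr.undelivered))
}
```

Deferring the acknowledgement this way means an offset is only marked once its metrics have left the process, at the cost of holding state for every in-flight message.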
func (*ConsumerGroupHandler) Reserve ¶ added in v1.14.0
func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error
Reserve blocks until there is an available slot for a new message.
func (*ConsumerGroupHandler) Setup ¶ added in v1.14.0
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
Setup is called once when a new session is opened. It sets up the handler and begins processing delivered messages.
type KafkaConsumer ¶ added in v1.14.0
type KafkaConsumer struct {
	Brokers                              []string        `toml:"brokers"`
	Version                              string          `toml:"kafka_version"`
	ConsumerGroup                        string          `toml:"consumer_group"`
	MaxMessageLen                        int             `toml:"max_message_len"`
	MaxUndeliveredMessages               int             `toml:"max_undelivered_messages"`
	MaxProcessingTime                    config.Duration `toml:"max_processing_time"`
	Offset                               string          `toml:"offset"`
	BalanceStrategy                      string          `toml:"balance_strategy"`
	Topics                               []string        `toml:"topics"`
	TopicRegexps                         []string        `toml:"topic_regexps"`
	TopicTag                             string          `toml:"topic_tag"`
	MsgHeadersAsTags                     []string        `toml:"msg_headers_as_tags"`
	MsgHeaderAsMetricName                string          `toml:"msg_header_as_metric_name"`
	ConsumerFetchDefault                 config.Size     `toml:"consumer_fetch_default"`
	ConnectionStrategy                   string          `toml:"connection_strategy"`
	ResolveCanonicalBootstrapServersOnly bool            `toml:"resolve_canonical_bootstrap_servers_only"`
	kafka.ReadConfig
	kafka.Logger
	Log             telegraf.Logger      `toml:"-"`
	ConsumerCreator ConsumerGroupCreator `toml:"-"`
	// contains filtered or unexported fields
}
func (*KafkaConsumer) Gather ¶ added in v1.14.0
func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error
func (*KafkaConsumer) Init ¶ added in v1.14.0
func (k *KafkaConsumer) Init() error
func (*KafkaConsumer) SampleConfig ¶ added in v1.14.0
func (*KafkaConsumer) SampleConfig() string
func (*KafkaConsumer) SetParser ¶ added in v1.14.0
func (k *KafkaConsumer) SetParser(parser telegraf.Parser)
func (*KafkaConsumer) Start ¶ added in v1.14.0
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error
func (*KafkaConsumer) Stop ¶ added in v1.14.0
func (k *KafkaConsumer) Stop()
type Message ¶ added in v1.14.0
type Message struct {
// contains filtered or unexported fields
}
Message is an aggregate type binding the Kafka message and the session so that offsets can be updated.
type SaramaCreator ¶ added in v1.14.0
type SaramaCreator struct{}
func (*SaramaCreator) Create ¶ added in v1.14.0
func (*SaramaCreator) Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)
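The `ConsumerCreator` field on KafkaConsumer is typed as ConsumerGroupCreator, which suggests the creator can be swapped for a test double. The following is a sketch of that pattern only: `group`, `creator`, `fakeGroup`, `fakeCreator`, and `start` are hypothetical, trimmed-down stand-ins, and the real `Create` also takes a `*sarama.Config`.

```go
package main

import (
	"errors"
	"fmt"
)

// group is a hypothetical, trimmed-down stand-in for ConsumerGroup.
type group interface {
	Close() error
}

// creator is a hypothetical stand-in for ConsumerGroupCreator. The real
// Create additionally accepts a *sarama.Config, omitted here.
type creator interface {
	Create(brokers []string, groupID string) (group, error)
}

// fakeGroup records whether it was closed.
type fakeGroup struct {
	closed bool
}

func (g *fakeGroup) Close() error {
	g.closed = true
	return nil
}

// fakeCreator records its arguments instead of connecting to brokers.
type fakeCreator struct {
	brokers []string
	groupID string
	last    *fakeGroup
}

func (c *fakeCreator) Create(brokers []string, groupID string) (group, error) {
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	c.brokers, c.groupID = brokers, groupID
	c.last = &fakeGroup{}
	return c.last, nil
}

// start is a hypothetical entry point that depends only on the interface,
// so production code and tests can pass different creators.
func start(c creator, brokers []string, groupID string) (group, error) {
	return c.Create(brokers, groupID)
}

func main() {
	fc := &fakeCreator{}
	g, err := start(fc, []string{"localhost:9092"}, "telegraf_metrics_consumers")
	fmt.Println(err, fc.groupID)
	fmt.Println(g.Close(), fc.last.closed)
}
```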