input

package
v0.8.0
Published: Oct 20, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Constants

const (
	Krb5KeytabAuth = 2
	CommitRetries  = 6
	RetryBackoff   = 5 * time.Second
)
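The package does not document how CommitRetries and RetryBackoff are used. A minimal sketch of one plausible reading, a bounded retry loop with a fixed pause between attempts, is shown below. The `retry` helper and `errTransient` are hypothetical names introduced for illustration and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// These values mirror the package constants. How the package applies them
// internally is an assumption of this sketch.
const (
	CommitRetries = 6
	RetryBackoff  = 5 * time.Second
)

// errTransient is a hypothetical error standing in for a failed commit.
var errTransient = errors.New("transient commit failure")

// retry calls fn up to attempts times and sleeps for backoff after each
// failed attempt except the last. It returns nil on the first success,
// otherwise the last error wrapped with the attempt count.
func retry(attempts int, backoff time.Duration, fn func() error) error {
	if attempts < 1 {
		return errors.New("attempts must be at least 1")
	}
	var err error
	for i := 1; i <= attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts {
			time.Sleep(backoff)
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// A short backoff keeps the demo fast; RetryBackoff would pause 5s per failure.
	err := retry(CommitRetries, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

With the package values, a commit that never succeeds would be attempted six times with five pauses of five seconds, so a caller following this pattern should expect roughly 25 seconds before the error surfaces.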

Variables

This section is empty.

Functions

func GetFranzConfig

func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error)

func Leave added in v0.7.0

func Leave(consumerId string)

func NewConsumerPoller added in v0.7.0

func NewConsumerPoller(consumerId, consumerName string, client *kgo.Client)

func OnConsumerPoll added in v0.7.0

func OnConsumerPoll(consumerId string)

func Walk added in v0.7.0

func Walk(maxPollInterval int) string
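NewConsumerPoller, OnConsumerPoll, Leave, and Walk are undocumented. Judging by their names alone, they may track when each consumer last polled and report consumers that exceed a maximum poll interval. The sketch below illustrates that idea only; `pollRegistry` and its methods are hypothetical, the unit of `maxPollInterval` is not stated in the source, and the actual implementation may differ.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"
)

// pollRegistry is a hypothetical tracker of the last poll time per consumer ID.
type pollRegistry struct {
	mu   sync.Mutex
	last map[string]time.Time
}

func newPollRegistry() *pollRegistry {
	return &pollRegistry{last: make(map[string]time.Time)}
}

// touch records that consumerID polled at time t, registering it on first use.
func (r *pollRegistry) touch(consumerID string, t time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.last[consumerID] = t
}

// leave forgets a consumer, for example after it has left the group.
func (r *pollRegistry) leave(consumerID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.last, consumerID)
}

// stale returns a sorted, comma-separated list of consumers whose last poll
// is older than maxInterval relative to now.
func (r *pollRegistry) stale(now time.Time, maxInterval time.Duration) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	var ids []string
	for id, t := range r.last {
		if now.Sub(t) > maxInterval {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return strings.Join(ids, ",")
}

// demo returns the stale report before and after consumer "c1" leaves.
func demo() (before, after string) {
	start := time.Now()
	r := newPollRegistry()
	r.touch("c1", start)                    // last polled 6 minutes before the check
	r.touch("c2", start.Add(4*time.Minute)) // last polled 2 minutes before the check
	check := start.Add(6 * time.Minute)
	before = r.stale(check, 5*time.Minute)
	r.leave("c1")
	after = r.stale(check, 5*time.Minute)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Printf("before=%q after=%q\n", before, after)
}
```

Passing the clock value into `stale` rather than calling `time.Now` inside it keeps the check deterministic, which makes this kind of watchdog easier to test.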

Types

type Fetches added in v0.5.0

type Fetches struct {
	Fetch   *kgo.Fetches
	TraceId string
}
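Init accepts a `chan Fetches`, which suggests that polled batches are handed to the caller over a channel together with a trace ID. The following sketch shows how a receiver might drain such a channel. To stay dependency-free it swaps `*kgo.Fetches` for a hypothetical `records` stand-in, and `drain` is an illustrative helper, not part of this package.

```go
package main

import "fmt"

// records is a hypothetical stand-in for *kgo.Fetches so that the sketch
// builds without the franz-go dependency.
type records []string

// Fetches mirrors the shape of the package type, with Fetch replaced by the stand-in.
type Fetches struct {
	Fetch   records
	TraceId string
}

// drain reads batches until the channel is closed. It returns the total
// number of records and the trace IDs in the order they were received.
func drain(f <-chan Fetches) (int, []string) {
	total := 0
	var traces []string
	for batch := range f {
		total += len(batch.Fetch)
		traces = append(traces, batch.TraceId)
	}
	return total, traces
}

func main() {
	f := make(chan Fetches, 2)
	// In the real package the consumer is assumed to be the sender.
	f <- Fetches{Fetch: records{"a", "b"}, TraceId: "trace-1"}
	f <- Fetches{Fetch: records{"c"}, TraceId: "trace-2"}
	close(f)

	total, traces := drain(f)
	fmt.Println(total, traces)
}
```

Carrying the trace ID alongside each batch lets downstream stages correlate their logs with the poll that produced the records.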

type KafkaFranz

type KafkaFranz struct {
	// contains filtered or unexported fields
}

KafkaFranz implements input.Inputer. See examples/group_consuming/main.go for reference.

func NewKafkaFranz

func NewKafkaFranz() *KafkaFranz

NewKafkaFranz returns a new instance of the Kafka reader.

func (*KafkaFranz) CommitMessages

func (k *KafkaFranz) CommitMessages(msg *model.InputMessage) error

func (*KafkaFranz) Description

func (k *KafkaFranz) Description() string

Description returns a description of this Kafka consumer (the consumer group name).

func (*KafkaFranz) Init

func (k *KafkaFranz) Init(cfg *config.Config, gCfg *config.GroupConfig, f chan Fetches, cleanupFn func()) (err error)

Init initialises the Kafka instance with the given configuration.

func (*KafkaFranz) Run

func (k *KafkaFranz) Run()

Run is the main loop of the Kafka consumer.

func (*KafkaFranz) Stop

func (k *KafkaFranz) Stop()

Stop stops the Kafka consumer and closes all connections.

type Poller added in v0.7.0

type Poller struct {
	// contains filtered or unexported fields
}
