Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdminAuthOpts ¶
type AdminClient ¶
type AdminClient interface {
CreateUser(username, password string) error
DeleteUser(username string) error
UserExists(username string) (bool, error)
CreateTopic(topicName string, partitionCount int) error
DeleteTopic(topicName string) error
TopicExists(topicName string) (bool, error)
AllowUserOnTopics(username string, allowedOperations string, topicNames ...string) error
}
func NewAdminClient ¶
func NewAdminClient(adminEndpoint string, kafkaBrokers string, opts *AdminAuthOpts) AdminClient
type Consumer ¶
type Consumer interface {
Ping(ctx context.Context) error
Close()
StartConsuming(reader ReaderFunc)
}
func NewConsumer ¶
type ConsumerError ¶
type ConsumerOpts ¶
type ConsumerOpts struct {
SASLAuth *KafkaSASLAuth
Logger logging.Logger
MaxRetries *int
MaxPollRecords *int
}
type KafkaMessage ¶
type KafkaSASLAuth ¶
type KafkaSASLAuth struct {
SASLMechanism SASLMechanism
User string
Password string
}
type Producer ¶
type Producer interface {
Ping(ctx context.Context) error
Close()
Produce(ctx context.Context, topic, key string, value []byte) (*ProducerOutput, error)
}
func NewProducer ¶
func NewProducer(brokerHosts string, producerOpts ProducerOpts) (Producer, error)
type ProducerOpts ¶
type ProducerOpts struct {
SASLAuth *KafkaSASLAuth
Logger logging.Logger
}
type ProducerOutput ¶
type ReaderFunc ¶
type ReaderFunc func(msg KafkaMessage) error
type SASLMechanism ¶
type SASLMechanism string
const ( ScramSHA256 SASLMechanism = "SCRAM-SHA-256" ScramSHA512 SASLMechanism = "SCRAM-SHA-512" )
Click to show internal directories.
Click to hide internal directories.