kafka

package
v1.0.8
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitKafkaProducer

func InitKafkaProducer(brokerServer []string, topic string, async bool) *kafka.Writer

func TestConsume

func TestConsume(brokerList []string, topic string)

Types

type Config

type Config struct {
	Adders   []string
	Username string
	Password string
	ClientId string
}

func NewConfig

func NewConfig(adders []string, clientId string, username string, password string) Config

type DefaultKafkaConnection

type DefaultKafkaConnection struct {
	// contains filtered or unexported fields
}

func (*DefaultKafkaConnection) Close

func (d *DefaultKafkaConnection) Close()

func (*DefaultKafkaConnection) GetClient

func (d *DefaultKafkaConnection) GetClient() sarama.Client

func (*DefaultKafkaConnection) GetConfig

func (d *DefaultKafkaConnection) GetConfig() *sarama.Config

func (*DefaultKafkaConnection) GetIsConnected

func (d *DefaultKafkaConnection) GetIsConnected() bool

func (*DefaultKafkaConnection) GetProducer

func (d *DefaultKafkaConnection) GetProducer() sarama.AsyncProducer

func (*DefaultKafkaConnection) TryConnect

func (d *DefaultKafkaConnection) TryConnect() bool

type IDefaultKafkaConnection

type IDefaultKafkaConnection interface {
	GetClient() sarama.Client
	GetConfig() *sarama.Config
	GetProducer() sarama.AsyncProducer
	GetIsConnected() bool
	TryConnect() bool
	Close()
}
var DefaultClient IDefaultKafkaConnection

func NewConnectionFactory

func NewConnectionFactory(conf Config) IDefaultKafkaConnection

type KafkaManager

type KafkaManager struct {
	Producer sarama.SyncProducer
	Consumer sarama.Consumer
}

KafkaManager holds the Kafka producer and consumer instances.

func NewKafkaManager

func NewKafkaManager(brokerList []string, topic string) (*KafkaManager, error)

NewKafkaManager creates a new KafkaManager instance.

func (*KafkaManager) Close

func (km *KafkaManager) Close()

Close closes the Kafka producer and consumer.

func (*KafkaManager) Consume

func (km *KafkaManager) Consume(ctx context.Context, topic string) error

Consume reads messages from Kafka.

func (*KafkaManager) Produce

func (km *KafkaManager) Produce(ctx context.Context, key string, value interface{}, topic string) error

Produce sends a message to Kafka.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL