kafka

package
v0.1.1-beta.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CommonOpts

// CommonOpts takes the shared client configuration cfg and returns a slice
// of kgo.Opt, or an error.
// NOTE(review): the implementation is not visible in this listing; the
// returned options presumably reflect the broker, client ID, TLS and SASL
// fields of cfg — confirm against the source.
func CommonOpts(cfg CommonConfig) ([]kgo.Opt, error)

Types

type CommonConfig

// CommonConfig holds the client settings shared by consumers and producers:
// it is embedded in both ConsumerConfig and ProducerConfig and is the
// argument type of CommonOpts.
type CommonConfig struct {
	Brokers  []string // list of brokers
	ClientID string   // client ID

	// TLS settings.
	// NOTE(review): TLS presumably switches TLS on; whether CACert,
	// ClientCert and ClientKey hold file paths or PEM contents is not
	// visible here — confirm against CommonOpts.
	TLS        bool
	CACert     string
	ClientCert string
	ClientKey  string

	// SASL settings.
	// NOTE(review): the accepted SASLMechanism values are not visible
	// here — confirm against CommonOpts.
	SASLMechanism string
	SASLUser      string
	SASLPass      string

	// NOTE(review): EnableTrace presumably toggles tracing — confirm.
	EnableTrace bool
}

type ConsumeManager

// ConsumeManager is the type returned by NewConsumeManager. Its exported
// methods are CreateTopics and Stop.
type ConsumeManager struct {
	// CallBack maps a string key to a Handler.
	// NOTE(review): the key is presumably a topic name — confirm against
	// NewConsumeManager.
	CallBack map[string]Handler
	// contains filtered or unexported fields
}

func NewConsumeManager

// NewConsumeManager returns a *ConsumeManager for the given consumer
// configuration and handler map, or an error.
// NOTE(review): callback has the same type as ConsumeManager.CallBack and is
// presumably stored there; whether consumption starts inside this call is
// not visible here — confirm.
func NewConsumeManager(cfg ConsumerConfig, callback map[string]Handler) (*ConsumeManager, error)

func (*ConsumeManager) CreateTopics

// CreateTopics takes a context and a list of NewTopic descriptions and
// returns an error.
// NOTE(review): presumably creates each listed topic on the cluster; the
// behavior for topics that already exist is not visible here — confirm.
func (s *ConsumeManager) CreateTopics(ctx context.Context, topics []NewTopic) error

func (*ConsumeManager) Stop

// Stop takes a context and returns an error.
// NOTE(review): presumably shuts the manager down, with ctx bounding how
// long shutdown may take — confirm against the implementation.
func (s *ConsumeManager) Stop(ctx context.Context) error

type Consumer

// Consumer exposes a single exported field, Handler; its remaining state is
// unexported and not shown in this listing. No exported constructor or
// methods for it appear in this listing.
type Consumer struct {
	// Handler is the Handler associated with this consumer.
	Handler Handler
	// contains filtered or unexported fields
}

type ConsumerConfig

// ConsumerConfig is the configuration accepted by NewConsumeManager. It
// embeds CommonConfig, which is tagged to appear under the "CommonConfig"
// key in JSON.
type ConsumerConfig struct {
	CommonConfig        `json:"CommonConfig"`
	// Topic is a slice despite its singular name, so it can carry several
	// topic names.
	Topic               []string
	// NOTE(review): GroupID is presumably the Kafka consumer group ID — confirm.
	GroupID             string
	// NOTE(review): AutoCommit presumably enables automatic offset commits — confirm.
	AutoCommit          bool
	// NOTE(review): BlockRebalance presumably blocks group rebalances while
	// records are being processed — confirm.
	BlockRebalance      bool
	// NOTE(review): ConsumerChannelSize and DLQChannelSize are presumably
	// buffer sizes for internal consumer and dead-letter-queue channels;
	// the behavior for zero or negative values is not visible here — confirm.
	ConsumerChannelSize int
	DLQChannelSize      int
}

type Handler

// Handler is the callback interface used for consumed data; it is the value
// type of ConsumeManager.CallBack and the type of Consumer.Handler.
type Handler interface {
	// Handle receives a kgo.FetchTopicPartition and returns an error.
	// Although the parameter is named record, its type is
	// kgo.FetchTopicPartition rather than a single *kgo.Record.
	// NOTE(review): how a non-nil error is treated (retry, dead-letter
	// queue, skipped commit) is not visible here — confirm.
	Handle(record kgo.FetchTopicPartition) error
}

type NewTopic

// NewTopic describes one topic passed to ConsumeManager.CreateTopics.
type NewTopic struct {
	// Topic is the topic name.
	Topic             string
	// NOTE(review): Partition is presumably the number of partitions to
	// create, not a partition index — confirm.
	Partition         int32
	// ReplicationFactor is the replication factor for the topic.
	ReplicationFactor int16
	// Configs maps a config name to an optional string value.
	// NOTE(review): a nil value presumably means "use the broker default" — confirm.
	Configs           map[string]*string
}

type ProducerClient

// ProducerClient is the type returned by NewProducer; all of its fields are
// unexported.
// NOTE(review): as listed here, its only exported method is
// Produce(ctx, r) error, which differs from ProducerInterface.Produce (that
// one takes a callback and returns nothing), and no Close method is listed.
// *ProducerClient therefore does not satisfy ProducerInterface as shown —
// confirm whether that is intended.
type ProducerClient struct {
	// contains filtered or unexported fields
}

func NewProducer

// NewProducer returns a *ProducerClient for the given producer
// configuration, or an error.
// NOTE(review): whether a broker connection is established inside this call
// is not visible here — confirm.
func NewProducer(cfg ProducerConfig) (*ProducerClient, error)

func (*ProducerClient) Produce

// Produce takes a context and a *kgo.Record and returns an error.
// NOTE(review): the error return suggests the call waits for the produce
// result rather than completing asynchronously, but the implementation is
// not visible here — confirm.
func (p *ProducerClient) Produce(ctx context.Context, r *kgo.Record) error

type ProducerConfig

// ProducerConfig is the configuration accepted by NewProducer. It embeds
// CommonConfig, which is tagged to appear under the "CommonConfig" key in
// JSON.
type ProducerConfig struct {
	CommonConfig           `json:"CommonConfig"`
	// Topic is a slice despite its singular name, so it can carry several
	// topic names.
	Topic                  []string
	// NOTE(review): Compression presumably names a compression codec; the
	// accepted values are not visible here — confirm.
	Compression            string
	// NOTE(review): RetryMax is presumably the maximum number of produce
	// retries — confirm.
	RetryMax               int
	// NOTE(review): AllowAutoTopicCreation presumably lets the client ask
	// the broker to create missing topics — confirm.
	AllowAutoTopicCreation bool
}

type ProducerInterface

// ProducerInterface describes a callback-based producer with a Close method.
// NOTE(review): this Produce signature matches the asynchronous style of
// (*kgo.Client).Produce, so the interface is presumably meant to abstract
// the underlying kgo client — confirm.
type ProducerInterface interface {
	// Produce takes a context, a record and a callback cb that receives a
	// *kgo.Record and an error; it returns nothing itself.
	Produce(ctx context.Context, r *kgo.Record, cb func(*kgo.Record, error))
	// Close takes no arguments and returns nothing.
	// NOTE(review): presumably releases the producer's resources — confirm.
	Close()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL