kafka

package
v1.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSessionTimeout = 10 * time.Second
	DefaultGroupPrefix    = "xmsgbus"
)

Variables

This section is empty.

Functions

func ExampleUsage

func ExampleUsage()

ExampleUsage demonstrates how to use SASL authentication with Kafka

func NewMsgBus

func NewMsgBus(options ...IMsgBusOption) (xmsgbus.IMsgBus, error)

func NewStorage

func NewStorage(brokers []string, options ...IMsgBusOption) (xmsgbus.ISharedStorage, error)

func ProductionConfig

func ProductionConfig() xmsgbus.IMsgBus

Production configuration example

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

type IMsgBusOption

type IMsgBusOption interface {
	// contains filtered or unexported methods
}

type MsgBus

type MsgBus struct {
	// contains filtered or unexported fields
}

func (*MsgBus) AddChannel

func (x *MsgBus) AddChannel(ctx context.Context, topic string, channel string) error

func (*MsgBus) Close

func (x *MsgBus) Close() error

func (*MsgBus) ListChannel

func (x *MsgBus) ListChannel(ctx context.Context, topic string) ([]string, error)

func (*MsgBus) Pop

func (x *MsgBus) Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) ([]byte, func(), error)

func (*MsgBus) Push

func (x *MsgBus) Push(ctx context.Context, topic string, bs []byte) error

func (*MsgBus) RemoveChannel

func (x *MsgBus) RemoveChannel(ctx context.Context, topic string, channel string) error

type MsgBusOptionFunc

type MsgBusOptionFunc func(*msgBusOptions)

func WithBrokers

func WithBrokers(brokers []string) MsgBusOptionFunc

func WithGroupPrefix

func WithGroupPrefix(prefix string) MsgBusOptionFunc

func WithInsecureTLS

func WithInsecureTLS() MsgBusOptionFunc

WithInsecureTLS enables TLS with insecure verification (for testing)

func WithSASLPlainAuth

func WithSASLPlainAuth(username, password string) MsgBusOptionFunc

WithSASLPlainAuth configures SASL/PLAIN authentication

func WithSASLSCRAMSHA256Auth

func WithSASLSCRAMSHA256Auth(username, password string) MsgBusOptionFunc

WithSASLSCRAMSHA256Auth configures SASL/SCRAM-SHA256 authentication

func WithSASLSCRAMSHA512Auth

func WithSASLSCRAMSHA512Auth(username, password string) MsgBusOptionFunc

WithSASLSCRAMSHA512Auth configures SASL/SCRAM-SHA512 authentication

func WithSaramaConfig

func WithSaramaConfig(config *sarama.Config) MsgBusOptionFunc

func WithSessionTimeout

func WithSessionTimeout(timeout time.Duration) MsgBusOptionFunc

func WithTLS

func WithTLS(tlsConfig *tls.Config) MsgBusOptionFunc

WithTLS enables TLS encryption

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func (*Storage) Close

func (s *Storage) Close() error

func (*Storage) Del

func (s *Storage) Del(ctx context.Context, key string) error

func (*Storage) Keys

func (s *Storage) Keys(ctx context.Context, prefix string) ([]string, error)

func (*Storage) SetEx

func (s *Storage) SetEx(ctx context.Context, key string, value interface{}, ttl time.Duration) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL