kafka

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncProducer

type AsyncProducer interface {
	Produce(topic string, message []byte, opts ...MessageOption)
	ProduceWithContext(ctx context.Context, topic string, message []byte, opts ...MessageOption)
	Close() error
}

func NewAsyncProducer

func NewAsyncProducer(cfg *Config, opts ...AsyncProducerOption) (AsyncProducer, error)

type AsyncProducerOption

type AsyncProducerOption func(*sarama.Config)

type Config

type Config struct {
	ClientID string
	Brokers  []string
	Version  sarama.KafkaVersion
}

func NewConfig

func NewConfig(brokers []string, opts ...ConfigOption) *Config

type ConfigOption

type ConfigOption func(*Config)

func WithClientID

func WithClientID(clientID string) ConfigOption

func WithVersion

func WithVersion(version sarama.KafkaVersion) ConfigOption

type ConsumerGroup

type ConsumerGroup interface {
	Consume(ctx context.Context, topic []string, handler Handler) error
	Close() error
}

func NewConsumerGroup

func NewConsumerGroup(cfg *Config, groupID string, opts ...ConsumerGroupOption) (ConsumerGroup, error)

type ConsumerGroupOption

type ConsumerGroupOption func(*sarama.Config)

func WithNewestOffset

func WithNewestOffset() ConsumerGroupOption

func WithOldestOffset

func WithOldestOffset() ConsumerGroupOption

func WithRebalanceStrategy

func WithRebalanceStrategy(strategy sarama.BalanceStrategy) ConsumerGroupOption

type Handler

type Handler func(message *sarama.ConsumerMessage) error

type MessageOption

type MessageOption func(*sarama.ProducerMessage)

func WithHeader

func WithHeader(key []byte, value []byte) MessageOption

WithHeader append a header to the message.

func WithHeaders

func WithHeaders(headers []sarama.RecordHeader) MessageOption

WithHeaders sets the headers of the message.

func WithKey

func WithKey(key []byte) MessageOption

WithKey sets the key of the message. The key is used to determine the partition of the message using hash partitioning.

func WithPartition

func WithPartition(partition int32) MessageOption

WithPartition sets the partition of the message.

func WithTimestamp

func WithTimestamp(timestamp int64) MessageOption

WithTimestamp sets the timestamp of the message. The timestamp is the number of milliseconds since the Unix epoch.

type SaramaOption

type SaramaOption func(*sarama.Config)

type SyncProducer

type SyncProducer interface {
	Produce(topic string, message []byte, opts ...MessageOption) error
	ProduceWithContext(ctx context.Context, topic string, message []byte, opts ...MessageOption) error
	Close() error
}

func NewSyncProducer

func NewSyncProducer(cfg *Config, opts ...SyncProducerOption) (SyncProducer, error)

type SyncProducerOption

type SyncProducerOption func(*sarama.Config)

func WithRequiredAcks

func WithRequiredAcks(acks sarama.RequiredAcks) SyncProducerOption

Directories

Path Synopsis
example
async command
consumer-group command
sync command
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL