kafka

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrMissingConsumerBroker    = utils.Error("Missing Consumer broker address")
	ErrMissingConsumerTopic     = utils.Error("Missing Consumer Topic or Topic Group")
	ErrConsumerAlreadyConnected = utils.Error("Cannot change connection properties; already connected")
	DefaultTimeout              = time.Second * 30

	ErrMissingProducerBroker = utils.Error("Missing Producer broker address")
	ErrMissingProducerTopic  = utils.Error("Missing Producer Topic name")
	ErrProducerClosed        = utils.Error("Producer is already closed")
	ErrInvalidAuthType       = utils.Error("Invalid authentication type")
	ErrInvalidStartOffset    = utils.Error("Invalid start offset")
	ErrInvalidIsolationLevel = utils.Error("Invalid isolation level")

	ErrMissingAdminBroker = utils.Error("Missing Admin broker address")
	ErrNilConfig          = utils.Error("Config is nil")

	AuthTypeNone     = "none"
	AuthTypePlain    = "plain"
	AuthTypeScram256 = "scram256"
	AuthTypeScram512 = "scram512"
)
View Source
const (
	KafkaTopicKey     = "kafka_topic"
	KafkaPartitionKey = "kafka_partition"
	KafkaBrokerKey    = "kafka_broker"
	KafkaOffsetKey    = "kafka_offset"
	KafkaKeyKey       = "kafka_key"
	KafkaGroupKey     = "kafka_group"
)

KafkaLogContext provides context keys for Kafka logging

Variables

This section is empty.

Functions

func AdminLogger added in v0.3.0

func AdminLogger(l *log.Logger, broker string) *log.Logger

AdminLogger creates a child logger with admin details

func ConsumerLogger added in v0.3.0

func ConsumerLogger(l *log.Logger, topic string, group string) *log.Logger

ConsumerLogger creates a child logger with consumer details

func LogMessageReceived added in v0.3.0

func LogMessageReceived(logger *log.Logger, msg kafka.Message, group string)

LogMessageReceived logs a message when a Kafka message is received

func LogMessageSent added in v0.3.0

func LogMessageSent(logger *log.Logger, msg kafka.Message)

LogMessageSent logs a message when a Kafka message is sent

func NewAdminLogger added in v0.3.0

func NewAdminLogger(broker string) *log.Logger

NewAdminLogger creates a new logger with Kafka admin information

func NewConsumerLogger added in v0.3.0

func NewConsumerLogger(topic string, group string) *log.Logger

NewConsumerLogger creates a new logger with Kafka consumer information

func NewProducerLogger added in v0.3.0

func NewProducerLogger(topic string) *log.Logger

NewProducerLogger creates a new logger with Kafka producer information

func ProducerLogger added in v0.3.0

func ProducerLogger(l *log.Logger, topic string) *log.Logger

ProducerLogger creates a child logger with producer details

Types

type Admin added in v0.3.0

type Admin struct {
	Conn   *kafka.Conn
	Logger *log.Logger
	// contains filtered or unexported fields
}

func NewAdmin

func NewAdmin(cfg *AdminConfig, logger *log.Logger) (*Admin, error)

func (*Admin) Connect added in v0.3.0

func (c *Admin) Connect(ctx context.Context) error

func (*Admin) CreateTopic added in v0.3.0

func (c *Admin) CreateTopic(ctx context.Context, topic string, numPartitions int, replicationFactor int) error

CreateTopic create a new Topic

func (*Admin) DeleteTopic added in v0.3.0

func (c *Admin) DeleteTopic(ctx context.Context, topic string) error

DeleteTopic removes a Topic

func (*Admin) Disconnect added in v0.3.0

func (c *Admin) Disconnect()

func (*Admin) GetTopics added in v0.3.0

func (c *Admin) GetTopics(ctx context.Context, topics ...string) ([]kafka.Partition, error)

func (*Admin) IsConnected added in v0.3.0

func (c *Admin) IsConnected() bool

func (*Admin) ListTopics added in v0.3.0

func (c *Admin) ListTopics(ctx context.Context) ([]string, error)

ListTopics list existing kafka topics

func (*Admin) TopicExists added in v0.3.0

func (c *Admin) TopicExists(ctx context.Context, topic string) (bool, error)

TopicExists returns true if Topic exists

type AdminConfig

type AdminConfig struct {
	Brokers  string `json:"brokers"`
	AuthType string `json:"authType"`
	Username string `json:"username"`
	secure.DefaultCredentialConfig
	tlsProvider.ClientConfig
}

func (AdminConfig) Validate

func (c AdminConfig) Validate() error

type Consumer added in v0.3.0

type Consumer struct {
	Brokers string
	Group   string
	Topic   string

	Reader *kafka.Reader
	Logger *log.Logger
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig, logger *log.Logger) (*Consumer, error)

func (*Consumer) ChannelSubscribe added in v0.3.0

func (c *Consumer) ChannelSubscribe(ctx context.Context, ch chan Message) error

ChannelSubscribe subscribes to a reader handler by channel Note: This function is blocking

func (*Consumer) Connect added in v0.3.0

func (c *Consumer) Connect()

Connect to Kafka broker

func (*Consumer) Disconnect added in v0.3.0

func (c *Consumer) Disconnect()

Disconnect Diconnect from kafka

func (*Consumer) GetConfig added in v0.3.0

func (c *Consumer) GetConfig() *kafka.ReaderConfig

GetConfig Get initial config object Useful to set other options before connect

func (*Consumer) IsConnected added in v0.3.0

func (c *Consumer) IsConnected() bool

IsConnected Returns true if Reader was initialized

func (*Consumer) ReadMessage added in v0.3.0

func (c *Consumer) ReadMessage(ctx context.Context) (Message, error)

ReadMessage reads a single message from Kafka It returns the Kafka message and an error If there is no message available, it will block until a message is available If an error occurs, it will be returned

func (*Consumer) Rewind added in v0.3.0

func (c *Consumer) Rewind() error

Rewind Read messages from the beginning

func (*Consumer) Subscribe added in v0.3.0

func (c *Consumer) Subscribe(ctx context.Context, handler ConsumerFunc) error

Subscribe consumes a message from a topic using a handler Note: this function is blocking

func (*Consumer) SubscribeWithOffsets added in v0.3.0

func (c *Consumer) SubscribeWithOffsets(ctx context.Context, handler ConsumerFunc) error

SubscribeWithOffsets manages a reader handler that explicitly commits offsets Note: this function is blocking

type ConsumerConfig

type ConsumerConfig struct {
	Brokers                        string `json:"brokers"`
	Topic                          string `json:"topic"`    // Topic to consume from, if not specified will use GroupTopics
	Group                          string `json:"group"`    // Group consumer group, if not specified will use specified partition
	AuthType                       string `json:"authType"` // AuthType to use, one of "none", "plain", "scram256", "scram512"
	Username                       string `json:"username"` // Username optional username
	secure.DefaultCredentialConfig        // optional password
	tlsProvider.ClientConfig
	ConsumerOptions
}

func (ConsumerConfig) ApplyOptions added in v0.3.0

func (c ConsumerConfig) ApplyOptions(r *kafka.ReaderConfig)

ApplyOptions set ReaderConfig additional parameters

func (ConsumerConfig) Validate

func (c ConsumerConfig) Validate() error

Validate validates ConsumerConfig fields

type ConsumerFunc

type ConsumerFunc func(ctx context.Context, message Message) error

ConsumerFunc Reader handler type

type ConsumerOptions added in v0.3.0

type ConsumerOptions struct {
	GroupTopics            []string `json:"groupTopics"`            // GroupTopics if specified, topics to consume as a group instead of Topic
	Partition              uint     `json:"partition"`              // Partition id, used if no Group specified, defaults to 0
	QueueCapacity          uint     `json:"queueCapacity"`          // QueueCapacity, defaults to 100
	MinBytes               uint     `json:"minBytes"`               // MinBytes, defaults to 0
	MaxBytes               uint     `json:"maxBytes"`               // MaxBytes, defaults to 1048576
	MaxWait                uint     `json:"maxWait"`                // MaxWait in milliseconds, default 10.000 (10s)
	ReadBatchTimeout       uint     `json:"readBatchTimeout"`       // ReadBatchTimeout in milliseconds, default 10.000 (10s)
	HeartbeatInterval      uint     `json:"heartbeatInterval"`      // HeartbeatInterval in milliseconds, default 3000 (3s)
	CommitInterval         uint     `json:"commitInterval"`         // CommitInterval in milliseconds, default 0
	PartitionWatchInterval uint     `json:"partitionWatchInterval"` // PartitionWatchInterval in milliseconds, default 5000 (5s)
	WatchPartitionChanges  bool     `json:"watchPartitionChanges"`  // WatchPartitionChanges, default true
	SessionTimeout         uint     `json:"sessionTimeout"`         // SessionTimeout in milliseconds, default 30.000 (30s)
	RebalanceTimeout       uint     `json:"rebalanceTimeout"`       // RebalanceTimeout in milliseconds, default 30.000 (30s)
	JoinGroupBackoff       uint     `json:"joinGroupBackoff"`       // JoinGroupBackoff in milliseconds, default 5000 (5s)
	RetentionTime          uint     `json:"retentionTime"`          // RetentionTime, in milliseconds, default 86.400.000ms (24h)
	StartOffset            string   `json:"startOffset"`            // StartOffset either "first", "last", default "last"
	ReadBackoffMin         uint     `json:"readBackoffMin"`         // ReadBackoffMin in milliseconds, default 100
	ReadBackoffMax         uint     `json:"readBackoffMax"`         // ReadBackoffMax in milliseconds, default 1000 (1s)
	MaxAttempts            uint     `json:"maxAttempts"`            // MaxAttempts default 3
	IsolationLevel         string   `json:"isolationLevel"`         // IsolationLevel "uncommitted" or "committed", default "committed"
}

ConsumerOptions additional consumer options

type Message added in v0.1.1

type Message = kafka.Message

Message is a type alias to avoid using kafka-go in application code

type Producer added in v0.3.0

type Producer struct {
	Brokers string
	Topic   string
	Writer  *kafka.Writer
	Logger  *log.Logger
}

func NewProducer

func NewProducer(cfg *ProducerConfig, logger *log.Logger) (*Producer, error)

func (*Producer) Disconnect added in v0.3.0

func (p *Producer) Disconnect()

Disconnect disconnects from the Writer

func (*Producer) IsConnected added in v0.3.0

func (p *Producer) IsConnected() bool

IsConnected returns ture if Writer is connected

func (*Producer) Write added in v0.3.0

func (p *Producer) Write(ctx context.Context, value []byte) error

Write writes a single message to topic

func (*Producer) WriteJson added in v0.3.0

func (p *Producer) WriteJson(ctx context.Context, data interface{}, key ...[]byte) error

WriteJson Write a struct to a Topic as a json message

func (*Producer) WriteMulti added in v0.3.0

func (p *Producer) WriteMulti(ctx context.Context, values ...[]byte) error

WriteMulti Write multiple messages to Topic

func (*Producer) WriteMultiJson added in v0.3.0

func (p *Producer) WriteMultiJson(ctx context.Context, values ...interface{}) error

WriteMultiJson Write a slice of structs to a Topic as a json message

func (*Producer) WriteWithHeaders added in v0.3.0

func (p *Producer) WriteWithHeaders(ctx context.Context, value []byte, key []byte, headers []kafka.Header) error

WriteWithHeaders writes a single message to topic, with headers

func (*Producer) WriteWithKey added in v0.3.0

func (p *Producer) WriteWithKey(ctx context.Context, value []byte, key []byte) error

WriteWithKey writes a message with key

type ProducerConfig

type ProducerConfig struct {
	Brokers  string `json:"brokers"`
	Topic    string `json:"topic"`
	AuthType string `json:"authType"`
	Username string `json:"username"`
	secure.DefaultCredentialConfig
	tlsProvider.ClientConfig
	ProducerOptions
}

func (ProducerConfig) Validate

func (c ProducerConfig) Validate() error

type ProducerOptions added in v0.3.0

type ProducerOptions struct {
	MaxAttempts     uint   `json:"maxAttempts"`
	WriteBackoffMin uint   `json:"writeBackoffMin"` // WriteBackoffMin value in milliseconds, defaults to 100
	WriteBackoffMax uint   `json:"writeBackoffMax"` // WriteBackoffMax value in milliseconds, defaults to 1000
	BatchSize       uint   `json:"batchSize"`       // BatchSize, defaults to 100
	BatchBytes      uint64 `json:"batchBytes"`      // BatchBytes, defaults to 1048576
	BatchTimeout    uint   `json:"batchTimeout"`    // BatchTimeout value in milliseconds, defaults to 1000
	ReadTimeout     uint   `json:"readTimeout"`     // ReadTimeout value in milliseconds, defaults to 10.000
	WriteTimeout    uint   `json:"writeTimeout"`    // WriteTimeout value in milliseconds, defaults to 10.000
	RequiredAcks    string `json:"requiredAcks"`    // RequiredAcks one of "none", "one", "all", default "none"
	Async           bool   `json:"async"`           // Async, default false
}

ProducerOptions additional producer options

func (ProducerOptions) ApplyOptions added in v0.3.0

func (p ProducerOptions) ApplyOptions(w *kafka.Writer)

ApplyOptions sets additional Writer parameters

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL