Documentation
¶
Index ¶
- Constants
- func AdminLogger(l *log.Logger, broker string) *log.Logger
- func ConsumerLogger(l *log.Logger, topic string, group string) *log.Logger
- func LogMessageReceived(logger *log.Logger, msg kafka.Message, group string)
- func LogMessageSent(logger *log.Logger, msg kafka.Message)
- func NewAdminLogger(broker string) *log.Logger
- func NewConsumerLogger(topic string, group string) *log.Logger
- func NewProducerLogger(topic string) *log.Logger
- func ProducerLogger(l *log.Logger, topic string) *log.Logger
- type Admin
- func (c *Admin) Connect(ctx context.Context) error
- func (c *Admin) CreateTopic(ctx context.Context, topic string, numPartitions int, replicationFactor int) error
- func (c *Admin) DeleteTopic(ctx context.Context, topic string) error
- func (c *Admin) Disconnect()
- func (c *Admin) GetTopics(ctx context.Context, topics ...string) ([]kafka.Partition, error)
- func (c *Admin) IsConnected() bool
- func (c *Admin) ListTopics(ctx context.Context) ([]string, error)
- func (c *Admin) TopicExists(ctx context.Context, topic string) (bool, error)
- type AdminConfig
- type Consumer
- func (c *Consumer) ChannelSubscribe(ctx context.Context, ch chan Message) error
- func (c *Consumer) Connect()
- func (c *Consumer) Disconnect()
- func (c *Consumer) GetConfig() *kafka.ReaderConfig
- func (c *Consumer) IsConnected() bool
- func (c *Consumer) ReadMessage(ctx context.Context) (Message, error)
- func (c *Consumer) Rewind() error
- func (c *Consumer) Subscribe(ctx context.Context, handler ConsumerFunc) error
- func (c *Consumer) SubscribeWithOffsets(ctx context.Context, handler ConsumerFunc) error
- type ConsumerConfig
- type ConsumerFunc
- type ConsumerOptions
- type Message
- type Producer
- func (p *Producer) Disconnect()
- func (p *Producer) IsConnected() bool
- func (p *Producer) Write(ctx context.Context, value []byte) error
- func (p *Producer) WriteJson(ctx context.Context, data interface{}, key ...[]byte) error
- func (p *Producer) WriteMulti(ctx context.Context, values ...[]byte) error
- func (p *Producer) WriteMultiJson(ctx context.Context, values ...interface{}) error
- func (p *Producer) WriteWithHeaders(ctx context.Context, value []byte, key []byte, headers []kafka.Header) error
- func (p *Producer) WriteWithKey(ctx context.Context, value []byte, key []byte) error
- type ProducerConfig
- type ProducerOptions
Constants ¶
const ( ErrMissingConsumerBroker = utils.Error("Missing Consumer broker address") ErrMissingConsumerTopic = utils.Error("Missing Consumer Topic or Topic Group") ErrConsumerAlreadyConnected = utils.Error("Cannot change connection properties; already connected") DefaultTimeout = time.Second * 30 ErrMissingProducerBroker = utils.Error("Missing Producer broker address") ErrMissingProducerTopic = utils.Error("Missing Producer Topic name") ErrProducerClosed = utils.Error("Producer is already closed") ErrInvalidAuthType = utils.Error("Invalid authentication type") ErrInvalidStartOffset = utils.Error("Invalid start offset") ErrInvalidIsolationLevel = utils.Error("Invalid isolation level") ErrMissingAdminBroker = utils.Error("Missing Admin broker address") ErrNilConfig = utils.Error("Config is nil") AuthTypeNone = "none" AuthTypePlain = "plain" AuthTypeScram256 = "scram256" AuthTypeScram512 = "scram512" )
const ( KafkaTopicKey = "kafka_topic" KafkaPartitionKey = "kafka_partition" KafkaBrokerKey = "kafka_broker" KafkaOffsetKey = "kafka_offset" KafkaKeyKey = "kafka_key" KafkaGroupKey = "kafka_group" )
KafkaLogContext provides context keys for Kafka logging
Variables ¶
This section is empty.
Functions ¶
func AdminLogger ¶ added in v0.3.0
AdminLogger creates a child logger with admin details
func ConsumerLogger ¶ added in v0.3.0
ConsumerLogger creates a child logger with consumer details
func LogMessageReceived ¶ added in v0.3.0
LogMessageReceived logs a message when a Kafka message is received
func LogMessageSent ¶ added in v0.3.0
LogMessageSent logs a message when a Kafka message is sent
func NewAdminLogger ¶ added in v0.3.0
NewAdminLogger creates a new logger with Kafka admin information
func NewConsumerLogger ¶ added in v0.3.0
NewConsumerLogger creates a new logger with Kafka consumer information
func NewProducerLogger ¶ added in v0.3.0
NewProducerLogger creates a new logger with Kafka producer information
Types ¶
type Admin ¶ added in v0.3.0
func (*Admin) CreateTopic ¶ added in v0.3.0
func (c *Admin) CreateTopic(ctx context.Context, topic string, numPartitions int, replicationFactor int) error
CreateTopic creates a new Topic
func (*Admin) DeleteTopic ¶ added in v0.3.0
DeleteTopic removes a Topic
func (*Admin) Disconnect ¶ added in v0.3.0
func (c *Admin) Disconnect()
func (*Admin) IsConnected ¶ added in v0.3.0
func (*Admin) ListTopics ¶ added in v0.3.0
ListTopics lists existing Kafka topics
type AdminConfig ¶
type AdminConfig struct {
Brokers string `json:"brokers"`
AuthType string `json:"authType"`
Username string `json:"username"`
secure.DefaultCredentialConfig
tlsProvider.ClientConfig
}
func (AdminConfig) Validate ¶
func (c AdminConfig) Validate() error
type Consumer ¶ added in v0.3.0
type Consumer struct {
Brokers string
Group string
Topic string
Reader *kafka.Reader
Logger *log.Logger
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(cfg *ConsumerConfig, logger *log.Logger) (*Consumer, error)
func (*Consumer) ChannelSubscribe ¶ added in v0.3.0
ChannelSubscribe subscribes to a reader handler by channel. Note: this function is blocking.
func (*Consumer) Disconnect ¶ added in v0.3.0
func (c *Consumer) Disconnect()
Disconnect disconnects from Kafka
func (*Consumer) GetConfig ¶ added in v0.3.0
func (c *Consumer) GetConfig() *kafka.ReaderConfig
GetConfig returns the initial config object. Useful to set other options before connecting.
func (*Consumer) IsConnected ¶ added in v0.3.0
IsConnected returns true if Reader was initialized
func (*Consumer) ReadMessage ¶ added in v0.3.0
ReadMessage reads a single message from Kafka. It returns the Kafka message and an error. If there is no message available, it will block until a message is available. If an error occurs, it will be returned.
func (*Consumer) Subscribe ¶ added in v0.3.0
func (c *Consumer) Subscribe(ctx context.Context, handler ConsumerFunc) error
Subscribe consumes a message from a topic using a handler. Note: this function is blocking.
func (*Consumer) SubscribeWithOffsets ¶ added in v0.3.0
func (c *Consumer) SubscribeWithOffsets(ctx context.Context, handler ConsumerFunc) error
SubscribeWithOffsets manages a reader handler that explicitly commits offsets. Note: this function is blocking.
type ConsumerConfig ¶
type ConsumerConfig struct {
Brokers string `json:"brokers"`
Topic string `json:"topic"` // Topic to consume from, if not specified will use GroupTopics
Group string `json:"group"` // Group consumer group, if not specified will use specified partition
AuthType string `json:"authType"` // AuthType to use, one of "none", "plain", "scram256", "scram512"
Username string `json:"username"` // Username optional username
secure.DefaultCredentialConfig // optional password
tlsProvider.ClientConfig
ConsumerOptions
}
func (ConsumerConfig) ApplyOptions ¶ added in v0.3.0
func (c ConsumerConfig) ApplyOptions(r *kafka.ReaderConfig)
ApplyOptions sets additional ReaderConfig parameters
func (ConsumerConfig) Validate ¶
func (c ConsumerConfig) Validate() error
Validate validates ConsumerConfig fields
type ConsumerFunc ¶
ConsumerFunc is the Reader handler type
type ConsumerOptions ¶ added in v0.3.0
type ConsumerOptions struct {
GroupTopics []string `json:"groupTopics"` // GroupTopics if specified, topics to consume as a group instead of Topic
Partition uint `json:"partition"` // Partition id, used if no Group specified, defaults to 0
QueueCapacity uint `json:"queueCapacity"` // QueueCapacity, defaults to 100
MinBytes uint `json:"minBytes"` // MinBytes, defaults to 0
MaxBytes uint `json:"maxBytes"` // MaxBytes, defaults to 1048576
MaxWait uint `json:"maxWait"` // MaxWait in milliseconds, default 10.000 (10s)
ReadBatchTimeout uint `json:"readBatchTimeout"` // ReadBatchTimeout in milliseconds, default 10.000 (10s)
HeartbeatInterval uint `json:"heartbeatInterval"` // HeartbeatInterval in milliseconds, default 3000 (3s)
CommitInterval uint `json:"commitInterval"` // CommitInterval in milliseconds, default 0
PartitionWatchInterval uint `json:"partitionWatchInterval"` // PartitionWatchInterval in milliseconds, default 5000 (5s)
WatchPartitionChanges bool `json:"watchPartitionChanges"` // WatchPartitionChanges, default true
SessionTimeout uint `json:"sessionTimeout"` // SessionTimeout in milliseconds, default 30.000 (30s)
RebalanceTimeout uint `json:"rebalanceTimeout"` // RebalanceTimeout in milliseconds, default 30.000 (30s)
JoinGroupBackoff uint `json:"joinGroupBackoff"` // JoinGroupBackoff in milliseconds, default 5000 (5s)
RetentionTime uint `json:"retentionTime"` // RetentionTime, in milliseconds, default 86.400.000ms (24h)
StartOffset string `json:"startOffset"` // StartOffset either "first", "last", default "last"
ReadBackoffMin uint `json:"readBackoffMin"` // ReadBackoffMin in milliseconds, default 100
ReadBackoffMax uint `json:"readBackoffMax"` // ReadBackoffMax in milliseconds, default 1000 (1s)
MaxAttempts uint `json:"maxAttempts"` // MaxAttempts default 3
IsolationLevel string `json:"isolationLevel"` // IsolationLevel "uncommitted" or "committed", default "committed"
}
ConsumerOptions additional consumer options
type Producer ¶ added in v0.3.0
func NewProducer ¶
func NewProducer(cfg *ProducerConfig, logger *log.Logger) (*Producer, error)
func (*Producer) Disconnect ¶ added in v0.3.0
func (p *Producer) Disconnect()
Disconnect disconnects from the Writer
func (*Producer) IsConnected ¶ added in v0.3.0
IsConnected returns true if Writer is connected
func (*Producer) WriteMulti ¶ added in v0.3.0
WriteMulti writes multiple messages to Topic
func (*Producer) WriteMultiJson ¶ added in v0.3.0
WriteMultiJson writes a slice of structs to a Topic as a JSON message
type ProducerConfig ¶
type ProducerConfig struct {
Brokers string `json:"brokers"`
Topic string `json:"topic"`
AuthType string `json:"authType"`
Username string `json:"username"`
secure.DefaultCredentialConfig
tlsProvider.ClientConfig
ProducerOptions
}
func (ProducerConfig) Validate ¶
func (c ProducerConfig) Validate() error
type ProducerOptions ¶ added in v0.3.0
type ProducerOptions struct {
MaxAttempts uint `json:"maxAttempts"`
WriteBackoffMin uint `json:"writeBackoffMin"` // WriteBackoffMin value in milliseconds, defaults to 100
WriteBackoffMax uint `json:"writeBackoffMax"` // WriteBackoffMax value in milliseconds, defaults to 1000
BatchSize uint `json:"batchSize"` // BatchSize, defaults to 100
BatchBytes uint64 `json:"batchBytes"` // BatchBytes, defaults to 1048576
BatchTimeout uint `json:"batchTimeout"` // BatchTimeout value in milliseconds, defaults to 1000
ReadTimeout uint `json:"readTimeout"` // ReadTimeout value in milliseconds, defaults to 10.000
WriteTimeout uint `json:"writeTimeout"` // WriteTimeout value in milliseconds, defaults to 10.000
RequiredAcks string `json:"requiredAcks"` // RequiredAcks one of "none", "one", "all", default "none"
Async bool `json:"async"` // Async, default false
}
ProducerOptions additional producer options
func (ProducerOptions) ApplyOptions ¶ added in v0.3.0
func (p ProducerOptions) ApplyOptions(w *kafka.Writer)
ApplyOptions sets additional Writer parameters