Documentation ¶
Overview ¶
Package kafka implements tools to work with kafka Producers and Consumers.
Index ¶
- Variables
- func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
- func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
- type AdminClient
- type AdminConfig
- type AutoOffsetReset
- type BrokerAddressFamily
- type CommonConfigParams
- type ConfigMapper
- type ConfigValidator
- type ConsumerConfig
- type ConsumerConfigParams
- type DebugContext
- type DebugContexts
- type LogEmitter
- type PartitionAssignmentStrategies
- type PartitionAssignmentStrategy
- type Partitioner
- type ProducerConfig
- type ProducerConfigParams
- type TimeDurationMilliSeconds
- type TopicConfig
- type TopicProvisioner
- type TopicProvisionerConfig
- type ValidValues
Constants ¶
This section is empty.
Variables ¶
var AutoOffsetResetValues = []AutoOffsetReset{
	AutoOffsetResetSmallest,
	AutoOffsetResetEarliest,
	AutoOffsetResetBeginning,
	AutoOffsetResetLargest,
	AutoOffsetResetLatest,
	AutoOffsetResetEnd,
	AutoOffsetResetError,
}
var BrokerAddressFamilyValues = ValidValues[BrokerAddressFamily]{
	BrokerAddressFamilyAny,
	BrokerAddressFamilyIPv4,
	BrokerAddressFamilyIPv6,
}
var CooperativePartitionAssignmentStrategyValues = ValidValues[PartitionAssignmentStrategy]{
	PartitionAssignmentStrategyCooperativeSticky,
}
var DebugContextValues = ValidValues[DebugContext]{
	DebugContextGeneric,
	DebugContextBroker,
	DebugContextTopic,
	DebugContextMetadata,
	DebugContextFeature,
	DebugContextQueue,
	DebugContextMessage,
	DebugContextProtocol,
	DebugContextConsumerGroup,
	DebugContextSecurity,
	DebugContextFetch,
	DebugContextInterceptor,
	DebugContextPlugin,
	DebugContextConsumer,
	DebugContextAdmin,
	DebugContextIdempotentProducer,
	DebugContextMock,
	DebugContextAssignor,
	DebugContextConfig,
	DebugContextAll,
}
var EagerPartitionAssignmentStrategyValues = ValidValues[PartitionAssignmentStrategy]{
	PartitionAssignmentStrategyRange,
	PartitionAssignmentStrategyRoundRobin,
}
var PartitionAssignmentStrategyValues = ValidValues[PartitionAssignmentStrategy]{
	PartitionAssignmentStrategyRange,
	PartitionAssignmentStrategyRoundRobin,
	PartitionAssignmentStrategyCooperativeSticky,
}
Functions ¶
func ConsumeLogChannel ¶
func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
ConsumeLogChannel is supposed to be called in a goroutine. It consumes a log channel returned by a LogEmitter.
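A minimal sketch of wiring it up, assuming this package is imported under the alias pkgkafka (the import path below is hypothetical) and that a confluent-kafka-go consumer created with go.logs.channel.enable satisfies LogEmitter:

package main

import (
	"log/slog"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	// The log channel is only populated when go.logs.channel.enable is true.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":      "localhost:9092",
		"group.id":               "example-group",
		"go.logs.channel.enable": true,
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// ConsumeLogChannel drains the consumer's log channel until it is closed,
	// so it runs in its own goroutine.
	go pkgkafka.ConsumeLogChannel(consumer, slog.Default())

	// ... run the consumer poll loop here ...
}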
func LogProcessor ¶
func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
LogProcessor consumes logs from a LogEmitter and passes them to an slog.Logger.
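The returned execute/interrupt pair matches the actor shape expected by run.Group.Add from github.com/oklog/run, which appears to be the intended use; a sketch under that assumption (the package alias and import path are hypothetical):

package main

import (
	"log/slog"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/oklog/run"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	logger := slog.Default()

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":      "localhost:9092",
		"group.id":               "example-group",
		"go.logs.channel.enable": true,
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// LogProcessor returns an (execute, interrupt) pair in exactly the
	// shape run.Group.Add expects.
	var group run.Group
	group.Add(pkgkafka.LogProcessor(consumer, logger))

	if err := group.Run(); err != nil {
		logger.Error("run group exited", "error", err)
	}
}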
Types ¶
type AdminClient ¶
type AdminClient interface {
CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error)
}
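The method set matches confluent-kafka-go's *kafka.AdminClient, so a real admin client should satisfy the interface directly; a sketch (the package alias and import path are hypothetical):

package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		panic(err)
	}
	defer adminClient.Close()

	// Compile-time check: *kafka.AdminClient implements AdminClient.
	var _ pkgkafka.AdminClient = adminClient
}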
type AdminConfig ¶
type AdminConfig struct {
CommonConfigParams
}
func (AdminConfig) AsConfigMap ¶
func (c AdminConfig) AsConfigMap() (kafka.ConfigMap, error)
func (AdminConfig) Validate ¶
func (c AdminConfig) Validate() error
type AutoOffsetReset ¶
type AutoOffsetReset string
const (
	// AutoOffsetResetSmallest automatically resets the offset to the smallest offset.
	AutoOffsetResetSmallest AutoOffsetReset = "smallest"
	// AutoOffsetResetEarliest automatically resets the offset to the smallest offset.
	AutoOffsetResetEarliest AutoOffsetReset = "earliest"
	// AutoOffsetResetBeginning automatically resets the offset to the smallest offset.
	AutoOffsetResetBeginning AutoOffsetReset = "beginning"
	// AutoOffsetResetLargest automatically resets the offset to the largest offset.
	AutoOffsetResetLargest AutoOffsetReset = "largest"
	// AutoOffsetResetLatest automatically resets the offset to the largest offset.
	AutoOffsetResetLatest AutoOffsetReset = "latest"
	// AutoOffsetResetEnd automatically resets the offset to the largest offset.
	AutoOffsetResetEnd AutoOffsetReset = "end"
	// AutoOffsetResetError triggers an error (ERR__AUTO_OFFSET_RESET) which is retrieved by
	// consuming messages and checking 'message->err'.
	AutoOffsetResetError AutoOffsetReset = "error"
)
func (AutoOffsetReset) String ¶
func (s AutoOffsetReset) String() string
func (*AutoOffsetReset) UnmarshalJSON ¶
func (s *AutoOffsetReset) UnmarshalJSON(data []byte) error
func (*AutoOffsetReset) UnmarshalText ¶
func (s *AutoOffsetReset) UnmarshalText(text []byte) error
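A small sketch of decoding a value from text; the accepted inputs are presumably the string constants listed above, and the package alias is hypothetical:

package main

import (
	"fmt"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	var reset pkgkafka.AutoOffsetReset
	// "latest" is one of the constant values above; unknown values are
	// presumably rejected by UnmarshalText.
	if err := reset.UnmarshalText([]byte("latest")); err != nil {
		panic(err)
	}
	fmt.Println(reset) // latest
}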
type BrokerAddressFamily ¶
type BrokerAddressFamily string
const (
	BrokerAddressFamilyAny  BrokerAddressFamily = "any"
	BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4"
	BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6"
)
func (BrokerAddressFamily) String ¶
func (s BrokerAddressFamily) String() string
func (*BrokerAddressFamily) UnmarshalJSON ¶
func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error
func (*BrokerAddressFamily) UnmarshalText ¶
func (s *BrokerAddressFamily) UnmarshalText(text []byte) error
type CommonConfigParams ¶
type CommonConfigParams struct {
Brokers string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string
StatsInterval TimeDurationMilliSeconds
// StatsExtended defines if extended metrics are enabled.
StatsExtended bool
// BrokerAddressFamily defines the IP address family to be used for network communication with the Kafka cluster.
BrokerAddressFamily BrokerAddressFamily
// SocketKeepAliveEnabled defines whether TCP socket keep-alive is enabled to prevent
// Kafka brokers from closing idle connections.
SocketKeepAliveEnabled bool
// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
// (brokers, topics, partitions, etc.) from the Kafka cluster.
// The default value of 5 minutes is appropriate for mostly static Kafka clusters, but it needs to be lowered
// for large clusters where changes are more frequent.
// This value must not be set lower than 10 seconds.
TopicMetadataRefreshInterval TimeDurationMilliSeconds
// DebugContexts enables contexts for extensive debugging of librdkafka.
// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
DebugContexts DebugContexts
// ClientID sets the Consumer/Producer identifier
ClientID string
}
func (CommonConfigParams) AsConfigMap ¶
func (c CommonConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (CommonConfigParams) Validate ¶
func (c CommonConfigParams) Validate() error
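A sketch of filling in the shared parameters and converting them to a librdkafka config map; the field values are illustrative and the package alias/import path is hypothetical:

package main

import (
	"fmt"
	"time"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	common := pkgkafka.CommonConfigParams{
		Brokers:             "broker-1:9092,broker-2:9092",
		ClientID:            "example-service",
		BrokerAddressFamily: pkgkafka.BrokerAddressFamilyIPv4,
		// Assuming TimeDurationMilliSeconds converts from time.Duration
		// (see the type definition below).
		TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
		DebugContexts:                pkgkafka.DebugContexts{pkgkafka.DebugContextBroker},
	}

	if err := common.Validate(); err != nil {
		panic(err)
	}

	configMap, err := common.AsConfigMap()
	if err != nil {
		panic(err)
	}
	fmt.Println(configMap)
}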
type ConfigMapper ¶
type ConfigMapper interface {
	AsConfigMap() (kafka.ConfigMap, error)
}
type ConfigValidator ¶
type ConfigValidator interface {
Validate() error
}
type ConsumerConfig ¶
type ConsumerConfig struct {
CommonConfigParams
ConsumerConfigParams
}
func (ConsumerConfig) AsConfigMap ¶
func (c ConsumerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfig) Validate ¶
func (c ConsumerConfig) Validate() error
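A sketch of building a consumer from this config; it assumes the map returned by AsConfigMap is accepted by kafka.NewConsumer as-is, and the package alias/import path is hypothetical:

package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	cfg := pkgkafka.ConsumerConfig{
		CommonConfigParams: pkgkafka.CommonConfigParams{
			Brokers:  "localhost:9092",
			ClientID: "example-service",
		},
		ConsumerConfigParams: pkgkafka.ConsumerConfigParams{
			ConsumerGroupID: "example-group",
			AutoOffsetReset: pkgkafka.AutoOffsetResetEarliest,
			PartitionAssignmentStrategy: pkgkafka.PartitionAssignmentStrategies{
				pkgkafka.PartitionAssignmentStrategyCooperativeSticky,
			},
		},
	}

	if err := cfg.Validate(); err != nil {
		panic(err)
	}

	configMap, err := cfg.AsConfigMap()
	if err != nil {
		panic(err)
	}

	consumer, err := kafka.NewConsumer(&configMap)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()
}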
type ConsumerConfigParams ¶
type ConsumerConfigParams struct {
// ConsumerGroupID defines the group id. All clients sharing the same ConsumerGroupID belong to the same group.
ConsumerGroupID string
// ConsumerGroupInstanceID defines the instance id in consumer group. Setting this parameter enables static group membership.
// Static group members are able to leave and rejoin a group within the configured SessionTimeout without prompting a group rebalance.
// This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts).
ConsumerGroupInstanceID string
// SessionTimeout defines the consumer group session and failure detection timeout.
// The consumer sends periodic heartbeats (HeartbeatInterval) to indicate its liveness to the broker.
// If no heartbeats are received by the broker for a group member within the session timeout,
// the broker will remove the consumer from the group and trigger a rebalance.
SessionTimeout TimeDurationMilliSeconds
// HeartbeatInterval defines the consumer group session keepalive heartbeat interval.
HeartbeatInterval TimeDurationMilliSeconds
// EnableAutoCommit enables automatically and periodically committing offsets in the background.
EnableAutoCommit bool
// EnableAutoOffsetStore enables automatically storing the offset of the last message provided to the application.
// The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
EnableAutoOffsetStore bool
// AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range:
// * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
// * "largest","latest","end": automatically reset the offset to the largest offset
// * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
AutoOffsetReset AutoOffsetReset
// PartitionAssignmentStrategy defines one or more partition assignment strategies.
// The elected group leader will use a strategy supported by all members of the group to assign partitions to group members.
// If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority).
// Cooperative and non-cooperative (eager) strategies must not be mixed.
// Available strategies: PartitionAssignmentStrategyRange, PartitionAssignmentStrategyRoundRobin, PartitionAssignmentStrategyCooperativeSticky.
PartitionAssignmentStrategy PartitionAssignmentStrategies
// MaxPollInterval defines the maximum delay between invocations of poll() when using consumer group management.
// This places an upper bound on the amount of time that the consumer can be idle before fetching more records.
// If poll() is not called before expiration of this timeout, then the consumer is considered failed and
// the group will rebalance in order to reassign the partitions to another member.
// See https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-interval-ms
MaxPollInterval TimeDurationMilliSeconds
}
func (ConsumerConfigParams) AsConfigMap ¶
func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfigParams) Validate ¶
func (c ConsumerConfigParams) Validate() error
type DebugContext ¶
type DebugContext string
const (
	// DebugContextGeneric enables generic client instance level debugging.
	// Includes initialization and termination debugging.
	// Client Type: producer, consumer
	DebugContextGeneric DebugContext = "generic"
	// DebugContextBroker enables broker and connection state debugging.
	// Client Type: producer, consumer
	DebugContextBroker DebugContext = "broker"
	// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
	// Client Type: producer, consumer
	DebugContextTopic DebugContext = "topic"
	// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
	// Client Type: producer, consumer
	DebugContextMetadata DebugContext = "metadata"
	// DebugContextFeature enables debugging of Kafka protocol feature support as negotiated with the broker.
	// Client Type: producer, consumer
	DebugContextFeature DebugContext = "feature"
	// DebugContextQueue enables message queue debugging.
	// Client Type: producer
	DebugContextQueue DebugContext = "queue"
	// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
	// Client Type: producer, consumer
	DebugContextMessage DebugContext = "msg"
	// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
	// Client Type: producer, consumer
	DebugContextProtocol DebugContext = "protocol"
	// DebugContextConsumerGroup enables low-level consumer group state debugging.
	// Client Type: consumer
	DebugContextConsumerGroup DebugContext = "cgrp"
	// DebugContextSecurity enables security and authentication debugging.
	// Client Type: producer, consumer
	DebugContextSecurity DebugContext = "security"
	// DebugContextFetch enables consumer message fetch debugging. Includes decisions on when and why messages are fetched.
	// Client Type: consumer
	DebugContextFetch DebugContext = "fetch"
	// DebugContextInterceptor enables interceptor interface debugging.
	// Client Type: producer, consumer
	DebugContextInterceptor DebugContext = "interceptor"
	// DebugContextPlugin enables plugin loading debugging.
	// Client Type: producer, consumer
	DebugContextPlugin DebugContext = "plugin"
	// DebugContextConsumer enables high-level consumer debugging.
	// Client Type: consumer
	DebugContextConsumer DebugContext = "consumer"
	// DebugContextAdmin enables admin API debugging.
	// Client Type: admin
	DebugContextAdmin DebugContext = "admin"
	// DebugContextIdempotentProducer enables idempotent Producer debugging.
	// Client Type: producer
	DebugContextIdempotentProducer DebugContext = "eos"
	// DebugContextMock enables mock cluster functionality debugging.
	// Client Type: producer, consumer
	DebugContextMock DebugContext = "mock"
	// DebugContextAssignor enables detailed consumer group partition assignor debugging.
	// Client Type: consumer
	DebugContextAssignor DebugContext = "assignor"
	// DebugContextConfig enables displaying set configuration properties on startup.
	// Client Type: producer, consumer
	DebugContextConfig DebugContext = "conf"
	// DebugContextAll enables all of the above.
	// Client Type: producer, consumer
	DebugContextAll DebugContext = "all"
)
func (DebugContext) String ¶
func (c DebugContext) String() string
func (*DebugContext) UnmarshalJSON ¶
func (c *DebugContext) UnmarshalJSON(data []byte) error
func (*DebugContext) UnmarshalText ¶
func (c *DebugContext) UnmarshalText(text []byte) error
type DebugContexts ¶
type DebugContexts []DebugContext
func (DebugContexts) String ¶
func (d DebugContexts) String() string
type LogEmitter ¶
type LogEmitter interface {
	Logs() chan kafka.LogEvent
}
LogEmitter emits logs from a kafka.Consumer or kafka.Producer.
Requires the `go.logs.channel.enable` option to be set to true.
type PartitionAssignmentStrategies ¶
type PartitionAssignmentStrategies []PartitionAssignmentStrategy
PartitionAssignmentStrategies defines one or more partition assignment strategies. If there is more than one eligible strategy, preference is determined by the configured order of strategies. IMPORTANT: cooperative and non-cooperative (eager) strategies must NOT be mixed.
func (PartitionAssignmentStrategies) String ¶
func (p PartitionAssignmentStrategies) String() string
type PartitionAssignmentStrategy ¶
type PartitionAssignmentStrategy string
const (
	// PartitionAssignmentStrategyRange assigns partitions on a per-topic basis.
	PartitionAssignmentStrategyRange PartitionAssignmentStrategy = "range"
	// PartitionAssignmentStrategyRoundRobin assigns partitions to consumers in a round-robin fashion.
	PartitionAssignmentStrategyRoundRobin PartitionAssignmentStrategy = "roundrobin"
	// PartitionAssignmentStrategyCooperativeSticky guarantees an assignment that is maximally balanced while preserving
	// as many existing partition assignments as possible, while allowing cooperative rebalancing.
	PartitionAssignmentStrategyCooperativeSticky PartitionAssignmentStrategy = "cooperative-sticky"
)
func (PartitionAssignmentStrategy) String ¶
func (c PartitionAssignmentStrategy) String() string
func (*PartitionAssignmentStrategy) UnmarshalJSON ¶
func (c *PartitionAssignmentStrategy) UnmarshalJSON(data []byte) error
func (*PartitionAssignmentStrategy) UnmarshalText ¶
func (c *PartitionAssignmentStrategy) UnmarshalText(text []byte) error
type Partitioner ¶
type Partitioner string
const (
	// PartitionerRandom uses random distribution.
	PartitionerRandom Partitioner = "random"
	// PartitionerConsistent uses the CRC32 hash of the key, while empty and NULL keys are mapped to a single partition.
	PartitionerConsistent Partitioner = "consistent"
	// PartitionerConsistentRandom uses the CRC32 hash of the key, while empty and NULL keys are randomly partitioned.
	PartitionerConsistentRandom Partitioner = "consistent_random"
	// PartitionerMurmur2 uses the Java Producer compatible Murmur2 hash of the key, while NULL keys are mapped to a single partition.
	PartitionerMurmur2 Partitioner = "murmur2"
	// PartitionerMurmur2Random uses the Java Producer compatible Murmur2 hash of the key, while NULL keys are randomly partitioned.
	// This is functionally equivalent to the default partitioner in the Java Producer.
	PartitionerMurmur2Random Partitioner = "murmur2_random"
	// PartitionerFnv1a uses the FNV-1a hash of the key, while NULL keys are mapped to a single partition.
	PartitionerFnv1a Partitioner = "fnv1a"
	// PartitionerFnv1aRandom uses the FNV-1a hash of the key, while NULL keys are randomly partitioned.
	PartitionerFnv1aRandom Partitioner = "fnv1a_random"
)
func (Partitioner) String ¶
func (s Partitioner) String() string
func (*Partitioner) UnmarshalJSON ¶
func (s *Partitioner) UnmarshalJSON(data []byte) error
func (*Partitioner) UnmarshalText ¶
func (s *Partitioner) UnmarshalText(text []byte) error
type ProducerConfig ¶
type ProducerConfig struct {
CommonConfigParams
ProducerConfigParams
}
func (ProducerConfig) AsConfigMap ¶
func (c ProducerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfig) Validate ¶
func (c ProducerConfig) Validate() error
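A sketch of building a producer from this config, mirroring the consumer example above; it assumes the map returned by AsConfigMap is accepted by kafka.NewProducer, and the package alias/import path is hypothetical:

package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	cfg := pkgkafka.ProducerConfig{
		CommonConfigParams: pkgkafka.CommonConfigParams{
			Brokers:  "localhost:9092",
			ClientID: "example-service",
		},
		ProducerConfigParams: pkgkafka.ProducerConfigParams{
			Partitioner: pkgkafka.PartitionerConsistentRandom,
		},
	}

	if err := cfg.Validate(); err != nil {
		panic(err)
	}

	configMap, err := cfg.AsConfigMap()
	if err != nil {
		panic(err)
	}

	producer, err := kafka.NewProducer(&configMap)
	if err != nil {
		panic(err)
	}
	defer producer.Close()
}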
type ProducerConfigParams ¶
type ProducerConfigParams struct {
// Partitioner defines the algorithm used for assigning the topic partition for a produced message based on its partition key.
Partitioner Partitioner
}
func (ProducerConfigParams) AsConfigMap ¶
func (p ProducerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfigParams) Validate ¶
func (p ProducerConfigParams) Validate() error
type TimeDurationMilliSeconds ¶
type TimeDurationMilliSeconds time.Duration
func (TimeDurationMilliSeconds) Duration ¶
func (d TimeDurationMilliSeconds) Duration() time.Duration
func (TimeDurationMilliSeconds) String ¶
func (d TimeDurationMilliSeconds) String() string
func (*TimeDurationMilliSeconds) UnmarshalJSON ¶
func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error
func (*TimeDurationMilliSeconds) UnmarshalText ¶
func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error
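Given the time.Duration underlying type above, values convert directly from standard durations; a small sketch (the package alias and import path are hypothetical):

package main

import (
	"fmt"
	"time"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	// Construct from a standard duration; the type name suggests the value
	// is rendered as milliseconds when written into a librdkafka config map.
	timeout := pkgkafka.TimeDurationMilliSeconds(30 * time.Second)
	fmt.Println(timeout.Duration()) // 30s
}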
type TopicConfig ¶
type TopicConfig struct {
Name string
Partitions int
Replicas int
RetentionTime TimeDurationMilliSeconds
}
func (TopicConfig) Validate ¶
func (c TopicConfig) Validate() error
type TopicProvisioner ¶
type TopicProvisioner interface {
Provision(ctx context.Context, topics ...TopicConfig) error
DeProvision(ctx context.Context, topics ...string) error
}
func NewTopicProvisioner ¶
func NewTopicProvisioner(config TopicProvisionerConfig) (TopicProvisioner, error)
NewTopicProvisioner returns a new TopicProvisioner.
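A sketch of provisioning topics at startup; the admin client wiring follows the AdminClient example above, the meter comes from the global OpenTelemetry provider, and the package alias/import path is hypothetical:

package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"go.opentelemetry.io/otel"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	})
	if err != nil {
		panic(err)
	}
	defer adminClient.Close()

	provisioner, err := pkgkafka.NewTopicProvisioner(pkgkafka.TopicProvisionerConfig{
		AdminClient:     adminClient,
		Logger:          slog.Default(),
		Meter:           otel.GetMeterProvider().Meter("kafka-topic-provisioner"),
		CacheSize:       250,
		CacheTTL:        5 * time.Minute,
		ProtectedTopics: []string{"critical-topic"},
	})
	if err != nil {
		panic(err)
	}

	// Provision creates the topics that do not exist yet.
	err = provisioner.Provision(context.Background(), pkgkafka.TopicConfig{
		Name:       "example-topic",
		Partitions: 6,
		Replicas:   3,
	})
	if err != nil {
		panic(err)
	}
}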
type TopicProvisionerConfig ¶
type TopicProvisionerConfig struct {
AdminClient AdminClient
Logger *slog.Logger
Meter metric.Meter
// CacheSize defines the maximum number of entries stored in the topic cache at a time; once the limit is reached, the least recently used entry is evicted.
// Setting it to 0 makes the cache size unlimited.
CacheSize int
// CacheTTL defines the maximum time an entry is kept in the cache before being evicted.
// Setting it to 0 disables cache entry expiration.
CacheTTL time.Duration
// ProtectedTopics defines a list of topics which are protected from deletion.
ProtectedTopics []string
}
type ValidValues ¶
type ValidValues[T comparable] []T
func (ValidValues[T]) AsKeyMap ¶
func (v ValidValues[T]) AsKeyMap() map[T]struct{}
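A small sketch of using AsKeyMap for membership checks against one of the ValidValues variables above (the package alias and import path are hypothetical):

package main

import (
	"fmt"

	pkgkafka "github.com/example/app/pkg/kafka" // hypothetical import path
)

func main() {
	valid := pkgkafka.BrokerAddressFamilyValues.AsKeyMap()

	// The key map allows O(1) membership checks.
	if _, ok := valid[pkgkafka.BrokerAddressFamily("v4")]; ok {
		fmt.Println("v4 is a valid broker address family")
	}
}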