Documentation ¶
Overview ¶
Package kafka implements tools for working with Kafka producers and consumers.
Index ¶
- func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
- func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
- func ProvisionTopic(ctx context.Context, adminClient *kafka.AdminClient, logger *slog.Logger, ...) error
- type BrokerAddressFamily
- type CommonConfigParams
- type ConfigMapper
- type ConfigValidator
- type ConsumerConfig
- type ConsumerConfigParams
- type DebugContext
- type DebugContexts
- type LogEmitter
- type ProducerConfig
- type ProducerConfigParams
- type TimeDurationMilliSeconds
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConsumeLogChannel ¶
func ConsumeLogChannel(emitter LogEmitter, logger *slog.Logger)
ConsumeLogChannel is intended to run in its own goroutine. It consumes the log channel returned by a LogEmitter.
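The goroutine pattern described above can be sketched with stand-in types. This is only an illustration: `logEvent`, `logSource`, `fakeSource`, and `consumeLogs` are hypothetical names, and the assumption that a LogEmitter exposes its channel through a `Logs()` method is not confirmed by this page.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
	"strings"
	"sync"
)

// logEvent is a hypothetical stand-in for the client's log event type.
type logEvent struct {
	Name    string
	Message string
}

// logSource is an assumed shape for LogEmitter: anything that exposes a log channel.
type logSource interface {
	Logs() chan logEvent
}

// fakeSource serves a pre-filled channel so the sketch runs without a broker.
type fakeSource struct{ ch chan logEvent }

func (f fakeSource) Logs() chan logEvent { return f.ch }

// consumeLogs forwards every event to the logger and returns once the channel is closed.
func consumeLogs(src logSource, logger *slog.Logger) {
	for ev := range src.Logs() {
		logger.Info(ev.Message, "client", ev.Name)
	}
}

// loggedLines runs consumeLogs in its own goroutine and reports how many records were written.
func loggedLines() int {
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, nil))

	src := fakeSource{ch: make(chan logEvent, 2)}
	src.ch <- logEvent{Name: "consumer-1", Message: "broker connected"}
	src.ch <- logEvent{Name: "consumer-1", Message: "group rebalanced"}
	close(src.ch)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		consumeLogs(src, logger)
	}()
	wg.Wait()

	// The text handler writes one line per record.
	return strings.Count(buf.String(), "\n")
}

func main() {
	fmt.Println("records logged:", loggedLines())
}
```

Because the loop blocks until the channel is closed, calling it directly would stall the caller, which is presumably why the function is meant to run in a goroutine.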
func LogProcessor ¶
func LogProcessor(logEmitter LogEmitter, logger *slog.Logger) (execute func() error, interrupt func(error))
LogProcessor consumes logs from a LogEmitter and passes them to an slog.Logger.
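The returned pair has the shape accepted by actor-style run groups (for example `Group.Add` in github.com/oklog/run): `execute` blocks while doing the work and `interrupt` asks it to stop. A minimal sketch of that contract, using a hypothetical `newProcessor` over a plain string channel; the channel-based shutdown is an assumption and not necessarily how LogProcessor is implemented:

```go
package main

import (
	"fmt"
	"sync"
)

// newProcessor returns an execute/interrupt pair with the same shape as LogProcessor's result.
// It is a hypothetical helper for illustration only.
func newProcessor(events <-chan string, handle func(string)) (execute func() error, interrupt func(error)) {
	done := make(chan struct{})
	var once sync.Once

	// execute blocks until the event channel is closed or interrupt is called.
	execute = func() error {
		for {
			select {
			case ev, ok := <-events:
				if !ok {
					return nil
				}
				handle(ev)
			case <-done:
				return nil
			}
		}
	}

	// interrupt is safe to call more than once.
	interrupt = func(error) {
		once.Do(func() { close(done) })
	}
	return execute, interrupt
}

// runProcessor feeds two events, interrupts, and returns the handled count and execute's result.
func runProcessor() (int, error) {
	events := make(chan string) // unbuffered: a send returns only after execute received it
	var handled []string
	execute, interrupt := newProcessor(events, func(ev string) {
		handled = append(handled, ev)
	})

	result := make(chan error, 1)
	go func() { result <- execute() }()

	events <- "first"
	events <- "second"
	interrupt(nil)

	err := <-result // execute has returned, so reading handled is safe
	return len(handled), err
}

func main() {
	n, err := runProcessor()
	fmt.Println("handled:", n, "err:", err)
}
```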
Types ¶
type BrokerAddressFamily ¶
type BrokerAddressFamily string
const (
	BrokerAddressFamilyAny  BrokerAddressFamily = "any"
	BrokerAddressFamilyIPv4 BrokerAddressFamily = "v4"
	BrokerAddressFamilyIPv6 BrokerAddressFamily = "v6"
)
func (BrokerAddressFamily) String ¶
func (s BrokerAddressFamily) String() string
func (*BrokerAddressFamily) UnmarshalJSON ¶
func (s *BrokerAddressFamily) UnmarshalJSON(data []byte) error
func (*BrokerAddressFamily) UnmarshalText ¶
func (s *BrokerAddressFamily) UnmarshalText(text []byte) error
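A string-backed enum like this one typically validates its input while decoding. The sketch below shows one way that could look, using a local `addressFamily` type with the three values listed above. Rejecting unknown values and having `UnmarshalJSON` delegate to `UnmarshalText` are assumptions; the package's actual behavior is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// addressFamily is a local stand-in for BrokerAddressFamily.
type addressFamily string

const (
	familyAny  addressFamily = "any"
	familyIPv4 addressFamily = "v4"
	familyIPv6 addressFamily = "v6"
)

func (f addressFamily) String() string { return string(f) }

// UnmarshalText accepts only the three known values (assumed validation rule).
func (f *addressFamily) UnmarshalText(text []byte) error {
	switch v := addressFamily(text); v {
	case familyAny, familyIPv4, familyIPv6:
		*f = v
		return nil
	default:
		return fmt.Errorf("invalid broker address family: %q", text)
	}
}

// UnmarshalJSON decodes a JSON string and reuses the text validation.
func (f *addressFamily) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	return f.UnmarshalText([]byte(s))
}

// parseFamily decodes a small JSON document with a hypothetical "family" key.
func parseFamily(raw string) (addressFamily, error) {
	var cfg struct {
		Family addressFamily `json:"family"`
	}
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg.Family, err
}

func main() {
	f, err := parseFamily(`{"family":"v4"}`)
	fmt.Println(f, err)

	_, err = parseFamily(`{"family":"v5"}`)
	fmt.Println(err)
}
```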
type CommonConfigParams ¶
type CommonConfigParams struct {
Brokers string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string
StatsInterval TimeDurationMilliSeconds
// BrokerAddressFamily defines the IP address family to be used for network communication with the Kafka cluster.
BrokerAddressFamily BrokerAddressFamily
// SocketKeepAliveEnabled defines if TCP socket keep-alive is enabled to prevent closing idle connections
// by Kafka brokers.
SocketKeepAliveEnabled bool
// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
// (brokers, topic, partitions, etc) from the Kafka cluster.
// The default of 5 minutes is appropriate for mostly static Kafka clusters, but should be lowered
// for large clusters where changes are more frequent.
// This value must not be set to a value lower than 10s.
TopicMetadataRefreshInterval TimeDurationMilliSeconds
// Enable contexts for extensive debugging of librdkafka.
// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
DebugContexts DebugContexts
// ClientID sets the Consumer/Producer identifier
ClientID string
}
func (CommonConfigParams) AsConfigMap ¶
func (c CommonConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (CommonConfigParams) Validate ¶
func (c CommonConfigParams) Validate() error
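To make the relationship between the fields and the client configuration concrete, here is a minimal sketch that validates a reduced set of fields and maps them to librdkafka property names. The `commonParams` type, the use of `map[string]any` in place of `kafka.ConfigMap`, the non-empty broker check, and the choice to omit unset values are all assumptions; only the 10s lower bound comes from the field comment above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// commonParams is a reduced, hypothetical stand-in for CommonConfigParams.
type commonParams struct {
	Brokers                      string
	ClientID                     string
	SocketKeepAliveEnabled       bool
	TopicMetadataRefreshInterval time.Duration
}

// validate enforces the documented 10s lower bound; the broker check is an assumption.
func (c commonParams) validate() error {
	if c.Brokers == "" {
		return errors.New("brokers must not be empty")
	}
	if c.TopicMetadataRefreshInterval != 0 && c.TopicMetadataRefreshInterval < 10*time.Second {
		return fmt.Errorf("topic metadata refresh interval %s is lower than 10s", c.TopicMetadataRefreshInterval)
	}
	return nil
}

// asConfigMap maps the fields to librdkafka property names, skipping unset optional values.
func (c commonParams) asConfigMap() (map[string]any, error) {
	if err := c.validate(); err != nil {
		return nil, err
	}
	m := map[string]any{
		"bootstrap.servers":       c.Brokers,
		"socket.keepalive.enable": c.SocketKeepAliveEnabled,
	}
	if c.ClientID != "" {
		m["client.id"] = c.ClientID
	}
	if c.TopicMetadataRefreshInterval > 0 {
		m["topic.metadata.refresh.interval.ms"] = int(c.TopicMetadataRefreshInterval.Milliseconds())
	}
	return m, nil
}

// exampleMap builds a map from a valid configuration.
func exampleMap() (map[string]any, error) {
	return commonParams{
		Brokers:                      "localhost:9092",
		ClientID:                     "orders",
		SocketKeepAliveEnabled:       true,
		TopicMetadataRefreshInterval: 30 * time.Second,
	}.asConfigMap()
}

// tooFrequentRefresh validates a configuration that violates the 10s bound.
func tooFrequentRefresh() error {
	return commonParams{
		Brokers:                      "localhost:9092",
		TopicMetadataRefreshInterval: 5 * time.Second,
	}.validate()
}

func main() {
	m, err := exampleMap()
	fmt.Println(m, err)
	fmt.Println(tooFrequentRefresh())
}
```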
type ConfigMapper ¶
type ConfigValidator ¶
type ConfigValidator interface {
Validate() error
}
type ConsumerConfig ¶
type ConsumerConfig struct {
CommonConfigParams
ConsumerConfigParams
}
func (ConsumerConfig) AsConfigMap ¶
func (c ConsumerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfig) Validate ¶
func (c ConsumerConfig) Validate() error
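ConsumerConfig embeds two structs that each have their own AsConfigMap and Validate methods. In Go, a method promoted from two embedded types at the same depth is ambiguous, so the outer type has to define its own version to be callable. A minimal sketch of how such a method might merge the parts, using hypothetical local types and `map[string]any` in place of `kafka.ConfigMap`:

```go
package main

import "fmt"

// commonPart and consumerPart are hypothetical stand-ins for the two embedded parameter structs.
type commonPart struct{ Brokers string }

func (c commonPart) asConfigMap() (map[string]any, error) {
	return map[string]any{"bootstrap.servers": c.Brokers}, nil
}

type consumerPart struct{ GroupID string }

func (c consumerPart) asConfigMap() (map[string]any, error) {
	return map[string]any{"group.id": c.GroupID}, nil
}

// consumerConfig embeds both parts, mirroring the layout of ConsumerConfig.
type consumerConfig struct {
	commonPart
	consumerPart
}

// asConfigMap merges the maps of both parts. Defining it on the outer type also
// resolves the ambiguity between the two promoted methods.
func (c consumerConfig) asConfigMap() (map[string]any, error) {
	parts := []interface {
		asConfigMap() (map[string]any, error)
	}{c.commonPart, c.consumerPart}

	merged := map[string]any{}
	for _, part := range parts {
		m, err := part.asConfigMap()
		if err != nil {
			return nil, err
		}
		for k, v := range m {
			merged[k] = v
		}
	}
	return merged, nil
}

// exampleMerged builds a merged map from sample values.
func exampleMerged() (map[string]any, error) {
	return consumerConfig{
		commonPart:   commonPart{Brokers: "localhost:9092"},
		consumerPart: consumerPart{GroupID: "billing"},
	}.asConfigMap()
}

func main() {
	m, err := exampleMerged()
	fmt.Println(m, err)
}
```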
type ConsumerConfigParams ¶
type ConsumerConfigParams struct {
// ConsumerGroupID defines the group id. All clients sharing the same ConsumerGroupID belong to the same group.
ConsumerGroupID string
// ConsumerGroupInstanceID defines the instance id in the consumer group. Setting this parameter enables static group membership.
// Static group members are able to leave and rejoin a group within the configured SessionTimeout without prompting a group rebalance.
// This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts).
ConsumerGroupInstanceID string
// SessionTimeout defines the consumer group session and failure detection timeout.
// The consumer sends periodic heartbeats (HeartbeatInterval) to indicate its liveness to the broker.
// If no heartbeats are received by the broker for a group member within the session timeout,
// the broker will remove the consumer from the group and trigger a rebalance.
SessionTimeout TimeDurationMilliSeconds
// HeartbeatInterval defines the consumer group session keepalive heartbeat interval.
HeartbeatInterval TimeDurationMilliSeconds
// EnableAutoCommit enables automatic, periodic offset commits in the background.
EnableAutoCommit bool
// EnableAutoOffsetStore enables automatically storing the offset of the last message provided to the application.
// The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
EnableAutoOffsetStore bool
// AutoOffsetReset defines the action to take when there is no initial offset in the offset store or the desired offset is out of range:
// * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
// * "largest","latest","end": automatically reset the offset to the largest offset
// * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
AutoOffsetReset string
// PartitionAssignmentStrategy defines one or more partition assignment strategies.
// The elected group leader will use a strategy supported by all members of the group to assign partitions to group members.
// If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority).
// Cooperative and non-cooperative (eager) strategies must not be mixed.
// Available strategies: range, roundrobin, cooperative-sticky.
PartitionAssignmentStrategy string
}
func (ConsumerConfigParams) AsConfigMap ¶
func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ConsumerConfigParams) Validate ¶
func (c ConsumerConfigParams) Validate() error
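The rule that cooperative and eager strategies must not be mixed is easy to check mechanically. The sketch below validates a comma-separated strategy list against the three strategies named in the field comment. The function name `validateStrategies` is hypothetical, and treating an empty value as "use the client default" is an assumption; the page does not show what Validate actually checks.

```go
package main

import (
	"fmt"
	"strings"
)

// cooperative records, for each strategy named in the field comment, whether it is cooperative.
var cooperative = map[string]bool{
	"range":              false,
	"roundrobin":         false,
	"cooperative-sticky": true,
}

// validateStrategies rejects unknown strategies and lists that mix eager and cooperative ones.
func validateStrategies(list string) error {
	if strings.TrimSpace(list) == "" {
		return nil // assumption: an empty value falls back to the client default
	}

	var sawEager, sawCooperative bool
	for _, raw := range strings.Split(list, ",") {
		name := strings.TrimSpace(raw)
		isCooperative, known := cooperative[name]
		if !known {
			return fmt.Errorf("unknown partition assignment strategy %q", name)
		}
		if isCooperative {
			sawCooperative = true
		} else {
			sawEager = true
		}
	}

	if sawEager && sawCooperative {
		return fmt.Errorf("cooperative and eager strategies must not be mixed: %q", list)
	}
	return nil
}

func main() {
	fmt.Println(validateStrategies("range,roundrobin"))
	fmt.Println(validateStrategies("range,cooperative-sticky"))
	fmt.Println(validateStrategies("sticky"))
}
```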
type DebugContext ¶
type DebugContext string
const (
	// DebugContextGeneric enables generic client instance level debugging.
	// Includes initialization and termination debugging.
	// Client Type: producer, consumer
	DebugContextGeneric DebugContext = "generic"

	// DebugContextBroker enables broker and connection state debugging.
	// Client Type: producer, consumer
	DebugContextBroker DebugContext = "broker"

	// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
	// Client Type: producer, consumer
	DebugContextTopic DebugContext = "topic"

	// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
	// Client Type: producer, consumer
	DebugContextMetadata DebugContext = "metadata"

	// DebugContextFeature enables Kafka protocol feature support as negotiated with the broker.
	// Client Type: producer, consumer
	DebugContextFeature DebugContext = "feature"

	// DebugContextQueue enables message queue debugging.
	// Client Type: producer
	DebugContextQueue DebugContext = "queue"

	// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
	// Client Type: producer, consumer
	DebugContextMessage DebugContext = "msg"

	// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
	// Client Type: producer, consumer
	DebugContextProtocol DebugContext = "protocol"

	// DebugContextConsumerGroup enables low-level consumer group state debugging.
	// Client Type: consumer
	DebugContextConsumerGroup DebugContext = "cgrp"

	// DebugContextSecurity enables security and authentication debugging.
	// Client Type: producer, consumer
	DebugContextSecurity DebugContext = "security"

	// DebugContextFetch enables consumer message fetch debugging. Includes decision when and why messages are fetched.
	// Client Type: consumer
	DebugContextFetch DebugContext = "fetch"

	// DebugContextInterceptor enables interceptor interface debugging.
	// Client Type: producer, consumer
	DebugContextInterceptor DebugContext = "interceptor"

	// DebugContextPlugin enables plugin loading debugging.
	// Client Type: producer, consumer
	DebugContextPlugin DebugContext = "plugin"

	// DebugContextConsumer enables high-level consumer debugging.
	// Client Type: consumer
	DebugContextConsumer DebugContext = "consumer"

	// DebugContextAdmin enables admin API debugging.
	// Client Type: admin
	DebugContextAdmin DebugContext = "admin"

	// DebugContextIdempotentProducer enables idempotent Producer debugging.
	// Client Type: producer
	DebugContextIdempotentProducer DebugContext = "eos"

	// DebugContextMock enables mock cluster functionality debugging.
	// Client Type: producer, consumer
	DebugContextMock DebugContext = "mock"

	// DebugContextAssignor enables detailed consumer group partition assignor debugging.
	// Client Type: consumer
	DebugContextAssignor DebugContext = "assignor"

	// DebugContextConfig enables displaying set configuration properties on startup.
	// Client Type: producer, consumer
	DebugContextConfig DebugContext = "conf"

	// DebugContextAll enables all of the above.
	// Client Type: producer, consumer
	DebugContextAll DebugContext = "all"
)
func (DebugContext) String ¶
func (c DebugContext) String() string
func (*DebugContext) UnmarshalJSON ¶
func (c *DebugContext) UnmarshalJSON(data []byte) error
func (*DebugContext) UnmarshalText ¶
func (c *DebugContext) UnmarshalText(text []byte) error
type DebugContexts ¶
type DebugContexts []DebugContext
func (DebugContexts) String ¶
func (d DebugContexts) String() string
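librdkafka's `debug` property takes a comma-separated list of contexts, so a natural reading of DebugContexts.String is that it joins the individual values. The sketch below shows that reading with local stand-in types; it is an assumption about the output format, not a copy of the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// debugContext and debugContexts are local stand-ins for DebugContext and DebugContexts.
type debugContext string

type debugContexts []debugContext

const (
	ctxBroker        debugContext = "broker"
	ctxConsumerGroup debugContext = "cgrp"
	ctxFetch         debugContext = "fetch"
)

// String joins the contexts with commas, the format librdkafka's `debug` property expects.
func (d debugContexts) String() string {
	parts := make([]string, 0, len(d))
	for _, c := range d {
		parts = append(parts, string(c))
	}
	return strings.Join(parts, ",")
}

func main() {
	contexts := debugContexts{ctxBroker, ctxConsumerGroup, ctxFetch}
	// A config map entry could then be built as {"debug": contexts.String()}.
	fmt.Println(contexts.String())
}
```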
type LogEmitter ¶
LogEmitter emits logs from a kafka.Consumer or kafka.Producer.
Requires the `go.logs.channel.enable` option to be set to true.
This feature was implemented in this PR.
type ProducerConfig ¶
type ProducerConfig struct {
CommonConfigParams
ProducerConfigParams
}
func (ProducerConfig) AsConfigMap ¶
func (c ProducerConfig) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfig) Validate ¶
func (c ProducerConfig) Validate() error
type ProducerConfigParams ¶
type ProducerConfigParams struct{}
func (ProducerConfigParams) AsConfigMap ¶
func (p ProducerConfigParams) AsConfigMap() (kafka.ConfigMap, error)
func (ProducerConfigParams) Validate ¶
func (p ProducerConfigParams) Validate() error
type TimeDurationMilliSeconds ¶
func (TimeDurationMilliSeconds) Duration ¶
func (d TimeDurationMilliSeconds) Duration() time.Duration
func (TimeDurationMilliSeconds) String ¶
func (d TimeDurationMilliSeconds) String() string
func (*TimeDurationMilliSeconds) UnmarshalJSON ¶
func (d *TimeDurationMilliSeconds) UnmarshalJSON(data []byte) error
func (*TimeDurationMilliSeconds) UnmarshalText ¶
func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error
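The type definition is not shown here, so the following is only a sketch of how a millisecond-oriented duration wrapper might behave. It assumes the underlying type is `time.Duration`, that text input uses Go duration syntax such as `30s`, and that String renders whole milliseconds (the unit used by librdkafka's `*.ms` properties). All three points are assumptions, and `durationMs` and `toMillis` are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// durationMs is a hypothetical stand-in for TimeDurationMilliSeconds.
type durationMs time.Duration

// Duration converts back to the standard library type.
func (d durationMs) Duration() time.Duration { return time.Duration(d) }

// String renders the duration as whole milliseconds.
func (d durationMs) String() string {
	return strconv.FormatInt(d.Duration().Milliseconds(), 10)
}

// UnmarshalText parses Go duration syntax, for example "30s" or "1m30s".
func (d *durationMs) UnmarshalText(text []byte) error {
	v, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	*d = durationMs(v)
	return nil
}

// toMillis parses text and returns the millisecond rendering.
func toMillis(text string) (string, error) {
	var d durationMs
	if err := d.UnmarshalText([]byte(text)); err != nil {
		return "", err
	}
	return d.String(), nil
}

func main() {
	ms, err := toMillis("1m30s")
	fmt.Println(ms, err)
}
```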