Versions in this module

v1

v1.15.1 (Jun 9, 2025)

Changes in this version:

+ const DefaultMaxBulkSubAwaitDurationMs
+ const DefaultMaxBulkSubCount
+ var SHA256 scram.HashGeneratorFcn = sha256.New
+ var SHA512 scram.HashGeneratorFcn = sha512.New
+ func GetEventMetadata(message *sarama.ConsumerMessage, escapeHeaders bool) map[string]string
+ func GetSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error)
+ type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)
+ type EventHandler func(ctx context.Context, msg *NewEvent) error
+ type Kafka struct
    + DefaultConsumeRetryEnabled bool
    + func NewKafka(logger logger.Logger) *Kafka
    + func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, ...) (pubsub.BulkPublishResponse, error)
    + func (k *Kafka) Close() error
    + func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error)
    + func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)
    + func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error
    + func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error
    + func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error)
    + func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandlerConfig, topics ...string)
    + func (k *Kafka) ValidateAWS(metadata map[string]string) (*awsAuth.DeprecatedKafkaIAM, error)
+ type KafkaBulkMessage struct
    + Entries []KafkaBulkMessageEntry
    + Metadata map[string]string
    + Topic string
+ type KafkaBulkMessageEntry struct
    + ContentType string
    + EntryId string
    + Event []byte
    + Metadata map[string]string
+ type KafkaMetadata struct
    + AuthType string
    + Brokers string
    + ClientConnectionKeepAliveInterval time.Duration
    + ClientConnectionTopicMetadataRefreshInterval time.Duration
    + ClientID string
    + Compression string
    + ConsumeRetryEnabled bool
    + ConsumeRetryInterval time.Duration
    + ConsumerGroup string
    + EscapeHeaders bool
    + HeartbeatInterval time.Duration
    + InitialOffset string
    + MaxMessageBytes int
    + OidcClientID string
    + OidcClientSecret string
    + OidcExtensions string
    + OidcScopes string
    + OidcTokenEndpoint string
    + SaslMechanism string
    + SaslPassword string
    + SaslUsername string
    + SchemaCachingEnabled bool
    + SchemaLatestVersionCacheTTL time.Duration
    + SchemaRegistryAPIKey string
    + SchemaRegistryAPISecret string
    + SchemaRegistryURL string
    + SessionTimeout time.Duration
    + TLSCaCert string
    + TLSClientCert string
    + TLSClientKey string
    + TLSDisable bool
    + TLSSkipVerify bool
    + Version string
+ type NewEvent struct
    + ContentType *string
    + Data []byte
    + Metadata map[string]string
    + Topic string
+ type OAuthTokenSource struct
    + CachedToken oauth2.Token
    + ClientID string
    + ClientSecret string
    + Extensions map[string]string
    + Scopes []string
    + TokenEndpoint oauth2.Endpoint
    + func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error)
+ type SaramaLogBridge struct
    + func (b SaramaLogBridge) Print(v ...interface{})
    + func (b SaramaLogBridge) Printf(format string, v ...interface{})
    + func (b SaramaLogBridge) Println(v ...interface{})
+ type SchemaCacheEntry struct
+ type SchemaType int
    + const Avro
    + const None
    + func GetValueSchemaType(metadata map[string]string) (SchemaType, error)
+ type SubscriptionHandlerConfig struct
    + BulkHandler BulkEventHandler
    + Handler EventHandler
    + IsBulkSubscribe bool
    + SubscribeConfig pubsub.BulkSubscribeConfig
    + ValueSchemaType SchemaType
+ type TopicHandlerConfig map[string]SubscriptionHandlerConfig
    + func (tbh TopicHandlerConfig) TopicList() []string
+ type XDGSCRAMClient struct
    + func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
    + func (x *XDGSCRAMClient) Done() bool
    + func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)