 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
- Constants
- Variables
- func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string
- type BulkEventHandler
- type EventHandler
- type Kafka
- func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, ...) (pubsub.BulkPublishResponse, error)
- func (k *Kafka) Close() error
- func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error)
- func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)
- func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error
- func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error
- func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error)
- func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandlerConfig, topics ...string)
 
- type KafkaBulkMessage
- type KafkaBulkMessageEntry
- type KafkaMetadata
- type NewEvent
- type OAuthTokenSource
- type SaramaLogBridge
- type SchemaCacheEntry
- type SchemaType
- type SubscriptionHandlerConfig
- type TopicHandlerConfig
- type XDGSCRAMClient
Constants ¶
const ( // DefaultMaxBulkSubCount is the default max bulk count for kafka pubsub component // if the MaxBulkCountKey is not set in the metadata. DefaultMaxBulkSubCount = 80 // DefaultMaxBulkSubAwaitDurationMs is the default max bulk await duration for kafka pubsub component // if the MaxBulkAwaitDurationKey is not set in the metadata. DefaultMaxBulkSubAwaitDurationMs = 10000 )
Variables ¶
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
func GetEventMetadata ¶
func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string
Types ¶
type BulkEventHandler ¶
type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)
BulkEventHandler is the handler used to handle the subscribed bulk event.
type EventHandler ¶
EventHandler is the handler used to handle the subscribed event.
type Kafka ¶
type Kafka struct {
	// The default value should be true for kafka pubsub component and false for kafka binding component
	// This default value can be overridden by metadata consumeRetryEnabled
	DefaultConsumeRetryEnabled bool
	// contains filtered or unexported fields
}
    Kafka allows reading/writing to a Kafka consumer group.
func (*Kafka) BulkPublish ¶
func (*Kafka) DeserializeValue ¶
func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error)
func (*Kafka) GetTopicHandlerConfig ¶
func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error)
GetTopicBulkHandler returns the handlerConfig for a topic
func (*Kafka) Publish ¶
func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error
Publish message to Kafka cluster.
func (*Kafka) SerializeValue ¶
type KafkaBulkMessage ¶
type KafkaBulkMessage struct {
	Entries  []KafkaBulkMessageEntry `json:"entries"`
	Topic    string                  `json:"topic"`
	Metadata map[string]string       `json:"metadata"`
}
    KafkaBulkMessage is a bulk event arriving from a message bus instance.
type KafkaBulkMessageEntry ¶
type KafkaBulkMessageEntry struct {
	EntryId     string            `json:"entryId"` //nolint:stylecheck
	Event       []byte            `json:"event"`
	ContentType string            `json:"contentType,omitempty"`
	Metadata    map[string]string `json:"metadata"`
}
    KafkaBulkMessageEntry is an item contained inside bulk event arriving from a message bus instance.
type KafkaMetadata ¶
type KafkaMetadata struct {
	Brokers string `mapstructure:"brokers"`
	ConsumerGroup string `mapstructure:"consumerGroup"`
	ClientID      string `mapstructure:"clientId"`
	AuthType      string `mapstructure:"authType"`
	SaslUsername  string `mapstructure:"saslUsername"`
	SaslPassword  string `mapstructure:"saslPassword"`
	SaslMechanism string `mapstructure:"saslMechanism"`
	InitialOffset string `mapstructure:"initialOffset"`
	MaxMessageBytes   int    `mapstructure:"maxMessageBytes"`
	OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
	OidcClientID      string `mapstructure:"oidcClientID"`
	OidcClientSecret  string `mapstructure:"oidcClientSecret"`
	OidcScopes        string `mapstructure:"oidcScopes"`
	OidcExtensions    string `mapstructure:"oidcExtensions"`
	TLSDisable           bool          `mapstructure:"disableTls"`
	TLSSkipVerify        bool          `mapstructure:"skipVerify"`
	TLSCaCert            string        `mapstructure:"caCert"`
	TLSClientCert        string        `mapstructure:"clientCert"`
	TLSClientKey         string        `mapstructure:"clientKey"`
	ConsumeRetryEnabled  bool          `mapstructure:"consumeRetryEnabled"`
	ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
	Version              string        `mapstructure:"version"`
	// aws iam auth profile
	AWSAccessKey      string `mapstructure:"awsAccessKey"`
	AWSSecretKey      string `mapstructure:"awsSecretKey"`
	AWSSessionToken   string `mapstructure:"awsSessionToken"`
	AWSIamRoleArn     string `mapstructure:"awsIamRoleArn"`
	AWSStsSessionName string `mapstructure:"awsStsSessionName"`
	AWSRegion         string `mapstructure:"awsRegion"`
	// schema registry
	SchemaRegistryURL           string        `mapstructure:"schemaRegistryURL"`
	SchemaRegistryAPIKey        string        `mapstructure:"schemaRegistryAPIKey"`
	SchemaRegistryAPISecret     string        `mapstructure:"schemaRegistryAPISecret"`
	SchemaCachingEnabled        bool          `mapstructure:"schemaCachingEnabled"`
	SchemaLatestVersionCacheTTL time.Duration `mapstructure:"schemaLatestVersionCacheTTL"`
	// contains filtered or unexported fields
}
    type NewEvent ¶
type NewEvent struct {
	Data        []byte            `json:"data"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}
    NewEvent is an event arriving from a message bus instance.
type OAuthTokenSource ¶
type OAuthTokenSource struct {
	CachedToken   oauth2.Token
	Extensions    map[string]string
	TokenEndpoint oauth2.Endpoint
	ClientID      string
	ClientSecret  string
	Scopes        []string
	// contains filtered or unexported fields
}
    func (*OAuthTokenSource) Token ¶
func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error)
type SaramaLogBridge ¶
type SaramaLogBridge struct {
	// contains filtered or unexported fields
}
    func (SaramaLogBridge) Print ¶
func (b SaramaLogBridge) Print(v ...interface{})
func (SaramaLogBridge) Printf ¶
func (b SaramaLogBridge) Printf(format string, v ...interface{})
func (SaramaLogBridge) Println ¶
func (b SaramaLogBridge) Println(v ...interface{})
type SchemaCacheEntry ¶
type SchemaCacheEntry struct {
	// contains filtered or unexported fields
}
    type SchemaType ¶
type SchemaType int
const ( None SchemaType = iota Avro )
func GetValueSchemaType ¶
func GetValueSchemaType(metadata map[string]string) (SchemaType, error)
type SubscriptionHandlerConfig ¶
type SubscriptionHandlerConfig struct {
	IsBulkSubscribe bool
	SubscribeConfig pubsub.BulkSubscribeConfig
	BulkHandler     BulkEventHandler
	Handler         EventHandler
	ValueSchemaType SchemaType
}
    SubscriptionHandlerConfig is the handler and configuration for subscription.
type TopicHandlerConfig ¶
type TopicHandlerConfig map[string]SubscriptionHandlerConfig
TopicHandlerConfig is the map of topics and sruct containing handler and their config.
func (TopicHandlerConfig) TopicList ¶
func (tbh TopicHandlerConfig) TopicList() []string
// TopicList returns the list of topics
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
    func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool