Documentation
¶
Index ¶
- Variables
- func NewCloudWatchClient(region string) (*cloudwatch.Client, error)
- func NewCostExplorerClient(region string) (*costexplorer.Client, error)
- func NewEC2Client(region string) (*ec2.Client, error)
- func NewIAMClient() (*iam.Client, error)
- func NewMSKClient(region string) (*kafka.Client, error)
- func NewMSKConnectClient(region string) (*kafkaconnect.Client, error)
- func NewS3Client(region string) (*s3.Client, error)
- func NewSchemaRegistryClient(url string, opts ...SchemaRegistryOption) (schemaregistry.Client, error)
- type AdminConfig
- type AdminOption
- type ClusterKafkaMetadata
- type KafkaAdmin
- type KafkaAdminClient
- func (k *KafkaAdminClient) Close() error
- func (k *KafkaAdminClient) DescribeConfig() ([]sarama.ConfigEntry, error)
- func (k *KafkaAdminClient) GetAllMessagesWithKeyFilter(topicName string, keyPrefix string) (map[string]string, error)
- func (k *KafkaAdminClient) GetClusterKafkaMetadata() (*ClusterKafkaMetadata, error)
- func (k *KafkaAdminClient) GetConnectorStatusMessages(topicName string) (map[string]string, error)
- func (k *KafkaAdminClient) ListAcls() ([]sarama.ResourceAcls, error)
- func (k *KafkaAdminClient) ListTopicsWithConfigs() (map[string]sarama.TopicDetail, error)
- type MSKAccessTokenProvider
- type SchemaRegistryConfig
- type SchemaRegistryOption
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
func NewCloudWatchClient ¶
func NewCloudWatchClient(region string) (*cloudwatch.Client, error)
func NewCostExplorerClient ¶
func NewCostExplorerClient(region string) (*costexplorer.Client, error)
func NewIAMClient ¶ added in v0.2.1
func NewIAMClient() (*iam.Client, error)
func NewMSKConnectClient ¶ added in v0.4.0
func NewMSKConnectClient(region string) (*kafkaconnect.Client, error)
func NewSchemaRegistryClient ¶ added in v0.4.2
func NewSchemaRegistryClient(url string, opts ...SchemaRegistryOption) (schemaregistry.Client, error)
NewSchemaRegistryClient creates a new Schema Registry client for the given URL
Types ¶
type AdminConfig ¶
type AdminConfig struct {
// contains filtered or unexported fields
}
AdminConfig holds the configuration for creating a Kafka admin client
type AdminOption ¶
type AdminOption func(*AdminConfig)
AdminOption is a function type for configuring the Kafka admin client
func WithIAMAuth ¶
func WithIAMAuth() AdminOption
WithIAMAuth configures the admin client to use IAM authentication
func WithSASLSCRAMAuth ¶
func WithSASLSCRAMAuth(username, password string) AdminOption
WithSASLSCRAMAuth configures the admin client to use SASL/SCRAM authentication
func WithTLSAuth ¶
func WithTLSAuth(caCertFile string, clientCertFile string, clientKeyFile string) AdminOption
func WithUnauthenticatedPlaintextAuth ¶ added in v0.4.0
func WithUnauthenticatedPlaintextAuth() AdminOption
func WithUnauthenticatedTlsAuth ¶ added in v0.4.0
func WithUnauthenticatedTlsAuth() AdminOption
type ClusterKafkaMetadata ¶
ClusterKafkaMetadata represents cluster information including brokers, controller, and cluster ID
type KafkaAdmin ¶
type KafkaAdmin interface {
ListTopicsWithConfigs() (map[string]sarama.TopicDetail, error)
GetClusterKafkaMetadata() (*ClusterKafkaMetadata, error)
DescribeConfig() ([]sarama.ConfigEntry, error)
ListAcls() ([]sarama.ResourceAcls, error)
GetAllMessagesWithKeyFilter(topicName string, keyPrefix string) (map[string]string, error)
GetConnectorStatusMessages(topicName string) (map[string]string, error)
Close() error
}
KafkaAdmin interface defines the Kafka admin operations we need
func NewKafkaAdmin ¶
func NewKafkaAdmin(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker, region string, kafkaVersion string, opts ...AdminOption) (KafkaAdmin, error)
NewKafkaAdmin creates a new Kafka admin client for the given broker addresses and region
type KafkaAdminClient ¶
type KafkaAdminClient struct {
// contains filtered or unexported fields
}
KafkaAdminClient wraps sarama.ClusterAdmin to implement our KafkaAdmin interface
func (*KafkaAdminClient) Close ¶
func (k *KafkaAdminClient) Close() error
func (*KafkaAdminClient) DescribeConfig ¶
func (k *KafkaAdminClient) DescribeConfig() ([]sarama.ConfigEntry, error)
func (*KafkaAdminClient) GetAllMessagesWithKeyFilter ¶ added in v0.4.2
func (k *KafkaAdminClient) GetAllMessagesWithKeyFilter(topicName string, keyPrefix string) (map[string]string, error)
GetAllMessagesWithKeyFilter retrieves all messages, across all partitions of the given topic, whose keys start with the specified prefix. Returns a map of connector names to their configuration JSON strings.
func (*KafkaAdminClient) GetClusterKafkaMetadata ¶
func (k *KafkaAdminClient) GetClusterKafkaMetadata() (*ClusterKafkaMetadata, error)
func (*KafkaAdminClient) GetConnectorStatusMessages ¶ added in v0.4.2
func (k *KafkaAdminClient) GetConnectorStatusMessages(topicName string) (map[string]string, error)
GetConnectorStatusMessages retrieves status messages from the connect-status topic. It consumes the last 1000 messages from each partition and tracks the most recent status for each connector based on message timestamp.
func (*KafkaAdminClient) ListAcls ¶ added in v0.1.3
func (k *KafkaAdminClient) ListAcls() ([]sarama.ResourceAcls, error)
func (*KafkaAdminClient) ListTopicsWithConfigs ¶ added in v0.3.4
func (k *KafkaAdminClient) ListTopicsWithConfigs() (map[string]sarama.TopicDetail, error)
ListTopicsWithConfigs is a custom implementation of the ListTopics() function in Sarama that returns all topic configs instead of just overridden configs. This was done to reduce the number of requests to the broker. See https://github.com/IBM/sarama/blob/main/admin.go#L349
type MSKAccessTokenProvider ¶
type MSKAccessTokenProvider struct {
// contains filtered or unexported fields
}
MSKAccessTokenProvider implements sarama.AccessTokenProvider for MSK IAM authentication
func (*MSKAccessTokenProvider) Token ¶
func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error)
type SchemaRegistryConfig ¶ added in v0.4.2
type SchemaRegistryConfig struct {
// contains filtered or unexported fields
}
SchemaRegistryConfig holds the configuration for creating a Schema Registry client
type SchemaRegistryOption ¶ added in v0.4.2
type SchemaRegistryOption func(*SchemaRegistryConfig)
SchemaRegistryOption is a function type for configuring the Schema Registry client
func WithBasicAuth ¶ added in v0.4.2
func WithBasicAuth(username, password string) SchemaRegistryOption
WithBasicAuth configures the Schema Registry client to use basic authentication
func WithUnauthenticated ¶ added in v0.4.2
func WithUnauthenticated() SchemaRegistryOption
WithUnauthenticated configures the Schema Registry client to use no authentication
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool