Documentation
¶
Overview ¶
Package kafka provides Kafka connection and client helpers.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Connect ¶
// Connect is the set of operations this package exposes for a Kafka Connect
// cluster. Every method takes a context.Context as its first argument and
// reports failure through its final error result.
type Connect interface {
	// GetInfo gets information about the Kafka Connect cluster, returned as a
	// map with string keys and untyped values.
	GetInfo(ctx context.Context) (map[string]interface{}, error)
	// ListConnectors lists all connectors as a slice of strings
	// (presumably the connector names — confirm against the implementation).
	ListConnectors(ctx context.Context) ([]string, error)
	// GetConnector gets information about the connector identified by name.
	GetConnector(ctx context.Context, name string) (*ConnectorInfo, error)
	// CreateConnector creates a new connector. It takes the connector name and
	// a configuration of string key/value pairs and returns a *ConnectorInfo.
	CreateConnector(ctx context.Context, name string, config map[string]string) (*ConnectorInfo, error)
	// UpdateConnector updates a connector. It takes the same arguments and
	// returns the same result type as CreateConnector.
	UpdateConnector(ctx context.Context, name string, config map[string]string) (*ConnectorInfo, error)
	// DeleteConnector deletes the connector identified by name.
	DeleteConnector(ctx context.Context, name string) error
	// PauseConnector pauses the connector identified by name.
	PauseConnector(ctx context.Context, name string) error
	// ResumeConnector resumes the connector identified by name.
	ResumeConnector(ctx context.Context, name string) error
	// RestartConnector restarts the connector identified by name.
	RestartConnector(ctx context.Context, name string) error
	// GetConnectorConfig gets the configuration of a connector as string
	// key/value pairs.
	GetConnectorConfig(ctx context.Context, name string) (map[string]string, error)
	// GetConnectorStatus gets the status of a connector. The result is a
	// *ConnectorInfo, the same type GetConnector returns.
	GetConnectorStatus(ctx context.Context, name string) (*ConnectorInfo, error)
	// GetConnectorTasks gets the tasks of a connector.
	GetConnectorTasks(ctx context.Context, name string) ([]TaskInfo, error)
	// ListPlugins lists all connector plugins.
	ListPlugins(ctx context.Context) ([]PluginInfo, error)
	// ValidateConfig validates a connector configuration and returns the
	// outcome as a map with string keys and untyped values.
	// NOTE(review): pluginClass presumably names the plugin the configuration
	// is validated against — confirm against the implementation.
	ValidateConfig(ctx context.Context, pluginClass string, config map[string]string) (map[string]interface{}, error)
}
Connect is the interface for Kafka Connect operations
func NewConnect ¶
func NewConnect(kc *KafkaContext) (Connect, error)
NewConnect creates a new Kafka Connect client
type ConnectorInfo ¶
// ConnectorInfo contains information about a connector. It is the result type
// of the Connect methods GetConnector, CreateConnector, UpdateConnector and
// GetConnectorStatus.
type ConnectorInfo struct {
	// Name is the connector name.
	Name string
	// Config is the connector configuration as string key/value pairs.
	Config map[string]string
	// Tasks is the list of tasks.
	Tasks []TaskInfo
	// Type is the connector type (source or sink).
	Type string
	// State is the connector state, expressed as a ConnectorState value.
	State ConnectorState
}
ConnectorInfo contains information about a connector
type ConnectorState ¶
// ConnectorState represents the state of a connector.
type ConnectorState string

// The connector states declared by this package. Each constant's value is the
// upper-case state name as a string.
const (
	// ConnectorStateRunning means the connector is running.
	ConnectorStateRunning ConnectorState = "RUNNING"
	// ConnectorStatePaused means the connector is paused.
	ConnectorStatePaused ConnectorState = "PAUSED"
	// ConnectorStateFailed means the connector has failed.
	ConnectorStateFailed ConnectorState = "FAILED"
	// ConnectorStateStopped means the connector is stopped.
	ConnectorStateStopped ConnectorState = "STOPPED"
)
type KafkaContext ¶
// KafkaContext is the configuration value accepted by NewSession, NewConnect
// and Session.SetKafkaContext.
//
// NOTE(review): the type ships without field documentation. The notes below
// are inferred from field names only and must be confirmed against the code
// that reads these fields.
type KafkaContext struct {
	// BootstrapServers presumably holds the Kafka broker address(es); the
	// expected format is not shown here — TODO confirm.
	BootstrapServers string
	// AuthType, AuthMechanism, AuthUser and AuthPass look like the broker
	// authentication settings — TODO confirm the accepted values.
	AuthType string
	AuthMechanism string
	AuthUser string
	AuthPass string
	// UseTLS presumably toggles TLS; ClientKeyFile, ClientCertFile and CaFile
	// look like file paths to key and certificate material — TODO confirm.
	UseTLS bool
	ClientKeyFile string
	ClientCertFile string
	CaFile string
	// SchemaRegistryURL and ConnectURL presumably locate the Schema Registry
	// and Kafka Connect services — TODO confirm.
	SchemaRegistryURL string
	ConnectURL string
	// The SchemaRegistry* fields look like credentials for SchemaRegistryURL
	// (user/password or a bearer token) — TODO confirm which takes precedence.
	SchemaRegistryAuthUser string
	SchemaRegistryAuthPass string
	SchemaRegistryBearerToken string
	// ConnectAuthUser and ConnectAuthPass look like credentials for
	// ConnectURL — TODO confirm.
	ConnectAuthUser string
	ConnectAuthPass string
}
type PluginInfo ¶
// PluginInfo contains information about a connector plugin. Connect.ListPlugins
// returns a slice of these values.
type PluginInfo struct {
	// Class is the plugin class.
	Class string
	// Type is the plugin type (source or sink).
	Type string
	// Version is the plugin version.
	Version string
}
PluginInfo contains information about a connector plugin
type SASLConfig ¶
SASLConfig holds SASL authentication configuration.
type Session ¶ added in v0.1.11
// Session represents a Kafka session. It holds a KafkaContext together with
// the client handles listed below.
//
// NOTE(review): the relationship between these exported fields and the
// GetClient, GetAdminClient, GetSchemaRegistryClient and GetConnectClient
// accessors is not visible here — confirm whether callers should use the
// fields or the accessors.
type Session struct {
	// Ctx is the KafkaContext held by the session (presumably the value given
	// to NewSession or SetKafkaContext — confirm).
	Ctx KafkaContext
	// Client is the session's *kgo.Client.
	Client *kgo.Client
	// AdminClient is the session's *kadm.Client.
	AdminClient *kadm.Client
	// SchemaRegistryClient is the session's *sr.Client.
	SchemaRegistryClient *sr.Client
	// ConnectClient is the session's Connect implementation.
	ConnectClient Connect
	// Options is a list of kgo client options (presumably those used when
	// building Client — confirm).
	Options []kgo.Opt
	// contains filtered or unexported fields
}
Session represents a Kafka session
func NewSession ¶ added in v0.1.11
func NewSession(ctx KafkaContext) (*Session, error)
NewSession creates a new Kafka session with the given context. This function dynamically constructs clients without relying on global state.
func (*Session) GetAdminClient ¶ added in v0.1.11
GetAdminClient returns the Kafka admin client.
func (*Session) GetClient ¶ added in v0.1.11
GetClient returns a Kafka client with optional overrides.
func (*Session) GetConnectClient ¶ added in v0.1.11
GetConnectClient returns the Kafka Connect client.
func (*Session) GetSchemaRegistryClient ¶ added in v0.1.11
GetSchemaRegistryClient returns the schema registry client.
func (*Session) SetKafkaContext ¶ added in v0.1.11
func (s *Session) SetKafkaContext(ctx KafkaContext) error
SetKafkaContext initializes Kafka clients using the provided context.