kafka

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var KafkaAdminClient *kadm.Client
View Source
var KafkaClient *kgo.Client
View Source
var KafkaSchemaRegistryClient *sr.Client

Functions

func GetKafkaAdminClient

func GetKafkaAdminClient() (*kadm.Client, error)

func GetKafkaClient

func GetKafkaClient(opts ...kgo.Opt) (*kgo.Client, error)

func GetKafkaSchemaRegistryClient

func GetKafkaSchemaRegistryClient() (*sr.Client, error)

func NewCurrentKafkaContext

func NewCurrentKafkaContext(kc KafkaContext) error

func ResetCurrentKafkaContext

func ResetCurrentKafkaContext()

Types

type Connect

type Connect interface {
	// GetInfo gets information about the Kafka Connect cluster
	GetInfo(ctx context.Context) (map[string]interface{}, error)
	// ListConnectors lists all connectors
	ListConnectors(ctx context.Context) ([]string, error)
	// GetConnector gets information about a connector
	GetConnector(ctx context.Context, name string) (*ConnectorInfo, error)
	// CreateConnector creates a new connector
	CreateConnector(ctx context.Context, name string, config map[string]string) (*ConnectorInfo, error)
	// UpdateConnector updates a connector
	UpdateConnector(ctx context.Context, name string, config map[string]string) (*ConnectorInfo, error)
	// DeleteConnector deletes a connector
	DeleteConnector(ctx context.Context, name string) error
	// PauseConnector pauses a connector
	PauseConnector(ctx context.Context, name string) error
	// ResumeConnector resumes a connector
	ResumeConnector(ctx context.Context, name string) error
	// RestartConnector restarts a connector
	RestartConnector(ctx context.Context, name string) error
	// GetConnectorConfig gets the configuration of a connector
	GetConnectorConfig(ctx context.Context, name string) (map[string]string, error)
	// GetConnectorStatus gets the status of a connector
	GetConnectorStatus(ctx context.Context, name string) (*ConnectorInfo, error)
	// GetConnectorTasks gets the tasks of a connector
	GetConnectorTasks(ctx context.Context, name string) ([]TaskInfo, error)
	// ListPlugins lists all connector plugins
	ListPlugins(ctx context.Context) ([]PluginInfo, error)
	// ValidateConfig validates a connector configuration
	ValidateConfig(ctx context.Context, pluginClass string, config map[string]string) (map[string]interface{}, error)
}

Connect is the interface for Kafka Connect operations

var KafkaConnectClient Connect

func GetKafkaConnectClient

func GetKafkaConnectClient() (Connect, error)

func NewConnect

func NewConnect(kc *KafkaContext) (Connect, error)

NewConnect creates a new Kafka Connect client

type ConnectorInfo

type ConnectorInfo struct {
	// Name is the connector name
	Name string
	// Config is the connector configuration
	Config map[string]string
	// Tasks is the list of tasks
	Tasks []TaskInfo
	// Type is the connector type (source or sink)
	Type string
	// State is the connector state
	State ConnectorState
}

ConnectorInfo contains information about a connector

type ConnectorState

type ConnectorState string

ConnectorState represents the state of a connector

const (
	// ConnectorStateRunning means the connector is running
	ConnectorStateRunning ConnectorState = "RUNNING"
	// ConnectorStatePaused means the connector is paused
	ConnectorStatePaused ConnectorState = "PAUSED"
	// ConnectorStateFailed means the connector has failed
	ConnectorStateFailed ConnectorState = "FAILED"
	// ConnectorStateStopped means the connector is stopped
	ConnectorStateStopped ConnectorState = "STOPPED"
)

type KafkaContext

type KafkaContext struct {
	BootstrapServers  string
	AuthType          string
	AuthMechanism     string
	AuthUser          string
	AuthPass          string
	UseTLS            bool
	ClientKeyFile     string
	ClientCertFile    string
	CaFile            string
	SchemaRegistryURL string
	ConnectURL        string

	SchemaRegistryAuthUser    string
	SchemaRegistryAuthPass    string
	SchemaRegistryBearerToken string

	ConnectAuthUser string
	ConnectAuthPass string
}
var CurrentKafkaContext KafkaContext

func (*KafkaContext) SetKafkaContext

func (kc *KafkaContext) SetKafkaContext() error

type PluginInfo

type PluginInfo struct {
	// Class is the plugin class
	Class string
	// Type is the plugin type (source or sink)
	Type string
	// Version is the plugin version
	Version string
}

PluginInfo contains information about a connector plugin

type SASLConfig

type SASLConfig struct {
	Mechanism string
	Username  string
	Password  string
}

type TLSConfig

type TLSConfig struct {
	Enabled        bool
	ClientKeyFile  string
	ClientCertFile string
	CaFile         string
}

type TaskInfo

type TaskInfo struct {
	// ID is the task ID
	ID int
	// State is the task state
	State ConnectorState
	// WorkerID is the worker ID
	WorkerID string
	// Trace is the error trace (if any)
	Trace string
}

TaskInfo contains information about a connector task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL