controller

package
v0.0.0-...-d71a2e0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2026 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ServerPropertiesFilename   = "server.properties"
	SecurityPropertiesFilename = "security.properties"
	Log4jPropertiesFilename    = "log4j.properties"
	VectorConfigFilename       = "vector.yaml"

	KafkaLog4jFilename = "kafka.log4j.xml"

	ConsoleConversionPattern = "[%d] %p %m (%c)%n"
)
View Source
const (
	ZookeeperDiscoveryKey = "ZOOKEEPER"
	NodePortFileName      = "kafka_nodeport"
)

mount

View Source
const (
	EnvJvmArgs              = "EXTRA_ARGS"
	EnvZookeeperConnections = "ZOOKEEPER"
	EnvKafkaLog4jOpts       = "KAFKA_LOG4J_OPTS"
	EnvKafkaHeapOpts        = "KAFKA_HEAP_OPTS"
	EnvNode                 = "NODE"
	EnvNodePort             = "NODE_PORT"
	EnvPodName              = "POD_NAME"
)
View Source
const (
	KafkaDiscoveryKey = "KAFKA"

	LabelListenerBootstrap = "app.kubernetes.io/listener-bootstrap"
	LabelValueTrue         = "true"
)
View Source
const (
	LISTENER_LOCAL_ADDRESS = "0.0.0.0"
	LISTENER_NODE_ADDRESS  = "$NODE"
)
View Source
const ContainerVector = "vector"
View Source
const (
	RoleName = "broker"
)

Variables

This section is empty.

Functions

func BootstrapListenerName

func BootstrapListenerName(roleGroupInfo *reconciler.RoleGroupInfo) string

func CreateServiceMetricsName

func CreateServiceMetricsName(roleGroupInfo *reconciler.RoleGroupInfo) string

func DefaultLogging

func DefaultLogging()

func ExtendConfigMapByVector

func ExtendConfigMapByVector(ctx context.Context, params VectorConfigParams, data map[string]string)

func GetVectorFactory

func GetVectorFactory(
	image *util.Image,
) *builder.Vector

GetVectorFactory returns a new vector factory can provide vector container, volumes

func IsVectorEnable

func IsVectorEnable(roleLoggingConfig *commonsv1alpha1.LoggingSpec) bool

func KafkaContainerPorts

func KafkaContainerPorts(kafkaTlsSecurity *security.KafkaSecurity) []corev1.ContainerPort

func MergeFromUserConfig

func MergeFromUserConfig(
	userConfig *kafkav1alpha1.BrokersConfigSpec,
	userOverrides *commonsv1alpha1.OverridesSpec,
	clusterName string,
) error

func NewKafkaConfigmapBuilder

func NewKafkaConfigmapBuilder(
	client *client.Client,
	roleGroupInfo *reconciler.RoleGroupInfo,
	clusterConfig *kafkav1alpha1.ClusterConfigSpec,
	kafkaTlsSecurity *security.KafkaSecurity,
	overrides *commonsv1alpha1.OverridesSpec,
	roleGroupConfig *commonsv1alpha1.RoleGroupConfigSpec,
) builder.ConfigBuilder

func NewKafkaDiscoveryBuilder

func NewKafkaDiscoveryBuilder(
	client *client.Client,
	kafkaSecurity *security.KafkaSecurity,
	isNodePort bool,
) builder.ConfigBuilder

func NewKafkaDiscoveryNodePortReconciler

func NewKafkaDiscoveryNodePortReconciler(
	ctx context.Context,
	client *client.Client,
	kafkaTlsSecurity *security.KafkaSecurity,
) reconciler.ResourceReconciler[builder.ConfigBuilder]

func NewKafkaDiscoveryReconciler

func NewKafkaDiscoveryReconciler(
	ctx context.Context,
	client *client.Client,
	kafkaTlsSecurity *security.KafkaSecurity,
) reconciler.ResourceReconciler[builder.ConfigBuilder]

func NewRoleGroupBootstrapListenerReconciler

func NewRoleGroupBootstrapListenerReconciler(
	client *client.Client,
	bootstrapListenerClass string,
	info *reconciler.RoleGroupInfo,
	kafkaTlsSecurity *security.KafkaSecurity,
) reconciler.ResourceReconciler[pkg.ListenerBuidler]

func NewRoleGroupMetricsService

func NewRoleGroupMetricsService(
	client *client.Client,
	roleGroupInfo *reconciler.RoleGroupInfo,
) reconciler.Reconciler

NewRoleGroupMetricsService creates a metrics service reconciler using a simple function approach This creates a headless service for metrics with Prometheus labels and annotations

func NewServiceBuilder

func NewServiceBuilder(
	client *client.Client,
	name string,
	ports []corev1.ContainerPort,
	options ...builder.ServiceBuilderOption,
) builder.ServiceBuilder

func NewStatefulSetBuilder

func NewStatefulSetBuilder(
	ctx context.Context,
	client *client.Client,
	image *opgoutil.Image,
	replicas *int32,
	clusterConfig *kafkav1alpha1.ClusterConfigSpec,
	roleGroupInf *reconciler.RoleGroupInfo,
	brokerConfig *kafkav1alpha1.BrokersConfigSpec,
	overrdes *commonsv1alpha1.OverridesSpec,
	kafkaTlsSecurity *security.KafkaSecurity,
) builder.StatefulSetBuilder

func NewStatefulSetReconciler

func NewStatefulSetReconciler(
	ctx context.Context,
	client *client.Client,
	image *opgoutil.Image,
	replicas *int32,
	clusterConfig *kafkav1alpha1.ClusterConfigSpec,
	clusterOperation *commonsv1alpha1.ClusterOperationSpec,
	roleGroupInf *reconciler.RoleGroupInfo,
	brokerConfig *kafkav1alpha1.BrokersConfigSpec,
	overrides *commonsv1alpha1.OverridesSpec,
	kafkaTlsSecurity *security.KafkaSecurity,
) reconciler.ResourceReconciler[builder.StatefulSetBuilder]

func RoleGroupConfigMapName

func RoleGroupConfigMapName(roleGroupInfo *reconciler.RoleGroupInfo) string

func ServiceAccountName

func ServiceAccountName(crName string) string

Types

type BrokerReconciler

type BrokerReconciler struct {
	reconciler.BaseRoleReconciler[*kafkav1alpha1.BrokersSpec]
	// contains filtered or unexported fields
}

func NewBrokerReconciler

func NewBrokerReconciler(
	client *client.Client,
	roleInfo reconciler.RoleInfo,
	spec *kafkav1alpha1.BrokersSpec,
	image *opgoutil.Image,
	clusterConfig *kafkav1alpha1.ClusterConfigSpec,
	clusterOperation *commonsv1alpha1.ClusterOperationSpec,
	kafkaTlsSecurity *security.KafkaSecurity,
) *BrokerReconciler

func (*BrokerReconciler) RegisterResourceWithRoleGroup

func (r *BrokerReconciler) RegisterResourceWithRoleGroup(
	ctx context.Context,
	replicas int32,
	roleGroupInfo *reconciler.RoleGroupInfo,
	overrides *commonsv1alpha1.OverridesSpec,
	brokerConfig *kafkav1alpha1.BrokersConfigSpec,
) ([]reconciler.Reconciler, error)

func (*BrokerReconciler) RegisterResources

func (r *BrokerReconciler) RegisterResources(ctx context.Context) error

type ContainerComponent

type ContainerComponent string

ContainerComponent use for define container name

const (
	Kafka  ContainerComponent = "kafka"
	Vector ContainerComponent = "vector"
)

type DiscoveryBuilder

type DiscoveryBuilder struct {
	builder.ConfigMapBuilder
	// contains filtered or unexported fields
}

func (*DiscoveryBuilder) Build

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (*Error) Error

func (e *Error) Error() string

type HostPort

type HostPort struct {
	Host string
	Port int32
}

type KafkaClusterReconciler

type KafkaClusterReconciler struct {
	ctrlclient.Client
	Scheme *runtime.Scheme
	Log    logr.Logger
}

KafkaClusterReconciler reconciles a KafkaCluster object

func (*KafkaClusterReconciler) Reconcile

func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

Reconcile is part of the main kubernetes reconciliation loop which aims to move the current state of the cluster closer to the desired state. TODO(user): Modify the Reconcile function to compare the state specified by the KafkaCluster object against the actual cluster state, and then perform operations to make the cluster state reflect the state specified by the user.

For more details, check Reconcile and its Result here: - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile

func (*KafkaClusterReconciler) SetupWithManager

func (r *KafkaClusterReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller with the Manager.

type KafkaConfig

type KafkaConfig struct {
	commonsv1alpha1.RoleGroupConfigSpec

	BootstrapListenerClass string

	BrokerListenerClass string

	RequestedSecretLifetime string
}

func DefaultKafkaConfig

func DefaultKafkaConfig(clusterName string) KafkaConfig

func (*KafkaConfig) ComputeCli

func (k *KafkaConfig) ComputeCli() ([]string, error)

ComputeCli implements OverrideConfiguration.

func (*KafkaConfig) ComputeEnv

func (k *KafkaConfig) ComputeEnv() (map[string]string, error)

ComputeEnv implements OverrideConfiguration.

func (*KafkaConfig) ComputeFile

func (k *KafkaConfig) ComputeFile() (map[string]map[string]string, error)

ComputeFile implements OverrideConfiguration.

type KafkaConfigmapBuilder

type KafkaConfigmapBuilder struct {
	builder.ConfigMapBuilder

	ClusterConfig *kafkav1alpha1.ClusterConfigSpec

	ClusterName   string
	RoleName      string
	RoleGroupName string
	// contains filtered or unexported fields
}

func (*KafkaConfigmapBuilder) Build

type KafkaContainerBuilder

type KafkaContainerBuilder struct {
	*security.KafkaSecurity
	// contains filtered or unexported fields
}

func NewKafkaContainer

func NewKafkaContainer(
	image string,
	imagePullPolicy corev1.PullPolicy,
	zookeeperDiscoveryZNode string,
	tlsSecurity *security.KafkaSecurity,
	namespace string,
	groupSvcName string,
) *KafkaContainerBuilder

func (*KafkaContainerBuilder) Command

func (d *KafkaContainerBuilder) Command() []string

func (*KafkaContainerBuilder) CommandArgs

func (d *KafkaContainerBuilder) CommandArgs() []string

CommandArgs command args ex: export NODE_PORT=$(cat /kubedoop/tmp/kafka_nodepor

func (*KafkaContainerBuilder) ContainerEnv

func (d *KafkaContainerBuilder) ContainerEnv() []corev1.EnvVar

func (*KafkaContainerBuilder) ContainerName

func (d *KafkaContainerBuilder) ContainerName() string

func (*KafkaContainerBuilder) ContainerPorts

func (d *KafkaContainerBuilder) ContainerPorts() []corev1.ContainerPort

ContainerPorts make container ports of data node

func (*KafkaContainerBuilder) LaunchCommand

func (d *KafkaContainerBuilder) LaunchCommand(listeners, advertisedListers, lisenerSecurityProtocolMap string) string

kafka launch command

func (*KafkaContainerBuilder) LivenessProbe

func (d *KafkaContainerBuilder) LivenessProbe() *corev1.Probe

func (*KafkaContainerBuilder) ReadinessProbe

func (d *KafkaContainerBuilder) ReadinessProbe() *corev1.Probe

func (*KafkaContainerBuilder) ResourceRequirements

func (d *KafkaContainerBuilder) ResourceRequirements() corev1.ResourceRequirements

func (*KafkaContainerBuilder) VolumeMount

func (d *KafkaContainerBuilder) VolumeMount() []corev1.VolumeMount

type KafkaListener

type KafkaListener struct {
	Name KafkaListenerName
	Host string
	Port string
}

func (KafkaListener) String

func (kl KafkaListener) String() string

type KafkaListenerConfig

type KafkaListenerConfig struct {
	Listeners                   []KafkaListener
	AdvertisedListeners         []KafkaListener
	ListenerSecurityProtocolMap map[KafkaListenerName]KafkaListenerProtocol
}

func GetKafkaListenerConfig

func GetKafkaListenerConfig(
	namespace string,
	kafkaSecurity *security.KafkaSecurity,
	objectName string,
) (*KafkaListenerConfig, error)

func (*KafkaListenerConfig) AdvertisedListenersString

func (config *KafkaListenerConfig) AdvertisedListenersString() string

func (*KafkaListenerConfig) ListenerSecurityProtocolMapString

func (config *KafkaListenerConfig) ListenerSecurityProtocolMapString() string

func (*KafkaListenerConfig) ListenersString

func (config *KafkaListenerConfig) ListenersString() string

type KafkaListenerError

type KafkaListenerError struct {
	Message string
}

func (*KafkaListenerError) Error

func (e *KafkaListenerError) Error() string

type KafkaListenerName

type KafkaListenerName string
const (
	Client     KafkaListenerName = "CLIENT"
	ClientAuth KafkaListenerName = "CLIENT_AUTH"
	Internal   KafkaListenerName = "INTERNAL"
	Bootstrap  KafkaListenerName = "BOOTSTRAP"
)

type KafkaListenerProtocol

type KafkaListenerProtocol string
const (
	Plaintext KafkaListenerProtocol = "PLAINTEXT"
	Ssl       KafkaListenerProtocol = "SSL"
)

type OverrideConfiguration

type OverrideConfiguration interface {
	ComputeEnv() (map[string]string, error)
	ComputeFile() (map[string]map[string]string, error)
	ComputeCli() ([]string, error)
}

type Reconciler

func NewClusterReconciler

func NewClusterReconciler(
	client *resourceClient.Client,
	clusterInfo reconciler.ClusterInfo,
	spec *kafkav1alpha1.KafkaClusterSpec,
) *Reconciler

func (*Reconciler) GetImage

func (r *Reconciler) GetImage() *util.Image

func (*Reconciler) RegisterResources

func (r *Reconciler) RegisterResources(ctx context.Context) error

type ServiceBuilder

type ServiceBuilder struct {
	*builder.BaseServiceBuilder
}

func (*ServiceBuilder) GetObject

func (b *ServiceBuilder) GetObject() *corev1.Service

type StatefulSetBuilder

type StatefulSetBuilder struct {
	builder.StatefulSet
	ClusterConfig *kafkav1alpha1.ClusterConfigSpec
	// contains filtered or unexported fields
}

func (*StatefulSetBuilder) Build

func (*StatefulSetBuilder) GetObject

func (b *StatefulSetBuilder) GetObject() (*appv1.StatefulSet, error)

func (*StatefulSetBuilder) Volumes

func (b *StatefulSetBuilder) Volumes() ([]corev1.Volume, error)

Volumes

type VectorConfigParams

type VectorConfigParams struct {
	Client        client.Client
	ClusterConfig *kafkav1alpha1.ClusterConfigSpec
	Namespace     string
	InstanceName  string
	Role          string
	GroupName     string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL