Documentation
¶
Index ¶
- Constants
- func BootstrapListenerName(roleGroupInfo *reconciler.RoleGroupInfo) string
- func CreateServiceMetricsName(roleGroupInfo *reconciler.RoleGroupInfo) string
- func DefaultLogging()
- func ExtendConfigMapByVector(ctx context.Context, params VectorConfigParams, data map[string]string)
- func GetVectorFactory(image *util.Image) *builder.Vector
- func IsVectorEnable(roleLoggingConfig *commonsv1alpha1.LoggingSpec) bool
- func KafkaContainerPorts(kafkaTlsSecurity *security.KafkaSecurity) []corev1.ContainerPort
- func MergeFromUserConfig(userConfig *kafkav1alpha1.BrokersConfigSpec, ...) error
- func NewKafkaConfigmapBuilder(client *client.Client, roleGroupInfo *reconciler.RoleGroupInfo, ...) builder.ConfigBuilder
- func NewKafkaConfigmapReconciler(ctx context.Context, client *client.Client, ...) reconciler.ResourceReconciler[builder.ConfigBuilder]
- func NewKafkaDiscoveryBuilder(client *client.Client, kafkaSecurity *security.KafkaSecurity, isNodePort bool) builder.ConfigBuilder
- func NewKafkaDiscoveryNodePortReconciler(ctx context.Context, client *client.Client, ...) reconciler.ResourceReconciler[builder.ConfigBuilder]
- func NewKafkaDiscoveryReconciler(ctx context.Context, client *client.Client, ...) reconciler.ResourceReconciler[builder.ConfigBuilder]
- func NewRoleGroupBootstrapListenerReconciler(client *client.Client, bootstrapListenerClass string, ...) reconciler.ResourceReconciler[pkg.ListenerBuidler]
- func NewRoleGroupMetricsService(client *client.Client, roleGroupInfo *reconciler.RoleGroupInfo) reconciler.Reconciler
- func NewRoleGroupService(client *client.Client, info *reconciler.RoleGroupInfo) reconciler.ResourceReconciler[builder.ServiceBuilder]
- func NewServiceAccountReconciler(client *client.Client, saName string, opts ...builder.Option) reconciler.ResourceReconciler[*builder.GenericServiceAccountBuilder]
- func NewServiceBuilder(client *client.Client, name string, ports []corev1.ContainerPort, ...) builder.ServiceBuilder
- func NewStatefulSetBuilder(ctx context.Context, client *client.Client, image *opgoutil.Image, ...) builder.StatefulSetBuilder
- func NewStatefulSetReconciler(ctx context.Context, client *client.Client, image *opgoutil.Image, ...) reconciler.ResourceReconciler[builder.StatefulSetBuilder]
- func RoleGroupConfigMapName(roleGroupInfo *reconciler.RoleGroupInfo) string
- func ServiceAccountName(crName string) string
- type BrokerReconciler
- type ContainerComponent
- type DiscoveryBuilder
- type Error
- type HostPort
- type KafkaClusterReconciler
- type KafkaConfig
- type KafkaConfigmapBuilder
- type KafkaContainerBuilder
- func (d *KafkaContainerBuilder) Command() []string
- func (d *KafkaContainerBuilder) CommandArgs() []string
- func (d *KafkaContainerBuilder) ContainerEnv() []corev1.EnvVar
- func (d *KafkaContainerBuilder) ContainerName() string
- func (d *KafkaContainerBuilder) ContainerPorts() []corev1.ContainerPort
- func (d *KafkaContainerBuilder) LaunchCommand(listeners, advertisedListers, lisenerSecurityProtocolMap string) string
- func (d *KafkaContainerBuilder) LivenessProbe() *corev1.Probe
- func (d *KafkaContainerBuilder) ReadinessProbe() *corev1.Probe
- func (d *KafkaContainerBuilder) ResourceRequirements() corev1.ResourceRequirements
- func (d *KafkaContainerBuilder) VolumeMount() []corev1.VolumeMount
- type KafkaListener
- type KafkaListenerConfig
- type KafkaListenerError
- type KafkaListenerName
- type KafkaListenerProtocol
- type OverrideConfiguration
- type Reconciler
- type ServiceBuilder
- type StatefulSetBuilder
- type VectorConfigParams
Constants ¶
const ( ServerPropertiesFilename = "server.properties" SecurityPropertiesFilename = "security.properties" Log4jPropertiesFilename = "log4j.properties" VectorConfigFilename = "vector.yaml" KafkaLog4jFilename = "kafka.log4j.xml" ConsoleConversionPattern = "[%d] %p %m (%c)%n" )
const ( ZookeeperDiscoveryKey = "ZOOKEEPER" NodePortFileName = "kafka_nodeport" )
mount
const ( EnvJvmArgs = "EXTRA_ARGS" EnvZookeeperConnections = "ZOOKEEPER" EnvKafkaLog4jOpts = "KAFKA_LOG4J_OPTS" EnvKafkaHeapOpts = "KAFKA_HEAP_OPTS" EnvNode = "NODE" EnvNodePort = "NODE_PORT" EnvPodName = "POD_NAME" )
const ( KafkaDiscoveryKey = "KAFKA" LabelListenerBootstrap = "app.kubernetes.io/listener-bootstrap" LabelValueTrue = "true" )
const ( LISTENER_LOCAL_ADDRESS = "0.0.0.0" LISTENER_NODE_ADDRESS = "$NODE" )
const ContainerVector = "vector"
const (
RoleName = "broker"
)
Variables ¶
This section is empty.
Functions ¶
func BootstrapListenerName ¶
func BootstrapListenerName(roleGroupInfo *reconciler.RoleGroupInfo) string
func CreateServiceMetricsName ¶
func CreateServiceMetricsName(roleGroupInfo *reconciler.RoleGroupInfo) string
func DefaultLogging ¶
func DefaultLogging()
func ExtendConfigMapByVector ¶
func ExtendConfigMapByVector(ctx context.Context, params VectorConfigParams, data map[string]string)
func GetVectorFactory ¶
GetVectorFactory returns a new vector factory can provide vector container, volumes
func IsVectorEnable ¶
func IsVectorEnable(roleLoggingConfig *commonsv1alpha1.LoggingSpec) bool
func KafkaContainerPorts ¶
func KafkaContainerPorts(kafkaTlsSecurity *security.KafkaSecurity) []corev1.ContainerPort
func MergeFromUserConfig ¶
func MergeFromUserConfig( userConfig *kafkav1alpha1.BrokersConfigSpec, userOverrides *commonsv1alpha1.OverridesSpec, clusterName string, ) error
func NewKafkaConfigmapBuilder ¶
func NewKafkaConfigmapBuilder( client *client.Client, roleGroupInfo *reconciler.RoleGroupInfo, clusterConfig *kafkav1alpha1.ClusterConfigSpec, kafkaTlsSecurity *security.KafkaSecurity, overrides *commonsv1alpha1.OverridesSpec, roleGroupConfig *commonsv1alpha1.RoleGroupConfigSpec, ) builder.ConfigBuilder
func NewKafkaConfigmapReconciler ¶
func NewKafkaConfigmapReconciler( ctx context.Context, client *client.Client, clusterConfig *kafkav1alpha1.ClusterConfigSpec, kafkaTlsSecurity *security.KafkaSecurity, roleGroupInf *reconciler.RoleGroupInfo, overrides *commonsv1alpha1.OverridesSpec, roleGroupConfig *commonsv1alpha1.RoleGroupConfigSpec, ) reconciler.ResourceReconciler[builder.ConfigBuilder]
func NewKafkaDiscoveryBuilder ¶
func NewKafkaDiscoveryBuilder( client *client.Client, kafkaSecurity *security.KafkaSecurity, isNodePort bool, ) builder.ConfigBuilder
func NewKafkaDiscoveryNodePortReconciler ¶
func NewKafkaDiscoveryNodePortReconciler( ctx context.Context, client *client.Client, kafkaTlsSecurity *security.KafkaSecurity, ) reconciler.ResourceReconciler[builder.ConfigBuilder]
func NewKafkaDiscoveryReconciler ¶
func NewKafkaDiscoveryReconciler( ctx context.Context, client *client.Client, kafkaTlsSecurity *security.KafkaSecurity, ) reconciler.ResourceReconciler[builder.ConfigBuilder]
func NewRoleGroupBootstrapListenerReconciler ¶
func NewRoleGroupBootstrapListenerReconciler( client *client.Client, bootstrapListenerClass string, info *reconciler.RoleGroupInfo, kafkaTlsSecurity *security.KafkaSecurity, ) reconciler.ResourceReconciler[pkg.ListenerBuidler]
func NewRoleGroupMetricsService ¶
func NewRoleGroupMetricsService( client *client.Client, roleGroupInfo *reconciler.RoleGroupInfo, ) reconciler.Reconciler
NewRoleGroupMetricsService creates a metrics service reconciler using a simple function approach This creates a headless service for metrics with Prometheus labels and annotations
func NewRoleGroupService ¶
func NewRoleGroupService( client *client.Client, info *reconciler.RoleGroupInfo, ) reconciler.ResourceReconciler[builder.ServiceBuilder]
func NewServiceAccountReconciler ¶
func NewServiceAccountReconciler( client *client.Client, saName string, opts ...builder.Option, ) reconciler.ResourceReconciler[*builder.GenericServiceAccountBuilder]
func NewServiceBuilder ¶
func NewServiceBuilder( client *client.Client, name string, ports []corev1.ContainerPort, options ...builder.ServiceBuilderOption, ) builder.ServiceBuilder
func NewStatefulSetBuilder ¶
func NewStatefulSetBuilder( ctx context.Context, client *client.Client, image *opgoutil.Image, replicas *int32, clusterConfig *kafkav1alpha1.ClusterConfigSpec, roleGroupInf *reconciler.RoleGroupInfo, brokerConfig *kafkav1alpha1.BrokersConfigSpec, overrdes *commonsv1alpha1.OverridesSpec, kafkaTlsSecurity *security.KafkaSecurity, ) builder.StatefulSetBuilder
func NewStatefulSetReconciler ¶
func NewStatefulSetReconciler( ctx context.Context, client *client.Client, image *opgoutil.Image, replicas *int32, clusterConfig *kafkav1alpha1.ClusterConfigSpec, clusterOperation *commonsv1alpha1.ClusterOperationSpec, roleGroupInf *reconciler.RoleGroupInfo, brokerConfig *kafkav1alpha1.BrokersConfigSpec, overrides *commonsv1alpha1.OverridesSpec, kafkaTlsSecurity *security.KafkaSecurity, ) reconciler.ResourceReconciler[builder.StatefulSetBuilder]
func RoleGroupConfigMapName ¶
func RoleGroupConfigMapName(roleGroupInfo *reconciler.RoleGroupInfo) string
func ServiceAccountName ¶
Types ¶
type BrokerReconciler ¶
type BrokerReconciler struct {
reconciler.BaseRoleReconciler[*kafkav1alpha1.BrokersSpec]
// contains filtered or unexported fields
}
func NewBrokerReconciler ¶
func NewBrokerReconciler( client *client.Client, roleInfo reconciler.RoleInfo, spec *kafkav1alpha1.BrokersSpec, image *opgoutil.Image, clusterConfig *kafkav1alpha1.ClusterConfigSpec, clusterOperation *commonsv1alpha1.ClusterOperationSpec, kafkaTlsSecurity *security.KafkaSecurity, ) *BrokerReconciler
func (*BrokerReconciler) RegisterResourceWithRoleGroup ¶
func (r *BrokerReconciler) RegisterResourceWithRoleGroup( ctx context.Context, replicas int32, roleGroupInfo *reconciler.RoleGroupInfo, overrides *commonsv1alpha1.OverridesSpec, brokerConfig *kafkav1alpha1.BrokersConfigSpec, ) ([]reconciler.Reconciler, error)
func (*BrokerReconciler) RegisterResources ¶
func (r *BrokerReconciler) RegisterResources(ctx context.Context) error
type ContainerComponent ¶
type ContainerComponent string
ContainerComponent use for define container name
const ( Kafka ContainerComponent = "kafka" Vector ContainerComponent = "vector" )
type DiscoveryBuilder ¶
type DiscoveryBuilder struct {
builder.ConfigMapBuilder
// contains filtered or unexported fields
}
func (*DiscoveryBuilder) Build ¶
func (b *DiscoveryBuilder) Build(ctx context.Context) (ctrlclient.Object, error)
type KafkaClusterReconciler ¶
KafkaClusterReconciler reconciles a KafkaCluster object
func (*KafkaClusterReconciler) Reconcile ¶
func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
Reconcile is part of the main kubernetes reconciliation loop which aims to move the current state of the cluster closer to the desired state. TODO(user): Modify the Reconcile function to compare the state specified by the KafkaCluster object against the actual cluster state, and then perform operations to make the cluster state reflect the state specified by the user.
For more details, check Reconcile and its Result here: - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
func (*KafkaClusterReconciler) SetupWithManager ¶
func (r *KafkaClusterReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller with the Manager.
type KafkaConfig ¶
type KafkaConfig struct {
commonsv1alpha1.RoleGroupConfigSpec
BootstrapListenerClass string
BrokerListenerClass string
RequestedSecretLifetime string
}
func DefaultKafkaConfig ¶
func DefaultKafkaConfig(clusterName string) KafkaConfig
func (*KafkaConfig) ComputeCli ¶
func (k *KafkaConfig) ComputeCli() ([]string, error)
ComputeCli implements OverrideConfiguration.
func (*KafkaConfig) ComputeEnv ¶
func (k *KafkaConfig) ComputeEnv() (map[string]string, error)
ComputeEnv implements OverrideConfiguration.
func (*KafkaConfig) ComputeFile ¶
func (k *KafkaConfig) ComputeFile() (map[string]map[string]string, error)
ComputeFile implements OverrideConfiguration.
type KafkaConfigmapBuilder ¶
type KafkaConfigmapBuilder struct {
builder.ConfigMapBuilder
ClusterConfig *kafkav1alpha1.ClusterConfigSpec
ClusterName string
RoleName string
RoleGroupName string
// contains filtered or unexported fields
}
func (*KafkaConfigmapBuilder) Build ¶
func (b *KafkaConfigmapBuilder) Build(ctx context.Context) (ctrlclient.Object, error)
type KafkaContainerBuilder ¶
type KafkaContainerBuilder struct {
*security.KafkaSecurity
// contains filtered or unexported fields
}
func NewKafkaContainer ¶
func NewKafkaContainer( image string, imagePullPolicy corev1.PullPolicy, zookeeperDiscoveryZNode string, tlsSecurity *security.KafkaSecurity, namespace string, groupSvcName string, ) *KafkaContainerBuilder
func (*KafkaContainerBuilder) Command ¶
func (d *KafkaContainerBuilder) Command() []string
func (*KafkaContainerBuilder) CommandArgs ¶
func (d *KafkaContainerBuilder) CommandArgs() []string
CommandArgs command args ex: export NODE_PORT=$(cat /kubedoop/tmp/kafka_nodepor
func (*KafkaContainerBuilder) ContainerEnv ¶
func (d *KafkaContainerBuilder) ContainerEnv() []corev1.EnvVar
func (*KafkaContainerBuilder) ContainerName ¶
func (d *KafkaContainerBuilder) ContainerName() string
func (*KafkaContainerBuilder) ContainerPorts ¶
func (d *KafkaContainerBuilder) ContainerPorts() []corev1.ContainerPort
ContainerPorts make container ports of data node
func (*KafkaContainerBuilder) LaunchCommand ¶
func (d *KafkaContainerBuilder) LaunchCommand(listeners, advertisedListers, lisenerSecurityProtocolMap string) string
kafka launch command
func (*KafkaContainerBuilder) LivenessProbe ¶
func (d *KafkaContainerBuilder) LivenessProbe() *corev1.Probe
func (*KafkaContainerBuilder) ReadinessProbe ¶
func (d *KafkaContainerBuilder) ReadinessProbe() *corev1.Probe
func (*KafkaContainerBuilder) ResourceRequirements ¶
func (d *KafkaContainerBuilder) ResourceRequirements() corev1.ResourceRequirements
func (*KafkaContainerBuilder) VolumeMount ¶
func (d *KafkaContainerBuilder) VolumeMount() []corev1.VolumeMount
type KafkaListener ¶
type KafkaListener struct {
Name KafkaListenerName
Host string
Port string
}
func (KafkaListener) String ¶
func (kl KafkaListener) String() string
type KafkaListenerConfig ¶
type KafkaListenerConfig struct {
Listeners []KafkaListener
AdvertisedListeners []KafkaListener
ListenerSecurityProtocolMap map[KafkaListenerName]KafkaListenerProtocol
}
func GetKafkaListenerConfig ¶
func GetKafkaListenerConfig( namespace string, kafkaSecurity *security.KafkaSecurity, objectName string, ) (*KafkaListenerConfig, error)
func (*KafkaListenerConfig) AdvertisedListenersString ¶
func (config *KafkaListenerConfig) AdvertisedListenersString() string
func (*KafkaListenerConfig) ListenerSecurityProtocolMapString ¶
func (config *KafkaListenerConfig) ListenerSecurityProtocolMapString() string
func (*KafkaListenerConfig) ListenersString ¶
func (config *KafkaListenerConfig) ListenersString() string
type KafkaListenerError ¶
type KafkaListenerError struct {
Message string
}
func (*KafkaListenerError) Error ¶
func (e *KafkaListenerError) Error() string
type KafkaListenerName ¶
type KafkaListenerName string
const ( Client KafkaListenerName = "CLIENT" ClientAuth KafkaListenerName = "CLIENT_AUTH" Internal KafkaListenerName = "INTERNAL" Bootstrap KafkaListenerName = "BOOTSTRAP" )
type KafkaListenerProtocol ¶
type KafkaListenerProtocol string
const ( Plaintext KafkaListenerProtocol = "PLAINTEXT" Ssl KafkaListenerProtocol = "SSL" )
type OverrideConfiguration ¶
type Reconciler ¶
type Reconciler struct {
reconciler.BaseCluster[*kafkav1alpha1.KafkaClusterSpec]
ClusterConfig *kafkav1alpha1.ClusterConfigSpec
ClusterOperation *commonsv1alpha1.ClusterOperationSpec
}
func NewClusterReconciler ¶
func NewClusterReconciler( client *resourceClient.Client, clusterInfo reconciler.ClusterInfo, spec *kafkav1alpha1.KafkaClusterSpec, ) *Reconciler
func (*Reconciler) GetImage ¶
func (r *Reconciler) GetImage() *util.Image
func (*Reconciler) RegisterResources ¶
func (r *Reconciler) RegisterResources(ctx context.Context) error
type ServiceBuilder ¶
type ServiceBuilder struct {
*builder.BaseServiceBuilder
}
func (*ServiceBuilder) GetObject ¶
func (b *ServiceBuilder) GetObject() *corev1.Service
type StatefulSetBuilder ¶
type StatefulSetBuilder struct {
builder.StatefulSet
ClusterConfig *kafkav1alpha1.ClusterConfigSpec
// contains filtered or unexported fields
}
func (*StatefulSetBuilder) Build ¶
func (b *StatefulSetBuilder) Build(ctx context.Context) (ctrlclient.Object, error)
func (*StatefulSetBuilder) GetObject ¶
func (b *StatefulSetBuilder) GetObject() (*appv1.StatefulSet, error)
type VectorConfigParams ¶
type VectorConfigParams struct {
Client client.Client
ClusterConfig *kafkav1alpha1.ClusterConfigSpec
Namespace string
InstanceName string
Role string
GroupName string
}