Documentation ¶
Index ¶
- Constants
- type K8sService
- func NewK8sService(kubeConfigPath string) *K8sService
- func (s *K8sService) ApplyGatewayYAML(ctx context.Context, namespace, gatewayName string, yamlData []byte) error
- func (s *K8sService) CheckPermissions(ctx context.Context, verb, resource, group, namespace string) (bool, error)
- func (s *K8sService) GetGatewayPodUIDs(ctx context.Context, namespace, gatewayName string) (map[types.UID]struct{}, error)
- func (s *K8sService) GetGatewayYAML(ctx context.Context, namespace, gatewayName string) ([]byte, error)
- func (s *K8sService) ValidateGatewayCRs(initialYAML, fencedYAML, switchoverYAML []byte) error
- func (s *K8sService) WaitForGatewayPods(ctx context.Context, namespace, gatewayName string, ...) error
- type PodRolloutProgress
- type Service
Constants ¶
const (
	// ConfluentNamespace = "kcp"
	GatewayGroup          = "platform.confluent.io"
	GatewayVersion        = "v1beta1"
	GatewayResourcePlural = "gateways"
)
Kubernetes resource constants
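As an illustration of how these constants fit together, the sketch below composes the apiVersion string and the REST path for a namespaced Gateway resource, following the general Kubernetes layout for custom resources (/apis/{group}/{version}/namespaces/{namespace}/{plural}/{name}). The helpers gatewayAPIVersion and gatewayPath, the namespace "confluent", and the name "my-gateway" are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"path"
)

// Values copied from the constants documented above.
const (
	GatewayGroup          = "platform.confluent.io"
	GatewayVersion        = "v1beta1"
	GatewayResourcePlural = "gateways"
)

// gatewayAPIVersion (hypothetical helper) returns the apiVersion string
// that a Gateway manifest would carry: "<group>/<version>".
func gatewayAPIVersion() string {
	return GatewayGroup + "/" + GatewayVersion
}

// gatewayPath (hypothetical helper) builds the REST path of a namespaced
// custom resource using the standard Kubernetes API layout.
func gatewayPath(namespace, name string) string {
	return path.Join("/apis", GatewayGroup, GatewayVersion,
		"namespaces", namespace, GatewayResourcePlural, name)
}

func main() {
	fmt.Println(gatewayAPIVersion())
	fmt.Println(gatewayPath("confluent", "my-gateway"))
}
```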
Types ¶
type K8sService ¶
type K8sService struct {
	// contains filtered or unexported fields
}
K8sService implements gateway operations using Kubernetes clients
func NewK8sService ¶
func NewK8sService(kubeConfigPath string) *K8sService
NewK8sService creates a new gateway service
func (*K8sService) ApplyGatewayYAML ¶
func (s *K8sService) ApplyGatewayYAML(ctx context.Context, namespace, gatewayName string, yamlData []byte) error
ApplyGatewayYAML applies a complete gateway CR YAML to the cluster using server-side apply
func (*K8sService) CheckPermissions ¶
func (s *K8sService) CheckPermissions(ctx context.Context, verb, resource, group, namespace string) (bool, error)
CheckPermissions checks if the user has the required Kubernetes permissions
func (*K8sService) GetGatewayPodUIDs ¶
func (s *K8sService) GetGatewayPodUIDs(ctx context.Context, namespace, gatewayName string) (map[types.UID]struct{}, error)
GetGatewayPodUIDs returns a set of UIDs for the current gateway pods. This should be called BEFORE patching the gateway to capture the initial pod state.
func (*K8sService) GetGatewayYAML ¶
func (s *K8sService) GetGatewayYAML(ctx context.Context, namespace, gatewayName string) ([]byte, error)
GetGatewayYAML retrieves the gateway resource as YAML
func (*K8sService) ValidateGatewayCRs ¶
func (s *K8sService) ValidateGatewayCRs(initialYAML, fencedYAML, switchoverYAML []byte) error
ValidateGatewayCRs validates that the initial, fenced, and switchover gateway CRs are consistent
func (*K8sService) WaitForGatewayPods ¶
func (s *K8sService) WaitForGatewayPods(ctx context.Context, namespace, gatewayName string, initialPodUIDs map[types.UID]struct{}, pollInterval, timeout time.Duration, onProgress func(PodRolloutProgress)) error
WaitForGatewayPods waits for all gateway pods to be completely replaced after a config change.
After the Gateway CR is patched, the Confluent operator triggers a rolling restart of the gateway pods. This method polls until every initial pod has been replaced and all new pods are ready, so the rollout is complete even when maxSurge temporarily creates extra pods.
The initialPodUIDs parameter must contain the pod UIDs captured BEFORE the gateway patch is applied. This is critical to avoid race conditions where the rollout completes before we capture the initial state.
The method works in two phases:
- Wait for change detection: wait up to 10 seconds to detect whether a rollout has started
- Wait for complete replacement: wait until all initial pods have been replaced and the new pods are ready
This prevents returning prematurely when maxSurge creates extra pods during the rollout.
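The completion check described above could be sketched as a pure function over the UIDs captured before the patch and the pods currently observed. This is only an illustration, not the package's implementation: UID stands in for types.UID, podState and evaluateRollout are hypothetical names, and the sketch assumes the replica count does not change during the rollout.

```go
package main

import "fmt"

// UID stands in for k8s.io/apimachinery/pkg/types.UID so the sketch
// needs no external dependencies.
type UID string

// PodRolloutProgress mirrors the struct documented in this package.
type PodRolloutProgress struct {
	InitialPodCount  int
	NewPodsReady     int
	OldPodsRemaining int
	RolloutDetected  bool
}

// podState is a hypothetical, simplified view of one observed pod.
type podState struct {
	UID   UID
	Ready bool
}

// evaluateRollout (hypothetical) classifies the current pods against the
// UIDs captured before the patch and reports whether the rollout is done.
func evaluateRollout(initial map[UID]struct{}, current []podState) (PodRolloutProgress, bool) {
	p := PodRolloutProgress{InitialPodCount: len(initial)}
	for _, pod := range current {
		if _, old := initial[pod.UID]; old {
			p.OldPodsRemaining++
			continue
		}
		p.RolloutDetected = true // an unknown UID means a new pod exists
		if pod.Ready {
			p.NewPodsReady++
		}
	}
	if p.OldPodsRemaining < p.InitialPodCount {
		p.RolloutDetected = true // an initial pod has already disappeared
	}
	// Done only when no old pod is left and enough new pods are ready;
	// a surge pod alone is therefore not enough to finish.
	done := p.RolloutDetected && p.OldPodsRemaining == 0 && p.NewPodsReady >= p.InitialPodCount
	return p, done
}

func main() {
	initial := map[UID]struct{}{"a": {}, "b": {}}

	// maxSurge created one extra pod while both old pods still exist.
	surging := []podState{{"a", true}, {"b", true}, {"c", true}}
	p, done := evaluateRollout(initial, surging)
	fmt.Printf("%+v done=%v\n", p, done)

	// Both old pods are gone and both replacements are ready.
	finished := []podState{{"c", true}, {"d", true}}
	p, done = evaluateRollout(initial, finished)
	fmt.Printf("%+v done=%v\n", p, done)
}
```

Comparing UIDs rather than pod counts is what keeps the surge case from looking finished: the pod count can match or exceed the initial count while old pods are still running.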
type PodRolloutProgress ¶
type PodRolloutProgress struct {
	InitialPodCount  int
	NewPodsReady     int
	OldPodsRemaining int
	RolloutDetected  bool
}
PodRolloutProgress reports the current state of a pod rollout
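A callback passed as onProgress might turn each report into a status line. The sketch below is one possible formatter; describeProgress and its message wording are hypothetical, and the struct is redeclared locally only to keep the example self-contained.

```go
package main

import "fmt"

// PodRolloutProgress mirrors the struct documented above.
type PodRolloutProgress struct {
	InitialPodCount  int
	NewPodsReady     int
	OldPodsRemaining int
	RolloutDetected  bool
}

// describeProgress (hypothetical) renders a progress report as one line.
func describeProgress(p PodRolloutProgress) string {
	if !p.RolloutDetected {
		return fmt.Sprintf("waiting for rollout to start (%d pods)", p.InitialPodCount)
	}
	return fmt.Sprintf("%d/%d new pods ready, %d old pods remaining",
		p.NewPodsReady, p.InitialPodCount, p.OldPodsRemaining)
}

func main() {
	// The closure has the func(PodRolloutProgress) shape expected by onProgress.
	onProgress := func(p PodRolloutProgress) { fmt.Println(describeProgress(p)) }

	onProgress(PodRolloutProgress{InitialPodCount: 3})
	onProgress(PodRolloutProgress{InitialPodCount: 3, NewPodsReady: 1, OldPodsRemaining: 2, RolloutDetected: true})
}
```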
type Service ¶
type Service interface {
	GetGatewayYAML(ctx context.Context, namespace, gatewayName string) ([]byte, error)
	ValidateGatewayCRs(initialYAML, fencedYAML, switchoverYAML []byte) error
	CheckPermissions(ctx context.Context, verb, resource, group, namespace string) (bool, error)
	ApplyGatewayYAML(ctx context.Context, namespace, gatewayName string, yamlData []byte) error
	GetGatewayPodUIDs(ctx context.Context, namespace, gatewayName string) (map[types.UID]struct{}, error)
	WaitForGatewayPods(ctx context.Context, namespace, gatewayName string, initialPodUIDs map[types.UID]struct{}, pollInterval, timeout time.Duration, onProgress func(PodRolloutProgress)) error
}
Service defines gateway operations
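To illustrate the call order the method comments imply (capture pod UIDs before applying, then wait), here is a minimal sketch against an in-memory fake. Everything beyond the method signatures is an assumption: gatewayOps is a local subset of Service with UID standing in for types.UID, applyAndWait, recorder, and runDemo are hypothetical, the "patch" verb is a guess at what a caller would check, and the poll interval and timeout are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// UID stands in for k8s.io/apimachinery/pkg/types.UID.
type UID string

// PodRolloutProgress mirrors the struct documented in this package.
type PodRolloutProgress struct {
	InitialPodCount, NewPodsReady, OldPodsRemaining int
	RolloutDetected                                 bool
}

// gatewayOps is the subset of Service this sketch uses.
type gatewayOps interface {
	CheckPermissions(ctx context.Context, verb, resource, group, namespace string) (bool, error)
	ApplyGatewayYAML(ctx context.Context, namespace, gatewayName string, yamlData []byte) error
	GetGatewayPodUIDs(ctx context.Context, namespace, gatewayName string) (map[UID]struct{}, error)
	WaitForGatewayPods(ctx context.Context, namespace, gatewayName string, initialPodUIDs map[UID]struct{}, pollInterval, timeout time.Duration, onProgress func(PodRolloutProgress)) error
}

// applyAndWait (hypothetical) checks access, snapshots pod UIDs BEFORE the
// apply, applies the manifest, and then waits for the rollout.
func applyAndWait(ctx context.Context, svc gatewayOps, ns, name string, manifest []byte) error {
	ok, err := svc.CheckPermissions(ctx, "patch", "gateways", "platform.confluent.io", ns)
	if err != nil {
		return fmt.Errorf("check permissions: %w", err)
	}
	if !ok {
		return errors.New("missing permission to patch gateways")
	}
	uids, err := svc.GetGatewayPodUIDs(ctx, ns, name)
	if err != nil {
		return fmt.Errorf("capture pod UIDs: %w", err)
	}
	if err := svc.ApplyGatewayYAML(ctx, ns, name, manifest); err != nil {
		return fmt.Errorf("apply gateway: %w", err)
	}
	noop := func(PodRolloutProgress) {} // ignore progress in this sketch
	return svc.WaitForGatewayPods(ctx, ns, name, uids, 2*time.Second, 5*time.Minute, noop)
}

// recorder is an in-memory fake that only records which methods ran.
type recorder struct {
	allowed bool
	calls   []string
}

func (r *recorder) CheckPermissions(ctx context.Context, verb, resource, group, namespace string) (bool, error) {
	r.calls = append(r.calls, "CheckPermissions")
	return r.allowed, nil
}

func (r *recorder) ApplyGatewayYAML(ctx context.Context, namespace, gatewayName string, yamlData []byte) error {
	r.calls = append(r.calls, "ApplyGatewayYAML")
	return nil
}

func (r *recorder) GetGatewayPodUIDs(ctx context.Context, namespace, gatewayName string) (map[UID]struct{}, error) {
	r.calls = append(r.calls, "GetGatewayPodUIDs")
	return map[UID]struct{}{"pod-1": {}}, nil
}

func (r *recorder) WaitForGatewayPods(ctx context.Context, namespace, gatewayName string, initialPodUIDs map[UID]struct{}, pollInterval, timeout time.Duration, onProgress func(PodRolloutProgress)) error {
	r.calls = append(r.calls, "WaitForGatewayPods")
	return nil
}

// runDemo drives applyAndWait against the fake and returns the call log.
func runDemo(allowed bool) ([]string, error) {
	r := &recorder{allowed: allowed}
	err := applyAndWait(context.Background(), r, "confluent", "my-gateway", []byte("kind: Gateway"))
	return r.calls, err
}

func main() {
	calls, err := runDemo(true)
	fmt.Println(calls, err)
}
```

Depending on the narrow gatewayOps interface rather than on *K8sService is what lets the fake be swapped in; the Service interface plays the same role for real callers.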