Documentation
¶
Index ¶
- Constants
- func MaskDiff(topics []string, maskFile string, desiredVersion string, currentVersion string, ...) ([]string, []string, []string, error)
- func NewRedshiftConn(client client.Client, secretName, secretNamespace string, database *string) (*redshift.Redshift, error)
- func NewRedshiftConnection(secret map[string]string, schema string) (*redshift.Redshift, error)
- type Batcher
- func (b Batcher) Config() *corev1.ConfigMap
- func (b Batcher) Deployment() *appsv1.Deployment
- func (b Batcher) Name() string
- func (b Batcher) Namespace() string
- func (b Batcher) Topics() []string
- func (b Batcher) UpdateConfig(current *corev1.ConfigMap) bool
- func (b Batcher) UpdateDeployment(current *appsv1.Deployment) bool
- type ConfigMapCreatedEvent
- type ConfigMapDeletedEvent
- type Deployment
- type DeploymentCreatedEvent
- type DeploymentDeletedEvent
- type DeploymentUpdatedEvent
- type Loader
- func (l Loader) Config() *corev1.ConfigMap
- func (l Loader) Deployment() *appsv1.Deployment
- func (l Loader) Name() string
- func (l Loader) Namespace() string
- func (l Loader) Topics() []string
- func (l Loader) UpdateConfig(current *corev1.ConfigMap) bool
- func (l Loader) UpdateDeployment(current *appsv1.Deployment) bool
- type ReconcilerEvent
- type RedshiftSinkReconciler
Constants ¶
View Source
const ( BatcherTag = "batcher" BatcherLabelInstance = "redshiftbatcher" )
View Source
const ( LoaderTag = "loader" LoaderLabelInstance = "redshiftloader" )
View Source
const ( K8sEventTypeNormal = "Normal" K8sEventTypeWarning = "Warning" )
View Source
const ( AllSinkGroup = "all" MainSinkGroup = "main" ReloadSinkGroup = "reload" ReloadDupeSinkGroup = "reload-dupe" DefaultMaxBatcherLag = int64(100) DefautMaxLoaderLag = int64(10) ReloadTableSuffix = "_ts_adx_reload" )
View Source
const ( InstanceLabel = "app.kubernetes.io/instance" InstanceName = "practo.dev/name" SinkGroupLabel = "practo.dev/sinkgroup" RskResource = "practo.dev/resource" )
View Source
const (
MaxTopicRelease = 5
)
Variables ¶
This section is empty.
Functions ¶
func MaskDiff ¶
func MaskDiff( topics []string, maskFile string, desiredVersion string, currentVersion string, gitToken string, kafkaTopicsCache *sync.Map, includeTablesCache *sync.Map, ) ( []string, []string, []string, error, )
MaskDiff reads two database mask configurations and returns the list of topics whose mask values have changed. It also returns the updated list of Kafka topics and the list of include tables based on the desired mask config.
func NewRedshiftConn ¶
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
func (Batcher) Deployment ¶
func (b Batcher) Deployment() *appsv1.Deployment
func (Batcher) UpdateDeployment ¶
func (b Batcher) UpdateDeployment(current *appsv1.Deployment) bool
type ConfigMapCreatedEvent ¶
func (ConfigMapCreatedEvent) Record ¶
func (d ConfigMapCreatedEvent) Record(recorder record.EventRecorder)
type ConfigMapDeletedEvent ¶
func (ConfigMapDeletedEvent) Record ¶
func (d ConfigMapDeletedEvent) Record(recorder record.EventRecorder)
type Deployment ¶
type Deployment interface {
Name() string
Namespace() string
Config() *corev1.ConfigMap
Deployment() *appsv1.Deployment
UpdateConfig(current *corev1.ConfigMap) bool
UpdateDeployment(current *appsv1.Deployment) bool
Topics() []string
}
func NewBatcher ¶
func NewBatcher( name string, rsk *tipocav1.RedshiftSink, maskFileVersion string, secret map[string]string, sinkGroup string, sinkGroupSpec *tipocav1.SinkGroupSpec, consumerGroups map[string]consumerGroup, defaultImage string, defaultKafkaVersion string, tlsConfig *kafka.TLSConfig, ) ( Deployment, error, )
func NewLoader ¶
func NewLoader( name string, rsk *tipocav1.RedshiftSink, tableSuffix string, secret map[string]string, sinkGroup string, sinkGroupSpec *tipocav1.SinkGroupSpec, consumerGroups map[string]consumerGroup, defaultImage string, defaultKafkaVersion string, tlsConfig *kafka.TLSConfig, defaultMaxOpenConns int, defaultMaxIdleConns int, prometheusURL string, redshiftMetrics bool, ) ( Deployment, error, )
type DeploymentCreatedEvent ¶
func (DeploymentCreatedEvent) Record ¶
func (d DeploymentCreatedEvent) Record(recorder record.EventRecorder)
type DeploymentDeletedEvent ¶
func (DeploymentDeletedEvent) Record ¶
func (d DeploymentDeletedEvent) Record(recorder record.EventRecorder)
type DeploymentUpdatedEvent ¶
func (DeploymentUpdatedEvent) Record ¶
func (d DeploymentUpdatedEvent) Record(recorder record.EventRecorder)
type Loader ¶
type Loader struct {
// contains filtered or unexported fields
}
func (Loader) Deployment ¶
func (l Loader) Deployment() *appsv1.Deployment
func (Loader) UpdateDeployment ¶
func (l Loader) UpdateDeployment(current *appsv1.Deployment) bool
type ReconcilerEvent ¶
type ReconcilerEvent interface {
// Record this into an event recorder as a Kubernetes API event
Record(recorder record.EventRecorder)
}
ReconcilerEvent represents an action that the operator has actually performed. Any meaningful change should result in one of these.
type RedshiftSinkReconciler ¶
type RedshiftSinkReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
KafkaTopicRegexes *sync.Map
KafkaClients *sync.Map
KafkaTopicsCache *sync.Map
KafkaRealtimeCache *sync.Map
ReleaseCache *sync.Map
GitCache *sync.Map
IncludeTablesCache *sync.Map
DefaultBatcherImage string
DefaultLoaderImage string
DefaultSecretRefName string
DefaultSecretRefNamespace string
DefaultKafkaVersion string
DefaultRedshiftMaxIdleConns int
DefaultRedshiftMaxOpenConns int
AllowedResources []string
PrometheusClient prometheus.Client
RedshiftMetrics bool
}
RedshiftSinkReconciler reconciles a RedshiftSink object
func (*RedshiftSinkReconciler) SetupWithManager ¶
func (r *RedshiftSinkReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller and applies all controller configs
Click to show internal directories.
Click to hide internal directories.