Documentation
¶
Index ¶
- func GenExpectationPodsKey(jobKey, replicaType string) string
- func GenExpectationServicesKey(jobKey, replicaType string) string
- func GenGeneralName(jobName, rtype, index string) string
- func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error
- type ControllerInterface
- type JobController
- func (jc *JobController) AddPod(obj interface{})
- func (jc *JobController) AddService(obj interface{})
- func (jc *JobController) DeletePdb(job metav1.Object) error
- func (jc *JobController) DeletePod(obj interface{})
- func (jc *JobController) DeleteService(obj interface{})
- func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)
- func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)
- func (jc *JobController) GenLabels(jobName string) map[string]string
- func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference
- func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod
- func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error)
- func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service
- func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error)
- func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error)
- func (jc *JobController) UpdatePod(old, cur interface{})
- func (jc *JobController) UpdateService(old, cur interface{})
- type JobControllerConfiguration
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenExpectationPodsKey ¶
func GenGeneralName ¶
func RecheckDeletionTimestamp ¶
RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion.
The CanAdopt() function calls getObject() to fetch the latest value, and denies adoption attempts if that object has a non-nil DeletionTimestamp.
Types ¶
type ControllerInterface ¶
type ControllerInterface interface {
// Returns the Controller name
ControllerName() string
// Returns the GroupVersionKind of the API
GetAPIGroupVersionKind() schema.GroupVersionKind
// Returns the GroupVersion of the API
GetAPIGroupVersion() schema.GroupVersion
// Returns the Group Name(key) in the labels of the job
GetGroupNameLabelKey() string
// Returns the Job Name(key) in the labels of the job
GetJobNameLabelKey() string
// Returns the Group Name(value) in the labels of the job
GetGroupNameLabelValue() string
// Returns the Replica Type(key) in the labels of the job
GetReplicaTypeLabelKey() string
// Returns the Replica Index(value) in the labels of the job
GetReplicaIndexLabelKey() string
// Returns the Job from Informer Cache
GetJobFromInformerCache(namespace, name string) (metav1.Object, error)
// Returns the Job from API server
GetJobFromAPIClient(namespace, name string) (metav1.Object, error)
}
Common Interface to be implemented by all operators.
type JobController ¶
type JobController struct {
Controller ControllerInterface
Config JobControllerConfiguration
// podControl is used to add or delete pods.
PodControl controller.PodControlInterface
// serviceControl is used to add or delete services.
ServiceControl control.ServiceControlInterface
// kubeClientSet is a standard kubernetes clientset.
KubeClientSet kubeclientset.Interface
// podLister can list/get pods from the shared informer's store.
PodLister corelisters.PodLister
// serviceLister can list/get services from the shared informer's store.
ServiceLister corelisters.ServiceLister
// podInformerSynced returns true if the pod store has been synced at least once.
PodInformerSynced cache.InformerSynced
// serviceInformerSynced returns true if the service store has been synced at least once.
ServiceInformerSynced cache.InformerSynced
// A TTLCache of pod/services creates/deletes each job expects to see
// We use Job namespace/name + ReplicaType + pods/services as an expectation key,
// For example, there is a TFJob with namespace "tf-operator" and name "tfjob-abc":
// {
// "PS": {
// "Replicas": 2,
// },
// "Worker": {
// "Replicas": 4,
// }
// }
// We will create 4 expectations:
// - "tf-operator/tfjob-abc/ps/services", expects 2 adds.
// - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
// - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
// - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
Expectations controller.ControllerExpectationsInterface
// workQueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
WorkQueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
Recorder record.EventRecorder
}
JobController abstracts other operators to manage the lifecycle of Jobs.
func NewJobController ¶
func NewJobController( controllerImpl ControllerInterface, reconcilerSyncPeriod metav1.Duration, enableGangScheduling bool, kubeClientSet kubeclientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, workQueueName string) JobController
func (*JobController) AddPod ¶
func (jc *JobController) AddPod(obj interface{})
When a pod is created, enqueue the job that manages it and update its expectations.
func (*JobController) AddService ¶
func (jc *JobController) AddService(obj interface{})
When a service is created, enqueue the controller that manages it and update its expectations.
func (*JobController) DeletePod ¶
func (jc *JobController) DeletePod(obj interface{})
When a pod is deleted, enqueue the job that manages the pod and update its expectations. obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (*JobController) DeleteService ¶
func (jc *JobController) DeleteService(obj interface{})
When a service is deleted, enqueue the job that manages the service and update its expectations. obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
func (*JobController) FilterPodsForReplicaType ¶
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)
FilterPodsForReplicaType returns pods belong to a replicaType.
func (*JobController) FilterServicesForReplicaType ¶
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)
FilterServicesForReplicaType returns service belong to a replicaType.
func (*JobController) GenLabels ¶
func (jc *JobController) GenLabels(jobName string) map[string]string
func (*JobController) GenOwnerReference ¶
func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference
func (*JobController) GetPodSlices ¶
getPodSlices returns a slice, which element is the slice of pod.
func (*JobController) GetPodsForJob ¶
getPodsForJob returns the set of pods that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned Pods are pointers into the cache.
func (*JobController) GetServiceSlices ¶
func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service
getServiceSlices returns a slice, which element is the slice of service. Assume the return object is serviceSlices, then serviceSlices[i] is an array of pointers to services corresponding to Services for replica i.
func (*JobController) GetServicesForJob ¶
getServicesForJob returns the set of services that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned services are pointers into the cache.
func (*JobController) SyncPdb ¶
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error)
SyncPdb will create a PDB for gang scheduling by kube-arbitrator.
func (*JobController) UpdatePod ¶
func (jc *JobController) UpdatePod(old, cur interface{})
When a pod is updated, figure out what tfjob/s manage it and wake them up. If the labels of the pod have changed we need to awaken both the old and new replica set. old and cur must be *v1.Pod types.
func (*JobController) UpdateService ¶
func (jc *JobController) UpdateService(old, cur interface{})
When a service is updated, figure out what job/s manage it and wake them up. If the labels of the service have changed we need to awaken both the old and new replica set. old and cur must be *v1.Service types.
type JobControllerConfiguration ¶
type JobControllerConfiguration struct {
// ReconcilerSyncLoopPeriod is the amount of time the reconciler sync states loop
// wait between two reconciler sync.
// It is set to 15 sec by default.
// TODO(cph): maybe we can let it grows by multiple in the future
// and up to 5 minutes to reduce idle loop.
// e.g. 15s, 30s, 60s, 120s...
ReconcilerSyncLoopPeriod metav1.Duration
// Enable gang scheduling by kube-arbitrator
EnableGangScheduling bool
}
JobControllerConfiguration contains configuration of operator.