- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package kubemark
- import (
- "fmt"
- "math/rand"
- "sync"
- "time"
- apiv1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/client-go/informers"
- informersv1 "k8s.io/client-go/informers/core/v1"
- kubeclient "k8s.io/client-go/kubernetes"
- listersv1 "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/klog"
- )
- const (
- namespaceKubemark = "kubemark"
- nodeGroupLabel = "autoscaling.k8s.io/nodegroup"
- numRetries = 3
- )
- // KubemarkController is a simplified cloud provider for kubemark. It allows adding
- // and deleting nodes in a kubemark cluster, and introduces node groups by applying
- // labels to kubemark's hollow nodes.
- type KubemarkController struct {
- nodeTemplate *apiv1.ReplicationController
- externalCluster externalCluster
- kubemarkCluster kubemarkCluster
- rand *rand.Rand
- createNodeQueue chan string
- nodeGroupQueueSize map[string]int
- nodeGroupQueueSizeLock sync.Mutex
- }
- // externalCluster is used to communicate with the external cluster that hosts
- // kubemark, in order to list, create, and delete hollow nodes by manipulating
- // their replication controllers.
- type externalCluster struct {
- rcLister listersv1.ReplicationControllerLister
- rcSynced cache.InformerSynced
- podLister listersv1.PodLister
- podSynced cache.InformerSynced
- client kubeclient.Interface
- }
- // kubemarkCluster is used to delete nodes from the kubemark cluster once their
- // respective replication controllers have been deleted and the nodes have become
- // unready. This compensates for the fact that there is no proper cloud provider
- // for kubemark that would take care of deleting the nodes.
- type kubemarkCluster struct {
- client kubeclient.Interface
- nodeLister listersv1.NodeLister
- nodeSynced cache.InformerSynced
- nodesToDelete map[string]bool
- nodesToDeleteLock sync.Mutex
- }
- // NewKubemarkController creates a KubemarkController using the provided clients to
- // talk to the external and kubemark clusters.
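- //
- // An illustrative wiring sketch (externalClient, kubemarkClient and stopCh are
- // assumed to be constructed by the caller):
- //
- //	externalInformers := informers.NewSharedInformerFactory(externalClient, 0)
- //	kubemarkInformers := informers.NewSharedInformerFactory(kubemarkClient, 0)
- //	ctrl, err := NewKubemarkController(externalClient, externalInformers,
- //		kubemarkClient, kubemarkInformers.Core().V1().Nodes())
- //	if err != nil {
- //		klog.Fatalf("failed to create kubemark controller: %v", err)
- //	}
- //	externalInformers.Start(stopCh)
- //	kubemarkInformers.Start(stopCh)
- //	if ctrl.WaitForCacheSync(stopCh) {
- //		go ctrl.Run(stopCh)
- //	}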
- func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory,
- kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) {
- rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer)
- podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer)
- controller := &KubemarkController{
- externalCluster: externalCluster{
- rcLister: listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()),
- rcSynced: rcInformer.HasSynced,
- podLister: listersv1.NewPodLister(podInformer.GetIndexer()),
- podSynced: podInformer.HasSynced,
- client: externalClient,
- },
- kubemarkCluster: kubemarkCluster{
- nodeLister: kubemarkNodeInformer.Lister(),
- nodeSynced: kubemarkNodeInformer.Informer().HasSynced,
- client: kubemarkClient,
- nodesToDelete: make(map[string]bool),
- nodesToDeleteLock: sync.Mutex{},
- },
- rand: rand.New(rand.NewSource(time.Now().UnixNano())),
- createNodeQueue: make(chan string, 1000),
- nodeGroupQueueSize: make(map[string]int),
- nodeGroupQueueSizeLock: sync.Mutex{},
- }
- kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- UpdateFunc: controller.kubemarkCluster.removeUnneededNodes,
- })
- return controller, nil
- }
- // WaitForCacheSync waits until all caches in the controller are populated.
- func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
- return controller.WaitForCacheSync("kubemark", stopCh,
- kubemarkController.externalCluster.rcSynced,
- kubemarkController.externalCluster.podSynced,
- kubemarkController.kubemarkCluster.nodeSynced)
- }
- // Run populates the node template needed for creation of kubemark nodes and
- // starts the worker routine for creating new nodes.
- func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
- nodeTemplate, err := kubemarkController.getNodeTemplate()
- if err != nil {
- klog.Fatalf("failed to get node template: %s", err)
- }
- kubemarkController.nodeTemplate = nodeTemplate
- go kubemarkController.runNodeCreation(stopCh)
- <-stopCh
- }
- // GetNodeNamesForNodeGroup returns the names of the nodes in the given node group.
- func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
- selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
- pods, err := kubemarkController.externalCluster.podLister.List(selector)
- if err != nil {
- return nil, err
- }
- result := make([]string, 0, len(pods))
- for _, pod := range pods {
- result = append(result, pod.ObjectMeta.Name)
- }
- return result, nil
- }
- // GetNodeGroupSize returns the currently observed size of the node group.
- func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
- selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
- nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
- if err != nil {
- return 0, err
- }
- return len(nodes), nil
- }
- // GetNodeGroupTargetSize returns the target size of the node group, i.e. the sum
- // of the currently observed size and the number of upcoming nodes.
- func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
- kubemarkController.nodeGroupQueueSizeLock.Lock()
- defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
- realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
- if err != nil {
- return realSize, err
- }
- return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
- }
- // SetNodeGroupSize changes the size of the node group by adding or removing nodes.
- func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
- currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
- if err != nil {
- return err
- }
- switch delta := size - currSize; {
- case delta < 0:
- absDelta := -delta
- nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
- if err != nil {
- return err
- }
- if len(nodes) < absDelta {
- return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
- }
- for i, node := range nodes {
- if i == absDelta {
- return nil
- }
- if err := kubemarkController.RemoveNodeFromNodeGroup(nodeGroup, node); err != nil {
- return err
- }
- }
- case delta > 0:
- kubemarkController.nodeGroupQueueSizeLock.Lock()
- for i := 0; i < delta; i++ {
- kubemarkController.nodeGroupQueueSize[nodeGroup]++
- kubemarkController.createNodeQueue <- nodeGroup
- }
- kubemarkController.nodeGroupQueueSizeLock.Unlock()
- }
- return nil
- }
- // GetNodeGroupForNode returns the name of the node group to which the node
- // belongs.
- func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
- pod := kubemarkController.getPodByName(node)
- if pod == nil {
- return "", fmt.Errorf("node %s does not exist", node)
- }
- nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
- if ok {
- return nodeGroup, nil
- }
- return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
- }
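- // addNodeToNodeGroup creates a new hollow-node replication controller from the
- // cached node template, labels it with the given node group, and retries the
- // create call up to numRetries times.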
- func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
- node := kubemarkController.nodeTemplate.DeepCopy()
- node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63())
- node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name}
- node.Spec.Template.Labels = node.Labels
- var err error
- for i := 0; i < numRetries; i++ {
- _, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(node)
- if err == nil {
- return nil
- }
- }
- return err
- }
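- // RemoveNodeFromNodeGroup deletes the replication controller backing the given
- // node and marks the node for deletion from the kubemark cluster. A node that no
- // longer exists is ignored; a node that belongs to a different node group is an
- // error.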
- func (kubemarkController *KubemarkController) RemoveNodeFromNodeGroup(nodeGroup string, node string) error {
- pod := kubemarkController.getPodByName(node)
- if pod == nil {
- klog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
- return nil
- }
- if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
- return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
- }
- policy := metav1.DeletePropagationForeground
- var err error
- for i := 0; i < numRetries; i++ {
- err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
- pod.ObjectMeta.Labels["name"],
- &metav1.DeleteOptions{PropagationPolicy: &policy})
- if err == nil {
- klog.Infof("marking node %s for deletion", node)
- // Mark node for deletion from kubemark cluster.
- // Once it becomes unready after replication controller
- // deletion has been noticed, we will delete it explicitly.
- // This is to cover for the fact that kubemark does not
- // take care of this itself.
- kubemarkController.kubemarkCluster.markNodeForDeletion(node)
- return nil
- }
- }
- return fmt.Errorf("Failed to delete node %s: %v", node, err)
- }
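- // getReplicationControllerByName returns the replication controller with the
- // given name from the external cluster cache, or nil if it is not found.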
- func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
- rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything())
- if err != nil {
- return nil
- }
- for _, rc := range rcs {
- if rc.ObjectMeta.Name == name {
- return rc
- }
- }
- return nil
- }
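- // getPodByName returns the hollow-node pod with the given name from the external
- // cluster cache, or nil if it is not found.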
- func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
- pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
- if err != nil {
- return nil
- }
- for _, pod := range pods {
- if pod.ObjectMeta.Name == name {
- return pod
- }
- }
- return nil
- }
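- // getNodeNameForPod returns the value of the "name" label of the pod with the
- // given name, which is the name of the replication controller that owns it.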
- func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
- pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
- if err != nil {
- return "", err
- }
- for _, pod := range pods {
- if pod.ObjectMeta.Name == podName {
- return pod.Labels["name"], nil
- }
- }
- return "", fmt.Errorf("pod %s not found", podName)
- }
- // getNodeTemplate returns the template for hollow node replication controllers
- // by looking for an existing hollow node specification. This requires at least
- // one kubemark node to be present on startup.
- func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) {
- podName, err := kubemarkController.kubemarkCluster.getHollowNodeName()
- if err != nil {
- return nil, err
- }
- hollowNodeName, err := kubemarkController.getNodeNameForPod(podName)
- if err != nil {
- return nil, err
- }
- if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil {
- nodeTemplate := &apiv1.ReplicationController{
- Spec: apiv1.ReplicationControllerSpec{
- Template: hollowNode.Spec.Template,
- },
- }
- nodeTemplate.Spec.Selector = nil
- nodeTemplate.Namespace = namespaceKubemark
- one := int32(1)
- nodeTemplate.Spec.Replicas = &one
- return nodeTemplate, nil
- }
- return nil, fmt.Errorf("can't get hollow node template")
- }
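- // runNodeCreation processes createNodeQueue, creating one hollow node per queued
- // node group name and decrementing the group's queue size on success, until the
- // stop channel is closed.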
- func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
- for {
- select {
- case nodeGroup := <-kubemarkController.createNodeQueue:
- kubemarkController.nodeGroupQueueSizeLock.Lock()
- err := kubemarkController.addNodeToNodeGroup(nodeGroup)
- if err != nil {
- klog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
- } else {
- kubemarkController.nodeGroupQueueSize[nodeGroup]--
- }
- kubemarkController.nodeGroupQueueSizeLock.Unlock()
- case <-stop:
- return
- }
- }
- }
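- // getHollowNodeName returns the name of an arbitrary hollow node in the kubemark
- // cluster, or an error if there are none.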
- func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
- nodes, err := kubemarkCluster.nodeLister.List(labels.Everything())
- if err != nil {
- return "", err
- }
- for _, node := range nodes {
- return node.Name, nil
- }
- return "", fmt.Errorf("did not find any hollow nodes in the cluster")
- }
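- // removeUnneededNodes is the node update handler: it deletes a node from the
- // kubemark cluster once the node has become unready and has been explicitly
- // marked for deletion.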
- func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) {
- node, ok := newObj.(*apiv1.Node)
- if !ok {
- return
- }
- for _, condition := range node.Status.Conditions {
- // Delete the node if it is unready and has been explicitly
- // marked for deletion.
- if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue {
- kubemarkCluster.nodesToDeleteLock.Lock()
- defer kubemarkCluster.nodesToDeleteLock.Unlock()
- if kubemarkCluster.nodesToDelete[node.Name] {
- kubemarkCluster.nodesToDelete[node.Name] = false
- if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil {
- klog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
- }
- }
- return
- }
- }
- }
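- // markNodeForDeletion records that the named node should be deleted from the
- // kubemark cluster once it becomes unready.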
- func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) {
- kubemarkCluster.nodesToDeleteLock.Lock()
- defer kubemarkCluster.nodesToDeleteLock.Unlock()
- kubemarkCluster.nodesToDelete[name] = true
- }
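- // newReplicationControllerInformer returns a shared informer watching replication
- // controllers in the kubemark namespace of the external cluster.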
- func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
- rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything())
- return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil)
- }
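- // newPodInformer returns a shared informer watching pods in the kubemark
- // namespace of the external cluster.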
- func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
- podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything())
- return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil)
- }