controller.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubemark

import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	informersv1 "k8s.io/client-go/informers/core/v1"
	kubeclient "k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/controller"

	"k8s.io/klog"
)

const (
	namespaceKubemark = "kubemark"
	nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
	numRetries        = 3
)

// KubemarkController is a simplified version of a cloud provider for kubemark. It
// allows adding and deleting nodes from a kubemark cluster, and it introduces
// nodegroups by applying labels to kubemark's hollow-nodes.
type KubemarkController struct {
	nodeTemplate           *apiv1.ReplicationController
	externalCluster        externalCluster
	kubemarkCluster        kubemarkCluster
	rand                   *rand.Rand
	createNodeQueue        chan string
	nodeGroupQueueSize     map[string]int
	nodeGroupQueueSizeLock sync.Mutex
}

// externalCluster is used to communicate with the external cluster that hosts
// kubemark, in order to be able to list, create and delete hollow nodes
// by manipulating the replication controllers.
type externalCluster struct {
	rcLister  listersv1.ReplicationControllerLister
	rcSynced  cache.InformerSynced
	podLister listersv1.PodLister
	podSynced cache.InformerSynced
	client    kubeclient.Interface
}

// kubemarkCluster is used to delete nodes from the kubemark cluster once their
// respective replication controllers have been deleted and the nodes have
// become unready. This covers for the fact that there is no proper cloud
// provider for kubemark that would take care of deleting the nodes.
type kubemarkCluster struct {
	client            kubeclient.Interface
	nodeLister        listersv1.NodeLister
	nodeSynced        cache.InformerSynced
	nodesToDelete     map[string]bool
	nodesToDeleteLock sync.Mutex
}

// NewKubemarkController creates a KubemarkController using the provided clients to
// talk to the external and kubemark clusters.
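//
// A rough usage sketch (the variable names and wiring below are illustrative,
// not prescribed by this package):
//
//	stopCh := make(chan struct{})
//	externalInformers := informers.NewSharedInformerFactory(externalClient, 0)
//	kubemarkInformers := informers.NewSharedInformerFactory(kubemarkClient, 0)
//	nodeInformer := kubemarkInformers.Core().V1().Nodes()
//	go nodeInformer.Informer().Run(stopCh)
//
//	kc, err := NewKubemarkController(externalClient, externalInformers, kubemarkClient, nodeInformer)
//	if err != nil {
//		// handle error
//	}
//	externalInformers.Start(stopCh)
//	if kc.WaitForCacheSync(stopCh) {
//		go kc.Run(stopCh)
//	}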
func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory,
	kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) {
	rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer)
	podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer)
	controller := &KubemarkController{
		externalCluster: externalCluster{
			rcLister:  listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()),
			rcSynced:  rcInformer.HasSynced,
			podLister: listersv1.NewPodLister(podInformer.GetIndexer()),
			podSynced: podInformer.HasSynced,
			client:    externalClient,
		},
		kubemarkCluster: kubemarkCluster{
			nodeLister:        kubemarkNodeInformer.Lister(),
			nodeSynced:        kubemarkNodeInformer.Informer().HasSynced,
			client:            kubemarkClient,
			nodesToDelete:     make(map[string]bool),
			nodesToDeleteLock: sync.Mutex{},
		},
		rand:                   rand.New(rand.NewSource(time.Now().UnixNano())),
		createNodeQueue:        make(chan string, 1000),
		nodeGroupQueueSize:     make(map[string]int),
		nodeGroupQueueSizeLock: sync.Mutex{},
	}

	kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: controller.kubemarkCluster.removeUnneededNodes,
	})

	return controller, nil
}

// WaitForCacheSync waits until all caches in the controller are populated.
func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
	return controller.WaitForCacheSync("kubemark", stopCh,
		kubemarkController.externalCluster.rcSynced,
		kubemarkController.externalCluster.podSynced,
		kubemarkController.kubemarkCluster.nodeSynced)
}

// Run populates the node template needed for creation of kubemark nodes and
// starts the worker routine for creating new nodes.
func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
	nodeTemplate, err := kubemarkController.getNodeTemplate()
	if err != nil {
		klog.Fatalf("failed to get node template: %s", err)
	}
	kubemarkController.nodeTemplate = nodeTemplate

	go kubemarkController.runNodeCreation(stopCh)
	<-stopCh
}

// GetNodeNamesForNodeGroup returns the list of nodes in the node group.
func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
	pods, err := kubemarkController.externalCluster.podLister.List(selector)
	if err != nil {
		return nil, err
	}
	result := make([]string, 0, len(pods))
	for _, pod := range pods {
		result = append(result, pod.ObjectMeta.Name)
	}
	return result, nil
}

// GetNodeGroupSize returns the current size for the node group as observed.
func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
	selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
	nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
	if err != nil {
		return 0, err
	}
	return len(nodes), nil
}

// GetNodeGroupTargetSize returns the size of the node group as a sum of current
// observed size and number of upcoming nodes.
func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
	kubemarkController.nodeGroupQueueSizeLock.Lock()
	defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
	realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
	if err != nil {
		return realSize, err
	}
	return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
}

// SetNodeGroupSize changes the size of the node group by adding or removing nodes.
func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
	currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
	if err != nil {
		return err
	}
	switch delta := size - currSize; {
	case delta < 0:
		absDelta := -delta
		nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
		if err != nil {
			return err
		}
		if len(nodes) < absDelta {
			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
		}
		for i, node := range nodes {
			if i == absDelta {
				return nil
			}
			if err := kubemarkController.RemoveNodeFromNodeGroup(nodeGroup, node); err != nil {
				return err
			}
		}
	case delta > 0:
		kubemarkController.nodeGroupQueueSizeLock.Lock()
		for i := 0; i < delta; i++ {
			kubemarkController.nodeGroupQueueSize[nodeGroup]++
			kubemarkController.createNodeQueue <- nodeGroup
		}
		kubemarkController.nodeGroupQueueSizeLock.Unlock()
	}

	return nil
}

// GetNodeGroupForNode returns the name of the node group to which the node
// belongs.
func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		return "", fmt.Errorf("node %s does not exist", node)
	}
	nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
	if ok {
		return nodeGroup, nil
	}
	return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
}
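
// addNodeToNodeGroup creates a new hollow-node replication controller from the
// cached node template, labels it with the node group name, and submits it to
// the external cluster, retrying the create call up to numRetries times.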
func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
	node := kubemarkController.nodeTemplate.DeepCopy()
	node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63())
	node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name}
	node.Spec.Template.Labels = node.Labels

	var err error
	for i := 0; i < numRetries; i++ {
		_, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(node)
		if err == nil {
			return nil
		}
	}

	return err
}
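
// RemoveNodeFromNodeGroup deletes the replication controller backing the given
// hollow node, provided the node actually belongs to the given node group, and
// marks the node for later deletion from the kubemark cluster.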
func (kubemarkController *KubemarkController) RemoveNodeFromNodeGroup(nodeGroup string, node string) error {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		klog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
		return nil
	}
	if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
		return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
	}
	policy := metav1.DeletePropagationForeground
	var err error
	for i := 0; i < numRetries; i++ {
		err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
			pod.ObjectMeta.Labels["name"],
			&metav1.DeleteOptions{PropagationPolicy: &policy})
		if err == nil {
			klog.Infof("marking node %s for deletion", node)
			// Mark node for deletion from kubemark cluster.
			// Once it becomes unready after replication controller
			// deletion has been noticed, we will delete it explicitly.
			// This is to cover for the fact that kubemark does not
			// take care of this itself.
			kubemarkController.kubemarkCluster.markNodeForDeletion(node)
			return nil
		}
	}
	return fmt.Errorf("failed to delete node %s: %v", node, err)
}
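
// getReplicationControllerByName returns the replication controller with the
// given name from the external cluster's lister cache, or nil if it is not
// found or the lister returns an error.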
func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
	rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, rc := range rcs {
		if rc.ObjectMeta.Name == name {
			return rc
		}
	}
	return nil
}
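
// getPodByName returns the hollow-node pod with the given name from the
// external cluster's lister cache, or nil if it is not found or the lister
// returns an error.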
func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == name {
			return pod
		}
	}
	return nil
}
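
// getNodeNameForPod returns the value of the "name" label of the pod with the
// given name, which matches the name of the replication controller that owns it.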
func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return "", err
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == podName {
			return pod.Labels["name"], nil
		}
	}
	return "", fmt.Errorf("pod %s not found", podName)
}

// getNodeTemplate returns the template for hollow node replication controllers
// by looking for an existing hollow node specification. This requires at least
// one kubemark node to be present on startup.
func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) {
	podName, err := kubemarkController.kubemarkCluster.getHollowNodeName()
	if err != nil {
		return nil, err
	}
	hollowNodeName, err := kubemarkController.getNodeNameForPod(podName)
	if err != nil {
		return nil, err
	}
	if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil {
		nodeTemplate := &apiv1.ReplicationController{
			Spec: apiv1.ReplicationControllerSpec{
				Template: hollowNode.Spec.Template,
			},
		}

		nodeTemplate.Spec.Selector = nil
		nodeTemplate.Namespace = namespaceKubemark
		one := int32(1)
		nodeTemplate.Spec.Replicas = &one

		return nodeTemplate, nil
	}
	return nil, fmt.Errorf("can't get hollow node template")
}
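
// runNodeCreation is the worker loop that consumes node group names from
// createNodeQueue and creates one hollow node per queued entry until the stop
// channel is closed.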
func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
	for {
		select {
		case nodeGroup := <-kubemarkController.createNodeQueue:
			kubemarkController.nodeGroupQueueSizeLock.Lock()
			err := kubemarkController.addNodeToNodeGroup(nodeGroup)
			if err != nil {
				klog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
			} else {
				kubemarkController.nodeGroupQueueSize[nodeGroup]--
			}
			kubemarkController.nodeGroupQueueSizeLock.Unlock()
		case <-stop:
			return
		}
	}
}
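
// getHollowNodeName returns the name of an arbitrary hollow node present in the
// kubemark cluster, or an error if there are none.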
func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
	nodes, err := kubemarkCluster.nodeLister.List(labels.Everything())
	if err != nil {
		return "", err
	}
	for _, node := range nodes {
		return node.Name, nil
	}
	return "", fmt.Errorf("did not find any hollow nodes in the cluster")
}
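
// removeUnneededNodes is a node informer update handler. It deletes a node from
// the kubemark cluster once the node has been marked for deletion and its Ready
// condition is no longer true.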
func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) {
	node, ok := newObj.(*apiv1.Node)
	if !ok {
		return
	}
	for _, condition := range node.Status.Conditions {
		// Delete node if it is in unready state, and it has been
		// explicitly marked for deletion.
		if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue {
			kubemarkCluster.nodesToDeleteLock.Lock()
			defer kubemarkCluster.nodesToDeleteLock.Unlock()
			if kubemarkCluster.nodesToDelete[node.Name] {
				kubemarkCluster.nodesToDelete[node.Name] = false
				if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil {
					klog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
				}
			}
			return
		}
	}
}
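
// markNodeForDeletion records that the named node should be removed from the
// kubemark cluster as soon as it becomes unready.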
func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) {
	kubemarkCluster.nodesToDeleteLock.Lock()
	defer kubemarkCluster.nodesToDeleteLock.Unlock()
	kubemarkCluster.nodesToDelete[name] = true
}
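
// newReplicationControllerInformer returns a shared informer watching replication
// controllers in the kubemark namespace of the external cluster.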
func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil)
}
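
// newPodInformer returns a shared informer watching pods in the kubemark
// namespace of the external cluster.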
func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil)
}