horizontal.go
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podautoscaler

import (
	"context"
	"fmt"
	"math"
	"time"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	autoscalinginformers "k8s.io/client-go/informers/autoscaling/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	autoscalingclient "k8s.io/client-go/kubernetes/typed/autoscaling/v1"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	autoscalinglisters "k8s.io/client-go/listers/autoscaling/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/controller"
	metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
)
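
// scaleUpLimitFactor and scaleUpLimitMinimum bound how fast the controller may
// scale up when the HPA has no Behavior configured: a single reconcile proposes
// at most max(scaleUpLimitFactor*currentReplicas, scaleUpLimitMinimum) replicas
// (see calculateScaleUpLimit below).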
var (
	scaleUpLimitFactor  = 2.0
	scaleUpLimitMinimum = 4.0
)

type timestampedRecommendation struct {
	recommendation int32
	timestamp      time.Time
}

type timestampedScaleEvent struct {
	replicaChange int32 // positive for scaleUp, negative for scaleDown
	timestamp     time.Time
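	// outdated is set by markScaleEventsOutdated once the event is older than the
	// longest policy period; storeScaleEvent reuses outdated slots instead of
	// growing the slice.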
	outdated      bool
}
// HorizontalController is responsible for synchronizing the HPA objects stored
// in the system with the actual deployments/replication controllers they
// control.
type HorizontalController struct {
	scaleNamespacer scaleclient.ScalesGetter
	hpaNamespacer   autoscalingclient.HorizontalPodAutoscalersGetter
	mapper          apimeta.RESTMapper

	replicaCalc   *ReplicaCalculator
	eventRecorder record.EventRecorder

	downscaleStabilisationWindow time.Duration

	// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
	// NewHorizontalController.
	hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
	hpaListerSynced cache.InformerSynced

	// podLister is able to list/get Pods from the shared cache from the informer passed in to
	// NewHorizontalController.
	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

	// Latest unstabilized recommendations for each autoscaler.
	recommendations map[string][]timestampedRecommendation

	// Latest autoscaler events
	scaleUpEvents   map[string][]timestampedScaleEvent
	scaleDownEvents map[string][]timestampedScaleEvent
}
// NewHorizontalController creates a new HorizontalController.
func NewHorizontalController(
	evtNamespacer v1core.EventsGetter,
	scaleNamespacer scaleclient.ScalesGetter,
	hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
	mapper apimeta.RESTMapper,
	metricsClient metricsclient.MetricsClient,
	hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
	podInformer coreinformers.PodInformer,
	resyncPeriod time.Duration,
	downscaleStabilisationWindow time.Duration,
	tolerance float64,
	cpuInitializationPeriod,
	delayOfInitialReadinessStatus time.Duration,
) *HorizontalController {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})

	hpaController := &HorizontalController{
		eventRecorder:                recorder,
		scaleNamespacer:              scaleNamespacer,
		hpaNamespacer:                hpaNamespacer,
		downscaleStabilisationWindow: downscaleStabilisationWindow,
		queue:                        workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
		mapper:                       mapper,
		recommendations:              map[string][]timestampedRecommendation{},
		scaleUpEvents:                map[string][]timestampedScaleEvent{},
		scaleDownEvents:              map[string][]timestampedScaleEvent{},
	}

	hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    hpaController.enqueueHPA,
			UpdateFunc: hpaController.updateHPA,
			DeleteFunc: hpaController.deleteHPA,
		},
		resyncPeriod,
	)
	hpaController.hpaLister = hpaInformer.Lister()
	hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced

	hpaController.podLister = podInformer.Lister()
	hpaController.podListerSynced = podInformer.Informer().HasSynced

	replicaCalc := NewReplicaCalculator(
		metricsClient,
		hpaController.podLister,
		tolerance,
		cpuInitializationPeriod,
		delayOfInitialReadinessStatus,
	)
	hpaController.replicaCalc = replicaCalc

	return hpaController
}
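
// A minimal wiring sketch (illustrative only; kubeClient, scaleClient,
// restMapper, metricsClient, informerFactory and stopCh are assumed to be
// constructed by the caller, roughly as kube-controller-manager does):
//
//	hpaController := NewHorizontalController(
//		kubeClient.CoreV1(),
//		scaleClient,
//		kubeClient.AutoscalingV1(),
//		restMapper,
//		metricsClient,
//		informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
//		informerFactory.Core().V1().Pods(),
//		15*time.Second, // resyncPeriod
//		5*time.Minute,  // downscaleStabilisationWindow
//		0.1,            // tolerance
//		5*time.Minute,  // cpuInitializationPeriod
//		30*time.Second, // delayOfInitialReadinessStatus
//	)
//	go hpaController.Run(stopCh)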
// Run begins watching and syncing.
func (a *HorizontalController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer a.queue.ShutDown()

	klog.Infof("Starting HPA controller")
	defer klog.Infof("Shutting down HPA controller")

	if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
		return
	}

	// start a single worker (we may wish to start more in the future)
	go wait.Until(a.worker, time.Second, stopCh)

	<-stopCh
}

// obj could be a *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) updateHPA(old, cur interface{}) {
	a.enqueueHPA(cur)
}

// obj could be a *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) enqueueHPA(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}

	// Requests are always added to the queue with a resyncPeriod delay. If there's already a
	// request for the HPA in the queue then a new request is always dropped. Requests spend the
	// resync interval in the queue so HPAs are processed every resync interval.
	a.queue.AddRateLimited(key)
}

func (a *HorizontalController) deleteHPA(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}

	// TODO: could we leak if we fail to get the key?
	a.queue.Forget(key)
}
func (a *HorizontalController) worker() {
	for a.processNextWorkItem() {
	}
	klog.Infof("horizontal pod autoscaler controller worker shutting down")
}

func (a *HorizontalController) processNextWorkItem() bool {
	key, quit := a.queue.Get()
	if quit {
		return false
	}
	defer a.queue.Done(key)

	deleted, err := a.reconcileKey(key.(string))
	if err != nil {
		utilruntime.HandleError(err)
	}
	// Add the processed HPA back to the queue with a resyncPeriod delay.
	// Requests are always added to the queue with a resyncPeriod delay. If there's already a request
	// for the HPA in the queue then a new request is always dropped. Requests spend resyncPeriod
	// in the queue so HPAs are processed every resyncPeriod.
	// The request is added here just in case the last resync didn't insert a request into the queue. This
	// happens quite often because there is a race condition between adding requests after resyncPeriod
	// and removing them from the queue. A request can be added by resync before the previous request is
	// removed from the queue. If we didn't add the request here then in this case one request would be dropped
	// and the HPA would be processed after 2 x resyncPeriod.
	if !deleted {
		a.queue.AddRateLimited(key)
	}

	return true
}
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
	metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {

	if scale.Status.Selector == "" {
		errMsg := "selector is required"
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
		return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
	}

	selector, err := labels.Parse(scale.Status.Selector)
	if err != nil {
		errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
		return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
	}

	specReplicas := scale.Spec.Replicas
	statusReplicas := scale.Status.Replicas
	statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))

	invalidMetricsCount := 0
	var invalidMetricError error
	var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition

	for i, metricSpec := range metricSpecs {
		replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])

		if err != nil {
			if invalidMetricsCount <= 0 {
				invalidMetricCondition = condition
				invalidMetricError = err
			}
			invalidMetricsCount++
		}
		if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
			timestamp = timestampProposal
			replicas = replicaCountProposal
			metric = metricNameProposal
		}
	}

	// If all metrics are invalid, return an error and set the condition on the HPA based on the first invalid metric.
	if invalidMetricsCount >= len(metricSpecs) {
		setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
		return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
	}
	setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
	return replicas, metric, statuses, timestamp, nil
}
// Computes the desired number of replicas for a specific hpa and metric specification,
// returning the metric status and a proposed condition to be set on the HPA object.
func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
	specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
	timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {

	switch spec.Type {
	case autoscalingv2.ObjectMetricSourceType:
		metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
		if err != nil {
			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
		}
		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
		if err != nil {
			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
		}
	case autoscalingv2.PodsMetricSourceType:
		metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
		if err != nil {
			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
		}
		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
		if err != nil {
			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
		}
	case autoscalingv2.ResourceMetricSourceType:
		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(specReplicas, spec, hpa, selector, status)
		if err != nil {
			return 0, "", time.Time{}, condition, err
		}
	case autoscalingv2.ExternalMetricSourceType:
		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
		if err != nil {
			return 0, "", time.Time{}, condition, err
		}
	default:
		errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
		err = fmt.Errorf(errMsg)
		condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
		return 0, "", time.Time{}, condition, err
	}
	return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return true, err
	}

	hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
		delete(a.recommendations, key)
		delete(a.scaleUpEvents, key)
		delete(a.scaleDownEvents, key)
		return true, nil
	}

	return false, a.reconcileAutoscaler(hpa, key)
}
// computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType.
func (a *HorizontalController) computeStatusForObjectMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicas int32, timestamp time.Time, metricName string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
	if metricSpec.Object.Target.Type == autoscalingv2.ValueMetricType {
		replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetObjectMetricReplicas(specReplicas, metricSpec.Object.Target.Value.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, selector, metricSelector)
		if err != nil {
			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
			return 0, timestampProposal, "", condition, err
		}
		*status = autoscalingv2.MetricStatus{
			Type: autoscalingv2.ObjectMetricSourceType,
			Object: &autoscalingv2.ObjectMetricStatus{
				DescribedObject: metricSpec.Object.DescribedObject,
				Metric: autoscalingv2.MetricIdentifier{
					Name:     metricSpec.Object.Metric.Name,
					Selector: metricSpec.Object.Metric.Selector,
				},
				Current: autoscalingv2.MetricValueStatus{
					Value: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
				},
			},
		}
		return replicaCountProposal, timestampProposal, fmt.Sprintf("%s metric %s", metricSpec.Object.DescribedObject.Kind, metricSpec.Object.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
	} else if metricSpec.Object.Target.Type == autoscalingv2.AverageValueMetricType {
		replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, metricSpec.Object.Target.AverageValue.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector)
		if err != nil {
			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s object metric: %v", metricSpec.Object.Metric.Name, err)
		}
		*status = autoscalingv2.MetricStatus{
			Type: autoscalingv2.ObjectMetricSourceType,
			Object: &autoscalingv2.ObjectMetricStatus{
				Metric: autoscalingv2.MetricIdentifier{
					Name:     metricSpec.Object.Metric.Name,
					Selector: metricSpec.Object.Metric.Selector,
				},
				Current: autoscalingv2.MetricValueStatus{
					AverageValue: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
				},
			},
		}
		return replicaCountProposal, timestampProposal, fmt.Sprintf("object metric %s(%+v)", metricSpec.Object.Metric.Name, metricSpec.Object.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
	}
	errMsg := "invalid object metric source: neither a value target nor an average value target was set"
	err = fmt.Errorf(errMsg)
	condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
	return 0, time.Time{}, "", condition, err
}
// computeStatusForPodsMetric computes the desired number of replicas for the specified metric of type PodsMetricSourceType.
func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
	replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
	if err != nil {
		condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
		return 0, timestampProposal, "", condition, err
	}
	*status = autoscalingv2.MetricStatus{
		Type: autoscalingv2.PodsMetricSourceType,
		Pods: &autoscalingv2.PodsMetricStatus{
			Metric: autoscalingv2.MetricIdentifier{
				Name:     metricSpec.Pods.Metric.Name,
				Selector: metricSpec.Pods.Metric.Selector,
			},
			Current: autoscalingv2.MetricValueStatus{
				AverageValue: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
			},
		},
	}

	return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
// computeStatusForResourceMetric computes the desired number of replicas for the specified metric of type ResourceMetricSourceType.
func (a *HorizontalController) computeStatusForResourceMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
	if metricSpec.Resource.Target.AverageValue != nil {
		var rawProposal int64
		replicaCountProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetRawResourceReplicas(currentReplicas, metricSpec.Resource.Target.AverageValue.MilliValue(), metricSpec.Resource.Name, hpa.Namespace, selector)
		if err != nil {
			condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetResourceMetric", err)
			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s utilization: %v", metricSpec.Resource.Name, err)
		}
		metricNameProposal = fmt.Sprintf("%s resource", metricSpec.Resource.Name)
		*status = autoscalingv2.MetricStatus{
			Type: autoscalingv2.ResourceMetricSourceType,
			Resource: &autoscalingv2.ResourceMetricStatus{
				Name: metricSpec.Resource.Name,
				Current: autoscalingv2.MetricValueStatus{
					AverageValue: resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
				},
			},
		}
		return replicaCountProposal, timestampProposal, metricNameProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
	} else {
		if metricSpec.Resource.Target.AverageUtilization == nil {
			errMsg := "invalid resource metric source: neither a utilization target nor a value target was set"
			err = fmt.Errorf(errMsg)
			condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetResourceMetric", err)
			return 0, time.Time{}, "", condition, fmt.Errorf(errMsg)
		}
		targetUtilization := *metricSpec.Resource.Target.AverageUtilization
		var percentageProposal int32
		var rawProposal int64
		replicaCountProposal, percentageProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, metricSpec.Resource.Name, hpa.Namespace, selector)
		if err != nil {
			condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetResourceMetric", err)
			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s utilization: %v", metricSpec.Resource.Name, err)
		}
		metricNameProposal = fmt.Sprintf("%s resource utilization (percentage of request)", metricSpec.Resource.Name)
		*status = autoscalingv2.MetricStatus{
			Type: autoscalingv2.ResourceMetricSourceType,
			Resource: &autoscalingv2.ResourceMetricStatus{
				Name: metricSpec.Resource.Name,
				Current: autoscalingv2.MetricValueStatus{
					AverageUtilization: &percentageProposal,
					AverageValue:       resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
				},
			},
		}
		return replicaCountProposal, timestampProposal, metricNameProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
	}
}
// computeStatusForExternalMetric computes the desired number of replicas for the specified metric of type ExternalMetricSourceType.
func (a *HorizontalController) computeStatusForExternalMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
	if metricSpec.External.Target.AverageValue != nil {
		replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetExternalPerPodMetricReplicas(statusReplicas, metricSpec.External.Target.AverageValue.MilliValue(), metricSpec.External.Metric.Name, hpa.Namespace, metricSpec.External.Metric.Selector)
		if err != nil {
			condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s external metric: %v", metricSpec.External.Metric.Name, err)
		}
		*status = autoscalingv2.MetricStatus{
			Type: autoscalingv2.ExternalMetricSourceType,
			External: &autoscalingv2.ExternalMetricStatus{
				Metric: autoscalingv2.MetricIdentifier{
					Name:     metricSpec.External.Metric.Name,
					Selector: metricSpec.External.Metric.Selector,
				},
				Current: autoscalingv2.MetricValueStatus{
					AverageValue: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
				},
			},
		}
		return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.External.Metric.Name, metricSpec.External.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
	}
	if metricSpec.External.Target.Value != nil {
		replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetExternalMetricReplicas(specReplicas, metricSpec.External.Target.Value.MilliValue(), metricSpec.External.Metric.Name, hpa.Namespace, metricSpec.External.Metric.Selector, selector)
		if err != nil {
			condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get external metric %s: %v", metricSpec.External.Metric.Name, err)
		}
		*status = autoscalingv2.MetricStatus{
			Type: autoscalingv2.ExternalMetricSourceType,
			External: &autoscalingv2.ExternalMetricStatus{
				Metric: autoscalingv2.MetricIdentifier{
					Name:     metricSpec.External.Metric.Name,
					Selector: metricSpec.External.Metric.Selector,
				},
				Current: autoscalingv2.MetricValueStatus{
					Value: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
				},
			},
		}
		return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.External.Metric.Name, metricSpec.External.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
	}
	errMsg := "invalid external metric source: neither a value target nor an average value target was set"
	err = fmt.Errorf(errMsg)
	condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
	return 0, time.Time{}, "", condition, fmt.Errorf(errMsg)
}
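
// recordInitialRecommendation seeds the stabilization history for an HPA with
// its current replica count, so the first reconcile has a baseline sample.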
func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32, key string) {
	if a.recommendations[key] == nil {
		a.recommendations[key] = []timestampedRecommendation{{currentReplicas, time.Now()}}
	}
}
func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
	// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
	hpav1 := hpav1Shared.DeepCopy()
	// then, convert to autoscaling/v2, which makes our lives easier when calculating metrics
	hpaRaw, err := unsafeConvertToVersionVia(hpav1, autoscalingv2.SchemeGroupVersion)
	if err != nil {
		a.eventRecorder.Event(hpav1, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
		return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
	}
	hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
	hpaStatusOriginal := hpa.Status.DeepCopy()

	reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

	targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
	if err != nil {
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
		a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
		return fmt.Errorf("invalid API version in scale target reference: %v", err)
	}

	targetGK := schema.GroupKind{
		Group: targetGV.Group,
		Kind:  hpa.Spec.ScaleTargetRef.Kind,
	}

	mappings, err := a.mapper.RESTMappings(targetGK)
	if err != nil {
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
		a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
		return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
	}

	scale, targetGR, err := a.scaleForResourceMappings(hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
	if err != nil {
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
		a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
		return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
	}
	setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
	currentReplicas := scale.Spec.Replicas
	a.recordInitialRecommendation(currentReplicas, key)

	var (
		metricStatuses        []autoscalingv2.MetricStatus
		metricDesiredReplicas int32
		metricName            string
	)

	desiredReplicas := int32(0)
	rescaleReason := ""

	var minReplicas int32

	if hpa.Spec.MinReplicas != nil {
		minReplicas = *hpa.Spec.MinReplicas
	} else {
		// Default value
		minReplicas = 1
	}

	rescale := true

	if scale.Spec.Replicas == 0 && minReplicas != 0 {
		// Autoscaling is disabled for this resource
		desiredReplicas = 0
		rescale = false
		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
	} else if currentReplicas > hpa.Spec.MaxReplicas {
		rescaleReason = "Current number of replicas above Spec.MaxReplicas"
		desiredReplicas = hpa.Spec.MaxReplicas
	} else if currentReplicas < minReplicas {
		rescaleReason = "Current number of replicas below Spec.MinReplicas"
		desiredReplicas = minReplicas
	} else {
		var metricTimestamp time.Time
		metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
		if err != nil {
			a.setCurrentReplicasInStatus(hpa, currentReplicas)
			if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
				utilruntime.HandleError(err)
			}
			a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
			return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
		}

		klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

		rescaleMetric := ""
		if metricDesiredReplicas > desiredReplicas {
			desiredReplicas = metricDesiredReplicas
			rescaleMetric = metricName
		}
		if desiredReplicas > currentReplicas {
			rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
		}
		if desiredReplicas < currentReplicas {
			rescaleReason = "All metrics below target"
		}
		if hpa.Spec.Behavior == nil {
			desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
		} else {
			desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
		}
		rescale = desiredReplicas != currentReplicas
	}

	if rescale {
		scale.Spec.Replicas = desiredReplicas
		_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(targetGR, scale)
		if err != nil {
			a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
			setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
			a.setCurrentReplicasInStatus(hpa, currentReplicas)
			if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
				utilruntime.HandleError(err)
			}
			return fmt.Errorf("failed to rescale %s: %v", reference, err)
		}
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
		a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
		a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
		klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
			hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
	} else {
		klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
		desiredReplicas = currentReplicas
	}

	a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
	return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
}
// stabilizeRecommendation:
// - replaces old recommendation with the newest recommendation,
// - returns max of recommendations that are not older than downscaleStabilisationWindow.
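// For example, with a 5-minute window and stored samples of 10 (6m old),
// 7 (3m old) and 5 (1m old), a new recommendation of 4 replaces the 6m-old
// sample and the function returns max(7, 5, 4) = 7.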
func (a *HorizontalController) stabilizeRecommendation(key string, prenormalizedDesiredReplicas int32) int32 {
	maxRecommendation := prenormalizedDesiredReplicas
	foundOldSample := false
	oldSampleIndex := 0
	cutoff := time.Now().Add(-a.downscaleStabilisationWindow)

	for i, rec := range a.recommendations[key] {
		if rec.timestamp.Before(cutoff) {
			foundOldSample = true
			oldSampleIndex = i
		} else if rec.recommendation > maxRecommendation {
			maxRecommendation = rec.recommendation
		}
	}
	if foundOldSample {
		a.recommendations[key][oldSampleIndex] = timestampedRecommendation{prenormalizedDesiredReplicas, time.Now()}
	} else {
		a.recommendations[key] = append(a.recommendations[key], timestampedRecommendation{prenormalizedDesiredReplicas, time.Now()})
	}
	return maxRecommendation
}
// normalizeDesiredReplicas takes the metrics desired replicas value and normalizes it based on the appropriate conditions (i.e. < maxReplicas, >
// minReplicas, etc...)
func (a *HorizontalController) normalizeDesiredReplicas(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas int32, prenormalizedDesiredReplicas int32, minReplicas int32) int32 {
	stabilizedRecommendation := a.stabilizeRecommendation(key, prenormalizedDesiredReplicas)
	if stabilizedRecommendation != prenormalizedDesiredReplicas {
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ScaleDownStabilized", "recent recommendations were higher than current one, applying the highest recent recommendation")
	} else {
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
	}

	desiredReplicas, condition, reason := convertDesiredReplicasWithRules(currentReplicas, stabilizedRecommendation, minReplicas, hpa.Spec.MaxReplicas)

	if desiredReplicas == stabilizedRecommendation {
		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, condition, reason)
	} else {
		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, condition, reason)
	}

	return desiredReplicas
}
// NormalizationArg is used to pass all needed information between functions as one structure
type NormalizationArg struct {
	Key               string
	ScaleUpBehavior   *autoscalingv2.HPAScalingRules
	ScaleDownBehavior *autoscalingv2.HPAScalingRules
	MinReplicas       int32
	MaxReplicas       int32
	CurrentReplicas   int32
	DesiredReplicas   int32
}
// normalizeDesiredReplicasWithBehaviors takes the metrics desired replicas value and normalizes it:
// 1. Apply the basic conditions (i.e. < maxReplicas, > minReplicas, etc...)
// 2. Apply the scale up/down limits from the hpaSpec.Behaviors (i.e. add no more than 4 pods)
// 3. Apply the constraints period (i.e. add no more than 4 pods per minute)
// 4. Apply the stabilization (i.e. pick the smallest recommendation seen during the stabilization window, e.g. the last 5 minutes)
func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 {
	a.maybeInitScaleDownStabilizationWindow(hpa)
	normalizationArg := NormalizationArg{
		Key:               key,
		ScaleUpBehavior:   hpa.Spec.Behavior.ScaleUp,
		ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown,
		MinReplicas:       minReplicas,
		MaxReplicas:       hpa.Spec.MaxReplicas,
		CurrentReplicas:   currentReplicas,
		DesiredReplicas:   prenormalizedDesiredReplicas}
	stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg)
	normalizationArg.DesiredReplicas = stabilizedRecommendation
	if stabilizedRecommendation != prenormalizedDesiredReplicas {
		// "ScaleUpStabilized" || "ScaleDownStabilized"
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, reason, message)
	} else {
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
	}
	desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg)
	if desiredReplicas == stabilizedRecommendation {
		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, reason, message)
	} else {
		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, reason, message)
	}
	return desiredReplicas
}
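
// maybeInitScaleDownStabilizationWindow defaults the scale-down stabilization
// window to the controller-wide downscaleStabilisationWindow when the HPA
// specifies scale-down behavior without an explicit window.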
func (a *HorizontalController) maybeInitScaleDownStabilizationWindow(hpa *autoscalingv2.HorizontalPodAutoscaler) {
	behavior := hpa.Spec.Behavior
	if behavior != nil && behavior.ScaleDown != nil && behavior.ScaleDown.StabilizationWindowSeconds == nil {
		stabilizationWindowSeconds := (int32)(a.downscaleStabilisationWindow.Seconds())
		hpa.Spec.Behavior.ScaleDown.StabilizationWindowSeconds = &stabilizationWindowSeconds
	}
}
// getReplicasChangePerPeriod returns the sum of the replica changes from the scale events that occurred within the given period.
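// For example, with events {+2 at t-30s, +3 at t-90s} and periodSeconds=60,
// only the +2 event falls inside the period, so the function returns 2.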
func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 {
	period := time.Second * time.Duration(periodSeconds)
	cutoff := time.Now().Add(-period)
	var replicas int32
	for _, rec := range scaleEvents {
		if rec.timestamp.After(cutoff) {
			replicas += rec.replicaChange
		}
	}
	return replicas
}
func (a *HorizontalController) getUnableComputeReplicaCountCondition(hpa *autoscalingv2.HorizontalPodAutoscaler, reason string, err error) (condition autoscalingv2.HorizontalPodAutoscalerCondition) {
	a.eventRecorder.Event(hpa, v1.EventTypeWarning, reason, err.Error())
	return autoscalingv2.HorizontalPodAutoscalerCondition{
		Type:    autoscalingv2.ScalingActive,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: fmt.Sprintf("the HPA was unable to compute the replica count: %v", err),
	}
}
// storeScaleEvent stores a scale event, either appending it or replacing an outdated one.
// Events eligible for replacement were marked as outdated by the `markScaleEventsOutdated` function.
func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.HorizontalPodAutoscalerBehavior, key string, prevReplicas, newReplicas int32) {
	if behavior == nil {
		return // we should not store any event as they will not be used
	}
	var oldSampleIndex int
	var longestPolicyPeriod int32
	foundOldSample := false
	if newReplicas > prevReplicas {
		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleUp)
		markScaleEventsOutdated(a.scaleUpEvents[key], longestPolicyPeriod)
		replicaChange := newReplicas - prevReplicas
		for i, event := range a.scaleUpEvents[key] {
			if event.outdated {
				foundOldSample = true
				oldSampleIndex = i
			}
		}
		newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
		if foundOldSample {
			a.scaleUpEvents[key][oldSampleIndex] = newEvent
		} else {
			a.scaleUpEvents[key] = append(a.scaleUpEvents[key], newEvent)
		}
	} else {
		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleDown)
		markScaleEventsOutdated(a.scaleDownEvents[key], longestPolicyPeriod)
		replicaChange := prevReplicas - newReplicas
		for i, event := range a.scaleDownEvents[key] {
			if event.outdated {
				foundOldSample = true
				oldSampleIndex = i
			}
		}
		newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
		if foundOldSample {
			a.scaleDownEvents[key][oldSampleIndex] = newEvent
		} else {
			a.scaleDownEvents[key] = append(a.scaleDownEvents[key], newEvent)
		}
	}
}
// stabilizeRecommendationWithBehaviors:
// - replaces old recommendation with the newest recommendation,
// - returns the {min,max} of recommendations that are not older than the Scale{Up,Down} stabilization window.
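// Samples are only evicted once they are older than the longer of the two
// stabilization windows, since a sample outside the scale-up window may still
// be needed for scale-down stabilization (and vice versa).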
func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) {
	recommendation := args.DesiredReplicas
	foundOldSample := false
	oldSampleIndex := 0
	var scaleDelaySeconds int32
	var reason, message string

	var betterRecommendation func(int32, int32) int32

	if args.DesiredReplicas >= args.CurrentReplicas {
		scaleDelaySeconds = *args.ScaleUpBehavior.StabilizationWindowSeconds
		betterRecommendation = min
		reason = "ScaleUpStabilized"
		message = "recent recommendations were lower than current one, applying the lowest recent recommendation"
	} else {
		scaleDelaySeconds = *args.ScaleDownBehavior.StabilizationWindowSeconds
		betterRecommendation = max
		reason = "ScaleDownStabilized"
		message = "recent recommendations were higher than current one, applying the highest recent recommendation"
	}
	maxDelaySeconds := max(*args.ScaleUpBehavior.StabilizationWindowSeconds, *args.ScaleDownBehavior.StabilizationWindowSeconds)
	obsoleteCutoff := time.Now().Add(-time.Second * time.Duration(maxDelaySeconds))
	cutoff := time.Now().Add(-time.Second * time.Duration(scaleDelaySeconds))

	for i, rec := range a.recommendations[args.Key] {
		if rec.timestamp.After(cutoff) {
			recommendation = betterRecommendation(rec.recommendation, recommendation)
		}
		if rec.timestamp.Before(obsoleteCutoff) {
			foundOldSample = true
			oldSampleIndex = i
		}
	}
	if foundOldSample {
		a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()}
	} else {
		a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()})
	}
	return recommendation, reason, message
}
// convertDesiredReplicasWithBehaviorRate performs the actual normalization, given the constraint rate.
// It doesn't consider the stabilization window, which is handled separately.
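// For example, scaling up from 4 replicas with MaxReplicas=10 and a computed
// scale-up limit of 8 caps a desired count of 12 at 8 ("ScaleUpLimit").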
func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) {
	var possibleLimitingReason, possibleLimitingMessage string

	if args.DesiredReplicas > args.CurrentReplicas {
		scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], args.ScaleUpBehavior)
		if scaleUpLimit < args.CurrentReplicas {
			// We shouldn't scale up further until the scaleUpEvents are cleaned up
			scaleUpLimit = args.CurrentReplicas
		}
		maximumAllowedReplicas := args.MaxReplicas
		if maximumAllowedReplicas > scaleUpLimit {
			maximumAllowedReplicas = scaleUpLimit
			possibleLimitingReason = "ScaleUpLimit"
			possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate"
		} else {
			possibleLimitingReason = "TooManyReplicas"
			possibleLimitingMessage = "the desired replica count is more than the maximum replica count"
		}
		if args.DesiredReplicas > maximumAllowedReplicas {
			return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
		}
	} else if args.DesiredReplicas < args.CurrentReplicas {
		scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
		if scaleDownLimit > args.CurrentReplicas {
			// We shouldn't scale down further until the scaleDownEvents are cleaned up
			scaleDownLimit = args.CurrentReplicas
		}
		minimumAllowedReplicas := args.MinReplicas
		if minimumAllowedReplicas < scaleDownLimit {
			minimumAllowedReplicas = scaleDownLimit
			possibleLimitingReason = "ScaleDownLimit"
			possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate"
		} else {
			possibleLimitingMessage = "the desired replica count is less than the minimum replica count"
			possibleLimitingReason = "TooFewReplicas"
		}
		if args.DesiredReplicas < minimumAllowedReplicas {
			return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
		}
	}
	return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
}
// convertDesiredReplicasWithRules performs the actual normalization, without depending on `HorizontalController` or `HorizontalPodAutoscaler`
func convertDesiredReplicasWithRules(currentReplicas, desiredReplicas, hpaMinReplicas, hpaMaxReplicas int32) (int32, string, string) {
	var minimumAllowedReplicas int32
	var maximumAllowedReplicas int32

	var possibleLimitingCondition string
	var possibleLimitingReason string

	minimumAllowedReplicas = hpaMinReplicas

	// Do not scale up too much, to prevent an incorrect rapid increase in the number of master replicas caused by
	// bogus CPU usage reports from heapster/kubelet (like in issue #32304).
	scaleUpLimit := calculateScaleUpLimit(currentReplicas)

	if hpaMaxReplicas > scaleUpLimit {
		maximumAllowedReplicas = scaleUpLimit
		possibleLimitingCondition = "ScaleUpLimit"
		possibleLimitingReason = "the desired replica count is increasing faster than the maximum scale rate"
	} else {
		maximumAllowedReplicas = hpaMaxReplicas
		possibleLimitingCondition = "TooManyReplicas"
		possibleLimitingReason = "the desired replica count is more than the maximum replica count"
	}

	if desiredReplicas < minimumAllowedReplicas {
		possibleLimitingCondition = "TooFewReplicas"
		possibleLimitingReason = "the desired replica count is less than the minimum replica count"

		return minimumAllowedReplicas, possibleLimitingCondition, possibleLimitingReason
	} else if desiredReplicas > maximumAllowedReplicas {
		return maximumAllowedReplicas, possibleLimitingCondition, possibleLimitingReason
	}

	return desiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
}
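
// calculateScaleUpLimit returns the default per-reconcile upper bound used by
// convertDesiredReplicasWithRules: max(2*currentReplicas, 4), so e.g. one
// replica may grow to at most 4 and ten replicas to at most 20 in one step.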
func calculateScaleUpLimit(currentReplicas int32) int32 {
	return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
}
// markScaleEventsOutdated sets the 'outdated=true' flag on all scale events that are older than the longest policy period
func markScaleEventsOutdated(scaleEvents []timestampedScaleEvent, longestPolicyPeriod int32) {
	period := time.Second * time.Duration(longestPolicyPeriod)
	cutoff := time.Now().Add(-period)
	for i, event := range scaleEvents {
		if event.timestamp.Before(cutoff) {
			// outdated scale events are marked for later reuse
			scaleEvents[i].outdated = true
		}
	}
}
func getLongestPolicyPeriod(scalingRules *autoscalingv2.HPAScalingRules) int32 {
	var longestPolicyPeriod int32
	for _, policy := range scalingRules.Policies {
		if policy.PeriodSeconds > longestPolicyPeriod {
			longestPolicyPeriod = policy.PeriodSeconds
		}
	}
	return longestPolicyPeriod
}
// calculateScaleUpLimitWithScalingRules returns the maximum number of pods that could be added for the given HPAScalingRules
func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
	var result int32 = 0
	var proposed int32
	var selectPolicyFn func(int32, int32) int32
	if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
		return currentReplicas // Scaling is disabled
	} else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
		selectPolicyFn = min // For scaling up, the lowest change ('min' policy) produces a minimum value
	} else {
		selectPolicyFn = max // Use the default policy otherwise to produce the highest possible change
	}
	for _, policy := range scalingRules.Policies {
		replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
		periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod
		if policy.Type == autoscalingv2.PodsScalingPolicy {
			proposed = int32(periodStartReplicas + policy.Value)
		} else if policy.Type == autoscalingv2.PercentScalingPolicy {
			// the proposal has to be rounded up because otherwise the proposed change might not increase the replica count, causing the target to never scale up
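			// e.g. 10 replicas at period start with a 50 percent policy propose ceil(10 * 1.5) = 15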
			proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100)))
		}
		result = selectPolicyFn(result, proposed)
	}
	return result
}
// calculateScaleDownLimitWithBehaviors returns the maximum number of pods that could be deleted for the given HPAScalingRules
func calculateScaleDownLimitWithBehaviors(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
	var result int32 = math.MaxInt32
	var proposed int32
	var selectPolicyFn func(int32, int32) int32
	if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
		return currentReplicas // Scaling is disabled
	} else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
		selectPolicyFn = max // For scaling down, the lowest change ('min' policy) produces a maximum value
	} else {
		selectPolicyFn = min // Use the default policy otherwise to produce the highest possible change
	}
	for _, policy := range scalingRules.Policies {
		replicasDeletedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
		periodStartReplicas := currentReplicas + replicasDeletedInCurrentPeriod
		if policy.Type == autoscalingv2.PodsScalingPolicy {
			proposed = periodStartReplicas - policy.Value
		} else if policy.Type == autoscalingv2.PercentScalingPolicy {
  930. proposed = int32(float64(periodStartReplicas) * (1 - float64(policy.Value)/100))
  931. }
  932. result = selectPolicyFn(result, proposed)
  933. }
  934. return result
  935. }
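
// Illustrative walk-through (assumed values): with currentReplicas=20, no recent
// scale events, a Pods policy with Value=4 and a Percent policy with Value=50,
// the Pods policy proposes 20-4=16 and the Percent policy proposes 20*0.5=10;
// under the default (Max change) select policy the floor is 10, under the Min
// select policy it is 16.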

// scaleForResourceMappings attempts to fetch the scale for the
// resource with the given name and namespace, trying each RESTMapping
// in turn until a working one is found. If none work, the first error
// is returned. It returns both the scale and the group-resource from
// the working mapping.
func (a *HorizontalController) scaleForResourceMappings(namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) {
	var firstErr error
	for i, mapping := range mappings {
		targetGR := mapping.Resource.GroupResource()
		scale, err := a.scaleNamespacer.Scales(namespace).Get(targetGR, name)
		if err == nil {
			return scale, targetGR, nil
		}
		// if this is the first error, remember it,
		// then go on and try other mappings until we find a good one
		if i == 0 {
			firstErr = err
		}
	}
	// make sure we handle an empty set of mappings
	if firstErr == nil {
		firstErr = fmt.Errorf("unrecognized resource")
	}
	return nil, schema.GroupResource{}, firstErr
}
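
// Illustrative scenario (assumed resources): a scaleTargetRef naming kind
// "Deployment" may resolve to more than one RESTMapping (for example apps/v1
// plus a deprecated group version); the loop above settles on the first mapping
// whose /scale subresource actually responds.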

// setCurrentReplicasInStatus sets the current replica count in the status of the HPA.
func (a *HorizontalController) setCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
	a.setStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
}

// setStatus recreates the status of the given HPA, updating the current and
// desired replicas, as well as the metric statuses
func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) {
	hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
		CurrentReplicas: currentReplicas,
		DesiredReplicas: desiredReplicas,
		LastScaleTime:   hpa.Status.LastScaleTime,
		CurrentMetrics:  metricStatuses,
		Conditions:      hpa.Status.Conditions,
	}
	if rescale {
		now := metav1.NewTime(time.Now())
		hpa.Status.LastScaleTime = &now
	}
}

// updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status
func (a *HorizontalController) updateStatusIfNeeded(oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
	// skip a write if we wouldn't need to update
	if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
		return nil
	}
	return a.updateStatus(newHPA)
}

// updateStatus actually does the update request for the status of the given HPA
func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
	// convert back to autoscalingv1
	hpaRaw, err := unsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion)
	if err != nil {
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
		return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv1.SchemeGroupVersion.String(), err)
	}
	hpav1 := hpaRaw.(*autoscalingv1.HorizontalPodAutoscaler)
	_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpav1.Namespace).UpdateStatus(context.TODO(), hpav1, metav1.UpdateOptions{})
	if err != nil {
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
		return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
	}
	klog.V(2).Infof("Successfully updated status for %s", hpa.Name)
	return nil
}

// unsafeConvertToVersionVia is like Scheme.UnsafeConvertToVersion, but it does so via an internal version first.
// We use it since working with the v2 types is convenient here, but we want to use the v1 client (and
// can't just use the internal version). Note that conversion mutates the object, so you need to deepcopy
// *before* you call this if the input object came out of a shared cache.
func unsafeConvertToVersionVia(obj runtime.Object, externalVersion schema.GroupVersion) (runtime.Object, error) {
	objInt, err := legacyscheme.Scheme.UnsafeConvertToVersion(obj, schema.GroupVersion{Group: externalVersion.Group, Version: runtime.APIVersionInternal})
	if err != nil {
		return nil, fmt.Errorf("failed to convert the given object to the internal version: %v", err)
	}
	objExt, err := legacyscheme.Scheme.UnsafeConvertToVersion(objInt, externalVersion)
	if err != nil {
		return nil, fmt.Errorf("failed to convert the given object back to the external version: %v", err)
	}
	return objExt, nil
}
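
// Illustrative hop sequence (restating the comment above; variable names are
// hypothetical): the scheme typically registers conversions between each
// external version and the internal type, not between external versions
// directly, so converting an autoscaling/v2 object for the v1 client goes
// v2 -> internal -> v1:
//
//	raw, err := unsafeConvertToVersionVia(hpaV2, autoscalingv1.SchemeGroupVersion)
//	if err == nil {
//		hpaV1 := raw.(*autoscalingv1.HorizontalPodAutoscaler)
//		_ = hpaV1
//	}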

// setCondition sets the specific condition type on the given HPA to the specified value with the given reason
// and message. The message and args are treated like a format string. The condition will be added if it is
// not present.
func setCondition(hpa *autoscalingv2.HorizontalPodAutoscaler, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType, status v1.ConditionStatus, reason, message string, args ...interface{}) {
	hpa.Status.Conditions = setConditionInList(hpa.Status.Conditions, conditionType, status, reason, message, args...)
}

// setConditionInList sets the specific condition type on the given HPA to the specified value with the given
// reason and message. The message and args are treated like a format string. The condition will be added if
// it is not present. The new list will be returned.
func setConditionInList(inputList []autoscalingv2.HorizontalPodAutoscalerCondition, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType, status v1.ConditionStatus, reason, message string, args ...interface{}) []autoscalingv2.HorizontalPodAutoscalerCondition {
	resList := inputList
	var existingCond *autoscalingv2.HorizontalPodAutoscalerCondition
	for i, condition := range resList {
		if condition.Type == conditionType {
			// can't take a pointer to an iteration variable
			existingCond = &resList[i]
			break
		}
	}
	if existingCond == nil {
		resList = append(resList, autoscalingv2.HorizontalPodAutoscalerCondition{
			Type: conditionType,
		})
		existingCond = &resList[len(resList)-1]
	}
	if existingCond.Status != status {
		existingCond.LastTransitionTime = metav1.Now()
	}
	existingCond.Status = status
	existingCond.Reason = reason
	existingCond.Message = fmt.Sprintf(message, args...)
	return resList
}
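
// Illustrative sequence (assumed condition values): calling setConditionInList
// twice with (AbleToScale, True) updates Reason and Message but leaves
// LastTransitionTime untouched, since the status did not change; a later call
// with (AbleToScale, False) flips the status and stamps a new LastTransitionTime.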

func max(a, b int32) int32 {
	if a >= b {
		return a
	}
	return b
}

func min(a, b int32) int32 {
	if a <= b {
		return a
	}
	return b
}