/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podautoscaler

import (
	"fmt"
	"math"
	"time"

	autoscaling "k8s.io/api/autoscaling/v2beta2"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
	corelisters "k8s.io/client-go/listers/core/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
)

const (
	// defaultTestingTolerance is the default tolerance used in tests when
	// deciding whether to scale up or scale down.
	defaultTestingTolerance                     = 0.1
	defaultTestingCpuInitializationPeriod       = 2 * time.Minute
	defaultTestingDelayOfInitialReadinessStatus = 10 * time.Second
)
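
// ReplicaCalculator bundles the metrics client, pod lister, and tuning
// parameters needed to compute a desired replica count for a horizontal
// pod autoscaler target.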
type ReplicaCalculator struct {
	metricsClient                 metricsclient.MetricsClient
	podLister                     corelisters.PodLister
	tolerance                     float64
	cpuInitializationPeriod       time.Duration
	delayOfInitialReadinessStatus time.Duration
}
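
// NewReplicaCalculator creates a ReplicaCalculator from the given metrics
// client, pod lister, and scaling parameters.
//
// A minimal usage sketch (metricsClient, podLister, and selector here stand
// for already-initialized values; they are not names defined in this file):
//
//	calc := NewReplicaCalculator(metricsClient, podLister, defaultTestingTolerance,
//		defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
//	replicas, utilization, raw, ts, err := calc.GetResourceReplicas(3, 80, v1.ResourceCPU, "default", selector)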
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podLister corelisters.PodLister, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator {
	return &ReplicaCalculator{
		metricsClient:                 metricsClient,
		podLister:                     podLister,
		tolerance:                     tolerance,
		cpuInitializationPeriod:       cpuInitializationPeriod,
		delayOfInitialReadinessStatus: delayOfInitialReadinessStatus,
	}
}

// GetResourceReplicas calculates the desired replica count based on a target resource utilization percentage
// of the given resource for pods matching the given selector in the given namespace, and the current replica count.
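// Pods that are unready or are missing metrics are handled conservatively: on a
// presumed scale-up they are treated as consuming nothing, and on a presumed
// scale-down missing pods are treated as consuming their full request, so that
// incomplete data does not amplify the scaling decision.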
func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUtilization int32, resource v1.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int32, rawUtilization int64, timestamp time.Time, err error) {
	metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector)
	if err != nil {
		return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
	}

	podList, err := c.podLister.Pods(namespace).List(selector)
	if err != nil {
		return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
	}
	if len(podList) == 0 {
		return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
	}

	readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
	removeMetricsForPods(metrics, ignoredPods)
	requests, err := calculatePodRequests(podList, resource)
	if err != nil {
		return 0, 0, 0, time.Time{}, err
	}

	if len(metrics) == 0 {
		return 0, 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods")
	}

	usageRatio, utilization, rawUtilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
	if err != nil {
		return 0, 0, 0, time.Time{}, err
	}

	rebalanceIgnored := len(ignoredPods) > 0 && usageRatio > 1.0
	if !rebalanceIgnored && len(missingPods) == 0 {
		if math.Abs(1.0-usageRatio) <= c.tolerance {
			// return the current replicas if the change would be too small
			return currentReplicas, utilization, rawUtilization, timestamp, nil
		}

		// if we don't have any unready or missing pods, we can calculate the new replica count now
		return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, rawUtilization, timestamp, nil
	}

	if len(missingPods) > 0 {
		if usageRatio < 1.0 {
			// on a scale-down, treat missing pods as using 100% of the resource request
			for podName := range missingPods {
				metrics[podName] = metricsclient.PodMetric{Value: requests[podName]}
			}
		} else if usageRatio > 1.0 {
			// on a scale-up, treat missing pods as using 0% of the resource request
			for podName := range missingPods {
				metrics[podName] = metricsclient.PodMetric{Value: 0}
			}
		}
	}

	if rebalanceIgnored {
		// on a scale-up, treat unready pods as using 0% of the resource request
		for podName := range ignoredPods {
			metrics[podName] = metricsclient.PodMetric{Value: 0}
		}
	}

	// re-run the utilization calculation with our new numbers
	newUsageRatio, _, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
	if err != nil {
		return 0, utilization, rawUtilization, time.Time{}, err
	}

	if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
		// return the current replicas if the change would be too small,
		// or if the new usage ratio would cause a change in scale direction
		return currentReplicas, utilization, rawUtilization, timestamp, nil
	}

	// return the result, where the number of replicas considered is
	// however many replicas factored into our calculation
	return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, rawUtilization, timestamp, nil
}

// GetRawResourceReplicas calculates the desired replica count based on a target resource utilization (as a raw milli-value)
// for pods matching the given selector in the given namespace, and the current replica count.
func (c *ReplicaCalculator) GetRawResourceReplicas(currentReplicas int32, targetUtilization int64, resource v1.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
	metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector)
	if err != nil {
		return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
	}

	replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, resource)
	return replicaCount, utilization, timestamp, err
}

// GetMetricReplicas calculates the desired replica count based on a target metric utilization
// (as a milli-value) for pods matching the given selector in the given namespace, and the
// current replica count.
func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
	metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
	if err != nil {
		return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
	}

	replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName(""))
	return replicaCount, utilization, timestamp, err
}

// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics.
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
	podList, err := c.podLister.Pods(namespace).List(selector)
	if err != nil {
		return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
	}
	if len(podList) == 0 {
		return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
	}

	readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
	removeMetricsForPods(metrics, ignoredPods)

	if len(metrics) == 0 {
		return 0, 0, fmt.Errorf("did not receive metrics for any ready pods")
	}

	usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)

	rebalanceIgnored := len(ignoredPods) > 0 && usageRatio > 1.0
	if !rebalanceIgnored && len(missingPods) == 0 {
		if math.Abs(1.0-usageRatio) <= c.tolerance {
			// return the current replicas if the change would be too small
			return currentReplicas, utilization, nil
		}

		// if we don't have any unready or missing pods, we can calculate the new replica count now
		return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, nil
	}

	if len(missingPods) > 0 {
		if usageRatio < 1.0 {
			// on a scale-down, treat missing pods as consuming exactly the target amount
			for podName := range missingPods {
				metrics[podName] = metricsclient.PodMetric{Value: targetUtilization}
			}
		} else {
			// on a scale-up, treat missing pods as consuming none of the target amount
			for podName := range missingPods {
				metrics[podName] = metricsclient.PodMetric{Value: 0}
			}
		}
	}

	if rebalanceIgnored {
		// on a scale-up, treat unready pods as consuming none of the target amount
		for podName := range ignoredPods {
			metrics[podName] = metricsclient.PodMetric{Value: 0}
		}
	}

	// re-run the utilization calculation with our new numbers
	newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)

	if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
		// return the current replicas if the change would be too small,
		// or if the new usage ratio would cause a change in scale direction
		return currentReplicas, utilization, nil
	}

	// return the result, where the number of replicas considered is
	// however many replicas factored into our calculation
	return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, nil
}

// GetObjectMetricReplicas calculates the desired replica count based on a target metric utilization (as a milli-value)
// for the given object in the given namespace, and the current replica count.
func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
	utilization, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
	if err != nil {
		return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s on %s %s/%s: %v", metricName, objectRef.Kind, namespace, objectRef.Name, err)
	}

	usageRatio := float64(utilization) / float64(targetUtilization)
	replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, namespace, selector)
	return replicaCount, utilization, timestamp, err
}

// getUsageRatioReplicaCount calculates the desired replica count based on the usage ratio and the ready pod count.
// When currentReplicas is 0, the ready pod count and the tolerance are not taken into account, to support scaling to zero pods.
func (c *ReplicaCalculator) getUsageRatioReplicaCount(currentReplicas int32, usageRatio float64, namespace string, selector labels.Selector) (replicaCount int32, timestamp time.Time, err error) {
	if currentReplicas != 0 {
		if math.Abs(1.0-usageRatio) <= c.tolerance {
			// return the current replicas if the change would be too small
			return currentReplicas, timestamp, nil
		}
		readyPodCount := int64(0)
		readyPodCount, err = c.getReadyPodsCount(namespace, selector)
		if err != nil {
			return 0, time.Time{}, fmt.Errorf("unable to calculate ready pods: %s", err)
		}
		replicaCount = int32(math.Ceil(usageRatio * float64(readyPodCount)))
	} else {
		// Scale to zero or n pods depending on usageRatio
		replicaCount = int32(math.Ceil(usageRatio))
	}

	return replicaCount, timestamp, err
}

// GetObjectPerPodMetricReplicas calculates the desired replica count based on a target average metric value per pod
// (as a milli-value) for the given object in the given namespace, and the current replica count.
func (c *ReplicaCalculator) GetObjectPerPodMetricReplicas(statusReplicas int32, targetAverageUtilization int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
	utilization, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
	if err != nil {
		return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s on %s %s/%s: %v", metricName, objectRef.Kind, namespace, objectRef.Name, err)
	}

	replicaCount = statusReplicas
	usageRatio := float64(utilization) / (float64(targetAverageUtilization) * float64(replicaCount))
	if math.Abs(1.0-usageRatio) > c.tolerance {
		// update number of replicas if change is large enough
		replicaCount = int32(math.Ceil(float64(utilization) / float64(targetAverageUtilization)))
	}
	utilization = int64(math.Ceil(float64(utilization) / float64(statusReplicas)))
	return replicaCount, utilization, timestamp, nil
}

// @TODO(mattjmcnaughton) Many different functions in this module use variations
// of this function. Make this function generic, so we don't repeat the same
// logic in multiple places.
func (c *ReplicaCalculator) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) {
	podList, err := c.podLister.Pods(namespace).List(selector)
	if err != nil {
		return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
	}
	if len(podList) == 0 {
		return 0, fmt.Errorf("no pods returned by selector while calculating replica count")
	}

	readyPodCount := 0
	for _, pod := range podList {
		if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) {
			readyPodCount++
		}
	}

	return int64(readyPodCount), nil
}

// GetExternalMetricReplicas calculates the desired replica count based on a
// target metric value (as a milli-value) for the external metric in the given
// namespace, and the current replica count.
func (c *ReplicaCalculator) GetExternalMetricReplicas(currentReplicas int32, targetUtilization int64, metricName, namespace string, metricSelector *metav1.LabelSelector, podSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
	metricLabelSelector, err := metav1.LabelSelectorAsSelector(metricSelector)
	if err != nil {
		return 0, 0, time.Time{}, err
	}
	metrics, timestamp, err := c.metricsClient.GetExternalMetric(metricName, namespace, metricLabelSelector)
	if err != nil {
		return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, metricSelector, err)
	}
	utilization = 0
	for _, val := range metrics {
		utilization = utilization + val
	}

	usageRatio := float64(utilization) / float64(targetUtilization)
	replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, namespace, podSelector)
	return replicaCount, utilization, timestamp, err
}

// GetExternalPerPodMetricReplicas calculates the desired replica count based on a
// target metric value per pod (as a milli-value) for the external metric in the
// given namespace, and the current replica count.
func (c *ReplicaCalculator) GetExternalPerPodMetricReplicas(statusReplicas int32, targetUtilizationPerPod int64, metricName, namespace string, metricSelector *metav1.LabelSelector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
	metricLabelSelector, err := metav1.LabelSelectorAsSelector(metricSelector)
	if err != nil {
		return 0, 0, time.Time{}, err
	}
	metrics, timestamp, err := c.metricsClient.GetExternalMetric(metricName, namespace, metricLabelSelector)
	if err != nil {
		return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, metricSelector, err)
	}
	utilization = 0
	for _, val := range metrics {
		utilization = utilization + val
	}

	replicaCount = statusReplicas
	usageRatio := float64(utilization) / (float64(targetUtilizationPerPod) * float64(replicaCount))
	if math.Abs(1.0-usageRatio) > c.tolerance {
		// update number of replicas if the change is large enough
		replicaCount = int32(math.Ceil(float64(utilization) / float64(targetUtilizationPerPod)))
	}
	utilization = int64(math.Ceil(float64(utilization) / float64(statusReplicas)))
	return replicaCount, utilization, timestamp, nil
}
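
// groupPods partitions the listed pods for the purposes of the calculation:
// it returns the number of pods counted as ready, the set of pod names to be
// ignored (pending pods and, for CPU-based scaling, pods still within their
// initialization window or never yet ready), and the set of pods for which no
// metric sample was found.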
func groupPods(pods []*v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) {
	missingPods = sets.NewString()
	ignoredPods = sets.NewString()
	for _, pod := range pods {
		if pod.DeletionTimestamp != nil || pod.Status.Phase == v1.PodFailed {
			continue
		}
		// Pending pods are ignored.
		if pod.Status.Phase == v1.PodPending {
			ignoredPods.Insert(pod.Name)
			continue
		}
		// Pods missing metrics.
		metric, found := metrics[pod.Name]
		if !found {
			missingPods.Insert(pod.Name)
			continue
		}
		// Unready pods are ignored.
		if resource == v1.ResourceCPU {
			var ignorePod bool
			_, condition := podutil.GetPodCondition(&pod.Status, v1.PodReady)
			if condition == nil || pod.Status.StartTime == nil {
				ignorePod = true
			} else {
				// Pod still within possible initialisation period.
				if pod.Status.StartTime.Add(cpuInitializationPeriod).After(time.Now()) {
					// Ignore sample if pod is unready or one window of metric wasn't collected since last state transition.
					ignorePod = condition.Status == v1.ConditionFalse || metric.Timestamp.Before(condition.LastTransitionTime.Time.Add(metric.Window))
				} else {
					// Ignore metric if pod is unready and it has never been ready.
					ignorePod = condition.Status == v1.ConditionFalse && pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time)
				}
			}
			if ignorePod {
				ignoredPods.Insert(pod.Name)
				continue
			}
		}
		readyPodCount++
	}
	return
}
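
// calculatePodRequests sums, per pod, the container requests (in milli-units)
// for the given resource. It fails if any container is missing a request for
// that resource, since utilization cannot be computed without it.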
func calculatePodRequests(pods []*v1.Pod, resource v1.ResourceName) (map[string]int64, error) {
	requests := make(map[string]int64, len(pods))
	for _, pod := range pods {
		podSum := int64(0)
		for _, container := range pod.Spec.Containers {
			if containerRequest, ok := container.Resources.Requests[resource]; ok {
				podSum += containerRequest.MilliValue()
			} else {
				return nil, fmt.Errorf("missing request for %s", resource)
			}
		}
		requests[pod.Name] = podSum
	}
	return requests, nil
}
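
// removeMetricsForPods drops the metric samples of the named pods so that they
// do not contribute to the utilization calculation.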
func removeMetricsForPods(metrics metricsclient.PodMetricsInfo, pods sets.String) {
	for _, pod := range pods.UnsortedList() {
		delete(metrics, pod)
	}
}