events.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduling
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/fields"
  23. "k8s.io/apimachinery/pkg/runtime"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. "k8s.io/apimachinery/pkg/watch"
  26. clientset "k8s.io/client-go/kubernetes"
  27. "k8s.io/client-go/tools/cache"
  28. "k8s.io/kubernetes/test/e2e/framework"
  29. "github.com/onsi/ginkgo"
  30. )
  31. func scheduleSuccessEvent(ns, podName, nodeName string) func(*v1.Event) bool {
  32. return func(e *v1.Event) bool {
  33. return e.Type == v1.EventTypeNormal &&
  34. e.Reason == "Scheduled" &&
  35. strings.HasPrefix(e.Name, podName) &&
  36. strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v/%v to %v", ns, podName, nodeName))
  37. }
  38. }
  39. func scheduleFailureEvent(podName string) func(*v1.Event) bool {
  40. return func(e *v1.Event) bool {
  41. return strings.HasPrefix(e.Name, podName) &&
  42. e.Type == "Warning" &&
  43. e.Reason == "FailedScheduling"
  44. }
  45. }
  // Action is a function to be performed by the system.
  //
  // The observe*AfterAction helpers in this file call it exactly once, after
  // their informer has signalled that watching has begun, and return early with
  // its error when it reports a non-nil one.
  type Action func() error
  48. // observeNodeUpdateAfterAction returns true if a node update matching the predicate was emitted
  49. // from the system after performing the supplied action.
  50. func observeNodeUpdateAfterAction(c clientset.Interface, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) {
  51. observedMatchingNode := false
  52. nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
  53. informerStartedChan := make(chan struct{})
  54. var informerStartedGuard sync.Once
  55. _, controller := cache.NewInformer(
  56. &cache.ListWatch{
  57. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  58. options.FieldSelector = nodeSelector.String()
  59. ls, err := c.CoreV1().Nodes().List(context.TODO(), options)
  60. return ls, err
  61. },
  62. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  63. // Signal parent goroutine that watching has begun.
  64. defer informerStartedGuard.Do(func() { close(informerStartedChan) })
  65. options.FieldSelector = nodeSelector.String()
  66. w, err := c.CoreV1().Nodes().Watch(context.TODO(), options)
  67. return w, err
  68. },
  69. },
  70. &v1.Node{},
  71. 0,
  72. cache.ResourceEventHandlerFuncs{
  73. UpdateFunc: func(oldObj, newObj interface{}) {
  74. n, ok := newObj.(*v1.Node)
  75. framework.ExpectEqual(ok, true)
  76. if nodePredicate(n) {
  77. observedMatchingNode = true
  78. }
  79. },
  80. },
  81. )
  82. // Start the informer and block this goroutine waiting for the started signal.
  83. informerStopChan := make(chan struct{})
  84. defer func() { close(informerStopChan) }()
  85. go controller.Run(informerStopChan)
  86. <-informerStartedChan
  87. // Invoke the action function.
  88. err := action()
  89. if err != nil {
  90. return false, err
  91. }
  92. // Poll whether the informer has found a matching node update with a timeout.
  93. // Wait up 2 minutes polling every second.
  94. timeout := 2 * time.Minute
  95. interval := 1 * time.Second
  96. err = wait.Poll(interval, timeout, func() (bool, error) {
  97. return observedMatchingNode, nil
  98. })
  99. return err == nil, err
  100. }
  101. // observeEventAfterAction returns true if an event matching the predicate was emitted
  102. // from the system after performing the supplied action.
  103. func observeEventAfterAction(c clientset.Interface, ns string, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
  104. observedMatchingEvent := false
  105. informerStartedChan := make(chan struct{})
  106. var informerStartedGuard sync.Once
  107. // Create an informer to list/watch events from the test framework namespace.
  108. _, controller := cache.NewInformer(
  109. &cache.ListWatch{
  110. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  111. ls, err := c.CoreV1().Events(ns).List(context.TODO(), options)
  112. return ls, err
  113. },
  114. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  115. // Signal parent goroutine that watching has begun.
  116. defer informerStartedGuard.Do(func() { close(informerStartedChan) })
  117. w, err := c.CoreV1().Events(ns).Watch(context.TODO(), options)
  118. return w, err
  119. },
  120. },
  121. &v1.Event{},
  122. 0,
  123. cache.ResourceEventHandlerFuncs{
  124. AddFunc: func(obj interface{}) {
  125. e, ok := obj.(*v1.Event)
  126. ginkgo.By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
  127. framework.ExpectEqual(ok, true)
  128. if eventPredicate(e) {
  129. observedMatchingEvent = true
  130. }
  131. },
  132. },
  133. )
  134. // Start the informer and block this goroutine waiting for the started signal.
  135. informerStopChan := make(chan struct{})
  136. defer func() { close(informerStopChan) }()
  137. go controller.Run(informerStopChan)
  138. <-informerStartedChan
  139. // Invoke the action function.
  140. err := action()
  141. if err != nil {
  142. return false, err
  143. }
  144. // Poll whether the informer has found a matching event with a timeout.
  145. // Wait up 2 minutes polling every second.
  146. timeout := 2 * time.Minute
  147. interval := 1 * time.Second
  148. err = wait.Poll(interval, timeout, func() (bool, error) {
  149. return observedMatchingEvent, nil
  150. })
  151. return err == nil, err
  152. }