events.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package common
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "k8s.io/api/core/v1"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/fields"
  21. "k8s.io/apimachinery/pkg/runtime"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. "k8s.io/apimachinery/pkg/watch"
  24. "k8s.io/client-go/tools/cache"
  25. "k8s.io/kubernetes/test/e2e/framework"
  26. . "github.com/onsi/ginkgo"
  27. . "github.com/onsi/gomega"
  28. )
  // Action is a parameterless operation that reports failure through its
  // returned error. ObserveNodeUpdateAfterAction and ObserveEventAfterAction
  // invoke it exactly once, after their informer has started watching, and
  // return its error unchanged if it is non-nil.
  type Action func() error
  30. // Returns true if a node update matching the predicate was emitted from the
  31. // system after performing the supplied action.
  32. func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) {
  33. observedMatchingNode := false
  34. nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
  35. informerStartedChan := make(chan struct{})
  36. var informerStartedGuard sync.Once
  37. _, controller := cache.NewInformer(
  38. &cache.ListWatch{
  39. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  40. options.FieldSelector = nodeSelector.String()
  41. ls, err := f.ClientSet.CoreV1().Nodes().List(options)
  42. return ls, err
  43. },
  44. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  45. // Signal parent goroutine that watching has begun.
  46. defer informerStartedGuard.Do(func() { close(informerStartedChan) })
  47. options.FieldSelector = nodeSelector.String()
  48. w, err := f.ClientSet.CoreV1().Nodes().Watch(options)
  49. return w, err
  50. },
  51. },
  52. &v1.Node{},
  53. 0,
  54. cache.ResourceEventHandlerFuncs{
  55. UpdateFunc: func(oldObj, newObj interface{}) {
  56. n, ok := newObj.(*v1.Node)
  57. Expect(ok).To(Equal(true))
  58. if nodePredicate(n) {
  59. observedMatchingNode = true
  60. }
  61. },
  62. },
  63. )
  64. // Start the informer and block this goroutine waiting for the started signal.
  65. informerStopChan := make(chan struct{})
  66. defer func() { close(informerStopChan) }()
  67. go controller.Run(informerStopChan)
  68. <-informerStartedChan
  69. // Invoke the action function.
  70. err := action()
  71. if err != nil {
  72. return false, err
  73. }
  74. // Poll whether the informer has found a matching node update with a timeout.
  75. // Wait up 2 minutes polling every second.
  76. timeout := 2 * time.Minute
  77. interval := 1 * time.Second
  78. err = wait.Poll(interval, timeout, func() (bool, error) {
  79. return observedMatchingNode, nil
  80. })
  81. return err == nil, err
  82. }
  83. // Returns true if an event matching the predicate was emitted from the system
  84. // after performing the supplied action.
  85. func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
  86. observedMatchingEvent := false
  87. informerStartedChan := make(chan struct{})
  88. var informerStartedGuard sync.Once
  89. // Create an informer to list/watch events from the test framework namespace.
  90. _, controller := cache.NewInformer(
  91. &cache.ListWatch{
  92. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  93. ls, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(options)
  94. return ls, err
  95. },
  96. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  97. // Signal parent goroutine that watching has begun.
  98. defer informerStartedGuard.Do(func() { close(informerStartedChan) })
  99. w, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).Watch(options)
  100. return w, err
  101. },
  102. },
  103. &v1.Event{},
  104. 0,
  105. cache.ResourceEventHandlerFuncs{
  106. AddFunc: func(obj interface{}) {
  107. e, ok := obj.(*v1.Event)
  108. By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
  109. Expect(ok).To(Equal(true))
  110. if ok && eventPredicate(e) {
  111. observedMatchingEvent = true
  112. }
  113. },
  114. },
  115. )
  116. // Start the informer and block this goroutine waiting for the started signal.
  117. informerStopChan := make(chan struct{})
  118. defer func() { close(informerStopChan) }()
  119. go controller.Run(informerStopChan)
  120. <-informerStartedChan
  121. // Invoke the action function.
  122. err := action()
  123. if err != nil {
  124. return false, err
  125. }
  126. // Poll whether the informer has found a matching event with a timeout.
  127. // Wait up 2 minutes polling every second.
  128. timeout := 2 * time.Minute
  129. interval := 1 * time.Second
  130. err = wait.Poll(interval, timeout, func() (bool, error) {
  131. return observedMatchingEvent, nil
  132. })
  133. return err == nil, err
  134. }