results.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ipamperf
  14. import (
  15. "bytes"
  16. "fmt"
  17. "sort"
  18. "sync"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. "k8s.io/client-go/informers"
  22. clientset "k8s.io/client-go/kubernetes"
  23. "k8s.io/client-go/tools/cache"
  24. cloudprovider "k8s.io/cloud-provider"
  25. "k8s.io/klog"
  26. "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
  27. nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
  28. )
  29. // Config represents the test configuration that is being run
  30. type Config struct {
  31. CreateQPS int // rate at which nodes are created
  32. KubeQPS int // rate for communication with kubernetes API
  33. CloudQPS int // rate for communication with cloud endpoint
  34. NumNodes int // number of nodes to created and monitored
  35. AllocatorType ipam.CIDRAllocatorType // type of allocator to run
  36. Cloud cloudprovider.Interface // cloud provider
  37. }
  // nodeTime records the timestamps observed for a single node together with
  // the podCIDR it was eventually given.
  type nodeTime struct {
  	// added is the observed time at which the node was added.
  	added time.Time
  	// allocated is the observed time at which the node was assigned a podCIDR.
  	allocated time.Time
  	// podCIDR is the allocated podCIDR range.
  	podCIDR string
  }
  43. // Observer represents the handle to test observer that watches for node changes
  44. // and tracks behavior
  45. type Observer struct {
  46. numAdded int // number of nodes observed added
  47. numAllocated int // number of nodes observed allocated podCIDR
  48. timing map[string]*nodeTime // per node timing
  49. numNodes int // the number of nodes to expect
  50. stopChan chan struct{} // for the shared informer
  51. wg sync.WaitGroup
  52. clientSet *clientset.Clientset
  53. }
  // JSONDuration is a distinct type whose underlying type is time.Duration.
  // It exists so that durations can carry custom JSON marshalling code.
  type JSONDuration time.Duration
  56. // NodeDuration represents the CIDR allocation time for each node
  57. type NodeDuration struct {
  58. Name string // node name
  59. PodCIDR string // the podCIDR that was assigned to the node
  60. Duration JSONDuration // how long it took to assign podCIDR
  61. }
  62. // Results represents the observed test results.
  63. type Results struct {
  64. Name string // name for the test
  65. Config *Config // handle to the test config
  66. Succeeded bool // whether all nodes were assigned podCIDR
  67. MaxAllocTime JSONDuration // the maximum time take for assignment per node
  68. TotalAllocTime JSONDuration // duration between first addition and last assignment
  69. NodeAllocTime []NodeDuration // assignment time by node name
  70. }
  71. // NewObserver creates a new observer given a handle to the Clientset
  72. func NewObserver(clientSet *clientset.Clientset, numNodes int) *Observer {
  73. o := &Observer{
  74. timing: map[string]*nodeTime{},
  75. numNodes: numNodes,
  76. clientSet: clientSet,
  77. stopChan: make(chan struct{}),
  78. }
  79. return o
  80. }
  81. // StartObserving starts an asynchronous loop to monitor for node changes.
  82. // Call Results() to get the test results after starting observer.
  83. func (o *Observer) StartObserving() error {
  84. o.monitor()
  85. klog.Infof("Test observer started")
  86. return nil
  87. }
  88. // Results returns the test results. It waits for the observer to finish
  89. // and returns the computed results of the observations.
  90. func (o *Observer) Results(name string, config *Config) *Results {
  91. var (
  92. firstAdd time.Time // earliest time any node was added (first node add)
  93. lastAssignment time.Time // latest time any node was assigned CIDR (last node assignment)
  94. )
  95. o.wg.Wait()
  96. close(o.stopChan) // shutdown the shared informer
  97. results := &Results{
  98. Name: name,
  99. Config: config,
  100. Succeeded: o.numAdded == o.numNodes && o.numAllocated == o.numNodes,
  101. MaxAllocTime: 0,
  102. NodeAllocTime: []NodeDuration{},
  103. }
  104. for name, nTime := range o.timing {
  105. addFound := !nTime.added.IsZero()
  106. if addFound && (firstAdd.IsZero() || nTime.added.Before(firstAdd)) {
  107. firstAdd = nTime.added
  108. }
  109. cidrFound := !nTime.allocated.IsZero()
  110. if cidrFound && nTime.allocated.After(lastAssignment) {
  111. lastAssignment = nTime.allocated
  112. }
  113. if addFound && cidrFound {
  114. allocTime := nTime.allocated.Sub(nTime.added)
  115. if allocTime > time.Duration(results.MaxAllocTime) {
  116. results.MaxAllocTime = JSONDuration(allocTime)
  117. }
  118. results.NodeAllocTime = append(results.NodeAllocTime, NodeDuration{
  119. Name: name, PodCIDR: nTime.podCIDR, Duration: JSONDuration(allocTime),
  120. })
  121. }
  122. }
  123. results.TotalAllocTime = JSONDuration(lastAssignment.Sub(firstAdd))
  124. sort.Slice(results.NodeAllocTime, func(i, j int) bool {
  125. return results.NodeAllocTime[i].Duration > results.NodeAllocTime[j].Duration
  126. })
  127. return results
  128. }
  129. func (o *Observer) monitor() {
  130. o.wg.Add(1)
  131. sharedInformer := informers.NewSharedInformerFactory(o.clientSet, 1*time.Second)
  132. nodeInformer := sharedInformer.Core().V1().Nodes().Informer()
  133. nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  134. AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) (err error) {
  135. name := node.GetName()
  136. if node.Spec.PodCIDR != "" {
  137. // ignore nodes that have PodCIDR (might be hold over from previous runs that did not get cleaned up)
  138. return
  139. }
  140. nTime := &nodeTime{}
  141. o.timing[name] = nTime
  142. nTime.added = time.Now()
  143. o.numAdded = o.numAdded + 1
  144. return
  145. }),
  146. UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) (err error) {
  147. name := newNode.GetName()
  148. nTime, found := o.timing[name]
  149. if !found {
  150. return // consistency check - ignore nodes we have not seen the add event for
  151. }
  152. // check if CIDR assigned and ignore redundant updates
  153. if newNode.Spec.PodCIDR != "" && nTime.podCIDR == "" {
  154. nTime.allocated = time.Now()
  155. nTime.podCIDR = newNode.Spec.PodCIDR
  156. o.numAllocated++
  157. if o.numAllocated%10 == 0 {
  158. klog.Infof("progress: %d/%d - %.2d%%", o.numAllocated, o.numNodes, (o.numAllocated * 100.0 / o.numNodes))
  159. }
  160. // do following check only if numAllocated is modified, as otherwise, redundant updates
  161. // can cause wg.Done() to be called multiple times, causing a panic
  162. if o.numAdded == o.numNodes && o.numAllocated == o.numNodes {
  163. klog.Info("All nodes assigned podCIDR")
  164. o.wg.Done()
  165. }
  166. }
  167. return
  168. }),
  169. })
  170. sharedInformer.Start(o.stopChan)
  171. }
  172. // String implements the Stringer interface and returns a multi-line representation
  173. // of the test results.
  174. func (results *Results) String() string {
  175. var b bytes.Buffer
  176. fmt.Fprintf(&b, "\n TestName: %s", results.Name)
  177. fmt.Fprintf(&b, "\n NumNodes: %d, CreateQPS: %d, KubeQPS: %d, CloudQPS: %d, Allocator: %v",
  178. results.Config.NumNodes, results.Config.CreateQPS, results.Config.KubeQPS,
  179. results.Config.CloudQPS, results.Config.AllocatorType)
  180. fmt.Fprintf(&b, "\n Succeeded: %v, TotalAllocTime: %v, MaxAllocTime: %v",
  181. results.Succeeded, time.Duration(results.TotalAllocTime), time.Duration(results.MaxAllocTime))
  182. fmt.Fprintf(&b, "\n %5s %-20s %-20s %s", "Num", "Node", "PodCIDR", "Duration (s)")
  183. for i, d := range results.NodeAllocTime {
  184. fmt.Fprintf(&b, "\n %5d %-20s %-20s %10.3f", i+1, d.Name, d.PodCIDR, time.Duration(d.Duration).Seconds())
  185. }
  186. return b.String()
  187. }
  188. // MarshalJSON implements the json.Marshaler interface
  189. func (jDuration *JSONDuration) MarshalJSON() ([]byte, error) {
  190. return []byte(fmt.Sprintf("\"%s\"", time.Duration(*jDuration).String())), nil
  191. }
  192. // UnmarshalJSON implements the json.Unmarshaler interface
  193. func (jDuration *JSONDuration) UnmarshalJSON(b []byte) (err error) {
  194. var d time.Duration
  195. if d, err = time.ParseDuration(string(b[1 : len(b)-1])); err == nil {
  196. *jDuration = JSONDuration(d)
  197. }
  198. return
  199. }