load_balance_test.go 8.9 KB


  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package admissionwebhook
  14. import (
  15. "context"
  16. "crypto/tls"
  17. "crypto/x509"
  18. "encoding/json"
  19. "fmt"
  20. "io/ioutil"
  21. "net"
  22. "net/http"
  23. "sync"
  24. "sync/atomic"
  25. "testing"
  26. "time"
  27. "k8s.io/api/admission/v1beta1"
  28. admissionv1beta1 "k8s.io/api/admissionregistration/v1beta1"
  29. corev1 "k8s.io/api/core/v1"
  30. v1 "k8s.io/api/core/v1"
  31. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/apimachinery/pkg/types"
  33. "k8s.io/apimachinery/pkg/util/wait"
  34. clientset "k8s.io/client-go/kubernetes"
  35. "k8s.io/client-go/rest"
  36. kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
  37. "k8s.io/kubernetes/test/integration/framework"
  38. )
  39. const (
  40. testLoadBalanceClientUsername = "webhook-balance-integration-client"
  41. )
  42. // TestWebhookLoadBalance ensures that the admission webhook opens multiple connections to backends to satisfy concurrent requests
  43. func TestWebhookLoadBalance(t *testing.T) {
  44. roots := x509.NewCertPool()
  45. if !roots.AppendCertsFromPEM(localhostCert) {
  46. t.Fatal("Failed to append Cert from PEM")
  47. }
  48. cert, err := tls.X509KeyPair(localhostCert, localhostKey)
  49. if err != nil {
  50. t.Fatalf("Failed to build cert with error: %+v", err)
  51. }
  52. localListener, err := net.Listen("tcp", "127.0.0.1:0")
  53. if err != nil {
  54. if localListener, err = net.Listen("tcp6", "[::1]:0"); err != nil {
  55. t.Fatal(err)
  56. }
  57. }
  58. trackingListener := &connectionTrackingListener{delegate: localListener}
  59. recorder := &connectionRecorder{}
  60. handler := newLoadBalanceWebhookHandler(recorder)
  61. httpServer := &http.Server{
  62. Handler: handler,
  63. TLSConfig: &tls.Config{
  64. RootCAs: roots,
  65. Certificates: []tls.Certificate{cert},
  66. },
  67. }
  68. go func() {
  69. httpServer.ServeTLS(trackingListener, "", "")
  70. }()
  71. defer httpServer.Close()
  72. webhookURL := "https://" + localListener.Addr().String()
  73. s := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), []string{
  74. "--disable-admission-plugins=ServiceAccount",
  75. }, framework.SharedEtcd())
  76. defer s.TearDownFn()
  77. // Configure a client with a distinct user name so that it is easy to distinguish requests
  78. // made by the client from requests made by controllers. We use this to filter out requests
  79. // before recording them to ensure we don't accidentally mistake requests from controllers
  80. // as requests made by the client.
  81. clientConfig := rest.CopyConfig(s.ClientConfig)
  82. clientConfig.QPS = 100
  83. clientConfig.Burst = 200
  84. clientConfig.Impersonate.UserName = testLoadBalanceClientUsername
  85. clientConfig.Impersonate.Groups = []string{"system:masters", "system:authenticated"}
  86. client, err := clientset.NewForConfig(clientConfig)
  87. if err != nil {
  88. t.Fatalf("unexpected error: %v", err)
  89. }
  90. _, err = client.CoreV1().Pods("default").Create(context.TODO(), loadBalanceMarkerFixture, metav1.CreateOptions{})
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. upCh := recorder.Reset()
  95. ns := "load-balance"
  96. _, err = client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
  97. if err != nil {
  98. t.Fatal(err)
  99. }
  100. fail := admissionv1beta1.Fail
  101. mutatingCfg, err := client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().Create(context.TODO(), &admissionv1beta1.MutatingWebhookConfiguration{
  102. ObjectMeta: metav1.ObjectMeta{Name: "admission.integration.test"},
  103. Webhooks: []admissionv1beta1.MutatingWebhook{{
  104. Name: "admission.integration.test",
  105. ClientConfig: admissionv1beta1.WebhookClientConfig{
  106. URL: &webhookURL,
  107. CABundle: localhostCert,
  108. },
  109. Rules: []admissionv1beta1.RuleWithOperations{{
  110. Operations: []admissionv1beta1.OperationType{admissionv1beta1.OperationAll},
  111. Rule: admissionv1beta1.Rule{APIGroups: []string{""}, APIVersions: []string{"v1"}, Resources: []string{"pods"}},
  112. }},
  113. FailurePolicy: &fail,
  114. AdmissionReviewVersions: []string{"v1beta1"},
  115. }},
  116. }, metav1.CreateOptions{})
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. defer func() {
  121. err := client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations().Delete(context.TODO(), mutatingCfg.GetName(), &metav1.DeleteOptions{})
  122. if err != nil {
  123. t.Fatal(err)
  124. }
  125. }()
  126. // wait until new webhook is called the first time
  127. if err := wait.PollImmediate(time.Millisecond*5, wait.ForeverTestTimeout, func() (bool, error) {
  128. _, err = client.CoreV1().Pods("default").Patch(context.TODO(), loadBalanceMarkerFixture.Name, types.JSONPatchType, []byte("[]"), metav1.PatchOptions{})
  129. select {
  130. case <-upCh:
  131. return true, nil
  132. default:
  133. t.Logf("Waiting for webhook to become effective, getting marker object: %v", err)
  134. return false, nil
  135. }
  136. }); err != nil {
  137. t.Fatal(err)
  138. }
  139. pod := func() *corev1.Pod {
  140. return &corev1.Pod{
  141. ObjectMeta: metav1.ObjectMeta{
  142. Namespace: ns,
  143. GenerateName: "loadbalance-",
  144. },
  145. Spec: corev1.PodSpec{
  146. Containers: []v1.Container{{
  147. Name: "fake-name",
  148. Image: "fakeimage",
  149. }},
  150. },
  151. }
  152. }
  153. // Submit 10 parallel requests
  154. wg := &sync.WaitGroup{}
  155. for i := 0; i < 10; i++ {
  156. wg.Add(1)
  157. go func() {
  158. defer wg.Done()
  159. _, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod(), metav1.CreateOptions{})
  160. if err != nil {
  161. t.Error(err)
  162. }
  163. }()
  164. }
  165. wg.Wait()
  166. if actual := atomic.LoadInt64(&trackingListener.connections); actual < 10 {
  167. t.Errorf("expected at least 10 connections, got %d", actual)
  168. }
  169. trackingListener.Reset()
  170. // Submit 10 more parallel requests
  171. wg = &sync.WaitGroup{}
  172. for i := 0; i < 10; i++ {
  173. wg.Add(1)
  174. go func() {
  175. defer wg.Done()
  176. _, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod(), metav1.CreateOptions{})
  177. if err != nil {
  178. t.Error(err)
  179. }
  180. }()
  181. }
  182. wg.Wait()
  183. if actual := atomic.LoadInt64(&trackingListener.connections); actual > 0 {
  184. t.Errorf("expected no additional connections (reusing kept-alive connections), got %d", actual)
  185. }
  186. }
  187. type connectionRecorder struct {
  188. mu sync.Mutex
  189. upCh chan struct{}
  190. upOnce sync.Once
  191. }
  192. // Reset zeros out all counts and returns a channel that is closed when the first admission of the
  193. // marker object is received.
  194. func (i *connectionRecorder) Reset() chan struct{} {
  195. i.mu.Lock()
  196. defer i.mu.Unlock()
  197. i.upCh = make(chan struct{})
  198. i.upOnce = sync.Once{}
  199. return i.upCh
  200. }
  201. func (i *connectionRecorder) MarkerReceived() {
  202. i.mu.Lock()
  203. defer i.mu.Unlock()
  204. i.upOnce.Do(func() {
  205. close(i.upCh)
  206. })
  207. }
  208. func newLoadBalanceWebhookHandler(recorder *connectionRecorder) http.Handler {
  209. allow := func(w http.ResponseWriter) {
  210. w.Header().Set("Content-Type", "application/json")
  211. json.NewEncoder(w).Encode(&v1beta1.AdmissionReview{
  212. Response: &v1beta1.AdmissionResponse{
  213. Allowed: true,
  214. },
  215. })
  216. }
  217. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  218. fmt.Println(r.Proto)
  219. defer r.Body.Close()
  220. data, err := ioutil.ReadAll(r.Body)
  221. if err != nil {
  222. http.Error(w, err.Error(), 400)
  223. }
  224. review := v1beta1.AdmissionReview{}
  225. if err := json.Unmarshal(data, &review); err != nil {
  226. http.Error(w, err.Error(), 400)
  227. }
  228. if review.Request.UserInfo.Username != testLoadBalanceClientUsername {
  229. // skip requests not originating from this integration test's client
  230. allow(w)
  231. return
  232. }
  233. if len(review.Request.Object.Raw) == 0 {
  234. http.Error(w, err.Error(), 400)
  235. }
  236. pod := &corev1.Pod{}
  237. if err := json.Unmarshal(review.Request.Object.Raw, pod); err != nil {
  238. http.Error(w, err.Error(), 400)
  239. }
  240. // When resetting between tests, a marker object is patched until this webhook
  241. // observes it, at which point it is considered ready.
  242. if pod.Namespace == loadBalanceMarkerFixture.Namespace && pod.Name == loadBalanceMarkerFixture.Name {
  243. recorder.MarkerReceived()
  244. allow(w)
  245. return
  246. }
  247. // simulate a loaded backend
  248. time.Sleep(2 * time.Second)
  249. allow(w)
  250. })
  251. }
  252. var loadBalanceMarkerFixture = &corev1.Pod{
  253. ObjectMeta: metav1.ObjectMeta{
  254. Namespace: "default",
  255. Name: "marker",
  256. },
  257. Spec: corev1.PodSpec{
  258. Containers: []v1.Container{{
  259. Name: "fake-name",
  260. Image: "fakeimage",
  261. }},
  262. },
  263. }
  264. type connectionTrackingListener struct {
  265. connections int64
  266. delegate net.Listener
  267. }
  268. func (c *connectionTrackingListener) Reset() {
  269. atomic.StoreInt64(&c.connections, 0)
  270. }
  271. func (c *connectionTrackingListener) Accept() (net.Conn, error) {
  272. conn, err := c.delegate.Accept()
  273. if err == nil {
  274. atomic.AddInt64(&c.connections, 1)
  275. }
  276. return conn, err
  277. }
  278. func (c *connectionTrackingListener) Close() error {
  279. return c.delegate.Close()
  280. }
  281. func (c *connectionTrackingListener) Addr() net.Addr {
  282. return c.delegate.Addr()
  283. }