client_test.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "fmt"
  16. "log"
  17. "reflect"
  18. rt "runtime"
  19. "sync"
  20. "testing"
  21. "time"
  22. v1 "k8s.io/api/core/v1"
  23. apierrors "k8s.io/apimachinery/pkg/api/errors"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/fields"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/runtime/schema"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/wait"
  31. "k8s.io/apimachinery/pkg/watch"
  32. clientset "k8s.io/client-go/kubernetes"
  33. kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
  34. "k8s.io/kubernetes/pkg/api/legacyscheme"
  35. "k8s.io/kubernetes/pkg/version"
  36. "k8s.io/kubernetes/test/integration/framework"
  37. imageutils "k8s.io/kubernetes/test/utils/image"
  38. )
  39. func TestClient(t *testing.T) {
  40. result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
  41. defer result.TearDownFn()
  42. client := clientset.NewForConfigOrDie(result.ClientConfig)
  43. info, err := client.Discovery().ServerVersion()
  44. if err != nil {
  45. t.Fatalf("unexpected error: %v", err)
  46. }
  47. if e, a := version.Get(), *info; !reflect.DeepEqual(e, a) {
  48. t.Errorf("expected %#v, got %#v", e, a)
  49. }
  50. pods, err := client.CoreV1().Pods("default").List(metav1.ListOptions{})
  51. if err != nil {
  52. t.Fatalf("unexpected error: %v", err)
  53. }
  54. if len(pods.Items) != 0 {
  55. t.Errorf("expected no pods, got %#v", pods)
  56. }
  57. // get a validation error
  58. pod := &v1.Pod{
  59. ObjectMeta: metav1.ObjectMeta{
  60. GenerateName: "test",
  61. Namespace: "default",
  62. },
  63. Spec: v1.PodSpec{
  64. Containers: []v1.Container{
  65. {
  66. Name: "test",
  67. },
  68. },
  69. },
  70. }
  71. got, err := client.CoreV1().Pods("default").Create(pod)
  72. if err == nil {
  73. t.Fatalf("unexpected non-error: %v", got)
  74. }
  75. // get a created pod
  76. pod.Spec.Containers[0].Image = "an-image"
  77. got, err = client.CoreV1().Pods("default").Create(pod)
  78. if err != nil {
  79. t.Fatalf("unexpected error: %v", err)
  80. }
  81. if got.Name == "" {
  82. t.Errorf("unexpected empty pod Name %v", got)
  83. }
  84. // pod is shown, but not scheduled
  85. pods, err = client.CoreV1().Pods("default").List(metav1.ListOptions{})
  86. if err != nil {
  87. t.Fatalf("unexpected error: %v", err)
  88. }
  89. if len(pods.Items) != 1 {
  90. t.Errorf("expected one pod, got %#v", pods)
  91. }
  92. actual := pods.Items[0]
  93. if actual.Name != got.Name {
  94. t.Errorf("expected pod %#v, got %#v", got, actual)
  95. }
  96. if actual.Spec.NodeName != "" {
  97. t.Errorf("expected pod to be unscheduled, got %#v", actual)
  98. }
  99. }
  100. func TestAtomicPut(t *testing.T) {
  101. result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
  102. defer result.TearDownFn()
  103. c := clientset.NewForConfigOrDie(result.ClientConfig)
  104. rcBody := v1.ReplicationController{
  105. TypeMeta: metav1.TypeMeta{
  106. APIVersion: c.CoreV1().RESTClient().APIVersion().String(),
  107. },
  108. ObjectMeta: metav1.ObjectMeta{
  109. Name: "atomicrc",
  110. Namespace: "default",
  111. Labels: map[string]string{
  112. "name": "atomicrc",
  113. },
  114. },
  115. Spec: v1.ReplicationControllerSpec{
  116. Replicas: func(i int32) *int32 { return &i }(0),
  117. Selector: map[string]string{
  118. "foo": "bar",
  119. },
  120. Template: &v1.PodTemplateSpec{
  121. ObjectMeta: metav1.ObjectMeta{
  122. Labels: map[string]string{
  123. "foo": "bar",
  124. },
  125. },
  126. Spec: v1.PodSpec{
  127. Containers: []v1.Container{
  128. {Name: "name", Image: "image"},
  129. },
  130. },
  131. },
  132. },
  133. }
  134. rcs := c.CoreV1().ReplicationControllers("default")
  135. rc, err := rcs.Create(&rcBody)
  136. if err != nil {
  137. t.Fatalf("Failed creating atomicRC: %v", err)
  138. }
  139. testLabels := labels.Set{
  140. "foo": "bar",
  141. }
  142. for i := 0; i < 5; i++ {
  143. // a: z, b: y, etc...
  144. testLabels[string([]byte{byte('a' + i)})] = string([]byte{byte('z' - i)})
  145. }
  146. var wg sync.WaitGroup
  147. wg.Add(len(testLabels))
  148. for label, value := range testLabels {
  149. go func(l, v string) {
  150. defer wg.Done()
  151. for {
  152. tmpRC, err := rcs.Get(rc.Name, metav1.GetOptions{})
  153. if err != nil {
  154. t.Errorf("Error getting atomicRC: %v", err)
  155. continue
  156. }
  157. if tmpRC.Spec.Selector == nil {
  158. tmpRC.Spec.Selector = map[string]string{l: v}
  159. tmpRC.Spec.Template.Labels = map[string]string{l: v}
  160. } else {
  161. tmpRC.Spec.Selector[l] = v
  162. tmpRC.Spec.Template.Labels[l] = v
  163. }
  164. tmpRC, err = rcs.Update(tmpRC)
  165. if err != nil {
  166. if apierrors.IsConflict(err) {
  167. // This is what we expect.
  168. continue
  169. }
  170. t.Errorf("Unexpected error putting atomicRC: %v", err)
  171. continue
  172. }
  173. return
  174. }
  175. }(label, value)
  176. }
  177. wg.Wait()
  178. rc, err = rcs.Get(rc.Name, metav1.GetOptions{})
  179. if err != nil {
  180. t.Fatalf("Failed getting atomicRC after writers are complete: %v", err)
  181. }
  182. if !reflect.DeepEqual(testLabels, labels.Set(rc.Spec.Selector)) {
  183. t.Errorf("Selector PUTs were not atomic: wanted %v, got %v", testLabels, rc.Spec.Selector)
  184. }
  185. }
  186. func TestPatch(t *testing.T) {
  187. result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
  188. defer result.TearDownFn()
  189. c := clientset.NewForConfigOrDie(result.ClientConfig)
  190. name := "patchpod"
  191. resource := "pods"
  192. podBody := v1.Pod{
  193. TypeMeta: metav1.TypeMeta{
  194. APIVersion: c.CoreV1().RESTClient().APIVersion().String(),
  195. },
  196. ObjectMeta: metav1.ObjectMeta{
  197. Name: name,
  198. Namespace: "default",
  199. Labels: map[string]string{},
  200. },
  201. Spec: v1.PodSpec{
  202. Containers: []v1.Container{
  203. {Name: "name", Image: "image"},
  204. },
  205. },
  206. }
  207. pods := c.CoreV1().Pods("default")
  208. pod, err := pods.Create(&podBody)
  209. if err != nil {
  210. t.Fatalf("Failed creating patchpods: %v", err)
  211. }
  212. patchBodies := map[schema.GroupVersion]map[types.PatchType]struct {
  213. AddLabelBody []byte
  214. RemoveLabelBody []byte
  215. RemoveAllLabelsBody []byte
  216. }{
  217. v1.SchemeGroupVersion: {
  218. types.JSONPatchType: {
  219. []byte(`[{"op":"add","path":"/metadata/labels","value":{"foo":"bar","baz":"qux"}}]`),
  220. []byte(`[{"op":"remove","path":"/metadata/labels/foo"}]`),
  221. []byte(`[{"op":"remove","path":"/metadata/labels"}]`),
  222. },
  223. types.MergePatchType: {
  224. []byte(`{"metadata":{"labels":{"foo":"bar","baz":"qux"}}}`),
  225. []byte(`{"metadata":{"labels":{"foo":null}}}`),
  226. []byte(`{"metadata":{"labels":null}}`),
  227. },
  228. types.StrategicMergePatchType: {
  229. []byte(`{"metadata":{"labels":{"foo":"bar","baz":"qux"}}}`),
  230. []byte(`{"metadata":{"labels":{"foo":null}}}`),
  231. []byte(`{"metadata":{"labels":{"$patch":"replace"}}}`),
  232. },
  233. },
  234. }
  235. pb := patchBodies[c.CoreV1().RESTClient().APIVersion()]
  236. execPatch := func(pt types.PatchType, body []byte) error {
  237. result := c.CoreV1().RESTClient().Patch(pt).
  238. Resource(resource).
  239. Namespace("default").
  240. Name(name).
  241. Body(body).
  242. Do()
  243. if result.Error() != nil {
  244. return result.Error()
  245. }
  246. // trying to chase flakes, this should give us resource versions of objects as we step through
  247. jsonObj, err := result.Raw()
  248. if err != nil {
  249. t.Log(err)
  250. } else {
  251. t.Logf("%v", string(jsonObj))
  252. }
  253. return nil
  254. }
  255. for k, v := range pb {
  256. // add label
  257. err := execPatch(k, v.AddLabelBody)
  258. if err != nil {
  259. t.Fatalf("Failed updating patchpod with patch type %s: %v", k, err)
  260. }
  261. pod, err = pods.Get(name, metav1.GetOptions{})
  262. if err != nil {
  263. t.Fatalf("Failed getting patchpod: %v", err)
  264. }
  265. if len(pod.Labels) != 2 || pod.Labels["foo"] != "bar" || pod.Labels["baz"] != "qux" {
  266. t.Errorf("Failed updating patchpod with patch type %s: labels are: %v", k, pod.Labels)
  267. }
  268. // remove one label
  269. err = execPatch(k, v.RemoveLabelBody)
  270. if err != nil {
  271. t.Fatalf("Failed updating patchpod with patch type %s: %v", k, err)
  272. }
  273. pod, err = pods.Get(name, metav1.GetOptions{})
  274. if err != nil {
  275. t.Fatalf("Failed getting patchpod: %v", err)
  276. }
  277. if len(pod.Labels) != 1 || pod.Labels["baz"] != "qux" {
  278. t.Errorf("Failed updating patchpod with patch type %s: labels are: %v", k, pod.Labels)
  279. }
  280. // remove all labels
  281. err = execPatch(k, v.RemoveAllLabelsBody)
  282. if err != nil {
  283. t.Fatalf("Failed updating patchpod with patch type %s: %v", k, err)
  284. }
  285. pod, err = pods.Get(name, metav1.GetOptions{})
  286. if err != nil {
  287. t.Fatalf("Failed getting patchpod: %v", err)
  288. }
  289. if pod.Labels != nil {
  290. t.Errorf("Failed remove all labels from patchpod with patch type %s: %v", k, pod.Labels)
  291. }
  292. }
  293. }
  294. func TestPatchWithCreateOnUpdate(t *testing.T) {
  295. result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
  296. defer result.TearDownFn()
  297. c := clientset.NewForConfigOrDie(result.ClientConfig)
  298. endpointTemplate := &v1.Endpoints{
  299. ObjectMeta: metav1.ObjectMeta{
  300. Name: "patchendpoint",
  301. Namespace: "default",
  302. },
  303. Subsets: []v1.EndpointSubset{
  304. {
  305. Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}},
  306. Ports: []v1.EndpointPort{{Port: 80, Protocol: v1.ProtocolTCP}},
  307. },
  308. },
  309. }
  310. patchEndpoint := func(json []byte) (runtime.Object, error) {
  311. return c.CoreV1().RESTClient().Patch(types.MergePatchType).Resource("endpoints").Namespace("default").Name("patchendpoint").Body(json).Do().Get()
  312. }
  313. // Make sure patch doesn't get to CreateOnUpdate
  314. {
  315. endpointJSON, err := runtime.Encode(legacyscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
  316. if err != nil {
  317. t.Fatalf("Failed creating endpoint JSON: %v", err)
  318. }
  319. if obj, err := patchEndpoint(endpointJSON); !apierrors.IsNotFound(err) {
  320. t.Errorf("Expected notfound creating from patch, got error=%v and object: %#v", err, obj)
  321. }
  322. }
  323. // Create the endpoint (endpoints set AllowCreateOnUpdate=true) to get a UID and resource version
  324. createdEndpoint, err := c.CoreV1().Endpoints("default").Update(endpointTemplate)
  325. if err != nil {
  326. t.Fatalf("Failed creating endpoint: %v", err)
  327. }
  328. // Make sure identity patch is accepted
  329. {
  330. endpointJSON, err := runtime.Encode(legacyscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), createdEndpoint)
  331. if err != nil {
  332. t.Fatalf("Failed creating endpoint JSON: %v", err)
  333. }
  334. if _, err := patchEndpoint(endpointJSON); err != nil {
  335. t.Errorf("Failed patching endpoint: %v", err)
  336. }
  337. }
  338. // Make sure patch complains about a mismatched resourceVersion
  339. {
  340. endpointTemplate.Name = ""
  341. endpointTemplate.UID = ""
  342. endpointTemplate.ResourceVersion = "1"
  343. endpointJSON, err := runtime.Encode(legacyscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
  344. if err != nil {
  345. t.Fatalf("Failed creating endpoint JSON: %v", err)
  346. }
  347. if _, err := patchEndpoint(endpointJSON); !apierrors.IsConflict(err) {
  348. t.Errorf("Expected error, got %#v", err)
  349. }
  350. }
  351. // Make sure patch complains about mutating the UID
  352. {
  353. endpointTemplate.Name = ""
  354. endpointTemplate.UID = "abc"
  355. endpointTemplate.ResourceVersion = ""
  356. endpointJSON, err := runtime.Encode(legacyscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
  357. if err != nil {
  358. t.Fatalf("Failed creating endpoint JSON: %v", err)
  359. }
  360. if _, err := patchEndpoint(endpointJSON); !apierrors.IsInvalid(err) {
  361. t.Errorf("Expected error, got %#v", err)
  362. }
  363. }
  364. // Make sure patch complains about a mismatched name
  365. {
  366. endpointTemplate.Name = "changedname"
  367. endpointTemplate.UID = ""
  368. endpointTemplate.ResourceVersion = ""
  369. endpointJSON, err := runtime.Encode(legacyscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
  370. if err != nil {
  371. t.Fatalf("Failed creating endpoint JSON: %v", err)
  372. }
  373. if _, err := patchEndpoint(endpointJSON); !apierrors.IsBadRequest(err) {
  374. t.Errorf("Expected error, got %#v", err)
  375. }
  376. }
  377. // Make sure patch containing originally submitted JSON is accepted
  378. {
  379. endpointTemplate.Name = ""
  380. endpointTemplate.UID = ""
  381. endpointTemplate.ResourceVersion = ""
  382. endpointJSON, err := runtime.Encode(legacyscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpointTemplate)
  383. if err != nil {
  384. t.Fatalf("Failed creating endpoint JSON: %v", err)
  385. }
  386. if _, err := patchEndpoint(endpointJSON); err != nil {
  387. t.Errorf("Failed patching endpoint: %v", err)
  388. }
  389. }
  390. }
  391. func TestAPIVersions(t *testing.T) {
  392. result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
  393. defer result.TearDownFn()
  394. c := clientset.NewForConfigOrDie(result.ClientConfig)
  395. clientVersion := c.CoreV1().RESTClient().APIVersion().String()
  396. g, err := c.Discovery().ServerGroups()
  397. if err != nil {
  398. t.Fatalf("Failed to get api versions: %v", err)
  399. }
  400. versions := metav1.ExtractGroupVersions(g)
  401. // Verify that the server supports the API version used by the client.
  402. for _, version := range versions {
  403. if version == clientVersion {
  404. return
  405. }
  406. }
  407. t.Errorf("Server does not support APIVersion used by client. Server supported APIVersions: '%v', client APIVersion: '%v'", versions, clientVersion)
  408. }
  409. func TestSingleWatch(t *testing.T) {
  410. result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
  411. defer result.TearDownFn()
  412. client := clientset.NewForConfigOrDie(result.ClientConfig)
  413. mkEvent := func(i int) *v1.Event {
  414. name := fmt.Sprintf("event-%v", i)
  415. return &v1.Event{
  416. ObjectMeta: metav1.ObjectMeta{
  417. Namespace: "default",
  418. Name: name,
  419. },
  420. InvolvedObject: v1.ObjectReference{
  421. Namespace: "default",
  422. Name: name,
  423. },
  424. Reason: fmt.Sprintf("event %v", i),
  425. }
  426. }
  427. rv1 := ""
  428. for i := 0; i < 10; i++ {
  429. event := mkEvent(i)
  430. got, err := client.CoreV1().Events("default").Create(event)
  431. if err != nil {
  432. t.Fatalf("Failed creating event %#q: %v", event, err)
  433. }
  434. if rv1 == "" {
  435. rv1 = got.ResourceVersion
  436. if rv1 == "" {
  437. t.Fatal("did not get a resource version.")
  438. }
  439. }
  440. t.Logf("Created event %#v", got.ObjectMeta)
  441. }
  442. w, err := client.CoreV1().RESTClient().Get().
  443. Namespace("default").
  444. Resource("events").
  445. VersionedParams(&metav1.ListOptions{
  446. ResourceVersion: rv1,
  447. Watch: true,
  448. FieldSelector: fields.OneTermEqualSelector("metadata.name", "event-9").String(),
  449. }, metav1.ParameterCodec).
  450. Watch()
  451. if err != nil {
  452. t.Fatalf("Failed watch: %v", err)
  453. }
  454. defer w.Stop()
  455. select {
  456. case <-time.After(wait.ForeverTestTimeout):
  457. t.Fatalf("watch took longer than %s", wait.ForeverTestTimeout.String())
  458. case got, ok := <-w.ResultChan():
  459. if !ok {
  460. t.Fatal("Watch channel closed unexpectedly.")
  461. }
  462. // We expect to see an ADD of event-9 and only event-9. (This
  463. // catches a bug where all the events would have been sent down
  464. // the channel.)
  465. if e, a := watch.Added, got.Type; e != a {
  466. t.Errorf("Wanted %v, got %v", e, a)
  467. }
  468. switch o := got.Object.(type) {
  469. case *v1.Event:
  470. if e, a := "event-9", o.Name; e != a {
  471. t.Errorf("Wanted %v, got %v", e, a)
  472. }
  473. default:
  474. t.Fatalf("Unexpected watch event containing object %#q", got)
  475. }
  476. }
  477. }
  478. func TestMultiWatch(t *testing.T) {
  479. // Disable this test as long as it demonstrates a problem.
  480. // TODO: Re-enable this test when we get #6059 resolved.
  481. t.Skip()
  482. const watcherCount = 50
  483. rt.GOMAXPROCS(watcherCount)
  484. result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
  485. defer result.TearDownFn()
  486. client := clientset.NewForConfigOrDie(result.ClientConfig)
  487. dummyEvent := func(i int) *v1.Event {
  488. name := fmt.Sprintf("unrelated-%v", i)
  489. return &v1.Event{
  490. ObjectMeta: metav1.ObjectMeta{
  491. Name: fmt.Sprintf("%v.%x", name, time.Now().UnixNano()),
  492. Namespace: "default",
  493. },
  494. InvolvedObject: v1.ObjectReference{
  495. Name: name,
  496. Namespace: "default",
  497. },
  498. Reason: fmt.Sprintf("unrelated change %v", i),
  499. }
  500. }
  501. type timePair struct {
  502. t time.Time
  503. name string
  504. }
  505. receivedTimes := make(chan timePair, watcherCount*2)
  506. watchesStarted := sync.WaitGroup{}
  507. // make a bunch of pods and watch them
  508. for i := 0; i < watcherCount; i++ {
  509. watchesStarted.Add(1)
  510. name := fmt.Sprintf("multi-watch-%v", i)
  511. got, err := client.CoreV1().Pods("default").Create(&v1.Pod{
  512. ObjectMeta: metav1.ObjectMeta{
  513. Name: name,
  514. Labels: labels.Set{"watchlabel": name},
  515. },
  516. Spec: v1.PodSpec{
  517. Containers: []v1.Container{{
  518. Name: "pause",
  519. Image: imageutils.GetPauseImageName(),
  520. }},
  521. },
  522. })
  523. if err != nil {
  524. t.Fatalf("Couldn't make %v: %v", name, err)
  525. }
  526. go func(name, rv string) {
  527. options := metav1.ListOptions{
  528. LabelSelector: labels.Set{"watchlabel": name}.AsSelector().String(),
  529. ResourceVersion: rv,
  530. }
  531. w, err := client.CoreV1().Pods("default").Watch(options)
  532. if err != nil {
  533. panic(fmt.Sprintf("watch error for %v: %v", name, err))
  534. }
  535. defer w.Stop()
  536. watchesStarted.Done()
  537. e, ok := <-w.ResultChan() // should get the update (that we'll do below)
  538. if !ok {
  539. panic(fmt.Sprintf("%v ended early?", name))
  540. }
  541. if e.Type != watch.Modified {
  542. panic(fmt.Sprintf("Got unexpected watch notification:\n%v: %+v %+v", name, e, e.Object))
  543. }
  544. receivedTimes <- timePair{time.Now(), name}
  545. }(name, got.ObjectMeta.ResourceVersion)
  546. }
  547. log.Printf("%v: %v pods made and watchers started", time.Now(), watcherCount)
  548. // wait for watches to start before we start spamming the system with
  549. // objects below, otherwise we'll hit the watch window restriction.
  550. watchesStarted.Wait()
  551. const (
  552. useEventsAsUnrelatedType = false
  553. usePodsAsUnrelatedType = true
  554. )
  555. // make a bunch of unrelated changes in parallel
  556. if useEventsAsUnrelatedType {
  557. const unrelatedCount = 3000
  558. var wg sync.WaitGroup
  559. defer wg.Wait()
  560. changeToMake := make(chan int, unrelatedCount*2)
  561. changeMade := make(chan int, unrelatedCount*2)
  562. go func() {
  563. for i := 0; i < unrelatedCount; i++ {
  564. changeToMake <- i
  565. }
  566. close(changeToMake)
  567. }()
  568. for i := 0; i < 50; i++ {
  569. wg.Add(1)
  570. go func() {
  571. defer wg.Done()
  572. for {
  573. i, ok := <-changeToMake
  574. if !ok {
  575. return
  576. }
  577. if _, err := client.CoreV1().Events("default").Create(dummyEvent(i)); err != nil {
  578. panic(fmt.Sprintf("couldn't make an event: %v", err))
  579. }
  580. changeMade <- i
  581. }
  582. }()
  583. }
  584. for i := 0; i < 2000; i++ {
  585. <-changeMade
  586. if (i+1)%50 == 0 {
  587. log.Printf("%v: %v unrelated changes made", time.Now(), i+1)
  588. }
  589. }
  590. }
  591. if usePodsAsUnrelatedType {
  592. const unrelatedCount = 3000
  593. var wg sync.WaitGroup
  594. defer wg.Wait()
  595. changeToMake := make(chan int, unrelatedCount*2)
  596. changeMade := make(chan int, unrelatedCount*2)
  597. go func() {
  598. for i := 0; i < unrelatedCount; i++ {
  599. changeToMake <- i
  600. }
  601. close(changeToMake)
  602. }()
  603. for i := 0; i < 50; i++ {
  604. wg.Add(1)
  605. go func() {
  606. defer wg.Done()
  607. for {
  608. i, ok := <-changeToMake
  609. if !ok {
  610. return
  611. }
  612. name := fmt.Sprintf("unrelated-%v", i)
  613. _, err := client.CoreV1().Pods("default").Create(&v1.Pod{
  614. ObjectMeta: metav1.ObjectMeta{
  615. Name: name,
  616. },
  617. Spec: v1.PodSpec{
  618. Containers: []v1.Container{{
  619. Name: "nothing",
  620. Image: imageutils.GetPauseImageName(),
  621. }},
  622. },
  623. })
  624. if err != nil {
  625. panic(fmt.Sprintf("couldn't make unrelated pod: %v", err))
  626. }
  627. changeMade <- i
  628. }
  629. }()
  630. }
  631. for i := 0; i < 2000; i++ {
  632. <-changeMade
  633. if (i+1)%50 == 0 {
  634. log.Printf("%v: %v unrelated changes made", time.Now(), i+1)
  635. }
  636. }
  637. }
  638. // Now we still have changes being made in parallel, but at least 1000 have been made.
  639. // Make some updates to send down the watches.
  640. sentTimes := make(chan timePair, watcherCount*2)
  641. for i := 0; i < watcherCount; i++ {
  642. go func(i int) {
  643. name := fmt.Sprintf("multi-watch-%v", i)
  644. pod, err := client.CoreV1().Pods("default").Get(name, metav1.GetOptions{})
  645. if err != nil {
  646. panic(fmt.Sprintf("Couldn't get %v: %v", name, err))
  647. }
  648. pod.Spec.Containers[0].Image = imageutils.GetPauseImageName()
  649. sentTimes <- timePair{time.Now(), name}
  650. if _, err := client.CoreV1().Pods("default").Update(pod); err != nil {
  651. panic(fmt.Sprintf("Couldn't make %v: %v", name, err))
  652. }
  653. }(i)
  654. }
  655. sent := map[string]time.Time{}
  656. for i := 0; i < watcherCount; i++ {
  657. tp := <-sentTimes
  658. sent[tp.name] = tp.t
  659. }
  660. log.Printf("all changes made")
  661. dur := map[string]time.Duration{}
  662. for i := 0; i < watcherCount; i++ {
  663. tp := <-receivedTimes
  664. delta := tp.t.Sub(sent[tp.name])
  665. dur[tp.name] = delta
  666. log.Printf("%v: %v", tp.name, delta)
  667. }
  668. log.Printf("all watches ended")
  669. t.Errorf("durations: %v", dur)
  670. }
  671. func runSelfLinkTestOnNamespace(t *testing.T, c clientset.Interface, namespace string) {
  672. podBody := v1.Pod{
  673. ObjectMeta: metav1.ObjectMeta{
  674. Name: "selflinktest",
  675. Namespace: namespace,
  676. Labels: map[string]string{
  677. "name": "selflinktest",
  678. },
  679. },
  680. Spec: v1.PodSpec{
  681. Containers: []v1.Container{
  682. {Name: "name", Image: "image"},
  683. },
  684. },
  685. }
  686. pod, err := c.CoreV1().Pods(namespace).Create(&podBody)
  687. if err != nil {
  688. t.Fatalf("Failed creating selflinktest pod: %v", err)
  689. }
  690. if err = c.CoreV1().RESTClient().Get().RequestURI(pod.SelfLink).Do().Into(pod); err != nil {
  691. t.Errorf("Failed listing pod with supplied self link '%v': %v", pod.SelfLink, err)
  692. }
  693. podList, err := c.CoreV1().Pods(namespace).List(metav1.ListOptions{})
  694. if err != nil {
  695. t.Errorf("Failed listing pods: %v", err)
  696. }
  697. if err = c.CoreV1().RESTClient().Get().RequestURI(podList.SelfLink).Do().Into(podList); err != nil {
  698. t.Errorf("Failed listing pods with supplied self link '%v': %v", podList.SelfLink, err)
  699. }
  700. found := false
  701. for i := range podList.Items {
  702. item := &podList.Items[i]
  703. if item.Name != "selflinktest" {
  704. continue
  705. }
  706. found = true
  707. err = c.CoreV1().RESTClient().Get().RequestURI(item.SelfLink).Do().Into(pod)
  708. if err != nil {
  709. t.Errorf("Failed listing pod with supplied self link '%v': %v", item.SelfLink, err)
  710. }
  711. break
  712. }
  713. if !found {
  714. t.Errorf("never found selflinktest pod in namespace %s", namespace)
  715. }
  716. }
  717. func TestSelfLinkOnNamespace(t *testing.T) {
  718. result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
  719. defer result.TearDownFn()
  720. c := clientset.NewForConfigOrDie(result.ClientConfig)
  721. runSelfLinkTestOnNamespace(t, c, "default")
  722. }