watch_based_manager_test.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package manager
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. "testing"
  19. "time"
  20. "k8s.io/api/core/v1"
  21. apiequality "k8s.io/apimachinery/pkg/api/equality"
  22. apierrors "k8s.io/apimachinery/pkg/api/errors"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/runtime"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. "k8s.io/apimachinery/pkg/watch"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/kubernetes/fake"
  30. core "k8s.io/client-go/testing"
  31. featuregatetesting "k8s.io/component-base/featuregate/testing"
  32. corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
  33. "k8s.io/kubernetes/pkg/features"
  34. "github.com/stretchr/testify/assert"
  35. )
  36. func listSecret(fakeClient clientset.Interface) listObjectFunc {
  37. return func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
  38. return fakeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
  39. }
  40. }
  41. func watchSecret(fakeClient clientset.Interface) watchObjectFunc {
  42. return func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
  43. return fakeClient.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
  44. }
  45. }
  46. func isSecretImmutable(object runtime.Object) bool {
  47. if secret, ok := object.(*v1.Secret); ok {
  48. return secret.Immutable != nil && *secret.Immutable
  49. }
  50. return false
  51. }
  52. func newSecretCache(fakeClient clientset.Interface) *objectCache {
  53. return &objectCache{
  54. listObject: listSecret(fakeClient),
  55. watchObject: watchSecret(fakeClient),
  56. newObject: func() runtime.Object { return &v1.Secret{} },
  57. isImmutable: isSecretImmutable,
  58. groupResource: corev1.Resource("secret"),
  59. items: make(map[objectKey]*objectCacheItem),
  60. }
  61. }
  62. func TestSecretCache(t *testing.T) {
  63. fakeClient := &fake.Clientset{}
  64. listReactor := func(a core.Action) (bool, runtime.Object, error) {
  65. result := &v1.SecretList{
  66. ListMeta: metav1.ListMeta{
  67. ResourceVersion: "123",
  68. },
  69. }
  70. return true, result, nil
  71. }
  72. fakeClient.AddReactor("list", "secrets", listReactor)
  73. fakeWatch := watch.NewFake()
  74. fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
  75. store := newSecretCache(fakeClient)
  76. store.AddReference("ns", "name")
  77. _, err := store.Get("ns", "name")
  78. if !apierrors.IsNotFound(err) {
  79. t.Errorf("Expected NotFound error, got: %v", err)
  80. }
  81. // Eventually we should be able to read added secret.
  82. secret := &v1.Secret{
  83. ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "ns", ResourceVersion: "125"},
  84. }
  85. fakeWatch.Add(secret)
  86. getFn := func() (bool, error) {
  87. object, err := store.Get("ns", "name")
  88. if err != nil {
  89. if apierrors.IsNotFound(err) {
  90. return false, nil
  91. }
  92. return false, err
  93. }
  94. secret := object.(*v1.Secret)
  95. if secret == nil || secret.Name != "name" || secret.Namespace != "ns" {
  96. return false, fmt.Errorf("unexpected secret: %v", secret)
  97. }
  98. return true, nil
  99. }
  100. if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
  101. t.Errorf("unexpected error: %v", err)
  102. }
  103. // Eventually we should observer secret deletion.
  104. fakeWatch.Delete(secret)
  105. getFn = func() (bool, error) {
  106. _, err := store.Get("ns", "name")
  107. if err != nil {
  108. if apierrors.IsNotFound(err) {
  109. return true, nil
  110. }
  111. return false, err
  112. }
  113. return false, nil
  114. }
  115. if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
  116. t.Errorf("unexpected error: %v", err)
  117. }
  118. store.DeleteReference("ns", "name")
  119. _, err = store.Get("ns", "name")
  120. if err == nil || !strings.Contains(err.Error(), "not registered") {
  121. t.Errorf("unexpected error: %v", err)
  122. }
  123. }
  124. func TestSecretCacheMultipleRegistrations(t *testing.T) {
  125. fakeClient := &fake.Clientset{}
  126. listReactor := func(a core.Action) (bool, runtime.Object, error) {
  127. result := &v1.SecretList{
  128. ListMeta: metav1.ListMeta{
  129. ResourceVersion: "123",
  130. },
  131. }
  132. return true, result, nil
  133. }
  134. fakeClient.AddReactor("list", "secrets", listReactor)
  135. fakeWatch := watch.NewFake()
  136. fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
  137. store := newSecretCache(fakeClient)
  138. store.AddReference("ns", "name")
  139. // This should trigger List and Watch actions eventually.
  140. actionsFn := func() (bool, error) {
  141. actions := fakeClient.Actions()
  142. if len(actions) > 2 {
  143. return false, fmt.Errorf("too many actions: %v", actions)
  144. }
  145. if len(actions) < 2 {
  146. return false, nil
  147. }
  148. if actions[0].GetVerb() != "list" || actions[1].GetVerb() != "watch" {
  149. return false, fmt.Errorf("unexpected actions: %v", actions)
  150. }
  151. return true, nil
  152. }
  153. if err := wait.PollImmediate(10*time.Millisecond, time.Second, actionsFn); err != nil {
  154. t.Errorf("unexpected error: %v", err)
  155. }
  156. // Next registrations shouldn't trigger any new actions.
  157. for i := 0; i < 20; i++ {
  158. store.AddReference("ns", "name")
  159. store.DeleteReference("ns", "name")
  160. }
  161. actions := fakeClient.Actions()
  162. assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
  163. // Final delete also doesn't trigger any action.
  164. store.DeleteReference("ns", "name")
  165. _, err := store.Get("ns", "name")
  166. if err == nil || !strings.Contains(err.Error(), "not registered") {
  167. t.Errorf("unexpected error: %v", err)
  168. }
  169. actions = fakeClient.Actions()
  170. assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
  171. }
  172. func TestImmutableSecretStopsTheReflector(t *testing.T) {
  173. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ImmutableEphemeralVolumes, true)()
  174. secret := func(rv string, immutable bool) *v1.Secret {
  175. result := &v1.Secret{
  176. ObjectMeta: metav1.ObjectMeta{
  177. Name: "name",
  178. Namespace: "ns",
  179. ResourceVersion: rv,
  180. },
  181. }
  182. if immutable {
  183. trueVal := true
  184. result.Immutable = &trueVal
  185. }
  186. return result
  187. }
  188. tests := []struct {
  189. desc string
  190. initial *v1.Secret
  191. eventual *v1.Secret
  192. }{
  193. {
  194. desc: "secret doesn't exist, created as mutable",
  195. initial: nil,
  196. eventual: secret("200", false),
  197. },
  198. {
  199. desc: "secret doesn't exist, created as immutable",
  200. initial: nil,
  201. eventual: secret("200", true),
  202. },
  203. {
  204. desc: "mutable secret modified to mutable",
  205. initial: secret("100", false),
  206. eventual: secret("200", false),
  207. },
  208. {
  209. desc: "mutable secret modified to immutable",
  210. initial: secret("100", false),
  211. eventual: secret("200", true),
  212. },
  213. {
  214. desc: "immutable secret",
  215. initial: secret("100", true),
  216. eventual: nil,
  217. },
  218. }
  219. for _, tc := range tests {
  220. t.Run(tc.desc, func(t *testing.T) {
  221. fakeClient := &fake.Clientset{}
  222. listReactor := func(a core.Action) (bool, runtime.Object, error) {
  223. result := &v1.SecretList{
  224. ListMeta: metav1.ListMeta{
  225. ResourceVersion: "100",
  226. },
  227. }
  228. if tc.initial != nil {
  229. result.Items = []v1.Secret{*tc.initial}
  230. }
  231. return true, result, nil
  232. }
  233. fakeClient.AddReactor("list", "secrets", listReactor)
  234. fakeWatch := watch.NewFake()
  235. fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
  236. store := newSecretCache(fakeClient)
  237. key := objectKey{namespace: "ns", name: "name"}
  238. itemExists := func() (bool, error) {
  239. store.lock.Lock()
  240. defer store.lock.Unlock()
  241. _, ok := store.items[key]
  242. return ok, nil
  243. }
  244. reflectorRunning := func() bool {
  245. store.lock.Lock()
  246. defer store.lock.Unlock()
  247. item := store.items[key]
  248. item.lock.Lock()
  249. defer item.lock.Unlock()
  250. select {
  251. case <-item.stopCh:
  252. return false
  253. default:
  254. return true
  255. }
  256. }
  257. // AddReference should start reflector.
  258. store.AddReference("ns", "name")
  259. if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil {
  260. t.Errorf("item wasn't added to cache")
  261. }
  262. obj, err := store.Get("ns", "name")
  263. if tc.initial != nil {
  264. assert.True(t, apiequality.Semantic.DeepEqual(tc.initial, obj))
  265. } else {
  266. assert.True(t, apierrors.IsNotFound(err))
  267. }
  268. // Reflector should already be stopped for immutable secrets.
  269. assert.Equal(t, tc.initial == nil || !isSecretImmutable(tc.initial), reflectorRunning())
  270. if tc.eventual == nil {
  271. return
  272. }
  273. fakeWatch.Add(tc.eventual)
  274. // Eventually Get should return that secret.
  275. getFn := func() (bool, error) {
  276. object, err := store.Get("ns", "name")
  277. if err != nil {
  278. if apierrors.IsNotFound(err) {
  279. return false, nil
  280. }
  281. return false, err
  282. }
  283. secret := object.(*v1.Secret)
  284. return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil
  285. }
  286. if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
  287. t.Errorf("unexpected error: %v", err)
  288. }
  289. // Reflector should already be stopped for immutable secrets.
  290. assert.Equal(t, tc.eventual == nil || !isSecretImmutable(tc.eventual), reflectorRunning())
  291. })
  292. }
  293. }