file_linux_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. // +build linux
  2. /*
  3. Copyright 2016 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package config
  15. import (
  16. "fmt"
  17. "io"
  18. "os"
  19. "path/filepath"
  20. "sync"
  21. "testing"
  22. "time"
  23. "k8s.io/api/core/v1"
  24. apiequality "k8s.io/apimachinery/pkg/api/equality"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/runtime"
  27. "k8s.io/apimachinery/pkg/types"
  28. "k8s.io/apimachinery/pkg/util/wait"
  29. "k8s.io/kubernetes/pkg/api/legacyscheme"
  30. "k8s.io/kubernetes/pkg/api/testapi"
  31. api "k8s.io/kubernetes/pkg/apis/core"
  32. k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
  33. "k8s.io/kubernetes/pkg/apis/core/validation"
  34. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  35. "k8s.io/kubernetes/pkg/securitycontext"
  36. )
  37. func TestExtractFromNonExistentFile(t *testing.T) {
  38. ch := make(chan interface{}, 1)
  39. lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch)
  40. err := lw.doWatch()
  41. if err == nil {
  42. t.Errorf("Expected error")
  43. }
  44. }
  45. func TestUpdateOnNonExistentFile(t *testing.T) {
  46. ch := make(chan interface{})
  47. NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
  48. select {
  49. case got := <-ch:
  50. update := got.(kubetypes.PodUpdate)
  51. expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
  52. if !apiequality.Semantic.DeepDerivative(expected, update) {
  53. t.Fatalf("expected %#v, Got %#v", expected, update)
  54. }
  55. case <-time.After(wait.ForeverTestTimeout):
  56. t.Fatalf("expected update, timeout instead")
  57. }
  58. }
  59. func TestReadPodsFromFileExistAlready(t *testing.T) {
  60. hostname := types.NodeName("random-test-hostname")
  61. var testCases = getTestCases(hostname)
  62. for _, testCase := range testCases {
  63. func() {
  64. dirName, err := mkTempDir("file-test")
  65. if err != nil {
  66. t.Fatalf("unable to create temp dir: %v", err)
  67. }
  68. defer os.RemoveAll(dirName)
  69. file := testCase.writeToFile(dirName, "test_pod_manifest", t)
  70. ch := make(chan interface{})
  71. NewSourceFile(file, hostname, time.Millisecond, ch)
  72. select {
  73. case got := <-ch:
  74. update := got.(kubetypes.PodUpdate)
  75. for _, pod := range update.Pods {
  76. // TODO: remove the conversion when validation is performed on versioned objects.
  77. internalPod := &api.Pod{}
  78. if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
  79. t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
  80. }
  81. if errs := validation.ValidatePod(internalPod); len(errs) > 0 {
  82. t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
  83. }
  84. }
  85. if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
  86. t.Fatalf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update)
  87. }
  88. case <-time.After(wait.ForeverTestTimeout):
  89. t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
  90. }
  91. }()
  92. }
  93. }
  94. var (
  95. testCases = []struct {
  96. watchDir bool
  97. symlink bool
  98. }{
  99. {true, true},
  100. {true, false},
  101. {false, true},
  102. {false, false},
  103. }
  104. )
  105. func TestWatchFileAdded(t *testing.T) {
  106. for _, testCase := range testCases {
  107. watchFileAdded(testCase.watchDir, testCase.symlink, t)
  108. }
  109. }
  110. func TestWatchFileChanged(t *testing.T) {
  111. for _, testCase := range testCases {
  112. watchFileChanged(testCase.watchDir, testCase.symlink, t)
  113. }
  114. }
  115. type testCase struct {
  116. lock *sync.Mutex
  117. desc string
  118. linkedFile string
  119. pod runtime.Object
  120. expected kubetypes.PodUpdate
  121. }
  122. func getTestCases(hostname types.NodeName) []*testCase {
  123. grace := int64(30)
  124. enableServiceLinks := v1.DefaultEnableServiceLinks
  125. return []*testCase{
  126. {
  127. lock: &sync.Mutex{},
  128. desc: "Simple pod",
  129. pod: &v1.Pod{
  130. TypeMeta: metav1.TypeMeta{
  131. Kind: "Pod",
  132. APIVersion: "",
  133. },
  134. ObjectMeta: metav1.ObjectMeta{
  135. Name: "test",
  136. UID: "12345",
  137. Namespace: "mynamespace",
  138. },
  139. Spec: v1.PodSpec{
  140. Containers: []v1.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
  141. SecurityContext: &v1.PodSecurityContext{},
  142. SchedulerName: api.DefaultSchedulerName,
  143. },
  144. Status: v1.PodStatus{
  145. Phase: v1.PodPending,
  146. },
  147. },
  148. expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &v1.Pod{
  149. ObjectMeta: metav1.ObjectMeta{
  150. Name: "test-" + string(hostname),
  151. UID: "12345",
  152. Namespace: "mynamespace",
  153. Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"},
  154. SelfLink: getSelfLink("test-"+string(hostname), "mynamespace"),
  155. },
  156. Spec: v1.PodSpec{
  157. NodeName: string(hostname),
  158. RestartPolicy: v1.RestartPolicyAlways,
  159. DNSPolicy: v1.DNSClusterFirst,
  160. TerminationGracePeriodSeconds: &grace,
  161. Tolerations: []v1.Toleration{{
  162. Operator: "Exists",
  163. Effect: "NoExecute",
  164. }},
  165. Containers: []v1.Container{{
  166. Name: "image",
  167. Image: "test/image",
  168. TerminationMessagePath: "/dev/termination-log",
  169. ImagePullPolicy: "Always",
  170. SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
  171. TerminationMessagePolicy: v1.TerminationMessageReadFile,
  172. }},
  173. SecurityContext: &v1.PodSecurityContext{},
  174. SchedulerName: api.DefaultSchedulerName,
  175. EnableServiceLinks: &enableServiceLinks,
  176. },
  177. Status: v1.PodStatus{
  178. Phase: v1.PodPending,
  179. },
  180. }),
  181. },
  182. }
  183. }
  184. func (tc *testCase) writeToFile(dir, name string, t *testing.T) string {
  185. var versionedPod runtime.Object
  186. err := legacyscheme.Scheme.Convert(&tc.pod, &versionedPod, nil)
  187. if err != nil {
  188. t.Fatalf("%s: error in versioning the pod: %v", tc.desc, err)
  189. }
  190. fileContents, err := runtime.Encode(testapi.Default.Codec(), versionedPod)
  191. if err != nil {
  192. t.Fatalf("%s: error in encoding the pod: %v", tc.desc, err)
  193. }
  194. fileName := filepath.Join(dir, name)
  195. if err := writeFile(fileName, []byte(fileContents)); err != nil {
  196. t.Fatalf("unable to write test file %#v", err)
  197. }
  198. return fileName
  199. }
  200. func createSymbolicLink(link, target, name string, t *testing.T) string {
  201. linkName := filepath.Join(link, name)
  202. linkedFile := filepath.Join(target, name)
  203. err := os.Symlink(linkedFile, linkName)
  204. if err != nil {
  205. t.Fatalf("unexpected error when create symbolic link: %v", err)
  206. }
  207. return linkName
  208. }
  209. func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
  210. hostname := types.NodeName("random-test-hostname")
  211. var testCases = getTestCases(hostname)
  212. fileNamePre := "test_pod_manifest"
  213. for index, testCase := range testCases {
  214. func() {
  215. dirName, err := mkTempDir("dir-test")
  216. if err != nil {
  217. t.Fatalf("unable to create temp dir: %v", err)
  218. }
  219. defer removeAll(dirName, t)
  220. fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
  221. var linkedDirName string
  222. if symlink {
  223. linkedDirName, err = mkTempDir("linked-dir-test")
  224. if err != nil {
  225. t.Fatalf("unable to create temp dir for linked files: %v", err)
  226. }
  227. defer removeAll(linkedDirName, t)
  228. createSymbolicLink(dirName, linkedDirName, fileName, t)
  229. }
  230. ch := make(chan interface{})
  231. if watchDir {
  232. NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
  233. } else {
  234. NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch)
  235. }
  236. expectEmptyUpdate(t, ch)
  237. addFile := func() {
  238. // Add a file
  239. if symlink {
  240. testCase.writeToFile(linkedDirName, fileName, t)
  241. return
  242. }
  243. testCase.writeToFile(dirName, fileName, t)
  244. }
  245. go addFile()
  246. // For !watchDir: expect an update by SourceFile.reloadConfig().
  247. // For watchDir: expect at least one update from CREATE & MODIFY inotify event.
  248. // Shouldn't expect two updates from CREATE & MODIFY because CREATE doesn't guarantee file written.
  249. // In that case no update will be sent from CREATE event.
  250. expectUpdate(t, ch, testCase)
  251. }()
  252. }
  253. }
  254. func watchFileChanged(watchDir bool, symlink bool, t *testing.T) {
  255. hostname := types.NodeName("random-test-hostname")
  256. var testCases = getTestCases(hostname)
  257. fileNamePre := "test_pod_manifest"
  258. for index, testCase := range testCases {
  259. func() {
  260. dirName, err := mkTempDir("dir-test")
  261. fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
  262. if err != nil {
  263. t.Fatalf("unable to create temp dir: %v", err)
  264. }
  265. defer removeAll(dirName, t)
  266. var linkedDirName string
  267. if symlink {
  268. linkedDirName, err = mkTempDir("linked-dir-test")
  269. if err != nil {
  270. t.Fatalf("unable to create temp dir for linked files: %v", err)
  271. }
  272. defer removeAll(linkedDirName, t)
  273. createSymbolicLink(dirName, linkedDirName, fileName, t)
  274. }
  275. var file string
  276. ch := make(chan interface{})
  277. func() {
  278. testCase.lock.Lock()
  279. defer testCase.lock.Unlock()
  280. if symlink {
  281. file = testCase.writeToFile(linkedDirName, fileName, t)
  282. return
  283. }
  284. file = testCase.writeToFile(dirName, fileName, t)
  285. }()
  286. if watchDir {
  287. NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
  288. } else {
  289. NewSourceFile(file, hostname, 100*time.Millisecond, ch)
  290. }
  291. // expect an update by SourceFile.resetStoreFromPath()
  292. expectUpdate(t, ch, testCase)
  293. changeFile := func() {
  294. // Edit the file content
  295. testCase.lock.Lock()
  296. defer testCase.lock.Unlock()
  297. pod := testCase.pod.(*v1.Pod)
  298. pod.Spec.Containers[0].Name = "image2"
  299. testCase.expected.Pods[0].Spec.Containers[0].Name = "image2"
  300. if symlink {
  301. file = testCase.writeToFile(linkedDirName, fileName, t)
  302. return
  303. }
  304. file = testCase.writeToFile(dirName, fileName, t)
  305. }
  306. go changeFile()
  307. // expect an update by MODIFY inotify event
  308. expectUpdate(t, ch, testCase)
  309. if watchDir {
  310. go changeFileName(dirName, fileName, fileName+"_ch", t)
  311. // expect an update by MOVED_FROM inotify event cause changing file name
  312. expectEmptyUpdate(t, ch)
  313. // expect an update by MOVED_TO inotify event cause changing file name
  314. expectUpdate(t, ch, testCase)
  315. }
  316. }()
  317. }
  318. }
  319. func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
  320. timer := time.After(5 * time.Second)
  321. for {
  322. select {
  323. case got := <-ch:
  324. update := got.(kubetypes.PodUpdate)
  325. if len(update.Pods) == 0 {
  326. // filter out the empty updates from reading a non-existing path
  327. continue
  328. }
  329. for _, pod := range update.Pods {
  330. // TODO: remove the conversion when validation is performed on versioned objects.
  331. internalPod := &api.Pod{}
  332. if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
  333. t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
  334. }
  335. if errs := validation.ValidatePod(internalPod); len(errs) > 0 {
  336. t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
  337. }
  338. }
  339. testCase.lock.Lock()
  340. defer testCase.lock.Unlock()
  341. if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
  342. t.Fatalf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
  343. }
  344. return
  345. case <-timer:
  346. t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
  347. }
  348. }
  349. }
  350. func expectEmptyUpdate(t *testing.T, ch chan interface{}) {
  351. timer := time.After(5 * time.Second)
  352. for {
  353. select {
  354. case got := <-ch:
  355. update := got.(kubetypes.PodUpdate)
  356. if len(update.Pods) != 0 {
  357. t.Fatalf("expected empty update, got %#v", update)
  358. }
  359. return
  360. case <-timer:
  361. t.Fatalf("expected empty update, timeout instead")
  362. }
  363. }
  364. }
  365. func writeFile(filename string, data []byte) error {
  366. f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0666)
  367. if err != nil {
  368. return err
  369. }
  370. n, err := f.Write(data)
  371. if err == nil && n < len(data) {
  372. err = io.ErrShortWrite
  373. }
  374. if err1 := f.Close(); err == nil {
  375. err = err1
  376. }
  377. return err
  378. }
  379. func changeFileName(dir, from, to string, t *testing.T) {
  380. fromPath := filepath.Join(dir, from)
  381. toPath := filepath.Join(dir, to)
  382. if err := os.Rename(fromPath, toPath); err != nil {
  383. t.Errorf("Fail to change file name: %s", err)
  384. }
  385. }