watcher.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. // Copyright 2014 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
// Package raw implements a container watcher that uses inotify to detect the
// creation and deletion of containers in the raw cgroup hierarchies.
  16. package raw
  17. import (
  18. "fmt"
  19. "io/ioutil"
  20. "os"
  21. "path"
  22. "strings"
  23. "github.com/google/cadvisor/container/common"
  24. "github.com/google/cadvisor/container/libcontainer"
  25. "github.com/google/cadvisor/watcher"
  26. inotify "github.com/sigma/go-inotify"
  27. "k8s.io/klog"
  28. )
  29. type rawContainerWatcher struct {
  30. // Absolute path to the root of the cgroup hierarchies
  31. cgroupPaths map[string]string
  32. cgroupSubsystems *libcontainer.CgroupSubsystems
  33. // Inotify event watcher.
  34. watcher *common.InotifyWatcher
  35. // Signal for watcher thread to stop.
  36. stopWatcher chan error
  37. }
  38. func NewRawContainerWatcher() (watcher.ContainerWatcher, error) {
  39. cgroupSubsystems, err := libcontainer.GetAllCgroupSubsystems()
  40. if err != nil {
  41. return nil, fmt.Errorf("failed to get cgroup subsystems: %v", err)
  42. }
  43. if len(cgroupSubsystems.Mounts) == 0 {
  44. return nil, fmt.Errorf("failed to find supported cgroup mounts for the raw factory")
  45. }
  46. watcher, err := common.NewInotifyWatcher()
  47. if err != nil {
  48. return nil, err
  49. }
  50. rawWatcher := &rawContainerWatcher{
  51. cgroupPaths: common.MakeCgroupPaths(cgroupSubsystems.MountPoints, "/"),
  52. cgroupSubsystems: &cgroupSubsystems,
  53. watcher: watcher,
  54. stopWatcher: make(chan error),
  55. }
  56. return rawWatcher, nil
  57. }
  58. func (self *rawContainerWatcher) Start(events chan watcher.ContainerEvent) error {
  59. // Watch this container (all its cgroups) and all subdirectories.
  60. for _, cgroupPath := range self.cgroupPaths {
  61. _, err := self.watchDirectory(events, cgroupPath, "/")
  62. if err != nil {
  63. return err
  64. }
  65. }
  66. // Process the events received from the kernel.
  67. go func() {
  68. for {
  69. select {
  70. case event := <-self.watcher.Event():
  71. err := self.processEvent(event, events)
  72. if err != nil {
  73. klog.Warningf("Error while processing event (%+v): %v", event, err)
  74. }
  75. case err := <-self.watcher.Error():
  76. klog.Warningf("Error while watching %q: %v", "/", err)
  77. case <-self.stopWatcher:
  78. err := self.watcher.Close()
  79. if err == nil {
  80. self.stopWatcher <- err
  81. return
  82. }
  83. }
  84. }
  85. }()
  86. return nil
  87. }
  88. func (self *rawContainerWatcher) Stop() error {
  89. // Rendezvous with the watcher thread.
  90. self.stopWatcher <- nil
  91. return <-self.stopWatcher
  92. }
  93. // Watches the specified directory and all subdirectories. Returns whether the path was
  94. // already being watched and an error (if any).
  95. func (self *rawContainerWatcher) watchDirectory(events chan watcher.ContainerEvent, dir string, containerName string) (bool, error) {
  96. // Don't watch .mount cgroups because they never have containers as sub-cgroups. A single container
  97. // can have many .mount cgroups associated with it which can quickly exhaust the inotify watches on a node.
  98. if strings.HasSuffix(containerName, ".mount") {
  99. return false, nil
  100. }
  101. alreadyWatching, err := self.watcher.AddWatch(containerName, dir)
  102. if err != nil {
  103. return alreadyWatching, err
  104. }
  105. // Remove the watch if further operations failed.
  106. cleanup := true
  107. defer func() {
  108. if cleanup {
  109. _, err := self.watcher.RemoveWatch(containerName, dir)
  110. if err != nil {
  111. klog.Warningf("Failed to remove inotify watch for %q: %v", dir, err)
  112. }
  113. }
  114. }()
  115. // TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
  116. // Watch subdirectories as well.
  117. entries, err := ioutil.ReadDir(dir)
  118. if err != nil {
  119. return alreadyWatching, err
  120. }
  121. for _, entry := range entries {
  122. if entry.IsDir() {
  123. entryPath := path.Join(dir, entry.Name())
  124. subcontainerName := path.Join(containerName, entry.Name())
  125. alreadyWatchingSubDir, err := self.watchDirectory(events, entryPath, subcontainerName)
  126. if err != nil {
  127. klog.Errorf("Failed to watch directory %q: %v", entryPath, err)
  128. if os.IsNotExist(err) {
  129. // The directory may have been removed before watching. Try to watch the other
  130. // subdirectories. (https://github.com/kubernetes/kubernetes/issues/28997)
  131. continue
  132. }
  133. return alreadyWatching, err
  134. }
  135. // since we already missed the creation event for this directory, publish an event here.
  136. if !alreadyWatchingSubDir {
  137. go func() {
  138. events <- watcher.ContainerEvent{
  139. EventType: watcher.ContainerAdd,
  140. Name: subcontainerName,
  141. WatchSource: watcher.Raw,
  142. }
  143. }()
  144. }
  145. }
  146. }
  147. cleanup = false
  148. return alreadyWatching, nil
  149. }
  150. func (self *rawContainerWatcher) processEvent(event *inotify.Event, events chan watcher.ContainerEvent) error {
  151. // Convert the inotify event type to a container create or delete.
  152. var eventType watcher.ContainerEventType
  153. switch {
  154. case (event.Mask & inotify.IN_CREATE) > 0:
  155. eventType = watcher.ContainerAdd
  156. case (event.Mask & inotify.IN_DELETE) > 0:
  157. eventType = watcher.ContainerDelete
  158. case (event.Mask & inotify.IN_MOVED_FROM) > 0:
  159. eventType = watcher.ContainerDelete
  160. case (event.Mask & inotify.IN_MOVED_TO) > 0:
  161. eventType = watcher.ContainerAdd
  162. default:
  163. // Ignore other events.
  164. return nil
  165. }
  166. // Derive the container name from the path name.
  167. var containerName string
  168. for _, mount := range self.cgroupSubsystems.Mounts {
  169. mountLocation := path.Clean(mount.Mountpoint) + "/"
  170. if strings.HasPrefix(event.Name, mountLocation) {
  171. containerName = event.Name[len(mountLocation)-1:]
  172. break
  173. }
  174. }
  175. if containerName == "" {
  176. return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name)
  177. }
  178. // Maintain the watch for the new or deleted container.
  179. switch eventType {
  180. case watcher.ContainerAdd:
  181. // New container was created, watch it.
  182. alreadyWatched, err := self.watchDirectory(events, event.Name, containerName)
  183. if err != nil {
  184. return err
  185. }
  186. // Only report container creation once.
  187. if alreadyWatched {
  188. return nil
  189. }
  190. case watcher.ContainerDelete:
  191. // Container was deleted, stop watching for it.
  192. lastWatched, err := self.watcher.RemoveWatch(containerName, event.Name)
  193. if err != nil {
  194. return err
  195. }
  196. // Only report container deletion once.
  197. if !lastWatched {
  198. return nil
  199. }
  200. default:
  201. return fmt.Errorf("unknown event type %v", eventType)
  202. }
  203. // Deliver the event.
  204. events <- watcher.ContainerEvent{
  205. EventType: eventType,
  206. Name: containerName,
  207. WatchSource: watcher.Raw,
  208. }
  209. return nil
  210. }