threshold_notifier_linux.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package eviction
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "golang.org/x/sys/unix"
  19. "k8s.io/klog"
  20. )
  21. const (
  22. // eventSize is the number of bytes returned by a successful read from an eventfd
  23. // see http://man7.org/linux/man-pages/man2/eventfd.2.html for more information
  24. eventSize = 8
  25. // numFdEvents is the number of events we can record at once.
  26. // If EpollWait finds more than this, they will be missed.
  27. numFdEvents = 6
  28. )
  29. type linuxCgroupNotifier struct {
  30. eventfd int
  31. epfd int
  32. stop chan struct{}
  33. stopLock sync.Mutex
  34. }
  35. var _ CgroupNotifier = &linuxCgroupNotifier{}
  36. // NewCgroupNotifier returns a linuxCgroupNotifier, which performs cgroup control operations required
  37. // to receive notifications from the cgroup when the threshold is crossed in either direction.
  38. func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
  39. var watchfd, eventfd, epfd, controlfd int
  40. var err error
  41. watchfd, err = unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY, 0)
  42. if err != nil {
  43. return nil, err
  44. }
  45. defer unix.Close(watchfd)
  46. controlfd, err = unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY, 0)
  47. if err != nil {
  48. return nil, err
  49. }
  50. defer unix.Close(controlfd)
  51. eventfd, err = unix.Eventfd(0, unix.EFD_CLOEXEC)
  52. if err != nil {
  53. return nil, err
  54. }
  55. if eventfd < 0 {
  56. err = fmt.Errorf("eventfd call failed")
  57. return nil, err
  58. }
  59. defer func() {
  60. // Close eventfd if we get an error later in initialization
  61. if err != nil {
  62. unix.Close(eventfd)
  63. }
  64. }()
  65. epfd, err = unix.EpollCreate1(0)
  66. if err != nil {
  67. return nil, err
  68. }
  69. if epfd < 0 {
  70. err = fmt.Errorf("EpollCreate1 call failed")
  71. return nil, err
  72. }
  73. defer func() {
  74. // Close epfd if we get an error later in initialization
  75. if err != nil {
  76. unix.Close(epfd)
  77. }
  78. }()
  79. config := fmt.Sprintf("%d %d %d", eventfd, watchfd, threshold)
  80. _, err = unix.Write(controlfd, []byte(config))
  81. if err != nil {
  82. return nil, err
  83. }
  84. return &linuxCgroupNotifier{
  85. eventfd: eventfd,
  86. epfd: epfd,
  87. stop: make(chan struct{}),
  88. }, nil
  89. }
  90. func (n *linuxCgroupNotifier) Start(eventCh chan<- struct{}) {
  91. err := unix.EpollCtl(n.epfd, unix.EPOLL_CTL_ADD, n.eventfd, &unix.EpollEvent{
  92. Fd: int32(n.eventfd),
  93. Events: unix.EPOLLIN,
  94. })
  95. if err != nil {
  96. klog.Warningf("eviction manager: error adding epoll eventfd: %v", err)
  97. return
  98. }
  99. for {
  100. select {
  101. case <-n.stop:
  102. return
  103. default:
  104. }
  105. event, err := wait(n.epfd, n.eventfd, notifierRefreshInterval)
  106. if err != nil {
  107. klog.Warningf("eviction manager: error while waiting for memcg events: %v", err)
  108. return
  109. } else if !event {
  110. // Timeout on wait. This is expected if the threshold was not crossed
  111. continue
  112. }
  113. // Consume the event from the eventfd
  114. buf := make([]byte, eventSize)
  115. _, err = unix.Read(n.eventfd, buf)
  116. if err != nil {
  117. klog.Warningf("eviction manager: error reading memcg events: %v", err)
  118. return
  119. }
  120. eventCh <- struct{}{}
  121. }
  122. }
  123. // wait waits up to notifierRefreshInterval for an event on the Epoll FD for the
  124. // eventfd we are concerned about. It returns an error if one occurrs, and true
  125. // if the consumer should read from the eventfd.
  126. func wait(epfd, eventfd int, timeout time.Duration) (bool, error) {
  127. events := make([]unix.EpollEvent, numFdEvents+1)
  128. timeoutMS := int(timeout / time.Millisecond)
  129. n, err := unix.EpollWait(epfd, events, timeoutMS)
  130. if n == -1 {
  131. if err == unix.EINTR {
  132. // Interrupt, ignore the error
  133. return false, nil
  134. }
  135. return false, err
  136. }
  137. if n == 0 {
  138. // Timeout
  139. return false, nil
  140. }
  141. if n > numFdEvents {
  142. return false, fmt.Errorf("epoll_wait returned more events than we know what to do with")
  143. }
  144. for _, event := range events[:n] {
  145. if event.Fd == int32(eventfd) {
  146. if event.Events&unix.EPOLLHUP != 0 || event.Events&unix.EPOLLERR != 0 || event.Events&unix.EPOLLIN != 0 {
  147. // EPOLLHUP: should not happen, but if it does, treat it as a wakeup.
  148. // EPOLLERR: If an error is waiting on the file descriptor, we should pretend
  149. // something is ready to read, and let unix.Read pick up the error.
  150. // EPOLLIN: There is data to read.
  151. return true, nil
  152. }
  153. }
  154. }
  155. // An event occurred that we don't care about.
  156. return false, nil
  157. }
  158. func (n *linuxCgroupNotifier) Stop() {
  159. n.stopLock.Lock()
  160. defer n.stopLock.Unlock()
  161. select {
  162. case <-n.stop:
  163. // the linuxCgroupNotifier is already stopped
  164. return
  165. default:
  166. }
  167. unix.Close(n.eventfd)
  168. unix.Close(n.epfd)
  169. close(n.stop)
  170. }