graceful_termination.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ipvs
  14. import (
  15. "fmt"
  16. "strings"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/wait"
  20. "k8s.io/klog"
  21. utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
  22. )
  const (
  	// rsGracefulDeletePeriod is a 15 minute duration.
  	// NOTE(review): not referenced by the code visible in this file;
  	// presumably an upper bound on how long a real server is kept around
  	// before deletion - confirm against the rest of the package.
  	rsGracefulDeletePeriod = time.Minute * 15
  	// rsCheckDeleteInterval is the period handed to wait.Until by Run, i.e.
  	// how often the graceful termination list is re-examined.
  	rsCheckDeleteInterval = time.Minute
  )
  27. // listItem stores real server information and the process time.
  28. // If nothing special happened, real server will be delete after process time.
  29. type listItem struct {
  30. VirtualServer *utilipvs.VirtualServer
  31. RealServer *utilipvs.RealServer
  32. }
  33. // String return the unique real server name(with virtual server information)
  34. func (g *listItem) String() string {
  35. return GetUniqueRSName(g.VirtualServer, g.RealServer)
  36. }
  37. // GetUniqueRSName return a string type unique rs name with vs information
  38. func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string {
  39. return vs.String() + "/" + rs.String()
  40. }
  41. type graceTerminateRSList struct {
  42. lock sync.Mutex
  43. list map[string]*listItem
  44. }
  45. // add push an new element to the rsList
  46. func (q *graceTerminateRSList) add(rs *listItem) bool {
  47. q.lock.Lock()
  48. defer q.lock.Unlock()
  49. uniqueRS := rs.String()
  50. if _, ok := q.list[uniqueRS]; ok {
  51. return false
  52. }
  53. klog.V(5).Infof("Adding rs %v to graceful delete rsList", rs)
  54. q.list[uniqueRS] = rs
  55. return true
  56. }
  57. // remove remove an element from the rsList
  58. func (q *graceTerminateRSList) remove(rs *listItem) bool {
  59. q.lock.Lock()
  60. defer q.lock.Unlock()
  61. uniqueRS := rs.String()
  62. if _, ok := q.list[uniqueRS]; ok {
  63. delete(q.list, uniqueRS)
  64. return true
  65. }
  66. return false
  67. }
  68. func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool {
  69. success := true
  70. for name, rs := range q.list {
  71. deleted, err := handler(rs)
  72. if err != nil {
  73. klog.Errorf("Try delete rs %q err: %v", name, err)
  74. success = false
  75. }
  76. if deleted {
  77. klog.Infof("lw: remote out of the list: %s", name)
  78. q.remove(rs)
  79. }
  80. }
  81. return success
  82. }
  83. // exist check whether the specified unique RS is in the rsList
  84. func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
  85. q.lock.Lock()
  86. defer q.lock.Unlock()
  87. if rs, ok := q.list[uniqueRS]; ok {
  88. return rs, true
  89. }
  90. return nil, false
  91. }
  92. // GracefulTerminationManager manage rs graceful termination information and do graceful termination work
  93. // rsList is the rs list to graceful termination, ipvs is the ipvsinterface to do ipvs delete/update work
  94. type GracefulTerminationManager struct {
  95. rsList graceTerminateRSList
  96. ipvs utilipvs.Interface
  97. }
  98. // NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work
  99. func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
  100. l := make(map[string]*listItem)
  101. return &GracefulTerminationManager{
  102. rsList: graceTerminateRSList{
  103. list: l,
  104. },
  105. ipvs: ipvs,
  106. }
  107. }
  108. // InTerminationList to check whether specified unique rs name is in graceful termination list
  109. func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
  110. _, exist := m.rsList.exist(uniqueRS)
  111. return exist
  112. }
  113. // GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list
  114. func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
  115. // Try to delete rs before add it to graceful delete list
  116. ele := &listItem{
  117. VirtualServer: vs,
  118. RealServer: rs,
  119. }
  120. deleted, err := m.deleteRsFunc(ele)
  121. if err != nil {
  122. klog.Errorf("Delete rs %q err: %v", ele.String(), err)
  123. }
  124. if deleted {
  125. return nil
  126. }
  127. rs.Weight = 0
  128. err = m.ipvs.UpdateRealServer(vs, rs)
  129. if err != nil {
  130. return err
  131. }
  132. klog.V(5).Infof("Adding an element to graceful delete rsList: %+v", ele)
  133. m.rsList.add(ele)
  134. return nil
  135. }
  136. func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) {
  137. klog.V(2).Infof("Trying to delete rs: %s", rsToDelete.String())
  138. rss, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer)
  139. if err != nil {
  140. return false, err
  141. }
  142. for _, rs := range rss {
  143. if rsToDelete.RealServer.Equal(rs) {
  144. // For UDP traffic, no graceful termination, we immediately delete the RS
  145. // (existing connections will be deleted on the next packet because sysctlExpireNoDestConn=1)
  146. // For other protocols, don't delete until all connections have expired)
  147. if strings.ToUpper(rsToDelete.VirtualServer.Protocol) != "UDP" && rs.ActiveConn+rs.InactiveConn != 0 {
  148. klog.Infof("Not deleting, RS %v: %v ActiveConn, %v InactiveConn", rsToDelete.String(), rs.ActiveConn, rs.InactiveConn)
  149. return false, nil
  150. }
  151. klog.V(2).Infof("Deleting rs: %s", rsToDelete.String())
  152. err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rs)
  153. if err != nil {
  154. return false, fmt.Errorf("Delete destination %q err: %v", rs.String(), err)
  155. }
  156. return true, nil
  157. }
  158. }
  159. return true, fmt.Errorf("Failed to delete rs %q, can't find the real server", rsToDelete.String())
  160. }
  161. func (m *GracefulTerminationManager) tryDeleteRs() {
  162. if !m.rsList.flushList(m.deleteRsFunc) {
  163. klog.Errorf("Try flush graceful termination list err")
  164. }
  165. }
  166. // MoveRSOutofGracefulDeleteList to delete an rs and remove it from the rsList immediately
  167. func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error {
  168. rsToDelete, find := m.rsList.exist(uniqueRS)
  169. if !find || rsToDelete == nil {
  170. return fmt.Errorf("failed to find rs: %q", uniqueRS)
  171. }
  172. err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer)
  173. if err != nil {
  174. return err
  175. }
  176. m.rsList.remove(rsToDelete)
  177. return nil
  178. }
  179. // Run start a goroutine to try to delete rs in the graceful delete rsList with an interval 1 minute
  180. func (m *GracefulTerminationManager) Run() {
  181. go wait.Until(m.tryDeleteRs, rsCheckDeleteInterval, wait.NeverStop)
  182. }