graceful_termination.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ipvs
  14. import (
  15. "fmt"
  16. "strings"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/wait"
  20. "k8s.io/klog"
  21. utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
  22. )
  23. const (
  24. rsCheckDeleteInterval = 1 * time.Minute
  25. )
  26. // listItem stores real server information and the process time.
  27. // If nothing special happened, real server will be delete after process time.
  28. type listItem struct {
  29. VirtualServer *utilipvs.VirtualServer
  30. RealServer *utilipvs.RealServer
  31. }
  32. // String return the unique real server name(with virtual server information)
  33. func (g *listItem) String() string {
  34. return GetUniqueRSName(g.VirtualServer, g.RealServer)
  35. }
  36. // GetUniqueRSName return a string type unique rs name with vs information
  37. func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string {
  38. return vs.String() + "/" + rs.String()
  39. }
  40. type graceTerminateRSList struct {
  41. lock sync.Mutex
  42. list map[string]*listItem
  43. }
  44. // add push an new element to the rsList
  45. func (q *graceTerminateRSList) add(rs *listItem) bool {
  46. q.lock.Lock()
  47. defer q.lock.Unlock()
  48. uniqueRS := rs.String()
  49. if _, ok := q.list[uniqueRS]; ok {
  50. return false
  51. }
  52. klog.V(5).Infof("Adding rs %v to graceful delete rsList", rs)
  53. q.list[uniqueRS] = rs
  54. return true
  55. }
  56. // remove remove an element from the rsList
  57. func (q *graceTerminateRSList) remove(rs *listItem) bool {
  58. q.lock.Lock()
  59. defer q.lock.Unlock()
  60. uniqueRS := rs.String()
  61. if _, ok := q.list[uniqueRS]; ok {
  62. delete(q.list, uniqueRS)
  63. return true
  64. }
  65. return false
  66. }
  67. func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool {
  68. success := true
  69. for name, rs := range q.list {
  70. deleted, err := handler(rs)
  71. if err != nil {
  72. klog.Errorf("Try delete rs %q err: %v", name, err)
  73. success = false
  74. }
  75. if deleted {
  76. klog.Infof("lw: remote out of the list: %s", name)
  77. q.remove(rs)
  78. }
  79. }
  80. return success
  81. }
  82. // exist check whether the specified unique RS is in the rsList
  83. func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
  84. q.lock.Lock()
  85. defer q.lock.Unlock()
  86. if rs, ok := q.list[uniqueRS]; ok {
  87. return rs, true
  88. }
  89. return nil, false
  90. }
  91. // GracefulTerminationManager manage rs graceful termination information and do graceful termination work
  92. // rsList is the rs list to graceful termination, ipvs is the ipvsinterface to do ipvs delete/update work
  93. type GracefulTerminationManager struct {
  94. rsList graceTerminateRSList
  95. ipvs utilipvs.Interface
  96. }
  97. // NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work
  98. func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
  99. l := make(map[string]*listItem)
  100. return &GracefulTerminationManager{
  101. rsList: graceTerminateRSList{
  102. list: l,
  103. },
  104. ipvs: ipvs,
  105. }
  106. }
  107. // InTerminationList to check whether specified unique rs name is in graceful termination list
  108. func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
  109. _, exist := m.rsList.exist(uniqueRS)
  110. return exist
  111. }
  112. // GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list
  113. func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
  114. // Try to delete rs before add it to graceful delete list
  115. ele := &listItem{
  116. VirtualServer: vs,
  117. RealServer: rs,
  118. }
  119. deleted, err := m.deleteRsFunc(ele)
  120. if err != nil {
  121. klog.Errorf("Delete rs %q err: %v", ele.String(), err)
  122. }
  123. if deleted {
  124. return nil
  125. }
  126. rs.Weight = 0
  127. err = m.ipvs.UpdateRealServer(vs, rs)
  128. if err != nil {
  129. return err
  130. }
  131. klog.V(5).Infof("Adding an element to graceful delete rsList: %+v", ele)
  132. m.rsList.add(ele)
  133. return nil
  134. }
  135. func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) {
  136. klog.V(5).Infof("Trying to delete rs: %s", rsToDelete.String())
  137. rss, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer)
  138. if err != nil {
  139. return false, err
  140. }
  141. for _, rs := range rss {
  142. if rsToDelete.RealServer.Equal(rs) {
  143. // For UDP traffic, no graceful termination, we immediately delete the RS
  144. // (existing connections will be deleted on the next packet because sysctlExpireNoDestConn=1)
  145. // For other protocols, don't delete until all connections have expired)
  146. if strings.ToUpper(rsToDelete.VirtualServer.Protocol) != "UDP" && rs.ActiveConn+rs.InactiveConn != 0 {
  147. klog.V(5).Infof("Not deleting, RS %v: %v ActiveConn, %v InactiveConn", rsToDelete.String(), rs.ActiveConn, rs.InactiveConn)
  148. return false, nil
  149. }
  150. klog.V(5).Infof("Deleting rs: %s", rsToDelete.String())
  151. err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rs)
  152. if err != nil {
  153. return false, fmt.Errorf("Delete destination %q err: %v", rs.String(), err)
  154. }
  155. return true, nil
  156. }
  157. }
  158. return true, fmt.Errorf("Failed to delete rs %q, can't find the real server", rsToDelete.String())
  159. }
  160. func (m *GracefulTerminationManager) tryDeleteRs() {
  161. if !m.rsList.flushList(m.deleteRsFunc) {
  162. klog.Errorf("Try flush graceful termination list err")
  163. }
  164. }
  165. // MoveRSOutofGracefulDeleteList to delete an rs and remove it from the rsList immediately
  166. func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error {
  167. rsToDelete, find := m.rsList.exist(uniqueRS)
  168. if !find || rsToDelete == nil {
  169. return fmt.Errorf("failed to find rs: %q", uniqueRS)
  170. }
  171. err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer)
  172. if err != nil {
  173. return err
  174. }
  175. m.rsList.remove(rsToDelete)
  176. return nil
  177. }
  178. // Run start a goroutine to try to delete rs in the graceful delete rsList with an interval 1 minute
  179. func (m *GracefulTerminationManager) Run() {
  180. go wait.Until(m.tryDeleteRs, rsCheckDeleteInterval, wait.NeverStop)
  181. }