status_manager.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package status
  14. import (
  15. "context"
  16. "fmt"
  17. "sort"
  18. "sync"
  19. "time"
  20. clientset "k8s.io/client-go/kubernetes"
  21. v1 "k8s.io/api/core/v1"
  22. apiequality "k8s.io/apimachinery/pkg/api/equality"
  23. "k8s.io/apimachinery/pkg/api/errors"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/diff"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. "k8s.io/klog"
  29. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  30. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  31. kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
  32. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  33. "k8s.io/kubernetes/pkg/kubelet/util/format"
  34. statusutil "k8s.io/kubernetes/pkg/util/pod"
  35. )
  36. // A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are
  37. // not sent to the API server.
  38. type versionedPodStatus struct {
  39. status v1.PodStatus
  40. // Monotonically increasing version number (per pod).
  41. version uint64
  42. // Pod name & namespace, for sending updates to API server.
  43. podName string
  44. podNamespace string
  45. }
  46. type podStatusSyncRequest struct {
  47. podUID types.UID
  48. status versionedPodStatus
  49. }
  50. // Updates pod statuses in apiserver. Writes only when new status has changed.
  51. // All methods are thread-safe.
  52. type manager struct {
  53. kubeClient clientset.Interface
  54. podManager kubepod.Manager
  55. // Map from pod UID to sync status of the corresponding pod.
  56. podStatuses map[types.UID]versionedPodStatus
  57. podStatusesLock sync.RWMutex
  58. podStatusChannel chan podStatusSyncRequest
  59. // Map from (mirror) pod UID to latest status version successfully sent to the API server.
  60. // apiStatusVersions must only be accessed from the sync thread.
  61. apiStatusVersions map[kubetypes.MirrorPodUID]uint64
  62. podDeletionSafety PodDeletionSafetyProvider
  63. }
  64. // PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
  65. // that need to introspect status.
  66. type PodStatusProvider interface {
  67. // GetPodStatus returns the cached status for the provided pod UID, as well as whether it
  68. // was a cache hit.
  69. GetPodStatus(uid types.UID) (v1.PodStatus, bool)
  70. }
  71. // PodDeletionSafetyProvider provides guarantees that a pod can be safely deleted.
  72. type PodDeletionSafetyProvider interface {
  73. // A function which returns true if the pod can safely be deleted
  74. PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool
  75. }
  76. // Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
  77. // the latest v1.PodStatus. It also syncs updates back to the API server.
  78. type Manager interface {
  79. PodStatusProvider
  80. // Start the API server status sync loop.
  81. Start()
  82. // SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
  83. SetPodStatus(pod *v1.Pod, status v1.PodStatus)
  84. // SetContainerReadiness updates the cached container status with the given readiness, and
  85. // triggers a status update.
  86. SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
  87. // SetContainerStartup updates the cached container status with the given startup, and
  88. // triggers a status update.
  89. SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
  90. // TerminatePod resets the container status for the provided pod to terminated and triggers
  91. // a status update.
  92. TerminatePod(pod *v1.Pod)
  93. // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
  94. // the provided podUIDs.
  95. RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
  96. }
  97. const syncPeriod = 10 * time.Second
  98. // NewManager returns a functional Manager.
  99. func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
  100. return &manager{
  101. kubeClient: kubeClient,
  102. podManager: podManager,
  103. podStatuses: make(map[types.UID]versionedPodStatus),
  104. podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
  105. apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
  106. podDeletionSafety: podDeletionSafety,
  107. }
  108. }
  109. // isPodStatusByKubeletEqual returns true if the given pod statuses are equal when non-kubelet-owned
  110. // pod conditions are excluded.
  111. // This method normalizes the status before comparing so as to make sure that meaningless
  112. // changes will be ignored.
  113. func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
  114. oldCopy := oldStatus.DeepCopy()
  115. for _, c := range status.Conditions {
  116. if kubetypes.PodConditionByKubelet(c.Type) {
  117. _, oc := podutil.GetPodCondition(oldCopy, c.Type)
  118. if oc == nil || oc.Status != c.Status || oc.Message != c.Message || oc.Reason != c.Reason {
  119. return false
  120. }
  121. }
  122. }
  123. oldCopy.Conditions = status.Conditions
  124. return apiequality.Semantic.DeepEqual(oldCopy, status)
  125. }
  126. func (m *manager) Start() {
  127. // Don't start the status manager if we don't have a client. This will happen
  128. // on the master, where the kubelet is responsible for bootstrapping the pods
  129. // of the master components.
  130. if m.kubeClient == nil {
  131. klog.Infof("Kubernetes client is nil, not starting status manager.")
  132. return
  133. }
  134. klog.Info("Starting to sync pod status with apiserver")
  135. //lint:ignore SA1015 Ticker can link since this is only called once and doesn't handle termination.
  136. syncTicker := time.Tick(syncPeriod)
  137. // syncPod and syncBatch share the same go routine to avoid sync races.
  138. go wait.Forever(func() {
  139. select {
  140. case syncRequest := <-m.podStatusChannel:
  141. klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
  142. syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
  143. m.syncPod(syncRequest.podUID, syncRequest.status)
  144. case <-syncTicker:
  145. m.syncBatch()
  146. }
  147. }, 0)
  148. }
  149. func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
  150. m.podStatusesLock.RLock()
  151. defer m.podStatusesLock.RUnlock()
  152. status, ok := m.podStatuses[types.UID(m.podManager.TranslatePodUID(uid))]
  153. return status.status, ok
  154. }
  155. func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
  156. m.podStatusesLock.Lock()
  157. defer m.podStatusesLock.Unlock()
  158. for _, c := range pod.Status.Conditions {
  159. if !kubetypes.PodConditionByKubelet(c.Type) {
  160. klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
  161. "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
  162. }
  163. }
  164. // Make sure we're caching a deep copy.
  165. status = *status.DeepCopy()
  166. // Force a status update if deletion timestamp is set. This is necessary
  167. // because if the pod is in the non-running state, the pod worker still
  168. // needs to be able to trigger an update and/or deletion.
  169. m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
  170. }
  171. func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
  172. m.podStatusesLock.Lock()
  173. defer m.podStatusesLock.Unlock()
  174. pod, ok := m.podManager.GetPodByUID(podUID)
  175. if !ok {
  176. klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
  177. return
  178. }
  179. oldStatus, found := m.podStatuses[pod.UID]
  180. if !found {
  181. klog.Warningf("Container readiness changed before pod has synced: %q - %q",
  182. format.Pod(pod), containerID.String())
  183. return
  184. }
  185. // Find the container to update.
  186. containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
  187. if !ok {
  188. klog.Warningf("Container readiness changed for unknown container: %q - %q",
  189. format.Pod(pod), containerID.String())
  190. return
  191. }
  192. if containerStatus.Ready == ready {
  193. klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
  194. format.Pod(pod), containerID.String())
  195. return
  196. }
  197. // Make sure we're not updating the cached version.
  198. status := *oldStatus.status.DeepCopy()
  199. containerStatus, _, _ = findContainerStatus(&status, containerID.String())
  200. containerStatus.Ready = ready
  201. // updateConditionFunc updates the corresponding type of condition
  202. updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
  203. conditionIndex := -1
  204. for i, condition := range status.Conditions {
  205. if condition.Type == conditionType {
  206. conditionIndex = i
  207. break
  208. }
  209. }
  210. if conditionIndex != -1 {
  211. status.Conditions[conditionIndex] = condition
  212. } else {
  213. klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
  214. status.Conditions = append(status.Conditions, condition)
  215. }
  216. }
  217. updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
  218. updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
  219. m.updateStatusInternal(pod, status, false)
  220. }
  221. func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
  222. m.podStatusesLock.Lock()
  223. defer m.podStatusesLock.Unlock()
  224. pod, ok := m.podManager.GetPodByUID(podUID)
  225. if !ok {
  226. klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
  227. return
  228. }
  229. oldStatus, found := m.podStatuses[pod.UID]
  230. if !found {
  231. klog.Warningf("Container startup changed before pod has synced: %q - %q",
  232. format.Pod(pod), containerID.String())
  233. return
  234. }
  235. // Find the container to update.
  236. containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
  237. if !ok {
  238. klog.Warningf("Container startup changed for unknown container: %q - %q",
  239. format.Pod(pod), containerID.String())
  240. return
  241. }
  242. if containerStatus.Started != nil && *containerStatus.Started == started {
  243. klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
  244. format.Pod(pod), containerID.String())
  245. return
  246. }
  247. // Make sure we're not updating the cached version.
  248. status := *oldStatus.status.DeepCopy()
  249. containerStatus, _, _ = findContainerStatus(&status, containerID.String())
  250. containerStatus.Started = &started
  251. m.updateStatusInternal(pod, status, false)
  252. }
  253. func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
  254. // Find the container to update.
  255. for i, c := range status.ContainerStatuses {
  256. if c.ContainerID == containerID {
  257. return &status.ContainerStatuses[i], false, true
  258. }
  259. }
  260. for i, c := range status.InitContainerStatuses {
  261. if c.ContainerID == containerID {
  262. return &status.InitContainerStatuses[i], true, true
  263. }
  264. }
  265. return nil, false, false
  266. }
  267. func (m *manager) TerminatePod(pod *v1.Pod) {
  268. m.podStatusesLock.Lock()
  269. defer m.podStatusesLock.Unlock()
  270. oldStatus := &pod.Status
  271. if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
  272. oldStatus = &cachedStatus.status
  273. }
  274. status := *oldStatus.DeepCopy()
  275. for i := range status.ContainerStatuses {
  276. status.ContainerStatuses[i].State = v1.ContainerState{
  277. Terminated: &v1.ContainerStateTerminated{},
  278. }
  279. }
  280. for i := range status.InitContainerStatuses {
  281. status.InitContainerStatuses[i].State = v1.ContainerState{
  282. Terminated: &v1.ContainerStateTerminated{},
  283. }
  284. }
  285. m.updateStatusInternal(pod, status, true)
  286. }
  287. // checkContainerStateTransition ensures that no container is trying to transition
  288. // from a terminated to non-terminated state, which is illegal and indicates a
  289. // logical error in the kubelet.
  290. func checkContainerStateTransition(oldStatuses, newStatuses []v1.ContainerStatus, restartPolicy v1.RestartPolicy) error {
  291. // If we should always restart, containers are allowed to leave the terminated state
  292. if restartPolicy == v1.RestartPolicyAlways {
  293. return nil
  294. }
  295. for _, oldStatus := range oldStatuses {
  296. // Skip any container that wasn't terminated
  297. if oldStatus.State.Terminated == nil {
  298. continue
  299. }
  300. // Skip any container that failed but is allowed to restart
  301. if oldStatus.State.Terminated.ExitCode != 0 && restartPolicy == v1.RestartPolicyOnFailure {
  302. continue
  303. }
  304. for _, newStatus := range newStatuses {
  305. if oldStatus.Name == newStatus.Name && newStatus.State.Terminated == nil {
  306. return fmt.Errorf("terminated container %v attempted illegal transition to non-terminated state", newStatus.Name)
  307. }
  308. }
  309. }
  310. return nil
  311. }
  312. // updateStatusInternal updates the internal status cache, and queues an update to the api server if
  313. // necessary. Returns whether an update was triggered.
  314. // This method IS NOT THREAD SAFE and must be called from a locked function.
  315. func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
  316. var oldStatus v1.PodStatus
  317. cachedStatus, isCached := m.podStatuses[pod.UID]
  318. if isCached {
  319. oldStatus = cachedStatus.status
  320. } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
  321. oldStatus = mirrorPod.Status
  322. } else {
  323. oldStatus = pod.Status
  324. }
  325. // Check for illegal state transition in containers
  326. if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
  327. klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
  328. return false
  329. }
  330. if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
  331. klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
  332. return false
  333. }
  334. // Set ContainersReadyCondition.LastTransitionTime.
  335. updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)
  336. // Set ReadyCondition.LastTransitionTime.
  337. updateLastTransitionTime(&status, &oldStatus, v1.PodReady)
  338. // Set InitializedCondition.LastTransitionTime.
  339. updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)
  340. // Set PodScheduledCondition.LastTransitionTime.
  341. updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
  342. // ensure that the start time does not change across updates.
  343. if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
  344. status.StartTime = oldStatus.StartTime
  345. } else if status.StartTime.IsZero() {
  346. // if the status has no start time, we need to set an initial time
  347. now := metav1.Now()
  348. status.StartTime = &now
  349. }
  350. normalizeStatus(pod, &status)
  351. // The intent here is to prevent concurrent updates to a pod's status from
  352. // clobbering each other so the phase of a pod progresses monotonically.
  353. if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
  354. klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
  355. return false // No new status.
  356. }
  357. newStatus := versionedPodStatus{
  358. status: status,
  359. version: cachedStatus.version + 1,
  360. podName: pod.Name,
  361. podNamespace: pod.Namespace,
  362. }
  363. m.podStatuses[pod.UID] = newStatus
  364. select {
  365. case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
  366. klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
  367. pod.UID, newStatus.version, newStatus.status)
  368. return true
  369. default:
  370. // Let the periodic syncBatch handle the update if the channel is full.
  371. // We can't block, since we hold the mutex lock.
  372. klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
  373. format.Pod(pod), status)
  374. return false
  375. }
  376. }
  377. // updateLastTransitionTime updates the LastTransitionTime of a pod condition.
  378. func updateLastTransitionTime(status, oldStatus *v1.PodStatus, conditionType v1.PodConditionType) {
  379. _, condition := podutil.GetPodCondition(status, conditionType)
  380. if condition == nil {
  381. return
  382. }
  383. // Need to set LastTransitionTime.
  384. lastTransitionTime := metav1.Now()
  385. _, oldCondition := podutil.GetPodCondition(oldStatus, conditionType)
  386. if oldCondition != nil && condition.Status == oldCondition.Status {
  387. lastTransitionTime = oldCondition.LastTransitionTime
  388. }
  389. condition.LastTransitionTime = lastTransitionTime
  390. }
  391. // deletePodStatus simply removes the given pod from the status cache.
  392. func (m *manager) deletePodStatus(uid types.UID) {
  393. m.podStatusesLock.Lock()
  394. defer m.podStatusesLock.Unlock()
  395. delete(m.podStatuses, uid)
  396. }
  397. // TODO(filipg): It'd be cleaner if we can do this without signal from user.
  398. func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
  399. m.podStatusesLock.Lock()
  400. defer m.podStatusesLock.Unlock()
  401. for key := range m.podStatuses {
  402. if _, ok := podUIDs[key]; !ok {
  403. klog.V(5).Infof("Removing %q from status map.", key)
  404. delete(m.podStatuses, key)
  405. }
  406. }
  407. }
  408. // syncBatch syncs pods statuses with the apiserver.
  409. func (m *manager) syncBatch() {
  410. var updatedStatuses []podStatusSyncRequest
  411. podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
  412. func() { // Critical section
  413. m.podStatusesLock.RLock()
  414. defer m.podStatusesLock.RUnlock()
  415. // Clean up orphaned versions.
  416. for uid := range m.apiStatusVersions {
  417. _, hasPod := m.podStatuses[types.UID(uid)]
  418. _, hasMirror := mirrorToPod[uid]
  419. if !hasPod && !hasMirror {
  420. delete(m.apiStatusVersions, uid)
  421. }
  422. }
  423. for uid, status := range m.podStatuses {
  424. syncedUID := kubetypes.MirrorPodUID(uid)
  425. if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
  426. if mirrorUID == "" {
  427. klog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
  428. continue
  429. }
  430. syncedUID = mirrorUID
  431. }
  432. if m.needsUpdate(types.UID(syncedUID), status) {
  433. updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
  434. } else if m.needsReconcile(uid, status.status) {
  435. // Delete the apiStatusVersions here to force an update on the pod status
  436. // In most cases the deleted apiStatusVersions here should be filled
  437. // soon after the following syncPod() [If the syncPod() sync an update
  438. // successfully].
  439. delete(m.apiStatusVersions, syncedUID)
  440. updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
  441. }
  442. }
  443. }()
  444. for _, update := range updatedStatuses {
  445. klog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
  446. m.syncPod(update.podUID, update.status)
  447. }
  448. }
  449. // syncPod syncs the given status with the API server. The caller must not hold the lock.
  450. func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
  451. if !m.needsUpdate(uid, status) {
  452. klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
  453. return
  454. }
  455. // TODO: make me easier to express from client code
  456. pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
  457. if errors.IsNotFound(err) {
  458. klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
  459. // If the Pod is deleted the status will be cleared in
  460. // RemoveOrphanedStatuses, so we just ignore the update here.
  461. return
  462. }
  463. if err != nil {
  464. klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
  465. return
  466. }
  467. translatedUID := m.podManager.TranslatePodUID(pod.UID)
  468. // Type convert original uid just for the purpose of comparison.
  469. if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
  470. klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
  471. m.deletePodStatus(uid)
  472. return
  473. }
  474. oldStatus := pod.Status.DeepCopy()
  475. newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
  476. klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
  477. if err != nil {
  478. klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
  479. return
  480. }
  481. pod = newPod
  482. klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
  483. m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
  484. // We don't handle graceful deletion of mirror pods.
  485. if m.canBeDeleted(pod, status.status) {
  486. deleteOptions := metav1.NewDeleteOptions(0)
  487. // Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
  488. deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
  489. err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
  490. if err != nil {
  491. klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
  492. return
  493. }
  494. klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
  495. m.deletePodStatus(uid)
  496. }
  497. }
  498. // needsUpdate returns whether the status is stale for the given pod UID.
  499. // This method is not thread safe, and must only be accessed by the sync thread.
  500. func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
  501. latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
  502. if !ok || latest < status.version {
  503. return true
  504. }
  505. pod, ok := m.podManager.GetPodByUID(uid)
  506. if !ok {
  507. return false
  508. }
  509. return m.canBeDeleted(pod, status.status)
  510. }
  511. func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus) bool {
  512. if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
  513. return false
  514. }
  515. return m.podDeletionSafety.PodResourcesAreReclaimed(pod, status)
  516. }
  517. // needsReconcile compares the given status with the status in the pod manager (which
  518. // in fact comes from apiserver), returns whether the status needs to be reconciled with
  519. // the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
  520. // kubelet should forcibly send an update to reconcile the inconsistence, because kubelet
  521. // should be the source of truth of pod status.
  522. // NOTE(random-liu): It's simpler to pass in mirror pod uid and get mirror pod by uid, but
  523. // now the pod manager only supports getting mirror pod by static pod, so we have to pass
  524. // static pod uid here.
  525. // TODO(random-liu): Simplify the logic when mirror pod manager is added.
  526. func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
  527. // The pod could be a static pod, so we should translate first.
  528. pod, ok := m.podManager.GetPodByUID(uid)
  529. if !ok {
  530. klog.V(4).Infof("Pod %q has been deleted, no need to reconcile", string(uid))
  531. return false
  532. }
  533. // If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us.
  534. if kubetypes.IsStaticPod(pod) {
  535. mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
  536. if !ok {
  537. klog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod))
  538. return false
  539. }
  540. pod = mirrorPod
  541. }
  542. podStatus := pod.Status.DeepCopy()
  543. normalizeStatus(pod, podStatus)
  544. if isPodStatusByKubeletEqual(podStatus, &status) {
  545. // If the status from the source is the same with the cached status,
  546. // reconcile is not needed. Just return.
  547. return false
  548. }
  549. klog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %s", format.Pod(pod),
  550. diff.ObjectDiff(podStatus, &status))
  551. return true
  552. }
  553. // normalizeStatus normalizes nanosecond precision timestamps in podStatus
  554. // down to second precision (*RFC339NANO* -> *RFC3339*). This must be done
  555. // before comparing podStatus to the status returned by apiserver because
  556. // apiserver does not support RFC339NANO.
  557. // Related issue #15262/PR #15263 to move apiserver to RFC339NANO is closed.
  558. func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
  559. bytesPerStatus := kubecontainer.MaxPodTerminationMessageLogLength
  560. if containers := len(pod.Spec.Containers) + len(pod.Spec.InitContainers); containers > 0 {
  561. bytesPerStatus = bytesPerStatus / containers
  562. }
  563. normalizeTimeStamp := func(t *metav1.Time) {
  564. *t = t.Rfc3339Copy()
  565. }
  566. normalizeContainerState := func(c *v1.ContainerState) {
  567. if c.Running != nil {
  568. normalizeTimeStamp(&c.Running.StartedAt)
  569. }
  570. if c.Terminated != nil {
  571. normalizeTimeStamp(&c.Terminated.StartedAt)
  572. normalizeTimeStamp(&c.Terminated.FinishedAt)
  573. if len(c.Terminated.Message) > bytesPerStatus {
  574. c.Terminated.Message = c.Terminated.Message[:bytesPerStatus]
  575. }
  576. }
  577. }
  578. if status.StartTime != nil {
  579. normalizeTimeStamp(status.StartTime)
  580. }
  581. for i := range status.Conditions {
  582. condition := &status.Conditions[i]
  583. normalizeTimeStamp(&condition.LastProbeTime)
  584. normalizeTimeStamp(&condition.LastTransitionTime)
  585. }
  586. // update container statuses
  587. for i := range status.ContainerStatuses {
  588. cstatus := &status.ContainerStatuses[i]
  589. normalizeContainerState(&cstatus.State)
  590. normalizeContainerState(&cstatus.LastTerminationState)
  591. }
  592. // Sort the container statuses, so that the order won't affect the result of comparison
  593. sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses))
  594. // update init container statuses
  595. for i := range status.InitContainerStatuses {
  596. cstatus := &status.InitContainerStatuses[i]
  597. normalizeContainerState(&cstatus.State)
  598. normalizeContainerState(&cstatus.LastTerminationState)
  599. }
  600. // Sort the container statuses, so that the order won't affect the result of comparison
  601. kubetypes.SortInitContainerStatuses(pod, status.InitContainerStatuses)
  602. return status
  603. }
  604. // mergePodStatus merges oldPodStatus and newPodStatus where pod conditions
  605. // not owned by kubelet is preserved from oldPodStatus
  606. func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus) v1.PodStatus {
  607. podConditions := []v1.PodCondition{}
  608. for _, c := range oldPodStatus.Conditions {
  609. if !kubetypes.PodConditionByKubelet(c.Type) {
  610. podConditions = append(podConditions, c)
  611. }
  612. }
  613. for _, c := range newPodStatus.Conditions {
  614. if kubetypes.PodConditionByKubelet(c.Type) {
  615. podConditions = append(podConditions, c)
  616. }
  617. }
  618. newPodStatus.Conditions = podConditions
  619. return newPodStatus
  620. }
  621. // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile
  622. func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
  623. if len(pod.Spec.ReadinessGates) == 0 {
  624. return false
  625. }
  626. podReadyCondition := GeneratePodReadyCondition(&pod.Spec, pod.Status.Conditions, pod.Status.ContainerStatuses, pod.Status.Phase)
  627. i, curCondition := podutil.GetPodConditionFromList(pod.Status.Conditions, v1.PodReady)
  628. // Only reconcile if "Ready" condition is present and Status or Message is not expected
  629. if i >= 0 && (curCondition.Status != podReadyCondition.Status || curCondition.Message != podReadyCondition.Message) {
  630. return true
  631. }
  632. return false
  633. }