- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package kuberuntime
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "math/rand"
- "net/url"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "sync"
- "time"
- "google.golang.org/grpc"
- "github.com/armon/circbuf"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- kubetypes "k8s.io/apimachinery/pkg/types"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/events"
- "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/kubelet/util/format"
- "k8s.io/kubernetes/pkg/util/selinux"
- "k8s.io/kubernetes/pkg/util/tail"
- )
- var (
- // ErrCreateContainerConfig - failed to create container config
- ErrCreateContainerConfig = errors.New("CreateContainerConfigError")
- // ErrCreateContainer - failed to create container
- ErrCreateContainer = errors.New("CreateContainerError")
- // ErrPreStartHook - failed to execute PreStartHook
- ErrPreStartHook = errors.New("PreStartHookError")
- // ErrPostStartHook - failed to execute PostStartHook
- ErrPostStartHook = errors.New("PostStartHookError")
- )
- // recordContainerEvent should be used by the runtime manager for all container-related events.
- // It has sanity checks to ensure that we do not write events that can abuse our masters.
- // In particular, it ensures that a containerID never appears in an event message, as that
- // is prone to causing a lot of distinct events that do not aggregate well.
- // It replaces any reference to a containerID with the containerName, which is stable and is what users know.
- func (m *kubeGenericRuntimeManager) recordContainerEvent(pod *v1.Pod, container *v1.Container, containerID, eventType, reason, message string, args ...interface{}) {
- ref, err := kubecontainer.GenerateContainerRef(pod, container)
- if err != nil {
- klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
- return
- }
- eventMessage := message
- if len(args) > 0 {
- eventMessage = fmt.Sprintf(message, args...)
- }
- // this is a hack, but often the error from the runtime includes the containerID
- // which kills our ability to deduplicate events. this protection makes a huge
- // difference in the number of unique events
- if containerID != "" {
- eventMessage = strings.Replace(eventMessage, containerID, container.Name, -1)
- }
- m.recorder.Event(ref, eventType, reason, eventMessage)
- }
- // startContainer starts a container and, on error, returns a message describing why it failed.
- // It starts the container through the following steps:
- // * pull the image
- // * create the container
- // * start the container
- // * run the post start lifecycle hooks (if applicable)
- func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string) (string, error) {
- // Step 1: pull the image.
- imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
- if err != nil {
- m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
- return msg, err
- }
- // Step 2: create the container.
- ref, err := kubecontainer.GenerateContainerRef(pod, container)
- if err != nil {
- klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
- }
- klog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
- // For a new container, the RestartCount should be 0
- restartCount := 0
- containerStatus := podStatus.FindContainerStatusByName(container.Name)
- if containerStatus != nil {
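- // A previous instance of this container exists, so count this start as another restart attempt.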
- restartCount = containerStatus.RestartCount + 1
- }
- containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef)
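- // cleanupAction, when non-nil, must run after the container config has been used; defer it here so it also runs on the error paths below.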
- if cleanupAction != nil {
- defer cleanupAction()
- }
- if err != nil {
- m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
- return grpc.ErrorDesc(err), ErrCreateContainerConfig
- }
- containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
- if err != nil {
- m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
- return grpc.ErrorDesc(err), ErrCreateContainer
- }
- err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
- if err != nil {
- m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
- return grpc.ErrorDesc(err), ErrPreStartHook
- }
- m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))
- if ref != nil {
- m.containerRefManager.SetRef(kubecontainer.ContainerID{
- Type: m.runtimeName,
- ID: containerID,
- }, ref)
- }
- // Step 3: start the container.
- err = m.runtimeService.StartContainer(containerID)
- if err != nil {
- m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
- return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
- }
- m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))
- // Symlink container logs to the legacy container log location for cluster logging
- // support.
- // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
- containerMeta := containerConfig.GetMetadata()
- sandboxMeta := podSandboxConfig.GetMetadata()
- legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
- sandboxMeta.Namespace)
- containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
- // Only create the legacy symlink if the containerLog path exists (or the error is not IsNotExist).
- // If the containerLog path does not exist, only a dangling legacySymlink would be created.
- // That dangling legacySymlink is later removed by container GC, so it does not make sense
- // to create it in the first place. This happens when the journald logging driver is used with docker.
- if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
- if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
- klog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
- legacySymlink, containerID, containerLog, err)
- }
- }
- // Step 4: execute the post start hook.
- if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
- kubeContainerID := kubecontainer.ContainerID{
- Type: m.runtimeName,
- ID: containerID,
- }
- msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
- if handlerErr != nil {
- m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
- if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
- klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
- container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
- }
- return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
- }
- }
- return "", nil
- }
- // generateContainerConfig generates container config for kubelet runtime v1.
- func (m *kubeGenericRuntimeManager) generateContainerConfig(container *v1.Container, pod *v1.Pod, restartCount int, podIP, imageRef string) (*runtimeapi.ContainerConfig, func(), error) {
- opts, cleanupAction, err := m.runtimeHelper.GenerateRunContainerOptions(pod, container, podIP)
- if err != nil {
- return nil, nil, err
- }
- uid, username, err := m.getImageUser(container.Image)
- if err != nil {
- return nil, cleanupAction, err
- }
- // Verify RunAsNonRoot. Non-root verification only supports numeric user.
- if err := verifyRunAsNonRoot(pod, container, uid, username); err != nil {
- return nil, cleanupAction, err
- }
- command, args := kubecontainer.ExpandContainerCommandAndArgs(container, opts.Envs)
- logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
- err = m.osInterface.MkdirAll(logDir, 0755)
- if err != nil {
- return nil, cleanupAction, fmt.Errorf("create container log directory for container %s failed: %v", container.Name, err)
- }
- containerLogsPath := buildContainerLogsPath(container.Name, restartCount)
- restartCountUint32 := uint32(restartCount)
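- // Assemble the CRI container config; the restart count is recorded as the attempt number in the container metadata.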
- config := &runtimeapi.ContainerConfig{
- Metadata: &runtimeapi.ContainerMetadata{
- Name: container.Name,
- Attempt: restartCountUint32,
- },
- Image: &runtimeapi.ImageSpec{Image: imageRef},
- Command: command,
- Args: args,
- WorkingDir: container.WorkingDir,
- Labels: newContainerLabels(container, pod),
- Annotations: newContainerAnnotations(container, pod, restartCount, opts),
- Devices: makeDevices(opts),
- Mounts: m.makeMounts(opts, container),
- LogPath: containerLogsPath,
- Stdin: container.Stdin,
- StdinOnce: container.StdinOnce,
- Tty: container.TTY,
- }
- // set platform specific configurations.
- if err := m.applyPlatformSpecificContainerConfig(config, container, pod, uid, username); err != nil {
- return nil, cleanupAction, err
- }
- // set environment variables
- envs := make([]*runtimeapi.KeyValue, len(opts.Envs))
- for idx := range opts.Envs {
- e := opts.Envs[idx]
- envs[idx] = &runtimeapi.KeyValue{
- Key: e.Name,
- Value: e.Value,
- }
- }
- config.Envs = envs
- return config, cleanupAction, nil
- }
- // makeDevices generates container devices for kubelet runtime v1.
- func makeDevices(opts *kubecontainer.RunContainerOptions) []*runtimeapi.Device {
- devices := make([]*runtimeapi.Device, len(opts.Devices))
- for idx := range opts.Devices {
- device := opts.Devices[idx]
- devices[idx] = &runtimeapi.Device{
- HostPath: device.PathOnHost,
- ContainerPath: device.PathInContainer,
- Permissions: device.Permissions,
- }
- }
- return devices
- }
- // makeMounts generates container volume mounts for kubelet runtime v1.
- func (m *kubeGenericRuntimeManager) makeMounts(opts *kubecontainer.RunContainerOptions, container *v1.Container) []*runtimeapi.Mount {
- volumeMounts := []*runtimeapi.Mount{}
- for idx := range opts.Mounts {
- v := opts.Mounts[idx]
- selinuxRelabel := v.SELinuxRelabel && selinux.SELinuxEnabled()
- mount := &runtimeapi.Mount{
- HostPath: v.HostPath,
- ContainerPath: v.ContainerPath,
- Readonly: v.ReadOnly,
- SelinuxRelabel: selinuxRelabel,
- Propagation: v.Propagation,
- }
- volumeMounts = append(volumeMounts, mount)
- }
- // The reason we create and mount the log file here (not in the kubelet) is that
- // the file's location depends on the ID of the container, and we need to create and
- // mount the file before actually starting the container.
- if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 {
- // Because the PodContainerDir contains the pod UID and container name, which is unique enough,
- // we just add a random id here to make the path unique for different instances
- // of the same container.
- cid := makeUID()
- containerLogPath := filepath.Join(opts.PodContainerDir, cid)
- fs, err := m.osInterface.Create(containerLogPath)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("error on creating termination-log file %q: %v", containerLogPath, err))
- } else {
- fs.Close()
- // Chmod is needed because ioutil.WriteFile() ends up calling
- // open(2) to create the file, so the final mode used is "mode &
- // ~umask". But we want to make sure the specified mode is used
- // in the file no matter what the umask is.
- if err := m.osInterface.Chmod(containerLogPath, 0666); err != nil {
- utilruntime.HandleError(fmt.Errorf("unable to set termination-log file permissions %q: %v", containerLogPath, err))
- }
- selinuxRelabel := selinux.SELinuxEnabled()
- volumeMounts = append(volumeMounts, &runtimeapi.Mount{
- HostPath: containerLogPath,
- ContainerPath: container.TerminationMessagePath,
- SelinuxRelabel: selinuxRelabel,
- })
- }
- }
- return volumeMounts
- }
- // getKubeletContainers lists containers managed by kubelet.
- // The boolean parameter specifies whether to return all containers, including
- // those that have already exited and dead containers (used for garbage collection).
- func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
- filter := &runtimeapi.ContainerFilter{}
- if !allContainers {
- filter.State = &runtimeapi.ContainerStateValue{
- State: runtimeapi.ContainerState_CONTAINER_RUNNING,
- }
- }
- containers, err := m.runtimeService.ListContainers(filter)
- if err != nil {
- klog.Errorf("getKubeletContainers failed: %v", err)
- return nil, err
- }
- return containers, nil
- }
- // makeUID returns a randomly generated string.
- func makeUID() string {
- return fmt.Sprintf("%08x", rand.Uint32())
- }
- // getTerminationMessage looks on the filesystem for the provided termination message path and returns a limited
- // amount of those bytes, or returns true if the container logs should be checked instead.
- func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessagePath string, fallbackToLogs bool) (string, bool) {
- if len(terminationMessagePath) == 0 {
- return "", fallbackToLogs
- }
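- // Find the mount whose container path matches the termination message path; its host path is where the message was written.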
- for _, mount := range status.Mounts {
- if mount.ContainerPath != terminationMessagePath {
- continue
- }
- path := mount.HostPath
- data, _, err := tail.ReadAtMost(path, kubecontainer.MaxContainerTerminationMessageLength)
- if err != nil {
- if os.IsNotExist(err) {
- return "", fallbackToLogs
- }
- return fmt.Sprintf("Error on reading termination log %s: %v", path, err), false
- }
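- // Fall back to the logs only when fallback is allowed and the termination message file was empty.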
- return string(data), (fallbackToLogs && len(data) == 0)
- }
- return "", fallbackToLogs
- }
- // readLastStringFromContainerLogs attempts to read the end of the CRI log represented by path,
- // bounded by both the maximum termination message log length and the maximum number of log lines.
- func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string {
- value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
- buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
- if err := m.ReadLogs(context.Background(), path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
- return fmt.Sprintf("Error on reading termination message from logs: %v", err)
- }
- return buf.String()
- }
- // getPodContainerStatuses gets all containers' statuses for the pod.
- func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, name, namespace string) ([]*kubecontainer.ContainerStatus, error) {
- // Select all containers of the given pod.
- containers, err := m.runtimeService.ListContainers(&runtimeapi.ContainerFilter{
- LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(uid)},
- })
- if err != nil {
- klog.Errorf("ListContainers error: %v", err)
- return nil, err
- }
- statuses := make([]*kubecontainer.ContainerStatus, len(containers))
- // TODO: optimization: set maximum number of containers per container name to examine.
- for i, c := range containers {
- status, err := m.runtimeService.ContainerStatus(c.Id)
- if err != nil {
- // Merely log this here; GetPodStatus will actually report the error out.
- klog.V(4).Infof("ContainerStatus for %s error: %v", c.Id, err)
- return nil, err
- }
- cStatus := toKubeContainerStatus(status, m.runtimeName)
- if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
- // Populate the termination message if needed.
- annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
- fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError && cStatus.ExitCode != 0
- tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
- if checkLogs {
- // if dockerLegacyService is populated, we're supposed to use it to fetch logs
- if m.legacyLogProvider != nil {
- tMessage, err = m.legacyLogProvider.GetContainerLogTail(uid, name, namespace, kubecontainer.ContainerID{Type: m.runtimeName, ID: c.Id})
- if err != nil {
- tMessage = fmt.Sprintf("Error reading termination message from logs: %v", err)
- }
- } else {
- tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
- }
- }
- // Use the termination message written by the application if it is not empty.
- if len(tMessage) != 0 {
- cStatus.Message = tMessage
- }
- }
- statuses[i] = cStatus
- }
- sort.Sort(containerStatusByCreated(statuses))
- return statuses, nil
- }
- func toKubeContainerStatus(status *runtimeapi.ContainerStatus, runtimeName string) *kubecontainer.ContainerStatus {
- annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
- labeledInfo := getContainerInfoFromLabels(status.Labels)
- cStatus := &kubecontainer.ContainerStatus{
- ID: kubecontainer.ContainerID{
- Type: runtimeName,
- ID: status.Id,
- },
- Name: labeledInfo.ContainerName,
- Image: status.Image.Image,
- ImageID: status.ImageRef,
- Hash: annotatedInfo.Hash,
- RestartCount: annotatedInfo.RestartCount,
- State: toKubeContainerState(status.State),
- CreatedAt: time.Unix(0, status.CreatedAt),
- }
- if status.State != runtimeapi.ContainerState_CONTAINER_CREATED {
- // If container is not in the created state, we have tried and
- // started the container. Set the StartedAt time.
- cStatus.StartedAt = time.Unix(0, status.StartedAt)
- }
- if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
- cStatus.Reason = status.Reason
- cStatus.Message = status.Message
- cStatus.ExitCode = int(status.ExitCode)
- cStatus.FinishedAt = time.Unix(0, status.FinishedAt)
- }
- return cStatus
- }
- // executePreStopHook runs the pre-stop lifecycle hooks if applicable and returns the duration it takes.
- func (m *kubeGenericRuntimeManager) executePreStopHook(pod *v1.Pod, containerID kubecontainer.ContainerID, containerSpec *v1.Container, gracePeriod int64) int64 {
- klog.V(3).Infof("Running preStop hook for container %q", containerID.String())
- start := metav1.Now()
- done := make(chan struct{})
- go func() {
- defer close(done)
- defer utilruntime.HandleCrash()
- if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
- klog.Errorf("preStop hook for container %q failed: %v", containerSpec.Name, err)
- m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, msg)
- }
- }()
- select {
- case <-time.After(time.Duration(gracePeriod) * time.Second):
- klog.V(2).Infof("preStop hook for container %q did not complete in %d seconds", containerID, gracePeriod)
- case <-done:
- klog.V(3).Infof("preStop hook for container %q completed", containerID)
- }
- return int64(metav1.Now().Sub(start.Time).Seconds())
- }
- // restoreSpecsFromContainerLabels restores all information needed for killing a container. In some
- // cases we may not have the pod and container spec when killing a container, e.g. the pod is deleted during
- // a kubelet restart.
- // To solve this problem, we've already written the necessary information into the container labels. Here we
- // just need to retrieve it from the container labels and restore the specs.
- // TODO(random-liu): Add a node e2e test to test this behaviour.
- // TODO(random-liu): Change the lifecycle handler to just accept the information needed, so that we can
- // just pass the needed function instead of creating the fake object.
- func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID kubecontainer.ContainerID) (*v1.Pod, *v1.Container, error) {
- var pod *v1.Pod
- var container *v1.Container
- s, err := m.runtimeService.ContainerStatus(containerID.ID)
- if err != nil {
- return nil, nil, err
- }
- l := getContainerInfoFromLabels(s.Labels)
- a := getContainerInfoFromAnnotations(s.Annotations)
- // Notice that the following are not full specs. The container-killing code should not use
- // fields that are not restored here.
- pod = &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- UID: l.PodUID,
- Name: l.PodName,
- Namespace: l.PodNamespace,
- DeletionGracePeriodSeconds: a.PodDeletionGracePeriod,
- },
- Spec: v1.PodSpec{
- TerminationGracePeriodSeconds: a.PodTerminationGracePeriod,
- },
- }
- container = &v1.Container{
- Name: l.ContainerName,
- Ports: a.ContainerPorts,
- TerminationMessagePath: a.TerminationMessagePath,
- }
- if a.PreStopHandler != nil {
- container.Lifecycle = &v1.Lifecycle{
- PreStop: a.PreStopHandler,
- }
- }
- return pod, container, nil
- }
- // killContainer kills a container through the following steps:
- // * Run the pre-stop lifecycle hooks (if applicable).
- // * Stop the container.
- func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodOverride *int64) error {
- var containerSpec *v1.Container
- if pod != nil {
- if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
- return fmt.Errorf("failed to get containerSpec %q(id=%q) in pod %q when killing container for reason %q",
- containerName, containerID.String(), format.Pod(pod), message)
- }
- } else {
- // Restore necessary information if one of the specs is nil.
- restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID)
- if err != nil {
- return err
- }
- pod, containerSpec = restoredPod, restoredContainer
- }
- // From this point, pod and container must be non-nil.
- gracePeriod := int64(minimumGracePeriodInSeconds)
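- // A grace period already set for the pod's deletion takes precedence over the one declared in the pod spec.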
- switch {
- case pod.DeletionGracePeriodSeconds != nil:
- gracePeriod = *pod.DeletionGracePeriodSeconds
- case pod.Spec.TerminationGracePeriodSeconds != nil:
- gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
- }
- if len(message) == 0 {
- message = fmt.Sprintf("Stopping container %s", containerSpec.Name)
- }
- m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeNormal, events.KillingContainer, message)
- // Run internal pre-stop lifecycle hook
- if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
- return err
- }
- // Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
- if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
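- // Deduct the time spent running the preStop hook from the remaining grace period.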
- gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
- }
- // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
- if gracePeriod < minimumGracePeriodInSeconds {
- gracePeriod = minimumGracePeriodInSeconds
- }
- if gracePeriodOverride != nil {
- gracePeriod = *gracePeriodOverride
- klog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
- }
- klog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)
- err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
- if err != nil {
- klog.Errorf("Container %q termination failed with gracePeriod %d: %v", containerID.String(), gracePeriod, err)
- } else {
- klog.V(3).Infof("Container %q exited normally", containerID.String())
- }
- m.containerRefManager.ClearRef(containerID)
- return err
- }
- // killContainersWithSyncResult kills all pod's containers with sync results.
- func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
- containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
- wg := sync.WaitGroup{}
- wg.Add(len(runningPod.Containers))
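- // Kill all of the pod's containers in parallel; each goroutine reports its result on the channel.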
- for _, container := range runningPod.Containers {
- go func(container *kubecontainer.Container) {
- defer utilruntime.HandleCrash()
- defer wg.Done()
- killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
- if err := m.killContainer(pod, container.ID, container.Name, "", gracePeriodOverride); err != nil {
- killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
- }
- containerResults <- killContainerResult
- }(container)
- }
- wg.Wait()
- close(containerResults)
- for containerResult := range containerResults {
- syncResults = append(syncResults, containerResult)
- }
- return
- }
- // pruneInitContainersBeforeStart ensures that before we begin creating init
- // containers, we have reduced the number of outstanding init containers still
- // present. This reduces load on the container garbage collector by only
- // preserving the most recent terminated init container.
- func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
- // only the last execution of each init container should be preserved, and only preserve it if it is in the
- // list of init containers to keep.
- initContainerNames := sets.NewString()
- for _, container := range pod.Spec.InitContainers {
- initContainerNames.Insert(container.Name)
- }
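- // For each init container name, keep only the most recent terminated instance and remove all older ones.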
- for name := range initContainerNames {
- count := 0
- for _, status := range podStatus.ContainerStatuses {
- if status.Name != name || !initContainerNames.Has(status.Name) ||
- (status.State != kubecontainer.ContainerStateExited &&
- status.State != kubecontainer.ContainerStateUnknown) {
- continue
- }
- // Remove init containers in an unknown state as well. They should have
- // been stopped before pruneInitContainersBeforeStart is
- // called.
- count++
- // keep the first init container for this name
- if count == 1 {
- continue
- }
- // prune all other init containers that match this container name
- klog.V(4).Infof("Removing init container %q instance %q %d", status.Name, status.ID.ID, count)
- if err := m.removeContainer(status.ID.ID); err != nil {
- utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
- continue
- }
- // remove any references to this container
- if _, ok := m.containerRefManager.GetRef(status.ID); ok {
- m.containerRefManager.ClearRef(status.ID)
- } else {
- klog.Warningf("No ref for container %q", status.ID)
- }
- }
- }
- }
- // purgeInitContainers removes all init containers. Note that this function does not check the state
- // of the container because it assumes all init containers have been stopped
- // before the call happens.
- func (m *kubeGenericRuntimeManager) purgeInitContainers(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
- initContainerNames := sets.NewString()
- for _, container := range pod.Spec.InitContainers {
- initContainerNames.Insert(container.Name)
- }
- for name := range initContainerNames {
- count := 0
- for _, status := range podStatus.ContainerStatuses {
- if status.Name != name || !initContainerNames.Has(status.Name) {
- continue
- }
- count++
- // Purge all init containers that match this container name
- klog.V(4).Infof("Removing init container %q instance %q %d", status.Name, status.ID.ID, count)
- if err := m.removeContainer(status.ID.ID); err != nil {
- utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
- continue
- }
- // Remove any references to this container
- if _, ok := m.containerRefManager.GetRef(status.ID); ok {
- m.containerRefManager.ClearRef(status.ID)
- } else {
- klog.Warningf("No ref for container %q", status.ID)
- }
- }
- }
- }
- // findNextInitContainerToRun returns the status of the last failed container, the
- // next init container to start, and done==true if there are no further init containers.
- // A status is only returned if an init container failed, in which case next will
- // point to that container.
- func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (status *kubecontainer.ContainerStatus, next *v1.Container, done bool) {
- if len(pod.Spec.InitContainers) == 0 {
- return nil, nil, true
- }
- // If there are failed containers, return the status of the last failed one.
- for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
- container := &pod.Spec.InitContainers[i]
- status := podStatus.FindContainerStatusByName(container.Name)
- if status != nil && isInitContainerFailed(status) {
- return status, container, false
- }
- }
- // There are no failed containers now.
- for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
- container := &pod.Spec.InitContainers[i]
- status := podStatus.FindContainerStatusByName(container.Name)
- if status == nil {
- continue
- }
- // container is still running, return not done.
- if status.State == kubecontainer.ContainerStateRunning {
- return nil, nil, false
- }
- if status.State == kubecontainer.ContainerStateExited {
- // all init containers successful
- if i == (len(pod.Spec.InitContainers) - 1) {
- return nil, nil, true
- }
- // all containers up to i successful, go to i+1
- return nil, &pod.Spec.InitContainers[i+1], false
- }
- }
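- // No usable status was found for any init container, so start from the first one.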
- return nil, &pod.Spec.InitContainers[0], false
- }
- // GetContainerLogs returns logs of a specific container.
- func (m *kubeGenericRuntimeManager) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
- status, err := m.runtimeService.ContainerStatus(containerID.ID)
- if err != nil {
- klog.V(4).Infof("failed to get container status for %v: %v", containerID.String(), err)
- return fmt.Errorf("Unable to retrieve container logs for %v", containerID.String())
- }
- return m.ReadLogs(ctx, status.GetLogPath(), containerID.ID, logOptions, stdout, stderr)
- }
- // GetExec gets the endpoint the runtime will serve the exec request from.
- func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
- req := &runtimeapi.ExecRequest{
- ContainerId: id.ID,
- Cmd: cmd,
- Tty: tty,
- Stdin: stdin,
- Stdout: stdout,
- Stderr: stderr,
- }
- resp, err := m.runtimeService.Exec(req)
- if err != nil {
- return nil, err
- }
- return url.Parse(resp.Url)
- }
- // GetAttach gets the endpoint the runtime will serve the attach request from.
- func (m *kubeGenericRuntimeManager) GetAttach(id kubecontainer.ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
- req := &runtimeapi.AttachRequest{
- ContainerId: id.ID,
- Stdin: stdin,
- Stdout: stdout,
- Stderr: stderr,
- Tty: tty,
- }
- resp, err := m.runtimeService.Attach(req)
- if err != nil {
- return nil, err
- }
- return url.Parse(resp.Url)
- }
- // RunInContainer synchronously executes the command in the container, and returns the output.
- func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
- stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, timeout)
- // NOTE(tallclair): This does not correctly interleave stdout & stderr, but should be sufficient
- // for logging purposes. A combined output option will need to be added to the ExecSyncRequest
- // if more precise output ordering is ever required.
- return append(stdout, stderr...), err
- }
- // removeContainer removes the container and the container logs.
- // Notice that we remove the container logs first, so that the container will not be removed if
- // the container logs fail to be removed, and the kubelet will retry this later. This guarantees
- // that the container logs are removed together with the container.
- // Notice that we assume the container should only be removed in a non-running state, and
- // that it will not write container logs anymore in that state.
- func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {
- klog.V(4).Infof("Removing container %q", containerID)
- // Call internal container post-stop lifecycle hook.
- if err := m.internalLifecycle.PostStopContainer(containerID); err != nil {
- return err
- }
- // Remove the container log.
- // TODO: Separate log and container lifecycle management.
- if err := m.removeContainerLog(containerID); err != nil {
- return err
- }
- // Remove the container.
- return m.runtimeService.RemoveContainer(containerID)
- }
- // removeContainerLog removes the container log.
- func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error {
- // Remove the container log.
- status, err := m.runtimeService.ContainerStatus(containerID)
- if err != nil {
- return fmt.Errorf("failed to get container status %q: %v", containerID, err)
- }
- labeledInfo := getContainerInfoFromLabels(status.Labels)
- path := status.GetLogPath()
- if err := m.osInterface.Remove(path); err != nil && !os.IsNotExist(err) {
- return fmt.Errorf("failed to remove container %q log %q: %v", containerID, path, err)
- }
- // Remove the legacy container log symlink.
- // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
- legacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName,
- labeledInfo.PodNamespace)
- if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) {
- return fmt.Errorf("failed to remove container %q log legacy symbolic link %q: %v",
- containerID, legacySymlink, err)
- }
- return nil
- }
- // DeleteContainer removes a container.
- func (m *kubeGenericRuntimeManager) DeleteContainer(containerID kubecontainer.ContainerID) error {
- return m.removeContainer(containerID.ID)
- }