strategy.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pod
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "k8s.io/apimachinery/pkg/api/errors"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/fields"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/types"
  29. utilnet "k8s.io/apimachinery/pkg/util/net"
  30. "k8s.io/apimachinery/pkg/util/validation/field"
  31. "k8s.io/apiserver/pkg/registry/generic"
  32. "k8s.io/apiserver/pkg/storage"
  33. "k8s.io/apiserver/pkg/storage/names"
  34. utilfeature "k8s.io/apiserver/pkg/util/feature"
  35. "k8s.io/kubernetes/pkg/api/legacyscheme"
  36. podutil "k8s.io/kubernetes/pkg/api/pod"
  37. api "k8s.io/kubernetes/pkg/apis/core"
  38. "k8s.io/kubernetes/pkg/apis/core/helper/qos"
  39. "k8s.io/kubernetes/pkg/apis/core/validation"
  40. "k8s.io/kubernetes/pkg/features"
  41. "k8s.io/kubernetes/pkg/kubelet/client"
  42. proxyutil "k8s.io/kubernetes/pkg/proxy/util"
  43. )
  44. // podStrategy implements behavior for Pods
  45. type podStrategy struct {
  46. runtime.ObjectTyper
  47. names.NameGenerator
  48. }
  49. // Strategy is the default logic that applies when creating and updating Pod
  50. // objects via the REST API.
  51. var Strategy = podStrategy{legacyscheme.Scheme, names.SimpleNameGenerator}
  52. // NamespaceScoped is true for pods.
  53. func (podStrategy) NamespaceScoped() bool {
  54. return true
  55. }
  56. // PrepareForCreate clears fields that are not allowed to be set by end users on creation.
  57. func (podStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
  58. pod := obj.(*api.Pod)
  59. pod.Status = api.PodStatus{
  60. Phase: api.PodPending,
  61. QOSClass: qos.GetPodQOS(pod),
  62. }
  63. podutil.DropDisabledPodFields(pod, nil)
  64. }
  65. // PrepareForUpdate clears fields that are not allowed to be set by end users on update.
  66. func (podStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
  67. newPod := obj.(*api.Pod)
  68. oldPod := old.(*api.Pod)
  69. newPod.Status = oldPod.Status
  70. podutil.DropDisabledPodFields(newPod, oldPod)
  71. }
  72. // Validate validates a new pod.
  73. func (podStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
  74. pod := obj.(*api.Pod)
  75. allErrs := validation.ValidatePodCreate(pod)
  76. allErrs = append(allErrs, validation.ValidateConditionalPod(pod, nil, field.NewPath(""))...)
  77. return allErrs
  78. }
  79. // Canonicalize normalizes the object after validation.
  80. func (podStrategy) Canonicalize(obj runtime.Object) {
  81. }
  82. // AllowCreateOnUpdate is false for pods.
  83. func (podStrategy) AllowCreateOnUpdate() bool {
  84. return false
  85. }
  86. // ValidateUpdate is the default update validation for an end user.
  87. func (podStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  88. errorList := validation.ValidatePod(obj.(*api.Pod))
  89. errorList = append(errorList, validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))...)
  90. errorList = append(errorList, validation.ValidateConditionalPod(obj.(*api.Pod), old.(*api.Pod), field.NewPath(""))...)
  91. return errorList
  92. }
  93. // AllowUnconditionalUpdate allows pods to be overwritten
  94. func (podStrategy) AllowUnconditionalUpdate() bool {
  95. return true
  96. }
  97. // CheckGracefulDelete allows a pod to be gracefully deleted. It updates the DeleteOptions to
  98. // reflect the desired grace value.
  99. func (podStrategy) CheckGracefulDelete(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) bool {
  100. if options == nil {
  101. return false
  102. }
  103. pod := obj.(*api.Pod)
  104. period := int64(0)
  105. // user has specified a value
  106. if options.GracePeriodSeconds != nil {
  107. period = *options.GracePeriodSeconds
  108. } else {
  109. // use the default value if set, or deletes the pod immediately (0)
  110. if pod.Spec.TerminationGracePeriodSeconds != nil {
  111. period = *pod.Spec.TerminationGracePeriodSeconds
  112. }
  113. }
  114. // if the pod is not scheduled, delete immediately
  115. if len(pod.Spec.NodeName) == 0 {
  116. period = 0
  117. }
  118. // if the pod is already terminated, delete immediately
  119. if pod.Status.Phase == api.PodFailed || pod.Status.Phase == api.PodSucceeded {
  120. period = 0
  121. }
  122. // ensure the options and the pod are in sync
  123. options.GracePeriodSeconds = &period
  124. return true
  125. }
  126. type podStatusStrategy struct {
  127. podStrategy
  128. }
  129. // StatusStrategy wraps and exports the used podStrategy for the storage package.
  130. var StatusStrategy = podStatusStrategy{Strategy}
  131. func (podStatusStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
  132. newPod := obj.(*api.Pod)
  133. oldPod := old.(*api.Pod)
  134. newPod.Spec = oldPod.Spec
  135. newPod.DeletionTimestamp = nil
  136. // don't allow the pods/status endpoint to touch owner references since old kubelets corrupt them in a way
  137. // that breaks garbage collection
  138. newPod.OwnerReferences = oldPod.OwnerReferences
  139. }
  140. func (podStatusStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  141. return validation.ValidatePodStatusUpdate(obj.(*api.Pod), old.(*api.Pod))
  142. }
  143. type podEphemeralContainersStrategy struct {
  144. podStrategy
  145. }
  146. // EphemeralContainersStrategy wraps and exports the used podStrategy for the storage package.
  147. var EphemeralContainersStrategy = podEphemeralContainersStrategy{Strategy}
  148. func (podEphemeralContainersStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  149. return validation.ValidatePodEphemeralContainersUpdate(obj.(*api.Pod), old.(*api.Pod))
  150. }
  151. // GetAttrs returns labels and fields of a given object for filtering purposes.
  152. func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
  153. pod, ok := obj.(*api.Pod)
  154. if !ok {
  155. return nil, nil, fmt.Errorf("not a pod")
  156. }
  157. return labels.Set(pod.ObjectMeta.Labels), ToSelectableFields(pod), nil
  158. }
  159. // MatchPod returns a generic matcher for a given label and field selector.
  160. func MatchPod(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
  161. return storage.SelectionPredicate{
  162. Label: label,
  163. Field: field,
  164. GetAttrs: GetAttrs,
  165. IndexFields: []string{"spec.nodeName"},
  166. }
  167. }
  168. // NodeNameTriggerFunc returns value spec.nodename of given object.
  169. func NodeNameTriggerFunc(obj runtime.Object) string {
  170. return obj.(*api.Pod).Spec.NodeName
  171. }
  172. // ToSelectableFields returns a field set that represents the object
  173. // TODO: fields are not labels, and the validation rules for them do not apply.
  174. func ToSelectableFields(pod *api.Pod) fields.Set {
  175. // The purpose of allocation with a given number of elements is to reduce
  176. // amount of allocations needed to create the fields.Set. If you add any
  177. // field here or the number of object-meta related fields changes, this should
  178. // be adjusted.
  179. podSpecificFieldsSet := make(fields.Set, 9)
  180. podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName
  181. podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy)
  182. podSpecificFieldsSet["spec.schedulerName"] = string(pod.Spec.SchedulerName)
  183. podSpecificFieldsSet["spec.serviceAccountName"] = string(pod.Spec.ServiceAccountName)
  184. podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase)
  185. // TODO: add podIPs as a downward API value(s) with proper format
  186. podIP := ""
  187. if len(pod.Status.PodIPs) > 0 {
  188. podIP = string(pod.Status.PodIPs[0].IP)
  189. }
  190. podSpecificFieldsSet["status.podIP"] = podIP
  191. podSpecificFieldsSet["status.nominatedNodeName"] = string(pod.Status.NominatedNodeName)
  192. return generic.AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true)
  193. }
  194. // ResourceGetter is an interface for retrieving resources by ResourceLocation.
  195. type ResourceGetter interface {
  196. Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error)
  197. }
  198. func getPod(ctx context.Context, getter ResourceGetter, name string) (*api.Pod, error) {
  199. obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
  200. if err != nil {
  201. return nil, err
  202. }
  203. pod := obj.(*api.Pod)
  204. if pod == nil {
  205. return nil, fmt.Errorf("Unexpected object type: %#v", pod)
  206. }
  207. return pod, nil
  208. }
  209. // getPodIP returns primary IP for a Pod
  210. func getPodIP(pod *api.Pod) string {
  211. if pod == nil {
  212. return ""
  213. }
  214. if len(pod.Status.PodIPs) > 0 {
  215. return pod.Status.PodIPs[0].IP
  216. }
  217. return ""
  218. }
  219. // ResourceLocation returns a URL to which one can send traffic for the specified pod.
  220. func ResourceLocation(ctx context.Context, getter ResourceGetter, rt http.RoundTripper, id string) (*url.URL, http.RoundTripper, error) {
  221. // Allow ID as "podname" or "podname:port" or "scheme:podname:port".
  222. // If port is not specified, try to use the first defined port on the pod.
  223. scheme, name, port, valid := utilnet.SplitSchemeNamePort(id)
  224. if !valid {
  225. return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id))
  226. }
  227. pod, err := getPod(ctx, getter, name)
  228. if err != nil {
  229. return nil, nil, err
  230. }
  231. // Try to figure out a port.
  232. if port == "" {
  233. for i := range pod.Spec.Containers {
  234. if len(pod.Spec.Containers[i].Ports) > 0 {
  235. port = fmt.Sprintf("%d", pod.Spec.Containers[i].Ports[0].ContainerPort)
  236. break
  237. }
  238. }
  239. }
  240. podIP := getPodIP(pod)
  241. if err := proxyutil.IsProxyableIP(podIP); err != nil {
  242. return nil, nil, errors.NewBadRequest(err.Error())
  243. }
  244. loc := &url.URL{
  245. Scheme: scheme,
  246. }
  247. if port == "" {
  248. loc.Host = podIP
  249. } else {
  250. loc.Host = net.JoinHostPort(podIP, port)
  251. }
  252. return loc, rt, nil
  253. }
  254. // getContainerNames returns a formatted string containing the container names
  255. func getContainerNames(containers []api.Container) string {
  256. names := []string{}
  257. for _, c := range containers {
  258. names = append(names, c.Name)
  259. }
  260. return strings.Join(names, " ")
  261. }
  262. // LogLocation returns the log URL for a pod container. If opts.Container is blank
  263. // and only one container is present in the pod, that container is used.
  264. func LogLocation(
  265. ctx context.Context, getter ResourceGetter,
  266. connInfo client.ConnectionInfoGetter,
  267. name string,
  268. opts *api.PodLogOptions,
  269. ) (*url.URL, http.RoundTripper, error) {
  270. pod, err := getPod(ctx, getter, name)
  271. if err != nil {
  272. return nil, nil, err
  273. }
  274. // Try to figure out a container
  275. // If a container was provided, it must be valid
  276. container := opts.Container
  277. container, err = validateContainer(container, pod)
  278. if err != nil {
  279. return nil, nil, err
  280. }
  281. nodeName := types.NodeName(pod.Spec.NodeName)
  282. if len(nodeName) == 0 {
  283. // If pod has not been assigned a host, return an empty location
  284. return nil, nil, nil
  285. }
  286. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  287. if err != nil {
  288. return nil, nil, err
  289. }
  290. params := url.Values{}
  291. if opts.Follow {
  292. params.Add("follow", "true")
  293. }
  294. if opts.Previous {
  295. params.Add("previous", "true")
  296. }
  297. if opts.Timestamps {
  298. params.Add("timestamps", "true")
  299. }
  300. if opts.SinceSeconds != nil {
  301. params.Add("sinceSeconds", strconv.FormatInt(*opts.SinceSeconds, 10))
  302. }
  303. if opts.SinceTime != nil {
  304. params.Add("sinceTime", opts.SinceTime.Format(time.RFC3339))
  305. }
  306. if opts.TailLines != nil {
  307. params.Add("tailLines", strconv.FormatInt(*opts.TailLines, 10))
  308. }
  309. if opts.LimitBytes != nil {
  310. params.Add("limitBytes", strconv.FormatInt(*opts.LimitBytes, 10))
  311. }
  312. loc := &url.URL{
  313. Scheme: nodeInfo.Scheme,
  314. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  315. Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, pod.Name, container),
  316. RawQuery: params.Encode(),
  317. }
  318. if opts.InsecureSkipTLSVerifyBackend && utilfeature.DefaultFeatureGate.Enabled(features.AllowInsecureBackendProxy) {
  319. return loc, nodeInfo.InsecureSkipTLSVerifyTransport, nil
  320. }
  321. return loc, nodeInfo.Transport, nil
  322. }
  323. func podHasContainerWithName(pod *api.Pod, containerName string) bool {
  324. var hasContainer bool
  325. podutil.VisitContainers(&pod.Spec, func(c *api.Container) bool {
  326. if c.Name == containerName {
  327. hasContainer = true
  328. return false
  329. }
  330. return true
  331. })
  332. return hasContainer
  333. }
  334. func streamParams(params url.Values, opts runtime.Object) error {
  335. switch opts := opts.(type) {
  336. case *api.PodExecOptions:
  337. if opts.Stdin {
  338. params.Add(api.ExecStdinParam, "1")
  339. }
  340. if opts.Stdout {
  341. params.Add(api.ExecStdoutParam, "1")
  342. }
  343. if opts.Stderr {
  344. params.Add(api.ExecStderrParam, "1")
  345. }
  346. if opts.TTY {
  347. params.Add(api.ExecTTYParam, "1")
  348. }
  349. for _, c := range opts.Command {
  350. params.Add("command", c)
  351. }
  352. case *api.PodAttachOptions:
  353. if opts.Stdin {
  354. params.Add(api.ExecStdinParam, "1")
  355. }
  356. if opts.Stdout {
  357. params.Add(api.ExecStdoutParam, "1")
  358. }
  359. if opts.Stderr {
  360. params.Add(api.ExecStderrParam, "1")
  361. }
  362. if opts.TTY {
  363. params.Add(api.ExecTTYParam, "1")
  364. }
  365. case *api.PodPortForwardOptions:
  366. if len(opts.Ports) > 0 {
  367. ports := make([]string, len(opts.Ports))
  368. for i, p := range opts.Ports {
  369. ports[i] = strconv.FormatInt(int64(p), 10)
  370. }
  371. params.Add(api.PortHeader, strings.Join(ports, ","))
  372. }
  373. default:
  374. return fmt.Errorf("Unknown object for streaming: %v", opts)
  375. }
  376. return nil
  377. }
  378. // AttachLocation returns the attach URL for a pod container. If opts.Container is blank
  379. // and only one container is present in the pod, that container is used.
  380. func AttachLocation(
  381. ctx context.Context,
  382. getter ResourceGetter,
  383. connInfo client.ConnectionInfoGetter,
  384. name string,
  385. opts *api.PodAttachOptions,
  386. ) (*url.URL, http.RoundTripper, error) {
  387. return streamLocation(ctx, getter, connInfo, name, opts, opts.Container, "attach")
  388. }
  389. // ExecLocation returns the exec URL for a pod container. If opts.Container is blank
  390. // and only one container is present in the pod, that container is used.
  391. func ExecLocation(
  392. ctx context.Context,
  393. getter ResourceGetter,
  394. connInfo client.ConnectionInfoGetter,
  395. name string,
  396. opts *api.PodExecOptions,
  397. ) (*url.URL, http.RoundTripper, error) {
  398. return streamLocation(ctx, getter, connInfo, name, opts, opts.Container, "exec")
  399. }
  400. func streamLocation(
  401. ctx context.Context,
  402. getter ResourceGetter,
  403. connInfo client.ConnectionInfoGetter,
  404. name string,
  405. opts runtime.Object,
  406. container,
  407. path string,
  408. ) (*url.URL, http.RoundTripper, error) {
  409. pod, err := getPod(ctx, getter, name)
  410. if err != nil {
  411. return nil, nil, err
  412. }
  413. // Try to figure out a container
  414. // If a container was provided, it must be valid
  415. container, err = validateContainer(container, pod)
  416. if err != nil {
  417. return nil, nil, err
  418. }
  419. nodeName := types.NodeName(pod.Spec.NodeName)
  420. if len(nodeName) == 0 {
  421. // If pod has not been assigned a host, return an empty location
  422. return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
  423. }
  424. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  425. if err != nil {
  426. return nil, nil, err
  427. }
  428. params := url.Values{}
  429. if err := streamParams(params, opts); err != nil {
  430. return nil, nil, err
  431. }
  432. loc := &url.URL{
  433. Scheme: nodeInfo.Scheme,
  434. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  435. Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),
  436. RawQuery: params.Encode(),
  437. }
  438. return loc, nodeInfo.Transport, nil
  439. }
  440. // PortForwardLocation returns the port-forward URL for a pod.
  441. func PortForwardLocation(
  442. ctx context.Context,
  443. getter ResourceGetter,
  444. connInfo client.ConnectionInfoGetter,
  445. name string,
  446. opts *api.PodPortForwardOptions,
  447. ) (*url.URL, http.RoundTripper, error) {
  448. pod, err := getPod(ctx, getter, name)
  449. if err != nil {
  450. return nil, nil, err
  451. }
  452. nodeName := types.NodeName(pod.Spec.NodeName)
  453. if len(nodeName) == 0 {
  454. // If pod has not been assigned a host, return an empty location
  455. return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
  456. }
  457. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  458. if err != nil {
  459. return nil, nil, err
  460. }
  461. params := url.Values{}
  462. if err := streamParams(params, opts); err != nil {
  463. return nil, nil, err
  464. }
  465. loc := &url.URL{
  466. Scheme: nodeInfo.Scheme,
  467. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  468. Path: fmt.Sprintf("/portForward/%s/%s", pod.Namespace, pod.Name),
  469. RawQuery: params.Encode(),
  470. }
  471. return loc, nodeInfo.Transport, nil
  472. }
  473. // validateContainer validate container is valid for pod, return valid container
  474. func validateContainer(container string, pod *api.Pod) (string, error) {
  475. if len(container) == 0 {
  476. switch len(pod.Spec.Containers) {
  477. case 1:
  478. container = pod.Spec.Containers[0].Name
  479. case 0:
  480. return "", errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", pod.Name))
  481. default:
  482. containerNames := getContainerNames(pod.Spec.Containers)
  483. initContainerNames := getContainerNames(pod.Spec.InitContainers)
  484. err := fmt.Sprintf("a container name must be specified for pod %s, choose one of: [%s]", pod.Name, containerNames)
  485. if len(initContainerNames) > 0 {
  486. err += fmt.Sprintf(" or one of the init containers: [%s]", initContainerNames)
  487. }
  488. return "", errors.NewBadRequest(err)
  489. }
  490. } else {
  491. if !podHasContainerWithName(pod, container) {
  492. return "", errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, pod.Name))
  493. }
  494. }
  495. return container, nil
  496. }