strategy.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pod
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "k8s.io/apimachinery/pkg/api/errors"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/fields"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/types"
  29. utilnet "k8s.io/apimachinery/pkg/util/net"
  30. "k8s.io/apimachinery/pkg/util/validation/field"
  31. "k8s.io/apiserver/pkg/registry/generic"
  32. "k8s.io/apiserver/pkg/storage"
  33. "k8s.io/apiserver/pkg/storage/names"
  34. "k8s.io/kubernetes/pkg/api/legacyscheme"
  35. podutil "k8s.io/kubernetes/pkg/api/pod"
  36. api "k8s.io/kubernetes/pkg/apis/core"
  37. "k8s.io/kubernetes/pkg/apis/core/helper/qos"
  38. "k8s.io/kubernetes/pkg/apis/core/validation"
  39. "k8s.io/kubernetes/pkg/kubelet/client"
  40. proxyutil "k8s.io/kubernetes/pkg/proxy/util"
  41. )
// podStrategy implements behavior for Pods.
//
// It embeds runtime.ObjectTyper and names.NameGenerator, so the methods of
// both are promoted onto the strategy.
type podStrategy struct {
	runtime.ObjectTyper
	names.NameGenerator
}
  47. // Strategy is the default logic that applies when creating and updating Pod
  48. // objects via the REST API.
  49. var Strategy = podStrategy{legacyscheme.Scheme, names.SimpleNameGenerator}
// NamespaceScoped is true for pods: it always returns true.
func (podStrategy) NamespaceScoped() bool {
	return true
}
  54. // PrepareForCreate clears fields that are not allowed to be set by end users on creation.
  55. func (podStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
  56. pod := obj.(*api.Pod)
  57. pod.Status = api.PodStatus{
  58. Phase: api.PodPending,
  59. QOSClass: qos.GetPodQOS(pod),
  60. }
  61. podutil.DropDisabledPodFields(pod, nil)
  62. }
  63. // PrepareForUpdate clears fields that are not allowed to be set by end users on update.
  64. func (podStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
  65. newPod := obj.(*api.Pod)
  66. oldPod := old.(*api.Pod)
  67. newPod.Status = oldPod.Status
  68. podutil.DropDisabledPodFields(newPod, oldPod)
  69. }
  70. // Validate validates a new pod.
  71. func (podStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
  72. pod := obj.(*api.Pod)
  73. allErrs := validation.ValidatePod(pod)
  74. allErrs = append(allErrs, validation.ValidateConditionalPod(pod, nil, field.NewPath(""))...)
  75. return allErrs
  76. }
// Canonicalize normalizes the object after validation.
// For pods this is a no-op: the body is intentionally empty.
func (podStrategy) Canonicalize(obj runtime.Object) {
}
// AllowCreateOnUpdate is false for pods: it always returns false.
func (podStrategy) AllowCreateOnUpdate() bool {
	return false
}
  84. // ValidateUpdate is the default update validation for an end user.
  85. func (podStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  86. errorList := validation.ValidatePod(obj.(*api.Pod))
  87. errorList = append(errorList, validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))...)
  88. errorList = append(errorList, validation.ValidateConditionalPod(obj.(*api.Pod), old.(*api.Pod), field.NewPath(""))...)
  89. return errorList
  90. }
// AllowUnconditionalUpdate allows pods to be overwritten: it always returns
// true.
func (podStrategy) AllowUnconditionalUpdate() bool {
	return true
}
  95. // CheckGracefulDelete allows a pod to be gracefully deleted. It updates the DeleteOptions to
  96. // reflect the desired grace value.
  97. func (podStrategy) CheckGracefulDelete(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) bool {
  98. if options == nil {
  99. return false
  100. }
  101. pod := obj.(*api.Pod)
  102. period := int64(0)
  103. // user has specified a value
  104. if options.GracePeriodSeconds != nil {
  105. period = *options.GracePeriodSeconds
  106. } else {
  107. // use the default value if set, or deletes the pod immediately (0)
  108. if pod.Spec.TerminationGracePeriodSeconds != nil {
  109. period = *pod.Spec.TerminationGracePeriodSeconds
  110. }
  111. }
  112. // if the pod is not scheduled, delete immediately
  113. if len(pod.Spec.NodeName) == 0 {
  114. period = 0
  115. }
  116. // if the pod is already terminated, delete immediately
  117. if pod.Status.Phase == api.PodFailed || pod.Status.Phase == api.PodSucceeded {
  118. period = 0
  119. }
  120. // ensure the options and the pod are in sync
  121. options.GracePeriodSeconds = &period
  122. return true
  123. }
// podStrategyWithoutGraceful embeds podStrategy and overrides
// CheckGracefulDelete so that graceful deletion is never used.
type podStrategyWithoutGraceful struct {
	podStrategy
}
// CheckGracefulDelete prohibits graceful deletion: it always returns false
// and leaves options unmodified.
func (podStrategyWithoutGraceful) CheckGracefulDelete(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) bool {
	return false
}
  131. // StrategyWithoutGraceful implements the legacy instant delele behavior.
  132. var StrategyWithoutGraceful = podStrategyWithoutGraceful{Strategy}
// podStatusStrategy embeds podStrategy and overrides PrepareForUpdate and
// ValidateUpdate with status-specific behavior.
type podStatusStrategy struct {
	podStrategy
}
  136. var StatusStrategy = podStatusStrategy{Strategy}
  137. func (podStatusStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
  138. newPod := obj.(*api.Pod)
  139. oldPod := old.(*api.Pod)
  140. newPod.Spec = oldPod.Spec
  141. newPod.DeletionTimestamp = nil
  142. // don't allow the pods/status endpoint to touch owner references since old kubelets corrupt them in a way
  143. // that breaks garbage collection
  144. newPod.OwnerReferences = oldPod.OwnerReferences
  145. }
  146. func (podStatusStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  147. return validation.ValidatePodStatusUpdate(obj.(*api.Pod), old.(*api.Pod))
  148. }
  149. // GetAttrs returns labels and fields of a given object for filtering purposes.
  150. func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
  151. pod, ok := obj.(*api.Pod)
  152. if !ok {
  153. return nil, nil, fmt.Errorf("not a pod")
  154. }
  155. return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil
  156. }
  157. // MatchPod returns a generic matcher for a given label and field selector.
  158. func MatchPod(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
  159. return storage.SelectionPredicate{
  160. Label: label,
  161. Field: field,
  162. GetAttrs: GetAttrs,
  163. IndexFields: []string{"spec.nodeName"},
  164. }
  165. }
  166. func NodeNameTriggerFunc(obj runtime.Object) []storage.MatchValue {
  167. pod := obj.(*api.Pod)
  168. result := storage.MatchValue{IndexName: "spec.nodeName", Value: pod.Spec.NodeName}
  169. return []storage.MatchValue{result}
  170. }
  171. // PodToSelectableFields returns a field set that represents the object
  172. // TODO: fields are not labels, and the validation rules for them do not apply.
  173. func PodToSelectableFields(pod *api.Pod) fields.Set {
  174. // The purpose of allocation with a given number of elements is to reduce
  175. // amount of allocations needed to create the fields.Set. If you add any
  176. // field here or the number of object-meta related fields changes, this should
  177. // be adjusted.
  178. podSpecificFieldsSet := make(fields.Set, 9)
  179. podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName
  180. podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy)
  181. podSpecificFieldsSet["spec.schedulerName"] = string(pod.Spec.SchedulerName)
  182. podSpecificFieldsSet["spec.serviceAccountName"] = string(pod.Spec.ServiceAccountName)
  183. podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase)
  184. podSpecificFieldsSet["status.podIP"] = string(pod.Status.PodIP)
  185. podSpecificFieldsSet["status.nominatedNodeName"] = string(pod.Status.NominatedNodeName)
  186. return generic.AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true)
  187. }
  188. // ResourceGetter is an interface for retrieving resources by ResourceLocation.
  189. type ResourceGetter interface {
  190. Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error)
  191. }
  192. func getPod(getter ResourceGetter, ctx context.Context, name string) (*api.Pod, error) {
  193. obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
  194. if err != nil {
  195. return nil, err
  196. }
  197. pod := obj.(*api.Pod)
  198. if pod == nil {
  199. return nil, fmt.Errorf("Unexpected object type: %#v", pod)
  200. }
  201. return pod, nil
  202. }
  203. // ResourceLocation returns a URL to which one can send traffic for the specified pod.
  204. func ResourceLocation(getter ResourceGetter, rt http.RoundTripper, ctx context.Context, id string) (*url.URL, http.RoundTripper, error) {
  205. // Allow ID as "podname" or "podname:port" or "scheme:podname:port".
  206. // If port is not specified, try to use the first defined port on the pod.
  207. scheme, name, port, valid := utilnet.SplitSchemeNamePort(id)
  208. if !valid {
  209. return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id))
  210. }
  211. // TODO: if port is not a number but a "(container)/(portname)", do a name lookup.
  212. pod, err := getPod(getter, ctx, name)
  213. if err != nil {
  214. return nil, nil, err
  215. }
  216. // Try to figure out a port.
  217. if port == "" {
  218. for i := range pod.Spec.Containers {
  219. if len(pod.Spec.Containers[i].Ports) > 0 {
  220. port = fmt.Sprintf("%d", pod.Spec.Containers[i].Ports[0].ContainerPort)
  221. break
  222. }
  223. }
  224. }
  225. if err := proxyutil.IsProxyableIP(pod.Status.PodIP); err != nil {
  226. return nil, nil, errors.NewBadRequest(err.Error())
  227. }
  228. loc := &url.URL{
  229. Scheme: scheme,
  230. }
  231. if port == "" {
  232. loc.Host = pod.Status.PodIP
  233. } else {
  234. loc.Host = net.JoinHostPort(pod.Status.PodIP, port)
  235. }
  236. return loc, rt, nil
  237. }
  238. // getContainerNames returns a formatted string containing the container names
  239. func getContainerNames(containers []api.Container) string {
  240. names := []string{}
  241. for _, c := range containers {
  242. names = append(names, c.Name)
  243. }
  244. return strings.Join(names, " ")
  245. }
  246. // LogLocation returns the log URL for a pod container. If opts.Container is blank
  247. // and only one container is present in the pod, that container is used.
  248. func LogLocation(
  249. getter ResourceGetter,
  250. connInfo client.ConnectionInfoGetter,
  251. ctx context.Context,
  252. name string,
  253. opts *api.PodLogOptions,
  254. ) (*url.URL, http.RoundTripper, error) {
  255. pod, err := getPod(getter, ctx, name)
  256. if err != nil {
  257. return nil, nil, err
  258. }
  259. // Try to figure out a container
  260. // If a container was provided, it must be valid
  261. container := opts.Container
  262. if len(container) == 0 {
  263. switch len(pod.Spec.Containers) {
  264. case 1:
  265. container = pod.Spec.Containers[0].Name
  266. case 0:
  267. return nil, nil, errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", name))
  268. default:
  269. containerNames := getContainerNames(pod.Spec.Containers)
  270. initContainerNames := getContainerNames(pod.Spec.InitContainers)
  271. err := fmt.Sprintf("a container name must be specified for pod %s, choose one of: [%s]", name, containerNames)
  272. if len(initContainerNames) > 0 {
  273. err += fmt.Sprintf(" or one of the init containers: [%s]", initContainerNames)
  274. }
  275. return nil, nil, errors.NewBadRequest(err)
  276. }
  277. } else {
  278. if !podHasContainerWithName(pod, container) {
  279. return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name))
  280. }
  281. }
  282. nodeName := types.NodeName(pod.Spec.NodeName)
  283. if len(nodeName) == 0 {
  284. // If pod has not been assigned a host, return an empty location
  285. return nil, nil, nil
  286. }
  287. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  288. if err != nil {
  289. return nil, nil, err
  290. }
  291. params := url.Values{}
  292. if opts.Follow {
  293. params.Add("follow", "true")
  294. }
  295. if opts.Previous {
  296. params.Add("previous", "true")
  297. }
  298. if opts.Timestamps {
  299. params.Add("timestamps", "true")
  300. }
  301. if opts.SinceSeconds != nil {
  302. params.Add("sinceSeconds", strconv.FormatInt(*opts.SinceSeconds, 10))
  303. }
  304. if opts.SinceTime != nil {
  305. params.Add("sinceTime", opts.SinceTime.Format(time.RFC3339))
  306. }
  307. if opts.TailLines != nil {
  308. params.Add("tailLines", strconv.FormatInt(*opts.TailLines, 10))
  309. }
  310. if opts.LimitBytes != nil {
  311. params.Add("limitBytes", strconv.FormatInt(*opts.LimitBytes, 10))
  312. }
  313. loc := &url.URL{
  314. Scheme: nodeInfo.Scheme,
  315. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  316. Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, pod.Name, container),
  317. RawQuery: params.Encode(),
  318. }
  319. return loc, nodeInfo.Transport, nil
  320. }
  321. func podHasContainerWithName(pod *api.Pod, containerName string) bool {
  322. for _, c := range pod.Spec.Containers {
  323. if c.Name == containerName {
  324. return true
  325. }
  326. }
  327. for _, c := range pod.Spec.InitContainers {
  328. if c.Name == containerName {
  329. return true
  330. }
  331. }
  332. return false
  333. }
  334. func streamParams(params url.Values, opts runtime.Object) error {
  335. switch opts := opts.(type) {
  336. case *api.PodExecOptions:
  337. if opts.Stdin {
  338. params.Add(api.ExecStdinParam, "1")
  339. }
  340. if opts.Stdout {
  341. params.Add(api.ExecStdoutParam, "1")
  342. }
  343. if opts.Stderr {
  344. params.Add(api.ExecStderrParam, "1")
  345. }
  346. if opts.TTY {
  347. params.Add(api.ExecTTYParam, "1")
  348. }
  349. for _, c := range opts.Command {
  350. params.Add("command", c)
  351. }
  352. case *api.PodAttachOptions:
  353. if opts.Stdin {
  354. params.Add(api.ExecStdinParam, "1")
  355. }
  356. if opts.Stdout {
  357. params.Add(api.ExecStdoutParam, "1")
  358. }
  359. if opts.Stderr {
  360. params.Add(api.ExecStderrParam, "1")
  361. }
  362. if opts.TTY {
  363. params.Add(api.ExecTTYParam, "1")
  364. }
  365. case *api.PodPortForwardOptions:
  366. if len(opts.Ports) > 0 {
  367. ports := make([]string, len(opts.Ports))
  368. for i, p := range opts.Ports {
  369. ports[i] = strconv.FormatInt(int64(p), 10)
  370. }
  371. params.Add(api.PortHeader, strings.Join(ports, ","))
  372. }
  373. default:
  374. return fmt.Errorf("Unknown object for streaming: %v", opts)
  375. }
  376. return nil
  377. }
// AttachLocation returns the attach URL for a pod container. If opts.Container is blank
// and only one container is present in the pod, that container is used.
//
// It delegates to streamLocation with opts.Container as the requested
// container and "attach" as the leading path segment, and returns whatever
// URL, round tripper and error streamLocation produces.
func AttachLocation(
	getter ResourceGetter,
	connInfo client.ConnectionInfoGetter,
	ctx context.Context,
	name string,
	opts *api.PodAttachOptions,
) (*url.URL, http.RoundTripper, error) {
	return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "attach")
}
// ExecLocation returns the exec URL for a pod container. If opts.Container is blank
// and only one container is present in the pod, that container is used.
//
// It delegates to streamLocation with opts.Container as the requested
// container and "exec" as the leading path segment, and returns whatever
// URL, round tripper and error streamLocation produces.
func ExecLocation(
	getter ResourceGetter,
	connInfo client.ConnectionInfoGetter,
	ctx context.Context,
	name string,
	opts *api.PodExecOptions,
) (*url.URL, http.RoundTripper, error) {
	return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "exec")
}
  400. func streamLocation(
  401. getter ResourceGetter,
  402. connInfo client.ConnectionInfoGetter,
  403. ctx context.Context,
  404. name string,
  405. opts runtime.Object,
  406. container,
  407. path string,
  408. ) (*url.URL, http.RoundTripper, error) {
  409. pod, err := getPod(getter, ctx, name)
  410. if err != nil {
  411. return nil, nil, err
  412. }
  413. // Try to figure out a container
  414. // If a container was provided, it must be valid
  415. if container == "" {
  416. switch len(pod.Spec.Containers) {
  417. case 1:
  418. container = pod.Spec.Containers[0].Name
  419. case 0:
  420. return nil, nil, errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", name))
  421. default:
  422. containerNames := getContainerNames(pod.Spec.Containers)
  423. initContainerNames := getContainerNames(pod.Spec.InitContainers)
  424. err := fmt.Sprintf("a container name must be specified for pod %s, choose one of: [%s]", name, containerNames)
  425. if len(initContainerNames) > 0 {
  426. err += fmt.Sprintf(" or one of the init containers: [%s]", initContainerNames)
  427. }
  428. return nil, nil, errors.NewBadRequest(err)
  429. }
  430. } else {
  431. if !podHasContainerWithName(pod, container) {
  432. return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name))
  433. }
  434. }
  435. nodeName := types.NodeName(pod.Spec.NodeName)
  436. if len(nodeName) == 0 {
  437. // If pod has not been assigned a host, return an empty location
  438. return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
  439. }
  440. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  441. if err != nil {
  442. return nil, nil, err
  443. }
  444. params := url.Values{}
  445. if err := streamParams(params, opts); err != nil {
  446. return nil, nil, err
  447. }
  448. loc := &url.URL{
  449. Scheme: nodeInfo.Scheme,
  450. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  451. Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),
  452. RawQuery: params.Encode(),
  453. }
  454. return loc, nodeInfo.Transport, nil
  455. }
  456. // PortForwardLocation returns the port-forward URL for a pod.
  457. func PortForwardLocation(
  458. getter ResourceGetter,
  459. connInfo client.ConnectionInfoGetter,
  460. ctx context.Context,
  461. name string,
  462. opts *api.PodPortForwardOptions,
  463. ) (*url.URL, http.RoundTripper, error) {
  464. pod, err := getPod(getter, ctx, name)
  465. if err != nil {
  466. return nil, nil, err
  467. }
  468. nodeName := types.NodeName(pod.Spec.NodeName)
  469. if len(nodeName) == 0 {
  470. // If pod has not been assigned a host, return an empty location
  471. return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
  472. }
  473. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  474. if err != nil {
  475. return nil, nil, err
  476. }
  477. params := url.Values{}
  478. if err := streamParams(params, opts); err != nil {
  479. return nil, nil, err
  480. }
  481. loc := &url.URL{
  482. Scheme: nodeInfo.Scheme,
  483. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  484. Path: fmt.Sprintf("/portForward/%s/%s", pod.Namespace, pod.Name),
  485. RawQuery: params.Encode(),
  486. }
  487. return loc, nodeInfo.Transport, nil
  488. }