strategy.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pod
  14. import (
  15. "context"
  16. "fmt"
  17. "net"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "k8s.io/apimachinery/pkg/api/errors"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/fields"
  26. "k8s.io/apimachinery/pkg/labels"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/types"
  29. utilnet "k8s.io/apimachinery/pkg/util/net"
  30. "k8s.io/apimachinery/pkg/util/validation/field"
  31. genericfeatures "k8s.io/apiserver/pkg/features"
  32. "k8s.io/apiserver/pkg/registry/generic"
  33. "k8s.io/apiserver/pkg/storage"
  34. "k8s.io/apiserver/pkg/storage/names"
  35. utilfeature "k8s.io/apiserver/pkg/util/feature"
  36. "k8s.io/client-go/tools/cache"
  37. "k8s.io/kubernetes/pkg/api/legacyscheme"
  38. podutil "k8s.io/kubernetes/pkg/api/pod"
  39. api "k8s.io/kubernetes/pkg/apis/core"
  40. "k8s.io/kubernetes/pkg/apis/core/helper/qos"
  41. "k8s.io/kubernetes/pkg/apis/core/validation"
  42. "k8s.io/kubernetes/pkg/features"
  43. "k8s.io/kubernetes/pkg/kubelet/client"
  44. proxyutil "k8s.io/kubernetes/pkg/proxy/util"
  45. )
  46. // podStrategy implements behavior for Pods
  47. type podStrategy struct {
  48. runtime.ObjectTyper
  49. names.NameGenerator
  50. }
  51. // Strategy is the default logic that applies when creating and updating Pod
  52. // objects via the REST API.
  53. var Strategy = podStrategy{legacyscheme.Scheme, names.SimpleNameGenerator}
  54. // NamespaceScoped is true for pods.
  55. func (podStrategy) NamespaceScoped() bool {
  56. return true
  57. }
  58. // PrepareForCreate clears fields that are not allowed to be set by end users on creation.
  59. func (podStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
  60. pod := obj.(*api.Pod)
  61. pod.Status = api.PodStatus{
  62. Phase: api.PodPending,
  63. QOSClass: qos.GetPodQOS(pod),
  64. }
  65. podutil.DropDisabledPodFields(pod, nil)
  66. }
  67. // PrepareForUpdate clears fields that are not allowed to be set by end users on update.
  68. func (podStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
  69. newPod := obj.(*api.Pod)
  70. oldPod := old.(*api.Pod)
  71. newPod.Status = oldPod.Status
  72. podutil.DropDisabledPodFields(newPod, oldPod)
  73. }
  74. // Validate validates a new pod.
  75. func (podStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
  76. pod := obj.(*api.Pod)
  77. allErrs := validation.ValidatePodCreate(pod)
  78. allErrs = append(allErrs, validation.ValidateConditionalPod(pod, nil, field.NewPath(""))...)
  79. return allErrs
  80. }
  81. // Canonicalize normalizes the object after validation.
  82. func (podStrategy) Canonicalize(obj runtime.Object) {
  83. }
  84. // AllowCreateOnUpdate is false for pods.
  85. func (podStrategy) AllowCreateOnUpdate() bool {
  86. return false
  87. }
  88. // ValidateUpdate is the default update validation for an end user.
  89. func (podStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  90. errorList := validation.ValidatePod(obj.(*api.Pod))
  91. errorList = append(errorList, validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))...)
  92. errorList = append(errorList, validation.ValidateConditionalPod(obj.(*api.Pod), old.(*api.Pod), field.NewPath(""))...)
  93. return errorList
  94. }
  95. // AllowUnconditionalUpdate allows pods to be overwritten
  96. func (podStrategy) AllowUnconditionalUpdate() bool {
  97. return true
  98. }
  99. // CheckGracefulDelete allows a pod to be gracefully deleted. It updates the DeleteOptions to
  100. // reflect the desired grace value.
  101. func (podStrategy) CheckGracefulDelete(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) bool {
  102. if options == nil {
  103. return false
  104. }
  105. pod := obj.(*api.Pod)
  106. period := int64(0)
  107. // user has specified a value
  108. if options.GracePeriodSeconds != nil {
  109. period = *options.GracePeriodSeconds
  110. } else {
  111. // use the default value if set, or deletes the pod immediately (0)
  112. if pod.Spec.TerminationGracePeriodSeconds != nil {
  113. period = *pod.Spec.TerminationGracePeriodSeconds
  114. }
  115. }
  116. // if the pod is not scheduled, delete immediately
  117. if len(pod.Spec.NodeName) == 0 {
  118. period = 0
  119. }
  120. // if the pod is already terminated, delete immediately
  121. if pod.Status.Phase == api.PodFailed || pod.Status.Phase == api.PodSucceeded {
  122. period = 0
  123. }
  124. // ensure the options and the pod are in sync
  125. options.GracePeriodSeconds = &period
  126. return true
  127. }
  128. type podStatusStrategy struct {
  129. podStrategy
  130. }
  131. // StatusStrategy wraps and exports the used podStrategy for the storage package.
  132. var StatusStrategy = podStatusStrategy{Strategy}
  133. func (podStatusStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
  134. newPod := obj.(*api.Pod)
  135. oldPod := old.(*api.Pod)
  136. newPod.Spec = oldPod.Spec
  137. newPod.DeletionTimestamp = nil
  138. // don't allow the pods/status endpoint to touch owner references since old kubelets corrupt them in a way
  139. // that breaks garbage collection
  140. newPod.OwnerReferences = oldPod.OwnerReferences
  141. }
  142. func (podStatusStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  143. return validation.ValidatePodStatusUpdate(obj.(*api.Pod), old.(*api.Pod))
  144. }
  145. type podEphemeralContainersStrategy struct {
  146. podStrategy
  147. }
  148. // EphemeralContainersStrategy wraps and exports the used podStrategy for the storage package.
  149. var EphemeralContainersStrategy = podEphemeralContainersStrategy{Strategy}
  150. func (podEphemeralContainersStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
  151. return validation.ValidatePodEphemeralContainersUpdate(obj.(*api.Pod), old.(*api.Pod))
  152. }
  153. // GetAttrs returns labels and fields of a given object for filtering purposes.
  154. func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
  155. pod, ok := obj.(*api.Pod)
  156. if !ok {
  157. return nil, nil, fmt.Errorf("not a pod")
  158. }
  159. return labels.Set(pod.ObjectMeta.Labels), ToSelectableFields(pod), nil
  160. }
  161. // MatchPod returns a generic matcher for a given label and field selector.
  162. func MatchPod(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
  163. return storage.SelectionPredicate{
  164. Label: label,
  165. Field: field,
  166. GetAttrs: GetAttrs,
  167. IndexFields: []string{"spec.nodeName"},
  168. }
  169. }
  170. // NodeNameTriggerFunc returns value spec.nodename of given object.
  171. func NodeNameTriggerFunc(obj runtime.Object) string {
  172. return obj.(*api.Pod).Spec.NodeName
  173. }
  174. // NodeNameIndexFunc return value spec.nodename of given object.
  175. func NodeNameIndexFunc(obj interface{}) ([]string, error) {
  176. pod, ok := obj.(*api.Pod)
  177. if !ok {
  178. return nil, fmt.Errorf("not a pod")
  179. }
  180. return []string{pod.Spec.NodeName}, nil
  181. }
  182. // Indexers returns the indexers for pod storage.
  183. func Indexers() *cache.Indexers {
  184. if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.SelectorIndex) {
  185. return &cache.Indexers{
  186. storage.FieldIndex("spec.nodeName"): NodeNameIndexFunc,
  187. }
  188. }
  189. return nil
  190. }
  191. // ToSelectableFields returns a field set that represents the object
  192. // TODO: fields are not labels, and the validation rules for them do not apply.
  193. func ToSelectableFields(pod *api.Pod) fields.Set {
  194. // The purpose of allocation with a given number of elements is to reduce
  195. // amount of allocations needed to create the fields.Set. If you add any
  196. // field here or the number of object-meta related fields changes, this should
  197. // be adjusted.
  198. podSpecificFieldsSet := make(fields.Set, 9)
  199. podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName
  200. podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy)
  201. podSpecificFieldsSet["spec.schedulerName"] = string(pod.Spec.SchedulerName)
  202. podSpecificFieldsSet["spec.serviceAccountName"] = string(pod.Spec.ServiceAccountName)
  203. podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase)
  204. // TODO: add podIPs as a downward API value(s) with proper format
  205. podIP := ""
  206. if len(pod.Status.PodIPs) > 0 {
  207. podIP = string(pod.Status.PodIPs[0].IP)
  208. }
  209. podSpecificFieldsSet["status.podIP"] = podIP
  210. podSpecificFieldsSet["status.nominatedNodeName"] = string(pod.Status.NominatedNodeName)
  211. return generic.AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true)
  212. }
  213. // ResourceGetter is an interface for retrieving resources by ResourceLocation.
  214. type ResourceGetter interface {
  215. Get(context.Context, string, *metav1.GetOptions) (runtime.Object, error)
  216. }
  217. func getPod(ctx context.Context, getter ResourceGetter, name string) (*api.Pod, error) {
  218. obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
  219. if err != nil {
  220. return nil, err
  221. }
  222. pod := obj.(*api.Pod)
  223. if pod == nil {
  224. return nil, fmt.Errorf("Unexpected object type: %#v", pod)
  225. }
  226. return pod, nil
  227. }
  228. // getPodIP returns primary IP for a Pod
  229. func getPodIP(pod *api.Pod) string {
  230. if pod == nil {
  231. return ""
  232. }
  233. if len(pod.Status.PodIPs) > 0 {
  234. return pod.Status.PodIPs[0].IP
  235. }
  236. return ""
  237. }
  238. // ResourceLocation returns a URL to which one can send traffic for the specified pod.
  239. func ResourceLocation(ctx context.Context, getter ResourceGetter, rt http.RoundTripper, id string) (*url.URL, http.RoundTripper, error) {
  240. // Allow ID as "podname" or "podname:port" or "scheme:podname:port".
  241. // If port is not specified, try to use the first defined port on the pod.
  242. scheme, name, port, valid := utilnet.SplitSchemeNamePort(id)
  243. if !valid {
  244. return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id))
  245. }
  246. pod, err := getPod(ctx, getter, name)
  247. if err != nil {
  248. return nil, nil, err
  249. }
  250. // Try to figure out a port.
  251. if port == "" {
  252. for i := range pod.Spec.Containers {
  253. if len(pod.Spec.Containers[i].Ports) > 0 {
  254. port = fmt.Sprintf("%d", pod.Spec.Containers[i].Ports[0].ContainerPort)
  255. break
  256. }
  257. }
  258. }
  259. podIP := getPodIP(pod)
  260. if err := proxyutil.IsProxyableIP(podIP); err != nil {
  261. return nil, nil, errors.NewBadRequest(err.Error())
  262. }
  263. loc := &url.URL{
  264. Scheme: scheme,
  265. }
  266. if port == "" {
  267. loc.Host = podIP
  268. } else {
  269. loc.Host = net.JoinHostPort(podIP, port)
  270. }
  271. return loc, rt, nil
  272. }
  273. // LogLocation returns the log URL for a pod container. If opts.Container is blank
  274. // and only one container is present in the pod, that container is used.
  275. func LogLocation(
  276. ctx context.Context, getter ResourceGetter,
  277. connInfo client.ConnectionInfoGetter,
  278. name string,
  279. opts *api.PodLogOptions,
  280. ) (*url.URL, http.RoundTripper, error) {
  281. pod, err := getPod(ctx, getter, name)
  282. if err != nil {
  283. return nil, nil, err
  284. }
  285. // Try to figure out a container
  286. // If a container was provided, it must be valid
  287. container := opts.Container
  288. container, err = validateContainer(container, pod)
  289. if err != nil {
  290. return nil, nil, err
  291. }
  292. nodeName := types.NodeName(pod.Spec.NodeName)
  293. if len(nodeName) == 0 {
  294. // If pod has not been assigned a host, return an empty location
  295. return nil, nil, nil
  296. }
  297. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  298. if err != nil {
  299. return nil, nil, err
  300. }
  301. params := url.Values{}
  302. if opts.Follow {
  303. params.Add("follow", "true")
  304. }
  305. if opts.Previous {
  306. params.Add("previous", "true")
  307. }
  308. if opts.Timestamps {
  309. params.Add("timestamps", "true")
  310. }
  311. if opts.SinceSeconds != nil {
  312. params.Add("sinceSeconds", strconv.FormatInt(*opts.SinceSeconds, 10))
  313. }
  314. if opts.SinceTime != nil {
  315. params.Add("sinceTime", opts.SinceTime.Format(time.RFC3339))
  316. }
  317. if opts.TailLines != nil {
  318. params.Add("tailLines", strconv.FormatInt(*opts.TailLines, 10))
  319. }
  320. if opts.LimitBytes != nil {
  321. params.Add("limitBytes", strconv.FormatInt(*opts.LimitBytes, 10))
  322. }
  323. loc := &url.URL{
  324. Scheme: nodeInfo.Scheme,
  325. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  326. Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, pod.Name, container),
  327. RawQuery: params.Encode(),
  328. }
  329. if opts.InsecureSkipTLSVerifyBackend && utilfeature.DefaultFeatureGate.Enabled(features.AllowInsecureBackendProxy) {
  330. return loc, nodeInfo.InsecureSkipTLSVerifyTransport, nil
  331. }
  332. return loc, nodeInfo.Transport, nil
  333. }
  334. func podHasContainerWithName(pod *api.Pod, containerName string) bool {
  335. var hasContainer bool
  336. podutil.VisitContainers(&pod.Spec, func(c *api.Container) bool {
  337. if c.Name == containerName {
  338. hasContainer = true
  339. return false
  340. }
  341. return true
  342. })
  343. return hasContainer
  344. }
  345. func streamParams(params url.Values, opts runtime.Object) error {
  346. switch opts := opts.(type) {
  347. case *api.PodExecOptions:
  348. if opts.Stdin {
  349. params.Add(api.ExecStdinParam, "1")
  350. }
  351. if opts.Stdout {
  352. params.Add(api.ExecStdoutParam, "1")
  353. }
  354. if opts.Stderr {
  355. params.Add(api.ExecStderrParam, "1")
  356. }
  357. if opts.TTY {
  358. params.Add(api.ExecTTYParam, "1")
  359. }
  360. for _, c := range opts.Command {
  361. params.Add("command", c)
  362. }
  363. case *api.PodAttachOptions:
  364. if opts.Stdin {
  365. params.Add(api.ExecStdinParam, "1")
  366. }
  367. if opts.Stdout {
  368. params.Add(api.ExecStdoutParam, "1")
  369. }
  370. if opts.Stderr {
  371. params.Add(api.ExecStderrParam, "1")
  372. }
  373. if opts.TTY {
  374. params.Add(api.ExecTTYParam, "1")
  375. }
  376. case *api.PodPortForwardOptions:
  377. if len(opts.Ports) > 0 {
  378. ports := make([]string, len(opts.Ports))
  379. for i, p := range opts.Ports {
  380. ports[i] = strconv.FormatInt(int64(p), 10)
  381. }
  382. params.Add(api.PortHeader, strings.Join(ports, ","))
  383. }
  384. default:
  385. return fmt.Errorf("Unknown object for streaming: %v", opts)
  386. }
  387. return nil
  388. }
  389. // AttachLocation returns the attach URL for a pod container. If opts.Container is blank
  390. // and only one container is present in the pod, that container is used.
  391. func AttachLocation(
  392. ctx context.Context,
  393. getter ResourceGetter,
  394. connInfo client.ConnectionInfoGetter,
  395. name string,
  396. opts *api.PodAttachOptions,
  397. ) (*url.URL, http.RoundTripper, error) {
  398. return streamLocation(ctx, getter, connInfo, name, opts, opts.Container, "attach")
  399. }
  400. // ExecLocation returns the exec URL for a pod container. If opts.Container is blank
  401. // and only one container is present in the pod, that container is used.
  402. func ExecLocation(
  403. ctx context.Context,
  404. getter ResourceGetter,
  405. connInfo client.ConnectionInfoGetter,
  406. name string,
  407. opts *api.PodExecOptions,
  408. ) (*url.URL, http.RoundTripper, error) {
  409. return streamLocation(ctx, getter, connInfo, name, opts, opts.Container, "exec")
  410. }
  411. func streamLocation(
  412. ctx context.Context,
  413. getter ResourceGetter,
  414. connInfo client.ConnectionInfoGetter,
  415. name string,
  416. opts runtime.Object,
  417. container,
  418. path string,
  419. ) (*url.URL, http.RoundTripper, error) {
  420. pod, err := getPod(ctx, getter, name)
  421. if err != nil {
  422. return nil, nil, err
  423. }
  424. // Try to figure out a container
  425. // If a container was provided, it must be valid
  426. container, err = validateContainer(container, pod)
  427. if err != nil {
  428. return nil, nil, err
  429. }
  430. nodeName := types.NodeName(pod.Spec.NodeName)
  431. if len(nodeName) == 0 {
  432. // If pod has not been assigned a host, return an empty location
  433. return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
  434. }
  435. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  436. if err != nil {
  437. return nil, nil, err
  438. }
  439. params := url.Values{}
  440. if err := streamParams(params, opts); err != nil {
  441. return nil, nil, err
  442. }
  443. loc := &url.URL{
  444. Scheme: nodeInfo.Scheme,
  445. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  446. Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),
  447. RawQuery: params.Encode(),
  448. }
  449. return loc, nodeInfo.Transport, nil
  450. }
  451. // PortForwardLocation returns the port-forward URL for a pod.
  452. func PortForwardLocation(
  453. ctx context.Context,
  454. getter ResourceGetter,
  455. connInfo client.ConnectionInfoGetter,
  456. name string,
  457. opts *api.PodPortForwardOptions,
  458. ) (*url.URL, http.RoundTripper, error) {
  459. pod, err := getPod(ctx, getter, name)
  460. if err != nil {
  461. return nil, nil, err
  462. }
  463. nodeName := types.NodeName(pod.Spec.NodeName)
  464. if len(nodeName) == 0 {
  465. // If pod has not been assigned a host, return an empty location
  466. return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
  467. }
  468. nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
  469. if err != nil {
  470. return nil, nil, err
  471. }
  472. params := url.Values{}
  473. if err := streamParams(params, opts); err != nil {
  474. return nil, nil, err
  475. }
  476. loc := &url.URL{
  477. Scheme: nodeInfo.Scheme,
  478. Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
  479. Path: fmt.Sprintf("/portForward/%s/%s", pod.Namespace, pod.Name),
  480. RawQuery: params.Encode(),
  481. }
  482. return loc, nodeInfo.Transport, nil
  483. }
  484. // validateContainer validate container is valid for pod, return valid container
  485. func validateContainer(container string, pod *api.Pod) (string, error) {
  486. if len(container) == 0 {
  487. switch len(pod.Spec.Containers) {
  488. case 1:
  489. container = pod.Spec.Containers[0].Name
  490. case 0:
  491. return "", errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", pod.Name))
  492. default:
  493. var containerNames []string
  494. podutil.VisitContainers(&pod.Spec, func(c *api.Container) bool {
  495. containerNames = append(containerNames, c.Name)
  496. return true
  497. })
  498. errStr := fmt.Sprintf("a container name must be specified for pod %s, choose one of: %s", pod.Name, containerNames)
  499. return "", errors.NewBadRequest(errStr)
  500. }
  501. } else {
  502. if !podHasContainerWithName(pod, container) {
  503. return "", errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, pod.Name))
  504. }
  505. }
  506. return container, nil
  507. }