123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Reads the pod configuration from an HTTP GET response.
- package config
- import (
- "bytes"
- "fmt"
- "io/ioutil"
- "net/http"
- "time"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/util/wait"
- api "k8s.io/kubernetes/pkg/apis/core"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/klog"
- )
- type sourceURL struct {
- url string
- header http.Header
- nodeName types.NodeName
- updates chan<- interface{}
- data []byte
- failureLogs int
- client *http.Client
- }
- func NewSourceURL(url string, header http.Header, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
- config := &sourceURL{
- url: url,
- header: header,
- nodeName: nodeName,
- updates: updates,
- data: nil,
- // Timing out requests leads to retries. This client is only used to
- // read the manifest URL passed to kubelet.
- client: &http.Client{Timeout: 10 * time.Second},
- }
- klog.V(1).Infof("Watching URL %s", url)
- go wait.Until(config.run, period, wait.NeverStop)
- }
- func (s *sourceURL) run() {
- if err := s.extractFromURL(); err != nil {
- // Don't log this multiple times per minute. The first few entries should be
- // enough to get the point across.
- if s.failureLogs < 3 {
- klog.Warningf("Failed to read pods from URL: %v", err)
- } else if s.failureLogs == 3 {
- klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
- } else {
- klog.V(4).Infof("Failed to read pods from URL: %v", err)
- }
- s.failureLogs++
- } else {
- if s.failureLogs > 0 {
- klog.Info("Successfully read pods from URL.")
- s.failureLogs = 0
- }
- }
- }
- func (s *sourceURL) applyDefaults(pod *api.Pod) error {
- return applyDefaults(pod, s.url, false, s.nodeName)
- }
- func (s *sourceURL) extractFromURL() error {
- req, err := http.NewRequest("GET", s.url, nil)
- if err != nil {
- return err
- }
- req.Header = s.header
- resp, err := s.client.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return err
- }
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("%v: %v", s.url, resp.Status)
- }
- if len(data) == 0 {
- // Emit an update with an empty PodList to allow HTTPSource to be marked as seen
- s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
- return fmt.Errorf("zero-length data received from %v", s.url)
- }
- // Short circuit if the data has not changed since the last time it was read.
- if bytes.Compare(data, s.data) == 0 {
- return nil
- }
- s.data = data
- // First try as it is a single pod.
- parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
- if parsed {
- if singlePodErr != nil {
- // It parsed but could not be used.
- return singlePodErr
- }
- s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
- return nil
- }
- // That didn't work, so try a list of pods.
- parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
- if parsed {
- if multiPodErr != nil {
- // It parsed but could not be used.
- return multiPodErr
- }
- pods := make([]*v1.Pod, 0)
- for i := range podList.Items {
- pods = append(pods, &podList.Items[i])
- }
- s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
- return nil
- }
- return fmt.Errorf("%v: received '%v', but couldn't parse as "+
- "single (%v) or multiple pods (%v).\n",
- s.url, string(data), singlePodErr, multiPodErr)
- }
|