123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package server
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "net/http/httptest"
- "net/http/httputil"
- "net/url"
- "reflect"
- "strconv"
- "strings"
- "testing"
- "time"
- cadvisorapi "github.com/google/cadvisor/info/v1"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/httpstream"
- "k8s.io/apimachinery/pkg/util/httpstream/spdy"
- "k8s.io/apiserver/pkg/authentication/authenticator"
- "k8s.io/apiserver/pkg/authentication/user"
- "k8s.io/apiserver/pkg/authorization/authorizer"
- "k8s.io/client-go/tools/remotecommand"
- utiltesting "k8s.io/client-go/util/testing"
- runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
- api "k8s.io/kubernetes/pkg/apis/core"
- statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
- "k8s.io/utils/pointer"
- // Do some initialization to decode the query parameters correctly.
- _ "k8s.io/kubernetes/pkg/apis/core/install"
- "k8s.io/kubernetes/pkg/kubelet/cm"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/server/portforward"
- remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
- "k8s.io/kubernetes/pkg/kubelet/server/stats"
- "k8s.io/kubernetes/pkg/kubelet/server/streaming"
- "k8s.io/kubernetes/pkg/volume"
- )
- const (
- testUID = "9b01b80f-8fb4-11e4-95ab-4200af06647"
- testContainerID = "container789"
- testPodSandboxID = "pod0987"
- )
- type fakeKubelet struct {
- podByNameFunc func(namespace, name string) (*v1.Pod, bool)
- containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
- rawInfoFunc func(query *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error)
- machineInfoFunc func() (*cadvisorapi.MachineInfo, error)
- podsFunc func() []*v1.Pod
- runningPodsFunc func() ([]*v1.Pod, error)
- logFunc func(w http.ResponseWriter, req *http.Request)
- runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
- getExecCheck func(string, types.UID, string, []string, remotecommandserver.Options)
- getAttachCheck func(string, types.UID, string, remotecommandserver.Options)
- getPortForwardCheck func(string, string, types.UID, portforward.V4Options)
- containerLogsFunc func(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
- hostnameFunc func() string
- resyncInterval time.Duration
- loopEntryTime time.Time
- plegHealth bool
- streamingRuntime streaming.Server
- }
- func (fk *fakeKubelet) ResyncInterval() time.Duration {
- return fk.resyncInterval
- }
- func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
- return fk.loopEntryTime
- }
- func (fk *fakeKubelet) GetPodByName(namespace, name string) (*v1.Pod, bool) {
- return fk.podByNameFunc(namespace, name)
- }
- func (fk *fakeKubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- return fk.containerInfoFunc(podFullName, uid, containerName, req)
- }
- func (fk *fakeKubelet) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
- return fk.rawInfoFunc(req)
- }
- func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
- return fk.machineInfoFunc()
- }
- func (*fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
- return &cadvisorapi.VersionInfo{}, nil
- }
- func (fk *fakeKubelet) GetPods() []*v1.Pod {
- return fk.podsFunc()
- }
- func (fk *fakeKubelet) GetRunningPods() ([]*v1.Pod, error) {
- return fk.runningPodsFunc()
- }
- func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
- fk.logFunc(w, req)
- }
- func (fk *fakeKubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
- return fk.containerLogsFunc(ctx, podFullName, containerName, logOptions, stdout, stderr)
- }
- func (fk *fakeKubelet) GetHostname() string {
- return fk.hostnameFunc()
- }
- func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
- return fk.runFunc(podFullName, uid, containerName, cmd)
- }
- type fakeRuntime struct {
- execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
- attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
- portForwardFunc func(string, int32, io.ReadWriteCloser) error
- }
- func (f *fakeRuntime) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
- return f.execFunc(containerID, cmd, stdin, stdout, stderr, tty, resize)
- }
- func (f *fakeRuntime) Attach(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
- return f.attachFunc(containerID, stdin, stdout, stderr, tty, resize)
- }
- func (f *fakeRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
- return f.portForwardFunc(podSandboxID, port, stream)
- }
- type testStreamingServer struct {
- streaming.Server
- fakeRuntime *fakeRuntime
- testHTTPServer *httptest.Server
- }
- func newTestStreamingServer(streamIdleTimeout time.Duration) (s *testStreamingServer, err error) {
- s = &testStreamingServer{}
- s.testHTTPServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- s.ServeHTTP(w, r)
- }))
- defer func() {
- if err != nil {
- s.testHTTPServer.Close()
- }
- }()
- testURL, err := url.Parse(s.testHTTPServer.URL)
- if err != nil {
- return nil, err
- }
- s.fakeRuntime = &fakeRuntime{}
- config := streaming.DefaultConfig
- config.BaseURL = testURL
- if streamIdleTimeout != 0 {
- config.StreamIdleTimeout = streamIdleTimeout
- }
- s.Server, err = streaming.NewServer(config, s.fakeRuntime)
- if err != nil {
- return nil, err
- }
- return s, nil
- }
- func (fk *fakeKubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
- if fk.getExecCheck != nil {
- fk.getExecCheck(podFullName, podUID, containerName, cmd, streamOpts)
- }
- // Always use testContainerID
- resp, err := fk.streamingRuntime.GetExec(&runtimeapi.ExecRequest{
- ContainerId: testContainerID,
- Cmd: cmd,
- Tty: streamOpts.TTY,
- Stdin: streamOpts.Stdin,
- Stdout: streamOpts.Stdout,
- Stderr: streamOpts.Stderr,
- })
- if err != nil {
- return nil, err
- }
- return url.Parse(resp.GetUrl())
- }
- func (fk *fakeKubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
- if fk.getAttachCheck != nil {
- fk.getAttachCheck(podFullName, podUID, containerName, streamOpts)
- }
- // Always use testContainerID
- resp, err := fk.streamingRuntime.GetAttach(&runtimeapi.AttachRequest{
- ContainerId: testContainerID,
- Tty: streamOpts.TTY,
- Stdin: streamOpts.Stdin,
- Stdout: streamOpts.Stdout,
- Stderr: streamOpts.Stderr,
- })
- if err != nil {
- return nil, err
- }
- return url.Parse(resp.GetUrl())
- }
- func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
- if fk.getPortForwardCheck != nil {
- fk.getPortForwardCheck(podName, podNamespace, podUID, portForwardOpts)
- }
- // Always use testPodSandboxID
- resp, err := fk.streamingRuntime.GetPortForward(&runtimeapi.PortForwardRequest{
- PodSandboxId: testPodSandboxID,
- Port: portForwardOpts.Ports,
- })
- if err != nil {
- return nil, err
- }
- return url.Parse(resp.GetUrl())
- }
- // Unused functions
- func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
- func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
- func (*fakeKubelet) GetPodCgroupRoot() string { return "" }
- func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false }
- func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
- return map[string]volume.Volume{}, true
- }
- func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
- func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
- func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
- return nil, nil
- }
- func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
- func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
- func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
- func (*fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
- return nil, nil, nil
- }
- func (*fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
- return nil, nil
- }
- type fakeAuth struct {
- authenticateFunc func(*http.Request) (*authenticator.Response, bool, error)
- attributesFunc func(user.Info, *http.Request) authorizer.Attributes
- authorizeFunc func(authorizer.Attributes) (authorized authorizer.Decision, reason string, err error)
- }
- func (f *fakeAuth) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
- return f.authenticateFunc(req)
- }
- func (f *fakeAuth) GetRequestAttributes(u user.Info, req *http.Request) authorizer.Attributes {
- return f.attributesFunc(u, req)
- }
- func (f *fakeAuth) Authorize(ctx context.Context, a authorizer.Attributes) (authorized authorizer.Decision, reason string, err error) {
- return f.authorizeFunc(a)
- }
- type serverTestFramework struct {
- serverUnderTest *Server
- fakeKubelet *fakeKubelet
- fakeAuth *fakeAuth
- testHTTPServer *httptest.Server
- criHandler *utiltesting.FakeHandler
- }
- func newServerTest() *serverTestFramework {
- return newServerTestWithDebug(true, false, nil)
- }
- func newServerTestWithDebug(enableDebugging, redirectContainerStreaming bool, streamingServer streaming.Server) *serverTestFramework {
- fw := &serverTestFramework{}
- fw.fakeKubelet = &fakeKubelet{
- hostnameFunc: func() string {
- return "127.0.0.1"
- },
- podByNameFunc: func(namespace, name string) (*v1.Pod, bool) {
- return &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: name,
- UID: testUID,
- },
- }, true
- },
- plegHealth: true,
- streamingRuntime: streamingServer,
- }
- fw.fakeAuth = &fakeAuth{
- authenticateFunc: func(req *http.Request) (*authenticator.Response, bool, error) {
- return &authenticator.Response{User: &user.DefaultInfo{Name: "test"}}, true, nil
- },
- attributesFunc: func(u user.Info, req *http.Request) authorizer.Attributes {
- return &authorizer.AttributesRecord{User: u}
- },
- authorizeFunc: func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
- return authorizer.DecisionAllow, "", nil
- },
- }
- fw.criHandler = &utiltesting.FakeHandler{
- StatusCode: http.StatusOK,
- }
- server := NewServer(
- fw.fakeKubelet,
- stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
- fw.fakeAuth,
- true,
- enableDebugging,
- false,
- redirectContainerStreaming,
- fw.criHandler)
- fw.serverUnderTest = &server
- fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
- return fw
- }
- // A helper function to return the correct pod name.
- func getPodName(name, namespace string) string {
- if namespace == "" {
- namespace = metav1.NamespaceDefault
- }
- return name + "_" + namespace
- }
- func TestContainerInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.ContainerInfo{}
- podID := "somepod"
- expectedPodID := getPodName(podID, "")
- expectedContainerName := "goodcontainer"
- fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- if podID != expectedPodID || containerName != expectedContainerName {
- return nil, fmt.Errorf("bad podID or containerName: podID=%v; containerName=%v", podID, containerName)
- }
- return expectedInfo, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v", podID, expectedContainerName))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !receivedInfo.Eq(expectedInfo) {
- t.Errorf("received wrong data: %#v", receivedInfo)
- }
- }
- func TestContainerInfoWithUidNamespace(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.ContainerInfo{}
- podID := "somepod"
- expectedNamespace := "custom"
- expectedPodID := getPodName(podID, expectedNamespace)
- expectedContainerName := "goodcontainer"
- fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- if podID != expectedPodID || string(uid) != testUID || containerName != expectedContainerName {
- return nil, fmt.Errorf("bad podID or uid or containerName: podID=%v; uid=%v; containerName=%v", podID, uid, containerName)
- }
- return expectedInfo, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !receivedInfo.Eq(expectedInfo) {
- t.Errorf("received wrong data: %#v", receivedInfo)
- }
- }
- func TestContainerNotFound(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- podID := "somepod"
- expectedNamespace := "custom"
- expectedContainerName := "slowstartcontainer"
- fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- return nil, kubecontainer.ErrContainerNotFound
- }
- resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- if resp.StatusCode != http.StatusNotFound {
- t.Fatalf("Received status %d expecting %d", resp.StatusCode, http.StatusNotFound)
- }
- defer resp.Body.Close()
- }
- func TestRootInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.ContainerInfo{
- ContainerReference: cadvisorapi.ContainerReference{
- Name: "/",
- },
- }
- fw.fakeKubelet.rawInfoFunc = func(req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
- return map[string]*cadvisorapi.ContainerInfo{
- expectedInfo.Name: expectedInfo,
- }, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + "/stats")
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !receivedInfo.Eq(expectedInfo) {
- t.Errorf("received wrong data: %#v, expected %#v", receivedInfo, expectedInfo)
- }
- }
- func TestSubcontainerContainerInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- const kubeletContainer = "/kubelet"
- const kubeletSubContainer = "/kubelet/sub"
- expectedInfo := map[string]*cadvisorapi.ContainerInfo{
- kubeletContainer: {
- ContainerReference: cadvisorapi.ContainerReference{
- Name: kubeletContainer,
- },
- },
- kubeletSubContainer: {
- ContainerReference: cadvisorapi.ContainerReference{
- Name: kubeletSubContainer,
- },
- },
- }
- fw.fakeKubelet.rawInfoFunc = func(req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
- return expectedInfo, nil
- }
- request := fmt.Sprintf("{\"containerName\":%q, \"subcontainers\": true}", kubeletContainer)
- resp, err := http.Post(fw.testHTTPServer.URL+"/stats/container", "application/json", bytes.NewBuffer([]byte(request)))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo map[string]*cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("Received invalid json data: %v", err)
- }
- if len(receivedInfo) != len(expectedInfo) {
- t.Errorf("Received wrong data: %#v, expected %#v", receivedInfo, expectedInfo)
- }
- for _, containerName := range []string{kubeletContainer, kubeletSubContainer} {
- if _, ok := receivedInfo[containerName]; !ok {
- t.Errorf("Expected container %q to be present in result: %#v", containerName, receivedInfo)
- }
- if !receivedInfo[containerName].Eq(expectedInfo[containerName]) {
- t.Errorf("Invalid result for %q: Expected %#v, received %#v", containerName, expectedInfo[containerName], receivedInfo[containerName])
- }
- }
- }
- func TestMachineInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.MachineInfo{
- NumCores: 4,
- MemoryCapacity: 1024,
- }
- fw.fakeKubelet.machineInfoFunc = func() (*cadvisorapi.MachineInfo, error) {
- return expectedInfo, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + "/spec")
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.MachineInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !reflect.DeepEqual(&receivedInfo, expectedInfo) {
- t.Errorf("received wrong data: %#v", receivedInfo)
- }
- }
- func TestServeLogs(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
- fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
- w.WriteHeader(http.StatusOK)
- w.Header().Add("Content-Type", "text/html")
- w.Write([]byte(content))
- }
- resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := httputil.DumpResponse(resp, true)
- if err != nil {
- // copying the response body did not work
- t.Errorf("Cannot copy resp: %#v", err)
- }
- result := string(body)
- if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
- t.Errorf("Received wrong data: %s", result)
- }
- }
- func TestServeRunInContainer(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- expectedCommand := "ls -a"
- fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
- if podFullName != expectedPodName {
- t.Errorf("expected %s, got %s", expectedPodName, podFullName)
- }
- if containerName != expectedContainerName {
- t.Errorf("expected %s, got %s", expectedContainerName, containerName)
- }
- if strings.Join(cmd, " ") != expectedCommand {
- t.Errorf("expected: %s, got %v", expectedCommand, cmd)
- }
- return []byte(output), nil
- }
- resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- // copying the response body did not work
- t.Errorf("Cannot copy resp: %#v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("expected %s, got %s", output, result)
- }
- }
- func TestServeRunInContainerWithUID(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- expectedCommand := "ls -a"
- fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
- if podFullName != expectedPodName {
- t.Errorf("expected %s, got %s", expectedPodName, podFullName)
- }
- if string(uid) != testUID {
- t.Errorf("expected %s, got %s", testUID, uid)
- }
- if containerName != expectedContainerName {
- t.Errorf("expected %s, got %s", expectedContainerName, containerName)
- }
- if strings.Join(cmd, " ") != expectedCommand {
- t.Errorf("expected: %s, got %v", expectedCommand, cmd)
- }
- return []byte(output), nil
- }
- resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- // copying the response body did not work
- t.Errorf("Cannot copy resp: %#v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("expected %s, got %s", output, result)
- }
- }
- func TestHealthCheck(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.hostnameFunc = func() string {
- return "127.0.0.1"
- }
- // Test with correct hostname, Docker version
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- // Test with incorrect hostname
- fw.fakeKubelet.hostnameFunc = func() string {
- return "fake"
- }
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- }
- func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) {
- resp, err := http.Get(httpURL)
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != expectedErrorCode {
- t.Errorf("expected status code %d, got %d", expectedErrorCode, resp.StatusCode)
- }
- }
- // Ensure all registered handlers & services have an associated testcase.
- func TestAuthzCoverage(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- // method:path -> has coverage
- expectedCases := map[string]bool{}
- // Test all the non-web-service handlers
- for _, path := range fw.serverUnderTest.restfulCont.RegisteredHandlePaths() {
- expectedCases["GET:"+path] = false
- expectedCases["POST:"+path] = false
- }
- // Test all the generated web-service paths
- for _, ws := range fw.serverUnderTest.restfulCont.RegisteredWebServices() {
- for _, r := range ws.Routes() {
- expectedCases[r.Method+":"+r.Path] = false
- }
- }
- // This is a sanity check that the Handle->HandleWithFilter() delegation is working
- // Ideally, these would move to registered web services and this list would get shorter
- expectedPaths := []string{"/healthz", "/metrics", "/metrics/cadvisor"}
- for _, expectedPath := range expectedPaths {
- if _, expected := expectedCases["GET:"+expectedPath]; !expected {
- t.Errorf("Expected registered handle path %s was missing", expectedPath)
- }
- }
- for _, tc := range AuthzTestCases() {
- expectedCases[tc.Method+":"+tc.Path] = true
- }
- for tc, found := range expectedCases {
- if !found {
- t.Errorf("Missing authz test case for %s", tc)
- }
- }
- }
- func TestAuthFilters(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- attributesGetter := NewNodeAuthorizerAttributesGetter(authzTestNodeName)
- for _, tc := range AuthzTestCases() {
- t.Run(tc.Method+":"+tc.Path, func(t *testing.T) {
- var (
- expectedUser = AuthzTestUser()
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
- calledAuthenticate = true
- return &authenticator.Response{User: expectedUser}, true, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- require.Equal(t, expectedUser, u)
- return attributesGetter.GetRequestAttributes(u, req)
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
- calledAuthorize = true
- tc.AssertAttributes(t, a)
- return authorizer.DecisionNoOpinion, "", nil
- }
- req, err := http.NewRequest(tc.Method, fw.testHTTPServer.URL+tc.Path, nil)
- require.NoError(t, err)
- resp, err := http.DefaultClient.Do(req)
- require.NoError(t, err)
- defer resp.Body.Close()
- assert.Equal(t, http.StatusForbidden, resp.StatusCode)
- assert.True(t, calledAuthenticate, "Authenticate was not called")
- assert.True(t, calledAttributes, "Attributes were not called")
- assert.True(t, calledAuthorize, "Authorize was not called")
- })
- }
- }
- func TestAuthenticationError(t *testing.T) {
- var (
- expectedUser = &user.DefaultInfo{Name: "test"}
- expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
- calledAuthenticate = true
- return &authenticator.Response{User: expectedUser}, true, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- return expectedAttributes
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
- calledAuthorize = true
- return authorizer.DecisionNoOpinion, "", errors.New("Failed")
- }
- assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
- if !calledAuthenticate {
- t.Fatalf("Authenticate was not called")
- }
- if !calledAttributes {
- t.Fatalf("Attributes was not called")
- }
- if !calledAuthorize {
- t.Fatalf("Authorize was not called")
- }
- }
- func TestAuthenticationFailure(t *testing.T) {
- var (
- expectedUser = &user.DefaultInfo{Name: "test"}
- expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
- calledAuthenticate = true
- return nil, false, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- return expectedAttributes
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
- calledAuthorize = true
- return authorizer.DecisionNoOpinion, "", nil
- }
- assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusUnauthorized)
- if !calledAuthenticate {
- t.Fatalf("Authenticate was not called")
- }
- if calledAttributes {
- t.Fatalf("Attributes was called unexpectedly")
- }
- if calledAuthorize {
- t.Fatalf("Authorize was called unexpectedly")
- }
- }
- func TestAuthorizationSuccess(t *testing.T) {
- var (
- expectedUser = &user.DefaultInfo{Name: "test"}
- expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
- calledAuthenticate = true
- return &authenticator.Response{User: expectedUser}, true, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- return expectedAttributes
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
- calledAuthorize = true
- return authorizer.DecisionAllow, "", nil
- }
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- if !calledAuthenticate {
- t.Fatalf("Authenticate was not called")
- }
- if !calledAttributes {
- t.Fatalf("Attributes were not called")
- }
- if !calledAuthorize {
- t.Fatalf("Authorize was not called")
- }
- }
- func TestSyncLoopCheck(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.hostnameFunc = func() string {
- return "127.0.0.1"
- }
- fw.fakeKubelet.resyncInterval = time.Minute
- fw.fakeKubelet.loopEntryTime = time.Now()
- // Test with correct hostname, Docker version
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- fw.fakeKubelet.loopEntryTime = time.Now().Add(time.Minute * -10)
- assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
- }
- // returns http response status code from the HTTP GET
- func assertHealthIsOk(t *testing.T, httpURL string) {
- resp, err := http.Get(httpURL)
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
- }
- body, readErr := ioutil.ReadAll(resp.Body)
- if readErr != nil {
- // copying the response body did not work
- t.Fatalf("Cannot copy resp: %#v", readErr)
- }
- result := string(body)
- if !strings.Contains(result, "ok") {
- t.Errorf("expected body contains ok, got %s", result)
- }
- }
- func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) {
- fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
- return &v1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: namespace,
- Name: pod,
- },
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: container,
- },
- },
- },
- }, true
- }
- }
- func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *v1.PodLogOptions, output string) {
- fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
- if podFullName != expectedPodName {
- t.Errorf("expected %s, got %s", expectedPodName, podFullName)
- }
- if containerName != expectedContainerName {
- t.Errorf("expected %s, got %s", expectedContainerName, containerName)
- }
- if !reflect.DeepEqual(expectedLogOptions, logOptions) {
- t.Errorf("expected %#v, got %#v", expectedLogOptions, logOptions)
- }
- io.WriteString(stdout, output)
- return nil
- }
- }
- func TestContainerLogs(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- tests := map[string]struct {
- query string
- podLogOption *v1.PodLogOptions
- }{
- "without tail": {"", &v1.PodLogOptions{}},
- "with tail": {"?tailLines=5", &v1.PodLogOptions{TailLines: pointer.Int64Ptr(5)}},
- "with legacy tail": {"?tail=5", &v1.PodLogOptions{TailLines: pointer.Int64Ptr(5)}},
- "with tail all": {"?tail=all", &v1.PodLogOptions{}},
- "with follow": {"?follow=1", &v1.PodLogOptions{Follow: true}},
- }
- for desc, test := range tests {
- t.Run(desc, func(t *testing.T) {
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("Error reading container logs: %v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("Expected: '%v', got: '%v'", output, result)
- }
- })
- }
- }
- func TestContainerLogsWithInvalidTail(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &v1.PodLogOptions{}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=-1")
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusUnprocessableEntity {
- t.Errorf("Unexpected non-error reading container logs: %#v", resp)
- }
- }
- func makeReq(t *testing.T, method, url, clientProtocol string) *http.Request {
- req, err := http.NewRequest(method, url, nil)
- if err != nil {
- t.Fatalf("error creating request: %v", err)
- }
- req.Header.Set("Content-Type", "")
- req.Header.Add("X-Stream-Protocol-Version", clientProtocol)
- return req
- }
- func TestServeExecInContainerIdleTimeout(t *testing.T) {
- ss, err := newTestStreamingServer(100 * time.Millisecond)
- require.NoError(t, err)
- defer ss.testHTTPServer.Close()
- fw := newServerTestWithDebug(true, false, ss)
- defer fw.testHTTPServer.Close()
- podNamespace := "other"
- podName := "foo"
- expectedContainerName := "baz"
- url := fw.testHTTPServer.URL + "/exec/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?c=ls&c=-a&" + api.ExecStdinParam + "=1"
- upgradeRoundTripper := spdy.NewSpdyRoundTripper(nil, true, true)
- c := &http.Client{Transport: upgradeRoundTripper}
- resp, err := c.Do(makeReq(t, "POST", url, "v4.channel.k8s.io"))
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- upgradeRoundTripper.Dialer = &net.Dialer{
- Deadline: time.Now().Add(60 * time.Second),
- Timeout: 60 * time.Second,
- }
- conn, err := upgradeRoundTripper.NewConnection(resp)
- if err != nil {
- t.Fatalf("Unexpected error creating streaming connection: %s", err)
- }
- if conn == nil {
- t.Fatal("Unexpected nil connection")
- }
- <-conn.CloseChan()
- }
- func testExecAttach(t *testing.T, verb string) {
- tests := map[string]struct {
- stdin bool
- stdout bool
- stderr bool
- tty bool
- responseStatusCode int
- uid bool
- redirect bool
- }{
- "no input or output": {responseStatusCode: http.StatusBadRequest},
- "stdin": {stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
- "stdout": {stdout: true, responseStatusCode: http.StatusSwitchingProtocols},
- "stderr": {stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
- "stdout and stderr": {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
- "stdin stdout and stderr": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
- "stdin stdout stderr with uid": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols, uid: true},
- "stdout with redirect": {stdout: true, responseStatusCode: http.StatusFound, redirect: true},
- }
- for desc := range tests {
- test := tests[desc]
- t.Run(desc, func(t *testing.T) {
- ss, err := newTestStreamingServer(0)
- require.NoError(t, err)
- defer ss.testHTTPServer.Close()
- fw := newServerTestWithDebug(true, test.redirect, ss)
- defer fw.testHTTPServer.Close()
- fmt.Println(desc)
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- expectedCommand := "ls -a"
- expectedStdin := "stdin"
- expectedStdout := "stdout"
- expectedStderr := "stderr"
- done := make(chan struct{})
- clientStdoutReadDone := make(chan struct{})
- clientStderrReadDone := make(chan struct{})
- execInvoked := false
- attachInvoked := false
- checkStream := func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
- assert.Equal(t, expectedPodName, podFullName, "podFullName")
- if test.uid {
- assert.Equal(t, testUID, string(uid), "uid")
- }
- assert.Equal(t, expectedContainerName, containerName, "containerName")
- assert.Equal(t, test.stdin, streamOpts.Stdin, "stdin")
- assert.Equal(t, test.stdout, streamOpts.Stdout, "stdout")
- assert.Equal(t, test.tty, streamOpts.TTY, "tty")
- assert.Equal(t, !test.tty && test.stderr, streamOpts.Stderr, "stderr")
- }
- fw.fakeKubelet.getExecCheck = func(podFullName string, uid types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) {
- execInvoked = true
- assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
- checkStream(podFullName, uid, containerName, streamOpts)
- }
- fw.fakeKubelet.getAttachCheck = func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
- attachInvoked = true
- checkStream(podFullName, uid, containerName, streamOpts)
- }
- testStream := func(containerID string, in io.Reader, out, stderr io.WriteCloser, tty bool, done chan struct{}) error {
- close(done)
- assert.Equal(t, testContainerID, containerID, "containerID")
- assert.Equal(t, test.tty, tty, "tty")
- require.Equal(t, test.stdin, in != nil, "in")
- require.Equal(t, test.stdout, out != nil, "out")
- require.Equal(t, !test.tty && test.stderr, stderr != nil, "err")
- if test.stdin {
- b := make([]byte, 10)
- n, err := in.Read(b)
- assert.NoError(t, err, "reading from stdin")
- assert.Equal(t, expectedStdin, string(b[0:n]), "content from stdin")
- }
- if test.stdout {
- _, err := out.Write([]byte(expectedStdout))
- assert.NoError(t, err, "writing to stdout")
- out.Close()
- <-clientStdoutReadDone
- }
- if !test.tty && test.stderr {
- _, err := stderr.Write([]byte(expectedStderr))
- assert.NoError(t, err, "writing to stderr")
- stderr.Close()
- <-clientStderrReadDone
- }
- return nil
- }
- ss.fakeRuntime.execFunc = func(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
- assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
- return testStream(containerID, stdin, stdout, stderr, tty, done)
- }
- ss.fakeRuntime.attachFunc = func(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
- return testStream(containerID, stdin, stdout, stderr, tty, done)
- }
- var url string
- if test.uid {
- url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + testUID + "/" + expectedContainerName + "?ignore=1"
- } else {
- url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1"
- }
- if verb == "exec" {
- url += "&command=ls&command=-a"
- }
- if test.stdin {
- url += "&" + api.ExecStdinParam + "=1"
- }
- if test.stdout {
- url += "&" + api.ExecStdoutParam + "=1"
- }
- if test.stderr && !test.tty {
- url += "&" + api.ExecStderrParam + "=1"
- }
- if test.tty {
- url += "&" + api.ExecTTYParam + "=1"
- }
- var (
- resp *http.Response
- upgradeRoundTripper httpstream.UpgradeRoundTripper
- c *http.Client
- )
- if test.redirect {
- c = &http.Client{}
- // Don't follow redirects, since we want to inspect the redirect response.
- c.CheckRedirect = func(*http.Request, []*http.Request) error {
- return http.ErrUseLastResponse
- }
- } else {
- upgradeRoundTripper = spdy.NewRoundTripper(nil, true, true)
- c = &http.Client{Transport: upgradeRoundTripper}
- }
- resp, err = c.Do(makeReq(t, "POST", url, "v4.channel.k8s.io"))
- require.NoError(t, err, "POSTing")
- defer resp.Body.Close()
- _, err = ioutil.ReadAll(resp.Body)
- assert.NoError(t, err, "reading response body")
- require.Equal(t, test.responseStatusCode, resp.StatusCode, "response status")
- if test.responseStatusCode != http.StatusSwitchingProtocols {
- return
- }
- conn, err := upgradeRoundTripper.NewConnection(resp)
- require.NoError(t, err, "creating streaming connection")
- defer conn.Close()
- h := http.Header{}
- h.Set(api.StreamType, api.StreamTypeError)
- _, err = conn.CreateStream(h)
- require.NoError(t, err, "creating error stream")
- if test.stdin {
- h.Set(api.StreamType, api.StreamTypeStdin)
- stream, err := conn.CreateStream(h)
- require.NoError(t, err, "creating stdin stream")
- _, err = stream.Write([]byte(expectedStdin))
- require.NoError(t, err, "writing to stdin stream")
- }
- var stdoutStream httpstream.Stream
- if test.stdout {
- h.Set(api.StreamType, api.StreamTypeStdout)
- stdoutStream, err = conn.CreateStream(h)
- require.NoError(t, err, "creating stdout stream")
- }
- var stderrStream httpstream.Stream
- if test.stderr && !test.tty {
- h.Set(api.StreamType, api.StreamTypeStderr)
- stderrStream, err = conn.CreateStream(h)
- require.NoError(t, err, "creating stderr stream")
- }
- if test.stdout {
- output := make([]byte, 10)
- n, err := stdoutStream.Read(output)
- close(clientStdoutReadDone)
- assert.NoError(t, err, "reading from stdout stream")
- assert.Equal(t, expectedStdout, string(output[0:n]), "stdout")
- }
- if test.stderr && !test.tty {
- output := make([]byte, 10)
- n, err := stderrStream.Read(output)
- close(clientStderrReadDone)
- assert.NoError(t, err, "reading from stderr stream")
- assert.Equal(t, expectedStderr, string(output[0:n]), "stderr")
- }
- // wait for the server to finish before checking if the attach/exec funcs were invoked
- <-done
- if verb == "exec" {
- assert.True(t, execInvoked, "exec should be invoked")
- assert.False(t, attachInvoked, "attach should not be invoked")
- } else {
- assert.True(t, attachInvoked, "attach should be invoked")
- assert.False(t, execInvoked, "exec should not be invoked")
- }
- })
- }
- }
- func TestServeExecInContainer(t *testing.T) {
- testExecAttach(t, "exec")
- }
- func TestServeAttachContainer(t *testing.T) {
- testExecAttach(t, "attach")
- }
- func TestServePortForwardIdleTimeout(t *testing.T) {
- ss, err := newTestStreamingServer(100 * time.Millisecond)
- require.NoError(t, err)
- defer ss.testHTTPServer.Close()
- fw := newServerTestWithDebug(true, false, ss)
- defer fw.testHTTPServer.Close()
- podNamespace := "other"
- podName := "foo"
- url := fw.testHTTPServer.URL + "/portForward/" + podNamespace + "/" + podName
- upgradeRoundTripper := spdy.NewRoundTripper(nil, true, true)
- c := &http.Client{Transport: upgradeRoundTripper}
- req := makeReq(t, "POST", url, "portforward.k8s.io")
- resp, err := c.Do(req)
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- conn, err := upgradeRoundTripper.NewConnection(resp)
- if err != nil {
- t.Fatalf("Unexpected error creating streaming connection: %s", err)
- }
- if conn == nil {
- t.Fatal("Unexpected nil connection")
- }
- defer conn.Close()
- <-conn.CloseChan()
- }
- func TestServePortForward(t *testing.T) {
- tests := map[string]struct {
- port string
- uid bool
- clientData string
- containerData string
- redirect bool
- shouldError bool
- }{
- "no port": {port: "", shouldError: true},
- "none number port": {port: "abc", shouldError: true},
- "negative port": {port: "-1", shouldError: true},
- "too large port": {port: "65536", shouldError: true},
- "0 port": {port: "0", shouldError: true},
- "min port": {port: "1", shouldError: false},
- "normal port": {port: "8000", shouldError: false},
- "normal port with data forward": {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
- "max port": {port: "65535", shouldError: false},
- "normal port with uid": {port: "8000", uid: true, shouldError: false},
- "normal port with redirect": {port: "8000", redirect: true, shouldError: false},
- }
- podNamespace := "other"
- podName := "foo"
- for desc := range tests {
- test := tests[desc]
- t.Run(desc, func(t *testing.T) {
- ss, err := newTestStreamingServer(0)
- require.NoError(t, err)
- defer ss.testHTTPServer.Close()
- fw := newServerTestWithDebug(true, test.redirect, ss)
- defer fw.testHTTPServer.Close()
- portForwardFuncDone := make(chan struct{})
- fw.fakeKubelet.getPortForwardCheck = func(name, namespace string, uid types.UID, opts portforward.V4Options) {
- assert.Equal(t, podName, name, "pod name")
- assert.Equal(t, podNamespace, namespace, "pod namespace")
- if test.uid {
- assert.Equal(t, testUID, string(uid), "uid")
- }
- }
- ss.fakeRuntime.portForwardFunc = func(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
- defer close(portForwardFuncDone)
- assert.Equal(t, testPodSandboxID, podSandboxID, "pod sandbox id")
- // The port should be valid if it reaches here.
- testPort, err := strconv.ParseInt(test.port, 10, 32)
- require.NoError(t, err, "parse port")
- assert.Equal(t, int32(testPort), port, "port")
- if test.clientData != "" {
- fromClient := make([]byte, 32)
- n, err := stream.Read(fromClient)
- assert.NoError(t, err, "reading client data")
- assert.Equal(t, test.clientData, string(fromClient[0:n]), "client data")
- }
- if test.containerData != "" {
- _, err := stream.Write([]byte(test.containerData))
- assert.NoError(t, err, "writing container data")
- }
- return nil
- }
- var url string
- if test.uid {
- url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, testUID)
- } else {
- url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
- }
- var (
- upgradeRoundTripper httpstream.UpgradeRoundTripper
- c *http.Client
- )
- if test.redirect {
- c = &http.Client{}
- // Don't follow redirects, since we want to inspect the redirect response.
- c.CheckRedirect = func(*http.Request, []*http.Request) error {
- return http.ErrUseLastResponse
- }
- } else {
- upgradeRoundTripper = spdy.NewRoundTripper(nil, true, true)
- c = &http.Client{Transport: upgradeRoundTripper}
- }
- req := makeReq(t, "POST", url, "portforward.k8s.io")
- resp, err := c.Do(req)
- require.NoError(t, err, "POSTing")
- defer resp.Body.Close()
- if test.redirect {
- assert.Equal(t, http.StatusFound, resp.StatusCode, "status code")
- return
- }
- assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
- conn, err := upgradeRoundTripper.NewConnection(resp)
- require.NoError(t, err, "creating streaming connection")
- defer conn.Close()
- headers := http.Header{}
- headers.Set("streamType", "error")
- headers.Set("port", test.port)
- _, err = conn.CreateStream(headers)
- assert.Equal(t, test.shouldError, err != nil, "expect error")
- if test.shouldError {
- return
- }
- headers.Set("streamType", "data")
- headers.Set("port", test.port)
- dataStream, err := conn.CreateStream(headers)
- require.NoError(t, err, "create stream")
- if test.clientData != "" {
- _, err := dataStream.Write([]byte(test.clientData))
- assert.NoError(t, err, "writing client data")
- }
- if test.containerData != "" {
- fromContainer := make([]byte, 32)
- n, err := dataStream.Read(fromContainer)
- assert.NoError(t, err, "reading container data")
- assert.Equal(t, test.containerData, string(fromContainer[0:n]), "container data")
- }
- <-portForwardFuncDone
- })
- }
- }
- func TestCRIHandler(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- const (
- path = "/cri/exec/123456abcdef"
- query = "cmd=echo+foo"
- )
- resp, err := http.Get(fw.testHTTPServer.URL + path + "?" + query)
- require.NoError(t, err)
- assert.Equal(t, http.StatusOK, resp.StatusCode)
- assert.Equal(t, "GET", fw.criHandler.RequestReceived.Method)
- assert.Equal(t, path, fw.criHandler.RequestReceived.URL.Path)
- assert.Equal(t, query, fw.criHandler.RequestReceived.URL.RawQuery)
- }
- func TestMetricBuckets(t *testing.T) {
- tests := map[string]struct {
- url string
- bucket string
- }{
- "healthz endpoint": {url: "/healthz", bucket: "healthz"},
- "attach": {url: "/attach/podNamespace/podID/containerName", bucket: "attach"},
- "attach with uid": {url: "/attach/podNamespace/podID/uid/containerName", bucket: "attach"},
- "configz": {url: "/configz", bucket: "configz"},
- "containerLogs": {url: "/containerLogs/podNamespace/podID/containerName", bucket: "containerLogs"},
- "cri": {url: "/cri/", bucket: "cri"},
- "cri with sub": {url: "/cri/foo", bucket: "cri"},
- "debug v flags": {url: "/debug/flags/v", bucket: "debug"},
- "pprof with sub": {url: "/debug/pprof/subpath", bucket: "debug"},
- "exec": {url: "/exec/podNamespace/podID/containerName", bucket: "exec"},
- "exec with uid": {url: "/exec/podNamespace/podID/uid/containerName", bucket: "exec"},
- "healthz": {url: "/healthz/", bucket: "healthz"},
- "healthz log sub": {url: "/healthz/log", bucket: "healthz"},
- "healthz ping": {url: "/healthz/ping", bucket: "healthz"},
- "healthz sync loop": {url: "/healthz/syncloop", bucket: "healthz"},
- "logs": {url: "/logs/", bucket: "logs"},
- "logs with path": {url: "/logs/logpath", bucket: "logs"},
- "metrics": {url: "/metrics", bucket: "metrics"},
- "metrics cadvisor sub": {url: "/metrics/cadvisor", bucket: "metrics/cadvisor"},
- "metrics probes sub": {url: "/metrics/probes", bucket: "metrics/probes"},
- "metrics resource v1alpha1": {url: "/metrics/resource/v1alpha1", bucket: "metrics/resource"},
- "metrics resource sub": {url: "/metrics/resource", bucket: "metrics/resource"},
- "pods": {url: "/pods/", bucket: "pods"},
- "portForward": {url: "/portForward/podNamespace/podID", bucket: "portForward"},
- "portForward with uid": {url: "/portForward/podNamespace/podID/uid", bucket: "portForward"},
- "run": {url: "/run/podNamespace/podID/containerName", bucket: "run"},
- "run with uid": {url: "/run/podNamespace/podID/uid/containerName", bucket: "run"},
- "runningpods": {url: "/runningpods/", bucket: "runningpods"},
- "spec": {url: "/spec/", bucket: "spec"},
- "stats": {url: "/stats/", bucket: "stats"},
- "stats container sub": {url: "/stats/container", bucket: "stats"},
- "stats summary sub": {url: "/stats/summary", bucket: "stats"},
- "stats containerName with uid": {url: "/stats/namespace/podName/uid/containerName", bucket: "stats"},
- "stats containerName": {url: "/stats/podName/containerName", bucket: "stats"},
- "invalid path": {url: "/junk", bucket: "Invalid path"},
- "invalid path starting with good": {url: "/healthzjunk", bucket: "Invalid path"},
- }
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- for _, test := range tests {
- path := test.url
- bucket := test.bucket
- require.Equal(t, fw.serverUnderTest.getMetricBucket(path), bucket)
- }
- }
- func TestDebuggingDisabledHandlers(t *testing.T) {
- fw := newServerTestWithDebug(false, false, nil)
- defer fw.testHTTPServer.Close()
- paths := []string{
- "/run", "/exec", "/attach", "/portForward", "/containerLogs", "/runningpods",
- "/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/", "/runningpods/",
- "/run/xxx", "/exec/xxx", "/attach/xxx", "/debug/pprof/profile", "/logs/kubelet.log",
- }
- for _, p := range paths {
- resp, err := http.Get(fw.testHTTPServer.URL + p)
- require.NoError(t, err)
- assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
- body, err := ioutil.ReadAll(resp.Body)
- require.NoError(t, err)
- assert.Equal(t, "Debug endpoints are disabled.\n", string(body))
- resp, err = http.Post(fw.testHTTPServer.URL+p, "", nil)
- require.NoError(t, err)
- assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
- body, err = ioutil.ReadAll(resp.Body)
- require.NoError(t, err)
- assert.Equal(t, "Debug endpoints are disabled.\n", string(body))
- }
- // test some other paths, make sure they're working
- containerInfo := &cadvisorapi.ContainerInfo{
- ContainerReference: cadvisorapi.ContainerReference{
- Name: "/",
- },
- }
- fw.fakeKubelet.rawInfoFunc = func(req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
- return map[string]*cadvisorapi.ContainerInfo{
- containerInfo.Name: containerInfo,
- }, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + "/stats")
- require.NoError(t, err)
- assert.Equal(t, http.StatusOK, resp.StatusCode)
- machineInfo := &cadvisorapi.MachineInfo{
- NumCores: 4,
- MemoryCapacity: 1024,
- }
- fw.fakeKubelet.machineInfoFunc = func() (*cadvisorapi.MachineInfo, error) {
- return machineInfo, nil
- }
- resp, err = http.Get(fw.testHTTPServer.URL + "/spec")
- require.NoError(t, err)
- assert.Equal(t, http.StatusOK, resp.StatusCode)
- }
- func TestTrimURLPath(t *testing.T) {
- tests := []struct {
- path, expected string
- }{
- {"", ""},
- {"//", ""},
- {"/pods", "pods"},
- {"pods", "pods"},
- {"pods/", "pods"},
- {"good/", "good"},
- {"pods/probes", "pods"},
- {"metrics", "metrics"},
- {"metrics/resource", "metrics/resource"},
- {"metrics/hello", "metrics/hello"},
- }
- for _, test := range tests {
- assert.Equal(t, test.expected, getURLRootPath(test.path), fmt.Sprintf("path is: %s", test.path))
- }
- }
|