ssh.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ssh
  14. import (
  15. "bytes"
  16. "context"
  17. "fmt"
  18. "net"
  19. "os"
  20. "path/filepath"
  21. "time"
  22. "github.com/onsi/gomega"
  23. "golang.org/x/crypto/ssh"
  24. v1 "k8s.io/api/core/v1"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/fields"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. clientset "k8s.io/client-go/kubernetes"
  29. sshutil "k8s.io/kubernetes/pkg/ssh"
  30. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  31. testutils "k8s.io/kubernetes/test/utils"
  32. )
  const (
	// sshPort is the TCP port SSH connections to nodes are made on.
	sshPort = "22"

	// pollNodeInterval is the delay between attempts to list nodes.
	pollNodeInterval = 2 * time.Second

	// singleCallTimeout bounds how long a single API call (like 'get' or
	// 'list') is retried, so that transient failures don't fail tests.
	// TODO: client should not apply this timeout to Watch calls. Increased from 30s until that is fixed.
	singleCallTimeout = 5 * time.Minute
)
  43. // GetSigner returns an ssh.Signer for the provider ("gce", etc.) that can be
  44. // used to SSH to their nodes.
  45. func GetSigner(provider string) (ssh.Signer, error) {
  46. // honor a consistent SSH key across all providers
  47. if path := os.Getenv("KUBE_SSH_KEY_PATH"); len(path) > 0 {
  48. return sshutil.MakePrivateKeySignerFromFile(path)
  49. }
  50. // Select the key itself to use. When implementing more providers here,
  51. // please also add them to any SSH tests that are disabled because of signer
  52. // support.
  53. keyfile := ""
  54. switch provider {
  55. case "gce", "gke", "kubemark":
  56. keyfile = os.Getenv("GCE_SSH_KEY")
  57. if keyfile == "" {
  58. keyfile = "google_compute_engine"
  59. }
  60. case "aws", "eks":
  61. keyfile = os.Getenv("AWS_SSH_KEY")
  62. if keyfile == "" {
  63. keyfile = "kube_aws_rsa"
  64. }
  65. case "local", "vsphere":
  66. keyfile = os.Getenv("LOCAL_SSH_KEY")
  67. if keyfile == "" {
  68. keyfile = "id_rsa"
  69. }
  70. case "skeleton":
  71. keyfile = os.Getenv("KUBE_SSH_KEY")
  72. if keyfile == "" {
  73. keyfile = "id_rsa"
  74. }
  75. default:
  76. return nil, fmt.Errorf("GetSigner(...) not implemented for %s", provider)
  77. }
  78. // Respect absolute paths for keys given by user, fallback to assuming
  79. // relative paths are in ~/.ssh
  80. if !filepath.IsAbs(keyfile) {
  81. keydir := filepath.Join(os.Getenv("HOME"), ".ssh")
  82. keyfile = filepath.Join(keydir, keyfile)
  83. }
  84. return sshutil.MakePrivateKeySignerFromFile(keyfile)
  85. }
  86. // NodeSSHHosts returns SSH-able host names for all schedulable nodes - this
  87. // excludes master node. If it can't find any external IPs, it falls back to
  88. // looking for internal IPs. If it can't find an internal IP for every node it
  89. // returns an error, though it still returns all hosts that it found in that
  90. // case.
  91. func NodeSSHHosts(c clientset.Interface) ([]string, error) {
  92. nodelist := waitListSchedulableNodesOrDie(c)
  93. hosts := nodeAddresses(nodelist, v1.NodeExternalIP)
  94. // If ExternalIPs aren't available for all nodes, try falling back to the InternalIPs.
  95. if len(hosts) < len(nodelist.Items) {
  96. e2elog.Logf("No external IP address on nodes, falling back to internal IPs")
  97. hosts = nodeAddresses(nodelist, v1.NodeInternalIP)
  98. }
  99. // Error if neither External nor Internal IPs weren't available for all nodes.
  100. if len(hosts) != len(nodelist.Items) {
  101. return hosts, fmt.Errorf(
  102. "only found %d IPs on nodes, but found %d nodes. Nodelist: %v",
  103. len(hosts), len(nodelist.Items), nodelist)
  104. }
  105. sshHosts := make([]string, 0, len(hosts))
  106. for _, h := range hosts {
  107. sshHosts = append(sshHosts, net.JoinHostPort(h, sshPort))
  108. }
  109. return sshHosts, nil
  110. }
  // Result holds the execution result of an SSH command: who ran it, where,
// what was run, and what came back.
type Result struct {
	// User is the remote login name; Host is the address that was dialed
	// (as passed to SSH); Cmd is the command line that was run.
	User, Host, Cmd string
	// Stdout and Stderr hold the captured output of the command.
	Stdout, Stderr string
	// Code is the exit code reported for the command.
	Code int
}
  120. // NodeExec execs the given cmd on node via SSH. Note that the nodeName is an sshable name,
  121. // eg: the name returned by framework.GetMasterHost(). This is also not guaranteed to work across
  122. // cloud providers since it involves ssh.
  123. func NodeExec(nodeName, cmd, provider string) (Result, error) {
  124. return SSH(cmd, net.JoinHostPort(nodeName, sshPort), provider)
  125. }
  126. // SSH synchronously SSHs to a node running on provider and runs cmd. If there
  127. // is no error performing the SSH, the stdout, stderr, and exit code are
  128. // returned.
  129. func SSH(cmd, host, provider string) (Result, error) {
  130. result := Result{Host: host, Cmd: cmd}
  131. // Get a signer for the provider.
  132. signer, err := GetSigner(provider)
  133. if err != nil {
  134. return result, fmt.Errorf("error getting signer for provider %s: '%v'", provider, err)
  135. }
  136. // RunSSHCommand will default to Getenv("USER") if user == "", but we're
  137. // defaulting here as well for logging clarity.
  138. result.User = os.Getenv("KUBE_SSH_USER")
  139. if result.User == "" {
  140. result.User = os.Getenv("USER")
  141. }
  142. if bastion := os.Getenv("KUBE_SSH_BASTION"); len(bastion) > 0 {
  143. stdout, stderr, code, err := runSSHCommandViaBastion(cmd, result.User, bastion, host, signer)
  144. result.Stdout = stdout
  145. result.Stderr = stderr
  146. result.Code = code
  147. return result, err
  148. }
  149. stdout, stderr, code, err := sshutil.RunSSHCommand(cmd, result.User, host, signer)
  150. result.Stdout = stdout
  151. result.Stderr = stderr
  152. result.Code = code
  153. return result, err
  154. }
  155. // runSSHCommandViaBastion returns the stdout, stderr, and exit code from running cmd on
  156. // host as specific user, along with any SSH-level error. It uses an SSH proxy to connect
  157. // to bastion, then via that tunnel connects to the remote host. Similar to
  158. // sshutil.RunSSHCommand but scoped to the needs of the test infrastructure.
  159. func runSSHCommandViaBastion(cmd, user, bastion, host string, signer ssh.Signer) (string, string, int, error) {
  160. // Setup the config, dial the server, and open a session.
  161. config := &ssh.ClientConfig{
  162. User: user,
  163. Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
  164. HostKeyCallback: ssh.InsecureIgnoreHostKey(),
  165. Timeout: 150 * time.Second,
  166. }
  167. bastionClient, err := ssh.Dial("tcp", bastion, config)
  168. if err != nil {
  169. err = wait.Poll(5*time.Second, 20*time.Second, func() (bool, error) {
  170. fmt.Printf("error dialing %s@%s: '%v', retrying\n", user, bastion, err)
  171. if bastionClient, err = ssh.Dial("tcp", bastion, config); err != nil {
  172. return false, err
  173. }
  174. return true, nil
  175. })
  176. }
  177. if err != nil {
  178. return "", "", 0, fmt.Errorf("error getting SSH client to %s@%s: %v", user, bastion, err)
  179. }
  180. defer bastionClient.Close()
  181. conn, err := bastionClient.Dial("tcp", host)
  182. if err != nil {
  183. return "", "", 0, fmt.Errorf("error dialing %s from bastion: %v", host, err)
  184. }
  185. defer conn.Close()
  186. ncc, chans, reqs, err := ssh.NewClientConn(conn, host, config)
  187. if err != nil {
  188. return "", "", 0, fmt.Errorf("error creating forwarding connection %s from bastion: %v", host, err)
  189. }
  190. client := ssh.NewClient(ncc, chans, reqs)
  191. defer client.Close()
  192. session, err := client.NewSession()
  193. if err != nil {
  194. return "", "", 0, fmt.Errorf("error creating session to %s@%s from bastion: '%v'", user, host, err)
  195. }
  196. defer session.Close()
  197. // Run the command.
  198. code := 0
  199. var bout, berr bytes.Buffer
  200. session.Stdout, session.Stderr = &bout, &berr
  201. if err = session.Run(cmd); err != nil {
  202. // Check whether the command failed to run or didn't complete.
  203. if exiterr, ok := err.(*ssh.ExitError); ok {
  204. // If we got an ExitError and the exit code is nonzero, we'll
  205. // consider the SSH itself successful (just that the command run
  206. // errored on the host).
  207. if code = exiterr.ExitStatus(); code != 0 {
  208. err = nil
  209. }
  210. } else {
  211. // Some other kind of error happened (e.g. an IOError); consider the
  212. // SSH unsuccessful.
  213. err = fmt.Errorf("failed running `%s` on %s@%s: '%v'", cmd, user, host, err)
  214. }
  215. }
  216. return bout.String(), berr.String(), code, err
  217. }
  218. // LogResult records result log
  219. func LogResult(result Result) {
  220. remote := fmt.Sprintf("%s@%s", result.User, result.Host)
  221. e2elog.Logf("ssh %s: command: %s", remote, result.Cmd)
  222. e2elog.Logf("ssh %s: stdout: %q", remote, result.Stdout)
  223. e2elog.Logf("ssh %s: stderr: %q", remote, result.Stderr)
  224. e2elog.Logf("ssh %s: exit code: %d", remote, result.Code)
  225. }
  226. // IssueSSHCommandWithResult tries to execute a SSH command and returns the execution result
  227. func IssueSSHCommandWithResult(cmd, provider string, node *v1.Node) (*Result, error) {
  228. e2elog.Logf("Getting external IP address for %s", node.Name)
  229. host := ""
  230. for _, a := range node.Status.Addresses {
  231. if a.Type == v1.NodeExternalIP && a.Address != "" {
  232. host = net.JoinHostPort(a.Address, sshPort)
  233. break
  234. }
  235. }
  236. if host == "" {
  237. // No external IPs were found, let's try to use internal as plan B
  238. for _, a := range node.Status.Addresses {
  239. if a.Type == v1.NodeInternalIP && a.Address != "" {
  240. host = net.JoinHostPort(a.Address, sshPort)
  241. break
  242. }
  243. }
  244. }
  245. if host == "" {
  246. return nil, fmt.Errorf("couldn't find any IP address for node %s", node.Name)
  247. }
  248. e2elog.Logf("SSH %q on %s(%s)", cmd, node.Name, host)
  249. result, err := SSH(cmd, host, provider)
  250. LogResult(result)
  251. if result.Code != 0 || err != nil {
  252. return nil, fmt.Errorf("failed running %q: %v (exit code %d, stderr %v)",
  253. cmd, err, result.Code, result.Stderr)
  254. }
  255. return &result, nil
  256. }
  257. // IssueSSHCommand tries to execute a SSH command
  258. func IssueSSHCommand(cmd, provider string, node *v1.Node) error {
  259. _, err := IssueSSHCommandWithResult(cmd, provider, node)
  260. if err != nil {
  261. return err
  262. }
  263. return nil
  264. }
  265. // nodeAddresses returns the first address of the given type of each node.
  266. func nodeAddresses(nodelist *v1.NodeList, addrType v1.NodeAddressType) []string {
  267. hosts := []string{}
  268. for _, n := range nodelist.Items {
  269. for _, addr := range n.Status.Addresses {
  270. if addr.Type == addrType && addr.Address != "" {
  271. hosts = append(hosts, addr.Address)
  272. break
  273. }
  274. }
  275. }
  276. return hosts
  277. }
  278. // waitListSchedulableNodes is a wrapper around listing nodes supporting retries.
  279. func waitListSchedulableNodes(c clientset.Interface) (*v1.NodeList, error) {
  280. var nodes *v1.NodeList
  281. var err error
  282. if wait.PollImmediate(pollNodeInterval, singleCallTimeout, func() (bool, error) {
  283. nodes, err = c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{FieldSelector: fields.Set{
  284. "spec.unschedulable": "false",
  285. }.AsSelector().String()})
  286. if err != nil {
  287. if testutils.IsRetryableAPIError(err) {
  288. return false, nil
  289. }
  290. return false, err
  291. }
  292. return true, nil
  293. }) != nil {
  294. return nodes, err
  295. }
  296. return nodes, nil
  297. }
  298. // waitListSchedulableNodesOrDie is a wrapper around listing nodes supporting retries.
  299. func waitListSchedulableNodesOrDie(c clientset.Interface) *v1.NodeList {
  300. nodes, err := waitListSchedulableNodes(c)
  301. if err != nil {
  302. expectNoError(err, "Non-retryable failure or timed out while listing nodes for e2e cluster.")
  303. }
  304. return nodes
  305. }
  306. // expectNoError checks if "err" is set, and if so, fails assertion while logging the error.
  307. func expectNoError(err error, explain ...interface{}) {
  308. expectNoErrorWithOffset(1, err, explain...)
  309. }
  310. // expectNoErrorWithOffset checks if "err" is set, and if so, fails assertion while logging the error at "offset" levels above its caller
  311. // (for example, for call chain f -> g -> ExpectNoErrorWithOffset(1, ...) error would be logged for "f").
  312. func expectNoErrorWithOffset(offset int, err error, explain ...interface{}) {
  313. if err != nil {
  314. e2elog.Logf("Unexpected error occurred: %v", err)
  315. }
  316. gomega.ExpectWithOffset(1+offset, err).NotTo(gomega.HaveOccurred(), explain...)
  317. }