ssh.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ssh
  14. import (
  15. "bytes"
  16. "fmt"
  17. "net"
  18. "os"
  19. "path/filepath"
  20. "time"
  21. "github.com/onsi/gomega"
  22. "golang.org/x/crypto/ssh"
  23. v1 "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/fields"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. clientset "k8s.io/client-go/kubernetes"
  28. sshutil "k8s.io/kubernetes/pkg/ssh"
  29. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  30. testutils "k8s.io/kubernetes/test/utils"
  31. )
const (
	// sshPort is the port, kept as a string, that is joined onto node
	// addresses to form SSH endpoints.
	sshPort = "22"

	// pollNodeInterval is how often to retry listing nodes.
	pollNodeInterval = 2 * time.Second

	// singleCallTimeout is how long to try single API calls (like 'get' or 'list'). Used to prevent
	// transient failures from failing tests.
	// TODO: client should not apply this timeout to Watch calls. Increased from 30s until that is fixed.
	singleCallTimeout = 5 * time.Minute
)
  42. // GetSigner returns an ssh.Signer for the provider ("gce", etc.) that can be
  43. // used to SSH to their nodes.
  44. func GetSigner(provider string) (ssh.Signer, error) {
  45. // honor a consistent SSH key across all providers
  46. if path := os.Getenv("KUBE_SSH_KEY_PATH"); len(path) > 0 {
  47. return sshutil.MakePrivateKeySignerFromFile(path)
  48. }
  49. // Select the key itself to use. When implementing more providers here,
  50. // please also add them to any SSH tests that are disabled because of signer
  51. // support.
  52. keyfile := ""
  53. switch provider {
  54. case "gce", "gke", "kubemark":
  55. keyfile = os.Getenv("GCE_SSH_KEY")
  56. if keyfile == "" {
  57. keyfile = "google_compute_engine"
  58. }
  59. case "aws", "eks":
  60. keyfile = os.Getenv("AWS_SSH_KEY")
  61. if keyfile == "" {
  62. keyfile = "kube_aws_rsa"
  63. }
  64. case "local", "vsphere":
  65. keyfile = os.Getenv("LOCAL_SSH_KEY")
  66. if keyfile == "" {
  67. keyfile = "id_rsa"
  68. }
  69. case "skeleton":
  70. keyfile = os.Getenv("KUBE_SSH_KEY")
  71. if keyfile == "" {
  72. keyfile = "id_rsa"
  73. }
  74. default:
  75. return nil, fmt.Errorf("GetSigner(...) not implemented for %s", provider)
  76. }
  77. // Respect absolute paths for keys given by user, fallback to assuming
  78. // relative paths are in ~/.ssh
  79. if !filepath.IsAbs(keyfile) {
  80. keydir := filepath.Join(os.Getenv("HOME"), ".ssh")
  81. keyfile = filepath.Join(keydir, keyfile)
  82. }
  83. return sshutil.MakePrivateKeySignerFromFile(keyfile)
  84. }
  85. // NodeSSHHosts returns SSH-able host names for all schedulable nodes - this
  86. // excludes master node. If it can't find any external IPs, it falls back to
  87. // looking for internal IPs. If it can't find an internal IP for every node it
  88. // returns an error, though it still returns all hosts that it found in that
  89. // case.
  90. func NodeSSHHosts(c clientset.Interface) ([]string, error) {
  91. nodelist := waitListSchedulableNodesOrDie(c)
  92. hosts := nodeAddresses(nodelist, v1.NodeExternalIP)
  93. // If ExternalIPs aren't set, assume the test programs can reach the
  94. // InternalIP. Simplified exception logic here assumes that the hosts will
  95. // either all have ExternalIP or none will. Simplifies handling here and
  96. // should be adequate since the setting of the external IPs is provider
  97. // specific: they should either all have them or none of them will.
  98. if len(hosts) == 0 {
  99. e2elog.Logf("No external IP address on nodes, falling back to internal IPs")
  100. hosts = nodeAddresses(nodelist, v1.NodeInternalIP)
  101. }
  102. // Error if any node didn't have an external/internal IP.
  103. if len(hosts) != len(nodelist.Items) {
  104. return hosts, fmt.Errorf(
  105. "only found %d IPs on nodes, but found %d nodes. Nodelist: %v",
  106. len(hosts), len(nodelist.Items), nodelist)
  107. }
  108. sshHosts := make([]string, 0, len(hosts))
  109. for _, h := range hosts {
  110. sshHosts = append(sshHosts, net.JoinHostPort(h, sshPort))
  111. }
  112. return sshHosts, nil
  113. }
  114. // Result holds the execution result of SSH command
  115. type Result struct {
  116. User string
  117. Host string
  118. Cmd string
  119. Stdout string
  120. Stderr string
  121. Code int
  122. }
  123. // NodeExec execs the given cmd on node via SSH. Note that the nodeName is an sshable name,
  124. // eg: the name returned by framework.GetMasterHost(). This is also not guaranteed to work across
  125. // cloud providers since it involves ssh.
  126. func NodeExec(nodeName, cmd, provider string) (Result, error) {
  127. return SSH(cmd, net.JoinHostPort(nodeName, sshPort), provider)
  128. }
  129. // SSH synchronously SSHs to a node running on provider and runs cmd. If there
  130. // is no error performing the SSH, the stdout, stderr, and exit code are
  131. // returned.
  132. func SSH(cmd, host, provider string) (Result, error) {
  133. result := Result{Host: host, Cmd: cmd}
  134. // Get a signer for the provider.
  135. signer, err := GetSigner(provider)
  136. if err != nil {
  137. return result, fmt.Errorf("error getting signer for provider %s: '%v'", provider, err)
  138. }
  139. // RunSSHCommand will default to Getenv("USER") if user == "", but we're
  140. // defaulting here as well for logging clarity.
  141. result.User = os.Getenv("KUBE_SSH_USER")
  142. if result.User == "" {
  143. result.User = os.Getenv("USER")
  144. }
  145. if bastion := os.Getenv("KUBE_SSH_BASTION"); len(bastion) > 0 {
  146. stdout, stderr, code, err := RunSSHCommandViaBastion(cmd, result.User, bastion, host, signer)
  147. result.Stdout = stdout
  148. result.Stderr = stderr
  149. result.Code = code
  150. return result, err
  151. }
  152. stdout, stderr, code, err := sshutil.RunSSHCommand(cmd, result.User, host, signer)
  153. result.Stdout = stdout
  154. result.Stderr = stderr
  155. result.Code = code
  156. return result, err
  157. }
  158. // RunSSHCommandViaBastion returns the stdout, stderr, and exit code from running cmd on
  159. // host as specific user, along with any SSH-level error. It uses an SSH proxy to connect
  160. // to bastion, then via that tunnel connects to the remote host. Similar to
  161. // sshutil.RunSSHCommand but scoped to the needs of the test infrastructure.
  162. func RunSSHCommandViaBastion(cmd, user, bastion, host string, signer ssh.Signer) (string, string, int, error) {
  163. // Setup the config, dial the server, and open a session.
  164. config := &ssh.ClientConfig{
  165. User: user,
  166. Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
  167. HostKeyCallback: ssh.InsecureIgnoreHostKey(),
  168. Timeout: 150 * time.Second,
  169. }
  170. bastionClient, err := ssh.Dial("tcp", bastion, config)
  171. if err != nil {
  172. err = wait.Poll(5*time.Second, 20*time.Second, func() (bool, error) {
  173. fmt.Printf("error dialing %s@%s: '%v', retrying\n", user, bastion, err)
  174. if bastionClient, err = ssh.Dial("tcp", bastion, config); err != nil {
  175. return false, err
  176. }
  177. return true, nil
  178. })
  179. }
  180. if err != nil {
  181. return "", "", 0, fmt.Errorf("error getting SSH client to %s@%s: %v", user, bastion, err)
  182. }
  183. defer bastionClient.Close()
  184. conn, err := bastionClient.Dial("tcp", host)
  185. if err != nil {
  186. return "", "", 0, fmt.Errorf("error dialing %s from bastion: %v", host, err)
  187. }
  188. defer conn.Close()
  189. ncc, chans, reqs, err := ssh.NewClientConn(conn, host, config)
  190. if err != nil {
  191. return "", "", 0, fmt.Errorf("error creating forwarding connection %s from bastion: %v", host, err)
  192. }
  193. client := ssh.NewClient(ncc, chans, reqs)
  194. defer client.Close()
  195. session, err := client.NewSession()
  196. if err != nil {
  197. return "", "", 0, fmt.Errorf("error creating session to %s@%s from bastion: '%v'", user, host, err)
  198. }
  199. defer session.Close()
  200. // Run the command.
  201. code := 0
  202. var bout, berr bytes.Buffer
  203. session.Stdout, session.Stderr = &bout, &berr
  204. if err = session.Run(cmd); err != nil {
  205. // Check whether the command failed to run or didn't complete.
  206. if exiterr, ok := err.(*ssh.ExitError); ok {
  207. // If we got an ExitError and the exit code is nonzero, we'll
  208. // consider the SSH itself successful (just that the command run
  209. // errored on the host).
  210. if code = exiterr.ExitStatus(); code != 0 {
  211. err = nil
  212. }
  213. } else {
  214. // Some other kind of error happened (e.g. an IOError); consider the
  215. // SSH unsuccessful.
  216. err = fmt.Errorf("failed running `%s` on %s@%s: '%v'", cmd, user, host, err)
  217. }
  218. }
  219. return bout.String(), berr.String(), code, err
  220. }
  221. // LogResult records result log
  222. func LogResult(result Result) {
  223. remote := fmt.Sprintf("%s@%s", result.User, result.Host)
  224. e2elog.Logf("ssh %s: command: %s", remote, result.Cmd)
  225. e2elog.Logf("ssh %s: stdout: %q", remote, result.Stdout)
  226. e2elog.Logf("ssh %s: stderr: %q", remote, result.Stderr)
  227. e2elog.Logf("ssh %s: exit code: %d", remote, result.Code)
  228. }
  229. // IssueSSHCommandWithResult tries to execute a SSH command and returns the execution result
  230. func IssueSSHCommandWithResult(cmd, provider string, node *v1.Node) (*Result, error) {
  231. e2elog.Logf("Getting external IP address for %s", node.Name)
  232. host := ""
  233. for _, a := range node.Status.Addresses {
  234. if a.Type == v1.NodeExternalIP && a.Address != "" {
  235. host = net.JoinHostPort(a.Address, sshPort)
  236. break
  237. }
  238. }
  239. if host == "" {
  240. // No external IPs were found, let's try to use internal as plan B
  241. for _, a := range node.Status.Addresses {
  242. if a.Type == v1.NodeInternalIP && a.Address != "" {
  243. host = net.JoinHostPort(a.Address, sshPort)
  244. break
  245. }
  246. }
  247. }
  248. if host == "" {
  249. return nil, fmt.Errorf("couldn't find any IP address for node %s", node.Name)
  250. }
  251. e2elog.Logf("SSH %q on %s(%s)", cmd, node.Name, host)
  252. result, err := SSH(cmd, host, provider)
  253. LogResult(result)
  254. if result.Code != 0 || err != nil {
  255. return nil, fmt.Errorf("failed running %q: %v (exit code %d, stderr %v)",
  256. cmd, err, result.Code, result.Stderr)
  257. }
  258. return &result, nil
  259. }
  260. // IssueSSHCommand tries to execute a SSH command
  261. func IssueSSHCommand(cmd, provider string, node *v1.Node) error {
  262. _, err := IssueSSHCommandWithResult(cmd, provider, node)
  263. if err != nil {
  264. return err
  265. }
  266. return nil
  267. }
  268. // nodeAddresses returns the first address of the given type of each node.
  269. func nodeAddresses(nodelist *v1.NodeList, addrType v1.NodeAddressType) []string {
  270. hosts := []string{}
  271. for _, n := range nodelist.Items {
  272. for _, addr := range n.Status.Addresses {
  273. if addr.Type == addrType && addr.Address != "" {
  274. hosts = append(hosts, addr.Address)
  275. break
  276. }
  277. }
  278. }
  279. return hosts
  280. }
  281. // waitListSchedulableNodes is a wrapper around listing nodes supporting retries.
  282. func waitListSchedulableNodes(c clientset.Interface) (*v1.NodeList, error) {
  283. var nodes *v1.NodeList
  284. var err error
  285. if wait.PollImmediate(pollNodeInterval, singleCallTimeout, func() (bool, error) {
  286. nodes, err = c.CoreV1().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
  287. "spec.unschedulable": "false",
  288. }.AsSelector().String()})
  289. if err != nil {
  290. if testutils.IsRetryableAPIError(err) {
  291. return false, nil
  292. }
  293. return false, err
  294. }
  295. return true, nil
  296. }) != nil {
  297. return nodes, err
  298. }
  299. return nodes, nil
  300. }
  301. // waitListSchedulableNodesOrDie is a wrapper around listing nodes supporting retries.
  302. func waitListSchedulableNodesOrDie(c clientset.Interface) *v1.NodeList {
  303. nodes, err := waitListSchedulableNodes(c)
  304. if err != nil {
  305. expectNoError(err, "Non-retryable failure or timed out while listing nodes for e2e cluster.")
  306. }
  307. return nodes
  308. }
  309. // expectNoError checks if "err" is set, and if so, fails assertion while logging the error.
  310. func expectNoError(err error, explain ...interface{}) {
  311. expectNoErrorWithOffset(1, err, explain...)
  312. }
  313. // expectNoErrorWithOffset checks if "err" is set, and if so, fails assertion while logging the error at "offset" levels above its caller
  314. // (for example, for call chain f -> g -> ExpectNoErrorWithOffset(1, ...) error would be logged for "f").
  315. func expectNoErrorWithOffset(offset int, err error, explain ...interface{}) {
  316. if err != nil {
  317. e2elog.Logf("Unexpected error occurred: %v", err)
  318. }
  319. gomega.ExpectWithOffset(1+offset, err).NotTo(gomega.HaveOccurred(), explain...)
  320. }