ssh.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ssh
  14. import (
  15. "bytes"
  16. "context"
  17. "fmt"
  18. "net"
  19. "os"
  20. "path/filepath"
  21. "time"
  22. "github.com/onsi/gomega"
  23. "golang.org/x/crypto/ssh"
  24. v1 "k8s.io/api/core/v1"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/fields"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. clientset "k8s.io/client-go/kubernetes"
  29. sshutil "k8s.io/kubernetes/pkg/ssh"
  30. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  31. testutils "k8s.io/kubernetes/test/utils"
  32. )
  33. const (
  34. // ssh port
  35. sshPort = "22"
  36. // pollNodeInterval is how often to Poll pods.
  37. pollNodeInterval = 2 * time.Second
  38. // singleCallTimeout is how long to try single API calls (like 'get' or 'list'). Used to prevent
  39. // transient failures from failing tests.
  40. singleCallTimeout = 5 * time.Minute
  41. )
  42. // GetSigner returns an ssh.Signer for the provider ("gce", etc.) that can be
  43. // used to SSH to their nodes.
  44. func GetSigner(provider string) (ssh.Signer, error) {
  45. // honor a consistent SSH key across all providers
  46. if path := os.Getenv("KUBE_SSH_KEY_PATH"); len(path) > 0 {
  47. return sshutil.MakePrivateKeySignerFromFile(path)
  48. }
  49. // Select the key itself to use. When implementing more providers here,
  50. // please also add them to any SSH tests that are disabled because of signer
  51. // support.
  52. keyfile := ""
  53. switch provider {
  54. case "gce", "gke", "kubemark":
  55. keyfile = os.Getenv("GCE_SSH_KEY")
  56. if keyfile == "" {
  57. keyfile = "google_compute_engine"
  58. }
  59. case "aws", "eks":
  60. keyfile = os.Getenv("AWS_SSH_KEY")
  61. if keyfile == "" {
  62. keyfile = "kube_aws_rsa"
  63. }
  64. case "local", "vsphere":
  65. keyfile = os.Getenv("LOCAL_SSH_KEY")
  66. if keyfile == "" {
  67. keyfile = "id_rsa"
  68. }
  69. case "skeleton":
  70. keyfile = os.Getenv("KUBE_SSH_KEY")
  71. if keyfile == "" {
  72. keyfile = "id_rsa"
  73. }
  74. default:
  75. return nil, fmt.Errorf("GetSigner(...) not implemented for %s", provider)
  76. }
  77. // Respect absolute paths for keys given by user, fallback to assuming
  78. // relative paths are in ~/.ssh
  79. if !filepath.IsAbs(keyfile) {
  80. keydir := filepath.Join(os.Getenv("HOME"), ".ssh")
  81. keyfile = filepath.Join(keydir, keyfile)
  82. }
  83. return sshutil.MakePrivateKeySignerFromFile(keyfile)
  84. }
  85. // NodeSSHHosts returns SSH-able host names for all schedulable nodes - this
  86. // excludes master node. If it can't find any external IPs, it falls back to
  87. // looking for internal IPs. If it can't find an internal IP for every node it
  88. // returns an error, though it still returns all hosts that it found in that
  89. // case.
  90. func NodeSSHHosts(c clientset.Interface) ([]string, error) {
  91. nodelist := waitListSchedulableNodesOrDie(c)
  92. hosts := nodeAddresses(nodelist, v1.NodeExternalIP)
  93. // If ExternalIPs aren't available for all nodes, try falling back to the InternalIPs.
  94. if len(hosts) < len(nodelist.Items) {
  95. e2elog.Logf("No external IP address on nodes, falling back to internal IPs")
  96. hosts = nodeAddresses(nodelist, v1.NodeInternalIP)
  97. }
  98. // Error if neither External nor Internal IPs weren't available for all nodes.
  99. if len(hosts) != len(nodelist.Items) {
  100. return hosts, fmt.Errorf(
  101. "only found %d IPs on nodes, but found %d nodes. Nodelist: %v",
  102. len(hosts), len(nodelist.Items), nodelist)
  103. }
  104. sshHosts := make([]string, 0, len(hosts))
  105. for _, h := range hosts {
  106. sshHosts = append(sshHosts, net.JoinHostPort(h, sshPort))
  107. }
  108. return sshHosts, nil
  109. }
  110. // Result holds the execution result of SSH command
  111. type Result struct {
  112. User string
  113. Host string
  114. Cmd string
  115. Stdout string
  116. Stderr string
  117. Code int
  118. }
  119. // NodeExec execs the given cmd on node via SSH. Note that the nodeName is an sshable name,
  120. // eg: the name returned by framework.GetMasterHost(). This is also not guaranteed to work across
  121. // cloud providers since it involves ssh.
  122. func NodeExec(nodeName, cmd, provider string) (Result, error) {
  123. return SSH(cmd, net.JoinHostPort(nodeName, sshPort), provider)
  124. }
  125. // SSH synchronously SSHs to a node running on provider and runs cmd. If there
  126. // is no error performing the SSH, the stdout, stderr, and exit code are
  127. // returned.
  128. func SSH(cmd, host, provider string) (Result, error) {
  129. result := Result{Host: host, Cmd: cmd}
  130. // Get a signer for the provider.
  131. signer, err := GetSigner(provider)
  132. if err != nil {
  133. return result, fmt.Errorf("error getting signer for provider %s: '%v'", provider, err)
  134. }
  135. // RunSSHCommand will default to Getenv("USER") if user == "", but we're
  136. // defaulting here as well for logging clarity.
  137. result.User = os.Getenv("KUBE_SSH_USER")
  138. if result.User == "" {
  139. result.User = os.Getenv("USER")
  140. }
  141. if bastion := os.Getenv("KUBE_SSH_BASTION"); len(bastion) > 0 {
  142. stdout, stderr, code, err := runSSHCommandViaBastion(cmd, result.User, bastion, host, signer)
  143. result.Stdout = stdout
  144. result.Stderr = stderr
  145. result.Code = code
  146. return result, err
  147. }
  148. stdout, stderr, code, err := sshutil.RunSSHCommand(cmd, result.User, host, signer)
  149. result.Stdout = stdout
  150. result.Stderr = stderr
  151. result.Code = code
  152. return result, err
  153. }
  154. // runSSHCommandViaBastion returns the stdout, stderr, and exit code from running cmd on
  155. // host as specific user, along with any SSH-level error. It uses an SSH proxy to connect
  156. // to bastion, then via that tunnel connects to the remote host. Similar to
  157. // sshutil.RunSSHCommand but scoped to the needs of the test infrastructure.
  158. func runSSHCommandViaBastion(cmd, user, bastion, host string, signer ssh.Signer) (string, string, int, error) {
  159. // Setup the config, dial the server, and open a session.
  160. config := &ssh.ClientConfig{
  161. User: user,
  162. Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
  163. HostKeyCallback: ssh.InsecureIgnoreHostKey(),
  164. Timeout: 150 * time.Second,
  165. }
  166. bastionClient, err := ssh.Dial("tcp", bastion, config)
  167. if err != nil {
  168. err = wait.Poll(5*time.Second, 20*time.Second, func() (bool, error) {
  169. fmt.Printf("error dialing %s@%s: '%v', retrying\n", user, bastion, err)
  170. if bastionClient, err = ssh.Dial("tcp", bastion, config); err != nil {
  171. return false, err
  172. }
  173. return true, nil
  174. })
  175. }
  176. if err != nil {
  177. return "", "", 0, fmt.Errorf("error getting SSH client to %s@%s: %v", user, bastion, err)
  178. }
  179. defer bastionClient.Close()
  180. conn, err := bastionClient.Dial("tcp", host)
  181. if err != nil {
  182. return "", "", 0, fmt.Errorf("error dialing %s from bastion: %v", host, err)
  183. }
  184. defer conn.Close()
  185. ncc, chans, reqs, err := ssh.NewClientConn(conn, host, config)
  186. if err != nil {
  187. return "", "", 0, fmt.Errorf("error creating forwarding connection %s from bastion: %v", host, err)
  188. }
  189. client := ssh.NewClient(ncc, chans, reqs)
  190. defer client.Close()
  191. session, err := client.NewSession()
  192. if err != nil {
  193. return "", "", 0, fmt.Errorf("error creating session to %s@%s from bastion: '%v'", user, host, err)
  194. }
  195. defer session.Close()
  196. // Run the command.
  197. code := 0
  198. var bout, berr bytes.Buffer
  199. session.Stdout, session.Stderr = &bout, &berr
  200. if err = session.Run(cmd); err != nil {
  201. // Check whether the command failed to run or didn't complete.
  202. if exiterr, ok := err.(*ssh.ExitError); ok {
  203. // If we got an ExitError and the exit code is nonzero, we'll
  204. // consider the SSH itself successful (just that the command run
  205. // errored on the host).
  206. if code = exiterr.ExitStatus(); code != 0 {
  207. err = nil
  208. }
  209. } else {
  210. // Some other kind of error happened (e.g. an IOError); consider the
  211. // SSH unsuccessful.
  212. err = fmt.Errorf("failed running `%s` on %s@%s: '%v'", cmd, user, host, err)
  213. }
  214. }
  215. return bout.String(), berr.String(), code, err
  216. }
  217. // LogResult records result log
  218. func LogResult(result Result) {
  219. remote := fmt.Sprintf("%s@%s", result.User, result.Host)
  220. e2elog.Logf("ssh %s: command: %s", remote, result.Cmd)
  221. e2elog.Logf("ssh %s: stdout: %q", remote, result.Stdout)
  222. e2elog.Logf("ssh %s: stderr: %q", remote, result.Stderr)
  223. e2elog.Logf("ssh %s: exit code: %d", remote, result.Code)
  224. }
  225. // IssueSSHCommandWithResult tries to execute a SSH command and returns the execution result
  226. func IssueSSHCommandWithResult(cmd, provider string, node *v1.Node) (*Result, error) {
  227. e2elog.Logf("Getting external IP address for %s", node.Name)
  228. host := ""
  229. for _, a := range node.Status.Addresses {
  230. if a.Type == v1.NodeExternalIP && a.Address != "" {
  231. host = net.JoinHostPort(a.Address, sshPort)
  232. break
  233. }
  234. }
  235. if host == "" {
  236. // No external IPs were found, let's try to use internal as plan B
  237. for _, a := range node.Status.Addresses {
  238. if a.Type == v1.NodeInternalIP && a.Address != "" {
  239. host = net.JoinHostPort(a.Address, sshPort)
  240. break
  241. }
  242. }
  243. }
  244. if host == "" {
  245. return nil, fmt.Errorf("couldn't find any IP address for node %s", node.Name)
  246. }
  247. e2elog.Logf("SSH %q on %s(%s)", cmd, node.Name, host)
  248. result, err := SSH(cmd, host, provider)
  249. LogResult(result)
  250. if result.Code != 0 || err != nil {
  251. return nil, fmt.Errorf("failed running %q: %v (exit code %d, stderr %v)",
  252. cmd, err, result.Code, result.Stderr)
  253. }
  254. return &result, nil
  255. }
  256. // IssueSSHCommand tries to execute a SSH command
  257. func IssueSSHCommand(cmd, provider string, node *v1.Node) error {
  258. _, err := IssueSSHCommandWithResult(cmd, provider, node)
  259. if err != nil {
  260. return err
  261. }
  262. return nil
  263. }
  264. // nodeAddresses returns the first address of the given type of each node.
  265. func nodeAddresses(nodelist *v1.NodeList, addrType v1.NodeAddressType) []string {
  266. hosts := []string{}
  267. for _, n := range nodelist.Items {
  268. for _, addr := range n.Status.Addresses {
  269. if addr.Type == addrType && addr.Address != "" {
  270. hosts = append(hosts, addr.Address)
  271. break
  272. }
  273. }
  274. }
  275. return hosts
  276. }
  277. // waitListSchedulableNodes is a wrapper around listing nodes supporting retries.
  278. func waitListSchedulableNodes(c clientset.Interface) (*v1.NodeList, error) {
  279. var nodes *v1.NodeList
  280. var err error
  281. if wait.PollImmediate(pollNodeInterval, singleCallTimeout, func() (bool, error) {
  282. nodes, err = c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{FieldSelector: fields.Set{
  283. "spec.unschedulable": "false",
  284. }.AsSelector().String()})
  285. if err != nil {
  286. if testutils.IsRetryableAPIError(err) {
  287. return false, nil
  288. }
  289. return false, err
  290. }
  291. return true, nil
  292. }) != nil {
  293. return nodes, err
  294. }
  295. return nodes, nil
  296. }
  297. // waitListSchedulableNodesOrDie is a wrapper around listing nodes supporting retries.
  298. func waitListSchedulableNodesOrDie(c clientset.Interface) *v1.NodeList {
  299. nodes, err := waitListSchedulableNodes(c)
  300. if err != nil {
  301. expectNoError(err, "Non-retryable failure or timed out while listing nodes for e2e cluster.")
  302. }
  303. return nodes
  304. }
  305. // expectNoError checks if "err" is set, and if so, fails assertion while logging the error.
  306. func expectNoError(err error, explain ...interface{}) {
  307. expectNoErrorWithOffset(1, err, explain...)
  308. }
  309. // expectNoErrorWithOffset checks if "err" is set, and if so, fails assertion while logging the error at "offset" levels above its caller
  310. // (for example, for call chain f -> g -> ExpectNoErrorWithOffset(1, ...) error would be logged for "f").
  311. func expectNoErrorWithOffset(offset int, err error, explain ...interface{}) {
  312. if err != nil {
  313. e2elog.Logf("Unexpected error occurred: %v", err)
  314. }
  315. gomega.ExpectWithOffset(1+offset, err).NotTo(gomega.HaveOccurred(), explain...)
  316. }