etcd.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "context"
  16. "crypto/tls"
  17. "fmt"
  18. "net"
  19. "net/url"
  20. "path/filepath"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "github.com/pkg/errors"
  25. "go.etcd.io/etcd/clientv3"
  26. "go.etcd.io/etcd/pkg/transport"
  27. "google.golang.org/grpc"
  28. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  29. "k8s.io/apimachinery/pkg/util/wait"
  30. clientset "k8s.io/client-go/kubernetes"
  31. "k8s.io/klog"
  32. kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
  33. "k8s.io/kubernetes/cmd/kubeadm/app/constants"
  34. "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
  35. )
  36. const etcdTimeout = 2 * time.Second
  37. // Exponential backoff for etcd operations
  38. var etcdBackoff = wait.Backoff{
  39. Steps: 11,
  40. Duration: 50 * time.Millisecond,
  41. Factor: 2.0,
  42. Jitter: 0.1,
  43. }
  44. // ClusterInterrogator is an interface to get etcd cluster related information
  45. type ClusterInterrogator interface {
  46. CheckClusterHealth() error
  47. GetClusterVersions() (map[string]string, error)
  48. GetVersion() (string, error)
  49. WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
  50. Sync() error
  51. AddMember(name string, peerAddrs string) ([]Member, error)
  52. GetMemberID(peerURL string) (uint64, error)
  53. RemoveMember(id uint64) ([]Member, error)
  54. }
  55. // Client provides connection parameters for an etcd cluster
  56. type Client struct {
  57. Endpoints []string
  58. TLS *tls.Config
  59. }
  60. // New creates a new EtcdCluster client
  61. func New(endpoints []string, ca, cert, key string) (*Client, error) {
  62. client := Client{Endpoints: endpoints}
  63. if ca != "" || cert != "" || key != "" {
  64. tlsInfo := transport.TLSInfo{
  65. CertFile: cert,
  66. KeyFile: key,
  67. TrustedCAFile: ca,
  68. }
  69. tlsConfig, err := tlsInfo.ClientConfig()
  70. if err != nil {
  71. return nil, err
  72. }
  73. client.TLS = tlsConfig
  74. }
  75. return &client, nil
  76. }
  77. // NewFromCluster creates an etcd client for the etcd endpoints present in etcd member list. In order to compose this information,
  78. // it will first discover at least one etcd endpoint to connect to. Once created, the client synchronizes client's endpoints with
  79. // the known endpoints from the etcd membership API, since it is the authoritative source of truth for the list of available members.
  80. func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
  81. // Discover at least one etcd endpoint to connect to by inspecting the existing etcd pods
  82. // Get the list of etcd endpoints
  83. endpoints, err := getEtcdEndpoints(client)
  84. if err != nil {
  85. return nil, err
  86. }
  87. klog.V(1).Infof("etcd endpoints read from pods: %s", strings.Join(endpoints, ","))
  88. // Creates an etcd client
  89. etcdClient, err := New(
  90. endpoints,
  91. filepath.Join(certificatesDir, constants.EtcdCACertName),
  92. filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
  93. filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
  94. )
  95. if err != nil {
  96. return nil, errors.Wrapf(err, "error creating etcd client for %v endpoints", endpoints)
  97. }
  98. // synchronizes client's endpoints with the known endpoints from the etcd membership.
  99. err = etcdClient.Sync()
  100. if err != nil {
  101. return nil, errors.Wrap(err, "error syncing endpoints with etc")
  102. }
  103. klog.V(1).Infof("update etcd endpoints: %s", strings.Join(etcdClient.Endpoints, ","))
  104. return etcdClient, nil
  105. }
  106. // getEtcdEndpoints returns the list of etcd endpoints.
  107. func getEtcdEndpoints(client clientset.Interface) ([]string, error) {
  108. return getEtcdEndpointsWithBackoff(client, constants.StaticPodMirroringDefaultRetry)
  109. }
  110. func getEtcdEndpointsWithBackoff(client clientset.Interface, backoff wait.Backoff) ([]string, error) {
  111. etcdEndpoints, err := getRawEtcdEndpointsFromPodAnnotation(client, backoff)
  112. if err != nil {
  113. // NB: this is a fallback when there is no annotation found in the etcd pods that contains
  114. // the client URL, and so we fallback to reading the ClusterStatus struct present in the
  115. // kubeadm-config ConfigMap. This can happen for example, when performing the first
  116. // `kubeadm upgrade apply`. This logic will be removed when the cluster status struct
  117. // is removed from the kubeadm-config ConfigMap.
  118. return getRawEtcdEndpointsFromClusterStatus(client)
  119. }
  120. return etcdEndpoints, nil
  121. }
  122. // getRawEtcdEndpointsFromPodAnnotation returns the list of endpoints as reported on etcd's pod annotations using the given backoff
  123. func getRawEtcdEndpointsFromPodAnnotation(client clientset.Interface, backoff wait.Backoff) ([]string, error) {
  124. etcdEndpoints := []string{}
  125. var lastErr error
  126. // Let's tolerate some unexpected transient failures from the API server or load balancers. Also, if
  127. // static pods were not yet mirrored into the API server we want to wait for this propagation.
  128. err := wait.ExponentialBackoff(backoff, func() (bool, error) {
  129. var overallEtcdPodCount int
  130. if etcdEndpoints, overallEtcdPodCount, lastErr = getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client); lastErr != nil {
  131. return false, nil
  132. }
  133. // TODO (ereslibre): this logic will need tweaking once that we get rid of the ClusterStatus, since we won't have
  134. // the ClusterStatus safety net we will have to retry in both cases.
  135. if len(etcdEndpoints) == 0 {
  136. if overallEtcdPodCount == 0 {
  137. return false, nil
  138. }
  139. // Fail fast scenario, to be removed once we get rid of the ClusterStatus
  140. return true, errors.New("etcd Pods exist, but no etcd endpoint annotations were found")
  141. }
  142. return true, nil
  143. })
  144. if err != nil {
  145. if lastErr != nil {
  146. return []string{}, errors.Wrap(lastErr, "could not retrieve the list of etcd endpoints")
  147. }
  148. return []string{}, errors.Wrap(err, "could not retrieve the list of etcd endpoints")
  149. }
  150. return etcdEndpoints, nil
  151. }
  152. // getRawEtcdEndpointsFromPodAnnotationWithoutRetry returns the list of etcd endpoints as reported by etcd Pod annotations,
  153. // along with the number of global etcd pods. This allows for callers to tell the difference between "no endpoints found",
  154. // and "no endpoints found and pods were listed", so they can skip retrying.
  155. func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface) ([]string, int, error) {
  156. klog.V(3).Infof("retrieving etcd endpoints from %q annotation in etcd Pods", constants.EtcdAdvertiseClientUrlsAnnotationKey)
  157. podList, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(
  158. context.TODO(),
  159. metav1.ListOptions{
  160. LabelSelector: fmt.Sprintf("component=%s,tier=%s", constants.Etcd, constants.ControlPlaneTier),
  161. },
  162. )
  163. if err != nil {
  164. return []string{}, 0, err
  165. }
  166. etcdEndpoints := []string{}
  167. for _, pod := range podList.Items {
  168. etcdEndpoint, ok := pod.ObjectMeta.Annotations[constants.EtcdAdvertiseClientUrlsAnnotationKey]
  169. if !ok {
  170. klog.V(3).Infof("etcd Pod %q is missing the %q annotation; cannot infer etcd advertise client URL using the Pod annotation", pod.ObjectMeta.Name, constants.EtcdAdvertiseClientUrlsAnnotationKey)
  171. continue
  172. }
  173. etcdEndpoints = append(etcdEndpoints, etcdEndpoint)
  174. }
  175. return etcdEndpoints, len(podList.Items), nil
  176. }
  177. // TODO: remove after 1.20, when the ClusterStatus struct is removed from the kubeadm-config ConfigMap.
  178. func getRawEtcdEndpointsFromClusterStatus(client clientset.Interface) ([]string, error) {
  179. klog.V(3).Info("retrieving etcd endpoints from the cluster status")
  180. clusterStatus, err := config.GetClusterStatus(client)
  181. if err != nil {
  182. return []string{}, err
  183. }
  184. etcdEndpoints := []string{}
  185. for _, e := range clusterStatus.APIEndpoints {
  186. etcdEndpoints = append(etcdEndpoints, GetClientURLByIP(e.AdvertiseAddress))
  187. }
  188. return etcdEndpoints, nil
  189. }
  190. // dialTimeout is the timeout for failing to establish a connection.
  191. // It is set to >20 seconds as times shorter than that will cause TLS connections to fail
  192. // on heavily loaded arm64 CPUs (issue #64649)
  193. const dialTimeout = 40 * time.Second
  194. // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
  195. func (c *Client) Sync() error {
  196. cli, err := clientv3.New(clientv3.Config{
  197. Endpoints: c.Endpoints,
  198. DialTimeout: dialTimeout,
  199. DialOptions: []grpc.DialOption{
  200. grpc.WithBlock(), // block until the underlying connection is up
  201. },
  202. TLS: c.TLS,
  203. })
  204. if err != nil {
  205. return err
  206. }
  207. defer cli.Close()
  208. // Syncs the list of endpoints
  209. var lastError error
  210. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  211. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  212. err = cli.Sync(ctx)
  213. cancel()
  214. if err == nil {
  215. return true, nil
  216. }
  217. klog.V(5).Infof("Failed to sync etcd endpoints: %v", err)
  218. lastError = err
  219. return false, nil
  220. })
  221. if err != nil {
  222. return lastError
  223. }
  224. klog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ","))
  225. c.Endpoints = cli.Endpoints()
  226. return nil
  227. }
  228. // Member struct defines an etcd member; it is used for avoiding to spread github.com/coreos/etcd dependency
  229. // across kubeadm codebase
  230. type Member struct {
  231. Name string
  232. PeerURL string
  233. }
  234. // GetMemberID returns the member ID of the given peer URL
  235. func (c *Client) GetMemberID(peerURL string) (uint64, error) {
  236. cli, err := clientv3.New(clientv3.Config{
  237. Endpoints: c.Endpoints,
  238. DialTimeout: dialTimeout,
  239. DialOptions: []grpc.DialOption{
  240. grpc.WithBlock(), // block until the underlying connection is up
  241. },
  242. TLS: c.TLS,
  243. })
  244. if err != nil {
  245. return 0, err
  246. }
  247. defer cli.Close()
  248. // Gets the member list
  249. var lastError error
  250. var resp *clientv3.MemberListResponse
  251. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  252. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  253. resp, err = cli.MemberList(ctx)
  254. cancel()
  255. if err == nil {
  256. return true, nil
  257. }
  258. klog.V(5).Infof("Failed to get etcd member list: %v", err)
  259. lastError = err
  260. return false, nil
  261. })
  262. if err != nil {
  263. return 0, lastError
  264. }
  265. for _, member := range resp.Members {
  266. if member.GetPeerURLs()[0] == peerURL {
  267. return member.GetID(), nil
  268. }
  269. }
  270. return 0, nil
  271. }
  272. // RemoveMember notifies an etcd cluster to remove an existing member
  273. func (c *Client) RemoveMember(id uint64) ([]Member, error) {
  274. cli, err := clientv3.New(clientv3.Config{
  275. Endpoints: c.Endpoints,
  276. DialTimeout: dialTimeout,
  277. DialOptions: []grpc.DialOption{
  278. grpc.WithBlock(), // block until the underlying connection is up
  279. },
  280. TLS: c.TLS,
  281. })
  282. if err != nil {
  283. return nil, err
  284. }
  285. defer cli.Close()
  286. // Remove an existing member from the cluster
  287. var lastError error
  288. var resp *clientv3.MemberRemoveResponse
  289. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  290. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  291. resp, err = cli.MemberRemove(ctx, id)
  292. cancel()
  293. if err == nil {
  294. return true, nil
  295. }
  296. klog.V(5).Infof("Failed to remove etcd member: %v", err)
  297. lastError = err
  298. return false, nil
  299. })
  300. if err != nil {
  301. return nil, lastError
  302. }
  303. // Returns the updated list of etcd members
  304. ret := []Member{}
  305. for _, m := range resp.Members {
  306. ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
  307. }
  308. return ret, nil
  309. }
  310. // AddMember notifies an existing etcd cluster that a new member is joining
  311. func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
  312. // Parse the peer address, required to add the client URL later to the list
  313. // of endpoints for this client. Parsing as a first operation to make sure that
  314. // if this fails no member addition is performed on the etcd cluster.
  315. parsedPeerAddrs, err := url.Parse(peerAddrs)
  316. if err != nil {
  317. return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
  318. }
  319. cli, err := clientv3.New(clientv3.Config{
  320. Endpoints: c.Endpoints,
  321. DialTimeout: dialTimeout,
  322. DialOptions: []grpc.DialOption{
  323. grpc.WithBlock(), // block until the underlying connection is up
  324. },
  325. TLS: c.TLS,
  326. })
  327. if err != nil {
  328. return nil, err
  329. }
  330. defer cli.Close()
  331. // Adds a new member to the cluster
  332. var lastError error
  333. var resp *clientv3.MemberAddResponse
  334. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  335. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  336. resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
  337. cancel()
  338. if err == nil {
  339. return true, nil
  340. }
  341. klog.V(5).Infof("Failed to add etcd member: %v", err)
  342. lastError = err
  343. return false, nil
  344. })
  345. if err != nil {
  346. return nil, lastError
  347. }
  348. // Returns the updated list of etcd members
  349. ret := []Member{}
  350. for _, m := range resp.Members {
  351. // If the peer address matches, this is the member we are adding.
  352. // Use the name we passed to the function.
  353. if peerAddrs == m.PeerURLs[0] {
  354. ret = append(ret, Member{Name: name, PeerURL: peerAddrs})
  355. continue
  356. }
  357. // Otherwise, we are processing other existing etcd members returned by AddMembers.
  358. memberName := m.Name
  359. // In some cases during concurrent join, some members can end up without a name.
  360. // Use the member ID as name for those.
  361. if len(memberName) == 0 {
  362. memberName = strconv.FormatUint(m.ID, 16)
  363. }
  364. ret = append(ret, Member{Name: memberName, PeerURL: m.PeerURLs[0]})
  365. }
  366. // Add the new member client address to the list of endpoints
  367. c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
  368. return ret, nil
  369. }
  370. // GetVersion returns the etcd version of the cluster.
  371. // An error is returned if the version of all endpoints do not match
  372. func (c *Client) GetVersion() (string, error) {
  373. var clusterVersion string
  374. versions, err := c.GetClusterVersions()
  375. if err != nil {
  376. return "", err
  377. }
  378. for _, v := range versions {
  379. if clusterVersion != "" && clusterVersion != v {
  380. return "", errors.Errorf("etcd cluster contains endpoints with mismatched versions: %v", versions)
  381. }
  382. clusterVersion = v
  383. }
  384. if clusterVersion == "" {
  385. return "", errors.New("could not determine cluster etcd version")
  386. }
  387. return clusterVersion, nil
  388. }
  389. // GetClusterVersions returns a map of the endpoints and their associated versions
  390. func (c *Client) GetClusterVersions() (map[string]string, error) {
  391. versions := make(map[string]string)
  392. statuses, err := c.getClusterStatus()
  393. if err != nil {
  394. return versions, err
  395. }
  396. for ep, status := range statuses {
  397. versions[ep] = status.Version
  398. }
  399. return versions, nil
  400. }
  401. // CheckClusterHealth returns nil for status Up or error for status Down
  402. func (c *Client) CheckClusterHealth() error {
  403. _, err := c.getClusterStatus()
  404. return err
  405. }
  406. // getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down
  407. func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) {
  408. cli, err := clientv3.New(clientv3.Config{
  409. Endpoints: c.Endpoints,
  410. DialTimeout: dialTimeout,
  411. DialOptions: []grpc.DialOption{
  412. grpc.WithBlock(), // block until the underlying connection is up
  413. },
  414. TLS: c.TLS,
  415. })
  416. if err != nil {
  417. return nil, err
  418. }
  419. defer cli.Close()
  420. clusterStatus := make(map[string]*clientv3.StatusResponse)
  421. for _, ep := range c.Endpoints {
  422. // Gets the member status
  423. var lastError error
  424. var resp *clientv3.StatusResponse
  425. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  426. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  427. resp, err = cli.Status(ctx, ep)
  428. cancel()
  429. if err == nil {
  430. return true, nil
  431. }
  432. klog.V(5).Infof("Failed to get etcd status for %s: %v", ep, err)
  433. lastError = err
  434. return false, nil
  435. })
  436. if err != nil {
  437. return nil, lastError
  438. }
  439. clusterStatus[ep] = resp
  440. }
  441. return clusterStatus, nil
  442. }
  443. // WaitForClusterAvailable returns true if all endpoints in the cluster are available after retry attempts, an error is returned otherwise
  444. func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
  445. for i := 0; i < retries; i++ {
  446. if i > 0 {
  447. klog.V(1).Infof("[etcd] Waiting %v until next retry\n", retryInterval)
  448. time.Sleep(retryInterval)
  449. }
  450. klog.V(2).Infof("[etcd] attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
  451. _, err := c.getClusterStatus()
  452. if err != nil {
  453. switch err {
  454. case context.DeadlineExceeded:
  455. klog.V(1).Infof("[etcd] Attempt timed out")
  456. default:
  457. klog.V(1).Infof("[etcd] Attempt failed with error: %v\n", err)
  458. }
  459. continue
  460. }
  461. return true, nil
  462. }
  463. return false, errors.New("timeout waiting for etcd cluster to be available")
  464. }
  465. // GetClientURL creates an HTTPS URL that uses the configured advertise
  466. // address and client port for the API controller
  467. func GetClientURL(localEndpoint *kubeadmapi.APIEndpoint) string {
  468. return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenClientPort))
  469. }
  470. // GetPeerURL creates an HTTPS URL that uses the configured advertise
  471. // address and peer port for the API controller
  472. func GetPeerURL(localEndpoint *kubeadmapi.APIEndpoint) string {
  473. return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenPeerPort))
  474. }
  475. // GetClientURLByIP creates an HTTPS URL based on an IP address
  476. // and the client listening port.
  477. func GetClientURLByIP(ip string) string {
  478. return "https://" + net.JoinHostPort(ip, strconv.Itoa(constants.EtcdListenClientPort))
  479. }