etcd.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "context"
  16. "crypto/tls"
  17. "net"
  18. "net/url"
  19. "path/filepath"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "github.com/pkg/errors"
  24. "go.etcd.io/etcd/clientv3"
  25. "go.etcd.io/etcd/pkg/transport"
  26. "google.golang.org/grpc"
  27. "k8s.io/apimachinery/pkg/util/wait"
  28. clientset "k8s.io/client-go/kubernetes"
  29. "k8s.io/klog"
  30. kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
  31. "k8s.io/kubernetes/cmd/kubeadm/app/constants"
  32. "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
  33. )
  // etcdTimeout is the deadline given to each single etcd request issued by
  // this package (every retry attempt gets its own context with this timeout).
  const etcdTimeout = 2 * time.Second
  35. // Exponential backoff for etcd operations
  36. var etcdBackoff = wait.Backoff{
  37. Steps: 11,
  38. Duration: 50 * time.Millisecond,
  39. Factor: 2.0,
  40. Jitter: 0.1,
  41. }
  42. // ClusterInterrogator is an interface to get etcd cluster related information
  43. type ClusterInterrogator interface {
  44. CheckClusterHealth() error
  45. GetClusterVersions() (map[string]string, error)
  46. GetVersion() (string, error)
  47. WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
  48. Sync() error
  49. AddMember(name string, peerAddrs string) ([]Member, error)
  50. GetMemberID(peerURL string) (uint64, error)
  51. RemoveMember(id uint64) ([]Member, error)
  52. }
  // Client holds the connection parameters used to reach an etcd cluster.
  // A fresh etcd connection is opened from these parameters by each operation.
  type Client struct {
  	// Endpoints is the list of etcd client URLs to connect to.
  	Endpoints []string
  	// TLS is the client TLS configuration; it is nil when no certificate
  	// material was supplied to New.
  	TLS *tls.Config
  }
  58. // New creates a new EtcdCluster client
  59. func New(endpoints []string, ca, cert, key string) (*Client, error) {
  60. client := Client{Endpoints: endpoints}
  61. if ca != "" || cert != "" || key != "" {
  62. tlsInfo := transport.TLSInfo{
  63. CertFile: cert,
  64. KeyFile: key,
  65. TrustedCAFile: ca,
  66. }
  67. tlsConfig, err := tlsInfo.ClientConfig()
  68. if err != nil {
  69. return nil, err
  70. }
  71. client.TLS = tlsConfig
  72. }
  73. return &client, nil
  74. }
  75. // NewFromCluster creates an etcd client for the etcd endpoints defined in the ClusterStatus value stored in
  76. // the kubeadm-config ConfigMap in kube-system namespace.
  77. // Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check).
  78. func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
  79. // etcd is listening the API server advertise address on each control-plane node
  80. // so it is necessary to get the list of endpoints from kubeadm cluster status before connecting
  81. // Gets the cluster status
  82. clusterStatus, err := config.GetClusterStatus(client)
  83. if err != nil {
  84. return nil, err
  85. }
  86. // Get the list of etcd endpoints from cluster status
  87. endpoints := []string{}
  88. for _, e := range clusterStatus.APIEndpoints {
  89. endpoints = append(endpoints, GetClientURLByIP(e.AdvertiseAddress))
  90. }
  91. klog.V(1).Infof("etcd endpoints read from pods: %s", strings.Join(endpoints, ","))
  92. // Creates an etcd client
  93. etcdClient, err := New(
  94. endpoints,
  95. filepath.Join(certificatesDir, constants.EtcdCACertName),
  96. filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
  97. filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
  98. )
  99. if err != nil {
  100. return nil, errors.Wrapf(err, "error creating etcd client for %v endpoints", endpoints)
  101. }
  102. // synchronizes client's endpoints with the known endpoints from the etcd membership.
  103. err = etcdClient.Sync()
  104. if err != nil {
  105. return nil, errors.Wrap(err, "error syncing endpoints with etc")
  106. }
  107. klog.V(1).Infof("update etcd endpoints: %s", strings.Join(etcdClient.Endpoints, ","))
  108. return etcdClient, nil
  109. }
  // dialTimeout bounds how long establishing a connection to etcd may take.
  // It is deliberately larger than 20 seconds: shorter values cause TLS
  // connections to fail on heavily loaded arm64 CPUs (issue #64649).
  const dialTimeout = 40 * time.Second
  114. // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
  115. func (c *Client) Sync() error {
  116. cli, err := clientv3.New(clientv3.Config{
  117. Endpoints: c.Endpoints,
  118. DialTimeout: dialTimeout,
  119. DialOptions: []grpc.DialOption{
  120. grpc.WithBlock(), // block until the underlying connection is up
  121. },
  122. TLS: c.TLS,
  123. })
  124. if err != nil {
  125. return err
  126. }
  127. defer cli.Close()
  128. // Syncs the list of endpoints
  129. var lastError error
  130. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  131. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  132. err = cli.Sync(ctx)
  133. cancel()
  134. if err == nil {
  135. return true, nil
  136. }
  137. klog.V(5).Infof("Failed to sync etcd endpoints: %v", err)
  138. lastError = err
  139. return false, nil
  140. })
  141. if err != nil {
  142. return lastError
  143. }
  144. klog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ","))
  145. c.Endpoints = cli.Endpoints()
  146. return nil
  147. }
  // Member describes a single etcd member. It exists so that the rest of the
  // kubeadm codebase does not have to depend on the etcd client packages.
  type Member struct {
  	// Name is the member's name.
  	Name string
  	// PeerURL is the member's peer URL.
  	PeerURL string
  }
  154. // GetMemberID returns the member ID of the given peer URL
  155. func (c *Client) GetMemberID(peerURL string) (uint64, error) {
  156. cli, err := clientv3.New(clientv3.Config{
  157. Endpoints: c.Endpoints,
  158. DialTimeout: dialTimeout,
  159. DialOptions: []grpc.DialOption{
  160. grpc.WithBlock(), // block until the underlying connection is up
  161. },
  162. TLS: c.TLS,
  163. })
  164. if err != nil {
  165. return 0, err
  166. }
  167. defer cli.Close()
  168. // Gets the member list
  169. var lastError error
  170. var resp *clientv3.MemberListResponse
  171. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  172. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  173. resp, err = cli.MemberList(ctx)
  174. cancel()
  175. if err == nil {
  176. return true, nil
  177. }
  178. klog.V(5).Infof("Failed to get etcd member list: %v", err)
  179. lastError = err
  180. return false, nil
  181. })
  182. if err != nil {
  183. return 0, lastError
  184. }
  185. for _, member := range resp.Members {
  186. if member.GetPeerURLs()[0] == peerURL {
  187. return member.GetID(), nil
  188. }
  189. }
  190. return 0, nil
  191. }
  192. // RemoveMember notifies an etcd cluster to remove an existing member
  193. func (c *Client) RemoveMember(id uint64) ([]Member, error) {
  194. cli, err := clientv3.New(clientv3.Config{
  195. Endpoints: c.Endpoints,
  196. DialTimeout: dialTimeout,
  197. DialOptions: []grpc.DialOption{
  198. grpc.WithBlock(), // block until the underlying connection is up
  199. },
  200. TLS: c.TLS,
  201. })
  202. if err != nil {
  203. return nil, err
  204. }
  205. defer cli.Close()
  206. // Remove an existing member from the cluster
  207. var lastError error
  208. var resp *clientv3.MemberRemoveResponse
  209. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  210. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  211. resp, err = cli.MemberRemove(ctx, id)
  212. cancel()
  213. if err == nil {
  214. return true, nil
  215. }
  216. klog.V(5).Infof("Failed to remove etcd member: %v", err)
  217. lastError = err
  218. return false, nil
  219. })
  220. if err != nil {
  221. return nil, lastError
  222. }
  223. // Returns the updated list of etcd members
  224. ret := []Member{}
  225. for _, m := range resp.Members {
  226. ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
  227. }
  228. return ret, nil
  229. }
  230. // AddMember notifies an existing etcd cluster that a new member is joining
  231. func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
  232. // Parse the peer address, required to add the client URL later to the list
  233. // of endpoints for this client. Parsing as a first operation to make sure that
  234. // if this fails no member addition is performed on the etcd cluster.
  235. parsedPeerAddrs, err := url.Parse(peerAddrs)
  236. if err != nil {
  237. return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
  238. }
  239. cli, err := clientv3.New(clientv3.Config{
  240. Endpoints: c.Endpoints,
  241. DialTimeout: dialTimeout,
  242. DialOptions: []grpc.DialOption{
  243. grpc.WithBlock(), // block until the underlying connection is up
  244. },
  245. TLS: c.TLS,
  246. })
  247. if err != nil {
  248. return nil, err
  249. }
  250. defer cli.Close()
  251. // Adds a new member to the cluster
  252. var lastError error
  253. var resp *clientv3.MemberAddResponse
  254. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  255. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  256. resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
  257. cancel()
  258. if err == nil {
  259. return true, nil
  260. }
  261. klog.V(5).Infof("Failed to add etcd member: %v", err)
  262. lastError = err
  263. return false, nil
  264. })
  265. if err != nil {
  266. return nil, lastError
  267. }
  268. // Returns the updated list of etcd members
  269. ret := []Member{}
  270. for _, m := range resp.Members {
  271. // If the peer address matches, this is the member we are adding.
  272. // Use the name we passed to the function.
  273. if peerAddrs == m.PeerURLs[0] {
  274. ret = append(ret, Member{Name: name, PeerURL: peerAddrs})
  275. continue
  276. }
  277. // Otherwise, we are processing other existing etcd members returned by AddMembers.
  278. memberName := m.Name
  279. // In some cases during concurrent join, some members can end up without a name.
  280. // Use the member ID as name for those.
  281. if len(memberName) == 0 {
  282. memberName = strconv.FormatUint(m.ID, 16)
  283. }
  284. ret = append(ret, Member{Name: memberName, PeerURL: m.PeerURLs[0]})
  285. }
  286. // Add the new member client address to the list of endpoints
  287. c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
  288. return ret, nil
  289. }
  290. // GetVersion returns the etcd version of the cluster.
  291. // An error is returned if the version of all endpoints do not match
  292. func (c *Client) GetVersion() (string, error) {
  293. var clusterVersion string
  294. versions, err := c.GetClusterVersions()
  295. if err != nil {
  296. return "", err
  297. }
  298. for _, v := range versions {
  299. if clusterVersion != "" && clusterVersion != v {
  300. return "", errors.Errorf("etcd cluster contains endpoints with mismatched versions: %v", versions)
  301. }
  302. clusterVersion = v
  303. }
  304. if clusterVersion == "" {
  305. return "", errors.New("could not determine cluster etcd version")
  306. }
  307. return clusterVersion, nil
  308. }
  309. // GetClusterVersions returns a map of the endpoints and their associated versions
  310. func (c *Client) GetClusterVersions() (map[string]string, error) {
  311. versions := make(map[string]string)
  312. statuses, err := c.getClusterStatus()
  313. if err != nil {
  314. return versions, err
  315. }
  316. for ep, status := range statuses {
  317. versions[ep] = status.Version
  318. }
  319. return versions, nil
  320. }
  321. // CheckClusterHealth returns nil for status Up or error for status Down
  322. func (c *Client) CheckClusterHealth() error {
  323. _, err := c.getClusterStatus()
  324. return err
  325. }
  326. // getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down
  327. func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) {
  328. cli, err := clientv3.New(clientv3.Config{
  329. Endpoints: c.Endpoints,
  330. DialTimeout: dialTimeout,
  331. DialOptions: []grpc.DialOption{
  332. grpc.WithBlock(), // block until the underlying connection is up
  333. },
  334. TLS: c.TLS,
  335. })
  336. if err != nil {
  337. return nil, err
  338. }
  339. defer cli.Close()
  340. clusterStatus := make(map[string]*clientv3.StatusResponse)
  341. for _, ep := range c.Endpoints {
  342. // Gets the member status
  343. var lastError error
  344. var resp *clientv3.StatusResponse
  345. err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
  346. ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
  347. resp, err = cli.Status(ctx, ep)
  348. cancel()
  349. if err == nil {
  350. return true, nil
  351. }
  352. klog.V(5).Infof("Failed to get etcd status for %s: %v", ep, err)
  353. lastError = err
  354. return false, nil
  355. })
  356. if err != nil {
  357. return nil, lastError
  358. }
  359. clusterStatus[ep] = resp
  360. }
  361. return clusterStatus, nil
  362. }
  363. // WaitForClusterAvailable returns true if all endpoints in the cluster are available after retry attempts, an error is returned otherwise
  364. func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
  365. for i := 0; i < retries; i++ {
  366. if i > 0 {
  367. klog.V(1).Infof("[etcd] Waiting %v until next retry\n", retryInterval)
  368. time.Sleep(retryInterval)
  369. }
  370. klog.V(2).Infof("[etcd] attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
  371. _, err := c.getClusterStatus()
  372. if err != nil {
  373. switch err {
  374. case context.DeadlineExceeded:
  375. klog.V(1).Infof("[etcd] Attempt timed out")
  376. default:
  377. klog.V(1).Infof("[etcd] Attempt failed with error: %v\n", err)
  378. }
  379. continue
  380. }
  381. return true, nil
  382. }
  383. return false, errors.New("timeout waiting for etcd cluster to be available")
  384. }
  385. // GetClientURL creates an HTTPS URL that uses the configured advertise
  386. // address and client port for the API controller
  387. func GetClientURL(localEndpoint *kubeadmapi.APIEndpoint) string {
  388. return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenClientPort))
  389. }
  390. // GetPeerURL creates an HTTPS URL that uses the configured advertise
  391. // address and peer port for the API controller
  392. func GetPeerURL(localEndpoint *kubeadmapi.APIEndpoint) string {
  393. return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenPeerPort))
  394. }
  395. // GetClientURLByIP creates an HTTPS URL based on an IP address
  396. // and the client listening port.
  397. func GetClientURLByIP(ip string) string {
  398. return "https://" + net.JoinHostPort(ip, strconv.Itoa(constants.EtcdListenClientPort))
  399. }