etcd.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "context"
  16. "crypto/tls"
  17. "net"
  18. "net/url"
  19. "path/filepath"
  20. "strconv"
  21. "strings"
  22. "time"
  23. "github.com/coreos/etcd/clientv3"
  24. "github.com/coreos/etcd/pkg/transport"
  25. "github.com/pkg/errors"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. clientset "k8s.io/client-go/kubernetes"
  28. "k8s.io/klog"
  29. kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
  30. "k8s.io/kubernetes/cmd/kubeadm/app/constants"
  31. "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
  32. )
  33. // Exponential backoff for MemberAdd/Remove (values exclude jitter):
  34. // 0, 50, 150, 350, 750, 1550, 3150, 6350, 12750 ms
  35. var addRemoveBackoff = wait.Backoff{
  36. Steps: 8,
  37. Duration: 50 * time.Millisecond,
  38. Factor: 2.0,
  39. Jitter: 0.1,
  40. }
  41. // ClusterInterrogator is an interface to get etcd cluster related information
  42. type ClusterInterrogator interface {
  43. ClusterAvailable() (bool, error)
  44. GetClusterStatus() (map[string]*clientv3.StatusResponse, error)
  45. GetClusterVersions() (map[string]string, error)
  46. GetVersion() (string, error)
  47. WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
  48. Sync() error
  49. AddMember(name string, peerAddrs string) ([]Member, error)
  50. GetMemberID(peerURL string) (uint64, error)
  51. RemoveMember(id uint64) ([]Member, error)
  52. }
// Client provides connection parameters for an etcd cluster.
type Client struct {
	// Endpoints is the list of etcd client URLs handed to clientv3 on every
	// call. Sync replaces it with the endpoints reported by etcd and AddMember
	// appends the client URL of the newly added member.
	Endpoints []string
	// TLS is the TLS configuration handed to clientv3. New leaves it nil when
	// no CA, certificate or key path is supplied.
	TLS *tls.Config
}
  58. // New creates a new EtcdCluster client
  59. func New(endpoints []string, ca, cert, key string) (*Client, error) {
  60. client := Client{Endpoints: endpoints}
  61. if ca != "" || cert != "" || key != "" {
  62. tlsInfo := transport.TLSInfo{
  63. CertFile: cert,
  64. KeyFile: key,
  65. TrustedCAFile: ca,
  66. }
  67. tlsConfig, err := tlsInfo.ClientConfig()
  68. if err != nil {
  69. return nil, err
  70. }
  71. client.TLS = tlsConfig
  72. }
  73. return &client, nil
  74. }
  75. // NewFromCluster creates an etcd client for the etcd endpoints defined in the ClusterStatus value stored in
  76. // the kubeadm-config ConfigMap in kube-system namespace.
  77. // Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check).
  78. func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
  79. // etcd is listening the API server advertise address on each control-plane node
  80. // so it is necessary to get the list of endpoints from kubeadm cluster status before connecting
  81. // Gets the cluster status
  82. clusterStatus, err := config.GetClusterStatus(client)
  83. if err != nil {
  84. return nil, err
  85. }
  86. // Get the list of etcd endpoints from cluster status
  87. endpoints := []string{}
  88. for _, e := range clusterStatus.APIEndpoints {
  89. endpoints = append(endpoints, GetClientURLByIP(e.AdvertiseAddress))
  90. }
  91. klog.V(1).Infof("etcd endpoints read from pods: %s", strings.Join(endpoints, ","))
  92. // Creates an etcd client
  93. etcdClient, err := New(
  94. endpoints,
  95. filepath.Join(certificatesDir, constants.EtcdCACertName),
  96. filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
  97. filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
  98. )
  99. if err != nil {
  100. return nil, errors.Wrapf(err, "error creating etcd client for %v endpoints", endpoints)
  101. }
  102. // synchronizes client's endpoints with the known endpoints from the etcd membership.
  103. err = etcdClient.Sync()
  104. if err != nil {
  105. return nil, errors.Wrap(err, "error syncing endpoints with etc")
  106. }
  107. klog.V(1).Infof("update etcd endpoints: %s", strings.Join(etcdClient.Endpoints, ","))
  108. return etcdClient, nil
  109. }
  110. // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
  111. func (c *Client) Sync() error {
  112. cli, err := clientv3.New(clientv3.Config{
  113. Endpoints: c.Endpoints,
  114. DialTimeout: 20 * time.Second,
  115. TLS: c.TLS,
  116. })
  117. if err != nil {
  118. return err
  119. }
  120. defer cli.Close()
  121. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  122. err = cli.Sync(ctx)
  123. cancel()
  124. if err != nil {
  125. return err
  126. }
  127. klog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ","))
  128. c.Endpoints = cli.Endpoints()
  129. return nil
  130. }
// Member struct defines an etcd member; it is used for avoiding to spread github.com/coreos/etcd dependency
// across kubeadm codebase
type Member struct {
	// Name is the member name. AddMember fills in the caller-supplied name
	// for the member that etcd reports without one.
	Name string
	// PeerURL is the first peer URL reported by etcd for the member.
	PeerURL string
}
  137. // GetMemberID returns the member ID of the given peer URL
  138. func (c *Client) GetMemberID(peerURL string) (uint64, error) {
  139. cli, err := clientv3.New(clientv3.Config{
  140. Endpoints: c.Endpoints,
  141. DialTimeout: 30 * time.Second,
  142. TLS: c.TLS,
  143. })
  144. if err != nil {
  145. return 0, err
  146. }
  147. defer cli.Close()
  148. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  149. resp, err := cli.MemberList(ctx)
  150. cancel()
  151. if err != nil {
  152. return 0, err
  153. }
  154. for _, member := range resp.Members {
  155. if member.GetPeerURLs()[0] == peerURL {
  156. return member.GetID(), nil
  157. }
  158. }
  159. return 0, nil
  160. }
  161. // RemoveMember notifies an etcd cluster to remove an existing member
  162. func (c *Client) RemoveMember(id uint64) ([]Member, error) {
  163. cli, err := clientv3.New(clientv3.Config{
  164. Endpoints: c.Endpoints,
  165. DialTimeout: 30 * time.Second,
  166. TLS: c.TLS,
  167. })
  168. if err != nil {
  169. return nil, err
  170. }
  171. defer cli.Close()
  172. // Remove an existing member from the cluster
  173. var lastError error
  174. var resp *clientv3.MemberRemoveResponse
  175. err = wait.ExponentialBackoff(addRemoveBackoff, func() (bool, error) {
  176. resp, err = cli.MemberRemove(context.Background(), id)
  177. if err == nil {
  178. return true, nil
  179. }
  180. lastError = err
  181. return false, nil
  182. })
  183. if err != nil {
  184. return nil, lastError
  185. }
  186. // Returns the updated list of etcd members
  187. ret := []Member{}
  188. for _, m := range resp.Members {
  189. ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
  190. }
  191. return ret, nil
  192. }
  193. // AddMember notifies an existing etcd cluster that a new member is joining
  194. func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
  195. // Parse the peer address, required to add the client URL later to the list
  196. // of endpoints for this client. Parsing as a first operation to make sure that
  197. // if this fails no member addition is performed on the etcd cluster.
  198. parsedPeerAddrs, err := url.Parse(peerAddrs)
  199. if err != nil {
  200. return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
  201. }
  202. cli, err := clientv3.New(clientv3.Config{
  203. Endpoints: c.Endpoints,
  204. DialTimeout: 20 * time.Second,
  205. TLS: c.TLS,
  206. })
  207. if err != nil {
  208. return nil, err
  209. }
  210. defer cli.Close()
  211. // Adds a new member to the cluster
  212. var lastError error
  213. var resp *clientv3.MemberAddResponse
  214. err = wait.ExponentialBackoff(addRemoveBackoff, func() (bool, error) {
  215. resp, err = cli.MemberAdd(context.Background(), []string{peerAddrs})
  216. if err == nil {
  217. return true, nil
  218. }
  219. lastError = err
  220. return false, nil
  221. })
  222. if err != nil {
  223. return nil, lastError
  224. }
  225. // Returns the updated list of etcd members
  226. ret := []Member{}
  227. for _, m := range resp.Members {
  228. // fixes the entry for the joining member (that doesn't have a name set in the initialCluster returned by etcd)
  229. if m.Name == "" {
  230. ret = append(ret, Member{Name: name, PeerURL: m.PeerURLs[0]})
  231. } else {
  232. ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
  233. }
  234. }
  235. // Add the new member client address to the list of endpoints
  236. c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
  237. return ret, nil
  238. }
  239. // GetVersion returns the etcd version of the cluster.
  240. // An error is returned if the version of all endpoints do not match
  241. func (c *Client) GetVersion() (string, error) {
  242. var clusterVersion string
  243. versions, err := c.GetClusterVersions()
  244. if err != nil {
  245. return "", err
  246. }
  247. for _, v := range versions {
  248. if clusterVersion != "" && clusterVersion != v {
  249. return "", errors.Errorf("etcd cluster contains endpoints with mismatched versions: %v", versions)
  250. }
  251. clusterVersion = v
  252. }
  253. if clusterVersion == "" {
  254. return "", errors.New("could not determine cluster etcd version")
  255. }
  256. return clusterVersion, nil
  257. }
  258. // GetClusterVersions returns a map of the endpoints and their associated versions
  259. func (c *Client) GetClusterVersions() (map[string]string, error) {
  260. versions := make(map[string]string)
  261. statuses, err := c.GetClusterStatus()
  262. if err != nil {
  263. return versions, err
  264. }
  265. for ep, status := range statuses {
  266. versions[ep] = status.Version
  267. }
  268. return versions, nil
  269. }
  270. // ClusterAvailable returns true if the cluster status indicates the cluster is available.
  271. func (c *Client) ClusterAvailable() (bool, error) {
  272. _, err := c.GetClusterStatus()
  273. if err != nil {
  274. return false, err
  275. }
  276. return true, nil
  277. }
  278. // GetClusterStatus returns nil for status Up or error for status Down
  279. func (c *Client) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
  280. cli, err := clientv3.New(clientv3.Config{
  281. Endpoints: c.Endpoints,
  282. DialTimeout: 5 * time.Second,
  283. TLS: c.TLS,
  284. })
  285. if err != nil {
  286. return nil, err
  287. }
  288. defer cli.Close()
  289. clusterStatus := make(map[string]*clientv3.StatusResponse)
  290. for _, ep := range c.Endpoints {
  291. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  292. resp, err := cli.Status(ctx, ep)
  293. cancel()
  294. if err != nil {
  295. return nil, err
  296. }
  297. clusterStatus[ep] = resp
  298. }
  299. return clusterStatus, nil
  300. }
  301. // WaitForClusterAvailable returns true if all endpoints in the cluster are available after retry attempts, an error is returned otherwise
  302. func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
  303. for i := 0; i < retries; i++ {
  304. if i > 0 {
  305. klog.V(1).Infof("[etcd] Waiting %v until next retry\n", retryInterval)
  306. time.Sleep(retryInterval)
  307. }
  308. klog.V(2).Infof("[etcd] attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
  309. resp, err := c.ClusterAvailable()
  310. if err != nil {
  311. switch err {
  312. case context.DeadlineExceeded:
  313. klog.V(1).Infof("[etcd] Attempt timed out")
  314. default:
  315. klog.V(1).Infof("[etcd] Attempt failed with error: %v\n", err)
  316. }
  317. continue
  318. }
  319. return resp, nil
  320. }
  321. return false, errors.New("timeout waiting for etcd cluster to be available")
  322. }
  323. // CheckConfigurationIsHA returns true if the given InitConfiguration etcd block appears to be an HA configuration.
  324. func CheckConfigurationIsHA(cfg *kubeadmapi.Etcd) bool {
  325. return cfg.External != nil && len(cfg.External.Endpoints) > 1
  326. }
  327. // GetClientURL creates an HTTPS URL that uses the configured advertise
  328. // address and client port for the API controller
  329. func GetClientURL(localEndpoint *kubeadmapi.APIEndpoint) string {
  330. return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenClientPort))
  331. }
  332. // GetPeerURL creates an HTTPS URL that uses the configured advertise
  333. // address and peer port for the API controller
  334. func GetPeerURL(localEndpoint *kubeadmapi.APIEndpoint) string {
  335. return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenPeerPort))
  336. }
  337. // GetClientURLByIP creates an HTTPS URL based on an IP address
  338. // and the client listening port.
  339. func GetClientURLByIP(ip string) string {
  340. return "https://" + net.JoinHostPort(ip, strconv.Itoa(constants.EtcdListenClientPort))
  341. }