client.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. package flocker
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "time"
  10. )
  11. // From https://github.com/ClusterHQ/flocker-docker-plugin/blob/master/flockerdockerplugin/adapter.py#L18
  12. const defaultVolumeSize = json.Number("107374182400")
  13. var (
  14. // A volume can take a long time to be available, if we don't want
  15. // Kubernetes to wait forever we need to stop trying after some time, that
  16. // time is defined here
  17. timeoutWaitingForVolume = 2 * time.Minute
  18. tickerWaitingForVolume = 5 * time.Second
  19. errStateNotFound = errors.New("State not found by Dataset ID")
  20. errConfigurationNotFound = errors.New("Configuration not found by Name")
  21. errFlockerControlServiceHost = errors.New("The volume config must have a key CONTROL_SERVICE_HOST defined in the OtherAttributes field")
  22. errFlockerControlServicePort = errors.New("The volume config must have a key CONTROL_SERVICE_PORT defined in the OtherAttributes field")
  23. errVolumeAlreadyExists = errors.New("The volume already exists")
  24. errVolumeDoesNotExist = errors.New("The volume does not exist")
  25. errUpdatingDataset = errors.New("It was impossible to update the dataset")
  26. )
  27. // Clientable exposes the needed methods to implement your own Flocker Client.
  28. type Clientable interface {
  29. CreateDataset(options *CreateDatasetOptions) (*DatasetState, error)
  30. DeleteDataset(datasetID string) error
  31. GetDatasetState(datasetID string) (*DatasetState, error)
  32. GetDatasetID(metaName string) (datasetID string, err error)
  33. GetPrimaryUUID() (primaryUUID string, err error)
  34. ListNodes() (nodes []NodeState, err error)
  35. UpdatePrimaryForDataset(primaryUUID, datasetID string) (*DatasetState, error)
  36. }
  37. // Client is a default Flocker Client.
  38. type Client struct {
  39. *http.Client
  40. schema string
  41. host string
  42. port int
  43. version string
  44. clientIP string
  45. maximumSize json.Number
  46. }
  47. var _ Clientable = &Client{}
  48. // NewClient creates a wrapper over http.Client to communicate with the flocker control service.
  49. func NewClient(host string, port int, clientIP string, caCertPath, keyPath, certPath string) (*Client, error) {
  50. client, err := newTLSClient(caCertPath, keyPath, certPath)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return &Client{
  55. Client: client,
  56. schema: "https",
  57. host: host,
  58. port: port,
  59. version: "v1",
  60. maximumSize: defaultVolumeSize,
  61. clientIP: clientIP,
  62. }, nil
  63. }
  64. /*
  65. request do a request using the http.Client embedded to the control service
  66. and returns the response or an error in case it happens.
  67. Note: you will need to deal with the response body call to Close if you
  68. don't want to deal with problems later.
  69. */
  70. func (c Client) request(method, url string, payload interface{}) (*http.Response, error) {
  71. var (
  72. b []byte
  73. err error
  74. )
  75. if method == "POST" { // Just allow payload on POST
  76. b, err = json.Marshal(payload)
  77. if err != nil {
  78. return nil, err
  79. }
  80. }
  81. req, err := http.NewRequest(method, url, bytes.NewBuffer(b))
  82. if err != nil {
  83. return nil, err
  84. }
  85. req.Header.Set("Content-Type", "application/json")
  86. // REMEMBER TO CLOSE THE BODY IN THE OUTSIDE FUNCTION
  87. return c.Do(req)
  88. }
  89. // post performs a post request with the indicated payload
  90. func (c Client) post(url string, payload interface{}) (*http.Response, error) {
  91. return c.request("POST", url, payload)
  92. }
  93. // delete performs a delete request with the indicated payload
  94. func (c Client) delete(url string, payload interface{}) (*http.Response, error) {
  95. return c.request("DELETE", url, payload)
  96. }
  97. // get performs a get request
  98. func (c Client) get(url string) (*http.Response, error) {
  99. return c.request("GET", url, nil)
  100. }
  101. // getURL returns a full URI to the control service
  102. func (c Client) getURL(path string) string {
  103. return fmt.Sprintf("%s://%s:%d/%s/%s", c.schema, c.host, c.port, c.version, path)
  104. }
  105. type configurationPayload struct {
  106. Deleted bool `json:"deleted"`
  107. Primary string `json:"primary"`
  108. DatasetID string `json:"dataset_id,omitempty"`
  109. MaximumSize json.Number `json:"maximum_size,omitempty"`
  110. Metadata metadataPayload `json:"metadata,omitempty"`
  111. }
  112. type CreateDatasetOptions struct {
  113. Primary string `json:"primary"`
  114. DatasetID string `json:"dataset_id,omitempty"`
  115. MaximumSize int64 `json:"maximum_size,omitempty"`
  116. Metadata map[string]string `json:"metadata,omitempty"`
  117. }
  118. type metadataPayload struct {
  119. Name string `json:"name,omitempty"`
  120. }
  121. type DatasetState struct {
  122. Path string `json:"path"`
  123. DatasetID string `json:"dataset_id"`
  124. Primary string `json:"primary,omitempty"`
  125. MaximumSize json.Number `json:"maximum_size,omitempty"`
  126. }
  127. type datasetStatePayload struct {
  128. *DatasetState
  129. }
  130. type NodeState struct {
  131. UUID string `json:"uuid"`
  132. Host string `json:"host"`
  133. }
  134. // findIDInConfigurationsPayload returns the datasetID if it was found in the
  135. // configurations payload, otherwise it will return an error.
  136. func (c Client) findIDInConfigurationsPayload(body io.ReadCloser, name string) (datasetID string, err error) {
  137. var configurations []configurationPayload
  138. if err = json.NewDecoder(body).Decode(&configurations); err == nil {
  139. for _, r := range configurations {
  140. if r.Metadata.Name == name {
  141. return r.DatasetID, nil
  142. }
  143. }
  144. return "", errConfigurationNotFound
  145. }
  146. return "", err
  147. }
  148. // ListNodes returns a list of dataset agent nodes from Flocker Control Service
  149. func (c *Client) ListNodes() (nodes []NodeState, err error) {
  150. resp, err := c.get(c.getURL("state/nodes"))
  151. if err != nil {
  152. return []NodeState{}, err
  153. }
  154. defer resp.Body.Close()
  155. if resp.StatusCode >= 300 {
  156. return []NodeState{}, fmt.Errorf("Expected: {1,2}xx listing nodes, got: %d", resp.StatusCode)
  157. }
  158. err = json.NewDecoder(resp.Body).Decode(&nodes)
  159. if err != nil {
  160. return []NodeState{}, err
  161. }
  162. return nodes, err
  163. }
  164. // GetPrimaryUUID returns the UUID of the primary Flocker Control Service for
  165. // the given host.
  166. func (c Client) GetPrimaryUUID() (uuid string, err error) {
  167. states, err := c.ListNodes()
  168. if err != nil {
  169. return "", err
  170. }
  171. for _, s := range states {
  172. if s.Host == c.clientIP {
  173. return s.UUID, nil
  174. }
  175. }
  176. return "", fmt.Errorf("No node found with IP '%s', available nodes %+v", c.clientIP, states)
  177. }
  178. // DeleteDataset performs a delete request to the given datasetID
  179. func (c *Client) DeleteDataset(datasetID string) error {
  180. url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
  181. resp, err := c.delete(url, nil)
  182. if err != nil {
  183. return err
  184. }
  185. defer resp.Body.Close()
  186. if resp.StatusCode >= 300 {
  187. return fmt.Errorf("Expected: {1,2}xx deleting the dataset %s, got: %d", datasetID, resp.StatusCode)
  188. }
  189. return nil
  190. }
  191. // GetDatasetState performs a get request to get the state of the given datasetID, if
  192. // something goes wrong or the datasetID was not found it returns an error.
  193. func (c Client) GetDatasetState(datasetID string) (*DatasetState, error) {
  194. resp, err := c.get(c.getURL("state/datasets"))
  195. if err != nil {
  196. return nil, err
  197. }
  198. defer resp.Body.Close()
  199. var states []datasetStatePayload
  200. if err = json.NewDecoder(resp.Body).Decode(&states); err == nil {
  201. for _, s := range states {
  202. if s.DatasetID == datasetID {
  203. return s.DatasetState, nil
  204. }
  205. }
  206. return nil, errStateNotFound
  207. }
  208. return nil, err
  209. }
  210. /*
  211. CreateDataset creates a volume in Flocker, waits for it to be ready and
  212. returns the dataset id.
  213. This process is a little bit complex but follows this flow:
  214. 1. Find the Flocker Control Service UUID
  215. 2. If it already exists an error is returned
  216. 3. If it didn't previously exist, wait for it to be ready
  217. */
  218. func (c *Client) CreateDataset(options *CreateDatasetOptions) (datasetState *DatasetState, err error) {
  219. // 1) Find the primary Flocker UUID
  220. // Note: it could be cached, but doing this query we health check it
  221. if options.Primary == "" {
  222. options.Primary, err = c.GetPrimaryUUID()
  223. if err != nil {
  224. return nil, err
  225. }
  226. }
  227. if options.MaximumSize == 0 {
  228. options.MaximumSize, _ = c.maximumSize.Int64()
  229. }
  230. resp, err := c.post(c.getURL("configuration/datasets"), options)
  231. if err != nil {
  232. return nil, err
  233. }
  234. defer resp.Body.Close()
  235. // 2) Return if the dataset was previously created
  236. if resp.StatusCode == http.StatusConflict {
  237. return nil, errVolumeAlreadyExists
  238. }
  239. if resp.StatusCode >= 300 {
  240. return nil, fmt.Errorf("Expected: {1,2}xx creating the volume, got: %d", resp.StatusCode)
  241. }
  242. var p configurationPayload
  243. if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
  244. return nil, err
  245. }
  246. // 3) Wait until the dataset is ready for usage. In case it never gets
  247. // ready there is a timeoutChan that will return an error
  248. timeoutChan := time.NewTimer(timeoutWaitingForVolume).C
  249. tickChan := time.NewTicker(tickerWaitingForVolume).C
  250. for {
  251. var strErrDel string
  252. s, err := c.GetDatasetState(p.DatasetID)
  253. if err == nil {
  254. return s, nil
  255. } else if err != errStateNotFound {
  256. errDel := c.DeleteDataset(p.DatasetID)
  257. if errDel != nil {
  258. strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
  259. }
  260. return nil, fmt.Errorf("Flocker API error during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
  261. }
  262. select {
  263. case <-timeoutChan:
  264. errDel := c.DeleteDataset(p.DatasetID)
  265. if errDel != nil {
  266. strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
  267. }
  268. return nil, fmt.Errorf("Flocker API timeout during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
  269. case <-tickChan:
  270. break
  271. }
  272. }
  273. }
  274. // UpdatePrimaryForDataset will update the Primary for the given dataset
  275. // returning the current DatasetState.
  276. func (c Client) UpdatePrimaryForDataset(newPrimaryUUID, datasetID string) (*DatasetState, error) {
  277. payload := struct {
  278. Primary string `json:"primary"`
  279. }{
  280. Primary: newPrimaryUUID,
  281. }
  282. url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
  283. resp, err := c.post(url, payload)
  284. if err != nil {
  285. return nil, err
  286. }
  287. defer resp.Body.Close()
  288. if resp.StatusCode >= 300 {
  289. return nil, errUpdatingDataset
  290. }
  291. var s DatasetState
  292. if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
  293. return nil, err
  294. }
  295. return &s, nil
  296. }
  297. // GetDatasetID will return the DatasetID found for the given metadata name.
  298. func (c Client) GetDatasetID(metaName string) (datasetID string, err error) {
  299. resp, err := c.get(c.getURL("configuration/datasets"))
  300. if err != nil {
  301. return "", err
  302. }
  303. defer resp.Body.Close()
  304. var configurations []configurationPayload
  305. if err = json.NewDecoder(resp.Body).Decode(&configurations); err == nil {
  306. for _, c := range configurations {
  307. if c.Metadata.Name == metaName && c.Deleted == false {
  308. return c.DatasetID, nil
  309. }
  310. }
  311. return "", errConfigurationNotFound
  312. }
  313. return "", err
  314. }