123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- package flocker
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "time"
- )
- // From https://github.com/ClusterHQ/flocker-docker-plugin/blob/master/flockerdockerplugin/adapter.py#L18
- const defaultVolumeSize = json.Number("107374182400")
- var (
- // A volume can take a long time to be available, if we don't want
- // Kubernetes to wait forever we need to stop trying after some time, that
- // time is defined here
- timeoutWaitingForVolume = 2 * time.Minute
- tickerWaitingForVolume = 5 * time.Second
- errStateNotFound = errors.New("State not found by Dataset ID")
- errConfigurationNotFound = errors.New("Configuration not found by Name")
- errFlockerControlServiceHost = errors.New("The volume config must have a key CONTROL_SERVICE_HOST defined in the OtherAttributes field")
- errFlockerControlServicePort = errors.New("The volume config must have a key CONTROL_SERVICE_PORT defined in the OtherAttributes field")
- errVolumeAlreadyExists = errors.New("The volume already exists")
- errVolumeDoesNotExist = errors.New("The volume does not exist")
- errUpdatingDataset = errors.New("It was impossible to update the dataset")
- )
- // Clientable exposes the needed methods to implement your own Flocker Client.
- type Clientable interface {
- CreateDataset(options *CreateDatasetOptions) (*DatasetState, error)
- DeleteDataset(datasetID string) error
- GetDatasetState(datasetID string) (*DatasetState, error)
- GetDatasetID(metaName string) (datasetID string, err error)
- GetPrimaryUUID() (primaryUUID string, err error)
- ListNodes() (nodes []NodeState, err error)
- UpdatePrimaryForDataset(primaryUUID, datasetID string) (*DatasetState, error)
- }
- // Client is a default Flocker Client.
- type Client struct {
- *http.Client
- schema string
- host string
- port int
- version string
- clientIP string
- maximumSize json.Number
- }
- var _ Clientable = &Client{}
- // NewClient creates a wrapper over http.Client to communicate with the flocker control service.
- func NewClient(host string, port int, clientIP string, caCertPath, keyPath, certPath string) (*Client, error) {
- client, err := newTLSClient(caCertPath, keyPath, certPath)
- if err != nil {
- return nil, err
- }
- return &Client{
- Client: client,
- schema: "https",
- host: host,
- port: port,
- version: "v1",
- maximumSize: defaultVolumeSize,
- clientIP: clientIP,
- }, nil
- }
- /*
- request do a request using the http.Client embedded to the control service
- and returns the response or an error in case it happens.
- Note: you will need to deal with the response body call to Close if you
- don't want to deal with problems later.
- */
- func (c Client) request(method, url string, payload interface{}) (*http.Response, error) {
- var (
- b []byte
- err error
- )
- if method == "POST" { // Just allow payload on POST
- b, err = json.Marshal(payload)
- if err != nil {
- return nil, err
- }
- }
- req, err := http.NewRequest(method, url, bytes.NewBuffer(b))
- if err != nil {
- return nil, err
- }
- req.Header.Set("Content-Type", "application/json")
- // REMEMBER TO CLOSE THE BODY IN THE OUTSIDE FUNCTION
- return c.Do(req)
- }
- // post performs a post request with the indicated payload
- func (c Client) post(url string, payload interface{}) (*http.Response, error) {
- return c.request("POST", url, payload)
- }
- // delete performs a delete request with the indicated payload
- func (c Client) delete(url string, payload interface{}) (*http.Response, error) {
- return c.request("DELETE", url, payload)
- }
- // get performs a get request
- func (c Client) get(url string) (*http.Response, error) {
- return c.request("GET", url, nil)
- }
- // getURL returns a full URI to the control service
- func (c Client) getURL(path string) string {
- return fmt.Sprintf("%s://%s:%d/%s/%s", c.schema, c.host, c.port, c.version, path)
- }
- type configurationPayload struct {
- Deleted bool `json:"deleted"`
- Primary string `json:"primary"`
- DatasetID string `json:"dataset_id,omitempty"`
- MaximumSize json.Number `json:"maximum_size,omitempty"`
- Metadata metadataPayload `json:"metadata,omitempty"`
- }
- type CreateDatasetOptions struct {
- Primary string `json:"primary"`
- DatasetID string `json:"dataset_id,omitempty"`
- MaximumSize int64 `json:"maximum_size,omitempty"`
- Metadata map[string]string `json:"metadata,omitempty"`
- }
- type metadataPayload struct {
- Name string `json:"name,omitempty"`
- }
- type DatasetState struct {
- Path string `json:"path"`
- DatasetID string `json:"dataset_id"`
- Primary string `json:"primary,omitempty"`
- MaximumSize json.Number `json:"maximum_size,omitempty"`
- }
- type datasetStatePayload struct {
- *DatasetState
- }
- type NodeState struct {
- UUID string `json:"uuid"`
- Host string `json:"host"`
- }
- // findIDInConfigurationsPayload returns the datasetID if it was found in the
- // configurations payload, otherwise it will return an error.
- func (c Client) findIDInConfigurationsPayload(body io.ReadCloser, name string) (datasetID string, err error) {
- var configurations []configurationPayload
- if err = json.NewDecoder(body).Decode(&configurations); err == nil {
- for _, r := range configurations {
- if r.Metadata.Name == name {
- return r.DatasetID, nil
- }
- }
- return "", errConfigurationNotFound
- }
- return "", err
- }
- // ListNodes returns a list of dataset agent nodes from Flocker Control Service
- func (c *Client) ListNodes() (nodes []NodeState, err error) {
- resp, err := c.get(c.getURL("state/nodes"))
- if err != nil {
- return []NodeState{}, err
- }
- defer resp.Body.Close()
- if resp.StatusCode >= 300 {
- return []NodeState{}, fmt.Errorf("Expected: {1,2}xx listing nodes, got: %d", resp.StatusCode)
- }
- err = json.NewDecoder(resp.Body).Decode(&nodes)
- if err != nil {
- return []NodeState{}, err
- }
- return nodes, err
- }
- // GetPrimaryUUID returns the UUID of the primary Flocker Control Service for
- // the given host.
- func (c Client) GetPrimaryUUID() (uuid string, err error) {
- states, err := c.ListNodes()
- if err != nil {
- return "", err
- }
- for _, s := range states {
- if s.Host == c.clientIP {
- return s.UUID, nil
- }
- }
- return "", fmt.Errorf("No node found with IP '%s', available nodes %+v", c.clientIP, states)
- }
- // DeleteDataset performs a delete request to the given datasetID
- func (c *Client) DeleteDataset(datasetID string) error {
- url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
- resp, err := c.delete(url, nil)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- if resp.StatusCode >= 300 {
- return fmt.Errorf("Expected: {1,2}xx deleting the dataset %s, got: %d", datasetID, resp.StatusCode)
- }
- return nil
- }
- // GetDatasetState performs a get request to get the state of the given datasetID, if
- // something goes wrong or the datasetID was not found it returns an error.
- func (c Client) GetDatasetState(datasetID string) (*DatasetState, error) {
- resp, err := c.get(c.getURL("state/datasets"))
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- var states []datasetStatePayload
- if err = json.NewDecoder(resp.Body).Decode(&states); err == nil {
- for _, s := range states {
- if s.DatasetID == datasetID {
- return s.DatasetState, nil
- }
- }
- return nil, errStateNotFound
- }
- return nil, err
- }
- /*
- CreateDataset creates a volume in Flocker, waits for it to be ready and
- returns the dataset id.
- This process is a little bit complex but follows this flow:
- 1. Find the Flocker Control Service UUID
- 2. If it already exists an error is returned
- 3. If it didn't previously exist, wait for it to be ready
- */
- func (c *Client) CreateDataset(options *CreateDatasetOptions) (datasetState *DatasetState, err error) {
- // 1) Find the primary Flocker UUID
- // Note: it could be cached, but doing this query we health check it
- if options.Primary == "" {
- options.Primary, err = c.GetPrimaryUUID()
- if err != nil {
- return nil, err
- }
- }
- if options.MaximumSize == 0 {
- options.MaximumSize, _ = c.maximumSize.Int64()
- }
- resp, err := c.post(c.getURL("configuration/datasets"), options)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- // 2) Return if the dataset was previously created
- if resp.StatusCode == http.StatusConflict {
- return nil, errVolumeAlreadyExists
- }
- if resp.StatusCode >= 300 {
- return nil, fmt.Errorf("Expected: {1,2}xx creating the volume, got: %d", resp.StatusCode)
- }
- var p configurationPayload
- if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
- return nil, err
- }
- // 3) Wait until the dataset is ready for usage. In case it never gets
- // ready there is a timeoutChan that will return an error
- timeoutChan := time.NewTimer(timeoutWaitingForVolume).C
- tickChan := time.NewTicker(tickerWaitingForVolume).C
- for {
- var strErrDel string
- s, err := c.GetDatasetState(p.DatasetID)
- if err == nil {
- return s, nil
- } else if err != errStateNotFound {
- errDel := c.DeleteDataset(p.DatasetID)
- if errDel != nil {
- strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
- }
- return nil, fmt.Errorf("Flocker API error during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
- }
- select {
- case <-timeoutChan:
- errDel := c.DeleteDataset(p.DatasetID)
- if errDel != nil {
- strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
- }
- return nil, fmt.Errorf("Flocker API timeout during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
- case <-tickChan:
- break
- }
- }
- }
- // UpdatePrimaryForDataset will update the Primary for the given dataset
- // returning the current DatasetState.
- func (c Client) UpdatePrimaryForDataset(newPrimaryUUID, datasetID string) (*DatasetState, error) {
- payload := struct {
- Primary string `json:"primary"`
- }{
- Primary: newPrimaryUUID,
- }
- url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
- resp, err := c.post(url, payload)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- if resp.StatusCode >= 300 {
- return nil, errUpdatingDataset
- }
- var s DatasetState
- if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
- return nil, err
- }
- return &s, nil
- }
- // GetDatasetID will return the DatasetID found for the given metadata name.
- func (c Client) GetDatasetID(metaName string) (datasetID string, err error) {
- resp, err := c.get(c.getURL("configuration/datasets"))
- if err != nil {
- return "", err
- }
- defer resp.Body.Close()
- var configurations []configurationPayload
- if err = json.NewDecoder(resp.Body).Decode(&configurations); err == nil {
- for _, c := range configurations {
- if c.Metadata.Name == metaName && c.Deleted == false {
- return c.DatasetID, nil
- }
- }
- return "", errConfigurationNotFound
- }
- return "", err
- }
|