client.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. package storageos
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "reflect"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "time"
  19. "github.com/storageos/go-api/netutil"
  20. "github.com/storageos/go-api/serror"
  21. )
  // Client-side defaults for the API version and the User-Agent header.
  const (
  	// DefaultVersion is the default API version, as a number.
  	DefaultVersion = 1

  	// DefaultVersionStr is the default API version, as a string.
  	DefaultVersionStr = "1"

  	// DefaultUserAgent is the User-Agent header value sent by clients
  	// created with NewClient.
  	DefaultUserAgent = "go-storageosclient"
  )
  // Sentinel errors exposed by this package.
  var (
  	// ErrConnectionRefused is returned when the client cannot connect to
  	// the given endpoint.
  	ErrConnectionRefused = errors.New("cannot connect to StorageOS API endpoint")

  	// ErrInactivityTimeout is returned when a streamable call has been
  	// inactive for some time.
  	ErrInactivityTimeout = errors.New("inactivity time exceeded timeout")

  	// ErrInvalidVersion is returned when a versioned client was requested
  	// but no version was specified.
  	ErrInvalidVersion = errors.New("invalid version")

  	// ErrProxyNotSupported is returned when a client is unable to set a
  	// proxy for HTTP requests.
  	ErrProxyNotSupported = errors.New("client does not support http proxy")
  )

  // Default endpoint settings. These are variables, not constants, so they
  // remain assignable by code that already treats them as such.
  var (
  	// DefaultPort is the default API port.
  	DefaultPort = "5705"

  	// DataplaneHealthPort is the port used by the dataplane health-check
  	// service.
  	DataplaneHealthPort = "5704"

  	// DefaultHost is the default API host, built from DefaultPort.
  	DefaultHost = "http://localhost:" + DefaultPort
  )
  46. // APIVersion is an internal representation of a version of the Remote API.
  47. type APIVersion int
  48. // NewAPIVersion returns an instance of APIVersion for the given string.
  49. //
  50. // The given string must be in the form <major>
  51. func NewAPIVersion(input string) (APIVersion, error) {
  52. if input == "" {
  53. return DefaultVersion, ErrInvalidVersion
  54. }
  55. version, err := strconv.Atoi(input)
  56. if err != nil {
  57. return 0, fmt.Errorf("Unable to parse version %q", input)
  58. }
  59. return APIVersion(version), nil
  60. }
  61. func (version APIVersion) String() string {
  62. return fmt.Sprintf("v%d", version)
  63. }
  64. // Client is the basic type of this package. It provides methods for
  65. // interaction with the API.
  66. type Client struct {
  67. httpClient *http.Client
  68. addresses []string
  69. username string
  70. secret string
  71. userAgent string
  72. configLock *sync.RWMutex // Lock for config changes
  73. addressLock *sync.Mutex // Lock used to copy/update the address slice
  74. requestedAPIVersion APIVersion
  75. serverAPIVersion APIVersion
  76. expectedAPIVersion APIVersion
  77. SkipServerVersionCheck bool
  78. }
  79. // ClientVersion returns the API version of the client
  80. func (c *Client) ClientVersion() string {
  81. return DefaultVersionStr
  82. }
  // Dialer abstracts how network connections are established. *net.Dialer
  // satisfies it, as can a shim for named pipes (for example one wrapping
  // winio.DialPipe).
  type Dialer interface {
  	// Dial opens a connection to address on the named network.
  	Dial(network string, address string) (net.Conn, error)
  }
  89. // NewClient returns a Client instance ready for communication with the given
  90. // server endpoint. It will use the latest remote API version available in the
  91. // server.
  92. func NewClient(nodes string) (*Client, error) {
  93. client, err := NewVersionedClient(nodes, "")
  94. if err != nil {
  95. return nil, err
  96. }
  97. client.SkipServerVersionCheck = true
  98. client.userAgent = DefaultUserAgent
  99. return client, nil
  100. }
  101. // NewVersionedClient returns a Client instance ready for communication with
  102. // the given server endpoint, using a specific remote API version.
  103. func NewVersionedClient(nodestring string, apiVersionString string) (*Client, error) {
  104. nodes := strings.Split(nodestring, ",")
  105. addresses, err := netutil.AddressesFromNodes(nodes)
  106. if err != nil {
  107. return nil, err
  108. }
  109. if len(addresses) > 1 {
  110. // Shuffle returned addresses in attempt to spread the load
  111. rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
  112. rnd.Shuffle(len(addresses), func(i, j int) {
  113. addresses[i], addresses[j] = addresses[j], addresses[i]
  114. })
  115. }
  116. client := &Client{
  117. httpClient: defaultClient(),
  118. addresses: addresses,
  119. configLock: &sync.RWMutex{},
  120. addressLock: &sync.Mutex{},
  121. }
  122. if apiVersionString != "" {
  123. version, err := strconv.Atoi(apiVersionString)
  124. if err != nil {
  125. return nil, err
  126. }
  127. client.requestedAPIVersion = APIVersion(version)
  128. }
  129. return client, nil
  130. }
  131. // SetUserAgent sets the client useragent.
  132. func (c *Client) SetUserAgent(useragent string) {
  133. c.configLock.Lock()
  134. defer c.configLock.Unlock()
  135. c.userAgent = useragent
  136. }
  137. // SetAuth sets the API username and secret to be used for all API requests.
  138. // It should not be called concurrently with any other Client methods.
  139. func (c *Client) SetAuth(username string, secret string) {
  140. c.configLock.Lock()
  141. defer c.configLock.Unlock()
  142. if username != "" {
  143. c.username = username
  144. }
  145. if secret != "" {
  146. c.secret = secret
  147. }
  148. }
  149. // SetProxy will set the proxy URL for both the HTTPClient.
  150. // If the transport method does not support usage
  151. // of proxies, an error will be returned.
  152. func (c *Client) SetProxy(proxy *url.URL) error {
  153. c.configLock.Lock()
  154. defer c.configLock.Unlock()
  155. if client := c.httpClient; client != nil {
  156. transport, supported := client.Transport.(*http.Transport)
  157. if !supported {
  158. return ErrProxyNotSupported
  159. }
  160. transport.Proxy = http.ProxyURL(proxy)
  161. }
  162. return nil
  163. }
  164. // SetTimeout takes a timeout and applies it to both the HTTPClient and
  165. // nativeHTTPClient. It should not be called concurrently with any other Client
  166. // methods.
  167. func (c *Client) SetTimeout(t time.Duration) {
  168. c.configLock.Lock()
  169. defer c.configLock.Unlock()
  170. if c.httpClient != nil {
  171. c.httpClient.Timeout = t
  172. }
  173. }
  174. func (c *Client) checkAPIVersion() error {
  175. serverAPIVersionString, err := c.getServerAPIVersionString()
  176. if err != nil {
  177. return err
  178. }
  179. c.serverAPIVersion, err = NewAPIVersion(serverAPIVersionString)
  180. if err != nil {
  181. return err
  182. }
  183. c.configLock.Lock()
  184. defer c.configLock.Unlock()
  185. if c.requestedAPIVersion == 0 {
  186. c.expectedAPIVersion = c.serverAPIVersion
  187. } else {
  188. c.expectedAPIVersion = c.requestedAPIVersion
  189. }
  190. return nil
  191. }
  192. // Ping pings the API server
  193. //
  194. // See https://goo.gl/wYfgY1 for more details.
  195. func (c *Client) Ping() error {
  196. urlpath := "/_ping"
  197. resp, err := c.do("GET", urlpath, doOptions{})
  198. if err != nil {
  199. return err
  200. }
  201. if resp.StatusCode != http.StatusOK {
  202. return newError(resp)
  203. }
  204. return resp.Body.Close()
  205. }
  206. func (c *Client) getServerAPIVersionString() (version string, err error) {
  207. v, err := c.ServerVersion(context.Background())
  208. if err != nil {
  209. return "", err
  210. }
  211. return v.APIVersion, nil
  212. }
  // doOptions carries the per-request settings understood by Client.do.
  type doOptions struct {
  	// context governs every attempt made for the request; a nil context
  	// is replaced with context.Background().
  	context context.Context

  	// data, when non-nil, is JSON-encoded and sent as the request body.
  	data interface{}
  	// values are the query string parameters.
  	values url.Values
  	// headers are set on the request after the default headers.
  	headers map[string]string

  	// fieldSelector and labelSelector are not read by do itself.
  	// NOTE(review): presumably translated into query values by callers -
  	// confirm.
  	fieldSelector string
  	labelSelector string

  	// namespace, when non-empty, is used to prefix the request path.
  	namespace string

  	// forceJSON encodes data as the body even when data is nil.
  	forceJSON bool
  	// force adds "force=1" to the query string.
  	force bool
  	// unversioned skips the server version check and the version prefix
  	// in the request path.
  	unversioned bool
  }
  225. func (c *Client) do(method, urlpath string, doOptions doOptions) (*http.Response, error) {
  226. var params io.Reader
  227. if doOptions.data != nil || doOptions.forceJSON {
  228. buf, err := json.Marshal(doOptions.data)
  229. if err != nil {
  230. return nil, err
  231. }
  232. params = bytes.NewBuffer(buf)
  233. }
  234. // Prefix the path with the namespace if given. The caller should only set
  235. // the namespace if this is desired.
  236. if doOptions.namespace != "" {
  237. urlpath = "/" + NamespaceAPIPrefix + "/" + doOptions.namespace + "/" + urlpath
  238. }
  239. if !c.SkipServerVersionCheck && !doOptions.unversioned {
  240. err := c.checkAPIVersion()
  241. if err != nil {
  242. return nil, err
  243. }
  244. }
  245. query := url.Values{}
  246. if doOptions.values != nil {
  247. query = doOptions.values
  248. }
  249. if doOptions.force {
  250. query.Add("force", "1")
  251. }
  252. // Obtain a reader lock to prevent the http client from being
  253. // modified underneath us during a do().
  254. c.configLock.RLock()
  255. defer c.configLock.RUnlock() // This defer matches both the initial and the above lock
  256. httpClient := c.httpClient
  257. endpoint := c.getAPIPath(urlpath, query, doOptions.unversioned)
  258. // The doOptions Context is shared for every attempted request in the do.
  259. ctx := doOptions.context
  260. if ctx == nil {
  261. ctx = context.Background()
  262. }
  263. var failedAddresses = map[string]struct{}{}
  264. c.addressLock.Lock()
  265. var addresses = make([]string, len(c.addresses))
  266. copy(addresses, c.addresses)
  267. c.addressLock.Unlock()
  268. for _, address := range addresses {
  269. target := address + endpoint
  270. req, err := http.NewRequest(method, target, params)
  271. if err != nil {
  272. // Probably should not try and continue if we're unable
  273. // to create the request.
  274. return nil, err
  275. }
  276. req.Header.Set("User-Agent", c.userAgent)
  277. if doOptions.data != nil {
  278. req.Header.Set("Content-Type", "application/json")
  279. } else if method == "POST" {
  280. req.Header.Set("Content-Type", "plain/text")
  281. }
  282. if c.username != "" && c.secret != "" {
  283. req.SetBasicAuth(c.username, c.secret)
  284. }
  285. for k, v := range doOptions.headers {
  286. req.Header.Set(k, v)
  287. }
  288. resp, err := httpClient.Do(req.WithContext(ctx))
  289. if err != nil {
  290. // If it is a custom error, return it. It probably knows more than us
  291. if serror.IsStorageOSError(err) {
  292. switch serror.ErrorKind(err) {
  293. case serror.APIUncontactable:
  294. // If API isn't contactable we should try the next address
  295. failedAddresses[address] = struct{}{}
  296. continue
  297. case serror.InvalidHostConfig:
  298. // If invalid host or unknown error, we should report back
  299. fallthrough
  300. case serror.UnknownError:
  301. return nil, err
  302. }
  303. }
  304. select {
  305. case <-ctx.Done():
  306. return nil, ctx.Err()
  307. default:
  308. if _, ok := err.(net.Error); ok {
  309. // Be optimistic and try the next endpoint
  310. failedAddresses[address] = struct{}{}
  311. continue
  312. }
  313. return nil, err
  314. }
  315. }
  316. // If we get to the point of response, we should move any failed
  317. // addresses to the back.
  318. failed := len(failedAddresses)
  319. if failed > 0 {
  320. // Copy addresses we think are okay into the head of the list
  321. newOrder := make([]string, 0, len(addresses)-failed)
  322. for _, addr := range addresses {
  323. if _, exists := failedAddresses[addr]; !exists {
  324. newOrder = append(newOrder, addr)
  325. }
  326. }
  327. for addr := range failedAddresses {
  328. newOrder = append(newOrder, addr)
  329. }
  330. c.addressLock.Lock()
  331. // Bring in the new order
  332. c.addresses = newOrder
  333. c.addressLock.Unlock()
  334. }
  335. if resp.StatusCode < 200 || resp.StatusCode >= 400 {
  336. return nil, newError(resp) // These status codes are likely to be fatal
  337. }
  338. return resp, nil
  339. }
  340. return nil, netutil.ErrAllFailed(addresses)
  341. }
  342. func (c *Client) getAPIPath(path string, query url.Values, unversioned bool) string {
  343. var apiPath = strings.TrimLeft(path, "/")
  344. if !unversioned {
  345. apiPath = fmt.Sprintf("/%s/%s", c.requestedAPIVersion, apiPath)
  346. } else {
  347. apiPath = fmt.Sprintf("/%s", apiPath)
  348. }
  349. if len(query) > 0 {
  350. apiPath = apiPath + "?" + query.Encode()
  351. }
  352. return apiPath
  353. }
  354. func queryString(opts interface{}) string {
  355. if opts == nil {
  356. return ""
  357. }
  358. value := reflect.ValueOf(opts)
  359. if value.Kind() == reflect.Ptr {
  360. value = value.Elem()
  361. }
  362. if value.Kind() != reflect.Struct {
  363. return ""
  364. }
  365. items := url.Values(map[string][]string{})
  366. for i := 0; i < value.NumField(); i++ {
  367. field := value.Type().Field(i)
  368. if field.PkgPath != "" {
  369. continue
  370. }
  371. key := field.Tag.Get("qs")
  372. if key == "" {
  373. key = strings.ToLower(field.Name)
  374. } else if key == "-" {
  375. continue
  376. }
  377. addQueryStringValue(items, key, value.Field(i))
  378. }
  379. return items.Encode()
  380. }
  // addQueryStringValue adds the value v to items under key, skipping values
  // that carry no information.
  //
  // Encoding by kind:
  //   - bool: "1" when true, omitted when false.
  //   - signed integers and floats: decimal text when greater than zero.
  //   - string: the string itself when non-empty.
  //   - pointer (non-nil) and map (non-empty): the JSON encoding; values that
  //     fail to encode are omitted.
  //   - array and slice: each element is added under the same key.
  //
  // Any other kind is ignored.
  func addQueryStringValue(items url.Values, key string, v reflect.Value) {
  	switch v.Kind() {
  	case reflect.Bool:
  		if v.Bool() {
  			items.Add(key, "1")
  		}
  	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
  		if n := v.Int(); n > 0 {
  			items.Add(key, strconv.FormatInt(n, 10))
  		}
  	case reflect.Float32:
  		// Format with 32-bit precision; using 64 here would expose the
  		// widening error (0.1 would become 0.10000000149011612).
  		if f := v.Float(); f > 0 {
  			items.Add(key, strconv.FormatFloat(f, 'f', -1, 32))
  		}
  	case reflect.Float64:
  		if f := v.Float(); f > 0 {
  			items.Add(key, strconv.FormatFloat(f, 'f', -1, 64))
  		}
  	case reflect.String:
  		if s := v.String(); s != "" {
  			items.Add(key, s)
  		}
  	case reflect.Ptr:
  		if !v.IsNil() {
  			if b, err := json.Marshal(v.Interface()); err == nil {
  				items.Add(key, string(b))
  			}
  		}
  	case reflect.Map:
  		if v.Len() > 0 {
  			if b, err := json.Marshal(v.Interface()); err == nil {
  				items.Add(key, string(b))
  			}
  		}
  	case reflect.Array, reflect.Slice:
  		for i, n := 0, v.Len(); i < n; i++ {
  			addQueryStringValue(items, key, v.Index(i))
  		}
  	}
  }
  // Error represents a failure reported by the API.
  type Error struct {
  	// Status is the HTTP status code of the response.
  	Status int
  	// Message is the error text taken from the response body.
  	Message string
  }

  // newError builds an *Error from an HTTP response, consuming and closing
  // the response body.
  //
  // When the body is a JSON object with a non-empty "message" field, that
  // field becomes the error message. Otherwise the raw body is used, so a
  // JSON payload without a "message" field is still reported rather than
  // being replaced by an empty message. When the body cannot be read, the
  // message describes the read failure.
  func newError(resp *http.Response) *Error {
  	type jsonError struct {
  		Message string `json:"message"`
  	}

  	defer resp.Body.Close()

  	data, err := ioutil.ReadAll(resp.Body)
  	if err != nil {
  		return &Error{Status: resp.StatusCode, Message: fmt.Sprintf("cannot read body, err: %v", err)}
  	}

  	// Attempt to unmarshal the error if it is in JSON format.
  	var decoded jsonError
  	if err := json.Unmarshal(data, &decoded); err != nil || decoded.Message == "" {
  		// Not JSON, or JSON without a message: fall back to the raw body.
  		return &Error{Status: resp.StatusCode, Message: string(data)}
  	}
  	return &Error{Status: resp.StatusCode, Message: decoded.Message}
  }

  // Error implements the error interface. The text has the form
  // "API error (<explanation>): <message>", where the explanation is a
  // friendly hint for a few common status codes and the standard HTTP status
  // text for all others.
  func (e *Error) Error() string {
  	var explanation string
  	switch e.Status {
  	case 400, 500:
  		explanation = "Server failed to process your request. Was the data correct?"
  	case 401:
  		explanation = "Unauthenticated access of secure endpoint, please retry after authentication"
  	case 403:
  		explanation = "Forbidden request. Your user cannot perform this action"
  	case 404:
  		explanation = "Requested object not found. Does this item exist?"
  	default:
  		explanation = http.StatusText(e.Status)
  	}
  	return fmt.Sprintf("API error (%s): %s", explanation, e.Message)
  }
  459. // defaultPooledTransport returns a new http.Transport with similar default
  460. // values to http.DefaultTransport. Do not use this for transient transports as
  461. // it can leak file descriptors over time. Only use this for transports that
  462. // will be re-used for the same host(s).
  463. func defaultPooledTransport(dialer Dialer) *http.Transport {
  464. transport := &http.Transport{
  465. Proxy: http.ProxyFromEnvironment,
  466. Dial: dialer.Dial,
  467. TLSHandshakeTimeout: 5 * time.Second,
  468. DisableKeepAlives: false,
  469. MaxIdleConnsPerHost: 1,
  470. }
  471. return transport
  472. }
  473. // defaultClient returns a new http.Client with similar default values to
  474. // http.Client, but with a non-shared Transport, idle connections disabled, and
  475. // keepalives disabled.
  476. // If a custom dialer is not provided, one with sane defaults will be created.
  477. func defaultClient() *http.Client {
  478. dialer := &net.Dialer{
  479. Timeout: 5 * time.Second,
  480. KeepAlive: 5 * time.Second,
  481. }
  482. return &http.Client{
  483. Transport: defaultPooledTransport(dialer),
  484. }
  485. }