queue.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package storage
  2. // Copyright 2017 Microsoft Corporation
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. import (
  16. "encoding/xml"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "net/url"
  21. "strconv"
  22. "time"
  23. )
  24. const (
  25. // casing is per Golang's http.Header canonicalizing the header names.
  26. approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"
  27. )
  28. // QueueAccessPolicy represents each access policy in the queue ACL.
  29. type QueueAccessPolicy struct {
  30. ID string
  31. StartTime time.Time
  32. ExpiryTime time.Time
  33. CanRead bool
  34. CanAdd bool
  35. CanUpdate bool
  36. CanProcess bool
  37. }
  38. // QueuePermissions represents the queue ACLs.
  39. type QueuePermissions struct {
  40. AccessPolicies []QueueAccessPolicy
  41. }
  42. // SetQueuePermissionOptions includes options for a set queue permissions operation
  43. type SetQueuePermissionOptions struct {
  44. Timeout uint
  45. RequestID string `header:"x-ms-client-request-id"`
  46. }
  47. // Queue represents an Azure queue.
  48. type Queue struct {
  49. qsc *QueueServiceClient
  50. Name string
  51. Metadata map[string]string
  52. AproxMessageCount uint64
  53. }
  54. func (q *Queue) buildPath() string {
  55. return fmt.Sprintf("/%s", q.Name)
  56. }
  57. func (q *Queue) buildPathMessages() string {
  58. return fmt.Sprintf("%s/messages", q.buildPath())
  59. }
  60. // QueueServiceOptions includes options for some queue service operations
  61. type QueueServiceOptions struct {
  62. Timeout uint
  63. RequestID string `header:"x-ms-client-request-id"`
  64. }
  65. // Create operation creates a queue under the given account.
  66. //
  67. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Create-Queue4
  68. func (q *Queue) Create(options *QueueServiceOptions) error {
  69. params := url.Values{}
  70. headers := q.qsc.client.getStandardHeaders()
  71. headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
  72. if options != nil {
  73. params = addTimeout(params, options.Timeout)
  74. headers = mergeHeaders(headers, headersFromStruct(*options))
  75. }
  76. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
  77. resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
  78. if err != nil {
  79. return err
  80. }
  81. defer drainRespBody(resp)
  82. return checkRespCode(resp, []int{http.StatusCreated})
  83. }
  84. // Delete operation permanently deletes the specified queue.
  85. //
  86. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Delete-Queue3
  87. func (q *Queue) Delete(options *QueueServiceOptions) error {
  88. params := url.Values{}
  89. headers := q.qsc.client.getStandardHeaders()
  90. if options != nil {
  91. params = addTimeout(params, options.Timeout)
  92. headers = mergeHeaders(headers, headersFromStruct(*options))
  93. }
  94. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
  95. resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
  96. if err != nil {
  97. return err
  98. }
  99. defer drainRespBody(resp)
  100. return checkRespCode(resp, []int{http.StatusNoContent})
  101. }
  102. // Exists returns true if a queue with given name exists.
  103. func (q *Queue) Exists() (bool, error) {
  104. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}})
  105. resp, err := q.qsc.client.exec(http.MethodGet, uri, q.qsc.client.getStandardHeaders(), nil, q.qsc.auth)
  106. if resp != nil {
  107. defer drainRespBody(resp)
  108. if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound {
  109. return resp.StatusCode == http.StatusOK, nil
  110. }
  111. err = getErrorFromResponse(resp)
  112. }
  113. return false, err
  114. }
  115. // SetMetadata operation sets user-defined metadata on the specified queue.
  116. // Metadata is associated with the queue as name-value pairs.
  117. //
  118. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
  119. func (q *Queue) SetMetadata(options *QueueServiceOptions) error {
  120. params := url.Values{"comp": {"metadata"}}
  121. headers := q.qsc.client.getStandardHeaders()
  122. headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
  123. if options != nil {
  124. params = addTimeout(params, options.Timeout)
  125. headers = mergeHeaders(headers, headersFromStruct(*options))
  126. }
  127. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
  128. resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
  129. if err != nil {
  130. return err
  131. }
  132. defer drainRespBody(resp)
  133. return checkRespCode(resp, []int{http.StatusNoContent})
  134. }
  135. // GetMetadata operation retrieves user-defined metadata and queue
  136. // properties on the specified queue. Metadata is associated with
  137. // the queue as name-values pairs.
  138. //
  139. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
  140. //
  141. // Because the way Golang's http client (and http.Header in particular)
  142. // canonicalize header names, the returned metadata names would always
  143. // be all lower case.
  144. func (q *Queue) GetMetadata(options *QueueServiceOptions) error {
  145. params := url.Values{"comp": {"metadata"}}
  146. headers := q.qsc.client.getStandardHeaders()
  147. if options != nil {
  148. params = addTimeout(params, options.Timeout)
  149. headers = mergeHeaders(headers, headersFromStruct(*options))
  150. }
  151. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
  152. resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
  153. if err != nil {
  154. return err
  155. }
  156. defer drainRespBody(resp)
  157. if err := checkRespCode(resp, []int{http.StatusOK}); err != nil {
  158. return err
  159. }
  160. aproxMessagesStr := resp.Header.Get(http.CanonicalHeaderKey(approximateMessagesCountHeader))
  161. if aproxMessagesStr != "" {
  162. aproxMessages, err := strconv.ParseUint(aproxMessagesStr, 10, 64)
  163. if err != nil {
  164. return err
  165. }
  166. q.AproxMessageCount = aproxMessages
  167. }
  168. q.Metadata = getMetadataFromHeaders(resp.Header)
  169. return nil
  170. }
  171. // GetMessageReference returns a message object with the specified text.
  172. func (q *Queue) GetMessageReference(text string) *Message {
  173. return &Message{
  174. Queue: q,
  175. Text: text,
  176. }
  177. }
  178. // GetMessagesOptions is the set of options can be specified for Get
  179. // Messsages operation. A zero struct does not use any preferences for the
  180. // request.
  181. type GetMessagesOptions struct {
  182. Timeout uint
  183. NumOfMessages int
  184. VisibilityTimeout int
  185. RequestID string `header:"x-ms-client-request-id"`
  186. }
  187. type messages struct {
  188. XMLName xml.Name `xml:"QueueMessagesList"`
  189. Messages []Message `xml:"QueueMessage"`
  190. }
  191. // GetMessages operation retrieves one or more messages from the front of the
  192. // queue.
  193. //
  194. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Get-Messages
  195. func (q *Queue) GetMessages(options *GetMessagesOptions) ([]Message, error) {
  196. query := url.Values{}
  197. headers := q.qsc.client.getStandardHeaders()
  198. if options != nil {
  199. if options.NumOfMessages != 0 {
  200. query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
  201. }
  202. if options.VisibilityTimeout != 0 {
  203. query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
  204. }
  205. query = addTimeout(query, options.Timeout)
  206. headers = mergeHeaders(headers, headersFromStruct(*options))
  207. }
  208. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
  209. resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
  210. if err != nil {
  211. return []Message{}, err
  212. }
  213. defer resp.Body.Close()
  214. var out messages
  215. err = xmlUnmarshal(resp.Body, &out)
  216. if err != nil {
  217. return []Message{}, err
  218. }
  219. for i := range out.Messages {
  220. out.Messages[i].Queue = q
  221. }
  222. return out.Messages, err
  223. }
  224. // PeekMessagesOptions is the set of options can be specified for Peek
  225. // Messsage operation. A zero struct does not use any preferences for the
  226. // request.
  227. type PeekMessagesOptions struct {
  228. Timeout uint
  229. NumOfMessages int
  230. RequestID string `header:"x-ms-client-request-id"`
  231. }
  232. // PeekMessages retrieves one or more messages from the front of the queue, but
  233. // does not alter the visibility of the message.
  234. //
  235. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Peek-Messages
  236. func (q *Queue) PeekMessages(options *PeekMessagesOptions) ([]Message, error) {
  237. query := url.Values{"peekonly": {"true"}} // Required for peek operation
  238. headers := q.qsc.client.getStandardHeaders()
  239. if options != nil {
  240. if options.NumOfMessages != 0 {
  241. query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
  242. }
  243. query = addTimeout(query, options.Timeout)
  244. headers = mergeHeaders(headers, headersFromStruct(*options))
  245. }
  246. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
  247. resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
  248. if err != nil {
  249. return []Message{}, err
  250. }
  251. defer resp.Body.Close()
  252. var out messages
  253. err = xmlUnmarshal(resp.Body, &out)
  254. if err != nil {
  255. return []Message{}, err
  256. }
  257. for i := range out.Messages {
  258. out.Messages[i].Queue = q
  259. }
  260. return out.Messages, err
  261. }
  262. // ClearMessages operation deletes all messages from the specified queue.
  263. //
  264. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Clear-Messages
  265. func (q *Queue) ClearMessages(options *QueueServiceOptions) error {
  266. params := url.Values{}
  267. headers := q.qsc.client.getStandardHeaders()
  268. if options != nil {
  269. params = addTimeout(params, options.Timeout)
  270. headers = mergeHeaders(headers, headersFromStruct(*options))
  271. }
  272. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), params)
  273. resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
  274. if err != nil {
  275. return err
  276. }
  277. defer drainRespBody(resp)
  278. return checkRespCode(resp, []int{http.StatusNoContent})
  279. }
  280. // SetPermissions sets up queue permissions
  281. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/set-queue-acl
  282. func (q *Queue) SetPermissions(permissions QueuePermissions, options *SetQueuePermissionOptions) error {
  283. body, length, err := generateQueueACLpayload(permissions.AccessPolicies)
  284. if err != nil {
  285. return err
  286. }
  287. params := url.Values{
  288. "comp": {"acl"},
  289. }
  290. headers := q.qsc.client.getStandardHeaders()
  291. headers["Content-Length"] = strconv.Itoa(length)
  292. if options != nil {
  293. params = addTimeout(params, options.Timeout)
  294. headers = mergeHeaders(headers, headersFromStruct(*options))
  295. }
  296. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
  297. resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, body, q.qsc.auth)
  298. if err != nil {
  299. return err
  300. }
  301. defer drainRespBody(resp)
  302. return checkRespCode(resp, []int{http.StatusNoContent})
  303. }
  304. func generateQueueACLpayload(policies []QueueAccessPolicy) (io.Reader, int, error) {
  305. sil := SignedIdentifiers{
  306. SignedIdentifiers: []SignedIdentifier{},
  307. }
  308. for _, qapd := range policies {
  309. permission := qapd.generateQueuePermissions()
  310. signedIdentifier := convertAccessPolicyToXMLStructs(qapd.ID, qapd.StartTime, qapd.ExpiryTime, permission)
  311. sil.SignedIdentifiers = append(sil.SignedIdentifiers, signedIdentifier)
  312. }
  313. return xmlMarshal(sil)
  314. }
  315. func (qapd *QueueAccessPolicy) generateQueuePermissions() (permissions string) {
  316. // generate the permissions string (raup).
  317. // still want the end user API to have bool flags.
  318. permissions = ""
  319. if qapd.CanRead {
  320. permissions += "r"
  321. }
  322. if qapd.CanAdd {
  323. permissions += "a"
  324. }
  325. if qapd.CanUpdate {
  326. permissions += "u"
  327. }
  328. if qapd.CanProcess {
  329. permissions += "p"
  330. }
  331. return permissions
  332. }
  333. // GetQueuePermissionOptions includes options for a get queue permissions operation
  334. type GetQueuePermissionOptions struct {
  335. Timeout uint
  336. RequestID string `header:"x-ms-client-request-id"`
  337. }
  338. // GetPermissions gets the queue permissions as per https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/get-queue-acl
  339. // If timeout is 0 then it will not be passed to Azure
  340. func (q *Queue) GetPermissions(options *GetQueuePermissionOptions) (*QueuePermissions, error) {
  341. params := url.Values{
  342. "comp": {"acl"},
  343. }
  344. headers := q.qsc.client.getStandardHeaders()
  345. if options != nil {
  346. params = addTimeout(params, options.Timeout)
  347. headers = mergeHeaders(headers, headersFromStruct(*options))
  348. }
  349. uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
  350. resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
  351. if err != nil {
  352. return nil, err
  353. }
  354. defer resp.Body.Close()
  355. var ap AccessPolicy
  356. err = xmlUnmarshal(resp.Body, &ap.SignedIdentifiersList)
  357. if err != nil {
  358. return nil, err
  359. }
  360. return buildQueueAccessPolicy(ap, &resp.Header), nil
  361. }
  362. func buildQueueAccessPolicy(ap AccessPolicy, headers *http.Header) *QueuePermissions {
  363. permissions := QueuePermissions{
  364. AccessPolicies: []QueueAccessPolicy{},
  365. }
  366. for _, policy := range ap.SignedIdentifiersList.SignedIdentifiers {
  367. qapd := QueueAccessPolicy{
  368. ID: policy.ID,
  369. StartTime: policy.AccessPolicy.StartTime,
  370. ExpiryTime: policy.AccessPolicy.ExpiryTime,
  371. }
  372. qapd.CanRead = updatePermissions(policy.AccessPolicy.Permission, "r")
  373. qapd.CanAdd = updatePermissions(policy.AccessPolicy.Permission, "a")
  374. qapd.CanUpdate = updatePermissions(policy.AccessPolicy.Permission, "u")
  375. qapd.CanProcess = updatePermissions(policy.AccessPolicy.Permission, "p")
  376. permissions.AccessPolicies = append(permissions.AccessPolicies, qapd)
  377. }
  378. return &permissions
  379. }