message.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package storage
  2. // Copyright 2017 Microsoft Corporation
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. import (
  16. "encoding/xml"
  17. "fmt"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "time"
  22. )
  23. // Message represents an Azure message.
  24. type Message struct {
  25. Queue *Queue
  26. Text string `xml:"MessageText"`
  27. ID string `xml:"MessageId"`
  28. Insertion TimeRFC1123 `xml:"InsertionTime"`
  29. Expiration TimeRFC1123 `xml:"ExpirationTime"`
  30. PopReceipt string `xml:"PopReceipt"`
  31. NextVisible TimeRFC1123 `xml:"TimeNextVisible"`
  32. DequeueCount int `xml:"DequeueCount"`
  33. }
  34. func (m *Message) buildPath() string {
  35. return fmt.Sprintf("%s/%s", m.Queue.buildPathMessages(), m.ID)
  36. }
  37. // PutMessageOptions is the set of options can be specified for Put Messsage
  38. // operation. A zero struct does not use any preferences for the request.
  39. type PutMessageOptions struct {
  40. Timeout uint
  41. VisibilityTimeout int
  42. MessageTTL int
  43. RequestID string `header:"x-ms-client-request-id"`
  44. }
  45. // Put operation adds a new message to the back of the message queue.
  46. //
  47. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Put-Message
  48. func (m *Message) Put(options *PutMessageOptions) error {
  49. query := url.Values{}
  50. headers := m.Queue.qsc.client.getStandardHeaders()
  51. req := putMessageRequest{MessageText: m.Text}
  52. body, nn, err := xmlMarshal(req)
  53. if err != nil {
  54. return err
  55. }
  56. headers["Content-Length"] = strconv.Itoa(nn)
  57. if options != nil {
  58. if options.VisibilityTimeout != 0 {
  59. query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
  60. }
  61. if options.MessageTTL != 0 {
  62. query.Set("messagettl", strconv.Itoa(options.MessageTTL))
  63. }
  64. query = addTimeout(query, options.Timeout)
  65. headers = mergeHeaders(headers, headersFromStruct(*options))
  66. }
  67. uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.Queue.buildPathMessages(), query)
  68. resp, err := m.Queue.qsc.client.exec(http.MethodPost, uri, headers, body, m.Queue.qsc.auth)
  69. if err != nil {
  70. return err
  71. }
  72. defer drainRespBody(resp)
  73. err = checkRespCode(resp, []int{http.StatusCreated})
  74. if err != nil {
  75. return err
  76. }
  77. err = xmlUnmarshal(resp.Body, m)
  78. if err != nil {
  79. return err
  80. }
  81. return nil
  82. }
  83. // UpdateMessageOptions is the set of options can be specified for Update Messsage
  84. // operation. A zero struct does not use any preferences for the request.
  85. type UpdateMessageOptions struct {
  86. Timeout uint
  87. VisibilityTimeout int
  88. RequestID string `header:"x-ms-client-request-id"`
  89. }
  90. // Update operation updates the specified message.
  91. //
  92. // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Update-Message
  93. func (m *Message) Update(options *UpdateMessageOptions) error {
  94. query := url.Values{}
  95. if m.PopReceipt != "" {
  96. query.Set("popreceipt", m.PopReceipt)
  97. }
  98. headers := m.Queue.qsc.client.getStandardHeaders()
  99. req := putMessageRequest{MessageText: m.Text}
  100. body, nn, err := xmlMarshal(req)
  101. if err != nil {
  102. return err
  103. }
  104. headers["Content-Length"] = strconv.Itoa(nn)
  105. // visibilitytimeout is required for Update (zero or greater) so set the default here
  106. query.Set("visibilitytimeout", "0")
  107. if options != nil {
  108. if options.VisibilityTimeout != 0 {
  109. query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
  110. }
  111. query = addTimeout(query, options.Timeout)
  112. headers = mergeHeaders(headers, headersFromStruct(*options))
  113. }
  114. uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), query)
  115. resp, err := m.Queue.qsc.client.exec(http.MethodPut, uri, headers, body, m.Queue.qsc.auth)
  116. if err != nil {
  117. return err
  118. }
  119. defer drainRespBody(resp)
  120. m.PopReceipt = resp.Header.Get("x-ms-popreceipt")
  121. nextTimeStr := resp.Header.Get("x-ms-time-next-visible")
  122. if nextTimeStr != "" {
  123. nextTime, err := time.Parse(time.RFC1123, nextTimeStr)
  124. if err != nil {
  125. return err
  126. }
  127. m.NextVisible = TimeRFC1123(nextTime)
  128. }
  129. return checkRespCode(resp, []int{http.StatusNoContent})
  130. }
  131. // Delete operation deletes the specified message.
  132. //
  133. // See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx
  134. func (m *Message) Delete(options *QueueServiceOptions) error {
  135. params := url.Values{"popreceipt": {m.PopReceipt}}
  136. headers := m.Queue.qsc.client.getStandardHeaders()
  137. if options != nil {
  138. params = addTimeout(params, options.Timeout)
  139. headers = mergeHeaders(headers, headersFromStruct(*options))
  140. }
  141. uri := m.Queue.qsc.client.getEndpoint(queueServiceName, m.buildPath(), params)
  142. resp, err := m.Queue.qsc.client.exec(http.MethodDelete, uri, headers, nil, m.Queue.qsc.auth)
  143. if err != nil {
  144. return err
  145. }
  146. defer drainRespBody(resp)
  147. return checkRespCode(resp, []int{http.StatusNoContent})
  148. }
  149. type putMessageRequest struct {
  150. XMLName xml.Name `xml:"QueueMessage"`
  151. MessageText string `xml:"MessageText"`
  152. }