123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
- package storage
- // Copyright 2017 Microsoft Corporation
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- import (
- "encoding/xml"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "strconv"
- "time"
- )
- const (
- // casing is per Golang's http.Header canonicalizing the header names.
- approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"
- )
- // QueueAccessPolicy represents each access policy in the queue ACL.
- type QueueAccessPolicy struct {
- ID string
- StartTime time.Time
- ExpiryTime time.Time
- CanRead bool
- CanAdd bool
- CanUpdate bool
- CanProcess bool
- }
- // QueuePermissions represents the queue ACLs.
- type QueuePermissions struct {
- AccessPolicies []QueueAccessPolicy
- }
- // SetQueuePermissionOptions includes options for a set queue permissions operation
- type SetQueuePermissionOptions struct {
- Timeout uint
- RequestID string `header:"x-ms-client-request-id"`
- }
- // Queue represents an Azure queue.
- type Queue struct {
- qsc *QueueServiceClient
- Name string
- Metadata map[string]string
- AproxMessageCount uint64
- }
- func (q *Queue) buildPath() string {
- return fmt.Sprintf("/%s", q.Name)
- }
- func (q *Queue) buildPathMessages() string {
- return fmt.Sprintf("%s/messages", q.buildPath())
- }
- // QueueServiceOptions includes options for some queue service operations
- type QueueServiceOptions struct {
- Timeout uint
- RequestID string `header:"x-ms-client-request-id"`
- }
- // Create operation creates a queue under the given account.
- //
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Create-Queue4
- func (q *Queue) Create(options *QueueServiceOptions) error {
- params := url.Values{}
- headers := q.qsc.client.getStandardHeaders()
- headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
- if options != nil {
- params = addTimeout(params, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
- resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return err
- }
- defer drainRespBody(resp)
- return checkRespCode(resp, []int{http.StatusCreated})
- }
- // Delete operation permanently deletes the specified queue.
- //
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Delete-Queue3
- func (q *Queue) Delete(options *QueueServiceOptions) error {
- params := url.Values{}
- headers := q.qsc.client.getStandardHeaders()
- if options != nil {
- params = addTimeout(params, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
- resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return err
- }
- defer drainRespBody(resp)
- return checkRespCode(resp, []int{http.StatusNoContent})
- }
- // Exists returns true if a queue with given name exists.
- func (q *Queue) Exists() (bool, error) {
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}})
- resp, err := q.qsc.client.exec(http.MethodGet, uri, q.qsc.client.getStandardHeaders(), nil, q.qsc.auth)
- if resp != nil {
- defer drainRespBody(resp)
- if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound {
- return resp.StatusCode == http.StatusOK, nil
- }
- err = getErrorFromResponse(resp)
- }
- return false, err
- }
- // SetMetadata operation sets user-defined metadata on the specified queue.
- // Metadata is associated with the queue as name-value pairs.
- //
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
- func (q *Queue) SetMetadata(options *QueueServiceOptions) error {
- params := url.Values{"comp": {"metadata"}}
- headers := q.qsc.client.getStandardHeaders()
- headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
- if options != nil {
- params = addTimeout(params, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
- resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return err
- }
- defer drainRespBody(resp)
- return checkRespCode(resp, []int{http.StatusNoContent})
- }
- // GetMetadata operation retrieves user-defined metadata and queue
- // properties on the specified queue. Metadata is associated with
- // the queue as name-values pairs.
- //
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
- //
- // Because the way Golang's http client (and http.Header in particular)
- // canonicalize header names, the returned metadata names would always
- // be all lower case.
- func (q *Queue) GetMetadata(options *QueueServiceOptions) error {
- params := url.Values{"comp": {"metadata"}}
- headers := q.qsc.client.getStandardHeaders()
- if options != nil {
- params = addTimeout(params, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
- resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return err
- }
- defer drainRespBody(resp)
- if err := checkRespCode(resp, []int{http.StatusOK}); err != nil {
- return err
- }
- aproxMessagesStr := resp.Header.Get(http.CanonicalHeaderKey(approximateMessagesCountHeader))
- if aproxMessagesStr != "" {
- aproxMessages, err := strconv.ParseUint(aproxMessagesStr, 10, 64)
- if err != nil {
- return err
- }
- q.AproxMessageCount = aproxMessages
- }
- q.Metadata = getMetadataFromHeaders(resp.Header)
- return nil
- }
- // GetMessageReference returns a message object with the specified text.
- func (q *Queue) GetMessageReference(text string) *Message {
- return &Message{
- Queue: q,
- Text: text,
- }
- }
- // GetMessagesOptions is the set of options can be specified for Get
- // Messsages operation. A zero struct does not use any preferences for the
- // request.
- type GetMessagesOptions struct {
- Timeout uint
- NumOfMessages int
- VisibilityTimeout int
- RequestID string `header:"x-ms-client-request-id"`
- }
- type messages struct {
- XMLName xml.Name `xml:"QueueMessagesList"`
- Messages []Message `xml:"QueueMessage"`
- }
- // GetMessages operation retrieves one or more messages from the front of the
- // queue.
- //
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Get-Messages
- func (q *Queue) GetMessages(options *GetMessagesOptions) ([]Message, error) {
- query := url.Values{}
- headers := q.qsc.client.getStandardHeaders()
- if options != nil {
- if options.NumOfMessages != 0 {
- query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
- }
- if options.VisibilityTimeout != 0 {
- query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
- }
- query = addTimeout(query, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
- resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return []Message{}, err
- }
- defer resp.Body.Close()
- var out messages
- err = xmlUnmarshal(resp.Body, &out)
- if err != nil {
- return []Message{}, err
- }
- for i := range out.Messages {
- out.Messages[i].Queue = q
- }
- return out.Messages, err
- }
- // PeekMessagesOptions is the set of options can be specified for Peek
- // Messsage operation. A zero struct does not use any preferences for the
- // request.
- type PeekMessagesOptions struct {
- Timeout uint
- NumOfMessages int
- RequestID string `header:"x-ms-client-request-id"`
- }
- // PeekMessages retrieves one or more messages from the front of the queue, but
- // does not alter the visibility of the message.
- //
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Peek-Messages
- func (q *Queue) PeekMessages(options *PeekMessagesOptions) ([]Message, error) {
- query := url.Values{"peekonly": {"true"}} // Required for peek operation
- headers := q.qsc.client.getStandardHeaders()
- if options != nil {
- if options.NumOfMessages != 0 {
- query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
- }
- query = addTimeout(query, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
- resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return []Message{}, err
- }
- defer resp.Body.Close()
- var out messages
- err = xmlUnmarshal(resp.Body, &out)
- if err != nil {
- return []Message{}, err
- }
- for i := range out.Messages {
- out.Messages[i].Queue = q
- }
- return out.Messages, err
- }
- // ClearMessages operation deletes all messages from the specified queue.
- //
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Clear-Messages
- func (q *Queue) ClearMessages(options *QueueServiceOptions) error {
- params := url.Values{}
- headers := q.qsc.client.getStandardHeaders()
- if options != nil {
- params = addTimeout(params, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), params)
- resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return err
- }
- defer drainRespBody(resp)
- return checkRespCode(resp, []int{http.StatusNoContent})
- }
- // SetPermissions sets up queue permissions
- // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/set-queue-acl
- func (q *Queue) SetPermissions(permissions QueuePermissions, options *SetQueuePermissionOptions) error {
- body, length, err := generateQueueACLpayload(permissions.AccessPolicies)
- if err != nil {
- return err
- }
- params := url.Values{
- "comp": {"acl"},
- }
- headers := q.qsc.client.getStandardHeaders()
- headers["Content-Length"] = strconv.Itoa(length)
- if options != nil {
- params = addTimeout(params, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
- resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, body, q.qsc.auth)
- if err != nil {
- return err
- }
- defer drainRespBody(resp)
- return checkRespCode(resp, []int{http.StatusNoContent})
- }
- func generateQueueACLpayload(policies []QueueAccessPolicy) (io.Reader, int, error) {
- sil := SignedIdentifiers{
- SignedIdentifiers: []SignedIdentifier{},
- }
- for _, qapd := range policies {
- permission := qapd.generateQueuePermissions()
- signedIdentifier := convertAccessPolicyToXMLStructs(qapd.ID, qapd.StartTime, qapd.ExpiryTime, permission)
- sil.SignedIdentifiers = append(sil.SignedIdentifiers, signedIdentifier)
- }
- return xmlMarshal(sil)
- }
- func (qapd *QueueAccessPolicy) generateQueuePermissions() (permissions string) {
- // generate the permissions string (raup).
- // still want the end user API to have bool flags.
- permissions = ""
- if qapd.CanRead {
- permissions += "r"
- }
- if qapd.CanAdd {
- permissions += "a"
- }
- if qapd.CanUpdate {
- permissions += "u"
- }
- if qapd.CanProcess {
- permissions += "p"
- }
- return permissions
- }
- // GetQueuePermissionOptions includes options for a get queue permissions operation
- type GetQueuePermissionOptions struct {
- Timeout uint
- RequestID string `header:"x-ms-client-request-id"`
- }
- // GetPermissions gets the queue permissions as per https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/get-queue-acl
- // If timeout is 0 then it will not be passed to Azure
- func (q *Queue) GetPermissions(options *GetQueuePermissionOptions) (*QueuePermissions, error) {
- params := url.Values{
- "comp": {"acl"},
- }
- headers := q.qsc.client.getStandardHeaders()
- if options != nil {
- params = addTimeout(params, options.Timeout)
- headers = mergeHeaders(headers, headersFromStruct(*options))
- }
- uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
- resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- var ap AccessPolicy
- err = xmlUnmarshal(resp.Body, &ap.SignedIdentifiersList)
- if err != nil {
- return nil, err
- }
- return buildQueueAccessPolicy(ap, &resp.Header), nil
- }
- func buildQueueAccessPolicy(ap AccessPolicy, headers *http.Header) *QueuePermissions {
- permissions := QueuePermissions{
- AccessPolicies: []QueueAccessPolicy{},
- }
- for _, policy := range ap.SignedIdentifiersList.SignedIdentifiers {
- qapd := QueueAccessPolicy{
- ID: policy.ID,
- StartTime: policy.AccessPolicy.StartTime,
- ExpiryTime: policy.AccessPolicy.ExpiryTime,
- }
- qapd.CanRead = updatePermissions(policy.AccessPolicy.Permission, "r")
- qapd.CanAdd = updatePermissions(policy.AccessPolicy.Permission, "a")
- qapd.CanUpdate = updatePermissions(policy.AccessPolicy.Permission, "u")
- qapd.CanProcess = updatePermissions(policy.AccessPolicy.Permission, "p")
- permissions.AccessPolicies = append(permissions.AccessPolicies, qapd)
- }
- return &permissions
- }
|