audit_dynamic.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net/http"
  18. "net/http/httptest"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/require"
  23. auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/runtime"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. auditinternal "k8s.io/apiserver/pkg/apis/audit"
  28. auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
  29. "k8s.io/apiserver/pkg/audit"
  30. )
  31. // AuditTestServer is a helper server for dynamic audit testing
  32. type AuditTestServer struct {
  33. Name string
  34. LockedEventList *LockedEventList
  35. Server *httptest.Server
  36. t *testing.T
  37. }
  38. // LockedEventList is an event list with a lock for concurrent access
  39. type LockedEventList struct {
  40. *sync.RWMutex
  41. EventList auditinternal.EventList
  42. }
  43. // NewLockedEventList returns a new LockedEventList
  44. func NewLockedEventList() *LockedEventList {
  45. return &LockedEventList{
  46. RWMutex: &sync.RWMutex{},
  47. EventList: auditinternal.EventList{},
  48. }
  49. }
  50. // NewAuditTestServer returns a new audit test server
  51. func NewAuditTestServer(t *testing.T, name string) *AuditTestServer {
  52. s := &AuditTestServer{
  53. Name: name,
  54. LockedEventList: NewLockedEventList(),
  55. t: t,
  56. }
  57. s.buildServer()
  58. return s
  59. }
  60. // GetEventList safely returns the internal event list
  61. func (a *AuditTestServer) GetEventList() auditinternal.EventList {
  62. a.LockedEventList.RLock()
  63. defer a.LockedEventList.RUnlock()
  64. return a.LockedEventList.EventList
  65. }
  66. // ResetEventList resets the internal event list
  67. func (a *AuditTestServer) ResetEventList() {
  68. a.LockedEventList.Lock()
  69. defer a.LockedEventList.Unlock()
  70. a.LockedEventList.EventList = auditinternal.EventList{}
  71. }
  72. // AppendEvents will add the given events to the internal event list
  73. func (a *AuditTestServer) AppendEvents(events []auditinternal.Event) {
  74. a.LockedEventList.Lock()
  75. defer a.LockedEventList.Unlock()
  76. a.LockedEventList.EventList.Items = append(a.LockedEventList.EventList.Items, events...)
  77. }
  78. // WaitForEvents waits for the given events to arrive in the server or the 30s timeout is reached
  79. func (a *AuditTestServer) WaitForEvents(expected []AuditEvent) ([]AuditEvent, error) {
  80. var missing []AuditEvent
  81. err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  82. var err error
  83. a.LockedEventList.RLock()
  84. defer a.LockedEventList.RUnlock()
  85. el := a.GetEventList()
  86. if len(el.Items) < 1 {
  87. return false, nil
  88. }
  89. missing, err = CheckAuditList(el, expected)
  90. if err != nil {
  91. return false, nil
  92. }
  93. return true, nil
  94. })
  95. return missing, err
  96. }
  97. // WaitForNumEvents checks that at least the given number of events has arrived or the 30s timeout is reached
  98. func (a *AuditTestServer) WaitForNumEvents(numEvents int) error {
  99. err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  100. el := a.GetEventList()
  101. if len(el.Items) < numEvents {
  102. return false, nil
  103. }
  104. return true, nil
  105. })
  106. if err != nil {
  107. return fmt.Errorf("%v: %d events failed to arrive in %v", err, numEvents, wait.ForeverTestTimeout)
  108. }
  109. return nil
  110. }
  111. // Health polls the server healthcheck until successful or the 30s timeout has been reached
  112. func (a *AuditTestServer) Health() error {
  113. err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  114. resp, err := http.Get(fmt.Sprintf("%s/health", a.Server.URL))
  115. if err != nil {
  116. return false, nil
  117. }
  118. if resp.StatusCode != 200 {
  119. return false, nil
  120. }
  121. return true, nil
  122. })
  123. if err != nil {
  124. return fmt.Errorf("server %s permanently failed health check: %v", a.Server.URL, err)
  125. }
  126. return nil
  127. }
  128. // Close the server
  129. func (a *AuditTestServer) Close() {
  130. a.Server.Close()
  131. }
  132. // BuildSinkConfiguration creates a generic audit sink configuration for this server
  133. func (a *AuditTestServer) BuildSinkConfiguration() *auditregv1alpha1.AuditSink {
  134. return &auditregv1alpha1.AuditSink{
  135. ObjectMeta: metav1.ObjectMeta{
  136. Name: a.Name,
  137. },
  138. Spec: auditregv1alpha1.AuditSinkSpec{
  139. Policy: auditregv1alpha1.Policy{
  140. Level: auditregv1alpha1.LevelRequestResponse,
  141. Stages: []auditregv1alpha1.Stage{
  142. auditregv1alpha1.StageResponseStarted,
  143. auditregv1alpha1.StageResponseComplete,
  144. },
  145. },
  146. Webhook: auditregv1alpha1.Webhook{
  147. ClientConfig: auditregv1alpha1.WebhookClientConfig{
  148. URL: &a.Server.URL,
  149. },
  150. },
  151. },
  152. }
  153. }
  154. // buildServer creates an http test server that will update the internal event list
  155. // with the value it receives
  156. func (a *AuditTestServer) buildServer() {
  157. decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
  158. mux := http.NewServeMux()
  159. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  160. body, err := ioutil.ReadAll(r.Body)
  161. require.NoError(a.t, err, "could not read request body")
  162. el := auditinternal.EventList{}
  163. err = runtime.DecodeInto(decoder, body, &el)
  164. r.Body.Close()
  165. require.NoError(a.t, err, "failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
  166. a.AppendEvents(el.Items)
  167. w.WriteHeader(200)
  168. })
  169. mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
  170. w.WriteHeader(200)
  171. })
  172. a.Server = httptest.NewServer(mux)
  173. }