audit_dynamic.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package utils
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net/http"
  18. "net/http/httptest"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/require"
  23. auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/runtime"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. auditinternal "k8s.io/apiserver/pkg/apis/audit"
  28. auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
  29. "k8s.io/apiserver/pkg/audit"
  30. )
  31. // AuditTestServer is a helper server for dynamic audit testing
  32. type AuditTestServer struct {
  33. Name string
  34. LockedEventList *LockedEventList
  35. Server *httptest.Server
  36. t *testing.T
  37. }
  38. // LockedEventList is an event list with a lock for concurrent access
  39. type LockedEventList struct {
  40. *sync.RWMutex
  41. EventList auditinternal.EventList
  42. }
  43. // NewLockedEventList returns a new LockedEventList
  44. func NewLockedEventList() *LockedEventList {
  45. return &LockedEventList{
  46. RWMutex: &sync.RWMutex{},
  47. EventList: auditinternal.EventList{},
  48. }
  49. }
  50. // NewAuditTestServer returns a new audit test server
  51. func NewAuditTestServer(t *testing.T, name string) *AuditTestServer {
  52. s := &AuditTestServer{
  53. Name: name,
  54. LockedEventList: NewLockedEventList(),
  55. t: t,
  56. }
  57. s.buildServer()
  58. return s
  59. }
  60. // GetEventList safely returns the internal event list
  61. func (a *AuditTestServer) GetEventList() auditinternal.EventList {
  62. a.LockedEventList.RLock()
  63. defer a.LockedEventList.RUnlock()
  64. return a.LockedEventList.EventList
  65. }
  66. // ResetEventList resets the internal event list
  67. func (a *AuditTestServer) ResetEventList() {
  68. a.LockedEventList.Lock()
  69. defer a.LockedEventList.Unlock()
  70. a.LockedEventList.EventList = auditinternal.EventList{}
  71. }
  72. // AppendEvents will add the given events to the internal event list
  73. func (a *AuditTestServer) AppendEvents(events []auditinternal.Event) {
  74. a.LockedEventList.Lock()
  75. defer a.LockedEventList.Unlock()
  76. a.LockedEventList.EventList.Items = append(a.LockedEventList.EventList.Items, events...)
  77. }
  78. // WaitForEvents waits for the given events to arrive in the server or the 30s timeout is reached
  79. func (a *AuditTestServer) WaitForEvents(expected []AuditEvent) ([]AuditEvent, error) {
  80. var missing []AuditEvent
  81. err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  82. var err error
  83. el := a.GetEventList()
  84. if len(el.Items) < 1 {
  85. return false, nil
  86. }
  87. missing, err = CheckAuditList(el, expected)
  88. if err != nil {
  89. return false, nil
  90. }
  91. return true, nil
  92. })
  93. return missing, err
  94. }
  95. // WaitForNumEvents checks that at least the given number of events has arrived or the 30s timeout is reached
  96. func (a *AuditTestServer) WaitForNumEvents(numEvents int) error {
  97. err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  98. el := a.GetEventList()
  99. if len(el.Items) < numEvents {
  100. return false, nil
  101. }
  102. return true, nil
  103. })
  104. if err != nil {
  105. return fmt.Errorf("%v: %d events failed to arrive in %v", err, numEvents, wait.ForeverTestTimeout)
  106. }
  107. return nil
  108. }
  109. // Health polls the server healthcheck until successful or the 30s timeout has been reached
  110. func (a *AuditTestServer) Health() error {
  111. err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  112. resp, err := http.Get(fmt.Sprintf("%s/health", a.Server.URL))
  113. if err != nil {
  114. return false, nil
  115. }
  116. if resp.StatusCode != 200 {
  117. return false, nil
  118. }
  119. return true, nil
  120. })
  121. if err != nil {
  122. return fmt.Errorf("server %s permanently failed health check: %v", a.Server.URL, err)
  123. }
  124. return nil
  125. }
  126. // Close the server
  127. func (a *AuditTestServer) Close() {
  128. a.Server.Close()
  129. }
  130. // BuildSinkConfiguration creates a generic audit sink configuration for this server
  131. func (a *AuditTestServer) BuildSinkConfiguration() *auditregv1alpha1.AuditSink {
  132. return &auditregv1alpha1.AuditSink{
  133. ObjectMeta: metav1.ObjectMeta{
  134. Name: a.Name,
  135. },
  136. Spec: auditregv1alpha1.AuditSinkSpec{
  137. Policy: auditregv1alpha1.Policy{
  138. Level: auditregv1alpha1.LevelRequestResponse,
  139. Stages: []auditregv1alpha1.Stage{
  140. auditregv1alpha1.StageResponseStarted,
  141. auditregv1alpha1.StageResponseComplete,
  142. },
  143. },
  144. Webhook: auditregv1alpha1.Webhook{
  145. ClientConfig: auditregv1alpha1.WebhookClientConfig{
  146. URL: &a.Server.URL,
  147. },
  148. },
  149. },
  150. }
  151. }
  152. // buildServer creates an http test server that will update the internal event list
  153. // with the value it receives
  154. func (a *AuditTestServer) buildServer() {
  155. decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
  156. mux := http.NewServeMux()
  157. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  158. body, err := ioutil.ReadAll(r.Body)
  159. require.NoError(a.t, err, "could not read request body")
  160. el := auditinternal.EventList{}
  161. err = runtime.DecodeInto(decoder, body, &el)
  162. r.Body.Close()
  163. require.NoError(a.t, err, "failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
  164. a.AppendEvents(el.Items)
  165. w.WriteHeader(200)
  166. })
  167. mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
  168. w.WriteHeader(200)
  169. })
  170. a.Server = httptest.NewServer(mux)
  171. }