audit_dynamic_test.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package master
  14. import (
  15. "fmt"
  16. "sync"
  17. "sync/atomic"
  18. "testing"
  19. "time"
  20. "github.com/stretchr/testify/require"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. auditinternal "k8s.io/apiserver/pkg/apis/audit"
  25. "k8s.io/apiserver/pkg/features"
  26. utilfeature "k8s.io/apiserver/pkg/util/feature"
  27. "k8s.io/client-go/kubernetes"
  28. featuregatetesting "k8s.io/component-base/featuregate/testing"
  29. "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
  30. "k8s.io/kubernetes/test/integration/framework"
  31. "k8s.io/kubernetes/test/utils"
  32. )
  33. // TestDynamicAudit ensures that v1alpha of the auditregistration api works
  34. func TestDynamicAudit(t *testing.T) {
  35. // start api server
  36. stopCh := make(chan struct{})
  37. defer close(stopCh)
  38. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
  39. kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
  40. ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
  41. opts.Audit.DynamicOptions.Enabled = true
  42. // set max batch size so the buffers flush immediately
  43. opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1
  44. opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true")
  45. },
  46. })
  47. // create test sinks
  48. testServer1 := utils.NewAuditTestServer(t, "test1")
  49. defer testServer1.Close()
  50. testServer2 := utils.NewAuditTestServer(t, "test2")
  51. defer testServer2.Close()
  52. // check that servers are healthy
  53. require.NoError(t, testServer1.Health(), "server1 never became healthy")
  54. require.NoError(t, testServer2.Health(), "server2 never became healthy")
  55. // build AuditSink configurations
  56. sinkConfig1 := testServer1.BuildSinkConfiguration()
  57. sinkConfig2 := testServer2.BuildSinkConfiguration()
  58. // test creates a single audit sink, generates audit events, and ensures they arrive at the server
  59. success := t.Run("one sink", func(t *testing.T) {
  60. _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig1)
  61. require.NoError(t, err, "failed to create audit sink1")
  62. t.Log("created audit sink1")
  63. // verify sink is ready
  64. sinkHealth(t, kubeclient, testServer1)
  65. // perform configmap ops
  66. configMapOperations(t, kubeclient)
  67. // check for corresponding events
  68. missing, err := testServer1.WaitForEvents(expectedEvents)
  69. require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
  70. })
  71. require.True(t, success) // propagate failure
  72. // test creates a second audit sink, generates audit events, and ensures events arrive in both servers
  73. success = t.Run("two sink", func(t *testing.T) {
  74. _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig2)
  75. require.NoError(t, err, "failed to create audit sink2")
  76. t.Log("created audit sink2")
  77. // verify both sinks are ready
  78. sinkHealth(t, kubeclient, testServer1, testServer2)
  79. // perform configmap ops
  80. configMapOperations(t, kubeclient)
  81. // check for corresponding events in both sinks
  82. missing, err := testServer1.WaitForEvents(expectedEvents)
  83. require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
  84. missing, err = testServer2.WaitForEvents(expectedEvents)
  85. require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing)
  86. })
  87. require.True(t, success) // propagate failure
  88. // test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server
  89. success = t.Run("delete sink", func(t *testing.T) {
  90. err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(sinkConfig2.Name, &metav1.DeleteOptions{})
  91. require.NoError(t, err, "failed to delete audit sink2")
  92. t.Log("deleted audit sink2")
  93. var finalErr error
  94. err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  95. // reset event lists
  96. testServer1.ResetEventList()
  97. testServer2.ResetEventList()
  98. // perform configmap ops
  99. configMapOperations(t, kubeclient)
  100. // check for corresponding events in server1
  101. missing, err := testServer1.WaitForEvents(expectedEvents)
  102. if err != nil {
  103. finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing)
  104. return false, nil
  105. }
  106. // check that server2 is empty
  107. if len(testServer2.GetEventList().Items) != 0 {
  108. finalErr = fmt.Errorf("server2 event list should be empty")
  109. return false, nil
  110. }
  111. return true, nil
  112. })
  113. require.NoError(t, err, finalErr)
  114. })
  115. require.True(t, success) // propagate failure
  116. // This test will run a background process that generates audit events sending them to a sink.
  117. // Whilst that generation is occurring, the sink is updated to point to a different server.
  118. // The test checks that no events are lost or duplicated during the update.
  119. t.Run("update sink", func(t *testing.T) {
  120. // fetch sink1 config
  121. sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(sinkConfig1.Name, metav1.GetOptions{})
  122. require.NoError(t, err)
  123. // reset event lists
  124. testServer1.ResetEventList()
  125. testServer2.ResetEventList()
  126. // run operations in background
  127. stopChan := make(chan struct{})
  128. expectedEvents := &atomic.Value{}
  129. expectedEvents.Store([]utils.AuditEvent{})
  130. wg := &sync.WaitGroup{}
  131. wg.Add(1)
  132. go asyncOps(stopChan, wg, kubeclient, expectedEvents)
  133. // check to see that at least 20 events have arrived in server1
  134. err = testServer1.WaitForNumEvents(20)
  135. require.NoError(t, err, "failed to find enough events in server1")
  136. // check that no events are in server 2 yet
  137. require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet")
  138. // update the url
  139. sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL
  140. _, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(sink1)
  141. require.NoError(t, err, "failed to update audit sink1")
  142. t.Log("updated audit sink1 to point to server2")
  143. // check that at least 20 events have arrived in server2
  144. err = testServer2.WaitForNumEvents(20)
  145. require.NoError(t, err, "failed to find enough events in server2")
  146. // stop the operations and ensure they have finished
  147. close(stopChan)
  148. wg.Wait()
  149. // check that the final events have arrived
  150. expected := expectedEvents.Load().([]utils.AuditEvent)
  151. missing, err := testServer2.WaitForEvents(expected[len(expected)-4:])
  152. require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing)
  153. // combine the event lists
  154. el1 := testServer1.GetEventList()
  155. el2 := testServer2.GetEventList()
  156. combinedList := auditinternal.EventList{}
  157. combinedList.Items = append(el1.Items, el2.Items...)
  158. // check that there are no duplicate events
  159. dups, err := utils.CheckForDuplicates(combinedList)
  160. require.NoError(t, err, "duplicate events found: %#v", dups)
  161. // check that no events are missing
  162. missing, err = utils.CheckAuditList(combinedList, expected)
  163. require.NoError(t, err, "failed to match all expected events: %#v not found", missing)
  164. })
  165. }
  166. // sinkHealth checks if sinks are running by verifying that uniquely identified events are found
  167. // in the given servers
  168. func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) {
  169. var missing []utils.AuditEvent
  170. i := 0
  171. var finalErr error
  172. err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  173. i++
  174. name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
  175. expected, err := simpleOp(name, kubeclient)
  176. require.NoError(t, err, "could not perform config map operations")
  177. // check that all given servers have received events
  178. for _, server := range servers {
  179. missing, err = server.WaitForEvents(expected)
  180. if err != nil {
  181. finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing)
  182. return false, nil
  183. }
  184. server.ResetEventList()
  185. }
  186. return true, nil
  187. })
  188. require.NoError(t, err, finalErr)
  189. }
  190. // simpleOp is a function that simply tries to get a configmap with the given name and returns the
  191. // corresponding expected audit event
  192. func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) {
  193. _, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
  194. if err != nil && !errors.IsNotFound(err) {
  195. return nil, err
  196. }
  197. expectedEvents := []utils.AuditEvent{
  198. {
  199. Level: auditinternal.LevelRequestResponse,
  200. Stage: auditinternal.StageResponseComplete,
  201. RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
  202. Verb: "get",
  203. Code: 404,
  204. User: auditTestUser,
  205. Resource: "configmaps",
  206. Namespace: namespace,
  207. RequestObject: false,
  208. ResponseObject: true,
  209. AuthorizeDecision: "allow",
  210. },
  211. }
  212. return expectedEvents, nil
  213. }
  214. // asyncOps runs the simpleOp function until the stopChan is closed updating
  215. // the expected atomic events list
  216. func asyncOps(
  217. stopChan <-chan struct{},
  218. wg *sync.WaitGroup,
  219. kubeclient kubernetes.Interface,
  220. expected *atomic.Value,
  221. ) {
  222. for i := 0; ; i++ {
  223. select {
  224. case <-stopChan:
  225. wg.Done()
  226. return
  227. default:
  228. name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
  229. exp, err := simpleOp(name, kubeclient)
  230. if err != nil {
  231. // retry on errors
  232. continue
  233. }
  234. e := expected.Load().([]utils.AuditEvent)
  235. evList := []utils.AuditEvent{}
  236. evList = append(e, exp...)
  237. expected.Store(evList)
  238. }
  239. }
  240. }