audit_dynamic_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package master
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. "sync/atomic"
  19. "testing"
  20. "time"
  21. "github.com/stretchr/testify/require"
  22. apierrors "k8s.io/apimachinery/pkg/api/errors"
  23. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. auditinternal "k8s.io/apiserver/pkg/apis/audit"
  26. "k8s.io/apiserver/pkg/features"
  27. utilfeature "k8s.io/apiserver/pkg/util/feature"
  28. "k8s.io/client-go/kubernetes"
  29. featuregatetesting "k8s.io/component-base/featuregate/testing"
  30. "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
  31. "k8s.io/kubernetes/test/integration/framework"
  32. "k8s.io/kubernetes/test/utils"
  33. )
  34. // TestDynamicAudit ensures that v1alpha of the auditregistration api works
  35. func TestDynamicAudit(t *testing.T) {
  36. // start api server
  37. stopCh := make(chan struct{})
  38. defer close(stopCh)
  39. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
  40. kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
  41. ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
  42. opts.Audit.DynamicOptions.Enabled = true
  43. // set max batch size so the buffers flush immediately
  44. opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1
  45. opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true")
  46. },
  47. })
  48. // create test sinks
  49. testServer1 := utils.NewAuditTestServer(t, "test1")
  50. defer testServer1.Close()
  51. testServer2 := utils.NewAuditTestServer(t, "test2")
  52. defer testServer2.Close()
  53. // check that servers are healthy
  54. require.NoError(t, testServer1.Health(), "server1 never became healthy")
  55. require.NoError(t, testServer2.Health(), "server2 never became healthy")
  56. // build AuditSink configurations
  57. sinkConfig1 := testServer1.BuildSinkConfiguration()
  58. sinkConfig2 := testServer2.BuildSinkConfiguration()
  59. // test creates a single audit sink, generates audit events, and ensures they arrive at the server
  60. success := t.Run("one sink", func(t *testing.T) {
  61. _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig1, metav1.CreateOptions{})
  62. require.NoError(t, err, "failed to create audit sink1")
  63. t.Log("created audit sink1")
  64. // verify sink is ready
  65. sinkHealth(t, kubeclient, testServer1)
  66. // perform configmap ops
  67. configMapOperations(t, kubeclient)
  68. // check for corresponding events
  69. missing, err := testServer1.WaitForEvents(expectedEvents)
  70. require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
  71. })
  72. require.True(t, success) // propagate failure
  73. // test creates a second audit sink, generates audit events, and ensures events arrive in both servers
  74. success = t.Run("two sink", func(t *testing.T) {
  75. _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig2, metav1.CreateOptions{})
  76. require.NoError(t, err, "failed to create audit sink2")
  77. t.Log("created audit sink2")
  78. // verify both sinks are ready
  79. sinkHealth(t, kubeclient, testServer1, testServer2)
  80. // perform configmap ops
  81. configMapOperations(t, kubeclient)
  82. // check for corresponding events in both sinks
  83. missing, err := testServer1.WaitForEvents(expectedEvents)
  84. require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
  85. missing, err = testServer2.WaitForEvents(expectedEvents)
  86. require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing)
  87. })
  88. require.True(t, success) // propagate failure
  89. // test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server
  90. success = t.Run("delete sink", func(t *testing.T) {
  91. err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(context.TODO(), sinkConfig2.Name, &metav1.DeleteOptions{})
  92. require.NoError(t, err, "failed to delete audit sink2")
  93. t.Log("deleted audit sink2")
  94. var finalErr error
  95. err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  96. // reset event lists
  97. testServer1.ResetEventList()
  98. testServer2.ResetEventList()
  99. // perform configmap ops
  100. configMapOperations(t, kubeclient)
  101. // check for corresponding events in server1
  102. missing, err := testServer1.WaitForEvents(expectedEvents)
  103. if err != nil {
  104. finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing)
  105. return false, nil
  106. }
  107. // check that server2 is empty
  108. if len(testServer2.GetEventList().Items) != 0 {
  109. finalErr = fmt.Errorf("server2 event list should be empty")
  110. return false, nil
  111. }
  112. return true, nil
  113. })
  114. require.NoError(t, err, finalErr)
  115. })
  116. require.True(t, success) // propagate failure
  117. // This test will run a background process that generates audit events sending them to a sink.
  118. // Whilst that generation is occurring, the sink is updated to point to a different server.
  119. // The test checks that no events are lost or duplicated during the update.
  120. t.Run("update sink", func(t *testing.T) {
  121. // fetch sink1 config
  122. sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(context.TODO(), sinkConfig1.Name, metav1.GetOptions{})
  123. require.NoError(t, err)
  124. // reset event lists
  125. testServer1.ResetEventList()
  126. testServer2.ResetEventList()
  127. // run operations in background
  128. stopChan := make(chan struct{})
  129. expectedEvents := &atomic.Value{}
  130. expectedEvents.Store([]utils.AuditEvent{})
  131. wg := &sync.WaitGroup{}
  132. wg.Add(1)
  133. go asyncOps(stopChan, wg, kubeclient, expectedEvents)
  134. // check to see that at least 20 events have arrived in server1
  135. err = testServer1.WaitForNumEvents(20)
  136. require.NoError(t, err, "failed to find enough events in server1")
  137. // check that no events are in server 2 yet
  138. require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet")
  139. // update the url
  140. sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL
  141. _, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(context.TODO(), sink1, metav1.UpdateOptions{})
  142. require.NoError(t, err, "failed to update audit sink1")
  143. t.Log("updated audit sink1 to point to server2")
  144. // check that at least 20 events have arrived in server2
  145. err = testServer2.WaitForNumEvents(20)
  146. require.NoError(t, err, "failed to find enough events in server2")
  147. // stop the operations and ensure they have finished
  148. close(stopChan)
  149. wg.Wait()
  150. // check that the final events have arrived
  151. expected := expectedEvents.Load().([]utils.AuditEvent)
  152. missing, err := testServer2.WaitForEvents(expected[len(expected)-4:])
  153. require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing)
  154. // combine the event lists
  155. el1 := testServer1.GetEventList()
  156. el2 := testServer2.GetEventList()
  157. combinedList := auditinternal.EventList{}
  158. combinedList.Items = append(el1.Items, el2.Items...)
  159. // check that there are no duplicate events
  160. dups, err := utils.CheckForDuplicates(combinedList)
  161. require.NoError(t, err, "duplicate events found: %#v", dups)
  162. // check that no events are missing
  163. missing, err = utils.CheckAuditList(combinedList, expected)
  164. require.NoError(t, err, "failed to match all expected events: %#v not found", missing)
  165. })
  166. }
  167. // sinkHealth checks if sinks are running by verifying that uniquely identified events are found
  168. // in the given servers
  169. func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) {
  170. var missing []utils.AuditEvent
  171. i := 0
  172. var finalErr error
  173. err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
  174. i++
  175. name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
  176. expected, err := simpleOp(name, kubeclient)
  177. require.NoError(t, err, "could not perform config map operations")
  178. // check that all given servers have received events
  179. for _, server := range servers {
  180. missing, err = server.WaitForEvents(expected)
  181. if err != nil {
  182. finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing)
  183. return false, nil
  184. }
  185. server.ResetEventList()
  186. }
  187. return true, nil
  188. })
  189. require.NoError(t, err, finalErr)
  190. }
  191. // simpleOp is a function that simply tries to get a configmap with the given name and returns the
  192. // corresponding expected audit event
  193. func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) {
  194. _, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
  195. if err != nil && !apierrors.IsNotFound(err) {
  196. return nil, err
  197. }
  198. expectedEvents := []utils.AuditEvent{
  199. {
  200. Level: auditinternal.LevelRequestResponse,
  201. Stage: auditinternal.StageResponseComplete,
  202. RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
  203. Verb: "get",
  204. Code: 404,
  205. User: auditTestUser,
  206. Resource: "configmaps",
  207. Namespace: namespace,
  208. RequestObject: false,
  209. ResponseObject: true,
  210. AuthorizeDecision: "allow",
  211. },
  212. }
  213. return expectedEvents, nil
  214. }
  215. // asyncOps runs the simpleOp function until the stopChan is closed updating
  216. // the expected atomic events list
  217. func asyncOps(
  218. stopChan <-chan struct{},
  219. wg *sync.WaitGroup,
  220. kubeclient kubernetes.Interface,
  221. expected *atomic.Value,
  222. ) {
  223. for i := 0; ; i++ {
  224. select {
  225. case <-stopChan:
  226. wg.Done()
  227. return
  228. default:
  229. name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
  230. exp, err := simpleOp(name, kubeclient)
  231. if err != nil {
  232. // retry on errors
  233. continue
  234. }
  235. e := expected.Load().([]utils.AuditEvent)
  236. evList := append(e, exp...)
  237. expected.Store(evList)
  238. }
  239. }
  240. }