123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- /*
- Copyright 2018 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package master
- import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/stretchr/testify/require"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/wait"
- auditinternal "k8s.io/apiserver/pkg/apis/audit"
- "k8s.io/apiserver/pkg/features"
- utilfeature "k8s.io/apiserver/pkg/util/feature"
- "k8s.io/client-go/kubernetes"
- featuregatetesting "k8s.io/component-base/featuregate/testing"
- "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
- "k8s.io/kubernetes/test/integration/framework"
- "k8s.io/kubernetes/test/utils"
- )
- // TestDynamicAudit ensures that v1alpha of the auditregistration api works
- func TestDynamicAudit(t *testing.T) {
- // start api server
- stopCh := make(chan struct{})
- defer close(stopCh)
- defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
- kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
- ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
- opts.Audit.DynamicOptions.Enabled = true
- // set max batch size so the buffers flush immediately
- opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1
- opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true")
- },
- })
- // create test sinks
- testServer1 := utils.NewAuditTestServer(t, "test1")
- defer testServer1.Close()
- testServer2 := utils.NewAuditTestServer(t, "test2")
- defer testServer2.Close()
- // check that servers are healthy
- require.NoError(t, testServer1.Health(), "server1 never became healthy")
- require.NoError(t, testServer2.Health(), "server2 never became healthy")
- // build AuditSink configurations
- sinkConfig1 := testServer1.BuildSinkConfiguration()
- sinkConfig2 := testServer2.BuildSinkConfiguration()
- // test creates a single audit sink, generates audit events, and ensures they arrive at the server
- success := t.Run("one sink", func(t *testing.T) {
- _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig1, metav1.CreateOptions{})
- require.NoError(t, err, "failed to create audit sink1")
- t.Log("created audit sink1")
- // verify sink is ready
- sinkHealth(t, kubeclient, testServer1)
- // perform configmap ops
- configMapOperations(t, kubeclient)
- // check for corresponding events
- missing, err := testServer1.WaitForEvents(expectedEvents)
- require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
- })
- require.True(t, success) // propagate failure
- // test creates a second audit sink, generates audit events, and ensures events arrive in both servers
- success = t.Run("two sink", func(t *testing.T) {
- _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig2, metav1.CreateOptions{})
- require.NoError(t, err, "failed to create audit sink2")
- t.Log("created audit sink2")
- // verify both sinks are ready
- sinkHealth(t, kubeclient, testServer1, testServer2)
- // perform configmap ops
- configMapOperations(t, kubeclient)
- // check for corresponding events in both sinks
- missing, err := testServer1.WaitForEvents(expectedEvents)
- require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
- missing, err = testServer2.WaitForEvents(expectedEvents)
- require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing)
- })
- require.True(t, success) // propagate failure
- // test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server
- success = t.Run("delete sink", func(t *testing.T) {
- err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(context.TODO(), sinkConfig2.Name, &metav1.DeleteOptions{})
- require.NoError(t, err, "failed to delete audit sink2")
- t.Log("deleted audit sink2")
- var finalErr error
- err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
- // reset event lists
- testServer1.ResetEventList()
- testServer2.ResetEventList()
- // perform configmap ops
- configMapOperations(t, kubeclient)
- // check for corresponding events in server1
- missing, err := testServer1.WaitForEvents(expectedEvents)
- if err != nil {
- finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing)
- return false, nil
- }
- // check that server2 is empty
- if len(testServer2.GetEventList().Items) != 0 {
- finalErr = fmt.Errorf("server2 event list should be empty")
- return false, nil
- }
- return true, nil
- })
- require.NoError(t, err, finalErr)
- })
- require.True(t, success) // propagate failure
- // This test will run a background process that generates audit events sending them to a sink.
- // Whilst that generation is occurring, the sink is updated to point to a different server.
- // The test checks that no events are lost or duplicated during the update.
- t.Run("update sink", func(t *testing.T) {
- // fetch sink1 config
- sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(context.TODO(), sinkConfig1.Name, metav1.GetOptions{})
- require.NoError(t, err)
- // reset event lists
- testServer1.ResetEventList()
- testServer2.ResetEventList()
- // run operations in background
- stopChan := make(chan struct{})
- expectedEvents := &atomic.Value{}
- expectedEvents.Store([]utils.AuditEvent{})
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go asyncOps(stopChan, wg, kubeclient, expectedEvents)
- // check to see that at least 20 events have arrived in server1
- err = testServer1.WaitForNumEvents(20)
- require.NoError(t, err, "failed to find enough events in server1")
- // check that no events are in server 2 yet
- require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet")
- // update the url
- sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL
- _, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(context.TODO(), sink1, metav1.UpdateOptions{})
- require.NoError(t, err, "failed to update audit sink1")
- t.Log("updated audit sink1 to point to server2")
- // check that at least 20 events have arrived in server2
- err = testServer2.WaitForNumEvents(20)
- require.NoError(t, err, "failed to find enough events in server2")
- // stop the operations and ensure they have finished
- close(stopChan)
- wg.Wait()
- // check that the final events have arrived
- expected := expectedEvents.Load().([]utils.AuditEvent)
- missing, err := testServer2.WaitForEvents(expected[len(expected)-4:])
- require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing)
- // combine the event lists
- el1 := testServer1.GetEventList()
- el2 := testServer2.GetEventList()
- combinedList := auditinternal.EventList{}
- combinedList.Items = append(el1.Items, el2.Items...)
- // check that there are no duplicate events
- dups, err := utils.CheckForDuplicates(combinedList)
- require.NoError(t, err, "duplicate events found: %#v", dups)
- // check that no events are missing
- missing, err = utils.CheckAuditList(combinedList, expected)
- require.NoError(t, err, "failed to match all expected events: %#v not found", missing)
- })
- }
- // sinkHealth checks if sinks are running by verifying that uniquely identified events are found
- // in the given servers
- func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) {
- var missing []utils.AuditEvent
- i := 0
- var finalErr error
- err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
- i++
- name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
- expected, err := simpleOp(name, kubeclient)
- require.NoError(t, err, "could not perform config map operations")
- // check that all given servers have received events
- for _, server := range servers {
- missing, err = server.WaitForEvents(expected)
- if err != nil {
- finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing)
- return false, nil
- }
- server.ResetEventList()
- }
- return true, nil
- })
- require.NoError(t, err, finalErr)
- }
- // simpleOp is a function that simply tries to get a configmap with the given name and returns the
- // corresponding expected audit event
- func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) {
- _, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
- if err != nil && !apierrors.IsNotFound(err) {
- return nil, err
- }
- expectedEvents := []utils.AuditEvent{
- {
- Level: auditinternal.LevelRequestResponse,
- Stage: auditinternal.StageResponseComplete,
- RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
- Verb: "get",
- Code: 404,
- User: auditTestUser,
- Resource: "configmaps",
- Namespace: namespace,
- RequestObject: false,
- ResponseObject: true,
- AuthorizeDecision: "allow",
- },
- }
- return expectedEvents, nil
- }
- // asyncOps runs the simpleOp function until the stopChan is closed updating
- // the expected atomic events list
- func asyncOps(
- stopChan <-chan struct{},
- wg *sync.WaitGroup,
- kubeclient kubernetes.Interface,
- expected *atomic.Value,
- ) {
- for i := 0; ; i++ {
- select {
- case <-stopChan:
- wg.Done()
- return
- default:
- name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
- exp, err := simpleOp(name, kubeclient)
- if err != nil {
- // retry on errors
- continue
- }
- e := expected.Load().([]utils.AuditEvent)
- evList := append(e, exp...)
- expected.Store(evList)
- }
- }
- }
|