monitor_test.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. // +build linux
  2. /*
  3. Copyright 2019 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package iptables
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "sync"
  20. "sync/atomic"
  21. "testing"
  22. "time"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. utilwait "k8s.io/apimachinery/pkg/util/wait"
  25. "k8s.io/utils/exec"
  26. )
  27. // We can't use the normal FakeExec because we don't know precisely how many times the
  28. // Monitor thread will do its checks, and we don't know precisely how its iptables calls
  29. // will interleave with the main thread's. So we use our own fake Exec implementation that
  30. // implements a minimal iptables interface. This will need updates as iptables.runner
  31. // changes its use of Exec.
// monitorFakeExec is a minimal fake exec.Interface for Monitor() tests.
// The embedded mutex guards all fields: the monitor goroutine and the test
// goroutine both read and write them.
type monitorFakeExec struct {
	sync.Mutex
	// tables maps an iptables table name to the set of chain names it contains.
	tables map[string]sets.String
	// block, when true, makes create/list operations fail as though another
	// process held the xtables lock (delete still succeeds; see CombinedOutput).
	block bool
	// wasBlocked records that some operation was rejected while block was set;
	// it is read-and-cleared by getWasBlocked().
	wasBlocked bool
}
  38. func newMonitorFakeExec() *monitorFakeExec {
  39. tables := make(map[string]sets.String)
  40. tables["mangle"] = sets.NewString()
  41. tables["filter"] = sets.NewString()
  42. tables["nat"] = sets.NewString()
  43. return &monitorFakeExec{tables: tables}
  44. }
  45. func (mfe *monitorFakeExec) blockIPTables(block bool) {
  46. mfe.Lock()
  47. defer mfe.Unlock()
  48. mfe.block = block
  49. }
  50. func (mfe *monitorFakeExec) getWasBlocked() bool {
  51. mfe.Lock()
  52. defer mfe.Unlock()
  53. wasBlocked := mfe.wasBlocked
  54. mfe.wasBlocked = false
  55. return wasBlocked
  56. }
  57. func (mfe *monitorFakeExec) Command(cmd string, args ...string) exec.Cmd {
  58. return &monitorFakeCmd{mfe: mfe, cmd: cmd, args: args}
  59. }
  60. func (mfe *monitorFakeExec) CommandContext(ctx context.Context, cmd string, args ...string) exec.Cmd {
  61. return mfe.Command(cmd, args...)
  62. }
// LookPath pretends every binary exists, echoing the requested name back as
// its path.
func (mfe *monitorFakeExec) LookPath(file string) (string, error) {
	return file, nil
}
// monitorFakeCmd is a single fake iptables invocation; it carries the command
// line back to the shared monitorFakeExec, whose tables it mutates when run.
type monitorFakeCmd struct {
	mfe  *monitorFakeExec // shared fake state (and its lock)
	cmd  string           // binary name, e.g. "iptables"
	args []string         // full argument list as passed to Command
}
  71. func (mfc *monitorFakeCmd) CombinedOutput() ([]byte, error) {
  72. if mfc.cmd == cmdIPTablesRestore {
  73. // Only used for "iptables-restore --version", and the result doesn't matter
  74. return []byte{}, nil
  75. } else if mfc.cmd != cmdIPTables {
  76. panic("bad command " + mfc.cmd)
  77. }
  78. if len(mfc.args) == 1 && mfc.args[0] == "--version" {
  79. return []byte("iptables v1.6.2"), nil
  80. }
  81. if len(mfc.args) != 8 || mfc.args[0] != WaitString || mfc.args[1] != WaitSecondsValue || mfc.args[2] != WaitIntervalString || mfc.args[3] != WaitIntervalUsecondsValue || mfc.args[6] != "-t" {
  82. panic(fmt.Sprintf("bad args %#v", mfc.args))
  83. }
  84. op := operation(mfc.args[4])
  85. chainName := mfc.args[5]
  86. tableName := mfc.args[7]
  87. mfc.mfe.Lock()
  88. defer mfc.mfe.Unlock()
  89. table := mfc.mfe.tables[tableName]
  90. if table == nil {
  91. return []byte{}, fmt.Errorf("no such table %q", tableName)
  92. }
  93. // For ease-of-testing reasons, blockIPTables blocks create and list, but not delete
  94. if mfc.mfe.block && op != opDeleteChain {
  95. mfc.mfe.wasBlocked = true
  96. return []byte{}, exec.CodeExitError{Code: 4, Err: fmt.Errorf("could not get xtables.lock, etc")}
  97. }
  98. switch op {
  99. case opCreateChain:
  100. if !table.Has(chainName) {
  101. table.Insert(chainName)
  102. }
  103. return []byte{}, nil
  104. case opListChain:
  105. if table.Has(chainName) {
  106. return []byte{}, nil
  107. }
  108. return []byte{}, fmt.Errorf("no such chain %q", chainName)
  109. case opDeleteChain:
  110. table.Delete(chainName)
  111. return []byte{}, nil
  112. default:
  113. panic("should not be reached")
  114. }
  115. }
// SetStdin is a no-op; it is only invoked by getIPTablesRestoreVersionString(),
// whose input the fake ignores.
func (mfc *monitorFakeCmd) SetStdin(in io.Reader) {
	// Used by getIPTablesRestoreVersionString(), can be ignored
}
// The remaining exec.Cmd methods exist only to satisfy the interface; the
// monitor code path never calls them, so each one panics to flag any future
// change in iptables.runner's Exec usage.

func (mfc *monitorFakeCmd) Run() error {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) Output() ([]byte, error) {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) SetDir(dir string) {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) SetStdout(out io.Writer) {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) SetStderr(out io.Writer) {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) SetEnv(env []string) {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) StdoutPipe() (io.ReadCloser, error) {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) StderrPipe() (io.ReadCloser, error) {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) Start() error {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) Wait() error {
	panic("should not be reached")
}

func (mfc *monitorFakeCmd) Stop() {
	panic("should not be reached")
}
// TestIPTablesMonitor drives ipt.Monitor() through its full lifecycle against
// the fake exec: canary creation, reload on full canary deletion, no reload on
// partial deletion, deferred reload while the xtables lock is "held", shutdown
// via the stop channel, and retried canary creation on a fresh monitor.
func TestIPTablesMonitor(t *testing.T) {
	mfe := newMonitorFakeExec()
	ipt := New(mfe, ProtocolIpv4)

	// reloads counts invocations of the reload callback; accessed atomically
	// because the monitor goroutine increments it while the test reads it.
	var reloads uint32
	stopCh := make(chan struct{})

	canary := Chain("MONITOR-TEST-CANARY")
	tables := []Table{TableMangle, TableFilter, TableNAT}
	go ipt.Monitor(canary, tables, func() {
		// Monitor must delete all canaries before invoking the callback.
		if !ensureNoChains(mfe) {
			t.Errorf("reload called while canaries still exist")
		}
		atomic.AddUint32(&reloads, 1)
	}, 100*time.Millisecond, stopCh)

	// Monitor should create canary chains quickly
	if err := waitForChains(mfe, canary, tables); err != nil {
		t.Errorf("failed to create iptables canaries: %v", err)
	}

	if err := waitForReloads(&reloads, 0); err != nil {
		t.Errorf("got unexpected reloads: %v", err)
	}

	// If we delete all of the chains, it should reload
	ipt.DeleteChain(TableMangle, canary)
	ipt.DeleteChain(TableFilter, canary)
	ipt.DeleteChain(TableNAT, canary)

	if err := waitForReloads(&reloads, 1); err != nil {
		t.Errorf("got unexpected number of reloads after flush: %v", err)
	}

	// After reloading, the monitor recreates its canaries.
	if err := waitForChains(mfe, canary, tables); err != nil {
		t.Errorf("failed to create iptables canaries: %v", err)
	}

	// If we delete two chains, it should not reload yet
	ipt.DeleteChain(TableMangle, canary)
	ipt.DeleteChain(TableFilter, canary)

	if err := waitForNoReload(&reloads, 1); err != nil {
		t.Errorf("got unexpected number of reloads after partial flush: %v", err)
	}

	// Now ensure that "iptables -L" will get an error about the xtables.lock, and
	// delete the last chain. The monitor should not reload, because it can't actually
	// tell if the chain was deleted or not.
	mfe.blockIPTables(true)
	ipt.DeleteChain(TableNAT, canary)
	if err := waitForBlocked(mfe); err != nil {
		t.Errorf("failed waiting for monitor to be blocked from monitoring: %v", err)
	}

	// After unblocking the monitor, it should now reload
	mfe.blockIPTables(false)

	if err := waitForReloads(&reloads, 2); err != nil {
		t.Errorf("got unexpected number of reloads after slow flush: %v", err)
	}

	if err := waitForChains(mfe, canary, tables); err != nil {
		t.Errorf("failed to create iptables canaries: %v", err)
	}

	// If we close the stop channel, it should stop running
	close(stopCh)

	// A stopped monitor neither reloads nor leaves canaries behind.
	if err := waitForNoReload(&reloads, 2); err != nil {
		t.Errorf("got unexpected number of reloads after stop: %v", err)
	}
	if !ensureNoChains(mfe) {
		t.Errorf("canaries still exist after stopping monitor")
	}

	// If we create a new monitor while the iptables lock is held, it will
	// retry creating canaries until it succeeds

	stopCh = make(chan struct{})
	// Clear any stale wasBlocked state before re-blocking.
	_ = mfe.getWasBlocked()
	mfe.blockIPTables(true)
	go ipt.Monitor(canary, tables, func() {
		if !ensureNoChains(mfe) {
			t.Errorf("reload called while canaries still exist")
		}
		atomic.AddUint32(&reloads, 1)
	}, 100*time.Millisecond, stopCh)

	// Monitor should not have created canaries yet
	if !ensureNoChains(mfe) {
		t.Errorf("canary created while iptables blocked")
	}

	if err := waitForBlocked(mfe); err != nil {
		t.Errorf("failed waiting for monitor to fail creating canaries: %v", err)
	}

	mfe.blockIPTables(false)

	if err := waitForChains(mfe, canary, tables); err != nil {
		t.Errorf("failed to create iptables canaries: %v", err)
	}

	close(stopCh)
}
  236. func waitForChains(mfe *monitorFakeExec, canary Chain, tables []Table) error {
  237. return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
  238. mfe.Lock()
  239. defer mfe.Unlock()
  240. for _, table := range tables {
  241. if !mfe.tables[string(table)].Has(string(canary)) {
  242. return false, nil
  243. }
  244. }
  245. return true, nil
  246. })
  247. }
  248. func ensureNoChains(mfe *monitorFakeExec) bool {
  249. mfe.Lock()
  250. defer mfe.Unlock()
  251. return mfe.tables["mangle"].Len() == 0 &&
  252. mfe.tables["filter"].Len() == 0 &&
  253. mfe.tables["nat"].Len() == 0
  254. }
  255. func waitForReloads(reloads *uint32, expected uint32) error {
  256. if atomic.LoadUint32(reloads) < expected {
  257. utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
  258. return atomic.LoadUint32(reloads) >= expected, nil
  259. })
  260. }
  261. got := atomic.LoadUint32(reloads)
  262. if got != expected {
  263. return fmt.Errorf("expected %d, got %d", expected, got)
  264. }
  265. return nil
  266. }
  267. func waitForNoReload(reloads *uint32, expected uint32) error {
  268. utilwait.PollImmediate(50*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
  269. return atomic.LoadUint32(reloads) > expected, nil
  270. })
  271. got := atomic.LoadUint32(reloads)
  272. if got != expected {
  273. return fmt.Errorf("expected %d, got %d", expected, got)
  274. }
  275. return nil
  276. }
  277. func waitForBlocked(mfe *monitorFakeExec) error {
  278. return utilwait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
  279. blocked := mfe.getWasBlocked()
  280. return blocked, nil
  281. })
  282. }