kubenet_linux.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950
  1. // +build linux
  2. /*
  3. Copyright 2014 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package kubenet
  15. import (
  16. "context"
  17. "fmt"
  18. "io/ioutil"
  19. "net"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/containernetworking/cni/libcni"
  24. cnitypes "github.com/containernetworking/cni/pkg/types"
  25. cnitypes020 "github.com/containernetworking/cni/pkg/types/020"
  26. "github.com/vishvananda/netlink"
  27. "golang.org/x/sys/unix"
  28. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  29. utilnet "k8s.io/apimachinery/pkg/util/net"
  30. utilsets "k8s.io/apimachinery/pkg/util/sets"
  31. "k8s.io/klog"
  32. kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
  33. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  34. "k8s.io/kubernetes/pkg/kubelet/dockershim/network"
  35. "k8s.io/kubernetes/pkg/kubelet/dockershim/network/hostport"
  36. "k8s.io/kubernetes/pkg/util/bandwidth"
  37. utilebtables "k8s.io/kubernetes/pkg/util/ebtables"
  38. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  39. utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
  40. utilexec "k8s.io/utils/exec"
  41. utilfeature "k8s.io/apiserver/pkg/util/feature"
  42. kubefeatures "k8s.io/kubernetes/pkg/features"
  43. netutils "k8s.io/utils/net"
  44. )
  45. const (
  46. BridgeName = "cbr0"
  47. DefaultCNIDir = "/opt/cni/bin"
  48. sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"
  49. // fallbackMTU is used if an MTU is not specified, and we cannot determine the MTU
  50. fallbackMTU = 1460
  51. // ebtables Chain to store dedup rules
  52. dedupChain = utilebtables.Chain("KUBE-DEDUP")
  53. zeroCIDRv6 = "::/0"
  54. zeroCIDRv4 = "0.0.0.0/0"
  55. NET_CONFIG_TEMPLATE = `{
  56. "cniVersion": "0.1.0",
  57. "name": "kubenet",
  58. "type": "bridge",
  59. "bridge": "%s",
  60. "mtu": %d,
  61. "addIf": "%s",
  62. "isGateway": true,
  63. "ipMasq": false,
  64. "hairpinMode": %t,
  65. "ipam": {
  66. "type": "host-local",
  67. "ranges": [%s],
  68. "routes": [%s]
  69. }
  70. }`
  71. )
  // requiredCNIPlugins names the CNI plugin binaries kubenet depends on. All of
  // them must be present together in DefaultCNIDir or in one of the
  // user-specified plugin directories.
  var requiredCNIPlugins = [...]string{
  	"bridge",
  	"host-local",
  	"loopback",
  }
  74. type kubenetNetworkPlugin struct {
  75. network.NoopNetworkPlugin
  76. host network.Host
  77. netConfig *libcni.NetworkConfig
  78. loConfig *libcni.NetworkConfig
  79. cniConfig libcni.CNI
  80. bandwidthShaper bandwidth.Shaper
  81. mu sync.Mutex //Mutex for protecting podIPs map, netConfig, and shaper initialization
  82. podIPs map[kubecontainer.ContainerID]utilsets.String
  83. mtu int
  84. execer utilexec.Interface
  85. nsenterPath string
  86. hairpinMode kubeletconfig.HairpinMode
  87. // kubenet can use either hostportSyncer and hostportManager to implement hostports
  88. // Currently, if network host supports legacy features, hostportSyncer will be used,
  89. // otherwise, hostportManager will be used.
  90. hostportSyncer hostport.HostportSyncer
  91. hostportSyncerv6 hostport.HostportSyncer
  92. hostportManager hostport.HostPortManager
  93. hostportManagerv6 hostport.HostPortManager
  94. iptables utiliptables.Interface
  95. iptablesv6 utiliptables.Interface
  96. sysctl utilsysctl.Interface
  97. ebtables utilebtables.Interface
  98. // binDirs is passed by kubelet cni-bin-dir parameter.
  99. // kubenet will search for CNI binaries in DefaultCNIDir first, then continue to binDirs.
  100. binDirs []string
  101. nonMasqueradeCIDR string
  102. cacheDir string
  103. podCIDRs []*net.IPNet
  104. }
  105. func NewPlugin(networkPluginDirs []string, cacheDir string) network.NetworkPlugin {
  106. execer := utilexec.New()
  107. iptInterface := utiliptables.New(execer, utiliptables.ProtocolIpv4)
  108. iptInterfacev6 := utiliptables.New(execer, utiliptables.ProtocolIpv6)
  109. return &kubenetNetworkPlugin{
  110. podIPs: make(map[kubecontainer.ContainerID]utilsets.String),
  111. execer: utilexec.New(),
  112. iptables: iptInterface,
  113. iptablesv6: iptInterfacev6,
  114. sysctl: utilsysctl.New(),
  115. binDirs: append([]string{DefaultCNIDir}, networkPluginDirs...),
  116. hostportSyncer: hostport.NewHostportSyncer(iptInterface),
  117. hostportSyncerv6: hostport.NewHostportSyncer(iptInterfacev6),
  118. hostportManager: hostport.NewHostportManager(iptInterface),
  119. hostportManagerv6: hostport.NewHostportManager(iptInterfacev6),
  120. nonMasqueradeCIDR: "10.0.0.0/8",
  121. cacheDir: cacheDir,
  122. podCIDRs: make([]*net.IPNet, 0),
  123. }
  124. }
  125. func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error {
  126. plugin.host = host
  127. plugin.hairpinMode = hairpinMode
  128. plugin.nonMasqueradeCIDR = nonMasqueradeCIDR
  129. plugin.cniConfig = &libcni.CNIConfig{Path: plugin.binDirs}
  130. if mtu == network.UseDefaultMTU {
  131. if link, err := findMinMTU(); err == nil {
  132. plugin.mtu = link.MTU
  133. klog.V(5).Infof("Using interface %s MTU %d as bridge MTU", link.Name, link.MTU)
  134. } else {
  135. plugin.mtu = fallbackMTU
  136. klog.Warningf("Failed to find default bridge MTU, using %d: %v", fallbackMTU, err)
  137. }
  138. } else {
  139. plugin.mtu = mtu
  140. }
  141. // Since this plugin uses a Linux bridge, set bridge-nf-call-iptables=1
  142. // is necessary to ensure kube-proxy functions correctly.
  143. //
  144. // This will return an error on older kernel version (< 3.18) as the module
  145. // was built-in, we simply ignore the error here. A better thing to do is
  146. // to check the kernel version in the future.
  147. plugin.execer.Command("modprobe", "br-netfilter").CombinedOutput()
  148. err := plugin.sysctl.SetSysctl(sysctlBridgeCallIPTables, 1)
  149. if err != nil {
  150. klog.Warningf("can't set sysctl %s: %v", sysctlBridgeCallIPTables, err)
  151. }
  152. plugin.loConfig, err = libcni.ConfFromBytes([]byte(`{
  153. "cniVersion": "0.1.0",
  154. "name": "kubenet-loopback",
  155. "type": "loopback"
  156. }`))
  157. if err != nil {
  158. return fmt.Errorf("failed to generate loopback config: %v", err)
  159. }
  160. plugin.nsenterPath, err = plugin.execer.LookPath("nsenter")
  161. if err != nil {
  162. return fmt.Errorf("failed to find nsenter binary: %v", err)
  163. }
  164. // Need to SNAT outbound traffic from cluster
  165. if err = plugin.ensureMasqRule(); err != nil {
  166. return err
  167. }
  168. return nil
  169. }
  170. // TODO: move thic logic into cni bridge plugin and remove this from kubenet
  171. func (plugin *kubenetNetworkPlugin) ensureMasqRule() error {
  172. if plugin.nonMasqueradeCIDR != zeroCIDRv4 && plugin.nonMasqueradeCIDR != zeroCIDRv6 {
  173. // switch according to target nonMasqueradeCidr ip family
  174. ipt := plugin.iptables
  175. if netutils.IsIPv6CIDRString(plugin.nonMasqueradeCIDR) {
  176. ipt = plugin.iptablesv6
  177. }
  178. if _, err := ipt.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting,
  179. "-m", "comment", "--comment", "kubenet: SNAT for outbound traffic from cluster",
  180. "-m", "addrtype", "!", "--dst-type", "LOCAL",
  181. "!", "-d", plugin.nonMasqueradeCIDR,
  182. "-j", "MASQUERADE"); err != nil {
  183. return fmt.Errorf("failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
  184. }
  185. }
  186. return nil
  187. }
  188. func findMinMTU() (*net.Interface, error) {
  189. intfs, err := net.Interfaces()
  190. if err != nil {
  191. return nil, err
  192. }
  193. mtu := 999999
  194. defIntfIndex := -1
  195. for i, intf := range intfs {
  196. if ((intf.Flags & net.FlagUp) != 0) && (intf.Flags&(net.FlagLoopback|net.FlagPointToPoint) == 0) {
  197. if intf.MTU < mtu {
  198. mtu = intf.MTU
  199. defIntfIndex = i
  200. }
  201. }
  202. }
  203. if mtu >= 999999 || mtu < 576 || defIntfIndex < 0 {
  204. return nil, fmt.Errorf("no suitable interface: %v", BridgeName)
  205. }
  206. return &intfs[defIntfIndex], nil
  207. }
  208. func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interface{}) {
  209. var err error
  210. if name != network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE {
  211. return
  212. }
  213. plugin.mu.Lock()
  214. defer plugin.mu.Unlock()
  215. podCIDR, ok := details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR].(string)
  216. if !ok {
  217. klog.Warningf("%s event didn't contain pod CIDR", network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE)
  218. return
  219. }
  220. if plugin.netConfig != nil {
  221. klog.Warningf("Ignoring subsequent pod CIDR update to %s", podCIDR)
  222. return
  223. }
  224. klog.V(4).Infof("kubenet: PodCIDR is set to %q", podCIDR)
  225. podCIDRs := strings.Split(podCIDR, ",")
  226. // reset to one cidr if dual stack is not enabled
  227. if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.IPv6DualStack) && len(podCIDRs) > 1 {
  228. klog.V(2).Infof("This node has multiple pod cidrs assigned and dual stack is not enabled. ignoring all except first cidr")
  229. podCIDRs = podCIDRs[0:1]
  230. }
  231. for idx, currentPodCIDR := range podCIDRs {
  232. _, cidr, err := net.ParseCIDR(currentPodCIDR)
  233. if nil != err {
  234. klog.Warningf("Failed to generate CNI network config with cidr %s at index:%v: %v", currentPodCIDR, idx, err)
  235. return
  236. }
  237. // create list of ips
  238. plugin.podCIDRs = append(plugin.podCIDRs, cidr)
  239. }
  240. //setup hairpinMode
  241. setHairpin := plugin.hairpinMode == kubeletconfig.HairpinVeth
  242. json := fmt.Sprintf(NET_CONFIG_TEMPLATE, BridgeName, plugin.mtu, network.DefaultInterfaceName, setHairpin, plugin.getRangesConfig(), plugin.getRoutesConfig())
  243. klog.V(4).Infof("CNI network config set to %v", json)
  244. plugin.netConfig, err = libcni.ConfFromBytes([]byte(json))
  245. if err != nil {
  246. klog.Warningf("** failed to set up CNI with %v err:%v", json, err)
  247. // just incase it was set by mistake
  248. plugin.netConfig = nil
  249. // we bail out by clearing the *entire* list
  250. // of addresses assigned to cbr0
  251. plugin.clearUnusedBridgeAddresses()
  252. }
  253. }
  254. // clear all address on bridge except those operated on by kubenet
  255. func (plugin *kubenetNetworkPlugin) clearUnusedBridgeAddresses() {
  256. cidrIncluded := func(list []*net.IPNet, check *net.IPNet) bool {
  257. for _, thisNet := range list {
  258. if utilnet.IPNetEqual(thisNet, check) {
  259. return true
  260. }
  261. }
  262. return false
  263. }
  264. bridge, err := netlink.LinkByName(BridgeName)
  265. if err != nil {
  266. return
  267. }
  268. addrs, err := netlink.AddrList(bridge, unix.AF_INET)
  269. if err != nil {
  270. klog.V(2).Infof("attempting to get address for interface: %s failed with err:%v", BridgeName, err)
  271. return
  272. }
  273. for _, addr := range addrs {
  274. if !cidrIncluded(plugin.podCIDRs, addr.IPNet) {
  275. klog.V(2).Infof("Removing old address %s from %s", addr.IPNet.String(), BridgeName)
  276. netlink.AddrDel(bridge, &addr)
  277. }
  278. }
  279. }
  280. func (plugin *kubenetNetworkPlugin) Name() string {
  281. return KubenetPluginName
  282. }
  283. func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
  284. return utilsets.NewInt()
  285. }
  286. // setup sets up networking through CNI using the given ns/name and sandbox ID.
  287. func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
  288. var ipv4, ipv6 net.IP
  289. var podGateways []net.IP
  290. var podCIDRs []net.IPNet
  291. // Disable DAD so we skip the kernel delay on bringing up new interfaces.
  292. if err := plugin.disableContainerDAD(id); err != nil {
  293. klog.V(3).Infof("Failed to disable DAD in container: %v", err)
  294. }
  295. // Bring up container loopback interface
  296. if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
  297. return err
  298. }
  299. // Hook container up with our bridge
  300. resT, err := plugin.addContainerToNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id)
  301. if err != nil {
  302. return err
  303. }
  304. // Coerce the CNI result version
  305. res, err := cnitypes020.GetResult(resT)
  306. if err != nil {
  307. return fmt.Errorf("unable to understand network config: %v", err)
  308. }
  309. //TODO: v1.16 (khenidak) update NET_CONFIG_TEMPLATE to CNI version 0.3.0 or later so
  310. // that we get multiple IP addresses in the returned Result structure
  311. if res.IP4 != nil {
  312. ipv4 = res.IP4.IP.IP.To4()
  313. podGateways = append(podGateways, res.IP4.Gateway)
  314. podCIDRs = append(podCIDRs, net.IPNet{IP: ipv4.Mask(res.IP4.IP.Mask), Mask: res.IP4.IP.Mask})
  315. }
  316. if res.IP6 != nil {
  317. ipv6 = res.IP6.IP.IP
  318. podGateways = append(podGateways, res.IP6.Gateway)
  319. podCIDRs = append(podCIDRs, net.IPNet{IP: ipv6.Mask(res.IP6.IP.Mask), Mask: res.IP6.IP.Mask})
  320. }
  321. if ipv4 == nil && ipv6 == nil {
  322. return fmt.Errorf("cni didn't report ipv4 ipv6")
  323. }
  324. // Put the container bridge into promiscuous mode to force it to accept hairpin packets.
  325. // TODO: Remove this once the kernel bug (#20096) is fixed.
  326. if plugin.hairpinMode == kubeletconfig.PromiscuousBridge {
  327. link, err := netlink.LinkByName(BridgeName)
  328. if err != nil {
  329. return fmt.Errorf("failed to lookup %q: %v", BridgeName, err)
  330. }
  331. if link.Attrs().Promisc != 1 {
  332. // promiscuous mode is not on, then turn it on.
  333. err := netlink.SetPromiscOn(link)
  334. if err != nil {
  335. return fmt.Errorf("error setting promiscuous mode on %s: %v", BridgeName, err)
  336. }
  337. }
  338. // configure the ebtables rules to eliminate duplicate packets by best effort
  339. plugin.syncEbtablesDedupRules(link.Attrs().HardwareAddr, podCIDRs, podGateways)
  340. }
  341. // add the ip to tracked ips
  342. if ipv4 != nil {
  343. plugin.addPodIP(id, ipv4.String())
  344. }
  345. if ipv6 != nil {
  346. plugin.addPodIP(id, ipv6.String())
  347. }
  348. if err := plugin.addTrafficShaping(id, annotations); err != nil {
  349. return err
  350. }
  351. return plugin.addPortMapping(id, name, namespace)
  352. }
  353. // The first SetUpPod call creates the bridge; get a shaper for the sake of initialization
  354. // TODO: replace with CNI traffic shaper plugin
  355. func (plugin *kubenetNetworkPlugin) addTrafficShaping(id kubecontainer.ContainerID, annotations map[string]string) error {
  356. shaper := plugin.shaper()
  357. ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations)
  358. if err != nil {
  359. return fmt.Errorf("error reading pod bandwidth annotations: %v", err)
  360. }
  361. iplist, exists := plugin.getCachedPodIPs(id)
  362. if !exists {
  363. return fmt.Errorf("pod %s does not have recorded ips", id)
  364. }
  365. if egress != nil || ingress != nil {
  366. for _, ip := range iplist {
  367. mask := 32
  368. if netutils.IsIPv6String(ip) {
  369. mask = 128
  370. }
  371. if err != nil {
  372. return fmt.Errorf("failed to setup traffic shaping for pod ip%s", ip)
  373. }
  374. if err := shaper.ReconcileCIDR(fmt.Sprintf("%v/%v", ip, mask), egress, ingress); err != nil {
  375. return fmt.Errorf("failed to add pod to shaper: %v", err)
  376. }
  377. }
  378. }
  379. return nil
  380. }
  381. // TODO: replace with CNI port-forwarding plugin
  382. func (plugin *kubenetNetworkPlugin) addPortMapping(id kubecontainer.ContainerID, name, namespace string) error {
  383. portMappings, err := plugin.host.GetPodPortMappings(id.ID)
  384. if err != nil {
  385. return err
  386. }
  387. if len(portMappings) == 0 {
  388. return nil
  389. }
  390. iplist, exists := plugin.getCachedPodIPs(id)
  391. if !exists {
  392. return fmt.Errorf("pod %s does not have recorded ips", id)
  393. }
  394. for _, ip := range iplist {
  395. pm := &hostport.PodPortMapping{
  396. Namespace: namespace,
  397. Name: name,
  398. PortMappings: portMappings,
  399. IP: net.ParseIP(ip),
  400. HostNetwork: false,
  401. }
  402. if netutils.IsIPv6(pm.IP) {
  403. if err := plugin.hostportManagerv6.Add(id.ID, pm, BridgeName); err != nil {
  404. return err
  405. }
  406. } else {
  407. if err := plugin.hostportManager.Add(id.ID, pm, BridgeName); err != nil {
  408. return err
  409. }
  410. }
  411. }
  412. return nil
  413. }
  414. func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error {
  415. start := time.Now()
  416. if err := plugin.Status(); err != nil {
  417. return fmt.Errorf("kubenet cannot SetUpPod: %v", err)
  418. }
  419. defer func() {
  420. klog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
  421. }()
  422. if err := plugin.setup(namespace, name, id, annotations); err != nil {
  423. if err := plugin.teardown(namespace, name, id); err != nil {
  424. // Not a hard error or warning
  425. klog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
  426. }
  427. return err
  428. }
  429. // Need to SNAT outbound traffic from cluster
  430. if err := plugin.ensureMasqRule(); err != nil {
  431. klog.Errorf("Failed to ensure MASQ rule: %v", err)
  432. }
  433. return nil
  434. }
  435. // Tears down as much of a pod's network as it can even if errors occur. Returns
  436. // an aggregate error composed of all errors encountered during the teardown.
  437. func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID) error {
  438. errList := []error{}
  439. // Loopback network deletion failure should not be fatal on teardown
  440. if err := plugin.delContainerFromNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
  441. klog.Warningf("Failed to delete loopback network: %v", err)
  442. errList = append(errList, err)
  443. }
  444. // no ip dependent actions
  445. if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
  446. klog.Warningf("Failed to delete %q network: %v", network.DefaultInterfaceName, err)
  447. errList = append(errList, err)
  448. }
  449. // If there are no IPs registered we can't teardown pod's IP dependencies
  450. iplist, exists := plugin.getCachedPodIPs(id)
  451. if !exists || len(iplist) == 0 {
  452. klog.V(5).Infof("container %s (%s/%s) does not have recorded. ignoring teardown call", id, name, namespace)
  453. return nil
  454. }
  455. // get the list of port mappings
  456. portMappings, err := plugin.host.GetPodPortMappings(id.ID)
  457. if err != nil {
  458. errList = append(errList, err)
  459. }
  460. // process each pod IP
  461. for _, ip := range iplist {
  462. isV6 := netutils.IsIPv6String(ip)
  463. klog.V(5).Infof("Removing pod port mappings from IP %s", ip)
  464. if portMappings != nil && len(portMappings) > 0 {
  465. if isV6 {
  466. if err = plugin.hostportManagerv6.Remove(id.ID, &hostport.PodPortMapping{
  467. Namespace: namespace,
  468. Name: name,
  469. PortMappings: portMappings,
  470. HostNetwork: false,
  471. }); err != nil {
  472. errList = append(errList, err)
  473. }
  474. } else {
  475. if err = plugin.hostportManager.Remove(id.ID, &hostport.PodPortMapping{
  476. Namespace: namespace,
  477. Name: name,
  478. PortMappings: portMappings,
  479. HostNetwork: false,
  480. }); err != nil {
  481. errList = append(errList, err)
  482. }
  483. }
  484. }
  485. klog.V(5).Infof("Removing pod IP %s from shaper for (%s/%s)", ip, name, namespace)
  486. // shaper uses a cidr, but we are using a single IP.
  487. mask := "32"
  488. if isV6 {
  489. mask = "128"
  490. }
  491. if err := plugin.shaper().Reset(fmt.Sprintf("%s/%s", ip, mask)); err != nil {
  492. // Possible bandwidth shaping wasn't enabled for this pod anyways
  493. klog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", ip, err)
  494. }
  495. plugin.removePodIP(id, ip)
  496. }
  497. return utilerrors.NewAggregate(errList)
  498. }
  499. func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
  500. start := time.Now()
  501. defer func() {
  502. klog.V(4).Infof("TearDownPod took %v for %s/%s", time.Since(start), namespace, name)
  503. }()
  504. if plugin.netConfig == nil {
  505. return fmt.Errorf("kubenet needs a PodCIDR to tear down pods")
  506. }
  507. if err := plugin.teardown(namespace, name, id); err != nil {
  508. return err
  509. }
  510. // Need to SNAT outbound traffic from cluster
  511. if err := plugin.ensureMasqRule(); err != nil {
  512. klog.Errorf("Failed to ensure MASQ rule: %v", err)
  513. }
  514. return nil
  515. }
  516. // TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.
  517. // Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls
  518. func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
  519. // try cached version
  520. networkStatus := plugin.getNetworkStatus(id)
  521. if networkStatus != nil {
  522. return networkStatus, nil
  523. }
  524. // not a cached version, get via network ns
  525. netnsPath, err := plugin.host.GetNetNS(id.ID)
  526. if err != nil {
  527. return nil, fmt.Errorf("kubenet failed to retrieve network namespace path: %v", err)
  528. }
  529. if netnsPath == "" {
  530. return nil, fmt.Errorf("cannot find the network namespace, skipping pod network status for container %q", id)
  531. }
  532. ips, err := network.GetPodIPs(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
  533. if err != nil {
  534. return nil, err
  535. }
  536. // cache the ips
  537. for _, ip := range ips {
  538. plugin.addPodIP(id, ip.String())
  539. }
  540. // return from cached
  541. return plugin.getNetworkStatus(id), nil
  542. }
  543. // returns networkstatus
  544. func (plugin *kubenetNetworkPlugin) getNetworkStatus(id kubecontainer.ContainerID) *network.PodNetworkStatus {
  545. // Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first.
  546. iplist, ok := plugin.getCachedPodIPs(id)
  547. if !ok {
  548. return nil
  549. }
  550. // sort making v4 first
  551. // TODO: (khenidak) IPv6 beta stage.
  552. // This - forced sort - could be avoided by checking which cidr that an IP belongs
  553. // to, then placing the IP according to cidr index. But before doing that. Check how IP is collected
  554. // across all of kubelet code (against cni and cri).
  555. ips := make([]net.IP, 0)
  556. for _, ip := range iplist {
  557. isV6 := netutils.IsIPv6String(ip)
  558. if !isV6 {
  559. ips = append([]net.IP{net.ParseIP(ip)}, ips...)
  560. } else {
  561. ips = append(ips, net.ParseIP(ip))
  562. }
  563. }
  564. return &network.PodNetworkStatus{
  565. IP: ips[0],
  566. IPs: ips,
  567. }
  568. }
  569. func (plugin *kubenetNetworkPlugin) Status() error {
  570. // Can't set up pods if we don't have a PodCIDR yet
  571. if plugin.netConfig == nil {
  572. return fmt.Errorf("kubenet does not have netConfig. This is most likely due to lack of PodCIDR")
  573. }
  574. if !plugin.checkRequiredCNIPlugins() {
  575. return fmt.Errorf("could not locate kubenet required CNI plugins %v at %q", requiredCNIPlugins, plugin.binDirs)
  576. }
  577. return nil
  578. }
  579. // checkRequiredCNIPlugins returns if all kubenet required cni plugins can be found at /opt/cni/bin or user specified NetworkPluginDir.
  580. func (plugin *kubenetNetworkPlugin) checkRequiredCNIPlugins() bool {
  581. for _, dir := range plugin.binDirs {
  582. if plugin.checkRequiredCNIPluginsInOneDir(dir) {
  583. return true
  584. }
  585. }
  586. return false
  587. }
  588. // checkRequiredCNIPluginsInOneDir returns true if all required cni plugins are placed in dir
  589. func (plugin *kubenetNetworkPlugin) checkRequiredCNIPluginsInOneDir(dir string) bool {
  590. files, err := ioutil.ReadDir(dir)
  591. if err != nil {
  592. return false
  593. }
  594. for _, cniPlugin := range requiredCNIPlugins {
  595. found := false
  596. for _, file := range files {
  597. if strings.TrimSpace(file.Name()) == cniPlugin {
  598. found = true
  599. break
  600. }
  601. }
  602. if !found {
  603. return false
  604. }
  605. }
  606. return true
  607. }
  608. func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID, needNetNs bool) (*libcni.RuntimeConf, error) {
  609. netnsPath, err := plugin.host.GetNetNS(id.ID)
  610. if needNetNs && err != nil {
  611. klog.Errorf("Kubenet failed to retrieve network namespace path: %v", err)
  612. }
  613. return &libcni.RuntimeConf{
  614. ContainerID: id.ID,
  615. NetNS: netnsPath,
  616. IfName: ifName,
  617. CacheDir: plugin.cacheDir,
  618. }, nil
  619. }
  620. func (plugin *kubenetNetworkPlugin) addContainerToNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) (cnitypes.Result, error) {
  621. rt, err := plugin.buildCNIRuntimeConf(ifName, id, true)
  622. if err != nil {
  623. return nil, fmt.Errorf("error building CNI config: %v", err)
  624. }
  625. klog.V(3).Infof("Adding %s/%s to '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt)
  626. // Because the default remote runtime request timeout is 4 min,so set slightly less than 240 seconds
  627. // Todo get the timeout from parent ctx
  628. cniTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), network.CNITimeoutSec*time.Second)
  629. defer cancelFunc()
  630. res, err := plugin.cniConfig.AddNetwork(cniTimeoutCtx, config, rt)
  631. if err != nil {
  632. return nil, fmt.Errorf("error adding container to network: %v", err)
  633. }
  634. return res, nil
  635. }
  636. func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.NetworkConfig, ifName, namespace, name string, id kubecontainer.ContainerID) error {
  637. rt, err := plugin.buildCNIRuntimeConf(ifName, id, false)
  638. if err != nil {
  639. return fmt.Errorf("error building CNI config: %v", err)
  640. }
  641. klog.V(3).Infof("Removing %s/%s from '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt)
  642. // Because the default remote runtime request timeout is 4 min,so set slightly less than 240 seconds
  643. // Todo get the timeout from parent ctx
  644. cniTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), network.CNITimeoutSec*time.Second)
  645. defer cancelFunc()
  646. err = plugin.cniConfig.DelNetwork(cniTimeoutCtx, config, rt)
  647. // The pod may not get deleted successfully at the first time.
  648. // Ignore "no such file or directory" error in case the network has already been deleted in previous attempts.
  649. if err != nil && !strings.Contains(err.Error(), "no such file or directory") {
  650. return fmt.Errorf("error removing container from network: %v", err)
  651. }
  652. return nil
  653. }
  654. // shaper retrieves the bandwidth shaper and, if it hasn't been fetched before,
  655. // initializes it and ensures the bridge is appropriately configured
  656. func (plugin *kubenetNetworkPlugin) shaper() bandwidth.Shaper {
  657. plugin.mu.Lock()
  658. defer plugin.mu.Unlock()
  659. if plugin.bandwidthShaper == nil {
  660. plugin.bandwidthShaper = bandwidth.NewTCShaper(BridgeName)
  661. plugin.bandwidthShaper.ReconcileInterface()
  662. }
  663. return plugin.bandwidthShaper
  664. }
  665. //TODO: make this into a goroutine and rectify the dedup rules periodically
  666. func (plugin *kubenetNetworkPlugin) syncEbtablesDedupRules(macAddr net.HardwareAddr, podCIDRs []net.IPNet, podGateways []net.IP) {
  667. if plugin.ebtables == nil {
  668. plugin.ebtables = utilebtables.New(plugin.execer)
  669. klog.V(3).Infof("Flushing dedup chain")
  670. if err := plugin.ebtables.FlushChain(utilebtables.TableFilter, dedupChain); err != nil {
  671. klog.Errorf("Failed to flush dedup chain: %v", err)
  672. }
  673. }
  674. _, err := plugin.ebtables.GetVersion()
  675. if err != nil {
  676. klog.Warningf("Failed to get ebtables version. Skip syncing ebtables dedup rules: %v", err)
  677. return
  678. }
  679. // ensure custom chain exists
  680. _, err = plugin.ebtables.EnsureChain(utilebtables.TableFilter, dedupChain)
  681. if err != nil {
  682. klog.Errorf("Failed to ensure %v chain %v", utilebtables.TableFilter, dedupChain)
  683. return
  684. }
  685. // jump to custom chain to the chain from core tables
  686. _, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, utilebtables.ChainOutput, "-j", string(dedupChain))
  687. if err != nil {
  688. klog.Errorf("Failed to ensure %v chain %v jump to %v chain: %v", utilebtables.TableFilter, utilebtables.ChainOutput, dedupChain, err)
  689. return
  690. }
  691. // per gateway rule
  692. for idx, gw := range podGateways {
  693. klog.V(3).Infof("Filtering packets with ebtables on mac address: %v, gateway: %v, pod CIDR: %v", macAddr.String(), gw.String(), podCIDRs[idx].String())
  694. bIsV6 := netutils.IsIPv6(gw)
  695. IPFamily := "IPv4"
  696. ipSrc := "--ip-src"
  697. if bIsV6 {
  698. IPFamily = "IPv6"
  699. ipSrc = "--ip6-src"
  700. }
  701. commonArgs := []string{"-p", IPFamily, "-s", macAddr.String(), "-o", "veth+"}
  702. _, err = plugin.ebtables.EnsureRule(utilebtables.Prepend, utilebtables.TableFilter, dedupChain, append(commonArgs, ipSrc, gw.String(), "-j", "ACCEPT")...)
  703. if err != nil {
  704. klog.Errorf("Failed to ensure packets from cbr0 gateway:%v to be accepted with error:%v", gw.String(), err)
  705. return
  706. }
  707. _, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, dedupChain, append(commonArgs, ipSrc, podCIDRs[idx].String(), "-j", "DROP")...)
  708. if err != nil {
  709. klog.Errorf("Failed to ensure packets from podCidr[%v] but has mac address of cbr0 to get dropped. err:%v", podCIDRs[idx].String(), err)
  710. return
  711. }
  712. }
  713. }
  714. // disableContainerDAD disables duplicate address detection in the container.
  715. // DAD has a negative affect on pod creation latency, since we have to wait
  716. // a second or more for the addresses to leave the "tentative" state. Since
  717. // we're sure there won't be an address conflict (since we manage them manually),
  718. // this is safe. See issue 54651.
  719. //
  720. // This sets net.ipv6.conf.default.dad_transmits to 0. It must be run *before*
  721. // the CNI plugins are run.
  722. func (plugin *kubenetNetworkPlugin) disableContainerDAD(id kubecontainer.ContainerID) error {
  723. key := "net/ipv6/conf/default/dad_transmits"
  724. sysctlBin, err := plugin.execer.LookPath("sysctl")
  725. if err != nil {
  726. return fmt.Errorf("could not find sysctl binary: %s", err)
  727. }
  728. netnsPath, err := plugin.host.GetNetNS(id.ID)
  729. if err != nil {
  730. return fmt.Errorf("failed to get netns: %v", err)
  731. }
  732. if netnsPath == "" {
  733. return fmt.Errorf("pod has no network namespace")
  734. }
  735. // If the sysctl doesn't exist, it means ipv6 is disabled; log and move on
  736. if _, err := plugin.sysctl.GetSysctl(key); err != nil {
  737. return fmt.Errorf("ipv6 not enabled: %v", err)
  738. }
  739. output, err := plugin.execer.Command(plugin.nsenterPath,
  740. fmt.Sprintf("--net=%s", netnsPath), "-F", "--",
  741. sysctlBin, "-w", fmt.Sprintf("%s=%s", key, "0"),
  742. ).CombinedOutput()
  743. if err != nil {
  744. return fmt.Errorf("failed to write sysctl: output: %s error: %s",
  745. output, err)
  746. }
  747. return nil
  748. }
  749. // given a n cidrs assigned to nodes,
  750. // create bridge configuration that conforms to them
  751. func (plugin *kubenetNetworkPlugin) getRangesConfig() string {
  752. createRange := func(thisNet *net.IPNet) string {
  753. template := `
  754. [{
  755. "subnet": "%s"
  756. }]`
  757. return fmt.Sprintf(template, thisNet.String())
  758. }
  759. ranges := make([]string, len(plugin.podCIDRs))
  760. for idx, thisCIDR := range plugin.podCIDRs {
  761. ranges[idx] = createRange(thisCIDR)
  762. }
  763. //[{range}], [{range}]
  764. // each range contains a subnet. gateway will be fetched from cni result
  765. return strings.Join(ranges[:], ",")
  766. }
  767. // given a n cidrs assigned to nodes,
  768. // create bridge routes configuration that conforms to them
  769. func (plugin *kubenetNetworkPlugin) getRoutesConfig() string {
  770. var (
  771. routes []string
  772. hasV4, hasV6 bool
  773. )
  774. for _, thisCIDR := range plugin.podCIDRs {
  775. if thisCIDR.IP.To4() != nil {
  776. hasV4 = true
  777. } else {
  778. hasV6 = true
  779. }
  780. }
  781. if hasV4 {
  782. routes = append(routes, fmt.Sprintf(`{"dst": "%s"}`, zeroCIDRv4))
  783. }
  784. if hasV6 {
  785. routes = append(routes, fmt.Sprintf(`{"dst": "%s"}`, zeroCIDRv6))
  786. }
  787. return strings.Join(routes, ",")
  788. }
  789. func (plugin *kubenetNetworkPlugin) addPodIP(id kubecontainer.ContainerID, ip string) {
  790. plugin.mu.Lock()
  791. defer plugin.mu.Unlock()
  792. _, exist := plugin.podIPs[id]
  793. if !exist {
  794. plugin.podIPs[id] = utilsets.NewString()
  795. }
  796. if !plugin.podIPs[id].Has(ip) {
  797. plugin.podIPs[id].Insert(ip)
  798. }
  799. }
  800. func (plugin *kubenetNetworkPlugin) removePodIP(id kubecontainer.ContainerID, ip string) {
  801. plugin.mu.Lock()
  802. defer plugin.mu.Unlock()
  803. _, exist := plugin.podIPs[id]
  804. if !exist {
  805. return // did we restart kubelet?
  806. }
  807. if plugin.podIPs[id].Has(ip) {
  808. plugin.podIPs[id].Delete(ip)
  809. }
  810. // if there is no more ips here. let us delete
  811. if plugin.podIPs[id].Len() == 0 {
  812. delete(plugin.podIPs, id)
  813. }
  814. }
  815. // returns a copy of pod ips
  816. // false is returned if id does not exist
  817. func (plugin *kubenetNetworkPlugin) getCachedPodIPs(id kubecontainer.ContainerID) ([]string, bool) {
  818. plugin.mu.Lock()
  819. defer plugin.mu.Unlock()
  820. iplist, exists := plugin.podIPs[id]
  821. if !exists {
  822. return nil, false
  823. }
  824. return iplist.UnsortedList(), true
  825. }