plugins.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package network
  14. import (
  15. "fmt"
  16. "net"
  17. "strings"
  18. "sync"
  19. "time"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. utilerrors "k8s.io/apimachinery/pkg/util/errors"
  22. utilsets "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/apimachinery/pkg/util/validation"
  24. "k8s.io/klog"
  25. kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
  26. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  27. "k8s.io/kubernetes/pkg/kubelet/dockershim/network/hostport"
  28. "k8s.io/kubernetes/pkg/kubelet/dockershim/network/metrics"
  29. utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
  30. utilexec "k8s.io/utils/exec"
  31. )
  const (
  	// DefaultPluginName is the name reported by the no-op network plugin.
  	DefaultPluginName = "kubernetes.io/no-op"

  	// NET_PLUGIN_EVENT_POD_CIDR_CHANGE names the event raised when the
  	// node's Pod CIDR is known when using the controller manager's
  	// --allocate-node-cidrs=true option.
  	NET_PLUGIN_EVENT_POD_CIDR_CHANGE = "pod-cidr-change"

  	// NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR is presumably the key in
  	// the event details map under which the new CIDR is stored — confirm
  	// against the code that emits the event.
  	NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR = "pod-cidr"
  )
  39. // NetworkPlugin is an interface to network plugins for the kubelet
  40. type NetworkPlugin interface {
  41. // Init initializes the plugin. This will be called exactly once
  42. // before any other methods are called.
  43. Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error
  44. // Called on various events like:
  45. // NET_PLUGIN_EVENT_POD_CIDR_CHANGE
  46. Event(name string, details map[string]interface{})
  47. // Name returns the plugin's name. This will be used when searching
  48. // for a plugin by name, e.g.
  49. Name() string
  50. // Returns a set of NET_PLUGIN_CAPABILITY_*
  51. Capabilities() utilsets.Int
  52. // SetUpPod is the method called after the infra container of
  53. // the pod has been created but before the other containers of the
  54. // pod are launched.
  55. SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations, options map[string]string) error
  56. // TearDownPod is the method called before a pod's infra container will be deleted
  57. TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error
  58. // GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container
  59. GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error)
  60. // Status returns error if the network plugin is in error state
  61. Status() error
  62. }
  63. // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
  64. // PodNetworkStatus stores the network status of a pod (currently just the primary IP address)
  65. // This struct represents version "v1beta1"
  66. type PodNetworkStatus struct {
  67. metav1.TypeMeta `json:",inline"`
  68. // IP is the primary ipv4/ipv6 address of the pod. Among other things it is the address that -
  69. // - kube expects to be reachable across the cluster
  70. // - service endpoints are constructed with
  71. // - will be reported in the PodStatus.PodIP field (will override the IP reported by docker)
  72. IP net.IP `json:"ip" description:"Primary IP address of the pod"`
  73. }
  74. // Host is an interface that plugins can use to access the kubelet.
  75. // TODO(#35457): get rid of this backchannel to the kubelet. The scope of
  76. // the back channel is restricted to host-ports/testing, and restricted
  77. // to kubenet. No other network plugin wrapper needs it. Other plugins
  78. // only require a way to access namespace information and port mapping
  79. // information , which they can do directly through the embedded interfaces.
  80. type Host interface {
  81. // NamespaceGetter is a getter for sandbox namespace information.
  82. NamespaceGetter
  83. // PortMappingGetter is a getter for sandbox port mapping information.
  84. PortMappingGetter
  85. }
  // NamespaceGetter looks up namespace information for a given podSandboxID.
  // It is typically implemented by runtime shims that are closely coupled to
  // CNI plugin wrappers like kubenet.
  type NamespaceGetter interface {
  	// GetNetNS reports the network namespace of the given containerID.
  	// Implementations must *never* return an empty namespace together with
  	// a nil error: whenever err is nil, netNS must be a valid namespace
  	// string.
  	GetNetNS(containerID string) (netNS string, err error)
  }
  95. // PortMappingGetter is an interface to retrieve port mapping information for a given
  96. // podSandboxID. Typically implemented by runtime shims that are closely coupled to
  97. // CNI plugin wrappers like kubenet.
  98. type PortMappingGetter interface {
  99. // GetPodPortMappings returns sandbox port mappings information.
  100. GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error)
  101. }
  102. // InitNetworkPlugin inits the plugin that matches networkPluginName. Plugins must have unique names.
  103. func InitNetworkPlugin(plugins []NetworkPlugin, networkPluginName string, host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) (NetworkPlugin, error) {
  104. if networkPluginName == "" {
  105. // default to the no_op plugin
  106. plug := &NoopNetworkPlugin{}
  107. plug.Sysctl = utilsysctl.New()
  108. if err := plug.Init(host, hairpinMode, nonMasqueradeCIDR, mtu); err != nil {
  109. return nil, err
  110. }
  111. return plug, nil
  112. }
  113. pluginMap := map[string]NetworkPlugin{}
  114. allErrs := []error{}
  115. for _, plugin := range plugins {
  116. name := plugin.Name()
  117. if errs := validation.IsQualifiedName(name); len(errs) != 0 {
  118. allErrs = append(allErrs, fmt.Errorf("network plugin has invalid name: %q: %s", name, strings.Join(errs, ";")))
  119. continue
  120. }
  121. if _, found := pluginMap[name]; found {
  122. allErrs = append(allErrs, fmt.Errorf("network plugin %q was registered more than once", name))
  123. continue
  124. }
  125. pluginMap[name] = plugin
  126. }
  127. chosenPlugin := pluginMap[networkPluginName]
  128. if chosenPlugin != nil {
  129. err := chosenPlugin.Init(host, hairpinMode, nonMasqueradeCIDR, mtu)
  130. if err != nil {
  131. allErrs = append(allErrs, fmt.Errorf("Network plugin %q failed init: %v", networkPluginName, err))
  132. } else {
  133. klog.V(1).Infof("Loaded network plugin %q", networkPluginName)
  134. }
  135. } else {
  136. allErrs = append(allErrs, fmt.Errorf("Network plugin %q not found.", networkPluginName))
  137. }
  138. return chosenPlugin, utilerrors.NewAggregate(allErrs)
  139. }
  140. type NoopNetworkPlugin struct {
  141. Sysctl utilsysctl.Interface
  142. }
  // Sysctl keys that NoopNetworkPlugin.Init sets to 1.
  const (
  	sysctlBridgeCallIPTables  = "net/bridge/bridge-nf-call-iptables"
  	sysctlBridgeCallIP6Tables = "net/bridge/bridge-nf-call-ip6tables"
  )
  145. func (plugin *NoopNetworkPlugin) Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error {
  146. // Set bridge-nf-call-iptables=1 to maintain compatibility with older
  147. // kubernetes versions to ensure the iptables-based kube proxy functions
  148. // correctly. Other plugins are responsible for setting this correctly
  149. // depending on whether or not they connect containers to Linux bridges
  150. // or use some other mechanism (ie, SDN vswitch).
  151. // Ensure the netfilter module is loaded on kernel >= 3.18; previously
  152. // it was built-in.
  153. utilexec.New().Command("modprobe", "br-netfilter").CombinedOutput()
  154. if err := plugin.Sysctl.SetSysctl(sysctlBridgeCallIPTables, 1); err != nil {
  155. klog.Warningf("can't set sysctl %s: %v", sysctlBridgeCallIPTables, err)
  156. }
  157. if val, err := plugin.Sysctl.GetSysctl(sysctlBridgeCallIP6Tables); err == nil {
  158. if val != 1 {
  159. if err = plugin.Sysctl.SetSysctl(sysctlBridgeCallIP6Tables, 1); err != nil {
  160. klog.Warningf("can't set sysctl %s: %v", sysctlBridgeCallIP6Tables, err)
  161. }
  162. }
  163. }
  164. return nil
  165. }
  166. func (plugin *NoopNetworkPlugin) Event(name string, details map[string]interface{}) {
  167. }
  168. func (plugin *NoopNetworkPlugin) Name() string {
  169. return DefaultPluginName
  170. }
  171. func (plugin *NoopNetworkPlugin) Capabilities() utilsets.Int {
  172. return utilsets.NewInt()
  173. }
  174. func (plugin *NoopNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error {
  175. return nil
  176. }
  177. func (plugin *NoopNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
  178. return nil
  179. }
  180. func (plugin *NoopNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*PodNetworkStatus, error) {
  181. return nil, nil
  182. }
  183. func (plugin *NoopNetworkPlugin) Status() error {
  184. return nil
  185. }
  186. func getOnePodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName, addrType string) (net.IP, error) {
  187. // Try to retrieve ip inside container network namespace
  188. output, err := execer.Command(nsenterPath, fmt.Sprintf("--net=%s", netnsPath), "-F", "--",
  189. "ip", "-o", addrType, "addr", "show", "dev", interfaceName, "scope", "global").CombinedOutput()
  190. if err != nil {
  191. return nil, fmt.Errorf("Unexpected command output %s with error: %v", output, err)
  192. }
  193. lines := strings.Split(string(output), "\n")
  194. if len(lines) < 1 {
  195. return nil, fmt.Errorf("Unexpected command output %s", output)
  196. }
  197. fields := strings.Fields(lines[0])
  198. if len(fields) < 4 {
  199. return nil, fmt.Errorf("Unexpected address output %s ", lines[0])
  200. }
  201. ip, _, err := net.ParseCIDR(fields[3])
  202. if err != nil {
  203. return nil, fmt.Errorf("CNI failed to parse ip from output %s due to %v", output, err)
  204. }
  205. return ip, nil
  206. }
  207. // GetPodIP gets the IP of the pod by inspecting the network info inside the pod's network namespace.
  208. func GetPodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName string) (net.IP, error) {
  209. ip, err := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4")
  210. if err != nil {
  211. // Fall back to IPv6 address if no IPv4 address is present
  212. ip, err = getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6")
  213. }
  214. if err != nil {
  215. return nil, err
  216. }
  217. return ip, nil
  218. }
  // NoopPortMappingGetter is a stateless type whose GetPodPortMappings method
  // reports no port mappings.
  type NoopPortMappingGetter struct{}
  220. func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
  221. return nil, nil
  222. }
  223. // The PluginManager wraps a kubelet network plugin and provides synchronization
  224. // for a given pod's network operations. Each pod's setup/teardown/status operations
  225. // are synchronized against each other, but network operations of other pods can
  226. // proceed in parallel.
  227. type PluginManager struct {
  228. // Network plugin being wrapped
  229. plugin NetworkPlugin
  230. // Pod list and lock
  231. podsLock sync.Mutex
  232. pods map[string]*podLock
  233. }
  234. func NewPluginManager(plugin NetworkPlugin) *PluginManager {
  235. metrics.Register()
  236. return &PluginManager{
  237. plugin: plugin,
  238. pods: make(map[string]*podLock),
  239. }
  240. }
  241. func (pm *PluginManager) PluginName() string {
  242. return pm.plugin.Name()
  243. }
  244. func (pm *PluginManager) Event(name string, details map[string]interface{}) {
  245. pm.plugin.Event(name, details)
  246. }
  247. func (pm *PluginManager) Status() error {
  248. return pm.plugin.Status()
  249. }
  // podLock is the per-pod entry kept in PluginManager.pods.
  type podLock struct {
  	// refcount is the number of in-flight operations for this pod. Once it
  	// drops to zero the entry can be removed from the pod map.
  	refcount uint

  	// mu serializes the operations of this one pod.
  	mu sync.Mutex
  }
  257. // Lock network operations for a specific pod. If that pod is not yet in
  258. // the pod map, it will be added. The reference count for the pod will
  259. // be increased.
  260. func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex {
  261. pm.podsLock.Lock()
  262. defer pm.podsLock.Unlock()
  263. lock, ok := pm.pods[fullPodName]
  264. if !ok {
  265. lock = &podLock{}
  266. pm.pods[fullPodName] = lock
  267. }
  268. lock.refcount++
  269. return &lock.mu
  270. }
  271. // Unlock network operations for a specific pod. The reference count for the
  272. // pod will be decreased. If the reference count reaches zero, the pod will be
  273. // removed from the pod map.
  274. func (pm *PluginManager) podUnlock(fullPodName string) {
  275. pm.podsLock.Lock()
  276. defer pm.podsLock.Unlock()
  277. lock, ok := pm.pods[fullPodName]
  278. if !ok {
  279. klog.Warningf("Unbalanced pod lock unref for %s", fullPodName)
  280. return
  281. } else if lock.refcount == 0 {
  282. // This should never ever happen, but handle it anyway
  283. delete(pm.pods, fullPodName)
  284. klog.Warningf("Pod lock for %s still in map with zero refcount", fullPodName)
  285. return
  286. }
  287. lock.refcount--
  288. lock.mu.Unlock()
  289. if lock.refcount == 0 {
  290. delete(pm.pods, fullPodName)
  291. }
  292. }
  293. // recordOperation records operation and duration
  294. func recordOperation(operation string, start time.Time) {
  295. metrics.NetworkPluginOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInSeconds(start))
  296. metrics.DeprecatedNetworkPluginOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInMicroseconds(start))
  297. }
  298. func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id kubecontainer.ContainerID) (*PodNetworkStatus, error) {
  299. defer recordOperation("get_pod_network_status", time.Now())
  300. fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
  301. pm.podLock(fullPodName).Lock()
  302. defer pm.podUnlock(fullPodName)
  303. netStatus, err := pm.plugin.GetPodNetworkStatus(podNamespace, podName, id)
  304. if err != nil {
  305. return nil, fmt.Errorf("NetworkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), fullPodName, err)
  306. }
  307. return netStatus, nil
  308. }
  309. func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID, annotations, options map[string]string) error {
  310. defer recordOperation("set_up_pod", time.Now())
  311. fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
  312. pm.podLock(fullPodName).Lock()
  313. defer pm.podUnlock(fullPodName)
  314. klog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName)
  315. if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations, options); err != nil {
  316. return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err)
  317. }
  318. return nil
  319. }
  320. func (pm *PluginManager) TearDownPod(podNamespace, podName string, id kubecontainer.ContainerID) error {
  321. defer recordOperation("tear_down_pod", time.Now())
  322. fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
  323. pm.podLock(fullPodName).Lock()
  324. defer pm.podUnlock(fullPodName)
  325. klog.V(3).Infof("Calling network plugin %s to tear down pod %q", pm.plugin.Name(), fullPodName)
  326. if err := pm.plugin.TearDownPod(podNamespace, podName, id); err != nil {
  327. return fmt.Errorf("NetworkPlugin %s failed to teardown pod %q network: %v", pm.plugin.Name(), fullPodName, err)
  328. }
  329. return nil
  330. }