server.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app does all of the work necessary to configure and run a
  14. // Kubernetes app process.
  15. package app
  16. import (
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net/http"
  21. "os"
  22. goruntime "runtime"
  23. "strings"
  24. "time"
  25. "github.com/fsnotify/fsnotify"
  26. "github.com/spf13/cobra"
  27. "github.com/spf13/pflag"
  28. gerrors "github.com/pkg/errors"
  29. v1 "k8s.io/api/core/v1"
  30. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/fields"
  32. "k8s.io/apimachinery/pkg/labels"
  33. "k8s.io/apimachinery/pkg/runtime"
  34. "k8s.io/apimachinery/pkg/runtime/serializer"
  35. "k8s.io/apimachinery/pkg/selection"
  36. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  37. "k8s.io/apimachinery/pkg/util/wait"
  38. "k8s.io/apiserver/pkg/server/healthz"
  39. "k8s.io/apiserver/pkg/server/mux"
  40. "k8s.io/apiserver/pkg/server/routes"
  41. utilfeature "k8s.io/apiserver/pkg/util/feature"
  42. "k8s.io/client-go/informers"
  43. clientset "k8s.io/client-go/kubernetes"
  44. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  45. "k8s.io/client-go/rest"
  46. "k8s.io/client-go/tools/clientcmd"
  47. clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
  48. "k8s.io/client-go/tools/record"
  49. cliflag "k8s.io/component-base/cli/flag"
  50. componentbaseconfig "k8s.io/component-base/config"
  51. "k8s.io/component-base/metrics/legacyregistry"
  52. "k8s.io/component-base/version"
  53. "k8s.io/component-base/version/verflag"
  54. "k8s.io/klog"
  55. "k8s.io/kube-proxy/config/v1alpha1"
  56. api "k8s.io/kubernetes/pkg/apis/core"
  57. "k8s.io/kubernetes/pkg/features"
  58. "k8s.io/kubernetes/pkg/kubelet/qos"
  59. "k8s.io/kubernetes/pkg/master/ports"
  60. "k8s.io/kubernetes/pkg/proxy"
  61. "k8s.io/kubernetes/pkg/proxy/apis"
  62. kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
  63. proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
  64. kubeproxyconfigv1alpha1 "k8s.io/kubernetes/pkg/proxy/apis/config/v1alpha1"
  65. "k8s.io/kubernetes/pkg/proxy/apis/config/validation"
  66. "k8s.io/kubernetes/pkg/proxy/config"
  67. "k8s.io/kubernetes/pkg/proxy/healthcheck"
  68. "k8s.io/kubernetes/pkg/proxy/iptables"
  69. "k8s.io/kubernetes/pkg/proxy/ipvs"
  70. "k8s.io/kubernetes/pkg/proxy/userspace"
  71. proxyutil "k8s.io/kubernetes/pkg/proxy/util"
  72. "k8s.io/kubernetes/pkg/util/configz"
  73. "k8s.io/kubernetes/pkg/util/filesystem"
  74. utilflag "k8s.io/kubernetes/pkg/util/flag"
  75. utilipset "k8s.io/kubernetes/pkg/util/ipset"
  76. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  77. utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
  78. "k8s.io/kubernetes/pkg/util/oom"
  79. "k8s.io/utils/exec"
  80. utilpointer "k8s.io/utils/pointer"
  81. )
  // String identifiers for the proxy modes referenced in this package.
  const (
  	proxyModeUserspace   = "userspace"
  	proxyModeIPTables    = "iptables"
  	proxyModeIPVS        = "ipvs"
  	proxyModeKernelspace = "kernelspace"
  )
  // proxyRun is the minimal behaviour Options needs from a proxy server: it can
  // either be run, or asked to clean up and exit.
  type proxyRun interface {
  	// Run starts the proxy server and reports the error it stopped with.
  	Run() error
  	// CleanupAndExit performs the server's cleanup and reports any failure.
  	CleanupAndExit() error
  }
  93. // Options contains everything necessary to create and run a proxy server.
  94. type Options struct {
  95. // ConfigFile is the location of the proxy server's configuration file.
  96. ConfigFile string
  97. // WriteConfigTo is the path where the default configuration will be written.
  98. WriteConfigTo string
  99. // CleanupAndExit, when true, makes the proxy server clean up iptables and ipvs rules, then exit.
  100. CleanupAndExit bool
  101. // CleanupIPVS, when true, makes the proxy server clean up ipvs rules before running.
  102. CleanupIPVS bool
  103. // WindowsService should be set to true if kube-proxy is running as a service on Windows.
  104. // Its corresponding flag only gets registered in Windows builds
  105. WindowsService bool
  106. // config is the proxy server's configuration object.
  107. config *kubeproxyconfig.KubeProxyConfiguration
  108. // watcher is used to watch on the update change of ConfigFile
  109. watcher filesystem.FSWatcher
  110. // proxyServer is the interface to run the proxy server
  111. proxyServer proxyRun
  112. // errCh is the channel that errors will be sent
  113. errCh chan error
  114. // The fields below here are placeholders for flags that can't be directly mapped into
  115. // config.KubeProxyConfiguration.
  116. //
  117. // TODO remove these fields once the deprecated flags are removed.
  118. // master is used to override the kubeconfig's URL to the apiserver.
  119. master string
  120. // healthzPort is the port to be used by the healthz server.
  121. healthzPort int32
  122. // metricsPort is the port to be used by the metrics server.
  123. metricsPort int32
  124. // hostnameOverride, if set from the command line flag, takes precedence over the `HostnameOverride` value from the config file
  125. hostnameOverride string
  126. }
  127. // AddFlags adds flags to fs and binds them to options.
  128. func (o *Options) AddFlags(fs *pflag.FlagSet) {
  129. o.addOSFlags(fs)
  130. fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
  131. fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the default configuration values to this file and exit.")
  132. fs.StringVar(&o.config.ClientConnection.Kubeconfig, "kubeconfig", o.config.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization information (the master location is set by the master flag).")
  133. fs.StringVar(&o.config.ClusterCIDR, "cluster-cidr", o.config.ClusterCIDR, "The CIDR range of pods in the cluster. When configured, traffic sent to a Service cluster IP from outside this range will be masqueraded and traffic sent from pods to an external LoadBalancer IP will be directed to the respective cluster IP instead")
  134. fs.StringVar(&o.config.ClientConnection.ContentType, "kube-api-content-type", o.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.")
  135. fs.StringVar(&o.master, "master", o.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
  136. fs.StringVar(&o.hostnameOverride, "hostname-override", o.hostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.")
  137. fs.StringVar(&o.config.IPVS.Scheduler, "ipvs-scheduler", o.config.IPVS.Scheduler, "The ipvs scheduler type when proxy mode is ipvs")
  138. fs.StringVar(&o.config.ShowHiddenMetricsForVersion, "show-hidden-metrics-for-version", o.config.ShowHiddenMetricsForVersion,
  139. "The previous version for which you want to show hidden metrics. "+
  140. "Only the previous minor version is meaningful, other values will not be allowed. "+
  141. "The format is <major>.<minor>, e.g.: '1.16'. "+
  142. "The purpose of this format is make sure you have the opportunity to notice if the next release hides additional metrics, "+
  143. "rather than being surprised when they are permanently removed in the release after that.")
  144. fs.StringSliceVar(&o.config.IPVS.ExcludeCIDRs, "ipvs-exclude-cidrs", o.config.IPVS.ExcludeCIDRs, "A comma-separated list of CIDR's which the ipvs proxier should not touch when cleaning up IPVS rules.")
  145. fs.StringSliceVar(&o.config.NodePortAddresses, "nodeport-addresses", o.config.NodePortAddresses,
  146. "A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses.")
  147. fs.BoolVar(&o.CleanupAndExit, "cleanup", o.CleanupAndExit, "If true cleanup iptables and ipvs rules and exit.")
  148. fs.BoolVar(&o.CleanupIPVS, "cleanup-ipvs", o.CleanupIPVS, "If true and --cleanup is specified, kube-proxy will also flush IPVS rules, in addition to normal cleanup.")
  149. fs.MarkDeprecated("cleanup-ipvs", "In a future release, running --cleanup will always flush IPVS rules")
  150. fs.Var(utilflag.IPVar{Val: &o.config.BindAddress}, "bind-address", "The IP address for the proxy server to serve on (set to `0.0.0.0` for all IPv4 interfaces and `::` for all IPv6 interfaces)")
  151. fs.Var(utilflag.IPVar{Val: &o.config.HealthzBindAddress}, "healthz-bind-address", "The IP address for the health check server to serve on (set to `0.0.0.0` for all IPv4 interfaces and `::` for all IPv6 interfaces)")
  152. fs.Var(utilflag.IPVar{Val: &o.config.MetricsBindAddress}, "metrics-bind-address", "The IP address for the metrics server to serve on (set to `0.0.0.0` for all IPv4 interfaces and `::` for all IPv6 interfaces)")
  153. fs.Var(utilflag.PortRangeVar{Val: &o.config.PortRange}, "proxy-port-range", "Range of host ports (beginPort-endPort, single port or beginPort+offset, inclusive) that may be consumed in order to proxy service traffic. If (unspecified, 0, or 0-0) then ports will be randomly chosen.")
  154. fs.Var(&o.config.Mode, "proxy-mode", "Which proxy mode to use: 'userspace' (older) or 'iptables' (faster) or 'ipvs'. If blank, use the best-available proxy (currently iptables). If the iptables proxy is selected, regardless of how, but the system's kernel or iptables versions are insufficient, this always falls back to the userspace proxy.")
  155. fs.Var(cliflag.NewMapStringBool(&o.config.FeatureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
  156. "Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))
  157. fs.Int32Var(&o.healthzPort, "healthz-port", o.healthzPort, "The port to bind the health check server. Use 0 to disable.")
  158. fs.Int32Var(&o.metricsPort, "metrics-port", o.metricsPort, "The port to bind the metrics server. Use 0 to disable.")
  159. fs.Int32Var(o.config.OOMScoreAdj, "oom-score-adj", utilpointer.Int32PtrDerefOr(o.config.OOMScoreAdj, int32(qos.KubeProxyOOMScoreAdj)), "The oom-score-adj value for kube-proxy process. Values must be within the range [-1000, 1000]")
  160. fs.Int32Var(o.config.IPTables.MasqueradeBit, "iptables-masquerade-bit", utilpointer.Int32PtrDerefOr(o.config.IPTables.MasqueradeBit, 14), "If using the pure iptables proxy, the bit of the fwmark space to mark packets requiring SNAT with. Must be within the range [0, 31].")
  161. fs.Int32Var(o.config.Conntrack.MaxPerCore, "conntrack-max-per-core", *o.config.Conntrack.MaxPerCore,
  162. "Maximum number of NAT connections to track per CPU core (0 to leave the limit as-is and ignore conntrack-min).")
  163. fs.Int32Var(o.config.Conntrack.Min, "conntrack-min", *o.config.Conntrack.Min,
  164. "Minimum number of conntrack entries to allocate, regardless of conntrack-max-per-core (set conntrack-max-per-core=0 to leave the limit as-is).")
  165. fs.Int32Var(&o.config.ClientConnection.Burst, "kube-api-burst", o.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver")
  166. fs.DurationVar(&o.config.IPTables.SyncPeriod.Duration, "iptables-sync-period", o.config.IPTables.SyncPeriod.Duration, "The maximum interval of how often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
  167. fs.DurationVar(&o.config.IPTables.MinSyncPeriod.Duration, "iptables-min-sync-period", o.config.IPTables.MinSyncPeriod.Duration, "The minimum interval of how often the iptables rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m').")
  168. fs.DurationVar(&o.config.IPVS.SyncPeriod.Duration, "ipvs-sync-period", o.config.IPVS.SyncPeriod.Duration, "The maximum interval of how often ipvs rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
  169. fs.DurationVar(&o.config.IPVS.MinSyncPeriod.Duration, "ipvs-min-sync-period", o.config.IPVS.MinSyncPeriod.Duration, "The minimum interval of how often the ipvs rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m').")
  170. fs.DurationVar(&o.config.IPVS.TCPTimeout.Duration, "ipvs-tcp-timeout", o.config.IPVS.TCPTimeout.Duration, "The timeout for idle IPVS TCP connections, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
  171. fs.DurationVar(&o.config.IPVS.TCPFinTimeout.Duration, "ipvs-tcpfin-timeout", o.config.IPVS.TCPFinTimeout.Duration, "The timeout for IPVS TCP connections after receiving a FIN packet, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
  172. fs.DurationVar(&o.config.IPVS.UDPTimeout.Duration, "ipvs-udp-timeout", o.config.IPVS.UDPTimeout.Duration, "The timeout for IPVS UDP packets, 0 to leave as-is. (e.g. '5s', '1m', '2h22m').")
  173. fs.DurationVar(&o.config.Conntrack.TCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", o.config.Conntrack.TCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
  174. fs.DurationVar(
  175. &o.config.Conntrack.TCPCloseWaitTimeout.Duration, "conntrack-tcp-timeout-close-wait",
  176. o.config.Conntrack.TCPCloseWaitTimeout.Duration,
  177. "NAT timeout for TCP connections in the CLOSE_WAIT state")
  178. fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
  179. fs.DurationVar(&o.config.UDPIdleTimeout.Duration, "udp-timeout", o.config.UDPIdleTimeout.Duration, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace")
  180. fs.BoolVar(&o.config.IPVS.StrictARP, "ipvs-strict-arp", o.config.IPVS.StrictARP, "Enable strict ARP by setting arp_ignore to 1 and arp_announce to 2")
  181. fs.BoolVar(&o.config.IPTables.MasqueradeAll, "masquerade-all", o.config.IPTables.MasqueradeAll, "If using the pure iptables proxy, SNAT all traffic sent via Service cluster IPs (this not commonly needed)")
  182. fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "If true enables profiling via web interface on /debug/pprof handler.")
  183. fs.Float32Var(&o.config.ClientConnection.QPS, "kube-api-qps", o.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver")
  184. fs.Var(&o.config.DetectLocalMode, "detect-local-mode", "Mode to use to detect local traffic")
  185. }
  186. // NewOptions returns initialized Options
  187. func NewOptions() *Options {
  188. return &Options{
  189. config: new(kubeproxyconfig.KubeProxyConfiguration),
  190. healthzPort: ports.ProxyHealthzPort,
  191. metricsPort: ports.ProxyStatusPort,
  192. CleanupIPVS: true,
  193. errCh: make(chan error),
  194. }
  195. }
  196. // Complete completes all the required options.
  197. func (o *Options) Complete() error {
  198. if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
  199. klog.Warning("WARNING: all flags other than --config, --write-config-to, and --cleanup are deprecated. Please begin using a config file ASAP.")
  200. o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
  201. o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
  202. }
  203. // Load the config file here in Complete, so that Validate validates the fully-resolved config.
  204. if len(o.ConfigFile) > 0 {
  205. c, err := o.loadConfigFromFile(o.ConfigFile)
  206. if err != nil {
  207. return err
  208. }
  209. o.config = c
  210. if err := o.initWatcher(); err != nil {
  211. return err
  212. }
  213. }
  214. if err := o.processHostnameOverrideFlag(); err != nil {
  215. return err
  216. }
  217. return utilfeature.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates)
  218. }
  219. // Creates a new filesystem watcher and adds watches for the config file.
  220. func (o *Options) initWatcher() error {
  221. fswatcher := filesystem.NewFsnotifyWatcher()
  222. err := fswatcher.Init(o.eventHandler, o.errorHandler)
  223. if err != nil {
  224. return err
  225. }
  226. err = fswatcher.AddWatch(o.ConfigFile)
  227. if err != nil {
  228. return err
  229. }
  230. o.watcher = fswatcher
  231. return nil
  232. }
  233. func (o *Options) eventHandler(ent fsnotify.Event) {
  234. eventOpIs := func(Op fsnotify.Op) bool {
  235. return ent.Op&Op == Op
  236. }
  237. if eventOpIs(fsnotify.Write) || eventOpIs(fsnotify.Rename) {
  238. // error out when ConfigFile is updated
  239. o.errCh <- fmt.Errorf("content of the proxy server's configuration file was updated")
  240. return
  241. }
  242. o.errCh <- nil
  243. }
  244. func (o *Options) errorHandler(err error) {
  245. o.errCh <- err
  246. }
  247. // processHostnameOverrideFlag processes hostname-override flag
  248. func (o *Options) processHostnameOverrideFlag() error {
  249. // Check if hostname-override flag is set and use value since configFile always overrides
  250. if len(o.hostnameOverride) > 0 {
  251. hostName := strings.TrimSpace(o.hostnameOverride)
  252. if len(hostName) == 0 {
  253. return fmt.Errorf("empty hostname-override is invalid")
  254. }
  255. o.config.HostnameOverride = strings.ToLower(hostName)
  256. }
  257. return nil
  258. }
  259. // Validate validates all the required options.
  260. func (o *Options) Validate(args []string) error {
  261. if len(args) != 0 {
  262. return errors.New("no arguments are supported")
  263. }
  264. if errs := validation.Validate(o.config); len(errs) != 0 {
  265. return errs.ToAggregate()
  266. }
  267. return nil
  268. }
  269. // Run runs the specified ProxyServer.
  270. func (o *Options) Run() error {
  271. defer close(o.errCh)
  272. if len(o.WriteConfigTo) > 0 {
  273. return o.writeConfigFile()
  274. }
  275. proxyServer, err := NewProxyServer(o)
  276. if err != nil {
  277. return err
  278. }
  279. if o.CleanupAndExit {
  280. return proxyServer.CleanupAndExit()
  281. }
  282. o.proxyServer = proxyServer
  283. return o.runLoop()
  284. }
  285. // runLoop will watch on the update change of the proxy server's configuration file.
  286. // Return an error when updated
  287. func (o *Options) runLoop() error {
  288. if o.watcher != nil {
  289. o.watcher.Run()
  290. }
  291. // run the proxy in goroutine
  292. go func() {
  293. err := o.proxyServer.Run()
  294. o.errCh <- err
  295. }()
  296. for {
  297. err := <-o.errCh
  298. if err != nil {
  299. return err
  300. }
  301. }
  302. }
  303. func (o *Options) writeConfigFile() (err error) {
  304. const mediaType = runtime.ContentTypeYAML
  305. info, ok := runtime.SerializerInfoForMediaType(proxyconfigscheme.Codecs.SupportedMediaTypes(), mediaType)
  306. if !ok {
  307. return fmt.Errorf("unable to locate encoder -- %q is not a supported media type", mediaType)
  308. }
  309. encoder := proxyconfigscheme.Codecs.EncoderForVersion(info.Serializer, v1alpha1.SchemeGroupVersion)
  310. configFile, err := os.Create(o.WriteConfigTo)
  311. if err != nil {
  312. return err
  313. }
  314. defer func() {
  315. ferr := configFile.Close()
  316. if ferr != nil && err == nil {
  317. err = ferr
  318. }
  319. }()
  320. if err = encoder.Encode(o.config, configFile); err != nil {
  321. return err
  322. }
  323. klog.Infof("Wrote configuration to: %s\n", o.WriteConfigTo)
  324. return nil
  325. }
  326. // addressFromDeprecatedFlags returns server address from flags
  327. // passed on the command line based on the following rules:
  328. // 1. If port is 0, disable the server (e.g. set address to empty).
  329. // 2. Otherwise, set the port portion of the config accordingly.
  330. func addressFromDeprecatedFlags(addr string, port int32) string {
  331. if port == 0 {
  332. return ""
  333. }
  334. return proxyutil.AppendPortIfNeeded(addr, port)
  335. }
  336. // newLenientSchemeAndCodecs returns a scheme that has only v1alpha1 registered into
  337. // it and a CodecFactory with strict decoding disabled.
  338. func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
  339. lenientScheme := runtime.NewScheme()
  340. if err := kubeproxyconfig.AddToScheme(lenientScheme); err != nil {
  341. return nil, nil, fmt.Errorf("failed to add kube-proxy config API to lenient scheme: %v", err)
  342. }
  343. if err := kubeproxyconfigv1alpha1.AddToScheme(lenientScheme); err != nil {
  344. return nil, nil, fmt.Errorf("failed to add kube-proxy config v1alpha1 API to lenient scheme: %v", err)
  345. }
  346. lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
  347. return lenientScheme, &lenientCodecs, nil
  348. }
  349. // loadConfigFromFile loads the contents of file and decodes it as a
  350. // KubeProxyConfiguration object.
  351. func (o *Options) loadConfigFromFile(file string) (*kubeproxyconfig.KubeProxyConfiguration, error) {
  352. data, err := ioutil.ReadFile(file)
  353. if err != nil {
  354. return nil, err
  355. }
  356. return o.loadConfig(data)
  357. }
  358. // loadConfig decodes a serialized KubeProxyConfiguration to the internal type.
  359. func (o *Options) loadConfig(data []byte) (*kubeproxyconfig.KubeProxyConfiguration, error) {
  360. configObj, gvk, err := proxyconfigscheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
  361. if err != nil {
  362. // Try strict decoding first. If that fails decode with a lenient
  363. // decoder, which has only v1alpha1 registered, and log a warning.
  364. // The lenient path is to be dropped when support for v1alpha1 is dropped.
  365. if !runtime.IsStrictDecodingError(err) {
  366. return nil, gerrors.Wrap(err, "failed to decode")
  367. }
  368. _, lenientCodecs, lenientErr := newLenientSchemeAndCodecs()
  369. if lenientErr != nil {
  370. return nil, lenientErr
  371. }
  372. configObj, gvk, lenientErr = lenientCodecs.UniversalDecoder().Decode(data, nil, nil)
  373. if lenientErr != nil {
  374. // Lenient decoding failed with the current version, return the
  375. // original strict error.
  376. return nil, fmt.Errorf("failed lenient decoding: %v", err)
  377. }
  378. // Continue with the v1alpha1 object that was decoded leniently, but emit a warning.
  379. klog.Warningf("using lenient decoding as strict decoding failed: %v", err)
  380. }
  381. proxyConfig, ok := configObj.(*kubeproxyconfig.KubeProxyConfiguration)
  382. if !ok {
  383. return nil, fmt.Errorf("got unexpected config type: %v", gvk)
  384. }
  385. return proxyConfig, nil
  386. }
  387. // ApplyDefaults applies the default values to Options.
  388. func (o *Options) ApplyDefaults(in *kubeproxyconfig.KubeProxyConfiguration) (*kubeproxyconfig.KubeProxyConfiguration, error) {
  389. external, err := proxyconfigscheme.Scheme.ConvertToVersion(in, v1alpha1.SchemeGroupVersion)
  390. if err != nil {
  391. return nil, err
  392. }
  393. proxyconfigscheme.Scheme.Default(external)
  394. internal, err := proxyconfigscheme.Scheme.ConvertToVersion(external, kubeproxyconfig.SchemeGroupVersion)
  395. if err != nil {
  396. return nil, err
  397. }
  398. out := internal.(*kubeproxyconfig.KubeProxyConfiguration)
  399. return out, nil
  400. }
  401. // NewProxyCommand creates a *cobra.Command object with default parameters
  402. func NewProxyCommand() *cobra.Command {
  403. opts := NewOptions()
  404. cmd := &cobra.Command{
  405. Use: "kube-proxy",
  406. Long: `The Kubernetes network proxy runs on each node. This
  407. reflects services as defined in the Kubernetes API on each node and can do simple
  408. TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
  409. Service cluster IPs and ports are currently found through Docker-links-compatible
  410. environment variables specifying ports opened by the service proxy. There is an optional
  411. addon that provides cluster DNS for these cluster IPs. The user must create a service
  412. with the apiserver API to configure the proxy.`,
  413. Run: func(cmd *cobra.Command, args []string) {
  414. verflag.PrintAndExitIfRequested()
  415. utilflag.PrintFlags(cmd.Flags())
  416. if err := initForOS(opts.WindowsService); err != nil {
  417. klog.Fatalf("failed OS init: %v", err)
  418. }
  419. if err := opts.Complete(); err != nil {
  420. klog.Fatalf("failed complete: %v", err)
  421. }
  422. if err := opts.Validate(args); err != nil {
  423. klog.Fatalf("failed validate: %v", err)
  424. }
  425. if err := opts.Run(); err != nil {
  426. klog.Exit(err)
  427. }
  428. },
  429. }
  430. var err error
  431. opts.config, err = opts.ApplyDefaults(opts.config)
  432. if err != nil {
  433. klog.Fatalf("unable to create flag defaults: %v", err)
  434. }
  435. opts.AddFlags(cmd.Flags())
  436. // TODO handle error
  437. cmd.MarkFlagFilename("config", "yaml", "yml", "json")
  438. return cmd
  439. }
  440. // ProxyServer represents all the parameters required to start the Kubernetes proxy server. All
  441. // fields are required.
  442. type ProxyServer struct {
  443. Client clientset.Interface
  444. EventClient v1core.EventsGetter
  445. IptInterface utiliptables.Interface
  446. IpvsInterface utilipvs.Interface
  447. IpsetInterface utilipset.Interface
  448. execer exec.Interface
  449. Proxier proxy.Provider
  450. Broadcaster record.EventBroadcaster
  451. Recorder record.EventRecorder
  452. ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
  453. Conntracker Conntracker // if nil, ignored
  454. ProxyMode string
  455. NodeRef *v1.ObjectReference
  456. CleanupIPVS bool
  457. MetricsBindAddress string
  458. EnableProfiling bool
  459. UseEndpointSlices bool
  460. OOMScoreAdj *int32
  461. ConfigSyncPeriod time.Duration
  462. HealthzServer healthcheck.ProxierHealthUpdater
  463. }
  464. // createClients creates a kube client and an event client from the given config and masterOverride.
  465. // TODO remove masterOverride when CLI flags are removed.
  466. func createClients(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, v1core.EventsGetter, error) {
  467. var kubeConfig *rest.Config
  468. var err error
  469. if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
  470. klog.Info("Neither kubeconfig file nor master URL was specified. Falling back to in-cluster config.")
  471. kubeConfig, err = rest.InClusterConfig()
  472. } else {
  473. // This creates a client, first loading any specified kubeconfig
  474. // file, and then overriding the Master flag, if non-empty.
  475. kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  476. &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
  477. &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
  478. }
  479. if err != nil {
  480. return nil, nil, err
  481. }
  482. kubeConfig.AcceptContentTypes = config.AcceptContentTypes
  483. kubeConfig.ContentType = config.ContentType
  484. kubeConfig.QPS = config.QPS
  485. kubeConfig.Burst = int(config.Burst)
  486. client, err := clientset.NewForConfig(kubeConfig)
  487. if err != nil {
  488. return nil, nil, err
  489. }
  490. eventClient, err := clientset.NewForConfig(kubeConfig)
  491. if err != nil {
  492. return nil, nil, err
  493. }
  494. return client, eventClient.CoreV1(), nil
  495. }
  496. // Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
  497. // TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
  498. func (s *ProxyServer) Run() error {
  499. // To help debugging, immediately log version
  500. klog.Infof("Version: %+v", version.Get())
  501. // TODO(vmarmol): Use container config for this.
  502. var oomAdjuster *oom.OOMAdjuster
  503. if s.OOMScoreAdj != nil {
  504. oomAdjuster = oom.NewOOMAdjuster()
  505. if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
  506. klog.V(2).Info(err)
  507. }
  508. }
  509. if s.Broadcaster != nil && s.EventClient != nil {
  510. s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
  511. }
  512. // Start up a healthz server if requested
  513. if s.HealthzServer != nil {
  514. s.HealthzServer.Run()
  515. }
  516. // Start up a metrics server if requested
  517. if len(s.MetricsBindAddress) > 0 {
  518. proxyMux := mux.NewPathRecorderMux("kube-proxy")
  519. healthz.InstallHandler(proxyMux)
  520. proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
  521. w.Header().Set("Content-Type", "text/plain; charset=utf-8")
  522. w.Header().Set("X-Content-Type-Options", "nosniff")
  523. fmt.Fprintf(w, "%s", s.ProxyMode)
  524. })
  525. //lint:ignore SA1019 See the Metrics Stability Migration KEP
  526. proxyMux.Handle("/metrics", legacyregistry.Handler())
  527. if s.EnableProfiling {
  528. routes.Profiling{}.Install(proxyMux)
  529. }
  530. configz.InstallHandler(proxyMux)
  531. go wait.Until(func() {
  532. err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
  533. if err != nil {
  534. utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
  535. }
  536. }, 5*time.Second, wait.NeverStop)
  537. }
  538. // Tune conntrack, if requested
  539. // Conntracker is always nil for windows
  540. if s.Conntracker != nil {
  541. max, err := getConntrackMax(s.ConntrackConfiguration)
  542. if err != nil {
  543. return err
  544. }
  545. if max > 0 {
  546. err := s.Conntracker.SetMax(max)
  547. if err != nil {
  548. if err != errReadOnlySysFS {
  549. return err
  550. }
  551. // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
  552. // the only remediation we know is to restart the docker daemon.
  553. // Here we'll send an node event with specific reason and message, the
  554. // administrator should decide whether and how to handle this issue,
  555. // whether to drain the node and restart docker. Occurs in other container runtimes
  556. // as well.
  557. // TODO(random-liu): Remove this when the docker bug is fixed.
  558. const message = "CRI error: /sys is read-only: " +
  559. "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
  560. s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
  561. }
  562. }
  563. if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
  564. timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
  565. if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
  566. return err
  567. }
  568. }
  569. if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
  570. timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
  571. if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
  572. return err
  573. }
  574. }
  575. }
  576. noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
  577. if err != nil {
  578. return err
  579. }
  580. noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
  581. if err != nil {
  582. return err
  583. }
  584. labelSelector := labels.NewSelector()
  585. labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
  586. // Make informers that filter out objects that want a non-default service proxy.
  587. informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
  588. informers.WithTweakListOptions(func(options *metav1.ListOptions) {
  589. options.LabelSelector = labelSelector.String()
  590. }))
  591. // Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
  592. // Note: RegisterHandler() calls need to happen before creation of Sources because sources
  593. // only notify on changes, and the initial update (on process start) may be lost if no handlers
  594. // are registered yet.
  595. serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
  596. serviceConfig.RegisterEventHandler(s.Proxier)
  597. go serviceConfig.Run(wait.NeverStop)
  598. if s.UseEndpointSlices {
  599. endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1beta1().EndpointSlices(), s.ConfigSyncPeriod)
  600. endpointSliceConfig.RegisterEventHandler(s.Proxier)
  601. go endpointSliceConfig.Run(wait.NeverStop)
  602. } else {
  603. endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
  604. endpointsConfig.RegisterEventHandler(s.Proxier)
  605. go endpointsConfig.Run(wait.NeverStop)
  606. }
  607. // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
  608. // functions must configure their shared informer event handlers first.
  609. informerFactory.Start(wait.NeverStop)
  610. if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) {
  611. // Make an informer that selects for our nodename.
  612. currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
  613. informers.WithTweakListOptions(func(options *metav1.ListOptions) {
  614. options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
  615. }))
  616. nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
  617. nodeConfig.RegisterEventHandler(s.Proxier)
  618. go nodeConfig.Run(wait.NeverStop)
  619. // This has to start after the calls to NewNodeConfig because that must
  620. // configure the shared informer event handler first.
  621. currentNodeInformerFactory.Start(wait.NeverStop)
  622. }
  623. // Birth Cry after the birth is successful
  624. s.birthCry()
  625. // Just loop forever for now...
  626. s.Proxier.SyncLoop()
  627. return nil
  628. }
  629. func (s *ProxyServer) birthCry() {
  630. s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")
  631. }
  632. func getConntrackMax(config kubeproxyconfig.KubeProxyConntrackConfiguration) (int, error) {
  633. if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
  634. floor := 0
  635. if config.Min != nil {
  636. floor = int(*config.Min)
  637. }
  638. scaled := int(*config.MaxPerCore) * goruntime.NumCPU()
  639. if scaled > floor {
  640. klog.V(3).Infof("getConntrackMax: using scaled conntrack-max-per-core")
  641. return scaled, nil
  642. }
  643. klog.V(3).Infof("getConntrackMax: using conntrack-min")
  644. return floor, nil
  645. }
  646. return 0, nil
  647. }
  648. // CleanupAndExit remove iptables rules and exit if success return nil
  649. func (s *ProxyServer) CleanupAndExit() error {
  650. encounteredError := userspace.CleanupLeftovers(s.IptInterface)
  651. encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
  652. encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface, s.CleanupIPVS) || encounteredError
  653. if encounteredError {
  654. return errors.New("encountered an error while tearing down rules")
  655. }
  656. return nil
  657. }