controllermanager.go 11 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package app
  14. import (
  15. "context"
  16. "flag"
  17. "fmt"
  18. "net/http"
  19. "os"
  20. "time"
  21. "github.com/spf13/cobra"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/apimachinery/pkg/util/uuid"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. "k8s.io/apiserver/pkg/server"
  26. "k8s.io/apiserver/pkg/server/healthz"
  27. "k8s.io/apiserver/pkg/util/term"
  28. "k8s.io/client-go/tools/leaderelection"
  29. "k8s.io/client-go/tools/leaderelection/resourcelock"
  30. cloudprovider "k8s.io/cloud-provider"
  31. cliflag "k8s.io/component-base/cli/flag"
  32. "k8s.io/component-base/cli/globalflag"
  33. "k8s.io/component-base/version"
  34. "k8s.io/component-base/version/verflag"
  35. "k8s.io/klog"
  36. cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
  37. "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
  38. genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app"
  39. "k8s.io/kubernetes/pkg/util/configz"
  40. utilflag "k8s.io/kubernetes/pkg/util/flag"
  41. )
  // ControllerStartJitter is the jitter factor passed to wait.Jitter when
  // pausing between the start-up of successive controllers.
  const ControllerStartJitter = 1.0

  // ConfigzName is the name under which the cloud-controller-manager
  // configuration is registered with /configz. It is kept the same as the
  // component config GroupName.
  const ConfigzName = "cloudcontrollermanager.config.k8s.io"
  48. // NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
  49. func NewCloudControllerManagerCommand() *cobra.Command {
  50. s, err := options.NewCloudControllerManagerOptions()
  51. if err != nil {
  52. klog.Fatalf("unable to initialize command options: %v", err)
  53. }
  54. cmd := &cobra.Command{
  55. Use: "cloud-controller-manager",
  56. Long: `The Cloud controller manager is a daemon that embeds
  57. the cloud specific control loops shipped with Kubernetes.`,
  58. Run: func(cmd *cobra.Command, args []string) {
  59. verflag.PrintAndExitIfRequested()
  60. utilflag.PrintFlags(cmd.Flags())
  61. c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
  62. if err != nil {
  63. fmt.Fprintf(os.Stderr, "%v\n", err)
  64. os.Exit(1)
  65. }
  66. if err := Run(c.Complete(), wait.NeverStop); err != nil {
  67. fmt.Fprintf(os.Stderr, "%v\n", err)
  68. os.Exit(1)
  69. }
  70. },
  71. }
  72. fs := cmd.Flags()
  73. namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
  74. verflag.AddFlags(namedFlagSets.FlagSet("global"))
  75. globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
  76. if flag.CommandLine.Lookup("cloud-provider-gce-lb-src-cidrs") != nil {
  77. // hoist this flag from the global flagset to preserve the commandline until
  78. // the gce cloudprovider is removed.
  79. globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-lb-src-cidrs")
  80. }
  81. if flag.CommandLine.Lookup("cloud-provider-gce-l7lb-src-cidrs") != nil {
  82. globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-l7lb-src-cidrs")
  83. }
  84. for _, f := range namedFlagSets.FlagSets {
  85. fs.AddFlagSet(f)
  86. }
  87. usageFmt := "Usage:\n %s\n"
  88. cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
  89. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
  90. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
  91. cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
  92. return nil
  93. })
  94. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
  95. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
  96. cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
  97. })
  98. return cmd
  99. }
  100. // Run runs the ExternalCMServer. This should never exit.
  101. func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error {
  102. // To help debugging, immediately log version
  103. klog.Infof("Version: %+v", version.Get())
  104. cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.KubeCloudShared.CloudProvider.Name, c.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
  105. if err != nil {
  106. klog.Fatalf("Cloud provider could not be initialized: %v", err)
  107. }
  108. if cloud == nil {
  109. klog.Fatalf("cloud provider is nil")
  110. }
  111. if !cloud.HasClusterID() {
  112. if c.ComponentConfig.KubeCloudShared.AllowUntaggedCloud {
  113. klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
  114. } else {
  115. klog.Fatalf("no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
  116. }
  117. }
  118. // setup /configz endpoint
  119. if cz, err := configz.New(ConfigzName); err == nil {
  120. cz.Set(c.ComponentConfig)
  121. } else {
  122. klog.Errorf("unable to register configz: %v", err)
  123. }
  124. // Setup any health checks we will want to use.
  125. var checks []healthz.HealthChecker
  126. var electionChecker *leaderelection.HealthzAdaptor
  127. if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
  128. electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
  129. checks = append(checks, electionChecker)
  130. }
  131. // Start the controller manager HTTP server
  132. if c.SecureServing != nil {
  133. unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
  134. handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
  135. // TODO: handle stoppedCh returned by c.SecureServing.Serve
  136. if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
  137. return err
  138. }
  139. }
  140. if c.InsecureServing != nil {
  141. unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
  142. insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
  143. handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
  144. if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
  145. return err
  146. }
  147. }
  148. run := func(ctx context.Context) {
  149. if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
  150. klog.Fatalf("error running controllers: %v", err)
  151. }
  152. }
  153. if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
  154. run(context.TODO())
  155. panic("unreachable")
  156. }
  157. // Identity used to distinguish between multiple cloud controller manager instances
  158. id, err := os.Hostname()
  159. if err != nil {
  160. return err
  161. }
  162. // add a uniquifier so that two processes on the same host don't accidentally both become active
  163. id = id + "_" + string(uuid.NewUUID())
  164. // Lock required for leader election
  165. rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
  166. c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
  167. c.ComponentConfig.Generic.LeaderElection.ResourceName,
  168. c.LeaderElectionClient.CoreV1(),
  169. c.LeaderElectionClient.CoordinationV1(),
  170. resourcelock.ResourceLockConfig{
  171. Identity: id,
  172. EventRecorder: c.EventRecorder,
  173. })
  174. if err != nil {
  175. klog.Fatalf("error creating lock: %v", err)
  176. }
  177. // Try and become the leader and start cloud controller manager loops
  178. leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
  179. Lock: rl,
  180. LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
  181. RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
  182. RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
  183. Callbacks: leaderelection.LeaderCallbacks{
  184. OnStartedLeading: run,
  185. OnStoppedLeading: func() {
  186. klog.Fatalf("leaderelection lost")
  187. },
  188. },
  189. WatchDog: electionChecker,
  190. Name: "cloud-controller-manager",
  191. })
  192. panic("unreachable")
  193. }
  194. // startControllers starts the cloud specific controller loops.
  195. func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
  196. // Initialize the cloud provider with a reference to the clientBuilder
  197. cloud.Initialize(c.ClientBuilder, stopCh)
  198. // Set the informer on the user cloud object
  199. if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
  200. informerUserCloud.SetInformers(c.SharedInformers)
  201. }
  202. for controllerName, initFn := range controllers {
  203. if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
  204. klog.Warningf("%q is disabled", controllerName)
  205. continue
  206. }
  207. klog.V(1).Infof("Starting %q", controllerName)
  208. _, started, err := initFn(c, cloud, stopCh)
  209. if err != nil {
  210. klog.Errorf("Error starting %q", controllerName)
  211. return err
  212. }
  213. if !started {
  214. klog.Warningf("Skipping %q", controllerName)
  215. continue
  216. }
  217. klog.Infof("Started %q", controllerName)
  218. time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
  219. }
  220. // If apiserver is not running we should wait for some time and fail only then. This is particularly
  221. // important when we start apiserver and controller manager at the same time.
  222. if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {
  223. klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
  224. }
  225. c.SharedInformers.Start(stopCh)
  226. select {}
  227. }
  228. // initFunc is used to launch a particular controller. It may run additional "should I activate checks".
  229. // Any error returned will cause the controller process to `Fatal`
  230. // The bool indicates whether the controller was enabled.
  231. type initFunc func(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stop <-chan struct{}) (debuggingHandler http.Handler, enabled bool, err error)
  232. // KnownControllers indicate the default controller we are known.
  233. func KnownControllers() []string {
  234. ret := sets.StringKeySet(newControllerInitializers())
  235. return ret.List()
  236. }
  237. // ControllersDisabledByDefault is the controller disabled default when starting cloud-controller managers.
  238. var ControllersDisabledByDefault = sets.NewString()
  239. // newControllerInitializers is a private map of named controller groups (you can start more than one in an init func)
  240. // paired to their initFunc. This allows for structured downstream composition and subdivision.
  241. func newControllerInitializers() map[string]initFunc {
  242. controllers := map[string]initFunc{}
  243. controllers["cloud-node"] = startCloudNodeController
  244. controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
  245. controllers["service"] = startServiceController
  246. controllers["route"] = startRouteController
  247. return controllers
  248. }