controllermanager.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package app
  14. import (
  15. "context"
  16. "flag"
  17. "fmt"
  18. "net/http"
  19. "os"
  20. "time"
  21. "github.com/spf13/cobra"
  22. "k8s.io/apimachinery/pkg/util/sets"
  23. "k8s.io/apimachinery/pkg/util/uuid"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. "k8s.io/apiserver/pkg/server"
  26. "k8s.io/apiserver/pkg/server/healthz"
  27. "k8s.io/apiserver/pkg/util/term"
  28. "k8s.io/client-go/tools/leaderelection"
  29. "k8s.io/client-go/tools/leaderelection/resourcelock"
  30. cloudprovider "k8s.io/cloud-provider"
  31. cliflag "k8s.io/component-base/cli/flag"
  32. "k8s.io/component-base/cli/globalflag"
  33. "k8s.io/klog"
  34. cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
  35. "k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
  36. genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app"
  37. "k8s.io/kubernetes/pkg/util/configz"
  38. utilflag "k8s.io/kubernetes/pkg/util/flag"
  39. "k8s.io/kubernetes/pkg/version"
  40. "k8s.io/kubernetes/pkg/version/verflag"
  41. )
  42. const (
  43. // ControllerStartJitter is the jitter value used when starting controller managers.
  44. ControllerStartJitter = 1.0
  45. // ConfigzName is the name used for register cloud-controller manager /configz, same with GroupName.
  46. ConfigzName = "cloudcontrollermanager.config.k8s.io"
  47. )
  48. // NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
  49. func NewCloudControllerManagerCommand() *cobra.Command {
  50. s, err := options.NewCloudControllerManagerOptions()
  51. if err != nil {
  52. klog.Fatalf("unable to initialize command options: %v", err)
  53. }
  54. cmd := &cobra.Command{
  55. Use: "cloud-controller-manager",
  56. Long: `The Cloud controller manager is a daemon that embeds
  57. the cloud specific control loops shipped with Kubernetes.`,
  58. Run: func(cmd *cobra.Command, args []string) {
  59. verflag.PrintAndExitIfRequested()
  60. utilflag.PrintFlags(cmd.Flags())
  61. c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
  62. if err != nil {
  63. fmt.Fprintf(os.Stderr, "%v\n", err)
  64. os.Exit(1)
  65. }
  66. if err := Run(c.Complete(), wait.NeverStop); err != nil {
  67. fmt.Fprintf(os.Stderr, "%v\n", err)
  68. os.Exit(1)
  69. }
  70. },
  71. }
  72. fs := cmd.Flags()
  73. namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
  74. verflag.AddFlags(namedFlagSets.FlagSet("global"))
  75. globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
  76. if flag.CommandLine.Lookup("cloud-provider-gce-lb-src-cidrs") != nil {
  77. // hoist this flag from the global flagset to preserve the commandline until
  78. // the gce cloudprovider is removed.
  79. globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-lb-src-cidrs")
  80. }
  81. for _, f := range namedFlagSets.FlagSets {
  82. fs.AddFlagSet(f)
  83. }
  84. usageFmt := "Usage:\n %s\n"
  85. cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
  86. cmd.SetUsageFunc(func(cmd *cobra.Command) error {
  87. fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
  88. cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
  89. return nil
  90. })
  91. cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
  92. fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
  93. cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
  94. })
  95. return cmd
  96. }
  97. // Run runs the ExternalCMServer. This should never exit.
  98. func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error {
  99. // To help debugging, immediately log version
  100. klog.Infof("Version: %+v", version.Get())
  101. cloud, err := cloudprovider.InitCloudProvider(c.ComponentConfig.KubeCloudShared.CloudProvider.Name, c.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
  102. if err != nil {
  103. klog.Fatalf("Cloud provider could not be initialized: %v", err)
  104. }
  105. if cloud == nil {
  106. klog.Fatalf("cloud provider is nil")
  107. }
  108. if !cloud.HasClusterID() {
  109. if c.ComponentConfig.KubeCloudShared.AllowUntaggedCloud {
  110. klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
  111. } else {
  112. klog.Fatalf("no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
  113. }
  114. }
  115. // setup /configz endpoint
  116. if cz, err := configz.New(ConfigzName); err == nil {
  117. cz.Set(c.ComponentConfig)
  118. } else {
  119. klog.Errorf("unable to register configz: %c", err)
  120. }
  121. // Setup any healthz checks we will want to use.
  122. var checks []healthz.HealthzChecker
  123. var electionChecker *leaderelection.HealthzAdaptor
  124. if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
  125. electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
  126. checks = append(checks, electionChecker)
  127. }
  128. // Start the controller manager HTTP server
  129. if c.SecureServing != nil {
  130. unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
  131. handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
  132. // TODO: handle stoppedCh returned by c.SecureServing.Serve
  133. if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
  134. return err
  135. }
  136. }
  137. if c.InsecureServing != nil {
  138. unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
  139. insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
  140. handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
  141. if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
  142. return err
  143. }
  144. }
  145. run := func(ctx context.Context) {
  146. if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
  147. klog.Fatalf("error running controllers: %v", err)
  148. }
  149. }
  150. if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
  151. run(context.TODO())
  152. panic("unreachable")
  153. }
  154. // Identity used to distinguish between multiple cloud controller manager instances
  155. id, err := os.Hostname()
  156. if err != nil {
  157. return err
  158. }
  159. // add a uniquifier so that two processes on the same host don't accidentally both become active
  160. id = id + "_" + string(uuid.NewUUID())
  161. // Lock required for leader election
  162. rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
  163. "kube-system",
  164. "cloud-controller-manager",
  165. c.LeaderElectionClient.CoreV1(),
  166. c.LeaderElectionClient.CoordinationV1(),
  167. resourcelock.ResourceLockConfig{
  168. Identity: id,
  169. EventRecorder: c.EventRecorder,
  170. })
  171. if err != nil {
  172. klog.Fatalf("error creating lock: %v", err)
  173. }
  174. // Try and become the leader and start cloud controller manager loops
  175. leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
  176. Lock: rl,
  177. LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
  178. RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
  179. RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
  180. Callbacks: leaderelection.LeaderCallbacks{
  181. OnStartedLeading: run,
  182. OnStoppedLeading: func() {
  183. klog.Fatalf("leaderelection lost")
  184. },
  185. },
  186. WatchDog: electionChecker,
  187. Name: "cloud-controller-manager",
  188. })
  189. panic("unreachable")
  190. }
  191. // startControllers starts the cloud specific controller loops.
  192. func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
  193. // Initialize the cloud provider with a reference to the clientBuilder
  194. cloud.Initialize(c.ClientBuilder, stopCh)
  195. // Set the informer on the user cloud object
  196. if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
  197. informerUserCloud.SetInformers(c.SharedInformers)
  198. }
  199. for controllerName, initFn := range controllers {
  200. if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
  201. klog.Warningf("%q is disabled", controllerName)
  202. continue
  203. }
  204. klog.V(1).Infof("Starting %q", controllerName)
  205. _, started, err := initFn(c, cloud, stopCh)
  206. if err != nil {
  207. klog.Errorf("Error starting %q", controllerName)
  208. return err
  209. }
  210. if !started {
  211. klog.Warningf("Skipping %q", controllerName)
  212. continue
  213. }
  214. klog.Infof("Started %q", controllerName)
  215. time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
  216. }
  217. // If apiserver is not running we should wait for some time and fail only then. This is particularly
  218. // important when we start apiserver and controller manager at the same time.
  219. if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {
  220. klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
  221. }
  222. c.SharedInformers.Start(stopCh)
  223. select {}
  224. }
  225. // initFunc is used to launch a particular controller. It may run additional "should I activate checks".
  226. // Any error returned will cause the controller process to `Fatal`
  227. // The bool indicates whether the controller was enabled.
  228. type initFunc func(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stop <-chan struct{}) (debuggingHandler http.Handler, enabled bool, err error)
  229. // KnownControllers indicate the default controller we are known.
  230. func KnownControllers() []string {
  231. ret := sets.StringKeySet(newControllerInitializers())
  232. return ret.List()
  233. }
// ControllersDisabledByDefault is the set of controller names that are
// disabled by default when starting the cloud-controller-manager. It is
// initialized empty.
var ControllersDisabledByDefault = sets.NewString()
  236. // newControllerInitializers is a private map of named controller groups (you can start more than one in an init func)
  237. // paired to their initFunc. This allows for structured downstream composition and subdivision.
  238. func newControllerInitializers() map[string]initFunc {
  239. controllers := map[string]initFunc{}
  240. controllers["cloud-node"] = startCloudNodeController
  241. controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
  242. controllers["service"] = startServiceController
  243. controllers["route"] = startRouteController
  244. return controllers
  245. }