example_plugin.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pluginwatcher
  14. import (
  15. "errors"
  16. "fmt"
  17. "net"
  18. "os"
  19. "sync"
  20. "time"
  21. "golang.org/x/net/context"
  22. "google.golang.org/grpc"
  23. "k8s.io/klog"
  24. registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
  25. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  26. v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
  27. v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
  28. )
  29. // examplePlugin is a sample plugin to work with plugin watcher
  30. type examplePlugin struct {
  31. grpcServer *grpc.Server
  32. wg sync.WaitGroup
  33. registrationStatus chan registerapi.RegistrationStatus // for testing
  34. endpoint string // for testing
  35. pluginName string
  36. pluginType string
  37. versions []string
  38. }
  39. type pluginServiceV1Beta1 struct {
  40. server *examplePlugin
  41. }
  42. func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) {
  43. klog.Infof("GetExampleInfo v1beta1field: %s", rqt.V1Beta1Field)
  44. return &v1beta1.ExampleResponse{}, nil
  45. }
  46. func (s *pluginServiceV1Beta1) RegisterService() {
  47. v1beta1.RegisterExampleServer(s.server.grpcServer, s)
  48. }
  49. type pluginServiceV1Beta2 struct {
  50. server *examplePlugin
  51. }
  52. func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) {
  53. klog.Infof("GetExampleInfo v1beta2_field: %s", rqt.V1Beta2Field)
  54. return &v1beta2.ExampleResponse{}, nil
  55. }
  56. func (s *pluginServiceV1Beta2) RegisterService() {
  57. v1beta2.RegisterExampleServer(s.server.grpcServer, s)
  58. }
  59. // NewExamplePlugin returns an initialized examplePlugin instance
  60. func NewExamplePlugin() *examplePlugin {
  61. return &examplePlugin{}
  62. }
  63. // NewTestExamplePlugin returns an initialized examplePlugin instance for testing
  64. func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string, advertisedVersions ...string) *examplePlugin {
  65. return &examplePlugin{
  66. pluginName: pluginName,
  67. pluginType: pluginType,
  68. endpoint: endpoint,
  69. versions: advertisedVersions,
  70. registrationStatus: make(chan registerapi.RegistrationStatus),
  71. }
  72. }
  73. // GetPluginInfo returns a PluginInfo object
  74. func GetPluginInfo(plugin *examplePlugin, foundInDeprecatedDir bool) cache.PluginInfo {
  75. return cache.PluginInfo{
  76. SocketPath: plugin.endpoint,
  77. FoundInDeprecatedDir: foundInDeprecatedDir,
  78. }
  79. }
  80. // GetInfo is the RPC invoked by plugin watcher
  81. func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
  82. return &registerapi.PluginInfo{
  83. Type: e.pluginType,
  84. Name: e.pluginName,
  85. Endpoint: e.endpoint,
  86. SupportedVersions: e.versions,
  87. }, nil
  88. }
  89. func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
  90. klog.Errorf("Registration is: %v\n", status)
  91. if e.registrationStatus != nil {
  92. e.registrationStatus <- *status
  93. }
  94. return &registerapi.RegistrationStatusResponse{}, nil
  95. }
  96. // Serve starts a pluginwatcher server and one or more of the plugin services
  97. func (e *examplePlugin) Serve(services ...string) error {
  98. klog.Infof("starting example server at: %s\n", e.endpoint)
  99. lis, err := net.Listen("unix", e.endpoint)
  100. if err != nil {
  101. return err
  102. }
  103. klog.Infof("example server started at: %s\n", e.endpoint)
  104. e.grpcServer = grpc.NewServer()
  105. // Registers kubelet plugin watcher api.
  106. registerapi.RegisterRegistrationServer(e.grpcServer, e)
  107. for _, service := range services {
  108. switch service {
  109. case "v1beta1":
  110. v1beta1 := &pluginServiceV1Beta1{server: e}
  111. v1beta1.RegisterService()
  112. break
  113. case "v1beta2":
  114. v1beta2 := &pluginServiceV1Beta2{server: e}
  115. v1beta2.RegisterService()
  116. break
  117. default:
  118. return fmt.Errorf("Unsupported service: '%s'", service)
  119. }
  120. }
  121. // Starts service
  122. e.wg.Add(1)
  123. go func() {
  124. defer e.wg.Done()
  125. // Blocking call to accept incoming connections.
  126. if err := e.grpcServer.Serve(lis); err != nil {
  127. klog.Errorf("example server stopped serving: %v", err)
  128. }
  129. }()
  130. return nil
  131. }
  132. func (e *examplePlugin) Stop() error {
  133. klog.Infof("Stopping example server at: %s\n", e.endpoint)
  134. e.grpcServer.Stop()
  135. c := make(chan struct{})
  136. go func() {
  137. defer close(c)
  138. e.wg.Wait()
  139. }()
  140. select {
  141. case <-c:
  142. break
  143. case <-time.After(time.Second):
  144. return errors.New("Timed out on waiting for stop completion")
  145. }
  146. if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) {
  147. return err
  148. }
  149. return nil
  150. }