example_plugin.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pluginwatcher
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "net"
  19. "os"
  20. "sync"
  21. "time"
  22. "google.golang.org/grpc"
  23. "k8s.io/klog"
  24. registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
  25. "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
  26. v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
  27. v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
  28. )
  29. // examplePlugin is a sample plugin to work with plugin watcher
  30. type examplePlugin struct {
  31. grpcServer *grpc.Server
  32. wg sync.WaitGroup
  33. registrationStatus chan registerapi.RegistrationStatus // for testing
  34. endpoint string // for testing
  35. pluginName string
  36. pluginType string
  37. versions []string
  38. }
  39. type pluginServiceV1Beta1 struct {
  40. server *examplePlugin
  41. }
  42. func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) {
  43. klog.Infof("GetExampleInfo v1beta1field: %s", rqt.V1Beta1Field)
  44. return &v1beta1.ExampleResponse{}, nil
  45. }
  46. func (s *pluginServiceV1Beta1) RegisterService() {
  47. v1beta1.RegisterExampleServer(s.server.grpcServer, s)
  48. }
  49. type pluginServiceV1Beta2 struct {
  50. server *examplePlugin
  51. }
  52. func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) {
  53. klog.Infof("GetExampleInfo v1beta2_field: %s", rqt.V1Beta2Field)
  54. return &v1beta2.ExampleResponse{}, nil
  55. }
  56. func (s *pluginServiceV1Beta2) RegisterService() {
  57. v1beta2.RegisterExampleServer(s.server.grpcServer, s)
  58. }
  59. // NewExamplePlugin returns an initialized examplePlugin instance
  60. func NewExamplePlugin() *examplePlugin {
  61. return &examplePlugin{}
  62. }
  63. // NewTestExamplePlugin returns an initialized examplePlugin instance for testing
  64. func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string, advertisedVersions ...string) *examplePlugin {
  65. return &examplePlugin{
  66. pluginName: pluginName,
  67. pluginType: pluginType,
  68. endpoint: endpoint,
  69. versions: advertisedVersions,
  70. registrationStatus: make(chan registerapi.RegistrationStatus),
  71. }
  72. }
  73. // GetPluginInfo returns a PluginInfo object
  74. func GetPluginInfo(plugin *examplePlugin) cache.PluginInfo {
  75. return cache.PluginInfo{
  76. SocketPath: plugin.endpoint,
  77. }
  78. }
  79. // GetInfo is the RPC invoked by plugin watcher
  80. func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
  81. return &registerapi.PluginInfo{
  82. Type: e.pluginType,
  83. Name: e.pluginName,
  84. Endpoint: e.endpoint,
  85. SupportedVersions: e.versions,
  86. }, nil
  87. }
  88. func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
  89. klog.Errorf("Registration is: %v\n", status)
  90. if e.registrationStatus != nil {
  91. e.registrationStatus <- *status
  92. }
  93. return &registerapi.RegistrationStatusResponse{}, nil
  94. }
  95. // Serve starts a pluginwatcher server and one or more of the plugin services
  96. func (e *examplePlugin) Serve(services ...string) error {
  97. klog.Infof("starting example server at: %s\n", e.endpoint)
  98. lis, err := net.Listen("unix", e.endpoint)
  99. if err != nil {
  100. return err
  101. }
  102. klog.Infof("example server started at: %s\n", e.endpoint)
  103. e.grpcServer = grpc.NewServer()
  104. // Registers kubelet plugin watcher api.
  105. registerapi.RegisterRegistrationServer(e.grpcServer, e)
  106. for _, service := range services {
  107. switch service {
  108. case "v1beta1":
  109. v1beta1 := &pluginServiceV1Beta1{server: e}
  110. v1beta1.RegisterService()
  111. case "v1beta2":
  112. v1beta2 := &pluginServiceV1Beta2{server: e}
  113. v1beta2.RegisterService()
  114. default:
  115. return fmt.Errorf("unsupported service: '%s'", service)
  116. }
  117. }
  118. // Starts service
  119. e.wg.Add(1)
  120. go func() {
  121. defer e.wg.Done()
  122. // Blocking call to accept incoming connections.
  123. if err := e.grpcServer.Serve(lis); err != nil {
  124. klog.Errorf("example server stopped serving: %v", err)
  125. }
  126. }()
  127. return nil
  128. }
  129. func (e *examplePlugin) Stop() error {
  130. klog.Infof("Stopping example server at: %s\n", e.endpoint)
  131. e.grpcServer.Stop()
  132. c := make(chan struct{})
  133. go func() {
  134. defer close(c)
  135. e.wg.Wait()
  136. }()
  137. select {
  138. case <-c:
  139. break
  140. case <-time.After(time.Second):
  141. return errors.New("timed out on waiting for stop completion")
  142. }
  143. if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) {
  144. return err
  145. }
  146. return nil
  147. }