123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- /*
- Copyright 2018 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package pluginwatcher
- import (
- "context"
- "errors"
- "fmt"
- "net"
- "os"
- "sync"
- "time"
- "google.golang.org/grpc"
- "k8s.io/klog"
- registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
- "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
- v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
- v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
- )
- // examplePlugin is a sample plugin to work with plugin watcher
- type examplePlugin struct {
- grpcServer *grpc.Server
- wg sync.WaitGroup
- registrationStatus chan registerapi.RegistrationStatus // for testing
- endpoint string // for testing
- pluginName string
- pluginType string
- versions []string
- }
- type pluginServiceV1Beta1 struct {
- server *examplePlugin
- }
- func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) {
- klog.Infof("GetExampleInfo v1beta1field: %s", rqt.V1Beta1Field)
- return &v1beta1.ExampleResponse{}, nil
- }
- func (s *pluginServiceV1Beta1) RegisterService() {
- v1beta1.RegisterExampleServer(s.server.grpcServer, s)
- }
- type pluginServiceV1Beta2 struct {
- server *examplePlugin
- }
- func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) {
- klog.Infof("GetExampleInfo v1beta2_field: %s", rqt.V1Beta2Field)
- return &v1beta2.ExampleResponse{}, nil
- }
- func (s *pluginServiceV1Beta2) RegisterService() {
- v1beta2.RegisterExampleServer(s.server.grpcServer, s)
- }
- // NewExamplePlugin returns an initialized examplePlugin instance
- func NewExamplePlugin() *examplePlugin {
- return &examplePlugin{}
- }
- // NewTestExamplePlugin returns an initialized examplePlugin instance for testing
- func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string, advertisedVersions ...string) *examplePlugin {
- return &examplePlugin{
- pluginName: pluginName,
- pluginType: pluginType,
- endpoint: endpoint,
- versions: advertisedVersions,
- registrationStatus: make(chan registerapi.RegistrationStatus),
- }
- }
- // GetPluginInfo returns a PluginInfo object
- func GetPluginInfo(plugin *examplePlugin) cache.PluginInfo {
- return cache.PluginInfo{
- SocketPath: plugin.endpoint,
- }
- }
- // GetInfo is the RPC invoked by plugin watcher
- func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
- return ®isterapi.PluginInfo{
- Type: e.pluginType,
- Name: e.pluginName,
- Endpoint: e.endpoint,
- SupportedVersions: e.versions,
- }, nil
- }
- func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
- klog.Errorf("Registration is: %v\n", status)
- if e.registrationStatus != nil {
- e.registrationStatus <- *status
- }
- return ®isterapi.RegistrationStatusResponse{}, nil
- }
- // Serve starts a pluginwatcher server and one or more of the plugin services
- func (e *examplePlugin) Serve(services ...string) error {
- klog.Infof("starting example server at: %s\n", e.endpoint)
- lis, err := net.Listen("unix", e.endpoint)
- if err != nil {
- return err
- }
- klog.Infof("example server started at: %s\n", e.endpoint)
- e.grpcServer = grpc.NewServer()
- // Registers kubelet plugin watcher api.
- registerapi.RegisterRegistrationServer(e.grpcServer, e)
- for _, service := range services {
- switch service {
- case "v1beta1":
- v1beta1 := &pluginServiceV1Beta1{server: e}
- v1beta1.RegisterService()
- case "v1beta2":
- v1beta2 := &pluginServiceV1Beta2{server: e}
- v1beta2.RegisterService()
- default:
- return fmt.Errorf("unsupported service: '%s'", service)
- }
- }
- // Starts service
- e.wg.Add(1)
- go func() {
- defer e.wg.Done()
- // Blocking call to accept incoming connections.
- if err := e.grpcServer.Serve(lis); err != nil {
- klog.Errorf("example server stopped serving: %v", err)
- }
- }()
- return nil
- }
- func (e *examplePlugin) Stop() error {
- klog.Infof("Stopping example server at: %s\n", e.endpoint)
- e.grpcServer.Stop()
- c := make(chan struct{})
- go func() {
- defer close(c)
- e.wg.Wait()
- }()
- select {
- case <-c:
- break
- case <-time.After(time.Second):
- return errors.New("timed out on waiting for stop completion")
- }
- if err := os.Remove(e.endpoint); err != nil && !os.IsNotExist(err) {
- return err
- }
- return nil
- }
|