fake_client.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package fake
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "strings"
  21. csipb "github.com/container-storage-interface/spec/lib/go/csi"
  22. "google.golang.org/grpc"
  23. "google.golang.org/grpc/codes"
  24. "google.golang.org/grpc/status"
  25. )
  const (
  	// NodePublishTimeOut_VolumeID is a volume id that makes the fake
  	// NodePublishVolume call fail with a DeadlineExceeded (timeout) error.
  	NodePublishTimeOut_VolumeID = "node-publish-timeout"

  	// NodeStageTimeOut_VolumeID is a volume id that makes the fake
  	// NodeStageVolume call fail with a DeadlineExceeded (timeout) error.
  	NodeStageTimeOut_VolumeID = "node-stage-timeout"
  )
  // IdentityClient is a fake CSI identity client intended for tests.
  type IdentityClient struct {
  	// nextErr holds the error injected through SetNextError.
  	nextErr error
  }
  36. // NewIdentityClient returns a new IdentityClient
  37. func NewIdentityClient() *IdentityClient {
  38. return &IdentityClient{}
  39. }
  40. // SetNextError injects expected error
  41. func (f *IdentityClient) SetNextError(err error) {
  42. f.nextErr = err
  43. }
  44. // GetPluginInfo returns plugin info
  45. func (f *IdentityClient) GetPluginInfo(ctx context.Context, in *csipb.GetPluginInfoRequest, opts ...grpc.CallOption) (*csipb.GetPluginInfoResponse, error) {
  46. return nil, nil
  47. }
  48. // GetPluginCapabilities implements csi method
  49. func (f *IdentityClient) GetPluginCapabilities(ctx context.Context, in *csipb.GetPluginCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.GetPluginCapabilitiesResponse, error) {
  50. return nil, nil
  51. }
  52. // Probe implements csi method
  53. func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts ...grpc.CallOption) (*csipb.ProbeResponse, error) {
  54. return nil, nil
  55. }
  // CSIVolume is the record the fake NodeClient keeps for each volume it has
  // published or staged.
  type CSIVolume struct {
  	// VolumeHandle is the volume id of the request (set on publish).
  	VolumeHandle string
  	// VolumeContext is the volume context passed with the request.
  	VolumeContext map[string]string
  	// Path is the target path (publish) or staging target path (stage).
  	Path string
  	// DeviceMountPath is the staging target path of a publish request.
  	DeviceMountPath string
  	// FSType is the filesystem type of the mount capability, if any.
  	FSType string
  	// MountFlags are the mount flags of the mount capability, if any.
  	MountFlags []string
  }
  64. // NodeClient returns CSI node client
  65. type NodeClient struct {
  66. nodePublishedVolumes map[string]CSIVolume
  67. nodeStagedVolumes map[string]CSIVolume
  68. stageUnstageSet bool
  69. expansionSet bool
  70. volumeStatsSet bool
  71. nodeGetInfoResp *csipb.NodeGetInfoResponse
  72. nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
  73. nextErr error
  74. }
  75. // NewNodeClient returns fake node client
  76. func NewNodeClient(stageUnstageSet bool) *NodeClient {
  77. return &NodeClient{
  78. nodePublishedVolumes: make(map[string]CSIVolume),
  79. nodeStagedVolumes: make(map[string]CSIVolume),
  80. stageUnstageSet: stageUnstageSet,
  81. volumeStatsSet: true,
  82. }
  83. }
  84. func NewNodeClientWithExpansion(stageUnstageSet bool, expansionSet bool) *NodeClient {
  85. return &NodeClient{
  86. nodePublishedVolumes: make(map[string]CSIVolume),
  87. nodeStagedVolumes: make(map[string]CSIVolume),
  88. stageUnstageSet: stageUnstageSet,
  89. expansionSet: expansionSet,
  90. }
  91. }
  92. func NewNodeClientWithVolumeStats(volumeStatsSet bool) *NodeClient {
  93. return &NodeClient{
  94. volumeStatsSet: volumeStatsSet,
  95. }
  96. }
  97. // SetNextError injects next expected error
  98. func (f *NodeClient) SetNextError(err error) {
  99. f.nextErr = err
  100. }
  101. func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) {
  102. f.nodeGetInfoResp = resp
  103. }
  104. func (f *NodeClient) SetNodeVolumeStatsResp(resp *csipb.NodeGetVolumeStatsResponse) {
  105. f.nodeVolumeStatsResp = resp
  106. }
  107. // GetNodePublishedVolumes returns node published volumes
  108. func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
  109. return f.nodePublishedVolumes
  110. }
  111. // AddNodePublishedVolume adds specified volume to nodePublishedVolumes
  112. func (f *NodeClient) AddNodePublishedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
  113. f.nodePublishedVolumes[volID] = CSIVolume{
  114. Path: deviceMountPath,
  115. VolumeContext: volumeContext,
  116. }
  117. }
  118. // GetNodeStagedVolumes returns node staged volumes
  119. func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
  120. return f.nodeStagedVolumes
  121. }
  122. // AddNodeStagedVolume adds specified volume to nodeStagedVolumes
  123. func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
  124. f.nodeStagedVolumes[volID] = CSIVolume{
  125. Path: deviceMountPath,
  126. VolumeContext: volumeContext,
  127. }
  128. }
  129. // NodePublishVolume implements CSI NodePublishVolume
  130. func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) {
  131. if f.nextErr != nil {
  132. return nil, f.nextErr
  133. }
  134. if req.GetVolumeId() == "" {
  135. return nil, errors.New("missing volume id")
  136. }
  137. if req.GetTargetPath() == "" {
  138. return nil, errors.New("missing target path")
  139. }
  140. fsTypes := "block|ext4|xfs|zfs"
  141. fsType := req.GetVolumeCapability().GetMount().GetFsType()
  142. if !strings.Contains(fsTypes, fsType) {
  143. return nil, errors.New("invalid fstype")
  144. }
  145. if req.GetVolumeId() == NodePublishTimeOut_VolumeID {
  146. timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
  147. return nil, timeoutErr
  148. }
  149. // "Creation of target_path is the responsibility of the SP."
  150. // Our plugin depends on it.
  151. if req.VolumeCapability.GetBlock() != nil {
  152. if err := ioutil.WriteFile(req.TargetPath, []byte{}, 0644); err != nil {
  153. return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err)
  154. }
  155. } else {
  156. if err := os.MkdirAll(req.TargetPath, 0755); err != nil {
  157. return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err)
  158. }
  159. }
  160. publishedVolume := CSIVolume{
  161. VolumeHandle: req.GetVolumeId(),
  162. Path: req.GetTargetPath(),
  163. DeviceMountPath: req.GetStagingTargetPath(),
  164. VolumeContext: req.GetVolumeContext(),
  165. }
  166. if req.GetVolumeCapability().GetMount() != nil {
  167. publishedVolume.FSType = req.GetVolumeCapability().GetMount().FsType
  168. publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags
  169. }
  170. f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume
  171. return &csipb.NodePublishVolumeResponse{}, nil
  172. }
  173. // NodeUnpublishVolume implements csi method
  174. func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) {
  175. if f.nextErr != nil {
  176. return nil, f.nextErr
  177. }
  178. if req.GetVolumeId() == "" {
  179. return nil, errors.New("missing volume id")
  180. }
  181. if req.GetTargetPath() == "" {
  182. return nil, errors.New("missing target path")
  183. }
  184. delete(f.nodePublishedVolumes, req.GetVolumeId())
  185. // "The SP MUST delete the file or directory it created at this path."
  186. if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
  187. return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err)
  188. }
  189. return &csipb.NodeUnpublishVolumeResponse{}, nil
  190. }
  191. // NodeStagevolume implements csi method
  192. func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeStageVolumeResponse, error) {
  193. if f.nextErr != nil {
  194. return nil, f.nextErr
  195. }
  196. if req.GetVolumeId() == "" {
  197. return nil, errors.New("missing volume id")
  198. }
  199. if req.GetStagingTargetPath() == "" {
  200. return nil, errors.New("missing staging target path")
  201. }
  202. csiVol := CSIVolume{
  203. Path: req.GetStagingTargetPath(),
  204. VolumeContext: req.GetVolumeContext(),
  205. }
  206. fsType := ""
  207. fsTypes := "block|ext4|xfs|zfs"
  208. mounted := req.GetVolumeCapability().GetMount()
  209. if mounted != nil {
  210. fsType = mounted.GetFsType()
  211. csiVol.MountFlags = mounted.GetMountFlags()
  212. }
  213. if !strings.Contains(fsTypes, fsType) {
  214. return nil, errors.New("invalid fstype")
  215. }
  216. if req.GetVolumeId() == NodeStageTimeOut_VolumeID {
  217. timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
  218. return nil, timeoutErr
  219. }
  220. f.nodeStagedVolumes[req.GetVolumeId()] = csiVol
  221. return &csipb.NodeStageVolumeResponse{}, nil
  222. }
  223. // NodeUnstageVolume implements csi method
  224. func (f *NodeClient) NodeUnstageVolume(ctx context.Context, req *csipb.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnstageVolumeResponse, error) {
  225. if f.nextErr != nil {
  226. return nil, f.nextErr
  227. }
  228. if req.GetVolumeId() == "" {
  229. return nil, errors.New("missing volume id")
  230. }
  231. if req.GetStagingTargetPath() == "" {
  232. return nil, errors.New("missing staging target path")
  233. }
  234. delete(f.nodeStagedVolumes, req.GetVolumeId())
  235. return &csipb.NodeUnstageVolumeResponse{}, nil
  236. }
  237. // NodeExpandVolume implements csi method
  238. func (f *NodeClient) NodeExpandVolume(ctx context.Context, req *csipb.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeExpandVolumeResponse, error) {
  239. if f.nextErr != nil {
  240. return nil, f.nextErr
  241. }
  242. if req.GetVolumeId() == "" {
  243. return nil, errors.New("missing volume id")
  244. }
  245. if req.GetVolumePath() == "" {
  246. return nil, errors.New("missing volume path")
  247. }
  248. if req.GetCapacityRange().RequiredBytes <= 0 {
  249. return nil, errors.New("required bytes should be greater than 0")
  250. }
  251. resp := &csipb.NodeExpandVolumeResponse{
  252. CapacityBytes: req.GetCapacityRange().RequiredBytes,
  253. }
  254. return resp, nil
  255. }
  256. // NodeGetId implements csi method
  257. func (f *NodeClient) NodeGetInfo(ctx context.Context, in *csipb.NodeGetInfoRequest, opts ...grpc.CallOption) (*csipb.NodeGetInfoResponse, error) {
  258. if f.nextErr != nil {
  259. return nil, f.nextErr
  260. }
  261. return f.nodeGetInfoResp, nil
  262. }
  263. // NodeGetCapabilities implements csi method
  264. func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
  265. resp := &csipb.NodeGetCapabilitiesResponse{
  266. Capabilities: []*csipb.NodeServiceCapability{},
  267. }
  268. if f.stageUnstageSet {
  269. resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
  270. Type: &csipb.NodeServiceCapability_Rpc{
  271. Rpc: &csipb.NodeServiceCapability_RPC{
  272. Type: csipb.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
  273. },
  274. },
  275. })
  276. }
  277. if f.expansionSet {
  278. resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
  279. Type: &csipb.NodeServiceCapability_Rpc{
  280. Rpc: &csipb.NodeServiceCapability_RPC{
  281. Type: csipb.NodeServiceCapability_RPC_EXPAND_VOLUME,
  282. },
  283. },
  284. })
  285. }
  286. if f.volumeStatsSet {
  287. resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
  288. Type: &csipb.NodeServiceCapability_Rpc{
  289. Rpc: &csipb.NodeServiceCapability_RPC{
  290. Type: csipb.NodeServiceCapability_RPC_GET_VOLUME_STATS,
  291. },
  292. },
  293. })
  294. }
  295. return resp, nil
  296. }
  297. /*
  298. // NodeGetVolumeStats implements csi method
  299. func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, in *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) {
  300. return nil, nil
  301. }
  302. */
  303. // NodeGetVolumeStats implements csi method
  304. func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, req *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) {
  305. if f.nextErr != nil {
  306. return nil, f.nextErr
  307. }
  308. if req.GetVolumeId() == "" {
  309. return nil, errors.New("missing volume id")
  310. }
  311. if req.GetVolumePath() == "" {
  312. return nil, errors.New("missing Volume path")
  313. }
  314. if f.nodeVolumeStatsResp != nil {
  315. return f.nodeVolumeStatsResp, nil
  316. }
  317. return &csipb.NodeGetVolumeStatsResponse{}, nil
  318. }
  319. // ControllerClient represents a CSI Controller client
  320. type ControllerClient struct {
  321. nextCapabilities []*csipb.ControllerServiceCapability
  322. nextErr error
  323. }
  324. // NewControllerClient returns a ControllerClient
  325. func NewControllerClient() *ControllerClient {
  326. return &ControllerClient{}
  327. }
  328. // SetNextError injects next expected error
  329. func (f *ControllerClient) SetNextError(err error) {
  330. f.nextErr = err
  331. }
  332. // SetNextCapabilities injects next expected capabilities
  333. func (f *ControllerClient) SetNextCapabilities(caps []*csipb.ControllerServiceCapability) {
  334. f.nextCapabilities = caps
  335. }
  336. // ControllerGetCapabilities implements csi method
  337. func (f *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipb.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ControllerGetCapabilitiesResponse, error) {
  338. if f.nextErr != nil {
  339. return nil, f.nextErr
  340. }
  341. if f.nextCapabilities == nil {
  342. f.nextCapabilities = []*csipb.ControllerServiceCapability{
  343. {
  344. Type: &csipb.ControllerServiceCapability_Rpc{
  345. Rpc: &csipb.ControllerServiceCapability_RPC{
  346. Type: csipb.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
  347. },
  348. },
  349. },
  350. }
  351. }
  352. return &csipb.ControllerGetCapabilitiesResponse{
  353. Capabilities: f.nextCapabilities,
  354. }, nil
  355. }
  356. // CreateVolume implements csi method
  357. func (f *ControllerClient) CreateVolume(ctx context.Context, in *csipb.CreateVolumeRequest, opts ...grpc.CallOption) (*csipb.CreateVolumeResponse, error) {
  358. return nil, nil
  359. }
  360. // DeleteVolume implements csi method
  361. func (f *ControllerClient) DeleteVolume(ctx context.Context, in *csipb.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipb.DeleteVolumeResponse, error) {
  362. return nil, nil
  363. }
  364. // ControllerPublishVolume implements csi method
  365. func (f *ControllerClient) ControllerPublishVolume(ctx context.Context, in *csipb.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerPublishVolumeResponse, error) {
  366. return nil, nil
  367. }
  368. // ControllerUnpublishVolume implements csi method
  369. func (f *ControllerClient) ControllerUnpublishVolume(ctx context.Context, in *csipb.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerUnpublishVolumeResponse, error) {
  370. return nil, nil
  371. }
  372. // ValidateVolumeCapabilities implements csi method
  373. func (f *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *csipb.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ValidateVolumeCapabilitiesResponse, error) {
  374. return nil, nil
  375. }
  376. // ListVolumes implements csi method
  377. func (f *ControllerClient) ListVolumes(ctx context.Context, in *csipb.ListVolumesRequest, opts ...grpc.CallOption) (*csipb.ListVolumesResponse, error) {
  378. return nil, nil
  379. }
  380. // GetCapacity implements csi method
  381. func (f *ControllerClient) GetCapacity(ctx context.Context, in *csipb.GetCapacityRequest, opts ...grpc.CallOption) (*csipb.GetCapacityResponse, error) {
  382. return nil, nil
  383. }