pod_devices.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package devicemanager
  14. import (
  15. "k8s.io/klog"
  16. "k8s.io/apimachinery/pkg/util/sets"
  17. pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
  18. podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
  19. "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
  20. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  21. )
  22. type deviceAllocateInfo struct {
  23. // deviceIds contains device Ids allocated to this container for the given resourceName.
  24. deviceIds sets.String
  25. // allocResp contains cached rpc AllocateResponse.
  26. allocResp *pluginapi.ContainerAllocateResponse
  27. }
  28. type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
  29. type containerDevices map[string]resourceAllocateInfo // Keyed by containerName.
  30. type podDevices map[string]containerDevices // Keyed by podUID.
  31. func (pdev podDevices) pods() sets.String {
  32. ret := sets.NewString()
  33. for k := range pdev {
  34. ret.Insert(k)
  35. }
  36. return ret
  37. }
  38. func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
  39. if _, podExists := pdev[podUID]; !podExists {
  40. pdev[podUID] = make(containerDevices)
  41. }
  42. if _, contExists := pdev[podUID][contName]; !contExists {
  43. pdev[podUID][contName] = make(resourceAllocateInfo)
  44. }
  45. pdev[podUID][contName][resource] = deviceAllocateInfo{
  46. deviceIds: devices,
  47. allocResp: resp,
  48. }
  49. }
  50. func (pdev podDevices) delete(pods []string) {
  51. for _, uid := range pods {
  52. delete(pdev, uid)
  53. }
  54. }
  55. // Returns list of device Ids allocated to the given container for the given resource.
  56. // Returns nil if we don't have cached state for the given <podUID, contName, resource>.
  57. func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String {
  58. if _, podExists := pdev[podUID]; !podExists {
  59. return nil
  60. }
  61. if _, contExists := pdev[podUID][contName]; !contExists {
  62. return nil
  63. }
  64. devs, resourceExists := pdev[podUID][contName][resource]
  65. if !resourceExists {
  66. return nil
  67. }
  68. return devs.deviceIds
  69. }
  70. // Populates allocatedResources with the device resources allocated to the specified <podUID, contName>.
  71. func (pdev podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
  72. containers, exists := pdev[podUID]
  73. if !exists {
  74. return
  75. }
  76. resources, exists := containers[contName]
  77. if !exists {
  78. return
  79. }
  80. for resource, devices := range resources {
  81. allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds)
  82. }
  83. }
  84. // Removes the device resources allocated to the specified <podUID, contName> from allocatedResources.
  85. func (pdev podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
  86. containers, exists := pdev[podUID]
  87. if !exists {
  88. return
  89. }
  90. resources, exists := containers[contName]
  91. if !exists {
  92. return
  93. }
  94. for resource, devices := range resources {
  95. allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds)
  96. }
  97. }
  98. // Returns all of devices allocated to the pods being tracked, keyed by resourceName.
  99. func (pdev podDevices) devices() map[string]sets.String {
  100. ret := make(map[string]sets.String)
  101. for _, containerDevices := range pdev {
  102. for _, resources := range containerDevices {
  103. for resource, devices := range resources {
  104. if _, exists := ret[resource]; !exists {
  105. ret[resource] = sets.NewString()
  106. }
  107. if devices.allocResp != nil {
  108. ret[resource] = ret[resource].Union(devices.deviceIds)
  109. }
  110. }
  111. }
  112. }
  113. return ret
  114. }
  115. // Turns podDevices to checkpointData.
  116. func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
  117. var data []checkpoint.PodDevicesEntry
  118. for podUID, containerDevices := range pdev {
  119. for conName, resources := range containerDevices {
  120. for resource, devices := range resources {
  121. devIds := devices.deviceIds.UnsortedList()
  122. if devices.allocResp == nil {
  123. klog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource)
  124. continue
  125. }
  126. allocResp, err := devices.allocResp.Marshal()
  127. if err != nil {
  128. klog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
  129. continue
  130. }
  131. data = append(data, checkpoint.PodDevicesEntry{
  132. PodUID: podUID,
  133. ContainerName: conName,
  134. ResourceName: resource,
  135. DeviceIDs: devIds,
  136. AllocResp: allocResp})
  137. }
  138. }
  139. }
  140. return data
  141. }
  142. // Populates podDevices from the passed in checkpointData.
  143. func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
  144. for _, entry := range data {
  145. klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
  146. entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
  147. devIDs := sets.NewString()
  148. for _, devID := range entry.DeviceIDs {
  149. devIDs.Insert(devID)
  150. }
  151. allocResp := &pluginapi.ContainerAllocateResponse{}
  152. err := allocResp.Unmarshal(entry.AllocResp)
  153. if err != nil {
  154. klog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err)
  155. continue
  156. }
  157. pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, devIDs, allocResp)
  158. }
  159. }
  160. // Returns combined container runtime settings to consume the container's allocated devices.
  161. func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
  162. containers, exists := pdev[podUID]
  163. if !exists {
  164. return nil
  165. }
  166. resources, exists := containers[contName]
  167. if !exists {
  168. return nil
  169. }
  170. opts := &DeviceRunContainerOptions{}
  171. // Maps to detect duplicate settings.
  172. devsMap := make(map[string]string)
  173. mountsMap := make(map[string]string)
  174. envsMap := make(map[string]string)
  175. annotationsMap := make(map[string]string)
  176. // Loops through AllocationResponses of all cached device resources.
  177. for _, devices := range resources {
  178. resp := devices.allocResp
  179. // Each Allocate response has the following artifacts.
  180. // Environment variables
  181. // Mount points
  182. // Device files
  183. // Container annotations
  184. // These artifacts are per resource per container.
  185. // Updates RunContainerOptions.Envs.
  186. for k, v := range resp.Envs {
  187. if e, ok := envsMap[k]; ok {
  188. klog.V(4).Infof("Skip existing env %s %s", k, v)
  189. if e != v {
  190. klog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v)
  191. }
  192. continue
  193. }
  194. klog.V(4).Infof("Add env %s %s", k, v)
  195. envsMap[k] = v
  196. opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
  197. }
  198. // Updates RunContainerOptions.Devices.
  199. for _, dev := range resp.Devices {
  200. if d, ok := devsMap[dev.ContainerPath]; ok {
  201. klog.V(4).Infof("Skip existing device %s %s", dev.ContainerPath, dev.HostPath)
  202. if d != dev.HostPath {
  203. klog.Errorf("Container device %s has conflicting mapping host devices: %s and %s",
  204. dev.ContainerPath, d, dev.HostPath)
  205. }
  206. continue
  207. }
  208. klog.V(4).Infof("Add device %s %s", dev.ContainerPath, dev.HostPath)
  209. devsMap[dev.ContainerPath] = dev.HostPath
  210. opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{
  211. PathOnHost: dev.HostPath,
  212. PathInContainer: dev.ContainerPath,
  213. Permissions: dev.Permissions,
  214. })
  215. }
  216. // Updates RunContainerOptions.Mounts.
  217. for _, mount := range resp.Mounts {
  218. if m, ok := mountsMap[mount.ContainerPath]; ok {
  219. klog.V(4).Infof("Skip existing mount %s %s", mount.ContainerPath, mount.HostPath)
  220. if m != mount.HostPath {
  221. klog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s",
  222. mount.ContainerPath, m, mount.HostPath)
  223. }
  224. continue
  225. }
  226. klog.V(4).Infof("Add mount %s %s", mount.ContainerPath, mount.HostPath)
  227. mountsMap[mount.ContainerPath] = mount.HostPath
  228. opts.Mounts = append(opts.Mounts, kubecontainer.Mount{
  229. Name: mount.ContainerPath,
  230. ContainerPath: mount.ContainerPath,
  231. HostPath: mount.HostPath,
  232. ReadOnly: mount.ReadOnly,
  233. // TODO: This may need to be part of Device plugin API.
  234. SELinuxRelabel: false,
  235. })
  236. }
  237. // Updates for Annotations
  238. for k, v := range resp.Annotations {
  239. if e, ok := annotationsMap[k]; ok {
  240. klog.V(4).Infof("Skip existing annotation %s %s", k, v)
  241. if e != v {
  242. klog.Errorf("Annotation %s has conflicting setting: %s and %s", k, e, v)
  243. }
  244. continue
  245. }
  246. klog.V(4).Infof("Add annotation %s %s", k, v)
  247. annotationsMap[k] = v
  248. opts.Annotations = append(opts.Annotations, kubecontainer.Annotation{Name: k, Value: v})
  249. }
  250. }
  251. return opts
  252. }
  253. // getContainerDevices returns the devices assigned to the provided container for all ResourceNames
  254. func (pdev podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
  255. if _, podExists := pdev[podUID]; !podExists {
  256. return nil
  257. }
  258. if _, contExists := pdev[podUID][contName]; !contExists {
  259. return nil
  260. }
  261. cDev := []*podresourcesapi.ContainerDevices{}
  262. for resource, allocateInfo := range pdev[podUID][contName] {
  263. cDev = append(cDev, &podresourcesapi.ContainerDevices{
  264. ResourceName: resource,
  265. DeviceIds: allocateInfo.deviceIds.UnsortedList(),
  266. })
  267. }
  268. return cDev
  269. }