scheduler_assume_cache.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduling
  14. import (
  15. "fmt"
  16. "strconv"
  17. "sync"
  18. "k8s.io/klog"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/meta"
  21. "k8s.io/client-go/tools/cache"
  22. )
  // AssumeCache layers an in-memory overlay on top of an informer cache.
  // Callers can record an "assumed" version of an object outside of informer
  // events and later roll back to the version the informer last delivered.
  // Cached objects are expected to be Kubernetes API objects whose metadata
  // can be read through meta.Accessor.
  type AssumeCache interface {
  	// Assume replaces the cached version of obj, in memory only.
  	Assume(obj interface{}) error

  	// Restore reverts the named object to the informer cache's version.
  	Restore(objName string)

  	// Get returns the latest (possibly assumed) object with the given name.
  	Get(objName string) (interface{}, error)

  	// GetAPIObj returns the informer's version of the named object.
  	GetAPIObj(objName string) (interface{}, error)

  	// List returns the latest version of the cached objects selected by
  	// indexObj through the cache's index.
  	List(indexObj interface{}) []interface{}
  }
  // errWrongType reports that a value could not be converted to the
  // expected type.
  type errWrongType struct {
  	// typeName is the name of the type that was expected.
  	typeName string
  	// object is the value that failed the conversion.
  	object interface{}
  }

  // Error implements the error interface.
  func (e *errWrongType) Error() string {
  	const format = "could not convert object to type %v: %+v"
  	return fmt.Sprintf(format, e.typeName, e.object)
  }
  // errNotFound reports that no object with the given name exists in the
  // cache.
  type errNotFound struct {
  	// typeName describes the kind of object that was looked up.
  	typeName string
  	// objectName is the name (store key) that was not found.
  	objectName string
  }

  // Error implements the error interface.
  func (e *errNotFound) Error() string {
  	const format = "could not find %v %q"
  	return fmt.Sprintf(format, e.typeName, e.objectName)
  }
  // errObjectName reports that the name (store key) of an object could not
  // be determined.
  type errObjectName struct {
  	// detailedErr is the underlying error returned by the key function.
  	detailedErr error
  }

  // Error implements the error interface.
  func (e *errObjectName) Error() string {
  	const format = "failed to get object name: %v"
  	return fmt.Sprintf(format, e.detailedErr)
  }
  59. // assumeCache stores two pointers to represent a single object:
  60. // * The pointer to the informer object.
  61. // * The pointer to the latest object, which could be the same as
  62. // the informer object, or an in-memory object.
  63. //
  64. // An informer update always overrides the latest object pointer.
  65. //
  66. // Assume() only updates the latest object pointer.
  67. // Restore() sets the latest object pointer back to the informer object.
  68. // Get/List() always returns the latest object pointer.
  69. type assumeCache struct {
  70. // Synchronizes updates to store
  71. rwMutex sync.RWMutex
  72. // describes the object stored
  73. description string
  74. // Stores objInfo pointers
  75. store cache.Indexer
  76. // Index function for object
  77. indexFunc cache.IndexFunc
  78. indexName string
  79. }
  // objInfo is the unit stored in the assume cache: one named object together
  // with both of its tracked versions.
  type objInfo struct {
  	// name is the store key of the object.
  	name string

  	// latestObj is the most recent version: either the informer's object
  	// or one that only exists in the cache.
  	latestObj interface{}

  	// apiObj is the most recent version received from the informer.
  	apiObj interface{}
  }
  88. func objInfoKeyFunc(obj interface{}) (string, error) {
  89. objInfo, ok := obj.(*objInfo)
  90. if !ok {
  91. return "", &errWrongType{"objInfo", obj}
  92. }
  93. return objInfo.name, nil
  94. }
  95. func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
  96. objInfo, ok := obj.(*objInfo)
  97. if !ok {
  98. return []string{""}, &errWrongType{"objInfo", obj}
  99. }
  100. return c.indexFunc(objInfo.latestObj)
  101. }
  102. // NewAssumeCache creates an assume cache for genernal objects.
  103. func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
  104. c := &assumeCache{
  105. description: description,
  106. indexFunc: indexFunc,
  107. indexName: indexName,
  108. }
  109. c.store = cache.NewIndexer(objInfoKeyFunc, cache.Indexers{indexName: c.objInfoIndexFunc})
  110. // Unit tests don't use informers
  111. if informer != nil {
  112. informer.AddEventHandler(
  113. cache.ResourceEventHandlerFuncs{
  114. AddFunc: c.add,
  115. UpdateFunc: c.update,
  116. DeleteFunc: c.delete,
  117. },
  118. )
  119. }
  120. return c
  121. }
  122. func (c *assumeCache) add(obj interface{}) {
  123. if obj == nil {
  124. return
  125. }
  126. name, err := cache.MetaNamespaceKeyFunc(obj)
  127. if err != nil {
  128. klog.Errorf("add failed: %v", &errObjectName{err})
  129. return
  130. }
  131. c.rwMutex.Lock()
  132. defer c.rwMutex.Unlock()
  133. if objInfo, _ := c.getObjInfo(name); objInfo != nil {
  134. newVersion, err := c.getObjVersion(name, obj)
  135. if err != nil {
  136. klog.Errorf("add: couldn't get object version: %v", err)
  137. return
  138. }
  139. storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
  140. if err != nil {
  141. klog.Errorf("add: couldn't get stored object version: %v", err)
  142. return
  143. }
  144. // Only update object if version is newer.
  145. // This is so we don't override assumed objects due to informer resync.
  146. if newVersion <= storedVersion {
  147. klog.V(10).Infof("Skip adding %v %v to assume cache because version %v is not newer than %v", c.description, name, newVersion, storedVersion)
  148. return
  149. }
  150. }
  151. objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
  152. c.store.Update(objInfo)
  153. klog.V(10).Infof("Adding %v %v to assume cache: %+v ", c.description, name, obj)
  154. }
  155. func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
  156. c.add(newObj)
  157. }
  158. func (c *assumeCache) delete(obj interface{}) {
  159. if obj == nil {
  160. return
  161. }
  162. name, err := cache.MetaNamespaceKeyFunc(obj)
  163. if err != nil {
  164. klog.Errorf("delete failed: %v", &errObjectName{err})
  165. return
  166. }
  167. c.rwMutex.Lock()
  168. defer c.rwMutex.Unlock()
  169. objInfo := &objInfo{name: name}
  170. err = c.store.Delete(objInfo)
  171. if err != nil {
  172. klog.Errorf("delete: failed to delete %v %v: %v", c.description, name, err)
  173. }
  174. }
  175. func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
  176. objAccessor, err := meta.Accessor(obj)
  177. if err != nil {
  178. return -1, err
  179. }
  180. objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
  181. if err != nil {
  182. return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
  183. }
  184. return objResourceVersion, nil
  185. }
  186. func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
  187. obj, ok, err := c.store.GetByKey(name)
  188. if err != nil {
  189. return nil, err
  190. }
  191. if !ok {
  192. return nil, &errNotFound{c.description, name}
  193. }
  194. objInfo, ok := obj.(*objInfo)
  195. if !ok {
  196. return nil, &errWrongType{"objInfo", obj}
  197. }
  198. return objInfo, nil
  199. }
  200. func (c *assumeCache) Get(objName string) (interface{}, error) {
  201. c.rwMutex.RLock()
  202. defer c.rwMutex.RUnlock()
  203. objInfo, err := c.getObjInfo(objName)
  204. if err != nil {
  205. return nil, err
  206. }
  207. return objInfo.latestObj, nil
  208. }
  209. func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
  210. c.rwMutex.RLock()
  211. defer c.rwMutex.RUnlock()
  212. objInfo, err := c.getObjInfo(objName)
  213. if err != nil {
  214. return nil, err
  215. }
  216. return objInfo.apiObj, nil
  217. }
  218. func (c *assumeCache) List(indexObj interface{}) []interface{} {
  219. c.rwMutex.RLock()
  220. defer c.rwMutex.RUnlock()
  221. allObjs := []interface{}{}
  222. objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
  223. if err != nil {
  224. klog.Errorf("list index error: %v", err)
  225. return nil
  226. }
  227. for _, obj := range objs {
  228. objInfo, ok := obj.(*objInfo)
  229. if !ok {
  230. klog.Errorf("list error: %v", &errWrongType{"objInfo", obj})
  231. continue
  232. }
  233. allObjs = append(allObjs, objInfo.latestObj)
  234. }
  235. return allObjs
  236. }
  237. func (c *assumeCache) Assume(obj interface{}) error {
  238. name, err := cache.MetaNamespaceKeyFunc(obj)
  239. if err != nil {
  240. return &errObjectName{err}
  241. }
  242. c.rwMutex.Lock()
  243. defer c.rwMutex.Unlock()
  244. objInfo, err := c.getObjInfo(name)
  245. if err != nil {
  246. return err
  247. }
  248. newVersion, err := c.getObjVersion(name, obj)
  249. if err != nil {
  250. return err
  251. }
  252. storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
  253. if err != nil {
  254. return err
  255. }
  256. if newVersion < storedVersion {
  257. return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
  258. }
  259. // Only update the cached object
  260. objInfo.latestObj = obj
  261. klog.V(4).Infof("Assumed %v %q, version %v", c.description, name, newVersion)
  262. return nil
  263. }
  264. func (c *assumeCache) Restore(objName string) {
  265. c.rwMutex.Lock()
  266. defer c.rwMutex.Unlock()
  267. objInfo, err := c.getObjInfo(objName)
  268. if err != nil {
  269. // This could be expected if object got deleted
  270. klog.V(5).Infof("Restore %v %q warning: %v", c.description, objName, err)
  271. } else {
  272. objInfo.latestObj = objInfo.apiObj
  273. klog.V(4).Infof("Restored %v %q", c.description, objName)
  274. }
  275. }
  276. // PVAssumeCache is a AssumeCache for PersistentVolume objects
  277. type PVAssumeCache interface {
  278. AssumeCache
  279. GetPV(pvName string) (*v1.PersistentVolume, error)
  280. GetAPIPV(pvName string) (*v1.PersistentVolume, error)
  281. ListPVs(storageClassName string) []*v1.PersistentVolume
  282. }
  283. type pvAssumeCache struct {
  284. AssumeCache
  285. }
  286. func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
  287. if pv, ok := obj.(*v1.PersistentVolume); ok {
  288. return []string{pv.Spec.StorageClassName}, nil
  289. }
  290. return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
  291. }
  292. // NewPVAssumeCache creates a PV assume cache.
  293. func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
  294. return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
  295. }
  296. func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
  297. obj, err := c.Get(pvName)
  298. if err != nil {
  299. return nil, err
  300. }
  301. pv, ok := obj.(*v1.PersistentVolume)
  302. if !ok {
  303. return nil, &errWrongType{"v1.PersistentVolume", obj}
  304. }
  305. return pv, nil
  306. }
  307. func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
  308. obj, err := c.GetAPIObj(pvName)
  309. if err != nil {
  310. return nil, err
  311. }
  312. pv, ok := obj.(*v1.PersistentVolume)
  313. if !ok {
  314. return nil, &errWrongType{"v1.PersistentVolume", obj}
  315. }
  316. return pv, nil
  317. }
  318. func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
  319. objs := c.List(&v1.PersistentVolume{
  320. Spec: v1.PersistentVolumeSpec{
  321. StorageClassName: storageClassName,
  322. },
  323. })
  324. pvs := []*v1.PersistentVolume{}
  325. for _, obj := range objs {
  326. pv, ok := obj.(*v1.PersistentVolume)
  327. if !ok {
  328. klog.Errorf("ListPVs: %v", &errWrongType{"v1.PersistentVolume", obj})
  329. }
  330. pvs = append(pvs, pv)
  331. }
  332. return pvs
  333. }
  334. // PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
  335. type PVCAssumeCache interface {
  336. AssumeCache
  337. // GetPVC returns the PVC from the cache with given pvcKey.
  338. // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
  339. GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
  340. GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
  341. }
  342. type pvcAssumeCache struct {
  343. AssumeCache
  344. }
  345. // NewPVCAssumeCache creates a PVC assume cache.
  346. func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
  347. return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)}
  348. }
  349. func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
  350. obj, err := c.Get(pvcKey)
  351. if err != nil {
  352. return nil, err
  353. }
  354. pvc, ok := obj.(*v1.PersistentVolumeClaim)
  355. if !ok {
  356. return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
  357. }
  358. return pvc, nil
  359. }
  360. func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
  361. obj, err := c.GetAPIObj(pvcKey)
  362. if err != nil {
  363. return nil, err
  364. }
  365. pvc, ok := obj.(*v1.PersistentVolumeClaim)
  366. if !ok {
  367. return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
  368. }
  369. return pvc, nil
  370. }