scheduler_assume_cache.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduling
  14. import (
  15. "fmt"
  16. "strconv"
  17. "sync"
  18. "k8s.io/klog"
  19. "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/meta"
  21. "k8s.io/client-go/tools/cache"
  22. )
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are assumed to be
// Kubernetes API objects that implement meta.Interface.
//
// Objects are addressed by name; the assumeCache implementation in this file
// derives that name with cache.MetaNamespaceKeyFunc.
type AssumeCache interface {
	// Assume updates the object in-memory only. The assumeCache
	// implementation returns an error when the object is not already cached
	// or when its resource version is older than the cached one.
	Assume(obj interface{}) error

	// Restore the informer cache's version of the object, discarding any
	// assumed version.
	Restore(objName string)

	// Get the object by name. This is the latest version, which may be an
	// assumed (in-memory only) object.
	Get(objName string) (interface{}, error)

	// GetAPIObj gets the API object by name, i.e. the version last received
	// from the informer, ignoring any assumed version.
	GetAPIObj(objName string) (interface{}, error)

	// List all the objects in the cache that share an index value with
	// indexObj.
	List(indexObj interface{}) []interface{}
}
  39. type errWrongType struct {
  40. typeName string
  41. object interface{}
  42. }
  43. func (e *errWrongType) Error() string {
  44. return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
  45. }
  46. type errNotFound struct {
  47. typeName string
  48. objectName string
  49. }
  50. func (e *errNotFound) Error() string {
  51. return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
  52. }
  53. type errObjectName struct {
  54. detailedErr error
  55. }
  56. func (e *errObjectName) Error() string {
  57. return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
  58. }
// assumeCache stores two pointers to represent a single object:
// * The pointer to the informer object.
// * The pointer to the latest object, which could be the same as
// the informer object, or an in-memory object.
//
// An informer update always overrides the latest object pointer.
//
// Assume() only updates the latest object pointer.
// Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer.
type assumeCache struct {
	// Synchronizes updates to store. Mutating paths (add, delete, Assume,
	// Restore) take the write lock; Get, GetAPIObj and List take the read lock.
	rwMutex sync.RWMutex

	// describes the object stored; only used in log and error messages
	description string

	// Stores objInfo pointers, keyed by objInfoKeyFunc
	store cache.Indexer

	// Index function for object; it is applied to the latest object of each
	// stored objInfo (see objInfoIndexFunc)
	indexFunc cache.IndexFunc
	// indexName is the name under which the index is registered in store
	indexName string
}
// objInfo is the entry kept in the assumeCache store for one object. It holds
// both the informer's version and the latest (possibly assumed) version.
type objInfo struct {
	// name of the object; this is the store key (see objInfoKeyFunc)
	name string

	// Latest version of object could be cached-only or from informer
	latestObj interface{}

	// Latest object from informer
	apiObj interface{}
}
  88. func objInfoKeyFunc(obj interface{}) (string, error) {
  89. objInfo, ok := obj.(*objInfo)
  90. if !ok {
  91. return "", &errWrongType{"objInfo", obj}
  92. }
  93. return objInfo.name, nil
  94. }
  95. func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
  96. objInfo, ok := obj.(*objInfo)
  97. if !ok {
  98. return []string{""}, &errWrongType{"objInfo", obj}
  99. }
  100. return c.indexFunc(objInfo.latestObj)
  101. }
  102. // NewAssumeCache creates an assume cache for genernal objects.
  103. func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
  104. c := &assumeCache{
  105. description: description,
  106. indexFunc: indexFunc,
  107. indexName: indexName,
  108. }
  109. c.store = cache.NewIndexer(objInfoKeyFunc, cache.Indexers{indexName: c.objInfoIndexFunc})
  110. // Unit tests don't use informers
  111. if informer != nil {
  112. informer.AddEventHandler(
  113. cache.ResourceEventHandlerFuncs{
  114. AddFunc: c.add,
  115. UpdateFunc: c.update,
  116. DeleteFunc: c.delete,
  117. },
  118. )
  119. }
  120. return c
  121. }
  122. func (c *assumeCache) add(obj interface{}) {
  123. if obj == nil {
  124. return
  125. }
  126. name, err := cache.MetaNamespaceKeyFunc(obj)
  127. if err != nil {
  128. klog.Errorf("add failed: %v", &errObjectName{err})
  129. return
  130. }
  131. c.rwMutex.Lock()
  132. defer c.rwMutex.Unlock()
  133. if objInfo, _ := c.getObjInfo(name); objInfo != nil {
  134. newVersion, err := c.getObjVersion(name, obj)
  135. if err != nil {
  136. klog.Errorf("add: couldn't get object version: %v", err)
  137. return
  138. }
  139. storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
  140. if err != nil {
  141. klog.Errorf("add: couldn't get stored object version: %v", err)
  142. return
  143. }
  144. // Only update object if version is newer.
  145. // This is so we don't override assumed objects due to informer resync.
  146. if newVersion <= storedVersion {
  147. klog.V(10).Infof("Skip adding %v %v to assume cache because version %v is not newer than %v", c.description, name, newVersion, storedVersion)
  148. return
  149. }
  150. }
  151. objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
  152. if err = c.store.Update(objInfo); err != nil {
  153. klog.Warningf("got error when updating stored object : %v", err)
  154. } else {
  155. klog.V(10).Infof("Adding %v %v to assume cache: %+v ", c.description, name, obj)
  156. }
  157. }
  158. func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
  159. c.add(newObj)
  160. }
  161. func (c *assumeCache) delete(obj interface{}) {
  162. if obj == nil {
  163. return
  164. }
  165. name, err := cache.MetaNamespaceKeyFunc(obj)
  166. if err != nil {
  167. klog.Errorf("delete failed: %v", &errObjectName{err})
  168. return
  169. }
  170. c.rwMutex.Lock()
  171. defer c.rwMutex.Unlock()
  172. objInfo := &objInfo{name: name}
  173. err = c.store.Delete(objInfo)
  174. if err != nil {
  175. klog.Errorf("delete: failed to delete %v %v: %v", c.description, name, err)
  176. }
  177. }
  178. func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
  179. objAccessor, err := meta.Accessor(obj)
  180. if err != nil {
  181. return -1, err
  182. }
  183. objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
  184. if err != nil {
  185. return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
  186. }
  187. return objResourceVersion, nil
  188. }
  189. func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
  190. obj, ok, err := c.store.GetByKey(name)
  191. if err != nil {
  192. return nil, err
  193. }
  194. if !ok {
  195. return nil, &errNotFound{c.description, name}
  196. }
  197. objInfo, ok := obj.(*objInfo)
  198. if !ok {
  199. return nil, &errWrongType{"objInfo", obj}
  200. }
  201. return objInfo, nil
  202. }
  203. func (c *assumeCache) Get(objName string) (interface{}, error) {
  204. c.rwMutex.RLock()
  205. defer c.rwMutex.RUnlock()
  206. objInfo, err := c.getObjInfo(objName)
  207. if err != nil {
  208. return nil, err
  209. }
  210. return objInfo.latestObj, nil
  211. }
  212. func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
  213. c.rwMutex.RLock()
  214. defer c.rwMutex.RUnlock()
  215. objInfo, err := c.getObjInfo(objName)
  216. if err != nil {
  217. return nil, err
  218. }
  219. return objInfo.apiObj, nil
  220. }
  221. func (c *assumeCache) List(indexObj interface{}) []interface{} {
  222. c.rwMutex.RLock()
  223. defer c.rwMutex.RUnlock()
  224. allObjs := []interface{}{}
  225. objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
  226. if err != nil {
  227. klog.Errorf("list index error: %v", err)
  228. return nil
  229. }
  230. for _, obj := range objs {
  231. objInfo, ok := obj.(*objInfo)
  232. if !ok {
  233. klog.Errorf("list error: %v", &errWrongType{"objInfo", obj})
  234. continue
  235. }
  236. allObjs = append(allObjs, objInfo.latestObj)
  237. }
  238. return allObjs
  239. }
  240. func (c *assumeCache) Assume(obj interface{}) error {
  241. name, err := cache.MetaNamespaceKeyFunc(obj)
  242. if err != nil {
  243. return &errObjectName{err}
  244. }
  245. c.rwMutex.Lock()
  246. defer c.rwMutex.Unlock()
  247. objInfo, err := c.getObjInfo(name)
  248. if err != nil {
  249. return err
  250. }
  251. newVersion, err := c.getObjVersion(name, obj)
  252. if err != nil {
  253. return err
  254. }
  255. storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
  256. if err != nil {
  257. return err
  258. }
  259. if newVersion < storedVersion {
  260. return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
  261. }
  262. // Only update the cached object
  263. objInfo.latestObj = obj
  264. klog.V(4).Infof("Assumed %v %q, version %v", c.description, name, newVersion)
  265. return nil
  266. }
  267. func (c *assumeCache) Restore(objName string) {
  268. c.rwMutex.Lock()
  269. defer c.rwMutex.Unlock()
  270. objInfo, err := c.getObjInfo(objName)
  271. if err != nil {
  272. // This could be expected if object got deleted
  273. klog.V(5).Infof("Restore %v %q warning: %v", c.description, objName, err)
  274. } else {
  275. objInfo.latestObj = objInfo.apiObj
  276. klog.V(4).Infof("Restored %v %q", c.description, objName)
  277. }
  278. }
// PVAssumeCache is an AssumeCache for PersistentVolume objects. It adds typed
// accessors on top of the generic interface.
type PVAssumeCache interface {
	AssumeCache

	// GetPV returns the latest (possibly assumed) version of the named PV.
	GetPV(pvName string) (*v1.PersistentVolume, error)
	// GetAPIPV returns the informer's version of the named PV.
	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
	// ListPVs returns the latest version of every cached PV with the given
	// storage class name.
	ListPVs(storageClassName string) []*v1.PersistentVolume
}
// pvAssumeCache implements PVAssumeCache by embedding a generic AssumeCache
// and converting its results to *v1.PersistentVolume.
type pvAssumeCache struct {
	AssumeCache
}
  289. func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
  290. if pv, ok := obj.(*v1.PersistentVolume); ok {
  291. return []string{pv.Spec.StorageClassName}, nil
  292. }
  293. return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
  294. }
  295. // NewPVAssumeCache creates a PV assume cache.
  296. func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
  297. return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
  298. }
  299. func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
  300. obj, err := c.Get(pvName)
  301. if err != nil {
  302. return nil, err
  303. }
  304. pv, ok := obj.(*v1.PersistentVolume)
  305. if !ok {
  306. return nil, &errWrongType{"v1.PersistentVolume", obj}
  307. }
  308. return pv, nil
  309. }
  310. func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
  311. obj, err := c.GetAPIObj(pvName)
  312. if err != nil {
  313. return nil, err
  314. }
  315. pv, ok := obj.(*v1.PersistentVolume)
  316. if !ok {
  317. return nil, &errWrongType{"v1.PersistentVolume", obj}
  318. }
  319. return pv, nil
  320. }
  321. func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
  322. objs := c.List(&v1.PersistentVolume{
  323. Spec: v1.PersistentVolumeSpec{
  324. StorageClassName: storageClassName,
  325. },
  326. })
  327. pvs := []*v1.PersistentVolume{}
  328. for _, obj := range objs {
  329. pv, ok := obj.(*v1.PersistentVolume)
  330. if !ok {
  331. klog.Errorf("ListPVs: %v", &errWrongType{"v1.PersistentVolume", obj})
  332. continue
  333. }
  334. pvs = append(pvs, pv)
  335. }
  336. return pvs
  337. }
// PVCAssumeCache is an AssumeCache for PersistentVolumeClaim objects. It adds
// typed accessors on top of the generic interface.
type PVCAssumeCache interface {
	AssumeCache

	// GetPVC returns the PVC from the cache with given pvcKey.
	// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj.
	// The returned PVC is the latest (possibly assumed) version.
	GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
	// GetAPIPVC returns the informer's version of the PVC with given pvcKey.
	GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
// pvcAssumeCache implements PVCAssumeCache by embedding a generic AssumeCache
// and converting its results to *v1.PersistentVolumeClaim.
type pvcAssumeCache struct {
	AssumeCache
}
  349. // NewPVCAssumeCache creates a PVC assume cache.
  350. func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
  351. return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)}
  352. }
  353. func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
  354. obj, err := c.Get(pvcKey)
  355. if err != nil {
  356. return nil, err
  357. }
  358. pvc, ok := obj.(*v1.PersistentVolumeClaim)
  359. if !ok {
  360. return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
  361. }
  362. return pvc, nil
  363. }
  364. func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
  365. obj, err := c.GetAPIObj(pvcKey)
  366. if err != nil {
  367. return nil, err
  368. }
  369. pvc, ok := obj.(*v1.PersistentVolumeClaim)
  370. if !ok {
  371. return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
  372. }
  373. return pvc, nil
  374. }