event_manager.go 11 KB


  1. /*
  2. Copyright (c) 2018 VMware, Inc. All Rights Reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package simulator
  14. import (
  15. "bytes"
  16. "container/ring"
  17. "log"
  18. "reflect"
  19. "text/template"
  20. "time"
  21. "github.com/vmware/govmomi/object"
  22. "github.com/vmware/govmomi/simulator/esx"
  23. "github.com/vmware/govmomi/vim25/methods"
  24. "github.com/vmware/govmomi/vim25/mo"
  25. "github.com/vmware/govmomi/vim25/soap"
  26. "github.com/vmware/govmomi/vim25/types"
  27. )
  var (
  	// maxPageSize is the number of slots in the EventManager's event ring and
  	// the largest page size a collector may request (see validatePageSize).
  	maxPageSize = 1000

  	// logEvents, when true, makes formatMessage log every formatted event.
  	logEvents = false
  )
  32. type EventManager struct {
  33. mo.EventManager
  34. root types.ManagedObjectReference
  35. page *ring.Ring
  36. key int32
  37. collectors map[types.ManagedObjectReference]*EventHistoryCollector
  38. templates map[string]*template.Template
  39. }
  40. func NewEventManager(ref types.ManagedObjectReference) object.Reference {
  41. return &EventManager{
  42. EventManager: mo.EventManager{
  43. Self: ref,
  44. Description: types.EventDescription{
  45. EventInfo: esx.EventInfo,
  46. },
  47. MaxCollector: 1000,
  48. },
  49. root: Map.content().RootFolder,
  50. page: ring.New(maxPageSize),
  51. collectors: make(map[types.ManagedObjectReference]*EventHistoryCollector),
  52. templates: make(map[string]*template.Template),
  53. }
  54. }
  55. func (m *EventManager) createCollector(ctx *Context, req *types.CreateCollectorForEvents) (*EventHistoryCollector, *soap.Fault) {
  56. size, err := validatePageSize(req.Filter.MaxCount)
  57. if err != nil {
  58. return nil, err
  59. }
  60. if len(m.collectors) >= int(m.MaxCollector) {
  61. return nil, Fault("Too many event collectors to create", new(types.InvalidState))
  62. }
  63. collector := &EventHistoryCollector{
  64. m: m,
  65. page: ring.New(size),
  66. }
  67. collector.Filter = req.Filter
  68. collector.fillPage(size)
  69. return collector, nil
  70. }
  71. func (m *EventManager) CreateCollectorForEvents(ctx *Context, req *types.CreateCollectorForEvents) soap.HasFault {
  72. body := new(methods.CreateCollectorForEventsBody)
  73. collector, err := m.createCollector(ctx, req)
  74. if err != nil {
  75. body.Fault_ = err
  76. return body
  77. }
  78. ref := ctx.Session.Put(collector).Reference()
  79. m.collectors[ref] = collector
  80. body.Res = &types.CreateCollectorForEventsResponse{
  81. Returnval: ref,
  82. }
  83. return body
  84. }
  85. func (m *EventManager) QueryEvents(ctx *Context, req *types.QueryEvents) soap.HasFault {
  86. if Map.IsESX() {
  87. return &methods.QueryEventsBody{
  88. Fault_: Fault("", new(types.NotImplemented)),
  89. }
  90. }
  91. body := new(methods.QueryEventsBody)
  92. collector, err := m.createCollector(ctx, &types.CreateCollectorForEvents{Filter: req.Filter})
  93. if err != nil {
  94. body.Fault_ = err
  95. return body
  96. }
  97. body.Res = &types.QueryEventsResponse{
  98. Returnval: collector.GetLatestPage(),
  99. }
  100. return body
  101. }
  102. // formatMessage applies the EventDescriptionEventDetail.FullFormat template to the given event's FullFormattedMessage field.
  103. func (m *EventManager) formatMessage(event types.BaseEvent) {
  104. id := reflect.ValueOf(event).Elem().Type().Name()
  105. e := event.GetEvent()
  106. t, ok := m.templates[id]
  107. if !ok {
  108. for _, info := range m.Description.EventInfo {
  109. if info.Key == id {
  110. t = template.Must(template.New(id).Parse(info.FullFormat))
  111. m.templates[id] = t
  112. break
  113. }
  114. }
  115. }
  116. if t != nil {
  117. var buf bytes.Buffer
  118. if err := t.Execute(&buf, event); err != nil {
  119. log.Print(err)
  120. }
  121. e.FullFormattedMessage = buf.String()
  122. }
  123. if logEvents {
  124. log.Printf("[%s] %s", id, e.FullFormattedMessage)
  125. }
  126. }
  127. func (m *EventManager) PostEvent(ctx *Context, req *types.PostEvent) soap.HasFault {
  128. m.key++
  129. event := req.EventToPost.GetEvent()
  130. event.Key = m.key
  131. event.ChainId = event.Key
  132. event.CreatedTime = time.Now()
  133. event.UserName = ctx.Session.UserName
  134. m.page = m.page.Prev()
  135. m.page.Value = req.EventToPost
  136. m.formatMessage(req.EventToPost)
  137. for _, c := range m.collectors {
  138. ctx.WithLock(c, func() {
  139. if c.eventMatches(req.EventToPost) {
  140. c.page = c.page.Prev()
  141. c.page.Value = req.EventToPost
  142. Map.Update(c, []types.PropertyChange{{Name: "latestPage", Val: c.GetLatestPage()}})
  143. }
  144. })
  145. }
  146. return &methods.PostEventBody{
  147. Res: new(types.PostEventResponse),
  148. }
  149. }
  150. type EventHistoryCollector struct {
  151. mo.EventHistoryCollector
  152. m *EventManager
  153. page *ring.Ring
  154. pos int
  155. }
  156. // doEntityEventArgument calls f for each entity argument in the event.
  157. // If f returns true, the iteration stops.
  158. func doEntityEventArgument(event types.BaseEvent, f func(types.ManagedObjectReference, *types.EntityEventArgument) bool) bool {
  159. e := event.GetEvent()
  160. if arg := e.Vm; arg != nil {
  161. if f(arg.Vm, &arg.EntityEventArgument) {
  162. return true
  163. }
  164. }
  165. if arg := e.Host; arg != nil {
  166. if f(arg.Host, &arg.EntityEventArgument) {
  167. return true
  168. }
  169. }
  170. if arg := e.ComputeResource; arg != nil {
  171. if f(arg.ComputeResource, &arg.EntityEventArgument) {
  172. return true
  173. }
  174. }
  175. if arg := e.Ds; arg != nil {
  176. if f(arg.Datastore, &arg.EntityEventArgument) {
  177. return true
  178. }
  179. }
  180. if arg := e.Net; arg != nil {
  181. if f(arg.Network, &arg.EntityEventArgument) {
  182. return true
  183. }
  184. }
  185. if arg := e.Dvs; arg != nil {
  186. if f(arg.Dvs, &arg.EntityEventArgument) {
  187. return true
  188. }
  189. }
  190. if arg := e.Datacenter; arg != nil {
  191. if f(arg.Datacenter, &arg.EntityEventArgument) {
  192. return true
  193. }
  194. }
  195. return false
  196. }
  197. // eventFilterSelf returns true if self is one of the entity arguments in the event.
  198. func eventFilterSelf(event types.BaseEvent, self types.ManagedObjectReference) bool {
  199. return doEntityEventArgument(event, func(ref types.ManagedObjectReference, _ *types.EntityEventArgument) bool {
  200. return self == ref
  201. })
  202. }
  203. // eventFilterChildren returns true if a child of self is one of the entity arguments in the event.
  204. func eventFilterChildren(event types.BaseEvent, self types.ManagedObjectReference) bool {
  205. return doEntityEventArgument(event, func(ref types.ManagedObjectReference, _ *types.EntityEventArgument) bool {
  206. seen := false
  207. var match func(types.ManagedObjectReference)
  208. match = func(child types.ManagedObjectReference) {
  209. if child == self {
  210. seen = true
  211. return
  212. }
  213. walk(child, match)
  214. }
  215. walk(ref, match)
  216. return seen
  217. })
  218. }
  219. // entityMatches returns true if the spec Entity filter matches the event.
  220. func (c *EventHistoryCollector) entityMatches(event types.BaseEvent, spec *types.EventFilterSpec) bool {
  221. e := spec.Entity
  222. if e == nil {
  223. return true
  224. }
  225. isRootFolder := c.m.root == e.Entity
  226. switch e.Recursion {
  227. case types.EventFilterSpecRecursionOptionSelf:
  228. return isRootFolder || eventFilterSelf(event, e.Entity)
  229. case types.EventFilterSpecRecursionOptionChildren:
  230. return eventFilterChildren(event, e.Entity)
  231. case types.EventFilterSpecRecursionOptionAll:
  232. if isRootFolder || eventFilterSelf(event, e.Entity) {
  233. return true
  234. }
  235. return eventFilterChildren(event, e.Entity)
  236. }
  237. return false
  238. }
  239. // typeMatches returns true if one of the spec EventTypeId types matches the event.
  240. func (c *EventHistoryCollector) typeMatches(event types.BaseEvent, spec *types.EventFilterSpec) bool {
  241. if len(spec.EventTypeId) == 0 {
  242. return true
  243. }
  244. matches := func(name string) bool {
  245. for _, id := range spec.EventTypeId {
  246. if id == name {
  247. return true
  248. }
  249. }
  250. return false
  251. }
  252. kind := reflect.ValueOf(event).Elem().Type()
  253. if matches(kind.Name()) {
  254. return true // concrete type
  255. }
  256. field, ok := kind.FieldByNameFunc(matches)
  257. if ok {
  258. return field.Anonymous // base type (embedded field)
  259. }
  260. return false
  261. }
  262. // eventMatches returns true one of the filters matches the event.
  263. func (c *EventHistoryCollector) eventMatches(event types.BaseEvent) bool {
  264. spec := c.Filter.(types.EventFilterSpec)
  265. if !c.typeMatches(event, &spec) {
  266. return false
  267. }
  268. // TODO: spec.Time, spec.UserName, etc
  269. return c.entityMatches(event, &spec)
  270. }
  271. // filePage copies the manager's latest events into the collector's page with Filter applied.
  272. func (c *EventHistoryCollector) fillPage(size int) {
  273. c.pos = 0
  274. l := c.page.Len()
  275. delta := size - l
  276. if delta < 0 {
  277. // Shrink ring size
  278. c.page = c.page.Unlink(-delta)
  279. return
  280. }
  281. matches := 0
  282. mpage := c.m.page
  283. page := c.page
  284. if delta != 0 {
  285. // Grow ring size
  286. c.page = c.page.Link(ring.New(delta))
  287. }
  288. for i := 0; i < maxPageSize; i++ {
  289. event, ok := mpage.Value.(types.BaseEvent)
  290. mpage = mpage.Prev()
  291. if !ok {
  292. continue
  293. }
  294. if c.eventMatches(event) {
  295. page.Value = event
  296. page = page.Prev()
  297. matches++
  298. if matches == size {
  299. break
  300. }
  301. }
  302. }
  303. }
  304. func validatePageSize(count int32) (int, *soap.Fault) {
  305. size := int(count)
  306. if size == 0 {
  307. size = 10 // defaultPageSize
  308. } else if size < 0 || size > maxPageSize {
  309. return -1, Fault("", &types.InvalidArgument{InvalidProperty: "maxCount"})
  310. }
  311. return size, nil
  312. }
  313. func (c *EventHistoryCollector) SetCollectorPageSize(ctx *Context, req *types.SetCollectorPageSize) soap.HasFault {
  314. body := new(methods.SetCollectorPageSizeBody)
  315. size, err := validatePageSize(req.MaxCount)
  316. if err != nil {
  317. body.Fault_ = err
  318. return body
  319. }
  320. ctx.WithLock(c.m, func() {
  321. c.fillPage(size)
  322. })
  323. body.Res = new(types.SetCollectorPageSizeResponse)
  324. return body
  325. }
  326. func (c *EventHistoryCollector) RewindCollector(ctx *Context, req *types.RewindCollector) soap.HasFault {
  327. c.pos = 0
  328. return &methods.RewindCollectorBody{
  329. Res: new(types.RewindCollectorResponse),
  330. }
  331. }
  332. func (c *EventHistoryCollector) ReadNextEvents(ctx *Context, req *types.ReadNextEvents) soap.HasFault {
  333. body := &methods.ReadNextEventsBody{}
  334. if req.MaxCount <= 0 {
  335. body.Fault_ = Fault("", &types.InvalidArgument{InvalidProperty: "maxCount"})
  336. return body
  337. }
  338. body.Res = new(types.ReadNextEventsResponse)
  339. events := c.GetLatestPage()
  340. nevents := len(events)
  341. if c.pos == nevents {
  342. return body // already read to EOF
  343. }
  344. start := c.pos
  345. end := start + int(req.MaxCount)
  346. c.pos += int(req.MaxCount)
  347. if end > nevents {
  348. end = nevents
  349. c.pos = nevents
  350. }
  351. body.Res.Returnval = events[start:end]
  352. return body
  353. }
  354. func (c *EventHistoryCollector) ReadPreviousEvents(ctx *Context, req *types.ReadPreviousEvents) soap.HasFault {
  355. body := &methods.ReadPreviousEventsBody{}
  356. if req.MaxCount <= 0 {
  357. body.Fault_ = Fault("", &types.InvalidArgument{InvalidProperty: "maxCount"})
  358. return body
  359. }
  360. body.Res = new(types.ReadPreviousEventsResponse)
  361. events := c.GetLatestPage()
  362. if c.pos == 0 {
  363. return body // already read to EOF
  364. }
  365. start := c.pos - int(req.MaxCount)
  366. end := c.pos
  367. c.pos -= int(req.MaxCount)
  368. if start < 0 {
  369. start = 0
  370. c.pos = 0
  371. }
  372. body.Res.Returnval = events[start:end]
  373. return body
  374. }
  375. func (c *EventHistoryCollector) DestroyCollector(ctx *Context, req *types.DestroyCollector) soap.HasFault {
  376. ctx.Session.Remove(req.This)
  377. ctx.WithLock(c.m, func() {
  378. delete(c.m.collectors, req.This)
  379. })
  380. return &methods.DestroyCollectorBody{
  381. Res: new(types.DestroyCollectorResponse),
  382. }
  383. }
  384. func (c *EventHistoryCollector) GetLatestPage() []types.BaseEvent {
  385. var latestPage []types.BaseEvent
  386. c.page.Do(func(val interface{}) {
  387. if val == nil {
  388. return
  389. }
  390. latestPage = append(latestPage, val.(types.BaseEvent))
  391. })
  392. return latestPage
  393. }
  394. func (c *EventHistoryCollector) Get() mo.Reference {
  395. clone := *c
  396. clone.LatestPage = clone.GetLatestPage()
  397. return &clone
  398. }