handler.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Copyright 2015 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package events
  15. import (
  16. "errors"
  17. "sort"
  18. "strings"
  19. "sync"
  20. "time"
  21. info "github.com/google/cadvisor/info/v1"
  22. "github.com/google/cadvisor/utils"
  23. "k8s.io/klog"
  24. )
  25. type byTimestamp []*info.Event
  26. // functions necessary to implement the sort interface on the Events struct
  27. func (e byTimestamp) Len() int {
  28. return len(e)
  29. }
  30. func (e byTimestamp) Swap(i, j int) {
  31. e[i], e[j] = e[j], e[i]
  32. }
  33. func (e byTimestamp) Less(i, j int) bool {
  34. return e[i].Timestamp.Before(e[j].Timestamp)
  35. }
  36. type EventChannel struct {
  37. // Watch ID. Can be used by the caller to request cancellation of watch events.
  38. watchId int
  39. // Channel on which the caller can receive watch events.
  40. channel chan *info.Event
  41. }
  42. // Request holds a set of parameters by which Event objects may be screened.
  43. // The caller may want events that occurred within a specific timeframe
  44. // or of a certain type, which may be specified in the *Request object
  45. // they pass to an EventManager function
  46. type Request struct {
  47. // events falling before StartTime do not satisfy the request. StartTime
  48. // must be left blank in calls to WatchEvents
  49. StartTime time.Time
  50. // events falling after EndTime do not satisfy the request. EndTime
  51. // must be left blank in calls to WatchEvents
  52. EndTime time.Time
  53. // EventType is a map that specifies the type(s) of events wanted
  54. EventType map[info.EventType]bool
  55. // allows the caller to put a limit on how many
  56. // events to receive. If there are more events than MaxEventsReturned
  57. // then the most chronologically recent events in the time period
  58. // specified are returned. Must be >= 1
  59. MaxEventsReturned int
  60. // the absolute container name for which the event occurred
  61. ContainerName string
  62. // if IncludeSubcontainers is false, only events occurring in the specific
  63. // container, and not the subcontainers, will be returned
  64. IncludeSubcontainers bool
  65. }
  66. // EventManager is implemented by Events. It provides two ways to monitor
  67. // events and one way to add events
  68. type EventManager interface {
  69. // WatchEvents() allows a caller to register for receiving events based on the specified request.
  70. // On successful registration, an EventChannel object is returned.
  71. WatchEvents(request *Request) (*EventChannel, error)
  72. // GetEvents() returns all detected events based on the filters specified in request.
  73. GetEvents(request *Request) ([]*info.Event, error)
  74. // AddEvent allows the caller to add an event to an EventManager
  75. // object
  76. AddEvent(e *info.Event) error
  77. // Cancels a previously requested watch event.
  78. StopWatch(watch_id int)
  79. }
  80. // events provides an implementation for the EventManager interface.
  81. type events struct {
  82. // eventStore holds the events by event type.
  83. eventStore map[info.EventType]*utils.TimedStore
  84. // map of registered watchers keyed by watch id.
  85. watchers map[int]*watch
  86. // lock guarding the eventStore.
  87. eventsLock sync.RWMutex
  88. // lock guarding watchers.
  89. watcherLock sync.RWMutex
  90. // last allocated watch id.
  91. lastId int
  92. // Event storage policy.
  93. storagePolicy StoragePolicy
  94. }
  95. // initialized by a call to WatchEvents(), a watch struct will then be added
  96. // to the events slice of *watch objects. When AddEvent() finds an event that
  97. // satisfies the request parameter of a watch object in events.watchers,
  98. // it will send that event out over the watch object's channel. The caller that
  99. // called WatchEvents will receive the event over the channel provided to
  100. // WatchEvents
  101. type watch struct {
  102. // request parameters passed in by the caller of WatchEvents()
  103. request *Request
  104. // a channel used to send event back to the caller.
  105. eventChannel *EventChannel
  106. }
  107. func NewEventChannel(watchId int) *EventChannel {
  108. return &EventChannel{
  109. watchId: watchId,
  110. channel: make(chan *info.Event, 10),
  111. }
  112. }
  113. // Policy specifying how many events to store.
  114. // MaxAge is the max duration for which to keep events.
  115. // MaxNumEvents is the max number of events to keep (-1 for no limit).
  116. type StoragePolicy struct {
  117. // Defaults limites, used if a per-event limit is not set.
  118. DefaultMaxAge time.Duration
  119. DefaultMaxNumEvents int
  120. // Per-event type limits.
  121. PerTypeMaxAge map[info.EventType]time.Duration
  122. PerTypeMaxNumEvents map[info.EventType]int
  123. }
  124. func DefaultStoragePolicy() StoragePolicy {
  125. return StoragePolicy{
  126. DefaultMaxAge: 24 * time.Hour,
  127. DefaultMaxNumEvents: 100000,
  128. PerTypeMaxAge: make(map[info.EventType]time.Duration),
  129. PerTypeMaxNumEvents: make(map[info.EventType]int),
  130. }
  131. }
  132. // returns a pointer to an initialized Events object.
  133. func NewEventManager(storagePolicy StoragePolicy) *events {
  134. return &events{
  135. eventStore: make(map[info.EventType]*utils.TimedStore, 0),
  136. watchers: make(map[int]*watch),
  137. storagePolicy: storagePolicy,
  138. }
  139. }
  140. // returns a pointer to an initialized Request object
  141. func NewRequest() *Request {
  142. return &Request{
  143. EventType: map[info.EventType]bool{},
  144. IncludeSubcontainers: false,
  145. MaxEventsReturned: 10,
  146. }
  147. }
  148. // returns a pointer to an initialized watch object
  149. func newWatch(request *Request, eventChannel *EventChannel) *watch {
  150. return &watch{
  151. request: request,
  152. eventChannel: eventChannel,
  153. }
  154. }
  155. func (self *EventChannel) GetChannel() chan *info.Event {
  156. return self.channel
  157. }
  158. func (self *EventChannel) GetWatchId() int {
  159. return self.watchId
  160. }
  161. // sorts and returns up to the last MaxEventsReturned chronological elements
  162. func getMaxEventsReturned(request *Request, eSlice []*info.Event) []*info.Event {
  163. sort.Sort(byTimestamp(eSlice))
  164. n := request.MaxEventsReturned
  165. if n >= len(eSlice) || n <= 0 {
  166. return eSlice
  167. }
  168. return eSlice[len(eSlice)-n:]
  169. }
  170. // If the request wants all subcontainers, this returns if the request's
  171. // container path is a prefix of the event container path. Otherwise,
  172. // it checks that the container paths of the event and request are
  173. // equivalent
  174. func checkIfIsSubcontainer(request *Request, event *info.Event) bool {
  175. if request.IncludeSubcontainers == true {
  176. return request.ContainerName == "/" || strings.HasPrefix(event.ContainerName+"/", request.ContainerName+"/")
  177. }
  178. return event.ContainerName == request.ContainerName
  179. }
  180. // determines if an event occurs within the time set in the request object and is the right type
  181. func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool {
  182. startTime := request.StartTime
  183. endTime := request.EndTime
  184. eventTime := event.Timestamp
  185. if !startTime.IsZero() {
  186. if startTime.After(eventTime) {
  187. return false
  188. }
  189. }
  190. if !endTime.IsZero() {
  191. if endTime.Before(eventTime) {
  192. return false
  193. }
  194. }
  195. if !request.EventType[event.EventType] {
  196. return false
  197. }
  198. if request.ContainerName != "" {
  199. return checkIfIsSubcontainer(request, event)
  200. }
  201. return true
  202. }
  203. // method of Events object that screens Event objects found in the eventStore
  204. // attribute and if they fit the parameters passed by the Request object,
  205. // adds it to a slice of *Event objects that is returned. If both MaxEventsReturned
  206. // and StartTime/EndTime are specified in the request object, then only
  207. // up to the most recent MaxEventsReturned events in that time range are returned.
  208. func (self *events) GetEvents(request *Request) ([]*info.Event, error) {
  209. returnEventList := []*info.Event{}
  210. self.eventsLock.RLock()
  211. defer self.eventsLock.RUnlock()
  212. for eventType, fetch := range request.EventType {
  213. if !fetch {
  214. continue
  215. }
  216. evs, ok := self.eventStore[eventType]
  217. if !ok {
  218. continue
  219. }
  220. res := evs.InTimeRange(request.StartTime, request.EndTime, request.MaxEventsReturned)
  221. for _, in := range res {
  222. e := in.(*info.Event)
  223. if checkIfEventSatisfiesRequest(request, e) {
  224. returnEventList = append(returnEventList, e)
  225. }
  226. }
  227. }
  228. returnEventList = getMaxEventsReturned(request, returnEventList)
  229. return returnEventList, nil
  230. }
  231. // method of Events object that maintains an *Event channel passed by the user.
  232. // When an event is added by AddEvents that satisfies the parameters in the passed
  233. // Request object it is fed to the channel. The StartTime and EndTime of the watch
  234. // request should be uninitialized because the purpose is to watch indefinitely
  235. // for events that will happen in the future
  236. func (self *events) WatchEvents(request *Request) (*EventChannel, error) {
  237. if !request.StartTime.IsZero() || !request.EndTime.IsZero() {
  238. return nil, errors.New(
  239. "for a call to watch, request.StartTime and request.EndTime must be uninitialized")
  240. }
  241. self.watcherLock.Lock()
  242. defer self.watcherLock.Unlock()
  243. new_id := self.lastId + 1
  244. returnEventChannel := NewEventChannel(new_id)
  245. newWatcher := newWatch(request, returnEventChannel)
  246. self.watchers[new_id] = newWatcher
  247. self.lastId = new_id
  248. return returnEventChannel, nil
  249. }
  250. // helper function to update the event manager's eventStore
  251. func (self *events) updateEventStore(e *info.Event) {
  252. self.eventsLock.Lock()
  253. defer self.eventsLock.Unlock()
  254. if _, ok := self.eventStore[e.EventType]; !ok {
  255. maxNumEvents := self.storagePolicy.DefaultMaxNumEvents
  256. if numEvents, ok := self.storagePolicy.PerTypeMaxNumEvents[e.EventType]; ok {
  257. maxNumEvents = numEvents
  258. }
  259. if maxNumEvents == 0 {
  260. // Event storage is disabled for e.EventType
  261. return
  262. }
  263. maxAge := self.storagePolicy.DefaultMaxAge
  264. if age, ok := self.storagePolicy.PerTypeMaxAge[e.EventType]; ok {
  265. maxAge = age
  266. }
  267. self.eventStore[e.EventType] = utils.NewTimedStore(maxAge, maxNumEvents)
  268. }
  269. self.eventStore[e.EventType].Add(e.Timestamp, e)
  270. }
  271. func (self *events) findValidWatchers(e *info.Event) []*watch {
  272. watchesToSend := make([]*watch, 0)
  273. for _, watcher := range self.watchers {
  274. watchRequest := watcher.request
  275. if checkIfEventSatisfiesRequest(watchRequest, e) {
  276. watchesToSend = append(watchesToSend, watcher)
  277. }
  278. }
  279. return watchesToSend
  280. }
  281. // method of Events object that adds the argument Event object to the
  282. // eventStore. It also feeds the event to a set of watch channels
  283. // held by the manager if it satisfies the request keys of the channels
  284. func (self *events) AddEvent(e *info.Event) error {
  285. self.updateEventStore(e)
  286. self.watcherLock.RLock()
  287. defer self.watcherLock.RUnlock()
  288. watchesToSend := self.findValidWatchers(e)
  289. for _, watchObject := range watchesToSend {
  290. watchObject.eventChannel.GetChannel() <- e
  291. }
  292. klog.V(4).Infof("Added event %v", e)
  293. return nil
  294. }
  295. // Removes a watch instance from the EventManager's watchers map
  296. func (self *events) StopWatch(watchId int) {
  297. self.watcherLock.Lock()
  298. defer self.watcherLock.Unlock()
  299. _, ok := self.watchers[watchId]
  300. if !ok {
  301. klog.Errorf("Could not find watcher instance %v", watchId)
  302. }
  303. close(self.watchers[watchId].eventChannel.GetChannel())
  304. delete(self.watchers, watchId)
  305. }