aggregator.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. /*
  2. Aggregator is a reporter used by the Ginkgo CLI to aggregate and present parallel test output
  3. coherently as tests complete. You shouldn't need to use this in your code. To run tests in parallel:
  4. ginkgo -nodes=N
  5. where N is the number of nodes you desire.
  6. */
  7. package remote
  8. import (
  9. "time"
  10. "github.com/onsi/ginkgo/config"
  11. "github.com/onsi/ginkgo/reporters/stenographer"
  12. "github.com/onsi/ginkgo/types"
  13. )
  14. type configAndSuite struct {
  15. config config.GinkgoConfigType
  16. summary *types.SuiteSummary
  17. }
  18. type Aggregator struct {
  19. nodeCount int
  20. config config.DefaultReporterConfigType
  21. stenographer stenographer.Stenographer
  22. result chan bool
  23. suiteBeginnings chan configAndSuite
  24. aggregatedSuiteBeginnings []configAndSuite
  25. beforeSuites chan *types.SetupSummary
  26. aggregatedBeforeSuites []*types.SetupSummary
  27. afterSuites chan *types.SetupSummary
  28. aggregatedAfterSuites []*types.SetupSummary
  29. specCompletions chan *types.SpecSummary
  30. completedSpecs []*types.SpecSummary
  31. suiteEndings chan *types.SuiteSummary
  32. aggregatedSuiteEndings []*types.SuiteSummary
  33. specs []*types.SpecSummary
  34. startTime time.Time
  35. }
  36. func NewAggregator(nodeCount int, result chan bool, config config.DefaultReporterConfigType, stenographer stenographer.Stenographer) *Aggregator {
  37. aggregator := &Aggregator{
  38. nodeCount: nodeCount,
  39. result: result,
  40. config: config,
  41. stenographer: stenographer,
  42. suiteBeginnings: make(chan configAndSuite, 0),
  43. beforeSuites: make(chan *types.SetupSummary, 0),
  44. afterSuites: make(chan *types.SetupSummary, 0),
  45. specCompletions: make(chan *types.SpecSummary, 0),
  46. suiteEndings: make(chan *types.SuiteSummary, 0),
  47. }
  48. go aggregator.mux()
  49. return aggregator
  50. }
  51. func (aggregator *Aggregator) SpecSuiteWillBegin(config config.GinkgoConfigType, summary *types.SuiteSummary) {
  52. aggregator.suiteBeginnings <- configAndSuite{config, summary}
  53. }
  54. func (aggregator *Aggregator) BeforeSuiteDidRun(setupSummary *types.SetupSummary) {
  55. aggregator.beforeSuites <- setupSummary
  56. }
  57. func (aggregator *Aggregator) AfterSuiteDidRun(setupSummary *types.SetupSummary) {
  58. aggregator.afterSuites <- setupSummary
  59. }
  60. func (aggregator *Aggregator) SpecWillRun(specSummary *types.SpecSummary) {
  61. //noop
  62. }
  63. func (aggregator *Aggregator) SpecDidComplete(specSummary *types.SpecSummary) {
  64. aggregator.specCompletions <- specSummary
  65. }
  66. func (aggregator *Aggregator) SpecSuiteDidEnd(summary *types.SuiteSummary) {
  67. aggregator.suiteEndings <- summary
  68. }
  69. func (aggregator *Aggregator) mux() {
  70. loop:
  71. for {
  72. select {
  73. case configAndSuite := <-aggregator.suiteBeginnings:
  74. aggregator.registerSuiteBeginning(configAndSuite)
  75. case setupSummary := <-aggregator.beforeSuites:
  76. aggregator.registerBeforeSuite(setupSummary)
  77. case setupSummary := <-aggregator.afterSuites:
  78. aggregator.registerAfterSuite(setupSummary)
  79. case specSummary := <-aggregator.specCompletions:
  80. aggregator.registerSpecCompletion(specSummary)
  81. case suite := <-aggregator.suiteEndings:
  82. finished, passed := aggregator.registerSuiteEnding(suite)
  83. if finished {
  84. aggregator.result <- passed
  85. break loop
  86. }
  87. }
  88. }
  89. }
  90. func (aggregator *Aggregator) registerSuiteBeginning(configAndSuite configAndSuite) {
  91. aggregator.aggregatedSuiteBeginnings = append(aggregator.aggregatedSuiteBeginnings, configAndSuite)
  92. if len(aggregator.aggregatedSuiteBeginnings) == 1 {
  93. aggregator.startTime = time.Now()
  94. }
  95. if len(aggregator.aggregatedSuiteBeginnings) != aggregator.nodeCount {
  96. return
  97. }
  98. aggregator.stenographer.AnnounceSuite(configAndSuite.summary.SuiteDescription, configAndSuite.config.RandomSeed, configAndSuite.config.RandomizeAllSpecs, aggregator.config.Succinct)
  99. totalNumberOfSpecs := 0
  100. if len(aggregator.aggregatedSuiteBeginnings) > 0 {
  101. totalNumberOfSpecs = configAndSuite.summary.NumberOfSpecsBeforeParallelization
  102. }
  103. aggregator.stenographer.AnnounceTotalNumberOfSpecs(totalNumberOfSpecs, aggregator.config.Succinct)
  104. aggregator.stenographer.AnnounceAggregatedParallelRun(aggregator.nodeCount, aggregator.config.Succinct)
  105. aggregator.flushCompletedSpecs()
  106. }
  107. func (aggregator *Aggregator) registerBeforeSuite(setupSummary *types.SetupSummary) {
  108. aggregator.aggregatedBeforeSuites = append(aggregator.aggregatedBeforeSuites, setupSummary)
  109. aggregator.flushCompletedSpecs()
  110. }
  111. func (aggregator *Aggregator) registerAfterSuite(setupSummary *types.SetupSummary) {
  112. aggregator.aggregatedAfterSuites = append(aggregator.aggregatedAfterSuites, setupSummary)
  113. aggregator.flushCompletedSpecs()
  114. }
  115. func (aggregator *Aggregator) registerSpecCompletion(specSummary *types.SpecSummary) {
  116. aggregator.completedSpecs = append(aggregator.completedSpecs, specSummary)
  117. aggregator.specs = append(aggregator.specs, specSummary)
  118. aggregator.flushCompletedSpecs()
  119. }
  120. func (aggregator *Aggregator) flushCompletedSpecs() {
  121. if len(aggregator.aggregatedSuiteBeginnings) != aggregator.nodeCount {
  122. return
  123. }
  124. for _, setupSummary := range aggregator.aggregatedBeforeSuites {
  125. aggregator.announceBeforeSuite(setupSummary)
  126. }
  127. for _, specSummary := range aggregator.completedSpecs {
  128. aggregator.announceSpec(specSummary)
  129. }
  130. for _, setupSummary := range aggregator.aggregatedAfterSuites {
  131. aggregator.announceAfterSuite(setupSummary)
  132. }
  133. aggregator.aggregatedBeforeSuites = []*types.SetupSummary{}
  134. aggregator.completedSpecs = []*types.SpecSummary{}
  135. aggregator.aggregatedAfterSuites = []*types.SetupSummary{}
  136. }
  137. func (aggregator *Aggregator) announceBeforeSuite(setupSummary *types.SetupSummary) {
  138. aggregator.stenographer.AnnounceCapturedOutput(setupSummary.CapturedOutput)
  139. if setupSummary.State != types.SpecStatePassed {
  140. aggregator.stenographer.AnnounceBeforeSuiteFailure(setupSummary, aggregator.config.Succinct, aggregator.config.FullTrace)
  141. }
  142. }
  143. func (aggregator *Aggregator) announceAfterSuite(setupSummary *types.SetupSummary) {
  144. aggregator.stenographer.AnnounceCapturedOutput(setupSummary.CapturedOutput)
  145. if setupSummary.State != types.SpecStatePassed {
  146. aggregator.stenographer.AnnounceAfterSuiteFailure(setupSummary, aggregator.config.Succinct, aggregator.config.FullTrace)
  147. }
  148. }
  149. func (aggregator *Aggregator) announceSpec(specSummary *types.SpecSummary) {
  150. if aggregator.config.Verbose && specSummary.State != types.SpecStatePending && specSummary.State != types.SpecStateSkipped {
  151. aggregator.stenographer.AnnounceSpecWillRun(specSummary)
  152. }
  153. aggregator.stenographer.AnnounceCapturedOutput(specSummary.CapturedOutput)
  154. switch specSummary.State {
  155. case types.SpecStatePassed:
  156. if specSummary.IsMeasurement {
  157. aggregator.stenographer.AnnounceSuccesfulMeasurement(specSummary, aggregator.config.Succinct)
  158. } else if specSummary.RunTime.Seconds() >= aggregator.config.SlowSpecThreshold {
  159. aggregator.stenographer.AnnounceSuccesfulSlowSpec(specSummary, aggregator.config.Succinct)
  160. } else {
  161. aggregator.stenographer.AnnounceSuccesfulSpec(specSummary)
  162. }
  163. case types.SpecStatePending:
  164. aggregator.stenographer.AnnouncePendingSpec(specSummary, aggregator.config.NoisyPendings && !aggregator.config.Succinct)
  165. case types.SpecStateSkipped:
  166. aggregator.stenographer.AnnounceSkippedSpec(specSummary, aggregator.config.Succinct || !aggregator.config.NoisySkippings, aggregator.config.FullTrace)
  167. case types.SpecStateTimedOut:
  168. aggregator.stenographer.AnnounceSpecTimedOut(specSummary, aggregator.config.Succinct, aggregator.config.FullTrace)
  169. case types.SpecStatePanicked:
  170. aggregator.stenographer.AnnounceSpecPanicked(specSummary, aggregator.config.Succinct, aggregator.config.FullTrace)
  171. case types.SpecStateFailed:
  172. aggregator.stenographer.AnnounceSpecFailed(specSummary, aggregator.config.Succinct, aggregator.config.FullTrace)
  173. }
  174. }
  175. func (aggregator *Aggregator) registerSuiteEnding(suite *types.SuiteSummary) (finished bool, passed bool) {
  176. aggregator.aggregatedSuiteEndings = append(aggregator.aggregatedSuiteEndings, suite)
  177. if len(aggregator.aggregatedSuiteEndings) < aggregator.nodeCount {
  178. return false, false
  179. }
  180. aggregatedSuiteSummary := &types.SuiteSummary{}
  181. aggregatedSuiteSummary.SuiteSucceeded = true
  182. for _, suiteSummary := range aggregator.aggregatedSuiteEndings {
  183. if suiteSummary.SuiteSucceeded == false {
  184. aggregatedSuiteSummary.SuiteSucceeded = false
  185. }
  186. aggregatedSuiteSummary.NumberOfSpecsThatWillBeRun += suiteSummary.NumberOfSpecsThatWillBeRun
  187. aggregatedSuiteSummary.NumberOfTotalSpecs += suiteSummary.NumberOfTotalSpecs
  188. aggregatedSuiteSummary.NumberOfPassedSpecs += suiteSummary.NumberOfPassedSpecs
  189. aggregatedSuiteSummary.NumberOfFailedSpecs += suiteSummary.NumberOfFailedSpecs
  190. aggregatedSuiteSummary.NumberOfPendingSpecs += suiteSummary.NumberOfPendingSpecs
  191. aggregatedSuiteSummary.NumberOfSkippedSpecs += suiteSummary.NumberOfSkippedSpecs
  192. aggregatedSuiteSummary.NumberOfFlakedSpecs += suiteSummary.NumberOfFlakedSpecs
  193. }
  194. aggregatedSuiteSummary.RunTime = time.Since(aggregator.startTime)
  195. aggregator.stenographer.SummarizeFailures(aggregator.specs)
  196. aggregator.stenographer.AnnounceSpecRunCompletion(aggregatedSuiteSummary, aggregator.config.Succinct)
  197. return true, aggregatedSuiteSummary.SuiteSucceeded
  198. }