123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- /*
- The remote package provides the pieces to allow Ginkgo test suites to report to remote listeners.
- This is used, primarily, to enable streaming parallel test output but has, in principal, broader applications (e.g. streaming test output to a browser).
- */
- package remote
- import (
- "encoding/json"
- "io/ioutil"
- "net"
- "net/http"
- "sync"
- "github.com/onsi/ginkgo/internal/spec_iterator"
- "github.com/onsi/ginkgo/config"
- "github.com/onsi/ginkgo/reporters"
- "github.com/onsi/ginkgo/types"
- )
- /*
- Server spins up on an automatically selected port and listens for communication from the forwarding reporter.
- It then forwards that communication to attached reporters.
- */
- type Server struct {
- listener net.Listener
- reporters []reporters.Reporter
- alives []func() bool
- lock *sync.Mutex
- beforeSuiteData types.RemoteBeforeSuiteData
- parallelTotal int
- counter int
- }
- //Create a new server, automatically selecting a port
- func NewServer(parallelTotal int) (*Server, error) {
- listener, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- return nil, err
- }
- return &Server{
- listener: listener,
- lock: &sync.Mutex{},
- alives: make([]func() bool, parallelTotal),
- beforeSuiteData: types.RemoteBeforeSuiteData{Data: nil, State: types.RemoteBeforeSuiteStatePending},
- parallelTotal: parallelTotal,
- }, nil
- }
- //Start the server. You don't need to `go s.Start()`, just `s.Start()`
- func (server *Server) Start() {
- httpServer := &http.Server{}
- mux := http.NewServeMux()
- httpServer.Handler = mux
- //streaming endpoints
- mux.HandleFunc("/SpecSuiteWillBegin", server.specSuiteWillBegin)
- mux.HandleFunc("/BeforeSuiteDidRun", server.beforeSuiteDidRun)
- mux.HandleFunc("/AfterSuiteDidRun", server.afterSuiteDidRun)
- mux.HandleFunc("/SpecWillRun", server.specWillRun)
- mux.HandleFunc("/SpecDidComplete", server.specDidComplete)
- mux.HandleFunc("/SpecSuiteDidEnd", server.specSuiteDidEnd)
- //synchronization endpoints
- mux.HandleFunc("/BeforeSuiteState", server.handleBeforeSuiteState)
- mux.HandleFunc("/RemoteAfterSuiteData", server.handleRemoteAfterSuiteData)
- mux.HandleFunc("/counter", server.handleCounter)
- mux.HandleFunc("/has-counter", server.handleHasCounter) //for backward compatibility
- go httpServer.Serve(server.listener)
- }
- //Stop the server
- func (server *Server) Close() {
- server.listener.Close()
- }
- //The address the server can be reached it. Pass this into the `ForwardingReporter`.
- func (server *Server) Address() string {
- return "http://" + server.listener.Addr().String()
- }
- //
- // Streaming Endpoints
- //
- //The server will forward all received messages to Ginkgo reporters registered with `RegisterReporters`
- func (server *Server) readAll(request *http.Request) []byte {
- defer request.Body.Close()
- body, _ := ioutil.ReadAll(request.Body)
- return body
- }
- func (server *Server) RegisterReporters(reporters ...reporters.Reporter) {
- server.reporters = reporters
- }
- func (server *Server) specSuiteWillBegin(writer http.ResponseWriter, request *http.Request) {
- body := server.readAll(request)
- var data struct {
- Config config.GinkgoConfigType `json:"config"`
- Summary *types.SuiteSummary `json:"suite-summary"`
- }
- json.Unmarshal(body, &data)
- for _, reporter := range server.reporters {
- reporter.SpecSuiteWillBegin(data.Config, data.Summary)
- }
- }
- func (server *Server) beforeSuiteDidRun(writer http.ResponseWriter, request *http.Request) {
- body := server.readAll(request)
- var setupSummary *types.SetupSummary
- json.Unmarshal(body, &setupSummary)
- for _, reporter := range server.reporters {
- reporter.BeforeSuiteDidRun(setupSummary)
- }
- }
- func (server *Server) afterSuiteDidRun(writer http.ResponseWriter, request *http.Request) {
- body := server.readAll(request)
- var setupSummary *types.SetupSummary
- json.Unmarshal(body, &setupSummary)
- for _, reporter := range server.reporters {
- reporter.AfterSuiteDidRun(setupSummary)
- }
- }
- func (server *Server) specWillRun(writer http.ResponseWriter, request *http.Request) {
- body := server.readAll(request)
- var specSummary *types.SpecSummary
- json.Unmarshal(body, &specSummary)
- for _, reporter := range server.reporters {
- reporter.SpecWillRun(specSummary)
- }
- }
- func (server *Server) specDidComplete(writer http.ResponseWriter, request *http.Request) {
- body := server.readAll(request)
- var specSummary *types.SpecSummary
- json.Unmarshal(body, &specSummary)
- for _, reporter := range server.reporters {
- reporter.SpecDidComplete(specSummary)
- }
- }
- func (server *Server) specSuiteDidEnd(writer http.ResponseWriter, request *http.Request) {
- body := server.readAll(request)
- var suiteSummary *types.SuiteSummary
- json.Unmarshal(body, &suiteSummary)
- for _, reporter := range server.reporters {
- reporter.SpecSuiteDidEnd(suiteSummary)
- }
- }
- //
- // Synchronization Endpoints
- //
- func (server *Server) RegisterAlive(node int, alive func() bool) {
- server.lock.Lock()
- defer server.lock.Unlock()
- server.alives[node-1] = alive
- }
- func (server *Server) nodeIsAlive(node int) bool {
- server.lock.Lock()
- defer server.lock.Unlock()
- alive := server.alives[node-1]
- if alive == nil {
- return true
- }
- return alive()
- }
- func (server *Server) handleBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
- if request.Method == "POST" {
- dec := json.NewDecoder(request.Body)
- dec.Decode(&(server.beforeSuiteData))
- } else {
- beforeSuiteData := server.beforeSuiteData
- if beforeSuiteData.State == types.RemoteBeforeSuiteStatePending && !server.nodeIsAlive(1) {
- beforeSuiteData.State = types.RemoteBeforeSuiteStateDisappeared
- }
- enc := json.NewEncoder(writer)
- enc.Encode(beforeSuiteData)
- }
- }
- func (server *Server) handleRemoteAfterSuiteData(writer http.ResponseWriter, request *http.Request) {
- afterSuiteData := types.RemoteAfterSuiteData{
- CanRun: true,
- }
- for i := 2; i <= server.parallelTotal; i++ {
- afterSuiteData.CanRun = afterSuiteData.CanRun && !server.nodeIsAlive(i)
- }
- enc := json.NewEncoder(writer)
- enc.Encode(afterSuiteData)
- }
- func (server *Server) handleCounter(writer http.ResponseWriter, request *http.Request) {
- c := spec_iterator.Counter{}
- server.lock.Lock()
- c.Index = server.counter
- server.counter++
- server.lock.Unlock()
- json.NewEncoder(writer).Encode(c)
- }
- func (server *Server) handleHasCounter(writer http.ResponseWriter, request *http.Request) {
- writer.Write([]byte(""))
- }
|