driver_core.c 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
#include "driver_core.h"
#include <core/policies/sched_policy.h>

#include <stdio.h>
#include <stdlib.h>
  18. int execute_job_on_core(job_t j, struct worker_s *core_args)
  19. {
  20. int ret;
  21. tick_t codelet_start, codelet_end;
  22. tick_t codelet_start_comm, codelet_end_comm;
  23. unsigned calibrate_model = 0;
  24. struct starpu_task *task = j->task;
  25. STARPU_ASSERT(task->cl);
  26. STARPU_ASSERT(task->cl->core_func);
  27. if (task->cl->model && task->cl->model->benchmarking)
  28. calibrate_model = 1;
  29. if (calibrate_model || BENCHMARK_COMM)
  30. GET_TICK(codelet_start_comm);
  31. ret = fetch_codelet_input(task->buffers, task->interface,
  32. task->cl->nbuffers, 0);
  33. if (calibrate_model || BENCHMARK_COMM)
  34. GET_TICK(codelet_end_comm);
  35. if (ret != 0) {
  36. /* there was not enough memory so the codelet cannot be executed right now ... */
  37. /* push the codelet back and try another one ... */
  38. return STARPU_TRYAGAIN;
  39. }
  40. TRACE_START_CODELET_BODY(j);
  41. if (calibrate_model || BENCHMARK_COMM)
  42. GET_TICK(codelet_start);
  43. cl_func func = task->cl->core_func;
  44. func(task->interface, task->cl_arg);
  45. if (calibrate_model || BENCHMARK_COMM)
  46. GET_TICK(codelet_end);
  47. TRACE_END_CODELET_BODY(j);
  48. push_codelet_output(task->buffers, task->cl->nbuffers, 0);
  49. //#ifdef MODEL_DEBUG
  50. if (calibrate_model || BENCHMARK_COMM)
  51. {
  52. double measured = timing_delay(&codelet_start, &codelet_end);
  53. double measured_comm = timing_delay(&codelet_start_comm, &codelet_end_comm);
  54. // fprintf(stderr, "%d\t%d\n", (int)j->penality, (int)measured_comm);
  55. core_args->jobq->total_computation_time += measured;
  56. core_args->jobq->total_communication_time += measured_comm;
  57. if (calibrate_model)
  58. update_perfmodel_history(j, core_args->arch, measured);
  59. }
  60. //#endif
  61. return STARPU_SUCCESS;
  62. }
  63. void *core_worker(void *arg)
  64. {
  65. struct worker_s *core_arg = arg;
  66. #ifdef USE_FXT
  67. fxt_register_thread(core_arg->bindid);
  68. #endif
  69. TRACE_NEW_WORKER(FUT_CORE_KEY, core_arg->memory_node);
  70. bind_thread_on_cpu(core_arg->bindid);
  71. #ifdef VERBOSE
  72. fprintf(stderr, "core worker %d is ready on logical core %d\n", core_arg->id, core_arg->bindid);
  73. #endif
  74. set_local_memory_node_key(&core_arg->memory_node);
  75. set_local_queue(core_arg->jobq);
  76. /* this is only useful (and meaningful) is there is a single
  77. memory node "related" to that queue */
  78. core_arg->jobq->memory_node = core_arg->memory_node;
  79. core_arg->jobq->total_computation_time = 0.0;
  80. core_arg->jobq->total_communication_time = 0.0;
  81. /* tell the main thread that we are ready */
  82. pthread_mutex_lock(&core_arg->mutex);
  83. core_arg->worker_is_initialized = 1;
  84. pthread_cond_signal(&core_arg->ready_cond);
  85. pthread_mutex_unlock(&core_arg->mutex);
  86. job_t j;
  87. int res;
  88. while (machine_is_running())
  89. {
  90. j = pop_task();
  91. if (j == NULL) continue;
  92. /* can a core perform that task ? */
  93. if (!CORE_MAY_PERFORM(j))
  94. {
  95. /* put it and the end of the queue ... XXX */
  96. push_task(j);
  97. continue;
  98. }
  99. res = execute_job_on_core(j, core_arg);
  100. if (res != STARPU_SUCCESS) {
  101. switch (res) {
  102. case STARPU_FATAL:
  103. assert(0);
  104. case STARPU_TRYAGAIN:
  105. push_task(j);
  106. continue;
  107. default:
  108. assert(0);
  109. }
  110. }
  111. handle_job_termination(j);
  112. }
  113. #ifdef DATA_STATS
  114. fprintf(stderr, "CORE #%d computation %le comm %le (%lf \%%)\n", core_arg->id, core_arg->jobq->total_computation_time, core_arg->jobq->total_communication_time, core_arg->jobq->total_communication_time*100.0/core_arg->jobq->total_computation_time);
  115. #endif
  116. TRACE_WORKER_TERMINATED(FUT_CORE_KEY);
  117. pthread_exit(NULL);
  118. }