driver_opencl.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <math.h>
  17. #include <starpu.h>
  18. #include <starpu_profiling.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/debug.h>
  22. #include <starpu_opencl.h>
  23. #include "driver_opencl.h"
  24. #include "driver_opencl_utils.h"
  25. #include <common/utils.h>
  26. #include <profiling/profiling.h>
  27. static cl_context contexts[STARPU_MAXOPENCLDEVS];
  28. static cl_device_id devices[STARPU_MAXOPENCLDEVS];
  29. static cl_command_queue queues[STARPU_MAXOPENCLDEVS];
  30. static cl_uint nb_devices = -1;
  31. static int init_done = 0;
  32. extern char *_starpu_opencl_codelet_dir;
  33. void starpu_opencl_get_context(int devid, cl_context *context)
  34. {
  35. *context = contexts[devid];
  36. }
  37. void starpu_opencl_get_device(int devid, cl_device_id *device)
  38. {
  39. *device = devices[devid];
  40. }
  41. void starpu_opencl_get_queue(int devid, cl_command_queue *queue)
  42. {
  43. *queue = queues[devid];
  44. }
  45. int _starpu_opencl_init_context(int devid)
  46. {
  47. cl_int err;
  48. cl_device_id device;
  49. _STARPU_OPENCL_DEBUG("Initialising context for dev %d\n", devid);
  50. // Create a compute context
  51. device = devices[devid];
  52. contexts[devid] = clCreateContext(NULL, 1, &device, NULL, NULL, &err);
  53. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  54. // Create queue for the given device
  55. queues[devid] = clCreateCommandQueue(contexts[devid], devices[devid], 0, &err);
  56. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  57. _starpu_opencl_init_programs(devid);
  58. return EXIT_SUCCESS;
  59. }
  60. int _starpu_opencl_deinit_context(int devid)
  61. {
  62. int err;
  63. _STARPU_OPENCL_DEBUG("De-initialising context for dev %d\n", devid);
  64. err = clReleaseContext(contexts[devid]);
  65. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  66. err = clReleaseCommandQueue(queues[devid]);
  67. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  68. _starpu_opencl_release_programs(devid);
  69. return EXIT_SUCCESS;
  70. }
  71. int _starpu_opencl_allocate_memory(void **addr, size_t size, cl_mem_flags flags)
  72. {
  73. cl_int err;
  74. cl_mem address;
  75. struct starpu_worker_s *worker = _starpu_get_local_worker_key();
  76. address = clCreateBuffer(contexts[worker->devid], flags, size, NULL, &err);
  77. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  78. *addr = address;
  79. return EXIT_SUCCESS;
  80. }
  81. int _starpu_opencl_copy_to_opencl(void *ptr, cl_mem buffer, size_t size, size_t offset, cl_event *event)
  82. {
  83. int err;
  84. struct starpu_worker_s *worker = _starpu_get_local_worker_key();
  85. if (event == NULL) {
  86. err = clEnqueueWriteBuffer(queues[worker->devid], buffer, CL_TRUE, offset, size, ptr, 0, NULL, NULL);
  87. }
  88. else {
  89. err = clEnqueueWriteBuffer(queues[worker->devid], buffer, CL_FALSE, offset, size, ptr, 0, NULL, event);
  90. }
  91. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  92. return EXIT_SUCCESS;
  93. }
  94. int _starpu_opencl_copy_from_opencl(cl_mem buffer, void *ptr, size_t size, size_t offset, cl_event *event)
  95. {
  96. int err;
  97. struct starpu_worker_s *worker = _starpu_get_local_worker_key();
  98. if (event == NULL) {
  99. err = clEnqueueReadBuffer(queues[worker->devid], buffer, CL_TRUE, offset, size, ptr, 0, NULL, NULL);
  100. }
  101. else {
  102. err = clEnqueueReadBuffer(queues[worker->devid], buffer, CL_FALSE, offset, size, ptr, 0, NULL, event);
  103. }
  104. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  105. return EXIT_SUCCESS;
  106. }
  107. void _starpu_opencl_init()
  108. {
  109. if (!init_done) {
  110. cl_platform_id platform_id[STARPU_OPENCL_PLATFORM_MAX];
  111. cl_uint nb_platforms;
  112. cl_device_type device_type = CL_DEVICE_TYPE_GPU;
  113. cl_int err;
  114. _STARPU_OPENCL_DEBUG("Initialising OpenCL\n");
  115. // Get Platforms
  116. err = clGetPlatformIDs(STARPU_OPENCL_PLATFORM_MAX, platform_id, &nb_platforms);
  117. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  118. _STARPU_OPENCL_DEBUG("Platforms detected: %d\n", nb_platforms);
  119. // Get devices
  120. nb_devices = 0;
  121. {
  122. unsigned int i;
  123. for (i=0; i<nb_platforms; i++) {
  124. cl_uint num;
  125. #ifdef STARPU_VERBOSE
  126. {
  127. char name[1024], vendor[1024];
  128. clGetPlatformInfo(platform_id[i], CL_PLATFORM_NAME, 1024, name, NULL);
  129. clGetPlatformInfo(platform_id[i], CL_PLATFORM_VENDOR, 1024, vendor, NULL);
  130. _STARPU_OPENCL_DEBUG("Platform: %s - %s\n", name, vendor);
  131. }
  132. #endif
  133. err = clGetDeviceIDs(platform_id[i], device_type, STARPU_MAXOPENCLDEVS-nb_devices, &devices[nb_devices], &num);
  134. if (err == CL_DEVICE_NOT_FOUND) {
  135. _STARPU_OPENCL_DEBUG(" No devices detected on this platform\n");
  136. }
  137. else {
  138. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  139. _STARPU_OPENCL_DEBUG(" %d devices detected\n", num);
  140. nb_devices += num;
  141. }
  142. }
  143. }
  144. // Get location of OpenCl codelet source files
  145. _starpu_opencl_codelet_dir = getenv("STARPU_OPENCL_CODELET_DIR");
  146. init_done=1;
  147. }
  148. }
  149. static unsigned _starpu_opencl_get_device_name(int dev, char *name, int lname);
  150. static int _starpu_opencl_execute_job(starpu_job_t j, struct starpu_worker_s *args);
  151. void *_starpu_opencl_worker(void *arg)
  152. {
  153. struct starpu_worker_s* args = arg;
  154. struct starpu_jobq_s *jobq = args->jobq;
  155. int devid = args->devid;
  156. int workerid = args->workerid;
  157. #ifdef USE_FXT
  158. fxt_register_thread(args->bindid);
  159. #endif
  160. unsigned memnode = args->memory_node;
  161. STARPU_TRACE_WORKER_INIT_START(STARPU_FUT_OPENCL_KEY, devid, memnode);
  162. _starpu_bind_thread_on_cpu(args->config, args->bindid);
  163. _starpu_set_local_memory_node_key(&memnode);
  164. _starpu_set_local_queue(jobq);
  165. _starpu_set_local_worker_key(args);
  166. _starpu_opencl_init_context(devid);
  167. /* one more time to avoid hacks from third party lib :) */
  168. _starpu_bind_thread_on_cpu(args->config, args->bindid);
  169. args->status = STATUS_UNKNOWN;
  170. /* get the device's name */
  171. char devname[128];
  172. _starpu_opencl_get_device_name(devid, devname, 128);
  173. snprintf(args->name, 32, "OpenCL %d (%s)", args->devid, devname);
  174. _STARPU_OPENCL_DEBUG("OpenCL (%s) dev id %d thread is ready to run on CPU %d !\n", devname, devid, args->bindid);
  175. STARPU_TRACE_WORKER_INIT_END
  176. /* tell the main thread that this one is ready */
  177. PTHREAD_MUTEX_LOCK(&args->mutex);
  178. args->worker_is_initialized = 1;
  179. PTHREAD_COND_SIGNAL(&args->ready_cond);
  180. PTHREAD_MUTEX_UNLOCK(&args->mutex);
  181. struct starpu_job_s * j;
  182. int res;
  183. struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
  184. struct starpu_jobq_s *queue = policy->starpu_get_local_queue(policy);
  185. while (_starpu_machine_is_running())
  186. {
  187. STARPU_TRACE_START_PROGRESS(memnode);
  188. _starpu_datawizard_progress(memnode, 1);
  189. STARPU_TRACE_END_PROGRESS(memnode);
  190. _starpu_execute_registered_progression_hooks();
  191. _starpu_jobq_lock(queue);
  192. /* perhaps there is some local task to be executed first */
  193. j = _starpu_pop_local_task(args);
  194. /* otherwise ask a task to the scheduler */
  195. if (!j)
  196. j = _starpu_pop_task();
  197. if (j == NULL)
  198. {
  199. if (_starpu_worker_can_block(memnode))
  200. _starpu_block_worker(workerid, &queue->activity_cond, &queue->activity_mutex);
  201. _starpu_jobq_unlock(queue);
  202. continue;
  203. };
  204. _starpu_jobq_unlock(queue);
  205. /* can OpenCL do that task ? */
  206. if (!STARPU_OPENCL_MAY_PERFORM(j))
  207. {
  208. /* this is not a OpenCL task */
  209. _starpu_push_task(j, 0);
  210. continue;
  211. }
  212. _starpu_set_current_task(j->task);
  213. res = _starpu_opencl_execute_job(j, args);
  214. _starpu_set_current_task(NULL);
  215. if (res) {
  216. switch (res) {
  217. case -EAGAIN:
  218. fprintf(stderr, "ouch, put the codelet %p back ... \n", j);
  219. _starpu_push_task(j, 0);
  220. STARPU_ABORT();
  221. continue;
  222. default:
  223. assert(0);
  224. }
  225. }
  226. _starpu_handle_job_termination(j, 0);
  227. }
  228. STARPU_TRACE_WORKER_DEINIT_START
  229. _starpu_opencl_deinit_context(devid);
  230. #ifdef DATA_STATS
  231. fprintf(stderr, "OpenCL #%d computation %le comm %le (%lf \%%)\n", args->id, jobq->total_computation_time, jobq->total_communication_time, jobq->total_communication_time*100.0/jobq->total_computation_time);
  232. #endif
  233. #ifdef STARPU_VERBOSE
  234. double ratio = 0;
  235. if (jobq->total_job_performed != 0)
  236. {
  237. ratio = jobq->total_computation_time_error/jobq->total_computation_time;
  238. }
  239. _starpu_print_to_logfile("MODEL ERROR: OpenCL %d ERROR %lf EXEC %lf RATIO %lf NTASKS %d\n", args->devid, jobq->total_computation_time_error, jobq->total_computation_time, ratio, jobq->total_job_performed);
  240. #endif
  241. pthread_exit(NULL);
  242. return NULL;
  243. }
  244. static unsigned _starpu_opencl_get_device_name(int dev, char *name, int lname)
  245. {
  246. int err;
  247. if (!init_done) {
  248. _starpu_opencl_init();
  249. }
  250. // Get device name
  251. err = clGetDeviceInfo(devices[dev], CL_DEVICE_NAME, lname, name, NULL);
  252. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  253. _STARPU_OPENCL_DEBUG("Device %d : [%s]\n", dev, name);
  254. return EXIT_SUCCESS;
  255. }
  256. unsigned _starpu_opencl_get_device_count(void)
  257. {
  258. if (!init_done) {
  259. _starpu_opencl_init();
  260. }
  261. return nb_devices;
  262. }
  263. static int _starpu_opencl_execute_job(starpu_job_t j, struct starpu_worker_s *args)
  264. {
  265. int ret;
  266. uint32_t mask = 0;
  267. STARPU_ASSERT(j);
  268. struct starpu_task *task = j->task;
  269. struct timespec codelet_start, codelet_end;
  270. struct timespec codelet_start_comm, codelet_end_comm;
  271. int64_t start_time;
  272. int64_t end_time;
  273. unsigned calibrate_model = 0;
  274. int workerid = args->workerid;
  275. STARPU_ASSERT(task);
  276. struct starpu_codelet_t *cl = task->cl;
  277. STARPU_ASSERT(cl);
  278. if (cl->model && cl->model->benchmarking)
  279. calibrate_model = 1;
  280. /* we do not take communication into account when modeling the performance */
  281. if (STARPU_BENCHMARK_COMM)
  282. {
  283. //barrier(CLK_GLOBAL_MEM_FENCE);
  284. starpu_clock_gettime(&codelet_start_comm);
  285. }
  286. ret = _starpu_fetch_task_input(task, mask);
  287. if (ret != 0) {
  288. /* there was not enough memory, so the input of
  289. * the codelet cannot be fetched ... put the
  290. * codelet back, and try it later */
  291. return -EAGAIN;
  292. }
  293. if (calibrate_model || STARPU_BENCHMARK_COMM)
  294. {
  295. //barrier(CLK_GLOBAL_MEM_FENCE);
  296. starpu_clock_gettime(&codelet_end_comm);
  297. }
  298. STARPU_TRACE_START_CODELET_BODY(j);
  299. int profiling_status = starpu_profiling_status_get();
  300. if (profiling_status)
  301. start_time = (int64_t)_starpu_timing_now();
  302. args->status = STATUS_EXECUTING;
  303. task->status = STARPU_TASK_RUNNING;
  304. cl_func func = cl->opencl_func;
  305. STARPU_ASSERT(func);
  306. starpu_clock_gettime(&codelet_start);
  307. func(task->interface, task->cl_arg);
  308. cl->per_worker_stats[workerid]++;
  309. if (profiling_status)
  310. end_time = (int64_t)_starpu_timing_now();
  311. struct starpu_task_profiling_info *profiling_info;
  312. profiling_info = task->profiling_info;
  313. if (profiling_info)
  314. {
  315. profiling_info->start_time = start_time;
  316. profiling_info->end_time = end_time;
  317. profiling_info->workerid = workerid;
  318. }
  319. if (profiling_status)
  320. _starpu_worker_update_profiling_info(workerid, end_time - start_time, 0, 1);
  321. starpu_clock_gettime(&codelet_end);
  322. args->status = STATUS_UNKNOWN;
  323. STARPU_TRACE_END_CODELET_BODY(j);
  324. if (calibrate_model || STARPU_BENCHMARK_COMM)
  325. {
  326. double measured = _starpu_timing_timespec_delay_us(&codelet_start, &codelet_end);
  327. double measured_comm = _starpu_timing_timespec_delay_us(&codelet_start_comm, &codelet_end_comm);
  328. args->jobq->total_computation_time += measured;
  329. args->jobq->total_communication_time += measured_comm;
  330. double error;
  331. error = fabs(STARPU_MAX(measured, 0.0) - STARPU_MAX(j->predicted, 0.0));
  332. args->jobq->total_computation_time_error += error;
  333. if (calibrate_model)
  334. _starpu_update_perfmodel_history(j, args->perf_arch, (unsigned)args->devid, measured);
  335. }
  336. (void)STARPU_ATOMIC_ADD(&args->jobq->total_job_performed, 1);
  337. _starpu_push_task_output(task, mask);
  338. return EXIT_SUCCESS;
  339. }