driver_mic_sink.c 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012 Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <errno.h>
  17. #include <dlfcn.h>
  18. #include <common/COISysInfo_common.h>
  19. #include <starpu.h>
  20. #include <drivers/mp_common/mp_common.h>
  21. #include <drivers/mp_common/sink_common.h>
  22. #include <datawizard/interfaces/data_interface.h>
  23. #include "driver_mic_common.h"
  24. #include "driver_mic_sink.h"
  25. /* Initialize the MIC sink, initializing connection to the source
  26. * and to the other devices (not implemented yet).
  27. */
  28. void _starpu_mic_sink_init(struct _starpu_mp_node *node)
  29. {
  30. pthread_t thread, self;
  31. cpu_set_t cpuset;
  32. pthread_attr_t attr;
  33. int i, ret;
  34. struct arg_sink_thread * arg;
  35. /*Bind on the first core*/
  36. self = pthread_self();
  37. CPU_ZERO(&cpuset);
  38. CPU_SET(241,&cpuset);
  39. pthread_setaffinity_np(self,sizeof(cpu_set_t),&cpuset);
  40. /* Initialize connection with the source */
  41. _starpu_mic_common_accept(&node->mp_connection.mic_endpoint,
  42. STARPU_MIC_SOURCE_PORT_NUMBER);
  43. _starpu_mic_common_accept(&node->host_sink_dt_connection.mic_endpoint,
  44. STARPU_MIC_SOURCE_DT_PORT_NUMBER);
  45. node->is_running = 1;
  46. node->nb_cores = COISysGetHardwareThreadCount() - COISysGetHardwareThreadCount() / COISysGetCoreCount();
  47. node->thread_table = malloc(sizeof(pthread_t)*node->nb_cores);
  48. node->run_table = malloc(sizeof(struct mp_task *)*node->nb_cores);
  49. node->sem_run_table = malloc(sizeof(sem_t)*node->nb_cores);
  50. node->barrier_list = mp_barrier_list_new();
  51. node->message_queue = mp_message_list_new();
  52. STARPU_PTHREAD_MUTEX_INIT(&node->message_queue_mutex,NULL);
  53. STARPU_PTHREAD_MUTEX_INIT(&node->barrier_mutex,NULL);
  54. STARPU_PTHREAD_BARRIER_INIT(&node->init_completed_barrier, NULL, node->nb_cores+1);
  55. /*for each core init the mutex, the task pointer and launch the thread */
  56. for(i=0; i<node->nb_cores; i++)
  57. {
  58. node->run_table[i] = NULL;
  59. sem_init(&node->sem_run_table[i],0,0);
  60. //init the set
  61. CPU_ZERO(&cpuset);
  62. CPU_SET(i,&cpuset);
  63. ret = pthread_attr_init(&attr);
  64. STARPU_ASSERT(ret == 0);
  65. ret = pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);
  66. STARPU_ASSERT(ret == 0);
  67. /*prepare the argument for the thread*/
  68. arg= malloc(sizeof(struct arg_sink_thread));
  69. arg->coreid = i;
  70. arg->node = node;
  71. arg->sem = &node->sem_run_table[i];
  72. ret = pthread_create(&thread, &attr, _starpu_sink_thread, arg);
  73. ((pthread_t *)node->thread_table)[i] = thread;
  74. STARPU_ASSERT(ret == 0);
  75. }
  76. //node->sink_sink_dt_connections = malloc(node->nb_mp_sinks * sizeof(union _starpu_mp_connection));
  77. //for (i = 0; i < (unsigned int)node->devid; ++i)
  78. // _starpu_mic_common_connect(&node->sink_sink_dt_connections[i].mic_endpoint,
  79. // STARPU_TO_MIC_ID(i),
  80. // STARPU_MIC_SINK_SINK_DT_PORT_NUMBER(node->devid, i),
  81. // STARPU_MIC_SINK_SINK_DT_PORT_NUMBER(i, node->devid));
  82. //for (i = node->devid + 1; i < node->nb_mp_sinks; ++i)
  83. // _starpu_mic_common_accept(&node->sink_sink_dt_connections[i].mic_endpoint,
  84. // STARPU_MIC_SINK_SINK_DT_PORT_NUMBER(node->devid, i));
  85. }
  86. /* Deinitialize the MIC sink, close all the connections.
  87. */
  88. void _starpu_mic_sink_deinit(struct _starpu_mp_node *node)
  89. {
  90. int i;
  91. node->is_running = 0;
  92. for(i=0; i<node->nb_cores; i++)
  93. {
  94. sem_post(&node->sem_run_table[i]);
  95. pthread_join(((pthread_t *)node->thread_table)[i],NULL);
  96. sem_destroy(&node->sem_run_table[i]);
  97. }
  98. free(node->thread_table);
  99. free(node->run_table);
  100. free(node->sem_run_table);
  101. mp_barrier_list_delete(node->barrier_list);
  102. mp_message_list_delete(node->message_queue);
  103. STARPU_PTHREAD_MUTEX_DESTROY(&node->message_queue_mutex);
  104. STARPU_PTHREAD_MUTEX_DESTROY(&node->barrier_mutex);
  105. STARPU_PTHREAD_BARRIER_DESTROY(&node->init_completed_barrier);
  106. //unsigned int i;
  107. //for (i = 0; i < node->nb_mp_sinks; ++i)
  108. //{
  109. // if (i != (unsigned int)node->devid)
  110. // scif_close(node->sink_sink_dt_connections[i].mic_endpoint);
  111. //}
  112. //free(node->sink_sink_dt_connections);
  113. scif_close(node->host_sink_dt_connection.mic_endpoint);
  114. scif_close(node->mp_connection.mic_endpoint);
  115. }
  116. /* Report an error which occured when using a MIC device
  117. * and print this error in a human-readable style
  118. */
  119. void _starpu_mic_sink_report_error(const char *func, const char *file, const int line, const int status)
  120. {
  121. const char *errormsg = strerror(status);
  122. printf("SINK: oops in %s (%s:%u)... %d: %s \n", func, file, line, status, errormsg);
  123. STARPU_ASSERT(0);
  124. }
  125. /* Allocate memory on the MIC.
  126. * Memory is register for remote direct access. */
  127. void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size)
  128. {
  129. STARPU_ASSERT(arg_size == sizeof(size_t));
  130. void *addr = NULL;
  131. size_t size = *(size_t *)(arg);
  132. if (posix_memalign(&addr, STARPU_MIC_PAGE_SIZE, size) != 0)
  133. _starpu_mp_common_send_command(mp_node, STARPU_ERROR_ALLOCATE, NULL, 0);
  134. #ifndef STARPU_DISABLE_ASYNCHRONOUS_MIC_COPY
  135. scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
  136. size_t window_size = STARPU_MIC_GET_PAGE_SIZE_MULTIPLE(size);
  137. if (scif_register(epd, addr, window_size, (off_t)addr, SCIF_PROT_READ | SCIF_PROT_WRITE, SCIF_MAP_FIXED) < 0)
  138. {
  139. free(addr);
  140. _starpu_mp_common_send_command(mp_node, STARPU_ERROR_ALLOCATE, NULL, 0);
  141. }
  142. #endif
  143. _starpu_mp_common_send_command(mp_node, STARPU_ANSWER_ALLOCATE, &addr, sizeof(addr));
  144. }
  145. /* Unregister and free memory. */
  146. void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size)
  147. {
  148. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mic_free_command));
  149. void *addr = ((struct _starpu_mic_free_command *)arg)->addr;
  150. #ifndef STARPU_DISABLE_ASYNCHRONOUS_MIC_COPY
  151. scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
  152. size_t size = ((struct _starpu_mic_free_command *)arg)->size;
  153. size_t window_size = STARPU_MIC_GET_PAGE_SIZE_MULTIPLE(size);
  154. scif_unregister(epd, (off_t)addr, window_size);
  155. #endif
  156. free(addr);
  157. }
  158. /* bind the thread to a core
  159. */
  160. void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, int coreid, int * core_table, int nb_core)
  161. {
  162. cpu_set_t cpuset;
  163. int i;
  164. //init the set
  165. CPU_ZERO(&cpuset);
  166. //adding the core to the set
  167. for(i=0;i<nb_core;i++)
  168. CPU_SET(core_table[i],&cpuset);
  169. pthread_setaffinity_np(((pthread_t*)mp_node->thread_table)[coreid],sizeof(cpu_set_t),&cpuset);
  170. }
  171. void (*_starpu_mic_sink_lookup (const struct _starpu_mp_node * node STARPU_ATTRIBUTE_UNUSED, char* func_name))(void)
  172. {
  173. void *dl_handle = dlopen(NULL, RTLD_NOW);
  174. return dlsym(dl_handle, func_name);
  175. }