driver_mpi_source.c 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2015 Mathieu Lirzin <mthl@openmailbox.org>
  4. * Copyright (C) 2016 Inria
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <mpi.h>
  18. #include <errno.h>
  19. #include <starpu.h>
  20. #include <drivers/mpi/driver_mpi_source.h>
  21. #include <drivers/mpi/driver_mpi_common.h>
  22. #include <drivers/driver_common/driver_common.h>
  23. #include <drivers/mp_common/source_common.h>
  /* Table of mp node handles, one slot per MPI device, indexed by device id:
   * _starpu_mpi_src_worker() passes mpi_ms_nodes[devid] to
   * _starpu_src_common_worker().  Nothing in this part of the file fills the
   * table; presumably it is populated during MPI master-slave initialization
   * elsewhere — TODO confirm. */
  24. struct _starpu_mp_node *mpi_ms_nodes[STARPU_MAXMPIDEVS];
  /* Source-side initialization hook for @node.
   *
   * For now the only work done is the initialization shared by the source
   * and sink sides of the MPI connection, delegated to
   * _starpu_mpi_common_mp_initialize_src_sink(). */
  void _starpu_mpi_source_init(struct _starpu_mp_node *node)
  {
      /* TODO: add any source-specific setup after the common part. */
      _starpu_mpi_common_mp_initialize_src_sink(node);
  }
  /* Source-side teardown hook for @node.  It currently performs no work. */
  void _starpu_mpi_source_deinit(struct _starpu_mp_node *node)
  {
      (void) node; /* unused: nothing to tear down yet */
  }
  33. unsigned _starpu_mpi_src_get_device_count()
  34. {
  35. int nb_mpi_devices;
  36. if (!_starpu_mpi_common_is_mp_initialized())
  37. return 0;
  38. MPI_Comm_size(MPI_COMM_WORLD, &nb_mpi_devices);
  39. //Remove one for master
  40. nb_mpi_devices = nb_mpi_devices - 1;
  41. return nb_mpi_devices;
  42. }
  43. void _starpu_mpi_exit_useless_node(int devid)
  44. {
  45. struct _starpu_mp_node *node = _starpu_mp_common_node_create(STARPU_MPI_SOURCE, devid);
  46. _starpu_mp_common_send_command(node, STARPU_EXIT, NULL, 0);
  47. _starpu_mp_common_node_destroy(node);
  48. }
  49. void *_starpu_mpi_src_worker(void *arg)
  50. {
  51. struct _starpu_worker_set *worker_set = arg;
  52. /* As all workers of a set share common data, we just use the first
  53. * * one for intializing the following stuffs. */
  54. struct _starpu_worker *baseworker = &worker_set->workers[0];
  55. struct _starpu_machine_config *config = baseworker->config;
  56. unsigned baseworkerid = baseworker - config->workers;
  57. unsigned devid = baseworker->devid;
  58. unsigned i;
  59. /* unsigned memnode = baseworker->memory_node; */
  60. _starpu_driver_start(baseworker, _STARPU_FUT_MPI_KEY, 0);
  61. #ifdef STARPU_USE_FXT
  62. for (i = 1; i < worker_set->nworkers; i++)
  63. _starpu_worker_start(&worker_set->workers[i], _STARPU_FUT_MPI_KEY, 0);
  64. #endif
  65. // Current task for a thread managing a worker set has no sense.
  66. _starpu_set_current_task(NULL);
  67. for (i = 0; i < config->topology.nmpicores[devid]; i++)
  68. {
  69. struct _starpu_worker *worker = &config->workers[baseworkerid+i];
  70. snprintf(worker->name, sizeof(worker->name), "MPI_MS %d core %u", devid, i);
  71. snprintf(worker->short_name, sizeof(worker->short_name), "MPI_MS %d.%u", devid, i);
  72. }
  73. {
  74. char thread_name[16];
  75. snprintf(thread_name, sizeof(thread_name), "MPI_MS %d", devid);
  76. starpu_pthread_setname(thread_name);
  77. }
  78. for (i = 0; i < worker_set->nworkers; i++)
  79. {
  80. struct _starpu_worker *worker = &worker_set->workers[i];
  81. _STARPU_TRACE_WORKER_INIT_END(worker->workerid);
  82. }
  83. /* tell the main thread that this one is ready */
  84. STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
  85. baseworker->status = STATUS_UNKNOWN;
  86. worker_set->set_is_initialized = 1;
  87. STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
  88. STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
  89. _starpu_src_common_worker(worker_set, baseworkerid, mpi_ms_nodes[devid]);
  90. return NULL;
  91. }
  92. //void _starpu_mpi_source_send(const struct _starpu_mp_node *node, void *msg,
  93. // int len)
  94. //{
  95. // int dst = node->mp_connection.mpi_nodeid;
  96. // if (MPI_Send(msg, len, MPI_CHAR, dst, dst, MPI_COMM_WORLD))
  97. // STARPU_MP_COMMON_REPORT_ERROR(node, errno);
  98. //}
  99. //
  100. //void _starpu_mpi_source_recv(const struct _starpu_mp_node *node, void *msg,
  101. // int len)
  102. //{
  103. // int src = node->mp_connection.mpi_nodeid;
  104. // if (MPI_Recv(msg, len, MPI_CHAR, src, STARPU_MP_SRC_NODE,
  105. // MPI_COMM_WORLD, MPI_STATUS_IGNORE))
  106. // STARPU_MP_COMMON_REPORT_ERROR(node, errno);
  107. //}
  108. //
  109. //int _starpu_mpi_copy_src_to_sink(void *src,
  110. // unsigned src_node STARPU_ATTRIBUTE_UNUSED,
  111. // void *dst, unsigned dst_node, size_t size)
  112. //{
  113. // /* TODO */
  114. // return 0;
  115. //}
  116. //
  117. //int _starpu_mpi_copy_sink_to_src(void *src, unsigned src_node, void *dst,
  118. // unsigned dst_node STARPU_ATTRIBUTE_UNUSED,
  119. // size_t size)
  120. //{
  121. // /* TODO */
  122. // return 0;
  123. //}
  124. //
  125. //int _starpu_mpi_copy_sink_to_sink(void *src, unsigned src_node, void *dst,
  126. // unsigned dst_node, size_t size)
  127. //{
  128. // /* TODO */
  129. // return 0;
  130. //}
  131. //
  132. //void (*_starpu_mpi_get_kernel_from_job(const struct _starpu_mp_node *node,
  133. // struct _starpu_job *j))(void)
  134. //{
  135. // /* TODO */
  136. // return NULL;
  137. //}