瀏覽代碼

Nodes are correctly initiated and can send and recv some messages

Corentin Salingue 8 年之前
父節點
當前提交
265c0614c9

+ 1 - 1
src/core/topology.c

@@ -977,7 +977,7 @@ _starpu_init_mp_config (struct _starpu_machine_config *config,
 #endif
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
     {
-        /* Discover and initialize the number of MIC nodes through the mp
+        /* Discover and initialize the number of MPI nodes through the mp
          * infrastructure. */
         unsigned nhwmpidevices = _starpu_mpi_src_get_device_count();
 

+ 1 - 1
src/core/workers.c

@@ -1272,7 +1272,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 #endif
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
         if (_starpu_mpi_common_is_mp_initialized())
-            _starpu_mpi_src_mp_deinit();
+            _starpu_mpi_common_mp_deinit();
 #endif
 
 		initialized = UNINITIALIZED;

+ 27 - 19
src/drivers/mp_common/mp_common.c

@@ -26,6 +26,9 @@
 #include <drivers/scc/driver_scc_common.h>
 #include <drivers/scc/driver_scc_source.h>
 #include <drivers/scc/driver_scc_sink.h>
+#include <drivers/mpi/driver_mpi_common.h>
+#include <drivers/mpi/driver_mpi_source.h>
+#include <drivers/mpi/driver_mpi_sink.h>
 
 #include <common/list.h>
 
@@ -225,22 +228,26 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
     /*
 		node->nb_mp_sinks = 
 		node->devid = 
+    */
+        node->mp_connection.mpi_remote_nodeid = peer_id+1;
 
         node->init = _starpu_mpi_source_init;
         node->launch_workers = NULL;
         node->deinit = _starpu_mpi_source_deinit;
-        node->report_error = 
+   /*     node->report_error = 
 
-		node->mp_recv_is_ready = 
-		node->mp_send = 
-		node->mp_recv = 
-		node->dt_send = 
-		node->dt_recv = 
+	*/	node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
+		node->mp_send = _starpu_mpi_common_send;
+		node->mp_recv = _starpu_mpi_common_recv;
+		node->dt_send = _starpu_mpi_common_send;
+		node->dt_recv = _starpu_mpi_common_recv;
+        node->dt_send_to_device = _starpu_mpi_common_send_to_device;
+        node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
 
-		node->get_kernel_from_job = 
+/*		node->get_kernel_from_job = 
 		node->lookup = 
-		node->bind_thread = 
-		node->execute = 
+*/		node->bind_thread = NULL;
+/*		node->execute = 
 		node->allocate = 
 		node->free = 
 
@@ -253,22 +260,24 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
     /*
 		node->nb_mp_sinks = 
 		node->devid = 
+    */
+        node->mp_connection.mpi_remote_nodeid = _starpu_mpi_common_get_src_node();
 
         node->init = _starpu_mpi_sink_init;
         node->launch_workers = _starpu_mpi_sink_launch_workers;
         node->deinit = _starpu_mpi_sink_deinit;
-        node->report_error = 
+    /*    node->report_error = 
 
-		node->mp_recv_is_ready = ;
-		node->mp_send = 
-		node->mp_recv = 
-		node->dt_send = 
-		node->dt_recv = 
+	*/	node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
+        node->mp_send = _starpu_mpi_common_send;
+		node->mp_recv = _starpu_mpi_common_recv;
+		node->dt_send = _starpu_mpi_common_send;
+		node->dt_recv = _starpu_mpi_common_recv;
 
-		node->get_kernel_from_job = 
+	/*	node->get_kernel_from_job = 
 		node->lookup = 
-		node->bind_thread = 
-		node->execute = 
+*/		node->bind_thread = _starpu_mpi_sink_bind_thread;
+/*		node->execute = 
 		node->allocate = 
 		node->free = 
 
@@ -306,7 +315,6 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		}
 		mp_barrier_list_init(&node->barrier_list);
 		STARPU_PTHREAD_MUTEX_INIT(&node->barrier_mutex,NULL);
-
 		STARPU_PTHREAD_BARRIER_INIT(&node->init_completed_barrier, NULL, node->nb_cores+1);
 
 		node->launch_workers(node);

+ 1 - 1
src/drivers/mp_common/mp_common.h

@@ -89,7 +89,7 @@ union _starpu_mp_connection
 	int scc_nodeid;
 #endif
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
-	int mpi_nodeid;
+	int mpi_remote_nodeid;
 #endif
 };
 

+ 64 - 1
src/drivers/mpi/driver_mpi_common.c

@@ -24,6 +24,9 @@
 static int mpi_initialized;
 static int src_node_id;
 
+#define STARPU_MPI_MS_MSG_TAG 42
+
+
 static void _starpu_mpi_set_src_node_id()
 {
 	int node_id = starpu_get_env_number("STARPU_MPI_MASTER_NODE");
@@ -65,12 +68,13 @@ int _starpu_mpi_common_mp_init()
     return 1;
 }
 
-void _starpu_mpi_src_mp_deinit()
+void _starpu_mpi_common_mp_deinit()
 {
     MPI_Finalize();    
     mpi_initialized = 0;
 }
 
+
 int _starpu_mpi_common_is_src_node()
 {   
     int id_proc;
@@ -78,8 +82,67 @@ int _starpu_mpi_common_is_src_node()
     return id_proc == src_node_id;
 } 
 
+int _starpu_mpi_common_get_src_node()
+{
+    return src_node_id;
+}
+
 int _starpu_mpi_common_is_mp_initialized()
 {
 	return mpi_initialized;
 }
 
+/* common parts to initialize a source or a sink node */
+void _starpu_mpi_common_mp_initialize_src_sink(struct _starpu_mp_node *node)
+{
+    /* TODO : use hwloc */
+    node->nb_cores = 4;
+
+    /* TODO next */
+
+}
+
+int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
+{
+    int res;
+    int flag = 0;
+
+    res = MPI_Iprobe(MPI_ANY_SOURCE, STARPU_MPI_MS_MSG_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
+    STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot test if we received a message !");
+
+    return flag;
+}
+
+/* SEND to source node */
+void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len)
+{
+    printf("envoi %d B to %d \n", len, node->mp_connection.mpi_remote_nodeid);
+    int res;
+    res = MPI_Send(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, STARPU_MPI_MS_MSG_TAG, MPI_COMM_WORLD);
+    STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
+}
+
+/* RECV to source node */
+void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len)
+{
+    printf("recv %d B from %d \n", len, node->mp_connection.mpi_remote_nodeid);
+    int res;
+    res = MPI_Recv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, STARPU_MPI_MS_MSG_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+    STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
+}
+
+/* SEND to any node */
+void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len)
+{   
+    int res;
+    res = MPI_Send(msg, len, MPI_BYTE, dst_devid, STARPU_MPI_MS_MSG_TAG, MPI_COMM_WORLD);
+    STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
+}
+
+/* RECV to any node */
+void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len)
+{
+    int res;
+    res = MPI_Recv(msg, len, MPI_BYTE, src_devid, STARPU_MPI_MS_MSG_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+    STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
+}

+ 13 - 1
src/drivers/mpi/driver_mpi_common.h

@@ -24,9 +24,21 @@
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
 
 int _starpu_mpi_common_mp_init();
-void _starpu_mpi_src_mp_deinit();
+void _starpu_mpi_common_mp_deinit();
+
 int _starpu_mpi_common_is_src_node();
+int _starpu_mpi_common_get_src_node();
+
 int _starpu_mpi_common_is_mp_initialized();
+int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node);
+
+void _starpu_mpi_common_mp_initialize_src_sink(struct _starpu_mp_node *node);
+
+void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len);
+
+void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len);
+void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len);
 
 
 #endif  /* STARPU_USE_MPI_MASTER_SLAVE */

+ 38 - 0
src/drivers/mpi/driver_mpi_sink.c

@@ -18,22 +18,60 @@
 
 #include <mpi.h>
 #include "driver_mpi_sink.h"
+#include "driver_mpi_common.h"
 
 void _starpu_mpi_sink_init(struct _starpu_mp_node *node)
 {
+    _starpu_mpi_common_mp_initialize_src_sink(node);
+
+    node->thread_table = malloc(sizeof(starpu_pthread_t)*node->nb_cores);
     //TODO
 }
 
 void _starpu_mpi_sink_deinit(struct _starpu_mp_node *node)
 {
+    free(node->thread_table);
     //TODO
 }
 
 void _starpu_mpi_sink_launch_workers(struct _starpu_mp_node *node)
 {
     //TODO
+    int i, ret;
+    struct arg_sink_thread * arg;
+    cpu_set_t cpuset;
+    starpu_pthread_attr_t attr;
+    starpu_pthread_t thread;
+
+    for(i=0; i < node->nb_cores; i++)
+    {
+        //init the set
+        CPU_ZERO(&cpuset);
+        CPU_SET(i,&cpuset);
+
+        ret = starpu_pthread_attr_init(&attr);
+        STARPU_ASSERT(ret == 0);
+        ret = pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);
+        STARPU_ASSERT(ret == 0);
+
+        /*prepare the argument for the thread*/
+        arg= malloc(sizeof(struct arg_sink_thread));
+        arg->coreid = i;
+        arg->node = node;
+
+        ret = starpu_pthread_create(&thread, &attr, _starpu_sink_thread, arg);
+        STARPU_ASSERT(ret == 0);
+        ((starpu_pthread_t *)node->thread_table)[i] = thread;
+
+    }
 }
 
+void _starpu_mpi_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, int coreid, int * core_table, int nb_core)
+{
+    //TODO
+}
+
+
 //void _starpu_mpi_sink_send(const struct _starpu_mp_node *sink, void *msg,
 //			   int len)
 //{

+ 5 - 0
src/drivers/mpi/driver_mpi_sink.h

@@ -22,6 +22,11 @@
 
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
 
+void _starpu_mpi_sink_init(struct _starpu_mp_node *node);
+void _starpu_mpi_sink_deinit(struct _starpu_mp_node *node);
+void _starpu_mpi_sink_launch_workers(struct _starpu_mp_node *node);
+void _starpu_mpi_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, int coreid, int * core_table, int nb_core);
+
 ///* Send *MSG which can be an answer or data, to MPI SOURCE. */
 //extern void _starpu_mpi_sink_send(const struct _starpu_mp_node *source,
 //				  void *msg, int len);

+ 2 - 15
src/drivers/mpi/driver_mpi_source.c

@@ -29,28 +29,15 @@
 
 struct _starpu_mp_node *mpi_ms_nodes[STARPU_MAXMPIDEVS];
 
-//static void _starpu_mpi_src_init_context(int devid)
-//{
-//    mpi_mp_nodes[devid] = _starpu_mp_common_node_create(STARPU_MPI_SOURCE, devid);
-//}
-
-static void _starpu_mpi_src_deinit_context(int devid)
-{
-    _starpu_mp_common_send_command(mpi_ms_nodes[devid], STARPU_EXIT, NULL, 0);
-
-    _starpu_mp_common_node_destroy(mpi_ms_nodes[devid]);
-}
-
-
-
 void _starpu_mpi_source_init(struct _starpu_mp_node *node)
 {
+    _starpu_mpi_common_mp_initialize_src_sink(node);
     //TODO
 }
 
 void _starpu_mpi_source_deinit(struct _starpu_mp_node *node)
 {
-    //TODO
+
 }
 
 unsigned _starpu_mpi_src_get_device_count()

+ 3 - 0
src/drivers/mpi/driver_mpi_source.h

@@ -30,6 +30,9 @@ unsigned _starpu_mpi_src_get_device_count();
 void *_starpu_mpi_src_worker(void *arg);
 void _starpu_mpi_exit_useless_node(int devid);
 
+void _starpu_mpi_source_init(struct _starpu_mp_node *node);
+void _starpu_mpi_source_deinit(struct _starpu_mp_node *node);
+
 ///* Send *MSG which can be a command or data, to a MPI sink. */
 //extern void _starpu_mpi_source_send(const struct _starpu_mp_node *node,
 //				    void *msg, int len);