Browse Source

solving MPI_Test issue for simgrid

Luka Stanisic 8 years ago
parent
commit
019ea63df4
6 changed files with 206 additions and 8 deletions
  1. 121 4
      mpi/src/starpu_mpi.c
  2. 25 2
      mpi/src/starpu_mpi_comm.c
  3. 24 0
      mpi/src/starpu_mpi_private.h
  4. 2 2
      src/common/thread.c
  5. 32 0
      src/core/simgrid.c
  6. 2 0
      src/core/simgrid.h

+ 121 - 4
mpi/src/starpu_mpi.c

@@ -76,6 +76,10 @@ static int running = 0;
 #ifdef STARPU_SIMGRID
 static int _mpi_world_size;
 static int _mpi_world_rank;
+
+static int wait_counter;
+static starpu_pthread_cond_t wait_counter_cond;
+static starpu_pthread_mutex_t wait_counter_mutex;
 #endif
 int _starpu_mpi_fake_world_size = -1;
 int _starpu_mpi_fake_world_rank = -1;
@@ -143,6 +147,12 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	(*req)->early_data_handle = NULL;
 	(*req)->envelope = NULL;
 	(*req)->sequential_consistency = 1;
+
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_init(&((*req)->queue));
+	starpu_pthread_queue_register(&wait, &((*req)->queue));
+	(*req)->done = 0;
+#endif
 }
 
 static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
@@ -153,6 +163,10 @@ static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
 	STARPU_PTHREAD_COND_DESTROY(&req->posted_cond);
 	free(req->datatype_name);
 	req->datatype_name = NULL;
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_unregister(&wait, &req->queue);
+	starpu_pthread_queue_destroy(&req->queue);
+#endif
 	free(req);
 	req = NULL;
 }
@@ -294,6 +308,9 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 
 	newer_requests = 1;
 	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+#ifdef STARPU_SIMGRID	
+	starpu_pthread_queue_signal(&dontsleep);
+#endif	
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	_STARPU_MPI_LOG_OUT();
 }
@@ -350,6 +367,55 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	return req;
  }
 
+#ifdef STARPU_SIMGRID
+int _starpu_mpi_simgrid_mpi_test(int *done, int *flag)
+{
+	*flag = 0;
+	if (*done)
+	{
+		starpu_pthread_queue_signal(&dontsleep);
+		*flag = 1;
+	}	
+	return MPI_SUCCESS;
+}	
+static void* _starpu_mpi_simgrid_wait_req_func(void* arg)
+{
+	struct _starpu_simgrid_mpi_req *sim_req = arg;
+	int ret;
+	STARPU_PTHREAD_MUTEX_LOCK(&wait_counter_mutex);
+	wait_counter++;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&wait_counter_mutex);
+
+	ret = MPI_Wait(sim_req->request, sim_req->status);
+	
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
+
+	*(sim_req->done) = 1;
+	starpu_pthread_queue_signal(sim_req->queue);
+
+	free(sim_req);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&wait_counter_mutex);
+	if (--wait_counter == 0)
+		STARPU_PTHREAD_COND_SIGNAL(&wait_counter_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&wait_counter_mutex);
+	
+	return NULL;
+}
+void _starpu_mpi_simgrid_wait_req(MPI_Request *request, MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done)
+{
+	struct _starpu_simgrid_mpi_req *sim_req;
+	_STARPU_MPI_CALLOC(sim_req, 1, sizeof(struct _starpu_simgrid_mpi_req));
+	sim_req->request = request;
+	sim_req->status = status;
+	sim_req->queue = queue;
+	sim_req->done = done;
+	*done = 0;
+
+	_starpu_simgrid_xbt_thread_create("wait for mpi transfer", _starpu_mpi_simgrid_wait_req_func, sim_req);    
+}
+#endif
+
  /********************************************************/
  /*                                                      */
  /*  Send functionalities                                */
@@ -378,7 +444,11 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 		req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->data_request);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 	}
-
+	
+#ifdef STARPU_SIMGRID
+	_starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
+#endif
+	
 	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, 0);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
@@ -571,6 +641,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	{
 		_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
 		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
+#ifdef STARPU_SIMGRID
+		_starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
+#endif
 	}
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 
@@ -682,6 +755,10 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
 	if (req->data_request != MPI_REQUEST_NULL)
 	{
+		// TODO: Fix for STARPU_SIMGRID
+#ifdef STARPU_SIMGRID
+		STARPU_MPI_ASSERT_MSG(0, "Implement this in STARPU_SIMGRID");
+#endif
 		req->ret = MPI_Wait(&req->data_request, waiting_req->status);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 	}
@@ -755,7 +832,13 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 
 	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
 
+#ifdef STARPU_SIMGRID
+	req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, testing_req->flag);
+	memcpy(testing_req->status, &req->status_store, sizeof(*testing_req->status));
+#else			
 	req->ret = MPI_Test(&req->data_request, testing_req->flag, testing_req->status);
+#endif
+	
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 
 	_STARPU_MPI_TRACE_UTESTING_END(req->node_tag.rank, req->node_tag.data_tag);
@@ -1095,7 +1178,6 @@ static void _starpu_mpi_test_detached_requests(void)
 {
 	//_STARPU_MPI_LOG_IN();
 	int flag;
-	MPI_Status status;
 	struct _starpu_mpi_req *req;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
@@ -1106,7 +1188,11 @@ static void _starpu_mpi_test_detached_requests(void)
 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
 		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
-		req->ret = MPI_Test(&req->data_request, &flag, &status);
+#ifdef STARPU_SIMGRID
+		req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
+#else			
+		req->ret = MPI_Test(&req->data_request, &flag, MPI_STATUS_IGNORE);
+#endif		
 
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 
@@ -1354,6 +1440,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
+#ifdef STARPU_SIMGRID
+	starpu_pthread_wait_init(&wait);
+	starpu_pthread_queue_init(&dontsleep);
+	starpu_pthread_queue_register(&wait, &dontsleep);
+#endif
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
@@ -1361,6 +1452,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
 	{
+#ifdef STARPU_SIMGRID
+		starpu_pthread_wait_reset(&wait);
+#endif
 		/* shall we block ? */
 		unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0;
 
@@ -1522,7 +1616,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		}
 #ifdef STARPU_SIMGRID
 		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		MSG_process_sleep(0.000010);
+		starpu_pthread_wait_wait(&wait);
 		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 #endif
 	}
@@ -1532,7 +1626,22 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		_starpu_mpi_comm_cancel_recv();
 		envelope_request_submitted = 0;
 	}
+	
 
+#ifdef STARPU_SIMGRID
+	STARPU_PTHREAD_MUTEX_LOCK(&wait_counter_mutex);
+	while (wait_counter != 0)
+		STARPU_PTHREAD_COND_WAIT(&wait_counter_cond, &wait_counter_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&wait_counter_mutex);
+
+	STARPU_PTHREAD_MUTEX_DESTROY(&wait_counter_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&wait_counter_cond);
+	
+	starpu_pthread_queue_unregister(&wait, &dontsleep);
+	starpu_pthread_queue_destroy(&dontsleep);
+	starpu_pthread_wait_destroy(&wait);
+#endif
+	
 	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
 	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
 	STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
@@ -1628,6 +1737,11 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm
 	STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
 	_starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
 
+#ifdef STARPU_SIMGRID
+	STARPU_PTHREAD_MUTEX_INIT(&wait_counter_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&wait_counter_cond, NULL);
+#endif
+ 
 #ifdef STARPU_MPI_ACTIVITY
 	hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
 	STARPU_MPI_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
@@ -1721,6 +1835,9 @@ int starpu_mpi_shutdown(void)
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	running = 0;
 	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_signal(&dontsleep);	
+#endif		
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 #ifndef STARPU_SIMGRID

+ 25 - 2
mpi/src/starpu_mpi_comm.c

@@ -28,6 +28,12 @@ struct _starpu_mpi_comm
 	struct _starpu_mpi_envelope *envelope;
 	MPI_Request request;
 	int posted;
+
+#ifdef STARPU_SIMGRID
+	MPI_Status status;
+	starpu_pthread_queue_t queue;
+	unsigned done;
+#endif
 };
 struct _starpu_mpi_comm_hashtable
 {
@@ -62,6 +68,10 @@ void _starpu_mpi_comm_free()
 	{
 		struct _starpu_mpi_comm *_comm = _starpu_mpi_comms[i]; // get the ith _comm;
 		free(_comm->envelope);
+#ifdef STARPU_SIMGRID
+		starpu_pthread_queue_unregister(&wait, &_comm->queue);
+		starpu_pthread_queue_destroy(&_comm->queue);
+#endif
 		free(_comm);
 	}
 	free(_starpu_mpi_comms);
@@ -106,6 +116,12 @@ void _starpu_mpi_comm_register(MPI_Comm comm)
 		_STARPU_MPI_MALLOC(entry, sizeof(*entry));
 		entry->comm = comm;
 		HASH_ADD(hh, _starpu_mpi_comms_cache, comm, sizeof(entry->comm), entry);
+
+#ifdef STARPU_SIMGRID
+		starpu_pthread_queue_init(&_comm->queue);
+		starpu_pthread_queue_register(&wait, &_comm->queue);
+		_comm->done = 0;
+#endif	
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
 }
@@ -123,6 +139,9 @@ void _starpu_mpi_comm_post_recv()
 			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop on comm %d %d\n", i, _comm->comm);
 			_STARPU_MPI_COMM_FROM_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE, _comm->comm);
 			MPI_Irecv(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _comm->comm, &_comm->request);
+#ifdef STARPU_SIMGRID
+			_starpu_mpi_simgrid_wait_req(&_comm->request, &_comm->status, &_comm->queue, &_comm->done);
+#endif
 			_comm->posted = 1;
 		}
 	}
@@ -143,9 +162,11 @@ int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope *
 			int flag, res;
 			/* test whether an envelope has arrived. */
 #ifdef STARPU_SIMGRID
-			MSG_process_sleep(0.000001);
-#endif
+			res = _starpu_mpi_simgrid_mpi_test(&_comm->done, &flag);
+			memcpy(status, &_comm->status, sizeof(*status));
+#else
 			res = MPI_Test(&_comm->request, &flag, status);
+#endif
 			STARPU_ASSERT(res == MPI_SUCCESS);
 			if (flag)
 			{
@@ -184,7 +205,9 @@ void _starpu_mpi_comm_cancel_recv()
 		{
 			MPI_Status status;
 			MPI_Cancel(&_comm->request);
+#ifndef STARPU_SIMGRID
 			MPI_Wait(&_comm->request, &status);
+#endif			
 			_comm->posted = 0;
 		}
 	}

+ 24 - 0
mpi/src/starpu_mpi_private.h

@@ -24,11 +24,28 @@
 #include "starpu_mpi.h"
 #include "starpu_mpi_fxt.h"
 #include <common/list.h>
+#include <core/simgrid.h>
 
 #ifdef __cplusplus
 extern "C" {
 #endif
+	
+#ifdef STARPU_SIMGRID
+starpu_pthread_wait_t wait;
+starpu_pthread_queue_t dontsleep;
 
+struct _starpu_simgrid_mpi_req
+{
+	MPI_Request *request;
+	MPI_Status *status;
+	starpu_pthread_queue_t *queue;
+	unsigned *done;
+};
+
+int _starpu_mpi_simgrid_mpi_test(int *done, int *flag);
+void _starpu_mpi_simgrid_wait_req(MPI_Request *request, 	MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done);
+#endif
+	
 extern int _starpu_debug_rank;
 char *_starpu_mpi_get_mpi_error_code(int code);
 extern int _starpu_mpi_comm;
@@ -224,6 +241,13 @@ LIST_TYPE(_starpu_mpi_req,
 	int sequential_consistency;
 
      	UT_hash_handle hh;
+
+#ifdef STARPU_SIMGRID
+        MPI_Status status_store;
+	starpu_pthread_queue_t queue;
+	unsigned done;
+#endif
+	  
 );
 
 struct _starpu_mpi_argc_argv

+ 2 - 2
src/common/thread.c

@@ -215,7 +215,7 @@ int starpu_pthread_setspecific(starpu_pthread_key_t key, const void *pointer)
 {
 	void **array;
 #ifdef STARPU_SIMGRID_HAVE_SIMIX_PROCESS_GET_CODE
-	if (SIMIX_process_get_code() == _starpu_mpi_simgrid_init)
+	if ((SIMIX_process_get_code() == _starpu_mpi_simgrid_init) || (!strcmp(SIMIX_process_self_get_name(),"wait for mpi transfer")))
 		/* Special-case the SMPI process */
 		array = smpi_process_get_user_data();
 	else
@@ -229,7 +229,7 @@ void* starpu_pthread_getspecific(starpu_pthread_key_t key)
 {
 	void **array;
 #ifdef STARPU_SIMGRID_HAVE_SIMIX_PROCESS_GET_CODE
-	if (SIMIX_process_get_code() == _starpu_mpi_simgrid_init)
+	if ((SIMIX_process_get_code() == _starpu_mpi_simgrid_init) || (!strcmp(SIMIX_process_self_get_name(),"wait for mpi transfer")))
 		/* Special-case the SMPI process */
 		array = smpi_process_get_user_data();
 	else

+ 32 - 0
src/core/simgrid.c

@@ -31,6 +31,7 @@
 
 #ifdef STARPU_SIMGRID
 #include <sys/resource.h>
+#include <simgrid/simix.h>
 
 #pragma weak starpu_main
 extern int starpu_main(int argc, char *argv[]);
@@ -803,4 +804,35 @@ void _starpu_simgrid_count_ngpus(void)
 		}
 #endif
 }
+
+typedef struct{
+  void_f_pvoid_t code;
+  void *userparam;
+  void *father_data;
+} thread_data_t;
+
+static int _starpu_simgrid_xbt_thread_create_wrapper(int argc, char *argv[])
+{
+  smx_process_t self = SIMIX_process_self();
+  thread_data_t *t = SIMIX_process_self_get_data(self);
+  simcall_process_set_data(self, t->father_data);
+  t->code(t->userparam);
+  simcall_process_set_data(self, NULL);
+  free(t);
+  
+  return 0;
+}
+
+void _starpu_simgrid_xbt_thread_create(const char *name, void_f_pvoid_t code, void *param)
+{
+  thread_data_t *res = malloc(sizeof(thread_data_t));
+  res->userparam = param;
+  res->code = code;
+  res->father_data = SIMIX_process_self_get_data(SIMIX_process_self());
+
+  simcall_process_create(name,
+                           _starpu_simgrid_xbt_thread_create_wrapper, res,
+                           SIMIX_host_self_get_name(), -1.0, 0, NULL,
+                           /*props */ NULL,0);
+}
 #endif

+ 2 - 0
src/core/simgrid.h

@@ -68,6 +68,8 @@ starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
  * bus */
 void _starpu_simgrid_count_ngpus(void);
 
+void _starpu_simgrid_xbt_thread_create(const char *name, void_f_pvoid_t code,
+				       void *param);
 #endif
 
 #endif // __SIMGRID_H__