Преглед на файлове

Added nmad direct send and callback receive.
The user-defined callbacks are run by the previous communication thread.
Compiles, but fails at execution.

Guillaume Beauchamp преди 8 години
родител
ревизия
4c4c87b6f0
променени са 7 файла, в които са добавени 409 реда и са изтрити 385 реда
  1. 2 0
      configure.ac
  2. 41 10
      include/starpu_thread.h
  3. 0 1
      nmad/examples/matrix_decomposition/mpi_cholesky.c
  4. 4 4
      nmad/src/Makefile.am
  5. 254 364
      nmad/src/starpu_mpi.c
  6. 19 5
      nmad/src/starpu_mpi_private.h
  7. 89 1
      src/common/thread.c

+ 2 - 0
configure.ac

@@ -411,6 +411,7 @@ AC_SUBST(CC_OR_MPICC, $cc_or_mpicc)
 if test x$use_mpi = xyes -a x$enable_nmad = xyes ; then
     build_nmad_lib=yes
     enable_mpi=no
+    PKG_CHECK_MODULES([NMAD],[nmad pioman tbx])
 else
     build_nmad_lib=no
 fi
@@ -422,6 +423,7 @@ AC_SUBST(USE_NMAD, $build_nmad_lib)
 AM_CONDITIONAL(USE_NMAD, test x$build_nmad_lib = xyes)
 
 
+
 ###############################################################################
 #                                                                             #
 #                                LIBTOOLS                                     #

+ 41 - 10
include/starpu_thread.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2016  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014  CNRS
+ * Copyright (C) 2010, 2012-2017  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2017  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -33,6 +33,7 @@
 #endif
 #elif !defined(_MSC_VER) || defined(BUILDING_STARPU)
 #include <pthread.h>
+#include <semaphore.h>
 #endif
 #include <stdint.h>
 
@@ -50,8 +51,9 @@ extern "C"
 typedef msg_process_t starpu_pthread_t;
 typedef int starpu_pthread_attr_t;
 
+int starpu_pthread_equal(starpu_pthread_t t1, starpu_pthread_t t2);
+starpu_pthread_t starpu_pthread_self(void);
 int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_pthread_attr_t *attr, void *(*start_routine) (void *), void *arg, msg_host_t host);
-#define starpu_pthread_setname(name)
 int starpu_pthread_create(starpu_pthread_t *thread, const starpu_pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
 int starpu_pthread_join(starpu_pthread_t thread, void **retval);
 int starpu_pthread_exit(void *retval) STARPU_ATTRIBUTE_NORETURN;
@@ -64,8 +66,18 @@ int starpu_pthread_attr_setdetachstate(starpu_pthread_attr_t *attr, int detachst
 typedef pthread_t starpu_pthread_t;
 typedef pthread_attr_t starpu_pthread_attr_t;
 
+#define starpu_pthread_equal pthread_equal
+#define starpu_pthread_self pthread_self
 #define starpu_pthread_create pthread_create
 #define starpu_pthread_create_on(name, thread, attr, routine, arg, where) starpu_pthread_create(thread, attr, routine, arg)
+#define starpu_pthread_join pthread_join
+#define starpu_pthread_exit pthread_exit
+#define starpu_pthread_attr_init pthread_attr_init
+#define starpu_pthread_attr_destroy pthread_attr_destroy
+#define starpu_pthread_attr_setdetachstate pthread_attr_setdetachstate
+
+#endif /* STARPU_SIMGRID, _MSC_VER */
+
 #ifdef STARPU_HAVE_PTHREAD_SETNAME_NP
 #ifdef STARPU_HAVE_DARWIN
 #define starpu_pthread_setname(name) pthread_setname_np(name)
@@ -75,13 +87,6 @@ typedef pthread_attr_t starpu_pthread_attr_t;
 #else
 #define starpu_pthread_setname(name)
 #endif
-#define starpu_pthread_join pthread_join
-#define starpu_pthread_exit pthread_exit
-#define starpu_pthread_attr_init pthread_attr_init
-#define starpu_pthread_attr_destroy pthread_attr_destroy
-#define starpu_pthread_attr_setdetachstate pthread_attr_setdetachstate
-
-#endif /* STARPU_SIMGRID, _MSC_VER */
 
 /*
  * Encapsulation of the pthread_mutex_* functions.
@@ -403,6 +408,32 @@ int starpu_pthread_wait_wait(starpu_pthread_wait_t *w);
 int starpu_pthread_wait_destroy(starpu_pthread_wait_t *w);
 #endif
 
+/*
+ * Encapsulation of the semaphore functions.
+ */
+
+#ifdef STARPU_SIMGRID
+
+typedef msg_sem_t starpu_sem_t;
+int starpu_sem_destroy(starpu_sem_t *);
+int starpu_sem_getvalue(starpu_sem_t *, int *);
+int starpu_sem_init(starpu_sem_t *, int, unsigned);
+int starpu_sem_post(starpu_sem_t *);
+int starpu_sem_trywait(starpu_sem_t *);
+int starpu_sem_wait(starpu_sem_t *);
+
+#elif !defined(_MSC_VER) || defined(BUILDING_STARPU) /* !STARPU_SIMGRID */
+
+typedef sem_t starpu_sem_t;
+#define starpu_sem_destroy sem_destroy
+#define starpu_sem_getvalue sem_getvalue
+#define starpu_sem_init sem_init
+#define starpu_sem_post sem_post
+int starpu_sem_trywait(starpu_sem_t *);
+int starpu_sem_wait(starpu_sem_t *);
+
+#endif
+
 #ifdef __cplusplus
 }
 #endif

+ 0 - 1
nmad/examples/matrix_decomposition/mpi_cholesky.c

@@ -33,7 +33,6 @@ int main(int argc, char **argv)
 	int rank, nodes, ret;
 	double timing, flops;
 	int correctness;
-
 	ret = starpu_init(NULL);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 

+ 4 - 4
nmad/src/Makefile.am

@@ -21,17 +21,17 @@ BUILT_SOURCES =
 
 CLEANFILES = *.gcno *.gcda *.linkinfo
 
-AM_CFLAGS = -Wall $(STARPU_CUDA_CPPFLAGS) $(STARPU_OPENCL_CPPFLAGS) $(FXT_CFLAGS) $(MAGMA_CFLAGS) $(HWLOC_CFLAGS) $(GLOBAL_AM_CFLAGS)
-LIBS = $(top_builddir)/src/@LIBSTARPU_LINK@ @LIBS@ $(FXT_LIBS) $(MAGMA_LIBS)
+AM_CFLAGS = -Wall $(STARPU_CUDA_CPPFLAGS) $(STARPU_OPENCL_CPPFLAGS) $(FXT_CFLAGS) $(MAGMA_CFLAGS) $(HWLOC_CFLAGS) $(GLOBAL_AM_CFLAGS) $(NMAD_CFLAGS)
+LIBS = $(top_builddir)/src/@LIBSTARPU_LINK@ @LIBS@ $(FXT_LIBS) $(MAGMA_LIBS) $(NMAD_LIBS)
 AM_CPPFLAGS = -I$(top_srcdir)/include/ -I$(top_srcdir)/src/ -I$(top_builddir)/src -I$(top_builddir)/include -I$(top_srcdir)/mpi/include -I$(top_srcdir)/mpi/src
-AM_LDFLAGS = $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS)
+AM_LDFLAGS = $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(NMAD_LDFLAGS)
 
 lib_LTLIBRARIES = libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_LIBADD = $(top_builddir)/src/libstarpu-@STARPU_EFFECTIVE_VERSION@.la
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_LDFLAGS = $(ldflags) -no-undefined					\
   -version-info $(LIBSTARPUMPI_INTERFACE_CURRENT):$(LIBSTARPUMPI_INTERFACE_REVISION):$(LIBSTARPUMPI_INTERFACE_AGE) \
-  $(MPICC_LDFLAGS) $(FXT_LDFLAGS)
+  $(MPICC_LDFLAGS) $(FXT_LDFLAGS) $(NMAD_LDFLAGS)
 noinst_HEADERS =					\
 	starpu_mpi_private.h				\
 	starpu_mpi_fxt.h				\

+ 254 - 364
nmad/src/starpu_mpi.c

@@ -27,9 +27,13 @@
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/coherency.h>
+#include <nm_sendrecv_interface.h>
+
+#define nm_mpi_communicator_get(c) NULL 
+#define nm_mpi_datatype_get(c) NULL
 
 static void _starpu_mpi_submit_new_mpi_request(void *arg);
-static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
+static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
 #ifdef STARPU_VERBOSE
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
 #endif
@@ -39,6 +43,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 							int source, int mpi_tag, MPI_Comm comm,
 							unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency);
+static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
+
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
 
 /* The list of requests that have been newly submitted by the application */
@@ -51,18 +57,26 @@ static starpu_pthread_mutex_t detached_requests_mutex;
 /* Condition to wake up progression thread */
 static starpu_pthread_cond_t cond_progression;
 /* Condition to wake up waiting for all current MPI requests to finish */
-static starpu_pthread_cond_t cond_finished;
+static starpu_pthread_cond_t cond_finished;//FIXME no longer working nor usefull.
 static starpu_pthread_mutex_t mutex;
 static starpu_pthread_t progress_thread;
-static int running = 0;
+static volatile int running = 0;
 
 /* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
 static starpu_pthread_mutex_t mutex_posted_requests;
 static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
+//TODO remove (we no longer need to count them.)
+#define REQ_FINALIZED 0x1
+
 
 
+PUK_LFSTACK_TYPE(callback,	void *callback_arg; void (*callback)(void *););
+static callback_lfstack_t callback_stack = NULL;
+
+static starpu_sem_t callback_sem;
+
 /********************************************************/
 /*                                                      */
 /*  Send/Receive functionalities                        */
@@ -82,12 +96,12 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	STARPU_ASSERT_MSG(req, "Invalid request");
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
+	nm_mpi_communicator_t*p_comm = nm_mpi_communicator_get(comm);
 
 	/* Initialize the request structure */
-	req->submitted = 0;
 	req->completed = 0;
 	STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
+	piom_cond_init(&req->req_cond, 0);
 
 	req->request_type = request_type;
 	req->user_datatype = -1;
@@ -96,6 +110,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	req->srcdst = srcdst;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
+	req->session = nm_mpi_communicator_get_session(p_comm);
+	req->gate = nm_mpi_communicator_get_gate(p_comm,req->srcdst);
 
 	req->detached = detached;
 	req->sync = sync;
@@ -107,7 +123,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
 	 * the request is actually submitted */
-	starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req,sequential_consistency);
+	starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_handle_new_request, (void *)req,sequential_consistency);
 
 	_STARPU_MPI_LOG_OUT();
 	return req;
@@ -129,25 +145,25 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
+	struct nm_data_s data;
+	nm_mpi_data_build(&data, (void*)req->ptr,  nm_mpi_datatype_get(req->datatype), req->count);
+	nm_sr_send_init(req->session, &(req->request));
+	nm_sr_send_pack_data(req->session, &(req->request), &data);
+
 	if (req->sync == 0)
 	{
-		req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
-		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
+		req->ret = nm_sr_send_isend(req->session, &(req->request), req->gate, req->mpi_tag);
+
+		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Isend returning %d", req->ret);
 	}
 	else
 	{
-		req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
-		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %d", req->ret);
+		req->ret = nm_sr_send_issend(req->session, &(req->request), req->gate, req->mpi_tag);
+		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Issend returning %d", req->ret);
 	}
 
 	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, starpu_data_get_size(req->data_handle));
 
-	/* somebody is perhaps waiting for the MPI request to be posted */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	req->submitted = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
-
 	_starpu_mpi_handle_detached_request(req);
 
 	_STARPU_MPI_LOG_OUT();
@@ -158,6 +174,7 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
 	if (req->user_datatype == 0)
 	{
+		req->waited = 1;
 		req->count = 1;
 		req->ptr = starpu_data_get_local_ptr(req->data_handle);
 	}
@@ -165,6 +182,7 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	{
 		starpu_ssize_t psize = -1;
 		int ret;
+		req->waited =2;
 
 		// Do not pack the data, just try to find out the size
 		starpu_data_pack(req->data_handle, NULL, &psize);
@@ -172,10 +190,13 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		if (psize != -1)
 		{
 			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
-			req->count = psize;
-			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
-			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);			req->count = psize;
+			//ret = nm_sr_isend(nm_mpi_communicator_get_session(p_req->p_comm),nm_mpi_communicator_get_gate(p_comm,req->srcdst), req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
+			ret = nm_sr_isend(req->session,req->gate, req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
+
+
+		//	ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
 		}
 
 		// Pack the data
@@ -184,8 +205,8 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		{
 			// We know the size now, let's send it
 			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->count, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
-			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
-			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
+			ret = nm_sr_isend(req->session,req->gate, req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
+			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
 		}
 		else
 		{
@@ -284,16 +305,17 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
 
-	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+	//req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+	struct nm_data_s data;
+	nm_mpi_data_build(&data, (void*)req->ptr,  nm_mpi_datatype_get(req->datatype), req->count);
+	nm_sr_recv_init(req->session, &(req->request));
+	nm_sr_recv_unpack_data(req->session, &(req->request), &data);
+	req->ret = nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->mpi_tag,0);
+
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 
 	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
 
-	/* somebody is perhaps waiting for the MPI request to be posted */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	req->submitted = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
 
 	_starpu_mpi_handle_detached_request(req);
 
@@ -404,65 +426,33 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
 /*                                                      */
 /********************************************************/
 
-static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
-{
-	_STARPU_MPI_LOG_IN();
-	/* Which is the mpi request we are waiting for ? */
-	struct _starpu_mpi_req *req = waiting_req->other_request;
+// static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
+// {
+// 	_STARPU_MPI_LOG_IN();
+// 	/* Which is the mpi request we are waiting for ? */
+// 	struct _starpu_mpi_req *req = waiting_req->other_request;
 
-	TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
+// 	TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
 
-	req->ret = MPI_Wait(&req->request, waiting_req->status);
-	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %d", req->ret);
+// 	//TODO req->ret = MPI_Wait(&req->request, waiting_req->status);
+// 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %d", req->ret);
 
-	TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
+// 	TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
 
-	_starpu_mpi_handle_request_termination(req);
-	_STARPU_MPI_LOG_OUT();
-}
+// 	_starpu_mpi_handle_request_termination(req);
+// 	_STARPU_MPI_LOG_OUT();
+// }
 
 int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
-	_STARPU_MPI_LOG_IN();
-	int ret;
-	struct _starpu_mpi_req *waiting_req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT_MSG(waiting_req, "Allocation failed");
 	struct _starpu_mpi_req *req = *public_req;
-
-	_STARPU_MPI_INC_POSTED_REQUESTS(1);
-
-	/* We cannot try to complete a MPI request that was not actually posted
-	 * to MPI yet. */
-	STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
-	while (!(req->submitted))
-		STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
-	STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
-
-	/* Initialize the request structure */
-	STARPU_PTHREAD_MUTEX_INIT(&(waiting_req->req_mutex), NULL);
-	STARPU_PTHREAD_COND_INIT(&(waiting_req->req_cond), NULL);
-	waiting_req->status = status;
-	waiting_req->other_request = req;
-	waiting_req->func = _starpu_mpi_wait_func;
-	waiting_req->request_type = WAIT_REQ;
-
-	_starpu_mpi_submit_new_mpi_request(waiting_req);
-
-	/* We wait for the MPI request to finish */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	_STARPU_MPI_LOG_IN();
 	while (!req->completed)
-		STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
-
-	ret = req->ret;
-
-	/* The internal request structure was automatically allocated */
-	*public_req = NULL;
-	free(req);
-
-	free(waiting_req);
+		piom_cond_wait(&(req->req_cond),REQ_FINALIZED);
+	if (status!=MPI_STATUS_IGNORE)
+		_starpu_mpi_req_status(req,status);
 	_STARPU_MPI_LOG_OUT();
-	return ret;
+	return req->ret; //FIXME May have already been freed ?
 }
 
 /********************************************************/
@@ -471,95 +461,23 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 /*                                                      */
 /********************************************************/
 
-static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
+
+int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 {
+	struct _starpu_mpi_req *req = *public_req;
 	_STARPU_MPI_LOG_IN();
-	/* Which is the mpi request we are testing for ? */
-	struct _starpu_mpi_req *req = testing_req->other_request;
-
 	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
 			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
 
-	req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
-	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
-
+	*flag = req->completed;
+	if (*flag && status!=MPI_STATUS_IGNORE)
+		_starpu_mpi_req_status(req,status);
 	TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
 
-	if (*testing_req->flag)
-	{
-		testing_req->ret = req->ret;
-		_starpu_mpi_handle_request_termination(req);
-	}
-
-	STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
-	testing_req->completed = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
-	_STARPU_MPI_LOG_OUT();
-}
-
-int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
-{
-	_STARPU_MPI_LOG_IN();
-	int ret = 0;
-
-	STARPU_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
-
-	struct _starpu_mpi_req *req = *public_req;
-
-	STARPU_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
-
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	unsigned submitted = req->submitted;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
-
-	if (submitted)
-	{
-		struct _starpu_mpi_req *testing_req = calloc(1, sizeof(struct _starpu_mpi_req));
-		STARPU_ASSERT_MSG(testing_req, "allocation failed");
-		//		memset(testing_req, 0, sizeof(struct _starpu_mpi_req));
-
-		/* Initialize the request structure */
-		STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
-		STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
-		testing_req->flag = flag;
-		testing_req->status = status;
-		testing_req->other_request = req;
-		testing_req->func = _starpu_mpi_test_func;
-		testing_req->completed = 0;
-		testing_req->request_type = TEST_REQ;
-
-		_STARPU_MPI_INC_POSTED_REQUESTS(1);
-		_starpu_mpi_submit_new_mpi_request(testing_req);
-
-		/* We wait for the test request to finish */
-		STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
-		while (!(testing_req->completed))
-			STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
-		STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
-
-		ret = testing_req->ret;
-
-		if (*(testing_req->flag))
-		{
-			/* The request was completed so we free the internal
-			 * request structure which was automatically allocated
-			 * */
-			*public_req = NULL;
-			free(req);
-		}
-
-		free(testing_req);
-	}
-	else
-	{
-		*flag = 0;
-	}
-
 	_STARPU_MPI_LOG_OUT();
-	return ret;
+	return MPI_SUCCESS;
 }
 
 /********************************************************/
@@ -568,67 +486,15 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 /*                                                      */
 /********************************************************/
 
-static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
-{
-	_STARPU_MPI_LOG_IN();
-
-	barrier_req->ret = MPI_Barrier(barrier_req->comm);
-	STARPU_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %d", barrier_req->ret);
-
-	_starpu_mpi_handle_request_termination(barrier_req);
-	_STARPU_MPI_LOG_OUT();
-}
-
 int starpu_mpi_barrier(MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	int ret;
-	struct _starpu_mpi_req *barrier_req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT_MSG(barrier_req, "allocation failed");
-
-	/* First wait for *both* all tasks and MPI requests to finish, in case
-	 * some tasks generate MPI requests, MPI requests generate tasks, etc.
-	 */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
-	barrier_running = 1;
-	do
-	{
-		while (posted_requests)
-			/* Wait for all current MPI requests to finish */
-			STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
-		/* No current request, clear flag */
-		newer_requests = 0;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		/* Now wait for all tasks */
-		starpu_task_wait_for_all();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		/* Check newer_requests again, in case some MPI requests
-		 * triggered by tasks completed and triggered tasks between
-		 * wait_for_all finished and we take the lock */
-	} while (posted_requests || newer_requests);
-	barrier_running = 0;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-
-	/* Initialize the request structure */
-	STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
-	STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
-	barrier_req->func = _starpu_mpi_barrier_func;
-	barrier_req->request_type = BARRIER_REQ;
-	barrier_req->comm = comm;
+//	STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
+	ret = MPI_Barrier(comm);
 
-	_STARPU_MPI_INC_POSTED_REQUESTS(1);
-	_starpu_mpi_submit_new_mpi_request(barrier_req);
-
-	/* We wait for the MPI request to finish */
-	STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
-	while (!barrier_req->completed)
-		STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
-
-	ret = barrier_req->ret;
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
 
-	free(barrier_req);
 	_STARPU_MPI_LOG_OUT();
 	return ret;
 }
@@ -654,9 +520,8 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 }
 #endif
 
-static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
+static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event)
 {
-	int ret;
 
 	_STARPU_MPI_LOG_IN();
 
@@ -669,11 +534,15 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		{
 			if (req->request_type == SEND_REQ)
 			{
+				req->waited--;
 				// We need to make sure the communication for sending the size
-				// has completed, as MPI can re-order messages, let's call
-				// MPI_Wait to make sure data have been sent
-				ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
-				STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %d", ret);
+				// has completed, as MPI can re-order messages, let's count
+				// received messages.
+				// FIXME concurrent access.
+				STARPU_ASSERT_MSG(event == NM_SR_EVENT_FINALIZED, "Callback with event %d", event);
+				if(req->waited>0)
+					return;
+				
 			}
 			if (req->request_type == RECV_REQ)
 				// req->ptr is freed by starpu_data_unpack
@@ -689,34 +558,38 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	}
 
 	/* Execute the specified callback, if any */
-	if (req->callback)
-		req->callback(req->callback_arg);
+	if (req->callback){
+		struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
+		c->callback = req->callback;
+		c->callback_arg = req->callback_arg;
+		callback_lfstack_push(&callback_stack, c);
+		starpu_sem_post(&callback_sem);
+	}
 
 	/* tell anyone potentially waiting on the request that it is
 	 * terminated now */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
 	req->completed = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+	piom_cond_signal(&req->req_cond, REQ_FINALIZED)
+
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_submit_new_mpi_request(void *arg)
-{
-	_STARPU_MPI_LOG_IN();
-	struct _starpu_mpi_req *req = arg;
+// static void _starpu_mpi_submit_new_mpi_request(void *arg)
+// {
+// 	_STARPU_MPI_LOG_IN();
+// 	struct _starpu_mpi_req *req = arg;
 
-	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
+// 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	_starpu_mpi_req_list_push_front(new_requests, req);
-	newer_requests = 1;
-	_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-	_STARPU_MPI_LOG_OUT();
-}
+// 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+// 	_starpu_mpi_req_list_push_front(new_requests, req);
+// 	newer_requests = 1;
+// 	_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+// 			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+// 	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+// 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+// 	_STARPU_MPI_LOG_OUT();
+// }
 
 #ifdef STARPU_MPI_ACTIVITY
 static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
@@ -735,79 +608,80 @@ static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNU
 }
 #endif /* STARPU_MPI_ACTIVITY */
 
-static void _starpu_mpi_test_detached_requests(void)
-{
-	_STARPU_MPI_LOG_IN();
-	int flag;
-	MPI_Status status;
-	struct _starpu_mpi_req *req, *next_req;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-
-	for (req = _starpu_mpi_req_list_begin(detached_requests);
-		req != _starpu_mpi_req_list_end(detached_requests);
-		req = next_req)
-	{
-		next_req = _starpu_mpi_req_list_next(req);
-
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-
-		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
-		req->ret = MPI_Test(&req->request, &flag, &status);
-		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
-
-		if (flag)
-		{
-			if (req->request_type == RECV_REQ)
-			{
-				TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				TRACE_MPI_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
-			}
-
-			_starpu_mpi_handle_request_termination(req);
-
-			if (req->request_type == RECV_REQ)
-			{
-				TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				TRACE_MPI_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
-			}
-		}
-
-		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-
-		if (flag)
-		{
-			_starpu_mpi_req_list_erase(detached_requests, req);
-			free(req);
-		}
-
-	}
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-	_STARPU_MPI_LOG_OUT();
+// static void _starpu_mpi_test_detached_requests(void)
+// {
+// 	_STARPU_MPI_LOG_IN();
+// 	int flag;
+// 	MPI_Status status;
+// 	struct _starpu_mpi_req *req, *next_req;
+
+// 	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
+
+// 	for (req = _starpu_mpi_req_list_begin(detached_requests);
+// 		req != _starpu_mpi_req_list_end(detached_requests);
+// 		req = next_req)
+// 	{
+// 		next_req = _starpu_mpi_req_list_next(req);
+
+// 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
+
+// 		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+// 		//TODO req->ret = MPI_Test(&req->request, &flag, &status);
+// 		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
+
+// 		if (flag)
+// 		{
+// 			if (req->request_type == RECV_REQ)
+// 			{
+// 				TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
+// 			}
+// 			else if (req->request_type == SEND_REQ)
+// 			{
+// 				TRACE_MPI_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
+// 			}
+
+// 			_starpu_mpi_handle_request_termination(req);
+
+// 			if (req->request_type == RECV_REQ)
+// 			{
+// 				TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
+// 			}
+// 			else if (req->request_type == SEND_REQ)
+// 			{
+// 				TRACE_MPI_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
+// 			}
+// 		}
+
+// 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
+
+// 		if (flag)
+// 		{
+// 			_starpu_mpi_req_list_erase(detached_requests, req);
+// 			free(req);
+// 		}
+
+// 	}
+
+// 	STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
+// 	_STARPU_MPI_LOG_OUT();
+// }
+
+void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const nm_sr_event_info_t*event_info, void*ref){
+	_starpu_mpi_handle_request_termination(ref,event);
 }
 
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 {
 	if (req->detached)
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		_starpu_mpi_req_list_push_front(detached_requests, req);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		nm_sr_request_set_ref(&(req->request), req);
 
-		starpu_wake_all_blocked_workers();
+		nm_sr_request_monitor(req->session, &(req->request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+		if(req->request_type == SEND_REQ && req->waited>1){
+			nm_sr_request_set_ref(&(req->size_req), req);
 
-		/* put the submitted request into the list of pending requests
-		 * so that it can be handled by the progression mechanisms */
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+		}
 	}
 }
 
@@ -857,17 +731,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 {
 	struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
 
-	if (argc_argv->initialize_mpi)
-	{
-		int thread_support;
-		_STARPU_DEBUG("Calling MPI_Init_thread\n");
-		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
-		{
-			_STARPU_ERROR("MPI_Init_thread failed\n");
-		}
-		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
-	}
-	else
 	{
 		int provided;
 		MPI_Query_thread(&provided);
@@ -885,56 +748,67 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	}
 
 
-	/* notify the main thread that the progression thread is ready */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	/* notify the main thread that the progression thread is ready */ //Why?
+//	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	running = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
+/*	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	printf("CALLBACK T START\n");
+	*/
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
-	{
-		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(new_requests);
+	{//FIXME how do i know if it there won't be any other callback?
+		printf("CALLBACK T IT\n");
 
-#ifndef STARPU_MPI_ACTIVITY
-		block = block && _starpu_mpi_req_list_empty(detached_requests);
-#endif /* STARPU_MPI_ACTIVITY */
-
-		if (block)
-		{
-			_STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
-
-			TRACE_MPI_SLEEP_BEGIN();
-
-			if (barrier_running)
-				/* Tell mpi_barrier */
-				STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
-			STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
-
-			TRACE_MPI_SLEEP_END();
-		}
-
-		/* test whether there are some terminated "detached request" */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		_starpu_mpi_test_detached_requests();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-
-		/* get one request */
-		struct _starpu_mpi_req *req;
-		while (!_starpu_mpi_req_list_empty(new_requests))
-		{
-			req = _starpu_mpi_req_list_pop_back(new_requests);
-
-			/* handling a request is likely to block for a while
-			 * (on a sync_data_with_mem call), we want to let the
-			 * application submit requests in the meantime, so we
-			 * release the lock. */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-			_starpu_mpi_handle_new_request(req);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		}
+		/* shall we block ? */
+// 		unsigned block = _starpu_mpi_req_list_empty(new_requests);
+
+// #ifndef STARPU_MPI_ACTIVITY
+// 		block = block && _starpu_mpi_req_list_empty(detached_requests);
+// #endif /* STARPU_MPI_ACTIVITY */
+		starpu_sem_wait(&callback_sem);
+
+		// if (block)
+		// {
+		// 	_STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
+
+		// 	TRACE_MPI_SLEEP_BEGIN();
+
+		// 	if (barrier_running)
+		// 		 Tell mpi_barrier 
+		// 		STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
+		// 	STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
+
+		// 	TRACE_MPI_SLEEP_END();
+		// }
+
+		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
+		STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready.");
+		c->callback(c->callback_arg);
+		free(c);
+
+		// /* test whether there are some terminated "detached request" */
+		// STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		// _starpu_mpi_test_detached_requests();
+		// STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+		// /* get one request */
+		// struct _starpu_mpi_req *req;
+		// while (!_starpu_mpi_req_list_empty(new_requests))
+		// {
+		// 	req = _starpu_mpi_req_list_pop_back(new_requests);
+
+		// 	/* handling a request is likely to block for a while
+		// 	 * (on a sync_data_with_mem call), we want to let the
+		// 	 * application submit requests in the meantime, so we
+		// 	 * release the lock. */
+		// 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		// 	_starpu_mpi_handle_new_request(req);
+		// 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		// }
 	}
+	printf("CALLBACK IT END\n");
 
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
@@ -947,7 +821,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	}
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-
+	starpu_sem_destroy(&callback_sem);
 	free(argc_argv);
 	return NULL;
 }
@@ -1009,17 +883,33 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 
 	STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
 
+	starpu_sem_init(&callback_sem, 0, 0);
+
 	struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
 	argc_argv->initialize_mpi = initialize_mpi;
 	argc_argv->argc = argc;
 	argc_argv->argv = argv;
 
+
+	if (initialize_mpi)
+	{
+		printf("Init MPI\n");
+		int thread_support;
+		_STARPU_DEBUG("Calling MPI_Init_thread\n");
+		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
+		{
+			_STARPU_ERROR("MPI_Init_thread failed\n");
+		}
+		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
+	}
+
+
 	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	while (!running)
-		STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	// STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	// while (!running)
+	//	STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
+	// STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 #ifdef STARPU_MPI_ACTIVITY
 	hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
@@ -1068,10 +958,10 @@ int starpu_mpi_shutdown(void)
 	MPI_Comm_size(MPI_COMM_WORLD, &world_size);
 
 	/* kill the progression thread */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+//	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	running = 0;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+//	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+//	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 	starpu_pthread_join(progress_thread, &value);
 
@@ -1094,7 +984,7 @@ int starpu_mpi_shutdown(void)
 
 void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
 {
-	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
+//	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
 	_starpu_mpi_cache_flush(data_handle);
 	free(data_handle->mpi_data);
 }

+ 19 - 5
nmad/src/starpu_mpi_private.h

@@ -23,6 +23,8 @@
 #include "starpu_mpi.h"
 #include "starpu_mpi_fxt.h"
 #include <common/list.h>
+#include <nm_mpi_private.h>
+#include <piom_lock.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -126,20 +128,22 @@ LIST_TYPE(_starpu_mpi_req,
 	int user_datatype;
 
 	/* who are we talking to ? */
-	int srcdst;
-	int mpi_tag;
+	nm_gate_t gate;
 	MPI_Comm comm;
+	int mpi_tag;
+	int srcdst;
+	nm_session_t session;
 
 	void (*func)(struct _starpu_mpi_req *);
 
 	MPI_Status *status;
-	MPI_Request request;
+	nm_sr_request_t request;
 	int *flag;
 	unsigned sync;
 
 	int ret;
 	starpu_pthread_mutex_t req_mutex;
-	starpu_pthread_cond_t req_cond;
+	piom_cond_t req_cond;
 
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 
@@ -156,7 +160,9 @@ LIST_TYPE(_starpu_mpi_req,
 	void (*callback)(void *);
 
         /* in the case of user-defined datatypes, we need to send the size of the data */
-	MPI_Request size_req;
+	nm_sr_request_t size_req;
+
+	int waited;
 );
 
 struct _starpu_mpi_data
@@ -166,6 +172,14 @@ struct _starpu_mpi_data
 	MPI_Comm comm;
 };
 
+/* Fill the status structure pointed to by STATUS from the request pointed to
+ * by PUBLIC_REQ: source, tag, error code and size are copied, and cancelled is
+ * always cleared. Both arguments are parenthesized so that expressions such as
+ * `&status` can be passed; each argument is evaluated several times, so it
+ * must not have side effects. */
+#define _starpu_mpi_req_status(PUBLIC_REQ,STATUS) do {\
+  (STATUS)->MPI_SOURCE=(PUBLIC_REQ)->srcdst; /**< field name mandatory by spec */\
+  (STATUS)->MPI_TAG=(PUBLIC_REQ)->mpi_tag;    /**< field name mandatory by spec */\
+  (STATUS)->MPI_ERROR=(PUBLIC_REQ)->ret;  /**< field name mandatory by spec */\
+  (STATUS)->size=(PUBLIC_REQ)->count;       /**< size of data received */\
+  (STATUS)->cancelled=0;  /**< whether request was cancelled */\
+} while(0)
+
 #ifdef __cplusplus
 }
 #endif

+ 89 - 1
src/common/thread.c

@@ -19,6 +19,9 @@
 #include <core/simgrid.h>
 #include <core/workers.h>
 
+#include <errno.h>
+#include <limits.h>
+
 #ifdef STARPU_SIMGRID
 #ifdef STARPU_HAVE_XBT_SYNCHRO_H
 #include <xbt/synchro.h>
@@ -48,6 +51,16 @@ static int _starpu_futex_wake = FUTEX_WAKE;
 
 extern int _starpu_simgrid_thread_start(int argc, char *argv[]);
 
+/* Compares two thread handles with ==; returns 1 when they are equal and 0
+ * otherwise. */
+int starpu_pthread_equal(starpu_pthread_t t1, starpu_pthread_t t2)
+{
+	return (t1 == t2) ? 1 : 0;
+}
+
+/* Returns the handle obtained from MSG_process_self(). */
+starpu_pthread_t starpu_pthread_self(void)
+{
+	starpu_pthread_t self = MSG_process_self();
+
+	return self;
+}
+
 int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_pthread_attr_t *attr STARPU_ATTRIBUTE_UNUSED, void *(*start_routine) (void *), void *arg, msg_host_t host)
 {
 	char **_args;
@@ -60,6 +73,9 @@ int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_
 	void *tsd;
 	_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
 	*thread = MSG_process_create_with_arguments(name, _starpu_simgrid_thread_start, tsd, host, 2, _args);
+#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 15)
+	MSG_process_ref(*thread);
+#endif
 	return 0;
 }
 
@@ -72,6 +88,9 @@ int starpu_pthread_join(starpu_pthread_t thread STARPU_ATTRIBUTE_UNUSED, void **
 {
 #if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
 	MSG_process_join(thread, 1000000);
+#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 15)
+	MSG_process_unref(thread);
+#endif
 #else
 	MSG_process_sleep(1);
 #endif
@@ -517,7 +536,7 @@ int starpu_pthread_queue_destroy(starpu_pthread_queue_t *q)
 #endif /* STARPU_SIMGRID */
 
 #if (defined(STARPU_SIMGRID) && !defined(STARPU_SIMGRID_HAVE_XBT_BARRIER_INIT)) || (!defined(STARPU_SIMGRID) && !defined(STARPU_HAVE_PTHREAD_BARRIER))
-int starpu_pthread_barrier_init(starpu_pthread_barrier_t *restrict barrier, const starpu_pthread_barrierattr_t *restrict attr, unsigned count)
+int starpu_pthread_barrier_init(starpu_pthread_barrier_t *restrict barrier, const starpu_pthread_barrierattr_t *restrict attr STARPU_ATTRIBUTE_UNUSED, unsigned count)
 {
 	int ret = starpu_pthread_mutex_init(&barrier->mutex, NULL);
 	if (!ret)
@@ -855,3 +874,72 @@ void _starpu_pthread_spin_do_unlock(starpu_pthread_spinlock_t *lock)
 #endif
 
 #endif /* defined(STARPU_SIMGRID) || (defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)) || !defined(STARPU_HAVE_PTHREAD_SPIN_LOCK) */
+
+#ifdef STARPU_SIMGRID
+
+/* Simgrid build: passes *sem to MSG_sem_destroy(). Always returns 0. */
+int starpu_sem_destroy(starpu_sem_t *sem)
+{
+	MSG_sem_destroy(*sem);
+
+	return 0;
+}
+
+/* Simgrid build: stores in *sem the semaphore returned by MSG_sem_init(value).
+ * pshared must be 0 (asserted). Always returns 0. */
+int starpu_sem_init(starpu_sem_t *sem, int pshared, unsigned value)
+{
+	STARPU_ASSERT_MSG(pshared == 0, "pshared semaphores not supported under simgrid");
+
+	*sem = MSG_sem_init(value);
+
+	return 0;
+}
+
+/* Simgrid build: passes *sem to MSG_sem_release(). Always returns 0. */
+int starpu_sem_post(starpu_sem_t *sem)
+{
+	MSG_sem_release(*sem);
+
+	return 0;
+}
+
+/* Simgrid build: passes *sem to MSG_sem_acquire(). Always returns 0. */
+int starpu_sem_wait(starpu_sem_t *sem)
+{
+	MSG_sem_acquire(*sem);
+
+	return 0;
+}
+
+/* Simgrid build: when MSG_sem_would_block() reports that *sem would block,
+ * returns EAGAIN without waiting; otherwise calls starpu_sem_wait() and
+ * returns 0. */
+int starpu_sem_trywait(starpu_sem_t *sem)
+{
+	if (!MSG_sem_would_block(*sem))
+	{
+		starpu_sem_wait(sem);
+		return 0;
+	}
+
+	return EAGAIN;
+}
+
+/* Simgrid build: with simgrid newer than 3.13, stores the result of
+ * MSG_sem_get_capacity(*sem) in *sval and returns 0. With older versions the
+ * arguments are ignored and STARPU_ABORT_MSG() is invoked instead. */
+int starpu_sem_getvalue(starpu_sem_t *sem, int *sval)
+{
+#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR > 13)
+	*sval = MSG_sem_get_capacity(*sem);
+	return 0;
+#else
+	(void) sem;
+	(void) sval;
+	STARPU_ABORT_MSG("simgrid up to 3.13 did not have working MSG_sem_get_capacity");
+#endif
+}
+
+#elif defined(BUILDING_STARPU) /* !STARPU_SIMGRID */
+
+/* Non-simgrid build: calls sem_wait() and retries for as long as it fails
+ * with errno == EINTR. Returns the result of the last sem_wait() call. */
+int starpu_sem_wait(starpu_sem_t *sem)
+{
+	int rc;
+
+	do
+	{
+		rc = sem_wait(sem);
+	}
+	while (rc == -1 && errno == EINTR);
+
+	return rc;
+}
+
+/* Non-simgrid build: calls sem_trywait() and retries for as long as it fails
+ * with errno == EINTR. Returns the result of the last sem_trywait() call. */
+int starpu_sem_trywait(starpu_sem_t *sem)
+{
+	int rc;
+
+	do
+	{
+		rc = sem_trywait(sem);
+	}
+	while (rc == -1 && errno == EINTR);
+
+	return rc;
+}
+
+#endif