瀏覽代碼

mpi: access mpi and nmad implementations through a backend interface to avoid having MPI or NMAD code directly in the common code

Nathalie Furmento 5 年之前
父節點
當前提交
2906ec44bd

+ 5 - 1
mpi/src/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2012,2016                                Inria
-# Copyright (C) 2010-2018                                CNRS
+# Copyright (C) 2010-2019                                CNRS
 # Copyright (C) 2009-2014,2018                           Université de Bordeaux
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -73,6 +73,8 @@ noinst_HEADERS =					\
 	mpi/starpu_mpi_comm.h				\
 	mpi/starpu_mpi_tag.h				\
 	mpi/starpu_mpi_driver.h				\
+	mpi/starpu_mpi_mpi_backend.h			\
+	nmad/starpu_mpi_nmad_backend.h			\
 	load_balancer/policy/data_movements_interface.h	\
 	load_balancer/policy/load_data_interface.h	\
 	load_balancer/policy/load_balancer_policy.h
@@ -94,7 +96,9 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_task_insert_fortran.c		\
 	starpu_mpi_init.c				\
 	nmad/starpu_mpi_nmad.c				\
+	nmad/starpu_mpi_nmad_backend.c			\
 	mpi/starpu_mpi_mpi.c				\
+	mpi/starpu_mpi_mpi_backend.c			\
 	mpi/starpu_mpi_early_data.c			\
 	mpi/starpu_mpi_early_request.c			\
 	mpi/starpu_mpi_sync_data.c			\

+ 3 - 1
mpi/src/mpi/starpu_mpi_comm.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2015-2017                                CNRS
+ * Copyright (C) 2015-2017, 2019                          CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -23,6 +23,8 @@
 
 #ifdef STARPU_USE_MPI_MPI
 
+#include <mpi/starpu_mpi_mpi_backend.h>
+
 #ifdef __cplusplus
 extern "C"
 {

+ 1 - 0
mpi/src/mpi/starpu_mpi_early_data.c

@@ -18,6 +18,7 @@
 #include <stdlib.h>
 #include <starpu_mpi.h>
 #include <mpi/starpu_mpi_early_data.h>
+#include <mpi/starpu_mpi_mpi_backend.h>
 #include <starpu_mpi_private.h>
 #include <common/uthash.h>
 

+ 98 - 98
mpi/src/mpi/starpu_mpi_mpi.c

@@ -166,7 +166,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
-	_STARPU_MPI_DEBUG(0, "new req %p srcdst %d tag %"PRIi64" and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
+	_STARPU_MPI_DEBUG(0, "new req %p srcdst %d tag %"PRIi64" and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->backend->is_internal_req);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
@@ -178,7 +178,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 		 * pointer associated to the data_handle, and push it into the
 		 * ready_requests list, so as the real MPI request can be submitted
 		 * before the next submission of the envelope-catching request. */
-		if (req->is_internal_req)
+		if (req->backend->is_internal_req)
 		{
 			_starpu_mpi_datatype_allocate(req->data_handle, req);
 			if (req->registered_datatype == 1)
@@ -200,10 +200,10 @@ void _starpu_mpi_submit_ready_request(void *arg)
 
 			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
 			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
-			STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&req->backend->posted_mutex);
 			req->posted = 1;
-			STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
+			STARPU_PTHREAD_COND_BROADCAST(&req->backend->posted_cond);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->posted_mutex);
 			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 		else
@@ -227,8 +227,8 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %"PRIi64" has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
 				STARPU_ASSERT(req->data_handle != early_data_handle->handle);
 
-				req->internal_req = early_data_handle->req;
-				req->early_data_handle = early_data_handle;
+				req->backend->internal_req = early_data_handle->req;
+				req->backend->early_data_handle = early_data_handle;
 
 				struct _starpu_mpi_early_data_cb_args *cb_args;
 				_STARPU_MPI_MALLOC(cb_args, sizeof(struct _starpu_mpi_early_data_cb_args));
@@ -368,27 +368,27 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 	if (req->sync == 0)
 	{
 		_STARPU_MPI_COMM_TO_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
-		req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
+		req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->backend->data_request);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 	}
 	else
 	{
 		_STARPU_MPI_COMM_TO_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.comm);
-		req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->data_request);
+		req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->backend->data_request);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 	}
 
 #ifdef STARPU_SIMGRID
-	_starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
+	_starpu_mpi_simgrid_wait_req(&req->backend->data_request, &req->status_store, &req->queue, &req->done);
 #endif
 
 	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
 	req->submitted = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+	STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 
 	_starpu_mpi_handle_detached_request(req);
 
@@ -399,10 +399,10 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 {
 	_starpu_mpi_datatype_allocate(req->data_handle, req);
 
-	_STARPU_MPI_CALLOC(req->envelope, 1,sizeof(struct _starpu_mpi_envelope));
-	req->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
-	req->envelope->data_tag = req->node_tag.data_tag;
-	req->envelope->sync = req->sync;
+	_STARPU_MPI_CALLOC(req->backend->envelope, 1,sizeof(struct _starpu_mpi_envelope));
+	req->backend->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
+	req->backend->envelope->data_tag = req->node_tag.data_tag;
+	req->backend->envelope->sync = req->sync;
 
 	if (req->registered_datatype == 1)
 	{
@@ -411,10 +411,10 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
 
 		MPI_Type_size(req->datatype, &size);
-		req->envelope->size = (starpu_ssize_t)req->count * size;
+		req->backend->envelope->size = (starpu_ssize_t)req->count * size;
 		_STARPU_MPI_DEBUG(20, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle), req->node_tag.rank);
-		_STARPU_MPI_COMM_TO_DEBUG(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->envelope->data_tag, req->node_tag.comm);
-		ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
+		_STARPU_MPI_COMM_TO_DEBUG(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->backend->envelope->data_tag, req->node_tag.comm);
+		ret = MPI_Isend(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->backend->size_req);
 		STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending envelope, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
 	}
 	else
@@ -422,32 +422,32 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		int ret;
 
  		// Do not pack the data, just try to find out the size
-		starpu_data_pack(req->data_handle, NULL, &(req->envelope->size));
+		starpu_data_pack(req->data_handle, NULL, &(req->backend->envelope->size));
 
-		if (req->envelope->size != -1)
+		if (req->backend->envelope->size != -1)
  		{
  			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
-			req->count = req->envelope->size;
-			_STARPU_MPI_COMM_TO_DEBUG(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->envelope->data_tag, req->node_tag.comm);
-			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
+			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->backend->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
+			req->count = req->backend->envelope->size;
+			_STARPU_MPI_COMM_TO_DEBUG(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->backend->envelope->data_tag, req->node_tag.comm);
+			ret = MPI_Isend(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->backend->size_req);
 			STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
  		}
 
  		// Pack the data
  		starpu_data_pack(req->data_handle, &req->ptr, &req->count);
-		if (req->envelope->size == -1)
+		if (req->backend->envelope->size == -1)
  		{
  			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
-			_STARPU_MPI_COMM_TO_DEBUG(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->envelope->data_tag, req->node_tag.comm);
-			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
+			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->backend->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
+			_STARPU_MPI_COMM_TO_DEBUG(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->backend->envelope->data_tag, req->node_tag.comm);
+			ret = MPI_Isend(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->backend->size_req);
 			STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
  		}
  		else
  		{
  			// We check the size returned with the 2 calls to pack is the same
-			STARPU_MPI_ASSERT_MSG(req->count == req->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->size);
+			STARPU_MPI_ASSERT_MSG(req->count == req->backend->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->backend->envelope->size);
  		}
 		// We can send the data now
 	}
@@ -495,14 +495,14 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 	if (req->sync)
 	{
 		_STARPU_MPI_COMM_FROM_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.comm);
-		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->data_request);
+		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->backend->data_request);
 	}
 	else
 	{
 		_STARPU_MPI_COMM_FROM_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
-		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
+		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->backend->data_request);
 #ifdef STARPU_SIMGRID
-		_starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
+		_starpu_mpi_simgrid_wait_req(&req->backend->data_request, &req->status_store, &req->queue, &req->done);
 #endif
 	}
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
@@ -510,10 +510,10 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
 	req->submitted = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+	STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 
 	_starpu_mpi_handle_detached_request(req);
 
@@ -530,16 +530,16 @@ void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 {
 	_STARPU_MPI_LOG_IN();
 	/* Which is the mpi request we are waiting for ? */
-	struct _starpu_mpi_req *req = waiting_req->other_request;
+	struct _starpu_mpi_req *req = waiting_req->backend->other_request;
 
 	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
-	if (req->data_request != MPI_REQUEST_NULL)
+	if (req->backend->data_request != MPI_REQUEST_NULL)
 	{
 		// TODO: Fix for STARPU_SIMGRID
 #ifdef STARPU_SIMGRID
 		STARPU_MPI_ASSERT_MSG(0, "Implement this in STARPU_SIMGRID");
 #endif
-		req->ret = MPI_Wait(&req->data_request, waiting_req->status);
+		req->ret = MPI_Wait(&req->backend->data_request, waiting_req->status);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 	}
 	_STARPU_MPI_TRACE_UWAIT_END(req->node_tag.rank, req->node_tag.data_tag);
@@ -559,34 +559,34 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 
 	/* We cannot try to complete a MPI request that was not actually posted
 	 * to MPI yet. */
-	STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
+	STARPU_PTHREAD_MUTEX_LOCK(&(req->backend->req_mutex));
 	while (!(req->submitted))
-		STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
-	STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
+		STARPU_PTHREAD_COND_WAIT(&(req->backend->req_cond), &(req->backend->req_mutex));
+	STARPU_PTHREAD_MUTEX_UNLOCK(&(req->backend->req_mutex));
 
 	/* Initialize the request structure */
 	 _starpu_mpi_request_init(&waiting_req);
 	waiting_req->prio = INT_MAX;
 	waiting_req->status = status;
-	waiting_req->other_request = req;
+	waiting_req->backend->other_request = req;
 	waiting_req->func = _starpu_mpi_wait_func;
 	waiting_req->request_type = WAIT_REQ;
 
 	_starpu_mpi_submit_ready_request_inc(waiting_req);
 
 	/* We wait for the MPI request to finish */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
 	while (!req->completed)
-		STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+		STARPU_PTHREAD_COND_WAIT(&req->backend->req_cond, &req->backend->req_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 
 	ret = req->ret;
 
 	/* The internal request structure was automatically allocated */
 	*public_req = NULL;
-	if (req->internal_req)
+	if (req->backend->internal_req)
 	{
-		_starpu_mpi_request_destroy(req->internal_req);
+		_starpu_mpi_request_destroy(req->backend->internal_req);
 	}
 	_starpu_mpi_request_destroy(req);
 	_starpu_mpi_request_destroy(waiting_req);
@@ -605,7 +605,7 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 {
 	_STARPU_MPI_LOG_IN();
 	/* Which is the mpi request we are testing for ? */
-	struct _starpu_mpi_req *req = testing_req->other_request;
+	struct _starpu_mpi_req *req = testing_req->backend->other_request;
 
 	_STARPU_MPI_DEBUG(0, "Test request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
 			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
@@ -617,7 +617,7 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, testing_req->flag);
 	memcpy(testing_req->status, &req->status_store, sizeof(*testing_req->status));
 #else
-	req->ret = MPI_Test(&req->data_request, testing_req->flag, testing_req->status);
+	req->ret = MPI_Test(&req->backend->data_request, testing_req->flag, testing_req->status);
 #endif
 
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
@@ -630,10 +630,10 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 		_starpu_mpi_handle_request_termination(req);
 	}
 
-	STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&testing_req->backend->req_mutex);
 	testing_req->completed = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
+	STARPU_PTHREAD_COND_SIGNAL(&testing_req->backend->req_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->backend->req_mutex);
 	_STARPU_MPI_LOG_OUT();
 }
 
@@ -648,9 +648,9 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
 
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
 	unsigned submitted = req->submitted;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 
 	if (submitted)
 	{
@@ -661,7 +661,7 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 		testing_req->prio = INT_MAX;
 		testing_req->flag = flag;
 		testing_req->status = status;
-		testing_req->other_request = req;
+		testing_req->backend->other_request = req;
 		testing_req->func = _starpu_mpi_test_func;
 		testing_req->completed = 0;
 		testing_req->request_type = TEST_REQ;
@@ -669,10 +669,10 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 		_starpu_mpi_submit_ready_request_inc(testing_req);
 
 		/* We wait for the test request to finish */
-		STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
+		STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->backend->req_mutex));
 		while (!(testing_req->completed))
-			STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
-		STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
+			STARPU_PTHREAD_COND_WAIT(&(testing_req->backend->req_cond), &(testing_req->backend->req_mutex));
+		STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->backend->req_mutex));
 
 		ret = testing_req->ret;
 
@@ -682,9 +682,9 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 			 * request structure which was automatically allocated
 			 * */
 			*public_req = NULL;
-			if (req->internal_req)
+			if (req->backend->internal_req)
 			{
-				_starpu_mpi_request_destroy(req->internal_req);
+				_starpu_mpi_request_destroy(req->backend->internal_req);
 			}
 			_starpu_mpi_request_destroy(req);
 		}
@@ -759,10 +759,10 @@ int _starpu_mpi_barrier(MPI_Comm comm)
 	_starpu_mpi_submit_ready_request(barrier_req);
 
 	/* We wait for the MPI request to finish */
-	STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->backend->req_mutex);
 	while (!barrier_req->completed)
-		STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
+		STARPU_PTHREAD_COND_WAIT(&barrier_req->backend->req_cond, &barrier_req->backend->req_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->backend->req_mutex);
 
 	_starpu_mpi_request_destroy(barrier_req);
 	_STARPU_MPI_LOG_OUT();
@@ -798,12 +798,12 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 
 	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
 			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
-			  req->datatype_name, (int)req->count, req->registered_datatype, req->internal_req);
+			  req->datatype_name, (int)req->count, req->registered_datatype, req->backend->internal_req);
 
-	if (req->internal_req)
+	if (req->backend->internal_req)
 	{
-		free(req->early_data_handle);
-		req->early_data_handle = NULL;
+		free(req->backend->early_data_handle);
+		req->backend->early_data_handle = NULL;
 	}
 	else
 	{
@@ -817,7 +817,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 					// has completed, as MPI can re-order messages, let's call
 					// MPI_Wait to make sure data have been sent
 					int ret;
-					ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
+					ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
 					STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
 					free(req->ptr);
 					req->ptr = NULL;
@@ -839,10 +839,10 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 
 	_starpu_mpi_release_req_data(req);
 
-	if (req->envelope)
+	if (req->backend->envelope)
 	{
-		free(req->envelope);
-		req->envelope = NULL;
+		free(req->backend->envelope);
+		req->backend->envelope = NULL;
 	}
 
 	/* Execute the specified callback, if any */
@@ -851,10 +851,10 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 
 	/* tell anyone potentially waiting on the request that it is
 	 * terminated now */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
 	req->completed = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+	STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 	_STARPU_MPI_LOG_OUT();
 }
 
@@ -903,18 +903,18 @@ static void _starpu_mpi_early_data_cb(void* arg)
 		if (args->req->detached)
 		{
 			/* have the internal request destroyed now or when completed */
-			STARPU_PTHREAD_MUTEX_LOCK(&args->req->internal_req->req_mutex);
-			if (args->req->internal_req->to_destroy)
+			STARPU_PTHREAD_MUTEX_LOCK(&args->req->backend->internal_req->backend->req_mutex);
+			if (args->req->backend->internal_req->backend->to_destroy)
 			{
 				/* The request completed first, can now destroy it */
-				STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->internal_req->req_mutex);
-				_starpu_mpi_request_destroy(args->req->internal_req);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->backend->internal_req->backend->req_mutex);
+				_starpu_mpi_request_destroy(args->req->backend->internal_req);
 			}
 			else
 			{
 				/* The request didn't complete yet, tell it to destroy it when it completes */
-				args->req->internal_req->to_destroy = 1;
-				STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->internal_req->req_mutex);
+				args->req->backend->internal_req->backend->to_destroy = 1;
+				STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->backend->internal_req->backend->req_mutex);
 			}
 			_starpu_mpi_handle_request_termination(args->req);
 			_starpu_mpi_request_destroy(args->req);
@@ -925,11 +925,11 @@ static void _starpu_mpi_early_data_cb(void* arg)
 			// be handled when calling starpu_mpi_wait
 			// We store in the application request the internal MPI
 			// request so that it can be used by starpu_mpi_wait
-			args->req->data_request = args->req->internal_req->data_request;
-			STARPU_PTHREAD_MUTEX_LOCK(&args->req->req_mutex);
+			args->req->backend->data_request = args->req->backend->internal_req->backend->data_request;
+			STARPU_PTHREAD_MUTEX_LOCK(&args->req->backend->req_mutex);
 			args->req->submitted = 1;
-			STARPU_PTHREAD_COND_BROADCAST(&args->req->req_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->req_mutex);
+			STARPU_PTHREAD_COND_BROADCAST(&args->req->backend->req_cond);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->backend->req_mutex);
 		}
 	}
 
@@ -959,12 +959,12 @@ static void _starpu_mpi_test_detached_requests(void)
 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
 		_STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
-		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64" - TYPE %s %d\n", &req->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
+		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64" - TYPE %s %d\n", &req->backend->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
 #ifdef STARPU_SIMGRID
 		req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
 #else
-		STARPU_MPI_ASSERT_MSG(req->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
-		req->ret = MPI_Test(&req->data_request, &flag, MPI_STATUS_IGNORE);
+		STARPU_MPI_ASSERT_MSG(req->backend->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
+		req->ret = MPI_Test(&req->backend->data_request, &flag, MPI_STATUS_IGNORE);
 #endif
 
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
@@ -991,19 +991,19 @@ static void _starpu_mpi_test_detached_requests(void)
 
 			_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
-			STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
 			/* We don't want to free internal non-detached
 			   requests, we need to get their MPI request before
 			   destroying them */
-			if (req->is_internal_req && !req->to_destroy)
+			if (req->backend->is_internal_req && !req->backend->to_destroy)
 			{
 				/* We have completed the request, let the application request destroy it */
-				req->to_destroy = 1;
-				STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+				req->backend->to_destroy = 1;
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 			}
 			else
 			{
-				STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 				_starpu_mpi_request_destroy(req);
 			}
 
@@ -1096,10 +1096,10 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	// We wait until the request is pushed in the
 	// ready_request list
 	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
-	STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
+	STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->backend->posted_mutex));
 	while (!(early_data_handle->req->posted))
-		STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
-	STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
+		STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->backend->posted_cond), &(early_data_handle->req->backend->posted_mutex));
+	STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->backend->posted_mutex));
 
 #ifdef STARPU_DEVEL
 #warning check if req_ready is still necessary
@@ -1345,7 +1345,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 							new_req->callback_arg = NULL;
 							new_req->func = _starpu_mpi_irecv_size_func;
 							new_req->sequential_consistency = 1;
-							new_req->is_internal_req = 0; // ????
+							new_req->backend->is_internal_req = 0; // ????
 							new_req->count = envelope->size;
 							_starpu_mpi_sync_data_add(new_req);
 						}

+ 117 - 0
mpi/src/mpi/starpu_mpi_mpi_backend.c

@@ -0,0 +1,117 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017                                     Inria
+ * Copyright (C) 2010-2015,2017,2018,2019                 CNRS
+ * Copyright (C) 2009-2014,2017,2018-2019                 Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include <starpu_config.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_private.h>
+
+#ifdef STARPU_USE_MPI_MPI
+
+#include <mpi/starpu_mpi_mpi_backend.h>
+#include <mpi/starpu_mpi_tag.h>
+#include <mpi/starpu_mpi_comm.h>
+#include <mpi/starpu_mpi_comm.h>
+#include <mpi/starpu_mpi_tag.h>
+#include <mpi/starpu_mpi_driver.h>
+
+void _starpu_mpi_mpi_backend_init(struct starpu_conf *conf)
+{
+	_starpu_mpi_driver_init(conf);
+}
+
+void _starpu_mpi_mpi_backend_shutdown(void)
+{
+	_starpu_mpi_tag_shutdown();
+	_starpu_mpi_comm_shutdown();
+	_starpu_mpi_driver_shutdown();
+}
+
+int _starpu_mpi_mpi_backend_reserve_core(void)
+{
+	return (starpu_get_env_number_default("STARPU_MPI_DRIVER_CALL_FREQUENCY", 0) <= 0);
+}
+
+void _starpu_mpi_mpi_backend_request_init(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_CALLOC(req->backend, 1, sizeof(struct _starpu_mpi_req_backend));
+
+	req->backend->data_request = 0;
+
+	STARPU_PTHREAD_MUTEX_INIT(&req->backend->req_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&req->backend->req_cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&req->backend->posted_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&req->backend->posted_cond, NULL);
+
+	req->backend->other_request = NULL;
+
+	req->backend->size_req = 0;
+	req->backend->internal_req = NULL;
+	req->backend->is_internal_req = 0;
+	req->backend->to_destroy = 1;
+	req->backend->early_data_handle = NULL;
+	req->backend->envelope = NULL;
+}
+
+void _starpu_mpi_mpi_backend_request_fill(struct _starpu_mpi_req *req, MPI_Comm comm, int is_internal_req)
+{
+	_starpu_mpi_comm_register(comm);
+
+	req->backend->is_internal_req = is_internal_req;
+	/* For internal requests, we wait for both the request completion and the matching application request completion */
+	req->backend->to_destroy = !is_internal_req;
+}
+
+void _starpu_mpi_mpi_backend_request_destroy(struct _starpu_mpi_req *req)
+{
+	STARPU_PTHREAD_MUTEX_DESTROY(&req->backend->req_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&req->backend->req_cond);
+	STARPU_PTHREAD_MUTEX_DESTROY(&req->backend->posted_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&req->backend->posted_cond);
+	free(req->backend);
+}
+
+void _starpu_mpi_mpi_backend_data_clear(starpu_data_handle_t data_handle)
+{
+	_starpu_mpi_tag_data_release(data_handle);
+}
+
+void _starpu_mpi_mpi_backend_data_register(starpu_data_handle_t data_handle, starpu_mpi_tag_t data_tag)
+{
+	_starpu_mpi_tag_data_register(data_handle, data_tag);
+}
+
+void _starpu_mpi_mpi_backend_comm_register(MPI_Comm comm)
+{
+	_starpu_mpi_comm_register(comm);
+}
+
+struct _starpu_mpi_backend _mpi_backend =
+{
+ 	._starpu_mpi_backend_init = _starpu_mpi_mpi_backend_init,
+ 	._starpu_mpi_backend_shutdown = _starpu_mpi_mpi_backend_shutdown,
+	._starpu_mpi_backend_reserve_core = _starpu_mpi_mpi_backend_reserve_core,
+	._starpu_mpi_backend_request_init = _starpu_mpi_mpi_backend_request_init,
+	._starpu_mpi_backend_request_fill = _starpu_mpi_mpi_backend_request_fill,
+	._starpu_mpi_backend_request_destroy = _starpu_mpi_mpi_backend_request_destroy,
+	._starpu_mpi_backend_data_clear = _starpu_mpi_mpi_backend_data_clear,
+	._starpu_mpi_backend_data_register = _starpu_mpi_mpi_backend_data_register,
+	._starpu_mpi_backend_comm_register = _starpu_mpi_mpi_backend_comm_register
+};
+
+#endif /* STARPU_USE_MPI_MPI*/

+ 80 - 0
mpi/src/mpi/starpu_mpi_mpi_backend.h

@@ -0,0 +1,80 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017                                     Inria
+ * Copyright (C) 2010-2015,2017,2018,2019                 CNRS
+ * Copyright (C) 2009-2014,2017,2018-2019                 Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_MPI_BACKEND_H__
+#define __STARPU_MPI_MPI_BACKEND_H__
+
+#include <common/config.h>
+#include <common/uthash.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#ifdef STARPU_USE_MPI_MPI
+
+extern int _starpu_mpi_tag;
+#define _STARPU_MPI_TAG_ENVELOPE  _starpu_mpi_tag
+#define _STARPU_MPI_TAG_DATA      _starpu_mpi_tag+1
+#define _STARPU_MPI_TAG_SYNC_DATA _starpu_mpi_tag+2
+
+enum _starpu_envelope_mode
+{
+	_STARPU_MPI_ENVELOPE_DATA=0,
+	_STARPU_MPI_ENVELOPE_SYNC_READY=1
+};
+
+struct _starpu_mpi_envelope
+{
+	enum _starpu_envelope_mode mode;
+	starpu_ssize_t size;
+	starpu_mpi_tag_t data_tag;
+	unsigned sync;
+};
+
+struct _starpu_mpi_req_backend
+{
+	MPI_Request data_request;
+
+	starpu_pthread_mutex_t req_mutex;
+	starpu_pthread_cond_t req_cond;
+	starpu_pthread_mutex_t posted_mutex;
+	starpu_pthread_cond_t posted_cond;
+	/* In the case of a Wait/Test request, we are going to post a request
+	 * to test the completion of another request */
+	struct _starpu_mpi_req *other_request;
+
+	MPI_Request size_req;
+
+	struct _starpu_mpi_envelope* envelope;
+
+	unsigned is_internal_req:1;
+	unsigned to_destroy:1;
+	struct _starpu_mpi_req *internal_req;
+	struct _starpu_mpi_early_data_handle *early_data_handle;
+     	UT_hash_handle hh;
+};
+
+#endif // STARPU_USE_MPI_MPI
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_MPI_BACKEND_H__

+ 32 - 32
mpi/src/nmad/starpu_mpi_nmad.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2017                                     Inria
- * Copyright (C) 2010-2015,2017,2018                      CNRS
+ * Copyright (C) 2010-2015,2017,2018,2019                 CNRS
  * Copyright (C) 2009-2014,2017,2018-2019                 Université de Bordeaux
  * Copyright (C) 2017                                     Guillaume Beauchamp
  *
@@ -38,7 +38,7 @@
 
 #include <nm_sendrecv_interface.h>
 #include <nm_mpi_nmad.h>
-
+#include "starpu_mpi_nmad_backend.h"
 
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
 #ifdef STARPU_VERBOSE
@@ -96,18 +96,18 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	struct nm_data_s data;
 	nm_mpi_nmad_data_get(&data, (void*)req->ptr, req->datatype, req->count);
-	nm_sr_send_init(req->session, &(req->data_request));
-	nm_sr_send_pack_data(req->session, &(req->data_request), &data);
-	nm_sr_send_set_priority(req->session, &req->data_request, req->prio);
+	nm_sr_send_init(req->backend->session, &(req->backend->data_request));
+	nm_sr_send_pack_data(req->backend->session, &(req->backend->data_request), &data);
+	nm_sr_send_set_priority(req->backend->session, &req->backend->data_request, req->prio);
 
 	if (req->sync == 0)
 	{
-		req->ret = nm_sr_send_isend(req->session, &(req->data_request), req->gate, req->node_tag.data_tag);
+		req->ret = nm_sr_send_isend(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag);
 		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Isend returning %d", req->ret);
 	}
 	else
 	{
-		req->ret = nm_sr_send_issend(req->session, &(req->data_request), req->gate, req->node_tag.data_tag);
+		req->ret = nm_sr_send_issend(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag);
 		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Issend returning %d", req->ret);
 	}
 
@@ -124,7 +124,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 
 	if (req->registered_datatype == 1)
 	{
-		req->waited = 1;
+		req->backend->waited = 1;
 		req->count = 1;
 		req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
 	}
@@ -132,7 +132,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	{
 		starpu_ssize_t psize = -1;
 		int ret;
-		req->waited =2;
+		req->backend->waited =2;
 
 		// Do not pack the data, just try to find out the size
 		starpu_data_pack(req->data_handle, NULL, &psize);
@@ -142,10 +142,10 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 			// We already know the size of the data, let's send it to overlap with the packing of the data
 			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", psize, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
 			req->count = psize;
-			//ret = nm_sr_isend(nm_mpi_communicator_get_session(p_req->p_comm),nm_mpi_communicator_get_gate(p_comm,req->srcdst), req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
-			ret = nm_sr_isend(req->session,req->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->size_req);
+			//ret = nm_sr_isend(nm_mpi_communicator_get_session(p_req->p_comm),nm_mpi_communicator_get_gate(p_comm,req->srcdst), req->mpi_tag,&req->count, sizeof(req->count), &req->backend->size_req);
+			ret = nm_sr_isend(req->backend->session,req->backend->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->backend->size_req);
 
-			//	ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			//	ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->backend->size_req);
 			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
 		}
 
@@ -155,7 +155,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		{
 			// We know the size now, let's send it
 			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %ld to node %d (second call to pack)\n", req->count, sizeof(req->count), "MPI_BYTE", req->node_tag.data_tag, req->node_tag.rank);
-			ret = nm_sr_isend(req->session,req->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->size_req);
+			ret = nm_sr_isend(req->backend->session,req->backend->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->backend->size_req);
 			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
 		}
 		else
@@ -186,9 +186,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	//req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 	struct nm_data_s data;
 	nm_mpi_nmad_data_get(&data, (void*)req->ptr, req->datatype, req->count);
-	nm_sr_recv_init(req->session, &(req->data_request));
-	nm_sr_recv_unpack_data(req->session, &(req->data_request), &data);
-	nm_sr_recv_irecv(req->session, &(req->data_request), req->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
+	nm_sr_recv_init(req->backend->session, &(req->backend->data_request));
+	nm_sr_recv_unpack_data(req->backend->session, &(req->backend->data_request), &data);
+	nm_sr_recv_irecv(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
 
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);
 
@@ -259,9 +259,9 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 
 	/* we must do a test_locked to avoid race condition :
 	 * without req_cond could still be used and couldn't be freed)*/
-	while (!req->completed || ! piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED))
+	while (!req->completed || ! piom_cond_test_locked(&(req->backend->req_cond),REQ_FINALIZED))
 	{
-		piom_cond_wait(&(req->req_cond),REQ_FINALIZED);
+		piom_cond_wait(&(req->backend->req_cond),REQ_FINALIZED);
 	}
 
 	if (status!=MPI_STATUS_IGNORE)
@@ -292,7 +292,7 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 	/* we must do a test_locked to avoid race condition :
 	 * without req_cond could still be used and couldn't be freed)*/
-	*flag = req->completed && piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED);
+	*flag = req->completed && piom_cond_test_locked(&(req->backend->req_cond),REQ_FINALIZED);
 	if (*flag && status!=MPI_STATUS_IGNORE)
 		_starpu_mpi_req_status(req,status);
 
@@ -358,17 +358,17 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 	{
 		if (req->registered_datatype == 0)
 		{
-			if(req->waited == 1)
+			if(req->backend->waited == 1)
 			        nm_mpi_nmad_data_release(req->datatype);
 			if (req->request_type == SEND_REQ)
 			{
-				req->waited--;
+				req->backend->waited--;
 				// We need to make sure the communication for sending the size
 				// has completed, as MPI can re-order messages, let's count
 				// recerived message.
 				// FIXME concurent access.
 				STARPU_ASSERT_MSG(event == NM_SR_EVENT_FINALIZED, "Callback with event %d", event);
-				if(req->waited>0)
+				if(req->backend->waited>0)
 					return;
 
 			}
@@ -411,7 +411,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 			/* tell anyone potentially waiting on the request that it is
 			 * terminated now (should be done after the callback)*/
 			req->completed = 1;
-			piom_cond_signal(&req->req_cond, REQ_FINALIZED);
+			piom_cond_signal(&req->backend->req_cond, REQ_FINALIZED);
 		}
 		int pending_remaining = STARPU_ATOMIC_ADD(&pending_request, -1);
 		if (!running && !pending_remaining)
@@ -427,16 +427,16 @@ void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const
 
 static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
 {
-	if(req->request_type == SEND_REQ && req->waited>1)
+	if(req->request_type == SEND_REQ && req->backend->waited>1)
 	{
-		nm_sr_request_set_ref(&(req->size_req), req);
-		nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+		nm_sr_request_set_ref(&(req->backend->size_req), req);
+		nm_sr_request_monitor(req->backend->session, &(req->backend->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 	}
 	/* the if must be before, because the first callback can directly free
-	* a detached request (the second callback free if req->waited>1). */
-	nm_sr_request_set_ref(&(req->data_request), req);
+	* a detached request (the second callback free if req->backend->waited>1). */
+	nm_sr_request_set_ref(&(req->backend->data_request), req);
 
-	nm_sr_request_monitor(req->session, &(req->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+	nm_sr_request_monitor(req->backend->session, &(req->backend->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 }
 
 void _starpu_mpi_coop_sends_build_tree(struct _starpu_mpi_coop_sends *coop_sends)
@@ -572,7 +572,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		else
 		{
 			c->req->completed=1;
-			piom_cond_signal(&(c->req->req_cond), REQ_FINALIZED);
+			piom_cond_signal(&(c->req->backend->req_cond), REQ_FINALIZED);
 		}
 		STARPU_ATOMIC_ADD( &pending_request, -1);
 		/* we signal that the request is completed.*/
@@ -685,12 +685,12 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 			(strcmp(s_idle_hooks, "HOOK")   == 0) ? PIOM_POLL_POINT_HOOK :
 			0;
 	}
-	
+
 	if(polling_point_prog)
 	{
 		starpu_progression_hook_register((unsigned (*)(void *))&piom_ltask_schedule, (void *)&polling_point_prog);
 	}
-	
+
 	if(polling_point_idle)
 	{
 		starpu_idle_hook_register((unsigned (*)(void *))&piom_ltask_schedule, (void *)&polling_point_idle);

+ 87 - 0
mpi/src/nmad/starpu_mpi_nmad_backend.c

@@ -0,0 +1,87 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017                                     Inria
+ * Copyright (C) 2010-2015,2017,2018,2019                 CNRS
+ * Copyright (C) 2009-2014,2017,2018-2019                 Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include "starpu_mpi_nmad_backend.h"
+#include <starpu_mpi_private.h>
+
+#ifdef STARPU_USE_MPI_NMAD
+
+void _starpu_mpi_nmad_backend_init(struct starpu_conf *conf)
+{
+	(void)conf;
+	/* strat_prio is preferred for StarPU instead of default strat_aggreg */
+	setenv("NMAD_STRATEGY", "prio", 0 /* do not overwrite user-supplied value, if set */);
+}
+
+void _starpu_mpi_nmad_backend_shutdown(void)
+{
+}
+
+int _starpu_mpi_nmad_backend_reserve_core(void)
+{
+	return 1;
+}
+
+void _starpu_mpi_nmad_backend_request_init(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_CALLOC(req->backend, 1, sizeof(struct _starpu_mpi_req_backend));
+	piom_cond_init(&req->backend->req_cond, 0);
+}
+
+void _starpu_mpi_nmad_backend_request_fill(struct _starpu_mpi_req *req, MPI_Comm comm, int is_internal_req)
+{
+	nm_mpi_nmad_dest(&req->backend->session, &req->backend->gate, comm, req->node_tag.rank);
+}
+
+void _starpu_mpi_nmad_backend_request_destroy(struct _starpu_mpi_req *req)
+{
+	piom_cond_destroy(&(req->backend->req_cond));
+	free(req->backend);
+}
+
+void _starpu_mpi_nmad_backend_data_clear(starpu_data_handle_t data_handle)
+{
+	(void)data_handle;
+}
+
+void _starpu_mpi_nmad_backend_data_register(starpu_data_handle_t data_handle, starpu_mpi_tag_t data_tag)
+{
+	(void)data_handle;
+	(void)data_tag;
+}
+
+void _starpu_mpi_nmad_backend_comm_register(MPI_Comm comm)
+{
+	(void)comm;
+}
+
+struct _starpu_mpi_backend _mpi_backend =
+{
+ 	._starpu_mpi_backend_init = _starpu_mpi_nmad_backend_init,
+ 	._starpu_mpi_backend_shutdown = _starpu_mpi_nmad_backend_shutdown,
+	._starpu_mpi_backend_reserve_core = _starpu_mpi_nmad_backend_reserve_core,
+	._starpu_mpi_backend_request_init = _starpu_mpi_nmad_backend_request_init,
+	._starpu_mpi_backend_request_fill = _starpu_mpi_nmad_backend_request_fill,
+	._starpu_mpi_backend_request_destroy = _starpu_mpi_nmad_backend_request_destroy,
+	._starpu_mpi_backend_data_clear = _starpu_mpi_nmad_backend_data_clear,
+	._starpu_mpi_backend_data_register = _starpu_mpi_nmad_backend_data_register,
+	._starpu_mpi_backend_comm_register = _starpu_mpi_nmad_backend_comm_register
+};
+
+#endif /* STARPU_USE_MPI_NMAD*/

+ 51 - 0
mpi/src/nmad/starpu_mpi_nmad_backend.h

@@ -0,0 +1,51 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017                                     Inria
+ * Copyright (C) 2010-2015,2017,2018,2019                 CNRS
+ * Copyright (C) 2009-2014,2017,2018-2019                 Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_NMAD_BACKEND_H__
+#define __STARPU_MPI_NMAD_BACKEND_H__
+
+#include <common/config.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#ifdef STARPU_USE_MPI_NMAD
+
+#include <nm_sendrecv_interface.h>
+#include <nm_session_interface.h>
+#include <nm_mpi_nmad.h>
+
+struct _starpu_mpi_req_backend
+{
+	nm_gate_t gate;
+	nm_session_t session;
+	nm_sr_request_t data_request;
+	int waited;
+	piom_cond_t req_cond;
+	nm_sr_request_t size_req;
+};
+
+#endif // STARPU_USE_MPI_NMAD
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_NMAD_BACKEND_H__

+ 2 - 14
mpi/src/starpu_mpi.c

@@ -36,11 +36,6 @@
 #include <core/topology.h>
 #include <core/workers.h>
 
-#if defined(STARPU_USE_MPI_MPI)
-#include <mpi/starpu_mpi_comm.h>
-#include <mpi/starpu_mpi_tag.h>
-#endif
-
 static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency)
 {
 	/* Asynchronously request StarPU to fetch the data in main memory: when
@@ -248,9 +243,7 @@ int starpu_mpi_barrier(MPI_Comm comm)
 
 void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
 {
-#if defined(STARPU_USE_MPI_MPI)
-	_starpu_mpi_tag_data_release(data_handle);
-#endif
+	_mpi_backend._starpu_mpi_backend_data_clear(data_handle);
 	_starpu_mpi_cache_data_clear(data_handle);
 	free(data_handle->mpi_data);
 	data_handle->mpi_data = NULL;
@@ -284,9 +277,7 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, starpu_mpi_
 
 	if (data_tag != -1)
 	{
-#if defined(STARPU_USE_MPI_MPI)
-		_starpu_mpi_tag_data_register(data_handle, data_tag);
-#endif
+		_mpi_backend._starpu_mpi_backend_data_register(data_handle, data_tag);
 		mpi_data->node_tag.data_tag = data_tag;
 	}
 	if (rank != -1)
@@ -294,9 +285,6 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, starpu_mpi_
 		_STARPU_MPI_TRACE_DATA_SET_RANK(data_handle, rank);
 		mpi_data->node_tag.rank = rank;
 		mpi_data->node_tag.comm = comm;
-#if defined(STARPU_USE_MPI_MPI)
-		_starpu_mpi_comm_register(comm);
-#endif
 	}
 }
 

+ 4 - 20
mpi/src/starpu_mpi_init.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2016,2017                                Inria
- * Copyright (C) 2010-2018                                CNRS
+ * Copyright (C) 2010-2019                                CNRS
  * Copyright (C) 2009-2018                                Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -32,12 +32,6 @@
 #include <core/simgrid.h>
 #include <core/task.h>
 
-#if defined(STARPU_USE_MPI_MPI)
-#include <mpi/starpu_mpi_comm.h>
-#include <mpi/starpu_mpi_tag.h>
-#include <mpi/starpu_mpi_driver.h>
-#endif
-
 #ifdef STARPU_SIMGRID
 static int _mpi_world_size;
 static int _mpi_world_rank;
@@ -75,10 +69,6 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 	{
 		STARPU_ASSERT_MSG(argc_argv->comm == MPI_COMM_WORLD, "It does not make sense to ask StarPU-MPI to initialize MPI while a non-world communicator was given");
 		int thread_support;
-#ifdef STARPU_USE_MPI_NMAD
-		/* strat_prio is preferred for StarPU instead of default strat_aggreg */
-		setenv("NMAD_STRATEGY", "prio", 0 /* do not overwrite user-supplied value, if set */);
-#endif /* STARPU_USE_MPI_NMAD */
 		_STARPU_DEBUG("Calling MPI_Init_thread\n");
 		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
 		{
@@ -189,11 +179,9 @@ int starpu_mpi_init_conf(int *argc, char ***argv, int initialize_mpi, MPI_Comm c
 		conf = &localconf;
 	}
 
-#if defined(STARPU_USE_MPI_MPI)
-	_starpu_mpi_driver_init(conf);
+	_mpi_backend._starpu_mpi_backend_init(conf);
 
-	if (starpu_get_env_number_default("STARPU_MPI_DRIVER_CALL_FREQUENCY", 0) <= 0)
-#endif
+	if (_mpi_backend._starpu_mpi_backend_reserve_core())
 	{
 		/* Reserve a core for our progression thread */
 		if (conf->reserve_ncpus == -1)
@@ -227,11 +215,7 @@ int starpu_mpi_shutdown(void)
 	_starpu_mpi_comm_amounts_display(stderr, rank);
 	_starpu_mpi_comm_amounts_shutdown();
 	_starpu_mpi_cache_shutdown(world_size);
-#if defined(STARPU_USE_MPI_MPI)
-	_starpu_mpi_tag_shutdown();
-	_starpu_mpi_comm_shutdown();
-	_starpu_mpi_driver_shutdown();
-#endif
+
 	if (_mpi_initialized_starpu)
 		starpu_shutdown();
 

+ 20 - 63
mpi/src/starpu_mpi_private.h

@@ -28,11 +28,6 @@
 #include <common/prio_list.h>
 #include <common/starpu_spinlock.h>
 #include <core/simgrid.h>
-#if defined(STARPU_USE_MPI_NMAD)
-#include <pioman.h>
-#include <nm_sendrecv_interface.h>
-#include <nm_session_interface.h>
-#endif
 
 #ifdef __cplusplus
 extern "C"
@@ -163,27 +158,6 @@ int _starpu_debug_rank;
 #  define _STARPU_MPI_LOG_OUT()
 #endif
 
-#if defined(STARPU_USE_MPI_MPI)
-extern int _starpu_mpi_tag;
-#define _STARPU_MPI_TAG_ENVELOPE  _starpu_mpi_tag
-#define _STARPU_MPI_TAG_DATA      _starpu_mpi_tag+1
-#define _STARPU_MPI_TAG_SYNC_DATA _starpu_mpi_tag+2
-
-enum _starpu_envelope_mode
-{
-	_STARPU_MPI_ENVELOPE_DATA=0,
-	_STARPU_MPI_ENVELOPE_SYNC_READY=1
-};
-
-struct _starpu_mpi_envelope
-{
-	enum _starpu_envelope_mode mode;
-	starpu_ssize_t size;
-	starpu_mpi_tag_t data_tag;
-	unsigned sync;
-};
-#endif /* STARPU_USE_MPI_MPI */
-
 enum _starpu_mpi_request_type
 {
 	SEND_REQ=0,
@@ -232,6 +206,7 @@ struct _starpu_mpi_data
 
 struct _starpu_mpi_data *_starpu_mpi_data_get(starpu_data_handle_t data_handle);
 
+struct _starpu_mpi_req_backend;
 struct _starpu_mpi_req;
 LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
@@ -246,22 +221,13 @@ LIST_TYPE(_starpu_mpi_req,
 	starpu_ssize_t count;
 	int registered_datatype;
 
+	struct _starpu_mpi_req_backend *backend;
+
 	/* who are we talking to ? */
 	struct _starpu_mpi_node_tag node_tag;
-#if defined(STARPU_USE_MPI_NMAD)
-	nm_gate_t gate;
-	nm_session_t session;
-#endif
-
 	void (*func)(struct _starpu_mpi_req *);
 
 	MPI_Status *status;
-#if defined(STARPU_USE_MPI_NMAD)
-	nm_sr_request_t data_request;
-	int waited;
-#elif defined(STARPU_USE_MPI_MPI)
-	MPI_Request data_request;
-#endif
 	struct _starpu_mpi_req_multilist_coop_sends coop_sends;
 	struct _starpu_mpi_coop_sends *coop_sends_head;
 
@@ -269,17 +235,6 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned sync;
 
 	int ret;
-#if defined(STARPU_USE_MPI_NMAD)
-	piom_cond_t req_cond;
-#elif defined(STARPU_USE_MPI_MPI)
-	starpu_pthread_mutex_t req_mutex;
-	starpu_pthread_cond_t req_cond;
-	starpu_pthread_mutex_t posted_mutex;
-	starpu_pthread_cond_t posted_cond;
-	/* In the case of a Wait/Test request, we are going to post a request
-	 * to test the completion of another request */
-	struct _starpu_mpi_req *other_request;
-#endif
 
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 
@@ -293,21 +248,6 @@ LIST_TYPE(_starpu_mpi_req,
 	void (*callback)(void *);
 
         /* in the case of user-defined datatypes, we need to send the size of the data */
-#if defined(STARPU_USE_MPI_NMAD)
-	nm_sr_request_t size_req;
-#elif defined(STARPU_USE_MPI_MPI)
-	MPI_Request size_req;
-#endif
-
-#if defined(STARPU_USE_MPI_MPI)
-	struct _starpu_mpi_envelope* envelope;
-
-	unsigned is_internal_req:1;
-	unsigned to_destroy:1;
-	struct _starpu_mpi_req *internal_req;
-	struct _starpu_mpi_early_data_handle *early_data_handle;
-     	UT_hash_handle hh;
-#endif
 
 	int sequential_consistency;
 
@@ -382,6 +322,23 @@ void _starpu_mpi_wait_for_initialization();
 #endif
 void _starpu_mpi_data_flush(starpu_data_handle_t data_handle);
 
+/*
+ * Specific functions to backend implementation
+ */
+struct _starpu_mpi_backend
+{
+	void (*_starpu_mpi_backend_init)(struct starpu_conf *conf);
+	void (*_starpu_mpi_backend_shutdown)(void);
+	int (*_starpu_mpi_backend_reserve_core)(void);
+	void (*_starpu_mpi_backend_request_init)(struct _starpu_mpi_req *req);
+	void (*_starpu_mpi_backend_request_fill)(struct _starpu_mpi_req *req, MPI_Comm comm, int is_internal_req);
+	void (*_starpu_mpi_backend_request_destroy)(struct _starpu_mpi_req *req);
+	void (*_starpu_mpi_backend_data_clear)(starpu_data_handle_t data_handle);
+	void (*_starpu_mpi_backend_data_register)(starpu_data_handle_t data_handle, starpu_mpi_tag_t data_tag);
+	void (*_starpu_mpi_backend_comm_register)(MPI_Comm comm);
+};
+
+extern struct _starpu_mpi_backend _mpi_backend;
 #ifdef __cplusplus
 }
 #endif

+ 5 - 50
mpi/src/starpu_mpi_req.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2018                                CNRS
+ * Copyright (C) 2010-2019                                CNRS
  * Copyright (C) 2009-2019                                Université de Bordeaux
  * Copyright (C) 2012,2013,2016,2017                      Inria
  * Copyright (C) 2017                                     Guillaume Beauchamp
@@ -19,13 +19,6 @@
 
 #include <starpu.h>
 #include <starpu_mpi_private.h>
-#if defined(STARPU_USE_MPI_MPI)
-#include <mpi/starpu_mpi_comm.h>
-#endif
-#if defined(STARPU_USE_MPI_NMAD)
-#include <pioman.h>
-#include <nm_mpi_nmad.h>
-#endif
 
 void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 {
@@ -48,21 +41,10 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	(*req)->func = NULL;
 
 	(*req)->status = NULL;
-#ifdef STARPU_USE_MPI_MPI
-	(*req)->data_request = 0;
-#endif
 	(*req)->flag = NULL;
 	_starpu_mpi_req_multilist_init_coop_sends(*req);
 
 	(*req)->ret = -1;
-#ifdef STARPU_USE_MPI_NMAD
-	piom_cond_init(&((*req)->req_cond), 0);
-#elif defined(STARPU_USE_MPI_MPI)
-	STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
-	STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
-	STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
-	STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
-#endif
 
 	(*req)->request_type = UNKNOWN_REQ;
 
@@ -70,23 +52,11 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	(*req)->completed = 0;
 	(*req)->posted = 0;
 
-#ifdef STARPU_USE_MPI_MPI
-	(*req)->other_request = NULL;
-#endif
-
 	(*req)->sync = 0;
 	(*req)->detached = -1;
 	(*req)->callback = NULL;
 	(*req)->callback_arg = NULL;
 
-#ifdef STARPU_USE_MPI_MPI
-	(*req)->size_req = 0;
-	(*req)->internal_req = NULL;
-	(*req)->is_internal_req = 0;
-	(*req)->to_destroy = 1;
-	(*req)->early_data_handle = NULL;
-	(*req)->envelope = NULL;
-#endif
 	(*req)->sequential_consistency = 1;
 	(*req)->pre_sync_jobid = -1;
 	(*req)->post_sync_jobid = -1;
@@ -96,6 +66,7 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	starpu_pthread_queue_register(&_starpu_mpi_thread_wait, &((*req)->queue));
 	(*req)->done = 0;
 #endif
+	_mpi_backend._starpu_mpi_backend_request_init(*req);
 }
 
 struct _starpu_mpi_req *_starpu_mpi_request_fill(starpu_data_handle_t data_handle,
@@ -108,10 +79,6 @@ struct _starpu_mpi_req *_starpu_mpi_request_fill(starpu_data_handle_t data_handl
 {
 	struct _starpu_mpi_req *req;
 
-#ifdef STARPU_USE_MPI_MPI
-	_starpu_mpi_comm_register(comm);
-#endif
-
 	/* Initialize the request structure */
 	_starpu_mpi_request_init(&req);
 	req->request_type = request_type;
@@ -128,30 +95,18 @@ struct _starpu_mpi_req *_starpu_mpi_request_fill(starpu_data_handle_t data_handl
 	req->callback_arg = arg;
 	req->func = func;
 	req->sequential_consistency = sequential_consistency;
-#ifdef STARPU_USE_MPI_NMAD
-	nm_mpi_nmad_dest(&req->session, &req->gate, comm, req->node_tag.rank);
-#elif defined(STARPU_USE_MPI_MPI)
-	req->is_internal_req = is_internal_req;
-	/* For internal requests, we wait for both the request completion and the matching application request completion */
-	req->to_destroy = !is_internal_req;
 	req->count = count;
-#endif
+
+	_mpi_backend._starpu_mpi_backend_request_fill(req, comm, is_internal_req);
 
 	return req;
 }
 
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
 {
-#ifdef STARPU_USE_MPI_NMAD
-	piom_cond_destroy(&(req->req_cond));
-#elif defined(STARPU_USE_MPI_MPI)
-	STARPU_PTHREAD_MUTEX_DESTROY(&req->req_mutex);
-	STARPU_PTHREAD_COND_DESTROY(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_DESTROY(&req->posted_mutex);
-	STARPU_PTHREAD_COND_DESTROY(&req->posted_cond);
+	_mpi_backend._starpu_mpi_backend_request_destroy(req);
 	free(req->datatype_name);
 	req->datatype_name = NULL;
-#endif
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_unregister(&_starpu_mpi_thread_wait, &req->queue);
 	starpu_pthread_queue_destroy(&req->queue);