Browse Source

replaced htable32 by lists, thus fixing duplicated communications

Benjamin Lorendeau 13 years ago
parent
commit
79c0bcda70

+ 119 - 118
mpi/examples/cholesky/mpi_cholesky.c

@@ -63,7 +63,8 @@ static struct starpu_codelet cl22 =
 /* Returns the MPI node number that owns the data block at index (x, y) */
 int my_distrib(int x, int y, int nb_nodes)
 {
-        return (x+y) % nb_nodes;
+	//return (x+y) % nb_nodes;
+	return (x%dblockx)+(y%dblocky)*dblockx;
 }
 
 /*
@@ -74,93 +75,93 @@ static void dw_cholesky(float ***matA, unsigned size, unsigned ld, unsigned nblo
 {
 	struct timeval start;
 	struct timeval end;
-        starpu_data_handle_t **data_handles;
-        int x, y;
+	starpu_data_handle_t **data_handles;
+	int x, y;
 
 	/* create all the DAG nodes */
 	unsigned i,j,k;
 
-        data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
-        for(x=0 ; x<nblocks ; x++) data_handles[x] = malloc(nblocks*sizeof(starpu_data_handle_t));
+	data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
+	for(x=0 ; x<nblocks ; x++) data_handles[x] = malloc(nblocks*sizeof(starpu_data_handle_t));
 
-        for(x = 0; x < nblocks ;  x++)
+	for(x = 0; x < nblocks ;  x++)
 	{
-                for (y = 0; y < nblocks; y++)
+		for (y = 0; y < nblocks; y++)
 		{
-                        int mpi_rank = my_distrib(x, y, nodes);
-                        if (mpi_rank == rank)
+			int mpi_rank = my_distrib(x, y, nodes);
+			if (mpi_rank == rank)
 			{
-                                //fprintf(stderr, "[%d] Owning data[%d][%d]\n", rank, x, y);
-                                starpu_matrix_data_register(&data_handles[x][y], 0, (uintptr_t)matA[x][y],
-                                                            ld, size/nblocks, size/nblocks, sizeof(float));
-                        }
+				//fprintf(stderr, "[%d] Owning data[%d][%d]\n", rank, x, y);
+				starpu_matrix_data_register(&data_handles[x][y], 0, (uintptr_t)matA[x][y],
+						ld, size/nblocks, size/nblocks, sizeof(float));
+			}
 			/* TODO: use a better test so that only the data actually needed is registered */
-                        else
+			else
 			{
-                                /* I don't own that index, but will need it for my computations */
-                                //fprintf(stderr, "[%d] Neighbour of data[%d][%d]\n", rank, x, y);
-                                starpu_matrix_data_register(&data_handles[x][y], -1, (uintptr_t)NULL,
-                                                            ld, size/nblocks, size/nblocks, sizeof(float));
-                        }
-                        if (data_handles[x][y])
+				/* I don't own that index, but will need it for my computations */
+				//fprintf(stderr, "[%d] Neighbour of data[%d][%d]\n", rank, x, y);
+				starpu_matrix_data_register(&data_handles[x][y], -1, (uintptr_t)NULL,
+						ld, size/nblocks, size/nblocks, sizeof(float));
+			}
+			if (data_handles[x][y])
 			{
-                                starpu_data_set_rank(data_handles[x][y], mpi_rank);
-                                starpu_data_set_tag(data_handles[x][y], (y*nblocks)+x);
+				starpu_data_set_rank(data_handles[x][y], mpi_rank);
+				starpu_data_set_tag(data_handles[x][y], (y*nblocks)+x);
 			}
-                }
-        }
+		}
+	}
 
 	starpu_mpi_barrier(MPI_COMM_WORLD);
 	gettimeofday(&start, NULL);
 
 	for (k = 0; k < nblocks; k++)
-        {
-                int prio = STARPU_DEFAULT_PRIO;
-                if (!noprio) prio = STARPU_MAX_PRIO;
+	{
+		int prio = STARPU_DEFAULT_PRIO;
+		if (!noprio) prio = STARPU_MAX_PRIO;
 
-                starpu_mpi_insert_task(MPI_COMM_WORLD, &cl11,
-                                       STARPU_PRIORITY, prio,
-                                       STARPU_RW, data_handles[k][k],
-                                       0);
+		starpu_mpi_insert_task(MPI_COMM_WORLD, &cl11,
+				STARPU_PRIORITY, prio,
+				STARPU_RW, data_handles[k][k],
+				0);
 
 		for (j = k+1; j<nblocks; j++)
 		{
-                        prio = STARPU_DEFAULT_PRIO;
-                        if (!noprio&& (j == k+1)) prio = STARPU_MAX_PRIO;
-                        starpu_mpi_insert_task(MPI_COMM_WORLD, &cl21,
-                                               STARPU_PRIORITY, prio,
-                                               STARPU_R, data_handles[k][k],
-                                               STARPU_RW, data_handles[k][j],
-                                               0);
+			prio = STARPU_DEFAULT_PRIO;
+			if (!noprio&& (j == k+1)) prio = STARPU_MAX_PRIO;
+			starpu_mpi_insert_task(MPI_COMM_WORLD, &cl21,
+					STARPU_PRIORITY, prio,
+					STARPU_R, data_handles[k][k],
+					STARPU_RW, data_handles[k][j],
+					0);
 
 			for (i = k+1; i<nblocks; i++)
 			{
 				if (i <= j)
-                                {
-                                        prio = STARPU_DEFAULT_PRIO;
-                                        if (!noprio && (i == k + 1) && (j == k +1) ) prio = STARPU_MAX_PRIO;
-                                        starpu_mpi_insert_task(MPI_COMM_WORLD, &cl22,
-                                                               STARPU_PRIORITY, prio,
-                                                               STARPU_R, data_handles[k][i],
-                                                               STARPU_R, data_handles[k][j],
-                                                               STARPU_RW, data_handles[i][j],
-                                                               0);
-                                }
+				{
+					prio = STARPU_DEFAULT_PRIO;
+					if (!noprio && (i == k + 1) && (j == k +1) ) prio = STARPU_MAX_PRIO;
+					starpu_mpi_insert_task(MPI_COMM_WORLD, &cl22,
+							STARPU_PRIORITY, prio,
+							STARPU_R, data_handles[k][i],
+							STARPU_R, data_handles[k][j],
+							STARPU_RW, data_handles[i][j],
+							0);
+				}
 			}
 		}
-        }
+	}
 
-        starpu_task_wait_for_all();
+	starpu_task_wait_for_all();
 
-        for(x = 0; x < nblocks ;  x++)
+	for(x = 0; x < nblocks ;  x++)
 	{
-                for (y = 0; y < nblocks; y++)
+		for (y = 0; y < nblocks; y++)
 		{
-                        if (data_handles[x][y])
-                                starpu_data_unregister(data_handles[x][y]);
-                }
+			if (data_handles[x][y])
+				starpu_data_unregister(data_handles[x][y]);
+		}
 		free(data_handles[x]);
-        }
+	}
 	free(data_handles);
 
 	starpu_mpi_barrier(MPI_COMM_WORLD);
@@ -185,7 +186,7 @@ int main(int argc, char **argv)
 	 * */
 
 	float ***bmat;
-        int rank, nodes;
+	int rank, nodes;
 
 	parse_args(argc, argv);
 
@@ -202,18 +203,18 @@ int main(int argc, char **argv)
 	starpu_helper_cublas_init();
 
 	unsigned i,j,x,y;
-        bmat = malloc(nblocks * sizeof(float *));
-        for(x=0 ; x<nblocks ; x++)
+	bmat = malloc(nblocks * sizeof(float *));
+	for(x=0 ; x<nblocks ; x++)
 	{
-                bmat[x] = malloc(nblocks * sizeof(float *));
-                for(y=0 ; y<nblocks ; y++)
+		bmat[x] = malloc(nblocks * sizeof(float *));
+		for(y=0 ; y<nblocks ; y++)
 		{
-                        starpu_malloc((void **)&bmat[x][y], BLOCKSIZE*BLOCKSIZE*sizeof(float));
+			starpu_malloc((void **)&bmat[x][y], BLOCKSIZE*BLOCKSIZE*sizeof(float));
 			for (i = 0; i < BLOCKSIZE; i++)
 			{
 				for (j = 0; j < BLOCKSIZE; j++)
 				{
-                                        bmat[x][y][j +i*BLOCKSIZE] = (1.0f/(1.0f+(i+(x*BLOCKSIZE)+j+(y*BLOCKSIZE)))) + ((i+(x*BLOCKSIZE) == j+(y*BLOCKSIZE))?1.0f*size:0.0f);
+					bmat[x][y][j +i*BLOCKSIZE] = (1.0f/(1.0f+(i+(x*BLOCKSIZE)+j+(y*BLOCKSIZE)))) + ((i+(x*BLOCKSIZE) == j+(y*BLOCKSIZE))?1.0f*size:0.0f);
 					//mat[j +i*size] = ((i == j)?1.0f*size:0.0f);
 				}
 			}
@@ -221,15 +222,15 @@ int main(int argc, char **argv)
 	}
 
 
-        if (display)
+	if (display)
 	{
-                printf("[%d] Input :\n", rank);
+		printf("[%d] Input :\n", rank);
 
 		for(y=0 ; y<nblocks ; y++)
 		{
 			for(x=0 ; x<nblocks ; x++)
 			{
-                                printf("Block %d,%d :\n", x, y);
+				printf("Block %d,%d :\n", x, y);
 				for (j = 0; j < BLOCKSIZE; j++)
 				{
 					for (i = 0; i < BLOCKSIZE; i++)
@@ -253,14 +254,14 @@ int main(int argc, char **argv)
 
 	starpu_mpi_shutdown();
 
-        if (display)
+	if (display)
 	{
-                printf("[%d] Results :\n", rank);
+		printf("[%d] Results :\n", rank);
 		for(y=0 ; y<nblocks ; y++)
 		{
 			for(x=0 ; x<nblocks ; x++)
 			{
-                                printf("Block %d,%d :\n", x, y);
+				printf("Block %d,%d :\n", x, y);
 				for (j = 0; j < BLOCKSIZE; j++)
 				{
 					for (i = 0; i < BLOCKSIZE; i++)
@@ -281,19 +282,19 @@ int main(int argc, char **argv)
 	}
 
 	float *rmat = malloc(size*size*sizeof(float));
-        for(x=0 ; x<nblocks ; x++)
+	for(x=0 ; x<nblocks ; x++)
 	{
-                for(y=0 ; y<nblocks ; y++)
+		for(y=0 ; y<nblocks ; y++)
 		{
-                        for (i = 0; i < BLOCKSIZE; i++)
+			for (i = 0; i < BLOCKSIZE; i++)
 			{
-                                for (j = 0; j < BLOCKSIZE; j++)
+				for (j = 0; j < BLOCKSIZE; j++)
 				{
-                                        rmat[j+(y*BLOCKSIZE)+(i+(x*BLOCKSIZE))*size] = bmat[x][y][j +i*BLOCKSIZE];
-                                }
-                        }
-                }
-        }
+					rmat[j+(y*BLOCKSIZE)+(i+(x*BLOCKSIZE))*size] = bmat[x][y][j +i*BLOCKSIZE];
+				}
+			}
+		}
+	}
 
 	fprintf(stderr, "[%d] compute explicit LLt ...\n", rank);
 	for (j = 0; j < size; j++)
@@ -310,62 +311,62 @@ int main(int argc, char **argv)
 	STARPU_ASSERT(test_mat);
 
 	SSYRK("L", "N", size, size, 1.0f,
-				rmat, size, 0.0f, test_mat, size);
+			rmat, size, 0.0f, test_mat, size);
 
 	fprintf(stderr, "[%d] comparing results ...\n", rank);
-        if (display)
+	if (display)
 	{
-                for (j = 0; j < size; j++)
+		for (j = 0; j < size; j++)
 		{
-                        for (i = 0; i < size; i++)
+			for (i = 0; i < size; i++)
 			{
-                                if (i <= j)
+				if (i <= j)
 				{
-                                        printf("%2.2f\t", test_mat[j +i*size]);
-                                }
-                                else
+					printf("%2.2f\t", test_mat[j +i*size]);
+				}
+				else
 				{
-                                        printf(".\t");
-                                }
-                        }
-                        printf("\n");
-                }
-        }
+					printf(".\t");
+				}
+			}
+			printf("\n");
+		}
+	}
 
 	int correctness = 1;
-        for(x = 0; x < nblocks ;  x++)
+	for(x = 0; x < nblocks ;  x++)
 	{
-                for (y = 0; y < nblocks; y++)
+		for (y = 0; y < nblocks; y++)
 		{
-                        int mpi_rank = my_distrib(x, y, nodes);
-                        if (mpi_rank == rank)
+			int mpi_rank = my_distrib(x, y, nodes);
+			if (mpi_rank == rank)
 			{
-                                for (i = (size/nblocks)*x ; i < (size/nblocks)*x+(size/nblocks); i++)
-                                {
-                                        for (j = (size/nblocks)*y ; j < (size/nblocks)*y+(size/nblocks); j++)
-                                        {
-                                                if (i <= j)
-                                                {
-                                                        float orig = (1.0f/(1.0f+i+j)) + ((i == j)?1.0f*size:0.0f);
-                                                        float err = abs(test_mat[j +i*size] - orig);
-                                                        if (err > 0.00001)
+				for (i = (size/nblocks)*x ; i < (size/nblocks)*x+(size/nblocks); i++)
+				{
+					for (j = (size/nblocks)*y ; j < (size/nblocks)*y+(size/nblocks); j++)
+					{
+						if (i <= j)
+						{
+							float orig = (1.0f/(1.0f+i+j)) + ((i == j)?1.0f*size:0.0f);
+							float err = abs(test_mat[j +i*size] - orig);
+							if (err > 0.00001)
 							{
-                                                                fprintf(stderr, "[%d] Error[%d, %d] --> %2.2f != %2.2f (err %2.2f)\n", rank, i, j, test_mat[j +i*size], orig, err);
+								fprintf(stderr, "[%d] Error[%d, %d] --> %2.2f != %2.2f (err %2.2f)\n", rank, i, j, test_mat[j +i*size], orig, err);
 								correctness = 0;
 								break;
-                                                        }
-                                                }
-                                        }
-                                }
-                        }
-                }
-        }
-
-        for(x=0 ; x<nblocks ; x++)
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+
+	for(x=0 ; x<nblocks ; x++)
 	{
-                for(y=0 ; y<nblocks ; y++)
+		for(y=0 ; y<nblocks ; y++)
 		{
-                        starpu_free((void *)bmat[x][y]);
+			starpu_free((void *)bmat[x][y]);
 		}
 		free(bmat[x]);
 	}
@@ -373,7 +374,7 @@ int main(int argc, char **argv)
 	free(rmat);
 	free(test_mat);
 
-        starpu_helper_cublas_shutdown();
+	starpu_helper_cublas_shutdown();
 	starpu_shutdown();
 
 	assert(correctness);

+ 15 - 1
mpi/examples/cholesky/mpi_cholesky.h

@@ -34,9 +34,11 @@
 
 static unsigned size = 4*1024;
 static unsigned nblocks = 16;
-static unsigned nbigblocks = 8;
+static unsigned nbigblocks = 2;
 static unsigned noprio = 0;
 static unsigned display = 0;
+static unsigned dblockx = 2;
+static unsigned dblocky = 2;
 
 void chol_cpu_codelet_update_u11(void **, void *);
 void chol_cpu_codelet_update_u21(void **, void *);
@@ -59,6 +61,18 @@ static void __attribute__((unused)) parse_args(int argc, char **argv)
 			size = strtol(argv[++i], &argptr, 10);
 		}
 
+		if (strcmp(argv[i], "-dblockx") == 0)
+		{
+		        char *argptr;
+			dblockx = strtol(argv[++i], &argptr, 10);
+		}
+		
+		if (strcmp(argv[i], "-dblocky") == 0)
+		{
+		        char *argptr;
+			dblocky = strtol(argv[++i], &argptr, 10);
+		}
+	
 		if (strcmp(argv[i], "-nblocks") == 0)
 		{
 		        char *argptr;

+ 252 - 223
mpi/starpu_mpi_insert_task.c

@@ -21,8 +21,6 @@
 #include <starpu.h>
 #include <starpu_data.h>
 #include <common/utils.h>
-#include <starpu_hash.h>
-#include <common/htable32.h>
 #include <util/starpu_insert_task_utils.h>
 #include <datawizard/coherency.h>
 
@@ -32,21 +30,21 @@
 /* Whether we are allowed to keep copies of remote data. Does not work
  * yet: the sender has to know whether the receiver has it, keeping it
  * in an array indexed by node numbers. */
-//#define MPI_CACHE
+#define MPI_CACHE 
 #include <starpu_mpi_insert_task_cache.h>
 
 static void _starpu_mpi_tables_init()
 {
-        if (sent_data == NULL) {
-                int nb_nodes;
+	if (sent_data == NULL) {
+		int nb_nodes;
 		int i;
 
-                MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
-		_STARPU_MPI_DEBUG("Initialising hash table for cache\n");
-		sent_data = malloc(nb_nodes * sizeof(struct starpu_htbl32_node *));
-		for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
-		received_data = malloc(nb_nodes * sizeof(struct starpu_htbl32_node *));
-		for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
+		MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
+		_STARPU_MPI_DEBUG("Initialising lists for cache\n");
+		sent_data = malloc(nb_nodes * sizeof(struct starpu_addr_node_list_t *));
+		for(i=0 ; i<nb_nodes ; i++) sent_data[i] = starpu_addr_node_list_new();
+		received_data = malloc(nb_nodes * sizeof(struct starpu_addr_node_list_t *));
+		for(i=0 ; i<nb_nodes ; i++) received_data[i] = starpu_addr_node_list_new();
 	}
 }
 
@@ -59,15 +57,15 @@ void _starpu_data_deallocate(starpu_data_handle_t data_handle)
 
 int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
-        int arg_type;
-        va_list varg_list;
-        int me, do_execute, xrank, nb_nodes;
+	int arg_type;
+	va_list varg_list;
+	int me, do_execute, xrank, nb_nodes;
 	size_t *size_on_nodes;
 	size_t arg_buffer_size = 0;
 	char *arg_buffer;
-        int dest=0, inconsistent_execute;
+	int dest=0, inconsistent_execute;
 
-        _STARPU_MPI_LOG_IN();
+	_STARPU_MPI_LOG_IN();
 
 	MPI_Comm_rank(comm, &me);
 	MPI_Comm_size(comm, &nb_nodes);
@@ -76,78 +74,78 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 
 	_starpu_mpi_tables_init();
 
-        /* Get the number of buffers and the size of the arguments */
+	/* Get the number of buffers and the size of the arguments */
 	va_start(varg_list, codelet);
-        arg_buffer_size = _starpu_insert_task_get_arg_size(varg_list);
+	arg_buffer_size = _starpu_insert_task_get_arg_size(varg_list);
 
 	va_start(varg_list, codelet);
 	_starpu_codelet_pack_args(arg_buffer_size, &arg_buffer, varg_list);
 
 	/* Find out whether we are to execute the task because we own the data to be written to. */
-        inconsistent_execute = 0;
-        do_execute = -1;
+	inconsistent_execute = 0;
+	do_execute = -1;
 	xrank = -1;
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
 		if (arg_type==STARPU_EXECUTE_ON_NODE) {
-                        xrank = va_arg(varg_list, int);
+			xrank = va_arg(varg_list, int);
 			_STARPU_MPI_DEBUG("Executing on node %d\n", xrank);
 			do_execute = 1;
-                }
+		}
 		else if (arg_type==STARPU_EXECUTE_ON_DATA) {
 			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-                        xrank = starpu_data_get_rank(data);
+			xrank = starpu_data_get_rank(data);
 			_STARPU_MPI_DEBUG("Executing on data node %d\n", xrank);
 			STARPU_ASSERT(xrank <= nb_nodes);
 			do_execute = 1;
-                }
+		}
 		else if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX) {
-                        starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
 
-                        if (data && arg_type & STARPU_R) {
+			if (data && arg_type & STARPU_R) {
 				int rank = starpu_data_get_rank(data);
 				struct starpu_data_interface_ops *ops;
 				ops = data->ops;
 				size_on_nodes[rank] += ops->get_size(data);
 			}
 
-                        if (arg_type & STARPU_W) {
-                                if (!data) {
-                                        /* We don't have anything allocated for this.
-                                         * The application knows we won't do anything
-                                         * about this task */
-                                        /* Yes, the app could actually not call
-                                         * insert_task at all itself, this is just a
-                                         * safeguard. */
-                                        _STARPU_MPI_DEBUG("oh oh\n");
-                                        _STARPU_MPI_LOG_OUT();
+			if (arg_type & STARPU_W) {
+				if (!data) {
+					/* We don't have anything allocated for this.
+					 * The application knows we won't do anything
+					 * about this task */
+					/* Yes, the app could actually not call
+					 * insert_task at all itself, this is just a
+					 * safeguard. */
+					_STARPU_MPI_DEBUG("oh oh\n");
+					_STARPU_MPI_LOG_OUT();
 					free(size_on_nodes);
-                                        return -EINVAL;
-                                }
-                                int mpi_rank = starpu_data_get_rank(data);
-                                if (mpi_rank == me) {
-                                        if (do_execute == 0) {
-                                                inconsistent_execute = 1;
-                                        }
-                                        else {
-                                                do_execute = 1;
-                                        }
-                                }
-                                else if (mpi_rank != -1) {
-                                        if (do_execute == 1) {
-                                                inconsistent_execute = 1;
-                                        }
-                                        else {
-                                                do_execute = 0;
-                                                dest = mpi_rank;
-                                                /* That's the rank which needs the data to be sent to */
-                                        }
-                                }
-                                else {
-                                        _STARPU_ERROR("rank invalid\n");
-                                }
-                        }
-                }
+					return -EINVAL;
+				}
+				int mpi_rank = starpu_data_get_rank(data);
+				if (mpi_rank == me) {
+					if (do_execute == 0) {
+						inconsistent_execute = 1;
+					}
+					else {
+						do_execute = 1;
+					}
+				}
+				else if (mpi_rank != -1) {
+					if (do_execute == 1) {
+						inconsistent_execute = 1;
+					}
+					else {
+						do_execute = 0;
+						dest = mpi_rank;
+						/* That's the rank which needs the data to be sent to */
+					}
+				}
+				else {
+					_STARPU_ERROR("rank invalid\n");
+				}
+			}
+		}
 		else if (arg_type==STARPU_VALUE) {
 			va_arg(varg_list, void *);
 			va_arg(varg_list, size_t);
@@ -193,70 +191,87 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 
 	STARPU_ASSERT(do_execute != -1 && "StarPU needs to see a W or a REDUX data which will tell it where to execute the task");
 
-        if (inconsistent_execute == 1) {
-                if (xrank == -1) {
-                        _STARPU_MPI_DEBUG("Different tasks are owning W data. Needs to specify which one is to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
+	if (inconsistent_execute == 1) {
+		if (xrank == -1) {
+			_STARPU_MPI_DEBUG("Different tasks are owning W data. Needs to specify which one is to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
 			free(size_on_nodes);
 			return -EINVAL;
-                }
-                else {
-                        do_execute = (me == xrank);
-                        dest = xrank;
-                }
-        }
+		}
+		else {
+			do_execute = (me == xrank);
+			dest = xrank;
+		}
+	}
 	else if (xrank != -1) {
 		do_execute = (me == xrank);
 		dest = xrank;
 	}
 
-        /* Send and receive data as requested */
+	/* Send and receive data as requested */
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
 		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
-                        starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-                        if (data && arg_type & STARPU_R) {
-                                int mpi_rank = starpu_data_get_rank(data);
+			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+			if (data && arg_type & STARPU_R) {
+				int mpi_rank = starpu_data_get_rank(data);
 				int mpi_tag = starpu_data_get_tag(data);
 				STARPU_ASSERT(mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank");
-                                /* The task needs to read this data */
-                                if (do_execute && mpi_rank != me && mpi_rank != -1) {
-                                        /* I will have to execute but I don't have the data, receive */
+				/* The task needs to read this data */
+				if (do_execute && mpi_rank != me && mpi_rank != -1) {
+					/* I will have to execute but I don't have the data, receive */
 #ifdef MPI_CACHE
-                                        uint32_t key = starpu_crc32_be((uintptr_t)data, 0);
-                                        void *already_received = _starpu_htbl_search_32(received_data[mpi_rank], key);
-                                        if (!already_received) {
-                                                _starpu_htbl_insert_32(&received_data[mpi_rank], key, data);
-                                        }
-                                        else {
-                                                _STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
-                                        }
-                                        if (!already_received)
+					struct starpu_addr_node *it, *stored_data ;
+					void *already_received = NULL;
+					for (it = starpu_addr_node_list_begin(received_data[mpi_rank]); it != starpu_addr_node_list_end(received_data[mpi_rank]); it = starpu_addr_node_list_next(it)) {
+						if(((void *)it->ndata != NULL) && it->ndata == (uintptr_t)data) {
+							already_received=it->ndata;
+							break;
+						}
+					}
+					if (!already_received) {
+						stored_data = starpu_addr_node_new();
+						stored_data->ndata=(uintptr_t)data;
+						starpu_addr_node_list_push_front(received_data[mpi_rank], stored_data);
+					}
+					else {
+						_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
+					}
+					if (!already_received)
 #endif
 					{
 						_STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
 						starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
 					}
-                                }
-                                if (!do_execute && mpi_rank == me) {
-                                        /* Somebody else will execute it, and I have the data, send it. */
+				}
+				if (!do_execute && mpi_rank == me) {
+					/* Somebody else will execute it, and I have the data, send it. */
 #ifdef MPI_CACHE
-                                        uint32_t key = starpu_crc32_be((uintptr_t)data, 0);
-                                        void *already_sent = _starpu_htbl_search_32(sent_data[dest], key);
-                                        if (!already_sent) {
-                                                _starpu_htbl_insert_32(&sent_data[dest], key, data);
-                                        }
-                                        else {
-                                                _STARPU_MPI_DEBUG("Do not sent data %p to node %d as it has already been sent\n", data, dest);
-                                        }
-                                        if (!already_sent)
+					struct starpu_addr_node *it, *stored_data;
+					void *already_sent = NULL;
+					for (it = starpu_addr_node_list_begin(sent_data[dest]); it != starpu_addr_node_list_end(sent_data[dest]); it = starpu_addr_node_list_next(it)) {
+						if((void *)it->ndata != NULL && it->ndata == (uintptr_t)data) {
+							already_sent=it;
+							break;
+						}
+					}
+
+					if (!already_sent) {
+						stored_data = starpu_addr_node_new();
+						stored_data->ndata=(uintptr_t)data;
+						starpu_addr_node_list_push_front(sent_data[dest], stored_data);
+					}
+					else {
+						_STARPU_MPI_DEBUG("Do not sent data %p to node %d as it has already been sent\n", data, dest);
+					}
+					if (!already_sent)
 #endif
 					{
 						_STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
 						starpu_mpi_isend_detached(data, dest, mpi_tag, comm, NULL, NULL);
 					}
-                                }
-                        }
-                }
+				}
+			}
+		}
 		else if (arg_type==STARPU_VALUE) {
 			va_arg(varg_list, void *);
 			va_arg(varg_list, size_t);
@@ -280,107 +295,121 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		else if (arg_type==STARPU_EXECUTE_ON_DATA) {
 			va_arg(varg_list, starpu_data_handle_t);
 		}
-        }
+	}
 	va_end(varg_list);
 
 	if (do_execute) {
-                _STARPU_MPI_DEBUG("Execution of the codelet %p (%s)\n", codelet, codelet->name);
-                va_start(varg_list, codelet);
-                struct starpu_task *task = starpu_task_create();
-                int ret = _starpu_insert_task_create_and_submit(arg_buffer, codelet, &task, varg_list);
-                _STARPU_MPI_DEBUG("ret: %d\n", ret);
-                STARPU_ASSERT(ret==0);
-        }
-
-        if (inconsistent_execute) {
-                va_start(varg_list, codelet);
-                while ((arg_type = va_arg(varg_list, int)) != 0) {
-                        if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
-                                starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-                                if (arg_type & STARPU_W) {
-                                        int mpi_rank = starpu_data_get_rank(data);
+		_STARPU_MPI_DEBUG("Execution of the codelet %p (%s)\n", codelet, codelet->name);
+		va_start(varg_list, codelet);
+		struct starpu_task *task = starpu_task_create();
+		int ret = _starpu_insert_task_create_and_submit(arg_buffer, codelet, &task, varg_list);
+		_STARPU_MPI_DEBUG("ret: %d\n", ret);
+		STARPU_ASSERT(ret==0);
+	}
+
+	if (inconsistent_execute) {
+		va_start(varg_list, codelet);
+		while ((arg_type = va_arg(varg_list, int)) != 0) {
+			if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
+				starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+				if (arg_type & STARPU_W) {
+					int mpi_rank = starpu_data_get_rank(data);
 					int mpi_tag = starpu_data_get_tag(data);
 					STARPU_ASSERT(mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank");
-                                        if (mpi_rank == me) {
-                                                if (xrank != -1 && me != xrank) {
-                                                        _STARPU_MPI_DEBUG("Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
-                                                        starpu_mpi_irecv_detached(data, dest, mpi_tag, comm, NULL, NULL);
-                                                }
-                                        }
-                                        else if (do_execute) {
-                                                _STARPU_MPI_DEBUG("Send data %p back to its owner %d...\n", data, mpi_rank);
-                                                starpu_mpi_isend_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
-                                        }
-                                }
-                        }
-                        else if (arg_type==STARPU_VALUE) {
-                                va_arg(varg_list, void *);
+					if (mpi_rank == me) {
+						if (xrank != -1 && me != xrank) {
+							_STARPU_MPI_DEBUG("Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
+							starpu_mpi_irecv_detached(data, dest, mpi_tag, comm, NULL, NULL);
+						}
+					}
+					else if (do_execute) {
+						_STARPU_MPI_DEBUG("Send data %p back to its owner %d...\n", data, mpi_rank);
+						starpu_mpi_isend_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
+					}
+				}
+			}
+			else if (arg_type==STARPU_VALUE) {
+				va_arg(varg_list, void *);
 				va_arg(varg_list, size_t);
-                        }
-                        else if (arg_type==STARPU_CALLBACK) {
-                                va_arg(varg_list, void (*)(void *));
-                        }
-                        else if (arg_type==STARPU_CALLBACK_WITH_ARG) {
-                                va_arg(varg_list, void (*)(void *));
-                                va_arg(varg_list, void *);
-                        }
-                        else if (arg_type==STARPU_CALLBACK_ARG) {
-                                va_arg(varg_list, void *);
-                        }
-                        else if (arg_type==STARPU_PRIORITY) {
-                                va_arg(varg_list, int);
-                        }
-                        else if (arg_type==STARPU_EXECUTE_ON_NODE) {
-                                va_arg(varg_list, int);
-                        }
-                        else if (arg_type==STARPU_EXECUTE_ON_DATA) {
-                                va_arg(varg_list, starpu_data_handle_t);
-                        }
-                }
-                va_end(varg_list);
-        }
+			}
+			else if (arg_type==STARPU_CALLBACK) {
+				va_arg(varg_list, void (*)(void *));
+			}
+			else if (arg_type==STARPU_CALLBACK_WITH_ARG) {
+				va_arg(varg_list, void (*)(void *));
+				va_arg(varg_list, void *);
+			}
+			else if (arg_type==STARPU_CALLBACK_ARG) {
+				va_arg(varg_list, void *);
+			}
+			else if (arg_type==STARPU_PRIORITY) {
+				va_arg(varg_list, int);
+			}
+			else if (arg_type==STARPU_EXECUTE_ON_NODE) {
+				va_arg(varg_list, int);
+			}
+			else if (arg_type==STARPU_EXECUTE_ON_DATA) {
+				va_arg(varg_list, starpu_data_handle_t);
+			}
+		}
+		va_end(varg_list);
+	}
 
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
 		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
-                        starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
+			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
 #ifdef MPI_CACHE
-                        if (arg_type & STARPU_W) {
-                                uint32_t key = starpu_crc32_be((uintptr_t)data, 0);
-                                if (do_execute) {
-                                        /* Note that all copies I've sent to neighbours are now invalid */
-                                        int n, size;
-                                        MPI_Comm_size(comm, &size);
-                                        for(n=0 ; n<size ; n++) {
-                                                void *already_sent = _starpu_htbl_search_32(sent_data[n], key);
-                                                if (already_sent) {
-                                                        _STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
-                                                        _starpu_mpi_clear_cache_request(data, n, _STARPU_MPI_CLEAR_SENT_DATA);
-                                                }
-                                        }
-                                }
-                                else {
-                                        int mpi_rank = starpu_data_get_rank(data);
-                                        void *already_received = _starpu_htbl_search_32(received_data[mpi_rank], key);
-                                        if (already_received) {
-                                                /* Somebody else will write to the data, so discard our cached copy if any */
-                                                /* TODO: starpu_mpi could just remember itself. */
-                                                _STARPU_MPI_DEBUG("Posting request to clear receive cache for data %p\n", data);
-                                                _starpu_mpi_clear_cache_request(data, mpi_rank, _STARPU_MPI_CLEAR_RECEIVED_DATA);
-                                                _starpu_data_deallocate(data);
-                                        }
-                                }
-                        }
+			if (arg_type & STARPU_W) {
+				if (do_execute) {
+					/* Note that all copies I've sent to neighbours are now invalid */
+					int n, size;
+					MPI_Comm_size(comm, &size);
+					for(n=0 ; n<size ; n++) {
+						struct starpu_addr_node *it ;
+						void *already_sent = NULL;
+						for (it = starpu_addr_node_list_begin(sent_data[n]); it < starpu_addr_node_list_end(sent_data[n]); it = starpu_addr_node_list_next(it)) {
+						if((void *)it->ndata != NULL && it->ndata == (uintptr_t)data) {
+								already_sent=it->ndata;
+								break;
+							}
+						}
+
+						if (already_sent) {
+							_STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
+							_starpu_mpi_clear_cache_request(data, n, _STARPU_MPI_CLEAR_SENT_DATA);
+						}
+					}
+				}
+				else {
+					int mpi_rank = starpu_data_get_rank(data);
+					void *already_received=NULL;
+					struct starpu_addr_node *it;
+					for (it = starpu_addr_node_list_begin(received_data[mpi_rank]); it < starpu_addr_node_list_end(received_data[mpi_rank]); it = starpu_addr_node_list_next(it)) {
+						if((void *)it->ndata !=NULL && it->ndata == (uintptr_t)data) {
+							already_received=it->ndata;
+							break;
+						}
+					}
+					if (already_received) {
+						/* Somebody else will write to the data, so discard our cached copy if any */
+						/* TODO: starpu_mpi could just remember itself. */
+						_STARPU_MPI_DEBUG("Posting request to clear receive cache for data %p\n", data);
+						_starpu_mpi_clear_cache_request(data, mpi_rank, _STARPU_MPI_CLEAR_RECEIVED_DATA);
+						_starpu_data_deallocate(data);
+					}
+				}
+			}
 #else
-                        /* We allocated a temporary buffer for the received data, now drop it */
-                        if ((arg_type & STARPU_R) && do_execute) {
-                                int mpi_rank = starpu_data_get_rank(data);
-                                if (mpi_rank != me && mpi_rank != -1) {
-                                        _starpu_data_deallocate(data);
-                                }
-                        }
+			/* We allocated a temporary buffer for the received data, now drop it */
+			if ((arg_type & STARPU_R) && do_execute) {
+				int mpi_rank = starpu_data_get_rank(data);
+				if (mpi_rank != me && mpi_rank != -1) {
+					_starpu_data_deallocate(data);
+				}
+			}
 #endif
-                }
+		}
 		else if (arg_type==STARPU_VALUE) {
 			va_arg(varg_list, void *);
 			va_arg(varg_list, size_t);
@@ -404,59 +433,59 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		else if (arg_type==STARPU_EXECUTE_ON_DATA) {
 			va_arg(varg_list, starpu_data_handle_t);
 		}
-        }
+	}
 	va_end(varg_list);
-        _STARPU_MPI_LOG_OUT();
-        return 0;
+	_STARPU_MPI_LOG_OUT();
+	return 0;
 }
 
+/*
+ * Bring data_handle onto MPI node `node` using the detached send/receive calls.
+ *
+ * The owner rank and the tag are read from the handle (starpu_data_get_rank /
+ * starpu_data_get_tag) and the caller's own rank is queried from comm.
+ *  - If `node` is already the owner, nothing is done.
+ *  - On `node`, a detached receive from the owner is posted; `callback` and
+ *    `arg` are forwarded to that receive only.
+ *  - On the owner, a detached send to `node` is posted with no callback.
+ *  - Any other rank does nothing.
+ * NOTE(review): every rank taking part in the transfer presumably has to make
+ * this call with the same arguments -- confirm against the StarPU-MPI docs.
+ */
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg)
 {
-        int me, rank, tag;
+	int me, rank, tag;
 
-        rank = starpu_data_get_rank(data_handle);
-        tag = starpu_data_get_tag(data_handle);
+	rank = starpu_data_get_rank(data_handle);
+	tag = starpu_data_get_tag(data_handle);
 	MPI_Comm_rank(comm, &me);
 
+	/* The requested node is the rank recorded on the handle: no transfer needed. */
-        if (node == rank) return;
+	if (node == rank) return;
 
-        if (me == node)
-        {
+	if (me == node)
+	{
 		starpu_mpi_irecv_detached(data_handle, rank, tag, comm, callback, arg);
-        }
-        else if (me == rank)
-        {
+	}
+	else if (me == rank)
+	{
 		starpu_mpi_isend_detached(data_handle, node, tag, comm, NULL, NULL);
-        }
+	}
 }
 
+/*
+ * Bring data_handle onto MPI node `node` using starpu_mpi_recv / starpu_mpi_send.
+ *
+ * The owner rank and the tag are read from the handle and the caller's own
+ * rank is queried from comm.
+ *  - If `node` is already the owner, nothing is done.
+ *  - On `node`, the data is received from the owner; the MPI_Status filled in
+ *    by the receive is a local and is not reported to the caller.
+ *  - On the owner, the data is sent to `node`.
+ *  - Any other rank does nothing.
+ * NOTE(review): starpu_mpi_recv / starpu_mpi_send are presumably the blocking
+ * counterparts of the *_detached calls -- confirm.
+ */
 void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle, int node)
 {
-        int me, rank, tag;
+	int me, rank, tag;
 
-        rank = starpu_data_get_rank(data_handle);
-        tag = starpu_data_get_tag(data_handle);
+	rank = starpu_data_get_rank(data_handle);
+	tag = starpu_data_get_tag(data_handle);
 	MPI_Comm_rank(comm, &me);
 
-        if (node == rank) return;
-
-        if (me == node)
-        {
-                MPI_Status status;
-                starpu_mpi_recv(data_handle, rank, tag, comm, &status);
-        }
-        else if (me == rank)
-        {
-                starpu_mpi_send(data_handle, node, tag, comm);
-        }
+	/* The requested node is the rank recorded on the handle: no transfer needed. */
+	if (node == rank) return;
+
+	if (me == node)
+	{
+		MPI_Status status;
+		starpu_mpi_recv(data_handle, rank, tag, comm, &status);
+	}
+	else if (me == rank)
+	{
+		starpu_mpi_send(data_handle, node, tag, comm);
+	}
 }
 
 void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 {
-        int me, rank, tag, nb_nodes;
+	int me, rank, tag, nb_nodes;
 
-        rank = starpu_data_get_rank(data_handle);
-        tag = starpu_data_get_tag(data_handle);
+	rank = starpu_data_get_rank(data_handle);
+	tag = starpu_data_get_tag(data_handle);
 
 	MPI_Comm_rank(comm, &me);
 	MPI_Comm_size(comm, &nb_nodes);
@@ -477,9 +506,9 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 
 				starpu_mpi_irecv_detached(new_handle, i, tag, comm, NULL, NULL);
 				starpu_insert_task(data_handle->redux_cl,
-						   STARPU_RW, data_handle,
-						   STARPU_R, new_handle,
-						   0);
+						STARPU_RW, data_handle,
+						STARPU_R, new_handle,
+						0);
 			}
 		}
 	}

+ 8 - 7
mpi/starpu_mpi_insert_task_cache.c

@@ -17,8 +17,6 @@
 
 #include <starpu_mpi_private.h>
 #include <starpu_mpi_insert_task_cache.h>
-#include <starpu_hash.h>
-#include <common/htable32.h>
 
 typedef struct _starpu_mpi_clear_cache_s {
         starpu_data_handle_t data;
@@ -26,21 +24,24 @@ typedef struct _starpu_mpi_clear_cache_s {
         int mode;
 } _starpu_mpi_clear_cache_t;
 
-struct starpu_htbl32_node **sent_data = NULL;
-struct starpu_htbl32_node **received_data = NULL;
+struct starpu_addr_node_list **sent_data = NULL;
+struct starpu_addr_node_list **received_data = NULL;
 
 void _starpu_mpi_clear_cache_callback(void *callback_arg)
 {
         _starpu_mpi_clear_cache_t *clear_cache = (_starpu_mpi_clear_cache_t *)callback_arg;
-        uint32_t key = starpu_crc32_be((uintptr_t)clear_cache->data, 0);
+		struct starpu_addr_node *stored_node = starpu_addr_node_new();
 
         if (clear_cache->mode == _STARPU_MPI_CLEAR_SENT_DATA) {
                 _STARPU_MPI_DEBUG("Clearing sent cache for data %p and rank %d\n", clear_cache->data, clear_cache->rank);
-                _starpu_htbl_insert_32(&sent_data[clear_cache->rank], key, NULL);
+				/* TODO: the implementation should be careful about freed memory. */
+				stored_node->ndata = (uintptr_t)clear_cache->data;
+				starpu_addr_node_list_push_front(sent_data[clear_cache->rank], stored_node);
         }
         else if (clear_cache->mode == _STARPU_MPI_CLEAR_RECEIVED_DATA) {
                 _STARPU_MPI_DEBUG("Clearing received cache for data %p and rank %d\n", clear_cache->data, clear_cache->rank);
-                _starpu_htbl_insert_32(&received_data[clear_cache->rank], key, NULL);
+				stored_node->ndata = (uintptr_t)clear_cache->data;
+				starpu_addr_node_list_push_front(received_data[clear_cache->rank], stored_node);
         }
 
         free(clear_cache);

+ 7 - 2
mpi/starpu_mpi_insert_task_cache.h

@@ -16,11 +16,16 @@
  */
 
 #include <starpu.h>
+#include <common/list.h>
 
 #define _STARPU_MPI_CLEAR_SENT_DATA     0
 #define _STARPU_MPI_CLEAR_RECEIVED_DATA 1
 
-extern struct starpu_htbl32_node **sent_data;
-extern struct starpu_htbl32_node **received_data;
+/*
+ * Node type for the send/receive cache lists: each node stores one address in
+ * ndata (the code in this commit stores a data handle cast to uintptr_t).
+ * NOTE(review): LIST_TYPE presumably comes from <common/list.h> and generates
+ * struct starpu_addr_node, struct starpu_addr_node_list and the
+ * starpu_addr_node_* helpers used by this commit -- confirm.
+ */
+LIST_TYPE(starpu_addr_node, 
+		uintptr_t ndata;
+);
+
+/*
+ * Cache lists, indexed by MPI rank (accessed as sent_data[dest] and
+ * received_data[mpi_rank] in this commit). Defined in
+ * starpu_mpi_insert_task_cache.c, where both start out as NULL.
+ */
+extern struct starpu_addr_node_list **sent_data;
+extern struct starpu_addr_node_list **received_data;
 
 void _starpu_mpi_clear_cache_request(starpu_data_handle_t data_handle, int rank, int mode);