|
@@ -21,16 +21,17 @@
|
|
#include <starpu.h>
|
|
#include <starpu.h>
|
|
#include <starpu_data.h>
|
|
#include <starpu_data.h>
|
|
#include <common/utils.h>
|
|
#include <common/utils.h>
|
|
|
|
+#include <common/htable64.h>
|
|
#include <util/starpu_insert_task_utils.h>
|
|
#include <util/starpu_insert_task_utils.h>
|
|
#include <datawizard/coherency.h>
|
|
#include <datawizard/coherency.h>
|
|
|
|
|
|
-//#define STARPU_MPI_VERBOSE 1
|
|
|
|
|
|
+//#define STARPU_MPI_VERBOSE 1
|
|
#include <starpu_mpi_private.h>
|
|
#include <starpu_mpi_private.h>
|
|
|
|
|
|
/* Whether we are allowed to keep copies of remote data. Does not work
|
|
/* Whether we are allowed to keep copies of remote data. Does not work
|
|
* yet: the sender has to know whether the receiver has it, keeping it
|
|
* yet: the sender has to know whether the receiver has it, keeping it
|
|
* in an array indexed by node numbers. */
|
|
* in an array indexed by node numbers. */
|
|
-#define MPI_CACHE
|
|
|
|
|
|
+#define MPI_CACHE 1
|
|
#include <starpu_mpi_insert_task_cache.h>
|
|
#include <starpu_mpi_insert_task_cache.h>
|
|
|
|
|
|
static void _starpu_mpi_tables_init()
|
|
static void _starpu_mpi_tables_init()
|
|
@@ -40,11 +41,11 @@ static void _starpu_mpi_tables_init()
|
|
int i;
|
|
int i;
|
|
|
|
|
|
MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
|
|
MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
|
|
- _STARPU_MPI_DEBUG("Initialising lists for cache\n");
|
|
|
|
- sent_data = malloc(nb_nodes * sizeof(struct starpu_addr_node_list_t *));
|
|
|
|
- for(i=0 ; i<nb_nodes ; i++) sent_data[i] = starpu_addr_node_list_new();
|
|
|
|
- received_data = malloc(nb_nodes * sizeof(struct starpu_addr_node_list_t *));
|
|
|
|
- for(i=0 ; i<nb_nodes ; i++) received_data[i] = starpu_addr_node_list_new();
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG("Initialising htable for cache\n");
|
|
|
|
+ sent_data = malloc(nb_nodes * sizeof(struct starpu_htbl64_node *));
|
|
|
|
+ for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
|
|
|
|
+ received_data = malloc(nb_nodes * sizeof(struct starpu_htbl64_node *));
|
|
|
|
+ for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -220,18 +221,9 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
|
|
if (do_execute && mpi_rank != me && mpi_rank != -1) {
|
|
if (do_execute && mpi_rank != me && mpi_rank != -1) {
|
|
/* I will have to execute but I don't have the data, receive */
|
|
/* I will have to execute but I don't have the data, receive */
|
|
#ifdef MPI_CACHE
|
|
#ifdef MPI_CACHE
|
|
- struct starpu_addr_node *it, *stored_data ;
|
|
|
|
- void *already_received = NULL;
|
|
|
|
- for (it = starpu_addr_node_list_begin(received_data[mpi_rank]); it != starpu_addr_node_list_end(received_data[mpi_rank]); it = starpu_addr_node_list_next(it)) {
|
|
|
|
- if(((void *)it->ndata != NULL) && it->ndata == (uintptr_t)data) {
|
|
|
|
- already_received=it->ndata;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ void *already_received = _starpu_htbl_search_64(received_data[mpi_rank], data);
|
|
if (!already_received) {
|
|
if (!already_received) {
|
|
- stored_data = starpu_addr_node_new();
|
|
|
|
- stored_data->ndata=(uintptr_t)data;
|
|
|
|
- starpu_addr_node_list_push_front(received_data[mpi_rank], stored_data);
|
|
|
|
|
|
+ _starpu_htbl_insert_64(&received_data[mpi_rank], data, data);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
|
|
_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
|
|
@@ -246,22 +238,13 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
|
|
if (!do_execute && mpi_rank == me) {
|
|
if (!do_execute && mpi_rank == me) {
|
|
/* Somebody else will execute it, and I have the data, send it. */
|
|
/* Somebody else will execute it, and I have the data, send it. */
|
|
#ifdef MPI_CACHE
|
|
#ifdef MPI_CACHE
|
|
- struct starpu_addr_node *it, *stored_data;
|
|
|
|
- void *already_sent = NULL;
|
|
|
|
- for (it = starpu_addr_node_list_begin(sent_data[dest]); it != starpu_addr_node_list_end(sent_data[dest]); it = starpu_addr_node_list_next(it)) {
|
|
|
|
- if((void *)it->ndata != NULL && it->ndata == (uintptr_t)data) {
|
|
|
|
- already_sent=it;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ void *already_sent = _starpu_htbl_search_64(sent_data[dest], data);
|
|
|
|
|
|
if (!already_sent) {
|
|
if (!already_sent) {
|
|
- stored_data = starpu_addr_node_new();
|
|
|
|
- stored_data->ndata=(uintptr_t)data;
|
|
|
|
- starpu_addr_node_list_push_front(sent_data[dest], stored_data);
|
|
|
|
|
|
+ _starpu_htbl_insert_64(&sent_data[dest], data, data);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
- _STARPU_MPI_DEBUG("Do not sent data %p to node %d as it has already been sent\n", data, dest);
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
|
|
}
|
|
}
|
|
if (!already_sent)
|
|
if (!already_sent)
|
|
#endif
|
|
#endif
|
|
@@ -366,14 +349,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
|
|
int n, size;
|
|
int n, size;
|
|
MPI_Comm_size(comm, &size);
|
|
MPI_Comm_size(comm, &size);
|
|
for(n=0 ; n<size ; n++) {
|
|
for(n=0 ; n<size ; n++) {
|
|
- struct starpu_addr_node *it ;
|
|
|
|
- void *already_sent = NULL;
|
|
|
|
- for (it = starpu_addr_node_list_begin(sent_data[n]); it < starpu_addr_node_list_end(sent_data[n]); it = starpu_addr_node_list_next(it)) {
|
|
|
|
- if((void *)it->ndata != NULL && it->ndata == (uintptr_t)data) {
|
|
|
|
- already_sent=it->ndata;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ void *already_sent = _starpu_htbl_search_64(sent_data[n], data);
|
|
|
|
|
|
if (already_sent) {
|
|
if (already_sent) {
|
|
_STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
|
|
_STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
|
|
@@ -383,14 +359,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
int mpi_rank = starpu_data_get_rank(data);
|
|
int mpi_rank = starpu_data_get_rank(data);
|
|
- void *already_received=NULL;
|
|
|
|
- struct starpu_addr_node *it;
|
|
|
|
- for (it = starpu_addr_node_list_begin(received_data[mpi_rank]); it < starpu_addr_node_list_end(received_data[mpi_rank]); it = starpu_addr_node_list_next(it)) {
|
|
|
|
- if((void *)it->ndata !=NULL && it->ndata == (uintptr_t)data) {
|
|
|
|
- already_received=it->ndata;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ void *already_received=_starpu_htbl_search_64(received_data[mpi_rank], data);
|
|
if (already_received) {
|
|
if (already_received) {
|
|
/* Somebody else will write to the data, so discard our cached copy if any */
|
|
/* Somebody else will write to the data, so discard our cached copy if any */
|
|
/* TODO: starpu_mpi could just remember itself. */
|
|
/* TODO: starpu_mpi could just remember itself. */
|
|
@@ -434,6 +403,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
|
|
va_arg(varg_list, starpu_data_handle_t);
|
|
va_arg(varg_list, starpu_data_handle_t);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
va_end(varg_list);
|
|
va_end(varg_list);
|
|
_STARPU_MPI_LOG_OUT();
|
|
_STARPU_MPI_LOG_OUT();
|
|
return 0;
|
|
return 0;
|