|
@@ -37,6 +37,8 @@ int _starpu_cache_enabled=1;
|
|
|
static MPI_Comm _starpu_cache_comm;
|
|
|
static int _starpu_cache_comm_size;
|
|
|
|
|
|
+static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle);
|
|
|
+
|
|
|
int starpu_mpi_cache_is_enabled()
|
|
|
{
|
|
|
return _starpu_cache_enabled==1;
|
|
@@ -106,8 +108,10 @@ void _starpu_mpi_cache_data_clear(starpu_data_handle_t data_handle)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
- _starpu_mpi_cache_flush(data_handle);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
+ _starpu_mpi_cache_flush_nolock(data_handle);
|
|
|
free(mpi_data->cache_sent);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
}
|
|
|
|
|
|
void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle)
|
|
@@ -117,21 +121,22 @@ void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
mpi_data->cache_received = 0;
|
|
|
_STARPU_MALLOC(mpi_data->cache_sent, _starpu_cache_comm_size*sizeof(mpi_data->cache_sent[0]));
|
|
|
for(i=0 ; i<_starpu_cache_comm_size ; i++)
|
|
|
{
|
|
|
mpi_data->cache_sent[i] = 0;
|
|
|
}
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_cache_data_add(starpu_data_handle_t data_handle)
|
|
|
+static void _starpu_mpi_cache_data_add_nolock(starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
struct _starpu_data_entry *entry;
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
HASH_FIND_PTR(_cache_data, &data_handle, entry);
|
|
|
if (entry == NULL)
|
|
|
{
|
|
@@ -139,23 +144,20 @@ static void _starpu_mpi_cache_data_add(starpu_data_handle_t data_handle)
|
|
|
entry->data_handle = data_handle;
|
|
|
HASH_ADD_PTR(_cache_data, data_handle, entry);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_cache_data_remove(starpu_data_handle_t data_handle)
|
|
|
+static void _starpu_mpi_cache_data_remove_nolock(starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
struct _starpu_data_entry *entry;
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
HASH_FIND_PTR(_cache_data, &data_handle, entry);
|
|
|
if (entry)
|
|
|
{
|
|
|
HASH_DEL(_cache_data, entry);
|
|
|
free(entry);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
}
|
|
|
|
|
|
/**************************************
|
|
@@ -168,6 +170,7 @@ void _starpu_mpi_cache_received_data_clear(starpu_data_handle_t data_handle)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
STARPU_ASSERT(mpi_data->magic == 42);
|
|
|
STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
|
|
|
|
|
@@ -179,9 +182,10 @@ void _starpu_mpi_cache_received_data_clear(starpu_data_handle_t data_handle)
|
|
|
_STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data_handle);
|
|
|
mpi_data->cache_received = 0;
|
|
|
starpu_data_invalidate_submit(data_handle);
|
|
|
- _starpu_mpi_cache_data_remove(data_handle);
|
|
|
+ _starpu_mpi_cache_data_remove_nolock(data_handle);
|
|
|
_starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
|
|
|
}
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
|
|
@@ -191,6 +195,7 @@ int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return 0;
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
STARPU_ASSERT(mpi_data->magic == 42);
|
|
|
STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
|
|
|
|
|
@@ -199,13 +204,14 @@ int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Noting that data %p has already been received by %d\n", data_handle, mpi_rank);
|
|
|
mpi_data->cache_received = 1;
|
|
|
- _starpu_mpi_cache_data_add(data_handle);
|
|
|
+ _starpu_mpi_cache_data_add_nolock(data_handle);
|
|
|
_starpu_mpi_cache_stats_inc(mpi_rank, data_handle);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
|
|
|
}
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
return already_received;
|
|
|
}
|
|
|
|
|
@@ -216,9 +222,11 @@ int _starpu_mpi_cache_received_data_get(starpu_data_handle_t data_handle)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return 0;
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
STARPU_ASSERT(mpi_data->magic == 42);
|
|
|
-
|
|
|
- return mpi_data->cache_received;
|
|
|
+ already_received = mpi_data->cache_received;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
+ return already_received;
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_cached_receive(starpu_data_handle_t data_handle)
|
|
@@ -236,6 +244,7 @@ void _starpu_mpi_cache_sent_data_clear(starpu_data_handle_t data_handle)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
starpu_mpi_comm_size(mpi_data->node_tag.comm, &size);
|
|
|
for(n=0 ; n<size ; n++)
|
|
|
{
|
|
@@ -243,9 +252,10 @@ void _starpu_mpi_cache_sent_data_clear(starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
|
|
|
mpi_data->cache_sent[n] = 0;
|
|
|
- _starpu_mpi_cache_data_remove(data_handle);
|
|
|
+ _starpu_mpi_cache_data_remove_nolock(data_handle);
|
|
|
}
|
|
|
}
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_cache_sent_data_set(starpu_data_handle_t data_handle, int dest)
|
|
@@ -256,17 +266,19 @@ int _starpu_mpi_cache_sent_data_set(starpu_data_handle_t data_handle, int dest)
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
int already_sent = mpi_data->cache_sent[dest];
|
|
|
if (mpi_data->cache_sent[dest] == 0)
|
|
|
{
|
|
|
mpi_data->cache_sent[dest] = 1;
|
|
|
- _starpu_mpi_cache_data_add(data_handle);
|
|
|
+ _starpu_mpi_cache_data_add_nolock(data_handle);
|
|
|
_STARPU_MPI_DEBUG(2, "Noting that data %p has already been sent to %d\n", data_handle, dest);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Do not send data %p to node %d as it has already been sent\n", data_handle, dest);
|
|
|
}
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
return already_sent;
|
|
|
}
|
|
|
|
|
@@ -277,8 +289,11 @@ int _starpu_mpi_cache_sent_data_get(starpu_data_handle_t data_handle, int dest)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return 0;
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
|
|
|
- return already_sent = mpi_data->cache_sent[dest];
|
|
|
+ already_sent = mpi_data->cache_sent[dest];
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
+ return already_sent;
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest)
|
|
@@ -286,7 +301,7 @@ int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest)
|
|
|
return _starpu_mpi_cache_sent_data_get(data_handle, dest);
|
|
|
}
|
|
|
|
|
|
-void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle)
|
|
|
+static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
|
|
|
int i, nb_nodes;
|
|
@@ -313,11 +328,20 @@ void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_cache_flush_and_invalidate(MPI_Comm comm, starpu_data_handle_t data_handle)
|
|
|
+void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle)
|
|
|
+{
|
|
|
+ if (_starpu_cache_enabled == 0) return;
|
|
|
+
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
+ _starpu_mpi_cache_flush_nolock(data_handle);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
+}
|
|
|
+
|
|
|
+static void _starpu_mpi_cache_flush_and_invalidate_nolock(MPI_Comm comm, starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
int my_rank, mpi_rank;
|
|
|
|
|
|
- _starpu_mpi_cache_flush(data_handle);
|
|
|
+ _starpu_mpi_cache_flush_nolock(data_handle);
|
|
|
|
|
|
starpu_mpi_comm_rank(comm, &my_rank);
|
|
|
mpi_rank = starpu_mpi_data_get_rank(data_handle);
|
|
@@ -329,8 +353,10 @@ void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
- _starpu_mpi_cache_flush_and_invalidate(comm, data_handle);
|
|
|
- _starpu_mpi_cache_data_remove(data_handle);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
+ _starpu_mpi_cache_flush_and_invalidate_nolock(comm, data_handle);
|
|
|
+ _starpu_mpi_cache_data_remove_nolock(data_handle);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
|
|
|
}
|
|
|
|
|
|
void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
|
|
@@ -342,7 +368,7 @@ void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
|
|
|
HASH_ITER(hh, _cache_data, entry, tmp)
|
|
|
{
|
|
|
- _starpu_mpi_cache_flush_and_invalidate(comm, entry->data_handle);
|
|
|
+ _starpu_mpi_cache_flush_and_invalidate_nolock(comm, entry->data_handle);
|
|
|
HASH_DEL(_cache_data, entry);
|
|
|
free(entry);
|
|
|
}
|