|
@@ -107,12 +107,7 @@ void _starpu_mpi_cache_data_clear(starpu_data_handle_t data_handle)
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
_starpu_mpi_cache_flush(data_handle);
|
|
|
- for(i=0 ; i<_starpu_cache_comm_size ; i++)
|
|
|
- {
|
|
|
- STARPU_PTHREAD_MUTEX_DESTROY(&mpi_data->cache_sent_mutex[i]);
|
|
|
- }
|
|
|
free(mpi_data->cache_sent);
|
|
|
- free(mpi_data->cache_sent_mutex);
|
|
|
}
|
|
|
|
|
|
void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle)
|
|
@@ -122,13 +117,10 @@ void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle)
|
|
|
|
|
|
if (_starpu_cache_enabled == 0) return;
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&mpi_data->cache_received_mutex, NULL);
|
|
|
mpi_data->cache_received = 0;
|
|
|
- _STARPU_MALLOC(mpi_data->cache_sent_mutex, _starpu_cache_comm_size*sizeof(mpi_data->cache_sent_mutex[0]));
|
|
|
_STARPU_MALLOC(mpi_data->cache_sent, _starpu_cache_comm_size*sizeof(mpi_data->cache_sent[0]));
|
|
|
for(i=0 ; i<_starpu_cache_comm_size ; i++)
|
|
|
{
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&mpi_data->cache_sent_mutex[i], NULL);
|
|
|
mpi_data->cache_sent[i] = 0;
|
|
|
}
|
|
|
}
|
|
@@ -179,7 +171,6 @@ void _starpu_mpi_cache_received_data_clear(starpu_data_handle_t data_handle)
|
|
|
STARPU_ASSERT(mpi_data->magic == 42);
|
|
|
STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_received_mutex);
|
|
|
if (mpi_data->cache_received == 1)
|
|
|
{
|
|
|
#ifdef STARPU_DEVEL
|
|
@@ -191,7 +182,6 @@ void _starpu_mpi_cache_received_data_clear(starpu_data_handle_t data_handle)
|
|
|
_starpu_mpi_cache_data_remove(data_handle);
|
|
|
_starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_received_mutex);
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
|
|
@@ -204,7 +194,6 @@ int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
|
|
|
STARPU_ASSERT(mpi_data->magic == 42);
|
|
|
STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_received_mutex);
|
|
|
int already_received = mpi_data->cache_received;
|
|
|
if (already_received == 0)
|
|
|
{
|
|
@@ -217,7 +206,6 @@ int _starpu_mpi_cache_received_data_set(starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_received_mutex);
|
|
|
return already_received;
|
|
|
}
|
|
|
|
|
@@ -230,10 +218,7 @@ int _starpu_mpi_cache_received_data_get(starpu_data_handle_t data_handle)
|
|
|
|
|
|
STARPU_ASSERT(mpi_data->magic == 42);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_received_mutex);
|
|
|
- already_received = mpi_data->cache_received;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_received_mutex);
|
|
|
- return already_received;
|
|
|
+ return mpi_data->cache_received;
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_cached_receive(starpu_data_handle_t data_handle)
|
|
@@ -254,14 +239,12 @@ void _starpu_mpi_cache_sent_data_clear(starpu_data_handle_t data_handle)
|
|
|
starpu_mpi_comm_size(mpi_data->node_tag.comm, &size);
|
|
|
for(n=0 ; n<size ; n++)
|
|
|
{
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_sent_mutex[n]);
|
|
|
if (mpi_data->cache_sent[n] == 1)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
|
|
|
mpi_data->cache_sent[n] = 0;
|
|
|
_starpu_mpi_cache_data_remove(data_handle);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_sent_mutex[n]);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -273,7 +256,6 @@ int _starpu_mpi_cache_sent_data_set(starpu_data_handle_t data_handle, int dest)
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_sent_mutex[dest]);
|
|
|
int already_sent = mpi_data->cache_sent[dest];
|
|
|
if (mpi_data->cache_sent[dest] == 0)
|
|
|
{
|
|
@@ -285,7 +267,6 @@ int _starpu_mpi_cache_sent_data_set(starpu_data_handle_t data_handle, int dest)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Do not send data %p to node %d as it has already been sent\n", data_handle, dest);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_sent_mutex[dest]);
|
|
|
return already_sent;
|
|
|
}
|
|
|
|
|
@@ -297,11 +278,7 @@ int _starpu_mpi_cache_sent_data_get(starpu_data_handle_t data_handle, int dest)
|
|
|
if (_starpu_cache_enabled == 0) return 0;
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
|
|
|
-
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_sent_mutex[dest]);
|
|
|
- already_sent = mpi_data->cache_sent[dest];
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_sent_mutex[dest]);
|
|
|
- return already_sent;
|
|
|
+ return already_sent = mpi_data->cache_sent[dest];
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest)
|
|
@@ -319,17 +296,14 @@ void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle)
|
|
|
starpu_mpi_comm_size(mpi_data->node_tag.comm, &nb_nodes);
|
|
|
for(i=0 ; i<nb_nodes ; i++)
|
|
|
{
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_sent_mutex[i]);
|
|
|
if (mpi_data->cache_sent[i] == 1)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
|
|
|
mpi_data->cache_sent[i] = 0;
|
|
|
_starpu_mpi_cache_stats_dec(i, data_handle);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_sent_mutex[i]);
|
|
|
}
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mpi_data->cache_received_mutex);
|
|
|
if (mpi_data->cache_received == 1)
|
|
|
{
|
|
|
int mpi_rank = starpu_mpi_data_get_rank(data_handle);
|
|
@@ -337,7 +311,6 @@ void _starpu_mpi_cache_flush(starpu_data_handle_t data_handle)
|
|
|
mpi_data->cache_received = 0;
|
|
|
_starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mpi_data->cache_received_mutex);
|
|
|
}
|
|
|
|
|
|
static void _starpu_mpi_cache_flush_and_invalidate(MPI_Comm comm, starpu_data_handle_t data_handle)
|