Explorar el Código

mpi/src/starpu_mpi_task_insert.c: starpu_mpi_cache_flush_all_data() also invalidates data which are flushed from the cache

Nathalie Furmento hace 11 años
padre
commit
08d868999a
Se han modificado 1 ficheros con 26 adiciones y 4 borrados
  1. 26 4
      mpi/src/starpu_mpi_task_insert.c

+ 26 - 4
mpi/src/starpu_mpi_task_insert.c

@@ -102,25 +102,47 @@ void _starpu_mpi_cache_free(int world_size)
 
 void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
 {
-	int nb_nodes;
+	int nb_nodes, i;
+	int mpi_rank, my_rank;
 
 	if (_cache_enabled == 0) return;
 
 	MPI_Comm_size(comm, &nb_nodes);
-	_starpu_mpi_cache_empty_tables(nb_nodes);
-	/* TODO: also invalidate all data */
+	MPI_Comm_rank(comm, &my_rank);
+
+	for(i=0 ; i<nb_nodes ; i++)
+	{
+		struct _starpu_data_entry *entry, *tmp;
+		HASH_ITER(hh, _cache_sent_data[i], entry, tmp)
+		{
+			mpi_rank = starpu_data_get_rank((starpu_data_handle_t) entry->data);
+			if (mpi_rank != my_rank && mpi_rank != -1)
+				starpu_data_invalidate_submit((starpu_data_handle_t) entry->data);
+			HASH_DEL(_cache_sent_data[i], entry);
+			free(entry);
+		}
+		HASH_ITER(hh, _cache_received_data[i], entry, tmp)
+		{
+			mpi_rank = starpu_data_get_rank((starpu_data_handle_t) entry->data);
+			if (mpi_rank != my_rank && mpi_rank != -1)
+				starpu_data_invalidate_submit((starpu_data_handle_t) entry->data);
+			HASH_DEL(_cache_received_data[i], entry);
+			free(entry);
+		}
+	}
 }
 
 void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
 {
 	struct _starpu_data_entry *avail;
 	int i, my_rank, nb_nodes;
-	int mpi_rank = starpu_data_get_rank(data_handle);
+	int mpi_rank;
 
 	if (_cache_enabled == 0) return;
 
 	MPI_Comm_size(comm, &nb_nodes);
 	MPI_Comm_rank(comm, &my_rank);
+	mpi_rank = starpu_data_get_rank(data_handle);
 
 	for(i=0 ; i<nb_nodes ; i++)
 	{