ソースを参照

Turn a mutex into an rwlock which should be almost never taken wr

Samuel Thibault 7 年 前
コミット
fc9389df54
共有1 個のファイルを変更した25 個の追加14 個の削除を含む
  1. 25 14
      mpi/src/mpi/starpu_mpi_comm.c

+ 25 - 14
mpi/src/mpi/starpu_mpi_comm.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
- * Copyright (C) 2011-2016  Université de Bordeaux
+ * Copyright (C) 2011-2017  Université de Bordeaux
  * Copyright (C) 2014 Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -43,7 +43,9 @@ struct _starpu_mpi_comm_hashtable
 	MPI_Comm comm;
 };
 
-static starpu_pthread_mutex_t _starpu_mpi_comms_mutex;
+/* Protect between comm addition from submitting tasks and MPI thread */
+static starpu_pthread_rwlock_t _starpu_mpi_comms_mutex;
+
 struct _starpu_mpi_comm_hashtable *_starpu_mpi_comms_cache;
 struct _starpu_mpi_comm **_starpu_mpi_comms;
 int _starpu_mpi_comm_nb;
@@ -58,7 +60,7 @@ void _starpu_mpi_comm_init(MPI_Comm comm)
 	_starpu_mpi_comm_nb=0;
 	_starpu_mpi_comm_tested=0;
 	_starpu_mpi_comms_cache = NULL;
-	STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_comms_mutex, NULL);
+	STARPU_PTHREAD_RWLOCK_INIT(&_starpu_mpi_comms_mutex, NULL);
 
 	_starpu_mpi_comm_register(comm);
 }
@@ -85,18 +87,27 @@ void _starpu_mpi_comm_shutdown()
 		free(entry);
 	}
 
-	STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_DESTROY(&_starpu_mpi_comms_mutex);
 }
 
 void _starpu_mpi_comm_register(MPI_Comm comm)
 {
 	struct _starpu_mpi_comm_hashtable *found;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&_starpu_mpi_comms_mutex);
 	HASH_FIND(hh, _starpu_mpi_comms_cache, &comm, sizeof(MPI_Comm), found);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
 	if (found)
 	{
 		_STARPU_MPI_DEBUG(10, "comm %ld (%ld) already registered\n", (long int)comm, (long int)MPI_COMM_WORLD);
+		return;
+	}
+
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&_starpu_mpi_comms_mutex);
+	HASH_FIND(hh, _starpu_mpi_comms_cache, &comm, sizeof(MPI_Comm), found);
+	if (found)
+	{
+		_STARPU_MPI_DEBUG(10, "comm %ld (%ld) already registered in between\n", (long int)comm, (long int)MPI_COMM_WORLD);
 	}
 	else
 	{
@@ -125,14 +136,14 @@ void _starpu_mpi_comm_register(MPI_Comm comm)
 		_comm->done = 0;
 #endif
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
 }
 
 void _starpu_mpi_comm_post_recv()
 {
 	int i;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&_starpu_mpi_comms_mutex);
 	for(i=0 ; i<_starpu_mpi_comm_nb ; i++)
 	{
 		struct _starpu_mpi_comm *_comm = _starpu_mpi_comms[i]; // get the ith _comm;
@@ -147,14 +158,14 @@ void _starpu_mpi_comm_post_recv()
 			_comm->posted = 1;
 		}
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
 }
 
 int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, MPI_Comm *comm)
 {
 	int i=_starpu_mpi_comm_tested;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&_starpu_mpi_comms_mutex);
 	while (1)
 	{
 		struct _starpu_mpi_comm *_comm = _starpu_mpi_comms[i]; // get the ith _comm;
@@ -178,7 +189,7 @@ int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope *
 					_starpu_mpi_comm_tested = 0;
 				*envelope = _comm->envelope;
 				*comm = _comm->comm;
-				STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
+				STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
 				return 1;
 			}
 		}
@@ -190,11 +201,11 @@ int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope *
 		if (i == _starpu_mpi_comm_tested)
 		{
 			// We have tested all the requests, none has completed
-			STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
+			STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
 			return 0;
 		}
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
 	return 0;
 }
 
@@ -202,7 +213,7 @@ void _starpu_mpi_comm_cancel_recv()
 {
 	int i;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&_starpu_mpi_comms_mutex);
 	for(i=0 ; i<_starpu_mpi_comm_nb ; i++)
 	{
 		struct _starpu_mpi_comm *_comm = _starpu_mpi_comms[i]; // get the ith _comm;
@@ -218,7 +229,7 @@ void _starpu_mpi_comm_cancel_recv()
 			_comm->posted = 0;
 		}
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_comms_mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
 }
 
 #endif /* STARPU_USE_MPI_MPI */