Browse Source

Fix barrier implementation.

- barrier_wait counts the number of threads that exit the barrier
- barrier_destroy waits until all threads have exited the barrier before destroying the mutexes and the condition variable
Nathalie Furmento 14 years ago
parent
commit
deef808ea4

+ 46 - 13
src/common/barrier.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010,2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -15,20 +15,45 @@
  */
 
 #include <common/barrier.h>
+#include <common/utils.h>
 
 int _starpu_barrier_init(_starpu_barrier_t *barrier, int count)
 {
 	barrier->count = count;
-	barrier->reached = 0;
-	pthread_mutex_init(&barrier->mutex,NULL);
-	pthread_cond_init(&barrier->cond,NULL);
+	barrier->reached_start = 0;
+	barrier->reached_exit = 0;
+	PTHREAD_MUTEX_INIT(&barrier->mutex, NULL);
+	PTHREAD_MUTEX_INIT(&barrier->mutex_exit, NULL);
+	PTHREAD_COND_INIT(&barrier->cond, NULL);
 	return 0;
 }
 
+int _starpu_barrier_test(_starpu_barrier_t *barrier)
+{
+    /*
+     * Check whether every thread has left the barrier: report EBUSY
+     * while reached_exit differs from count, and 0 once they are equal.
+     */
+        PTHREAD_MUTEX_LOCK(&barrier->mutex_exit);
+        if (barrier->reached_exit != barrier->count) {
+                PTHREAD_MUTEX_UNLOCK(&barrier->mutex_exit);
+                return EBUSY;
+        }
+        PTHREAD_MUTEX_UNLOCK(&barrier->mutex_exit);
+        return 0;
+}
+
 int _starpu_barrier_destroy(_starpu_barrier_t *barrier)
 {
-	pthread_mutex_destroy(&barrier->mutex);
-	pthread_cond_destroy(&barrier->cond);
+	int ret = _starpu_barrier_test(barrier);
+	while (ret == EBUSY) {
+		ret = _starpu_barrier_test(barrier);
+	}
+	_STARPU_DEBUG("reached_exit %d\n", barrier->reached_exit);
+
+	PTHREAD_MUTEX_DESTROY(&barrier->mutex);
+	PTHREAD_MUTEX_DESTROY(&barrier->mutex_exit);
+	PTHREAD_COND_DESTROY(&barrier->cond);
 	return 0;
 }
 
@@ -36,18 +61,26 @@ int _starpu_barrier_wait(_starpu_barrier_t *barrier)
 {
 	int ret=0;
 
-	pthread_mutex_lock(&barrier->mutex);
-	barrier->reached++;
-	if (barrier->reached == barrier->count)
+        // Wait until all threads enter the barrier
+	PTHREAD_MUTEX_LOCK(&barrier->mutex);
+	barrier->reached_exit=0;
+	barrier->reached_start++;
+	if (barrier->reached_start == barrier->count)
 	{
-		barrier->reached = 0;
-		pthread_cond_broadcast(&barrier->cond);
+		barrier->reached_start = 0;
+		PTHREAD_COND_BROADCAST(&barrier->cond);
 		ret = PTHREAD_BARRIER_SERIAL_THREAD;
 	}
 	else
 	{
-		pthread_cond_wait(&barrier->cond,&barrier->mutex);
+                PTHREAD_COND_WAIT(&barrier->cond,&barrier->mutex);
 	}
-	pthread_mutex_unlock(&barrier->mutex);
+	PTHREAD_MUTEX_UNLOCK(&barrier->mutex);
+
+        // Count number of threads that exit the barrier
+	PTHREAD_MUTEX_LOCK(&barrier->mutex_exit);
+	barrier->reached_exit ++;
+	PTHREAD_MUTEX_UNLOCK(&barrier->mutex_exit);
+
 	return ret;
 }

+ 4 - 2
src/common/barrier.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -21,8 +21,10 @@
 
 typedef struct {
 	int count;
-	int reached;
+	int reached_start;
+	int reached_exit;
 	pthread_mutex_t mutex;
+	pthread_mutex_t mutex_exit;
 	pthread_cond_t cond;
 } _starpu_barrier_t;
 

+ 2 - 1
src/core/jobs.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010  Université de Bordeaux 1
- * Copyright (C) 2010  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -35,6 +35,7 @@
 #include <datawizard/datawizard.h>
 #include <core/perfmodel/perfmodel.h>
 #include <core/errorcheck.h>
+#include <common/barrier.h>
 
 #ifdef STARPU_USE_CUDA
 #include <cuda.h>

+ 2 - 1
src/core/sched_policy.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010  Université de Bordeaux 1
- * Copyright (C) 2010  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -22,6 +22,7 @@
 #include <common/utils.h>
 #include <core/sched_policy.h>
 #include <profiling/profiling.h>
+#include <common/barrier.h>
 
 static struct starpu_sched_policy_s policy;
 

+ 1 - 0
src/sched_policies/parallel_greedy.c

@@ -16,6 +16,7 @@
 
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
+#include <common/barrier.h>
 
 /* the former is the actual queue, the latter some container */
 static struct starpu_fifo_taskq_s *fifo;

+ 2 - 1
src/sched_policies/parallel_heft.c

@@ -22,6 +22,7 @@
 #include <sched_policies/fifo_queues.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
+#include <common/barrier.h>
 
 static pthread_mutex_t big_lock;
 
@@ -199,7 +200,7 @@ static double compute_ntasks_end(int workerid)
 		int *combined_workerid;
 		starpu_combined_worker_get_description(workerid, &worker_size, &combined_workerid);
 
-		int ntasks_end;
+		int ntasks_end=0;
 
 		int i;
 		for (i = 0; i < worker_size; i++)