
- clean up the termination code
- we now maintain a global list of all queues
- the per-node lists no longer contain the same queue multiple times
- these lists of queues are now protected by a mutex
- instead of setting config->running to 0 and then waking everyone up (and
hoping for the best), we lock all queues and the scheduler, so that we are
sure the termination will be detected (see the sketch below).
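
A minimal, self-contained sketch (not StarPU code) of the locking pattern described in the last item. The names queue_t, NQUEUES, worker_loop and kill_all are illustrative only, and the plain "running" flag stands in for config->running / machine_is_running(). The point is that the shutdown path holds every queue mutex while it clears the flag and broadcasts, so a worker can never test the flag and then block after termination has already been signalled; the sketch also uses the conventional while loop around pthread_cond_wait.

/* build with: cc -pthread sketch.c */
#include <pthread.h>
#include <stdio.h>

#define NQUEUES 2

typedef struct {
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	unsigned njobs;
} queue_t;

static queue_t queues[NQUEUES];
static unsigned running = 1;

/* worker side: only sleep if there is still no work AND we are running */
static void *worker_loop(void *arg)
{
	queue_t *q = arg;

	pthread_mutex_lock(&q->mutex);
	while (q->njobs == 0 && running)
		pthread_cond_wait(&q->cond, &q->mutex);
	pthread_mutex_unlock(&q->mutex);

	return NULL;
}

/* shutdown side: lock every queue, clear the flag, broadcast, unlock */
static void kill_all(void)
{
	unsigned i;

	for (i = 0; i < NQUEUES; i++)
		pthread_mutex_lock(&queues[i].mutex);

	running = 0;

	for (i = 0; i < NQUEUES; i++)
		pthread_cond_broadcast(&queues[i].cond);

	for (i = 0; i < NQUEUES; i++)
		pthread_mutex_unlock(&queues[i].mutex);
}

int main(void)
{
	pthread_t workers[NQUEUES];
	unsigned i;

	for (i = 0; i < NQUEUES; i++) {
		pthread_mutex_init(&queues[i].mutex, NULL);
		pthread_cond_init(&queues[i].cond, NULL);
		pthread_create(&workers[i], NULL, worker_loop, &queues[i]);
	}

	kill_all();

	for (i = 0; i < NQUEUES; i++)
		pthread_join(workers[i], NULL);

	fprintf(stderr, "all workers terminated\n");
	return 0;
}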

Cédric Augonnet · 16 years ago · commit 114b4dd446

src/core/mechanisms/deque_queues.c (+2 -2)

@@ -124,7 +124,7 @@ job_t deque_pop_task(struct jobq_s *q)
 	/* block until some task is available in that queue */
 	pthread_mutex_lock(&q->activity_mutex);
 
-	if (deque_queue->njobs == 0)
+	if ((deque_queue->njobs == 0) && machine_is_running())
 		pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
 
 	if (deque_queue->njobs > 0) 
@@ -198,7 +198,7 @@ job_t deque_non_blocking_pop_task_if_job_exists(struct jobq_s *q)
 		 * times and just get back to sleep */
 		pthread_mutex_lock(sched_mutex);
 
-		if (total_number_of_jobs == 0)
+		if ((total_number_of_jobs == 0) && machine_is_running())
 			pthread_cond_wait(sched_cond, sched_mutex);
 
 		pthread_mutex_unlock(sched_mutex);

src/core/mechanisms/fifo_queues.c (+2 -2)

@@ -125,7 +125,7 @@ job_t fifo_pop_task(struct jobq_s *q)
 	/* block until some event happens */
 	pthread_mutex_lock(&q->activity_mutex);
 
-	if (fifo_queue->njobs == 0)
+	if ((fifo_queue->njobs == 0) && machine_is_running())
 		pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
 
 	if (fifo_queue->njobs > 0) 
@@ -235,7 +235,7 @@ job_t fifo_non_blocking_pop_task_if_job_exists(struct jobq_s *q)
 		 * times and just get back to sleep */
 		pthread_mutex_lock(sched_mutex);
 
-		if (total_number_of_jobs == 0)
+		if ((total_number_of_jobs == 0) && machine_is_running())
 			pthread_cond_wait(sched_cond, sched_mutex);
 
 		pthread_mutex_unlock(sched_mutex);

src/core/mechanisms/priority_queues.c (+1 -1)

@@ -98,7 +98,7 @@ job_t priority_pop_task(struct jobq_s *q)
 	/* block until some event happens */
 	pthread_mutex_lock(&q->activity_mutex);
 
-	if (queue->total_njobs == 0)
+	if ((queue->total_njobs == 0) && machine_is_running())
 		 pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
 
 	if (queue->total_njobs > 0)

src/core/mechanisms/stack_queues.c (+2 -2)

@@ -144,7 +144,7 @@ job_t stack_pop_task(struct jobq_s *q)
 	/* block until some task is available in that queue */
 	pthread_mutex_lock(&q->activity_mutex);
 
-	if (stack_queue->njobs == 0)
+	if ((stack_queue->njobs == 0) && machine_is_running())
 		pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
 
 	if (stack_queue->njobs > 0) 
@@ -218,7 +218,7 @@ job_t stack_non_blocking_pop_task_if_job_exists(struct jobq_s *q)
 		 * times and just get back to sleep */
 		pthread_mutex_lock(sched_mutex);
 
-		if (total_number_of_jobs == 0)
+		if ((total_number_of_jobs == 0) && machine_is_running())
 			pthread_cond_wait(sched_cond, sched_mutex);
 
 		pthread_mutex_unlock(sched_mutex);

src/core/policies/sched_policy.c (+7 -1)

@@ -28,6 +28,7 @@
 
 
 static struct sched_policy_s policy;
+extern mem_node_descr descr;
 
 struct sched_policy_s *get_sched_policy(void)
 {
@@ -103,6 +104,8 @@ void init_sched_policy(struct machine_config_s *config)
 	pthread_cond_init(&policy.sched_activity_cond, NULL);
 	pthread_mutex_init(&policy.sched_activity_mutex, NULL);
 	pthread_key_create(&policy.local_queue_key, NULL);
+	init_mutex(&descr.attached_queues_mutex);
+	descr.total_queues_count = 0;
 
 	policy.init_sched(config, &policy);
 }
@@ -154,6 +157,9 @@ void wait_on_sched_event(void)
 	struct jobq_s *q = policy.get_local_queue(&policy);
 
 	pthread_mutex_lock(&q->activity_mutex);
-	pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
+
+	if (machine_is_running())
+		pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
+
 	pthread_mutex_unlock(&q->activity_mutex);
 }

src/core/workers.c (+56 -3)

@@ -382,14 +382,67 @@ unsigned machine_is_running(void)
 	return config.running;
 }
 
+typedef enum {
+	BROADCAST,
+	LOCK,
+	UNLOCK
+} queue_op;
+
+/* XXX we should use an accessor */
+extern mem_node_descr descr;
+
+static void operate_on_all_queues(queue_op op)
+{
+	unsigned q_id;
+	struct jobq_s *q;
+
+	take_mutex(&descr.attached_queues_mutex);
+
+	unsigned nqueues = descr.total_queues_count;
+
+	for (q_id = 0; q_id < nqueues; q_id++)
+	{
+		q  = descr.attached_queues_all[q_id];
+		switch (op) {
+			case BROADCAST:
+				pthread_cond_broadcast(&q->activity_cond);
+				break;
+			case LOCK:
+				pthread_mutex_lock(&q->activity_mutex);
+				break;
+			case UNLOCK:
+				pthread_mutex_unlock(&q->activity_mutex);
+				break;
+		}
+	}
+
+	release_mutex(&descr.attached_queues_mutex);
+}
+
 void kill_all_workers(struct machine_config_s *config)
 {
+	/* lock all workers and the scheduler (in the proper order) to make
+	   sure everyone will notice the termination */
+	/* WARNING: here we make the assumption that a queue is not attached to
+ 	 * different memory nodes ! */
+
+	struct sched_policy_s *sched = get_sched_policy();
+
+	fprintf(stderr, "LOCK\n");
+
+	operate_on_all_queues(LOCK);
+	pthread_mutex_lock(&sched->sched_activity_mutex);
+	
+	fprintf(stderr, "BROADCAST \n");
 	/* set the flag which will tell workers to stop */
 	config->running = 0;
 
-	/* in case some workers are waiting on some event 
-	   wake them up ... */
-	wake_all_blocked_workers();
+	operate_on_all_queues(BROADCAST);
+	pthread_cond_broadcast(&sched->sched_activity_cond);
+
+	fprintf(stderr, "UNLOCK\n");
+	pthread_mutex_unlock(&sched->sched_activity_mutex);
+	operate_on_all_queues(UNLOCK);
 }
 
 void starpu_shutdown(void)

src/datawizard/copy-driver.c (+45 -6)

@@ -40,31 +40,70 @@ unsigned register_memory_node(node_kind kind)
 }
 
 
-/* TODO move in a more appropriate file */
-/* attach a queue to a memory node */
+/* TODO move in a more appropriate file  !! */
+/* attach a queue to a memory node (if it's not already attached) */
 void memory_node_attach_queue(struct jobq_s *q, unsigned nodeid)
 {
-	unsigned nqueues;
-	nqueues = STARPU_ATOMIC_ADD(&descr.queues_count[nodeid], 1);
+	unsigned queue;
+	unsigned nqueues_total, nqueues;
+	
+	take_mutex(&descr.attached_queues_mutex);
+
+	/* we only insert the queue if it's not already in the list */
+	nqueues = descr.queues_count[nodeid];
+	for (queue = 0; queue < nqueues; queue++)
+	{
+		if (descr.attached_queues_per_node[nodeid][queue] == q)
+		{
+			/* the queue is already in the list */
+			release_mutex(&descr.attached_queues_mutex);
+			return;
+		}
+	}
 
-	descr.attached_queues[nodeid][nqueues-1] = q;
+	/* it was not found locally */
+	descr.attached_queues_per_node[nodeid][nqueues] = q;
+	descr.queues_count[nodeid]++;
+
+	/* do we have to add it in the global list as well ? */
+	nqueues_total = descr.total_queues_count; 
+	for (queue = 0; queue < nqueues_total; queue++)
+	{
+		if (descr.attached_queues_all[queue] == q)
+		{
+			/* the queue is already in the global list */
+			release_mutex(&descr.attached_queues_mutex);
+			return;
+		}
+	} 
+
+	/* it was not in the global queue either */
+	descr.attached_queues_all[nqueues_total] = q;
+	descr.total_queues_count++;
+
+	release_mutex(&descr.attached_queues_mutex);
 }
 
 void wake_all_blocked_workers_on_node(unsigned nodeid)
 {
 	/* wake up all queues on that node */
 	unsigned q_id;
+
+	take_mutex(&descr.attached_queues_mutex);
+
 	unsigned nqueues = descr.queues_count[nodeid];
 	for (q_id = 0; q_id < nqueues; q_id++)
 	{
 		struct jobq_s *q;
-		q  = descr.attached_queues[nodeid][q_id];
+		q  = descr.attached_queues_per_node[nodeid][q_id];
 
 		/* wake anybody waiting on that queue */
 		pthread_mutex_lock(&q->activity_mutex);
 		pthread_cond_broadcast(&q->activity_cond);
 		pthread_mutex_unlock(&q->activity_mutex);
 	}
+
+	release_mutex(&descr.attached_queues_mutex);
 }
 
 void wake_all_blocked_workers(void)

src/datawizard/copy-driver.h (+5 -1)

@@ -38,8 +38,12 @@ typedef struct {
 
 	/* the list of queues that are attached to a given node */
 	// XXX 32 is set randomly !
-	struct jobq_s *attached_queues[MAXNODES][32];
+	// TODO move this 2 lists outside mem_node_descr
+	struct starpu_mutex_t attached_queues_mutex;
+	struct jobq_s *attached_queues_per_node[MAXNODES][32];
+	struct jobq_s *attached_queues_all[MAXNODES*32];
 	/* the number of queues attached to each node */
+	unsigned total_queues_count;
 	unsigned queues_count[MAXNODES];
 } mem_node_descr;