Browse Source

* patch Terry: fix book workers (book workers that have already been booked and wake up eventually workers that we don't want anymore in the group)
* fix deadlock issues: worker goes to sleep in parallel_sect with its sched_mutex locked + barrier to go to sleep belongs to the master and not the context

Andra Hugo 11 years ago
parent
commit
5a219c8a8f

+ 1 - 1
sc_hypervisor/src/policies_utils/lp_programs.c

@@ -252,7 +252,7 @@ double sc_hypervisor_lp_simulate_distrib_tasks(int ns, int nw, int nt, double w_
 double sc_hypervisor_lp_simulate_distrib_flops(int ns, int nw, double v[ns][nw], double flops[ns], double res[ns][nw], 
 					       int  total_nw[nw], unsigned sched_ctxs[ns], double last_vmax)
 {
-	int integer = 0;
+	int integer = 1;
 	int s, w;
 	glp_prob *lp;
 

+ 92 - 35
src/core/sched_ctx.c

@@ -278,7 +278,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	if (sched_ctx->min_priority_is_set) sched_ctx->min_priority = min_prio;
 	sched_ctx->max_priority_is_set = max_prio_set;
 	if (sched_ctx->max_priority_is_set) sched_ctx->max_priority = max_prio;
-	sem_init(&sched_ctx->parallel_code_sem, 0, 0);
+
 
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
@@ -560,7 +560,6 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 	sched_ctx->sched_policy = NULL;
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
-	sem_destroy(&sched_ctx->parallel_code_sem);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
 #ifdef STARPU_HAVE_HWLOC
 	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
@@ -1587,12 +1586,13 @@ static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *w
 			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 	}
 
+	struct _starpu_worker *master_worker = _starpu_get_worker_struct(master);
 	int workerid;
 	for(w = 0; w < nworkers; w++)
 	{
 		workerid = workerids[w];
 		if(current_worker_id == -1 || workerid != current_worker_id)
-			sem_wait(&sched_ctx->parallel_code_sem);
+			sem_wait(&master_worker->parallel_code_sem);
 	}
 	return;
 }
@@ -1600,13 +1600,14 @@ static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *w
 void _starpu_sched_ctx_signal_worker_blocked(int workerid)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_worker *master_worker = _starpu_get_worker_struct(worker->master);
 	struct _starpu_sched_ctx *sched_ctx = NULL;
 	struct _starpu_sched_ctx_list *l = NULL;
 	for (l = worker->sched_ctx_list; l; l = l->next)
 	{
 		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
 		if(sched_ctx->id != 0)
-			sem_post(&sched_ctx->parallel_code_sem);
+			sem_post(&master_worker->parallel_code_sem);
 	}	
 	return;
 }
@@ -1685,47 +1686,103 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 	return;
 }
 
-int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
+/* int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers) */
+/* { */
+/* 	int current_worker_id = starpu_worker_get_id(); */
+
+/* 	int final_workerids[nworkers]; */
+/* 	int nfinal_workerids = 0; */
+/* 	int w; */
+/* 	int master = -1; */
+/* 	for(w = 0; w < nworkers; w++) */
+/* 	{ */
+/* 		if(current_worker_id == -1) */
+/* 		{ */
+/* 			final_workerids[nfinal_workerids++] = workerids[w];                           */
+/* 			if(nfinal_workerids == nworkers - 1)                          */
+/* 			{ */
+/* 				master = workerids[nfinal_workerids];   */
+/* 				break;   */
+/* 			} */
+/* 		} */
+/* 		else */
+/* 		{ */
+/* 			if(workerids[w] != current_worker_id) */
+/* 				final_workerids[nfinal_workerids++] = workerids[w]; */
+/* 			else */
+/* 			{ */
+/* 				if(nfinal_workerids == nworkers - 1) */
+/* 				{ */
+/* 					master = workerids[nfinal_workerids]; */
+/* 					break; */
+/* 				} */
+/* 				else */
+/* 					master = current_worker_id; */
+/* 			}	 */
+/* 		} */
+/* 	} */
+/* 	if(master == -1 && nfinal_workerids > 0) */
+/* 	{ */
+/* 		nfinal_workerids--; */
+/* 		master = final_workerids[nfinal_workerids]; */
+/* 	} */
+/* 	/\* get starpu workers to sleep *\/ */
+/* 	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, final_workerids, nfinal_workerids, master); */
+
+/* 	/\* bind current thread on all workers of the context *\/ */
+/* //	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id); */
+/* 	return master; */
+/* } */
+
+static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
 {
 	int current_worker_id = starpu_worker_get_id();
-
-	int final_workerids[nworkers];
-	int nfinal_workerids = 0;
+	
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int w;
-	int master = -1;
+	struct _starpu_worker *worker = NULL;
 	for(w = 0; w < nworkers; w++)
 	{
-		if(current_worker_id == -1)
+		worker = _starpu_get_worker_struct(workerids[w]);
+		if(current_worker_id == -1 || worker->workerid != current_worker_id)
 		{
-			final_workerids[nfinal_workerids++] = workerids[w];                          
-			if(nfinal_workerids == nworkers - 1)                         
-			{
-				master = workerids[nfinal_workerids];  
-				break;  
-			}
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
+			STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
 		}
 		else
-		{
-			if(workerids[w] != current_worker_id)
-				final_workerids[nfinal_workerids++] = workerids[w];
-			else
-			{
-				if(nfinal_workerids == nworkers - 1)
-				{
-					master = workerids[nfinal_workerids];
-					break;
-				}
-				else
-					master = current_worker_id;
-			}	
-		}
+			worker->parallel_sect = 0;
 	}
-	/* get starpu workers to sleep */
-	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, final_workerids, nfinal_workerids, master);
+	return;
+}
 
-	/* bind current thread on all workers of the context */
-//	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
-	return master;
+
+int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
+{ 
+	int new_master = workerids[nworkers-1];
+	int w;
+	int nput_to_sleep = 0;
+	int nwake_up = 0;
+	int put_to_sleep[nworkers];
+	int wake_up[nworkers];
+	
+	for(w = 0 ; w < nworkers ; w++)
+	{
+		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
+		if (worker->master == -1 && workerids[w] != new_master)
+			put_to_sleep[nput_to_sleep++] = workerids[w];
+		else if(worker->master != -1 && workerids[w] == new_master)
+			wake_up[nwake_up++] = workerids[w];
+		
+		if (workerids[w] != new_master)
+			worker->master = new_master;
+		else
+			worker->master = -1;
+	}
+	_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
+	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
+	
+	return new_master;
 }
 
 void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master)

+ 0 - 4
src/core/sched_ctx.h

@@ -103,10 +103,6 @@ struct _starpu_sched_ctx
      	int min_priority_is_set;
 	int max_priority_is_set;
 
-	/* semaphore that block appl thread until threads are ready 
-	   to exec the parallel code */
-	sem_t parallel_code_sem;
-
 	/* hwloc tree structure of workers */
 #ifdef STARPU_HAVE_HWLOC
 	hwloc_bitmap_t hwloc_workers_set;

+ 1 - 0
src/core/workers.c

@@ -411,6 +411,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	starpu_task_list_init(&workerarg->local_tasks);
 	workerarg->current_task = NULL;
 	workerarg->set = NULL;
+	sem_init(&workerarg->parallel_code_sem, 0, 0);
 
 	/* if some codelet's termination cannot be handled directly :
 	 * for instance in the Gordon driver, Gordon tasks' callbacks

+ 4 - 0
src/core/workers.h

@@ -121,6 +121,10 @@ LIST_TYPE(_starpu_worker,
 	/* id of the master worker */
 	int master;
 
+	/* semaphore that block appl thread until threads are ready 
+	   to exec the parallel code */
+	sem_t parallel_code_sem;
+
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */

+ 1 - 0
src/drivers/driver_common/driver_common.c

@@ -201,6 +201,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 	if(args->parallel_sect)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
 		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
 		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
 		starpu_sched_ctx_bind_current_thread_to_cpuid(args->bindid);