
make the parallel code wait for the workers to be ready to execute it

Andra Hugo, 12 years ago
commit f45a350498
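
The new synchronization works as follows: the application thread that calls starpu_sched_ctx_exec_parallel_code() first asks every worker of the context to pause (parallel_sect = 1), then blocks on the context's new parallel_code_sem semaphore, doing one sem_wait() per worker; each worker, when it notices the request in _starpu_get_worker_task(), posts the semaphore via _starpu_sched_ctx_signal_worker_blocked() just before sleeping on its parallel_sect_cond, and rebinds itself to its own CPU once it is woken up. Below is a minimal, self-contained sketch of that handshake using plain POSIX threads; it is not StarPU code, the names (NWORKERS, worker(), ready_sem, resume) are illustrative, and it uses a single shared condition variable whereas StarPU signals each worker's own parallel_sect_cond.

/* Minimal sketch (not StarPU code) of the handshake added by this commit:
 * every worker posts a semaphore right before it blocks, and the
 * application thread waits for one post per worker before running the
 * parallel section.  All names here are illustrative. */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

#define NWORKERS 4

static sem_t ready_sem;        /* plays the role of parallel_code_sem */
static pthread_mutex_t sect_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sect_cond = PTHREAD_COND_INITIALIZER;
static int resume = 0;

static void *worker(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&sect_mutex);
	/* like _starpu_sched_ctx_signal_worker_blocked(): tell the
	 * application thread that this worker is about to block */
	sem_post(&ready_sem);
	while (!resume)
		pthread_cond_wait(&sect_cond, &sect_mutex);
	pthread_mutex_unlock(&sect_mutex);
	/* here the real worker would rebind itself to its own cpu */
	return NULL;
}

int main(void)
{
	pthread_t tid[NWORKERS];
	int i;

	sem_init(&ready_sem, 0, 0);
	for (i = 0; i < NWORKERS; i++)
		pthread_create(&tid[i], NULL, worker, NULL);

	/* like the loop added in _starpu_sched_ctx_get_workers_to_sleep():
	 * one sem_wait() per worker of the context */
	for (i = 0; i < NWORKERS; i++)
		sem_wait(&ready_sem);

	printf("all workers blocked, the parallel code can run now\n");

	pthread_mutex_lock(&sect_mutex);
	resume = 1;
	pthread_cond_broadcast(&sect_cond);
	pthread_mutex_unlock(&sect_mutex);

	for (i = 0; i < NWORKERS; i++)
		pthread_join(tid[i], NULL);
	sem_destroy(&ready_sem);
	return 0;
}

The sem_wait() loop corresponds to the one added in _starpu_sched_ctx_get_workers_to_sleep(), and the sem_post() issued before the condition wait corresponds to the change in _starpu_get_worker_task() below.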

+ 4 - 3   examples/sched_ctx/parallel_code.c

@@ -50,6 +50,7 @@ int parallel_code(int nprocs)
 #pragma omp parallel for num_threads(nprocs)
 	for (i = 0; i < NTASKS; i++) 
 		tasks++;
+
 	return tasks;
 }
 
@@ -77,7 +78,7 @@ int main(int argc, char **argv)
 	nprocs1 = ncpus/2;
 	nprocs2 =  nprocs1;
 	int j, k = 0;
-	for(j = nprocs1; j < nprocs2; j++)
+	for(j = nprocs1; j < nprocs1+nprocs2; j++)
 		procs2[k++] = j;
 #endif
 
@@ -110,7 +111,6 @@ int main(int argc, char **argv)
 
 	/* execute an openmp code */
 	int ret_ntasks = (int)starpu_sched_ctx_exec_parallel_code((void*)parallel_code, (void*)nprocs2, sched_ctx2);
-	printf("ctx%d: tasks %d out of %d\n", sched_ctx2, ret_ntasks, NTASKS);
 	starpu_sched_ctx_finished_submit(sched_ctx2);
 
 	/* wait for all tasks at the end*/
@@ -118,7 +118,8 @@ int main(int argc, char **argv)
 
 	starpu_sched_ctx_delete(sched_ctx1);
 	starpu_sched_ctx_delete(sched_ctx2);
-	printf("ctx%d: tasks executed %d out of %d\n", sched_ctx1, tasks_executed, ntasks);
+	printf("ctx%d: starpu tasks executed %d out of %d\n", sched_ctx1, tasks_executed, ntasks);
+	printf("ctx%d: openmp tasks executed %d out of %d\n", sched_ctx2, ret_ntasks, NTASKS);
 	starpu_shutdown();
 
 	return 0;

+ 100 - 15   src/core/sched_ctx.c

@@ -38,7 +38,7 @@ static unsigned _get_workers_list(struct _starpu_sched_ctx *sched_ctx, int **wor
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
 	unsigned worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker, sched_ctx_id);
-	/* the worker was planning to go away in another ctx but finally he changed his mind &
+	/* the worker was planning to go away in another ctx but finally he changed his mind & 
 	   he's staying */
 	if (worker_sched_ctx_id  == STARPU_NMAX_SCHED_CTXS)
 	{
@@ -69,7 +69,7 @@ static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sc
 	int i;
 	struct _starpu_worker *worker = NULL;
  	struct _starpu_worker *curr_worker = _starpu_get_local_worker_key();
-
+	
 	for(i = 0; i < nworkers; i++)
 	{
 		worker = _starpu_get_worker_struct(workerids[i]);
@@ -94,7 +94,7 @@ static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int
 	int i;
 	struct _starpu_worker *worker = NULL;
  	struct _starpu_worker *curr_worker = _starpu_get_local_worker_key();
-
+	
 	for(i = 0; i < nworkers; i++)
 	{
 		worker = _starpu_get_worker_struct(workerids[i]);
@@ -131,7 +131,7 @@ void starpu_sched_ctx_stop_task_submission()
 }
 
 static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers,
-					     int *added_workers, int *n_added_workers)
+				       int *added_workers, int *n_added_workers)
 {
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
@@ -163,7 +163,7 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			workers->add(workers, worker);
 			workers_to_add[i] = worker;
 		}
-	}
+}
 
 	if(sched_ctx->sched_policy->add_workers)
 	{
@@ -274,6 +274,7 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	sched_ctx->finished_submit = 0;
 	sched_ctx->min_priority = 0;
 	sched_ctx->max_priority = 1;
+	sem_init(&sched_ctx->parallel_code_sem, 0, 0);
 
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 
@@ -492,6 +493,7 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 	sched_ctx->sched_policy = NULL;
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
+	sem_destroy(&sched_ctx->parallel_code_sem);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
 
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
@@ -517,13 +519,13 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 
 	int *workerids;
 	unsigned nworkers_ctx = _get_workers_list(sched_ctx, &workerids);
-
+	
 	/*if both of them have all the ressources is pointless*/
 	/*trying to transfer ressources from one ctx to the other*/
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 	unsigned nworkers = config->topology.nworkers;
 
-	if(nworkers_ctx > 0 && inheritor_sched_ctx && inheritor_sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
+	if(nworkers_ctx > 0 && inheritor_sched_ctx && inheritor_sched_ctx->id != STARPU_NMAX_SCHED_CTXS && 
 	   !(nworkers_ctx == nworkers && nworkers_ctx == inheritor_sched_ctx->workers->nworkers))
 	{
 		starpu_sched_ctx_add_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
@@ -594,7 +596,7 @@ void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx
                /* you're not supposed to get here if you deleted the context
 		   so no point in having the mutex locked */
 		STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);
-
+	
 	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
 	{
 		if(unlocked)
@@ -634,7 +636,7 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
 		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
-
+		
 		if(n_added_workers > 0)
 		{
 			_starpu_update_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
@@ -776,7 +778,7 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 			{
 				int *workerids = NULL;
 				unsigned nworkers = _get_workers_list(sched_ctx, &workerids);
-
+				
 				if(nworkers > 0)
 				{
 					starpu_sched_ctx_add_workers(workerids, nworkers, sched_ctx->inheritor);
@@ -1006,7 +1008,7 @@ unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_
 	}
 	return 0;
 }
-
+		 
 unsigned starpu_sched_ctx_overlapping_ctxs_on_worker(int workerid)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
@@ -1136,16 +1138,16 @@ int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
 
 static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
 {
-#ifdef STARPU_HAVE_HWLOC
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 
+#ifdef STARPU_HAVE_HWLOC	
 	const struct hwloc_topology_support *support = hwloc_topology_get_support(config->topology.hwtopology);
         if (support->cpubind->set_thisthread_cpubind)
         {
 		hwloc_bitmap_t set = sched_ctx->hwloc_workers_set;
                 int ret;
-
+		
                 ret = hwloc_set_cpubind (config->topology.hwtopology, set,
                                          HWLOC_CPUBIND_THREAD);
 		if (ret)
@@ -1160,6 +1162,64 @@ static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id)
 #endif
 	return;
 }
+
+void _starpu_sched_ctx_rebind_thread_to_its_cpu(unsigned cpuid)
+{
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
+
+#ifdef STARPU_SIMGRID
+	return;
+#endif
+	if (starpu_get_env_number("STARPU_WORKERS_NOBIND") > 0)
+		return;
+
+#ifdef STARPU_HAVE_HWLOC
+	const struct hwloc_topology_support *support = hwloc_topology_get_support (config->topology.hwtopology);
+	if (support->cpubind->set_thisthread_cpubind)
+	{
+		hwloc_obj_t obj = hwloc_get_obj_by_depth (config->topology.hwtopology,
+							  config->cpu_depth, cpuid);
+		hwloc_bitmap_t set = obj->cpuset;
+		int ret;
+		
+		hwloc_bitmap_singlify(set);
+		ret = hwloc_set_cpubind (config->topology.hwtopology, set,
+					 HWLOC_CPUBIND_THREAD);
+		if (ret)
+		{
+			perror("hwloc_set_cpubind");
+			STARPU_ABORT();
+		}
+	}
+
+#elif defined(HAVE_PTHREAD_SETAFFINITY_NP) && defined(__linux__)
+	int ret;
+	/* fix the thread on the correct cpu */
+	cpu_set_t aff_mask;
+	CPU_ZERO(&aff_mask);
+	CPU_SET(cpuid, &aff_mask);
+
+	starpu_pthread_t self = pthread_self();
+
+	ret = pthread_setaffinity_np(self, sizeof(aff_mask), &aff_mask);
+	if (ret)
+	{
+		perror("binding thread");
+		STARPU_ABORT();
+	}
+
+#elif defined(__MINGW32__) || defined(__CYGWIN__)
+	DWORD mask = 1 << cpuid;
+	if (!SetThreadAffinityMask(GetCurrentThread(), mask))
+	{
+		_STARPU_ERROR("SetThreadAffinityMask(%lx) failed\n", mask);
+	}
+#else
+#warning no CPU binding support
+#endif
+
+}
+
 static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -1177,6 +1237,29 @@ static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id)
 		worker->parallel_sect = 1;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 	}
+
+	while(workers->has_next(workers, &it))
+	{
+		workers->get_next(workers, &it); /* advance the iterator */
+		sem_wait(&sched_ctx->parallel_code_sem); /* one post per blocked worker */
+	}
+	return;
+}
+
+void _starpu_sched_ctx_signal_worker_blocked(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_sched_ctx *sched_ctx = NULL;
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+	{
+		if(worker->sched_ctx[i] != NULL && worker->sched_ctx[i]->id != STARPU_NMAX_SCHED_CTXS
+			&& worker->sched_ctx[i]->id != 0)
+		{
+			sched_ctx = worker->sched_ctx[i];
+			sem_post(&sched_ctx->parallel_code_sem);
+		}
+	}	
 	return;
 }
 
@@ -1194,7 +1277,7 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id)
 	{
 		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
 		STARPU_PTHREAD_MUTEX_LOCK(&worker->parallel_sect_mutex);
-		STARPU_PTHREAD_COND_BROADCAST(&worker->parallel_sect_cond);
+		STARPU_PTHREAD_COND_SIGNAL(&worker->parallel_sect_cond);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->parallel_sect_mutex);
 	}
 	return;
@@ -1207,7 +1290,7 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void* param), void* para
 
 	/* bind current thread on all workers of the context */
 	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id);
-
+	
 	/* execute parallel code */
 	void* ret = func(param);
 
@@ -1216,3 +1299,5 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void* param), void* para
 
 	return ret;
 }
+
+

+ 12 - 1   src/core/sched_ctx.h

@@ -23,6 +23,7 @@
 #include <common/config.h>
 #include <common/barrier_counter.h>
 #include <profiling/profiling.h>
+#include <semaphore.h>
 
 #ifdef STARPU_HAVE_HWLOC
 #include <hwloc.h>
@@ -95,7 +96,11 @@ struct _starpu_sched_ctx
          * task (level 1) or it is not (level 0). */
      	int min_priority;
 	int max_priority;
-	
+
+	/* semaphore that blocks the application thread until the workers
+	   are ready to execute the parallel code */
+	sem_t parallel_code_sem;
+
 	/* hwloc tree structure of workers */
 #ifdef STARPU_HAVE_HWLOC
 	hwloc_bitmap_t hwloc_workers_set;
@@ -156,6 +161,12 @@ unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_
 /* mutex synchronising several simultaneous modifications of a context */
 starpu_pthread_mutex_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched_ctx_id);
 
+/* rebind each worker thread to its cpu after the parallel code has finished */
+void _starpu_sched_ctx_rebind_thread_to_its_cpu(unsigned cpuid);
+
+/* let the application thread know that the worker has blocked so the parallel code can be executed */
+void _starpu_sched_ctx_signal_worker_blocked(int workerid);
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 /* Notifies the hypervisor that a task was popped from the workers' list */
 void _starpu_sched_ctx_call_poped_task_cb(int workerid, struct starpu_task *task, size_t data_size, uint32_t footprint);

+ 2 - 0   src/drivers/driver_common/driver_common.c

@@ -159,7 +159,9 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 	if(args->parallel_sect)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&args->parallel_sect_mutex);
+		_starpu_sched_ctx_signal_worker_blocked(args->workerid);
 		STARPU_PTHREAD_COND_WAIT(&args->parallel_sect_cond, &args->parallel_sect_mutex);
+		_starpu_sched_ctx_rebind_thread_to_its_cpu(args->bindid);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&args->parallel_sect_mutex);
 		args->parallel_sect = 0;
 	}