Explorar o código

Remove a use case of contexts and parallel workers that isn't (and shouldn't be) used and needlessly complicates the code.

Terry Cojean hai 8 anos
pai
achega
f222a00429
Modificáronse 2 ficheiros con 109 adicións e 244 borrados
  1. 107 239
      src/core/sched_ctx.c
  2. 2 5
      src/core/sched_ctx.h

+ 107 - 239
src/core/sched_ctx.c

@@ -37,10 +37,11 @@ static int nobind;
 static int occupied_sms = 0;
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
-static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
-static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);
-static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers);
-static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master);
+
+static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id);
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id);
+static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
+static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
 
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
@@ -155,8 +156,6 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 	if (!nworkers_to_add)
 		return;
 	int workers_to_add[nworkers_to_add];
-	int cpu_workers[nworkers_to_add];
-	int ncpu_workers = 0;
 
 	struct starpu_perfmodel_device devices[nworkers_to_add];
 	int ndevices = 0;
@@ -236,13 +235,6 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			else
 				found = 0;
 		}
-
-		if (!sched_ctx->sched_policy)
-		{
-			struct _starpu_worker *worker_str = _starpu_get_worker_struct(wa[i]);
-			if (worker_str->arch == STARPU_CPU_WORKER)
-				cpu_workers[ncpu_workers++] = wa[i];
-		}
 	}
 
 	if(ndevices > 0)
@@ -311,24 +303,10 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 		}
 	}
 
-	if(!sched_ctx->sched_policy)
-	{
-		if(!sched_ctx->awake_workers)
-		{
-			if(sched_ctx->main_master == -1)
-				sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, cpu_workers, ncpu_workers);
-			else
-			{
-				_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, cpu_workers, ncpu_workers, sched_ctx->main_master);
-			}
-		}
-		else
-		{
-			sched_ctx->main_master = _starpu_sched_ctx_find_master(sched_ctx->id, cpu_workers, ncpu_workers);
-			_starpu_sched_ctx_set_master(sched_ctx, cpu_workers, ncpu_workers, sched_ctx->main_master);
-		}
-	}
-	else if(sched_ctx->sched_policy->add_workers)
+
+	_starpu_sched_ctx_update_parallel_workers_with(sched_ctx->id);
+
+	if(sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
 	{
 		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
 		if(added_workers)
@@ -412,13 +390,7 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 		sched_ctx->perf_arch.devices[dev].ncores = devices[dev].ncores;
 	}
 
-	if(!sched_ctx->sched_policy)
-	{
-		if(!sched_ctx->awake_workers)
-		{
-			_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
-		}
-	}
+	_starpu_sched_ctx_update_parallel_workers_without(sched_ctx->id);
 
 	return;
 }
@@ -561,14 +533,13 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond_busy[w], NULL);
 		sched_ctx->busy[w] = 0;
 
-		sched_ctx->master[w] = -1;
 		sched_ctx->parallel_sect[w] = 0;
 		sched_ctx->sleeping[w] = 0;
 	}
 
 	sched_ctx->parallel_view = 0;
 
-        /*init the strategy structs and the worker_collection of the ressources of the context */
+  /*init the strategy structs and the worker_collection of the ressources of the context */
 	if(policy)
 	{
 		_starpu_init_sched_policy(config, sched_ctx, policy);
@@ -1152,7 +1123,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
 		if(!sched_ctx->sched_policy)
-			starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
+			_starpu_sched_ctx_wake_up_workers(sched_ctx_id);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
@@ -2366,45 +2337,13 @@ static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workeri
 
 }
 
-static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	int current_worker_id = starpu_worker_get_id();
-	unsigned sleeping[nworkers];
-	int w;
-	for(w = 0; w < nworkers; w++)
-	{
-		if(current_worker_id == -1 || workerids[w] != current_worker_id)
-			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
-		sleeping[w] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerids[w]);
-		sched_ctx->master[workerids[w]] = master;
-		sched_ctx->parallel_sect[workerids[w]] = 1;
-		if(current_worker_id == -1 || workerids[w] != current_worker_id)
-			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-		starpu_wake_worker(workerids[w]);
-#endif
-	}
-
-	for(w = 0; w < nworkers; w++)
-	{
-		int workerid = workerids[w];
-		if((current_worker_id == -1 || workerid != current_worker_id) && !sleeping[w])
-		{
-			sem_wait(&sched_ctx->fall_asleep_sem[master]);
-		}
-	}
-	return;
-}
-
 void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	worker->blocked = 1;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	sched_ctx->sleeping[workerid] = 1;
-	int master = sched_ctx->master[workerid];
-	sem_post(&sched_ctx->fall_asleep_sem[master]);
+	sem_post(&sched_ctx->fall_asleep_sem[sched_ctx->main_master]);
 
 	return;
 }
@@ -2412,30 +2351,74 @@ void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid
 void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	int master = sched_ctx->master[workerid];
-	sem_post(&sched_ctx->wake_up_sem[master]);
+	sem_post(&sched_ctx->wake_up_sem[sched_ctx->main_master]);
 	sched_ctx->sleeping[workerid] = 0;
-	sched_ctx->master[workerid] = -1;
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	worker->blocked = 0;
 
 	return;
 }
 
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
+static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id)
+{
+    struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+    int current_worker_id = starpu_worker_get_id();
+		int master = sched_ctx->main_master;
+    struct starpu_worker_collection *workers = sched_ctx->workers;
+    struct starpu_sched_ctx_iterator it;
+    unsigned sleeping[sched_ctx->workers->nworkers];
+
+	if (master == -1)
+		return;
+
+    workers->init_iterator(workers, &it);
+    while(workers->has_next(workers, &it))
+    {
+        int workerid = workers->get_next(workers, &it);
+        sleeping[workerid] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerid);
+
+        if(!sched_ctx->parallel_sect[workerid] && workerid != master)
+        {
+            if (current_worker_id == -1 || workerid != current_worker_id)
+            {
+                STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+                sched_ctx->parallel_sect[workerid] = 1;
+                STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+            }
+        }
+    }
+
+    workers->init_iterator(workers, &it);
+    while(workers->has_next(workers, &it))
+    {
+            int workerid = workers->get_next(workers, &it);
+            if(workerid != master
+               && (current_worker_id == -1 || workerid != current_worker_id)
+               && !sleeping[workerid])
+            {
+                    sem_wait(&sched_ctx->fall_asleep_sem[master]);
+            }
+    }
+
+    return;
+}
+
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
+	int master = sched_ctx->main_master;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
-
 	struct starpu_sched_ctx_iterator it;
 
+	if (master == -1)
+		return;
+
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
 	{
 		int workerid = workers->get_next(workers, &it);
-		int curr_master = sched_ctx->master[workerid];
-		if(curr_master == master && sched_ctx->parallel_sect[workerid])
+		if(sched_ctx->parallel_sect[workerid] && workerid != master)
 		{
 			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			{
@@ -2454,197 +2437,82 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 
 void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
 {
-	int *workerids = NULL;
-	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
-	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);
-
-	/* execute parallel code */
-	void* ret = func(param);
+    _starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id);
 
-	/* wake up starpu workers */
-	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, workerids[nworkers-1]);
+    /* execute parallel code */
+    void* ret = func(param);
 
-	free(workerids);
-	return ret;
+    /* wake up starpu workers */
+    _starpu_sched_ctx_wake_up_workers(sched_ctx_id);
+    return ret;
 }
 
-void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids)
+static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id)
 {
-	int current_worker_id = starpu_worker_get_id();
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct starpu_worker_collection *workers = sched_ctx->workers;
-	_STARPU_MALLOC((*cpuids), workers->nworkers*sizeof(int));
-	int w = 0;
+    struct _starpu_sched_ctx * sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 
-	struct starpu_sched_ctx_iterator it;
+	if(sched_ctx->sched_policy)
+		return;
 
-	workers->init_iterator(workers, &it);
 
-	while(workers->has_next(workers, &it))
+	_starpu_sched_ctx_put_new_master(sched_ctx_id);
+
+	if(!sched_ctx->awake_workers)
 	{
-		int workerid = workers->get_next(workers, &it);
-		int master = sched_ctx->master[workerid];
-		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
-		{
-			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
-		}
+		_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id);
 	}
-	*ncpuids = w;
-	return;
 }
 
-static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
+static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id)
 {
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	int current_worker_id = starpu_worker_get_id();
+    struct _starpu_sched_ctx * sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 
-	int masters[nworkers];
-	int w;
-	for(w = 0; w < nworkers; w++)
-	{
-		int workerid = workerids[w];
-		masters[w] = sched_ctx->master[workerid];
-		if(current_worker_id == -1 || workerid != current_worker_id)
-		{
-			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
-			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
-		}
-		else
-			sched_ctx->parallel_sect[workerid] = 0;
-		sched_ctx->master[workerid] = -1;
-	}
+	if(sched_ctx->sched_policy)
+		return;
 
-	for(w = 0; w < nworkers; w++)
+
+	_starpu_sched_ctx_put_new_master(sched_ctx_id);
+
+	if(!sched_ctx->awake_workers)
 	{
-		int workerid = workerids[w];
-		if(masters[w] != -1)
-		{
-			int master = sched_ctx->master[workerid];
-			if(current_worker_id == -1 || workerid != current_worker_id)
-				sem_wait(&sched_ctx->wake_up_sem[master]);
-		}
+		_starpu_sched_ctx_wake_up_workers(sched_ctx_id);
 	}
-
-	return;
 }
 
-static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers)
+void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids)
 {
+	int current_worker_id = starpu_worker_get_id();
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	int new_master = workerids[nworkers-1];
-        int current_worker_id = starpu_worker_get_id();
-        int current_is_in_section = 0;
-        int npotential_masters = 0;
-        int nawake_workers = 0;
-        int ntrue_masters = 0;
-        int potential_masters[nworkers];
-        int awake_workers[nworkers];
-        int true_masters[nworkers];
-
-        int i,w;
-        for(w = 0 ; w < nworkers ; w++)
-        {
-                if (current_worker_id == workerids[w])
-                        current_is_in_section = 1;
+	struct starpu_worker_collection *workers = sched_ctx->workers;
+	_STARPU_MALLOC((*cpuids), workers->nworkers*sizeof(int));
+	int w = 0;
 
-		int master = sched_ctx->master[workerids[w]];
-                if (master > -1)
-		{
-                        int already_seen = 0;
-                        //Could create a function for this. Basically searching an element in an array.
-                        for (i = 0 ; i < npotential_masters; i++)
-                        {
-                                if (potential_masters[i] == master)
-				{
-                                        already_seen = 1;
-                                        break;
-				}
-                        }
-                        if (!already_seen)
-				potential_masters[npotential_masters++] = master;
-                }
-                else if (master == -1)
-                        awake_workers[nawake_workers++] = workerids[w];
-        }
+	struct starpu_sched_ctx_iterator it;
 
-        for (i = 0 ; i < npotential_masters ; i++)
-	{
-		int master_is_in_section = 0;
-		//Could create a function for this. Basically searching an element in an array.
-		for (w = 0 ; w < nworkers ; w++)
-		{
-			if (workerids[w] == potential_masters[i])
-			{
-				master_is_in_section = 1;
-				break;
-			}
-		}
-                if (master_is_in_section)
-			true_masters[ntrue_masters++] = potential_masters[i];
-        }
+	workers->init_iterator(workers, &it);
 
-        if (current_is_in_section)
-                new_master = current_worker_id;
-        else
-        {
-                if (ntrue_masters > 1)
+	while(workers->has_next(workers, &it))
+	{
+		int workerid = workers->get_next(workers, &it);
+		int master = sched_ctx->main_master;
+		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
 		{
-                        if (nawake_workers > 0)
-                                new_master = awake_workers[nawake_workers - 1];
-                        else
-                                new_master = true_masters[ntrue_masters - 1];
+			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
 		}
 	}
-	return new_master;
+	*ncpuids = w;
+	return;
 }
 
-static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master)
+static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id)
 {
+	int *workerids;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	int w;
-	int nput_to_sleep = 0;
-	int nwake_up = 0;
-	int put_to_sleep[nworkers];
-	int wake_up[nworkers];
+	unsigned nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
 
-	for(w = 0 ; w < nworkers ; w++)
-	{
-		int master = sched_ctx->master[workerids[w]];
-		if (master == -1 && workerids[w] != new_master)
-			put_to_sleep[nput_to_sleep++] = workerids[w];
-		else if(master != -1 && workerids[w] == new_master)
-			wake_up[nwake_up++] = workerids[w];
-	}
-
-	if(nwake_up > 0)
-		_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
-	if(nput_to_sleep > 0)
-		_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
-
-}
+	sched_ctx->main_master = workerids[nworkers-1];
 
-static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master)
-{
-	int i;
-	for(i = 0; i < nworkers; i++)
-	{
-		if(workerids[i] != master)
-			sched_ctx->master[workerids[i]] = master;
-	}
-}
-
-int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
-{
-	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);
-	_starpu_sched_ctx_add_workers_to_master(sched_ctx_id, workerids, nworkers, new_master);
-	return new_master;
-}
-
-void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master)
-{
-	/* wake up starpu workers */
-	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, master);
+	free(workerids);
 }
 
 struct starpu_perfmodel_arch * _starpu_sched_ctx_get_perf_archtype(unsigned sched_ctx_id)

+ 2 - 5
src/core/sched_ctx.h

@@ -148,17 +148,14 @@ struct _starpu_sched_ctx
 	   parallel sections to be executed on their allocated resources */
 	unsigned parallel_sect[STARPU_NMAXWORKERS];
 
-	/* id of the master worker */
-	int master[STARPU_NMAXWORKERS];
-
-	/* semaphore that block appl thread until starpu threads are 
+	/* semaphore that block appl thread until starpu threads are
 	   all blocked and ready to exec the parallel code */
 	sem_t fall_asleep_sem[STARPU_NMAXWORKERS];
 
 	/* semaphore that block appl thread until starpu threads are 
 	   all woke up and ready continue appl */
 	sem_t wake_up_sem[STARPU_NMAXWORKERS];
-       
+
 	/* bool indicating if the workers is sleeping in this ctx */
 	unsigned sleeping[STARPU_NMAXWORKERS];