parallel greedy - doesn't work for several contexts because of the combined workers

Andra Hugo 13 years ago
parent
commit
b84f08aab5

+ 1 - 1
include/starpu_scheduler.h

@@ -170,7 +170,7 @@ void starpu_worker_init_sched_condition(unsigned sched_ctx, int workerid);
 
 void starpu_worker_deinit_sched_condition(unsigned sched_ctx, int workerid);
 
-void starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int type);
+struct worker_collection* starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int type);
 	
 void starpu_delete_worker_collection_for_sched_ctx(unsigned sched_ctx_id); 
 

+ 1 - 1
src/Makefile.am

@@ -161,6 +161,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/eager_central_priority_policy.c		\
 	sched_policies/random_policy.c				\
 	sched_policies/work_stealing_policy.c			\
+	sched_policies/parallel_greedy.c			\
 	sched_policies/detect_combined_workers.c		\
 	drivers/driver_common/driver_common.c			\
 	datawizard/memory_nodes.c				\
@@ -214,7 +215,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	top/starpu_top_connection.c                          	\
 	worker_collection/worker_list.c
 #	sched_policies/parallel_heft.c				
-#	sched_policies/parallel_greedy.c			
 
 if STARPU_USE_CPU
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/cpu/driver_cpu.c

+ 2 - 2
src/core/sched_ctx.c

@@ -610,7 +610,7 @@ void starpu_worker_deinit_sched_condition(unsigned sched_ctx_id, int workerid)
 	free(sched_ctx->sched_cond[workerid]);
 }
 
-void starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int worker_collection_type)
+struct worker_collection* starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int worker_collection_type)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	sched_ctx->workers = (struct worker_collection*)malloc(sizeof(struct worker_collection));
@@ -630,7 +630,7 @@ void starpu_create_worker_collection_for_sched_ctx(unsigned sched_ctx_id, int wo
 		break;
 	}
 
-	return;
+	return sched_ctx->workers;
 }
 
 void starpu_delete_worker_collection_for_sched_ctx(unsigned sched_ctx_id)
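The point of this change is that a scheduling policy can now keep the collection it asks for instead of fetching it from the context afterwards. Below is a minimal sketch of how an init hook might use the new return value; my_policy_data and my_policy_init are hypothetical names, and only the calls visible in this commit (starpu_create_worker_collection_for_sched_ctx, starpu_set_sched_ctx_policy_data, WORKER_LIST) are assumed to exist.

#include <stdlib.h>

/* Hypothetical per-context policy data; the 'workers' field simply caches
 * the collection returned by the (now non-void) creation call. */
struct my_policy_data
{
	struct worker_collection *workers;
};

static void my_policy_init(unsigned sched_ctx_id)
{
	struct my_policy_data *d = (struct my_policy_data*)malloc(sizeof(struct my_policy_data));

	/* The call used to return void, so the collection had to be looked up
	 * in the context separately; now it is handed back directly. */
	d->workers = starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);

	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)d);
}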

+ 84 - 68
src/sched_policies/parallel_greedy.c

@@ -36,32 +36,30 @@ static int possible_combinations_cnt[STARPU_NMAXWORKERS];
 static int possible_combinations[STARPU_NMAXWORKERS][10];
 static int possible_combinations_size[STARPU_NMAXWORKERS][10];
 
-static void initialize_pgreedy_policy(unsigned sched_ctx_id) 
-{
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct pgreedy_data *data = (struct pgreedy_data*)malloc(sizeof(pgreedy_data));
-	/* masters pick tasks from that queue */
-	data->fifo = _starpu_create_fifo();
 
-	struct starpu_machine_config_s *config = _starpu_get_machine_config();
-	struct starpu_machine_topology_s *topology = &config->topology;
+/* Note: this policy does not work with several contexts, because the combined
+   workers are built from the workers available to the whole program, not from
+   those available to the context. */
+
+static void pgreedy_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	struct pgreedy_data *data = (struct pgreedy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	struct starpu_machine_topology *topology = &config->topology;
 
 	_starpu_sched_find_worker_combinations(topology);
 
-	unsigned workerid, workerid_ctx;
-	unsigned ncombinedworkers, nworkers, nworkers_ctx;
+	unsigned workerid, i;
+	unsigned ncombinedworkers;
 	
-	nworkers = topology->nworkers;
-	nworkers_ctx = sched_ctx->nworkers;
 	ncombinedworkers = starpu_combined_worker_get_count();
 
 	/* Find the master of each worker. We first assign the worker as its
 	 * own master, and then iterate over the different worker combinations
 	 * to find the biggest combination containing this worker. */
-
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
+	for(i = 0; i < nworkers; i++)
 	{
-		workerid = sched_ctx->workerids[workerid_ctx];
+		workerid = workerids[i];
 		
 		int cnt = possible_combinations_cnt[workerid]++;
 		possible_combinations[workerid][cnt] = workerid;
@@ -70,7 +68,6 @@ static void initialize_pgreedy_policy(unsigned sched_ctx_id)
 		data->master_id[workerid] = workerid;
 	}
 	
-	unsigned i;
 	
 	for (i = 0; i < ncombinedworkers; i++)
 	{
@@ -95,20 +92,9 @@ static void initialize_pgreedy_policy(unsigned sched_ctx_id)
 		}
 	}
 
-	_STARPU_PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
-	_STARPU_PTHREAD_COND_INIT(&data->sched_cond, NULL);
-
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-
-		_STARPU_PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
-		_STARPU_PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
-	}
-
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
+	for(i = 0; i < nworkers; i++)
+        {
+		workerid = workerids[i];
 
 		/* slaves pick up tasks from their local queue, their master
 		 * will put tasks directly in that local list when a parallel
@@ -118,67 +104,95 @@ static void initialize_pgreedy_policy(unsigned sched_ctx_id)
 		unsigned master = data->master_id[workerid];
 
 		/* All masters use the same condition/mutex */
-		if (master == workerid)
-		{
-			sched_ctx->sched_mutex[workerid] = &data->sched_mutex;
-			sched_ctx->sched_cond[workerid] = &data->sched_cond;
-		}
+		if (workerid == master)
+			starpu_worker_set_sched_condition(sched_ctx_id, workerid, &data->sched_mutex, &data->sched_cond);
+		else
+			starpu_worker_init_sched_condition(sched_ctx_id, workerid);
 	}
-	sched_ctx->policy_data = (void*)data;
 
 #if 0
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
+	for(i = 0; i < nworkers; i++)
+        {
+		workerid = workerids[i];
 
 		fprintf(stderr, "MASTER of %d = %d\n", workerid, master_id[workerid]);
 	}
 #endif
+
+}
+
+static void pgreedy_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+	struct pgreedy_data *data = (struct pgreedy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	int workerid;
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+        {
+		workerid = workerids[i];
+		_starpu_destroy_fifo(data->local_fifo[workerid]);
+		unsigned master = data->master_id[workerid];
+		if(workerid != master)
+			starpu_worker_deinit_sched_condition(sched_ctx_id, workerid);
+		else
+			starpu_worker_set_sched_condition(sched_ctx_id, workerid, NULL, NULL);
+	}
+}
+
+static void initialize_pgreedy_policy(unsigned sched_ctx_id) 
+{
+	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
+
+	struct pgreedy_data *data = (struct pgreedy_data*)malloc(sizeof(pgreedy_data));
+	/* masters pick tasks from that queue */
+	data->fifo = _starpu_create_fifo();
+
+	_STARPU_PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
+	_STARPU_PTHREAD_COND_INIT(&data->sched_cond, NULL);
+
+	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)data);
 }
 
 static void deinitialize_pgreedy_policy(unsigned sched_ctx_id) 
 {
 	/* TODO check that there is no task left in the queue */
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct pgreedy_data *data = (struct pgreedy_data*)sched_ctx->policy_data;
-
+	struct pgreedy_data *data = (struct pgreedy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	/* deallocate the job queue */
 	_starpu_destroy_fifo(data->fifo);
 
-	PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
-	PTHREAD_COND_DESTROY(&data->sched_cond);
-	
-	free(data);	
-	
-	unsigned nworkers_ctx = sched_ctx->nworkers;
-	int workerid;
-	unsigned workerid_ctx;
-	for (workerid_ctx = 0; workerid_ctx < nworkers_ctx; workerid_ctx++)
-	{
-		workerid = sched_ctx->workerids[workerid_ctx];
-		_starpu_destroy_fifo(data->local_fifo[workerid]);
-		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid]);
-		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid]);
+	_STARPU_PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
+	_STARPU_PTHREAD_COND_DESTROY(&data->sched_cond);
 
-		sched_ctx->sched_mutex[workerid] = NULL;
-		sched_ctx->sched_cond[workerid] = NULL;
-	}
+	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
 
+	free(data);	
 }
 
 static int push_task_pgreedy_policy(struct starpu_task *task, unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct pgreedy_data *data = (struct pgreedy_data*)sched_ctx->policy_data;
-
-	return _starpu_fifo_push_task(data->fifo, &data->sched_mutex, &data->sched_cond, task);
+	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
+	unsigned nworkers;
+        int ret_val = -1;
+
+	/* if the context has no workers return */
+        _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+        nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
+        if(nworkers == 0)
+        {
+                _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+                return ret_val;
+        }
+	struct pgreedy_data *data = (struct pgreedy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+
+	ret_val = _starpu_fifo_push_task(data->fifo, &data->sched_mutex, &data->sched_cond, task);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+
+	return ret_val;
 }
 
 static struct starpu_task *pop_task_pgreedy_policy(unsigned sched_ctx_id)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	struct pgreedy_data *data = (struct pgreedy_data*)sched_ctx->policy_data;
+	struct pgreedy_data *data = (struct pgreedy_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
 
 	int workerid = starpu_worker_get_id();
 
@@ -188,7 +202,7 @@ static struct starpu_task *pop_task_pgreedy_policy(unsigned sched_ctx_id)
 
 	int master = data->master_id[workerid];
 
-	if (master == workerid)
+	if (workerid == master)
 	{
 		/* The worker is a master */
 		struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, workerid);
@@ -253,9 +267,11 @@ static struct starpu_task *pop_task_pgreedy_policy(unsigned sched_ctx_id)
 				struct starpu_task *alias = _starpu_create_task_alias(task);
 				int local_worker = combined_workerid[i];
 
+				pthread_mutex_t *sched_mutex;
+				pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(sched_ctx_id, master, &sched_mutex, &sched_cond);
 				_starpu_fifo_push_task(data->local_fifo[local_worker],
-					sched_ctx->sched_mutex[master],
-					sched_ctx->sched_cond[master], alias);
+						       sched_mutex, sched_cond, alias);
 			}
 
 			/* The master also manipulated an alias */
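The pgreedy_add_workers hook above re-runs the master selection for the workers that join the context: every worker starts as its own master, and then the biggest combined worker containing it decides who drives it. The following is a self-contained sketch of that selection pass, with the combined workers hard-coded instead of queried through StarPU's combined-worker API (plain C, hypothetical names).

#include <stdio.h>

#define NWORKERS 4

/* Stand-in for StarPU's combined workers: each combination is a set of
 * worker ids.  In the policy these come from the combined-worker API;
 * here they are hard-coded for illustration. */
struct combination { int size; int members[NWORKERS]; };

int main(void)
{
	struct combination combos[] = {
		{ 2, { 0, 1 } },	/* workers 0 and 1 can team up      */
		{ 3, { 0, 1, 2 } },	/* a bigger combination including 2 */
	};
	int ncombos = 2;

	int master_of[NWORKERS];
	int best_size[NWORKERS];
	int w, c, m;

	/* Every worker is initially its own master. */
	for (w = 0; w < NWORKERS; w++)
	{
		master_of[w] = w;
		best_size[w] = 1;
	}

	/* A worker's master is taken from the biggest combination that
	 * contains it (the same idea as in pgreedy_add_workers). */
	for (c = 0; c < ncombos; c++)
		for (m = 0; m < combos[c].size; m++)
		{
			w = combos[c].members[m];
			if (combos[c].size > best_size[w])
			{
				best_size[w] = combos[c].size;
				master_of[w] = combos[c].members[0];
			}
		}

	for (w = 0; w < NWORKERS; w++)
		printf("master of %d = %d\n", w, master_of[w]);
	return 0;
}

This also makes the limitation noted in the commit message concrete: since the combinations are built from all the workers of the program, a worker of a small context can end up with a master that does not belong to that context at all.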

+ 1 - 1
src/sched_policies/work_stealing_policy.c

@@ -33,7 +33,7 @@ typedef struct{
 	pthread_mutex_t sched_mutex;
 	pthread_cond_t sched_cond;
 	unsigned last_pop_worker;
-	static unsigned last_push_worker;
+	unsigned last_push_worker;
 } work_stealing_data;
 
 #ifdef USE_OVERLOAD
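The work_stealing_policy.c hunk removes an invalid static storage-class specifier from a struct member: C forbids storage-class specifiers inside a struct definition, so a compiler rejects the original declaration outright. A minimal illustration (not StarPU code):

struct counter
{
	unsigned per_instance;
	/* static unsigned shared; */	/* rejected: storage class on a struct member */
	unsigned shared;		/* correct: a plain per-instance field */
};

If a push counter really had to be shared across instances it would need to be a file-scope static variable instead; here each work_stealing_data keeps its own last_push_worker, matching how last_pop_worker is already declared.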