Procházet zdrojové kódy

dirty hack to have a combined worker before scheduler initialization
deadlock problem in node_worker.c still to fix

Simon Archipoff před 12 roky
rodič
revize
ec7ccb4070

+ 3 - 1
src/core/combined_workers.c

@@ -77,7 +77,7 @@ int starpu_combined_worker_assign_workerid(int nworkers, int workerid_array[])
 	}
 
 	/* Get an id for that combined worker. Note that this is not thread
-	 * safe because thhis method should only be called when the scheduler
+	 * safe because this method should only be called when the scheduler
 	 * is being initialized. */
 	new_workerid = basic_worker_count + combined_worker_id;
 	config->topology.ncombinedworkers++;
@@ -90,6 +90,8 @@ int starpu_combined_worker_assign_workerid(int nworkers, int workerid_array[])
 	}
 	fprintf(stderr, "into worker %d\n", new_workerid);
 #endif
+	for(i = 0; i < nworkers; i++)
+		_starpu_get_worker_struct(workerid_array[i])->combined_workerid = new_workerid;
 
 	struct _starpu_combined_worker *combined_worker =
 		&config->combined_workers[combined_worker_id];

+ 19 - 7
src/core/workers.c

@@ -954,10 +954,28 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 	     config.conf = user_conf;
 	     config.default_conf = 0;
 	}
-
+	/* Depending on whether we are a MP sink or not, we must build the
+	 * topology with MP nodes or not. */
+	ret = _starpu_build_topology(&config, is_a_sink);
 	_starpu_conf_check_environment(config.conf);
 
+	/* Launch "basic" workers (ie. non-combined workers) */
+	if (!is_a_sink)
+		_starpu_launch_drivers(&config);
+
+	int nworkers = starpu_worker_get_count();
+	int workerid_array[nworkers];
+	int i;
+	for(i = 0; i < nworkers; i++)
+	{
+		workerid_array[i] = i;
+	}
+
+	starpu_combined_worker_assign_workerid(nworkers, workerid_array);
+
 	_starpu_init_all_sched_ctxs(&config);
+
+
 	_starpu_init_progression_hooks();
 
 	_starpu_init_tags();
@@ -976,9 +994,6 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 
 	_starpu_load_bus_performance_files();
 
-	/* Depending on whether we are a MP sink or not, we must build the
-	 * topology with MP nodes or not. */
-	ret = _starpu_build_topology(&config, is_a_sink ? 1 : 0);
 	if (ret)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
@@ -1003,9 +1018,6 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 
 	_starpu_initialize_registered_performance_models();
 
-	/* Launch "basic" workers (ie. non-combined workers) */
-	if (!is_a_sink)
-		_starpu_launch_drivers(&config);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
 	initialized = INITIALIZED;

+ 1 - 1
src/sched_policies/node_eager.c

@@ -13,7 +13,7 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
  	data->root = _starpu_sched_node_fifo_create(NULL);
 	data->workers = _starpu_bitmap_create();
 	unsigned i;
-	for(i = 0; i < starpu_worker_get_count(); i++)
+	for(i = 0; i < starpu_worker_get_count() + starpu_combined_worker_get_count(); i++)
 	{
 		struct _starpu_sched_node * node = _starpu_sched_node_worker_get(i);
 		if(!node)

+ 6 - 3
src/sched_policies/node_worker.c

@@ -22,7 +22,7 @@ static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
  *
  * its possible that a _starpu_task_grid wont have task
  *
- * N = no task
+ * N = no task 
  *
  *   T  T  T
  *   |  |  |
@@ -270,8 +270,8 @@ static void available_worker(struct _starpu_sched_node * worker_node)
 	
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
-//	if(w->workerid == starpu_worker_get_id())
-//		return;
+	if(w->workerid == starpu_worker_get_id())
+		return;
 	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
 	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
 
@@ -283,6 +283,8 @@ static void available_worker(struct _starpu_sched_node * worker_node)
 
 static void available_combined_worker(struct _starpu_sched_node * node)
 {
+	(void) node;
+#ifndef STARPU_NON_BLOCKING_DRIVERS
 	STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
 	struct _starpu_worker_node_data * data = node->data;
 	int workerid = starpu_worker_get_id();
@@ -299,6 +301,7 @@ static void available_combined_worker(struct _starpu_sched_node * node)
 		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
 		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
+#endif
 }
 
 static double estimated_transfer_length(struct _starpu_sched_node * node,

+ 1 - 1
src/sched_policies/parallel_eager.c

@@ -47,7 +47,7 @@ static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned n
 	unsigned ncombined_workers = starpu_combined_worker_get_count();
 	unsigned ntotal_workers = nbasic_workers + ncombined_workers;
 		
-	_starpu_sched_find_worker_combinations(workerids, nworkers);
+//	_starpu_sched_find_worker_combinations(workerids, nworkers);
 
 	unsigned workerid, i;
 	unsigned ncombinedworkers;

+ 1 - 1
src/sched_policies/parallel_heft.c

@@ -517,7 +517,7 @@ static void parallel_heft_add_workers(unsigned sched_ctx_id, int *workerids, uns
 			workerarg->has_prev_init = 1;
 		}
 	}
-	_starpu_sched_find_worker_combinations(workerids, nworkers);
+//	_starpu_sched_find_worker_combinations(workerids, nworkers);
 
 // start_unclear_part: not very clear where this is used
 /* 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config(); */