ソースを参照

some renamings + remove synchr for simultaneous pushes and remove_worker_from_ctx

Andra Hugo 13 年 前
コミット
70f169e657
共有6 個のファイルを変更した36 個の追加41 個の削除を含む
  1. 18 22
      src/core/sched_ctx.c
  2. 1 1
      src/core/sched_ctx.h
  3. 10 11
      src/core/sched_policy.c
  4. 1 1
      src/core/workers.c
  5. 4 4
      src/sched_policies/deque_modeling_policy_data_aware.c
  6. 2 2
      src/sched_policies/work_stealing_policy.c

+ 18 - 22
src/core/sched_ctx.c

@@ -20,8 +20,8 @@
 
 pthread_key_t sched_ctx_key;
 
-static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config);
-static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker);
+static unsigned _starpu_get_first_free_sched_ctx(struct starpu_machine_config_s *config);
+static unsigned _starpu_worker_get_first_free_sched_ctx(struct starpu_worker_s *worker);
 static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkers_ctx);
 
 struct sched_ctx_info {
@@ -77,7 +77,7 @@ static void _starpu_update_workers(int *workerids, int nworkers,
 		worker[i] = _starpu_get_worker_struct(workerids[i]);
 		
 		sched_info_args[i].sched_ctx_id = sched_ctx_id == -1  ? 
-			_starpu_get_first_free_sched_ctx_in_worker_list(worker[i]) : 
+			_starpu_worker_get_first_free_sched_ctx(worker[i]) : 
 			(unsigned)sched_ctx_id;
 
 		sched_info_args[i].sched_ctx = sched_ctx;
@@ -122,7 +122,7 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
 	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS - 1);
 
-	unsigned id = _starpu_get_first_available_sched_ctx_id(config);
+	unsigned id = _starpu_get_first_free_sched_ctx(config);
 
 	struct starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
 	sched_ctx->id = id;
@@ -179,7 +179,7 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	    for(i = 0; i < sched_ctx->nworkers; i++)
 	      {
 		struct starpu_worker_s *worker = _starpu_get_worker_struct(sched_ctx->workerids[i]);
-		worker->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(worker)] = sched_ctx;
+		worker->sched_ctx[_starpu_worker_get_first_free_sched_ctx(worker)] = sched_ctx;
 	      }
 	  }
 
@@ -223,8 +223,7 @@ static void free_sched_ctx_mem(struct starpu_sched_ctx *sched_ctx)
 
 static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 {
-	_starpu_update_workers(sched_ctx->workerids, sched_ctx->nworkers, 
-			       sched_ctx->id, NULL);
+	_starpu_update_workers(sched_ctx->workerids, sched_ctx->nworkers, sched_ctx->id, NULL);
 }
 
 static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
@@ -378,21 +377,18 @@ static void _starpu_remove_workers_from_sched_ctx(int *workerids, int nworkers_t
 void starpu_remove_workers_from_sched_ctx(int *workerids, int nworkers_to_remove, 
 					  unsigned sched_ctx_id)
 {
-	  /* wait for the workers concerned by the change of contex    
-	   * to finish their work in the previous context */
+	/* wait for the workers concerned by the change of contex    
+	 * to finish their work in the previous context */
 	if(!starpu_wait_for_all_tasks_of_workers(workerids, nworkers_to_remove))
-	  {
+	{
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_structure(sched_ctx_id);
-
-		PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
 		_starpu_remove_workers_from_sched_ctx(workerids, nworkers_to_remove, sched_ctx);
-		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
-	  }
+	}
 	return;
 }
 
 /* unused sched_ctx have the id STARPU_NMAX_SCHED_CTXS */
-void _starpu_init_all_sched_ctx(struct starpu_machine_config_s *config)
+void _starpu_init_all_sched_ctxs(struct starpu_machine_config_s *config)
 {
 	pthread_key_create(&sched_ctx_key, NULL);
 
@@ -418,7 +414,7 @@ void _starpu_init_sched_ctx_for_worker(unsigned workerid)
 /* sched_ctx aren't necessarly one next to another */
 /* for eg when we remove one its place is free */
 /* when we add  new one we reuse its place */
-static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config)
+static unsigned _starpu_get_first_free_sched_ctx(struct starpu_machine_config_s *config)
 {
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
@@ -429,7 +425,7 @@ static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_c
 	return STARPU_NMAX_SCHED_CTXS;
 }
 
-static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker)
+static unsigned _starpu_worker_get_first_free_sched_ctx(struct starpu_worker_s *worker)
 {
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
@@ -439,7 +435,7 @@ static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_wo
 	return STARPU_NMAX_SCHED_CTXS;
 }
 
-static int _starpu_get_first_free_worker_space(int *workerids, int nworkers)
+static int _starpu_get_first_free_worker(int *workerids, int nworkers)
 {
 	int i;
 	for(i = 0; i < nworkers; i++)
@@ -461,7 +457,7 @@ static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_
 	  {
 		if(sched_ctx->workerids[i] != -1)
 		  {
-			first_free_id = _starpu_get_first_free_worker_space(sched_ctx->workerids, nworkers_ctx);
+			first_free_id = _starpu_get_first_free_worker(sched_ctx->workerids, nworkers_ctx);
 			if(first_free_id != -1)
 			  {
 				sched_ctx->workerids[first_free_id] = sched_ctx->workerids[i];
@@ -542,7 +538,7 @@ void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier);
 }
 
-int _starpu_get_index_ctx_of_workerid(unsigned sched_ctx_id, unsigned workerid)
+int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx_id, unsigned workerid)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_structure(sched_ctx_id);
 	
@@ -558,13 +554,13 @@ int _starpu_get_index_ctx_of_workerid(unsigned sched_ctx_id, unsigned workerid)
 
 pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker)
 {
-	int workerid_ctx = _starpu_get_index_ctx_of_workerid(sched_ctx->id, worker);
+	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->id, worker);
 	return (workerid_ctx == -1 ? NULL : sched_ctx->sched_mutex[workerid_ctx]);
 }
 
 pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int worker)
 {
-	int workerid_ctx = _starpu_get_index_ctx_of_workerid(sched_ctx->id, worker);
+	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->id, worker);
 	return (workerid_ctx == -1 ? NULL : sched_ctx->sched_cond[workerid_ctx]);
 }
 

+ 1 - 1
src/core/sched_ctx.h

@@ -64,7 +64,7 @@ struct starpu_sched_ctx {
 struct starpu_machine_config_s;
 
 /* init sched_ctx_id of all contextes*/
-void _starpu_init_all_sched_ctx(struct starpu_machine_config_s *config);
+void _starpu_init_all_sched_ctxs(struct starpu_machine_config_s *config);
 
 /* init the list of contextes of the worker */
 void _starpu_init_sched_ctx_for_worker(unsigned workerid);

+ 10 - 11
src/core/sched_policy.c

@@ -241,16 +241,16 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	if (use_prefetch)
 		starpu_prefetch_task_input_on_node(task, memory_node);
 
+	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
 	unsigned i;
+	struct starpu_sched_ctx *sched_ctx;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-	  {
-		if (worker->sched_ctx[i] != NULL && worker->sched_ctx[i]->sched_policy != NULL && 
-		    worker->sched_ctx[i]->sched_policy->push_task_notify)
-		  {
-			worker->sched_ctx[i]->sched_policy->push_task_notify(task, workerid, worker->sched_ctx[i]->id);
-		  }
-	  }
-
+	{
+		sched_ctx = worker->sched_ctx[i];
+		if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
+			sched_ctx->sched_policy->push_task_notify(task, workerid, sched_ctx->id);
+	}
+	
 	if (is_basic_worker)
 	{
 		return _starpu_push_local_task(worker, task, 0);
@@ -319,10 +319,9 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 		    sched_ctx->nworkers = sched_ctx->temp_nworkers;
 		    sched_ctx->temp_nworkers = -1;
 		  }
-		/* don't push task on ctx at the same time workers are removed from ctx */
-		ret = sched_ctx->sched_policy->push_task(task, sched_ctx->id);
 		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
-		
+
+		ret = sched_ctx->sched_policy->push_task(task, sched_ctx->id);
 	}
 
 	_starpu_profiling_set_task_push_end_time(task);

+ 1 - 1
src/core/workers.c

@@ -371,7 +371,7 @@ int starpu_init(struct starpu_conf *user_conf)
 	 * initialization */
 	config.user_conf = user_conf;
 
-	_starpu_init_all_sched_ctx(&config);
+	_starpu_init_all_sched_ctxs(&config);
 	ret = _starpu_build_topology(&config);
 	if (ret) {
 		PTHREAD_MUTEX_LOCK(&init_mutex);

+ 4 - 4
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -125,7 +125,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_ctx_of_workerid(sched_ctx_id, workerid);
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
 
 	unsigned node = starpu_worker_get_memory_node(workerid);
@@ -161,7 +161,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_ctx_of_workerid(sched_ctx_id, workerid);
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
 
 	task = _starpu_fifo_pop_task(fifo, -1);
@@ -197,7 +197,7 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 	struct starpu_task *new_list;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_ctx_of_workerid(sched_ctx_id, workerid);
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];
 
 	new_list = _starpu_fifo_pop_every_task(fifo, sched_ctx->sched_mutex[workerid_ctx], workerid);
@@ -289,7 +289,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
-	int best_workerid_ctx =  _starpu_get_index_ctx_of_workerid(sched_ctx->id, best_workerid);
+	int best_workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx->id, best_workerid);
 
 	struct starpu_fifo_taskq_s *fifo;
 	fifo = dt->queue_array[best_workerid_ctx];

+ 2 - 2
src/sched_policies/work_stealing_policy.c

@@ -141,7 +141,7 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_ctx_of_workerid(sched_ctx_id, workerid);
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 
 	struct starpu_deque_jobq_s *q;
 
@@ -180,7 +180,7 @@ int ws_push_task(struct starpu_task *task, unsigned sched_ctx_id)
 	work_stealing_data *ws = (work_stealing_data*)sched_ctx->policy_data;
 
 	int workerid = starpu_worker_get_id();
-	int workerid_ctx =  _starpu_get_index_ctx_of_workerid(sched_ctx_id, workerid);
+	int workerid_ctx =  _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
 
         struct starpu_deque_jobq_s *deque_queue;
 	deque_queue = ws->queue_array[workerid_ctx];