Parcourir la source

locks for redim ctxs

Andra Hugo il y a 14 ans
Parent
commit
0453c9ffe6
3 fichiers modifiés avec 43 ajouts et 17 suppressions
  1. 14 10
      src/core/sched_ctx.c
  2. 6 0
      src/core/sched_ctx.h
  3. 23 7
      src/core/sched_policy.c

+ 14 - 10
src/core/sched_ctx.c

@@ -9,8 +9,8 @@ static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_
 int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx);
 
 unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
-			     int nworkerids_in_ctx, unsigned is_initial_sched,
-			     const char *sched_name)
+				  int nworkerids_in_ctx, unsigned is_initial_sched,
+				  const char *sched_name)
 {
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
 	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS - 1);
@@ -24,6 +24,9 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 	STARPU_ASSERT(nworkerids_in_ctx <= nworkers);
   
 	sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
+	sched_ctx->temp_nworkers_in_ctx = -1;
+	PTHREAD_MUTEX_INIT(&sched_ctx->changing_ctx_mutex, NULL);
+
 	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
 	sched_ctx->is_initial_sched = is_initial_sched;
 	sched_ctx->sched_name = sched_name;
@@ -127,16 +130,15 @@ static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 	config->topology.nsched_ctxs--;
 	sched_ctx->sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
+	PTHREAD_MUTEX_DESTROY(&sched_ctx->changing_ctx_mutex);
 }
 
 static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
-                                     struct starpu_sched_ctx *sched_ctx)
+					     struct starpu_sched_ctx *sched_ctx)
 {
         struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
         int ntotal_workers = config->topology.nworkers;
 
-	// STARPU_ASSERT((nnew_workers + sched_ctx->nworkers_in_ctx) <= ntotal_workers);
-
         int nworkerids_already_in_ctx =  sched_ctx->nworkers_in_ctx;
         int j;
 	
@@ -160,12 +162,12 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
         else
           {
                 int i;
-		printf("%d redim worker:", nnew_workers);
+		//		printf("%d redim worker:", nnew_workers);
                 for(i = 0; i < nnew_workers; i++)
                   {
                         /*take care the user does not ask for a resource that does not exist*/
                         STARPU_ASSERT( new_workers[i] >= 0 &&  new_workers[i] <= ntotal_workers);
-			printf(" %d", new_workers[i]);
+			//	printf(" %d", new_workers[i]);
 
 			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(new_workers[i]);
 			if(!_starpu_worker_belongs_to_ctx(new_workers[i], sched_ctx))
@@ -178,7 +180,7 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
 			  }
                   }
 		//                sched_ctx->nworkers_in_ctx += n_added_workers;
-		printf("\n");
+		//		printf("\n");
           }
 
         sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, n_added_workers);
@@ -303,7 +305,9 @@ void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_
 
 		/* block the workers until the contex is switched */
 		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+		PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
 		_starpu_remove_workers_from_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
+		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 		/* also wait the workers to wake up before using the context */
 		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
 	  }
@@ -548,11 +552,11 @@ int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx_id, unsigned workeri
 pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int worker)
 {
 	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, worker);
-	return sched_ctx->sched_mutex[workerid_ctx];
+	return (workerid_ctx == -1 ? NULL : sched_ctx->sched_mutex[workerid_ctx]);
 }
 
 pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int worker)
 {
 	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, worker);
-	return sched_ctx->sched_cond[workerid_ctx];
+	return (workerid_ctx == -1 ? NULL : sched_ctx->sched_cond[workerid_ctx]);
 }

+ 6 - 0
src/core/sched_ctx.h

@@ -42,6 +42,12 @@ struct starpu_sched_ctx {
 	/* number of threads in contex */
 	int nworkers_in_ctx; 
 
+	/* temporary variable for number of threads in contex */
+	int temp_nworkers_in_ctx; 
+  
+	/* mutext for temp_nworkers_in_ctx*/
+	pthread_mutex_t changing_ctx_mutex;
+
 	/* we keep an initial sched which we never delete */
 	unsigned is_initial_sched; 
 

+ 23 - 7
src/core/sched_policy.c

@@ -312,7 +312,19 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 	else {
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(task->sched_ctx);
 		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
+		/* update the number of threads of the context before pushing a task to 
+		   the context in order to avoid doing it during the computation of the
+		   best worker */
+		PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+		if(sched_ctx->temp_nworkers_in_ctx != -1)
+		  {
+		    sched_ctx->nworkers_in_ctx = sched_ctx->temp_nworkers_in_ctx;
+		    sched_ctx->temp_nworkers_in_ctx = -1;
+		  }
+		/* don't push task on ctx at the same time workers are removed from ctx */
 		ret = sched_ctx->sched_policy->push_task(task, sched_ctx->sched_ctx_id);
+		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
+		
 	}
 
 	_starpu_profiling_set_task_push_end_time(task);
@@ -340,24 +352,28 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 	if(!task)
 	  {
 		struct starpu_sched_ctx *sched_ctx;
-		pthread_mutex_t *sched_mutex_ctx;
+		pthread_mutex_t *sched_ctx_mutex;
 
 		unsigned i;
 		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 		  {
-			sched_ctx = worker->sched_ctx[i];
-			if(sched_ctx != NULL)
+		    sched_ctx = worker->sched_ctx[i];
+		    
+		    if(sched_ctx != NULL)
+		      {
+			sched_ctx_mutex = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
+			if(sched_ctx_mutex != NULL)
 			  {
-			    sched_mutex_ctx = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
-			    PTHREAD_MUTEX_LOCK(sched_mutex_ctx);
+			    PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
 			    if (sched_ctx->sched_policy->pop_task)
 			      {
 				task = sched_ctx->sched_policy->pop_task(sched_ctx->sched_ctx_id);
-				PTHREAD_MUTEX_UNLOCK(sched_mutex_ctx);
+				PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
 				break;
 			      }
-			    PTHREAD_MUTEX_UNLOCK(sched_mutex_ctx);
+			    PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
 			  }
+		      }
 		  }
 	  }