Browse Source

Resize scheduling contexts - when one context finishes, the other takes over its share of the resources

Andra Hugo 14 years ago
parent
commit
2212709e37

+ 1 - 1
examples/cholesky_and_lu/cholesky/cholesky_implicit.c

@@ -211,7 +211,7 @@ double run_cholesky_implicit(unsigned sched_ctx, int start, int argc, char **arg
 		printf("\n");
 	}
 #endif
-	// 	if(barrier != NULL)
+	//	if(barrier != NULL)
 	//	  pthread_barrier_wait(barrier);
 	double gflops = cholesky(mat, size, size, nblocks, sched_ctx, timing);
 

+ 1 - 1
examples/cholesky_and_lu/cholesky_and_lu.c

@@ -16,7 +16,7 @@ typedef struct {
   double avg_timing;
 } retvals;
 
-#define NSAMPLES 3
+#define NSAMPLES 5
 int first = 1;
 pthread_mutex_t mut;
 

+ 72 - 60
src/core/sched_ctx.c

@@ -9,6 +9,22 @@ static pthread_cond_t wakeup_ths_cond = PTHREAD_COND_INITIALIZER;
 static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
 static int nblocked_ths = 0;
 
+/* Mark every scheduling-context slot of config as unused. */
+void _starpu_init_sched_ctx(struct starpu_machine_config_s *config)
+{
+  /* a slot whose id equals STARPU_NMAX_SCHED_CTXS is treated as unused */
+  int slot = 0;
+  while(slot < STARPU_NMAX_SCHED_CTXS)
+    {
+      config->sched_ctxs[slot].sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
+      slot++;
+    }
+}
+
+/* Return the index of the first entry of config->sched_ctxs whose
+ * sched_ctx_id equals STARPU_NMAX_SCHED_CTXS (the "unused" marker), or
+ * STARPU_NMAX_SCHED_CTXS itself when no entry carries that marker. */
+static unsigned _starpu_get_first_free_sched_ctx(struct starpu_machine_config_s *config)
+{
+  unsigned slot = 0;
+  while(slot < STARPU_NMAX_SCHED_CTXS
+        && config->sched_ctxs[slot].sched_ctx_id != STARPU_NMAX_SCHED_CTXS)
+    slot++;
+  /* slot == STARPU_NMAX_SCHED_CTXS here means the scan found no unused entry */
+  return slot;
+}
+
 unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 			     int nworkerids_in_ctx, unsigned is_initial_sched,
 			     const char *sched_name)
@@ -18,7 +34,7 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 
 	struct starpu_sched_ctx *sched_ctx = &config->sched_ctxs[config->topology.nsched_ctxs];
 
-	sched_ctx->sched_ctx_id = config->topology.nsched_ctxs;
+	sched_ctx->sched_ctx_id = _starpu_get_first_free_sched_ctx(config);
 
 	int nworkers = config->topology.nworkers;
 	
@@ -65,8 +81,8 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 		  }
 	  }
 
-	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(sched_ctx->nworkers_in_ctx * sizeof(pthread_mutex_t*));
-	sched_ctx->sched_cond = (pthread_cond_t**)malloc(sched_ctx->nworkers_in_ctx *sizeof(pthread_cond_t*));
+	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(STARPU_NMAXWORKERS* sizeof(pthread_mutex_t*));
+	sched_ctx->sched_cond = (pthread_cond_t**)malloc(STARPU_NMAXWORKERS *sizeof(pthread_cond_t*));
 
 
 	_starpu_init_sched_policy(config, sched_ctx, policy_name);
@@ -227,7 +243,7 @@ static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 	sched_ctx->sched_policy = NULL;
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 	config->topology.nsched_ctxs--;
-
+	sched_ctx->sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
 }
 void starpu_delete_sched_ctx(unsigned sched_ctx_id)
 {
@@ -251,15 +267,15 @@ void _starpu_delete_all_sched_ctxs()
 	unsigned nsched_ctxs = config->topology.nsched_ctxs;
 	unsigned i;
 
-	for(i = 0; i < nsched_ctxs; i++)
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	  {
-	    if(!starpu_wait_for_all_tasks_of_sched_ctx(i))
-	      {
-
-		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(i);
-
-		_starpu_manage_delete_sched_ctx(sched_ctx);
-	      }		
+	    struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(i);
+	    if(sched_ctx->sched_ctx_id != STARPU_NMAX_SCHED_CTXS){
+	      if(!starpu_wait_for_all_tasks_of_sched_ctx(i))
+		{
+		  _starpu_manage_delete_sched_ctx(sched_ctx);
+		}		
+	    }
 	  }
 	return;
 }
@@ -325,55 +341,51 @@ void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
 	return;
 }
 
-static void _starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, 
-				     struct starpu_sched_ctx *sched_ctx)
+/* Attach workers to sched_ctx and register sched_ctx with each of them.
+ *
+ * new_workers:  ids of the workers to add, or NULL to add every worker
+ *               that does not already belong to sched_ctx
+ * nnew_workers: number of entries in new_workers; also forwarded to the
+ *               policy's init_sched_for_workers hook
+ * sched_ctx:    context being extended
+ */
+static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
+                                     struct starpu_sched_ctx *sched_ctx)
 {
-	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-	int nworkers = config->topology.nworkers;
-	
-	STARPU_ASSERT((nworkerids_in_ctx + sched_ctx->nworkers_in_ctx) <= nworkers);
-	
-	int nworkerids_already_in_ctx =  sched_ctx->nworkers_in_ctx;
-	int j;
-
-	/*if null add the rest of the workers which don't already belong to this ctx*/
-	if(workerids_in_ctx == NULL)
-	  {
-		for(j = 0; j < nworkers; j++)
-		  {
-			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-			if(!_starpu_worker_belongs_to_ctx(workerarg, sched_ctx))
-			  {
-				sched_ctx->workerid[++nworkerids_already_in_ctx] = j;
-				workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
-			  }
-			sched_ctx->nworkers_in_ctx = nworkers;
-		  }
-	  } 
-	else 
-	  {
-		int i;
-		for(i = 0; i < nworkerids_in_ctx; i++)
-		  {
-			/*take care the user does not ask for a resource that does not exist*/
-			STARPU_ASSERT( workerids_in_ctx[i] >= 0 &&  workerids_in_ctx[i] <= nworkers);
-		    
-			sched_ctx->workerid[ nworkerids_already_in_ctx + i] = workerids_in_ctx[i];
-			for(j = 0; j < nworkers; j++)
-			  {
-				if(sched_ctx->workerid[i] == j)
-				  {
-					struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
-				  }
-			  }
-		  }
-		sched_ctx->nworkers_in_ctx += nworkerids_in_ctx;
-	  }
-
-	sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, nworkerids_in_ctx);
-
-	return;
+        struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+        int ntotal_workers = config->topology.nworkers;
+
+        STARPU_ASSERT((nnew_workers + sched_ctx->nworkers_in_ctx) <= ntotal_workers);
+
+        /* next free index in sched_ctx->workerid[] */
+        int nworkerids_already_in_ctx =  sched_ctx->nworkers_in_ctx;
+        int j;
+
+        /*if null add the rest of the workers which don't already belong to this ctx*/
+        if(new_workers == NULL)
+          {
+                for(j = 0; j < ntotal_workers; j++)
+                  {
+                        struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
+                        if(!_starpu_worker_belongs_to_ctx(workerarg, sched_ctx))
+                          {
+                                /* post-increment: append at the first free index so
+                                   that no entry of workerid[] is skipped */
+                                sched_ctx->workerid[nworkerids_already_in_ctx++] = j;
+                                workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+                          }
+                  }
+                sched_ctx->nworkers_in_ctx = ntotal_workers;
+          }
+        else
+          {
+                int i;
+                for(i = 0; i < nnew_workers; i++)
+                  {
+                        /*take care the user does not ask for a resource that does not exist:
+                          valid worker ids are 0 .. ntotal_workers - 1*/
+                        STARPU_ASSERT( new_workers[i] >= 0 &&  new_workers[i] < ntotal_workers);
+
+                        /* add worker to context */
+                        sched_ctx->workerid[ nworkerids_already_in_ctx + i] = new_workers[i];
+                        /* add context to worker */
+                        struct starpu_worker_s *workerarg = _starpu_get_worker_struct(new_workers[i]);
+                        workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+                  }
+                sched_ctx->nworkers_in_ctx += nnew_workers;
+          }
+
+        /* NOTE(review): in the NULL case nnew_workers is whatever the caller passed,
+           not the number of workers actually appended above - confirm the policy
+           hook expects that */
+        sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, nnew_workers);
+
+        return;
 }
 
 void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,

+ 2 - 1
src/core/workers.c

@@ -348,6 +348,7 @@ int starpu_init(struct starpu_conf *user_conf)
 	 * initialization */
 	config.user_conf = user_conf;
 
+	_starpu_init_sched_ctx(&config);
 	ret = _starpu_build_topology(&config);
 	if (ret) {
 		PTHREAD_MUTEX_LOCK(&init_mutex);
@@ -633,7 +634,7 @@ struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
 
 struct starpu_sched_ctx *_starpu_get_sched_ctx(unsigned id)
 {
-	STARPU_ASSERT(id <= config.topology.nsched_ctxs);
+	STARPU_ASSERT(id <= STARPU_NMAX_SCHED_CTXS); /* NOTE(review): '<=' also admits id == STARPU_NMAX_SCHED_CTXS, the value used to mark unused contexts - confirm sched_ctxs[] has an entry at that index, otherwise this should be '<' */
 	return &config.sched_ctxs[id];
 }