浏览代码

Overlapping + redim

Andra Hugo 14 年之前
父节点
当前提交
0ab734e82c

+ 41 - 16
examples/cholesky_and_lu/cholesky_and_lu.c

@@ -42,8 +42,8 @@ void* func_cholesky(void *val){
   pthread_mutex_lock(&mut);
   if(first)
     {
-      starpu_delete_sched_ctx(p->ctx);
-      starpu_add_workers_to_sched_ctx(p->procs, p->ncpus, the_other_ctx);
+      starpu_delete_sched_ctx(p->ctx, the_other_ctx);
+      //      starpu_add_workers_to_sched_ctx(p->procs, p->ncpus, the_other_ctx);
     }
   first = 0;
   pthread_mutex_unlock(&mut);
@@ -54,24 +54,38 @@ void* func_cholesky(void *val){
   return (void*)rv;
 }
 
-void cholesky_vs_cholesky(params *p1, params *p2, params *p3, int ncpus1, int ncpus2){
+void cholesky_vs_cholesky(params *p1, params *p2, params *p3, 
+			  unsigned cpu_start1, unsigned cpu_start2,
+			  unsigned cpu_end1, unsigned cpu_end2){
+  int ncpus1 = cpu_end1 - cpu_start1;
+  int ncpus2 = cpu_end2 - cpu_start2;
+
   /* 2 cholesky in different ctxs */
   starpu_init(NULL);
   starpu_helper_cublas_init();
 
   int procs[ncpus1];
   int i;
-  for(i = 0; i < ncpus1; i++)
-    procs[i] = i;
-
+  int k = 0;
+  for(i = cpu_start1; i < cpu_end1; i++)
+    {
+      printf("%d ", i);
+      procs[k++] = i;
+    }
+  printf("\n");
   p1->ctx = starpu_create_sched_ctx("heft", procs, ncpus1, "cholesky1");
   p2->the_other_ctx = (int)p1->ctx;
   p1->procs = procs;
   p1->ncpus = ncpus1;
   int procs2[ncpus2];
 
-  for(i = 0; i < ncpus2; i++)
-    procs2[i] = ncpus1+i;
+  k = 0;
+  for(i = cpu_start2; i < cpu_end2; i++){
+    printf("%d ", i);
+    procs2[k++] = i;
+  }
+
+  printf("\n");
 
   p2->ctx = starpu_create_sched_ctx("heft", procs2, ncpus2, "cholesky2");
   p1->the_other_ctx = (int)p2->ctx;
@@ -115,22 +129,33 @@ void cholesky_vs_cholesky(params *p1, params *p2, params *p3, int ncpus1, int nc
 
 int main(int argc, char **argv)
 {
-  //  printf("argc = %d\n", argc);
-  int ncpus1=0, ncpus2=0;
+  unsigned cpu_start1 = 0, cpu_end1 = 0, cpu_start2 = 0, cpu_end2 = 0;
+
   int i;
   
   for (i = 9; i < argc; i++) {
-    if (strcmp(argv[i], "-ncpus1") == 0) {
+    if (strcmp(argv[i], "-cpu_start1") == 0) {
       char *argptr;
-      ncpus1 = strtol(argv[++i], &argptr, 10);
+      cpu_start1 = strtol(argv[++i], &argptr, 10);
     }
     
-    if (strcmp(argv[i], "-ncpus2") == 0) {
+    if (strcmp(argv[i], "-cpu_start2") == 0) {
       char *argptr;
-      ncpus2 = strtol(argv[++i], &argptr, 10);
+      cpu_start2 = strtol(argv[++i], &argptr, 10);
     }    
+
+    if (strcmp(argv[i], "-cpu_end1") == 0) {
+      char *argptr;
+      cpu_end1 = strtol(argv[++i], &argptr, 10);
+    }
+    
+    if (strcmp(argv[i], "-cpu_end2") == 0) {
+      char *argptr;
+      cpu_end2 = strtol(argv[++i], &argptr, 10);
+    }    
+
   }
-  //  printf("%d %d\n", ncpus1, ncpus2);
+
   params p1;
   p1.start = 1;
   p1.argc = 5;
@@ -145,7 +170,7 @@ int main(int argc, char **argv)
   p3.argc = argc;
   p3.argv = argv;
   p3.ctx = 0;
-  cholesky_vs_cholesky(&p1, &p2,&p3, ncpus1, ncpus2);
+  cholesky_vs_cholesky(&p1, &p2,&p3, cpu_start1, cpu_start2, cpu_end1, cpu_end2);
 
   return 0;
 }

+ 1 - 1
include/starpu_scheduler.h

@@ -106,7 +106,7 @@ struct starpu_sched_policy_s {
 
 unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, const char *sched_name);
 
-void starpu_delete_sched_ctx(unsigned sched_ctx_id);
+void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id); /* the workers of the deleted context are added to inheritor_sched_ctx_id */
 
 void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, unsigned sched_ctx);
 

+ 82 - 56
src/core/sched_ctx.c

@@ -17,6 +17,15 @@ void _starpu_init_sched_ctx(struct starpu_machine_config_s *config)
     config->sched_ctxs[i].sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
 }
 
+/* Allocate the table of scheduling contexts of worker `workerid`:
+ * STARPU_NMAX_SCHED_CTXS slots, every one set to NULL so that a NULL
+ * entry marks a slot that is not attached to any context yet. */
+void _starpu_init_sched_ctx_for_worker(unsigned workerid)
+{
+  struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+  worker->sched_ctx = (struct starpu_sched_ctx**)malloc(STARPU_NMAX_SCHED_CTXS * sizeof(struct starpu_sched_ctx*));
+  /* the loop below writes through the table unconditionally, so fail
+     loudly here rather than dereferencing a NULL allocation */
+  STARPU_ASSERT(worker->sched_ctx != NULL);
+  int i;
+  for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+    worker->sched_ctx[i] = NULL;
+}
+
 static unsigned _starpu_get_first_free_sched_ctx(struct starpu_machine_config_s *config){
   unsigned i;
   for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
@@ -25,6 +34,14 @@ static unsigned _starpu_get_first_free_sched_ctx(struct starpu_machine_config_s
   return STARPU_NMAX_SCHED_CTXS;
 }
 
+/* Return the index of the first NULL (unused) entry of worker->sched_ctx,
+ * or STARPU_NMAX_SCHED_CTXS when all STARPU_NMAX_SCHED_CTXS entries are in
+ * use.  The return type is unsigned, so the previous `return -1` wrapped
+ * around to UINT_MAX; STARPU_NMAX_SCHED_CTXS is the same "none free" value
+ * _starpu_get_first_free_sched_ctx returns.
+ * NOTE(review): the visible callers index worker->sched_ctx with the result
+ * directly; they should check for STARPU_NMAX_SCHED_CTXS first. */
+static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker){
+  unsigned i;
+  for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+    if(worker->sched_ctx[i] == NULL)
+      return i;
+  return STARPU_NMAX_SCHED_CTXS;
+}
+
 unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 			     int nworkerids_in_ctx, unsigned is_initial_sched,
 			     const char *sched_name)
@@ -57,7 +74,7 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 		  {
 			sched_ctx->workerid[j] = j;
 			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-			workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+			workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
 		}
 		sched_ctx->nworkers_in_ctx = nworkers;
 	  } 
@@ -75,7 +92,7 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 				if(sched_ctx->workerid[i] == j)
 				  {
 					struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+					workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
 				  }
 			  }
 		  }
@@ -158,7 +175,7 @@ static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkeri
 		
 		/*if the status is CHANGING_CTX let the thread know that it must block*/
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-		worker->status = changing_ctx;
+		worker->blocking_status = changing_ctx;
 		worker->nworkers_of_next_ctx = nworkers;
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 
@@ -169,6 +186,7 @@ static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkeri
 			PTHREAD_COND_SIGNAL(changing_ctx_cond);
 			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 		  }
+
 	  }
 	
 	/*after letting know all the concerned threads about the change                        
@@ -194,12 +212,11 @@ unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
 	return ret;
 }
 
-static unsigned _starpu_worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
+static unsigned _starpu_worker_belongs_to_ctx(int workerid, struct starpu_sched_ctx *sched_ctx) /* returns 1 if workerid is listed in sched_ctx->workerid, 0 otherwise */
 {
-	unsigned i;
-	for(i = 0; i < workerarg->nctxs; i++)
-		if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
-		   && workerarg->status != STATUS_JOINED)
+	int i;
+	for(i = 0; i < sched_ctx->nworkers_in_ctx; i++) /* NOTE(review): sched_ctx is dereferenced here without the NULL check the removed version had */
+	  if(sched_ctx->workerid[i] == workerid)
 		  return 1;
 	return 0;
 }
@@ -208,7 +225,7 @@ static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workera
 {
 	unsigned i;
 	unsigned to_remove = 0;
-	for(i = 0; i < workerarg->nctxs; i++)
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	  {
 		if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
 		    && workerarg->status != STATUS_JOINED)
@@ -245,16 +262,71 @@ static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 	config->topology.nsched_ctxs--;
 	sched_ctx->sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
 }
-void starpu_delete_sched_ctx(unsigned sched_ctx_id)
+
+/*
+ * Attach workers to the scheduling context sched_ctx.
+ *
+ * new_workers   list of worker ids to attach, or NULL to attach every
+ *               worker of the machine that is not a member yet
+ * nnew_workers  number of entries of new_workers (ignored when it is NULL)
+ * sched_ctx     context receiving the workers
+ *
+ * Workers that already belong to sched_ctx are skipped.  Each worker really
+ * added is appended to sched_ctx->workerid, gets sched_ctx stored in the
+ * first free slot of its own context table, and the scheduling policy is
+ * finally told how many workers were added.
+ */
+static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
+                                     struct starpu_sched_ctx *sched_ctx)
+{
+        struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+        int ntotal_workers = config->topology.nworkers;
+
+        /*if null add the rest of the workers which don't already belong to this ctx*/
+        int ncandidates = (new_workers == NULL) ? ntotal_workers : nnew_workers;
+        int n_added_workers = 0;
+        int i;
+
+        for(i = 0; i < ncandidates; i++)
+          {
+                int workerid = (new_workers == NULL) ? i : new_workers[i];
+
+                /*take care the user does not ask for a resource that does not exist;
+                  valid ids are 0 .. ntotal_workers-1, so the bound is strict*/
+                STARPU_ASSERT(workerid >= 0 && workerid < ntotal_workers);
+
+                if(_starpu_worker_belongs_to_ctx(workerid, sched_ctx))
+                        continue;
+
+                struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+
+                /* add worker to context: append right after the last used
+                   entry and bump the count at once, so the list has no hole
+                   and a repeated id is seen as a member on the next pass */
+                sched_ctx->workerid[sched_ctx->nworkers_in_ctx++] = workerid;
+                /* add context to worker */
+                workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
+                n_added_workers++;
+          }
+
+        /* report the number of workers actually added, in both modes */
+        sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, n_added_workers);
+
+        return;
+}
+
+void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id)
 {
 	if(!starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	  {
 
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+		struct starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx(inheritor_sched_ctx_id);
 
 		/* block the workers until the contex is switched */
 		set_changing_ctx_flag(STATUS_CHANGING_CTX, sched_ctx->nworkers_in_ctx, sched_ctx->workerid);
 		_starpu_manage_delete_sched_ctx(sched_ctx);
+		_starpu_add_workers_to_sched_ctx(sched_ctx->workerid, sched_ctx->nworkers_in_ctx, inheritor_sched_ctx);
 		/* also wait the workers to wake up before using the context */
 		set_changing_ctx_flag(STATUS_UNKNOWN, sched_ctx->nworkers_in_ctx, sched_ctx->workerid);
 	  }		
@@ -341,52 +413,6 @@ void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
 	return;
 }
 
-static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
-                                     struct starpu_sched_ctx *sched_ctx)
-{
-        struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-        int ntotal_workers = config->topology.nworkers;
-
-        STARPU_ASSERT((nnew_workers + sched_ctx->nworkers_in_ctx) <= ntotal_workers);
-
-        int nworkerids_already_in_ctx =  sched_ctx->nworkers_in_ctx;
-        int j;
-
-        /*if null add the rest of the workers which don't already belong to this ctx*/
-        if(new_workers == NULL)
-          {
-                for(j = 0; j < ntotal_workers; j++)
-                  {
-                        struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-                        if(!_starpu_worker_belongs_to_ctx(workerarg, sched_ctx))
-                          {
-                                sched_ctx->workerid[++nworkerids_already_in_ctx] = j;
-                                workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
-                          }
-                  }
-                sched_ctx->nworkers_in_ctx = ntotal_workers;
-          }
-        else
-          {
-                int i;
-                for(i = 0; i < nnew_workers; i++)
-                  {
-                        /*take care the user does not ask for a resource that does not exist*/
-                        STARPU_ASSERT( new_workers[i] >= 0 &&  new_workers[i] <= ntotal_workers);
-
-                        /* add worker to context */
-                        sched_ctx->workerid[ nworkerids_already_in_ctx + i] = new_workers[i];
-                        /* add context to worker */
-                        struct starpu_worker_s *workerarg = _starpu_get_worker_struct(new_workers[i]);
-                        workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
-                  }
-                sched_ctx->nworkers_in_ctx += nnew_workers;
-          }
-
-        sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, nnew_workers);
-
-        return;
-}
 
 void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,
 				     unsigned sched_ctx_id)

+ 2 - 0
src/core/sched_ctx.h

@@ -58,6 +58,8 @@ struct starpu_sched_ctx {
 	pthread_mutex_t **sched_mutex;
 };
 
+void _starpu_init_sched_ctx_for_worker(unsigned workerid);
+
 unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name);
 
 void _starpu_delete_all_sched_ctxs();

+ 10 - 7
src/core/sched_policy.c

@@ -246,8 +246,8 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		starpu_prefetch_task_input_on_node(task, memory_node);
 
 	unsigned i;
-	for(i = 0; i < worker->nctxs; i++){
-		if (worker->sched_ctx[i]->sched_policy->push_task_notify){
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++){
+		if (worker->sched_ctx[i] != NULL && worker->sched_ctx[i]->sched_policy->push_task_notify){
 		  worker->sched_ctx[i]->sched_policy->push_task_notify(task, workerid, worker->sched_ctx[i]->sched_ctx_id);
 		}
 	}
@@ -343,18 +343,21 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 		pthread_mutex_t *sched_mutex_ctx;
 
 		unsigned i;
-		for(i = 0; i < worker->nctxs; i++)
+		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 		  {
 			sched_ctx = worker->sched_ctx[i];
-			sched_mutex_ctx = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
-			PTHREAD_MUTEX_LOCK(sched_mutex_ctx);
-			if (sched_ctx->sched_policy->pop_task)
+			if(sched_ctx != NULL)
 			  {
+			    sched_mutex_ctx = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
+			    PTHREAD_MUTEX_LOCK(sched_mutex_ctx);
+			    if (sched_ctx->sched_policy->pop_task)
+			      {
 				task = sched_ctx->sched_policy->pop_task(sched_ctx->sched_ctx_id);
 				PTHREAD_MUTEX_UNLOCK(sched_mutex_ctx);
 				break;
+			      }
+			    PTHREAD_MUTEX_UNLOCK(sched_mutex_ctx);
 			  }
-			PTHREAD_MUTEX_UNLOCK(sched_mutex_ctx);
 		  }
 	  }
 

+ 4 - 4
src/core/topology.c

@@ -328,7 +328,7 @@ static int _starpu_init_machine_config(struct starpu_machine_config_s *config,
 		config->workers[topology->nworkers + cudagpu].devid = devid;
 		config->workers[topology->nworkers + cudagpu].perf_arch = arch; 
 		config->workers[topology->nworkers + cudagpu].worker_mask = STARPU_CUDA;
-		config->workers[topology->nworkers + cudagpu].sched_ctx = (struct starpu_sched_ctx**)malloc(sizeof(struct starpu_sched_ctx*));
+		_starpu_init_sched_ctx_for_worker(config->workers[topology->nworkers + cudagpu].workerid);
 		config->worker_mask |= STARPU_CUDA;
 
                 uint32_t key = _starpu_crc32_be(devid, 0);
@@ -395,7 +395,7 @@ static int _starpu_init_machine_config(struct starpu_machine_config_s *config,
 		config->workers[topology->nworkers + openclgpu].devid = devid;
 		config->workers[topology->nworkers + openclgpu].perf_arch = arch; 
 		config->workers[topology->nworkers + openclgpu].worker_mask = STARPU_OPENCL;
-		config->workers[topology->nworkers + openclgpu].sched_ctx = (struct starpu_sched_ctx**)malloc(sizeof(struct starpu_sched_ctx*));
+		_starpu_init_sched_ctx_for_worker(config->workers[topology->nworkers + openclgpu].workerid);
 		config->worker_mask |= STARPU_OPENCL;
 	}
 
@@ -430,7 +430,7 @@ static int _starpu_init_machine_config(struct starpu_machine_config_s *config,
 		config->workers[topology->nworkers + spu].id = spu;
 		config->workers[topology->nworkers + spu].worker_is_running = 0;
 		config->workers[topology->nworkers + spu].worker_mask = STARPU_GORDON;
-		config->workers[topology->nworkers + spu].sched_ctx = (struct starpu_sched_ctx**)malloc(sizeof(struct starpu_sched_ctx*));
+		_starpu_init_sched_ctx_for_worker(config->workers[topology->nworkers + spu].workerid);
 		config->worker_mask |= STARPU_GORDON;
 	}
 
@@ -465,7 +465,7 @@ static int _starpu_init_machine_config(struct starpu_machine_config_s *config,
 		config->workers[topology->nworkers + cpu].devid = cpu;
 		config->workers[topology->nworkers + cpu].worker_mask = STARPU_CPU;
 		config->worker_mask |= STARPU_CPU;
-		config->workers[topology->nworkers + cpu].sched_ctx = (struct starpu_sched_ctx**)malloc(sizeof(struct starpu_sched_ctx*));
+		_starpu_init_sched_ctx_for_worker(config->workers[topology->nworkers + cpu].workerid);
 	}
 
 	topology->nworkers += topology->ncpus;

+ 1 - 0
src/core/workers.h

@@ -76,6 +76,7 @@ struct starpu_worker_s {
 	unsigned worker_is_running;
 	unsigned worker_is_initialized;
 	starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
+	starpu_worker_status blocking_status; /* blocked or not */
 	char name[32];
 
 	struct starpu_sched_ctx **sched_ctx;

+ 1 - 2
src/drivers/cpu/driver_cpu.c

@@ -161,7 +161,7 @@ void *_starpu_cpu_worker(void *arg)
 		/*when contex is changing block the threads belonging to it*/
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 
-		if(cpu_arg->status == STATUS_CHANGING_CTX){
+		if(cpu_arg->blocking_status == STATUS_CHANGING_CTX){
 			_starpu_increment_nblocked_ths(cpu_arg->nworkers_of_next_ctx);
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
 			_starpu_decrement_nblocked_ths();
@@ -172,7 +172,6 @@ void *_starpu_cpu_worker(void *arg)
 		/* take the mutex inside pop because it depends what mutex:
 		   the one of the local task or the one of one of the strategies */
 		task = _starpu_pop_task(cpu_arg);
-
                 if (!task) 
 		{
 			PTHREAD_MUTEX_LOCK(sched_mutex);

+ 1 - 1
src/drivers/cuda/driver_cuda.c

@@ -288,7 +288,7 @@ void *_starpu_cuda_worker(void *arg)
 		/*when contex is changing block the threads belonging to it*/
                 PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 
-                if(args->status == STATUS_CHANGING_CTX){
+                if(args->blocking_status == STATUS_CHANGING_CTX){
 			_starpu_increment_nblocked_ths(args->nworkers_of_next_ctx);
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
 			_starpu_decrement_nblocked_ths();