Преглед на файлове

List of scheduler contexts for each worker

Andra Hugo преди 14 години
родител
ревизия
6ae54ed3a9
променени са 7 файла, в които са добавени 55 реда и са изтрити 31 реда
  1. 0 1
      include/starpu_scheduler.h
  2. 3 3
      simple_ex/exemple.c
  3. 42 21
      src/core/sched_policy.c
  4. 1 1
      src/core/sched_policy.h
  5. 3 3
      src/core/workers.c
  6. 5 1
      src/core/workers.h
  7. 1 1
      src/drivers/cpu/driver_cpu.c

+ 0 - 1
include/starpu_scheduler.h

@@ -104,7 +104,6 @@ struct starpu_sched_ctx {
 	struct starpu_sched_policy_s *sched_policy;
 	int workerid[STARPU_NMAXWORKERS];
 	int nworkers_in_ctx;
-	int nworkers_of_next_ctx;
 };
 
 

+ 3 - 3
simple_ex/exemple.c

@@ -76,14 +76,14 @@ int main(int argc, char **argv)
   starpu_task_submit_to_ctx(task, &sched_ctx);
 
   struct starpu_sched_ctx sched_ctx2;
-  int procs2[]={4, 5, 6, 7};
+  int procs2[]={3, 4, 5, 6, 7};
   starpu_create_sched_ctx(&sched_ctx2, "random", procs2, 4);
 
   struct starpu_task *task3 = starpu_task_create();
   task3->cl = &cl;
   task3->buffers[0].handle = starpu_data_get_sub_data(dataA, 0);
   task3->buffers[0].mode = STARPU_R;
-  task3->name = "third 4 5 6 7";
+  task3->name = "third 3 4 5 6 7";
   starpu_task_submit_to_ctx(task3, &sched_ctx2);
 
 
@@ -91,7 +91,7 @@ int main(int argc, char **argv)
   task2->cl = &cl;
   task2->buffers[0].handle = starpu_data_get_sub_data(dataA, 0);
   task2->buffers[0].mode = STARPU_R;
-  task2->name = "second > 7";
+  task2->name = "anything";
   starpu_task_submit(task2);
 
   

+ 42 - 21
src/core/sched_policy.c

@@ -145,7 +145,7 @@ static void display_sched_help_message(void)
 	 }
 }
 
-static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config, int init_ctx, const char *policy_name)
+static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config, const char *policy_name)
 {
 	struct starpu_sched_policy_s *selected_policy = NULL;
 	struct starpu_conf *user_conf = config->user_conf;
@@ -178,7 +178,7 @@ static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_c
 	return &_starpu_sched_eager_policy;
 }
 
-void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx, int init_ctx, const char *policy_name)
+void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx, const char *policy_name)
 {
 	/* Perhaps we have to display some help */
 	display_sched_help_message();
@@ -203,7 +203,7 @@ void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct st
 	_starpu_set_calibrate_flag(do_calibrate);
 
 	struct starpu_sched_policy_s *selected_policy;
-	selected_policy = select_sched_policy(config, init_ctx, policy_name);
+	selected_policy = select_sched_policy(config, policy_name);
 
 	load_sched_policy(selected_policy, sched_ctx);
 
@@ -245,8 +245,11 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	if (use_prefetch)
 		starpu_prefetch_task_input_on_node(task, memory_node);
 
-	if (worker->sched_ctx->sched_policy->push_task_notify)
-		worker->sched_ctx->sched_policy->push_task_notify(task, workerid);
+	unsigned i;
+	for(i = 0; i < worker->nctxs; i++){
+		if (worker->sched_ctx[i]->sched_policy->push_task_notify)
+			worker->sched_ctx[i]->sched_policy->push_task_notify(task, workerid);
+	}
 
 	if (is_basic_worker)
 	{
@@ -319,7 +322,6 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 {
 	struct starpu_task *task;
-	struct starpu_sched_ctx *sched_ctx = worker->sched_ctx;
 
 	/* We can't tell in advance which task will be picked up, so we measure
 	 * a timestamp, and will attribute it afterwards to the task. */
@@ -331,12 +333,22 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 	/* perhaps there is some local task to be executed first */
 	task = _starpu_pop_local_task(worker);
 
-	if (!task && sched_ctx->sched_policy->pop_task)
-		task = sched_ctx->sched_policy->pop_task();
+	if(!task){
+		struct starpu_sched_ctx *sched_ctx;
+		unsigned i;
+		for(i = 0; i < worker->nctxs; i++){
+			sched_ctx = worker->sched_ctx[i];
+			if (sched_ctx->sched_policy->pop_task){
+				task = sched_ctx->sched_policy->pop_task();
+				break;
+			}
+		}
+	}
 
 	if(task){
 	  printf("task %s poped by th %d with strateg %s\n", task->name, worker->workerid, task->sched_ctx->sched_policy->policy_name);
 	}
+
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	 * for some reason. */
 	if (profiling && task)
@@ -402,8 +414,15 @@ int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
 	return _starpu_push_local_task(worker, task, back);
 }
 
+static int starpu_get_ctx_id(struct starpu_sched_ctx *sched_ctx, struct starpu_worker_s *worker){
+	unsigned i;
+	for(i = 0; i < worker->nctxs; i++)
+		if(worker->sched_ctx[i] == sched_ctx)
+			return i;
+	return -1;
+}
 
-void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, int init_ctx)
+void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx)
 {
 	sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
 	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
@@ -418,8 +437,7 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 		for(j = 0; j < nworkers; j++){
 			sched_ctx->workerid[j] = j;
 			struct starpu_worker_s *workerarg = &config->workers[j];
-			_starpu_delete_sched_ctx(workerarg->sched_ctx);
-			workerarg->sched_ctx = sched_ctx;
+			workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
 		}
 		sched_ctx->nworkers_in_ctx = nworkers;
 	} else {
@@ -429,15 +447,13 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 			for(j = 0; j < nworkers; j++){
 				if(sched_ctx->workerid[i] == j){
 					struct starpu_worker_s *workerarg = &config->workers[j];
-					_starpu_delete_sched_ctx(workerarg->sched_ctx);
-					workerarg->sched_ctx = sched_ctx;
+					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
 				}
 			}
 		}
 	}
 
-	_starpu_init_sched_policy(config, sched_ctx, init_ctx, policy_name);
-
+	_starpu_init_sched_policy(config, sched_ctx, policy_name);
 
 	return;
 }
@@ -492,11 +508,13 @@ int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_c
 		changing_ctx_mutex = &worker->changing_ctx_mutex;
 		changing_ctx_cond = &worker->changing_ctx_cond;
 
+		/*if the status is CHANGING_CTX let the thread know that it must block*/
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 		worker->status = changing_ctx;
-		worker->sched_ctx->nworkers_of_next_ctx = nworkers;
+		worker->nworkers_of_next_ctx = nworkers;
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 		
+		/*if we have finished changing the ctx wake up the blocked threads*/
 		if(changing_ctx == STATUS_UNKNOWN){
 			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 			PTHREAD_COND_SIGNAL(changing_ctx_cond);
@@ -505,9 +523,10 @@ int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_c
 		}
 	}
 
-	if(changing_ctx == STATUS_CHANGING_CTX){
+	/*after letting know all the concered threads about the change
+	  wait for them to take into account the info*/
+	if(changing_ctx == STATUS_CHANGING_CTX)
 		_starpu_wait_for_all_threads_to_block(nworkers);
-	}
 
 	return 0;
 }
@@ -516,7 +535,7 @@ void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *pol
     	  if(!starpu_task_wait_for_all()){
 		/*block the workers until the contex is switched*/
 		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
-		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
+		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx);
 		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
 	  }
 	  return;
@@ -529,14 +548,16 @@ void _starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
        
 	unsigned used_sched_ctx = 0;
 	int i;
+	int workerid = -1;
 	for(i = 0; i < nworkers; i++){
 		struct starpu_worker_s *workerarg = &config->workers[i];
-		if(sched_ctx != NULL && workerarg->sched_ctx == sched_ctx && workerarg->status != STATUS_JOINED)
+		workerid = starpu_get_ctx_id(sched_ctx, workerarg);
+		
+		if(sched_ctx != NULL && workerid != -1 && workerarg->status != STATUS_JOINED)
 			used_sched_ctx++;
 	}
 
 	if(used_sched_ctx < 2  && sched_ctx != NULL){
-	  printf("free \n");
 		free(sched_ctx->sched_policy);
 		sched_ctx->sched_policy = NULL;
 		sched_ctx = NULL;

+ 1 - 1
src/core/sched_policy.h

@@ -37,7 +37,7 @@ void _starpu_sched_post_exec_hook(struct starpu_task *task);
 
 void _starpu_wait_on_sched_event(void);
 
-void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerid, int nworkerids, int init_ctx);
+void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerid, int nworkerids);
 
 void _starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 

+ 3 - 3
src/core/workers.c

@@ -154,7 +154,7 @@ static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 
 		workerarg->worker_size = 1;
 		workerarg->combined_workerid = workerarg->workerid;
-		workerarg->current_rank = 0;
+		workerarg->current_rank = 1;
 
 		/* if some codelet's termination cannot be handled directly :
 		 * for instance in the Gordon driver, Gordon tasks' callbacks
@@ -351,9 +351,9 @@ int starpu_init(struct starpu_conf *user_conf)
 	/* initialize the scheduling policy */
 
 	if(user_conf == NULL)
-	  _starpu_create_sched_ctx(&sched_ctx, NULL, NULL, -1, 1);
+	  _starpu_create_sched_ctx(&sched_ctx, NULL, NULL, -1);
 	else
-	  _starpu_create_sched_ctx(&sched_ctx, user_conf->sched_policy_name, NULL, -1, 1);
+	  _starpu_create_sched_ctx(&sched_ctx, user_conf->sched_policy_name, NULL, -1);
 
 	//_starpu_init_sched_policy(&config, &sched_ctx);
 

+ 5 - 1
src/core/workers.h

@@ -77,10 +77,14 @@ struct starpu_worker_s {
 	unsigned worker_is_initialized;
 	starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
 	char name[32];
-	struct starpu_sched_ctx *sched_ctx;
+
+	struct starpu_sched_ctx *sched_ctx[STARPU_NMAXSCHEDCTXS];
+	unsigned nctxs; /* the no of contexts a worker belongs to*/
 	unsigned changing_ctx;
 	pthread_mutex_t changing_ctx_mutex;
 	pthread_cond_t changing_ctx_cond;
+	int nworkers_of_next_ctx;
+
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;
 	cpu_set_t current_cpu_set;

+ 1 - 1
src/drivers/cpu/driver_cpu.c

@@ -159,7 +159,7 @@ void *_starpu_cpu_worker(void *arg)
 		/*when contex is changing block the threads belonging to it*/
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 		if(cpu_arg->status == STATUS_CHANGING_CTX){
-			_starpu_increment_nblocked_ths(cpu_arg->sched_ctx->nworkers_of_next_ctx);
+			_starpu_increment_nblocked_ths(cpu_arg->nworkers_of_next_ctx);
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
 		}
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);