Bladeren bron

Add workers and remove workers from context dynamicaly

Andra Hugo 14 jaren geleden
bovenliggende
commit
effd9bfbe0
6 gewijzigde bestanden met toevoegingen van 427 en 189 verwijderingen
  1. 5 1
      include/starpu_scheduler.h
  2. 1 1
      simple_ex/Makefile
  3. 32 2
      simple_ex/exemple.c
  4. 383 181
      src/core/sched_ctx.c
  5. 4 4
      src/core/sched_policy.c
  6. 2 0
      src/drivers/cuda/driver_cuda.c

+ 5 - 1
include/starpu_scheduler.h

@@ -104,13 +104,17 @@ struct starpu_sched_ctx {
 	struct starpu_sched_policy_s *sched_policy; /*policy of the contex */
 	struct starpu_sched_policy_s *sched_policy; /*policy of the contex */
 	int workerid[STARPU_NMAXWORKERS]; /*list of indices of workers */
 	int workerid[STARPU_NMAXWORKERS]; /*list of indices of workers */
 	int nworkers_in_ctx; /*number of threads in contex */
 	int nworkers_in_ctx; /*number of threads in contex */
-	unsigned is_init_sched; /*we keep an init sched which we never delete */
+	unsigned is_initial_sched; /*we keep an initial sched which we never delete */
 };
 };
 
 
 void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx);
 void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx);
 
 
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 
 
+void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, struct starpu_sched_ctx *sched_ctx);
+
+void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, struct starpu_sched_ctx *sched_ctx);
+
 /* When there is no available task for a worker, StarPU blocks this worker on a
 /* When there is no available task for a worker, StarPU blocks this worker on a
 condition variable. This function specifies which condition variable (and the
 condition variable. This function specifies which condition variable (and the
 associated mutex) should be used to block (and to wake up) a worker. Note that
 associated mutex) should be used to block (and to wake up) a worker. Note that

+ 1 - 1
simple_ex/Makefile

@@ -9,7 +9,7 @@ all: $(PROG)
 
 
 CC      := gcc
 CC      := gcc
 NVCC    := /usr/local/cuda/bin/nvcc
 NVCC    := /usr/local/cuda/bin/nvcc
-CFLAGS  := $$(pkg-config --cflags libstarpu) -O0 -g #-Wall
+CFLAGS  := $$(pkg-config --cflags libstarpu) -g #-Wall
 LDFLAGS := $$(pkg-config --libs libstarpu)
 LDFLAGS := $$(pkg-config --libs libstarpu)
 CUDADIR=$(CUDA_HOME)
 CUDADIR=$(CUDA_HOME)
 
 

+ 32 - 2
simple_ex/exemple.c

@@ -70,10 +70,8 @@ int main(int argc, char **argv)
   int procs[] = {1, 2, 3};
   int procs[] = {1, 2, 3};
   starpu_create_sched_ctx(&sched_ctx, "random", procs, 3);
   starpu_create_sched_ctx(&sched_ctx, "random", procs, 3);
 
 
-  unsigned block_id[children];  
   unsigned j;
   unsigned j;
   for(j = 0; j < children; j++){
   for(j = 0; j < children; j++){
-    block_id[j] = j;
     struct starpu_task *task = starpu_task_create();
     struct starpu_task *task = starpu_task_create();
     task->cl = &cl;
     task->cl = &cl;
     task->synchronous = 1;
     task->synchronous = 1;
@@ -84,6 +82,38 @@ int main(int argc, char **argv)
     starpu_task_submit_to_ctx(task, &sched_ctx);
     starpu_task_submit_to_ctx(task, &sched_ctx);
   }
   }
 
 
+  int procs_to_remove[]={1,3};
+  starpu_remove_workers_from_sched_ctx(procs_to_remove, 2, &sched_ctx);
+
+  printf("procs removed \n");
+
+  for(j = 0; j < children; j++){
+    struct starpu_task *task = starpu_task_create();
+    task->cl = &cl;
+    task->synchronous = 1;
+    task->callback_func = NULL;
+    task->buffers[0].handle = starpu_data_get_sub_data(dataA, 1, j);
+    task->buffers[0].mode = STARPU_RW;
+    task->name = "first 2";  
+    starpu_task_submit_to_ctx(task, &sched_ctx);
+  }
+
+  int procs_to_add[]={1, 4, 5};
+  starpu_add_workers_to_sched_ctx(procs_to_add, 2, &sched_ctx);
+
+  printf("procs add \n");
+
+  for(j = 0; j < children; j++){
+    struct starpu_task *task = starpu_task_create();
+    task->cl = &cl;
+    task->synchronous = 1;
+    task->callback_func = NULL;
+    task->buffers[0].handle = starpu_data_get_sub_data(dataA, 1, j);
+    task->buffers[0].mode = STARPU_RW;
+    task->name = "first 1 2 4 5";  
+    starpu_task_submit_to_ctx(task, &sched_ctx);
+  }
+
 
 
   struct starpu_sched_ctx sched_ctx2;
   struct starpu_sched_ctx sched_ctx2;
   int procs2[]={3, 4, 5, 6, 7};
   int procs2[]={3, 4, 5, 6, 7};

+ 383 - 181
src/core/sched_ctx.c

@@ -13,128 +13,141 @@ static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
 static int nblocked_ths = 0;
 static int nblocked_ths = 0;
 
 
 void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
 void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
-			      *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_init_sched)
+			      *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_initial_sched)
 {
 {
-  sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
-  sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
-  sched_ctx->is_init_sched = is_init_sched;
-
-  struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
-  int nworkers = config->topology.nworkers;
-
-  int j;
-  /*all the workers are in this contex*/
-  if(workerids_in_ctx == NULL){
-    for(j = 0; j < nworkers; j++){
-      sched_ctx->workerid[j] = j;
-      struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-      workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
-    }
-    sched_ctx->nworkers_in_ctx = nworkers;
-  } else {
-    int i;
-    for(i = 0; i < nworkerids_in_ctx; i++){
-      sched_ctx->workerid[i] = workerids_in_ctx[i];
-      for(j = 0; j < nworkers; j++){
-	if(sched_ctx->workerid[i] == j){
-	        struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-		workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
-	}
-      }
-    }
-  }
-
-  _starpu_init_sched_policy(config, sched_ctx, policy_name);
-
-  return;
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	int nworkers = config->topology.nworkers;
+	
+	STARPU_ASSERT(nworkerids_in_ctx <= nworkers);
+  
+	sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
+	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
+	sched_ctx->is_initial_sched = is_initial_sched;
+
+
+	int j;
+	/*all the workers are in this contex*/
+	if(workerids_in_ctx == NULL)
+	  {
+		for(j = 0; j < nworkers; j++)
+		  {
+			sched_ctx->workerid[j] = j;
+			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
+			workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+		}
+		sched_ctx->nworkers_in_ctx = nworkers;
+	  } 
+	else 
+	  {
+		int i;
+		for(i = 0; i < nworkerids_in_ctx; i++)
+		  {
+			/*take care the user does not ask for a resource that does not exist*/
+			STARPU_ASSERT( workerids_in_ctx[i] >= 0 &&  workerids_in_ctx[i] <= nworkers);
+		    
+			sched_ctx->workerid[i] = workerids_in_ctx[i];
+			for(j = 0; j < nworkers; j++)
+			  {
+				if(sched_ctx->workerid[i] == j)
+				  {
+					struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
+					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+				  }
+			  }
+		  }
+	  }
+
+	_starpu_init_sched_policy(config, sched_ctx, policy_name);
+	
+	return;
 }
 }
 
 
 void _starpu_decrement_nblocked_ths(void)
 void _starpu_decrement_nblocked_ths(void)
 {
 {
-  PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 
 
-  if(--nblocked_ths == 0)
-    PTHREAD_COND_BROADCAST(&wakeup_ths_cond);
+	if(--nblocked_ths == 0)
+		PTHREAD_COND_BROADCAST(&wakeup_ths_cond);
 
 
-  PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
 }
 }
 
 
 void _starpu_increment_nblocked_ths(int nworkers)
 void _starpu_increment_nblocked_ths(int nworkers)
 {
 {
-  PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
-  if (++nblocked_ths == nworkers)
-    PTHREAD_COND_BROADCAST(&blocking_ths_cond);
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+	if (++nblocked_ths == nworkers)
+		PTHREAD_COND_BROADCAST(&blocking_ths_cond);
 
 
-  PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
 }
 }
 
 
 static int _starpu_wait_for_all_threads_to_block(int nworkers)
 static int _starpu_wait_for_all_threads_to_block(int nworkers)
 {
 {
-  PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 
 
-  while (nblocked_ths < nworkers)
-    PTHREAD_COND_WAIT(&blocking_ths_cond, &blocking_ths_mutex);
+	while (nblocked_ths < nworkers)
+		PTHREAD_COND_WAIT(&blocking_ths_cond, &blocking_ths_mutex);
 
 
-  PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
-
-  return 0;
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+	
+	return 0;
 }
 }
 
 
 static int _starpu_wait_for_all_threads_to_wake_up(void)
 static int _starpu_wait_for_all_threads_to_wake_up(void)
 {
 {
-  PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
-
-  while (nblocked_ths > 0)
-    PTHREAD_COND_WAIT(&wakeup_ths_cond, &blocking_ths_mutex);
-
-  PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
-
-  return 0;
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+	
+	while (nblocked_ths > 0)
+		PTHREAD_COND_WAIT(&wakeup_ths_cond, &blocking_ths_mutex);
+
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+	
+	return 0;
 }
 }
 
 
 static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int
 static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int
 				 *workerids_in_ctx)
 				 *workerids_in_ctx)
 {
 {
-  struct starpu_machine_config_s *config = _starpu_get_machine_config();
-
-  int i;
-  int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;
-
-  struct starpu_worker_s *worker = NULL;
-  pthread_mutex_t *changing_ctx_mutex = NULL;
-  pthread_cond_t *changing_ctx_cond = NULL;
-
-  int workerid = -1;
-
-  for(i = 0; i < nworkers; i++)
-    {
-      workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
-      worker = _starpu_get_worker_struct(workerid);
-
-      changing_ctx_mutex = &worker->changing_ctx_mutex;
-      changing_ctx_cond = &worker->changing_ctx_cond;
-
-      /*if the status is CHANGING_CTX let the thread know that it must block*/
-      PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-      worker->status = changing_ctx;
-      worker->nworkers_of_next_ctx = nworkers;
-      PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-
-      /*if we have finished changing the ctx wake up the blocked threads*/
-      if(changing_ctx == STATUS_UNKNOWN)
-	{
-	  PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	  PTHREAD_COND_SIGNAL(changing_ctx_cond);
-	  PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-	}
-    }
-
-  /*after letting know all the concerned threads about the change                        
-    wait for them to take into account the info*/
-  if(changing_ctx == STATUS_CHANGING_CTX)
-    _starpu_wait_for_all_threads_to_block(nworkers);
-  else
-    _starpu_wait_for_all_threads_to_wake_up();
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+
+	int i;
+	int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;
+  
+	struct starpu_worker_s *worker = NULL;
+	pthread_mutex_t *changing_ctx_mutex = NULL;
+	pthread_cond_t *changing_ctx_cond = NULL;
+	
+	int workerid = -1;
+
+	for(i = 0; i < nworkers; i++)
+	  {
+		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
+		worker = _starpu_get_worker_struct(workerid);
+
+		changing_ctx_mutex = &worker->changing_ctx_mutex;
+		changing_ctx_cond = &worker->changing_ctx_cond;
+		
+		/*if the status is CHANGING_CTX let the thread know that it must block*/
+		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+		worker->status = changing_ctx;
+		worker->nworkers_of_next_ctx = nworkers;
+		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+
+		/*if we have finished changing the ctx wake up the blocked threads*/
+		if(changing_ctx == STATUS_UNKNOWN)
+		  {
+			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+			PTHREAD_COND_SIGNAL(changing_ctx_cond);
+			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		  }
+	  }
+	
+	/*after letting know all the concerned threads about the change                        
+	  wait for them to take into account the info*/
+	if(changing_ctx == STATUS_CHANGING_CTX)
+		_starpu_wait_for_all_threads_to_block(nworkers);
+	else
+		_starpu_wait_for_all_threads_to_wake_up();
 
 
   return 0;
   return 0;
 }
 }
@@ -142,127 +155,316 @@ static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkeri
 void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
 void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
 			     *workerids_in_ctx, int nworkerids_in_ctx)
 			     *workerids_in_ctx, int nworkerids_in_ctx)
 {
 {
-  /* wait for the workers concerned by the change of contex                              
-   * to finish their work in the previous context */
-  if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
-    {
-      /* block the workers until the contex is switched */
-      set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
-      _starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
-      /* also wait the workers to wake up before using the context */
-      set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
-    }
-  return;
+	  /* wait for the workers concerned by the change of contex                              
+	   * to finish their work in the previous context */
+	if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
+	  {
+		/* block the workers until the contex is switched */
+		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
+		/* also wait the workers to wake up before using the context */
+		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
+	  }
+	return;
 }
 }
 
 
-int worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
+static unsigned _starpu_worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
 {
 {
-  unsigned i;
-  for(i = 0; i < workerarg->nctxs; i++)
-    if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
-       && workerarg->status != STATUS_JOINED)
-      return 1;
-  return 0;
+	unsigned i;
+	for(i = 0; i < workerarg->nctxs; i++)
+		if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
+		   && workerarg->status != STATUS_JOINED)
+		  return 1;
+	return 0;
+}
 
 
+static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
+{
+	int i;
+	unsigned to_remove = 0;
+	for(i = 0; i < workerarg->nctxs; i++)
+	  {
+		if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
+		    && workerarg->status != STATUS_JOINED)
+		  {
+			workerarg->sched_ctx[i] = NULL;
+			to_remove = 1;
+		  }
+	  }
+	
+	/* if the the worker had belonged to the context it would have been found in the worker's list of sched_ctxs, so it can be removed */
+	if(to_remove)
+		workerarg->nctxs--;
+	return;
 }
 }
+
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 {
 {
-  struct starpu_machine_config_s *config = _starpu_get_machine_config();
-  int nworkers = config->topology.nworkers;
-
-  int i;
-  for(i = 0; i < nworkers; i++)
-    {
-      struct starpu_worker_s *workerarg = _starpu_get_worker_struct(i);
-      if(worker_belongs_to_ctx(workerarg, sched_ctx))
-	workerarg->nctxs--;
-    }
-
-  free(sched_ctx->sched_policy);
-  sched_ctx->sched_policy = NULL;
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	int nworkers = config->topology.nworkers;
+
+	int i;
+	for(i = 0; i < nworkers; i++)
+	  {
+		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(i);
+		_starpu_remove_sched_ctx_from_worker(workerarg, sched_ctx);
+	  }
+	
+	free(sched_ctx->sched_policy);
+	sched_ctx->sched_policy = NULL;
+
+	return;
 }
 }
 
 
 void _starpu_delete_all_sched_ctxs()
 void _starpu_delete_all_sched_ctxs()
 {
 {
-  struct starpu_machine_config_s *config = _starpu_get_machine_config();
-  unsigned nworkers = config->topology.nworkers;
-
-  unsigned i, j;
-  struct starpu_sched_ctx *sched_ctx = NULL;
-  struct starpu_worker_s *workerarg = NULL;
-  for(i = 0; i < nworkers; i++)
-    {
-      workerarg = _starpu_get_worker_struct(i);
-      for(j = 0; j < workerarg->nctxs; j++)
-	{
-	  sched_ctx = workerarg->sched_ctx[j];
-	  if(sched_ctx != NULL && !sched_ctx->is_init_sched)
-	    {
-	      free(sched_ctx->sched_policy);
-	      sched_ctx->sched_policy = NULL;
-	      workerarg->nctxs--;
-
-	    }
-	}
-    }
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	unsigned nworkers = config->topology.nworkers;
+	
+	unsigned i, j;
+	struct starpu_sched_ctx *sched_ctx = NULL;
+	struct starpu_worker_s *workerarg = NULL;
+	for(i = 0; i < nworkers; i++)
+	  {
+		workerarg = _starpu_get_worker_struct(i);
+		for(j = 0; j < workerarg->nctxs; j++)
+		  {
+			sched_ctx = workerarg->sched_ctx[j];
+			if(sched_ctx != NULL && !sched_ctx->is_initial_sched)
+			  {
+				free(sched_ctx->sched_policy);
+				sched_ctx->sched_policy = NULL;
+				workerarg->nctxs--;
+				
+			  }
+		  }
+	  }
+	return;
 }
 }
 
 
 int starpu_wait_for_all_tasks_of_worker(int workerid)
 int starpu_wait_for_all_tasks_of_worker(int workerid)
 {
 {
-  if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
-    return -EDEADLK;
+	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+		return -EDEADLK;
 
 
-  struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	
+	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
 
 
-  PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+	while (worker->nsubmitted > 0)
+		PTHREAD_COND_WAIT(&worker->submitted_cond, &worker->submitted_mutex);
 
 
-  while (worker->nsubmitted > 0)
-    PTHREAD_COND_WAIT(&worker->submitted_cond, &worker->submitted_mutex);
-
-  PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
-
-  return 0;
+	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
+	
+	return 0;
 }
 }
 
 
 int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_in_ctx){
 int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_in_ctx){
-  int ret_val = 0;
-
-  struct starpu_machine_config_s *config = _starpu_get_machine_config();
-  int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;
+	int ret_val = 0;
+	
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;
+	
+	int workerid = -1;
+	int i, n;
+	
+	for(i = 0; i < nworkers; i++)
+	  {
+		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
+		n = starpu_wait_for_all_tasks_of_worker(workerid);
+		ret_val = ret_val && n;
+	  }
+	
+	return ret_val;
+}
 
 
-  int workerid = -1;
-  int i, n;
+void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
+{
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	
+	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+	
+	if (--worker->nsubmitted == 0)
+		PTHREAD_COND_BROADCAST(&worker->submitted_cond);
+
+	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
+	return;
+}
 
 
-  for(i = 0; i < nworkers; i++)
-    {
-      workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
-      n = starpu_wait_for_all_tasks_of_worker(workerid);
-      ret_val = ret_val && n;
-    }
+void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
+{
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+
+	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+       
+	worker->nsubmitted++;
+	
+	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
+	return;
+}
 
 
-  return ret_val;
+static void _starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, 
+				     struct starpu_sched_ctx *sched_ctx)
+{
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	int nworkers = config->topology.nworkers;
+	
+	STARPU_ASSERT((nworkerids_in_ctx + sched_ctx->nworkers_in_ctx) <= nworkers);
+	
+	int nworkerids_already_in_ctx =  sched_ctx->nworkers_in_ctx;
+	int j;
+
+	/*if null add the rest of the workers which don't already belong to this ctx*/
+	if(workerids_in_ctx == NULL)
+	  {
+		for(j = 0; j < nworkers; j++)
+		  {
+			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
+			if(!_starpu_worker_belongs_to_ctx(workerarg, sched_ctx))
+			  {
+				sched_ctx->workerid[++nworkerids_already_in_ctx] = j;
+				workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+			  }
+			sched_ctx->nworkers_in_ctx = nworkers;
+		  }
+	  } 
+	else 
+	  {
+		int i;
+		for(i = 0; i < nworkerids_in_ctx; i++)
+		  {
+			/*take care the user does not ask for a resource that does not exist*/
+			STARPU_ASSERT( workerids_in_ctx[i] >= 0 &&  workerids_in_ctx[i] <= nworkers);
+		    
+			sched_ctx->workerid[ nworkerids_already_in_ctx + i] = workerids_in_ctx[i];
+			for(j = 0; j < nworkers; j++)
+			  {
+				if(sched_ctx->workerid[i] == j)
+				  {
+					struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
+					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
+				  }
+			  }
+		  }
+		sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
+	  }
+	return;
 }
 }
 
 
-void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
+void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,
+				     struct starpu_sched_ctx *sched_ctx)
 {
 {
-  struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+  	  /* wait for the workers concerned by the change of contex                              
+	   * to finish their work in the previous context */
+	if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
+	  {
+		/* block the workers until the contex is switched */
+		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+		_starpu_add_workers_to_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
+		/* also wait the workers to wake up before using the context */
+		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
+	  }
+	return;
 
 
-  PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+}
 
 
-  if (--worker->nsubmitted == 0)
-    PTHREAD_COND_BROADCAST(&worker->submitted_cond);
+static int _starpu_get_first_free_space(int *workerids, int old_nworkerids_in_ctx)
+{
+  int i;
+  for(i = 0; i < old_nworkerids_in_ctx; i++)
+    if(workerids[i] == -1)
+      return i;
+  return -1;
+}
 
 
-  PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
+/* rearange array of workerids in order not to have {-1, -1, 5, -1, 7}
+   and have instead {5, 7, -1, -1, -1} 
+   it is easier afterwards to iterate the array
+*/
+static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkerids_in_ctx)
+{
+	int first_free_id = -1;
+	int i;
+	for(i = 0; i < old_nworkerids_in_ctx; i++)
+	  {
+		if(sched_ctx->workerid[i] != -1)
+		  {
+			first_free_id = _starpu_get_first_free_space(sched_ctx->workerid, 
+								     old_nworkerids_in_ctx);
+			if(first_free_id != -1)
+			  {
+				sched_ctx->workerid[first_free_id] = sched_ctx->workerid[i];
+				sched_ctx->workerid[i] = -1;
+			  }
+		  }
+	  }
 }
 }
 
 
-void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
+static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, 
+					  struct starpu_sched_ctx *sched_ctx)
 {
 {
-  struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+  	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	int nworkers = config->topology.nworkers;
+	
+	int nworkerids_already_in_ctx =  sched_ctx->nworkers_in_ctx;
+
+	STARPU_ASSERT(nworkerids_in_ctx  <= nworkerids_already_in_ctx);
+
+	int i, workerid;
+
+	/*if null remove all the workers that belong to this ctx*/
+	if(workerids_in_ctx == NULL)
+	  {
+		for(i = 0; i < nworkerids_already_in_ctx; i++)
+		  {
+			workerid = sched_ctx->workerid[i];
+			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+			_starpu_remove_sched_ctx_from_worker(workerarg, sched_ctx);
+sched_ctx->workerid[i] = -1;
+		  }
+
+		sched_ctx->nworkers_in_ctx = 0;
+	  } 
+	else 
+	  {
+		for(i = 0; i < nworkerids_in_ctx; i++)
+		  {
+		    	workerid = workerids_in_ctx[i]; 
+			/* take care the user does not ask for a resource that does not exist */
+			STARPU_ASSERT( workerid >= 0 &&  workerid <= nworkers);
+
+			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
+			_starpu_remove_sched_ctx_from_worker(workerarg, sched_ctx);
+			int j;
+
+			/* don't leave the workerid with a correct value even if we don't use it anymore */
+			for(j = 0; j < nworkerids_already_in_ctx; j++)
+				if(sched_ctx->workerid[j] == workerid)				 
+					sched_ctx->workerid[j] = -1;
+		  }
+
+		sched_ctx->nworkers_in_ctx -= nworkerids_in_ctx;
+		_starpu_rearange_sched_ctx_workerids(sched_ctx, nworkerids_already_in_ctx);
+	  }
+
+	return;
 
 
-  PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+}
 
 
-  worker->nsubmitted++;
+void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, 
+					  struct starpu_sched_ctx *sched_ctx)
+{
+	  /* wait for the workers concerned by the change of contex                              
+	   * to finish their work in the previous context */
+	if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
+	  {
+		/* block the workers until the contex is switched */
+		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+		_starpu_remove_workers_from_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
+		/* also wait the workers to wake up before using the context */
+		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
+	  }
+	return;
 
 
-  PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
 }
 }
 
 

+ 4 - 4
src/core/sched_policy.c

@@ -347,10 +347,10 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 		  }
 		  }
 	  }
 	  }
 
 
- 	/* if(task) */
-	/*   { */
-	/* 	printf("task poped by th %d for %d  with strateg %s\n", worker->workerid, worker->arch, task->sched_ctx->sched_policy->policy_name); */
-	/*   } */
+ 	if(task)
+	  {
+	    printf("task %s poped by th %d for %d  with strateg %s\n", task->name, worker->workerid, worker->arch, task->sched_ctx->sched_policy->policy_name);
+	  }
 
 
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	 * for some reason. */
 	 * for some reason. */

+ 2 - 0
src/drivers/cuda/driver_cuda.c

@@ -342,6 +342,8 @@ void *_starpu_cuda_worker(void *arg)
 		}
 		}
 
 
 		_starpu_handle_job_termination(j, 0);
 		_starpu_handle_job_termination(j, 0);
+		_starpu_decrement_nsubmitted_tasks_of_worker(args->workerid);
+
 	}
 	}
 
 
 	STARPU_TRACE_WORKER_DEINIT_START
 	STARPU_TRACE_WORKER_DEINIT_START