
Increment/decrement the submitted/ready task counters per context only (instead of both per context and globally)

Andra Hugo, 11 years ago
Parent commit 9bb7bdb07f
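
With this change the global nsubmitted/nready counters disappear; the public entry points now aggregate the per-context barrier counters, and starpu_task_wait_for_all() waits context by context. A minimal usage sketch, assuming a StarPU build from this revision (the codelet and the flops value are illustrative only):

#include <starpu.h>
#include <stdio.h>

static void dummy_cpu_func(void *buffers[], void *cl_arg)
{
	(void)buffers; (void)cl_arg;
}

static struct starpu_codelet dummy_cl =
{
	.cpu_funcs = { dummy_cpu_func },
	.nbuffers = 0,
};

int main(void)
{
	if (starpu_init(NULL) != 0)
		return 1;

	struct starpu_task *task = starpu_task_create();
	task->cl = &dummy_cl;
	task->flops = 1e6;	/* accounted in the context's ready_flops */
	starpu_task_submit(task);

	/* both counters are now summed over the registered scheduling contexts */
	fprintf(stderr, "submitted: %d, ready: %d\n",
		starpu_task_nsubmitted(), starpu_task_nready());

	starpu_task_wait_for_all();	/* now waits per context */
	starpu_shutdown();
	return 0;
}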

+ 2 - 1
include/starpu_sched_ctx_hypervisor.h

@@ -29,7 +29,8 @@ struct starpu_sched_ctx_performance_counters
 	void (*notify_idle_cycle)(unsigned sched_ctx_id, int worker, double idle_time);
 	void (*notify_poped_task)(unsigned sched_ctx_id, int worker);
 	void (*notify_pushed_task)(unsigned sched_ctx_id, int worker);
-	void (*notify_post_exec_task)(struct starpu_task *task, size_t data_size, uint32_t footprint, int hypervisor_tag);
+	void (*notify_post_exec_task)(struct starpu_task *task, size_t data_size, uint32_t footprint, int hypervisor_tag,
+				      int nready_tasks, double nready_flops);
 	void (*notify_submitted_job)(struct starpu_task *task, uint32_t footprint, size_t data_size);
 	void (*notify_ready_task)(unsigned sched_ctx_id, struct starpu_task *task);
 	void (*notify_empty_ctx)(unsigned sched_ctx_id, struct starpu_task *task);
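
With the two extra parameters, a resizing policy hooked through these performance counters is handed the context's current ready-task count and ready flops by the runtime instead of tracking them itself. A minimal sketch of a callback matching the new signature; my_post_exec_hook and my_counters are hypothetical names, and the other hooks are omitted:

#include <stdint.h>
#include <stdio.h>
#include <starpu.h>
#include <starpu_sched_ctx_hypervisor.h>

static void my_post_exec_hook(struct starpu_task *task, size_t data_size,
			      uint32_t footprint, int hypervisor_tag,
			      int nready_tasks, double nready_flops)
{
	(void)data_size; (void)footprint; (void)hypervisor_tag;
	/* the runtime now reports the per-context ready state directly */
	fprintf(stderr, "ctx %u: %d ready tasks, %.0f ready flops\n",
		task->sched_ctx, nready_tasks, nready_flops);
}

static struct starpu_sched_ctx_performance_counters my_counters =
{
	.notify_post_exec_task = my_post_exec_hook,
	/* notify_idle_cycle, notify_pushed_task, ... left out in this sketch */
};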

+ 5 - 4
sc_hypervisor/src/sc_hypervisor.c

@@ -24,7 +24,8 @@ struct starpu_sched_ctx_performance_counters* perf_counters = NULL;
 
 static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time);
 static void notify_pushed_task(unsigned sched_ctx, int worker);
-static void notify_post_exec_task(struct starpu_task *task, size_t data_size, uint32_t footprint, int hypervisor_tag);
+static void notify_post_exec_task(struct starpu_task *task, size_t data_size, uint32_t footprint, 
+				  int hypervisor_tag, int nready_tasks, double ready_flops);
 static void notify_poped_task(unsigned sched_ctx, int  worker);
 static void notify_submitted_job(struct starpu_task *task, unsigned footprint, size_t data_size);
 static void notify_ready_task(unsigned sched_ctx, struct starpu_task *task);
@@ -996,7 +997,7 @@ static void notify_poped_task(unsigned sched_ctx, int worker)
 
  
 /* notifies the hypervisor that a tagged task has just been executed */
-static void notify_post_exec_task(struct starpu_task *task, size_t data_size, uint32_t footprint, int task_tag)
+static void notify_post_exec_task(struct starpu_task *task, size_t data_size, uint32_t footprint, int task_tag, int ready_tasks, double ready_flops)
 {
 	unsigned sched_ctx = task->sched_ctx;
 	int worker = starpu_worker_get_id();
@@ -1017,8 +1018,8 @@ static void notify_post_exec_task(struct starpu_task *task, size_t data_size, ui
 
 	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
 	hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= task->flops;
-	hypervisor.sched_ctx_w[sched_ctx].nready_tasks--;
-	hypervisor.sched_ctx_w[sched_ctx].ready_flops -= task->flops;
+	hypervisor.sched_ctx_w[sched_ctx].nready_tasks = ready_tasks;
+	hypervisor.sched_ctx_w[sched_ctx].ready_flops = ready_flops;
 	if(hypervisor.sched_ctx_w[sched_ctx].ready_flops < 0.0)
 		hypervisor.sched_ctx_w[sched_ctx].ready_flops = 0.0;
 	_ack_resize_completed(sched_ctx, worker);

+ 1 - 0
src/common/barrier.c

@@ -23,6 +23,7 @@ int _starpu_barrier_init(struct _starpu_barrier *barrier, int count)
 	barrier->count = count;
 	barrier->reached_start = 0;
 	barrier->reached_exit = 0;
+	barrier->reached_flops = 0.0;
 	STARPU_PTHREAD_MUTEX_INIT(&barrier->mutex, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&barrier->mutex_exit, NULL);
 	STARPU_PTHREAD_COND_INIT(&barrier->cond, NULL);

+ 1 - 0
src/common/barrier.h

@@ -29,6 +29,7 @@ struct _starpu_barrier
 	int count;
 	int reached_start;
 	int reached_exit;
+	double reached_flops;
 	starpu_pthread_mutex_t mutex;
 	starpu_pthread_mutex_t mutex_exit;
 	starpu_pthread_cond_t cond;

+ 17 - 3
src/common/barrier_counter.c

@@ -56,7 +56,7 @@ int _starpu_barrier_counter_wait_for_full_counter(struct _starpu_barrier_counter
 	return 0;
 }
 
-int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier_counter *barrier_c)
+int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier_counter *barrier_c, double flops)
 {
 	struct _starpu_barrier *barrier = &barrier_c->barrier;
 	int ret = 0;
@@ -64,6 +64,7 @@ int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier
 
 	if (--barrier->reached_start == 0)
 	{
+		barrier->reached_flops -= flops;
 		ret = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&barrier->cond);
 	}
@@ -72,7 +73,7 @@ int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier
 	return ret;
 }
 
-int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_counter *barrier_c)
+int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_counter *barrier_c, double flops)
 {
 	struct _starpu_barrier *barrier = &barrier_c->barrier;
 	int ret = 0;
@@ -80,6 +81,7 @@ int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_
 
 	if(++barrier->reached_start == barrier->count)
 	{
+		barrier->reached_flops += flops;
 		ret = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&barrier_c->cond2);
 	}
@@ -88,14 +90,26 @@ int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_
 	return ret;
 }
 
-int _starpu_barrier_counter_increment(struct _starpu_barrier_counter *barrier_c)
+int _starpu_barrier_counter_increment(struct _starpu_barrier_counter *barrier_c, double flops)
 {
 	struct _starpu_barrier *barrier = &barrier_c->barrier;
 	STARPU_PTHREAD_MUTEX_LOCK(&barrier->mutex);
 
 	barrier->reached_start++;
+	barrier->reached_flops += flops;
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&barrier->mutex);
 	return 0;
 }
 
+int _starpu_barrier_counter_check(struct _starpu_barrier_counter *barrier_c)
+{
+	struct _starpu_barrier *barrier = &barrier_c->barrier;
+	int ret = 0;
+	STARPU_PTHREAD_MUTEX_LOCK(&barrier->mutex);
+
+	if(barrier->reached_start == 0)
+		STARPU_PTHREAD_COND_BROADCAST(&barrier->cond);
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&barrier->mutex);
+	return ret;
+}

+ 5 - 3
src/common/barrier_counter.h

@@ -34,10 +34,12 @@ int _starpu_barrier_counter_wait_for_empty_counter(struct _starpu_barrier_counte
 
 int _starpu_barrier_counter_wait_for_full_counter(struct _starpu_barrier_counter *barrier_c);
 
-int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier_counter *barrier_c);
+int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier_counter *barrier_c, double flops);
 
-int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_counter *barrier_c);
+int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_counter *barrier_c, double flops);
 
-int _starpu_barrier_counter_increment(struct _starpu_barrier_counter *barrier_c);
+int _starpu_barrier_counter_increment(struct _starpu_barrier_counter *barrier_c, double flops);
+
+int _starpu_barrier_counter_check(struct _starpu_barrier_counter *barrier_c);
 
 #endif
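
The barrier counter now carries a flops total next to its task count, so one structure tracks both how many tasks are ready in a context and how much work they represent. A simplified, self-contained illustration of that idea using plain pthreads; this is not the StarPU internal implementation:

#include <pthread.h>

struct flops_counter
{
	int count;		/* e.g. number of ready tasks */
	double flops;		/* work they represent */
	pthread_mutex_t mutex;
	pthread_cond_t cond;
};

static void counter_increment(struct flops_counter *c, double flops)
{
	pthread_mutex_lock(&c->mutex);
	c->count++;
	c->flops += flops;
	pthread_mutex_unlock(&c->mutex);
}

static void counter_decrement(struct flops_counter *c, double flops)
{
	pthread_mutex_lock(&c->mutex);
	c->flops -= flops;
	if (c->flops < 0.0)
		c->flops = 0.0;		/* clamp, as sc_hypervisor does */
	if (--c->count == 0)
		pthread_cond_broadcast(&c->cond);	/* counter is empty */
	pthread_mutex_unlock(&c->mutex);
}

static void counter_wait_for_empty(struct flops_counter *c)
{
	pthread_mutex_lock(&c->mutex);
	while (c->count > 0)
		pthread_cond_wait(&c->cond, &c->mutex);
	pthread_mutex_unlock(&c->mutex);
}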

+ 2 - 2
src/core/jobs.c

@@ -143,6 +143,7 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 {
 	struct starpu_task *task = j->task;
 	unsigned sched_ctx = task->sched_ctx;
+	double flops = task->flops;
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
 	task->status = STARPU_TASK_FINISHED;
@@ -294,10 +295,9 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		int ret = _starpu_submit_job(j);
 		STARPU_ASSERT(!ret);
 	}
-	_starpu_decrement_nsubmitted_tasks();
-	_starpu_decrement_nready_tasks();
 
 	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
+	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx, flops);
 
 	struct _starpu_worker *worker;
 	worker = _starpu_get_local_worker_key();

+ 77 - 3
src/core/sched_ctx.c

@@ -292,7 +292,9 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	sem_init(&sched_ctx->parallel_code_sem, 0, 0);
 
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
+	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
 
+	sched_ctx->ready_flops = 0.0;
 	/*init the strategy structs and the worker_collection of the ressources of the context */
 	_starpu_init_sched_policy(config, sched_ctx, policy);
 
@@ -630,6 +632,7 @@ void _starpu_delete_all_sched_ctxs()
 		{
 			_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
+			_starpu_barrier_counter_destroy(&sched_ctx->ready_tasks_barrier);
 			_starpu_delete_sched_ctx(sched_ctx);
 		}
 		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[i]);
@@ -748,6 +751,28 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 	return;
 }
 
+int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
+{
+	unsigned worker = 0, nworkers = 0;
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx->id]);
+	struct starpu_worker_collection *workers = sched_ctx->workers;
+
+	struct starpu_sched_ctx_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		STARPU_ASSERT_MSG(worker < STARPU_NMAXWORKERS, "worker id %d", worker);
+		if (starpu_worker_can_execute_task(worker, task, 0))
+			nworkers++;
+	}
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);
+
+	return nworkers;
+}
+
 /* unused sched_ctx have the id STARPU_NMAX_SCHED_CTXS */
 void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config)
 {
@@ -786,8 +811,12 @@ int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
 
 void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	if (!config->watchdog_ok)
+		config->watchdog_ok = 1;
+
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	int finished = _starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier);
+	int finished = _starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
         /* when finished decrementing the tasks if the user signaled he will not submit tasks anymore
            we can move all its workers to the inheritor context */
 	if(finished && sched_ctx->inheritor != STARPU_NMAX_SCHED_CTXS)
@@ -819,13 +848,56 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 		}
 		STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
 	}
+
 	return;
 }
 
 void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier);
+	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier, 0.0);
+}
+
+int _starpu_get_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->tasks_barrier.barrier.reached_start;
+}
+
+int _starpu_check_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return _starpu_barrier_counter_check(&sched_ctx->tasks_barrier);
+}
+
+void _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	_starpu_barrier_counter_increment(&sched_ctx->ready_tasks_barrier, ready_flops);
+}
+
+void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);
+}
+
+int _starpu_get_nready_tasks_of_sched_ctx(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->ready_tasks_barrier.barrier.reached_start;
+}
+
+double _starpu_get_nready_flops_of_sched_ctx(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->ready_tasks_barrier.barrier.reached_flops;
+}
+
+int _starpu_wait_for_no_ready_of_sched_ctx(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->ready_tasks_barrier);
 }
 
 void starpu_sched_ctx_set_context(unsigned *sched_ctx)
@@ -1099,7 +1171,9 @@ void _starpu_sched_ctx_post_exec_task_cb(int workerid, struct starpu_task *task,
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 	if(sched_ctx != NULL && task->sched_ctx != _starpu_get_initial_sched_ctx()->id && 
 	   task->sched_ctx != STARPU_NMAX_SCHED_CTXS  && sched_ctx->perf_counters != NULL)
-		sched_ctx->perf_counters->notify_post_exec_task(task, data_size, footprint, task->hypervisor_tag);
+		sched_ctx->perf_counters->notify_post_exec_task(task, data_size, footprint, task->hypervisor_tag, 
+								_starpu_get_nready_tasks_of_sched_ctx(sched_ctx->id), 
+								_starpu_get_nready_flops_of_sched_ctx(sched_ctx->id));
 }
 
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
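
Taken together, the per-context counters follow each task's life cycle: the submitted counter moves at submission and termination, the ready counter and its flops move when the task is pushed to a worker and at termination. An orientation sketch of that pairing, wrapped in a hypothetical helper and assuming an in-tree build (these are internal functions, called here only to show the order):

#include <starpu.h>
#include "core/sched_ctx.h"	/* internal header; in-tree build assumed */

static void task_lifecycle_counters(struct starpu_task *task)
{
	unsigned ctx = task->sched_ctx;

	/* submission (_starpu_submit_job) */
	_starpu_increment_nsubmitted_tasks_of_sched_ctx(ctx);

	/* dependencies released, task pushed to a worker (_starpu_push_task) */
	_starpu_increment_nready_tasks_of_sched_ctx(ctx, task->flops);

	/* termination (_starpu_handle_job_termination) */
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(ctx);
	_starpu_decrement_nready_tasks_of_sched_ctx(ctx, task->flops);
}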

+ 17 - 0
src/core/sched_ctx.h

@@ -59,6 +59,12 @@ struct _starpu_sched_ctx
 	/* wait for the tasks submitted to the context to be executed */
 	struct _starpu_barrier_counter tasks_barrier;
 
+	/* wait for the ready tasks of the context to be executed */
+	struct _starpu_barrier_counter ready_tasks_barrier;
+
+	/* amount of ready flops in a context */
+	double ready_flops;
+
 	/* cond to block push when there are no workers in the ctx */
 	starpu_pthread_cond_t no_workers_cond;
 
@@ -141,6 +147,14 @@ int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id);
  * task currently submitted to the context */
 void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);
 void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);
+int _starpu_get_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);
+int _starpu_check_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);
+
+void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops);
+void _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops);
+int _starpu_get_nready_tasks_of_sched_ctx(unsigned sched_ctx_id);
+double _starpu_get_nready_flops_of_sched_ctx(unsigned sched_ctx_id);
+int _starpu_wait_for_no_ready_of_sched_ctx(unsigned sched_ctx_id);
 
 /* Return the corresponding index of the workerid in the ctx table */
 int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx, unsigned workerid);
@@ -179,6 +193,9 @@ void _starpu_sched_ctx_signal_worker_blocked(int workerid);
  * id set by its last call, or the id of the initial context */
 unsigned _starpu_sched_ctx_get_current_context();
 
+/* count how many workers of the context are able to execute a given task */
+int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx);
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 /* Notifies the hypervisor that a tasks was poped from the workers' list */
 void _starpu_sched_ctx_post_exec_task_cb(int workerid, struct starpu_task *task, size_t data_size, uint32_t footprint);

+ 1 - 25
src/core/sched_policy.c

@@ -292,25 +292,6 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	}
 }
 
-static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
-{
-	unsigned worker = 0, nworkers = 0;
-	struct starpu_worker_collection *workers = sched_ctx->workers;
-
-	struct starpu_sched_ctx_iterator it;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-
-	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-		if (starpu_worker_can_execute_task(worker, task, 0))
-			nworkers++;
-	}
-
-	return nworkers;
-}
-
 /* the generic interface that call the proper underlying implementation */
 
 int _starpu_push_task(struct _starpu_job *j)
@@ -327,13 +308,8 @@ int _starpu_push_task(struct _starpu_job *j)
 	_STARPU_LOG_IN();
 
 	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
-	_starpu_increment_nready_tasks();
+	_starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops);
 	task->status = STARPU_TASK_READY;
-#ifdef STARPU_USE_SC_HYPERVISOR
-	if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL 
-	   && sched_ctx->perf_counters->notify_ready_task)
-		sched_ctx->perf_counters->notify_ready_task(sched_ctx->id, task);
-#endif //STARPU_USE_SC_HYPERVISOR
 
 #ifdef HAVE_AYUDAME_H
 	if (AYU_event)

+ 71 - 83
src/core/task.c

@@ -42,10 +42,6 @@
 /* TODO we could make this hierarchical to avoid contention ? */
 static starpu_pthread_cond_t submitted_cond = STARPU_PTHREAD_COND_INITIALIZER;
 static starpu_pthread_mutex_t submitted_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
-static long int nsubmitted = 0, nready = 0;
-static int watchdog_ok;
-
-static void _starpu_increment_nsubmitted_tasks(void);
 
 /* This key stores the task currently handled by the thread, note that we
  * cannot use the worker structure to store that information because it is
@@ -242,7 +238,6 @@ int _starpu_submit_job(struct _starpu_job *j)
 	/* notify bound computation of a new task */
 	_starpu_bound_record(j);
 
-	_starpu_increment_nsubmitted_tasks();
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
 
 #ifdef STARPU_USE_SC_HYPERVISOR
@@ -595,7 +590,6 @@ int _starpu_task_submit_nodeps(struct starpu_task *task)
 		j->task->sched_ctx = _starpu_sched_ctx_get_current_context();
 	}
 
-	_starpu_increment_nsubmitted_tasks();
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
@@ -661,11 +655,10 @@ int _starpu_task_submit_conversion_task(struct starpu_task *task,
 		j->task->sched_ctx = _starpu_sched_ctx_get_current_context();
 	}
 
-	_starpu_increment_nsubmitted_tasks();
 	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->submitted = 1;
-	_starpu_increment_nready_tasks();
+	_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops);
 
 	for (i=0 ; i<task->cl->nbuffers ; i++)
 	{
@@ -743,18 +736,24 @@ int starpu_task_wait_for_all(void)
 		if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
 			return -EDEADLK;
 
-		STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-
-		_STARPU_TRACE_TASK_WAIT_FOR_ALL;
-
-		while (nsubmitted > 0)
-			STARPU_PTHREAD_COND_WAIT(&submitted_cond, &submitted_mutex);
-
-		STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
-
 #ifdef HAVE_AYUDAME_H
 		if (AYU_event) AYU_event(AYU_BARRIER, 0, NULL);
 #endif
+		struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+		if(config->topology.nsched_ctxs == 1)
+			starpu_task_wait_for_all_in_ctx(0);
+		else
+		{
+			int s;
+			for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+			{
+				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+				{
+					starpu_task_wait_for_all_in_ctx(config->sched_ctxs[s].id);
+				}
+			}
+		}
+
 		return 0;
 	}
 	else
@@ -782,42 +781,22 @@ int starpu_task_wait_for_no_ready(void)
 	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
 		return -EDEADLK;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-
-	_STARPU_TRACE_TASK_WAIT_FOR_ALL;
-
-	while (nready > 0)
-		STARPU_PTHREAD_COND_WAIT(&submitted_cond, &submitted_mutex);
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
-
-	return 0;
-}
-
-void _starpu_decrement_nsubmitted_tasks(void)
-{
-	struct _starpu_machine_config *config = _starpu_get_machine_config();
-
-	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-
-	if (!watchdog_ok)
-		watchdog_ok = 1;
-
-	if (--nsubmitted == 0)
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	if(config->topology.nsched_ctxs == 1)
+		_starpu_wait_for_no_ready_of_sched_ctx(0);
+	else
 	{
-		if (!config->submitting)
+		int s;
+		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
 		{
-			ANNOTATE_HAPPENS_AFTER(&config->running);
-			config->running = 0;
-			ANNOTATE_HAPPENS_BEFORE(&config->running);
+			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+			{
+				_starpu_wait_for_no_ready_of_sched_ctx(config->sched_ctxs[s].id);
+			}
 		}
-		STARPU_PTHREAD_COND_BROADCAST(&submitted_cond);
 	}
 
-	_STARPU_TRACE_UPDATE_TASK_CNT(nsubmitted);
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
-
+	return 0;
 }
 
 void
@@ -826,57 +805,65 @@ starpu_drivers_request_termination(void)
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 
 	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-
+	int nsubmitted = starpu_task_nsubmitted();
 	config->submitting = 0;
 	if (nsubmitted == 0)
 	{
 		ANNOTATE_HAPPENS_AFTER(&config->running);
 		config->running = 0;
 		ANNOTATE_HAPPENS_BEFORE(&config->running);
-		STARPU_PTHREAD_COND_BROADCAST(&submitted_cond);
+		int s;
+		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+		{
+			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+			{
+				_starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
+			}
+		}
 	}
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
 }
 
-static void _starpu_increment_nsubmitted_tasks(void)
-{
-	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-
-	nsubmitted++;
-
-	_STARPU_TRACE_UPDATE_TASK_CNT(nsubmitted);
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
-}
-
 int starpu_task_nsubmitted(void)
 {
+	int nsubmitted = 0;
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	if(config->topology.nsched_ctxs == 1)
+		nsubmitted = _starpu_get_nsubmitted_tasks_of_sched_ctx(0);
+	else
+	{
+		int s;
+		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+		{
+			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+			{
+				nsubmitted += _starpu_get_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
+			}
+		}
+	}
 	return nsubmitted;
 }
 
-void _starpu_increment_nready_tasks(void)
-{
-	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-
-	nready++;
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
-}
-
-void _starpu_decrement_nready_tasks(void)
-{
-	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-
-	if (--nready == 0)
-		STARPU_PTHREAD_COND_BROADCAST(&submitted_cond);
-
-	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
-
-}
 
 int starpu_task_nready(void)
 {
+	int nready = 0;
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	if(config->topology.nsched_ctxs == 1)
+		nready = _starpu_get_nready_tasks_of_sched_ctx(0);
+	else
+	{
+		int s;
+		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+		{
+			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+			{
+				nready += _starpu_get_nready_tasks_of_sched_ctx(config->sched_ctxs[s].id);
+			}
+		}
+	}
+
 	return nready;
 }
 
@@ -1042,18 +1029,19 @@ static void *watchdog_func(void *foo STARPU_ATTRIBUTE_UNUSED)
 	timeout = atoll(timeout_env);
 	ts.tv_sec = timeout / 1000000;
 	ts.tv_nsec = (timeout % 1000000) * 1000;
-
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	
 	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
 	while (_starpu_machine_is_running())
 	{
 		int last_nsubmitted = starpu_task_nsubmitted();
-		watchdog_ok = 0;
+		config->watchdog_ok = 0;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
 
 		_starpu_sleep(ts);
 
 		STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
-		if (!watchdog_ok && last_nsubmitted
+		if (!config->watchdog_ok && last_nsubmitted
 				&& last_nsubmitted == starpu_task_nsubmitted())
 		{
 			fprintf(stderr,"The StarPU watchdog detected that no task finished for %u.%06us (can be configure through STARPU_WATCHDOG_TIMEOUT)\n", (unsigned)ts.tv_sec, (unsigned)ts.tv_nsec/1000);

+ 0 - 8
src/core/task.h

@@ -26,14 +26,6 @@
 /* Internal version of starpu_task_destroy: don't check task->destroy flag */
 void _starpu_task_destroy(struct starpu_task *task);
 
-/* In order to implement starpu_task_wait_for_all, we keep track of the number of
- * task currently submitted */
-void _starpu_decrement_nsubmitted_tasks(void);
-/* In order to implement starpu_task_wait_for_no_ready, we keep track of the number of
- * task currently ready */
-void _starpu_increment_nready_tasks(void);
-void _starpu_decrement_nready_tasks(void);
-
 /* A pthread key is used to store the task currently executed on the thread.
  * _starpu_initialize_current_task_key initializes this pthread key and
  * _starpu_set_current_task updates its current value. */

+ 2 - 0
src/core/workers.h

@@ -309,6 +309,8 @@ struct _starpu_machine_config
 
 	/* this flag is set until the application is finished submitting tasks */
 	unsigned submitting;
+
+	int watchdog_ok;
 };
 
 /* Three functions to manage argv, argc */