Browse Source

Bug fixing

Andra Hugo 14 years ago
parent
commit
a86137dc1f

+ 9 - 5
include/starpu_scheduler.h

@@ -101,13 +101,17 @@ struct starpu_sched_policy_s {
 };
 
 struct starpu_sched_ctx {
-	struct starpu_sched_policy_s *sched_policy; /*policy of the contex */
-	int workerid[STARPU_NMAXWORKERS]; /*list of indices of workers */
-	int nworkers_in_ctx; /*number of threads in contex */
-	unsigned is_initial_sched; /*we keep an initial sched which we never delete */
+	struct starpu_sched_policy_s *sched_policy; /* policy of the contex */
+	int workerid[STARPU_NMAXWORKERS]; /* list of indices of workers */
+	int nworkers_in_ctx; /* number of threads in contex */
+	unsigned is_initial_sched; /* we keep an initial sched which we never delete */
+	pthread_cond_t submitted_cond; /* cond used for no of submitted tasks to a sched_ctx */
+	pthread_mutex_t submitted_mutex; /* mut used for no of submitted tasks to a sched_ctx */
+	int nsubmitted;	 /* counter used for no of submitted tasks to a sched_ctx */
+	const char *sched_name;
 };
 
-void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx);
+void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, const char *sched_name);
 
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 

+ 4 - 0
include/starpu_task.h

@@ -259,6 +259,10 @@ int starpu_task_wait(struct starpu_task *task);
  * been executed. */
 int starpu_task_wait_for_all(void);
 
+/* This function waits until all the tasks that were already submitted to a specific
+ * context have been executed. */
+int starpu_wait_for_all_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx);
+
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl);
 
 /* Return the task currently executed by the worker, or NULL if this is called

+ 3 - 1
src/core/dependencies/tags.c

@@ -328,8 +328,10 @@ int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 
 	PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
 
-	while (!cg->succ.succ_apps.completed)
+	while (!cg->succ.succ_apps.completed){
+	  printf("cond wait\n");
 		PTHREAD_COND_WAIT(&cg->succ.succ_apps.cg_cond, &cg->succ.succ_apps.cg_mutex);
+	}
 
 	PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
 

+ 49 - 12
src/core/sched_ctx.c

@@ -1,9 +1,6 @@
-#include <pthread.h>
-
-#include <starpu.h>
+#include <core/sched_ctx.h>
 #include <common/config.h>
 #include <common/utils.h>
-#include <core/sched_ctx.h>
 #include <core/sched_policy.h>
 #include <profiling/profiling.h>
 
@@ -13,7 +10,8 @@ static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
 static int nblocked_ths = 0;
 
 void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
-			      *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_initial_sched)
+			      *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_initial_sched,
+			      const char *sched_name)
 {
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
 	int nworkers = config->topology.nworkers;
@@ -23,7 +21,11 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 	sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
 	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
 	sched_ctx->is_initial_sched = is_initial_sched;
+	sched_ctx->sched_name = sched_name;
 
+	PTHREAD_COND_INIT(&sched_ctx->submitted_cond, NULL);
+	PTHREAD_MUTEX_INIT(&sched_ctx->submitted_mutex, NULL);
+	sched_ctx->nsubmitted = 0;
 
 	int j;
 	/*all the workers are in this contex*/
@@ -153,7 +155,7 @@ static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkeri
 }
 
 void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
-			     *workerids_in_ctx, int nworkerids_in_ctx)
+			     *workerids_in_ctx, int nworkerids_in_ctx, const char *sched_name)
 {
 	  /* wait for the workers concerned by the change of contex                              
 	   * to finish their work in the previous context */
@@ -161,7 +163,7 @@ void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *pol
 	  {
 		/* block the workers until the contex is switched */
 		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
-		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
+		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
 		/* also wait the workers to wake up before using the context */
 		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
 	  }
@@ -230,14 +232,15 @@ void _starpu_delete_all_sched_ctxs()
 		for(j = 0; j < workerarg->nctxs; j++)
 		  {
 			sched_ctx = workerarg->sched_ctx[j];
-			if(sched_ctx != NULL && !sched_ctx->is_initial_sched)
+			if(sched_ctx != NULL)
 			  {
 				free(sched_ctx->sched_policy);
 				sched_ctx->sched_policy = NULL;
-				workerarg->nctxs--;
+				sched_ctx = NULL;
 				
 			  }
 		  }
+		workerarg->nctxs = 0;
 	  }
 	return;
 }
@@ -272,7 +275,7 @@ int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_i
 	  {
 		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
 		n = starpu_wait_for_all_tasks_of_worker(workerid);
-		ret_val = ret_val && n;
+		ret_val = (ret_val && n);
 	  }
 	
 	return ret_val;
@@ -283,7 +286,7 @@ void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
 	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 	
 	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
-	
+
 	if (--worker->nsubmitted == 0)
 		PTHREAD_COND_BROADCAST(&worker->submitted_cond);
 
@@ -296,7 +299,7 @@ void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
 	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 
 	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
-       
+
 	worker->nsubmitted++;
 	
 	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
@@ -468,3 +471,37 @@ void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_
 
 }
 
+int starpu_wait_for_all_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+{
+  if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+    return -EDEADLK;
+
+  PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
+
+
+  while (sched_ctx->nsubmitted > 0)
+    PTHREAD_COND_WAIT(&sched_ctx->submitted_cond, &sched_ctx->submitted_mutex);
+
+  PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+
+  return 0;
+}
+
+void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+{
+  PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
+
+  if (--sched_ctx->nsubmitted == 0)
+    PTHREAD_COND_BROADCAST(&sched_ctx->submitted_cond);
+
+  PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+}
+
+void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+{
+  PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
+
+  sched_ctx->nsubmitted++;
+
+  PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+}

+ 6 - 1
src/core/sched_ctx.h

@@ -22,7 +22,7 @@
 #include <starpu_scheduler.h>
 
 
-void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerid, int nworkerids, unsigned is_init_sched);
+void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name);
 
 void _starpu_delete_all_sched_ctxs();
 
@@ -33,4 +33,9 @@ void _starpu_decrement_nblocked_ths(void);
 void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid);
 void _starpu_increment_nsubmitted_tasks_of_worker(int workerid);
 
+/* In order to implement starpu_wait_for_all_tasks_of_ctx, we keep track of the number of 
+ * task currently submitted to the context */
+void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx);
+void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx);
+
 #endif // __SCHED_CONTEXT_H__

+ 6 - 0
src/core/sched_policy.c

@@ -305,6 +305,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
         int ret;
 	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
 	{
+		_starpu_increment_nsubmitted_tasks_of_worker(task->workerid);
 		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
 	}
 	else {
@@ -315,6 +316,11 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 
 	_starpu_profiling_set_task_push_end_time(task);
 
+ 	/* if(task) */
+	/*   { */
+	/*     printf("task %s pushed with strateg %s\n", task->name, task->sched_ctx->sched_policy->policy_name); */
+	/*   } */
+
         _STARPU_LOG_OUT();
         return ret;
 }

+ 1 - 0
src/core/task.c

@@ -200,6 +200,7 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted)
 
 	if (!do_not_increment_nsubmitted){
 		_starpu_increment_nsubmitted_tasks();
+		_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
 	}
 
 	PTHREAD_MUTEX_LOCK(&j->sync_mutex);

+ 2 - 2
src/core/workers.c

@@ -356,9 +356,9 @@ int starpu_init(struct starpu_conf *user_conf)
 	/* initialize the scheduling policy */
 
 	if(user_conf == NULL)
-	  _starpu_create_sched_ctx(&sched_ctx, NULL, NULL, -1, 1);
+	  _starpu_create_sched_ctx(&sched_ctx, NULL, NULL, -1, 1, "init");
 	else
-	  _starpu_create_sched_ctx(&sched_ctx, user_conf->sched_policy_name, NULL, -1, 1);
+	  _starpu_create_sched_ctx(&sched_ctx, user_conf->sched_policy_name, NULL, -1, 1, "init");
 
 	//_starpu_init_sched_policy(&config, &sched_ctx);
 

+ 3 - 0
src/drivers/cpu/driver_cpu.c

@@ -228,6 +228,8 @@ void *_starpu_cpu_worker(void *arg)
 
 		_starpu_set_current_task(j->task);
 
+		struct starpu_sched_ctx *local_sched_ctx = j->task->sched_ctx;
+
                 res = execute_job_on_cpu(j, cpu_arg, is_parallel_task, rank, perf_arch);
 
 		_starpu_set_current_task(NULL);
@@ -245,6 +247,7 @@ void *_starpu_cpu_worker(void *arg)
 		if (rank == 0){
 			_starpu_handle_job_termination(j, 0);
 			_starpu_decrement_nsubmitted_tasks_of_worker(cpu_arg->workerid);
+			_starpu_decrement_nsubmitted_tasks_of_sched_ctx(local_sched_ctx);
 		}
         }
 

+ 5 - 1
src/drivers/cuda/driver_cuda.c

@@ -178,7 +178,7 @@ static int execute_job_on_cuda(starpu_job_t j, struct starpu_worker_s *args)
 	ret = _starpu_fetch_task_input(task, mask);
 
 	if (ret != 0) {
-		/* there was not enough memory, so the input of
+		/* there was not enough memory, so th input of
 		 * the codelet cannot be fetched ... put the 
 		 * codelet back, and try it later */
 		return -EAGAIN;
@@ -212,6 +212,7 @@ static int execute_job_on_cuda(starpu_job_t j, struct starpu_worker_s *args)
 
 	cl->per_worker_stats[workerid]++;
 
+
 	if ((profiling && profiling_info) || calibrate_model)
 		starpu_clock_gettime(&codelet_end);
 
@@ -325,6 +326,8 @@ void *_starpu_cuda_worker(void *arg)
 
 		_starpu_set_current_task(task);
 
+		struct starpu_sched_ctx *local_sched_ctx = j->task->sched_ctx;
+
 		res = execute_job_on_cuda(j, args);
 
 		_starpu_set_current_task(NULL);
@@ -343,6 +346,7 @@ void *_starpu_cuda_worker(void *arg)
 
 		_starpu_handle_job_termination(j, 0);
 		_starpu_decrement_nsubmitted_tasks_of_worker(args->workerid);
+		_starpu_decrement_nsubmitted_tasks_of_sched_ctx(local_sched_ctx);
 
 	}
 

+ 1 - 1
src/drivers/gordon/driver_gordon.c

@@ -211,7 +211,7 @@ static void gordon_callback_list_func(void *arg)
 		}
 
 		_starpu_push_task_output(j->task, 0);
-		_starpu_handle_job_termination(j, 0);
+		_starpu_handle_job_termination(j, 0, worker->sched_ctx);
 		//starpu_wake_all_blocked_workers();
 
 		task_cnt++;

+ 1 - 0
src/sched_policies/eager_central_policy.c

@@ -64,6 +64,7 @@ static int push_task_eager_policy(struct starpu_task *task, struct starpu_sched_
 		_starpu_increment_nsubmitted_tasks_of_worker(workerid);
 	}
 
+
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 

+ 4 - 1
src/sched_policies/heft.c

@@ -263,8 +263,10 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, struct starp
 
 	/* If there is no prediction available for that task with that arch we
 	 * want to speed-up calibration time so we force this measurement */
-	if (forced_best != -1)
+	if (forced_best != -1){
+		_starpu_increment_nsubmitted_tasks_of_worker(forced_best);
 		return push_task_on_best_worker(task, forced_best, 0.0, prio);
+	}
 
 	/*
 	 *	Determine which worker optimizes the fitness metric which is a
@@ -331,6 +333,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, struct starp
 		model_best = local_task_length[best];
 	}
 
+	_starpu_increment_nsubmitted_tasks_of_worker(best);
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 

+ 2 - 5
tests/cholesky_and_lu/sched.sh

@@ -37,12 +37,9 @@ do
     echo "$ROOTDIR/examples/$BENCH_NAME/$BENCH_NAME $OPTIONS"
 
     val=`$ROOTDIR/examples/$BENCH_NAME/$BENCH_NAME $OPTIONS`
-    if [ "$val" != "" ];
-    then
-	echo "$size $val"
-	echo "$size $val" >> $filename
-    fi
 
+    echo "$size $val"
+    echo "$size $val" >> $filename
 done