Browse Source

When enforcing the task dependencies, we first make sure that the task was
actually submitted by the application before scheduling it.

Cédric Augonnet 15 years ago
parent
commit
d32dd5c173
4 changed files with 12 additions and 11 deletions
  1. 6 5
      src/core/dependencies/cg.c
  2. 3 6
      src/core/jobs.c
  3. 1 0
      src/core/jobs.h
  4. 2 0
      src/core/task.c

+ 6 - 5
src/core/dependencies/cg.c

@@ -62,6 +62,7 @@ void _starpu_notify_cg(starpu_cg_t *cg)
 {
 	STARPU_ASSERT(cg);
 	unsigned remaining = STARPU_ATOMIC_ADD(&cg->remaining, -1);
+
 	if (remaining == 0) {
 		cg->remaining = cg->ntags;
 
@@ -85,7 +86,7 @@ void _starpu_notify_cg(starpu_cg_t *cg)
 				tag_successors = &tag->tag_successors;
 	
 				tag_successors->ndeps_completed++;
-	
+
 				if ((tag->state == STARPU_BLOCKED) &&
 					(tag_successors->ndeps == tag_successors->ndeps_completed)) {
 					/* reset the counter so that we can reuse the completion group */
@@ -95,7 +96,6 @@ void _starpu_notify_cg(starpu_cg_t *cg)
 				break;
 
 			case STARPU_CG_TASK:
-				/* TODO */
 				j = cg->succ.job;
 
 				job_successors = &j->job_successors;
@@ -105,9 +105,10 @@ void _starpu_notify_cg(starpu_cg_t *cg)
 
 				if (job_successors->ndeps == ndeps_completed)
 				{
-					/* reset the counter so that we can reuse the completion group */
-					job_successors->ndeps_completed = 0;
-					_starpu_enforce_deps_starting_from_data(j);
+					/* Note that this also ensures that tag deps are
+					 * fulfilled. This counter is reseted only when the
+					 * dependencies are are all fulfilled) */
+					_starpu_enforce_deps_and_schedule(j);
 				}
 
 				break;

+ 3 - 6
src/core/jobs.c

@@ -49,6 +49,7 @@ starpu_job_t __attribute__((malloc)) _starpu_job_create(struct starpu_task *task
 
 	job->predicted = 0.0;
 	job->footprint_is_computed = 0;
+	job->submitted = 0;
 	job->terminated = 0;
 
 	_starpu_cg_list_init(&job->job_successors);
@@ -76,8 +77,6 @@ void _starpu_wait_job(starpu_job_t j)
 	j->terminated = 0;
 
 	pthread_mutex_unlock(&j->sync_mutex);
-
-	//starpu_job_delete(j);
 }
 
 void _starpu_handle_job_termination(starpu_job_t j)
@@ -88,6 +87,7 @@ void _starpu_handle_job_termination(starpu_job_t j)
 		fprintf(stderr, "OOPS ... job %p was already terminated !!\n", j);
 
 	/* in case there are dependencies, wake up the proper tasks */
+	j->submitted = 0;
 	_starpu_notify_dependencies(j);
 
 	/* the callback is executed after the dependencies so that we may remove the tag 
@@ -124,9 +124,6 @@ void _starpu_handle_job_termination(starpu_job_t j)
 	else {
 		/* no one is going to synchronize with that task so we release
  		 * the data structures now */
-		if (!regenerate)
-			task->starpu_private = NULL;
-
 		if (destroy)
 		{
 			starpu_job_delete(j);
@@ -188,7 +185,7 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(starpu_job_t j)
 	struct starpu_cg_list_s *job_successors = &j->job_successors;
 
 #warning TODO use locks !
-	if (job_successors->ndeps != job_successors->ndeps_completed)
+	if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
 	{
 		ret = 1;
 	}

+ 1 - 0
src/core/jobs.h

@@ -66,6 +66,7 @@ LIST_TYPE(starpu_job,
 	unsigned footprint_is_computed;
 	uint32_t footprint;
 
+	unsigned submitted;
 	unsigned terminated;
 );
 

+ 2 - 0
src/core/task.c

@@ -125,6 +125,8 @@ int _starpu_submit_job(starpu_job_t j)
 {
 	_starpu_increment_nsubmitted_tasks();
 
+	j->submitted = 1;
+
 	return _starpu_enforce_deps_and_schedule(j);
 }