Sfoglia il codice sorgente

Continue to cleanup the handling of tasks dependencies, there are still issues.

Cédric Augonnet 16 anni fa
parent
commit
54b4d86198
3 ha cambiato i file con 60 aggiunte e 45 eliminazioni
  1. 23 20
      src/core/dependencies/tags.c
  2. 15 3
      src/core/dependencies/tags.h
  3. 22 22
      src/core/jobs.c

+ 23 - 20
src/core/dependencies/tags.c

@@ -47,6 +47,7 @@ static cg_t *create_cg(unsigned ntags, struct tag_s *tag, unsigned is_apps_cg)
 		else
 		{
 			cg->tag = tag;
+			tag->ndeps++;
 		}
 	}
 
@@ -64,8 +65,10 @@ static struct tag_s *tag_init(starpu_tag_t id)
 	tag->is_submitted = 0;
 
 	tag->id = id;
-	tag->state = READY;
+	tag->state = INVALID_STATE;
 	tag->nsuccs = 0;
+	tag->ndeps = 0;
+	tag->ndeps_completed = 0;
 
 #ifdef DYNAMIC_DEPS_SIZE
 	/* this is a small initial default value ... may be changed */
@@ -130,17 +133,14 @@ static void tag_set_ready(struct tag_s *tag)
 
 //	release_mutex(&tag->lock);
 
-	/* perhaps the corresponding task was not declared yet */
-	if (tag->is_assigned)
-	{
 #ifdef NO_DATA_RW_LOCK
-		/* enforce data dependencies */
-		if (submit_job_enforce_data_deps(j))
-			return;
+	/* enforce data dependencies */
+	if (submit_job_enforce_data_deps(j))
+		return;
 #endif
-	
-		push_task(j);
-	}
+
+	push_task(j);
+//	}
 }
 
 static void notify_cg(cg_t *cg)
@@ -161,9 +161,14 @@ static void notify_cg(cg_t *cg)
 		else
 		{
 //			take_mutex(&cg->tag->lock);
-			if (cg->tag->is_submitted)
+			struct tag_s *tag = cg->tag;
+			tag->ndeps_completed++;
+
+			if ((tag->state == BLOCKED) 
+				&& (tag->ndeps == tag->ndeps_completed))
 				tag_set_ready(cg->tag);
 //			release_mutex(&cg->tag->lock);
+
 			free(cg);
 		}
 	}
@@ -195,7 +200,6 @@ static void tag_add_succ(struct tag_s *tag, cg_t *cg)
 #else
 		STARPU_ASSERT(index < NMAXDEPS);
 #endif
-
 		tag->succ[index] = cg;
 	}
 
@@ -221,7 +225,6 @@ void notify_dependencies(struct job_s *j)
 
 		nsuccs = tag->nsuccs;
 
-
 		for (succ = 0; succ < nsuccs; succ++)
 		{
 			notify_cg(tag->succ[succ]);
@@ -241,6 +244,9 @@ void tag_declare(starpu_tag_t id, struct job_s *job)
 	tag->is_assigned = 1;
 	
 	job->tag = tag;
+
+	/* the tag is now associated to a job */
+	tag->state = ASSOCIATED;
 }
 
 void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t *array)
@@ -253,9 +259,7 @@ void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t
 	take_mutex(&tag_child->lock);
 
 	cg_t *cg = create_cg(ndeps, tag_child, 0);
-	
-	tag_child->state = BLOCKED;
-	
+
 	STARPU_ASSERT(ndeps != 0);
 	
 	for (i = 0; i < ndeps; i++)
@@ -284,10 +288,9 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 	take_mutex(&tag_child->lock);
 
 	cg_t *cg = create_cg(ndeps, tag_child, 0);
-	
-	tag_child->state = BLOCKED;
-	
+
 	STARPU_ASSERT(ndeps != 0);
+
 	
 	va_list pa;
 	va_start(pa, ndeps);
@@ -295,7 +298,7 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 	{
 		starpu_tag_t dep_id;
 		dep_id = va_arg(pa, starpu_tag_t);
-		
+	
 		/* id depends on dep_id
 		 * so cg should be among dep_id's successors*/
 		TRACE_CODELET_TAG_DEPS(id, dep_id);

+ 15 - 3
src/core/dependencies/tags.h

@@ -34,10 +34,20 @@
 #define TAG_SIZE        (sizeof(starpu_tag_t)*8)
 
 typedef enum {
-	DONE,
+	/* this tag is not declared by any task */
+	INVALID_STATE,
+	/* tag_declare was called to associate the tag to a task */
+	ASSOCIATED,
+	/* some task dependencies are not fulfilled yet */
+	BLOCKED,
+	/* the task can be (or has been) submitted to the scheduler (all deps
+ 	 * fulfilled) */
 	READY,
-	SCHEDULED,
-	BLOCKED
+// useless ...
+//	/* the task has been submitted to the scheduler */
+//	SCHEDULED,
+	/* the task has been performed */
+	DONE
 } tag_state;
 
 struct job_s;
@@ -47,6 +57,8 @@ struct tag_s {
 	starpu_tag_t id; /* an identifier for the task */
 	tag_state state;
 	unsigned nsuccs; /* how many successors ? */
+	unsigned ndeps; /* how many deps ? */
+	unsigned ndeps_completed; /* how many deps are done ? */
 #ifdef DYNAMIC_DEPS_SIZE
 	unsigned succ_list_size;
 	struct _cg_t **succ;

+ 22 - 22
src/core/jobs.c

@@ -151,37 +151,35 @@ static void block_sync_task(job_t j)
 
 /* This function is called when a new task is submitted to StarPU 
  * it returns 1 if the task deps are not fulfilled, 0 otherwise */
-static unsigned enforce_task_deps(job_t j)
+static unsigned not_all_task_deps_are_fulfilled(job_t j)
 {
-	if (j->task->use_tag)
+	if (!j->task->use_tag)
 	{
-		unsigned deps_are_not_fulfilled;
-		struct tag_s *tag = j->tag;
-		deps_are_not_fulfilled = (tag->state == BLOCKED);
-
-		return deps_are_not_fulfilled;
+		/* this task does not use tags, so we can go on */
+		return 0;
 	}
-	else
+
+	struct tag_s *tag = j->tag;
+
+	if (tag->ndeps != tag->ndeps_completed)
 	{
-		/* this task does not use tags, so we can go on */
+		tag->state = BLOCKED;
+		return 1;
+	}
+	else {
+		/* existing deps (if any) are fulfilled */
+		tag->state = READY;
 		return 0;
 	}
 }
 
-static enforce_deps_and_schedule(job_t j)
+static unsigned enforce_deps_and_schedule(job_t j)
 {
-	/* enfore task dependencies */
-	if (task->use_tag)
-	{
-		if (enforce_task_deps(j))
-		{
-			j->tag->is_submitted = 1;
-			return 0;
-		}
-		
-		j->tag->is_submitted = 1;
+	unsigned ret;
 
-	}
+	/* enfore task dependencies */
+	if (not_all_task_deps_are_fulfilled(j))
+		return 0;
 
 #ifdef NO_DATA_RW_LOCK
 	/* enforce data dependencies */
@@ -190,6 +188,8 @@ static enforce_deps_and_schedule(job_t j)
 #endif
 
 	ret = push_task(j);
+
+	return ret;
 }
 
 /* application should submit new tasks to StarPU through this function */
@@ -207,7 +207,7 @@ int starpu_submit_task(struct starpu_task *task)
  	* task structure */
 	job_t j = job_create(task);
 
-	enforce_deps_and_schedule(j);
+	ret = enforce_deps_and_schedule(j);
 
 	if (is_sync)
 		block_sync_task(j);