Browse Source

Applications may wait until a set of tags are completed using the
starpu_tag_wait_array function.

Cédric Augonnet 16 years ago
parent
commit
3215a6d117
3 changed files with 113 additions and 55 deletions
  1. 2 0
      include/starpu-task.h
  2. 102 50
      src/core/dependencies/tags.c
  3. 9 5
      src/core/dependencies/tags.h

+ 2 - 0
include/starpu-task.h

@@ -139,7 +139,9 @@ void starpu_load_cuda_function(int devid, struct starpu_cuda_function_s *functio
 void starpu_tag_remove(starpu_tag_t id);
 void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t *array);
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...);
+
 void starpu_tag_wait(starpu_tag_t id);
+void starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id);
 
 struct starpu_task *starpu_task_create(void);
 int starpu_submit_task(struct starpu_task *task);

+ 102 - 50
src/core/dependencies/tags.c

@@ -28,7 +28,7 @@ static starpu_mutex tag_mutex = {
 	.taken = 0
 };
 
-static cg_t *create_cg(unsigned ntags, struct tag_s *tag)
+static cg_t *create_cg(unsigned ntags, struct tag_s *tag, unsigned is_apps_cg)
 {
 	cg_t *cg;
 
@@ -36,7 +36,18 @@ static cg_t *create_cg(unsigned ntags, struct tag_s *tag)
 	STARPU_ASSERT(cg);
 	if (cg) {
 		cg->ntags = ntags;
-		cg->tag = tag;
+		cg->completed = 0;
+		cg->used_by_apps = is_apps_cg;
+
+		if (is_apps_cg)
+		{
+			pthread_mutex_init(&cg->cg_mutex, NULL);
+			pthread_cond_init(&cg->cg_cond, NULL);
+		}
+		else
+		{
+			cg->tag = tag;
+		}
 	}
 
 	return cg;
@@ -63,10 +74,6 @@ static struct tag_s *tag_init(starpu_tag_t id)
 
 	init_mutex(&tag->lock);
 
-	/* initializing a mutex and a cond variable is a little expensive, so
- 	 * we don't initialize them until they are needed */
-	tag->someone_is_waiting = 0;
-
 	return tag;
 }
 
@@ -82,13 +89,6 @@ void starpu_tag_remove(starpu_tag_t id)
 		free(tag->succ);
 #endif
 
-	/* the condition variable is only allocated if somebody starts waiting */
-	if (tag && tag->someone_is_waiting) 
-	{
-		pthread_cond_destroy(&tag->finished_cond);
-		pthread_mutex_destroy(&tag->finished_mutex);
-	}
-
 	release_mutex(&tag_mutex);
 
 	free(tag);
@@ -117,16 +117,17 @@ static struct tag_s *gettag_struct(starpu_tag_t id)
 	return tag;
 }
 
+/* lock should be taken */
 static void tag_set_ready(struct tag_s *tag)
 {
-	take_mutex(&tag->lock);
+//	take_mutex(&tag->lock);
 
 	/* mark this tag as ready to run */
 	tag->state = READY;
 	/* declare it to the scheduler ! */
 	struct job_s *j = tag->job;
 
-	release_mutex(&tag->lock);
+//	release_mutex(&tag->lock);
 
 	/* perhaps the corresponding task was not declared yet */
 	if (tag->is_assigned)
@@ -147,19 +148,30 @@ static void notify_cg(cg_t *cg)
 	unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
 	if (ntags == 0) {
 		/* the group is now completed */
-		tag_set_ready(cg->tag);
-		free(cg);
+		if (cg->used_by_apps)
+		{
+			/* this is a cg for an application waiting on a set of
+ 			 * tags, wake the thread */
+			pthread_mutex_lock(&cg->cg_mutex);
+			cg->completed = 1;
+			pthread_cond_signal(&cg->cg_cond);
+			pthread_mutex_unlock(&cg->cg_mutex);
+		}
+		else
+		{
+//			take_mutex(&cg->tag->lock);
+			tag_set_ready(cg->tag);
+//			release_mutex(&cg->tag->lock);
+			free(cg);
+		}
 	}
 }
 
-static void tag_add_succ(starpu_tag_t id, cg_t *cg)
+/* the lock must be taken ! */
+static void tag_add_succ(struct tag_s *tag, cg_t *cg)
 {
-	/* find out the associated structure */
-	struct tag_s *tag = gettag_struct(id);
 	STARPU_ASSERT(tag);
 
-	take_mutex(&tag->lock);
-
 	if (tag->state == DONE) {
 		/* the tag was already completed sooner */
 		notify_cg(cg);
@@ -207,20 +219,13 @@ void notify_dependencies(struct job_s *j)
 
 		nsuccs = tag->nsuccs;
 
-		release_mutex(&tag->lock);
 
 		for (succ = 0; succ < nsuccs; succ++)
 		{
 			notify_cg(tag->succ[succ]);
 		}
 
-		/* the application may be waiting on this tag to finish */
-		if (tag->someone_is_waiting)
-		{
-			pthread_mutex_lock(&tag->finished_mutex);
-			pthread_cond_broadcast(&tag->finished_cond);
-			pthread_mutex_unlock(&tag->finished_mutex);
-		}
+		release_mutex(&tag->lock);
 	}
 }
 
@@ -242,7 +247,10 @@ void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t
 
 	/* create the associated completion group */
 	struct tag_s *tag_child = gettag_struct(id);
-	cg_t *cg = create_cg(ndeps, tag_child);
+
+	take_mutex(&tag_child->lock);
+
+	cg_t *cg = create_cg(ndeps, tag_child, 0);
 	
 	tag_child->state = BLOCKED;
 	
@@ -255,8 +263,13 @@ void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t
 		/* id depends on dep_id
 		 * so cg should be among dep_id's successors*/
 		TRACE_CODELET_TAG_DEPS(id, dep_id);
-		tag_add_succ(dep_id, cg);
+		struct tag_s *tag_dep = gettag_struct(dep_id);
+		take_mutex(&tag_dep->lock);
+		tag_add_succ(tag_dep, cg);
+		release_mutex(&tag_dep->lock);
 	}
+
+	release_mutex(&tag_child->lock);
 }
 
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
@@ -265,7 +278,10 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 	
 	/* create the associated completion group */
 	struct tag_s *tag_child = gettag_struct(id);
-	cg_t *cg = create_cg(ndeps, tag_child);
+
+	take_mutex(&tag_child->lock);
+
+	cg_t *cg = create_cg(ndeps, tag_child, 0);
 	
 	tag_child->state = BLOCKED;
 	
@@ -281,38 +297,74 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 		/* id depends on dep_id
 		 * so cg should be among dep_id's successors*/
 		TRACE_CODELET_TAG_DEPS(id, dep_id);
-		tag_add_succ(dep_id, cg);
+		struct tag_s *tag_dep = gettag_struct(dep_id);
+		take_mutex(&tag_dep->lock);
+		tag_add_succ(tag_dep, cg);
+		release_mutex(&tag_dep->lock);
 	}
 	va_end(pa);
+
+	release_mutex(&tag_child->lock);
 }
 
 /* this function may be called by the application (outside callbacks !) */
-void starpu_tag_wait(starpu_tag_t id)
+void starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 {
-	struct tag_s *tag = gettag_struct(id);
+	unsigned i;
+	unsigned current;
 
-	take_mutex(&tag->lock);
+	struct tag_s *tag_array[ntags];
 
-	if (tag->state == DONE)
+	/* only wait the tags that are not done yet */
+	for (i = 0, current = 0; i < ntags; i++)
 	{
-		/* the corresponding task is already finished */
-		release_mutex(&tag->lock);
+		struct tag_s *tag = gettag_struct(id[i]);
+		
+		take_mutex(&tag->lock);
+
+		if (tag->state == DONE)
+		{
+			/* that tag is done already */
+			release_mutex(&tag->lock);
+		}
+		else
+		{
+			tag_array[current] = tag;
+			current++;
+		}
+	}
+
+	if (current == 0)
+	{
+		/* all deps are already fulfilled */
 		return;
-	} 
+	}
+	
+	/* there is at least one task that is not finished */
+	cg_t *cg = create_cg(current, NULL, 1);
 
-	if (!tag->someone_is_waiting)
+	for (i = 0; i < current; i++)
 	{
-		/* condition variable is not allocated yet */
-		tag->someone_is_waiting = 1;
-		pthread_mutex_init(&tag->finished_mutex, NULL);
-		pthread_cond_init(&tag->finished_cond, NULL);
+		tag_add_succ(tag_array[i], cg);
+		release_mutex(&tag_array[i]->lock);
 	}
 
-	release_mutex(&tag->lock);
+	pthread_mutex_lock(&cg->cg_mutex);
+
+	if (!cg->completed)
+		pthread_cond_wait(&cg->cg_cond, &cg->cg_mutex);
 
-	pthread_mutex_lock(&tag->finished_mutex);
-	pthread_cond_wait(&tag->finished_cond, &tag->finished_mutex);
-	pthread_mutex_unlock(&tag->finished_mutex);
+	pthread_mutex_unlock(&cg->cg_mutex);
+
+	pthread_mutex_destroy(&cg->cg_mutex);
+	pthread_cond_destroy(&cg->cg_cond);
+
+	free(cg);
+}
+
+void starpu_tag_wait(starpu_tag_t id)
+{
+	starpu_tag_wait_array(1, &id);
 }
 
 /* This function is called when a new task is submitted to StarPU 

+ 9 - 5
src/core/dependencies/tags.h

@@ -56,16 +56,20 @@ struct tag_s {
 	struct job_s *job; /* which job is associated to the tag if any ? */
 
 	unsigned is_assigned;
-	
-	/* the application may wait on a tag to finish */
-	unsigned someone_is_waiting;
-	pthread_mutex_t finished_mutex;
-	pthread_cond_t finished_cond;
 };
 
 typedef struct _cg_t {
 	unsigned ntags; /* number of remaining tags */
 	struct tag_s *tag; /* which tags depends on that cg ?  */
+
+	unsigned completed;
+
+	/* in case this completion group is related to an application, we have
+ 	 * to explicitely wake the waiting thread instead of reschedule the
+	 * corresponding task */
+	unsigned used_by_apps;
+	pthread_mutex_t cg_mutex;
+	pthread_cond_t cg_cond;
 } cg_t;
 
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...);