瀏覽代碼

Provide the starpu_tag_wait: applications may wait after a tag instead of
synchronizing by the means of callbacks and semaphores.

Cédric Augonnet 16 年之前
父節點
當前提交
b465115925
共有 3 個文件被更改,包括 56 次插入3 次删除
  1. 1 0
      include/starpu-task.h
  2. 49 1
      src/core/dependencies/tags.c
  3. 6 2
      src/core/dependencies/tags.h

+ 1 - 0
include/starpu-task.h

@@ -136,6 +136,7 @@ void starpu_load_cuda_function(int devid, struct starpu_cuda_function_s *functio
 void starpu_tag_remove(starpu_tag_t id);
 void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t *array);
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...);
+void starpu_tag_wait(starpu_tag_t id);
 
 struct starpu_task *starpu_task_create(void);
 int starpu_submit_task(struct starpu_task *task);

+ 49 - 1
src/core/dependencies/tags.c

@@ -62,6 +62,10 @@ static struct tag_s *tag_init(starpu_tag_t id)
 
 	tag->job = NULL;
 
+	/* initializing a mutex and a cond variable is a little expensive, so
+ 	 * we don't initialize them until they are needed */
+	tag->someone_is_waiting = 0;
+
 	return tag;
 }
 
@@ -77,6 +81,13 @@ void starpu_tag_remove(starpu_tag_t id)
 		free(tag->succ);
 #endif
 
+	/* the condition variable is only allocated if somebody starts waiting */
+	if (tag && tag->someone_is_waiting) 
+	{
+		pthread_cond_destroy(&tag->finished_cond);
+		pthread_mutex_destroy(&tag->finished_mutex);
+	}
+
 	release_mutex(&tag_mutex);
 
 	free(tag);
@@ -105,7 +116,7 @@ struct tag_s *gettag_struct(starpu_tag_t id)
 	return tag;
 }
 
-void notify_cg(cg_t *cg)
+static void notify_cg(cg_t *cg)
 {
 
 	STARPU_ASSERT(cg);
@@ -173,6 +184,14 @@ void notify_dependencies(struct job_s *j)
 		{
 			notify_cg(tag->succ[succ]);
 		}
+
+		/* the application may be waiting on this tag to finish */
+		if (tag->someone_is_waiting)
+		{
+			pthread_mutex_lock(&tag->finished_mutex);
+			pthread_cond_broadcast(&tag->finished_cond);
+			pthread_mutex_unlock(&tag->finished_mutex);
+		}
 	}
 }
 
@@ -237,6 +256,35 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 	va_end(pa);
 }
 
+/* this function may be called by the application (outside callbacks !) */
+void starpu_tag_wait(starpu_tag_t id)
+{
+	struct tag_s *tag = gettag_struct(id);
+
+	take_mutex(&tag->lock);
+
+	if (tag->state == DONE)
+	{
+		/* the corresponding task is already finished */
+		release_mutex(&tag->lock);
+		return;
+	} 
+
+	if (!tag->someone_is_waiting)
+	{
+		/* condition variable is not allocated yet */
+		tag->someone_is_waiting = 1;
+		pthread_mutex_init(&tag->finished_mutex, NULL);
+		pthread_cond_init(&tag->finished_cond, NULL);
+	}
+
+	release_mutex(&tag->lock);
+
+	pthread_mutex_lock(&tag->finished_mutex);
+	pthread_cond_wait(&tag->finished_cond, &tag->finished_mutex);
+	pthread_mutex_unlock(&tag->finished_mutex);
+}
+
 void tag_set_ready(struct tag_s *tag)
 {
 	/* mark this tag as ready to run */

+ 6 - 2
src/core/dependencies/tags.h

@@ -44,7 +44,7 @@ typedef enum {
 struct job_s;
 
 struct tag_s {
-	starpu_mutex lock; /* do we really need that ? */
+	starpu_mutex lock;
 	starpu_tag_t id; /* an identifier for the task */
 	tag_state state;
 	unsigned nsuccs; /* how many successors ? */
@@ -55,6 +55,11 @@ struct tag_s {
 	struct _cg_t *succ[NMAXDEPS];
 #endif
 	struct job_s *job; /* which job is associated to the tag if any ? */
+	
+	/* the application may wait on a tag to finish */
+	unsigned someone_is_waiting;
+	pthread_mutex_t finished_mutex;
+	pthread_cond_t finished_cond;
 };
 
 typedef struct _cg_t {
@@ -62,7 +67,6 @@ typedef struct _cg_t {
 	struct tag_s *tag; /* which tags depends on that cg ?  */
 } cg_t;
 
-void notify_cg(cg_t *cg);
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...);
 
 cg_t *create_cg(unsigned ntags, struct tag_s *tag);