ソースを参照

A completion group may unlock either a tag, a task, or the application itself.

Cédric Augonnet 15 年 前
コミット
d057e92735
共有2 個のファイルを変更した94 個の追加63 個の削除を含む
  1. 22 8
      src/core/dependencies/cg.h
  2. 72 55
      src/core/dependencies/tags.c

+ 22 - 8
src/core/dependencies/cg.h

@@ -43,20 +43,34 @@ struct cg_list_s {
 #endif
 };
 
+#define CG_APPS	(1<<0)
+#define CG_TAG	(1<<1)
+#define CG_TASK	(1<<2)
+
 /* Completion Group */
 typedef struct cg_s {
 	unsigned ntags; /* number of tags depended on */
 	unsigned remaining; /* number of remaining tags */
-	struct tag_s *tag; /* which tags depends on that cg ?  */
 
-	unsigned completed;
+	unsigned cg_type; /* CG_APPS or CG_TAG or CG_TASK */
+
+	union {
+		/* CG_TAG */
+		struct tag_s *tag;
+
+		/* CG_TASK */
+		struct job_s *succ_job;
 
-	/* in case this completion group is related to an application, we have
- 	 * to explicitely wake the waiting thread instead of reschedule the
-	 * corresponding task */
-	unsigned used_by_apps;
-	pthread_mutex_t cg_mutex;
-	pthread_cond_t cg_cond;
+		/* CG_APPS */
+		/* in case this completion group is related to an application,
+		 * we have to explicitely wake the waiting thread instead of
+		 * reschedule the corresponding task */
+		struct {
+			unsigned completed;
+			pthread_mutex_t cg_mutex;
+			pthread_cond_t cg_cond;
+		} succ_apps;
+	} succ;
 } cg_t;
 
 #endif // __CG_H__

+ 72 - 55
src/core/dependencies/tags.c

@@ -26,29 +26,34 @@
 static htbl_node_t *tag_htbl = NULL;
 static pthread_rwlock_t tag_global_rwlock = PTHREAD_RWLOCK_INITIALIZER;
 
-static cg_t *create_cg(unsigned ntags, struct tag_s *tag, unsigned is_apps_cg)
+static cg_t *create_cg_apps(unsigned ntags)
 {
-	cg_t *cg;
+	cg_t *cg = malloc(sizeof(cg_t));
+	STARPU_ASSERT(cg);
+
+	cg->ntags = ntags;
+	cg->remaining = ntags;
+	cg->cg_type = CG_APPS;
+
+	cg->succ.succ_apps.completed = 0;
+	pthread_mutex_init(&cg->succ.succ_apps.cg_mutex, NULL);
+	pthread_cond_init(&cg->succ.succ_apps.cg_cond, NULL);
+
+	return cg;
+}
 
-	cg = malloc(sizeof(cg_t));
+
+static cg_t *create_cg_tag(unsigned ntags, struct tag_s *tag)
+{
+	cg_t *cg = malloc(sizeof(cg_t));
 	STARPU_ASSERT(cg);
-	if (cg) {
-		cg->ntags = ntags;
-		cg->remaining = ntags;
-		cg->completed = 0;
-		cg->used_by_apps = is_apps_cg;
 
-		if (is_apps_cg)
-		{
-			pthread_mutex_init(&cg->cg_mutex, NULL);
-			pthread_cond_init(&cg->cg_cond, NULL);
-		}
-		else
-		{
-			cg->tag = tag;
-			tag->tag_successors.ndeps++;
-		}
-	}
+	cg->ntags = ntags;
+	cg->remaining = ntags;
+	cg->cg_type = CG_TAG;
+
+	cg->succ.tag = tag;
+	tag->tag_successors.ndeps++;
 
 	return cg;
 }
@@ -100,12 +105,11 @@ void starpu_tag_remove(starpu_tag_t id)
 		for (succ = 0; succ < nsuccs; succ++)
 		{
 			struct cg_s *cg = tag->tag_successors.succ[succ];
-			unsigned used_by_apps = cg->used_by_apps;
 
 			unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
 			unsigned remaining __attribute__ ((unused)) = STARPU_ATOMIC_ADD(&cg->remaining, -1);
 
-			if (!ntags && !used_by_apps)
+			if (!ntags && (cg->cg_type == CG_TAG))
 				/* Last tag this cg depends on, cg becomes unreferenced */
 				free(cg);
 		}
@@ -175,28 +179,40 @@ static void notify_cg(cg_t *cg)
 	unsigned remaining = STARPU_ATOMIC_ADD(&cg->remaining, -1);
 	if (remaining == 0) {
 		cg->remaining = cg->ntags;
-		/* the group is now completed */
-		if (cg->used_by_apps)
-		{
-			/* this is a cg for an application waiting on a set of
- 			 * tags, wake the thread */
-			pthread_mutex_lock(&cg->cg_mutex);
-			cg->completed = 1;
-			pthread_cond_signal(&cg->cg_cond);
-			pthread_mutex_unlock(&cg->cg_mutex);
-		}
-		else
-		{
-			struct tag_s *tag = cg->tag;
-			struct cg_list_s *tag_successors = &tag->tag_successors;
 
-			tag_successors->ndeps_completed++;
+		struct tag_s *tag;
+		struct cg_list_s *tag_successors;
 
-			if ((tag->state == BLOCKED) &&
-				(tag_successors->ndeps == tag_successors->ndeps_completed)) {
-				tag_successors->ndeps_completed = 0;
-				_starpu_tag_set_ready(tag);
-			}
+		/* the group is now completed */
+		switch (cg->cg_type) {
+			case CG_APPS:
+				/* this is a cg for an application waiting on a set of
+	 			 * tags, wake the thread */
+				pthread_mutex_lock(&cg->succ.succ_apps.cg_mutex);
+				cg->succ.succ_apps.completed = 1;
+				pthread_cond_signal(&cg->succ.succ_apps.cg_cond);
+				pthread_mutex_unlock(&cg->succ.succ_apps.cg_mutex);
+				break;
+
+			case CG_TAG:
+				tag = cg->succ.tag;
+				tag_successors = &tag->tag_successors;
+	
+				tag_successors->ndeps_completed++;
+	
+				if ((tag->state == BLOCKED) &&
+					(tag_successors->ndeps == tag_successors->ndeps_completed)) {
+					tag_successors->ndeps_completed = 0;
+					_starpu_tag_set_ready(tag);
+				}
+				break;
+
+			case CG_TASK:
+				/* TODO */
+				break;
+
+			default:
+				STARPU_ABORT();
 		}
 	}
 }
@@ -250,14 +266,15 @@ static void _starpu_notify_tag_dependencies(struct tag_s *tag)
 	for (succ = 0; succ < nsuccs; succ++)
 	{
 		struct cg_s *cg = tag_successors->succ[succ];
-		unsigned used_by_apps = cg->used_by_apps;
-		struct tag_s *cgtag = cg->tag;
+		struct tag_s *cgtag = cg->succ.tag;
+
+		unsigned cg_type = cg->cg_type;
 
-		if (!used_by_apps)
+		if (cg_type == CG_TAG)
 			starpu_spin_lock(&cgtag->lock);
 
 		notify_cg(cg);
-		if (used_by_apps) {
+		if (cg_type == CG_APPS) {
 			/* Remove the temporary ref to the cg */
 			memmove(&tag_successors->succ[succ], &tag_successors->succ[succ+1], (nsuccs-(succ+1)) * sizeof(tag_successors->succ[succ]));
 			succ--;
@@ -265,7 +282,7 @@ static void _starpu_notify_tag_dependencies(struct tag_s *tag)
 			tag_successors->nsuccs--;
 		}
 
-		if (!used_by_apps)
+		if (cg_type == CG_TAG)
 			starpu_spin_unlock(&cgtag->lock);
 	}
 
@@ -315,7 +332,7 @@ void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t
 
 	starpu_spin_lock(&tag_child->lock);
 
-	cg_t *cg = create_cg(ndeps, tag_child, 0);
+	cg_t *cg = create_cg_tag(ndeps, tag_child);
 
 	STARPU_ASSERT(ndeps != 0);
 	
@@ -344,7 +361,7 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 
 	starpu_spin_lock(&tag_child->lock);
 
-	cg_t *cg = create_cg(ndeps, tag_child, 0);
+	cg_t *cg = create_cg_tag(ndeps, tag_child);
 
 	STARPU_ASSERT(ndeps != 0);
 	
@@ -406,7 +423,7 @@ int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 	}
 	
 	/* there is at least one task that is not finished */
-	cg_t *cg = create_cg(current, NULL, 1);
+	cg_t *cg = create_cg_apps(current);
 
 	for (i = 0; i < current; i++)
 	{
@@ -414,15 +431,15 @@ int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 		starpu_spin_unlock(&tag_array[i]->lock);
 	}
 
-	pthread_mutex_lock(&cg->cg_mutex);
+	pthread_mutex_lock(&cg->succ.succ_apps.cg_mutex);
 
-	if (!cg->completed)
-		pthread_cond_wait(&cg->cg_cond, &cg->cg_mutex);
+	if (!cg->succ.succ_apps.completed)
+		pthread_cond_wait(&cg->succ.succ_apps.cg_cond, &cg->succ.succ_apps.cg_mutex);
 
-	pthread_mutex_unlock(&cg->cg_mutex);
+	pthread_mutex_unlock(&cg->succ.succ_apps.cg_mutex);
 
-	pthread_mutex_destroy(&cg->cg_mutex);
-	pthread_cond_destroy(&cg->cg_cond);
+	pthread_mutex_destroy(&cg->succ.succ_apps.cg_mutex);
+	pthread_cond_destroy(&cg->succ.succ_apps.cg_cond);
 
 	free(cg);