Bladeren bron

The notion of completion group is not necessarily related to tags, we may use
it for tasks as well, so we make both tags and cg independant.

Cédric Augonnet 15 jaren geleden
bovenliggende
commit
5907a3851e
5 gewijzigde bestanden met toevoegingen van 104 en 64 verwijderingen
  1. 1 0
      src/Makefile.am
  2. 62 0
      src/core/dependencies/cg.h
  3. 30 23
      src/core/dependencies/tags.c
  4. 7 39
      src/core/dependencies/tags.h
  5. 4 2
      src/core/jobs.c

+ 1 - 0
src/Makefile.am

@@ -28,6 +28,7 @@ libstarpu_la_LDFLAGS = --no-undefined
 
 noinst_HEADERS = 						\
 	core/dependencies/data-concurrency.h			\
+	core/dependencies/cg.h					\
 	core/dependencies/tags.h				\
 	core/dependencies/htable.h				\
 	core/policies/eager-central-priority-policy.h		\

+ 62 - 0
src/core/dependencies/cg.h

@@ -0,0 +1,62 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __CG_H__
+#define __CG_H__
+
+#include <starpu.h>
+#include <common/config.h>
+
+/* we do not necessarily want to allocate room for 256 dependencies, but we
+   want to handle the few situation where there are a lot of dependencies as
+   well */
+#define DYNAMIC_DEPS_SIZE	1
+
+/* randomly choosen ! */
+#ifndef DYNAMIC_DEPS_SIZE
+#define NMAXDEPS	256
+#endif
+
+/* Completion Group list */
+struct cg_list_s {
+	unsigned nsuccs; /* how many successors ? */
+	unsigned ndeps; /* how many deps ? */
+	unsigned ndeps_completed; /* how many deps are done ? */
+#ifdef DYNAMIC_DEPS_SIZE
+	unsigned succ_list_size;
+	struct cg_s **succ;
+#else
+	struct cg_s *succ[NMAXDEPS];
+#endif
+};
+
+/* Completion Group */
+typedef struct cg_s {
+	unsigned ntags; /* number of tags depended on */
+	unsigned remaining; /* number of remaining tags */
+	struct tag_s *tag; /* which tags depends on that cg ?  */
+
+	unsigned completed;
+
+	/* in case this completion group is related to an application, we have
+ 	 * to explicitely wake the waiting thread instead of reschedule the
+	 * corresponding task */
+	unsigned used_by_apps;
+	pthread_mutex_t cg_mutex;
+	pthread_cond_t cg_cond;
+} cg_t;
+
+#endif // __CG_H__

+ 30 - 23
src/core/dependencies/tags.c

@@ -46,7 +46,7 @@ static cg_t *create_cg(unsigned ntags, struct tag_s *tag, unsigned is_apps_cg)
 		else
 		{
 			cg->tag = tag;
-			tag->ndeps++;
+			tag->tag_successors.ndeps++;
 		}
 	}
 
@@ -65,14 +65,15 @@ static struct tag_s *_starpu_tag_init(starpu_tag_t id)
 
 	tag->id = id;
 	tag->state = INVALID_STATE;
-	tag->nsuccs = 0;
-	tag->ndeps = 0;
-	tag->ndeps_completed = 0;
+	tag->tag_successors.nsuccs = 0;
+	tag->tag_successors.ndeps = 0;
+	tag->tag_successors.ndeps_completed = 0;
 
 #ifdef DYNAMIC_DEPS_SIZE
 	/* this is a small initial default value ... may be changed */
-	tag->succ_list_size = 4;
-	tag->succ = realloc(NULL, tag->succ_list_size*sizeof(struct cg_s *));
+	tag->tag_successors.succ_list_size = 4;
+	tag->tag_successors.succ =
+		realloc(NULL, tag->tag_successors.succ_list_size*sizeof(struct cg_s *));
 #endif
 
 	starpu_spin_init(&tag->lock);
@@ -93,12 +94,12 @@ void starpu_tag_remove(starpu_tag_t id)
 	if (tag) {
 		starpu_spin_lock(&tag->lock);
 
-		unsigned nsuccs = tag->nsuccs;
+		unsigned nsuccs = tag->tag_successors.nsuccs;
 		unsigned succ;
 
 		for (succ = 0; succ < nsuccs; succ++)
 		{
-			struct cg_s *cg = tag->succ[succ];
+			struct cg_s *cg = tag->tag_successors.succ[succ];
 			unsigned used_by_apps = cg->used_by_apps;
 
 			unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
@@ -110,7 +111,7 @@ void starpu_tag_remove(starpu_tag_t id)
 		}
 
 #ifdef DYNAMIC_DEPS_SIZE
-		free(tag->succ);
+		free(tag->tag_successors.succ);
 #endif
 
 		starpu_spin_unlock(&tag->lock);
@@ -187,11 +188,13 @@ static void notify_cg(cg_t *cg)
 		else
 		{
 			struct tag_s *tag = cg->tag;
-			tag->ndeps_completed++;
+			struct cg_list_s *tag_successors = &tag->tag_successors;
 
-			if ((tag->state == BLOCKED) 
-				&& (tag->ndeps == tag->ndeps_completed)) {
-				tag->ndeps_completed = 0;
+			tag_successors->ndeps_completed++;
+
+			if ((tag->state == BLOCKED) &&
+				(tag_successors->ndeps == tag_successors->ndeps_completed)) {
+				tag_successors->ndeps_completed = 0;
 				_starpu_tag_set_ready(tag);
 			}
 		}
@@ -203,23 +206,25 @@ static void _starpu_tag_add_succ(struct tag_s *tag, cg_t *cg)
 {
 	STARPU_ASSERT(tag);
 
+	struct cg_list_s *tag_successors = &tag->tag_successors;
+
 	/* where should that cg should be put in the array ? */
-	unsigned index = STARPU_ATOMIC_ADD(&tag->nsuccs, 1) - 1;
+	unsigned index = STARPU_ATOMIC_ADD(&tag_successors->nsuccs, 1) - 1;
 
 #ifdef DYNAMIC_DEPS_SIZE
-	if (index >= tag->succ_list_size)
+	if (index >= tag_successors->succ_list_size)
 	{
 		/* the successor list is too small */
-		tag->succ_list_size *= 2;
+		tag_successors->succ_list_size *= 2;
 
 		/* NB: this is thread safe as the tag->lock is taken */
-		tag->succ = realloc(tag->succ, 
-			tag->succ_list_size*sizeof(struct cg_s *));
+		tag_successors->succ = realloc(tag_successors->succ, 
+			tag_successors->succ_list_size*sizeof(struct cg_s *));
 	}
 #else
 	STARPU_ASSERT(index < NMAXDEPS);
 #endif
-	tag->succ[index] = cg;
+	tag_successors->succ[index] = cg;
 
 	if (tag->state == DONE) {
 		/* the tag was already completed sooner */
@@ -232,17 +237,19 @@ static void _starpu_notify_tag_dependencies(struct tag_s *tag)
 	unsigned nsuccs;
 	unsigned succ;
 
+	struct cg_list_s *tag_successors = &tag->tag_successors;
+
 	starpu_spin_lock(&tag->lock);
 
 	tag->state = DONE;
 
 	TRACE_TASK_DONE(tag);
 
-	nsuccs = tag->nsuccs;
+	nsuccs = tag_successors->nsuccs;
 
 	for (succ = 0; succ < nsuccs; succ++)
 	{
-		struct cg_s *cg = tag->succ[succ];
+		struct cg_s *cg = tag_successors->succ[succ];
 		unsigned used_by_apps = cg->used_by_apps;
 		struct tag_s *cgtag = cg->tag;
 
@@ -252,10 +259,10 @@ static void _starpu_notify_tag_dependencies(struct tag_s *tag)
 		notify_cg(cg);
 		if (used_by_apps) {
 			/* Remove the temporary ref to the cg */
-			memmove(&tag->succ[succ], &tag->succ[succ+1], (nsuccs-(succ+1)) * sizeof(tag->succ[succ]));
+			memmove(&tag_successors->succ[succ], &tag_successors->succ[succ+1], (nsuccs-(succ+1)) * sizeof(tag_successors->succ[succ]));
 			succ--;
 			nsuccs--;
-			tag->nsuccs--;
+			tag_successors->nsuccs--;
 		}
 
 		if (!used_by_apps)

+ 7 - 39
src/core/dependencies/tags.h

@@ -1,6 +1,6 @@
 /*
  * StarPU
- * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ * Copyright (C) INRIA 2008-2010 (see AUTHORS file)
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -17,20 +17,10 @@
 #ifndef __TAGS_H__
 #define __TAGS_H__
 
-#include <stdint.h>
-#include <pthread.h>
-#include <core/jobs.h>
+#include <starpu.h>
+#include <common/config.h>
 #include <common/starpu-spinlock.h>
-
-/* we do not necessarily want to allocate room for 256 dependencies, but we
-   want to handle the few situation where there are a lot of dependencies as
-   well */
-#define DYNAMIC_DEPS_SIZE	1
-
-/* randomly choosen ! */
-#ifndef DYNAMIC_DEPS_SIZE
-#define NMAXDEPS	256
-#endif
+#include <core/dependencies/cg.h>
 
 #define TAG_SIZE        (sizeof(starpu_tag_t)*8)
 
@@ -57,37 +47,15 @@ struct tag_s {
 	starpu_spinlock_t lock;
 	starpu_tag_t id; /* an identifier for the task */
 	tag_state state;
-	unsigned nsuccs; /* how many successors ? */
-	unsigned ndeps; /* how many deps ? */
-	unsigned ndeps_completed; /* how many deps are done ? */
-#ifdef DYNAMIC_DEPS_SIZE
-	unsigned succ_list_size;
-	struct cg_s **succ;
-#else
-	struct cg_s *succ[NMAXDEPS];
-#endif
+
+	struct cg_list_s tag_successors;
+
 	struct job_s *job; /* which job is associated to the tag if any ? */
 
 	unsigned is_assigned;
 	unsigned is_submitted;
 };
 
-/* Completion Group */
-typedef struct cg_s {
-	unsigned ntags; /* number of tags depended on */
-	unsigned remaining; /* number of remaining tags */
-	struct tag_s *tag; /* which tags depends on that cg ?  */
-
-	unsigned completed;
-
-	/* in case this completion group is related to an application, we have
- 	 * to explicitely wake the waiting thread instead of reschedule the
-	 * corresponding task */
-	unsigned used_by_apps;
-	pthread_mutex_t cg_mutex;
-	pthread_cond_t cg_cond;
-} cg_t;
-
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...);
 
 void _starpu_notify_dependencies(struct job_s *j);

+ 4 - 2
src/core/jobs.c

@@ -134,9 +134,11 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(job_t j)
 
 	struct tag_s *tag = j->tag;
 
+	struct cg_list_s *tag_successors = &tag->tag_successors;
+
 	starpu_spin_lock(&tag->lock);
 
-	if (tag->ndeps != tag->ndeps_completed)
+	if (tag_successors->ndeps != tag_successors->ndeps_completed)
 	{
 		tag->state = BLOCKED;
 		ret = 1;
@@ -145,7 +147,7 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(job_t j)
 		/* existing deps (if any) are fulfilled */
 		tag->state = READY;
 		/* already prepare for next run */
-		tag->ndeps_completed = 0;
+		tag_successors->ndeps_completed = 0;
 		ret = 0;
 	}