Browse Source

Implement starpu_task_declare_deps_array to have actual dependencies not only
between tags but also between tasks as well.
A list of completion group is added to the job structure which describes the
task deps: this means that the job structure should be allocated as soon as
dependencies are expressed (by the means of starpu_task_declare_deps_array).
This job structure is not allocated during task submission anymore, but is
created in a lazy fashion instead.

Cédric Augonnet 15 years ago
parent
commit
d908f0a3b3

+ 3 - 0
include/starpu-task.h

@@ -165,6 +165,9 @@ struct starpu_task {
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...);
 void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t *array);
 
+/* task depends on the tasks in task array */
+void starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[]);
+
 int starpu_tag_wait(starpu_tag_t id);
 int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id);
 

+ 1 - 0
src/Makefile.am

@@ -94,6 +94,7 @@ libstarpu_la_SOURCES = 						\
 	core/progress_hook.c					\
 	core/dependencies/cg.c					\
 	core/dependencies/tags.c				\
+	core/dependencies/task-deps.c				\
 	core/dependencies/htable.c				\
 	core/dependencies/data-concurrency.c			\
 	core/mechanisms/queues.c				\

+ 52 - 3
src/core/dependencies/cg.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <core/jobs.h>
 #include <core/dependencies/cg.h>
 #include <core/dependencies/tags.h>
 
@@ -27,7 +28,7 @@ void _starpu_cg_list_init(struct cg_list_s *list)
 
 #ifdef DYNAMIC_DEPS_SIZE
 	/* this is a small initial default value ... may be changed */
-	list->succ_list_size = 4;
+	list->succ_list_size = 0;
 	list->succ =
 		realloc(NULL, list->succ_list_size*sizeof(struct cg_s *));
 #endif
@@ -42,7 +43,10 @@ void _starpu_add_successor_to_cg_list(struct cg_list_s *successors, cg_t *cg)
 	if (index >= successors->succ_list_size)
 	{
 		/* the successor list is too small */
-		successors->succ_list_size *= 2;
+		if (successors->succ_list_size > 0)
+			successors->succ_list_size *= 2;
+		else
+			successors->succ_list_size = 4;
 
 		/* NB: this is thread safe as the tag->lock is taken */
 		successors->succ = realloc(successors->succ, 
@@ -62,7 +66,8 @@ void _starpu_notify_cg(cg_t *cg)
 		cg->remaining = cg->ntags;
 
 		struct tag_s *tag;
-		struct cg_list_s *tag_successors;
+		struct cg_list_s *tag_successors, *job_successors;
+		job_t j;
 
 		/* the group is now completed */
 		switch (cg->cg_type) {
@@ -83,6 +88,7 @@ void _starpu_notify_cg(cg_t *cg)
 	
 				if ((tag->state == BLOCKED) &&
 					(tag_successors->ndeps == tag_successors->ndeps_completed)) {
+					/* reset the counter so that we can reuse the completion group */
 					tag_successors->ndeps_completed = 0;
 					_starpu_tag_set_ready(tag);
 				}
@@ -90,6 +96,20 @@ void _starpu_notify_cg(cg_t *cg)
 
 			case CG_TASK:
 				/* TODO */
+				j = cg->succ.job;
+
+				job_successors = &j->job_successors;
+
+				unsigned ndeps_completed =
+					STARPU_ATOMIC_ADD(&job_successors->ndeps_completed, 1);
+
+				if (job_successors->ndeps == ndeps_completed)
+				{
+					/* reset the counter so that we can reuse the completion group */
+					job_successors->ndeps_completed = 0;
+					_starpu_enforce_deps_starting_from_data(j);
+				}
+
 				break;
 
 			default:
@@ -98,4 +118,33 @@ void _starpu_notify_cg(cg_t *cg)
 	}
 }
 
+void _starpu_notify_cg_list(struct cg_list_s *successors)
+{
+	unsigned nsuccs;
+	unsigned succ;
+
+	nsuccs = successors->nsuccs;
 
+	for (succ = 0; succ < nsuccs; succ++)
+	{
+		struct cg_s *cg = successors->succ[succ];
+		struct tag_s *cgtag = cg->succ.tag;
+
+		unsigned cg_type = cg->cg_type;
+
+		if (cg_type == CG_TAG)
+			starpu_spin_lock(&cgtag->lock);
+
+		_starpu_notify_cg(cg);
+		if (cg_type == CG_APPS) {
+			/* Remove the temporary ref to the cg */
+			memmove(&successors->succ[succ], &successors->succ[succ+1], (nsuccs-(succ+1)) * sizeof(successors->succ[succ]));
+			succ--;
+			nsuccs--;
+			successors->nsuccs--;
+		}
+
+		if (cg_type == CG_TAG)
+			starpu_spin_unlock(&cgtag->lock);
+	}
+}

+ 3 - 1
src/core/dependencies/cg.h

@@ -59,7 +59,7 @@ typedef struct cg_s {
 		struct tag_s *tag;
 
 		/* CG_TASK */
-		struct job_s *succ_job;
+		struct job_s *job;
 
 		/* CG_APPS */
 		/* in case this completion group is related to an application,
@@ -76,5 +76,7 @@ typedef struct cg_s {
 void _starpu_cg_list_init(struct cg_list_s *list);
 void _starpu_add_successor_to_cg_list(struct cg_list_s *successors, cg_t *cg);
 void _starpu_notify_cg(cg_t *cg);
+void _starpu_notify_cg_list(struct cg_list_s *successors);
+void _starpu_notify_task_dependencies(struct job_s *j);
 
 #endif // __CG_H__

+ 6 - 38
src/core/dependencies/tags.c

@@ -153,13 +153,7 @@ void _starpu_tag_set_ready(struct tag_s *tag)
 	starpu_spin_unlock(&tag->lock);
 
 	/* enforce data dependencies */
-	if (_starpu_submit_job_enforce_data_deps(j))
-	{
-		starpu_spin_lock(&tag->lock);
-		return;
-	}
-
-	push_task(j);
+	_starpu_enforce_deps_starting_from_task(j);
 
 	starpu_spin_lock(&tag->lock);
 }
@@ -179,41 +173,12 @@ static void _starpu_tag_add_succ(struct tag_s *tag, cg_t *cg)
 
 static void _starpu_notify_tag_dependencies(struct tag_s *tag)
 {
-	unsigned nsuccs;
-	unsigned succ;
-
-	struct cg_list_s *tag_successors = &tag->tag_successors;
-
 	starpu_spin_lock(&tag->lock);
 
 	tag->state = DONE;
-
 	TRACE_TASK_DONE(tag);
 
-	nsuccs = tag_successors->nsuccs;
-
-	for (succ = 0; succ < nsuccs; succ++)
-	{
-		struct cg_s *cg = tag_successors->succ[succ];
-		struct tag_s *cgtag = cg->succ.tag;
-
-		unsigned cg_type = cg->cg_type;
-
-		if (cg_type == CG_TAG)
-			starpu_spin_lock(&cgtag->lock);
-
-		_starpu_notify_cg(cg);
-		if (cg_type == CG_APPS) {
-			/* Remove the temporary ref to the cg */
-			memmove(&tag_successors->succ[succ], &tag_successors->succ[succ+1], (nsuccs-(succ+1)) * sizeof(tag_successors->succ[succ]));
-			succ--;
-			nsuccs--;
-			tag_successors->nsuccs--;
-		}
-
-		if (cg_type == CG_TAG)
-			starpu_spin_unlock(&cgtag->lock);
-	}
+	_starpu_notify_cg_list(&tag->tag_successors);
 
 	starpu_spin_unlock(&tag->lock);
 }
@@ -222,8 +187,11 @@ void _starpu_notify_dependencies(struct job_s *j)
 {
 	STARPU_ASSERT(j);
 	STARPU_ASSERT(j->task);
+
+	/* unlock tasks depending on that task */
+	_starpu_notify_task_dependencies(j);
 	
-	/* in case there are dependencies, wake up the proper tasks */
+	/* unlock tags depending on that task */
 	if (j->task->use_tag)
 		_starpu_notify_tag_dependencies(j->tag);
 }

+ 81 - 5
src/core/jobs.c

@@ -51,6 +51,8 @@ job_t __attribute__((malloc)) _starpu_job_create(struct starpu_task *task)
 	job->footprint_is_computed = 0;
 	job->terminated = 0;
 
+	_starpu_cg_list_init(&job->job_successors);
+
 	pthread_mutex_init(&job->sync_mutex, NULL);
 	pthread_cond_init(&job->sync_cond, NULL);
 
@@ -66,11 +68,16 @@ void starpu_wait_job(job_t j)
 	STARPU_ASSERT(!j->task->detach);
 
 	pthread_mutex_lock(&j->sync_mutex);
+
 	if (!j->terminated)
 		pthread_cond_wait(&j->sync_cond, &j->sync_mutex);
+
+	/* reset the job state so that it can be reused again */
+	j->terminated = 0;
+
 	pthread_mutex_unlock(&j->sync_mutex);
 
-	job_delete(j);
+	//job_delete(j);
 }
 
 void _starpu_handle_job_termination(job_t j)
@@ -117,11 +124,14 @@ void _starpu_handle_job_termination(job_t j)
 	else {
 		/* no one is going to synchronize with that task so we release
  		 * the data structures now */
-		if (detach && !regenerate)
-			job_delete(j);
+		if (!regenerate)
+			task->starpu_private = NULL;
 
 		if (destroy)
+		{
+			job_delete(j);
 			free(task);
+		}
 	}
 
 	_starpu_decrement_nsubmitted_tasks();
@@ -137,8 +147,8 @@ void _starpu_handle_job_termination(job_t j)
 }
 
 /* This function is called when a new task is submitted to StarPU 
- * it returns 1 if the task deps are not fulfilled, 0 otherwise */
-static unsigned _starpu_not_all_task_deps_are_fulfilled(job_t j)
+ * it returns 1 if the tag deps are not fulfilled, 0 otherwise */
+static unsigned _starpu_not_all_tag_deps_are_fulfilled(job_t j)
 {
 	unsigned ret;
 
@@ -171,10 +181,41 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(job_t j)
 	return ret;
 }
 
+static unsigned _starpu_not_all_task_deps_are_fulfilled(job_t j)
+{
+	unsigned ret;
+
+	struct cg_list_s *job_successors = &j->job_successors;
+
+#warning TODO use locks !
+	if (job_successors->ndeps != job_successors->ndeps_completed)
+	{
+		ret = 1;
+	}
+	else {
+		/* existing deps (if any) are fulfilled */
+		/* already prepare for next run */
+		job_successors->ndeps_completed = 0;
+		ret = 0;
+	}
+
+	return ret;
+}
+
+
+
+/*
+ *	In order, we enforce tag, task and data dependencies. The task is
+ *	passed to the scheduler only once all these constraints are fulfilled.
+ */
 unsigned _starpu_enforce_deps_and_schedule(job_t j)
 {
 	unsigned ret;
 
+	/* enfore tag dependencies */
+	if (_starpu_not_all_tag_deps_are_fulfilled(j))
+		return 0;
+
 	/* enfore task dependencies */
 	if (_starpu_not_all_task_deps_are_fulfilled(j))
 		return 0;
@@ -188,6 +229,41 @@ unsigned _starpu_enforce_deps_and_schedule(job_t j)
 	return ret;
 }
 
+/* Tag deps are already fulfilled */
+unsigned _starpu_enforce_deps_starting_from_task(job_t j)
+{
+	unsigned ret;
+
+	/* enfore task dependencies */
+	if (_starpu_not_all_task_deps_are_fulfilled(j))
+		return 0;
+
+	/* enforce data dependencies */
+	if (_starpu_submit_job_enforce_data_deps(j))
+		return 0;
+
+	ret = push_task(j);
+
+	return ret;
+}
+
+/* Tag and task deps are already fulfilled */
+unsigned _starpu_enforce_deps_starting_from_data(job_t j)
+{
+	unsigned ret;
+
+	/* enforce data dependencies */
+	if (_starpu_submit_job_enforce_data_deps(j))
+		return 0;
+
+	ret = push_task(j);
+
+	return ret;
+}
+
+
+
+
 struct job_s *_starpu_pop_local_task(struct worker_s *worker)
 {
 	struct job_s *j = NULL;

+ 4 - 0
src/core/jobs.h

@@ -58,6 +58,7 @@ LIST_TYPE(job,
 	pthread_cond_t sync_cond;
 
 	struct tag_s *tag;
+	struct cg_list_s job_successors;
 
 	double predicted;
 	double penality;
@@ -73,6 +74,9 @@ void starpu_wait_job(job_t j);
 
 /* try to submit job j, enqueue it if it's not schedulable yet */
 unsigned _starpu_enforce_deps_and_schedule(job_t j);
+unsigned _starpu_enforce_deps_starting_from_task(job_t j);
+unsigned _starpu_enforce_deps_starting_from_data(job_t j);
+
 
 //#warning this must not be exported anymore ... 
 //job_t _starpu_job_create(struct starpu_task *task);

+ 24 - 3
src/core/task.c

@@ -108,6 +108,19 @@ int starpu_wait_task(struct starpu_task *task)
 	return 0;
 }
 
+job_t _starpu_get_job_associated_to_task(struct starpu_task *task)
+{
+	STARPU_ASSERT(task);
+
+	if (!task->starpu_private)
+	{
+		job_t j = _starpu_job_create(task);
+		task->starpu_private = j;
+	}
+
+	return (struct job_s *)task->starpu_private;
+}
+
 int _starpu_submit_job(job_t j)
 {
 	_starpu_increment_nsubmitted_tasks();
@@ -149,10 +162,18 @@ int starpu_submit_task(struct starpu_task *task)
 
 
 	/* internally, StarPU manipulates a job_t which is a wrapper around a
- 	* task structure */
-	job_t j = _starpu_job_create(task);
+	* task structure, it is possible that this job structure was already
+	* allocated, for instance to enforce task depenencies. */
+	job_t j;
 
-	task->starpu_private = j;
+	if (!task->starpu_private)
+	{
+		j = _starpu_job_create(task);
+		task->starpu_private = j;
+	}
+	else {
+		j = (struct job_s *)task->starpu_private;
+	}
 
 	ret = _starpu_submit_job(j);
 

+ 1 - 0
src/core/task.h

@@ -26,5 +26,6 @@ void _starpu_increment_nsubmitted_tasks(void);
 void _starpu_decrement_nsubmitted_tasks(void);
 
 int _starpu_submit_job(job_t j);
+job_t _starpu_get_job_associated_to_task(struct starpu_task *task);
 
 #endif // __CORE_TASK_H__

+ 4 - 0
tests/Makefile.am

@@ -82,6 +82,7 @@ check_PROGRAMS += 				\
 	core/empty_task				\
 	core/empty_task_sync_point		\
 	core/tag-wait-api			\
+	core/task-wait-api			\
 	datawizard/sync_and_notify_data		\
 	datawizard/dsm_stress			\
 	datawizard/write_only_tmp_buffer	\
@@ -140,6 +141,9 @@ core_empty_task_sync_point_SOURCES =		\
 core_tag_wait_api_SOURCES =			\
 	core/tag-wait-api.c
 
+core_task_wait_api_SOURCES =			\
+	core/task-wait-api.c
+
 datawizard_dsm_stress_SOURCES =			\
 	datawizard/dsm_stress.c