Browse Source

Slightly rework the use of the sync_mutex lock that protects jobs from
concurrency while dealing with dependencies. Since we need to enforce
dependencies from callbacks too, we sometimes have to remember whether that
lock is already taken or not.

This should fix the tests/cores/declare_deps_* tests.

Cédric Augonnet 15 years ago
parent
commit
5fce968f7c

+ 1 - 0
src/Makefile.am

@@ -93,6 +93,7 @@ libstarpu_la_SOURCES = 						\
 	core/errorcheck.c					\
 	core/progress_hook.c					\
 	core/dependencies/cg.c					\
+	core/dependencies/dependencies.c			\
 	core/dependencies/tags.c				\
 	core/dependencies/task_deps.c				\
 	core/dependencies/htable.c				\

+ 14 - 1
src/core/dependencies/cg.c

@@ -130,7 +130,7 @@ void _starpu_notify_cg(starpu_cg_t *cg)
 					/* Note that this also ensures that tag deps are
 					 * fulfilled. This counter is reseted only when the
 					 * dependencies are are all fulfilled) */
-					_starpu_enforce_deps_and_schedule(j);
+					_starpu_enforce_deps_and_schedule(j, 1);
 				}
 
 				break;
@@ -164,7 +164,20 @@ void _starpu_notify_cg_list(struct starpu_cg_list_s *successors)
 			_starpu_spin_lock(&cgtag->lock);
 		}
 
+		if (cg_type == STARPU_CG_TASK)
+		{
+			starpu_job_t j = cg->succ.job;
+			PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+		}			
+
 		_starpu_notify_cg(cg);
+
+		if (cg_type == STARPU_CG_TASK)
+		{
+			starpu_job_t j = cg->succ.job;
+			PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+		}			
+
 		if (cg_type == STARPU_CG_APPS) {
 			/* Remove the temporary ref to the cg */
 			memmove(&successors->succ[succ], &successors->succ[succ+1], (nsuccs-(succ+1)) * sizeof(successors->succ[succ]));

+ 39 - 0
src/core/dependencies/dependencies.c

@@ -0,0 +1,39 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <common/config.h>
+#include <common/utils.h>
+#include <core/dependencies/tags.h>
+#include <core/dependencies/htable.h>
+#include <core/jobs.h>
+#include <core/policies/sched_policy.h>
+#include <core/dependencies/data_concurrency.h>
+
+/* Notify the task successors of j, then its tag successors if the task uses a tag. We assume that j->sync_mutex is taken by the caller */
+void _starpu_notify_dependencies(struct starpu_job_s *j)
+{
+	STARPU_ASSERT(j);
+	STARPU_ASSERT(j->task);
+
+	/* unlock tasks depending on that task */
+	_starpu_notify_task_dependencies(j);
+	
+	/* unlock tags depending on that task */
+	if (j->task->use_tag)
+		_starpu_notify_tag_dependencies(j->tag);
+
+}

+ 1 - 14
src/core/dependencies/tags.c

@@ -172,7 +172,7 @@ static void _starpu_tag_add_succ(struct starpu_tag_s *tag, starpu_cg_t *cg)
 	}
 }
 
-static void _starpu_notify_tag_dependencies(struct starpu_tag_s *tag)
+void _starpu_notify_tag_dependencies(struct starpu_tag_s *tag)
 {
 	_starpu_spin_lock(&tag->lock);
 
@@ -184,19 +184,6 @@ static void _starpu_notify_tag_dependencies(struct starpu_tag_s *tag)
 	_starpu_spin_unlock(&tag->lock);
 }
 
-void _starpu_notify_dependencies(struct starpu_job_s *j)
-{
-	STARPU_ASSERT(j);
-	STARPU_ASSERT(j->task);
-
-	/* unlock tasks depending on that task */
-	_starpu_notify_task_dependencies(j);
-	
-	/* unlock tags depending on that task */
-	if (j->task->use_tag)
-		_starpu_notify_tag_dependencies(j->tag);
-}
-
 void starpu_tag_notify_from_apps(starpu_tag_t id)
 {
 	struct starpu_tag_s *tag = gettag_struct(id);

+ 2 - 0
src/core/dependencies/tags.h

@@ -59,6 +59,8 @@ struct starpu_tag_s {
 void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...);
 
 void _starpu_notify_dependencies(struct starpu_job_s *j);
+void _starpu_notify_tag_dependencies(struct starpu_tag_s *tag);
+
 void _starpu_tag_declare(starpu_tag_t id, struct starpu_job_s *job);
 void _starpu_tag_set_ready(struct starpu_tag_s *tag);
 

+ 14 - 7
src/core/dependencies/task_deps.c

@@ -60,23 +60,30 @@ void _starpu_notify_task_dependencies(starpu_job_t j)
 /* task depends on the tasks in task array */
 void starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[])
 {
-	unsigned i;
+	if (ndeps == 0)
+		return;
 
 	starpu_job_t job;
 
 	job = _starpu_get_job_associated_to_task(task);
+
+	PTHREAD_MUTEX_LOCK(&job->sync_mutex);
+
 	starpu_cg_t *cg = create_cg_task(ndeps, job);
 
-	STARPU_ASSERT(ndeps != 0);
-	
+	unsigned i;
 	for (i = 0; i < ndeps; i++)
 	{
 		struct starpu_task *dep_task = task_array[i];
 
-		job = _starpu_get_job_associated_to_task(dep_task);
+		starpu_job_t dep_job;
+		dep_job = _starpu_get_job_associated_to_task(dep_task);
 
-		PTHREAD_MUTEX_LOCK(&job->sync_mutex);
-		_starpu_task_add_succ(job, cg);
-		PTHREAD_MUTEX_UNLOCK(&job->sync_mutex);
+		PTHREAD_MUTEX_LOCK(&dep_job->sync_mutex);
+		_starpu_task_add_succ(dep_job, cg);
+		PTHREAD_MUTEX_UNLOCK(&dep_job->sync_mutex);
 	}
+
+	
+	PTHREAD_MUTEX_UNLOCK(&job->sync_mutex);
 }

+ 9 - 7
src/core/jobs.c

@@ -97,6 +97,7 @@ void _starpu_handle_job_termination(starpu_job_t j)
 	struct starpu_task *task = j->task;
 
 	/* in case there are dependencies, wake up the proper tasks */
+	PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->submitted = 0;
 	_starpu_notify_dependencies(j);
 
@@ -104,7 +105,6 @@ void _starpu_handle_job_termination(starpu_job_t j)
 	 * possible to express task dependencies within the callback
 	 * function. A value of 1 means that the codelet was executed but that
 	 * the callback is not done yet. */
-	PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->terminated = 1;
 	PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
@@ -200,13 +200,14 @@ static unsigned _starpu_not_all_tag_deps_are_fulfilled(starpu_job_t j)
 	return ret;
 }
 
-static unsigned _starpu_not_all_task_deps_are_fulfilled(starpu_job_t j)
+static unsigned _starpu_not_all_task_deps_are_fulfilled(starpu_job_t j, unsigned job_is_already_locked)
 {
 	unsigned ret;
 
 	struct starpu_cg_list_s *job_successors = &j->job_successors;
 
-	PTHREAD_MUTEX_LOCK(&j->sync_mutex);	
+	if (!job_is_already_locked)
+		PTHREAD_MUTEX_LOCK(&j->sync_mutex);	
 
 	if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
 	{
@@ -219,7 +220,8 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(starpu_job_t j)
 		ret = 0;
 	}
 
-	PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+	if (!job_is_already_locked)
+		PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 	return ret;
 }
@@ -230,7 +232,7 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(starpu_job_t j)
  *	In order, we enforce tag, task and data dependencies. The task is
  *	passed to the scheduler only once all these constraints are fulfilled.
  */
-unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j)
+unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j, unsigned job_is_already_locked)
 {
 	unsigned ret;
 
@@ -239,7 +241,7 @@ unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j)
 		return 0;
 
 	/* enfore task dependencies */
-	if (_starpu_not_all_task_deps_are_fulfilled(j))
+	if (_starpu_not_all_task_deps_are_fulfilled(j, job_is_already_locked))
 		return 0;
 
 	/* enforce data dependencies */
@@ -257,7 +259,7 @@ unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j)
 	unsigned ret;
 
 	/* enfore task dependencies */
-	if (_starpu_not_all_task_deps_are_fulfilled(j))
+	if (_starpu_not_all_task_deps_are_fulfilled(j, 0))
 		return 0;
 
 	/* enforce data dependencies */

+ 1 - 1
src/core/jobs.h

@@ -77,7 +77,7 @@ void _starpu_job_destroy(starpu_job_t j);
 void _starpu_wait_job(starpu_job_t j);
 
 /* try to submit job j, enqueue it if it's not schedulable yet */
-unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j);
+unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j, unsigned job_is_already_locked);
 unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j);
 unsigned _starpu_enforce_deps_starting_from_data(starpu_job_t j);
 

+ 1 - 1
src/core/task.c

@@ -155,7 +155,7 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted)
 
 	j->submitted = 1;
 
-	return _starpu_enforce_deps_and_schedule(j);
+	return _starpu_enforce_deps_and_schedule(j, 0);
 }
 
 /* application should submit new tasks to StarPU through this function */