Browse Source

Fix locking around implicit dependencies

Some tasks with cl==NULL are used for data dependencies, and thus appear in
last_writer/reader. They also need to be cleared from there, in case they are
destroyed before another writer/reader replaces them. Instead of doing it in
a callback, which is too late since we notify dependencies before that, add an
implicit_dep_handle member just for that.

Keeping the job sync_mutex held throughout dependency computation (though not
consistently) poses deadlock issues when a job is terminating while another one
is trying to depend on it. Adding a spinlock on the completion group list
lets us decouple this: we can release it while notifying completion, and
recheck the list bound after that. _starpu_add_successor_to_cg_list now returns
whether the group was already completed, to keep proper atomicity. We then don't
need job_is_already_locked any more. We also don't need to check whether we have
to destroy the task any more.

_starpu_detect_implicit_data_deps_with_handle needs to submit a synchronization task, but that task might be ready immediately, in which case its callback would be called with the job mutex held, which is not a good idea. Instead, make _starpu_detect_implicit_data_deps_with_handle return the task and let the caller submit it after having released the mutex.

This should at last fix cg, which was the only example that really stresses submitting tasks while concurrently waiting for them (with a reduction in the middle, even...) :)
Samuel Thibault 13 years ago
parent
commit
fba139b857

+ 46 - 46
src/core/dependencies/cg.c

@@ -26,10 +26,13 @@
 
 
 void _starpu_cg_list_init(struct _starpu_cg_list *list)
 void _starpu_cg_list_init(struct _starpu_cg_list *list)
 {
 {
-	list->nsuccs = 0;
+	_starpu_spin_init(&list->lock);
 	list->ndeps = 0;
 	list->ndeps = 0;
 	list->ndeps_completed = 0;
 	list->ndeps_completed = 0;
 
 
+	list->terminated = 0;
+
+	list->nsuccs = 0;
 #ifdef STARPU_DYNAMIC_DEPS_SIZE
 #ifdef STARPU_DYNAMIC_DEPS_SIZE
 	/* this is a small initial default value ... may be changed */
 	/* this is a small initial default value ... may be changed */
 	list->succ_list_size = 0;
 	list->succ_list_size = 0;
@@ -55,12 +58,19 @@ void _starpu_cg_list_deinit(struct _starpu_cg_list *list)
 #ifdef STARPU_DYNAMIC_DEPS_SIZE
 #ifdef STARPU_DYNAMIC_DEPS_SIZE
 	free(list->succ);
 	free(list->succ);
 #endif
 #endif
+	_starpu_spin_destroy(&list->lock);
 }
 }
 
 
-void _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg)
+/* Returns whether the completion was already terminated, and caller should
+ * thus immediately proceed. */
+int _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg)
 {
 {
+	int ret;
 	STARPU_ASSERT(cg);
 	STARPU_ASSERT(cg);
 
 
+	_starpu_spin_lock(&successors->lock);
+	ret = successors->terminated;
+
 	/* where should that cg should be put in the array ? */
 	/* where should that cg should be put in the array ? */
 	unsigned index = STARPU_ATOMIC_ADD(&successors->nsuccs, 1) - 1;
 	unsigned index = STARPU_ATOMIC_ADD(&successors->nsuccs, 1) - 1;
 
 
@@ -81,6 +91,9 @@ void _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct
 	STARPU_ASSERT(index < STARPU_NMAXDEPS);
 	STARPU_ASSERT(index < STARPU_NMAXDEPS);
 #endif
 #endif
 	successors->succ[index] = cg;
 	successors->succ[index] = cg;
+	_starpu_spin_unlock(&successors->lock);
+
+	return ret;
 }
 }
 
 
 /* Note: in case of a tag, it must be already locked */
 /* Note: in case of a tag, it must be already locked */
@@ -103,7 +116,7 @@ void _starpu_notify_cg(struct _starpu_cg *cg)
 			case STARPU_CG_APPS:
 			case STARPU_CG_APPS:
 			{
 			{
 				/* this is a cg for an application waiting on a set of
 				/* this is a cg for an application waiting on a set of
-	 			 * tags, wake the thread */
+				 * tags, wake the thread */
 				_STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
 				_STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
 				cg->succ.succ_apps.completed = 1;
 				cg->succ.succ_apps.completed = 1;
 				_STARPU_PTHREAD_COND_SIGNAL(&cg->succ.succ_apps.cg_cond);
 				_STARPU_PTHREAD_COND_SIGNAL(&cg->succ.succ_apps.cg_cond);
@@ -134,18 +147,25 @@ void _starpu_notify_cg(struct _starpu_cg *cg)
 			{
 			{
 				j = cg->succ.job;
 				j = cg->succ.job;
 
 
+				_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+
 				job_successors = &j->job_successors;
 				job_successors = &j->job_successors;
 
 
 				unsigned ndeps_completed =
 				unsigned ndeps_completed =
 					STARPU_ATOMIC_ADD(&job_successors->ndeps_completed, 1);
 					STARPU_ATOMIC_ADD(&job_successors->ndeps_completed, 1);
 
 
+				/* Need to atomically test submitted and check
+				 * dependencies, since this is concurrent with
+				 * _starpu_submit_job */
 				if (j->submitted && job_successors->ndeps == ndeps_completed)
 				if (j->submitted && job_successors->ndeps == ndeps_completed)
 				{
 				{
 					/* Note that this also ensures that tag deps are
 					/* Note that this also ensures that tag deps are
 					 * fulfilled. This counter is reseted only when the
 					 * fulfilled. This counter is reseted only when the
 					 * dependencies are are all fulfilled) */
 					 * dependencies are are all fulfilled) */
-					_starpu_enforce_deps_and_schedule(j, 1);
-				}
+					_starpu_enforce_deps_and_schedule(j);
+				} else
+					_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
 
 
 				break;
 				break;
 			}
 			}
@@ -156,21 +176,33 @@ void _starpu_notify_cg(struct _starpu_cg *cg)
 	}
 	}
 }
 }
 
 
+/* Caller just has to promise that the list will not disappear.
+ * _starpu_notify_cg_list protects the list itself.
+ * No job lock should be held, since we might want to immediately call the callback of an empty task.
+ */
 void _starpu_notify_cg_list(struct _starpu_cg_list *successors)
 void _starpu_notify_cg_list(struct _starpu_cg_list *successors)
 {
 {
-	unsigned nsuccs;
 	unsigned succ;
 	unsigned succ;
 
 
-	nsuccs = successors->nsuccs;
-
-	for (succ = 0; succ < nsuccs; succ++)
+	_starpu_spin_lock(&successors->lock);
+	successors->terminated = 1;
+	/* Note: some thread might be concurrently adding other items */
+	for (succ = 0; succ < successors->nsuccs; succ++)
 	{
 	{
 		struct _starpu_cg *cg = successors->succ[succ];
 		struct _starpu_cg *cg = successors->succ[succ];
 		STARPU_ASSERT(cg);
 		STARPU_ASSERT(cg);
+		unsigned cg_type = cg->cg_type;
 
 
-		struct _starpu_tag *cgtag = NULL;
+		if (cg_type == STARPU_CG_APPS)
+		{
+			/* Remove the temporary ref to the cg */
+			memmove(&successors->succ[succ], &successors->succ[succ+1], (successors->nsuccs-(succ+1)) * sizeof(successors->succ[succ]));
+			succ--;
+			successors->nsuccs--;
+		}
+		_starpu_spin_unlock(&successors->lock);
 
 
-		unsigned cg_type = cg->cg_type;
+		struct _starpu_tag *cgtag = NULL;
 
 
 		if (cg_type == STARPU_CG_TAG)
 		if (cg_type == STARPU_CG_TAG)
 		{
 		{
@@ -179,44 +211,12 @@ void _starpu_notify_cg_list(struct _starpu_cg_list *successors)
 			_starpu_spin_lock(&cgtag->lock);
 			_starpu_spin_lock(&cgtag->lock);
 		}
 		}
 
 
-		if (cg_type == STARPU_CG_TASK)
-		{
-			struct _starpu_job *j = cg->succ.job;
-			_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-		}
-
 		_starpu_notify_cg(cg);
 		_starpu_notify_cg(cg);
 
 
-		if (cg_type == STARPU_CG_TASK)
-		{
-			struct _starpu_job *j = cg->succ.job;
-
-			/* In case this task was immediately terminated, since
-			 * _starpu_notify_cg_list already hold the sync_mutex
-			 * lock, it is its reponsability to destroy the task if
-			 * needed. */
-			unsigned must_destroy_task = 0;
-			struct starpu_task *task = j->task;
-
-			if (j->submitted && j->terminated > 0 && task->destroy && task->detach)
-				must_destroy_task = 1;
-
-			_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-
-			if (must_destroy_task)
-				_starpu_task_destroy(task);
-		}
-
-		if (cg_type == STARPU_CG_APPS)
-		{
-			/* Remove the temporary ref to the cg */
-			memmove(&successors->succ[succ], &successors->succ[succ+1], (nsuccs-(succ+1)) * sizeof(successors->succ[succ]));
-			succ--;
-			nsuccs--;
-			successors->nsuccs--;
-		}
-
 		if (cg_type == STARPU_CG_TAG)
 		if (cg_type == STARPU_CG_TAG)
 			_starpu_spin_unlock(&cgtag->lock);
 			_starpu_spin_unlock(&cgtag->lock);
+
+		_starpu_spin_lock(&successors->lock);
 	}
 	}
+	_starpu_spin_unlock(&successors->lock);
 }
 }

+ 15 - 4
src/core/dependencies/cg.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -33,12 +33,23 @@
 
 
 struct _starpu_job;
 struct _starpu_job;
 
 
-/* Completion Group list */
+/* Completion Group list, records both the number of expected notifications
+ * before the completion can start, and the list of successors when the
+ * completion is finished. */
 struct _starpu_cg_list
 struct _starpu_cg_list
 {
 {
-	unsigned nsuccs; /* how many successors ? */
+	/* Protects atomicity of the list and the terminated flag */
+	struct _starpu_spinlock lock;
+
+	/* Number of notifications to be waited for */
 	unsigned ndeps; /* how many deps ? */
 	unsigned ndeps; /* how many deps ? */
 	unsigned ndeps_completed; /* how many deps are done ? */
 	unsigned ndeps_completed; /* how many deps are done ? */
+
+	/* Whether the completion is finished. */
+	unsigned terminated;
+
+	/* List of successors */
+	unsigned nsuccs; /* how many successors ? */
 #ifdef STARPU_DYNAMIC_DEPS_SIZE
 #ifdef STARPU_DYNAMIC_DEPS_SIZE
 	unsigned succ_list_size;
 	unsigned succ_list_size;
 	struct _starpu_cg **succ;
 	struct _starpu_cg **succ;
@@ -85,7 +96,7 @@ struct _starpu_cg
 
 
 void _starpu_cg_list_init(struct _starpu_cg_list *list);
 void _starpu_cg_list_init(struct _starpu_cg_list *list);
 void _starpu_cg_list_deinit(struct _starpu_cg_list *list);
 void _starpu_cg_list_deinit(struct _starpu_cg_list *list);
-void _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg);
+int _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg);
 void _starpu_notify_cg(struct _starpu_cg *cg);
 void _starpu_notify_cg(struct _starpu_cg *cg);
 void _starpu_notify_cg_list(struct _starpu_cg_list *successors);
 void _starpu_notify_cg_list(struct _starpu_cg_list *successors);
 void _starpu_notify_task_dependencies(struct _starpu_job *j);
 void _starpu_notify_task_dependencies(struct _starpu_job *j);

+ 2 - 2
src/core/dependencies/data_concurrency.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010-2011  Université de Bordeaux 1
+ * Copyright (C) 2010-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -342,7 +342,7 @@ void _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 			if (r->is_requested_by_codelet)
 			if (r->is_requested_by_codelet)
 			{
 			{
 				if (!unlock_one_requester(r))
 				if (!unlock_one_requester(r))
-					_starpu_push_task(r->j, 0);
+					_starpu_push_task(r->j);
 			}
 			}
 			else
 			else
 			{
 			{

+ 2 - 2
src/core/dependencies/dependencies.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -24,7 +24,7 @@
 #include <core/sched_policy.h>
 #include <core/sched_policy.h>
 #include <core/dependencies/data_concurrency.h>
 #include <core/dependencies/data_concurrency.h>
 
 
-/* We assume that j->sync_mutex is taken by the caller */
+/* We assume that the job will not disappear under our hands */
 void _starpu_notify_dependencies(struct _starpu_job *j)
 void _starpu_notify_dependencies(struct _starpu_job *j)
 {
 {
 	STARPU_ASSERT(j);
 	STARPU_ASSERT(j);

+ 26 - 20
src/core/dependencies/implicit_data_deps.c

@@ -63,6 +63,9 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
 		_starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
 		_starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
 		_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
 		_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
 	}
 	}
+
+	if (!pre_sync_task->cl)
+		_starpu_get_job_associated_to_task(pre_sync_task)->implicit_dep_handle = handle;
 }
 }
 
 
 /* Write after Read (WAR) */
 /* Write after Read (WAR) */
@@ -125,7 +128,10 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
 	handle->last_submitted_readers = NULL;
 	handle->last_submitted_readers = NULL;
 	handle->last_submitted_writer = post_sync_task;
 	handle->last_submitted_writer = post_sync_task;
 
 
+	if (!post_sync_task->cl)
+		_starpu_get_job_associated_to_task(post_sync_task)->implicit_dep_handle = handle;
 }
 }
+
 /* Write after Write (WAW) */
 /* Write after Write (WAW) */
 static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
 static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
 {
 {
@@ -164,18 +170,9 @@ static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct
 	}
 	}
 
 
 	handle->last_submitted_writer = post_sync_task;
 	handle->last_submitted_writer = post_sync_task;
-}
 
 
-static void disable_last_writer_callback(void *cl_arg)
-{
-	starpu_data_handle_t handle = (starpu_data_handle_t) cl_arg;
-
-	/* NB: we don't take the handle->sequential_consistency_mutex mutex
-	 * because the empty task that is used for synchronization is going to
-	 * be unlock in the context of a call to
-	 * _starpu_detect_implicit_data_deps_with_handle. It will therefore
-	 * already have been locked. */
-	handle->last_submitted_writer = NULL;
+	if (!post_sync_task->cl)
+		_starpu_get_job_associated_to_task(post_sync_task)->implicit_dep_handle = handle;
 }
 }
 
 
 /* This function adds the implicit task dependencies introduced by data
 /* This function adds the implicit task dependencies introduced by data
@@ -185,10 +182,13 @@ static void disable_last_writer_callback(void *cl_arg)
  * introduced by a task submission, both tasks are just the submitted task, but
  * introduced by a task submission, both tasks are just the submitted task, but
  * in the case of user interactions with the DSM, these may be different tasks.
  * in the case of user interactions with the DSM, these may be different tasks.
  * */
  * */
-/* NB : handle->sequential_consistency_mutex must be hold by the caller */
-void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
+/* NB : handle->sequential_consistency_mutex must be hold by the caller;
+ * returns a task, to be submitted after releasing that mutex. */
+struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
 						   starpu_data_handle_t handle, enum starpu_access_mode mode)
 						   starpu_data_handle_t handle, enum starpu_access_mode mode)
 {
 {
+	struct starpu_task *task = NULL;
+
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
         _STARPU_LOG_IN();
         _STARPU_LOG_IN();
 
 
@@ -200,7 +200,7 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 		/* Skip tasks that are associated to a reduction phase so that
 		/* Skip tasks that are associated to a reduction phase so that
 		 * they do not interfere with the application. */
 		 * they do not interfere with the application. */
 		if (pre_sync_job->reduction_task || post_sync_job->reduction_task)
 		if (pre_sync_job->reduction_task || post_sync_job->reduction_task)
-			return;
+			return NULL;
 
 
 		_STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task);
 		_STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task);
 		/* In case we are generating the DAG, we add an implicit
 		/* In case we are generating the DAG, we add an implicit
@@ -257,21 +257,20 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 				new_sync_task = starpu_task_create();
 				new_sync_task = starpu_task_create();
 				STARPU_ASSERT(new_sync_task);
 				STARPU_ASSERT(new_sync_task);
 				new_sync_task->cl = NULL;
 				new_sync_task->cl = NULL;
-				new_sync_task->callback_func = disable_last_writer_callback;
-				new_sync_task->callback_arg = handle;
 #ifdef STARPU_USE_FXT
 #ifdef STARPU_USE_FXT
 				_starpu_get_job_associated_to_task(new_sync_task)->model_name = "sync_task_redux";
 				_starpu_get_job_associated_to_task(new_sync_task)->model_name = "sync_task_redux";
 #endif
 #endif
 
 
 				_starpu_add_writer_after_readers(handle, new_sync_task, new_sync_task);
 				_starpu_add_writer_after_readers(handle, new_sync_task, new_sync_task);
 
 
-				starpu_task_submit(new_sync_task);
+				task = new_sync_task;
 			}
 			}
 			_starpu_add_reader_after_writer(handle, pre_sync_task, post_sync_task);
 			_starpu_add_reader_after_writer(handle, pre_sync_task, post_sync_task);
 		}
 		}
 		handle->last_submitted_mode = mode;
 		handle->last_submitted_mode = mode;
 	}
 	}
         _STARPU_LOG_OUT();
         _STARPU_LOG_OUT();
+	return task;
 }
 }
 
 
 /* Create the implicit dependencies for a newly submitted task */
 /* Create the implicit dependencies for a newly submitted task */
@@ -293,14 +292,17 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 	{
 	{
 		starpu_data_handle_t handle = task->handles[buffer];
 		starpu_data_handle_t handle = task->handles[buffer];
 		enum starpu_access_mode mode = task->cl->modes[buffer];
 		enum starpu_access_mode mode = task->cl->modes[buffer];
+		struct starpu_task *new_task;
 
 
 		/* Scratch memory does not introduce any deps */
 		/* Scratch memory does not introduce any deps */
 		if (mode & STARPU_SCRATCH)
 		if (mode & STARPU_SCRATCH)
 			continue;
 			continue;
 
 
 		_STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 		_STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
-		_starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+		if (new_task)
+			starpu_task_submit(new_task);
 	}
 	}
         _STARPU_LOG_OUT();
         _STARPU_LOG_OUT();
 }
 }
@@ -313,6 +315,7 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
  * sequence, f(Ar) g(Ar) h(Aw), we expect to have h depend on both f and g, but
  * sequence, f(Ar) g(Ar) h(Aw), we expect to have h depend on both f and g, but
  * if h is submitted after the termination of f or g, StarPU will not create a
  * if h is submitted after the termination of f or g, StarPU will not create a
  * dependency as this is not needed anymore. */
  * dependency as this is not needed anymore. */
+/* the sequential_consistency_mutex of the handle has to be already held */
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle)
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle)
 {
 {
 	_STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 	_STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
@@ -465,7 +468,7 @@ int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_a
 	int sequential_consistency = handle->sequential_consistency;
 	int sequential_consistency = handle->sequential_consistency;
 	if (sequential_consistency)
 	if (sequential_consistency)
 	{
 	{
-		struct starpu_task *sync_task;
+		struct starpu_task *sync_task, *new_task;
 		sync_task = starpu_task_create();
 		sync_task = starpu_task_create();
 		sync_task->detach = 0;
 		sync_task->detach = 0;
 		sync_task->destroy = 1;
 		sync_task->destroy = 1;
@@ -475,9 +478,12 @@ int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_a
 
 
 		/* It is not really a RW access, but we want to make sure that
 		/* It is not really a RW access, but we want to make sure that
 		 * all previous accesses are done */
 		 * all previous accesses are done */
-		_starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 
+		if (new_task)
+			starpu_task_submit(new_task);
+
 		/* TODO detect if this is superflous */
 		/* TODO detect if this is superflous */
 		int ret = starpu_task_submit(sync_task);
 		int ret = starpu_task_submit(sync_task);
 		STARPU_ASSERT(!ret);
 		STARPU_ASSERT(!ret);

+ 2 - 2
src/core/dependencies/implicit_data_deps.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -21,7 +21,7 @@
 #include <starpu.h>
 #include <starpu.h>
 #include <common/config.h>
 #include <common/config.h>
 
 
-void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
+struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
 						   starpu_data_handle_t handle, enum starpu_access_mode mode);
 						   starpu_data_handle_t handle, enum starpu_access_mode mode);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle);

+ 1 - 12
src/core/dependencies/tags.c

@@ -171,19 +171,8 @@ void _starpu_tag_set_ready(struct _starpu_tag *tag)
 	 * lock again, resulting in a deadlock. */
 	 * lock again, resulting in a deadlock. */
 	_starpu_spin_unlock(&tag->lock);
 	_starpu_spin_unlock(&tag->lock);
 
 
-	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-
 	/* enforce data dependencies */
 	/* enforce data dependencies */
-	_starpu_enforce_deps_starting_from_task(j, 1);
-
-	int must_destroy = j->terminated > 0 && j->task->destroy && j->task->detach;
-
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-
-	/* If the task terminated immediately (cl == NULL), we have to destroy it ourself */
-
-	if (must_destroy)
-		_starpu_task_destroy(j->task);
+	_starpu_enforce_deps_starting_from_task(j);
 
 
 	_starpu_spin_lock(&tag->lock);
 	_starpu_spin_lock(&tag->lock);
 }
 }

+ 3 - 14
src/core/dependencies/task_deps.c

@@ -41,18 +41,13 @@ static struct _starpu_cg *create_cg_task(unsigned ntags, struct _starpu_job *j)
 	return cg;
 	return cg;
 }
 }
 
 
-/* the job lock must be taken */
 static void _starpu_task_add_succ(struct _starpu_job *j, struct _starpu_cg *cg)
 static void _starpu_task_add_succ(struct _starpu_job *j, struct _starpu_cg *cg)
 {
 {
 	STARPU_ASSERT(j);
 	STARPU_ASSERT(j);
 
 
-	_starpu_add_successor_to_cg_list(&j->job_successors, cg);
-
-	if (j->terminated)
-	{
+	if (_starpu_add_successor_to_cg_list(&j->job_successors, cg))
 		/* the task was already completed sooner */
 		/* the task was already completed sooner */
 		_starpu_notify_cg(cg);
 		_starpu_notify_cg(cg);
-	}
 }
 }
 
 
 void _starpu_notify_task_dependencies(struct _starpu_job *j)
 void _starpu_notify_task_dependencies(struct _starpu_job *j)
@@ -73,9 +68,7 @@ void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, s
 	if (check)
 	if (check)
 		STARPU_ASSERT_MSG(!job->submitted || !task->destroy || task->detach, "Task dependencies have to be set before submission");
 		STARPU_ASSERT_MSG(!job->submitted || !task->destroy || task->detach, "Task dependencies have to be set before submission");
 	else
 	else
-		STARPU_ASSERT_MSG(!job->terminated, "Task dependencies have to be set before termination");
-
-	_STARPU_PTHREAD_MUTEX_LOCK(&job->sync_mutex);
+		STARPU_ASSERT_MSG(job->terminated <= 1, "Task dependencies have to be set before termination");
 
 
 	struct _starpu_cg *cg = create_cg_task(ndeps, job);
 	struct _starpu_cg *cg = create_cg_task(ndeps, job);
 
 
@@ -91,17 +84,13 @@ void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, s
 		if (check)
 		if (check)
 			STARPU_ASSERT_MSG(!dep_job->submitted || !dep_job->task->destroy || dep_job->task->detach, "Task dependencies have to be set before submission");
 			STARPU_ASSERT_MSG(!dep_job->submitted || !dep_job->task->destroy || dep_job->task->detach, "Task dependencies have to be set before submission");
 		else
 		else
-			STARPU_ASSERT_MSG(!dep_job->terminated, "Task dependencies have to be set before termination");
+			STARPU_ASSERT_MSG(dep_job->terminated <= 1, "Task dependencies have to be set before termination");
 
 
 		_STARPU_TRACE_TASK_DEPS(dep_job, job);
 		_STARPU_TRACE_TASK_DEPS(dep_job, job);
 		_starpu_bound_task_dep(job, dep_job);
 		_starpu_bound_task_dep(job, dep_job);
 
 
-		_STARPU_PTHREAD_MUTEX_LOCK(&dep_job->sync_mutex);
 		_starpu_task_add_succ(dep_job, cg);
 		_starpu_task_add_succ(dep_job, cg);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&dep_job->sync_mutex);
 	}
 	}
-
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&job->sync_mutex);
 }
 }
 
 
 void starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[])
 void starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[])

+ 44 - 46
src/core/jobs.c

@@ -89,6 +89,8 @@ struct _starpu_job* __attribute__((malloc)) _starpu_job_create(struct starpu_tas
 
 
 	_starpu_cg_list_init(&job->job_successors);
 	_starpu_cg_list_init(&job->job_successors);
 
 
+	job->implicit_dep_handle = NULL;
+
 	_STARPU_PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
 	_STARPU_PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
 	_STARPU_PTHREAD_COND_INIT(&job->sync_cond, NULL);
 	_STARPU_PTHREAD_COND_INIT(&job->sync_cond, NULL);
 
 
@@ -144,26 +146,30 @@ void _starpu_wait_job(struct _starpu_job *j)
         _STARPU_LOG_OUT();
         _STARPU_LOG_OUT();
 }
 }
 
 
-void _starpu_handle_job_termination(struct _starpu_job *j, unsigned job_is_already_locked)
+void _starpu_handle_job_termination(struct _starpu_job *j)
 {
 {
 	struct starpu_task *task = j->task;
 	struct starpu_task *task = j->task;
 
 
-	if (!job_is_already_locked)
-		_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
 
 	task->status = STARPU_TASK_FINISHED;
 	task->status = STARPU_TASK_FINISHED;
 
 
-	/* in case there are dependencies, wake up the proper tasks */
-	_starpu_notify_dependencies(j);
-
 	/* We must have set the j->terminated flag early, so that it is
 	/* We must have set the j->terminated flag early, so that it is
 	 * possible to express task dependencies within the callback
 	 * possible to express task dependencies within the callback
 	 * function. A value of 1 means that the codelet was executed but that
 	 * function. A value of 1 means that the codelet was executed but that
 	 * the callback is not done yet. */
 	 * the callback is not done yet. */
 	j->terminated = 1;
 	j->terminated = 1;
 
 
-	if (!job_is_already_locked)
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+	/* Task does not have a cl, but has explicit data dependencies, we need
+	 * to tell them that we will not exist any more before notifying the
+	 * tasks waiting for us */
+	if (j->implicit_dep_handle)
+		_starpu_release_data_enforce_sequential_consistency(j->task, j->implicit_dep_handle);
+
+	/* in case there are dependencies, wake up the proper tasks */
+	_starpu_notify_dependencies(j);
 
 
 	/* the callback is executed after the dependencies so that we may remove the tag
 	/* the callback is executed after the dependencies so that we may remove the tag
  	 * of the task itself */
  	 * of the task itself */
@@ -208,27 +214,23 @@ void _starpu_handle_job_termination(struct _starpu_job *j, unsigned job_is_alrea
 	int detach = task->detach;
 	int detach = task->detach;
 	int regenerate = task->regenerate;
 	int regenerate = task->regenerate;
 
 
-	if (!detach)
-	{
-		/* we do not desallocate the job structure if some is going to
-		 * wait after the task */
-		if (!job_is_already_locked)
-			_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-		/* A value of 2 is put to specify that not only the codelet but
-		 * also the callback were executed. */
-		j->terminated = 2;
-		_STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
-
-		if (!job_is_already_locked)
-			_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-	}
-	else
+	/* we do not desallocate the job structure if some is going to
+	 * wait after the task */
+	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+	/* A value of 2 is put to specify that not only the codelet but
+	 * also the callback were executed. */
+	j->terminated = 2;
+	_STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
+
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+	if (detach)
 	{
 	{
 		/* no one is going to synchronize with that task so we release
 		/* no one is going to synchronize with that task so we release
 		 * the data structures now. In case the job was already locked
 		 * the data structures now. In case the job was already locked
 		 * by the caller, it is its responsability to destroy the task.
 		 * by the caller, it is its responsability to destroy the task.
 		 * */
 		 * */
-		if (!job_is_already_locked && destroy)
+		if (destroy)
 			_starpu_task_destroy(task);
 			_starpu_task_destroy(task);
 	}
 	}
 
 
@@ -281,18 +283,12 @@ static unsigned _starpu_not_all_tag_deps_are_fulfilled(struct _starpu_job *j)
 	return ret;
 	return ret;
 }
 }
 
 
-#ifdef STARPU_DEVEL
-#warning TODO remove the job_is_already_locked parameter
-#endif
-static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j, unsigned job_is_already_locked)
+static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j)
 {
 {
 	unsigned ret;
 	unsigned ret;
 
 
 	struct _starpu_cg_list *job_successors = &j->job_successors;
 	struct _starpu_cg_list *job_successors = &j->job_successors;
 
 
-	if (!job_is_already_locked)
-		_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-
 	if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
 	if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
 	{
 	{
                 j->task->status = STARPU_TASK_BLOCKED_ON_TASK;
                 j->task->status = STARPU_TASK_BLOCKED_ON_TASK;
@@ -306,20 +302,17 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j, u
 		ret = 0;
 		ret = 0;
 	}
 	}
 
 
-	if (!job_is_already_locked)
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-
 	return ret;
 	return ret;
 }
 }
 
 
 /*
 /*
  *	In order, we enforce tag, task and data dependencies. The task is
  *	In order, we enforce tag, task and data dependencies. The task is
  *	passed to the scheduler only once all these constraints are fulfilled.
  *	passed to the scheduler only once all these constraints are fulfilled.
+ *
+ *	The job mutex has to be taken for atomicity with task submission, and
+ *	is released here.
  */
  */
-#ifdef STARPU_DEVEL
-#warning TODO remove the job_is_already_locked parameter
-#endif
-unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j, unsigned job_is_already_locked)
+unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j)
 {
 {
 	unsigned ret;
 	unsigned ret;
         _STARPU_LOG_IN();
         _STARPU_LOG_IN();
@@ -327,16 +320,19 @@ unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j, unsigned job_i
 	/* enfore tag dependencies */
 	/* enfore tag dependencies */
 	if (_starpu_not_all_tag_deps_are_fulfilled(j))
 	if (_starpu_not_all_tag_deps_are_fulfilled(j))
 	{
 	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
                 _STARPU_LOG_OUT_TAG("not_all_tag_deps_are_fulfilled");
                 _STARPU_LOG_OUT_TAG("not_all_tag_deps_are_fulfilled");
 		return 0;
 		return 0;
         }
         }
 
 
 	/* enfore task dependencies */
 	/* enfore task dependencies */
-	if (_starpu_not_all_task_deps_are_fulfilled(j, job_is_already_locked))
+	if (_starpu_not_all_task_deps_are_fulfilled(j))
 	{
 	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
                 _STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
                 _STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
 		return 0;
 		return 0;
         }
         }
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 
 	/* enforce data dependencies */
 	/* enforce data dependencies */
 	if (_starpu_submit_job_enforce_data_deps(j))
 	if (_starpu_submit_job_enforce_data_deps(j))
@@ -345,29 +341,31 @@ unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j, unsigned job_i
 		return 0;
 		return 0;
         }
         }
 
 
-	ret = _starpu_push_task(j, job_is_already_locked);
+	ret = _starpu_push_task(j);
 
 
         _STARPU_LOG_OUT();
         _STARPU_LOG_OUT();
 	return ret;
 	return ret;
 }
 }
 
 
 /* Tag deps are already fulfilled */
 /* Tag deps are already fulfilled */
-#ifdef STARPU_DEVEL
-#warning TODO remove the job_is_already_locked parameter
-#endif
-unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j, unsigned job_is_already_locked)
+unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
 {
 {
 	unsigned ret;
 	unsigned ret;
 
 
+	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	/* enfore task dependencies */
 	/* enfore task dependencies */
-	if (_starpu_not_all_task_deps_are_fulfilled(j, job_is_already_locked))
+	if (_starpu_not_all_task_deps_are_fulfilled(j))
+	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 		return 0;
 		return 0;
+	}
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 
 	/* enforce data dependencies */
 	/* enforce data dependencies */
 	if (_starpu_submit_job_enforce_data_deps(j))
 	if (_starpu_submit_job_enforce_data_deps(j))
 		return 0;
 		return 0;
 
 
-	ret = _starpu_push_task(j, job_is_already_locked);
+	ret = _starpu_push_task(j);
 
 
 	return ret;
 	return ret;
 }
 }

+ 9 - 4
src/core/jobs.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2009-2011  Université de Bordeaux 1
+ * Copyright (C) 2009-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  * Copyright (C) 2011  Télécom-SudParis
  *
  *
@@ -80,6 +80,11 @@ LIST_TYPE(_starpu_job,
 	 * */
 	 * */
 	struct _starpu_cg_list job_successors;
 	struct _starpu_cg_list job_successors;
 
 
+	/* For tasks with cl==NULL but submitted with explicit data dependency,
+	 * the handle for this dependency, so as to remove the task from the
+	 * last_writer/readers */
+	starpu_data_handle_t implicit_dep_handle;
+
 	/* The value of the footprint that identifies the job may be stored in
 	/* The value of the footprint that identifies the job may be stored in
 	 * this structure. */
 	 * this structure. */
 	unsigned footprint_is_computed;
 	unsigned footprint_is_computed;
@@ -142,13 +147,13 @@ void _starpu_wait_job(struct _starpu_job *j);
 void _starpu_exclude_task_from_dag(struct starpu_task *task);
 void _starpu_exclude_task_from_dag(struct starpu_task *task);
 
 
 /* try to submit job j, enqueue it if it's not schedulable yet */
 /* try to submit job j, enqueue it if it's not schedulable yet */
-unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j, unsigned job_is_already_locked);
-unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j, unsigned job_is_already_locked);
+unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j);
+unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j);
 
 
 
 
 /* This function must be called after the execution of a job, this triggers all
 /* This function must be called after the execution of a job, this triggers all
  * job's dependencies and perform the callback function if any. */
  * job's dependencies and perform the callback function if any. */
-void _starpu_handle_job_termination(struct _starpu_job *j, unsigned job_is_already_locked);
+void _starpu_handle_job_termination(struct _starpu_job *j);
 
 
 /* Get the sum of the size of the data accessed by the job. */
 /* Get the sum of the size of the data accessed by the job. */
 size_t _starpu_job_get_data_size(struct starpu_perfmodel *model, enum starpu_perf_archtype arch, unsigned nimpl, struct _starpu_job *j);
 size_t _starpu_job_get_data_size(struct starpu_perfmodel *model, enum starpu_perf_archtype arch, unsigned nimpl, struct _starpu_job *j);

+ 2 - 2
src/core/sched_policy.c

@@ -290,7 +290,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 }
 }
 
 
 /* the generic interface that call the proper underlying implementation */
 /* the generic interface that call the proper underlying implementation */
-int _starpu_push_task(struct _starpu_job *j, unsigned job_is_already_locked)
+int _starpu_push_task(struct _starpu_job *j)
 {
 {
 	struct starpu_task *task = j->task;
 	struct starpu_task *task = j->task;
         _STARPU_LOG_IN();
         _STARPU_LOG_IN();
@@ -304,7 +304,7 @@ int _starpu_push_task(struct _starpu_job *j, unsigned job_is_already_locked)
 	 * corresponding dependencies */
 	 * corresponding dependencies */
 	if (task->cl == NULL)
 	if (task->cl == NULL)
 	{
 	{
-		_starpu_handle_job_termination(j, job_is_already_locked);
+		_starpu_handle_job_termination(j);
                 _STARPU_LOG_OUT_TAG("handle_job_termination");
                 _STARPU_LOG_OUT_TAG("handle_job_termination");
 		return 0;
 		return 0;
 	}
 	}

+ 2 - 2
src/core/sched_policy.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
  * it under the terms of the GNU Lesser General Public License as published by
@@ -28,7 +28,7 @@ struct starpu_sched_policy *_starpu_get_sched_policy(void);
 void _starpu_init_sched_policy(struct _starpu_machine_config *config);
 void _starpu_init_sched_policy(struct _starpu_machine_config *config);
 void _starpu_deinit_sched_policy(struct _starpu_machine_config *config);
 void _starpu_deinit_sched_policy(struct _starpu_machine_config *config);
 
 
-int _starpu_push_task(struct _starpu_job *task, unsigned job_is_already_locked);
+int _starpu_push_task(struct _starpu_job *task);
 /* pop a task that can be executed on the worker */
 /* pop a task that can be executed on the worker */
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker);
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker);
 /* pop every task that can be executed on the worker */
 /* pop every task that can be executed on the worker */

+ 4 - 21
src/core/task.c

@@ -212,18 +212,12 @@ int _starpu_submit_job(struct _starpu_job *j)
 
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
 
+	/* Need to atomically set submitted to 1 and check dependencies, since
+	 * this is concurrent with _starpu_notify_cg */
 	j->terminated = 0;
 	j->terminated = 0;
 	j->submitted = 1;
 	j->submitted = 1;
 
 
-	int ret = _starpu_enforce_deps_and_schedule(j, 1);
-	int must_destroy = j->terminated > 0 && j->task->destroy && j->task->detach;
-
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-
-	/* If the task terminated immediately (cl == NULL), we have to destroy it ourself */
-
-	if (must_destroy)
-		_starpu_task_destroy(j->task);
+	int ret = _starpu_enforce_deps_and_schedule(j);
 
 
         _STARPU_LOG_OUT();
         _STARPU_LOG_OUT();
         return ret;
         return ret;
@@ -435,8 +429,6 @@ int starpu_task_submit(struct starpu_task *task)
  * skipping dependencies completely (when it knows what it is doing).  */
  * skipping dependencies completely (when it knows what it is doing).  */
 int _starpu_task_submit_nodeps(struct starpu_task *task)
 int _starpu_task_submit_nodeps(struct starpu_task *task)
 {
 {
-	int ret;
-
 	_starpu_task_check_deprecated_fields(task);
 	_starpu_task_check_deprecated_fields(task);
 	_starpu_codelet_check_deprecated_fields(task->cl);
 	_starpu_codelet_check_deprecated_fields(task->cl);
 
 
@@ -467,18 +459,9 @@ int _starpu_task_submit_nodeps(struct starpu_task *task)
 		}
 		}
 	}
 	}
 
 
-	ret = _starpu_push_task(j, 1);
-
-	int must_destroy = j->terminated > 0 && j->task->destroy && j->task->detach;
-
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 
-	/* If the task terminated immediately (cl == NULL), we have to destroy it ourself */
-
-	if (must_destroy)
-		_starpu_task_destroy(j->task);
-
-	return ret;
+	return _starpu_push_task(j);
 }
 }
 
 
 /*
 /*

+ 9 - 2
src/datawizard/user_interactions.c

@@ -140,6 +140,7 @@ int starpu_data_acquire_cb(starpu_data_handle_t handle,
 	int sequential_consistency = handle->sequential_consistency;
 	int sequential_consistency = handle->sequential_consistency;
 	if (sequential_consistency)
 	if (sequential_consistency)
 	{
 	{
+		struct starpu_task *new_task;
 		wrapper->pre_sync_task = starpu_task_create();
 		wrapper->pre_sync_task = starpu_task_create();
 		wrapper->pre_sync_task->detach = 1;
 		wrapper->pre_sync_task->detach = 1;
 		wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
 		wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
@@ -155,9 +156,12 @@ int starpu_data_acquire_cb(starpu_data_handle_t handle,
                 job->model_name = "acquire_cb_post";
                 job->model_name = "acquire_cb_post";
 #endif
 #endif
 
 
-		_starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, handle, mode);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 
+		if (new_task)
+			starpu_task_submit(new_task);
+
 		/* TODO detect if this is superflous */
 		/* TODO detect if this is superflous */
 		int ret = starpu_task_submit(wrapper->pre_sync_task);
 		int ret = starpu_task_submit(wrapper->pre_sync_task);
 		STARPU_ASSERT(!ret);
 		STARPU_ASSERT(!ret);
@@ -235,6 +239,7 @@ int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_access_mode mod
 	int sequential_consistency = handle->sequential_consistency;
 	int sequential_consistency = handle->sequential_consistency;
 	if (sequential_consistency)
 	if (sequential_consistency)
 	{
 	{
+		struct starpu_task *new_task;
 		wrapper.pre_sync_task = starpu_task_create();
 		wrapper.pre_sync_task = starpu_task_create();
 		wrapper.pre_sync_task->detach = 0;
 		wrapper.pre_sync_task->detach = 0;
 
 
@@ -248,8 +253,10 @@ int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_access_mode mod
                 job->model_name = "acquire_post";
                 job->model_name = "acquire_post";
 #endif
 #endif
 
 
-		_starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, wrapper.post_sync_task, handle, mode);
+		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, wrapper.post_sync_task, handle, mode);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+		if (new_task)
+			starpu_task_submit(new_task);
 
 
 		/* TODO detect if this is superflous */
 		/* TODO detect if this is superflous */
 		wrapper.pre_sync_task->synchronous = 1;
 		wrapper.pre_sync_task->synchronous = 1;

+ 3 - 3
src/drivers/cpu/driver_cpu.c

@@ -148,7 +148,7 @@ void *_starpu_cpu_worker(void *arg)
 		if (!_STARPU_CPU_MAY_PERFORM(j))
 		if (!_STARPU_CPU_MAY_PERFORM(j))
 		{
 		{
 			/* put it and the end of the queue ... XXX */
 			/* put it and the end of the queue ... XXX */
-			_starpu_push_task(j, 0);
+			_starpu_push_task(j);
 			continue;
 			continue;
 		}
 		}
 
 
@@ -197,7 +197,7 @@ void *_starpu_cpu_worker(void *arg)
 			switch (res)
 			switch (res)
 			{
 			{
 				case -EAGAIN:
 				case -EAGAIN:
-					_starpu_push_task(j, 0);
+					_starpu_push_task(j);
 					continue;
 					continue;
 				default:
 				default:
 					STARPU_ASSERT(0);
 					STARPU_ASSERT(0);
@@ -205,7 +205,7 @@ void *_starpu_cpu_worker(void *arg)
 		}
 		}
 
 
 		if (rank == 0)
 		if (rank == 0)
-			_starpu_handle_job_termination(j, 0);
+			_starpu_handle_job_termination(j);
         }
         }
 
 
 	_STARPU_TRACE_WORKER_DEINIT_START
 	_STARPU_TRACE_WORKER_DEINIT_START

+ 3 - 3
src/drivers/cuda/driver_cuda.c

@@ -323,7 +323,7 @@ void *_starpu_cuda_worker(void *arg)
 		if (!_STARPU_CUDA_MAY_PERFORM(j))
 		if (!_STARPU_CUDA_MAY_PERFORM(j))
 		{
 		{
 			/* this is neither a cuda or a cublas task */
 			/* this is neither a cuda or a cublas task */
-			_starpu_push_task(j, 0);
+			_starpu_push_task(j);
 			continue;
 			continue;
 		}
 		}
 
 
@@ -341,7 +341,7 @@ void *_starpu_cuda_worker(void *arg)
 			{
 			{
 				case -EAGAIN:
 				case -EAGAIN:
 					_STARPU_DISP("ouch, put the codelet %p back ... \n", j);
 					_STARPU_DISP("ouch, put the codelet %p back ... \n", j);
-					_starpu_push_task(j, 0);
+					_starpu_push_task(j);
 					STARPU_ABORT();
 					STARPU_ABORT();
 					continue;
 					continue;
 				default:
 				default:
@@ -349,7 +349,7 @@ void *_starpu_cuda_worker(void *arg)
 			}
 			}
 		}
 		}
 
 
-		_starpu_handle_job_termination(j, 0);
+		_starpu_handle_job_termination(j);
 	}
 	}
 
 
 	_STARPU_TRACE_WORKER_DEINIT_START
 	_STARPU_TRACE_WORKER_DEINIT_START

+ 4 - 4
src/drivers/gordon/driver_gordon.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2009, 2010, 2011  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2011-2012  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  * Copyright (C) 2011  Télécom-SudParis
  *
  *
@@ -177,7 +177,7 @@ static struct gordon_task_wrapper_s *starpu_to_gordon_job(struct _starpu_job *j)
 static void handle_terminated_job(struct _starpu_job *j)
 static void handle_terminated_job(struct _starpu_job *j)
 {
 {
 	_starpu_push_task_output(j, 0);
 	_starpu_push_task_output(j, 0);
-	_starpu_handle_job_termination(j, 0);
+	_starpu_handle_job_termination(j);
 	starpu_wake_all_blocked_workers();
 	starpu_wake_all_blocked_workers();
 }
 }
 
 
@@ -215,7 +215,7 @@ static void gordon_callback_list_func(void *arg)
 		}
 		}
 
 
 		_starpu_push_task_output(j, 0);
 		_starpu_push_task_output(j, 0);
-		_starpu_handle_job_termination(j, 0);
+		_starpu_handle_job_termination(j);
 		//starpu_wake_all_blocked_workers();
 		//starpu_wake_all_blocked_workers();
 
 
 		task_cnt++;
 		task_cnt++;
@@ -420,7 +420,7 @@ void *gordon_worker_inject(struct _starpu_worker_set *arg)
 				}
 				}
 				else
 				else
 				{
 				{
-					_starpu_push_task(j, 0);
+					_starpu_push_task(j);
 				}
 				}
 			}
 			}
 #endif
 #endif

+ 3 - 3
src/drivers/opencl/driver_opencl.c

@@ -504,7 +504,7 @@ void *_starpu_opencl_worker(void *arg)
 		if (!_STARPU_OPENCL_MAY_PERFORM(j))
 		if (!_STARPU_OPENCL_MAY_PERFORM(j))
 		{
 		{
 			/* this is not a OpenCL task */
 			/* this is not a OpenCL task */
-			_starpu_push_task(j, 0);
+			_starpu_push_task(j);
 			continue;
 			continue;
 		}
 		}
 
 
@@ -522,7 +522,7 @@ void *_starpu_opencl_worker(void *arg)
 			{
 			{
 				case -EAGAIN:
 				case -EAGAIN:
 					_STARPU_DISP("ouch, put the codelet %p back ... \n", j);
 					_STARPU_DISP("ouch, put the codelet %p back ... \n", j);
-					_starpu_push_task(j, 0);
+					_starpu_push_task(j);
 					STARPU_ABORT();
 					STARPU_ABORT();
 					continue;
 					continue;
 				default:
 				default:
@@ -530,7 +530,7 @@ void *_starpu_opencl_worker(void *arg)
 			}
 			}
 		}
 		}
 
 
-		_starpu_handle_job_termination(j, 0);
+		_starpu_handle_job_termination(j);
 	}
 	}
 
 
 	_STARPU_TRACE_WORKER_DEINIT_START
 	_STARPU_TRACE_WORKER_DEINIT_START