Browse Source

Bug fix. When an "empty task" (cl == NULL) becomes executable, it is executed
directly, so we need to make sure that the sync_mutex lock is not already
held before taking it again. In case the empty task is executed during the
termination of another task, we need to defer the destruction of the task
structure until it is no longer needed.

Cédric Augonnet 15 years ago
parent
commit
47af90d6fe

+ 14 - 0
src/core/dependencies/cg.c

@@ -175,7 +175,21 @@ void _starpu_notify_cg_list(struct starpu_cg_list_s *successors)
 		if (cg_type == STARPU_CG_TASK)
 		{
 			starpu_job_t j = cg->succ.job;
+			
+			/* In case this task was immediately terminated, since
+			 * _starpu_notify_cg_list already holds the sync_mutex
+			 * lock, it is its responsibility to destroy the task if
+			 * needed. */
+			unsigned must_destroy_task = 0;
+			struct starpu_task *task = j->task;
+
+			if ((j->terminated > 0) && task->destroy && task->detach)
+				must_destroy_task = 1;
+
 			PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+			if (must_destroy_task)
+				starpu_task_destroy(task);
 		}			
 
 		if (cg_type == STARPU_CG_APPS) {

+ 1 - 1
src/core/dependencies/data_concurrency.c

@@ -223,7 +223,7 @@ void _starpu_notify_data_dependencies(starpu_data_handle handle)
 		if (r->is_requested_by_codelet)
 		{
 			if (!unlock_one_requester(r))
-				_starpu_push_task(r->j);
+				_starpu_push_task(r->j, 0);
 		}
 		else
 		{

+ 1 - 1
src/core/dependencies/tags.c

@@ -154,7 +154,7 @@ void _starpu_tag_set_ready(struct starpu_tag_s *tag)
 	_starpu_spin_unlock(&tag->lock);
 
 	/* enforce data dependencies */
-	_starpu_enforce_deps_starting_from_task(j);
+	_starpu_enforce_deps_starting_from_task(j, 0);
 
 	_starpu_spin_lock(&tag->lock);
 }

+ 19 - 27
src/core/jobs.c

@@ -92,12 +92,14 @@ void _starpu_wait_job(starpu_job_t j)
 	PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 }
 
-void _starpu_handle_job_termination(starpu_job_t j)
+void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_locked)
 {
 	struct starpu_task *task = j->task;
 
+	if (!job_is_already_locked)
+		PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+
 	/* in case there are dependencies, wake up the proper tasks */
-	PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->submitted = 0;
 	_starpu_notify_dependencies(j);
 
@@ -106,7 +108,9 @@ void _starpu_handle_job_termination(starpu_job_t j)
 	 * function. A value of 1 means that the codelet was executed but that
 	 * the callback is not done yet. */
 	j->terminated = 1;
-	PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+	if (!job_is_already_locked)
+		PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 	/* the callback is executed after the dependencies so that we may remove the tag 
  	 * of the task itself */
@@ -138,17 +142,22 @@ void _starpu_handle_job_termination(starpu_job_t j)
 	{
 		/* we do not desallocate the job structure if some is going to
 		 * wait after the task */
-		PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+		if (!job_is_already_locked)
+			PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 		/* A value of 2 is put to specify that not only the codelet but
 		 * also the callback were executed. */
 		j->terminated = 2;
 		PTHREAD_COND_BROADCAST(&j->sync_cond);
-		PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+		if (!job_is_already_locked)
+			PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 	}
 	else {
 		/* no one is going to synchronize with that task so we release
- 		 * the data structures now */
-		if (destroy)
+		 * the data structures now. In case the job was already locked
+		 * by the caller, it is its responsibility to destroy the task.
+		 * */
+		if (!job_is_already_locked && destroy)
 			starpu_task_destroy(task);
 	}
 
@@ -248,13 +257,13 @@ unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j, unsigned job_is_alrea
 	if (_starpu_submit_job_enforce_data_deps(j))
 		return 0;
 
-	ret = _starpu_push_task(j);
+	ret = _starpu_push_task(j, job_is_already_locked);
 
 	return ret;
 }
 
 /* Tag deps are already fulfilled */
-unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j)
+unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j, unsigned job_is_already_locked)
 {
 	unsigned ret;
 
@@ -266,28 +275,11 @@ unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j)
 	if (_starpu_submit_job_enforce_data_deps(j))
 		return 0;
 
-	ret = _starpu_push_task(j);
+	ret = _starpu_push_task(j, job_is_already_locked);
 
 	return ret;
 }
 
-/* Tag and task deps are already fulfilled */
-unsigned _starpu_enforce_deps_starting_from_data(starpu_job_t j)
-{
-	unsigned ret;
-
-	/* enforce data dependencies */
-	if (_starpu_submit_job_enforce_data_deps(j))
-		return 0;
-
-	ret = _starpu_push_task(j);
-
-	return ret;
-}
-
-
-
-
 struct starpu_job_s *_starpu_pop_local_task(struct starpu_worker_s *worker)
 {
 	struct starpu_job_s *j = NULL;

+ 2 - 3
src/core/jobs.h

@@ -79,13 +79,12 @@ void _starpu_wait_job(starpu_job_t j);
 
 /* try to submit job j, enqueue it if it's not schedulable yet */
 unsigned _starpu_enforce_deps_and_schedule(starpu_job_t j, unsigned job_is_already_locked);
-unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j);
-unsigned _starpu_enforce_deps_starting_from_data(starpu_job_t j);
+unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j, unsigned job_is_already_locked);
 
 
 //#warning this must not be exported anymore ... 
 //starpu_job_t _starpu_job_create(struct starpu_task *task);
-void _starpu_handle_job_termination(starpu_job_t j);
+void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_locked);
 size_t _starpu_job_get_data_size(starpu_job_t j);
 
 starpu_job_t _starpu_pop_local_task(struct starpu_worker_s *worker);

+ 2 - 2
src/core/policies/sched_policy.c

@@ -202,7 +202,7 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config)
 }
 
 /* the generic interface that call the proper underlying implementation */
-int _starpu_push_task(starpu_job_t j)
+int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 {
 	struct starpu_jobq_s *queue = policy.starpu_get_local_queue(&policy);
 
@@ -211,7 +211,7 @@ int _starpu_push_task(starpu_job_t j)
 	 * corresponding dependencies */
 	if (j->task->cl == NULL)
 	{
-		_starpu_handle_job_termination(j);
+		_starpu_handle_job_termination(j, job_is_already_locked);
 		return 0;
 	}
 

+ 1 - 1
src/core/policies/sched_policy.h

@@ -58,7 +58,7 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config);
 
 int _starpu_get_prefetch_flag(void);
 
-int _starpu_push_task(starpu_job_t task);
+int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
 struct starpu_job_s *_starpu_pop_task(void);
 struct starpu_job_s *_starpu_pop_task_from_queue(struct starpu_jobq_s *queue);
 struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where);

+ 3 - 3
src/drivers/cpu/driver_cpu.c

@@ -174,7 +174,7 @@ void *_starpu_cpu_worker(void *arg)
 		if (!STARPU_CPU_MAY_PERFORM(j)) 
 		{
 			/* put it and the end of the queue ... XXX */
-			_starpu_push_task(j);
+			_starpu_push_task(j, 0);
 			continue;
 		}
 
@@ -187,14 +187,14 @@ void *_starpu_cpu_worker(void *arg)
 		if (res) {
 			switch (res) {
 				case -EAGAIN:
-					_starpu_push_task(j);
+					_starpu_push_task(j, 0);
 					continue;
 				default: 
 					assert(0);
 			}
 		}
 
-		_starpu_handle_job_termination(j);
+		_starpu_handle_job_termination(j, 0);
         }
 
 	STARPU_TRACE_WORKER_DEINIT_START

+ 3 - 3
src/drivers/cuda/driver_cuda.c

@@ -256,7 +256,7 @@ void *_starpu_cuda_worker(void *arg)
 		if (!STARPU_CUDA_MAY_PERFORM(j))
 		{
 			/* this is neither a cuda or a cublas task */
-			_starpu_push_task(j);
+			_starpu_push_task(j, 0);
 			continue;
 		}
 
@@ -270,7 +270,7 @@ void *_starpu_cuda_worker(void *arg)
 			switch (res) {
 				case -EAGAIN:
 					fprintf(stderr, "ouch, put the codelet %p back ... \n", j);
-					_starpu_push_task(j);
+					_starpu_push_task(j, 0);
 					STARPU_ABORT();
 					continue;
 				default:
@@ -278,7 +278,7 @@ void *_starpu_cuda_worker(void *arg)
 			}
 		}
 
-		_starpu_handle_job_termination(j);
+		_starpu_handle_job_termination(j, 0);
 	}
 
 	STARPU_TRACE_WORKER_DEINIT_START

+ 3 - 3
src/drivers/gordon/driver_gordon.c

@@ -172,7 +172,7 @@ static struct gordon_task_wrapper_s *starpu_to_gordon_job(starpu_job_t j)
 static void handle_terminated_job(starpu_job_t j)
 {
 	_starpu_push_task_output(j->task, 0);
-	_starpu_handle_job_termination(j);
+	_starpu_handle_job_termination(j, 0);
 	starpu_wake_all_blocked_workers();
 }
 
@@ -210,7 +210,7 @@ static void gordon_callback_list_func(void *arg)
 		}
 
 		_starpu_push_task_output(j->task, 0);
-		_starpu_handle_job_termination(j);
+		_starpu_handle_job_termination(j, 0);
 		//starpu_wake_all_blocked_workers();
 
 		task_cnt++;
@@ -398,7 +398,7 @@ void *gordon_worker_inject(struct starpu_worker_set_s *arg)
 					inject_task(j, &arg->workers[0]);
 				}
 				else {
-					_starpu_push_task(j);
+					_starpu_push_task(j, 0);
 				}
 			}
 #endif

+ 3 - 3
src/drivers/opencl/driver_opencl.c

@@ -267,7 +267,7 @@ void *_starpu_opencl_worker(void *arg)
 		if (!STARPU_OPENCL_MAY_PERFORM(j))
 		{
 			/* this is not a OpenCL task */
-			_starpu_push_task(j);
+			_starpu_push_task(j, 0);
 			continue;
 		}
 
@@ -281,7 +281,7 @@ void *_starpu_opencl_worker(void *arg)
 			switch (res) {
 				case -EAGAIN:
 					fprintf(stderr, "ouch, put the codelet %p back ... \n", j);
-					_starpu_push_task(j);
+					_starpu_push_task(j, 0);
 					STARPU_ABORT();
 					continue;
 				default:
@@ -289,7 +289,7 @@ void *_starpu_opencl_worker(void *arg)
 			}
 		}
 
-		_starpu_handle_job_termination(j);
+		_starpu_handle_job_termination(j, 0);
 	}
 
 	STARPU_TRACE_WORKER_DEINIT_START