Browse Source

improve data fetch efficiency, by not releasing data after the asynchronous transfers have completed

Samuel Thibault 8 years ago
parent
commit
d1bbfcfdc8

+ 35 - 68
src/datawizard/coherency.c

@@ -949,14 +949,14 @@ static void _starpu_fetch_task_input_cb(void *arg)
 /* Synchronously or asynchronously fetch data for a given task (if it's not there already) 
  * Returns the number of data acquired here.  */
 
-/* The synchronous version of _starpu_fetch_task_input must be called before
+/* _starpu_fetch_task_input must be called before
  * executing the task. __starpu_push_task_output but be called after the
  * execution of the task. */
-/* To improve overlapping, the driver can, before calling the synchronous
- * version of _starpu_fetch_task_input, call _starpu_fetch_task_input with
+
+/* The driver can either just call _starpu_fetch_task_input with async==0,
+ * or to improve overlapping, it can call _starpu_fetch_task_input with
  * async==1, then wait for transfers to complete, then call
- * _starpu_release_fetch_task_input_async to release them before calling the
- * synchronous version of _starpu_fetch_task_input. */
+ * _starpu_fetch_task_input_tail to complete the fetch.  */
 int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, int async)
 {
 	struct _starpu_worker *worker = _starpu_get_local_worker_key();
@@ -979,9 +979,8 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
 	unsigned nacquires;
 
-	unsigned local_memory_node = _starpu_memory_node_get_local_key();
+	unsigned local_memory_node = worker->memory_node;
 
-	unsigned long total_size = 0;
 	unsigned index;
 
 	nacquires = 0;
@@ -1034,9 +1033,6 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
 				goto enomem;
 		}
 
-#ifdef STARPU_USE_FXT
-		total_size += _starpu_data_get_size(handle);
-#endif
 		nacquires++;
 	}
 	if (async)
@@ -1045,33 +1041,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
 		return 0;
 	}
 
-	_STARPU_TRACE_DATA_LOAD(workerid,total_size);
-	/* Now that we have taken the data locks in locking order, fill the codelet interfaces in function order.  */
-	for (index = 0; index < nbuffers; index++)
-	{
-		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
-		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
-		int node = descrs[index].node;
-		if (node == -1)
-			node = local_memory_node;
-
-		struct _starpu_data_replicate *local_replicate;
-
-		local_replicate = get_replicate(handle, mode, workerid, node);
-		if (local_replicate->mc)
-			local_replicate->mc->diduse = 1;
-
-		_STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, index);
-
-		/* If the replicate was not initialized yet, we have to do it now */
-		if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
-			_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
-	}
-
-	if (profiling && task->profiling_info)
-		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);
-
-	_STARPU_TRACE_END_FETCH_INPUT(NULL);
+	_starpu_fetch_task_input_tail(task, j, worker);
 
 	return 0;
 
@@ -1105,54 +1075,51 @@ enomem:
 	return -1;
 }
 
-/* This is to be called after having called _starpu_fetch_task_input with async=1 and getting the cb called as many times as there are buffers.  */
-int _starpu_release_fetch_task_input_async(struct _starpu_job *j, struct _starpu_worker *worker)
+/* Now that we have taken the data locks in locking order, fill the codelet interfaces in function order.  */
+void _starpu_fetch_task_input_tail(struct starpu_task *task, struct _starpu_job *j, struct _starpu_worker *worker)
 {
-	unsigned workerid = worker->workerid;
-	unsigned nbtransfers = worker->nb_buffers_totransfer;
-	STARPU_RMB();
-	if (worker->ntasks <= 1)
-		_STARPU_TRACE_WORKER_END_FETCH_INPUT(NULL, workerid);
-	struct starpu_task *task = j->task;
+	int workerid = worker->workerid;
+
+	int profiling = starpu_profiling_status_get();
 
 	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
-	unsigned local_memory_node = _starpu_memory_node_get_local_key();
+
+	unsigned local_memory_node = worker->memory_node;
+
 	unsigned index;
-	unsigned nreleases;
+	unsigned long total_size = 0;
 
-	nreleases = 0;
 	for (index = 0; index < nbuffers; index++)
 	{
-		if (nreleases == nbtransfers)
-			/* That was a partial fetch */
-			break;
-		starpu_data_handle_t handle = descrs[index].handle;
-		enum starpu_data_access_mode mode = descrs[index].mode;
+		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
+		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
 		int node = descrs[index].node;
 		if (node == -1)
 			node = local_memory_node;
 
 		struct _starpu_data_replicate *local_replicate;
 
-		if (index && descrs[index-1].handle == descrs[index].handle)
-			/* We have already took this data, skip it. This
-			 * depends on ordering putting writes before reads, see
-			 * _starpu_compar_handles */
-			continue;
-
 		local_replicate = get_replicate(handle, mode, workerid, node);
+		if (local_replicate->mc)
+			local_replicate->mc->diduse = 1;
 
-		/* Release our refcnt */
-		_starpu_spin_lock(&handle->header_lock);
-		local_replicate->refcnt--;
-		STARPU_ASSERT(local_replicate->refcnt >= 0);
-		STARPU_ASSERT(handle->busy_count > 0);
-		handle->busy_count--;
-		if (!_starpu_data_check_not_busy(handle))
-			_starpu_spin_unlock(&handle->header_lock);
+		_STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, index);
+
+		/* If the replicate was not initialized yet, we have to do it now */
+		if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
+			_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
+
+#ifdef STARPU_USE_FXT
+		total_size += _starpu_data_get_size(handle);
+#endif
 	}
-	return 0;
+	_STARPU_TRACE_DATA_LOAD(workerid,total_size);
+
+	if (profiling && task->profiling_info)
+		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);
+
+	_STARPU_TRACE_END_FETCH_INPUT(NULL);
 }
 
 /* Release task data dependencies */

+ 1 - 1
src/datawizard/coherency.h

@@ -303,7 +303,7 @@ void _starpu_release_nowhere_task_output(struct _starpu_job *j);
 struct _starpu_worker;
 STARPU_ATTRIBUTE_WARN_UNUSED_RESULT
 int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, int async);
-int _starpu_release_fetch_task_input_async(struct _starpu_job *j, struct _starpu_worker *worker);
+void _starpu_fetch_task_input_tail(struct starpu_task *task, struct _starpu_job *j, struct _starpu_worker *worker);
 void _starpu_fetch_nowhere_task_input(struct _starpu_job *j);
 
 unsigned _starpu_is_data_present_or_requested(struct _starpu_data_state *state, unsigned node);

+ 9 - 20
src/drivers/cpu/driver_cpu.c

@@ -59,27 +59,9 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 
 	struct starpu_task *task = j->task;
 	struct starpu_codelet *cl = task->cl;
-#ifdef STARPU_OPENMP
-	/* At this point, j->continuation as been cleared as the task is being
-	 * woken up, thus we use j->discontinuous instead for the check */
-	const unsigned continuation_wake_up = j->discontinuous;
-#else
-	const unsigned continuation_wake_up = 0;
-#endif
 
 	STARPU_ASSERT(cl);
 
-	if (rank == 0 && !continuation_wake_up)
-	{
-		int ret = _starpu_fetch_task_input(task, j, 0);
-		if (ret != 0)
-		{
-			/* there was not enough memory so the codelet cannot be executed right now ... */
-			/* push the codelet back and try another one ... */
-			return -EAGAIN;
-		}
-	}
-
 	if (is_parallel_task)
 	{
 		STARPU_PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
@@ -332,7 +314,7 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 		_STARPU_TRACE_END_PROGRESS(memnode);
 		j = _starpu_get_job_associated_to_task(pending_task);
 
-		_starpu_release_fetch_task_input_async(j, cpu_worker);
+		_starpu_fetch_task_input_tail(pending_task, j, cpu_worker);
 		/* Reset it */
 		cpu_worker->task_transferring = NULL;
 
@@ -382,7 +364,14 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 	}
 	cpu_worker->current_rank = rank;
 
-	if (rank == 0)
+#ifdef STARPU_OPENMP
+	/* At this point, j->continuation as been cleared as the task is being
+	 * woken up, thus we use j->discontinuous instead for the check */
+	const unsigned continuation_wake_up = j->discontinuous;
+#else
+	const unsigned continuation_wake_up = 0;
+#endif
+	if (rank == 0 && !continuation_wake_up)
 	{
 		res = _starpu_fetch_task_input(task, j, 1);
 		STARPU_ASSERT(res == 0);

+ 1 - 12
src/drivers/cuda/driver_cuda.c

@@ -473,8 +473,6 @@ void _starpu_init_cuda(void)
 
 static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worker, unsigned char pipeline_idx STARPU_ATTRIBUTE_UNUSED)
 {
-	int ret;
-
 	STARPU_ASSERT(j);
 	struct starpu_task *task = j->task;
 
@@ -487,15 +485,6 @@ static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worke
 	_starpu_set_local_worker_key(worker);
 	_starpu_set_current_task(task);
 
-	ret = _starpu_fetch_task_input(task, j, 0);
-	if (ret < 0)
-	{
-		/* there was not enough memory, so the input of
-		 * the codelet cannot be fetched ... put the
-		 * codelet back, and try it later */
-		return -EAGAIN;
-	}
-
 	if (worker->ntasks == 1)
 	{
 		/* We are alone in the pipeline, the kernel will start now, record it */
@@ -777,7 +766,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 			j = _starpu_get_job_associated_to_task(task);
 
 			_starpu_set_local_worker_key(worker);
-			_starpu_release_fetch_task_input_async(j, worker);
+			_starpu_fetch_task_input_tail(task, j, worker);
 			/* Reset it */
 			worker->task_transferring = NULL;
 

+ 1 - 12
src/drivers/mp_common/source_common.c

@@ -497,17 +497,6 @@ static int _starpu_src_common_execute(struct _starpu_job *j,
 	int profiling = starpu_profiling_status_get();
 
 	STARPU_ASSERT(task);
-	if (worker->current_rank == 0)
-	{
-		int ret = _starpu_fetch_task_input(task, j, 0);
-		if (ret != 0)
-		{
-			/* there was not enough memory, so the input of
-			 * the codelet cannot be fetched ... put the
-			 * codelet back, and try it later */
-			return -EAGAIN;
-		}
-	}
 
 	void (*kernel)(void)  = node->get_kernel_from_job(node,j);
 
@@ -964,7 +953,7 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 
 			_STARPU_TRACE_END_PROGRESS(memnode);
 			_starpu_set_local_worker_key(&worker_set->workers[i]);
-			_starpu_release_fetch_task_input_async(j, &worker_set->workers[i]);
+			_starpu_fetch_task_input_tail(task, j, &worker_set->workers[i]);
 
 			/* Execute the task */
 			res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);

+ 1 - 12
src/drivers/opencl/driver_opencl.c

@@ -691,7 +691,7 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		_STARPU_TRACE_END_PROGRESS(memnode);
 		j = _starpu_get_job_associated_to_task(task);
 
-		_starpu_release_fetch_task_input_async(j, worker);
+		_starpu_fetch_task_input_tail(task, j, worker);
 		/* Reset it */
 		worker->task_transferring = NULL;
 
@@ -903,8 +903,6 @@ cl_device_type _starpu_opencl_get_device_type(int devid)
 
 static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker *worker, unsigned char pipeline_idx STARPU_ATTRIBUTE_UNUSED)
 {
-	int ret;
-
 	STARPU_ASSERT(j);
 	struct starpu_task *task = j->task;
 
@@ -916,15 +914,6 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 
 	_starpu_set_current_task(task);
 
-	ret = _starpu_fetch_task_input(task, j, 0);
-	if (ret < 0)
-	{
-		/* there was not enough memory, so the input of
-		 * the codelet cannot be fetched ... put the
-		 * codelet back, and try it later */
-		return -EAGAIN;
-	}
-
 	if (worker->ntasks == 1)
 	{
 		/* We are alone in the pipeline, the kernel will start now, record it */