Explorar o código

factorize duplicate code

Samuel Thibault %!s(int64=4) %!d(string=hai) anos
pai
achega
a405b041b4
Modificáronse 1 ficheiros con 25 adicións e 58 borrados
  1. 25 58
      src/datawizard/coherency.c

+ 25 - 58
src/datawizard/coherency.c

@@ -912,7 +912,7 @@ static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handl
 	_starpu_spin_unlock(&handle->header_lock);
 }
 
-int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
+int starpu_prefetch_task_input_prio(struct starpu_task *task, int target_node, int worker, int prio)
 {
 #ifdef STARPU_OPENMP
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
@@ -936,7 +936,11 @@ int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned t
 			/* Don't bother prefetching some data which will be overwritten */
 			continue;
 
-		int node = _starpu_task_data_get_node_on_node(task, index, target_node);
+		int node;
+		if (target_node >= 0)
+			node = _starpu_task_data_get_node_on_node(task, index, target_node);
+		else
+			node = _starpu_task_data_get_node_on_worker(task, index, worker);
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
 		task_prefetch_data_on_node(handle, node, replicate, mode, prio);
@@ -948,6 +952,12 @@ int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned t
 	return 0;
 }
 
+int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
+{
+	return starpu_prefetch_task_input_prio(task, target_node, -1, prio);
+}
+
+
 int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
 {
 	int prio = task->priority;
@@ -956,7 +966,7 @@ int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
 	return starpu_prefetch_task_input_on_node_prio(task, node, prio);
 }
 
-int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
+int starpu_idle_prefetch_task_input_prio(struct starpu_task *task, int target_node, int worker, int prio)
 {
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
 	unsigned index;
@@ -973,7 +983,11 @@ int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsig
 			/* Don't bother prefetching some data which will be overwritten */
 			continue;
 
-		int node = _starpu_task_data_get_node_on_node(task, index, target_node);
+		int node;
+		if (target_node >= 0)
+			node = _starpu_task_data_get_node_on_node(task, index, target_node);
+		else
+			node = _starpu_task_data_get_node_on_worker(task, index, worker);
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
 		idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
@@ -982,6 +996,11 @@ int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsig
 	return 0;
 }
 
+int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
+{
+	return starpu_idle_prefetch_task_input_prio(task, target_node, -1, prio);
+}
+
 int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
 {
 	int prio = task->priority;
@@ -992,38 +1011,7 @@ int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned n
 
 int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
 {
-#ifdef STARPU_OPENMP
-	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
-	/* do not attempt to prefetch task input if this is an OpenMP task resuming after blocking */
-	if (j->discontinuous != 0)
-		return 0;
-#endif
-	STARPU_ASSERT_MSG(!task->prefetched, "Prefetching was already requested for this task! Did you set 'prefetches' to 1 in the starpu_sched_policy structure?");
-	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
-	unsigned index;
-
-	for (index = 0; index < nbuffers; index++)
-	{
-		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
-		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
-
-		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
-			continue;
-
-		if (!(mode & STARPU_R))
-			/* Don't bother prefetching some data which will be overwritten */
-			continue;
-
-		int node = _starpu_task_data_get_node_on_worker(task, index, worker);
-
-		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		task_prefetch_data_on_node(handle, node, replicate, mode, prio);
-
-		_starpu_set_data_requested_flag_if_needed(handle, replicate);
-	}
-	task->prefetched = 1;
-
-	return 0;
+	return starpu_prefetch_task_input_prio(task, -1, worker, prio);
 }
 
 int starpu_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
@@ -1036,28 +1024,7 @@ int starpu_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
 
 int starpu_idle_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
 {
-	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
-	unsigned index;
-
-	for (index = 0; index < nbuffers; index++)
-	{
-		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
-		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
-
-		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
-			continue;
-
-		if (!(mode & STARPU_R))
-			/* Don't bother prefetching some data which will be overwritten */
-			continue;
-
-		int node = _starpu_task_data_get_node_on_worker(task, index, worker);
-
-		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
-	}
-
-	return 0;
+	return starpu_idle_prefetch_task_input_prio(task, -1, worker, prio);
 }
 
 int starpu_idle_prefetch_task_input_for(struct starpu_task *task, unsigned worker)