Browse Source

Use _starpu_task_data_get_node_on_worker whenever possible

Samuel Thibault 7 years ago
parent
commit
c3b7311830

+ 1 - 1
doc/doxygen/chapters/320_scheduling.doxy

@@ -250,7 +250,7 @@ methods of the policy.
 
 Make sure to have a look at the \ref API_Scheduling_Policy section, which
 provides a list of the available functions for writing advanced schedulers, such
-as starpu_task_expected_length(), starpu_task_expected_data_transfer_time(),
+as starpu_task_expected_length(), starpu_task_expected_data_transfer_time_for(),
 starpu_task_expected_energy(), etc. Other
 useful functions include starpu_transfer_bandwidth(), starpu_transfer_latency(),
 starpu_transfer_predict(), ...

+ 33 - 3
doc/doxygen/chapters/api/scheduling_policy.doxy

@@ -229,7 +229,13 @@ Return an estimated speedup factor relative to CPU speed
 
 \fn double starpu_task_expected_data_transfer_time(unsigned memory_node, struct starpu_task *task)
 \ingroup API_Scheduling_Policy
-Return expected data transfer time in micro-seconds.
+Return expected data transfer time in micro-seconds for the given \p
+memory_node. Prefer using starpu_task_expected_data_transfer_time_for() which is
+more precise.
+
+\fn double starpu_task_expected_data_transfer_time_for(struct starpu_task *task, unsigned worker)
+\ingroup API_Scheduling_Policy
+Return expected data transfer time in micro-seconds for the given \p worker.
 
 \fn double starpu_data_expected_transfer_time(starpu_data_handle_t handle, unsigned memory_node, enum starpu_data_access_mode mode)
 \ingroup API_Scheduling_Policy
@@ -249,11 +255,35 @@ Whether \ref STARPU_PREFETCH was set
 
 \fn int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
 \ingroup API_Scheduling_Policy
-Prefetch data for a given task on a given node
+Prefetch data for a given p task on a given p node
+
+\fn int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
+\ingroup API_Scheduling_Policy
+Prefetch data for a given p task on a given p node with a given priority
 
 \fn int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
 \ingroup API_Scheduling_Policy
-Prefetch data for a given task on a given node when the bus is idle
+Prefetch data for a given p task on a given p node when the bus is idle
+
+\fn int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
+\ingroup API_Scheduling_Policy
+Prefetch data for a given p task on a given p node when the bus is idle with a given priority
+
+\fn int starpu_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
+\ingroup API_Scheduling_Policy
+Prefetch data for a given p task on a given p worker
+
+\fn int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
+\ingroup API_Scheduling_Policy
+Prefetch data for a given p task on a given p worker with a given priority
+
+\fn int starpu_idle_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
+\ingroup API_Scheduling_Policy
+Prefetch data for a given p task on a given p worker when the bus is idle
+
+\fn int starpu_idle_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
+\ingroup API_Scheduling_Policy
+Prefetch data for a given p task on a given p worker when the bus is idle with a given priority
 
 \fn void starpu_task_notify_ready_soon_register(starpu_notify_ready_soon_func f, void *data);
 \ingroup API_Scheduling_Policy

+ 6 - 0
include/starpu_scheduler.h

@@ -83,11 +83,17 @@ int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node);
 int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio);
 int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node);
 
+int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio);
+int starpu_prefetch_task_input_for(struct starpu_task *task, unsigned worker);
+int starpu_idle_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio);
+int starpu_idle_prefetch_task_input_for(struct starpu_task *task, unsigned worker);
+
 uint32_t starpu_task_footprint(struct starpu_perfmodel *model, struct starpu_task *task, struct starpu_perfmodel_arch *arch, unsigned nimpl);
 uint32_t starpu_task_data_footprint(struct starpu_task *task);
 double starpu_task_expected_length(struct starpu_task *task, struct starpu_perfmodel_arch *arch, unsigned nimpl);
 double starpu_worker_get_relative_speedup(struct starpu_perfmodel_arch *perf_arch);
 double starpu_task_expected_data_transfer_time(unsigned memory_node, struct starpu_task *task);
+double starpu_task_expected_data_transfer_time_for(struct starpu_task *task, unsigned worker);
 double starpu_data_expected_transfer_time(starpu_data_handle_t handle, unsigned memory_node, enum starpu_data_access_mode mode);
 double starpu_task_expected_energy(struct starpu_task *task, struct starpu_perfmodel_arch *arch, unsigned nimpl);
 double starpu_task_expected_conversion_time(struct starpu_task *task, struct starpu_perfmodel_arch *arch, unsigned nimpl);

+ 20 - 0
src/core/perfmodel/perfmodel.c

@@ -350,6 +350,26 @@ double starpu_task_expected_data_transfer_time(unsigned memory_node, struct star
 	return penalty;
 }
 
+/* Data transfer performance modeling */
+double starpu_task_expected_data_transfer_time_for(struct starpu_task *task, unsigned worker)
+{
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned buffer;
+
+	double penalty = 0.0;
+
+	for (buffer = 0; buffer < nbuffers; buffer++)
+	{
+		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, buffer);
+		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, buffer);
+		int node = _starpu_task_data_get_node_on_worker(task, buffer, worker);
+
+		penalty += starpu_data_expected_transfer_time(handle, node, mode);
+	}
+
+	return penalty;
+}
+
 /* Return the expected duration of the entire task bundle in µs */
 double starpu_task_bundle_expected_length(starpu_task_bundle_t bundle, struct starpu_perfmodel_arch* arch, unsigned nimpl)
 {

+ 2 - 6
src/core/sched_policy.c

@@ -296,23 +296,20 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 	/* Is this a basic worker or a combined worker ? */
 	int is_basic_worker = (workerid < nbasic_workers);
 
-	unsigned memory_node;
 	struct _starpu_worker *worker = NULL;
 	struct _starpu_combined_worker *combined_worker = NULL;
 
 	if (is_basic_worker)
 	{
 		worker = _starpu_get_worker_struct(workerid);
-		memory_node = worker->memory_node;
 	}
 	else
 	{
 		combined_worker = _starpu_get_combined_worker_struct(workerid);
-		memory_node = combined_worker->memory_node;
 	}
 
 	if (use_prefetch)
-		starpu_prefetch_task_input_on_node(task, memory_node);
+		starpu_prefetch_task_input_for(task, workerid);
 
 	if (is_basic_worker)
 		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
@@ -524,9 +521,8 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 	int ret = 0;
 	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
 	{
-		unsigned node = starpu_worker_get_memory_node(task->workerid);
 		if (starpu_get_prefetch_flag())
-			starpu_prefetch_task_input_on_node(task, node);
+			starpu_prefetch_task_input_for(task, task->workerid);
 
 		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
 	}

+ 71 - 0
src/datawizard/coherency.c

@@ -984,6 +984,77 @@ int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned n
 	return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
 }
 
+int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
+{
+	STARPU_ASSERT(!task->prefetched);
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned index;
+
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
+		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
+
+		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
+			continue;
+
+		if (!(mode & STARPU_R))
+			/* Don't bother prefetching some data which will be overwritten */
+			continue;
+
+		int node = _starpu_task_data_get_node_on_worker(task, index, worker);
+
+		struct _starpu_data_replicate *replicate = &handle->per_node[node];
+		prefetch_data_on_node(handle, node, replicate, mode, prio);
+
+		_starpu_set_data_requested_flag_if_needed(handle, replicate);
+	}
+
+	return 0;
+}
+
+int starpu_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
+{
+	int prio = task->priority;
+	if (task->workerorder)
+		prio = INT_MAX - task->workerorder;
+	return starpu_prefetch_task_input_for_prio(task, worker, prio);
+}
+
+int starpu_idle_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
+{
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned index;
+
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
+		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
+
+		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
+			continue;
+
+		if (!(mode & STARPU_R))
+			/* Don't bother prefetching some data which will be overwritten */
+			continue;
+
+		int node = _starpu_task_data_get_node_on_worker(task, index, worker);
+
+		struct _starpu_data_replicate *replicate = &handle->per_node[node];
+		idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
+	}
+
+	return 0;
+}
+
+int starpu_idle_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
+{
+	int prio = task->priority;
+	if (task->workerorder)
+		prio = INT_MAX - task->workerorder;
+	return starpu_idle_prefetch_task_input_for_prio(task, worker, prio);
+}
+
 struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
 {
 	if (mode & (STARPU_SCRATCH|STARPU_REDUX))

+ 2 - 3
src/sched_policies/component_best_implementation.c

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2013                                     Inria
  * Copyright (C) 2014,2016-2017                           CNRS
- * Copyright (C) 2014-2017                                Université de Bordeaux
+ * Copyright (C) 2014-2018                                Université de Bordeaux
  * Copyright (C) 2013                                     Simon Archipoff
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -53,9 +53,8 @@ static int find_best_impl(unsigned sched_ctx_id, struct starpu_task * task, int
 	if(best_impl == -1)
 		return 0;
 
-	int memory_node = starpu_worker_get_memory_node(workerid);
 	task->predicted = len;
-	task->predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
+	task->predicted_transfer = starpu_task_expected_data_transfer_time_for(task, workerid);
 	starpu_task_set_implementation(task, best_impl);
 	return 1;
 }

+ 1 - 4
src/sched_policies/component_worker.c

@@ -415,10 +415,7 @@ static int simple_worker_push_task(struct starpu_sched_component * component, st
 	task->workerid = starpu_bitmap_first(component->workers);
 #if 1 /* dead lock problem? */
 	if (starpu_get_prefetch_flag() && !task->prefetched)
-	{
-		unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
-		starpu_prefetch_task_input_on_node(task, memory_node);
-	}
+		starpu_prefetch_task_input_for(task, task->workerid);
 #endif
 	struct _starpu_worker_task_list * list = data->list;
 	STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);

+ 11 - 18
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -86,7 +86,7 @@ static const float idle_power_minimum=0;
 static const float idle_power_maximum=10000.0;
 #endif /* !STARPU_USE_TOP */
 
-static int count_non_ready_buffers(struct starpu_task *task, unsigned node)
+static int count_non_ready_buffers(struct starpu_task *task, unsigned worker)
 {
 	int cnt = 0;
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
@@ -95,7 +95,7 @@ static int count_non_ready_buffers(struct starpu_task *task, unsigned node)
 	for (index = 0; index < nbuffers; index++)
 	{
 		starpu_data_handle_t handle;
-		unsigned buffer_node = _starpu_task_data_get_node_on_node(task, index, node);
+		unsigned buffer_node = _starpu_task_data_get_node_on_worker(task, index, worker);
 
 		handle = STARPU_TASK_GET_HANDLE(task, index);
 
@@ -188,7 +188,7 @@ static void _starpu_fifo_task_finished(struct _starpu_fifo_taskq *fifo, struct s
 
 
 
-static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node, int num_priorities)
+static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned workerid, int num_priorities)
 {
 	struct starpu_task *task = NULL, *current;
 
@@ -215,7 +215,7 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
 
 			if (priority >= first_task_priority)
 			{
-				int non_ready = count_non_ready_buffers(current, node);
+				int non_ready = count_non_ready_buffers(current, workerid);
 				if (non_ready < non_ready_best)
 				{
 					non_ready_best = non_ready;
@@ -252,13 +252,11 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 
-	unsigned node = starpu_worker_get_memory_node(workerid);
-
 	/* Take the opportunity to update start time */
 	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	task = _starpu_fifo_pop_first_ready_task(fifo, node, dt->num_priorities);
+	task = _starpu_fifo_pop_first_ready_task(fifo, workerid, dt->num_priorities);
 	if (task)
 	{
 		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
@@ -268,7 +266,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 #ifdef STARPU_VERBOSE
 		if (task->cl)
 		{
-			int non_ready = count_non_ready_buffers(task, node);
+			int non_ready = count_non_ready_buffers(task, workerid);
 			if (non_ready == 0)
 				dt->ready_task_cnt++;
 		}
@@ -305,7 +303,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 #ifdef STARPU_VERBOSE
 		if (task->cl)
 		{
-			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
+			int non_ready = count_non_ready_buffers(task, workerid);
 			if (non_ready == 0)
 				dt->ready_task_cnt++;
 		}
@@ -429,10 +427,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 #endif /* !STARPU_USE_TOP */
 
 	if (starpu_get_prefetch_flag())
-	{
-		unsigned memory_node = starpu_worker_get_memory_node(best_workerid);
-		starpu_prefetch_task_input_on_node(task, memory_node);
-	}
+		starpu_prefetch_task_input_for(task, best_workerid);
 
 	STARPU_AYU_ADDTOTASKQUEUE(starpu_task_get_job_id(task), best_workerid);
 	unsigned stream_ctx_id = starpu_worker_get_sched_ctx_id_stream(best_workerid);
@@ -515,7 +510,6 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 		unsigned impl_mask;
 		unsigned worker = workers->get_next(workers, &it);
 		struct _starpu_fifo_taskq *fifo  = dt->queue_array[worker];
-		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
@@ -534,7 +528,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 			double exp_end;
 			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
-			double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
+			double local_penalty = starpu_task_expected_data_transfer_time_for(task, worker);
 			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
 			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
@@ -726,7 +720,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			else
 			{
 				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
-				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time_for(task, workerid);
 				local_energy[worker_ctx][nimpl] = starpu_task_expected_energy(task, perf_arch,nimpl);
 				double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
 				if (conversion_time > 0.0)
@@ -1161,12 +1155,11 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	/* Compute the expected penality */
 	struct starpu_perfmodel_arch *perf_arch = starpu_worker_get_perf_archtype(perf_workerid, sched_ctx_id);
-	unsigned memory_node = starpu_worker_get_memory_node(workerid);
 
 	double predicted = starpu_task_expected_length(task, perf_arch,
 						       starpu_task_get_implementation(task));
 
-	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
+	double predicted_transfer = starpu_task_expected_data_transfer_time_for(task, workerid);
 	double now = starpu_timing_now();
 
 	/* Update the predictions */

+ 3 - 3
src/sched_policies/heteroprio.c

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2015-2017                                Inria
  * Copyright (C) 2015-2017                                CNRS
- * Copyright (C) 2015-2017                                Université de Bordeaux
+ * Copyright (C) 2015-2018                                Université de Bordeaux
  * Copyright (C) 2016                                     Uppsala University
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -536,7 +536,7 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 				/* Decrease the number of tasks to found */
 				nb_tasks_to_prefetch -= 1;
 				nb_added_tasks       += 1;
-				// TODO starpu_prefetch_task_input_on_node(task, workerid);
+				// TODO starpu_prefetch_task_input_for(task, workerid);
 			}
 		}		
 	}
@@ -649,7 +649,7 @@ done:		;
 		     task_to_prefetch  = starpu_task_prio_list_next(&worker->tasks_queue.list, task_to_prefetch))
 		{
 			/* prefetch from closest to end task */
-			starpu_prefetch_task_input_on_node(task_to_prefetch, memory_node);
+			starpu_prefetch_task_input_for(task_to_prefetch, workerid);
 			nb_added_tasks -= 1;
 		}
 	}

+ 2 - 6
src/sched_policies/parallel_heft.c

@@ -106,12 +106,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	struct _starpu_pheft_data *hd = (struct _starpu_pheft_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	/* Is this a basic worker or a combined worker ? */
-	unsigned memory_node;
-	memory_node = starpu_worker_get_memory_node(best_workerid);
-
 	if (starpu_get_prefetch_flag())
-		starpu_prefetch_task_input_on_node(task, memory_node);
+		starpu_prefetch_task_input_for(task, best_workerid);
 
 	int ret = 0;
 
@@ -354,7 +350,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 			local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch,nimpl);
 
 			unsigned memory_node = starpu_worker_get_memory_node(workerid);
-			local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+			local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time_for(task, workerid);
 
 			double ntasks_end = compute_ntasks_end(workerid, sched_ctx_id);