Przeglądaj źródła

Factorize code for the dm and the dmda policies

Cédric Augonnet 15 lat temu
rodzic
commit
2a2d5dd518

+ 0 - 3
src/Makefile.am

@@ -62,8 +62,6 @@ noinst_HEADERS = 						\
 	core/sched_policy.h					\
 	core/policies/random_policy.h				\
 	core/policies/eager_central_policy.h			\
-	core/policies/deque_modeling_policy.h			\
-	core/policies/deque_modeling_policy_data_aware.h	\
 	core/policies/work_stealing_policy.h			\
 	core/mechanisms/priority_queues.h			\
 	core/mechanisms/fifo_queues.h				\
@@ -140,7 +138,6 @@ libstarpu_la_SOURCES = 						\
 	core/policies/eager_central_priority_policy.c		\
 	core/policies/work_stealing_policy.c			\
 	core/sched_policy.c					\
-	core/policies/deque_modeling_policy.c			\
 	core/policies/random_policy.c				\
 	core/policies/deque_modeling_policy_data_aware.c	\
 	drivers/driver_common/driver_common.c			\

+ 0 - 196
src/core/policies/deque_modeling_policy.c

@@ -1,196 +0,0 @@
-/*
- * StarPU
- * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#include <common/config.h>
-#include <core/workers.h>
-#include <core/mechanisms/fifo_queues.h>
-#include <core/mechanisms/deque_queues.h>
-#include <core/perfmodel/perfmodel.h>
-
-static unsigned nworkers;
-static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
-
-static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
-static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
-
-static struct starpu_task *dm_pop_task(void)
-{
-	struct starpu_task *task;
-
-	int workerid = starpu_worker_get_id();
-
-	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
-
-	task = _starpu_fifo_pop_task(fifo);
-	if (task) {
-		double model = task->predicted;
-	
-		fifo->exp_len -= model;
-		fifo->exp_start = _starpu_timing_now() + model;
-		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-	}	
-
-	return task;
-}
-
-static struct starpu_task *dm_pop_every_task(uint32_t where)
-{
-	struct starpu_task *new_list;
-
-	int workerid = starpu_worker_get_id();
-
-	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
-
-	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], where);
-
-	while (new_list)
-	{
-		double model = new_list->predicted;
-
-		fifo->exp_len -= model;
-		fifo->exp_start = _starpu_timing_now() + model;
-		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-	
-		new_list = new_list->next;
-	}
-
-	return new_list;
-}
-
-static int _dm_push_task(struct starpu_task *task, unsigned prio)
-{
-	/* find the queue */
-	struct starpu_fifo_taskq_s *fifo;
-	unsigned worker;
-	int best = -1;
-
-	double best_exp_end = 0.0;
-	double model_best = 0.0;
-
-	for (worker = 0; worker < nworkers; worker++)
-	{
-		double exp_end;
-		
-		fifo = queue_array[worker];
-
-		fifo->exp_start = STARPU_MAX(fifo->exp_start, _starpu_timing_now());
-		fifo->exp_end = STARPU_MAX(fifo->exp_end, _starpu_timing_now());
-
-		if (!_starpu_worker_may_execute_task(worker, task->cl->where))
-		{
-			/* no one on that queue may execute this task */
-			continue;
-		}
-
-		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-		double local_length = _starpu_task_expected_length(worker, task, perf_arch);
-
-		if (local_length == -1.0) 
-		{
-			/* there is no prediction available for that task
-			 * with that arch we want to speed-up calibration time 
-			 * so we force this measurement */
-			/* XXX assert we are benchmarking ! */
-			best = worker;
-			model_best = 0.0;
-			exp_end = fifo->exp_start + fifo->exp_len;
-			break;
-		}
-
-
-		exp_end = fifo->exp_start + fifo->exp_len + local_length;
-
-		if (best == -1 || exp_end < best_exp_end)
-		{
-			/* a better solution was found */
-			best_exp_end = exp_end;
-			best = worker;
-			model_best = local_length;
-		}
-	}
-
-	
-	/* make sure someone coule execute that task ! */
-	STARPU_ASSERT(best != -1);
-
-	/* we should now have the best worker in variable "best" */
-	fifo = queue_array[best];
-
-	fifo->exp_end += model_best;
-	fifo->exp_len += model_best;
-
-	task->predicted = model_best;
-
-	unsigned memory_node = starpu_worker_get_memory_node(best);
-
-	if (_starpu_get_prefetch_flag())
-		_starpu_prefetch_task_input_on_node(task, memory_node);
-
-	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
-	} else {
-		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
-	}
-}
-
-static int dm_push_prio_task(struct starpu_task *task)
-{
-	return _dm_push_task(task, 1);
-}
-
-static int dm_push_task(struct starpu_task *task)
-{
-	if (task->priority == STARPU_MAX_PRIO)
-		return _dm_push_task(task, 1);
-
-	return _dm_push_task(task, 0);
-}
-
-static void initialize_dm_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
-{
-	nworkers = topology->nworkers;
-
-	unsigned workerid;
-	for (workerid = 0; workerid < nworkers; workerid++)
-	{
-		queue_array[workerid] = _starpu_create_fifo();
-	
-		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-	
-		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
-	}
-}
-
-static void deinitialize_dm_policy(struct starpu_machine_topology_s *topology, 
-	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
-{
-	unsigned worker;
-	for (worker = 0; worker < nworkers; worker++)
-		_starpu_destroy_fifo(queue_array[worker]);
-}
-
-struct starpu_sched_policy_s _starpu_sched_dm_policy = {
-	.init_sched = initialize_dm_policy,
-	.deinit_sched = deinitialize_dm_policy,
-	.push_task = dm_push_task, 
-	.push_prio_task = dm_push_prio_task,
-	.pop_task = dm_pop_task,
-	.pop_every_task = dm_pop_every_task,
-	.policy_name = "dm",
-	.policy_description = "performance model"
-};

+ 0 - 26
src/core/policies/deque_modeling_policy.h

@@ -1,26 +0,0 @@
-/*
- * StarPU
- * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#ifndef __DEQUE_MODELING_POLICY_H__
-#define __DEQUE_MODELING_POLICY_H__
-
-#include <core/workers.h>
-#include <core/mechanisms/fifo_queues.h>
-#include <core/mechanisms/deque_queues.h>
-
-extern struct starpu_sched_policy_s _starpu_sched_dm_policy;
-
-#endif // __DEQUE_MODELING_POLICY_H__

+ 125 - 23
src/core/policies/deque_modeling_policy_data_aware.c

@@ -14,7 +14,8 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
-#include <core/policies/deque_modeling_policy_data_aware.h>
+#include <core/workers.h>
+#include <core/mechanisms/fifo_queues.h>
 #include <core/perfmodel/perfmodel.h>
 
 static unsigned nworkers;
@@ -45,17 +46,111 @@ static struct starpu_task *dmda_pop_task(void)
 	return task;
 }
 
-static void update_data_requests(uint32_t memory_node, struct starpu_task *task)
+/* Pop tasks from the calling worker's fifo with _starpu_fifo_pop_every_task(),
+ * deduct each task's prediction from the fifo's expected times, return the head. */
+static struct starpu_task *dmda_pop_every_task(uint32_t where)
 {
-	unsigned nbuffers = task->cl->nbuffers;
-	unsigned buffer;
+	struct starpu_task *new_list, *task;
 
-	for (buffer = 0; buffer < nbuffers; buffer++)
+	int workerid = starpu_worker_get_id();
+	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
+
+	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], where);
+
+	/* iterate with a cursor so new_list still holds the head on return */
+	for (task = new_list; task; task = task->next)
+	{
+		double model = task->predicted;
+
+		fifo->exp_len -= model;
+		fifo->exp_start = _starpu_timing_now() + model;
+		fifo->exp_end = fifo->exp_start + fifo->exp_len;
+	}
+
+	return new_list;
+}
+
+/* Enqueue "task" on the fifo of worker "best_workerid": add "predicted" to
+ * the fifo's expected length and end, store it in the task, prefetch the
+ * task's input if the prefetch flag is set, then push (prio or regular). */
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+{
+	/* make sure someone could execute that task ! */
+	STARPU_ASSERT(best_workerid != -1);
+
+	struct starpu_fifo_taskq_s *fifo = queue_array[best_workerid];
+
+	task->predicted = predicted;
+	fifo->exp_len += predicted;
+	fifo->exp_end += predicted;
+
+	unsigned memory_node = starpu_worker_get_memory_node(best_workerid);
+	if (_starpu_get_prefetch_flag())
+		_starpu_prefetch_task_input_on_node(task, memory_node);
+
+	/* both push variants take the fifo, its mutex and its condition */
+	if (!prio)
+		return _starpu_fifo_push_task(fifo,
+				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+
+	return _starpu_fifo_push_prio_task(fifo,
+			&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
+}
+
+/* "dm" push: pick the worker whose fifo is expected to finish "task" the
+ * earliest according to the performance model, and push the task there.
+ * "prio" is forwarded to push_task_on_best_worker(). */
+static int _dm_push_task(struct starpu_task *task, unsigned prio)
+{
+	/* find the queue */
+	struct starpu_fifo_taskq_s *fifo;
+	unsigned worker;
+	int best = -1;
+
+	double best_exp_end = 0.0;
+	double model_best = 0.0;
+
+	for (worker = 0; worker < nworkers; worker++)
 	{
-		starpu_data_handle handle = task->buffers[buffer].handle;
+		fifo = queue_array[worker];
 
-		_starpu_set_data_requested_flag_if_needed(handle, memory_node);
+		/* keep the expected start/end no earlier than the current time */
+		fifo->exp_start = STARPU_MAX(fifo->exp_start, _starpu_timing_now());
+		fifo->exp_end = STARPU_MAX(fifo->exp_end, _starpu_timing_now());
+
+		if (!_starpu_worker_may_execute_task(worker, task->cl->where))
+		{
+			/* no one on that queue may execute this task */
+			continue;
+		}
+
+		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+		double local_length = _starpu_task_expected_length(worker, task, perf_arch);
+
+		if (local_length == -1.0)
+		{
+			/* There is no prediction available for that task with
+			 * that arch: to speed up calibration, force the
+			 * measurement by selecting this worker right away. */
+			/* XXX assert we are benchmarking ! */
+			best = worker;
+			model_best = 0.0;
+			break;
+		}
+
+		double exp_end = fifo->exp_start + fifo->exp_len + local_length;
+
+		if (best == -1 || exp_end < best_exp_end)
+		{
+			/* a better solution was found */
+			best_exp_end = exp_end;
+			best = worker;
+			model_best = local_length;
+		}
 	}
+
+	/* we should now have the best worker in variable "best" */
+	return push_task_on_best_worker(task, best, model_best, prio);
 }
 
 static int _dmda_push_task(struct starpu_task *task, unsigned prio)
@@ -159,25 +254,20 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	}
 
 	/* we should now have the best worker in variable "best" */
-	fifo = queue_array[best];
-
-	fifo->exp_end += model_best;
-	fifo->exp_len += model_best;
-
-	task->predicted = model_best;
+	return push_task_on_best_worker(task, best, model_best, prio);
+}
 
-	unsigned memory_node = starpu_worker_get_memory_node(best);
+/* push_prio_task entry point of the "dm" policy: always pushes with prio set */
+static int dm_push_prio_task(struct starpu_task *task)
+{
+	const unsigned prio = 1;
+
+	return _dm_push_task(task, prio);
+}
 
-	update_data_requests(memory_node, task);
-	
-	if (_starpu_get_prefetch_flag())
-		_starpu_prefetch_task_input_on_node(task, memory_node);
+/* push_task entry point of the "dm" policy: STARPU_MAX_PRIO tasks are pushed with prio set */
+static int dm_push_task(struct starpu_task *task)
+{
+	unsigned prio = (task->priority == STARPU_MAX_PRIO) ? 1 : 0;
 
-	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
-	} else {
-		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
-	}
+	return _dm_push_task(task, prio);
 }
 
 static int dmda_push_prio_task(struct starpu_task *task)
@@ -226,12 +316,24 @@ static void deinitialize_dmda_policy(struct starpu_machine_topology_s *topology,
 		_starpu_destroy_fifo(queue_array[workerid]);
 }
 
+/* "dm" policy: reuses the dmda init/deinit/pop hooks, with its own push hooks */
+struct starpu_sched_policy_s _starpu_sched_dm_policy = {
+	.policy_name = "dm",
+	.policy_description = "performance model",
+	.init_sched = initialize_dmda_policy,
+	.deinit_sched = deinitialize_dmda_policy,
+	.push_task = dm_push_task,
+	.push_prio_task = dm_push_prio_task,
+	.pop_task = dmda_pop_task,
+	.pop_every_task = dmda_pop_every_task
+};
+
 struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
 	.init_sched = initialize_dmda_policy,
 	.deinit_sched = deinitialize_dmda_policy,
 	.push_task = dmda_push_task, 
 	.push_prio_task = dmda_push_prio_task, 
 	.pop_task = dmda_pop_task,
+	.pop_every_task = dmda_pop_every_task,
 	.policy_name = "dmda",
 	.policy_description = "data-aware performance model"
 };

+ 0 - 25
src/core/policies/deque_modeling_policy_data_aware.h

@@ -1,25 +0,0 @@
-/*
- * StarPU
- * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#ifndef __DEQUE_MODELING_POLICY_DATA_AWARE_H__
-#define __DEQUE_MODELING_POLICY_DATA_AWARE_H__
-
-#include <core/workers.h>
-#include <core/mechanisms/fifo_queues.h>
-
-extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
-
-#endif // __DEQUE_MODELING_POLICY_DATA_AWARE_H__

+ 2 - 0
src/datawizard/coherency.c

@@ -358,6 +358,8 @@ int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 			continue;
 	
 		prefetch_data_on_node(handle, mode, node);
+
+		_starpu_set_data_requested_flag_if_needed(handle, node);
 	}
 
 	return 0;