Explorar o código

Add a heft variant which draws tasks from its own fifo

Samuel Thibault hai 12 anos
pai
achega
23b0ea64b9

+ 4 - 0
include/starpu_sched_node.h

@@ -220,6 +220,10 @@ struct starpu_sched_node * starpu_sched_node_mct_create(struct starpu_mct_data *
 
 int starpu_sched_node_is_mct(struct starpu_sched_node * node);
 
+struct starpu_sched_node * starpu_sched_node_heft_create(struct starpu_mct_data * mct_data);
+
+int starpu_sched_node_is_heft(struct starpu_sched_node * node);
+
 /* this node select the best implementation for the first worker in context that can execute task.
  * and fill task->predicted and task->predicted_transfer
  * cannot have several childs if push_task is called

+ 3 - 1
src/Makefile.am

@@ -253,6 +253,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/node_random.c				\
 	sched_policies/node_eager.c				\
 	sched_policies/node_mct.c				\
+	sched_policies/node_heft.c				\
 	sched_policies/node_best_implementation.c		\
 	sched_policies/node_perfmodel_select.c				\
 	sched_policies/node_composed.c				\
@@ -263,7 +264,8 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	sched_policies/tree_random.c				\
 	sched_policies/tree_random_prefetching.c			\
 	sched_policies/tree_ws.c				\
-	sched_policies/tree_heft.c
+	sched_policies/tree_heft.c				\
+	sched_policies/tree_heft2.c
 
 
 if STARPU_HAVE_LEVELDB

+ 1 - 0
src/core/sched_policy.c

@@ -43,6 +43,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_tree_random_prio_prefetching_policy,
 	&_starpu_sched_tree_ws_policy,
 	&_starpu_sched_tree_heft_policy,
+	&_starpu_sched_tree_heft2_policy,
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_random_policy,

+ 1 - 0
src/core/sched_policy.h

@@ -77,4 +77,5 @@ extern struct starpu_sched_policy _starpu_sched_tree_random_prefetching_policy;
 extern struct starpu_sched_policy _starpu_sched_tree_random_prio_prefetching_policy;
 extern struct starpu_sched_policy _starpu_sched_tree_ws_policy;
 extern struct starpu_sched_policy _starpu_sched_tree_heft_policy;
+extern struct starpu_sched_policy _starpu_sched_tree_heft2_policy;
 #endif // __SCHED_POLICY_H__

+ 208 - 0
src/sched_policies/node_heft.c

@@ -0,0 +1,208 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013  Université de Bordeaux 1
+ * Copyright (C) 2013  INRIA
+ * Copyright (C) 2013  Simon Archipoff
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* HEFT variant which tries to schedule a given number of tasks instead of just
+ * the first of its scheduling window.  */
+
+#include <starpu_sched_node.h>
+#include "prio_deque.h"
+#include "sched_node.h"
+#include <starpu_perfmodel.h>
+#include "helper_mct.h"
+#include <float.h>
+
+#define NTASKS 5
+
+struct _starpu_heft_data
+{
+	struct _starpu_prio_deque prio;
+	starpu_pthread_mutex_t mutex;
+	struct _starpu_mct_data *mct_data;
+};
+
+static void heft_progress(struct starpu_sched_node *node);
+
+static int heft_push_task(struct starpu_sched_node * node, struct starpu_task * task)
+{
+	STARPU_ASSERT(node && task && starpu_sched_node_is_heft(node));
+	struct _starpu_heft_data * data = node->data;
+	struct _starpu_prio_deque * prio = &data->prio;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+
+	STARPU_PTHREAD_MUTEX_LOCK(mutex);
+	_starpu_prio_deque_push_task(prio,task);
+	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+
+	heft_progress(node);
+
+	return 0;
+}
+
+/* Pop up to NTASKS tasks from the local fifo, evaluate every (task, child)
+ * pair, push the task with the best fitness to its best child, and push the
+ * remaining tasks back to the fifo.
+ * Returns 0 when a task was successfully pushed below (caller may retry),
+ * 1 when nothing could be pushed (fifo empty, or the child refused). */
+static int heft_progress_one(struct starpu_sched_node *node)
+{
+	struct _starpu_heft_data * data = node->data;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+	struct _starpu_prio_deque * prio = &data->prio;
+	struct starpu_task * (tasks[NTASKS]);
+
+	unsigned ntasks, n, i;
+
+	STARPU_PTHREAD_MUTEX_LOCK(mutex);
+	/* Try to look at NTASKS from the queue */
+	for (ntasks = 0; ntasks < NTASKS; ntasks++)
+	{
+		tasks[ntasks] = _starpu_prio_deque_pop_task(prio);
+		if (!tasks[ntasks])
+			break;
+	}
+
+	if (!ntasks) {
+		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		return 1;
+	}
+
+	{
+		struct _starpu_mct_data * d = data->mct_data;
+		struct starpu_sched_node * best_node = NULL;
+
+		/* Per-(task, child) tables: one row of node->nchilds entries
+		 * per popped task, row n starting at offset n * nchilds. */
+		/* Estimated task duration for each child */
+		double estimated_lengths[node->nchilds * ntasks];
+		/* Estimated transfer duration for each child */
+		double estimated_transfer_length[node->nchilds * ntasks];
+		/* Estimated transfer+task termination for each child */
+		double estimated_ends_with_task[node->nchilds * ntasks];
+
+		/* Minimum transfer+task termination on all children */
+		double min_exp_end_with_task = DBL_MAX;
+		/* Maximum transfer+task termination on all children */
+		double max_exp_end_with_task = 0.0;
+
+		int suitable_nodes[node->nchilds * ntasks];
+
+		/* Number of children able to execute each task */
+		unsigned nsuitable_nodes[ntasks];
+
+		for (n = 0; n < ntasks; n++)
+		{
+			int offset = node->nchilds * n;
+
+			nsuitable_nodes[n] = starpu_mct_compute_expected_times(node, tasks[n],
+					estimated_lengths + offset,
+					estimated_transfer_length + offset,
+					estimated_ends_with_task + offset,
+					&min_exp_end_with_task, &max_exp_end_with_task,
+					suitable_nodes + offset);
+		}
+
+		double best_fitness = DBL_MAX;
+		int best_inode = -1;
+		int best_task = -1;
+
+		/* Pick the (task, child) pair minimizing the mct fitness.
+		 * NOTE(review): the estimated_* rows are read at
+		 * offset + inode (child index) while suitable_nodes is read
+		 * at offset + i -- confirm starpu_mct_compute_expected_times
+		 * stores estimations at the child index rather than
+		 * compactly. */
+		for (n = 0; n < ntasks; n++)
+		{
+			for(i = 0; i < nsuitable_nodes[n]; i++)
+			{
+				int offset = node->nchilds * n;
+				int inode = suitable_nodes[offset + i];
+#ifdef STARPU_DEVEL
+#warning FIXME: take power consumption into account
+#endif
+				double tmp = starpu_mct_compute_fitness(d,
+							     estimated_ends_with_task[offset + inode],
+							     min_exp_end_with_task,
+							     max_exp_end_with_task,
+							     estimated_transfer_length[offset + inode],
+							     0.0);
+
+				if(tmp < best_fitness)
+				{
+					best_fitness = tmp;
+					best_inode = inode;
+					best_task = n;
+				}
+			}
+		}
+
+		/* NOTE(review): if no child can execute any popped task,
+		 * these assertions fire while the tasks are still popped;
+		 * consider pushing them back instead of aborting. */
+		STARPU_ASSERT(best_inode != -1);
+		STARPU_ASSERT(best_task >= 0);
+		best_node = node->childs[best_inode];
+
+		/* Push back the other tasks */
+		for (n = 0; n < ntasks; n++)
+			if ((int) n != best_task)
+				_starpu_prio_deque_push_back_task(prio, tasks[n]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+
+		/* Push to the child outside the lock, so a callback from the
+		 * child cannot deadlock on our mutex. */
+		int ret = best_node->push_task(best_node, tasks[best_task]);
+
+		if (ret)
+		{
+			/* Could not push to child actually, push that one back too */
+			STARPU_PTHREAD_MUTEX_LOCK(mutex);
+			_starpu_prio_deque_push_back_task(prio, tasks[best_task]);
+			STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+			return 1;
+		}
+		else
+		{
+			return 0;
+		}
+	}
+}
+
+/* Try to push some tasks below */
+static void heft_progress(struct starpu_sched_node *node)
+{
+	STARPU_ASSERT(node && starpu_sched_node_is_heft(node));
+	while (!heft_progress_one(node))
+		;
+}
+
/* "Room" callback: a child signalled it can accept more work, so try to
 * push pending tasks again.  The scheduling context id is not needed. */
static void heft_room(struct starpu_sched_node *node, unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	heft_progress(node);
}
+
+void heft_node_deinit_data(struct starpu_sched_node * node)
+{
+	STARPU_ASSERT(starpu_sched_node_is_heft(node));
+	struct _starpu_mct_data * d = node->data;
+	free(d);
+}
+
+int starpu_sched_node_is_heft(struct starpu_sched_node * node)
+{
+	return node->push_task == heft_push_task;
+}
+
+struct starpu_sched_node * starpu_sched_node_heft_create(struct starpu_mct_data * params)
+{
+	struct starpu_sched_node * node = starpu_sched_node_create();
+	struct _starpu_mct_data *mct_data = starpu_mct_init_parameters(params);
+	struct _starpu_heft_data *data = malloc(sizeof(*data));
+
+	_starpu_prio_deque_init(&data->prio);
+	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	data->mct_data = mct_data;
+	node->data = data;
+
+	node->push_task = heft_push_task;
+	node->room = heft_room;
+	node->deinit_data = heft_node_deinit_data;
+
+	return node;
+}

+ 120 - 0
src/sched_policies/tree_heft2.c

@@ -0,0 +1,120 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013  Université de Bordeaux 1
+ * Copyright (C) 2013  INRIA
+ * Copyright (C) 2013  Simon Archipoff
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_sched_node.h>
+#include <starpu_scheduler.h>
+#include <float.h>
+
+/* The two thresholds concern the prio nodes, which contain queues
+ * that can handle the priority of StarPU tasks. You can tune your
+ * scheduling by benchmarking these values and choosing whichever
+ * works best for your current application.
+ * The current value of ntasks_threshold is the best we found
+ * so far across several types of applications (cholesky, LU, stencil).
+ */
+#define _STARPU_SCHED_NTASKS_THRESHOLD_DEFAULT 30
+#define _STARPU_SCHED_EXP_LEN_THRESHOLD_DEFAULT 1000000000.0
+
+static void initialize_heft2_center_policy(unsigned sched_ctx_id)
+{
+	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
+
+	unsigned ntasks_threshold = _STARPU_SCHED_NTASKS_THRESHOLD_DEFAULT;
+	double exp_len_threshold = _STARPU_SCHED_EXP_LEN_THRESHOLD_DEFAULT;
+
+	const char *strval_ntasks_threshold = getenv("STARPU_NTASKS_THRESHOLD");
+	if (strval_ntasks_threshold)
+		ntasks_threshold = atof(strval_ntasks_threshold);
+
+	const char *strval_exp_len_threshold = getenv("STARPU_EXP_LEN_THRESHOLD");
+	if (strval_exp_len_threshold)
+		exp_len_threshold = atof(strval_exp_len_threshold);
+
+
+	struct starpu_sched_tree * t = starpu_sched_tree_create(sched_ctx_id);
+
+	struct starpu_sched_node * perfmodel_node = starpu_sched_node_heft_create(NULL);
+	struct starpu_sched_node * no_perfmodel_node = starpu_sched_node_eager_create(NULL);
+	struct starpu_sched_node * calibrator_node = starpu_sched_node_eager_create(NULL);
+	
+	struct starpu_perfmodel_select_data perfmodel_select_data =
+		{
+			.calibrator_node = calibrator_node,
+			.no_perfmodel_node = no_perfmodel_node,
+			.perfmodel_node = perfmodel_node,
+		};
+
+	struct starpu_sched_node * perfmodel_select_node = starpu_sched_node_perfmodel_select_create(&perfmodel_select_data);
+	t->root = perfmodel_select_node;
+
+	perfmodel_select_node->add_child(perfmodel_select_node, calibrator_node);
+	starpu_sched_node_set_father(calibrator_node, perfmodel_select_node, sched_ctx_id);
+	perfmodel_select_node->add_child(perfmodel_select_node, perfmodel_node);
+	starpu_sched_node_set_father(perfmodel_node, perfmodel_select_node, sched_ctx_id);
+	perfmodel_select_node->add_child(perfmodel_select_node, no_perfmodel_node);
+	starpu_sched_node_set_father(no_perfmodel_node, perfmodel_select_node, sched_ctx_id);
+
+	struct starpu_prio_data prio_data =
+		{
+			.ntasks_threshold = ntasks_threshold,
+			.exp_len_threshold = exp_len_threshold,
+		};
+
+	unsigned i;
+	for(i = 0; i < starpu_worker_get_count() + starpu_combined_worker_get_count(); i++)
+	{
+		struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(i);
+		STARPU_ASSERT(worker_node);
+
+		struct starpu_sched_node * prio = starpu_sched_node_prio_create(&prio_data);
+		prio->add_child(prio, worker_node);
+		starpu_sched_node_set_father(worker_node, prio, sched_ctx_id);
+
+		struct starpu_sched_node * impl_node = starpu_sched_node_best_implementation_create(NULL);
+		impl_node->add_child(impl_node, prio);
+		starpu_sched_node_set_father(prio, impl_node, sched_ctx_id);
+
+		perfmodel_node->add_child(perfmodel_node, impl_node);
+		starpu_sched_node_set_father(impl_node, perfmodel_node, sched_ctx_id);
+	}
+
+	starpu_sched_tree_update_workers(t);
+	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)t);
+}
+
/* Tear down the heft2 scheduling tree and the worker collection that
 * initialize_heft2_center_policy() created for this context. */
static void deinitialize_heft2_center_policy(unsigned sched_ctx_id)
{
	struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	starpu_sched_tree_destroy(tree);
	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
}
+
+struct starpu_sched_policy _starpu_sched_tree_heft2_policy =
+{
+	.init_sched = initialize_heft2_center_policy,
+	.deinit_sched = deinitialize_heft2_center_policy,
+	.add_workers = starpu_sched_tree_add_workers,
+	.remove_workers = starpu_sched_tree_remove_workers,
+	.push_task = starpu_sched_tree_push_task,
+	.pop_task = starpu_sched_tree_pop_task,
+	.pre_exec_hook = starpu_sched_node_worker_pre_exec_hook,
+	.post_exec_hook = starpu_sched_node_worker_post_exec_hook,
+	.pop_every_task = NULL,
+	.policy_name = "tree-heft2",
+	.policy_description = "heft tree2 policy"
+};