瀏覽代碼

Introduce the notion of task bundles. A task bundle is a set of tasks that the
scheduler should try to schedule together when possible. A bundle structure is
initialized with starpu_task_bundle_init, tasks can be inserted in the bundle
with starpu_task_bundle_insert until the bundle is closed with
starpu_task_bundle_close. Modify the HEFT strategy to take data bundles into
account.

Cédric Augonnet 14 年之前
父節點
當前提交
e55f7e46a2
共有 7 個文件被更改,包括 468 次插入7 次删除
  1. 1 0
      Makefile.am
  2. 4 0
      include/starpu_task.h
  3. 73 0
      include/starpu_task_bundle.h
  4. 1 0
      src/Makefile.am
  5. 16 0
      src/core/task.c
  6. 330 0
      src/core/task_bundle.c
  7. 43 7
      src/sched_policies/heft.c

+ 1 - 0
Makefile.am

@@ -32,6 +32,7 @@ include_HEADERS = 				\
 	include/starpu_data_filters.h		\
 	include/starpu_data_interfaces.h	\
 	include/starpu_task.h			\
+	include/starpu_task_bundle.h		\
 	include/starpu_task_list.h		\
 	include/starpu_data.h			\
 	include/starpu_perfmodel.h		\

+ 4 - 0
include/starpu_task.h

@@ -115,6 +115,9 @@ struct starpu_task {
 	unsigned execute_on_a_specific_worker;
 	unsigned workerid;
 
+	/* Bundle including the task */
+	struct starpu_task_bundle *bundle;
+
 	/* If this flag is set, it is not possible to synchronize with the task
 	 * by the means of starpu_task_wait later on. Internal data structures
 	 * are only garanteed to be freed once starpu_task_wait is called if
@@ -169,6 +172,7 @@ struct starpu_task {
 	.use_tag = 0,					\
 	.synchronous = 0,				\
 	.execute_on_a_specific_worker = 0,		\
+	.bundle = NULL,					\
 	.detach = 1,					\
 	.destroy = 0,					\
 	.regenerate = 0,				\

+ 73 - 0
include/starpu_task_bundle.h

@@ -0,0 +1,73 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2011  Université de Bordeaux 1
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_TASK_BUNDLE_H__
+#define __STARPU_TASK_BUNDLE_H__
+
+#include <starpu.h>
+#include <starpu_config.h>
+#include <pthread.h>
+
+struct starpu_task_bundle_entry {
+	struct starpu_task *task;
+	struct starpu_task_bundle_entry *next;
+};
+
+/* The task bundle structure describes a list of tasks that should be scheduled
+ * together whenever possible. */
+struct starpu_task_bundle {
+	/* Mutex protecting the bundle */
+	pthread_mutex_t mutex;
+	/* last worker previously assigned a task from the bundle (-1 if none) .*/
+	int previous_workerid;
+	/* list of tasks */
+	struct starpu_task_bundle_entry *list;
+	/* If this flag is set, the bundle structure is automatically free'd
+	 * when the bundle is deinitialized. */
+	int destroy;
+	/* Is the bundle closed ? */
+	int closed;
+	/* TODO retain bundle (do not schedule until closed) */
+};
+
+/* Initialize a task bundle */
+void starpu_task_bundle_init(struct starpu_task_bundle *bundle);
+
+/* Deinitialize a bundle. In case the destroy flag is set, the bundle structure
+ * is freed too. */
+void starpu_task_bundle_deinit(struct starpu_task_bundle *bundle);
+
+/* Insert a task into a bundle. */
+int starpu_task_bundle_insert(struct starpu_task_bundle *bundle, struct starpu_task *task);
+
+/* Remove a task from a bundle. This method must be called with bundle->mutex
+ * hold. This function returns 0 if the task was found, -ENOENT if the element
+ * was not found, 1 if the element is found and if the list was deinitialized
+ * because it became empty. */
+int starpu_task_bundle_remove(struct starpu_task_bundle *bundle, struct starpu_task *task);
+
+/* Close a bundle. No task can be added to a closed bundle. A closed bundle
+ * automatically gets deinitialized when it becomes empty. */
+void starpu_task_bundle_close(struct starpu_task_bundle *bundle);
+
+/* Return the expected duration of the entire task bundle in µs. */
+double starpu_task_bundle_expected_length(struct starpu_task_bundle *bundle, enum starpu_perf_archtype arch);
+/* Return the time (in µs) expected to transfer all data used within the bundle */
+double starpu_task_bundle_expected_data_transfer_time(struct starpu_task_bundle *bundle, unsigned memory_node);
+/* Return the expected power consumption of the entire task bundle in J. */
+double starpu_task_bundle_expected_power(struct starpu_task_bundle *bundle,  enum starpu_perf_archtype arch);
+
+#endif // __STARPU_TASK_BUNDLE_H__

+ 1 - 0
src/Makefile.am

@@ -116,6 +116,7 @@ libstarpu_la_SOURCES = 						\
 	common/utils.c						\
 	core/jobs.c						\
 	core/task.c						\
+	core/task_bundle.c					\
 	core/workers.c						\
 	core/combined_workers.c					\
 	core/topology.c						\

+ 16 - 0
src/core/task.c

@@ -17,6 +17,7 @@
 
 #include <starpu.h>
 #include <starpu_profiling.h>
+#include <starpu_task_bundle.h>
 #include <core/workers.h>
 #include <core/jobs.h>
 #include <core/task.h>
@@ -57,6 +58,8 @@ void starpu_task_init(struct starpu_task *task)
 
 	task->execute_on_a_specific_worker = 0;
 
+	task->bundle = NULL;
+
 	task->detach = 1;
 
 	/* by default, we do not let StarPU free the task structure since
@@ -88,6 +91,19 @@ void starpu_task_deinit(struct starpu_task *task)
 		task->profiling_info = NULL;
 	}
 
+	/* If case the task is (still) part of a bundle */
+	struct starpu_task_bundle *bundle = task->bundle;
+	if (bundle)
+	{
+		PTHREAD_MUTEX_LOCK(&bundle->mutex);
+		int ret = starpu_task_bundle_remove(bundle, task);
+
+		/* Perhaps the bundle was destroyed when removing the last
+		 * entry */
+		if (ret != 1)
+			PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+	}
+
 	starpu_job_t j = (struct starpu_job_s *)task->starpu_private;
 
 	if (j)

+ 330 - 0
src/core/task_bundle.c

@@ -0,0 +1,330 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011  Université de Bordeaux 1
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <starpu_task_bundle.h>
+#include <starpu_scheduler.h>
+#include <common/config.h>
+#include <common/utils.h>
+#include <common/list.h>
+
+/* Initialize a task bundle */
+void starpu_task_bundle_init(struct starpu_task_bundle *bundle)
+{
+	STARPU_ASSERT(bundle);
+
+	PTHREAD_MUTEX_INIT(&bundle->mutex, NULL);
+	bundle->closed = 0;
+
+	/* Start with an empty list */
+	bundle->previous_workerid = -1;
+	bundle->list = NULL;
+
+	/* By default, bundle are destroyed */
+	bundle->destroy = 1;
+
+}
+
+/* Deinitialize a bundle. In case the destroy flag is set, the bundle structure
+ * is freed too. */
+void starpu_task_bundle_deinit(struct starpu_task_bundle *bundle)
+{
+	/* Remove all entries from the bundle (which is likely to be empty) */
+	while (bundle->list)
+	{
+		struct starpu_task_bundle_entry *entry = bundle->list;
+		bundle->list = bundle->list->next;
+		free(entry);
+	}
+
+	PTHREAD_MUTEX_DESTROY(&bundle->mutex);
+
+	if (bundle->destroy)
+		free(bundle);
+}
+
+/* Insert a task into a bundle. */
+int starpu_task_bundle_insert(struct starpu_task_bundle *bundle, struct starpu_task *task)
+{
+	PTHREAD_MUTEX_LOCK(&bundle->mutex);
+
+	if (bundle->closed)
+	{
+		/* The bundle is closed, we cannot add tasks anymore */
+		PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+		return -EPERM;
+	}
+
+	/* Insert a task at the end of the bundle */
+	struct starpu_task_bundle_entry *entry;
+	entry = malloc(sizeof(struct starpu_task_bundle_entry));
+	STARPU_ASSERT(entry);
+	entry->task = task;
+	entry->next = NULL;
+
+	if (!bundle->list)
+	{
+		bundle->list = entry;
+	}
+	else {
+		struct starpu_task_bundle_entry *item;
+		item = bundle->list;
+		while (item->next)
+			item = item->next;
+
+		item->next = entry;
+	}
+
+	task->bundle = bundle;
+
+	PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+	return 0;
+}
+
+/* Remove a task from a bundle. This method must be called with bundle->mutex
+ * hold. This function returns 0 if the task was found, -ENOENT if the element
+ * was not found, 1 if the element is found and if the list was deinitialized
+ * because it became empty. */
+int starpu_task_bundle_remove(struct starpu_task_bundle *bundle, struct starpu_task *task)
+{
+	struct starpu_task_bundle_entry *item;
+
+	item = bundle->list;
+
+	if (!item)
+		return -ENOENT;
+
+	STARPU_ASSERT(task->bundle == bundle);
+	task->bundle = NULL;
+
+	if (item->task == task)
+	{
+		/* Remove the first element */
+		bundle->list = item->next;
+		free(item);
+
+		/* If the list is now empty, deinitialize the bundle */
+		if (bundle->closed && bundle->list == NULL)
+		{
+			PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+			starpu_task_bundle_deinit(bundle);
+			return 1;
+		}
+
+		return 0;
+	}
+
+	while (item->next)
+	{
+		struct starpu_task_bundle_entry *next;
+		next = item->next;
+
+		if (next->task == task)
+		{
+			/* Remove the next element */
+			item->next = next->next;
+			free(next);
+			return 0;
+		}
+
+		item = next;
+	}
+
+	/* We could not find the task in the bundle */
+	return -ENOENT;
+}
+
+/* Close a bundle. No task can be added to a closed bundle. A closed bundle
+ * automatically gets deinitialized when it becomes empty. */
+void starpu_task_bundle_close(struct starpu_task_bundle *bundle)
+{
+	PTHREAD_MUTEX_LOCK(&bundle->mutex);
+
+	/* If the bundle is already empty, we deinitialize it now. */
+	if (bundle->list == NULL)
+	{
+		PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+		starpu_task_bundle_deinit(bundle);
+		return;
+	}
+
+	/* Mark the bundle as closed */
+	bundle->closed = 1;
+
+	PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+
+}
+
+/* Return the expected duration of the entire task bundle in µs */
+double starpu_task_bundle_expected_length(struct starpu_task_bundle *bundle,  enum starpu_perf_archtype arch)
+{
+	double expected_length = 0.0;
+
+	/* We expect the length of the bundle the be the sum of the different tasks length. */
+	PTHREAD_MUTEX_LOCK(&bundle->mutex);
+
+	struct starpu_task_bundle_entry *entry;
+	entry = bundle->list;
+
+	while (entry) {
+		double task_length = starpu_task_expected_length(entry->task, arch);
+
+		/* In case the task is not calibrated, we consider the task
+		 * ends immediately. */
+		if (task_length > 0.0)
+			expected_length += task_length;
+
+		entry = entry->next;
+	}
+	
+	PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+
+	return expected_length;
+}
+
+/* Return the expected power consumption of the entire task bundle in J */
+double starpu_task_bundle_expected_power(struct starpu_task_bundle *bundle,  enum starpu_perf_archtype arch)
+{
+	double expected_power = 0.0;
+
+	/* We expect total consumption of the bundle the be the sum of the different tasks consumption. */
+	PTHREAD_MUTEX_LOCK(&bundle->mutex);
+
+	struct starpu_task_bundle_entry *entry;
+	entry = bundle->list;
+
+	while (entry) {
+		double task_power = starpu_task_expected_power(entry->task, arch);
+
+		/* In case the task is not calibrated, we consider the task
+		 * ends immediately. */
+		if (task_power > 0.0)
+			expected_power += task_power;
+
+		entry = entry->next;
+	}
+	
+	PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+
+	return expected_power;
+}
+
+struct handle_list {
+	starpu_data_handle handle;
+	starpu_access_mode mode;
+	struct handle_list *next;
+};
+
+static void insertion_handle_sorted(struct handle_list **listp, starpu_data_handle handle, starpu_access_mode mode)
+{
+	STARPU_ASSERT(listp);
+
+	struct handle_list *list = *listp;
+
+	if (!list || list->handle > handle)
+	{
+		/* We insert the first element of the list */
+		struct handle_list *link = malloc(sizeof(struct handle_list));
+		STARPU_ASSERT(link);
+		link->handle = handle;
+		link->mode = mode;
+		link->next = list;
+		*listp = link;
+		return;
+	}
+
+	/* Look for the element or a place to insert it. */
+	struct handle_list *prev = list;
+
+	while (list && (handle > list->handle))
+	{
+		prev = list;
+		list = list->next;
+	}
+
+	/* The element should be in prev or not in the list */
+
+	if (prev->handle == handle)
+	{
+		/* The handle is already in the list */
+		prev->mode |= mode;
+	}
+	else {
+		/* The handle was not in the list, we insert it after prev */
+		struct handle_list *link = malloc(sizeof(struct handle_list));
+		STARPU_ASSERT(link);
+		link->handle = handle;
+		link->mode = mode;
+		link->next = prev->next;
+		prev->next = link;
+	}
+}
+
+/* Return the time (in µs) expected to transfer all data used within the bundle */
+double starpu_task_bundle_expected_data_transfer_time(struct starpu_task_bundle *bundle, unsigned memory_node)
+{
+	PTHREAD_MUTEX_LOCK(&bundle->mutex);
+
+	struct handle_list *handles = NULL;
+
+	/* We list all the handle that are accessed within the bundle. */
+
+	/* For each task in the bundle */
+	struct starpu_task_bundle_entry *entry = bundle->list;
+	while (entry) {
+		struct starpu_task *task = entry->task;
+
+		if (task->cl)
+		{
+			unsigned b;
+			for (b = 0; b < task->cl->nbuffers; b++)
+			{
+				starpu_data_handle handle = task->buffers[b].handle;
+				starpu_access_mode mode = task->buffers[b].mode;
+
+				if (!(mode & STARPU_R))
+					continue;
+
+				/* Insert the handle in the sorted list in case
+				 * it's not already in that list. */
+				insertion_handle_sorted(&handles, handle, mode);
+			}
+		}
+
+		entry = entry->next;
+	}
+
+	/* Compute the sum of data transfer time, and destroy the list */
+
+	double total_exp = 0.0;
+
+	while (handles)
+	{
+		struct handle_list *current = handles;
+		handles = handles->next;
+
+		double exp;
+		exp = starpu_data_expected_transfer_time(current->handle, memory_node, current->mode);
+
+		total_exp += exp;
+
+		free(current);
+	}
+
+	PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+
+	return total_exp;
+}

+ 43 - 7
src/sched_policies/heft.c

@@ -22,6 +22,7 @@
 #include <core/workers.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
+#include <starpu_task_bundle.h>
 
 static unsigned nworkers;
 
@@ -141,7 +142,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 					double *local_task_length, double *exp_end,
 					double *max_exp_endp, double *best_exp_endp,
 					double *local_data_penalty,
-					double *local_power, int *forced_best)
+					double *local_power, int *forced_best,
+					struct starpu_task_bundle *bundle)
 {
 	int calibrating = 0;
 	double max_exp_end = DBL_MIN;
@@ -168,10 +170,19 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		}
 
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-		local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
-
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
-		local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
+
+		if (bundle)
+		{
+			local_task_length[worker] = starpu_task_bundle_expected_length(bundle, perf_arch);
+			local_data_penalty[worker] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
+			local_power[worker] = starpu_task_bundle_expected_power(bundle, perf_arch);
+		}
+		else {
+			local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
+			local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
+			local_power[worker] = starpu_task_expected_power(task, perf_arch);
+		}
 
 		double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
 
@@ -206,7 +217,6 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			best_exp_end = exp_end[worker];
 		}
 
-		local_power[worker] = starpu_task_expected_power(task, perf_arch);
 		if (local_power[worker] == -1.0)
 			local_power[worker] = 0.;
 	}
@@ -239,10 +249,12 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	 *	and detect if there is some calibration that needs to be done.
 	 */
 
+	struct starpu_task_bundle *bundle = task->bundle;
+
 	compute_all_performance_predictions(task, local_task_length, exp_end,
 					&max_exp_end, &best_exp_end,
 					local_data_penalty,
-					local_power, &forced_best);
+					local_power, &forced_best, bundle);
 
 	/* If there is no prediction available for that task with that arch we
 	 * want to speed-up calibration time so we force this measurement */
@@ -288,7 +300,31 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	STARPU_ASSERT(best != -1);
 	
 	/* we should now have the best worker in variable "best" */
-	double model_best = local_task_length[best];
+	double model_best;
+
+	if (bundle)
+	{
+		/* If we have a task bundle, we have computed the expected
+		 * length for the entire bundle, but not for the task alone. */
+		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best);
+		model_best = starpu_task_expected_length(task, perf_arch);
+
+		/* Remove the task from the bundle since we have made a
+		 * decision for it, and that other tasks should not consider it
+		 * anymore. */
+		PTHREAD_MUTEX_LOCK(&bundle->mutex);
+		int ret = starpu_task_bundle_remove(bundle, task);
+
+		/* Perhaps the bundle was destroyed when removing the last
+		 * entry */
+		if (ret != 1)
+			PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+
+	}
+	else {
+		model_best = local_task_length[best];
+	}
+
 	return push_task_on_best_worker(task, best, model_best, prio);
 }