Browse Source

Include data transfer time in lp linear problem

Samuel Thibault 12 years ago
parent
commit
04dae47ae3

+ 4 - 0
doc/chapters/advanced-examples.texi

@@ -471,6 +471,10 @@ of @code{lp_solve}. For instance, we often just use
 @code{lp_solve -cc -B1 -Bb -Bg -Bp -Bf -Br -BG -Bd -Bs -BB -Bo -Bc -Bi} , and
 the @code{-gr} option can also be quite useful.
 
+Data transfer time can only be taken into account when @code{deps} is set. Only
+data transfers inferred from implicit data dependencies between tasks are taken
+into account.
+
 Setting @code{deps} to 0 will only take into account the actual computations
 on processing units. It however still properly takes into account the varying
 performances of kernels and processing units, which is quite more accurate than

+ 12 - 22
src/core/dependencies/implicit_data_deps.c

@@ -28,22 +28,21 @@
 # define _STARPU_DEP_DEBUG(fmt, args ...)
 #endif
 
-static void _starpu_add_ayudame_ghost_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_UNUSED, unsigned long previous STARPU_ATTRIBUTE_UNUSED, struct starpu_task *next STARPU_ATTRIBUTE_UNUSED)
+static void _starpu_add_ghost_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_UNUSED, unsigned long previous STARPU_ATTRIBUTE_UNUSED, struct starpu_task *next STARPU_ATTRIBUTE_UNUSED)
 {
+	struct _starpu_job *next_job = _starpu_get_job_associated_to_task(next);
+	_starpu_bound_job_id_dep(handle, next_job, previous);
 #ifdef HAVE_AYUDAME_H
 	if (AYU_event) {
 		uintptr_t AYU_data[3] = { previous, (uintptr_t) handle, (uintptr_t) handle };
-		AYU_event(AYU_ADDDEPENDENCY, _starpu_get_job_associated_to_task(next)->job_id, AYU_data);
+		AYU_event(AYU_ADDDEPENDENCY, next_job->job_id, AYU_data);
 	}
 #endif
 }
 
-static void _starpu_add_ayudame_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_UNUSED, struct starpu_task *previous STARPU_ATTRIBUTE_UNUSED, struct starpu_task *next STARPU_ATTRIBUTE_UNUSED)
+static void _starpu_add_dependency(starpu_data_handle_t handle STARPU_ATTRIBUTE_UNUSED, struct starpu_task *previous STARPU_ATTRIBUTE_UNUSED, struct starpu_task *next STARPU_ATTRIBUTE_UNUSED)
 {
-#ifdef HAVE_AYUDAME_H
-	if (AYU_event)
-		_starpu_add_ayudame_ghost_dependency(handle, _starpu_get_job_associated_to_task(previous)->job_id, next);
-#endif
+	_starpu_add_ghost_dependency(handle, _starpu_get_job_associated_to_task(previous)->job_id, next);
 }
 
 /* Read after Write (RAW) or Read after Read (RAR) */
@@ -61,7 +60,7 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
 		_STARPU_DEP_DEBUG("RAW %p\n", handle);
 		struct starpu_task *task_array[1] = {handle->last_submitted_writer};
 		_starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
-		_starpu_add_ayudame_dependency(handle, handle->last_submitted_writer, pre_sync_task);
+		_starpu_add_dependency(handle, handle->last_submitted_writer, pre_sync_task);
 		_STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
 	}
         else
@@ -86,8 +85,7 @@ static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct
 	{
 		struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
 		_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
-		_starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
-		_starpu_add_ayudame_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
+		_starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
 		_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
 	}
 
@@ -121,7 +119,7 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
 			STARPU_ASSERT(l->task);
 			if (l->task != post_sync_task) {
 				task_array[i++] = l->task;
-				_starpu_add_ayudame_dependency(handle, l->task, pre_sync_task);
+				_starpu_add_dependency(handle, l->task, pre_sync_task);
 				_STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
 			}
 
@@ -143,8 +141,7 @@ static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct
 		{
 			unsigned long id = ghost_readers_id->id;
 			_STARPU_TRACE_GHOST_TASK_DEPS(id, pre_sync_job->job_id);
-			_starpu_bound_job_id_dep(pre_sync_job, id);
-			_starpu_add_ayudame_ghost_dependency(handle, id, pre_sync_task);
+			_starpu_add_ghost_dependency(handle, id, pre_sync_task);
 			_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task);
 
 			struct _starpu_jobid_list *prev = ghost_readers_id;
@@ -170,13 +167,7 @@ static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct
 	{
 		struct starpu_task *task_array[1] = {handle->last_submitted_writer};
 		_starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
-#ifdef HAVE_AYUDAME_H
-		if (AYU_event) {
-			int64_t AYU_data[3] = { _starpu_get_job_associated_to_task(handle->last_submitted_writer)->job_id, (int64_t) handle, (int64_t) handle };
-			AYU_event(AYU_ADDDEPENDENCY, _starpu_get_job_associated_to_task(pre_sync_task)->job_id, AYU_data);
-		}
-#endif
-		_starpu_add_ayudame_dependency(handle, handle->last_submitted_writer, pre_sync_task);
+		_starpu_add_dependency(handle, handle->last_submitted_writer, pre_sync_task);
 		_STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
 	}
         else
@@ -195,8 +186,7 @@ static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct
 		{
 			struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
 			_STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
-			_starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
-			_starpu_add_ayudame_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
+			_starpu_add_ghost_dependency(handle, handle->last_submitted_ghost_writer_id, pre_sync_task);
 			_STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
 			handle->last_submitted_ghost_writer_id_is_valid = 0;
 		}

+ 110 - 22
src/profiling/bound.c

@@ -49,7 +49,7 @@ struct bound_task_pool
 {
 	/* Which codelet has been executed */
 	struct starpu_codelet *cl;
-	/* Task footprint key */
+	/* Task footprint key (for history-based perfmodel) */
 	uint32_t footprint;
 	/* Number of tasks of this kind */
 	unsigned long n;
@@ -75,9 +75,13 @@ struct bound_task_pool
  * - For each task pair and each worker, if both tasks are executed by that worker,
  *   one is started after the other's completion.
  */
-/* Note: only task-task, implicit data dependencies or task-tag dependencies
- * are taken into account. Tags released in a callback or something like this
- * is not taken into account, only tags associated with a task are. */
+struct task_dep
+{
+	/* Task this depends on */
+	struct bound_task *dep;
+	/* Data transferred between tasks (i.e. implicit data dep size) */
+	size_t size;
+};
 struct bound_task
 {
 	/* Unique ID */
@@ -92,7 +96,7 @@ struct bound_task
 	/* Task priority */
 	int priority;
 	/* Tasks this one depends on */
-	struct bound_task **deps;
+	struct task_dep *deps;
 	int depsn;
 
 	/* Estimated duration */
@@ -118,6 +122,7 @@ static int recordprio;
 
 static _starpu_pthread_mutex_t mutex = _STARPU_PTHREAD_MUTEX_INITIALIZER;
 
+/* Initialization */
 void starpu_bound_start(int deps, int prio)
 {
 	struct bound_task_pool *tp;
@@ -164,6 +169,7 @@ void starpu_bound_start(int deps, int prio)
 	}
 }
 
+/* Return whether job J should be included in the bound computation */
 static int good_job(struct _starpu_job *j)
 {
 	/* No codelet, nothing to measure */
@@ -181,6 +187,8 @@ static int good_job(struct _starpu_job *j)
 	return 1;
 }
 
+/* Create a new task (either because it has just been submitted, or a
+ * dependency was added before submission) */
 static void new_task(struct _starpu_job *j)
 {
 	struct bound_task *t;
@@ -203,6 +211,7 @@ static void new_task(struct _starpu_job *j)
 	tasks = t;
 }
 
+/* A new task was submitted, record it */
 void _starpu_bound_record(struct _starpu_job *j)
 {
 	if (!_starpu_bound_recording)
@@ -253,6 +262,7 @@ void _starpu_bound_record(struct _starpu_job *j)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 }
 
+/* A tag dependency was emitted, record it */
 void _starpu_bound_tag_dep(starpu_tag_t id, starpu_tag_t dep_id)
 {
 	struct bound_tag_dep *td;
@@ -276,9 +286,11 @@ void _starpu_bound_tag_dep(starpu_tag_t id, starpu_tag_t dep_id)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 }
 
+/* A task dependency was emitted, record it */
 void _starpu_bound_task_dep(struct _starpu_job *j, struct _starpu_job *dep_j)
 {
 	struct bound_task *t;
+	int i;
 
 	if (!_starpu_bound_recording || !recorddeps)
 		return;
@@ -297,11 +309,20 @@ void _starpu_bound_task_dep(struct _starpu_job *j, struct _starpu_job *dep_j)
 	new_task(j);
 	new_task(dep_j);
 	t = j->bound_task;
-	t->deps = (struct bound_task **) realloc(t->deps, ++t->depsn * sizeof(t->deps[0]));
-	t->deps[t->depsn-1] = dep_j->bound_task;
+	for (i = 0; i < t->depsn; i++)
+		if (t->deps[i].dep == dep_j->bound_task)
+			break;
+	if (i == t->depsn)
+	{
+		/* Not already there, add */
+		t->deps = (struct task_dep *) realloc(t->deps, ++t->depsn * sizeof(t->deps[0]));
+		t->deps[t->depsn-1].dep = dep_j->bound_task;
+		t->deps[t->depsn-1].size = 0; /* We don't have data information in that case */
+	}
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 }
 
+/* Look for job with id ID among our tasks */
 static struct bound_task *find_job(unsigned long id)
 {
 	struct bound_task *t;
@@ -312,9 +333,11 @@ static struct bound_task *find_job(unsigned long id)
 	return NULL;
 }
 
-void _starpu_bound_job_id_dep(struct _starpu_job *j, unsigned long id)
+/* Job J depends on the previous job of id ID (which is already finished) through data HANDLE, whose size is recorded with the dependency */
+void _starpu_bound_job_id_dep(starpu_data_handle_t handle, struct _starpu_job *j, unsigned long id)
 {
 	struct bound_task *t, *dep_t;
+	int i;
 
 	if (!_starpu_bound_recording || !recorddeps)
 		return;
@@ -339,8 +362,20 @@ void _starpu_bound_job_id_dep(struct _starpu_job *j, unsigned long id)
 		return;
 	}
 	t = j->bound_task;
-	t->deps = (struct bound_task **) realloc(t->deps, ++t->depsn * sizeof(t->deps[0]));
-	t->deps[t->depsn-1] = dep_t;
+	for (i = 0; i < t->depsn; i++)
+		if (t->deps[i].dep == dep_t)
+		{
+			/* Found, just add size */
+			t->deps[i].size += _starpu_data_get_size(handle);
+			break;
+		}
+	if (i == t->depsn)
+	{
+		/* Not already there, add */
+		t->deps = (struct task_dep *) realloc(t->deps, ++t->depsn * sizeof(t->deps[0]));
+		t->deps[t->depsn-1].dep = dep_t;
+		t->deps[t->depsn-1].size = _starpu_data_get_size(handle);
+	}
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 }
 
@@ -351,6 +386,7 @@ void starpu_bound_stop(void)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 }
 
+/* Compute all tasks times on all workers */
 static void _starpu_get_tasks_times(int nw, int nt, double *times)
 {
 	struct bound_task_pool *tp;
@@ -374,19 +410,21 @@ static void _starpu_get_tasks_times(int nw, int nt, double *times)
 	}
 }
 
+/* Return non-zero if PARENT is an ancestor of CHILD (1: found as a direct dependency, -1: reached through another dependency), 0 otherwise */
 static int ancestor(struct bound_task *child, struct bound_task *parent)
 {
 	int i;
 	for (i = 0; i < child->depsn; i++)
 	{
-		if (parent == child->deps[i])
+		if (parent == child->deps[i].dep)
 			return 1;
-		if (ancestor(child->deps[i], parent))
+		if (ancestor(child->deps[i].dep, parent))
 			return -1;
 	}
 	return 0;
 }
 
+/* Print bound recording in .dot format */
 void starpu_bound_print_dot(FILE *output)
 {
 	struct bound_task *t;
@@ -403,7 +441,7 @@ void starpu_bound_print_dot(FILE *output)
 	{
 		fprintf(output, "\"t%lu\" [label=\"%lu: %s\"]\n", t->id, t->id, _starpu_codelet_get_model_name(t->cl));
 		for (i = 0; i < t->depsn; i++)
-			fprintf(output, "\"t%lu\" -> \"t%lu\"\n", t->deps[i]->id, t->id);
+			fprintf(output, "\"t%lu\" -> \"t%lu\"\n", t->deps[i].dep->id, t->id);
 	}
 	for (td = tag_deps; td; td = td->next)
 		fprintf(output, "\"tag%lu\" -> \"tag%lu\";\n", (unsigned long) td->dep_tag, (unsigned long) td->tag);
@@ -411,7 +449,7 @@ void starpu_bound_print_dot(FILE *output)
 }
 
 /*
- * lp_solve format
+ * Print bound system in lp_solve format
  *
  * When dependencies are enabled, you can check the set of tasks and deps that
  * were recorded by using tools/lp2paje and vite.
@@ -420,7 +458,9 @@ void starpu_bound_print_lp(FILE *output)
 {
 	int nt; /* Number of different kinds of tasks */
 	int nw; /* Number of different workers */
-	int t, w;
+	int t;
+	int w, w2; /* worker */
+	unsigned n, n2;
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	nw = starpu_worker_get_count();
@@ -494,11 +534,57 @@ void starpu_bound_print_lp(FILE *output)
 			fprintf(output, ";\n");
 		}
 
-		fprintf(output, "\n/* Each task starts after all its task dependencies finish. */\n");
+		fprintf(output, "\n/* Each task starts after all its task dependencies finish and data is transferred. */\n");
 		fprintf(output, "/* Note that the dependency finish time depends on the worker where it's working */\n");
 		for (t1 = tasks; t1; t1 = t1->next)
-			for (i = 0; i < t1->depsn; i++)
-				fprintf(output, "s%lu >= c%lu;\n", t1->id, t1->deps[i]->id);
+			for (i = 0; i < t1->depsn; i++) {
+				fprintf(output, "/* %lu bytes transferred */\n", (unsigned long) t1->deps[i].size);
+				fprintf(output, "s%lu >= c%lu", t1->id, t1->deps[i].dep->id);
+				/* Transfer time: pick up one source node and a worker on it */
+				for (n = 0; n < starpu_memory_nodes_get_count(); n++)
+				for (w = 0; w < nw; w++)
+				if (starpu_worker_get_memory_node(w) == n)
+				{
+					/* pick up another destination node and a worker on it */
+					for (n2 = 0; n2 < starpu_memory_nodes_get_count(); n2++)
+					if (n2 != n)
+					{
+						for (w2 = 0; w2 < nw; w2++)
+						if (starpu_worker_get_memory_node(w2) == n2)
+						{
+							/* If predecessor is on worker w and successor
+							 * on worker w2 on different nodes, we need to
+							 * transfer the data. */
+							fprintf(output, " + d_t%luw%ut%luw%u", t1->deps[i].dep->id, w, t1->id, w2);
+
+						}
+					}
+				}
+				fprintf(output, ";\n");
+				/* Transfer time constraints: pick one source node and a worker on it */
+				for (n = 0; n < starpu_memory_nodes_get_count(); n++)
+				for (w = 0; w < nw; w++)
+				if (starpu_worker_get_memory_node(w) == n)
+				{
+					/* pick up another destination node and a worker on it */
+					for (n2 = 0; n2 < starpu_memory_nodes_get_count(); n2++)
+					if (n2 != n)
+					{
+						for (w2 = 0; w2 < nw; w2++)
+						if (starpu_worker_get_memory_node(w2) == n2)
+						{
+							/* The data transfer is at least 0ms */
+							fprintf(output, "d_t%luw%ut%luw%u >= 0;\n", t1->deps[i].dep->id, w, t1->id, w2);
+							/* The data transfer from w to w2 only happens if tasks run there */
+							fprintf(output, "d_t%luw%ut%luw%u >= %f - 2e5 + 1e5 t%luw%u + 1e5 t%luw%u;\n",
+									t1->deps[i].dep->id, w, t1->id, w2,
+									_starpu_predict_transfer_time(n, n2, t1->deps[i].size)/1000.,
+									t1->deps[i].dep->id, w, t1->id, w2);
+						}
+					}
+				}
+			}
+
 
 		fprintf(output, "\n/* Each tag finishes when its corresponding task finishes */");
 		for (t1 = tasks; t1; t1 = t1->next)
@@ -573,7 +659,7 @@ void starpu_bound_print_lp(FILE *output)
 
 							for (i = 0; i < t1->depsn; i++)
 							{
-								fprintf(output, "c%lu - s%lu >= ", t1->deps[i]->id, t2->id);
+								fprintf(output, "c%lu - s%lu >= ", t1->deps[i].dep->id, t2->id);
 								if (t1->depsn > 1)
 									/* Only checks this when it's this dependency that is chosen */
 									fprintf(output, "-2e5 + 1e5 t%lut%lud%d", t2->id, t1->id, i);
@@ -602,7 +688,7 @@ void starpu_bound_print_lp(FILE *output)
 
 							for (i = 0; i < t2->depsn; i++)
 							{
-								fprintf(output, "c%lu - s%lu >= ", t2->deps[i]->id, t1->id);
+								fprintf(output, "c%lu - s%lu >= ", t2->deps[i].dep->id, t1->id);
 								if (t2->depsn > 1)
 									/* Only checks this when it's this dependency that is chosen */
 									fprintf(output, "-1e5 + 1e5 t%lut%lud%d", t1->id, t2->id, i);
@@ -719,7 +805,7 @@ void starpu_bound_print_lp(FILE *output)
 }
 
 /*
- * MPS output format
+ * Print bound system in MPS output format
  */
 void starpu_bound_print_mps(FILE *output)
 {
@@ -799,7 +885,7 @@ void starpu_bound_print_mps(FILE *output)
 }
 
 /*
- * GNU Linear Programming Kit backend
+ * Solve the bound system using the GNU Linear Programming Kit (GLPK) backend
  */
 #ifdef HAVE_GLPK_H
 static glp_prob *_starpu_bound_glp_resolve(int integer)
@@ -934,6 +1020,7 @@ static glp_prob *_starpu_bound_glp_resolve(int integer)
 }
 #endif /* HAVE_GLPK_H */
 
+/* Print the computed bound as well as the optimized distribution of tasks */
 void starpu_bound_print(FILE *output, int integer __attribute__ ((unused)))
 {
 #ifdef HAVE_GLPK_H
@@ -984,6 +1071,7 @@ void starpu_bound_print(FILE *output, int integer __attribute__ ((unused)))
 #endif /* HAVE_GLPK_H */
 }
 
+/* Compute and return the bound */
 void starpu_bound_compute(double *res, double *integer_res __attribute__ ((unused)), int integer __attribute__ ((unused)))
 {
 #ifdef HAVE_GLPK_H

+ 2 - 2
src/profiling/bound.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -34,6 +34,6 @@ extern void _starpu_bound_tag_dep(starpu_tag_t id, starpu_tag_t dep_id);
 extern void _starpu_bound_task_dep(struct _starpu_job *j, struct _starpu_job *dep_j);
 
 /* Record job id dependency: j depends on job_id */
-extern void _starpu_bound_job_id_dep(struct _starpu_job *dep_j, unsigned long job_id);
+extern void _starpu_bound_job_id_dep(starpu_data_handle_t handle, struct _starpu_job *dep_j, unsigned long job_id);
 
 #endif // __BOUND_H__