Browse Source

Take GPU transfers sequentialization into account in simgrid

Samuel Thibault 12 years ago
parent
commit
1997bc7549
1 changed files with 139 additions and 16 deletions
  1. 139 16
      src/core/simgrid.c

+ 139 - 16
src/core/simgrid.c

@@ -28,6 +28,8 @@
 #pragma weak starpu_main
 extern int starpu_main(int argc, char *argv[]);
 
+static struct starpu_conf conf;
+
 struct main_args {
 	int argc;
 	char **argv;
@@ -39,14 +41,14 @@ int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBU
 	return starpu_main(args->argc, args->argv);
 }
 
-static void bus_name(struct starpu_conf *conf, char *s, size_t size, int num)
+static void bus_name(char *s, size_t size, int num)
 {
 	if (!num)
 		snprintf(s, size, "RAM");
-	else if (num < conf->ncuda + 1)
+	else if (num < conf.ncuda + 1)
 		snprintf(s, size, "CUDA%d", num - 1);
 	else
-		snprintf(s, size, "OpenCL%d", num - conf->ncuda - 1);
+		snprintf(s, size, "OpenCL%d", num - conf.ncuda - 1);
 }
 
 #ifdef STARPU_DEVEL
@@ -72,7 +74,6 @@ int main(int argc, char **argv)
 	MSG_config("workstation/model", "ptask_L07");
 
 	/* Create platform file */
-	struct starpu_conf conf;
 	starpu_conf_init(&conf);
 	if ((!getenv("STARPU_NCPUS") && !getenv("STARPU_NCPU"))
 #ifdef STARPU_USE_CUDA
@@ -147,14 +148,14 @@ int main(int argc, char **argv)
 	for (i = 0; i < conf.ncuda + conf.nopencl + 1; i++)
 	{
 		char i_name[16];
-		bus_name(&conf, i_name, sizeof(i_name), i);
+		bus_name(i_name, sizeof(i_name), i);
 
 		for (j = 0; j < conf.ncuda + conf.nopencl + 1; j++)
 		{
 			char j_name[16];
 			if (j == i)
 				continue;
-			bus_name(&conf, j_name, sizeof(j_name), j);
+			bus_name(j_name, sizeof(j_name), j);
 			fprintf(file, "   <link id='%s-%s' bandwidth='%f' latency='%f'/>\n",
 				i_name, j_name,
 				_starpu_transfer_bandwidth(i, j) * 1000000,
@@ -165,14 +166,14 @@ int main(int argc, char **argv)
 	for (i = 0; i < conf.ncuda + conf.nopencl + 1; i++)
 	{
 		char i_name[16];
-		bus_name(&conf, i_name, sizeof(i_name), i);
+		bus_name(i_name, sizeof(i_name), i);
 
 		for (j = 0; j < conf.ncuda + conf.nopencl + 1; j++)
 		{
 			char j_name[16];
 			if (j == i)
 				continue;
-			bus_name(&conf, j_name, sizeof(j_name), j);
+			bus_name(j_name, sizeof(j_name), j);
 			fprintf(file,
 "   <route src='%s' dst='%s' symmetrical='NO'><link_ctn id='%s-%s'/><link_ctn id='Share'/></route>\n",
 				i_name, j_name, i_name, j_name);
@@ -203,6 +204,7 @@ int main(int argc, char **argv)
 	return 0;
 }
 
+/* Task execution submitted by StarPU */
 void _starpu_simgrid_execute_job(struct _starpu_job *j, enum starpu_perf_archtype perf_arch, double length)
 {
 	struct starpu_task *task = j->task;
@@ -227,28 +229,145 @@ void _starpu_simgrid_execute_job(struct _starpu_job *j, enum starpu_perf_archtyp
 	MSG_task_execute(simgrid_task);
 }
 
-struct transfer {
+/* Note: simgrid is not parallel, so there is no need to hold locks for management of transfers.  */
+LIST_TYPE(transfer,
 	msg_task_t task;
-	unsigned src_node;
-	unsigned dst_node;
+	int src_node;
+	int dst_node;
+	int run_node;
+
+	/* communication termination signalization */
 	unsigned *finished;
 	_starpu_pthread_mutex_t *mutex;
 	_starpu_pthread_cond_t *cond;
-};
 
+	/* transfers which wait for this transfer */
+	struct transfer **wake;
+	unsigned nwake;
+
+	/* Number of transfers that this transfer waits for */
+	unsigned nwait;
+)
+
+struct transfer_list *pending;
+
+/* Tell for two transfers whether they should be handled in sequence */
+static int transfers_are_sequential(struct transfer *new_transfer, struct transfer *old_transfer)
+{
+	int new_is_cuda STARPU_ATTRIBUTE_UNUSED, old_is_cuda STARPU_ATTRIBUTE_UNUSED;
+	int new_is_opencl STARPU_ATTRIBUTE_UNUSED, old_is_opencl STARPU_ATTRIBUTE_UNUSED;
+	int new_is_gpu_gpu, old_is_gpu_gpu;
+
+	new_is_cuda  = new_transfer->src_node >= 1 && new_transfer->src_node <= conf.ncuda;
+	new_is_cuda |= new_transfer->dst_node >= 1 && new_transfer->dst_node <= conf.ncuda;
+	old_is_cuda  = old_transfer->src_node >= 1 && old_transfer->src_node <= conf.ncuda;
+	old_is_cuda |= old_transfer->dst_node >= 1 && old_transfer->dst_node <= conf.ncuda;
+
+	new_is_opencl  = new_transfer->src_node > conf.ncuda && new_transfer->src_node <= conf.ncuda + conf.nopencl;
+	new_is_opencl |= new_transfer->dst_node > conf.ncuda && new_transfer->dst_node <= conf.ncuda + conf.nopencl;
+	old_is_opencl  = old_transfer->src_node > conf.ncuda && old_transfer->src_node <= conf.ncuda + conf.nopencl;
+	old_is_opencl |= old_transfer->dst_node > conf.ncuda && old_transfer->dst_node <= conf.ncuda + conf.nopencl;
+
+	new_is_gpu_gpu = new_transfer->src_node && new_transfer->dst_node;
+	old_is_gpu_gpu = old_transfer->src_node && old_transfer->dst_node;
+
+	/* We ignore cuda-opencl transfers, they can not happen */
+	STARPU_ASSERT(!((new_is_cuda && old_is_opencl) || (old_is_cuda && new_is_opencl)));
+
+	/* The following constraints have been observed with CUDA alone */
+
+	/* Same source/destination, sequential */
+	if (new_transfer->src_node == old_transfer->src_node && new_transfer->dst_node == old_transfer->dst_node)
+		return 1;
+
+	/* Crossed GPU-GPU, sequential */
+	if (new_is_gpu_gpu
+			&& new_transfer->src_node == old_transfer->dst_node
+			&& old_transfer->src_node == new_transfer->dst_node)
+		return 1;
+
+	/* GPU-GPU transfers are sequential with any RAM->GPU transfer */
+	if (new_is_gpu_gpu
+			&& old_transfer->dst_node == new_transfer->src_node
+			&& old_transfer->dst_node == new_transfer->dst_node)
+		return 1;
+	if (old_is_gpu_gpu
+			&& new_transfer->dst_node == old_transfer->src_node
+			&& new_transfer->dst_node == old_transfer->dst_node)
+		return 1;
+
+	/* These constraints come from StarPU */
+
+	/* StarPU uses one stream per direction */
+	/* RAM->GPU and GPU->RAM are already handled by "same source/destination" */
+
+	/* StarPU uses one stream per running GPU for GPU-GPU transfers */
+	if (new_is_gpu_gpu && old_is_gpu_gpu && new_transfer->run_node == old_transfer->run_node)
+		return 1;
+
+	return 0;
+}
+
+/* Actually execute the transfer, and then start transfers waiting for this one.  */
 static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
 {
-	struct transfer *transfer = MSG_process_get_data(MSG_process_self());
+	struct transfer *transfer = MSG_process_get_data(MSG_process_self()), **wakep;
 	MSG_task_execute(transfer->task);
 	MSG_task_destroy(transfer->task);
 	_STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
 	*transfer->finished = 1;
 	_STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
-	free(transfer);
+
+	/* Wake transfers waiting for my termination */
+	for (wakep = &transfer->wake[0]; wakep < &transfer->wake[transfer->nwake]; wakep++) {
+		struct transfer *wake = *wakep;
+		STARPU_ASSERT(wake->nwait > 0);
+		wake->nwait--;
+		if (!wake->nwait) {
+			_STARPU_DEBUG("triggering transfer %p\n", wake);
+			MSG_process_create("transfer task", transfer_execute, wake, MSG_get_host_by_name("MAIN"));
+		}
+	}
+
+	free(transfer->wake);
+	transfer_list_erase(pending, transfer);
+	transfer_delete(transfer);
 	return 0;
 }
 
+/* Look for sequentialization between this transfer and pending transfers, and submit this one */
+static void transfer_submit(struct transfer *transfer) {
+	struct transfer *old;
+
+	if (!pending)
+		pending = transfer_list_new();
+
+	for (old  = transfer_list_begin(pending);
+	     old != transfer_list_end(pending);
+	     old  = transfer_list_next(old)) {
+		if (transfers_are_sequential(transfer, old)) {
+			_STARPU_DEBUG("transfer %p(%d->%d) waits for %p(%d->%d)\n",
+					transfer, transfer->src_node, transfer->dst_node,
+					old, old->src_node, old->dst_node);
+			/* Make new wait for the old */
+			transfer->nwait++;
+			/* Make old wake the new */
+			old->wake = realloc(old->wake, (old->nwake + 1) * sizeof(old->wake));
+			old->wake[old->nwake] = transfer;
+			old->nwake++;
+		}
+	}
+
+	transfer_list_push_front(pending, transfer);
+
+	if (!transfer->nwait) {
+		_STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
+		MSG_process_create("transfer task", transfer_execute, transfer, MSG_get_host_by_name("MAIN"));
+	}
+}
+
+/* Data transfer issued by StarPU */
 int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
 {
 	msg_task_t task;
@@ -266,10 +385,11 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 
 	task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
 
-	struct transfer *transfer = malloc(sizeof (*transfer));
+	struct transfer *transfer = transfer_new();
 	transfer->task = task;
 	transfer->src_node = src_node;
 	transfer->dst_node = dst_node;
+	transfer->run_node = _starpu_get_local_memory_node();
 
 	if (req) {
 		transfer->finished = &req->async_channel.event.finished;
@@ -284,11 +404,14 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	*transfer->finished = 0;
 	_STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
 	_STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
+	transfer->wake = NULL;
+	transfer->nwake = 0;
+	transfer->nwait = 0;
 
 	if (req)
 		_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
 
-	MSG_process_create("transfer task", transfer_execute, transfer, MSG_get_host_by_name("MAIN"));
+	transfer_submit(transfer);
 	/* Note: from here, transfer might be already freed */
 
 	if (req) {