Pārlūkot izejas kodu

Use only one MSG process per transfer stream

Samuel Thibault 8 gadi atpakaļ
vecāks
revīzija
24d0a076ab
1 mainītis faili ar 112 papildinājumiem un 36 dzēšanām
  1. 112 36
      src/core/simgrid.c

+ 112 - 36
src/core/simgrid.c

@@ -42,10 +42,16 @@ extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
 
 static int simgrid_started;
 
+static int runners_running;
 starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
+static struct transfer_runner {
+	struct transfer *first_transfer, *last_transfer;
+	msg_sem_t sem;
+	msg_process_t runner;
+} transfer_runner[STARPU_MAXNODES][STARPU_MAXNODES];
+static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
 
 starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
-static int runners_running;
 static struct worker_runner {
 	struct task *first_task, *last_task;
 	msg_sem_t sem;
@@ -335,10 +341,26 @@ void _starpu_simgrid_init(void)
 
 void _starpu_simgrid_deinit(void)
 {
-	unsigned i;
+	unsigned i, j;
 	runners_running = 0;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
+		for (j = 0; j < STARPU_MAXNODES; j++)
+		{
+			struct transfer_runner *t = &transfer_runner[i][j];
+			if (t->runner)
+			{
+				MSG_sem_release(t->sem);
+#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
+				MSG_process_join(t->runner, 1000000);
+#else
+				MSG_process_sleep(1);
+#endif
+				STARPU_ASSERT(t->first_transfer == NULL);
+				STARPU_ASSERT(t->last_transfer == NULL);
+				MSG_sem_destroy(t->sem);
+			}
+		}
 		/* FIXME: queue not empty at this point, needs proper unregistration */
 		/* starpu_pthread_queue_destroy(&_starpu_simgrid_transfer_queue[i]); */
 	}
@@ -351,9 +373,9 @@ void _starpu_simgrid_deinit(void)
 #else
 		MSG_process_sleep(1);
 #endif
-		MSG_sem_destroy(w->sem);
 		STARPU_ASSERT(w->first_task == NULL);
 		STARPU_ASSERT(w->last_task == NULL);
+		MSG_sem_destroy(w->sem);
 		starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
 	}
 #ifdef HAVE_MSG_PROCESS_ATTACH
@@ -390,7 +412,7 @@ static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_AT
 	unsigned workerid = (uintptr_t) starpu_pthread_getspecific(0);
 	struct worker_runner *w = &worker_runner[workerid];
 
-	_STARPU_DEBUG("worker %u started\n", workerid);
+	_STARPU_DEBUG("worker runner %u started\n", workerid);
 	while (1) {
 		struct task *task;
 
@@ -525,6 +547,9 @@ LIST_TYPE(transfer,
 
 	/* Number of transfers that this transfer waits for */
 	unsigned nwait;
+
+	/* Next transfer on this stream */
+	struct transfer *next;
 )
 
 struct transfer_list pending;
@@ -580,44 +605,97 @@ static int transfers_are_sequential(struct transfer *new_transfer, struct transf
 	return 0;
 }
 
+static void transfer_queue(struct transfer *transfer)
+{
+	unsigned src = transfer->src_node;
+	unsigned dst = transfer->dst_node;
+	struct transfer_runner *t = &transfer_runner[src][dst];
+
+	if (!t->runner)
+	{
+		/* No runner yet, start it */
+		static starpu_pthread_mutex_t mutex; /* process_create may yield */
+		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		if (!t->runner)
+		{
+			char s[64];
+			snprintf(s, sizeof(s), "transfer %u-%u runner", src, dst);
+			void **tsd = calloc(MAX_TSD+1, sizeof(void*));
+			tsd[0] = (void*)(uintptr_t)((src<<16) + dst);
+			t->runner = MSG_process_create_with_arguments(s, transfer_execute, tsd, _starpu_simgrid_get_memnode_host(src), 0, NULL);
+			t->sem = MSG_sem_init(0);
+		}
+		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	}
+
+	if (t->last_transfer)
+	{
+		/* Already running a transfer, queue */
+		t->last_transfer->next = transfer;
+		t->last_transfer = transfer;
+	}
+	else
+	{
+		STARPU_ASSERT(!t->first_transfer);
+		t->first_transfer = transfer;
+		t->last_transfer = transfer;
+	}
+	MSG_sem_release(t->sem);
+}
+
 /* Actually execute the transfer, and then start transfers waiting for this one.  */
 static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
 {
 	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
 	MSG_process_sleep(0.000001);
 
-	struct transfer *transfer = starpu_pthread_getspecific(0);
-	unsigned i;
-	_STARPU_DEBUG("transfer %p started\n", transfer);
-	MSG_task_execute(transfer->task);
-	MSG_task_destroy(transfer->task);
-	_STARPU_DEBUG("transfer %p finished\n", transfer);
-	*transfer->finished = 1;
-
-	/* The workers which started this request may be sleeping out of tasks, wake it  */
-	_starpu_wake_all_blocked_workers_on_node(transfer->run_node);
-
-	/* Wake transfers waiting for my termination */
-	/* Note: due to possible preemption inside process_create, the array
-	 * may grow while doing this */
-	for (i = 0; i < transfer->nwake; i++)
-	{
-		struct transfer *wake = transfer->wake[i];
-		STARPU_ASSERT(wake->nwait > 0);
-		wake->nwait--;
-		if (!wake->nwait)
+	unsigned src_dst = (uintptr_t) starpu_pthread_getspecific(0);
+	unsigned src = src_dst >> 16;
+	unsigned dst = src_dst & 0xffff;
+	struct transfer_runner *t = &transfer_runner[src][dst];
+
+	_STARPU_DEBUG("transfer runner %u-%u started\n", src, dst);
+	while (1) {
+		struct transfer *transfer;
+
+		MSG_sem_acquire(t->sem);
+		if (!runners_running)
+			break;
+		transfer = t->first_transfer;
+		t->first_transfer = transfer->next;
+		if (t->last_transfer == transfer)
+			t->last_transfer = NULL;
+
+		_STARPU_DEBUG("transfer %p started\n", transfer);
+		MSG_task_execute(transfer->task);
+		MSG_task_destroy(transfer->task);
+		_STARPU_DEBUG("transfer %p finished\n", transfer);
+
+		*transfer->finished = 1;
+		transfer_list_erase(&pending, transfer);
+
+		/* The workers which started this request may be sleeping out of tasks, wake it  */
+		_starpu_wake_all_blocked_workers_on_node(transfer->run_node);
+
+		unsigned i;
+		/* Wake transfers waiting for my termination */
+		/* Note: due to possible preemption inside process_create, the array
+		 * may grow while doing this */
+		for (i = 0; i < transfer->nwake; i++)
 		{
-			void **tsd;
-			_STARPU_DEBUG("triggering transfer %p\n", wake);
-			tsd = calloc(MAX_TSD+1, sizeof(void*));
-			tsd[0] = wake;
-			MSG_process_create_with_arguments("transfer task", transfer_execute, tsd, _starpu_simgrid_get_host_by_name("MAIN"), 0, NULL);
+			struct transfer *wake = transfer->wake[i];
+			STARPU_ASSERT(wake->nwait > 0);
+			wake->nwait--;
+			if (!wake->nwait)
+			{
+				_STARPU_DEBUG("triggering transfer %p\n", wake);
+				transfer_queue(wake);
+			}
 		}
+		free(transfer->wake);
+		free(transfer);
 	}
 
-	free(transfer->wake);
-	transfer_list_erase(&pending, transfer);
-	/* transfer is freed with process context */
 	return 0;
 }
 
@@ -648,11 +726,8 @@ static void transfer_submit(struct transfer *transfer)
 
 	if (!transfer->nwait)
 	{
-		void **tsd;
 		_STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
-		tsd = calloc(MAX_TSD+1, sizeof(void*));
-		tsd[0] = transfer;
-		MSG_process_create_with_arguments("transfer task", transfer_execute, tsd, _starpu_simgrid_get_host_by_name("MAIN"), 0, NULL);
+		transfer_queue(transfer);
 	}
 }
 
@@ -724,6 +799,7 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	transfer->wake = NULL;
 	transfer->nwake = 0;
 	transfer->nwait = 0;
+	transfer->next = NULL;
 
 	if (req)
 		_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);