Bläddra i källkod

Use only one MSG process per worker for running tasks

Samuel Thibault 8 år sedan
förälder
incheckning
dc8374c8a3
3 ändrade filer med 84 tillägg och 36 borttagningar
  1. 77 33
      src/core/simgrid.c
  2. 3 2
      src/core/simgrid.h
  3. 4 1
      src/core/workers.c

+ 77 - 33
src/core/simgrid.c

@@ -43,7 +43,14 @@ extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
 static int simgrid_started;
 
 starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
+
 starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
+static struct task *first_task[STARPU_NMAXWORKERS];
+static struct task *last_task[STARPU_NMAXWORKERS];
+static msg_sem_t worker_sem[STARPU_NMAXWORKERS];
+static msg_process_t worker_runner[STARPU_NMAXWORKERS];
+static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
+static int runners_running;
 
 /* In case the MPI application didn't use smpicc to build the file containing
  * main(), try to cope by calling starpu_main */
@@ -272,7 +279,7 @@ static void maestro(void *data STARPU_ATTRIBUTE_UNUSED)
 	MSG_main();
 }
 
-void _starpu_simgrid_init(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
+void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef HAVE_MSG_PROCESS_ATTACH
 	if (!simgrid_started && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
@@ -292,7 +299,6 @@ void _starpu_simgrid_init(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU
 	}
 #endif
 
-	unsigned i;
 	if (!simgrid_started && !starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
 	{
 		_STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
@@ -304,14 +310,50 @@ void _starpu_simgrid_init(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU
 #endif
 		MSG_process_set_data(MSG_process_self(), calloc(MAX_TSD, sizeof(void*)));
 	}
+	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 		starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
 	for (i = 0; i < STARPU_NMAXWORKERS; i++)
 		starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
 }
 
+void _starpu_simgrid_init(void)
+{
+	unsigned i;
+	runners_running = 1;
+	for (i = 0; i < starpu_worker_get_count(); i++)
+	{
+		char s[32];
+		snprintf(s, sizeof(s), "worker %u runner", i);
+		void **tsd = calloc(MAX_TSD+1, sizeof(void*));
+		worker_sem[i] = MSG_sem_init(0);
+		tsd[0] = (void*)(uintptr_t) i;
+		worker_runner[i] = MSG_process_create_with_arguments(s, task_execute, tsd, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)), 0, NULL);
+	}
+}
+
 void _starpu_simgrid_deinit(void)
 {
+	unsigned i;
+	runners_running = 0;
+	for (i = 0; i < STARPU_MAXNODES; i++)
+	{
+		/* FIXME: queue not empty at this point, needs proper unregistration */
+		/* starpu_pthread_queue_destroy(&_starpu_simgrid_transfer_queue[i]); */
+	}
+	for (i = 0; i < starpu_worker_get_count(); i++)
+	{
+		MSG_sem_release(worker_sem[i]);
+#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
+		MSG_process_join(worker_runner[i], 1000000);
+#else
+		MSG_process_sleep(1);
+#endif
+		MSG_sem_destroy(worker_sem[i]);
+		starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
+		STARPU_ASSERT(first_task[i] == NULL);
+		STARPU_ASSERT(last_task[i] == NULL);
+	}
 #ifdef HAVE_MSG_PROCESS_ATTACH
 	if (simgrid_started == 2)
 	{
@@ -329,47 +371,51 @@ void _starpu_simgrid_deinit(void)
 struct task
 {
 	msg_task_t task;
-	int workerid;
 
 	/* communication termination signalization */
 	unsigned *finished;
 	starpu_pthread_mutex_t *mutex;
 	starpu_pthread_cond_t *cond;
 
-	/* Task which waits for this task */
+	/* Next task on this worker */
 	struct task *next;
 };
 
-static struct task *last_task[STARPU_NMAXWORKERS];
-
 /* Actually execute the task.  */
 static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
 {
 	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
 	MSG_process_sleep(0.000001);
 
-	struct task *task = starpu_pthread_getspecific(0);
-	_STARPU_DEBUG("task %p started\n", task);
-	MSG_task_execute(task->task);
-	MSG_task_destroy(task->task);
-	_STARPU_DEBUG("task %p finished\n", task);
-	STARPU_PTHREAD_MUTEX_LOCK(task->mutex);
-	*task->finished = 1;
-	STARPU_PTHREAD_COND_BROADCAST(task->cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(task->mutex);
-
-	/* The worker which started this task may be sleeping out of tasks, wake it  */
-	starpu_wake_worker(task->workerid);
-
-	if (last_task[task->workerid] == task)
-		last_task[task->workerid] = NULL;
-	if (task->next)
-	{
-		void **tsd = calloc(MAX_TSD+1, sizeof(void*));
-		tsd[0] = task->next;
-		MSG_process_create_with_arguments("task", task_execute, tsd, MSG_host_self(), 0, NULL);
+	unsigned workerid = (uintptr_t) starpu_pthread_getspecific(0);
+	_STARPU_DEBUG("worker %u started\n", workerid);
+	while (1) {
+		struct task *task;
+
+		MSG_sem_acquire(worker_sem[workerid]);
+		if (!runners_running)
+			break;
+
+		task = first_task[workerid];
+		first_task[workerid] = task->next;
+		if (last_task[workerid] == task)
+			last_task[workerid] = NULL;
+
+		_STARPU_DEBUG("task %p started\n", task);
+		MSG_task_execute(task->task);
+		MSG_task_destroy(task->task);
+		_STARPU_DEBUG("task %p finished\n", task);
+
+		STARPU_PTHREAD_MUTEX_LOCK(task->mutex);
+		*task->finished = 1;
+		STARPU_PTHREAD_COND_BROADCAST(task->cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(task->mutex);
+		/* The worker which started this task may be sleeping out of tasks, wake it  */
+		starpu_wake_worker(workerid);
+
+		free(task);
 	}
-	/* Task is freed with process context */
+	_STARPU_DEBUG("worker %u stopped\n", workerid);
 	return 0;
 }
 
@@ -430,7 +476,6 @@ void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct star
 		struct task *task;
 		_STARPU_MALLOC(task, sizeof(*task));
 		task->task = simgrid_task;
-		task->workerid = workerid;
 		task->finished = finished;
 		*finished = 0;
 		task->mutex = mutex;
@@ -441,18 +486,17 @@ void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct star
 			MSG_process_sleep(0.000010);
 		if (last_task[workerid])
 		{
-			/* Make this task depend on the previous */
+			/* Already running a task, queue */
 			last_task[workerid]->next = task;
 			last_task[workerid] = task;
 		}
 		else
 		{
-			void **tsd;
+			STARPU_ASSERT(!first_task[workerid]);
+			first_task[workerid] = task;
 			last_task[workerid] = task;
-			tsd = calloc(MAX_TSD+1, sizeof(void*));
-			tsd[0] = task;
-			MSG_process_create_with_arguments("task", task_execute, tsd, MSG_host_self(), 0, NULL);
 		}
+		MSG_sem_release(worker_sem[workerid]);
 	}
 }
 

+ 3 - 2
src/core/simgrid.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012-2016  Université de Bordeaux
+ * Copyright (C) 2012-2017  Université de Bordeaux
  * Copyright (C) 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -38,7 +38,8 @@ struct _starpu_pthread_args
 #define STARPU_MPI_AS_PREFIX "StarPU-MPI"
 #define _starpu_simgrid_running_smpi() (getenv("SMPI_GLOBAL_SIZE") != NULL)
 
-void _starpu_simgrid_init(int *argc, char ***argv);
+void _starpu_simgrid_init_early(int *argc, char ***argv);
+void _starpu_simgrid_init(void);
 void _starpu_simgrid_deinit(void);
 void _starpu_simgrid_wait_tasks(int workerid);
 void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond);

+ 4 - 1
src/core/workers.c

@@ -1192,7 +1192,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 
 #ifdef STARPU_SIMGRID
 	/* This initializes the simgrid thread library, thus needs to be early */
-	_starpu_simgrid_init(argc, argv);
+	_starpu_simgrid_init_early(argc, argv);
 #endif
 
 	STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
@@ -1414,6 +1414,9 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 	_starpu_cuda_init();
 #endif
+#ifdef STARPU_SIMGRID
+	_starpu_simgrid_init();
+#endif
 	/* Launch "basic" workers (ie. non-combined workers) */
 	if (!is_a_sink)
 		_starpu_launch_drivers(&_starpu_config);