Prechádzať zdrojové kódy

Integrate changes for starpu-simgrid-mpi: make TSD use process data instead of host data, since we may have several process on the same host, and pass process argument through argument instead of process data, and duplicate it.

Samuel Thibault 10 rokov pred
rodič
commit
204c00b4bf
6 zmenil súbory, kde vykonal 49 pridanie a 47 odobranie
  1. 1 0
      configure.ac
  2. 6 4
      mpi/src/starpu_mpi.c
  3. 19 5
      src/common/thread.c
  4. 2 0
      src/common/utils.c
  5. 19 38
      src/core/simgrid.c
  6. 2 0
      src/core/simgrid.h

+ 1 - 0
configure.ac

@@ -1061,6 +1061,7 @@ if test x$enable_simgrid = xyes ; then
 	AC_CHECK_HEADERS([simgrid/msg.h], [AC_DEFINE([STARPU_HAVE_SIMGRID_MSG_H], [1], [Define to 1 if you have msg.h in simgrid/.])])
    	AC_CHECK_FUNCS([MSG_process_join MSG_get_as_by_name MSG_environment_get_routing_root xbt_mutex_try_acquire])
 	AC_CHECK_FUNCS([xbt_barrier_init], [AC_DEFINE([STARPU_SIMGRID_HAVE_XBT_BARRIER_INIT], [1], [Define to 1 if you have the `xbt_barrier_init' function.])])
+	AC_CHECK_FUNCS([SIMIX_process_get_code], [AC_DEFINE([STARPU_SIMGRID_HAVE_SIMIX_PROCESS_GET_CODE], [1], [Define to 1 if you have the `SIMIX_process_get_code' function.])])
 	AC_COMPILE_IFELSE([AC_LANG_PROGRAM(
 		    		[[
 #ifdef STARPU_HAVE_SIMGRID_MSG_H

+ 6 - 4
mpi/src/starpu_mpi.c

@@ -1273,7 +1273,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_mpi_world_size = worldsize;
 	_mpi_world_rank = rank;
 	/* Now that MPI is set up, let the rest of simgrid get initialized */
-	MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), *(argc_argv->argv));
+	char ** argv_cpy = malloc(*(argc_argv->argc) * sizeof(char*));
+	int i;
+	for (i = 0; i < *(argc_argv->argc); i++)
+		argv_cpy[i] = strdup(*(argc_argv->argv)[i]);
+	MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
 #endif
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
@@ -1570,10 +1574,9 @@ int _starpu_mpi_simgrid_init(int argc, char *argv[])
 }
 #endif
 
-int starpu_mpi_init_comm(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
+int starpu_mpi_init_comm(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED, int initialize_mpi STARPU_ATTRIBUTE_UNUSED, MPI_Comm comm STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(initialize_mpi, "application has to let StarPU initialize MPI");
 	return 0;
 #else
 	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
@@ -1588,7 +1591,6 @@ int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
 int starpu_mpi_initialize(void)
 {
 #ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(0, "application has to let StarPU initialize MPI");
 	return 0;
 #else
 	return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);

+ 19 - 5
src/common/thread.c

@@ -21,6 +21,7 @@
 
 #ifdef STARPU_SIMGRID
 #include <xbt/synchro_core.h>
+#include <smpi/smpi.h>
 #else
 
 #if defined(STARPU_LINUX_SYS) && defined(STARPU_HAVE_XCHG)
@@ -50,7 +51,7 @@ int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_
 	_args->arg = arg;
 	if (!host)
 		host = MSG_get_host_by_name("MAIN");
-	*thread = MSG_process_create(name, _starpu_simgrid_thread_start, _args, host);
+	*thread = MSG_process_create_with_arguments(name, _starpu_simgrid_thread_start, calloc(MAX_TSD, sizeof(void*)), host, 0, (char **) _args);
 	return 0;
 }
 
@@ -192,17 +193,30 @@ int starpu_pthread_key_delete(starpu_pthread_key_t key)
 
 int starpu_pthread_setspecific(starpu_pthread_key_t key, const void *pointer)
 {
-	void **array = MSG_host_get_data(MSG_host_self());
+	void **array;
+#ifdef STARPU_SIMGRID_HAVE_SIMIX_PROCESS_GET_CODE
+	if (SIMIX_process_get_code() == _starpu_mpi_simgrid_init)
+		/* Special-case the SMPI process */
+		array = smpi_process_get_user_data();
+	else
+#endif
+		array = MSG_process_get_data(MSG_process_self());
 	array[key] = (void*) pointer;
 	return 0;
 }
 
 void* starpu_pthread_getspecific(starpu_pthread_key_t key)
 {
-	msg_host_t host = MSG_host_self();
-	if (!host)
+	void **array;
+#ifdef STARPU_SIMGRID_HAVE_SIMIX_PROCESS_GET_CODE
+	if (SIMIX_process_get_code() == _starpu_mpi_simgrid_init)
+		/* Special-case the SMPI process */
+		array = smpi_process_get_user_data();
+	else
+#endif
+		array = MSG_process_get_data(MSG_process_self());
+	if (!array)
 		return NULL;
-	void **array = MSG_host_get_data(host);
 	return array[key];
 }
 

+ 2 - 0
src/common/utils.c

@@ -284,6 +284,7 @@ void _starpu_sleep(struct timespec ts)
 
 char *starpu_getenv(const char *str)
 {
+#ifndef STARPU_SIMGRID
 	struct _starpu_worker * worker;
 
 	worker = _starpu_get_local_worker_key();
@@ -292,5 +293,6 @@ char *starpu_getenv(const char *str)
 	if (worker && worker->worker_is_initialized)
 		_STARPU_DISP( "getenv should not be called from running workers, only for main() or worker initialization, since it is not reentrant\n");
 #endif
+#endif
 	return getenv(str);
 }

+ 19 - 38
src/core/simgrid.c

@@ -42,9 +42,9 @@ struct main_args
 	char **argv;
 };
 
-int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
+int do_starpu_main(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
 {
-	struct main_args *args = MSG_process_get_data(MSG_process_self());
+	struct main_args *args = (void*) argv;
 	return starpu_main(args->argc, args->argv);
 }
 
@@ -215,8 +215,10 @@ int main(int argc, char **argv)
 	_starpu_simgrid_get_platform_path(path, sizeof(path));
 	MSG_create_environment(path);
 
-	struct main_args args = { .argc = argc, .argv = argv };
-	MSG_process_create("main", &do_starpu_main, &args, MSG_get_host_by_name("MAIN"));
+	struct main_args *args = malloc(sizeof(*args));
+	args->argc = argc;
+	args->argv = argv;
+	MSG_process_create_with_arguments("main", &do_starpu_main, calloc(MAX_TSD, sizeof(void*)), MSG_get_host_by_name("MAIN"), 0, (char**) args);
 
 	MSG_main();
 	return 0;
@@ -224,32 +226,11 @@ int main(int argc, char **argv)
 
 void _starpu_simgrid_init()
 {
-	xbt_dynar_t hosts;
-	int i;
-
 	if (!starpu_main && !(smpi_main && smpi_simulated_main_))
 	{
 		_STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h included, to properly rename it into starpu_main\n");
 		exit(EXIT_FAILURE);
 	}
-
-#ifdef HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT
-	if (_starpu_simgrid_running_smpi())
-	{
-		char asname[32];
-		STARPU_ASSERT(starpu_mpi_world_rank);
-		snprintf(asname, sizeof(asname), STARPU_MPI_AS_PREFIX"%u", starpu_mpi_world_rank());
-		hosts = MSG_environment_as_get_hosts(_starpu_simgrid_get_as_by_name(asname));
-	}
-	else
-#endif /* HAVE_MSG_ENVIRONMENT_GET_ROUTING_ROOT */
-		hosts = MSG_hosts_as_dynar();
-
-	int nb = xbt_dynar_length(hosts);
-	for (i = 0; i < nb; i++)
-		MSG_host_set_data(xbt_dynar_get_as(hosts, i, msg_host_t), calloc(MAX_TSD, sizeof(void*)));
-
-	xbt_dynar_free(&hosts);
 }
 
 /*
@@ -273,9 +254,9 @@ struct task
 static struct task *last_task[STARPU_NMAXWORKERS];
 
 /* Actually execute the task.  */
-static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
+static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
 {
-	struct task *task = MSG_process_get_data(MSG_process_self());
+	struct task *task = (void*) argv;
 	_STARPU_DEBUG("task %p started\n", task);
 	MSG_task_execute(task->task);
 	MSG_task_destroy(task->task);
@@ -291,8 +272,8 @@ static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_AT
 	if (last_task[task->workerid] == task)
 		last_task[task->workerid] = NULL;
 	if (task->next)
-		MSG_process_create("task", task_execute, task->next, MSG_host_self());
-	free(task);
+		MSG_process_create_with_arguments("task", task_execute, calloc(MAX_TSD, sizeof(void*)), MSG_host_self(), 0, (char**) task->next);
+	/* Task is freed with process context */
 	return 0;
 }
 
@@ -366,7 +347,7 @@ void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct star
 		else
 		{
 			last_task[workerid] = task;
-			MSG_process_create("task", task_execute, task, MSG_host_self());
+			MSG_process_create_with_arguments("task", task_execute, calloc(MAX_TSD, sizeof(void*)), MSG_host_self(), 0, (char**) task);
 		}
 	}
 }
@@ -449,9 +430,9 @@ static int transfers_are_sequential(struct transfer *new_transfer, struct transf
 }
 
 /* Actually execute the transfer, and then start transfers waiting for this one.  */
-static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
+static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
 {
-	struct transfer *transfer = MSG_process_get_data(MSG_process_self());
+	struct transfer *transfer = (void*) argv;
 	unsigned i;
 	_STARPU_DEBUG("transfer %p started\n", transfer);
 	MSG_task_execute(transfer->task);
@@ -476,13 +457,13 @@ static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARP
 		if (!wake->nwait)
 		{
 			_STARPU_DEBUG("triggering transfer %p\n", wake);
-			MSG_process_create("transfer task", transfer_execute, wake, _starpu_simgrid_get_host_by_name("MAIN"));
+			MSG_process_create_with_arguments("transfer task", transfer_execute, calloc(MAX_TSD, sizeof(void*)), _starpu_simgrid_get_host_by_name("MAIN"), 0, (char**) wake);
 		}
 	}
 
 	free(transfer->wake);
 	transfer_list_erase(&pending, transfer);
-	transfer_delete(transfer);
+	/* transfer is freed with process context */
 	return 0;
 }
 
@@ -514,7 +495,7 @@ static void transfer_submit(struct transfer *transfer)
 	if (!transfer->nwait)
 	{
 		_STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
-		MSG_process_create("transfer task", transfer_execute, transfer, _starpu_simgrid_get_host_by_name("MAIN"));
+		MSG_process_create_with_arguments("transfer task", transfer_execute, calloc(MAX_TSD, sizeof(void*)), _starpu_simgrid_get_host_by_name("MAIN"), 0, (char**) transfer);
 	}
 }
 
@@ -592,11 +573,11 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 }
 
 int
-_starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
+_starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
 {
-	struct _starpu_pthread_args *_args = MSG_process_get_data(MSG_process_self());
+	struct _starpu_pthread_args *_args = (void*) argv;
 	struct _starpu_pthread_args args = *_args;
-	free(_args);
+	/* _args is freed with process context */
 	args.f(args.arg);
 	return 0;
 }

+ 2 - 0
src/core/simgrid.h

@@ -51,6 +51,8 @@ void _starpu_simgrid_get_platform_path(char *path, size_t maxlen);
 msg_as_t _starpu_simgrid_get_as_by_name(const char *name);
 #pragma weak starpu_mpi_world_rank
 extern int starpu_mpi_world_rank(void);
+#pragma weak _starpu_mpi_simgrid_init
+int _starpu_mpi_simgrid_init(int argc, char *argv[]);
 
 #define _starpu_simgrid_cuda_malloc_cost() starpu_get_env_number_default("STARPU_SIMGRID_CUDA_MALLOC_COST", 1)
 #define _starpu_simgrid_queue_malloc_cost() starpu_get_env_number_default("STARPU_SIMGRID_QUEUE_MALLOC_COST", 1)