浏览代码

simgrid: Simplify per-thread data

Factorize msg process creation, make some use starpu_pthread_create_on,
use sg_actor_data to simplify getting the TSD.
Samuel Thibault 5 年之前
父节点
当前提交
dfa2933a72
共有 6 个文件被更改,包括 59 次插入29 次删除
  1. 1 1
      configure.ac
  2. 1 0
      include/starpu_thread.h
  3. 3 1
      mpi/src/mpi/starpu_mpi_mpi.c
  4. 3 1
      mpi/src/nmad/starpu_mpi_nmad.c
  5. 16 0
      src/common/thread.c
  6. 35 26
      src/core/simgrid.c

+ 1 - 1
configure.ac

@@ -185,7 +185,7 @@ if test x$enable_simgrid = xyes ; then
 	AC_CHECK_TYPES([smx_actor_t], [AC_DEFINE([STARPU_HAVE_SMX_ACTOR_T], [1], [Define to 1 if you have the smx_actor_t type.])], [], [[#include <simgrid/simix.h>]])
 
 	# Latest functions
-	AC_CHECK_FUNCS([MSG_process_attach MSG_zone_get_hosts MSG_process_self_name MSG_process_userdata_init])
+	AC_CHECK_FUNCS([MSG_process_attach sg_actor_attach MSG_zone_get_hosts MSG_process_self_name MSG_process_userdata_init sg_actor_data])
 	AC_CHECK_FUNCS([xbt_mutex_try_acquire smpi_process_set_user_data sg_zone_get_by_name sg_link_name sg_host_route sg_host_self sg_host_speed simcall_process_create sg_config_continue_after_help])
 	AC_CHECK_FUNCS([xbt_barrier_init], [AC_DEFINE([STARPU_SIMGRID_HAVE_XBT_BARRIER_INIT], [1], [Define to 1 if you have the `xbt_barrier_init' function.])])
 	AC_CHECK_FUNCS([sg_actor_sleep_for sg_actor_self sg_actor_ref sg_host_get_properties sg_host_send_to sg_cfg_set_int sg_actor_self_execute simgrid_get_clock])

+ 1 - 0
include/starpu_thread.h

@@ -83,6 +83,7 @@ int starpu_pthread_equal(starpu_pthread_t t1, starpu_pthread_t t2);
 starpu_pthread_t starpu_pthread_self(void);
 int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_pthread_attr_t *attr, void *(*start_routine) (void *), void *arg, starpu_sg_host_t host);
 int starpu_pthread_create(starpu_pthread_t *thread, const starpu_pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
+starpu_pthread_t _starpu_simgrid_actor_create(const char *name, xbt_main_func_t code, starpu_sg_host_t host, int argc, char *argv[]);
 int starpu_pthread_join(starpu_pthread_t thread, void **retval);
 int starpu_pthread_exit(void *retval) STARPU_ATTRIBUTE_NORETURN;
 int starpu_pthread_attr_init(starpu_pthread_attr_t *attr);

+ 3 - 1
mpi/src/mpi/starpu_mpi_mpi.c

@@ -1150,7 +1150,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	int i;
 	for (i = 0; i < *(argc_argv->argc); i++)
 		argv_cpy[i] = strdup((*(argc_argv->argv))[i]);
-	MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
+	_starpu_simgrid_actor_create("main", smpi_simulated_main_, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
+#ifndef HAVE_SG_ACTOR_DATA
 	/* And set TSD for us */
 	void **tsd;
 	_STARPU_CALLOC(tsd, MAX_TSD + 1, sizeof(void*));
@@ -1159,6 +1160,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		_STARPU_ERROR("Your version of simgrid does not provide smpi_process_set_user_data, we can not continue without it\n");
 	}
 	smpi_process_set_user_data(tsd);
+#endif
         /* And wait for StarPU to get initialized, to come back to the same
          * situation as native execution where that's always the case. */
 	starpu_wait_initialized();

+ 3 - 1
mpi/src/nmad/starpu_mpi_nmad.c

@@ -503,7 +503,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	int i;
 	for (i = 0; i < *(argc_argv->argc); i++)
 		argv_cpy[i] = strdup((*(argc_argv->argv))[i]);
-	MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
+	_starpu_simgrid_actor_create("main", smpi_simulated_main_, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
+#ifndef HAVE_SG_ACTOR_DATA
 	/* And set TSD for us */
 	void **tsd;
 	_STARPU_CALLOC(tsd, MAX_TSD + 1, sizeof(void*));
@@ -513,6 +514,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	}
 	smpi_process_set_user_data(tsd);
 #endif
+#endif
 
 	_starpu_mpi_comm_amounts_init(argc_argv->comm);
 	_starpu_mpi_cache_init(argc_argv->comm);

+ 16 - 0
src/common/thread.c

@@ -86,7 +86,11 @@ int starpu_pthread_create_on(char *name, starpu_pthread_t *thread, const starpu_
 		host = MSG_get_host_by_name("MAIN");
 #endif
 	void *tsd;
+#ifdef HAVE_SG_ACTOR_DATA
+	tsd = NULL;
+#else
 	_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
+#endif
 	*thread = MSG_process_create_with_arguments(name, _starpu_simgrid_thread_start, tsd, host, 2, _args);
 #if SIMGRID_VERSION >= 31500 && SIMGRID_VERSION != 31559
 #  ifdef HAVE_SG_ACTOR_REF
@@ -300,6 +304,13 @@ extern void *smpi_process_get_user_data();
 int starpu_pthread_setspecific(starpu_pthread_key_t key, const void *pointer)
 {
 	void **array;
+#ifdef HAVE_SG_ACTOR_DATA
+	array = sg_actor_data(sg_actor_self());
+	if (!array) {
+		_STARPU_CALLOC(array, MAX_TSD + 1, sizeof(void*));
+		sg_actor_data_set(sg_actor_self(), array);
+	}
+#else
 #if defined(HAVE_SMPI_PROCESS_SET_USER_DATA) || defined(smpi_process_get_user_data)
 #if defined(HAVE_MSG_PROCESS_SELF_NAME) || defined(MSG_process_self_name)
 	const char *process_name = MSG_process_self_name();
@@ -316,6 +327,7 @@ int starpu_pthread_setspecific(starpu_pthread_key_t key, const void *pointer)
 	else
 #endif
 		array = MSG_process_get_data(MSG_process_self());
+#endif
 	array[key] = (void*) pointer;
 	return 0;
 }
@@ -323,6 +335,9 @@ int starpu_pthread_setspecific(starpu_pthread_key_t key, const void *pointer)
 void* starpu_pthread_getspecific(starpu_pthread_key_t key)
 {
 	void **array;
+#ifdef HAVE_SG_ACTOR_DATA
+	array = sg_actor_data(sg_actor_self());
+#else
 #if defined(HAVE_SMPI_PROCESS_SET_USER_DATA) || defined(smpi_process_get_user_data)
 #if defined(HAVE_MSG_PROCESS_SELF_NAME) || defined(MSG_process_self_name)
 	const char *process_name = MSG_process_self_name();
@@ -339,6 +354,7 @@ void* starpu_pthread_getspecific(starpu_pthread_key_t key)
 	else
 #endif
 		array = MSG_process_get_data(MSG_process_self());
+#endif
 	if (!array)
 		return NULL;
 	return array[key];

+ 35 - 26
src/core/simgrid.c

@@ -68,7 +68,7 @@ static struct transfer_runner
 	starpu_sem_t sem;
 	starpu_pthread_t runner;
 } transfer_runner[STARPU_MAXNODES][STARPU_MAXNODES];
-static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
+static void *transfer_execute(void *arg);
 
 starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
 static struct worker_runner
@@ -77,7 +77,7 @@ static struct worker_runner
 	starpu_sem_t sem;
 	starpu_pthread_t runner;
 } worker_runner[STARPU_NMAXWORKERS];
-static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
+static void *task_execute(void *arg);
 
 #if defined(HAVE_SG_ZONE_GET_BY_NAME) || defined(sg_zone_get_by_name)
 #define HAVE_STARPU_SIMGRID_GET_AS_BY_NAME
@@ -368,11 +368,9 @@ int main(int argc, char **argv)
 	int i;
 	for (i = 0; i < argc; i++)
 		argv_cpy[i] = strdup(argv[i]);
-	void **tsd;
-	_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
 
 	/* Run the application in a separate thread */
-	MSG_process_create_with_arguments("main", &do_starpu_main, tsd, _starpu_simgrid_get_host_by_name("MAIN"), argc, argv_cpy);
+	_starpu_simgrid_actor_create("main", &do_starpu_main, _starpu_simgrid_get_host_by_name("MAIN"), argc, argv_cpy);
 
 	/* And run maestro in the main thread */
 	MSG_main();
@@ -392,7 +390,7 @@ void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv
 #ifdef HAVE_SG_CONFIG_CONTINUE_AFTER_HELP
 	sg_config_continue_after_help();
 #endif
-#if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach)
+#if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
 	if (simgrid_started < 2 && !_starpu_simgrid_running_smpi())
 	{
 		/* "Cannot create_maestro with this ContextFactory.
@@ -413,8 +411,16 @@ void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv
 		_starpu_start_simgrid(argc, *argv);
 		/* And attach the main thread to the main simgrid process */
 		void **tsd;
+#ifdef HAVE_SG_ACTOR_DATA
+		tsd = NULL;
+#else
 		_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
+#endif
+#ifdef HAVE_SG_ACTOR_ATTACH
+		sg_actor_attach("main", tsd, _starpu_simgrid_get_host_by_name("MAIN"), NULL);
+#else
 		MSG_process_attach("main", tsd, _starpu_simgrid_get_host_by_name("MAIN"), NULL);
+#endif
 		/* We initialized through MSG_process_attach */
 		simgrid_started = 3;
 	}
@@ -431,7 +437,7 @@ void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv
 #ifndef STARPU_STATIC_ONLY
 		_STARPU_ERROR("Simgrid currently does not support privatization for dynamically-linked libraries in SMPI. Please reconfigure and build StarPU with --disable-shared");
 #endif
-#ifdef HAVE_MSG_PROCESS_USERDATA_INIT
+#if defined(HAVE_MSG_PROCESS_USERDATA_INIT) && !defined(HAVE_SG_ACTOR_DATA)
 		MSG_process_userdata_init();
 #endif
 		void **tsd;
@@ -454,21 +460,22 @@ void _starpu_simgrid_init(void)
 	{
 		char s[32];
 		snprintf(s, sizeof(s), "worker %u runner", i);
-		void **tsd;
-		_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
 		starpu_sem_init(&worker_runner[i].sem, 0, 0);
-		tsd[0] = (void*)(uintptr_t) i;
-		worker_runner[i].runner = MSG_process_create_with_arguments(s, task_execute, tsd, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)), 0, NULL);
+		starpu_pthread_create_on(s, &worker_runner[i].runner, NULL, task_execute, (void*)(uintptr_t) i, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)));
 	}
 }
 
 void _starpu_simgrid_deinit_late(void)
 {
-#if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach)
+#if defined(HAVE_MSG_PROCESS_ATTACH) || defined(MSG_process_attach) || defined(HAVE_SG_ACTOR_ATTACH)
 	if (simgrid_started == 3)
 	{
 		/* Started with MSG_process_attach, now detach */
+#ifdef HAVE_SG_ACTOR_ATTACH
+		sg_actor_detach();
+#else
 		MSG_process_detach();
+#endif
 		simgrid_started = 0;
 	}
 #endif
@@ -553,12 +560,9 @@ struct task
 };
 
 /* Actually execute the task.  */
-static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
+static void *task_execute(void *arg)
 {
-	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
-	starpu_sleep(0.000001);
-
-	unsigned workerid = (uintptr_t) STARPU_PTHREAD_GETSPECIFIC(0);
+	unsigned workerid = (uintptr_t) arg;
 	struct worker_runner *w = &worker_runner[workerid];
 
 	_STARPU_DEBUG("worker runner %u started\n", workerid);
@@ -799,10 +803,7 @@ static void transfer_queue(struct transfer *transfer)
 		{
 			char s[64];
 			snprintf(s, sizeof(s), "transfer %u-%u runner", src, dst);
-			void **tsd;
-			_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
-			tsd[0] = (void*)(uintptr_t)((src<<16) + dst);
-			t->runner = MSG_process_create_with_arguments(s, transfer_execute, tsd, _starpu_simgrid_get_memnode_host(src), 0, NULL);
+			starpu_pthread_create_on(s, &t->runner, NULL, transfer_execute, (void*)(uintptr_t)((src<<16) + dst), _starpu_simgrid_get_memnode_host(src));
 			starpu_sem_init(&t->sem, 0, 0);
 		}
 		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
@@ -824,12 +825,9 @@ static void transfer_queue(struct transfer *transfer)
 }
 
 /* Actually execute the transfer, and then start transfers waiting for this one.  */
-static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
+static void *transfer_execute(void *arg)
 {
-	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
-	starpu_sleep(0.000001);
-
-	unsigned src_dst = (uintptr_t) STARPU_PTHREAD_GETSPECIFIC(0);
+	unsigned src_dst = (uintptr_t) arg;
 	unsigned src = src_dst >> 16;
 	unsigned dst = src_dst & 0xffff;
 	struct transfer_runner *t = &transfer_runner[src][dst];
@@ -1105,6 +1103,17 @@ _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
 	return 0;
 }
 
+starpu_pthread_t _starpu_simgrid_actor_create(const char *name, xbt_main_func_t code, starpu_sg_host_t host, int argc, char *argv[])
+{
+	void **tsd;
+#ifdef HAVE_SG_ACTOR_DATA
+	tsd = NULL;
+#else
+	_STARPU_CALLOC(tsd, MAX_TSD+1, sizeof(void*));
+#endif
+	return MSG_process_create_with_arguments(name, code, tsd, host, argc, argv);
+}
+
 starpu_sg_host_t _starpu_simgrid_get_memnode_host(unsigned node)
 {
 	const char *fmt;