Nathalie Furmento 8 anos atrás
pai
commit
994fb00312

+ 2 - 1
configure.ac

@@ -1,6 +1,6 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2009-2016  Université de Bordeaux
+# Copyright (C) 2009-2017  Université de Bordeaux
 # Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
 # Copyright (C) 2011  Télécom-SudParis
 # Copyright (C) 2011, 2012, 2014-2016  INRIA
@@ -1106,6 +1106,7 @@ if test x$enable_simgrid = xyes ; then
 	)
 	AC_CHECK_HEADERS([simgrid/msg.h], [AC_DEFINE([STARPU_HAVE_SIMGRID_MSG_H], [1], [Define to 1 if you have msg.h in simgrid/.])])
 	AC_CHECK_HEADERS([xbt/synchro.h], [AC_DEFINE([STARPU_HAVE_XBT_SYNCHRO_H], [1], [Define to 1 if you have synchro.h in xbt/.])])
+	AC_CHECK_TYPES([smx_actor_t], [AC_DEFINE([STARPU_HAVE_SMX_ACTOR_T], [1], [Define to 1 if you have the smx_actor_t type.])], [], [[#include <simgrid/simix.h>]])
    	AC_CHECK_FUNCS([MSG_process_join MSG_process_attach MSG_get_as_by_name MSG_environment_get_routing_root MSG_host_get_speed xbt_mutex_try_acquire smpi_process_set_user_data sg_link_name])
 	AC_CHECK_FUNCS([xbt_barrier_init], [AC_DEFINE([STARPU_SIMGRID_HAVE_XBT_BARRIER_INIT], [1], [Define to 1 if you have the `xbt_barrier_init' function.])])
 	AC_CHECK_DECLS([smpi_process_set_user_data], [], [], [[#include <smpi/smpi.h>]])

+ 8 - 1
examples/sched_ctx/parallel_tasks_with_cluster_api.c

@@ -101,7 +101,14 @@ int main(int argc, char **argv)
 				    STARPU_VALUE,&size,sizeof(int),
 				    0);
 		t->destroy = 1;
-		t->possibly_parallel = 1;
+		/* For two tasks, try out the case where the task is not parallel: the
+			 configuration is expected to become sequential because of this, and then
+			 to be automatically changed back to the parallel one */
+		if (i<=4 || i > 6)
+			t->possibly_parallel = 1;
+		/* Note that for this mode to be taken into account, a prologue callback
+			 managing it has to be set on all tasks. */
+		t->prologue_callback_pop_func = &starpu_openmp_prologue;
 
 		ret=starpu_task_submit(t);
 		if (ret == -ENODEV)

+ 2 - 2
include/starpu_clusters_util.h

@@ -72,10 +72,10 @@ int starpu_uncluster_machine(struct starpu_cluster_machine* clusters);
 int starpu_cluster_print(struct starpu_cluster_machine* clusters);
 
 /* Prologue functions */
-void starpu_openmp_prologue(void * sched_ctx_id);
+void starpu_openmp_prologue(void*);
 #define starpu_intel_openmp_mkl_prologue starpu_openmp_prologue
 #ifdef STARPU_MKL
-void starpu_gnu_openmp_mkl_prologue(void * sched_ctx_id);
+void starpu_gnu_openmp_mkl_prologue(void*);
 #endif /* STARPU_MKL */
 
 #ifdef __cplusplus

+ 32 - 21
src/common/utils.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2016  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -271,6 +271,33 @@ int _starpu_fftruncate(FILE *file, size_t length)
 	return ftruncate(fileno(file), length);
 }
 
+static int _starpu_warn_nolock(int err) /* Tell whether err is one of the errno values checked below: returns 1 if so (displaying a warning the first time only), 0 otherwise. */
+{
+	if (0 /* neutral first operand, so that each optional clause below can start with || */
+#ifdef ENOLCK
+		|| err == ENOLCK
+#endif
+#ifdef ENOTSUP
+		|| err == ENOTSUP
+#endif
+#ifdef EOPNOTSUPP
+		|| err == EOPNOTSUPP
+#endif
+#ifdef EROFS
+		|| err == EROFS
+#endif
+		)
+	{
+		static int warn; /* non-zero once the warning has been displayed */
+		if (!warn) {
+			warn = 1;
+			_STARPU_DISP("warning: Couldn't lock performance file, StarPU home (%s, coming from $HOME or $STARPU_HOME) is probably on some network filesystem like NFS which does not support locking.\n", _starpu_get_home_path());
+		}
+		return 1; /* err matched one of the values checked above */
+	}
+	return 0; /* any other error value: not handled here */
+}
+
 int _starpu_frdlock(FILE *file)
 {
 	int ret;
@@ -290,17 +317,8 @@ int _starpu_frdlock(FILE *file)
 	};
 	ret = fcntl(fileno(file), F_SETLKW, &lock);
 #endif
-#ifdef ENOLCK
-	if (ret != 0 && errno == ENOLCK)
-	{
-		static int warn;
-		if (!warn) {
-			warn = 1;
-			_STARPU_DISP("warning: Couldn't lock performance file, StarPU home is probably on NFS which does not support locking.\n");
-		}
+	if (ret != 0 && _starpu_warn_nolock(errno))
 		return -1;
-	}
-#endif
 	STARPU_ASSERT(ret == 0);
 	return ret;
 }
@@ -325,6 +343,8 @@ int _starpu_frdunlock(FILE *file)
 	};
 	ret = fcntl(fileno(file), F_SETLKW, &lock);
 #endif
+	if (ret != 0 && _starpu_warn_nolock(errno))
+		return -1;
 	STARPU_ASSERT(ret == 0);
 	return ret;
 }
@@ -351,17 +371,8 @@ int _starpu_fwrlock(FILE *file)
 	ret = fcntl(fileno(file), F_SETLKW, &lock);
 #endif
 
-#ifdef ENOLCK
-	if (ret != 0 && errno == ENOLCK)
-	{
-		static int warn;
-		if (!warn) {
-			warn = 1;
-			_STARPU_DISP("warning: Couldn't lock performance file, StarPU home is probably on NFS which does not support locking.\n");
-		}
+	if (ret != 0 && _starpu_warn_nolock(errno))
 		return -1;
-	}
-#endif
 	STARPU_ASSERT(ret == 0);
 	return ret;
 }

+ 16 - 13
src/core/disk_ops/disk_leveldb.cpp

@@ -50,14 +50,14 @@ struct starpu_leveldb_base
 static void *starpu_leveldb_alloc(void *base, size_t size STARPU_ATTRIBUTE_UNUSED)
 {
 	struct starpu_leveldb_base *base_tmp = (struct starpu_leveldb_base *) base;
-	struct starpu_leveldb_obj *obj;
-	_STARPU_MALLOC(obj, sizeof(struct starpu_leveldb_obj));
+	struct starpu_leveldb_obj *obj = (struct starpu_leveldb_obj *)malloc(sizeof(struct starpu_leveldb_obj));
+	STARPU_ASSERT(obj);
 
         STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
 
 	size_t len = 6 + 1 + 2+sizeof(void*)*2 + 1;
-	char *key;
-	_STARPU_MALLOC(key, len*sizeof(char));
+	char *key = (char *)malloc(len*sizeof(char));
+	STARPU_ASSERT(key);
 	snprintf(key, len, "STARPU-%p", obj);
 
 	/* create and add a key with a small memory */
@@ -88,8 +88,8 @@ static void starpu_leveldb_free(void *base , void *obj, size_t size STARPU_ATTRI
 /* open an existing memory on disk */
 static void *starpu_leveldb_open(void *base STARPU_ATTRIBUTE_UNUSED, void *pos, size_t size)
 {
-	struct starpu_leveldb_obj *obj;
-	_STARPU_MALLOC(obj, sizeof(struct starpu_leveldb_obj));
+	struct starpu_leveldb_obj *obj = (struct starpu_leveldb_obj *)malloc(sizeof(struct starpu_leveldb_obj));
+	STARPU_ASSERT(obj);
 
         STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
 
@@ -149,7 +149,8 @@ static int starpu_leveldb_full_read(void *base, void *obj, void **ptr, size_t *s
 	STARPU_ASSERT(s.ok());
 
 	*size = value.length();
-	_STARPU_MALLOC(*ptr, *size);
+	*ptr = malloc(*size);
+	STARPU_ASSERT(*ptr);
 
 	/* use buffer */
 	memcpy(*ptr, value.c_str(), *size);
@@ -177,7 +178,8 @@ static int starpu_leveldb_write(void *base, void *obj, const void *buf, off_t of
 	else
 	{
 		uintptr_t buf_tmp = (uintptr_t) buf;
-		_STARPU_MALLOC(buffer, (tmp->size > (offset + size)) ? tmp->size : (offset + size));
+		buffer = malloc((tmp->size > (offset + size)) ? tmp->size : (offset + size));
+		STARPU_ASSERT(buffer);
 
 		/* we read the data */
 		std::string value;
@@ -224,8 +226,8 @@ static int starpu_leveldb_full_write(void *base, void *obj, void *ptr, size_t si
 /* create a new copy of parameter == base */
 static void *starpu_leveldb_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
 {
-	struct starpu_leveldb_base *tmp;
-	_STARPU_MALLOC(tmp, sizeof(struct starpu_leveldb_base));
+	struct starpu_leveldb_base *tmp = (struct starpu_leveldb_base *)malloc(sizeof(struct starpu_leveldb_base));
+	STARPU_ASSERT(tmp);
 
 	leveldb::Status status;
 	leveldb::DB *db;
@@ -268,8 +270,8 @@ static int get_leveldb_bandwidth_between_disk_and_main_ram(unsigned node)
 	double end;
 
 	srand(time (NULL));
-	char *buf;
-	_STARPU_MALLOC(buf, SIZE_DISK_MIN*sizeof(char));
+	char *buf = (char *)malloc(SIZE_DISK_MIN*sizeof(char));
+	STARPU_ASSERT(buf);
 
 	/* allocate memory */
 	void *mem = _starpu_disk_alloc(node, SIZE_DISK_MIN);
@@ -293,7 +295,8 @@ static int get_leveldb_bandwidth_between_disk_and_main_ram(unsigned node)
 	/* free memory */
 	free(buf);
 
-	_STARPU_MALLOC(buf, sizeof(char));
+	buf = (char *)malloc(sizeof(char));
+	STARPU_ASSERT(buf);
 
 	/* Measure latency */
 	start = starpu_timing_now();

+ 1 - 0
src/core/sched_ctx.c

@@ -566,6 +566,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 		sched_ctx->sleeping[w] = 0;
 	}
 
+	sched_ctx->parallel_view = 0;
 
         /*init the strategy structs and the worker_collection of the ressources of the context */
 	if(policy)

+ 4 - 0
src/core/sched_ctx.h

@@ -168,6 +168,10 @@ struct _starpu_sched_ctx
 	/* perf model for the device comb of the ctx */
 	struct starpu_perfmodel_arch perf_arch;
 
+	/* For parallel workers, tells whether they are currently viewed as parallel
+		 (non-zero) or as sequential (zero). This is a helper for the prologue code. */
+	unsigned parallel_view;
+
 	/* for ctxs without policy: flag to indicate that we want to get
 	   the threads to sleep in order to replace them with other threads or leave
 	   them awake & use them in the parallel code*/

+ 4 - 0
src/core/sched_policy.c

@@ -989,7 +989,11 @@ profiling:
 	}
 
 	if(task->prologue_callback_pop_func)
+	{
+		_starpu_set_current_task(task);
 		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
+		_starpu_set_current_task(NULL);
+	}
 
 	return task;
 }

+ 21 - 1
src/core/simgrid.c

@@ -233,6 +233,9 @@ static int main_ret;
 
 int do_starpu_main(int argc, char *argv[]) /* Call starpu_main(argc, argv) after a very short MSG_process_sleep, store its result in main_ret and return it. */
 {
+	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
+	MSG_process_sleep(0.000001);
+
 	main_ret = starpu_main(argc, argv);
 	return main_ret;
 }
@@ -342,6 +345,9 @@ static struct task *last_task[STARPU_NMAXWORKERS];
 /* Actually execute the task.  */
 static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
 {
+	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
+	MSG_process_sleep(0.000001);
+
 	struct task *task = starpu_pthread_getspecific(0);
 	_STARPU_DEBUG("task %p started\n", task);
 	MSG_task_execute(task->task);
@@ -530,6 +536,9 @@ static int transfers_are_sequential(struct transfer *new_transfer, struct transf
 /* Actually execute the transfer, and then start transfers waiting for this one.  */
 static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
 {
+	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
+	MSG_process_sleep(0.000001);
+
 	struct transfer *transfer = starpu_pthread_getspecific(0);
 	unsigned i;
 	_STARPU_DEBUG("transfer %p started\n", transfer);
@@ -690,6 +699,9 @@ _starpu_simgrid_thread_start(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[])
 	void *(*f)(void*) = (void*) (uintptr_t) strtol(argv[0], NULL, 16);
 	void *arg = (void*) (uintptr_t) strtol(argv[1], NULL, 16);
 
+	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
+	MSG_process_sleep(0.000001);
+
 	/* _args is freed with process context */
 	f(arg);
 	return 0;
@@ -813,7 +825,15 @@ typedef struct{
 
 static int _starpu_simgrid_xbt_thread_create_wrapper(int argc, char *argv[])
 {
-  smx_process_t self = SIMIX_process_self();
+  /* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
+  MSG_process_sleep(0.000001);
+
+#ifdef HAVE_SMX_ACTOR_T
+  smx_actor_t
+#else
+  smx_process_t
+#endif
+	  self = SIMIX_process_self();
   thread_data_t *t = SIMIX_process_self_get_data(self);
   simcall_process_set_data(self, t->father_data);
   t->code(t->userparam);

+ 44 - 22
src/util/starpu_clusters_create.c

@@ -45,45 +45,67 @@ starpu_binding_function _starpu_cluster_type_get_func(starpu_cluster_types type)
 	return prologue_func;
 }
 
-void starpu_openmp_prologue(void *sched_ctx_id)
+void starpu_openmp_prologue(void* arg)
 {
-	int sched_ctx = *(int*)sched_ctx_id;
-	int *cpuids = NULL;
-	int ncpuids = 0;
 	int workerid = starpu_worker_get_id_check();
 
 	if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
 	{
-		starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
-		omp_set_num_threads(ncpuids);
-#pragma omp parallel
+		struct starpu_task *task = starpu_task_get_current();
+		int sched_ctx = task->sched_ctx;
+		struct _starpu_sched_ctx *ctx_struct = _starpu_get_sched_ctx_struct(sched_ctx);
+		/* If the view of the worker doesn't correspond to the view of the task,
+			 adapt the thread team */
+		if (ctx_struct->parallel_view != task->possibly_parallel)
 		{
-			starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			int *cpuids = NULL;
+			int ncpuids = 0;
+
+			starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
+			if (!task->possibly_parallel)
+				ncpuids=1;
+			omp_set_num_threads(ncpuids);
+#pragma omp parallel
+			{
+				starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			}
+			free(cpuids);
+			ctx_struct->parallel_view = !ctx_struct->parallel_view;
 		}
-		free(cpuids);
 	}
 	return;
 }
 
 #ifdef STARPU_MKL
-void starpu_gnu_openmp_mkl_prologue(void *sched_ctx_id)
+void starpu_gnu_openmp_mkl_prologue(void* arg)
 {
-	int sched_ctx = *(int*)sched_ctx_id;
-	int *cpuids = NULL;
-	int ncpuids = 0;
 	int workerid = starpu_worker_get_id();
 
 	if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
 	{
-		starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
-		omp_set_num_threads(ncpuids);
-		mkl_set_num_threads(ncpuids);
-		mkl_set_dynamic(0);
-#pragma omp parallel
+		struct starpu_task *task = starpu_task_get_current();
+		int sched_ctx = task->sched_ctx;
+		struct _starpu_sched_ctx *ctx_struct = _starpu_get_sched_ctx_struct(sched_ctx);
+		/* If the view of the worker doesn't correspond to the view of the task,
+			 adapt the thread team */
+		if (ctx_struct->parallel_view != task->possibly_parallel)
 		{
-			starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			int *cpuids = NULL;
+			int ncpuids = 0;
+
+			starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
+			if (!task->possibly_parallel)
+				ncpuids=1;
+			omp_set_num_threads(ncpuids);
+			mkl_set_num_threads(ncpuids);
+			mkl_set_dynamic(0);
+#pragma omp parallel
+			{
+				starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+			}
+			free(cpuids);
+			ctx_struct->parallel_view = !ctx_struct->parallel_view;
 		}
-		free(cpuids);
 	}
 	return;
 }
@@ -324,8 +346,8 @@ int _starpu_cluster_bind(struct _starpu_cluster *cluster)
 	else
 	{
 		func = _starpu_cluster_type_get_func(cluster->params->type);
-		func_arg = (void*) &cluster->id;
-		}
+		func_arg = NULL;
+	}
 
 	return starpu_task_insert(&_starpu_cluster_bind_cl,
 				  STARPU_SCHED_CTX, cluster->id,

+ 2 - 2
tests/sched_ctx/sched_ctx_hierarchy.c

@@ -23,7 +23,7 @@ void func_cpu_bis(void *descr[], void *_args)
 {
 	char msg;
 	char worker_name[256];
-	int worker_id = starpu_worker_get_id();
+	int worker_id = starpu_worker_get_id_check();
 	int worker_id_expected;
 	int ntasks;
 
@@ -54,7 +54,7 @@ void func_cpu(void *descr[], void *_args)
 {
 	char msg;
 	char worker_name[256];
-	int worker_id = starpu_worker_get_id();
+	int worker_id = starpu_worker_get_id_check();
 	int worker_id_expected;
 	int ntasks;
 	unsigned sched_ctx_id;