Przeglądaj źródła

Clean up worker_set vs worker confusion, and CUDA initialization when several threads use the same device

Samuel Thibault 8 lat temu
rodzic
commit
82ee3ee150

+ 7 - 2
src/core/topology.c

@@ -94,7 +94,6 @@ _starpu_get_worker_from_driver(struct starpu_driver *d)
 		unsigned th_per_stream = starpu_get_env_number_default("STARPU_ONE_THREAD_PER_STREAM", 0);
 		if(th_per_stream == 0)
 			return &cuda_worker_set[d->id.cuda_id];
-
 	}
 #endif
 
@@ -1069,6 +1068,7 @@ _starpu_init_machine_config(struct _starpu_machine_config *config, int no_mp_con
 			int worker_idx = worker_idx0 + i;
 			if(th_per_stream)
 			{
+				/* Just one worker in the set */
 				config->workers[worker_idx].set = (struct _starpu_worker_set *)malloc(sizeof(struct _starpu_worker_set));
 				config->workers[worker_idx].set->workers = &config->workers[worker_idx];
 			}
@@ -1552,6 +1552,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 	unsigned cuda_init[STARPU_MAXCUDADEVS] = { };
 	unsigned cuda_memory_nodes[STARPU_MAXCUDADEVS];
 	unsigned cuda_bindid[STARPU_MAXCUDADEVS];
+	unsigned th_per_stream = starpu_get_env_number_default("STARPU_ONE_THREAD_PER_STREAM", 0);
 #endif
 #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
 	unsigned opencl_init[STARPU_MAXOPENCLDEVS] = { };
@@ -1584,6 +1585,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 		int *preferred_binding = NULL;
 		int npreferred = 0;
 #endif
+
 		/* select the memory node that contains worker's memory */
 		switch (workerarg->arch)
 		{
@@ -1630,7 +1632,10 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 				{
 					memory_node = cuda_memory_nodes[devid];
 #ifndef STARPU_SIMGRID
-					workerarg->bindid = _starpu_get_next_bindid(config, preferred_binding, npreferred);//cuda_bindid[devid];
+					if (th_per_stream == 0)
+						workerarg->bindid = cuda_bindid[devid];
+					else
+						workerarg->bindid = _starpu_get_next_bindid(config, preferred_binding, npreferred);
 #endif /* SIMGRID */
 				}
 				else

+ 41 - 56
src/core/workers.c

@@ -54,7 +54,7 @@
 static starpu_pthread_mutex_t init_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
 static starpu_pthread_cond_t init_cond = STARPU_PTHREAD_COND_INITIALIZER;
 static int init_count = 0;
-static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
+static enum initialization initialized = UNINITIALIZED;
 
 int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
 starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
@@ -372,7 +372,7 @@ int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_t
 	{
 		for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
 			if (_starpu_can_use_nth_implementation(arch, cl, i)
-			 && (!task->cl->can_execute || task->cl->can_execute(workerid, task, i)))
+			 && (task->cl->can_execute(workerid, task, i)))
 			{
 				if (nimpl)
 					*nimpl = i;
@@ -679,25 +679,25 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 			case STARPU_CUDA_WORKER:
 				driver.id.cuda_id = devid;
-				/* allow having one worker per stream */
-				if(th_per_stream == 0)
-				{
-					/* We spawn only one thread per CUDA driver,
-					 * which will control all CUDA workers of this
-					 * driver. (by using a worker set). */
-					if (worker_set->workers != workerarg)
-						break;
-				}
 
-				worker_set->nworkers = starpu_get_env_number_default("STARPU_NWORKER_PER_CUDA", 1);
+				if (worker_set->workers != workerarg)
+					/* We are not the first worker of the
+					 * set, don't start a thread for it. */
+					break;
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-				if (worker_set->nworkers > 1)
+				if(th_per_stream == 0)
 				{
-					_STARPU_DISP("Warning: reducing STARPU_NWORKER_PER_CUDA to 1 because blocking drivers are enabled\n");
-					worker_set->nworkers = 1;
-				}
+					worker_set->nworkers = starpu_get_env_number_default("STARPU_NWORKER_PER_CUDA", 1);
+#ifndef STARPU_NON_BLOCKING_DRIVERS
+					if (worker_set->nworkers > 1)
+					{
+						_STARPU_DISP("Warning: reducing STARPU_NWORKER_PER_CUDA to 1 because blocking drivers are enabled\n");
+						worker_set->nworkers = 1;
+					}
 #endif
+				}
+				else
+					worker_set->nworkers = 1;
 
 				worker_set->set_is_initialized = 0;
 
@@ -708,35 +708,19 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 				}
 
 
-				if(th_per_stream == 0)
-				{
-					STARPU_PTHREAD_CREATE_ON(
-						workerarg->name,
-						&worker_set->worker_thread,
-						NULL,
-						_starpu_cuda_worker,
-						worker_set,
-						_starpu_simgrid_get_host_by_worker(workerarg));
-				}
-				else
-				{
-					worker_set->nworkers = 1;
-					STARPU_PTHREAD_CREATE_ON(
-						workerarg->name,
-						&workerarg->worker_thread,
-						NULL,
-						_starpu_cuda_worker,
-//						workerarg,
-						worker_set,
-						_starpu_simgrid_get_host_by_worker(workerarg));
-				}
+				STARPU_PTHREAD_CREATE_ON(
+					workerarg->name,
+					&worker_set->worker_thread,
+					NULL,
+					_starpu_cuda_worker,
+					worker_set,
+					_starpu_simgrid_get_host_by_worker(workerarg));
 #ifdef STARPU_USE_FXT
 				STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
 				while (!workerarg->worker_is_running)
 					STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
 				STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
 #endif
-
 				break;
 #endif
 #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
@@ -835,6 +819,7 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 		struct _starpu_worker_set *worker_set = workerarg->set;
 #endif
+
 		switch (workerarg->arch)
 		{
 			case STARPU_CPU_WORKER:
@@ -855,21 +840,11 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 					break;
 #endif
 				_STARPU_DEBUG("waiting for worker %u initialization\n", worker);
-				if(th_per_stream == 0)
-				{
-					STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
-					while (!worker_set->set_is_initialized)
-						STARPU_PTHREAD_COND_WAIT(&worker_set->ready_cond,
-									 &worker_set->mutex);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
-				}
-				else
-				{
-					STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
-					while (!workerarg->worker_is_initialized)
-						STARPU_PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
-				}
+				STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+				while (!worker_set->set_is_initialized)
+					STARPU_PTHREAD_COND_WAIT(&worker_set->ready_cond,
+								 &worker_set->mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
 				worker_set->started = 1;
 
 				break;
@@ -1285,6 +1260,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 
 	_starpu_initialize_registered_performance_models();
 
+	_starpu_cuda_init();
 	/* Launch "basic" workers (ie. non-combined workers) */
 	if (!is_a_sink)
 		_starpu_launch_drivers(&_starpu_config);
@@ -1340,17 +1316,26 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 		struct _starpu_worker_set *set = pconfig->workers[workerid].set;
 		struct _starpu_worker *worker = &pconfig->workers[workerid];
 
+					fprintf(stderr,"worker %p %p\n", set, worker);
 		/* in case StarPU termination code is called from a callback,
  		 * we have to check if pthread_self() is the worker itself */
-		if (set && set->nworkers > 1)
+					if (set)
+					fprintf(stderr,"worker %p %u\n", set, set->nworkers);
+		if (set && set->nworkers > 0)
 		{
+					fprintf(stderr,"set started for %u %u\n", workerid, set->nworkers);
 			if (set->started)
 			{
+					fprintf(stderr,"set started for %u\n", workerid);
 #ifdef STARPU_SIMGRID
 				status = starpu_pthread_join(set->worker_thread, NULL);
 #else
 				if (!pthread_equal(pthread_self(), set->worker_thread))
+				{
+					fprintf(stderr,"waiting worker %u\n", workerid);
 					status = starpu_pthread_join(set->worker_thread, NULL);
+					fprintf(stderr,"waited worker %u\n", workerid);
+				}
 #endif
 				if (status)
 				{

+ 2 - 0
src/core/workers.h

@@ -56,6 +56,8 @@
 
 #define STARPU_MAX_PIPELINE 4
 
+enum initialization { UNINITIALIZED = 0, CHANGING, INITIALIZED };
+
 /* This is initialized from in _starpu_worker_init */
 LIST_TYPE(_starpu_worker,
 	struct _starpu_machine_config *config;

+ 49 - 18
src/drivers/cuda/driver_cuda.c

@@ -71,8 +71,20 @@ static starpu_pthread_mutex_t task_mutex[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE
 static starpu_pthread_cond_t task_cond[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_SIMGRID */
 
-static unsigned cuda_memnode_deinit[STARPU_MAXCUDADEVS];
-static starpu_pthread_mutex_t cuda_deinit_mutex[STARPU_MAXCUDADEVS];
+static enum initialization cuda_device_init[STARPU_MAXCUDADEVS];
+static int cuda_device_users[STARPU_MAXCUDADEVS];
+static starpu_pthread_mutex_t cuda_device_init_mutex[STARPU_MAXCUDADEVS];
+static starpu_pthread_cond_t cuda_device_init_cond[STARPU_MAXCUDADEVS];
+
+void _starpu_cuda_init(void)
+{
+	unsigned i;
+	for (i = 0; i < STARPU_MAXCUDADEVS; i++)
+	{
+		STARPU_PTHREAD_MUTEX_INIT(&cuda_device_init_mutex[i], NULL);
+		STARPU_PTHREAD_COND_INIT(&cuda_device_init_cond[i], NULL);
+	}
+}
 
 void
 _starpu_cuda_discover_devices (struct _starpu_machine_config *config)
@@ -259,6 +271,21 @@ static void init_device_context(unsigned devid)
 
 	starpu_cuda_set_device(devid);
 
+	STARPU_PTHREAD_MUTEX_LOCK(&cuda_device_init_mutex[devid]);
+	cuda_device_users[devid]++;
+	if (cuda_device_init[devid] == UNINITIALIZED)
+		/* Nobody started initialization yet, do it */
+		cuda_device_init[devid] = CHANGING;
+	else
+	{
+		/* Somebody else is doing initialization, wait for it */
+		while (cuda_device_init[devid] != INITIALIZED)
+			STARPU_PTHREAD_COND_WAIT(&cuda_device_init_cond[devid], &cuda_device_init_mutex[devid]);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_device_init_mutex[devid]);
+		return;
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_device_init_mutex[devid]);
+
 #ifdef HAVE_CUDA_MEMCPY_PEER
 	if (starpu_get_env_number("STARPU_ENABLE_CUDA_GPU_GPU_DIRECT") != 0)
 	{
@@ -326,6 +353,11 @@ static void init_device_context(unsigned devid)
 		if (STARPU_UNLIKELY(cures))
 			STARPU_CUDA_REPORT_ERROR(cures);
 	}
+
+	STARPU_PTHREAD_MUTEX_LOCK(&cuda_device_init_mutex[devid]);
+	cuda_device_init[devid] = INITIALIZED;
+	STARPU_PTHREAD_COND_BROADCAST(&cuda_device_init_cond[devid]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_device_init_mutex[devid]);
 }
 #endif /* !STARPU_SIMGRID */
 
@@ -680,16 +712,11 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 	STARPU_PTHREAD_COND_SIGNAL(&worker0->ready_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&worker0->mutex);
 
-	unsigned th_per_stream = starpu_get_env_number_default("STARPU_ONE_THREAD_PER_STREAM", 0);
-
-	if(th_per_stream == 0)
-	{
-		/* tell the main thread that this one is ready */
-		STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
-		worker_set->set_is_initialized = 1;
-		STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
-	}
+	/* tell the main thread that this one is ready */
+	STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+	worker_set->set_is_initialized = 1;
+	STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
 
 	return 0;
 }
@@ -856,15 +883,19 @@ int _starpu_cuda_driver_deinit(struct _starpu_worker_set *worker_set)
 		struct _starpu_worker *worker = &worker_set->workers[i];
 		unsigned devid = worker->devid;
 		unsigned memnode = worker->memory_node;
+		unsigned usersleft;
 		if ((int) devid == lastdevid)
 			/* Already initialized */
 			continue;
 		lastdevid = devid;
 
-		STARPU_PTHREAD_MUTEX_LOCK(&cuda_deinit_mutex[memnode]);
-		if(!cuda_memnode_deinit[devid])
-                {
+		STARPU_PTHREAD_MUTEX_LOCK(&cuda_device_init_mutex[devid]);
+		usersleft = --cuda_device_users[devid];
+		STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_device_init_mutex[devid]);
 
+		if (!usersleft)
+                {
+			/* I'm last, deinitialize device */
 			_starpu_handle_all_pending_node_data_requests(memnode);
 			
 			/* In case there remains some memory that was automatically
@@ -873,14 +904,14 @@ int _starpu_cuda_driver_deinit(struct _starpu_worker_set *worker_set)
 			_starpu_free_all_automatically_allocated_buffers(memnode);
 			
 			_starpu_malloc_shutdown(memnode);
-			cuda_memnode_deinit[devid] = 1;
 
 #ifndef STARPU_SIMGRID
 			deinit_device_context(devid);
 #endif /* !STARPU_SIMGRID */
                 }
-
-                STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_deinit_mutex[memnode]);
+		STARPU_PTHREAD_MUTEX_LOCK(&cuda_device_init_mutex[devid]);
+		cuda_device_init[devid] = UNINITIALIZED;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_device_init_mutex[devid]);
 
 	}
 

+ 1 - 0
src/drivers/cuda/driver_cuda.h

@@ -34,6 +34,7 @@
 
 #include <common/fxt.h>
 
+void _starpu_cuda_init(void);
 unsigned _starpu_get_cuda_device_count(void);
 extern int _starpu_cuda_bus_ids[STARPU_MAXCUDADEVS+1][STARPU_MAXCUDADEVS+1];