Browse Source

Rework termination, so restarting starpu works

Samuel Thibault 12 years ago
parent
commit
7ec80e3076

+ 167 - 139
src/core/topology.c

@@ -320,16 +320,16 @@ static inline int _starpu_get_next_scc_deviceid(struct _starpu_machine_config *c
 static void
 _starpu_init_mic_topology (struct _starpu_machine_config *config, long mic_idx)
 {
-    /* Discover the topology of the mic node identifier by MIC_IDX. That
-     * means, make this StarPU instance aware of the number of cores available
-     * on this MIC device. Update the `nhwmiccores' topology field
-     * accordingly. */
+	/* Discover the topology of the mic node identifier by MIC_IDX. That
+	 * means, make this StarPU instance aware of the number of cores available
+	 * on this MIC device. Update the `nhwmiccores' topology field
+	 * accordingly. */
 
-    struct starpu_machine_topology *topology = &config->topology;
+	struct starpu_machine_topology *topology = &config->topology;
 
-    int nbcores;
-    _starpu_src_common_sink_nbcores (mic_nodes[mic_idx], &nbcores);
-    topology->nhwmiccores[mic_idx] = nbcores;
+	int nbcores;
+	_starpu_src_common_sink_nbcores (mic_nodes[mic_idx], &nbcores);
+	topology->nhwmiccores[mic_idx] = nbcores;
 }
 
 
@@ -337,67 +337,67 @@ static int
 _starpu_init_mic_node (struct _starpu_machine_config *config, int mic_idx,
 		       COIENGINE *coi_handle, COIPROCESS *coi_process)
 {
-    /* Initialize the MIC node of index MIC_IDX. */
-
-    struct starpu_conf *user_conf = config->conf;
-
-    char ***argv = _starpu_get_argv();
-    const char *suffixes[] = {"-mic", "_mic", NULL};
-
-    /* Environment variables to send to the Sink, it informs it what kind
-     * of node it is (architecture and type) as there is no way to discover
-     * it itself */
-    char mic_idx_env[32];
-    sprintf(mic_idx_env, "DEVID=%d", mic_idx);
-
-    /* XXX: this is currently necessary so that the remote process does not
-     * segfault. */
-    char nb_mic_env[32];
-    sprintf(nb_mic_env, "NB_MIC=%d", 2);
-
-    const char *mic_sink_env[] = {"STARPU_SINK=STARPU_MIC", mic_idx_env, nb_mic_env, NULL};
-
-    char mic_sink_program_path[1024];
-    /* Let's get the helper program to run on the MIC device */
-    int mic_file_found =
-	_starpu_src_common_locate_file (mic_sink_program_path,
-					getenv("STARPU_MIC_SINK_PROGRAM_NAME"),
-					getenv("STARPU_MIC_SINK_PROGRAM_PATH"),
-					user_conf->mic_sink_program_path,
-					(argv ? (*argv)[0] : NULL),
-					suffixes);
-
-    if (0 != mic_file_found) {
-	fprintf(stderr, "No MIC program specified, use the environment"
-		"variable STARPU_MIC_SINK_PROGRAM_NAME or the environment"
-		"or the field 'starpu_conf.mic_sink_program_path'"
-		"to define it.\n");
-
-	return -1;
-    }
+	/* Initialize the MIC node of index MIC_IDX. */
+
+	struct starpu_conf *user_conf = config->conf;
+
+	char ***argv = _starpu_get_argv();
+	const char *suffixes[] = {"-mic", "_mic", NULL};
+
+	/* Environment variables to send to the Sink, it informs it what kind
+	 * of node it is (architecture and type) as there is no way to discover
+	 * it itself */
+	char mic_idx_env[32];
+	sprintf(mic_idx_env, "DEVID=%d", mic_idx);
+
+	/* XXX: this is currently necessary so that the remote process does not
+	 * segfault. */
+	char nb_mic_env[32];
+	sprintf(nb_mic_env, "NB_MIC=%d", 2);
+
+	const char *mic_sink_env[] = {"STARPU_SINK=STARPU_MIC", mic_idx_env, nb_mic_env, NULL};
+
+	char mic_sink_program_path[1024];
+	/* Let's get the helper program to run on the MIC device */
+	int mic_file_found =
+	    _starpu_src_common_locate_file (mic_sink_program_path,
+					    getenv("STARPU_MIC_SINK_PROGRAM_NAME"),
+					    getenv("STARPU_MIC_SINK_PROGRAM_PATH"),
+					    user_conf->mic_sink_program_path,
+					    (argv ? (*argv)[0] : NULL),
+					    suffixes);
+
+	if (0 != mic_file_found) {
+		fprintf(stderr, "No MIC program specified, use the environment"
+			"variable STARPU_MIC_SINK_PROGRAM_NAME or the environment"
+			"or the field 'starpu_conf.mic_sink_program_path'"
+			"to define it.\n");
+
+		return -1;
+	}
 
-    COIRESULT res;
-    /* Let's get the handle which let us manage the remote MIC device */
-    res = COIEngineGetHandle(COI_ISA_MIC, mic_idx, coi_handle);
-    if (STARPU_UNLIKELY(res != COI_SUCCESS))
-	STARPU_MIC_SRC_REPORT_COI_ERROR(res);
-
-    /* We launch the helper on the MIC device, which will wait for us
-     * to give it work to do.
-     * As we will communicate further with the device throught scif we
-     * don't need to keep the process pointer */
-    res = COIProcessCreateFromFile(*coi_handle, mic_sink_program_path, 0, NULL, 0,
-				   mic_sink_env, 1, NULL, 0, NULL,
-				   coi_process);
-    if (STARPU_UNLIKELY(res != COI_SUCCESS))
-	STARPU_MIC_SRC_REPORT_COI_ERROR(res);
-
-    /* Let's create the node structure, we'll communicate with the peer
-     * through scif thanks to it */
-    mic_nodes[mic_idx] =
-	_starpu_mp_common_node_create(STARPU_MIC_SOURCE, mic_idx);
-
-    return 0;
+	COIRESULT res;
+	/* Let's get the handle which let us manage the remote MIC device */
+	res = COIEngineGetHandle(COI_ISA_MIC, mic_idx, coi_handle);
+	if (STARPU_UNLIKELY(res != COI_SUCCESS))
+		STARPU_MIC_SRC_REPORT_COI_ERROR(res);
+
+	/* We launch the helper on the MIC device, which will wait for us
+	 * to give it work to do.
+	 * As we will communicate further with the device throught scif we
+	 * don't need to keep the process pointer */
+	res = COIProcessCreateFromFile(*coi_handle, mic_sink_program_path, 0, NULL, 0,
+				       mic_sink_env, 1, NULL, 0, NULL,
+				       coi_process);
+	if (STARPU_UNLIKELY(res != COI_SUCCESS))
+		STARPU_MIC_SRC_REPORT_COI_ERROR(res);
+
+	/* Let's create the node structure, we'll communicate with the peer
+	 * through scif thanks to it */
+	mic_nodes[mic_idx] =
+		_starpu_mp_common_node_create(STARPU_MIC_SOURCE, mic_idx);
+
+	return 0;
 }
 #endif
 
@@ -586,101 +586,125 @@ _starpu_init_mic_config (struct _starpu_machine_config *config,
 			 struct starpu_conf *user_conf,
 			 unsigned mic_idx)
 {
-    // Configure the MIC device of index MIC_IDX.
+	// Configure the MIC device of index MIC_IDX.
 
-    struct starpu_machine_topology *topology = &config->topology;
+	struct starpu_machine_topology *topology = &config->topology;
 
-    topology->nhwmiccores[mic_idx] = 0;
+	topology->nhwmiccores[mic_idx] = 0;
 
-    _starpu_init_mic_topology (config, mic_idx);
+	_starpu_init_mic_topology (config, mic_idx);
 
-    int nmiccores;
-    nmiccores = starpu_get_env_number("STARPU_NMIC");
+	int nmiccores;
+	nmiccores = starpu_get_env_number("STARPU_NMIC");
 
-    /* STARPU_NMIC is not set. Did the user specify anything ? */
-    if (nmiccores == -1 && user_conf)
-	nmiccores = user_conf->nmic;
+	/* STARPU_NMIC is not set. Did the user specify anything ? */
+	if (nmiccores == -1 && user_conf)
+		nmiccores = user_conf->nmic;
 
-    if (nmiccores != 0)
-    {
-	if (nmiccores == -1)
+	if (nmiccores != 0)
 	{
-	    /* Nothing was specified, so let's use the number of
-	     * detected mic cores. ! */
-	    nmiccores = topology->nhwmiccores[mic_idx];
+		if (nmiccores == -1)
+		{
+			/* Nothing was specified, so let's use the number of
+			 * detected mic cores. ! */
+			nmiccores = topology->nhwmiccores[mic_idx];
+		    }
+		else
+		{
+			if ((unsigned) nmiccores > topology->nhwmiccores[mic_idx])
+			{
+				/* The user requires more MIC devices than there is available */
+				fprintf(stderr,
+					"# Warning: %d MIC devices requested. Only %d available.\n",
+					nmiccores, topology->nhwmiccores[mic_idx]);
+				nmiccores = topology->nhwmiccores[mic_idx];
+			}
+		}
 	}
-	else
+
+	topology->nmiccores[mic_idx] = nmiccores;
+	STARPU_ASSERT(topology->nmiccores[mic_idx] + topology->nworkers <= STARPU_NMAXWORKERS);
+
+	/* _starpu_initialize_workers_mic_deviceid (config); */
+
+	unsigned miccore_id;
+	for (miccore_id = 0; miccore_id < topology->nmiccores[mic_idx]; miccore_id++)
 	{
-	    if ((unsigned) nmiccores > topology->nhwmiccores[mic_idx])
-	    {
-		/* The user requires more MIC devices than there is available */
-		fprintf(stderr,
-			"# Warning: %d MIC devices requested. Only %d available.\n",
-			nmiccores, topology->nhwmiccores[mic_idx]);
-		nmiccores = topology->nhwmiccores[mic_idx];
-	    }
+		int worker_idx = topology->nworkers + miccore_id;
+		config->workers[worker_idx].arch = STARPU_MIC_WORKER;
+		config->workers[worker_idx].perf_arch = STARPU_MIC_DEFAULT;
+		config->workers[worker_idx].mp_nodeid = mic_idx;
+		config->workers[worker_idx].devid = miccore_id;
+		config->workers[worker_idx].worker_mask = STARPU_MIC;
+		config->worker_mask |= STARPU_MIC;
+		_starpu_init_sched_ctx_for_worker(config->workers[worker_idx].workerid);
 	}
-    }
 
-    topology->nmiccores[mic_idx] = nmiccores;
-    STARPU_ASSERT(topology->nmiccores[mic_idx] + topology->nworkers <= STARPU_NMAXWORKERS);
-
-    /* _starpu_initialize_workers_mic_deviceid (config); */
-
-    unsigned miccore_id;
-    for (miccore_id = 0; miccore_id < topology->nmiccores[mic_idx]; miccore_id++)
-    {
-	int worker_idx = topology->nworkers + miccore_id;
-	config->workers[worker_idx].arch = STARPU_MIC_WORKER;
-	config->workers[worker_idx].perf_arch = STARPU_MIC_DEFAULT;
-	config->workers[worker_idx].mp_nodeid = mic_idx;
-	config->workers[worker_idx].devid = miccore_id;
-	config->workers[worker_idx].worker_mask = STARPU_MIC;
-	config->worker_mask |= STARPU_MIC;
-	_starpu_init_sched_ctx_for_worker(config->workers[worker_idx].workerid);
+	topology->nworkers += topology->nmiccores[mic_idx];
     }
 
-    topology->nworkers += topology->nmiccores[mic_idx];
-}
 
+#ifdef STARPU_USE_MIC
+static COIENGINE handles[2];
+static COIPROCESS process[2];
+#endif
 
 static void
 _starpu_init_mp_config (struct _starpu_machine_config *config,
 			struct starpu_conf *user_conf)
 {
-    /* Discover and configure the mp topology. That means:
-     * - discover the number of mp nodes;
-     * - initialize each discovered node;
-     * - discover the local topology (number of PUs/devices) of each node;
-     * - configure the workers accordingly.
-     */
+	/* Discover and configure the mp topology. That means:
+	 * - discover the number of mp nodes;
+	 * - initialize each discovered node;
+	 * - discover the local topology (number of PUs/devices) of each node;
+	 * - configure the workers accordingly.
+	 */
 
-    struct starpu_machine_topology *topology = &config->topology;
+	struct starpu_machine_topology *topology = &config->topology;
 
-    // We currently only support MIC at this level.
+	// We currently only support MIC at this level.
 #ifdef STARPU_USE_MIC
-    static COIENGINE handles[2];
-    static COIPROCESS process[2];
-
-    /* Discover and initialize the number of MIC nodes through the mp
-     * infrastructure. */
-    unsigned nhwmicdevices = _starpu_mic_src_get_device_count();
-
-    int reqmicdevices = starpu_get_env_number("STARPU_NMICDEVS");
-    if (-1 == reqmicdevices)
-	reqmicdevices = nhwmicdevices;
-
-    topology->nmicdevices = 0;
-    unsigned i;
-    for (i = 0; i < STARPU_MIN (nhwmicdevices, (unsigned) reqmicdevices); i++)
-	if (0 == _starpu_init_mic_node (config, i, &handles[i], &process[i]))
-	    topology->nmicdevices++;
-
-    i = 0;
-    for (; i < topology->nmicdevices; i++)
-	_starpu_init_mic_config (config, user_conf, i);
+
+	/* Discover and initialize the number of MIC nodes through the mp
+	 * infrastructure. */
+	unsigned nhwmicdevices = _starpu_mic_src_get_device_count();
+
+	int reqmicdevices = starpu_get_env_number("STARPU_NMICDEVS");
+	if (-1 == reqmicdevices)
+		reqmicdevices = nhwmicdevices;
+
+	topology->nmicdevices = 0;
+	unsigned i;
+	for (i = 0; i < STARPU_MIN (nhwmicdevices, (unsigned) reqmicdevices); i++)
+		if (0 == _starpu_init_mic_node (config, i, &handles[i], &process[i]))
+			topology->nmicdevices++;
+
+	i = 0;
+	for (; i < topology->nmicdevices; i++)
+		_starpu_init_mic_config (config, user_conf, i);
 #endif
 }
+
+static void
+_starpu_deinit_mic_node (unsigned mic_idx)
+{
+	_starpu_mp_common_send_command(mic_nodes[mic_idx], STARPU_EXIT, NULL, 0);
+
+	COIProcessDestroy(process[mic_idx], -1, 0, NULL, NULL);
+
+	_starpu_mp_common_node_destroy(mic_nodes[mic_idx]);
+}
+
+static void
+_starpu_deinit_mp_config (struct _starpu_machine_config *config)
+{
+	struct starpu_machine_topology *topology = &config->topology;
+	unsigned i;
+
+	for (i = 0; i < topology->nmicdevices; i++)
+		_starpu_deinit_mic_node (i);
+	_starpu_mic_clear_kernels();
+}
 #endif
 
 static int
@@ -1300,6 +1324,10 @@ void
 _starpu_destroy_topology (
 	struct _starpu_machine_config *config __attribute__ ((unused)))
 {
+#ifdef STARPU_USE_MIC
+	_starpu_deinit_mp_config(config);
+#endif
+
 	/* cleanup StarPU internal data structures */
 	_starpu_memory_nodes_deinit();
 

+ 43 - 58
src/drivers/mic/driver_mic_source.c

@@ -91,8 +91,6 @@ const struct _starpu_mp_node *_starpu_mic_src_get_mp_node_from_memory_node(int m
 	return mic_nodes[nodeid];
 }
 
-// Should be obsolete.
-#if 0
 static void _starpu_mic_src_free_kernel(void *kernel)
 {
 	struct _starpu_mic_kernel *k = kernel;
@@ -101,76 +99,63 @@ static void _starpu_mic_src_free_kernel(void *kernel)
 	free(kernel);
 }
 
-static void _starpu_mic_src_deinit_context(int devid)
+void _starpu_mic_clear_kernels(void)
 {
-	_starpu_mp_common_send_command(mic_nodes[devid], STARPU_EXIT, NULL, 0);
-
-	COIProcessDestroy(process[devid], -1, 0, NULL, NULL);
-
-	_starpu_mp_common_node_destroy(mic_nodes[devid]);
-
-	STARPU_PTHREAD_MUTEX_LOCK(&nb_mic_worker_init_mutex);
-	unsigned int tmp = --nb_mic_worker_init;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&nb_mic_worker_init_mutex);
-
-	if (tmp == 0) {
-		struct _starpu_mic_kernel *kernel, *tmp;
-		HASH_ITER(hh, kernels, kernel, tmp)
-		{
-			HASH_DEL(kernels, kernel);
-			free(kernel);
-		}
+	struct _starpu_mic_kernel *kernel, *tmp;
+	HASH_ITER(hh, kernels, kernel, tmp)
+	{
+		HASH_DEL(kernels, kernel);
+		free(kernel);
 	}
 }
-#endif
 
 static int
 _starpu_mic_src_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
 {
-    uint32_t mask = 0;
-    int profiling = starpu_profiling_status_get();
-    struct timespec codelet_end;
+	uint32_t mask = 0;
+	int profiling = starpu_profiling_status_get();
+	struct timespec codelet_end;
 
-    _starpu_driver_end_job(worker, j, worker->perf_arch, &codelet_end, 0,
-			   profiling);
+	_starpu_driver_end_job(worker, j, worker->perf_arch, &codelet_end, 0,
+			       profiling);
 
-    _starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
-				       &j->cl_start, &codelet_end,
-				       profiling);
+	_starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
+					   &j->cl_start, &codelet_end,
+					   profiling);
 
-    _starpu_push_task_output (j, mask);
+	_starpu_push_task_output (j, mask);
 
-    _starpu_handle_job_termination(j);
+	_starpu_handle_job_termination(j);
 
-    return 0;
+	return 0;
 }
 
 static int
 _starpu_mic_src_process_completed_job (struct _starpu_worker_set *workerset)
 {
-    struct _starpu_mp_node *node = mic_nodes[workerset->workers[0].mp_nodeid];
-    enum _starpu_mp_command answer;
-    void *arg;
-    int arg_size;
+	struct _starpu_mp_node *node = mic_nodes[workerset->workers[0].mp_nodeid];
+	enum _starpu_mp_command answer;
+	void *arg;
+	int arg_size;
 
-    answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
-    STARPU_ASSERT (answer == STARPU_EXECUTION_COMPLETED);
+	answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
+	STARPU_ASSERT (answer == STARPU_EXECUTION_COMPLETED);
 
-    void *arg_ptr = arg;
-    int coreid;
+	void *arg_ptr = arg;
+	int coreid;
 
-    coreid = *(int *) arg_ptr;
-    arg_ptr += sizeof (coreid); // Useless.
+	coreid = *(int *) arg_ptr;
+	arg_ptr += sizeof (coreid); // Useless.
 
-    struct _starpu_worker *worker = &workerset->workers[coreid];
-    struct starpu_task *task = worker->current_task;
-    struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
+	struct _starpu_worker *worker = &workerset->workers[coreid];
+	struct starpu_task *task = worker->current_task;
+	struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
 
-    _starpu_mic_src_finalize_job (j, worker);
+	_starpu_mic_src_finalize_job (j, worker);
 
-    worker->current_task = NULL;
+	worker->current_task = NULL;
 
-    return 0;
+	return 0;
 }
 
 
@@ -383,16 +368,16 @@ starpu_mic_kernel_t _starpu_mic_src_get_kernel_from_codelet(struct starpu_codele
  */
 void _starpu_mic_src_init(struct _starpu_mp_node *node)
 {
-    /* Let's initialize the connection with the peered sink device */
-    _starpu_mic_common_connect(&node->mp_connection.mic_endpoint,
-					STARPU_TO_MIC_ID(node->peer_id),
-					STARPU_MIC_SINK_PORT_NUMBER(node->peer_id),
-					STARPU_MIC_SOURCE_PORT_NUMBER);
-
-    _starpu_mic_common_connect(&node->host_sink_dt_connection.mic_endpoint,
-			       STARPU_TO_MIC_ID(node->peer_id),
-			       STARPU_MIC_SINK_DT_PORT_NUMBER(node->peer_id),
-			       STARPU_MIC_SOURCE_DT_PORT_NUMBER);
+	/* Let's initialize the connection with the peered sink device */
+	_starpu_mic_common_connect(&node->mp_connection.mic_endpoint,
+					    STARPU_TO_MIC_ID(node->peer_id),
+					    STARPU_MIC_SINK_PORT_NUMBER(node->peer_id),
+					    STARPU_MIC_SOURCE_PORT_NUMBER);
+
+	_starpu_mic_common_connect(&node->host_sink_dt_connection.mic_endpoint,
+				   STARPU_TO_MIC_ID(node->peer_id),
+				   STARPU_MIC_SINK_DT_PORT_NUMBER(node->peer_id),
+				   STARPU_MIC_SOURCE_DT_PORT_NUMBER);
 }
 
 /* Deinitialize the MIC sink, close all the connections.

+ 1 - 0
src/drivers/mic/driver_mic_source.h

@@ -54,6 +54,7 @@ unsigned _starpu_mic_src_get_device_count(void);
 starpu_mic_kernel_t _starpu_mic_src_get_kernel_from_codelet(struct starpu_codelet *cl, unsigned nimpl);
 
 void _starpu_mic_src_init(struct _starpu_mp_node *node);
+void _starpu_mic_clear_kernels(void);
 void _starpu_mic_src_deinit(struct _starpu_mp_node *node);
 
 size_t _starpu_mic_get_global_mem_size(int devid);

+ 0 - 6
tests/main/restart.c

@@ -49,12 +49,6 @@ int main(int argc, char **argv)
 		gettimeofday(&end, NULL);
 		if (ret == -ENODEV)
 			goto enodev;
-
-		if (starpu_worker_get_type(STARPU_MIC_WORKER))
-#ifdef STARPU_DEVEL
-#warning MIC does not support restart yet
-#endif
-			goto enodev;
 		STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 		init_timing += (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
 

+ 0 - 6
tests/main/starpu_init.c

@@ -56,12 +56,6 @@ static int check_cpu(int env_cpu, int conf_cpu, int expected_cpu, int *cpu)
 	}
 
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
-
-#ifdef STARPU_DEVEL
-#warning MIC does not support restart yet
-#endif
-	if (starpu_worker_get_type(STARPU_MIC_WORKER)) return STARPU_TEST_SKIPPED;
-
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 
 	*cpu = starpu_cpu_worker_get_count();