Browse Source

simgrid: Add support for starpu_mpi_wait

Samuel Thibault 5 years ago
parent
commit
b7f5c7361d
2 changed files with 39 additions and 21 deletions
  1. 26 8
      mpi/src/mpi/starpu_mpi_mpi.c
  2. 13 13
      mpi/tests/Makefile.am

+ 26 - 8
mpi/src/mpi/starpu_mpi_mpi.c

@@ -325,7 +325,7 @@ static void _starpu_mpi_simgrid_wait_req_func(void* arg)
 	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
 
 	*(sim_req->done) = 1;
-	starpu_pthread_queue_signal(sim_req->queue);
+	starpu_pthread_queue_broadcast(sim_req->queue);
 
 	free(sim_req);
 
@@ -526,6 +526,7 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 /*                                                      */
 /********************************************************/
 
+#ifndef STARPU_SIMGRID
 void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 {
 	_STARPU_MPI_LOG_IN();
@@ -535,10 +536,6 @@ void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
 	if (req->backend->data_request != MPI_REQUEST_NULL)
 	{
-		// TODO: Fix for STARPU_SIMGRID
-#ifdef STARPU_SIMGRID
-		STARPU_MPI_ASSERT_MSG(0, "Implement this in STARPU_SIMGRID");
-#endif
 		req->ret = MPI_Wait(&req->backend->data_request, waiting_req->status);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 	}
@@ -548,6 +545,7 @@ void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 
 	_STARPU_MPI_LOG_OUT();
 }
+#endif
 
 int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
@@ -557,6 +555,25 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 
 	_STARPU_MPI_LOG_IN();
 
+#ifdef STARPU_SIMGRID
+	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
+	starpu_pthread_wait_t wait;
+	starpu_pthread_wait_init(&wait);
+	starpu_pthread_queue_register(&wait, &req->queue);
+	while (1)
+	{
+		starpu_pthread_wait_reset(&wait);
+		if (req->done)
+			break;
+		starpu_pthread_wait_wait(&wait);
+	}
+	starpu_pthread_queue_unregister(&wait, &req->queue);
+	starpu_pthread_wait_destroy(&wait);
+	_STARPU_MPI_TRACE_UWAIT_END(req->node_tag.node.rank, req->node_tag.data_tag);
+
+	*status = req->status_store;
+	_starpu_mpi_handle_request_termination(req);
+#else
 	/* We cannot try to complete a MPI request that was not actually posted
 	 * to MPI yet. */
 	STARPU_PTHREAD_MUTEX_LOCK(&(req->backend->req_mutex));
@@ -580,16 +597,17 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 		STARPU_PTHREAD_COND_WAIT(&req->backend->req_cond, &req->backend->req_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 
-	ret = req->ret;
-
 	/* The internal request structure was automatically allocated */
+	_starpu_mpi_request_destroy(waiting_req);
+#endif
+
 	*public_req = NULL;
 	if (req->backend->internal_req)
 	{
 		_starpu_mpi_request_destroy(req->backend->internal_req);
 	}
+	ret = req->ret;
 	_starpu_mpi_request_destroy(req);
-	_starpu_mpi_request_destroy(waiting_req);
 
 	_STARPU_MPI_LOG_OUT();
 	return ret;

+ 13 - 13
mpi/tests/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2010-2019                                CNRS
-# Copyright (C) 2009-2018                                Université de Bordeaux
+# Copyright (C) 2009-2018, 2020                                Université de Bordeaux
 # Copyright (C) 2013                                     Thibaut Lambert
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -95,6 +95,7 @@ if BUILD_TESTS
 starpu_mpi_TESTS =
 
 starpu_mpi_TESTS +=				\
+	broadcast				\
 	cache					\
 	cache_disable				\
 	callback				\
@@ -108,36 +109,38 @@ starpu_mpi_TESTS +=				\
 	insert_task_owner_data			\
 	matrix					\
 	matrix2					\
+	mpi_barrier				\
 	mpi_detached_tag			\
+	mpi_irecv				\
 	mpi_irecv_detached			\
+	mpi_isend				\
 	mpi_isend_detached			\
 	mpi_reduction				\
 	mpi_scatter_gather			\
+	pingpong				\
 	policy_register				\
 	policy_register_many			\
 	policy_selection			\
 	policy_selection2			\
+	ring					\
+	ring_async				\
 	ring_async_implicit			\
 	temporary				\
-	early_stuff
+	user_defined_datatype			\
+	early_stuff				\
+	gather					\
+	sendrecv_bench
 
 if !STARPU_SIMGRID
 starpu_mpi_TESTS +=				\
 	attr					\
-	broadcast				\
-	pingpong				\
 	mpi_test				\
-	mpi_isend				\
 	mpi_earlyrecv				\
 	mpi_earlyrecv2				\
 	mpi_earlyrecv2_sync			\
-	mpi_irecv				\
-	mpi_barrier				\
 	mpi_redux				\
-	ring					\
 	ring_sync				\
 	ring_sync_detached			\
-	ring_async				\
 	block_interface				\
 	block_interface_pinned			\
 	matrix2					\
@@ -147,13 +150,10 @@ starpu_mpi_TESTS +=				\
 	insert_task_count			\
 	insert_task_seq				\
 	multiple_send				\
-	user_defined_datatype			\
 	tags_checking				\
 	sync					\
-	gather					\
 	gather2					\
-	driver					\
-	sendrecv_bench
+	driver
 
 if STARPU_USE_MPI_MPI
 starpu_mpi_TESTS +=				\