浏览代码

add priorities on data requests from mpi

Lucas Nesi 4 年之前
父节点
当前提交
03d52ba8eb

+ 1 - 1
include/starpu_data.h

@@ -313,7 +313,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_hand
 
    This is a very internal interface, subject to changes, do not use this.
 */
-int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback_acquired)(void *arg, int *node, enum starpu_data_access_mode mode), void (*callback)(void *arg), void *arg, int sequential_consistency, int quick, long *pre_sync_jobid, long *post_sync_jobid);
+int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback_acquired)(void *arg, int *node, enum starpu_data_access_mode mode), void (*callback)(void *arg), void *arg, int sequential_consistency, int quick, long *pre_sync_jobid, long *post_sync_jobid, int prio);
 
 /**
    The application can call this function instead of starpu_data_acquire() so as to

+ 5 - 0
mpi/include/starpu_mpi.h

@@ -232,6 +232,11 @@ int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, s
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 
 /**
+   Same as starpu_mpi_irecv_detached() but with the \p prio parameter.
+*/
+int starpu_mpi_irecv_detached_prio(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg);
+
+/**
    Post a nonblocking receive in \p data_handle from the node \p
    source using the message tag \p data_tag within the communicator \p
    comm. On completion, the \p callback function is called with the

+ 3 - 3
mpi/src/mpi/starpu_mpi_mpi.c

@@ -111,7 +111,7 @@ static int posted_requests = 0, ready_requests = 0, newer_requests, mpi_wait_for
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 #define _STARPU_MPI_INC_READY_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_ready_requests); ready_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_ready_requests); }
 
-extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
+extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int prio);
 
 #ifdef STARPU_SIMGRID
 #pragma weak smpi_simulated_main_
@@ -255,7 +255,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
 				// FIXME: when buffer == NULL, do not hardcode acquiring on early_data_handle->buffer_node, to just acquire where the data happens to have been stored by MPI
-				starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(early_data_handle->handle,early_data_handle->buffer_node,STARPU_R,NULL,_starpu_mpi_early_data_cb,(void*) cb_args,  1, 0, NULL, NULL);
+				starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(early_data_handle->handle,early_data_handle->buffer_node,STARPU_R,NULL,_starpu_mpi_early_data_cb,(void*) cb_args,  1, 0, NULL, NULL, req->prio);
 				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 			}
 			else
@@ -1194,7 +1194,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
 							  early_data_handle->node_tag.data_tag, comm, 1, 0,
-							  NULL, NULL, 1, 1, envelope->size);
+							  NULL, NULL, 1, 1, envelope->size, STARPU_DEFAULT_PRIO);
 	/* The early data handle is ready, we can let _starpu_mpi_submit_ready_request
 	 * proceed with acquiring it */
 	STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_mutex);

+ 17 - 7
mpi/src/starpu_mpi.c

@@ -161,12 +161,12 @@ static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum sta
 
 	if (sequential_consistency)
 	{
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_acquired_callback, _starpu_mpi_submit_ready_request, (void *)req, 1 /*sequential consistency*/, 1, &req->pre_sync_jobid, &req->post_sync_jobid);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_acquired_callback, _starpu_mpi_submit_ready_request, (void *)req, 1 /*sequential consistency*/, 1, &req->pre_sync_jobid, &req->post_sync_jobid, req->prio);
 	}
 	else
 	{
 		/* post_sync_job_id has already been filled */
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_acquired_callback, _starpu_mpi_submit_ready_request, (void *)req, 0 /*sequential consistency*/, 1, &req->pre_sync_jobid, NULL);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, node, mode, _starpu_mpi_acquired_callback, _starpu_mpi_submit_ready_request, (void *)req, 0 /*sequential consistency*/, 1, &req->pre_sync_jobid, NULL, req->prio);
 	}
 }
 
@@ -289,7 +289,7 @@ int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, starp
 	return starpu_mpi_issend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
 }
 
-struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
+struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int prio)
 {
 	if (_starpu_mpi_fake_world_size != -1)
 	{
@@ -297,7 +297,7 @@ struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handl
 		return NULL;
 	}
 
-	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _mpi_backend._starpu_mpi_backend_irecv_size_func, sequential_consistency, is_internal_req, count);
+	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, source, data_tag, comm, detached, sync, prio, callback, arg, RECV_REQ, _mpi_backend._starpu_mpi_backend_irecv_size_func, sequential_consistency, is_internal_req, count);
 	_starpu_mpi_req_willpost(req);
 
 	if (sequential_consistency == 0)
@@ -317,7 +317,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 
 	struct _starpu_mpi_req *req;
 	_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, data_tag);
-	req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 0, 0, NULL, NULL, 1, 0, 0);
+	req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 0, 0, NULL, NULL, 1, 0, 0, STARPU_DEFAULT_PRIO);
 	_STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, data_tag);
 
 	STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
@@ -331,7 +331,17 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, star
 {
 	_STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, 1, 0, 0);
+	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, 1, 0, 0, STARPU_DEFAULT_PRIO);
+	_STARPU_MPI_LOG_OUT();
+	return 0;
+}
+
+int starpu_mpi_irecv_detached_prio(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	_STARPU_MPI_LOG_IN();
+
+	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, 1, 0, 0, prio);
+
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
@@ -340,7 +350,7 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 {
 	_STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0);
+	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0, STARPU_DEFAULT_PRIO);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;

+ 1 - 2
mpi/src/starpu_mpi_coop_sends.c

@@ -297,8 +297,7 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 
 	if (first)
 		/* We were first, we are responsible for acquiring the data for everybody */
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, -1, mode, _starpu_mpi_coop_send_acquired_callback, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &coop_sends->pre_sync_jobid, NULL);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, -1, mode, _starpu_mpi_coop_send_acquired_callback, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &coop_sends->pre_sync_jobid, NULL, req->prio);
 	else
 		req->pre_sync_jobid = coop_sends->pre_sync_jobid;
 }
-

+ 1 - 1
mpi/src/starpu_mpi_task_insert.c

@@ -118,7 +118,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 				if (data_tag == -1)
 					_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
 				_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data, mpi_rank);
-				starpu_mpi_irecv_detached(data, mpi_rank, data_tag, comm, NULL, NULL);
+				starpu_mpi_irecv_detached_prio(data, mpi_rank, data_tag, prio, comm, NULL, NULL);
 			}
 			// else the node has already received the data
 		}

+ 5 - 2
src/datawizard/user_interactions.c

@@ -191,7 +191,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 							  void (*callback)(void *arg),
 							  void *arg,
 							  int sequential_consistency, int quick,
-							  long *pre_sync_jobid, long *post_sync_jobid)
+							  long *pre_sync_jobid, long *post_sync_jobid, int prio)
 {
 	STARPU_ASSERT(handle);
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data (%p) is not possible", handle);
@@ -211,6 +211,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 	wrapper->callback_arg = arg;
 	wrapper->pre_sync_task = NULL;
 	wrapper->post_sync_task = NULL;
+	wrapper->prio = prio;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 	int handle_sequential_consistency = handle->sequential_consistency;
@@ -225,6 +226,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 		wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
 		wrapper->pre_sync_task->callback_arg = wrapper;
 		wrapper->pre_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
+		wrapper->pre_sync_task->priority = prio;
 		pre_sync_job = _starpu_get_job_associated_to_task(wrapper->pre_sync_task);
 		if (pre_sync_jobid)
 			*pre_sync_jobid = pre_sync_job->job_id;
@@ -233,6 +235,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 		wrapper->post_sync_task->name = "_starpu_data_acquire_cb_release";
 		wrapper->post_sync_task->detach = 1;
 		wrapper->post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
+		wrapper->post_sync_task->priority = prio;
 		post_sync_job = _starpu_get_job_associated_to_task(wrapper->post_sync_task);
 		if (post_sync_jobid)
 			*post_sync_jobid = post_sync_job->job_id;
@@ -280,7 +283,7 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_hand
 							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
 							  int sequential_consistency, int quick)
 {
-	return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, NULL, callback, arg, sequential_consistency, quick, NULL, NULL);
+	return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, NULL, callback, arg, sequential_consistency, quick, NULL, NULL, STARPU_DEFAULT_PRIO);
 }
 
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,