|
@@ -48,32 +48,33 @@ static void _starpu_mpi_task_init(int nb_nodes)
|
|
|
for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
|
|
|
}
|
|
|
|
|
|
-typedef struct _starpu_mpi_clear_data_s {
|
|
|
+typedef struct _starpu_mpi_clear_cache_s {
|
|
|
starpu_data_handle data;
|
|
|
int rank;
|
|
|
int mode;
|
|
|
-} _starpu_mpi_clear_data_t;
|
|
|
+} _starpu_mpi_clear_cache_t;
|
|
|
|
|
|
#define _STARPU_MPI_CLEAR_SENT_DATA 0
|
|
|
#define _STARPU_MPI_CLEAR_RECEIVED_DATA 1
|
|
|
|
|
|
-void _starpu_mpi_clear_data_callback(void *callback_arg)
|
|
|
+void _starpu_mpi_clear_cache_callback(void *callback_arg)
|
|
|
{
|
|
|
- _starpu_mpi_clear_data_t *data_rank = (_starpu_mpi_clear_data_t *)callback_arg;
|
|
|
- uint32_t key = _starpu_crc32_be((uintptr_t)data_rank->data, 0);
|
|
|
+ _starpu_mpi_clear_cache_t *clear_cache = (_starpu_mpi_clear_cache_t *)callback_arg;
|
|
|
+ uint32_t key = _starpu_crc32_be((uintptr_t)clear_cache->data, 0);
|
|
|
|
|
|
- if (data_rank->mode == _STARPU_MPI_CLEAR_SENT_DATA) {
|
|
|
- _STARPU_MPI_DEBUG("Clearing sent cache for data %p and rank %d\n", data_rank->data, data_rank->rank);
|
|
|
- _starpu_htbl_insert_32(&sent_data[data_rank->rank], key, NULL);
|
|
|
+ if (clear_cache->mode == _STARPU_MPI_CLEAR_SENT_DATA) {
|
|
|
+ _STARPU_MPI_DEBUG("Clearing sent cache for data %p and rank %d\n", clear_cache->data, clear_cache->rank);
|
|
|
+ _starpu_htbl_insert_32(&sent_data[clear_cache->rank], key, NULL);
|
|
|
}
|
|
|
- else if (data_rank->mode == _STARPU_MPI_CLEAR_RECEIVED_DATA) {
|
|
|
- _STARPU_MPI_DEBUG("Clearing received cache for data %p and rank %d\n", data_rank->data, data_rank->rank);
|
|
|
- _starpu_htbl_insert_32(&received_data[data_rank->rank], key, NULL);
|
|
|
+ else if (clear_cache->mode == _STARPU_MPI_CLEAR_RECEIVED_DATA) {
|
|
|
+ _STARPU_MPI_DEBUG("Clearing received cache for data %p and rank %d\n", clear_cache->data, clear_cache->rank);
|
|
|
+ _starpu_htbl_insert_32(&received_data[clear_cache->rank], key, NULL);
|
|
|
}
|
|
|
- free(data_rank);
|
|
|
+
|
|
|
+ free(clear_cache);
|
|
|
}
|
|
|
|
|
|
-void _starpu_mpi_clear_data(starpu_data_handle data_handle, int rank, int mode)
|
|
|
+void _starpu_mpi_clear_cache_request(starpu_data_handle data_handle, int rank, int mode)
|
|
|
{
|
|
|
struct starpu_task *task = starpu_task_create();
|
|
|
task->cl = NULL;
|
|
@@ -81,13 +82,13 @@ void _starpu_mpi_clear_data(starpu_data_handle data_handle, int rank, int mode)
|
|
|
task->buffers[0].handle = data_handle;
|
|
|
task->buffers[0].mode = STARPU_RW;
|
|
|
|
|
|
- _starpu_mpi_clear_data_t *data_rank = malloc(sizeof(_starpu_mpi_clear_data_t));
|
|
|
- data_rank->data = data_handle;
|
|
|
- data_rank->rank = rank;
|
|
|
- data_rank->mode = mode;
|
|
|
+ _starpu_mpi_clear_cache_t *clear_cache = malloc(sizeof(_starpu_mpi_clear_cache_t));
|
|
|
+ clear_cache->data = data_handle;
|
|
|
+ clear_cache->rank = rank;
|
|
|
+ clear_cache->mode = mode;
|
|
|
|
|
|
- task->callback_func = _starpu_mpi_clear_data_callback;
|
|
|
- task->callback_arg = data_rank;
|
|
|
+ task->callback_func = _starpu_mpi_clear_cache_callback;
|
|
|
+ task->callback_arg = clear_cache;
|
|
|
starpu_task_submit(task);
|
|
|
}
|
|
|
#endif
|
|
@@ -164,7 +165,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
|
|
|
* safeguard. */
|
|
|
_STARPU_MPI_DEBUG("oh oh\n");
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
- return;
|
|
|
+ return -EINVAL;
|
|
|
}
|
|
|
int mpi_rank = starpu_data_get_rank(data);
|
|
|
if (mpi_rank == me) {
|
|
@@ -286,7 +287,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
|
|
|
va_end(varg_list);
|
|
|
|
|
|
if (do_execute) {
|
|
|
- _STARPU_MPI_DEBUG("Execution of the codelet\n");
|
|
|
+ _STARPU_MPI_DEBUG("Execution of the codelet %p\n", codelet);
|
|
|
va_start(varg_list, codelet);
|
|
|
struct starpu_task *task = starpu_task_create();
|
|
|
int ret = starpu_insert_task_create_and_submit(arg_buffer_size, codelet, &task, varg_list);
|
|
@@ -348,7 +349,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
|
|
|
void *already_sent = _starpu_htbl_search_32(sent_data[n], key);
|
|
|
if (already_sent) {
|
|
|
_STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
|
|
|
- _starpu_mpi_clear_data(data, n, _STARPU_MPI_CLEAR_SENT_DATA);
|
|
|
+ _starpu_mpi_clear_cache_request(data, n, _STARPU_MPI_CLEAR_SENT_DATA);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -359,7 +360,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
|
|
|
/* Somebody else will write to the data, so discard our cached copy if any */
|
|
|
/* TODO: starpu_mpi could just remember itself. */
|
|
|
_STARPU_MPI_DEBUG("Posting request to clear receive cache for data %p\n", data);
|
|
|
- _starpu_mpi_clear_data(data, mpi_rank, _STARPU_MPI_CLEAR_RECEIVED_DATA);
|
|
|
+ _starpu_mpi_clear_cache_request(data, mpi_rank, _STARPU_MPI_CLEAR_RECEIVED_DATA);
|
|
|
_starpu_data_deallocate(data);
|
|
|
}
|
|
|
}
|