|
@@ -15,9 +15,11 @@
|
|
|
*/
|
|
|
|
|
|
#include <datawizard/data_request.h>
|
|
|
+#include <pthread.h>
|
|
|
|
|
|
static data_request_list_t data_requests[MAXNODES];
|
|
|
-static starpu_mutex data_requests_mutex[MAXNODES];
|
|
|
+static pthread_cond_t data_requests_list_cond[MAXNODES];
|
|
|
+static pthread_mutex_t data_requests_list_mutex[MAXNODES];
|
|
|
|
|
|
void init_data_request_lists(void)
|
|
|
{
|
|
@@ -25,7 +27,8 @@ void init_data_request_lists(void)
|
|
|
for (i = 0; i < MAXNODES; i++)
|
|
|
{
|
|
|
data_requests[i] = data_request_list_new();
|
|
|
- init_mutex(&data_requests_mutex[i]);
|
|
|
+ pthread_mutex_init(&data_requests_list_mutex[i], NULL);
|
|
|
+ pthread_cond_init(&data_requests_list_cond[i], NULL);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -34,6 +37,8 @@ void deinit_data_request_lists(void)
|
|
|
unsigned i;
|
|
|
for (i = 0; i < MAXNODES; i++)
|
|
|
{
|
|
|
+ pthread_cond_destroy(&data_requests_list_cond[i]);
|
|
|
+ pthread_mutex_destroy(&data_requests_list_mutex[i]);
|
|
|
data_request_list_delete(data_requests[i]);
|
|
|
}
|
|
|
}
|
|
@@ -47,21 +52,17 @@ int post_data_request(data_state *state, uint32_t src_node, uint32_t dst_node)
|
|
|
r->state = state;
|
|
|
r->src_node = src_node;
|
|
|
r->dst_node = dst_node;
|
|
|
- sem_init(&r->sem, 0, 0);
|
|
|
+
|
|
|
+ r->completed = 0;
|
|
|
|
|
|
/* insert the request in the proper list */
|
|
|
- take_mutex(&data_requests_mutex[src_node]);
|
|
|
+ pthread_mutex_lock(&data_requests_list_mutex[src_node]);
|
|
|
data_request_list_push_front(data_requests[src_node], r);
|
|
|
- release_mutex(&data_requests_mutex[src_node]);
|
|
|
+ pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
|
|
|
|
|
|
/* wake the threads that could perform that operation */
|
|
|
wake_all_blocked_workers_on_node(src_node);
|
|
|
|
|
|
- /* wait for the request to be performed */
|
|
|
- //sem_wait(&r->sem);
|
|
|
- //while(sem_trywait(&r->sem) == -1)
|
|
|
- // wake_all_blocked_workers_on_node(src_node);
|
|
|
-
|
|
|
#ifdef NO_DATA_RW_LOCK
|
|
|
/* XXX: since there is no concurrency on this data (we don't use the
|
|
|
* rw-lock) we can assume that the data on the source node should not
|
|
@@ -72,17 +73,32 @@ int post_data_request(data_state *state, uint32_t src_node, uint32_t dst_node)
|
|
|
release_mutex(&state->header_lock);
|
|
|
#endif
|
|
|
|
|
|
- while(sem_trywait(&r->sem) == -1)
|
|
|
+// /* wait for the request to be performed */
|
|
|
+// while(sem_trywait(&r->sem) == -1)
|
|
|
+// {
|
|
|
+// wake_all_blocked_workers_on_node(src_node);
|
|
|
+// datawizard_progress(dst_node);
|
|
|
+// }
|
|
|
+
|
|
|
+ /* wait for the request to be performed */
|
|
|
+ pthread_mutex_lock(&data_requests_list_mutex[src_node]);
|
|
|
+ while(!r->completed)
|
|
|
{
|
|
|
+ // if we do not progress
|
|
|
+ // pthread_cond_wait(&data_requests_list_cond[src_node], &data_requests_list_mutex[src_node]);
|
|
|
+ pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
|
|
|
+
|
|
|
wake_all_blocked_workers_on_node(src_node);
|
|
|
datawizard_progress(dst_node);
|
|
|
+
|
|
|
+ pthread_mutex_lock(&data_requests_list_mutex[src_node]);
|
|
|
}
|
|
|
+ pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
|
|
|
|
|
|
#ifdef NO_DATA_RW_LOCK
|
|
|
take_mutex(&state->header_lock);
|
|
|
#endif
|
|
|
|
|
|
-
|
|
|
retvalue = r->retval;
|
|
|
|
|
|
/* the request is useless now */
|
|
@@ -93,7 +109,7 @@ int post_data_request(data_state *state, uint32_t src_node, uint32_t dst_node)
|
|
|
|
|
|
void handle_node_data_requests(uint32_t src_node)
|
|
|
{
|
|
|
- take_mutex(&data_requests_mutex[src_node]);
|
|
|
+ pthread_mutex_lock(&data_requests_list_mutex[src_node]);
|
|
|
|
|
|
/* for all entries of the list */
|
|
|
data_request_list_t l = data_requests[src_node];
|
|
@@ -102,7 +118,7 @@ void handle_node_data_requests(uint32_t src_node)
|
|
|
while (!data_request_list_empty(l))
|
|
|
{
|
|
|
r = data_request_list_pop_back(l);
|
|
|
- release_mutex(&data_requests_mutex[src_node]);
|
|
|
+ pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
|
|
|
|
|
|
/* TODO : accounting to see how much time was spent working for other people ... */
|
|
|
|
|
@@ -111,11 +127,13 @@ void handle_node_data_requests(uint32_t src_node)
|
|
|
r->retval = driver_copy_data_1_to_1(r->state, r->src_node, r->dst_node, 0);
|
|
|
|
|
|
/* wake the requesting worker up */
|
|
|
- if (sem_post(&r->sem))
|
|
|
- perror("sem_post");
|
|
|
+ pthread_mutex_lock(&data_requests_list_mutex[src_node]);
|
|
|
+ r->completed = 1;
|
|
|
+
|
|
|
+ // if we do not progress ..
|
|
|
+ // pthread_cond_broadcast(&data_requests_list_cond[src_node]);
|
|
|
|
|
|
- take_mutex(&data_requests_mutex[src_node]);
|
|
|
}
|
|
|
|
|
|
- release_mutex(&data_requests_mutex[src_node]);
|
|
|
+ pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
|
|
|
}
|