|
@@ -309,15 +309,35 @@ int request_data_allocation(data_state *state, uint32_t node)
|
|
|
}
|
|
|
|
|
|
#ifdef NO_DATA_RW_LOCK
|
|
|
+struct state_and_node {
|
|
|
+ data_state *state;
|
|
|
+ unsigned node;
|
|
|
+ pthread_cond_t cond;
|
|
|
+ pthread_mutex_t lock;
|
|
|
+ unsigned finished;
|
|
|
+};
|
|
|
+#endif
|
|
|
+
|
|
|
+#ifdef NO_DATA_RW_LOCK
|
|
|
/* put the current value of the data into RAM */
|
|
|
-static void _starpu_sync_data_with_mem_continuation(void *_state)
|
|
|
+static inline void _starpu_sync_data_with_mem_continuation(void *arg)
|
|
|
{
|
|
|
int ret;
|
|
|
- data_state *state = _state;
|
|
|
+ struct state_and_node *statenode = arg;
|
|
|
+
|
|
|
+ data_state *state = statenode->state;
|
|
|
|
|
|
ret = fetch_data(state, R);
|
|
|
|
|
|
STARPU_ASSERT(!ret);
|
|
|
+
|
|
|
+ /* the application does not need to "lock" the data anymore */
|
|
|
+ notify_data_dependencies(state);
|
|
|
+
|
|
|
+ pthread_mutex_lock(&statenode->lock);
|
|
|
+ statenode->finished = 1;
|
|
|
+ pthread_cond_signal(&statenode->cond);
|
|
|
+ pthread_mutex_unlock(&statenode->lock);
|
|
|
}
|
|
|
#endif
|
|
|
|
|
@@ -326,13 +346,29 @@ void starpu_sync_data_with_mem(data_state *state)
|
|
|
int ret;
|
|
|
|
|
|
#ifdef NO_DATA_RW_LOCK
|
|
|
+ struct state_and_node statenode =
|
|
|
+ {
|
|
|
+ .state = state,
|
|
|
+ .node = 0, /* unused here */
|
|
|
+ .cond = PTHREAD_COND_INITIALIZER,
|
|
|
+ .lock = PTHREAD_MUTEX_INITIALIZER,
|
|
|
+ .finished = 0
|
|
|
+ };
|
|
|
+
|
|
|
/* we try to get the data, if we do not succeed immediately, we set a
|
|
|
* callback function that will be executed automatically when the data is
|
|
|
* available again, otherwise we fetch the data directly */
|
|
|
- if (!attempt_to_submit_data_request_from_apps(state, R, _starpu_sync_data_with_mem_continuation, state))
|
|
|
+ if (!attempt_to_submit_data_request_from_apps(state, R,
|
|
|
+ _starpu_sync_data_with_mem_continuation, &statenode))
|
|
|
{
|
|
|
- ret = fetch_data(state, R);
|
|
|
- STARPU_ASSERT(!ret);
|
|
|
+ /* no one has locked this data yet, so we proceed immediately */
|
|
|
+ _starpu_sync_data_with_mem_continuation(&statenode);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ pthread_mutex_lock(&statenode.lock);
|
|
|
+ if (!statenode.finished)
|
|
|
+ pthread_cond_wait(&statenode.cond, &statenode.lock);
|
|
|
+ pthread_mutex_unlock(&statenode.lock);
|
|
|
}
|
|
|
#else
|
|
|
ret = fetch_data(state, R);
|
|
@@ -340,16 +376,8 @@ void starpu_sync_data_with_mem(data_state *state)
|
|
|
#endif
|
|
|
}
|
|
|
|
|
|
-/* in case the application did modify the data ... invalidate all other copies */
|
|
|
-void notify_data_modification(data_state *state, uint32_t modifying_node)
|
|
|
+static inline void do_notify_data_modification(data_state *state, uint32_t modifying_node)
|
|
|
{
|
|
|
- /* this may block .. XXX */
|
|
|
-#ifndef NO_DATA_RW_LOCK
|
|
|
- take_rw_lock_write(&state->data_lock);
|
|
|
-#else
|
|
|
-#warning notify_data_modification is not supported with NO_DATA_RW_LOCK yet
|
|
|
-#endif
|
|
|
-
|
|
|
take_mutex(&state->header_lock);
|
|
|
|
|
|
unsigned node = 0;
|
|
@@ -360,7 +388,56 @@ void notify_data_modification(data_state *state, uint32_t modifying_node)
|
|
|
}
|
|
|
|
|
|
release_mutex(&state->header_lock);
|
|
|
-#ifndef NO_DATA_RW_LOCK
|
|
|
+}
|
|
|
+
|
|
|
+#ifdef NO_DATA_RW_LOCK
|
|
|
+static inline void _notify_data_modification_continuation(void *arg)
|
|
|
+{
|
|
|
+ struct state_and_node *statenode = arg;
|
|
|
+
|
|
|
+ do_notify_data_modification(statenode->state, statenode->node);
|
|
|
+
|
|
|
+ pthread_mutex_lock(&statenode->lock);
|
|
|
+ statenode->finished = 1;
|
|
|
+ pthread_cond_signal(&statenode->cond);
|
|
|
+ pthread_mutex_unlock(&statenode->lock);
|
|
|
+}
|
|
|
+#endif
|
|
|
+
|
|
|
+/* in case the application did modify the data ... invalidate all other copies */
|
|
|
+void notify_data_modification(data_state *state, uint32_t modifying_node)
|
|
|
+{
|
|
|
+ /* this may block .. XXX */
|
|
|
+#ifdef NO_DATA_RW_LOCK
|
|
|
+ struct state_and_node statenode =
|
|
|
+ {
|
|
|
+ .state = state,
|
|
|
+ .node = modifying_node,
|
|
|
+ .cond = PTHREAD_COND_INITIALIZER,
|
|
|
+ .lock = PTHREAD_MUTEX_INITIALIZER,
|
|
|
+ .finished = 0
|
|
|
+ };
|
|
|
+
|
|
|
+ if (!attempt_to_submit_data_request_from_apps(state, W, _notify_data_modification_continuation, &statenode))
|
|
|
+ {
|
|
|
+ /* we can immediately proceed */
|
|
|
+ do_notify_data_modification(state, modifying_node);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ pthread_mutex_lock(&statenode.lock);
|
|
|
+ if (!statenode.finished)
|
|
|
+ pthread_cond_wait(&statenode.cond, &statenode.lock);
|
|
|
+ pthread_mutex_unlock(&statenode.lock);
|
|
|
+ }
|
|
|
+
|
|
|
+ /* remove the "lock"/reference */
|
|
|
+ notify_data_dependencies(state);
|
|
|
+
|
|
|
+#else
|
|
|
+ take_rw_lock_write(&state->data_lock);
|
|
|
+
|
|
|
+ do_notify_data_modification(state, modifying_node);
|
|
|
+
|
|
|
release_rw_lock(&state->data_lock);
|
|
|
#endif
|
|
|
}
|