|
@@ -1,6 +1,6 @@
|
|
/*
|
|
/*
|
|
* StarPU
|
|
* StarPU
|
|
- * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
|
|
|
|
|
|
+ * Copyright (C) INRIA 2008-2010 (see AUTHORS file)
|
|
*
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
@@ -39,8 +39,8 @@ int starpu_data_request_allocation(starpu_data_handle handle, uint32_t node)
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
-struct state_and_node {
|
|
|
|
- starpu_data_handle state;
|
|
|
|
|
|
+struct user_interaction_wrapper {
|
|
|
|
+ starpu_data_handle handle;
|
|
starpu_access_mode mode;
|
|
starpu_access_mode mode;
|
|
unsigned node;
|
|
unsigned node;
|
|
pthread_cond_t cond;
|
|
pthread_cond_t cond;
|
|
@@ -60,44 +60,44 @@ struct state_and_node {
|
|
/* put the current value of the data into RAM */
|
|
/* put the current value of the data into RAM */
|
|
static void _starpu_sync_data_with_mem_fetch_data_callback(void *arg)
|
|
static void _starpu_sync_data_with_mem_fetch_data_callback(void *arg)
|
|
{
|
|
{
|
|
- struct state_and_node *statenode = arg;
|
|
|
|
- starpu_data_handle handle = statenode->state;
|
|
|
|
|
|
+ struct user_interaction_wrapper *wrapper = arg;
|
|
|
|
+ starpu_data_handle handle = wrapper->handle;
|
|
|
|
|
|
/* At that moment, the caller holds a reference to the piece of data.
|
|
/* At that moment, the caller holds a reference to the piece of data.
|
|
* We enqueue the "post" sync task in the list associated to the handle
|
|
* We enqueue the "post" sync task in the list associated to the handle
|
|
* so that it is submitted by the starpu_data_release_from_mem
|
|
* so that it is submitted by the starpu_data_release_from_mem
|
|
* function. */
|
|
* function. */
|
|
- _starpu_add_post_sync_tasks(statenode->post_sync_task, handle);
|
|
|
|
|
|
+ _starpu_add_post_sync_tasks(wrapper->post_sync_task, handle);
|
|
|
|
|
|
- statenode->callback(statenode->callback_arg);
|
|
|
|
|
|
+ wrapper->callback(wrapper->callback_arg);
|
|
}
|
|
}
|
|
|
|
|
|
static void _starpu_sync_data_with_mem_continuation_non_blocking(void *arg)
|
|
static void _starpu_sync_data_with_mem_continuation_non_blocking(void *arg)
|
|
{
|
|
{
|
|
int ret;
|
|
int ret;
|
|
- struct state_and_node *statenode = arg;
|
|
|
|
|
|
+ struct user_interaction_wrapper *wrapper = arg;
|
|
|
|
|
|
- starpu_data_handle handle = statenode->state;
|
|
|
|
|
|
+ starpu_data_handle handle = wrapper->handle;
|
|
|
|
|
|
STARPU_ASSERT(handle);
|
|
STARPU_ASSERT(handle);
|
|
|
|
|
|
- ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 1,
|
|
|
|
- _starpu_sync_data_with_mem_fetch_data_callback, statenode);
|
|
|
|
|
|
+ ret = _starpu_fetch_data_on_node(handle, 0, wrapper->mode, 1,
|
|
|
|
+ _starpu_sync_data_with_mem_fetch_data_callback, wrapper);
|
|
STARPU_ASSERT(!ret);
|
|
STARPU_ASSERT(!ret);
|
|
}
|
|
}
|
|
|
|
|
|
static void starpu_data_sync_with_mem_non_blocking_pre_sync_callback(void *arg)
|
|
static void starpu_data_sync_with_mem_non_blocking_pre_sync_callback(void *arg)
|
|
{
|
|
{
|
|
- struct state_and_node *statenode = arg;
|
|
|
|
|
|
+ struct user_interaction_wrapper *wrapper = arg;
|
|
|
|
|
|
/* we try to get the data, if we do not succeed immediately, we set a
|
|
/* we try to get the data, if we do not succeed immediately, we set a
|
|
* callback function that will be executed automatically when the data is
|
|
* callback function that will be executed automatically when the data is
|
|
* available again, otherwise we fetch the data directly */
|
|
* available again, otherwise we fetch the data directly */
|
|
- if (!_starpu_attempt_to_submit_data_request_from_apps(statenode->state, statenode->mode,
|
|
|
|
- _starpu_sync_data_with_mem_continuation_non_blocking, statenode))
|
|
|
|
|
|
+ if (!_starpu_attempt_to_submit_data_request_from_apps(wrapper->handle, wrapper->mode,
|
|
|
|
+ _starpu_sync_data_with_mem_continuation_non_blocking, wrapper))
|
|
{
|
|
{
|
|
/* no one has locked this data yet, so we proceed immediately */
|
|
/* no one has locked this data yet, so we proceed immediately */
|
|
- _starpu_sync_data_with_mem_continuation_non_blocking(statenode);
|
|
|
|
|
|
+ _starpu_sync_data_with_mem_continuation_non_blocking(wrapper);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -107,16 +107,16 @@ int starpu_data_sync_with_mem_non_blocking(starpu_data_handle handle,
|
|
{
|
|
{
|
|
STARPU_ASSERT(handle);
|
|
STARPU_ASSERT(handle);
|
|
|
|
|
|
- struct state_and_node *statenode = malloc(sizeof(struct state_and_node));
|
|
|
|
- STARPU_ASSERT(statenode);
|
|
|
|
|
|
+ struct user_interaction_wrapper *wrapper = malloc(sizeof(struct user_interaction_wrapper));
|
|
|
|
+ STARPU_ASSERT(wrapper);
|
|
|
|
|
|
- statenode->state = handle;
|
|
|
|
- statenode->mode = mode;
|
|
|
|
- statenode->callback = callback;
|
|
|
|
- statenode->callback_arg = arg;
|
|
|
|
- PTHREAD_COND_INIT(&statenode->cond, NULL);
|
|
|
|
- PTHREAD_MUTEX_INIT(&statenode->lock, NULL);
|
|
|
|
- statenode->finished = 0;
|
|
|
|
|
|
+ wrapper->handle = handle;
|
|
|
|
+ wrapper->mode = mode;
|
|
|
|
+ wrapper->callback = callback;
|
|
|
|
+ wrapper->callback_arg = arg;
|
|
|
|
+ PTHREAD_COND_INIT(&wrapper->cond, NULL);
|
|
|
|
+ PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
|
|
|
|
+ wrapper->finished = 0;
|
|
|
|
|
|
#warning TODO instead of having the is_prefetch argument, _starpu_fetch_data shoud consider two flags: async and detached
|
|
#warning TODO instead of having the is_prefetch argument, _starpu_fetch_data shoud consider two flags: async and detached
|
|
_starpu_spin_lock(&handle->header_lock);
|
|
_starpu_spin_lock(&handle->header_lock);
|
|
@@ -127,25 +127,25 @@ int starpu_data_sync_with_mem_non_blocking(starpu_data_handle handle,
|
|
int sequential_consistency = handle->sequential_consistency;
|
|
int sequential_consistency = handle->sequential_consistency;
|
|
if (sequential_consistency)
|
|
if (sequential_consistency)
|
|
{
|
|
{
|
|
- statenode->pre_sync_task = starpu_task_create();
|
|
|
|
- statenode->pre_sync_task->detach = 1;
|
|
|
|
- statenode->pre_sync_task->callback_func = starpu_data_sync_with_mem_non_blocking_pre_sync_callback;
|
|
|
|
- statenode->pre_sync_task->callback_arg = statenode;
|
|
|
|
|
|
+ wrapper->pre_sync_task = starpu_task_create();
|
|
|
|
+ wrapper->pre_sync_task->detach = 1;
|
|
|
|
+ wrapper->pre_sync_task->callback_func = starpu_data_sync_with_mem_non_blocking_pre_sync_callback;
|
|
|
|
+ wrapper->pre_sync_task->callback_arg = wrapper;
|
|
|
|
|
|
- statenode->post_sync_task = starpu_task_create();
|
|
|
|
- statenode->post_sync_task->detach = 1;
|
|
|
|
|
|
+ wrapper->post_sync_task = starpu_task_create();
|
|
|
|
+ wrapper->post_sync_task->detach = 1;
|
|
|
|
|
|
- _starpu_detect_implicit_data_deps_with_handle(statenode->pre_sync_task, statenode->post_sync_task, handle, mode);
|
|
|
|
|
|
+ _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, handle, mode);
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
|
|
|
|
/* TODO detect if this is superflous */
|
|
/* TODO detect if this is superflous */
|
|
- int ret = starpu_task_submit(statenode->pre_sync_task);
|
|
|
|
|
|
+ int ret = starpu_task_submit(wrapper->pre_sync_task);
|
|
STARPU_ASSERT(!ret);
|
|
STARPU_ASSERT(!ret);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
|
|
|
|
- starpu_data_sync_with_mem_non_blocking_pre_sync_callback(statenode);
|
|
|
|
|
|
+ starpu_data_sync_with_mem_non_blocking_pre_sync_callback(wrapper);
|
|
}
|
|
}
|
|
|
|
|
|
return 0;
|
|
return 0;
|
|
@@ -156,19 +156,19 @@ int starpu_data_sync_with_mem_non_blocking(starpu_data_handle handle,
|
|
*/
|
|
*/
|
|
static inline void _starpu_sync_data_with_mem_continuation(void *arg)
|
|
static inline void _starpu_sync_data_with_mem_continuation(void *arg)
|
|
{
|
|
{
|
|
- struct state_and_node *statenode = arg;
|
|
|
|
|
|
+ struct user_interaction_wrapper *wrapper = arg;
|
|
|
|
|
|
- starpu_data_handle handle = statenode->state;
|
|
|
|
|
|
+ starpu_data_handle handle = wrapper->handle;
|
|
|
|
|
|
STARPU_ASSERT(handle);
|
|
STARPU_ASSERT(handle);
|
|
|
|
|
|
- _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0, NULL, NULL);
|
|
|
|
|
|
+ _starpu_fetch_data_on_node(handle, 0, wrapper->mode, 0, NULL, NULL);
|
|
|
|
|
|
/* continuation of starpu_data_sync_with_mem */
|
|
/* continuation of starpu_data_sync_with_mem */
|
|
- PTHREAD_MUTEX_LOCK(&statenode->lock);
|
|
|
|
- statenode->finished = 1;
|
|
|
|
- PTHREAD_COND_SIGNAL(&statenode->cond);
|
|
|
|
- PTHREAD_MUTEX_UNLOCK(&statenode->lock);
|
|
|
|
|
|
+ PTHREAD_MUTEX_LOCK(&wrapper->lock);
|
|
|
|
+ wrapper->finished = 1;
|
|
|
|
+ PTHREAD_COND_SIGNAL(&wrapper->cond);
|
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
|
|
}
|
|
}
|
|
|
|
|
|
/* The data must be released by calling starpu_data_release_from_mem later on */
|
|
/* The data must be released by calling starpu_data_release_from_mem later on */
|
|
@@ -180,9 +180,9 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
|
|
if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
|
|
if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
|
|
return -EDEADLK;
|
|
return -EDEADLK;
|
|
|
|
|
|
- struct state_and_node statenode =
|
|
|
|
|
|
+ struct user_interaction_wrapper wrapper =
|
|
{
|
|
{
|
|
- .state = handle,
|
|
|
|
|
|
+ .handle = handle,
|
|
.mode = mode,
|
|
.mode = mode,
|
|
.node = 0, // unused
|
|
.node = 0, // unused
|
|
.cond = PTHREAD_COND_INITIALIZER,
|
|
.cond = PTHREAD_COND_INITIALIZER,
|
|
@@ -195,20 +195,20 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
|
|
int sequential_consistency = handle->sequential_consistency;
|
|
int sequential_consistency = handle->sequential_consistency;
|
|
if (sequential_consistency)
|
|
if (sequential_consistency)
|
|
{
|
|
{
|
|
- statenode.pre_sync_task = starpu_task_create();
|
|
|
|
- statenode.pre_sync_task->detach = 0;
|
|
|
|
|
|
+ wrapper.pre_sync_task = starpu_task_create();
|
|
|
|
+ wrapper.pre_sync_task->detach = 0;
|
|
|
|
|
|
- statenode.post_sync_task = starpu_task_create();
|
|
|
|
- statenode.post_sync_task->detach = 1;
|
|
|
|
|
|
+ wrapper.post_sync_task = starpu_task_create();
|
|
|
|
+ wrapper.post_sync_task->detach = 1;
|
|
|
|
|
|
- _starpu_detect_implicit_data_deps_with_handle(statenode.pre_sync_task, statenode.post_sync_task, handle, mode);
|
|
|
|
|
|
+ _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, wrapper.post_sync_task, handle, mode);
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
|
|
|
|
/* TODO detect if this is superflous */
|
|
/* TODO detect if this is superflous */
|
|
- statenode.pre_sync_task->synchronous = 1;
|
|
|
|
- int ret = starpu_task_submit(statenode.pre_sync_task);
|
|
|
|
|
|
+ wrapper.pre_sync_task->synchronous = 1;
|
|
|
|
+ int ret = starpu_task_submit(wrapper.pre_sync_task);
|
|
STARPU_ASSERT(!ret);
|
|
STARPU_ASSERT(!ret);
|
|
- //starpu_task_wait(statenode.pre_sync_task);
|
|
|
|
|
|
+ //starpu_task_wait(wrapper.pre_sync_task);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
|
|
@@ -218,24 +218,24 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
|
|
* callback function that will be executed automatically when the data is
|
|
* callback function that will be executed automatically when the data is
|
|
* available again, otherwise we fetch the data directly */
|
|
* available again, otherwise we fetch the data directly */
|
|
if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
|
|
if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
|
|
- _starpu_sync_data_with_mem_continuation, &statenode))
|
|
|
|
|
|
+ _starpu_sync_data_with_mem_continuation, &wrapper))
|
|
{
|
|
{
|
|
/* no one has locked this data yet, so we proceed immediately */
|
|
/* no one has locked this data yet, so we proceed immediately */
|
|
int ret = _starpu_fetch_data_on_node(handle, 0, mode, 0, NULL, NULL);
|
|
int ret = _starpu_fetch_data_on_node(handle, 0, mode, 0, NULL, NULL);
|
|
STARPU_ASSERT(!ret);
|
|
STARPU_ASSERT(!ret);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
- PTHREAD_MUTEX_LOCK(&statenode.lock);
|
|
|
|
- while (!statenode.finished)
|
|
|
|
- PTHREAD_COND_WAIT(&statenode.cond, &statenode.lock);
|
|
|
|
- PTHREAD_MUTEX_UNLOCK(&statenode.lock);
|
|
|
|
|
|
+ PTHREAD_MUTEX_LOCK(&wrapper.lock);
|
|
|
|
+ while (!wrapper.finished)
|
|
|
|
+ PTHREAD_COND_WAIT(&wrapper.cond, &wrapper.lock);
|
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&wrapper.lock);
|
|
}
|
|
}
|
|
|
|
|
|
/* At that moment, the caller holds a reference to the piece of data.
|
|
/* At that moment, the caller holds a reference to the piece of data.
|
|
* We enqueue the "post" sync task in the list associated to the handle
|
|
* We enqueue the "post" sync task in the list associated to the handle
|
|
* so that it is submitted by the starpu_data_release_from_mem
|
|
* so that it is submitted by the starpu_data_release_from_mem
|
|
* function. */
|
|
* function. */
|
|
- _starpu_add_post_sync_tasks(statenode.post_sync_task, handle);
|
|
|
|
|
|
+ _starpu_add_post_sync_tasks(wrapper.post_sync_task, handle);
|
|
|
|
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
@@ -255,22 +255,22 @@ void starpu_data_release_from_mem(starpu_data_handle handle)
|
|
|
|
|
|
static void _prefetch_data_on_node(void *arg)
|
|
static void _prefetch_data_on_node(void *arg)
|
|
{
|
|
{
|
|
- struct state_and_node *statenode = arg;
|
|
|
|
|
|
+ struct user_interaction_wrapper *wrapper = arg;
|
|
int ret;
|
|
int ret;
|
|
|
|
|
|
- ret = _starpu_fetch_data_on_node(statenode->state, statenode->node, STARPU_R, statenode->async, NULL, NULL);
|
|
|
|
|
|
+ ret = _starpu_fetch_data_on_node(wrapper->handle, wrapper->node, STARPU_R, wrapper->async, NULL, NULL);
|
|
STARPU_ASSERT(!ret);
|
|
STARPU_ASSERT(!ret);
|
|
|
|
|
|
- PTHREAD_MUTEX_LOCK(&statenode->lock);
|
|
|
|
- statenode->finished = 1;
|
|
|
|
- PTHREAD_COND_SIGNAL(&statenode->cond);
|
|
|
|
- PTHREAD_MUTEX_UNLOCK(&statenode->lock);
|
|
|
|
|
|
+ PTHREAD_MUTEX_LOCK(&wrapper->lock);
|
|
|
|
+ wrapper->finished = 1;
|
|
|
|
+ PTHREAD_COND_SIGNAL(&wrapper->cond);
|
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
|
|
|
|
|
|
- if (!statenode->async)
|
|
|
|
|
|
+ if (!wrapper->async)
|
|
{
|
|
{
|
|
- _starpu_spin_lock(&statenode->state->header_lock);
|
|
|
|
- _starpu_notify_data_dependencies(statenode->state);
|
|
|
|
- _starpu_spin_unlock(&statenode->state->header_lock);
|
|
|
|
|
|
+ _starpu_spin_lock(&wrapper->handle->header_lock);
|
|
|
|
+ _starpu_notify_data_dependencies(wrapper->handle);
|
|
|
|
+ _starpu_spin_unlock(&wrapper->handle->header_lock);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -283,9 +283,9 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
|
|
if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
|
|
if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
|
|
return -EDEADLK;
|
|
return -EDEADLK;
|
|
|
|
|
|
- struct state_and_node statenode =
|
|
|
|
|
|
+ struct user_interaction_wrapper wrapper =
|
|
{
|
|
{
|
|
- .state = handle,
|
|
|
|
|
|
+ .handle = handle,
|
|
.node = node,
|
|
.node = node,
|
|
.async = async,
|
|
.async = async,
|
|
.cond = PTHREAD_COND_INITIALIZER,
|
|
.cond = PTHREAD_COND_INITIALIZER,
|
|
@@ -293,7 +293,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
|
|
.finished = 0
|
|
.finished = 0
|
|
};
|
|
};
|
|
|
|
|
|
- if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &statenode))
|
|
|
|
|
|
+ if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &wrapper))
|
|
{
|
|
{
|
|
/* we can immediately proceed */
|
|
/* we can immediately proceed */
|
|
_starpu_fetch_data_on_node(handle, node, mode, async, NULL, NULL);
|
|
_starpu_fetch_data_on_node(handle, node, mode, async, NULL, NULL);
|
|
@@ -307,10 +307,10 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
- PTHREAD_MUTEX_LOCK(&statenode.lock);
|
|
|
|
- while (!statenode.finished)
|
|
|
|
- PTHREAD_COND_WAIT(&statenode.cond, &statenode.lock);
|
|
|
|
- PTHREAD_MUTEX_UNLOCK(&statenode.lock);
|
|
|
|
|
|
+ PTHREAD_MUTEX_LOCK(&wrapper.lock);
|
|
|
|
+ while (!wrapper.finished)
|
|
|
|
+ PTHREAD_COND_WAIT(&wrapper.cond, &wrapper.lock);
|
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&wrapper.lock);
|
|
}
|
|
}
|
|
|
|
|
|
return 0;
|
|
return 0;
|