|
@@ -185,11 +185,11 @@ static int _starpu_LockOrDelegatePostOrPerform(starpu_arbiter_t arbiter, void (*
|
|
static unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j)
|
|
static unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j)
|
|
{
|
|
{
|
|
struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);//_head;
|
|
struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);//_head;
|
|
- while(iter != _starpu_data_requester_list_end(req_list) && iter->j != j)
|
|
|
|
|
|
+ while (iter != _starpu_data_requester_list_end(req_list) && iter->j != j)
|
|
{
|
|
{
|
|
iter = _starpu_data_requester_list_next(iter); // iter = iter->_next;
|
|
iter = _starpu_data_requester_list_next(iter); // iter = iter->_next;
|
|
}
|
|
}
|
|
- if(iter)
|
|
|
|
|
|
+ if (iter)
|
|
{
|
|
{
|
|
_starpu_data_requester_list_erase(req_list, iter);
|
|
_starpu_data_requester_list_erase(req_list, iter);
|
|
return 0;
|
|
return 0;
|
|
@@ -243,16 +243,16 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
|
|
STARPU_ASSERT(arbiter);
|
|
STARPU_ASSERT(arbiter);
|
|
|
|
|
|
const unsigned nb_non_arbitered_buff = buf;
|
|
const unsigned nb_non_arbitered_buff = buf;
|
|
- unsigned idx_buf_commute;
|
|
|
|
- unsigned all_commutes_available = 1;
|
|
|
|
|
|
+ unsigned idx_buf_arbiter;
|
|
|
|
+ unsigned all_arbiter_available = 1;
|
|
|
|
|
|
|
|
|
|
- for (idx_buf_commute = nb_non_arbitered_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
|
|
|
|
|
|
+ for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
|
|
{
|
|
{
|
|
- starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
|
|
|
|
- enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
|
|
|
|
|
|
+ starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
|
|
|
|
+ enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
|
|
|
|
|
|
- if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==handle))
|
|
|
|
|
|
+ if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle))
|
|
/* We have already requested this data, skip it. This
|
|
/* We have already requested this data, skip it. This
|
|
* depends on ordering putting writes before reads, see
|
|
* depends on ordering putting writes before reads, see
|
|
* _starpu_compar_handles. */
|
|
* _starpu_compar_handles. */
|
|
@@ -264,9 +264,9 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
- /* we post all commute */
|
|
|
|
|
|
+ /* we post all arbiter */
|
|
_starpu_spin_lock(&handle->header_lock);
|
|
_starpu_spin_lock(&handle->header_lock);
|
|
- if(handle->refcnt == 0)
|
|
|
|
|
|
+ if (handle->refcnt == 0)
|
|
{
|
|
{
|
|
handle->refcnt += 1;
|
|
handle->refcnt += 1;
|
|
handle->busy_count += 1;
|
|
handle->busy_count += 1;
|
|
@@ -277,15 +277,15 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
|
|
{
|
|
{
|
|
/* stop if an handle do not have a refcnt == 0 */
|
|
/* stop if an handle do not have a refcnt == 0 */
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
_starpu_spin_unlock(&handle->header_lock);
|
|
- all_commutes_available = 0;
|
|
|
|
|
|
+ all_arbiter_available = 0;
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if(all_commutes_available == 0)
|
|
|
|
|
|
+ if (all_arbiter_available == 0)
|
|
{
|
|
{
|
|
- /* Oups cancel all taken and put req in commute list */
|
|
|
|
|
|
+ /* Oups cancel all taken and put req in arbiter list */
|
|
unsigned idx_buf_cancel;
|
|
unsigned idx_buf_cancel;
|
|
- for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
|
|
|
|
|
|
+ for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
|
|
{
|
|
{
|
|
starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
|
|
starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
|
|
|
|
|
|
@@ -320,12 +320,12 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
|
|
|
|
|
|
_starpu_spin_lock(&cancel_handle->header_lock);
|
|
_starpu_spin_lock(&cancel_handle->header_lock);
|
|
/* create list if needed */
|
|
/* create list if needed */
|
|
- if(cancel_handle->arbitered_req_list == NULL)
|
|
|
|
|
|
+ if (cancel_handle->arbitered_req_list == NULL)
|
|
cancel_handle->arbitered_req_list = _starpu_data_requester_list_new();
|
|
cancel_handle->arbitered_req_list = _starpu_data_requester_list_new();
|
|
/* store node in list */
|
|
/* store node in list */
|
|
_starpu_data_requester_list_push_front(cancel_handle->arbitered_req_list, r);
|
|
_starpu_data_requester_list_push_front(cancel_handle->arbitered_req_list, r);
|
|
/* inc the busy count if it has not been changed in the previous loop */
|
|
/* inc the busy count if it has not been changed in the previous loop */
|
|
- if(idx_buf_commute <= idx_buf_cancel)
|
|
|
|
|
|
+ if (idx_buf_arbiter <= idx_buf_cancel)
|
|
cancel_handle->busy_count += 1;
|
|
cancel_handle->busy_count += 1;
|
|
_starpu_spin_unlock(&cancel_handle->header_lock);
|
|
_starpu_spin_unlock(&cancel_handle->header_lock);
|
|
}
|
|
}
|
|
@@ -339,11 +339,11 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- // all_commutes_available is true
|
|
|
|
- if (idx_buf_commute < nbuffers)
|
|
|
|
|
|
+ // all_arbiter_available is true
|
|
|
|
+ if (idx_buf_arbiter < nbuffers)
|
|
{
|
|
{
|
|
/* Other arbitered data, process them */
|
|
/* Other arbitered data, process them */
|
|
- return _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_commute, nbuffers);
|
|
|
|
|
|
+ return _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
|
|
}
|
|
}
|
|
/* Finished with all data, can eventually push! */
|
|
/* Finished with all data, can eventually push! */
|
|
_starpu_push_task(j);
|
|
_starpu_push_task(j);
|
|
@@ -373,7 +373,7 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
|
|
#endif
|
|
#endif
|
|
|
|
|
|
/* Since the request has been posted the handle may have been proceed and released */
|
|
/* Since the request has been posted the handle may have been proceed and released */
|
|
- if(handle->arbitered_req_list == NULL)
|
|
|
|
|
|
+ if (handle->arbitered_req_list == NULL)
|
|
{
|
|
{
|
|
#ifndef LOCK_OR_DELEGATE
|
|
#ifndef LOCK_OR_DELEGATE
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
|
|
@@ -384,84 +384,84 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
|
|
so we do not need to lock the handle for safety */
|
|
so we do not need to lock the handle for safety */
|
|
struct _starpu_data_requester *r;
|
|
struct _starpu_data_requester *r;
|
|
r = _starpu_data_requester_list_begin(handle->arbitered_req_list); //_head;
|
|
r = _starpu_data_requester_list_begin(handle->arbitered_req_list); //_head;
|
|
- while(r)
|
|
|
|
|
|
+ while (r)
|
|
{
|
|
{
|
|
struct _starpu_job* j = r->j;
|
|
struct _starpu_job* j = r->j;
|
|
unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
|
|
unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
|
|
unsigned nb_non_arbitered_buff;
|
|
unsigned nb_non_arbitered_buff;
|
|
- /* find the position of commute buffers */
|
|
|
|
|
|
+ /* find the position of arbiter buffers */
|
|
for (nb_non_arbitered_buff = 0; nb_non_arbitered_buff < nbuffers; nb_non_arbitered_buff++)
|
|
for (nb_non_arbitered_buff = 0; nb_non_arbitered_buff < nbuffers; nb_non_arbitered_buff++)
|
|
{
|
|
{
|
|
- starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff);
|
|
|
|
- if (nb_non_arbitered_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff-1) == handle_commute))
|
|
|
|
|
|
+ starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff);
|
|
|
|
+ if (nb_non_arbitered_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff-1) == handle_arbiter))
|
|
/* We have already requested this data, skip it. This
|
|
/* We have already requested this data, skip it. This
|
|
* depends on ordering putting writes before reads, see
|
|
* depends on ordering putting writes before reads, see
|
|
* _starpu_compar_handles. */
|
|
* _starpu_compar_handles. */
|
|
continue;
|
|
continue;
|
|
enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_arbitered_buff);
|
|
enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_arbitered_buff);
|
|
- if(handle_commute->arbiter == arbiter)
|
|
|
|
|
|
+ if (handle_arbiter->arbiter == arbiter)
|
|
{
|
|
{
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- unsigned idx_buf_commute;
|
|
|
|
- unsigned all_commutes_available = 1;
|
|
|
|
|
|
+ unsigned idx_buf_arbiter;
|
|
|
|
+ unsigned all_arbiter_available = 1;
|
|
|
|
|
|
- for (idx_buf_commute = nb_non_arbitered_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
|
|
|
|
|
|
+ for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
|
|
{
|
|
{
|
|
- starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
|
|
|
|
- if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==handle_commute))
|
|
|
|
|
|
+ starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
|
|
|
|
+ if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle_arbiter))
|
|
/* We have already requested this data, skip it. This
|
|
/* We have already requested this data, skip it. This
|
|
* depends on ordering putting writes before reads, see
|
|
* depends on ordering putting writes before reads, see
|
|
* _starpu_compar_handles. */
|
|
* _starpu_compar_handles. */
|
|
continue;
|
|
continue;
|
|
- if (handle_commute->arbiter != arbiter)
|
|
|
|
|
|
+ if (handle_arbiter->arbiter != arbiter)
|
|
/* Will have to process another arbiter, will do that later */
|
|
/* Will have to process another arbiter, will do that later */
|
|
break;
|
|
break;
|
|
|
|
|
|
- /* we post all commute */
|
|
|
|
- enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
|
|
|
|
|
|
+ /* we post all arbiter */
|
|
|
|
+ enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
|
|
|
|
|
|
- _starpu_spin_lock(&handle_commute->header_lock);
|
|
|
|
- if(handle_commute->refcnt != 0)
|
|
|
|
|
|
+ _starpu_spin_lock(&handle_arbiter->header_lock);
|
|
|
|
+ if (handle_arbiter->refcnt != 0)
|
|
{
|
|
{
|
|
/* handle is not available, record ourself */
|
|
/* handle is not available, record ourself */
|
|
- _starpu_spin_unlock(&handle_commute->header_lock);
|
|
|
|
- all_commutes_available = 0;
|
|
|
|
|
|
+ _starpu_spin_unlock(&handle_arbiter->header_lock);
|
|
|
|
+ all_arbiter_available = 0;
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
/* mark the handle as taken */
|
|
/* mark the handle as taken */
|
|
- handle_commute->refcnt += 1;
|
|
|
|
- handle_commute->current_mode = mode;
|
|
|
|
- _starpu_spin_unlock(&handle_commute->header_lock);
|
|
|
|
|
|
+ handle_arbiter->refcnt += 1;
|
|
|
|
+ handle_arbiter->current_mode = mode;
|
|
|
|
+ _starpu_spin_unlock(&handle_arbiter->header_lock);
|
|
}
|
|
}
|
|
|
|
|
|
- if(all_commutes_available)
|
|
|
|
|
|
+ if (all_arbiter_available)
|
|
{
|
|
{
|
|
- for (idx_buf_commute = nb_non_arbitered_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
|
|
|
|
|
|
+ for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
|
|
{
|
|
{
|
|
- starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
|
|
|
|
- if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==handle_commute))
|
|
|
|
|
|
+ starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
|
|
|
|
+ if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle_arbiter))
|
|
continue;
|
|
continue;
|
|
- if (handle_commute->arbiter != arbiter)
|
|
|
|
|
|
+ if (handle_arbiter->arbiter != arbiter)
|
|
break;
|
|
break;
|
|
|
|
|
|
- /* we post all commute */
|
|
|
|
- enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
|
|
|
|
|
|
+ /* we post all arbiter */
|
|
|
|
+ enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
|
|
|
|
|
|
- _starpu_spin_lock(&handle_commute->header_lock);
|
|
|
|
- STARPU_ASSERT(handle_commute->refcnt == 1);
|
|
|
|
- STARPU_ASSERT( handle_commute->busy_count >= 1);
|
|
|
|
- STARPU_ASSERT( handle_commute->current_mode == mode);
|
|
|
|
- const unsigned correctly_deleted = remove_job_from_requester_list(handle_commute->arbitered_req_list, j);
|
|
|
|
|
|
+ _starpu_spin_lock(&handle_arbiter->header_lock);
|
|
|
|
+ STARPU_ASSERT(handle_arbiter->refcnt == 1);
|
|
|
|
+ STARPU_ASSERT( handle_arbiter->busy_count >= 1);
|
|
|
|
+ STARPU_ASSERT( handle_arbiter->current_mode == mode);
|
|
|
|
+ const unsigned correctly_deleted = remove_job_from_requester_list(handle_arbiter->arbitered_req_list, j);
|
|
STARPU_ASSERT(correctly_deleted == 0);
|
|
STARPU_ASSERT(correctly_deleted == 0);
|
|
- if(_starpu_data_requester_list_empty(handle_commute->arbitered_req_list)) // If size == 0
|
|
|
|
|
|
+ if (_starpu_data_requester_list_empty(handle_arbiter->arbitered_req_list)) // If size == 0
|
|
{
|
|
{
|
|
- _starpu_data_requester_list_delete(handle_commute->arbitered_req_list);
|
|
|
|
- handle_commute->arbitered_req_list = NULL;
|
|
|
|
|
|
+ _starpu_data_requester_list_delete(handle_arbiter->arbitered_req_list);
|
|
|
|
+ handle_arbiter->arbitered_req_list = NULL;
|
|
}
|
|
}
|
|
- _starpu_spin_unlock(&handle_commute->header_lock);
|
|
|
|
|
|
+ _starpu_spin_unlock(&handle_arbiter->header_lock);
|
|
}
|
|
}
|
|
/* Remove and delete list node */
|
|
/* Remove and delete list node */
|
|
_starpu_data_requester_delete(r);
|
|
_starpu_data_requester_delete(r);
|
|
@@ -471,10 +471,10 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- if (idx_buf_commute < nbuffers)
|
|
|
|
|
|
+ if (idx_buf_arbiter < nbuffers)
|
|
{
|
|
{
|
|
/* Other arbitered data, process them */
|
|
/* Other arbitered data, process them */
|
|
- _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_commute, nbuffers);
|
|
|
|
|
|
+ _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
|
|
}
|
|
}
|
|
else
|
|
else
|
|
/* Finished with all data, can eventually push! */
|
|
/* Finished with all data, can eventually push! */
|
|
@@ -487,7 +487,7 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
|
|
{
|
|
{
|
|
unsigned idx_buf_cancel;
|
|
unsigned idx_buf_cancel;
|
|
/* all handles are not available - revert the mark */
|
|
/* all handles are not available - revert the mark */
|
|
- for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
|
|
|
|
|
|
+ for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
|
|
{
|
|
{
|
|
starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
|
|
starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
|
|
if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==cancel_handle))
|
|
if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==cancel_handle))
|