@@ -106,10 +106,11 @@
#ifndef NO_LOCK_OR_DELEGATE

/* A LockOrDelegate task list */
-struct LockOrDelegateListNode{
-    int (*func)(void*);
-    void* data;
-    struct LockOrDelegateListNode* next;
+struct LockOrDelegateListNode
+{
+	int (*func)(void*);
+	void* data;
+	struct LockOrDelegateListNode* next;
};

/* If the compiler support C11 and the usage of atomic functions */
@@ -127,49 +128,56 @@ _Atomic struct LockOrDelegateListNode* dlListHead;
 * This function return 1 if the task (and maybe some others) has been done
 * by the calling thread and 0 otherwise (if the task has just been put in the list)
 */
-int LockOrDelegatePostOrPerform(int (*func)(void*), void* data){
-    /* Get our ticket */
-    int insertionPosition = atomic_load(&dlAtomicCounter);
-    while (!atomic_compare_exchange_weak(&dlAtomicCounter, &insertionPosition, insertionPosition+1));
-
-    /* If we obtain 0 we are responsible of computing all the tasks */
-    if(insertionPosition == 0){
-        /* start by our current task */
-        (*func)(data);
-
-        /* Compute task of other and manage ticket */
-        while(1){
-            assert(atomic_load(&dlAtomicCounter) > 0);
-
-            /* Dec ticket and see if something else has to be done */
-            int removedPosition = atomic_load(&dlAtomicCounter);
-            while(!atomic_compare_exchange_weak(&dlAtomicCounter, &removedPosition,removedPosition-1));
-            if(removedPosition-1 == 0){
-                break;
-            }
-
-            /* Get the next task */
-            struct LockOrDelegateListNode* removedNode = (struct LockOrDelegateListNode*)atomic_load(&dlListHead);
-            // Maybe it has not been pushed yet (listHead.load() == nullptr)
-            while((removedNode = (struct LockOrDelegateListNode*)atomic_load(&dlListHead)) == NULL || !atomic_compare_exchange_weak(&dlListHead, &removedNode,removedNode->next));
-            assert(removedNode);
-            /* call the task */
-            (*removedNode->func)(removedNode->data);
-            // Delete node
-            free(removedNode);
-        }
-
-        return 1;
-    }
-
-    struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
-    assert(newNode);
-    newNode->data = data;
-    newNode->func = func;
-    newNode->next = (struct LockOrDelegateListNode*)atomic_load(&dlListHead);
-    while(!atomic_compare_exchange_weak(&dlListHead, &newNode->next, newNode));
-
-    return 0;
+int LockOrDelegatePostOrPerform(int (*func)(void*), void* data)
+{
+	/* Get our ticket */
+	int insertionPosition = atomic_load(&dlAtomicCounter);
+	while (!atomic_compare_exchange_weak(&dlAtomicCounter, &insertionPosition, insertionPosition+1))
+		;
+
+	/* If we obtain 0 we are responsible of computing all the tasks */
+	if(insertionPosition == 0)
+	{
+		/* start by our current task */
+		(*func)(data);
+
+		/* Compute task of other and manage ticket */
+		while(1)
+		{
+			assert(atomic_load(&dlAtomicCounter) > 0);
+
+			/* Dec ticket and see if something else has to be done */
+			int removedPosition = atomic_load(&dlAtomicCounter);
+			while(!atomic_compare_exchange_weak(&dlAtomicCounter, &removedPosition,removedPosition-1));
+			if(removedPosition-1 == 0)
+			{
+				break;
+			}
+
+			/* Get the next task */
+			struct LockOrDelegateListNode* removedNode = (struct LockOrDelegateListNode*)atomic_load(&dlListHead);
+			// Maybe it has not been pushed yet (listHead.load() == nullptr)
+			while((removedNode = (struct LockOrDelegateListNode*)atomic_load(&dlListHead)) == NULL || !atomic_compare_exchange_weak(&dlListHead, &removedNode,removedNode->next))
+				;
+			assert(removedNode);
+			/* call the task */
+			(*removedNode->func)(removedNode->data);
+			// Delete node
+			free(removedNode);
+		}
+
+		return 1;
+	}
+
+	struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
+	assert(newNode);
+	newNode->data = data;
+	newNode->func = func;
+	newNode->next = (struct LockOrDelegateListNode*)atomic_load(&dlListHead);
+	while(!atomic_compare_exchange_weak(&dlListHead, &newNode->next, newNode))
+		;
+
+	return 0;
}

#else
@@ -192,48 +200,51 @@ pthread_mutex_t dlWorkLock = PTHREAD_MUTEX_INITIALIZER;
 * This function return 1 if the task (and maybe some others) has been done
 * by the calling thread and 0 otherwise (if the task has just been put in the list)
 */
-int LockOrDelegatePostOrPerform(int (*func)(void*), void* data){
-    /* We could avoid to allocate if we will be responsible but for simplicity
-     * we always push the task in the list */
-    struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
-    assert(newNode);
-    newNode->data = data;
-    newNode->func = func;
-
-    /* insert the node */
-    int ret = pthread_mutex_lock(&dlListLock);
-    assert(ret == 0);
-    newNode->next = dlTaskListHead;
-    dlTaskListHead = newNode;
-    ret = pthread_mutex_unlock(&dlListLock);
-    assert(ret == 0);
-
-    /* See if we can compute all the tasks */
-    if((ret = pthread_mutex_trylock(&dlWorkLock)) == 0){
-        ret = pthread_mutex_lock(&dlListLock);
-        assert(ret == 0);
-        while(dlTaskListHead != 0){
-            struct LockOrDelegateListNode* iter = dlTaskListHead;
-            dlTaskListHead = dlTaskListHead->next;
-            ret = pthread_mutex_unlock(&dlListLock);
-            assert(ret == 0);
-
-            (*iter->func)(iter->data);
-            free(iter);
-            ret = pthread_mutex_lock(&dlListLock);
-            assert(ret == 0);
-        }
-
-        /* First unlock the list! this is important */
-        ret = pthread_mutex_unlock(&dlWorkLock);
-        assert(ret == 0);
-        ret = pthread_mutex_unlock(&dlListLock);
-        assert(ret == 0);
-
-        return 1;
-    }
-    assert(ret == EBUSY);
-    return 0;
+int LockOrDelegatePostOrPerform(int (*func)(void*), void* data)
+{
+	/* We could avoid to allocate if we will be responsible but for simplicity
+	 * we always push the task in the list */
+	struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
+	assert(newNode);
+	newNode->data = data;
+	newNode->func = func;
+
+	/* insert the node */
+	int ret = pthread_mutex_lock(&dlListLock);
+	assert(ret == 0);
+	newNode->next = dlTaskListHead;
+	dlTaskListHead = newNode;
+	ret = pthread_mutex_unlock(&dlListLock);
+	assert(ret == 0);
+
+	/* See if we can compute all the tasks */
+	if((ret = pthread_mutex_trylock(&dlWorkLock)) == 0)
+	{
+		ret = pthread_mutex_lock(&dlListLock);
+		assert(ret == 0);
+		while(dlTaskListHead != 0)
+		{
+			struct LockOrDelegateListNode* iter = dlTaskListHead;
+			dlTaskListHead = dlTaskListHead->next;
+			ret = pthread_mutex_unlock(&dlListLock);
+			assert(ret == 0);
+
+			(*iter->func)(iter->data);
+			free(iter);
+			ret = pthread_mutex_lock(&dlListLock);
+			assert(ret == 0);
+		}
+
+		/* First unlock the list! this is important */
+		ret = pthread_mutex_unlock(&dlWorkLock);
+		assert(ret == 0);
+		ret = pthread_mutex_unlock(&dlListLock);
+		assert(ret == 0);
+
+		return 1;
+	}
+	assert(ret == EBUSY);
+	return 0;
}

#endif
@@ -247,245 +258,264 @@ pthread_mutex_t commute_global_mutex = PTHREAD_MUTEX_INITIALIZER;
/* This function find a node that contains the parameter j as job and remove it from the list
 * the function return 0 if a node was found and deleted, 1 otherwise
 */
-unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j){
-    struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);//_head;
-    while(iter != _starpu_data_requester_list_end(req_list) && iter->j != j){
-        iter = _starpu_data_requester_list_next(iter); // iter = iter->_next;
-    }
-    if(iter){
-        _starpu_data_requester_list_erase(req_list, iter);
-        return 0;
-    }
-    return 1;
+unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j)
+{
+	struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);//_head;
+	while(iter != _starpu_data_requester_list_end(req_list) && iter->j != j)
+	{
+		iter = _starpu_data_requester_list_next(iter); // iter = iter->_next;
+	}
+	if(iter)
+	{
+		_starpu_data_requester_list_erase(req_list, iter);
+		return 0;
+	}
+	return 1;
}

|
|
|
|
|
|
/* These are the arguments passed to _submit_job_enforce_commute_deps */
|
|
|
-struct EnforceCommuteArgs{
|
|
|
- struct _starpu_job *j;
|
|
|
- unsigned buf;
|
|
|
- unsigned nbuffers;
|
|
|
+struct EnforceCommuteArgs
|
|
|
+{
|
|
|
+ struct _starpu_job *j;
|
|
|
+ unsigned buf;
|
|
|
+ unsigned nbuffers;
|
|
|
};
|
|
|
|
|
|
-int _submit_job_enforce_commute_deps(void* inData){
|
|
|
- struct EnforceCommuteArgs* args = (struct EnforceCommuteArgs*)inData;
|
|
|
- struct _starpu_job *j = args->j;
|
|
|
- unsigned buf = args->buf;
|
|
|
- unsigned nbuffers = args->nbuffers;
|
|
|
- /* we are in charge of freeing the args */
|
|
|
- free(args);
|
|
|
- args = NULL;
|
|
|
- inData = NULL;
|
|
|
+int _submit_job_enforce_commute_deps(void* inData)
|
|
|
+{
|
|
|
+ struct EnforceCommuteArgs* args = (struct EnforceCommuteArgs*)inData;
|
|
|
+ struct _starpu_job *j = args->j;
|
|
|
+ unsigned buf = args->buf;
|
|
|
+ unsigned nbuffers = args->nbuffers;
|
|
|
+ /* we are in charge of freeing the args */
|
|
|
+ free(args);
|
|
|
+ args = NULL;
|
|
|
+ inData = NULL;
|
|
|
#else // NO_LOCK_OR_DELEGATE
|
|
|
-int _submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers){
|
|
|
- int ret = pthread_mutex_lock(&commute_global_mutex);
|
|
|
- assert(ret == 0);
|
|
|
+int _submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
|
|
|
+{
|
|
|
+ int ret = pthread_mutex_lock(&commute_global_mutex);
|
|
|
+ assert(ret == 0);
|
|
|
#endif
|
|
|
|
|
|
-    const unsigned nb_non_commute_buff = buf;
-    unsigned idx_buf_commute;
-    unsigned all_commutes_available = 1;
-
-    for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
-    {
-        if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
-            continue;
-        /* we post all commute */
-        starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
-        enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
-        assert(mode & STARPU_COMMUTE);
-
-        _starpu_spin_lock(&handle->header_lock);
-        if(handle->refcnt == 0){
-            handle->refcnt += 1;
-            handle->busy_count += 1;
-            handle->current_mode = mode;
-            _starpu_spin_unlock(&handle->header_lock);
-        }
-        else{
-            /* stop if an handle do not have a refcnt == 0 */
-            _starpu_spin_unlock(&handle->header_lock);
-            all_commutes_available = 0;
-            break;
-        }
-    }
-    if(all_commutes_available == 0){
-        /* Oups cancel all taken and put req in commute list */
-        unsigned idx_buf_cancel;
-        for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
-        {
-            if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel)))
-                continue;
+	const unsigned nb_non_commute_buff = buf;
+	unsigned idx_buf_commute;
+	unsigned all_commutes_available = 1;
+
+	for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
+	{
+		if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
+			continue;
+		/* we post all commute */
+		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
+		enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
+		assert(mode & STARPU_COMMUTE);
+
+		_starpu_spin_lock(&handle->header_lock);
+		if(handle->refcnt == 0)
+		{
+			handle->refcnt += 1;
+			handle->busy_count += 1;
+			handle->current_mode = mode;
+			_starpu_spin_unlock(&handle->header_lock);
+		}
+		else
+		{
+			/* stop if an handle do not have a refcnt == 0 */
+			_starpu_spin_unlock(&handle->header_lock);
+			all_commutes_available = 0;
+			break;
+		}
+	}
+	if(all_commutes_available == 0)
+	{
+		/* Oups cancel all taken and put req in commute list */
+		unsigned idx_buf_cancel;
+		for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
+		{
+			if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel)))
+				continue;

-            starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
-            _starpu_spin_lock(&cancel_handle->header_lock);
-            /* reset the counter because finally we do not take the data */
-            assert(cancel_handle->refcnt == 1);
-            cancel_handle->refcnt -= 1;
-            _starpu_spin_unlock(&cancel_handle->header_lock);
-        }
-
-        for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < nbuffers ; idx_buf_cancel++)
-        {
-            starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
-            enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
-
-            assert(cancel_mode & STARPU_COMMUTE);
-
-            struct _starpu_data_requester *r = _starpu_data_requester_new();
-            r->mode = cancel_mode;
-            r->is_requested_by_codelet = 1;
-            r->j = j;
-            r->buffer_index = idx_buf_cancel;
-            r->ready_data_callback = NULL;
-            r->argcb = NULL;
-
-            _starpu_spin_lock(&cancel_handle->header_lock);
-            /* create list if needed */
-            if(cancel_handle->commute_req_list == NULL)
-                cancel_handle->commute_req_list = _starpu_data_requester_list_new();
-            /* store node in list */
-            _starpu_data_requester_list_push_front(cancel_handle->commute_req_list, r);
-            /* inc the busy count if it has not been changed in the previous loop */
-            if(idx_buf_commute <= idx_buf_cancel) cancel_handle->busy_count += 1;
-            _starpu_spin_unlock(&cancel_handle->header_lock);
-        }
+			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+			_starpu_spin_lock(&cancel_handle->header_lock);
+			/* reset the counter because finally we do not take the data */
+			assert(cancel_handle->refcnt == 1);
+			cancel_handle->refcnt -= 1;
+			_starpu_spin_unlock(&cancel_handle->header_lock);
+		}
+
+		for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < nbuffers ; idx_buf_cancel++)
+		{
+			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+			enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
+
+			assert(cancel_mode & STARPU_COMMUTE);
+
+			struct _starpu_data_requester *r = _starpu_data_requester_new();
+			r->mode = cancel_mode;
+			r->is_requested_by_codelet = 1;
+			r->j = j;
+			r->buffer_index = idx_buf_cancel;
+			r->ready_data_callback = NULL;
+			r->argcb = NULL;
+
+			_starpu_spin_lock(&cancel_handle->header_lock);
+			/* create list if needed */
+			if(cancel_handle->commute_req_list == NULL)
+				cancel_handle->commute_req_list = _starpu_data_requester_list_new();
+			/* store node in list */
+			_starpu_data_requester_list_push_front(cancel_handle->commute_req_list, r);
+			/* inc the busy count if it has not been changed in the previous loop */
+			if(idx_buf_commute <= idx_buf_cancel)
+				cancel_handle->busy_count += 1;
+			_starpu_spin_unlock(&cancel_handle->header_lock);
+		}

#ifdef NO_LOCK_OR_DELEGATE
-        ret = pthread_mutex_unlock(&commute_global_mutex);
-        assert(ret == 0);
+		ret = pthread_mutex_unlock(&commute_global_mutex);
+		assert(ret == 0);
#endif
-        return 1;
-    }
+		return 1;
+	}

-    // all_commutes_available is true
-    _starpu_push_task(j);
+	// all_commutes_available is true
+	_starpu_push_task(j);
#ifdef NO_LOCK_OR_DELEGATE
-    ret = pthread_mutex_unlock(&commute_global_mutex);
-    assert(ret == 0);
+	ret = pthread_mutex_unlock(&commute_global_mutex);
+	assert(ret == 0);
#endif
-    return 0;
+	return 0;
}

#ifndef NO_LOCK_OR_DELEGATE
-int _starpu_notify_commute_dependencies(void* inData){
-    starpu_data_handle_t handle = (starpu_data_handle_t)inData;
+int _starpu_notify_commute_dependencies(void* inData)
+{
+	starpu_data_handle_t handle = (starpu_data_handle_t)inData;
#else // NO_LOCK_OR_DELEGATE
-int _starpu_notify_commute_dependencies(starpu_data_handle_t handle){
-    int ret = pthread_mutex_lock(&commute_global_mutex);
-    assert(ret == 0);
+int _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
+{
+	int ret = pthread_mutex_lock(&commute_global_mutex);
+	assert(ret == 0);
#endif
-    /* Since the request has been posted the handle may have been proceed and released */
-    if(handle->commute_req_list == NULL){
+	/* Since the request has been posted the handle may have been proceed and released */
+	if(handle->commute_req_list == NULL)
+	{
#ifdef NO_LOCK_OR_DELEGATE
-        ret = pthread_mutex_unlock(&commute_global_mutex);
-        assert(ret == 0);
+		ret = pthread_mutex_unlock(&commute_global_mutex);
+		assert(ret == 0);
#endif
-        return 1;
-    }
-    /* no one has the right to work on commute_req_list without a lock on commute_global_mutex
-     so we do not need to lock the handle for safety */
-    struct _starpu_data_requester *r;
-    r = _starpu_data_requester_list_begin(handle->commute_req_list); //_head;
-    while(r){
-        struct _starpu_job* j = r->j;
-        assert(r->mode & STARPU_COMMUTE);
-        unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
-        unsigned nb_non_commute_buff;
-        /* find the position of commute buffers */
-        for (nb_non_commute_buff = 0; nb_non_commute_buff < nbuffers; nb_non_commute_buff++)
-        {
-            if (nb_non_commute_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff)))
-                continue;
-            enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_commute_buff);
-            if(mode & STARPU_COMMUTE){
-                break;
-            }
-        }
-
-        unsigned idx_buf_commute;
-        unsigned all_commutes_available = 1;
-
-        for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
-        {
-            if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
-                continue;
-            /* we post all commute */
-            starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
-            enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
-            assert(mode & STARPU_COMMUTE);
-
-            _starpu_spin_lock(&handle_commute->header_lock);
-            if(handle_commute->refcnt != 0){
-                /* handle is not available */
-                _starpu_spin_unlock(&handle_commute->header_lock);
-                all_commutes_available = 0;
-                break;
-            }
-            /* mark the handle as taken */
-            handle_commute->refcnt += 1;
-            handle_commute->current_mode = mode;
-            _starpu_spin_unlock(&handle_commute->header_lock);
-        }
-
-        if(all_commutes_available){
-            for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
-            {
-                if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
-                    continue;
-                /* we post all commute */
-                starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
-                enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
-                assert(mode & STARPU_COMMUTE);
-
-                _starpu_spin_lock(&handle_commute->header_lock);
-                assert(handle_commute->refcnt == 1);
-                assert( handle_commute->busy_count >= 1);
-                assert( handle_commute->current_mode == mode);
-                const unsigned correctly_deleted = remove_job_from_requester_list(handle_commute->commute_req_list, j);
-                assert(correctly_deleted == 0);
-                if(_starpu_data_requester_list_empty(handle_commute->commute_req_list)){ // If size == 0
-                    _starpu_data_requester_list_delete(handle_commute->commute_req_list);
-                    handle_commute->commute_req_list = NULL;
-                }
-                _starpu_spin_unlock(&handle_commute->header_lock);
-            }
-            /* delete list node */
-            _starpu_data_requester_delete(r);
+		return 1;
+	}
+	/* no one has the right to work on commute_req_list without a lock on commute_global_mutex
+	 so we do not need to lock the handle for safety */
+	struct _starpu_data_requester *r;
+	r = _starpu_data_requester_list_begin(handle->commute_req_list); //_head;
+	while(r)
+	{
+		struct _starpu_job* j = r->j;
+		assert(r->mode & STARPU_COMMUTE);
+		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
+		unsigned nb_non_commute_buff;
+		/* find the position of commute buffers */
+		for (nb_non_commute_buff = 0; nb_non_commute_buff < nbuffers; nb_non_commute_buff++)
+		{
+			if (nb_non_commute_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff)))
+				continue;
+			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_commute_buff);
+			if(mode & STARPU_COMMUTE)
+			{
+				break;
+			}
+		}
+
+		unsigned idx_buf_commute;
+		unsigned all_commutes_available = 1;

-            /* push the task */
-            _starpu_push_task(j);
+		for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
+		{
+			if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
+				continue;
+			/* we post all commute */
+			starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
+			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
+			assert(mode & STARPU_COMMUTE);
+
+			_starpu_spin_lock(&handle_commute->header_lock);
+			if(handle_commute->refcnt != 0)
+			{
+				/* handle is not available */
+				_starpu_spin_unlock(&handle_commute->header_lock);
+				all_commutes_available = 0;
+				break;
+			}
+			/* mark the handle as taken */
+			handle_commute->refcnt += 1;
+			handle_commute->current_mode = mode;
+			_starpu_spin_unlock(&handle_commute->header_lock);
+		}
+
+		if(all_commutes_available)
+		{
+			for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
+			{
+				if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
+					continue;
+				/* we post all commute */
+				starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
+				enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
+				assert(mode & STARPU_COMMUTE);
+
+				_starpu_spin_lock(&handle_commute->header_lock);
+				assert(handle_commute->refcnt == 1);
+				assert( handle_commute->busy_count >= 1);
+				assert( handle_commute->current_mode == mode);
+				const unsigned correctly_deleted = remove_job_from_requester_list(handle_commute->commute_req_list, j);
+				assert(correctly_deleted == 0);
+				if(_starpu_data_requester_list_empty(handle_commute->commute_req_list)) // If size == 0
+				{
+					_starpu_data_requester_list_delete(handle_commute->commute_req_list);
+					handle_commute->commute_req_list = NULL;
+				}
+				_starpu_spin_unlock(&handle_commute->header_lock);
+			}
+			/* delete list node */
+			_starpu_data_requester_delete(r);

-            /* release global mutex */
+			/* push the task */
+			_starpu_push_task(j);
+
+			/* release global mutex */
#ifdef NO_LOCK_OR_DELEGATE
-            ret = pthread_mutex_unlock(&commute_global_mutex);
-            assert(ret == 0);
+			ret = pthread_mutex_unlock(&commute_global_mutex);
+			assert(ret == 0);
#endif
-            /* We need to lock when returning 0 */
-            return 0;
-        }
-        else{
-            unsigned idx_buf_cancel;
-            /* all handles are not available - revert the mark */
-            for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
-            {
-                starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
-                _starpu_spin_lock(&cancel_handle->header_lock);
-                assert(cancel_handle->refcnt == 1);
-                cancel_handle->refcnt -= 1;
-                _starpu_spin_unlock(&cancel_handle->header_lock);
-            }
-        }
-
-        r = r->_next;
-    }
-    /* no task has been pushed */
+			/* We need to lock when returning 0 */
+			return 0;
+		}
+		else
+		{
+			unsigned idx_buf_cancel;
+			/* all handles are not available - revert the mark */
+			for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
+			{
+				starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+				_starpu_spin_lock(&cancel_handle->header_lock);
+				assert(cancel_handle->refcnt == 1);
+				cancel_handle->refcnt -= 1;
+				_starpu_spin_unlock(&cancel_handle->header_lock);
+			}
+		}
+
+		r = r->_next;
+	}
+	/* no task has been pushed */
#ifdef NO_LOCK_OR_DELEGATE
-    ret = pthread_mutex_unlock(&commute_global_mutex);
-    assert(ret == 0);
+	ret = pthread_mutex_unlock(&commute_global_mutex);
+	assert(ret == 0);
#endif
-    return 1;
+	return 1;
}

// WIP_COMMUTE End
@@ -728,7 +758,8 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st

		// WIP_COMMUTE Begin
		enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buf);
-        if(mode & STARPU_COMMUTE){
+		if(mode & STARPU_COMMUTE)
+		{
			/* We arrived on the commute we stop and do not proceed as usual */
			break;
		}
@@ -742,7 +773,8 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st

	// WIP_COMMUTE Begin
	/* We arrive on the commutes */
-    if(buf != nbuffers){
+	if(buf != nbuffers)
+	{
#ifndef NO_LOCK_OR_DELEGATE
		struct EnforceCommuteArgs* args = (struct EnforceCommuteArgs*)malloc(sizeof(struct EnforceCommuteArgs));
		args->j = j;
@@ -906,7 +938,8 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)

	// WIP_COMMUTE Begin

-    if(handle->refcnt == 0 && handle->commute_req_list != NULL){
+	if(handle->refcnt == 0 && handle->commute_req_list != NULL)
+	{
		/* We need to delloc current handle because it is currently locked
		 * but we alloc fist the global mutex and than the handles mutex
		 */