
integrate latest version of Bérenger's patch

Samuel Thibault committed 10 years ago
commit 8be59f3fbf
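
This commit adds arbitration for the STARPU_COMMUTE data access mode: tasks whose accesses commute may run in any order instead of submission order, while still getting exclusive access to the data. For context, here is a minimal sketch of what an application requests; the kernel and codelet names are illustrative only, and we assume the usual starpu_task_insert() helper (none of this code is part of the commit):

    #include <starpu.h>

    /* hypothetical kernel: the tasks below commute, since each one
     * just accumulates into the same vector */
    static void accumulate(void *buffers[], void *cl_arg)
    {
        (void)cl_arg;
        double *v = (double *)STARPU_VECTOR_GET_PTR(buffers[0]);
        v[0] += 1.0;
    }

    static struct starpu_codelet accumulate_cl =
    {
        .cpu_funcs = { accumulate },
        .nbuffers = 1,
        .modes = { STARPU_RW | STARPU_COMMUTE },
    };

    /* after starpu_init() and starpu_vector_data_register(&handle, ...):
     * no submission-order dependency is enforced between these two tasks,
     * only mutual exclusion on the data */
    starpu_task_insert(&accumulate_cl, STARPU_RW | STARPU_COMMUTE, handle, 0);
    starpu_task_insert(&accumulate_cl, STARPU_RW | STARPU_COMMUTE, handle, 0);
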

+ 512 - 8
src/core/dependencies/data_concurrency.c

@@ -51,6 +51,445 @@
  * (starpu_data_acquire).
  */
 
+// WIP_COMMUTE Begin
+
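+/* When defined, commute arbitration is serialized by a single global
+ * mutex instead of the LockOrDelegate scheme below */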
+#define NO_LOCK_OR_DELEGATE
+
+/* Here are the high-level algorithms that were discussed for managing
+ * the commutes.
+    For each handle h in commute mode:
+        mutex_lock(&arbiter)
+        release h
+        For each task Tc waiting on the handle:
+            // Just test whether we can take everything:
+            For each piece of data Tc_h that Tc is waiting for:
+                If Tc_h is busy, goto fail
+            // Really take everything
+            For each piece of data Tc_h that Tc is waiting for:
+                lock(Tc_h)
+                take h (it should still be available if everybody else uses the arbiter mutex correctly)
+                unlock(Tc_h)
+            // we have found somebody, we are done!
+            _starpu_push_task(Tc);
+            break;
+            fail:
+                // Bad luck, try another task
+                continue;
+        // release the arbiter mutex from time to time
+        mutex_unlock(&arbiter)
+
+    mutex_lock(&arbiter)
+    For each handle h in commute mode:
+        lock(h)
+        try to take h, goto fail on failure;
+        unlock(h)
+        mutex_unlock(&arbiter)
+        return 0
+
+        fail:
+            // register in the request list of h
+            For each handle already taken:
+                lock(handle)
+                release handle
+                unlock(handle)
+    mutex_unlock(&arbiter)
+ */
+
+/* Here are the LockOrDelegate functions.
+ * There are two versions, depending on whether the compiler supports
+ * C11 atomic compare-and-exchange.
+ */
+
+#include <assert.h>
+#include <stdlib.h>
+
+#ifndef NO_LOCK_OR_DELEGATE
+
+/* A LockOrDelegate task list */
+struct LockOrDelegateListNode{
+    int (*func)(void*);
+    void* data;
+    struct LockOrDelegateListNode* next;
+};
+
+/* If the compiler supports C11 and atomic functions are available */
+#if (201112L <= __STDC_VERSION__) && !(defined(__STDC_NO_ATOMICS__))
+
+#include <stdatomic.h>
+
+/* Counts the number of tasks to perform and assigns the tickets */
+atomic_int dlAtomicCounter;
+/* The list of tasks to perform */
+struct LockOrDelegateListNode* _Atomic dlListHead;
+
+/* Post a task to perform if possible, otherwise put it in the list.
+ * If we can perform this task, we may also perform all the tasks in the list.
+ * This function returns 1 if the task (and maybe some others) has been done
+ * by the calling thread, and 0 otherwise (the task has just been put in the list).
+ */
+int LockOrDelegatePostOrPerform(int (*func)(void*), void* data){
+    /* Get our ticket */
+    int insertionPosition = atomic_load(&dlAtomicCounter);
+    while (!atomic_compare_exchange_weak(&dlAtomicCounter, &insertionPosition, insertionPosition+1));
+
+    /* If we obtained 0 we are responsible for computing all the tasks */
+    if(insertionPosition == 0){
+        /* start with our own task */
+        (*func)(data);
+
+        /* Compute the tasks of others and manage the tickets */
+        while(1){
+            assert(atomic_load(&dlAtomicCounter) > 0);
+
+            /* Decrement the ticket count and see if anything else has to be done */
+            int removedPosition = atomic_load(&dlAtomicCounter);
+            while(!atomic_compare_exchange_weak(&dlAtomicCounter, &removedPosition,removedPosition-1));
+            if(removedPosition-1 == 0){
+                break;
+            }
+
+            /* Get the next task; it may not have been pushed yet
+             * (atomic_load(&dlListHead) == NULL), so spin until it appears */
+            struct LockOrDelegateListNode* removedNode;
+            while((removedNode = atomic_load(&dlListHead)) == NULL || !atomic_compare_exchange_weak(&dlListHead, &removedNode, removedNode->next));
+            assert(removedNode);
+            /* call the task */
+            (*removedNode->func)(removedNode->data);
+            // Delete node
+            free(removedNode);
+        }
+
+        return 1;
+    }
+
+    struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
+    assert(newNode);
+    newNode->data = data;
+    newNode->func = func;
+    newNode->next = (struct LockOrDelegateListNode*)atomic_load(&dlListHead);
+    while(!atomic_compare_exchange_weak(&dlListHead, &newNode->next, newNode));
+
+    return 0;
+}
+
+#else
+/* We cannot rely on C11 atomics */
+#warning Lock-based version of LockOrDelegate
+
+#include <pthread.h>
+#include <errno.h>
+
+/* The list of tasks to perform */
+struct LockOrDelegateListNode* dlTaskListHead = NULL;
+
+/* To protect the list of tasks */
+pthread_mutex_t dlListLock = PTHREAD_MUTEX_INITIALIZER;
+/* Taken by the thread responsible for computing all the tasks */
+pthread_mutex_t dlWorkLock = PTHREAD_MUTEX_INITIALIZER;
+
+/* Post a task to perform if possible, otherwise put it in the list.
+ * If we can perform this task, we may also perform all the tasks in the list.
+ * This function returns 1 if the task (and maybe some others) has been done
+ * by the calling thread, and 0 otherwise (the task has just been put in the list).
+ */
+int LockOrDelegatePostOrPerform(int (*func)(void*), void* data){
+    /* We could avoid allocating when we end up being the responsible thread,
+     * but for simplicity we always push the task onto the list */
+    struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
+    assert(newNode);
+    newNode->data = data;
+    newNode->func = func;
+
+    /* insert the node */
+    int ret = pthread_mutex_lock(&dlListLock);
+    assert(ret == 0);
+    newNode->next = dlTaskListHead;
+    dlTaskListHead = newNode;
+    ret = pthread_mutex_unlock(&dlListLock);
+    assert(ret == 0);
+
+    /* See if we can compute all the tasks */
+    if((ret = pthread_mutex_trylock(&dlWorkLock)) == 0){
+        ret = pthread_mutex_lock(&dlListLock);
+        assert(ret == 0);
+        while(dlTaskListHead != 0){
+            struct LockOrDelegateListNode* iter = dlTaskListHead;
+            dlTaskListHead = dlTaskListHead->next;
+            ret = pthread_mutex_unlock(&dlListLock);
+            assert(ret == 0);
+
+            (*iter->func)(iter->data);
+            free(iter);
+            ret = pthread_mutex_lock(&dlListLock);
+            assert(ret == 0);
+        }
+
+        /* Release the work lock before the list lock: a thread that inserts
+         * a node after our emptiness check must be able to take the work
+         * lock and process its node, otherwise that task would be lost */
+        ret = pthread_mutex_unlock(&dlWorkLock);
+        assert(ret == 0);
+        ret = pthread_mutex_unlock(&dlListLock);
+        assert(ret == 0);
+
+        return 1;
+    }
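+    /* EBUSY: the thread currently holding dlWorkLock will process our node */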
+    assert(ret == EBUSY);
+    return 0;
+}
+
+#endif
+
+#else // NO_LOCK_OR_DELEGATE
+
+pthread_mutex_t commute_global_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+#endif
+
+/* Find the node whose job is j and remove it from the list.
+ * Returns 0 if a node was found and deleted, 1 otherwise.
+ */
+unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j){
+    struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);//_head;
+    while(iter != _starpu_data_requester_list_end(req_list) && iter->j != j){
+        iter = _starpu_data_requester_list_next(iter); // iter = iter->_next;
+    }
+    if(iter){
+        _starpu_data_requester_list_erase(req_list, iter);
+        return 0;
+    }
+    return 1;
+}
+
+#ifndef NO_LOCK_OR_DELEGATE
+
+/* These are the arguments passed to _submit_job_enforce_commute_deps */
+struct EnforceCommuteArgs{
+    struct _starpu_job *j;
+    unsigned buf;
+    unsigned nbuffers;
+};
+
+int _submit_job_enforce_commute_deps(void* inData){
+    struct EnforceCommuteArgs* args = (struct EnforceCommuteArgs*)inData;
+    struct _starpu_job *j = args->j;
+    unsigned buf          = args->buf;
+    unsigned nbuffers     = args->nbuffers;
+    /* we are in charge of freeing the args */
+    free(args);
+    args = NULL;
+    inData = NULL;
+#else // NO_LOCK_OR_DELEGATE
+int _submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers){
+    int ret = pthread_mutex_lock(&commute_global_mutex);
+    assert(ret == 0);
+#endif
+
+    const unsigned nb_non_commute_buff = buf;
+    unsigned idx_buf_commute;
+    unsigned all_commutes_available = 1;
+
+    for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
+    {
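+        /* ordered buffers keep duplicate handles adjacent: skip repeats */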
+        if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
+            continue;
+        /* try to take each commute buffer */
+        starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
+        enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
+        assert(mode & STARPU_COMMUTE);
+
+        _starpu_spin_lock(&handle->header_lock);
+        if(handle->refcnt == 0){
+            handle->refcnt += 1;
+            handle->busy_count += 1;
+            handle->current_mode = mode;
+            _starpu_spin_unlock(&handle->header_lock);
+        }
+        else{
+            /* stop as soon as some handle has refcnt != 0 */
+            _starpu_spin_unlock(&handle->header_lock);
+            all_commutes_available = 0;
+            break;
+        }
+    }
+    if(all_commutes_available == 0){
+        /* Oops, release everything already taken and queue a request on each commute handle */
+        unsigned idx_buf_cancel;
+        for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
+        {
+            if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel)))
+                continue;
+
+            starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+            _starpu_spin_lock(&cancel_handle->header_lock);
+            /* reset the counter because we do not take the data after all */
+            assert(cancel_handle->refcnt == 1);
+            cancel_handle->refcnt -= 1;
+            _starpu_spin_unlock(&cancel_handle->header_lock);
+        }
+
+        for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < nbuffers ; idx_buf_cancel++)
+        {
+            starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+            enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
+
+            assert(cancel_mode & STARPU_COMMUTE);
+
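+            /* queue a requester so the commute arbiter can retry this job
+             * when a handle is released */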
+            struct _starpu_data_requester *r = _starpu_data_requester_new();
+            r->mode = cancel_mode;
+            r->is_requested_by_codelet = 1;
+            r->j = j;
+            r->buffer_index = idx_buf_cancel;
+            r->ready_data_callback = NULL;
+            r->argcb = NULL;
+
+            _starpu_spin_lock(&cancel_handle->header_lock);
+            /* create list if needed */
+            if(cancel_handle->commute_req_list == NULL)
+                cancel_handle->commute_req_list = _starpu_data_requester_list_new();
+            /* store node in list */
+            _starpu_data_requester_list_push_front(cancel_handle->commute_req_list, r);
+            /* increment the busy count unless it was already incremented by the acquisition loop above */
+            if(idx_buf_commute <= idx_buf_cancel) cancel_handle->busy_count += 1;
+            _starpu_spin_unlock(&cancel_handle->header_lock);
+        }
+
+#ifdef NO_LOCK_OR_DELEGATE
+        ret = pthread_mutex_unlock(&commute_global_mutex);
+        assert(ret == 0);
+#endif
+        return 1;
+    }
+
+    // all_commutes_available is true
+    _starpu_push_task(j);
+#ifdef NO_LOCK_OR_DELEGATE
+    ret = pthread_mutex_unlock(&commute_global_mutex);
+    assert(ret == 0);
+#endif
+    return 0;
+}
+
+#ifndef NO_LOCK_OR_DELEGATE
+int _starpu_notify_commute_dependencies(void* inData){
+    starpu_data_handle_t handle = (starpu_data_handle_t)inData;
+#else // NO_LOCK_OR_DELEGATE
+int _starpu_notify_commute_dependencies(starpu_data_handle_t handle){
+    int ret = pthread_mutex_lock(&commute_global_mutex);
+    assert(ret == 0);
+#endif
+    /* Since the request was posted, the handle may have been processed and released in the meantime */
+    if(handle->commute_req_list == NULL){
+#ifdef NO_LOCK_OR_DELEGATE
+        ret = pthread_mutex_unlock(&commute_global_mutex);
+        assert(ret == 0);
+#endif
+        return 1;
+    }
+    /* commute_req_list is only ever touched under the global commute
+       serialization (mutex or LockOrDelegate), so we do not need to lock
+       the handle for safety */
+    struct _starpu_data_requester *r;
+    r = _starpu_data_requester_list_begin(handle->commute_req_list); //_head;
+    while(r){
+        struct _starpu_job* j = r->j;
+        assert(r->mode & STARPU_COMMUTE);
+        unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
+        unsigned nb_non_commute_buff;
+        /* find the position of commute buffers */
+        for (nb_non_commute_buff = 0; nb_non_commute_buff < nbuffers; nb_non_commute_buff++)
+        {
+            if (nb_non_commute_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff-1) == _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_commute_buff)))
+                continue;
+            enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_commute_buff);
+            if(mode & STARPU_COMMUTE){
+                break;
+            }
+        }
+
+        unsigned idx_buf_commute;
+        unsigned all_commutes_available = 1;
+
+        for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
+        {
+            if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
+                continue;
+            /* try to take each commute buffer */
+            starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
+            enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
+            assert(mode & STARPU_COMMUTE);
+
+            _starpu_spin_lock(&handle_commute->header_lock);
+            if(handle_commute->refcnt != 0){
+                /* handle is not available */
+                _starpu_spin_unlock(&handle_commute->header_lock);
+                all_commutes_available = 0;
+                break;
+            }
+            /* mark the handle as taken */
+            handle_commute->refcnt += 1;
+            handle_commute->current_mode = mode;
+            _starpu_spin_unlock(&handle_commute->header_lock);
+        }
+
+        if(all_commutes_available){
+            for (idx_buf_commute = nb_non_commute_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
+            {
+                if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute)))
+                    continue;
+                /* all commutes were taken: remove this job from each request list */
+                starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
+                enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
+                assert(mode & STARPU_COMMUTE);
+
+                _starpu_spin_lock(&handle_commute->header_lock);
+                assert(handle_commute->refcnt == 1);
+                assert( handle_commute->busy_count >= 1);
+                assert( handle_commute->current_mode == mode);
+                const unsigned correctly_deleted = remove_job_from_requester_list(handle_commute->commute_req_list, j);
+                assert(correctly_deleted == 0);
+                if(_starpu_data_requester_list_empty(handle_commute->commute_req_list)){ // If size == 0
+                    _starpu_data_requester_list_delete(handle_commute->commute_req_list);
+                    handle_commute->commute_req_list = NULL;
+                }
+                _starpu_spin_unlock(&handle_commute->header_lock);
+            }
+            /* delete list node */
+            _starpu_data_requester_delete(r);
+
+            /* push the task */
+            _starpu_push_task(j);
+
+            /* release global mutex */
+#ifdef NO_LOCK_OR_DELEGATE
+            ret = pthread_mutex_unlock(&commute_global_mutex);
+            assert(ret == 0);
+#endif
+            /* a waiting task has been pushed */
+            return 0;
+        }
+        else{
+            unsigned idx_buf_cancel;
+            /* not all handles were available - revert the marks taken so far */
+            for (idx_buf_cancel = nb_non_commute_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
+            {
+                starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+                _starpu_spin_lock(&cancel_handle->header_lock);
+                assert(cancel_handle->refcnt == 1);
+                cancel_handle->refcnt -= 1;
+                _starpu_spin_unlock(&cancel_handle->header_lock);
+            }
+        }
+
+        r = _starpu_data_requester_list_next(r); // r = r->_next;
+    }
+    /* no task has been pushed */
+#ifdef NO_LOCK_OR_DELEGATE
+    ret = pthread_mutex_unlock(&commute_global_mutex);
+    assert(ret == 0);
+#endif
+    return 1;
+}
+
+// WIP_COMMUTE End
+
 /*
  * Check to see whether the first queued request can proceed, and return it in
  * such case.
@@ -81,15 +520,25 @@ static struct _starpu_data_requester *may_unlock_data_req_list_head(starpu_data_
 		return _starpu_data_requester_list_pop_front(req_list);
 
 	/* Already writing to it, do not let another write access through */
-	if (handle->current_mode == STARPU_W)
+	// WIP_COMMUTE Was
+	// if (handle->current_mode == STARPU_W)
+	//	return NULL;
+	// WIP_COMMUTE Begin
+	if (handle->current_mode & STARPU_W)
 		return NULL;
+	// WIP_COMMUTE End
 
 	/* data->current_mode == STARPU_R, so we can process more readers */
 	struct _starpu_data_requester *r = _starpu_data_requester_list_front(req_list);
 
 	enum starpu_data_access_mode r_mode = r->mode;
-	if (r_mode == STARPU_RW)
-		r_mode = STARPU_W;
+	// WIP_COMMUTE Was
+	// if (r_mode == STARPU_RW)
+	//	r_mode = STARPU_W;
+	// WIP_COMMUTE Begin
+	if (r_mode & STARPU_RW)
+		r_mode &= ~STARPU_R;
+	// WIP_COMMUTE End
 
 	/* If this is a STARPU_R, STARPU_SCRATCH or STARPU_REDUX type of
 	 * access, we only proceed if the current mode is the same as the
@@ -105,10 +554,14 @@ static struct _starpu_data_requester *may_unlock_data_req_list_head(starpu_data_
  * with the current mode, the request is put in the per-handle list of
  * "requesters", and this function returns 1. */
 static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_codelet,
-						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode current_mode,
 						       void (*callback)(void *), void *argcb,
 						       struct _starpu_job *j, unsigned buffer_index)
 {
+	// WIP_COMMUTE Begin
+	enum starpu_data_access_mode mode = (current_mode & ~STARPU_COMMUTE);
+	// WIP_COMMUTE End
+
 	if (mode == STARPU_RW)
 		mode = STARPU_W;
 
@@ -149,7 +602,11 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 	 * current one, we can proceed. */
 	unsigned put_in_list = 1;
 
-	enum starpu_data_access_mode previous_mode = handle->current_mode;
+	// WIP_COMMUTE Was
+	//enum starpu_data_access_mode previous_mode = handle->current_mode;
+	// WIP_COMMUTE Begin
+	enum starpu_data_access_mode previous_mode = (handle->current_mode & ~STARPU_COMMUTE);
+	// WIP_COMMUTE End
 
 	if (!frozen && ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode))))
 	{
@@ -179,7 +636,7 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 		handle->busy_count++;
 		/* enqueue the request */
 		struct _starpu_data_requester *r = _starpu_data_requester_new();
-		r->mode = mode;
+		r->mode = current_mode;
 		r->is_requested_by_codelet = request_from_codelet;
 		r->j = j;
 		r->buffer_index = buffer_index;
@@ -213,7 +670,7 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 		 * shared R acquisition.
 		 */
 		if (mode != STARPU_R || handle->current_mode != mode)
-			handle->current_mode = mode;
+			handle->current_mode = current_mode;
 
 		if ((mode == STARPU_REDUX) && (previous_mode != STARPU_REDUX))
 			_starpu_data_start_reduction_mode(handle);
@@ -238,7 +695,11 @@ static unsigned attempt_to_submit_data_request_from_job(struct _starpu_job *j, u
 	/* Note that we do not access j->task->handles, but j->ordered_buffers
 	 * which is a sorted copy of it. */
 	starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buffer_index);
-	enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buffer_index) & ~STARPU_COMMUTE;
+	// WIP_COMMUTE Was
+	// enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buffer_index) & ~STARPU_COMMUTE;
+	// WIP_COMMUTE Begin
+	enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buffer_index);
+	// WIP_COMMUTE End
 
 	return _starpu_attempt_to_submit_data_request(1, handle, mode, NULL, NULL, j, buffer_index);
 }
@@ -264,12 +725,38 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 		}
 
                 j->task->status = STARPU_TASK_BLOCKED_ON_DATA;
+
+		// WIP_COMMUTE Begin
+		enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buf);
+		if(mode & STARPU_COMMUTE){
+			/* We have reached the commute buffers: stop and do not proceed as usual */
+			break;
+		}
+		// WIP_COMMUTE End
+
                 if (attempt_to_submit_data_request_from_job(j, buf))
 		{
 			return 1;
                 }
 	}
 
+	// WIP_COMMUTE Begin
+	/* We have reached the commute buffers */
+	if(buf != nbuffers){
+#ifndef NO_LOCK_OR_DELEGATE
+		struct EnforceCommuteArgs* args = (struct EnforceCommuteArgs*)malloc(sizeof(struct EnforceCommuteArgs));
+		args->j = j;
+		args->buf = buf;
+		args->nbuffers = nbuffers;
+		/* The function will delete args */
+		LockOrDelegatePostOrPerform(&_submit_job_enforce_commute_deps, args);
+#else // NO_LOCK_OR_DELEGATE
+		_submit_job_enforce_commute_deps(j, buf, nbuffers);
+#endif
+		return 1;
+	}
+	// WIP_COMMUTE End
+
 	return 0;
 }
 
@@ -417,5 +904,22 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 		}
 	}
 
+	// WIP_COMMUTE Begin
+
+	if(handle->refcnt == 0 && handle->commute_req_list != NULL){
+		/* We need to unlock the current handle: it is locked here, but
+		 * the locking order is to take the global commute mutex first
+		 * and only then the handle mutex
+		 */
+		_starpu_spin_unlock(&handle->header_lock);
+#ifndef NO_LOCK_OR_DELEGATE
+		LockOrDelegatePostOrPerform(&_starpu_notify_commute_dependencies, handle);
+#else // NO_LOCK_OR_DELEGATE
+		_starpu_notify_commute_dependencies(handle);
+#endif
+		/* The caller expects the handle to still be locked when 0 is
+		 * returned; we released it, so return 1 */
+		return 1;
+	}
+	// WIP_COMMUTE End
+
 	return 0;
 }
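
An aside on the C11 branch above: LockOrDelegatePostOrPerform() is essentially a combining queue, where the thread that draws ticket 0 becomes the worker and drains everybody else's delegated tasks. Here is a self-contained sketch of that pattern, stripped of the StarPU types; the names (post_or_perform, pending, head) are illustrative and not part of the commit:

    #include <assert.h>
    #include <stdatomic.h>
    #include <stdlib.h>

    struct node { void (*func)(void *); void *data; struct node *next; };

    static atomic_int pending;           /* tasks posted but not yet performed */
    static struct node *_Atomic head;    /* lock-free LIFO of delegated tasks */

    /* Returns 1 if the calling thread performed the task (and possibly
     * others), 0 if the task was delegated to the current worker. */
    static int post_or_perform(void (*func)(void *), void *data)
    {
        if (atomic_fetch_add(&pending, 1) == 0) {
            /* ticket 0: we are the worker; run our own task first */
            func(data);
            /* then drain the tasks delegated while we were busy */
            while (atomic_fetch_sub(&pending, 1) != 1) {
                struct node *n;
                /* the counter is bumped before the push, so the node may
                 * not be visible yet: spin until it appears */
                while ((n = atomic_load(&head)) == NULL
                       || !atomic_compare_exchange_weak(&head, &n, n->next))
                    ;
                n->func(n->data);
                free(n);
            }
            return 1;
        }
        /* another thread is the worker: push the task and leave */
        struct node *n = malloc(sizeof *n);
        assert(n);
        n->func = func;
        n->data = data;
        n->next = atomic_load(&head);
        while (!atomic_compare_exchange_weak(&head, &n->next, n))
            ;
        return 0;
    }

The patch uses this scheme to run _submit_job_enforce_commute_deps() and _starpu_notify_commute_dependencies() serially without holding a global mutex.
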

+ 4 - 0
src/datawizard/coherency.h

@@ -231,6 +231,10 @@ struct _starpu_data_state
 
 	/* hook to be called when unregistering the data */
 	_starpu_data_handle_unregister_hook unregister_hook;
+
+	// WIP_COMMUTE Begin
+	/* list of requests waiting for STARPU_COMMUTE access to this handle */
+	struct _starpu_data_requester_list *commute_req_list;
+	// WIP_COMMUTE End
 };
 
 void _starpu_display_msi_stats(void);

+ 9 - 0
src/datawizard/sort_data_handles.c

@@ -70,6 +70,15 @@ static int _starpu_compar_handles(const struct _starpu_data_descr *descrA,
 	struct _starpu_data_state *dataA = descrA->handle;
 	struct _starpu_data_state *dataB = descrB->handle;
 
+	// WIP_COMMUTE Begin
+	/* Sort all STARPU_COMMUTE accesses after the non-commute ones */
+	if (descrA->mode & STARPU_COMMUTE)
+		return 1;
+	if (descrB->mode & STARPU_COMMUTE)
+		return -1;
+	// WIP_COMMUTE End
+
 	/* Perhaps we have the same piece of data */
 	if (dataA == dataB)
 	{
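
A note on this last hunk: pushing every STARPU_COMMUTE access to the end of the sorted buffer list is what lets _submit_job_enforce_data_deps() process the non-commute prefix as before and hand the commute suffix to the arbiter in one go. A toy illustration of the resulting order (simplified descriptor type, illustrative only; unlike the patch, the comparator below also orders two commute entries between themselves so that it stays consistent for qsort):

    #include <stdio.h>
    #include <stdlib.h>

    enum { MODE_R = 1, MODE_W = 2, MODE_COMMUTE = 4 };
    struct descr { int handle_id; int mode; };

    static int compar(const void *pa, const void *pb)
    {
        const struct descr *a = pa, *b = pb;
        int ca = a->mode & MODE_COMMUTE, cb = b->mode & MODE_COMMUTE;
        if (ca != cb)
            return ca ? 1 : -1;             /* commute accesses sort last */
        return a->handle_id - b->handle_id; /* usual global handle order */
    }

    int main(void)
    {
        struct descr bufs[] = {
            { 3, MODE_R | MODE_COMMUTE }, { 1, MODE_W },
            { 2, MODE_W | MODE_COMMUTE }, { 0, MODE_R },
        };
        qsort(bufs, 4, sizeof bufs[0], compar);
        for (int i = 0; i < 4; i++)  /* prints 0, 1, then 2 and 3 (commute) */
            printf("handle %d%s\n", bufs[i].handle_id,
                   (bufs[i].mode & MODE_COMMUTE) ? " (commute)" : "");
        return 0;
    }
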