Browse Source

simplify commute locking, which fixes a race and makes it easy to improve further

Samuel Thibault 10 years ago
parent
commit
f45da9d88c
1 changed files with 47 additions and 27 deletions
  1. 47 27
      src/core/dependencies/data_commute_concurrency.c

+ 47 - 27
src/core/dependencies/data_commute_concurrency.c

@@ -81,13 +81,16 @@
  * - return 1;
  */
 
-/* Here are the LockOrDelegate functions
- * There are two version depending on the support of the compare and exchange
- * support from the compiler
- */
-
 #ifdef LOCK_OR_DELEGATE
 
+/* In case of congestion, we don't want to needlessly wait for the arbiter lock
+ * while we can just delegate the work to the worker already managing some
+ * dependencies.
+ *
+ * So we push work on the dlTaskListHead queue and only one worker will process
+ * the list.
+ */
+
 /* A LockOrDelegate task list */
 struct LockOrDelegateListNode
 {
@@ -101,8 +104,8 @@ static struct LockOrDelegateListNode* dlTaskListHead = NULL;
 
 /* To protect the list of tasks */
 static starpu_pthread_mutex_t dlListLock = STARPU_PTHREAD_MUTEX_INITIALIZER;
-/* To know who is responsible to compute all the tasks */
-static starpu_pthread_mutex_t dlWorkLock = STARPU_PTHREAD_MUTEX_INITIALIZER;
+/* Whether somebody is working on the list */
+static int working;
 
 /* Post a task to perform if possible, otherwise put it in the list
  * If we can perform this task, we may also perform all the tasks in the list
@@ -111,27 +114,47 @@ static starpu_pthread_mutex_t dlWorkLock = STARPU_PTHREAD_MUTEX_INITIALIZER;
  */
 static int _starpu_LockOrDelegatePostOrPerform(void (*func)(void*), void* data)
 {
-	/* We could avoid to allocate if we will be responsible but for simplicity
-	 * we always push the task in the list */
-	struct LockOrDelegateListNode* newNode = (struct LockOrDelegateListNode*)malloc(sizeof(struct LockOrDelegateListNode));
+	struct LockOrDelegateListNode* newNode = malloc(sizeof(*newNode)), *iter;
+	int did = 0;
 	STARPU_ASSERT(newNode);
 	newNode->data = data;
 	newNode->func = func;
-	int ret;
 
-	/* insert the node */
 	STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
-	newNode->next = dlTaskListHead;
-	dlTaskListHead = newNode;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
-
-	/* See if we can compute all the tasks */
-	if((ret = STARPU_PTHREAD_MUTEX_TRYLOCK(&dlWorkLock)) == 0)
+	if (working)
+	{
+		/* Somebody working on it, insert the node */
+		newNode->next = dlTaskListHead;
+		dlTaskListHead = newNode;
+	}
+	else
 	{
+		/* Nobody working on the list, we'll work */
+		working = 1;
+
+		/* work on what was pushed so far first */
+		iter = dlTaskListHead;
+		dlTaskListHead = NULL;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
+		while(iter != NULL)
+		{
+			/* Save the successor first: iter is freed below and must not be read afterwards */
+			struct LockOrDelegateListNode* next = iter->next;
+			(*iter->func)(iter->data);
+			free(iter);
+			iter = next;
+		}
+
+		/* And then do our job */
+		(*func)(data);
+		free(newNode);
+		did = 1;
+
 		STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
+		/* And finish working on anything that could have been pushed
+		 * in the meanwhile */
 		while(dlTaskListHead != 0)
 		{
-			struct LockOrDelegateListNode* iter = dlTaskListHead;
+			iter = dlTaskListHead;
 			dlTaskListHead = dlTaskListHead->next;
 			STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
 
@@ -140,14 +163,11 @@ static int _starpu_LockOrDelegatePostOrPerform(void (*func)(void*), void* data)
 			STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
 		}
 
-		/* First unlock the list! this is important */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&dlWorkLock);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
-
-		return 1;
+		working = 0;
 	}
-	STARPU_ASSERT(ret == EBUSY);
-	return 0;
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
+	return did;
 }
 
 #else // LOCK_OR_DELEGATE
@@ -199,7 +219,7 @@ static void __starpu_submit_job_enforce_commute_deps(void* inData)
 
 void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
 {
-	struct starpu_enforce_commute_args* args = (struct starpu_enforce_commute_args*)malloc(sizeof(struct starpu_enforce_commute_args));
+	struct starpu_enforce_commute_args* args = malloc(sizeof(*args));
 	args->j = j;
 	args->buf = buf;
 	args->nbuffers = nbuffers;