Selaa lähdekoodia

Use arbiters instead of a global state. Does not support several arbiters yet

Samuel Thibault 10 vuotta sitten
vanhempi
commit
a5b6105706

+ 4 - 0
include/starpu_data.h

@@ -81,6 +81,10 @@ int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t h
 void starpu_data_release(starpu_data_handle_t handle);
 void starpu_data_release_on_node(starpu_data_handle_t handle, int node);
 
+typedef struct starpu_arbiter *starpu_arbiter_t;
+starpu_arbiter_t starpu_arbiter_create(void);
+void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter);
+
 void starpu_data_display_memory_stats();
 
 #define starpu_data_malloc_pinned_if_possible	starpu_malloc

+ 83 - 50
src/core/dependencies/data_commute_concurrency.c

@@ -21,7 +21,7 @@
 #include <common/starpu_spinlock.h>
 #include <datawizard/sort_data_handles.h>
 
-#define LOCK_OR_DELEGATE
+//#define LOCK_OR_DELEGATE
 
 /*
  * This implements a solution for the dining philosophers problem (see
@@ -81,6 +81,21 @@
  * - return 1;
  */
 
+struct starpu_arbiter
+{
+#ifdef LOCK_OR_DELEGATE
+/* The list of task to perform */
+	struct LockOrDelegateListNode* dlTaskListHead;
+
+/* To protect the list of tasks */
+	struct _starpu_spinlock dlListLock;
+/* Whether somebody is working on the list */
+	int working;
+#else /* LOCK_OR_DELEGATE */
+	starpu_pthread_mutex_t mutex;
+#endif /* LOCK_OR_DELEGATE */
+};
+
 #ifdef LOCK_OR_DELEGATE
 
 /* In case of congestion, we don't want to needlessly wait for the arbiter lock
@@ -99,20 +114,12 @@ struct LockOrDelegateListNode
 	struct LockOrDelegateListNode* next;
 };
 
-/* The list of task to perform */
-static struct LockOrDelegateListNode* dlTaskListHead = NULL;
-
-/* To protect the list of tasks */
-static starpu_pthread_mutex_t dlListLock = STARPU_PTHREAD_MUTEX_INITIALIZER;
-/* Whether somebody is working on the list */
-static int working;
-
 /* Post a task to perfom if possible, otherwise put it in the list
  * If we can perfom this task, we may also perfom all the tasks in the list
  * This function return 1 if the task (and maybe some others) has been done
  * by the calling thread and 0 otherwise (if the task has just been put in the list)
  */
-static int _starpu_LockOrDelegatePostOrPerform(void (*func)(void*), void* data)
+static int _starpu_LockOrDelegatePostOrPerform(starpu_arbiter_t arbiter, void (*func)(void*), void* data)
 {
 	struct LockOrDelegateListNode* newNode = malloc(sizeof(*newNode)), *iter;
 	int did = 0;
@@ -120,25 +127,24 @@ static int _starpu_LockOrDelegatePostOrPerform(void (*func)(void*), void* data)
 	newNode->data = data;
 	newNode->func = func;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
-	if (working)
+	_starpu_spin_lock(&arbiter->dlListLock);
+	if (arbiter->working)
 	{
 		/* Somebody working on it, insert the node */
-		newNode->next = dlTaskListHead;
-		dlTaskListHead = newNode;
+		newNode->next = arbiter->dlTaskListHead;
+		arbiter->dlTaskListHead = newNode;
 	}
 	else
 	{
 		/* Nobody working on the list, we'll work */
-		working = 1;
+		arbiter->working = 1;
 
 		/* work on what was pushed so far first */
-		iter = dlTaskListHead;
-		dlTaskListHead = NULL;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
-		while(iter != NULL)
+		iter = arbiter->dlTaskListHead;
+		arbiter->dlTaskListHead = NULL;
+		_starpu_spin_unlock(&arbiter->dlListLock);
+		while (iter != NULL)
 		{
-
 			(*iter->func)(iter->data);
 			free(iter);
 			iter = iter->next;
@@ -149,33 +155,30 @@ static int _starpu_LockOrDelegatePostOrPerform(void (*func)(void*), void* data)
 		free(newNode);
 		did = 1;
 
-		STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
+		_starpu_spin_lock(&arbiter->dlListLock);
 		/* And finish working on anything that could have been pushed
 		 * in the meanwhile */
-		while(dlTaskListHead != 0)
+		while (arbiter->dlTaskListHead != 0)
 		{
-			iter = dlTaskListHead;
-			dlTaskListHead = dlTaskListHead->next;
-			STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
+			iter = arbiter->dlTaskListHead;
+			arbiter->dlTaskListHead = arbiter->dlTaskListHead->next;
+			_starpu_spin_unlock(&arbiter->dlListLock);
 
 			(*iter->func)(iter->data);
 			free(iter);
-			STARPU_PTHREAD_MUTEX_LOCK(&dlListLock);
+			_starpu_spin_lock(&arbiter->dlListLock);
 		}
 
-		working = 0;
+		arbiter->working = 0;
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&dlListLock);
+	_starpu_spin_unlock(&arbiter->dlListLock);
 	return did;
 }
 
-#else // LOCK_OR_DELEGATE
-
-starpu_pthread_mutex_t commute_global_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
-
 #endif
 
+
 /* This function find a node that contains the parameter j as job and remove it from the list
  * the function return 0 if a node was found and deleted, 1 otherwise
  */
@@ -220,20 +223,24 @@ static void __starpu_submit_job_enforce_commute_deps(void* inData)
 void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
 {
 	struct starpu_enforce_commute_args* args = malloc(sizeof(*args));
+	starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
 	args->j = j;
 	args->buf = buf;
 	args->nbuffers = nbuffers;
 	/* The function will delete args */
-	_starpu_LockOrDelegatePostOrPerform(&__starpu_submit_job_enforce_commute_deps, args);
+	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_submit_job_enforce_commute_deps, args);
 }
 
 static void ___starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
 {
+	starpu_arbiter_t arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf)->arbiter;
 #else // LOCK_OR_DELEGATE
 void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&commute_global_mutex);
+	starpu_arbiter_t arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf)->arbiter;
+	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
 #endif
+	STARPU_ASSERT(arbiter);
 
 	const unsigned nb_non_commute_buff = buf;
 	unsigned idx_buf_commute;
@@ -250,7 +257,7 @@ void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf
 			 * _starpu_compar_handles.  */
 			continue;
 		/* we post all commute  */
-		STARPU_ASSERT(mode & STARPU_COMMUTE);
+		STARPU_ASSERT(handle->arbiter == arbiter);
 
 		_starpu_spin_lock(&handle->header_lock);
 		if(handle->refcnt == 0)
@@ -291,7 +298,7 @@ void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf
 			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
 			enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
 
-			STARPU_ASSERT(cancel_mode & STARPU_COMMUTE);
+			STARPU_ASSERT(cancel_handle->arbiter == arbiter);
 
 			struct _starpu_data_requester *r = _starpu_data_requester_new();
 			r->mode = cancel_mode;
@@ -314,7 +321,7 @@ void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf
 		}
 
 #ifndef LOCK_OR_DELEGATE
-		STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
 		return 1;
 	}
@@ -322,7 +329,7 @@ void _starpu_submit_job_enforce_commute_deps(struct _starpu_job *j, unsigned buf
 	// all_commutes_available is true
 	_starpu_push_task(j);
 #ifndef LOCK_OR_DELEGATE
-	STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
 	return 0;
 }
@@ -336,31 +343,34 @@ void __starpu_notify_commute_dependencies(void* inData)
 }
 void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 {
-	_starpu_LockOrDelegatePostOrPerform(&__starpu_notify_commute_dependencies, handle);
+	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_notify_commute_dependencies, handle);
 }
 void ___starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 {
 #else // LOCK_OR_DELEGATE
 void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&commute_global_mutex);
 #endif
+	starpu_arbiter_t arbiter = handle->arbiter;
+#ifndef LOCK_OR_DELEGATE
+	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
+#endif
+
 	/* Since the request has been posted the handle may have been proceed and released */
 	if(handle->commute_req_list == NULL)
 	{
 #ifndef LOCK_OR_DELEGATE
-		STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
 		return 1;
 	}
-	/* no one has the right to work on commute_req_list without a lock on commute_global_mutex
+	/* no one has the right to work on commute_req_list without a lock on mutex
 	   so we do not need to lock the handle for safety */
 	struct _starpu_data_requester *r;
 	r = _starpu_data_requester_list_begin(handle->commute_req_list); //_head;
 	while(r)
 	{
 		struct _starpu_job* j = r->j;
-		STARPU_ASSERT(r->mode & STARPU_COMMUTE);
 		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
 		unsigned nb_non_commute_buff;
 		/* find the position of commute buffers */
@@ -373,7 +383,7 @@ void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 				 * _starpu_compar_handles.  */
 				continue;
 			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_commute_buff);
-			if(mode & STARPU_COMMUTE)
+			if(handle_commute->arbiter == arbiter)
 			{
 				break;
 			}
@@ -392,12 +402,12 @@ void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 				continue;
 			/* we post all commute  */
 			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
-			STARPU_ASSERT(mode & STARPU_COMMUTE);
+			STARPU_ASSERT(handle_commute->arbiter == arbiter);
 
 			_starpu_spin_lock(&handle_commute->header_lock);
 			if(handle_commute->refcnt != 0)
 			{
-				/* handle is not available */
+				/* handle is not available, record ourself */
 				_starpu_spin_unlock(&handle_commute->header_lock);
 				all_commutes_available = 0;
 				break;
@@ -417,7 +427,7 @@ void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 					continue;
 				/* we post all commute  */
 				enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
-				STARPU_ASSERT(mode & STARPU_COMMUTE);
+				STARPU_ASSERT(handle_commute->arbiter == arbiter);
 
 				_starpu_spin_lock(&handle_commute->header_lock);
 				STARPU_ASSERT(handle_commute->refcnt == 1);
@@ -432,7 +442,7 @@ void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 				}
 				_starpu_spin_unlock(&handle_commute->header_lock);
 			}
-			/* delete list node */
+			/* Remove and delete list node */
 			_starpu_data_requester_delete(r);
 
 			/* push the task */
@@ -440,7 +450,7 @@ void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 
 			/* release global mutex */
 #ifndef LOCK_OR_DELEGATE
-			STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
 			/* We need to lock when returning 0 */
 			return 0;
@@ -463,7 +473,30 @@ void _starpu_notify_commute_dependencies(starpu_data_handle_t handle)
 	}
 	/* no task has been pushed */
 #ifndef LOCK_OR_DELEGATE
-	STARPU_PTHREAD_MUTEX_UNLOCK(&commute_global_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
 	return 1;
 }
+
+starpu_arbiter_t starpu_arbiter_create(void)
+{
+	starpu_arbiter_t res = malloc(sizeof(*res));
+
+#ifdef LOCK_OR_DELEGATE
+	res->dlTaskListHead = NULL;
+	_starpu_spin_init(&res->dlListLock);
+	res->working = 0;
+#else /* LOCK_OR_DELEGATE */
+	STARPU_PTHREAD_MUTEX_INIT(&res->mutex, NULL);
+#endif /* LOCK_OR_DELEGATE */
+
+	return res;
+}
+
+void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter)
+{
+	STARPU_ASSERT_MSG(!handle->arbiter, "handle can only be assigned one arbiter");
+	STARPU_ASSERT_MSG(!handle->refcnt, "arbiter can be assigned to handle only right after initialization");
+	STARPU_ASSERT_MSG(!handle->busy_count, "arbiter can be assigned to handle only right after initialization");
+	handle->arbiter = arbiter;
+}

+ 8 - 10
src/core/dependencies/data_concurrency.c

@@ -51,8 +51,8 @@
  * The same mechanism is used for application data aquisition
  * (starpu_data_acquire).
  *
- * For COMMUTE data, we have a second step, performed after this first step,
- * implemented in data_commute_concurrency.c
+ * For data with an arbiter, we have a second step, performed after this first
+ * step, implemented in data_commute_concurrency.c
  */
 
 /*
@@ -256,10 +256,10 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
 	for (buf = start_buffer_index; buf < nbuffers; buf++)
 	{
+		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
 		if (buf)
 		{
 			starpu_data_handle_t handle_m1 = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf-1);
-			starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
 			if (handle_m1 == handle)
 				/* We have already requested this data, skip it. This
 				 * depends on ordering putting writes before reads, see
@@ -270,10 +270,10 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
                 j->task->status = STARPU_TASK_BLOCKED_ON_DATA;
 
 		// WIP_COMMUTE Begin
-		enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, buf);
-		if(mode & STARPU_COMMUTE)
+		if(handle->arbiter)
 		{
-			/* We arrived on the commute we stop and proceed with the commute second step.  */
+			/* We arrived on an arbitered data, we stop and proceed
+			 * with the arbiter second step.  */
 			_starpu_submit_job_enforce_commute_deps(j, buf, nbuffers);
 			return 1;
 		}
@@ -436,12 +436,10 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 
 	if(handle->refcnt == 0 && handle->commute_req_list != NULL)
 	{
-		/* We need to delloc current handle because it is currently locked
-		 * but we alloc fist the global mutex and than the handles mutex
-		 */
 		_starpu_spin_unlock(&handle->header_lock);
+		/* _starpu_notify_commute_dependencies will handle its own locking */
 		_starpu_notify_commute_dependencies(handle);
-		/* We need to lock when returning 0 */
+		/* We have already unlocked */
 		return 1;
 	}
 	// WIP_COMMUTE End

+ 1 - 0
src/datawizard/coherency.h

@@ -233,6 +233,7 @@ struct _starpu_data_state
 	_starpu_data_handle_unregister_hook unregister_hook;
 
 	// WIP_COMMUTE Begin
+	struct starpu_arbiter *arbiter;
 	struct _starpu_data_requester_list *commute_req_list;
 	// WIP_COMMUTE End
 };

+ 8 - 4
src/datawizard/sort_data_handles.c

@@ -94,12 +94,16 @@ static int _starpu_compar_handles(const struct _starpu_data_descr *descrA,
 	}
 
 	// WIP_COMMUTE Begin
-	/* Put commute accesses after non-commute */
-	if (descrA->mode & STARPU_COMMUTE && !(descrB->mode & STARPU_COMMUTE))
+	/* Put arbitered accesses after non-arbitered */
+	if (dataA->arbiter && !(dataB->arbiter))
 		return 1;
-	if (descrB->mode & STARPU_COMMUTE && !(descrA->mode & STARPU_COMMUTE))
+	if (dataB->arbiter && !(dataA->arbiter))
 		return -1;
-	/* If both are commute, we'll sort them by handle */
+	if (dataA->arbiter != dataB->arbiter)
+		/* Both are arbitered, sort by arbiter pointer order */
+		return ((dataA->arbiter < dataB->arbiter)?-1:1);
+	/* If both are arbitered by the same arbiter (or they are both not
+	 * arbitered), we'll sort them by handle */
 	// WIP_COMMUTE End
 
 	/* In case we have data/subdata from different trees */

+ 51 - 0
tests/datawizard/testCommute.cpp

@@ -67,6 +67,7 @@ int main(int /*argc*/, char** /*argv*/)
 {
 	int ret;
 	struct starpu_conf conf;
+	starpu_arbiter_t arbiter, arbiter2;
 	ret = starpu_conf_init(&conf);
 	STARPU_ASSERT(ret == 0);
 	//conf.ncpus = 1;//// 4
@@ -107,6 +108,8 @@ int main(int /*argc*/, char** /*argv*/)
 
 	std::vector<starpu_data_handle_t> handleA(nbA);
 	std::vector<int> dataA(nbA);
+	arbiter = starpu_arbiter_create();
+	arbiter2 = starpu_arbiter_create();
 	for(int idx = 0 ; idx < nbA ; ++idx)
 	{
 		dataA[idx] = idx;
@@ -114,6 +117,7 @@ int main(int /*argc*/, char** /*argv*/)
 	for(int idxHandle = 0 ; idxHandle < nbA ; ++idxHandle)
 	{
 		starpu_variable_data_register(&handleA[idxHandle], 0, (uintptr_t)&dataA[idxHandle], sizeof(dataA[idxHandle]));
+		starpu_data_assign_arbiter(handleA[idxHandle], arbiter);
 	}
 
 	//////////////////////////////////////////////////////
@@ -142,6 +146,53 @@ int main(int /*argc*/, char** /*argv*/)
 	//////////////////////////////////////////////////////
 	FPRINTF(stdout,"Wait task\n");
 
+	starpu_task_wait_for_all();
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Release data\n");
+
+	for(int idxHandle = 0 ; idxHandle < nbA ; ++idxHandle)
+	{
+		starpu_data_unregister(handleA[idxHandle]);
+	}
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Proceed gain, with several arbiters\n");
+
+	for(int idxHandle = 0 ; idxHandle < nbA ; ++idxHandle)
+	{
+		starpu_variable_data_register(&handleA[idxHandle], 0, (uintptr_t)&dataA[idxHandle], sizeof(dataA[idxHandle]));
+		starpu_data_assign_arbiter(handleA[idxHandle], idxHandle%2?arbiter:arbiter2);
+	}
+
+#ifdef NOTYET
+	//////////////////////////////////////////////////////
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Submit tasks\n");
+
+	for(int idxHandleA1 = 0 ; idxHandleA1 < nbA ; ++idxHandleA1)
+	{
+		ret = starpu_task_insert(&slowCodelete,
+				(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA1],
+				0);
+		if (ret == -ENODEV) goto out;
+		for(int idxHandleA2 = 0 ; idxHandleA2 < nbA ; ++idxHandleA2)
+		{
+			if(idxHandleA1 != idxHandleA2)
+			{
+				ret = starpu_task_insert(&normalCodelete,
+						(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA1],
+						(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA2],
+						0);
+				if (ret == -ENODEV) goto out;
+			}
+		}
+	}
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Wait task\n");
+#endif
+
 out:
 	starpu_task_wait_for_all();