Browse Source

Support several arbiters

Samuel Thibault 10 years ago
parent
commit
14be4db267

+ 43 - 11
src/core/dependencies/data_commute_concurrency.c

@@ -246,6 +246,7 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 	unsigned idx_buf_commute;
 	unsigned all_commutes_available = 1;
 
+
 	for (idx_buf_commute = nb_non_arbitered_buff; idx_buf_commute < nbuffers; idx_buf_commute++)
 	{
 		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
@@ -256,9 +257,14 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 			 * depends on ordering putting writes before reads, see
 			 * _starpu_compar_handles.  */
 			continue;
-		/* we post all commute  */
-		STARPU_ASSERT(handle->arbiter == arbiter);
 
+		if (handle->arbiter != arbiter)
+		{
+			/* another arbiter */
+			break;
+		}
+
+		/* we post all commute  */
 		_starpu_spin_lock(&handle->header_lock);
 		if(handle->refcnt == 0)
 		{
@@ -285,6 +291,9 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 
 			if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==cancel_handle))
 				continue;
+			if (cancel_handle->arbiter != arbiter)
+				/* Will have to process another arbiter, will do that later */
+				break;
 
 			_starpu_spin_lock(&cancel_handle->header_lock);
 			/* reset the counter because finally we do not take the data */
@@ -298,7 +307,8 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
 			enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
 
-			STARPU_ASSERT(cancel_handle->arbiter == arbiter);
+			if (cancel_handle->arbiter != arbiter)
+				break;
 
 			struct _starpu_data_requester *r = _starpu_data_requester_new();
 			r->mode = cancel_mode;
@@ -325,12 +335,18 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 #endif
 		return 1;
 	}
-
-	// all_commutes_available is true
-	_starpu_push_task(j);
 #ifndef LOCK_OR_DELEGATE
 	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
+
+	// all_commutes_available is true
+	if (idx_buf_commute < nbuffers)
+	{
+		/* Other arbitered data, process them */
+		return _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_commute, nbuffers);
+	}
+	/* Finished with all data, can eventually push! */
+	_starpu_push_task(j);
 	return 0;
 }
 
@@ -400,9 +416,12 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				 * depends on ordering putting writes before reads, see
 				 * _starpu_compar_handles.  */
 				continue;
+			if (handle_commute->arbiter != arbiter)
+				/* Will have to process another arbiter, will do that later */
+				break;
+
 			/* we post all commute  */
 			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
-			STARPU_ASSERT(handle_commute->arbiter == arbiter);
 
 			_starpu_spin_lock(&handle_commute->header_lock);
 			if(handle_commute->refcnt != 0)
@@ -425,9 +444,11 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				starpu_data_handle_t handle_commute = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute);
 				if (idx_buf_commute && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_commute-1)==handle_commute))
 					continue;
+				if (handle_commute->arbiter != arbiter)
+					break;
+
 				/* we post all commute  */
 				enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_commute);
-				STARPU_ASSERT(handle_commute->arbiter == arbiter);
 
 				_starpu_spin_lock(&handle_commute->header_lock);
 				STARPU_ASSERT(handle_commute->refcnt == 1);
@@ -445,13 +466,20 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 			/* Remove and delete list node */
 			_starpu_data_requester_delete(r);
 
-			/* push the task */
-			_starpu_push_task(j);
-
 			/* release global mutex */
 #ifndef LOCK_OR_DELEGATE
 			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
+
+			if (idx_buf_commute < nbuffers)
+			{
+				/* Other arbitered data, process them */
+				_starpu_submit_job_enforce_arbitered_deps(j, idx_buf_commute, nbuffers);
+			}
+			else
+				/* Finished with all data, can eventually push! */
+				_starpu_push_task(j);
+
 			/* We need to lock when returning 0 */
 			return 0;
 		}
@@ -462,6 +490,10 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 			for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_commute ; idx_buf_cancel++)
 			{
 				starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+				if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==cancel_handle))
+					continue;
+				if (cancel_handle->arbiter != arbiter)
+					break;
 				_starpu_spin_lock(&cancel_handle->header_lock);
 				STARPU_ASSERT(cancel_handle->refcnt == 1);
 				cancel_handle->refcnt -= 1;

+ 4 - 0
src/core/dependencies/data_concurrency.c

@@ -113,6 +113,10 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 						       void (*callback)(void *), void *argcb,
 						       struct _starpu_job *j, unsigned buffer_index)
 {
+	/* TODO: implement */
+	if (handle->arbiter)
+		_STARPU_DISP("data acquisition not completely safe with arbitered handles\n");
+
 	if (mode == STARPU_RW)
 		mode = STARPU_W;
 

+ 0 - 2
tests/datawizard/testCommute.cpp

@@ -165,7 +165,6 @@ int main(int /*argc*/, char** /*argv*/)
 		starpu_data_assign_arbiter(handleA[idxHandle], idxHandle%2?arbiter:arbiter2);
 	}
 
-#ifdef NOTYET
 	//////////////////////////////////////////////////////
 	//////////////////////////////////////////////////////
 	FPRINTF(stdout,"Submit tasks\n");
@@ -191,7 +190,6 @@ int main(int /*argc*/, char** /*argv*/)
 
 	//////////////////////////////////////////////////////
 	FPRINTF(stdout,"Wait task\n");
-#endif
 
 out:
 	starpu_task_wait_for_all();