Explorar o código

port r15212 from commute branch: Make arbiter register tasks only on the handle they did not manage to acquire, to avoid too many tries and adds/removals

Samuel Thibault %!s(int64=10) %!d(string=hai) anos
pai
achega
c89ca46b05
Modificáronse 1 ficheiros con 89 adicións e 113 borrados
  1. 89 113
      src/core/dependencies/data_arbiter_concurrency.c

+ 89 - 113
src/core/dependencies/data_arbiter_concurrency.c

@@ -25,6 +25,8 @@
 
 /* TODO factorize with data_concurrency.c and btw support redux */
 
+/* TODO: fine-grain R/W access */
+
 //#define LOCK_OR_DELEGATE
 
 /*
@@ -43,7 +45,7 @@
  *   - mutex_lock(&arbiter)
  *   - release reference on h
  *   - for each task Tc waiting for h:
- *     - for each data Tc_h it is waiting:
+ *     - for each data Tc_h it is waiting for:
  *       - if Tc_h is busy, goto fail
  *     // Ok, now really take them
  *     - For each data Tc_h it is waiting:
@@ -54,6 +56,8 @@
  *     _starpu_push_task(Tc);
  *     break;
  *     fail:
+ *       - unrecord T as waiting on h
+ *       - record T as waiting on Tc_h
  *       // No luck, let's try another task
  *       continue;
  *   // Release the arbiter mutex a bit from time to time
@@ -72,16 +76,13 @@
  * - return 0;
  *
  * fail:
- * // couldn't take everything, abort and record task T
+ * // couldn't take everything, record task T and abort
+ * - record T as waiting on h
  * // drop spurious references
  * - for each handle h of T already taken:
  *   - lock(h)
  *   - release reference on h
  *   - unlock(h)
- * // record T on the list of requests for h
- * TODO: record on only one handle
- * - for each handle h of T:
- *   - record T as waiting on h
  * - mutex_unlock(&arbiter)
  * - return 1;
  */
@@ -336,24 +337,6 @@ unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_
 
 
 
-/* This function find a node that contains the parameter j as job and remove it from the list
- * the function return 0 if a node was found and deleted, 1 otherwise
- */
-static unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j)
-{
-	struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);//_head;
-	while (iter != _starpu_data_requester_list_end(req_list) && iter->j != j)
-	{
-		iter = _starpu_data_requester_list_next(iter); // iter = iter->_next;
-	}
-	if (iter)
-	{
-		_starpu_data_requester_list_erase(req_list, iter);
-		return 0;
-	}
-	return 1;
-}
-
 #ifdef LOCK_OR_DELEGATE
 /* These are the arguments passed to _submit_job_enforce_arbitered_deps */
 struct starpu_enforce_arbitered_args
@@ -397,15 +380,17 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 #endif
 	STARPU_ASSERT(arbiter);
 
-	const unsigned nb_non_arbitered_buff = buf;
+	const unsigned start_buf_arbiter = buf;
 	unsigned idx_buf_arbiter;
 	unsigned all_arbiter_available = 1;
 
+	starpu_data_handle_t handle;
+	enum starpu_data_access_mode mode;
 
-	for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
+	for (idx_buf_arbiter = start_buf_arbiter; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
 	{
-		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
-		enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
+		handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
+		mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
 
 		if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle))
 			/* We have already requested this data, skip it. This
@@ -419,18 +404,19 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 			break;
 		}
 
-		/* we post all arbiter  */
+		/* Try to take handle */
 		_starpu_spin_lock(&handle->header_lock);
 		if (handle->refcnt == 0)
 		{
-			handle->refcnt += 1;
-			handle->busy_count += 1;
+			/* Got it */
+			handle->refcnt++;
+			handle->busy_count++;
 			handle->current_mode = mode;
 			_starpu_spin_unlock(&handle->header_lock);
 		}
 		else
 		{
-			/* stop if an handle do not have a refcnt == 0 */
+			/* a handle does not have a refcnt == 0, stop */
 			_starpu_spin_unlock(&handle->header_lock);
 			all_arbiter_available = 0;
 			break;
@@ -438,9 +424,26 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 	}
 	if (all_arbiter_available == 0)
 	{
-		/* Oups cancel all taken and put req in arbiter list */
+		/* Oups, record ourself as waiting for this data */
+
+		struct _starpu_data_requester *r = _starpu_data_requester_new();
+		r->mode = mode;
+		r->is_requested_by_codelet = 1;
+		r->j = j;
+		r->buffer_index = start_buf_arbiter;
+		r->ready_data_callback = NULL;
+		r->argcb = NULL;
+
+		/* store node in list */
+		_starpu_data_requester_list_push_front(&handle->arbitered_req_list, r);
+
+		_starpu_spin_lock(&handle->header_lock);
+		handle->busy_count++;
+		_starpu_spin_unlock(&handle->header_lock);
+
+		/* and cancel all taken */
 		unsigned idx_buf_cancel;
-		for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
+		for (idx_buf_cancel = start_buf_arbiter; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
 		{
 			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
 
@@ -453,33 +456,11 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 			_starpu_spin_lock(&cancel_handle->header_lock);
 			/* reset the counter because finally we do not take the data */
 			STARPU_ASSERT(cancel_handle->refcnt == 1);
-			cancel_handle->refcnt -= 1;
-			_starpu_spin_unlock(&cancel_handle->header_lock);
-		}
-
-		for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < nbuffers ; idx_buf_cancel++)
-		{
-			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
-			enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
-
-			if (cancel_handle->arbiter != arbiter)
-				break;
-
-			struct _starpu_data_requester *r = _starpu_data_requester_new();
-			r->mode = cancel_mode;
-			r->is_requested_by_codelet = 1;
-			r->j = j;
-			r->buffer_index = idx_buf_cancel;
-			r->ready_data_callback = NULL;
-			r->argcb = NULL;
-
-			_starpu_spin_lock(&cancel_handle->header_lock);
-			/* store node in list */
-			_starpu_data_requester_list_push_front(&cancel_handle->arbitered_req_list, r);
-			/* inc the busy count if it has not been changed in the previous loop */
-			if (idx_buf_arbiter <= idx_buf_cancel)
-				cancel_handle->busy_count += 1;
-			_starpu_spin_unlock(&cancel_handle->header_lock);
+			cancel_handle->refcnt--;
+			STARPU_ASSERT(cancel_handle->busy_count > 0);
+			cancel_handle->busy_count--;
+			if (!_starpu_data_check_not_busy(cancel_handle))
+				_starpu_spin_unlock(&cancel_handle->header_lock);
 		}
 
 #ifndef LOCK_OR_DELEGATE
@@ -529,13 +510,11 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 #endif
 		return;
 	}
-	/* no one has the right to work on arbitered_req_list without a lock on mutex
-	   so we do not need to lock the handle for safety */
-	struct _starpu_data_requester *r;
-	for (r = _starpu_data_requester_list_begin(&handle->arbitered_req_list);
-	     r != _starpu_data_requester_list_end(&handle->arbitered_req_list);
-	     r = _starpu_data_requester_list_next(r))
+
+	while (!_starpu_data_requester_list_empty(&handle->arbitered_req_list))
 	{
+		struct _starpu_data_requester *r = _starpu_data_requester_list_pop_front(&handle->arbitered_req_list);
+
 		if (!r->is_requested_by_codelet)
 		{
 			/* data_acquire_cb, process it */
@@ -548,7 +527,6 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 			handle->busy_count++;
 			handle->current_mode = r_mode;
 			_starpu_spin_unlock(&handle->header_lock);
-			_starpu_data_requester_list_erase(&handle->arbitered_req_list, r);
 #ifndef LOCK_OR_DELEGATE
 			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
@@ -563,30 +541,21 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 			return;
 		}
 
+		/* A task waiting for a set of data, try to acquire them */
+
 		struct _starpu_job* j = r->j;
 		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
-		unsigned nb_non_arbitered_buff;
-		/* find the position of arbiter buffers */
-		for (nb_non_arbitered_buff = 0; nb_non_arbitered_buff < nbuffers; nb_non_arbitered_buff++)
-		{
-			starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff);
-			if (nb_non_arbitered_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff-1) == handle_arbiter))
-				/* We have already requested this data, skip it. This
-				 * depends on ordering putting writes before reads, see
-				 * _starpu_compar_handles.  */
-				continue;
-			if (handle_arbiter->arbiter == arbiter)
-			{
-				break;
-			}
-		}
 
 		unsigned idx_buf_arbiter;
 		unsigned all_arbiter_available = 1;
+		starpu_data_handle_t handle_arbiter;
+		enum starpu_data_access_mode mode;
+
+		unsigned start_buf_arbiter = r->buffer_index;
 
-		for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
+		for (idx_buf_arbiter = start_buf_arbiter; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
 		{
-			starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
+			handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
 			if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle_arbiter))
 				/* We have already requested this data, skip it. This
 				 * depends on ordering putting writes before reads, see
@@ -596,9 +565,9 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				/* Will have to process another arbiter, will do that later */
 				break;
 
-			/* we post all arbiter  */
-			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
+			mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
 
+			/* we post all arbiter  */
 			_starpu_spin_lock(&handle_arbiter->header_lock);
 			if (handle_arbiter->refcnt != 0)
 			{
@@ -608,36 +577,23 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				break;
 			}
 			/* mark the handle as taken */
-			handle_arbiter->refcnt += 1;
+			handle_arbiter->refcnt++;
+			handle_arbiter->busy_count++;
 			handle_arbiter->current_mode = mode;
 			_starpu_spin_unlock(&handle_arbiter->header_lock);
 		}
 
 		if (all_arbiter_available)
 		{
-			for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
-			{
-				starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
-				if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle_arbiter))
-					continue;
-				if (handle_arbiter->arbiter != arbiter)
-					break;
-
-				/* we post all arbiter  */
-				enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
-
-				_starpu_spin_lock(&handle_arbiter->header_lock);
-				STARPU_ASSERT(handle_arbiter->refcnt == 1);
-				STARPU_ASSERT( handle_arbiter->busy_count >= 1);
-				STARPU_ASSERT( handle_arbiter->current_mode == mode);
-				const unsigned correctly_deleted = remove_job_from_requester_list(&handle_arbiter->arbitered_req_list, j);
-				STARPU_ASSERT(correctly_deleted == 0);
-				_starpu_spin_unlock(&handle_arbiter->header_lock);
-			}
-			/* Remove and delete list node */
+			/* Success! Drop request */
 			_starpu_data_requester_delete(r);
 
-			/* release global mutex */
+			_starpu_spin_lock(&handle->header_lock);
+			STARPU_ASSERT(handle->busy_count > 0);
+			handle->busy_count--;
+			if (!_starpu_data_check_not_busy(handle))
+				_starpu_spin_unlock(&handle->header_lock);
+
 #ifndef LOCK_OR_DELEGATE
 			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
@@ -653,9 +609,26 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 		}
 		else
 		{
+			/* all handles are not available - record that task on the first unavailable handle */
+
+			/* store node in list */
+			r->mode = mode;
+			_starpu_data_requester_list_push_front(&handle_arbiter->arbitered_req_list, r);
+
+			/* Move check_busy reference too */
+			_starpu_spin_lock(&handle->header_lock);
+			STARPU_ASSERT(handle->busy_count > 0);
+			handle->busy_count--;
+			if (!_starpu_data_check_not_busy(handle))
+				_starpu_spin_unlock(&handle->header_lock);
+
+			_starpu_spin_lock(&handle_arbiter->header_lock);
+			handle_arbiter->busy_count++;
+			_starpu_spin_unlock(&handle_arbiter->header_lock);
+
+			/* and revert the mark */
 			unsigned idx_buf_cancel;
-			/* all handles are not available - revert the mark */
-			for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
+			for (idx_buf_cancel = start_buf_arbiter; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
 			{
 				starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
 				if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==cancel_handle))
@@ -664,8 +637,11 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 					break;
 				_starpu_spin_lock(&cancel_handle->header_lock);
 				STARPU_ASSERT(cancel_handle->refcnt == 1);
-				cancel_handle->refcnt -= 1;
-				_starpu_spin_unlock(&cancel_handle->header_lock);
+				cancel_handle->refcnt--;
+				STARPU_ASSERT(cancel_handle->busy_count > 0);
+				cancel_handle->busy_count--;
+				if (!_starpu_data_check_not_busy(cancel_handle))
+					_starpu_spin_unlock(&cancel_handle->header_lock);
 			}
 		}
 	}