Procházet zdrojové kódy

Make arbiters support concurrent read access. Fixes the matmul socl test which requires it

Samuel Thibault před 8 roky
rodič
revize
0586da60fa

+ 1 - 0
ChangeLog

@@ -39,6 +39,7 @@ Small features:
     performing MPI_Isend call
   * New function starpu_worker_display_names to display the names of
     all the workers of a specified type.
+  * Arbiters now support concurrent read access.
 
 Changes:
   * Vastly improve simgrid simulation time.

+ 65 - 20
src/core/dependencies/data_arbiter_concurrency.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2015-2016  Université de Bordeaux
+ * Copyright (C) 2015-2017  Université de Bordeaux
  * Copyright (C) 2015  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -25,8 +25,6 @@
 
 /* TODO factorize with data_concurrency.c and btw support redux */
 
-/* TODO: fine-grain R/W access */
-
 //#define LOCK_OR_DELEGATE
 
 /*
@@ -44,6 +42,7 @@
  * - for each handle h of T:
  *   - mutex_lock(&arbiter)
  *   - release reference on h
+ *   - call _starpu_notify_arbitered_dependencies which does the following
  *   - for each task Tc waiting for h:
  *     - for each data Tc_h it is waiting for:
  *       - if Tc_h is busy, goto fail
@@ -64,7 +63,7 @@
  *   - mutex_unlock(&arbiter)
  *
  *
- * at submission of task T:
+ * at submission of task T (_starpu_submit_job_enforce_arbitered_deps):
  *
  * - mutex_lock(&arbiter)
  * - for each handle h of T:
@@ -85,8 +84,26 @@
  *   - unlock(h)
  * - mutex_unlock(&arbiter)
  * - return 1;
+ *
+ * at acquire (_starpu_attempt_to_submit_arbitered_data_request):
+ * - mutex_lock(&arbiter)
+ * - try to take a reference on h
+ *   - on failure, record as waiting on h
+ * - mutex_unlock(&arbiter);
+ * - return 0 if succeeded, 1 if failed;
  */
 
+static int _starpu_arbiter_filter_modes(int mode)
+{
+	/* Do not care about some flags */
+	mode &= ~STARPU_COMMUTE;
+	mode &= ~STARPU_SSEND;
+	mode &= ~STARPU_LOCALITY;
+	if (mode == STARPU_RW)
+		mode = STARPU_W;
+	return mode;
+}
+
 struct starpu_arbiter
 {
 #ifdef LOCK_OR_DELEGATE
@@ -186,7 +203,7 @@ static int _starpu_LockOrDelegatePostOrPerform(starpu_arbiter_t arbiter, void (*
 
 #endif
 
-/* Try to submit a data request, in case the request can be processed
+/* Try to submit just one data request, in case the request can be processed
  * immediatly, return 0, if there is still a dependency that is not compatible
  * with the current mode, the request is put in the per-handle list of
  * "requesters", and this function returns 1. */
@@ -257,8 +274,7 @@ unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_
 	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
 #endif // LOCK_OR_DELEGATE
 
-	if (mode == STARPU_RW)
-		mode = STARPU_W;
+	mode = _starpu_arbiter_filter_modes(mode);
 
 	STARPU_ASSERT_MSG(!(mode & STARPU_REDUX), "REDUX with arbiter is not implemented\n");
 
@@ -284,9 +300,21 @@ unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_
 	/* If there is currently nobody accessing the piece of data, or it's
 	 * not another writter and if this is the same type of access as the
 	 * current one, we can proceed. */
-	unsigned put_in_list;
+	unsigned put_in_list = 1;
 
-	if (handle->refcnt)
+	if (((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode))))
+	{
+		/* TODO: Detect whether this is the end of a reduction phase etc. like in data_concurrency.c */
+		if (0)
+		{
+		}
+		else
+		{
+			put_in_list = 0;
+		}
+	}
+
+	if (put_in_list)
 	{
 		/* there cannot be multiple writers or a new writer
 		 * while the data is in read mode */
@@ -394,7 +422,11 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 	for (idx_buf_arbiter = start_buf_arbiter; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
 	{
 		handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
-		mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
+		mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter) & ~STARPU_COMMUTE;
+
+		mode = _starpu_arbiter_filter_modes(mode);
+
+		STARPU_ASSERT_MSG(!(mode & STARPU_REDUX), "REDUX with arbiter is not implemented\n");
 
 		if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle))
 			/* We have already requested this data, skip it. This
@@ -410,12 +442,13 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 
 		/* Try to take handle */
 		_starpu_spin_lock(&handle->header_lock);
-		if (handle->refcnt == 0)
+		if (((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode))))
 		{
 			/* Got it */
 			handle->refcnt++;
 			handle->busy_count++;
-			handle->current_mode = mode;
+			if (mode != STARPU_R || handle->current_mode != mode)
+				handle->current_mode = mode;
 			_starpu_spin_unlock(&handle->header_lock);
 		}
 		else
@@ -528,22 +561,33 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 		{
 			/* data_acquire_cb, process it */
 			enum starpu_data_access_mode r_mode = r->mode;
-			if (r_mode == STARPU_RW)
-				r_mode = STARPU_W;
+			int put_in_list = 1;
+
+			r_mode = _starpu_arbiter_filter_modes(r_mode);
 
 			_starpu_spin_lock(&handle->header_lock);
-			handle->refcnt++;
-			handle->busy_count++;
-			handle->current_mode = r_mode;
+			if (((handle->refcnt == 0) || (!(r_mode == STARPU_W) && (handle->current_mode == r_mode))))
+			{
+				handle->refcnt++;
+				handle->busy_count++;
+				handle->current_mode = r_mode;
+				put_in_list = 0;
+			}
 			_starpu_spin_unlock(&handle->header_lock);
 
+			if (put_in_list)
+				_starpu_data_requester_list_push_front(&l, r);
+
 			/* Put back remaining requests */
 			_starpu_data_requester_list_push_list_back(&handle->arbitered_req_list, &l);
 #ifndef LOCK_OR_DELEGATE
 			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
-			r->ready_data_callback(r->argcb);
-			_starpu_data_requester_delete(r);
+			if (!put_in_list)
+			{
+				r->ready_data_callback(r->argcb);
+				_starpu_data_requester_delete(r);
+			}
 
 			_starpu_spin_lock(&handle->header_lock);
 			STARPU_ASSERT(handle->busy_count > 0);
@@ -578,10 +622,11 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				break;
 
 			mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
+			mode = _starpu_arbiter_filter_modes(mode);
 
 			/* we post all arbiter  */
 			_starpu_spin_lock(&handle_arbiter->header_lock);
-			if (handle_arbiter->refcnt != 0)
+			if (!((handle_arbiter->refcnt == 0) || (!(mode == STARPU_W) && (handle_arbiter->current_mode == mode))))
 			{
 				/* handle is not available, record ourself */
 				_starpu_spin_unlock(&handle_arbiter->header_lock);

+ 5 - 1
src/core/dependencies/data_concurrency.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2015  Université de Bordeaux
+ * Copyright (C) 2010-2015, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  CNRS
  * Copyright (C) 2015  Inria
  *
@@ -118,6 +118,10 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 	if (handle->arbiter)
 		return _starpu_attempt_to_submit_arbitered_data_request(request_from_codelet, handle, mode, callback, argcb, j, buffer_index);
 
+	/* Do not care about some flags */
+	mode &= STARPU_COMMUTE;
+	mode &= STARPU_SSEND;
+	mode &= STARPU_LOCALITY;
 	if (mode == STARPU_RW)
 		mode = STARPU_W;