Selaa lähdekoodia

Complete arbiter implementation

Samuel Thibault 10 vuotta sitten
vanhempi
commit
8fbc9b2c5a

+ 171 - 31
src/core/dependencies/data_arbiter_concurrency.c

@@ -21,6 +21,8 @@
 #include <common/starpu_spinlock.h>
 #include <datawizard/sort_data_handles.h>
 
+/* TODO factorize with data_concurrency.c and btw support redux */
+
 //#define LOCK_OR_DELEGATE
 
 /*
@@ -179,6 +181,158 @@ static int _starpu_LockOrDelegatePostOrPerform(starpu_arbiter_t arbiter, void (*
 
 #endif
 
+/* Try to submit a data request. If the request can be processed
+ * immediately, return 0. If there is still a dependency that is not compatible
+ * with the current mode, the request is put in the per-handle list of
+ * "requesters", and this function returns 1. */
+#ifdef LOCK_OR_DELEGATE
+struct starpu_submit_arbitered_args
+{
+	unsigned request_from_codelet;
+	starpu_data_handle_t handle;
+	enum starpu_data_access_mode mode;
+	void (*callback)(void *);
+	void *argcb;
+	struct _starpu_job *j;
+	unsigned buffer_index;
+};
+static unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index);
+static void __starpu_attempt_to_submit_arbitered_data_request(void *inData)
+{
+	struct starpu_submit_arbitered_args* args = inData;
+	unsigned request_from_codelet = args->request_from_codelet;
+	starpu_data_handle_t handle = args->handle;
+	enum starpu_data_access_mode mode = args->mode;
+	void (*callback)(void*) = args->callback;
+	void *argcb = args->argcb;
+	struct _starpu_job *j = args->j;
+	unsigned buffer_index = args->buffer_index;
+	free(args);
+	if (!___starpu_attempt_to_submit_arbitered_data_request(request_from_codelet, handle, mode, callback, argcb, j, buffer_index))
+		/* Success, but we have no way to report it to the original caller,
+		 * so call the callback ourselves */
+		callback(argcb);
+}
+
+unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index)
+{
+	struct starpu_submit_arbitered_args* args = malloc(sizeof(*args));
+	args->request_from_codelet = request_from_codelet;
+	args->handle = handle;
+	args->mode = mode;
+	args->callback = callback;
+	args->argcb = argcb;
+	args->j = j;
+	args->buffer_index = buffer_index;
+	/* The function will delete args */
+	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_attempt_to_submit_arbitered_data_request, args);
+	return 1;
+}
+
+unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index)
+{
+	STARPU_ASSERT(handle->arbiter);
+#else // LOCK_OR_DELEGATE
+unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index)
+{
+	starpu_arbiter_t arbiter = handle->arbiter;
+	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
+#endif // LOCK_OR_DELEGATE
+
+	if (mode == STARPU_RW)
+		mode = STARPU_W;
+
+	STARPU_ASSERT_MSG(!(mode & STARPU_REDUX), "REDUX with arbiter is not implemented\n");
+
+	/* Take the lock protecting the header. We try to do some progression
+	 * in case this is called from a worker, otherwise we just wait for the
+	 * lock to be available. */
+	if (request_from_codelet)
+	{
+		int cpt = 0;
+		while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
+		{
+			cpt++;
+			_starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
+		}
+		if (cpt == STARPU_SPIN_MAXTRY)
+			_starpu_spin_lock(&handle->header_lock);
+	}
+	else
+	{
+		_starpu_spin_lock(&handle->header_lock);
+	}
+
+	/* With an arbiter, we can proceed only if nobody is currently
+	 * accessing the piece of data (refcnt is zero); otherwise the request
+	 * is queued in the handle's list of arbitered requesters. */
+	unsigned put_in_list;
+
+	if (handle->refcnt)
+	{
+		/* there cannot be multiple writers or a new writer
+		 * while the data is in read mode */
+
+		handle->busy_count++;
+		/* enqueue the request */
+		struct _starpu_data_requester *r = _starpu_data_requester_new();
+		r->mode = mode;
+		r->is_requested_by_codelet = request_from_codelet;
+		r->j = j;
+		r->buffer_index = buffer_index;
+		r->ready_data_callback = callback;
+		r->argcb = argcb;
+
+		_starpu_data_requester_list_push_back(handle->arbitered_req_list, r);
+
+		/* failed */
+		put_in_list = 1;
+	}
+	else
+	{
+		handle->refcnt++;
+		handle->busy_count++;
+
+		/* Do not write to handle->current_mode if it is already
+		 * R. This avoids a spurious warning from helgrind when
+		 * the following happens:
+		 * acquire(R) in thread A
+		 * acquire(R) in thread B
+		 * release_data_on_node() in thread A
+		 * helgrind would shout that the latter reads current_mode
+		 * unsafely.
+		 *
+		 * This basically tells helgrind that this is a
+		 * shared R acquisition.
+		 */
+		if (mode != STARPU_R || handle->current_mode != mode)
+			handle->current_mode = mode;
+
+		/* success */
+		put_in_list = 0;
+	}
+
+	_starpu_spin_unlock(&handle->header_lock);
+#ifndef LOCK_OR_DELEGATE
+	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif // LOCK_OR_DELEGATE
+	return put_in_list;
+
+}
+
+
 
 /* This function find a node that contains the parameter j as job and remove it from the list
  * the function return 0 if a node was found and deleted, 1 otherwise
@@ -210,14 +364,12 @@ struct starpu_enforce_arbitered_args
 static void ___starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);
 static void __starpu_submit_job_enforce_arbitered_deps(void* inData)
 {
-	struct starpu_enforce_arbitered_args* args = (struct starpu_enforce_arbitered_args*)inData;
+	struct starpu_enforce_arbitered_args* args = inData;
 	struct _starpu_job *j = args->j;
 	unsigned buf		  = args->buf;
 	unsigned nbuffers	 = args->nbuffers;
 	/* we are in charge of freeing the args */
 	free(args);
-	args = NULL;
-	inData = NULL;
 	___starpu_submit_job_enforce_arbitered_deps(j, buf, nbuffers);
 }
 
@@ -320,9 +472,6 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 			r->argcb = NULL;
 
 			_starpu_spin_lock(&cancel_handle->header_lock);
-			/* create list if needed */
-			if (cancel_handle->arbitered_req_list == NULL)
-				cancel_handle->arbitered_req_list = _starpu_data_requester_list_new();
 			/* store node in list */
 			_starpu_data_requester_list_push_front(cancel_handle->arbitered_req_list, r);
 			/* inc the busy count if it has not been changed in the previous loop */
@@ -334,7 +483,7 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 #ifndef LOCK_OR_DELEGATE
 		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
-		return 1;
+		return;
 	}
 #ifndef LOCK_OR_DELEGATE
 	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
@@ -342,20 +491,18 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 
 	// all_arbiter_available is true
 	if (idx_buf_arbiter < nbuffers)
-	{
 		/* Other arbitered data, process them */
-		return _starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
-	}
-	/* Finished with all data, can eventually push! */
-	_starpu_push_task(j);
-	return 0;
+		_starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
+	else
+		/* Finished with all data, can eventually push! */
+		_starpu_push_task(j);
 }
 
 #ifdef LOCK_OR_DELEGATE
 void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle);
 void __starpu_notify_arbitered_dependencies(void* inData)
 {
-	starpu_data_handle_t handle = (starpu_data_handle_t)inData;
+	starpu_data_handle_t handle = inData;
 	___starpu_notify_arbitered_dependencies(handle);
 }
 void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
@@ -363,29 +510,28 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_notify_arbitered_dependencies, handle);
 }
 void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
-{
 #else // LOCK_OR_DELEGATE
 void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
-{
 #endif
+{
 	starpu_arbiter_t arbiter = handle->arbiter;
 #ifndef LOCK_OR_DELEGATE
 	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
 #endif
 
 	/* Since the request has been posted the handle may have been proceed and released */
-	if (handle->arbitered_req_list == NULL)
+	if (_starpu_data_requester_list_empty(handle->arbitered_req_list))
 	{
 #ifndef LOCK_OR_DELEGATE
 		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
-		return 1;
+		return;
 	}
 	/* no one has the right to work on arbitered_req_list without a lock on mutex
 	   so we do not need to lock the handle for safety */
 	struct _starpu_data_requester *r;
 	r = _starpu_data_requester_list_begin(handle->arbitered_req_list); //_head;
-	while (r)
+	while (r != _starpu_data_requester_list_end(handle->arbitered_req_list))
 	{
 		struct _starpu_job* j = r->j;
 		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
@@ -399,7 +545,6 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				 * depends on ordering putting writes before reads, see
 				 * _starpu_compar_handles.  */
 				continue;
-			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, nb_non_arbitered_buff);
 			if (handle_arbiter->arbiter == arbiter)
 			{
 				break;
@@ -457,11 +602,6 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				STARPU_ASSERT( handle_arbiter->current_mode == mode);
 				const unsigned correctly_deleted = remove_job_from_requester_list(handle_arbiter->arbitered_req_list, j);
 				STARPU_ASSERT(correctly_deleted == 0);
-				if (_starpu_data_requester_list_empty(handle_arbiter->arbitered_req_list)) // If size == 0
-				{
-					_starpu_data_requester_list_delete(handle_arbiter->arbitered_req_list);
-					handle_arbiter->arbitered_req_list = NULL;
-				}
 				_starpu_spin_unlock(&handle_arbiter->header_lock);
 			}
 			/* Remove and delete list node */
@@ -473,16 +613,13 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 #endif
 
 			if (idx_buf_arbiter < nbuffers)
-			{
 				/* Other arbitered data, process them */
 				_starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
-			}
 			else
 				/* Finished with all data, can eventually push! */
 				_starpu_push_task(j);
 
-			/* We need to lock when returning 0 */
-			return 0;
+			return;
 		}
 		else
 		{
@@ -502,13 +639,13 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 			}
 		}
 
-		r = r->_next;
+		r = _starpu_data_requester_list_next(r);
 	}
 	/* no task has been pushed */
 #ifndef LOCK_OR_DELEGATE
 	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
-	return 1;
+	return;
 }
 
 starpu_arbiter_t starpu_arbiter_create(void)
@@ -528,6 +665,9 @@ starpu_arbiter_t starpu_arbiter_create(void)
 
 void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter)
 {
+	if (handle->arbiter && handle->arbiter == _starpu_global_arbiter)
+		/* Just for testing purposes */
+		return;
 	STARPU_ASSERT_MSG(!handle->arbiter, "handle can only be assigned one arbiter");
 	STARPU_ASSERT_MSG(!handle->refcnt, "arbiter can be assigned to handle only right after initialization");
 	STARPU_ASSERT_MSG(!handle->busy_count, "arbiter can be assigned to handle only right after initialization");

+ 15 - 16
src/core/dependencies/data_concurrency.c

@@ -113,9 +113,8 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 						       void (*callback)(void *), void *argcb,
 						       struct _starpu_job *j, unsigned buffer_index)
 {
-	/* TODO: implement */
 	if (handle->arbiter)
-		_STARPU_DISP("data acquisition not completely safe with arbitered handles\n");
+		return _starpu_attempt_to_submit_arbitered_data_request(request_from_codelet, handle, mode, callback, argcb, j, buffer_index);
 
 	if (mode == STARPU_RW)
 		mode = STARPU_W;
@@ -354,6 +353,20 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 		/* Handle was destroyed, nothing left to do.  */
 		return 1;
 
+	if (handle->arbiter)
+	{
+		unsigned refcnt = handle->refcnt;
+		STARPU_ASSERT(_starpu_data_requester_list_empty(handle->req_list));
+		STARPU_ASSERT(_starpu_data_requester_list_empty(handle->reduction_req_list));
+		_starpu_spin_unlock(&handle->header_lock);
+		/* _starpu_notify_arbitered_dependencies will handle its own locking */
+		if (!refcnt)
+			_starpu_notify_arbitered_dependencies(handle);
+		/* We have already unlocked */
+		return 1;
+	}
+	STARPU_ASSERT(_starpu_data_requester_list_empty(handle->arbitered_req_list));
+
 	/* In case there is a pending reduction, and that this is the last
 	 * requester, we may go back to a "normal" coherency model. */
 	if (handle->reduction_refcnt > 0)
@@ -434,19 +447,5 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 		}
 	}
 
-	// WIP_COMMUTE Begin
-
-	/* TODO: directly call that */
-
-	if(handle->refcnt == 0 && handle->arbitered_req_list != NULL)
-	{
-		_starpu_spin_unlock(&handle->header_lock);
-		/* _starpu_notify_arbitered_dependencies will handle its own locking */
-		_starpu_notify_arbitered_dependencies(handle);
-		/* We have already unlocked */
-		return 1;
-	}
-	// WIP_COMMUTE End
-
 	return 0;
 }

+ 5 - 0
src/core/dependencies/data_concurrency.h

@@ -31,5 +31,10 @@ unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle_t h
 							  enum starpu_data_access_mode mode,
 							  void (*callback)(void *), void *argcb);
 
+unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index);
+
 #endif // __DATA_CONCURRENCY_H__
 

+ 1 - 0
src/datawizard/coherency.h

@@ -233,6 +233,7 @@ struct _starpu_data_state
 	_starpu_data_handle_unregister_hook unregister_hook;
 
 	struct starpu_arbiter *arbiter;
+	/* This is protected by the arbiter mutex */
 	struct _starpu_data_requester_list *arbitered_req_list;
 };
 

+ 7 - 0
src/datawizard/filters.c

@@ -209,6 +209,13 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 		child->last_submitted_ghost_accessors_id = NULL;
 #endif
 
+		if (_starpu_global_arbiter)
+			/* Just for testing purposes */
+			starpu_data_assign_arbiter(child, _starpu_global_arbiter);
+		else
+			child->arbiter = NULL;
+		child->arbitered_req_list = _starpu_data_requester_list_new();
+
 		for (node = 0; node < STARPU_MAXNODES; node++)
 		{
 			struct _starpu_data_replicate *initial_replicate;

+ 13 - 0
src/datawizard/interfaces/data_interface.c

@@ -41,12 +41,17 @@ struct handle_entry
 static struct handle_entry *registered_handles;
 static struct _starpu_spinlock    registered_handles_lock;
 static int _data_interface_number = STARPU_MAX_INTERFACE_ID;
+starpu_arbiter_t _starpu_global_arbiter;
 
 static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned coherent, unsigned nowait);
 
 void _starpu_data_interface_init(void)
 {
 	_starpu_spin_init(&registered_handles_lock);
+
+	/* Just for testing purposes */
+	if (starpu_get_env_number_default("STARPU_ARBITER", 0) > 0)
+		_starpu_global_arbiter = starpu_arbiter_create();
 }
 
 void _starpu_data_interface_shutdown()
@@ -291,6 +296,13 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 
 	handle->home_node = home_node;
 
+	if (_starpu_global_arbiter)
+		/* Just for testing purposes */
+		starpu_data_assign_arbiter(handle, _starpu_global_arbiter);
+	else
+		handle->arbiter = NULL;
+	handle->arbitered_req_list = _starpu_data_requester_list_new();
+
 	/* that new data is invalid from all nodes perpective except for the
 	 * home node */
 	unsigned node;
@@ -791,6 +803,7 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 	_starpu_data_free_interfaces(handle);
 
 	_starpu_memory_stats_free(handle);
+	_starpu_data_requester_list_delete(handle->arbitered_req_list);
 	_starpu_data_requester_list_delete(handle->req_list);
 	_starpu_data_requester_list_delete(handle->reduction_req_list);
 

+ 2 - 1
src/datawizard/interfaces/data_interface.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2012, 2014  Université de Bordeaux
+ * Copyright (C) 2009-2012, 2014-2015  Université de Bordeaux
  * Copyright (C) 2010, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
  * Copyright (C) 2014  Inria
  *
@@ -57,6 +57,7 @@ void _starpu_data_free_interfaces(starpu_data_handle_t handle)
 extern
 int _starpu_data_handle_init(starpu_data_handle_t handle, struct starpu_data_interface_ops *interface_ops, unsigned int mf_node);
 
+extern struct starpu_arbiter *_starpu_global_arbiter; /* declaration only; defined once in data_interface.c */
 extern void _starpu_data_interface_init(void) STARPU_ATTRIBUTE_INTERNAL;
 extern int _starpu_data_check_not_busy(starpu_data_handle_t handle) STARPU_ATTRIBUTE_INTERNAL;
 extern void _starpu_data_interface_shutdown(void) STARPU_ATTRIBUTE_INTERNAL;