소스 검색

Sort data access requests by priority

Samuel Thibault 6 년 전
부모
커밋
424238f6a1

+ 13 - 11
src/core/dependencies/data_arbiter_concurrency.c

@@ -327,10 +327,11 @@ unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_
 		r->is_requested_by_codelet = request_from_codelet;
 		r->j = j;
 		r->buffer_index = buffer_index;
+		r->prio = j->task->priority;
 		r->ready_data_callback = callback;
 		r->argcb = argcb;
 
-		_starpu_data_requester_list_push_back(&handle->arbitered_req_list, r);
+		_starpu_data_requester_prio_list_push_back(&handle->arbitered_req_list, r);
 
 		/* failed */
 		put_in_list = 1;
@@ -470,11 +471,12 @@ void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned b
 		r->is_requested_by_codelet = 1;
 		r->j = j;
 		r->buffer_index = start_buf_arbiter;
+		r->prio = j->task->priority;
 		r->ready_data_callback = NULL;
 		r->argcb = NULL;
 
 		/* store node in list */
-		_starpu_data_requester_list_push_front(&handle->arbitered_req_list, r);
+		_starpu_data_requester_prio_list_push_front(&handle->arbitered_req_list, r);
 
 		_starpu_spin_lock(&handle->header_lock);
 		handle->busy_count++;
@@ -542,7 +544,7 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 #endif
 
 	/* Since the request has been posted the handle may have been proceed and released */
-	if (_starpu_data_requester_list_empty(&handle->arbitered_req_list))
+	if (_starpu_data_requester_prio_list_empty(&handle->arbitered_req_list))
 	{
 		/* No waiter, just remove our reference */
 		_starpu_spin_lock(&handle->header_lock);
@@ -573,12 +575,12 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 
 	/* Note: we may be putting back our own requests, so avoid looping by
 	 * extracting the list */
-	struct _starpu_data_requester_list l = handle->arbitered_req_list;
-	_starpu_data_requester_list_init(&handle->arbitered_req_list);
+	struct _starpu_data_requester_prio_list l = handle->arbitered_req_list;
+	_starpu_data_requester_prio_list_init(&handle->arbitered_req_list);
 
-	while (!_starpu_data_requester_list_empty(&l))
+	while (!_starpu_data_requester_prio_list_empty(&l))
 	{
-		struct _starpu_data_requester *r = _starpu_data_requester_list_pop_front(&l);
+		struct _starpu_data_requester *r = _starpu_data_requester_prio_list_pop_front_highest(&l);
 
 		if (!r->is_requested_by_codelet)
 		{
@@ -599,10 +601,10 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 			_starpu_spin_unlock(&handle->header_lock);
 
 			if (put_in_list)
-				_starpu_data_requester_list_push_front(&l, r);
+				_starpu_data_requester_prio_list_push_front(&l, r);
 
 			/* Put back remaining requests */
-			_starpu_data_requester_list_push_list_back(&handle->arbitered_req_list, &l);
+			_starpu_data_requester_prio_list_push_prio_list_back(&handle->arbitered_req_list, &l);
 #ifndef LOCK_OR_DELEGATE
 			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
@@ -676,7 +678,7 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 				_starpu_spin_unlock(&handle->header_lock);
 
 			/* Put back remaining requests */
-			_starpu_data_requester_list_push_list_back(&handle->arbitered_req_list, &l);
+			_starpu_data_requester_prio_list_push_prio_list_back(&handle->arbitered_req_list, &l);
 #ifndef LOCK_OR_DELEGATE
 			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
 #endif
@@ -696,7 +698,7 @@ void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
 
 			/* store node in list */
 			r->mode = mode;
-			_starpu_data_requester_list_push_front(&handle_arbiter->arbitered_req_list, r);
+			_starpu_data_requester_prio_list_push_front(&handle_arbiter->arbitered_req_list, r);
 
 			/* Move check_busy reference too */
 			_starpu_spin_lock(&handle->header_lock);

+ 13 - 12
src/core/dependencies/data_concurrency.c

@@ -63,7 +63,7 @@
 /* the handle header lock must be taken by the caller */
 static struct _starpu_data_requester *may_unlock_data_req_list_head(starpu_data_handle_t handle)
 {
-	struct _starpu_data_requester_list *req_list;
+	struct _starpu_data_requester_prio_list *req_list;
 
 	if (handle->reduction_refcnt > 0)
 	{
@@ -71,26 +71,26 @@ static struct _starpu_data_requester *may_unlock_data_req_list_head(starpu_data_
 	}
 	else
 	{
-		if (_starpu_data_requester_list_empty(&handle->reduction_req_list))
+		if (_starpu_data_requester_prio_list_empty(&handle->reduction_req_list))
 			req_list = &handle->req_list;
 		else
 			req_list = &handle->reduction_req_list;
 	}
 
 	/* if there is no one to unlock ... */
-	if (_starpu_data_requester_list_empty(req_list))
+	if (_starpu_data_requester_prio_list_empty(req_list))
 		return NULL;
 
 	/* if there is no reference to the data anymore, we can use it */
 	if (handle->refcnt == 0)
-		return _starpu_data_requester_list_pop_front(req_list);
+		return _starpu_data_requester_prio_list_pop_front_highest(req_list);
 
 	/* Already writing to it, do not let another write access through */
 	if (handle->current_mode == STARPU_W)
 		return NULL;
 
 	/* data->current_mode == STARPU_R, so we can process more readers */
-	struct _starpu_data_requester *r = _starpu_data_requester_list_front(req_list);
+	struct _starpu_data_requester *r = _starpu_data_requester_prio_list_front_highest(req_list);
 
 	enum starpu_data_access_mode r_mode = r->mode;
 	if (r_mode == STARPU_RW)
@@ -100,7 +100,7 @@ static struct _starpu_data_requester *may_unlock_data_req_list_head(starpu_data_
 	 * access, we only proceed if the current mode is the same as the
 	 * requested mode. */
 	if (r_mode == handle->current_mode)
-		return _starpu_data_requester_list_pop_front(req_list);
+		return _starpu_data_requester_prio_list_pop_front_highest(req_list);
 	else
 		return NULL;
 }
@@ -196,14 +196,15 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 		r->is_requested_by_codelet = request_from_codelet;
 		r->j = j;
 		r->buffer_index = buffer_index;
+		r->prio = j->task->priority;
 		r->ready_data_callback = callback;
 		r->argcb = argcb;
 
 		/* We put the requester in a specific list if this is a reduction task */
-		struct _starpu_data_requester_list *req_list =
+		struct _starpu_data_requester_prio_list *req_list =
 			is_a_reduction_task?&handle->reduction_req_list:&handle->req_list;
 
-		_starpu_data_requester_list_push_back(req_list, r);
+		_starpu_data_requester_prio_list_push_back(req_list, r);
 
 		/* failed */
 		put_in_list = 1;
@@ -405,8 +406,8 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 	{
 		/* Keep our reference for now, _starpu_notify_arbitered_dependencies
 		 * will drop it when it needs to */
-		STARPU_ASSERT(_starpu_data_requester_list_empty(&handle->req_list));
-		STARPU_ASSERT(_starpu_data_requester_list_empty(&handle->reduction_req_list));
+		STARPU_ASSERT(_starpu_data_requester_prio_list_empty(&handle->req_list));
+		STARPU_ASSERT(_starpu_data_requester_prio_list_empty(&handle->reduction_req_list));
 		_starpu_spin_unlock(&handle->header_lock);
 		/* _starpu_notify_arbitered_dependencies will handle its own locking */
 		_starpu_notify_arbitered_dependencies(handle);
@@ -423,7 +424,7 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 		/* Handle was destroyed, nothing left to do.  */
 		return 1;
 
-	STARPU_ASSERT(_starpu_data_requester_list_empty(&handle->arbitered_req_list));
+	STARPU_ASSERT(_starpu_data_requester_prio_list_empty(&handle->arbitered_req_list));
 
 	/* In case there is a pending reduction, and that this is the last
 	 * requester, we may go back to a "normal" coherency model. */
@@ -472,7 +473,7 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 		{
 			/* We need to put the request back because we must
 			 * perform a reduction before. */
-			_starpu_data_requester_list_push_front(&handle->req_list, r);
+			_starpu_data_requester_prio_list_push_front(&handle->req_list, r);
 		}
 		else
 		{

+ 4 - 4
src/datawizard/coherency.h

@@ -86,7 +86,7 @@ struct _starpu_data_replicate
 	struct _starpu_mem_chunk * mc;
 };
 
-struct _starpu_data_requester_list;
+struct _starpu_data_requester_prio_list;
 
 struct _starpu_jobid_list
 {
@@ -117,7 +117,7 @@ typedef void (*_starpu_data_handle_unregister_hook)(starpu_data_handle_t);
 struct _starpu_data_state
 {
 	int magic;
-	struct _starpu_data_requester_list req_list;
+	struct _starpu_data_requester_prio_list req_list;
 	/* the number of requests currently in the scheduling engine (not in
 	 * the req_list anymore), i.e. the number of holders of the
 	 * current_mode rwlock */
@@ -245,7 +245,7 @@ struct _starpu_data_state
 	/* List of requesters that are specific to the pending reduction. This
 	 * list is used when the requests in the req_list list are frozen until
 	 * the end of the reduction. */
-	struct _starpu_data_requester_list reduction_req_list;
+	struct _starpu_data_requester_prio_list reduction_req_list;
 
 	starpu_data_handle_t *reduction_tmp_handles;
 
@@ -270,7 +270,7 @@ struct _starpu_data_state
 
 	struct starpu_arbiter *arbiter;
 	/* This is protected by the arbiter mutex */
-	struct _starpu_data_requester_list arbitered_req_list;
+	struct _starpu_data_requester_prio_list arbitered_req_list;
 
 	/* Data maintained by schedulers themselves */
 	/* Last worker that took this data in locality mode, or -1 if nobody

+ 4 - 1
src/datawizard/data_request.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2008-2011,2013-2017                      Université de Bordeaux
+ * Copyright (C) 2008-2011,2013-2018                      Université de Bordeaux
  * Copyright (C) 2010-2011,2013,2015,2017                 CNRS
  * Copyright (C) 2017                                     Inria
  *
@@ -121,12 +121,15 @@ LIST_TYPE(_starpu_data_requester,
 	struct _starpu_job *j;
 	unsigned buffer_index;
 
+	int prio;
+
 	/* if this is more complicated ... (eg. application request) 
 	 * NB: this callback is not called with the lock taken !
 	 */
 	void (*ready_data_callback)(void *argcb);
 	void *argcb;
 )
+PRIO_LIST_TYPE(_starpu_data_requester, prio)
 
 void _starpu_init_data_request_lists(void);
 void _starpu_deinit_data_request_lists(void);

+ 3 - 3
src/datawizard/filters.c

@@ -257,8 +257,8 @@ static void _starpu_data_partition(starpu_data_handle_t initial_handle, starpu_d
 		child->home_node = initial_handle->home_node;
 
 		/* initialize the chunk lock */
-		_starpu_data_requester_list_init(&child->req_list);
-		_starpu_data_requester_list_init(&child->reduction_req_list);
+		_starpu_data_requester_prio_list_init(&child->req_list);
+		_starpu_data_requester_prio_list_init(&child->reduction_req_list);
 		child->reduction_tmp_handles = NULL;
 		child->write_invalidation_req = NULL;
 		child->refcnt = 0;
@@ -300,7 +300,7 @@ static void _starpu_data_partition(starpu_data_handle_t initial_handle, starpu_d
 			starpu_data_assign_arbiter(child, _starpu_global_arbiter);
 		else
 			child->arbiter = NULL;
-		_starpu_data_requester_list_init(&child->arbitered_req_list);
+		_starpu_data_requester_prio_list_init(&child->arbitered_req_list);
 
 		for (node = 0; node < STARPU_MAXNODES; node++)
 		{

+ 3 - 3
src/datawizard/interfaces/data_interface.c

@@ -262,7 +262,7 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 	STARPU_ASSERT(handle);
 
 	/* initialize the new lock */
-	_starpu_data_requester_list_init(&handle->req_list);
+	_starpu_data_requester_prio_list_init(&handle->req_list);
 	handle->refcnt = 0;
 	handle->unlocking_reqs = 0;
 	handle->busy_count = 0;
@@ -316,7 +316,7 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 	handle->init_cl = NULL;
 
 	handle->reduction_refcnt = 0;
-	_starpu_data_requester_list_init(&handle->reduction_req_list);
+	_starpu_data_requester_prio_list_init(&handle->reduction_req_list);
 	handle->reduction_tmp_handles = NULL;
 	handle->write_invalidation_req = NULL;
 
@@ -339,7 +339,7 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 		starpu_data_assign_arbiter(handle, _starpu_global_arbiter);
 	else
 		handle->arbiter = NULL;
-	_starpu_data_requester_list_init(&handle->arbitered_req_list);
+	_starpu_data_requester_prio_list_init(&handle->arbitered_req_list);
 	handle->last_locality = -1;
 
 	/* that new data is invalid from all nodes perpective except for the