Browse Source

- Add an "ndeps" field to the data request structure so that a request may
depend on multiple other requests.
- Also do minor code cleanups.

Cédric Augonnet 14 years ago
parent
commit
6290ece0a1

+ 92 - 80
src/datawizard/coherency.c

@@ -148,6 +148,94 @@ void _starpu_update_data_state(starpu_data_handle handle,
  * 		    else (invalid,owner->shared)
  */
 
+static starpu_data_request_t create_new_request_to_fetch_data(starpu_data_handle handle,
+				struct starpu_data_replicate_s *dst_replicate,
+                                starpu_access_mode mode, unsigned is_prefetch,
+                                void (*callback_func)(void *), void *callback_arg)
+{	/* Create the data request(s) needed to bring `handle` onto the memory
+	 * node of dst_replicate, attach callback_func/callback_arg to the
+	 * request the caller is interested in, and return that request.
+	 * A GPU->GPU read is split into two chained requests going through
+	 * node 0; every other case uses a single request.
+	 * This function releases handle->header_lock on both paths without
+	 * ever acquiring it, so the caller presumably holds it on entry --
+	 * TODO confirm at the call site. */
+	starpu_data_request_t r;
+	unsigned requesting_node = dst_replicate->memory_node;
+
+	/* find a node that already has the data; node 0 is the default */
+	uint32_t src_node = 0;
+
+	/* If the data is accessed in write-only mode there is no need for a
+	 * source: src_node then stays 0 and src_replicate below is simply the
+	 * replicate of node 0. */
+	if (mode & STARPU_R)
+	{
+		src_node = _starpu_select_src_node(handle);
+		STARPU_ASSERT(src_node != requesting_node);
+	}
+
+	unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
+	unsigned dst_is_a_gpu = (_starpu_get_node_kind(requesting_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(requesting_node) == STARPU_OPENCL_RAM);
+
+	struct starpu_data_replicate_s *src_replicate = handle->per_node[src_node];
+
+	/* We have to perform 2 successive requests for GPU->GPU transfers:
+	 * source GPU -> RAM (r_src_to_ram), then RAM -> destination GPU
+	 * (r_ram_to_dst). This only applies when the data is actually read. */
+	if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
+		unsigned reuse_r_src_to_ram;
+		starpu_data_request_t r_src_to_ram;
+		starpu_data_request_t r_ram_to_dst;
+
+		struct starpu_data_replicate_s *ram_replicate = handle->per_node[0];
+
+		/* XXX we hard-code 0 as the RAM node ... */
+		/* We put a 1 in the number of dependencies because this
+		 * depends on the r_src_to_ram request:
+		 * _starpu_post_data_request() returns without posting while
+		 * ndeps > 0, and the counter is decremented when the request
+		 * this one is chained to completes. */
+		r_ram_to_dst = _starpu_create_data_request(handle, ram_replicate,
+					dst_replicate, requesting_node, mode, 1, is_prefetch);
+
+		if (!is_prefetch)
+			r_ram_to_dst->refcnt++; /* presumably because a non-prefetch caller waits on it -- TODO confirm */
+
+		r_src_to_ram = _starpu_search_existing_data_request(ram_replicate, mode);
+
+		reuse_r_src_to_ram = r_src_to_ram?1:0; /* 1 if a pending request towards RAM was found */
+
+		if (!r_src_to_ram)
+		{
+			r_src_to_ram = _starpu_create_data_request(handle, src_replicate,
+						ram_replicate, src_node, mode, 0, is_prefetch);
+		}
+
+		/* We chain both requests: r_ram_to_dst is recorded as a
+		 * successor of r_src_to_ram.
+		 * NOTE(review): next_req is declared with STARPU_MAXNODES
+		 * entries and no bound check on next_req_count is visible
+		 * here -- confirm it cannot overflow. */
+		r_src_to_ram->next_req[r_src_to_ram->next_req_count++]= r_ram_to_dst;
+
+		_starpu_data_request_append_callback(r_ram_to_dst, callback_func, callback_arg);
+
+		if (reuse_r_src_to_ram)
+			_starpu_spin_unlock(&r_src_to_ram->lock); /* lock presumably taken by the search above when a request is found */
+
+		_starpu_spin_unlock(&handle->header_lock);
+
+		/* We only submit the first request; the chained one is
+		 * submitted once the first has completed. A reused
+		 * r_src_to_ram is not posted again from here. */
+		if (!reuse_r_src_to_ram)
+			_starpu_post_data_request(r_src_to_ram, src_node);
+
+		/* the application only waits for the termination of the last request */
+		r = r_ram_to_dst;
+	}
+	else {
+		/* Single request: pick which node will perform the transfer. */
+		uint32_t handling_node =
+			_starpu_select_node_to_handle_request(src_node, requesting_node);
+
+		r = _starpu_create_data_request(handle, src_replicate,
+				dst_replicate, handling_node, mode, 0, is_prefetch);
+
+		_starpu_data_request_append_callback(r, callback_func, callback_arg);
+
+		if (!is_prefetch)
+			r->refcnt++; /* presumably because a non-prefetch caller waits on it -- TODO confirm */
+
+		_starpu_spin_unlock(&handle->header_lock);
+
+		_starpu_post_data_request(r, handling_node);
+	}
+
+	return r;
+}
+
 int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *dst_replicate,
 				starpu_access_mode mode, unsigned is_prefetch,
 				void (*callback_func)(void *), void *callback_arg)
@@ -186,83 +274,10 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_rep
 
 	/* is there already a pending request ? */
 	r = _starpu_search_existing_data_request(dst_replicate, mode);
-	/* at the exit of _starpu_search_existing_data_request the lock is taken is the request existed ! */
+	/* at the exit of _starpu_search_existing_data_request the lock is taken if the request existed ! */
 
 	if (!r) {
-		/* find someone who already has the data */
-		uint32_t src_node = 0;
-
-		/* if the data is in write only mode, there is no need for a source */
-		if (mode & STARPU_R)
-		{
-			src_node = _starpu_select_src_node(handle);
-			STARPU_ASSERT(src_node != requesting_node);
-		}
-	
-		unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
-		unsigned dst_is_a_gpu = (_starpu_get_node_kind(requesting_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(requesting_node) == STARPU_OPENCL_RAM);
-
-		struct starpu_data_replicate_s *src_replicate = handle->per_node[src_node];
-
-		/* we have to perform 2 successive requests for GPU->GPU transfers */
-		if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
-			unsigned reuse_r_src_to_ram;
-			starpu_data_request_t r_src_to_ram;
-			starpu_data_request_t r_ram_to_dst;
-
-			struct starpu_data_replicate_s *ram_replicate = handle->per_node[0];
-
-			/* XXX we hardcore 0 as the RAM node ... */
-			r_ram_to_dst = _starpu_create_data_request(handle, ram_replicate,
-						dst_replicate, requesting_node, mode, is_prefetch);
-
-			if (!is_prefetch)
-				r_ram_to_dst->refcnt++;
-
-			r_src_to_ram = _starpu_search_existing_data_request(ram_replicate, mode);
-
-			reuse_r_src_to_ram = r_src_to_ram?1:0;
-
-			if (!r_src_to_ram)
-			{
-				r_src_to_ram = _starpu_create_data_request(handle, src_replicate,
-							ram_replicate, src_node, mode, is_prefetch);
-			}
-
-			/* we chain both requests */
-			r_src_to_ram->next_req[r_src_to_ram->next_req_count++]= r_ram_to_dst;
-
-			_starpu_data_request_append_callback(r_ram_to_dst, callback_func, callback_arg);
-
-			if (reuse_r_src_to_ram)
-				_starpu_spin_unlock(&r_src_to_ram->lock);
-
-			_starpu_spin_unlock(&handle->header_lock);
-
-			/* we only submit the first request, the remaining will be automatically submitted afterward */
-			if (!reuse_r_src_to_ram)
-				_starpu_post_data_request(r_src_to_ram, src_node);
-
-			/* the application only waits for the termination of the last request */
-			r = r_ram_to_dst;
-		}
-		else {
-			/* who will perform that request ? */
-			uint32_t handling_node =
-				_starpu_select_node_to_handle_request(src_node, requesting_node);
-
-			r = _starpu_create_data_request(handle, src_replicate,
-					dst_replicate, handling_node, mode, is_prefetch);
-
-			_starpu_data_request_append_callback(r, callback_func, callback_arg);
-
-			if (!is_prefetch)
-				r->refcnt++;
-
-			_starpu_spin_unlock(&handle->header_lock);
-
-			_starpu_post_data_request(r, handling_node);
-		}
+		r = create_new_request_to_fetch_data(handle, dst_replicate, mode, is_prefetch, callback_func, callback_arg);
 	}
 	else {
 		/* the lock was taken by _starpu_search_existing_data_request */
@@ -272,7 +287,6 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_rep
 		if (is_prefetch)
 		{
 			_starpu_spin_unlock(&r->lock);
-
 			_starpu_spin_unlock(&handle->header_lock);
 
                         _STARPU_LOG_OUT_TAG("similar request");
@@ -288,11 +302,9 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_rep
 			r->is_a_prefetch_request = 0;
 
 			/* transform that request into the proper access mode (prefetch could be read only) */
-#warning check that
 			r->mode |= mode;
 		}
 
-		//_STARPU_DEBUG("found a similar request : refcnt (req) %d\n", r->refcnt);
 		_starpu_spin_unlock(&r->lock);
 		_starpu_spin_unlock(&handle->header_lock);
 	}
@@ -379,7 +391,7 @@ int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 		starpu_data_handle handle = descrs[index].handle;
 		starpu_access_mode mode = descrs[index].mode;
 
-		if (mode & STARPU_SCRATCH)
+		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
 			continue;
 
 		struct starpu_data_replicate_s *replicate = handle->per_node[node];
@@ -436,7 +448,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
 
 		void *interface;
 
-		if (mode & STARPU_SCRATCH)
+		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
 		{
 			int workerid = starpu_worker_get_id();
 			struct starpu_data_replicate_s *local_replicate;

+ 8 - 0
src/datawizard/data_request.c

@@ -71,6 +71,7 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 				struct starpu_data_replicate_s *dst_replicate,
 				uint32_t handling_node,
 				starpu_access_mode mode,
+				unsigned ndeps,
 				unsigned is_prefetch)
 {
 	starpu_data_request_t r = starpu_data_request_new();
@@ -84,6 +85,7 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 	r->handling_node = handling_node;
 	r->completed = 0;
 	r->retval = -1;
+	r->ndeps = ndeps;
 	r->next_req_count = 0;
 	r->callbacks = NULL;
 	r->is_a_prefetch_request = is_prefetch;
@@ -179,6 +181,10 @@ void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
 {
 //	_STARPU_DEBUG("POST REQUEST\n");
 
+	/* If some dependencies are not fulfilled yet, we don't actually post the request */
+	if (r->ndeps > 0)
+		return;
+
 	if (r->mode & STARPU_R)
 	{
 		STARPU_ASSERT(r->src_replicate->allocated);
@@ -237,6 +243,8 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 	for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
 	{
 		struct starpu_data_request_s *next_req = r->next_req[chained_req];
+		STARPU_ASSERT(next_req->ndeps > 0);
+		next_req->ndeps--;
 		_starpu_post_data_request(next_req, next_req->handling_node);
 	}
 

+ 5 - 0
src/datawizard/data_request.h

@@ -48,6 +48,10 @@ LIST_TYPE(starpu_data_request,
 	unsigned completed;
 	int retval;
 
+	/* The request will not actually be submitted as long as some
+	 * dependencies remain unfulfilled. */
+	unsigned ndeps;
+
 	/* in case we have a chain of request (eg. for nvidia multi-GPU) */
 	struct starpu_data_request_s *next_req[STARPU_MAXNODES];
 	/* who should perform the next request ? */
@@ -97,6 +101,7 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 				struct starpu_data_replicate_s *dst_replicate,
 				uint32_t handling_node,
 				starpu_access_mode mode,
+				unsigned ndeps,
 				unsigned is_prefetch);
 
 starpu_data_request_t _starpu_search_existing_data_request(struct starpu_data_replicate_s *replicate, starpu_access_mode mode);

+ 1 - 1
src/datawizard/user_interactions.c

@@ -30,7 +30,7 @@ int starpu_data_request_allocation(starpu_data_handle handle, uint32_t node)
 
 	STARPU_ASSERT(handle);
 
-	r = _starpu_create_data_request(handle, NULL, handle->per_node[node], node, 0, 1);
+	r = _starpu_create_data_request(handle, NULL, handle->per_node[node], node, 0, 0, 1);
 
 	/* we do not increase the refcnt associated to the request since we are
 	 * not waiting for its termination */

+ 1 - 1
src/datawizard/write_back.c

@@ -46,7 +46,7 @@ void _starpu_write_through_data(starpu_data_handle handle, uint32_t requesting_n
 				if (!r) {
 					/* there was no existing request so we create one now */
 					r = _starpu_create_data_request(handle, handle->per_node[requesting_node],
-							handle->per_node[node], handling_node, STARPU_R, 1);
+							handle->per_node[node], handling_node, STARPU_R, 0, 1);
 					_starpu_post_data_request(r, handling_node);
 				}
 				else {