Quellcode durchsuchen

Minor code cleanups in the data request management.

Cédric Augonnet vor 15 Jahren
Ursprung
Commit
f483d3bb70

+ 18 - 26
src/datawizard/coherency.c

@@ -89,14 +89,14 @@ uint32_t _starpu_select_src_node(starpu_data_handle handle)
 }
 
 /* this may be called once the data is fetched with header and STARPU_RW-lock hold */
-void _starpu_update_data_state(starpu_data_handle handle, uint32_t requesting_node, uint8_t write)
+void _starpu_update_data_state(starpu_data_handle handle, uint32_t requesting_node, starpu_access_mode mode)
 {
 	unsigned nnodes = _starpu_get_memory_nodes_count();
 
 	/* the data is present now */
 	handle->per_node[requesting_node].requested = 0;
 
-	if (write) {
+	if (mode & STARPU_W) {
 		/* the requesting node now has the only valid copy */
 		uint32_t node;
 		for (node = 0; node < nnodes; node++)
@@ -141,7 +141,7 @@ void _starpu_update_data_state(starpu_data_handle handle, uint32_t requesting_no
  */
 
 int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_node,
-			uint8_t read, uint8_t write, unsigned is_prefetch)
+				starpu_access_mode mode, unsigned is_prefetch)
 {
 	uint32_t local_node = _starpu_get_local_memory_node();
 
@@ -154,7 +154,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 	if (handle->per_node[requesting_node].state != STARPU_INVALID)
 	{
 		/* the data is already available so we can stop */
-		_starpu_update_data_state(handle, requesting_node, write);
+		_starpu_update_data_state(handle, requesting_node, mode);
 		_starpu_msi_cache_hit(requesting_node);
 		_starpu_spin_unlock(&handle->header_lock);
 		return 0;
@@ -168,16 +168,15 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 	starpu_data_request_t r;
 
 	/* is there already a pending request ? */
-	r = _starpu_search_existing_data_request(handle, requesting_node, read, write);
+	r = _starpu_search_existing_data_request(handle, requesting_node, mode);
 	/* at the exit of _starpu_search_existing_data_request the lock is taken is the request existed ! */
 
 	if (!r) {
-		//fprintf(stderr, "no request matched that one so we post a request %s\n", is_prefetch?"STARPU_PREFETCH":"");
 		/* find someone who already has the data */
 		uint32_t src_node = 0;
 
 		/* if the data is in write only mode, there is no need for a source */
-		if (read)
+		if (mode & STARPU_R)
 		{
 			src_node = _starpu_select_src_node(handle);
 			STARPU_ASSERT(src_node != requesting_node);
@@ -187,22 +186,22 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 		unsigned dst_is_a_gpu = (_starpu_get_node_kind(requesting_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(requesting_node) == STARPU_OPENCL_RAM);
 
 		/* we have to perform 2 successive requests for GPU->GPU transfers */
-		if (read && (src_is_a_gpu && dst_is_a_gpu)) {
+		if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
 			unsigned reuse_r_src_to_ram;
 			starpu_data_request_t r_src_to_ram;
 			starpu_data_request_t r_ram_to_dst;
 
 			/* XXX we hardcore 0 as the RAM node ... */
-			r_ram_to_dst = _starpu_create_data_request(handle, 0, requesting_node, requesting_node, read, write, is_prefetch);
+			r_ram_to_dst = _starpu_create_data_request(handle, 0, requesting_node, requesting_node, mode, is_prefetch);
 
 			if (!is_prefetch)
 				r_ram_to_dst->refcnt++;
 
-			r_src_to_ram = _starpu_search_existing_data_request(handle, 0, read, write);
+			r_src_to_ram = _starpu_search_existing_data_request(handle, 0, mode);
 			if (!r_src_to_ram)
 			{
 				reuse_r_src_to_ram = 0;
-				r_src_to_ram = _starpu_create_data_request(handle, src_node, 0, src_node, read, write, is_prefetch);
+				r_src_to_ram = _starpu_create_data_request(handle, src_node, 0, src_node, mode, is_prefetch);
 			}
 			else {
 				reuse_r_src_to_ram = 1;
@@ -228,7 +227,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 			uint32_t handling_node =
 				_starpu_select_node_to_handle_request(src_node, requesting_node);
 
-			r = _starpu_create_data_request(handle, src_node, requesting_node, handling_node, read, write, is_prefetch);
+			r = _starpu_create_data_request(handle, src_node, requesting_node, handling_node, mode, is_prefetch);
 
 			if (!is_prefetch)
 				r->refcnt++;
@@ -259,8 +258,8 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 			r->is_a_prefetch_request = 0;
 
 			/* transform that request into the proper access mode (prefetch could be read only) */
-			r->read = read;
-			r->write = write;
+#warning check that
+			r->mode = mode;
 		}
 
 		//fprintf(stderr, "found a similar request : refcnt (req) %d\n", r->refcnt);
@@ -271,22 +270,18 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 	return (is_prefetch?0:_starpu_wait_data_request_completion(r, 1));
 }
 
-static int prefetch_data_on_node(starpu_data_handle handle, uint8_t read, uint8_t write, uint32_t node)
+static int prefetch_data_on_node(starpu_data_handle handle, starpu_access_mode mode, uint32_t node)
 {
-	return _starpu_fetch_data_on_node(handle, node, read, write, 1);
+	return _starpu_fetch_data_on_node(handle, node, mode, 1);
 }
 
 static int fetch_data(starpu_data_handle handle, starpu_access_mode mode)
 {
 	uint32_t requesting_node = _starpu_get_local_memory_node(); 
 
-	uint8_t read, write;
-	read = (mode & STARPU_R); /* then R or STARPU_RW */
-	write = (mode & STARPU_W); /* then STARPU_W or STARPU_RW */
-
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
 
-	return _starpu_fetch_data_on_node(handle, requesting_node, read, write, 0);
+	return _starpu_fetch_data_on_node(handle, requesting_node, mode, 0);
 }
 
 inline uint32_t _starpu_get_data_refcnt(starpu_data_handle handle, uint32_t node)
@@ -335,15 +330,12 @@ int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
 		descr = &descrs[index];
 		handle = descr->handle;
 		
-		uint32_t mode = task->buffers[index].mode;
+		starpu_access_mode mode = task->buffers[index].mode;
 
 		if (mode & STARPU_SCRATCH)
 			continue;
 	
-		uint8_t read = (mode & STARPU_R);
-		uint8_t write = (mode & STARPU_W);
-
-		prefetch_data_on_node(handle, read, write, node);
+		prefetch_data_on_node(handle, mode, node);
 	}
 
 	return 0;

+ 3 - 2
src/datawizard/coherency.h

@@ -130,10 +130,11 @@ struct starpu_data_state_t {
 void _starpu_display_msi_stats(void);
 
 __attribute__((warn_unused_result))
-int _starpu_fetch_data_on_node(struct starpu_data_state_t *state, uint32_t requesting_node, uint8_t read, uint8_t write, unsigned is_prefetch);
+int _starpu_fetch_data_on_node(struct starpu_data_state_t *state, uint32_t requesting_node,
+				starpu_access_mode mode, unsigned is_prefetch);
 void _starpu_release_data_on_node(struct starpu_data_state_t *state, uint32_t default_wb_mask, unsigned memory_node);
 
-void _starpu_update_data_state(struct starpu_data_state_t *state, uint32_t requesting_node, uint8_t write);
+void _starpu_update_data_state(struct starpu_data_state_t *state, uint32_t requesting_node, starpu_access_mode mode);
 
 uint32_t _starpu_get_data_refcnt(struct starpu_data_state_t *state, uint32_t node);
 

+ 14 - 16
src/datawizard/data_request.c

@@ -67,7 +67,7 @@ static void starpu_data_request_destroy(starpu_data_request_t r)
 }
 
 /* handle->lock should already be taken !  */
-starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, uint32_t handling_node, uint8_t read, uint8_t write, unsigned is_prefetch)
+starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, uint32_t handling_node, starpu_access_mode mode, unsigned is_prefetch)
 {
 	starpu_data_request_t r = starpu_data_request_new();
 
@@ -76,8 +76,7 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uin
 	r->handle = handle;
 	r->src_node = src_node;
 	r->dst_node = dst_node;
-	r->read = read;
-	r->write = write;
+	r->mode = mode;
 
 	r->handling_node = handling_node;
 
@@ -86,7 +85,6 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uin
 
 	r->next_req_count = 0;
 
-	r->strictness = 1;
 	r->is_a_prefetch_request = is_prefetch;
 
 	/* associate that request with the handle so that further similar
@@ -98,7 +96,7 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uin
 
 	handle->per_node[dst_node].refcnt++;
 
-	if (read)
+	if (mode & STARPU_R)
 		handle->per_node[src_node].refcnt++;
 
 	r->refcnt = 1;
@@ -109,26 +107,26 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uin
 }
 
 /* handle->lock should be taken */
-starpu_data_request_t _starpu_search_existing_data_request(starpu_data_handle handle, uint32_t dst_node, uint8_t read, uint8_t write)
+starpu_data_request_t _starpu_search_existing_data_request(starpu_data_handle handle, uint32_t dst_node, starpu_access_mode mode)
 {
 	starpu_data_request_t r = handle->per_node[dst_node].request;
 
 	if (r)
 	{
 		/* perhaps we need to "upgrade" the request */
-		if (read)
+		if (mode & STARPU_R)
 		{
 			/* in case the exisiting request did not imply a memory
 			 * transfer yet, we have to increment the refcnt now
 			 * (so that the source remains valid) */
-			if (!r->read)
+			if (!(r->mode & STARPU_R))
 				handle->per_node[dst_node].refcnt++;
 
-			r->read = 1;
+			r->mode |= STARPU_R;
 		}
 
-		if (write)
-			r->write = 1;
+		if (mode & STARPU_W)
+			r->mode |= STARPU_W;
 
 		_starpu_spin_lock(&r->lock);
 	}
@@ -182,7 +180,7 @@ void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
 {
 //	fprintf(stderr, "POST REQUEST\n");
 
-	if (r->read)
+	if (r->mode & STARPU_R)
 	{
 		STARPU_ASSERT(r->handle->per_node[r->src_node].allocated);
 		STARPU_ASSERT(r->handle->per_node[r->src_node].refcnt);
@@ -206,7 +204,7 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 	uint32_t src_node = r->src_node;
 	uint32_t dst_node = r->dst_node;
 
-	_starpu_update_data_state(handle, dst_node, r->write);
+	_starpu_update_data_state(handle, dst_node, r->mode);
 
 #ifdef STARPU_USE_FXT
 	size_t size = handle->ops->get_size(handle);
@@ -223,7 +221,7 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 	
 	handle->per_node[dst_node].refcnt--;
 
-	if (r->read)
+	if (r->mode & STARPU_R)
 		handle->per_node[src_node].refcnt--;
 
 	r->refcnt--;
@@ -251,7 +249,7 @@ static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_allo
 
 	_starpu_spin_lock(&r->lock);
 
-	if (r->read)
+	if (r->mode & STARPU_R)
 	{
 		STARPU_ASSERT(handle->per_node[r->src_node].allocated);
 		STARPU_ASSERT(handle->per_node[r->src_node].refcnt);
@@ -259,7 +257,7 @@ static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_allo
 
 	/* perform the transfer */
 	/* the header of the data must be locked by the worker that submitted the request */
-	r->retval = _starpu_driver_copy_data_1_to_1(handle, r->src_node, r->dst_node, !r->read, r, may_alloc);
+	r->retval = _starpu_driver_copy_data_1_to_1(handle, r->src_node, r->dst_node, !(r->mode & STARPU_R), r, may_alloc);
 
 	if (r->retval == ENOMEM)
 	{

+ 3 - 13
src/datawizard/data_request.h

@@ -23,24 +23,17 @@
 #include <common/list.h>
 #include <common/starpu_spinlock.h>
 
-#define DATA_REQ_ALLOCATE	(1<<0)
-#define DATA_REQ_COPY		(1<<1)
-
 LIST_TYPE(starpu_data_request,
 	starpu_spinlock_t lock;
 	unsigned refcnt;
 
-	/* parameters to define the type of request */
-	unsigned flags;
-
 	starpu_data_handle handle;
 	uint32_t src_node;
 	uint32_t dst_node;
 
 	uint32_t handling_node;
 
-	uint8_t read;
-	uint8_t write;
+	starpu_access_mode mode;
 
 	starpu_async_channel async_channel;
 
@@ -52,9 +45,6 @@ LIST_TYPE(starpu_data_request,
 	/* who should perform the next request ? */
 	unsigned next_req_count;
 
-	/* is StarPU forced to honour that request ? (not really when
-	 * prefetching for instance) */
-	unsigned strictness;
 	unsigned is_a_prefetch_request;
 
 #ifdef STARPU_USE_FXT
@@ -92,8 +82,8 @@ void _starpu_handle_all_pending_node_data_requests(uint32_t src_node);
 
 int _starpu_check_that_no_data_request_exists(uint32_t node);
 
-starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, uint32_t handling_node, uint8_t read, uint8_t write, unsigned is_prefetch);
-starpu_data_request_t _starpu_search_existing_data_request(starpu_data_handle handle, uint32_t dst_node, uint8_t read, uint8_t write);
+starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, uint32_t handling_node, starpu_access_mode mode, unsigned is_prefetch);
+starpu_data_request_t _starpu_search_existing_data_request(starpu_data_handle handle, uint32_t dst_node, starpu_access_mode mode);
 int _starpu_wait_data_request_completion(starpu_data_request_t r, unsigned may_alloc);
 
 #endif // __DATA_REQUEST_H__

+ 1 - 1
src/datawizard/filters.c

@@ -165,7 +165,7 @@ void starpu_data_unpartition(starpu_data_handle root_handle, uint32_t gathering_
 			starpu_data_unpartition(&root_handle->children[child], gathering_node);
 
 		int ret;
-		ret = _starpu_fetch_data_on_node(&root_handle->children[child], gathering_node, 1, 0, 0);
+		ret = _starpu_fetch_data_on_node(&root_handle->children[child], gathering_node, STARPU_R, 0);
 		/* for now we pretend that the RAM is almost unlimited and that gathering 
 		 * data should be possible from the node that does the unpartionning ... we
 		 * don't want to have the programming deal with memory shortage at that time,

+ 6 - 17
src/datawizard/user_interactions.c

@@ -29,7 +29,7 @@ int starpu_data_request_allocation(starpu_data_handle handle, uint32_t node)
 
 	STARPU_ASSERT(handle);
 
-	r = _starpu_create_data_request(handle, 0, node, node, 0, 0, 1);
+	r = _starpu_create_data_request(handle, 0, node, node, 0, 1);
 
 	/* we do not increase the refcnt associated to the request since we are
 	 * not waiting for its termination */
@@ -66,10 +66,7 @@ static inline void _starpu_sync_data_with_mem_continuation_non_blocking(void *ar
 
 	STARPU_ASSERT(handle);
 
-	unsigned r = (statenode->mode & STARPU_R);
-	unsigned w = (statenode->mode & STARPU_W);
-
-	ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
+	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0);
 	STARPU_ASSERT(!ret);
 	
 	/* continuation of starpu_data_sync_with_mem_non_blocking: we
@@ -132,10 +129,7 @@ static inline void _starpu_sync_data_with_mem_continuation(void *arg)
 
 	STARPU_ASSERT(handle);
 
-	unsigned r = (statenode->mode & STARPU_R);
-	unsigned w = (statenode->mode & STARPU_W);
-
-	ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
+	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0);
 	STARPU_ASSERT(!ret);
 	
 	/* continuation of starpu_data_sync_with_mem */
@@ -195,10 +189,7 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
 			_starpu_sync_data_with_mem_continuation, &statenode))
 	{
 		/* no one has locked this data yet, so we proceed immediately */
-		unsigned r = (mode & STARPU_R);
-		unsigned w = (mode & STARPU_W);
-
-		int ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
+		int ret = _starpu_fetch_data_on_node(handle, 0, mode, 0);
 		STARPU_ASSERT(!ret);
 	}
 	else {
@@ -234,7 +225,7 @@ static void _prefetch_data_on_node(void *arg)
 {
 	struct state_and_node *statenode = arg;
 
-	_starpu_fetch_data_on_node(statenode->state, statenode->node, 1, 0, statenode->async);
+	_starpu_fetch_data_on_node(statenode->state, statenode->node, STARPU_R, statenode->async);
 
 	PTHREAD_MUTEX_LOCK(&statenode->lock);
 	statenode->finished = 1;
@@ -271,9 +262,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &statenode))
 	{
 		/* we can immediately proceed */
-		uint8_t read = (mode & STARPU_R);
-		uint8_t write = (mode & STARPU_W);
-		_starpu_fetch_data_on_node(handle, node, read, write, async);
+		_starpu_fetch_data_on_node(handle, node, mode, async);
 
 		/* remove the "lock"/reference */
 		if (!async)

+ 2 - 2
src/datawizard/write_back.c

@@ -42,11 +42,11 @@ void _starpu_write_through_data(starpu_data_handle handle, uint32_t requesting_n
 
 				/* check that there is not already a similar
 				 * request that we should reuse */
-				r = _starpu_search_existing_data_request(handle, node, 1, 0);
+				r = _starpu_search_existing_data_request(handle, node, STARPU_R);
 				if (!r) {
 					/* there was no existing request so we create one now */
 					r = _starpu_create_data_request(handle, requesting_node,
-							node, handling_node, 1, 0, 1);
+							node, handling_node, STARPU_R, 1);
 					_starpu_post_data_request(r, handling_node);
 				}
 				else {

+ 2 - 2
src/debug/latency.c

@@ -26,9 +26,9 @@ void _starpu_benchmark_ping_pong(starpu_data_handle handle,
 	for (iter = 0; iter < niter; iter++)
 	{
 		int ret;
-		ret = _starpu_fetch_data_on_node(handle, node0, 1, 1, 0);
+		ret = _starpu_fetch_data_on_node(handle, node0, STARPU_RW, 0);
 		STARPU_ASSERT(!ret);
-		ret = _starpu_fetch_data_on_node(handle, node1, 1, 1, 0);
+		ret = _starpu_fetch_data_on_node(handle, node1, STARPU_RW, 0);
 		STARPU_ASSERT(!ret);
 	}
 }