Browse Source

It is now possible to append callbacks to data requests

Cédric Augonnet 15 years ago
parent
commit
dd1034c582

+ 14 - 3
src/datawizard/coherency.c

@@ -141,7 +141,8 @@ void _starpu_update_data_state(starpu_data_handle handle, uint32_t requesting_no
  */
 
 int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_node,
-				starpu_access_mode mode, unsigned is_prefetch)
+				starpu_access_mode mode, unsigned is_prefetch,
+				void (*callback_func)(void *), void *callback_arg)
 {
 	uint32_t local_node = _starpu_get_local_memory_node();
 
@@ -157,6 +158,10 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 		_starpu_update_data_state(handle, requesting_node, mode);
 		_starpu_msi_cache_hit(requesting_node);
 		_starpu_spin_unlock(&handle->header_lock);
+
+		if (callback_func)
+			callback_func(callback_arg);
+
 		return 0;
 	}
 
@@ -210,6 +215,8 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 			/* we chain both requests */
 			r_src_to_ram->next_req[r_src_to_ram->next_req_count++]= r_ram_to_dst;
 
+			_starpu_data_request_append_callback(r_ram_to_dst, callback_func, callback_arg);
+
 			if (reuse_r_src_to_ram)
 				_starpu_spin_unlock(&r_src_to_ram->lock);
 
@@ -229,6 +236,8 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 
 			r = _starpu_create_data_request(handle, src_node, requesting_node, handling_node, mode, is_prefetch);
 
+			_starpu_data_request_append_callback(r, callback_func, callback_arg);
+
 			if (!is_prefetch)
 				r->refcnt++;
 
@@ -239,6 +248,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 	}
 	else {
 		/* the lock was taken by _starpu_search_existing_data_request */
+		_starpu_data_request_append_callback(r, callback_func, callback_arg);
 
 		/* there is already a similar request */
 		if (is_prefetch)
@@ -246,6 +256,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 			_starpu_spin_unlock(&r->lock);
 
 			_starpu_spin_unlock(&handle->header_lock);
+
 			return 0;
 		}
 
@@ -272,7 +283,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_no
 
 static int prefetch_data_on_node(starpu_data_handle handle, starpu_access_mode mode, uint32_t node)
 {
-	return _starpu_fetch_data_on_node(handle, node, mode, 1);
+	return _starpu_fetch_data_on_node(handle, node, mode, 1, NULL, NULL); /* is_prefetch = 1, no callback_func / callback_arg */
 }
 
 static int fetch_data(starpu_data_handle handle, starpu_access_mode mode)
@@ -281,7 +292,7 @@ static int fetch_data(starpu_data_handle handle, starpu_access_mode mode)
 
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
 
-	return _starpu_fetch_data_on_node(handle, requesting_node, mode, 0);
+	return _starpu_fetch_data_on_node(handle, requesting_node, mode, 0, NULL, NULL);
 }
 
 inline uint32_t _starpu_get_data_refcnt(starpu_data_handle handle, uint32_t node)

+ 2 - 1
src/datawizard/coherency.h

@@ -131,7 +131,8 @@ void _starpu_display_msi_stats(void);
 
 __attribute__((warn_unused_result))
 int _starpu_fetch_data_on_node(struct starpu_data_state_t *state, uint32_t requesting_node,
-				starpu_access_mode mode, unsigned is_prefetch);
+				starpu_access_mode mode, unsigned is_prefetch,
+				void (*callback_func)(void *), void *callback_arg);
 void _starpu_release_data_on_node(struct starpu_data_state_t *state, uint32_t default_wb_mask, unsigned memory_node);
 
 void _starpu_update_data_state(struct starpu_data_state_t *state, uint32_t requesting_node, starpu_access_mode mode);

+ 30 - 0
src/datawizard/data_request.c

@@ -85,6 +85,8 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uin
 
 	r->next_req_count = 0;
 
+	r->callbacks = NULL;
+
 	r->is_a_prefetch_request = is_prefetch;
 
 	/* associate that request with the handle so that further similar
@@ -196,6 +198,23 @@ void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
 	_starpu_wake_all_blocked_workers_on_node(handling_node);
 }
 
+/* Register callback_func(callback_arg) on request r; nothing is registered when callback_func is NULL. We assume that r->lock is taken by the caller */
+void _starpu_data_request_append_callback(starpu_data_request_t r, void (*callback_func)(void *), void *callback_arg)
+{
+	STARPU_ASSERT(r);
+
+	if (callback_func)
+	{
+		struct callback_list *link = malloc(sizeof(struct callback_list)); /* freed by starpu_handle_data_request_completion once the callback has run */
+		STARPU_ASSERT(link);
+
+		link->callback_func = callback_func;
+		link->callback_arg = callback_arg;
+		link->next = r->callbacks; /* despite the "append" name, the node is inserted at the head of the list */
+		r->callbacks = link; /* so the list is walked from the most recently registered callback */
+	}
+}
+
 static void starpu_handle_data_request_completion(starpu_data_request_t r)
 {
 	unsigned do_delete = 0;
@@ -232,6 +251,17 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 	
 	r->retval = 0;
 
+	/* In case there are one or multiple callbacks, we execute them now. */
+	struct callback_list *callbacks = r->callbacks;
+	while (callbacks)
+	{
+		callbacks->callback_func(callbacks->callback_arg);
+
+		struct callback_list *next = callbacks->next;
+		free(callbacks);
+		callbacks = next;
+	}
+
 	_starpu_spin_unlock(&r->lock);
 
 	if (do_delete)

+ 11 - 0
src/datawizard/data_request.h

@@ -23,6 +23,12 @@
 #include <common/list.h>
 #include <common/starpu_spinlock.h>
 
+struct callback_list { /* node of the singly-linked list of callbacks attached to a data request */
+	void (*callback_func)(void *); /* function called with callback_arg when the request completes */
+	void *callback_arg; /* opaque pointer passed as-is to callback_func */
+	struct callback_list *next; /* following node, NULL for the last one */
+};
+
 LIST_TYPE(starpu_data_request,
 	starpu_spinlock_t lock;
 	unsigned refcnt;
@@ -45,6 +51,8 @@ LIST_TYPE(starpu_data_request,
 	/* who should perform the next request ? */
 	unsigned next_req_count;
 
+	struct callback_list *callbacks;
+
 	unsigned is_a_prefetch_request;
 
 #ifdef STARPU_USE_FXT
@@ -86,4 +94,7 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uin
 starpu_data_request_t _starpu_search_existing_data_request(starpu_data_handle handle, uint32_t dst_node, starpu_access_mode mode);
 int _starpu_wait_data_request_completion(starpu_data_request_t r, unsigned may_alloc);
 
+void _starpu_data_request_append_callback(starpu_data_request_t r,
+			void (*callback_func)(void *), void *callback_arg);
+
 #endif // __DATA_REQUEST_H__

+ 1 - 1
src/datawizard/filters.c

@@ -165,7 +165,7 @@ void starpu_data_unpartition(starpu_data_handle root_handle, uint32_t gathering_
 			starpu_data_unpartition(&root_handle->children[child], gathering_node);
 
 		int ret;
-		ret = _starpu_fetch_data_on_node(&root_handle->children[child], gathering_node, STARPU_R, 0);
+		ret = _starpu_fetch_data_on_node(&root_handle->children[child], gathering_node, STARPU_R, 0, NULL, NULL);
 		/* for now we pretend that the RAM is almost unlimited and that gathering 
 		 * data should be possible from the node that does the unpartionning ... we
 		 * don't want to have the programming deal with memory shortage at that time,

+ 5 - 5
src/datawizard/user_interactions.c

@@ -66,7 +66,7 @@ static inline void _starpu_sync_data_with_mem_continuation_non_blocking(void *ar
 
 	STARPU_ASSERT(handle);
 
-	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0);
+	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0, NULL, NULL);
 	STARPU_ASSERT(!ret);
 	
 	/* continuation of starpu_data_sync_with_mem_non_blocking: we
@@ -129,7 +129,7 @@ static inline void _starpu_sync_data_with_mem_continuation(void *arg)
 
 	STARPU_ASSERT(handle);
 
-	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0);
+	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0, NULL, NULL);
 	STARPU_ASSERT(!ret);
 	
 	/* continuation of starpu_data_sync_with_mem */
@@ -189,7 +189,7 @@ int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode
 			_starpu_sync_data_with_mem_continuation, &statenode))
 	{
 		/* no one has locked this data yet, so we proceed immediately */
-		int ret = _starpu_fetch_data_on_node(handle, 0, mode, 0);
+		int ret = _starpu_fetch_data_on_node(handle, 0, mode, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
 	}
 	else {
@@ -225,7 +225,7 @@ static void _prefetch_data_on_node(void *arg)
 {
 	struct state_and_node *statenode = arg;
 
-	_starpu_fetch_data_on_node(statenode->state, statenode->node, STARPU_R, statenode->async);
+	_starpu_fetch_data_on_node(statenode->state, statenode->node, STARPU_R, statenode->async, NULL, NULL);
 
 	PTHREAD_MUTEX_LOCK(&statenode->lock);
 	statenode->finished = 1;
@@ -262,7 +262,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
 	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &statenode))
 	{
 		/* we can immediately proceed */
-		_starpu_fetch_data_on_node(handle, node, mode, async);
+		_starpu_fetch_data_on_node(handle, node, mode, async, NULL, NULL);
 
 		/* remove the "lock"/reference */
 		if (!async)

+ 2 - 2
src/debug/latency.c

@@ -26,9 +26,9 @@ void _starpu_benchmark_ping_pong(starpu_data_handle handle,
 	for (iter = 0; iter < niter; iter++)
 	{
 		int ret;
-		ret = _starpu_fetch_data_on_node(handle, node0, STARPU_RW, 0);
+		ret = _starpu_fetch_data_on_node(handle, node0, STARPU_RW, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
-		ret = _starpu_fetch_data_on_node(handle, node1, STARPU_RW, 0);
+		ret = _starpu_fetch_data_on_node(handle, node1, STARPU_RW, 0, NULL, NULL);
 		STARPU_ASSERT(!ret);
 	}
 }