瀏覽代碼

merge branch gpumem_prefetch

Nathalie Furmento 13 年之前
父節點
當前提交
6db2c372ca

+ 8 - 0
configure.ac

@@ -710,6 +710,14 @@ if test x$enable_fast = xyes; then
 	AC_DEFINE(STARPU_NO_ASSERT, [1], [disable assertions])
 fi
 
+AC_MSG_CHECKING(whether memory status should be displayed)
+AC_ARG_ENABLE(memory-status, [AS_HELP_STRING([--enable-memory-status],
+			     [display memory status at the end of execution])],	
+			     enable_memory_status=$enableval, enable_memory_status=no)	
+AC_MSG_RESULT($enable_memory_status)
+if test x$enable_memory_status = xyes; then
+        AC_DEFINE(STARPU_MEMORY_STATUS, [1], [display memory status])
+fi			     				      
 
 
 AC_MSG_CHECKING(whether debug messages should be displayed)

+ 5 - 0
src/core/workers.c

@@ -491,6 +491,11 @@ void starpu_shutdown(void)
 	/* tell all workers to shutdown */
 	_starpu_kill_all_workers(&config);
 
+#ifdef STARPU_MEMORY_STATUS
+	if ((stats = getenv("STARPU_MEMORY_STATS")) && atoi(stats))
+		_starpu_display_data_stats();
+#endif
+
 #ifdef STARPU_DATA_STATS
 	_starpu_display_comm_amounts();
 #endif

+ 23 - 5
src/datawizard/coherency.c

@@ -223,7 +223,7 @@ static int determine_request_path(starpu_data_handle handle,
 /* handle->lock should be taken. r is returned locked. The node parameter
  * indicate either the source of the request, or the destination for a
  * write-only request. */
-static starpu_data_request_t _starpu_search_existing_data_request(struct starpu_data_replicate_s *replicate, unsigned node, starpu_access_mode mode)
+static starpu_data_request_t _starpu_search_existing_data_request(struct starpu_data_replicate_s *replicate, unsigned node, starpu_access_mode mode, unsigned is_prefetch)
 {
 	starpu_data_request_t r;
 
@@ -232,8 +232,11 @@ static starpu_data_request_t _starpu_search_existing_data_request(struct starpu_
 	if (r)
 	{
 		_starpu_spin_lock(&r->lock);
-
-		/* perhaps we need to "upgrade" the request */
+                
+                /* perhaps we need to "upgrade" the request */
+		if (is_prefetch < r->prefetch) 
+			_starpu_update_prefetch_status(r);
+		
 		if (mode & STARPU_R)
 		{
 			/* in case the exisiting request did not imply a memory
@@ -284,9 +287,23 @@ starpu_data_request_t create_request_to_fetch_data(starpu_data_handle handle,
 
 	if (dst_replicate->state != STARPU_INVALID)
 	{
+#ifdef STARPU_MEMORY_STATUS
+		starpu_cache_state old_state = dst_replicate->state;
+#endif
 		/* the data is already available so we can stop */
 		_starpu_update_data_state(handle, dst_replicate, mode);
 		_starpu_msi_cache_hit(requesting_node);
+		
+#ifdef STARPU_MEMORY_STATUS
+		_starpu_handle_stats_cache_hit(handle, requesting_node);
+
+		/* XXX Broken ? */
+		if (old_state == STARPU_SHARED 
+		    && dst_replicate->state == STARPU_OWNER)
+			_starpu_handle_stats_shared_to_owner(handle, requesting_node);
+#endif
+		
+		starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
 
 		_starpu_spin_unlock(&handle->header_lock);
 
@@ -345,7 +362,8 @@ starpu_data_request_t create_request_to_fetch_data(starpu_data_handle handle,
 
 		/* Try to reuse a request if possible */
 		r = _starpu_search_existing_data_request(hop_dst_replicate,
-				(mode & STARPU_R)?hop_src_node:hop_dst_node, mode);
+				(mode & STARPU_R)?hop_src_node:hop_dst_node, 
+							 mode, is_prefetch);
 
 		reused_requests[hop] = !!r;
 
@@ -353,7 +371,7 @@ starpu_data_request_t create_request_to_fetch_data(starpu_data_handle handle,
 			/* Create a new request if there was no request to reuse */
 			r = _starpu_create_data_request(handle, hop_src_replicate,
 					hop_dst_replicate, hop_handling_node,
-					mode, ndeps);
+					mode, ndeps, is_prefetch);
 		}
 
 		requests[hop] = r; 

+ 12 - 0
src/datawizard/coherency.h

@@ -67,6 +67,9 @@ LIST_TYPE(starpu_data_replicate,
 	 * */
 	uint8_t automatically_allocated;
 
+        /* Pointer to memchunk for LRU strategy */
+	struct starpu_mem_chunk_s * mc;
+ 
 	/* To help the scheduling policies to make some decision, we
 	   may keep a track of the tasks that are likely to request 
 	   this data on the current node.
@@ -190,6 +193,15 @@ struct starpu_data_state_t {
         /* Used for MPI */
         int rank;
 	int tag;
+
+#ifdef STARPU_MEMORY_STATUS
+	/* Handle access stats per node */
+	unsigned stats_direct_access[STARPU_MAXNODES];
+	unsigned stats_loaded_shared[STARPU_MAXNODES];
+	unsigned stats_loaded_owner[STARPU_MAXNODES];
+	unsigned stats_shared_to_owner[STARPU_MAXNODES];
+	unsigned stats_invalidated[STARPU_MAXNODES];
+#endif
 };
 
 void _starpu_display_msi_stats(void);

+ 1 - 1
src/datawizard/copy_driver.c

@@ -252,7 +252,7 @@ int __attribute__((warn_unused_result)) _starpu_driver_copy_data_1_to_1(starpu_d
 		if (!may_alloc)
 			return -ENOMEM;
 
-		ret_alloc = _starpu_allocate_memory_on_node(handle, dst_replicate);
+		ret_alloc = _starpu_allocate_memory_on_node(handle, dst_replicate,req->prefetch);
 		if (ret_alloc)
 			return -ENOMEM;
 	}

+ 136 - 3
src/datawizard/data_request.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010  Université de Bordeaux 1
- * Copyright (C) 2010  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -21,6 +21,7 @@
 
 /* requests that have not been treated at all */
 static starpu_data_request_list_t data_requests[STARPU_MAXNODES];
+static starpu_data_request_list_t prefetch_requests[STARPU_MAXNODES];
 static pthread_cond_t data_requests_list_cond[STARPU_MAXNODES];
 static pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
 
@@ -29,11 +30,14 @@ static starpu_data_request_list_t data_requests_pending[STARPU_MAXNODES];
 static pthread_cond_t data_requests_pending_list_cond[STARPU_MAXNODES];
 static pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
 
+int starpu_memstrategy_drop_prefetch[STARPU_MAXNODES];
+
 void _starpu_init_data_request_lists(void)
 {
 	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
+		prefetch_requests[i] = starpu_data_request_list_new();
 		data_requests[i] = starpu_data_request_list_new();
 		PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
 		PTHREAD_COND_INIT(&data_requests_list_cond[i], NULL);
@@ -41,6 +45,8 @@ void _starpu_init_data_request_lists(void)
 		data_requests_pending[i] = starpu_data_request_list_new();
 		PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
 		PTHREAD_COND_INIT(&data_requests_pending_list_cond[i], NULL);
+		
+		starpu_memstrategy_drop_prefetch[i]=0;
 	}
 }
 
@@ -56,6 +62,7 @@ void _starpu_deinit_data_request_lists(void)
 		PTHREAD_COND_DESTROY(&data_requests_list_cond[i]);
 		PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
 		starpu_data_request_list_delete(data_requests[i]);
+		starpu_data_request_list_delete(prefetch_requests[i]);
 	}
 }
 
@@ -87,7 +94,8 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 				struct starpu_data_replicate_s *dst_replicate,
 				uint32_t handling_node,
 				starpu_access_mode mode,
-				unsigned ndeps)
+				unsigned ndeps,
+				unsigned is_prefetch)
 {
 	starpu_data_request_t r = starpu_data_request_new();
 
@@ -99,6 +107,7 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 	r->mode = mode;
 	r->handling_node = handling_node;
 	r->completed = 0;
+	r->prefetch = is_prefetch;
 	r->retval = -1;
 	r->ndeps = ndeps;
 	r->next_req_count = 0;
@@ -186,7 +195,10 @@ void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
 
 	/* insert the request in the proper list */
 	PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
-	starpu_data_request_list_push_front(data_requests[handling_node], r);
+	if (r->prefetch)
+		starpu_data_request_list_push_back(prefetch_requests[handling_node], r);
+	else
+		starpu_data_request_list_push_back(data_requests[handling_node], r);
 	PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
@@ -221,8 +233,30 @@ static void starpu_handle_data_request_completion(starpu_data_request_t r)
 	struct starpu_data_replicate_s *src_replicate = r->src_replicate;
 	struct starpu_data_replicate_s *dst_replicate = r->dst_replicate;
 
+
+	starpu_cache_state old_src_replicate_state = src_replicate->state;
 	_starpu_update_data_state(handle, r->dst_replicate, mode);
 
+#ifdef STARPU_MEMORY_STATUS
+	if (src_replicate->state == STARPU_INVALID)
+	{
+		if (old_src_replicate_state == STARPU_OWNER)
+			_starpu_handle_stats_invalidated(handle, src_replicate->memory_node);
+		else 
+		{
+			/* XXX Currently only ex-OWNERs are tagged as invalidated */
+			/* XXX Have to check the old state of every node in case SHARED data becomes OWNED by the dst_replicate */
+		}
+		
+	}
+	if (dst_replicate->state == STARPU_SHARED)
+		_starpu_handle_stats_loaded_shared(handle, dst_replicate->memory_node);
+	else if (dst_replicate->state == STARPU_OWNER)
+	{
+		_starpu_handle_stats_loaded_owner(handle, dst_replicate->memory_node);
+	}
+#endif
+
 #ifdef STARPU_USE_FXT
 	uint32_t src_node = src_replicate->memory_node;
 	uint32_t dst_node = dst_replicate->memory_node;
@@ -386,6 +420,72 @@ void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
 	starpu_data_request_list_delete(local_list);
 }
 
+void _starpu_handle_node_prefetch_requests(uint32_t src_node, unsigned may_alloc){
+	starpu_memstrategy_drop_prefetch[src_node]=0;
+
+	starpu_data_request_t r;
+
+	/* take all the entries from the request list */
+        PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
+
+	starpu_data_request_list_t local_list = prefetch_requests[src_node];
+	
+	if (starpu_data_request_list_empty(local_list))
+	{
+		/* there is no request */
+                PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
+		return;
+	}
+
+	/* There is an entry: we create a new empty list to replace the list of
+	 * requests, and we handle the request(s) one by one in the former
+	 * list, without concurrency issues.*/
+	prefetch_requests[src_node] = starpu_data_request_list_new();
+
+	PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
+
+	/* for all entries of the list */
+	while (!starpu_data_request_list_empty(local_list))
+	{
+                int res;
+
+		r = starpu_data_request_list_pop_back(local_list);
+
+		res = starpu_handle_data_request(r, may_alloc);
+		if (res == -ENOMEM )
+		{
+			starpu_memstrategy_drop_prefetch[src_node]=1;
+			PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
+			if (r->prefetch)
+				starpu_data_request_list_push_front(prefetch_requests[src_node], r);
+			else 
+			{
+				/* Prefetch request promoted while in tmp list*/
+				starpu_data_request_list_push_front(data_requests[src_node], r);
+			}
+			PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
+			break;
+		}
+
+		/* wake the requesting worker up */
+		// if we do not progress ..
+		// pthread_cond_broadcast(&data_requests_list_cond[src_node]);
+	}
+
+	while(!starpu_data_request_list_empty(local_list) && starpu_memstrategy_drop_prefetch[src_node])
+	{
+		r = starpu_data_request_list_pop_back(local_list);
+		PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
+		if (r->prefetch)
+			starpu_data_request_list_push_back(prefetch_requests[src_node], r);
+		else 
+			starpu_data_request_list_push_front(data_requests[src_node], r);
+		PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
+	}
+
+	starpu_data_request_list_delete(local_list);
+}
+
 static void _handle_pending_node_data_requests(uint32_t src_node, unsigned force)
 {
 //	_STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
@@ -456,3 +556,36 @@ int _starpu_check_that_no_data_request_exists(uint32_t node)
 
 	return (no_request && no_pending);
 }
+
+
+void _starpu_update_prefetch_status(starpu_data_request_t r){
+	STARPU_ASSERT(r->prefetch > 0);
+	r->prefetch=0;
+	
+	/* We have to promote chained_request too! */
+	unsigned chained_req;
+	for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
+	{
+		struct starpu_data_request_s *next_req = r->next_req[chained_req];
+		_starpu_update_prefetch_status(next_req);		
+	}
+
+	PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);
+	
+	/* The request can be in a different list (handling request or the temp list)
+	 * we have to check that it is really in the prefetch list. */
+	starpu_data_request_t r_iter;
+	for (r_iter = starpu_data_request_list_begin(prefetch_requests[r->handling_node]);
+	     r_iter != starpu_data_request_list_end(prefetch_requests[r->handling_node]);
+	     r_iter = starpu_data_request_list_next(r_iter))
+	{
+		
+		if (r==r_iter)
+		{
+			starpu_data_request_list_erase(prefetch_requests[r->handling_node],r);
+			starpu_data_request_list_push_front(data_requests[r->handling_node],r);
+			break;
+		}		
+	}
+	PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node]);
+}

+ 5 - 1
src/datawizard/data_request.h

@@ -47,6 +47,7 @@ LIST_TYPE(starpu_data_request,
 	struct starpu_async_channel async_channel;
 
 	unsigned completed;
+	unsigned prefetch;
 	int retval;
 
 	/* The request will not actually be submitted until there remains
@@ -89,6 +90,7 @@ void _starpu_init_data_request_lists(void);
 void _starpu_deinit_data_request_lists(void);
 void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node);
 void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc);
+void _starpu_handle_node_prefetch_requests(uint32_t src_node, unsigned may_alloc);
 
 void _starpu_handle_pending_node_data_requests(uint32_t src_node);
 void _starpu_handle_all_pending_node_data_requests(uint32_t src_node);
@@ -100,11 +102,13 @@ starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle,
 				struct starpu_data_replicate_s *dst_replicate,
 				uint32_t handling_node,
 				starpu_access_mode mode,
-				unsigned ndeps);
+				unsigned ndeps,
+				unsigned is_prefetch);
 
 int _starpu_wait_data_request_completion(starpu_data_request_t r, unsigned may_alloc);
 
 void _starpu_data_request_append_callback(starpu_data_request_t r,
 			void (*callback_func)(void *), void *callback_arg);
 
+void _starpu_update_prefetch_status(starpu_data_request_t r);
 #endif // __DATA_REQUEST_H__

+ 67 - 0
src/datawizard/datastats.c

@@ -17,6 +17,7 @@
 
 #include <starpu.h>
 #include <datawizard/datastats.h>
+#include <datawizard/coherency.h>
 #include <common/config.h>
 
 #ifdef STARPU_DATA_STATS
@@ -144,3 +145,69 @@ void _starpu_display_comm_amounts(void)
 }
 
 #endif
+
+#ifdef STARPU_MEMORY_STATUS
+void _starpu_display_data_stats(void)
+{
+	unsigned node;
+	for (node = 0; node < STARPU_MAXNODES; node++)
+	{
+		_starpu_display_data_stats_by_node(node);
+	}
+}
+
+void _starpu_display_data_handle_stats(starpu_data_handle handle)
+{
+	unsigned node;
+	
+	fprintf(stderr, "#-----\n");
+	fprintf(stderr, "Data : %p\n", handle);
+	fprintf(stderr, "Size : %d\n", (int)handle->data_size);
+	fprintf(stderr, "\n");
+	
+	fprintf(stderr, "#--\n");
+	fprintf(stderr, "Data access stats\n");
+	fprintf(stderr, "/!\\ Work Underway\n");
+	for (node = 0; node < STARPU_MAXNODES; node++)
+	{
+		if (handle->stats_direct_access[node]+handle->stats_loaded_shared[node]
+		    +handle->stats_invalidated[node]+handle->stats_loaded_owner[node])
+		{
+			fprintf(stderr, "Node #%d\n", node);
+			fprintf(stderr, "\tDirect access : %d\n", handle->stats_direct_access[node]);
+			/* XXX Not Working yet. */
+			if (handle->stats_shared_to_owner[node])
+				fprintf(stderr, "\t\tShared to Owner : %d\n", handle->stats_shared_to_owner[node]);
+			fprintf(stderr, "\tLoaded (Owner) : %d\n", handle->stats_loaded_owner[node]);
+			fprintf(stderr, "\tLoaded (Shared) : %d\n", handle->stats_loaded_shared[node]);
+			fprintf(stderr, "\tInvalidated (was Owner) : %d\n\n", handle->stats_invalidated[node]);
+		}
+	}
+}
+
+void _starpu_handle_stats_cache_hit(starpu_data_handle handle, unsigned node)
+{
+	handle->stats_direct_access[node]++;
+}
+
+void _starpu_handle_stats_loaded_shared(starpu_data_handle handle, unsigned node)
+{
+	handle->stats_loaded_shared[node]++;
+}
+
+void _starpu_handle_stats_loaded_owner(starpu_data_handle handle, unsigned node)
+{
+	handle->stats_loaded_owner[node]++;
+}
+
+void _starpu_handle_stats_shared_to_owner(starpu_data_handle handle, unsigned node)
+{
+	handle->stats_shared_to_owner[node]++;
+}
+
+void _starpu_handle_stats_invalidated(starpu_data_handle handle, unsigned node)
+{
+	handle->stats_invalidated[node]++;
+}
+
+#endif

+ 10 - 0
src/datawizard/datastats.h

@@ -35,4 +35,14 @@ void _starpu_data_allocation_inc_stats(unsigned node __attribute__ ((unused)));
 void _starpu_display_comm_amounts(void);
 void _starpu_display_alloc_cache_stats(void);
 
+void _starpu_display_data_stats();
+void _starpu_display_data_handle_stats(starpu_data_handle handle);
+
+void _starpu_handle_stats_cache_hit(starpu_data_handle handle, unsigned node);
+void _starpu_handle_stats_loaded_shared(starpu_data_handle handle, unsigned node);
+void _starpu_handle_stats_loaded_owner(starpu_data_handle handle, unsigned node);
+void _starpu_handle_stats_shared_to_owner(starpu_data_handle handle, unsigned node);
+void _starpu_handle_stats_invalidated(starpu_data_handle handle, unsigned node);
+
+
 #endif // __DATASTATS_H__

+ 9 - 0
src/datawizard/interfaces/data_interface.c

@@ -226,6 +226,15 @@ static starpu_data_handle _starpu_data_handle_allocate(struct starpu_data_interf
 	unsigned node;
 	for (node = 0; node < STARPU_MAXNODES; node++)
 	{
+#ifdef STARPU_MEMORY_STATUS
+		/* Stats initialization */
+		handle->stats_direct_access[node]=0;
+		handle->stats_loaded_shared[node]=0;
+		handle->stats_shared_to_owner[node]=0;
+		handle->stats_loaded_owner[node]=0;
+		handle->stats_invalidated[node]=0;
+#endif
+
 		struct starpu_data_replicate_s *replicate;
 		replicate = &handle->per_node[node];
 		/* relaxed_coherency = 0 */

+ 171 - 36
src/datawizard/memalloc.c

@@ -19,7 +19,13 @@
 #include <datawizard/footprint.h>
 
 /* This per-node RW-locks protect mc_list and memchunk_cache entries */
-static pthread_rwlock_t mc_rwlock[STARPU_MAXNODES]; 
+static pthread_rwlock_t mc_rwlock[STARPU_MAXNODES];
+
+/* These per-node RW-locks protect lru_list */
+static pthread_rwlock_t lru_rwlock[STARPU_MAXNODES];
+
+/* Least recently used memory chunks */
+static starpu_mem_chunk_lru_list_t starpu_lru_list[STARPU_MAXNODES];
 
 /* Potentially in use memory chunks */
 static starpu_mem_chunk_list_t mc_list[STARPU_MAXNODES];
@@ -27,13 +33,20 @@ static starpu_mem_chunk_list_t mc_list[STARPU_MAXNODES];
 /* Explicitly caches memory chunks that can be reused */
 static starpu_mem_chunk_list_t memchunk_cache[STARPU_MAXNODES];
 
+/* When reclaiming memory to allocate, we reclaim MAX(what_is_to_reclaim_on_device, data_size_coefficient*data_size) */
+const unsigned starpu_memstrategy_data_size_coefficient=2;
+
+static void starpu_lru(unsigned node);
+
 void _starpu_init_mem_chunk_lists(void)
 {
 	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
 		PTHREAD_RWLOCK_INIT(&mc_rwlock[i], NULL);
+		PTHREAD_RWLOCK_INIT(&lru_rwlock[i], NULL);
 		mc_list[i] = starpu_mem_chunk_list_new();
+		starpu_lru_list[i] = starpu_mem_chunk_lru_list_new();
 		memchunk_cache[i] = starpu_mem_chunk_list_new();
 	}
 }
@@ -45,6 +58,7 @@ void _starpu_deinit_mem_chunk_lists(void)
 	{
 		starpu_mem_chunk_list_delete(mc_list[i]);
 		starpu_mem_chunk_list_delete(memchunk_cache[i]);
+		starpu_mem_chunk_lru_list_delete(starpu_lru_list[i]);
 	}
 }
 
@@ -74,11 +88,11 @@ static void unlock_all_subtree(starpu_data_handle handle)
 {
 	if (handle->nchildren == 0)
 	{
-		/* this is a leaf */	
+		/* this is a leaf */
 		_starpu_spin_unlock(&handle->header_lock);
 	}
 	else {
-		/* lock all sub-subtrees children 
+		/* lock all sub-subtrees children
 		 * Note that this is done in the reverse order of the
 		 * lock_all_subtree so that we avoid deadlock */
 		unsigned i;
@@ -96,10 +110,10 @@ static unsigned may_free_subtree(starpu_data_handle handle, unsigned node)
 	uint32_t refcnt = _starpu_get_data_refcnt(handle, node);
 	if (refcnt)
 		return 0;
-	
+
 	if (!handle->nchildren)
 		return 1;
-	
+
 	/* look into all sub-subtrees children */
 	unsigned child;
 	for (child = 0; child < handle->nchildren; child++)
@@ -113,7 +127,7 @@ static unsigned may_free_subtree(starpu_data_handle handle, unsigned node)
 	return 1;
 }
 
-static void transfer_subtree_to_node(starpu_data_handle handle, unsigned src_node, 
+static void transfer_subtree_to_node(starpu_data_handle handle, unsigned src_node,
 						unsigned dst_node)
 {
 	unsigned i;
@@ -157,7 +171,7 @@ static void transfer_subtree_to_node(starpu_data_handle handle, unsigned src_nod
 			for (i = 0; i < STARPU_MAXNODES; i++)
 			{
 				if (handle->per_node[i].state == STARPU_SHARED) {
-					cnt++; 
+					cnt++;
 					last = i;
 				}
 			}
@@ -208,7 +222,7 @@ static size_t free_memory_on_node(starpu_mem_chunk_t mc, uint32_t node)
 #endif
 //	_starpu_spin_lock(&handle->header_lock);
 
-	if (mc->automatically_allocated && 
+	if (mc->automatically_allocated &&
 		(!handle || data_was_deleted || replicate->refcnt == 0))
 	{
 		if (handle && !data_was_deleted)
@@ -253,6 +267,8 @@ static size_t do_free_mem_chunk(starpu_mem_chunk_t mc, unsigned node)
 {
 	size_t size;
 
+	mc->replicate->mc=NULL;
+
 	/* free the actual buffer */
 	size = free_memory_on_node(mc, node);
 
@@ -262,7 +278,7 @@ static size_t do_free_mem_chunk(starpu_mem_chunk_t mc, unsigned node)
 	free(mc->chunk_interface);
 	starpu_mem_chunk_delete(mc);
 
-	return size; 
+	return size;
 }
 
 /* This function is called for memory chunks that are possibly in used (ie. not
@@ -298,22 +314,27 @@ static size_t try_to_free_mem_chunk(starpu_mem_chunk_t mc, unsigned node)
 	else {
 		/* try to lock all the leafs of the subtree */
 		lock_all_subtree(handle);
-	
+
 		/* check if they are all "free" */
 		if (may_free_subtree(handle, node))
 		{
 			STARPU_ASSERT(handle->per_node[node].refcnt == 0);
-	
-			/* in case there was nobody using that buffer, throw it 
+
+			if (handle->per_node[node].state == STARPU_OWNER)
+				_starpu_handle_stats_invalidated(handle, node);
+			/* else XXX Considering only owner to invalidate */
+
+			/* in case there was nobody using that buffer, throw it
 			 * away after writing it back to main memory */
 			transfer_subtree_to_node(handle, node, 0);
-	
+
+			_starpu_handle_stats_loaded_owner(handle, 0);
 			STARPU_ASSERT(handle->per_node[node].refcnt == 0);
-	
+
 			/* now the actual buffer may be freed */
 			freed = do_free_mem_chunk(mc, node);
 		}
-	
+
 		/* unlock the leafs */
 		unlock_all_subtree(handle);
 	}
@@ -355,7 +376,7 @@ static void reuse_mem_chunk(unsigned node, struct starpu_data_replicate_s *new_r
 	mc->data_was_deleted = 0;
 	/* mc->ops, mc->size, mc->footprint and mc->interface should be
  	 * unchanged ! */
-	
+
 	/* reinsert the mem chunk in the list of active memory chunks */
 	if (!is_already_in_mc_list)
 	{
@@ -381,7 +402,7 @@ static unsigned try_to_reuse_mem_chunk(starpu_mem_chunk_t mc, unsigned node, sta
 	{
 		success = 1;
 
-		/* in case there was nobody using that buffer, throw it 
+		/* in case there was nobody using that buffer, throw it
 		 * away after writing it back to main memory */
 		transfer_subtree_to_node(old_data, node, 0);
 
@@ -477,10 +498,10 @@ static unsigned try_to_find_reusable_mem_chunk(unsigned node, starpu_data_handle
  * Free the memory chuncks that are explicitely tagged to be freed. The
  * mc_rwlock[node] rw-lock should be taken prior to calling this function.
  */
-static size_t flush_memchunk_cache(uint32_t node)
+static size_t flush_memchunk_cache(uint32_t node, size_t reclaim)
 {
 	starpu_mem_chunk_t mc, next_mc;
-	
+
 	size_t freed = 0;
 
 	for (mc = starpu_mem_chunk_list_begin(memchunk_cache[node]);
@@ -495,6 +516,8 @@ static size_t flush_memchunk_cache(uint32_t node)
 
 		free(mc->chunk_interface);
 		starpu_mem_chunk_delete(mc);
+		if (reclaim && freed>reclaim)
+			break;
 	}
 
 	return freed;
@@ -506,7 +529,7 @@ static size_t flush_memchunk_cache(uint32_t node)
  * should only be used at the termination of StarPU for instance). The
  * mc_rwlock[node] rw-lock should be taken prior to calling this function.
  */
-static size_t free_potentially_in_use_mc(uint32_t node, unsigned force)
+static size_t free_potentially_in_use_mc(uint32_t node, unsigned force, size_t reclaim)
 {
 	size_t freed = 0;
 
@@ -516,7 +539,7 @@ static size_t free_potentially_in_use_mc(uint32_t node, unsigned force)
 	     mc != starpu_mem_chunk_list_end(mc_list[node]);
 	     mc = next_mc)
 	{
-		/* there is a risk that the memory chunk is freed 
+		/* there is a risk that the memory chunk is freed
 		   before next iteration starts: so we compute the next
 		   element of the list now */
 		next_mc = starpu_mem_chunk_list_next(mc);
@@ -524,8 +547,8 @@ static size_t free_potentially_in_use_mc(uint32_t node, unsigned force)
 		if (!force)
 		{
 			freed += try_to_free_mem_chunk(mc, node);
-			#if 0
-			if (freed > toreclaim)
+			#if 1
+			if (reclaim && freed > reclaim)
 				break;
 			#endif
 		}
@@ -539,17 +562,20 @@ static size_t free_potentially_in_use_mc(uint32_t node, unsigned force)
 	return freed;
 }
 
-static size_t reclaim_memory_generic(uint32_t node, unsigned force)
+static size_t reclaim_memory_generic(uint32_t node, unsigned force, size_t reclaim)
 {
 	size_t freed = 0;
 
 	PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
 
+	starpu_lru(node);
+
 	/* remove all buffers for which there was a removal request */
-	freed += flush_memchunk_cache(node);
+	freed += flush_memchunk_cache(node, reclaim);
 
 	/* try to free all allocated data potentially in use */
-	freed += free_potentially_in_use_mc(node, force);
+	if (reclaim && freed<reclaim)
+		freed += free_potentially_in_use_mc(node, force, reclaim);
 
 	PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
 
@@ -564,7 +590,7 @@ static size_t reclaim_memory_generic(uint32_t node, unsigned force)
  */
 size_t _starpu_free_all_automatically_allocated_buffers(uint32_t node)
 {
-	return reclaim_memory_generic(node, 1);
+	return reclaim_memory_generic(node, 1, 0);
 }
 
 static starpu_mem_chunk_t _starpu_memchunk_init(struct starpu_data_replicate_s *replicate, size_t size, size_t interface_size, unsigned automatically_allocated)
@@ -581,8 +607,9 @@ static starpu_mem_chunk_t _starpu_memchunk_init(struct starpu_data_replicate_s *
 	mc->ops = handle->ops;
 	mc->data_was_deleted = 0;
 	mc->automatically_allocated = automatically_allocated;
-	mc->relaxed_coherency = replicate->relaxed_coherency;		
+	mc->relaxed_coherency = replicate->relaxed_coherency;
 	mc->replicate = replicate;
+	mc->replicate->mc = mc;
 
 	/* Save a copy of the interface */
 	mc->chunk_interface = malloc(interface_size);
@@ -602,11 +629,11 @@ static void register_mem_chunk(struct starpu_data_replicate_s *replicate, size_t
 	size_t interface_size = replicate->handle->ops->interface_size;
 
 	/* Put this memchunk in the list of memchunk in use */
-	mc = _starpu_memchunk_init(replicate, size, interface_size, automatically_allocated); 
+	mc = _starpu_memchunk_init(replicate, size, interface_size, automatically_allocated);
 
 	PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[dst_node]);
 
-	starpu_mem_chunk_list_push_front(mc_list[dst_node], mc);
+	starpu_mem_chunk_list_push_back(mc_list[dst_node], mc);
 
 	PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[dst_node]);
 }
@@ -646,6 +673,39 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle handle, unsigned node)
 	PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
 }
 
+static size_t _starpu_get_global_mem_size(int dst_node)
+{
+	starpu_node_kind kind = _starpu_get_node_kind(dst_node);
+	size_t global_mem_size;
+
+	switch(kind)
+	{
+		case STARPU_CPU_RAM:
+#ifdef STARPU_DEVEL
+#warning to be fixed
+#endif
+			global_mem_size = 64*1024*1024;
+			break;
+#ifdef STARPU_USE_CUDA
+		case STARPU_CUDA_RAM:
+		{
+			int devid = starpu_memory_node_to_devid(dst_node);
+			global_mem_size = starpu_cuda_get_global_mem_size(devid);
+			break;
+		}
+#endif
+#ifdef STARPU_USE_OPENCL
+		case STARPU_OPENCL_RAM:
+		{
+			int devid = starpu_memory_node_to_devid(dst_node);
+			global_mem_size = starpu_opencl_get_global_mem_size(devid);
+			break;
+		}
+#endif
+	}
+	return global_mem_size;
+}
+
 /*
  * In order to allocate a piece of data, we try to reuse existing buffers if
  * its possible.
@@ -658,7 +718,7 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle handle, unsigned node)
  *
  */
 
-static ssize_t _starpu_allocate_interface(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, uint32_t dst_node)
+static ssize_t _starpu_allocate_interface(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, uint32_t dst_node, unsigned is_prefetch)
 {
 	unsigned attempts = 0;
 	ssize_t allocated_memory;
@@ -708,25 +768,33 @@ static ssize_t _starpu_allocate_interface(starpu_data_handle handle, struct star
 
 		if (allocated_memory == -ENOMEM)
 		{
+			_STARPU_DEBUG("needs to reclaim memory\n");
+			size_t reclaim = 0.25*_starpu_get_global_mem_size(dst_node);
+			if (starpu_memstrategy_data_size_coefficient*handle->data_size > reclaim)
+				reclaim = starpu_memstrategy_data_size_coefficient*handle->data_size;
+
 			replicate->refcnt++;
 			_starpu_spin_unlock(&handle->header_lock);
 
 			STARPU_TRACE_START_MEMRECLAIM(dst_node);
-			reclaim_memory_generic(dst_node, 0);
+			if (is_prefetch)
+				flush_memchunk_cache(dst_node, reclaim);
+			else
+				reclaim_memory_generic(dst_node, 0, reclaim);
 			STARPU_TRACE_END_MEMRECLAIM(dst_node);
 
 		        while (_starpu_spin_trylock(&handle->header_lock))
 		                _starpu_datawizard_progress(_starpu_get_local_memory_node(), 0);
-		
+
 			replicate->refcnt--;
 		}
-		
+
 	} while((allocated_memory == -ENOMEM) && attempts++ < 2);
 
 	return allocated_memory;
 }
 
-int _starpu_allocate_memory_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *replicate)
+int _starpu_allocate_memory_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, unsigned is_prefetch)
 {
 	ssize_t allocated_memory;
 
@@ -739,7 +807,7 @@ int _starpu_allocate_memory_on_node(starpu_data_handle handle, struct starpu_dat
 		return 0;
 
 	STARPU_ASSERT(replicate->data_interface);
-	allocated_memory = _starpu_allocate_interface(handle, replicate, dst_node);
+	allocated_memory = _starpu_allocate_interface(handle, replicate, dst_node, is_prefetch);
 
 	/* perhaps we could really not handle that capacity misses */
 	if (allocated_memory == -ENOMEM)
@@ -766,3 +834,70 @@ unsigned starpu_data_test_if_allocated_on_node(starpu_data_handle handle, uint32
 {
 	return handle->per_node[memory_node].allocated;
 }
+
+void starpu_memchunk_recently_used(starpu_mem_chunk_t mc, unsigned node)
+{
+	PTHREAD_RWLOCK_WRLOCK(&lru_rwlock[node]);
+	starpu_mem_chunk_lru_t mc_lru=starpu_mem_chunk_lru_new();
+	mc_lru->mc=mc;
+	starpu_mem_chunk_lru_list_push_front(starpu_lru_list[node],mc_lru);
+	PTHREAD_RWLOCK_UNLOCK(&lru_rwlock[node]);
+}
+
+/* The mc_rwlock[node] rw-lock should be taken prior to calling this function.*/
+static void starpu_memchunk_recently_used_move(starpu_mem_chunk_t mc, unsigned node)
+{
+	/* XXX Sometimes the memchunk is not in the list... */
+	starpu_mem_chunk_t mc_iter;
+	for (mc_iter = starpu_mem_chunk_list_begin(mc_list[node]);
+	     mc_iter != starpu_mem_chunk_list_end(mc_list[node]);
+	     mc_iter = starpu_mem_chunk_list_next(mc_iter) )
+	{
+		if (mc_iter==mc)
+		{
+			starpu_mem_chunk_list_erase(mc_list[node], mc);
+			starpu_mem_chunk_list_push_back(mc_list[node], mc);
+			return;
+		}
+
+	}
+}
+
+static void starpu_lru(unsigned node)
+{
+	PTHREAD_RWLOCK_WRLOCK(&lru_rwlock[node]);
+	while (!starpu_mem_chunk_lru_list_empty(starpu_lru_list[node]))
+	{
+		starpu_mem_chunk_lru_t mc_lru=starpu_mem_chunk_lru_list_front(starpu_lru_list[node]);
+		starpu_memchunk_recently_used_move(mc_lru->mc, node);
+		starpu_mem_chunk_lru_list_erase(starpu_lru_list[node], mc_lru);
+		starpu_mem_chunk_lru_delete(mc_lru);
+	}
+	PTHREAD_RWLOCK_UNLOCK(&lru_rwlock[node]);
+}
+
+
+#ifdef STARPU_MEMORY_STATUS
+void _starpu_display_data_stats_by_node(int node)
+{
+	PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);
+
+	if (!starpu_mem_chunk_list_empty(mc_list[node]))
+	{
+		fprintf(stderr, "#-------\n");
+		fprintf(stderr, "Data on Node #%d\n",node);
+
+		starpu_mem_chunk_t mc;
+
+		for (mc = starpu_mem_chunk_list_begin(mc_list[node]);
+		     mc != starpu_mem_chunk_list_end(mc_list[node]);
+		     mc = starpu_mem_chunk_list_next(mc))
+		{
+			_starpu_display_data_handle_stats(mc->data);
+		}
+
+	}
+
+	PTHREAD_RWLOCK_UNLOCK(&mc_rwlock[node]);
+}
+#endif

+ 9 - 1
src/datawizard/memalloc.h

@@ -51,9 +51,17 @@ LIST_TYPE(starpu_mem_chunk,
 	struct starpu_data_replicate_s *replicate;
 )
 
+/* LRU list */
+/* One pending "recently used" record: wraps a memchunk pointer, queued by
+ * starpu_memchunk_recently_used() and consumed (then freed) by the replay
+ * pass in memalloc.c. */
+LIST_TYPE(starpu_mem_chunk_lru,
+	starpu_mem_chunk_t mc;
+)
+
 void _starpu_init_mem_chunk_lists(void);
 void _starpu_deinit_mem_chunk_lists(void);
 void _starpu_request_mem_chunk_removal(starpu_data_handle handle, unsigned node);
-int _starpu_allocate_memory_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *replicate);
+/* 'is_prefetch' distinguishes speculative allocations from demand ones. */
+int _starpu_allocate_memory_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, unsigned is_prefetch);
 size_t _starpu_free_all_automatically_allocated_buffers(uint32_t node);
+void starpu_memchunk_recently_used(starpu_mem_chunk_t mc, unsigned node);
+
+void _starpu_display_data_stats_by_node(int node);
 #endif

+ 3 - 2
src/datawizard/progress.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010  Université de Bordeaux 1
- * Copyright (C) 2010  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -25,6 +25,7 @@ void _starpu_datawizard_progress(uint32_t memory_node, unsigned may_alloc)
 	/* in case some other driver requested data */
 	_starpu_handle_pending_node_data_requests(memory_node);
 	_starpu_handle_node_data_requests(memory_node, may_alloc);
-
+	_starpu_handle_node_prefetch_requests(memory_node, may_alloc);
 	_starpu_execute_registered_progression_hooks();
 }
+

+ 1 - 1
src/datawizard/user_interactions.c

@@ -31,7 +31,7 @@ int starpu_data_request_allocation(starpu_data_handle handle, uint32_t node)
 
 	STARPU_ASSERT(handle);
 
-	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, 0, 0);
+	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, 0, 0, 0);
 
 	/* we do not increase the refcnt associated to the request since we are
 	 * not waiting for its termination */

+ 13 - 0
src/drivers/cuda/driver_cuda.c

@@ -83,6 +83,19 @@ static void unlimit_gpu_mem_if_needed(int devid)
 	}
 }
 
+/* Return the total global memory size (in bytes) of CUDA device 'devid',
+ * taken from cudaDeviceProp.totalGlobalMem; reports (and aborts on) any
+ * cudaGetDeviceProperties failure via STARPU_CUDA_REPORT_ERROR. */
+size_t starpu_cuda_get_global_mem_size(int devid)
+{
+	cudaError_t cures;
+	struct cudaDeviceProp prop;
+
+	/* Find the size of the memory on the device */
+	cures = cudaGetDeviceProperties(&prop, devid);
+	if (STARPU_UNLIKELY(cures))
+		STARPU_CUDA_REPORT_ERROR(cures);
+
+	return (size_t)prop.totalGlobalMem;
+}
+
 cudaStream_t starpu_cuda_get_local_transfer_stream(void)
 {
 	int worker = starpu_worker_get_id();

+ 10 - 0
src/drivers/opencl/driver_opencl.c

@@ -79,6 +79,16 @@ static void unlimit_gpu_mem_if_needed(int devid)
 	}
 }
 
+/* Return the total global memory size (in bytes) of OpenCL device 'devid',
+ * as reported by CL_DEVICE_GLOBAL_MEM_SIZE.
+ * Fix: the original declared 'err' but never captured or checked the return
+ * of clGetDeviceInfo, so a failed query returned an *uninitialized*
+ * totalGlobalMem (undefined behavior).  Mirror the error handling of
+ * starpu_cuda_get_global_mem_size(). */
+size_t starpu_opencl_get_global_mem_size(int devid)
+{
+	cl_int err;
+	cl_ulong totalGlobalMem;
+
+	/* Request the size of the current device's memory */
+	err = clGetDeviceInfo(devices[devid], CL_DEVICE_GLOBAL_MEM_SIZE, sizeof(totalGlobalMem), &totalGlobalMem, NULL);
+	if (STARPU_UNLIKELY(err != CL_SUCCESS))
+		STARPU_OPENCL_REPORT_ERROR(err);
+
+	return (size_t)totalGlobalMem;
+}
 
 void starpu_opencl_get_context(int devid, cl_context *context)
 {

+ 6 - 7
tests/datawizard/reclaim.c

@@ -16,7 +16,7 @@
 
 /*
  * This test stress the memory allocation system and should force StarPU to
- * reclaim memory from time to time. 
+ * reclaim memory from time to time.
  */
 
 #include <assert.h>
@@ -41,7 +41,7 @@ static uint64_t get_total_memory_size(void)
 	hwloc_topology_t hwtopology;
 	hwloc_topology_init(&hwtopology);
 	hwloc_topology_load(hwtopology);
-	hwloc_obj_t root = hwloc_get_root_obj(hwtopology);	
+	hwloc_obj_t root = hwloc_get_root_obj(hwtopology);
 
 	return root->memory.total_memory;
 }
@@ -95,8 +95,7 @@ int main(int argc, char **argv)
 	{
 		host_ptr_array[i] = (float *) malloc(BLOCK_SIZE);
 		assert(host_ptr_array[i]);
-		starpu_variable_data_register(&handle_array[i], 0,
-			(uintptr_t)host_ptr_array[i], BLOCK_SIZE);
+		starpu_variable_data_register(&handle_array[i], 0, (uintptr_t)host_ptr_array[i], BLOCK_SIZE);
 		assert(handle_array[i]);
 	}
 
@@ -104,11 +103,11 @@ int main(int argc, char **argv)
 	{
 		struct starpu_task *task = starpu_task_create();
 		task->cl = &dummy_cl;
-		task->buffers[0].handle = handle_array[i%mb];
+		task->buffers[0].handle = handle_array[taskid%mb];
 		task->buffers[0].mode = STARPU_RW;
-		task->buffers[1].handle = handle_array[(i+1)%mb];
+		task->buffers[1].handle = handle_array[(taskid+1)%mb];
 		task->buffers[1].mode = STARPU_R;
-		task->buffers[2].handle = handle_array[(i+2)%mb];
+		task->buffers[2].handle = handle_array[(taskid+2)%mb];
 		task->buffers[2].mode = STARPU_R;
 		starpu_task_submit(task);
 	}