
merge trunk 10376

Simon Archipoff 12 years ago
commit 57ef1d0c43

+ 6 - 0
ChangeLog

@@ -36,6 +36,10 @@ New features:
 Small features:
   * Add cl_arg_free field to enable automatic free(cl_arg) on task
     destroy.
+  * New functions starpu_data_acquire_cb_sequential_consistency() and
+    starpu_data_acquire_on_node_cb_sequential_consistency(), which allow
+    sequential consistency to be enabled or disabled.
+
 
 StarPU 1.1.0 (svn revision xxxx)
 ==============================================
@@ -150,6 +154,8 @@ Small features:
   * New function starpu_get_version() to return as 3 integers the
     release version of StarPU.
   * Enable by default data allocation cache
+  * Explicitly name the non-sleeping-non-running time "Overhead", and use
+    another color in vite traces.
 
 Changes:
   * Rename all filter functions to follow the pattern

+ 20 - 0
doc/doxygen/chapters/api/data_management.doxy

@@ -226,6 +226,20 @@ are not disabled. Contrary to starpu_data_acquire(), this function is
 non-blocking and may be called from task callbacks. Upon successful
 completion, this function returns 0.
 
+\fn int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency)
+\ingroup API_Data_Management
+Equivalent of starpu_data_acquire_cb() with the possibility of enabling or disabling data dependencies.
+When the data specified in \p handle is available in the appropriate access
+mode, the \p callback function is executed. The application may access
+the requested data during the execution of this \p callback. The \p callback
+function must call starpu_data_release() once the application does not
+need to access the piece of data anymore. Note that implicit data
+dependencies are also enforced by starpu_data_acquire_cb_sequential_consistency() in case they
+are not disabled specifically for the given \p handle or by the parameter \p sequential_consistency.
+Similarly to starpu_data_acquire_cb(), this function is
+non-blocking and may be called from task callbacks. Upon successful
+completion, this function returns 0.
+
 \fn int starpu_data_acquire_on_node(starpu_data_handle_t handle, unsigned node, enum starpu_data_access_mode mode)
 \ingroup API_Data_Management
 This is the same as starpu_data_acquire(), except that the data
@@ -237,6 +251,12 @@ This is the same as starpu_data_acquire_cb(), except that the
 data will be available on the given memory node instead of main
 memory.
 
+\fn int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, unsigned node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency)
+\ingroup API_Data_Management
+This is the same as starpu_data_acquire_cb_sequential_consistency(), except that the
+data will be available on the given memory node instead of main
+memory.
+
 \def STARPU_DATA_ACQUIRE_CB(handle, mode, code)
 \ingroup API_Data_Management
 STARPU_DATA_ACQUIRE_CB() is the same as starpu_data_acquire_cb(),

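Note (illustrative only, not part of this changeset): the doxygen entries above describe how the new acquire variant drives the callback and requires starpu_data_release(). A minimal usage sketch, in which "vector_handle" and the helper names are hypothetical, could look like this:

/* Sketch: acquire a handle in RW mode while disabling implicit data
 * dependencies for this acquisition only, then release it from the callback.
 * "vector_handle" is assumed to be an already-registered data handle. */
#include <starpu.h>

static void acquired_cb(void *arg)
{
	starpu_data_handle_t handle = (starpu_data_handle_t) arg;

	/* The application may read and write the data here ... */

	/* ... and must release it once done. */
	starpu_data_release(handle);
}

void acquire_without_implicit_deps(starpu_data_handle_t vector_handle)
{
	/* Last argument 0: do not enforce sequential consistency for this call. */
	int ret = starpu_data_acquire_cb_sequential_consistency(vector_handle, STARPU_RW,
								 acquired_cb, vector_handle, 0);
	STARPU_ASSERT(ret == 0);
}
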
+ 5 - 5
doc/doxygen/chapters/api/performance_model.doxy

@@ -175,7 +175,7 @@ Used by ::STARPU_HISTORY_BASED and ::STARPU_NL_REGRESSION_BASED,
 records all execution history measures.
 \var starpu_perfmodel_per_arch::regression
 \private
-Used by ::STARPU_HISTORY_BASED and
+Used by ::STARPU_REGRESSION_BASED and
 ::STARPU_NL_REGRESSION_BASED, contains the estimated factors of the
 regression.
 
@@ -195,13 +195,13 @@ mean_n = 1/n sum
 \var starpu_perfmodel_history_entry::deviation
 n dev_n = sum2 - 1/n (sum)^2
 \var starpu_perfmodel_history_entry::sum
-num of samples
+sum of samples (in µs)
 \var starpu_perfmodel_history_entry::sum2
 sum of samples^2
 \var starpu_perfmodel_history_entry::nsample
-todo
+number of samples
 \var starpu_perfmodel_history_entry::footprint
-todo
+data footprint
 \var starpu_perfmodel_history_entry::size
 in bytes
 \var starpu_perfmodel_history_entry::flops
@@ -254,7 +254,7 @@ prints the affinity devices on \p f.
 \fn void starpu_perfmodel_update_history(struct starpu_perfmodel *model, struct starpu_task *task, enum starpu_perfmodel_archtype arch, unsigned cpuid, unsigned nimpl, double measured);
 \ingroup API_Performance_Model
 This feeds the performance model model with an explicit
-measurement measured, in addition to measurements done by StarPU
+measurement measured (in µs), in addition to measurements done by StarPU
 itself. This can be useful when the application already has an
 existing set of measurements done in good conditions, that StarPU
 could benefit from instead of doing on-line measurements. And example

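Note (illustrative only, not part of this changeset): the hunk above clarifies that starpu_perfmodel_update_history() takes a measurement in µs. A minimal sketch of feeding an off-line measurement, assuming STARPU_CPU_DEFAULT is the relevant arch type and with "my_model", "task" and the measured value as hypothetical placeholders:

/* Sketch: feed a pre-existing off-line measurement (in µs) into a
 * performance model. The model and task are assumed to be set up
 * elsewhere by the application. */
#include <starpu.h>

void feed_offline_measurement(struct starpu_perfmodel *my_model, struct starpu_task *task)
{
	double measured_us = 1234.5;	/* execution time measured elsewhere, in µs */

	/* Record the measurement for implementation 0 run on CPU core 0. */
	starpu_perfmodel_update_history(my_model, task, STARPU_CPU_DEFAULT, 0, 0, measured_us);
}
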
+ 1 - 1
doc/texinfo/chapters/api.texi

@@ -2813,7 +2813,7 @@ Used by @code{STARPU_HISTORY_BASED} and @code{STARPU_NL_REGRESSION_BASED},
 records all execution history measures.
 
 @item @code{struct starpu_perfmodel_regression_model regression}
-Used by @code{STARPU_HISTORY_REGRESION_BASED} and
+Used by @code{STARPU_REGRESSION_BASED} and
 @code{STARPU_NL_REGRESSION_BASED}, contains the estimated factors of the
 regression.
 

+ 3 - 0
include/starpu_data.h

@@ -60,6 +60,9 @@ int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_data_access_mod
 int starpu_data_acquire_on_node(starpu_data_handle_t handle, unsigned node, enum starpu_data_access_mode mode);
 int starpu_data_acquire_cb(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg);
 int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, unsigned node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg);
+int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
+int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, unsigned node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
+
 #ifdef __GCC__
 #  define STARPU_DATA_ACQUIRE_CB(handle, mode, code) do \
 	{ \						\

+ 203 - 104
mpi/src/starpu_mpi.c

@@ -57,18 +57,13 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 
-struct _starpu_mpi_envelope
-{
-	ssize_t psize;
-	int mpi_tag;
-};
-
 struct _starpu_mpi_copy_handle
 {
 	starpu_data_handle_t handle;
 	struct _starpu_mpi_envelope *env;
 	int mpi_tag;
 	UT_hash_handle hh;
+	struct _starpu_mpi_req *req;
 };
 
  /********************************************************/
@@ -176,135 +171,170 @@ static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
 	}
 }
 
-/********************************************************/
-/*                                                      */
-/*  Send/Receive functionalities                        */
-/*                                                      */
-/********************************************************/
-
-static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
-							      int srcdst, int mpi_tag, MPI_Comm comm,
-							      unsigned detached, void (*callback)(void *), void *arg,
-							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
-							      enum starpu_data_access_mode mode)
+static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 {
+	/* Initialize the request structure */
+	req->data_handle = NULL;
 
-	_STARPU_MPI_LOG_IN();
-	struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT_MSG(req, "Invalid request");
+	req->datatype = NULL;
+	req->ptr = NULL;
+	req->count = -1;
+	req->user_datatype = -1;
 
-	_STARPU_MPI_INC_POSTED_REQUESTS(1);
+	req->srcdst = -1;
+	req->mpi_tag = -1;
+	req->comm = 0;
 
-	/* Initialize the request structure */
-	req->submitted = 0;
-	req->completed = 0;
+	req->func = NULL;
+
+	req->status = NULL;
+	req->request = NULL;
+	req->flag = NULL;
+
+	req->ret = -1;
 	STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
 	STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&req->posted_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&req->posted_cond, NULL);
 
-	req->request_type = request_type;
-	req->user_datatype = -1;
-	req->count = -1;
-	req->data_handle = data_handle;
-	req->srcdst = srcdst;
-	req->mpi_tag = mpi_tag;
-	req->comm = comm;
+	req->request_type = UNKNOWN_REQ;
 
-	req->detached = detached;
-	req->callback = callback;
-	req->callback_arg = arg;
+	req->submitted = 0;
+	req->completed = 0;
+	req->posted = 0;
 
-	req->func = func;
+	req->other_request = NULL;
 
-	/* Asynchronously request StarPU to fetch the data in main memory: when
-	 * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
-	 * the request is actually submitted */
-	starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
+	req->detached = -1;
+	req->callback = NULL;
+	req->callback_arg = NULL;
 
-	_STARPU_MPI_LOG_OUT();
-	return req;
-}
+	req->size_req = NULL;
+	req->internal_req = NULL;
+	req->is_internal_req = 0;
+	req->envelope = NULL;
+ }
 
-/********************************************************/
-/*                                                      */
-/*  Send functionalities                                */
-/*                                                      */
-/********************************************************/
+ /********************************************************/
+ /*                                                      */
+ /*  Send/Receive functionalities                        */
+ /*                                                      */
+ /********************************************************/
 
-static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
-{
-	_STARPU_MPI_LOG_IN();
+ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
+							       int srcdst, int mpi_tag, MPI_Comm comm,
+							       unsigned detached, void (*callback)(void *), void *arg,
+							       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
+							       enum starpu_data_access_mode mode)
+ {
+
+	 _STARPU_MPI_LOG_IN();
+	 struct _starpu_mpi_req *req = malloc(sizeof(struct _starpu_mpi_req));
+	 STARPU_ASSERT_MSG(req, "Invalid request");
+
+	 _STARPU_MPI_INC_POSTED_REQUESTS(1);
+
+	 /* Initialize the request structure */
+	 _starpu_mpi_request_init(req);
+	 req->request_type = request_type;
+	 req->data_handle = data_handle;
+	 req->srcdst = srcdst;
+	 req->mpi_tag = mpi_tag;
+	 req->comm = comm;
+	 req->detached = detached;
+	 req->callback = callback;
+	 req->callback_arg = arg;
+	 req->func = func;
+
+	 /* Asynchronously request StarPU to fetch the data in main memory: when
+	  * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
+	  * the request is actually submitted */
+	 starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
+
+	 _STARPU_MPI_LOG_OUT();
+	 return req;
+ }
 
-	STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
+ /********************************************************/
+ /*                                                      */
+ /*  Send functionalities                                */
+ /*                                                      */
+ /********************************************************/
 
-	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
+ {
+	 _STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
+	 STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
 
-	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
+	 _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
-	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
-	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
+	 _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
-	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
+	 TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
-	/* somebody is perhaps waiting for the MPI request to be posted */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	req->submitted = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+	 req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
+	 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
 
-	_starpu_mpi_handle_detached_request(req);
+	 TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
 
-	_STARPU_MPI_LOG_OUT();
-}
+	 /* somebody is perhaps waiting for the MPI request to be posted */
+	 STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	 req->submitted = 1;
+	 STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
+	 STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
 
-static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
-{
-	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+	 _starpu_mpi_handle_detached_request(req);
+
+	 _STARPU_MPI_LOG_OUT();
+ }
 
-	struct _starpu_mpi_envelope* env = calloc(1,sizeof(struct _starpu_mpi_envelope));
+ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
+ {
+	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
 
-	env->mpi_tag = req->mpi_tag;
+	req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
+	req->envelope->mpi_tag = req->mpi_tag;
 
 	if (req->user_datatype == 0)
 	{
 		req->count = 1;
 		req->ptr = starpu_data_get_local_ptr(req->data_handle);
 
-		env->psize = (ssize_t)req->count;
+		req->envelope->psize = (ssize_t)req->count;
 
 		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
-		MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
+		MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 	}
 	else
 	{
 		int ret;
 
  		// Do not pack the data, just try to find out the size
-		starpu_data_pack(req->data_handle, NULL, &(env->psize));
+		starpu_data_pack(req->data_handle, NULL, &(req->envelope->psize));
 
-		if (env->psize != -1)
+		if (req->envelope->psize != -1)
  		{
  			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
-			req->count = env->psize;
-			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
+			req->count = req->envelope->psize;
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  		}
 
  		// Pack the data
  		starpu_data_pack(req->data_handle, &req->ptr, &req->count);
-		if (env->psize == -1)
+		if (req->envelope->psize == -1)
  		{
  			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
-			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  		}
  		else
  		{
  			// We check the size returned with the 2 calls to pack is the same
-			STARPU_ASSERT_MSG(req->count == env->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, env->psize);
+			STARPU_ASSERT_MSG(req->count == req->envelope->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->psize);
  		}
 		// We can send the data now
 	}
@@ -400,6 +430,13 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 	_STARPU_MPI_LOG_IN();
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
 
+	// We check whether a tag is defined for the data handle; if not,
+	// we use the one given for the communication.
+	// A tag is necessary for the internal MPI engine.
+	int tag = starpu_data_get_tag(data_handle);
+	if (tag == -1)
+		starpu_data_set_tag(data_handle, mpi_tag);
+
 	struct _starpu_mpi_req *req;
 	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL);
 
@@ -413,7 +450,16 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
+
+	// We check whether a tag is defined for the data handle; if not,
+	// we use the one given for the communication.
+	// A tag is necessary for the internal MPI engine.
+	int tag = starpu_data_get_tag(data_handle);
+	if (tag == -1)
+		starpu_data_set_tag(data_handle, mpi_tag);
+
 	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg);
+
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
@@ -421,8 +467,15 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
 int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
 {
 	starpu_mpi_req req;
-
 	_STARPU_MPI_LOG_IN();
+
+	// We check whether a tag is defined for the data handle; if not,
+	// we use the one given for the communication.
+	// A tag is necessary for the internal MPI engine.
+	int tag = starpu_data_get_tag(data_handle);
+	if (tag == -1)
+		starpu_data_set_tag(data_handle, mpi_tag);
+
 	starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
 	starpu_mpi_wait(&req, status);
 
@@ -457,8 +510,11 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 	_STARPU_MPI_LOG_IN();
 	int ret;
-	struct _starpu_mpi_req *waiting_req = calloc(1, sizeof(struct _starpu_mpi_req));
+
+	struct _starpu_mpi_req *waiting_req = malloc(sizeof(struct _starpu_mpi_req));
+	_starpu_mpi_request_init(waiting_req);
 	STARPU_ASSERT_MSG(waiting_req, "Allocation failed");
+
 	struct _starpu_mpi_req *req = *public_req;
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
@@ -549,9 +605,9 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 	if (submitted)
 	{
-		struct _starpu_mpi_req *testing_req = calloc(1, sizeof(struct _starpu_mpi_req));
+		struct _starpu_mpi_req *testing_req = malloc(sizeof(struct _starpu_mpi_req));
 		STARPU_ASSERT_MSG(testing_req, "allocation failed");
-		//		memset(testing_req, 0, sizeof(struct _starpu_mpi_req));
+		_starpu_mpi_request_init(testing_req);
 
 		/* Initialize the request structure */
 		STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
@@ -615,8 +671,9 @@ int starpu_mpi_barrier(MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	int ret;
-	struct _starpu_mpi_req *barrier_req = calloc(1, sizeof(struct _starpu_mpi_req));
+	struct _starpu_mpi_req *barrier_req = malloc(sizeof(struct _starpu_mpi_req));
 	STARPU_ASSERT_MSG(barrier_req, "allocation failed");
+	_starpu_mpi_request_init(barrier_req);
 
 	/* First wait for *both* all tasks and MPI requests to finish, in case
 	 * some tasks generate MPI requests, MPI requests generate tasks, etc.
@@ -681,6 +738,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 		case WAIT_REQ: return "WAIT_REQ";
 		case TEST_REQ: return "TEST_REQ";
 		case BARRIER_REQ: return "BARRIER_REQ";
+		case UNKNOWN_REQ: return "UNSET_REQ";
 		default: return "unknown request type";
 		}
 }
@@ -725,12 +783,25 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 			}
 			else
 			{
+				_STARPU_MPI_DEBUG(3, "NOT deleting chandle %p from hashmap (tag %d %d)\n", chandle, req->mpi_tag, starpu_data_get_tag(req->data_handle));
 				_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
 			}
 		}
 		starpu_data_release(req->data_handle);
 	}
 
+	if (req->envelope)
+	{
+		free(req->envelope);
+		req->envelope = NULL;
+	}
+
+	if (req->internal_req)
+	{
+		free(req->internal_req);
+		req->internal_req = NULL;
+	}
+
 	/* Execute the specified callback, if any */
 	if (req->callback)
 		req->callback(req->callback_arg);
@@ -755,6 +826,11 @@ static void _starpu_mpi_copy_cb(void* arg)
 {
 	struct _starpu_mpi_copy_cb_args *args = arg;
 
+	// We store in the application request the internal MPI
+	// request so that it can be used by starpu_mpi_wait
+	args->req->request = args->req->internal_req->request;
+	args->req->submitted = 1;
+
 	struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
 	void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
 	void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
@@ -777,7 +853,11 @@ static void _starpu_mpi_copy_cb(void* arg)
 	starpu_data_unregister_submit(args->copy_handle);
 
 	_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
-	_starpu_mpi_handle_request_termination(args->req);
+	if (args->req->detached)
+		_starpu_mpi_handle_request_termination(args->req);
+	// else: if the request is not detached, its termination will
+	// be handled when calling starpu_mpi_wait
+
 
 	free(args);
 }
@@ -789,6 +869,8 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
+	_STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p tag %d and type %s\n", req, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
+
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
 	if (req->request_type == RECV_REQ)
@@ -804,6 +886,8 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 		{
 			_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
 
+			req->internal_req = chandle->req;
+
 			struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
 			cb_args->data_handle = req->data_handle;
 			cb_args->copy_handle = chandle->handle;
@@ -835,9 +919,16 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 					STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
 				}
 
+				_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 				_starpu_mpi_req_list_push_front(new_requests, req);
 
-				_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+				/* inform the StarPU MPI thread that the request has been pushed into the new_requests list */
+				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
+				req->posted = 1;
+				STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 			}
 			/* Case : a classic receive request with no send received earlier than expected.
 			 * We just add the pending receive request to the requests' hashmap. */
@@ -931,7 +1022,8 @@ static void _starpu_mpi_test_detached_requests(void)
 		if (flag)
 		{
 			_starpu_mpi_req_list_erase(detached_requests, req);
-			free(req);
+			if (!req->is_internal_req)
+				free(req);
 		}
 
 	}
@@ -1041,13 +1133,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
  	struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
 
- 	MPI_Request header_req;
  	int header_req_submitted = 0;
 
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
 	{
 		/* shall we block ? */
-		_STARPU_MPI_DEBUG(3, "HASH_COUNT(_starpu_mpi_req_hashmap) = %d\n",HASH_COUNT(_starpu_mpi_req_hashmap));
 		unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_req_hashmap) == 0);
 
 #ifndef STARPU_MPI_ACTIVITY
@@ -1085,11 +1175,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 		/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
 		 * requests in our side, we resubmit a header request. */
-		if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0) && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
+		MPI_Request header_req;
+		if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
 		{
+			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
 			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
-
-			_STARPU_MPI_DEBUG(3, "Submit of header_req OK!\n");
 			header_req_submitted = 1;
 		}
 
@@ -1102,7 +1192,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		{
 			int flag,res;
 			MPI_Status status;
-			_STARPU_MPI_DEBUG(3, "Test of header_req\n");
+			_STARPU_MPI_DEBUG(4, "Test of header_req\n");
 
 			/* test whether an envelope has arrived. */
 			res = MPI_Test(&header_req, &flag, &status);
@@ -1110,9 +1200,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (flag)
 			{
-				_STARPU_MPI_DEBUG(3, "header_req received !\n");
-
-				_STARPU_MPI_DEBUG(3, "Searching for request with tag %d, size %ld ..\n",recv_env->mpi_tag, recv_env->psize);
+				_STARPU_MPI_DEBUG(3, "Searching for request with tag %d (size %ld)\n", recv_env->mpi_tag, recv_env->psize);
 
 				struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
 
@@ -1127,7 +1215,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 					while(!(data_handle))
 					{
+						STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 						data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
+						STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 					}
 					STARPU_ASSERT(data_handle);
 
@@ -1139,12 +1229,21 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					starpu_data_register_same(&chandle->handle, data_handle);
 					add_chandle(chandle);
 
-					_STARPU_MPI_DEBUG(3, "Posting internal starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
-
-					res = starpu_mpi_irecv_detached(chandle->handle,status.MPI_SOURCE,chandle->mpi_tag,MPI_COMM_WORLD,NULL,NULL);
-					STARPU_ASSERT(res == MPI_SUCCESS);
+					_STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
+					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL);
+					chandle->req->is_internal_req = 1;
 
-					_STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
+					// We wait until the request has been pushed into the
+					// new_requests list; this ensures that the next loop
+					// iteration will call _starpu_mpi_handle_new_request
+					// on the request and post the corresponding mpi_irecv,
+					// otherwise incoming data could be read as an envelope
+					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req->posted_mutex));
+					while (!(chandle->req->posted))
+					     STARPU_PTHREAD_COND_WAIT(&(chandle->req->posted_cond), &(chandle->req->posted_mutex));
+					STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req->posted_mutex));
+					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 				}
 				/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
 				 * the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
@@ -1181,7 +1280,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			}
 			else
 			{
-				_STARPU_MPI_DEBUG(3, "Nothing received, continue ..\n");
+				_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
 			}
 		}
 	}
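
Note (illustrative only, not part of this changeset): the receive paths above now default the handle's data tag to the communication tag, since the internal MPI engine matches early-arriving data by tag. A minimal sketch of setting the tag explicitly before a blocking receive, modelled on the new mpi_earlyrecv tests; the tag value 42 and the function name are arbitrary:

/* Sketch: give a handle an explicit tag before receiving into it, so the
 * internal MPI engine can match data that arrives before the receive is
 * posted. */
#include <starpu_mpi.h>

void post_tagged_recv(int source, MPI_Comm comm)
{
	int value = 0;
	starpu_data_handle_t handle;

	starpu_variable_data_register(&handle, 0, (uintptr_t) &value, sizeof(value));

	/* Without this, starpu_mpi_recv() now falls back to the communication tag. */
	starpu_data_set_tag(handle, 42);

	starpu_mpi_recv(handle, source, 42, comm, NULL);

	starpu_data_unregister(handle);
}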

+ 1 - 0
mpi/src/starpu_mpi_datatype.c

@@ -227,5 +227,6 @@ char *_starpu_mpi_datatype(MPI_Datatype datatype)
      if (datatype == MPI_INTEGER4) return "MPI_INTEGER4";
      if (datatype == MPI_INTEGER8) return "MPI_INTEGER8";
      if (datatype == MPI_PACKED) return "MPI_PACKED";
+     if (datatype == 0) return "Unknown datatype";
      return "User defined MPI Datatype";
 }

+ 9 - 3
mpi/src/starpu_mpi_private.c

@@ -18,12 +18,18 @@
 #include <starpu_mpi_private.h>
 
 int _debug_rank=-1;
-int _debug_level=0;
+int _debug_level_min=0;
+int _debug_level_max=0;
 int _starpu_mpi_tag = 42;
 
-void _starpu_mpi_set_debug_level(int level)
+void _starpu_mpi_set_debug_level_min(int level)
 {
-	_debug_level = level;
+	_debug_level_min = level;
+}
+
+void _starpu_mpi_set_debug_level_max(int level)
+{
+	_debug_level_max = level;
 }
 
 int starpu_mpi_get_communication_tag(void)

+ 28 - 8
mpi/src/starpu_mpi_private.h

@@ -31,18 +31,20 @@ extern "C" {
 
 #ifdef STARPU_VERBOSE
 extern int _debug_rank;
-extern int _debug_level;
-void _starpu_mpi_set_debug_level(int level);
+extern int _debug_level_min;
+extern int _debug_level_max;
+void _starpu_mpi_set_debug_level_min(int level);
+void _starpu_mpi_set_debug_level_max(int level);
 #endif
 
 #ifdef STARPU_VERBOSE
 #  define _STARPU_MPI_DEBUG(level, fmt, ...) \
 	do \
 	{								\
-		if (!getenv("STARPU_SILENT") && level <= _debug_level)	\
+		if (!getenv("STARPU_SILENT") && _debug_level_min <= level && level <= _debug_level_max)	\
 		{							\
 			if (_debug_rank == -1) MPI_Comm_rank(MPI_COMM_WORLD, &_debug_rank); \
-			fprintf(stderr, "%*s[%d][starpu_mpi][%s] " fmt , (_debug_rank+1)*4, "", _debug_rank, __starpu_func__ ,## __VA_ARGS__); \
+			fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] " fmt , (_debug_rank+1)*4, "", _debug_rank, __starpu_func__ , __LINE__,## __VA_ARGS__); \
 			fflush(stderr); \
 		}			\
 	} while(0);
@@ -52,17 +54,17 @@ void _starpu_mpi_set_debug_level(int level);
 
 #define _STARPU_MPI_DISP(fmt, ...) do { if (!getenv("STARPU_SILENT")) { \
 	       				     if (_debug_rank == -1) MPI_Comm_rank(MPI_COMM_WORLD, &_debug_rank); \
-                                             fprintf(stderr, "%*s[%d][starpu_mpi][%s] " fmt , (_debug_rank+1)*4, "", _debug_rank, __starpu_func__ ,## __VA_ARGS__); \
+                                             fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] " fmt , (_debug_rank+1)*4, "", _debug_rank, __starpu_func__ , __LINE__ ,## __VA_ARGS__); \
                                              fflush(stderr); }} while(0);
 
 #ifdef STARPU_VERBOSE0
 #  define _STARPU_MPI_LOG_IN()             do { if (!getenv("STARPU_SILENT")) { \
                                                if (_debug_rank == -1) MPI_Comm_rank(MPI_COMM_WORLD, &_debug_rank);                        \
-                                               fprintf(stderr, "%*s[%d][starpu_mpi][%s] -->\n", (_debug_rank+1)*4, "", _debug_rank, __starpu_func__ ); \
+                                               fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] -->\n", (_debug_rank+1)*4, "", _debug_rank, __starpu_func__ , __LINE__); \
                                                fflush(stderr); }} while(0)
 #  define _STARPU_MPI_LOG_OUT()            do { if (!getenv("STARPU_SILENT")) { \
                                                if (_debug_rank == -1) MPI_Comm_rank(MPI_COMM_WORLD, &_debug_rank);                        \
-                                               fprintf(stderr, "%*s[%d][starpu_mpi][%s] <--\n", (_debug_rank+1)*4, "", _debug_rank, __starpu_func__ ); \
+                                               fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] <--\n", (_debug_rank+1)*4, "", _debug_rank, __starpu_func__, __LINE__ ); \
                                                fflush(stderr); }} while(0)
 #else
 #  define _STARPU_MPI_LOG_IN()
@@ -78,9 +80,18 @@ enum _starpu_mpi_request_type
 	WAIT_REQ=2,
 	TEST_REQ=3,
 	BARRIER_REQ=4,
-	PROBE_REQ=5
+	PROBE_REQ=5,
+	UNKNOWN_REQ=6,
 };
 
+struct _starpu_mpi_envelope
+{
+	ssize_t psize;
+	int mpi_tag;
+};
+
+struct _starpu_mpi_req;
+
 LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
@@ -106,10 +117,14 @@ LIST_TYPE(_starpu_mpi_req,
 	starpu_pthread_mutex_t req_mutex;
 	starpu_pthread_cond_t req_cond;
 
+	starpu_pthread_mutex_t posted_mutex;
+	starpu_pthread_cond_t posted_cond;
+
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 
 	unsigned submitted;
 	unsigned completed;
+	unsigned posted;
 
 	UT_hash_handle hh;
 
@@ -124,6 +139,11 @@ LIST_TYPE(_starpu_mpi_req,
 
         /* in the case of user-defined datatypes, we need to send the size of the data */
 	MPI_Request size_req;
+
+        struct _starpu_mpi_envelope* envelope;
+
+	int is_internal_req;
+	struct _starpu_mpi_req *internal_req;
 );
 
 #ifdef __cplusplus

+ 12 - 0
mpi/tests/Makefile.am

@@ -80,10 +80,13 @@ starpu_mpi_TESTS =				\
 	pingpong				\
 	mpi_test				\
 	mpi_isend				\
+	mpi_earlyrecv				\
+	mpi_earlyrecv2				\
 	mpi_irecv				\
 	mpi_isend_detached			\
 	mpi_irecv_detached			\
 	mpi_detached_tag			\
+	mpi_redux				\
 	ring					\
 	ring_async				\
 	ring_async_implicit			\
@@ -104,10 +107,13 @@ noinst_PROGRAMS =				\
 	pingpong				\
 	mpi_test				\
 	mpi_isend				\
+	mpi_earlyrecv				\
+	mpi_earlyrecv2				\
 	mpi_irecv				\
 	mpi_isend_detached			\
 	mpi_irecv_detached			\
 	mpi_detached_tag			\
+	mpi_redux				\
 	ring					\
 	ring_async				\
 	ring_async_implicit			\
@@ -126,6 +132,10 @@ noinst_PROGRAMS =				\
 
 mpi_isend_LDADD =					\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
+mpi_earlyrecv_LDADD =					\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
+mpi_earlyrecv2_LDADD =					\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 mpi_irecv_LDADD =					\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 mpi_isend_detached_LDADD =			\
@@ -134,6 +144,8 @@ mpi_irecv_detached_LDADD =			\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 mpi_detached_tag_LDADD =				\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
+mpi_redux_LDADD =					\
+	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 pingpong_LDADD =					\
 	../src/libstarpumpi-@STARPU_EFFECTIVE_VERSION@.la
 mpi_test_LDADD =					\

+ 102 - 0
mpi/tests/mpi_earlyrecv.c

@@ -0,0 +1,102 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include "helper.h"
+#include <unistd.h>
+
+//#define NB 1000
+#define NB 10
+
+int main(int argc, char **argv)
+{
+	int ret, rank, size, i, nb_requests;
+	starpu_data_handle_t tab_handle[NB];
+	starpu_mpi_req request[NB];
+
+	MPI_Init(NULL, NULL);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+	if (size%2 != 0)
+	{
+		if (rank == 0)
+			FPRINTF(stderr, "We need an even number of processes.\n");
+
+		MPI_Finalize();
+		return STARPU_TEST_SKIPPED;
+	}
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(NULL, NULL, 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+
+	for(i=0 ; i<NB ; i++)
+	{
+		starpu_variable_data_register(&tab_handle[i], 0, (uintptr_t)&rank, sizeof(int));
+		starpu_data_set_tag(tab_handle[i], i);
+		request[i] = NULL;
+	}
+
+	int other_rank = rank%2 == 0 ? rank+1 : rank-1;
+
+	fprintf(stderr, "rank %d exchanging with rank %d\n", rank, other_rank);
+
+	if (rank%2)
+	{
+		starpu_mpi_isend(tab_handle[0], &request[0], other_rank, 0, MPI_COMM_WORLD);
+		starpu_mpi_recv(tab_handle[2], other_rank, 2, MPI_COMM_WORLD, NULL);
+		starpu_mpi_isend(tab_handle[1], &request[1], other_rank, 1, MPI_COMM_WORLD);
+		nb_requests = 2;
+	}
+	else
+	{
+		starpu_mpi_irecv(tab_handle[0], &request[0], other_rank, 0, MPI_COMM_WORLD);
+		starpu_mpi_irecv(tab_handle[1], &request[1], other_rank, 1, MPI_COMM_WORLD);
+		starpu_mpi_isend(tab_handle[2], &request[2], other_rank, 2, MPI_COMM_WORLD);
+		nb_requests = 3;
+	}
+
+	int finished=0;
+	while (!finished)
+	{
+		for(i=0 ; i<nb_requests ; i++)
+		{
+			if (request[i])
+			{
+				int flag;
+				MPI_Status status;
+				starpu_mpi_test(&request[i], &flag, &status);
+				if (flag)
+					fprintf(stderr, "request[%d] = %d %p\n", i, flag, request[i]);
+			}
+		}
+		finished = request[0] == NULL;
+		for(i=1 ; i<nb_requests ; i++) finished = finished && request[i] == NULL;
+	}
+
+	for(i=0 ; i<NB ; i++)
+		starpu_data_unregister(tab_handle[i]);
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	MPI_Finalize();
+
+	return 0;
+}

+ 94 - 0
mpi/tests/mpi_earlyrecv2.c

@@ -0,0 +1,94 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include "helper.h"
+#include <unistd.h>
+
+//#define NB 1000
+#define NB 10
+
+int main(int argc, char **argv)
+{
+	int ret, rank, size, i;
+	starpu_data_handle_t tab_handle[NB];
+
+	MPI_Init(NULL, NULL);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+	if (size%2 != 0)
+	{
+		if (rank == 0)
+			FPRINTF(stderr, "We need an even number of processes.\n");
+
+		MPI_Finalize();
+		return STARPU_TEST_SKIPPED;
+	}
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(NULL, NULL, 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+
+	for(i=0 ; i<NB ; i++)
+	{
+		starpu_variable_data_register(&tab_handle[i], 0, (uintptr_t)&rank, sizeof(int));
+		starpu_data_set_tag(tab_handle[i], i);
+	}
+
+	int other_rank = rank%2 == 0 ? rank+1 : rank-1;
+
+	if (rank%2)
+	{
+		starpu_mpi_send(tab_handle[0], other_rank, 0, MPI_COMM_WORLD);
+		starpu_mpi_send(tab_handle[NB-1], other_rank, NB-1, MPI_COMM_WORLD);
+		for(i=1 ; i<NB-1 ; i++)
+		{
+			starpu_mpi_send(tab_handle[i], other_rank, i, MPI_COMM_WORLD);
+		}
+	}
+	else
+	{
+		starpu_mpi_req req[NB];
+		memset(req, 0, NB*sizeof(starpu_mpi_req));
+
+		starpu_mpi_irecv(tab_handle[0], &req[0], other_rank, 0, MPI_COMM_WORLD);
+		STARPU_ASSERT(req[0] != NULL);
+		// We sleep to make sure that the data with tag NB-1 is received before the corresponding recv is posted
+		usleep(2000000);
+		for(i=1 ; i<NB ; i++)
+		{
+			starpu_mpi_irecv(tab_handle[i], &req[i], other_rank, i, MPI_COMM_WORLD);
+			STARPU_ASSERT(req[i] != NULL);
+		}
+		for(i=0 ; i<NB ; i++)
+		{
+			starpu_mpi_wait(&req[i], NULL);
+		}
+	}
+
+	for(i=0 ; i<NB ; i++)
+		starpu_data_unregister(tab_handle[i]);
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	MPI_Finalize();
+
+	return 0;
+}

+ 107 - 0
mpi/tests/mpi_redux.c

@@ -0,0 +1,107 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include "helper.h"
+
+static starpu_pthread_mutex_t mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
+static starpu_pthread_cond_t cond = STARPU_PTHREAD_COND_INITIALIZER;
+
+void callback(void *arg)
+{
+	unsigned *received = arg;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	*received = *received + 1;
+	fprintf(stderr, "received = %d\n", *received);
+	STARPU_PTHREAD_COND_SIGNAL(&cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+}
+
+int main(int argc, char **argv)
+{
+	int ret, rank, size, sum;
+	int value=0;
+	starpu_data_handle_t *handles;
+
+	MPI_Init(NULL, NULL);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+	sum = ((size-1) * (size) / 2);
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(NULL, NULL, 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+
+	if (rank == 0)
+	{
+		int src;
+		int received = 1;
+
+		handles = malloc(size * sizeof(starpu_data_handle_t));
+
+		for(src=1 ; src<size ; src++)
+		{
+			starpu_variable_data_register(&handles[src], -1, (uintptr_t)NULL, sizeof(int));
+			starpu_mpi_irecv_detached(handles[src], src, 12+src, MPI_COMM_WORLD, callback, &received);
+		}
+
+		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		while (received != size)
+			STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+
+		for(src=1 ; src<size ; src++)
+		{
+			void *ptr = starpu_data_get_local_ptr(handles[src]);
+			value += *((int *)ptr);
+			starpu_data_unregister(handles[src]);
+		}
+
+		for(src=1 ; src<size ; src++)
+		{
+			starpu_variable_data_register(&handles[src], 0, (uintptr_t)&sum, sizeof(int));
+			starpu_mpi_send(handles[src], src, 12+src, MPI_COMM_WORLD);
+			starpu_data_unregister(handles[src]);
+		}
+	}
+	else
+	{
+		value = rank;
+		handles = malloc(sizeof(starpu_data_handle_t));
+		starpu_variable_data_register(&handles[0], 0, (uintptr_t)&value, sizeof(int));
+		starpu_mpi_send(handles[0], 0, 12+rank, MPI_COMM_WORLD);
+		starpu_data_unregister_submit(handles[0]);
+
+		starpu_variable_data_register(&handles[0], 0, (uintptr_t)&value, sizeof(int));
+		starpu_mpi_recv(handles[0], 0, 12+rank, MPI_COMM_WORLD, NULL);
+		starpu_data_unregister(handles[0]);
+	}
+
+	starpu_task_wait_for_all();
+	free(handles);
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	MPI_Finalize();
+
+	STARPU_ASSERT_MSG(sum == value, "Sum of first %d integers is %d, not %d\n", size-1, sum, value);
+
+	return 0;
+}

+ 3 - 0
sc_hypervisor/include/sc_hypervisor_monitoring.h

@@ -50,6 +50,9 @@ struct sc_hypervisor_wrapper
 	/* idle time of workers in this context */
 	double current_idle_time[STARPU_NMAXWORKERS];
 	
+	double idle_time[STARPU_NMAXWORKERS];
+	double idle_start_time[STARPU_NMAXWORKERS];
+	
 	/* list of workers that will leave this contexts (lazy resizing process) */
 	int worker_to_be_removed[STARPU_NMAXWORKERS];
 

+ 1 - 0
sc_hypervisor/src/Makefile.am

@@ -25,6 +25,7 @@ libsc_hypervisor_la_SOURCES = 				\
 	sc_hypervisor.c					\
 	sc_config.c					\
 	policies_utils/policy_tools.c			\
+	policies_utils/speed.c				\
 	policies_utils/task_pool.c			\
 	policies_utils/lp_tools.c			\
 	policies_utils/lp_programs.c			\

+ 50 - 46
sc_hypervisor/src/hypervisor_policies/feft_lp_policy.c

@@ -22,56 +22,56 @@
 #ifdef STARPU_HAVE_GLPK_H
 static void _try_resizing(void)
 {
+	/* for vite */
+	starpu_trace_user_event(2);
+
 	int nsched_ctxs = sc_hypervisor_get_nsched_ctxs();
-	
 	double nworkers[nsched_ctxs][2];
-	
-	int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
-	if(ret != EBUSY)
-	{
-		starpu_trace_user_event(2);
-		int nw = 1;
+	int nw = 1;
 #ifdef STARPU_USE_CUDA
-		int ncuda = starpu_worker_get_count_by_type(STARPU_CUDA_WORKER);
-		nw = ncuda != 0 ? 2 : 1;
+	int ncuda = starpu_worker_get_count_by_type(STARPU_CUDA_WORKER);
+	nw = ncuda != 0 ? 2 : 1;
 #endif
-		int total_nw[nw];
-		sc_hypervisor_group_workers_by_type(NULL, -1, nw, total_nw);
-		
-		
-		struct timeval start_time;
-		struct timeval end_time;
-		gettimeofday(&start_time, NULL);
-		
-		double vmax = sc_hypervisor_lp_get_nworkers_per_ctx(nsched_ctxs, nw, nworkers, total_nw);
-		gettimeofday(&end_time, NULL);
-		
-		long diff_s = end_time.tv_sec  - start_time.tv_sec;
-		long diff_us = end_time.tv_usec  - start_time.tv_usec;
-		
-		float timing = (float)(diff_s*1000000 + diff_us)/1000;
-
-		if(vmax != 0.0)
-		{
-			int nworkers_rounded[nsched_ctxs][nw];
-			sc_hypervisor_lp_round_double_to_int(nsched_ctxs, nw, nworkers, nworkers_rounded);
-			sc_hypervisor_lp_redistribute_resources_in_ctxs(nsched_ctxs, nw, nworkers_rounded, nworkers);
-		}
-		starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
+	int total_nw[nw];
+	sc_hypervisor_group_workers_by_type(NULL, -1, nw, total_nw);
+	
+	
+	struct timeval start_time;
+	struct timeval end_time;
+	gettimeofday(&start_time, NULL);
+	
+	double vmax = sc_hypervisor_lp_get_nworkers_per_ctx(nsched_ctxs, nw, nworkers, total_nw);
+	gettimeofday(&end_time, NULL);
+	
+	long diff_s = end_time.tv_sec  - start_time.tv_sec;
+	long diff_us = end_time.tv_usec  - start_time.tv_usec;
+	
+	float timing = (float)(diff_s*1000000 + diff_us)/1000;
+	
+	if(vmax != 0.0)
+	{
+		int nworkers_rounded[nsched_ctxs][nw];
+		sc_hypervisor_lp_round_double_to_int(nsched_ctxs, nw, nworkers, nworkers_rounded);
+		sc_hypervisor_lp_redistribute_resources_in_ctxs(nsched_ctxs, nw, nworkers_rounded, nworkers);
 	}
 	
 }
 static void feft_lp_handle_poped_task(unsigned sched_ctx, int worker, struct starpu_task *task, uint32_t footprint)
 {
-	unsigned criteria = sc_hypervisor_get_resize_criteria();
-	if(criteria != SC_NOTHING && criteria == SC_VELOCITY)
+	int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
+	if(ret != EBUSY)
 	{
-
-		if(sc_hypervisor_check_velocity_gap_btw_ctxs())
+		unsigned criteria = sc_hypervisor_get_resize_criteria();
+		if(criteria != SC_NOTHING && criteria == SC_VELOCITY)
 		{
-			_try_resizing();
+			if(sc_hypervisor_check_velocity_gap_btw_ctxs())
+			{
+				_try_resizing();
+			}
 		}
+		starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 	}
+
 }
 static void feft_lp_size_ctxs(int *sched_ctxs, int ns, int *workers, int nworkers)
 {
@@ -111,15 +111,13 @@ static void feft_lp_size_ctxs(int *sched_ctxs, int ns, int *workers, int nworker
 /* 				printf("ctx %d/worker type %d: n = %d \n", i, 1, nworkers_per_type_rounded[i][1]); */
 /* #endif */
 /* 		} */
-		int *current_sched_ctxs = sched_ctxs == NULL ? sc_hypervisor_get_sched_ctxs() : 
-			sched_ctxs;
+		int *current_sched_ctxs = sched_ctxs == NULL ? sc_hypervisor_get_sched_ctxs() : sched_ctxs;
 
 		unsigned has_workers = 0;
 		int s;
 		for(s = 0; s < ns; s++)
 		{
-			int nworkers_ctx = sc_hypervisor_get_nworkers_ctx(current_sched_ctxs[s], 
-									     STARPU_ANY_WORKER);
+			int nworkers_ctx = sc_hypervisor_get_nworkers_ctx(current_sched_ctxs[s], STARPU_ANY_WORKER);
 			if(nworkers_ctx != 0)
 			{
 				has_workers = 1;
@@ -136,14 +134,20 @@ static void feft_lp_size_ctxs(int *sched_ctxs, int ns, int *workers, int nworker
 
 static feft_lp_handle_idle_cycle(unsigned sched_ctx, int worker)
 {
-	unsigned criteria = sc_hypervisor_get_resize_criteria();
-	if(criteria != SC_NOTHING && criteria == SC_IDLE)
+	int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
+	if(ret != EBUSY)
 	{
-
-		if(sc_hypervisor_check_idle(sched_ctx, worker))
+		unsigned criteria = sc_hypervisor_get_resize_criteria();
+		if(criteria != SC_NOTHING && criteria == SC_IDLE)
 		{
-			_try_resizing();
+			
+			if(sc_hypervisor_check_idle(sched_ctx, worker))
+			{
+				_try_resizing();
+//				sc_hypervisor_move_workers(sched_ctx, 3 - sched_ctx, &worker, 1, 1);
+			}
 		}
+		starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 	}
 }
 

+ 1 - 0
sc_hypervisor/src/hypervisor_policies/teft_lp_policy.c

@@ -275,6 +275,7 @@ static int teft_lp_handle_idle_cycle(unsigned sched_ctx, int worker)
 			if(sc_hypervisor_check_idle(sched_ctx, worker))
 			{
 				_try_resizing();
+//				sc_hypervisor_move_workers(sched_ctx, 3 - sched_ctx, &worker, 1, 1);
 			}
 		}
 		starpu_pthread_mutex_unlock(&act_hypervisor_mutex);

+ 18 - 2
sc_hypervisor/src/policies_utils/lp_tools.c

@@ -17,6 +17,7 @@
 #include <math.h>
 #include "sc_hypervisor_lp.h"
 #include "sc_hypervisor_policy.h"
+#include "sc_hypervisor_intern.h"
 #include <starpu_config.h>
 
 #ifdef STARPU_HAVE_GLPK_H
@@ -48,11 +49,26 @@ double sc_hypervisor_lp_get_nworkers_per_ctx(int nsched_ctxs, int ntypes_of_work
 #else
 		v[i][0] = sc_hypervisor_get_velocity(sc_w, STARPU_CPU_WORKER);
 #endif // STARPU_USE_CUDA
-		flops[i] = sc_w->remaining_flops/1000000000; //sc_w->total_flops/1000000000; /* in gflops*/
+		
+		flops[i] = sc_w->remaining_flops < 0.0 ? 0.0 : sc_w->remaining_flops/1000000000; //sc_w->total_flops/1000000000; /* in gflops*/
 //		printf("%d: flops %lf\n", sched_ctxs[i], flops[i]);
 	}
 
-	return 1/sc_hypervisor_lp_simulate_distrib_flops(nsched_ctxs, ntypes_of_workers, v, flops, res, total_nw);
+	double vmax = 1/sc_hypervisor_lp_simulate_distrib_flops(nsched_ctxs, ntypes_of_workers, v, flops, res, total_nw);
+	double optimal_v = 0.0;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+#ifdef STARPU_USE_CUDA
+		optimal_v = res[i][0] * v[i][0] + res[i][1]* v[i][1];
+#else
+		optimal_v = res[i][0] * v[i][0];
+#endif //STARPU_USE_CUDA
+//				printf("%d: set opt %lf\n", i, optimal_v[i]);
+		if(optimal_v != 0.0)
+			_set_optimal_v(i, optimal_v);
+	}
+
+	return vmax;
 #else//STARPU_HAVE_GLPK_H
 	return 0.0;
 #endif//STARPU_HAVE_GLPK_H

+ 113 - 171
sc_hypervisor/src/policies_utils/policy_tools.c

@@ -16,6 +16,7 @@
 
 #include "sc_hypervisor_policy.h"
 #include "sc_hypervisor_intern.h"
+#include "sc_hypervisor_lp.h"
 #include <math.h>
 
 static int _compute_priority(unsigned sched_ctx)
@@ -348,33 +349,6 @@ static double _get_ispeed_sample_for_sched_ctx(unsigned sched_ctx)
 	return ispeed_sample;
 }
 
-double sc_hypervisor_get_ctx_velocity(struct sc_hypervisor_wrapper* sc_w)
-{
-	struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(sc_w->sched_ctx);
-        double elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
-//	double sample = _get_ispeed_sample_for_sched_ctx(sc_w->sched_ctx);
-	double sample = config->ispeed_ctx_sample;
-	
-/* 	double total_elapsed_flops = sc_hypervisor_get_total_elapsed_flops_per_sched_ctx(sc_w); */
-/* 	double prc = config->ispeed_ctx_sample != 0.0 ? elapsed_flops : elapsed_flops/sc_w->total_flops; */
-/* 	double redim_sample = config->ispeed_ctx_sample != 0.0 ? config->ispeed_ctx_sample :  */
-/* 		(elapsed_flops == total_elapsed_flops ? HYPERVISOR_START_REDIM_SAMPLE : HYPERVISOR_REDIM_SAMPLE); */
-//	printf("%d: prc %lf sample %lf\n", sc_w->sched_ctx, prc, redim_sample);
-
-/* 	double curr_time2 = starpu_timing_now(); */
-/* 	double elapsed_time2 = (curr_time2 - sc_w->start_time) / 1000000.0; /\* in seconds *\/ */
-/* 	if(elapsed_time2 > 5.0 && elapsed_flops < sample) */
-/* 		return (elapsed_flops/1000000000.0)/elapsed_time2;/\* in Gflops/s *\/ */
-
-	if(elapsed_flops >= sample)
-        {
-                double curr_time = starpu_timing_now();
-                double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
-                return (elapsed_flops/1000000000.0)/elapsed_time;/* in Gflops/s */
-        }
-	return -1.0;
-}
-
 double sc_hypervisor_get_slowest_ctx_exec_time(void)
 {
 	int *sched_ctxs = sc_hypervisor_get_sched_ctxs();
@@ -424,150 +398,6 @@ double sc_hypervisor_get_fastest_ctx_exec_time(void)
 	return fastest_time;
 }
 
-
-double sc_hypervisor_get_velocity_per_worker(struct sc_hypervisor_wrapper *sc_w, unsigned worker)
-{
-	if(!starpu_sched_ctx_contains_worker(worker, sc_w->sched_ctx))
-		return -1.0;
-
-        double elapsed_flops = sc_w->elapsed_flops[worker] / 1000000000.0; /*in gflops */
-	size_t elapsed_data_used = sc_w->elapsed_data[worker];
-	int elapsed_tasks = sc_w->elapsed_tasks[worker];
-	struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(sc_w->sched_ctx);
-	double sample = config->ispeed_w_sample[worker] / 1000000000.0; /*in gflops */
-
-	double ctx_elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
-	double ctx_sample = config->ispeed_ctx_sample;
-	if(ctx_elapsed_flops > ctx_sample && elapsed_flops == 0.0)
-		return 0.00000000000001;
-
-/*         if( elapsed_flops >= sample) */
-/*         { */
-/*                 double curr_time = starpu_timing_now(); */
-/*                 double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /\* in seconds *\/ */
-/* 		sc_w->ref_velocity[worker] = (elapsed_flops/elapsed_time); /\* in Gflops/s *\/ */
-/*                 return sc_w->ref_velocity[worker]; */
-/*         } */
-
-/*         return -1.0; */
-
-        if( elapsed_flops != 0.0)
-        {
-                double curr_time = starpu_timing_now();
-		size_t elapsed_data_used = sc_w->elapsed_data[worker];
-                double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
- 		enum starpu_worker_archtype arch = starpu_worker_get_type(worker);
-		if(arch == STARPU_CUDA_WORKER)
-		{
-/* 			unsigned worker_in_ctx = starpu_sched_ctx_contains_worker(worker, sc_w->sched_ctx); */
-/* 			if(!worker_in_ctx) */
-/* 			{ */
-
-/* 				double transfer_velocity = starpu_get_bandwidth_RAM_CUDA(worker); */
-/* 				elapsed_time +=  (elapsed_data_used / transfer_velocity) / 1000000 ; */
-/* 			} */
-			double latency = starpu_get_latency_RAM_CUDA(worker);
-//			printf("%d/%d: latency %lf elapsed_time before %lf ntasks %d\n", worker, sc_w->sched_ctx, latency, elapsed_time, elapsed_tasks);
-			elapsed_time += (elapsed_tasks * latency)/1000000;
-//			printf("elapsed time after %lf \n", elapsed_time);
-		}
-			
-                double vel  = (elapsed_flops/elapsed_time);/* in Gflops/s */
-		sc_w->ref_velocity[worker] = sc_w->ref_velocity[worker] > 1.0 ? (sc_w->ref_velocity[worker] + vel) / 2 : vel; 
-                return vel;
-        }
-
-        return 0.00000000000001;
-
-
-}
-
-static double _get_best_elapsed_flops(struct sc_hypervisor_wrapper* sc_w, int *npus, enum starpu_worker_archtype req_arch)
-{
-	double ret_val = 0.0;
-	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
-        int worker;
-
-	struct starpu_sched_ctx_iterator it;
-	if(workers->init_iterator)
-                workers->init_iterator(workers, &it);
-
-        while(workers->has_next(workers, &it))
-	{
-                worker = workers->get_next(workers, &it);
-                enum starpu_worker_archtype arch = starpu_worker_get_type(worker);
-                if(arch == req_arch)
-                {
-			if(sc_w->elapsed_flops[worker] > ret_val)
-				ret_val = sc_w->elapsed_flops[worker];
-			(*npus)++;
-                }
-        }
-
-	return ret_val;
-}
-
-/* compute an average value of the cpu/cuda velocity */
-double sc_hypervisor_get_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
-{
-        int npus = 0;
-        double elapsed_flops = _get_best_elapsed_flops(sc_w, &npus, arch) / 1000000000.0 ; /* in gflops */
-	if(npus == 0)
-		return -1.0; 
-
-        if( elapsed_flops != 0.0)
-        {
-                double curr_time = starpu_timing_now();
-                double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
-		double velocity = (elapsed_flops/elapsed_time); /* in Gflops/s */
-                return velocity;
-        }
-
-        return -1.0;
-}
-
-
-/* check if there is a big velocity gap between the contexts */
-unsigned sc_hypervisor_check_velocity_gap_btw_ctxs(void)
-{
-	int *sched_ctxs = sc_hypervisor_get_sched_ctxs();
-	int nsched_ctxs = sc_hypervisor_get_nsched_ctxs();
-	int i = 0, j = 0;
-	struct sc_hypervisor_wrapper* sc_w;
-	struct sc_hypervisor_wrapper* other_sc_w;
-
-	for(i = 0; i < nsched_ctxs; i++)
-	{
-		sc_w = sc_hypervisor_get_wrapper(sched_ctxs[i]);
-		double ctx_v = sc_hypervisor_get_ctx_velocity(sc_w);
-		if(ctx_v != -1.0)
-		{
-			for(j = 0; j < nsched_ctxs; j++)
-			{
-				if(sched_ctxs[i] != sched_ctxs[j])
-				{
-					unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctxs[j]);
-					if(nworkers == 0) 
-						return 1;
-
-					other_sc_w = sc_hypervisor_get_wrapper(sched_ctxs[j]);
-					double other_ctx_v = sc_hypervisor_get_ctx_velocity(other_sc_w);
-					if(other_ctx_v != -1.0)
-					{
-						double gap = ctx_v < other_ctx_v ? other_ctx_v / ctx_v : ctx_v / other_ctx_v ;
-//						if(gap > 1.5)
-						if(gap > _get_max_velocity_gap())
-							return 1;
-					}
-				}
-			}
-		}
-
-	}
-	return 0;
-}
-
-
 void sc_hypervisor_group_workers_by_type(int *workers, int nworkers, int ntypes_of_workers, int total_nw[ntypes_of_workers])
 {
 	int current_nworkers = workers == NULL ? starpu_worker_get_count() : nworkers;
@@ -646,6 +476,118 @@ unsigned sc_hypervisor_check_idle(unsigned sched_ctx, int worker)
 	return 0;
 }
 
+/* check if there is a big velocity gap between the contexts */
+unsigned sc_hypervisor_check_velocity_gap_btw_ctxs(void)
+{
+	int *sched_ctxs = sc_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sc_hypervisor_get_nsched_ctxs();
+	int i = 0, j = 0;
+	struct sc_hypervisor_wrapper* sc_w;
+	struct sc_hypervisor_wrapper* other_sc_w;
+
+	
+	double optimal_v[nsched_ctxs];
+	unsigned has_opt_v = 1;
+	for(i = 0; i < nsched_ctxs; i++)
+	{
+		optimal_v[i] = _get_optimal_v(i);
+		if(optimal_v[i] == 0.0)
+		{
+			has_opt_v = 0;
+			break;
+		}
+	}
+
+	if(!has_opt_v)
+	{
+		int nw = 1;
+#ifdef STARPU_USE_CUDA
+		int ncuda = starpu_worker_get_count_by_type(STARPU_CUDA_WORKER);
+		nw = ncuda != 0 ? 2 : 1;
+#endif	
+		double nworkers_per_type[nsched_ctxs][nw];
+		int total_nw[nw];
+		for(i = 0; i < nw; i++)
+		{
+			for(j = 0; j < nsched_ctxs; j++)
+				nworkers_per_type[j][i] = 0.0;
+			total_nw[i] = 0;
+		}
+		sc_hypervisor_group_workers_by_type(NULL, -1, nw, total_nw);
+		
+		double vmax = sc_hypervisor_lp_get_nworkers_per_ctx(nsched_ctxs, nw, nworkers_per_type, total_nw);
+		
+		if(vmax != 0.0)
+		{
+			for(i = 0; i < nsched_ctxs; i++)
+			{
+				sc_w = sc_hypervisor_get_wrapper(sched_ctxs[i]);
+				double v[nw];
+				v[0] = sc_hypervisor_get_velocity(sc_w, STARPU_CUDA_WORKER);
+				v[1] = sc_hypervisor_get_velocity(sc_w, STARPU_CPU_WORKER);
+				
+				optimal_v[i] = nworkers_per_type[i][0] * v[0] + nworkers_per_type[i][1]* v[1];
+				_set_optimal_v(i, optimal_v[i]);
+			}
+			has_opt_v = 1;
+		}
+	}
+
+	if(has_opt_v)
+	{
+		for(i = 0; i < nsched_ctxs; i++)
+		{
+			sc_w = sc_hypervisor_get_wrapper(sched_ctxs[i]);
+			
+			double ctx_v = sc_hypervisor_get_ctx_velocity(sc_w);
+			if(ctx_v == -1.0)
+				return 0;
+		}
+
+		for(i = 0; i < nsched_ctxs; i++)
+		{
+			sc_w = sc_hypervisor_get_wrapper(sched_ctxs[i]);
+			
+			double ctx_v = sc_hypervisor_get_ctx_velocity(sc_w);
+			if(ctx_v != -1.0 && ((ctx_v < 0.8*optimal_v[i]) || ctx_v > 1.2*optimal_v[i])) 
+				return 1;
+		}
+	}
+	else
+	{
+		for(i = 0; i < nsched_ctxs; i++)
+		{
+			sc_w = sc_hypervisor_get_wrapper(sched_ctxs[i]);
+			double ctx_v = sc_hypervisor_get_ctx_velocity(sc_w);
+			if(ctx_v != -1.0)
+			{
+				for(j = 0; j < nsched_ctxs; j++)
+				{
+					if(sched_ctxs[i] != sched_ctxs[j])
+					{
+						unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctxs[j]);
+						if(nworkers == 0)
+							return 1;
+						
+						other_sc_w = sc_hypervisor_get_wrapper(sched_ctxs[j]);
+						double other_ctx_v = sc_hypervisor_get_ctx_velocity(other_sc_w);
+						if(other_ctx_v != -1.0)
+						{
+							double gap = ctx_v < other_ctx_v ? other_ctx_v / ctx_v : ctx_v / other_ctx_v;
+							double max_vel = _get_max_velocity_gap();
+							if(gap > max_vel-1 && gap < max_vel+1)
+								return 1;
+						}
+					}
+				}
+			}
+			
+		}
+	}
+	return 0;
+}
+
+
 unsigned sc_hypervisor_criteria_fulfilled(unsigned sched_ctx, int worker)
 {
 	unsigned criteria = sc_hypervisor_get_resize_criteria();

+ 168 - 0
sc_hypervisor/src/policies_utils/speed.c

@@ -0,0 +1,168 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2013  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include "sc_hypervisor_policy.h"
+#include "sc_hypervisor_intern.h"
+#include <math.h>
+
+
+double sc_hypervisor_get_ctx_velocity(struct sc_hypervisor_wrapper* sc_w)
+{
+	struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(sc_w->sched_ctx);
+        double elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
+	double sample = config->ispeed_ctx_sample;
+	
+
+	double total_elapsed_flops = sc_hypervisor_get_total_elapsed_flops_per_sched_ctx(sc_w);
+	double total_flops = sc_w->total_flops;
+
+	char *start_sample_prc_char = getenv("SC_HYPERVISOR_START_RESIZE");
+	double start_sample_prc = start_sample_prc_char ? atof(start_sample_prc_char) : 0.0;
+	double start_sample = start_sample_prc > 0.0 ? (start_sample_prc / 100) * total_flops : sample;
+	double redim_sample = elapsed_flops == total_elapsed_flops ? (start_sample > 0.0 ? start_sample : sample) : sample;
+
+	if(elapsed_flops >= redim_sample)
+        {
+                double curr_time = starpu_timing_now();
+                double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
+                return (elapsed_flops/1000000000.0)/elapsed_time;/* in Gflops/s */
+        }
+	return -1.0;
+}
+
+double sc_hypervisor_get_velocity_per_worker(struct sc_hypervisor_wrapper *sc_w, unsigned worker)
+{
+	if(!starpu_sched_ctx_contains_worker(worker, sc_w->sched_ctx))
+		return -1.0;
+
+        double elapsed_flops = sc_w->elapsed_flops[worker] / 1000000000.0; /*in gflops */
+
+	struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(sc_w->sched_ctx);
+	double sample = config->ispeed_w_sample[worker] / 1000000000.0; /*in gflops */
+
+	double ctx_elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
+	double ctx_sample = config->ispeed_ctx_sample;
+	if(ctx_elapsed_flops > ctx_sample && elapsed_flops == 0.0)
+		return 0.00000000000001;
+
+
+        if( elapsed_flops > sample)
+        {
+                double curr_time = starpu_timing_now();
+                double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
+		elapsed_time -= sc_w->idle_time[worker];
+		sc_w->idle_time[worker] = 0.0;
+
+/* 		size_t elapsed_data_used = sc_w->elapsed_data[worker]; */
+/*  		enum starpu_worker_archtype arch = starpu_worker_get_type(worker); */
+/* 		if(arch == STARPU_CUDA_WORKER) */
+/* 		{ */
+/* /\* 			unsigned worker_in_ctx = starpu_sched_ctx_contains_worker(worker, sc_w->sched_ctx); *\/ */
+/* /\* 			if(!worker_in_ctx) *\/ */
+/* /\* 			{ *\/ */
+
+/* /\* 				double transfer_velocity = starpu_get_bandwidth_RAM_CUDA(worker); *\/ */
+/* /\* 				elapsed_time +=  (elapsed_data_used / transfer_velocity) / 1000000 ; *\/ */
+/* /\* 			} *\/ */
+/* 			double latency = starpu_get_latency_RAM_CUDA(worker); */
+/* //			printf("%d/%d: latency %lf elapsed_time before %lf ntasks %d\n", worker, sc_w->sched_ctx, latency, elapsed_time, elapsed_tasks); */
+/* 			elapsed_time += (elapsed_tasks * latency)/1000000; */
+/* //			printf("elapsed time after %lf \n", elapsed_time); */
+/* 		} */
+			
+                double vel  = (elapsed_flops/elapsed_time);/* in Gflops/s */
+//		printf("%d in ctx %d: vel %lf\n", worker, sc_w->sched_ctx, vel);
+		sc_w->ref_velocity[worker] = sc_w->ref_velocity[worker] > 1.0 ? (sc_w->ref_velocity[worker] + vel) / 2 : vel; 
+                return vel;
+        }
+
+        return -1.0;
+
+
+}
+
+
+/* compute an average value of the cpu/cuda velocity */
+double sc_hypervisor_get_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
+{
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
+        int worker;
+
+	struct starpu_sched_ctx_iterator it;
+	if(workers->init_iterator)
+                workers->init_iterator(workers, &it);
+
+	double velocity = 0.0;
+	unsigned nworkers = 0;
+        while(workers->has_next(workers, &it))
+	{
+                worker = workers->get_next(workers, &it);
+                enum starpu_worker_archtype req_arch = starpu_worker_get_type(worker);
+                if(arch == req_arch)
+                {
+			double _vel = sc_hypervisor_get_velocity_per_worker(sc_w, worker);
+			if(_vel == -1.0) return -1.0;
+			velocity += _vel;
+			nworkers++;
+		}
+	}
+			
+
+        return (nworkers != 0 ? velocity / nworkers : -1.0);
+}
+
+/* compute an average value of the old (reference) cpu/cuda velocity */
+double sc_hypervisor_get_ref_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
+{
+	double ref_velocity = 0.0;
+	unsigned nw = 0;
+
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
+	int worker;
+
+	struct starpu_sched_ctx_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+                enum starpu_worker_archtype req_arch = starpu_worker_get_type(worker);
+                if(arch == req_arch)
+                {
+		
+			if(sc_w->ref_velocity[worker] > 1.0)
+			{
+				ref_velocity += sc_w->ref_velocity[worker];
+				nw++;
+			}
+		}
+	}
+	
+	return (nw != 0 ? ref_velocity / nw : -1.0);
+}
+
+double sc_hypervisor_get_velocity(struct sc_hypervisor_wrapper *sc_w, enum starpu_worker_archtype arch)
+{
+
+	double velocity = sc_hypervisor_get_velocity_per_worker_type(sc_w, arch);
+	if(velocity == -1.0)
+		velocity = sc_hypervisor_get_ref_velocity_per_worker_type(sc_w, arch);
+	if(velocity == -1.0)
+		velocity = arch == STARPU_CPU_WORKER ? 5.0 : 100.0;
+       
+	return velocity;
+}

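The helpers gathered in this new file give sc_hypervisor_get_velocity() a three-level fallback: measured per-worker speed, then the recorded reference speed, then a hard-coded default (5 Gflop/s for CPU workers, 100 Gflop/s for CUDA workers), so callers always get a usable value. The sketch below is illustrative only; it assumes the declarations from sc_hypervisor_policy.h as in speed.c above, and ncpus/ncuda are hypothetical parameters, not part of this commit. It shows how a policy might weight the per-architecture speeds by worker counts, in the same spirit as the optimal_v computation in the gap check above.

#include "sc_hypervisor_policy.h"

/* Illustrative sketch: expected speed of a context, in Gflop/s, for a
 * tentative worker distribution.  sc_hypervisor_get_velocity() never
 * returns -1.0 here because it falls back to reference or default speeds. */
static double expected_ctx_velocity(struct sc_hypervisor_wrapper *sc_w,
				    double ncpus, double ncuda)
{
	double vcpu  = sc_hypervisor_get_velocity(sc_w, STARPU_CPU_WORKER);
	double vcuda = sc_hypervisor_get_velocity(sc_w, STARPU_CUDA_WORKER);

	return ncpus * vcpu + ncuda * vcuda;
}
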
+ 27 - 85
sc_hypervisor/src/sc_hypervisor.c

@@ -113,7 +113,7 @@ static struct sc_hypervisor_policy *_select_hypervisor_policy(struct sc_hypervis
 	}
 	else
 	{
-		policy_name = getenv("HYPERVISOR_POLICY");
+		policy_name = getenv("SC_HYPERVISOR_POLICY");
 	}
 
 	if (policy_name)
@@ -134,10 +134,10 @@ struct starpu_sched_ctx_performance_counters* sc_hypervisor_init(struct sc_hyper
 {
 	hypervisor.min_tasks = 0;
 	hypervisor.nsched_ctxs = 0;
-	char* vel_gap = getenv("MAX_VELOCITY_GAP");
+	char* vel_gap = getenv("SC_HYPERVISOR_MAX_VELOCITY_GAP");
 	hypervisor.max_velocity_gap = vel_gap ? atof(vel_gap) : SC_VELOCITY_MAX_GAP_DEFAULT;
-	char* crit =  getenv("HYPERVISOR_TRIGGER_RESIZE");
-	hypervisor.resize_criteria = !crit ? SC_NOTHING : strcmp(crit,"idle") == 0 ? SC_IDLE : (strcmp(crit,"speed") == 0 ? SC_VELOCITY : SC_NOTHING);
+	char* crit =  getenv("SC_HYPERVISOR_TRIGGER_RESIZE");
+	hypervisor.resize_criteria = !crit ? SC_IDLE : strcmp(crit,"idle") == 0 ? SC_IDLE : (strcmp(crit,"speed") == 0 ? SC_VELOCITY : SC_NOTHING);
 
 	starpu_pthread_mutex_init(&act_hypervisor_mutex, NULL);
 	hypervisor.start_executing_time = starpu_timing_now();
@@ -162,11 +162,14 @@ struct starpu_sched_ctx_performance_counters* sc_hypervisor_init(struct sc_hyper
 		hypervisor.sched_ctx_w[i].resize_ack.nmoved_workers = 0;
 		hypervisor.sched_ctx_w[i].resize_ack.acked_workers = NULL;
 		starpu_pthread_mutex_init(&hypervisor.sched_ctx_w[i].mutex, NULL);
+		hypervisor.optimal_v[i] = 0.0;
 
 		int j;
 		for(j = 0; j < STARPU_NMAXWORKERS; j++)
 		{
 			hypervisor.sched_ctx_w[i].current_idle_time[j] = 0.0;
+			hypervisor.sched_ctx_w[i].idle_time[j] = 0.0;
+			hypervisor.sched_ctx_w[i].idle_start_time[j] = 0.0;
 			hypervisor.sched_ctx_w[i].pushed_tasks[j] = 0;
 			hypervisor.sched_ctx_w[i].poped_tasks[j] = 0;
 			hypervisor.sched_ctx_w[i].elapsed_flops[j] = 0.0;
@@ -216,7 +219,7 @@ void sc_hypervisor_start_resize(unsigned sched_ctx)
 
 static void _print_current_time()
 {
-	if(!getenv("HYPERVISOR_STOP_PRINT"))
+	if(!getenv("SC_HYPERVISOR_STOP_PRINT"))
 	{
 		double curr_time = starpu_timing_now();
 		double elapsed_time = (curr_time - hypervisor.start_executing_time) / 1000000.0; /* in seconds */
@@ -348,30 +351,6 @@ void sc_hypervisor_unregister_ctx(unsigned sched_ctx)
 	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 }
 
-static double _get_best_total_elapsed_flops(struct sc_hypervisor_wrapper* sc_w, int *npus, enum starpu_worker_archtype req_arch)
-{
-	double ret_val = 0.0;
-	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
-        int worker;
-
-	struct starpu_sched_ctx_iterator it;
-	if(workers->init_iterator)
-                workers->init_iterator(workers, &it);
-
-        while(workers->has_next(workers, &it))
-	{
-                worker = workers->get_next(workers, &it);
-                enum starpu_worker_archtype arch = starpu_worker_get_type(worker);
-                if(arch == req_arch)
-                {
-			if(sc_w->total_elapsed_flops[worker] > ret_val)
-				ret_val = sc_w->total_elapsed_flops[worker];
-			(*npus)++;
-                }
-        }
-
-	return ret_val;
-}
 
 double _get_max_velocity_gap()
 {
@@ -383,53 +362,6 @@ unsigned sc_hypervisor_get_resize_criteria()
 	return hypervisor.resize_criteria;
 }
 
-/* compute an average value of the cpu/cuda velocity */
-double sc_hypervisorsc_hypervisor_get_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
-{
-        int npus = 0;
-        double elapsed_flops = _get_best_total_elapsed_flops(sc_w, &npus, arch) / 1000000000.0 ; /* in gflops */
-	if(npus == 0)
-		return -1.0; 
-
-        if( elapsed_flops != 0.0)
-        {
-                double curr_time = starpu_timing_now();
-                double elapsed_time = (curr_time - sc_w->real_start_time) / 1000000.0; /* in seconds */
-		double velocity = (elapsed_flops/elapsed_time); /* in Gflops/s */
-                return velocity;
-        }
-
-        return -1.0;
-}
-
-/* compute an average value of the cpu/cuda old velocity */
-double sc_hypervisor_get_ref_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
-{
-	double ref_velocity = 0.0;
-	unsigned nw = 0;
-
-	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
-	int worker;
-
-	struct starpu_sched_ctx_iterator it;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-
-	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-		if(sc_w->ref_velocity[worker] > 1.0)
-		{
-			ref_velocity += sc_w->ref_velocity[worker];
-			nw++;
-		}
-	}
-	
-	if(nw > 0)
-		return ref_velocity / nw;
-	return -1.0;
-}
-
 static int get_ntasks( int *tasks)
 {
 	int ntasks = 0;
@@ -508,7 +440,6 @@ double sc_hypervisor_get_total_elapsed_flops_per_sched_ctx(struct sc_hypervisor_
 	return ret_val;
 }
 
-
 void _reset_resize_sample_info(unsigned sender_sched_ctx, unsigned receiver_sched_ctx)
 {
 	/* info concerning only the gflops_rate strategy */
@@ -793,6 +724,15 @@ static void notify_idle_end(unsigned sched_ctx, int worker)
 	if(hypervisor.resize[sched_ctx])
 		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] = 0.0;
 
+	struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
+
+	if(sc_w->idle_start_time[worker] != 0.0)
+	{
+		double end_time  = starpu_timing_now();
+		sc_w->idle_time[worker] += (end_time - sc_w->idle_start_time[worker]) / 1000000.0; /* in seconds */ 
+		sc_w->idle_start_time[worker] = 0.0;
+	}
+
 	if(hypervisor.policy.handle_idle_end)
 		hypervisor.policy.handle_idle_end(sched_ctx, worker);
 
@@ -805,6 +745,10 @@ static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 	{
 		struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
 		sc_w->current_idle_time[worker] += idle_time;
+
+		if(sc_w->idle_start_time[worker] == 0.0)
+			sc_w->idle_start_time[worker] = starpu_timing_now();
+
 		if(hypervisor.policy.handle_idle_cycle)
 		{
 			hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
@@ -994,14 +938,12 @@ void sc_hypervisor_free_size_req(void)
 	}
 }
 
-double sc_hypervisor_get_velocity(struct sc_hypervisor_wrapper *sc_w, enum starpu_worker_archtype arch)
+double _get_optimal_v(unsigned sched_ctx)
 {
+	return hypervisor.optimal_v[sched_ctx];
+}
 
-	double velocity = sc_hypervisorsc_hypervisor_get_velocity_per_worker_type(sc_w, arch);
-	if(velocity == -1.0)
-		velocity = sc_hypervisor_get_ref_velocity_per_worker_type(sc_w, arch);
-	if(velocity == -1.0)
-		velocity = arch == STARPU_CPU_WORKER ? 5.0 : 100.0;
-       
-	return velocity;
+void _set_optimal_v(unsigned sched_ctx, double optimal_v)
+{
+	hypervisor.optimal_v[sched_ctx] = optimal_v;
 }

+ 5 - 0
sc_hypervisor/src/sc_hypervisor_intern.h

@@ -83,6 +83,9 @@ struct sc_hypervisor
 	
 	/* criteria to trigger resizing */
 	unsigned resize_criteria;
+
+	/* reference speed against which the speed of each context is compared */
+	double optimal_v[STARPU_NMAX_SCHED_CTXS];
 };
 
 struct sc_hypervisor_adjustment
@@ -100,3 +103,5 @@ void _remove_config(unsigned sched_ctx);
 
 double _get_max_velocity_gap();
 
+double _get_optimal_v(unsigned sched_ctx);
+void _set_optimal_v(unsigned sched_ctx, double optimal_v);

+ 1 - 7
src/core/dependencies/implicit_data_deps.c

@@ -481,12 +481,8 @@ void _starpu_unlock_post_sync_tasks(starpu_data_handle_t handle)
 	struct _starpu_task_wrapper_list *post_sync_tasks = NULL;
 	unsigned do_submit_tasks = 0;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
-
-	if (handle->sequential_consistency)
+	if (handle->post_sync_tasks_cnt > 0)
 	{
-		STARPU_ASSERT(handle->post_sync_tasks_cnt > 0);
-
 		if (--handle->post_sync_tasks_cnt == 0)
 		{
 			/* unlock all tasks : we need not hold the lock while unlocking all these tasks */
@@ -496,8 +492,6 @@ void _starpu_unlock_post_sync_tasks(starpu_data_handle_t handle)
 		}
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
-
 	if (do_submit_tasks)
 	{
 		struct _starpu_task_wrapper_list *link = post_sync_tasks;

+ 1 - 1
src/core/perfmodel/perfmodel_history.c

@@ -416,7 +416,7 @@ static void dump_per_arch_model_file(FILE *f, struct starpu_perfmodel *model, un
 	/* Dump the history into the model file in case it is necessary */
 	if (model->type == STARPU_HISTORY_BASED || model->type == STARPU_NL_REGRESSION_BASED)
 	{
-		fprintf(f, "# hash\t\tsize\t\tflops\t\tmean\t\tdev\t\tsum\t\tsum2\t\tn\n");
+		fprintf(f, "# hash\t\tsize\t\tflops\t\tmean (us)\t\tdev (us)\t\tsum\t\tsum2\t\tn\n");
 		ptr = per_arch_model->list;
 		while (ptr)
 		{

+ 1 - 1
src/core/perfmodel/perfmodel_print.c

@@ -28,7 +28,7 @@ void _starpu_perfmodel_print_history_based(struct starpu_perfmodel_per_arch *per
 	ptr = per_arch_model->list;
 
 	if (!parameter && ptr)
-		fprintf(output, "# hash\t\tsize\t\tflops\t\tmean\t\tstddev\t\tn\n");
+		fprintf(output, "# hash\t\tsize\t\tflops\t\tmean (us)\t\tstddev (us)\t\tn\n");
 
 	while (ptr)
 	{

+ 1 - 1
src/core/sched_policy.c

@@ -549,7 +549,6 @@ struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker
 		struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL;
 		unsigned smallest_counter =  worker->nsched_ctxs;
 		unsigned i;
-
 		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 		{
 			sched_ctx = worker->sched_ctx[i];
@@ -573,6 +572,7 @@ struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker
 				if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 					sched_ctx->pop_counter[worker->workerid] = 0;
 			}
+			
 			continue;
 		}
 		return good_sched_ctx;

+ 3 - 1
src/core/workers.c

@@ -213,6 +213,7 @@ static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch,
 	case STARPU_ANY_WORKER:
 	{
 		int cpu_func_enabled=1, cuda_func_enabled=1, opencl_func_enabled=1;
+		/* TODO: MIC/SCC */
 
 #if defined(STARPU_USE_CPU) || defined(STARPU_SIMGRID)
 		starpu_cpu_func_t cpu_func = _starpu_task_get_cpu_nth_implementation(cl, nimpl);
@@ -301,8 +302,9 @@ int starpu_combined_worker_can_execute_task(unsigned workerid, struct starpu_tas
 
 			/* Is the worker larger than requested ? */
 			int worker_size = (int)config.combined_workers[workerid - nworkers].worker_size;
+			int worker0 = config.combined_workers[workerid - nworkers].combined_workerid[0];
 			return !!((worker_size <= task->cl->max_parallelism) &&
-				  _starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl));
+				_starpu_can_use_nth_implementation(config.workers[worker0].arch, task->cl, nimpl));
 		}
 		else
 		{

+ 6 - 1
src/datawizard/memalloc.c

@@ -920,6 +920,7 @@ unsigned starpu_data_test_if_allocated_on_node(starpu_data_handle_t handle, unsi
 	return handle->per_node[memory_node].allocated;
 }
 
+/* Record that this memchunk has been recently used */
 void _starpu_memchunk_recently_used(struct _starpu_mem_chunk *mc, unsigned node)
 {
 	_starpu_spin_lock(&lru_rwlock[node]);
@@ -929,10 +930,11 @@ void _starpu_memchunk_recently_used(struct _starpu_mem_chunk *mc, unsigned node)
 	_starpu_spin_unlock(&lru_rwlock[node]);
 }
 
+/* Push the given recently-used memchunk to the end of the list of chunks to be evicted */
 /* The mc_rwlock[node] rw-lock should be taken prior to calling this function.*/
 static void _starpu_memchunk_recently_used_move(struct _starpu_mem_chunk *mc, unsigned node)
 {
-	/* XXX Sometimes the memchunk is not in the list... */
+	/* Note: Sometimes the memchunk is not in the list... */
 	struct _starpu_mem_chunk *mc_iter;
 	for (mc_iter = _starpu_mem_chunk_list_begin(mc_list[node]);
 	     mc_iter != _starpu_mem_chunk_list_end(mc_list[node]);
@@ -948,6 +950,9 @@ static void _starpu_memchunk_recently_used_move(struct _starpu_mem_chunk *mc, un
 	}
 }
 
+/* Put the recently used memchunks at the end of the mc_list, in the same order
+ * as the LRU list, so that the most recently used memchunk eventually comes
+ * last in the mc_list */
 static void starpu_lru(unsigned node)
 {
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&mc_rwlock[node]);

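The comments added above describe the intent: recently used chunks migrate to the back of the per-node mc_list so that an eviction scan from the front reaches the least recently used chunks first. As a rough, self-contained illustration of that move-to-back step, here is a generic doubly linked list version; this is not StarPU's _starpu_mem_chunk_list API, just a sketch of the idea.

#include <stddef.h>

/* Purely illustrative move-to-back, not StarPU's actual list code. */
struct node { struct node *prev, *next; };
struct list { struct node *head, *tail; };

static void move_to_back(struct list *l, struct node *n)
{
	/* unlink n from its current position */
	if (n->prev) n->prev->next = n->next; else l->head = n->next;
	if (n->next) n->next->prev = n->prev; else l->tail = n->prev;
	/* relink it at the tail: the most recently used node ends up last */
	n->prev = l->tail;
	n->next = NULL;
	if (l->tail) l->tail->next = n; else l->head = n;
	l->tail = n;
}
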
+ 22 - 5
src/datawizard/user_interactions.c

@@ -75,7 +75,8 @@ static void _starpu_data_acquire_fetch_data_callback(void *arg)
 	 * We enqueue the "post" sync task in the list associated to the handle
 	 * so that it is submitted by the starpu_data_release
 	 * function. */
-	_starpu_add_post_sync_tasks(wrapper->post_sync_task, handle);
+	if (wrapper->post_sync_task)
+		_starpu_add_post_sync_tasks(wrapper->post_sync_task, handle);
 
 	wrapper->callback(wrapper->callback_arg);
 
@@ -114,8 +115,9 @@ static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
 }
 
 /* The data must be released by calling starpu_data_release later on */
-int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, unsigned node,
-			   enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
+int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, unsigned node,
+							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
+							  int sequential_consistency)
 {
 	STARPU_ASSERT(handle);
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data (%p) is not possible", handle);
@@ -132,10 +134,12 @@ int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, unsigned node,
 	STARPU_PTHREAD_COND_INIT(&wrapper->cond, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
 	wrapper->finished = 0;
+	wrapper->pre_sync_task = NULL;
+	wrapper->post_sync_task = NULL;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
-	int sequential_consistency = handle->sequential_consistency;
-	if (sequential_consistency)
+	int handle_sequential_consistency = handle->sequential_consistency;
+	if (handle_sequential_consistency && sequential_consistency)
 	{
 		struct starpu_task *new_task;
 		wrapper->pre_sync_task = starpu_task_create();
@@ -177,12 +181,25 @@ int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, unsigned node,
 	return 0;
 }
 
+
+int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, unsigned node,
+				   enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
+{
+	return starpu_data_acquire_on_node_cb_sequential_consistency(handle, node, mode, callback, arg, 1);
+}
+
 int starpu_data_acquire_cb(starpu_data_handle_t handle,
 			   enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
 {
 	return starpu_data_acquire_on_node_cb(handle, 0, mode, callback, arg);
 }
 
+int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle,
+						  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency)
+{
+	return starpu_data_acquire_on_node_cb_sequential_consistency(handle, 0, mode, callback, arg, sequential_consistency);
+}
+
 /*
  *	Block data request from application
  */

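As the implementation above shows, the new sequential_consistency argument is combined with the per-handle setting, so passing 0 skips the implicit-dependency pre/post sync tasks for that acquisition only. A minimal usage sketch follows; the handle registration and StarPU initialization are assumed to exist elsewhere.

#include <starpu.h>

/* Callback invoked once the data is available; it must release the handle. */
static void data_acquired(void *arg)
{
	starpu_data_handle_t handle = (starpu_data_handle_t) arg;
	/* ... the application may access the data here ... */
	starpu_data_release(handle);
}

/* Acquire 'handle' in write mode without waiting for implicit dependencies:
 * the last argument disables sequential consistency for this call only. */
static int acquire_without_implicit_deps(starpu_data_handle_t handle)
{
	return starpu_data_acquire_cb_sequential_consistency(handle, STARPU_W,
							     data_acquired, handle, 0);
}
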
+ 4 - 4
src/debug/traces/starpu_paje.c

@@ -160,7 +160,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 	poti_DefineEntityValue("Fi", "S", "FetchingInput", "1.0 .1 1.0");
 	poti_DefineEntityValue("Po", "S", "PushingOutput", "0.1 1.0 1.0");
 	poti_DefineEntityValue("C", "S", "Callback", ".0 .3 .8");
-	poti_DefineEntityValue("B", "S", "Blocked", ".9 .1 .0");
+	poti_DefineEntityValue("B", "S", "Overhead", ".5 .18 .0");
 	poti_DefineEntityValue("Sl", "S", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("P", "S", "Progressing", ".4 .1 .6");
 
@@ -187,7 +187,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 		poti_DefineEntityValue("Fi", ctx, "FetchingInput", "1.0 .1 1.0");
 		poti_DefineEntityValue("Po", ctx, "PushingOutput", "0.1 1.0 1.0");
 		poti_DefineEntityValue("C", ctx, "Callback", ".0 .3 .8");
-		poti_DefineEntityValue("B", ctx, "Blocked", ".9 .1 .0");
+		poti_DefineEntityValue("B", ctx, "Overhead", ".5 .18 .0");
 		poti_DefineEntityValue("Sl", ctx, "Sleeping", ".9 .1 .0");
 		poti_DefineEntityValue("P", ctx, "Progressing", ".4 .1 .6");
 	}
@@ -226,7 +226,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Fi       S      FetchingInput       \"1.0 .1 1.0\"            \n\
 6       Po       S      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       S       Callback       \".0 .3 .8\"            \n\
-6       B       S       Blocked         \".9 .1 .0\"		\n\
+6       B       S       Overhead         \".5 .18 .0\"		\n\
 6       Sl       S      Sleeping         \".9 .1 .0\"		\n\
 6       P       S       Progressing         \".4 .1 .6\"		\n");
 	fprintf(file, "\
@@ -245,7 +245,7 @@ void _starpu_fxt_write_paje_header(FILE *file)
 6       Fi       Ctx%u      FetchingInput       \"1.0 .1 1.0\"            \n\
 6       Po       Ctx%u      PushingOutput       \"0.1 1.0 1.0\"            \n\
 6       C       Ctx%u       Callback       \".0 .3 .8\"            \n\
-6       B       Ctx%u       Blocked         \".9 .1 .0\"		\n\
+6       B       Ctx%u       Overhead         \".5 .18 .0\"		\n\
 6       Sl       Ctx%u      Sleeping         \".9 .1 .0\"		\n\
 6       P       Ctx%u       Progressing         \".4 .1 .6\"		\n",
 		i, i, i, i, i, i, i, i);

+ 2 - 8
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -546,13 +546,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
-		starpu_pthread_mutex_t *sched_mutex;
-		starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-
-		STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-		fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
-		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+		double exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
 
 		for(nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 	 	{
@@ -638,7 +632,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (unknown)
 				continue;
 
-			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];
+			exp_end[worker_ctx][nimpl] = exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];
 
 			if (exp_end[worker_ctx][nimpl] < best_exp_end)
 			{

+ 6 - 5
tests/Makefile.am

@@ -141,6 +141,7 @@ noinst_PROGRAMS =				\
 	main/starpu_init			\
 	main/starpu_worker_exists		\
 	main/submit				\
+	main/codelet_null_callback		\
 	datawizard/allocate			\
 	datawizard/acquire_cb			\
 	datawizard/acquire_cb_insert		\
@@ -232,10 +233,6 @@ noinst_PROGRAMS +=				\
 	datawizard/reclaim
 endif
 
-noinst_nulldir=/tmp
-noinst_null_PROGRAMS =				\
-	$(LOADER)
-
 examplebin_PROGRAMS = \
 	microbenchs/tasks_size_overhead		\
 	microbenchs/local_pingpong
@@ -243,7 +240,11 @@ examplebin_SCRIPTS = \
 	microbenchs/tasks_size_overhead.gp \
 	microbenchs/tasks_size_overhead.sh
 
-check_PROGRAMS = $(noinst_PROGRAMS) $(noinst_2_PROGRAMS)
+if STARPU_HAVE_WINDOWS
+check_PROGRAMS	=	$(noinst_PROGRAMS)
+else
+check_PROGRAMS	=	$(LOADER) $(noinst_PROGRAMS)
+endif
 
 #######################
 # Source files        #

+ 46 - 0
tests/main/codelet_null_callback.c

@@ -0,0 +1,46 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include "../helper.h"
+
+void callback(void *ptr)
+{
+     int *x = (int *)ptr;
+     FPRINTF(stderr, "x=%d\n", *x);
+     STARPU_ASSERT(*x == 42);
+}
+
+int main(int argc, char **argv)
+{
+	int ret;
+	int x=42;
+
+	ret = starpu_initialize(NULL, &argc, &argv);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	ret = starpu_insert_task(NULL,
+				 STARPU_CALLBACK_WITH_ARG, callback, &x,
+				 0);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_insert_task");
+
+	starpu_task_wait_for_all();
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+}
+