Marc Sergent лет назад: 13
Родитель
Commit
09ffa51154

+ 4 - 0
examples/cg/cg_kernels.c

@@ -98,6 +98,7 @@ struct starpu_codelet accumulate_variable_cl =
 #ifdef STARPU_USE_CUDA
 	.cuda_funcs = {accumulate_variable_cuda, NULL},
 #endif
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2,
 	.model = &accumulate_variable_model
 };
@@ -136,6 +137,7 @@ struct starpu_codelet accumulate_vector_cl =
 #ifdef STARPU_USE_CUDA
 	.cuda_funcs = {accumulate_vector_cuda, NULL},
 #endif
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2,
 	.model = &accumulate_vector_model
 };
@@ -176,6 +178,7 @@ struct starpu_codelet bzero_variable_cl =
 #ifdef STARPU_USE_CUDA
 	.cuda_funcs = {bzero_variable_cuda, NULL},
 #endif
+	.modes = {STARPU_W},
 	.nbuffers = 1,
 	.model = &bzero_variable_model
 };
@@ -213,6 +216,7 @@ struct starpu_codelet bzero_vector_cl =
 #ifdef STARPU_USE_CUDA
 	.cuda_funcs = {bzero_vector_cuda, NULL},
 #endif
+	.modes = {STARPU_W},
 	.nbuffers = 1,
 	.model = &bzero_vector_model
 };

+ 3 - 1
examples/pi/pi_redux.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2012  Université de Bordeaux 1
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -234,6 +234,7 @@ static struct starpu_codelet init_codelet =
 #ifdef STARPU_HAVE_CURAND
         .cuda_funcs = {init_cuda_func, NULL},
 #endif
+	.modes = {STARPU_W},
         .nbuffers = 1
 };
 
@@ -271,6 +272,7 @@ static struct starpu_codelet redux_codelet =
 #ifdef STARPU_HAVE_CURAND
 	.cuda_funcs = {redux_cuda_func, NULL},
 #endif
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2
 };
 

+ 3 - 1
examples/reductions/dot_product.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2012  Université de Bordeaux 1
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
  * Copyright (C) 2012 inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -114,6 +114,7 @@ static struct starpu_codelet init_codelet =
 #ifdef STARPU_USE_OPENCL
 	.opencl_funcs = {init_opencl_func, NULL},
 #endif
+	.modes = {STARPU_W},
 	.nbuffers = 1
 };
 
@@ -194,6 +195,7 @@ static struct starpu_codelet redux_codelet =
 #ifdef STARPU_USE_OPENCL
 	.opencl_funcs = {redux_opencl_func, NULL},
 #endif
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2
 };
 

+ 3 - 1
examples/reductions/minmax_reduction.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2013  Université de Bordeaux 1
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -58,6 +58,7 @@ static void minmax_neutral_cpu_func(void *descr[], void *cl_arg)
 static struct starpu_codelet minmax_init_codelet = /* codelet whose only listed implementation is the CPU function minmax_neutral_cpu_func */
 {
 	.cpu_funcs = {minmax_neutral_cpu_func, NULL}, /* NULL-terminated list of CPU implementations */
+	.modes = {STARPU_W}, /* access mode declared for the single buffer; presumably write-only -- confirm against the StarPU docs */
 	.nbuffers = 1 /* one buffer, matching the single entry in .modes */
 };
 
@@ -84,6 +85,7 @@ void minmax_redux_cpu_func(void *descr[], void *cl_arg)
 static struct starpu_codelet minmax_redux_codelet = /* codelet whose only listed implementation is the CPU function minmax_redux_cpu_func */
 {
 	.cpu_funcs = {minmax_redux_cpu_func, NULL}, /* NULL-terminated list of CPU implementations */
+	.modes = {STARPU_RW, STARPU_R}, /* two access modes for the two buffers; presumably buffer 0 is read-write and buffer 1 read-only -- confirm */
 	.nbuffers = 2 /* two buffers, matching the two entries in .modes */
 };
 

+ 2 - 1
mpi/src/Makefile.am

@@ -45,7 +45,8 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_datatype.c				\
 	starpu_mpi_insert_task.c			\
 	starpu_mpi_collective.c				\
-	starpu_mpi_stats.c
+	starpu_mpi_stats.c				\
+	starpu_mpi_private.c
 
 
 showcheck:

+ 59 - 47
mpi/src/starpu_mpi.c

@@ -18,7 +18,6 @@
 #include <stdlib.h>
 #include <starpu_mpi.h>
 #include <starpu_mpi_datatype.h>
-//#define STARPU_MPI_VERBOSE	1
 #include <starpu_mpi_private.h>
 #include <starpu_profiling.h>
 #include <starpu_mpi_stats.h>
@@ -27,7 +26,7 @@
 
 static void _starpu_mpi_submit_new_mpi_request(void *arg);
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
-#ifdef STARPU_MPI_VERBOSE
+#ifdef STARPU_VERBOSE
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
@@ -73,7 +72,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 
 	_STARPU_MPI_LOG_IN();
 	struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT(req);
+	STARPU_ASSERT_MSG(req, "Invalid request");
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
 
@@ -116,16 +115,16 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
-	STARPU_ASSERT(req->ptr);
+	STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
 
-	_STARPU_MPI_DEBUG("post MPI isend request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
 	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
 	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
-	STARPU_ASSERT(req->ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
 
 	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
 
@@ -150,7 +149,8 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	}
 	else
 	{
-		ssize_t psize;
+		ssize_t psize = -1;
+		int ret;
 
 		// Do not pack the data, just try to find out the size
 		starpu_handle_pack_data(req->data_handle, NULL, &psize);
@@ -158,8 +158,10 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		if (psize != -1)
 		{
 			// We already know the size of the data, let's send it to overlap with the packing of the data
-			MPI_Isend(&psize, sizeof(psize), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
 			req->count = psize;
+			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
 		}
 
 		// Pack the data
@@ -167,12 +169,14 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		if (psize == -1)
 		{
 			// We know the size now, let's send it
-			MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->count, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
+			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
 		}
 		else
 		{
 			// We check the size returned with the 2 calls to pack is the same
-			STARPU_ASSERT(req->count == psize);
+			STARPU_ASSERT_MSG(req->count == psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, psize);
 		}
 
 		// We can send the data now
@@ -190,12 +194,12 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
-	STARPU_ASSERT(public_req);
+	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
 	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL);
 
-	STARPU_ASSERT(req);
+	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
 
 	_STARPU_MPI_LOG_OUT();
@@ -237,14 +241,14 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
-	STARPU_ASSERT(req->ptr);
+	STARPU_ASSERT_MSG(req->ptr, "Invalid pointer to receive data");
 
-	_STARPU_MPI_DEBUG("post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
 
 	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
-	STARPU_ASSERT(req->ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 
 	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
 
@@ -271,7 +275,7 @@ static void _starpu_mpi_irecv_size_callback(void *arg)
 
 	starpu_data_unregister(callback->handle);
 	callback->req->ptr = malloc(callback->req->count);
-	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld\n", callback->req->count);
+	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld", callback->req->count);
 	_starpu_mpi_irecv_data_func(callback->req);
 	free(callback);
 }
@@ -292,6 +296,7 @@ static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 		struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
 		callback->req = req;
 		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
+		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
 		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_irecv_size_callback, callback);
 	}
 
@@ -305,12 +310,12 @@ static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t dat
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
-	STARPU_ASSERT(public_req);
+	STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
 	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL);
 
-	STARPU_ASSERT(req);
+	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	*public_req = req;
 
 	_STARPU_MPI_LOG_OUT();
@@ -349,7 +354,7 @@ static void _starpu_mpi_probe_func(struct _starpu_mpi_req *req)
 	req->count = 1;
 	req->ptr = starpu_handle_get_local_ptr(req->data_handle);
 
-	_STARPU_MPI_DEBUG("MPI probe request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "MPI probe request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -385,7 +390,7 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
 
 	req->ret = MPI_Wait(&req->request, waiting_req->status);
-	STARPU_ASSERT(req->ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %d", req->ret);
 
 	TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
 
@@ -398,7 +403,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 	_STARPU_MPI_LOG_IN();
 	int ret;
 	struct _starpu_mpi_req *waiting_req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT(waiting_req);
+	STARPU_ASSERT_MSG(waiting_req, "Allocation failed");
 	struct _starpu_mpi_req *req = *public_req;
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
@@ -449,12 +454,13 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	/* Which is the mpi request we are testing for ? */
 	struct _starpu_mpi_req *req = testing_req->other_request;
 
-	_STARPU_MPI_DEBUG("Test request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
 
 	req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
-	STARPU_ASSERT(req->ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
 
 	TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
 
@@ -476,11 +482,11 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 	_STARPU_MPI_LOG_IN();
 	int ret = 0;
 
-	STARPU_ASSERT(public_req);
+	STARPU_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req = *public_req;
 
-	STARPU_ASSERT(!req->detached);
+	STARPU_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
 	unsigned submitted = req->submitted;
@@ -489,7 +495,7 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 	if (submitted)
 	{
 		struct _starpu_mpi_req *testing_req = calloc(1, sizeof(struct _starpu_mpi_req));
-		STARPU_ASSERT(testing_req);
+		STARPU_ASSERT_MSG(testing_req, "allocation failed");
 		//		memset(testing_req, 0, sizeof(struct _starpu_mpi_req));
 
 		/* Initialize the request structure */
@@ -544,7 +550,7 @@ static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
 	_STARPU_MPI_LOG_IN();
 
 	barrier_req->ret = MPI_Barrier(barrier_req->comm);
-	STARPU_ASSERT(barrier_req->ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %d", barrier_req->ret);
 
 	_starpu_mpi_handle_request_termination(barrier_req);
 	_STARPU_MPI_LOG_OUT();
@@ -555,7 +561,7 @@ int starpu_mpi_barrier(MPI_Comm comm)
 	_STARPU_MPI_LOG_IN();
 	int ret;
 	struct _starpu_mpi_req *barrier_req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT(barrier_req);
+	STARPU_ASSERT_MSG(barrier_req, "allocation failed");
 
 	/* First wait for *both* all tasks and MPI requests to finish, in case
 	 * some tasks generate MPI requests, MPI requests generate tasks, etc.
@@ -610,7 +616,7 @@ int starpu_mpi_barrier(MPI_Comm comm)
 /*                                                      */
 /********************************************************/
 
-#ifdef STARPU_MPI_VERBOSE
+#ifdef STARPU_VERBOSE
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
 {
 	switch (request_type)
@@ -628,9 +634,12 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 {
+	int ret;
+
 	_STARPU_MPI_LOG_IN();
 
-	_STARPU_MPI_DEBUG("complete MPI request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 	if (req->request_type == PROBE_REQ)
 	{
 #ifdef STARPU_DEVEL
@@ -639,7 +648,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		MPI_Status status;
 		memset(&status, 0, sizeof(MPI_Status));
 		req->ret = MPI_Recv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &status);
-		STARPU_ASSERT(req->ret == MPI_SUCCESS);
+		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Recv returning %d", req->ret);
 	}
 
 	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ || req->request_type == PROBE_REQ)
@@ -651,8 +660,9 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 				// We already know the request to send the size is completed, we just call MPI_Test to make sure that the request object is deallocated
 				MPI_Status status;
 				int flag;
-				MPI_Test(&req->size_req, &flag, &status);
-				STARPU_ASSERT(flag);
+				ret = MPI_Test(&req->size_req, &flag, &status);
+				STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Test returning %d", ret);
+				STARPU_ASSERT_MSG(flag, "MPI_Test returning flag %d", flag);
 			}
 			if (req->request_type == RECV_REQ)
 				// req->ptr is freed by starpu_handle_unpack_data
@@ -690,7 +700,8 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	_starpu_mpi_req_list_push_front(new_requests, req);
 	newer_requests = 1;
-	_STARPU_MPI_DEBUG("Pushing new request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 	_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	_STARPU_MPI_LOG_OUT();
@@ -730,7 +741,7 @@ static void _starpu_mpi_test_detached_requests(void)
 
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
-		//_STARPU_MPI_DEBUG("Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
 		if (req->request_type == PROBE_REQ)
 		{
 			req->ret = MPI_Iprobe(req->srcdst, req->mpi_tag, req->comm, &flag, &status);
@@ -740,7 +751,7 @@ static void _starpu_mpi_test_detached_requests(void)
 			req->ret = MPI_Test(&req->request, &flag, &status);
 		}
 
-		STARPU_ASSERT(req->ret == MPI_SUCCESS);
+		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Iprobe or MPI_Test returning %d", req->ret);
 
 		if (flag)
 		{
@@ -800,10 +811,11 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
-	STARPU_ASSERT(req);
+	STARPU_ASSERT_MSG(req, "Invalid request");
 
 	/* submit the request to MPI */
-	_STARPU_MPI_DEBUG("Handling new request %p type %s tag %d src %d data %p ptr %p datatype %p count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 	req->func(req);
 
 	_STARPU_MPI_LOG_OUT();
@@ -888,7 +900,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 		if (block)
 		{
-			_STARPU_MPI_DEBUG("NO MORE REQUESTS TO HANDLE\n");
+			_STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
 
 			TRACE_MPI_SLEEP_BEGIN();
 
@@ -921,13 +933,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		}
 	}
 
-	STARPU_ASSERT(_starpu_mpi_req_list_empty(detached_requests));
-	STARPU_ASSERT(_starpu_mpi_req_list_empty(new_requests));
-	STARPU_ASSERT(posted_requests == 0);
+	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
+	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
+	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
 
 	if (argc_argv->initialize_mpi)
 	{
-		_STARPU_MPI_DEBUG("Calling MPI_Finalize()\n");
+		_STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
 		MPI_Finalize();
 	}
 
@@ -958,7 +970,7 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 	MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
 
 	ret = MPI_Barrier(MPI_COMM_WORLD);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
 
 	/* We generate a "unique" key so that we can make sure that different
 	 * FxT traces come from the same MPI run. */
@@ -973,11 +985,11 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 	}
 
 	ret = MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %d", ret);
 
 	TRACE_MPI_BARRIER(rank, worldsize, random_number);
 
-	_STARPU_MPI_DEBUG("unique key %x\n", random_number);
+	_STARPU_MPI_DEBUG(3, "unique key %x\n", random_number);
 #endif
 }
 
@@ -1008,7 +1020,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
 
 #ifdef STARPU_MPI_ACTIVITY
 	hookid = starpu_progression_hook_register(progression_hook_func, NULL);
-	STARPU_ASSERT(hookid >= 0);
+	STARPU_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
 #endif /* STARPU_MPI_ACTIVITY */
 
 	_starpu_mpi_add_sync_point_in_fxt();

+ 4 - 4
mpi/src/starpu_mpi_collective.c

@@ -65,7 +65,7 @@ int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, i
 			{
 				int owner = starpu_data_get_rank(data_handles[x]);
 				int mpi_tag = starpu_data_get_tag(data_handles[x]);
-				STARPU_ASSERT(mpi_tag >= 0);
+				STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
 				if ((rank == root) && (owner != root))
 				{
 					callback_arg->count ++;
@@ -84,7 +84,7 @@ int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, i
 		{
 			int owner = starpu_data_get_rank(data_handles[x]);
 			int mpi_tag = starpu_data_get_tag(data_handles[x]);
-			STARPU_ASSERT(mpi_tag >= 0);
+			STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
 			if ((rank == root) && (owner != root))
 			{
 				//fprintf(stderr, "[%d] Sending data[%d] to %d\n", rank, x, owner);
@@ -127,7 +127,7 @@ int starpu_mpi_gather_detached(starpu_data_handle_t *data_handles, int count, in
 			{
 				int owner = starpu_data_get_rank(data_handles[x]);
 				int mpi_tag = starpu_data_get_tag(data_handles[x]);
-				STARPU_ASSERT(mpi_tag >= 0);
+				STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
 				if ((rank == root) && (owner != root))
 				{
 					callback_arg->count ++;
@@ -146,7 +146,7 @@ int starpu_mpi_gather_detached(starpu_data_handle_t *data_handles, int count, in
 		{
 			int owner = starpu_data_get_rank(data_handles[x]);
 			int mpi_tag = starpu_data_get_tag(data_handles[x]);
-			STARPU_ASSERT(mpi_tag >= 0);
+			STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
 			if ((rank == root) && (owner != root))
 			{
 				//fprintf(stderr, "[%d] Receiving data[%d] from %d\n", rank, x, owner);

+ 49 - 13
mpi/src/starpu_mpi_datatype.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009-2011  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -34,10 +34,10 @@ static void handle_to_datatype_matrix(starpu_data_handle_t data_handle, MPI_Data
 	size_t elemsize = starpu_matrix_get_elemsize(data_handle);
 
 	ret = MPI_Type_vector(ny, nx*elemsize, ld*elemsize, MPI_BYTE, datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_vector failed");
 
 	ret = MPI_Type_commit(datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
 }
 
 /*
@@ -57,16 +57,16 @@ static void handle_to_datatype_block(starpu_data_handle_t data_handle, MPI_Datat
 
 	MPI_Datatype datatype_2dlayer;
 	ret = MPI_Type_vector(ny, nx*elemsize, ldy*elemsize, MPI_BYTE, &datatype_2dlayer);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_vector failed");
 
 	ret = MPI_Type_commit(&datatype_2dlayer);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
 
 	ret = MPI_Type_hvector(nz, 1, ldz*elemsize, datatype_2dlayer, datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_hvector failed");
 
 	ret = MPI_Type_commit(datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
 }
 
 /*
@@ -81,10 +81,10 @@ static void handle_to_datatype_vector(starpu_data_handle_t data_handle, MPI_Data
 	size_t elemsize = starpu_vector_get_elemsize(data_handle);
 
 	ret = MPI_Type_contiguous(nx*elemsize, MPI_BYTE, datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_contiguous failed");
 
 	ret = MPI_Type_commit(datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
 }
 
 /*
@@ -98,10 +98,10 @@ static void handle_to_datatype_variable(starpu_data_handle_t data_handle, MPI_Da
 	size_t elemsize = starpu_variable_get_elemsize(data_handle);
 
 	ret = MPI_Type_contiguous(elemsize, MPI_BYTE, datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_contiguous failed");
 
 	ret = MPI_Type_commit(datatype);
-	STARPU_ASSERT(ret == MPI_SUCCESS);
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
 }
 
 /*
@@ -127,7 +127,7 @@ void _starpu_mpi_handle_allocate_datatype(starpu_data_handle_t data_handle, MPI_
 	if (id < STARPU_MAX_INTERFACE_ID)
 	{
 		handle_to_datatype_func func = handle_to_datatype_funcs[id];
-		STARPU_ASSERT(func);
+		STARPU_ASSERT_MSG(func, "Handle To Datatype Function not defined for StarPU data interface %d", id);
 		func(data_handle, datatype);
 		*user_datatype = 0;
 	}
@@ -188,8 +188,44 @@ void _starpu_mpi_handle_free_datatype(starpu_data_handle_t data_handle, MPI_Data
 	if (id < STARPU_MAX_INTERFACE_ID)
 	{
 		handle_free_datatype_func func = handle_free_datatype_funcs[id];
-		STARPU_ASSERT(func);
+		STARPU_ASSERT_MSG(func, "Handle free datatype function not defined for StarPU data interface %d", id);
 		func(datatype);
 	}
 	/* else the datatype is not predefined by StarPU */
 }
+/* Map an MPI datatype handle to a printable name for debug messages: a predefined handle yields its constant's name as a string literal (callers must not free or modify it); anything else yields a generic label. */
+char *_starpu_mpi_datatype(MPI_Datatype datatype)
+{
+     if (datatype == MPI_DATATYPE_NULL) return "MPI_DATATYPE_NULL"; /* checks run in order; the first handle that compares equal wins */
+     if (datatype == MPI_CHAR) return "MPI_CHAR";
+     if (datatype == MPI_UNSIGNED_CHAR) return "MPI_UNSIGNED_CHAR";
+     if (datatype == MPI_BYTE) return "MPI_BYTE";
+     if (datatype == MPI_SHORT) return "MPI_SHORT";
+     if (datatype == MPI_UNSIGNED_SHORT) return "MPI_UNSIGNED_SHORT";
+     if (datatype == MPI_INT) return "MPI_INT";
+     if (datatype == MPI_UNSIGNED) return "MPI_UNSIGNED";
+     if (datatype == MPI_LONG) return "MPI_LONG";
+     if (datatype == MPI_UNSIGNED_LONG) return "MPI_UNSIGNED_LONG";
+     if (datatype == MPI_FLOAT) return "MPI_FLOAT";
+     if (datatype == MPI_DOUBLE) return "MPI_DOUBLE";
+     if (datatype == MPI_LONG_DOUBLE) return "MPI_LONG_DOUBLE";
+     if (datatype == MPI_LONG_LONG) return "MPI_LONG_LONG";
+     if (datatype == MPI_LONG_INT) return "MPI_LONG_INT";
+     if (datatype == MPI_SHORT_INT) return "MPI_SHORT_INT";
+     if (datatype == MPI_FLOAT_INT) return "MPI_FLOAT_INT";
+     if (datatype == MPI_DOUBLE_INT) return "MPI_DOUBLE_INT";
+     if (datatype == MPI_2INT) return "MPI_2INT";
+     if (datatype == MPI_2DOUBLE_PRECISION) return "MPI_2DOUBLE_PRECISION"; /* NOTE(review): this and the names below look like Fortran-binding datatypes -- confirm every targeted MPI build defines them */
+     if (datatype == MPI_COMPLEX) return "MPI_COMPLEX";
+     if (datatype == MPI_DOUBLE_COMPLEX) return "MPI_DOUBLE_COMPLEX";
+     if (datatype == MPI_LOGICAL) return "MPI_LOGICAL";
+     if (datatype == MPI_REAL) return "MPI_REAL";
+     if (datatype == MPI_REAL4) return "MPI_REAL4";
+     if (datatype == MPI_REAL8) return "MPI_REAL8";
+     if (datatype == MPI_DOUBLE_PRECISION) return "MPI_DOUBLE_PRECISION";
+     if (datatype == MPI_INTEGER) return "MPI_INTEGER";
+     if (datatype == MPI_INTEGER4) return "MPI_INTEGER4";
+     if (datatype == MPI_INTEGER8) return "MPI_INTEGER8";
+     if (datatype == MPI_PACKED) return "MPI_PACKED";
+     return "User defined MPI Datatype"; /* fallback: none of the handles listed above matched */
+}

+ 2 - 1
mpi/src/starpu_mpi_datatype.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009-2011  Université de Bordeaux 1
- * Copyright (C) 2010, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,6 +26,7 @@ extern "C" {
 
 void _starpu_mpi_handle_allocate_datatype(starpu_data_handle_t data_handle, MPI_Datatype *datatype, int *user_datatype);
 void _starpu_mpi_handle_free_datatype(starpu_data_handle_t data_handle, MPI_Datatype *datatype);
+char *_starpu_mpi_datatype(MPI_Datatype datatype);
 
 #ifdef __cplusplus
 }

+ 41 - 27
mpi/src/starpu_mpi_insert_task.c

@@ -25,7 +25,6 @@
 #include <util/starpu_insert_task_utils.h>
 #include <datawizard/coherency.h>
 
-//#define STARPU_MPI_VERBOSE 1
 #include <starpu_mpi_private.h>
 
 /* Whether we are allowed to keep copies of remote data. */
@@ -57,7 +56,7 @@ void _starpu_mpi_cache_init(MPI_Comm comm)
 	}
 
 	MPI_Comm_size(comm, &nb_nodes);
-	_STARPU_MPI_DEBUG("Initialising htable for cache\n");
+	_STARPU_MPI_DEBUG(2, "Initialising htable for cache\n");
 	_cache_sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
 	for(i=0 ; i<nb_nodes ; i++) _cache_sent_data[i] = NULL;
 	_cache_received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
@@ -70,7 +69,7 @@ void _starpu_mpi_cache_empty_tables(int world_size)
 
 	if (_cache_enabled == 0) return;
 
-	_STARPU_MPI_DEBUG("Clearing htable for cache\n");
+	_STARPU_MPI_DEBUG(2, "Clearing htable for cache\n");
 
 	for(i=0 ; i<world_size ; i++)
 	{
@@ -120,14 +119,14 @@ void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
 		HASH_FIND_PTR(_cache_sent_data[i], &data_handle, avail);
 		if (avail)
 		{
-			_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data_handle);
+			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
 			HASH_DEL(_cache_sent_data[i], avail);
 			free(avail);
 		}
 		HASH_FIND_PTR(_cache_received_data[i], &data_handle, avail);
 		if (avail)
 		{
-			_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data_handle);
+			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
 			HASH_DEL(_cache_received_data[i], avail);
 			free(avail);
 		}
@@ -149,7 +148,7 @@ void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
+		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
 	}
 	return already_received;
 }
@@ -166,11 +165,11 @@ void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
 		struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
 		entry->data = data;
 		HASH_ADD_PTR(_cache_sent_data[dest], data, entry);
-		_STARPU_MPI_DEBUG("Noting that data %p has already been sent to %d\n", data, dest);
+		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been sent to %d\n", data, dest);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
+		_STARPU_MPI_DEBUG(2, "Do not send data %p to node %d as it has already been sent\n", data, dest);
 	}
 	return already_sent;
 }
@@ -197,7 +196,7 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_access
 			/* Yes, the app could actually not call
 			 * insert_task at all itself, this is just a
 			 * safeguard. */
-			_STARPU_MPI_DEBUG("oh oh\n");
+			_STARPU_MPI_DEBUG(3, "oh oh\n");
 			_STARPU_MPI_LOG_OUT();
 			return -EINVAL;
 		}
@@ -258,7 +257,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 			void *already_received = _starpu_mpi_already_received(data, mpi_rank);
 			if (already_received == NULL)
 			{
-				_STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
+				_STARPU_MPI_DEBUG(1, "Receive data %p from %d\n", data, mpi_rank);
 				starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
 			}
 		}
@@ -268,7 +267,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 			void *already_sent = _starpu_mpi_already_sent(data, dest);
 			if (already_sent == NULL)
 			{
-				_STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
+				_STARPU_MPI_DEBUG(1, "Send data %p to %d\n", data, dest);
 				starpu_mpi_isend_detached(data, dest, mpi_tag, comm, NULL, NULL);
 			}
 		}
@@ -296,13 +295,13 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 		{
 			if (xrank != -1 && me != xrank)
 			{
-				_STARPU_MPI_DEBUG("Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
+				_STARPU_MPI_DEBUG(1, "Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
 				starpu_mpi_irecv_detached(data, dest, mpi_tag, comm, NULL, NULL);
 			}
 		}
 		else if (do_execute)
 		{
-			_STARPU_MPI_DEBUG("Send data %p back to its owner %d...\n", data, mpi_rank);
+			_STARPU_MPI_DEBUG(1, "Send data %p back to its owner %d...\n", data, mpi_rank);
 			starpu_mpi_isend_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
 		}
 	}
@@ -325,7 +324,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 					HASH_FIND_PTR(_cache_sent_data[n], &data, already_sent);
 					if (already_sent)
 					{
-						_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data);
+						_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data);
 						HASH_DEL(_cache_sent_data[n], already_sent);
 						free(already_sent);
 					}
@@ -341,7 +340,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 #ifdef STARPU_DEVEL
 #  warning TODO: Somebody else will write to the data, so discard our cached copy if any. starpu_mpi could just remember itself.
 #endif
-					_STARPU_MPI_DEBUG("Clearing receive cache for data %p\n", data);
+					_STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data);
 					HASH_DEL(_cache_received_data[mpi_rank], already_received);
 					free(already_received);
 					starpu_data_invalidate_submit(data);
@@ -391,15 +390,15 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		if (arg_type==STARPU_EXECUTE_ON_NODE)
 		{
 			xrank = va_arg(varg_list, int);
-			_STARPU_MPI_DEBUG("Executing on node %d\n", xrank);
+			_STARPU_MPI_DEBUG(1, "Executing on node %d\n", xrank);
 			do_execute = 1;
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_DATA)
 		{
 			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
 			xrank = starpu_data_get_rank(data);
-			_STARPU_MPI_DEBUG("Executing on data node %d\n", xrank);
-			STARPU_ASSERT(xrank <= nb_nodes);
+			_STARPU_MPI_DEBUG(1, "Executing on data node %d\n", xrank);
+			STARPU_ASSERT_MSG(xrank <= nb_nodes, "Node %d to execute codelet is not a valid node (%d)", xrank, nb_nodes);
 			do_execute = 1;
 		}
 		else if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
@@ -483,7 +482,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 		}
 		if (xrank != -1)
 		{
-			_STARPU_MPI_DEBUG("Node %d is having the most R data\n", xrank);
+			_STARPU_MPI_DEBUG(1, "Node %d is having the most R data\n", xrank);
 			do_execute = 1;
 		}
 	}
@@ -495,7 +494,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	{
 		if (xrank == -1)
 		{
-			_STARPU_MPI_DEBUG("Different tasks are owning W data. Needs to specify which one is to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
+			_STARPU_MPI_DEBUG(1, "Different tasks are owning W data. Needs to specify which one is to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
 			return -EINVAL;
 		}
 		else
@@ -594,12 +593,11 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 			_starpu_codelet_pack_args(arg_buffer_size, &arg_buffer, varg_list);
 		}
 
-		_STARPU_MPI_DEBUG("Execution of the codelet %p (%s)\n", codelet, codelet->name);
+		_STARPU_MPI_DEBUG(1, "Execution of the codelet %p (%s)\n", codelet, codelet->name);
 		va_start(varg_list, codelet);
 		struct starpu_task *task = starpu_task_create();
 		int ret = _starpu_insert_task_create_and_submit(arg_buffer, arg_buffer_size, codelet, &task, varg_list);
-		_STARPU_MPI_DEBUG("ret: %d\n", ret);
-		STARPU_ASSERT(ret==0);
+		STARPU_ASSERT_MSG(ret==0, "_starpu_insert_task_create_and_submit failure %d", ret);
 	}
 
 	if (inconsistent_execute)
@@ -806,6 +804,8 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
 	}
 }
 
+/* TODO: this should rather be implicitly called by starpu_mpi_insert_task when
+ * a data previously accessed in REDUX mode gets accessed in R mode. */
 void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 {
 	int me, rank, tag, nb_nodes;
@@ -826,7 +826,7 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 	MPI_Comm_rank(comm, &me);
 	MPI_Comm_size(comm, &nb_nodes);
 
-	_STARPU_MPI_DEBUG("Doing reduction for data %p on node %d with %d nodes ...\n", data_handle, rank, nb_nodes);
+	_STARPU_MPI_DEBUG(1, "Doing reduction for data %p on node %d with %d nodes ...\n", data_handle, rank, nb_nodes);
 
 	// need to count how many nodes have the data in redux mode
 	if (me == rank)
@@ -841,8 +841,22 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 
 				starpu_data_register_same(&new_handle, data_handle);
 
-				_STARPU_MPI_DEBUG("Receiving redux handle from %d in %p ...\n", i, new_handle);
-
+				_STARPU_MPI_DEBUG(1, "Receiving redux handle from %d in %p ...\n", i, new_handle);
+
+				/* FIXME: we here allocate a lot of data: one
+				 * instance per MPI node and per number of
+				 * times we are called. We should rather do
+				 * that much later, e.g. after data_handle
+				 * finished its last read access, by submitting
+				 * an empty task A reading data_handle whose
+				 * callback submits the mpi comm, whose
+				 * callback submits the redux_cl task B with
+				 * sequential consistency set to 0, and submit
+				 * an empty task C writing data_handle and
+				 * depending on task B, just to replug with
+				 * implicit data dependencies with tasks
+				 * inserted after this reduction.
+				 */
 				starpu_mpi_irecv_detached(new_handle, i, tag, comm, NULL, NULL);
 				starpu_insert_task(data_handle->redux_cl,
 						   STARPU_RW, data_handle,
@@ -854,7 +868,7 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG("Sending redux handle to %d ...\n", rank);
+		_STARPU_MPI_DEBUG(1, "Sending redux handle to %d ...\n", rank);
 		starpu_mpi_isend_detached(data_handle, rank, tag, comm, NULL, NULL);
 		starpu_insert_task(data_handle->init_cl, STARPU_W, data_handle, 0);
 	}

+ 25 - 0
mpi/src/starpu_mpi_private.c

@@ -0,0 +1,25 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2012  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* MPI rank cached for the starpu_mpi trace macros; -1 means it has not been
+ * looked up yet. */
+int _debug_rank = -1;
+
+/* Verbosity threshold compared against the level given to
+ * _STARPU_MPI_DEBUG: a message is printed when its level is <= this value. */
+int _debug_level = 0;
+
+/* Set the verbosity threshold used by _STARPU_MPI_DEBUG. */
+void _starpu_mpi_set_debug_level(int level)
+{
+	_debug_level = level;
+}
+

+ 17 - 11
mpi/src/starpu_mpi_private.h

@@ -30,19 +30,25 @@
 extern "C" {
 #endif
 
-//#define STARPU_MPI_VERBOSE	1
-
-#ifdef STARPU_MPI_VERBOSE
-static int _debug_rank=-1;
+#ifdef STARPU_VERBOSE
+extern int _debug_rank;
+extern int _debug_level;
+void _starpu_mpi_set_debug_level(int level);
 #endif
 
-#ifdef STARPU_MPI_VERBOSE
-#  define _STARPU_MPI_DEBUG(fmt, args ...) do { if (!getenv("STARPU_SILENT")) { \
-	                                        if (_debug_rank == -1) MPI_Comm_rank(MPI_COMM_WORLD, &_debug_rank); \
-                                                fprintf(stderr, "%*s[%d][starpu_mpi][%s] " fmt , (_debug_rank+1)*4, "", _debug_rank, __func__ ,##args); \
-                                                fflush(stderr); }} while(0);
+#ifdef STARPU_VERBOSE
+/* Print a starpu_mpi debug trace on stderr.
+ *
+ * level: verbosity of this message; it is printed only when level is
+ *        <= _debug_level and the STARPU_SILENT environment variable is unset.
+ * fmt, args: printf-style format string and arguments.
+ *
+ * The line is indented by (_debug_rank+1)*4 columns and prefixed with the
+ * rank and the calling function name. _debug_rank is fetched with
+ * MPI_Comm_rank(MPI_COMM_WORLD, ...) the first time it is still -1.
+ *
+ * The expansion deliberately has no trailing semicolon (the caller supplies
+ * it), so that the macro behaves as a single statement inside if/else. */
+#  define _STARPU_MPI_DEBUG(level, fmt, args ...) \
+	do \
+	{								\
+		if (!getenv("STARPU_SILENT") && (level) <= _debug_level)	\
+		{							\
+			if (_debug_rank == -1) MPI_Comm_rank(MPI_COMM_WORLD, &_debug_rank); \
+			fprintf(stderr, "%*s[%d][starpu_mpi][%s] " fmt , (_debug_rank+1)*4, "", _debug_rank, __func__ ,##args); \
+			fflush(stderr); \
+		}			\
+	} while(0)
 #else
-#  define _STARPU_MPI_DEBUG(fmt, args ...)
+#  define _STARPU_MPI_DEBUG(level, fmt, args ...)
 #endif
 
 #define _STARPU_MPI_DISP(fmt, args ...) do { if (!getenv("STARPU_SILENT")) { \
@@ -50,7 +56,7 @@ static int _debug_rank=-1;
                                              fprintf(stderr, "%*s[%d][starpu_mpi][%s] " fmt , (_debug_rank+1)*4, "", _debug_rank, __func__ ,##args); \
                                              fflush(stderr); }} while(0);
 
-#ifdef STARPU_MPI_VERBOSE0
+#ifdef STARPU_VERBOSE0
 #  define _STARPU_MPI_LOG_IN()             do { if (!getenv("STARPU_SILENT")) { \
                                                if (_debug_rank == -1) MPI_Comm_rank(MPI_COMM_WORLD, &_debug_rank);                        \
                                                fprintf(stderr, "%*s[%d][starpu_mpi][%s] -->\n", (_debug_rank+1)*4, "", _debug_rank, __func__ ); \

+ 2 - 3
mpi/src/starpu_mpi_stats.c

@@ -17,7 +17,6 @@
 #include <starpu_mpi_stats.h>
 #include <common/config.h>
 #include <stdio.h>
-//#define STARPU_MPI_VERBOSE	1
 #include <starpu_mpi_private.h>
 
 /* measure the amount of data transfers between each pair of MPI nodes */
@@ -38,7 +37,7 @@ void _starpu_mpi_comm_amounts_init(MPI_Comm comm)
 	if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU is executed with STARPU_COMM_STATS=1, which slows down a bit\n");
 
 	MPI_Comm_size(comm, &world_size);
-	_STARPU_MPI_DEBUG("allocating for %d nodes\n", world_size);
+	_STARPU_MPI_DEBUG(1, "allocating for %d nodes\n", world_size);
 
 	comm_amount = (size_t *) calloc(world_size, sizeof(size_t));
 }
@@ -58,7 +57,7 @@ void _starpu_mpi_comm_amounts_inc(MPI_Comm comm, unsigned dst, MPI_Datatype data
 	MPI_Comm_rank(comm, &src);
 	MPI_Type_size(datatype, &size);
 
-	_STARPU_MPI_DEBUG("[%d] adding %d to %d\n", src, count*size, dst);
+	_STARPU_MPI_DEBUG(1, "[%d] adding %d to %d\n", src, count*size, dst);
 
 	comm_amount[dst] += count*size;
 }

+ 3 - 0
mpi/tests/Makefile.am

@@ -49,6 +49,9 @@ BUILT_SOURCES =
 
 CLEANFILES = *.gcno *.gcda *.linkinfo
 
+EXTRA_DIST = 					\
+	user_defined_datatype_value.h
+
 examplebindir = $(libdir)/starpu/examples/mpi
 
 examplebin_PROGRAMS =

+ 4 - 1
mpi/tests/mpi_reduction.c

@@ -1,6 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2013  Université de Bordeaux 1
+ * Copyright (C) 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,12 +27,14 @@ static struct starpu_codelet init_codelet =
 {
 	.cpu_funcs = {init_cpu_func, NULL},
 	.nbuffers = 1,
+	.modes = {STARPU_W},
 	.name = "init_codelet"
 };
 
 static struct starpu_codelet redux_codelet =
 {
 	.cpu_funcs = {redux_cpu_func, NULL},
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2,
 	.name = "redux_codelet"
 };

+ 25 - 10
mpi/tests/user_defined_datatype.c

@@ -17,6 +17,7 @@
 #include <starpu_mpi.h>
 #include <interface/complex_interface.h>
 #include <interface/complex_codelet.h>
+#include <user_defined_datatype_value.h>
 
 #ifdef STARPU_QUICK_CHECK
 #  define ELEMENTS 10
@@ -86,15 +87,18 @@ int main(int argc, char **argv)
 			int i;
 
 			starpu_data_handle_t handle_complex[ELEMENTS];
+			starpu_data_handle_t handle_values[ELEMENTS];
 			starpu_data_handle_t handle_vars[ELEMENTS];
 
 			double real[ELEMENTS][2];
 			double imaginary[ELEMENTS][2];
-			double foo[ELEMENTS];
+			float foo[ELEMENTS];
+			int values[ELEMENTS];
 
-			double foo_compare=42;
 			double real_compare[2] = {12.0, 45.0};
 			double imaginary_compare[2] = {7.0, 42.0};
+			float foo_compare=42.0;
+			int value_compare=36;
 
 			fprintf(stderr, "\nTesting with function %p\n", f);
 
@@ -102,36 +106,41 @@ int main(int argc, char **argv)
 			{
 				for(i=0 ; i<ELEMENTS; i++)
 				{
-					foo[i] = 8;
+					foo[i] = 8.0;
 					real[i][0] = 0.0;
 					real[i][1] = 0.0;
 					imaginary[i][0] = 0.0;
 					imaginary[i][1] = 0.0;
+					values[i] = 7;
 				}
 			}
 			if (rank == 1)
 			{
 				for(i=0 ; i<ELEMENTS; i++)
 				{
-					foo[i] = 42;
-					real[i][0] = 12.0;
-					real[i][1] = 45.0;
-					imaginary[i][0] = 7.0;
-					imaginary[i][1] = 42.0;
+					foo[i] = foo_compare;
+					real[i][0] = real_compare[0];
+					real[i][1] = real_compare[1];
+					imaginary[i][0] = imaginary_compare[0];
+					imaginary[i][1] = imaginary_compare[1];
+					values[i] = value_compare;
 				}
 			}
 			for(i=0 ; i<ELEMENTS ; i++)
 			{
 				starpu_complex_data_register(&handle_complex[i], 0, real[i], imaginary[i], 2);
-				starpu_variable_data_register(&handle_vars[i], 0, (uintptr_t)&foo[i], sizeof(double));
+				starpu_value_data_register(&handle_values[i], 0, &values[i]);
+				starpu_variable_data_register(&handle_vars[i], 0, (uintptr_t)&foo[i], sizeof(float));
 			}
 
 			f(handle_vars, ELEMENTS, rank, ELEMENTS);
-			f(handle_complex, ELEMENTS, rank, 4*ELEMENTS);
+			f(handle_complex, ELEMENTS, rank, 2*ELEMENTS);
+			f(handle_values, ELEMENTS, rank, 4*ELEMENTS);
 
 			for(i=0 ; i<ELEMENTS ; i++)
 			{
 				starpu_data_unregister(handle_complex[i]);
+				starpu_data_unregister(handle_values[i]);
 				starpu_data_unregister(handle_vars[i]);
 			}
 			starpu_task_wait_for_all();
@@ -147,6 +156,12 @@ int main(int argc, char **argv)
 						fprintf(stderr, "ERROR. foo[%d] == %f != %f\n", i, foo[i], foo_compare);
 						goto end;
 					}
+					compare = (values[i] == value_compare);
+					if (compare == 0)
+					{
+						fprintf(stderr, "ERROR. value[%d] == %d != %d\n", i, values[i], value_compare);
+						goto end;
+					}
 					for(j=0 ; j<2 ; j++)
 					{
 						compare = (real[i][j] == real_compare[j]);

+ 170 - 0
mpi/tests/user_defined_datatype_value.h

@@ -0,0 +1,170 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef _USER_DEFINED_DATATYPE_VALUE_H
+#define _USER_DEFINED_DATATYPE_VALUE_H
+
+/* Data interface describing a single int. */
+struct starpu_value_interface
+{
+	int *value;	/* address of the int described by this interface */
+};
+#define STARPU_VALUE_GET(interface)	(((struct starpu_value_interface *)(interface))->value)
+
+/* Return the int pointer stored in the interface that handle has on memory
+ * node 0. */
+int *starpu_value_get(starpu_data_handle_t handle)
+{
+	return STARPU_VALUE_GET(starpu_data_get_interface_on_node(handle, 0));
+}
+
+/* register_data_handle hook: store the pointer carried by data_interface
+ * into the interface of every memory node (0 .. STARPU_MAXNODES-1) of handle.
+ * home_node is not used. */
+static void value_register_data_handle(starpu_data_handle_t handle, unsigned home_node, void *data_interface)
+{
+	int *registered = ((struct starpu_value_interface *) data_interface)->value;
+	unsigned n;
+
+	for (n = 0; n < STARPU_MAXNODES; n++)
+	{
+		struct starpu_value_interface *node_interface = starpu_data_get_interface_on_node(handle, n);
+
+		node_interface->value = registered;
+	}
+}
+
+/* allocate_data_on_node hook: obtain sizeof(int) bytes on node through
+ * starpu_malloc_on_node and record the address in the interface.
+ * Returns sizeof(int) on success, -ENOMEM when the allocation yields 0. */
+static starpu_ssize_t value_allocate_data_on_node(void *data_interface, unsigned node)
+{
+	int *buffer = (int *) starpu_malloc_on_node(node, sizeof(int));
+
+	if (buffer == NULL)
+		return -ENOMEM;
+
+	/* make the interface point at the freshly allocated int */
+	((struct starpu_value_interface *) data_interface)->value = buffer;
+
+	return sizeof(int);
+}
+
+/* free_data_on_node hook: hand the sizeof(int) bytes recorded in the
+ * interface back through starpu_free_on_node. */
+static void value_free_data_on_node(void *data_interface, unsigned node)
+{
+	uintptr_t addr = (uintptr_t) ((struct starpu_value_interface *) data_interface)->value;
+
+	starpu_free_on_node(node, addr, sizeof(int));
+}
+
+/* get_size hook: the data is always a single int, whatever the handle. */
+static size_t value_get_size(starpu_data_handle_t handle)
+{
+	(void) handle;
+
+	return sizeof(int);
+}
+
+/* footprint hook: pass the int read through starpu_value_get (i.e. from
+ * memory node 0) to starpu_crc32_be, with 0 as second argument. */
+static uint32_t value_footprint(starpu_data_handle_t handle)
+{
+	return starpu_crc32_be(*starpu_value_get(handle), 0);
+}
+
+/* handle_to_pointer hook: return the int pointer stored in the interface of
+ * handle on node. The data must already be allocated on that node. */
+static void *value_handle_to_pointer(starpu_data_handle_t handle, unsigned node)
+{
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	return ((struct starpu_value_interface *) starpu_data_get_interface_on_node(handle, node))->value;
+}
+
+/* pack_data hook: serialize the int held by handle on node.
+ *
+ * *count is always set to sizeof(int). When ptr is not NULL, a buffer of
+ * that size is allocated with malloc, filled with a copy of the int and
+ * returned through *ptr.
+ *
+ * Returns 0 on success, or -ENOMEM (with *ptr set to NULL) when the buffer
+ * cannot be allocated. */
+static int value_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
+{
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	struct starpu_value_interface *value_interface = (struct starpu_value_interface *)
+		starpu_data_get_interface_on_node(handle, node);
+
+	*count = sizeof(int);
+	if (ptr != NULL)
+	{
+		*ptr = malloc(sizeof(int));
+		/* do not memcpy into a NULL buffer when malloc fails */
+		if (*ptr == NULL)
+			return -ENOMEM;
+		memcpy(*ptr, value_interface->value, sizeof(int));
+	}
+
+	return 0;
+}
+
+/* unpack_data hook: copy the int found at ptr into the int held by handle on
+ * node. count is not used. Always returns 0. */
+static int value_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
+{
+	STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
+
+	int *target = ((struct starpu_value_interface *) starpu_data_get_interface_on_node(handle, node))->value;
+	int *received = (int *) ptr;
+
+	*target = *received;
+
+	/* test-specific check: the only value expected to be unpacked is 36 */
+	assert(*target == 36);
+
+	return 0;
+}
+
+/* any_to_any copy method: transfer the sizeof(int) bytes pointed to by the
+ * source interface on src_node to the location pointed to by the destination
+ * interface on dst_node, through starpu_interface_copy (offsets 0).
+ * async_data is forwarded untouched; the return value is the one of
+ * starpu_interface_copy. */
+static int copy_any_to_any(void *src_interface, unsigned src_node,
+			   void *dst_interface, unsigned dst_node,
+			   void *async_data)
+{
+	struct starpu_value_interface *src_value = src_interface;
+	struct starpu_value_interface *dst_value = dst_interface;
+
+	return starpu_interface_copy((uintptr_t) src_value->value, 0, src_node,
+				     (uintptr_t) dst_value->value, 0, dst_node,
+				     sizeof(int),
+				     async_data);
+}
+
+/* Copy methods of the value interface: a single generic any-to-any copy. */
+static const struct starpu_data_copy_methods value_copy_methods =
+{
+	.any_to_any = copy_any_to_any
+};
+
+/* Operations of the value interface. Not const: interfaceid starts as
+ * STARPU_UNKNOWN_INTERFACE_ID and is assigned by starpu_value_data_register. */
+static struct starpu_data_interface_ops interface_value_ops =
+{
+	/* identification */
+	.interfaceid = STARPU_UNKNOWN_INTERFACE_ID,
+	.interface_size = sizeof(struct starpu_value_interface),
+
+	/* registration, allocation and release */
+	.register_data_handle = value_register_data_handle,
+	.allocate_data_on_node = value_allocate_data_on_node,
+	.free_data_on_node = value_free_data_on_node,
+
+	/* queries */
+	.get_size = value_get_size,
+	.footprint = value_footprint,
+	.handle_to_pointer = value_handle_to_pointer,
+
+	/* transfers */
+	.copy_methods = &value_copy_methods,
+	.pack_data = value_pack_data,
+	.unpack_data = value_unpack_data
+};
+
+/* Register the int pointed to by value as a StarPU data handle using the
+ * value interface; the new handle is stored in *handleptr and home_node is
+ * forwarded to starpu_data_register. */
+void starpu_value_data_register(starpu_data_handle_t *handleptr, unsigned home_node, int *value)
+{
+	struct starpu_value_interface initial;
+
+	/* request an interface id the first time the interface is used */
+	if (interface_value_ops.interfaceid == STARPU_UNKNOWN_INTERFACE_ID)
+		interface_value_ops.interfaceid = starpu_data_interface_get_next_id();
+
+	initial.value = value;
+	starpu_data_register(handleptr, home_node, &initial, &interface_value_ops);
+}
+
+#endif /* _USER_DEFINED_DATATYPE_VALUE_H */

+ 10 - 0
sched_ctx_hypervisor/src/hypervisor_policies/ispeed_lp_policy.c

@@ -49,6 +49,16 @@ static unsigned _compute_flops_distribution_over_ctxs(int ns, int nw, double w_i
 			{
 				enum starpu_archtype arch = starpu_worker_get_type(worker);
 				velocity[s][w] = sched_ctx_hypervisor_get_velocity(sc_w, arch);
+				if(arch == STARPU_CUDA_WORKER)
+				{
+					unsigned worker_in_ctx = starpu_sched_ctx_contains_worker(worker, sc_w->sched_ctx);
+					if(!worker_in_ctx)
+					{
+						double transfer_velocity = starpu_get_bandwidth_RAM_CUDA(worker) / 1000;
+						velocity[s][w] = (velocity[s][w] * transfer_velocity) / (velocity[s][w] + transfer_velocity);
+					}
+				}
+
 			}
 			
 //			printf("v[w%d][s%d] = %lf\n",w, s, velocity[s][w]);

+ 8 - 2
sched_ctx_hypervisor/src/hypervisor_policies/policy_tools.c

@@ -454,12 +454,18 @@ double _get_velocity_per_worker(struct sched_ctx_hypervisor_wrapper *sc_w, unsig
         if( elapsed_flops != 0.0)
         {
                 double curr_time = starpu_timing_now();
+		size_t elapsed_data_used = sc_w->elapsed_data[worker];
                 double elapsed_time = (curr_time - sc_w->start_time) / 1000000.0; /* in seconds */
  		enum starpu_archtype arch = starpu_worker_get_type(worker);
 		if(arch == STARPU_CUDA_WORKER)
 		{
-			double transfer_velocity = starpu_get_bandwidth_RAM_CUDA(worker);
-			elapsed_time +=  (elapsed_data_used / transfer_velocity) / 1000000 ;
+/* 			unsigned worker_in_ctx = starpu_sched_ctx_contains_worker(worker, sc_w->sched_ctx); */
+/* 			if(!worker_in_ctx) */
+/* 			{ */
+
+/* 				double transfer_velocity = starpu_get_bandwidth_RAM_CUDA(worker); */
+/* 				elapsed_time +=  (elapsed_data_used / transfer_velocity) / 1000000 ; */
+/* 			} */
 			double latency = starpu_get_latency_RAM_CUDA(worker);
 //			printf("%d/%d: latency %lf elapsed_time before %lf ntasks %d\n", worker, sc_w->sched_ctx, latency, elapsed_time, elapsed_tasks);
 			elapsed_time += (elapsed_tasks * latency)/1000000;

+ 3 - 1
socl/src/gc.c

@@ -108,10 +108,12 @@ void gc_stop(void) {
 
 int gc_entity_release_ex(entity e, const char * DEBUG_PARAM(caller)) {
 
+  DEBUG_MSG("[%s] Decrementing refcount of %s %p to ", caller, e->name, e);
+
   /* Decrement reference count */
   int refs = __sync_sub_and_fetch(&e->refs, 1);
 
-  DEBUG_MSG("[%s] Decrementing refcount of %s %p to %d\n", caller, e->name, e, refs);
+  DEBUG_MSG_NOHEAD("%d\n", refs);
 
   assert(refs >= 0);
 

+ 17 - 12
src/datawizard/reduction.c

@@ -217,12 +217,16 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 
 					redux_task->cl = handle->redux_cl;
 					STARPU_ASSERT(redux_task->cl);
+					if (!redux_task->cl->modes[0])
+						redux_task->cl->modes[0] = STARPU_RW;
+					if (!redux_task->cl->modes[1])
+						redux_task->cl->modes[1] = STARPU_R;
 
-					redux_task->handles[0] = replicate_array[i];
-					redux_task->cl->modes[0] = STARPU_RW;
+					STARPU_ASSERT_MSG(redux_task->cl->modes[0] == STARPU_RW, "First parameter of reduction codelet has to be RW");
+					STARPU_ASSERT_MSG(redux_task->cl->modes[1] == STARPU_R, "Second parameter of reduction codelet has to be R");
 
+					redux_task->handles[0] = replicate_array[i];
 					redux_task->handles[1] = replicate_array[i+step];
-					redux_task->cl->modes[1] = STARPU_R;
 
 					int ndeps = 0;
 					struct starpu_task *task_deps[2];
@@ -274,10 +278,9 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 
 			redux_task->cl = handle->init_cl;
 			STARPU_ASSERT(redux_task->cl);
-#ifdef STARPU_DEVEL
-#  warning the mode should already be set in the codelet. Only check they are valid?
-#endif
-			redux_task->cl->modes[0] = STARPU_W;
+			if (!redux_task->cl->modes[0])
+				redux_task->cl->modes[0] = STARPU_W;
+			STARPU_ASSERT_MSG(redux_task->cl->modes[0] == STARPU_W, "Parameter of initialization codelet has to be W");
 			redux_task->handles[0] = handle;
 
 			int ret = _starpu_task_submit_internally(redux_task);
@@ -300,11 +303,13 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle)
 			redux_task->cl = handle->redux_cl;
 			STARPU_ASSERT(redux_task->cl);
 
-#ifdef STARPU_DEVEL
-#  warning the modes should already be set in the codelet. Only check they are valid?
-#endif
-			redux_task->cl->modes[0] = STARPU_RW;
-			redux_task->cl->modes[1] = STARPU_R;
+			if (!redux_task->cl->modes[0])
+				redux_task->cl->modes[0] = STARPU_RW;
+			if (!redux_task->cl->modes[1])
+				redux_task->cl->modes[1] = STARPU_R;
+
+			STARPU_ASSERT_MSG(redux_task->cl->modes[0] == STARPU_RW, "First parameter of reduction codelet has to be RW");
+			STARPU_ASSERT_MSG(redux_task->cl->modes[1] == STARPU_R, "Second parameter of reduction codelet has to be R");
 
 			redux_task->handles[0] = handle;
 			redux_task->handles[1] = replicate_array[replicate];

+ 37 - 4
src/sched_policies/eager_central_policy.c

@@ -82,6 +82,27 @@ static int push_task_eager_policy(struct starpu_task *task)
 	_starpu_push_task_end(task);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
+
+	/* if there are no tasks, block */
+	/* wake people waiting for a task */
+	unsigned worker = 0;
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	
+	struct starpu_sched_ctx_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+	
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
+
 		
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return ret_val;
@@ -108,12 +129,24 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *task = NULL;
-	if(!_starpu_fifo_empty(data->fifo))
+
+	/* Tell helgrind that it's fine to check for empty fifo without actual
+	 * mutex (it's just a pointer) */
+	VALGRIND_HG_MUTEX_LOCK_PRE(&data->policy_mutex, 0);
+	VALGRIND_HG_MUTEX_LOCK_POST(&data->policy_mutex);
+	/* block until some event happens */
+	if (_starpu_fifo_empty(data->fifo))
 	{
-		_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-		 task = _starpu_fifo_pop_task(data->fifo, workerid);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+		VALGRIND_HG_MUTEX_UNLOCK_PRE(&data->policy_mutex);
+		VALGRIND_HG_MUTEX_UNLOCK_POST(&data->policy_mutex);
+		return NULL;
 	}
+	VALGRIND_HG_MUTEX_UNLOCK_PRE(&data->policy_mutex);
+	VALGRIND_HG_MUTEX_UNLOCK_POST(&data->policy_mutex);
+
+	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	 task = _starpu_fifo_pop_task(data->fifo, workerid);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 		
 	return task;
 }

+ 10 - 0
src/sched_policies/eager_central_priority_policy.c

@@ -166,9 +166,19 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 
 	struct _starpu_priority_taskq *taskq = data->taskq;
 
+	/* Tell helgrind that it's fine to check for empty fifo without actual
+	 * mutex (it's just a pointer) */
+	VALGRIND_HG_MUTEX_LOCK_PRE(&data->policy_mutex, 0);
+	VALGRIND_HG_MUTEX_LOCK_POST(&data->policy_mutex);
 	/* block until some event happens */
 	if (taskq->total_ntasks == 0)
+	{
+		VALGRIND_HG_MUTEX_UNLOCK_PRE(&data->policy_mutex);
+		VALGRIND_HG_MUTEX_UNLOCK_POST(&data->policy_mutex);
 		return NULL;
+	}
+	VALGRIND_HG_MUTEX_UNLOCK_PRE(&data->policy_mutex);
+	VALGRIND_HG_MUTEX_UNLOCK_POST(&data->policy_mutex);
 
 	/* release this mutex before trying to wake up other workers */
 	_starpu_pthread_mutex_t *curr_sched_mutex;

+ 3 - 2
src/util/starpu_insert_task_utils.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2011, 2012, 2013  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -234,7 +234,8 @@ int _starpu_insert_task_create_and_submit(char *arg_buffer, size_t arg_buffer_si
 			(*task)->handles[current_buffer] = handle;
 			if (cl->modes[current_buffer])
 			{
-				STARPU_ASSERT(cl->modes[current_buffer] == mode);
+				STARPU_ASSERT_MSG(cl->modes[current_buffer] == mode, "The codelet <%s> defines the access mode %d for the buffer %d which is different from the mode %d given to starpu_insert_task\n",
+						  cl->name, cl->modes[current_buffer], current_buffer, mode);
 			}
 			else
 			{

+ 3 - 1
tests/datawizard/increment_redux.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012-2013  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -126,6 +126,7 @@ static struct starpu_codelet redux_cl =
 	.opencl_funcs = {redux_opencl_kernel, NULL},
 #endif
 	.cpu_funcs = {redux_cpu_kernel, NULL},
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2
 };
 
@@ -138,6 +139,7 @@ static struct starpu_codelet neutral_cl =
 	.opencl_funcs = {neutral_opencl_kernel, NULL},
 #endif
 	.cpu_funcs = {neutral_cpu_kernel, NULL},
+	.modes = {STARPU_W},
 	.nbuffers = 1
 };
 

+ 3 - 1
tests/datawizard/increment_redux_lazy.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012  Université de Bordeaux 1
+ * Copyright (C) 2010, 2012-2013  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -112,6 +112,7 @@ static struct starpu_codelet redux_cl =
 	.opencl_funcs = {redux_opencl_kernel, NULL},
 #endif
 	.cpu_funcs = {redux_cpu_kernel, NULL},
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2
 };
 
@@ -124,6 +125,7 @@ static struct starpu_codelet neutral_cl =
 	.opencl_funcs = {neutral_opencl_kernel, NULL},
 #endif
 	.cpu_funcs = {neutral_cpu_kernel, NULL},
+	.modes = {STARPU_W},
 	.nbuffers = 1
 };
 

+ 3 - 1
tests/datawizard/increment_redux_v2.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011-2012  Université de Bordeaux 1
+ * Copyright (C) 2011-2013  Université de Bordeaux 1
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -125,6 +125,7 @@ static struct starpu_codelet redux_cl =
 	.opencl_funcs = {redux_opencl_kernel, NULL},
 #endif
 	.cpu_funcs = {redux_cpu_kernel, NULL},
+	.modes = {STARPU_RW, STARPU_R},
 	.nbuffers = 2
 };
 
@@ -137,6 +138,7 @@ static struct starpu_codelet neutral_cl =
 	.opencl_funcs = {neutral_opencl_kernel, NULL},
 #endif
 	.cpu_funcs = {neutral_cpu_kernel, NULL},
+	.modes = {STARPU_W},
 	.nbuffers = 1
 };
 

+ 0 - 26
tools/valgrind/starpu.suppr

@@ -70,29 +70,3 @@
    fun:_starpu_load_bus_performance_files
    ...
 }
-
-{
-   This is racy, but keep it away for now, otherwise it clutters the buildbot log
-   Helgrind:Race
-   fun:_starpu_fifo_empty
-   fun:pop_task_eager_policy
-   ...
-}
-
-{
-   This is the counterpart of the suppression above
-   Helgrind:Race
-   fun:_starpu_fifo_push_task
-   fun:push_task_eager_policy
-   ...
-}
-
-
-{
-   This is the counterpart of the suppression above
-   Helgrind:Race
-   fun:_starpu_fifo_push_sorted_task
-   fun:_starpu_fifo_push_task
-   fun:push_task_eager_policy
-   ...
-}