Samuel Thibault преди 10 години
родител
ревизия
3434b52b4c

+ 1 - 0
AUTHORS

@@ -2,6 +2,7 @@ Simon Archipoff <simon.archipoff@etu.u-bordeaux1.fr>
 Cédric Augonnet <cedric.augonnet@inria.fr>
 Olivier Aumage <olivier.aumage@inria.fr>
 William Braik <wbraik@gmail.com>
+Berenger Bramas <berenger.bramas@inria.fr>
 Alfredo Buttari <alfredo.buttari@enseeiht.fr>
 Jérôme Clet-Ortega <jerome.clet-ortega@labri.fr>
 Nicolas Collin <nicolas.collin@inria.fr>

+ 2 - 0
ChangeLog

@@ -98,6 +98,8 @@ New features:
     in the close future.
   * New scheduler 'dmdasd' that considers priority when deciding on
     which worker to schedule
+  * Add data access arbiters, to improve parallelism of concurrent data
+    accesses, notably with STARPU_COMMUTE.
 
 Small features:
   * Tasks can now have a name (via the field const char *name of

+ 30 - 0
doc/doxygen/chapters/07data_management.doxy

@@ -334,6 +334,36 @@ value of handle1 or handle2 becomes available first, the corresponding task
 running cl2 will start first. The task running cl1 will however always be run
 before them, and the task running cl3 will always be run after them.
 
+If a lot of tasks use the commute access on the same set of data and a lot of
+them are ready at the same time, it may become interesting to use an arbiter,
+see \ref ConcurrentDataAccess .
+
+\section ConcurrentDataAccess Concurrent Data accesses
+
+When several tasks are ready and will work on several data, StarPU is faced with
+the classical Dining Philosophers problem, and has to determine the order in
+which it will run the tasks.
+
+Data accesses usually use sequential ordering, so data accesses are usually
+already serialized, and thus by default StarPU uses the Dijkstra solution which
+scales very well in terms of overhead: tasks will just acquire data one by one
+by data handle pointer value order.
+
+When sequential ordering is disabled or the ::STARPU_COMMUTE flag is used, there
+may be a lot of concurrent accesses to the same data, and the Dijkstra solution
+gets only poor parallelism, typically in some pathological cases which do happen
+in various applications. In that case, one can use a data access arbiter, which
+implements the classical centralized solution for the Dining Philosophers
+problem. This is more expensive in terms of overhead since it is centralized,
+but it opportunistically gets a lot of parallelism. The centralization can also
+be avoided by using several arbiters, thus separating sets of data for which
+arbitration will be done.  If a task accesses data from different arbiters, it
+will acquire them arbiter by arbiter, in arbiter pointer value order.
+
+See the tests/datawizard/test_arbiter.cpp example.
+
+Arbiters however do not support the STARPU_REDUX flag yet.
+
 \section TemporaryBuffers Temporary Buffers
 
 There are two kinds of temporary buffers: temporary data which just pass results

+ 18 - 0
doc/doxygen/chapters/api/data_management.doxy

@@ -21,6 +21,11 @@ StarPU, it is associated to a ::starpu_data_handle_t which keeps track
 of the state of the piece of data over the entire machine, so that we
 can maintain data consistency and locate data replicates for instance.
 
+\typedef starpu_arbiter_t
+\ingroup API_Data_Management
+This is an arbiter, which implements an advanced but centralized management of
+concurrent data accesses, see \ref ConcurrentDataAccess for the details.
+
 \enum starpu_data_access_mode
 \ingroup API_Data_Management
 This datatype describes a data access mode.
@@ -316,4 +321,17 @@ starpu_data_acquire_cb().
 This is the same as starpu_data_release(), except that the data
 will be available on the given memory \p node instead of main memory.
 
+\fn starpu_arbiter_t starpu_arbiter_create(void)
+\ingroup API_Data_Management
+This creates a data access arbiter, see \ref ConcurrentDataAccess for the details
+
+\fn void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter)
+\ingroup API_Data_Management
+This makes accesses to \p handle managed by \p arbiter
+
+\fn void starpu_arbiter_destroy(starpu_arbiter_t arbiter)
+\ingroup API_Data_Management
+This destroys the \p arbiter . This must only be called after all data assigned
+to it have been unregistered.
+
 */

+ 4 - 0
examples/cg/cg.c

@@ -409,6 +409,10 @@ int main(int argc, char **argv)
 {
 	int ret;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return 77;
+
 #ifdef STARPU_QUICK_CHECK
 	i_max = 16;
 #endif

+ 5 - 1
examples/pi/pi_redux.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2014  Université de Bordeaux
+ * Copyright (C) 2010-2015  Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -314,6 +314,10 @@ int main(int argc, char **argv)
 	unsigned i;
 	int ret;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return 77;
+
 	parse_args(argc, argv);
 
 	ret = starpu_init(NULL);

+ 4 - 0
examples/reductions/dot_product.c

@@ -332,6 +332,10 @@ int main(int argc, char **argv)
 {
 	int ret;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return 77;
+
 	ret = starpu_init(NULL);
 	if (ret == -ENODEV)
 		return 77;

+ 5 - 1
examples/reductions/minmax_reduction.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2013  Université de Bordeaux
+ * Copyright (C) 2010, 2013, 2015  Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -139,6 +139,10 @@ int main(int argc, char **argv)
 	unsigned long i;
 	int ret;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return 77;
+
 	ret = starpu_init(NULL);
 	if (ret == -ENODEV)
 		return 77;

+ 5 - 0
include/starpu_data.h

@@ -81,6 +81,11 @@ int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t h
 void starpu_data_release(starpu_data_handle_t handle);
 void starpu_data_release_on_node(starpu_data_handle_t handle, int node);
 
+typedef struct starpu_arbiter *starpu_arbiter_t;
+starpu_arbiter_t starpu_arbiter_create(void);
+void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter);
+void starpu_arbiter_destroy(starpu_arbiter_t);
+
 void starpu_data_display_memory_stats();
 
 #define starpu_data_malloc_pinned_if_possible	starpu_malloc

+ 5 - 1
mpi/tests/mpi_reduction.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2013  Université de Bordeaux
+ * Copyright (C) 2013, 2015  Université de Bordeaux
  * Copyright (C) 2012, 2013, 2014, 2015  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -72,6 +72,10 @@ int main(int argc, char **argv)
 
 	int nb_elements, step, loops;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return STARPU_TEST_SKIPPED;
+
 	int ret = starpu_init(NULL);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 	ret = starpu_mpi_init(&argc, &argv, 1);

+ 2 - 1
src/Makefile.am

@@ -1,6 +1,6 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2009-2014  Université de Bordeaux
+# Copyright (C) 2009-2015  Université de Bordeaux
 # Copyright (C) 2010, 2011, 2012, 2013, 2015  CNRS
 # Copyright (C) 2011, 2014  INRIA
 #
@@ -165,6 +165,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	core/dependencies/tags.c				\
 	core/dependencies/task_deps.c				\
 	core/dependencies/data_concurrency.c			\
+	core/dependencies/data_arbiter_concurrency.c		\
 	core/disk_ops/disk_stdio.c				\
 	core/disk_ops/disk_unistd.c                             \
 	core/disk_ops/unistd/disk_unistd_global.c		\

+ 717 - 0
src/core/dependencies/data_arbiter_concurrency.c

@@ -0,0 +1,717 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Université de Bordeaux
+ * Copyright (C) 2015  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <core/dependencies/data_concurrency.h>
+#include <datawizard/coherency.h>
+#include <core/sched_policy.h>
+#include <common/starpu_spinlock.h>
+#include <datawizard/sort_data_handles.h>
+
+/* TODO factorize with data_concurrency.c and btw support redux */
+
+//#define LOCK_OR_DELEGATE
+
+/*
+ * This implements a solution for the dining philosophers problem (see
+ * data_concurrency.c for the rationale) based on a centralized arbiter.  This
+ * allows to get a more parallel solution than the Dijkstra solution, by
+ * avoiding strictly serialized executions, and instead opportunistically find
+ * which tasks can take data.
+ *
+ * These are the algorithms implemented below:
+ *
+ *
+ * at termination of task T:
+ *
+ * - for each handle h of T:
+ *   - mutex_lock(&arbiter)
+ *   - release reference on h
+ *   - for each task Tc waiting for h:
+ *     - for each data Tc_h it is waiting:
+ *       - if Tc_h is busy, goto fail
+ *     // Ok, now really take them
+ *     - For each data Tc_h it is waiting:
+ *       - lock(Tc_h)
+ *       - take reference on h (it should be still available since we hold the arbiter)
+ *       - unlock(Tc_h)
+ *     // Ok, we managed to find somebody, we're finished!
+ *     _starpu_push_task(Tc);
+ *     break;
+ *     fail:
+ *       // No luck, let's try another task
+ *       continue;
+ *   // Release the arbiter mutex a bit from time to time
+ *   - mutex_unlock(&arbiter)
+ *
+ *
+ * at submission of task T:
+ *
+ * - mutex_lock(&arbiter)
+ * - for each handle h of T:
+ *   - lock(h)
+ *   - try to take a reference on h, goto fail on failure
+ *   - unlock(h)
+ * // Success!
+ * - mutex_unlock(&arbiter);
+ * - return 0;
+ *
+ * fail:
+ * // couldn't take everything, abort and record task T
+ * // drop spurious references
+ * - for each handle h of T already taken:
+ *   - lock(h)
+ *   - release reference on h
+ *   - unlock(h)
+ * // record T on the list of requests for h
+ * TODO: record on only one handle
+ * - for each handle h of T:
+ *   - record T as waiting on h
+ * - mutex_unlock(&arbiter)
+ * - return 1;
+ */
+
+struct starpu_arbiter
+{
+#ifdef LOCK_OR_DELEGATE
+/* The list of task to perform */
+	struct LockOrDelegateListNode* dlTaskListHead;
+
+/* To protect the list of tasks */
+	struct _starpu_spinlock dlListLock;
+/* Whether somebody is working on the list */
+	int working;
+#else /* LOCK_OR_DELEGATE */
+	starpu_pthread_mutex_t mutex;
+#endif /* LOCK_OR_DELEGATE */
+};
+
+#ifdef LOCK_OR_DELEGATE
+
+/* In case of congestion, we don't want to needlessly wait for the arbiter lock
+ * while we can just delegate the work to the worker already managing some
+ * dependencies.
+ *
+ * So we push work on the dlTastListHead queue and only one worker will process
+ * the list.
+ */
+
+/* A LockOrDelegate task list */
+struct LockOrDelegateListNode
+{
+	void (*func)(void*);
+	void* data;
+	struct LockOrDelegateListNode* next;
+};
+
+/* Post a task to perfom if possible, otherwise put it in the list
+ * If we can perfom this task, we may also perfom all the tasks in the list
+ * This function return 1 if the task (and maybe some others) has been done
+ * by the calling thread and 0 otherwise (if the task has just been put in the list)
+ */
+static int _starpu_LockOrDelegatePostOrPerform(starpu_arbiter_t arbiter, void (*func)(void*), void* data)
+{
+	struct LockOrDelegateListNode* newNode = malloc(sizeof(*newNode)), *iter;
+	int did = 0;
+	STARPU_ASSERT(newNode);
+	newNode->data = data;
+	newNode->func = func;
+
+	_starpu_spin_lock(&arbiter->dlListLock);
+	if (arbiter->working)
+	{
+		/* Somebody working on it, insert the node */
+		newNode->next = arbiter->dlTaskListHead;
+		arbiter->dlTaskListHead = newNode;
+	}
+	else
+	{
+		/* Nobody working on the list, we'll work */
+		arbiter->working = 1;
+
+		/* work on what was pushed so far first */
+		iter = arbiter->dlTaskListHead;
+		arbiter->dlTaskListHead = NULL;
+		_starpu_spin_unlock(&arbiter->dlListLock);
+		while (iter != NULL)
+		{
+			(*iter->func)(iter->data);
+			free(iter);
+			iter = iter->next;
+		}
+
+		/* And then do our job */
+		(*func)(data);
+		free(newNode);
+		did = 1;
+
+		_starpu_spin_lock(&arbiter->dlListLock);
+		/* And finish working on anything that could have been pushed
+		 * in the meanwhile */
+		while (arbiter->dlTaskListHead != 0)
+		{
+			iter = arbiter->dlTaskListHead;
+			arbiter->dlTaskListHead = arbiter->dlTaskListHead->next;
+			_starpu_spin_unlock(&arbiter->dlListLock);
+
+			(*iter->func)(iter->data);
+			free(iter);
+			_starpu_spin_lock(&arbiter->dlListLock);
+		}
+
+		arbiter->working = 0;
+	}
+
+	_starpu_spin_unlock(&arbiter->dlListLock);
+	return did;
+}
+
+#endif
+
+/* Try to submit a data request, in case the request can be processed
+ * immediatly, return 0, if there is still a dependency that is not compatible
+ * with the current mode, the request is put in the per-handle list of
+ * "requesters", and this function returns 1. */
+#ifdef LOCK_OR_DELEGATE
+struct starpu_submit_arbitered_args
+{
+	unsigned request_from_codelet;
+	starpu_data_handle_t handle;
+	enum starpu_data_access_mode mode;
+	void (*callback)(void *);
+	void *argcb;
+	struct _starpu_job *j;
+	unsigned buffer_index;
+};
+static unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index);
+static void __starpu_attempt_to_submit_arbitered_data_request(void *inData)
+{
+	struct starpu_submit_arbitered_args* args = inData;
+	unsigned request_from_codelet = args->request_from_codelet;
+	starpu_data_handle_t handle = args->handle;
+	enum starpu_data_access_mode mode = args->mode;
+	void (*callback)(void*) = args->callback;
+	void *argcb = args->argcb;
+	struct _starpu_job *j = args->j;
+	unsigned buffer_index = args->buffer_index;
+	free(args);
+	if (!___starpu_attempt_to_submit_arbitered_data_request(request_from_codelet, handle, mode, callback, argcb, j, buffer_index))
+		/* Success, but we have no way to report it to original caller,
+		 * so call callback ourself */
+		callback(argcb);
+}
+
+unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index)
+{
+	struct starpu_submit_arbitered_args* args = malloc(sizeof(*args));
+	args->request_from_codelet = request_from_codelet;
+	args->handle = handle;
+	args->mode = mode;
+	args->callback = callback;
+	args->argcb = argcb;
+	args->j = j;
+	args->buffer_index = buffer_index;
+	/* The function will delete args */
+	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_attempt_to_submit_arbitered_data_request, args);
+	return 1;
+}
+
+unsigned ___starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index)
+{
+	STARPU_ASSERT(handle->arbiter);
+#else // LOCK_OR_DELEGATE
+unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index)
+{
+	starpu_arbiter_t arbiter = handle->arbiter;
+	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
+#endif // LOCK_OR_DELEGATE
+
+	if (mode == STARPU_RW)
+		mode = STARPU_W;
+
+	STARPU_ASSERT_MSG(!(mode & STARPU_REDUX), "REDUX with arbiter is not implemented\n");
+
+	/* Take the lock protecting the header. We try to do some progression
+	 * in case this is called from a worker, otherwise we just wait for the
+	 * lock to be available. */
+	if (request_from_codelet)
+	{
+		int cpt = 0;
+		while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
+		{
+			cpt++;
+			_starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
+		}
+		if (cpt == STARPU_SPIN_MAXTRY)
+			_starpu_spin_lock(&handle->header_lock);
+	}
+	else
+	{
+		_starpu_spin_lock(&handle->header_lock);
+	}
+
+	/* If there is currently nobody accessing the piece of data, or it's
+	 * not another writter and if this is the same type of access as the
+	 * current one, we can proceed. */
+	unsigned put_in_list;
+
+	if (handle->refcnt)
+	{
+		/* there cannot be multiple writers or a new writer
+		 * while the data is in read mode */
+
+		handle->busy_count++;
+		/* enqueue the request */
+		struct _starpu_data_requester *r = _starpu_data_requester_new();
+		r->mode = mode;
+		r->is_requested_by_codelet = request_from_codelet;
+		r->j = j;
+		r->buffer_index = buffer_index;
+		r->ready_data_callback = callback;
+		r->argcb = argcb;
+
+		_starpu_data_requester_list_push_back(handle->arbitered_req_list, r);
+
+		/* failed */
+		put_in_list = 1;
+	}
+	else
+	{
+		handle->refcnt++;
+		handle->busy_count++;
+
+		/* Do not write to handle->current_mode if it is already
+		 * R. This avoids a spurious warning from helgrind when
+		 * the following happens:
+		 * acquire(R) in thread A
+		 * acquire(R) in thread B
+		 * release_data_on_node() in thread A
+		 * helgrind would shout that the latter reads current_mode
+		 * unsafely.
+		 *
+		 * This actually basically explains helgrind that it is a
+		 * shared R acquisition.
+		 */
+		if (mode != STARPU_R || handle->current_mode != mode)
+			handle->current_mode = mode;
+
+		/* success */
+		put_in_list = 0;
+	}
+
+	_starpu_spin_unlock(&handle->header_lock);
+#ifndef LOCK_OR_DELEGATE
+	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif // LOCK_OR_DELEGATE
+	return put_in_list;
+
+}
+
+
+
+/* This function find a node that contains the parameter j as job and remove it from the list
+ * the function return 0 if a node was found and deleted, 1 otherwise
+ */
+static unsigned remove_job_from_requester_list(struct _starpu_data_requester_list* req_list, struct _starpu_job * j)
+{
+	struct _starpu_data_requester * iter = _starpu_data_requester_list_begin(req_list);//_head;
+	while (iter != _starpu_data_requester_list_end(req_list) && iter->j != j)
+	{
+		iter = _starpu_data_requester_list_next(iter); // iter = iter->_next;
+	}
+	if (iter)
+	{
+		_starpu_data_requester_list_erase(req_list, iter);
+		return 0;
+	}
+	return 1;
+}
+
+#ifdef LOCK_OR_DELEGATE
+/* These are the arguments passed to _submit_job_enforce_arbitered_deps */
+struct starpu_enforce_arbitered_args
+{
+	struct _starpu_job *j;
+	unsigned buf;
+	unsigned nbuffers;
+};
+
+static void ___starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);
+static void __starpu_submit_job_enforce_arbitered_deps(void* inData)
+{
+	struct starpu_enforce_arbitered_args* args = inData;
+	struct _starpu_job *j = args->j;
+	unsigned buf		  = args->buf;
+	unsigned nbuffers	 = args->nbuffers;
+	/* we are in charge of freeing the args */
+	free(args);
+	___starpu_submit_job_enforce_arbitered_deps(j, buf, nbuffers);
+}
+
+void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
+{
+	struct starpu_enforce_arbitered_args* args = malloc(sizeof(*args));
+	starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
+	args->j = j;
+	args->buf = buf;
+	args->nbuffers = nbuffers;
+	/* The function will delete args */
+	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_submit_job_enforce_arbitered_deps, args);
+}
+
+static void ___starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
+{
+	starpu_arbiter_t arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf)->arbiter;
+#else // LOCK_OR_DELEGATE
+void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers)
+{
+	starpu_arbiter_t arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf)->arbiter;
+	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
+#endif
+	STARPU_ASSERT(arbiter);
+
+	const unsigned nb_non_arbitered_buff = buf;
+	unsigned idx_buf_arbiter;
+	unsigned all_arbiter_available = 1;
+
+
+	for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
+	{
+		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
+		enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
+
+		if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle))
+			/* We have already requested this data, skip it. This
+			 * depends on ordering putting writes before reads, see
+			 * _starpu_compar_handles.  */
+			continue;
+
+		if (handle->arbiter != arbiter)
+		{
+			/* another arbiter */
+			break;
+		}
+
+		/* we post all arbiter  */
+		_starpu_spin_lock(&handle->header_lock);
+		if (handle->refcnt == 0)
+		{
+			handle->refcnt += 1;
+			handle->busy_count += 1;
+			handle->current_mode = mode;
+			_starpu_spin_unlock(&handle->header_lock);
+		}
+		else
+		{
+			/* stop if an handle do not have a refcnt == 0 */
+			_starpu_spin_unlock(&handle->header_lock);
+			all_arbiter_available = 0;
+			break;
+		}
+	}
+	if (all_arbiter_available == 0)
+	{
+		/* Oups cancel all taken and put req in arbiter list */
+		unsigned idx_buf_cancel;
+		for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
+		{
+			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+
+			if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==cancel_handle))
+				continue;
+			if (cancel_handle->arbiter != arbiter)
+				/* Will have to process another arbiter, will do that later */
+				break;
+
+			_starpu_spin_lock(&cancel_handle->header_lock);
+			/* reset the counter because finally we do not take the data */
+			STARPU_ASSERT(cancel_handle->refcnt == 1);
+			cancel_handle->refcnt -= 1;
+			_starpu_spin_unlock(&cancel_handle->header_lock);
+		}
+
+		for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < nbuffers ; idx_buf_cancel++)
+		{
+			starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+			enum starpu_data_access_mode cancel_mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_cancel);
+
+			if (cancel_handle->arbiter != arbiter)
+				break;
+
+			struct _starpu_data_requester *r = _starpu_data_requester_new();
+			r->mode = cancel_mode;
+			r->is_requested_by_codelet = 1;
+			r->j = j;
+			r->buffer_index = idx_buf_cancel;
+			r->ready_data_callback = NULL;
+			r->argcb = NULL;
+
+			_starpu_spin_lock(&cancel_handle->header_lock);
+			/* store node in list */
+			_starpu_data_requester_list_push_front(cancel_handle->arbitered_req_list, r);
+			/* inc the busy count if it has not been changed in the previous loop */
+			if (idx_buf_arbiter <= idx_buf_cancel)
+				cancel_handle->busy_count += 1;
+			_starpu_spin_unlock(&cancel_handle->header_lock);
+		}
+
+#ifndef LOCK_OR_DELEGATE
+		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif
+		return;
+	}
+#ifndef LOCK_OR_DELEGATE
+	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif
+
+	// all_arbiter_available is true
+	if (idx_buf_arbiter < nbuffers)
+		/* Other arbitered data, process them */
+		_starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
+	else
+		/* Finished with all data, can eventually push! */
+		_starpu_push_task(j);
+}
+
+#ifdef LOCK_OR_DELEGATE
+void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle);
+void __starpu_notify_arbitered_dependencies(void* inData)
+{
+	starpu_data_handle_t handle = inData;
+	___starpu_notify_arbitered_dependencies(handle);
+}
+void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
+{
+	_starpu_LockOrDelegatePostOrPerform(handle->arbiter, &__starpu_notify_arbitered_dependencies, handle);
+}
+void ___starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
+#else // LOCK_OR_DELEGATE
+void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle)
+#endif
+{
+	starpu_arbiter_t arbiter = handle->arbiter;
+#ifndef LOCK_OR_DELEGATE
+	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
+#endif
+
+	/* Since the request has been posted the handle may have been proceed and released */
+	if (_starpu_data_requester_list_empty(handle->arbitered_req_list))
+	{
+#ifndef LOCK_OR_DELEGATE
+		STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif
+		return;
+	}
+	/* no one has the right to work on arbitered_req_list without a lock on mutex
+	   so we do not need to lock the handle for safety */
+	struct _starpu_data_requester *r;
+	for (r = _starpu_data_requester_list_begin(handle->arbitered_req_list);
+	     r != _starpu_data_requester_list_end(handle->arbitered_req_list);
+	     r = _starpu_data_requester_list_next(r))
+	{
+		if (!r->is_requested_by_codelet)
+		{
+			/* data_acquire_cb, process it */
+			enum starpu_data_access_mode r_mode = r->mode;
+			if (r_mode == STARPU_RW)
+				r_mode = STARPU_W;
+
+			_starpu_spin_lock(&handle->header_lock);
+			handle->refcnt++;
+			handle->busy_count++;
+			handle->current_mode = r_mode;
+			_starpu_spin_unlock(&handle->header_lock);
+			_starpu_data_requester_list_erase(handle->arbitered_req_list, r);
+#ifndef LOCK_OR_DELEGATE
+			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif
+			r->ready_data_callback(r->argcb);
+			_starpu_data_requester_delete(r);
+
+			_starpu_spin_lock(&handle->header_lock);
+			STARPU_ASSERT(handle->busy_count > 0);
+			handle->busy_count--;
+			if (!_starpu_data_check_not_busy(handle))
+				_starpu_spin_unlock(&handle->header_lock);
+			return;
+		}
+
+		struct _starpu_job* j = r->j;
+		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
+		unsigned nb_non_arbitered_buff;
+		/* find the position of arbiter buffers */
+		for (nb_non_arbitered_buff = 0; nb_non_arbitered_buff < nbuffers; nb_non_arbitered_buff++)
+		{
+			starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff);
+			if (nb_non_arbitered_buff && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, nb_non_arbitered_buff-1) == handle_arbiter))
+				/* We have already requested this data, skip it. This
+				 * depends on ordering putting writes before reads, see
+				 * _starpu_compar_handles.  */
+				continue;
+			if (handle_arbiter->arbiter == arbiter)
+			{
+				break;
+			}
+		}
+
+		unsigned idx_buf_arbiter;
+		unsigned all_arbiter_available = 1;
+
+		for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
+		{
+			starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
+			if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle_arbiter))
+				/* We have already requested this data, skip it. This
+				 * depends on ordering putting writes before reads, see
+				 * _starpu_compar_handles.  */
+				continue;
+			if (handle_arbiter->arbiter != arbiter)
+				/* Will have to process another arbiter, will do that later */
+				break;
+
+			/* we post all arbiter  */
+			enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
+
+			_starpu_spin_lock(&handle_arbiter->header_lock);
+			if (handle_arbiter->refcnt != 0)
+			{
+				/* handle is not available, record ourself */
+				_starpu_spin_unlock(&handle_arbiter->header_lock);
+				all_arbiter_available = 0;
+				break;
+			}
+			/* mark the handle as taken */
+			handle_arbiter->refcnt += 1;
+			handle_arbiter->current_mode = mode;
+			_starpu_spin_unlock(&handle_arbiter->header_lock);
+		}
+
+		if (all_arbiter_available)
+		{
+			for (idx_buf_arbiter = nb_non_arbitered_buff; idx_buf_arbiter < nbuffers; idx_buf_arbiter++)
+			{
+				starpu_data_handle_t handle_arbiter = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter);
+				if (idx_buf_arbiter && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_arbiter-1)==handle_arbiter))
+					continue;
+				if (handle_arbiter->arbiter != arbiter)
+					break;
+
+				/* we post all arbiter  */
+				enum starpu_data_access_mode mode = _STARPU_JOB_GET_ORDERED_BUFFER_MODE(j, idx_buf_arbiter);
+
+				_starpu_spin_lock(&handle_arbiter->header_lock);
+				STARPU_ASSERT(handle_arbiter->refcnt == 1);
+				STARPU_ASSERT( handle_arbiter->busy_count >= 1);
+				STARPU_ASSERT( handle_arbiter->current_mode == mode);
+				const unsigned correctly_deleted = remove_job_from_requester_list(handle_arbiter->arbitered_req_list, j);
+				STARPU_ASSERT(correctly_deleted == 0);
+				_starpu_spin_unlock(&handle_arbiter->header_lock);
+			}
+			/* Remove and delete list node */
+			_starpu_data_requester_delete(r);
+
+			/* release global mutex */
+#ifndef LOCK_OR_DELEGATE
+			STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif
+
+			if (idx_buf_arbiter < nbuffers)
+				/* Other arbitered data, process them */
+				_starpu_submit_job_enforce_arbitered_deps(j, idx_buf_arbiter, nbuffers);
+			else
+				/* Finished with all data, can eventually push! */
+				_starpu_push_task(j);
+
+			return;
+		}
+		else
+		{
+			unsigned idx_buf_cancel;
+			/* all handles are not available - revert the mark */
+			for (idx_buf_cancel = nb_non_arbitered_buff; idx_buf_cancel < idx_buf_arbiter ; idx_buf_cancel++)
+			{
+				starpu_data_handle_t cancel_handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel);
+				if (idx_buf_cancel && (_STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, idx_buf_cancel-1)==cancel_handle))
+					continue;
+				if (cancel_handle->arbiter != arbiter)
+					break;
+				_starpu_spin_lock(&cancel_handle->header_lock);
+				STARPU_ASSERT(cancel_handle->refcnt == 1);
+				cancel_handle->refcnt -= 1;
+				_starpu_spin_unlock(&cancel_handle->header_lock);
+			}
+		}
+	}
+	/* no task has been pushed */
+#ifndef LOCK_OR_DELEGATE
+	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+#endif
+	return;
+}
+
+starpu_arbiter_t starpu_arbiter_create(void)
+{
+	starpu_arbiter_t res = malloc(sizeof(*res));
+
+#ifdef LOCK_OR_DELEGATE
+	res->dlTaskListHead = NULL;
+	_starpu_spin_init(&res->dlListLock);
+	res->working = 0;
+#else /* LOCK_OR_DELEGATE */
+	STARPU_PTHREAD_MUTEX_INIT(&res->mutex, NULL);
+#endif /* LOCK_OR_DELEGATE */
+
+	return res;
+}
+
+void starpu_data_assign_arbiter(starpu_data_handle_t handle, starpu_arbiter_t arbiter)
+{
+	if (handle->arbiter && handle->arbiter == _starpu_global_arbiter)
+		/* Just for testing purpose */
+		return;
+	STARPU_ASSERT_MSG(!handle->arbiter, "handle can only be assigned one arbiter");
+	STARPU_ASSERT_MSG(!handle->refcnt, "arbiter can be assigned to handle only right after initialization");
+	STARPU_ASSERT_MSG(!handle->busy_count, "arbiter can be assigned to handle only right after initialization");
+	handle->arbiter = arbiter;
+}
+
+void starpu_arbiter_destroy(starpu_arbiter_t arbiter)
+{
+#ifdef LOCK_OR_DELEGATE
+	_starpu_spin_lock(&arbiter->dlListLock);
+	STARPU_ASSERT(!arbiter->dlTaskListHead);
+	STARPU_ASSERT(!arbiter->working);
+	_starpu_spin_unlock(&arbiter->dlListLock);
+	_starpu_spin_destroy(&arbiter->dlListLock);
+#else /* LOCK_OR_DELEGATE */
+	STARPU_PTHREAD_MUTEX_LOCK(&arbiter->mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&arbiter->mutex);
+	STARPU_PTHREAD_MUTEX_DESTROY(&arbiter->mutex);
+#endif /* LOCK_OR_DELEGATE */
+	free(arbiter);
+}

+ 31 - 1
src/core/dependencies/data_concurrency.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  CNRS
+ * Copyright (C) 2015  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -49,6 +50,9 @@
  *
  * The same mechanism is used for application data aquisition
  * (starpu_data_acquire).
+ *
+ * For data with an arbiter, we have a second step, performed after this first
+ * step, implemented in data_arbiter_concurrency.c
  */
 
 /*
@@ -110,6 +114,9 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 						       void (*callback)(void *), void *argcb,
 						       struct _starpu_job *j, unsigned buffer_index)
 {
+	if (handle->arbiter)
+		return _starpu_attempt_to_submit_arbitered_data_request(request_from_codelet, handle, mode, callback, argcb, j, buffer_index);
+
 	if (mode == STARPU_RW)
 		mode = STARPU_W;
 
@@ -256,10 +263,10 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
 	for (buf = start_buffer_index; buf < nbuffers; buf++)
 	{
+		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
 		if (buf)
 		{
 			starpu_data_handle_t handle_m1 = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf-1);
-			starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
 			if (handle_m1 == handle)
 				/* We have already requested this data, skip it. This
 				 * depends on ordering putting writes before reads, see
@@ -268,6 +275,15 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 		}
 
                 j->task->status = STARPU_TASK_BLOCKED_ON_DATA;
+
+		if(handle->arbiter)
+		{
+			/* We arrived on an arbitered data, we stop and proceed
+			 * with the arbiter second step.  */
+			_starpu_submit_job_enforce_arbitered_deps(j, buf, nbuffers);
+			return 1;
+		}
+
                 if (attempt_to_submit_data_request_from_job(j, buf))
 		{
 			return 1;
@@ -350,6 +366,20 @@ int _starpu_notify_data_dependencies(starpu_data_handle_t handle)
 		/* Handle was destroyed, nothing left to do.  */
 		return 1;
 
+	if (handle->arbiter)
+	{
+		unsigned refcnt = handle->refcnt;
+		STARPU_ASSERT(_starpu_data_requester_list_empty(handle->req_list));
+		STARPU_ASSERT(_starpu_data_requester_list_empty(handle->reduction_req_list));
+		_starpu_spin_unlock(&handle->header_lock);
+		/* _starpu_notify_arbitered_dependencies will handle its own locking */
+		if (!refcnt)
+			_starpu_notify_arbitered_dependencies(handle);
+		/* We have already unlocked */
+		return 1;
+	}
+	STARPU_ASSERT(_starpu_data_requester_list_empty(handle->arbitered_req_list));
+
 	/* In case there is a pending reduction, and that this is the last
 	 * requester, we may go back to a "normal" coherency model. */
 	if (handle->reduction_refcnt > 0)

+ 8 - 0
src/core/dependencies/data_concurrency.h

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010, 2012, 2015  Université de Bordeaux
  * Copyright (C) 2010, 2011  CNRS
+ * Copyright (C) 2015  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -23,12 +24,19 @@
 void _starpu_job_set_ordered_buffers(struct _starpu_job *j);
 
 unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j);
+void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);
 
 int _starpu_notify_data_dependencies(starpu_data_handle_t handle);
+void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle);
 
 unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle_t handle,
 							  enum starpu_data_access_mode mode,
 							  void (*callback)(void *), void *argcb);
 
+unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       void (*callback)(void *), void *argcb,
+						       struct _starpu_job *j, unsigned buffer_index);
+
 #endif // __DATA_CONCURRENCY_H__
 

+ 5 - 1
src/datawizard/coherency.h

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2009-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
- * Copyright (C) 2014  INRIA
+ * Copyright (C) 2014-2015  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -234,6 +234,10 @@ struct _starpu_data_state
 
 	/* hook to be called when unregistering the data */
 	_starpu_data_handle_unregister_hook unregister_hook;
+
+	struct starpu_arbiter *arbiter;
+	/* This is protected by the arbiter mutex */
+	struct _starpu_data_requester_list *arbitered_req_list;
 };
 
 void _starpu_display_msi_stats(void);

+ 7 - 0
src/datawizard/filters.c

@@ -209,6 +209,13 @@ void starpu_data_partition(starpu_data_handle_t initial_handle, struct starpu_da
 		child->last_submitted_ghost_accessors_id = NULL;
 #endif
 
+		if (_starpu_global_arbiter)
+			/* Just for testing purpose */
+			starpu_data_assign_arbiter(child, _starpu_global_arbiter);
+		else
+			child->arbiter = NULL;
+		child->arbitered_req_list = _starpu_data_requester_list_new();
+
 		for (node = 0; node < STARPU_MAXNODES; node++)
 		{
 			struct _starpu_data_replicate *initial_replicate;

+ 13 - 0
src/datawizard/interfaces/data_interface.c

@@ -41,12 +41,17 @@ struct handle_entry
 static struct handle_entry *registered_handles;
 static struct _starpu_spinlock    registered_handles_lock;
 static int _data_interface_number = STARPU_MAX_INTERFACE_ID;
+starpu_arbiter_t _starpu_global_arbiter;
 
 static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned coherent, unsigned nowait);
 
 void _starpu_data_interface_init(void)
 {
 	_starpu_spin_init(&registered_handles_lock);
+
+	/* Just for testing purpose */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		_starpu_global_arbiter = starpu_arbiter_create();
 }
 
 void _starpu_data_interface_shutdown()
@@ -291,6 +296,13 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 
 	handle->home_node = home_node;
 
+	if (_starpu_global_arbiter)
+		/* Just for testing purpose */
+		starpu_data_assign_arbiter(handle, _starpu_global_arbiter);
+	else
+		handle->arbiter = NULL;
+	handle->arbitered_req_list = _starpu_data_requester_list_new();
+
 	/* that new data is invalid from all nodes perpective except for the
 	 * home node */
 	unsigned node;
@@ -792,6 +804,7 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 	_starpu_data_free_interfaces(handle);
 
 	_starpu_memory_stats_free(handle);
+	_starpu_data_requester_list_delete(handle->arbitered_req_list);
 	_starpu_data_requester_list_delete(handle->req_list);
 	_starpu_data_requester_list_delete(handle->reduction_req_list);
 

+ 2 - 1
src/datawizard/interfaces/data_interface.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2012, 2014  Université de Bordeaux
+ * Copyright (C) 2009-2012, 2014-2015  Université de Bordeaux
  * Copyright (C) 2010, 2012, 2013, 2014, 2015  CNRS
  * Copyright (C) 2014  INRIA
  *
@@ -57,6 +57,7 @@ void _starpu_data_free_interfaces(starpu_data_handle_t handle)
 extern
 int _starpu_data_handle_init(starpu_data_handle_t handle, struct starpu_data_interface_ops *interface_ops, unsigned int mf_node);
 
+struct starpu_arbiter *_starpu_global_arbiter;
 extern void _starpu_data_interface_init(void) STARPU_ATTRIBUTE_INTERNAL;
 extern int _starpu_data_check_not_busy(starpu_data_handle_t handle) STARPU_ATTRIBUTE_INTERNAL;
 extern void _starpu_data_interface_shutdown(void) STARPU_ATTRIBUTE_INTERNAL;

+ 13 - 1
src/datawizard/sort_data_handles.c

@@ -1,7 +1,8 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2011, 2014  Université de Bordeaux
+ * Copyright (C) 2010-2011, 2014-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2015  CNRS
+ * Copyright (C) 2015  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -93,6 +94,17 @@ static int _starpu_compar_handles(const struct _starpu_data_descr *descrA,
 			return 1;
 	}
 
+	/* Put arbitered accesses after non-arbitered */
+	if (dataA->arbiter && !(dataB->arbiter))
+		return 1;
+	if (dataB->arbiter && !(dataA->arbiter))
+		return -1;
+	if (dataA->arbiter != dataB->arbiter)
+		/* Both are arbitered, sort by arbiter pointer order */
+		return ((dataA->arbiter < dataB->arbiter)?-1:1);
+	/* If both are arbitered by the same arbiter (or they are both not
+	 * arbitered), we'll sort them by handle */
+
 	/* In case we have data/subdata from different trees */
 	if (dataA->root_handle != dataB->root_handle)
 		return ((dataA->root_handle < dataB->root_handle)?-1:1);

+ 4 - 0
tests/Makefile.am

@@ -212,6 +212,7 @@ noinst_PROGRAMS =				\
 	datawizard/readonly			\
 	datawizard/specific_node		\
 	datawizard/task_with_multiple_time_the_same_handle	\
+	datawizard/test_arbiter			\
 	disk/disk_copy				\
 	disk/disk_compute			\
 	disk/disk_pack				\
@@ -457,6 +458,9 @@ datawizard_specific_node_SOURCES +=			\
 	datawizard/opencl_codelet_unsigned_inc.c
 endif
 
+datawizard_test_arbiter_SOURCES =	\
+	datawizard/test_arbiter.cpp
+
 main_deprecated_func_CFLAGS = $(AM_CFLAGS) -Wno-deprecated-declarations
 
 main_subgraph_repeat_SOURCES =		\

+ 5 - 1
tests/datawizard/increment_redux.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2014  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -212,6 +212,10 @@ int main(int argc, char **argv)
 {
 	int ret;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return STARPU_TEST_SKIPPED;
+
 	ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");

+ 5 - 1
tests/datawizard/increment_redux_lazy.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2014  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -193,6 +193,10 @@ int main(int argc, char **argv)
 	int ret;
 	unsigned *var;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return STARPU_TEST_SKIPPED;
+
 	ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");

+ 5 - 1
tests/datawizard/increment_redux_v2.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011-2014  Université de Bordeaux
+ * Copyright (C) 2011-2015  Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -227,6 +227,10 @@ int main(int argc, char **argv)
 {
 	int ret;
 
+	/* Not supported yet */
+	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
+		return STARPU_TEST_SKIPPED;
+
 	ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");

+ 213 - 0
tests/datawizard/test_arbiter.cpp

@@ -0,0 +1,213 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * for i from 0 to nbA
+ *  insert task handles[i] in STARPU_RW|STARPU_COMMUTE
+ *  for j from 0 to nbA
+ *	  if i != j
+			 insert task handles[i] in STARPU_RW|STARPU_COMMUTE, and handles[j] in STARPU_RW|STARPU_COMMUTE
+ */
+
+
+// @FUSE_STARPU
+
+
+#ifdef STARPU_QUICK_CHECK
+#define SLEEP_SLOW 6000
+#define SLEEP_FAST 1000
+#else
+#define SLEEP_SLOW 600000
+#define SLEEP_FAST 100000
+#endif
+
+#include <starpu.h>
+#include "../helper.h"
+
+#include <vector>
+#include <unistd.h>
+
+static unsigned nb, nb_slow;
+
+void callback(void * /*buffers*/[], void * /*cl_arg*/)
+{
+	unsigned val;
+	val = STARPU_ATOMIC_ADD(&nb, 1);
+	FPRINTF(stdout,"callback in (%d)\n", val); fflush(stdout);
+	usleep(SLEEP_FAST);
+	val = STARPU_ATOMIC_ADD(&nb, -1);
+	FPRINTF(stdout,"callback out (%d)\n", val); fflush(stdout);
+}
+
+void callback_slow(void * /*buffers*/[], void * /*cl_arg*/)
+{
+	unsigned val;
+	val = STARPU_ATOMIC_ADD(&nb_slow, 1);
+	FPRINTF(stdout,"callback_slow in (%d)\n", val); fflush(stdout);
+	usleep(SLEEP_SLOW);
+	val = STARPU_ATOMIC_ADD(&nb_slow, -1);
+	FPRINTF(stdout,"callback_slow out (%d)\n", val); fflush(stdout);
+}
+
+
+int main(int /*argc*/, char** /*argv*/)
+{
+	int ret;
+	struct starpu_conf conf;
+	starpu_arbiter_t arbiter, arbiter2;
+	ret = starpu_conf_init(&conf);
+	STARPU_ASSERT(ret == 0);
+	//conf.ncpus = 1;//// 4
+	ret = starpu_init(&conf);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_ASSERT(ret == 0);
+
+	FPRINTF(stdout, "Max Thread %d\n", starpu_worker_get_count());
+
+	//////////////////////////////////////////////////////
+
+	starpu_codelet normalCodelete;
+	{
+		memset(&normalCodelete, 0, sizeof(normalCodelete));
+		normalCodelete.where = STARPU_CPU;
+		normalCodelete.cpu_funcs[0] = callback;
+		normalCodelete.nbuffers = 2;
+		normalCodelete.modes[0] = starpu_data_access_mode(STARPU_RW|STARPU_COMMUTE);
+		normalCodelete.modes[1] = starpu_data_access_mode(STARPU_RW|STARPU_COMMUTE);
+		normalCodelete.name = "normalCodelete";
+	}
+	starpu_codelet slowCodelete;
+	{
+		memset(&slowCodelete, 0, sizeof(slowCodelete));
+		slowCodelete.where = STARPU_CPU;
+		slowCodelete.cpu_funcs[0] = callback_slow;
+		slowCodelete.nbuffers = 1;
+		slowCodelete.modes[0] = starpu_data_access_mode (STARPU_RW|STARPU_COMMUTE);
+		slowCodelete.name = "slowCodelete";
+	}
+
+	//////////////////////////////////////////////////////
+	//////////////////////////////////////////////////////
+
+	///const int nbA = 3;
+	const int nbA = 10;
+	FPRINTF(stdout, "Nb A = %d\n", nbA);
+
+	std::vector<starpu_data_handle_t> handleA(nbA);
+	std::vector<int> dataA(nbA);
+	arbiter = starpu_arbiter_create();
+	arbiter2 = starpu_arbiter_create();
+	for(int idx = 0 ; idx < nbA ; ++idx)
+	{
+		dataA[idx] = idx;
+	}
+	for(int idxHandle = 0 ; idxHandle < nbA ; ++idxHandle)
+	{
+		starpu_variable_data_register(&handleA[idxHandle], 0, (uintptr_t)&dataA[idxHandle], sizeof(dataA[idxHandle]));
+		starpu_data_assign_arbiter(handleA[idxHandle], arbiter);
+	}
+
+	//////////////////////////////////////////////////////
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Submit tasks\n");
+
+	for(int idxHandleA1 = 0 ; idxHandleA1 < nbA ; ++idxHandleA1)
+	{
+		ret = starpu_task_insert(&slowCodelete,
+				(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA1],
+				0);
+		if (ret == -ENODEV) goto out;
+		for(int idxHandleA2 = 0 ; idxHandleA2 < nbA ; ++idxHandleA2)
+		{
+			if(idxHandleA1 != idxHandleA2)
+			{
+				ret = starpu_task_insert(&normalCodelete,
+						(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA1],
+						(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA2],
+						0);
+				if (ret == -ENODEV) goto out;
+			}
+		}
+	}
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Wait task\n");
+
+	starpu_task_wait_for_all();
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Release data\n");
+
+	for(int idxHandle = 0 ; idxHandle < nbA ; ++idxHandle)
+	{
+		starpu_data_unregister(handleA[idxHandle]);
+	}
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Proceed gain, with several arbiters\n");
+
+	for(int idxHandle = 0 ; idxHandle < nbA ; ++idxHandle)
+	{
+		starpu_variable_data_register(&handleA[idxHandle], 0, (uintptr_t)&dataA[idxHandle], sizeof(dataA[idxHandle]));
+		starpu_data_assign_arbiter(handleA[idxHandle], idxHandle%2?arbiter:arbiter2);
+	}
+
+	//////////////////////////////////////////////////////
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Submit tasks\n");
+
+	for(int idxHandleA1 = 0 ; idxHandleA1 < nbA ; ++idxHandleA1)
+	{
+		ret = starpu_task_insert(&slowCodelete,
+				(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA1],
+				0);
+		if (ret == -ENODEV) goto out;
+		for(int idxHandleA2 = 0 ; idxHandleA2 < nbA ; ++idxHandleA2)
+		{
+			if(idxHandleA1 != idxHandleA2)
+			{
+				ret = starpu_task_insert(&normalCodelete,
+						(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA1],
+						(STARPU_RW|STARPU_COMMUTE), handleA[idxHandleA2],
+						0);
+				if (ret == -ENODEV) goto out;
+			}
+		}
+	}
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Wait task\n");
+
+out:
+	starpu_task_wait_for_all();
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Release data\n");
+
+	for(int idxHandle = 0 ; idxHandle < nbA ; ++idxHandle)
+	{
+		starpu_data_unregister(handleA[idxHandle]);
+	}
+	starpu_arbiter_destroy(arbiter);
+	starpu_arbiter_destroy(arbiter2);
+
+	//////////////////////////////////////////////////////
+	FPRINTF(stdout,"Shutdown\n");
+
+	starpu_shutdown();
+
+	return 0;
+}

+ 9 - 0
tools/gdbinit

@@ -311,6 +311,15 @@ define starpu-print-data
     starpu-print-job $requesterlist->j
     set $requesterlist = $requesterlist->_next
   end
+  printf "Arbitered requester tasks\n"
+  set $requesterlist = $data->arbitered_req_list._head
+  while $requesterlist != 0x0
+    printf "mode: "
+    starpu-print-mode $requesterlist->mode
+    printf "\n"
+    starpu-print-job $requesterlist->j
+    set $requesterlist = $requesterlist->_next
+  end
   if ($data->nchildren)
     printf "%d children\n", $data->nchildren
   end