Sfoglia il codice sorgente

Add rbtree and rbtree+list implementation. Use it to introduce priorities in data requests, which default to the task priority or workerorder

Samuel Thibault 9 anni fa
parent
commit
a5f7d96fea

+ 1 - 0
ChangeLog

@@ -21,6 +21,7 @@ StarPU 1.3.0 (svn revision xxxx)
 New features:
   * Enable anticipative writeback by default.
   * New scheduler with heterogeneous priorities
+  * Support priorities for data transfers.
 
 Changes:
   * Vastly improve simgrid simulation time.

+ 2 - 0
include/starpu_data.h

@@ -95,7 +95,9 @@ int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node);
 
 int starpu_data_fetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async);
 int starpu_data_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async);
+int starpu_data_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio);
 int starpu_data_idle_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async);
+int starpu_data_idle_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio);
 
 void starpu_data_wont_use(starpu_data_handle_t handle);
 

+ 2 - 0
include/starpu_scheduler.h

@@ -70,7 +70,9 @@ int starpu_combined_worker_get_description(int workerid, int *worker_size, int *
 int starpu_combined_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
 
 int starpu_get_prefetch_flag(void);
+int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio);
 int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node);
+int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio);
 int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node);
 
 uint32_t starpu_task_footprint(struct starpu_perfmodel *model, struct starpu_task *task, struct starpu_perfmodel_arch *arch, unsigned nimpl);

+ 4 - 0
src/Makefile.am

@@ -117,6 +117,9 @@ noinst_HEADERS = 						\
 	common/barrier.h					\
 	common/uthash.h						\
 	common/barrier_counter.h				\
+	common/rbtree.h						\
+	common/rbtree_i.h					\
+	common/prio_list.h					\
 	drivers/driver_common/driver_common.h			\
 	drivers/mp_common/mp_common.h				\
 	drivers/mp_common/source_common.h			\
@@ -157,6 +160,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 		\
 	common/fxt.c						\
 	common/utils.c						\
 	common/thread.c						\
+	common/rbtree.c						\
 	core/jobs.c						\
 	core/task.c						\
 	core/task_bundle.c					\

+ 206 - 0
src/common/prio_list.h

@@ -0,0 +1,206 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * This implements list with priorities (as an int), by using two stages:
+ * - an RB tree stage sorted by priority, whose leafs are...
+ * - ... double-linked lists sorted by insertion order.
+ *
+ * We always keep the 0-priority list allocated, to avoid keeping
+ * allocating/deallocating it when all priorities are 0.
+ *
+ * PRIO_LIST_TYPE(FOO, priority field)
+ * - Declares the following type:
+ *   + priority list: struct FOO_prio_list
+ * - Declares the following inlines:
+ *   * Initialize a new priority list
+ * void FOO_prio_list_init(struct FOO_prio_list*)
+ *   * Add a new element at the end of the list of the priority of the element
+ * void FOO_prio_list_push_back(struct FOO_prio_list*, struct FOO*)
+ *   * Add a new element at the beginning of the list of the priority of the element
+ * void FOO_prio_list_push_back(struct FOO_prio_list*, struct FOO*)
+ *   * Test that the priority list is empty
+ * void FOO_prio_list_empty(struct FOO_prio_list*)
+ *   * Erase element from the priority list
+ * void FOO_prio_list_empty(struct FOO_prio_list*, struct FOO*)
+ *   * Return and erase the first element of the priority list
+ * void FOO_prio_list_pop_front(struct FOO_prio_list*)
+ *   * Catenate second priority list at ends of the first priority list
+ * void FOO_prio_list_push_prio_list_back(struct FOO_prio_list*, struct FOO_prio_list*)
+ *   * Test whether element is part of the list
+ * void FOO_prio_list_ismember(struct FOO_prio_list*, struct FOO*)
+ */
+
+#ifndef __PRIO_LIST_H__
+#define __PRIO_LIST_H__
+
+#include <common/rbtree.h>
+
+#define PRIO_LIST_TYPE(ENAME, PRIOFIELD) \
+	PRIO_LIST_CREATE_TYPE(ENAME, PRIOFIELD)
+
+#define PRIO_LIST_CREATE_TYPE(ENAME, PRIOFIELD) \
+	/* The main type: an RB binary tree */ \
+	struct ENAME##_prio_list { \
+		struct starpu_rbtree tree; \
+	}; \
+	/* The second stage: a list */ \
+	struct ENAME##_prio_list_stage { \
+		struct starpu_rbtree_node node; /* Keep this first so ENAME##_node_to_list_stage can work.  */ \
+		int prio; \
+		struct ENAME##_list list; \
+	}; \
+	static inline struct ENAME##_prio_list_stage *ENAME##_node_to_list_stage(struct starpu_rbtree_node *node) \
+	{ \
+		/* This assumes node is first member of stage */ \
+		return (struct ENAME##_prio_list_stage *) node; \
+	} \
+	static inline void ENAME##_prio_list_init(struct ENAME##_prio_list *priolist) \
+	{ \
+		starpu_rbtree_init(&priolist->tree); \
+	} \
+	static inline void ENAME##_prio_list_deinit(struct ENAME##_prio_list *priolist) \
+	{ \
+		if (starpu_rbtree_empty(&priolist->tree)) \
+			return; \
+		struct starpu_rbtree_node *root = priolist->tree.root; \
+		struct ENAME##_prio_list_stage *stage = ENAME##_node_to_list_stage(root); \
+		assert(ENAME##_list_empty(&stage->list)); \
+		assert(!root->children[0] && !root->children[1]); \
+		starpu_rbtree_remove(&priolist->tree, root); \
+		free(stage); \
+	} \
+	static inline int ENAME##_prio_list_cmp_fn(int prio, struct starpu_rbtree_node *node) \
+	{ \
+		/* Sort by decreasing order */ \
+		struct ENAME##_prio_list_stage *e2 = ENAME##_node_to_list_stage(node); \
+		return (e2->PRIOFIELD - prio); \
+	} \
+	static inline struct ENAME##_prio_list_stage *ENAME##_prio_list_add(struct ENAME##_prio_list *priolist, int prio) \
+	{ \
+		unsigned long slot; \
+		struct starpu_rbtree_node *node; \
+		struct ENAME##_prio_list_stage *stage; \
+		node = starpu_rbtree_lookup_slot(&priolist->tree, prio, ENAME##_prio_list_cmp_fn, slot); \
+		if (node) \
+			stage = ENAME##_node_to_list_stage(node); \
+		else { \
+			stage = malloc(sizeof(*stage)); \
+			starpu_rbtree_node_init(&stage->node); \
+			stage->prio = prio; \
+			_starpu_data_request_list_init(&stage->list); \
+			starpu_rbtree_insert_slot(&priolist->tree, slot, &stage->node); \
+		} \
+		return stage; \
+	} \
+	static inline void ENAME##_prio_list_push_back(struct ENAME##_prio_list *priolist, struct ENAME *e) \
+	{ \
+		struct ENAME##_prio_list_stage *stage = ENAME##_prio_list_add(priolist, e->PRIOFIELD); \
+		ENAME##_list_push_back(&stage->list, e); \
+	} \
+	static inline void ENAME##_prio_list_push_front(struct ENAME##_prio_list *priolist, struct ENAME *e) \
+	{ \
+		struct ENAME##_prio_list_stage *stage = ENAME##_prio_list_add(priolist, e->PRIOFIELD); \
+		ENAME##_list_push_front(&stage->list, e); \
+	} \
+	static inline int ENAME##_prio_list_empty(struct ENAME##_prio_list *priolist) \
+	{ \
+		if (starpu_rbtree_empty(&priolist->tree)) \
+			return 1; \
+		struct starpu_rbtree_node *root = priolist->tree.root; \
+		struct ENAME##_prio_list_stage *stage = ENAME##_node_to_list_stage(root); \
+		if (ENAME##_list_empty(&stage->list) && !root->children[0] && !root->children[1]) \
+			/* Just one empty list */ \
+			return 1; \
+		return 0; \
+	} \
+	static inline void ENAME##_prio_list_erase(struct ENAME##_prio_list *priolist, struct ENAME *e) \
+	{ \
+		struct starpu_rbtree_node *node = starpu_rbtree_lookup(&priolist->tree, e->PRIOFIELD, ENAME##_prio_list_cmp_fn); \
+		struct ENAME##_prio_list_stage *stage = ENAME##_node_to_list_stage(node); \
+		ENAME##_list_erase(&stage->list, e); \
+		if (ENAME##_list_empty(&stage->list) && stage->prio != 0) \
+		{ \
+			/* stage got empty, remove it */ \
+			starpu_rbtree_remove(&priolist->tree, node); \
+			free(stage); \
+		} \
+	} \
+	static inline struct ENAME *ENAME##_prio_list_pop_front(struct ENAME##_prio_list *priolist) \
+	{ \
+		struct starpu_rbtree_node *node; \
+		struct ENAME##_prio_list_stage *stage; \
+		struct ENAME *ret; \
+		node = starpu_rbtree_first(&priolist->tree); \
+		while(1) { \
+			struct starpu_rbtree_node *next; \
+			if (!node) \
+				/* Tree is empty */ \
+				return NULL; \
+			stage = ENAME##_node_to_list_stage(node); \
+			if (!ENAME##_list_empty(&stage->list)) \
+				break; \
+			/* Empty list, skip to next tree entry */ \
+			next = starpu_rbtree_next(node); \
+			/* drop it if not 0-prio */ \
+			if (stage->prio != 0) \
+			{ \
+				starpu_rbtree_remove(&priolist->tree, node); \
+				free(stage); \
+			} \
+			node = next; \
+		} \
+		ret = ENAME##_list_pop_front(&stage->list); \
+		if (ENAME##_list_empty(&stage->list) && stage->prio != 0) \
+		{ \
+			/* stage got empty, remove it */ \
+			starpu_rbtree_remove(&priolist->tree, node); \
+			free(stage); \
+		} \
+		return ret; \
+	} \
+	static inline void ENAME##_prio_list_push_prio_list_back(struct ENAME##_prio_list *priolist, struct ENAME##_prio_list *priolist_toadd) \
+	{ \
+		struct starpu_rbtree_node *node_toadd, *tmp; \
+		starpu_rbtree_for_each_remove(&priolist_toadd->tree, node_toadd, tmp) { \
+			struct ENAME##_prio_list_stage *stage_toadd = ENAME##_node_to_list_stage(node_toadd); \
+			unsigned long slot; \
+			struct starpu_rbtree_node *node = starpu_rbtree_lookup_slot(&priolist->tree, stage_toadd->prio, ENAME##_prio_list_cmp_fn, slot); \
+			if (node) \
+			{ \
+				/* Catenate the lists */ \
+				struct ENAME##_prio_list_stage *stage = ENAME##_node_to_list_stage(node); \
+				ENAME##_list_push_list_back(&stage->list, &stage_toadd->list); \
+				free(node_toadd); \
+			} \
+			else \
+			{ \
+				/* Just move the node between the trees */ \
+				starpu_rbtree_insert_slot(&priolist->tree, slot, node_toadd); \
+			} \
+		} \
+	} \
+	static inline int ENAME##_prio_list_ismember(struct ENAME##_prio_list *priolist, struct ENAME *e) \
+	{ \
+		struct starpu_rbtree_node *node = starpu_rbtree_lookup(&priolist->tree, e->PRIOFIELD, ENAME##_prio_list_cmp_fn); \
+		if (node) { \
+			struct ENAME##_prio_list_stage *stage = ENAME##_node_to_list_stage(node); \
+			return ENAME##_list_ismember(&stage->list, e); \
+		} \
+		return 0; \
+	}
+
+#endif // __PRIO_LIST_H__

+ 482 - 0
src/common/rbtree.c

@@ -0,0 +1,482 @@
+/*
+ * Copyright (c) 2010, 2012 Richard Braun.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <common/rbtree.h>
+#include <common/rbtree_i.h>
+#include <sys/types.h>
+
+#define unlikely(expr) __builtin_expect(!!(expr), 0)
+
+/*
+ * Return the index of a node in the children array of its parent.
+ *
+ * The parent parameter must not be null, and must be the parent of the
+ * given node.
+ */
+static inline int starpu_rbtree_index(const struct starpu_rbtree_node *node,
+                               const struct starpu_rbtree_node *parent)
+{
+    assert(parent != NULL);
+    assert((node == NULL) || (starpu_rbtree_parent(node) == parent));
+
+    if (parent->children[STARPU_RBTREE_LEFT] == node)
+        return STARPU_RBTREE_LEFT;
+
+    assert(parent->children[STARPU_RBTREE_RIGHT] == node);
+
+    return STARPU_RBTREE_RIGHT;
+}
+
+/*
+ * Return the color of a node.
+ */
+static inline int starpu_rbtree_color(const struct starpu_rbtree_node *node)
+{
+    return node->parent & STARPU_RBTREE_COLOR_MASK;
+}
+
+/*
+ * Return true if the node is red.
+ */
+static inline int starpu_rbtree_is_red(const struct starpu_rbtree_node *node)
+{
+    return starpu_rbtree_color(node) == STARPU_RBTREE_COLOR_RED;
+}
+
+/*
+ * Return true if the node is black.
+ */
+static inline int starpu_rbtree_is_black(const struct starpu_rbtree_node *node)
+{
+    return starpu_rbtree_color(node) == STARPU_RBTREE_COLOR_BLACK;
+}
+
+/*
+ * Set the parent of a node, retaining its current color.
+ */
+static inline void starpu_rbtree_set_parent(struct starpu_rbtree_node *node,
+                                     struct starpu_rbtree_node *parent)
+{
+    assert(starpu_rbtree_check_alignment(node));
+    assert(starpu_rbtree_check_alignment(parent));
+
+    node->parent = (unsigned long)parent | (node->parent & STARPU_RBTREE_COLOR_MASK);
+}
+
+/*
+ * Set the color of a node, retaining its current parent.
+ */
+static inline void starpu_rbtree_set_color(struct starpu_rbtree_node *node, int color)
+{
+    assert((color & ~STARPU_RBTREE_COLOR_MASK) == 0);
+    node->parent = (node->parent & STARPU_RBTREE_PARENT_MASK) | color;
+}
+
+/*
+ * Set the color of a node to red, retaining its current parent.
+ */
+static inline void starpu_rbtree_set_red(struct starpu_rbtree_node *node)
+{
+    starpu_rbtree_set_color(node, STARPU_RBTREE_COLOR_RED);
+}
+
+/*
+ * Set the color of a node to black, retaining its current parent.
+ */
+static inline void starpu_rbtree_set_black(struct starpu_rbtree_node *node)
+{
+    starpu_rbtree_set_color(node, STARPU_RBTREE_COLOR_BLACK);
+}
+
+/*
+ * Perform a tree rotation, rooted at the given node.
+ *
+ * The direction parameter defines the rotation direction and is either
+ * STARPU_RBTREE_LEFT or STARPU_RBTREE_RIGHT.
+ */
+static void starpu_rbtree_rotate(struct starpu_rbtree *tree, struct starpu_rbtree_node *node,
+                          int direction)
+{
+    struct starpu_rbtree_node *parent, *rnode;
+    int left, right;
+
+    left = direction;
+    right = 1 - left;
+    parent = starpu_rbtree_parent(node);
+    rnode = node->children[right];
+
+    node->children[right] = rnode->children[left];
+
+    if (rnode->children[left] != NULL)
+        starpu_rbtree_set_parent(rnode->children[left], node);
+
+    rnode->children[left] = node;
+    starpu_rbtree_set_parent(rnode, parent);
+
+    if (unlikely(parent == NULL))
+        tree->root = rnode;
+    else
+        parent->children[starpu_rbtree_index(node, parent)] = rnode;
+
+    starpu_rbtree_set_parent(node, rnode);
+}
+
+void starpu_rbtree_insert_rebalance(struct starpu_rbtree *tree, struct starpu_rbtree_node *parent,
+                             int index, struct starpu_rbtree_node *node)
+{
+    struct starpu_rbtree_node *grand_parent, *uncle, *tmp;
+    int left, right;
+
+    assert(starpu_rbtree_check_alignment(parent));
+    assert(starpu_rbtree_check_alignment(node));
+
+    node->parent = (unsigned long)parent | STARPU_RBTREE_COLOR_RED;
+    node->children[STARPU_RBTREE_LEFT] = NULL;
+    node->children[STARPU_RBTREE_RIGHT] = NULL;
+
+    if (unlikely(parent == NULL))
+        tree->root = node;
+    else
+        parent->children[index] = node;
+
+    for (;;) {
+        if (parent == NULL) {
+            starpu_rbtree_set_black(node);
+            break;
+        }
+
+        if (starpu_rbtree_is_black(parent))
+            break;
+
+        grand_parent = starpu_rbtree_parent(parent);
+        assert(grand_parent != NULL);
+
+        left = starpu_rbtree_index(parent, grand_parent);
+        right = 1 - left;
+
+        uncle = grand_parent->children[right];
+
+        /*
+         * Uncle is red. Flip colors and repeat at grand parent.
+         */
+        if ((uncle != NULL) && starpu_rbtree_is_red(uncle)) {
+            starpu_rbtree_set_black(uncle);
+            starpu_rbtree_set_black(parent);
+            starpu_rbtree_set_red(grand_parent);
+            node = grand_parent;
+            parent = starpu_rbtree_parent(node);
+            continue;
+        }
+
+        /*
+         * Node is the right child of its parent. Rotate left at parent.
+         */
+        if (parent->children[right] == node) {
+            starpu_rbtree_rotate(tree, parent, left);
+            tmp = node;
+            node = parent;
+            parent = tmp;
+        }
+
+        /*
+         * Node is the left child of its parent. Handle colors, rotate right
+         * at grand parent, and leave.
+         */
+        starpu_rbtree_set_black(parent);
+        starpu_rbtree_set_red(grand_parent);
+        starpu_rbtree_rotate(tree, grand_parent, right);
+        break;
+    }
+
+    assert(starpu_rbtree_is_black(tree->root));
+}
+
+void starpu_rbtree_remove(struct starpu_rbtree *tree, struct starpu_rbtree_node *node)
+{
+    struct starpu_rbtree_node *child, *parent, *brother;
+    int color, left, right;
+
+    if (node->children[STARPU_RBTREE_LEFT] == NULL)
+        child = node->children[STARPU_RBTREE_RIGHT];
+    else if (node->children[STARPU_RBTREE_RIGHT] == NULL)
+        child = node->children[STARPU_RBTREE_LEFT];
+    else {
+        struct starpu_rbtree_node *successor;
+
+        /*
+         * Two-children case: replace the node with its successor.
+         */
+
+        successor = node->children[STARPU_RBTREE_RIGHT];
+
+        while (successor->children[STARPU_RBTREE_LEFT] != NULL)
+            successor = successor->children[STARPU_RBTREE_LEFT];
+
+        color = starpu_rbtree_color(successor);
+        child = successor->children[STARPU_RBTREE_RIGHT];
+        parent = starpu_rbtree_parent(node);
+
+        if (unlikely(parent == NULL))
+            tree->root = successor;
+        else
+            parent->children[starpu_rbtree_index(node, parent)] = successor;
+
+        parent = starpu_rbtree_parent(successor);
+
+        /*
+         * Set parent directly to keep the original color.
+         */
+        successor->parent = node->parent;
+        successor->children[STARPU_RBTREE_LEFT] = node->children[STARPU_RBTREE_LEFT];
+        starpu_rbtree_set_parent(successor->children[STARPU_RBTREE_LEFT], successor);
+
+        if (node == parent)
+            parent = successor;
+        else {
+            successor->children[STARPU_RBTREE_RIGHT] = node->children[STARPU_RBTREE_RIGHT];
+            starpu_rbtree_set_parent(successor->children[STARPU_RBTREE_RIGHT], successor);
+            parent->children[STARPU_RBTREE_LEFT] = child;
+
+            if (child != NULL)
+                starpu_rbtree_set_parent(child, parent);
+        }
+
+        goto update_color;
+    }
+
+    /*
+     * Node has at most one child.
+     */
+
+    color = starpu_rbtree_color(node);
+    parent = starpu_rbtree_parent(node);
+
+    if (child != NULL)
+        starpu_rbtree_set_parent(child, parent);
+
+    if (unlikely(parent == NULL))
+        tree->root = child;
+    else
+        parent->children[starpu_rbtree_index(node, parent)] = child;
+
+    /*
+     * The node has been removed, update the colors. The child pointer can
+     * be null, in which case it is considered a black leaf.
+     */
+update_color:
+    if (color == STARPU_RBTREE_COLOR_RED)
+        return;
+
+    for (;;) {
+        if ((child != NULL) && starpu_rbtree_is_red(child)) {
+            starpu_rbtree_set_black(child);
+            break;
+        }
+
+        if (parent == NULL)
+            break;
+
+        left = starpu_rbtree_index(child, parent);
+        right = 1 - left;
+
+        brother = parent->children[right];
+
+        /*
+         * Brother is red. Recolor and rotate left at parent so that brother
+         * becomes black.
+         */
+        if (starpu_rbtree_is_red(brother)) {
+            starpu_rbtree_set_black(brother);
+            starpu_rbtree_set_red(parent);
+            starpu_rbtree_rotate(tree, parent, left);
+            brother = parent->children[right];
+        }
+
+        /*
+         * Brother has no red child. Recolor and repeat at parent.
+         */
+        if (((brother->children[STARPU_RBTREE_LEFT] == NULL)
+             || starpu_rbtree_is_black(brother->children[STARPU_RBTREE_LEFT]))
+            && ((brother->children[STARPU_RBTREE_RIGHT] == NULL)
+                || starpu_rbtree_is_black(brother->children[STARPU_RBTREE_RIGHT]))) {
+            starpu_rbtree_set_red(brother);
+            child = parent;
+            parent = starpu_rbtree_parent(child);
+            continue;
+        }
+
+        /*
+         * Brother's right child is black. Recolor and rotate right at brother.
+         */
+        if ((brother->children[right] == NULL)
+            || starpu_rbtree_is_black(brother->children[right])) {
+            starpu_rbtree_set_black(brother->children[left]);
+            starpu_rbtree_set_red(brother);
+            starpu_rbtree_rotate(tree, brother, right);
+            brother = parent->children[right];
+        }
+
+        /*
+         * Brother's left child is black. Exchange parent and brother colors
+         * (we already know brother is black), set brother's right child black,
+         * rotate left at parent and leave.
+         */
+        starpu_rbtree_set_color(brother, starpu_rbtree_color(parent));
+        starpu_rbtree_set_black(parent);
+        starpu_rbtree_set_black(brother->children[right]);
+        starpu_rbtree_rotate(tree, parent, left);
+        break;
+    }
+
+    assert((tree->root == NULL) || starpu_rbtree_is_black(tree->root));
+}
+
+struct starpu_rbtree_node * starpu_rbtree_nearest(struct starpu_rbtree_node *parent, int index,
+                                    int direction)
+{
+    assert(starpu_rbtree_check_index(direction));
+
+    if (parent == NULL)
+        return NULL;
+
+    assert(starpu_rbtree_check_index(index));
+
+    if (index != direction)
+        return parent;
+
+    return starpu_rbtree_walk(parent, direction);
+}
+
+struct starpu_rbtree_node * starpu_rbtree_firstlast(const struct starpu_rbtree *tree, int direction)
+{
+    struct starpu_rbtree_node *prev, *cur;
+
+    assert(starpu_rbtree_check_index(direction));
+
+    prev = NULL;
+
+    for (cur = tree->root; cur != NULL; cur = cur->children[direction])
+        prev = cur;
+
+    return prev;
+}
+
+struct starpu_rbtree_node * starpu_rbtree_walk(struct starpu_rbtree_node *node, int direction)
+{
+    int left, right;
+
+    assert(starpu_rbtree_check_index(direction));
+
+    left = direction;
+    right = 1 - left;
+
+    if (node == NULL)
+        return NULL;
+
+    if (node->children[left] != NULL) {
+        node = node->children[left];
+
+        while (node->children[right] != NULL)
+            node = node->children[right];
+    } else {
+        struct starpu_rbtree_node *parent;
+        int index;
+
+        for (;;) {
+            parent = starpu_rbtree_parent(node);
+
+            if (parent == NULL)
+                return NULL;
+
+            index = starpu_rbtree_index(node, parent);
+            node = parent;
+
+            if (index == right)
+                break;
+        }
+    }
+
+    return node;
+}
+
+/*
+ * Return the left-most deepest child node of the given node.
+ */
+static struct starpu_rbtree_node * starpu_rbtree_find_deepest(struct starpu_rbtree_node *node)
+{
+    struct starpu_rbtree_node *parent;
+
+    assert(node != NULL);
+
+    for (;;) {
+        parent = node;
+        node = node->children[STARPU_RBTREE_LEFT];
+
+        if (node == NULL) {
+            node = parent->children[STARPU_RBTREE_RIGHT];
+
+            if (node == NULL)
+                return parent;
+        }
+    }
+}
+
+struct starpu_rbtree_node * starpu_rbtree_postwalk_deepest(const struct starpu_rbtree *tree)
+{
+    struct starpu_rbtree_node *node;
+
+    node = tree->root;
+
+    if (node == NULL)
+        return NULL;
+
+    return starpu_rbtree_find_deepest(node);
+}
+
+struct starpu_rbtree_node * starpu_rbtree_postwalk_unlink(struct starpu_rbtree_node *node)
+{
+    struct starpu_rbtree_node *parent;
+    int index;
+
+    if (node == NULL)
+        return NULL;
+
+    assert(node->children[STARPU_RBTREE_LEFT] == NULL);
+    assert(node->children[STARPU_RBTREE_RIGHT] == NULL);
+
+    parent = starpu_rbtree_parent(node);
+
+    if (parent == NULL)
+        return NULL;
+
+    index = starpu_rbtree_index(node, parent);
+    parent->children[index] = NULL;
+    node = parent->children[STARPU_RBTREE_RIGHT];
+
+    if (node == NULL)
+        return parent;
+
+    return starpu_rbtree_find_deepest(node);
+}

+ 307 - 0
src/common/rbtree.h

@@ -0,0 +1,307 @@
+/*
+ * Copyright (c) 2010, 2011 Richard Braun.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ *
+ * Red-black tree.
+ */
+
+#ifndef _KERN_RBTREE_H
+#define _KERN_RBTREE_H
+
+#include <stddef.h>
+#include <assert.h>
+#include <sys/types.h>
+
+#define MACRO_BEGIN ({
+#define MACRO_END })
+/*
+ * Indexes of the left and right nodes in the children array of a node.
+ */
+#define STARPU_RBTREE_LEFT     0
+#define STARPU_RBTREE_RIGHT    1
+
+/*
+ * Red-black node.
+ */
+struct starpu_rbtree_node;
+
+/*
+ * Red-black tree.
+ */
+struct starpu_rbtree;
+
+/*
+ * Static tree initializer.
+ */
+#define STARPU_RBTREE_INITIALIZER { NULL }
+
+#include "rbtree_i.h"
+
+/*
+ * Initialize a tree.
+ */
+static inline void starpu_rbtree_init(struct starpu_rbtree *tree)
+{
+    tree->root = NULL;
+}
+
+/*
+ * Initialize a node.
+ *
+ * A node is in no tree when its parent points to itself.
+ */
+static inline void starpu_rbtree_node_init(struct starpu_rbtree_node *node)
+{
+    assert(starpu_rbtree_check_alignment(node));
+
+    node->parent = (unsigned long)node | STARPU_RBTREE_COLOR_RED;
+    node->children[STARPU_RBTREE_LEFT] = NULL;
+    node->children[STARPU_RBTREE_RIGHT] = NULL;
+}
+
+/*
+ * Return true if node is in no tree.
+ */
+static inline int starpu_rbtree_node_unlinked(const struct starpu_rbtree_node *node)
+{
+    return starpu_rbtree_parent(node) == node;
+}
+
+/*
+ * Macro that evaluates to the address of the structure containing the
+ * given node based on the given type and member.
+ */
+#define starpu_rbtree_entry(node, type, member) structof(node, type, member)
+
+/*
+ * Return true if tree is empty.
+ */
+static inline int starpu_rbtree_empty(const struct starpu_rbtree *tree)
+{
+    return tree->root == NULL;
+}
+
+/*
+ * Look up a node in a tree.
+ *
+ * Note that implementing the lookup algorithm as a macro gives two benefits:
+ * First, it avoids the overhead of a callback function. Next, the type of the
+ * cmp_fn parameter isn't rigid. The only guarantee offered by this
+ * implementation is that the key parameter is the first parameter given to
+ * cmp_fn. This way, users can pass only the value they need for comparison
+ * instead of e.g. allocating a full structure on the stack.
+ *
+ * See starpu_rbtree_insert().
+ */
+#define starpu_rbtree_lookup(tree, key, cmp_fn)                \
+MACRO_BEGIN                                             \
+    struct starpu_rbtree_node *___cur;                         \
+    int ___diff;                                        \
+                                                        \
+    ___cur = (tree)->root;                              \
+                                                        \
+    while (___cur != NULL) {                            \
+        ___diff = cmp_fn(key, ___cur);                  \
+                                                        \
+        if (___diff == 0)                               \
+            break;                                      \
+                                                        \
+        ___cur = ___cur->children[starpu_rbtree_d2i(___diff)]; \
+    }                                                   \
+                                                        \
+    ___cur;                                             \
+MACRO_END
+
+/*
+ * Look up a node or one of its nearest nodes in a tree.
+ *
+ * This macro essentially acts as starpu_rbtree_lookup() but if no entry matched
+ * the key, an additional step is performed to obtain the next or previous
+ * node, depending on the direction (left or right).
+ *
+ * The constraints that apply to the key parameter are the same as for
+ * starpu_rbtree_lookup().
+ */
+#define starpu_rbtree_lookup_nearest(tree, key, cmp_fn, dir)       \
+MACRO_BEGIN                                                 \
+    struct starpu_rbtree_node *___cur, *___prev;                   \
+    int ___diff, ___index;                                  \
+                                                            \
+    ___prev = NULL;                                         \
+    ___index = -1;                                          \
+    ___cur = (tree)->root;                                  \
+                                                            \
+    while (___cur != NULL) {                                \
+        ___diff = cmp_fn(key, ___cur);                      \
+                                                            \
+        if (___diff == 0)                                   \
+            break;                                          \
+                                                            \
+        ___prev = ___cur;                                   \
+        ___index = starpu_rbtree_d2i(___diff);                     \
+        ___cur = ___cur->children[___index];                \
+    }                                                       \
+                                                            \
+    if (___cur == NULL)                                     \
+        ___cur = starpu_rbtree_nearest(___prev, ___index, dir);    \
+                                                            \
+    ___cur;                                                 \
+MACRO_END
+
+/*
+ * Insert a node in a tree.
+ *
+ * This macro performs a standard lookup to obtain the insertion point of
+ * the given node in the tree (it is assumed that the inserted node never
+ * compares equal to any other entry in the tree) and links the node. It
+ * then checks red-black rules violations, and rebalances the tree if
+ * necessary.
+ *
+ * Unlike starpu_rbtree_lookup(), the cmp_fn parameter must compare two complete
+ * entries, so it is suggested to use two different comparison inline
+ * functions, such as myobj_cmp_lookup() and myobj_cmp_insert(). There is no
+ * guarantee about the order of the nodes given to the comparison function.
+ *
+ * See starpu_rbtree_lookup().
+ */
+#define starpu_rbtree_insert(tree, node, cmp_fn)                   \
+MACRO_BEGIN                                                 \
+    struct starpu_rbtree_node *___cur, *___prev;                   \
+    int ___diff, ___index;                                  \
+                                                            \
+    ___prev = NULL;                                         \
+    ___index = -1;                                          \
+    ___cur = (tree)->root;                                  \
+                                                            \
+    while (___cur != NULL) {                                \
+        ___diff = cmp_fn(node, ___cur);                     \
+        assert(___diff != 0);                               \
+        ___prev = ___cur;                                   \
+        ___index = starpu_rbtree_d2i(___diff);                     \
+        ___cur = ___cur->children[___index];                \
+    }                                                       \
+                                                            \
+    starpu_rbtree_insert_rebalance(tree, ___prev, ___index, node); \
+MACRO_END
+
+/*
+ * Look up a node/slot pair in a tree.
+ *
+ * This macro essentially acts as starpu_rbtree_lookup() but in addition to a node,
+ * it also returns a slot, which identifies an insertion point in the tree.
+ * If the returned node is null, the slot can be used by starpu_rbtree_insert_slot()
+ * to insert without the overhead of an additional lookup. The slot is a
+ * simple unsigned long integer.
+ *
+ * The constraints that apply to the key parameter are the same as for
+ * starpu_rbtree_lookup().
+ */
+#define starpu_rbtree_lookup_slot(tree, key, cmp_fn, slot) \
+MACRO_BEGIN                                         \
+    struct starpu_rbtree_node *___cur, *___prev;           \
+    int ___diff, ___index;                          \
+                                                    \
+    ___prev = NULL;                                 \
+    ___index = 0;                                   \
+    ___cur = (tree)->root;                          \
+                                                    \
+    while (___cur != NULL) {                        \
+        ___diff = cmp_fn(key, ___cur);              \
+                                                    \
+        if (___diff == 0)                           \
+            break;                                  \
+                                                    \
+        ___prev = ___cur;                           \
+        ___index = starpu_rbtree_d2i(___diff);             \
+        ___cur = ___cur->children[___index];        \
+    }                                               \
+                                                    \
+    (slot) = starpu_rbtree_slot(___prev, ___index);        \
+    ___cur;                                         \
+MACRO_END
+
+/*
+ * Insert a node at an insertion point in a tree.
+ *
+ * This macro essentially acts as starpu_rbtree_insert() except that it doesn't
+ * obtain the insertion point with a standard lookup. The insertion point
+ * is obtained by calling starpu_rbtree_lookup_slot(). In addition, the new node
+ * must not compare equal to an existing node in the tree (i.e. the slot
+ * must denote a null node).
+ */
+static inline void
+starpu_rbtree_insert_slot(struct starpu_rbtree *tree, unsigned long slot,
+                   struct starpu_rbtree_node *node)
+{
+    struct starpu_rbtree_node *parent;
+    int index;
+
+    parent = starpu_rbtree_slot_parent(slot);
+    index = starpu_rbtree_slot_index(slot);
+    starpu_rbtree_insert_rebalance(tree, parent, index, node);
+}
+
+/*
+ * Remove a node from a tree.
+ *
+ * After completion, the node is stale.
+ */
+void starpu_rbtree_remove(struct starpu_rbtree *tree, struct starpu_rbtree_node *node);
+
+/*
+ * Return the first node of a tree.
+ */
+#define starpu_rbtree_first(tree) starpu_rbtree_firstlast(tree, STARPU_RBTREE_LEFT)
+
+/*
+ * Return the last node of a tree.
+ */
+#define starpu_rbtree_last(tree) starpu_rbtree_firstlast(tree, STARPU_RBTREE_RIGHT)
+
+/*
+ * Return the node previous to the given node.
+ */
+#define starpu_rbtree_prev(node) starpu_rbtree_walk(node, STARPU_RBTREE_LEFT)
+
+/*
+ * Return the node next to the given node.
+ */
+#define starpu_rbtree_next(node) starpu_rbtree_walk(node, STARPU_RBTREE_RIGHT)
+
+/*
+ * Forge a loop to process all nodes of a tree, removing them when visited.
+ *
+ * This macro can only be used to destroy a tree, so that the resources used
+ * by the entries can be released by the user. It basically removes all nodes
+ * without doing any color checking.
+ *
+ * After completion, all nodes and the tree root member are stale.
+ */
+#define starpu_rbtree_for_each_remove(tree, node, tmp)         \
+for (node = starpu_rbtree_postwalk_deepest(tree),              \
+     tmp = starpu_rbtree_postwalk_unlink(node);                \
+     node != NULL;                                      \
+     node = tmp, tmp = starpu_rbtree_postwalk_unlink(node))    \
+
+#endif /* _KERN_RBTREE_H */

+ 186 - 0
src/common/rbtree_i.h

@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2010, 2011 Richard Braun.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _KERN_RBTREE_I_H
+#define _KERN_RBTREE_I_H
+
+#include <assert.h>
+
+/*
+ * Red-black node structure.
+ *
+ * To reduce the number of branches and the instruction cache footprint,
+ * the left and right child pointers are stored in an array, and the symmetry
+ * of most tree operations is exploited by using left/right variables when
+ * referring to children.
+ *
+ * In addition, this implementation assumes that all nodes are 4-byte aligned,
+ * so that the least significant bit of the parent member can be used to store
+ * the color of the node. This is true for all modern 32 and 64 bits
+ * architectures, as long as the nodes aren't embedded in structures with
+ * special alignment constraints such as member packing.
+ */
+struct starpu_rbtree_node {
+    unsigned long parent;
+    struct starpu_rbtree_node *children[2];
+};
+
+/*
+ * Red-black tree structure.
+ */
+struct starpu_rbtree {
+    struct starpu_rbtree_node *root;
+};
+
+/*
+ * Masks applied on the parent member of a node to obtain either the
+ * color or the parent address.
+ */
+#define STARPU_RBTREE_COLOR_MASK   0x1UL
+#define STARPU_RBTREE_PARENT_MASK  (~0x3UL)
+
+/*
+ * Node colors.
+ */
+#define STARPU_RBTREE_COLOR_RED    0
+#define STARPU_RBTREE_COLOR_BLACK  1
+
+/*
+ * Masks applied on slots to obtain either the child index or the parent
+ * address.
+ */
+#define STARPU_RBTREE_SLOT_INDEX_MASK  0x1UL
+#define STARPU_RBTREE_SLOT_PARENT_MASK (~STARPU_RBTREE_SLOT_INDEX_MASK)
+
+/*
+ * Return true if the given pointer is suitably aligned.
+ */
+static inline int starpu_rbtree_check_alignment(const struct starpu_rbtree_node *node)
+{
+    return ((unsigned long)node & (~STARPU_RBTREE_PARENT_MASK)) == 0;
+}
+
+/*
+ * Return true if the given index is a valid child index.
+ */
+static inline int starpu_rbtree_check_index(int index)
+{
+    return index == (index & 1);
+}
+
+/*
+ * Convert the result of a comparison into an index in the children array
+ * (0 or 1).
+ *
+ * This function is mostly used when looking up a node.
+ */
+static inline int starpu_rbtree_d2i(int diff)
+{
+    return !(diff <= 0);
+}
+
+/*
+ * Return the parent of a node.
+ */
+static inline struct starpu_rbtree_node * starpu_rbtree_parent(const struct starpu_rbtree_node *node)
+{
+    return (struct starpu_rbtree_node *)(node->parent & STARPU_RBTREE_PARENT_MASK);
+}
+
+/*
+ * Translate an insertion point into a slot.
+ */
+static inline unsigned long starpu_rbtree_slot(struct starpu_rbtree_node *parent, int index)
+{
+    assert(starpu_rbtree_check_alignment(parent));
+    assert(starpu_rbtree_check_index(index));
+    return (unsigned long)parent | index;
+}
+
+/*
+ * Extract the parent address from a slot.
+ */
+static inline struct starpu_rbtree_node * starpu_rbtree_slot_parent(unsigned long slot)
+{
+    return (struct starpu_rbtree_node *)(slot & STARPU_RBTREE_SLOT_PARENT_MASK);
+}
+
+/*
+ * Extract the index from a slot.
+ */
+static inline int starpu_rbtree_slot_index(unsigned long slot)
+{
+    return slot & STARPU_RBTREE_SLOT_INDEX_MASK;
+}
+
+/*
+ * Insert a node in a tree, rebalancing it if necessary.
+ *
+ * The index parameter is the index in the children array of the parent where
+ * the new node is to be inserted. It is ignored if the parent is null.
+ *
+ * This function is intended to be used by the starpu_rbtree_insert() macro only.
+ */
+void starpu_rbtree_insert_rebalance(struct starpu_rbtree *tree, struct starpu_rbtree_node *parent,
+                             int index, struct starpu_rbtree_node *node);
+
+/*
+ * Return the previous or next node relative to a location in a tree.
+ *
+ * The parent and index parameters define the location, which can be empty.
+ * The direction parameter is either STARPU_RBTREE_LEFT (to obtain the previous
+ * node) or STARPU_RBTREE_RIGHT (to obtain the next one).
+ */
+struct starpu_rbtree_node * starpu_rbtree_nearest(struct starpu_rbtree_node *parent, int index,
+                                    int direction);
+
+/*
+ * Return the first or last node of a tree.
+ *
+ * The direction parameter is either STARPU_RBTREE_LEFT (to obtain the first node)
+ * or STARPU_RBTREE_RIGHT (to obtain the last one).
+ */
+struct starpu_rbtree_node * starpu_rbtree_firstlast(const struct starpu_rbtree *tree, int direction);
+
+/*
+ * Return the node next to, or previous to the given node.
+ *
+ * The direction parameter is either STARPU_RBTREE_LEFT (to obtain the previous node)
+ * or STARPU_RBTREE_RIGHT (to obtain the next one).
+ */
+struct starpu_rbtree_node * starpu_rbtree_walk(struct starpu_rbtree_node *node, int direction);
+
+/*
+ * Return the left-most deepest node of a tree, which is the starting point of
+ * the postorder traversal performed by starpu_rbtree_for_each_remove().
+ */
+struct starpu_rbtree_node * starpu_rbtree_postwalk_deepest(const struct starpu_rbtree *tree);
+
+/*
+ * Unlink a node from its tree and return the next (right) node in postorder.
+ */
+struct starpu_rbtree_node * starpu_rbtree_postwalk_unlink(struct starpu_rbtree_node *node);
+
+#endif /* _KERN_RBTREE_I_H */

+ 33 - 17
src/datawizard/coherency.c

@@ -27,6 +27,7 @@
 #include <core/task.h>
 #include <starpu_scheduler.h>
 #include <core/workers.h>
+#include <limits.h>
 
 #ifdef STARPU_SIMGRID
 #include <core/simgrid.h>
@@ -465,7 +466,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 								  struct _starpu_data_replicate *dst_replicate,
 								  enum starpu_data_access_mode mode, unsigned is_prefetch,
 								  unsigned async,
-								  void (*callback_func)(void *), void *callback_arg)
+								  void (*callback_func)(void *), void *callback_arg, int prio)
 {
 	/* We don't care about commuting for data requests, that was handled before. */
 	mode &= ~STARPU_COMMUTE;
@@ -615,7 +616,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 			/* Create a new request if there was no request to reuse */
 			r = _starpu_create_data_request(handle, hop_src_replicate,
 							hop_dst_replicate, hop_handling_node,
-							mode, ndeps, is_prefetch, 0);
+							mode, ndeps, is_prefetch, prio, 0);
 			nwait++;
 		}
 
@@ -653,7 +654,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 		 */
 		struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
 							dst_replicate, requesting_node,
-							STARPU_W, nwait, is_prefetch, 1);
+							STARPU_W, nwait, is_prefetch, prio, 1);
 
 		/* and perform the callback after termination */
 		_starpu_data_request_append_callback(r, callback_func, callback_arg);
@@ -701,7 +702,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 
 int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *dst_replicate,
 			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
-			       void (*callback_func)(void *), void *callback_arg)
+			       void (*callback_func)(void *), void *callback_arg, int prio)
 {
 	unsigned local_node = _starpu_memory_node_get_local_key();
         _STARPU_LOG_IN();
@@ -724,7 +725,7 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
 
 	struct _starpu_data_request *r;
 	r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
-						 is_prefetch, async, callback_func, callback_arg);
+						 is_prefetch, async, callback_func, callback_arg, prio);
 
 	/* If no request was created, the handle was already up-to-date on the
 	 * node. In this case, _starpu_create_request_to_fetch_data has already
@@ -739,19 +740,19 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
         return ret;
 }
 
-static int idle_prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode)
+static int idle_prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 2, 1, NULL, NULL);
+	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 2, 1, NULL, NULL, prio);
 }
 
-static int prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode)
+static int prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 1, 1, NULL, NULL);
+	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 1, 1, NULL, NULL, prio);
 }
 
-static int fetch_data(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode)
+static int fetch_data(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
 {
-	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL);
+	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, prio);
 }
 
 uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
@@ -829,7 +830,7 @@ static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handl
 	_starpu_spin_unlock(&handle->header_lock);
 }
 
-int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
+int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
 {
 	STARPU_ASSERT(!task->prefetched);
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
@@ -844,7 +845,7 @@ int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
 			continue;
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		prefetch_data_on_node(handle, replicate, mode);
+		prefetch_data_on_node(handle, replicate, mode, prio);
 
 		_starpu_set_data_requested_flag_if_needed(handle, replicate);
 	}
@@ -852,7 +853,15 @@ int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
 	return 0;
 }
 
-int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
+int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
+{
+	int prio = task->priority;
+	if (task->workerorder)
+		prio = INT_MAX - task->workerorder;
+	return starpu_prefetch_task_input_on_node_prio(task, node, prio);
+}
+
+int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
 {
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
 	unsigned index;
@@ -866,12 +875,19 @@ int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned n
 			continue;
 
 		struct _starpu_data_replicate *replicate = &handle->per_node[node];
-		idle_prefetch_data_on_node(handle, replicate, mode);
+		idle_prefetch_data_on_node(handle, replicate, mode, prio);
 	}
 
 	return 0;
 }
 
+int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
+{
+	int prio = task->priority;
+	if (task->workerorder)
+		prio = INT_MAX - task->workerorder;
+	return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
+}
 
 static struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
 {
@@ -927,7 +943,7 @@ int _starpu_fetch_task_input(struct _starpu_job *j)
 
 		local_replicate = get_replicate(handle, mode, workerid, node);
 
-		ret = fetch_data(handle, local_replicate, mode);
+		ret = fetch_data(handle, local_replicate, mode, 0);
 		if (STARPU_UNLIKELY(ret))
 			goto enomem;
 
@@ -1120,7 +1136,7 @@ void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
 
 		local_replicate = get_replicate(handle, mode, -1, node);
 
-		_starpu_fetch_data_on_node(handle, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper);
+		_starpu_fetch_data_on_node(handle, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0);
 	}
 
 	if (profiling && task->profiling_info)

+ 2 - 2
src/datawizard/coherency.h

@@ -269,7 +269,7 @@ void _starpu_display_msi_stats(void);
  */
 int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate,
 			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
-			       void (*callback_func)(void *), void *callback_arg);
+			       void (*callback_func)(void *), void *callback_arg, int prio);
 /* This releases a reference on the handle */
 void _starpu_release_data_on_node(struct _starpu_data_state *state, uint32_t default_wt_mask,
 				  struct _starpu_data_replicate *replicate);
@@ -306,7 +306,7 @@ struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_ha
 								  struct _starpu_data_replicate *dst_replicate,
 								  enum starpu_data_access_mode mode, unsigned is_prefetch,
 								  unsigned async,
-								  void (*callback_func)(void *), void *callback_arg);
+								  void (*callback_func)(void *), void *callback_arg, int prio);
 
 void _starpu_redux_init_data_replicate(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, int workerid);
 void _starpu_data_start_reduction_mode(starpu_data_handle_t handle);

+ 72 - 58
src/datawizard/data_request.c

@@ -32,13 +32,13 @@
 #define MAX_PENDING_IDLE_REQUESTS_PER_NODE 1
 
 /* requests that have not been treated at all */
-static struct _starpu_data_request_list data_requests[STARPU_MAXNODES];
-static struct _starpu_data_request_list prefetch_requests[STARPU_MAXNODES];
-static struct _starpu_data_request_list idle_requests[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list data_requests[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list prefetch_requests[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list idle_requests[STARPU_MAXNODES];
 static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
 
 /* requests that are not terminated (eg. async transfers) */
-static struct _starpu_data_request_list data_requests_pending[STARPU_MAXNODES];
+static struct _starpu_data_request_prio_list data_requests_pending[STARPU_MAXNODES];
 static unsigned data_requests_npending[STARPU_MAXNODES];
 static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
 
@@ -47,20 +47,20 @@ void _starpu_init_data_request_lists(void)
 	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
-		_starpu_data_request_list_init(&data_requests[i]);
-		_starpu_data_request_list_init(&prefetch_requests[i]);
-		_starpu_data_request_list_init(&idle_requests[i]);
+		_starpu_data_request_prio_list_init(&data_requests[i]);
+		_starpu_data_request_prio_list_init(&prefetch_requests[i]);
+		_starpu_data_request_prio_list_init(&idle_requests[i]);
 
 		/* Tell helgrind that we are fine with checking for list_empty
 		 * in _starpu_handle_node_data_requests, we will call it
 		 * periodically anyway */
-		STARPU_HG_DISABLE_CHECKING(data_requests[i]._head);
-		STARPU_HG_DISABLE_CHECKING(prefetch_requests[i]._head);
-		STARPU_HG_DISABLE_CHECKING(idle_requests[i]._head);
+		STARPU_HG_DISABLE_CHECKING(data_requests[i].tree.root);
+		STARPU_HG_DISABLE_CHECKING(prefetch_requests[i].tree.root);
+		STARPU_HG_DISABLE_CHECKING(idle_requests[i].tree.root);
 
 		STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
 
-		_starpu_data_request_list_init(&data_requests_pending[i]);
+		_starpu_data_request_prio_list_init(&data_requests_pending[i]);
 		data_requests_npending[i] = 0;
 		STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
 	}
@@ -72,6 +72,9 @@ void _starpu_deinit_data_request_lists(void)
 	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 	{
+		_starpu_data_request_prio_list_deinit(&data_requests[i]);
+		_starpu_data_request_prio_list_deinit(&prefetch_requests[i]);
+		_starpu_data_request_prio_list_deinit(&idle_requests[i]);
 		STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
 		STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
 	}
@@ -131,6 +134,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 							 enum starpu_data_access_mode mode,
 							 unsigned ndeps,
 							 unsigned is_prefetch,
+							 int prio,
 							 unsigned is_write_invalidation)
 {
 	struct _starpu_data_request *r = _starpu_data_request_new();
@@ -148,6 +152,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	STARPU_ASSERT(handling_node == STARPU_MAIN_RAM || _starpu_memory_node_get_nworkers(handling_node));
 	r->completed = 0;
 	r->prefetch = is_prefetch;
+	r->prio = prio;
 	r->retval = -1;
 	r->ndeps = ndeps;
 	r->next_req_count = 0;
@@ -287,11 +292,11 @@ void _starpu_post_data_request(struct _starpu_data_request *r, unsigned handling
 	/* insert the request in the proper list */
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
 	if (r->prefetch == 2)
-		_starpu_data_request_list_push_back(&idle_requests[handling_node], r);
+		_starpu_data_request_prio_list_push_back(&idle_requests[handling_node], r);
 	else if (r->prefetch)
-		_starpu_data_request_list_push_back(&prefetch_requests[handling_node], r);
+		_starpu_data_request_prio_list_push_back(&prefetch_requests[handling_node], r);
 	else
-		_starpu_data_request_list_push_back(&data_requests[handling_node], r);
+		_starpu_data_request_prio_list_push_back(&data_requests[handling_node], r);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
@@ -490,7 +495,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 		_starpu_spin_unlock(&handle->header_lock);
 
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
-		_starpu_data_request_list_push_back(&data_requests_pending[r->handling_node], r);
+		_starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node], r);
 		data_requests_npending[r->handling_node]++;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
 
@@ -504,10 +509,10 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	return 0;
 }
 
-static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *reqlist, unsigned src_node, unsigned may_alloc, unsigned n, unsigned *pushed, unsigned prefetch)
+static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_list *reqlist, unsigned src_node, unsigned may_alloc, unsigned n, unsigned *pushed, unsigned prefetch)
 {
 	struct _starpu_data_request *r;
-	struct _starpu_data_request_list new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
+	struct _starpu_data_request_prio_list new_data_requests[prefetch + 1]; /* Indexed by prefetch level */
 	unsigned i;
 	int ret = 0;
 
@@ -517,7 +522,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *
 	/* This is racy, but not posing problems actually, since we know we
 	 * will come back here to probe again regularly anyway.
 	 * Thus, do not expose this optimization to helgrind */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(&reqlist[src_node]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&reqlist[src_node]))
 		return 0;
 #endif
 
@@ -534,7 +539,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
 #endif
 
-	if (_starpu_data_request_list_empty(&reqlist[src_node]))
+	if (_starpu_data_request_prio_list_empty(&reqlist[src_node]))
 	{
 		/* there is no request */
                 STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
@@ -544,16 +549,16 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *
 	/* There is an entry: we create a new empty list to replace the list of
 	 * requests, and we handle the request(s) one by one in the former
 	 * list, without concurrency issues.*/
-	struct _starpu_data_request_list local_list = reqlist[src_node];
-	_starpu_data_request_list_init(&reqlist[src_node]);
+	struct _starpu_data_request_prio_list local_list = reqlist[src_node];
+	_starpu_data_request_prio_list_init(&reqlist[src_node]);
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
 	for (i = 0; i <= prefetch; i++)
-		_starpu_data_request_list_init(&new_data_requests[i]);
+		_starpu_data_request_prio_list_init(&new_data_requests[i]);
 
 	/* for all entries of the list */
-	while (!_starpu_data_request_list_empty(&local_list))
+	while (!_starpu_data_request_prio_list_empty(&local_list))
 	{
                 int res;
 
@@ -565,7 +570,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *
 			break;
 		}
 
-		r = _starpu_data_request_list_pop_front(&local_list);
+		r = _starpu_data_request_prio_list_pop_front(&local_list);
 
 		res = starpu_handle_data_request(r, may_alloc, prefetch);
 		if (res != 0 && res != -EAGAIN)
@@ -573,7 +578,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *
 			/* handle is busy, or not enough memory, postpone for now */
 			ret = res;
 			/* Prefetch requests might have gotten promoted while in tmp list */
-			_starpu_data_request_list_push_back(&new_data_requests[r->prefetch], r);
+			_starpu_data_request_prio_list_push_back(&new_data_requests[r->prefetch], r);
 			if (prefetch)
 				/* Prefetching more there would make the situation even worse */
 				break;
@@ -583,26 +588,35 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_list *
 	}
 
 	/* Push back requests we didn't handle on the proper list */
-	while (!_starpu_data_request_list_empty(&local_list))
+	while (!_starpu_data_request_prio_list_empty(&local_list))
 	{
-		r = _starpu_data_request_list_pop_front(&local_list);
+		r = _starpu_data_request_prio_list_pop_front(&local_list);
 		/* Prefetch requests might have gotten promoted while in tmp list */
-		_starpu_data_request_list_push_back(&new_data_requests[r->prefetch], r);
+		_starpu_data_request_prio_list_push_back(&new_data_requests[r->prefetch], r);
 	}
 
 	for (i = 0; i <= prefetch; i++)
-		if (!_starpu_data_request_list_empty(&new_data_requests[i]))
+		if (!_starpu_data_request_prio_list_empty(&new_data_requests[i]))
 			break;
 
 	if (i <= prefetch)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
-		if (!(_starpu_data_request_list_empty(&new_data_requests[0])))
-			_starpu_data_request_list_push_list_front(&new_data_requests[0], &data_requests[src_node]);
-		if (prefetch >= 1 && !(_starpu_data_request_list_empty(&new_data_requests[1])))
-			_starpu_data_request_list_push_list_front(&new_data_requests[1], &prefetch_requests[src_node]);
-		if (prefetch >= 2 && !(_starpu_data_request_list_empty(&new_data_requests[2])))
-			_starpu_data_request_list_push_list_front(&new_data_requests[2], &idle_requests[src_node]);
+		if (!(_starpu_data_request_prio_list_empty(&new_data_requests[0])))
+		{
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[0], &data_requests[src_node]);
+			data_requests[src_node] = new_data_requests[0];
+		}
+		if (prefetch >= 1 && !(_starpu_data_request_prio_list_empty(&new_data_requests[1])))
+		{
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[1], &prefetch_requests[src_node]);
+			prefetch_requests[src_node] = new_data_requests[1];
+		}
+		if (prefetch >= 2 && !(_starpu_data_request_prio_list_empty(&new_data_requests[2])))
+		{
+			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[2], &idle_requests[src_node]);
+			idle_requests[src_node] = new_data_requests[2];
+		}
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
 #ifdef STARPU_SIMGRID
@@ -644,19 +658,19 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 {
 //	_STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
 //
-	struct _starpu_data_request_list new_data_requests_pending;
-	struct _starpu_data_request_list empty_list;
+	struct _starpu_data_request_prio_list new_data_requests_pending;
+	struct _starpu_data_request_prio_list empty_list;
 	unsigned taken, kept;
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	/* Here helgrind would should that this is an un protected access.
 	 * We however don't care about missing an entry, we will get called
 	 * again sooner or later. */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(&data_requests_pending[src_node]))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_data_request_prio_list_empty(&data_requests_pending[src_node]))
 		return 0;
 #endif
 
-	_starpu_data_request_list_init(&empty_list);
+	_starpu_data_request_prio_list_init(&empty_list);
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 	if (!force)
 	{
@@ -671,26 +685,26 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 		/* We really want to handle requests */
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
 
-	if (_starpu_data_request_list_empty(&data_requests_pending[src_node]))
+	if (_starpu_data_request_prio_list_empty(&data_requests_pending[src_node]))
 	{
 		/* there is no request */
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
 		return 0;
 	}
 	/* for all entries of the list */
-	struct _starpu_data_request_list local_list = data_requests_pending[src_node];
-	_starpu_data_request_list_init(&data_requests_pending[src_node]);
+	struct _starpu_data_request_prio_list local_list = data_requests_pending[src_node];
+	_starpu_data_request_prio_list_init(&data_requests_pending[src_node]);
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
 
-	_starpu_data_request_list_init(&new_data_requests_pending);
+	_starpu_data_request_prio_list_init(&new_data_requests_pending);
 	taken = 0;
 	kept = 0;
 
-	while (!_starpu_data_request_list_empty(&local_list))
+	while (!_starpu_data_request_prio_list_empty(&local_list))
 	{
 		struct _starpu_data_request *r;
-		r = _starpu_data_request_list_pop_front(&local_list);
+		r = _starpu_data_request_prio_list_pop_front(&local_list);
 		taken++;
 
 		starpu_data_handle_t handle = r->handle;
@@ -708,7 +722,7 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 			if (_starpu_spin_trylock(&handle->header_lock))
 			{
 				/* Handle is busy, retry this later */
-				_starpu_data_request_list_push_back(&new_data_requests_pending, r);
+				_starpu_data_request_prio_list_push_back(&new_data_requests_pending, r);
 				kept++;
 				continue;
 			}
@@ -738,7 +752,7 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 				_starpu_spin_unlock(&r->lock);
 				_starpu_spin_unlock(&handle->header_lock);
 
-				_starpu_data_request_list_push_back(&new_data_requests_pending, r);
+				_starpu_data_request_prio_list_push_back(&new_data_requests_pending, r);
 				kept++;
 			}
 		}
@@ -746,7 +760,7 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
 	data_requests_npending[src_node] -= taken - kept;
 	if (kept)
-		_starpu_data_request_list_push_list_back(&data_requests_pending[src_node], &new_data_requests_pending);
+		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[src_node], &new_data_requests_pending);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
 
 	return taken - kept;
@@ -770,9 +784,9 @@ int _starpu_check_that_no_data_request_exists(unsigned node)
 	int no_pending;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node]);
-	no_request = _starpu_data_request_list_empty(&data_requests[node])
-	          && _starpu_data_request_list_empty(&prefetch_requests[node])
-		  && _starpu_data_request_list_empty(&idle_requests[node]);
+	no_request = _starpu_data_request_prio_list_empty(&data_requests[node])
+	          && _starpu_data_request_prio_list_empty(&prefetch_requests[node])
+		  && _starpu_data_request_prio_list_empty(&idle_requests[node]);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node]);
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node]);
 	no_pending = !data_requests_npending[node];
@@ -807,20 +821,20 @@ void _starpu_update_prefetch_status(struct _starpu_data_request *r, unsigned pre
 
 	/* The request can be in a different list (handling request or the temp list)
 	 * we have to check that it is really in the prefetch list. */
-	if (_starpu_data_request_list_ismember(&prefetch_requests[r->handling_node], r))
+	if (_starpu_data_request_prio_list_ismember(&prefetch_requests[r->handling_node], r))
 	{
-		_starpu_data_request_list_erase(&prefetch_requests[r->handling_node],r);
-		_starpu_data_request_list_push_back(&data_requests[r->handling_node],r);
+		_starpu_data_request_prio_list_erase(&prefetch_requests[r->handling_node],r);
+		_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node],r);
 	}
 	/* The request can be in a different list (handling request or the temp list)
 	 * we have to check that it is really in the idle list. */
-	else if (_starpu_data_request_list_ismember(&idle_requests[r->handling_node], r))
+	else if (_starpu_data_request_prio_list_ismember(&idle_requests[r->handling_node], r))
 	{
-		_starpu_data_request_list_erase(&idle_requests[r->handling_node],r);
+		_starpu_data_request_prio_list_erase(&idle_requests[r->handling_node],r);
 		if (prefetch == 1)
-			_starpu_data_request_list_push_back(&prefetch_requests[r->handling_node],r);
+			_starpu_data_request_prio_list_push_back(&prefetch_requests[r->handling_node],r);
 		else
-			_starpu_data_request_list_push_back(&data_requests[r->handling_node],r);
+			_starpu_data_request_prio_list_push_back(&data_requests[r->handling_node],r);
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node]);
 

+ 6 - 0
src/datawizard/data_request.h

@@ -24,6 +24,7 @@
 #include <semaphore.h>
 #include <datawizard/copy_driver.h>
 #include <common/list.h>
+#include <common/prio_list.h>
 #include <common/starpu_spinlock.h>
 
 struct _starpu_data_replicate;
@@ -74,6 +75,9 @@ LIST_TYPE(_starpu_data_request,
 	 */
 	unsigned prefetch;
 
+	/* Priority of the request. Default is 0 */
+	int prio;
+
 	/* The value returned by the transfer function */
 	int retval;
 
@@ -91,6 +95,7 @@ LIST_TYPE(_starpu_data_request,
 
 	unsigned com_id;
 )
+PRIO_LIST_TYPE(_starpu_data_request, prio)
 
 /* Everyone that wants to access some piece of data will post a request.
  * Not only StarPU internals, but also the application may put such requests */
@@ -133,6 +138,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 							 enum starpu_data_access_mode mode,
 							 unsigned ndeps,
 							 unsigned is_prefetch,
+							 int prio,
 							 unsigned is_write_invalidation);
 
 int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigned may_alloc);

+ 1 - 1
src/datawizard/interfaces/data_interface.c

@@ -608,7 +608,7 @@ void _starpu_check_if_valid_and_fetch_data_on_node(starpu_data_handle_t handle,
 	}
 	if (valid)
 	{
-		int ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, 0, 0, 0, NULL, NULL);
+		int ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, 0, 0, 0, NULL, NULL, 0);
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, &handle->per_node[handle->home_node]);
 	}

+ 2 - 2
src/datawizard/memalloc.c

@@ -244,7 +244,7 @@ static int transfer_subtree_to_node(starpu_data_handle_t handle, unsigned src_no
 		{
 			/* This is the only copy, push it to destination */
 			struct _starpu_data_request *r;
-			r = _starpu_create_request_to_fetch_data(handle, dst_replicate, STARPU_R, 0, 0, NULL, NULL);
+			r = _starpu_create_request_to_fetch_data(handle, dst_replicate, STARPU_R, 0, 0, NULL, NULL, 0);
 			/* There is no way we don't need a request, since
 			 * source is OWNER, destination can't be having it */
 			STARPU_ASSERT(r);
@@ -1011,7 +1011,7 @@ void starpu_memchunk_tidy(unsigned node)
 				}
 
 				_starpu_spin_unlock(&mc_lock[node]);
-				if (!_starpu_create_request_to_fetch_data(handle, &handle->per_node[handle->home_node], STARPU_R, 2, 1, NULL, NULL))
+				if (!_starpu_create_request_to_fetch_data(handle, &handle->per_node[handle->home_node], STARPU_R, 2, 1, NULL, NULL, 0))
 				{
 					/* No request was actually needed??
 					 * Odd, but cope with it.  */

+ 22 - 10
src/datawizard/user_interactions.c

@@ -34,7 +34,7 @@ int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node)
 
 	_starpu_spin_lock(&handle->header_lock);
 
-	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, 1, 0);
+	r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, 1, 0, 0);
 
 	/* we do not increase the refcnt associated to the request since we are
 	 * not waiting for its termination */
@@ -56,6 +56,7 @@ struct user_interaction_wrapper
 	unsigned finished;
 	unsigned async;
 	unsigned prefetch;
+	int prio;
 	void (*callback)(void *);
 	void (*callback_fetch_data)(void *); // called after fetch_data
 	void *callback_arg;
@@ -98,7 +99,7 @@ static void _starpu_data_acquire_continuation_non_blocking(void *arg)
 		struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
 
 		ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 1,
-						 _starpu_data_acquire_fetch_data_callback, wrapper);
+						 _starpu_data_acquire_fetch_data_callback, wrapper, 0);
 		STARPU_ASSERT(!ret);
 	}
 	else
@@ -217,7 +218,7 @@ static inline void _starpu_data_acquire_continuation(void *arg)
 		int ret;
 		struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
 
-		ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 0, NULL, NULL);
+		ret = _starpu_fetch_data_on_node(handle, replicate, wrapper->mode, 0, 0, 0, NULL, NULL, 0);
 		STARPU_ASSERT(!ret);
 	}
 
@@ -305,7 +306,7 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 		{
 			/* no one has locked this data yet, so we proceed immediately */
 			struct _starpu_data_replicate *replicate = &handle->per_node[node];
-			int ret = _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL);
+			int ret = _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, 0);
 			STARPU_ASSERT(!ret);
 		}
 	}
@@ -367,7 +368,7 @@ static void _prefetch_data_on_node(void *arg)
         int ret;
 
 	struct _starpu_data_replicate *replicate = &handle->per_node[wrapper->node];
-	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, wrapper->async, wrapper->prefetch, wrapper->async, NULL, NULL);
+	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, wrapper->async, wrapper->prefetch, wrapper->async, NULL, NULL, wrapper->prio);
         STARPU_ASSERT(!ret);
 
 	if (wrapper->async)
@@ -386,7 +387,7 @@ static void _prefetch_data_on_node(void *arg)
 }
 
 static
-int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigned node, unsigned async, enum starpu_data_access_mode mode, unsigned prefetch)
+int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigned node, unsigned async, enum starpu_data_access_mode mode, unsigned prefetch, int prio)
 {
 	STARPU_ASSERT(handle);
 
@@ -399,6 +400,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 	wrapper->node = node;
 	wrapper->async = async;
 	wrapper->prefetch = prefetch;
+	wrapper->prio = prio;
 	STARPU_PTHREAD_COND_INIT(&wrapper->cond, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
 	wrapper->finished = 0;
@@ -412,7 +414,7 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 		STARPU_PTHREAD_MUTEX_DESTROY(&wrapper->lock);
 		free(wrapper);
 
-		_starpu_fetch_data_on_node(handle, replicate, mode, async, prefetch, async, NULL, NULL);
+		_starpu_fetch_data_on_node(handle, replicate, mode, async, prefetch, async, NULL, NULL, prio);
 
 		/* remove the "lock"/reference */
 
@@ -449,17 +451,27 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 
 int starpu_data_fetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
 {
-	return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, 0);
+	return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, 0, 0);
+}
+
+int starpu_data_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio)
+{
+	return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, 1, prio);
 }
 
 int starpu_data_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
 {
-	return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, 1);
+	return starpu_data_prefetch_on_node_prio(handle, node, async, 0);
+}
+
+int starpu_data_idle_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio)
+{
+	return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, 2, prio);
 }
 
 int starpu_data_idle_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
 {
-	return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, 2);
+	return starpu_data_idle_prefetch_on_node_prio(handle, node, async, 0);
 }
 
 static void _starpu_data_wont_use(void *data)

+ 1 - 1
src/datawizard/write_back.c

@@ -64,7 +64,7 @@ void _starpu_write_through_data(starpu_data_handle_t handle, unsigned requesting
 
 				struct _starpu_data_request *r;
 				r = _starpu_create_request_to_fetch_data(handle, &handle->per_node[node],
-									 STARPU_R, 1, 1, wt_callback, handle);
+									 STARPU_R, 1, 1, wt_callback, handle, 0);
 
 			        /* If no request was created, the handle was already up-to-date on the
 			         * node */

+ 2 - 2
src/debug/latency.c

@@ -35,7 +35,7 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_0 = &handle->per_node[node0];
-		ret = _starpu_fetch_data_on_node(handle, replicate_0, STARPU_RW, 0, 0, 0, NULL, NULL);
+		ret = _starpu_fetch_data_on_node(handle, replicate_0, STARPU_RW, 0, 0, 0, NULL, NULL, 0);
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, replicate_0);
 
@@ -45,7 +45,7 @@ void _starpu_benchmark_ping_pong(starpu_data_handle_t handle,
 		_starpu_spin_unlock(&handle->header_lock);
 
 		struct _starpu_data_replicate *replicate_1 = &handle->per_node[node1];
-		ret = _starpu_fetch_data_on_node(handle, replicate_1, STARPU_RW, 0, 0, 0, NULL, NULL);
+		ret = _starpu_fetch_data_on_node(handle, replicate_1, STARPU_RW, 0, 0, 0, NULL, NULL, 0);
 		STARPU_ASSERT(!ret);
 		_starpu_release_data_on_node(handle, 0, replicate_1);
 	}