瀏覽代碼

Merge branch 'master' of git+ssh://scm.gforge.inria.fr/gitroot/starpu/starpu

Nathalie Furmento 5 年之前
父節點
當前提交
fa2d9f9a98
共有 3 個文件被更改,包括 185 次插入111 次删除
  1. 94 34
      src/common/fxt.h
  2. 43 37
      src/sched_policies/component_fifo.c
  3. 48 40
      src/sched_policies/component_prio.c

+ 94 - 34
src/common/fxt.h

@@ -327,15 +327,11 @@ void _starpu_fxt_dump_file(void);
 #define _STARPU_FUT_COMMIT(size) do { } while (0)
 #endif
 
-#ifdef FUT_FULL_PROBE1STR
-#define _STARPU_FUT_FULL_PROBE1STR(KEYMASK, CODE, P1, str) FUT_FULL_PROBE1STR(CODE, P1, str)
+#ifdef FUT_ALWAYS_PROBE1STR
+#define _STARPU_FUT_ALWAYS_PROBE1STR(CODE, P1, str) FUT_RAW_ALWAYS_PROBE1STR(CODE, P1, str)
 #else
-/** Sometimes we need something a little more specific than the wrappers from
- * FxT: these macro permit to put add an event with 3 (or 4) numbers followed
- * by a string. */
-#define _STARPU_FUT_FULL_PROBE1STR(KEYMASK, CODE, P1, str)			\
+#define _STARPU_FUT_ALWAYS_PROBE1STR(CODE, P1, str)	\
 do {									\
-    if(KEYMASK & fut_active) {							\
 	/* No more than FXT_MAX_PARAMS args are allowed */		\
 	/* we add a \0 just in case ... */				\
 	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 1)*sizeof(unsigned long));\
@@ -348,19 +344,28 @@ do {									\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
 	_STARPU_FUT_COMMIT(total_len);					\
-    }									\
 } while (0);
 #endif
 
-#ifdef FUT_FULL_PROBE2STR
-#define _STARPU_FUT_FULL_PROBE2STR(KEYMASK, CODE, P1, P2, str) FUT_FULL_PROBE2STR(CODE, P1, P2, str)
+#ifdef FUT_FULL_PROBE1STR
+#define _STARPU_FUT_FULL_PROBE1STR(KEYMASK, CODE, P1, str) FUT_FULL_PROBE1STR(CODE, P1, str)
 #else
 /** Sometimes we need something a little more specific than the wrappers from
  * FxT: these macro permit to put add an event with 3 (or 4) numbers followed
  * by a string. */
-#define _STARPU_FUT_FULL_PROBE2STR(KEYMASK, CODE, P1, P2, str)			\
+#define _STARPU_FUT_FULL_PROBE1STR(KEYMASK, CODE, P1, str)		\
+do {									\
+    if(KEYMASK & fut_active) {						\
+	_STARPU_FUT_ALWAYS_PROBE1STR(CODE, P1, str);		\
+    }									\
+} while (0);
+#endif
+
+#ifdef FUT_ALWAYS_PROBE2STR
+#define _STARPU_FUT_ALWAYS_PROBE2STR(CODE, P1, P2, str) FUT_RAW_ALWAYS_PROBE2STR(CODE, P1, P2, str)
+#else
+#define _STARPU_FUT_ALWAYS_PROBE2STR(CODE, P1, P2, str)			\
 do {									\
-    if(KEYMASK & fut_active) {							\
 	/* No more than FXT_MAX_PARAMS args are allowed */		\
 	/* we add a \0 just in case ... */				\
 	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 2)*sizeof(unsigned long));\
@@ -374,16 +379,25 @@ do {									\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
 	_STARPU_FUT_COMMIT(total_len);					\
+} while (0);
+#endif
+
+#ifdef FUT_FULL_PROBE2STR
+#define _STARPU_FUT_FULL_PROBE2STR(KEYMASK, CODE, P1, P2, str) FUT_FULL_PROBE2STR(CODE, P1, P2, str)
+#else
+#define _STARPU_FUT_FULL_PROBE2STR(KEYMASK, CODE, P1, P2, str)		\
+do {									\
+    if(KEYMASK & fut_active) {						\
+	_STARPU_FUT_ALWAYS_PROBE2STR(CODE, P1, P2, str);		\
     }									\
 } while (0);
 #endif
 
-#ifdef FUT_FULL_PROBE3STR
-#define _STARPU_FUT_FULL_PROBE3STR(KEYMASK, CODE, P1, P2, P3, str) FUT_FULL_PROBE3STR(CODE, P1, P2, P3, str)
+#ifdef FUT_ALWAYS_PROBE3STR
+#define _STARPU_FUT_ALWAYS_PROBE3STR(CODE, P1, P2, P3, str) FUT_RAW_ALWAYS_PROBE3STR(CODE, P1, P2, P3, str)
 #else
-#define _STARPU_FUT_FULL_PROBE3STR(KEYMASK, CODE, P1, P2, P3, str)			\
+#define _STARPU_FUT_ALWAYS_PROBE3STR(CODE, P1, P2, P3, str)			\
 do {									\
-    if(KEYMASK & fut_active) {							\
 	/* No more than FXT_MAX_PARAMS args are allowed */		\
 	/* we add a \0 just in case ... */				\
 	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 3)*sizeof(unsigned long));\
@@ -398,16 +412,25 @@ do {									\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
 	_STARPU_FUT_COMMIT(total_len);					\
+} while (0);
+#endif
+
+#ifdef FUT_FULL_PROBE3STR
+#define _STARPU_FUT_FULL_PROBE3STR(KEYMASK, CODE, P1, P2, P3, str) FUT_FULL_PROBE3STR(CODE, P1, P2, P3, str)
+#else
+#define _STARPU_FUT_FULL_PROBE3STR(KEYMASK, CODE, P1, P2, P3, str)		\
+do {									\
+    if(KEYMASK & fut_active) {						\
+	_STARPU_FUT_ALWAYS_PROBE3STR(CODE, P1, P2, P3, str);	\
     }									\
 } while (0);
 #endif
 
-#ifdef FUT_FULL_PROBE4STR
-#define _STARPU_FUT_FULL_PROBE4STR(KEYMASK, CODE, P1, P2, P3, P4, str) FUT_FULL_PROBE4STR(CODE, P1, P2, P3, P4, str)
+#ifdef FUT_ALWAYS_PROBE4STR
+#define _STARPU_FUT_ALWAYS_PROBE4STR(CODE, P1, P2, P3, P4, str) FUT_RAW_ALWAYS_PROBE4STR(CODE, P1, P2, P3, P4, str)
 #else
-#define _STARPU_FUT_FULL_PROBE4STR(KEYMASK, CODE, P1, P2, P3, P4, str)		\
+#define _STARPU_FUT_ALWAYS_PROBE4STR(CODE, P1, P2, P3, P4, str)		\
 do {									\
-    if(KEYMASK & fut_active) {							\
 	/* No more than FXT_MAX_PARAMS args are allowed */		\
 	/* we add a \0 just in case ... */				\
 	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 4)*sizeof(unsigned long));\
@@ -423,16 +446,25 @@ do {									\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
 	_STARPU_FUT_COMMIT(total_len);					\
+} while (0);
+#endif
+
+#ifdef FUT_FULL_PROBE4STR
+#define _STARPU_FUT_FULL_PROBE4STR(KEYMASK, CODE, P1, P2, P3, P4, str) FUT_FULL_PROBE4STR(CODE, P1, P2, P3, P4, str)
+#else
+#define _STARPU_FUT_FULL_PROBE4STR(KEYMASK, CODE, P1, P2, P3, P4, str)		\
+do {									\
+    if(KEYMASK & fut_active) {						\
+	_STARPU_FUT_ALWAYS_PROBE4STR(CODE, P1, P2, P3, P4, str);	\
     }									\
 } while (0);
 #endif
 
-#ifdef FUT_FULL_PROBE5STR
-#define _STARPU_FUT_FULL_PROBE5STR(KEYMASK, CODE, P1, P2, P3, P4, P5, str) FUT_FULL_PROBE5STR(CODE, P1, P2, P3, P4, P5, str)
+#ifdef FUT_ALWAYS_PROBE5STR
+#define _STARPU_FUT_ALWAYS_PROBE5STR(CODE, P1, P2, P3, P4, P5, str) FUT_RAW_ALWAYS_PROBE5STR(CODE, P1, P2, P3, P4, P5, str)
 #else
-#define _STARPU_FUT_FULL_PROBE5STR(KEYMASK, CODE, P1, P2, P3, P4, P5, str)		\
+#define _STARPU_FUT_ALWAYS_PROBE5STR(CODE, P1, P2, P3, P4, P5, str)		\
 do {									\
-    if(KEYMASK & fut_active) {							\
 	/* No more than FXT_MAX_PARAMS args are allowed */		\
 	/* we add a \0 just in case ... */				\
 	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 5)*sizeof(unsigned long));\
@@ -449,16 +481,25 @@ do {									\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
 	_STARPU_FUT_COMMIT(total_len);					\
+} while (0);
+#endif
+
+#ifdef FUT_FULL_PROBE5STR
+#define _STARPU_FUT_FULL_PROBE5STR(KEYMASK, CODE, P1, P2, P3, P4, P5, str) FUT_FULL_PROBE5STR(CODE, P1, P2, P3, P4, P5, str)
+#else
+#define _STARPU_FUT_FULL_PROBE5STR(KEYMASK, CODE, P1, P2, P3, P4, P5, str)		\
+do {									\
+    if(KEYMASK & fut_active) {						\
+	_STARPU_FUT_ALWAYS_PROBE5STR(CODE, P1, P2, P3, P4, P5, str);	\
     }									\
 } while (0);
 #endif
 
-#ifdef FUT_FULL_PROBE6STR
-#define _STARPU_FUT_FULL_PROBE6STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, str) FUT_FULL_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str)
+#ifdef FUT_ALWAYS_PROBE6STR
+#define _STARPU_FUT_ALWAYS_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str) FUT_RAW_ALWAYS_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str)
 #else
-#define _STARPU_FUT_FULL_PROBE6STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, str)	\
+#define _STARPU_FUT_ALWAYS_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str)	\
 do {									\
-    if(KEYMASK & fut_active) {							\
 	/* No more than FXT_MAX_PARAMS args are allowed */		\
 	/* we add a \0 just in case ... */				\
 	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 6)*sizeof(unsigned long));\
@@ -476,16 +517,25 @@ do {									\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
 	_STARPU_FUT_COMMIT(total_len);					\
+} while (0);
+#endif
+
+#ifdef FUT_FULL_PROBE6STR
+#define _STARPU_FUT_FULL_PROBE6STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, str) FUT_FULL_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str)
+#else
+#define _STARPU_FUT_FULL_PROBE6STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, str)		\
+do {									\
+    if(KEYMASK & fut_active) {						\
+	_STARPU_FUT_ALWAYS_PROBE6STR(CODE, P1, P2, P3, P4, P5, P6, str);	\
     }									\
 } while (0);
 #endif
 
-#ifdef FUT_FULL_PROBE7STR
-#define _STARPU_FUT_FULL_PROBE7STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, P7, str) FUT_FULL_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)
+#ifdef FUT_ALWAYS_PROBE7STR
+#define _STARPU_FUT_ALWAYS_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str) FUT_RAW_ALWAYS_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)
 #else
-#define _STARPU_FUT_FULL_PROBE7STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, P7, str)	\
+#define _STARPU_FUT_ALWAYS_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)	\
 do {									\
-    if(KEYMASK & fut_active) {							\
 	/* No more than FXT_MAX_PARAMS args are allowed */		\
 	/* we add a \0 just in case ... */				\
 	size_t len = STARPU_MIN(strlen(str)+1, (FXT_MAX_PARAMS - 7)*sizeof(unsigned long));\
@@ -504,6 +554,16 @@ do {									\
 	snprintf((char *)futargs, len, "%s", str);			\
 	((char *)futargs)[len - 1] = '\0';				\
 	_STARPU_FUT_COMMIT(total_len);					\
+} while (0);
+#endif
+
+#ifdef FUT_FULL_PROBE7STR
+#define _STARPU_FUT_FULL_PROBE7STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, P7, str) FUT_FULL_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str)
+#else
+#define _STARPU_FUT_FULL_PROBE7STR(KEYMASK, CODE, P1, P2, P3, P4, P5, P6, P7, str)		\
+do {									\
+    if(KEYMASK & fut_active) {						\
+	_STARPU_FUT_ALWAYS_PROBE7STR(CODE, P1, P2, P3, P4, P5, P6, P7, str);	\
     }									\
 } while (0);
 #endif
@@ -1204,10 +1264,10 @@ do {										\
 	FUT_FULL_PROBE4(_STARPU_FUT_KEYMASK_SCHED, _STARPU_FUT_SCHED_COMPONENT_POP_PRIO, _starpu_gettid(), workerid, ntasks, exp_len);
 
 #define _STARPU_TRACE_SCHED_COMPONENT_NEW(component)		\
-	_STARPU_FUT_FULL_PROBE1STR(_STARPU_FUT_KEYMASK_SCHED, _STARPU_FUT_SCHED_COMPONENT_NEW, component, (component)->name);
+	_STARPU_FUT_ALWAYS_PROBE1STR(_STARPU_FUT_SCHED_COMPONENT_NEW, component, (component)->name);
 
 #define _STARPU_TRACE_SCHED_COMPONENT_CONNECT(parent, child)		\
-	FUT_FULL_PROBE2(_STARPU_FUT_KEYMASK_SCHED, _STARPU_FUT_SCHED_COMPONENT_CONNECT, parent, child);
+	FUT_RAW_ALWAYS_PROBE2(FUT_CODE(_STARPU_FUT_SCHED_COMPONENT_CONNECT,2), parent, child);
 
 #define _STARPU_TRACE_SCHED_COMPONENT_PUSH(from, to, task)		\
 	FUT_FULL_PROBE5(_STARPU_FUT_KEYMASK_SCHED, _STARPU_FUT_SCHED_COMPONENT_PUSH, _starpu_gettid(), from, to, task, (task)->priority);

+ 43 - 37
src/sched_policies/component_fifo.c

@@ -44,8 +44,8 @@ static double fifo_estimated_end(struct starpu_sched_component * component)
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
-	return starpu_sched_component_estimated_end_min_add(component, fifo->exp_len);
+	struct _starpu_fifo_taskq * queue = data->fifo;
+	return starpu_sched_component_estimated_end_min_add(component, queue->exp_len);
 }
 
 static double fifo_estimated_load(struct starpu_sched_component * component)
@@ -53,7 +53,7 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 	STARPU_ASSERT(component && component->data);
 	STARPU_ASSERT(starpu_bitmap_cardinal(component->workers_in_ctx) != 0);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_fifo_taskq * queue = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	double relative_speedup = 0.0;
 	double load = starpu_sched_component_estimated_load(component);
@@ -62,7 +62,7 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 		int first_worker = starpu_bitmap_first(component->workers_in_ctx);
 		relative_speedup = starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(first_worker, component->tree->sched_ctx_id));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += fifo->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		return load;
 	}
@@ -76,7 +76,7 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 		relative_speedup /= starpu_bitmap_cardinal(component->workers_in_ctx);
 		STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += fifo->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	return load;
@@ -87,21 +87,27 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 	STARPU_ASSERT(component && component->data && task);
 	STARPU_ASSERT(starpu_sched_component_can_execute_task(component,task));
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_fifo_taskq * queue = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	int ret = 0;
 	const double now = starpu_timing_now();
 	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 
-	if(data->exp)
+	if (data->ntasks_threshold != 0 && queue->ntasks >= data->ntasks_threshold)
+	{
+		STARPU_ASSERT(!is_pushback);
+		ret = 1;
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+	}
+	else if(data->exp)
 	{
 		double exp_len;
 		if(!isnan(task->predicted))
-			exp_len = fifo->exp_len + task->predicted;
+			exp_len = queue->exp_len + task->predicted;
 		else
-			exp_len = fifo->exp_len;
+			exp_len = queue->exp_len;
 
-		if ((data->ntasks_threshold != 0 && fifo->ntasks >= data->ntasks_threshold) || (data->exp_len_threshold != 0.0 && exp_len >= data->exp_len_threshold))
+		if (data->exp_len_threshold != 0.0 && exp_len >= data->exp_len_threshold)
 		{
 			static int warned;
 			if(data->exp_len_threshold != 0.0 && task->predicted > data->exp_len_threshold && !warned)
@@ -128,22 +134,22 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 
 			if(!isnan(task->predicted))
 			{
-				fifo->exp_len = exp_len;
-				fifo->exp_end = fifo->exp_start + fifo->exp_len;
+				queue->exp_len = exp_len;
+				queue->exp_end = queue->exp_start + queue->exp_len;
 			}
-			STARPU_ASSERT(!isnan(fifo->exp_end));
-			STARPU_ASSERT(!isnan(fifo->exp_len));
-			STARPU_ASSERT(!isnan(fifo->exp_start));
+			STARPU_ASSERT(!isnan(queue->exp_end));
+			STARPU_ASSERT(!isnan(queue->exp_len));
+			STARPU_ASSERT(!isnan(queue->exp_start));
 		}
 	}
 
 	if(!ret)
 	{
 		if(is_pushback)
-			ret = _starpu_fifo_push_back_task(fifo,task);
+			ret = _starpu_fifo_push_back_task(queue,task);
 		else
 		{
-			ret = _starpu_fifo_push_task(fifo,task);
+			ret = _starpu_fifo_push_task(queue,task);
 			starpu_sched_component_prefetch_on_node(component, task);
 		}
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
@@ -163,11 +169,11 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_fifo_taskq * queue = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	const double now = starpu_timing_now();
 
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(data->fifo))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(queue))
 	{
 		starpu_sched_component_send_can_push_to_parents(component);
 		return NULL;
@@ -176,48 +182,48 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	struct starpu_task * task;
 	if (data->ready && to->properties & STARPU_SCHED_COMPONENT_SINGLE_MEMORY_NODE)
-		task = _starpu_fifo_pop_first_ready_task(fifo, starpu_bitmap_first(to->workers_in_ctx), -1);
+		task = _starpu_fifo_pop_first_ready_task(queue, starpu_bitmap_first(to->workers_in_ctx), -1);
 	else
-		task = _starpu_fifo_pop_task(fifo, starpu_worker_get_id_check());
+		task = _starpu_fifo_pop_task(queue, starpu_worker_get_id_check());
 	if(task && data->exp)
 	{
 		if(!isnan(task->predicted))
 		{
-			const double exp_len = fifo->exp_len - task->predicted;
-			fifo->exp_start = now + task->predicted;
+			const double exp_len = queue->exp_len - task->predicted;
+			queue->exp_start = now + task->predicted;
 			if (exp_len >= 0.0)
 			{
-				fifo->exp_len = exp_len;
+				queue->exp_len = exp_len;
 			}
 			else
 			{
 				/* exp_len can become negative due to rounding errors */
-				fifo->exp_len = 0.0;
+				queue->exp_len = 0.0;
 			}
 		}
 
-		STARPU_ASSERT_MSG(fifo->exp_len>=0, "fifo->exp_len=%lf\n",fifo->exp_len);
+		STARPU_ASSERT_MSG(queue->exp_len>=0, "fifo->exp_len=%lf\n",queue->exp_len);
 		if(!isnan(task->predicted_transfer))
 		{
-			if (fifo->exp_len > task->predicted_transfer)
+			if (queue->exp_len > task->predicted_transfer)
 			{
-				fifo->exp_start += task->predicted_transfer;
-				fifo->exp_len -= task->predicted_transfer;
+				queue->exp_start += task->predicted_transfer;
+				queue->exp_len -= task->predicted_transfer;
 			}
 			else
 			{
-				fifo->exp_start += fifo->exp_len;
-				fifo->exp_len = 0;
+				queue->exp_start += queue->exp_len;
+				queue->exp_len = 0;
 			}
 		}
 
-		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-		if(fifo->ntasks == 0)
-			fifo->exp_len = 0.0;
+		queue->exp_end = queue->exp_start + queue->exp_len;
+		if(queue->ntasks == 0)
+			queue->exp_len = 0.0;
 	}
-	STARPU_ASSERT(!isnan(fifo->exp_end));
-	STARPU_ASSERT(!isnan(fifo->exp_len));
-	STARPU_ASSERT(!isnan(fifo->exp_start));
+	STARPU_ASSERT(!isnan(queue->exp_end));
+	STARPU_ASSERT(!isnan(queue->exp_len));
+	STARPU_ASSERT(!isnan(queue->exp_start));
 	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	// When a pop is called, a can_push is called for pushing tasks onto

+ 48 - 40
src/sched_policies/component_prio.c

@@ -63,8 +63,8 @@ static double prio_estimated_end(struct starpu_sched_component * component)
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
-	return starpu_sched_component_estimated_end_min_add(component, prio->exp_len);
+	struct _starpu_prio_deque * queue = &data->prio;
+	return starpu_sched_component_estimated_end_min_add(component, queue->exp_len);
 }
 
 static double prio_estimated_load(struct starpu_sched_component * component)
@@ -72,7 +72,7 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 	STARPU_ASSERT(component && component->data);
 	STARPU_ASSERT(starpu_bitmap_cardinal(component->workers_in_ctx) != 0);
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
+	struct _starpu_prio_deque * queue = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	double relative_speedup = 0.0;
 	double load = starpu_sched_component_estimated_load(component);
@@ -81,7 +81,7 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 		int first_worker = starpu_bitmap_first(component->workers_in_ctx);
 		relative_speedup = starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(first_worker, component->tree->sched_ctx_id));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += prio->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		return load;
 	}
@@ -95,7 +95,7 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 		relative_speedup /= starpu_bitmap_cardinal(component->workers_in_ctx);
 		STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += prio->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	return load;
@@ -106,21 +106,28 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 	STARPU_ASSERT(component && component->data && task);
 	STARPU_ASSERT(starpu_sched_component_can_execute_task(component,task));
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
+	struct _starpu_prio_deque * queue = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	int ret = 0;
 	const double now = starpu_timing_now();
 	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 
 	double exp_len = NAN;
-	if(data->exp)
+
+	if (data->ntasks_threshold != 0 && queue->ntasks >= data->ntasks_threshold)
+	{
+		STARPU_ASSERT(!is_pushback);
+		ret = 1;
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
+	}
+	else if(data->exp)
 	{
 		if(!isnan(task->predicted))
-			exp_len = prio->exp_len + task->predicted;
+			exp_len = queue->exp_len + task->predicted;
 		else
-			exp_len = prio->exp_len;
+			exp_len = queue->exp_len;
 
-		if ((data->ntasks_threshold != 0 && prio->ntasks >= data->ntasks_threshold) || (data->exp_len_threshold != 0.0 && exp_len >= data->exp_len_threshold))
+		if (data->exp_len_threshold != 0.0 && exp_len >= data->exp_len_threshold)
 		{
 			static int warned;
 			if(data->exp_len_threshold != 0.0 && task->predicted > data->exp_len_threshold && !warned)
@@ -128,12 +135,12 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 				_STARPU_DISP("Warning : a predicted task length (%lf) exceeds the expected length threshold (%lf) of a prio component queue, you should reconsider the value of this threshold. This message will not be printed again for further thresholds exceeding.\n",task->predicted,data->exp_len_threshold);
 				warned = 1;
 			}
+			STARPU_ASSERT(!is_pushback);
 			ret = 1;
 			STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		}
 		else
 		{
-
 			if(!isnan(task->predicted_transfer))
 			{
 				double end = prio_estimated_end(component);
@@ -147,24 +154,24 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 
 			if(!isnan(task->predicted))
 			{
-				prio->exp_len = exp_len;
-				prio->exp_end = prio->exp_start + prio->exp_len;
+				queue->exp_len = exp_len;
+				queue->exp_end = queue->exp_start + queue->exp_len;
 			}
-			STARPU_ASSERT(!isnan(prio->exp_end));
-			STARPU_ASSERT(!isnan(prio->exp_len));
-			STARPU_ASSERT(!isnan(prio->exp_start));
+			STARPU_ASSERT(!isnan(queue->exp_end));
+			STARPU_ASSERT(!isnan(queue->exp_len));
+			STARPU_ASSERT(!isnan(queue->exp_start));
 		}
 	}
 
 	if(!ret)
 	{
 		if(is_pushback)
-			ret = _starpu_prio_deque_push_front_task(prio,task);
+			ret = _starpu_prio_deque_push_front_task(queue,task);
 		else
 		{
-			ret = _starpu_prio_deque_push_back_task(prio,task);
+			ret = _starpu_prio_deque_push_back_task(queue,task);
 			starpu_sched_component_prefetch_on_node(component, task);
-			STARPU_TRACE_SCHED_COMPONENT_PUSH_PRIO(component, prio->ntasks, exp_len);
+			STARPU_TRACE_SCHED_COMPONENT_PUSH_PRIO(component, queue->ntasks, exp_len);
 		}
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		if(!is_pushback)
@@ -184,11 +191,11 @@ static struct starpu_task * prio_pull_task(struct starpu_sched_component * compo
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
+	struct _starpu_prio_deque * queue = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	const double now = starpu_timing_now();
 
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(prio))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(queue))
 	{
 		starpu_sched_component_send_can_push_to_parents(component);
 		return NULL;
@@ -197,49 +204,50 @@ static struct starpu_task * prio_pull_task(struct starpu_sched_component * compo
 	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	struct starpu_task * task;
 	if (data->ready && to->properties & STARPU_SCHED_COMPONENT_SINGLE_MEMORY_NODE)
-		task = _starpu_prio_deque_deque_first_ready_task(prio, starpu_bitmap_first(to->workers_in_ctx));
+		task = _starpu_prio_deque_deque_first_ready_task(queue, starpu_bitmap_first(to->workers_in_ctx));
 	else
-		task = _starpu_prio_deque_pop_task(prio);
+		task = _starpu_prio_deque_pop_task(queue);
 	if(task && data->exp)
 	{
 		if(!isnan(task->predicted))
 		{
-			const double exp_len = prio->exp_len - task->predicted;
-			prio->exp_start = now + task->predicted;
+			const double exp_len = queue->exp_len - task->predicted;
+			queue->exp_start = now + task->predicted;
 			if (exp_len >= 0.0)
 			{
-				prio->exp_len = exp_len;
+				queue->exp_len = exp_len;
 			}
 			else
 			{
 				/* exp_len can become negative due to rounding errors */
-				prio->exp_len = 0.0;
+				queue->exp_len = 0.0;
 			}
 		}
-		STARPU_ASSERT_MSG(prio->exp_len>=0, "prio->exp_len=%lf\n",prio->exp_len);
+
+		STARPU_ASSERT_MSG(queue->exp_len>=0, "prio->exp_len=%lf\n",queue->exp_len);
 		if(!isnan(task->predicted_transfer))
 		{
-			if (prio->exp_len > task->predicted_transfer)
+			if (queue->exp_len > task->predicted_transfer)
 			{
-				prio->exp_start += task->predicted_transfer;
-				prio->exp_len -= task->predicted_transfer;
+				queue->exp_start += task->predicted_transfer;
+				queue->exp_len -= task->predicted_transfer;
 			}
 			else
 			{
-				prio->exp_start += prio->exp_len;
-				prio->exp_len = 0;
+				queue->exp_start += queue->exp_len;
+				queue->exp_len = 0;
 			}
 		}
 
-		prio->exp_end = prio->exp_start + prio->exp_len;
-		if(prio->ntasks == 0)
-			prio->exp_len = 0.0;
+		queue->exp_end = queue->exp_start + queue->exp_len;
+		if(queue->ntasks == 0)
+			queue->exp_len = 0.0;
 	}
 	if(task)
-		STARPU_TRACE_SCHED_COMPONENT_POP_PRIO(component, prio->ntasks, prio->exp_len);
-	STARPU_ASSERT(!isnan(prio->exp_end));
-	STARPU_ASSERT(!isnan(prio->exp_len));
-	STARPU_ASSERT(!isnan(prio->exp_start));
+		STARPU_TRACE_SCHED_COMPONENT_POP_PRIO(component, queue->ntasks, queue->exp_len);
+	STARPU_ASSERT(!isnan(queue->exp_end));
+	STARPU_ASSERT(!isnan(queue->exp_len));
+	STARPU_ASSERT(!isnan(queue->exp_start));
 	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	// When a pop is called, a can_push is called for pushing tasks onto