Browse Source

component_fifo/prio: harmonize

Samuel Thibault 5 years ago
parent
commit
121b5d6e24
2 changed files with 75 additions and 74 deletions
  1. 36 36
      src/sched_policies/component_fifo.c
  2. 39 38
      src/sched_policies/component_prio.c

+ 36 - 36
src/sched_policies/component_fifo.c

@@ -44,8 +44,8 @@ static double fifo_estimated_end(struct starpu_sched_component * component)
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
-	return starpu_sched_component_estimated_end_min_add(component, fifo->exp_len);
+	struct _starpu_fifo_taskq * queue = data->fifo;
+	return starpu_sched_component_estimated_end_min_add(component, queue->exp_len);
 }
 
 static double fifo_estimated_load(struct starpu_sched_component * component)
@@ -53,7 +53,7 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 	STARPU_ASSERT(component && component->data);
 	STARPU_ASSERT(starpu_bitmap_cardinal(component->workers_in_ctx) != 0);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_fifo_taskq * queue = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	double relative_speedup = 0.0;
 	double load = starpu_sched_component_estimated_load(component);
@@ -62,7 +62,7 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 		int first_worker = starpu_bitmap_first(component->workers_in_ctx);
 		relative_speedup = starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(first_worker, component->tree->sched_ctx_id));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += fifo->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		return load;
 	}
@@ -76,7 +76,7 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 		relative_speedup /= starpu_bitmap_cardinal(component->workers_in_ctx);
 		STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += fifo->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	return load;
@@ -87,13 +87,13 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 	STARPU_ASSERT(component && component->data && task);
 	STARPU_ASSERT(starpu_sched_component_can_execute_task(component,task));
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_fifo_taskq * queue = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	int ret = 0;
 	const double now = starpu_timing_now();
 	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 
-	if (data->ntasks_threshold != 0 && fifo->ntasks >= data->ntasks_threshold)
+	if (data->ntasks_threshold != 0 && queue->ntasks >= data->ntasks_threshold)
 	{
 		STARPU_ASSERT(!is_pushback);
 		ret = 1;
@@ -103,9 +103,9 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 	{
 		double exp_len;
 		if(!isnan(task->predicted))
-			exp_len = fifo->exp_len + task->predicted;
+			exp_len = queue->exp_len + task->predicted;
 		else
-			exp_len = fifo->exp_len;
+			exp_len = queue->exp_len;
 
 		if (data->exp_len_threshold != 0.0 && exp_len >= data->exp_len_threshold)
 		{
@@ -134,22 +134,22 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 
 			if(!isnan(task->predicted))
 			{
-				fifo->exp_len = exp_len;
-				fifo->exp_end = fifo->exp_start + fifo->exp_len;
+				queue->exp_len = exp_len;
+				queue->exp_end = queue->exp_start + queue->exp_len;
 			}
-			STARPU_ASSERT(!isnan(fifo->exp_end));
-			STARPU_ASSERT(!isnan(fifo->exp_len));
-			STARPU_ASSERT(!isnan(fifo->exp_start));
+			STARPU_ASSERT(!isnan(queue->exp_end));
+			STARPU_ASSERT(!isnan(queue->exp_len));
+			STARPU_ASSERT(!isnan(queue->exp_start));
 		}
 	}
 
 	if(!ret)
 	{
 		if(is_pushback)
-			ret = _starpu_fifo_push_back_task(fifo,task);
+			ret = _starpu_fifo_push_back_task(queue,task);
 		else
 		{
-			ret = _starpu_fifo_push_task(fifo,task);
+			ret = _starpu_fifo_push_task(queue,task);
 			starpu_sched_component_prefetch_on_node(component, task);
 		}
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
@@ -169,11 +169,11 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * fifo = data->fifo;
+	struct _starpu_fifo_taskq * queue = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	const double now = starpu_timing_now();
 
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(data->fifo))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(queue))
 	{
 		starpu_sched_component_send_can_push_to_parents(component);
 		return NULL;
@@ -182,48 +182,48 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	struct starpu_task * task;
 	if (data->ready && to->properties & STARPU_SCHED_COMPONENT_SINGLE_MEMORY_NODE)
-		task = _starpu_fifo_pop_first_ready_task(fifo, starpu_bitmap_first(to->workers_in_ctx), -1);
+		task = _starpu_fifo_pop_first_ready_task(queue, starpu_bitmap_first(to->workers_in_ctx), -1);
 	else
-		task = _starpu_fifo_pop_task(fifo, starpu_worker_get_id_check());
+		task = _starpu_fifo_pop_task(queue, starpu_worker_get_id_check());
 	if(task && data->exp)
 	{
 		if(!isnan(task->predicted))
 		{
-			const double exp_len = fifo->exp_len - task->predicted;
-			fifo->exp_start = now + task->predicted;
+			const double exp_len = queue->exp_len - task->predicted;
+			queue->exp_start = now + task->predicted;
 			if (exp_len >= 0.0)
 			{
-				fifo->exp_len = exp_len;
+				queue->exp_len = exp_len;
 			}
 			else
 			{
 				/* exp_len can become negative due to rounding errors */
-				fifo->exp_len = 0.0;
+				queue->exp_len = 0.0;
 			}
 		}
 
-		STARPU_ASSERT_MSG(fifo->exp_len>=0, "fifo->exp_len=%lf\n",fifo->exp_len);
+		STARPU_ASSERT_MSG(queue->exp_len>=0, "fifo->exp_len=%lf\n",queue->exp_len);
 		if(!isnan(task->predicted_transfer))
 		{
-			if (fifo->exp_len > task->predicted_transfer)
+			if (queue->exp_len > task->predicted_transfer)
 			{
-				fifo->exp_start += task->predicted_transfer;
-				fifo->exp_len -= task->predicted_transfer;
+				queue->exp_start += task->predicted_transfer;
+				queue->exp_len -= task->predicted_transfer;
 			}
 			else
 			{
-				fifo->exp_start += fifo->exp_len;
-				fifo->exp_len = 0;
+				queue->exp_start += queue->exp_len;
+				queue->exp_len = 0;
 			}
 		}
 
-		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-		if(fifo->ntasks == 0)
-			fifo->exp_len = 0.0;
+		queue->exp_end = queue->exp_start + queue->exp_len;
+		if(queue->ntasks == 0)
+			queue->exp_len = 0.0;
 	}
-	STARPU_ASSERT(!isnan(fifo->exp_end));
-	STARPU_ASSERT(!isnan(fifo->exp_len));
-	STARPU_ASSERT(!isnan(fifo->exp_start));
+	STARPU_ASSERT(!isnan(queue->exp_end));
+	STARPU_ASSERT(!isnan(queue->exp_len));
+	STARPU_ASSERT(!isnan(queue->exp_start));
 	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	// When a pop is called, a can_push is called for pushing tasks onto

+ 39 - 38
src/sched_policies/component_prio.c

@@ -63,8 +63,8 @@ static double prio_estimated_end(struct starpu_sched_component * component)
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
-	return starpu_sched_component_estimated_end_min_add(component, prio->exp_len);
+	struct _starpu_prio_deque * queue = &data->prio;
+	return starpu_sched_component_estimated_end_min_add(component, queue->exp_len);
 }
 
 static double prio_estimated_load(struct starpu_sched_component * component)
@@ -72,7 +72,7 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 	STARPU_ASSERT(component && component->data);
 	STARPU_ASSERT(starpu_bitmap_cardinal(component->workers_in_ctx) != 0);
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
+	struct _starpu_prio_deque * queue = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	double relative_speedup = 0.0;
 	double load = starpu_sched_component_estimated_load(component);
@@ -81,7 +81,7 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 		int first_worker = starpu_bitmap_first(component->workers_in_ctx);
 		relative_speedup = starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(first_worker, component->tree->sched_ctx_id));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += prio->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		return load;
 	}
@@ -95,7 +95,7 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 		relative_speedup /= starpu_bitmap_cardinal(component->workers_in_ctx);
 		STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
 		STARPU_COMPONENT_MUTEX_LOCK(mutex);
-		load += prio->ntasks / relative_speedup;
+		load += queue->ntasks / relative_speedup;
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	return load;
@@ -106,7 +106,7 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 	STARPU_ASSERT(component && component->data && task);
 	STARPU_ASSERT(starpu_sched_component_can_execute_task(component,task));
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
+	struct _starpu_prio_deque * queue = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	int ret = 0;
 	const double now = starpu_timing_now();
@@ -114,7 +114,7 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 
 	double exp_len = NAN;
 
-	if (data->ntasks_threshold != 0 && prio->ntasks >= data->ntasks_threshold)
+	if (data->ntasks_threshold != 0 && queue->ntasks >= data->ntasks_threshold)
 	{
 		STARPU_ASSERT(!is_pushback);
 		ret = 1;
@@ -123,9 +123,9 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 	else if(data->exp)
 	{
 		if(!isnan(task->predicted))
-			exp_len = prio->exp_len + task->predicted;
+			exp_len = queue->exp_len + task->predicted;
 		else
-			exp_len = prio->exp_len;
+			exp_len = queue->exp_len;
 
 		if (data->exp_len_threshold != 0.0 && exp_len >= data->exp_len_threshold)
 		{
@@ -154,24 +154,24 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 
 			if(!isnan(task->predicted))
 			{
-				prio->exp_len = exp_len;
-				prio->exp_end = prio->exp_start + prio->exp_len;
+				queue->exp_len = exp_len;
+				queue->exp_end = queue->exp_start + queue->exp_len;
 			}
-			STARPU_ASSERT(!isnan(prio->exp_end));
-			STARPU_ASSERT(!isnan(prio->exp_len));
-			STARPU_ASSERT(!isnan(prio->exp_start));
+			STARPU_ASSERT(!isnan(queue->exp_end));
+			STARPU_ASSERT(!isnan(queue->exp_len));
+			STARPU_ASSERT(!isnan(queue->exp_start));
 		}
 	}
 
 	if(!ret)
 	{
 		if(is_pushback)
-			ret = _starpu_prio_deque_push_front_task(prio,task);
+			ret = _starpu_prio_deque_push_front_task(queue,task);
 		else
 		{
-			ret = _starpu_prio_deque_push_back_task(prio,task);
+			ret = _starpu_prio_deque_push_back_task(queue,task);
 			starpu_sched_component_prefetch_on_node(component, task);
-			STARPU_TRACE_SCHED_COMPONENT_PUSH_PRIO(component, prio->ntasks, exp_len);
+			STARPU_TRACE_SCHED_COMPONENT_PUSH_PRIO(component, queue->ntasks, exp_len);
 		}
 		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		if(!is_pushback)
@@ -191,11 +191,11 @@ static struct starpu_task * prio_pull_task(struct starpu_sched_component * compo
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_prio_data * data = component->data;
-	struct _starpu_prio_deque * prio = &data->prio;
+	struct _starpu_prio_deque * queue = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	const double now = starpu_timing_now();
 
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(prio))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(queue))
 	{
 		starpu_sched_component_send_can_push_to_parents(component);
 		return NULL;
@@ -204,49 +204,50 @@ static struct starpu_task * prio_pull_task(struct starpu_sched_component * compo
 	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	struct starpu_task * task;
 	if (data->ready && to->properties & STARPU_SCHED_COMPONENT_SINGLE_MEMORY_NODE)
-		task = _starpu_prio_deque_deque_first_ready_task(prio, starpu_bitmap_first(to->workers_in_ctx));
+		task = _starpu_prio_deque_deque_first_ready_task(queue, starpu_bitmap_first(to->workers_in_ctx));
 	else
-		task = _starpu_prio_deque_pop_task(prio);
+		task = _starpu_prio_deque_pop_task(queue);
 	if(task && data->exp)
 	{
 		if(!isnan(task->predicted))
 		{
-			const double exp_len = prio->exp_len - task->predicted;
-			prio->exp_start = now + task->predicted;
+			const double exp_len = queue->exp_len - task->predicted;
+			queue->exp_start = now + task->predicted;
 			if (exp_len >= 0.0)
 			{
-				prio->exp_len = exp_len;
+				queue->exp_len = exp_len;
 			}
 			else
 			{
 				/* exp_len can become negative due to rounding errors */
-				prio->exp_len = 0.0;
+				queue->exp_len = 0.0;
 			}
 		}
-		STARPU_ASSERT_MSG(prio->exp_len>=0, "prio->exp_len=%lf\n",prio->exp_len);
+
+		STARPU_ASSERT_MSG(queue->exp_len>=0, "prio->exp_len=%lf\n",queue->exp_len);
 		if(!isnan(task->predicted_transfer))
 		{
-			if (prio->exp_len > task->predicted_transfer)
+			if (queue->exp_len > task->predicted_transfer)
 			{
-				prio->exp_start += task->predicted_transfer;
-				prio->exp_len -= task->predicted_transfer;
+				queue->exp_start += task->predicted_transfer;
+				queue->exp_len -= task->predicted_transfer;
 			}
 			else
 			{
-				prio->exp_start += prio->exp_len;
-				prio->exp_len = 0;
+				queue->exp_start += queue->exp_len;
+				queue->exp_len = 0;
 			}
 		}
 
-		prio->exp_end = prio->exp_start + prio->exp_len;
-		if(prio->ntasks == 0)
-			prio->exp_len = 0.0;
+		queue->exp_end = queue->exp_start + queue->exp_len;
+		if(queue->ntasks == 0)
+			queue->exp_len = 0.0;
 	}
 	if(task)
-		STARPU_TRACE_SCHED_COMPONENT_POP_PRIO(component, prio->ntasks, prio->exp_len);
-	STARPU_ASSERT(!isnan(prio->exp_end));
-	STARPU_ASSERT(!isnan(prio->exp_len));
-	STARPU_ASSERT(!isnan(prio->exp_start));
+		STARPU_TRACE_SCHED_COMPONENT_POP_PRIO(component, queue->ntasks, queue->exp_len);
+	STARPU_ASSERT(!isnan(queue->exp_end));
+	STARPU_ASSERT(!isnan(queue->exp_len));
+	STARPU_ASSERT(!isnan(queue->exp_start));
 	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	// When a pop is called, a can_push is called for pushing tasks onto