Browse Source

factorize fifo estimation movements

Samuel Thibault 8 years ago
parent
commit
93f112aabd
1 changed files with 62 additions and 75 deletions
  1. 62 75
      src/sched_policies/deque_modeling_policy_data_aware.c

+ 62 - 75
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -125,6 +125,63 @@ static int _normalize_prio(int priority, int num_priorities, unsigned sched_ctx_
 	return ((num_priorities-1)/(max-min)) * (priority - min);
 }
 
+/* This is called when a transfer request is actually pushed to the worker */
+static void _starpu_fifo_task_transfer_started(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities)
+{
+	double transfer_model = task->predicted_transfer;
+	if (isnan(transfer_model))
+		return;
+
+	/* We now start the transfer, move it from predicted to pipelined */
+	fifo->exp_len -= transfer_model;
+	fifo->pipeline_len += transfer_model;
+	fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
+	fifo->exp_end = fifo->exp_start + fifo->exp_len;
+	if(num_priorities != -1)
+	{
+		int i;
+		int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
+		for(i = 0; i <= task_prio; i++)
+			fifo->exp_len_per_priority[i] -= transfer_model;
+	}
+}
+
+/* This is called when a task is actually pushed to the worker (i.e. the transfer finished */
+static void _starpu_fifo_task_started(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities)
+{
+	double model = task->predicted;
+	double transfer_model = task->predicted_transfer;
+	if(!isnan(transfer_model))
+		/* The transfer is over, remove it from pipelined */
+		fifo->pipeline_len -= transfer_model;
+
+	if(!isnan(model))
+	{
+		/* We now start the computation, move it from predicted to pipelined */
+		fifo->exp_len -= model;
+		fifo->pipeline_len += model;
+		fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
+                fifo->exp_end= fifo->exp_start + fifo->exp_len;
+		if(num_priorities != -1)
+		{
+			int i;
+			int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
+			for(i = 0; i <= task_prio; i++)
+				fifo->exp_len_per_priority[i] -= model;
+		}
+	}
+}
+
+/* This is called when a task is actually finished */
+static void _starpu_fifo_task_finished(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities STARPU_ATTRIBUTE_UNUSED)
+{
+	if(!isnan(task->predicted))
+		/* The execution is over, remove it from pipelined */
+		fifo->pipeline_len -= task->predicted;
+}
+
+
+
 static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node, int num_priorities)
 {
 	struct starpu_task *task = NULL, *current;
@@ -197,22 +254,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 	task = _starpu_fifo_pop_first_ready_task(fifo, node, dt->num_priorities);
 	if (task)
 	{
-		double transfer_model = task->predicted_transfer;
-		if(!isnan(transfer_model))
-		{
-			/* We now start the transfer, move it from predicted to pipelined */
-			fifo->exp_len -= transfer_model;
-			fifo->pipeline_len += transfer_model;
-			fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
-			fifo->exp_end = fifo->exp_start + fifo->exp_len;
-			if(dt->num_priorities != -1)
-			{
-				int i;
-				int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
-				for(i = 0; i <= task_prio; i++)
-					fifo->exp_len_per_priority[i] -= transfer_model;
-			}
-		}
+		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
 
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 
@@ -248,22 +290,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 	task = _starpu_fifo_pop_local_task(fifo);
 	if (task)
 	{
-		double transfer_model = task->predicted_transfer;
-		if(!isnan(transfer_model))
-		{
-			/* We now start the transfer, move it from predicted to pipelined */
-			fifo->exp_len -= transfer_model;
-			fifo->pipeline_len += transfer_model;
-			fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
-			fifo->exp_end = fifo->exp_start + fifo->exp_len;
-			if(dt->num_priorities != -1)
-			{
-				int i;
-				int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
-				for(i = 0; i <= task_prio; i++)
-					fifo->exp_len_per_priority[i] -= transfer_model;
-			}
-		}
+		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
 
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 		  
@@ -304,25 +331,7 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
 
 	for (task = new_list; task; task = task->next)
-	{
-		double transfer_model = task->predicted_transfer;
-		if(!isnan(transfer_model))
-		{
-			/* We now start the transfer, move it from predicted to pipelined */
-			fifo->exp_len -= transfer_model;
-			fifo->pipeline_len += transfer_model;
-			fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
-			fifo->exp_end = fifo->exp_start + fifo->exp_len;
-			if(dt->num_priorities != -1)
-			{
-				int i;
-				int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
-				for(i = 0; i < task_prio; i++)
-					fifo->exp_len_per_priority[i] -= transfer_model;
-			}
-		
-		}
-	}
+		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
 
 	return new_list;
 }
@@ -1092,8 +1101,6 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
-	double model = task->predicted;
-	double transfer_model = task->predicted_transfer;
 
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_cond_t *sched_cond;
@@ -1103,29 +1110,11 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 	 * of work. */
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 
-	if(!isnan(transfer_model))
-		/* The transfer is over, remove it from pipelined */
-		fifo->pipeline_len -= transfer_model;
+	_starpu_fifo_task_started(fifo, task, dt->num_priorities);
 
 	/* Take the opportunity to update start time */
 	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start) + fifo->pipeline_len;
 
-	if(!isnan(model))
-	{
-		/* We now start the computation, move it from predicted to pipelined */
-		fifo->exp_len -= model;
-		fifo->pipeline_len += model;
-		fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
-                fifo->exp_end= fifo->exp_start + fifo->exp_len;
-		if(dt->num_priorities != -1)
-		{
-			int i;
-			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
-			for(i = 0; i <= task_prio; i++)
-				fifo->exp_len_per_priority[i] -= model;
-		}
-	}
-
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 }
 
@@ -1218,9 +1207,7 @@ static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id
 	starpu_pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
-	if(!isnan(task->predicted))
-		/* The execution is over, remove it from pipelined */
-		fifo->pipeline_len -= task->predicted;
+	_starpu_fifo_task_finished(fifo, task, dt->num_priorities);
 	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start) + fifo->pipeline_len;
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);