浏览代码

simplify computing pipelined length and clean a bit

Samuel Thibault 8 年之前
父节点
当前提交
f47836973e

+ 38 - 47
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -192,18 +192,18 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 	unsigned node = starpu_worker_get_memory_node(workerid);
 	unsigned node = starpu_worker_get_memory_node(workerid);
 
 
 	/* Take the opportunity to update start time */
 	/* Take the opportunity to update start time */
-	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
+	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start) + fifo->pipeline_len;
 
 
 	task = _starpu_fifo_pop_first_ready_task(fifo, node, dt->num_priorities);
 	task = _starpu_fifo_pop_first_ready_task(fifo, node, dt->num_priorities);
 	if (task)
 	if (task)
 	{
 	{
-		/* We now start the transfer, get rid of it in the completion
-		 * prediction */
 		double transfer_model = task->predicted_transfer;
 		double transfer_model = task->predicted_transfer;
-		if(!isnan(transfer_model)) 
+		if(!isnan(transfer_model))
 		{
 		{
+			/* We now start the transfer, move it from predicted to pipelined */
 			fifo->exp_len -= transfer_model;
 			fifo->exp_len -= transfer_model;
-			fifo->exp_start = starpu_timing_now() + transfer_model;
+			fifo->pipeline_len += transfer_model;
+			fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
 			fifo->exp_end = fifo->exp_start + fifo->exp_len;
 			fifo->exp_end = fifo->exp_start + fifo->exp_len;
 			if(dt->num_priorities != -1)
 			if(dt->num_priorities != -1)
 			{
 			{
@@ -212,9 +212,6 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 				for(i = 0; i <= task_prio; i++)
 				for(i = 0; i <= task_prio; i++)
 					fifo->exp_len_per_priority[i] -= transfer_model;
 					fifo->exp_len_per_priority[i] -= transfer_model;
 			}
 			}
-
-			fifo->pipeline_len += task->predicted + transfer_model;
-			fifo->pipelined_tasks++;
 		}
 		}
 
 
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
@@ -244,7 +241,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 
 
 	/* Take the opportunity to update start time */
 	/* Take the opportunity to update start time */
-	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
+	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start) + fifo->pipeline_len;
 
 
 	STARPU_ASSERT_MSG(fifo, "worker %u does not belong to ctx %u anymore.\n", workerid, sched_ctx_id);
 	STARPU_ASSERT_MSG(fifo, "worker %u does not belong to ctx %u anymore.\n", workerid, sched_ctx_id);
 
 
@@ -252,14 +249,12 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 	if (task)
 	if (task)
 	{
 	{
 		double transfer_model = task->predicted_transfer;
 		double transfer_model = task->predicted_transfer;
-		/* We now start the transfer, get rid of it in the completion
-		 * prediction */
-
-		if(!isnan(transfer_model)) 
+		if(!isnan(transfer_model))
 		{
 		{
-			double model = task->predicted;
+			/* We now start the transfer, move it from predicted to pipelined */
 			fifo->exp_len -= transfer_model;
 			fifo->exp_len -= transfer_model;
-			fifo->exp_start = starpu_timing_now() + transfer_model+model;
+			fifo->pipeline_len += transfer_model;
+			fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
 			fifo->exp_end = fifo->exp_start + fifo->exp_len;
 			fifo->exp_end = fifo->exp_start + fifo->exp_len;
 			if(dt->num_priorities != -1)
 			if(dt->num_priorities != -1)
 			{
 			{
@@ -268,11 +263,8 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 				for(i = 0; i <= task_prio; i++)
 				for(i = 0; i <= task_prio; i++)
 					fifo->exp_len_per_priority[i] -= transfer_model;
 					fifo->exp_len_per_priority[i] -= transfer_model;
 			}
 			}
-
-			fifo->pipeline_len += task->predicted + transfer_model;
-			fifo->pipelined_tasks++;
-
 		}
 		}
+
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 		  
 		  
 #ifdef STARPU_VERBOSE
 #ifdef STARPU_VERBOSE
@@ -294,13 +286,13 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 {
 {
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 
-	struct starpu_task *new_list;
+	struct starpu_task *new_list, *task;
 
 
 	unsigned workerid = starpu_worker_get_id_check();
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 
 
 	/* Take the opportunity to update start time */
 	/* Take the opportunity to update start time */
-	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
+	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start) + fifo->pipeline_len;
 
 
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_cond_t *sched_cond;
 	starpu_pthread_cond_t *sched_cond;
@@ -311,26 +303,25 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 
 
 	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
 	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
 
 
-	while (new_list)
+	for (task = new_list; task; task = task->next)
 	{
 	{
-		double transfer_model = new_list->predicted_transfer;
-		/* We now start the transfer, get rid of it in the completion
-		 * prediction */
-		if(!isnan(transfer_model)) 
+		double transfer_model = task->predicted_transfer;
+		if(!isnan(transfer_model))
 		{
 		{
+			/* We now start the transfer, move it from predicted to pipelined */
 			fifo->exp_len -= transfer_model;
 			fifo->exp_len -= transfer_model;
-			fifo->exp_start = starpu_timing_now() + transfer_model;
+			fifo->pipeline_len += transfer_model;
+			fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
 			fifo->exp_end = fifo->exp_start + fifo->exp_len;
 			fifo->exp_end = fifo->exp_start + fifo->exp_len;
 			if(dt->num_priorities != -1)
 			if(dt->num_priorities != -1)
 			{
 			{
 				int i;
 				int i;
-				for(i = 0; i < new_list->priority; i++)
+				int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
+				for(i = 0; i < task_prio; i++)
 					fifo->exp_len_per_priority[i] -= transfer_model;
 					fifo->exp_len_per_priority[i] -= transfer_model;
 			}
 			}
 		
 		
 		}
 		}
-
-		new_list = new_list->next;
 	}
 	}
 
 
 	return new_list;
 	return new_list;
@@ -366,6 +357,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 
         /* Sometimes workers didn't take the tasks as early as we expected */
         /* Sometimes workers didn't take the tasks as early as we expected */
 	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_start += fifo->pipeline_len;
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
 
 	if ((starpu_timing_now() + predicted_transfer) < fifo->exp_end)
 	if ((starpu_timing_now() + predicted_transfer) < fifo->exp_end)
@@ -507,6 +499,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		double exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 		double exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
+		exp_start += fifo->pipeline_len;
 
 
 		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
 		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
 			continue;
 			continue;
@@ -1100,6 +1093,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	double model = task->predicted;
 	double model = task->predicted;
+	double transfer_model = task->predicted_transfer;
 
 
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_cond_t *sched_cond;
 	starpu_pthread_cond_t *sched_cond;
@@ -1109,26 +1103,19 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 	 * of work. */
 	 * of work. */
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 
 
-	if(fifo->pipelined_tasks > 0)
-	{
-		/* decrement here bc we add the predicted exec time of the task to exp_start
-		   we don't want to add it twice */
-		if (!isnan(task->predicted))
-			fifo->pipeline_len -= task->predicted;
-		if(!isnan(task->predicted_transfer))
-			fifo->pipeline_len -= task->predicted_transfer;
-		fifo->pipelined_tasks--;
-	}
+	if(!isnan(transfer_model))
+		/* The transfer is over, remove it from pipelined */
+		fifo->pipeline_len -= transfer_model;
 
 
 	/* Take the opportunity to update start time */
 	/* Take the opportunity to update start time */
-	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
+	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start) + fifo->pipeline_len;
 
 
 	if(!isnan(model))
 	if(!isnan(model))
 	{
 	{
-		/* We now start the computation, get rid of it in the completion
-		 * prediction */
-		fifo->exp_len-= model;
-                fifo->exp_start = starpu_timing_now() + model;
+		/* We now start the computation, move it from predicted to pipelined */
+		fifo->exp_len -= model;
+		fifo->pipeline_len += model;
+		fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
                 fifo->exp_end= fifo->exp_start + fifo->exp_len;
                 fifo->exp_end= fifo->exp_start + fifo->exp_len;
 		if(dt->num_priorities != -1)
 		if(dt->num_priorities != -1)
 		{
 		{
@@ -1163,6 +1150,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_start += fifo->pipeline_len;
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
 
 	/* If there is no prediction available, we consider the task has a null length */
 	/* If there is no prediction available, we consider the task has a null length */
@@ -1221,7 +1209,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 }
 }
 
 
-static void dmda_post_exec_hook(struct starpu_task * task STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id)
+static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id)
 {
 {
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
 	unsigned workerid = starpu_worker_get_id_check();
@@ -1230,7 +1218,10 @@ static void dmda_post_exec_hook(struct starpu_task * task STARPU_ATTRIBUTE_UNUSE
 	starpu_pthread_cond_t *sched_cond;
 	starpu_pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
-	fifo->exp_start = starpu_timing_now();
+	if(!isnan(task->predicted))
+		/* The execution is over, remove it from pipelined */
+		fifo->pipeline_len -= task->predicted;
+	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start) + fifo->pipeline_len;
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
 }
 }

+ 0 - 1
src/sched_policies/fifo_queues.c

@@ -58,7 +58,6 @@ struct _starpu_fifo_taskq *_starpu_create_fifo(void)
 	fifo->exp_end = fifo->exp_start;
 	fifo->exp_end = fifo->exp_start;
 	fifo->exp_len_per_priority = NULL;
 	fifo->exp_len_per_priority = NULL;
 	fifo->pipeline_len = 0.0;
 	fifo->pipeline_len = 0.0;
-	fifo->pipelined_tasks = 0;
 
 
 	return fifo;
 	return fifo;
 }
 }

+ 1 - 2
src/sched_policies/fifo_queues.h

@@ -43,8 +43,7 @@ struct _starpu_fifo_taskq
 	double exp_end; /* Expected end date of last task in the queue */
 	double exp_end; /* Expected end date of last task in the queue */
 	double exp_len; /* Expected duration of the set of tasks in the queue */
 	double exp_len; /* Expected duration of the set of tasks in the queue */
 	double *exp_len_per_priority; /* Expected duration of the set of tasks in the queue corresponding to each priority */
 	double *exp_len_per_priority; /* Expected duration of the set of tasks in the queue corresponding to each priority */
-	double pipeline_len; /* the expected the length of the pipelined tasks */
-	int pipelined_tasks; /* the expected no of pipelined tasks */
+	double pipeline_len; /* the expected duration of what is already pushed to the worker */
 };
 };
 
 
 struct _starpu_fifo_taskq*_starpu_create_fifo(void) STARPU_ATTRIBUTE_MALLOC;
 struct _starpu_fifo_taskq*_starpu_create_fifo(void) STARPU_ATTRIBUTE_MALLOC;