Sfoglia il codice sorgente

Start factorizing queue completion times between worker component and dmda

Samuel Thibault 8 anni fa
parent
commit
a94eec3bcb
1 ha cambiato i file con 90 aggiunte e 49 eliminazioni
  1. 90 49
      src/sched_policies/component_worker.c

+ 90 - 49
src/sched_policies/component_worker.c

@@ -99,6 +99,52 @@ struct _starpu_worker_task_list
 	starpu_pthread_mutex_t mutex;
 };
 
+/* This is called when a transfer request is actually pushed to the worker */
+static void _starpu_worker_task_list_transfer_started(struct _starpu_worker_task_list *l, struct starpu_task *task)
+{
+	double transfer_model = task->predicted_transfer;
+	if (isnan(transfer_model))
+		return;
+
+	/* We now start the transfer, move it from predicted to pipelined */
+	l->exp_len -= transfer_model;
+	l->pipeline_len += transfer_model;
+	l->exp_start = starpu_timing_now() + l->pipeline_len;
+	l->exp_end = l->exp_start + l->exp_len;
+
+}
+
+#ifdef STARPU_DEVEL
+#warning FIXME: merge with deque_modeling_policy_data_aware
+#endif
+/* This is called when a task is actually pushed to the worker (i.e. the transfer finished */
+static void _starpu_worker_task_list_started(struct _starpu_worker_task_list *l, struct starpu_task *task)
+{
+	double model = task->predicted;
+	double transfer_model = task->predicted_transfer;
+	if(!isnan(transfer_model))
+		/* The transfer is over, remove it from pipelined */
+		l->pipeline_len -= transfer_model;
+
+	if(!isnan(model))
+	{
+		/* We now start the computation, move it from predicted to pipelined */
+		l->exp_len -= model;
+		l->pipeline_len += model;
+		l->exp_start = starpu_timing_now() + l->pipeline_len;
+                l->exp_end= l->exp_start + l->exp_len;
+	}
+}
+
+/* This is called when a task is actually finished */
+static void _starpu_worker_task_list_finished(struct _starpu_worker_task_list *l, struct starpu_task *task)
+{
+	if(!isnan(task->predicted))
+		/* The execution is over, remove it from pipelined */
+		l->pipeline_len -= task->predicted;
+}
+
+
 enum _starpu_worker_component_status
 {
 	COMPONENT_STATUS_SLEEPING,
@@ -172,20 +218,10 @@ static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l
 	}
 }
 
-static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
+static inline void _starpu_worker_task_list_add(struct _starpu_worker_task_list * l, struct starpu_task *task)
 {
-/* the task, ntasks, pntasks, left and right members of t are set by the caller */
-	STARPU_ASSERT(t->task);
-	if(l->first == NULL)
-		l->first = l->last = t;
-	t->down = l->last;
-	l->last->up = t;
-	t->up = NULL;
-	l->last = t;
-	l->ntasks++;
-
-	double predicted = t->task->predicted;
-	double predicted_transfer = t->task->predicted_transfer;
+	double predicted = task->predicted;
+	double predicted_transfer = task->predicted_transfer;
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	l->exp_start = STARPU_MAX(l->exp_start, starpu_timing_now());
@@ -216,8 +252,23 @@ static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list
 		l->exp_len += predicted;
 	}
 
-	t->task->predicted = predicted;
-	t->task->predicted_transfer = predicted_transfer;
+	task->predicted = predicted;
+	task->predicted_transfer = predicted_transfer;
+}
+
+static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
+{
+/* the task, ntasks, pntasks, left and right members of t are set by the caller */
+	STARPU_ASSERT(t->task);
+	if(l->first == NULL)
+		l->first = l->last = t;
+	t->down = l->last;
+	l->last->up = t;
+	t->up = NULL;
+	l->last = t;
+	l->ntasks++;
+
+	_starpu_worker_task_list_add(l, t->task);
 }
 
 /* recursively set left and right pointers to NULL */
@@ -469,9 +520,13 @@ static int simple_worker_push_task(struct starpu_sched_component * component, st
 		starpu_prefetch_task_input_on_node(task, memory_node);
 	}
 #endif
-	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
-	_starpu_worker_task_list_push(data->list, t);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
+	struct _starpu_worker_task_list * list = data->list;
+	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+        /* Sometimes workers didn't take the tasks as early as we expected */
+	list->exp_start = isnan(list->exp_start) ? starpu_timing_now() + list->pipeline_len : STARPU_MAX(list->exp_start, starpu_timing_now());
+	list->exp_end = list->exp_start + list->exp_len;
+	_starpu_worker_task_list_push(list, t);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
 	simple_worker_can_pull(component);
 	return 0;
 }
@@ -482,10 +537,13 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	struct _starpu_worker_component_data * data = component->data;
 	struct _starpu_worker_task_list * list = data->list;
 	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+	/* Take the opportunity to update start time */
+	data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
 	struct starpu_task * task =  _starpu_worker_task_list_pop(list);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
 	if(task)
 	{
+		_starpu_worker_task_list_transfer_started(list, task);
 		starpu_push_task_end(task);
 		return task;
 	}
@@ -517,6 +575,8 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	{
 		if(!starpu_worker_is_combined_worker(workerid))
 		{
+			_starpu_worker_task_list_add(list, task);
+			_starpu_worker_task_list_transfer_started(list, task);
 			starpu_push_task_end(task);
 			return task;
 		}
@@ -527,7 +587,11 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 
 	}
 	if(task)
+	{
+		_starpu_worker_task_list_add(list, task);
+		_starpu_worker_task_list_transfer_started(list, task);
 		starpu_push_task_end(task);
+	}
 	return task;
 }
 
@@ -535,7 +599,6 @@ static double simple_worker_estimated_end(struct starpu_sched_component * compon
 {
 	struct _starpu_worker_component_data * data = component->data;
 	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
-	data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
 	double tmp = data->list->exp_end = data->list->exp_start + data->list->exp_len;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
 	return tmp;
@@ -843,35 +906,12 @@ int starpu_sched_component_worker_get_workerid(struct starpu_sched_component * w
 
 void starpu_sched_component_worker_pre_exec_hook(struct starpu_task * task, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
 {
-	double model = task->predicted;
-	double transfer_model = task->predicted_transfer;
-
-	if(!isnan(task->predicted) || !isnan(task->predicted_transfer))
-	{
-		struct _starpu_worker_task_list * list = _worker_get_list(sched_ctx_id);
-		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
-
-		list->exp_start = STARPU_MAX(starpu_timing_now(), list->exp_start);
-
-		/* The transfer is over, get rid of it in the completion
-		 * prediction */
-		if (!isnan(transfer_model))
-			list->exp_len -= transfer_model;
-
-		if (!isnan(model))
-		{
-			/* We now start the computation, get rid of it in the
-			 * completion prediction */
-			list->exp_len -= model;
-			list->exp_start += model;
-		}
-		
-		if(list->ntasks == 0)
-			list->exp_len = 0.0;
-
-		list->exp_end = list->exp_start + list->exp_len;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
-	}
+	struct _starpu_worker_task_list * list = _worker_get_list(sched_ctx_id);
+	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+	_starpu_worker_task_list_started(list, task);
+	/* Take the opportunity to update start time */
+	list->exp_start = STARPU_MAX(starpu_timing_now() + list->pipeline_len, list->exp_start);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
 }
 
 void starpu_sched_component_worker_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
@@ -880,7 +920,8 @@ void starpu_sched_component_worker_post_exec_hook(struct starpu_task * task, uns
 		return;
 	struct _starpu_worker_task_list * list = _worker_get_list(sched_ctx_id);
 	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
-	list->exp_start = starpu_timing_now();
+	_starpu_worker_task_list_finished(list, task);
+	list->exp_start = STARPU_MAX(starpu_timing_now() + list->pipeline_len, list->exp_start);
 	list->exp_end = list->exp_start + list->exp_len;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
 }