Browse Source

Fix Fix parallelism in modular-eager-prefetch: the eager component should pull from can_push emitters, instead of pushing from above

Samuel Thibault 7 years ago
parent
commit
7fd78a9baa
1 changed files with 62 additions and 2 deletions
  1. 62 2
      src/sched_policies/component_eager.c

+ 62 - 2
src/sched_policies/component_eager.c

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2013                                     Inria
  * Copyright (C) 2017                                     CNRS
- * Copyright (C) 2014-2017                                Université de Bordeaux
+ * Copyright (C) 2014-2018                                Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -19,12 +19,41 @@
 #include <starpu_sched_component.h>
 #include <starpu_scheduler.h>
 
+struct _starpu_eager_data
+{
+	struct starpu_sched_component *target;
+	starpu_pthread_mutex_t scheduling_mutex;
+};
+
 static int eager_push_task(struct starpu_sched_component * component, struct starpu_task * task)
 {
 	int ret;
 	STARPU_ASSERT(component && task && starpu_sched_component_is_eager(component));
 	STARPU_ASSERT(starpu_sched_component_can_execute_task(component,task));
-	
+	struct _starpu_eager_data *d = component->data;
+
+	if (d->target)
+	{
+		/* target told us we could push to it, try to */
+		int idworker;
+		for(idworker = starpu_bitmap_first(d->target->workers);
+			idworker != -1;
+			idworker = starpu_bitmap_next(d->target->workers, idworker))
+		{
+			int nimpl;
+			for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+			{
+				if(starpu_worker_can_execute_task(idworker,task,nimpl)
+				   || starpu_combined_worker_can_execute_task(idworker, task, nimpl))
+				{
+					ret = starpu_sched_component_push_task(component,d->target,task);
+					if (!ret)
+						return 0;
+				}
+			}
+		}
+	}
+
 	int workerid;
 	for(workerid = starpu_bitmap_first(component->workers_in_ctx);
 	    workerid != -1;
@@ -66,6 +95,29 @@ static int eager_push_task(struct starpu_sched_component * component, struct sta
 	return 1;
 }
 
+/* Note: we can't use starpu_sched_component_pump_to because if a fifo below
+ * refuses a task, we have no way to push it back to a fifo above. */
+static int eager_can_push(struct starpu_sched_component * component, struct starpu_sched_component * to)
+{
+	int success;
+	struct _starpu_eager_data *d = component->data;
+	STARPU_COMPONENT_MUTEX_LOCK(&d->scheduling_mutex);
+	/* Target flow of tasks to this child */
+	d->target = to;
+	success = starpu_sched_component_can_push(component, to);
+	d->target = NULL;
+	STARPU_COMPONENT_MUTEX_UNLOCK(&d->scheduling_mutex);
+	return success;
+}
+
+static void eager_deinit_data(struct starpu_sched_component *component)
+{
+	STARPU_ASSERT(starpu_sched_component_is_eager(component));
+	struct _starpu_eager_data *d = component->data;
+	STARPU_PTHREAD_MUTEX_DESTROY(&d->scheduling_mutex);
+	free(d);
+}
+
 int starpu_sched_component_is_eager(struct starpu_sched_component * component)
 {
 	return component->push_task == eager_push_task;
@@ -75,7 +127,15 @@ struct starpu_sched_component * starpu_sched_component_eager_create(struct starp
 {
 	(void)arg;
 	struct starpu_sched_component * component = starpu_sched_component_create(tree, "eager");
+	struct _starpu_eager_data *data;
+	_STARPU_MALLOC(data, sizeof(*data));
+	data->target = NULL;
+	STARPU_PTHREAD_MUTEX_INIT(&data->scheduling_mutex, NULL);
+
+	component->data = data;
 	component->push_task = eager_push_task;
+	component->can_push = eager_can_push;
+	component->deinit_data = eager_deinit_data;
 
 	return component;
 }