ソースを参照

get job & combined_worker pointers out of parallel_eager & parallel_heft

Andra Hugo 12 年 前
コミット
fd8a470cc5

+ 11 - 1
include/starpu_task.h

@@ -329,10 +329,13 @@ int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id);
  * indicates that the waited task was either synchronous or detached. */
 int starpu_task_wait(struct starpu_task *task) STARPU_WARN_UNUSED_RESULT;
 
-/* This function waits until all the tasks that were already submitted have
+/* This function waits until all the tasks that were already submitted 
+ * (to the current context or the global one if there aren't any) have
  * been executed. */
 int starpu_task_wait_for_all(void);
 
+/* This function waits until all the tasks that were already submitted to the 
+ * context have been executed */
 int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx_id);
 
 /* This function waits until there is no more ready task. */
@@ -350,6 +353,13 @@ void starpu_display_codelet_stats(struct starpu_codelet *cl);
  * being executed at the moment. */
 struct starpu_task *starpu_task_get_current(void);
 
+/* initialise the barrier for the parallel task, st all workers start it 
+ * at the same time */
+void starpu_init_parallel_task_barrier(struct starpu_task* task, int best_workerid);
+
+/* create task alias to dispatch it to the workers of a combined workers */
+struct starpu_task *starpu_create_task_alias(struct starpu_task *task);
+
 #ifdef __cplusplus
 }
 #endif

+ 0 - 1
include/starpu_worker.h

@@ -123,7 +123,6 @@ int starpu_combined_worker_get_id(void);
 int starpu_combined_worker_get_size(void);
 int starpu_combined_worker_get_rank(void);
 
-
 /* This function returns the type of worker associated to an identifier (as
  * returned by the starpu_worker_get_id function). The returned value indicates
  * the architecture of the worker: STARPU_CPU_WORKER for a CPU core,

+ 0 - 1
src/Makefile.am

@@ -73,7 +73,6 @@ noinst_HEADERS = 						\
 	core/debug.h						\
 	core/errorcheck.h					\
 	core/combined_workers.h					\
-	core/parallel_task.h					\
 	core/simgrid.h						\
 	core/task_bundle.h					\
 	sched_policies/detect_combined_workers.h		\

+ 1 - 0
src/core/combined_workers.c

@@ -162,3 +162,4 @@ int starpu_combined_worker_get_description(int workerid, int *worker_size, int *
 
 	return 0;
 }
+

+ 24 - 1
src/core/parallel_task.c

@@ -19,8 +19,10 @@
 #include <core/jobs.h>
 #include <core/task.h>
 #include <common/utils.h>
+#include <core/workers.h>
+#include <common/barrier.h>
 
-struct starpu_task *_starpu_create_task_alias(struct starpu_task *task)
+struct starpu_task *starpu_create_task_alias(struct starpu_task *task)
 {
 	struct starpu_task *task_dup = (struct starpu_task *) malloc(sizeof(struct starpu_task));
 	STARPU_ASSERT(task_dup);
@@ -31,3 +33,24 @@ struct starpu_task *_starpu_create_task_alias(struct starpu_task *task)
 
 	return task_dup;
 }
+
+void starpu_init_parallel_task_barrier(struct starpu_task* task, int best_workerid)
+{
+	/* The master needs to dispatch the task between the
+	 * different combined workers */
+	struct _starpu_combined_worker *combined_worker =  _starpu_get_combined_worker_struct(best_workerid);
+	int worker_size = combined_worker->worker_size;
+	
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+	j->task_size = worker_size;
+	j->combined_workerid = best_workerid;
+	j->active_task_alias_count = 0;
+	
+	//fprintf(stderr, "POP -> size %d best_size %d\n", worker_size, best_size);
+	
+	_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
+	_STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
+
+	return;
+}
+

+ 0 - 24
src/core/parallel_task.h

@@ -1,24 +0,0 @@
-/* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2010  Université de Bordeaux 1
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#ifndef __PARALLEL_TASK_H__
-#define __PARALLEL_TASK_H__
-
-#include <starpu.h>
-
-struct starpu_task *_starpu_create_task_alias(struct starpu_task *task);
-
-#endif /* __PARALLEL_TASK_H__ */

+ 34 - 0
src/core/sched_ctx.c

@@ -1102,3 +1102,37 @@ int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
 	sched_ctx->max_priority = max_prio;
 	return 0;
 }
+
+
+/* static void _starpu_sched_ctx_bind_thread_to_ctx_cpus(unsigned sched_ctx_id) */
+/* { */
+/* #ifdef STARPU_HAVE_HWLOC */
+/* 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id); */
+/* 	struct _starpu_machine_config *config = _starpu_get_machine_config(); */
+
+/* 	const struct hwloc_topology_support *support = hwloc_topology_get_support(config->topology.hwtopology); */
+/*         if (support->cpubind->set_thisthread_cpubind) */
+/*         { */
+/* 		hwloc_bitmap_t set = sched_ctx->hwloc_cpu_set; */
+/*                 int ret; */
+
+/*                 ret = hwloc_set_cpubind (config->topology.hwtopology, set, */
+/*                                          HWLOC_CPUBIND_THREAD); */
+/* 		if (ret) */
+/*                 { */
+/*                         perror("binding thread"); */
+/* 			STARPU_ABORT(); */
+/*                 } */
+/* 	} */
+
+/* #else */
+/* #warning no sched ctx CPU binding support */
+/* #endif */
+/* } */
+
+/* int starpu_sched_ctx_exec_parallel_code(void* (*func)(void* param), void* param, unsigned sched_ctx_id) */
+/* {	 */
+	
+/* 	_starpu_sched_ctx_bind_thread_to_ctx_cpus(sched_ctx_id); */
+/* 	func(param); */
+/* } */

+ 1 - 2
src/core/sched_policy.c

@@ -23,7 +23,6 @@
 #include <profiling/profiling.h>
 #include <common/barrier.h>
 #include <core/debug.h>
-#include <core/parallel_task.h>
 
 static int use_prefetch = 0;
 
@@ -284,7 +283,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		int j;
 		for (j = 0; j < worker_size; j++)
 		{
-			struct starpu_task *alias = _starpu_create_task_alias(task);
+			struct starpu_task *alias = starpu_create_task_alias(task);
 
 			worker = _starpu_get_worker_struct(combined_workerid[j]);
 			ret |= _starpu_push_local_task(worker, alias, 0);

+ 9 - 23
src/sched_policies/parallel_eager.c

@@ -15,12 +15,10 @@
  *
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
-
-#include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
-#include <common/barrier.h>
 #include <sched_policies/detect_combined_workers.h>
-#include <core/parallel_task.h>
+#include <starpu_scheduler.h>
+#include <core/workers.h>
 
 struct _starpu_peager_data
 {
@@ -168,7 +166,7 @@ static int push_task_peager_policy(struct starpu_task *task)
 
         /*if there are no tasks block */
         /* wake people waiting for a task */
-        unsigned worker = 0;
+        int worker = -1;
         struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
         struct starpu_sched_ctx_iterator it;
@@ -259,27 +257,15 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		}
 		else
 		{
-			/* The master needs to dispatch the task between the
-			 * different combined workers */
-			struct _starpu_combined_worker *combined_worker;
-			combined_worker = _starpu_get_combined_worker_struct(best_workerid);
-			int worker_size = combined_worker->worker_size;
-			int *combined_workerid = combined_worker->combined_workerid;
-
-			struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
-			j->task_size = worker_size;
-			j->combined_workerid = best_workerid;
-			j->active_task_alias_count = 0;
-
-			//fprintf(stderr, "POP -> size %d best_size %d\n", worker_size, best_size);
-
-			_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
-			_STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
+			starpu_init_parallel_task_barrier(task, best_workerid);
+			int worker_size = 0;
+			int *combined_workerid;
+			starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);
 
 			/* Dispatch task aliases to the different slaves */
 			for (i = 1; i < worker_size; i++)
 			{
-				struct starpu_task *alias = _starpu_create_task_alias(task);
+				struct starpu_task *alias = starpu_create_task_alias(task);
 				int local_worker = combined_workerid[i];
 				
 				starpu_pthread_mutex_t *sched_mutex;
@@ -296,7 +282,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 			}
 
 			/* The master also manipulated an alias */
-			struct starpu_task *master_alias = _starpu_create_task_alias(task);
+			struct starpu_task *master_alias = starpu_create_task_alias(task);
 			return master_alias;
 		}
 	}

+ 6 - 16
src/sched_policies/parallel_heft.c

@@ -23,9 +23,7 @@
 #include <core/workers.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
-#include <common/barrier.h>
 #include <sched_policies/detect_combined_workers.h>
-#include <core/parallel_task.h>
 
 #ifndef DBL_MIN
 #define DBL_MIN __DBL_MIN__
@@ -136,33 +134,25 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	}
 	else
 	{
-		/* This is a combined worker so we create task aliases */
-		struct _starpu_combined_worker *combined_worker;
-		combined_worker = _starpu_get_combined_worker_struct(best_workerid);
-		int worker_size = combined_worker->worker_size;
-		int *combined_workerid = combined_worker->combined_workerid;
-
-		struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
-		j->task_size = worker_size;
-		j->combined_workerid = best_workerid;
-		j->active_task_alias_count = 0;
-
 		/* This task doesn't belong to an actual worker, it belongs
 		 * to a combined worker and thus the scheduler doesn't care
 		 * of its predicted values which are insignificant */
 		task->predicted = 0;
 		task->predicted_transfer = 0;
 
-		_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
-		_STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
+		starpu_init_parallel_task_barrier(task, best_workerid);
+		int worker_size = 0;
+		int *combined_workerid;
+		starpu_combined_worker_get_description(best_workerid, &worker_size, &combined_workerid);
 
 		/* All cpu workers must be locked at once */
 		_STARPU_PTHREAD_MUTEX_LOCK(&hd->global_push_mutex);
 
+		/* This is a combined worker so we create task aliases */
 		int i;
 		for (i = 0; i < worker_size; i++)
 		{
-			struct starpu_task *alias = _starpu_create_task_alias(task);
+			struct starpu_task *alias = starpu_create_task_alias(task);
 			int local_worker = combined_workerid[i];
 
 			alias->predicted = exp_end_predicted - worker_exp_end[local_worker];