瀏覽代碼

We do not store a jobq per worker anymore

Cédric Augonnet 14 年之前
父節點
當前提交
ee37af1443

+ 0 - 1
src/Makefile.am

@@ -130,7 +130,6 @@ libstarpu_la_SOURCES = 						\
 	core/dependencies/task_deps.c				\
 	core/dependencies/htable.c				\
 	core/dependencies/data_concurrency.c			\
-	core/mechanisms/queues.c				\
 	core/mechanisms/stack_queues.c				\
 	core/mechanisms/deque_queues.c				\
 	core/mechanisms/priority_queues.c			\

+ 0 - 8
src/core/mechanisms/deque_queues.c

@@ -19,14 +19,6 @@
 #include <errno.h>
 #include <common/utils.h>
 
-void _starpu_init_deque_queues_mechanisms(void)
-{
-}
-
-void _starpu_deinit_deque_queues_mechanisms(void)
-{
-}
-
 struct starpu_jobq_s *_starpu_create_deque(void)
 {
 	struct starpu_jobq_s *jobq;

+ 2 - 3
src/core/mechanisms/deque_queues.h

@@ -17,6 +17,8 @@
 #ifndef __DEQUE_QUEUES_H__
 #define __DEQUE_QUEUES_H__
 
+#include <starpu.h>
+#include <common/config.h>
 #include <core/mechanisms/queues.h>
 
 struct starpu_deque_jobq_s {
@@ -43,9 +45,6 @@ int _starpu_deque_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task);
 
 starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q);
 
-void _starpu_init_deque_queues_mechanisms(void);
-void _starpu_deinit_deque_queues_mechanisms(void);
-
 unsigned _starpu_get_deque_njobs(struct starpu_jobq_s *q);
 unsigned _starpu_get_deque_nprocessed(struct starpu_jobq_s *q);
 

+ 2 - 0
src/core/mechanisms/fifo_queues.h

@@ -17,6 +17,8 @@
 #ifndef __FIFO_QUEUES_H__
 #define __FIFO_QUEUES_H__
 
+#include <starpu.h>
+#include <common/config.h>
 #include <core/mechanisms/queues.h>
 
 struct starpu_fifo_jobq_s {

+ 0 - 61
src/core/mechanisms/queues.c

@@ -1,61 +0,0 @@
-/*
- * StarPU
- * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#include "queues.h"
-#include <common/utils.h>
-
-/*
- * There can be various queue designs
- * 	- trivial single list
- * 	- cilk-like 
- * 	- hierarchical (marcel-like)
- */
-
-void _starpu_setup_queues(void (*init_queue_design)(void),
-		  struct starpu_jobq_s *(*func_init_queue)(void), 
-		  struct starpu_machine_config_s *config) 
-{
-	unsigned worker;
-
-	if (init_queue_design)
-		init_queue_design();
-
-	for (worker = 0; worker < config->nworkers; worker++)
-	{
-		struct  starpu_worker_s *workerarg = &config->workers[worker];
-		
-		if (func_init_queue)
-			workerarg->jobq = func_init_queue();
-	}
-}
-
-void _starpu_deinit_queues(void (*deinit_queue_design)(void),
-		  void (*func_deinit_queue)(struct starpu_jobq_s *q), 
-		  struct starpu_machine_config_s *config)
-{
-	unsigned worker;
-
-	for (worker = 0; worker < config->nworkers; worker++)
-	{
-		struct  starpu_worker_s *workerarg = &config->workers[worker];
-		
-		if (func_deinit_queue)
-			func_deinit_queue(workerarg->jobq);
-	}
-
-	if (deinit_queue_design)
-		deinit_queue_design();
-}

+ 0 - 14
src/core/mechanisms/queues.h

@@ -25,20 +25,6 @@
 struct starpu_jobq_s {
 	/* a pointer to some queue structure */
 	void *queue; 
-
-//	/* in case workers are blocked on the queue, signaling on that 
-//	  condition must unblock them, even if there is no available task */
-//	pthread_cond_t activity_cond;
-//	pthread_mutex_t activity_mutex;
 };
 
-struct starpu_machine_config_s;
-
-void _starpu_setup_queues(void (*init_queue_design)(void),
-                  struct starpu_jobq_s *(*func_init_queue)(void),
-                  struct starpu_machine_config_s *config);
-void _starpu_deinit_queues(void (*deinit_queue_design)(void),
-		  void (*func_deinit_queue)(struct starpu_jobq_s *q), 
-		  struct starpu_machine_config_s *config);
-
 #endif // __QUEUES_H__

+ 1 - 1
src/core/mechanisms/stack_queues.c

@@ -14,7 +14,7 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
-#include <pthread.h>
+#include <starpu.h>
 #include <core/mechanisms/stack_queues.h>
 #include <errno.h>
 #include <common/utils.h>

+ 2 - 0
src/core/mechanisms/stack_queues.h

@@ -17,6 +17,8 @@
 #ifndef __STACK_QUEUES_H__
 #define __STACK_QUEUES_H__
 
+#include <starpu.h>
+#include <common/config.h>
 #include <core/mechanisms/queues.h>
 
 struct starpu_stack_jobq_s {

+ 14 - 19
src/core/policies/deque_modeling_policy.c

@@ -165,36 +165,31 @@ static int dm_push_task(starpu_job_t j)
 	return _dm_push_task(j, 0);
 }
 
-static struct starpu_jobq_s *init_dm_fifo(void)
+static void init_dm_fifo(void)
 {
-	struct starpu_jobq_s *q;
-
-	q = _starpu_create_fifo();
-
-	int workerid = nworkers++;
-
-	queue_array[workerid] = q;
-
-	PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-	PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-
-	starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
-
-	return q;
 }
 
 static void initialize_dm_policy(struct starpu_machine_config_s *config, 
 	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	nworkers = 0;
-
-	_starpu_setup_queues(NULL, init_dm_fifo, config);
+	int workerid;
+	for (workerid = 0; workerid < config->nworkers; workerid++)
+	{
+		queue_array[workerid] = _starpu_create_fifo();
+	
+		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
+		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
+	
+		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
+	}
 }
 
 static void deinitialize_dm_policy(struct starpu_machine_config_s *config, 
 	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	_starpu_deinit_queues(NULL, _starpu_destroy_fifo, config);
+	int worker;
+	for (worker = 0; worker < config->nworkers; worker++)
+		_starpu_destroy_fifo(queue_array[worker]);
 }
 
 struct starpu_sched_policy_s _starpu_sched_dm_policy = {

+ 14 - 21
src/core/policies/deque_modeling_policy_data_aware.c

@@ -197,28 +197,10 @@ static int dmda_push_task(starpu_job_t j)
 	return _dmda_push_task(j, 0);
 }
 
-static struct starpu_jobq_s *init_dmda_fifo(void)
-{
-	struct starpu_jobq_s *q;
-
-	q = _starpu_create_fifo();
-
-	int workerid = nworkers++;
-
-	queue_array[workerid] = q;
-
-	PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-	PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-
-	starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
-
-	return q;
-}
-
 static void initialize_dmda_policy(struct starpu_machine_config_s *config, 
 	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	nworkers = 0;
+	nworkers = config->nworkers;
 
 	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
 	if (strval_alpha)
@@ -228,13 +210,24 @@ static void initialize_dmda_policy(struct starpu_machine_config_s *config,
 	if (strval_beta)
 		beta = atof(strval_beta);
 
-	_starpu_setup_queues(NULL, init_dmda_fifo, config);
+	unsigned workerid;
+	for (workerid = 0; workerid < nworkers; workerid++)
+	{
+		queue_array[workerid] = _starpu_create_fifo();
+	
+		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
+		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
+	
+		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
+	}
 }
 
 static void deinitialize_dmda_policy(struct starpu_machine_config_s *config, 
 	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	_starpu_deinit_queues(NULL, _starpu_destroy_fifo, config);
+	unsigned workerid;
+	for (workerid = 0; workerid < config->nworkers; workerid++)
+		_starpu_destroy_fifo(queue_array[workerid]);
 }
 
 struct starpu_sched_policy_s _starpu_sched_dmda_policy = {

+ 3 - 3
src/core/policies/eager_central_policy.c

@@ -36,12 +36,12 @@ static void initialize_eager_center_policy(struct starpu_machine_config_s *confi
 	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
 	PTHREAD_COND_INIT(&sched_cond, NULL);
 
-	int workerid;
-	for (workerid = 0; workerid < STARPU_NMAXWORKERS; workerid++)
+	unsigned workerid;
+	for (workerid = 0; workerid < config->nworkers; workerid++)
 		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
 }
 
-static void deinitialize_eager_center_policy(struct starpu_machine_config_s *config, 
+static void deinitialize_eager_center_policy(__attribute__ ((unused)) struct starpu_machine_config_s *config, 
 		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
 	/* TODO check that there is no task left in the queue */

+ 3 - 14
src/core/policies/no_prio_policy.c

@@ -27,7 +27,8 @@ static struct starpu_jobq_s *jobq;
 static pthread_cond_t sched_cond;
 static pthread_mutex_t sched_mutex;
 
-static void init_no_prio_design(void)
+static void initialize_no_prio_policy(struct starpu_machine_config_s *config, 
+	   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
 	/* there is only a single queue in that trivial design */
 	jobq = _starpu_create_fifo();
@@ -36,22 +37,10 @@ static void init_no_prio_design(void)
 	PTHREAD_COND_INIT(&sched_cond, NULL);
 
 	int workerid;
-	for (workerid = 0; workerid < STARPU_NMAXWORKERS; workerid++)
+	for (workerid = 0; workerid < config->nworkers; workerid++)
 		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
 }
 
-static struct starpu_jobq_s *func_init_central_queue(void)
-{
-	/* once again, this is trivial */
-	return jobq;
-}
-
-static void initialize_no_prio_policy(struct starpu_machine_config_s *config, 
-	   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
-{
-	_starpu_setup_queues(init_no_prio_design, func_init_central_queue, config);
-}
-
 static int push_task_no_prio_policy(starpu_job_t task)
 {
         return _starpu_fifo_push_task(jobq, &sched_mutex, &sched_cond, task);

+ 12 - 21
src/core/policies/random_policy.c

@@ -83,32 +83,23 @@ static int random_push_task(starpu_job_t task)
 	return _random_push_task(task, 0);
 }
 
-static struct starpu_jobq_s *init_random_fifo(void)
-{
-	struct starpu_jobq_s *q;
-
-	q = _starpu_create_fifo();
-
-	int workerid = nworkers++;
-
-	queue_array[workerid] = q;
-
-	PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
-	PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
-
-	starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
-
-	return q;
-}
-
 static void initialize_random_policy(struct starpu_machine_config_s *config, 
 	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	nworkers = 0;
-
 	starpu_srand48(time(NULL));
 
-	_starpu_setup_queues(NULL, init_random_fifo, config);
+	nworkers = config->nworkers;
+
+	int workerid;
+	for (workerid = 0; workerid < nworkers; workerid++)
+	{
+		queue_array[workerid] = _starpu_create_fifo();
+	
+		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
+		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
+	
+		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
+	}
 }
 
 struct starpu_sched_policy_s _starpu_sched_random_policy = {

+ 0 - 1
src/core/policies/random_policy.h

@@ -18,7 +18,6 @@
 #define __RANDOM_POLICY_H__
 
 #include <core/workers.h>
-#include <core/mechanisms/queues.h>
 #include <core/mechanisms/fifo_queues.h>
 
 extern struct starpu_sched_policy_s _starpu_sched_random_policy;

+ 0 - 1
src/core/policies/sched_policy.c

@@ -19,7 +19,6 @@
 #include <starpu.h>
 #include <common/config.h>
 #include <common/utils.h>
-#include <core/mechanisms/queues.h>
 #include <core/policies/sched_policy.h>
 #include <core/policies/no_prio_policy.h>
 #include <core/policies/eager_central_policy.h>

+ 1 - 5
src/core/policies/sched_policy.h

@@ -18,12 +18,8 @@
 #define __SCHED_POLICY_H__
 
 #include <starpu.h>
-#include <core/mechanisms/queues.h>
-//#include <core/mechanisms/work_stealing_queues.h>
-//#include <core/mechanisms/central_queues.h>
-//#include <core/mechanisms/central_queues_priorities.h>
-
 #include <core/workers.h>
+#include <core/mechanisms/queues.h>
 
 struct starpu_machine_config_s;
 

+ 5 - 15
src/core/policies/work_stealing_policy.c

@@ -188,21 +188,10 @@ int ws_push_task(starpu_job_t task)
         return 0;
 }
 
-static struct starpu_jobq_s *init_ws_deque(void)
-{
-	struct starpu_jobq_s *q;
-
-	q = _starpu_create_deque();
-
-	queue_array[nworkers++] = q;
-
-	return q;
-}
-
 static void initialize_ws_policy(struct starpu_machine_config_s *config, 
 				__attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	nworkers = 0;
+	nworkers = config->nworkers;
 	rr_worker = 0;
 
 	//machineconfig = config;
@@ -211,10 +200,11 @@ static void initialize_ws_policy(struct starpu_machine_config_s *config,
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 
 	int workerid;
-	for (workerid = 0; workerid < STARPU_NMAXWORKERS; workerid++)
+	for (workerid = 0; workerid < nworkers; workerid++)
+	{
+		queue_array[workerid] = _starpu_create_deque();
 		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
-
-	_starpu_setup_queues(_starpu_init_deque_queues_mechanisms, init_ws_deque, config);
+	}
 }
 
 struct starpu_sched_policy_s _starpu_sched_ws_policy = {

+ 0 - 2
src/core/workers.h

@@ -66,8 +66,6 @@ struct starpu_worker_s {
 	int workerid; /* uniquely identify the worker among all processing units types */
         pthread_cond_t ready_cond; /* indicate when the worker is ready */
 	unsigned memory_node; /* which memory node is associated that worker to ? */
-	/* TODO remove */
-	struct starpu_jobq_s *jobq; /* in which queue will that worker get/put tasks ? */
 	pthread_cond_t *sched_cond; /* condition variable used when the worker waits for tasks. */
 	pthread_mutex_t *sched_mutex; /* mutex protecting sched_cond */
 	struct starpu_job_list_s *local_jobs; /* this queue contains tasks that have been explicitely submitted to that queue */