ソースを参照

New mutex policy for pushing and popping tasks:
a rwlock protecting against access from the application during modification,
a per-worker mutex protecting against pops and pushes from workers (there may be a problem here).
New private header (which should be renamed) for exporting some functions across modules.

Simon Archipoff 12 年 前
コミット
d303d1f7a0
共有5 個のファイルを変更した57 個の追加11 個の削除を含む
  1. 2 2
      include/starpu_sched_node.h
  2. 1 0
      src/Makefile.am
  3. 11 6
      src/sched_policies/node_sched.c
  4. 36 3
      src/sched_policies/node_worker.c
  5. 7 0
      src/sched_policies/sched_node.h

+ 2 - 2
include/starpu_sched_node.h

@@ -1,5 +1,5 @@
-#ifndef __SCHED_NODE_H__
-#define __SCHED_NODE_H__
+#ifndef __STARPU_SCHED_NODE_H__
+#define __STARPU_SCHED_NODE_H__
 #include <starpu.h>
 #include <common/starpu_spinlock.h>
 

+ 1 - 0
src/Makefile.am

@@ -134,6 +134,7 @@ noinst_HEADERS = 						\
 	top/starpu_top_connection.h				\
 	top/starpu_top_core.h					\
 	sched_policies/prio_deque.h				\
+	sched_policies/sched_node.h				\
 	sched_policies/node_composed.h
 #	sched_policies/scheduler_maker.h			
 

+ 11 - 6
src/sched_policies/node_sched.c

@@ -2,6 +2,7 @@
 #include <core/workers.h>
 #include <starpu_sched_node.h>
 #include <starpu_thread_util.h>
+#include "sched_node.h"
 #include <float.h>
 
 double starpu_sched_compute_expected_time(double now, double predicted_end, double predicted_length, double predicted_transfer)
@@ -84,10 +85,12 @@ void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsign
 {
 	struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
+	_starpu_sched_node_lock_all_workers();
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
 		starpu_bitmap_set(t->workers, workerids[i]);
 	starpu_sched_tree_update_workers_in_ctx(t);
+	_starpu_sched_node_unlock_all_workers();
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
@@ -95,10 +98,12 @@ void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, uns
 {
 	struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
+	_starpu_sched_node_lock_all_workers();
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
 		starpu_bitmap_unset(t->workers, workerids[i]);
 	starpu_sched_tree_update_workers_in_ctx(t);
+	_starpu_sched_node_unlock_all_workers();
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
@@ -201,19 +206,20 @@ int starpu_sched_tree_push_task(struct starpu_task * task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
+	int workerid = starpu_worker_get_id();
+	if(-1 == workerid)
+		STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
 	int ret_val = tree->root->push_task(tree->root,task);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
+	if(-1 == workerid)
+		STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
 	return ret_val;
 }
 struct starpu_task * starpu_sched_tree_pop_task(unsigned sched_ctx_id)
 {
 	struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
 	int workerid = starpu_worker_get_id();
 	struct starpu_sched_node * node = starpu_sched_node_worker_get(workerid);
 	struct starpu_task * task = node->pop_task(node, sched_ctx_id);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
 	return task;
 }
 /*
@@ -340,8 +346,7 @@ static double estimated_transfer_length(struct starpu_sched_node * node, struct
 	double sum = 0.0;
 	int nb = 0, i = 0;
 	for(i = 0; i < node->nchilds; i++)
-	{
-		struct starpu_sched_node * c = node->childs[i];
+	{		struct starpu_sched_node * c = node->childs[i];
 		if(starpu_sched_node_can_execute_task(c, task))
 		{
 			sum += c->estimated_transfer_length(c, task);

+ 36 - 3
src/sched_policies/node_worker.c

@@ -74,8 +74,15 @@ struct _starpu_worker_task_list
 
 struct _starpu_worker_node_data
 {
-	struct _starpu_worker * worker;
-	struct _starpu_combined_worker * combined_worker;
+	union 
+	{
+		struct
+		{
+			struct _starpu_worker * worker;
+			starpu_pthread_mutex_t lock;
+		};	
+		struct _starpu_combined_worker * combined_worker;
+	};
 	struct _starpu_worker_task_list * list;
 };
 
@@ -310,11 +317,12 @@ struct starpu_task * starpu_sched_node_worker_pop_task(struct starpu_sched_node
 		starpu_push_task_end(task);
 		return task;
 	}
-
+	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
 	struct starpu_sched_node *father = node->fathers[sched_ctx_id];
 	if(father == NULL)
 		return NULL;
 	task = father->pop_task(father,sched_ctx_id);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
 	if(!task)
 		return NULL;
 	if(task->cl->type == STARPU_SPMD)
@@ -348,6 +356,28 @@ void starpu_sched_node_worker_destroy(struct starpu_sched_node *node)
 	_worker_nodes[id] = NULL;
 }
 
+void _starpu_sched_node_lock_all_workers(void)
+{
+	unsigned i;
+	for(i = 0; i < starpu_worker_get_count(); i++)
+	{
+		struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(i)->data;
+		STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
+	}
+}
+void _starpu_sched_node_unlock_all_workers(void)
+{
+	unsigned i;
+	for(i = 0; i < starpu_worker_get_count(); i++)
+	{
+		struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(i)->data;
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
+	}
+}
+
+
+
+
 static void simple_worker_available(struct starpu_sched_node * worker_node)
 {
 	(void) worker_node;
@@ -564,9 +594,12 @@ static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid)
 	struct starpu_sched_node * node = starpu_sched_node_create();
 	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
 	memset(data, 0, sizeof(*data));
+
 	data->worker = worker;
+	STARPU_PTHREAD_MUTEX_INIT(&data->lock,NULL);
 	data->list = _starpu_worker_task_list_create();
 	node->data = data;
+
 	node->push_task = starpu_sched_node_worker_push_task;
 	node->pop_task = starpu_sched_node_worker_pop_task;
 	node->estimated_end = simple_worker_estimated_end;

+ 7 - 0
src/sched_policies/sched_node.h

@@ -0,0 +1,7 @@
+#ifndef __SCHED_NODE_H__
+#define __SCHED_NODE_H__
+
+void _starpu_sched_node_lock_all_workers(void);
+void _starpu_sched_node_unlock_all_workers(void);
+
+#endif