
create ctx: min/max negotiation

Andra Hugo 13 years ago
parent
commit
d21fa91160
6 changed files with 275 additions and 29 deletions
  1. include/starpu_scheduler.h  + 4 - 0
  2. src/core/sched_ctx.c  + 184 - 15
  3. src/core/sched_ctx.h  + 13 - 3
  4. src/core/sched_policy.c  + 0 - 11
  5. src/core/workers.c  + 72 - 0
  6. src/core/workers.h  + 2 - 0

+ 4 - 0
include/starpu_scheduler.h

@@ -166,6 +166,10 @@ void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 
 unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_ctx, int nworkers_ctx, const char *sched_name);
 
+unsigned starpu_create_sched_ctx_inside_interval(const char *policy_name, const char *sched_name, 
+						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
+						 unsigned allow_overlap);
+
 void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id);
 
 void starpu_add_workers_to_sched_ctx(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx);
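The new entry point asks for a context sized within CPU/GPU bounds instead of naming explicit worker IDs; the runtime negotiates the actual workers against the min/max declarations of the other contexts. A minimal usage sketch (the "heft" policy name and the bound values are illustrative assumptions, not part of this commit):

    #include <starpu.h>
    #include <starpu_scheduler.h>

    int main(void)
    {
        if (starpu_init(NULL) != 0)
            return 1;

        /* request a context with 2..4 CPUs and 0..1 GPUs;
           allow_overlap = 0 means workers are taken away from
           other contexts rather than shared with them */
        unsigned ctx = starpu_create_sched_ctx_inside_interval("heft", "my_ctx",
                                                               2, 4, 0, 1, 0);

        /* ... submit tasks to ctx ... */

        starpu_delete_sched_ctx(ctx, 0);
        starpu_shutdown();
        return 0;
    }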

+ 184 - 15
src/core/sched_ctx.c

@@ -19,7 +19,8 @@
 #include <common/utils.h>
 
 extern struct worker_collection worker_list;
-
+static pthread_mutex_t sched_ctx_manag = PTHREAD_MUTEX_INITIALIZER;
+struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
 pthread_key_t sched_ctx_key;
 unsigned with_hypervisor = 0;
 
@@ -179,19 +180,23 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 				  const char *sched_name)
 {
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+
+	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS);
 
 	unsigned id = _starpu_get_first_free_sched_ctx(config);
 
 	struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
 	sched_ctx->id = id;
+
+	config->topology.nsched_ctxs++;	
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
+
 	int nworkers = config->topology.nworkers;
 	
 	STARPU_ASSERT(nworkers_ctx <= nworkers);
   
 	_STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->changing_ctx_mutex, NULL);
- 	_STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->no_workers_mutex, NULL);
-	_STARPU_PTHREAD_COND_INIT(&sched_ctx->no_workers_cond, NULL);
 	_STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->empty_ctx_mutex, NULL);
 
 	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
@@ -217,7 +222,6 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	/* after having a worker_collection on the resources, add them */
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
 
-	config->topology.nsched_ctxs++;	
 
 	/* if we create the initial big sched ctx we can update workers' status here
 	   because they haven't been launched yet */
@@ -235,10 +239,157 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	return sched_ctx;
 }
 
+static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_archtype arch, unsigned allow_overlap)
+{
+	int pus[max];
+	int npus = 0; 
+	int i;
+	int n = 0;
+		
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	if(config->topology.nsched_ctxs == 1)
+	{
+		/* we have all the available resources */
+		npus = starpu_worker_get_nids_by_type(arch, pus, max);
+		/* TODO: hierarchical ctxs: pick max good workers, close to one another */
+		for(i = 0; i < npus; i++)
+			workers[(*nw)++] = pus[i];
+	}
+	else
+	{
+		unsigned enough_resources = 0;
+		npus = starpu_worker_get_available_ids_by_type(arch, pus, max);
+       
+		for(i = 0; i < npus; i++)
+			workers[(*nw)++] = pus[i];
+		
+		if(npus == max)
+			/* we got the maximum number of resources */
+			enough_resources = 1;
+
+		if(!enough_resources && npus >= min)
+			/* we got at least the minimum number of resources */
+			enough_resources = 1;
+
+		if(!enough_resources)
+		{
+			/* try to take resources from contexts that have more than the minimum number of workers they need */
+			int s;
+			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
+			{
+				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+				{
+					int _npus = 0;
+					int _pus[STARPU_NMAXWORKERS];
+					_npus = starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
+					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
+					if(_npus > ctx_min)
+					{
+						/* reset n so workers already taken from a previous ctx are not counted again */
+						n = 0;
+						if(npus < min)
+						{
+							n = (_npus - ctx_min) > (min - npus) ? min - npus : (_npus - ctx_min);
+							npus += n;
+						}
+						/* TODO: hierarchical ctxs: pick n good workers, close to the ones already assigned to the ctx */
+						for(i = 0; i < n; i++)
+							workers[(*nw)++] = _pus[i];
+						starpu_remove_workers_from_sched_ctx(_pus, n, config->sched_ctxs[s].id);
+					}
+
+				}
+			}
+			if(npus >= min)
+				enough_resources = 1;
+
+		}
+		
+		if(!enough_resources)
+		{
+			/* if there are not enough available workers to satisfy the minimum required,
+			   give each context a number of workers proportional to its requirements */
+			int global_npus = starpu_worker_get_count_by_type(arch);
+			
+			int req_npus = 0;
+
+			int s;
+			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
+				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+					req_npus += arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
+
+			req_npus += min;
+			
+			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
+			{
+				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+				{
+					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
+					double needed_npus = ((double)ctx_min * (double)global_npus) / (double)req_npus;
+
+					int _npus = 0;
+					int _pus[STARPU_NMAXWORKERS];
+				
+					_npus = starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);					
+					if(needed_npus < (double)_npus)
+					{
+						double npus_to_rem = (double)_npus - needed_npus;
+						int x = floor(npus_to_rem);
+						double x_double = (double)x;
+						double diff = npus_to_rem - x_double;
+						int npus_to_remove = diff >= 0.5 ? x+1 : x;
+
+						int pus_to_remove[npus_to_remove];
+						int c = 0;
+						
+						/* TODO: hierarchical ctxs: pick npus_to_remove good workers, close to the ones already assigned to the ctx */
+						for(i = _npus-1; i >= (_npus - npus_to_remove); i--)
+						{
+							workers[(*nw)++] = _pus[i];
+							pus_to_remove[c++] = _pus[i];
+						}
+						if(!allow_overlap)
+							starpu_remove_workers_from_sched_ctx(pus_to_remove, npus_to_remove, config->sched_ctxs[s].id);
+					}
+
+				}
+			}
+		}
+	}
+}
+
+unsigned starpu_create_sched_ctx_inside_interval(const char *policy_name, const char *sched_name, 
+						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
+						 unsigned allow_overlap)
+{
+	struct _starpu_sched_ctx *sched_ctx = NULL;
+	int workers[max_ncpus + max_ngpus];
+	int nw = 0;
+	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
+	_get_workers(min_ncpus, max_ncpus, workers, &nw, STARPU_CPU_WORKER, allow_overlap);
+	_get_workers(min_ngpus, max_ngpus, workers, &nw, STARPU_CUDA_WORKER, allow_overlap);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
+	int i;
+	printf("%d: ", nw);
+	for(i = 0; i < nw; i++)
+		printf("%d ", workers[i]);
+	printf("\n");
+	sched_ctx = _starpu_create_sched_ctx(policy_name, workers, nw, 0, sched_name);
+	sched_ctx->min_ncpus = min_ncpus;
+	sched_ctx->max_ncpus = max_ncpus;
+	sched_ctx->min_ngpus = min_ngpus;
+	sched_ctx->max_ngpus = max_ngpus;
+	
+	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
+	sched_ctx->perf_counters = NULL;
+#endif
+	return sched_ctx->id;
+}
+
 unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids, 
-			    int nworkers_ctx, const char *sched_name)
+				 int nworkers, const char *sched_name)
 {
-	struct _starpu_sched_ctx *sched_ctx = _starpu_create_sched_ctx(policy_name, workerids, nworkers_ctx, 0, sched_name);
+	struct _starpu_sched_ctx *sched_ctx = NULL;
+	sched_ctx = _starpu_create_sched_ctx(policy_name, workerids, nworkers, 0, sched_name);
 
 	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
@@ -270,12 +421,12 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 
 	_STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->changing_ctx_mutex);
 	_STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
-	_STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->no_workers_mutex);
-	_STARPU_PTHREAD_COND_DESTROY(&sched_ctx->no_workers_cond);
 
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	config->topology.nsched_ctxs--;
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 }
 
 void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx_id)
@@ -350,13 +501,6 @@ void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add, u
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 
-/* 	if(n_added_workers > 0) */
-/* 	{ */
-/* 		_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex); */
-/* 		_STARPU_PTHREAD_COND_BROADCAST(&sched_ctx->no_workers_cond); */
-/* 		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex); */
-/* 	} */
-
 	unsigned unlocked = 0;
 	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
@@ -615,6 +759,31 @@ struct worker_collection* starpu_get_worker_collection_of_sched_ctx(unsigned sch
 	return sched_ctx->workers;
 }
 
+int starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu_archtype arch)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+
+	struct worker_collection *workers = sched_ctx->workers;
+	int worker;
+
+	int npus = 0;
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
+		if(curr_arch == arch)
+			pus[npus++] = worker;
+	}
+
+	if(workers->init_cursor)
+		workers->deinit_cursor(workers);
+	return npus;
+}
+
 pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
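When even shrinking the other contexts down to their minima cannot satisfy the request, _get_workers falls back to a proportional split: each context (the new one included) is entitled to needed_npus = ctx_min * global_npus / req_npus workers, where req_npus is the sum of all declared minima. A worked example with illustrative numbers: on a machine with global_npus = 8 CPUs, an existing context with min_ncpus = 6 currently holding all 8 workers, and a new request with min = 4, we get req_npus = 6 + 4 = 10, so the existing context is entitled to 6 * 8 / 10 = 4.8 workers; it holds 8, so npus_to_rem = 8 - 4.8 = 3.2, which the round-to-nearest step turns into 3 workers handed to the new context (and removed from the old one unless allow_overlap is set).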

+ 13 - 3
src/core/sched_ctx.h

@@ -67,7 +67,19 @@ struct _starpu_sched_ctx {
 	struct starpu_task_list empty_ctx_tasks;
 
 	/* mutex protecting the empty_ctx_tasks list */
-	pthread_mutex_t empty_ctx_mutex;
+	pthread_mutex_t empty_ctx_mutex; 
+
+	/* minimum number of CPUs this context should run on */
+	int min_ncpus;
+
+	/* maximum number of CPUs this context should run on */
+	int max_ncpus;
+
+	/* minimum number of GPUs this context should run on */
+	int min_ngpus;
+
+	/* maximum number of GPUs this context should run on */
+	int max_ngpus;
 
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	/* a structure containing a series of performance counters determining the resize procedure */
@@ -77,8 +89,6 @@ struct _starpu_sched_ctx {
 
 struct _starpu_machine_config;
 
-struct starpu_task stop_submission_task;
-
 /* init sched_ctx_id of all contexts */
 void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config);
 

+ 0 - 11
src/core/sched_policy.c

@@ -342,21 +342,10 @@ int _starpu_push_task(struct _starpu_job *j)
 		
 		if(nworkers == 0)
 		{
-/* 			if(workerid == -1) */
-/* 			{ */
-/* 				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex); */
-/* 				_STARPU_PTHREAD_COND_WAIT(&sched_ctx->no_workers_cond, &sched_ctx->no_workers_mutex); */
-/* 				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex); */
-/* 				nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx); */
-/* 				if(nworkers == 0) return _starpu_push_task(j); */
-/* 			} */
-/* 			else */
-/* 			{ */
 				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
 				starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
 				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
 				return 0;
-//			}
 		}
 	}
 

+ 72 - 0
src/core/workers.c

@@ -899,6 +899,78 @@ int starpu_worker_get_ids_by_type(enum starpu_archtype type, int *workerids, int
 	return cnt;
 }
 
+
+int starpu_worker_get_nids_by_type(enum starpu_archtype type, int *workerids, int maxsize)
+{
+	unsigned nworkers = starpu_worker_get_count();
+
+	int cnt = 0;
+
+	unsigned id;
+	for (id = 0; id < nworkers; id++)
+	{
+		if (starpu_worker_get_type(id) == type)
+		{
+			/* Perhaps the array is too small? */
+			if (cnt >= maxsize)
+				return cnt;
+
+			workerids[cnt++] = id;
+		}
+	}
+
+	return cnt;
+}
+
+
+int starpu_worker_get_available_ids_by_type(enum starpu_archtype type, int *workerids, int maxsize)
+{
+	unsigned nworkers = starpu_worker_get_count();
+
+	int cnt = 0;
+
+	unsigned id, worker;
+	unsigned found = 0;
+	for (id = 0; id < nworkers; id++)
+	{
+		found = 0;
+		if (starpu_worker_get_type(id) == type)
+		{
+			/* Perhaps the array is too small? */
+			if (cnt >= maxsize)
+				return cnt;
+			int s;
+			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
+			{
+				if(config.sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+				{
+					struct worker_collection *workers = config.sched_ctxs[s].workers;
+					if(workers->init_cursor)
+						workers->init_cursor(workers);
+					
+					while(workers->has_next(workers))
+					{
+						worker = workers->get_next(workers);
+						if(worker == id)
+						{
+							found = 1;
+							break;
+						}
+					}
+					
+					if(workers->init_cursor)
+						workers->deinit_cursor(workers);
+					if(found) break;
+				}
+			}
+			if(!found)
+				workerids[cnt++] = id;
+		}
+	}
+
+	return cnt;
+}
+
 void starpu_worker_get_name(int id, char *dst, size_t maxlen)
 {
 	char *name = config.workers[id].name;
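The two new helpers differ only in what they count: starpu_worker_get_nids_by_type() returns the first maxsize workers of a type regardless of ownership, while starpu_worker_get_available_ids_by_type() skips any worker already claimed by a user-created context (it scans contexts 1..STARPU_NMAX_SCHED_CTXS-1, ignoring the initial context 0, which owns everything). A hypothetical comparison, assuming a machine with several CUDA devices:

    int ids[4], free_ids[4];

    /* first CUDA workers on the machine, claimed or not */
    int n  = starpu_worker_get_nids_by_type(STARPU_CUDA_WORKER, ids, 4);

    /* only CUDA workers not yet held by another scheduling context */
    int nf = starpu_worker_get_available_ids_by_type(STARPU_CUDA_WORKER, free_ids, 4);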

+ 2 - 0
src/core/workers.h

@@ -234,5 +234,7 @@ unsigned _starpu_execute_registered_progression_hooks(void);
 /* We keep an initial sched ctx which might be used in case no other ctx is available */
 struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void);
 
+int starpu_worker_get_nids_by_type(enum starpu_archtype type, int *workerids, int maxsize);
+
 #endif // __WORKERS_H__