Bladeren bron

changes from august until now

Andra Hugo 12 jaren geleden
bovenliggende
commit
047ece5254

+ 4 - 1
include/starpu_task.h

@@ -211,6 +211,8 @@ struct starpu_task
 	double flops;
 
 	unsigned already_pushed;
+
+	unsigned scheduled;
 };
 
 /* It is possible to initialize statically allocated tasks with this value.
@@ -241,7 +243,8 @@ struct starpu_task
 	.control_task = 0,				\
 	.hypervisor_tag = 0,				\
 	.flops = 0.0,					\
-		.already_pushed = 0			\
+	.already_pushed = 0,				\
+		.scheduled = 0				\
 };
 
 /*

+ 1 - 0
sched_ctx_hypervisor/include/sched_ctx_hypervisor.h

@@ -29,6 +29,7 @@
 #define HYPERVISOR_NEW_WORKERS_MAX_IDLE -9
 #define HYPERVISOR_TIME_TO_APPLY -10
 #define HYPERVISOR_EMPTY_CTX_MAX_IDLE -11
+#define HYPERVISOR_NULL -12
 
 pthread_mutex_t act_hypervisor_mutex;
 

+ 1 - 1
sched_ctx_hypervisor/src/Makefile.am

@@ -15,7 +15,7 @@
 AM_CFLAGS = -Wall $(STARPU_CUDA_CPPFLAGS) $(STARPU_OPENCL_CPPFLAGS)
 LIBS = $(top_builddir)/src/libstarpu-@STARPU_EFFECTIVE_VERSION@.la
 
-AM_CPPFLAGS = -I$(top_srcdir)/include/ -I$(top_builddir)/include/ -I$(top_builddir)/src/ -I$(top_srcdir)/src/ -I$(top_srcdir)/sched_ctx_hypervisor/include/
+AM_CPPFLAGS = -I$(top_srcdir)/include/ -I$(top_builddir)/include/starpu/$(STARPU_EFFECTIVE_VERSION)/ -I$(top_builddir)/src/ -I$(top_srcdir)/src/ -I$(top_srcdir)/sched_ctx_hypervisor/include/
 
 AM_LDFLAGS = $(STARPU_CUDA_LDFLAGS) $(STARPU_OPENCL_LDFLAGS)
 

+ 18 - 1
sched_ctx_hypervisor/src/hypervisor_policies/idle_policy.c

@@ -16,12 +16,29 @@
 
 #include "policy_tools.h"
 
+unsigned worker_belong_to_other_sched_ctx(unsigned sched_ctx, int worker)
+{
+	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
+	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
+
+	int i;
+	for(i = 0; i < nsched_ctxs; i++)
+		if(sched_ctxs[i] != sched_ctx && starpu_worker_belongs_to_sched_ctx(worker, sched_ctxs[i]))
+			return 1;
+	return 0;
+}
+
 void idle_handle_idle_cycle(unsigned sched_ctx, int worker)
 {
 	struct sched_ctx_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
 	struct policy_config *config = sc_w->config;
 	if(config != NULL &&  sc_w->current_idle_time[worker] > config->max_idle[worker])
-		_resize_to_unknown_receiver(sched_ctx, 0);
+	{
+		if(worker_belong_to_other_sched_ctx(sched_ctx, worker))
+			sched_ctx_hypervisor_remove_workers_from_sched_ctx(&worker, 1, sched_ctx, 1);
+		else
+			_resize_to_unknown_receiver(sched_ctx, 0);
+	}
 }
 
 struct hypervisor_policy idle_policy = {

+ 16 - 0
sched_ctx_hypervisor/src/hypervisor_policies/lp2_policy.c

@@ -433,6 +433,12 @@ static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, dou
 	double tmin = 0.0;
 	double old_tmax = 0.0;
 	unsigned found_sol = 0;
+
+	struct timeval start_time;
+	struct timeval end_time;
+	int nd = 0;
+	gettimeofday(&start_time, NULL);
+
 	/* we fix tmax and we do not treat it as an unknown
 	   we just vary by dichotomy its values*/
 	while(tmax > 1.0)
@@ -478,7 +484,17 @@ static unsigned _compute_task_distribution_over_ctxs(int ns, int nw, int nt, dou
 			tmin = smallest_tmax;
 			tmax = _find_tmax(tmin, tmax);
 		}
+		nd++;
 	}
+	gettimeofday(&end_time, NULL);
+
+	long diff_s = end_time.tv_sec  - start_time.tv_sec;
+        long diff_us = end_time.tv_usec  - start_time.tv_usec;
+
+        float timing = (float)(diff_s*1000000 + diff_us)/1000;
+
+        fprintf(stdout, "nd = %d total time: %f ms \n", nd, timing);
+
 	return found_sol;
 }
 

+ 15 - 0
sched_ctx_hypervisor/src/hypervisor_policies/lp_policy.c

@@ -30,7 +30,22 @@ static void lp_handle_poped_task(unsigned sched_ctx, int worker)
 		{ 
 			int total_nw[2];
 			_get_total_nw(NULL, -1, 2, total_nw);
+
+
+			struct timeval start_time;
+			struct timeval end_time;
+			gettimeofday(&start_time, NULL);
+
 			double vmax = _lp_get_nworkers_per_ctx(nsched_ctxs, 2, nworkers, total_nw);
+			gettimeofday(&end_time, NULL);
+
+			long diff_s = end_time.tv_sec  - start_time.tv_sec;
+			long diff_us = end_time.tv_usec  - start_time.tv_usec;
+
+			float timing = (float)(diff_s*1000000 + diff_us)/1000;
+
+			fprintf(stdout, "total time: %f ms \n", timing);
+
 			if(vmax != 0.0)
 			{
 //				printf("********resize\n");

+ 1 - 1
sched_ctx_hypervisor/src/hypervisor_policies/policy_tools.c

@@ -243,7 +243,6 @@ unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigne
 	if(ret != EBUSY)
 	{					
 		unsigned nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
-
 		if(nworkers_to_move > 0)
 		{
 			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
@@ -264,6 +263,7 @@ unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigne
 			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 			{						
 				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, STARPU_ALL);
+				printf("try tot move to %d\n", poor_sched_ctx);
 				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move, now);
 				
 				struct policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);

+ 2 - 2
sched_ctx_hypervisor/src/sched_ctx_config.c

@@ -114,7 +114,7 @@ static struct policy_config* _ioctl(unsigned sched_ctx, va_list varg_list, unsig
 	int *workerids;
 	int nworkers;
 
-	while ((arg_type = va_arg(varg_list, int)) != -1) 
+	while ((arg_type = va_arg(varg_list, int)) != HYPERVISOR_NULL) 
 	{
 		switch(arg_type)
 		{
@@ -207,7 +207,7 @@ void sched_ctx_hypervisor_ioctl(unsigned sched_ctx, ...)
 	int stop = 0;
 	int task_tag = -1;
 
-	while ((arg_type = va_arg(varg_list, int)) != -1) 
+	while ((arg_type = va_arg(varg_list, int)) != HYPERVISOR_NULL) 
 	{
 		switch(arg_type)
 		{

+ 3 - 1
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -650,7 +650,9 @@ static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
 		struct sched_ctx_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
 		sc_w->current_idle_time[worker] += idle_time;
 		if(hypervisor.policy.handle_idle_cycle)
-				hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
+		{
+			hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
+		}
 	}		
 	return;
 }

+ 0 - 1
src/core/dependencies/implicit_data_deps.c

@@ -260,7 +260,6 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 #ifdef STARPU_USE_FXT
 				_starpu_get_job_associated_to_task(new_sync_task)->model_name = "sync_task_redux";
 #endif
-
 				_starpu_add_writer_after_readers(handle, new_sync_task, new_sync_task);
 
 				task = new_sync_task;

+ 11 - 8
src/core/perfmodel/perfmodel.c

@@ -342,6 +342,7 @@ double starpu_task_expected_data_transfer_time(uint32_t memory_node, struct star
 }
 
 /* Return the expected duration of the entire task bundle in µs */
+
 double starpu_task_bundle_expected_length(starpu_task_bundle_t bundle, enum starpu_perf_archtype arch, unsigned nimpl)
 {
 	double expected_length = 0.0;
@@ -354,16 +355,18 @@ double starpu_task_bundle_expected_length(starpu_task_bundle_t bundle, enum star
 
 	while (entry)
 	{
-		double task_length = starpu_task_expected_length(entry->task, arch, nimpl);
-
-		/* In case the task is not calibrated, we consider the task
-		 * ends immediately. */
-		if (task_length > 0.0)
-			expected_length += task_length;
-
+		if(!entry->task->scheduled)
+		{
+			double task_length = starpu_task_expected_length(entry->task, arch, nimpl);
+			
+			/* In case the task is not calibrated, we consider the task
+			 * ends immediately. */
+			if (task_length > 0.0)
+				expected_length += task_length;
+		}
+			
 		entry = entry->next;
 	}
-
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
 
 	return expected_length;

+ 1 - 1
src/core/sched_ctx.c

@@ -657,7 +657,7 @@ unsigned starpu_get_sched_ctx()
 {
 	unsigned *sched_ctx = (unsigned*)pthread_getspecific(sched_ctx_key);
 	if(sched_ctx == NULL)
-		return STARPU_NMAX_SCHED_CTXS;
+		return 0;//STARPU_NMAX_SCHED_CTXS;
 	STARPU_ASSERT(*sched_ctx < STARPU_NMAX_SCHED_CTXS);
 	return *sched_ctx;
 }

+ 5 - 1
src/core/sched_policy.c

@@ -729,7 +729,11 @@ int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 
-	return _starpu_push_local_task(worker, task, back);
+	int ret =  _starpu_push_local_task(worker, task, back);
+	
+	task->scheduled = 1;
+
+	return ret;
 }
 
 

+ 25 - 4
src/core/task.c

@@ -92,6 +92,8 @@ void starpu_task_init(struct starpu_task *task)
 	task->flops = 0.0;
 	
 	task->already_pushed = 0;
+
+	task->scheduled = 0;
 }
 
 /* Free all the ressources allocated for a task, without deallocating the task
@@ -417,11 +419,30 @@ int starpu_task_submit(struct starpu_task *task)
 
 		_starpu_detect_implicit_data_deps(task);
 
-		if (task->cl->model)
-			_starpu_load_perfmodel(task->cl->model);
+		if(task->bundle)
+		{
+			struct _starpu_task_bundle_entry *entry;
+			entry = task->bundle->list;
 
-		if (task->cl->power_model)
-			_starpu_load_perfmodel(task->cl->power_model);
+			while(entry)
+			{
+				if (entry->task->cl->model)
+					_starpu_load_perfmodel(entry->task->cl->model);
+				
+				if (entry->task->cl->power_model)
+					_starpu_load_perfmodel(entry->task->cl->power_model);
+
+				entry = entry->next;
+			}
+		}
+		else
+		{
+			if (task->cl->model)
+				_starpu_load_perfmodel(task->cl->model);
+			
+			if (task->cl->power_model)
+				_starpu_load_perfmodel(task->cl->power_model);
+		}
 	}
 
 	/* If profiling is activated, we allocate a structure to store the

+ 1 - 1
src/core/task_bundle.c

@@ -80,7 +80,7 @@ int starpu_task_bundle_insert(starpu_task_bundle_t bundle, struct starpu_task *t
 		item->next = entry;
 	}
 
-	/* Mark the task as belonging the bundle */
+	/* Mark the task as belonging to the bundle */
 	task->bundle = bundle;
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&bundle->mutex);

+ 23 - 15
src/sched_policies/heft.c

@@ -290,12 +290,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 }
 
 /* TODO: factorize with dmda!! */
+
 static void compute_all_performance_predictions(struct starpu_task *task,
-						double (*local_task_length)[STARPU_MAXIMPLEMENTATIONS], 
-						double (*exp_end)[STARPU_MAXIMPLEMENTATIONS],
+						double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS], 
+						double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
 						double *max_exp_endp, double *best_exp_endp,
-						double (*local_data_penalty)[STARPU_MAXIMPLEMENTATIONS],
-						double (*local_power)[STARPU_MAXIMPLEMENTATIONS], 
+						double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+						double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS], 
 						int *forced_worker, int *forced_impl,
 						starpu_task_bundle_t bundle,
 						unsigned sched_ctx_id)
@@ -322,6 +323,13 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 			for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) 
 			{
+				if (!starpu_worker_can_execute_task(worker, task, nimpl))
+				{
+					/* no one on that queue may execute this task */
+//				worker_ctx++;
+					continue;
+				}
+		
 				/* Sometimes workers didn't take the tasks as early as we expected */
 				pthread_mutex_t *sched_mutex;
 				pthread_cond_t *sched_cond;
@@ -332,12 +340,6 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				if (exp_end[worker_ctx][nimpl] > max_exp_end)
 					max_exp_end = exp_end[worker_ctx][nimpl];
 				_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-				if (!starpu_worker_can_execute_task(worker, task, nimpl))
-				{
-					/* no one on that queue may execute this task */
-//				worker_ctx++;
-					continue;
-				}
 				
 				enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 				unsigned memory_node = starpu_worker_get_memory_node(worker);
@@ -358,10 +360,13 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 					double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
 					if (conversion_time > 0.0)
 						local_task_length[worker_ctx][nimpl] += conversion_time;
+
 					//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
 				}
-				
 				double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
+
+/* 				printf("**********%d/%d: len = %lf penalty = %lf \n", worker, worker_ctx,  */
+/* 				       local_task_length[worker_ctx][nimpl], local_data_penalty[worker_ctx][nimpl]); */
 				
 				if (ntasks_best == -1
 				    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
@@ -466,10 +471,10 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
 
 	unsigned nworkers_ctx = workers->nworkers;
-	double local_task_length[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-	double local_data_penalty[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-	double local_power[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-	double exp_end[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
+	double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
+	double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
+	double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
+	double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
 	double max_exp_end = 0.0;
 
 	double best_exp_end;
@@ -504,6 +509,9 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 			push_conversion_tasks(task, forced_worker);
 			prio = 0;
 		}
+		unsigned memory_node = starpu_worker_get_memory_node(forced_worker);
+		double transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
+
 		return push_task_on_best_worker(task, forced_worker, 0.0, 0.0, prio, sched_ctx_id);
 	}