
fixing mem corruption

Andra Hugo, 10 years ago
commit d4e55d69c9

+ 1 - 0
examples/sched_ctx/sched_ctx.c

@@ -152,6 +152,7 @@ int main(int argc, char **argv)
 	/* wait for all tasks at the end*/
 	starpu_task_wait_for_all();
 
+	starpu_sched_ctx_add_workers(procs1, nprocs2, sched_ctx2);
 	starpu_sched_ctx_delete(sched_ctx1);
 	starpu_sched_ctx_delete(sched_ctx2);
 	printf("tasks executed %d out of %d\n", tasks_executed, ntasks/2);

+ 1 - 1
sc_hypervisor/src/hypervisor_policies/feft_lp_policy.c

@@ -320,7 +320,7 @@ static void feft_lp_handle_idle_cycle(unsigned sched_ctx, int worker)
 		int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
 		if(ret != EBUSY)
 		{
-//			printf("trigger idle \n");
+			printf("trigger idle \n");
 			_resize_leaves(worker);
 			starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 		}
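For reference, a standalone sketch of the trylock pattern this handler relies on: the resize is only attempted when the hypervisor mutex is not already held, so an idle notification never blocks. The names resize_mutex and do_resize are illustrative stand-ins, not StarPU API:

```c
#include <errno.h>
#include <pthread.h>

static pthread_mutex_t resize_mutex = PTHREAD_MUTEX_INITIALIZER;

static void do_resize(int worker) { (void)worker; /* placeholder for _resize_leaves() */ }

/* Non-blocking idle handler: if another thread already holds the lock
 * (EBUSY), skip this idle cycle instead of waiting for it. */
static void handle_idle_cycle(int worker)
{
	int ret = pthread_mutex_trylock(&resize_mutex);
	if (ret != EBUSY)
	{
		do_resize(worker);
		pthread_mutex_unlock(&resize_mutex);
	}
}
```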

+ 6 - 3
sc_hypervisor/src/sc_hypervisor.c

@@ -382,9 +382,12 @@ void sc_hypervisor_unregister_ctx(unsigned sched_ctx)
 	int *pus;
 	unsigned npus = starpu_sched_ctx_get_workers_list(sched_ctx, &pus);
 
-	starpu_sched_ctx_set_priority(pus, npus, father, 1);
-	starpu_sched_ctx_set_priority_on_level(pus, npus, father, 1);
-	free(pus);
+	if(npus)
+	{
+		starpu_sched_ctx_set_priority(pus, npus, father, 1);
+		starpu_sched_ctx_set_priority_on_level(pus, npus, father, 1);
+		free(pus);
+	}
 
 	unsigned i;
 	for(i = 0; i < hypervisor.nsched_ctxs; i++)
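The unregister path now only uses and frees the worker list when the context actually returned one, which matches the new early return in starpu_sched_ctx_get_workers_list (it returns 0 without allocating when the context has no worker collection). A minimal sketch of that guarded calling convention, using only the public starpu_sched_ctx_get_workers_list():

```c
#include <stdlib.h>
#include <starpu.h>

/* Only touch and free the worker list if the call actually allocated one. */
static void visit_workers(unsigned sched_ctx)
{
	int *pus = NULL;
	unsigned npus = starpu_sched_ctx_get_workers_list(sched_ctx, &pus);
	if (npus)
	{
		unsigned i;
		for (i = 0; i < npus; i++)
		{
			/* ... use pus[i] ... */
		}
		free(pus);
	}
}
```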

+ 48 - 10
src/core/sched_ctx.c

@@ -227,10 +227,36 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 
 	if(ndevices > 0)
 	{
+
 		if(sched_ctx->perf_arch.devices == NULL)
 			sched_ctx->perf_arch.devices = (struct starpu_perfmodel_device*)malloc(ndevices*sizeof(struct starpu_perfmodel_device));
 		else
-			sched_ctx->perf_arch.devices = (struct starpu_perfmodel_device*)realloc(sched_ctx->perf_arch.devices, (sched_ctx->perf_arch.ndevices+ndevices)*sizeof(struct starpu_perfmodel_device));
+		{
+			int nfinal_devices = 0;
+			int dev1, dev2;
+			unsigned found = 0;
+			for(dev1 = 0; dev1 < ndevices; dev1++)
+			{
+				for(dev2 = 0; dev2 < sched_ctx->perf_arch.ndevices; dev2++)
+				{
+					if(sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
+						found = 1;
+				}
+				
+				if(!found)
+				{
+					nfinal_devices++;
+				}
+				else
+					found = 0;
+				
+			}
+
+
+			int nsize =  (sched_ctx->perf_arch.ndevices+nfinal_devices);
+			sched_ctx->perf_arch.devices  = (struct starpu_perfmodel_device*)realloc(sched_ctx->perf_arch.devices, nsize*sizeof(struct starpu_perfmodel_device));
+			
+		}
 
 		int dev1, dev2;
 		unsigned found = 0;
@@ -240,7 +266,9 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			{
 				if(sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
 				{
-					sched_ctx->perf_arch.devices[dev2].ncores += devices[dev1].ncores;
+					if(sched_ctx->perf_arch.devices[dev2].type == STARPU_CPU_WORKER)
+						sched_ctx->perf_arch.devices[dev2].ncores += devices[dev1].ncores;
+				     
 					found = 1;
 				}
 			}
@@ -320,7 +348,6 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 	{
 		struct _starpu_worker *str_worker = _starpu_get_worker_struct(removed_workers[i]);
 		int dev1, dev2;
-		unsigned found = 0;
 		for(dev1 = 0; dev1 < sched_ctx->perf_arch.ndevices; dev1++)
 		{
 			for(dev2 = 0; dev2 < str_worker->perf_arch.ndevices; dev2++)
@@ -335,12 +362,9 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 						devices[ndevices].type = sched_ctx->perf_arch.devices[dev1].type;
 						devices[ndevices].devid = sched_ctx->perf_arch.devices[dev1].devid;
 						ndevices++;
-						found = 1;
 					}
 				}
 			}
-			if(found)
-				found = 0;
 		}
 
 	}
@@ -368,11 +392,24 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 			struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
 			for(dev = 0; dev < str_worker->perf_arch.ndevices; dev++)
 			{
-				devices[ndevices].type = str_worker->perf_arch.devices[dev].type;
-				devices[ndevices].devid = str_worker->perf_arch.devices[dev].devid;
-				devices[ndevices].ncores = str_worker->perf_arch.devices[dev].ncores;
-				ndevices++;
+				int dev2;
+				for(dev2 = 0; dev2 < ndevices; dev2++)
+				{
+					if(devices[dev2].type == str_worker->perf_arch.devices[dev].type &&
+					   devices[dev2].devid == str_worker->perf_arch.devices[dev].devid)
+						found = 1;
+				}
+				if(!found)
+				{
+					devices[ndevices].type = str_worker->perf_arch.devices[dev].type;
+					devices[ndevices].devid = str_worker->perf_arch.devices[dev].devid;
+					devices[ndevices].ncores = str_worker->perf_arch.devices[dev].ncores;
+					ndevices++;
+				}
+				else 
+					found = 0;
 			}
+			found = 0;
 		}
 		else
 			found = 0;
@@ -1390,6 +1427,7 @@ unsigned starpu_sched_ctx_get_workers_list(unsigned sched_ctx_id, int **workerid
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	struct starpu_worker_collection *workers = sched_ctx->workers;
+	if(!workers) return 0;
 	*workerids = (int*)malloc(workers->nworkers*sizeof(int));
 	int worker;
 	unsigned nworkers = 0;
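The realloc in _starpu_add_workers_to_sched_ctx now grows the device array by the number of devices that are genuinely new (not already present by type and devid), which is what the merge loop below actually appends. A simplified, self-contained sketch of that count-then-realloc sizing logic, with a reduced struct dev standing in for struct starpu_perfmodel_device:

```c
#include <stdlib.h>

/* Reduced stand-in for struct starpu_perfmodel_device. */
struct dev { int type; int devid; int ncores; };

/* Count how many entries of 'add' are not yet in '*list', then grow '*list'
 * by exactly that amount.  Returns 0 on success, -1 on allocation failure. */
static int grow_device_list(struct dev **list, int nlist, const struct dev *add, int nadd)
{
	int nnew = 0, i, j;
	for (i = 0; i < nadd; i++)
	{
		int found = 0;
		for (j = 0; j < nlist; j++)
			if ((*list)[j].type == add[i].type && (*list)[j].devid == add[i].devid)
				found = 1;
		if (!found)
			nnew++;
	}
	if (nnew > 0)
	{
		struct dev *tmp = realloc(*list, (nlist + nnew) * sizeof(*tmp));
		if (tmp == NULL)
			return -1;
		*list = tmp;
	}
	return 0;
}
```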

+ 5 - 2
src/drivers/driver_common/driver_common.c

@@ -76,7 +76,8 @@ void _starpu_driver_start_job(struct _starpu_worker *worker, struct _starpu_job
 
 	// Find out if the worker is the master of a parallel context
 	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(worker, j);
-	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", worker->workerid);
+	if(!sched_ctx)
+		sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
 	if(!sched_ctx->sched_policy)
 	{
 		if(!sched_ctx->awake_workers && sched_ctx->main_master == worker->workerid)
@@ -115,7 +116,9 @@ void _starpu_driver_end_job(struct _starpu_worker *worker, struct _starpu_job *j
 
 	// Find out if the worker is the master of a parallel context
 	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(worker, j);
-	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", worker->workerid);
+	unsigned worker_left_ctx = 0;
+	if(!sched_ctx)
+		sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
 
 	if (!sched_ctx->sched_policy)
 	{
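Instead of asserting when no context is found for the worker/job pair (presumably possible while the hypervisor moves workers between contexts), the driver now falls back to the context id recorded on the task. A schematic sketch of that fallback; the struct and helper names here are illustrative stand-ins for the StarPU internals (_starpu_job, _starpu_get_sched_ctx_struct, ...):

```c
#include <stddef.h>

/* Illustrative stand-ins for StarPU internal types and helpers. */
struct sched_ctx { unsigned id; };
struct job { unsigned task_sched_ctx; };

static struct sched_ctx ctxs[16];        /* hypothetical global context table */

static struct sched_ctx *lookup_ctx_for_worker_and_job(int workerid, struct job *j)
{
	(void)workerid; (void)j;
	return NULL;                     /* stand-in: worker not found in the job's context */
}

static struct sched_ctx *get_ctx_by_id(unsigned sched_ctx_id)
{
	return &ctxs[sched_ctx_id];      /* stand-in for _starpu_get_sched_ctx_struct() */
}

/* Fallback pattern from the hunks above: if the worker/job lookup fails,
 * resolve the context from the id stored on the task instead of asserting. */
static struct sched_ctx *resolve_job_ctx(int workerid, struct job *j)
{
	struct sched_ctx *ctx = lookup_ctx_for_worker_and_job(workerid, j);
	if (ctx == NULL)
		ctx = get_ctx_by_id(j->task_sched_ctx);
	return ctx;
}
```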

+ 7 - 7
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -307,7 +307,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 
         /* Sometimes workers didn't take the tasks as early as we expected */
-	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
 	if ((starpu_timing_now() + predicted_transfer) < fifo->exp_end)
@@ -420,7 +420,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
-		double exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+		double exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
@@ -559,8 +559,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
-		double exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
-
+		double exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 		for(nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 	 	{
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
@@ -582,6 +581,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
 				local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
 				local_power[worker_ctx][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch,nimpl);
+
 			}
 			else
 			{
@@ -750,12 +750,12 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 					selected_impl = nimpl;
 
 					//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
+
 				}
 			}
 			worker_ctx++;
 		}
 	}
-
 	STARPU_ASSERT(forced_best != -1 || best != -1);
 
 	if (forced_best != -1)
@@ -783,7 +783,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
 	starpu_task_set_implementation(task, selected_impl);
-
+	
 	/* we should now have the best worker in variable "best" */
 	return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio, sched_ctx_id);
 }
@@ -958,7 +958,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 	/* Update the predictions */
 	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	/* Sometimes workers didn't take the tasks as early as we expected */
-	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_start = isnan(fifo->exp_start) ? starpu_timing_now() : STARPU_MAX(fifo->exp_start, starpu_timing_now());
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
 	/* If there is no prediction available, we consider the task has a null length */
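Each exp_start update above is now guarded against NaN: if fifo->exp_start has become NaN (for instance after folding in an unavailable prediction), a comparison-based max keeps returning the NaN, since every comparison with NaN is false, and it then poisons every later exp_end estimate. A small sketch of the guarded update, where now_us plays the role of starpu_timing_now():

```c
#include <math.h>

/* NaN-safe version of exp_start = max(exp_start, now): test for NaN first,
 * because a comparison-based max would propagate it. */
static double nan_safe_exp_start(double exp_start, double now_us)
{
	if (isnan(exp_start))
		return now_us;
	return exp_start > now_us ? exp_start : now_us;
}
```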