Browse Source

get rid of the remove/add task (replace it with a flag) + fix a delete_ctx bug (data was freed too early)

Andra Hugo 12 years ago
parent
commit
8aa86f5b27

+ 69 - 73
src/core/sched_ctx.c

@@ -35,48 +35,30 @@ static unsigned _starpu_worker_get_sched_ctx_id(struct _starpu_worker *worker, u
 
 static void _get_workers_list(struct starpu_sched_ctx_worker_collection *workers, int **workerids);
 
-static void change_worker_sched_ctx(unsigned sched_ctx_id)
+static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
-	int workerid = starpu_worker_get_id();
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-
-	int worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker, sched_ctx_id);
-	/* if the worker is not in the ctx's list it means the update concerns the addition of ctxs*/
-	if(worker_sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
-	{
-		worker_sched_ctx_id = _starpu_worker_get_first_free_sched_ctx(worker);
-		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-		/* add context to worker */
-		worker->sched_ctx[worker_sched_ctx_id] = sched_ctx;
-		worker->nsched_ctxs++;
-		worker->active_ctx = sched_ctx_id;
-	}
-	else
-	{
-		/* remove context from worker */
-		if(worker->sched_ctx[worker_sched_ctx_id]->sched_policy && worker->sched_ctx[worker_sched_ctx_id]->sched_policy->remove_workers)
-			worker->sched_ctx[worker_sched_ctx_id]->sched_policy->remove_workers(sched_ctx_id, &worker->workerid, 1);
-		worker->sched_ctx[worker_sched_ctx_id] = NULL;
-		worker->nsched_ctxs--;
-		starpu_set_turn_to_other_ctx(worker->workerid, sched_ctx_id);
-	}
+	unsigned worker_sched_ctx_id = _starpu_worker_get_first_free_sched_ctx(worker);
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	/* add context to worker */
+	worker->sched_ctx[worker_sched_ctx_id] = sched_ctx;
+	worker->nsched_ctxs++;
+	worker->active_ctx = sched_ctx_id;
+	return;
 }
 
-static void update_workers_func(void *buffers[] __attribute__ ((unused)), void *_args)
+void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
-	unsigned sched_ctx_id = (uintptr_t)_args;
-	change_worker_sched_ctx(sched_ctx_id);
+	unsigned worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker, sched_ctx_id);
+	/* remove context from worker */
+	if(worker->sched_ctx[worker_sched_ctx_id]->sched_policy && worker->sched_ctx[worker_sched_ctx_id]->sched_policy->remove_workers)
+		worker->sched_ctx[worker_sched_ctx_id]->sched_policy->remove_workers(sched_ctx_id, &worker->workerid, 1);
+	worker->sched_ctx[worker_sched_ctx_id] = NULL;
+	worker->nsched_ctxs--;
+	starpu_set_turn_to_other_ctx(worker->workerid, sched_ctx_id);
+	return;
 }
 
-struct starpu_codelet sched_ctx_info_cl = {
-	.where = STARPU_CPU|STARPU_CUDA|STARPU_OPENCL,
-	.cuda_func = update_workers_func,
-	.cpu_func = update_workers_func,
-	.opencl_func = update_workers_func,
-	.nbuffers = 0
-};
-
-static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_id)
+static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
 {
 	int i;
 	struct _starpu_worker *worker[nworkers];
@@ -86,31 +68,49 @@ static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_i
 	{
 		worker[i] = _starpu_get_worker_struct(workerids[i]);
 
-		/* if the current thread requires resize it's no need
-		   to send itsefl a message in order to change its
-		   sched_ctx info */
-		if(curr_worker && curr_worker == worker[i])
-			change_worker_sched_ctx(sched_ctx_id);
-		else
+
+		unsigned worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker[i], sched_ctx_id);
+		/* if the ctx is not in the worker's list, the update concerns adding the worker to the ctx */
+		if(worker_sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
 		{
-			worker[i]->tasks[sched_ctx_id] = starpu_task_create();
-			worker[i]->tasks[sched_ctx_id]->cl = &sched_ctx_info_cl;
-			worker[i]->tasks[sched_ctx_id]->cl_arg = (void*)(uintptr_t)sched_ctx_id;
-			worker[i]->tasks[sched_ctx_id]->execute_on_a_specific_worker = 1;
-			worker[i]->tasks[sched_ctx_id]->workerid = workerids[i];
-			worker[i]->tasks[sched_ctx_id]->destroy = 1;
-//			worker[i]->tasks[sched_ctx_id]->sched_ctx = sched_ctx_id;
-
-			int worker_sched_ctx_id = _starpu_worker_get_sched_ctx_id(worker[i], sched_ctx_id);
-			/* if the ctx is not in the worker's list it means the update concerns the addition of ctxs*/
-			if(worker_sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
-				worker[i]->tasks[sched_ctx_id]->priority = 1;
-
-			_starpu_exclude_task_from_dag(worker[i]->tasks[sched_ctx_id]);
-
-			_starpu_task_submit_internally(worker[i]->tasks[sched_ctx_id]);
+			/* if the worker being updated is the current thread's own worker (or the
+			   change must be applied now), it is updated without taking its sched_mutex */
+			if((curr_worker && curr_worker == worker[i]) || now)
+				_starpu_worker_gets_into_ctx(sched_ctx_id, worker[i]);
+			else
+			{
+				_STARPU_PTHREAD_MUTEX_LOCK(&worker[i]->sched_mutex);
+				_starpu_worker_gets_into_ctx(sched_ctx_id, worker[i]);
+				_STARPU_PTHREAD_MUTEX_UNLOCK(&worker[i]->sched_mutex);
+			}
+		}
+		else /*remove from context */
+		{
+			if(now)
+			{
+				if(curr_worker && curr_worker == worker[i])
+					_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker[i]);
+				else
+				{
+					_STARPU_PTHREAD_MUTEX_LOCK(&worker[i]->sched_mutex);
+					_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker[i]);
+					_STARPU_PTHREAD_MUTEX_UNLOCK(&worker[i]->sched_mutex);
+				}
+			}
+			else
+			{
+				if(curr_worker && curr_worker == worker[i])
+					worker[i]->removed_from_ctx[sched_ctx_id] = 1;
+				else
+				{
+					_STARPU_PTHREAD_MUTEX_LOCK(&worker[i]->sched_mutex);
+					worker[i]->removed_from_ctx[sched_ctx_id] = 1;
+					_STARPU_PTHREAD_MUTEX_UNLOCK(&worker[i]->sched_mutex);
+				}
+			}
 		}
 	}
+	return;
 }
 
 void starpu_stop_task_submission()
@@ -137,9 +137,7 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 		{
 			int worker = workers->add(workers, (workerids == NULL ? i : workerids[i]));
 			if(worker >= 0)
-			{
 				added_workers[(*n_added_workers)++] = worker;
-			}
 		}
 		else
 		{
@@ -405,7 +403,7 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 	sched_ctx->min_ngpus = min_ngpus;
 	sched_ctx->max_ngpus = max_ngpus;
 
-	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
+	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id, 0);
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
@@ -418,7 +416,7 @@ unsigned starpu_sched_ctx_create(const char *policy_name, int *workerids,
 	struct _starpu_sched_ctx *sched_ctx = NULL;
 	sched_ctx = _starpu_create_sched_ctx(policy_name, workerids, nworkers, 0, sched_name);
 
-	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id);
+	_starpu_update_workers(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, sched_ctx->id, 0);
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
@@ -473,17 +471,16 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	{
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 		starpu_sched_ctx_add_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
-
 	}
 	else
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
-	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id) && !_starpu_wait_for_all_tasks_of_sched_ctx(0))
+	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
 		_STARPU_PTHREAD_MUTEX_LOCK(&changing_ctx_mutex[sched_ctx_id]);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
-		_starpu_update_workers(workerids, nworkers_ctx, sched_ctx_id);
+		_starpu_update_workers(workerids, nworkers_ctx, sched_ctx_id, 1);
 		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 		_starpu_delete_sched_ctx(sched_ctx);
 
@@ -569,17 +566,16 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 	/* if the context has not already been deleted */
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
-
 		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
-
+		
 		if(n_added_workers > 0)
-		{
-			_starpu_update_workers(added_workers, n_added_workers, sched_ctx->id);
-		}
+			_starpu_update_workers(added_workers, n_added_workers, sched_ctx->id, 0);
+
+		_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
 	}
+
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
-	_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
 
 	return;
 }
@@ -599,7 +595,7 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
 
 		if(n_removed_workers > 0)
-			_starpu_update_workers(removed_workers, n_removed_workers, sched_ctx->id);
+			_starpu_update_workers(removed_workers, n_removed_workers, sched_ctx->id, 0);
 
 	}
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
@@ -927,7 +923,7 @@ void starpu_set_turn_to_other_ctx(int workerid, unsigned sched_ctx_id)
 		}
 	}
 
-	if(worker->active_ctx != sched_ctx_id)
+	if(active_sched_ctx != NULL && worker->active_ctx != sched_ctx_id)
 	{
 		_starpu_fetch_tasks_from_empty_ctx_list(active_sched_ctx);
 	}

+ 4 - 0
src/core/sched_ctx.h

@@ -131,6 +131,10 @@ _starpu_pthread_mutex_t *_starpu_get_sched_mutex(struct _starpu_sched_ctx *sched
  take care: no mutex taken, the list of workers might not be updated */
 int starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu_archtype arch);
 
+/* Let the worker know it does not belong to the context and that
+   it should stop popping from it */
+void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker);
+
 #if defined(_MSC_VER) || defined(STARPU_SIMGRID)
 _starpu_pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id);
 #endif

+ 7 - 2
src/core/sched_policy.c

@@ -572,11 +572,16 @@ pick:
 
 			if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
-
 				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
 					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
 			}
 
+			if(!task && worker->removed_from_ctx[sched_ctx->id])
+			{
+				_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
+				worker->removed_from_ctx[sched_ctx->id] = 0;
+			}
+
 			if((!task && sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
 				break;
 
@@ -596,7 +601,7 @@ pick:
 	for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
 	{
 		sched_ctx = worker->sched_ctx[j];
-		if(sched_ctx != NULL && sched_ctx->id != 0)
+		if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 		{
 			perf_counters = sched_ctx->perf_counters;
 			if(perf_counters != NULL && perf_counters->notify_idle_cycle && perf_counters->notify_idle_end)

+ 4 - 0
src/core/workers.c

@@ -405,6 +405,10 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *config)
 		workerarg->run_by_starpu = 1;
 		workerarg->worker_is_running = 0;
 		workerarg->worker_is_initialized = 0;
+		
+		int ctx;
+		for(ctx = 0; ctx < STARPU_NMAX_SCHED_CTXS; ctx++)
+			workerarg->removed_from_ctx[ctx] = 0;
 
 		_STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
 		_STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);

+ 2 - 1
src/core/workers.h

@@ -87,7 +87,6 @@ struct _starpu_worker
 	struct _starpu_sched_ctx **sched_ctx;
 	unsigned nsched_ctxs; /* the no of contexts a worker belongs to*/
 	struct _starpu_barrier_counter tasks_barrier; /* wait for the tasks submitted */
-	struct starpu_task *tasks[STARPU_NMAX_SCHED_CTXS];
        
 	unsigned has_prev_init; /* had already been inited in another ctx */
 
@@ -95,6 +94,8 @@ struct _starpu_worker
 	 used for overlapping ctx in order to determine on which 
 	ctx the worker is allowed to pop */
 	unsigned active_ctx;
+
+	unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS];
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;
 	cpu_set_t current_cpu_set;

+ 2 - 2
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -466,7 +466,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 	/* we should now have the best worker in variable "best" */
 	return push_task_on_best_worker(task, best,
-									model_best, transfer_model_best, prio, sched_ctx_id);
+					model_best, transfer_model_best, prio, sched_ctx_id);
 }
 
 static void compute_all_performance_predictions(struct starpu_task *task,
@@ -790,7 +790,7 @@ static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nwo
 		workerid = workerids[i];
 		/* if the worker has alreadry belonged to this context
 		   the queue and the synchronization variables have been already initialized */
-		if(dt->queue_array[workerid] ==NULL)
+		if(dt->queue_array[workerid] == NULL)
 			dt->queue_array[workerid] = _starpu_create_fifo();
 	}
 }