Browse Source

Indicate whether a worker shares tasks lists with other workers (if yes if we remove it from the context we should do it right away, the other workers are in charge of the remaining tasks)

Andra Hugo 12 years ago
parent
commit
6e52308e70

+ 1 - 0
include/starpu_scheduler.h

@@ -76,6 +76,7 @@ double starpu_task_bundle_expected_length(starpu_task_bundle_t bundle, enum star
 double starpu_task_bundle_expected_data_transfer_time(starpu_task_bundle_t bundle, unsigned memory_node);
 double starpu_task_bundle_expected_power(starpu_task_bundle_t bundle, enum starpu_perfmodel_archtype arch, unsigned nimpl);
 
+void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id);
 #ifdef __cplusplus
 }
 #endif

+ 2 - 1
sc_hypervisor/src/sc_hypervisor.c

@@ -502,7 +502,7 @@ void sc_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sch
 				}
 
 				hypervisor.resize[sender_sched_ctx] = 0;
-
+				if(imposed_resize)  imposed_resize = 0;
 				starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
 			}
 		}
@@ -589,6 +589,7 @@ void sc_hypervisor_remove_workers_from_sched_ctx(int* workers_to_remove, unsigne
 				}
 
 				hypervisor.resize[sched_ctx] = 0;
+				if(imposed_resize)  imposed_resize = 0;
 				starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
 			}
 		}

+ 12 - 0
src/core/jobs.c

@@ -294,6 +294,18 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 	_starpu_decrement_nready_tasks();
 
 	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
+
+	struct _starpu_worker *worker;
+	worker = _starpu_get_local_worker_key();
+	STARPU_ASSERT(worker);
+	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+
+	if(worker->removed_from_ctx[sched_ctx] == 1 && worker->shares_tasks_lists[sched_ctx] == 1)
+	{
+		_starpu_worker_gets_out_of_ctx(sched_ctx, worker);
+		worker->removed_from_ctx[sched_ctx] = 0;
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 }
 
 /* This function is called when a new task is submitted to StarPU

+ 8 - 0
src/core/sched_ctx.c

@@ -128,6 +128,14 @@ void starpu_sched_ctx_stop_task_submission()
 	_starpu_task_submit_internally(&stop_submission_task);
 }
 
+void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+	worker->shares_tasks_lists[sched_ctx_id] = 1;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+}
+
 static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers,
 				       int *added_workers, int *n_added_workers)
 {

+ 4 - 0
src/core/workers.c

@@ -468,7 +468,11 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 
 		int ctx;
 		for(ctx = 0; ctx < STARPU_NMAX_SCHED_CTXS; ctx++)
+		{
 			workerarg->removed_from_ctx[ctx] = 0;
+			workerarg->shares_tasks_lists[ctx] = 0;
+		}
+
 
 		STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
 		STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);

+ 3 - 0
src/core/workers.h

@@ -105,6 +105,9 @@ struct _starpu_worker
 	   parallel sections to be executed on their allocated resources */
 	unsigned parallel_sect;
 
+	/* indicate whether the workers shares tasks lists with other workers*/
+	/* in this case when removing him from a context it disapears instantly */
+	unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS];
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */

+ 14 - 2
src/sched_policies/eager_central_policy.c

@@ -130,17 +130,29 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	VALGRIND_HG_MUTEX_UNLOCK_POST(&data->policy_mutex);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	 task = _starpu_fifo_pop_task(data->fifo, workerid);
+	task = _starpu_fifo_pop_task(data->fifo, workerid);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 	return task;
 }
 
+static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+{
+
+	int workerid;
+	unsigned i;
+	for (i = 0; i < nworkers; i++)
+	{
+		workerid = workerids[i];
+		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
+	}
+}
+
 struct starpu_sched_policy _starpu_sched_eager_policy =
 {
 	.init_sched = initialize_eager_center_policy,
 	.deinit_sched = deinitialize_eager_center_policy,
-	.add_workers = NULL,
+	.add_workers = eager_add_workers,
 	.remove_workers = NULL,
 	.push_task = push_task_eager_policy,
 	.pop_task = pop_task_eager_policy,