|
@@ -6,9 +6,102 @@ static struct _starpu_barrier_counter_t workers_barrier[STARPU_NMAX_SCHED_CTXS];
|
|
|
static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config);
|
|
|
static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker);
|
|
|
static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkerids_in_ctx);
|
|
|
-int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx, unsigned sched_ctx_id);
|
|
|
|
|
|
-unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
|
|
|
+struct sched_ctx_info {
|
|
|
+ unsigned sched_ctx_id;
|
|
|
+ struct starpu_sched_ctx *sched_ctx;
|
|
|
+ struct starpu_worker_s *worker;
|
|
|
+};
|
|
|
+
|
|
|
+static void update_workers_func(void *buffers[] __attribute__ ((unused)), void *_args)
|
|
|
+{
|
|
|
+ struct sched_ctx_info *sched_ctx_info_args = (struct sched_ctx_info*)_args;
|
|
|
+ struct starpu_worker_s *worker = sched_ctx_info_args->worker;
|
|
|
+ struct starpu_sched_ctx *current_sched_ctx = sched_ctx_info_args->sched_ctx;
|
|
|
+ unsigned sched_ctx_id = sched_ctx_info_args->sched_ctx_id;
|
|
|
+
|
|
|
+ if(current_sched_ctx != NULL)
|
|
|
+ {
|
|
|
+ /* add context to worker */
|
|
|
+ worker->sched_ctx[sched_ctx_info_args->sched_ctx_id] = current_sched_ctx;
|
|
|
+ worker->nctxs++;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* remove context from worker */
|
|
|
+ int i;
|
|
|
+ for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
|
|
|
+ if(worker->sched_ctx[i] != NULL && worker->sched_ctx[i]->sched_ctx_id == sched_ctx_id)
|
|
|
+ {
|
|
|
+ worker->sched_ctx[i] = NULL;
|
|
|
+ worker->nctxs--;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static void _starpu_update_workers(int *workerids_in_ctx, int nworkerids_in_ctx,
|
|
|
+ int sched_ctx_id, struct starpu_sched_ctx *sched_ctx)
|
|
|
+{
|
|
|
+ struct starpu_task *tasks[nworkerids_in_ctx];
|
|
|
+
|
|
|
+ struct starpu_codelet_t sched_ctx_info_cl = {
|
|
|
+ .where = STARPU_CPU|STARPU_CUDA|STARPU_OPENCL,
|
|
|
+ .cuda_func = update_workers_func,
|
|
|
+ .cpu_func = update_workers_func,
|
|
|
+ .opencl_func = update_workers_func,
|
|
|
+ .nbuffers = 0
|
|
|
+ };
|
|
|
+
|
|
|
+ int i, ret;
|
|
|
+ for(i = 0; i < nworkerids_in_ctx; i++)
|
|
|
+ {
|
|
|
+ struct starpu_worker_s *worker = _starpu_get_worker_struct(workerids_in_ctx[i]);
|
|
|
+
|
|
|
+ struct sched_ctx_info sched_info_args= {
|
|
|
+ .sched_ctx_id = sched_ctx_id == -1 ?
|
|
|
+ _starpu_get_first_free_sched_ctx_in_worker_list(worker) : (unsigned)sched_ctx_id,
|
|
|
+ .sched_ctx = sched_ctx,
|
|
|
+ .worker = worker
|
|
|
+ };
|
|
|
+
|
|
|
+ tasks[i] = starpu_task_create();
|
|
|
+ tasks[i]->cl = &sched_ctx_info_cl;
|
|
|
+ tasks[i]->cl_arg = &sched_info_args;
|
|
|
+ tasks[i]->execute_on_a_specific_worker = 1;
|
|
|
+ tasks[i]->workerid = workerids_in_ctx[i];
|
|
|
+ tasks[i]->detach = 0;
|
|
|
+ tasks[i]->destroy = 0;
|
|
|
+
|
|
|
+#ifdef STARPU_USE_FXT
|
|
|
+ starpu_job_t job = _starpu_get_job_associated_to_task(tasks[i]);
|
|
|
+ job->model_name = "sched_ctx_info";
|
|
|
+#endif
|
|
|
+
|
|
|
+ _starpu_exclude_task_from_dag(tasks[i]);
|
|
|
+
|
|
|
+ ret = starpu_task_submit(tasks[i]);
|
|
|
+ if (ret == -ENODEV)
|
|
|
+ {
|
|
|
+ /* if the worker is not able to execute this tasks, we
|
|
|
+ * don't insist as this means the worker is not
|
|
|
+ * designated by the "where" bitmap */
|
|
|
+ starpu_task_destroy(tasks[i]);
|
|
|
+ tasks[i] = NULL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (i = 0; i < nworkerids_in_ctx; i++)
|
|
|
+ {
|
|
|
+ if (tasks[i])
|
|
|
+ {
|
|
|
+ ret = starpu_task_wait(tasks[i]);
|
|
|
+ STARPU_ASSERT(!ret);
|
|
|
+ starpu_task_destroy(tasks[i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+struct starpu_sched_ctx* _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
|
|
|
int nworkerids_in_ctx, unsigned is_initial_sched,
|
|
|
const char *sched_name)
|
|
|
{
|
|
@@ -36,14 +129,12 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
|
|
|
_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
|
|
|
|
|
|
int j;
|
|
|
- /* if null add ll the workers are to the contex */
|
|
|
+ /* if null add all the workers are to the contex */
|
|
|
if(workerids_in_ctx == NULL)
|
|
|
{
|
|
|
for(j = 0; j < nworkers; j++)
|
|
|
{
|
|
|
sched_ctx->workerid[j] = j;
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
|
|
|
- workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
|
|
|
}
|
|
|
sched_ctx->nworkers_in_ctx = nworkers;
|
|
|
}
|
|
@@ -53,11 +144,9 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
|
|
|
for(i = 0; i < nworkerids_in_ctx; i++)
|
|
|
{
|
|
|
/* the user should not ask for a resource that does not exist */
|
|
|
- STARPU_ASSERT( workerids_in_ctx[i] >= 0 && workerids_in_ctx[i] <= nworkers);
|
|
|
-
|
|
|
+ STARPU_ASSERT( workerids_in_ctx[i] >= 0 && workerids_in_ctx[i] <= nworkers);
|
|
|
sched_ctx->workerid[i] = workerids_in_ctx[i];
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(sched_ctx->workerid[i]);
|
|
|
- workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -70,25 +159,28 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
|
|
|
|
|
|
config->topology.nsched_ctxs++;
|
|
|
|
|
|
- return sched_ctx->sched_ctx_id;
|
|
|
+ /* if we create the initial big sched ctx we can update workers' status here
|
|
|
+ because they haven't been launched yet */
|
|
|
+ if(is_initial_sched)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ for(i = 0; i < sched_ctx->nworkers_in_ctx; i++)
|
|
|
+ {
|
|
|
+ struct starpu_worker_s *worker = _starpu_get_worker_struct(sched_ctx->workerid[i]);
|
|
|
+ worker->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(worker)] = sched_ctx;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return sched_ctx;
|
|
|
}
|
|
|
|
|
|
|
|
|
unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
|
|
|
int nworkerids_in_ctx, const char *sched_name)
|
|
|
{
|
|
|
- struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
|
|
|
-
|
|
|
- unsigned id = _starpu_get_first_available_sched_ctx_id(config);
|
|
|
-
|
|
|
- unsigned ret;
|
|
|
- /* block the workers until the contex is switched */
|
|
|
- set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx, id);
|
|
|
- ret = _starpu_create_sched_ctx(policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
|
|
|
- /* also wait the workers to wake up before using the context */
|
|
|
- set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx, id);
|
|
|
-
|
|
|
- return ret;
|
|
|
+ struct starpu_sched_ctx *sched_ctx = _starpu_create_sched_ctx(policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
|
|
|
+ _starpu_update_workers(sched_ctx->workerid, sched_ctx->nworkers_in_ctx, -1, sched_ctx);
|
|
|
+ return sched_ctx->sched_ctx_id;
|
|
|
}
|
|
|
|
|
|
/* check if the worker already belongs to the context */
|
|
@@ -101,37 +193,9 @@ static unsigned _starpu_worker_belongs_to_ctx(int workerid, struct starpu_sched_
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-/* remove the context from the worker's list of contexts */
|
|
|
-static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
|
|
|
-{
|
|
|
- unsigned i;
|
|
|
- for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
|
|
|
- {
|
|
|
- if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
|
|
|
- && workerarg->status != STATUS_JOINED)
|
|
|
- {
|
|
|
- workerarg->sched_ctx[i] = NULL;
|
|
|
- workerarg->nctxs--;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return;
|
|
|
-}
|
|
|
-
|
|
|
/* free all structures for the context */
|
|
|
-static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
|
|
|
+static void free_sched_ctx_mem(struct starpu_sched_ctx *sched_ctx)
|
|
|
{
|
|
|
- int nworkers = sched_ctx->nworkers_in_ctx;
|
|
|
- int workerid;
|
|
|
- int i;
|
|
|
- for(i = 0; i < nworkers; i++)
|
|
|
- {
|
|
|
- workerid = sched_ctx->workerid[i];
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
|
|
|
- _starpu_remove_sched_ctx_from_worker(workerarg, sched_ctx);
|
|
|
- }
|
|
|
-
|
|
|
free(sched_ctx->sched_policy);
|
|
|
free(sched_ctx->sched_mutex);
|
|
|
free(sched_ctx->sched_cond);
|
|
@@ -139,7 +203,13 @@ static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
|
|
|
struct starpu_machine_config_s *config = _starpu_get_machine_config();
|
|
|
config->topology.nsched_ctxs--;
|
|
|
sched_ctx->sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
|
|
|
- PTHREAD_MUTEX_DESTROY(&sched_ctx->changing_ctx_mutex);
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
|
|
|
+{
|
|
|
+ _starpu_update_workers(sched_ctx->workerid, sched_ctx->nworkers_in_ctx,
|
|
|
+ sched_ctx->sched_ctx_id, NULL);
|
|
|
}
|
|
|
|
|
|
static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
|
|
@@ -147,51 +217,46 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
|
|
|
{
|
|
|
struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
|
|
|
int ntotal_workers = config->topology.nworkers;
|
|
|
-
|
|
|
int nworkerids_already_in_ctx = sched_ctx->nworkers_in_ctx;
|
|
|
- int j;
|
|
|
|
|
|
int n_added_workers = 0;
|
|
|
-
|
|
|
+ int added_workers[ntotal_workers];
|
|
|
+
|
|
|
/*if null add the rest of the workers which don't already belong to this ctx*/
|
|
|
if(new_workers == NULL)
|
|
|
- {
|
|
|
+ {
|
|
|
+ int j;
|
|
|
for(j = 0; j < ntotal_workers; j++)
|
|
|
- {
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
|
|
|
if(!_starpu_worker_belongs_to_ctx(j, sched_ctx))
|
|
|
- {
|
|
|
+ {
|
|
|
sched_ctx->workerid[++nworkerids_already_in_ctx] = j;
|
|
|
- workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
|
|
|
- }
|
|
|
- }
|
|
|
+ added_workers[n_added_workers] = j;
|
|
|
+ }
|
|
|
+
|
|
|
n_added_workers = ntotal_workers;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
int i;
|
|
|
- //printf("%d redim worker:", nnew_workers);
|
|
|
for(i = 0; i < nnew_workers; i++)
|
|
|
{
|
|
|
/* take care the user does not ask for a resource that does not exist */
|
|
|
STARPU_ASSERT( new_workers[i] >= 0 && new_workers[i] <= ntotal_workers);
|
|
|
- //printf(" %d", new_workers[i]);
|
|
|
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(new_workers[i]);
|
|
|
if(!_starpu_worker_belongs_to_ctx(new_workers[i], sched_ctx))
|
|
|
{
|
|
|
/* add worker to context */
|
|
|
sched_ctx->workerid[ nworkerids_already_in_ctx + n_added_workers] = new_workers[i];
|
|
|
- /* add context to worker */
|
|
|
- workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
|
|
|
+ added_workers[n_added_workers] = new_workers[i];
|
|
|
n_added_workers++;
|
|
|
}
|
|
|
}
|
|
|
- //printf("\n");
|
|
|
}
|
|
|
|
|
|
sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, n_added_workers);
|
|
|
|
|
|
+ _starpu_update_workers(added_workers, n_added_workers, -1, sched_ctx);
|
|
|
+
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -203,8 +268,6 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
|
|
|
struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|
|
|
struct starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx(inheritor_sched_ctx_id);
|
|
|
|
|
|
- /* block the workers until the contex is switched */
|
|
|
- set_changing_ctx_flag(STATUS_CHANGING_CTX, sched_ctx->nworkers_in_ctx, sched_ctx->workerid, sched_ctx_id);
|
|
|
_starpu_manage_delete_sched_ctx(sched_ctx);
|
|
|
|
|
|
/*if both of them have all the ressources is pointless*/
|
|
@@ -214,25 +277,21 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
|
|
|
|
|
|
if(!(sched_ctx->nworkers_in_ctx == ntotal_workers && sched_ctx->nworkers_in_ctx == inheritor_sched_ctx->nworkers_in_ctx))
|
|
|
_starpu_add_workers_to_sched_ctx(sched_ctx->workerid, sched_ctx->nworkers_in_ctx, inheritor_sched_ctx);
|
|
|
- /* also wait the workers to wake up before using the context */
|
|
|
- set_changing_ctx_flag(STATUS_UNKNOWN, sched_ctx->nworkers_in_ctx, sched_ctx->workerid, sched_ctx_id);
|
|
|
+ free_sched_ctx_mem(sched_ctx);
|
|
|
+
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+/* called after the workers are terminated so we don't have anything else to do but free the memory*/
|
|
|
void _starpu_delete_all_sched_ctxs()
|
|
|
{
|
|
|
unsigned i;
|
|
|
-
|
|
|
for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
|
|
|
{
|
|
|
- struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(i);
|
|
|
- if(sched_ctx->sched_ctx_id != STARPU_NMAX_SCHED_CTXS){
|
|
|
- if(!starpu_wait_for_all_tasks_of_sched_ctx(i))
|
|
|
- {
|
|
|
- _starpu_manage_delete_sched_ctx(sched_ctx);
|
|
|
- }
|
|
|
- }
|
|
|
+ struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(i);
|
|
|
+ if(sched_ctx->sched_ctx_id != STARPU_NMAX_SCHED_CTXS)
|
|
|
+ free_sched_ctx_mem(sched_ctx);
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
@@ -241,12 +300,7 @@ void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ct
|
|
|
unsigned sched_ctx_id)
|
|
|
{
|
|
|
struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|
|
|
-
|
|
|
- /* block the workers until the contex is switched */
|
|
|
- set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
|
|
|
_starpu_add_workers_to_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
|
|
|
- /* also wait the workers to wake up before using the context */
|
|
|
- set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
|
|
|
|
|
|
return;
|
|
|
}
|
|
@@ -256,25 +310,26 @@ static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nwo
|
|
|
{
|
|
|
struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
|
|
|
int nworkers = config->topology.nworkers;
|
|
|
-
|
|
|
int nworkerids_already_in_ctx = sched_ctx->nworkers_in_ctx;
|
|
|
|
|
|
STARPU_ASSERT(nworkerids_in_ctx <= nworkerids_already_in_ctx);
|
|
|
|
|
|
int i, workerid;
|
|
|
|
|
|
+ int nremoved_workers = 0;
|
|
|
+ int removed_workers[nworkers];
|
|
|
+
|
|
|
/*if null remove all the workers that belong to this ctx*/
|
|
|
if(workerids_in_ctx == NULL)
|
|
|
{
|
|
|
for(i = 0; i < nworkerids_already_in_ctx; i++)
|
|
|
{
|
|
|
- workerid = sched_ctx->workerid[i];
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
|
|
|
- _starpu_remove_sched_ctx_from_worker(workerarg, sched_ctx);
|
|
|
+ removed_workers[i] = sched_ctx->workerid[i];
|
|
|
sched_ctx->workerid[i] = -1;
|
|
|
}
|
|
|
|
|
|
sched_ctx->nworkers_in_ctx = 0;
|
|
|
+ nremoved_workers = nworkerids_already_in_ctx;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -283,23 +338,24 @@ static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nwo
|
|
|
workerid = workerids_in_ctx[i];
|
|
|
/* take care the user does not ask for a resource that does not exist */
|
|
|
STARPU_ASSERT( workerid >= 0 && workerid <= nworkers);
|
|
|
+ removed_workers[i] = sched_ctx->workerid[i];
|
|
|
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
|
|
|
- _starpu_remove_sched_ctx_from_worker(workerarg, sched_ctx);
|
|
|
int j;
|
|
|
-
|
|
|
/* don't leave the workerid with a correct value even if we don't use it anymore */
|
|
|
for(j = 0; j < nworkerids_already_in_ctx; j++)
|
|
|
if(sched_ctx->workerid[j] == workerid)
|
|
|
sched_ctx->workerid[j] = -1;
|
|
|
}
|
|
|
|
|
|
+ nremoved_workers = nworkerids_in_ctx;
|
|
|
sched_ctx->nworkers_in_ctx -= nworkerids_in_ctx;
|
|
|
/* reorder the worker's list of contexts in order to avoid
|
|
|
the holes in the list after removing some elements */
|
|
|
_starpu_rearange_sched_ctx_workerids(sched_ctx, nworkerids_already_in_ctx);
|
|
|
}
|
|
|
|
|
|
+ _starpu_update_workers(removed_workers, nremoved_workers, sched_ctx->sched_ctx_id, NULL);
|
|
|
+
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -312,13 +368,9 @@ void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_
|
|
|
{
|
|
|
struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|
|
|
|
|
|
- /* block the workers until the contex is switched */
|
|
|
- set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
|
|
|
PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
|
|
|
_starpu_remove_workers_from_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
|
|
|
PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
|
|
|
- /* also wait the workers to wake up before using the context */
|
|
|
- set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
@@ -405,70 +457,6 @@ static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/* manage blocking and waking up threads when constructing/modifying contexts */
|
|
|
-int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx, unsigned sched_ctx_id)
|
|
|
-{
|
|
|
- struct starpu_machine_config_s *config = _starpu_get_machine_config();
|
|
|
-
|
|
|
- int i;
|
|
|
- int nworkers = nworkerids_in_ctx == -1 ? (int)config->topology.nworkers : nworkerids_in_ctx;
|
|
|
-
|
|
|
- struct starpu_worker_s *worker = NULL;
|
|
|
- pthread_mutex_t *changing_ctx_mutex = NULL;
|
|
|
- pthread_cond_t *changing_ctx_cond = NULL;
|
|
|
-
|
|
|
- int workerid = -1;
|
|
|
- if(changing_ctx == STATUS_CHANGING_CTX)
|
|
|
- _starpu_barrier_counter_update(&workers_barrier[sched_ctx_id], nworkers);
|
|
|
-
|
|
|
- for(i = 0; i < nworkers; i++)
|
|
|
- {
|
|
|
- workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
|
|
|
- worker = _starpu_get_worker_struct(workerid);
|
|
|
-
|
|
|
- changing_ctx_mutex = &worker->changing_ctx_mutex;
|
|
|
- changing_ctx_cond = &worker->changing_ctx_cond;
|
|
|
-
|
|
|
- if(changing_ctx == STATUS_CHANGING_CTX)
|
|
|
- {
|
|
|
- worker->workers_barrier[sched_ctx_id] = &workers_barrier[sched_ctx_id];
|
|
|
- }
|
|
|
-
|
|
|
- /*if the status is CHANGING_CTX let the thread know that it must block*/
|
|
|
- PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
|
|
|
- worker->blocking_status = changing_ctx;
|
|
|
- worker->nworkers_of_next_ctx = nworkers;
|
|
|
- PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
-
|
|
|
- /*if we have finished changing the ctx wake up the blocked threads*/
|
|
|
- if(changing_ctx == STATUS_UNKNOWN)
|
|
|
- {
|
|
|
- PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
|
|
|
- PTHREAD_COND_SIGNAL(changing_ctx_cond);
|
|
|
- PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* after letting know all the concerned threads about the change
|
|
|
- wait for them to take into account the info */
|
|
|
- if(changing_ctx == STATUS_CHANGING_CTX)
|
|
|
- _starpu_wait_for_all_threads_to_block(&workers_barrier[sched_ctx_id]);
|
|
|
- else
|
|
|
- _starpu_wait_for_all_threads_to_wake_up(&workers_barrier[sched_ctx_id]);
|
|
|
-
|
|
|
- if(changing_ctx == STATUS_UNKNOWN)
|
|
|
- {
|
|
|
- for(i = 0; i < nworkers; i++)
|
|
|
- {
|
|
|
- workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
|
|
|
- worker = _starpu_get_worker_struct(workerid);
|
|
|
- worker->workers_barrier[sched_ctx_id] = NULL;
|
|
|
- }
|
|
|
- }
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
int starpu_wait_for_all_tasks_of_worker(int workerid)
|
|
|
{
|
|
|
if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
|
|
@@ -518,33 +506,6 @@ void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-void _starpu_decrement_nblocked_ths(struct _starpu_barrier_counter_t **barrier)
|
|
|
-{
|
|
|
- unsigned i;
|
|
|
- for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
|
|
|
- if(barrier[i] != NULL)
|
|
|
- _starpu_barrier_counter_decrement_until_empty_counter(barrier[i]);
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-void _starpu_increment_nblocked_ths(struct _starpu_barrier_counter_t **barrier)
|
|
|
-{
|
|
|
- unsigned i;
|
|
|
- for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
|
|
|
- if(barrier[i] != NULL)
|
|
|
- _starpu_barrier_counter_increment_until_full_counter(barrier[i]);
|
|
|
-}
|
|
|
-
|
|
|
-int _starpu_wait_for_all_threads_to_block(struct _starpu_barrier_counter_t *barrier)
|
|
|
-{
|
|
|
- return _starpu_barrier_counter_wait_for_full_counter(barrier);
|
|
|
-}
|
|
|
-
|
|
|
-int _starpu_wait_for_all_threads_to_wake_up(struct _starpu_barrier_counter_t *barrier)
|
|
|
-{
|
|
|
- return _starpu_barrier_counter_wait_for_empty_counter(barrier);
|
|
|
-}
|
|
|
-
|
|
|
int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
|
|
|
{
|
|
|
struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|