|
@@ -9,11 +9,15 @@ static pthread_cond_t wakeup_ths_cond = PTHREAD_COND_INITIALIZER;
|
|
static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
static int nblocked_ths = 0;
|
|
static int nblocked_ths = 0;
|
|
|
|
|
|
-void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
|
|
|
|
- *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_initial_sched,
|
|
|
|
- const char *sched_name)
|
|
|
|
|
|
+int _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
|
|
|
|
+ int nworkerids_in_ctx, unsigned is_initial_sched,
|
|
|
|
+ const char *sched_name)
|
|
{
|
|
{
|
|
struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
|
|
struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
|
|
|
|
+ STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS - 1);
|
|
|
|
+
|
|
|
|
+ struct starpu_sched_ctx *sched_ctx = &config->sched_ctxs[config->topology.nsched_ctxs];
|
|
|
|
+
|
|
int nworkers = config->topology.nworkers;
|
|
int nworkers = config->topology.nworkers;
|
|
|
|
|
|
STARPU_ASSERT(nworkerids_in_ctx <= nworkers);
|
|
STARPU_ASSERT(nworkerids_in_ctx <= nworkers);
|
|
@@ -61,7 +65,10 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
|
|
|
|
|
|
_starpu_init_sched_policy(config, sched_ctx, policy_name);
|
|
_starpu_init_sched_policy(config, sched_ctx, policy_name);
|
|
|
|
|
|
- return;
|
|
|
|
|
|
+ sched_ctx->sched_ctx_id = config->topology.nsched_ctxs;
|
|
|
|
+ config->topology.nsched_ctxs++;
|
|
|
|
+
|
|
|
|
+ return sched_ctx->sched_ctx_id;
|
|
}
|
|
}
|
|
|
|
|
|
void _starpu_decrement_nblocked_ths(void)
|
|
void _starpu_decrement_nblocked_ths(void)
|
|
@@ -107,8 +114,7 @@ static int _starpu_wait_for_all_threads_to_wake_up(void)
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
-static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int
|
|
|
|
- *workerids_in_ctx)
|
|
|
|
|
|
+static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx)
|
|
{
|
|
{
|
|
struct starpu_machine_config_s *config = _starpu_get_machine_config();
|
|
struct starpu_machine_config_s *config = _starpu_get_machine_config();
|
|
|
|
|
|
@@ -154,16 +160,17 @@ static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkeri
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
-void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int
|
|
|
|
- *workerids_in_ctx, int nworkerids_in_ctx, const char *sched_name)
|
|
|
|
|
|
+int starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
|
|
|
|
+ int nworkerids_in_ctx, const char *sched_name)
|
|
{
|
|
{
|
|
|
|
+ int ret;
|
|
/* block the workers until the contex is switched */
|
|
/* block the workers until the contex is switched */
|
|
set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
|
|
set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
|
|
- _starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
|
|
|
|
|
|
+ ret = _starpu_create_sched_ctx(policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
|
|
/* also wait the workers to wake up before using the context */
|
|
/* also wait the workers to wake up before using the context */
|
|
set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
|
|
set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
|
|
-
|
|
|
|
- return;
|
|
|
|
|
|
+
|
|
|
|
+ return ret;
|
|
}
|
|
}
|
|
|
|
|
|
static unsigned _starpu_worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
|
|
static unsigned _starpu_worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
|
|
@@ -178,7 +185,7 @@ static unsigned _starpu_worker_belongs_to_ctx(struct starpu_worker_s *workerarg,
|
|
|
|
|
|
static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
|
|
static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
|
|
{
|
|
{
|
|
- int i;
|
|
|
|
|
|
+ unsigned i;
|
|
unsigned to_remove = 0;
|
|
unsigned to_remove = 0;
|
|
for(i = 0; i < workerarg->nctxs; i++)
|
|
for(i = 0; i < workerarg->nctxs; i++)
|
|
{
|
|
{
|
|
@@ -196,10 +203,11 @@ static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workera
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
-void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
|
|
|
|
|
|
+void starpu_delete_sched_ctx(int sched_ctx_id)
|
|
{
|
|
{
|
|
- if(!starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx))
|
|
|
|
|
|
+ if(!starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
|
|
{
|
|
{
|
|
|
|
+ struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|
|
|
|
|
|
int nworkers = sched_ctx->nworkers_in_ctx;
|
|
int nworkers = sched_ctx->nworkers_in_ctx;
|
|
int workerid;
|
|
int workerid;
|
|
@@ -296,7 +304,6 @@ void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
|
|
|
|
|
|
void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
|
|
void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
|
|
{
|
|
{
|
|
- // printf("task submitted to %d\n", workerid);
|
|
|
|
struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
|
|
struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
|
|
|
|
|
|
PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
|
|
PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
|
|
@@ -356,8 +363,10 @@ static void _starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkeri
|
|
}
|
|
}
|
|
|
|
|
|
void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,
|
|
void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,
|
|
- struct starpu_sched_ctx *sched_ctx)
|
|
|
|
|
|
+ int sched_ctx_id)
|
|
{
|
|
{
|
|
|
|
+ struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|
|
|
|
+
|
|
/* block the workers until the contex is switched */
|
|
/* block the workers until the contex is switched */
|
|
set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
|
|
set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
|
|
_starpu_add_workers_to_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
|
|
_starpu_add_workers_to_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
|
|
@@ -451,12 +460,14 @@ static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nwo
|
|
}
|
|
}
|
|
|
|
|
|
void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,
|
|
void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,
|
|
- struct starpu_sched_ctx *sched_ctx)
|
|
|
|
|
|
+ int sched_ctx_id)
|
|
{
|
|
{
|
|
/* wait for the workers concerned by the change of contex
|
|
/* wait for the workers concerned by the change of contex
|
|
* to finish their work in the previous context */
|
|
* to finish their work in the previous context */
|
|
if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
|
|
if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
|
|
{
|
|
{
|
|
|
|
+ struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|
|
|
|
+
|
|
/* block the workers until the contex is switched */
|
|
/* block the workers until the contex is switched */
|
|
set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
|
|
set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
|
|
_starpu_remove_workers_from_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
|
|
_starpu_remove_workers_from_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
|
|
@@ -467,20 +478,22 @@ void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
-int starpu_wait_for_all_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
|
|
|
|
|
|
+int starpu_wait_for_all_tasks_of_sched_ctx(int sched_ctx_id)
|
|
{
|
|
{
|
|
- if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
|
|
|
|
- return -EDEADLK;
|
|
|
|
-
|
|
|
|
- PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- while (sched_ctx->nsubmitted > 0)
|
|
|
|
- PTHREAD_COND_WAIT(&sched_ctx->submitted_cond, &sched_ctx->submitted_mutex);
|
|
|
|
-
|
|
|
|
- PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
|
|
|
|
-
|
|
|
|
- return 0;
|
|
|
|
|
|
+ struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
|
|
|
|
+
|
|
|
|
+ if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
|
|
|
|
+ return -EDEADLK;
|
|
|
|
+
|
|
|
|
+ PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ while (sched_ctx->nsubmitted > 0)
|
|
|
|
+ PTHREAD_COND_WAIT(&sched_ctx->submitted_cond, &sched_ctx->submitted_mutex);
|
|
|
|
+
|
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
|
|
|
|
+
|
|
|
|
+ return 0;
|
|
}
|
|
}
|
|
|
|
|
|
void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
|
|
void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
|