|
@@ -22,20 +22,23 @@
|
|
|
#include <core/sched_ctx.h>
|
|
|
#include <sched_policies/fifo_queues.h>
|
|
|
|
|
|
-static int _random_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
|
|
|
+static int _random_push_task(struct starpu_task *task, unsigned prio)
|
|
|
{
|
|
|
/* find the queue */
|
|
|
- unsigned worker, worker_ctx;
|
|
|
|
|
|
unsigned selected = 0;
|
|
|
|
|
|
double alpha_sum = 0.0;
|
|
|
|
|
|
- unsigned nworkers = sched_ctx->nworkers;
|
|
|
+ unsigned sched_ctx_id = task->sched_ctx;
|
|
|
+ struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
|
|
|
+ int worker;
|
|
|
+ if(workers->init_cursor)
|
|
|
+ workers->init_cursor(workers);
|
|
|
|
|
|
- for (worker_ctx = 0; worker_ctx < nworkers; worker_ctx++)
|
|
|
+ while(workers->has_next(workers))
|
|
|
{
|
|
|
- worker = sched_ctx->workerids[worker_ctx];
|
|
|
+ worker = workers->get_next(workers);
|
|
|
|
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
|
alpha_sum += starpu_worker_get_relative_speedup(perf_arch);
|
|
@@ -45,9 +48,9 @@ static int _random_push_task(struct starpu_task *task, unsigned prio, struct sta
|
|
|
// _STARPU_DEBUG("my rand is %e\n", random);
|
|
|
|
|
|
double alpha = 0.0;
|
|
|
- for (worker_ctx = 0; worker_ctx < nworkers; worker_ctx++)
|
|
|
- {
|
|
|
- worker = sched_ctx->workerids[worker_ctx];
|
|
|
+ while(workers->has_next(workers))
|
|
|
+ {
|
|
|
+ worker = workers->get_next(workers);
|
|
|
|
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
|
double worker_alpha = starpu_worker_get_relative_speedup(perf_arch);
|
|
@@ -62,6 +65,9 @@ static int _random_push_task(struct starpu_task *task, unsigned prio, struct sta
|
|
|
alpha += worker_alpha;
|
|
|
}
|
|
|
|
|
|
+ if(workers->init_cursor)
|
|
|
+ workers->deinit_cursor(workers);
|
|
|
+
|
|
|
/* we should now have the best worker in variable "selected" */
|
|
|
_starpu_increment_nsubmitted_tasks_of_worker(selected);
|
|
|
int n = starpu_push_local_task(selected, task, prio);
|
|
@@ -69,52 +75,68 @@ static int _random_push_task(struct starpu_task *task, unsigned prio, struct sta
|
|
|
}
|
|
|
|
|
|
|
|
|
-static int random_push_task(struct starpu_task *task, unsigned sched_ctx_id)
|
|
|
+static int random_push_task(struct starpu_task *task)
|
|
|
{
|
|
|
- struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
|
|
|
-
|
|
|
- return _random_push_task(task, !!task->priority, sched_ctx);
|
|
|
+ unsigned sched_ctx_id = task->sched_ctx;
|
|
|
+ pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
|
|
|
+ unsigned nworkers;
|
|
|
+ int ret_val = -1;
|
|
|
+
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
|
|
|
+ nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
|
|
|
+ if(nworkers == 0)
|
|
|
+ {
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
+ return ret_val;
|
|
|
+ }
|
|
|
+
|
|
|
+ ret_val = _random_push_task(task, !!task->priority);
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
+ return ret_val;
|
|
|
}
|
|
|
|
|
|
-static void initialize_random_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
|
|
|
+static void random_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
|
|
|
{
|
|
|
- struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
|
|
|
+ unsigned i;
|
|
|
+ int workerid;
|
|
|
+ for (i = 0; i < nworkers; i++)
|
|
|
+ {
|
|
|
+ workerid = workerids[i];
|
|
|
+ struct _starpu_worker *workerarg = _starpu_get_worker_struct(workerid);
|
|
|
+ starpu_worker_set_sched_condition(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
+static void random_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
|
|
|
+{
|
|
|
unsigned i;
|
|
|
int workerid;
|
|
|
- for (i = 0; i < nnew_workers; i++)
|
|
|
+ for (i = 0; i < nworkers; i++)
|
|
|
{
|
|
|
- workerid = sched_ctx->workerids[i];
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
|
|
|
- sched_ctx->sched_mutex[workerid] = &workerarg->sched_mutex;
|
|
|
- sched_ctx->sched_cond[workerid] = &workerarg->sched_cond;
|
|
|
+ workerid = workerids[i];
|
|
|
+ struct _starpu_worker *workerarg = _starpu_get_worker_struct(workerid);
|
|
|
+ starpu_worker_set_sched_condition(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
static void initialize_random_policy(unsigned sched_ctx_id)
|
|
|
{
|
|
|
- struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
|
|
|
-
|
|
|
+ starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
|
|
|
starpu_srand48(time(NULL));
|
|
|
+}
|
|
|
|
|
|
- unsigned nworkers = sched_ctx->nworkers;
|
|
|
-
|
|
|
- unsigned workerid_ctx;
|
|
|
- int workerid;
|
|
|
- for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
|
|
|
- {
|
|
|
- workerid = sched_ctx->workerids[workerid_ctx];
|
|
|
- struct starpu_worker_s *workerarg = _starpu_get_worker_struct(workerid);
|
|
|
- sched_ctx->sched_mutex[workerid] = &workerarg->sched_mutex;
|
|
|
- sched_ctx->sched_cond[workerid] = &workerarg->sched_cond;
|
|
|
- }
|
|
|
+static void deinitialize_random_policy(unsigned sched_ctx_id)
|
|
|
+{
|
|
|
+ starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
|
|
|
}
|
|
|
|
|
|
struct starpu_sched_policy _starpu_sched_random_policy =
|
|
|
{
|
|
|
.init_sched = initialize_random_policy,
|
|
|
- .init_sched_for_workers = initialize_random_policy_for_workers,
|
|
|
- .deinit_sched = NULL,
|
|
|
+ .add_workers = random_add_workers,
|
|
|
+ .remove_workers = random_remove_workers,
|
|
|
+ .deinit_sched = deinitialize_random_policy,
|
|
|
.push_task = random_push_task,
|
|
|
.pop_task = NULL,
|
|
|
.pre_exec_hook = NULL,
|