|
@@ -45,12 +45,13 @@ static int simgrid_started;
|
|
|
starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
|
|
|
|
|
|
starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
|
|
|
-static struct task *first_task[STARPU_NMAXWORKERS];
|
|
|
-static struct task *last_task[STARPU_NMAXWORKERS];
|
|
|
-static msg_sem_t worker_sem[STARPU_NMAXWORKERS];
|
|
|
-static msg_process_t worker_runner[STARPU_NMAXWORKERS];
|
|
|
-static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
|
|
|
static int runners_running;
|
|
|
+static struct worker_runner {
|
|
|
+ struct task *first_task, *last_task;
|
|
|
+ msg_sem_t sem;
|
|
|
+ msg_process_t runner;
|
|
|
+} worker_runner[STARPU_NMAXWORKERS];
|
|
|
+static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
|
|
|
|
|
|
|
|
|
* main(), try to cope by calling starpu_main */
|
|
@@ -326,9 +327,9 @@ void _starpu_simgrid_init(void)
|
|
|
char s[32];
|
|
|
snprintf(s, sizeof(s), "worker %u runner", i);
|
|
|
void **tsd = calloc(MAX_TSD+1, sizeof(void*));
|
|
|
- worker_sem[i] = MSG_sem_init(0);
|
|
|
+ worker_runner[i].sem = MSG_sem_init(0);
|
|
|
tsd[0] = (void*)(uintptr_t) i;
|
|
|
- worker_runner[i] = MSG_process_create_with_arguments(s, task_execute, tsd, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)), 0, NULL);
|
|
|
+ worker_runner[i].runner = MSG_process_create_with_arguments(s, task_execute, tsd, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)), 0, NULL);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -343,16 +344,17 @@ void _starpu_simgrid_deinit(void)
|
|
|
}
|
|
|
for (i = 0; i < starpu_worker_get_count(); i++)
|
|
|
{
|
|
|
- MSG_sem_release(worker_sem[i]);
|
|
|
+ struct worker_runner *w = &worker_runner[i];
|
|
|
+ MSG_sem_release(w->sem);
|
|
|
#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
|
|
|
- MSG_process_join(worker_runner[i], 1000000);
|
|
|
+ MSG_process_join(w->runner, 1000000);
|
|
|
#else
|
|
|
MSG_process_sleep(1);
|
|
|
#endif
|
|
|
- MSG_sem_destroy(worker_sem[i]);
|
|
|
+ MSG_sem_destroy(w->sem);
|
|
|
+ STARPU_ASSERT(w->first_task == NULL);
|
|
|
+ STARPU_ASSERT(w->last_task == NULL);
|
|
|
starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
|
|
|
- STARPU_ASSERT(first_task[i] == NULL);
|
|
|
- STARPU_ASSERT(last_task[i] == NULL);
|
|
|
}
|
|
|
#ifdef HAVE_MSG_PROCESS_ATTACH
|
|
|
if (simgrid_started == 2)
|
|
@@ -386,18 +388,20 @@ static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_AT
|
|
|
MSG_process_sleep(0.000001);
|
|
|
|
|
|
unsigned workerid = (uintptr_t) starpu_pthread_getspecific(0);
|
|
|
+ struct worker_runner *w = &worker_runner[workerid];
|
|
|
+
|
|
|
_STARPU_DEBUG("worker %u started\n", workerid);
|
|
|
while (1) {
|
|
|
struct task *task;
|
|
|
|
|
|
- MSG_sem_acquire(worker_sem[workerid]);
|
|
|
+ MSG_sem_acquire(w->sem);
|
|
|
if (!runners_running)
|
|
|
break;
|
|
|
|
|
|
- task = first_task[workerid];
|
|
|
- first_task[workerid] = task->next;
|
|
|
- if (last_task[workerid] == task)
|
|
|
- last_task[workerid] = NULL;
|
|
|
+ task = w->first_task;
|
|
|
+ w->first_task = task->next;
|
|
|
+ if (w->last_task == task)
|
|
|
+ w->last_task = NULL;
|
|
|
|
|
|
_STARPU_DEBUG("task %p started\n", task);
|
|
|
MSG_task_execute(task->task);
|
|
@@ -417,7 +421,7 @@ static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_AT
|
|
|
|
|
|
void _starpu_simgrid_wait_tasks(int workerid)
|
|
|
{
|
|
|
- struct task *task = last_task[workerid];
|
|
|
+ struct task *task = worker_runner[workerid].last_task;
|
|
|
if (!task)
|
|
|
return;
|
|
|
|
|
@@ -476,6 +480,7 @@ void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct star
|
|
|
{
|
|
|
|
|
|
struct task *task;
|
|
|
+ struct worker_runner *w = &worker_runner[workerid];
|
|
|
_STARPU_MALLOC(task, sizeof(*task));
|
|
|
task->task = simgrid_task;
|
|
|
task->finished = finished;
|
|
@@ -484,19 +489,19 @@ void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct star
|
|
|
|
|
|
if (_starpu_simgrid_queue_malloc_cost())
|
|
|
MSG_process_sleep(0.000010);
|
|
|
- if (last_task[workerid])
|
|
|
+ if (w->last_task)
|
|
|
{
|
|
|
|
|
|
- last_task[workerid]->next = task;
|
|
|
- last_task[workerid] = task;
|
|
|
+ w->last_task->next = task;
|
|
|
+ w->last_task = task;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- STARPU_ASSERT(!first_task[workerid]);
|
|
|
- first_task[workerid] = task;
|
|
|
- last_task[workerid] = task;
|
|
|
+ STARPU_ASSERT(!w->first_task);
|
|
|
+ w->first_task = task;
|
|
|
+ w->last_task = task;
|
|
|
}
|
|
|
- MSG_sem_release(worker_sem[workerid]);
|
|
|
+ MSG_sem_release(w->sem);
|
|
|
}
|
|
|
}
|
|
|
|