123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2010-2011 Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2013 Centre National de la Recherche Scientifique
- * Copyright (C) 2011 Télécom-SudParis
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- /* Deque queues, ready for use by schedulers */
- #include <starpu.h>
- #include <starpu_scheduler.h>
- #include <sched_policies/deque_queues.h>
- #include <core/workers.h>
- struct _starpu_deque_jobq *_starpu_create_deque(void)
- {
- struct _starpu_deque_jobq *deque;
- deque = (struct _starpu_deque_jobq *) malloc(sizeof(struct _starpu_deque_jobq));
- /* note that not all mechanisms (eg. the semaphore) have to be used */
- deque->jobq = _starpu_job_list_new();
- deque->njobs = 0;
- deque->nprocessed = 0;
- deque->exp_start = starpu_timing_now();
- deque->exp_len = 0.0;
- deque->exp_end = deque->exp_start;
- return deque;
- }
- void _starpu_destroy_deque(struct _starpu_deque_jobq *deque)
- {
- _starpu_job_list_delete(deque->jobq);
- free(deque);
- }
- unsigned _starpu_get_deque_njobs(struct _starpu_deque_jobq *deque_queue)
- {
- return deque_queue->njobs;
- }
- int _starpu_get_deque_nprocessed(struct _starpu_deque_jobq *deque_queue)
- {
- return deque_queue->nprocessed;
- }
- struct starpu_task *_starpu_deque_pop_task(struct _starpu_deque_jobq *deque_queue, int workerid)
- {
- struct _starpu_job *j = NULL;
- if ((deque_queue->njobs == 0) && _starpu_machine_is_running())
- {
- return NULL;
- }
- /* TODO find a task that suits workerid */
- for (j = _starpu_job_list_begin(deque_queue->jobq);
- j != _starpu_job_list_end(deque_queue->jobq);
- j = _starpu_job_list_next(j))
- {
- unsigned nimpl;
- STARPU_ASSERT(j);
- for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
- if (starpu_worker_can_execute_task(workerid, j->task, nimpl))
- {
- j->nimpl = nimpl;
- j = _starpu_job_list_pop_front(deque_queue->jobq);
- _STARPU_TRACE_JOB_POP(j, 0);
- return j->task;
- }
- }
- return NULL;
- }
- struct _starpu_job_list *_starpu_deque_pop_every_task(struct _starpu_deque_jobq *deque_queue, starpu_pthread_mutex_t *sched_mutex, int workerid)
- {
- struct _starpu_job_list *new_list, *old_list;
- /* block until some task is available in that queue */
- _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
- if (deque_queue->njobs == 0)
- {
- new_list = NULL;
- }
- else
- {
- /* there is a task */
- old_list = deque_queue->jobq;
- new_list = _starpu_job_list_new();
- unsigned new_list_size = 0;
- struct _starpu_job *i;
- struct _starpu_job *next_job;
- /* note that this starts at the _head_ of the list, so we put
- * elements at the back of the new list */
- for(i = _starpu_job_list_begin(old_list);
- i != _starpu_job_list_end(old_list);
- i = next_job)
- {
- unsigned nimpl;
- next_job = _starpu_job_list_next(i);
- for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
- if (starpu_worker_can_execute_task(workerid, i->task, nimpl))
- {
- /* this elements can be moved into the new list */
- new_list_size++;
- _starpu_job_list_erase(old_list, i);
- _starpu_job_list_push_back(new_list, i);
- i->nimpl = nimpl;
- }
- }
- if (new_list_size == 0)
- {
- /* the new list is empty ... */
- _starpu_job_list_delete(new_list);
- new_list = NULL;
- }
- else
- {
- deque_queue->njobs -= new_list_size;
- }
- }
- _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
- return new_list;
- }
|