@@ -18,7 +18,7 @@
 #include <starpu_config.h>
 #include <starpu_scheduler.h>
-#include <schedulers/heteroprio.h>
+#include <schedulers/starpu_heteroprio.h>
 #include <common/fxt.h>
 #include <core/task.h>
@@ -42,34 +42,36 @@
  * When a task is pushed with a priority X, it will be stored
  * into the bucket X.
  * All the tasks stored in the fifo should be computable by the arch
- * in valide_archs.
- * For example if valide_archs = (STARPU_CPU|STARPU_CUDA)
+ * in valid_archs.
+ * For example if valid_archs = (STARPU_CPU|STARPU_CUDA)
  * Then task->task->cl->where should be at least (STARPU_CPU|STARPU_CUDA)
  */
-struct _heteroprio_bucket{
+struct _heteroprio_bucket
+{
 	/* The tasks of the current bucket */
 	struct _starpu_fifo_taskq* tasks_queue;
 	/* The correct arch for the current bucket */
-	unsigned valide_archs;
+	unsigned valid_archs;
 	/* The slow factors for any archs */
-	float slow_factors_per_index[FSTARPU_NB_TYPES];
+	float slow_factors_per_index[STARPU_NB_TYPES];
 	/* The base arch for the slow factor (the fastest arch for the current task in the bucket) */
 	unsigned factor_base_arch_index;
 };

 /* Init a bucket */
-static void _heteroprio_bucket_init(struct _heteroprio_bucket* bucket){
+static void _heteroprio_bucket_init(struct _heteroprio_bucket* bucket)
+{
 	memset(bucket, 0, sizeof(*bucket));
 	bucket->tasks_queue = _starpu_create_fifo();
 }

 /* Release a bucket */
-static void _heteroprio_bucket_release(struct _heteroprio_bucket* bucket){
+static void _heteroprio_bucket_release(struct _heteroprio_bucket* bucket)
+{
 	STARPU_ASSERT(_starpu_fifo_empty(bucket->tasks_queue) != 0);
 	_starpu_destroy_fifo(bucket->tasks_queue);
 }

-
 /* A worker is mainly composed of a fifo for the tasks
  * and some direct access to worker properties.
  * The fifo is implemented with an array,
@@ -77,7 +79,8 @@ static void _heteroprio_bucket_release(struct _heteroprio_bucket* bucket){
  * to write a task, access tasks_queue[(tasks_queue_index+tasks_queue_size)%HETEROPRIO_MAX_PREFETCH]
  */
 /* ANDRA_MODIF: can use starpu fifo + starpu sched_mutex */
-struct _heteroprio_worker_wrapper{
+struct _heteroprio_worker_wrapper
+{
 	unsigned arch_type;
 	unsigned arch_index;
 	struct _starpu_fifo_taskq *tasks_queue;
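
A short illustration of the invariant described in the bucket comment above (a sketch, not part of the patch): a bucket's valid_archs mask must be a subset of each stored task's cl->where mask, which push_task_heteroprio_policy later checks with an xor/and trick.

    /* Sketch: valid_archs is a subset of where exactly when the bits that
     * differ between the two masks contain no bit of valid_archs. */
    const unsigned valid_archs = STARPU_CPU | STARPU_CUDA;
    const unsigned where = STARPU_CPU | STARPU_CUDA | STARPU_OPENCL;
    /* valid_archs ^ where == STARPU_OPENCL, and STARPU_OPENCL & valid_archs == 0,
     * so every arch watching this bucket can execute the task. */
    STARPU_ASSERT(((valid_archs ^ where) & valid_archs) == 0);
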
@@ -88,81 +91,66 @@ struct _starpu_heteroprio_data
 	starpu_pthread_mutex_t policy_mutex;
 	struct starpu_bitmap *waiters;
 	/* The buckets to store the tasks */
-	struct _heteroprio_bucket buckets[HETEROPRIO_MAX_PRIO];
+	struct _heteroprio_bucket buckets[STARPU_HETEROPRIO_MAX_PRIO];
 	/* The number of buckets for each arch */
-	unsigned nb_prio_per_arch_index[FSTARPU_NB_TYPES];
+	unsigned nb_prio_per_arch_index[STARPU_NB_TYPES];
 	/* The mapping to the corresponding buckets */
-	unsigned prio_mapping_per_arch_index[FSTARPU_NB_TYPES][HETEROPRIO_MAX_PRIO];
+	unsigned prio_mapping_per_arch_index[STARPU_NB_TYPES][STARPU_HETEROPRIO_MAX_PRIO];
 	/* The number of available tasks for a given arch (not prefetched) */
-	unsigned nb_remaining_tasks_per_arch_index[FSTARPU_NB_TYPES];
+	unsigned nb_remaining_tasks_per_arch_index[STARPU_NB_TYPES];
 	/* The total number of tasks in the buckets (not prefetched) */
 	unsigned total_tasks_in_buckets;
 	/* The total number of prefetched tasks for a given arch */
-	unsigned nb_prefetched_tasks_per_arch_index[FSTARPU_NB_TYPES];
+	unsigned nb_prefetched_tasks_per_arch_index[STARPU_NB_TYPES];
 	/* The information for all the workers */
 	struct _heteroprio_worker_wrapper workers_heteroprio[STARPU_NMAXWORKERS];
 	/* The number of workers for a given arch */
-	unsigned nb_workers_per_arch_index[FSTARPU_NB_TYPES];
+	unsigned nb_workers_per_arch_index[STARPU_NB_TYPES];
 };

-
-
 /** Tell how many prios there are for a given arch */
-void starpu_heteroprio_set_nb_prios(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned max_prio)
+void starpu_heteroprio_set_nb_prios(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned max_prio)
 {
-
 	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

-	STARPU_ASSERT(max_prio < HETEROPRIO_MAX_PRIO);
+	STARPU_ASSERT(max_prio < STARPU_HETEROPRIO_MAX_PRIO);

 	hp->nb_prio_per_arch_index[arch] = max_prio;
-
 }

-
-
 /** Set the mapping for a given arch prio=>bucket */
-inline void starpu_heteroprio_set_mapping(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned source_prio, unsigned dest_bucket_id)
+inline void starpu_heteroprio_set_mapping(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned source_prio, unsigned dest_bucket_id)
 {
-
-	STARPU_ASSERT(dest_bucket_id < HETEROPRIO_MAX_PRIO);
+	STARPU_ASSERT(dest_bucket_id < STARPU_HETEROPRIO_MAX_PRIO);

 	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

 	hp->prio_mapping_per_arch_index[arch][source_prio] = dest_bucket_id;

-	hp->buckets[dest_bucket_id].valide_archs |= FStarPUTypesToArch[arch];
-
+	hp->buckets[dest_bucket_id].valid_archs |= starpu_heteroprio_types_to_arch[arch];
 }

-
-
 /** Tell which arch is the fastest for the tasks of a bucket (optional) */
-inline void starpu_heteroprio_set_faster_arch(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned bucket_id){
-
-	STARPU_ASSERT(bucket_id < HETEROPRIO_MAX_PRIO);
+inline void starpu_heteroprio_set_faster_arch(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned bucket_id)
+{
+	STARPU_ASSERT(bucket_id < STARPU_HETEROPRIO_MAX_PRIO);

 	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

 	hp->buckets[bucket_id].factor_base_arch_index = arch;

 	hp->buckets[bucket_id].slow_factors_per_index[arch] = 0;
-
 }

-
-
 /** Tell how slow an arch is for the tasks of a bucket (optional) */
-
-inline void starpu_heteroprio_set_arch_slow_factor(unsigned sched_ctx_id, enum FStarPUTypes arch, unsigned bucket_id, float slow_factor){
-
-	STARPU_ASSERT(bucket_id < HETEROPRIO_MAX_PRIO);
+inline void starpu_heteroprio_set_arch_slow_factor(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned bucket_id, float slow_factor)
+{
+	STARPU_ASSERT(bucket_id < STARPU_HETEROPRIO_MAX_PRIO);

 	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

 	hp->buckets[bucket_id].slow_factors_per_index[arch] = slow_factor;
-
-}
-
+}

 static void initialize_heteroprio_policy(unsigned sched_ctx_id)
 {
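
The four setters above form the configuration API renamed by this patch. A minimal sketch of a scheduler-init callback wired through starpu_sched_ctx_get_sched_policy_init (the priority layout and the my_ctx_id variable are hypothetical; the callback takes no argument in this patch, so the context id has to be captured beforehand):

    /* Hypothetical layout: CPUs see two priority levels, CUDA one;
     * bucket 1 is shared between both archs. */
    static unsigned my_ctx_id; /* hypothetical: stored when the context is created */

    static void my_init_heteroprio(void)
    {
        starpu_heteroprio_set_nb_prios(my_ctx_id, STARPU_CPU_IDX, 2);
        starpu_heteroprio_set_nb_prios(my_ctx_id, STARPU_CUDA_IDX, 1);

        /* CPU prio 0 -> bucket 0, CPU prio 1 -> bucket 1, CUDA prio 0 -> bucket 1 */
        starpu_heteroprio_set_mapping(my_ctx_id, STARPU_CPU_IDX, 0, 0);
        starpu_heteroprio_set_mapping(my_ctx_id, STARPU_CPU_IDX, 1, 1);
        starpu_heteroprio_set_mapping(my_ctx_id, STARPU_CUDA_IDX, 0, 1);

        /* On the shared bucket, CUDA is fastest and a CPU is assumed ~4x slower. */
        starpu_heteroprio_set_faster_arch(my_ctx_id, STARPU_CUDA_IDX, 1);
        starpu_heteroprio_set_arch_slow_factor(my_ctx_id, STARPU_CPU_IDX, 1, 4.0f);
    }

This layout passes the consistency checks in initialize_heteroprio_policy below: bucket 0 is CPU-only, bucket 1 carries both arch bits, and every other bucket keeps valid_archs == 0.
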
@@ -182,50 +170,50 @@ static void initialize_heteroprio_policy(unsigned sched_ctx_id)
 	STARPU_PTHREAD_MUTEX_INIT(&hp->policy_mutex, NULL);

 	unsigned idx_prio;
-	for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio)
+	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
 		_heteroprio_bucket_init(&hp->buckets[idx_prio]);

-	/* TODO call the callback */
+	/* TODO call the callback */
 	void (*init_sched)(void) = starpu_sched_ctx_get_sched_policy_init(sched_ctx_id);

 	if(init_sched)
 		init_sched();

 	/* Ensure that the information has been correctly filled */
-	unsigned check_all_archs[HETEROPRIO_MAX_PRIO];
-	memset(check_all_archs, 0, sizeof(unsigned)*HETEROPRIO_MAX_PRIO);
+	unsigned check_all_archs[STARPU_HETEROPRIO_MAX_PRIO];
+	memset(check_all_archs, 0, sizeof(unsigned)*STARPU_HETEROPRIO_MAX_PRIO);
 	unsigned arch_index;
-	for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index)
+	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
 	{
-		STARPU_ASSERT(hp->nb_prio_per_arch_index[arch_index] <= HETEROPRIO_MAX_PRIO);
-
-		unsigned check_archs[HETEROPRIO_MAX_PRIO];
-		memset(check_archs, 0, sizeof(unsigned)*HETEROPRIO_MAX_PRIO);
-
+		STARPU_ASSERT(hp->nb_prio_per_arch_index[arch_index] <= STARPU_HETEROPRIO_MAX_PRIO);
+
+		unsigned check_archs[STARPU_HETEROPRIO_MAX_PRIO];
+		memset(check_archs, 0, sizeof(unsigned)*STARPU_HETEROPRIO_MAX_PRIO);
+
 		for(idx_prio = 0; idx_prio < hp->nb_prio_per_arch_index[arch_index]; ++idx_prio)
 		{
 			const unsigned mapped_prio = hp->prio_mapping_per_arch_index[arch_index][idx_prio];
-			STARPU_ASSERT(mapped_prio <= HETEROPRIO_MAX_PRIO);
+			STARPU_ASSERT(mapped_prio <= STARPU_HETEROPRIO_MAX_PRIO);
 			STARPU_ASSERT(hp->buckets[mapped_prio].slow_factors_per_index[arch_index] >= 0.0);
-			STARPU_ASSERT(hp->buckets[mapped_prio].valide_archs & FStarPUTypesToArch[arch_index]);
+			STARPU_ASSERT(hp->buckets[mapped_prio].valid_archs & starpu_heteroprio_types_to_arch[arch_index]);
 			check_archs[mapped_prio] = 1;
 			check_all_archs[mapped_prio] += 1;
 		}
-		for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio)
+		for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
 		{
 			/* Ensure the current arch uses a bucket or someone else can use it */
-			STARPU_ASSERT(check_archs[idx_prio] == 1 || hp->buckets[idx_prio].valide_archs == 0
-				|| (hp->buckets[idx_prio].valide_archs & ~FStarPUTypesToArch[arch_index]) != 0);
+			STARPU_ASSERT(check_archs[idx_prio] == 1 || hp->buckets[idx_prio].valid_archs == 0
+				|| (hp->buckets[idx_prio].valid_archs & ~starpu_heteroprio_types_to_arch[arch_index]) != 0);
 		}
 	}
-	/* Ensure that if a valide_archs = (STARPU_CPU|STARPU_CUDA) then check_all_archs[] = 2 for example */
-
-	for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio)
+	/* Ensure that if a valid_archs = (STARPU_CPU|STARPU_CUDA) then check_all_archs[] = 2 for example */
+
+	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
 	{
 		unsigned nb_arch_on_bucket = 0;
-		for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index)
+		for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
 		{
-			if(hp->buckets[idx_prio].valide_archs & FStarPUTypesToArch[arch_index])
+			if(hp->buckets[idx_prio].valid_archs & starpu_heteroprio_types_to_arch[arch_index])
 			{
 				nb_arch_on_bucket += 1;
 			}
@@ -241,13 +229,15 @@ static void deinitialize_heteroprio_policy(unsigned sched_ctx_id)
 	/* Ensure there are no more tasks */
 	STARPU_ASSERT(hp->total_tasks_in_buckets == 0);
 	unsigned arch_index;
-	for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index){
+	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
+	{
 		STARPU_ASSERT(hp->nb_remaining_tasks_per_arch_index[arch_index] == 0);
 		STARPU_ASSERT(hp->nb_prefetched_tasks_per_arch_index[arch_index] == 0);
 	}

 	unsigned idx_prio;
-	for(idx_prio = 0; idx_prio < HETEROPRIO_MAX_PRIO; ++idx_prio){
+	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
+	{
 		_heteroprio_bucket_release(&hp->buckets[idx_prio]);
 	}
@@ -268,7 +258,7 @@ static void add_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids,
 	{
 		workerid = workerids[i];
 		memset(&hp->workers_heteroprio[workerid], 0, sizeof(hp->workers_heteroprio[workerid]));
-		/* if the worker has alreadry belonged to this context
+		/* if the worker has already belonged to this context
 		   the queue and the synchronization variables have already been initialized */
 		if(hp->workers_heteroprio[workerid].tasks_queue == NULL)
 		{
@@ -278,19 +268,19 @@ static void add_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids,
 #ifdef STARPU_USE_CPU
 		case STARPU_CPU_WORKER:
 			hp->workers_heteroprio[workerid].arch_type = STARPU_CPU;
-			hp->workers_heteroprio[workerid].arch_index = FSTARPU_CPU_IDX;
+			hp->workers_heteroprio[workerid].arch_index = STARPU_CPU_IDX;
 			break;
 #endif
 #ifdef STARPU_USE_CUDA
 		case STARPU_CUDA_WORKER:
 			hp->workers_heteroprio[workerid].arch_type = STARPU_CUDA;
-			hp->workers_heteroprio[workerid].arch_index = FSTARPU_CUDA_IDX;
+			hp->workers_heteroprio[workerid].arch_index = STARPU_CUDA_IDX;
 			break;
 #endif
 #ifdef STARPU_USE_OPENCL
 		case STARPU_OPENCL_WORKER:
 			hp->workers_heteroprio[workerid].arch_type = STARPU_OPENCL;
-			hp->workers_heteroprio[workerid].arch_index = FSTARPU_OPENCL_IDX;
+			hp->workers_heteroprio[workerid].arch_index = STARPU_OPENCL_IDX;
 			break;
 #endif
 		default:
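
The switch above gives each worker both a capability mask (arch_type) and a compact index (arch_index) into the per-arch arrays. The rest of the file relies on the index enum and the mask table staying in sync; a sketch of that assumed invariant (illustrative only, not asserted anywhere in this patch):

    /* Assumed correspondence between the compact indices and the arch masks. */
    STARPU_ASSERT(starpu_heteroprio_types_to_arch[STARPU_CPU_IDX] == STARPU_CPU);
    STARPU_ASSERT(starpu_heteroprio_types_to_arch[STARPU_CUDA_IDX] == STARPU_CUDA);
    STARPU_ASSERT(starpu_heteroprio_types_to_arch[STARPU_OPENCL_IDX] == STARPU_OPENCL);
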
@@ -329,20 +319,20 @@ static int push_task_heteroprio_policy(struct starpu_task *task)
 	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);

 	/* Retrieve the correct bucket */
-	STARPU_ASSERT(task->priority < HETEROPRIO_MAX_PRIO);
+	STARPU_ASSERT(task->priority < STARPU_HETEROPRIO_MAX_PRIO);
 	struct _heteroprio_bucket* bucket = &hp->buckets[task->priority];
 	/* Ensure that any worker that checks that list can compute the task */
-	STARPU_ASSERT(bucket->valide_archs
-		&& ((bucket->valide_archs ^ task->cl->where) & bucket->valide_archs) == 0);
+	STARPU_ASSERT(bucket->valid_archs
+		&& ((bucket->valid_archs ^ task->cl->where) & bucket->valid_archs) == 0);
 	/* save the task */
 	_starpu_fifo_push_back_task(bucket->tasks_queue,task);

 	/* Inc counters */
 	unsigned arch_index;
-	for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index)
+	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
 	{
 		/* We test the archs on the bucket and not on task->cl->where since the latter is more restrictive */
-		if(bucket->valide_archs & FStarPUTypesToArch[arch_index])
+		if(bucket->valid_archs & starpu_heteroprio_types_to_arch[arch_index])
 			hp->nb_remaining_tasks_per_arch_index[arch_index] += 1;
 	}
@@ -415,7 +405,8 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 	}

 #ifdef STARPU_NON_BLOCKING_DRIVERS
-	if (starpu_bitmap_get(hp->waiters, workerid)){
+	if (starpu_bitmap_get(hp->waiters, workerid))
+	{
 		/* Nobody woke us, avoid bothering the mutex */
 		return NULL;
 	}
@@ -424,8 +415,7 @@
 	starpu_pthread_mutex_t *worker_sched_mutex;
 	starpu_pthread_cond_t *worker_sched_cond;
 	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
-
-
+
 	/* Note: Releasing this mutex before taking the victim mutex, to avoid interlock */
 	STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
@@ -435,18 +425,21 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 	unsigned nb_added_tasks = 0;

 	/* Check that some tasks are available for the current worker arch */
-	if( hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 ){
+	if( hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 )
+	{
 		/* Ideally we would like to fill the prefetch array */
-		unsigned nb_tasks_to_prefetch = (HETEROPRIO_MAX_PREFETCH-worker->tasks_queue->ntasks);
+		unsigned nb_tasks_to_prefetch = (STARPU_HETEROPRIO_MAX_PREFETCH-worker->tasks_queue->ntasks);
 		/* But maybe there are fewer tasks than that! */
-		if(nb_tasks_to_prefetch > hp->nb_remaining_tasks_per_arch_index[worker->arch_index]){
+		if(nb_tasks_to_prefetch > hp->nb_remaining_tasks_per_arch_index[worker->arch_index])
+		{
 			nb_tasks_to_prefetch = hp->nb_remaining_tasks_per_arch_index[worker->arch_index];
 		}
 		/* But in case there are fewer tasks than workers we take the minimum */
-		if(hp->nb_remaining_tasks_per_arch_index[worker->arch_index] < starpu_sched_ctx_get_nworkers(sched_ctx_id)){
-			if(worker->tasks_queue->ntasks == 0)
+		if(hp->nb_remaining_tasks_per_arch_index[worker->arch_index] < starpu_sched_ctx_get_nworkers(sched_ctx_id))
+		{
+			if(worker->tasks_queue->ntasks == 0)
 				nb_tasks_to_prefetch = 1;
-			else
+			else
 				nb_tasks_to_prefetch = 0;
 		}
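
The three clamps above amount to: try to fill the prefetch window, never claim more tasks than remain for this arch, and when tasks are scarcer than workers take a single task only if the local queue is empty. The same logic as a standalone sketch (illustrative only; names mirror the patch):

    /* Sketch of the clamping above. */
    static unsigned compute_prefetch_count(unsigned max_prefetch, unsigned in_queue,
                                           unsigned remaining, unsigned nworkers)
    {
        unsigned n = max_prefetch - in_queue;   /* ideally fill the window */
        if (n > remaining)
            n = remaining;                      /* maybe fewer tasks than that */
        if (remaining < nworkers)               /* tasks scarcer than workers: */
            n = (in_queue == 0) ? 1 : 0;        /* take one only when starving */
        return n;
    }
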
@@ -459,11 +452,11 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 			/* Retrieve the bucket using the mapping */
 			struct _heteroprio_bucket* bucket = &hp->buckets[hp->prio_mapping_per_arch_index[worker->arch_index][idx_prio]];
 			/* Ensure we can compute a task from this bucket */
-			STARPU_ASSERT(bucket->valide_archs & worker->arch_type);
+			STARPU_ASSERT(bucket->valid_archs & worker->arch_type);
 			/* Take nb_tasks_to_prefetch tasks if possible */
-			while(!_starpu_fifo_empty(bucket->tasks_queue) && nb_tasks_to_prefetch &&
-				(bucket->factor_base_arch_index == 0 ||
-				 worker->arch_index == bucket->factor_base_arch_index ||
+			while(!_starpu_fifo_empty(bucket->tasks_queue) && nb_tasks_to_prefetch &&
+				(bucket->factor_base_arch_index == 0 ||
+				 worker->arch_index == bucket->factor_base_arch_index ||
 				 (((float)bucket->tasks_queue->ntasks)/((float)hp->nb_workers_per_arch_index[bucket->factor_base_arch_index])) >= bucket->slow_factors_per_index[worker->arch_index]))
 			{
 				struct starpu_task* task = _starpu_fifo_pop_local_task(bucket->tasks_queue);
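
The last clause of the while condition is the slow-factor gate: a worker whose arch is not the bucket's base (fastest) arch may only take tasks while the backlog per base-arch worker is at least its own slow factor. For example, with 2 CUDA workers as base arch and a CPU slow factor of 4.0, a CPU starts draining the bucket only once it holds at least 8 tasks (8/2 >= 4.0). The gate in isolation (a sketch with illustrative names):

    /* Sketch of the eligibility test from the while() above. */
    static int slow_arch_may_take(unsigned ntasks_in_bucket,
                                  unsigned nworkers_base_arch, float my_slow_factor)
    {
        /* Enough backlog per fastest-arch worker to justify slower help? */
        return ((float)ntasks_in_bucket / (float)nworkers_base_arch) >= my_slow_factor;
    }
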
@@ -474,10 +467,12 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 				/* Update general counter */
 				hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] += 1;
 				hp->total_tasks_in_buckets -= 1;
-
-				for(arch_index = 0; arch_index < FSTARPU_NB_TYPES; ++arch_index){
+
+				for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
+				{
 					/* We test the archs on the bucket and not on task->cl->where since the latter is more restrictive */
-					if(bucket->valide_archs & FStarPUTypesToArch[arch_index]){
+					if(bucket->valid_archs & starpu_heteroprio_types_to_arch[arch_index])
+					{
 						hp->nb_remaining_tasks_per_arch_index[arch_index] -= 1;
 					}
 				}
@@ -486,21 +481,23 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 				// TODO starpu_prefetch_task_input_on_node(task, workerid);
 			}
 		}
-		STARPU_ASSERT_MSG(nb_tasks_to_prefetch == 0, "but %d and worker %d \n", nb_tasks_to_prefetch, workerid);
+		STARPU_ASSERT_MSG(nb_tasks_to_prefetch == 0, "nb_tasks_to_prefetch is %d on worker %d \n", nb_tasks_to_prefetch, workerid);
 	}

 	struct starpu_task* task = NULL;

 	/* The worker has some tasks in its queue */
-	if(worker->tasks_queue->ntasks){
+	if(worker->tasks_queue->ntasks)
+	{
 		task = _starpu_fifo_pop_task(worker->tasks_queue, workerid);
 		hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] -= 1;
 	}
 	/* Otherwise look if we can steal some work */
-	else if(hp->nb_prefetched_tasks_per_arch_index[worker->arch_index]){
+	else if(hp->nb_prefetched_tasks_per_arch_index[worker->arch_index])
+	{
 		/* If STARPU_HETEROPRIO_MAX_PREFETCH==1 it should not be possible to steal work */
-		STARPU_ASSERT(HETEROPRIO_MAX_PREFETCH != 1);
-
+		STARPU_ASSERT(STARPU_HETEROPRIO_MAX_PREFETCH != 1);
+
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);

 		struct starpu_sched_ctx_iterator it;
@@ -514,7 +511,7 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 			if(current_worker == victim)
 				break;
 		}
-
+
 		/* circular loop */
 		while(1)
 		{
@@ -523,7 +520,7 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 			victim = workers->get_next_master(workers, &it);
 			if(victim == workerid)
 				continue;
-
+
 			/* If it is the same arch and there is a task to steal */
 			if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
 				&& hp->workers_heteroprio[victim].tasks_queue->ntasks){
@@ -535,12 +532,13 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 				STARPU_PTHREAD_MUTEX_LOCK(victim_sched_mutex);

 				if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
-					&& hp->workers_heteroprio[victim].tasks_queue->ntasks){
+					&& hp->workers_heteroprio[victim].tasks_queue->ntasks)
+				{
 					/* steal the last added task */
 					task = starpu_task_list_pop_back(&hp->workers_heteroprio[victim].tasks_queue->taskq);
 					/* we stole a task, update the global counter */
 					hp->nb_prefetched_tasks_per_arch_index[hp->workers_heteroprio[victim].arch_index] -= 1;
-
+
 					STARPU_PTHREAD_MUTEX_UNLOCK(victim_sched_mutex);
 					break;
 				}
@@ -550,16 +548,19 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 		}
 	}

-	if (!task){
+	if (!task)
+	{
 		/* Tell pushers that we are waiting for tasks */
 		starpu_bitmap_set(hp->waiters, workerid);
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);

-	if(task){
+	if(task)
+	{
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
-		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS){
+		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		{
 			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx);
 			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
 			return NULL;
@@ -567,7 +568,8 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 	}

 	/* if we have a task (task) we may have some in the queue (worker->tasks_queue_size) that were freshly added (nb_added_tasks) */
-	if(task && worker->tasks_queue->ntasks && nb_added_tasks && starpu_get_prefetch_flag()){
+	if(task && worker->tasks_queue->ntasks && nb_added_tasks && starpu_get_prefetch_flag())
+	{
 		const unsigned memory_node = starpu_worker_get_memory_node(workerid);

 		/* prefetch the new tasks that I own, but protect my node from work stealing during the prefetch */
@@ -577,14 +579,14 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 		/* prefetch tasks but stop in case we know someone may steal a task from us */
 		/* while(nb_added_tasks && hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0){ */
 		/* /\* prefetch from closest to end task *\/ */
-		/* starpu_prefetch_task_input_on_node(worker->tasks_queue[(worker->tasks_queue_index+worker->tasks_queue_size-nb_added_tasks)%HETEROPRIO_MAX_PREFETCH], memory_node); */
+		/* starpu_prefetch_task_input_on_node(worker->tasks_queue[(worker->tasks_queue_index+worker->tasks_queue_size-nb_added_tasks)%STARPU_HETEROPRIO_MAX_PREFETCH], memory_node); */
 		/* nb_added_tasks -= 1; */
 		/* } */

 		/* TODO beranger check this out - is this how you planned to prefetch tasks ? */
 		struct starpu_task *task_to_prefetch = NULL;
 		for (task_to_prefetch = starpu_task_list_begin(&worker->tasks_queue->taskq);
-			(task_to_prefetch != starpu_task_list_end(&worker->tasks_queue->taskq) &&
+			(task_to_prefetch != starpu_task_list_end(&worker->tasks_queue->taskq) &&
 			nb_added_tasks && hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0);
 			task_to_prefetch = starpu_task_list_next(task_to_prefetch))
 		{