Selaa lähdekoodia

Don't allow tasks that can't be parallel to be submitted to a scheduling context (ctx)

Andra Hugo 10 vuotta sitten
vanhempi
commit
2bf349d9aa

+ 3 - 1
include/starpu_task.h

@@ -180,6 +180,7 @@ struct starpu_task
 
 	unsigned sched_ctx;
 	int hypervisor_tag;
+	unsigned possibly_parallel;
 
 	starpu_task_bundle_t bundle;
 
@@ -222,7 +223,8 @@ struct starpu_task
 	.dyn_handles = NULL,				\
 	.dyn_interfaces = NULL,				\
 	.dyn_modes = NULL,				\
-	.name = NULL                        		\
+	.name = NULL,                        		\
+	.possibly_parallel = 0                        	\
 }
 
 #define STARPU_TASK_GET_NBUFFERS(task) ((unsigned)((task)->cl->nbuffers == STARPU_VARIABLE_NBUFFERS ? ((task)->nbuffers) : ((task)->cl->nbuffers)))

+ 19 - 18
include/starpu_task_util.h

@@ -32,24 +32,25 @@ extern "C"
 
 void starpu_create_sync_task(starpu_tag_t sync_tag, unsigned ndeps, starpu_tag_t *deps, void (*callback)(void *), void *callback_arg);
 
-#define STARPU_VALUE		 (1<<18)
-#define STARPU_CALLBACK		 (2<<18)
-#define STARPU_CALLBACK_WITH_ARG (3<<18)
-#define STARPU_CALLBACK_ARG	 (4<<18)
-#define STARPU_PRIORITY		 (5<<18)
-#define STARPU_EXECUTE_ON_NODE	 (6<<18)
-#define STARPU_EXECUTE_ON_DATA	 (7<<18)
-#define STARPU_DATA_ARRAY        (8<<18)
-#define STARPU_TAG               (9<<18)
-#define STARPU_HYPERVISOR_TAG	 (10<<18)
-#define STARPU_FLOPS	         (11<<18)
-#define STARPU_SCHED_CTX	 (12<<18)
-#define STARPU_PROLOGUE_CALLBACK   (13<<18)
-#define STARPU_PROLOGUE_CALLBACK_ARG (14<<18)
-#define STARPU_PROLOGUE_CALLBACK_POP   (15<<18)
-#define STARPU_PROLOGUE_CALLBACK_POP_ARG (16<<18)
-#define STARPU_EXECUTE_ON_WORKER (17<<18)
-#define STARPU_TAG_ONLY          (18<<18)
+#define STARPU_VALUE		 (1<<20)
+#define STARPU_CALLBACK		 (2<<20)
+#define STARPU_CALLBACK_WITH_ARG (3<<20)
+#define STARPU_CALLBACK_ARG	 (4<<20)
+#define STARPU_PRIORITY		 (5<<20)
+#define STARPU_EXECUTE_ON_NODE	 (6<<20)
+#define STARPU_EXECUTE_ON_DATA	 (7<<20)
+#define STARPU_DATA_ARRAY        (8<<20)
+#define STARPU_TAG               (9<<20)
+#define STARPU_HYPERVISOR_TAG	 (10<<20)
+#define STARPU_FLOPS	         (11<<20)
+#define STARPU_SCHED_CTX	 (12<<20)
+#define STARPU_PROLOGUE_CALLBACK   (13<<20)
+#define STARPU_PROLOGUE_CALLBACK_ARG (14<<20)
+#define STARPU_PROLOGUE_CALLBACK_POP   (15<<20)
+#define STARPU_PROLOGUE_CALLBACK_POP_ARG (16<<20)
+#define STARPU_EXECUTE_ON_WORKER (17<<20)
+#define STARPU_TAG_ONLY          (18<<20)
+#define STARPU_POSSIBLY_PARALLEL    (19<<20)
 
 struct starpu_task *starpu_task_build(struct starpu_codelet *cl, ...);
 int starpu_task_insert(struct starpu_codelet *cl, ...);

+ 1 - 1
src/core/sched_ctx.c

@@ -1069,7 +1069,7 @@ int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _star
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 
-	while(workers->has_next(workers, &it))
+	while(workers->has_next_master(workers, &it))
 	{
 		worker = workers->get_next(workers, &it);
 		STARPU_ASSERT_MSG(worker < STARPU_NMAXWORKERS, "worker id %d", worker);

+ 9 - 2
src/core/workers.c

@@ -287,8 +287,15 @@ static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch,
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
-//	if(sched_ctx->parallel_sect[workerid] || (!sched_ctx->policy && workerid != sched_ctx->main_master)) return 0;
-	if(!sched_ctx->sched_policy && workerid != sched_ctx->main_master) return 0;
+
+	/* if the task can't be parallel don't submit it to a ctx */
+	unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx->id);
+        if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		if(!task->possibly_parallel) return 0;
+
+	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
+	if(sched_ctx->parallel_sect[workerid] ) return 0;
+
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & config.workers[workerid].worker_mask) &&
 		_starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&

+ 13 - 0
src/util/starpu_task_insert_utils.c

@@ -126,6 +126,10 @@ void _starpu_task_insert_get_args_size(va_list varg_list, unsigned *nbuffers, si
 		{
 			(void)va_arg(varg_list, int);
 		}
+		else if (arg_type==STARPU_POSSIBLY_PARALLEL)
+		{
+			(void)va_arg(varg_list, unsigned);
+		}
 		else if (arg_type==STARPU_FLOPS)
 		{
 			(void)va_arg(varg_list, double);
@@ -239,6 +243,10 @@ int _starpu_codelet_pack_args(void **arg_buffer, size_t arg_buffer_size, va_list
 		{
 			(void)va_arg(varg_list, int);
 		}
+		else if (arg_type==STARPU_POSSIBLY_PARALLEL)
+		{
+			(void)va_arg(varg_list, unsigned);
+		}
 		else if (arg_type==STARPU_FLOPS)
 		{
 			(void)va_arg(varg_list, double);
@@ -415,6 +423,11 @@ void _starpu_task_insert_create(void *arg_buffer, size_t arg_buffer_size, struct
 			int hypervisor_tag = va_arg(varg_list, int);
 			(*task)->hypervisor_tag = hypervisor_tag;
 		}
+		else if (arg_type==STARPU_POSSIBLY_PARALLEL)
+		{
+			unsigned possibly_parallel = va_arg(varg_list, unsigned);
+			(*task)->possibly_parallel = possibly_parallel;
+		}
 		else if (arg_type==STARPU_FLOPS)
 		{
 			double flops = va_arg(varg_list, double);