浏览代码

add a prolog callback to a task (+ example)

Andra Hugo 11 年之前
父节点
当前提交
f108a67609

+ 2 - 0
examples/Makefile.am

@@ -182,6 +182,7 @@ examplebin_PROGRAMS +=				\
 	spmd/vector_scal_spmd			\
 	spmv/spmv				\
 	callback/callback			\
+	callback/prolog		\
 	incrementer/incrementer			\
 	binary/binary				\
 	interface/complex			\
@@ -256,6 +257,7 @@ STARPU_EXAMPLES +=				\
 	spmd/vector_scal_spmd			\
 	spmv/spmv				\
 	callback/callback			\
+	callback/prolog		\
 	incrementer/incrementer			\
 	binary/binary				\
 	interface/complex			\

+ 86 - 0
examples/callback/prolog.c

@@ -0,0 +1,86 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010, 2013  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <sys/time.h>
+
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+
+starpu_data_handle_t handle;
+
+void cpu_codelet(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+{
+	int *val = (int *)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	*val += 1;
+}
+
+struct starpu_codelet cl =
+{
+	.modes = { STARPU_RW },
+	.cpu_funcs = {cpu_codelet, NULL},
+	.nbuffers = 1,
+	.name = "callback"
+};
+
+void callback_func(void *callback_arg)
+{
+	int ret;
+
+	struct starpu_task *task = starpu_task_create();
+	task->cl = &cl;
+	task->handles[0] = handle;
+
+	ret = starpu_task_submit(task);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+}
+
+int main(int argc, char **argv)
+{
+	int v=40;
+	int ret;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_variable_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t)&v, sizeof(int));
+
+	struct starpu_task *task = starpu_task_create();
+	task->cl = &cl;
+	task->prolog_func = callback_func;
+	task->prolog_arg = NULL;
+	task->handles[0] = handle;
+
+	ret = starpu_task_submit(task);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	starpu_task_wait_for_all();
+	starpu_data_unregister(handle);
+
+	FPRINTF(stderr, "v -> %d\n", v);
+
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	starpu_shutdown();
+	return 77;
+}

+ 3 - 0
include/starpu_task.h

@@ -126,6 +126,9 @@ struct starpu_task
 	void (*callback_func)(void *);
 	void *callback_arg;
 
+	void (*prolog_func)(void *);
+	void *prolog_arg;
+
 	unsigned use_tag;
 	starpu_tag_t tag_id;
 

+ 2 - 0
include/starpu_task_util.h

@@ -43,6 +43,8 @@ void starpu_create_sync_task(starpu_tag_t sync_tag, unsigned ndeps, starpu_tag_t
 #define STARPU_HYPERVISOR_TAG	 (1<<28)
 #define STARPU_FLOPS	         (1<<29)
 #define STARPU_SCHED_CTX	 (1<<30)
+#define STARPU_FREE_DEP_CALLBACK (1<<31)
+#define STARPU_FREE_DEP_CALLBACK_ARG (1<<32)
 
 int starpu_insert_task(struct starpu_codelet *cl, ...);
 

+ 1 - 1
sc_hypervisor/src/policies_utils/lp_tools.c

@@ -44,7 +44,7 @@ double sc_hypervisor_lp_get_nworkers_per_ctx(int nsched_ctxs, int ntypes_of_work
 			flops[i] = sc_w->ready_flops/1000000000.0; /* in gflops*/
 		else
 		{
-			if((sc_w->ready_flops/1000000000.0) < 0.5)
+			if((sc_w->ready_flops/1000000000.0) <= 0.000002)
 				flops[i] = 0.0;
 			else
 				flops[i] = sc_w->remaining_flops/1000000000.0; /* in gflops*/

+ 2 - 0
sc_hypervisor/src/sc_hypervisor.c

@@ -848,7 +848,9 @@ static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task
 		if(hypervisor.policy.handle_poped_task)
 			hypervisor.policy.handle_poped_task(sched_ctx, worker, task, footprint);
 	}
+	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
 	_ack_resize_completed(sched_ctx, worker);
+	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
 	if(hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker] % 200 == 0)
 		_print_current_time();
 }

+ 2 - 0
src/core/jobs.c

@@ -413,6 +413,8 @@ unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j)
 		return 0;
 	}
 
+	if(j->task->prolog_func)
+		j->task->prolog_func(j->task->prolog_arg);
 	ret = _starpu_push_task(j);
 
 	_STARPU_LOG_OUT();

+ 4 - 0
src/core/sched_policy.c

@@ -313,6 +313,10 @@ static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struc
 
 int _starpu_push_task(struct _starpu_job *j)
 {
+
+	if(j->task->prolog_func)
+		j->task->prolog_func(j->task->prolog_arg);
+
 	struct starpu_task *task = j->task;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 	unsigned nworkers = 0;

+ 32 - 0
src/util/starpu_insert_task_utils.c

@@ -172,6 +172,15 @@ int _starpu_codelet_pack_args(void **arg_buffer, size_t arg_buffer_size, va_list
 		{
 			(void)va_arg(varg_list, void *);
 		}
+		else if (arg_type==STARPU_FREE_DEP_CALLBACK)
+		{
+			va_arg(varg_list, _starpu_callback_func_t);
+			va_arg(varg_list, void *);
+		}
+		else if (arg_type==STARPU_FREE_DEP_CALLBACK_ARG)
+		{
+			(void)va_arg(varg_list, void *);
+		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
 			(void)va_arg(varg_list, int);
@@ -228,6 +237,11 @@ int _starpu_insert_task_create_and_submit(void *arg_buffer, size_t arg_buffer_si
 
 	cl_arg_wrapper->callback_func = NULL;
 
+	struct insert_task_cb_wrapper *free_dep_cl_arg_wrapper = (struct insert_task_cb_wrapper *) malloc(sizeof(struct insert_task_cb_wrapper));
+	STARPU_ASSERT(free_dep_cl_arg_wrapper);
+
+	free_dep_cl_arg_wrapper->callback_func = NULL;
+
 	while((arg_type = va_arg(varg_list, int)) != 0)
 	{
 		if (arg_type & STARPU_R || arg_type & STARPU_W || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
@@ -296,6 +310,20 @@ int _starpu_insert_task_create_and_submit(void *arg_buffer, size_t arg_buffer_si
 			void *callback_arg = va_arg(varg_list, void *);
 			cl_arg_wrapper->callback_arg = callback_arg;
 		}
+		else if (arg_type==STARPU_FREE_DEP_CALLBACK)
+		{
+			void (*callback_func)(void *);
+			void *callback_arg;
+			callback_func = va_arg(varg_list, _starpu_callback_func_t);
+			callback_arg = va_arg(varg_list, void *);
+			free_dep_cl_arg_wrapper->callback_func = callback_func;
+			free_dep_cl_arg_wrapper->callback_arg = callback_arg;
+		}
+		else if (arg_type==STARPU_FREE_DEP_CALLBACK_ARG)
+		{
+			void *callback_arg = va_arg(varg_list, void *);
+			cl_arg_wrapper->callback_arg = callback_arg;
+		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
 			/* Followed by a priority level */
@@ -346,6 +374,9 @@ int _starpu_insert_task_create_and_submit(void *arg_buffer, size_t arg_buffer_si
 	(*task)->callback_func = starpu_task_insert_callback_wrapper;
 	(*task)->callback_arg = cl_arg_wrapper;
 
+	(*task)->prolog_func = starpu_task_insert_callback_wrapper;
+	(*task)->prolog_arg = cl_arg_wrapper;
+
 	int ret = starpu_task_submit(*task);
 
 	if (STARPU_UNLIKELY(ret == -ENODEV))
@@ -356,6 +387,7 @@ int _starpu_insert_task_create_and_submit(void *arg_buffer, size_t arg_buffer_si
 			(*task)->cl->name ? (*task)->cl->name :
 			((*task)->cl->model && (*task)->cl->model->symbol)?(*task)->cl->model->symbol:"none");
 		free(cl_arg_wrapper);
+		free(free_dep_cl_arg_wrapper);
 	}
 
 	return ret;