Browse Source

Merge commit '9d2739398e7d75c1697b9138212e250be3168f57'

Samuel Thibault 4 years ago
parent
commit
b641803d23

+ 7 - 1
configure.ac

@@ -3452,7 +3452,13 @@ then
 	AC_PYTHON_MODULE(joblib,[joblib_avail=yes],[joblib_avail=no])
 	AC_MSG_RESULT($joblib_avail)
 	if test "$joblib_avail" = "no" ; then
-		AC_MSG_ERROR([python3 module joblib missing, cannot build StarPU python interface])
+		AC_MSG_ERROR([python3 module joblib missing, cannot build StarPU python interface (consider running 'pip3 install joblib')])
+	fi
+	AC_MSG_CHECKING(for python3 module cloudpickle)
+	AC_PYTHON_MODULE(cloudpickle,[cloudpickle_avail=yes],[cloudpickle_avail=no])
+	AC_MSG_RESULT($cloudpickle_avail)
+	if test "$cloudpickle_avail" = "no" ; then
+		AC_MSG_ERROR([python3 module cloudpickle missing, cannot build StarPU python interface (consider running 'pip3 install cloudpickle')])
 	fi
 	AC_MSG_CHECKING(for python3 module numpy)
 	AC_PYTHON_MODULE(numpy,[numpy_avail=yes],[numpy_avail=no])

File diff suppressed because it is too large
+ 1 - 1
doc/doxygen/chapters/400_python.doxy


+ 28 - 0
include/starpu_task.h

@@ -770,6 +770,24 @@ struct starpu_task
 	size_t cl_arg_size;
 
 	/**
+	   Optional pointer which points to the return value of submitted task.
+	   The default value is <c>NULL</c>. starpu_codelet_pack_arg() 
+	   and starpu_codelet_unpack_arg() can be used to respectively
+	   pack and unpack the return value into and form it. starpu_task::cl_ret 
+	   can be used for MPI support. The only requirement is that
+	   the size of the return value must be set in starpu_task::cl_ret_size .
+	*/
+	void *cl_ret;
+
+	/**
+	   Optional field. The buffer of starpu_codelet_pack_arg() 
+	   and starpu_codelet_unpack_arg() can be allocated with 
+	   the starpu_task::cl_ret_size bytes starting at address starpu_task::cl_ret.
+	   starpu_task::cl_ret_size can be used for MPI supoort.
+	*/
+	size_t cl_ret_size;
+
+	/**
 	   Optional field, the default value is <c>NULL</c>. This is a
 	   function pointer of prototype <c>void (*f)(void *)</c>
 	   which specifies a possible callback. If this pointer is
@@ -855,6 +873,14 @@ struct starpu_task
 	unsigned cl_arg_free:1;
 
 	/**
+	   Optional field. In case starpu_task::cl_ret was allocated
+	   by the application through <c>malloc()</c>, setting
+	   starpu_task::cl_ret_free to 1 makes StarPU automatically
+	   call <c>free(cl_ret)</c> when destroying the task.
+	*/
+	unsigned cl_ret_free:1;
+
+	/**
 	   Optional field. In case starpu_task::callback_arg was
 	   allocated by the application through <c>malloc()</c>,
 	   setting starpu_task::callback_arg_free to 1 makes StarPU
@@ -1264,6 +1290,8 @@ struct starpu_task
 	.where = -1,					\
 	.cl_arg = NULL,					\
 	.cl_arg_size = 0,				\
+	.cl_ret = NULL,					\
+	.cl_ret_size = 0,				\
 	.callback_func = NULL,				\
 	.callback_arg = NULL,				\
 	.priority = STARPU_DEFAULT_PRIO,		\

+ 38 - 0
include/starpu_task_util.h

@@ -510,6 +510,44 @@ void starpu_codelet_pack_arg_fini(struct starpu_codelet_pack_arg_data *state, vo
 void starpu_codelet_unpack_args(void *cl_arg, ...);
 
 /**
+   Initialize struct starpu_codelet_pack_arg_data before calling
+   starpu_codelet_unpack_arg(). This will pass the starpu_task->cl_arg 
+   and starpu_task->cl_arg_size to the content of struct starpu_codelet_pack_arg_data.
+*/
+void starpu_codelet_unpack_arg_init(struct starpu_codelet_pack_arg_data *state, void **cl_arg, size_t *cl_arg_size);
+
+/**
+   Unpack one argument from struct starpu_codelet_pack_arg \p state into ptr with a copy. 
+   That structure has to be initialized before with starpu_codelet_unpack_arg_init().
+   Size is stored in starpu_task->cl_arg, and it is a known parameter in this function .
+*/
+void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void *ptr, size_t size);
+
+/**
+   Unpack one argument from struct starpu_codelet_pack_arg \p state into ptr with a copy.
+   That structure has to be initialized before with starpu_codelet_unpack_arg_init(). 
+   Size is stored in starpu_task->cl_arg, and it is an unknown parameter in this function. 
+   It will be returned from starpu_task->cl_arg with a copy.
+*/
+void starpu_codelet_dup_arg(struct starpu_codelet_pack_arg_data *state, void **ptr, size_t *size);
+
+/**
+   Unpack one argument from struct starpu_codelet_pack_arg \p state into ptr, and the pointer of ptr will be returned.
+   That structure has to be initialized before with starpu_codelet_unpack_arg_init(). 
+   Size is stored in starpu_task->cl_arg, and it is an unknown parameter in this function. 
+   It will be returned from starpu_task->cl_arg with a copy.
+*/
+void starpu_codelet_pick_arg(struct starpu_codelet_pack_arg_data *state, void **ptr, size_t *size); 
+
+void starpu_codelet_unpack_arg_fini(struct starpu_codelet_pack_arg_data *state);
+
+/**
+   Call this function during unpacking to skip saving the argument in ptr.
+*/
+void starpu_codelet_unpack_discard_arg(struct starpu_codelet_pack_arg_data *state);
+
+
+/**
    Similar to starpu_codelet_unpack_args(), but if any parameter is 0,
    copy the part of \p cl_arg that has not been read in \p buffer
    which can then be used in a later call to one of the unpack

+ 4 - 0
src/core/task.c

@@ -383,6 +383,10 @@ void _starpu_task_destroy(struct starpu_task *task)
 		if (task->cl_arg_free)
 			free(task->cl_arg);
 
+		/* Does user want StarPU release cl_ret ? */
+		if (task->cl_ret_free)
+			free(task->cl_ret);
+
 		/* Does user want StarPU release callback_arg ? */
 		if (task->callback_arg_free)
 			free(task->callback_arg);

+ 3 - 0
src/drivers/mp_common/mp_common.h

@@ -139,6 +139,9 @@ struct mp_task
 	void **interfaces;
 	unsigned nb_interfaces;
 	void *cl_arg;
+	unsigned cl_arg_size;
+	void *cl_ret;
+	unsigned cl_ret_size;
 	unsigned coreid;
 	enum starpu_codelet_type type;
 	int is_parallel_task;

+ 30 - 3
src/drivers/mp_common/sink_common.c

@@ -511,7 +511,7 @@ static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *no
 	_starpu_sink_common_append_message(node, message);
 }
 
-/* Append to the message list a "STARPU_EXECUTION_COMPLETED" message
+/* Append to the message list a "STARPU_EXECUTION_COMPLETED" message and cl_ret
  */
 static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_node *node, struct mp_task *task)
 {
@@ -521,10 +521,23 @@ static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_no
 		message->type = STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED;
 	else
 		message->type = STARPU_MP_COMMAND_EXECUTION_COMPLETED;
-	_STARPU_MALLOC(message->buffer, sizeof(int));
-	*(int*) message->buffer = task->coreid;
+
 	message->size = sizeof(int);
 
+	/* If the user didn't give any cl_ret, there is no need to send it */
+	 if (task->cl_ret)
+	 {
+	 	STARPU_ASSERT(task->cl_ret_size);
+	 	message->size += task->cl_ret_size;
+	 }
+
+	_STARPU_MALLOC(message->buffer, message->size);
+
+	*(int*) message->buffer = task->coreid;
+
+	 if (task->cl_ret)
+	 	memcpy( message->buffer+sizeof(int), task->cl_ret, task->cl_ret_size);
+
 	/* Append the message to the queue */
 	_starpu_sink_common_append_message(node, message);
 }
@@ -602,8 +615,21 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
 	{
 		if (_starpu_get_disable_kernels() <= 0)
 		{
+			struct starpu_task s_task;
+			starpu_task_init(&s_task);
+
+			/*copy cl_arg and cl_arg_size from mp_task into starpu_task*/
+			s_task.cl_arg=task->cl_arg;
+			s_task.cl_arg_size=task->cl_arg_size;
+
+			_starpu_set_current_task(&s_task);
 			/* execute the task */
 			task->kernel(task->interfaces,task->cl_arg);
+			_starpu_set_current_task(NULL);
+
+			/*copy cl_ret and cl_ret_size from starpu_task into mp_task*/
+			task->cl_ret=s_task.cl_ret;
+			task->cl_ret_size=s_task.cl_ret_size;
 		}
 	}
 
@@ -756,6 +782,7 @@ void _starpu_sink_common_execute(struct _starpu_mp_node *node, void *arg, int ar
 		unsigned cl_arg_size = arg_size - (arg_ptr - (uintptr_t) arg);
 		_STARPU_MALLOC(task->cl_arg, cl_arg_size);
 		memcpy(task->cl_arg, (void *) arg_ptr, cl_arg_size);
+		task->cl_arg_size=cl_arg_size;
 	}
 	else
 		task->cl_arg = NULL;

+ 18 - 2
src/drivers/mp_common/source_common.c

@@ -88,15 +88,31 @@ static int _starpu_src_common_process_completed_job(struct _starpu_mp_node *node
 {
 	int coreid;
 
-	STARPU_ASSERT(sizeof(coreid) == arg_size);
+	uintptr_t arg_ptr = (uintptr_t) arg;
 
-	coreid = *(int *) arg;
+	coreid = *(int *) arg_ptr;
+	arg_ptr += sizeof(coreid);
 
 	struct _starpu_worker *worker = &workerset->workers[coreid];
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(worker->current_task);
 
+	struct starpu_task *task = j->task;
+	STARPU_ASSERT(task);
+
 	struct _starpu_worker * old_worker = _starpu_get_local_worker_key();
 
+	/* Was cl_ret sent ? */
+	if (arg_size > arg_ptr - (uintptr_t) arg)
+	{
+		/* Copy cl_ret into the task */
+		unsigned cl_ret_size = arg_size - (arg_ptr - (uintptr_t) arg);
+		_STARPU_MALLOC(task->cl_ret, cl_ret_size);
+		memcpy(task->cl_ret, (void *) arg_ptr, cl_ret_size);
+		task->cl_ret_size=cl_ret_size;
+	}
+	else
+		task->cl_ret = NULL;
+
         /* if arg is not copied we release the mutex */
         if (!stored)
                 STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);

+ 60 - 0
src/util/starpu_task_insert_utils.c

@@ -63,6 +63,66 @@ void starpu_codelet_pack_arg_fini(struct starpu_codelet_pack_arg_data *state, vo
 	*cl_arg_size = state->arg_buffer_size;
 }
 
+void starpu_codelet_unpack_arg_init(struct starpu_codelet_pack_arg_data *state, void **cl_arg, size_t *cl_arg_size)
+{
+	state->arg_buffer = *cl_arg;
+	state->arg_buffer_size = *cl_arg_size;
+	state->current_offset = sizeof(int);
+	state->nargs = 0;
+}
+
+void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void *ptr, size_t size)
+{
+	size_t ptr_size;
+	memcpy((void *)&ptr_size, state->arg_buffer+state->current_offset, sizeof(ptr_size));
+	assert(ptr_size==size);
+	state->current_offset += sizeof(ptr_size);
+
+	memcpy(ptr, state->arg_buffer+state->current_offset, ptr_size);
+	state->current_offset += ptr_size;
+
+	state->nargs++;
+}
+
+void starpu_codelet_dup_arg(struct starpu_codelet_pack_arg_data *state, void **ptr, size_t *size)
+{
+	memcpy((void*)size, state->arg_buffer+state->current_offset, sizeof(*size));
+	state->current_offset += sizeof(*size);
+
+	*ptr = malloc(*size);
+	memcpy(*ptr, state->arg_buffer+state->current_offset, *size);
+	state->current_offset += *size;
+
+	state->nargs++;
+}
+
+void starpu_codelet_pick_arg(struct starpu_codelet_pack_arg_data *state, void **ptr, size_t *size)
+{
+	memcpy((void*)size, state->arg_buffer+state->current_offset, sizeof(*size));
+	state->current_offset += sizeof(*size);
+	
+	*ptr = state->arg_buffer+state->current_offset;
+	state->current_offset += *size;
+
+	state->nargs++;
+}
+
+void starpu_codelet_unpack_arg_fini(struct starpu_codelet_pack_arg_data *state)
+{
+
+}
+
+void starpu_codelet_unpack_discard_arg(struct starpu_codelet_pack_arg_data *state)
+{
+	size_t ptr_size;
+	memcpy((void *)&ptr_size, state->arg_buffer+state->current_offset, sizeof(ptr_size));
+	
+	state->current_offset += sizeof(ptr_size);
+	state->current_offset += ptr_size;
+
+	state->nargs++;
+}
+
 int _starpu_codelet_pack_args(void **arg_buffer, size_t *arg_buffer_size, va_list varg_list)
 {
 	int arg_type;

+ 5 - 1
starpupy/examples/Makefile.am

@@ -1,6 +1,6 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+# Copyright (C) 2020-2021  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 #
 # StarPU is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -20,6 +20,10 @@ SUBDIRS =
 
 CLEANFILES = *.gcno *.gcda *.linkinfo
 
+if STARPU_USE_MPI_MASTER_SLAVE
+TESTS_ENVIRONMENT	=	$(MPI_RUN_ARGS) MPI_LAUNCHER="$(MPI_LAUNCHER)" LOADER_ARGS="--mpirun"
+endif
+
 TESTS	=
 TESTS	+=	starpu_py.sh
 TESTS	+=	starpu_py_parallel.sh

+ 52 - 20
starpupy/examples/execute.sh.in

@@ -1,7 +1,7 @@
 #!@REALBASH@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+# Copyright (C) 2020-2021  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 #
 # StarPU is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -20,18 +20,58 @@ exampledir=@STARPU_SRC_DIR@/starpupy/examples
 modpath=@STARPU_BUILD_DIR@/src/.libs:
 pypath=@STARPU_BUILD_DIR@/starpupy/src/build:$PYTHONPATH
 
-valgrind=""
-gdb=""
-if test "$1" == "--valgrind"
-then
-    valgrind=1
-    shift
-fi
-if test "$1" == "--gdb"
+LOADER=@PYTHON@
+
+if test -z "$MPI_LAUNCHER"
 then
-    gdb=1
-    shift
+    MPI_LAUNCHER="mpiexec -np 2"
 fi
+mpi=""
+gdb=""
+
+read_arg()
+{
+    do_shift=0
+    if test "$1" == "--valgrind"
+    then
+	export PYTHONMALLOC=malloc
+	LOADER="valgrind --track-origins=yes @PYTHON@"
+	do_shift=1
+    elif test "$1" == "--gdb"
+    then
+	gdb="gdb"
+	if test "$mpi" == "mpi"
+	then
+	    LOADER="$MPI_LAUNCHER xterm -sl 10000 -hold -e gdb --args @PYTHON@"
+	else
+	    LOADER="gdb --args @PYTHON@"
+	fi
+	do_shift=1
+    elif test "$1" == "--mpirun"
+    then
+	mpi="mpi"
+	if test "$gdb" == "gdb"
+	then
+	    LOADER="$MPI_LAUNCHER xterm -sl 10000 -hold -e gdb --args @PYTHON@"
+	else
+	    LOADER="$MPI_LAUNCHER @PYTHON@"
+	fi
+	do_shift=1
+    fi
+}
+
+for x in $*
+do
+    read_arg $x
+    if test $do_shift == 1
+    then
+	shift
+    fi
+done
+for x in $LOADER_ARGS
+do
+    read_arg $x
+done
 
 examplefile=$1
 if test -f $examplefile
@@ -47,13 +87,5 @@ fi
 shift
 
 set -x
-if test "$valgrind" == "1"
-then
-    PYTHONPATH=$pypath LD_LIBRARY_PATH=$modpath PYTHONMALLOC=malloc valgrind --track-origins=yes @PYTHON@ $pythonscript $*
-elif test "$gdb" == "1"
-then
-    PYTHONPATH=$pypath LD_LIBRARY_PATH=$modpath gdb --args @PYTHON@ $pythonscript $*
-else
-    PYTHONPATH=$pypath LD_LIBRARY_PATH=$modpath @PYTHON@ $pythonscript $*
-fi
+PYTHONPATH=$pypath LD_LIBRARY_PATH=$modpath $LOADER $pythonscript $*
 

+ 9 - 2
starpupy/examples/starpu_py.py

@@ -13,10 +13,13 @@
 #
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
+from math import sqrt
 import starpu
+from starpu import starpupy
 import time
 import asyncio
 
+
 ############################################################################
 #function no input no output print hello world
 def hello():
@@ -119,7 +122,7 @@ async def main():
     res4 = await fut4
     print("The result of function add is :", res4)
 
-	#submit function "sub"
+	#submit function "sub" but only provide function name
     fut5 = starpu.task_submit()(sub, 6, 2, 5.9)
     res5 = await fut5
     print("The result of function sub is:", res5)
@@ -135,7 +138,11 @@ async def main():
     print("The first argument of this function is the result of Example 8")
     print("The result of function is", res7)
 
-asyncio.run(main())
+    fut8 = starpu.task_submit()("sqrt", 4)
+    res8 = await fut8
+    print("The result of function sqrt is:", res8)
 
+asyncio.run(main())
 
+starpupy.shutdown()
 #starpu.task_wait_for_all()

+ 2 - 2
starpupy/examples/starpu_py_np.py

@@ -14,6 +14,7 @@
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
 import starpu
+from starpu import starpupy
 import asyncio
 import numpy as np
 
@@ -36,5 +37,4 @@ async def main():
 
 asyncio.run(main())
 
-
-#starpu.task_wait_for_all()
+starpupy.shutdown()

+ 19 - 17
starpupy/examples/starpu_py_parallel.py

@@ -15,6 +15,7 @@
 #
 import starpu
 import starpu.joblib
+from starpu import starpupy
 import time
 import asyncio
 from math import sqrt
@@ -125,7 +126,7 @@ N=100
 # b=np.arange(N, 2*N, 1)
 
 displayPlot=False
-listX=[10, 100, 1000, 10000]
+listX=[10, 100]
 for arg in sys.argv[1:]:
         if arg == "-long":
                 listX = [10, 100, 1000, 10000, 100000, 1000000, 10000000]
@@ -134,7 +135,7 @@ for arg in sys.argv[1:]:
 
 for x in listX:
 	for X in range(x, x*10, x):
-		print("X=",X)
+		#print("X=",X)
 		starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="log_list")(starpu.joblib.delayed(log10)(i+1)for i in range(X))
 		A=np.arange(1,X+1,1)
 		starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="log_arr")(starpu.joblib.delayed(log10_arr)(A))
@@ -201,7 +202,6 @@ print("--(multi_2arr)(A, B)")
 n, m = 4, 5
 A = np.arange(n*m).reshape(n, m)
 B = np.arange(n*m, 2*n*m, 1).reshape(n, m)
-print("The input arrays are A", A, "B", B)
 start_exec6=time.time()
 start_cpu6=time.process_time()
 starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(A, B))
@@ -209,7 +209,6 @@ end_exec6=time.time()
 end_cpu6=time.process_time()
 print("the program execution time is", end_exec6-start_exec6)
 print("the cpu execution time is", end_cpu6-start_cpu6)
-print("The return arrays are A", A, "B", B)
 
 print("--(scal)(2, t=(j for j in a))")
 a=np.arange(N)
@@ -223,7 +222,6 @@ print("the cpu execution time is", end_cpu7-start_cpu7)
 
 print("--(scal)(2,A)")
 A=np.arange(N)
-print("The input array is", A)
 start_exec8=time.time()
 start_cpu8=time.process_time()
 starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,A))
@@ -231,12 +229,10 @@ end_exec8=time.time()
 end_cpu8=time.process_time()
 print("the program execution time is", end_exec8-start_exec8)
 print("the cpu execution time is", end_cpu8-start_cpu8)
-print("The return array is", A)
 
 print("--(add_scal)(t1=A,t2=B,a=2)")
 A=np.arange(N)
 B=np.arange(N)
-print("The input arrays are A", A, "B", B)
 start_exec9=time.time()
 start_cpu9=time.process_time()
 starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(t1=A,t2=B,a=2))
@@ -244,7 +240,6 @@ end_exec9=time.time()
 end_cpu9=time.process_time()
 print("the program execution time is", end_exec9-start_exec9)
 print("the cpu execution time is", end_cpu9-start_cpu9)
-print("The return arrays are A", A, "B", B)
 
 
 print("--input is iterable function list")
@@ -270,35 +265,39 @@ async def main():
 	print("--(sqrt)(i**2)for i in range(N)")
 	fut1=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="sqrt")(starpu.joblib.delayed(sqrt)(i**2)for i in range(N))
 	res1=await fut1
-	#print(res1)
+	print("The result is", sum(res1,[]))
 
 	print("--(multi)(i,j) for i,j in zip(a,b)")
 	a=np.arange(N)
 	b=np.arange(N, 2*N, 1)
+	print("The inputs are a", a, "b", b)
 	fut2=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi")(starpu.joblib.delayed(multi)(i,j) for i,j in zip(a,b))
 	res2=await fut2
-	#print(res2)
+	print("The result is", sum(res2,[]))
 
 	print("--(scal_arr)((i for i in b), A)")
 	A=np.arange(N)
 	b=np.arange(N, 2*N, 1)
+	print("The input arrays are A", A, "b", b)
 	fut3=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
 	res3=await fut3
-	#print(res3)
+	print("The return array is", np.concatenate(res3))
 
 	print("--(multi_list)((i,j) for i,j in zip(a,b))")
 	a=np.arange(N)
 	b=np.arange(N, 2*N, 1)
+	print("The input lists are a", a, "b", b)
 	fut4=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_list")(starpu.joblib.delayed(multi_list)((i,j) for i,j in zip(a,b)))
 	res4=await fut4
-	#print(res4)
+	print("The result is", sum(res4,[]))
 
 	print("--(multi_2arr)((i for i in a), (j for j in b))")
 	a=np.arange(N)
 	b=np.arange(N, 2*N, 1)
+	print("The input lists are a", a, "b", b)
 	fut5=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
 	res5=await fut5
-	#print(res5)
+	print("The result is", sum(res5,[]))
 
 	print("--(multi_2arr)(b=B, a=A)")
 	A=np.arange(N)
@@ -306,21 +305,22 @@ async def main():
 	print("The input arrays are A", A, "B", B)
 	fut6=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(b=B, a=A))
 	res6=await fut6
-	print("The return arrays are A", A, "B", B)
+	print("The return array is", np.concatenate(res6))
 
 
 	print("--(scal)(2, (j for j in a))")
 	a=np.arange(N)
+	print("The input list is a", a)
 	fut7=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2, (j for j in a)))
 	res7=await fut7
-	#print(res6)
+	print("The result is", sum(res7,[]))
 
 	print("--(scal)(2,t=A)")
 	A=np.arange(N)
 	print("The input array is", A)
 	fut8=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="scal")(starpu.joblib.delayed(scal)(2,t=A))
 	res8=await fut8
-	print("The return array is", A)
+	print("The return array is", np.concatenate(res8))
 
 	print("--(scal)(2,A,B)")
 	A=np.arange(N)
@@ -328,7 +328,7 @@ async def main():
 	print("The input arrays are A", A, "B", B)
 	fut9=starpu.joblib.Parallel(mode="future", n_jobs=-1, perfmodel="add_scal")(starpu.joblib.delayed(add_scal)(2,A,B))
 	res9=await fut9
-	print("The return arrays are A", A, "B", B)
+	print("The return array is", np.concatenate(res9))
 
 	print("--input is iterable function list")
 	fut10=starpu.joblib.Parallel(mode="future", n_jobs=-1)(g_func)
@@ -348,3 +348,5 @@ starpu.perfmodel_plot(perfmodel="func",view=displayPlot)
 
 starpu.perfmodel_plot(perfmodel="log_list",view=displayPlot)
 starpu.perfmodel_plot(perfmodel="log_arr",view=displayPlot)
+
+starpupy.shutdown()

+ 2 - 1
starpupy/src/Makefile.am

@@ -1,6 +1,6 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+# Copyright (C) 2020-2021       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 #
 # StarPU is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -45,6 +45,7 @@ install-exec-local:
 if STARPU_BUILD_STARPUPY
 clean-local:
 	$(PYTHON) setup.py clean
+	rm -fr build
 	rm -f starpu/*py starpu/*c
 endif
 

+ 3 - 3
starpupy/src/intermedia.py

@@ -41,12 +41,12 @@ def dict_perf_generator(perfsymbol):
 	return p
 
 #add options in function task_submit
-def task_submit(*, name=None, synchronous=0, priority=0, color=None, flops=None, perfmodel=None):
+def task_submit(*, name=None, synchronous=0, priority=0, color=None, flops=None, perfmodel=None, sizebase=0):
 	if perfmodel==None:
-		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': None}
+		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': None, 'sizebase': sizebase}
 	else:
 		p=dict_perf_generator(perfmodel)
-		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': p.get_struct()}
+		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': p.get_struct(), 'sizebase': sizebase}
 
 	def call_task_submit(f, *args):
 		fut=starpupy._task_submit(f, *args, dict_option)

+ 11 - 3
starpupy/src/joblib.py

@@ -94,7 +94,7 @@ def future_generator(iterable, n_jobs, dict_task):
 		# the function is always the first element
 		f=iterable[0]
 		# get the name of formal arguments of f
-		formal_args=inspect.getargspec(f).args
+		formal_args=inspect.getfullargspec(f).args
 		# get the arguments list
 		args=[]
 		# argument is arbitrary in iterable[1]
@@ -136,14 +136,21 @@ def future_generator(iterable, n_jobs, dict_task):
 		for i in range(n_block):
 			# generate the argument list
 			L_args=[]
+			sizebase=0
 			for j in range(len(args)):
 				if type(args[j]) is np.ndarray or isinstance(args[j],types.GeneratorType):
 					L_args.append(args_split[j][i])
+					if sizebase==0:
+						sizebase=len(args_split[j][i])
+					elif sizebase==len(args_split[j][i]):
+						continue
+					else:
+						raise SystemExit('Error: all arrays should be split into equal size')
 				else:
 					L_args.append(args[j])
 			#print("L_args is", L_args)
 			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
-								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
+								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'], sizebase=sizebase)\
 				                  (f, *L_args)
 			L_fut.append(fut)
 		return L_fut
@@ -169,8 +176,9 @@ def future_generator(iterable, n_jobs, dict_task):
 		# operation in each split list
 		L_fut=[]
 		for i in range(len(L_split)):
+			sizebase=len(L_split[i])
 			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
-								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
+								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'], sizebase=sizebase)\
 				                  (lf, L_split[i])
 			L_fut.append(fut)
 		return L_fut

+ 311 - 105
starpupy/src/starpu_task_wrapper.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ * Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -27,74 +27,207 @@
 #include <numpy/arrayobject.h>
 #endif
 
-/*macro*/
-#if defined(Py_DEBUG) || defined(DEBUG)
-extern void _Py_CountReferences(FILE*);
-#define CURIOUS(x) { fprintf(stderr, __FILE__ ":%d ", __LINE__); x; }
-#else
-#define CURIOUS(x)
-#endif
-#define MARKER()        CURIOUS(fprintf(stderr, "\n"))
-#define DESCRIBE(x)     CURIOUS(fprintf(stderr, "  " #x "=%d\n", x))
-#define DESCRIBE_HEX(x) CURIOUS(fprintf(stderr, "  " #x "=%08x\n", x))
-#define COUNTREFS()     CURIOUS(_Py_CountReferences(stderr))
-/*******/
-
 /*********************Functions passed in task_submit wrapper***********************/
 
-static PyObject *asyncio_module; /*python asyncio library*/
+static PyObject *asyncio_module; /*python asyncio module*/
 
-/*structure contains parameters which are passed to starpu_task.cl_arg*/
-struct codelet_args
+static char* starpu_cloudpickle_dumps(PyObject *obj, PyObject **obj_bytes, Py_ssize_t *obj_data_size)
 {
-	PyObject *f; /*the python function passed in*/
-	PyObject *argList; /*argument list of python function passed in*/
-	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
-	PyObject *fut; /*asyncio.Future*/
-	PyObject *lp; /*asyncio.Eventloop*/
-};
+	PyObject *cloudpickle_module = PyImport_ImportModule("cloudpickle");
+	if (cloudpickle_module == NULL)
+	{
+		printf("can't find cloudpickle module\n");
+		exit(1);
+	}
+	PyObject *dumps = PyObject_GetAttrString(cloudpickle_module, "dumps");
+	*obj_bytes= PyObject_CallFunctionObjArgs(dumps, obj, NULL);
 
-/*function passed to starpu_codelet.cpu_func*/
-void codelet_func(void *buffers[], void *cl_arg)
-{
-	struct codelet_args *cst = (struct codelet_args*) cl_arg;
+	char* obj_data;
+	PyBytes_AsStringAndSize(*obj_bytes, &obj_data, obj_data_size);
 
-	/*make sure we own the GIL*/
-	PyGILState_STATE state = PyGILState_Ensure();
+	return obj_data;
+}
 
-	/*verify that the function is a proper callable*/
-	if (!PyCallable_Check(cst->f))
+static PyObject* starpu_cloudpickle_loads(char* pyString, Py_ssize_t pyString_size)
+{
+	PyObject *pickle_module = PyImport_ImportModule("pickle");
+	if (pickle_module == NULL)
 	{
-		printf("py_callback: expected a callable function\n");
+		printf("can't find pickle module\n");
 		exit(1);
 	}
+	PyObject *loads = PyObject_GetAttrString(pickle_module, "loads");
+	PyObject *obj_bytes_str = PyBytes_FromStringAndSize(pyString, pyString_size);
+	PyObject *obj = PyObject_CallFunctionObjArgs(loads, obj_bytes_str, NULL);
 
-	/*check the arguments of python function passed in*/
+	Py_DECREF(obj_bytes_str);
+
+	return obj;
+}
+
+/* prologue_callback_func*/
+void prologue_cb_func(void *cl_arg)
+{
+	PyObject *func_data;
+	Py_ssize_t func_data_size;
+	PyObject *argList;
+	PyObject *fut;
+	PyObject *loop;
+	int sb;
+
+	/*make sure we own the GIL*/
+	PyGILState_STATE state = PyGILState_Ensure();
+
+	struct starpu_task *task = starpu_task_get_current();
+	/*Initialize struct starpu_codelet_unpack_arg_data*/
+	struct starpu_codelet_pack_arg_data data_org;
+	starpu_codelet_unpack_arg_init(&data_org, &task->cl_arg, &task->cl_arg_size);
+
+	/*get func_py char**/
+	starpu_codelet_pick_arg(&data_org, &func_data, &func_data_size);
+	/*get argList*/
+	starpu_codelet_unpack_arg(&data_org, &argList, sizeof(argList));
+	/*get fut*/
+	starpu_codelet_unpack_arg(&data_org, &fut, sizeof(fut));
+	/*get loop*/
+	starpu_codelet_unpack_arg(&data_org, &loop, sizeof(loop));
+	/*get sb*/
+	starpu_codelet_unpack_arg(&data_org, &sb, sizeof(sb));
+
+	/*Repack the data*/
+	/*Initialize struct starpu_codelet_pack_arg_data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_pack_arg_init(&data);
+
+	/*repack func_data*/
+	starpu_codelet_pack_arg(&data, func_data, func_data_size);
+
+	/*check if there is Future in argList, if so, get the Future result*/
 	int i;
-	for(i=0; i < PyTuple_Size(cst->argList); i++)
+	for(i=0; i < PyTuple_Size(argList); i++)
 	{
-		PyObject *obj = PyTuple_GetItem(cst->argList, i);
-		const char *tp = Py_TYPE(obj)->tp_name;
+		PyObject *obj=PyTuple_GetItem(argList, i);
+		const char* tp = Py_TYPE(obj)->tp_name;
 		if(strcmp(tp, "_asyncio.Future") == 0)
 		{
 			/*if one of arguments is Future, get its result*/
 			PyObject *fut_result = PyObject_CallMethod(obj, "result", NULL);
 			/*replace the Future argument to its result*/
-			PyTuple_SetItem(cst->argList, i, fut_result);
+			PyTuple_SetItem(argList, i, fut_result);
 		}
-		/*else if (strcmp(tp, "numpy.ndarray")==0)
-		  {
-		  printf("array is %p\n", obj);
-		  }*/
 	}
 
-	/*call the python function*/
-	PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
-	//const char *tp = Py_TYPE(pRetVal)->tp_name;
-	//printf("return value type is %s\n", tp);
-	cst->rv = pRetVal;
+	/*use cloudpickle to dump dumps argList*/
+	Py_ssize_t arg_data_size;
+	PyObject *arg_bytes;
+	char* arg_data = starpu_cloudpickle_dumps(argList, &arg_bytes, &arg_data_size);
+	starpu_codelet_pack_arg(&data, arg_data, arg_data_size);
+	Py_DECREF(arg_bytes);
+	/*repack fut*/
+	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
+	/*repack loop*/
+	starpu_codelet_pack_arg(&data, &loop, sizeof(loop));
+	/*repack sb*/
+	starpu_codelet_pack_arg(&data, &sb, sizeof(sb));
+	/*finish repacking data and store the struct in cl_arg*/
+	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
+
+	/*restore previous GIL state*/
+	PyGILState_Release(state);
+}
+
+/*function passed to starpu_codelet.cpu_func*/
+void starpupy_codelet_func(void *buffers[], void *cl_arg)
+{
+	char* func_data;
+	Py_ssize_t func_data_size;
+	PyObject *func_py; /*the python function passed in*/
+	char* arg_data;
+	Py_ssize_t arg_data_size;
+	PyObject *argList; /*argument list of python function passed in*/
+
+	/*make sure we own the GIL*/
+	PyGILState_STATE state = PyGILState_Ensure();
+
+	//struct codelet_args *cst = (struct codelet_args*) cl_arg;
+
+	struct starpu_task *task = starpu_task_get_current();
+	/*Initialize struct starpu_codelet_unpack_arg_data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
+
+	/*get func_py char**/
+	starpu_codelet_pick_arg(&data, &func_data, &func_data_size);
+	/*get argList char**/
+	starpu_codelet_pick_arg(&data, &arg_data, &arg_data_size);
+	/*skip fut*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip loop*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip sb*/
+	starpu_codelet_unpack_discard_arg(&data);
+
+	/*use cloudpickle to load function (maybe only function name)*/
+	PyObject *pFunc=starpu_cloudpickle_loads(func_data, func_data_size);
+
+	/* if the function name is passed in*/
+	const char* tp_func = Py_TYPE(pFunc)->tp_name;
+	if (strcmp(tp_func, "str")==0)
+	{
+		/*getattr(sys.modules[__name__], "<functionname>")*/
+		/*get sys.modules*/
+		PyObject *sys_modules = PyImport_GetModuleDict();
+		/*get sys.modules[__name__]*/
+		PyObject *sys_modules_name=PyDict_GetItemString(sys_modules,"__main__");
+		/*get function object*/
+		func_py=PyObject_GetAttr(sys_modules_name,pFunc);
+	}
+	else
+	{
+		func_py=pFunc;
+	}
+
+	/*use cloudpickle to load argList*/
+	argList=starpu_cloudpickle_loads(arg_data, arg_data_size);
 
-	//Py_DECREF(cst->f);
+	/*verify that the function is a proper callable*/
+	if (!PyCallable_Check(func_py))
+	{
+		printf("py_callback: expected a callable function\n");
+		exit(1);
+	}
+
+	/*call the python function get the return value rv*/
+	PyObject *rv = PyObject_CallObject(func_py, argList);
+
+	/*Initialize struct starpu_codelet_pack_arg_data for return value*/
+	struct starpu_codelet_pack_arg_data data_ret;
+	starpu_codelet_pack_arg_init(&data_ret);
+
+	/*if the result is None type, pack NULL without using cloudpickle*/
+	if (rv==Py_None)
+	{
+		char* rv_data=NULL;
+		Py_ssize_t rv_data_size=0;
+		starpu_codelet_pack_arg(&data_ret, &rv_data_size, sizeof(rv_data_size));
+	    starpu_codelet_pack_arg(&data_ret, &rv_data, sizeof(rv_data));
+	}
+	/*else use cloudpickle to dump rv*/
+	else
+	{
+		Py_ssize_t rv_data_size;
+		PyObject *rv_bytes;
+		char* rv_data = starpu_cloudpickle_dumps(rv, &rv_bytes, &rv_data_size);
+		starpu_codelet_pack_arg(&data_ret, &rv_data_size, sizeof(rv_data_size));
+	    starpu_codelet_pack_arg(&data_ret, rv_data, rv_data_size);
+	    Py_DECREF(rv_bytes);
+	}
+
+    /*store the return value in task_>cl_ret*/
+    starpu_codelet_pack_arg_fini(&data_ret, &task->cl_ret, &task->cl_ret_size);
+
+    Py_DECREF(func_py);
+    Py_DECREF(argList);
 
 	/*restore previous GIL state*/
 	PyGILState_Release(state);
@@ -103,24 +236,61 @@ void codelet_func(void *buffers[], void *cl_arg)
 /*function passed to starpu_task.callback_func*/
 void cb_func(void *v)
 {
-	struct starpu_task *task = starpu_task_get_current();
-	struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+	PyObject *fut; /*asyncio.Future*/
+	PyObject *loop; /*asyncio.Eventloop*/
+	char* rv_data;
+	Py_ssize_t rv_data_size;
+	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
 
 	/*make sure we own the GIL*/
 	PyGILState_STATE state = PyGILState_Ensure();
 
+	struct starpu_task *task = starpu_task_get_current();
+
+	/*Initialize struct starpu_codelet_unpack_arg_data data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
+
+	/*skip func_py*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip argList*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*get fut*/
+	starpu_codelet_unpack_arg(&data, &fut, sizeof(fut));
+	/*get loop*/
+	starpu_codelet_unpack_arg(&data, &loop, sizeof(loop));
+	/*skip sb*/
+	starpu_codelet_unpack_discard_arg(&data);
+
+	/*Initialize struct starpu_codelet_unpack_arg_data data*/
+	struct starpu_codelet_pack_arg_data data_ret;
+	starpu_codelet_unpack_arg_init(&data_ret, &task->cl_ret, &task->cl_ret_size);
+	/*get rv_data_size*/
+	starpu_codelet_unpack_arg(&data_ret, &rv_data_size, sizeof(rv_data_size));
+
+	/*if the rv_data_size is 0, the result is None type*/
+	if (rv_data_size==0)
+	{
+		starpu_codelet_unpack_discard_arg(&data_ret);
+		rv=Py_None;
+	}
+	/*else use cloudpickle to load rv*/
+	else
+	{
+		starpu_codelet_pick_arg(&data_ret, &rv_data, &rv_data_size);
+		rv=starpu_cloudpickle_loads(rv_data, rv_data_size);
+	}
+
 	/*set the Future result and mark the Future as done*/
-	PyObject *set_result = PyObject_GetAttrString(cst->fut, "set_result");
-	PyObject *loop_callback = PyObject_CallMethod(cst->lp, "call_soon_threadsafe", "(O,O)", set_result, cst->rv);
+	PyObject *set_result = PyObject_GetAttrString(fut, "set_result");
+	PyObject *loop_callback = PyObject_CallMethod(loop, "call_soon_threadsafe", "(O,O)", set_result, rv);
 
 	Py_DECREF(loop_callback);
 	Py_DECREF(set_result);
-	Py_DECREF(cst->rv);
-	Py_DECREF(cst->fut);
-	Py_DECREF(cst->lp);
-	Py_DECREF(cst->argList);
+	Py_DECREF(rv);
+	Py_DECREF(fut);
+	Py_DECREF(loop);
 
-	//Py_DECREF(perfmodel);
 	struct starpu_codelet *func_cl=(struct starpu_codelet *) task->cl;
 	if (func_cl->model != NULL)
 	{
@@ -160,29 +330,26 @@ static PyObject *PyTask_FromTask(struct starpu_task *task)
 /***********************************************************************************/
 static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 {
-	int n=0;
-	struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
-
-	/*get the result of function*/
-	PyObject *obj=cst->rv;
-	/*get the length of result*/
-	const char *tp = Py_TYPE(obj)->tp_name;
-#ifdef STARPU_PYTHON_HAVE_NUMPY
-	/*if the result is a numpy array*/
-	if (strcmp(tp, "numpy.ndarray")==0)
-		n = PyArray_SIZE(obj);
-	else
-#endif
-	/*if the result is a list*/
-	if (strcmp(tp, "list")==0)
-		n = PyList_Size(obj);
-	/*else error*/
-	else
-	{
-		printf("starpu_perfmodel::size_base: the type of function result is unrecognized\n");
-		exit(1);
-	}
-	return n;
+	int sb;
+
+	/*Initialize struct starpu_codelet_unpack_arg_data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
+
+	/*skip func_py*/
+	//starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip argList*/
+	//starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip fut*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip loop*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*get sb*/
+	starpu_codelet_unpack_arg(&data, &sb, sizeof(sb));
+
+	return sb;
 }
 
 static void del_Perf(PyObject *obj)
@@ -261,6 +428,9 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 
 	/*first argument in args is always the python function passed in*/
 	PyObject *func_py = PyTuple_GetItem(args, 0);
+
+	Py_INCREF(fut);
+	Py_INCREF(loop);
 	Py_INCREF(func_py);
 
 	/*allocate a task structure and initialize it with default values*/
@@ -292,8 +462,8 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	struct starpu_codelet *func_cl=(struct starpu_codelet*)malloc(sizeof(struct starpu_codelet));
 	/*initialize func_cl with default values*/
 	starpu_codelet_init(func_cl);
-	func_cl->cpu_funcs[0]=&codelet_func;
-	func_cl->cpu_funcs_name[0]="codelet_func";
+	func_cl->cpu_funcs[0]=&starpupy_codelet_func;
+	func_cl->cpu_funcs_name[0]="starpupy_codelet_func";
 
 	/*check whether the option perfmodel is None*/
 	PyObject *dict_option = PyTuple_GetItem(args, PyTuple_Size(args)-1);/*the last argument is the option dictionary*/
@@ -307,45 +477,53 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 		Py_INCREF(perfmodel);
 	}
 
-	/*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
-	struct codelet_args *cst = (struct codelet_args*)malloc(sizeof(struct codelet_args));
-	cst->f = func_py;
-	cst->fut = fut;
-	cst->lp = loop;
+	/*Initialize struct starpu_codelet_pack_arg_data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_pack_arg_init(&data);
 
-	Py_INCREF(fut);
-	Py_INCREF(loop);
+	/*argument list of python function passed in*/
+	PyObject *argList;
 
 	/*pass args in argList*/
 	if (PyTuple_Size(args)==2)/*function no arguments*/
-		cst->argList = PyTuple_New(0);
+		argList = PyTuple_New(0);
 	else
-	{/*function has arguments*/
-		cst->argList = PyTuple_New(PyTuple_Size(args)-2);
+	{
+		/*function has arguments*/
+		argList = PyTuple_New(PyTuple_Size(args)-2);
 		int i;
 		for(i=0; i < PyTuple_Size(args)-2; i++)
 		{
 			PyObject *tmp=PyTuple_GetItem(args, i+1);
-			PyTuple_SetItem(cst->argList, i, tmp);
-			Py_INCREF(PyTuple_GetItem(cst->argList, i));
+			PyTuple_SetItem(argList, i, tmp);
+			Py_INCREF(PyTuple_GetItem(argList, i));
 		}
 	}
 
+	/*use cloudpickle to dump func_py*/
+	Py_ssize_t func_data_size;
+	PyObject *func_bytes;
+	char* func_data = starpu_cloudpickle_dumps(func_py, &func_bytes, &func_data_size);
+    starpu_codelet_pack_arg(&data, func_data, func_data_size);
+    Py_DECREF(func_bytes);
+    /*pack argList*/
+	starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
+	/*pack fut*/
+	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
+	/*pack loop*/
+	starpu_codelet_pack_arg(&data, &loop, sizeof(loop));
+
 	task->cl=func_cl;
-	task->cl_arg=cst;
 
-	/*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None*/
+	/*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None, sizebase=0*/
 	/*const char * name*/
 	PyObject *PyName = PyDict_GetItemString(dict_option, "name");
-	const char *name_type = Py_TYPE(PyName)->tp_name;
-	if (strcmp(name_type, "NoneType")!=0)
+	if (PyName!=Py_None)
 	{
-		PyObject *pStrObj = PyUnicode_AsUTF8String(PyName);
-		char* name_str = PyBytes_AsString(pStrObj);
+		char* name_str = PyUnicode_AsUTF8(PyName);
 		char* name = strdup(name_str);
 		//printf("name is %s\n", name);
 		task->name=name;
-		Py_DECREF(pStrObj);
 	}
 
 	/*unsigned synchronous:1*/
@@ -362,8 +540,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 
 	/*unsigned color*/
 	PyObject *PyColor = PyDict_GetItemString(dict_option, "color");
-	const char *color_type = Py_TYPE(PyColor)->tp_name;
-	if (strcmp(color_type, "NoneType")!=0)
+	if (PyColor!=Py_None)
 	{
 		unsigned color=PyLong_AsUnsignedLong(PyColor);
 		//printf("color is %u\n", color);
@@ -372,14 +549,24 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 
 	/*double flops*/
 	PyObject *PyFlops = PyDict_GetItemString(dict_option, "flops");
-	const char *flops_type = Py_TYPE(PyFlops)->tp_name;
-	if (strcmp(flops_type, "NoneType")!=0)
+	if (PyFlops!=Py_None)
 	{
 		double flops=PyFloat_AsDouble(PyFlops);
-		//printf("flops is %f\n", flop);
+		//printf("flops is %f\n", flops);
 		task->flops=flops;
 	}
 
+	/*int sizebase*/
+	PyObject *PySB = PyDict_GetItemString(dict_option, "sizebase");
+	int sb=PyLong_AsLong(PySB);
+	//printf("pack sizebase is %d\n", sb);
+	/*pack sb*/
+	starpu_codelet_pack_arg(&data, &sb, sizeof(sb));
+
+	/*finish packing data and store the struct in cl_arg*/
+	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
+
+	task->prologue_callback_func=&prologue_cb_func;
 	task->callback_func=&cb_func;
 
 	/*call starpu_task_submit method*/
@@ -474,6 +661,19 @@ static PyObject* starpu_task_nsubmitted_wrapper(PyObject *self, PyObject *args)
 	/*Return the number of submitted tasks which have not completed yet */
 	return Py_BuildValue("i", num_task);
 }
+
+/*wrapper shutdown method*/
+static PyObject* starpu_shutdown_wrapper(PyObject *self, PyObject *args)
+{
+	/*call starpu_shutdown method*/
+	Py_BEGIN_ALLOW_THREADS
+	starpu_shutdown();
+	Py_END_ALLOW_THREADS
+
+	/*return type is void*/
+	Py_INCREF(Py_None);
+	return Py_None;
+}
 /***********************************************************************************/
 
 /***************The module’s method table and initialization function**************/
@@ -491,6 +691,7 @@ static PyMethodDef starpupyMethods[] =
 	{"sched_get_min_priority", starpu_sched_get_min_priority_wrapper, METH_VARARGS, "get the number of min priority"}, /*get the number of min priority*/
 	{"sched_get_max_priority", starpu_sched_get_max_priority_wrapper, METH_VARARGS, "get the number of max priority"}, /*get the number of max priority*/
 	{"task_nsubmitted", starpu_task_nsubmitted_wrapper, METH_VARARGS, "get the number of submitted tasks which have not completed yet"}, /*get the number of submitted tasks which have not completed yet*/
+	{"shutdown", starpu_shutdown_wrapper, METH_VARARGS, "shutdown starpu"}, /*shutdown starpu*/
 	{NULL, NULL}
 };
 
@@ -521,11 +722,16 @@ PyMODINIT_FUNC
 PyInit_starpupy(void)
 {
 	PyEval_InitThreads();
+	//PyThreadState* st = PyEval_SaveThread();
+	Py_BEGIN_ALLOW_THREADS
 	/*starpu initialization*/
 	int ret = starpu_init(NULL);
 	assert(ret==0);
+	Py_END_ALLOW_THREADS
+
 	/*python asysncio import*/
 	asyncio_module = PyImport_ImportModule("asyncio");
+
 #ifdef STARPU_PYTHON_HAVE_NUMPY
 	/*numpy import array*/
 	import_array();