Forráskód Böngészése

starpupy: get the return value of size_base directly from Python function and set it as an option of task_submit

HE Kun 4 éve
szülő
commit
9dfa483688
3 módosított fájl, 56 hozzáadás és 42 törlés
  1. 3 3
      starpupy/src/intermedia.py
  2. 11 2
      starpupy/src/joblib.py
  3. 42 37
      starpupy/src/starpu_task_wrapper.c

+ 3 - 3
starpupy/src/intermedia.py

@@ -41,12 +41,12 @@ def dict_perf_generator(perfsymbol):
 	return p
 
 #add options in function task_submit
-def task_submit(*, name=None, synchronous=0, priority=0, color=None, flops=None, perfmodel=None):
+def task_submit(*, name=None, synchronous=0, priority=0, color=None, flops=None, perfmodel=None, sizebase=0):
 	if perfmodel==None:
-		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': None}
+		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': None, 'sizebase': sizebase}
 	else:
 		p=dict_perf_generator(perfmodel)
-		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': p.get_struct()}
+		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': p.get_struct(), 'sizebase': sizebase}
 
 	def call_task_submit(f, *args):
 		fut=starpupy._task_submit(f, *args, dict_option)

+ 11 - 2
starpupy/src/joblib.py

@@ -136,14 +136,22 @@ def future_generator(iterable, n_jobs, dict_task):
 		for i in range(n_block):
 			# generate the argument list
 			L_args=[]
+			sizebase=0
 			for j in range(len(args)):
 				if type(args[j]) is np.ndarray or isinstance(args[j],types.GeneratorType):
 					L_args.append(args_split[j][i])
+					if sizebase==0:
+						sizebase=len(args_split[j][i])
+					else:
+						if sizebase==len(args_split[j][i]):
+							continue
+						else:
+							raise SystemExit('Error: all arrays should be split into equal size')
 				else:
 					L_args.append(args[j])
 			#print("L_args is", L_args)
 			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
-								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
+								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'], sizebase=sizebase)\
 				                  (f, *L_args)
 			L_fut.append(fut)
 		return L_fut
@@ -169,8 +177,9 @@ def future_generator(iterable, n_jobs, dict_task):
 		# operation in each split list
 		L_fut=[]
 		for i in range(len(L_split)):
+			sizebase=len(L_split[i])
 			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
-								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
+								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'], sizebase=sizebase)\
 				                  (lf, L_split[i])
 			L_fut.append(fut)
 		return L_fut

+ 42 - 37
starpupy/src/starpu_task_wrapper.c

@@ -60,19 +60,23 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 {
 	PyObject *func_py; /*the python function passed in*/
 	PyObject *argList; /*argument list of python function passed in*/
-	PyObject *fut; /*asyncio.Future*/
-	PyObject *loop; /*asyncio.Eventloop*/
+
 	//struct codelet_args *cst = (struct codelet_args*) cl_arg;
 
 	struct starpu_task *task = starpu_task_get_current();
+	/*Initialize struct starpu_codelet_unpack_arg_data*/
 	struct starpu_codelet_pack_arg_data data;
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
 
 	starpu_codelet_unpack_arg(&data, &func_py);
-
 	starpu_codelet_unpack_arg(&data, &argList);
-	starpu_codelet_unpack_arg(&data, &fut);
-	starpu_codelet_unpack_arg(&data, &loop);
+	/*skip fut*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip loop*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip sb*/
+	starpu_codelet_unpack_discard_arg(&data);
+
 	/*make sure we own the GIL*/
 	PyGILState_STATE state = PyGILState_Ensure();
 
@@ -125,12 +129,18 @@ void cb_func(void *v)
 
 	struct starpu_task *task = starpu_task_get_current();
 	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
+	/*Initialize struct starpu_codelet_unpack_arg_data*/
 	struct starpu_codelet_pack_arg_data data;
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
+
+	/*skip func_py*/
 	starpu_codelet_unpack_discard_arg(&data);
+	/*skip argList*/
 	starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_arg(&data, &fut);
 	starpu_codelet_unpack_arg(&data, &loop);
+	/*skip sb*/
+	starpu_codelet_unpack_discard_arg(&data);
 	starpu_codelet_unpack_arg(&data, &rv);
 
 	/*make sure we own the GIL*/
@@ -187,38 +197,26 @@ static PyObject *PyTask_FromTask(struct starpu_task *task)
 /***********************************************************************************/
 static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 {
-	int n=0;
+	int sb;
 	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
-	PyObject *func_py; /*the python function passed in*/
-	PyObject *argList; /*argument list of python function passed in*/
-	PyObject *fut; /*asyncio.Future*/
-	PyObject *loop; /*asyncio.Eventloop*/
-	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
+	/*Initialize struct starpu_codelet_unpack_arg_data*/
+	struct starpu_codelet_pack_arg_data data;
+	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
 
-	struct starpu_task *task_submit = starpu_task_get_current();
-	//struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
-	starpu_codelet_unpack_args(task_submit->cl_arg, &func_py, &argList, &fut, &loop, &rv);
+	/*skip func_py*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip argList*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip fut*/
+	starpu_codelet_unpack_discard_arg(&data);
+	/*skip loop*/
+	starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_arg(&data, &sb);
+	/*skip rv*/
+	starpu_codelet_unpack_discard_arg(&data);
+	//starpu_codelet_unpack_args(task_submit->cl_arg, &func_py, &argList, &fut, &loop, &sb, &rv);
 
-	/*get the result of function*/
-	PyObject *obj=rv;
-	/*get the length of result*/
-	const char *tp = Py_TYPE(obj)->tp_name;
-#ifdef STARPU_PYTHON_HAVE_NUMPY
-	/*if the result is a numpy array*/
-	if (strcmp(tp, "numpy.ndarray")==0)
-		n = PyArray_SIZE(obj);
-	else
-#endif
-	/*if the result is a list*/
-	if (strcmp(tp, "list")==0)
-		n = PyList_Size(obj);
-	/*else error*/
-	else
-	{
-		printf("starpu_perfmodel::size_base: the type of function result is unrecognized\n");
-		exit(1);
-	}
-	return n;
+	return sb;
 }
 
 static void del_Perf(PyObject *obj)
@@ -373,7 +371,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 			Py_INCREF(PyTuple_GetItem(argList, i));
 		}
 	}
-	
+
 	starpu_codelet_pack_arg(&data, &func_py, sizeof(func_py));
 	starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
 	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
@@ -381,9 +379,8 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 
 	task->cl=func_cl;
 	//task->cl_arg=cst;
-	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
 
-	/*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None*/
+	/*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None, sizebase=0*/
 	/*const char * name*/
 	PyObject *PyName = PyDict_GetItemString(dict_option, "name");
 	const char *name_type = Py_TYPE(PyName)->tp_name;
@@ -429,6 +426,14 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 		task->flops=flops;
 	}
 
+	/*int sizebase*/
+	PyObject *PySB = PyDict_GetItemString(dict_option, "sizebase");
+	int sb=PyLong_AsLong(PySB);
+	//printf("pack sizebase is %d\n", sb);
+	starpu_codelet_pack_arg(&data, &sb, sizeof(sb));
+
+	starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
+
 	task->callback_func=&cb_func;
 
 	/*call starpu_task_submit method*/