Kaynağa Gözat

starpupy: add the case to calculate size_base for numpy array when generating perfmodel

HE Kun 4 yıl önce
ebeveyn
işleme
8931d07e7a

+ 11 - 6
starpupy/examples/starpu_py_parallel.py

@@ -155,7 +155,7 @@ A=np.arange(N)
 b=np.arange(N, 2*N, 1)
 start_exec3=time.time()
 start_cpu3=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
 end_exec3=time.time()
 end_cpu3=time.process_time()
 print("the program execution time is", end_exec3-start_exec3)
@@ -177,7 +177,7 @@ a=np.arange(N)
 b=np.arange(N, 2*N, 1)
 start_exec5=time.time()
 start_cpu5=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
 end_exec5=time.time()
 end_cpu5=time.process_time()
 print("the program execution time is", end_exec5-start_exec5)
@@ -189,7 +189,7 @@ B=np.arange(N, 2*N, 1)
 print("The input arrays are A", A, "B", B)
 start_exec6=time.time()
 start_cpu6=time.process_time()
-starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(A, B))
+starpu.joblib.Parallel(mode="normal", n_jobs=-1, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(A, B))
 end_exec6=time.time()
 end_cpu6=time.process_time()
 print("the program execution time is", end_exec6-start_exec6)
@@ -266,7 +266,7 @@ async def main():
 	print("--(scal_arr)((i for i in b), A)")
 	A=np.arange(N)
 	b=np.arange(N, 2*N, 1)
-	fut3=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="scal_list")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
+	fut3=starpu.joblib.Parallel(mode="future", n_jobs=3, perfmodel="scal_arr")(starpu.joblib.delayed(scal_arr)((i for i in b), A))
 	res3=await fut3
 	#print(res3)
 
@@ -280,7 +280,7 @@ async def main():
 	print("--(multi_2arr)((i for i in a), (j for j in b))")
 	a=np.arange(N)
 	b=np.arange(N, 2*N, 1)
-	fut5=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
+	fut5=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)((i for i in a), (j for j in b)))
 	res5=await fut5
 	#print(res5)
 
@@ -288,7 +288,7 @@ async def main():
 	A=np.arange(N)
 	B=np.arange(N, 2*N, 1)
 	print("The input arrays are A", A, "B", B)
-	fut6=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2list")(starpu.joblib.delayed(multi_2arr)(b=B, a=A))
+	fut6=starpu.joblib.Parallel(mode="future", n_jobs=2, perfmodel="multi_2arr")(starpu.joblib.delayed(multi_2arr)(b=B, a=A))
 	res6=await fut6
 	print("The return arrays are A", A, "B", B)
 
@@ -323,4 +323,9 @@ asyncio.run(main())
 
 starpu.perfmodel_plot(perfmodel="sqrt",view=False)
 starpu.perfmodel_plot(perfmodel="multi",view=False)
+starpu.perfmodel_plot(perfmodel="scal_arr",view=False)
+starpu.perfmodel_plot(perfmodel="multi_list",view=False)
+starpu.perfmodel_plot(perfmodel="multi_2arr")
+starpu.perfmodel_plot(perfmodel="scal")
+starpu.perfmodel_plot(perfmodel="add_scal")
 starpu.perfmodel_plot(perfmodel="func",view=False)

+ 1 - 1
starpupy/src/joblib.py

@@ -117,7 +117,7 @@ def future_generator(iterable, n_jobs, dict_task):
 					L_args.append(args[j])
 			#print("L_args is", L_args)
 			fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
-								   color=dict_task['color'], flops=dict_task['flops'])\
+								   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
 				                  (f, *L_args)
 			L_fut.append(fut)
 		return L_fut

+ 37 - 13
starpupy/src/starpu_task_wrapper.c

@@ -85,8 +85,8 @@ void codelet_func(void *buffers[], void *cl_arg)
 
 	/*call the python function*/
 	PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
-  //const char *tp = Py_TYPE(pRetVal)->tp_name;
-  //printf("return value type is %s\n", tp);
+    //const char *tp = Py_TYPE(pRetVal)->tp_name;
+    //printf("return value type is %s\n", tp);
 	cst->rv = pRetVal;
 
 	//Py_DECREF(cst->f);
@@ -113,7 +113,7 @@ void cb_func(void *v)
 	Py_DECREF(cst->rv);
 	Py_DECREF(cst->fut);
 	Py_DECREF(cst->lp);
-  Py_DECREF(cst->argList);
+    Py_DECREF(cst->argList);
 
 	//Py_DECREF(perfmodel);
 	struct starpu_codelet *func_cl=(struct starpu_codelet *) task->cl;
@@ -130,10 +130,10 @@ void cb_func(void *v)
 	/*deallocate task*/
 	free(task->cl);
 	free(task->cl_arg);
-  if (task->name!=NULL)
-  {
-    free(task->name);
-  }
+    if (task->name!=NULL)
+    {
+      free(task->name);
+    }
 }
 
 /***********************************************************************************/
@@ -159,12 +159,25 @@ static PyObject *PyTask_FromTask(struct starpu_task *task)
 /***********************************************************************************/
 static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 {
+	int n=0;
 	struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
 
-	PyObject *obj=PyTuple_GetItem(cst->argList, 0);
-	/*get the length of arguments*/
-	int n = PyList_Size(obj);
-
+	/*get the result of function*/
+	PyObject *obj=cst->rv;
+	/*get the length of result*/
+	const char *tp = Py_TYPE(obj)->tp_name;
+	/*if the result is a numpy array*/ 
+	if (strcmp(tp, "numpy.ndarray")==0)
+		n = PyArray_SIZE(obj);
+	/*if the result is a list*/
+	else if (strcmp(tp, "list")==0)
+		n = PyList_Size(obj);
+	/*else error*/
+	else
+	{
+		printf("starpu_perfmodel::size_base: the type of function result is unrecognized\n");
+		exit(1);
+	}
 	return n;
 }
 
@@ -277,8 +290,8 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 	func_cl->cpu_funcs[0]=&codelet_func;
 
 	/*check whether the option perfmodel is None*/
-  PyObject *dict_option = PyTuple_GetItem(args, PyTuple_Size(args)-1);/*the last argument is the option dictionary*/
-  PyObject *perfmodel = PyDict_GetItemString(dict_option, "perfmodel");
+    PyObject *dict_option = PyTuple_GetItem(args, PyTuple_Size(args)-1);/*the last argument is the option dictionary*/
+    PyObject *perfmodel = PyDict_GetItemString(dict_option, "perfmodel");
 	const char *tp_perf = Py_TYPE(perfmodel)->tp_name;
 	if (strcmp(tp_perf, "PyCapsule")==0)
 	{
@@ -442,6 +455,16 @@ static PyObject* starpu_sched_get_max_priority_wrapper(PyObject *self, PyObject
   /*return type is int*/
   return Py_BuildValue("i", max_prio);
 }
+
+/*wrapper get the number of no completed submitted tasks method*/
+static PyObject* starpu_task_nsubmitted_wrapper(PyObject *self, PyObject *args)
+{
+  /*call starpu_task_nsubmitted*/
+  int num_task=starpu_task_nsubmitted();
+
+  /*Return the number of submitted tasks which have not completed yet */
+  return Py_BuildValue("i", num_task);
+}
 /***********************************************************************************/
 
 /***************The module’s method table and initialization function**************/
@@ -458,6 +481,7 @@ static PyMethodDef starpupyMethods[] =
   {"save_history_based_model", starpu_save_history_based_model_wrapper, METH_VARARGS, "save the performance model"}, /*save the performance model*/
   {"sched_get_min_priority", starpu_sched_get_min_priority_wrapper, METH_VARARGS, "get the number of min priority"}, /*get the number of min priority*/
   {"sched_get_max_priority", starpu_sched_get_max_priority_wrapper, METH_VARARGS, "get the number of max priority"}, /*get the number of max priority*/
+  {"task_nsubmitted", starpu_task_nsubmitted_wrapper, METH_VARARGS, "get the number of submitted tasks which have not completed yet"}, /*get the number of submitted tasks which have not completed yet*/
   {NULL, NULL}
 };