Przeglądaj źródła

starpupy: add optional field in the function task_submit

(cherry picked from commit 81cff9d9276dfbe01f64a2001385544954fe2573)
HE Kun 4 lata temu
rodzic — commit 19a93f5423

Plik diff jest za duży
+ 54 - 11
doc/doxygen/chapters/400_python.doxy


+ 1 - 0
starpupy/src/starpu/__init__.py

@@ -18,3 +18,4 @@
 from.starpupy import *
 from .delay import *
 from . import joblib
+from .intermedia import *

+ 9 - 3
starpupy/src/starpu/delay.py

@@ -14,10 +14,16 @@
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
 from starpu import starpupy
+import starpu
 import asyncio
+from functools import partial
 
-def delayed(f):
-	def submit(*args,**kwargs):
-		fut = starpupy.task_submit(f, *args,**kwargs)
+def delayed(f=None,*, name=None, synchronous=0, priority=0, color=None, flops=None, perfmodel=None):
+	# add options of task_submit
+	if f is None:
+		return partial(delayed, name=name, synchronous=synchronous, priority=priority, color=color, flops=flops, perfmodel=perfmodel)
+	def submit(*args):
+		fut = starpu.task_submit(name=name, synchronous=synchronous, priority=priority,\
+								 color=color, flops=flops, perfmodel=perfmodel)(f, *args)
 		return fut
 	return submit

+ 62 - 0
starpupy/src/starpu/intermedia.py

@@ -0,0 +1,62 @@
+# StarPU --- Runtime system for heterogeneous multicore architectures.
+#
+# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+#
+# StarPU is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or (at
+# your option) any later version.
+#
+# StarPU is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the GNU Lesser General Public License in COPYING.LGPL for more details.
+#
+
+from starpu import starpupy
+import os
+
+#class perfmodel
+class Perfmodel(object):
+	def __init__(self, symbol):
+		self.symbol=symbol
+		self.pstruct=starpupy.init_perfmodel(self.symbol)
+
+	def get_struct(self):
+		return self.pstruct
+
+	def __del__(self):
+	#def free_struct(self):
+		starpupy.free_perfmodel(self.pstruct)
+
+# generate the dictionary which contains the perfmodel symbol and its struct pointer
+dict_perf={}
+def dict_perf_generator(perfsymbol):
+	if dict_perf.get(perfsymbol)==None:
+		p=Perfmodel(perfsymbol)
+		dict_perf[perfsymbol]=p
+	else:
+		p=dict_perf[perfsymbol]
+	return p
+
+#add options in function task_submit
+def task_submit(*, name=None, synchronous=0, priority=0, color=None, flops=None, perfmodel=None):
+	if perfmodel==None:
+		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': None}
+	else:
+		p=dict_perf_generator(perfmodel)
+		dict_option={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': p.get_struct()}
+
+	def call_task_submit(f, *args):
+		fut=starpupy._task_submit(f, *args, dict_option)
+		return fut
+	return call_task_submit
+
+# dump performance model and show the plot
+def perfmodel_plot(perfmodel):
+	p=dict_perf[perfmodel]
+	starpupy.save_history_based_model(p.get_struct())
+	os.system('starpu_perfmodel_plot -s "' + perfmodel +'"')
+	os.system('gnuplot starpu_'+perfmodel+'.gp')
+	os.system('gv starpu_'+perfmodel+'.eps')

+ 12 - 43
starpupy/src/starpu/joblib.py

@@ -14,29 +14,14 @@
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
 from starpu import starpupy
+import starpu
 import asyncio
 import math
-import os
-import pickle
-import json
 import functools
 
 # get the number of CPUs controlled by StarPU
 n_cpus=starpupy.cpu_worker_get_count()
 
-#class perfmodel
-class Perfmodel(object):
-	def __init__(self, symbol):
-		self.symbol=symbol
-		self.pstruct=starpupy.init_perfmodel(self.symbol)
-
-	def get_struct(self):
-		return self.pstruct
-
-	def __del__(self):
-	#def free_struct(self):
-		starpupy.free_perfmodel(self.pstruct)
-
 # split a list ls into n_block numbers of sub-lists 
 def partition(ls, n_block):
 	if len(ls)>=n_block:
@@ -55,17 +40,7 @@ def partition(ls, n_block):
 		L=[ls[i:i+1] for i in range (len(ls))]
 	return L
 
-# generate the dictionary which contains the perfmodel symbol and its struct pointer
-dict_perf={}
-def dict_perf_generator(perfsymbol):
-	if dict_perf.get(perfsymbol)==None:
-		p=Perfmodel(perfsymbol)
-		dict_perf[perfsymbol]=p
-	else:
-		p=dict_perf[perfsymbol]
-	return p
-
-def future_generator(g, n_jobs, perfsymbol):
+def future_generator(g, n_jobs, dict_task):
 	# g is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
 	L=list(g)
 	# generate a list of function according to g
@@ -91,24 +66,24 @@ def future_generator(g, n_jobs, perfsymbol):
 	# operation in each split list
 	L_fut=[]
 	for i in range(len(L_split)):
-		if perfsymbol==None:
-			fut=starpupy.task_submit(lf, L_split[i])
-			L_fut.append(fut)
-		else:
-			p=dict_perf_generator(perfsymbol)
-			fut=starpupy.task_submit(lf, L_split[i], p.get_struct())
-			L_fut.append(fut)
+		fut=starpu.task_submit(name=dict_task['name'], synchronous=dict_task['synchronous'], priority=dict_task['priority'],\
+							   color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'])\
+			                  (lf, L_split[i])
+		L_fut.append(fut)
 	return L_fut
 
 def parallel(*, mode="normal", n_jobs=1, perfmodel=None, end_msg=None,\
+			 name=None, synchronous=0, priority=0, color=None, flops=None,\
 	         backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs',\
 	         batch_size='auto', temp_folder=None, max_nbytes='1M',\
 	         mmap_mode='r', prefer=None, require=None):
+	#generate the dictionary of task_submit
+	dict_task={'name': name, 'synchronous': synchronous, 'priority': priority, 'color': color, 'flops': flops, 'perfmodel': perfmodel}
 	# the mode normal, user can call the function directly without using async
 	if mode=="normal":
 		def parallel_normal(g):
 			async def asy_main():
-				L_fut=future_generator(g, n_jobs, perfmodel)
+				L_fut=future_generator(g, n_jobs, dict_task)
 				res=[]
 				for i in range(len(L_fut)):
 					L_res=await L_fut[i]
@@ -121,7 +96,7 @@ def parallel(*, mode="normal", n_jobs=1, perfmodel=None, end_msg=None,\
 	# the mode future, user needs to use asyncio module and await the Future result in main function
 	elif mode=="future":
 		def parallel_future(g):
-			L_fut=future_generator(g, n_jobs, perfmodel)
+			L_fut=future_generator(g, n_jobs, dict_task)
 			fut=asyncio.gather(*L_fut)
 			if end_msg==None:
 				return fut
@@ -138,10 +113,4 @@ def delayed(f):
 
 
 ######################################################################
-# dump performance model
-def perfmodel_plot(perfmodel):
-	p=dict_perf[perfmodel]
-	starpupy.save_history_based_model(p.get_struct())
-	os.system('starpu_perfmodel_plot -s "' + perfmodel +'"')
-	os.system('gnuplot starpu_'+perfmodel+'.gp')
-	os.system('gv starpu_'+perfmodel+'.eps')
+

+ 74 - 15
starpupy/src/starpu/starpu_task_wrapper.c

@@ -125,6 +125,9 @@ void cb_func(void *v){
     /*deallocate task*/
     free(task->cl);
 	  free(task->cl_arg);
+    if (task->name!=NULL){
+      free(task->name);
+    }
 
 }
 
@@ -243,7 +246,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     /*set one of fut attribute to the task pointer*/
     PyObject_SetAttrString(fut, "starpu_task", PyTask);
     /*check the arguments of python function passed in*/
-    for (int i=1; i < PyTuple_Size(args); i++){
+    for (int i=1; i < PyTuple_Size(args)-1; i++){
       PyObject* obj=PyTuple_GetItem(args, i);
       const char* tp = Py_TYPE(obj)->tp_name;
       if(strcmp(tp, "_asyncio.Future") == 0){
@@ -262,8 +265,9 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     starpu_codelet_init(func_cl);
     func_cl->cpu_func=&codelet_func;
     
-    /*check whether the last argument in args is the perfmodel*/
-    PyObject* perfmodel=PyTuple_GetItem(args, PyTuple_Size(args)-1);
+    /*check whether the option perfmodel is None*/
+    PyObject* dict_option = PyTuple_GetItem(args, PyTuple_Size(args)-1);/*the last argument is the option dictionary*/
+    PyObject* perfmodel = PyDict_GetItemString(dict_option, "perfmodel");
     const char* tp_perf = Py_TYPE(perfmodel)->tp_name;
     if (strcmp(tp_perf, "PyCapsule")==0){
       /*PyObject*->struct perfmodel**/
@@ -271,7 +275,6 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
       func_cl->model=perf;
       Py_INCREF(perfmodel);
     }
-    
 
     /*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
     codelet_st *cst = (codelet_st*)malloc(sizeof(codelet_st));
@@ -283,9 +286,9 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     Py_INCREF(loop);
 
     /*pass args in argList*/
-    if (PyTuple_Size(args)==1 || (PyTuple_Size(args)==2 && strcmp(tp_perf, "PyCapsule")==0))/*function no arguments*/
+    if (PyTuple_Size(args)==2)/*function no arguments*/
       cst->argList = PyTuple_New(0);
-    else if(PyTuple_Size(args)>2 && strcmp(tp_perf, "PyCapsule")==0){/*function has arguments and the last argument in args is the perfmodel*/
+    else{/*function has arguments*/
       cst->argList = PyTuple_New(PyTuple_Size(args)-2);
       for (int i=0; i < PyTuple_Size(args)-2; i++){
         PyObject* tmp=PyTuple_GetItem(args, i+1);
@@ -293,17 +296,52 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
         Py_INCREF(PyTuple_GetItem(cst->argList, i));
       }
     }
-    else{/*function has arguments and no perfmodel*/
-      cst->argList = PyTuple_New(PyTuple_Size(args)-1);
-      for (int i=0; i < PyTuple_Size(args)-1; i++){
-        PyObject* tmp=PyTuple_GetItem(args, i+1);
-        PyTuple_SetItem(cst->argList, i, tmp);
-        Py_INCREF(PyTuple_GetItem(cst->argList, i));
-      }
-    }
 
     task->cl=func_cl;
     task->cl_arg=cst;
+    /*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None*/
+    /*const char * name*/
+    PyObject* PyName = PyDict_GetItemString(dict_option, "name");
+    const char* name_type = Py_TYPE(PyName)->tp_name;
+    if (strcmp(name_type, "NoneType")!=0){
+      PyObject* pStrObj = PyUnicode_AsUTF8String(PyName);
+      char* name_str = PyBytes_AsString(pStrObj);
+      char* name = strdup(name_str);
+      //printf("name is %s\n", name);
+      task->name=name;
+      Py_DECREF(pStrObj);
+    }
+
+    /*unsigned synchronous:1*/
+    PyObject* PySync = PyDict_GetItemString(dict_option, "synchronous");
+    unsigned sync=PyLong_AsUnsignedLong(PySync);
+    //printf("sync is %u\n", sync);
+    task->synchronous=sync;
+
+    /*int priority*/
+    PyObject* PyPrio = PyDict_GetItemString(dict_option, "priority");
+    int prio=PyLong_AsLong(PyPrio);
+    //printf("prio is %d\n", prio);
+    task->priority=prio;
+
+    /*unsigned color*/
+    PyObject* PyColor = PyDict_GetItemString(dict_option, "color");
+    const char* color_type = Py_TYPE(PyColor)->tp_name;
+    if (strcmp(color_type, "NoneType")!=0){
+      unsigned color=PyLong_AsUnsignedLong(PyColor);
+      //printf("color is %u\n", color);
+      task->color=color;
+    }
+
+    /*double flops*/
+    PyObject* PyFlops = PyDict_GetItemString(dict_option, "flops");
+    const char* flops_type = Py_TYPE(PyFlops)->tp_name;
+    if (strcmp(flops_type, "NoneType")!=0){
+      double flop=PyFloat_AsDouble(PyFlops);
+      //printf("flops is %f\n", flop);
+      task->flops=flop;
+    }
+
     /*call starpu_task_submit method*/
     starpu_task_submit(task);
     task->callback_func=&cb_func;
@@ -364,13 +402,32 @@ static PyObject* starpu_cpu_worker_get_count_wrapper(PyObject *self, PyObject *a
   return Py_BuildValue("I", num_cpu);
 }
 
+/*wrapper get min priority method*/
+static PyObject* starpu_sched_get_min_priority_wrapper(PyObject *self, PyObject *args){
+
+  /*call starpu_sched_get_min_priority*/
+  int min_prio=starpu_sched_get_min_priority();
+
+  /*return type is int*/
+  return Py_BuildValue("i", min_prio);
+}
+
+/*wrapper get max priority method*/
+static PyObject* starpu_sched_get_max_priority_wrapper(PyObject *self, PyObject *args){
+
+  /*call starpu_sched_get_max_priority*/
+  int max_prio=starpu_sched_get_max_priority();
+
+  /*return type is int*/
+  return Py_BuildValue("i", max_prio);
+}
 /***********************************************************************************/
 
 /***************The module’s method table and initialization function**************/
 /*method table*/
 static PyMethodDef starpupyMethods[] = 
 { 
-  {"task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, /*submit method*/
+  {"_task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, /*submit method*/
   {"task_wait_for_all", starpu_task_wait_for_all_wrapper, METH_VARARGS, "wait the task"}, /*wait for all method*/
   {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, /*pause method*/
   {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, /*resume method*/
@@ -378,6 +435,8 @@ static PyMethodDef starpupyMethods[] =
   {"init_perfmodel", init_perfmodel, METH_VARARGS, "initialize struct starpu_perfmodel"}, /*initialize perfmodel*/
   {"free_perfmodel", free_perfmodel, METH_VARARGS, "free struct starpu_perfmodel"}, /*free perfmodel*/
   {"save_history_based_model", starpu_save_history_based_model_wrapper, METH_VARARGS, "save the performance model"}, /*save the performance model*/
+  {"sched_get_min_priority", starpu_sched_get_min_priority_wrapper, METH_VARARGS, "get the number of min priority"}, /*get the number of min priority*/
+  {"sched_get_max_priority", starpu_sched_get_max_priority_wrapper, METH_VARARGS, "get the number of max priority"}, /*get the number of max priority*/
   {NULL, NULL}
 };
 

+ 8 - 8
starpupy/tests/starpu_py.py

@@ -73,7 +73,7 @@ def sub(a,b,c):
 ###############################################################################
 
 #using decorator wrap the function with input
-@starpu.delayed
+@starpu.delayed(name="test")
 def add_deco(a,b,c):
 	#time.sleep(1)
 	print ("Example 8:")
@@ -83,7 +83,7 @@ def add_deco(a,b,c):
 ###############################################################################
 
 #using decorator wrap the function with input
-@starpu.delayed
+@starpu.delayed(color=1)
 def sub_deco(x,a):
 	print ("Example 9:")
 	print ("This is a function with input and output wrapped by the decorator function:")
@@ -93,34 +93,34 @@ def sub_deco(x,a):
 
 async def main():
 	#submit function "hello"
-    fut = starpu.task_submit(hello)
+    fut = starpu.task_submit()(hello)
     await fut
 
     #submit function "func1"
-    fut1 = starpu.task_submit(func1)
+    fut1 = starpu.task_submit()(func1)
     await fut1
 
     #apply starpu.delayed(func1_deco())
     await func1_deco()
 
 	#submit function "func2"
-    fut2 = starpu.task_submit(func2)
+    fut2 = starpu.task_submit()(func2)
     res2 = await fut2
 	#print the result of function
     print("This is a function no input and the return value is", res2)
 
     #submit function "multi"
-    fut3 = starpu.task_submit(multi, 2, 3)
+    fut3 = starpu.task_submit()(multi, 2, 3)
     res3 = await fut3
     print("The result of function multi is :", res3)
 
 	#submit function "add"
-    fut4 = starpu.task_submit(add, 1.2, 2.5, 3.6, 4.9)
+    fut4 = starpu.task_submit()(add, 1.2, 2.5, 3.6, 4.9)
     res4 = await fut4
     print("The result of function add is :", res4)
 
 	#submit function "sub"
-    fut5 = starpu.task_submit(sub, 6, 2, 5.9)
+    fut5 = starpu.task_submit()(sub, 6, 2, 5.9)
     res5 = await fut5
     print("The result of function sub is:", res5)
 

+ 3 - 3
starpupy/tests/starpu_py_parallel.py

@@ -96,6 +96,6 @@ async def main():
 	#print(res3)
 asyncio.run(main())
 
-starpu.joblib.perfmodel_plot(perfmodel="first")
-starpu.joblib.perfmodel_plot(perfmodel="second")
-starpu.joblib.perfmodel_plot(perfmodel="third")
+starpu.perfmodel_plot(perfmodel="first")
+starpu.perfmodel_plot(perfmodel="second")
+starpu.perfmodel_plot(perfmodel="third")