浏览代码

starpupy: provide parallel computing imitating Joblib

HE Kun 4 年之前
父节点
当前提交
68a18b89ea

+ 3 - 3
starpupy/src/starpu/Makefile

@@ -4,10 +4,10 @@ CPPFLAGS = $(shell $(PYTHON)-config --includes) -Wall -O2 -g
 CFLAGS += $(shell pkg-config --cflags starpu-1.3)
 LDLIBS += $(shell pkg-config --libs starpu-1.3)
 
-all: task.so
+all: starpupy.so
 
-task.so: starpu_task_wrapper.c Makefile
+starpupy.so: starpu_task_wrapper.c Makefile
 	$(CC) -fPIC $(CFLAGS) $< -o $@ -shared  $(CPPFLAGS) $(LDLIBS)
 
 clean:
-	rm -f task.so
+	rm -f starpupy.so

+ 3 - 2
starpupy/src/starpu/__init__.py

@@ -15,5 +15,6 @@
 #
 
 
-from.task import *
-from .delay import delayed
+from.starpupy import *
+from .delay import *
+from . import joblib

+ 2 - 2
starpupy/src/starpu/delay.py

@@ -13,11 +13,11 @@
 #
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
-from starpu import task
+from starpu import starpupy
 import asyncio
 
 def delayed(f):
 	def submit(*args,**kwargs):
-		fut = task.task_submit(f, *args,**kwargs)
+		fut = starpupy.task_submit(f, *args,**kwargs)
 		return fut
 	return submit

+ 94 - 0
starpupy/src/starpu/joblib.py

@@ -0,0 +1,94 @@
+# StarPU --- Runtime system for heterogeneous multicore architectures.
+#
+# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+#
+# StarPU is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or (at
+# your option) any later version.
+#
+# StarPU is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the GNU Lesser General Public License in COPYING.LGPL for more details.
+#
+from starpu import starpupy
+import asyncio
+import math
+
+# get the number of CPUs controlled by StarPU
+n_cpus=starpupy.cpu_worker_get_count()
+
+# split a list ls into n_block numbers of sub-lists 
+def partition(ls, n_block):
+	if len(ls)>=n_block:
+		# there are n1 sub-lists which contain q1 elements, and (n_block-n1) sublists which contain q2 elements (n1 can be 0)
+		q1=math.ceil(len(ls)/n_block)
+		q2=math.floor(len(ls)/n_block)
+		n1=len(ls)%n_block
+		#n2=n_block-n1
+		# generate n1 sub-lists in L1, and (n_block-n1) sub-lists in L2
+		L1=[ls[i:i+q1] for i in range(0, n1*q1, q1)]
+		L2=[ls[i:i+q2] for i in range(n1*q1, len(ls), q2)]
+
+		L=L1+L2
+	else:
+		# if the block number is larger than the length of list, each element in the list is a sub-list
+		L=[ls[i:i+1] for i in range (len(ls))]
+	return L
+
+def future_generator(g, n_jobs):
+	# g is generated by delayed function, after converting to a list, the format is [function, (arg1, arg2, ... ,)]
+	L=list(g)
+	# generate a list of function according to g
+	def lf(ls):
+		L_func=[]
+		for i in range(len(ls)):
+			# the first element is the function
+			f=ls[i][0]
+			# the second element is the args list of a type tuple
+			L_args=list(ls[i][1])
+			# generate a list of function
+			L_func.append(f(*L_args))
+		return L_func
+	# get the number of block
+	if n_jobs<0:
+		n_block=n_cpus+1+n_jobs
+	else:
+		n_block=n_jobs
+	# generate the split function list
+	L_split=partition(L,n_block)
+	# operation in each split list
+	L_fut=[]
+	for i in range(len(L_split)):
+		fut=starpupy.task_submit(lf, L_split[i])
+		L_fut.append(fut)
+	return L_fut
+
+def parallel(*, mode, n_jobs=None):
+	# the mode normal, user can call the function directly without using async
+	if mode=="normal":
+		def parallel_normal(g):
+			async def asy_main():
+				L_fut=future_generator(g, n_jobs)
+				res=[]
+				for i in range(len(L_fut)):
+					L_res=await L_fut[i]
+					res.extend(L_res)
+				print(res)
+				return res
+			asyncio.run(asy_main())
+			return asy_main
+		return parallel_normal
+	# the mode future, user needs to use asyncio module and await the Future result in main function
+	elif mode=="future":
+		def parallel_future(g):
+			L_fut=future_generator(g, n_jobs)
+			return L_fut
+		return parallel_future
+
+def delayed(f):
+	def delayed_func(*args):
+		return f, args
+	return delayed_func

+ 44 - 14
starpupy/src/starpu/starpu_task_wrapper.c

@@ -22,6 +22,19 @@
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
 
+/*macro*/
+#if defined(Py_DEBUG) || defined(DEBUG)
+extern void _Py_CountReferences(FILE*);
+#define CURIOUS(x) { fprintf(stderr, __FILE__ ":%d ", __LINE__); x; }
+#else
+#define CURIOUS(x)
+#endif
+#define MARKER()        CURIOUS(fprintf(stderr, "\n"))
+#define DESCRIBE(x)     CURIOUS(fprintf(stderr, "  " #x "=%d\n", x))
+#define DESCRIBE_HEX(x) CURIOUS(fprintf(stderr, "  " #x "=%08x\n", x))
+#define COUNTREFS()     CURIOUS(_Py_CountReferences(stderr))
+/*******/
+
 /*********************Functions passed in task_submit wrapper***********************/
 
 static PyObject* asyncio_module; /*python asyncio library*/
@@ -47,7 +60,7 @@ void codelet_func(void *buffers[], void *cl_arg){
     /*verify that the function is a proper callable*/
     if (!PyCallable_Check(cst->f)) {
 
-        printf("py_callback: expected a callablen\n"); 
+        printf("py_callback: expected a callable function\n"); 
         exit(1);
     }
     
@@ -67,14 +80,15 @@ void codelet_func(void *buffers[], void *cl_arg){
     PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
     cst->rv=pRetVal;
 
-    Py_DECREF(cst->f);
     for(int i = 0; i < PyTuple_Size(cst->argList); i++){
         Py_DECREF(PyTuple_GetItem(cst->argList, i));
     }
     Py_DECREF(cst->argList);
+    //Py_DECREF(cst->f);
 
     /*restore previous GIL state*/
     PyGILState_Release(state);
+
 }
 
 /*function passed to starpu_task.callback_func*/
@@ -162,6 +176,7 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
     /*initialize func_cl with default values*/
     starpu_codelet_init(func_cl);
     func_cl->cpu_func=&codelet_func;
+    //func_cl->model=p; p malloc perfmode
 
     /*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
     codelet_st *cst = (codelet_st*)malloc(sizeof(codelet_st));
@@ -186,12 +201,15 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args){
 
     task->cl=func_cl;
     task->cl_arg=cst;
-
     /*call starpu_task_submit method*/
-    int retval=starpu_task_submit(task);
+    starpu_task_submit(task);
     task->callback_func=&cb_func;
 
+    //printf("the number of reference is %ld\n", Py_REFCNT(func_py));
+    //_Py_PrintReferences(stderr);
+    //COUNTREFS();
     return fut;
+
 }
 
 /*wrapper wait for all method*/
@@ -229,48 +247,60 @@ static PyObject* starpu_resume_wrapper(PyObject *self, PyObject *args){
     return Py_None;
 }
 
+/*wrapper get count cpu method*/
+static PyObject* starpu_cpu_worker_get_count_wrapper(PyObject *self, PyObject *args){
+
+  /*call starpu_cpu_worker_get_count method*/
+  int num_cpu=starpu_cpu_worker_get_count();
+
+  /*return type is unsigned*/
+  return Py_BuildValue("I", num_cpu);
+}
+
 /***********************************************************************************/
 
 /***************The module’s method table and initialization function**************/
 /*method table*/
-static PyMethodDef taskMethods[] = 
+static PyMethodDef starpupyMethods[] = 
 { 
   {"task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, /*submit method*/
   {"task_wait_for_all", starpu_task_wait_for_all_wrapper, METH_VARARGS, "wait the task"}, /*wait for all method*/
   {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, /*pause method*/
   {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, /*resume method*/
+  {"cpu_worker_get_count", starpu_cpu_worker_get_count_wrapper, METH_VARARGS, "return the number of CPUs controlled by StarPU"}, /*get count cpu method*/
   {NULL, NULL}
 };
 
 /*deallocation function*/
-static void taskFree(void *v){
+static void starpupyFree(void *v){
 	starpu_shutdown();
-    Py_DECREF(asyncio_module);
+  Py_DECREF(asyncio_module);
+  //COUNTREFS();
 }
 
 /*module definition structure*/
-static struct PyModuleDef taskmodule={
+static struct PyModuleDef starpupymodule={
   PyModuleDef_HEAD_INIT,
-  "task", /*name of module*/
+  "starpupy", /*name of module*/
   NULL,
   -1,
-  taskMethods, /*method table*/
+  starpupyMethods, /*method table*/
   NULL,
   NULL,
   NULL,
-  taskFree /*deallocation function*/
+  starpupyFree /*deallocation function*/
 };
 
 /*initialization function*/
 PyMODINIT_FUNC
-PyInit_task(void)
+PyInit_starpupy(void)
 {
     PyEval_InitThreads();
     /*starpu initialization*/
-	int ret = starpu_init(NULL);
+	  starpu_init(NULL);
     /*python asysncio import*/
     asyncio_module = PyImport_ImportModule("asyncio");
     /*module import initialization*/
-    return PyModule_Create(&taskmodule);
+    return PyModule_Create(&starpupymodule);
 }
 /***********************************************************************************/

+ 1 - 0
starpupy/tests/Makefile

@@ -2,4 +2,5 @@ PYTHON ?= python3
 
 all:
 	PYTHONPATH=../src $(PYTHON) starpu_py.py
+	PYTHONPATH=../src $(PYTHON) starpu_py_parallel.py
 

+ 91 - 0
starpupy/tests/starpu_py_parallel.py

@@ -0,0 +1,91 @@
+# StarPU --- Runtime system for heterogeneous multicore architectures.
+#
+# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+#
+# StarPU is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or (at
+# your option) any later version.
+#
+# StarPU is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the GNU Lesser General Public License in COPYING.LGPL for more details.
+#
+import starpu
+import time
+import asyncio
+from math import sqrt
+
+#generate a list to store functions
+g_func=[]
+
+#function no input no output print hello world
+def hello():
+	print ("Example 1: Hello, world!")
+g_func.append(starpu.joblib.delayed(hello)())
+
+#function no input no output
+def func1():
+	print ("Example 2: This is a function no input no output")
+g_func.append(starpu.joblib.delayed(func1)())
+
+#function no input return a value
+def func2():
+	print ("Example 3:")
+	return 12
+g_func.append(starpu.joblib.delayed(func2)())	
+ 
+#function has 2 int inputs and 1 int output
+def multi(a,b):
+	res_multi=a*b
+	print("Example 4: The result of ",a,"*",b,"is",res_multi)
+	return res_multi
+g_func.append(starpu.joblib.delayed(multi)(2, 3))
+
+#function has 4 float inputs and 1 float output
+def add(a,b,c,d):
+	res_add=a+b+c+d
+	print("Example 5: The result of ",a,"+",b,"+",c,"+",d,"is",res_add)
+	return res_add
+g_func.append(starpu.joblib.delayed(add)(1.2, 2.5, 3.6, 4.9))
+
+#function has 2 int inputs 1 float input and 1 float output 1 int output
+def sub(a,b,c):
+	res_sub1=a-b-c
+	res_sub2=a-b
+	print ("Example 6: The result of ",a,"-",b,"-",c,"is",res_sub1,"and the result of",a,"-",b,"is",res_sub2)
+	return res_sub1, res_sub2
+g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))
+
+print("************************")
+print("parallel Normal version:")
+print("************************")
+print("--input is iterable argument list")
+starpu.joblib.parallel(mode="normal", n_jobs=-2)(starpu.joblib.delayed(sqrt)(i**2)for i in range(10))
+
+print("--input is iterable function list")
+starpu.joblib.parallel(mode="normal", n_jobs=3)(g_func)
+
+
+print("************************")
+print("parallel Future version:")
+print("************************")
+async def main():
+	print("--input is iterable argument list")
+	L_fut1=starpu.joblib.parallel(mode="future", n_jobs=-3)(starpu.joblib.delayed(sqrt)(i**2)for i in range(10))
+	res1=[]
+	for i in range(len(L_fut1)):
+		L_res1=await L_fut1[i]
+		res1.extend(L_res1)
+	print(res1)
+
+	print("--input is iterable function list")
+	L_fut2=starpu.joblib.parallel(mode="future", n_jobs=2)(g_func)
+	res2=[]
+	for i in range(len(L_fut2)):
+		L_res2=await L_fut2[i]
+		res2.extend(L_res2)
+	print(res2)
+asyncio.run(main())