starpu_task_wrapper.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #undef NDEBUG
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <string.h>
  20. #include <starpu.h>
  21. #define PY_SSIZE_T_CLEAN
  22. #include <Python.h>
  23. #ifdef STARPU_PYTHON_HAVE_NUMPY
  24. #include <numpy/arrayobject.h>
  25. #endif
  /* Debug tracing macros: active only when Py_DEBUG or DEBUG is defined. */
/*
 * Every macro expands to exactly one statement (do { } while (0)) in both
 * configurations, so `if (c) MARKER(); else ...` compiles in debug builds too.
 * In non-debug builds the argument of CURIOUS is not evaluated.
 */
#if defined(Py_DEBUG) || defined(DEBUG)
extern void _Py_CountReferences(FILE*);
#define CURIOUS(x) do { fprintf(stderr, __FILE__ ":%d ", __LINE__); x; } while (0)
#else
#define CURIOUS(x) do { } while (0)
#endif
#define MARKER() CURIOUS(fprintf(stderr, "\n"))
#define DESCRIBE(x) CURIOUS(fprintf(stderr, " " #x "=%d\n", x))
#define DESCRIBE_HEX(x) CURIOUS(fprintf(stderr, " " #x "=%08x\n", x))
#define COUNTREFS() CURIOUS(_Py_CountReferences(stderr))
/*******/
  38. /*********************Functions passed in task_submit wrapper***********************/
  39. static PyObject *asyncio_module; /*python asyncio library*/
  40. /*structure contains parameters which are passed to starpu_task.cl_arg*/
  41. struct codelet_args
  42. {
  43. PyObject *f; /*the python function passed in*/
  44. PyObject *argList; /*argument list of python function passed in*/
  45. PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
  46. PyObject *fut; /*asyncio.Future*/
  47. PyObject *lp; /*asyncio.Eventloop*/
  48. };
  49. /*function passed to starpu_codelet.cpu_func*/
  50. void codelet_func(void *buffers[], void *cl_arg)
  51. {
  52. struct codelet_args *cst = (struct codelet_args*) cl_arg;
  53. /*make sure we own the GIL*/
  54. PyGILState_STATE state = PyGILState_Ensure();
  55. /*verify that the function is a proper callable*/
  56. if (!PyCallable_Check(cst->f))
  57. {
  58. printf("py_callback: expected a callable function\n");
  59. exit(1);
  60. }
  61. /*check the arguments of python function passed in*/
  62. for (int i=0; i < PyTuple_Size(cst->argList); i++)
  63. {
  64. PyObject *obj = PyTuple_GetItem(cst->argList, i);
  65. const char *tp = Py_TYPE(obj)->tp_name;
  66. if(strcmp(tp, "_asyncio.Future") == 0)
  67. {
  68. /*if one of arguments is Future, get its result*/
  69. PyObject *fut_result = PyObject_CallMethod(obj, "result", NULL);
  70. /*replace the Future argument to its result*/
  71. PyTuple_SetItem(cst->argList, i, fut_result);
  72. }
  73. /*else if (strcmp(tp, "numpy.ndarray")==0)
  74. {
  75. printf("array is %p\n", obj);
  76. }*/
  77. }
  78. /*call the python function*/
  79. PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
  80. //const char *tp = Py_TYPE(pRetVal)->tp_name;
  81. //printf("return value type is %s\n", tp);
  82. cst->rv = pRetVal;
  83. //Py_DECREF(cst->f);
  84. /*restore previous GIL state*/
  85. PyGILState_Release(state);
  86. }
  87. /*function passed to starpu_task.callback_func*/
  88. void cb_func(void *v)
  89. {
  90. struct starpu_task *task = starpu_task_get_current();
  91. struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
  92. /*make sure we own the GIL*/
  93. PyGILState_STATE state = PyGILState_Ensure();
  94. /*set the Future result and mark the Future as done*/
  95. PyObject *set_result = PyObject_GetAttrString(cst->fut, "set_result");
  96. PyObject *loop_callback = PyObject_CallMethod(cst->lp, "call_soon_threadsafe", "(O,O)", set_result, cst->rv);
  97. Py_DECREF(loop_callback);
  98. Py_DECREF(set_result);
  99. Py_DECREF(cst->rv);
  100. Py_DECREF(cst->fut);
  101. Py_DECREF(cst->lp);
  102. Py_DECREF(cst->argList);
  103. //Py_DECREF(perfmodel);
  104. struct starpu_codelet *func_cl=(struct starpu_codelet *) task->cl;
  105. if (func_cl->model != NULL)
  106. {
  107. struct starpu_perfmodel *perf =(struct starpu_perfmodel *) func_cl->model;
  108. PyObject *perfmodel=PyCapsule_New(perf, "Perf", 0);
  109. Py_DECREF(perfmodel);
  110. }
  111. /*restore previous GIL state*/
  112. PyGILState_Release(state);
  113. /*deallocate task*/
  114. free(task->cl);
  115. free(task->cl_arg);
  116. if (task->name!=NULL)
  117. {
  118. free(task->name);
  119. }
  120. }
  121. /***********************************************************************************/
  122. /*PyObject*->struct starpu_task**/
  123. static struct starpu_task *PyTask_AsTask(PyObject *obj)
  124. {
  125. return (struct starpu_task *) PyCapsule_GetPointer(obj, "Task");
  126. }
  127. /* destructor function for task */
  128. static void del_Task(PyObject *obj)
  129. {
  130. struct starpu_task *obj_task=PyTask_AsTask(obj);
  131. obj_task->destroy=1; /*XXX we should call starpu task destroy*/
  132. }
  133. /*struct starpu_task*->PyObject**/
  134. static PyObject *PyTask_FromTask(struct starpu_task *task)
  135. {
  136. return PyCapsule_New(task, "Task", del_Task);
  137. }
  138. /***********************************************************************************/
  139. static size_t sizebase (struct starpu_task *task, unsigned nimpl)
  140. {
  141. int n=0;
  142. struct codelet_args *cst = (struct codelet_args*) task->cl_arg;
  143. /*get the result of function*/
  144. PyObject *obj=cst->rv;
  145. /*get the length of result*/
  146. const char *tp = Py_TYPE(obj)->tp_name;
  147. #ifdef STARPU_PYTHON_HAVE_NUMPY
  148. /*if the result is a numpy array*/
  149. if (strcmp(tp, "numpy.ndarray")==0)
  150. n = PyArray_SIZE(obj);
  151. else
  152. #endif
  153. /*if the result is a list*/
  154. if (strcmp(tp, "list")==0)
  155. n = PyList_Size(obj);
  156. /*else error*/
  157. else
  158. {
  159. printf("starpu_perfmodel::size_base: the type of function result is unrecognized\n");
  160. exit(1);
  161. }
  162. return n;
  163. }
  164. static void del_Perf(PyObject *obj)
  165. {
  166. struct starpu_perfmodel *perf=(struct starpu_perfmodel*)PyCapsule_GetPointer(obj, "Perf");
  167. free(perf);
  168. }
  169. /*initialization of perfmodel*/
  170. static PyObject* init_perfmodel(PyObject *self, PyObject *args)
  171. {
  172. char *sym;
  173. if (!PyArg_ParseTuple(args, "s", &sym))
  174. return NULL;
  175. /*allocate a perfmodel structure*/
  176. struct starpu_perfmodel *perf=(struct starpu_perfmodel*)calloc(1, sizeof(struct starpu_perfmodel));
  177. /*get the perfmodel symbol*/
  178. char *p =strdup(sym);
  179. perf->symbol=p;
  180. perf->type=STARPU_HISTORY_BASED;
  181. /*struct perfmodel*->PyObject**/
  182. PyObject *perfmodel=PyCapsule_New(perf, "Perf", NULL);
  183. return perfmodel;
  184. }
  185. /*free perfmodel*/
  186. static PyObject* free_perfmodel(PyObject *self, PyObject *args)
  187. {
  188. PyObject *perfmodel;
  189. if (!PyArg_ParseTuple(args, "O", &perfmodel))
  190. return NULL;
  191. /*PyObject*->struct perfmodel**/
  192. struct starpu_perfmodel *perf=PyCapsule_GetPointer(perfmodel, "Perf");
  193. starpu_save_history_based_model(perf);
  194. //starpu_perfmodel_unload_model(perf);
  195. //free(perf->symbol);
  196. starpu_perfmodel_deinit(perf);
  197. free(perf);
  198. /*return type is void*/
  199. Py_INCREF(Py_None);
  200. return Py_None;
  201. }
  202. static PyObject* starpu_save_history_based_model_wrapper(PyObject *self, PyObject *args)
  203. {
  204. PyObject *perfmodel;
  205. if (!PyArg_ParseTuple(args, "O", &perfmodel))
  206. return NULL;
  207. /*PyObject*->struct perfmodel**/
  208. struct starpu_perfmodel *perf=PyCapsule_GetPointer(perfmodel, "Perf");
  209. starpu_save_history_based_model(perf);
  210. /*return type is void*/
  211. Py_INCREF(Py_None);
  212. return Py_None;
  213. }
  214. /*****************************Wrappers of StarPU methods****************************/
  215. /*wrapper submit method*/
  216. static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
  217. {
  218. /*get the running Event loop*/
  219. PyObject *loop = PyObject_CallMethod(asyncio_module, "get_running_loop", NULL);
  220. /*create a asyncio.Future object*/
  221. PyObject *fut = PyObject_CallMethod(loop, "create_future", NULL);
  222. /*first argument in args is always the python function passed in*/
  223. PyObject *func_py = PyTuple_GetItem(args, 0);
  224. Py_INCREF(func_py);
  225. /*allocate a task structure and initialize it with default values*/
  226. struct starpu_task *task=starpu_task_create();
  227. task->destroy=0;
  228. PyObject *PyTask=PyTask_FromTask(task);
  229. /*set one of fut attribute to the task pointer*/
  230. PyObject_SetAttrString(fut, "starpu_task", PyTask);
  231. /*check the arguments of python function passed in*/
  232. for (int i=1; i < PyTuple_Size(args)-1; i++)
  233. {
  234. PyObject *obj=PyTuple_GetItem(args, i);
  235. const char* tp = Py_TYPE(obj)->tp_name;
  236. if(strcmp(tp, "_asyncio.Future") == 0)
  237. {
  238. /*if one of arguments is Future, get its corresponding task*/
  239. PyObject *fut_task=PyObject_GetAttrString(obj, "starpu_task");
  240. /*declare task dependencies between the current task and the corresponding task of Future argument*/
  241. starpu_task_declare_deps(task, 1, PyTask_AsTask(fut_task));
  242. Py_DECREF(fut_task);
  243. }
  244. }
  245. /*allocate a codelet structure*/
  246. struct starpu_codelet *func_cl=(struct starpu_codelet*)malloc(sizeof(struct starpu_codelet));
  247. /*initialize func_cl with default values*/
  248. starpu_codelet_init(func_cl);
  249. func_cl->cpu_funcs[0]=&codelet_func;
  250. /*check whether the option perfmodel is None*/
  251. PyObject *dict_option = PyTuple_GetItem(args, PyTuple_Size(args)-1);/*the last argument is the option dictionary*/
  252. PyObject *perfmodel = PyDict_GetItemString(dict_option, "perfmodel");
  253. const char *tp_perf = Py_TYPE(perfmodel)->tp_name;
  254. if (strcmp(tp_perf, "PyCapsule")==0)
  255. {
  256. /*PyObject*->struct perfmodel**/
  257. struct starpu_perfmodel *perf=PyCapsule_GetPointer(perfmodel, "Perf");
  258. func_cl->model=perf;
  259. Py_INCREF(perfmodel);
  260. }
  261. /*allocate a new codelet structure to pass the python function, asyncio.Future and Event loop*/
  262. struct codelet_args *cst = (struct codelet_args*)malloc(sizeof(struct codelet_args));
  263. cst->f = func_py;
  264. cst->fut = fut;
  265. cst->lp = loop;
  266. Py_INCREF(fut);
  267. Py_INCREF(loop);
  268. /*pass args in argList*/
  269. if (PyTuple_Size(args)==2)/*function no arguments*/
  270. cst->argList = PyTuple_New(0);
  271. else
  272. {/*function has arguments*/
  273. cst->argList = PyTuple_New(PyTuple_Size(args)-2);
  274. for (int i=0; i < PyTuple_Size(args)-2; i++)
  275. {
  276. PyObject *tmp=PyTuple_GetItem(args, i+1);
  277. PyTuple_SetItem(cst->argList, i, tmp);
  278. Py_INCREF(PyTuple_GetItem(cst->argList, i));
  279. }
  280. }
  281. task->cl=func_cl;
  282. task->cl_arg=cst;
  283. /*pass optional values name=None, synchronous=1, priority=0, color=None, flops=None, perfmodel=None*/
  284. /*const char * name*/
  285. PyObject *PyName = PyDict_GetItemString(dict_option, "name");
  286. const char *name_type = Py_TYPE(PyName)->tp_name;
  287. if (strcmp(name_type, "NoneType")!=0)
  288. {
  289. PyObject *pStrObj = PyUnicode_AsUTF8String(PyName);
  290. char* name_str = PyBytes_AsString(pStrObj);
  291. char* name = strdup(name_str);
  292. //printf("name is %s\n", name);
  293. task->name=name;
  294. Py_DECREF(pStrObj);
  295. }
  296. /*unsigned synchronous:1*/
  297. PyObject *PySync = PyDict_GetItemString(dict_option, "synchronous");
  298. unsigned sync=PyLong_AsUnsignedLong(PySync);
  299. //printf("sync is %u\n", sync);
  300. task->synchronous=sync;
  301. /*int priority*/
  302. PyObject *PyPrio = PyDict_GetItemString(dict_option, "priority");
  303. int prio=PyLong_AsLong(PyPrio);
  304. //printf("prio is %d\n", prio);
  305. task->priority=prio;
  306. /*unsigned color*/
  307. PyObject *PyColor = PyDict_GetItemString(dict_option, "color");
  308. const char *color_type = Py_TYPE(PyColor)->tp_name;
  309. if (strcmp(color_type, "NoneType")!=0)
  310. {
  311. unsigned color=PyLong_AsUnsignedLong(PyColor);
  312. //printf("color is %u\n", color);
  313. task->color=color;
  314. }
  315. /*double flops*/
  316. PyObject *PyFlops = PyDict_GetItemString(dict_option, "flops");
  317. const char *flops_type = Py_TYPE(PyFlops)->tp_name;
  318. if (strcmp(flops_type, "NoneType")!=0)
  319. {
  320. double flops=PyFloat_AsDouble(PyFlops);
  321. //printf("flops is %f\n", flop);
  322. task->flops=flops;
  323. }
  324. task->callback_func=&cb_func;
  325. /*call starpu_task_submit method*/
  326. Py_BEGIN_ALLOW_THREADS
  327. int ret = starpu_task_submit(task);
  328. assert(ret==0);
  329. Py_END_ALLOW_THREADS
  330. if (strcmp(tp_perf, "PyCapsule")==0)
  331. {
  332. struct starpu_perfmodel *perf =(struct starpu_perfmodel *) func_cl->model;
  333. perf->size_base=&sizebase;
  334. }
  335. //printf("the number of reference is %ld\n", Py_REFCNT(func_py));
  336. //_Py_PrintReferences(stderr);
  337. //COUNTREFS();
  338. return fut;
  339. }
  340. /*wrapper wait for all method*/
  341. static PyObject* starpu_task_wait_for_all_wrapper(PyObject *self, PyObject *args)
  342. {
  343. /*call starpu_task_wait_for_all method*/
  344. Py_BEGIN_ALLOW_THREADS
  345. starpu_task_wait_for_all();
  346. Py_END_ALLOW_THREADS
  347. /*return type is void*/
  348. Py_INCREF(Py_None);
  349. return Py_None;
  350. }
  351. /*wrapper pause method*/
  352. static PyObject* starpu_pause_wrapper(PyObject *self, PyObject *args)
  353. {
  354. /*call starpu_pause method*/
  355. starpu_pause();
  356. /*return type is void*/
  357. Py_INCREF(Py_None);
  358. return Py_None;
  359. }
  360. /*wrapper resume method*/
  361. static PyObject* starpu_resume_wrapper(PyObject *self, PyObject *args)
  362. {
  363. /*call starpu_resume method*/
  364. starpu_resume();
  365. /*return type is void*/
  366. Py_INCREF(Py_None);
  367. return Py_None;
  368. }
  369. /*wrapper get count cpu method*/
  370. static PyObject* starpu_cpu_worker_get_count_wrapper(PyObject *self, PyObject *args)
  371. {
  372. /*call starpu_cpu_worker_get_count method*/
  373. int num_cpu=starpu_cpu_worker_get_count();
  374. /*return type is unsigned*/
  375. return Py_BuildValue("I", num_cpu);
  376. }
  377. /*wrapper get min priority method*/
  378. static PyObject* starpu_sched_get_min_priority_wrapper(PyObject *self, PyObject *args)
  379. {
  380. /*call starpu_sched_get_min_priority*/
  381. int min_prio=starpu_sched_get_min_priority();
  382. /*return type is int*/
  383. return Py_BuildValue("i", min_prio);
  384. }
  385. /*wrapper get max priority method*/
  386. static PyObject* starpu_sched_get_max_priority_wrapper(PyObject *self, PyObject *args)
  387. {
  388. /*call starpu_sched_get_max_priority*/
  389. int max_prio=starpu_sched_get_max_priority();
  390. /*return type is int*/
  391. return Py_BuildValue("i", max_prio);
  392. }
  393. /*wrapper get the number of no completed submitted tasks method*/
  394. static PyObject* starpu_task_nsubmitted_wrapper(PyObject *self, PyObject *args)
  395. {
  396. /*call starpu_task_nsubmitted*/
  397. int num_task=starpu_task_nsubmitted();
  398. /*Return the number of submitted tasks which have not completed yet */
  399. return Py_BuildValue("i", num_task);
  400. }
  401. /***********************************************************************************/
  402. /***************The module’s method table and initialization function**************/
  403. /*method table*/
  404. static PyMethodDef starpupyMethods[] =
  405. {
  406. {"_task_submit", starpu_task_submit_wrapper, METH_VARARGS, "submit the task"}, /*submit method*/
  407. {"task_wait_for_all", starpu_task_wait_for_all_wrapper, METH_VARARGS, "wait the task"}, /*wait for all method*/
  408. {"pause", starpu_pause_wrapper, METH_VARARGS, "suspend the processing of new tasks by workers"}, /*pause method*/
  409. {"resume", starpu_resume_wrapper, METH_VARARGS, "resume the workers polling for new tasks"}, /*resume method*/
  410. {"cpu_worker_get_count", starpu_cpu_worker_get_count_wrapper, METH_VARARGS, "return the number of CPUs controlled by StarPU"}, /*get count cpu method*/
  411. {"init_perfmodel", init_perfmodel, METH_VARARGS, "initialize struct starpu_perfmodel"}, /*initialize perfmodel*/
  412. {"free_perfmodel", free_perfmodel, METH_VARARGS, "free struct starpu_perfmodel"}, /*free perfmodel*/
  413. {"save_history_based_model", starpu_save_history_based_model_wrapper, METH_VARARGS, "save the performance model"}, /*save the performance model*/
  414. {"sched_get_min_priority", starpu_sched_get_min_priority_wrapper, METH_VARARGS, "get the number of min priority"}, /*get the number of min priority*/
  415. {"sched_get_max_priority", starpu_sched_get_max_priority_wrapper, METH_VARARGS, "get the number of max priority"}, /*get the number of max priority*/
  416. {"task_nsubmitted", starpu_task_nsubmitted_wrapper, METH_VARARGS, "get the number of submitted tasks which have not completed yet"}, /*get the number of submitted tasks which have not completed yet*/
  417. {NULL, NULL}
  418. };
  419. /*deallocation function*/
  420. static void starpupyFree(void *self)
  421. {
  422. starpu_shutdown();
  423. Py_DECREF(asyncio_module);
  424. //COUNTREFS();
  425. }
  426. /*module definition structure*/
  427. static struct PyModuleDef starpupymodule =
  428. {
  429. PyModuleDef_HEAD_INIT,
  430. "starpupy", /*name of module*/
  431. NULL,
  432. -1,
  433. starpupyMethods, /*method table*/
  434. NULL,
  435. NULL,
  436. NULL,
  437. starpupyFree /*deallocation function*/
  438. };
  439. /*initialization function*/
  440. PyMODINIT_FUNC
  441. PyInit_starpupy(void)
  442. {
  443. PyEval_InitThreads();
  444. /*starpu initialization*/
  445. int ret = starpu_init(NULL);
  446. assert(ret==0);
  447. /*python asysncio import*/
  448. asyncio_module = PyImport_ImportModule("asyncio");
  449. #ifdef STARPU_PYTHON_HAVE_NUMPY
  450. /*numpy import array*/
  451. import_array();
  452. #endif
  453. /*module import initialization*/
  454. return PyModule_Create(&starpupymodule);
  455. }
  456. /***********************************************************************************/