Sfoglia il codice sorgente

The unpack function does not free the buffer in cl_arg anymore. This makes it
possible to have multiple tasks reading the same argument stack multiple times
(this happens when we have a parallel task).

Cédric Augonnet 14 anni fa
parent
commit
ecac820f78
2 ha cambiato i file con 39 aggiunte e 7 eliminazioni
  1. 0 3
      src/util/starpu_insert_task.c
  2. 39 4
      src/util/starpu_insert_task_utils.c

+ 0 - 3
src/util/starpu_insert_task.c

@@ -48,9 +48,6 @@ void starpu_unpack_cl_args(void *_cl_arg, ...)
 	}
 
 	va_end(varg_list);
-
-	/* XXX this should not really be done in StarPU but well .... */
-	free(_cl_arg);
 }
 
 void starpu_insert_task(starpu_codelet *cl, ...)

+ 39 - 4
src/util/starpu_insert_task_utils.c

@@ -18,6 +18,27 @@
 #include <common/config.h>
 #include <common/utils.h>
 
+/* Deal with callbacks. The unpack function may be called multiple times when
+ * we have a parallel task, and we should not free the cl_arg parameter from
+ * the callback function. */
+struct insert_task_cb_wrapper {
+	void (*callback_func)(void *);
+	void *callback_arg;
+	void *arg_stack;
+};
+
+void starpu_task_insert_callback_wrapper(void *_cl_arg_wrapper)
+{
+	struct insert_task_cb_wrapper *cl_arg_wrapper = _cl_arg_wrapper;
+
+	/* Execute the callback specified by the application */
+	if (cl_arg_wrapper->callback_func)
+		cl_arg_wrapper->callback_func(cl_arg_wrapper->callback_arg);
+
+	/* Free the stack of arguments */
+	free(cl_arg_wrapper->arg_stack);
+}
+
 size_t starpu_insert_task_get_arg_size(va_list varg_list)
 {
 	int arg_type;
@@ -57,12 +78,22 @@ int starpu_insert_task_create_and_submit(size_t arg_buffer_size, starpu_codelet
         int arg_type;
 	unsigned current_buffer = 0;
 	unsigned char nargs = 0;
+
+	/* TODO use a single malloc to allocate the memory for arg_buffer and
+	 * the callback argument wrapper */
 	char *arg_buffer = malloc(arg_buffer_size);
+	STARPU_ASSERT(arg_buffer);
 	unsigned current_arg_offset = 0;
 
 	/* We will begin the buffer with the number of args (which is stored as a char) */
 	current_arg_offset += sizeof(char);
 
+	struct insert_task_cb_wrapper *cl_arg_wrapper = malloc(sizeof(struct insert_task_cb_wrapper));
+	STARPU_ASSERT(cl_arg_wrapper);
+
+	cl_arg_wrapper->callback_func = NULL;
+	cl_arg_wrapper->arg_stack = arg_buffer;
+
 	while((arg_type = va_arg(varg_list, int)) != 0)
 	{
 		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH)
@@ -96,12 +127,11 @@ int starpu_insert_task_create_and_submit(size_t arg_buffer_size, starpu_codelet
 		{
 			void (*callback_func)(void *);
 			callback_func = va_arg(varg_list, void (*)(void *));
-			(*task)->callback_func = callback_func;
+			cl_arg_wrapper->callback_func = callback_func;
 		}
 		else if (arg_type==STARPU_CALLBACK_ARG) {
-			void *callback_arg;
-			callback_arg = va_arg(varg_list, void *);
-			(*task)->callback_arg = callback_arg;
+			void *callback_arg = va_arg(varg_list, void *);
+			cl_arg_wrapper->callback_arg = callback_arg;
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
@@ -120,6 +150,11 @@ int starpu_insert_task_create_and_submit(size_t arg_buffer_size, starpu_codelet
 	(*task)->cl = cl;
 	(*task)->cl_arg = arg_buffer;
 
+	/* The callback will free the argument stack and execute the
+	 * application's callback, if any. */
+	(*task)->callback_func = starpu_task_insert_callback_wrapper;
+	(*task)->callback_arg = cl_arg_wrapper;
+
 	int ret = starpu_task_submit(*task);
 
 	if (STARPU_UNLIKELY(ret == -ENODEV))