Browse Source

The "regenerate" flag of the task structure indicates that the task structure
should be submitted to StarPU again after it has been executed.

Cédric Augonnet 15 years ago
parent
commit
f30ae2d243
7 changed files with 157 additions and 10 deletions
  1. 6 4
      include/starpu-task.h
  2. 18 2
      src/core/jobs.c
  3. 10 3
      src/core/task.c
  4. 2 0
      src/core/task.h
  5. 1 1
      src/datawizard/memalloc.h
  6. 4 0
      tests/Makefile.am
  7. 116 0
      tests/core/regenerate.c

+ 6 - 4
include/starpu-task.h

@@ -51,9 +51,6 @@ typedef struct starpu_codelet_t {
 	uint32_t where;
 
 	/* the different implementations of the codelet */
-	//void (*cuda_func)(starpu_data_interface_t *, void *);
-	//void (*cpu_func)(starpu_data_interface_t *, void *);
-
 	void (*cuda_func)(void **, void *);
 	void (*cpu_func)(void **, void *);
 	uint8_t gordon_func;
@@ -74,7 +71,6 @@ struct starpu_task {
 
 	/* arguments managed by the DSM */
 	struct starpu_buffer_descr_t buffers[STARPU_NMAXBUFS];
-	//starpu_data_interface_t interface[STARPU_NMAXBUFS];
 	void *interface[STARPU_NMAXBUFS];
 
 	/* arguments not managed by the DSM are given as a buffer */
@@ -113,6 +109,11 @@ struct starpu_task {
 	 * behaviour. */
 	int destroy;
 
+	/* If this flag is set, the task will be re-submitted to StarPU once it
+	 * has been executed. This flag must not be set if the destroy flag is
+	 * set too. */ 
+	int regenerate;
+
 	/* this is private to StarPU, do not modify. If the task is allocated
 	 * by hand (without starpu_task_create), this field should be set to
 	 * NULL. */
@@ -135,6 +136,7 @@ struct starpu_task {
 	.execute_on_a_specific_worker = 0,		\
 	.detach = 1,					\
 	.destroy = 0,					\
+	.regenerate = 0,				\
 	.starpu_private = NULL				\
 };
 

+ 18 - 2
src/core/jobs.c

@@ -98,6 +98,13 @@ void _starpu_handle_job_termination(job_t j)
 		_starpu_set_local_worker_status(STATUS_UNKNOWN);
 	}
 
+	/* NB: we do not save those values before the callback, in case the
+	 * application changes some parameters eventually (eg. a task may not
+	 * be generated if the application is terminated). */
+	int destroy = task->destroy;
+	int detach = task->detach;
+	int regenerate = task->regenerate;
+
 	if (!task->detach)
 	{
 		/* we do not desallocate the job structure if some is going to
@@ -110,14 +117,23 @@ void _starpu_handle_job_termination(job_t j)
 	else {
 		/* no one is going to synchronize with that task so we release
  		 * the data structures now */
-		if (task->detach)
+		if (detach && !regenerate)
 			job_delete(j);
 
-		if (task->destroy)
+		if (destroy)
 			free(task);
 	}
 
 	_starpu_decrement_nsubmitted_tasks();
+
+	if (regenerate)
+	{
+		STARPU_ASSERT(detach && !destroy && !task->synchronous);
+
+		/* We reuse the same job structure */
+		int ret = _starpu_submit_job(j);
+		STARPU_ASSERT(!ret);
+	}	
 }
 
 /* This function is called when a new task is submitted to StarPU 

+ 10 - 3
src/core/task.c

@@ -51,6 +51,8 @@ void starpu_task_init(struct starpu_task *task)
 	 * tasks */
 	task->destroy = 0;
 
+	task->regenerate = 0;
+
 	task->starpu_private = NULL;
 }
 
@@ -106,6 +108,13 @@ int starpu_wait_task(struct starpu_task *task)
 	return 0;
 }
 
+int _starpu_submit_job(job_t j)
+{
+	_starpu_increment_nsubmitted_tasks();
+
+	return _starpu_enforce_deps_and_schedule(j);
+}
+
 /* application should submit new tasks to StarPU through this function */
 int starpu_submit_task(struct starpu_task *task)
 {
@@ -145,9 +154,7 @@ int starpu_submit_task(struct starpu_task *task)
 
 	task->starpu_private = j;
 
-	_starpu_increment_nsubmitted_tasks();
-
-	ret = _starpu_enforce_deps_and_schedule(j);
+	ret = _starpu_submit_job(j);
 
 	/* XXX modify when we'll have starpu_wait_task */
 	if (is_sync)

+ 2 - 0
src/core/task.h

@@ -25,4 +25,6 @@
 void _starpu_increment_nsubmitted_tasks(void);
 void _starpu_decrement_nsubmitted_tasks(void);
 
+int _starpu_submit_job(job_t j);
+
 #endif // __CORE_TASK_H__

+ 1 - 1
src/datawizard/memalloc.h

@@ -35,7 +35,7 @@ LIST_TYPE(mem_chunk,
 	/* The footprint of the data is not sufficient to determine whether two
 	 * pieces of data have the same layout (there could be collision in the
 	 * hash function ...) so we still keep a copy of the actual layout (ie.
-	 * the starpu_data_interface_t) to stay on the safe side. We make a copy of
+	 * the data interface) to stay on the safe side. We make a copy of
 	 * because when a data is deleted, the memory chunk remains.
 	 */
 	struct starpu_data_interface_ops_t *ops;

+ 4 - 0
tests/Makefile.am

@@ -78,6 +78,7 @@ check_PROGRAMS += 				\
 	core/static_restartable			\
 	core/static_restartable_using_initializer\
 	core/static_restartable_tag		\
+	core/regenerate				\
 	core/empty_task				\
 	core/empty_task_sync_point		\
 	core/tag-wait-api			\
@@ -127,6 +128,9 @@ core_static_restartable_using_initializer_SOURCES =		\
 core_static_restartable_tag_SOURCES =		\
 	core/static_restartable_tag.c
 
+core_regenerate_SOURCES =			\
+	core/regenerate.c
+
 core_empty_task_SOURCES =			\
 	core/empty_task.c
 

+ 116 - 0
tests/core/regenerate.c

@@ -0,0 +1,116 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <sys/time.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <starpu.h>
+
+static unsigned ntasks = 65536;
+static unsigned cnt = 0;
+
+static unsigned completed = 0;
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+
+static void callback(void *arg)
+{
+	struct starpu_task *task = arg;
+
+	cnt++;
+
+	if (cnt == ntasks)
+	{
+		task->regenerate = 0;
+		fprintf(stderr, "Stop !\n");
+
+		pthread_mutex_lock(&mutex);
+		completed = 1;
+		pthread_cond_signal(&cond);
+		pthread_mutex_unlock(&mutex);
+	}
+}
+
+static void dummy_func(void *descr[] __attribute__ ((unused)), void *arg __attribute__ ((unused)))
+{
+}
+
+static starpu_codelet dummy_codelet = 
+{
+	.where = STARPU_CPU|STARPU_CUDA,
+	.cpu_func = dummy_func,
+	.cuda_func = dummy_func,
+	.model = NULL,
+	.nbuffers = 0
+};
+
+static void parse_args(int argc, char **argv)
+{
+	int c;
+	while ((c = getopt(argc, argv, "i:")) != -1)
+	switch(c) {
+		case 'i':
+			ntasks = atoi(optarg);
+			break;
+	}
+}
+
+int main(int argc, char **argv)
+{
+	unsigned i;
+	double timing;
+	struct timeval start;
+	struct timeval end;
+
+	parse_args(argc, argv);
+
+	starpu_init(NULL);
+
+	struct starpu_task task;
+
+	starpu_task_init(&task);
+
+	task.cl = &dummy_codelet;
+	task.regenerate = 1;
+	task.detach = 1;
+
+	task.callback_func = callback;
+	task.callback_arg = &task;
+
+	fprintf(stderr, "#tasks : %d\n", ntasks);
+
+	gettimeofday(&start, NULL);
+
+	starpu_submit_task(&task);
+
+	pthread_mutex_lock(&mutex);
+	if (!completed)
+		pthread_cond_wait(&cond, &mutex);
+	pthread_mutex_unlock(&mutex);
+
+	gettimeofday(&end, NULL);
+
+	timing = (double)((end.tv_sec - start.tv_sec)*1000000
+				+ (end.tv_usec - start.tv_usec));
+
+	fprintf(stderr, "Total: %lf secs\n", timing/1000000);
+	fprintf(stderr, "Per task: %lf usecs\n", timing/ntasks);
+
+	starpu_shutdown();
+
+	return 0;
+}