Преглед на файлове

Patch from Benoît Lizé: Add pause/resume functions

Samuel Thibault преди 11 години
родител
ревизия
353d5502b5
променени са 8 файла, в които са добавени 201 реда и са изтрити 2 реда
  1. 1 0
      AUTHORS
  2. 36 0
      doc/doxygen/chapters/14faq.doxy
  3. 12 0
      doc/doxygen/chapters/api/initialization.doxy
  4. 3 0
      include/starpu.h
  5. 32 1
      src/core/workers.c
  6. 4 0
      src/core/workers.h
  7. 2 1
      tests/Makefile.am
  8. 111 0
      tests/main/pause_resume.c

+ 1 - 0
AUTHORS

@@ -12,6 +12,7 @@ Sylvain Henry <sylvain.henry@inria.fr>
 Andra Hugo <andra.hugo@inria.fr>
 Mehdi Juhoor <mjuhoor@gmail.com>
 Xavier Lacoste <xavier.lacoste@inria.fr>
+Benoît Lizé <benoit.lize@gmail.com>
 Antoine Lucas <antoine.lucas.33@gmail.com>
 Brice Mortier <brice.mortier@etu.u-bordeaux1.fr>
 Joris Pablo <joris.pablo@orange.fr>

+ 36 - 0
doc/doxygen/chapters/14faq.doxy

@@ -190,4 +190,40 @@ Or add the following line in the file <c>/etc/sysctl.conf</c>
 security.models.extensions.user_set_cpu_affinity=1
 \endverbatim
 
+
+\section PauseResume Interleaving StarPU and non-StarPU code
+
+If your application only partially uses StarPU, and you do not want to
+call \ref starpu_init() / \ref starpu_shutdown() at the beginning/end
+of each section, StarPU workers will poll for work between the
+sections. To avoid this behavior, you can "pause" StarPU with the \ref
+starpu_pause() function. This will prevent the StarPU workers from
+accepting new work (tasks that are already in progress will not be
+frozen), and stop them from polling for more work.
+
+Note that this does not prevent you from submitting new tasks, but
+they won't execute until \ref starpu_resume() is called. Also note
+that StarPU must not be paused when you call starpu_shutdown(), and
+that this function pair works in a push/pull manner, ie you need to
+match the number of calls to these functions to clear their effect.
+
+
+One way to use these functions could be:
+\code{.c}
+starpu_init(NULL);
+starpu_pause(); // To submit all the tasks without a single one executing
+submit_some_tasks();
+starpu_resume(); // The tasks start executing
+
+
+starpu_task_wait_for_all();
+starpu_pause(); // Stop the workers from polling
+
+// Non-StarPU code
+
+starpu_resume();
+// ...
+starpu_shutdown();
+\endcode
+
 */

+ 12 - 0
doc/doxygen/chapters/api/initialization.doxy

@@ -241,6 +241,18 @@ This is StarPU termination method. It must be called at the end of the
 application: statistics and other post-mortem debugging information
 are not guaranteed to be available until this method has been called.
 
+\fn void starpu_pause(void)
+\ingroup API_Initialization_and_Termination
+This call is used to suspend the processing of new tasks by
+workers. It can be used in a program where StarPU is used during only
+a part of the execution. Without this call, the workers continue to
+poll for new tasks in a tight loop, wasting CPU time. The symmetric
+call to \ref starpu_resume() should be used to unfreeze the workers.
+
+\fn vois starpu_resume(void)
+This is the symmetrical call to \ref starpu_pause(), used to resume
+the workers polling for new tasks.
+
 \fn int starpu_asynchronous_copy_disabled(void)
 \ingroup API_Initialization_and_Termination
 Return 1 if asynchronous data transfers between CPU and accelerators

+ 3 - 0
include/starpu.h

@@ -131,6 +131,9 @@ int starpu_init(struct starpu_conf *conf) STARPU_WARN_UNUSED_RESULT;
 
 int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv);
 
+void starpu_pause();
+void starpu_resume();
+
 void starpu_shutdown(void);
 
 void starpu_topology_print(FILE *f);

+ 32 - 1
src/core/workers.c

@@ -489,6 +489,7 @@ void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key)
 static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 {
 	pconfig->running = 1;
+	pconfig->pause_depth = 0;
 	pconfig->submitting = 1;
 	STARPU_HG_DISABLE_CHECKING(pconfig->watchdog_ok);
 
@@ -1178,17 +1179,44 @@ out:
 	}
 }
 
+/* Condition variable and mutex used to pause/resume. */
+static starpu_pthread_cond_t pause_cond = STARPU_PTHREAD_COND_INITIALIZER;
+static starpu_pthread_mutex_t pause_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
 unsigned _starpu_machine_is_running(void)
 {
 	unsigned ret;
-	/* running is just protected by a memory barrier */
+	/* running and pause_depth are just protected by a memory barrier */
 	STARPU_RMB();
+
+	if (STARPU_UNLIKELY(config.pause_depth > 0)) {
+		STARPU_PTHREAD_MUTEX_LOCK(&pause_mutex);
+		if (config.pause_depth > 0) {
+			STARPU_PTHREAD_COND_WAIT(&pause_cond, &pause_mutex);
+		}
+		STARPU_PTHREAD_MUTEX_UNLOCK(&pause_mutex);
+	}
+
 	ANNOTATE_HAPPENS_AFTER(&config.running);
 	ret = config.running;
 	ANNOTATE_HAPPENS_BEFORE(&config.running);
 	return ret;
 }
 
+void starpu_pause()
+{
+	config.pause_depth += 1;
+}
+
+void starpu_resume()
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&pause_mutex);
+	config.pause_depth -= 1;
+	if (!config.pause_depth) {
+		STARPU_PTHREAD_COND_BROADCAST(&pause_cond);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&pause_mutex);
+}
+
 unsigned _starpu_worker_can_block(unsigned memnode STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_NON_BLOCKING_DRIVERS
@@ -1243,6 +1271,9 @@ void starpu_shutdown(void)
 	initialized = CHANGING;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
 
+	/* If the workers are frozen, no progress can be made. */
+	STARPU_ASSERT(config.pause_depth <= 0);
+
 	starpu_task_wait_for_no_ready();
 
 	/* tell all workers to shutdown */

+ 4 - 0
src/core/workers.h

@@ -316,6 +316,10 @@ struct _starpu_machine_config
 	/* this flag is set until the runtime is stopped */
 	unsigned running;
 
+	/* Number of calls to starpu_pause() - calls to starpu_resume(). When >0,
+	 * StarPU should pause. */
+	int pause_depth;
+
 	/* all the sched ctx of the current instance of starpu */
 	struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS];
 

+ 2 - 1
tests/Makefile.am

@@ -1,6 +1,6 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2009-2013  Université de Bordeaux 1
+# Copyright (C) 2009-2014  Université de Bordeaux 1
 # Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
 # Copyright (C) 2010, 2011, 2012  Institut National de Recherche en Informatique et Automatique
 #
@@ -144,6 +144,7 @@ noinst_PROGRAMS =				\
 	main/starpu_init			\
 	main/starpu_worker_exists		\
 	main/submit				\
+	main/pause_resume			\
 	main/codelet_null_callback		\
 	datawizard/allocate			\
 	datawizard/acquire_cb			\

+ 111 - 0
tests/main/pause_resume.c

@@ -0,0 +1,111 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2011, 2013  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <sys/time.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <starpu.h>
+#include "../helper.h"
+
+#ifdef STARPU_QUICK_CHECK
+static unsigned ntasks = 64;
+#else
+static unsigned ntasks = 200000;
+#endif
+
+static void dummy_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STARPU_ATTRIBUTE_UNUSED)
+{
+}
+
+static struct starpu_codelet dummy_codelet =
+{
+	.cpu_funcs = {dummy_func, NULL},
+	.cuda_funcs = {dummy_func, NULL},
+	.opencl_funcs = {dummy_func, NULL},
+	.model = NULL,
+	.nbuffers = 0
+};
+
+
+int main(int argc, char **argv)
+{
+	double timing;
+	struct timeval start;
+	struct timeval end;
+	int ret;
+
+#ifdef STARPU_HAVE_VALGRIND_H
+	if(RUNNING_ON_VALGRIND) ntasks = 5;
+#endif
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	/* Check that we can submit tasks to a "paused" StarPU and then have
+	 * it run normally.
+	 */
+	starpu_pause();
+	unsigned i;
+	for (i = 0; i < ntasks; i++)
+	{
+		ret = starpu_insert_task(&dummy_codelet, 0);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+	}
+
+	gettimeofday(&start, NULL);
+	starpu_resume();
+	starpu_task_wait_for_all();
+	gettimeofday(&end, NULL);
+	timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
+
+	FPRINTF(stderr, "Without interruptions:\n\tTotal: %f secs\n", timing/1000000);
+	FPRINTF(stderr, "\tPer task: %f usecs\n", timing/ntasks);
+
+	/* Do the same thing, but with a lot of interuptions to see if there
+	 * is any overhead associated with the pause/resume calls.
+	 */
+	starpu_pause();
+	for (i = 0; i < ntasks; i++) {
+		ret = starpu_insert_task(&dummy_codelet, 0);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+	}
+	starpu_resume();
+
+	gettimeofday(&start, NULL);
+	for (i = 0; i < 100; i++) {
+		starpu_pause();
+		starpu_resume();
+	}
+	starpu_task_wait_for_all();
+	gettimeofday(&end, NULL);
+	timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
+
+	FPRINTF(stderr, "With 100 interruptions:\n\tTotal: %f secs\n", timing/1000000);
+	FPRINTF(stderr, "\tPer task: %f usecs\n", timing/ntasks);
+
+	/* Finally, check that the nesting of pause/resume calls works. */
+	starpu_pause();
+	starpu_pause();
+	starpu_resume();
+	starpu_resume();
+
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+}