瀏覽代碼

wake up workers before shutdown

Andra Hugo 11 年之前
父節點
當前提交
9a2c69cac2
共有 5 個文件被更改,包括 48 次插入23 次删除
  1. 33 11
      src/core/sched_ctx.c
  2. 8 7
      src/core/task.c
  3. 2 0
      src/core/workers.h
  4. 4 4
      src/debug/traces/starpu_fxt.c
  5. 1 1
      tests/main/driver_api/run_driver.c

+ 33 - 11
src/core/sched_ctx.c

@@ -1,4 +1,4 @@
-/* StarPU --- Runtime system for heterogeneous multicore architectures.
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011, 2013  INRIA
  *
@@ -847,18 +847,40 @@ void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 			return;
 		}
 		STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
-		/* FIXME: */
-		/* We also need to check for config->submitting = 0 (i.e. the
-		 * user calle starpu_drivers_request_termination()), in which
-		 * case we need to set config->running to 0 and wake workers,
-		 * so they can terminate, just like
-		 * starpu_drivers_request_termination() does.
-		 *
-		 * Set FIXME to 1 in tests/main/driver_api/run_driver.c to
-		 * check it is actually fixed.
-		 */
 	}
 
+	/* We also need to check for config->submitting = 0 (i.e. the
+	 * user called starpu_drivers_request_termination()), in which
+	 * case we need to set config->running to 0 and wake workers,
+	 * so they can terminate, just like
+	 * starpu_drivers_request_termination() does.
+	 */
+
+	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
+	if(config->submitting == 0)
+	{
+		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
+		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
+		{
+			if(sched_ctx->close_callback)
+				sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
+		}
+		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
+
+		ANNOTATE_HAPPENS_AFTER(&config->running);
+		config->running = 0;
+		ANNOTATE_HAPPENS_BEFORE(&config->running);
+		int s;
+		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+		{
+			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+			{
+				_starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
+			}
+		}
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
+
 	return;
 }
 

+ 8 - 7
src/core/task.c

@@ -41,7 +41,6 @@
  * sure that no task remains !) */
 /* TODO we could make this hierarchical to avoid contention ? */
 //static starpu_pthread_cond_t submitted_cond = STARPU_PTHREAD_COND_INITIALIZER;
-static starpu_pthread_mutex_t submitted_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
 
 /* This key stores the task currently handled by the thread, note that we
  * cannot use the worker structure to store that information because it is
@@ -804,7 +803,7 @@ starpu_drivers_request_termination(void)
 {
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 
-	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
 	int nsubmitted = starpu_task_nsubmitted();
 	config->submitting = 0;
 	if (nsubmitted == 0)
@@ -822,7 +821,7 @@ starpu_drivers_request_termination(void)
 		}
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
 }
 
 int starpu_task_nsubmitted(void)
@@ -1018,16 +1017,16 @@ static void *watchdog_func(void *foo STARPU_ATTRIBUTE_UNUSED)
 	ts.tv_nsec = (timeout % 1000000) * 1000;
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
 	
-	STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
 	while (_starpu_machine_is_running())
 	{
 		int last_nsubmitted = starpu_task_nsubmitted();
 		config->watchdog_ok = 0;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
 
 		_starpu_sleep(ts);
 
-		STARPU_PTHREAD_MUTEX_LOCK(&submitted_mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
 		if (!config->watchdog_ok && last_nsubmitted
 				&& last_nsubmitted == starpu_task_nsubmitted())
 		{
@@ -1041,12 +1040,14 @@ static void *watchdog_func(void *foo STARPU_ATTRIBUTE_UNUSED)
 				fprintf(stderr,"Set the STARPU_WATCHDOG_CRASH environment variable if you want to abort the process in such a case\n");
 		}
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
 	return NULL;
 }
 
 void _starpu_watchdog_init(void) /* Initializes config->submitted_mutex, then creates the watchdog thread. */
 { /* NOTE(review): this is the only visible initialization of submitted_mutex, which other code paths also lock; presumably this runs before any of them -- confirm. */
+	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+	STARPU_PTHREAD_MUTEX_INIT(&config->submitted_mutex, NULL); /* done before thread creation: watchdog_func locks this mutex before entering its loop */
 	STARPU_PTHREAD_CREATE(&watchdog_thread, NULL, watchdog_func, NULL); /* thread entry point is watchdog_func, called with a NULL argument */
 }
 

+ 2 - 0
src/core/workers.h

@@ -311,6 +311,8 @@ struct _starpu_machine_config
 	unsigned submitting;
 
 	int watchdog_ok;
+
+	starpu_pthread_mutex_t submitted_mutex;
 };
 
 /* Three functions to manage argv, argc */

+ 4 - 4
src/debug/traces/starpu_fxt.c

@@ -515,9 +515,9 @@ static void create_paje_state_if_not_found(char *name, struct starpu_fxt_options
 	{
 #ifdef STARPU_HAVE_POTI
 		create_paje_state_color(name, "S", red, green, blue);
-		create_paje_state_color(name, "Ctx1", 255.0, 255.0, 0.0);
+		create_paje_state_color(name, "Ctx1", 255.0, 102.0, 255.0);
 		create_paje_state_color(name, "Ctx2", .0, 255.0, 0.0);
-		create_paje_state_color(name, "Ctx3", 75.0, .0, 130.0);
+		create_paje_state_color(name, "Ctx3", 255.0, 255.0, .0);
 		create_paje_state_color(name, "Ctx4", .0, 245.0, 255.0);
 		create_paje_state_color(name, "Ctx5", .0, .0, .0);
 		create_paje_state_color(name, "Ctx6", .0, .0, 128.0);
@@ -527,9 +527,9 @@ static void create_paje_state_if_not_found(char *name, struct starpu_fxt_options
 		create_paje_state_color(name, "Ctx10", 154.0, 205.0, 50.0);
 #else
 		fprintf(out_paje_file, "6	%s	S	%s	\"%f %f %f\" \n", name, name, red, green, blue);
-		fprintf(out_paje_file, "6	%s	Ctx1	%s	\"255.0 255.0 0.0\" \n", name, name);
+		fprintf(out_paje_file, "6	%s	Ctx1	%s	\"255.0 102.0 255.0\" \n", name, name);
 		fprintf(out_paje_file, "6	%s	Ctx2	%s	\".0 255.0 .0\" \n", name, name);
-		fprintf(out_paje_file, "6	%s	Ctx3	%s	\"75.0 .0 130.0\" \n", name, name);
+		fprintf(out_paje_file, "6	%s	Ctx3	%s	\"225.0 225.0 .0\" \n", name, name);
 		fprintf(out_paje_file, "6	%s	Ctx4	%s	\".0 245.0 255.0\" \n", name, name);
 		fprintf(out_paje_file, "6	%s	Ctx5	%s	\".0 .0 .0\" \n", name, name);
 		fprintf(out_paje_file, "6	%s	Ctx6	%s	\".0 .0 128.0\" \n", name, name);

+ 1 - 1
tests/main/driver_api/run_driver.c

@@ -35,7 +35,7 @@
 /* See FIXME in src/core/sched_ctx.c about starpu_drivers_request_termination.
  * This test should really use non-synchronous tasks, to properly cover all
  * needed cases. */
-#define FIXME 0
+#define FIXME 1
 
 #if defined(STARPU_USE_CPU) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL)
 static void