浏览代码

Fault tolerance support with starpu_task_failed()

Samuel Thibault 6 年之前
父节点
当前提交
beb8a3d0b9

+ 1 - 0
ChangeLog

@@ -19,6 +19,7 @@
 StarPU 1.4.0 (svn revision xxxx)
 ==============================================
 New features:
+  * Fault tolerance support with starpu_task_failed().
 
 StarPU 1.3.0 (svn revision xxxx)
 ==============================================

+ 2 - 1
doc/doxygen/Makefile.am

@@ -2,7 +2,7 @@
 #
 # Copyright (C) 2013-2018                                Inria
 # Copyright (C) 2010-2019                                CNRS
-# Copyright (C) 2009,2011,2013,2014,2017                 Université de Bordeaux
+# Copyright (C) 2009,2011,2013,2014,2017,2019            Université de Bordeaux
 #
 # StarPU is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -72,6 +72,7 @@ chapters =	\
 	chapters/390_faq.doxy		\
 	chapters/401_out_of_core.doxy		\
 	chapters/410_mpi_support.doxy		\
+	chapters/415_fault_tolerance.doxy	\
 	chapters/420_fft_support.doxy		\
 	chapters/430_mic_scc_support.doxy		\
 	chapters/440_c_extensions.doxy		\

+ 2 - 1
doc/doxygen/chapters/000_introduction.doxy

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2010-2019                                CNRS
  * Copyright (C) 2011-2013,2016                           Inria
- * Copyright (C) 2009-2011,2014,2016                      Université de Bordeaux
+ * Copyright (C) 2009-2011,2014,2016,2019                 Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -291,6 +291,7 @@ The documentation chapters include
 <ul>
 <li> \ref OutOfCore
 <li> \ref MPISupport
+<li> \ref FaultTolerance
 <li> \ref FFTSupport
 <li> \ref MICSCCSupport
 <li> \ref cExtensions

+ 32 - 0
doc/doxygen/chapters/415_fault_tolerance.doxy

@@ -0,0 +1,32 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2019                                     Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*! \page FaultTolerance Fault Tolerance
+
+\section Introduction Introduction
+
+Due to e.g. hardware error, some tasks may fail, or even complete nodes may fail.
+
+\section TaskRetry Retrying tasks
+
+In case a task implementation notices that it failed to compute properly, it can
+call starpu_task_failed() to notify StarPU of the failure. StarPU will then
+clear the failure and re-execute the task, until an execution succeeds.
+
+This can only work if the task inputs are not modified, and is thus not supported
+for tasks with data access mode ::STARPU_RW.
+
+*/

+ 6 - 1
doc/doxygen/refman.tex

@@ -2,7 +2,7 @@
 %
 % Copyright (C) 2013-2016,2018                           Inria
 % Copyright (C) 2013-2019                                CNRS
-% Copyright (C) 2014,2018                                Université de Bordeaux
+% Copyright (C) 2014,2018-2019                                Université de Bordeaux
 % Copyright (C) 2013                                     Simon Archipoff
 %
 % StarPU is free software; you can redistribute it and/or modify
@@ -154,6 +154,11 @@ Documentation License”.
 \hypertarget{MPISupport}{}
 \input{MPISupport}
 
+\chapter{Fault Tolerance}
+\label{FaultTolerance}
+\hypertarget{FaultTolerance}{}
+\input{FaultTolerance}
+
 \chapter{FFT Support}
 \label{FFTSupport}
 \hypertarget{FFTSupport}{}

+ 4 - 2
include/starpu_profiling.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2014,2016,2017                      Université de Bordeaux
- * Copyright (C) 2010,2011,2013,2015,2017,2019                 CNRS
+ * Copyright (C) 2010-2014,2016,2017,2019                 Université de Bordeaux
+ * Copyright (C) 2010,2011,2013,2015,2017,2019            CNRS
  * Copyright (C) 2016                                     Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -118,6 +118,8 @@ struct starpu_profiling_worker_info
 	/** Energy consumed by the worker, in Joules */
 	double energy_consumed;
 
+	/* TODO: add wasted time due to failed tasks */
+
 	double flops;
 };
 

+ 18 - 1
include/starpu_task.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2017                                Inria
- * Copyright (C) 2009-2018                                Université de Bordeaux
+ * Copyright (C) 2009-2019                                Université de Bordeaux
  * Copyright (C) 2010-2015,2017,2018,2019                 CNRS
  * Copyright (C) 2011                                     Télécom-SudParis
  * Copyright (C) 2016                                     Uppsala University
@@ -870,6 +870,11 @@ struct starpu_task
 	unsigned no_submitorder:1;
 
 	/**
+	   Whether this task has failed and will thus have to be retried
+	*/
+	unsigned failed:1;
+
+	/**
 	   Whether the scheduler has pushed the task on some queue
 	*/
 	unsigned scheduled:1;
@@ -1406,6 +1411,18 @@ unsigned starpu_task_get_implementation(struct starpu_task *task);
  */
 void starpu_create_sync_task(starpu_tag_t sync_tag, unsigned ndeps, starpu_tag_t *deps, void (*callback)(void *), void *callback_arg);
 
+/**
+   Record that this task failed, and should thus be retried.
+   This is usually called from the task codelet function itself, after checking
+   the result and noticing that the computation went wrong, and thus the task
+   should be retried. The performance of this task execution will not be
+   recorded for performance models.
+
+   This can only be called for a task whose data access modes are all either
+   STARPU_R or STARPU_W.
+ */
+void starpu_task_failed(struct starpu_task *task);
+
 /** @} */
 
 #ifdef __cplusplus

+ 2 - 1
src/core/dependencies/implicit_data_deps.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011,2012,2016                           Inria
- * Copyright (C) 2010-2018                                Université de Bordeaux
+ * Copyright (C) 2010-2019                                Université de Bordeaux
  * Copyright (C) 2010-2013,2015-2018                      CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -548,6 +548,7 @@ void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j)
 		/* Release the reference acquired in _starpu_push_task_output */
 		_starpu_spin_lock(&handle->header_lock);
 		STARPU_ASSERT(handle->busy_count > 0);
+		handle->refcnt--;
 		handle->busy_count--;
 		if (!_starpu_data_check_not_busy(handle))
 			_starpu_spin_unlock(&handle->header_lock);

+ 29 - 8
src/core/jobs.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2017                                Inria
- * Copyright (C) 2008-2018                                Université de Bordeaux
+ * Copyright (C) 2008-2019                                Université de Bordeaux
  * Copyright (C) 2010-2018                                CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
  * Copyright (C) 2011                                     Télécom-SudParis
@@ -277,19 +277,41 @@ void starpu_task_end_dep_add(struct starpu_task *t, int nb_deps)
 
 void _starpu_handle_job_termination(struct _starpu_job *j)
 {
-	if (j->task->nb_termination_call_required != 0)
+	struct starpu_task *task = j->task;
+	unsigned sched_ctx = task->sched_ctx;
+	double flops = task->flops;
+
+	if (task->nb_termination_call_required != 0)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-		int nb = j->task->nb_termination_call_required;
-		j->task->nb_termination_call_required -= 1;
+		int nb = task->nb_termination_call_required;
+		task->nb_termination_call_required -= 1;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 		if (nb != 0) return;
 	}
 
-	struct starpu_task *task = j->task;
+	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx, flops);
+
+	if (task->failed)
+	{
+		/* Oops, try again */
+		task->failed = 0;
+
+		/* Tell the scheduler we finished this task */
+		_starpu_sched_post_exec_hook(task);
+#ifdef STARPU_USE_SC_HYPERVISOR
+		int workerid = starpu_worker_get_id();
+		_starpu_sched_ctx_post_exec_task_cb(workerid, task, data_size, j->footprint);
+#endif //STARPU_USE_SC_HYPERVISOR
+
+		/* But actually re-push it for execution, as if it just came out
+		 * of data dependency check */
+		task->status = STARPU_TASK_BLOCKED_ON_DATA;
+		_starpu_repush_task(j);
+		return;
+	}
+
 	struct starpu_task *end_rdep = NULL;
-	unsigned sched_ctx = task->sched_ctx;
-	double flops = task->flops;
 	const unsigned continuation =
 #ifdef STARPU_OPENMP
 		j->continuation
@@ -535,7 +557,6 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		}
 	}
 
-	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx, flops);
 	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
 	struct _starpu_worker *worker;
 	worker = _starpu_get_local_worker_key();

+ 15 - 1
src/core/task.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2018                                Inria
- * Copyright (C) 2009-2018                                Université de Bordeaux
+ * Copyright (C) 2009-2019                                Université de Bordeaux
  * Copyright (C) 2017                                     Erwan Leria
  * Copyright (C) 2010-2018                                CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
@@ -1376,3 +1376,17 @@ void _starpu_watchdog_shutdown(void)
 
 	STARPU_PTHREAD_JOIN(watchdog_thread, NULL);
 }
+
+/* Record that \p task failed, so that it gets executed again.
+ *
+ * This only raises task->failed; the flag is read back when the job
+ * terminates, where it is cleared and the task is re-pushed for execution,
+ * and by the driver feedback code, which then skips the performance model
+ * update for this execution.
+ *
+ * \p task must not be NULL, and each of its buffers must be accessed with
+ * exactly STARPU_R or STARPU_W: any other mode aborts through
+ * STARPU_ASSERT_MSG.
+ */
+void starpu_task_failed(struct starpu_task *task)
+{
+	unsigned nbuffers;
+	unsigned i;
+
+	/* Fail with an explicit message instead of dereferencing a NULL task below */
+	STARPU_ASSERT_MSG(task, "starpu_task_failed must be called with a non-NULL task");
+
+	nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+
+	/* A retry re-runs the task on the same buffers, so check every access mode */
+	for (i = 0; i < nbuffers; i++)
+	{
+		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, i);
+		STARPU_ASSERT_MSG (mode == STARPU_R || mode == STARPU_W,
+				"starpu_task_failed is only supported for tasks with access modes STARPU_R and STARPU_W");
+	}
+	task->failed = 1;
+}

+ 1 - 0
src/datawizard/coherency.c

@@ -1307,6 +1307,7 @@ void __starpu_push_task_output(struct _starpu_job *j)
 		/* Keep a reference for future
 		 * _starpu_release_task_enforce_sequential_consistency call */
 		_starpu_spin_lock(&handle->header_lock);
+		handle->refcnt++;
 		handle->busy_count++;
 
 		if (node == -1)

+ 9 - 3
src/drivers/driver_common/driver_common.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2017                                Inria
- * Copyright (C) 2010-2018                                Université de Bordeaux
+ * Copyright (C) 2010-2019                                Université de Bordeaux
  * Copyright (C) 2010-2017                                CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
  * Copyright (C) 2011                                     Télécom-SudParis
@@ -247,9 +247,12 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 				do_update_time_model = 1;
 			}
 #else
-			const unsigned do_update_time_model = 1;
+			unsigned do_update_time_model = 1;
 			const double time_consumed = measured;
 #endif
+			if (j->task->failed)
+				/* Do not record perfmodel for failed tasks, they may terminate earlier */
+				do_update_time_model = 0;
 			if (do_update_time_model)
 			{
 				_starpu_update_perfmodel_history(j, j->task->cl->model, perf_arch, worker->devid, time_consumed, j->nimpl);
@@ -280,9 +283,12 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 		}
 #else
 		const double energy_consumed = profiling_info->energy_consumed;
-		const unsigned do_update_energy_model = 1;
+		unsigned do_update_energy_model = 1;
 #endif
 
+		if (j->task->failed)
+			/* Do not record perfmodel for failed tasks, they may terminate earlier */
+			do_update_energy_model = 0;
 		if (do_update_energy_model)
 		{
 			_starpu_update_perfmodel_history(j, j->task->cl->energy_model, perf_arch, worker->devid, energy_consumed, j->nimpl);

+ 1 - 0
tests/Makefile.am

@@ -335,6 +335,7 @@ myPROGRAMS +=				\
 	disk/mem_reclaim			\
 	errorcheck/invalid_blocking_calls	\
 	errorcheck/workers_cpuid		\
+	fault-tolerance/retry			\
 	helper/starpu_data_cpy			\
 	helper/starpu_create_sync_task		\
 	microbenchs/async_tasks_overhead	\

+ 88 - 0
tests/fault-tolerance/retry.c

@@ -0,0 +1,88 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011-2013,2015,2017                      CNRS
+ * Copyright (C) 2017                                     Inria
+ * Copyright (C) 2019                                     Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include "../helper.h"
+
+extern void cuda_host_increment(void *descr[], void *_args);
+
+static int retry;
+/* CPU kernel of the test: store "first buffer + 1" into the second buffer,
+ * then pretend the computation went wrong as long as the global retry
+ * counter has not gone past 10. */
+void cpu_increment(void *descr[], void *arg)
+{
+	unsigned *input = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	unsigned *output = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[1]);
+
+	(void)arg;
+
+	FPRINTF(stderr,"computing\n");
+	*output = 1 + *input;
+
+	if (retry > 10)
+	{
+		/* Enough fake failures: let this execution count as a success */
+		return;
+	}
+
+	FPRINTF(stderr,"failing\n");
+	retry++;
+	/* Fake failure */
+	starpu_task_failed(starpu_task_get_current());
+}
+
+/* Codelet under test: only a CPU implementation is provided; the first
+ * buffer is accessed in STARPU_R mode and the second one in STARPU_W mode. */
+static struct starpu_codelet my_codelet =
+{
+	.nbuffers = 2,
+	.modes = { STARPU_R, STARPU_W },
+	.cpu_funcs_name = {"cpu_increment"},
+	.cpu_funcs = {cpu_increment}
+};
+
+/* Submit a single task whose kernel fakes several failures before it
+ * succeeds, then check that the input variable was left untouched and that
+ * the output variable holds the expected value. */
+int main(void)
+{
+	starpu_data_handle_t h_x, h_y;
+	int x = 12;
+	int y = 1;
+	int ret, ret1;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+	{
+		return STARPU_TEST_SKIPPED;
+	}
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_variable_data_register(&h_x, STARPU_MAIN_RAM, (uintptr_t)&x, sizeof(x));
+	starpu_variable_data_register(&h_y, STARPU_MAIN_RAM, (uintptr_t)&y, sizeof(y));
+
+	/* Start from a clean failure count before submitting the task */
+	retry = 0;
+	ret1 = starpu_task_insert(&my_codelet, STARPU_R, h_x, STARPU_W, h_y, 0);
+	if (ret1 != -ENODEV)
+	{
+		STARPU_CHECK_RETURN_VALUE(ret1, "starpu_task_insert");
+	}
+	starpu_task_wait_for_all();
+
+	starpu_data_unregister(h_x);
+	starpu_data_unregister(h_y);
+
+	starpu_shutdown();
+
+	/* The input must come out unchanged, however many times the task ran */
+	ret = (x == 12) ? ret : 1;
+	FPRINTF(stderr, "Value x = %d (expected 12)\n", x);
+
+	/* The output is only checked when the task could actually be submitted */
+	if (ret1 == -ENODEV)
+	{
+		STARPU_RETURN(ret);
+	}
+
+	if (y != 13)
+	{
+		ret = 1;
+	}
+	FPRINTF(stderr, "Value y = %d (expected 13)\n", y);
+
+	STARPU_RETURN(ret);
+}