Переглянути джерело

Add fault tolerance support with starpu_task_ft_failed()

Samuel Thibault 6 роки тому
батько
коміт
f87952be4c

+ 1 - 0
ChangeLog

@@ -19,6 +19,7 @@
 StarPU 1.4.0 (svn revision xxxx)
 ==============================================
 New features:
+  * Fault tolerance support with starpu_task_ft_failed().
 
 StarPU 1.3.2 (git revision xxx)
 ==============================================

+ 2 - 1
doc/doxygen/Makefile.am

@@ -2,7 +2,7 @@
 #
 # Copyright (C) 2013-2018                                Inria
 # Copyright (C) 2010-2019                                CNRS
-# Copyright (C) 2009,2011,2013,2014,2017                 Université de Bordeaux
+# Copyright (C) 2009,2011,2013,2014,2017,2019            Université de Bordeaux
 #
 # StarPU is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -72,6 +72,7 @@ chapters =	\
 	chapters/390_faq.doxy		\
 	chapters/401_out_of_core.doxy		\
 	chapters/410_mpi_support.doxy		\
+	chapters/415_fault_tolerance.doxy	\
 	chapters/420_fft_support.doxy		\
 	chapters/430_mic_support.doxy		\
 	chapters/450_native_fortran_support.doxy		\

+ 2 - 1
doc/doxygen/chapters/000_introduction.doxy

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2010-2019                                CNRS
  * Copyright (C) 2011-2013,2016                           Inria
- * Copyright (C) 2009-2011,2014,2016                      Université de Bordeaux
+ * Copyright (C) 2009-2011,2014,2016,2019                 Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -290,6 +290,7 @@ The documentation chapters include
 <ul>
 <li> \ref OutOfCore
 <li> \ref MPISupport
+<li> \ref FaultTolerance
 <li> \ref FFTSupport
 <li> \ref MICSupport
 <li> \ref NativeFortranSupport

+ 48 - 0
doc/doxygen/chapters/415_fault_tolerance.doxy

@@ -0,0 +1,48 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2019                                     Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*! \page FaultTolerance Fault Tolerance
+
+\section Introduction Introduction
+
+Due to e.g. hardware error, some tasks may fail, or even complete nodes may
+fail.  For now, StarPU provides some support for failure of tasks.
+
+\section TaskRetry Retrying tasks
+
+In case a task implementation notices that it fail to compute properly, it can
+call starpu_task_failed() to notify StarPU of the failure.
+
+<c>tests/fault-tolerance/retry.c</c> is an example of coping with such failure:
+the principle is that when submitting the task, one sets its prologue callback
+to starpu_task_ft_prologue(). That prologue will turn the task into a meta
+task which will manage the repeated submission of try-tasks to perform the
+computation until one of the computations succeeds.
+
+By default, try-tasks will be just retried until one of them succeeds (i.e. the
+task implementation does not call starpu_task_failed()). One can change the
+behavior by passing a <c>check_failsafe</c> function as prologue parameter,
+which will be called at the end of the try-task attempt. It can look at
+<c>starpu_task_get_current()->failed</c> to determine whether the try-task
+suceeded, in which case it can call starpu_task_ft_success() on the meta-task to
+notify success, or if it failed, in which case it can call
+starpu_task_failsafe_create_retry() to create another try-task, and submit it
+with starpu_task_submit_nodeps().
+
+This can however only work if the task input are not modified, and is thus not
+supported for tasks with data access mode ::STARPU_RW.
+
+*/

+ 6 - 1
doc/doxygen/refman.tex

@@ -2,7 +2,7 @@
 %
 % Copyright (C) 2013-2016,2018                           Inria
 % Copyright (C) 2013-2019                                CNRS
-% Copyright (C) 2014,2018                                Université de Bordeaux
+% Copyright (C) 2014,2018-2019                                Université de Bordeaux
 % Copyright (C) 2013                                     Simon Archipoff
 %
 % StarPU is free software; you can redistribute it and/or modify
@@ -154,6 +154,11 @@ Documentation License”.
 \hypertarget{MPISupport}{}
 \input{MPISupport}
 
+\chapter{Fault Tolerance}
+\label{FaultTolerance}
+\hypertarget{FaultTolerance}{}
+\input{FaultTolerance}
+
 \chapter{FFT Support}
 \label{FFTSupport}
 \hypertarget{FFTSupport}{}

+ 4 - 2
include/starpu_profiling.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2014,2016,2017                      Université de Bordeaux
- * Copyright (C) 2010,2011,2013,2015,2017,2019                 CNRS
+ * Copyright (C) 2010-2014,2016,2017,2019                 Université de Bordeaux
+ * Copyright (C) 2010,2011,2013,2015,2017,2019            CNRS
  * Copyright (C) 2016                                     Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -118,6 +118,8 @@ struct starpu_profiling_worker_info
 	/** Energy consumed by the worker, in Joules */
 	double energy_consumed;
 
+	/* TODO: add wasted time due to failed tasks */
+
 	double flops;
 };
 

+ 67 - 0
include/starpu_task.h

@@ -916,6 +916,13 @@ struct starpu_task
 	unsigned no_submitorder:1;
 
 	/**
+	   Whether this task has failed and will thus have to be retried
+
+	   Set by StarPU.
+	*/
+	unsigned failed:1;
+
+	/**
 	   Whether the scheduler has pushed the task on some queue
 
 	   Set by StarPU.
@@ -1345,6 +1352,15 @@ void starpu_task_destroy(struct starpu_task *task);
 int starpu_task_submit(struct starpu_task *task) STARPU_WARN_UNUSED_RESULT;
 
 /**
+   Submit \p task to StarPU with dependency bypass.
+
+   This can only be called on behalf of another task which has already taken the
+   proper dependencies, e.g. this task is just an attempt of doing the actual
+   computation of that task.
+*/
+int starpu_task_submit_nodeps(struct starpu_task *task) STARPU_WARN_UNUSED_RESULT;
+
+/**
    Submit \p task to the context \p sched_ctx_id. By default,
    starpu_task_submit() submits the task to a global context that is
    created automatically by StarPU.
@@ -1501,6 +1517,57 @@ unsigned starpu_task_get_implementation(struct starpu_task *task);
  */
 void starpu_create_sync_task(starpu_tag_t sync_tag, unsigned ndeps, starpu_tag_t *deps, void (*callback)(void *), void *callback_arg);
 
+
+
+
+/**
+   Function to be used as a prologue callback to enable fault tolerance for the
+   task. This prologue will create a try-task, i.e a duplicate of the task,
+   which will to the actual computation.
+
+   The prologue argument can be set to a check_ft function that will be
+   called on termination of the duplicate, which can check the result of the
+   task, and either confirm success, or resubmit another attempt.
+   If it is not set, the default implementation is to just resubmit a new
+   try-task.
+ */
+void starpu_task_ft_prologue(void *check_ft);
+
+
+/**
+   Create a try-task for a \p meta_task, given a \p template_task task
+   template. The meta task can be passed as template on the first call, but
+   since it is mangled by starpu_task_ft_create_retry(), further calls
+   (typically made by the check_ft callback) need to be passed the previous
+   try-task as template task.
+
+   \p check_ft is similar to the prologue argument of
+   starpu_task_ft_prologue(), and is typicall set to the very function calling
+   starpu_task_ft_create_retry().
+
+   The try-task is returned, and can be modified (e.g. to change scheduling
+   parameters) before being submitted with starpu_task_submit_nodeps().
+ */
+struct starpu_task * starpu_task_ft_create_retry(const struct starpu_task *meta_task, const struct starpu_task *template_task, void (*check_ft)(void*));
+
+/**
+   Record that this task failed, and should thus be retried.
+   This is usually called from the task codelet function itself, after checking
+   the result and noticing that the computation went wrong, and thus the task
+   should be retried. The performance of this task execution will not be
+   recorded for performance models.
+
+   This can only be called for a task whose data access modes are either
+   STARPU_R and STARPU_W.
+ */
+void starpu_task_ft_failed(struct starpu_task *task);
+
+/**
+   Notify that the try-task was successful and thus the meta-task was
+   successful.
+ */
+void starpu_task_ft_success(struct starpu_task *meta_task);
+
 /** @} */
 
 #ifdef __cplusplus

+ 114 - 3
src/core/dependencies/data_concurrency.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2013,2015-2017                           Inria
- * Copyright (C) 2009-2015,2017,2018                      Université de Bordeaux
+ * Copyright (C) 2009-2015,2017,2018-2019                      Université de Bordeaux
  * Copyright (C) 2010-2013,2015,2017,2018,2019            CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -241,6 +241,60 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 
 }
 
+/* Take a data, without waiting for it to be available (it is assumed to be).
+ * This is typicall used for nodeps tasks, for which a previous task has already
+ * waited for the proper conditions, and we just need to take another reference
+ * for overall reference coherency.
+/* No lock is held, this acquires and releases the handle header lock */
+static void _starpu_take_data(unsigned request_from_codelet,
+						       starpu_data_handle_t handle, enum starpu_data_access_mode mode,
+						       struct _starpu_job *j)
+{
+	STARPU_ASSERT_MSG(!handle->arbiter, "TODO");
+
+	/* Do not care about some flags */
+	mode &= ~STARPU_COMMUTE;
+	mode &= ~STARPU_SSEND;
+	mode &= ~STARPU_LOCALITY;
+	if (mode == STARPU_RW)
+		mode = STARPU_W;
+
+	/* Take the lock protecting the header. We try to do some progression
+	 * in case this is called from a worker, otherwise we just wait for the
+	 * lock to be available. */
+	if (request_from_codelet)
+	{
+		int cpt = 0;
+		while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
+		{
+			cpt++;
+			_starpu_datawizard_progress(0);
+		}
+		if (cpt == STARPU_SPIN_MAXTRY)
+			_starpu_spin_lock(&handle->header_lock);
+	}
+	else
+	{
+		_starpu_spin_lock(&handle->header_lock);
+	}
+
+	/* If we are currently performing a reduction, we freeze any request
+	 * that is not explicitely a reduction task. */
+	unsigned is_a_reduction_task = (request_from_codelet && j && j->reduction_task);
+
+	STARPU_ASSERT_MSG(!is_a_reduction_task, "TODO");
+
+	enum starpu_data_access_mode previous_mode = handle->current_mode;
+
+	STARPU_ASSERT_MSG(mode == previous_mode, "mode was %d, but requested %d", previous_mode, mode);
+
+	handle->refcnt++;
+	handle->busy_count++;
+
+	_starpu_spin_unlock(&handle->header_lock);
+}
+
+
 /* No lock is held */
 unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle_t handle, enum starpu_data_access_mode mode,
 							  void (*callback)(void *), void *argcb)
@@ -260,7 +314,7 @@ static unsigned attempt_to_submit_data_request_from_job(struct _starpu_job *j, u
 	return _starpu_attempt_to_submit_data_request(1, handle, mode, NULL, NULL, j, buffer_index);
 }
 
-/* Acquire all data of the given job, one by one in handle pointer value order
+/* Try to acquire all data of the given job, one by one in handle pointer value order
  */
 /* No lock is held */
 static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned start_buffer_index)
@@ -301,6 +355,50 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 	return 0;
 }
 
+static void take_data_from_job(struct _starpu_job *j, unsigned buffer_index)
+{
+	/* Note that we do not access j->task->handles, but j->ordered_buffers
+	 * which is a sorted copy of it. */
+	struct _starpu_data_descr *buffer = &(_STARPU_JOB_GET_ORDERED_BUFFERS(j)[buffer_index]);
+	starpu_data_handle_t handle = buffer->handle;
+	enum starpu_data_access_mode mode = buffer->mode & ~STARPU_COMMUTE;
+
+	_starpu_take_data(1, handle, mode, j);
+}
+
+/* Immediately acquire all data of the given job, one by one in handle pointer value order
+ */
+/* No lock is held */
+static void _submit_job_take_data_deps(struct _starpu_job *j, unsigned start_buffer_index)
+{
+	unsigned buf;
+
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
+	for (buf = start_buffer_index; buf < nbuffers; buf++)
+	{
+		starpu_data_handle_t handle = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf);
+		if (buf)
+		{
+			starpu_data_handle_t handle_m1 = _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(j, buf-1);
+			if (handle_m1 == handle)
+				/* We have already requested this data, skip it. This
+				 * depends on ordering putting writes before reads, see
+				 * _starpu_compar_handles.  */
+				continue;
+		}
+
+		if(handle->arbiter)
+		{
+			/* We arrived on an arbitered data, we stop and proceed
+			 * with the arbiter second step.  */
+			STARPU_ASSERT_MSG(0, "TODO");
+			//_starpu_submit_job_take_arbitered_deps(j, buf, nbuffers);
+		}
+
+                take_data_from_job(j, buf);
+	}
+}
+
 /* This is called when the tag+task dependencies are to be finished releasing.  */
 void _starpu_enforce_data_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data)
 {
@@ -363,7 +461,7 @@ void _starpu_job_set_ordered_buffers(struct _starpu_job *j)
 }
 
 /* Sort the data used by the given job by handle pointer value order, and
- * acquire them in that order */
+ * try to acquire them in that order */
 /* No  lock is held */
 unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j)
 {
@@ -391,6 +489,19 @@ static unsigned unlock_one_requester(struct _starpu_data_requester *r)
 		return 0;
 }
 
+/* Sort the data used by the given job by handle pointer value order, and
+ * immediately acquire them in that order */
+/* No  lock is held */
+void _starpu_submit_job_take_data_deps(struct _starpu_job *j)
+{
+	struct starpu_codelet *cl = j->task->cl;
+
+	if ((cl == NULL) || (STARPU_TASK_GET_NBUFFERS(j->task) == 0))
+		return;
+
+	_submit_job_take_data_deps(j, 0);
+}
+
 /* This is called when a task is finished with a piece of data
  * (or on starpu_data_release)
  *

+ 2 - 1
src/core/dependencies/data_concurrency.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2015                                     Inria
- * Copyright (C) 2009-2012,2014,2015,2018                 Université de Bordeaux
+ * Copyright (C) 2009-2012,2014,2015,2018-2019                 Université de Bordeaux
  * Copyright (C) 2010,2011,2013,2015,2017                 CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -25,6 +25,7 @@ void _starpu_job_set_ordered_buffers(struct _starpu_job *j);
 
 unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j);
 void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);
+void _starpu_submit_job_take_data_deps(struct _starpu_job *j);
 void _starpu_enforce_data_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data);
 
 int _starpu_notify_data_dependencies(starpu_data_handle_t handle);

+ 16 - 2
src/core/jobs.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2017                                Inria
- * Copyright (C) 2008-2018                                Université de Bordeaux
+ * Copyright (C) 2008-2019                                Université de Bordeaux
  * Copyright (C) 2010-2019                                CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
  * Copyright (C) 2011                                     Télécom-SudParis
@@ -519,7 +519,7 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 			{
 				/* We reuse the same job structure */
 				task->status = STARPU_TASK_BLOCKED;
-				int ret = _starpu_submit_job(j);
+				int ret = _starpu_submit_job(j, 0);
 				STARPU_ASSERT(!ret);
 			}
 #ifdef STARPU_OPENMP
@@ -700,6 +700,20 @@ unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j)
 }
 #endif
 
+unsigned _starpu_take_deps_and_schedule(struct _starpu_job *j)
+{
+	unsigned ret;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+	/* Take references */
+	_starpu_submit_job_take_data_deps(j);
+
+	/* And immediately push task */
+	ret = _starpu_push_task(j);
+
+	return ret;
+}
+
 /* This is called when a tag or task dependency is to be released.  */
 void _starpu_enforce_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data, int tag)
 {

+ 2 - 1
src/core/jobs.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2008-2018                                Université de Bordeaux
+ * Copyright (C) 2008-2019                                Université de Bordeaux
  * Copyright (C) 2011,2014                                Inria
  * Copyright (C) 2010,2011,2013-2015,2017,2018,2019       CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
@@ -255,6 +255,7 @@ unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j);
 /* When waking up a continuation, we only enforce new task dependencies */
 unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j);
 #endif
+unsigned _starpu_take_deps_and_schedule(struct _starpu_job *j);
 void _starpu_enforce_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data, int tag);
 
 /* Called at the submission of the job */

+ 126 - 30
src/core/task.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2018                                Inria
- * Copyright (C) 2009-2018                                Université de Bordeaux
+ * Copyright (C) 2009-2019                                Université de Bordeaux
  * Copyright (C) 2017                                     Erwan Leria
  * Copyright (C) 2010-2019                                CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
@@ -297,7 +297,7 @@ int _starpu_task_test_termination(struct starpu_task *task)
 
 /* NB in case we have a regenerable task, it is possible that the job was
  * already counted. */
-int _starpu_submit_job(struct _starpu_job *j)
+int _starpu_submit_job(struct _starpu_job *j, int nodeps)
 {
 	struct starpu_task *task = j->task;
 	int ret;
@@ -371,15 +371,22 @@ int _starpu_submit_job(struct _starpu_job *j)
 	}
 #endif
 
-#ifdef STARPU_OPENMP
-	if (continuation)
+	if (nodeps)
 	{
-		ret = _starpu_reenforce_task_deps_and_schedule(j);
+		ret = _starpu_take_deps_and_schedule(j);
 	}
 	else
-#endif
 	{
-		ret = _starpu_enforce_deps_and_schedule(j);
+#ifdef STARPU_OPENMP
+		if (continuation)
+		{
+			ret = _starpu_reenforce_task_deps_and_schedule(j);
+		}
+		else
+#endif
+		{
+			ret = _starpu_enforce_deps_and_schedule(j);
+		}
 	}
 
 	_STARPU_LOG_OUT();
@@ -629,7 +636,7 @@ static int _starpu_task_submit_head(struct starpu_task *task)
 }
 
 /* application should submit new tasks to StarPU through this function */
-int starpu_task_submit(struct starpu_task *task)
+int _starpu_task_submit(struct starpu_task *task, int nodeps)
 {
 	_STARPU_LOG_IN();
 	STARPU_ASSERT(task);
@@ -638,6 +645,7 @@ int starpu_task_submit(struct starpu_task *task)
 	int ret;
 	unsigned is_sync = task->synchronous;
 	starpu_task_bundle_t bundle = task->bundle;
+	STARPU_ASSERT_MSG(!(nodeps && bundle), "not supported\n");
 	/* internally, StarPU manipulates a struct _starpu_job * which is a wrapper around a
 	* task structure, it is possible that this job structure was already
 	* allocated. */
@@ -649,6 +657,7 @@ int starpu_task_submit(struct starpu_task *task)
 		0
 #endif
 		;
+	STARPU_ASSERT_MSG(!(nodeps && continuation), "not supported\n");
 
 	if (!j->internal)
 	{
@@ -684,7 +693,8 @@ int starpu_task_submit(struct starpu_task *task)
 	if (task->cl && !continuation)
 	{
 		_starpu_job_set_ordered_buffers(j);
-		_starpu_detect_implicit_data_deps(task);
+		if (!nodeps)
+			_starpu_detect_implicit_data_deps(task);
 	}
 
 	if (bundle)
@@ -725,7 +735,7 @@ int starpu_task_submit(struct starpu_task *task)
 	if (profiling)
 		_starpu_clock_gettime(&info->submit_time);
 
-	ret = _starpu_submit_job(j);
+	ret = _starpu_submit_job(j, nodeps);
 #ifdef STARPU_SIMGRID
 	if (_starpu_simgrid_task_submit_cost())
 		MSG_process_sleep(0.000001);
@@ -744,6 +754,11 @@ int starpu_task_submit(struct starpu_task *task)
 	return ret;
 }
 
+int starpu_task_submit(struct starpu_task *task)
+{
+	return _starpu_task_submit(task, 0);
+}
+
 int _starpu_task_submit_internally(struct starpu_task *task)
 {
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
@@ -760,27 +775,9 @@ int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id)
 
 /* The StarPU core can submit tasks directly to the scheduler or a worker,
  * skipping dependencies completely (when it knows what it is doing).  */
-int _starpu_task_submit_nodeps(struct starpu_task *task)
+int starpu_task_submit_nodeps(struct starpu_task *task)
 {
-	int ret = _starpu_task_submit_head(task);
-	STARPU_ASSERT(ret == 0);
-
-	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
-
-	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
-	_starpu_sched_task_submit(task);
-
-	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-	_starpu_handle_job_submission(j);
-	_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
-	if (task->cl)
-		/* This would be done by data dependencies checking */
-		_starpu_job_set_ordered_buffers(j);
-	STARPU_ASSERT(task->status == STARPU_TASK_BLOCKED);
-	task->status = STARPU_TASK_READY;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-
-	return _starpu_push_task(j);
+	return _starpu_task_submit(task, 1);
 }
 
 /*
@@ -1361,3 +1358,102 @@ void _starpu_watchdog_shutdown(void)
 
 	STARPU_PTHREAD_JOIN(watchdog_thread, NULL);
 }
+
+static void _starpu_ft_check_support(const struct starpu_task *task)
+{
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned i;
+
+	for (i = 0; i < nbuffers; i++)
+	{
+		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, i);
+		STARPU_ASSERT_MSG (mode == STARPU_R || mode == STARPU_W,
+				"starpu_task_failed is only supported for tasks with access modes STARPU_R and STARPU_W");
+	}
+}
+
+struct starpu_task *starpu_task_ft_create_retry
+(const struct starpu_task *meta_task, const struct starpu_task *template_task, void (*check_ft)(void *))
+{
+	/* Create a new task to actually perform the result */
+	struct starpu_task *new_task = starpu_task_create();
+
+	*new_task = *template_task;
+	new_task->prologue_callback_func = NULL;
+	/* XXX: cl_arg needs to be duplicated */
+	STARPU_ASSERT_MSG(!meta_task->cl_arg_free || !meta_task->cl_arg, "not supported yet");
+	STARPU_ASSERT_MSG(!meta_task->callback_func, "not supported");
+	new_task->callback_func = check_ft;
+	new_task->callback_arg = (void*) meta_task;
+	new_task->callback_arg_free = 0;
+	new_task->prologue_callback_arg_free = 0;
+	STARPU_ASSERT_MSG(!new_task->prologue_callback_pop_arg_free, "not supported");
+	new_task->use_tag = 0;
+	new_task->synchronous = 0;
+	new_task->destroy = 1;
+	new_task->regenerate = 0;
+	new_task->no_submitorder = 1;
+	new_task->failed = 0;
+	new_task->status = STARPU_TASK_INVALID;
+	new_task->profiling_info = NULL;
+	new_task->prev = NULL;
+	new_task->next = NULL;
+	new_task->starpu_private = NULL;
+	new_task->omp_task = NULL;
+
+	return new_task;
+}
+
+static void _starpu_default_check_ft(void *arg)
+{
+	struct starpu_task *meta_task = arg;
+	struct starpu_task *current_task = starpu_task_get_current();
+	struct starpu_task *new_task;
+	int ret;
+
+	if (!current_task->failed)
+	{
+		starpu_task_ft_success(meta_task);
+		return;
+	}
+
+	new_task = starpu_task_ft_create_retry
+(meta_task, current_task, _starpu_default_check_ft);
+
+	ret = starpu_task_submit_nodeps(new_task);
+	STARPU_ASSERT(!ret);
+}
+
+void starpu_task_ft_prologue(void *arg)
+{
+	struct starpu_task *meta_task = starpu_task_get_current();
+	struct starpu_task *new_task;
+	void (*check_ft)(void*) = arg;
+	int ret;
+
+	if (!check_ft)
+		check_ft = _starpu_default_check_ft;
+
+	/* Create a task which will do the actual computation */
+	new_task = starpu_task_ft_create_retry
+(meta_task, meta_task, check_ft);
+
+	ret = starpu_task_submit_nodeps(new_task);
+	STARPU_ASSERT(!ret);
+
+	/* Make the parent task wait for the result getting correct */
+	starpu_task_end_dep_add(meta_task, 1);
+	meta_task->where = STARPU_NOWHERE;
+}
+
+void starpu_task_ft_failed(struct starpu_task *task)
+{
+	_starpu_ft_check_support(task);
+
+	task->failed = 1;
+}
+
+void starpu_task_ft_success(struct starpu_task *meta_task)
+{
+	starpu_task_end_dep_release(meta_task);
+}

+ 2 - 6
src/core/task.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2014,2016,2017                      Inria
- * Copyright (C) 2009-2018                                Université de Bordeaux
+ * Copyright (C) 2009-2019                                Université de Bordeaux
  * Copyright (C) 2010-2017, 2019                          CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -39,11 +39,7 @@ void _starpu_task_init(void);
 void _starpu_task_deinit(void);
 void _starpu_set_current_task(struct starpu_task *task);
 
-/* NB the second argument makes it possible to count regenerable tasks only
- * once. */
-int _starpu_submit_job(struct _starpu_job *j);
-
-int _starpu_task_submit_nodeps(struct starpu_task *task);
+int _starpu_submit_job(struct _starpu_job *j, int nodeps);
 
 void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[], int check);
 

+ 9 - 3
src/drivers/driver_common/driver_common.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2017                                Inria
- * Copyright (C) 2010-2018                                Université de Bordeaux
+ * Copyright (C) 2010-2019                                Université de Bordeaux
  * Copyright (C) 2010-2017, 2019                          CNRS
  * Copyright (C) 2013                                     Thibaut Lambert
  * Copyright (C) 2011                                     Télécom-SudParis
@@ -236,9 +236,12 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 				do_update_time_model = 1;
 			}
 #else
-			const unsigned do_update_time_model = 1;
+			unsigned do_update_time_model = 1;
 			const double time_consumed = measured;
 #endif
+			if (j->task->failed)
+				/* Do not record perfmodel for failed tasks, they may terminate earlier */
+				do_update_time_model = 0;
 			if (do_update_time_model)
 			{
 				_starpu_update_perfmodel_history(j, j->task->cl->model, perf_arch, worker->devid, time_consumed, j->nimpl);
@@ -269,9 +272,12 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 		}
 #else
 		const double energy_consumed = profiling_info->energy_consumed;
-		const unsigned do_update_energy_model = 1;
+		unsigned do_update_energy_model = 1;
 #endif
 
+		if (j->task->failed)
+			/* Do not record perfmodel for failed tasks, they may terminate earlier */
+			do_update_energy_model = 0;
 		if (do_update_energy_model)
 		{
 			_starpu_update_perfmodel_history(j, j->task->cl->energy_model, perf_arch, worker->devid, energy_consumed, j->nimpl);

+ 1 - 0
tests/Makefile.am

@@ -327,6 +327,7 @@ myPROGRAMS +=				\
 	disk/mem_reclaim			\
 	errorcheck/invalid_blocking_calls	\
 	errorcheck/workers_cpuid		\
+	fault-tolerance/retry			\
 	helper/starpu_data_cpy			\
 	helper/starpu_create_sync_task		\
 	microbenchs/async_tasks_overhead	\

+ 123 - 0
tests/fault-tolerance/retry.c

@@ -0,0 +1,123 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011-2013,2015,2017                      CNRS
+ * Copyright (C) 2017                                     Inria
+ * Copyright (C) 2019                                     Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * This tests the fault tolerance interface: it submits a tasks which repeatedly
+ * fails until being eventually successful
+ */
+
+#include <starpu.h>
+#include "../helper.h"
+
+/* This task fakes some repeated errors  */
+static int retry;
+void cpu_increment(void *descr[], void *arg)
+{
+	(void)arg;
+	unsigned *var = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	unsigned *var2 = (unsigned *)STARPU_VARIABLE_GET_PTR(descr[1]);
+	FPRINTF(stderr,"computing\n");
+	*var2 = *var + 1;
+	if (retry < 10)
+	{
+		FPRINTF(stderr,"failing\n");
+		retry++;
+		/* Fake failure */
+		starpu_task_ft_failed(starpu_task_get_current());
+	}
+	else
+		FPRINTF(stderr,"succeed\n");
+}
+
+static struct starpu_codelet my_codelet =
+{
+	.cpu_funcs = {cpu_increment},
+	.cpu_funcs_name = {"cpu_increment"},
+	.modes = { STARPU_R, STARPU_W },
+	.nbuffers = 2
+};
+
+/* This implements the retry strategy
+ * (Identical to the default implementation: just retry) */
+static void check_ft(void *arg)
+{
+	struct starpu_task *meta_task = arg;
+	struct starpu_task *current_task = starpu_task_get_current();
+	struct starpu_task *new_task;
+	int ret;
+
+	if (!current_task->failed)
+	{
+		FPRINTF(stderr,"didn't fail, release main task\n");
+		starpu_task_ft_success(meta_task);
+		return;
+	}
+
+	FPRINTF(stderr,"failed, try again\n");
+
+	new_task = starpu_task_ft_create_retry(meta_task, current_task, check_ft);
+
+	/* Here we could e.g. force the task to use only a CPU implementation
+	 * known to be failsafe */
+
+	ret = starpu_task_submit_nodeps(new_task);
+	STARPU_ASSERT(!ret);
+}
+
+int main(void)
+{
+	int x = 12;
+	int y = 1;
+        starpu_data_handle_t h_x, h_y;
+	int ret, ret1;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_variable_data_register(&h_x, STARPU_MAIN_RAM, (uintptr_t)&x, sizeof(x));
+	starpu_variable_data_register(&h_y, STARPU_MAIN_RAM, (uintptr_t)&y, sizeof(y));
+
+	retry = 0;
+	ret1 = starpu_task_insert(&my_codelet,
+				  STARPU_PROLOGUE_CALLBACK, starpu_task_ft_prologue,
+				  STARPU_PROLOGUE_CALLBACK_ARG, check_ft,
+				  STARPU_R, h_x,
+				  STARPU_W, h_y,
+				  0);
+	if (ret1 != -ENODEV) STARPU_CHECK_RETURN_VALUE(ret1, "starpu_task_insert");
+	starpu_task_wait_for_all();
+
+	starpu_data_unregister(h_x);
+	starpu_data_unregister(h_y);
+
+	starpu_shutdown();
+
+	if (x != 12)
+		ret = 1;
+	FPRINTF(stderr, "Value x = %d (expected 12)\n", x);
+
+	if (ret1 != -ENODEV)
+	{
+		if (y != 13)
+			ret = 1;
+		FPRINTF(stderr, "Value y = %d (expected 13)\n", y);
+	}
+
+	STARPU_RETURN(ret);
+}