Explorar o código

watchdog hook functionality

Alexis Juven hai 5 anos
pai
achega
31f97fab81
Modificáronse 2 ficheiros con 91 adicións e 55 borrados
  1. 7 0
      include/starpu_task.h
  2. 84 55
      src/core/task.c

+ 7 - 0
include/starpu_task.h

@@ -1629,6 +1629,13 @@ void starpu_task_ft_failed(struct starpu_task *task);
  */
 void starpu_task_ft_success(struct starpu_task *meta_task);
 
+
+/**
+   Set the function to call when the watchdog detects that StarPU has not
+   finished any task for STARPU_WATCHDOG_TIMEOUT seconds. */
+void starpu_task_set_watchdog_hook(void (*hook)(void *), void * hook_arg);
+
+
 /** @} */
 
 #ifdef __cplusplus

+ 84 - 55
src/core/task.c

@@ -245,6 +245,13 @@ static int limit_max_submitted_tasks;
 static int watchdog_crash;
 static int watchdog_delay;
 
+/*
+ * Function to call when watchdog detects that no task has finished for more than STARPU_WATCHDOG_TIMEOUT seconds
+ */
+static void (*watchdog_hook)(void *) = NULL;
+static void * watchdog_hook_arg = NULL;
+
+
 #define _STARPU_TASK_MAGIC 42
 
 /* Called once at starpu_init */
@@ -364,7 +371,7 @@ void _starpu_task_destroy(struct starpu_task *task)
 	/* If starpu_task_destroy is called in a callback, we just set the destroy
 	   flag. The task will be destroyed after the callback returns */
 	if (task == starpu_task_get_current()
-	    && _starpu_get_local_worker_status() == STATUS_CALLBACK)
+			&& _starpu_get_local_worker_status() == STATUS_CALLBACK)
 	{
 		task->destroy = 1;
 	}
@@ -410,7 +417,7 @@ int starpu_task_finished(struct starpu_task *task)
 
 int starpu_task_wait(struct starpu_task *task)
 {
-        _STARPU_LOG_IN();
+	_STARPU_LOG_IN();
 	STARPU_ASSERT(task);
 
 	STARPU_ASSERT_MSG(!task->detach, "starpu_task_wait can only be called on tasks with detach = 0");
@@ -438,7 +445,7 @@ int starpu_task_wait(struct starpu_task *task)
 
 	_starpu_perf_counter_update_global_sample();
 	_STARPU_TRACE_TASK_WAIT_END();
-        _STARPU_LOG_OUT();
+	_STARPU_LOG_OUT();
 	return 0;
 }
 
@@ -504,7 +511,7 @@ int _starpu_submit_job(struct _starpu_job *j, int nodeps)
 #ifdef STARPU_USE_SC_HYPERVISOR
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
 	if(sched_ctx != NULL && j->task->sched_ctx != _starpu_get_initial_sched_ctx()->id && j->task->sched_ctx != STARPU_NMAX_SCHED_CTXS
-	   && sched_ctx->perf_counters != NULL)
+			&& sched_ctx->perf_counters != NULL)
 	{
 		struct starpu_perfmodel_arch arch;
 		_STARPU_MALLOC(arch.devices, sizeof(struct starpu_perfmodel_device));
@@ -597,7 +604,7 @@ void _starpu_codelet_check_deprecated_fields(struct starpu_codelet *cl)
 	unsigned i, some_impl;
 
 	/* Check deprecated and unset fields (where, <device>_func,
- 	 * <device>_funcs) */
+	 * <device>_funcs) */
 
 	/* CPU */
 	if (cl->cpu_func && cl->cpu_func != STARPU_MULTIPLE_CPU_IMPLEMENTATIONS && cl->cpu_funcs[0])
@@ -769,8 +776,8 @@ static int _starpu_task_submit_head(struct starpu_task *task)
 		/* Check buffers */
 		if (task->dyn_handles == NULL)
 			STARPU_ASSERT_MSG(STARPU_TASK_GET_NBUFFERS(task) <= STARPU_NMAXBUFS,
-					  "Codelet %p has too many buffers (%d vs max %d). Either use --enable-maxbuffers configure option to increase the max, or use dyn_handles instead of handles.",
-					  task->cl, STARPU_TASK_GET_NBUFFERS(task), STARPU_NMAXBUFS);
+					"Codelet %p has too many buffers (%d vs max %d). Either use --enable-maxbuffers configure option to increase the max, or use dyn_handles instead of handles.",
+					task->cl, STARPU_TASK_GET_NBUFFERS(task), STARPU_NMAXBUFS);
 
 		if (STARPU_UNLIKELY(task->dyn_handles))
 		{
@@ -793,9 +800,9 @@ static int _starpu_task_submit_head(struct starpu_task *task)
 			if (handle->home_node != -1)
 				_STARPU_TASK_SET_INTERFACE(task, starpu_data_get_interface_on_node(handle, handle->home_node), i);
 			if (!(task->cl->flags & STARPU_CODELET_NOPLANS) &&
-			    ((handle->nplans && !handle->nchildren) || handle->siblings)
-			    && handle->partition_automatic_disabled == 0
-			    )
+					((handle->nplans && !handle->nchildren) || handle->siblings)
+					&& handle->partition_automatic_disabled == 0
+			)
 				/* This handle is involved with asynchronous
 				 * partitioning as a parent or a child, make
 				 * sure the right plan is active, submit
@@ -849,16 +856,16 @@ int _starpu_task_submit(struct starpu_task *task, int nodeps)
 	starpu_task_bundle_t bundle = task->bundle;
 	STARPU_ASSERT_MSG(!(nodeps && bundle), "not supported\n");
 	/* internally, StarPU manipulates a struct _starpu_job * which is a wrapper around a
-	* task structure, it is possible that this job structure was already
-	* allocated. */
+	 * task structure, it is possible that this job structure was already
+	 * allocated. */
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	const unsigned continuation =
 #ifdef STARPU_OPENMP
-		j->continuation
+			j->continuation
 #else
-		0
+			0
 #endif
-		;
+			;
 	if (!_starpu_perf_counter_paused() && !j->internal && !continuation)
 	{
 		(void) STARPU_ATOMIC_ADD64(&_starpu_task__g_total_submitted__value, 1);
@@ -882,7 +889,7 @@ int _starpu_task_submit(struct starpu_task *task, int nodeps)
 	{
 		int nsubmitted_tasks = starpu_task_nsubmitted();
 		if (limit_max_submitted_tasks >= 0 && limit_max_submitted_tasks < nsubmitted_tasks
-			&& limit_min_submitted_tasks >= 0 && limit_min_submitted_tasks < nsubmitted_tasks)
+				&& limit_min_submitted_tasks >= 0 && limit_min_submitted_tasks < nsubmitted_tasks)
 		{
 			starpu_do_schedule();
 			_STARPU_TRACE_TASK_THROTTLE_START();
@@ -904,8 +911,8 @@ int _starpu_task_submit(struct starpu_task *task, int nodeps)
 	{
 		STARPU_ASSERT_MSG(!j->submitted || j->terminated >= 1, "Tasks can not be submitted a second time before being terminated. Please use different task structures, or use the regenerate flag to let the task resubmit itself automatically.");
 		_STARPU_TRACE_TASK_SUBMIT(j,
-			_starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[0],
-			_starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[1]);
+				_starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[0],
+				_starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[1]);
 	}
 
 	/* If this is a continuation, we don't modify the implicit data dependencies detected earlier. */
@@ -965,11 +972,11 @@ int _starpu_task_submit(struct starpu_task *task, int nodeps)
 		_starpu_sched_do_schedule(task->sched_ctx);
 		_starpu_wait_job(j);
 		if (task->destroy)
-		     _starpu_task_destroy(task);
+			_starpu_task_destroy(task);
 	}
 
 	_STARPU_TRACE_TASK_SUBMIT_END();
-        _STARPU_LOG_OUT();
+	_STARPU_LOG_OUT();
 	return ret;
 }
 
@@ -1003,7 +1010,7 @@ int starpu_task_submit_nodeps(struct starpu_task *task)
  * worker->sched_mutex must be locked when calling this function.
  */
 int _starpu_task_submit_conversion_task(struct starpu_task *task,
-					unsigned int workerid)
+		unsigned int workerid)
 {
 	int ret;
 	STARPU_ASSERT(task->cl);
@@ -1393,13 +1400,13 @@ void _starpu_task_prepare_for_continuation_ext(unsigned continuation_resubmit,
 		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg)
 {
 	_starpu_job_prepare_for_continuation_ext(_starpu_get_job_associated_to_task(starpu_task_get_current()),
-		continuation_resubmit, continuation_callback_on_sleep, continuation_callback_on_sleep_arg);
+			continuation_resubmit, continuation_callback_on_sleep, continuation_callback_on_sleep_arg);
 }
 
 void _starpu_task_set_omp_cleanup_callback(struct starpu_task *task, void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg)
 {
 	_starpu_job_set_omp_cleanup_callback(_starpu_get_job_associated_to_task(task),
-		omp_cleanup_callback, omp_cleanup_callback_arg);
+			omp_cleanup_callback, omp_cleanup_callback_arg);
 }
 #endif
 
@@ -1426,14 +1433,14 @@ _starpu_task_uses_multiformat_handles(struct starpu_task *task)
  */
 int
 _starpu_handle_needs_conversion_task(starpu_data_handle_t handle,
-				     unsigned int node)
+		unsigned int node)
 {
 	return _starpu_handle_needs_conversion_task_for_arch(handle, starpu_node_get_kind(node));
 }
 
 int
 _starpu_handle_needs_conversion_task_for_arch(starpu_data_handle_t handle,
-				     enum starpu_node_kind node_kind)
+		enum starpu_node_kind node_kind)
 {
 	/*
 	 * Here, we assume that CUDA devices and OpenCL devices use the
@@ -1442,39 +1449,39 @@ _starpu_handle_needs_conversion_task_for_arch(starpu_data_handle_t handle,
 	 */
 	switch (node_kind)
 	{
+	case STARPU_CPU_RAM:
+		switch(starpu_node_get_kind(handle->mf_node))
+		{
 		case STARPU_CPU_RAM:
-			switch(starpu_node_get_kind(handle->mf_node))
-			{
-				case STARPU_CPU_RAM:
-					return 0;
-				case STARPU_CUDA_RAM:      /* Fall through */
-				case STARPU_OPENCL_RAM:
-				case STARPU_MIC_RAM:
-                                case STARPU_MPI_MS_RAM:
-					return 1;
-				default:
-					STARPU_ABORT();
-			}
-			break;
+			return 0;
+		case STARPU_CUDA_RAM:      /* Fall through */
+		case STARPU_OPENCL_RAM:
+		case STARPU_MIC_RAM:
+		case STARPU_MPI_MS_RAM:
+			return 1;
+		default:
+			STARPU_ABORT();
+		}
+		break;
 		case STARPU_CUDA_RAM:    /* Fall through */
 		case STARPU_OPENCL_RAM:
 		case STARPU_MIC_RAM:
 		case STARPU_MPI_MS_RAM:
 			switch(starpu_node_get_kind(handle->mf_node))
 			{
-				case STARPU_CPU_RAM:
-					return 1;
-				case STARPU_CUDA_RAM:
-				case STARPU_OPENCL_RAM:
-				case STARPU_MIC_RAM:
-                                case STARPU_MPI_MS_RAM:
-					return 0;
-				default:
-					STARPU_ABORT();
+			case STARPU_CPU_RAM:
+				return 1;
+			case STARPU_CUDA_RAM:
+			case STARPU_OPENCL_RAM:
+			case STARPU_MIC_RAM:
+			case STARPU_MPI_MS_RAM:
+				return 0;
+			default:
+				STARPU_ABORT();
 			}
 			break;
-		default:
-			STARPU_ABORT();
+			default:
+				STARPU_ABORT();
 	}
 	/* that instruction should never be reached */
 	return -EINVAL;
@@ -1515,6 +1522,10 @@ static int sleep_some(float timeout)
 	return 1;
 }
 
+
+
+
+
 /* Check from times to times that StarPU does finish some tasks */
 static void *watchdog_func(void *arg)
 {
@@ -1547,16 +1558,21 @@ static void *watchdog_func(void *arg)
 		if (!config->watchdog_ok && last_nsubmitted
 				&& last_nsubmitted == starpu_task_nsubmitted())
 		{
-			_STARPU_MSG("The StarPU watchdog detected that no task finished for %fs (can be configured through STARPU_WATCHDOG_TIMEOUT)\n",
-				    timeout);
+			if (watchdog_hook == NULL)
+				_STARPU_MSG("The StarPU watchdog detected that no task finished for %fs (can be configured through STARPU_WATCHDOG_TIMEOUT)\n",
+									timeout);
+			else
+				watchdog_hook(watchdog_hook_arg);
+
 			if (watchdog_crash)
 			{
 				_STARPU_MSG("Crashing the process\n");
 				raise(SIGABRT);
 			}
-			else
+			else if (watchdog_hook == NULL)
 				_STARPU_MSG("Set the STARPU_WATCHDOG_CRASH environment variable if you want to abort the process in such a case\n");
 		}
+
 		/* Only shout again after another period */
 		config->watchdog_ok = 1;
 	}
@@ -1564,7 +1580,17 @@ static void *watchdog_func(void *arg)
 	return NULL;
 }
 
-void _starpu_watchdog_init(void)
+
+/* Register the function the watchdog calls, with hook_arg as its argument, when
+ * StarPU has not finished any task for STARPU_WATCHDOG_TIMEOUT seconds */
+void starpu_task_set_watchdog_hook(void (*hook)(void *), void * hook_arg)
+{
+	watchdog_hook = hook; /* a NULL hook keeps the default watchdog message */
+	watchdog_hook_arg = hook_arg; /* handed back verbatim to hook */
+}
+
+
+void _starpu_watchdog_init()
 {
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	char *timeout_env = starpu_getenv("STARPU_WATCHDOG_TIMEOUT");
@@ -1577,6 +1603,9 @@ void _starpu_watchdog_init(void)
 	STARPU_PTHREAD_CREATE(&watchdog_thread, NULL, watchdog_func, timeout_env);
 }
 
+
+
+
 void _starpu_watchdog_shutdown(void)
 {
 	char *timeout_env = starpu_getenv("STARPU_WATCHDOG_TIMEOUT");
@@ -1648,7 +1677,7 @@ static void _starpu_default_check_ft(void *arg)
 	}
 
 	new_task = starpu_task_ft_create_retry
-(meta_task, current_task, _starpu_default_check_ft);
+			(meta_task, current_task, _starpu_default_check_ft);
 
 	ret = starpu_task_submit_nodeps(new_task);
 	STARPU_ASSERT(!ret);
@@ -1666,7 +1695,7 @@ void starpu_task_ft_prologue(void *arg)
 
 	/* Create a task which will do the actual computation */
 	new_task = starpu_task_ft_create_retry
-(meta_task, meta_task, check_ft);
+			(meta_task, meta_task, check_ft);
 
 	ret = starpu_task_submit_nodeps(new_task);
 	STARPU_ASSERT(!ret);