Browse Source

Let the CUDA driver progress while the GPU is computing

Samuel Thibault 11 years ago
parent
commit
7ce4ced85d

+ 1 - 1
src/core/jobs.h

@@ -123,7 +123,7 @@ LIST_TYPE(_starpu_job,
 	 * so we need a flag to differentiate them from "normal" tasks. */
 	unsigned reduction_task;
 
-	/* Used by MIC driver to record codelet start time instead of using a
+	/* Used to record codelet start time instead of using a
 	 * local variable */
 	struct timespec cl_start;
 

+ 20 - 10
src/datawizard/data_request.c

@@ -415,13 +415,15 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	return 0;
 }
 
-int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
+int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
 {
 	struct _starpu_data_request *r;
 	struct _starpu_data_request_list *new_data_requests;
 	struct _starpu_data_request_list *empty_list;
 	int ret = 0;
 
+	*pushed = 0;
+
 	/* Here helgrind would should that this is an un protected access.
 	 * We however don't care about missing an entry, we will get called
 	 * again sooner or later. */
@@ -479,6 +481,8 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
 			_starpu_data_request_list_push_back(new_data_requests, r);
 			break;
 		}
+
+		*pushed++;
 	}
 
 	while (!_starpu_data_request_list_empty(local_list))
@@ -500,13 +504,15 @@ int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
 	return ret;
 }
 
-void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc)
+void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
 {
 	struct _starpu_data_request *r;
 	struct _starpu_data_request_list *new_data_requests;
 	struct _starpu_data_request_list *new_prefetch_requests;
 	struct _starpu_data_request_list *empty_list;
 
+	*pushed = 0;
+
 	if (_starpu_data_request_list_empty(prefetch_requests[src_node]))
 		return;
 
@@ -563,6 +569,8 @@ void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc
 			}
 			break;
 		}
+
+		*pushed++;
 	}
 
 	while(!_starpu_data_request_list_empty(local_list))
@@ -590,7 +598,7 @@ void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc
 	_starpu_data_request_list_delete(local_list);
 }
 
-static void _handle_pending_node_data_requests(unsigned src_node, unsigned force)
+static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 {
 //	_STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
 //
@@ -599,12 +607,12 @@ static void _handle_pending_node_data_requests(unsigned src_node, unsigned force
 	unsigned taken, kept;
 
 	if (_starpu_data_request_list_empty(data_requests_pending[src_node]))
-		return;
+		return 0;
 
 	empty_list = _starpu_data_request_list_new();
 	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[src_node]) && !force)
 		/* List is busy, do not bother with it */
-		return;
+		return 0;
 
 	/* for all entries of the list */
 	struct _starpu_data_request_list *local_list = data_requests_pending[src_node];
@@ -613,7 +621,7 @@ static void _handle_pending_node_data_requests(unsigned src_node, unsigned force
 		/* there is no request */
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
 		_starpu_data_request_list_delete(empty_list);
-		return;
+		return 0;
 	}
 	data_requests_pending[src_node] = empty_list;
 
@@ -680,16 +688,18 @@ static void _handle_pending_node_data_requests(unsigned src_node, unsigned force
 
 	_starpu_data_request_list_delete(local_list);
 	_starpu_data_request_list_delete(new_data_requests_pending);
+
+	return taken - kept;
 }
 
-void _starpu_handle_pending_node_data_requests(unsigned src_node)
+int _starpu_handle_pending_node_data_requests(unsigned src_node)
 {
-	_handle_pending_node_data_requests(src_node, 0);
+	return _handle_pending_node_data_requests(src_node, 0);
 }
 
-void _starpu_handle_all_pending_node_data_requests(unsigned src_node)
+int _starpu_handle_all_pending_node_data_requests(unsigned src_node)
 {
-	_handle_pending_node_data_requests(src_node, 1);
+	return _handle_pending_node_data_requests(src_node, 1);
 }
 
 int _starpu_check_that_no_data_request_exists(unsigned node)

+ 5 - 5
src/datawizard/data_request.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010, 2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2010, 2013-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -110,11 +110,11 @@ void _starpu_init_data_request_lists(void);
 void _starpu_deinit_data_request_lists(void);
 void _starpu_post_data_request(struct _starpu_data_request *r, unsigned handling_node);
 /* returns 0 if we have pushed all requests, -EBUSY or -ENOMEM otherwise */
-int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc);
-void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc);
+int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed);
+void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed);
 
-void _starpu_handle_pending_node_data_requests(unsigned src_node);
-void _starpu_handle_all_pending_node_data_requests(unsigned src_node);
+int _starpu_handle_pending_node_data_requests(unsigned src_node);
+int _starpu_handle_all_pending_node_data_requests(unsigned src_node);
 
 int _starpu_check_that_no_data_request_exists(unsigned node);
 

+ 26 - 7
src/datawizard/datawizard.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010, 2012-2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2010, 2012-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -24,8 +24,10 @@
 #include <msg/msg.h>
 #endif
 
-void _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc)
+int __starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests)
 {
+	int ret = 0;
+
 #if STARPU_DEVEL
 #warning FIXME
 #endif
@@ -35,11 +37,28 @@ void _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc)
 	STARPU_UYIELD();
 
 	/* in case some other driver requested data */
-	_starpu_handle_pending_node_data_requests(memory_node);
-	if (_starpu_handle_node_data_requests(memory_node, may_alloc) == 0)
-		/* We pushed all pending requests, we can afford pushing
-		 * prefetch requests */
-		_starpu_handle_node_prefetch_requests(memory_node, may_alloc);
+	if (_starpu_handle_pending_node_data_requests(memory_node))
+		ret = 1;
+	if (push_requests)
+	{
+		unsigned pushed;
+		if (_starpu_handle_node_data_requests(memory_node, may_alloc, &pushed) == 0)
+		{
+			if (pushed)
+				ret = 1;
+			/* We pushed all pending requests, we can afford pushing
+			 * prefetch requests */
+			_starpu_handle_node_prefetch_requests(memory_node, may_alloc, &pushed);
+		}
+		if (pushed)
+			ret = 1;
+	}
 	_starpu_execute_registered_progression_hooks();
+
+	return ret;
 }
 
+int _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc)
+{
+	__starpu_datawizard_progress(memory_node, may_alloc, 1);
+}

+ 3 - 2
src/datawizard/datawizard.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010, 2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -33,6 +33,7 @@
 
 #include <core/dependencies/implicit_data_deps.h>
 
-void _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc);
+int __starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests);
+int _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc);
 
 #endif // __DATAWIZARD_H__

+ 69 - 21
src/drivers/cuda/driver_cuda.c

@@ -46,6 +46,7 @@ static cudaStream_t out_transfer_streams[STARPU_MAXCUDADEVS];
 static cudaStream_t in_transfer_streams[STARPU_MAXCUDADEVS];
 static cudaStream_t peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static struct cudaDeviceProp props[STARPU_MAXCUDADEVS];
+static cudaEvent_t task_events[STARPU_MAXCUDADEVS];
 #endif /* STARPU_USE_CUDA */
 
 void
@@ -252,6 +253,10 @@ static void init_context(unsigned devid)
 
 	workerid = starpu_worker_get_id();
 
+	cures = cudaEventCreate(&task_events[workerid]);
+	if (STARPU_UNLIKELY(cures))
+		STARPU_CUDA_REPORT_ERROR(cures);
+
 	cures = cudaStreamCreate(&streams[workerid]);
 	if (STARPU_UNLIKELY(cures))
 		STARPU_CUDA_REPORT_ERROR(cures);
@@ -278,6 +283,7 @@ static void deinit_context(int workerid)
 	int devid = starpu_worker_get_devid(workerid);
 	int i;
 
+	cudaEventDestroy(task_events[workerid]);
 	cudaStreamDestroy(streams[workerid]);
 	cudaStreamDestroy(in_transfer_streams[devid]);
 	cudaStreamDestroy(out_transfer_streams[devid]);
@@ -327,21 +333,22 @@ void _starpu_init_cuda(void)
 	STARPU_ASSERT(ncudagpus <= STARPU_MAXCUDADEVS);
 }
 
-static int execute_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *args)
+static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *args)
 {
 	int ret;
 
 	STARPU_ASSERT(j);
 	struct starpu_task *task = j->task;
 
-	struct timespec codelet_start, codelet_end;
-
 	int profiling = starpu_profiling_status_get();
 
 	STARPU_ASSERT(task);
 	struct starpu_codelet *cl = task->cl;
 	STARPU_ASSERT(cl);
 
+	_starpu_set_current_task(task);
+	args->current_task = j->task;
+
 	ret = _starpu_fetch_task_input(j);
 	if (ret != 0)
 	{
@@ -351,7 +358,7 @@ static int execute_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *arg
 		return -EAGAIN;
 	}
 
-	_starpu_driver_start_job(args, j, &codelet_start, 0, profiling);
+	_starpu_driver_start_job(args, j, &j->cl_start, 0, profiling);
 
 #if defined(HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
 	/* We make sure we do manipulate the proper device */
@@ -367,18 +374,28 @@ static int execute_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *arg
 		_starpu_simgrid_execute_job(j, &args->perf_arch, NAN);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
-		if (cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC)
-			cudaStreamSynchronize(starpu_cuda_get_local_stream());
 #endif
 	}
 
+	return 0;
+}
+
+static void finish_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *args)
+{
+	struct timespec codelet_end;
+
+	int profiling = starpu_profiling_status_get();
+
+	_starpu_set_current_task(NULL);
+	args->current_task = NULL;
+
 	_starpu_driver_end_job(args, j, &args->perf_arch, &codelet_end, 0, profiling);
 
-	_starpu_driver_update_job_feedback(j, args, &args->perf_arch, &codelet_start, &codelet_end, profiling);
+	_starpu_driver_update_job_feedback(j, args, &args->perf_arch, &j->cl_start, &codelet_end, profiling);
 
 	_starpu_push_task_output(j);
 
-	return 0;
+	_starpu_handle_job_termination(j);
 }
 
 /* XXX Should this be merged with _starpu_init_cuda ? */
@@ -440,12 +457,30 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker *args)
 	unsigned memnode = args->memory_node;
 	int workerid = args->workerid;
 
-	_STARPU_TRACE_START_PROGRESS(memnode);
-	_starpu_datawizard_progress(memnode, 1);
-	_STARPU_TRACE_END_PROGRESS(memnode);
-
 	struct starpu_task *task;
-	struct _starpu_job *j = NULL;
+	struct _starpu_job *j;
+
+	task = starpu_task_get_current();
+
+	if (task)
+	{
+		/* On-going asynchronous task, check for its termination first */
+		cudaError_t cures = cudaEventQuery(task_events[workerid]);
+
+		if (cures != cudaSuccess)
+		{
+			/* Not ready yet, no better thing to do than waiting */
+			__starpu_datawizard_progress(memnode, 1, 0);
+
+			STARPU_ASSERT(cures == cudaErrorNotReady);
+			return 0;
+		}
+
+		/* Asynchronous task completed! */
+		finish_job_on_cuda(_starpu_get_job_associated_to_task(task), args);
+	}
+
+	__starpu_datawizard_progress(memnode, 1, 1);
 
 	task = _starpu_get_worker_task(args, workerid, memnode);
 
@@ -462,13 +497,8 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker *args)
 		return 0;
 	}
 
-	_starpu_set_current_task(task);
-	args->current_task = j->task;
-
-	int res = execute_job_on_cuda(j, args);
-
-	_starpu_set_current_task(NULL);
-	args->current_task = NULL;
+	_STARPU_TRACE_END_PROGRESS(memnode);
+	int res = start_job_on_cuda(j, args);
 
 	if (res)
 	{
@@ -483,7 +513,23 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker *args)
 		}
 	}
 
-	_starpu_handle_job_termination(j);
+#ifndef STARPU_SIMGRID
+	if (task->cl->cuda_flags[j->nimpl] & STARPU_CUDA_ASYNC)
+	{
+		/* Record event to synchronize with task termination later */
+		cudaEventRecord(task_events[workerid], starpu_cuda_get_local_stream());
+	}
+	else
+#else
+#ifdef STARPU_DEVEL
+#warning No CUDA asynchronous execution with simgrid yet.
+#endif
+#endif
+	/* Synchronous execution */
+	{
+		finish_job_on_cuda(j, args);
+	}
+	_STARPU_TRACE_START_PROGRESS(memnode);
 
 	return 0;
 }
@@ -516,8 +562,10 @@ void *_starpu_cuda_worker(void *arg)
 	struct _starpu_worker* args = arg;
 
 	_starpu_cuda_driver_init(args);
+	_STARPU_TRACE_START_PROGRESS(memnode);
 	while (_starpu_machine_is_running())
 		_starpu_cuda_driver_run_once(args);
+	_STARPU_TRACE_END_PROGRESS(memnode);
 	_starpu_cuda_driver_deinit(args);
 
 	return NULL;