Browse Source

Create a stream dedicated to StarPU's async data transfers so that we can
overlap computation and communications for real.

Cédric Augonnet 14 years ago
parent
commit
8412f45ae4

+ 1 - 1
src/datawizard/copy_driver.c

@@ -129,7 +129,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle handle, struct starpu_dat
 				cures = cudaEventCreate(&req->async_channel.cuda_event);
 				if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
 
-				stream = starpu_cuda_get_local_stream();
+				stream = starpu_cuda_get_local_transfer_stream();
 				ret = copy_methods->cuda_to_ram_async(src_interface, src_node, dst_interface, dst_node, stream);
 
 				cures = cudaEventRecord(req->async_channel.cuda_event, stream);

+ 14 - 1
src/drivers/cuda/driver_cuda.c

@@ -31,6 +31,7 @@
 static int ncudagpus;
 
 static cudaStream_t streams[STARPU_NMAXWORKERS];
+static cudaStream_t transfer_streams[STARPU_NMAXWORKERS];
 
 /* In case we want to cap the amount of memory available on the GPUs by the
  * mean of the STARPU_LIMIT_GPU_MEM, we allocate a big buffer when the driver
@@ -83,6 +84,13 @@ static void unlimit_gpu_mem_if_needed(int devid)
 	}
 }
 
+cudaStream_t starpu_cuda_get_local_transfer_stream(void)
+{
+	int worker = starpu_worker_get_id();
+
+	return transfer_streams[worker];
+}
+
 cudaStream_t starpu_cuda_get_local_stream(void)
 {
 	int worker = starpu_worker_get_id();
@@ -107,6 +115,10 @@ static void init_context(int devid)
 	cures = cudaStreamCreate(&streams[workerid]);
 	if (STARPU_UNLIKELY(cures))
 		STARPU_CUDA_REPORT_ERROR(cures);
+
+	cures = cudaStreamCreate(&transfer_streams[workerid]);
+	if (STARPU_UNLIKELY(cures))
+		STARPU_CUDA_REPORT_ERROR(cures);
 }
 
 static void deinit_context(int workerid, int devid)
@@ -114,6 +126,7 @@ static void deinit_context(int workerid, int devid)
 	cudaError_t cures;
 
 	cudaStreamDestroy(streams[workerid]);
+	cudaStreamDestroy(transfer_streams[workerid]);
 
 	unlimit_gpu_mem_if_needed(devid);
 
@@ -171,7 +184,7 @@ static int execute_job_on_cuda(starpu_job_t j, struct starpu_worker_s *args)
 
 	if (calibrate_model)
 	{
-		cudaError_t cures = cudaThreadSynchronize();
+		cudaError_t cures = cudaStreamSynchronize(starpu_cuda_get_local_transfer_stream());
 		if (STARPU_UNLIKELY(cures))
 			STARPU_CUDA_REPORT_ERROR(cures);
 	}

+ 1 - 0
src/drivers/cuda/driver_cuda.h

@@ -43,6 +43,7 @@ unsigned _starpu_get_cuda_device_count(void);
 #ifdef STARPU_USE_CUDA
 void _starpu_init_cuda(void);
 void *_starpu_cuda_worker(void *);
+cudaStream_t starpu_cuda_get_local_transfer_stream(void);
 #endif
 
 #endif //  __DRIVER_CUDA_H__

+ 2 - 0
src/util/starpu_cublas.c

@@ -25,6 +25,8 @@ static void init_cublas_func(void *args __attribute__((unused)))
 	cublasStatus cublasst = cublasInit();
 	if (STARPU_UNLIKELY(cublasst))
 		STARPU_CUBLAS_REPORT_ERROR(cublasst);
+
+	cublasSetKernelStream(starpu_cuda_get_local_stream());
 }
 
 static void shutdown_cublas_func(void *args __attribute__((unused)))