
- Use events instead of streams to check whether a data transfer has completed
(the pattern is sketched after the commit details below).
- The local CUDA stream is no longer provided by a helper
(util/starpu_get_local_cuda_stream.c): it is now a service supplied directly
by the CUDA driver (starpu_get_local_stream).

Cédric Augonnet, 15 years ago
commit 3ea8462620
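
For reference, the mechanism this commit switches to can be summarized with a small stand-alone CUDA sketch. The names (async_copy_with_event, copy_is_done) are illustrative only and are not StarPU internals; what the sketch shows is the pattern now used in copy-driver.c: queue the asynchronous copy on the worker's shared stream, record an event right behind it, and later test or wait on that event instead of on a dedicated stream.

#include <cuda_runtime.h>
#include <stddef.h>

/* Illustrative sketch only: queue a transfer on a shared per-worker stream
 * and track its completion with an event (the approach this commit adopts). */
static int async_copy_with_event(void *dst, const void *src, size_t size,
                                 cudaStream_t stream, cudaEvent_t *event)
{
	if (cudaEventCreate(event) != cudaSuccess)
		return -1;

	/* The copy is queued on the stream shared by the whole worker... */
	if (cudaMemcpyAsync(dst, src, size, cudaMemcpyHostToDevice, stream) != cudaSuccess)
		return -1;

	/* ...and the event is recorded right behind it, so it signals once
	 * this particular transfer has completed. */
	if (cudaEventRecord(*event, stream) != cudaSuccess)
		return -1;

	return 0;
}

/* Non-blocking completion test, mirroring driver_test_request_completion. */
static unsigned copy_is_done(cudaEvent_t event)
{
	unsigned done = (cudaEventQuery(event) == cudaSuccess);
	if (done)
		cudaEventDestroy(event);
	return done;
}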

+ 0 - 1
src/Makefile.am

@@ -133,7 +133,6 @@ libstarpu_la_SOURCES = 						\
 	util/malloc.c						\
 	util/execute_on_all.c					\
 	util/starpu_create_sync_task.c				\
-	util/starpu_get_local_cuda_stream.c			\
 	util/starpu_cublas.c
 	
 if USE_CPU

+ 27 - 24
src/datawizard/copy-driver.c

@@ -73,20 +73,6 @@ void starpu_wake_all_blocked_workers(void)
 static unsigned communication_cnt = 0;
 #endif
 
-#ifdef USE_CUDA
-static cudaStream_t *create_cuda_stream(struct data_request_s *req)
-{
-	cudaStream_t *stream = &(req->async_channel).stream;
-
-	cudaError_t cures;
-	cures = cudaStreamCreate(stream);
-	if (STARPU_UNLIKELY(cures))
-		CUDA_REPORT_ERROR(cures);
-
-	return stream;
-}
-#endif
-
 static int copy_data_1_to_1_generic(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, struct data_request_s *req __attribute__((unused)))
 {
 	int ret = 0;
@@ -104,6 +90,11 @@ static int copy_data_1_to_1_generic(starpu_data_handle handle, uint32_t src_node
 	STARPU_ASSERT(handle->per_node[src_node].allocated);
 	STARPU_ASSERT(handle->per_node[dst_node].allocated);
 
+#ifdef USE_CUDA
+cudaError_t cures;
+cudaStream_t *stream;
+#endif
+
 	switch (dst_kind) {
 	case RAM:
 		switch (src_kind) {
@@ -126,8 +117,14 @@ static int copy_data_1_to_1_generic(starpu_data_handle handle, uint32_t src_node
 						copy_methods->cuda_to_ram(handle, src_node, dst_node);
 					}
 					else {
-						cudaStream_t *stream = create_cuda_stream(req);
+						cures = cudaEventCreate(&req->async_channel.event);
+						STARPU_ASSERT(cures == cudaSuccess);
+
+						stream = starpu_get_local_stream();
 						ret = copy_methods->cuda_to_ram_async(handle, src_node, dst_node, stream);
+
+						cures = cudaEventRecord(req->async_channel.event, *stream);
+						STARPU_ASSERT(cures == cudaSuccess);
 					}
 				}
 				else
@@ -161,8 +158,14 @@ static int copy_data_1_to_1_generic(starpu_data_handle handle, uint32_t src_node
 					copy_methods->ram_to_cuda(handle, src_node, dst_node);
 				}
 				else {
-					cudaStream_t *stream = create_cuda_stream(req);
+					cures = cudaEventCreate(&req->async_channel.event);
+					STARPU_ASSERT(cures == cudaSuccess);
+
+					stream = starpu_get_local_stream();
 					ret = copy_methods->ram_to_cuda_async(handle, src_node, dst_node, stream);
+
+					cures = cudaEventRecord(req->async_channel.event, *stream);
+					STARPU_ASSERT(cures == cudaSuccess);
 				}
 				break;
 			case CUDA_RAM:
@@ -252,20 +255,20 @@ void driver_wait_request_completion(starpu_async_channel *async_channel __attrib
 {
 	node_kind kind = get_node_kind(handling_node);
 #ifdef USE_CUDA
-	cudaStream_t stream;
+	cudaEvent_t event;
 	cudaError_t cures;
 #endif
 
 	switch (kind) {
 #ifdef USE_CUDA
 		case CUDA_RAM:
-			stream = (*async_channel).stream;
+			event = (*async_channel).event;
 
-			cures = cudaStreamSynchronize(stream);
+			cures = cudaEventSynchronize(event);
 			if (STARPU_UNLIKELY(cures))
 				CUDA_REPORT_ERROR(cures);				
 
-			cures = cudaStreamDestroy(stream);
+			cures = cudaEventDestroy(event);
 			if (STARPU_UNLIKELY(cures))
 				CUDA_REPORT_ERROR(cures);				
 
@@ -283,17 +286,17 @@ unsigned driver_test_request_completion(starpu_async_channel *async_channel __at
 	node_kind kind = get_node_kind(handling_node);
 	unsigned success;
 #ifdef USE_CUDA
-	cudaStream_t stream;
+	cudaEvent_t event;
 #endif
 
 	switch (kind) {
 #ifdef USE_CUDA
 		case CUDA_RAM:
-			stream = (*async_channel).stream;
+			event = (*async_channel).event;
 
-			success = (cudaStreamQuery(stream) == cudaSuccess);
+			success = (cudaEventQuery(event) == cudaSuccess);
 			if (success)
-				cudaStreamDestroy(stream);
+				cudaEventDestroy(event);
 
 			break;
 #endif

+ 1 - 1
src/datawizard/copy-driver.h

@@ -35,7 +35,7 @@ struct data_request_s;
 typedef union {
 	int dummy;
 #ifdef USE_CUDA
-	cudaStream_t stream;
+	cudaEvent_t event;
 #endif
 } starpu_async_channel;
 

+ 10 - 3
src/datawizard/user_interactions.c

@@ -185,7 +185,7 @@ static void _prefetch_data_on_node(void *arg)
 
 }
 
-int starpu_prefetch_data_on_node(starpu_data_handle handle, unsigned node, unsigned async)
+int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned node, unsigned async, starpu_access_mode mode)
 {
 	STARPU_ASSERT(handle);
 
@@ -203,10 +203,12 @@ int starpu_prefetch_data_on_node(starpu_data_handle handle, unsigned node, unsig
 		.finished = 0
 	};
 
-	if (!attempt_to_submit_data_request_from_apps(handle, STARPU_R, _prefetch_data_on_node, &statenode))
+	if (!attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &statenode))
 	{
 		/* we can immediately proceed */
-		fetch_data_on_node(handle, node, 1, 0, async);
+		uint8_t read = (mode != STARPU_W);
+		uint8_t write = (mode != STARPU_R);
+		fetch_data_on_node(handle, node, read, write, async);
 
 		/* remove the "lock"/reference */
 		if (!async)
@@ -225,3 +227,8 @@ int starpu_prefetch_data_on_node(starpu_data_handle handle, unsigned node, unsig
 
 	return 0;
 }
+
+int starpu_prefetch_data_on_node(starpu_data_handle handle, unsigned node, unsigned async)
+{
+	return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R);
+}
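
Spelled out, the compact mode-to-flags mapping introduced above behaves as follows (a sketch, not the actual source; STARPU_R, STARPU_W and STARPU_RW are StarPU's access modes):

/* read  = 1 unless the prefetch is write-only, i.e. the current contents matter;
 * write = 1 unless the prefetch is read-only,  i.e. the local copy will be modified. */
uint8_t read  = (mode != STARPU_W);   /* STARPU_R -> 1, STARPU_W -> 0, STARPU_RW -> 1 */
uint8_t write = (mode != STARPU_R);   /* STARPU_R -> 0, STARPU_W -> 1, STARPU_RW -> 1 */

The public starpu_prefetch_data_on_node entry point keeps its historical read-only behaviour by delegating to the new internal function with STARPU_R.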

+ 17 - 2
src/drivers/cuda/driver_cuda.c

@@ -22,6 +22,15 @@
 /* the number of CUDA devices */
 static int ncudagpus;
 
+static cudaStream_t streams[STARPU_NMAXWORKERS];
+
+cudaStream_t *starpu_get_local_stream(void)
+{
+	int worker = starpu_get_worker_id();
+
+	return &streams[worker];
+}
+
 static void init_context(int devid)
 {
 	cudaError_t cures;
@@ -32,12 +41,18 @@ static void init_context(int devid)
 
 	/* force CUDA to initialize the context for real */
 	cudaFree(0);
+
+	cures = cudaStreamCreate(starpu_get_local_stream());
+	if (STARPU_UNLIKELY(cures))
+		CUDA_REPORT_ERROR(cures);
 }
 
-static void deinit_context(void)
+static void deinit_context(int workerid)
 {
 	cudaError_t cures;
 
+	cudaStreamDestroy(streams[workerid]);
+
 	/* cleanup the runtime API internal stuffs (which CUBLAS is using) */
 	cures = cudaThreadExit();
 	if (cures)
@@ -263,7 +278,7 @@ void *_starpu_cuda_worker(void *arg)
 
 	TRACE_WORKER_DEINIT_START
 
-	deinit_context();
+	deinit_context(args->workerid);
 
 #ifdef DATA_STATS
 	fprintf(stderr, "CUDA #%d computation %le comm %le (%lf \%%)\n", args->id, args->jobq->total_computation_time, args->jobq->total_communication_time, args->jobq->total_communication_time*100.0/args->jobq->total_computation_time);
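
The stream returned by starpu_get_local_stream is what copy-driver.c now passes to the *_async copy methods. A hypothetical copy method could use it as below; the buffer accessors are placeholders and the -EAGAIN "transfer still in flight" return value is an assumption, only the cudaMemcpyAsync call on the caller-supplied stream reflects what the diff above establishes.

#include <cuda_runtime.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical ram_to_cuda_async copy method: src_ptr/dst_ptr/size stand in
 * for whatever the data interface provides; the stream comes straight from
 * starpu_get_local_stream() via copy_data_1_to_1_generic. */
static int example_ram_to_cuda_async(void *src_ptr, void *dst_ptr, size_t size,
                                     cudaStream_t *stream)
{
	cudaError_t cures = cudaMemcpyAsync(dst_ptr, src_ptr, size,
	                                    cudaMemcpyHostToDevice, *stream);
	if (cures != cudaSuccess)
		return -EINVAL;

	/* Assumed convention: a "still pending" code tells the caller to record
	 * an event on the same stream and poll it later rather than wait here. */
	return -EAGAIN;
}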

+ 0 - 47
src/util/starpu_get_local_cuda_stream.c

@@ -1,47 +0,0 @@
-/*
- * StarPU
- * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#include <starpu.h>
-#include <common/config.h>
-
-#ifdef USE_CUDA
-static cudaStream_t streams[STARPU_NMAXWORKERS];
-static unsigned cuda_streams_are_initalized = 0;
-
-static void init_stream_on_worker(void *arg __attribute__((unused)))
-{
-	cudaError_t cures;
-	cures = cudaStreamCreate(starpu_helper_get_local_stream());
-	if (STARPU_UNLIKELY(cures))
-		CUDA_REPORT_ERROR(cures);
-}
-
-void starpu_helper_create_per_gpu_streams(void)
-{
-	if (!cuda_streams_are_initalized)
-	{
-		starpu_execute_on_each_worker(init_stream_on_worker, NULL, CUDA);
-		cuda_streams_are_initalized = 1;
-	}
-}
-
-cudaStream_t *starpu_helper_get_local_stream(void)
-{
-	int worker = starpu_get_worker_id();
-
-	return &streams[worker];
-}
-#endif