Browse Source

- merge trunk

Olivier Aumage 11 years ago
parent
commit
1712f84e3b

+ 2 - 0
ChangeLog

@@ -87,6 +87,8 @@ Changes:
   * StarPU-MPI: Fix for being able to receive data with the same tag
     from several nodes (see mpi/tests/gather.c)
   * StarPU-MPI: Fix overzealous allocation of memory.
+  * Interfaces: Allow interface implementation to change pointers at will, in
+    unpack notably.
 
 Small changes:
   * Rename function starpu_trace_user_event() as

+ 3 - 3
doc/doxygen/chapters/api/data_management.doxy

@@ -163,9 +163,9 @@ will commit their changes in main memory (node 0).
 Issue a prefetch request for a given data to a given node, i.e.
 requests that the data be replicated to the given node, so that it is
 available there for tasks. If the \p async parameter is 0, the call will
-block until the transfer is achieved, else the call will return as
-soon as the request is scheduled (which may however have to wait for a
-task completion).
+block until the transfer is achieved, else the call will return immediately,
+after having just queued the request. In the latter case, the request will
+asynchronously wait for the completion of any task writing on the data.
 
 \fn starpu_data_handle_t starpu_data_lookup(const void *ptr)
 \ingroup API_Data_Management

+ 4 - 4
mpi/src/starpu_mpi.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2009, 2010-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -934,7 +934,7 @@ static void _starpu_mpi_copy_cb(void* arg)
 		struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->copy_handle);
 		struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
 		STARPU_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
-		itf_dst->unpack_data(args->data_handle, 0, args->buffer, itf_src->get_size(args->copy_handle));
+		itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->copy_handle));
 		free(args->buffer);
 	}
 	else
@@ -946,12 +946,12 @@ static void _starpu_mpi_copy_cb(void* arg)
 		if (!itf->copy_methods->ram_to_ram)
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
-			itf->copy_methods->any_to_any(itf_src, 0, itf_dst, 0, NULL);
+			itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM, NULL);
 		}
 		else
 		{
 			_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
-			itf->copy_methods->ram_to_ram(itf_src, 0, itf_dst, 0);
+			itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM);
 		}
 	}
 

+ 50 - 0
mpi/src/starpu_mpi_task_insert.c

@@ -30,6 +30,8 @@
 #include <starpu_mpi_task_insert.h>
 #include <starpu_mpi_cache.h>
 
+typedef void (*_starpu_callback_func_t)(void *);
+
 static
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *dest, size_t *size_on_nodes)
 {
@@ -279,6 +281,22 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		{
 			va_arg(varg_list_copy, void *);
 		}
+		else if (arg_type==STARPU_PROLOGUE_CALLBACK)
+                {
+                        (void)va_arg(varg_list, _starpu_callback_func_t);
+		}
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
+                {
+                        (void)va_arg(varg_list, void *);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
+                {
+			(void)va_arg(varg_list, _starpu_callback_func_t);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
+                {
+                        (void)va_arg(varg_list, void *);
+		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
 			va_arg(varg_list_copy, int);
@@ -411,6 +429,22 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 		{
 			va_arg(varg_list_copy, void *);
 		}
+		else if (arg_type==STARPU_PROLOGUE_CALLBACK)
+                {
+                        (void)va_arg(varg_list, _starpu_callback_func_t);
+		}
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
+                {
+                        (void)va_arg(varg_list, void *);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
+                {
+			(void)va_arg(varg_list, _starpu_callback_func_t);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
+                {
+                        (void)va_arg(varg_list, void *);
+		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
 			va_arg(varg_list_copy, int);
@@ -535,6 +569,22 @@ int _starpu_mpi_task_postbuild_v(MPI_Comm comm, struct starpu_codelet *codelet,
 		{
 			va_arg(varg_list_copy, void *);
 		}
+		else if (arg_type==STARPU_PROLOGUE_CALLBACK)
+                {
+                        (void)va_arg(varg_list, _starpu_callback_func_t);
+		}
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
+                {
+                        (void)va_arg(varg_list, void *);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
+                {
+			(void)va_arg(varg_list, _starpu_callback_func_t);
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
+                {
+                        (void)va_arg(varg_list, void *);
+		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
 			va_arg(varg_list_copy, int);

+ 1 - 1
src/core/perfmodel/perfmodel_history.c

@@ -56,7 +56,7 @@ size_t _starpu_job_get_data_size(struct starpu_perfmodel *model, struct starpu_p
 {
 	struct starpu_task *task = j->task;
 
-	if (model && model->per_arch[arch->type][arch->devid][arch->ncore][nimpl].size_base)
+	if (model && model->per_arch && model->per_arch[arch->type][arch->devid][arch->ncore][nimpl].size_base)
 	{
 		return model->per_arch[arch->type][arch->devid][arch->ncore][nimpl].size_base(task, arch, nimpl);
 	}

+ 0 - 8
src/core/workers.c

@@ -1952,11 +1952,3 @@ unsigned starpu_worker_get_sched_ctx_list(int workerid, unsigned **sched_ctxs)
 	return nsched_ctxs;
 }
 
-unsigned _starpu_worker_have_only_CPUs()
-{
-	int i;
-	for(i = 0; i < STARPU_NMAXWORKERS; i++)
-		if(config.workers[i].arch != STARPU_CPU_WORKER)
-			return 0;
-	return 1;
-}

+ 0 - 1
src/core/workers.h

@@ -435,5 +435,4 @@ unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_
 
 int _starpu_worker_get_nsched_ctxs(int workerid);
 
-unsigned _starpu_worker_have_only_CPUs(void);
 #endif // __WORKERS_H__

+ 7 - 1
src/datawizard/coherency.c

@@ -194,7 +194,13 @@ static int worker_supports_direct_access(unsigned node, unsigned handling_node)
 			enum starpu_node_kind kind = starpu_node_get_kind(handling_node);
 			/* GPUs not always allow direct remote access: if CUDA4
 			 * is enabled, we allow two CUDA devices to communicate. */
-			return kind == STARPU_CPU_RAM || kind == STARPU_CUDA_RAM;
+			return
+#if 0
+				/* CUDA does not seem very safe with concurrent
+				 * transfer queueing, avoid queueing from CPUs */
+				kind == STARPU_CPU_RAM ||
+#endif
+				kind == STARPU_CUDA_RAM;
 		}
 #else
 			/* Direct GPU-GPU transfers are not allowed in general */

+ 4 - 4
src/datawizard/copy_driver.c

@@ -163,7 +163,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			cures = cudaEventCreate(&req->async_channel.event.cuda_event);
 			if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
 
-			stream = starpu_cuda_get_out_transfer_stream(src_node);
+			stream = starpu_cuda_get_local_out_transfer_stream();
 			if (copy_methods->cuda_to_ram_async)
 				ret = copy_methods->cuda_to_ram_async(src_interface, src_node, dst_interface, dst_node, stream);
 			else
@@ -199,7 +199,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			if (STARPU_UNLIKELY(cures != cudaSuccess))
 				STARPU_CUDA_REPORT_ERROR(cures);
 
-			stream = starpu_cuda_get_in_transfer_stream(dst_node);
+			stream = starpu_cuda_get_local_in_transfer_stream();
 			if (copy_methods->ram_to_cuda_async)
 				ret = copy_methods->ram_to_cuda_async(src_interface, src_node, dst_interface, dst_node, stream);
 			else
@@ -531,7 +531,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 				(void*) src + src_offset, src_node,
 				(void*) dst + dst_offset, dst_node,
 				size,
-				async_channel?starpu_cuda_get_out_transfer_stream(src_node):NULL,
+				async_channel?starpu_cuda_get_local_out_transfer_stream():NULL,
 				cudaMemcpyDeviceToHost);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_CUDA_RAM):
@@ -539,7 +539,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 				(void*) src + src_offset, src_node,
 				(void*) dst + dst_offset, dst_node,
 				size,
-				async_channel?starpu_cuda_get_in_transfer_stream(dst_node):NULL,
+				async_channel?starpu_cuda_get_local_in_transfer_stream():NULL,
 				cudaMemcpyHostToDevice);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CUDA_RAM,STARPU_CUDA_RAM):

+ 3 - 3
src/datawizard/data_request.c

@@ -394,14 +394,14 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 
 	if (r->retval == -EAGAIN)
 	{
-		/* The request was successful, but could not be terminted
-		 * immediatly. We will handle the completion of the request
+		/* The request was successful, but could not be terminated
+		 * immediately. We will handle the completion of the request
 		 * asynchronously. The request is put in the list of "pending"
 		 * requests in the meantime. */
 		_starpu_spin_unlock(&handle->header_lock);
 
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
-		_starpu_data_request_list_push_front(data_requests_pending[r->handling_node], r);
+		_starpu_data_request_list_push_back(data_requests_pending[r->handling_node], r);
 		data_requests_npending[r->handling_node]++;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
 

+ 1 - 1
src/datawizard/filters.c

@@ -341,7 +341,6 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 		_starpu_spin_lock(&child_handle->header_lock);
 
 		_starpu_data_unregister_ram_pointer(child_handle);
-		_starpu_data_free_interfaces(child_handle);
 
 		for (worker = 0; worker < nworkers; worker++)
 		{
@@ -424,6 +423,7 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 	for (child = 0; child < root_handle->nchildren; child++)
 	{
 		starpu_data_handle_t child_handle = starpu_data_get_child(root_handle, child);
+		_starpu_data_free_interfaces(child_handle);
 		_starpu_spin_unlock(&child_handle->header_lock);
 		_starpu_spin_destroy(&child_handle->header_lock);
 

+ 1 - 1
src/datawizard/footprint.c

@@ -50,7 +50,7 @@ uint32_t _starpu_compute_buffers_footprint(struct starpu_perfmodel *model, struc
 	{
 		footprint = model->footprint(task);
 	}
-	else if (model != NULL && 
+	else if (model != NULL && model->per_arch &&
 			model->per_arch[arch->type] != NULL &&
 			model->per_arch[arch->type][arch->devid] != NULL &&
 			model->per_arch[arch->type][arch->devid][arch->ncore] != NULL &&

+ 2 - 5
src/datawizard/interfaces/data_interface.c

@@ -615,11 +615,8 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "data %p needs to be unpartitioned before unregistration", handle);
 	STARPU_ASSERT(!(nowait && handle->busy_count != 0));
 
-	/* no need to forbid the unregister in a task or callback when we have only CPUs,
-	   the data is on the RAM anyway */
-	unsigned only_cpus = _starpu_worker_have_only_CPUs();
 	int sequential_consistency = handle->sequential_consistency;
-	if (sequential_consistency && !nowait && !only_cpus)
+	if (sequential_consistency && !nowait)
 	{
 		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_data_unregister must not be called from a task or callback, perhaps you can use starpu_data_unregister_submit instead");
 
@@ -757,7 +754,6 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 	size_t size = _starpu_data_get_size(handle);
 
 	_starpu_data_unregister_ram_pointer(handle);
-	_starpu_data_free_interfaces(handle);
 
 	/* Destroy the data now */
 	unsigned node;
@@ -777,6 +773,7 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 		if (local->allocated && local->automatically_allocated)
 			_starpu_request_mem_chunk_removal(handle, local, starpu_worker_get_memory_node(worker), size);
 	}
+	_starpu_data_free_interfaces(handle);
 
 	_starpu_memory_stats_free(handle);
 	_starpu_data_requester_list_delete(handle->req_list);

+ 39 - 15
src/datawizard/memalloc.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -259,6 +259,8 @@ static size_t free_memory_on_node(struct _starpu_mem_chunk *mc, unsigned node)
 	if (mc->automatically_allocated &&
 		(!handle || replicate->refcnt == 0))
 	{
+		void *interface;
+
 		if (handle)
 			STARPU_ASSERT(replicate->allocated);
 
@@ -273,8 +275,14 @@ static size_t free_memory_on_node(struct _starpu_mem_chunk *mc, unsigned node)
 		}
 #endif
 
+		if (handle)
+			interface = replicate->data_interface;
+		else
+			interface = mc->chunk_interface;
+		STARPU_ASSERT(interface);
+
 		_STARPU_TRACE_START_FREE(node, mc->size);
-		mc->ops->free_data_on_node(mc->chunk_interface, node);
+		mc->ops->free_data_on_node(interface, node);
 		_STARPU_TRACE_END_FREE(node);
 
 		if (handle)
@@ -301,7 +309,8 @@ static size_t do_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 		mc->size = _starpu_data_get_size(handle);
 	}
 
-	mc->replicate->mc=NULL;
+	if (mc->replicate)
+		mc->replicate->mc=NULL;
 
 	/* free the actual buffer */
 	size = free_memory_on_node(mc, node);
@@ -309,7 +318,6 @@ static size_t do_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 	/* remove the mem_chunk from the list */
 	_starpu_mem_chunk_list_erase(mc_list[node], mc);
 
-	free(mc->chunk_interface);
 	_starpu_mem_chunk_delete(mc);
 
 	return size;
@@ -351,7 +359,7 @@ static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
 
 		if (mc->replicate->refcnt == 0)
 		{
-			/* Note taht there is no need to transfer any data or
+			/* Note that there is no need to transfer any data or
 			 * to update the status in terms of MSI protocol
 			 * because this memchunk is associated to a replicate
 			 * in "relaxed coherency" mode. */
@@ -409,22 +417,36 @@ static size_t try_to_free_mem_chunk(struct _starpu_mem_chunk *mc, unsigned node)
  * therefore not in the cache. */
 static void reuse_mem_chunk(unsigned node, struct _starpu_data_replicate *new_replicate, struct _starpu_mem_chunk *mc, unsigned is_already_in_mc_list)
 {
+	void *interface;
+
 	/* we found an appropriate mem chunk: so we get it out
 	 * of the "to free" list, and reassign it to the new
 	 * piece of data */
 
 	struct _starpu_data_replicate *old_replicate = mc->replicate;
-	old_replicate->allocated = 0;
-	old_replicate->automatically_allocated = 0;
-	old_replicate->initialized = 0;
+	if (old_replicate)
+	{
+		old_replicate->allocated = 0;
+		old_replicate->automatically_allocated = 0;
+		old_replicate->initialized = 0;
+		interface = old_replicate->data_interface;
+	}
+	else
+		interface = mc->chunk_interface;
 
 	new_replicate->allocated = 1;
 	new_replicate->automatically_allocated = 1;
 	new_replicate->initialized = 0;
 
 	STARPU_ASSERT(new_replicate->data_interface);
-	STARPU_ASSERT(mc->chunk_interface);
-	memcpy(new_replicate->data_interface, mc->chunk_interface, mc->size_interface);
+	STARPU_ASSERT(interface);
+	memcpy(new_replicate->data_interface, interface, mc->size_interface);
+
+	if (!old_replicate)
+	{
+		free(mc->chunk_interface);
+		mc->chunk_interface = NULL;
+	}
 
 	mc->data = new_replicate->handle;
 	/* mc->ops, mc->footprint and mc->interface should be
@@ -717,12 +739,8 @@ static struct _starpu_mem_chunk *_starpu_memchunk_init(struct _starpu_data_repli
 	mc->relaxed_coherency = replicate->relaxed_coherency;
 	mc->replicate = replicate;
 	mc->replicate->mc = mc;
-
-	/* Save a copy of the interface */
-	mc->chunk_interface = malloc(interface_size);
+	mc->chunk_interface = NULL;
 	mc->size_interface = interface_size;
-	STARPU_ASSERT(mc->chunk_interface);
-	memcpy(mc->chunk_interface, replicate->data_interface, interface_size);
 
 	return mc;
 }
@@ -761,8 +779,14 @@ void _starpu_request_mem_chunk_removal(starpu_data_handle_t handle, struct _star
 	 * by freeing this.  */
 	mc->size = size;
 
+	/* Also keep the interface parameters and pointers, for later reuse
+	 * while detached, or freed */
+	mc->chunk_interface = malloc(mc->size_interface);
+	memcpy(mc->chunk_interface, replicate->data_interface, mc->size_interface);
+
 	/* This memchunk doesn't have to do with the data any more. */
 	replicate->mc = NULL;
+	mc->replicate = NULL;
 	replicate->allocated = 0;
 	replicate->automatically_allocated = 0;
 

+ 8 - 6
src/datawizard/memalloc.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010, 2012-2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2010, 2012-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -33,11 +33,13 @@ LIST_TYPE(_starpu_mem_chunk,
 
 	uint32_t footprint;
 
-	/* The footprint of the data is not sufficient to determine whether two
-	 * pieces of data have the same layout (there could be collision in the
-	 * hash function ...) so we still keep a copy of the actual layout (ie.
-	 * the data interface) to stay on the safe side. We make a copy of
-	 * because when a data is deleted, the memory chunk remains.
+	/*
+	 * When re-using a memchunk, the footprint of the data is not
+	 * sufficient to determine whether two pieces of data have the same
+	 * layout (there could be collision in the hash function ...) so we
+	 * still keep a copy of the actual layout (ie. the data interface) to
+	 * stay on the safe side while the memchunk is detached from an actual
+	 * data.
 	 */
 	struct starpu_data_interface_ops *ops;
 	void *chunk_interface;

+ 3 - 3
src/datawizard/memory_nodes.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2013  Université de Bordeaux 1
+ * Copyright (C) 2009-2014  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -69,9 +69,9 @@ unsigned _starpu_memory_node_get_local_key(void)
 	memory_node = (unsigned *) STARPU_PTHREAD_GETSPECIFIC(memory_node_key);
 
 	/* in case this is called by the programmer, we assume the RAM node
-	   is the appropriate memory node ... so we return 0 XXX */
+	   is the appropriate memory node ... XXX */
 	if (STARPU_UNLIKELY(!memory_node))
-		return 0;
+		return STARPU_MAIN_RAM;
 
 	return *memory_node;
 }

+ 12 - 12
src/drivers/cuda/driver_cuda.c

@@ -42,8 +42,8 @@ static int ncudagpus;
 static size_t global_mem[STARPU_MAXCUDADEVS];
 #ifdef STARPU_USE_CUDA
 static cudaStream_t streams[STARPU_NMAXWORKERS];
-static cudaStream_t out_transfer_streams[STARPU_MAXCUDADEVS];
-static cudaStream_t in_transfer_streams[STARPU_MAXCUDADEVS];
+static cudaStream_t out_transfer_streams[STARPU_NMAXWORKERS];
+static cudaStream_t in_transfer_streams[STARPU_NMAXWORKERS];
 static cudaStream_t peer_transfer_streams[STARPU_MAXCUDADEVS][STARPU_MAXCUDADEVS];
 static struct cudaDeviceProp props[STARPU_MAXCUDADEVS];
 static cudaEvent_t task_events[STARPU_MAXCUDADEVS];
@@ -116,18 +116,18 @@ static void _starpu_cuda_limit_gpu_mem_if_needed(unsigned devid)
 }
 
 #ifdef STARPU_USE_CUDA
-cudaStream_t starpu_cuda_get_in_transfer_stream(unsigned node)
+cudaStream_t starpu_cuda_get_local_in_transfer_stream(void)
 {
-	int devid = _starpu_memory_node_get_devid(node);
+	int worker = starpu_worker_get_id();
 
-	return in_transfer_streams[devid];
+	return in_transfer_streams[worker];
 }
 
-cudaStream_t starpu_cuda_get_out_transfer_stream(unsigned node)
+cudaStream_t starpu_cuda_get_local_out_transfer_stream(void)
 {
-	int devid = _starpu_memory_node_get_devid(node);
+	int worker = starpu_worker_get_id();
 
-	return out_transfer_streams[devid];
+	return out_transfer_streams[worker];
 }
 
 cudaStream_t starpu_cuda_get_peer_transfer_stream(unsigned src_node, unsigned dst_node)
@@ -261,11 +261,11 @@ static void init_context(unsigned devid)
 	if (STARPU_UNLIKELY(cures))
 		STARPU_CUDA_REPORT_ERROR(cures);
 
-	cures = cudaStreamCreate(&in_transfer_streams[devid]);
+	cures = cudaStreamCreate(&in_transfer_streams[workerid]);
 	if (STARPU_UNLIKELY(cures))
 		STARPU_CUDA_REPORT_ERROR(cures);
 
-	cures = cudaStreamCreate(&out_transfer_streams[devid]);
+	cures = cudaStreamCreate(&out_transfer_streams[workerid]);
 	if (STARPU_UNLIKELY(cures))
 		STARPU_CUDA_REPORT_ERROR(cures);
 
@@ -285,8 +285,8 @@ static void deinit_context(int workerid)
 
 	cudaEventDestroy(task_events[workerid]);
 	cudaStreamDestroy(streams[workerid]);
-	cudaStreamDestroy(in_transfer_streams[devid]);
-	cudaStreamDestroy(out_transfer_streams[devid]);
+	cudaStreamDestroy(in_transfer_streams[workerid]);
+	cudaStreamDestroy(out_transfer_streams[workerid]);
 	for (i = 0; i < ncudagpus; i++)
 		cudaStreamDestroy(peer_transfer_streams[i][devid]);
 

+ 2 - 2
src/drivers/cuda/driver_cuda.h

@@ -48,8 +48,8 @@ void *_starpu_cuda_worker(void *);
 #  define _starpu_cuda_discover_devices(config) ((void) config)
 #endif
 #ifdef STARPU_USE_CUDA
-cudaStream_t starpu_cuda_get_in_transfer_stream(unsigned node);
-cudaStream_t starpu_cuda_get_out_transfer_stream(unsigned node);
+cudaStream_t starpu_cuda_get_local_in_transfer_stream(void);
+cudaStream_t starpu_cuda_get_local_out_transfer_stream(void);
 cudaStream_t starpu_cuda_get_peer_transfer_stream(unsigned src_node, unsigned dst_node);
 
 struct _starpu_worker;