@@ -22,29 +22,6 @@
 #include <core/dependencies/data_concurrency.h>
 #include <profiling/profiling.h>

-uint32_t _starpu_select_node_to_handle_request(uint32_t src_node, uint32_t dst_node)
-{
-	/* in case one of the node is a GPU, it needs to perform the transfer,
-	 * if both of them are GPU, it's a bit more complicated */
-
-	unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
-	unsigned dst_is_a_gpu = (_starpu_get_node_kind(dst_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(dst_node) == STARPU_OPENCL_RAM);
-
-	/* we do not handle GPU->GPU transfers yet ! */
-	STARPU_ASSERT( !(src_is_a_gpu && dst_is_a_gpu) );
-
-	if (src_is_a_gpu)
-		return src_node;
-
-	if (dst_is_a_gpu)
-		return dst_node;
-
-	/* otherwise perform it locally, since we should be on a "sane" arch
-	 * where anyone can do the transfer. NB: in StarPU this should actually never
-	 * happen */
-	return _starpu_get_local_memory_node();
-}
-
 uint32_t _starpu_select_src_node(starpu_data_handle handle)
 {
 	unsigned src_node = 0;
@@ -103,7 +80,8 @@ void _starpu_update_data_state(starpu_data_handle handle,
 	unsigned nnodes = _starpu_get_memory_nodes_count();

 	/* the data is present now */
-	requesting_replicate->requested = 0;
+	unsigned requesting_node = requesting_replicate->memory_node;
+	requesting_replicate->requested[requesting_node] = 0;

 	if (mode & STARPU_W) {
 		/* the requesting node now has the only valid copy */
@@ -129,6 +107,88 @@ void _starpu_update_data_state(starpu_data_handle handle,
 	}
 }

+/* Determines the path of a request: each hop is defined by (src,dst) and the
+ * node that handles the hop. The returned value indicates the number of hops,
+ * and max_len is the maximum number of hops (i.e. the size of the
+ * src_nodes, dst_nodes and handling_nodes arrays). */
+static int determine_request_path(unsigned src_node, unsigned dst_node,
+				starpu_access_mode mode, int max_len,
+				unsigned *src_nodes, unsigned *dst_nodes,
+				unsigned *handling_nodes)
+{
+	unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
+	unsigned dst_is_a_gpu = (_starpu_get_node_kind(dst_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(dst_node) == STARPU_OPENCL_RAM);
+
+	if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
+		/* We need an intermediate hop to implement data staging
+		 * through main memory. */
+		STARPU_ASSERT(max_len >= 2);
+
+		/* XXX we hardcode 0 as the RAM node ... */
+
+		/* GPU -> RAM */
+		src_nodes[0] = src_node;
+		dst_nodes[0] = 0;
+		handling_nodes[0] = src_node;
+
+		/* RAM -> GPU */
+		src_nodes[1] = 0;
+		dst_nodes[1] = dst_node;
+		handling_nodes[1] = dst_node;
+
+		return 2;
+	}
+	else {
+		STARPU_ASSERT(max_len >= 1);
+
+		unsigned handling_node;
+
+		/* The handling node is the GPU (if applicable), otherwise it's
+		 * the destination node. If both src and dst are GPUs, we
+		 * ensured that !(mode & STARPU_R), so we only need to allocate
+		 * the data from the destination */
+		handling_node = (!src_is_a_gpu && dst_is_a_gpu)?dst_node:src_node;
+		src_nodes[0] = src_node;
+		dst_nodes[0] = dst_node;
+		handling_nodes[0] = handling_node;
+
+		return 1;
+	}
+}
+
+/* handle->lock should be taken. r is returned locked. The node parameter
+ * indicates either the source of the request, or the destination for a
+ * write-only request. */
+static starpu_data_request_t _starpu_search_existing_data_request(struct starpu_data_replicate_s *replicate, unsigned node, starpu_access_mode mode)
+{
+	starpu_data_request_t r;
+
+	r = replicate->request[node];
+
+	if (r)
+	{
+		_starpu_spin_lock(&r->lock);
+
+		/* perhaps we need to "upgrade" the request */
+		if (mode & STARPU_R)
+		{
+			/* in case the existing request did not imply a memory
+			 * transfer yet, we have to increment the refcnt now
+			 * (so that the source remains valid) */
+			if (!(r->mode & STARPU_R))
+				replicate->refcnt++;
+
+			r->mode |= STARPU_R;
+		}
+
+		if (mode & STARPU_W)
+			r->mode |= STARPU_W;
+	}
+
+	return r;
+}
+
+

 /*
  * This function is called when the data is needed on the local node, this
@@ -151,12 +211,11 @@ void _starpu_update_data_state(starpu_data_handle handle,
 */

 /* This function is called with handle's header lock taken */
-static starpu_data_request_t create_new_request_to_fetch_data(starpu_data_handle handle,
+starpu_data_request_t create_request_to_fetch_data(starpu_data_handle handle,
 				struct starpu_data_replicate_s *dst_replicate,
 				starpu_access_mode mode, unsigned is_prefetch,
 				void (*callback_func)(void *), void *callback_arg)
 {
-	starpu_data_request_t r;
 	unsigned requesting_node = dst_replicate->memory_node;

 	/* find someone who already has the data */
@@ -169,74 +228,84 @@ static starpu_data_request_t create_new_request_to_fetch_data(starpu_data_handle
 		STARPU_ASSERT(src_node != requesting_node);
 	}

-	unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
-	unsigned dst_is_a_gpu = (_starpu_get_node_kind(requesting_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(requesting_node) == STARPU_OPENCL_RAM);
+	/* We can safely assume that there won't be more than 2 hops in the
+	 * current implementation */
+	unsigned src_nodes[4], dst_nodes[4], handling_nodes[4];
+	int nhops = determine_request_path(src_node, requesting_node, mode, 4,
+					src_nodes, dst_nodes, handling_nodes);
+	STARPU_ASSERT(nhops <= 4);

-	struct starpu_data_replicate_s *src_replicate = &handle->per_node[src_node];
+	starpu_data_request_t requests[nhops];

-	/* we have to perform 2 successive requests for GPU->GPU transfers */
-	if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
-		unsigned reuse_r_src_to_ram;
-		starpu_data_request_t r_src_to_ram;
-		starpu_data_request_t r_ram_to_dst;
+	/* Did we reuse a request for that hop? */
+	int reused_requests[nhops];

-		struct starpu_data_replicate_s *ram_replicate = &handle->per_node[0];
+	/* Construct an array with a list of requests, possibly reusing existing requests */
+	int hop;
+	for (hop = 0; hop < nhops; hop++)
+	{
+		starpu_data_request_t r;

-		/* XXX we hardcore 0 as the RAM node ... */
-		/* We put a 1 in the number of dependencies because this
-		 * depends on the r_src_to_ram request. */
-		r_ram_to_dst = _starpu_create_data_request(handle, ram_replicate,
-				dst_replicate, requesting_node, mode, 1, is_prefetch);
+		unsigned hop_src_node = src_nodes[hop];
+		unsigned hop_dst_node = dst_nodes[hop];
+		unsigned hop_handling_node = handling_nodes[hop];

-		if (!is_prefetch)
-			r_ram_to_dst->refcnt++;
+		struct starpu_data_replicate_s *hop_src_replicate;
+		struct starpu_data_replicate_s *hop_dst_replicate;

-		r_src_to_ram = _starpu_search_existing_data_request(ram_replicate, mode);
+		/* Only the first request is independent */
+		unsigned ndeps = (hop == 0)?0:1;

-		reuse_r_src_to_ram = r_src_to_ram?1:0;
+		hop_src_replicate = &handle->per_node[hop_src_node];
+		hop_dst_replicate = (hop != nhops - 1)?&handle->per_node[hop_dst_node]:dst_replicate;

-		if (!r_src_to_ram)
-		{
-			r_src_to_ram = _starpu_create_data_request(handle, src_replicate,
-					ram_replicate, src_node, mode, 0, is_prefetch);
-		}
+		/* Try to reuse a request if possible */
+		r = _starpu_search_existing_data_request(hop_dst_replicate,
+				(mode & STARPU_R)?hop_src_node:hop_dst_node, mode);

-		/* we chain both requests */
-		r_src_to_ram->next_req[r_src_to_ram->next_req_count++]= r_ram_to_dst;
+		reused_requests[hop] = !!r;

-		_starpu_data_request_append_callback(r_ram_to_dst, callback_func, callback_arg);
+		if (!r) {
+			/* Create a new request if there was no request to reuse */
+			r = _starpu_create_data_request(handle, hop_src_replicate,
+					hop_dst_replicate, hop_handling_node,
+					mode, ndeps);
+		}

-		if (reuse_r_src_to_ram)
-			_starpu_spin_unlock(&r_src_to_ram->lock);
+		requests[hop] = r;
+	}

-		_starpu_spin_unlock(&handle->header_lock);
+	/* Chain these requests */
+	for (hop = 0; hop < nhops; hop++)
+	{
+		starpu_data_request_t r;
+		r = requests[hop];

-		/* we only submit the first request, the remaining will be automatically submitted afterward */
-		if (!reuse_r_src_to_ram)
-			_starpu_post_data_request(r_src_to_ram, src_node);
+		if (hop != nhops - 1)
+		{
+			if (!reused_requests[hop + 1])
+				r->next_req[r->next_req_count++] = requests[hop + 1];
+		}
+		else
+			_starpu_data_request_append_callback(r, callback_func, callback_arg);

-		/* the application only waits for the termination of the last request */
-		r = r_ram_to_dst;
-	}
-	else {
-		/* who will perform that request ? */
-		uint32_t handling_node =
-			_starpu_select_node_to_handle_request(src_node, requesting_node);

-		r = _starpu_create_data_request(handle, src_replicate,
-				dst_replicate, handling_node, mode, 0, is_prefetch);
+		if (reused_requests[hop])
+			_starpu_spin_unlock(&r->lock);
+	}

-		_starpu_data_request_append_callback(r, callback_func, callback_arg);
+	if (!is_prefetch)
+		requests[nhops - 1]->refcnt++;

-		if (!is_prefetch)
-			r->refcnt++;

-		_starpu_spin_unlock(&handle->header_lock);
+	_starpu_spin_unlock(&handle->header_lock);

-		_starpu_post_data_request(r, handling_node);
-	}
+	/* we only submit the first request, the remaining will be
+	 * automatically submitted afterward */
+	if (!reused_requests[0])
+		_starpu_post_data_request(requests[0], handling_nodes[0]);

-	return r;
+	return requests[nhops - 1];
 }

 int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *dst_replicate,
@@ -275,42 +344,8 @@ int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_rep

 	starpu_data_request_t r;

-	/* is there already a pending request ? */
-	r = _starpu_search_existing_data_request(dst_replicate, mode);
-	/* at the exit of _starpu_search_existing_data_request the lock is taken if the request existed ! */
-
-	if (!r) {
-		r = create_new_request_to_fetch_data(handle, dst_replicate, mode, is_prefetch, callback_func, callback_arg);
-	}
-	else {
-		/* the lock was taken by _starpu_search_existing_data_request */
-		_starpu_data_request_append_callback(r, callback_func, callback_arg);
-
-		/* there is already a similar request */
-		if (is_prefetch)
-		{
-			_starpu_spin_unlock(&r->lock);
-			_starpu_spin_unlock(&handle->header_lock);
-
-			_STARPU_LOG_OUT_TAG("similar request");
-			return 0;
-		}
-
-		r->refcnt++;
-
-		//_starpu_spin_lock(&r->lock);
-		if (r->is_a_prefetch_request)
-		{
-			/* transform that prefetch request into a "normal" request */
-			r->is_a_prefetch_request = 0;
-
-			/* transform that request into the proper access mode (prefetch could be read only) */
-			r->mode |= mode;
-		}
-
-		_starpu_spin_unlock(&r->lock);
-		_starpu_spin_unlock(&handle->header_lock);
-	}
+	r = create_request_to_fetch_data(handle, dst_replicate, mode,
+			is_prefetch, callback_func, callback_arg);

 	int ret = is_prefetch?0:_starpu_wait_data_request_completion(r, 1);
 	_STARPU_LOG_OUT();
@@ -384,7 +419,10 @@ static void _starpu_set_data_requested_flag_if_needed(struct starpu_data_replica
 //	pthread_spin_lock(&handle->header_lock);

 	if (replicate->state == STARPU_INVALID)
-		replicate->requested = 1;
+	{
+		unsigned dst_node = replicate->memory_node;
+		replicate->requested[dst_node] = 1;
+	}

 //	pthread_spin_unlock(&handle->header_lock);
 }
@@ -531,9 +569,21 @@ unsigned _starpu_is_data_present_or_requested(starpu_data_handle handle, uint32_
 // XXX : this is just a hint, so we don't take the lock ...
 //	pthread_spin_lock(&handle->header_lock);

-	if (handle->per_node[node].state != STARPU_INVALID
-		|| handle->per_node[node].requested || handle->per_node[node].request)
-		ret = 1;
+	if (handle->per_node[node].state != STARPU_INVALID)
+	{
+		ret = 1;
+	}
+	else {
+		unsigned i;
+		unsigned nnodes = _starpu_get_memory_nodes_count();
+
+		for (i = 0; i < nnodes; i++)
+		{
+			if (handle->per_node[node].requested[i] || handle->per_node[node].request[i])
+				ret = 1;
+		}
+
+	}

 //	pthread_spin_unlock(&handle->header_lock);
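
For reference, the two-hop staging logic introduced by determine_request_path() above can be illustrated outside the patch. The following is a minimal standalone sketch, not part of the patch and not the StarPU API: node_kind_t, is_gpu_node() and request_path() are hypothetical stand-ins, and RAM is assumed to be memory node 0 as in the patch.

#include <assert.h>
#include <stdio.h>

/* Hypothetical stand-in for StarPU's memory node kinds. */
typedef enum { NODE_CPU_RAM, NODE_CUDA_RAM, NODE_OPENCL_RAM } node_kind_t;

static int is_gpu_node(node_kind_t k)
{
	return k == NODE_CUDA_RAM || k == NODE_OPENCL_RAM;
}

/* A read between two GPUs is staged through main memory (node 0), so it
 * takes two hops; any other transfer is a single hop, driven by the GPU
 * end when exactly one end is a GPU. */
static int request_path(unsigned src, node_kind_t src_kind,
			unsigned dst, node_kind_t dst_kind, int need_read,
			unsigned src_nodes[2], unsigned dst_nodes[2],
			unsigned handling_nodes[2])
{
	if (need_read && is_gpu_node(src_kind) && is_gpu_node(dst_kind)) {
		src_nodes[0] = src; dst_nodes[0] = 0;   handling_nodes[0] = src;
		src_nodes[1] = 0;   dst_nodes[1] = dst; handling_nodes[1] = dst;
		return 2;
	}

	src_nodes[0] = src;
	dst_nodes[0] = dst;
	handling_nodes[0] = (!is_gpu_node(src_kind) && is_gpu_node(dst_kind)) ? dst : src;
	return 1;
}

int main(void)
{
	unsigned s[2], d[2], h[2];

	/* CUDA node 1 -> OpenCL node 2 with read access: two hops through RAM. */
	int nhops = request_path(1, NODE_CUDA_RAM, 2, NODE_OPENCL_RAM, 1, s, d, h);
	assert(nhops == 2);

	int i;
	for (i = 0; i < nhops; i++)
		printf("hop %d: %u -> %u (handled by node %u)\n", i, s[i], d[i], h[i]);

	return 0;
}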