/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009, 2010 Université de Bordeaux 1
 * Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* NOTE(review): this copy of the file is damaged. The operands of the six
 * #include directives below (the header names) are missing, presumably
 * because "<...>" spans were stripped when the file was extracted; the
 * same kind of loss is visible further down. As written these directives
 * do not preprocess. Restore them from version control rather than
 * guessing header paths. */
#include
#include
#include
#include
#include
#include

/*
 * Choose the memory node whose driver should perform a transfer from
 * src_node to dst_node.
 *
 * A CUDA or OpenCL node has to drive any transfer it takes part in, so:
 *   - if src_node is a GPU node (STARPU_CUDA_RAM or STARPU_OPENCL_RAM),
 *     src_node is returned;
 *   - else if dst_node is a GPU node, dst_node is returned;
 *   - otherwise the memory node of the calling thread is returned
 *     (_starpu_get_local_memory_node()).
 *
 * Asserts that src_node and dst_node are not both GPU nodes. GPU->GPU
 * reads are split into two requests through RAM by
 * create_new_request_to_fetch_data before reaching this function.
 */
uint32_t _starpu_select_node_to_handle_request(uint32_t src_node, uint32_t dst_node)
{
	/* in case one of the nodes is a GPU, it needs to perform the transfer,
	 * if both of them are GPUs, it's a bit more complicated */

	unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM
			|| _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
	unsigned dst_is_a_gpu = (_starpu_get_node_kind(dst_node) == STARPU_CUDA_RAM
			|| _starpu_get_node_kind(dst_node) == STARPU_OPENCL_RAM);

	/* we do not handle GPU->GPU transfers yet ! */
	STARPU_ASSERT( !(src_is_a_gpu && dst_is_a_gpu) );

	if (src_is_a_gpu)
		return src_node;

	if (dst_is_a_gpu)
		return dst_node;

	/* otherwise perform it locally, since we should be on a "sane" arch
	 * where anyone can do the transfer. NB: in StarPU this should actually never
	 * happen */
	return _starpu_get_local_memory_node();
}

/*
 * Pick the memory node from which a valid copy of `handle` should be read.
 * Callers use the result as an index into handle->per_node.
 *
 * The visible part builds src_node_mask, presumably a bitmask with one bit
 * per node whose replicate is not STARPU_INVALID (STARPU_OWNER or
 * STARPU_SHARED).
 *
 * NOTE(review): the rest of this function is missing from this copy of the
 * file (see the note inside the loop). Its selection policy and its return
 * statement cannot be documented from here.
 */
uint32_t _starpu_select_src_node(starpu_data_handle handle)
{
	unsigned src_node = 0;
	unsigned i;

	unsigned nnodes = _starpu_get_memory_nodes_count();

	/* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
	uint32_t node;

	uint32_t src_node_mask = 0;
	for (node = 0; node < nnodes; node++)
	{
		if (handle->per_node[node].state != STARPU_INVALID) {
			/* we found a copy ! */

			/*
			 * NOTE(review): this copy of the file is damaged on the next
			 * line. The text between "(1<" and "requested = 0;" was
			 * lost, presumably a "<...>" span stripped during extraction,
			 * like the #include operands above.
			 *
			 * The loss removed the rest of _starpu_select_src_node:
			 *   - the shift operand, presumably "<<node";
			 *   - presumably a second loop over the otherwise unused
			 *     local `i`, choosing among the set bits;
			 *   - the return statement.
			 *
			 * It also removed the header of the next function. From its
			 * body this is presumably
			 *   _starpu_update_data_state(handle, requesting_replicate, mode),
			 * which _starpu_fetch_data_on_node calls below but which is
			 * not defined elsewhere in the visible text.
			 *
			 * The statement below does not compile as-is. Restore this
			 * region from version control instead of guessing.
			 */
			src_node_mask |= (1<requested = 0;

	/*
	 * Tail of the state-update function whose header was lost above.
	 *
	 * The preceding (damaged) statement ends by clearing the replicate's
	 * `requested` hint, the flag set by
	 * _starpu_set_data_requested_flag_if_needed. The code below then
	 * applies the coherency transition for an access in `mode` on
	 * requesting_replicate:
	 *   - write access (mode & STARPU_W): every per-node replicate becomes
	 *     STARPU_INVALID, then requesting_replicate becomes STARPU_OWNER;
	 *   - read-only access: if requesting_replicate is not already the
	 *     STARPU_OWNER, every per-node replicate that is not
	 *     STARPU_INVALID is downgraded to STARPU_SHARED, and
	 *     requesting_replicate becomes STARPU_SHARED.
	 *
	 * `nnodes` is declared in the lost part.
	 *
	 * NOTE(review): _starpu_fetch_task_input can fetch per-worker
	 * replicates in STARPU_SCRATCH/STARPU_REDUX mode. Whether those
	 * relaxed modes are filtered out before this point was in the lost
	 * text; confirm.
	 */
	if (mode & STARPU_W) {
		/* the requesting node now has the only valid copy */
		uint32_t node;
		for (node = 0; node < nnodes; node++)
			handle->per_node[node].state = STARPU_INVALID;

		requesting_replicate->state = STARPU_OWNER;
	}
	else { /* read only */
		if (requesting_replicate->state != STARPU_OWNER)
		{
			/* there was at least another copy of the data */
			uint32_t node;
			for (node = 0; node < nnodes; node++)
			{
				struct starpu_data_replicate_s *replicate = &handle->per_node[node];
				if (replicate->state != STARPU_INVALID)
					replicate->state = STARPU_SHARED;
			}
			requesting_replicate->state = STARPU_SHARED;
		}
	}
}

/*
 * Coherency transitions when a replicate is accessed, by the replicate's
 * current state (rows) and the access mode (columns):
 *
 *             R      STARPU_W   STARPU_RW
 *   Owner     OK     OK         OK
 *   Shared    OK     1          1
 *   Invalid   2      3          4
 *
 * case 1 : shared + (read)write :
 * 	no data copy but shared->Invalid/Owner
 * case 2 : invalid + read :
 * 	data copy + invalid->shared + owner->shared (STARPU_ASSERT(there is a valid))
 * case 3 : invalid + write :
 * 	no data copy + invalid->owner + (owner,shared)->invalid
 * case 4 : invalid + R/STARPU_W :
 * 	data copy + if (STARPU_W) (invalid->owner + owner->invalid)
 * 		    else (invalid,owner->shared)
 */

/* This function is called with handle's header lock taken */

/*
 * Create (and post) the data request(s) that bring a valid copy of
 * `handle` into dst_replicate, and return the request the caller should
 * wait on.
 *
 * Locking: must be called with handle->header_lock held. The lock is
 * released on every path before returning.
 *
 * Parameters:
 *   handle        - the data being fetched.
 *   dst_replicate - destination replicate. Its memory_node is the
 *                   requesting node.
 *   mode          - access mode. A source node is only selected (with
 *                   _starpu_select_src_node) when mode includes STARPU_R.
 *                   Otherwise src_node stays 0.
 *   is_prefetch   - when 0, the returned request's refcnt is incremented
 *                   (presumably the reference dropped by the waiter;
 *                   confirm in _starpu_wait_data_request_completion).
 *   callback_func, callback_arg
 *                 - attached to the returned request with
 *                   _starpu_data_request_append_callback.
 *
 * GPU->GPU reads: the transfer is split into a src->RAM request and a
 * RAM->dst request, with RAM hard-coded as node 0. If
 * _starpu_search_existing_data_request finds a pending request towards
 * RAM, it is reused. The RAM->dst request is chained after it through
 * next_req; only a newly created first request is posted here.
 *
 * Other cases: a single request is created and posted, handled by the
 * node chosen by _starpu_select_node_to_handle_request.
 */
static starpu_data_request_t create_new_request_to_fetch_data(starpu_data_handle handle,
				struct starpu_data_replicate_s *dst_replicate,
				starpu_access_mode mode, unsigned is_prefetch,
				void (*callback_func)(void *), void *callback_arg)
{
	starpu_data_request_t r;

	unsigned requesting_node = dst_replicate->memory_node;

	/* find someone who already has the data */
	uint32_t src_node = 0;

	/* if the data is in write only mode, there is no need for a source */
	if (mode & STARPU_R)
	{
		src_node = _starpu_select_src_node(handle);
		STARPU_ASSERT(src_node != requesting_node);
	}

	unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM
			|| _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
	unsigned dst_is_a_gpu = (_starpu_get_node_kind(requesting_node) == STARPU_CUDA_RAM
			|| _starpu_get_node_kind(requesting_node) == STARPU_OPENCL_RAM);

	struct starpu_data_replicate_s *src_replicate = &handle->per_node[src_node];

	/* we have to perform 2 successive requests for GPU->GPU transfers */
	if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
		unsigned reuse_r_src_to_ram;
		starpu_data_request_t r_src_to_ram;
		starpu_data_request_t r_ram_to_dst;

		struct starpu_data_replicate_s *ram_replicate = &handle->per_node[0];

		/* XXX we hardcode 0 as the RAM node ... */

		/* We put a 1 in the number of dependencies because this
		 * depends on the r_src_to_ram request. */
		r_ram_to_dst = _starpu_create_data_request(handle, ram_replicate,
					dst_replicate, requesting_node, mode, 1, is_prefetch);

		if (!is_prefetch)
			r_ram_to_dst->refcnt++;

		/* When a matching request already exists, it is returned with
		 * its lock held; the lock is released below after chaining. */
		r_src_to_ram = _starpu_search_existing_data_request(ram_replicate, mode);

		reuse_r_src_to_ram = r_src_to_ram?1:0;

		if (!r_src_to_ram)
		{
			r_src_to_ram = _starpu_create_data_request(handle, src_replicate,
						ram_replicate, src_node, mode, 0, is_prefetch);
		}

		/* we chain both requests */
		/* NOTE(review): next_req_count is incremented with no visible
		 * bound check against the capacity of next_req; a reused
		 * request may accumulate several successors. Confirm the array
		 * is large enough. */
		r_src_to_ram->next_req[r_src_to_ram->next_req_count++]= r_ram_to_dst;

		_starpu_data_request_append_callback(r_ram_to_dst, callback_func, callback_arg);

		if (reuse_r_src_to_ram)
			_starpu_spin_unlock(&r_src_to_ram->lock);

		_starpu_spin_unlock(&handle->header_lock);

		/* we only submit the first request, the remaining ones will be
		 * submitted automatically afterwards */
		if (!reuse_r_src_to_ram)
			_starpu_post_data_request(r_src_to_ram, src_node);

		/* the application only waits for the termination of the last request */
		r = r_ram_to_dst;
	}
	else {
		/* who will perform that request ? */
		uint32_t handling_node =
			_starpu_select_node_to_handle_request(src_node, requesting_node);

		r = _starpu_create_data_request(handle, src_replicate,
				dst_replicate, handling_node, mode, 0, is_prefetch);

		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		if (!is_prefetch)
			r->refcnt++;

		_starpu_spin_unlock(&handle->header_lock);

		_starpu_post_data_request(r, handling_node);
	}

	return r;
}

/*
 * Make dst_replicate hold a valid copy of `handle` for an access in `mode`.
 *
 * Locking: handle->header_lock is acquired by spinning on
 * _starpu_spin_trylock while calling _starpu_datawizard_progress(local_node, 1)
 * between attempts, and it is released on every path.
 *
 * Reference counting: for a non-prefetch call, dst_replicate->refcnt is
 * incremented first. _starpu_release_data_on_node later decrements it.
 *
 * Behaviour:
 *   - Replicate already valid: the coherency state is updated with
 *     _starpu_update_data_state, a cache hit is recorded, callback_func
 *     (if non-NULL) is called synchronously, and 0 is returned.
 *   - Otherwise a cache miss is recorded. An equivalent pending request
 *     found by _starpu_search_existing_data_request is reused, or a new
 *     one is created by create_new_request_to_fetch_data. callback_func
 *     is attached to that request.
 *
 * Returns 0 for prefetches, which never wait. Otherwise returns the result
 * of _starpu_wait_data_request_completion(r, 1). A non-zero result is
 * treated as a failure by _starpu_fetch_task_input.
 */
int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *dst_replicate,
				starpu_access_mode mode, unsigned is_prefetch,
				void (*callback_func)(void *), void *callback_arg)
{
	uint32_t local_node = _starpu_get_local_memory_node();
	_STARPU_LOG_IN();

	unsigned requesting_node = dst_replicate->memory_node;

	/* Keep driving transfers on the local node while waiting for the
	 * lock (presumably so that whoever holds it can make progress). */
	while (_starpu_spin_trylock(&handle->header_lock))
		_starpu_datawizard_progress(local_node, 1);

	if (!is_prefetch)
		dst_replicate->refcnt++;

	if (dst_replicate->state != STARPU_INVALID)
	{
		/* the data is already available so we can stop */
		_starpu_update_data_state(handle, dst_replicate, mode);
		_starpu_msi_cache_hit(requesting_node);
		_starpu_spin_unlock(&handle->header_lock);

		if (callback_func)
			callback_func(callback_arg);

		_STARPU_LOG_OUT_TAG("data available");
		return 0;
	}

	/* the only remaining situation is that the local copy was invalid */
	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID);

	_starpu_msi_cache_miss(requesting_node);

	starpu_data_request_t r;

	/* is there already a pending request ? */
	r = _starpu_search_existing_data_request(dst_replicate, mode);
	/* at the exit of _starpu_search_existing_data_request the lock is taken if the request existed ! */

	if (!r) {
		/* create_new_request_to_fetch_data releases handle->header_lock */
		r = create_new_request_to_fetch_data(handle, dst_replicate, mode, is_prefetch, callback_func, callback_arg);
	}
	else {
		/* the lock was taken by _starpu_search_existing_data_request */
		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		/* there is already a similar request */
		if (is_prefetch)
		{
			_starpu_spin_unlock(&r->lock);

			_starpu_spin_unlock(&handle->header_lock);

			_STARPU_LOG_OUT_TAG("similar request");
			return 0;
		}

		/* A blocking fetch piggy-backs on the existing request: take a
		 * reference on it and, if it was only a prefetch, upgrade it. */
		r->refcnt++;

		//_starpu_spin_lock(&r->lock);
		if (r->is_a_prefetch_request)
		{
			/* transform that prefetch request into a "normal" request */
			r->is_a_prefetch_request = 0;

			/* transform that request into the proper access mode (prefetch could be read only) */
			r->mode |= mode;
		}

		_starpu_spin_unlock(&r->lock);
		_starpu_spin_unlock(&handle->header_lock);
	}

	int ret = is_prefetch?0:_starpu_wait_data_request_completion(r, 1);
	_STARPU_LOG_OUT();
	return ret;
}

/* Issue a prefetch of `handle` into `replicate` for an access in `mode`:
 * no callback, no waiting. Returns the result of _starpu_fetch_data_on_node
 * with is_prefetch = 1. */
static int prefetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, starpu_access_mode mode)
{
	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, NULL, NULL);
}

/* Fetch `handle` into `replicate` for an access in `mode` and wait for the
 * transfer: no callback, is_prefetch = 0, so the replicate's refcnt is
 * incremented. Returns 0 on success, non-zero if the wait failed. */
static int fetch_data(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, starpu_access_mode mode)
{
	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, NULL, NULL);
}

/* Current reference count of handle's replicate on memory node `node`.
 *
 * NOTE(review): under C99/C11 inline rules, a non-static `inline`
 * definition is only an inline definition if every file-scope declaration
 * of the function in this translation unit uses `inline` without
 * `extern`. In that case no external symbol is emitted, and calls from
 * other files may fail to link. Whether a header supplies a non-inline
 * declaration is not visible here.
 *
 * NOTE(review): the value is returned as uint32_t, while
 * _starpu_release_data_on_node asserts `refcnt >= 0`, which suggests the
 * field may be signed. Confirm. */
inline uint32_t _starpu_get_data_refcnt(starpu_data_handle handle, uint32_t node)
{
	return handle->per_node[node].refcnt;
}

/* Returns the size recorded in handle->data_size. */
size_t _starpu_data_get_size(starpu_data_handle handle)
{
	return handle->data_size;
}

/* Returns the footprint value stored in handle->footprint. */
uint32_t _starpu_data_get_footprint(starpu_data_handle handle)
{
	return handle->footprint;
}

/* in case the data was accessed on a write mode, do not forget to
 * make it accessible again once it is possible !
*/

/*
 * Release one reference on `replicate` after a task has used it.
 *
 * Parameters:
 *   handle          - the data that was accessed.
 *   default_wt_mask - write-through node mask supplied by the caller. It
 *                     is OR-ed with handle->wt_mask.
 *   replicate       - the replicate whose refcnt was incremented when the
 *                     data was fetched by _starpu_fetch_data_on_node.
 *
 * Visible behaviour:
 *   - If the replicate is not STARPU_INVALID, a write-through test on
 *     wt_mask is performed (its body is lost; see the note below).
 *   - refcnt is decremented, asserting that it stays >= 0, and
 *     _starpu_notify_data_dependencies(handle) is called.
 *   - handle->header_lock is unlocked at the end, unless
 *     handle->lazy_unregister was set before the notification.
 *     Presumably the handle is destroyed during the notification in that
 *     case and must not be touched again; confirm in
 *     _starpu_notify_data_dependencies.
 */
void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt_mask, struct starpu_data_replicate_s *replicate)
{
	uint32_t wt_mask;
	wt_mask = default_wt_mask | handle->wt_mask;

	/* Note that it is possible that there is no valid copy of the data (if
	 * starpu_data_invalidate was called for instance). In that case, we do
	 * not enforce any write-through mechanism. */

	unsigned memory_node = replicate->memory_node;

	/*
	 * NOTE(review): this copy of the file is damaged two lines below. The
	 * text between "(1<" and "header_lock))" was lost, presumably a "<...>"
	 * span stripped during extraction.
	 *
	 * Judging from the surrounding code, the lost part likely held:
	 *   - the rest of the write-through test (memory_node is declared above
	 *     but not otherwise used in the visible text, so probably a
	 *     "<< memory_node" shift);
	 *   - the write-through action itself;
	 *   - the declaration of `local_node` (used below but never declared in
	 *     the visible text);
	 *   - the head of a loop acquiring handle->header_lock (released at the
	 *     end of this function), like the
	 *     "while (_starpu_spin_trylock(&handle->header_lock))" loop in
	 *     _starpu_fetch_data_on_node.
	 *
	 * None of this is reconstructed here. The statement does not compile
	 * as-is; restore it from version control.
	 */
	if (replicate->state != STARPU_INVALID)
	if ((wt_mask & ~(1<header_lock))
		_starpu_datawizard_progress(local_node, 1);

	replicate->refcnt--;

	STARPU_ASSERT(replicate->refcnt >= 0);

	/* In case there was a temporary handle (eg. used for reduction), this
	 * handle may have requested to be destroyed when the data is released
	 * */
	/* Read the flag before notifying: the handle must not be accessed
	 * after the notification if it asked to be destroyed. */
	unsigned handle_was_destroyed = handle->lazy_unregister;

	_starpu_notify_data_dependencies(handle);

	if (!handle_was_destroyed)
		_starpu_spin_unlock(&handle->header_lock);
}

/* Set replicate->requested when the replicate is still STARPU_INVALID,
 * recording that a transfer towards it has been asked for. The flag is only
 * a hint, so it is set without taking the header lock.
 * NOTE(review): the commented-out lock calls refer to `handle`, which is
 * not in scope in this function. */
static void _starpu_set_data_requested_flag_if_needed(struct starpu_data_replicate_s *replicate)
{
// XXX : this is just a hint, so we don't take the lock ...
//	pthread_spin_lock(&handle->header_lock);

	if (replicate->state == STARPU_INVALID)
		replicate->requested = 1;

//	pthread_spin_unlock(&handle->header_lock);
}

/*
 * Start prefetching the input buffers of `task` onto memory node `node`.
 *
 * STARPU_SCRATCH and STARPU_REDUX buffers are skipped. For every other
 * buffer:
 *   - a prefetch is issued for handle->per_node[node] with the buffer's
 *     access mode (prefetch_data_on_node does not wait);
 *   - the replicate's `requested` hint is set if it is still invalid.
 *
 * The result of each prefetch is ignored. Always returns 0.
 */
int starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
{
	starpu_buffer_descr *descrs = task->buffers;
	unsigned nbuffers = task->cl->nbuffers;

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle handle = descrs[index].handle;
		starpu_access_mode mode = descrs[index].mode;

		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
			continue;

		struct starpu_data_replicate_s *replicate = &handle->per_node[node];
		prefetch_data_on_node(handle, replicate, mode);

		_starpu_set_data_requested_flag_if_needed(replicate);
	}

	return 0;
}

/*
 * Acquire every buffer of `task` on the calling worker before execution.
 *
 * Replicate used for each buffer:
 *   - STARPU_SCRATCH/STARPU_REDUX buffers: the calling worker's per-worker
 *     replicate, handle->per_worker[workerid];
 *   - all other buffers: the replicate on the local memory node.
 *
 * Each replicate is fetched with fetch_data, which waits for completion,
 * and its data_interface is stored in task->interface[index]. A
 * STARPU_REDUX replicate that is not yet initialized is initialized with
 * _starpu_redux_init_data_replicate. When profiling is enabled and
 * task->profiling_info is set, the acquisition start and end times are
 * recorded.
 *
 * mask - write-through mask. It is only used on the failure path, where
 *        it is passed to _starpu_push_task_output.
 *
 * Returns 0 on success, -1 if fetching a buffer failed.
 *
 * NOTE(review): assumes this runs on a worker thread, so that
 * starpu_worker_get_id() is a valid per_worker index. Confirm.
 */
int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
{
	STARPU_TRACE_START_FETCH_INPUT(NULL);

	int profiling = starpu_profiling_status_get();
	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);

	starpu_buffer_descr *descrs = task->buffers;
	unsigned nbuffers = task->cl->nbuffers;

	unsigned local_memory_node = _starpu_get_local_memory_node();

	int workerid = starpu_worker_get_id();

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		int ret;
		starpu_data_handle handle = descrs[index].handle;
		starpu_access_mode mode = descrs[index].mode;

		struct starpu_data_replicate_s *local_replicate;

		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
		{
			local_replicate = &handle->per_worker[workerid];
		}
		else {
			/* That's a "normal" buffer (R/W) */
			local_replicate = &handle->per_node[local_memory_node];
		}

		ret = fetch_data(handle, local_replicate, mode);
		if (STARPU_UNLIKELY(ret))
			goto enomem;

		task->interface[index] = local_replicate->data_interface;

		if (mode & STARPU_REDUX)
		{
			/* If the replicate was not initialized yet, we have to do it now */
			if (!local_replicate->initialized)
				_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
		}
	}

	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	STARPU_TRACE_END_FETCH_INPUT(NULL);

	return 0;

enomem:
	/* try to unreference all the input that were successfully taken */
	/* XXX broken ... */
	/* NOTE(review): _starpu_push_task_output releases *all* nbuffers
	 * buffers. That includes buffers after `index`, which were never
	 * fetched, so their refcnt was never incremented. This can trip
	 * STARPU_ASSERT(replicate->refcnt >= 0) in
	 * _starpu_release_data_on_node, or underflow the counts.
	 * This path also never emits STARPU_TRACE_END_FETCH_INPUT. */
	_STARPU_DISP("something went wrong with buffer %u\n", index);
	//push_codelet_output(task, index, mask);
	_starpu_push_task_output(task, mask);
	return -1;
}

/*
 * Release every buffer of `task` after execution.
 *
 * Replicate released for each buffer:
 *   - when mode & STARPU_RW is non-zero: the replicate on the local memory
 *     node;
 *   - otherwise: the calling worker's per-worker replicate.
 *
 * Each replicate is released with _starpu_release_data_on_node(handle,
 * mask, replicate). Afterwards, unless handle->lazy_unregister was set
 * before the release,
 * _starpu_release_data_enforce_sequential_consistency(task, handle) is
 * called. When profiling is enabled and task->profiling_info is set, the
 * release start and end times are recorded.
 *
 * NOTE(review): _starpu_fetch_task_input chooses the per-worker replicate
 * when mode & (STARPU_SCRATCH|STARPU_REDUX), whereas this function chooses
 * it when !(mode & STARPU_RW). The two agree only if those modes are never
 * combined with STARPU_R/STARPU_W. Confirm.
 */
void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
{
	STARPU_TRACE_START_PUSH_OUTPUT(NULL);

	int profiling = starpu_profiling_status_get();
	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->release_data_start_time);

	starpu_buffer_descr *descrs = task->buffers;
	unsigned nbuffers = task->cl->nbuffers;

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle handle = descrs[index].handle;
		starpu_access_mode mode = descrs[index].mode;

		struct starpu_data_replicate_s *replicate;

		if (mode & STARPU_RW)
		{
			unsigned local_node = _starpu_get_local_memory_node();
			replicate = &handle->per_node[local_node];
		}
		else
		{
			int workerid = starpu_worker_get_id();
			replicate = &handle->per_worker[workerid];
		}

		/* In case there was a temporary handle (eg. used for
		 * reduction), this handle may have requested to be destroyed
		 * when the data is released
		 * */
		unsigned handle_was_destroyed = handle->lazy_unregister;

		_starpu_release_data_on_node(handle, mask, replicate);
		if (!handle_was_destroyed)
			_starpu_release_data_enforce_sequential_consistency(task, handle);
	}

	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->release_data_end_time);

	STARPU_TRACE_END_PUSH_OUTPUT(NULL);
}

/* NB : this value can only be an indication of the status of a data at
 * some point, but there is no strong guarantee !
 *
 * Returns 1 if handle's replicate on `node` meets any of these conditions,
 * 0 otherwise:
 *   - it is valid (state is not STARPU_INVALID);
 *   - its `requested` hint is set;
 *   - it has a non-NULL `request` attached.
 *
 * The fields are read without taking handle->header_lock. */
unsigned _starpu_is_data_present_or_requested(starpu_data_handle handle, uint32_t node)
{
	unsigned ret = 0;

// XXX : this is just a hint, so we don't take the lock ...
//	pthread_spin_lock(&handle->header_lock);

	if (handle->per_node[node].state != STARPU_INVALID
		|| handle->per_node[node].requested || handle->per_node[node].request)
		ret = 1;

//	pthread_spin_unlock(&handle->header_lock);

	return ret;
}