123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2008-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <starpu.h>
- #include <common/config.h>
- #include <common/utils.h>
- #include <core/sched_policy.h>
- #include <datawizard/datastats.h>
- #include <datawizard/memory_nodes.h>
- #include <drivers/disk/driver_disk.h>
- #include <drivers/mpi/driver_mpi_sink.h>
- #include <drivers/mpi/driver_mpi_source.h>
- #include <drivers/mpi/driver_mpi_common.h>
- #include <drivers/mic/driver_mic_source.h>
- #include <common/fxt.h>
- #include <datawizard/copy_driver.h>
- #include <datawizard/memalloc.h>
- #include <starpu_opencl.h>
- #include <starpu_cuda.h>
- #include <profiling/profiling.h>
- #include <core/disk.h>
- #ifdef STARPU_SIMGRID
- #include <core/simgrid.h>
- #endif
- void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
- {
- /* wake up all workers on that memory node */
- struct _starpu_memory_node_descr * const descr = _starpu_memory_node_get_description();
- const int cur_workerid = starpu_worker_get_id();
- struct _starpu_worker *cur_worker = cur_workerid>=0?_starpu_get_worker_struct(cur_workerid):NULL;
- STARPU_PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
- unsigned nconds = descr->condition_count[nodeid];
- unsigned cond_id;
- for (cond_id = 0; cond_id < nconds; cond_id++)
- {
- struct _starpu_cond_and_worker *condition;
- condition = &descr->conditions_attached_to_node[nodeid][cond_id];
- if (condition->worker == cur_worker)
- {
- if (condition->cond == &condition->worker->sched_cond)
- {
- condition->worker->state_keep_awake = 1;
- }
- /* No need to wake myself, and I might be called from
- * the scheduler with mutex locked, through
- * starpu_prefetch_task_input_on_node */
- continue;
- }
- /* wake anybody waiting on that condition */
- STARPU_PTHREAD_MUTEX_LOCK_SCHED(&condition->worker->sched_mutex);
- if (condition->cond == &condition->worker->sched_cond)
- {
- condition->worker->state_keep_awake = 1;
- }
- STARPU_PTHREAD_COND_BROADCAST(condition->cond);
- STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&condition->worker->sched_mutex);
- }
- STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
- #ifdef STARPU_SIMGRID
- starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[nodeid]);
- #endif
- }
- void starpu_wake_all_blocked_workers(void)
- {
- /* workers may be blocked on the various queues' conditions */
- struct _starpu_memory_node_descr * const descr = _starpu_memory_node_get_description();
- const int cur_workerid = starpu_worker_get_id();
- struct _starpu_worker *cur_worker = cur_workerid>=0?_starpu_get_worker_struct(cur_workerid):NULL;
- STARPU_PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
- unsigned nconds = descr->total_condition_count;
- unsigned cond_id;
- for (cond_id = 0; cond_id < nconds; cond_id++)
- {
- struct _starpu_cond_and_worker *condition;
- condition = &descr->conditions_all[cond_id];
- if (condition->worker == cur_worker)
- {
- if (condition->cond == &condition->worker->sched_cond)
- {
- condition->worker->state_keep_awake = 1;
- }
- /* No need to wake myself, and I might be called from
- * the scheduler with mutex locked, through
- * starpu_prefetch_task_input_on_node */
- continue;
- }
- /* wake anybody waiting on that condition */
- STARPU_PTHREAD_MUTEX_LOCK_SCHED(&condition->worker->sched_mutex);
- if (condition->cond == &condition->worker->sched_cond)
- {
- condition->worker->state_keep_awake = 1;
- }
- STARPU_PTHREAD_COND_BROADCAST(condition->cond);
- STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&condition->worker->sched_mutex);
- }
- STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
- #ifdef STARPU_SIMGRID
- unsigned workerid, nodeid;
- for (workerid = 0; workerid < starpu_worker_get_count(); workerid++)
- starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
- for (nodeid = 0; nodeid < starpu_memory_nodes_get_count(); nodeid++)
- starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[nodeid]);
- #endif
- }
- #ifdef STARPU_USE_FXT
- /* we need to identify each communication so that we can match the beginning
- * and the end of a communication in the trace, so we use a unique identifier
- * per communication */
- static unsigned long communication_cnt = 0;
- #endif
- static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
- struct _starpu_data_replicate *src_replicate,
- struct _starpu_data_replicate *dst_replicate,
- struct _starpu_data_request *req)
- {
- unsigned src_node = (unsigned)src_replicate->memory_node;
- unsigned dst_node = (unsigned)dst_replicate->memory_node;
- STARPU_ASSERT(src_replicate->refcnt);
- STARPU_ASSERT(dst_replicate->refcnt);
- STARPU_ASSERT(src_replicate->allocated);
- STARPU_ASSERT(dst_replicate->allocated);
- #ifdef STARPU_SIMGRID
- if (src_node == STARPU_MAIN_RAM || dst_node == STARPU_MAIN_RAM)
- _starpu_simgrid_data_transfer(handle->ops->get_size(handle), src_node, dst_node);
- return _starpu_simgrid_transfer(handle->ops->get_size(handle), src_node, dst_node, req);
- #else /* !SIMGRID */
- enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
- void *src_interface = src_replicate->data_interface;
- void *dst_interface = dst_replicate->data_interface;
- #if defined(STARPU_USE_CUDA) && defined(STARPU_HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
- enum starpu_node_kind src_kind = starpu_node_get_kind(src_node);
- if ((src_kind == STARPU_CUDA_RAM) || (dst_kind == STARPU_CUDA_RAM))
- {
- unsigned devid;
- if ((src_kind == STARPU_CUDA_RAM) && (dst_kind == STARPU_CUDA_RAM))
- {
- /* GPU-GPU transfer, issue it from the destination */
- devid = starpu_memory_node_get_devid(dst_node);
- }
- else
- {
- unsigned node = (dst_kind == STARPU_CUDA_RAM)?dst_node:src_node;
- devid = starpu_memory_node_get_devid(node);
- }
- starpu_cuda_set_device(devid);
- }
- #endif
- struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
- if (node_ops && node_ops->copy_interface_to[dst_kind])
- {
- return node_ops->copy_interface_to[dst_kind](handle, src_interface, src_node, dst_interface, dst_node, req);
- }
- else
- {
- STARPU_ABORT_MSG("No copy_interface_to function defined from node %s to node %s\n", _starpu_node_get_prefix(starpu_node_get_kind(src_node)), _starpu_node_get_prefix(starpu_node_get_kind(dst_node)));
- }
- #endif /* !SIMGRID */
- }
- int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT _starpu_driver_copy_data_1_to_1(starpu_data_handle_t handle,
- struct _starpu_data_replicate *src_replicate,
- struct _starpu_data_replicate *dst_replicate,
- unsigned donotread,
- struct _starpu_data_request *req,
- unsigned may_alloc,
- enum _starpu_is_prefetch prefetch STARPU_ATTRIBUTE_UNUSED)
- {
- if (!donotread)
- {
- STARPU_ASSERT(src_replicate->allocated);
- STARPU_ASSERT(src_replicate->refcnt);
- }
- unsigned src_node = src_replicate->memory_node;
- unsigned dst_node = dst_replicate->memory_node;
- /* first make sure the destination has an allocated buffer */
- if (!dst_replicate->allocated)
- {
- if (!may_alloc || _starpu_is_reclaiming(dst_node))
- /* We're not supposed to allocate there at the moment */
- return -ENOMEM;
- int ret_alloc = _starpu_allocate_memory_on_node(handle, dst_replicate, req ? req->prefetch : STARPU_FETCH);
- if (ret_alloc)
- return -ENOMEM;
- }
- STARPU_ASSERT(dst_replicate->allocated);
- STARPU_ASSERT(dst_replicate->refcnt);
- /* if there is no need to actually read the data,
- * we do not perform any transfer */
- if (!donotread)
- {
- unsigned long STARPU_ATTRIBUTE_UNUSED com_id = 0;
- size_t size = _starpu_data_get_size(handle);
- _starpu_bus_update_profiling_info((int)src_node, (int)dst_node, size);
- #ifdef STARPU_USE_FXT
- com_id = STARPU_ATOMIC_ADDL(&communication_cnt, 1);
- if (req)
- req->com_id = com_id;
- #endif
- dst_replicate->initialized = 1;
- _STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, size, com_id, prefetch, handle);
- int ret_copy = copy_data_1_to_1_generic(handle, src_replicate, dst_replicate, req);
- if (!req)
- /* Synchronous, this is already finished */
- _STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, com_id, prefetch);
- return ret_copy;
- }
- return 0;
- }
- void starpu_interface_data_copy(unsigned src_node, unsigned dst_node, size_t size)
- {
- _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
- }
- void starpu_interface_start_driver_copy_async(unsigned src_node, unsigned dst_node, double *start)
- {
- *start = starpu_timing_now();
- _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
- }
- void starpu_interface_end_driver_copy_async(unsigned src_node, unsigned dst_node, double start)
- {
- double end = starpu_timing_now();
- double elapsed = end - start;
- if (elapsed > 300)
- {
- static int warned = 0;
- if (!warned)
- {
- char src_name[16], dst_name[16];
- warned = 1;
- starpu_memory_node_get_name(src_node, src_name, sizeof(src_name));
- starpu_memory_node_get_name(dst_node, dst_name, sizeof(dst_name));
- _STARPU_DISP("Warning: the submission of asynchronous transfer from %s to %s took a very long time (%f ms)\nFor proper asynchronous transfer overlapping, data registered to StarPU must be allocated with starpu_malloc() or pinned with starpu_memory_pin()\n", src_name, dst_name, elapsed / 1000.);
- }
- }
- _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
- }
- /* This can be used by interfaces to easily transfer a piece of data without
- * caring about the particular transfer methods. */
- /* This should either return 0 if the transfer is complete, or -EAGAIN if the
- * transfer is still pending, and will have to be waited for by
- * _starpu_driver_test_request_completion/_starpu_driver_wait_request_completion
- */
- int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, uintptr_t dst, size_t dst_offset, unsigned dst_node, size_t size, void *async_data)
- {
- struct _starpu_async_channel *async_channel = async_data;
- enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
- struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
- if (node_ops && node_ops->copy_data_to[dst_kind])
- {
- return node_ops->copy_data_to[dst_kind](src, src_offset, src_node,
- dst, dst_offset, dst_node,
- size,
- async_channel);
- }
- else
- {
- STARPU_ABORT_MSG("No copy_data_to function defined from node %s to node %s\n", _starpu_node_get_prefix(starpu_node_get_kind(src_node)), _starpu_node_get_prefix(starpu_node_get_kind(dst_node)));
- return -1;
- }
- }
- int starpu_interface_copy2d(uintptr_t src, size_t src_offset, unsigned src_node,
- uintptr_t dst, size_t dst_offset, unsigned dst_node,
- size_t blocksize,
- size_t numblocks, size_t ld_src, size_t ld_dst,
- void *async_data)
- {
- int ret = 0;
- unsigned i;
- struct _starpu_async_channel *async_channel = async_data;
- enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
- struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
- STARPU_ASSERT_MSG(ld_src >= blocksize, "block size %lu is bigger than ld %lu in source", (unsigned long) blocksize, (unsigned long) ld_src);
- STARPU_ASSERT_MSG(ld_dst >= blocksize, "block size %lu is bigger than ld %lu in destination", (unsigned long) blocksize, (unsigned long) ld_dst);
- if (ld_src == blocksize && ld_dst == blocksize)
- /* Optimize contiguous case */
- return starpu_interface_copy(src, src_offset, src_node,
- dst, dst_offset, dst_node,
- blocksize * numblocks, async_data);
- if (node_ops && node_ops->copy2d_data_to[dst_kind])
- /* Hardware-optimized non-contiguous case */
- return node_ops->copy2d_data_to[dst_kind](src, src_offset, src_node,
- dst, dst_offset, dst_node,
- blocksize,
- numblocks, ld_src, ld_dst,
- async_channel);
- for (i = 0; i < numblocks; i++)
- {
- if (starpu_interface_copy(src, src_offset + i*ld_src, src_node,
- dst, dst_offset + i*ld_dst, dst_node,
- blocksize, async_data))
- ret = -EAGAIN;
- }
- return ret;
- }
- int starpu_interface_copy3d(uintptr_t src, size_t src_offset, unsigned src_node,
- uintptr_t dst, size_t dst_offset, unsigned dst_node,
- size_t blocksize,
- size_t numblocks_1, size_t ld1_src, size_t ld1_dst,
- size_t numblocks_2, size_t ld2_src, size_t ld2_dst,
- void *async_data)
- {
- int ret = 0;
- unsigned i;
- struct _starpu_async_channel *async_channel = async_data;
- enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
- struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
- STARPU_ASSERT_MSG(ld1_src >= blocksize, "block size %lu is bigger than ld %lu in source", (unsigned long) blocksize, (unsigned long) ld1_src);
- STARPU_ASSERT_MSG(ld1_dst >= blocksize, "block size %lu is bigger than ld %lu in destination", (unsigned long) blocksize, (unsigned long) ld1_dst);
- STARPU_ASSERT_MSG(ld2_src >= numblocks_1 * ld1_src, "block group size %lu is bigger than group ld %lu in source", (unsigned long) numblocks_1 * ld1_src, (unsigned long) ld2_src);
- STARPU_ASSERT_MSG(ld2_dst >= numblocks_1 * ld1_dst, "block group size %lu is bigger than group ld %lu in destination", (unsigned long) numblocks_1 * ld1_dst, (unsigned long) ld2_dst);
- if (ld1_src * ld2_src == blocksize * numblocks_1 &&
- ld1_dst * ld2_dst == blocksize * numblocks_1)
- /* Optimize contiguous case */
- return starpu_interface_copy(src, src_offset, src_node,
- dst, dst_offset, dst_node,
- blocksize * numblocks_1 * numblocks_2,
- async_data);
- if (node_ops && node_ops->copy3d_data_to[dst_kind])
- /* Hardware-optimized non-contiguous case */
- return node_ops->copy3d_data_to[dst_kind](src, src_offset, src_node,
- dst, dst_offset, dst_node,
- blocksize,
- numblocks_1, ld1_src, ld1_dst,
- numblocks_2, ld2_src, ld2_dst,
- async_channel);
- for (i = 0; i < numblocks_2; i++)
- {
- if (starpu_interface_copy2d(src, src_offset + i*ld2_src, src_node,
- dst, dst_offset + i*ld2_dst, dst_node,
- blocksize, numblocks_1, ld1_src, ld1_dst,
- async_data))
- ret = -EAGAIN;
- }
- return ret;
- }
- int starpu_interface_copy4d(uintptr_t src, size_t src_offset, unsigned src_node,
- uintptr_t dst, size_t dst_offset, unsigned dst_node,
- size_t blocksize,
- size_t numblocks_1, size_t ld1_src, size_t ld1_dst,
- size_t numblocks_2, size_t ld2_src, size_t ld2_dst,
- size_t numblocks_3, size_t ld3_src, size_t ld3_dst,
- void *async_data)
- {
- int ret = 0;
- unsigned i;
- STARPU_ASSERT_MSG(ld1_src >= blocksize, "block size %lu is bigger than ld %lu in source", (unsigned long) blocksize, (unsigned long) ld1_src);
- STARPU_ASSERT_MSG(ld1_dst >= blocksize, "block size %lu is bigger than ld %lu in destination", (unsigned long) blocksize, (unsigned long) ld1_dst);
- STARPU_ASSERT_MSG(ld2_src >= numblocks_1 * ld1_src, "block group size %lu is bigger than group ld %lu in source", (unsigned long) numblocks_1 * ld1_src, (unsigned long) ld2_src);
- STARPU_ASSERT_MSG(ld2_dst >= numblocks_1 * ld1_dst, "block group size %lu is bigger than group ld %lu in destination", (unsigned long) numblocks_1 * ld1_dst, (unsigned long) ld2_dst);
- STARPU_ASSERT_MSG(ld3_src >= numblocks_2 * ld2_src, "block group group size %lu is bigger than group group ld %lu in source", (unsigned long) numblocks_2 * ld2_src, (unsigned long) ld3_src);
- STARPU_ASSERT_MSG(ld3_dst >= numblocks_2 * ld2_dst, "block group group size %lu is bigger than group group ld %lu in destination", (unsigned long) numblocks_2 * ld2_dst, (unsigned long) ld3_dst);
- if (ld1_src * ld2_src * ld3_src == blocksize * numblocks_1 * numblocks_2 &&
- ld1_dst * ld2_dst * ld3_dst == blocksize * numblocks_1 * numblocks_2)
- /* Optimize contiguous case */
- return starpu_interface_copy(src, src_offset, src_node,
- dst, dst_offset, dst_node,
- blocksize * numblocks_1 * numblocks_2 * numblocks_3,
- async_data);
- /* Probably won't ever have a 4D interface in drivers :) */
- for (i = 0; i < numblocks_3; i++)
- {
- if (starpu_interface_copy3d(src, src_offset + i*ld3_src, src_node,
- dst, dst_offset + i*ld3_dst, dst_node,
- blocksize,
- numblocks_1, ld1_src, ld1_dst,
- numblocks_2, ld2_src, ld2_dst,
- async_data))
- ret = -EAGAIN;
- }
- return ret;
- }
- void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_channel)
- {
- #ifdef STARPU_SIMGRID
- _starpu_simgrid_wait_transfer_event(&async_channel->event);
- #else /* !SIMGRID */
- struct _starpu_node_ops *node_ops = async_channel->node_ops;
- if (node_ops && node_ops->wait_request_completion != NULL)
- {
- node_ops->wait_request_completion(async_channel);
- }
- else
- {
- STARPU_ABORT_MSG("No wait_request_completion function defined for node %s\n", node_ops?node_ops->name:"unknown");
- }
- #endif /* !SIMGRID */
- }
- unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *async_channel)
- {
- #ifdef STARPU_SIMGRID
- return _starpu_simgrid_test_transfer_event(&async_channel->event);
- #else /* !SIMGRID */
- struct _starpu_node_ops *node_ops = async_channel->node_ops;
- if (node_ops && node_ops->test_request_completion != NULL)
- {
- return node_ops->test_request_completion(async_channel);
- }
- else
- {
- STARPU_ABORT_MSG("No test_request_completion function defined for node %s\n", node_ops?node_ops->name:"unknown");
- }
- #endif /* !SIMGRID */
- }
|