123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2009-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <common/config.h>
- #include <common/utils.h>
- #include <core/task.h>
- #include <datawizard/coherency.h>
- #include <datawizard/copy_driver.h>
- #include <datawizard/write_back.h>
- #include <core/dependencies/data_concurrency.h>
- #include <core/sched_policy.h>
- #include <datawizard/memory_nodes.h>
- static void _starpu_data_check_initialized(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
- {
- if (!(mode & STARPU_R))
- return;
- if (((handle->nplans && !handle->nchildren) || handle->siblings)
- && handle->partition_automatic_disabled == 0)
- {
- _starpu_data_partition_access_submit(handle, (mode & STARPU_W) != 0);
- }
- if (!handle->initialized && handle->init_cl)
- {
- int ret = starpu_task_insert(handle->init_cl, STARPU_W, handle, 0);
- STARPU_ASSERT(ret == 0);
- }
- STARPU_ASSERT_MSG(handle->initialized, "handle %p is not initialized while trying to read it\n", handle);
- }
- /* Explicitly ask StarPU to allocate room for a piece of data on the specified
- * memory node. */
- int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node)
- {
- struct _starpu_data_request *r;
- STARPU_ASSERT(handle);
- _starpu_spin_lock(&handle->header_lock);
- r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, NULL, STARPU_PREFETCH, 0, 0, "starpu_data_request_allocation");
- /* we do not increase the refcnt associated to the request since we are
- * not waiting for its termination */
- _starpu_post_data_request(r);
- _starpu_spin_unlock(&handle->header_lock);
- return 0;
- }
- struct user_interaction_wrapper
- {
- starpu_data_handle_t handle;
- enum starpu_data_access_mode mode;
- int node;
- starpu_pthread_cond_t cond;
- starpu_pthread_mutex_t lock;
- unsigned finished;
- unsigned detached;
- enum starpu_is_prefetch prefetch;
- unsigned async;
- int prio;
- void (*callback_acquired)(void *, int *node, enum starpu_data_access_mode mode);
- void (*callback)(void *);
- void *callback_arg;
- struct starpu_task *pre_sync_task;
- struct starpu_task *post_sync_task;
- };
- static inline void _starpu_data_acquire_wrapper_init(struct user_interaction_wrapper *wrapper, starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
- {
- memset(wrapper, 0, sizeof(*wrapper));
- wrapper->handle = handle;
- wrapper->node = node;
- wrapper->mode = mode;
- //wrapper->finished = 0;
- STARPU_PTHREAD_COND_INIT0(&wrapper->cond, NULL);
- STARPU_PTHREAD_MUTEX_INIT0(&wrapper->lock, NULL);
- }
- /* Called to signal completion of asynchronous data acquisition */
- static inline void _starpu_data_acquire_wrapper_finished(struct user_interaction_wrapper *wrapper)
- {
- STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
- wrapper->finished = 1;
- STARPU_PTHREAD_COND_SIGNAL(&wrapper->cond);
- STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
- }
- /* Called to wait for completion of asynchronous data acquisition */
- static inline void _starpu_data_acquire_wrapper_wait(struct user_interaction_wrapper *wrapper)
- {
- STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
- while (!wrapper->finished)
- STARPU_PTHREAD_COND_WAIT(&wrapper->cond, &wrapper->lock);
- STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
- }
- static inline void _starpu_data_acquire_wrapper_fini(struct user_interaction_wrapper *wrapper)
- {
- STARPU_PTHREAD_COND_DESTROY(&wrapper->cond);
- STARPU_PTHREAD_MUTEX_DESTROY(&wrapper->lock);
- }
- /* Called when the data acquisition is done, to launch the fetch into target memory */
- static inline void _starpu_data_acquire_launch_fetch(struct user_interaction_wrapper *wrapper, int async, void (*callback)(void *), void *callback_arg)
- {
- int node = wrapper->node;
- starpu_data_handle_t handle = wrapper->handle;
- struct _starpu_data_replicate *replicate = node >= 0 ? &handle->per_node[node] : NULL;
- int ret = _starpu_fetch_data_on_node(handle, node, replicate, wrapper->mode, wrapper->detached, NULL, wrapper->prefetch, async, callback, callback_arg, wrapper->prio, "_starpu_data_acquire_launch_fetch");
- STARPU_ASSERT(!ret);
- }
- /*
- * Non Blocking data request from application
- */
- /* Called when fetch is done, call the callback */
- static void _starpu_data_acquire_fetch_data_callback(void *arg)
- {
- struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
- starpu_data_handle_t handle = wrapper->handle;
- /* At that moment, the caller holds a reference to the piece of data.
- * We enqueue the "post" sync task in the list associated to the handle
- * so that it is submitted by the starpu_data_release
- * function. */
- if (wrapper->post_sync_task)
- _starpu_add_post_sync_tasks(wrapper->post_sync_task, handle);
- wrapper->callback(wrapper->callback_arg);
- _starpu_data_acquire_wrapper_fini(wrapper);
- free(wrapper);
- }
- /* Called when the data acquisition is done, launch the fetch into target memory */
- static void _starpu_data_acquire_continuation_non_blocking(void *arg)
- {
- struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
- if (wrapper->callback_acquired)
- /* This can change the node at will according to the current data situation */
- wrapper->callback_acquired(wrapper->callback_arg, &wrapper->node, wrapper->mode);
- _starpu_data_acquire_launch_fetch(arg, 1, _starpu_data_acquire_fetch_data_callback, arg);
- }
- /* Called when the implicit data dependencies are done, launch the data acquisition */
- static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
- {
- struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
- /* we try to get the data, if we do not succeed immediately, we set a
- * callback function that will be executed automatically when the data is
- * available again, otherwise we fetch the data directly */
- if (!_starpu_attempt_to_submit_data_request_from_apps(wrapper->handle, wrapper->mode,
- _starpu_data_acquire_continuation_non_blocking, wrapper))
- {
- /* no one has locked this data yet, so we proceed immediately */
- _starpu_data_acquire_continuation_non_blocking(wrapper);
- }
- }
- /* The data must be released by calling starpu_data_release later on */
- int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node,
- enum starpu_data_access_mode mode,
- void (*callback_acquired)(void *arg, int *node, enum starpu_data_access_mode mode),
- void (*callback)(void *arg),
- void *arg,
- int sequential_consistency, int quick,
- long *pre_sync_jobid, long *post_sync_jobid, int prio)
- {
- STARPU_ASSERT(handle);
- STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data (%p) is not possible", handle);
- _STARPU_LOG_IN();
- /* Check that previous tasks have set a value if needed */
- _starpu_data_check_initialized(handle, mode);
- struct user_interaction_wrapper *wrapper;
- _STARPU_MALLOC(wrapper, sizeof(struct user_interaction_wrapper));
- _starpu_data_acquire_wrapper_init(wrapper, handle, node, mode);
- wrapper->async = 1;
- wrapper->callback_acquired = callback_acquired;
- wrapper->callback = callback;
- wrapper->callback_arg = arg;
- wrapper->pre_sync_task = NULL;
- wrapper->post_sync_task = NULL;
- wrapper->prio = prio;
- STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- int handle_sequential_consistency = handle->sequential_consistency;
- if (handle_sequential_consistency && sequential_consistency)
- {
- struct starpu_task *new_task;
- struct _starpu_job *pre_sync_job, *post_sync_job;
- int submit_pre_sync = 0;
- wrapper->pre_sync_task = starpu_task_create();
- wrapper->pre_sync_task->name = "_starpu_data_acquire_cb_pre";
- wrapper->pre_sync_task->detach = 1;
- wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
- wrapper->pre_sync_task->callback_arg = wrapper;
- wrapper->pre_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
- wrapper->pre_sync_task->priority = prio;
- pre_sync_job = _starpu_get_job_associated_to_task(wrapper->pre_sync_task);
- if (pre_sync_jobid)
- *pre_sync_jobid = pre_sync_job->job_id;
- wrapper->post_sync_task = starpu_task_create();
- wrapper->post_sync_task->name = "_starpu_data_acquire_cb_release";
- wrapper->post_sync_task->detach = 1;
- wrapper->post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
- wrapper->post_sync_task->priority = prio;
- post_sync_job = _starpu_get_job_associated_to_task(wrapper->post_sync_task);
- if (post_sync_jobid)
- *post_sync_jobid = post_sync_job->job_id;
- if (quick)
- pre_sync_job->quick_next = post_sync_job;
- new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, &submit_pre_sync, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
- STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- if (STARPU_UNLIKELY(new_task))
- {
- int ret = _starpu_task_submit_internally(new_task);
- STARPU_ASSERT(!ret);
- }
- if (submit_pre_sync)
- {
- int ret = _starpu_task_submit_internally(wrapper->pre_sync_task);
- STARPU_ASSERT(!ret);
- }
- else
- {
- wrapper->pre_sync_task->detach = 0;
- starpu_task_destroy(wrapper->pre_sync_task);
- starpu_data_acquire_cb_pre_sync_callback(wrapper);
- }
- }
- else
- {
- if (pre_sync_jobid)
- *pre_sync_jobid = -1;
- if (post_sync_jobid)
- *post_sync_jobid = -1;
- STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- starpu_data_acquire_cb_pre_sync_callback(wrapper);
- }
- _STARPU_LOG_OUT();
- return 0;
- }
- int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_handle_t handle, int node,
- enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
- int sequential_consistency, int quick)
- {
- return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, NULL, callback, arg, sequential_consistency, quick, NULL, NULL, STARPU_DEFAULT_PRIO);
- }
- int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,
- enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
- int sequential_consistency)
- {
- return starpu_data_acquire_on_node_cb_sequential_consistency_quick(handle, node, mode, callback, arg, sequential_consistency, 0);
- }
- int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node,
- enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
- {
- return starpu_data_acquire_on_node_cb_sequential_consistency(handle, node, mode, callback, arg, 1);
- }
- int starpu_data_acquire_cb(starpu_data_handle_t handle,
- enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
- {
- int home_node = handle->home_node;
- if (home_node < 0)
- home_node = STARPU_MAIN_RAM;
- return starpu_data_acquire_on_node_cb(handle, home_node, mode, callback, arg);
- }
- int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle,
- enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency)
- {
- int home_node = handle->home_node;
- if (home_node < 0)
- home_node = STARPU_MAIN_RAM;
- return starpu_data_acquire_on_node_cb_sequential_consistency(handle, home_node, mode, callback, arg, sequential_consistency);
- }
- /*
- * Blocking data request from application
- */
- static inline void _starpu_data_acquire_continuation(void *arg)
- {
- struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
- starpu_data_handle_t handle = wrapper->handle;
- STARPU_ASSERT(handle);
- _starpu_data_acquire_launch_fetch(wrapper, 0, NULL, NULL);
- _starpu_data_acquire_wrapper_finished(wrapper);
- }
- /* The data must be released by calling starpu_data_release later on */
- int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
- {
- STARPU_ASSERT(handle);
- STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data is not possible");
- _STARPU_LOG_IN();
- /* unless asynchronous, it is forbidden to call this function from a callback or a codelet */
- STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
- /* Check that previous tasks have set a value if needed */
- _starpu_data_check_initialized(handle, mode);
- if (node >= 0 && _starpu_data_is_multiformat_handle(handle) &&
- _starpu_handle_needs_conversion_task(handle, node))
- {
- struct starpu_task *task = _starpu_create_conversion_task(handle, node);
- int ret;
- _starpu_spin_lock(&handle->header_lock);
- handle->refcnt--;
- handle->busy_count--;
- handle->mf_node = node;
- _starpu_spin_unlock(&handle->header_lock);
- task->synchronous = 1;
- ret = _starpu_task_submit_internally(task);
- STARPU_ASSERT(!ret);
- }
- struct user_interaction_wrapper wrapper;
- _starpu_data_acquire_wrapper_init(&wrapper, handle, node, mode);
- // _STARPU_DEBUG("TAKE sequential_consistency_mutex starpu_data_acquire\n");
- STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- int sequential_consistency = handle->sequential_consistency;
- if (sequential_consistency)
- {
- struct starpu_task *new_task;
- int submit_pre_sync = 0;
- wrapper.pre_sync_task = starpu_task_create();
- wrapper.pre_sync_task->name = "_starpu_data_acquire_pre";
- wrapper.pre_sync_task->detach = 0;
- wrapper.pre_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
- wrapper.post_sync_task = starpu_task_create();
- wrapper.post_sync_task->name = "_starpu_data_acquire_post";
- wrapper.post_sync_task->detach = 1;
- wrapper.post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
- new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, &submit_pre_sync, wrapper.post_sync_task, &_starpu_get_job_associated_to_task(wrapper.post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
- STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- if (STARPU_UNLIKELY(new_task))
- {
- int ret = _starpu_task_submit_internally(new_task);
- STARPU_ASSERT(!ret);
- }
- if (submit_pre_sync)
- {
- wrapper.pre_sync_task->synchronous = 1;
- int ret = _starpu_task_submit_internally(wrapper.pre_sync_task);
- STARPU_ASSERT(!ret);
- }
- else
- {
- wrapper.pre_sync_task->detach = 0;
- starpu_task_destroy(wrapper.pre_sync_task);
- }
- }
- else
- {
- STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- }
- /* we try to get the data, if we do not succeed immediately, we set a
- * callback function that will be executed automatically when the data is
- * available again, otherwise we fetch the data directly */
- if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
- {
- /* no one has locked this data yet, so we proceed immediately */
- _starpu_data_acquire_launch_fetch(&wrapper, 0, NULL, NULL);
- }
- else
- {
- _starpu_data_acquire_wrapper_wait(&wrapper);
- }
- _starpu_data_acquire_wrapper_fini(&wrapper);
- /* At that moment, the caller holds a reference to the piece of data.
- * We enqueue the "post" sync task in the list associated to the handle
- * so that it is submitted by the starpu_data_release
- * function. */
- if (sequential_consistency)
- _starpu_add_post_sync_tasks(wrapper.post_sync_task, handle);
- _STARPU_LOG_OUT();
- return 0;
- }
- int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
- {
- int home_node = handle->home_node;
- if (home_node < 0)
- home_node = STARPU_MAIN_RAM;
- return starpu_data_acquire_on_node(handle, home_node, mode);
- }
- int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
- {
- STARPU_ASSERT(handle);
- STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data is not possible");
- /* it is forbidden to call this function from a callback or a codelet */
- STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
- /* Check that previous tasks have set a value if needed */
- _starpu_data_check_initialized(handle, mode);
- int ret;
- STARPU_ASSERT_MSG(!_starpu_data_is_multiformat_handle(handle), "not supported yet");
- STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- ret = _starpu_test_implicit_data_deps_with_handle(handle, mode);
- STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- if (ret)
- return ret;
- struct user_interaction_wrapper wrapper;
- _starpu_data_acquire_wrapper_init(&wrapper, handle, node, mode);
- /* we try to get the data, if we do not succeed immediately, we set a
- * callback function that will be executed automatically when the data is
- * available again, otherwise we fetch the data directly */
- if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
- {
- /* no one has locked this data yet, so we proceed immediately */
- _starpu_data_acquire_launch_fetch(&wrapper, 0, NULL, NULL);
- }
- else
- {
- _starpu_data_acquire_wrapper_wait(&wrapper);
- }
- _starpu_data_acquire_wrapper_fini(&wrapper);
- return 0;
- }
- int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
- {
- return starpu_data_acquire_on_node_try(handle, STARPU_MAIN_RAM, mode);
- }
- /* This function must be called after starpu_data_acquire so that the
- * application release the data */
- void starpu_data_release_to_on_node(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int node)
- {
- STARPU_ASSERT(handle);
- if (mode == STARPU_RW)
- /* They are equivalent here, and current_mode is never STARPU_RW */
- mode = STARPU_W;
- STARPU_ASSERT_MSG(mode == STARPU_NONE ||
- mode == handle->current_mode ||
- (mode == STARPU_R &&
- handle->current_mode == STARPU_W),
- "We only support releasing from W to R");
- /* In case there are some implicit dependencies, unlock the "post sync" tasks */
- _starpu_unlock_post_sync_tasks(handle, mode);
- /* The application can now release the rw-lock */
- if (node >= 0)
- _starpu_release_data_on_node(handle, 0, mode, &handle->per_node[node]);
- else
- {
- _starpu_spin_lock(&handle->header_lock);
- if (node == STARPU_ACQUIRE_NO_NODE_LOCK_ALL)
- {
- int i;
- for (i = 0; i < STARPU_MAXNODES; i++)
- handle->per_node[i].refcnt--;
- }
- handle->busy_count--;
- if (!_starpu_notify_data_dependencies(handle, mode))
- _starpu_spin_unlock(&handle->header_lock);
- }
- }
- void starpu_data_release_on_node(starpu_data_handle_t handle, int node)
- {
- starpu_data_release_to_on_node(handle, STARPU_NONE, node);
- }
- void starpu_data_release_to(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
- {
- int home_node = handle->home_node;
- if (home_node < 0)
- home_node = STARPU_MAIN_RAM;
- starpu_data_release_to_on_node(handle, mode, home_node);
- }
- void starpu_data_release(starpu_data_handle_t handle)
- {
- starpu_data_release_to(handle, STARPU_NONE);
- }
- static void _prefetch_data_on_node(void *arg)
- {
- struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
- starpu_data_handle_t handle = wrapper->handle;
- _starpu_data_acquire_launch_fetch(wrapper, wrapper->async, NULL, NULL);
- if (wrapper->async)
- free(wrapper);
- else
- _starpu_data_acquire_wrapper_finished(wrapper);
- _starpu_spin_lock(&handle->header_lock);
- if (!_starpu_notify_data_dependencies(handle, STARPU_NONE))
- _starpu_spin_unlock(&handle->header_lock);
- }
- static
- int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigned node, unsigned async, enum starpu_data_access_mode mode, enum starpu_is_prefetch prefetch, int prio)
- {
- STARPU_ASSERT(handle);
- /* it is forbidden to call this function from a callback or a codelet */
- STARPU_ASSERT_MSG(async || _starpu_worker_may_perform_blocking_calls(), "Synchronous prefetch is not possible from a task or a callback");
- /* Check that previous tasks have set a value if needed */
- _starpu_data_check_initialized(handle, mode);
- struct user_interaction_wrapper *wrapper;
- _STARPU_MALLOC(wrapper, sizeof(*wrapper));
- _starpu_data_acquire_wrapper_init(wrapper, handle, node, STARPU_R);
- wrapper->detached = async;
- wrapper->prefetch = prefetch;
- wrapper->async = async;
- wrapper->prio = prio;
- if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, wrapper))
- {
- /* we can immediately proceed */
- struct _starpu_data_replicate *replicate = &handle->per_node[node];
- _starpu_data_acquire_launch_fetch(wrapper, async, NULL, NULL);
- _starpu_data_acquire_wrapper_fini(wrapper);
- free(wrapper);
- /* remove the "lock"/reference */
- _starpu_spin_lock(&handle->header_lock);
- if (!async)
- {
- /* Release our refcnt, like _starpu_release_data_on_node would do */
- replicate->refcnt--;
- STARPU_ASSERT(replicate->refcnt >= 0);
- STARPU_ASSERT(handle->busy_count > 0);
- handle->busy_count--;
- }
- /* In case there was a temporary handle (eg. used for reduction), this
- * handle may have requested to be destroyed when the data is released
- * */
- if (!_starpu_notify_data_dependencies(handle, STARPU_NONE))
- _starpu_spin_unlock(&handle->header_lock);
- }
- else if (!async)
- {
- _starpu_data_acquire_wrapper_wait(wrapper);
- _starpu_data_acquire_wrapper_fini(wrapper);
- free(wrapper);
- }
- return 0;
- }
- int starpu_data_fetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
- {
- return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, STARPU_FETCH, STARPU_DEFAULT_PRIO);
- }
- int starpu_data_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio)
- {
- return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, STARPU_PREFETCH, prio);
- }
- int starpu_data_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
- {
- return starpu_data_prefetch_on_node_prio(handle, node, async, STARPU_DEFAULT_PRIO);
- }
- int starpu_data_idle_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio)
- {
- return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, STARPU_IDLEFETCH, prio);
- }
- int starpu_data_idle_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
- {
- return starpu_data_idle_prefetch_on_node_prio(handle, node, async, STARPU_DEFAULT_PRIO);
- }
- static void _starpu_data_wont_use(void *data)
- {
- unsigned node;
- starpu_data_handle_t handle = data;
- _STARPU_TRACE_DATA_DOING_WONT_USE(handle);
- _starpu_spin_lock(&handle->header_lock);
- for (node = 0; node < STARPU_MAXNODES; node++)
- {
- struct _starpu_data_replicate *local = &handle->per_node[node];
- if (local->allocated && local->automatically_allocated)
- _starpu_memchunk_wont_use(local->mc, node);
- }
- if (handle->per_worker)
- {
- unsigned nworkers = starpu_worker_get_count();
- unsigned worker;
- for (worker = 0; worker < nworkers; worker++)
- {
- struct _starpu_data_replicate *local = &handle->per_worker[worker];
- if (local->allocated && local->automatically_allocated)
- _starpu_memchunk_wont_use(local->mc, starpu_worker_get_memory_node(worker));
- }
- }
- _starpu_spin_unlock(&handle->header_lock);
- starpu_data_release_on_node(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL);
- if (handle->home_node != -1)
- starpu_data_idle_prefetch_on_node(handle, handle->home_node, 1);
- else
- {
- if (handle->ooc)
- {
- /* Try to push it to some disk */
- unsigned i;
- unsigned nnodes = starpu_memory_nodes_get_count();
- for (i = 0; i < nnodes; i++)
- {
- if (starpu_node_get_kind(i) == STARPU_DISK_RAM)
- starpu_data_idle_prefetch_on_node(handle, i, 1);
- }
- }
- }
- }
- void starpu_data_wont_use(starpu_data_handle_t handle)
- {
- if (!handle->initialized)
- /* No value atm actually */
- return;
- if (starpu_data_get_nb_children(handle) != 0)
- {
- int i;
- for(i=0 ; i<starpu_data_get_nb_children(handle) ; i++)
- starpu_data_wont_use(starpu_data_get_child(handle, i));
- return;
- }
- if (handle->partitioned != 0)
- {
- unsigned i;
- for(i=0 ; i<handle->partitioned; i++)
- {
- unsigned j;
- for(j=0 ; j<handle->active_readonly_nchildren[i] ; j++)
- starpu_data_wont_use(handle->active_readonly_children[i][j]);
- }
- }
- if (handle->active_nchildren != 0)
- {
- unsigned j;
- for(j=0 ; j<handle->active_nchildren ; j++)
- starpu_data_wont_use(handle->active_children[j]);
- return;
- }
- _STARPU_TRACE_DATA_WONT_USE(handle);
- starpu_data_acquire_on_node_cb_sequential_consistency_quick(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL, STARPU_R, _starpu_data_wont_use, handle, 1, 1);
- }
- /*
- * It is possible to specify that a piece of data can be discarded without
- * impacting the application.
- */
- int _starpu_has_not_important_data;
- void starpu_data_advise_as_important(starpu_data_handle_t handle, unsigned is_important)
- {
- if (!is_important)
- _starpu_has_not_important_data = 1;
- _starpu_spin_lock(&handle->header_lock);
- /* first take all the children lock (in order !) */
- unsigned child;
- for (child = 0; child < handle->nchildren; child++)
- {
- /* make sure the intermediate children is advised as well */
- starpu_data_handle_t child_handle = starpu_data_get_child(handle, child);
- if (child_handle->nchildren > 0)
- starpu_data_advise_as_important(child_handle, is_important);
- }
- handle->is_not_important = !is_important;
- /* now the parent may be used again so we release the lock */
- _starpu_spin_unlock(&handle->header_lock);
- }
- void starpu_data_set_sequential_consistency_flag(starpu_data_handle_t handle, unsigned flag)
- {
- _starpu_spin_lock(&handle->header_lock);
- unsigned child;
- for (child = 0; child < handle->nchildren; child++)
- {
- /* make sure that the flags are applied to the children as well */
- starpu_data_handle_t child_handle = starpu_data_get_child(handle, child);
- if (child_handle->nchildren > 0)
- starpu_data_set_sequential_consistency_flag(child_handle, flag);
- }
- STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
- handle->sequential_consistency = flag;
- STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
- _starpu_spin_unlock(&handle->header_lock);
- }
- unsigned starpu_data_get_sequential_consistency_flag(starpu_data_handle_t handle)
- {
- return handle->sequential_consistency;
- }
- void starpu_data_set_ooc_flag(starpu_data_handle_t handle, unsigned flag)
- {
- handle->ooc = flag;
- }
- unsigned starpu_data_get_ooc_flag(starpu_data_handle_t handle)
- {
- return handle->ooc;
- }
- /* By default, sequential consistency is enabled */
- static unsigned default_sequential_consistency_flag = 1;
- unsigned starpu_data_get_default_sequential_consistency_flag(void)
- {
- return default_sequential_consistency_flag;
- }
- void starpu_data_set_default_sequential_consistency_flag(unsigned flag)
- {
- default_sequential_consistency_flag = flag;
- }
- /* Query the status of the handle on the specified memory node. */
- void starpu_data_query_status2(starpu_data_handle_t handle, int memory_node, int *is_allocated, int *is_valid, int *is_loading, int *is_requested)
- {
- // XXX : this is just a hint, so we don't take the lock ...
- // _starpu_spin_lock(&handle->header_lock);
- if (is_allocated)
- *is_allocated = handle->per_node[memory_node].allocated;
- if (is_valid)
- *is_valid = (handle->per_node[memory_node].state != STARPU_INVALID);
- if (is_loading)
- *is_loading = handle->per_node[memory_node].load_request != NULL;
- if (is_requested)
- {
- int requested = 0;
- unsigned node;
- for (node = 0; node < STARPU_MAXNODES; node++)
- {
- if (handle->per_node[memory_node].request[node])
- {
- requested = 1;
- break;
- }
- }
- *is_requested = requested;
- }
- // _starpu_spin_unlock(&handle->header_lock);
- }
- void starpu_data_query_status(starpu_data_handle_t handle, int memory_node, int *is_allocated, int *is_valid, int *is_requested)
- {
- return starpu_data_query_status2(handle, memory_node, is_allocated, is_valid, NULL, is_requested);
- }
|