瀏覽代碼

Change the behaviour of starpu_data_unregister so that the data is updated in
node where it was registered before being deleted from StarPU. We also enforce
sequential consistency if needed (eg. the function won't be executed until all
previous requests have been done).

Cédric Augonnet 15 年之前
父節點
當前提交
c4d106f500
共有 1 個文件被更改,包括 81 次插入0 次删除
  1. 81 0
      src/datawizard/interfaces/data_interface.c

+ 81 - 0
src/datawizard/interfaces/data_interface.c

@@ -15,6 +15,7 @@
  */
 
 #include <datawizard/datawizard.h>
+#include <core/dependencies/data_concurrency.h>
 
 /* 
  * Start monitoring a piece of data
@@ -126,10 +127,90 @@ void starpu_data_free_interfaces(starpu_data_handle handle)
 		free(handle->interface[node]);
 }
 
+struct unregister_callback_arg {
+	unsigned memory_node;
+	starpu_data_handle handle;
+	unsigned terminated;
+	pthread_mutex_t mutex;
+	pthread_cond_t cond;
+}; 
+
+static void _starpu_data_unregister_fetch_data_callback(void *_arg)
+{
+	int ret;
+	struct unregister_callback_arg *arg = _arg;
+
+	starpu_data_handle handle = arg->handle;
+
+	STARPU_ASSERT(handle);
+
+	ret = _starpu_fetch_data_on_node(handle, arg->memory_node, STARPU_R, 0, NULL, NULL);
+	STARPU_ASSERT(!ret);
+	
+	/* unlock the caller */
+	PTHREAD_MUTEX_LOCK(&arg->mutex);
+	arg->terminated = 1;
+	PTHREAD_COND_SIGNAL(&arg->cond);
+	PTHREAD_MUTEX_UNLOCK(&arg->mutex);
+}
+
+
 void starpu_data_unregister(starpu_data_handle handle)
 {
 	unsigned node;
 
+	/* If sequential consistency is enabled, wait until data is available */
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+	int sequential_consistency = handle->sequential_consistency;
+	if (sequential_consistency)
+	{
+		struct starpu_task *sync_task;
+		sync_task = starpu_task_create();
+		sync_task->detach = 0;
+		sync_task->destroy = 1;
+
+		/* It is not really a RW access, but we want to make sure that
+		 * all previous accesses are done */
+		_starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, STARPU_RW);
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+
+		/* TODO detect if this is superflous */
+		int ret = starpu_task_submit(sync_task);
+		STARPU_ASSERT(!ret);
+		starpu_task_wait(sync_task);
+	}
+	else {
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+	}
+
+	/* Fetch data in the home of the data to ensure we have a valid copy
+	 * where we registered it */
+	int home_node = handle->data_home; 
+	if (home_node >= 0)
+	{
+		struct unregister_callback_arg arg;
+		arg.handle = handle;
+		arg.memory_node = (unsigned)home_node;
+		arg.terminated = 0;
+		PTHREAD_MUTEX_INIT(&arg.mutex, NULL);
+		PTHREAD_COND_INIT(&arg.cond, NULL);
+
+		if (!_starpu_attempt_to_submit_data_request_from_apps(handle, STARPU_R,
+				_starpu_data_unregister_fetch_data_callback, &arg))
+		{
+			/* no one has locked this data yet, so we proceed immediately */
+			int ret = _starpu_fetch_data_on_node(handle, home_node, STARPU_R, 0, NULL, NULL);
+			STARPU_ASSERT(!ret);
+		}
+		else {
+			PTHREAD_MUTEX_LOCK(&arg.mutex);
+			while (!arg.terminated)
+				PTHREAD_COND_WAIT(&arg.cond, &arg.mutex);
+			PTHREAD_MUTEX_UNLOCK(&arg.mutex);
+		}
+	}
+
+	/* Destroy the data now */
 	STARPU_ASSERT(handle);
 	for (node = 0; node < STARPU_MAXNODES; node++)
 	{