Browse Source

Fix asynchronous prefetch: we also need to notify data dependencies in that case. Allocate the wrapper structure dynamically to permit asynchronous termination. Permit prefetch in callbacks and codelets

Samuel Thibault 13 years ago
parent
commit
4a0ed0dc25
3 changed files with 57 additions and 68 deletions
  1. 25 25
      src/datawizard/user_interactions.c
  2. 29 29
      tests/microbenchs/prefetch_data_on_node.c
  3. 3 14
      tools/gdbinit

+ 25 - 25
src/datawizard/user_interactions.c

@@ -196,7 +196,7 @@ int starpu_data_acquire(starpu_data_handle handle, starpu_access_mode mode)
 	STARPU_ASSERT(handle);
         _STARPU_LOG_IN();
 
-	/* it is forbidden to call this function from a callback or a codelet */
+	/* unless asynchronous, it is forbidden to call this function from a callback or a codelet */
 	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls())) {
                 _STARPU_LOG_OUT_TAG("EDEADLK");
 		return -EDEADLK;
@@ -299,12 +299,12 @@ static void _prefetch_data_on_node(void *arg)
 		wrapper->finished = 1;
 		PTHREAD_COND_SIGNAL(&wrapper->cond);
 		PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
-
-		_starpu_spin_lock(&handle->header_lock);
-		_starpu_notify_data_dependencies(handle);
-		_starpu_spin_unlock(&handle->header_lock);
 	}
 
+	_starpu_spin_lock(&handle->header_lock);
+	_starpu_notify_data_dependencies(handle);
+	_starpu_spin_unlock(&handle->header_lock);
+
 }
 
 static
@@ -313,40 +313,40 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
 	STARPU_ASSERT(handle);
 
 	/* it is forbidden to call this function from a callback or a codelet */
-	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+	if (STARPU_UNLIKELY(!async && !_starpu_worker_may_perform_blocking_calls()))
 		return -EDEADLK;
 
-	struct user_interaction_wrapper wrapper =
-	{
-		.handle = handle,
-		.node = node,
-		.async = async,
-		.cond = PTHREAD_COND_INITIALIZER,
-		.lock = PTHREAD_MUTEX_INITIALIZER,
-		.finished = 0
-	};
+	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) malloc(sizeof(*wrapper));
+
+	wrapper->handle = handle;
+	wrapper->node = node;
+	wrapper->async = async;
+	PTHREAD_COND_INIT(&wrapper->cond, NULL);
+	PTHREAD_MUTEX_INIT(&wrapper->lock, NULL);
+	wrapper->finished = 0;
 
-	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &wrapper))
+	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, wrapper))
 	{
 		/* we can immediately proceed */
 		struct starpu_data_replicate_s *replicate = &handle->per_node[node];
 		_starpu_fetch_data_on_node(handle, replicate, mode, async, NULL, NULL);
 
 		/* remove the "lock"/reference */
-		if (!async)
-		{
-			_starpu_spin_lock(&handle->header_lock);
+
+		_starpu_spin_lock(&handle->header_lock);
+		if (!async) {
 			replicate->refcnt--;
 			STARPU_ASSERT(replicate->refcnt >= 0);
-			_starpu_notify_data_dependencies(handle);
-			_starpu_spin_unlock(&handle->header_lock);
 		}
+		_starpu_notify_data_dependencies(handle);
+		_starpu_spin_unlock(&handle->header_lock);
+
 	}
 	else if (!async) {
-		PTHREAD_MUTEX_LOCK(&wrapper.lock);
-		while (!wrapper.finished)
-			PTHREAD_COND_WAIT(&wrapper.cond, &wrapper.lock);
-		PTHREAD_MUTEX_UNLOCK(&wrapper.lock);
+		PTHREAD_MUTEX_LOCK(&wrapper->lock);
+		while (!wrapper->finished)
+			PTHREAD_COND_WAIT(&wrapper->cond, &wrapper->lock);
+		PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
 	}
 
 	return 0;

+ 29 - 29
tests/microbenchs/prefetch_data_on_node.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010  Université de Bordeaux 1
+ * Copyright (C) 2009-2011  Université de Bordeaux 1
  * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -26,34 +26,16 @@
 
 #define VECTORSIZE	1024
 
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-
-static unsigned finished = 0;
-
-static unsigned cnt;
-
 starpu_data_handle v_handle;
 static unsigned *v;
 
 static void callback(void *arg)
 {
-	unsigned res = STARPU_ATOMIC_ADD(&cnt, -1);
-
-	//fprintf(stderr, "res ...%d\n", res);
-	//fflush(stderr);
+	unsigned node = (unsigned)(uintptr_t) arg;
 
-	if (res == 0)
-	{
-		pthread_mutex_lock(&mutex);
-		finished = 1;
-		pthread_cond_signal(&cond);
-		pthread_mutex_unlock(&mutex);
-	}
+	starpu_data_prefetch_on_node(v_handle, node, 1);
 }
 
-
-
 static void codelet_null(void *descr[], __attribute__ ((unused)) void *_args)
 {
 //	fprintf(stderr, "pif\n");
@@ -95,8 +77,6 @@ int main(int argc, char **argv)
 
 	unsigned nworker = starpu_worker_get_count();
 
-	cnt = nworker*N;
-
 	unsigned iter, worker;
 	for (iter = 0; iter < N; iter++)
 	{
@@ -113,10 +93,33 @@ int main(int argc, char **argv)
 			task->buffers[0].handle = v_handle;
 			task->buffers[0].mode = select_random_mode();
 
+			task->synchronous = 1;
+
+			int ret = starpu_task_submit(task);
+			if (ret == -ENODEV)
+				goto enodev;
+		}
+	}
+
+	for (iter = 0; iter < N; iter++)
+	{
+		for (worker = 0; worker < nworker; worker++)
+		{
+			/* asynchronous prefetch */
+			unsigned node = starpu_worker_get_memory_node(worker);
+			starpu_data_prefetch_on_node(v_handle, node, 1);
+
+			/* execute a task */
+			struct starpu_task *task = starpu_task_create();
+			task->cl = &cl;
+
+			task->buffers[0].handle = v_handle;
+			task->buffers[0].mode = select_random_mode();
+
 			task->callback_func = callback;
-			task->callback_arg = NULL;
+			task->callback_arg = (void*)(uintptr_t) starpu_worker_get_memory_node((worker+1)%nworker);
 
-			task->synchronous = 1;
+			task->synchronous = 0;
 
 			int ret = starpu_task_submit(task);
 			if (ret == -ENODEV)
@@ -124,10 +127,7 @@ int main(int argc, char **argv)
 		}
 	}
 
-	pthread_mutex_lock(&mutex);
-	if (!finished)
-		pthread_cond_wait(&cond, &mutex);
-	pthread_mutex_unlock(&mutex);
+	starpu_task_wait_for_all();
 
 	starpu_free(v);
 	starpu_shutdown();

+ 3 - 14
tools/gdbinit

@@ -187,9 +187,9 @@ define starpu-print-data
   printf "Requester tasks\n"
   set $requesterlist = $data->req_list._head
   while $requesterlist != 0x0
-    print "mode: "
+    printf "mode: "
     starpu-print-mode $requesterlist->mode
-    print "\n"
+    printf "\n"
     starpu-print-job $requesterlist->j
     set $requesterlist = $requesterlist->_next
   end
@@ -241,18 +241,7 @@ define starpu-print-requests
     set $request = data_requests[$node]._head
     while $request != 0
       printf " Request %p: handle %p ", $request, $request->handle
-      if ($request->mode & 1)
-        printf "R"
-      end
-      if ($request->mode & 2)
-        printf "W"
-      end
-      if ($request->mode & 4)
-        printf "S"
-      end
-      if ($request->mode & 8)
-        printf "X"
-      end
+      starpu-print-mode $request->mode
       printf "\n"
       set $request = $request->_next
     end