浏览代码

port r11376 from 1.1: Introduce rwlock_tryrdlock and trywrlock variants. Use them in the progression hooks to avoid getting stuck due to idling workers.

Samuel Thibault 11 年之前
父节点
当前提交
e9c70aac17

+ 4 - 0
include/starpu_thread.h

@@ -160,7 +160,9 @@ typedef int starpu_pthread_rwlockattr_t;
 int starpu_pthread_rwlock_init(starpu_pthread_rwlock_t *rwlock, const starpu_pthread_rwlockattr_t *attr);
 int starpu_pthread_rwlock_destroy(starpu_pthread_rwlock_t *rwlock);
 int starpu_pthread_rwlock_rdlock(starpu_pthread_rwlock_t *rwlock);
+int starpu_pthread_rwlock_tryrdlock(starpu_pthread_rwlock_t *rwlock);
 int starpu_pthread_rwlock_wrlock(starpu_pthread_rwlock_t *rwlock);
+int starpu_pthread_rwlock_trywrlock(starpu_pthread_rwlock_t *rwlock);
 int starpu_pthread_rwlock_unlock(starpu_pthread_rwlock_t *rwlock);
 
 #elif !defined(_MSC_VER) /* STARPU_SIMGRID */
@@ -172,7 +174,9 @@ typedef pthread_rwlockattr_t starpu_pthread_rwlockattr_t;
 #define starpu_pthread_rwlock_destroy pthread_rwlock_destroy
 
 int starpu_pthread_rwlock_rdlock(starpu_pthread_rwlock_t *rwlock);
+int starpu_pthread_rwlock_tryrdlock(starpu_pthread_rwlock_t *rwlock);
 int starpu_pthread_rwlock_wrlock(starpu_pthread_rwlock_t *rwlock);
+int starpu_pthread_rwlock_trywrlock(starpu_pthread_rwlock_t *rwlock);
 int starpu_pthread_rwlock_unlock(starpu_pthread_rwlock_t *rwlock);
 
 #endif /* STARPU_SIMGRID, _MSC_VER */

+ 55 - 1
include/starpu_thread_util.h

@@ -18,7 +18,16 @@
 #ifndef __STARPU_THREAD_UTIL_H__
 #define __STARPU_THREAD_UTIL_H__
 
-#include <starpu.h>
+#include <starpu_util.h>
+#include <errno.h>
+
+#ifndef STARPU_INLINE
+#ifdef __GNUC_GNU_INLINE__
+#define STARPU_INLINE extern inline
+#else
+#define STARPU_INLINE inline
+#endif
+#endif
 
 /*
  * Encapsulation of the starpu_pthread_create_* functions.
@@ -78,6 +87,21 @@
 	}                                                                      \
 } while (0)
 
+#define STARPU_PTHREAD_MUTEX_TRYLOCK(mutex) \
+	_STARPU_PTHREAD_MUTEX_TRYLOCK(mutex, __FILE__, __LINE__)
+STARPU_INLINE
+int _STARPU_PTHREAD_MUTEX_TRYLOCK(starpu_pthread_mutex_t *mutex, char *file, int line)
+{
+	int p_ret = starpu_pthread_mutex_trylock(mutex);
+	if (STARPU_UNLIKELY(p_ret != 0 && p_ret != EBUSY)) {
+		fprintf(stderr,
+			"%s:%d starpu_pthread_mutex_trylock: %s\n",
+			file, line, strerror(p_ret));
+		STARPU_ABORT();
+	}
+	return p_ret;
+}
+
 #define STARPU_PTHREAD_MUTEX_UNLOCK(mutex) do {                               \
 	int p_ret = starpu_pthread_mutex_unlock(mutex);                        \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \
@@ -143,6 +167,21 @@
 	}                                                                      \
 } while (0)
 
+#define STARPU_PTHREAD_RWLOCK_TRYRDLOCK(rwlock) \
+	_starpu_pthread_rwlock_tryrdlock(rwlock, __FILE__, __LINE__)
+STARPU_INLINE
+int _starpu_pthread_rwlock_tryrdlock(starpu_pthread_rwlock_t *rwlock, char *file, int line)
+{
+	int p_ret = starpu_pthread_rwlock_tryrdlock(rwlock);
+	if (STARPU_UNLIKELY(p_ret != 0 && p_ret != EBUSY)) {
+		fprintf(stderr,
+			"%s:%d starpu_pthread_rwlock_tryrdlock: %s\n",
+			file, line, strerror(p_ret));
+		STARPU_ABORT();
+	}
+	return p_ret;
+}
+
 #define STARPU_PTHREAD_RWLOCK_WRLOCK(rwlock) do {                              \
 	int p_ret = starpu_pthread_rwlock_wrlock(rwlock);                      \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \
@@ -153,6 +192,21 @@
 	}                                                                      \
 } while (0)
 
+#define STARPU_PTHREAD_RWLOCK_TRYWRLOCK(rwlock) \
+	_starpu_pthread_rwlock_trywrlock(rwlock, __FILE__, __LINE__)
+STARPU_INLINE
+int _starpu_pthread_rwlock_trywrlock(starpu_pthread_rwlock_t *rwlock, char *file, int line)
+{
+	int p_ret = starpu_pthread_rwlock_trywrlock(rwlock);
+	if (STARPU_UNLIKELY(p_ret != 0 && p_ret != EBUSY)) {
+		fprintf(stderr,
+			"%s:%d starpu_pthread_rwlock_trywrlock: %s\n",
+			file, line, strerror(p_ret));
+		STARPU_ABORT();
+	}
+	return p_ret;
+}
+
 #define STARPU_PTHREAD_RWLOCK_UNLOCK(rwlock) do {                              \
 	int p_ret = starpu_pthread_rwlock_unlock(rwlock);                      \
 	if (STARPU_UNLIKELY(p_ret)) {                                          \

+ 45 - 0
src/common/thread.c

@@ -217,6 +217,16 @@ int starpu_pthread_rwlock_rdlock(starpu_pthread_rwlock_t *rwlock)
 	return p_ret;
 }
 
+int starpu_pthread_rwlock_tryrdlock(starpu_pthread_rwlock_t *rwlock)
+{
+	int p_ret = starpu_pthread_mutex_trylock(rwlock);
+
+	if (!p_ret)
+		_STARPU_TRACE_RWLOCK_RDLOCKED();
+
+	return p_ret;
+}
+
 int starpu_pthread_rwlock_wrlock(starpu_pthread_rwlock_t *rwlock)
 {
 	_STARPU_TRACE_WRLOCKING_RWLOCK();
@@ -228,6 +238,17 @@ int starpu_pthread_rwlock_wrlock(starpu_pthread_rwlock_t *rwlock)
 	return p_ret;
 }
 
+int starpu_pthread_rwlock_trywrlock(starpu_pthread_rwlock_t *rwlock)
+{
+	int p_ret =  starpu_pthread_mutex_trylock(rwlock);
+
+	if (!p_ret)
+		_STARPU_TRACE_RWLOCK_RDLOCKED();
+
+	return p_ret;
+}
+
+
 int starpu_pthread_rwlock_unlock(starpu_pthread_rwlock_t *rwlock)
 {
 	_STARPU_TRACE_UNLOCKING_RWLOCK();
@@ -298,6 +319,18 @@ int starpu_pthread_rwlock_rdlock(starpu_pthread_rwlock_t *rwlock)
 	return p_ret;
 }
 
+int starpu_pthread_rwlock_tryrdlock(starpu_pthread_rwlock_t *rwlock)
+{
+	_STARPU_TRACE_RDLOCKING_RWLOCK();
+
+ 	int p_ret = pthread_rwlock_tryrdlock(rwlock);
+
+	if (!p_ret)
+		_STARPU_TRACE_RWLOCK_RDLOCKED();
+
+	return p_ret;
+}
+
 int starpu_pthread_rwlock_wrlock(starpu_pthread_rwlock_t *rwlock)
 {
 	_STARPU_TRACE_WRLOCKING_RWLOCK();
@@ -309,6 +342,18 @@ int starpu_pthread_rwlock_wrlock(starpu_pthread_rwlock_t *rwlock)
 	return p_ret;
 }
 
+int starpu_pthread_rwlock_trywrlock(starpu_pthread_rwlock_t *rwlock)
+{
+	_STARPU_TRACE_WRLOCKING_RWLOCK();
+
+ 	int p_ret = pthread_rwlock_trywrlock(rwlock);
+
+	if (!p_ret)
+		_STARPU_TRACE_RWLOCK_WRLOCKED();
+
+	return p_ret;
+}
+
 int starpu_pthread_rwlock_unlock(starpu_pthread_rwlock_t *rwlock)
 {
 	_STARPU_TRACE_UNLOCKING_RWLOCK();

+ 4 - 2
src/core/progress_hook.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2013  Université de Bordeaux 1
  * Copyright (C) 2010-2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -86,7 +86,9 @@ void starpu_progression_hook_deregister(int hook_id)
 unsigned _starpu_execute_registered_progression_hooks(void)
 {
 	/* If there is no hook registered, we short-cut loop. */
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&progression_hook_rwlock);
+	if (STARPU_PTHREAD_RWLOCK_TRYRDLOCK(&progression_hook_rwlock))
+		/* It is busy, do not bother */
+		return;
 	int no_hook = (active_hook_cnt == 0);
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&progression_hook_rwlock);
 

+ 36 - 18
src/datawizard/data_request.c

@@ -356,17 +356,12 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 {
 	starpu_data_handle_t handle = r->handle;
 
-	if (prefetch) {
-		if (_starpu_spin_trylock(&handle->header_lock))
-			return -EBUSY;
-		if (_starpu_spin_trylock(&r->lock))
-		{
-			_starpu_spin_unlock(&handle->header_lock);
-			return -EBUSY;
-		}
-	} else {
-		_starpu_spin_lock(&handle->header_lock);
-		_starpu_spin_lock(&r->lock);
+	if (_starpu_spin_trylock(&handle->header_lock))
+		return -EBUSY;
+	if (_starpu_spin_trylock(&r->lock))
+	{
+		_starpu_spin_unlock(&handle->header_lock);
+		return -EBUSY;
 	}
 
 	struct _starpu_data_replicate *src_replicate = r->src_replicate;
@@ -420,19 +415,22 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	return 0;
 }
 
-void _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
+int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
 {
 	struct _starpu_data_request *r;
 	struct _starpu_data_request_list *new_data_requests;
+	int ret = 0;
 
 	/* Here helgrind would should that this is an un protected access.
 	 * We however don't care about missing an entry, we will get called
 	 * again sooner or later. */
 	if (_starpu_data_request_list_empty(data_requests[src_node]))
-		return;
+		return 0;
 
 	/* take all the entries from the request list */
-        STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
+	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[src_node]))
+		/* List is busy, do not bother with it */
+		return -EBUSY;
 
 	struct _starpu_data_request_list *local_list = data_requests[src_node];
 
@@ -441,7 +439,7 @@ void _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
 		/* there is no request */
                 STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
-		return;
+		return 0;
 	}
 
 	/* There is an entry: we create a new empty list to replace the list of
@@ -463,6 +461,7 @@ void _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
 		{
 			/* Too many requests at the same time, skip pushing
 			 * more for now */
+			ret = -EBUSY;
 			break;
 		}
 
@@ -471,6 +470,8 @@ void _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
 		res = starpu_handle_data_request(r, may_alloc, 0);
 		if (res != 0 && res != -EAGAIN)
 		{
+			/* handle is busy, or not enough memory, postpone for now */
+			ret = res;
 			_starpu_data_request_list_push_back(new_data_requests, r);
 			break;
 		}
@@ -491,6 +492,8 @@ void _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc)
 
 	_starpu_data_request_list_delete(new_data_requests);
 	_starpu_data_request_list_delete(local_list);
+
+	return ret;
 }
 
 void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc)
@@ -503,7 +506,9 @@ void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc
 		return;
 
 	/* take all the entries from the request list */
-        STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
+	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[src_node]))
+		/* List is busy, do not bother with it */
+		return;
 
 	struct _starpu_data_request_list *local_list = prefetch_requests[src_node];
 
@@ -587,7 +592,9 @@ static void _handle_pending_node_data_requests(unsigned src_node, unsigned force
 	if (_starpu_data_request_list_empty(data_requests_pending[src_node]))
 		return;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
+	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[src_node]) && !force)
+		/* List is busy, do not bother with it */
+		return;
 
 	/* for all entries of the list */
 	struct _starpu_data_request_list *local_list = data_requests_pending[src_node];
@@ -613,8 +620,19 @@ static void _handle_pending_node_data_requests(unsigned src_node, unsigned force
 
 		starpu_data_handle_t handle = r->handle;
 
-		_starpu_spin_lock(&handle->header_lock);
+		if (force)
+			/* Have to wait for the handle, whatever it takes */
+			_starpu_spin_lock(&handle->header_lock);
+		else
+			if (_starpu_spin_trylock(&handle->header_lock))
+			{
+				/* Handle is busy, retry this later */
+				_starpu_data_request_list_push_back(new_data_requests_pending, r);
+				kept++;
+				continue;
+			}
 
+		/* This shouldn't be too hard to acquire */
 		_starpu_spin_lock(&r->lock);
 
 		/* wait until the transfer is terminated */

+ 2 - 1
src/datawizard/data_request.h

@@ -109,7 +109,8 @@ LIST_TYPE(_starpu_data_requester,
 void _starpu_init_data_request_lists(void);
 void _starpu_deinit_data_request_lists(void);
 void _starpu_post_data_request(struct _starpu_data_request *r, unsigned handling_node);
-void _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc);
+/* returns 0 if we have pushed all requests, -EBUSY or -ENOMEM otherwise */
+int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc);
 void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc);
 
 void _starpu_handle_pending_node_data_requests(unsigned src_node);

+ 4 - 2
src/datawizard/datawizard.c

@@ -36,8 +36,10 @@ void _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc)
 
 	/* in case some other driver requested data */
 	_starpu_handle_pending_node_data_requests(memory_node);
-	_starpu_handle_node_data_requests(memory_node, may_alloc);
-	_starpu_handle_node_prefetch_requests(memory_node, may_alloc);
+	if (_starpu_handle_node_data_requests(memory_node, may_alloc) == 0)
+		/* We pushed all pending requests, we can afford pushing
+		 * prefetch requests */
+		_starpu_handle_node_prefetch_requests(memory_node, may_alloc);
 	_starpu_execute_registered_progression_hooks();
 }