|
@@ -17,9 +17,11 @@
|
|
|
#include <starpu.h>
|
|
|
#include <pthread.h>
|
|
|
|
|
|
-#define NTHREADS 4
|
|
|
+#define NTHREADS 16
|
|
|
#define NITER 128
|
|
|
|
|
|
+//#define DEBUG_MESSAGES 1
|
|
|
+
|
|
|
static pthread_cond_t cond;
|
|
|
static pthread_mutex_t mutex;
|
|
|
|
|
@@ -29,13 +31,23 @@ struct thread_data {
|
|
|
starpu_data_handle handle;
|
|
|
pthread_t thread;
|
|
|
|
|
|
- pthread_cond_t recv_cond;
|
|
|
pthread_mutex_t recv_mutex;
|
|
|
unsigned recv_flag; // set when a message is received
|
|
|
unsigned recv_buf;
|
|
|
struct thread_data *neighbour;
|
|
|
};
|
|
|
|
|
|
+struct data_req { /* One pending request; nodes are chained through `next` into data_req_list. */
|
|
|
+ int (*test_func)(void *); /* Called by progress_func with test_arg; a non-zero result makes progress_func free the request, zero makes it re-queue it. */
|
|
|
+ void *test_arg; /* Opaque argument handed unchanged to test_func. */
|
|
|
+ struct data_req *next; /* Next request in the list, or NULL for the last one. */
|
|
|
+};
|
|
|
+
|
|
|
+static pthread_mutex_t data_req_mutex; /* Locked around every access to data_req_list and progress_thread_running visible in this file, apart from their initialisation in main. */
|
|
|
+static pthread_cond_t data_req_cond; /* Signalled when a request is queued and when progress_thread_running changes. */
|
|
|
+struct data_req *data_req_list; /* Head of the singly linked list of pending requests; NULL when the list is empty. */
|
|
|
+unsigned progress_thread_running; /* Set to 1 by progress_func on start-up; main resets it to 0 to make the progress loop exit. */
|
|
|
+
|
|
|
static struct thread_data problem_data[NTHREADS];
|
|
|
|
|
|
/* We implement some ring transfer, every thread will try to receive a piece of
|
|
@@ -50,6 +62,8 @@ static void increment_handle_cpu_kernel(void *descr[], void *cl_arg __attribute_
|
|
|
{
|
|
|
unsigned *val = (unsigned *)STARPU_GET_VARIABLE_PTR(descr[0]);
|
|
|
*val += 1;
|
|
|
+
|
|
|
+// fprintf(stderr, "VAL %d (&val = %p)\n", *val, val);
|
|
|
}
|
|
|
|
|
|
static starpu_codelet increment_handle_cl = {
|
|
@@ -76,26 +90,72 @@ static void increment_handle_async(struct thread_data *thread_data)
|
|
|
STARPU_ASSERT(!ret);
|
|
|
}
|
|
|
|
|
|
+static int test_recv_handle_async(void *arg) /* Test function for a queued receive: arg is the receiving thread's struct thread_data. */
|
|
|
+{
|
|
|
+// fprintf(stderr, "test_recv_handle_async\n");
|
|
|
+ /* Returns 1 if a message was pending (recv_flag == 1) and has now been consumed, 0 if nothing has arrived yet. */
|
|
|
+ int ret;
|
|
|
+ struct thread_data *thread_data = arg;
|
|
|
+ /* recv_flag and recv_buf are written by the sender under recv_mutex, so they are read under the same lock. */
|
|
|
+ pthread_mutex_lock(&thread_data->recv_mutex);
|
|
|
+
|
|
|
+ ret = (thread_data->recv_flag == 1);
|
|
|
+
|
|
|
+ if (ret)
|
|
|
+ {
|
|
|
+ thread_data->recv_flag = 0; /* Clearing the flag is what test_send_handle_async checks for on the sender's side. */
|
|
|
+ thread_data->val = thread_data->recv_buf; /* Overwrite this thread's value with the received one. */
|
|
|
+ }
|
|
|
+
|
|
|
+ pthread_mutex_unlock(&thread_data->recv_mutex);
|
|
|
+ /* The handle is released only after recv_mutex has been dropped, and only when a message was actually consumed. */
|
|
|
+ if (ret)
|
|
|
+ {
|
|
|
+#ifdef DEBUG_MESSAGES
|
|
|
+ fprintf(stderr, "Thread %d received value %d from thread %d\n",
|
|
|
+ thread_data->index, thread_data->val, (thread_data->index - 1)%NTHREADS); /* NOTE(review): if index is unsigned, index 0 wraps before the modulo, so the printed sender is only right when NTHREADS is a power of two; the values are also printed with %d - confirm the field types. */
|
|
|
+#endif
|
|
|
+ starpu_data_release_from_mem(thread_data->handle);
|
|
|
+ }
|
|
|
+
|
|
|
+ return ret;
|
|
|
+}
|
|
|
+
|
|
|
static void recv_handle_async(void *_thread_data)
|
|
|
{
|
|
|
struct thread_data *thread_data = _thread_data;
|
|
|
- pthread_mutex_lock(&thread_data->recv_mutex);
|
|
|
|
|
|
- /* We wait for the previous thread to notify that the data is available */
|
|
|
- while (!thread_data->recv_flag)
|
|
|
- pthread_cond_wait(&thread_data->recv_cond, &thread_data->recv_mutex);
|
|
|
+ struct data_req *req = malloc(sizeof(struct data_req)); /* New version: instead of blocking here, queue a request that progress_func completes through test_recv_handle_async. NOTE(review): the malloc result is used without a NULL check. */
|
|
|
+ req->test_func = test_recv_handle_async;
|
|
|
+ req->test_arg = thread_data;
|
|
|
+ req->next = NULL; /* Overwritten below when the request is linked in. */
|
|
|
|
|
|
- /* We overwrite thread's data with the received value */
|
|
|
- thread_data->val = thread_data->recv_buf;
|
|
|
+ pthread_mutex_lock(&data_req_mutex);
|
|
|
+ req->next = data_req_list; /* Push the request at the head of the pending list. */
|
|
|
+ data_req_list = req;
|
|
|
+ pthread_cond_signal(&data_req_cond); /* progress_func waits on this condition when the list is empty. */
|
|
|
+ pthread_mutex_unlock(&data_req_mutex);
|
|
|
+}
|
|
|
|
|
|
- /* Notify that we read the value */
|
|
|
- thread_data->recv_flag = 0;
|
|
|
- pthread_cond_signal(&thread_data->recv_cond);
|
|
|
+static int test_send_handle_async(void *arg) /* Test function for a queued send: arg is the sending thread's struct thread_data. */
|
|
|
+{
|
|
|
+ int ret;
|
|
|
+ struct thread_data *thread_data = arg;
|
|
|
+ struct thread_data *neighbour_data = thread_data->neighbour;
|
|
|
+ /* Returns 1 once the neighbour's recv_flag is back to 0 (test_recv_handle_async clears it when it consumes the message), 0 otherwise. */
|
|
|
+ pthread_mutex_lock(&neighbour_data->recv_mutex);
|
|
|
+ ret = (neighbour_data->recv_flag == 0);
|
|
|
+ pthread_mutex_unlock(&neighbour_data->recv_mutex);
|
|
|
|
|
|
-// fprintf(stderr, "Thread %d received value %d from thread %d\n", thread_data->index, thread_data->val, (thread_data->index - 1)%NTHREADS);
|
|
|
+ if (ret) /* The sender's handle is released only when the message has been consumed. */
|
|
|
+ {
|
|
|
+#ifdef DEBUG_MESSAGES
|
|
|
+ fprintf(stderr, "Thread %d sends value %d to thread %d\n", thread_data->index, thread_data->val, neighbour_data->index);
|
|
|
+#endif
|
|
|
+ starpu_data_release_from_mem(thread_data->handle);
|
|
|
+ }
|
|
|
|
|
|
- pthread_mutex_unlock(&thread_data->recv_mutex);
|
|
|
- starpu_data_release_from_mem(thread_data->handle);
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
static void send_handle_async(void *_thread_data)
|
|
@@ -103,20 +163,83 @@ static void send_handle_async(void *_thread_data)
|
|
|
struct thread_data *thread_data = _thread_data;
|
|
|
struct thread_data *neighbour_data = thread_data->neighbour;
|
|
|
|
|
|
-// fprintf(stderr, "Thread %d sends value %d to thread %d\n", thread_data->index, thread_data->val, neighbour_data->index);
|
|
|
+// fprintf(stderr, "send_handle_async\n");
|
|
|
+
|
|
|
/* send the message */
|
|
|
pthread_mutex_lock(&neighbour_data->recv_mutex);
|
|
|
neighbour_data->recv_buf = thread_data->val;
|
|
|
neighbour_data->recv_flag = 1;
|
|
|
- pthread_cond_signal(&neighbour_data->recv_cond);
|
|
|
-
|
|
|
- /* wait until it's received (ie. neighbour's recv_flag is set back to 0) */
|
|
|
- while (neighbour_data->recv_flag)
|
|
|
- pthread_cond_wait(&neighbour_data->recv_cond, &neighbour_data->recv_mutex);
|
|
|
-
|
|
|
pthread_mutex_unlock(&neighbour_data->recv_mutex);
|
|
|
|
|
|
- starpu_data_release_from_mem(thread_data->handle);
|
|
|
+ struct data_req *req = malloc(sizeof(struct data_req));
|
|
|
+ req->test_func = test_send_handle_async;
|
|
|
+ req->test_arg = thread_data;
|
|
|
+ req->next = NULL;
|
|
|
+
|
|
|
+ pthread_mutex_lock(&data_req_mutex);
|
|
|
+ req->next = data_req_list;
|
|
|
+ data_req_list = req;
|
|
|
+ pthread_cond_signal(&data_req_cond);
|
|
|
+ pthread_mutex_unlock(&data_req_mutex);
|
|
|
+}
|
|
|
+
|
|
|
+static void *progress_func(void *arg) /* Thread body that services data_req_list until progress_thread_running is cleared; arg is unused and the function always returns NULL. */
|
|
|
+{
|
|
|
+ pthread_mutex_lock(&data_req_mutex);
|
|
|
+ /* Announce start-up: main waits on data_req_cond until progress_thread_running is non-zero. */
|
|
|
+ progress_thread_running = 1;
|
|
|
+ pthread_cond_signal(&data_req_cond);
|
|
|
+ /* data_req_mutex is held at the top of every iteration and released only around the call to test_func. */
|
|
|
+ while (progress_thread_running) {
|
|
|
+ struct data_req *req;
|
|
|
+ /* Sleep only when nothing is queued; a wake-up that still finds the list empty is handled by the NULL check on req below. */
|
|
|
+ if (data_req_list == NULL)
|
|
|
+ pthread_cond_wait(&data_req_cond, &data_req_mutex);
|
|
|
+
|
|
|
+ req = data_req_list;
|
|
|
+
|
|
|
+ if (req)
|
|
|
+ {
|
|
|
+ data_req_list = req->next; /* Pop the head request. */
|
|
|
+ req->next = NULL;
|
|
|
+ /* The test function is called with data_req_mutex released. */
|
|
|
+ pthread_mutex_unlock(&data_req_mutex);
|
|
|
+
|
|
|
+ int ret = req->test_func(req->test_arg);
|
|
|
+ /* Non-zero: the request completed and is freed. */
|
|
|
+ if (ret)
|
|
|
+ {
|
|
|
+ free(req);
|
|
|
+ pthread_mutex_lock(&data_req_mutex);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ /* ret == 0: the request is not finished, so we put it back at the end of the list */
|
|
|
+ pthread_mutex_lock(&data_req_mutex);
|
|
|
+ /* NOTE(review): an unfinished request is retried on the next iteration with no wait, so the loop polls continuously while requests are pending. */
|
|
|
+ struct data_req *req_aux = data_req_list;
|
|
|
+ if (!req_aux)
|
|
|
+ {
|
|
|
+ /* The list is empty */
|
|
|
+ data_req_list = req;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ while (req_aux) /* Walk to the last node and append req there. */
|
|
|
+ {
|
|
|
+ if (req_aux->next == NULL)
|
|
|
+ {
|
|
|
+ req_aux->next = req;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ req_aux = req_aux->next;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pthread_mutex_unlock(&data_req_mutex);
|
|
|
+ /* Requests still queued when the loop exits are not freed here. */
|
|
|
+ return NULL;
|
|
|
}
|
|
|
|
|
|
static void *thread_func(void *arg)
|
|
@@ -125,8 +248,6 @@ static void *thread_func(void *arg)
|
|
|
struct thread_data *thread_data = arg;
|
|
|
unsigned index = thread_data->index;
|
|
|
|
|
|
-// fprintf(stderr, "Hello from thread %d\n", thread_data->index);
|
|
|
-
|
|
|
starpu_variable_data_register(&thread_data->handle, 0, (uintptr_t)&thread_data->val, sizeof(unsigned));
|
|
|
|
|
|
for (iter = 0; iter < NITER; iter++)
|
|
@@ -159,20 +280,34 @@ static void *thread_func(void *arg)
|
|
|
int main(int argc, char **argv)
|
|
|
{
|
|
|
int ret;
|
|
|
+ void *retval;
|
|
|
|
|
|
starpu_init(NULL);
|
|
|
|
|
|
+ /* Create a thread to perform blocking calls */
|
|
|
+ pthread_t progress_thread;
|
|
|
+ pthread_mutex_init(&data_req_mutex, NULL);
|
|
|
+ pthread_cond_init(&data_req_cond, NULL);
|
|
|
+ data_req_list = NULL;
|
|
|
+ progress_thread_running = 0;
|
|
|
+
|
|
|
unsigned t;
|
|
|
for (t = 0; t < NTHREADS; t++)
|
|
|
{
|
|
|
problem_data[t].index = t;
|
|
|
problem_data[t].val = 0;
|
|
|
- pthread_cond_init(&problem_data[t].recv_cond, NULL);
|
|
|
pthread_mutex_init(&problem_data[t].recv_mutex, NULL);
|
|
|
problem_data[t].recv_flag = 0;
|
|
|
problem_data[t].neighbour = &problem_data[(t+1)%NTHREADS];
|
|
|
}
|
|
|
|
|
|
+ pthread_create(&progress_thread, NULL, progress_func, NULL);
|
|
|
+
|
|
|
+ pthread_mutex_lock(&data_req_mutex);
|
|
|
+ while (!progress_thread_running)
|
|
|
+ pthread_cond_wait(&data_req_cond, &data_req_mutex);
|
|
|
+ pthread_mutex_unlock(&data_req_mutex);
|
|
|
+
|
|
|
for (t = 0; t < NTHREADS; t++)
|
|
|
{
|
|
|
ret = pthread_create(&problem_data[t].thread, NULL, thread_func, &problem_data[t]);
|
|
@@ -181,11 +316,18 @@ int main(int argc, char **argv)
|
|
|
|
|
|
for (t = 0; t < NTHREADS; t++)
|
|
|
{
|
|
|
- void *retval;
|
|
|
ret = pthread_join(problem_data[t].thread, &retval);
|
|
|
STARPU_ASSERT(retval == NULL);
|
|
|
}
|
|
|
|
|
|
+ pthread_mutex_lock(&data_req_mutex);
|
|
|
+ progress_thread_running = 0;
|
|
|
+ pthread_cond_signal(&data_req_cond);
|
|
|
+ pthread_mutex_unlock(&data_req_mutex);
|
|
|
+
|
|
|
+ ret = pthread_join(progress_thread, &retval);
|
|
|
+ STARPU_ASSERT(retval == NULL);
|
|
|
+
|
|
|
/* We check that the value in the "last" thread is valid */
|
|
|
starpu_data_handle last_handle = problem_data[NTHREADS - 1].handle;
|
|
|
starpu_data_sync_with_mem(last_handle, STARPU_R);
|