Przeglądaj źródła

Make it possible to register progression methods, which are called when the
workers are idle, with the starpu_register_progression_hook function. Such a
method must return an unsigned value that indicates whether the workers may sleep or not.

Cédric Augonnet 15 lat temu
rodzic
commit
585e31ce7f

+ 3 - 0
include/starpu-data.h

@@ -70,6 +70,9 @@ void starpu_data_set_wb_mask(starpu_data_handle state, uint32_t wb_mask);
 
 
 unsigned starpu_test_if_data_is_allocated_on_node(starpu_data_handle handle, uint32_t memory_node);
 unsigned starpu_test_if_data_is_allocated_on_node(starpu_data_handle handle, uint32_t memory_node);
 
 
+/* Register a progression hook: func is called with arg whenever the registered
+ * progression hooks are executed. func returns non-zero if the caller may
+ * block, and 0 if it must not. Returns the hook identifier to pass to
+ * starpu_deregister_progression_hook, or -1 on failure (e.g. no free slot). */
+int starpu_register_progression_hook(unsigned (*func)(void *arg), void *arg);
+/* Deactivate the progression hook that was registered under hook_id. */
+void starpu_deregister_progression_hook(int hook_id);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif

+ 1 - 0
src/Makefile.am

@@ -88,6 +88,7 @@ libstarpu_la_SOURCES = 						\
 	core/topology.c						\
 	core/topology.c						\
 	core/debug.c						\
 	core/debug.c						\
 	core/errorcheck.c					\
 	core/errorcheck.c					\
+	core/progress_hook.c					\
 	core/dependencies/tags.c				\
 	core/dependencies/tags.c				\
 	core/dependencies/htable.c				\
 	core/dependencies/htable.c				\
 	core/dependencies/data-concurrency.c			\
 	core/dependencies/data-concurrency.c			\

+ 93 - 0
src/core/progress_hook.c

@@ -0,0 +1,93 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <pthread.h>
+#include <core/workers.h>
+
+#define NMAXHOOKS	16
+
+/* One slot of the progression hook table. */
+struct progression_hook {
+	/* Callback to run; a zero return value means "the caller must not
+	 * block", a non-zero one means "blocking is allowed". */
+	unsigned (*func)(void *arg);
+	/* Opaque argument handed back to func on every call. */
+	void *arg;
+	/* Non-zero while the slot holds a registered hook. */
+	unsigned active;
+};
+
+/* Protects the hook table below. */
+static pthread_mutex_t progression_hook_mutex = PTHREAD_MUTEX_INITIALIZER;
+/* Fixed-size table of hook slots; every slot starts out inactive. */
+static struct progression_hook hooks[NMAXHOOKS] = {{NULL, NULL, 0}};
+
+/*
+ * Register a progression hook.
+ *
+ * func is stored together with arg in the first inactive slot of the hook
+ * table and is subsequently invoked by execute_registered_progression_hooks().
+ *
+ * Returns the index of the slot that was used (to be passed to
+ * starpu_deregister_progression_hook), or -1 if func is NULL or if all
+ * NMAXHOOKS slots are already in use.
+ */
+int starpu_register_progression_hook(unsigned (*func)(void *arg), void *arg)
+{
+	int hook_id = -1;
+	int hook;
+
+	/* A NULL callback could never be executed safely. */
+	if (func == NULL)
+		return -1;
+
+	pthread_mutex_lock(&progression_hook_mutex);
+	for (hook = 0; hook < NMAXHOOKS; hook++)
+	{
+		if (!hooks[hook].active)
+		{
+			/* We found an empty slot */
+			hooks[hook].func = func;
+			hooks[hook].arg = arg;
+			hooks[hook].active = 1;
+			hook_id = hook;
+			break;
+		}
+	}
+	pthread_mutex_unlock(&progression_hook_mutex);
+
+	/* Wake the workers on every path: a worker that went to sleep before
+	 * the hook was registered would otherwise not run it until some
+	 * unrelated event wakes it up. */
+	wake_all_blocked_workers();
+
+	/* -1 here means that we could not find an empty slot */
+	return hook_id;
+}
+
+/*
+ * Deactivate the progression hook stored in slot hook_id.
+ *
+ * hook_id is the value returned by starpu_register_progression_hook. Values
+ * outside [0, NMAXHOOKS) -- in particular the -1 returned by a failed
+ * registration -- are ignored instead of indexing outside the hook table.
+ */
+void starpu_deregister_progression_hook(int hook_id)
+{
+	if (hook_id < 0 || hook_id >= NMAXHOOKS)
+		return;
+
+	pthread_mutex_lock(&progression_hook_mutex);
+	hooks[hook_id].active = 0;
+	pthread_mutex_unlock(&progression_hook_mutex);
+}
+
+/*
+ * Run every active progression hook once.
+ *
+ * Returns 1 if all the executed hooks returned non-zero (or if no hook is
+ * active), and 0 as soon as at least one hook returned 0, meaning that the
+ * caller must not block.
+ */
+unsigned execute_registered_progression_hooks(void)
+{
+	/* By default, it is possible to block, but if some progression hook
+	 * requires that it's not blocking, we disable blocking. */
+	unsigned may_block = 1;
+	unsigned hook;
+
+	for (hook = 0; hook < NMAXHOOKS; hook++)
+	{
+		unsigned active;
+		unsigned (*func)(void *arg);
+		void *arg;
+
+		/* Take a consistent snapshot of the slot under the lock, so
+		 * that a concurrent registration cannot be observed half
+		 * written. */
+		pthread_mutex_lock(&progression_hook_mutex);
+		active = hooks[hook].active;
+		func = hooks[hook].func;
+		arg = hooks[hook].arg;
+		pthread_mutex_unlock(&progression_hook_mutex);
+
+		/* Inactive slots have no opinion: skip them rather than
+		 * testing a result that was never computed. */
+		if (!active)
+			continue;
+
+		/* The callback runs with the mutex released. As soon as one
+		 * hook tells that the driver cannot be blocking, we don't
+		 * allow it. */
+		if (!func(arg))
+			may_block = 0;
+	}
+
+	return may_block;
+}

+ 16 - 0
src/core/workers.c

@@ -299,6 +299,22 @@ unsigned machine_is_running(void)
 	return config.running;
 	return config.running;
 }
 }
 
 
+/*
+ * Tell whether a worker attached to memory node memnode may go to sleep.
+ *
+ * The three checks are always evaluated, in this order, because executing the
+ * progression hooks is a side effect that must happen on every call.
+ * Returns 1 when none of the checks objects to blocking, 0 otherwise.
+ */
+unsigned worker_can_block(unsigned memnode)
+{
+	unsigned request_check_failed = !check_that_no_data_request_exists(memnode);
+	unsigned machine_stopped = !machine_is_running();
+	unsigned hooks_forbid_blocking = !execute_registered_progression_hooks();
+
+	return !(request_check_failed || machine_stopped || hooks_forbid_blocking);
+}
+
 typedef enum {
 typedef enum {
 	BROADCAST,
 	BROADCAST,
 	LOCK,
 	LOCK,

+ 4 - 0
src/core/workers.h

@@ -139,6 +139,7 @@ inline uint32_t worker_exists(uint32_t task_mask);
 inline uint32_t may_submit_cuda_task(void);
 inline uint32_t may_submit_cuda_task(void);
 inline uint32_t may_submit_core_task(void);
 inline uint32_t may_submit_core_task(void);
 inline uint32_t worker_may_execute_task(unsigned workerid, uint32_t where);
 inline uint32_t worker_may_execute_task(unsigned workerid, uint32_t where);
+unsigned worker_can_block(unsigned memnode);
 
 
 inline void lock_all_queues_attached_to_node(unsigned node);
 inline void lock_all_queues_attached_to_node(unsigned node);
 inline void unlock_all_queues_attached_to_node(unsigned node);
 inline void unlock_all_queues_attached_to_node(unsigned node);
@@ -151,4 +152,7 @@ struct worker_s *get_worker_struct(unsigned id);
 
 
 struct machine_config_s *get_machine_config(void);
 struct machine_config_s *get_machine_config(void);
 
 
+/* TODO move */
+unsigned execute_registered_progression_hooks(void);
+
 #endif // __WORKERS_H__
 #endif // __WORKERS_H__

+ 2 - 0
src/datawizard/progress.c

@@ -24,4 +24,6 @@ void datawizard_progress(uint32_t memory_node, unsigned may_alloc)
 	/* in case some other driver requested data */
 	/* in case some other driver requested data */
 	handle_pending_node_data_requests(memory_node);
 	handle_pending_node_data_requests(memory_node);
 	handle_node_data_requests(memory_node, may_alloc);
 	handle_node_data_requests(memory_node, may_alloc);
+
+	execute_registered_progression_hooks();
 }
 }

+ 3 - 1
src/drivers/core/driver_core.c

@@ -149,6 +149,8 @@ void *core_worker(void *arg)
 		datawizard_progress(memnode, 1);
 		datawizard_progress(memnode, 1);
 		TRACE_END_PROGRESS(memnode);
 		TRACE_END_PROGRESS(memnode);
 
 
+		execute_registered_progression_hooks();
+
 		jobq_lock(queue);
 		jobq_lock(queue);
 
 
 		/* perhaps there is some local task to be executed first */
 		/* perhaps there is some local task to be executed first */
@@ -159,7 +161,7 @@ void *core_worker(void *arg)
 			j = pop_task();
 			j = pop_task();
 
 
                 if (j == NULL) {
                 if (j == NULL) {
-			if (check_that_no_data_request_exists(memnode) && machine_is_running())
+			if (worker_can_block(memnode))
 				pthread_cond_wait(&queue->activity_cond, &queue->activity_mutex);
 				pthread_cond_wait(&queue->activity_cond, &queue->activity_mutex);
 			jobq_unlock(queue);
 			jobq_unlock(queue);
  			continue;
  			continue;

+ 3 - 1
src/drivers/cuda/driver_cuda.c

@@ -215,6 +215,8 @@ void *cuda_worker(void *arg)
 		TRACE_START_PROGRESS(memnode);
 		TRACE_START_PROGRESS(memnode);
 		datawizard_progress(memnode, 1);
 		datawizard_progress(memnode, 1);
 		TRACE_END_PROGRESS(memnode);
 		TRACE_END_PROGRESS(memnode);
+
+		execute_registered_progression_hooks();
 	
 	
 		jobq_lock(queue);
 		jobq_lock(queue);
 
 
@@ -226,7 +228,7 @@ void *cuda_worker(void *arg)
 			j = pop_task();
 			j = pop_task();
 
 
 		if (j == NULL) {
 		if (j == NULL) {
-			if (check_that_no_data_request_exists(memnode) && machine_is_running())
+			if (worker_can_block(memnode))
 				pthread_cond_wait(&queue->activity_cond, &queue->activity_mutex);
 				pthread_cond_wait(&queue->activity_cond, &queue->activity_mutex);
 			jobq_unlock(queue);
 			jobq_unlock(queue);
 			continue;
 			continue;