浏览代码

mic: first version with thread (not finished)

Thibaud Lambert 12 年之前
父节点
当前提交
1782571d3a

+ 2 - 0
src/Makefile.am

@@ -108,6 +108,7 @@ noinst_HEADERS = 						\
 	drivers/mp_common/mp_common.h				\
 	drivers/mp_common/mp_common.h				\
 	drivers/mp_common/source_common.h			\
 	drivers/mp_common/source_common.h			\
 	drivers/mp_common/sink_common.h				\
 	drivers/mp_common/sink_common.h				\
+	drivers/mp_common/task_fifo.h				\
 	drivers/cpu/driver_cpu.h				\
 	drivers/cpu/driver_cpu.h				\
 	drivers/cuda/driver_cuda.h				\
 	drivers/cuda/driver_cuda.h				\
 	drivers/opencl/driver_opencl.h				\
 	drivers/opencl/driver_opencl.h				\
@@ -176,6 +177,7 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/parallel_heft.c				\
 	sched_policies/parallel_heft.c				\
 	sched_policies/parallel_eager.c				\
 	sched_policies/parallel_eager.c				\
 	drivers/driver_common/driver_common.c			\
 	drivers/driver_common/driver_common.c			\
+	drivers/mp_common/task_fifo.c				\
 	datawizard/memory_nodes.c				\
 	datawizard/memory_nodes.c				\
 	datawizard/write_back.c					\
 	datawizard/write_back.c					\
 	datawizard/coherency.c					\
 	datawizard/coherency.c					\

+ 0 - 4
src/core/topology.c

@@ -629,10 +629,6 @@ _starpu_init_mic_config (struct _starpu_machine_config *config,
 
 
 	/* _starpu_initialize_workers_mic_deviceid (config); */
 	/* _starpu_initialize_workers_mic_deviceid (config); */
 
 
-	mic_nodes[mic_idx].min_nworkers = topology->nworkers;
-	_starpu_mp_common_send_command(mic_nodes[mic_idx], STARPU_MIN_NWORKERS, &(mic_nodes[mic_idx].min_nworkers), sizeof(int));
-	_STARPU_DEBUG("Host: the min_nworkers of the Mic is: %d.\n", mic_nodes[mic_idx].min_nworkers );	
-
 	unsigned miccore_id;
 	unsigned miccore_id;
 	for (miccore_id = 0; miccore_id < topology->nmiccores[mic_idx]; miccore_id++)
 	for (miccore_id = 0; miccore_id < topology->nmiccores[mic_idx]; miccore_id++)
 	{
 	{

+ 2 - 2
src/drivers/mic/driver_mic_common.c

@@ -33,7 +33,7 @@ void _starpu_mic_common_report_scif_error(const char *func, const char *file, co
 
 
 void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len)
 void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len)
 {
 {
-	if ((scif_send(node->mp_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)
+  if ((scif_send(node->mp_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)
 		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
 		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
 }
 }
 
 
@@ -49,7 +49,7 @@ void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int
 
 
 /* Handles the error so the caller (which must be generic) doesn't have to
 /* Handles the error so the caller (which must be generic) doesn't have to
  * care about it.
  * care about it.
- */
+x */
 void _starpu_mic_common_dt_send(const struct _starpu_mp_node *mp_node, void *msg, int len)
 void _starpu_mic_common_dt_send(const struct _starpu_mp_node *mp_node, void *msg, int len)
 {
 {
 	if ((scif_send(mp_node->host_sink_dt_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)
 	if ((scif_send(mp_node->host_sink_dt_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)

+ 4 - 0
src/drivers/mic/driver_mic_sink.c

@@ -22,10 +22,14 @@
 #include <starpu.h>
 #include <starpu.h>
 #include <drivers/mp_common/mp_common.h>
 #include <drivers/mp_common/mp_common.h>
 #include <drivers/mp_common/sink_common.h>
 #include <drivers/mp_common/sink_common.h>
+#include <datawizard/interfaces/data_interface.h>
 
 
 #include "driver_mic_common.h"
 #include "driver_mic_common.h"
 #include "driver_mic_sink.h"
 #include "driver_mic_sink.h"
 
 
+
+
+
 /* Initialize the MIC sink, initializing connection to the source
 /* Initialize the MIC sink, initializing connection to the source
  * and to the other devices (not implemented yet).
  * and to the other devices (not implemented yet).
  */
  */

+ 1 - 0
src/drivers/mic/driver_mic_sink.h

@@ -42,6 +42,7 @@ unsigned int _starpu_mic_sink_get_nb_core(void);
 void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
 void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
 
 
+
 #endif /* STARPU_USE_MIC */
 #endif /* STARPU_USE_MIC */
 
 
 
 

+ 2 - 0
src/drivers/mp_common/mp_common.c

@@ -42,6 +42,8 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 
 
 	node->peer_id = peer_id;
 	node->peer_id = peer_id;
 
 
+	task_fifo_init(&(node->dead_queue));
+
 	switch(node->kind)
 	switch(node->kind)
 	{
 	{
 #ifdef STARPU_USE_MIC
 #ifdef STARPU_USE_MIC

+ 3 - 3
src/drivers/mp_common/mp_common.h

@@ -22,6 +22,7 @@
 #include <starpu.h>
 #include <starpu.h>
 #include <common/config.h>
 #include <common/config.h>
 
 
+#include "task_fifo.h"
 
 
 #ifdef STARPU_USE_MP
 #ifdef STARPU_USE_MP
 
 
@@ -59,7 +60,6 @@ enum _starpu_mp_command
 	STARPU_ANSWER_SINK_NBCORES = 0x16,
 	STARPU_ANSWER_SINK_NBCORES = 0x16,
 	STARPU_EXECUTION_SUBMITTED = 0x17,
 	STARPU_EXECUTION_SUBMITTED = 0x17,
 	STARPU_EXECUTION_COMPLETED = 0x18,
 	STARPU_EXECUTION_COMPLETED = 0x18,
-	STARPU_MIN_NWORKERS = 0x19
 };
 };
 
 
 enum _starpu_mp_node_kind
 enum _starpu_mp_node_kind
@@ -117,8 +117,8 @@ struct _starpu_mp_node
 	 * This is the devid both for the sink and the host. */
 	 * This is the devid both for the sink and the host. */
 	int devid;
 	int devid;
 
 
-        /*The id of the lowest worker for the mic*/
-	int min_nworkers;
+        /*dead queue*/
+        struct task_fifo dead_queue;
 
 
 	/* Only MIC use this for now !!
 	/* Only MIC use this for now !!
 	*  Is the number ok MIC on the system. */
 	*  Is the number ok MIC on the system. */

+ 155 - 73
src/drivers/mp_common/sink_common.c

@@ -29,6 +29,9 @@
 
 
 #include "sink_common.h"
 #include "sink_common.h"
 
 
+#define HYPER_THREAD_NUMBER 4
+#include "task_fifo.h"
+
 /* Return the sink kind of the running process, based on the value of the
 /* Return the sink kind of the running process, based on the value of the
  * STARPU_SINK environment variable.
  * STARPU_SINK environment variable.
  * If there is no valid value retrieved, return STARPU_INVALID_KIND
  * If there is no valid value retrieved, return STARPU_INVALID_KIND
@@ -67,65 +70,6 @@ _starpu_sink_nbcores (const struct _starpu_mp_node *node)
 }
 }
 
 
 
 
-/* Receive paquet from _starpu_src_common_execute_kernel in the form below :
- * [Function pointer on sink, number of interfaces, interfaces
- * (union _starpu_interface), cl_arg]
- * Then call the function given, passing as argument an array containing the
- * addresses of the received interfaces
- */
-void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
-					void *arg, int arg_size)
-{
-	unsigned id = 0;
-
-	void *arg_ptr = arg;
-	void (*kernel)(void **, void *) = NULL;
-	unsigned coreid = 0;
-	unsigned nb_interfaces = 0;
-	void *interfaces[STARPU_NMAXBUFS];
-	void *cl_arg;
-
-	kernel = *(void(**)(void **, void *)) arg_ptr;
-	arg_ptr += sizeof(kernel);
-
-	coreid = *(unsigned *) arg_ptr;
-	arg_ptr += sizeof(coreid);
-
-	nb_interfaces = *(unsigned *) arg_ptr;
-	arg_ptr += sizeof(nb_interfaces);
-
-	/* The function needs an array pointing to each interface it needs
-	 * during execution. As in sink-side there is no mean to know which
-	 * kind of interface to expect, the array is composed of unions of
-	 * interfaces, thus we expect the same size anyway */
-	for (id = 0; id < nb_interfaces; id++)
-	{
-		interfaces[id] = arg_ptr;
-		arg_ptr += sizeof(union _starpu_interface);
-	}
-
-	/* Was cl_arg sent ? */
-	if (arg_size > arg_ptr - arg)
-		cl_arg = arg_ptr;
-	else
-		cl_arg = NULL;
-
-	//_STARPU_DEBUG("telling host that we have submitted the task %p.\n", kernel);
-	/* XXX: in the future, we will not have to directly execute the kernel
-	 * but submit it to the correct local worker. */
-	_starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
-				       NULL, 0);
-
-	//_STARPU_DEBUG("executing the task %p\n", kernel);
-	/* XXX: we keep the synchronous execution model on the sink side for
-	 * now. */
-	kernel(interfaces, cl_arg);
-
-	//_STARPU_DEBUG("telling host that we have finished the task %p.\n", kernel);
-	_starpu_mp_common_send_command(node, STARPU_EXECUTION_COMPLETED,
-				       &coreid, sizeof(coreid));
-}
-
 
 
 static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
 static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
 				       char *func_name)
 				       char *func_name)
@@ -213,16 +157,6 @@ static void _starpu_sink_common_copy_to_sink(const struct _starpu_mp_node *mp_no
     mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size);
     mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size);
 }
 }
 
 
-static void _starpu_sink_min_nworkers(const struct _starpu_mp_node *mp_node, void * arg, 
-				      int arg_size)
-{
-	STARPU_ASSERT(arg_size == sizeof(int));
-
-	mp_node->min_nworkers = *((int*)arg);
-
-	_STARPU_DEBUG("Mic: my min_nworkers is: %d.\n", mp_node->min_nworkers);	
-}
-
 /* Function looping on the sink, waiting for tasks to execute.
 /* Function looping on the sink, waiting for tasks to execute.
  * If the caller is the host, don't do anything.
  * If the caller is the host, don't do anything.
  */
  */
@@ -282,13 +216,20 @@ void _starpu_sink_common_worker(void)
 				_starpu_sink_common_copy_to_sink(node, arg, arg_size);
 				_starpu_sink_common_copy_to_sink(node, arg, arg_size);
 				break;
 				break;
 
 
-		        case STARPU_MIN_NWORKERS:
-				_starpu_sink_min_nworkers(node, arg, arg_size);
-				break;
-
 			default:
 			default:
 				printf("Oops, command %x unrecognized\n", command);
 				printf("Oops, command %x unrecognized\n", command);
 		}
 		}
+
+		if(!task_fifo_is_empty(&(node->dead_queue)))
+		  {
+		    struct task * task = node->dead_queue.first;
+		    _STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
+		    _starpu_mp_common_send_command(task->node, STARPU_EXECUTION_COMPLETED,
+								    &(task->coreid), sizeof(task->coreid));
+		    _STARPU_DEBUG("we have finished the task %p sur %d.\n", task->kernel, task->coreid);
+		    task_fifo_pop(&(node->dead_queue));
+		    free(task);
+		  }
 	}
 	}
 
 
 	/* Deinitialize the node and release it */
 	/* Deinitialize the node and release it */
@@ -296,3 +237,144 @@ void _starpu_sink_common_worker(void)
 
 
 	exit(0);
 	exit(0);
 }
 }
+
+
+
+static void* _starpu_mic_sink_thread(void * thread_arg)
+{
+  struct task *arg = (struct task *)thread_arg;
+  _STARPU_DEBUG("thread launch: %d.\n", arg->coreid);
+  arg->kernel(arg->interfaces,arg->cl_arg);
+
+  task_fifo_append(&(arg->node->dead_queue),arg);
+
+  pthread_exit(NULL);
+}
+
+static void _starpu_mic_sink_execute_thread(struct task *arg)
+{
+  int j;
+  pthread_t thread;
+  cpu_set_t cpuset;
+  int ret;
+  
+  ret = pthread_create(&thread, NULL, _starpu_mic_sink_thread, arg);
+  STARPU_ASSERT(ret == 0);
+  
+  CPU_ZERO(&cpuset);
+  for(j=0;j<HYPER_THREAD_NUMBER;j++)
+    CPU_SET(j+arg->coreid*HYPER_THREAD_NUMBER,&cpuset);
+
+  _STARPU_DEBUG("coreid: %d.\n", arg->coreid);
+  ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
+  if(ret != 0)
+    { 
+      if(ret== EFAULT)
+	printf("\n\n EFAULT \n\n");
+      if(ret == EINVAL)
+	printf("\n\n EINVAL \n\n");
+      if(ret == ESRCH)
+	printf("\n\n ESRCH \n\n");
+    }
+  STARPU_ASSERT(ret == 0);
+}
+
+
+/* Receive paquet from _starpu_src_common_execute_kernel in the form below :
+ * [Function pointer on sink, number of interfaces, interfaces
+ * (union _starpu_interface), cl_arg]
+ * Then call the function given, passing as argument an array containing the
+ * addresses of the received interfaces
+ */
+
+void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
+					void *arg, int arg_size)
+{
+	unsigned id = 0;
+	unsigned nb_interfaces;
+
+	void *arg_ptr = arg;
+	struct task *thread_arg = malloc(sizeof(struct task));
+	
+	thread_arg->node = node;
+
+	thread_arg->kernel = *(void(**)(void **, void *)) arg_ptr;
+	arg_ptr += sizeof(thread_arg->kernel);
+
+	thread_arg->coreid = *(unsigned *) arg_ptr;
+	arg_ptr += sizeof(thread_arg->coreid);
+
+	nb_interfaces = *(unsigned *) arg_ptr;
+	arg_ptr += sizeof(nb_interfaces);
+
+	/* The function needs an array pointing to each interface it needs
+	 * during execution. As in sink-side there is no mean to know which
+	 * kind of interface to expect, the array is composed of unions of
+	 * interfaces, thus we expect the same size anyway */
+	for (id = 0; id < nb_interfaces; id++)
+	{
+		thread_arg->interfaces[id] = arg_ptr;
+		arg_ptr += sizeof(union _starpu_interface);
+	}
+
+	/* Was cl_arg sent ? */
+	if (arg_size > arg_ptr - arg)
+		thread_arg->cl_arg = arg_ptr;
+	else
+		thread_arg->cl_arg = NULL;
+
+	_STARPU_DEBUG("telling host that we have submitted the task %p.\n", thread_arg->kernel);
+	_starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
+				       NULL, 0);
+
+	//_STARPU_DEBUG("executing the task %p\n", kernel);
+	_starpu_mic_sink_execute_thread(thread_arg);
+	
+}
+
+
+
+/*
+void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
+					void *arg, int arg_size)
+{
+	unsigned id = 0;
+
+	void *arg_ptr = arg;
+	void (*kernel)(void **, void *) = NULL;
+	unsigned coreid = 0;
+	unsigned nb_interfaces = 0;
+	void *interfaces[STARPU_NMAXBUFS];
+	void *cl_arg;
+
+	kernel = *(void(**)(void **, void *)) arg_ptr;
+	arg_ptr += sizeof(kernel);
+
+	coreid = *(unsigned *) arg_ptr;
+	arg_ptr += sizeof(coreid);
+
+	nb_interfaces = *(unsigned *) arg_ptr;
+	arg_ptr += sizeof(nb_interfaces);
+
+	for (id = 0; id < nb_interfaces; id++)
+	{
+		interfaces[id] = arg_ptr;
+		arg_ptr += sizeof(union _starpu_interface);
+	}
+
+
+	if (arg_size > arg_ptr - arg)
+		cl_arg = arg_ptr;
+	else
+		cl_arg = NULL;
+
+	_starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
+				       NULL, 0);
+
+
+	kernel(interfaces, cl_arg);
+
+	_starpu_mp_common_send_command(node, STARPU_EXECUTION_COMPLETED,
+				       &coreid, sizeof(coreid));
+}*/
+

+ 1 - 0
src/drivers/mp_common/sink_common.h

@@ -40,6 +40,7 @@ void _starpu_sink_nbcores (const struct _starpu_mp_node *node);
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
 void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
 
 
+
 #endif /* STARPU_USE_MP */
 #endif /* STARPU_USE_MP */