浏览代码

mic: Correction nbmaxcombinedworkers + update _starpu_mic_sink_deinit

Thibaud Lambert 11 年之前
父节点
当前提交
b17db603a8

+ 9 - 5
configure.ac

@@ -342,11 +342,6 @@ if test x$enable_cpu = xyes; then
 	AC_DEFINE(STARPU_USE_CPU, [1], [CPU driver is activated])
 fi
 
-# How many parallel worker can we support ?
-nmaxcombinedworkers=`expr 2 \* $maxcpus`
-AC_DEFINE_UNQUOTED(STARPU_NMAX_COMBINEDWORKERS,
-	[$nmaxcombinedworkers], [Maximum number of worker combinations])
-
 ###############################################################################
 #                                                                             #
 #                                 CUDA settings                               #
@@ -1476,6 +1471,15 @@ AC_MSG_CHECKING(Maximum number of workers)
 AC_MSG_RESULT($nmaxworkers)
 AC_DEFINE_UNQUOTED(STARPU_NMAXWORKERS, [$nmaxworkers], [Maximum number of workers])
 
+# Computes the maximun number of combined worker
+nmaxcombinedworkers=`expr $maxcpus + $nmaxmicthreads`  
+AC_MSG_CHECKING(Maximum number of workers combinations)
+AC_MSG_RESULT($nmaxcombinedworkers)
+AC_DEFINE_UNQUOTED(STARPU_NMAX_COMBINEDWORKERS,
+	[$nmaxcombinedworkers], [Maximum number of worker combinations])
+
+
+
 # Computes the maximum number of implementations per arch
 AC_MSG_CHECKING(maximum number of implementations)
 AC_ARG_ENABLE(maximplementations, [AS_HELP_STRING([--enable-maximplementations=<number>],

+ 0 - 1
src/core/sched_policy.c

@@ -611,7 +611,6 @@ pick:
 				sched_ctx = _starpu_get_initial_sched_ctx();
 			else
 				sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
-
 			if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
 				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)

+ 1 - 1
src/core/workers.c

@@ -1422,7 +1422,7 @@ unsigned starpu_worker_is_combined_worker(int id)
 
 struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
 {
-	if (id == STARPU_NMAX_SCHED_CTXS) return NULL;
+	if(id == STARPU_NMAX_SCHED_CTXS) return NULL;
 	return &config.sched_ctxs[id];
 }
 

+ 22 - 2
src/drivers/mic/driver_mic_sink.c

@@ -54,11 +54,11 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 	_starpu_mic_common_accept(&node->host_sink_dt_connection.mic_endpoint,
 									 STARPU_MIC_SOURCE_DT_PORT_NUMBER);
 	
+	node->is_running = 1;
+
 	//init the set
 	CPU_ZERO(&cpuset);
 
-//	node->nb_cores = COISysGetCoreCount();
-
 	node->nb_cores = COISysGetHardwareThreadCount() - COISysGetHardwareThreadCount() / COISysGetCoreCount();
 	node->thread_table = malloc(sizeof(pthread_t)*node->nb_cores);
 
@@ -117,6 +117,26 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 
 void _starpu_mic_sink_deinit(struct _starpu_mp_node *node)
 {
+	
+	int i;
+	node->is_running = 0;
+	for(i=0; i<node->nb_cores; i++)
+	{
+		sem_post(&node->sem_run_table[i]);
+		pthread_join(((pthread_t *)node->thread_table)[i],NULL);
+		sem_destroy(&node->sem_run_table[i]);
+	}
+
+	free(node->thread_table);
+	free(node->run_table);
+	free(node->sem_run_table);
+
+	mp_barrier_list_delete(node->barrier_list);
+	mp_message_list_delete(node->message_queue);
+
+	pthread_mutex_destroy(&node->message_queue_mutex);
+	pthread_mutex_destroy(&node->barrier_mutex);
+	
 	//unsigned int i;
 
 	//for (i = 0; i < node->nb_mp_sinks; ++i)

+ 3 - 0
src/drivers/mp_common/mp_common.h

@@ -134,6 +134,9 @@ struct _starpu_mp_node
 	 * Must be initialized during init function*/
 	int nb_cores;
 
+	/*Is starpu running*/
+	int is_running;
+
 	/* Buffer used for scif data transfers, allocated
 	 * during node initialization.
 	 * Size : BUFFER_SIZE */

+ 66 - 63
src/drivers/mp_common/sink_common.c

@@ -258,82 +258,85 @@ void* _starpu_sink_thread(void * thread_arg)
 	/* free the structure */
 	free(thread_arg);
 
-
-	while(1)
+	while(node->is_running)
 	{
 		/*Wait there is a task available */
 		sem_wait(sem);
 
-		/* If it's a parallel task */
-		if((*task)->is_parallel_task)
+		if((*task) != NULL)
 		{
-			/* Synchronize with others threads of the combined worker*/
-			STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->before_work_barrier);
 
-			/* The first thread of the combined worker 
-			 * tell the sink that the execution has begun
-			 */
-			if((*task)->coreid == (*task)->combined_worker[0])
+			/* If it's a parallel task */
+			if((*task)->is_parallel_task)
 			{
-				/* Init message to tell the sink that the execution has begun */
-				struct mp_message * message = mp_message_new();
-				message->type = STARPU_PRE_EXECUTION;
-				*(int *) message->buffer = (*task)->combined_workerid;
-				message->size = sizeof((*task)->combined_workerid);
-
-				/* Append the message to the queue */	
-				pthread_mutex_lock(&node->message_queue_mutex);
-				mp_message_list_push_front(node->message_queue,message);
-				pthread_mutex_unlock(&node->message_queue_mutex);
-
-				/* If the mode is FORKJOIN, 
-				 * the first thread binds himself on all core of the combined worker 
+				/* Synchronize with others threads of the combined worker*/
+				STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->before_work_barrier);
+
+				/* The first thread of the combined worker 
+				 * tell the sink that the execution has begun
 				 */
-				if((*task)->type == STARPU_FORKJOIN)
-					node->bind_thread(node, coreid, (*task)->combined_worker, (*task)->combined_worker_size);
+				if((*task)->coreid == (*task)->combined_worker[0])
+				{
+					/* Init message to tell the sink that the execution has begun */
+					struct mp_message * message = mp_message_new();
+					message->type = STARPU_PRE_EXECUTION;
+					*(int *) message->buffer = (*task)->combined_workerid;
+					message->size = sizeof((*task)->combined_workerid);
+
+					/* Append the message to the queue */	
+					pthread_mutex_lock(&node->message_queue_mutex);
+					mp_message_list_push_front(node->message_queue,message);
+					pthread_mutex_unlock(&node->message_queue_mutex);
+
+					/* If the mode is FORKJOIN, 
+					 * the first thread binds himself on all core of the combined worker 
+					 */
+					if((*task)->type == STARPU_FORKJOIN)
+						node->bind_thread(node, coreid, (*task)->combined_worker, (*task)->combined_worker_size);
+				}
 			}
-		}
-		if((*task)->type != STARPU_FORKJOIN || (*task)->coreid == (*task)->combined_worker[0])
-		{
-			/* execute the task */
-			(*task)->kernel((*task)->interfaces,(*task)->cl_arg);
-		}
-
-		/* If it's a parallel task */
-		if((*task)->is_parallel_task)
-		{
-			/* Synchronize with others threads of the combined worker*/
-			STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->after_work_barrier);
-
-			/* The fisrt thread of the combined */
-			if((*task)->coreid == (*task)->combined_worker[0])
+			if((*task)->type != STARPU_FORKJOIN || (*task)->coreid == (*task)->combined_worker[0])
 			{
-				/* Erase the barrier from the list */
-				pthread_mutex_lock(&node->barrier_mutex);
-				mp_barrier_list_erase(node->barrier_list,(*task)->mp_barrier);
-				pthread_mutex_unlock(&node->barrier_mutex);
-			
-				/* If the mode is FORKJOIN, 
-				 * the first thread rebinds himself on his own core 
-				 */
-				if((*task)->type == STARPU_FORKJOIN)
-					node->bind_thread(node, coreid, &coreid, 1);
+				/* execute the task */
+				(*task)->kernel((*task)->interfaces,(*task)->cl_arg);
+			}
 
+			/* If it's a parallel task */
+			if((*task)->is_parallel_task)
+			{
+				/* Synchronize with others threads of the combined worker*/
+				STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->after_work_barrier);
+
+				/* The fisrt thread of the combined */
+				if((*task)->coreid == (*task)->combined_worker[0])
+				{
+					/* Erase the barrier from the list */
+					pthread_mutex_lock(&node->barrier_mutex);
+					mp_barrier_list_erase(node->barrier_list,(*task)->mp_barrier);
+					pthread_mutex_unlock(&node->barrier_mutex);
+
+					/* If the mode is FORKJOIN, 
+					 * the first thread rebinds himself on his own core 
+					 */
+					if((*task)->type == STARPU_FORKJOIN)
+						node->bind_thread(node, coreid, &coreid, 1);
+
+				}
 			}
+			/* Init message to tell the sink that the execution is completed */
+			struct mp_message * message = mp_message_new();
+			message->type = STARPU_EXECUTION_COMPLETED;
+			message->size = sizeof((*task)->coreid);
+			*(int*) message->buffer = (*task)->coreid;
+
+			free(*task);
+			(*task) = NULL;
+
+			/* Append the message to the queue */
+			pthread_mutex_lock(&node->message_queue_mutex);
+			mp_message_list_push_front(node->message_queue,message);
+			pthread_mutex_unlock(&node->message_queue_mutex);
 		}
-		/* Init message to tell the sink that the execution is completed */
-		struct mp_message * message = mp_message_new();
-		message->type = STARPU_EXECUTION_COMPLETED;
-		message->size = sizeof((*task)->coreid);
-		*(int*) message->buffer = (*task)->coreid;
-
-		free(*task);
-		(*task) = NULL;
-
-		/* Append the message to the queue */
-		pthread_mutex_lock(&node->message_queue_mutex);
-		mp_message_list_push_front(node->message_queue,message);
-		pthread_mutex_unlock(&node->message_queue_mutex);
 
 	}
 	pthread_exit(NULL);

+ 0 - 4
src/sched_policies/parallel_eager.c

@@ -203,8 +203,6 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		return task;
 	}
 
-	_STARPU_DEBUG("workerid:%d\n", workerid);
-
 	int master = data->master_id[workerid];
 
 	//_STARPU_DEBUG("workerid:%d, master:%d\n",workerid,master);
@@ -238,8 +236,6 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 			}
 		}
 
-		_STARPU_DEBUG("## best_workerid:%d, best_size:%d\n",best_workerid,best_size);
-
 		/* In case nobody can execute this task, we let the master
 		 * worker take it anyway, so that it can discard it afterward.
 		 * */