Prechádzať zdrojové kódy

mic: update sink_common.c (size, id, rank for workers)

Thibaud Lambert 12 rokov pred
rodič
commit
3cd4163add
1 zmenil súbory, kde vykonal 93 pridanie a 70 odobranie
  1. 93 70
      src/drivers/mp_common/sink_common.c

+ 93 - 70
src/drivers/mp_common/sink_common.c

@@ -423,6 +423,94 @@ static void _starpu_sink_common_bind_to_combined_worker(struct _starpu_mp_node *
 }
 
 
+
+/* Get the current rank of the worker in the combined worker 
+ */
+static int _starpu_sink_common_get_current_rank(int workerid, struct _starpu_combined_worker * combined_worker)
+{
+	int i;
+	for(i=0; i<combined_worker->worker_size; i++)
+		if(workerid == combined_worker->combined_workerid[i])
+			return i;
+
+	STARPU_ASSERT(0);
+}
+
+/* Execute the task 
+ */
+static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int coreid, struct mp_task *task, struct _starpu_worker * worker)
+{
+	struct _starpu_combined_worker * combined_worker = NULL;
+	/* If it's a parallel task */
+	if(task->is_parallel_task)
+	{
+		combined_worker = _starpu_get_combined_worker_struct(task->combined_workerid);
+		
+		worker->current_rank = _starpu_sink_common_get_current_rank(worker->workerid, combined_worker);
+		worker->combined_workerid = task->combined_workerid;
+		worker->worker_size = combined_worker->worker_size;
+		
+		/* Synchronize with others threads of the combined worker*/
+		STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->before_work_barrier);
+
+
+		/* The first thread of the combined worker */
+		if(worker->current_rank == 0)
+		{
+			/* tell the sink that the execution has begun */
+			_starpu_sink_common_pre_execution_message(node,task);
+
+			/* If the mode is FORKJOIN, 
+			 * the first thread binds himself 
+			 * on all core of the combined worker*/
+			if(task->type == STARPU_FORKJOIN)
+			{
+				_starpu_sink_common_bind_to_combined_worker(node, coreid, combined_worker);
+			}
+		}
+	}
+	else
+	{
+		worker->current_rank = 0;
+		worker->combined_workerid = 0;
+		worker->worker_size = 1;
+	}
+	if(task->type != STARPU_FORKJOIN || worker->current_rank == 0)
+	{
+		/* execute the task */
+		task->kernel(task->interfaces,task->cl_arg);
+	}
+
+	/* If it's a parallel task */
+	if(task->is_parallel_task)
+	{
+		/* Synchronize with others threads of the combined worker*/
+		STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->after_work_barrier);
+
+		/* The fisrt thread of the combined */
+		if(worker->current_rank == 0)
+		{
+			/* Erase the barrier from the list */
+			_starpu_sink_common_erase_barrier(node,task->mp_barrier);
+
+			/* If the mode is FORKJOIN, 
+			 * the first thread rebinds himself on his own core */
+			if(task->type == STARPU_FORKJOIN)
+				node->bind_thread(node, coreid, &coreid, 1);
+
+		}
+	}
+
+	node->run_table[coreid] = NULL;
+
+	/* tell the sink that the execution is completed */
+	_starpu_sink_common_execution_completed_message(node,task);
+
+	free(task);
+
+}
+
+
 /* The main function executed by the thread 
  * thread_arg is a structure containing the information needed by the thread
  */
@@ -436,81 +524,16 @@ void* _starpu_sink_thread(void * thread_arg)
 	free(thread_arg);
 
 	STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);	
-	
-	struct _starpu_machine_config *config = _starpu_get_machine_config();
-	struct _starpu_worker * worker = &config->workers[node->baseworkerid + coreid];
+
+	struct _starpu_worker *worker = &_starpu_get_machine_config()->workers[node->baseworkerid + coreid];
 
 	_starpu_set_local_worker_key(worker);
-	struct _starpu_combined_worker * combined_worker; 
 	while(node->is_running)
 	{
 		/*Wait there is a task available */
 		sem_wait(sem);
-		struct mp_task *task = node->run_table[coreid];
-		combined_worker = NULL;
-		if(task != NULL)
-		{
-
-			/* If it's a parallel task */
-			if(task->is_parallel_task)
-			{
-				combined_worker = _starpu_get_combined_worker_struct(task->combined_workerid);
-				/* Synchronize with others threads of the combined worker*/
-				STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->before_work_barrier);
-
-
-				/* The first thread of the combined worker */
-				if(worker->workerid == combined_worker->combined_workerid[0])
-				{
-					/* tell the sink that the execution has begun */
-					_starpu_sink_common_pre_execution_message(node,task);
-
-					/* If the mode is FORKJOIN, 
-					 * the first thread binds himself 
-					 * on all core of the combined worker*/
-					if(task->type == STARPU_FORKJOIN)
-					{
-						_starpu_sink_common_bind_to_combined_worker(node, coreid, combined_worker);
-					}
-				}
-			}
-			if(task->type != STARPU_FORKJOIN || worker->workerid == combined_worker->combined_workerid[0])
-			{
-				if(combined_worker != NULL)
-					worker->worker_size = combined_worker->worker_size;
-				else 
-					worker->worker_size = 1;
-				/* execute the task */
-				task->kernel(task->interfaces,task->cl_arg);
-			}
-
-			/* If it's a parallel task */
-			if(task->is_parallel_task)
-			{
-				/* Synchronize with others threads of the combined worker*/
-				STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->after_work_barrier);
-
-				/* The fisrt thread of the combined */
-				if(worker->workerid == combined_worker->combined_workerid[0])
-				{
-					/* Erase the barrier from the list */
-					_starpu_sink_common_erase_barrier(node,task->mp_barrier);
-
-					/* If the mode is FORKJOIN, 
-					 * the first thread rebinds himself on his own core */
-					if(task->type == STARPU_FORKJOIN)
-						node->bind_thread(node, coreid, &coreid, 1);
-
-				}
-			}
-			
-			node->run_table[coreid] = NULL;
-
-			/* tell the sink that the execution is completed */
-			_starpu_sink_common_execution_completed_message(node,task);
-
-			free(task);
-		}
+		if(node->run_table[coreid] != NULL)
+			_starpu_sink_common_execute_kernel(node,coreid,node->run_table[coreid],worker);
 
 	}
 	pthread_exit(NULL);
@@ -518,7 +541,7 @@ void* _starpu_sink_thread(void * thread_arg)
 
 
 /* Add the task to the specific thread and wake him up
- */
+*/
 static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
 {
 	/* Add the task to the specific thread */