Browse Source

src/drivers/driver_common/driver_common.c: when dealing with parallel contexts, also generate trace and stats for non-master workers

Nathalie Furmento 11 years ago
parent
commit
892665a9b3
1 changed files with 47 additions and 6 deletions
  1. 47 6
      src/drivers/driver_common/driver_common.c

+ 47 - 6
src/drivers/driver_common/driver_common.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2010-2014  Université de Bordeaux 1
  * Copyright (C) 2010-2014  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  * Copyright (C) 2011  Télécom-SudParis
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -74,6 +74,26 @@ void _starpu_driver_start_job(struct _starpu_worker *args, struct _starpu_job *j
 		_starpu_top_task_started(task,workerid,codelet_start);
 		_starpu_top_task_started(task,workerid,codelet_start);
 
 
 	_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
 	_STARPU_TRACE_START_CODELET_BODY(j, j->nimpl, perf_arch, workerid);
+
+	// Find out if the worker is the master of a parallel context
+	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(args, j);
+	if (sched_ctx)
+	{
+		struct starpu_worker_collection *workers = sched_ctx->workers;
+		struct starpu_sched_ctx_iterator it;
+
+		if (workers->init_iterator)
+			workers->init_iterator(workers, &it);
+		while (workers->has_next(workers, &it))
+		{
+			int _workerid = workers->get_next(workers, &it);
+			if (_workerid != workerid)
+			{
+				struct _starpu_worker *worker = _starpu_get_worker_struct(_workerid);
+				_starpu_driver_start_job(worker, j, &worker->perf_arch, codelet_start, rank, profiling);
+			}
+		}
+	}
 }
 }
 
 
 void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch STARPU_ATTRIBUTE_UNUSED, struct timespec *codelet_end, int rank, int profiling)
 void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch STARPU_ATTRIBUTE_UNUSED, struct timespec *codelet_end, int rank, int profiling)
@@ -103,7 +123,28 @@ void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j,
 		_starpu_top_task_ended(task,workerid,codelet_end);
 		_starpu_top_task_ended(task,workerid,codelet_end);
 
 
 	args->status = STATUS_UNKNOWN;
 	args->status = STATUS_UNKNOWN;
+
+	// Find out if the worker is the master of a parallel context
+	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(args, j);
+	if (sched_ctx)
+	{
+		struct starpu_worker_collection *workers = sched_ctx->workers;
+		struct starpu_sched_ctx_iterator it;
+
+		if (workers->init_iterator)
+			workers->init_iterator(workers, &it);
+		while (workers->has_next(workers, &it))
+		{
+			int _workerid = workers->get_next(workers, &it);
+			if (_workerid != workerid)
+			{
+				struct _starpu_worker *worker = _starpu_get_worker_struct(_workerid);
+				_starpu_driver_end_job(worker, j, &worker->perf_arch, codelet_end, rank, profiling);
+			}
+		}
+	}
 }
 }
+
 void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_worker *worker_args,
 void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_worker *worker_args,
 					struct starpu_perfmodel_arch* perf_arch,
 					struct starpu_perfmodel_arch* perf_arch,
 					struct timespec *codelet_start, struct timespec *codelet_end, int profiling)
 					struct timespec *codelet_start, struct timespec *codelet_end, int profiling)
@@ -201,10 +242,10 @@ static void _starpu_worker_set_status_wakeup(int workerid)
 static void _starpu_exponential_backoff(struct _starpu_worker *args)
 static void _starpu_exponential_backoff(struct _starpu_worker *args)
 {
 {
 	int delay = args->spinning_backoff;
 	int delay = args->spinning_backoff;
-	
+
 	if (args->spinning_backoff < BACKOFF_MAX)
 	if (args->spinning_backoff < BACKOFF_MAX)
-		args->spinning_backoff<<=1; 
-	
+		args->spinning_backoff<<=1;
+
 	while(delay--)
 	while(delay--)
 		STARPU_UYIELD();
 		STARPU_UYIELD();
 }
 }
@@ -285,7 +326,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 		}
 		}
 		else
 		else
 		{
 		{
-			STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);			
+			STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
 			if (_starpu_machine_is_running())
 			if (_starpu_machine_is_running())
 			{
 			{
 				_starpu_exponential_backoff(args);
 				_starpu_exponential_backoff(args);
@@ -362,7 +403,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 					STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 					STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 					workers[i].current_rank = j->active_task_alias_count++;
 					workers[i].current_rank = j->active_task_alias_count++;
 					STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 					STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
-					
+
 					combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
 					combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
 					workers[i].combined_workerid = j->combined_workerid;
 					workers[i].combined_workerid = j->combined_workerid;
 					workers[i].worker_size = combined_worker->worker_size;
 					workers[i].worker_size = combined_worker->worker_size;