Explorar o código

Avoid emitting progress probes repeatedly, which allows re-enabling them in the trace

Samuel Thibault hai 8 anos
pai
achega
bfcac01ad1

+ 2 - 4
src/common/fxt.h

@@ -728,12 +728,10 @@ do {										\
 /* We skip these events because they are called so often that they cause FxT to
  * fail and make the overall trace unreadable anyway. */
 #define _STARPU_TRACE_START_PROGRESS(memnode)		\
-	do {} while (0)
-//	FUT_DO_PROBE2(_STARPU_FUT_START_PROGRESS, memnode, _starpu_gettid());
+	FUT_DO_PROBE2(_STARPU_FUT_START_PROGRESS_ON_TID, memnode, _starpu_gettid());
 
 #define _STARPU_TRACE_END_PROGRESS(memnode)		\
-	do {} while (0)
-	//FUT_DO_PROBE2(_STARPU_FUT_END_PROGRESS, memnode, _starpu_gettid());
+	FUT_DO_PROBE2(_STARPU_FUT_END_PROGRESS_ON_TID, memnode, _starpu_gettid());
 	
 #define _STARPU_TRACE_USER_EVENT(code)			\
 	FUT_DO_PROBE2(_STARPU_FUT_USER_EVENT, code, _starpu_gettid());

+ 14 - 5
src/drivers/cpu/driver_cpu.c

@@ -326,18 +326,20 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 	pending_task = cpu_worker->task_transferring;
 	if (pending_task != NULL && cpu_worker->nb_buffers_transferred == cpu_worker->nb_buffers_totransfer)
 	{
+		int ret;
+		_STARPU_TRACE_END_PROGRESS(memnode);
 		j = _starpu_get_job_associated_to_task(pending_task);
 
 		_starpu_release_fetch_task_input_async(j, cpu_worker);
 		/* Reset it */
 		cpu_worker->task_transferring = NULL;
 
-		return _starpu_cpu_driver_execute_task(cpu_worker, pending_task, j);
+		ret = _starpu_cpu_driver_execute_task(cpu_worker, pending_task, j);
+		_STARPU_TRACE_START_PROGRESS(memnode);
+		return ret;
 	}
 
-	_STARPU_TRACE_START_PROGRESS(memnode);
 	res = __starpu_datawizard_progress(1, 1);
-	_STARPU_TRACE_END_PROGRESS(memnode);
 
 	if (!pending_task)
 		task = _starpu_get_worker_task(cpu_worker, workerid, memnode);
@@ -364,6 +366,7 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 		return 0;
 	}
 
+	_STARPU_TRACE_END_PROGRESS(memnode);
 	/* Get the rank in case it is a parallel task */
 	if (j->task_size > 1)
 	{
@@ -382,8 +385,12 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 		res = _starpu_fetch_task_input(task, j, 1);
 		STARPU_ASSERT(res == 0);
 	}
-	else
-		return _starpu_cpu_driver_execute_task(cpu_worker, task, j);
+	else {
+		int ret = _starpu_cpu_driver_execute_task(cpu_worker, task, j);
+		_STARPU_TRACE_END_PROGRESS(memnode);
+		return ret;
+	}
+	_STARPU_TRACE_END_PROGRESS(memnode);
 	return 0;
 }
 
@@ -411,11 +418,13 @@ _starpu_cpu_worker(void *arg)
 	struct _starpu_worker *args = arg;
 
 	_starpu_cpu_driver_init(args);
+	_STARPU_TRACE_END_PROGRESS(args->memory_node);
 	while (_starpu_machine_is_running())
 	{
 		_starpu_may_pause();
 		_starpu_cpu_driver_run_once(args);
 	}
+	_STARPU_TRACE_START_PROGRESS(args->memory_node);
 	_starpu_cpu_driver_deinit(args);
 
 	return NULL;

+ 12 - 8
src/drivers/cuda/driver_cuda.c

@@ -737,7 +737,6 @@ int _starpu_cuda_driver_init(struct _starpu_worker_set *worker_set)
 int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 {
 	struct _starpu_worker *worker0 = &worker_set->workers[0];
-	unsigned memnode = worker0->memory_node;
 	struct starpu_task *tasks[worker_set->nworkers], *task;
 	struct _starpu_job *j;
 	int i, res;
@@ -756,6 +755,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 	{
 		struct _starpu_worker *worker = &worker_set->workers[i];
 		int workerid = worker->workerid;
+		unsigned memnode = worker->memory_node;
 
 		if (!worker->ntasks)
 			idle_tasks++;
@@ -874,7 +874,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 	res |= __starpu_datawizard_progress(1, 1);
 
 	/* And pull tasks */
-	res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
+	res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, worker0->memory_node);
 
 #ifdef STARPU_SIMGRID
 	if (!res)
@@ -887,6 +887,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 	for (i = 0; i < (int) worker_set->nworkers; i++)
 	{
 		struct _starpu_worker *worker = &worker_set->workers[i];
+		unsigned memnode STARPU_ATTRIBUTE_UNUSED = worker->memory_node;
 
 		task = tasks[i];
 		if (!task)
@@ -974,17 +975,20 @@ int _starpu_cuda_driver_deinit(struct _starpu_worker_set *worker_set)
 
 void *_starpu_cuda_worker(void *_arg)
 {
-	struct _starpu_worker_set* worker = _arg;
+	struct _starpu_worker_set* worker_set = _arg;
+	unsigned i;
 
-	_starpu_cuda_driver_init(worker);
-	_STARPU_TRACE_START_PROGRESS(memnode);
+	_starpu_cuda_driver_init(worker_set);
+	for (i = 0; i < worker_set->nworkers; i++)
+		_STARPU_TRACE_START_PROGRESS(worker_set->workers[i].memory_node);
 	while (_starpu_machine_is_running())
 	{
 		_starpu_may_pause();
-		_starpu_cuda_driver_run_once(worker);
+		_starpu_cuda_driver_run_once(worker_set);
 	}
-	_STARPU_TRACE_END_PROGRESS(memnode);
-	_starpu_cuda_driver_deinit(worker);
+	for (i = 0; i < worker_set->nworkers; i++)
+		_STARPU_TRACE_END_PROGRESS(worker_set->workers[i].memory_node);
+	_starpu_cuda_driver_deinit(worker_set);
 
 	return NULL;
 }

+ 18 - 2
src/drivers/mp_common/source_common.c

@@ -173,6 +173,7 @@ static int _starpu_src_common_handle_async(struct _starpu_mp_node *node,
 /* Handle all messages which have been stored in the message_queue */
 static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
 {
+	int stopped_progress = 0;
 	STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
 	/* while the list is not empty */
 	while(!mp_message_list_empty(&node->message_queue))
@@ -180,6 +181,8 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
 		/* We pop a message and handle it */
 		struct mp_message * message = mp_message_list_pop_back(&node->message_queue);
                 /* Release mutex during handle */
+		stopped_progress = 1;
+		_STARPU_TRACE_END_PROGRESS(memnode);
                 STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
 		_starpu_src_common_handle_async(node, message->buffer,
 				message->size, message->type, 1);
@@ -189,6 +192,8 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
                 STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
+	if (stopped_progress)
+		_STARPU_TRACE_START_PROGRESS(memnode);
 }
 
 /* Store a message if is asynchronous
@@ -951,6 +956,7 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 		{
 			struct _starpu_job * j = _starpu_get_job_associated_to_task(task);
 
+			_STARPU_TRACE_END_PROGRESS(memnode);
 			_starpu_set_local_worker_key(&worker_set->workers[i]);
 			_starpu_release_fetch_task_input_async(j, &worker_set->workers[i]);
 
@@ -973,25 +979,29 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 
 			/* Reset it */
 			worker_set->workers[i].task_transferring = NULL;
+			_STARPU_TRACE_START_PROGRESS(memnode);
 		}
 	}
 
-        _STARPU_TRACE_START_PROGRESS(memnode);
         res |= __starpu_datawizard_progress(1, 1);
-        _STARPU_TRACE_END_PROGRESS(memnode);
 
         /* Handle messages which have been stored */
         _starpu_src_common_handle_stored_async(mp_node);
 
         STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
 
+	unsigned stopped_progress = 0;
         /* poll the device for completed jobs.*/
         while(mp_node->mp_recv_is_ready(mp_node))
         {
+		stopped_progress = 1;
+		_STARPU_TRACE_END_PROGRESS(memnode);
                 _starpu_src_common_recv_async(mp_node);
                 /* Mutex is unlocked in _starpu_src_common_recv_async */
                 STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
         }
+	if (stopped_progress)
+		_STARPU_TRACE_START_PROGRESS(memnode);
 
         STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
 
@@ -1011,9 +1021,11 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 		{
 			if(tasks[i] != NULL)
 			{
+				_STARPU_TRACE_END_PROGRESS(memnode);
 				_starpu_set_local_worker_key(&worker_set->workers[i]);
 				int ret = _starpu_fetch_task_input(tasks[i], _starpu_get_job_associated_to_task(tasks[i]), 1);
 				STARPU_ASSERT(!ret);
+				_STARPU_TRACE_START_PROGRESS(memnode);
 			}
 		}
 
@@ -1055,6 +1067,7 @@ void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
                 _starpu_src_common_send_workers(mp_node[device], baseworkerid, worker_set[device].nworkers);
         }
 
+        _STARPU_TRACE_START_PROGRESS(memnode);
         /*main loop*/
         while (_starpu_machine_is_running())
         {
@@ -1065,6 +1078,7 @@ void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
                 }
         }
         free(tasks);
+        _STARPU_TRACE_END_PROGRESS(memnode);
 
         for (device = 0; device < ndevices; device++)
                 _starpu_handle_all_pending_node_data_requests(memnode[device]);
@@ -1090,12 +1104,14 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 
         _starpu_src_common_send_workers(mp_node, baseworkerid, worker_set->nworkers);
 
+        _STARPU_TRACE_START_PROGRESS(memnode);
         /*main loop*/
         while (_starpu_machine_is_running())
         {
                 _starpu_src_common_worker_internal_work(worker_set, mp_node, tasks, memnode);
         }
         free(tasks);
+        _STARPU_TRACE_END_PROGRESS(memnode);
 
         _starpu_handle_all_pending_node_data_requests(memnode);
 

+ 7 - 9
src/drivers/opencl/driver_opencl.c

@@ -687,6 +687,7 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		idle_transfers++;
 	if (task && worker->nb_buffers_transferred == worker->nb_buffers_totransfer)
 	{
+		_STARPU_TRACE_END_PROGRESS(memnode);
 		j = _starpu_get_job_associated_to_task(task);
 
 		_starpu_release_fetch_task_input_async(j, worker);
@@ -702,7 +703,6 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 			return 0;
 		}
 
-		_STARPU_TRACE_END_PROGRESS(memnode);
 		_starpu_opencl_execute_job(task, worker);
 		_STARPU_TRACE_START_PROGRESS(memnode);
 	}
@@ -720,7 +720,7 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		task = worker->current_tasks[worker->first_task];
 
 #ifdef STARPU_SIMGRID
-		if (task_finished[worker->devid][worker->first_task])
+		if (!task_finished[worker->devid][worker->first_task])
 #else /* !STARPU_SIMGRID */
 		cl_int status;
 		err = clGetEventInfo(task_events[worker->devid][worker->first_task], CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, &size);
@@ -730,10 +730,10 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		if (status != CL_COMPLETE)
 #endif /* !STARPU_SIMGRID */
 		{
-			_STARPU_TRACE_START_EXECUTING();
 		}
 		else
 		{
+			_STARPU_TRACE_END_PROGRESS(memnode);
 #ifndef STARPU_SIMGRID
 			err = clReleaseEvent(task_events[worker->devid][worker->first_task]);
 			if (STARPU_UNLIKELY(err)) STARPU_OPENCL_REPORT_ERROR(err);
@@ -756,15 +756,13 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 				else
 				{
 					/* A synchronous task, we have finished flushing the pipeline, we can now at last execute it.  */
-					_STARPU_TRACE_END_PROGRESS(memnode);
 					_STARPU_TRACE_EVENT("sync_task");
 					_starpu_opencl_execute_job(task, worker);
 					_STARPU_TRACE_EVENT("end_sync_task");
-					_STARPU_TRACE_START_PROGRESS(memnode);
 					worker->pipeline_stuck = 0;
 				}
 			}
-			_STARPU_TRACE_END_EXECUTING();
+			_STARPU_TRACE_START_PROGRESS(memnode);
 		}
 	}
 	if (!worker->pipeline_length || worker->ntasks < worker->pipeline_length)
@@ -802,11 +800,11 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		return 0;
 	}
 
+	_STARPU_TRACE_END_PROGRESS(memnode);
 	worker->current_tasks[(worker->first_task  + worker->ntasks)%STARPU_MAX_PIPELINE] = task;
 	worker->ntasks++;
 
 	/* Fetch data asynchronously */
-	_STARPU_TRACE_END_PROGRESS(memnode);
 	res = _starpu_fetch_task_input(task, j, 1);
 	STARPU_ASSERT(res == 0);
 	_STARPU_TRACE_START_PROGRESS(memnode);
@@ -843,14 +841,14 @@ void *_starpu_opencl_worker(void *_arg)
 	struct _starpu_worker* worker = _arg;
 
 	_starpu_opencl_driver_init(worker);
-	_STARPU_TRACE_START_PROGRESS(memnode);
+	_STARPU_TRACE_START_PROGRESS(worker->memory_node);
 	while (_starpu_machine_is_running())
 	{
 		_starpu_may_pause();
 		_starpu_opencl_driver_run_once(worker);
 	}
 	_starpu_opencl_driver_deinit(worker);
-	_STARPU_TRACE_END_PROGRESS(memnode);
+	_STARPU_TRACE_END_PROGRESS(worker->memory_node);
 
 	return NULL;
 }