Prechádzať zdrojové kódy

Add a 'waiting' worker status

Samuel Thibault pred 8 rokmi
rodič
commit
8bcca522cb

+ 3 - 1
src/core/errorcheck.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010, 2014  Université de Bordeaux
+ * Copyright (C) 2009, 2010, 2014, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011  CNRS
  * Copyright (C) 2017  Inria
  *
@@ -37,6 +37,8 @@ enum _starpu_worker_status
 	STATUS_CALLBACK,
 	/* while executing the scheduler code */
 	STATUS_SCHEDULING,
+	/* while waiting for a data transfer */
+	STATUS_WAITING,
 	/* while sleeping because there is nothing to do */
 	STATUS_SLEEPING
 };

+ 2 - 0
src/datawizard/coherency.c

@@ -1043,6 +1043,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
 			{
 				/* Ooops, not enough memory, make worker wait for these for now, and the synchronous call will finish by forcing eviction*/
 				worker->nb_buffers_totransfer = nacquires;
+				_starpu_set_worker_status(worker, STATUS_WAITING);
 				return 0;
 			}
 		}
@@ -1062,6 +1063,7 @@ int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, in
 	if (async)
 	{
 		worker->nb_buffers_totransfer = nacquires;
+		_starpu_set_worker_status(worker, STATUS_WAITING);
 		return 0;
 	}
 

+ 13 - 0
src/datawizard/data_request.c

@@ -210,6 +210,14 @@ int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigne
 	starpu_pthread_queue_register(&wait, &_starpu_simgrid_transfer_queue[(unsigned) r->dst_replicate->memory_node]);
 #endif
 
+	struct _starpu_worker *worker = _starpu_get_local_worker_key();
+
+	if (worker)
+	{
+		STARPU_ASSERT(worker->status == STATUS_UNKNOWN);
+		_starpu_set_worker_status(worker, STATUS_WAITING);
+	}
+
 	do
 	{
 #ifdef STARPU_SIMGRID
@@ -244,6 +252,11 @@ int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigne
 	}
 	while (1);
 
+	if (worker)
+	{
+		_starpu_set_worker_status(worker, STATUS_UNKNOWN);
+	}
+
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_transfer_queue[local_node]);
 	starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_transfer_queue[(unsigned) r->dst_replicate->memory_node]);

+ 14 - 0
src/datawizard/memory_manager.c

@@ -19,6 +19,7 @@
 #include <common/thread.h>
 #include <common/fxt.h>
 #include <datawizard/memory_manager.h>
+#include <core/workers.h>
 #include <starpu_stdlib.h>
 
 static size_t global_size[STARPU_MAXNODES];
@@ -79,6 +80,14 @@ int starpu_memory_allocate(unsigned node, size_t size, int flags)
 	STARPU_PTHREAD_MUTEX_LOCK(&lock_nodes[node]);
 	if (flags & STARPU_MEMORY_WAIT)
 	{
+		struct _starpu_worker *worker = _starpu_get_local_worker_key();
+
+		if (worker)
+		{
+			STARPU_ASSERT(worker->status == STATUS_UNKNOWN);
+			_starpu_set_worker_status(worker, STATUS_WAITING);
+		}
+
 		while (used_size[node] + size > global_size[node])
 		{
 			/* Tell deallocators we need this amount */
@@ -89,6 +98,11 @@ int starpu_memory_allocate(unsigned node, size_t size, int flags)
 			STARPU_PTHREAD_COND_WAIT(&cond_nodes[node], &lock_nodes[node]);
 		}
 
+		if (worker)
+		{
+			_starpu_set_worker_status(worker, STATUS_UNKNOWN);
+		}
+
 		/* And take it */
 		used_size[node] += size;
 		_STARPU_TRACE_USED_MEM(node, used_size[node]);

+ 1 - 0
src/drivers/cpu/driver_cpu.c

@@ -330,6 +330,7 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 		j = _starpu_get_job_associated_to_task(pending_task);
 
 		_starpu_fetch_task_input_tail(pending_task, j, cpu_worker);
+		_starpu_set_worker_status(cpu_worker, STATUS_UNKNOWN);
 		/* Reset it */
 		cpu_worker->task_transferring = NULL;
 

+ 1 - 0
src/drivers/cuda/driver_cuda.c

@@ -775,6 +775,7 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 
 			_starpu_set_local_worker_key(worker);
 			_starpu_fetch_task_input_tail(task, j, worker);
+			_starpu_set_worker_status(worker, STATUS_UNKNOWN);
 			/* Reset it */
 			worker->task_transferring = NULL;
 

+ 3 - 2
src/drivers/mp_common/source_common.c

@@ -960,6 +960,9 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 			_STARPU_TRACE_END_PROGRESS(memnode);
 			_starpu_set_local_worker_key(&worker_set->workers[i]);
 			_starpu_fetch_task_input_tail(task, j, &worker_set->workers[i]);
+			_starpu_set_worker_status(worker, STATUS_UNKNOWN);
+			/* Reset it */
+			worker_set->workers[i].task_transferring = NULL;
 
 			/* Execute the task */
 			res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
@@ -978,8 +981,6 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 					STARPU_ASSERT(0);
 			}
 
-			/* Reset it */
-			worker_set->workers[i].task_transferring = NULL;
 			_STARPU_TRACE_START_PROGRESS(memnode);
 		}
 	}

+ 1 - 0
src/drivers/opencl/driver_opencl.c

@@ -693,6 +693,7 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 		j = _starpu_get_job_associated_to_task(task);
 
 		_starpu_fetch_task_input_tail(task, j, worker);
+		_starpu_set_worker_status(worker, STATUS_UNKNOWN);
 		/* Reset it */
 		worker->task_transferring = NULL;
 

+ 3 - 0
tools/gdbinit

@@ -162,6 +162,9 @@ define starpu-workers
     if $worker->status == STATUS_SCHEDULING
       set $status="SCHEDULING"
     end
+    if $worker->status == STATUS_WAITING
+      set $status="WAITING"
+    end
     if $worker->status == STATUS_SLEEPING
       set $status="SLEEPING"
     end