Browse Source

Each worker can manage the good set of memories

Corentin Salingue 8 years ago
parent
commit
3bbc1443c4

+ 1 - 1
src/core/dependencies/data_arbiter_concurrency.c

@@ -269,7 +269,7 @@ unsigned _starpu_attempt_to_submit_arbitered_data_request(unsigned request_from_
 		while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
 		{
 			cpt++;
-			_starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
+			_starpu_datawizard_progress(0);
 		}
 		if (cpt == STARPU_SPIN_MAXTRY)
 			_starpu_spin_lock(&handle->header_lock);

+ 1 - 1
src/core/dependencies/data_concurrency.c

@@ -130,7 +130,7 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 		while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
 		{
 			cpt++;
-			_starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
+			_starpu_datawizard_progress(0);
 		}
 		if (cpt == STARPU_SPIN_MAXTRY)
 			_starpu_spin_lock(&handle->header_lock);

+ 32 - 0
src/core/topology.c

@@ -1674,6 +1674,9 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 				}
 				workerarg->bindid = _starpu_get_next_bindid(config, NULL, 0);
 				_starpu_memory_node_add_nworkers(memory_node);
+
+                _starpu_worker_drives_memory_node(workerarg->workerid, STARPU_MAIN_RAM);
+                _starpu_worker_drives_memory_node(workerarg->workerid, memory_node);
 #ifdef STARPU_SIMGRID
 				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[memory_node]);
 				if (memory_node != STARPU_MAIN_RAM)
@@ -1738,6 +1741,9 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 					}
 				}
 				_starpu_memory_node_add_nworkers(memory_node);
+
+                _starpu_worker_drives_memory_node(workerarg->workerid, STARPU_MAIN_RAM);
+                _starpu_worker_drives_memory_node(workerarg->workerid, memory_node);
 #ifdef STARPU_SIMGRID
 				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[memory_node]);
 				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
@@ -1777,6 +1783,9 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 #endif /* SIMGRID */
 				}
 				_starpu_memory_node_add_nworkers(memory_node);
+
+                _starpu_worker_drives_memory_node(workerarg->workerid, STARPU_MAIN_RAM);
+                _starpu_worker_drives_memory_node(workerarg->workerid, memory_node);
 #ifdef STARPU_SIMGRID
 				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[memory_node]);
 				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
@@ -1808,6 +1817,9 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 				}
 				workerarg->bindid = mic_bindid[devid];
 				_starpu_memory_node_add_nworkers(memory_node);
+
+                _starpu_worker_drives_memory_node(workerarg->workerid, STARPU_MAIN_RAM);
+                _starpu_worker_drives_memory_node(workerarg->workerid, memory_node);
 #ifdef STARPU_SIMGRID
 				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[memory_node]);
 				starpu_pthread_queue_register(&workerarg->set->workers[0].wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
@@ -1824,6 +1836,9 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 
 				memory_node = ram_memory_node;
 				_starpu_memory_node_add_nworkers(memory_node);
+
+                _starpu_worker_drives_memory_node(workerarg->workerid, STARPU_MAIN_RAM);
+                _starpu_worker_drives_memory_node(workerarg->workerid, memory_node);
 #ifdef STARPU_SIMGRID
 				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[memory_node]);
 				starpu_pthread_queue_register(&workerarg->wait, &_starpu_simgrid_transfer_queue[STARPU_MAIN_RAM]);
@@ -1848,6 +1863,23 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 					_starpu_register_bus(memory_node, STARPU_MAIN_RAM);
 
 				}
+                _starpu_worker_drives_memory_node(workerarg->workerid, STARPU_MAIN_RAM);
+                _starpu_worker_drives_memory_node(workerarg->workerid, memory_node);
+#ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
+                /* MPI driver thread can manage all slave memories if we disable the MPI multiple thread */
+                unsigned findworker;
+                for (findworker = 0; findworker < worker; findworker++)
+                {
+                    struct _starpu_worker *findworkerarg = &config->workers[findworker];
+                    if (findworkerarg->arch == STARPU_MPI_WORKER)
+                    {
+                        _starpu_worker_drives_memory_node(workerarg->workerid, findworkerarg->memory_node);
+                        _starpu_worker_drives_memory_node(findworkerarg->workerid, workerarg->memory_node);
+                    }
+                }
+              
+#endif
+                
 				workerarg->bindid = mpi_bindid[devid];
 				_starpu_memory_node_add_nworkers(memory_node);
 #ifdef STARPU_SIMGRID

+ 3 - 6
src/datawizard/coherency.c

@@ -725,14 +725,13 @@ int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_
 			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
 			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
 {
-	unsigned local_node = _starpu_memory_node_get_local_key();
         _STARPU_LOG_IN();
 
 	int cpt = 0;
 	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
 	{
 		cpt++;
-		_starpu_datawizard_progress(local_node, 1);
+		_starpu_datawizard_progress(1);
 	}
 	if (cpt == STARPU_SPIN_MAXTRY)
 		_starpu_spin_lock(&handle->header_lock);
@@ -810,12 +809,11 @@ void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_
 	if ((wt_mask & ~(1<<memory_node)))
 		_starpu_write_through_data(handle, memory_node, wt_mask);
 
-	unsigned local_node = _starpu_memory_node_get_local_key();
 	int cpt = 0;
 	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
 	{
 		cpt++;
-		_starpu_datawizard_progress(local_node, 1);
+		_starpu_datawizard_progress(1);
 	}
 	if (cpt == STARPU_SPIN_MAXTRY)
 		_starpu_spin_lock(&handle->header_lock);
@@ -833,12 +831,11 @@ void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_
 
 static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate)
 {
-	unsigned local_node = _starpu_memory_node_get_local_key();
 	int cpt = 0;
 	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
 	{
 		cpt++;
-		_starpu_datawizard_progress(local_node, 1);
+		_starpu_datawizard_progress(1);
 	}
 	if (cpt == STARPU_SPIN_MAXTRY)
 		_starpu_spin_lock(&handle->header_lock);

+ 1 - 1
src/datawizard/data_request.c

@@ -244,7 +244,7 @@ int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigne
 #endif
 #endif
 
-		_starpu_datawizard_progress(local_node, may_alloc);
+		_starpu_datawizard_progress(may_alloc);
 
 #ifdef STARPU_SIMGRID
 		starpu_pthread_wait_wait(&wait);

+ 26 - 3
src/datawizard/datawizard.c

@@ -25,7 +25,9 @@
 #include <core/simgrid.h>
 #endif
 
-int __starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests)
+static char worker_drives_memory[STARPU_NMAXWORKERS][STARPU_MAXNODES];
+
+int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests)
 {
 	int ret = 0;
 
@@ -63,7 +65,28 @@ int __starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsig
 	return ret;
 }
 
-void _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc)
+int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests)
+{
+    int current_worker_id = starpu_worker_get_id();
+    unsigned memnode;
+
+    int ret = 0;
+
+    for (memnode = 0; memnode < STARPU_MAXNODES; memnode++)
+    {
+        if (worker_drives_memory[current_worker_id][memnode] == 1)
+            ret |= ___starpu_datawizard_progress(memnode, may_alloc, push_requests);
+    }
+
+    return ret;
+}
+
+void _starpu_datawizard_progress(unsigned may_alloc)
+{
+	__starpu_datawizard_progress(may_alloc, 1);
+}
+
+void _starpu_worker_drives_memory_node(unsigned worker_id, unsigned memnode)
 {
-	__starpu_datawizard_progress(memory_node, may_alloc, 1);
+    worker_drives_memory[worker_id][memnode] = 1;   
 }

+ 4 - 2
src/datawizard/datawizard.h

@@ -33,7 +33,9 @@
 
 #include <core/dependencies/implicit_data_deps.h>
 
-int __starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests);
-void _starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc);
+int ___starpu_datawizard_progress(unsigned memory_node, unsigned may_alloc, unsigned push_requests);
+int __starpu_datawizard_progress(unsigned may_alloc, unsigned push_requests);
+void _starpu_datawizard_progress(unsigned may_alloc);
+void _starpu_worker_drives_memory_node(unsigned worker_id, unsigned memnode);
 
 #endif // __DATAWIZARD_H__

+ 1 - 1
src/datawizard/memalloc.c

@@ -1360,7 +1360,7 @@ static starpu_ssize_t _starpu_allocate_interface(starpu_data_handle_t handle, st
 	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
 	{
 		cpt++;
-		_starpu_datawizard_progress(_starpu_memory_node_get_local_key(), 0);
+		_starpu_datawizard_progress(0);
 	}
 	if (cpt == STARPU_SPIN_MAXTRY)
 		_starpu_spin_lock(&handle->header_lock);

+ 1 - 1
src/datawizard/write_back.c

@@ -50,7 +50,7 @@ void _starpu_write_through_data(starpu_data_handle_t handle, unsigned requesting
 				while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
 				{
 					cpt++;
-					_starpu_datawizard_progress(requesting_node, 1);
+					__starpu_datawizard_progress(1, 1);
 				}
 				if (cpt == STARPU_SPIN_MAXTRY)
 					_starpu_spin_lock(&handle->header_lock);

+ 1 - 3
src/drivers/cpu/driver_cpu.c

@@ -242,9 +242,7 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 #endif
 
 	_STARPU_TRACE_START_PROGRESS(memnode);
-	res = __starpu_datawizard_progress(memnode, 1, 1);
-	if (memnode != STARPU_MAIN_RAM)
-		res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
+	res = __starpu_datawizard_progress(1, 1);
 	_STARPU_TRACE_END_PROGRESS(memnode);
 
 	struct _starpu_job *j;

+ 2 - 4
src/drivers/cuda/driver_cuda.c

@@ -770,16 +770,14 @@ int _starpu_cuda_driver_run_once(struct _starpu_worker_set *worker_set)
 	if (!idle)
 	{
 		/* Nothing ready yet, no better thing to do than waiting */
-		__starpu_datawizard_progress(memnode, 1, 0);
-		__starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 0);
+		__starpu_datawizard_progress(1, 0);
 		return 0;
 	}
 #endif
 
 	/* Something done, make some progress */
 	res = !idle;
-	res |= __starpu_datawizard_progress(memnode, 1, 1);
-	res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
+	res |= __starpu_datawizard_progress(1, 1);
 
 	/* And pull tasks */
 	res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);

+ 4 - 2
src/drivers/mp_common/source_common.c

@@ -752,8 +752,7 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 #endif
 
     _STARPU_TRACE_START_PROGRESS(memnode);
-    res |= __starpu_datawizard_progress(memnode, 1, 1);
-    res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
+    res |= __starpu_datawizard_progress(1, 1);
     _STARPU_TRACE_END_PROGRESS(memnode);
 
     /* Handle message which have been store */
@@ -800,6 +799,9 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
         }
     }
 
+    /* Handle message which have been store */
+    _starpu_src_common_handle_stored_async(mp_node);
+
 }
 
 

+ 2 - 4
src/drivers/opencl/driver_opencl.c

@@ -755,15 +755,13 @@ int _starpu_opencl_driver_run_once(struct _starpu_worker *worker)
 	if (!idle)
 	{
 		/* Not ready yet, no better thing to do than waiting */
-		__starpu_datawizard_progress(memnode, 1, 0);
-		__starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 0);
+		__starpu_datawizard_progress(1, 0);
 		return 0;
 	}
 #endif
 
 	res = !idle;
-	res |= __starpu_datawizard_progress(memnode, 1, 1);
-	res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
+	res |= __starpu_datawizard_progress(1, 1);
 
 	task = _starpu_get_worker_task(worker, workerid, memnode);