Selaa lähdekoodia

Rework simgrid transfers to always use a transfer structure

Samuel Thibault 12 vuotta sitten
vanhempi
commit
e0313a071e
1 muutettua tiedostoa jossa 34 lisäystä ja 17 poistoa
  1. 34 17
      src/core/simgrid.c

+ 34 - 17
src/core/simgrid.c

@@ -255,6 +255,9 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	msg_host_t *hosts = calloc(2, sizeof(*hosts));
 	double *computation = calloc(2, sizeof(*computation));
 	double *communication = calloc(4, sizeof(*communication));
+	_starpu_pthread_mutex_t mutex;
+	_starpu_pthread_cond_t cond;
+	unsigned finished;
 
 	hosts[0] = _starpu_simgrid_memory_node_get_host(src_node);
 	hosts[1] = _starpu_simgrid_memory_node_get_host(dst_node);
@@ -263,29 +266,43 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 
 	task = MSG_parallel_task_create("copy", 2, hosts, computation, communication, NULL);
 
-	if (!req) {
-		/* this is not associated to a request so it's synchronous */
-		MSG_task_execute(task);
-		MSG_task_destroy(task);
-		return 0;
-	}
-	_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-	req->async_channel.event.finished = 0;
-	_STARPU_PTHREAD_MUTEX_INIT(&req->async_channel.event.mutex, NULL);
-	_STARPU_PTHREAD_COND_INIT(&req->async_channel.event.cond, NULL);
-
 	struct transfer *transfer = malloc(sizeof (*transfer));
 	transfer->task = task;
-	transfer->finished = &req->async_channel.event.finished;
-	transfer->mutex = &req->async_channel.event.mutex;
-	transfer->cond = &req->async_channel.event.cond;
 	transfer->src_node = src_node;
 	transfer->dst_node = dst_node;
+
+	if (req) {
+		transfer->finished = &req->async_channel.event.finished;
+		transfer->mutex = &req->async_channel.event.mutex;
+		transfer->cond = &req->async_channel.event.cond;
+	} else {
+		transfer->finished = &finished;
+		transfer->mutex = &mutex;
+		transfer->cond = &cond;
+	}
+
+	*transfer->finished = 0;
+	_STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
+	_STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
+
+	if (req)
+		_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
+
 	MSG_process_create("transfer task", transfer_execute, transfer, MSG_get_host_by_name("MAIN"));
+	/* Note: from here, transfer might be already freed */
 
-	_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
-	_STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
-	return -EAGAIN;
+	if (req) {
+		_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+		_STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
+		return -EAGAIN;
+	} else {
+		/* this is not associated to a request so it's synchronous */
+		_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		while (!finished)
+			_STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		return 0;
+	}
 }
 
 static int last_key;