瀏覽代碼

Remove mutexes and conds from simgrid tasks and transfers since we are using queues everywhere now

Samuel Thibault 8 年之前
父節點
當前提交
f3f9acf198

+ 45 - 39
src/core/simgrid.c

@@ -374,8 +374,6 @@ struct task
 
 	/* communication termination signalization */
 	unsigned *finished;
-	starpu_pthread_mutex_t *mutex;
-	starpu_pthread_cond_t *cond;
 
 	/* Next task on this worker */
 	struct task *next;
@@ -406,10 +404,7 @@ static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_AT
 		MSG_task_destroy(task->task);
 		_STARPU_DEBUG("task %p finished\n", task);
 
-		STARPU_PTHREAD_MUTEX_LOCK(task->mutex);
 		*task->finished = 1;
-		STARPU_PTHREAD_COND_BROADCAST(task->cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK(task->mutex);
 		/* The worker which started this task may be sleeping out of tasks, wake it  */
 		starpu_wake_worker(workerid);
 
@@ -427,16 +422,23 @@ void _starpu_simgrid_wait_tasks(int workerid)
 		return;
 
 	unsigned *finished = task->finished;
-	starpu_pthread_mutex_t *mutex = task->mutex;
-	starpu_pthread_cond_t *cond = task->cond;
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	while (!*finished)
-		STARPU_PTHREAD_COND_WAIT(cond, mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+	starpu_pthread_wait_t wait;
+	starpu_pthread_wait_init(&wait);
+	starpu_pthread_queue_register(&wait, &_starpu_simgrid_task_queue[workerid]);
+
+	while(1)
+	{
+		starpu_pthread_wait_reset(&wait);
+		if (*finished)
+			break;
+		starpu_pthread_wait_wait(&wait);
+	}
+	starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_task_queue[workerid]);
+	starpu_pthread_wait_destroy(&wait);
 }
 
 /* Task execution submitted by StarPU */
-void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond)
+void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished)
 {
 	struct starpu_task *starpu_task = j->task;
 	msg_task_t simgrid_task;
@@ -478,8 +480,6 @@ void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct star
 		task->task = simgrid_task;
 		task->finished = finished;
 		*finished = 0;
-		task->mutex = mutex;
-		task->cond = cond;
 		task->next = NULL;
 		/* Sleep 10µs for the GPU task queueing */
 		if (_starpu_simgrid_queue_malloc_cost())
@@ -513,8 +513,6 @@ LIST_TYPE(transfer,
 
 	/* communication termination signalization */
 	unsigned *finished;
-	starpu_pthread_mutex_t *mutex;
-	starpu_pthread_cond_t *cond;
 
 	/* transfers which wait for this transfer */
 	struct transfer **wake;
@@ -589,10 +587,7 @@ static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARP
 	MSG_task_execute(transfer->task);
 	MSG_task_destroy(transfer->task);
 	_STARPU_DEBUG("transfer %p finished\n", transfer);
-	STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
 	*transfer->finished = 1;
-	STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
 
 	/* The workers which started this request may be sleeping out of tasks, wake it  */
 	_starpu_wake_all_blocked_workers_on_node(transfer->run_node);
@@ -656,6 +651,30 @@ static void transfer_submit(struct transfer *transfer)
 	}
 }
 
+int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event)
+{
+	/* this is not associated to a request so it's synchronous */
+	starpu_pthread_wait_t wait;
+	starpu_pthread_wait_init(&wait);
+	starpu_pthread_queue_register(&wait, event->queue);
+
+	while(1)
+	{
+		starpu_pthread_wait_reset(&wait);
+		if (event->finished)
+			break;
+		starpu_pthread_wait_wait(&wait);
+	}
+	starpu_pthread_queue_unregister(&wait, event->queue);
+	starpu_pthread_wait_destroy(&wait);
+	return 0;
+}
+
+int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event)
+{
+	return event->finished;
+}
+
 /* Data transfer issued by StarPU */
 int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req)
 {
@@ -667,9 +686,7 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	msg_host_t *hosts;
 	double *computation;
 	double *communication;
-	starpu_pthread_mutex_t mutex;
-	starpu_pthread_cond_t cond;
-	unsigned finished;
+	union _starpu_async_channel_event *event, myevent;
 
 	_STARPU_CALLOC(hosts, 2, sizeof(*hosts));
 	_STARPU_CALLOC(computation, 2, sizeof(*computation));
@@ -692,21 +709,13 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	transfer->run_node = _starpu_memory_node_get_local_key();
 
 	if (req)
-	{
-		transfer->finished = &req->async_channel.event.finished;
-		transfer->mutex = &req->async_channel.event.mutex;
-		transfer->cond = &req->async_channel.event.cond;
-	}
+		event = &req->async_channel.event;
 	else
-	{
-		transfer->finished = &finished;
-		transfer->mutex = &mutex;
-		transfer->cond = &cond;
-	}
+		event = &myevent;
+	event->finished = 0;
+	transfer->finished = &event->finished;
+	event->queue = &_starpu_simgrid_transfer_queue[transfer->run_node];
 
-	*transfer->finished = 0;
-	STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
 	transfer->wake = NULL;
 	transfer->nwake = 0;
 	transfer->nwait = 0;
@@ -729,10 +738,7 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	else
 	{
 		/* this is not associated to a request so it's synchronous */
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		while (!finished)
-			STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		_starpu_simgrid_wait_transfer_event(event);
 		return 0;
 	}
 }

+ 3 - 1
src/core/simgrid.h

@@ -42,8 +42,10 @@ void _starpu_simgrid_init_early(int *argc, char ***argv);
 void _starpu_simgrid_init(void);
 void _starpu_simgrid_deinit(void);
 void _starpu_simgrid_wait_tasks(int workerid);
-void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond);
+void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished);
 int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req);
+int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event);
+int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event);
 /* Return the number of hosts prefixed by PREFIX */
 int _starpu_simgrid_get_nbhosts(const char *prefix);
 unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid);

+ 3 - 10
src/datawizard/copy_driver.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2016  Université de Bordeaux
+ * Copyright (C) 2010-2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013, 2016  CNRS
  * Copyright (C) 2016  INRIA
  *
@@ -803,10 +803,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_channel)
 {
 #ifdef STARPU_SIMGRID
-	STARPU_PTHREAD_MUTEX_LOCK(&async_channel->event.mutex);
-	while (!async_channel->event.finished)
-		STARPU_PTHREAD_COND_WAIT(&async_channel->event.cond, &async_channel->event.mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&async_channel->event.mutex);
+	_starpu_simgrid_wait_transfer_event(&async_channel->event);
 #else /* !SIMGRID */
 	enum starpu_node_kind kind = async_channel->type;
 #ifdef STARPU_USE_CUDA
@@ -868,11 +865,7 @@ void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_
 unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *async_channel)
 {
 #ifdef STARPU_SIMGRID
-	unsigned ret;
-	STARPU_PTHREAD_MUTEX_LOCK(&async_channel->event.mutex);
-	ret = async_channel->event.finished;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&async_channel->event.mutex);
-	return ret;
+	return _starpu_simgrid_test_transfer_event(&async_channel->event);
 #else /* !SIMGRID */
 	enum starpu_node_kind kind = async_channel->type;
 	unsigned success = 0;

+ 2 - 3
src/datawizard/copy_driver.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2015  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2015, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013, 2015  CNRS
  * Copyright (C) 2016  INRIA
  *
@@ -86,8 +86,7 @@ union _starpu_async_channel_event
 	struct
 	{
 		unsigned finished;
-		starpu_pthread_mutex_t mutex;
-		starpu_pthread_cond_t cond;
+		starpu_pthread_queue_t *queue;
 	};
 #endif
 #ifdef STARPU_USE_CUDA

+ 1 - 1
src/drivers/cpu/driver_cpu.c

@@ -106,7 +106,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 			if (cl->flags & STARPU_CODELET_SIMGRID_EXECUTE)
 				func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 			else
-				_starpu_simgrid_submit_job(cpu_args->workerid, j, perf_arch, NAN, NULL, NULL, NULL);
+				_starpu_simgrid_submit_job(cpu_args->workerid, j, perf_arch, NAN, NULL);
 #else
 			func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif

+ 2 - 13
src/drivers/cuda/driver_cuda.c

@@ -67,8 +67,6 @@ static cudaEvent_t task_events[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_USE_CUDA */
 #ifdef STARPU_SIMGRID
 static unsigned task_finished[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
-static starpu_pthread_mutex_t task_mutex[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
-static starpu_pthread_cond_t task_cond[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_SIMGRID */
 
 static enum initialization cuda_device_init[STARPU_MAXCUDADEVS];
@@ -375,11 +373,7 @@ static void init_worker_context(unsigned workerid)
 	int j;
 #ifdef STARPU_SIMGRID
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
 		task_finished[workerid][j] = 0;
-		STARPU_PTHREAD_MUTEX_INIT(&task_mutex[workerid][j], NULL);
-		STARPU_PTHREAD_COND_INIT(&task_cond[workerid][j], NULL);
-	}
 #else /* !STARPU_SIMGRID */
 	cudaError_t cures;
 
@@ -418,10 +412,7 @@ static void deinit_worker_context(unsigned workerid)
 	unsigned j;
 #ifdef STARPU_SIMGRID
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
-		STARPU_PTHREAD_MUTEX_DESTROY(&task_mutex[workerid][j]);
-		STARPU_PTHREAD_COND_DESTROY(&task_cond[workerid][j]);
-	}
+		task_finished[workerid][j] = 0;
 #else /* STARPU_SIMGRID */
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
 		cudaEventDestroy(task_events[workerid][j]);
@@ -508,9 +499,7 @@ static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worke
 			func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 		else
 			_starpu_simgrid_submit_job(workerid, j, &worker->perf_arch, NAN,
-				async ? &task_finished[workerid][pipeline_idx] : NULL,
-				async ? &task_mutex[workerid][pipeline_idx] : NULL,
-				async ? &task_cond[workerid][pipeline_idx] : NULL);
+				async ? &task_finished[workerid][pipeline_idx] : NULL);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif

+ 1 - 13
src/drivers/opencl/driver_opencl.c

@@ -57,8 +57,6 @@ static cl_event task_events[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
 #endif
 #ifdef STARPU_SIMGRID
 static unsigned task_finished[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
-static starpu_pthread_mutex_t task_mutex[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
-static starpu_pthread_cond_t task_cond[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_SIMGRID */
 
 void
@@ -153,11 +151,7 @@ int _starpu_opencl_init_context(int devid)
 #ifdef STARPU_SIMGRID
 	int j;
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
 		task_finished[devid][j] = 0;
-		STARPU_PTHREAD_MUTEX_INIT(&task_mutex[devid][j], NULL);
-		STARPU_PTHREAD_COND_INIT(&task_cond[devid][j], NULL);
-	}
 #else /* !STARPU_SIMGRID */
 	cl_int err;
 	cl_uint uint;
@@ -206,11 +200,7 @@ int _starpu_opencl_deinit_context(int devid)
 #ifdef STARPU_SIMGRID
 	int j;
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
 		task_finished[devid][j] = 0;
-		STARPU_PTHREAD_MUTEX_DESTROY(&task_mutex[devid][j]);
-		STARPU_PTHREAD_COND_DESTROY(&task_cond[devid][j]);
-	}
 #else /* !STARPU_SIMGRID */
         cl_int err;
 
@@ -978,9 +968,7 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 		}
 		if (simulate)
 			_starpu_simgrid_submit_job(worker->workerid, j, &worker->perf_arch, length,
-						   async ? &task_finished[worker->devid][pipeline_idx] : NULL,
-						   async ? &task_mutex[worker->devid][pipeline_idx] : NULL,
-						   async ? &task_cond[worker->devid][pipeline_idx] : NULL);
+						   async ? &task_finished[worker->devid][pipeline_idx] : NULL);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif