ソースを参照

Factorize the code used to block a worker when there is nothing to do at all.

Cédric Augonnet 15 年 前
コミット
87fb505882
共有5 個のファイルを変更した48 個の追加58 個の削除を含む
  1. 19 0
      src/core/workers.c
  2. 1 0
      src/core/workers.h
  3. 10 20
      src/drivers/cpu/driver_cpu.c
  4. 9 19
      src/drivers/cuda/driver_cuda.c
  5. 9 19
      src/drivers/opencl/driver_opencl.c

+ 19 - 0
src/core/workers.c

@@ -21,6 +21,7 @@
 #include <core/workers.h>
 #include <core/debug.h>
 #include <core/task.h>
+#include <profiling/profiling.h>
 
 #ifdef __MINGW32__
 #include <windows.h>
@@ -607,3 +608,21 @@ void starpu_worker_get_name(int id, char *dst, size_t maxlen)
 
 	snprintf(dst, maxlen, "%s", name);
 }
+
+/* TODO move in some driver/common/ directory */
+/* Workers may block when there is no work to do at all. We assume that the
+ * mutex is hold when that function is called. */
+void _starpu_block_worker(int workerid, pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+	int profiling;
+	
+	profiling = starpu_profiling_status_get();
+
+	int64_t start_time, end_time;
+	start_time = (int64_t)_starpu_timing_now();
+	PTHREAD_COND_WAIT(cond, mutex);
+	end_time = (int64_t)_starpu_timing_now();
+
+	if (profiling)
+		_starpu_worker_update_profiling_info(workerid, 0, end_time - start_time, 0);
+}

+ 1 - 0
src/core/workers.h

@@ -153,6 +153,7 @@ inline uint32_t _starpu_may_submit_cpu_task(void);
 inline uint32_t _starpu_may_submit_opencl_task(void);
 inline uint32_t _starpu_worker_may_execute_task(unsigned workerid, uint32_t where);
 unsigned _starpu_worker_can_block(unsigned memnode);
+void _starpu_block_worker(int workerid, pthread_cond_t *cond, pthread_mutex_t *mutex);
 
 inline void _starpu_lock_all_queues_attached_to_node(unsigned node);
 inline void _starpu_unlock_all_queues_attached_to_node(unsigned node);

+ 10 - 20
src/drivers/cpu/driver_cpu.c

@@ -127,6 +127,7 @@ void *_starpu_cpu_worker(void *arg)
 	struct starpu_worker_s *cpu_arg = arg;
 	struct starpu_jobq_s *jobq = cpu_arg->jobq;
 	unsigned memnode = cpu_arg->memory_node;
+	int workerid = cpu_arg->workerid;
 
 #ifdef STARPU_USE_FXT
 	_starpu_fxt_register_thread(cpu_arg->bindid);
@@ -163,9 +164,6 @@ void *_starpu_cpu_worker(void *arg)
 	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
 	struct starpu_jobq_s *queue = policy->starpu_get_local_queue(policy);
 
-	int profiling;
-	int64_t start_time, end_time;
-
 	while (_starpu_machine_is_running())
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
@@ -183,24 +181,16 @@ void *_starpu_cpu_worker(void *arg)
 		if (!j)
 			j = _starpu_pop_task();
 		
-		profiling = starpu_profiling_status_get();
-		
                 if (j == NULL) 
-		  {
-		    if (_starpu_worker_can_block(memnode)) {
-
-		      start_time = (int64_t)_starpu_timing_now();
-		      PTHREAD_COND_WAIT(&queue->activity_cond, &queue->activity_mutex);
-		      end_time = (int64_t)_starpu_timing_now();
-
-		      if(profiling){		
-			_starpu_worker_update_profiling_info(cpu_arg->workerid, 0, end_time - start_time, 0);
-		      }   
-		    }
-		    _starpu_jobq_unlock(queue);
-		    continue;
-		  };
-		
+		{
+			if (_starpu_worker_can_block(memnode))
+				_starpu_block_worker(workerid, &queue->activity_cond, &queue->activity_mutex);
+
+			_starpu_jobq_unlock(queue);
+
+			continue;
+		};
+	
 		_starpu_jobq_unlock(queue);
 		
 		/* can a cpu perform that task ? */

+ 9 - 19
src/drivers/cuda/driver_cuda.c

@@ -200,6 +200,7 @@ void *_starpu_cuda_worker(void *arg)
 	struct starpu_jobq_s *jobq = args->jobq;
 
 	int devid = args->devid;
+	int workerid = args->workerid;
 	unsigned memnode = args->memory_node;
 
 #ifdef STARPU_USE_FXT
@@ -247,9 +248,6 @@ void *_starpu_cuda_worker(void *arg)
 	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
 	struct starpu_jobq_s *queue = policy->starpu_get_local_queue(policy);
 	
-	int profiling;
-	int64_t start_time, end_time;
-
 	while (_starpu_machine_is_running())
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
@@ -267,23 +265,15 @@ void *_starpu_cuda_worker(void *arg)
 		if (!j)
 			j = _starpu_pop_task();
 	       
-		profiling = starpu_profiling_status_get();
-	
                 if (j == NULL) 
-		  {
-		    if (_starpu_worker_can_block(memnode)) {
-
-		      start_time = (int64_t)_starpu_timing_now();
-		      PTHREAD_COND_WAIT(&queue->activity_cond, &queue->activity_mutex);
-		      end_time = (int64_t)_starpu_timing_now();
-
-		      if(profiling){		
-			_starpu_worker_update_profiling_info(args->workerid, 0, end_time - start_time, 0);
-		      }   
-		    }
-		    _starpu_jobq_unlock(queue);
-		    continue;
-		  };
+		{
+			if (_starpu_worker_can_block(memnode))
+				_starpu_block_worker(workerid, &queue->activity_cond, &queue->activity_mutex);
+
+			_starpu_jobq_unlock(queue);
+
+			continue;
+		};
 
 		_starpu_jobq_unlock(queue);
 

+ 9 - 19
src/drivers/opencl/driver_opencl.c

@@ -190,6 +190,7 @@ void *_starpu_opencl_worker(void *arg)
 	struct starpu_jobq_s *jobq = args->jobq;
 
 	int devid = args->devid;
+	int workerid = args->workerid;
 
 #ifdef USE_FXT
 	fxt_register_thread(args->bindid);
@@ -234,9 +235,6 @@ void *_starpu_opencl_worker(void *arg)
 	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
 	struct starpu_jobq_s *queue = policy->starpu_get_local_queue(policy);
 
-	int profiling;
-	int64_t start_time, end_time;
-
 	while (_starpu_machine_is_running())
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
@@ -254,23 +252,15 @@ void *_starpu_opencl_worker(void *arg)
 		if (!j)
 			j = _starpu_pop_task();
 		
- 		profiling = starpu_profiling_status_get();
-		
                 if (j == NULL) 
-		  {
-		    if (_starpu_worker_can_block(memnode)) {
-
-		      start_time = (int64_t)_starpu_timing_now();
-		      PTHREAD_COND_WAIT(&queue->activity_cond, &queue->activity_mutex);
-		      end_time = (int64_t)_starpu_timing_now();
-
-		      if(profiling){		
-			_starpu_worker_update_profiling_info(args->workerid, 0, end_time - start_time, 0);
-		      }   
-		    }
-		    _starpu_jobq_unlock(queue);
-		    continue;
-		  };
+		{
+			if (_starpu_worker_can_block(memnode))
+				_starpu_block_worker(workerid, &queue->activity_cond, &queue->activity_mutex);
+
+			_starpu_jobq_unlock(queue);
+
+			continue;
+		};
 
 		_starpu_jobq_unlock(queue);