Bläddra i källkod

merge trunk@6596 (src/drivers/cpu/driver_cpu.c was merged by hand, i.e. by copying the version out of trunk@6596 and applying by hand all patches from the branch sched_ctx)

Nathalie Furmento 12 år sedan
förälder
incheckning
591fb979a9
3 ändrade filer med 211 tillägg och 115 borttagningar
  1. 1 0
      include/starpu.h
  2. 17 3
      src/core/workers.c
  3. 193 112
      src/drivers/cpu/driver_cpu.c

+ 1 - 0
include/starpu.h

@@ -68,6 +68,7 @@ struct starpu_driver
 	enum starpu_archtype type;
 	union
 	{
+		unsigned cpu_id;
 		unsigned cuda_id;
 #if defined(STARPU_USE_OPENCL) && !defined(__CUDACC__)
 		cl_device_id opencl_id;

+ 17 - 3
src/core/workers.c

@@ -1112,6 +1112,11 @@ starpu_driver_run(struct starpu_driver *d)
 	}
 }
 
+#ifdef STARPU_USE_CPU
+extern int _starpu_cpu_driver_init(struct starpu_driver *);
+extern int _starpu_cpu_driver_run_once(struct starpu_driver *);
+extern int _starpu_cpu_driver_deinit(struct starpu_driver *);
+#endif
 #ifdef STARPU_USE_CUDA
 extern int _starpu_cuda_driver_init(struct starpu_driver *);
 extern int _starpu_cuda_driver_run_once(struct starpu_driver *);
@@ -1130,6 +1135,10 @@ starpu_driver_init(struct starpu_driver *d)
 
 	switch (d->type)
 	{
+#ifdef STARPU_USE_CPU
+	case STARPU_CPU_WORKER:
+		return _starpu_cpu_driver_init(d);
+#endif
 #ifdef STARPU_USE_CUDA
 	case STARPU_CUDA_WORKER:
 		return _starpu_cuda_driver_init(d);
@@ -1138,7 +1147,6 @@ starpu_driver_init(struct starpu_driver *d)
 	case STARPU_OPENCL_WORKER:
 		return _starpu_opencl_driver_init(d);
 #endif
-	case STARPU_CPU_WORKER:    /* Not supported yet */
 	case STARPU_GORDON_WORKER: /* Not supported yet */
 	default:
 		return -EINVAL;
@@ -1152,6 +1160,10 @@ starpu_driver_run_once(struct starpu_driver *d)
 
 	switch (d->type)
 	{
+#ifdef STARPU_USE_CPU
+	case STARPU_CPU_WORKER:
+		return _starpu_cpu_driver_run_once(d);
+#endif
 #ifdef STARPU_USE_CUDA
 	case STARPU_CUDA_WORKER:
 		return _starpu_cuda_driver_run_once(d);
@@ -1160,7 +1172,6 @@ starpu_driver_run_once(struct starpu_driver *d)
 	case STARPU_OPENCL_WORKER:
 		return _starpu_opencl_driver_run_once(d);
 #endif
-	case STARPU_CPU_WORKER:    /* Not supported yet */
 	case STARPU_GORDON_WORKER: /* Not supported yet */
 	default:
 		return -EINVAL;
@@ -1174,6 +1185,10 @@ starpu_driver_deinit(struct starpu_driver *d)
 
 	switch (d->type)
 	{
+#ifdef STARPU_USE_CPU
+	case STARPU_CPU_WORKER:
+		return _starpu_cpu_driver_deinit(d);
+#endif
 #ifdef STARPU_USE_CUDA
 	case STARPU_CUDA_WORKER:
 		return _starpu_cuda_driver_deinit(d);
@@ -1182,7 +1197,6 @@ starpu_driver_deinit(struct starpu_driver *d)
 	case STARPU_OPENCL_WORKER:
 		return _starpu_opencl_driver_deinit(d);
 #endif
-	case STARPU_CPU_WORKER:    /* Not supported yet */
 	case STARPU_GORDON_WORKER: /* Not supported yet */
 	default:
 		return -EINVAL;

+ 193 - 112
src/drivers/cpu/driver_cpu.c

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2010-2012  Université de Bordeaux 1
  * Copyright (C) 2010  Mehdi Juhoor <mjuhoor@gmail.com>
- * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  * Copyright (C) 2011  INRIA
  *
@@ -51,6 +51,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 		{
 			/* there was not enough memory so the codelet cannot be executed right now ... */
 			/* push the codelet back and try another one ... */
+			STARPU_ASSERT(ret == 0);
 			return -EAGAIN;
 		}
 	}
@@ -97,162 +98,224 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 	return 0;
 }
 
-void *_starpu_cpu_worker(void *arg)
+static struct _starpu_worker*
+_starpu_get_worker_from_driver(struct starpu_driver *d)
 {
-	struct _starpu_worker *cpu_arg = (struct _starpu_worker *) arg;
-	unsigned memnode = cpu_arg->memory_node;
-	int workerid = cpu_arg->workerid;
-	int devid = cpu_arg->devid;
+#if 1
+	int workers[d->id.cpu_id + 1];
+	int nworkers;
+	nworkers = starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, workers, d->id.cpu_id+1);
+	if (nworkers >= 0 && (unsigned) nworkers < d->id.cpu_id)
+		return NULL; // No device was found.
+	
+	return _starpu_get_worker_struct(workers[d->id.cpu_id]);
+#else
+	int workers[STARPU_NMAXWORKERS];
+	int nworkers;
+	nworkers = starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, workers, STARPU_NMAXWORKERS);
+	STARPU_ASSERT(nworkers > 0);
+	int i;
+	for (i = 0; i < nworkers; i++)
+	{
+		fprintf(stderr, "\tCPU %d\n", i);
+	}
+
+	return _starpu_get_worker_struct(workers[d->id.cpu_id]);
+#endif
+}
+
+int _starpu_cpu_driver_init(struct starpu_driver *d)
+{
+	struct _starpu_worker *cpu_worker;
+	cpu_worker = _starpu_get_worker_from_driver(d);
+	STARPU_ASSERT(cpu_worker);
+
+	unsigned memnode = cpu_worker->memory_node;
+	int devid = cpu_worker->devid;
 
 #ifdef STARPU_USE_FXT
-	_starpu_fxt_register_thread(cpu_arg->bindid);
+	_starpu_fxt_register_thread(cpu_worker->bindid);
 #endif
 	_STARPU_TRACE_WORKER_INIT_START(_STARPU_FUT_CPU_KEY, devid, memnode);
 
-	_starpu_bind_thread_on_cpu(cpu_arg->config, cpu_arg->bindid);
+	_starpu_bind_thread_on_cpu(cpu_worker->config, cpu_worker->bindid);
 
-        _STARPU_DEBUG("cpu worker %d is ready on logical cpu %d\n", devid, cpu_arg->bindid);
+        _STARPU_DEBUG("cpu worker %d is ready on logical cpu %d\n", devid, cpu_worker->bindid);
 
-	_starpu_set_local_memory_node_key(&memnode);
+	_starpu_set_local_memory_node_key(&cpu_worker->memory_node);
 
-	_starpu_set_local_worker_key(cpu_arg);
+	_starpu_set_local_worker_key(cpu_worker);
 
-	snprintf(cpu_arg->name, sizeof(cpu_arg->name), "CPU %d", devid);
-	snprintf(cpu_arg->short_name, sizeof(cpu_arg->short_name), "CPU %d", devid);
+	snprintf(cpu_worker->name, sizeof(cpu_worker->name), "CPU %d", devid);
+	snprintf(cpu_worker->short_name, sizeof(cpu_worker->short_name), "CPU %d", devid);
 
-	cpu_arg->status = STATUS_UNKNOWN;
+	cpu_worker->status = STATUS_UNKNOWN;
 
 	_STARPU_TRACE_WORKER_INIT_END
 
         /* tell the main thread that we are ready */
-	_STARPU_PTHREAD_MUTEX_LOCK(&cpu_arg->mutex);
-	cpu_arg->worker_is_initialized = 1;
-	_STARPU_PTHREAD_COND_SIGNAL(&cpu_arg->ready_cond);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&cpu_arg->mutex);
+	_STARPU_PTHREAD_MUTEX_LOCK(&cpu_worker->mutex);
+	cpu_worker->worker_is_initialized = 1;
+	_STARPU_PTHREAD_COND_SIGNAL(&cpu_worker->ready_cond);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&cpu_worker->mutex);
+	return 0;
+}
 
-        struct _starpu_job *j;
-	struct starpu_task *task;
+int _starpu_cpu_driver_run_once(struct starpu_driver *d)
+{
+	struct _starpu_worker *cpu_worker;
+	cpu_worker = _starpu_get_worker_from_driver(d);
+	STARPU_ASSERT(cpu_worker);
 
-	int res;
+	pthread_cond_t *sched_cond = &cpu_worker->sched_cond;
+	pthread_mutex_t *sched_mutex = &cpu_worker->sched_mutex;
 
-	pthread_cond_t *sched_cond = &cpu_arg->sched_cond;
-	pthread_mutex_t *sched_mutex = &cpu_arg->sched_mutex;
 	struct timespec start_time, end_time;
 	unsigned idle = 0;
-	while (_starpu_machine_is_running())
-	{
-		_STARPU_TRACE_START_PROGRESS(memnode);
-		_starpu_datawizard_progress(memnode, 1);
-		_STARPU_TRACE_END_PROGRESS(memnode);
 
-		/* take the mutex inside pop because it depends what mutex:
-		   the one of the local task or the one of one of the strategies */
-		task = _starpu_pop_task(cpu_arg);
+	unsigned memnode = cpu_worker->memory_node;
+	int workerid = cpu_worker->workerid;
+
+	_STARPU_TRACE_START_PROGRESS(memnode);
+	_starpu_datawizard_progress(memnode, 1);
+	_STARPU_TRACE_END_PROGRESS(memnode);
+
+	/* take the mutex inside pop because it depends what mutex:
+	   the one of the local task or the one of one of the strategies */
 
-                if (!task)
+        struct _starpu_job *j;
+	struct starpu_task *task;
+	int res;
+
+	task = _starpu_pop_task(cpu_worker);
+
+	if (!task)
+	{
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+		if (_starpu_worker_can_block(memnode))
 		{
-			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-			if (_starpu_worker_can_block(memnode))
-				_starpu_block_worker(workerid, sched_cond, sched_mutex);
-			else
-			{
-				_starpu_clock_gettime(&start_time);
-				_starpu_worker_register_sleeping_start_date(workerid, &start_time);
-				idle = 1;
-
-			}
-			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-			continue;
-		};
-
-		if(idle)
+/* 			struct starpu_sched_ctx **sched_ctx = cpu_arg->sched_ctx; */
+/* 			int i = 0; */
+/* 			int sleep = 0; */
+/* 			for(i = 0; i < cpu_arg->nctxs; i++){ */
+/* 			  if(sched_ctx[i]->sched_ctx_id  == 2 ){ */
+/* 			    sleep = 1; */
+/* 			    break; */
+/* 			  } */
+/* 			} */
+/* 			if(sleep) */
+			_starpu_block_worker(workerid, sched_cond, sched_mutex);
+		}
+		else
 		{
-			_starpu_clock_gettime(&end_time);
-			
-			int profiling = starpu_profiling_status_get();
-			if (profiling)
-			{
-				struct timespec sleeping_time;
-				starpu_timespec_sub(&end_time, &start_time, &sleeping_time);
-				_starpu_worker_update_profiling_info_sleeping(workerid, &start_time, &end_time);
-			}
-			idle = 0;
+			_starpu_clock_gettime(&start_time);
+			_starpu_worker_register_sleeping_start_date(workerid, &start_time);
+			idle = 1;
 		}
 
-		STARPU_ASSERT(task);
-		j = _starpu_get_job_associated_to_task(task);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+
+		return 0;
+	};
 
-		/* can a cpu perform that task ? */
-		if (!_STARPU_CPU_MAY_PERFORM(j))
+	if(idle)
+	{
+		_starpu_clock_gettime(&end_time);
+		int profiling = starpu_profiling_status_get();
+		if (profiling)
 		{
-			/* put it and the end of the queue ... XXX */
-			_starpu_push_task(j);
-			continue;
+			struct timespec sleeping_time;
+			starpu_timespec_sub(&end_time, &start_time, &sleeping_time);
+			_starpu_worker_update_profiling_info_sleeping(workerid, &start_time, &end_time);
 		}
+		idle = 0;
+	}
 
-		int rank = 0;
-		int is_parallel_task = (j->task_size > 1);
+	STARPU_ASSERT(task);
+	j = _starpu_get_job_associated_to_task(task);
 
-		enum starpu_perf_archtype perf_arch;
+	/* can a cpu perform that task ? */
+	if (!_STARPU_CPU_MAY_PERFORM(j))
+	{
+		/* put it and the end of the queue ... XXX */
+		_starpu_push_task(j);
+		return 0;
+	}
 
-		/* Get the rank in case it is a parallel task */
-		if (is_parallel_task)
-		{
-			_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-			rank = j->active_task_alias_count++;
-			_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+	int rank = 0;
+	int is_parallel_task = (j->task_size > 1);
 
-			struct _starpu_combined_worker *combined_worker;
-			combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
+	enum starpu_perf_archtype perf_arch;
 
-			cpu_arg->combined_workerid = j->combined_workerid;
-			cpu_arg->worker_size = combined_worker->worker_size;
-			cpu_arg->current_rank = rank;
-			perf_arch = combined_worker->perf_arch;
-		}
-		else
-		{
-			cpu_arg->combined_workerid = cpu_arg->workerid;
-			cpu_arg->worker_size = 1;
-			cpu_arg->current_rank = 0;
-			perf_arch = cpu_arg->perf_arch;
-		}
+	/* Get the rank in case it is a parallel task */
+	if (is_parallel_task)
+	{
+		_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+		rank = j->active_task_alias_count++;
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
-		_starpu_set_current_task(j->task);
-		cpu_arg->current_task = j->task;
+		struct _starpu_combined_worker *combined_worker;
+		combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
 
-                res = execute_job_on_cpu(j, task, cpu_arg, rank, perf_arch);
+		cpu_worker->combined_workerid = j->combined_workerid;
+		cpu_worker->worker_size = combined_worker->worker_size;
+		cpu_worker->current_rank = rank;
+		perf_arch = combined_worker->perf_arch;
+	}
+	else
+	{
+		cpu_worker->combined_workerid = cpu_worker->workerid;
+		cpu_worker->worker_size = 1;
+		cpu_worker->current_rank = 0;
+		perf_arch = cpu_worker->perf_arch;
+	}
 
-		_starpu_set_current_task(NULL);
-		cpu_arg->current_task = NULL;
+	_starpu_set_current_task(j->task);
+	cpu_worker->current_task = j->task;
 
-		if (res)
-		{
-			switch (res)
-			{
-				case -EAGAIN:
-					_starpu_push_task(j);
-					continue;
-				default:
-					STARPU_ASSERT(0);
-			}
-		}
+	res = execute_job_on_cpu(j, task, cpu_worker, rank, perf_arch);
+
+	_starpu_set_current_task(NULL);
+	cpu_worker->current_task = NULL;
 
-		/* In the case of combined workers, we need to inform the
-		 * scheduler each worker's execution is over.
-		 * Then we free the workers' task alias */
-		if (is_parallel_task)
+	if (res)
+	{
+		switch (res)
 		{
-			_starpu_sched_post_exec_hook(task);
-			free(task);
+		case -EAGAIN:
+			_starpu_push_task(j);
+			return 0;
+		default:
+			STARPU_ABORT();
 		}
+	}
+
+	/* In the case of combined workers, we need to inform the
+	 * scheduler each worker's execution is over.
+	 * Then we free the workers' task alias */
+	if (is_parallel_task)
+	{
+		_starpu_sched_post_exec_hook(task);
+		free(task);
+	}
 
-		if (rank == 0)
-			_starpu_handle_job_termination(j, workerid);
-        }
+	if (rank == 0)
+	{
+		_starpu_handle_job_termination(j, workerid);
+	}
 
+	return 0;
+}
+
+int _starpu_cpu_driver_deinit(struct starpu_driver *d)
+{
 	_STARPU_TRACE_WORKER_DEINIT_START
 
+	struct _starpu_worker *cpu_worker;
+	cpu_worker = _starpu_get_worker_from_driver(d);
+	STARPU_ASSERT(cpu_worker);
+
+	unsigned memnode = cpu_worker->memory_node;
 	_starpu_handle_all_pending_node_data_requests(memnode);
 
 	/* In case there remains some memory that was automatically
@@ -262,5 +325,23 @@ void *_starpu_cpu_worker(void *arg)
 
 	_STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_CPU_KEY);
 
+	return 0;
+}
+
+void *
+_starpu_cpu_worker(void *arg)
+{
+	struct _starpu_worker *args = arg;
+	struct starpu_driver d =
+	{
+		.type      = STARPU_CPU_WORKER,
+		.id.cpu_id = args->devid
+	};
+
+	_starpu_cpu_driver_init(&d);
+	while (_starpu_machine_is_running())
+		_starpu_cpu_driver_run_once(&d);
+	_starpu_cpu_driver_deinit(&d);
+
 	return NULL;
 }