Преглед на файлове

- fix omp task implementation
- add a table to associate workers with omp threads

Olivier Aumage преди 11 години
родител
ревизия
23b36f6bd5
променени са 6 файла, в които са добавени 110 реда и са изтрити 49 реда
  1. 15 0
      src/core/jobs.c
  2. 5 0
      src/core/jobs.h
  3. 6 0
      src/core/task.c
  4. 3 0
      src/core/task.h
  5. 77 46
      src/util/openmp_runtime_support.c
  6. 4 3
      src/util/openmp_runtime_support.h

+ 15 - 0
src/core/jobs.c

@@ -156,6 +156,7 @@ void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned co
 	j->continuation_resubmit = continuation_resubmit;
 	j->continuation_callback_on_sleep = continuation_callback_on_sleep;
 	j->continuation_callback_on_sleep_arg = continuation_callback_on_sleep_arg;
+	j->job_successors.ndeps = 0;
 }
 /* Prepare a currently running job for accepting a new set of
  * dependencies in anticipation of becoming a continuation. */
@@ -163,6 +164,12 @@ void _starpu_job_prepare_for_continuation(struct _starpu_job *j)
 {
 	_starpu_job_prepare_for_continuation_ext(j, 1, NULL, NULL);
 }
+void _starpu_job_set_omp_cleanup_callback(struct _starpu_job *j,
+		void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg)
+{
+	j->omp_cleanup_callback = omp_cleanup_callback;
+	j->omp_cleanup_callback_arg = omp_cleanup_callback_arg;
+}
 #endif
 
 void _starpu_handle_job_termination(struct _starpu_job *j)
@@ -336,6 +343,14 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	if (!continuation)
 	{
+#ifdef STARPU_OPENMP
+		if (j->omp_cleanup_callback)
+		{
+			j->omp_cleanup_callback(j->omp_cleanup_callback_arg);
+			j->omp_cleanup_callback = NULL;
+			j->omp_cleanup_callback_arg = NULL;
+		}
+#endif
 		/* A value of 2 is put to specify that not only the codelet but
 		 * also the callback were executed. */
 		j->terminated = 2;

+ 5 - 0
src/core/jobs.h

@@ -125,6 +125,9 @@ LIST_TYPE(_starpu_job,
 	void (*continuation_callback_on_sleep)(void *arg);
 	void *continuation_callback_on_sleep_arg;
 
+	void (*omp_cleanup_callback)(void *arg);
+	void *omp_cleanup_callback_arg;
+
 	/* Job has been stopped at least once. */
 	unsigned discontinuous;
 #endif
@@ -185,6 +188,8 @@ int _starpu_test_job_termination(struct _starpu_job *j);
 void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned continuation_resubmit,
 		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg);
 void _starpu_job_prepare_for_continuation(struct _starpu_job *j);
+void _starpu_job_set_omp_cleanup_callback(struct _starpu_job *j,
+		void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg);
 #endif
 
 /* Specify that the task should not appear in the DAG generated by debug tools. */

+ 6 - 0
src/core/task.c

@@ -964,6 +964,12 @@ void _starpu_task_prepare_for_continuation_ext(unsigned continuation_resubmit,
 	_starpu_job_prepare_for_continuation_ext(_starpu_get_job_associated_to_task(starpu_task_get_current()),
 		continuation_resubmit, continuation_callback_on_sleep, continuation_callback_on_sleep_arg);
 }
+
+void _starpu_task_set_omp_cleanup_callback(struct starpu_task *task, void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg)
+{
+	_starpu_job_set_omp_cleanup_callback(_starpu_get_job_associated_to_task(task),
+		omp_cleanup_callback, omp_cleanup_callback_arg);
+}
 #endif
 
 /*

+ 3 - 0
src/core/task.h

@@ -65,6 +65,9 @@ void _starpu_task_prepare_for_continuation_ext(unsigned continuation_resubmit,
 		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg);
 
 void _starpu_task_prepare_for_continuation(void);
+
+void _starpu_task_set_omp_cleanup_callback(struct starpu_task *task, void (*omp_cleanup_callback)(void *arg),
+		void *omp_cleanup_callback_arg);
 #endif
 
 int _starpu_task_uses_multiformat_handles(struct starpu_task *task);

+ 77 - 46
src/util/openmp_runtime_support.c

@@ -55,6 +55,33 @@ static void destroy_omp_task_struct(struct starpu_omp_task *task);
 static void _wake_up_locked_task(struct starpu_omp_task *task);
 static void wake_up_barrier(struct starpu_omp_region *parallel_region);
 
+static void register_thread_worker(struct starpu_omp_thread *thread)
+{
+	STARPU_ASSERT(thread->worker != NULL);
+	_starpu_spin_lock(&_global_state.hash_workers_lock);
+	struct _starpu_worker *check = thread->worker;
+	struct starpu_omp_thread *tmp = NULL;
+	HASH_FIND_PTR(_global_state.hash_workers, &check, tmp);
+	STARPU_ASSERT(tmp == NULL);
+	HASH_ADD_PTR(_global_state.hash_workers, worker, thread);
+	_starpu_spin_unlock(&_global_state.hash_workers_lock);
+}
+static struct starpu_omp_thread *get_local_thread(void)
+{
+	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
+	if (thread == NULL)
+	{
+		struct _starpu_worker *starpu_worker = _starpu_get_local_worker_key();
+		STARPU_ASSERT(starpu_worker != NULL);
+		_starpu_spin_lock(&_global_state.hash_workers_lock);
+		HASH_FIND_PTR(_global_state.hash_workers, &starpu_worker, thread);
+		STARPU_ASSERT(thread != NULL);
+		_starpu_spin_unlock(&_global_state.hash_workers_lock);
+		STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, thread);
+	}
+	return thread;
+}
+
 static struct starpu_omp_critical *create_omp_critical_struct(void)
 {
 	struct starpu_omp_critical *critical = malloc(sizeof(*critical));
@@ -156,7 +183,16 @@ static struct starpu_omp_thread *create_omp_thread_struct(struct starpu_omp_regi
 static void destroy_omp_thread_struct(struct starpu_omp_thread *thread)
 {
 	STARPU_ASSERT(thread->current_task == NULL);
-	STARPU_ASSERT(thread->primary_task == NULL);
+	if (thread->worker)
+	{
+		struct _starpu_worker *check = thread->worker;
+		struct starpu_omp_thread *_tmp;
+		_starpu_spin_lock(&_global_state.hash_workers_lock);
+		HASH_FIND_PTR(_global_state.hash_workers, &check, _tmp);
+		STARPU_ASSERT(_tmp == thread);
+		HASH_DEL(_global_state.hash_workers, _tmp);
+		_starpu_spin_unlock(&_global_state.hash_workers_lock);
+	}
 	memset(thread, 0, sizeof(*thread));
 	starpu_omp_thread_delete(thread);
 }
@@ -219,22 +255,7 @@ static void starpu_omp_implicit_task_exec(void *buffers[], void *cl_arg)
 	struct starpu_omp_task *task = starpu_task_get_current()->omp_task;
 	STARPU_ASSERT(task->is_implicit);
 	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, task);
-	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
-	if (thread == NULL)
-	{
-		/*
-		 * this is the first time an implicit omp task is launched on the current worker.
-		 */
-		thread = task->owner_thread;
-		STARPU_ASSERT(thread->owner_region != NULL);
-		STARPU_ASSERT(thread->owner_region == task->owner_region);
-		thread->primary_task = task;
-
-		/*
-		 * make this worker an omp-enabled worker
-		 */
-		STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, thread);
-	}
+	struct starpu_omp_thread *thread = get_local_thread();
 	if (task->state != starpu_omp_task_state_preempted)
 	{
 		task->starpu_buffers = buffers;
@@ -252,20 +273,6 @@ static void starpu_omp_implicit_task_exec(void *buffers[], void *cl_arg)
 	STARPU_ASSERT(task->state == starpu_omp_task_state_preempted
 			|| task->state == starpu_omp_task_state_terminated);
 	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, NULL);
-	if (task->state == starpu_omp_task_state_terminated && task == thread->primary_task)
-	{
-		/*
-		 * make this worker an omp-disabled worker
-		 */
-		STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, NULL);
-		thread->primary_task = NULL;
-
-		/*
-		 * make sure this worker wont be used for running omp tasks
-		 * until a new region is created
-		 */
-		thread->owner_region = NULL;
-	}
 
 	/* TODO: analyse the cause of the return and take appropriate steps */
 	if (task->state == starpu_omp_task_state_terminated)
@@ -284,10 +291,7 @@ static void starpu_omp_explicit_task_exec(void *buffers[], void *cl_arg)
 	struct starpu_omp_task *task = starpu_task_get_current()->omp_task;
 	STARPU_ASSERT(!task->is_implicit);
 	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, task);
-	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
-	/* TODO: handle the case where an explicit task might get scheduled on the
-	 * thread before any implicit task: associate threads/worker beforehand */
-	STARPU_ASSERT(thread != NULL);
+	struct starpu_omp_thread *thread = get_local_thread();
 	if (task->state != starpu_omp_task_state_preempted)
 	{
 		if (!task->is_untied)
@@ -340,20 +344,19 @@ static void starpu_omp_explicit_task_exec(void *buffers[], void *cl_arg)
 				STARPU_ASSERT(waiting_task->wait_on & starpu_omp_task_wait_on_region_tasks);
 				waiting_task->wait_on &= ~starpu_omp_task_wait_on_region_tasks;
 				_wake_up_locked_task(waiting_task);
+				_starpu_spin_unlock(&parallel_region->lock);
 				_starpu_spin_unlock(&waiting_task->lock);
 			}
 		}
-		_starpu_spin_unlock(&parallel_region->lock);
-		task->starpu_task->omp_task = NULL;
-		task->starpu_task = NULL;
-		destroy_omp_task_struct(task);
-		task = NULL;
+		else
+		{
+			_starpu_spin_unlock(&parallel_region->lock);
+		}
 	}
 	else if (task->state != starpu_omp_task_state_preempted)
 		_STARPU_ERROR("invalid omp task state");
 }
 
-
 static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *parent_task,
 		struct starpu_omp_thread *owner_thread, struct starpu_omp_region *owner_region, int is_implicit)
 {
@@ -444,7 +447,6 @@ static void omp_initial_thread_setup(void)
 	 * we configure starpu to not launch CPU worker 0
 	 * because we will use the main thread to play the role of worker 0
 	 */
-	initial_thread->starpu_worker_id = 0;
 	struct starpu_conf conf;
 	int ret = starpu_conf_init(&conf);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_conf_init");
@@ -458,7 +460,10 @@ static void omp_initial_thread_setup(void)
 	ret = starpu_driver_init(&initial_thread->starpu_driver);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_driver_init");
 	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, initial_task);
+	initial_thread->worker = _starpu_get_worker_struct(0);
+	STARPU_ASSERT(initial_thread->worker);
 	STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, initial_thread);
+	register_thread_worker(initial_thread);
 }
 
 static void omp_initial_thread_exit()
@@ -511,6 +516,8 @@ int starpu_omp_init(void)
 	_global_state.default_critical = create_omp_critical_struct();
 	_global_state.named_criticals = NULL;
 	_starpu_spin_init(&_global_state.named_criticals_lock);
+	_global_state.hash_workers = NULL;
+	_starpu_spin_init(&_global_state.hash_workers_lock);
 	_starpu_omp_global_state = &_global_state;
 
 	omp_initial_region_setup();
@@ -549,6 +556,19 @@ void starpu_omp_shutdown(void)
 	STARPU_ASSERT(_global_state.named_criticals == NULL);
 	_starpu_spin_unlock(&_global_state.named_criticals_lock);
 	_starpu_spin_destroy(&_global_state.named_criticals_lock);
+	_starpu_spin_lock(&_global_state.hash_workers_lock);
+	{
+		struct starpu_omp_thread *thread, *tmp;
+		HASH_ITER(hh, _global_state.hash_workers, thread, tmp)
+		{
+			STARPU_ASSERT(thread != NULL);
+			HASH_DEL(_global_state.hash_workers, thread);
+			destroy_omp_thread_struct(thread);
+		}
+	}
+	STARPU_ASSERT(_global_state.hash_workers == NULL);
+	_starpu_spin_unlock(&_global_state.hash_workers_lock);
+	_starpu_spin_destroy(&_global_state.hash_workers_lock);
 	STARPU_PTHREAD_KEY_DELETE(omp_task_key);
 	STARPU_PTHREAD_KEY_DELETE(omp_thread_key);
 }
@@ -587,11 +607,12 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 			/* TODO: use a less arbitrary thread/worker mapping scheme */
 			if (region->level == 0)
 			{
-				new_thread->starpu_worker_id = i;
+				new_thread->worker = _starpu_get_worker_struct(i);
+				register_thread_worker(new_thread);
 			}
 			else
 			{
-				new_thread->starpu_worker_id = master_thread->starpu_worker_id;
+				new_thread->worker = master_thread->worker;
 			}
 			starpu_omp_thread_list_push_back(new_region->thread_list, new_thread);
 		}
@@ -613,7 +634,7 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 		new_region->continuation_starpu_task = starpu_task_create();
 		/* in that case, the continuation starpu task is only used for synchronisation */
 		new_region->continuation_starpu_task->cl = NULL;
-		new_region->continuation_starpu_task->workerid = master_thread->starpu_worker_id;
+		new_region->continuation_starpu_task->workerid = master_thread->worker->workerid;
 		new_region->continuation_starpu_task->execute_on_a_specific_worker = 1;
 		/* this sync task will be tested for completion in omp_initial_thread_func() */
 		new_region->continuation_starpu_task->detach = 0;
@@ -653,7 +674,7 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 		implicit_task->starpu_task->cl = &implicit_task->cl;
 		implicit_task->starpu_task->cl_arg = parallel_region_cl_arg;
 		implicit_task->starpu_task->omp_task = implicit_task;
-		implicit_task->starpu_task->workerid = implicit_task->owner_thread->starpu_worker_id;
+		implicit_task->starpu_task->workerid = implicit_task->owner_thread->worker->workerid;
 		implicit_task->starpu_task->execute_on_a_specific_worker = 1;
 		starpu_task_declare_deps_array(new_region->continuation_starpu_task, 1, &implicit_task->starpu_task);
 	}
@@ -901,6 +922,15 @@ void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name)
 	_starpu_spin_unlock(&critical->lock);
 }
 
+static void explicit_task__destroy_callback(void *_task)
+{
+	struct starpu_omp_task *task = _task;
+	STARPU_ASSERT(!task->is_implicit);
+	task->starpu_task->omp_task = NULL;
+	task->starpu_task = NULL;
+	destroy_omp_task_struct(task);
+}
+
 void starpu_omp_task_region(const struct starpu_codelet * const _task_region_cl,
 		void * const task_region_cl_arg,
 		int if_clause, int final_clause, int untied_clause, int mergeable_clause)
@@ -971,6 +1001,7 @@ void starpu_omp_task_region(const struct starpu_codelet * const _task_region_cl,
 		generated_task->starpu_task->cl = &generated_task->cl;
 		generated_task->starpu_task->cl_arg = task_region_cl_arg;
 		generated_task->starpu_task->omp_task = generated_task;
+		_starpu_task_set_omp_cleanup_callback(generated_task->starpu_task, explicit_task__destroy_callback, generated_task);
 		/* if the task is tied, execute_on_a_specific_worker will be changed to 1
 		 * upon the first preemption of the generated task, once we know
 		 * which worker thread has been selected */

+ 4 - 3
src/util/openmp_runtime_support.h

@@ -244,11 +244,10 @@ LIST_TYPE(starpu_omp_task,
 
 LIST_TYPE(starpu_omp_thread,
 
+	UT_hash_handle hh;
 	struct starpu_omp_task *current_task;
 	struct starpu_omp_region *owner_region;
 
-	struct starpu_omp_task *primary_task;
-
 	/*
 	 * stack to execute the initial thread over
 	 * when preempting the initial task
@@ -264,7 +263,7 @@ LIST_TYPE(starpu_omp_thread,
 	ucontext_t ctx;
 
 	struct starpu_driver starpu_driver;
-	unsigned starpu_worker_id;
+	struct _starpu_worker *worker;
 )
 
 struct starpu_omp_region
@@ -302,6 +301,8 @@ struct starpu_omp_global
 	struct starpu_omp_critical *default_critical;
 	struct starpu_omp_critical *named_criticals;
 	struct _starpu_spinlock named_criticals_lock;
+	struct starpu_omp_thread *hash_workers;
+	struct _starpu_spinlock hash_workers_lock;
 };
 
 /*