
move sched_op section from handle_job_termination to push_task to avoid having to call relax_on in tag routines

Olivier Aumage, 8 years ago
parent
commit 07a626a8a5
5 changed files with 19 additions and 90 deletions
  1. src/core/dependencies/tags.c (+0, -32)
  2. src/core/jobs.c (+4, -24)
  3. src/core/sched_policy.c (+13, -0)
  4. src/core/workers.h (+2, -0)
  5. src/datawizard/data_request.c (+0, -34)
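
The commit is easiest to read as a relocation of the worker's sched_op bracket: instead of being opened (conditionally) at the start of _starpu_handle_job_termination, it now surrounds only the call to the scheduling policy's push hook in _starpu_push_task_to_workers. The following is a minimal, self-contained sketch of that new shape; the model_* names and the simplified flag handling are hypothetical stand-ins, not StarPU's real internal API.

/* Hypothetical model -- it only mirrors the shape of the new code path in
 * _starpu_push_task_to_workers shown in the sched_policy.c hunk below. */
#include <pthread.h>
#include <stdio.h>

struct model_worker
{
	pthread_mutex_t sched_mutex;   /* stands in for worker->sched_mutex */
	int state_sched_op_pending;    /* stands in for worker->state_sched_op_pending */
};

static void model_enter_sched_op(struct model_worker *w)
{
	/* the real helper now asserts !state_sched_op_pending (workers.h hunk) */
	w->state_sched_op_pending = 1;
}

static void model_leave_sched_op(struct model_worker *w)
{
	w->state_sched_op_pending = 0;
}

/* stands in for sched_ctx->sched_policy->push_task(task) */
static int model_policy_push(const char *task)
{
	printf("push %s\n", task);
	return 0;
}

static int model_push_task_to_workers(struct model_worker *worker, const char *task)
{
	int ret;
	if (worker)
	{
		pthread_mutex_lock(&worker->sched_mutex);
		model_enter_sched_op(worker);
		pthread_mutex_unlock(&worker->sched_mutex);
	}
	ret = model_policy_push(task);
	if (worker)
	{
		pthread_mutex_lock(&worker->sched_mutex);
		model_leave_sched_op(worker);
		pthread_mutex_unlock(&worker->sched_mutex);
	}
	return ret;
}

int main(void)
{
	struct model_worker w = { PTHREAD_MUTEX_INITIALIZER, 0 };
	return model_push_task_to_workers(&w, "task_0");
}

One visible consequence in the jobs.c hunk is that the need_leave_sched_op bookkeeping disappears: the push path always opens and closes the section itself, and the new STARPU_ASSERT lines in workers.h check that the section is not re-entered and is actually open when left.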

+ 0 - 32
src/core/dependencies/tags.c

@@ -98,9 +98,7 @@ static void _starpu_tag_free(void *_tag)
 
 	if (tag)
 	{
-		_starpu_worker_relax_on();
 		_starpu_spin_lock(&tag->lock);
-		_starpu_worker_relax_off();
 
 		unsigned nsuccs = tag->tag_successors.nsuccs;
 		unsigned succ;
@@ -143,9 +141,7 @@ void starpu_tag_remove(starpu_tag_t id)
 
 	STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
 	STARPU_AYU_REMOVETASK(id + STARPU_AYUDAME_OFFSET);
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
-	_starpu_worker_relax_off();
 
 	HASH_FIND_UINT64_T(tag_htbl, &id, entry);
 	if (entry) HASH_DEL(tag_htbl, entry);
@@ -161,9 +157,7 @@ void starpu_tag_remove(starpu_tag_t id)
 
 void _starpu_tag_clear(void)
 {
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
-	_starpu_worker_relax_off();
 
 	/* XXX: _starpu_tag_free takes the tag spinlocks while we are keeping
 	 * the global rwlock. This contradicts the lock order of
@@ -212,9 +206,7 @@ static struct _starpu_tag *_gettag_struct(starpu_tag_t id)
 static struct _starpu_tag *gettag_struct(starpu_tag_t id)
 {
 	struct _starpu_tag *tag;
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
-	_starpu_worker_relax_off();
 	tag = _gettag_struct(id);
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
 	return tag;
@@ -235,14 +227,10 @@ void _starpu_tag_set_ready(struct _starpu_tag *tag)
 	_starpu_spin_unlock(&tag->lock);
 
 	/* enforce data dependencies */
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-	_starpu_worker_relax_off();
 	_starpu_enforce_deps_starting_from_task(j);
 
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&tag->lock);
-	_starpu_worker_relax_off();
 	STARPU_ASSERT(!STARPU_AYU_EVENT || tag->id < STARPU_AYUDAME_OFFSET);
 	STARPU_AYU_PRERUNTASK(tag->id + STARPU_AYUDAME_OFFSET, -1);
 	STARPU_AYU_POSTRUNTASK(tag->id + STARPU_AYUDAME_OFFSET);
@@ -264,9 +252,7 @@ static void _starpu_tag_add_succ(struct _starpu_tag *tag, struct _starpu_cg *cg)
 
 void _starpu_notify_tag_dependencies(struct _starpu_tag *tag)
 {
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&tag->lock);
-	_starpu_worker_relax_off();
 
 	if (tag->state == STARPU_DONE)
 	{
@@ -286,9 +272,7 @@ void starpu_tag_restart(starpu_tag_t id)
 {
 	struct _starpu_tag *tag = gettag_struct(id);
 
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&tag->lock);
-	_starpu_worker_relax_off();
 	STARPU_ASSERT_MSG(tag->state == STARPU_DONE || tag->state == STARPU_INVALID_STATE || tag->state == STARPU_ASSOCIATED || tag->state == STARPU_BLOCKED, "Only completed tags can be restarted (%llu was %d)", (unsigned long long) id, tag->state);
 	tag->state = STARPU_BLOCKED;
 	_starpu_spin_unlock(&tag->lock);
@@ -308,9 +292,7 @@ void _starpu_tag_declare(starpu_tag_t id, struct _starpu_job *job)
 
 	struct _starpu_tag *tag= gettag_struct(id);
 
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&tag->lock);
-	_starpu_worker_relax_off();
 
 	/* Note: a tag can be shared by several tasks, when it is used to
 	 * detect when either of them are finished. We however don't allow
@@ -346,9 +328,7 @@ void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t
 	/* create the associated completion group */
 	struct _starpu_tag *tag_child = gettag_struct(id);
 
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&tag_child->lock);
-	_starpu_worker_relax_off();
 	struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
 	_starpu_spin_unlock(&tag_child->lock);
 
@@ -362,10 +342,8 @@ void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t
 		_starpu_bound_tag_dep(id, dep_id);
 		struct _starpu_tag *tag_dep = gettag_struct(dep_id);
 		STARPU_ASSERT(tag_dep != tag_child);
-		_starpu_worker_relax_on();
 		_starpu_spin_lock(&tag_dep->lock);
 		_starpu_spin_lock(&tag_child->lock);
-		_starpu_worker_relax_off();
 		_starpu_tag_add_succ(tag_dep, cg);
 		STARPU_ASSERT(!STARPU_AYU_EVENT || dep_id < STARPU_AYUDAME_OFFSET);
 		STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
@@ -385,9 +363,7 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 	/* create the associated completion group */
 	struct _starpu_tag *tag_child = gettag_struct(id);
 
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&tag_child->lock);
-	_starpu_worker_relax_off();
 	struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
 	_starpu_spin_unlock(&tag_child->lock);
 
@@ -404,10 +380,8 @@ void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
 		_starpu_bound_tag_dep(id, dep_id);
 		struct _starpu_tag *tag_dep = gettag_struct(dep_id);
 		STARPU_ASSERT(tag_dep != tag_child);
-		_starpu_worker_relax_on();
 		_starpu_spin_lock(&tag_dep->lock);
 		_starpu_spin_lock(&tag_child->lock);
-		_starpu_worker_relax_off();
 		_starpu_tag_add_succ(tag_dep, cg);
 		STARPU_ASSERT(!STARPU_AYU_EVENT || dep_id < STARPU_AYUDAME_OFFSET);
 		STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
@@ -432,7 +406,6 @@ int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_tag_wait must not be called from a task or callback");
 
 	starpu_do_schedule();
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
 	/* only wait the tags that are not done yet */
 	for (i = 0, current = 0; i < ntags; i++)
@@ -453,7 +426,6 @@ int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 		}
 	}
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
-	_starpu_worker_relax_off();
 
 	if (current == 0)
 	{
@@ -471,14 +443,12 @@ int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 		_starpu_spin_unlock(&tag_array[i]->lock);
 	}
 
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
 
 	while (!cg->succ.succ_apps.completed)
 		STARPU_PTHREAD_COND_WAIT(&cg->succ.succ_apps.cg_cond, &cg->succ.succ_apps.cg_mutex);
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
-	_starpu_worker_relax_off();
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&cg->succ.succ_apps.cg_mutex);
 	STARPU_PTHREAD_COND_DESTROY(&cg->succ.succ_apps.cg_cond);
@@ -499,9 +469,7 @@ struct starpu_task *starpu_tag_get_task(starpu_tag_t id)
 	struct _starpu_tag_table *entry;
 	struct _starpu_tag *tag;
 
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
-	_starpu_worker_relax_off();
 	HASH_FIND_UINT64_T(tag_htbl, &id, entry);
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
 

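All of the tags.c hunks above delete the same three-line pattern: _starpu_worker_relax_on(), take a lock that may block, _starpu_worker_relax_off(). Below is a rough, hypothetical model of why that dance was needed while these routines could run inside the worker's sched_op section, and why a plain lock suffices once the section only brackets the policy push. The real implementation uses a relax refcount and condition variables rather than a single flag; the names here are illustrative only.

/* Hypothetical model, not the real StarPU API: 'worker_relaxed' stands in for
 * the worker relax state that lets observer threads act on a worker while it
 * may block. */
#include <pthread.h>

static pthread_mutex_t tag_lock = PTHREAD_MUTEX_INITIALIZER; /* stands in for tag->lock */
static int worker_relaxed = 1; /* observers may act on the worker while this is set */

static void model_relax_on(void)  { worker_relaxed = 1; } /* about to block: let observers proceed */
static void model_relax_off(void) { worker_relaxed = 0; } /* back inside the sched_op section */

/* Old shape: the routine could be reached from _starpu_handle_job_termination
 * with the sched_op section open, so it had to relax around the blocking
 * acquisition to avoid stalling an observer waiting on the worker. */
static void old_take_tag_lock(void)
{
	model_relax_on();
	pthread_mutex_lock(&tag_lock);
	model_relax_off();
}

/* New shape: the sched_op section now closes before termination work runs,
 * so the worker is already relaxed here and a plain lock is enough. */
static void new_take_tag_lock(void)
{
	pthread_mutex_lock(&tag_lock);
}

int main(void)
{
	old_take_tag_lock();
	pthread_mutex_unlock(&tag_lock);
	new_take_tag_lock();
	pthread_mutex_unlock(&tag_lock);
	return 0;
}

The data_request.c hunks at the end of the commit remove the same pattern for the same reason.
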
+ 4 - 24
src/core/jobs.c

@@ -262,18 +262,6 @@ void _starpu_handle_job_submission(struct _starpu_job *j)
 
 void _starpu_handle_job_termination(struct _starpu_job *j)
 {
-	struct _starpu_worker *worker = _starpu_get_local_worker_key();
-	int need_leave_sched_op = 0;
-	if (worker)
-	{
-		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-		if (!worker->state_sched_op_pending)
-		{
-			_starpu_worker_enter_sched_op(worker);
-			need_leave_sched_op = 1;
-		}
-		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
-	}
 	struct starpu_task *task = j->task;
 	unsigned sched_ctx = task->sched_ctx;
 	double flops = task->flops;
@@ -520,17 +508,13 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		}
 	}
 
+	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx, flops);
+	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
+	struct _starpu_worker *worker;
+	worker = _starpu_get_local_worker_key();
 	if (worker)
 	{
-		_starpu_worker_relax_on();
-		_starpu_sched_ctx_lock_write(sched_ctx);
-		_starpu_worker_relax_off();
-		starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx, flops);
-		_starpu_sched_ctx_unlock_write(sched_ctx);
-
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-		if (need_leave_sched_op)
-			_starpu_worker_leave_sched_op(worker);
 
 		if(worker->removed_from_ctx[sched_ctx] == 1 && worker->shares_tasks_lists[sched_ctx] == 1)
 		{
@@ -539,10 +523,6 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 		}
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
-	else
-	{
-		starpu_sched_ctx_revert_task_counters(sched_ctx, flops);
-	}
 }
 
 /* This function is called when a new task is submitted to StarPU

+ 13 - 0
src/core/sched_policy.c

@@ -596,10 +596,23 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				ret = -1;
 			else
 			{
+				struct _starpu_worker *worker = _starpu_get_local_worker_key();
+				if (worker)
+				{
+					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+					_starpu_worker_enter_sched_op(worker);
+					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+				}
 				_STARPU_TASK_BREAK_ON(task, push);
 				_STARPU_SCHED_BEGIN;
 				ret = sched_ctx->sched_policy->push_task(task);
 				_STARPU_SCHED_END;
+				if (worker)
+				{
+					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+					_starpu_worker_leave_sched_op(worker);
+					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+				}
 			}
 		}
 

+ 2 - 0
src/core/workers.h

@@ -775,6 +775,7 @@ static inline void __starpu_worker_enter_sched_op(struct _starpu_worker * const
 static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
 #endif
 {
+	STARPU_ASSERT(!worker->state_sched_op_pending);
 	if (!worker->state_blocked_in_parallel_observed)
 	{
 		/* process pending block requests before entering a sched_op region */
@@ -828,6 +829,7 @@ static inline void __starpu_worker_leave_sched_op(struct _starpu_worker * const
 static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
 #endif
 {
+	STARPU_ASSERT(worker->state_sched_op_pending);
 	worker->state_safe_for_observation = 1;
 #ifdef STARPU_SPINLOCK_CHECK
 	worker->relax_off_file = file;

+ 0 - 34
src/datawizard/data_request.c

@@ -173,9 +173,7 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	r->callbacks = NULL;
 	r->com_id = 0;
 
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&r->lock);
-	_starpu_worker_relax_off();
 
 	/* Take a reference on the target for the request to be able to write it */
 	if (dst_replicate)
@@ -241,9 +239,7 @@ int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigne
 			completed = r->completed;
 		if (completed)
 		{
-			_starpu_worker_relax_on();
 			_starpu_spin_lock(&r->lock);
-			_starpu_worker_relax_off();
 			if (r->completed)
 				break;
 			_starpu_spin_unlock(&r->lock);
@@ -311,9 +307,7 @@ void _starpu_post_data_request(struct _starpu_data_request *r)
 	}
 
 	/* insert the request in the proper list */
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
-	_starpu_worker_relax_off();
 	if (r->prefetch == 2)
 		_starpu_data_request_prio_list_push_back(&idle_requests[handling_node], r);
 	else if (r->prefetch)
@@ -483,10 +477,8 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	/* Have to wait for the handle, whatever it takes, in simgrid,
 	 * since we can not afford going to sleep, since nobody would wake us
 	 * up. */
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&handle->header_lock);
 	_starpu_spin_lock(&r->lock);
-	_starpu_worker_relax_off();
 #endif
 
 	struct _starpu_data_replicate *src_replicate = r->src_replicate;
@@ -529,9 +521,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 		 * requests in the meantime. */
 		_starpu_spin_unlock(&handle->header_lock);
 
-		_starpu_worker_relax_on();
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
-		_starpu_worker_relax_off();
 		_starpu_data_request_prio_list_push_back(&data_requests_pending[r->handling_node], r);
 		data_requests_npending[r->handling_node]++;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
@@ -540,9 +530,7 @@ static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned m
 	}
 
 	/* the request has been handled */
-	_starpu_worker_relax_on();
 	_starpu_spin_lock(&r->lock);
-	_starpu_worker_relax_off();
 	starpu_handle_data_request_completion(r);
 
 	return 0;
@@ -575,9 +563,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 		return -EBUSY;
 	}
 #else
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
-	_starpu_worker_relax_off();
 #endif
 
 	if (_starpu_data_request_prio_list_empty(&reqlist[src_node]))
@@ -642,9 +628,7 @@ static int __starpu_handle_node_data_requests(struct _starpu_data_request_prio_l
 
 	if (i <= prefetch)
 	{
-		_starpu_worker_relax_on();
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
-		_starpu_worker_relax_off();
 		if (!(_starpu_data_request_prio_list_empty(&new_data_requests[0])))
 		{
 			_starpu_data_request_prio_list_push_prio_list_back(&new_data_requests[0], &data_requests[src_node]);
@@ -725,12 +709,8 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 	}
 	else
 #endif
-	{
 		/* We really want to handle requests */
-		_starpu_worker_relax_on();
 		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
-		_starpu_worker_relax_off();
-	}
 
 	if (_starpu_data_request_prio_list_empty(&data_requests_pending[src_node]))
 	{
@@ -763,11 +743,7 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 			/* Or when running in simgrid, in which case we can not
 			 * afford going to sleep, since nobody would wake us
 			 * up. */
-		{
-			_starpu_worker_relax_on();
 			_starpu_spin_lock(&handle->header_lock);
-			_starpu_worker_relax_off();
-		}
 #ifndef STARPU_SIMGRID
 		else
 			if (_starpu_spin_trylock(&handle->header_lock))
@@ -780,9 +756,7 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 #endif
 
 		/* This shouldn't be too hard to acquire */
-		_starpu_worker_relax_on();
 		_starpu_spin_lock(&r->lock);
-		_starpu_worker_relax_off();
 
 		/* wait until the transfer is terminated */
 		if (force)
@@ -810,9 +784,7 @@ static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
 			}
 		}
 	}
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
-	_starpu_worker_relax_off();
 	data_requests_npending[src_node] -= taken - kept;
 	if (kept)
 		_starpu_data_request_prio_list_push_prio_list_back(&data_requests_pending[src_node], &new_data_requests_pending);
@@ -838,16 +810,12 @@ int _starpu_check_that_no_data_request_exists(unsigned node)
 	int no_request;
 	int no_pending;
 
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node]);
-	_starpu_worker_relax_off();
 	no_request = _starpu_data_request_prio_list_empty(&data_requests[node])
 	          && _starpu_data_request_prio_list_empty(&prefetch_requests[node])
 		  && _starpu_data_request_prio_list_empty(&idle_requests[node]);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node]);
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node]);
-	_starpu_worker_relax_off();
 	no_pending = !data_requests_npending[node];
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node]);
 
@@ -876,9 +844,7 @@ void _starpu_update_prefetch_status(struct _starpu_data_request *r, unsigned pre
 			_starpu_update_prefetch_status(next_req, prefetch);
 	}
 
-	_starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);
-	_starpu_worker_relax_off();
 
 	/* The request can be in a different list (handling request or the temp list)
 	 * we have to check that it is really in the prefetch list. */