
Use starpu_worker_lock/unlock instead of internal _starpu_worker_lock/unlock, for coherency

Samuel Thibault 6 years ago
Parent
Commit
3a1eca323f
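
Background for this change: starpu_worker_lock()/starpu_worker_unlock() are the public entry points for taking a worker's scheduling mutex, and this commit makes the core and the built-in policies call them consistently instead of the internal _starpu_worker_lock()/_starpu_worker_unlock() variants. Below is a minimal sketch of the lock/wake/unlock pattern as it might appear in an external scheduling policy; my_policy_data, my_policy_push_to_worker and the queues field are illustrative assumptions, not part of StarPU.

#include <starpu.h>

/* Hypothetical policy state: one task list per worker (illustration only). */
struct my_policy_data
{
	struct starpu_task_list queues[STARPU_NMAXWORKERS];
};

/* Push a task onto a worker-local queue while holding that worker's
 * scheduling lock, mirroring the sequence used in the hunks below. */
static int my_policy_push_to_worker(struct my_policy_data *data,
				    struct starpu_task *task, int workerid)
{
	starpu_worker_lock(workerid);	/* public counterpart of _starpu_worker_lock() */

	starpu_task_list_push_back(&data->queues[workerid], task);

#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
	/* The worker may be sleeping on its condition variable; wake it
	 * while its lock is still held, as the changed code does. */
	starpu_wake_worker_locked(workerid);
#endif
	starpu_push_task_end(task);

	starpu_worker_unlock(workerid);	/* public counterpart of _starpu_worker_unlock() */
	return 0;
}

starpu_worker_lock_self()/starpu_worker_unlock_self(), which appear in the pre/post-exec hooks below, are the equivalent public shorthand for the calling worker's own lock.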

+ 2 - 2
src/core/jobs.c

@@ -760,7 +760,7 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 	if (STARPU_UNLIKELY(!(worker->worker_mask & task->where)))
 		return -ENODEV;
 
-	_starpu_worker_lock(worker->workerid);
+	starpu_worker_lock(worker->workerid);
 
 	if (task->execute_on_a_specific_worker && task->workerorder)
 	{
@@ -807,7 +807,7 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 
 	starpu_wake_worker_locked(worker->workerid);
 	starpu_push_task_end(task);
-	_starpu_worker_unlock(worker->workerid);
+	starpu_worker_unlock(worker->workerid);
 
 	return 0;
 }

+ 10 - 10
src/core/sched_ctx.c

@@ -2250,7 +2250,7 @@ struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(st
 	struct _starpu_sched_ctx_list_iterator list_it;
 	struct _starpu_sched_ctx *ret = NULL;
 
-	_starpu_worker_lock(worker->workerid);
+	starpu_worker_lock(worker->workerid);
 	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
 	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
 	{
@@ -2262,7 +2262,7 @@ struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(st
 			break;
 		}
 	}
-	_starpu_worker_unlock(worker->workerid);
+	starpu_worker_unlock(worker->workerid);
 	return ret;
 }
 
@@ -2338,9 +2338,9 @@ void starpu_sched_ctx_list_task_counters_increment(unsigned sched_ctx_id, int wo
 	/* FIXME: why do we push events only when the worker belongs to more than one ctx? */
 	if (worker->nsched_ctxs > 1)
 	{
-		_starpu_worker_lock(workerid);
+		starpu_worker_lock(workerid);
 		_starpu_sched_ctx_list_push_event(worker->sched_ctx_list, sched_ctx_id);
-		_starpu_worker_unlock(workerid);
+		starpu_worker_unlock(workerid);
 	}
 }
 
@@ -2413,9 +2413,9 @@ void starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(struct starpu_
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 			if (worker->nsched_ctxs > 1)
 			{
-				_starpu_worker_lock(workerid);
+				starpu_worker_lock(workerid);
 				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
-				_starpu_worker_unlock(workerid);
+				starpu_worker_unlock(workerid);
 			}
 		}
 	}
@@ -2435,9 +2435,9 @@ void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task,
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 			if (worker->nsched_ctxs > 1)
 			{
-				_starpu_worker_lock(workerid);
+				starpu_worker_lock(workerid);
 				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
-				_starpu_worker_unlock(workerid);
+				starpu_worker_unlock(workerid);
 			}
 		}
 		_starpu_sched_ctx_unlock_write(sched_ctx_id);
@@ -2458,9 +2458,9 @@ void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, uns
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 			if (worker->nsched_ctxs > 1)
 			{
-				_starpu_worker_lock(workerid);
+				starpu_worker_lock(workerid);
 				starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
-				_starpu_worker_unlock(workerid);
+				starpu_worker_unlock(workerid);
 			}
 		}
 		_starpu_sched_ctx_unlock_write(sched_ctx_id);

+ 2 - 2
src/core/workers.c

@@ -1875,9 +1875,9 @@ unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 
 unsigned starpu_worker_is_slave_somewhere(int workerid)
 {
-	_starpu_worker_lock(workerid);
+	starpu_worker_lock(workerid);
 	unsigned ret = _starpu_config.workers[workerid].is_slave_somewhere;
-	_starpu_worker_unlock(workerid);
+	starpu_worker_unlock(workerid);
 	return ret;
 }
 

+ 2 - 2
src/sched_policies/component_worker.c

@@ -760,13 +760,13 @@ void _starpu_sched_component_lock_all_workers(void)
 {
 	unsigned i;
 	for(i = 0; i < starpu_worker_get_count(); i++)
-		_starpu_worker_lock(i);
+		starpu_worker_lock(i);
 }
 void _starpu_sched_component_unlock_all_workers(void)
 {
 	unsigned i;
 	for(i = 0; i < starpu_worker_get_count(); i++)
-		_starpu_worker_unlock(i);
+		starpu_worker_unlock(i);
 }
 
 void _starpu_sched_component_workers_destroy(void)

+ 16 - 16
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -329,9 +329,9 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	_starpu_worker_lock_self();
+	starpu_worker_lock_self();
 	new_list = _starpu_fifo_pop_every_task(fifo, workerid);
-	_starpu_worker_unlock_self();
+	starpu_worker_unlock_self();
 
 	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
 
@@ -370,7 +370,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	starpu_sched_ctx_call_pushed_task_cb(best_workerid, sched_ctx_id);
 #endif //STARPU_USE_SC_HYPERVISOR
 
-	_starpu_worker_lock(best_workerid);
+	starpu_worker_lock(best_workerid);
 
         /* Sometimes workers didn't take the tasks as early as we expected */
 	fifo->exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
@@ -416,7 +416,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	}
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	_starpu_worker_unlock(best_workerid);
+	starpu_worker_unlock(best_workerid);
 
 	task->predicted = predicted;
 	task->predicted_transfer = predicted_transfer;
@@ -449,7 +449,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	int ret = 0;
 	if (prio)
 	{
-		_starpu_worker_lock(best_workerid);
+		starpu_worker_lock(best_workerid);
 		ret =_starpu_fifo_push_sorted_task(dt->queue_array[best_workerid], task);
 		if(dt->num_priorities != -1)
 		{
@@ -464,11 +464,11 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_wake_worker_locked(best_workerid);
 #endif
 		starpu_push_task_end(task);
-		_starpu_worker_unlock(best_workerid);
+		starpu_worker_unlock(best_workerid);
 	}
 	else
 	{
-		_starpu_worker_lock(best_workerid);
+		starpu_worker_lock(best_workerid);
 		starpu_task_list_push_back (&dt->queue_array[best_workerid]->taskq, task);
 		dt->queue_array[best_workerid]->ntasks++;
 		dt->queue_array[best_workerid]->nprocessed++;
@@ -476,7 +476,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_wake_worker_locked(best_workerid);
 #endif
 		starpu_push_task_end(task);
-		_starpu_worker_unlock(best_workerid);
+		starpu_worker_unlock(best_workerid);
 	}
 
 	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, best_workerid);
@@ -702,9 +702,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				}
 				else
 				{
-					_starpu_worker_lock(workerid);
+					starpu_worker_lock(workerid);
 					prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, workerid, nimpl, &fifo_ntasks);
-					_starpu_worker_unlock(workerid);
+					starpu_worker_unlock(workerid);
 				}
 			}
 
@@ -1143,7 +1143,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 
 	/* Once the task is executing, we can update the predicted amount
 	 * of work. */
-	_starpu_worker_lock_self();
+	starpu_worker_lock_self();
 
 	_starpu_fifo_task_started(fifo, task, dt->num_priorities);
 
@@ -1151,7 +1151,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 	fifo->exp_start = STARPU_MAX(now + fifo->pipeline_len, fifo->exp_start);
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
-	_starpu_worker_unlock_self();
+	starpu_worker_unlock_self();
 }
 
 static void dmda_push_task_notify(struct starpu_task *task, int workerid, int perf_workerid, unsigned sched_ctx_id)
@@ -1168,7 +1168,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 	double now = starpu_timing_now();
 
 	/* Update the predictions */
-	_starpu_worker_lock(workerid);
+	starpu_worker_lock(workerid);
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	fifo->exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
@@ -1226,7 +1226,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
 
 	fifo->ntasks++;
 
-	_starpu_worker_unlock(workerid);
+	starpu_worker_unlock(workerid);
 }
 
 static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id)
@@ -1234,9 +1234,9 @@ static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
-	_starpu_worker_lock_self();
+	starpu_worker_lock_self();
 	_starpu_fifo_task_finished(fifo, task, dt->num_priorities);
-	_starpu_worker_unlock_self();
+	starpu_worker_unlock_self();
 }
 
 struct starpu_sched_policy _starpu_sched_dm_policy =

+ 3 - 3
src/sched_policies/heteroprio.c

@@ -592,7 +592,7 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 				   && hp->workers_heteroprio[victim].tasks_queue.ntasks)
 				{
 					/* ensure the worker is not currently prefetching its data */
-					_starpu_worker_lock(victim);
+					starpu_worker_lock(victim);
 
 					if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
 					   && hp->workers_heteroprio[victim].tasks_queue.ntasks)
@@ -603,10 +603,10 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 						/* we steal a task update global counter */
 						hp->nb_prefetched_tasks_per_arch_index[hp->workers_heteroprio[victim].arch_index] -= 1;
 
-						_starpu_worker_unlock(victim);
+						starpu_worker_unlock(victim);
 						goto done;
 					}
-					_starpu_worker_unlock(victim);
+					starpu_worker_unlock(victim);
 				}
 			}
 		}

+ 2 - 2
src/sched_policies/parallel_eager.c

@@ -358,11 +358,11 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 	for (i = 1; i < worker_size; i++)
 	{
 		int local_worker = combined_workerid[i];
-		_starpu_worker_lock(local_worker);
+		starpu_worker_lock(local_worker);
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 		starpu_wake_worker_locked(local_worker);
 #endif
-		_starpu_worker_unlock(local_worker);
+		starpu_worker_unlock(local_worker);
 	}
 
 ret:

+ 8 - 8
src/sched_policies/parallel_heft.c

@@ -92,12 +92,12 @@ static void parallel_heft_pre_exec_hook(struct starpu_task *task, unsigned sched
 
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
-	_starpu_worker_lock_self();
+	starpu_worker_lock_self();
 	worker_exp_len[workerid] -= model + transfer_model;
 	worker_exp_start[workerid] = now + model;
 	worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
 	ntasks[workerid]--;
-	_starpu_worker_unlock_self();
+	starpu_worker_unlock_self();
 }
 
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio, unsigned sched_ctx_id)
@@ -114,7 +114,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	if (!starpu_worker_is_combined_worker(best_workerid))
 	{
-		_starpu_worker_lock(best_workerid);
+		starpu_worker_lock(best_workerid);
 		task->predicted = exp_end_predicted - worker_exp_end[best_workerid];
 		/* TODO */
 		task->predicted_transfer = 0;
@@ -123,7 +123,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		worker_exp_start[best_workerid] = exp_end_predicted - worker_exp_len[best_workerid];
 
 		ntasks[best_workerid]++;
-		_starpu_worker_unlock(best_workerid);
+		starpu_worker_unlock(best_workerid);
 
 		/* We don't want it to interlace its task with a combined
 		 * worker's one */
@@ -164,13 +164,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 			/* TODO */
 			alias->predicted_transfer = 0;
 			alias->destroy = 1;
-			_starpu_worker_lock(local_combined_workerid);
+			starpu_worker_lock(local_combined_workerid);
 			worker_exp_len[local_combined_workerid] += alias->predicted;
 			worker_exp_end[local_combined_workerid] = exp_end_predicted;
 			worker_exp_start[local_combined_workerid] = exp_end_predicted - worker_exp_len[local_combined_workerid];
 
 			ntasks[local_combined_workerid]++;
-			_starpu_worker_unlock(local_combined_workerid);
+			starpu_worker_unlock(local_combined_workerid);
 
 			_STARPU_TRACE_JOB_PUSH(alias, alias->priority > 0);
 			ret |= starpu_push_local_task(local_combined_workerid, alias, prio);
@@ -308,12 +308,12 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 		if(!starpu_worker_is_combined_worker(workerid))
 		{
 			/* Sometimes workers didn't take the tasks as early as we expected */
-			_starpu_worker_lock(workerid);
+			starpu_worker_lock(workerid);
 			worker_exp_start[workerid] = STARPU_MAX(worker_exp_start[workerid], now);
 			worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
 			if (worker_exp_end[workerid] > max_exp_end)
 				max_exp_end = worker_exp_end[workerid];
-			_starpu_worker_unlock(workerid);
+			starpu_worker_unlock(workerid);
 		}
 	}
 

+ 3 - 3
src/sched_policies/work_stealing_policy.c

@@ -588,7 +588,7 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		record_worker_locality(ws, task, workerid, sched_ctx_id);
 		locality_popped_task(ws, task, victim, sched_ctx_id);
 	}
-	_starpu_worker_unlock(victim);
+	starpu_worker_unlock(victim);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
         /* While stealing, perhaps somebody actually give us a task, don't miss
@@ -644,7 +644,7 @@ int ws_push_task(struct starpu_task *task)
 	if (workerid == -1 || !starpu_sched_ctx_contains_worker(workerid, sched_ctx_id) ||
 			!starpu_worker_can_execute_task_first_impl(workerid, task, NULL))
 		workerid = select_worker(ws, task, sched_ctx_id);
-	_starpu_worker_lock(workerid);
+	starpu_worker_lock(workerid);
 	STARPU_AYU_ADDTOTASKQUEUE(starpu_task_get_job_id(task), workerid);
 	starpu_sched_task_break(task);
 	record_data_locality(task, workerid);
@@ -653,7 +653,7 @@ int ws_push_task(struct starpu_task *task)
 	locality_pushed_task(ws, task, workerid, sched_ctx_id);
 
 	starpu_push_task_end(task);
-	_starpu_worker_unlock(workerid);
+	starpu_worker_unlock(workerid);
 	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, workerid);
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)