Browse Source

Wrap sched_ctx rwlock operations in helper functions so that functions which require the lock to be held in write mode can assert that invariant.

Olivier Aumage 8 years ago
parent
commit
d86e8dcc0f
4 changed files with 79 additions and 62 deletions
  1. 30 49
      src/core/sched_ctx.c
  2. 42 3
      src/core/sched_ctx.h
  3. 6 9
      src/core/sched_policy.c
  4. 1 1
      src/core/workers.h

+ 30 - 49
src/core/sched_ctx.c

@@ -21,8 +21,6 @@
 #include <stdarg.h>
 #include <core/task.h>
 
-starpu_pthread_rwlock_t ctx_mutex[STARPU_NMAX_SCHED_CTXS];
-
 static starpu_pthread_mutex_t sched_ctx_manag = STARPU_PTHREAD_MUTEX_INITIALIZER;
 static starpu_pthread_mutex_t finished_submit_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
 static struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
@@ -554,6 +552,8 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	sched_ctx->sms_end_idx = STARPU_NMAXSMS;
 	sched_ctx->nsms = nsms;
 	sched_ctx->stream_worker = -1;
+	sched_ctx->lock_write_owner = 0;
+	STARPU_PTHREAD_RWLOCK_INIT(&sched_ctx->rwlock, NULL);
 	if(nsms > 0)
 	{
 		STARPU_ASSERT_MSG(workerids, "workerids is needed when setting nsms");
@@ -597,15 +597,6 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
 	}
 
-	if(is_initial_sched)
-	{
-		/*initialize the mutexes for all contexts */
-		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-		  {
-			STARPU_PTHREAD_RWLOCK_INIT(&ctx_mutex[i], NULL);
-		  }
-	}
-
         /*add sub_ctxs before add workers, in order to be able to associate them if necessary */
 	if(nsub_ctxs != 0)
 	{
@@ -1131,10 +1122,10 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	}
 #endif //STARPU_USE_SC_HYPERVISOR
 
+	_starpu_sched_ctx_lock_write(sched_ctx_id);
 	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
 	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);
 
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&ctx_mutex[sched_ctx_id]);
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
 
 	int *workerids;
@@ -1170,7 +1161,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	}
 	unlock_workers_for_changing_ctx(nworkers_ctx, backup_workerids);
 
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&ctx_mutex[sched_ctx_id]);
+	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list, don't forget to free it when
 	   you don't use it anymore */
 	free(workerids);
@@ -1184,17 +1175,22 @@ void _starpu_delete_all_sched_ctxs()
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	{
+		_starpu_sched_ctx_lock_write(i);
 		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
-		STARPU_PTHREAD_RWLOCK_WRLOCK(&ctx_mutex[i]);
 		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 		{
 			_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
 			_starpu_barrier_counter_destroy(&sched_ctx->ready_tasks_barrier);
+			_starpu_sched_ctx_unlock_write(i);
+			STARPU_PTHREAD_RWLOCK_DESTROY(&sched_ctx->rwlock);
 			_starpu_delete_sched_ctx(sched_ctx);
 		}
-		STARPU_PTHREAD_RWLOCK_UNLOCK(&ctx_mutex[i]);
-		STARPU_PTHREAD_RWLOCK_DESTROY(&ctx_mutex[i]);
+		else
+		{
+			_starpu_sched_ctx_unlock_write(i);
+			STARPU_PTHREAD_RWLOCK_DESTROY(&sched_ctx->rwlock);
+		}
 	}
 
 	STARPU_PTHREAD_KEY_DELETE(sched_ctx_key);
@@ -1240,9 +1236,9 @@ unsigned _starpu_can_push_task(struct _starpu_sched_ctx *sched_ctx, struct starp
 	{
 		if (window_size == 0.0) return 1;
 
-		STARPU_PTHREAD_RWLOCK_RDLOCK(&ctx_mutex[sched_ctx->id]);
+		_starpu_sched_ctx_lock_read(sched_ctx->id);
 		double expected_end = sched_ctx->sched_policy->simulate_push_task(task);
-		STARPU_PTHREAD_RWLOCK_UNLOCK(&ctx_mutex[sched_ctx->id]);
+		_starpu_sched_ctx_unlock_read(sched_ctx->id);
 
 		double expected_len = 0.0;
 		if(hyp_actual_start_sample[sched_ctx->id] != 0.0)
@@ -1357,11 +1353,11 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
 	_starpu_check_workers(workers_to_add, nworkers_to_add);
 
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&ctx_mutex[sched_ctx_id]);
+	_starpu_sched_ctx_lock_write(sched_ctx_id);
 	lock_workers_for_changing_ctx(nworkers_to_add, workers_to_add);
 	add_locked_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
 	unlock_workers_for_changing_ctx(nworkers_to_add, workers_to_add);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&ctx_mutex[sched_ctx_id]);
+	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 }
 
 void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_remove, unsigned sched_ctx_id)
@@ -1370,7 +1366,7 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 
 	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
 
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&ctx_mutex[sched_ctx_id]);
+	_starpu_sched_ctx_lock_write(sched_ctx_id);
 	/* if the context has not already been deleted */
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
@@ -1387,7 +1383,7 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 		}
 		unlock_workers_for_changing_ctx(nworkers_to_remove, workers_to_remove);
 	}
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&ctx_mutex[sched_ctx_id]);
+	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 
 	return;
 }
@@ -1396,7 +1392,7 @@ int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _star
 {
 	unsigned nworkers = 0;
 
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&ctx_mutex[sched_ctx->id]);
+	_starpu_sched_ctx_lock_read(sched_ctx->id);
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 
 	struct starpu_sched_ctx_iterator it;
@@ -1409,7 +1405,7 @@ int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _star
 		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
 			nworkers++;
 	}
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&ctx_mutex[sched_ctx->id]);
+	_starpu_sched_ctx_unlock_read(sched_ctx->id);
 
 	return nworkers;
 }
@@ -1562,8 +1558,7 @@ unsigned _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, doub
 
 	if(!sched_ctx->is_initial_sched)
 	{
-		starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
+		_starpu_sched_ctx_lock_write(sched_ctx->id);
 	}
 
 	_starpu_barrier_counter_increment(&sched_ctx->ready_tasks_barrier, ready_flops);
@@ -1577,8 +1572,7 @@ unsigned _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, doub
 			ret = 0;
 		}
 
-		starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+		_starpu_sched_ctx_unlock_write(sched_ctx->id);
 	}
 	return ret;
 }
@@ -1589,8 +1583,7 @@ void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double r
 
 	if(!sched_ctx->is_initial_sched)
 	{
-		starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
+		_starpu_sched_ctx_lock_write(sched_ctx->id);
 	}
 
 	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);
@@ -1599,8 +1592,7 @@ void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double r
 	if(!sched_ctx->is_initial_sched)
 	{
 		_starpu_fetch_task_from_waiting_list(sched_ctx);
-		starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+		_starpu_sched_ctx_unlock_write(sched_ctx->id);
 	}
 
 }
@@ -1841,11 +1833,6 @@ int _starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starp
 	return npus;
 }
 
-starpu_pthread_rwlock_t* _starpu_sched_ctx_get_mutex(unsigned sched_ctx_id)
-{
-	return &ctx_mutex[sched_ctx_id];
-}
-
 unsigned starpu_sched_ctx_get_nworkers(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -2273,19 +2260,17 @@ void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task,
 	   hence our counters are useless */
 	if (_starpu_get_nsched_ctxs() > 1)
 	{
-		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 		struct starpu_sched_ctx_iterator it;
 
 		workers->init_iterator_for_parallel_tasks(workers, &it, task);
-		starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
 		while(workers->has_next(workers, &it))
 		{
 			int worker = workers->get_next(workers, &it);
 			starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, worker);
 		}
-		STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	}
 }
 
@@ -2301,12 +2286,10 @@ void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task,
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
 		}
 
-		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 		struct starpu_sched_ctx_iterator it;
 		workers->init_iterator_for_parallel_tasks(workers, &it, task);
-		starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
 		while(workers->has_next(workers, &it))
 		{
 			int worker = workers->get_next(workers, &it);
@@ -2319,7 +2302,7 @@ void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task,
 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
 			}
 		}
-		STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 
 		if(curr_workerid != -1)
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);
@@ -2338,12 +2321,10 @@ void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, uns
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
 		}
 
-		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 		struct starpu_sched_ctx_iterator it;
 		workers->init_iterator_for_parallel_tasks(workers, &it, task);
-		starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-		STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
 		while(workers->has_next(workers, &it))
 		{
 			int worker = workers->get_next(workers, &it);
@@ -2355,7 +2336,7 @@ void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, uns
 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
 			}
 		}
-		STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 
 		if(curr_workerid != -1)
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);

+ 42 - 3
src/core/sched_ctx.h

@@ -175,6 +175,9 @@ struct _starpu_sched_ctx
 	int sms_end_idx;
 
 	int stream_worker;
+
+	starpu_pthread_rwlock_t rwlock;
+	starpu_pthread_t lock_write_owner;
 };
 
 struct _starpu_machine_config;
@@ -226,9 +229,6 @@ void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker
 /* Check if the worker belongs to another sched_ctx */
 unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_id);
 
-/* mutex synchronising several simultaneous modifications of a context */
-starpu_pthread_rwlock_t* _starpu_sched_ctx_get_mutex(unsigned sched_ctx_id);
-
 /* indicates wheather this worker should go to sleep or not 
    (if it is the last one awake in a context he should better keep awake) */
 unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker);
@@ -263,4 +263,43 @@ struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(st
 #define _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(w,j) \
 	(_starpu_get_nsched_ctxs() <= 1 ? _starpu_get_sched_ctx_struct(0) : __starpu_sched_ctx_get_sched_ctx_for_worker_and_job((w),(j)))
 
+static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id);
+
+static inline int _starpu_sched_ctx_check_write_locked(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	return sched_ctx->lock_write_owner == pthread_self();
+}
+#define STARPU_SCHED_CTX_CHECK_LOCK(ctx) STARPU_ASSERT(_starpu_sched_ctx_check_write_locked((ctx)))
+
+static inline void _starpu_sched_ctx_lock_write(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	STARPU_ASSERT(sched_ctx->lock_write_owner != pthread_self());
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&sched_ctx->rwlock);
+	sched_ctx->lock_write_owner = pthread_self();
+}
+
+static inline void _starpu_sched_ctx_unlock_write(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	STARPU_ASSERT(sched_ctx->lock_write_owner == pthread_self());
+	sched_ctx->lock_write_owner = 0;
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&sched_ctx->rwlock);
+}
+
+static inline void _starpu_sched_ctx_lock_read(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	STARPU_ASSERT(sched_ctx->lock_write_owner != pthread_self());
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&sched_ctx->rwlock);
+}
+
+static inline void _starpu_sched_ctx_unlock_read(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	STARPU_ASSERT(sched_ctx->lock_write_owner != pthread_self());
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&sched_ctx->rwlock);
+}
+
 #endif // __SCHED_CONTEXT_H__

+ 6 - 9
src/core/sched_policy.c

@@ -431,10 +431,9 @@ int _starpu_repush_task(struct _starpu_job *j)
 
 		if(nworkers == 0)
 		{
-			starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-			STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
+			_starpu_sched_ctx_lock_write(sched_ctx->id);
 			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
-			STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+			_starpu_sched_ctx_unlock_write(sched_ctx->id);
 #ifdef STARPU_USE_SC_HYPERVISOR
 			if(sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
 			   && sched_ctx->perf_counters->notify_empty_ctx)
@@ -498,10 +497,9 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 
 		if (nworkers == 0)
 		{
-			starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-			STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
+			_starpu_sched_ctx_lock_write(sched_ctx->id);
 			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
-			STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+			_starpu_sched_ctx_unlock_write(sched_ctx->id);
 #ifdef STARPU_USE_SC_HYPERVISOR
 			if(sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
 			   && sched_ctx->perf_counters->notify_empty_ctx)
@@ -591,8 +589,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 		{
 			STARPU_ASSERT(sched_ctx->sched_policy->push_task);
 			/* check out if there are any workers in the context */
-			starpu_pthread_rwlock_t *ctx_mutex = _starpu_sched_ctx_get_mutex(sched_ctx->id);
-			STARPU_PTHREAD_RWLOCK_WRLOCK(ctx_mutex);
+			_starpu_sched_ctx_lock_write(sched_ctx->id);
 			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
 			if (nworkers == 0)
 				ret = -1;
@@ -603,7 +600,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				ret = sched_ctx->sched_policy->push_task(task);
 				_STARPU_SCHED_END;
 			}
-			STARPU_PTHREAD_RWLOCK_UNLOCK(ctx_mutex);
+			_starpu_sched_ctx_unlock_write(sched_ctx->id);
 		}
 
 		if(ret == -1)

+ 1 - 1
src/core/workers.h

@@ -508,7 +508,7 @@ static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
 	return &_starpu_config.workers[id];
 }
 
-/* Returns the starpu_sched_ctx structure that descriebes the state of the 
+/* Returns the starpu_sched_ctx structure that describes the state of the 
  * specified ctx */
 static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
 {