Browse Source

Check the return value of pthread_mutex_lock(...) and pthread_mutex_unlock(...)

Nathalie Furmento 15 years ago
parent
commit
cde4ad65e2

+ 3 - 0
src/common/utils.h

@@ -24,4 +24,7 @@
 int _starpu_mkpath(const char *s, mode_t mode);
 int _starpu_check_mutex_deadlock(pthread_mutex_t *mutex);
 
+#define PTHREAD_MUTEX_LOCK(mutex) { int ret = pthread_mutex_lock(mutex); if (STARPU_UNLIKELY(ret)) { perror("pthread_mutex_lock"); STARPU_ABORT(); }}
+#define PTHREAD_MUTEX_UNLOCK(mutex) { int ret = pthread_mutex_unlock(mutex); if (STARPU_UNLIKELY(ret)) { perror("pthread_mutex_unlock"); STARPU_ABORT(); }}
+
 #endif // __COMMON_UTILS_H__

+ 3 - 2
src/core/debug.c

@@ -16,6 +16,7 @@
 
 #include <common/config.h>
 #include <core/debug.h>
+#include <common/utils.h>
 
 #ifdef STARPU_VERBOSE
 /* we want a single writer at the same time to have a log that is readable */
@@ -52,9 +53,9 @@ void _starpu_print_to_logfile(const char *format __attribute__((unused)), ...)
 #ifdef STARPU_VERBOSE
 	va_list args;
 	va_start(args, format);
-	pthread_mutex_lock(&logfile_mutex);
+	PTHREAD_MUTEX_LOCK(&logfile_mutex);
 	vfprintf(logfile, format, args);
-	pthread_mutex_unlock(&logfile_mutex);
+	PTHREAD_MUTEX_UNLOCK(&logfile_mutex);
 	va_end( args );
 #endif
 }

+ 3 - 2
src/core/dependencies/cg.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <common/utils.h>
 #include <core/jobs.h>
 #include <core/dependencies/cg.h>
 #include <core/dependencies/tags.h>
@@ -94,10 +95,10 @@ void _starpu_notify_cg(starpu_cg_t *cg)
 			case STARPU_CG_APPS:
 				/* this is a cg for an application waiting on a set of
 	 			 * tags, wake the thread */
-				pthread_mutex_lock(&cg->succ.succ_apps.cg_mutex);
+				PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
 				cg->succ.succ_apps.completed = 1;
 				pthread_cond_signal(&cg->succ.succ_apps.cg_cond);
-				pthread_mutex_unlock(&cg->succ.succ_apps.cg_mutex);
+				PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
 				break;
 
 			case STARPU_CG_TAG:

+ 3 - 2
src/core/dependencies/tags.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <common/utils.h>
 #include <core/dependencies/tags.h>
 #include <core/dependencies/htable.h>
 #include <core/jobs.h>
@@ -328,12 +329,12 @@ int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
 		_starpu_spin_unlock(&tag_array[i]->lock);
 	}
 
-	pthread_mutex_lock(&cg->succ.succ_apps.cg_mutex);
+	PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
 
 	if (!cg->succ.succ_apps.completed)
 		pthread_cond_wait(&cg->succ.succ_apps.cg_cond, &cg->succ.succ_apps.cg_mutex);
 
-	pthread_mutex_unlock(&cg->succ.succ_apps.cg_mutex);
+	PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
 
 	pthread_mutex_destroy(&cg->succ.succ_apps.cg_mutex);
 	pthread_cond_destroy(&cg->succ.succ_apps.cg_cond);

+ 3 - 2
src/core/dependencies/task_deps.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <common/utils.h>
 #include <core/dependencies/tags.h>
 #include <core/dependencies/htable.h>
 #include <core/jobs.h>
@@ -74,8 +75,8 @@ void starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, st
 
 		job = _starpu_get_job_associated_to_task(dep_task);
 
-		pthread_mutex_lock(&job->sync_mutex);
+		PTHREAD_MUTEX_LOCK(&job->sync_mutex);
 		_starpu_task_add_succ(job, cg);
-		pthread_mutex_unlock(&job->sync_mutex);
+		PTHREAD_MUTEX_UNLOCK(&job->sync_mutex);
 	}
 }

+ 13 - 12
src/core/jobs.c

@@ -19,6 +19,7 @@
 #include <core/workers.h>
 #include <core/dependencies/data_concurrency.h>
 #include <common/config.h>
+#include <common/utils.h>
 
 size_t _starpu_job_get_data_size(starpu_job_t j)
 {
@@ -78,7 +79,7 @@ void _starpu_wait_job(starpu_job_t j)
 	STARPU_ASSERT(j->task);
 	STARPU_ASSERT(!j->task->detach);
 
-	pthread_mutex_lock(&j->sync_mutex);
+	PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
 	if (!j->terminated)
 		pthread_cond_wait(&j->sync_cond, &j->sync_mutex);
@@ -86,7 +87,7 @@ void _starpu_wait_job(starpu_job_t j)
 	/* reset the job state so that it can be reused again */
 	j->terminated = 0;
 
-	pthread_mutex_unlock(&j->sync_mutex);
+	PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 }
 
 void _starpu_handle_job_termination(starpu_job_t j)
@@ -100,9 +101,9 @@ void _starpu_handle_job_termination(starpu_job_t j)
 	/* We must have set the j->terminated flag early, so that it is
 	 * possible to express task dependencies within the callback
 	 * function. */
-	pthread_mutex_lock(&j->sync_mutex);
+	PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->terminated = 1;
-	pthread_mutex_unlock(&j->sync_mutex);
+	PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 	/* the callback is executed after the dependencies so that we may remove the tag 
  	 * of the task itself */
@@ -134,9 +135,9 @@ void _starpu_handle_job_termination(starpu_job_t j)
 	{
 		/* we do not desallocate the job structure if some is going to
 		 * wait after the task */
-		pthread_mutex_lock(&j->sync_mutex);
+		PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 		pthread_cond_broadcast(&j->sync_cond);
-		pthread_mutex_unlock(&j->sync_mutex);
+		PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 	}
 	else {
 		/* no one is going to synchronize with that task so we release
@@ -199,7 +200,7 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(starpu_job_t j)
 
 	struct starpu_cg_list_s *job_successors = &j->job_successors;
 
-	pthread_mutex_lock(&j->sync_mutex);	
+	PTHREAD_MUTEX_LOCK(&j->sync_mutex);	
 
 	if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
 	{
@@ -212,7 +213,7 @@ static unsigned _starpu_not_all_task_deps_are_fulfilled(starpu_job_t j)
 		ret = 0;
 	}
 
-	pthread_mutex_unlock(&j->sync_mutex);
+	PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
 	return ret;
 }
@@ -283,12 +284,12 @@ struct starpu_job_s *_starpu_pop_local_task(struct starpu_worker_s *worker)
 {
 	struct starpu_job_s *j = NULL;
 
-	pthread_mutex_lock(&worker->local_jobs_mutex);
+	PTHREAD_MUTEX_LOCK(&worker->local_jobs_mutex);
 
 	if (!starpu_job_list_empty(worker->local_jobs))
 		j = starpu_job_list_pop_back(worker->local_jobs);
 
-	pthread_mutex_unlock(&worker->local_jobs_mutex);
+	PTHREAD_MUTEX_UNLOCK(&worker->local_jobs_mutex);
 
 	return j;
 }
@@ -300,11 +301,11 @@ int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_job_s
 	if (STARPU_UNLIKELY(!(worker->worker_mask & j->task->cl->where)))
 		return -ENODEV;
 
-	pthread_mutex_lock(&worker->local_jobs_mutex);
+	PTHREAD_MUTEX_LOCK(&worker->local_jobs_mutex);
 
 	starpu_job_list_push_front(worker->local_jobs, j);
 
-	pthread_mutex_unlock(&worker->local_jobs_mutex);
+	PTHREAD_MUTEX_UNLOCK(&worker->local_jobs_mutex);
 
 	/* XXX that's a bit excessive ... */
 	_starpu_wake_all_blocked_workers_on_node(worker->memory_node);

+ 11 - 10
src/core/mechanisms/deque_queues.c

@@ -17,6 +17,7 @@
 #include <pthread.h>
 #include <core/mechanisms/deque_queues.h>
 #include <errno.h>
+#include <common/utils.h>
 
 /* keep track of the total number of jobs to be scheduled to avoid infinite 
  * polling when there are really few jobs in the overall queue */
@@ -115,13 +116,13 @@ int _starpu_deque_push_task(struct starpu_jobq_s *q, starpu_job_t task)
 	struct starpu_deque_jobq_s *deque_queue = q->queue;
 
 	/* if anyone is blocked on the entire machine, wake it up */
-	pthread_mutex_lock(sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	total_number_of_jobs++;
 	pthread_cond_signal(sched_cond);
-	pthread_mutex_unlock(sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	/* wake people waiting locally */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_front(deque_queue->jobq, task);
@@ -129,7 +130,7 @@ int _starpu_deque_push_task(struct starpu_jobq_s *q, starpu_job_t task)
 	deque_queue->nprocessed++;
 
 	pthread_cond_signal(&q->activity_cond);
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 
 	return 0;
 }
@@ -158,9 +159,9 @@ starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q)
 
 		/* we are sure that we got it now, so at worst, some people thought 
 		 * there remained some work and will soon discover it is not true */
-		pthread_mutex_lock(sched_mutex);
+		PTHREAD_MUTEX_LOCK(sched_mutex);
 		total_number_of_jobs--;
-		pthread_mutex_unlock(sched_mutex);
+		PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 	
 	return j;
@@ -174,7 +175,7 @@ struct starpu_job_list_s * _starpu_deque_pop_every_task(struct starpu_jobq_s *q,
 	struct starpu_deque_jobq_s *deque_queue = q->queue;
 
 	/* block until some task is available in that queue */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	if (deque_queue->njobs == 0)
 	{
@@ -219,13 +220,13 @@ struct starpu_job_list_s * _starpu_deque_pop_every_task(struct starpu_jobq_s *q,
 	
 			/* we are sure that we got it now, so at worst, some people thought
 			 * there remained some work and will soon discover it is not true */
-			pthread_mutex_lock(sched_mutex);
+			PTHREAD_MUTEX_LOCK(sched_mutex);
 			total_number_of_jobs -= new_list_size;
-			pthread_mutex_unlock(sched_mutex);
+			PTHREAD_MUTEX_UNLOCK(sched_mutex);
 		}
 	}
 	
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 
 	return new_list;
 }

+ 11 - 10
src/core/mechanisms/fifo_queues.c

@@ -17,6 +17,7 @@
 #include <pthread.h>
 #include <core/mechanisms/fifo_queues.h>
 #include <errno.h>
+#include <common/utils.h>
 
 static pthread_cond_t *sched_cond;
 static pthread_mutex_t *sched_mutex;
@@ -79,12 +80,12 @@ int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
 	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
 
 	/* if anyone is blocked on the entire machine, wake it up */
-	pthread_mutex_lock(sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	pthread_cond_signal(sched_cond);
-	pthread_mutex_unlock(sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	
 	/* wake people waiting locally */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_back(fifo_queue->jobq, task);
@@ -92,7 +93,7 @@ int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
 	fifo_queue->nprocessed++;
 
 	pthread_cond_signal(&q->activity_cond);
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 
 	return 0;
 #else
@@ -106,12 +107,12 @@ int _starpu_fifo_push_task(struct starpu_jobq_s *q, starpu_job_t task)
 	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
 
 	/* if anyone is blocked on the entire machine, wake it up */
-	pthread_mutex_lock(sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	pthread_cond_signal(sched_cond);
-	pthread_mutex_unlock(sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	
 	/* wake people waiting locally */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_front(fifo_queue->jobq, task);
@@ -119,7 +120,7 @@ int _starpu_fifo_push_task(struct starpu_jobq_s *q, starpu_job_t task)
 	fifo_queue->nprocessed++;
 
 	pthread_cond_signal(&q->activity_cond);
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 
 	return 0;
 }
@@ -157,7 +158,7 @@ struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q,
 	STARPU_ASSERT(q);
 	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
 
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	size = fifo_queue->njobs;
 
@@ -202,7 +203,7 @@ struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q,
 		}
 	}
 
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 
 	return new_list;
 }

+ 7 - 6
src/core/mechanisms/priority_queues.c

@@ -17,6 +17,7 @@
 #include <starpu.h>
 #include <common/config.h>
 #include <core/mechanisms/priority_queues.h>
+#include <common/utils.h>
 
 /*
  * Centralized queue with priorities 
@@ -88,12 +89,12 @@ int _starpu_priority_push_task(struct starpu_jobq_s *q, starpu_job_t j)
 	struct starpu_priority_jobq_s *queue = q->queue;
 
 	/* if anyone is blocked on the entire machine, wake it up */
-	pthread_mutex_lock(sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	pthread_cond_signal(sched_cond);
-	pthread_mutex_unlock(sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	/* wake people waiting locally */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(j, 1);
 	
@@ -104,7 +105,7 @@ int _starpu_priority_push_task(struct starpu_jobq_s *q, starpu_job_t j)
 	queue->total_njobs++;
 
 	pthread_cond_signal(&q->activity_cond);
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 
 	return 0;
 }
@@ -117,7 +118,7 @@ starpu_job_t _starpu_priority_pop_task(struct starpu_jobq_s *q)
 	struct starpu_priority_jobq_s *queue = q->queue;
 
 	/* block until some event happens */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	if ((queue->total_njobs == 0) && _starpu_machine_is_running())
 	{
@@ -142,7 +143,7 @@ starpu_job_t _starpu_priority_pop_task(struct starpu_jobq_s *q)
 		} while (!j && priolevel-- > 0);
 	}
 
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 
 	return j;
 }

+ 3 - 14
src/core/mechanisms/queues.c

@@ -15,6 +15,7 @@
  */
 
 #include "queues.h"
+#include <common/utils.h>
 
 /*
  * There can be various queue designs
@@ -81,24 +82,12 @@ void _starpu_jobq_lock(struct starpu_jobq_s *jobq)
 {
 //	_starpu_check_mutex_deadlock(&jobq->activity_mutex);
 
-	int ret;
-	ret = pthread_mutex_lock(&jobq->activity_mutex);	
-	if (STARPU_UNLIKELY(ret))
-	{
-		perror("pthread_mutex_lock");
-		STARPU_ABORT();
-	}
+        PTHREAD_MUTEX_LOCK(&jobq->activity_mutex);
 }
 
 void _starpu_jobq_unlock(struct starpu_jobq_s *jobq)
 {
-	int ret;
-	ret = pthread_mutex_unlock(&jobq->activity_mutex);	
-	if (STARPU_UNLIKELY(ret))
-	{
-		perror("pthread_mutex_lock");
-		STARPU_ABORT();
-	}
+	PTHREAD_MUTEX_UNLOCK(&jobq->activity_mutex);
 }
 
 int _starpu_jobq_trylock(struct starpu_jobq_s *jobq)

+ 11 - 10
src/core/mechanisms/stack_queues.c

@@ -17,6 +17,7 @@
 #include <pthread.h>
 #include <core/mechanisms/stack_queues.h>
 #include <errno.h>
+#include <common/utils.h>
 
 /* keep track of the total number of jobs to be scheduled to avoid infinite 
  * polling when there are really few jobs in the overall queue */
@@ -91,13 +92,13 @@ void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
 	struct starpu_stack_jobq_s *stack_queue = q->queue;
 
 	/* if anyone is blocked on the entire machine, wake it up */
-	pthread_mutex_lock(sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	total_number_of_jobs++;
 	pthread_cond_signal(sched_cond);
-	pthread_mutex_unlock(sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	/* wake people waiting locally */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_back(stack_queue->jobq, task);
@@ -105,7 +106,7 @@ void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
 	deque_queue->nprocessed++;
 
 	pthread_cond_signal(&q->activity_cond);
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 #else
 	_starpu_stack_push_task(q, task);
 #endif
@@ -117,13 +118,13 @@ void _starpu_stack_push_task(struct starpu_jobq_s *q, starpu_job_t task)
 	struct starpu_stack_jobq_s *stack_queue = q->queue;
 
 	/* if anyone is blocked on the entire machine, wake it up */
-	pthread_mutex_lock(sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	total_number_of_jobs++;
 	pthread_cond_signal(sched_cond);
-	pthread_mutex_unlock(sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	/* wake people waiting locally */
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_front(stack_queue->jobq, task);
@@ -131,7 +132,7 @@ void _starpu_stack_push_task(struct starpu_jobq_s *q, starpu_job_t task)
 	deque_queue->nprocessed++;
 
 	pthread_cond_signal(&q->activity_cond);
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 }
 
 starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q)
@@ -156,9 +157,9 @@ starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q)
 
 		/* we are sure that we got it now, so at worst, some people thought 
 		 * there remained some work and will soon discover it is not true */
-		pthread_mutex_lock(sched_mutex);
+		PTHREAD_MUTEX_LOCK(sched_mutex);
 		total_number_of_jobs--;
-		pthread_mutex_unlock(sched_mutex);
+		PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 	
 	return j;

+ 3 - 2
src/core/policies/sched_policy.c

@@ -18,6 +18,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <common/utils.h>
 #include <core/mechanisms/queues.h>
 #include <core/policies/sched_policy.h>
 #include <core/policies/no_prio_policy.h>
@@ -272,7 +273,7 @@ void _starpu_wait_on_sched_event(void)
 {
 	struct starpu_jobq_s *q = policy.starpu_get_local_queue(&policy);
 
-	pthread_mutex_lock(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	_starpu_handle_all_pending_node_data_requests(_starpu_get_local_memory_node());
 
@@ -283,5 +284,5 @@ void _starpu_wait_on_sched_event(void)
 #endif
 	}
 
-	pthread_mutex_unlock(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 }

+ 8 - 7
src/core/progress_hook.c

@@ -16,6 +16,7 @@
 
 #include <pthread.h>
 #include <core/workers.h>
+#include <common/utils.h>
 
 #define NMAXHOOKS	16
 
@@ -32,7 +33,7 @@ static struct progression_hook hooks[NMAXHOOKS] = {{NULL, NULL, 0}};
 int starpu_register_progression_hook(unsigned (*func)(void *arg), void *arg)
 {
 	int hook;
-	pthread_mutex_lock(&progression_hook_mutex);
+	PTHREAD_MUTEX_LOCK(&progression_hook_mutex);
 	for (hook = 0; hook < NMAXHOOKS; hook++)
 	{
 		if (!hooks[hook].active)
@@ -42,13 +43,13 @@ int starpu_register_progression_hook(unsigned (*func)(void *arg), void *arg)
 			hooks[hook].arg = arg;
 			hooks[hook].active = 1;
 
-			pthread_mutex_unlock(&progression_hook_mutex);
+			PTHREAD_MUTEX_UNLOCK(&progression_hook_mutex);
 			
 			return hook;
 		}
 	}
 
-	pthread_mutex_unlock(&progression_hook_mutex);
+	PTHREAD_MUTEX_UNLOCK(&progression_hook_mutex);
 
 	starpu_wake_all_blocked_workers();
 
@@ -58,9 +59,9 @@ int starpu_register_progression_hook(unsigned (*func)(void *arg), void *arg)
 
 void starpu_deregister_progression_hook(int hook_id)
 {
-	pthread_mutex_lock(&progression_hook_mutex);
+	PTHREAD_MUTEX_LOCK(&progression_hook_mutex);
 	hooks[hook_id].active = 0;
-	pthread_mutex_unlock(&progression_hook_mutex);
+	PTHREAD_MUTEX_UNLOCK(&progression_hook_mutex);
 }
 
 unsigned _starpu_execute_registered_progression_hooks(void)
@@ -74,9 +75,9 @@ unsigned _starpu_execute_registered_progression_hooks(void)
 	{
 		unsigned active;
 
-		pthread_mutex_lock(&progression_hook_mutex);
+		PTHREAD_MUTEX_LOCK(&progression_hook_mutex);
 		active = hooks[hook].active;
-		pthread_mutex_unlock(&progression_hook_mutex);
+		PTHREAD_MUTEX_UNLOCK(&progression_hook_mutex);
 
 		unsigned may_block_hook = 1;
 

+ 7 - 6
src/core/task.c

@@ -19,6 +19,7 @@
 #include <core/jobs.h>
 #include <core/task.h>
 #include <common/config.h>
+#include <common/utils.h>
 
 /* XXX this should be reinitialized when StarPU is shutdown (or we should make
  * sure that no task remains !) */
@@ -244,7 +245,7 @@ int starpu_wait_all_tasks(void)
 	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
 		return -EDEADLK;
 
-	pthread_mutex_lock(&submitted_mutex);
+	PTHREAD_MUTEX_LOCK(&submitted_mutex);
 
 	if (nsubmitted > 0)
 	{
@@ -252,14 +253,14 @@ int starpu_wait_all_tasks(void)
 		STARPU_ASSERT(!res);
 	}
 	
-	pthread_mutex_unlock(&submitted_mutex);
+	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
 
 	return 0;
 }
 
 void _starpu_decrement_nsubmitted_tasks(void)
 {
-	pthread_mutex_lock(&submitted_mutex);
+	PTHREAD_MUTEX_LOCK(&submitted_mutex);
 	if (--nsubmitted == 0)
 	{
 		int broadcast_res;
@@ -267,15 +268,15 @@ void _starpu_decrement_nsubmitted_tasks(void)
 		STARPU_ASSERT(!broadcast_res);
 
 	}
-	pthread_mutex_unlock(&submitted_mutex);
+	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
 
 }
 
 static void _starpu_increment_nsubmitted_tasks(void)
 {
-	pthread_mutex_lock(&submitted_mutex);
+	PTHREAD_MUTEX_LOCK(&submitted_mutex);
 	nsubmitted++;
-	pthread_mutex_unlock(&submitted_mutex);
+	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
 }
 
 void _starpu_initialize_current_task_key(void)

+ 21 - 20
src/core/workers.c

@@ -17,6 +17,7 @@
 #include <stdlib.h>
 #include <stdio.h>
 #include <common/config.h>
+#include <common/utils.h>
 #include <core/workers.h>
 #include <core/debug.h>
 #include <core/task.h>
@@ -164,11 +165,11 @@ static void _starpu_init_workers(struct starpu_machine_config_s *config)
 					pthread_create(&gordon_worker_set.worker_thread, NULL, 
 							_starpu_gordon_worker, &gordon_worker_set);
 
-					pthread_mutex_lock(&gordon_worker_set.mutex);
+					PTHREAD_MUTEX_LOCK(&gordon_worker_set.mutex);
 					if (!gordon_worker_set.set_is_initialized)
 						pthread_cond_wait(&gordon_worker_set.ready_cond,
 									&gordon_worker_set.mutex);
-					pthread_mutex_unlock(&gordon_worker_set.mutex);
+					PTHREAD_MUTEX_UNLOCK(&gordon_worker_set.mutex);
 
 					gordon_inited = 1;
 				}
@@ -191,10 +192,10 @@ static void _starpu_init_workers(struct starpu_machine_config_s *config)
 		switch (workerarg->arch) {
 			case STARPU_CPU_WORKER:
 			case STARPU_CUDA_WORKER:
-				pthread_mutex_lock(&workerarg->mutex);
+				PTHREAD_MUTEX_LOCK(&workerarg->mutex);
 				if (!workerarg->worker_is_initialized)
 					pthread_cond_wait(&workerarg->ready_cond, &workerarg->mutex);
-				pthread_mutex_unlock(&workerarg->mutex);
+				PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
 				break;
 #ifdef STARPU_USE_GORDON
 			case STARPU_GORDON_WORKER:
@@ -223,7 +224,7 @@ int starpu_init(struct starpu_conf *user_conf)
 {
 	int ret;
 
-	pthread_mutex_lock(&init_mutex);
+	PTHREAD_MUTEX_LOCK(&init_mutex);
 	while (initialized == CHANGING)
 		/* Wait for the other one changing it */
 		pthread_cond_wait(&init_cond, &init_mutex);
@@ -233,7 +234,7 @@ int starpu_init(struct starpu_conf *user_conf)
 		return 0;
 	/* initialized == UNINITIALIZED */
 	initialized = CHANGING;
-	pthread_mutex_unlock(&init_mutex);
+	PTHREAD_MUTEX_UNLOCK(&init_mutex);
 
 #ifdef __MINGW32__
 	WSADATA wsadata;
@@ -258,12 +259,12 @@ int starpu_init(struct starpu_conf *user_conf)
 
 	ret = _starpu_build_topology(&config);
 	if (ret) {
-		pthread_mutex_lock(&init_mutex);
+		PTHREAD_MUTEX_LOCK(&init_mutex);
 		init_count--;
 		initialized = UNINITIALIZED;
 		/* Let somebody else try to do it */
 		pthread_cond_signal(&init_cond);
-		pthread_mutex_unlock(&init_mutex);
+		PTHREAD_MUTEX_UNLOCK(&init_mutex);
 		return ret;
 	}
 
@@ -278,11 +279,11 @@ int starpu_init(struct starpu_conf *user_conf)
 
 	_starpu_init_workers(&config);
 
-	pthread_mutex_lock(&init_mutex);
+	PTHREAD_MUTEX_LOCK(&init_mutex);
 	initialized = INITIALIZED;
 	/* Tell everybody that we initialized */
 	pthread_cond_broadcast(&init_cond);
-	pthread_mutex_unlock(&init_mutex);
+	PTHREAD_MUTEX_UNLOCK(&init_mutex);
 
 	return 0;
 }
@@ -385,10 +386,10 @@ static void _starpu_operate_on_all_queues_attached_to_node(unsigned nodeid, queu
 				pthread_cond_broadcast(&q->activity_cond);
 				break;
 			case LOCK:
-				pthread_mutex_lock(&q->activity_mutex);
+				PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 				break;
 			case UNLOCK:
-				pthread_mutex_unlock(&q->activity_mutex);
+				PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 				break;
 		}
 	}
@@ -430,10 +431,10 @@ static void _starpu_operate_on_all_queues(queue_op op)
 				pthread_cond_broadcast(&q->activity_cond);
 				break;
 			case LOCK:
-				pthread_mutex_lock(&q->activity_mutex);
+				PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 				break;
 			case UNLOCK:
-				pthread_mutex_unlock(&q->activity_mutex);
+				PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 				break;
 		}
 	}
@@ -451,7 +452,7 @@ static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
 	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
 
 	_starpu_operate_on_all_queues(LOCK);
-	pthread_mutex_lock(&sched->sched_activity_mutex);
+	PTHREAD_MUTEX_LOCK(&sched->sched_activity_mutex);
 	
 	/* set the flag which will tell workers to stop */
 	config->running = 0;
@@ -459,20 +460,20 @@ static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
 	_starpu_operate_on_all_queues(BROADCAST);
 	pthread_cond_broadcast(&sched->sched_activity_cond);
 
-	pthread_mutex_unlock(&sched->sched_activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(&sched->sched_activity_mutex);
 	_starpu_operate_on_all_queues(UNLOCK);
 }
 
 void starpu_shutdown(void)
 {
-	pthread_mutex_lock(&init_mutex);
+	PTHREAD_MUTEX_LOCK(&init_mutex);
 	init_count--;
 	if (init_count)
 		/* Still somebody needing StarPU, don't deinitialize */
 		return;
 	/* We're last */
 	initialized = CHANGING;
-	pthread_mutex_unlock(&init_mutex);
+	PTHREAD_MUTEX_UNLOCK(&init_mutex);
 
 	_starpu_display_msi_stats();
 	_starpu_display_alloc_cache_stats();
@@ -500,11 +501,11 @@ void starpu_shutdown(void)
 
 	_starpu_close_debug_logfile();
 
-	pthread_mutex_lock(&init_mutex);
+	PTHREAD_MUTEX_LOCK(&init_mutex);
 	initialized = UNINITIALIZED;
 	/* Let someone else that wants to initialize it again do it */
 	pthread_cond_signal(&init_cond);
-	pthread_mutex_unlock(&init_mutex);
+	PTHREAD_MUTEX_UNLOCK(&init_mutex);
 }
 
 unsigned starpu_get_worker_count(void)

+ 5 - 4
src/datawizard/copy_driver.c

@@ -16,6 +16,7 @@
 
 #include <pthread.h>
 #include <common/config.h>
+#include <common/utils.h>
 #include <core/policies/sched_policy.h>
 #include <datawizard/datastats.h>
 #include <common/fxt.h>
@@ -38,9 +39,9 @@ void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
 		q  = descr->attached_queues_per_node[nodeid][q_id];
 
 		/* wake anybody waiting on that queue */
-		pthread_mutex_lock(&q->activity_mutex);
+		PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 		pthread_cond_broadcast(&q->activity_cond);
-		pthread_mutex_unlock(&q->activity_mutex);
+		PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
 	}
 
 	pthread_rwlock_unlock(&descr->attached_queues_rwlock);
@@ -53,9 +54,9 @@ void starpu_wake_all_blocked_workers(void)
 	pthread_cond_t *sched_cond = &sched->sched_activity_cond;
 	pthread_mutex_t *sched_mutex = &sched->sched_activity_mutex;
 
-	pthread_mutex_lock(sched_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 	pthread_cond_broadcast(sched_cond);
-	pthread_mutex_unlock(sched_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	/* workers may be blocked on the various queues' conditions */
 	unsigned node;

+ 16 - 26
src/datawizard/data_request.c

@@ -17,6 +17,7 @@
 #include <common/config.h>
 #include <datawizard/data_request.h>
 #include <pthread.h>
+#include <common/utils.h>
 
 /* requests that have not been treated at all */
 static starpu_data_request_list_t data_requests[STARPU_MAXNODES];
@@ -180,7 +181,6 @@ int _starpu_wait_data_request_completion(starpu_data_request_t r, unsigned may_a
 /* this is non blocking */
 void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
 {
-	int res;
 //	fprintf(stderr, "POST REQUEST\n");
 
 	if (r->read)
@@ -190,13 +190,11 @@ void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
 	}
 
 	/* insert the request in the proper list */
-	res = pthread_mutex_lock(&data_requests_list_mutex[handling_node]);
-	STARPU_ASSERT(!res);
+	PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
 
 	starpu_data_request_list_push_front(data_requests[handling_node], r);
 
-	res = pthread_mutex_unlock(&data_requests_list_mutex[handling_node]);
-	STARPU_ASSERT(!res);
+	PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
 
 	_starpu_wake_all_blocked_workers_on_node(handling_node);
 }
@@ -278,9 +276,9 @@ static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_allo
 		_starpu_spin_unlock(&handle->header_lock);
 
 		/* the request is pending and we put it in the corresponding queue  */
-		pthread_mutex_lock(&data_requests_pending_list_mutex[r->handling_node]);
+		PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
 		starpu_data_request_list_push_front(data_requests_pending[r->handling_node], r);
-		pthread_mutex_unlock(&data_requests_pending_list_mutex[r->handling_node]);
+		PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
 
 		return EAGAIN;
 	}
@@ -293,45 +291,40 @@ static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_allo
 
 void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
 {
-	int res;
-
 	/* for all entries of the list */
 	starpu_data_request_t r;
 
 	/* take all the entries from the request list */
-	res = pthread_mutex_lock(&data_requests_list_mutex[src_node]);
-	STARPU_ASSERT(!res);
+        PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
 
 	starpu_data_request_list_t local_list = data_requests[src_node];
 
 	if (starpu_data_request_list_empty(local_list))
 	{
 		/* there is no request */
-		res = pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
-		STARPU_ASSERT(!res);
+                PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
 		return;
 	}
 
 	data_requests[src_node] = starpu_data_request_list_new();
 
-	res = pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
-	STARPU_ASSERT(!res);
+	PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 
 	while (!starpu_data_request_list_empty(local_list))
 	{
+                int res;
+
 		r = starpu_data_request_list_pop_back(local_list);
 
 		res = starpu_handle_data_request(r, may_alloc);
 		if (res == ENOMEM)
 		{
-			res = pthread_mutex_lock(&data_requests_list_mutex[src_node]);
-			STARPU_ASSERT(!res);
+                        PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
 
 			starpu_data_request_list_push_front(data_requests[src_node], r);
 
-			res = pthread_mutex_unlock(&data_requests_list_mutex[src_node]);
-			STARPU_ASSERT(!res);
+			PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
 		}
 
 		/* wake the requesting worker up */
@@ -344,18 +337,15 @@ void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
 
 static void _handle_pending_node_data_requests(uint32_t src_node, unsigned force)
 {
-	int res;
 //	fprintf(stderr, "_starpu_handle_pending_node_data_requests ...\n");
 
-	res = pthread_mutex_lock(&data_requests_pending_list_mutex[src_node]);
-	STARPU_ASSERT(!res);
+	PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
 
 	/* for all entries of the list */
 	starpu_data_request_list_t local_list = data_requests_pending[src_node];
 	data_requests_pending[src_node] = starpu_data_request_list_new();
 
-	res = pthread_mutex_unlock(&data_requests_pending_list_mutex[src_node]);
-	STARPU_ASSERT(!res);
+	PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
 
 	while (!starpu_data_request_list_empty(local_list))
 	{
@@ -385,9 +375,9 @@ static void _handle_pending_node_data_requests(uint32_t src_node, unsigned force
 				_starpu_spin_unlock(&handle->header_lock);
 
 				/* wake the requesting worker up */
-				pthread_mutex_lock(&data_requests_pending_list_mutex[src_node]);
+				PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
 				starpu_data_request_list_push_front(data_requests_pending[src_node], r);
-				pthread_mutex_unlock(&data_requests_pending_list_mutex[src_node]);
+				PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
 			}
 		}
 	}

+ 9 - 8
src/datawizard/user_interactions.c

@@ -15,6 +15,7 @@
  */
 
 #include <common/config.h>
+#include <common/utils.h>
 #include <datawizard/coherency.h>
 #include <datawizard/copy_driver.h>
 #include <datawizard/write_back.h>
@@ -76,10 +77,10 @@ static inline void _starpu_sync_data_with_mem_continuation(void *arg)
 	}
 	else {
 		/* continuation of starpu_sync_data_with_mem */
-		pthread_mutex_lock(&statenode->lock);
+		PTHREAD_MUTEX_LOCK(&statenode->lock);
 		statenode->finished = 1;
 		pthread_cond_signal(&statenode->cond);
-		pthread_mutex_unlock(&statenode->lock);
+		PTHREAD_MUTEX_UNLOCK(&statenode->lock);
 	}
 }
 
@@ -113,10 +114,10 @@ int starpu_sync_data_with_mem(starpu_data_handle handle, starpu_access_mode mode
 		_starpu_sync_data_with_mem_continuation(&statenode);
 	}
 	else {
-		pthread_mutex_lock(&statenode.lock);
+		PTHREAD_MUTEX_LOCK(&statenode.lock);
 		if (!statenode.finished)
 			pthread_cond_wait(&statenode.cond, &statenode.lock);
-		pthread_mutex_unlock(&statenode.lock);
+		PTHREAD_MUTEX_UNLOCK(&statenode.lock);
 	}
 
 	return 0;
@@ -171,10 +172,10 @@ static void _prefetch_data_on_node(void *arg)
 
 	_starpu_fetch_data_on_node(statenode->state, statenode->node, 1, 0, statenode->async);
 
-	pthread_mutex_lock(&statenode->lock);
+	PTHREAD_MUTEX_LOCK(&statenode->lock);
 	statenode->finished = 1;
 	pthread_cond_signal(&statenode->cond);
-	pthread_mutex_unlock(&statenode->lock);
+	PTHREAD_MUTEX_UNLOCK(&statenode->lock);
 
 	if (!statenode->async)
 	{
@@ -219,10 +220,10 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned
 		}
 	}
 	else {
-		pthread_mutex_lock(&statenode.lock);
+		PTHREAD_MUTEX_LOCK(&statenode.lock);
 		if (!statenode.finished)
 			pthread_cond_wait(&statenode.cond, &statenode.lock);
-		pthread_mutex_unlock(&statenode.lock);
+		PTHREAD_MUTEX_UNLOCK(&statenode.lock);
 	}
 
 	return 0;

+ 3 - 2
src/drivers/cpu/driver_cpu.c

@@ -16,6 +16,7 @@
 
 #include <math.h>
 
+#include <common/utils.h>
 #include <core/debug.h>
 #include "driver_cpu.h"
 #include <core/policies/sched_policy.h>
@@ -131,10 +132,10 @@ void *_starpu_cpu_worker(void *arg)
 	STARPU_TRACE_WORKER_INIT_END
 
         /* tell the main thread that we are ready */
-	pthread_mutex_lock(&cpu_arg->mutex);
+	PTHREAD_MUTEX_LOCK(&cpu_arg->mutex);
 	cpu_arg->worker_is_initialized = 1;
 	pthread_cond_signal(&cpu_arg->ready_cond);
-	pthread_mutex_unlock(&cpu_arg->mutex);
+	PTHREAD_MUTEX_UNLOCK(&cpu_arg->mutex);
 
         starpu_job_t j;
 	int res;

+ 3 - 2
src/drivers/cuda/driver_cuda.c

@@ -14,6 +14,7 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
+#include <common/utils.h>
 #include <common/config.h>
 #include <core/debug.h>
 #include "driver_cuda.h"
@@ -213,10 +214,10 @@ void *_starpu_cuda_worker(void *arg)
 	STARPU_TRACE_WORKER_INIT_END
 
 	/* tell the main thread that this one is ready */
-	pthread_mutex_lock(&args->mutex);
+	PTHREAD_MUTEX_LOCK(&args->mutex);
 	args->worker_is_initialized = 1;
 	pthread_cond_signal(&args->ready_cond);
-	pthread_mutex_unlock(&args->mutex);
+	PTHREAD_MUTEX_UNLOCK(&args->mutex);
 
 	struct starpu_job_s * j;
 	int res;

+ 7 - 6
src/drivers/gordon/driver_gordon.c

@@ -20,6 +20,7 @@
 #include <sched.h>
 #include <pthread.h>
 #include <semaphore.h>
+#include <common/utils.h>
 #include "driver_gordon.h"
 #include "gordon_interface.h"
 #include <core/policies/sched_policy.h>
@@ -54,10 +55,10 @@ void *gordon_worker_progress(void *arg)
 		(gordon_set_arg->workers[0].bindid + 1)%(gordon_set_arg->config->nhwcores);
 	_starpu_bind_thread_on_cpu(gordon_set_arg->config, prog_thread_bind_id);
 
-	pthread_mutex_lock(&progress_mutex);
+	PTHREAD_MUTEX_LOCK(&progress_mutex);
 	progress_thread_is_inited = 1;
 	pthread_cond_signal(&progress_cond);
-	pthread_mutex_unlock(&progress_mutex);
+	PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	while (1) {
 		/* the Gordon runtime needs to make sure that we poll it 
@@ -443,18 +444,18 @@ void *_starpu_gordon_worker(void *arg)
 	pthread_create(&progress_thread, NULL, gordon_worker_progress, gordon_set_arg);
 
 	/* wait for the progression thread to be ready */
-	pthread_mutex_lock(&progress_mutex);
+	PTHREAD_MUTEX_LOCK(&progress_mutex);
 	if (!progress_thread_is_inited)
 		pthread_cond_wait(&progress_cond, &progress_mutex);
-	pthread_mutex_unlock(&progress_mutex);
+	PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	fprintf(stderr, "progress thread is running ... \n");
 	
 	/* tell the core that gordon is ready */
-	pthread_mutex_lock(&gordon_set_arg->mutex);
+	PTHREAD_MUTEX_LOCK(&gordon_set_arg->mutex);
 	gordon_set_arg->set_is_initialized = 1;
 	pthread_cond_signal(&gordon_set_arg->ready_cond);
-	pthread_mutex_unlock(&gordon_set_arg->mutex);
+	PTHREAD_MUTEX_UNLOCK(&gordon_set_arg->mutex);
 
 	gordon_worker_inject(gordon_set_arg);