@@ -64,6 +64,8 @@

enum initialization { UNINITIALIZED = 0, CHANGING, INITIALIZED };

+struct _starpu_ctx_change_list;
+
/* This is initialized from in _starpu_worker_init */
LIST_TYPE(_starpu_worker,
	struct _starpu_machine_config *config;
@@ -82,8 +84,37 @@ LIST_TYPE(_starpu_worker,
	starpu_pthread_cond_t started_cond; /* indicate when the worker is ready */
	starpu_pthread_cond_t ready_cond; /* indicate when the worker is ready */
	unsigned memory_node; /* which memory node is the worker associated with ? */
-	starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
+	/* condition variable used for passive waiting operations on the worker.
+	 * STARPU_PTHREAD_COND_BROADCAST must be used instead of STARPU_PTHREAD_COND_SIGNAL,
+	 * since the condition is shared for multiple purposes */
+	starpu_pthread_cond_t sched_cond;
	starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
+	unsigned state_relax_refcnt; /* mark scheduling sections where other workers can safely access the worker state */
+#ifdef STARPU_SPINLOCK_CHECK
+	const char *relax_on_file;
+	int relax_on_line;
+	const char *relax_on_func;
+	const char *relax_off_file;
+	int relax_off_line;
+	const char *relax_off_func;
+#endif
+	unsigned state_sched_op_pending; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
+	unsigned state_changing_ctx_waiting; /* a thread is waiting for operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx */
+	unsigned state_changing_ctx_notice; /* the worker ctx is about to be changed or is being changed; wait for the flag to be cleared before starting new scheduling operations */
+	unsigned state_blocked_in_parallel; /* worker is currently blocked on a parallel section */
+	unsigned state_blocked_in_parallel_observed; /* the blocked state of the worker has been observed by another worker during a relaxed section */
+	unsigned state_block_in_parallel_req; /* a request for state transition from unblocked to blocked is pending */
+	unsigned state_block_in_parallel_ack; /* a block request has been honored */
+	unsigned state_unblock_in_parallel_req; /* a request for state transition from blocked to unblocked is pending */
+	unsigned state_unblock_in_parallel_ack; /* an unblock request has been honored */
+	/* cumulative blocking depth
+	 * - =0 worker unblocked
+	 * - >0 worker blocked
+	 * - transition from 0 to 1 triggers a block_req
+	 * - transition from 1 to 0 triggers an unblock_req
+	 */
+	unsigned block_in_parallel_ref_count;
+	struct _starpu_ctx_change_list ctx_change_list;
	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */
@@ -103,6 +134,7 @@ LIST_TYPE(_starpu_worker,
	unsigned worker_is_running;
	unsigned worker_is_initialized;
	enum _starpu_worker_status status; /* what is the worker doing now ? (eg. CALLBACK) */
+	unsigned state_keep_awake; /* !0 if a task has been pushed to the worker and not yet seen by it; the worker should not go to sleep before processing this task */
	char name[64];
	char short_name[10];
	unsigned run_by_starpu; /* Is this run by StarPU or directly by the application ? */
@@ -137,12 +169,6 @@ LIST_TYPE(_starpu_worker,
	/* indicate which priority of ctx is currently active: the values are 0 or 1 */
	unsigned pop_ctx_priority;

-	/* flag to know if sched_mutex is locked or not */
-	unsigned sched_mutex_locked;
-
-	/* bool to indicate if the worker is blocked in a ctx */
-	unsigned blocked;
-
	/* bool to indicate if the worker is slave in a ctx */
	unsigned is_slave_somewhere;

@@ -508,7 +534,7 @@ static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
	return &_starpu_config.workers[id];
}

-/* Returns the starpu_sched_ctx structure that descriebes the state of the
+/* Returns the starpu_sched_ctx structure that describes the state of the
 * specified ctx */
static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
{
@@ -558,18 +584,6 @@ int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *worker
   the list might not be updated */
int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);

-/* if the current worker has the lock release it */
-void _starpu_unlock_mutex_if_prev_locked();
-
-/* if we prev released the lock relock it */
-void _starpu_relock_mutex_if_prev_locked();
-
-static inline void _starpu_worker_set_flag_sched_mutex_locked(int workerid, unsigned flag)
-{
-	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
-	w->sched_mutex_locked = flag;
-}
-
static inline unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
{
	struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
@@ -584,6 +598,8 @@ static inline int _starpu_worker_get_nsched_ctxs(int workerid)
/* Get the total number of sched_ctxs created till now */
static inline unsigned _starpu_get_nsched_ctxs(void)
{
+	/* topology.nsched_ctxs may be increased asynchronously in sched_ctx_create */
+	STARPU_RMB();
	return _starpu_config.topology.nsched_ctxs;
}

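The read barrier above pairs with a write barrier on the creation side: sched_ctx_create is expected to fully initialize the new context before publishing the increased count. A minimal sketch of that intended pairing; this is illustrative only, not the actual sched_ctx_create code:

	static void sketch_publish_new_sched_ctx(void)
	{
		unsigned id = _starpu_config.topology.nsched_ctxs;
		/* ... initialize the new context under the proper locks ... */
		/* commit the initialized contents before publishing the new count,
		 * so that a reader issuing STARPU_RMB() sees a fully built context */
		STARPU_WMB();
		_starpu_config.topology.nsched_ctxs = id + 1;
	}
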
@@ -622,4 +638,420 @@ void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *

struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);

+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker * const worker)
+{
+	/* flush pending requests to start on a fresh transaction epoch */
+	while (worker->state_unblock_in_parallel_req)
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+	/* announce blocking intent */
+	STARPU_ASSERT(worker->block_in_parallel_ref_count < UINT_MAX);
+	worker->block_in_parallel_ref_count++;
+
+	if (worker->block_in_parallel_ref_count == 1)
+	{
+		/* only the transition from 0 to 1 triggers the block_in_parallel_req */
+
+		STARPU_ASSERT(!worker->state_blocked_in_parallel);
+		STARPU_ASSERT(!worker->state_block_in_parallel_req);
+		STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
+
+		/* trigger the block_in_parallel_req */
+		worker->state_block_in_parallel_req = 1;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+		/* wait for block_in_parallel_req to be processed */
+		while (!worker->state_block_in_parallel_ack)
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+		STARPU_ASSERT(worker->block_in_parallel_ref_count >= 1);
+		STARPU_ASSERT(worker->state_block_in_parallel_req);
+		STARPU_ASSERT(worker->state_blocked_in_parallel);
+
+		/* reset block_in_parallel_req state flags */
+		worker->state_block_in_parallel_req = 0;
+		worker->state_block_in_parallel_ack = 0;
+
+		/* broadcast block_in_parallel_req state flags reset */
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	}
+}
+
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker * const worker)
+{
+	/* flush pending requests to start on a fresh transaction epoch */
+	while (worker->state_block_in_parallel_req)
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+	/* unblocking may be requested unconditionally;
+	 * thus, check if unblocking is really needed */
+	if (worker->state_blocked_in_parallel)
+	{
+		if (worker->block_in_parallel_ref_count == 1)
+		{
+			/* only the transition from 1 to 0 triggers the unblock_in_parallel_req */
+
+			STARPU_ASSERT(!worker->state_block_in_parallel_req);
+			STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+			STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
+			STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
+
+			/* trigger the unblock_in_parallel_req */
+			worker->state_unblock_in_parallel_req = 1;
+			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+			/* wait for the unblock_in_parallel_req to be processed */
+			while (!worker->state_unblock_in_parallel_ack)
+				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+			STARPU_ASSERT(worker->state_unblock_in_parallel_req);
+			STARPU_ASSERT(!worker->state_blocked_in_parallel);
+
+			/* reset unblock_in_parallel_req state flags */
+			worker->state_unblock_in_parallel_req = 0;
+			worker->state_unblock_in_parallel_ack = 0;
+
+			/* broadcast unblock_in_parallel_req state flags reset */
+			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+		}
+
+		/* announce unblocking complete */
+		STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
+		worker->block_in_parallel_ref_count--;
+	}
+}
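As the block_in_parallel_ref_count comment above describes, requests nest: only the outermost request/release pair performs the condition-variable handshake. A minimal sketch of the nesting, assuming the caller already holds the target worker's sched_mutex (sketch_nested_block_requests is an illustrative name, not part of the patch):

	static void sketch_nested_block_requests(struct _starpu_worker *worker)
	{
		_starpu_worker_request_blocking_in_parallel(worker);   /* 0 -> 1: full handshake */
		_starpu_worker_request_blocking_in_parallel(worker);   /* 1 -> 2: counter only */
		_starpu_worker_request_unblocking_in_parallel(worker); /* 2 -> 1: counter only */
		_starpu_worker_request_unblocking_in_parallel(worker); /* 1 -> 0: full handshake */
	}
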
+
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker * const worker)
+{
+	while (worker->state_block_in_parallel_req)
+	{
+		STARPU_ASSERT(!worker->state_blocked_in_parallel);
+		STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
+		STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
+
+		/* enter effective blocked state */
+		worker->state_blocked_in_parallel = 1;
+
+		/* notify block_in_parallel_req processing */
+		worker->state_block_in_parallel_ack = 1;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+		/* block */
+		while (!worker->state_unblock_in_parallel_req)
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+		STARPU_ASSERT(worker->state_blocked_in_parallel);
+		STARPU_ASSERT(!worker->state_block_in_parallel_req);
+		STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
+		STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
+
+		/* leave effective blocked state */
+		worker->state_blocked_in_parallel = 0;
+
+		/* notify unblock_in_parallel_req processing */
+		worker->state_unblock_in_parallel_ack = 1;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	}
+}
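The two request functions above are the requester side; this routine is the worker side, and is expected to be called at scheduling points with sched_mutex held, so that pending block requests are honored promptly. A minimal sketch of such a loop, with task selection and execution elided (the loop shape is an assumption for illustration, not the actual driver code):

	static void sketch_worker_main_loop(struct _starpu_worker *worker)
	{
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		while (worker->worker_is_running)
		{
			/* honor pending block requests; returns once unblocked */
			_starpu_worker_process_block_in_parallel_requests(worker);
			/* ... pick and run a task, or wait on sched_cond when idle ... */
		}
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
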
+
+/* Must be called with worker's sched_mutex held.
+ * Mark the beginning of a scheduling operation during which the sched_mutex
+ * lock may be temporarily released, but the scheduling context of the worker
+ * should not be modified */
+#ifdef STARPU_SPINLOCK_CHECK
+static inline void __starpu_worker_enter_sched_op(struct _starpu_worker * const worker, const char *file, int line, const char *func)
+#else
+static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
+#endif
+{
+	STARPU_ASSERT(!worker->state_sched_op_pending);
+	if (!worker->state_blocked_in_parallel_observed)
+	{
+		/* process pending block requests before entering a sched_op region */
+		_starpu_worker_process_block_in_parallel_requests(worker);
+		while (worker->state_changing_ctx_notice)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+			/* new block requests may have been triggered during the wait,
+			 * need to check again */
+			_starpu_worker_process_block_in_parallel_requests(worker);
+		}
+	}
+	else
+	{
+		/* if someone observed the worker state since the last call, postpone block request
+		 * processing for one more sched_op turn, because the observer will not have seen
+		 * new block requests between its observation and now.
+		 *
+		 * however, the worker still has to wait for context change operations to complete
+		 * before entering sched_op again */
+		while (worker->state_changing_ctx_notice)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+		}
+	}
+
+	/* no block request and no ctx change ahead,
+	 * enter sched_op */
+	worker->state_sched_op_pending = 1;
+	worker->state_blocked_in_parallel_observed = 0;
+	worker->state_relax_refcnt = 0;
+#ifdef STARPU_SPINLOCK_CHECK
+	worker->relax_on_file = file;
+	worker->relax_on_line = line;
+	worker->relax_on_func = func;
+#endif
+}
+#ifdef STARPU_SPINLOCK_CHECK
+#define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
+#endif
+
+/* Must be called with worker's sched_mutex held.
+ * Mark the end of a scheduling operation, and notify potential waiters that
+ * scheduling context changes can safely be performed again.
+ */
+void _starpu_worker_apply_deferred_ctx_changes(void);
+#ifdef STARPU_SPINLOCK_CHECK
+static inline void __starpu_worker_leave_sched_op(struct _starpu_worker * const worker, const char *file, int line, const char *func)
+#else
+static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
+#endif
+{
+	STARPU_ASSERT(worker->state_sched_op_pending);
+	worker->state_relax_refcnt = 1;
+#ifdef STARPU_SPINLOCK_CHECK
+	worker->relax_off_file = file;
+	worker->relax_off_line = line;
+	worker->relax_off_func = func;
+#endif
+	worker->state_sched_op_pending = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	_starpu_worker_apply_deferred_ctx_changes();
+}
+#ifdef STARPU_SPINLOCK_CHECK
+#define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
+#endif
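Putting the pair together, a scheduling operation such as a task pop is expected to be bracketed as follows. A minimal sketch, where sketch_pop_task and pick_task are hypothetical placeholders for the actual scheduling decision:

	static struct starpu_task *sketch_pop_task(struct _starpu_worker *worker)
	{
		struct starpu_task *task;
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		/* waits for pending ctx changes and block requests, then marks the op */
		_starpu_worker_enter_sched_op(worker);
		task = pick_task(worker); /* hypothetical; may relax and re-lock inside */
		/* clears the pending flag, wakes waiters, applies deferred ctx changes */
		_starpu_worker_leave_sched_op(worker);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		return task;
	}
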
+
+static inline int _starpu_worker_sched_op_pending(void)
+{
+	int workerid = starpu_worker_get_id();
+	if (workerid == -1)
+		return 0;
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	return worker->state_sched_op_pending;
+}
+
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
+{
+	/* flush pending requests to start on a fresh transaction epoch */
+	while (worker->state_changing_ctx_notice)
+		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+	/* announce changing_ctx intent
+	 *
+	 * - an already started sched_op is allowed to complete
+	 * - no new sched_op may be started
+	 */
+	worker->state_changing_ctx_notice = 1;
+
+	/* allow for an already started sched_op to complete */
+	if (worker->state_sched_op_pending)
+	{
+		/* request sched_op to broadcast when the way is cleared */
+		worker->state_changing_ctx_waiting = 1;
+
+		/* wait for sched_op completion */
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+		do
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+		}
+		while (worker->state_sched_op_pending);
+
+		/* reset the flag so other sched_ops won't have to broadcast their state */
+		worker->state_changing_ctx_waiting = 0;
+	}
+}
+
+/* Must be called with worker's sched_mutex held.
+ */
+static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
+{
+	worker->state_changing_ctx_notice = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+}
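A context change on a given worker is expected to be bracketed the same way, so that it never interleaves with a scheduling operation. A minimal sketch, with the actual context modification elided (sketch_apply_ctx_change is an illustrative name):

	static void sketch_apply_ctx_change(int workerid)
	{
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		/* lets an ongoing sched_op finish, then forbids new ones */
		_starpu_worker_enter_changing_ctx_op(worker);
		/* ... add/remove the worker to/from a scheduling context here ... */
		_starpu_worker_leave_changing_ctx_op(worker);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
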
+
+/* Temporarily allow other workers to access the current worker's state while a
+ * scheduling operation is ongoing, at points where the scheduling decision has
+ * not yet been started or is already done */
+#ifdef STARPU_SPINLOCK_CHECK
+static inline void __starpu_worker_relax_on(const char *file, int line, const char *func)
+#else
+static inline void _starpu_worker_relax_on(void)
+#endif
+{
+	int workerid = starpu_worker_get_id();
+	if (workerid == -1)
+		return;
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	if (!worker->state_sched_op_pending)
+		return;
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+#ifdef STARPU_SPINLOCK_CHECK
+	STARPU_ASSERT_MSG(worker->state_relax_refcnt < UINT_MAX, "relax last turned on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
+#else
+	STARPU_ASSERT(worker->state_relax_refcnt < UINT_MAX);
+#endif
+	worker->state_relax_refcnt++;
+#ifdef STARPU_SPINLOCK_CHECK
+	worker->relax_on_file = file;
+	worker->relax_on_line = line;
+	worker->relax_on_func = func;
+#endif
+	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+#ifdef STARPU_SPINLOCK_CHECK
+#define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
+#endif
+
+#ifdef STARPU_SPINLOCK_CHECK
+static inline void __starpu_worker_relax_off(const char *file, int line, const char *func)
+#else
+static inline void _starpu_worker_relax_off(void)
+#endif
+{
+	int workerid = starpu_worker_get_id();
+	if (workerid == -1)
+		return;
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	if (!worker->state_sched_op_pending)
+		return;
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+#ifdef STARPU_SPINLOCK_CHECK
+	STARPU_ASSERT_MSG(worker->state_relax_refcnt > 0, "relax last turned off in %s (%s:%d)\n", worker->relax_off_func, worker->relax_off_file, worker->relax_off_line);
+#else
+	STARPU_ASSERT(worker->state_relax_refcnt > 0);
+#endif
+	worker->state_relax_refcnt--;
+#ifdef STARPU_SPINLOCK_CHECK
+	worker->relax_off_file = file;
+	worker->relax_off_line = line;
+	worker->relax_off_func = func;
+#endif
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+#ifdef STARPU_SPINLOCK_CHECK
+#define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
+#endif
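Within a sched_op, any potentially long step executed without sched_mutex should sit in a relax window, so that observers are not kept waiting. A minimal sketch of such a window inside a sched_op, with the blocking work itself elided; the unlock-before-relax ordering is my reading of the comments above (relax_on/relax_off take sched_mutex internally), not code from the patch:

	static void sketch_relax_window(struct _starpu_worker *worker)
	{
		/* sched_mutex is held here, inside a sched_op */
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		_starpu_worker_relax_on();
		/* ... long or blocking work, e.g. waiting on a driver event ... */
		_starpu_worker_relax_off();
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		/* sched_mutex held again, still inside the same sched_op */
	}
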
+
+static inline int _starpu_worker_get_relax_state(void)
+{
+	int workerid = starpu_worker_get_id();
+	if (workerid < 0)
+		return 1;
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	return worker->state_relax_refcnt != 0;
+}
+
+/* lock a worker for observing its contents
+ *
+ * notes:
+ * - if the observed worker is not in a relaxed state (state_relax_refcnt == 0),
+ *   the function blocks until that state is reached */
+static inline void _starpu_worker_lock(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	int cur_workerid = starpu_worker_get_id();
+	if (workerid != cur_workerid)
+	{
+		_starpu_worker_relax_on();
+
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+		while (!worker->state_relax_refcnt)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+		}
+	}
+	else
+	{
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	}
+}
+
+static inline int _starpu_worker_trylock(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
+	if (!ret)
+	{
+		int cur_workerid = starpu_worker_get_id();
+		if (workerid != cur_workerid)
+		{
+			ret = !worker->state_relax_refcnt;
+			if (ret)
+				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+		}
+	}
+	if (!ret)
+	{
+		_starpu_worker_relax_on();
+	}
+	return ret;
+}
+
+static inline void _starpu_worker_unlock(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+	int cur_workerid = starpu_worker_get_id();
+	if (workerid != cur_workerid)
+	{
+		_starpu_worker_relax_off();
+	}
+}
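An observer should go through this lock/unlock pair rather than taking sched_mutex directly, so that it only reads another worker's state while that worker is relaxed. A minimal sketch reading the blocked flag of another worker (sketch_observe_worker_blocked is an illustrative name); _starpu_worker_trylock offers a non-blocking variant for lock-ordering-sensitive paths:

	static unsigned sketch_observe_worker_blocked(int workerid)
	{
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		unsigned blocked;
		/* blocks until the target worker is in a relaxed section */
		_starpu_worker_lock(workerid);
		blocked = worker->state_blocked_in_parallel;
		_starpu_worker_unlock(workerid);
		return blocked;
	}
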
+
+static inline void _starpu_worker_lock_self(void)
+{
+	int workerid = starpu_worker_get_id_check();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+}
+
+static inline void _starpu_worker_unlock_self(void)
+{
+	int workerid = starpu_worker_get_id_check();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+}
+
+static inline int _starpu_wake_worker_relax(int workerid)
+{
+	_starpu_worker_lock(workerid);
+	int ret = starpu_wake_worker_locked(workerid);
+	_starpu_worker_unlock(workerid);
+	return ret;
+}
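Typical use of this wrapper: after queueing work for a specific worker, wake it through the relaxed lock so the wake-up cannot race with that worker's ongoing scheduling operation. A sketch, with the enqueue itself elided; the return value of starpu_wake_worker_locked (presumably whether a wake-up was actually performed) is ignored here:

	static void sketch_push_and_wake(int workerid)
	{
		/* ... enqueue a task for workerid under the appropriate lock ... */
		/* then wake the worker through the relaxed lock */
		(void)_starpu_wake_worker_relax(workerid);
	}
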
+
+void _starpu_worker_refuse_task(struct _starpu_worker *worker, struct starpu_task *task);
#endif // __WORKERS_H__