Переглянути джерело

Fix thread-safety of job creation

A task pointer may be passed between threads by the application before
being submitted; job creation could then be triggered in parallel.
Samuel Thibault 4 роки тому
батько
коміт
d13fbbf55d
4 змінених файлів з 128 додано та 4 видалено
  1. 31 0
      src/core/jobs.c
  2. 9 4
      src/core/task.h
  3. 1 0
      tests/Makefile.am
  4. 87 0
      tests/main/job.c

+ 31 - 0
src/core/jobs.c

@@ -125,6 +125,37 @@ struct _starpu_job* STARPU_ATTRIBUTE_MALLOC _starpu_job_create(struct starpu_tas
 	return job;
 }
 
+struct _starpu_job* _starpu_get_job_associated_to_task_slow(struct starpu_task *task, struct _starpu_job *job)
+{	/* Slow path of the job lookup for a task. `job` is the value of
+	 * task->starpu_private as previously read by the caller. Returns the
+	 * job stored in task->starpu_private, creating and storing it here
+	 * when this thread is the one that claims the creation. */
+	if (job == _STARPU_JOB_UNSET)
+	{	/* Try to claim the creation by swapping UNSET for SETTING; the
+		 * result is compared below as the value found in the field
+		 * (presumably the pre-swap value -- confirm against the macro). */
+		job = STARPU_VAL_COMPARE_AND_SWAP_PTR(&task->starpu_private, _STARPU_JOB_UNSET, _STARPU_JOB_SETTING);
+		if (job != _STARPU_JOB_UNSET && job != _STARPU_JOB_SETTING)
+		{
+			/* Another thread stored a real job between the caller's
+			 * read and our swap attempt: just use it. */
+			STARPU_RMB();
+			return job;
+		}
+
+		if (job == _STARPU_JOB_UNSET)
+		{
+			/* The field was still unset, so we now hold the claim
+			 * and have to create the job ourselves. */
+			job = _starpu_job_create(task);
+			STARPU_WMB(); /* presumably orders the job's initialization before the store below -- confirm */
+			task->starpu_private = job; /* replaces _STARPU_JOB_SETTING, which ends the wait loop below for other threads */
+			return job;
+		}
+	}
+
+	/* Either the caller or the swap above saw _STARPU_JOB_SETTING: another
+	 * thread is creating the job, so wait for it. This is expected to be
+	 * rare enough that busy-reading is acceptable. */
+	while ((job = task->starpu_private) == _STARPU_JOB_SETTING)
+		STARPU_SYNCHRONIZE();
+
+	STARPU_RMB();
+	return job;
+}
+
 void _starpu_job_destroy(struct _starpu_job *j)
 {
 	/* Wait for any code that was still working on the job (and was

+ 9 - 4
src/core/task.h

@@ -45,20 +45,25 @@ int _starpu_submit_job(struct _starpu_job *j, int nodeps);
 
 void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[], int check);
 
+#define _STARPU_JOB_UNSET ((struct _starpu_job *) NULL) /* starpu_private value meaning no job has been created for the task yet */
+#define _STARPU_JOB_SETTING ((struct _starpu_job *) 1) /* starpu_private value meaning some thread is currently creating the job */
+
 /** Returns the job structure (which is the internal data structure associated
  * to a task). */
+struct _starpu_job *_starpu_get_job_associated_to_task_slow(struct starpu_task *task, struct _starpu_job *job);
 static inline struct _starpu_job *_starpu_get_job_associated_to_task(struct starpu_task *task)
 {
 	STARPU_ASSERT(task);
 	struct _starpu_job *job = (struct _starpu_job *) task->starpu_private;
 
-	if (STARPU_UNLIKELY(!job))
+	if (STARPU_LIKELY(job != _STARPU_JOB_UNSET && job != _STARPU_JOB_SETTING))
 	{
-		job = _starpu_job_create(task);
-		task->starpu_private = job;
+		/* Fast path: the field already holds a real job pointer
+		 * (neither of the two sentinel values). */
+		STARPU_RMB();
+		return job;
 	}
 
-	return job;
+	return _starpu_get_job_associated_to_task_slow(task, job); /* field is unset or being set: let the slow path create the job or wait for it */
 }
 
 /** Submits starpu internal tasks to the initial context */

+ 1 - 0
tests/Makefile.am

@@ -157,6 +157,7 @@ myPROGRAMS +=					\
 	main/execute_schedule			\
 	main/insert_task_pack			\
 	main/insert_task_nullcodelet		\
+	main/job				\
 	main/multithreaded_init			\
 	main/empty_task				\
 	main/empty_task_chain			\

+ 87 - 0
tests/main/job.c

@@ -0,0 +1,87 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2021  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <starpu.h>
+#include "../helper.h"
+
+/*
+ * Test that job creation is threadsafe
+ */
+
+#define N 1000
+
+static struct starpu_task *tasks[N];
+
+void dummy_func(void *arg)
+{	/* Function run through starpu_execute_on_each_worker() by main():
+	 * the worker with id 0 fills tasks[], every other worker waits for
+	 * each entry and requests its job id. `arg` is unused. */
+	unsigned worker, i; /* NOTE(review): `worker` is never used in this function */
+	(void) arg;
+
+	if (starpu_worker_get_id_check() == 0)
+		/* One worker creates the N tasks and publishes them in tasks[] */
+		for (i = 0; i < N; i++)
+		{
+			struct starpu_task *task = starpu_task_create();
+			task->destroy = 0;
+			STARPU_WMB(); /* presumably makes the task setup visible before the pointer is published -- confirm */
+			tasks[i] = task;
+		}
+	else
+		/* While the others busy-wait for each task pointer to show up,
+		 * then immediately ask for its job id */
+		for (i = 0; i < N; i++)
+		{
+			struct starpu_task *task;
+			while (!(task = tasks[i]))
+				STARPU_SYNCHRONIZE();
+			STARPU_RMB();
+			starpu_task_get_job_id(task); /* result is discarded; only the call itself matters here */
+		}
+}
+
+int main(void)
+{	/* Runs dummy_func on the workers, destroys the N tasks it created,
+	 * then uses the job id of one extra task to check how many job ids
+	 * were handed out overall. Returns STARPU_TEST_SKIPPED when
+	 * starpu_init() reports -ENODEV, otherwise the result of that check. */
+	int ret;
+	unsigned i;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_execute_on_each_worker(dummy_func, NULL, STARPU_CPU);
+
+	for (i = 0; i < N; i++)
+	{
+		starpu_task_destroy(tasks[i]);
+	}
+
+	struct starpu_task *task = starpu_task_create(); /* extra task, only used to read a job id after the run */
+	unsigned long id;
+	task->destroy = 0;
+	id = starpu_task_get_job_id(task);
+	starpu_task_destroy(task);
+
+	FPRINTF(stderr, "jobid %lu for %u tasks and %u workers\n",
+			id, N, starpu_worker_get_count());
+
+	/* We are not supposed to have created more than one jobid for each
+	 * worker (for execute_on_each) and for each of the N user tasks.
+	 * ret is 1 (failure exit status) when the id exceeds that bound,
+	 * 0 otherwise. */
+	ret = id > starpu_worker_get_count() + N + 1;
+
+	starpu_shutdown();
+	return ret;
+}