Sfoglia il codice sorgente

Fix a bad interaction between tag deps and task deps: task dep wake-up should not retest tags since we clear them automatically. Add a thorough dependency testsuite which catches that. This also changes the locking convention for _starpu_enforce_deps_starting_from_task

Samuel Thibault 12 anni fa
parent
commit
f62266afa8

+ 5 - 5
src/core/dependencies/cg.c

@@ -157,12 +157,12 @@ void _starpu_notify_cg(struct _starpu_cg *cg)
 				/* Need to atomically test submitted and check
 				 * dependencies, since this is concurrent with
 				 * _starpu_submit_job */
-				if (j->submitted && job_successors->ndeps == ndeps_completed)
+				if (j->submitted && job_successors->ndeps == ndeps_completed &&
+					j->task->status == STARPU_TASK_BLOCKED_ON_TASK)
 				{
-					/* Note that this also ensures that tag deps are
-					 * fulfilled. This counter is reseted only when the
-					 * dependencies are are all fulfilled) */
-					_starpu_enforce_deps_and_schedule(j);
+					/* That task has already passed tag checks,
+					 * do not do them again since the tag has been cleared! */
+					_starpu_enforce_deps_starting_from_task(j);
 				} else
 					_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 

+ 1 - 0
src/core/dependencies/tags.c

@@ -209,6 +209,7 @@ void _starpu_tag_set_ready(struct _starpu_tag *tag)
 	_starpu_spin_unlock(&tag->lock);
 
 	/* enforce data dependencies */
+	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	_starpu_enforce_deps_starting_from_task(j);
 
 	_starpu_spin_lock(&tag->lock);

+ 0 - 1
src/core/jobs.c

@@ -364,7 +364,6 @@ unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
 {
 	unsigned ret;
 
-	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	/* enfore task dependencies */
 	if (_starpu_not_all_task_deps_are_fulfilled(j))
 	{

+ 1 - 1
src/core/jobs.h

@@ -148,7 +148,7 @@ void _starpu_wait_job(struct _starpu_job *j);
 /* Specify that the task should not appear in the DAG generated by debug tools. */
 void _starpu_exclude_task_from_dag(struct starpu_task *task);
 
-/* try to submit job j, enqueue it if it's not schedulable yet */
+/* try to submit job j, enqueue it if it's not schedulable yet. The job's sync mutex is supposed to be held already */
 unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j);
 unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j);
 

+ 1 - 0
tests/Makefile.am

@@ -154,6 +154,7 @@ noinst_PROGRAMS =				\
 	main/declare_deps_in_callback		\
 	main/declare_deps_after_submission	\
 	main/declare_deps_after_submission_synchronous	\
+	main/tag_task_data_deps			\
 	main/get_current_task			\
 	main/starpu_init			\
 	main/starpu_worker_exists		\

+ 250 - 0
tests/main/tag_task_data_deps.c

@@ -0,0 +1,250 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2012  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * This tests combinations of various tag/task/data dependencies
+ */
+
+#include <pthread.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <starpu.h>
+#include "../helper.h"
+
+#define NLOOPS	(128*1024)
+
+static void dummy_func(void *descr[] __attribute__ ((unused)), void *arg)
+{
+	unsigned duration = (uintptr_t) arg;
+	if (duration)
+		usleep(duration);
+}
+
+static struct starpu_codelet dummy_Rcodelet = 
+{
+	.cpu_funcs = {dummy_func, NULL},
+	.model = NULL,
+	.nbuffers = 1,
+	.modes = {STARPU_R}
+};
+
+static struct starpu_codelet dummy_Wcodelet = 
+{
+	.cpu_funcs = {dummy_func, NULL},
+	.model = NULL,
+	.nbuffers = 1,
+	.modes = {STARPU_W}
+};
+
+static struct starpu_codelet dummy_codelet = 
+{
+	.cpu_funcs = {dummy_func, NULL},
+	.model = NULL,
+	.nbuffers = 0,
+};
+
+static struct starpu_task *create_dummy_task(int write, int data, unsigned duration, starpu_data_handle_t handle)
+{
+	struct starpu_task *task = starpu_task_create();
+
+	if (data)
+	{
+		if (write)
+			task->cl = &dummy_Wcodelet;
+		else
+			task->cl = &dummy_Rcodelet;
+		task->handles[0] = handle;
+	} else
+		task->cl = &dummy_codelet;
+	task->cl_arg = (void*) (uintptr_t) duration;
+
+	return task;
+}
+
+int main(int argc, char **argv)
+{
+	int ret;
+	unsigned loop, nloops = NLOOPS;
+	unsigned duration = 1000;
+
+	starpu_data_handle_t handle1, handle2;
+
+#ifdef STARPU_SLOW_MACHINE
+	duration = 0;
+#endif
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_void_data_register(&handle1);
+	starpu_void_data_register(&handle2);
+	starpu_data_set_sequential_consistency_flag(handle2, 0);
+
+	struct starpu_task *taskA, *taskB, *taskC;
+
+#if 1
+	for (loop = 0; loop < nloops; loop++)
+	{
+#else
+	loop = 0x258;
+	do {
+#endif
+		int durationA = loop & 1 ? duration:0;
+		int durationB = loop & 2 ? duration:0;
+		int durationC = loop & 4 ? duration:0;
+		int writeA, dataA;
+		int writeB, dataB;
+		int writeC, dataC;
+		starpu_data_handle_t handleA, handleB, handleC;
+
+		handleA = handle1;
+		writeA = !!(loop & 8);
+		dataA = !!(loop & 16);
+		if (!dataA && writeA) {
+			handleA = handle2;
+			dataA = 1;
+		}
+		handleB = handle1;
+		writeB = !!(loop & 32);
+		dataB = !!(loop & 64);
+		if (!dataB && writeB) {
+			handleB = handle2;
+			dataB = 1;
+		}
+		handleC = handle1;
+		writeC = !!(loop & 128);
+		dataC = !!(loop & 256);
+		if (!dataC && writeC) {
+			handleC = handle2;
+			dataC = 1;
+		}
+
+		FPRINTF(stderr,"\r%u", loop);
+#if 0
+		if (durationA)
+			FPRINTF(stderr, " longA ");
+		if (durationB)
+			FPRINTF(stderr, " longB ");
+		if (durationC)
+			FPRINTF(stderr, " longC ");
+		if (dataA) {
+			if (writeA)
+				FPRINTF(stderr, " WA");
+			else
+				FPRINTF(stderr, " RA");
+		} else if (writeA)
+			FPRINTF(stderr, " wA");
+		if (dataB) {
+			if (writeB)
+				FPRINTF(stderr, " WB");
+			else
+				FPRINTF(stderr, " RB");
+		} else if (writeB)
+			FPRINTF(stderr, " wB");
+		if (dataC) {
+			if (writeC)
+				FPRINTF(stderr, " WC");
+			else
+				FPRINTF(stderr, " RC");
+		} else if (writeC)
+			FPRINTF(stderr, " wC");
+		if (loop & 512)
+			FPRINTF(stderr, " Tag AB");
+		if (loop & 1024)
+			FPRINTF(stderr, " Tag AC");
+		if (loop & 2048)
+			FPRINTF(stderr, " Tag BC");
+		if (loop & 4096)
+			FPRINTF(stderr, " Task AB");
+		if (loop & 8192)
+			FPRINTF(stderr, " Task AC");
+		if (loop & 16384)
+			FPRINTF(stderr, " Task BC");
+		if (loop & 32768)
+			FPRINTF(stderr, " delayB");
+		if (loop & 65536)
+			FPRINTF(stderr, " delayC");
+		FPRINTF(stderr,"                      ");
+#endif
+		fflush(stderr);
+
+		taskA = create_dummy_task(writeA, dataA, durationA, handleA);
+		taskB = create_dummy_task(writeB, dataB, durationB, handleB);
+		taskC = create_dummy_task(writeC, dataC, durationC, handleC);
+
+		taskA->tag_id = 3*loop;
+		taskA->use_tag = 1;
+		taskB->tag_id = 3*loop+1;
+		taskB->use_tag = 1;
+		taskC->tag_id = 3*loop+2;
+		taskC->use_tag = 1;
+
+		if (loop & 512)
+			starpu_tag_declare_deps(taskB->tag_id, 1, taskA->tag_id);
+		if (loop & 1024)
+			starpu_tag_declare_deps(taskC->tag_id, 1, taskA->tag_id);
+		if (loop & 2048)
+			starpu_tag_declare_deps(taskC->tag_id, 1, taskB->tag_id);
+
+		if (loop & 4096)
+			starpu_task_declare_deps_array(taskB, 1, &taskA);
+		if (loop & 8192)
+			starpu_task_declare_deps_array(taskC, 1, &taskA);
+		if (loop & 16384)
+			starpu_task_declare_deps_array(taskC, 1, &taskB);
+
+		taskA->detach = 0;
+		taskB->detach = 0;
+		taskC->detach = 0;
+
+		ret = starpu_task_submit(taskA);
+		if (ret == -ENODEV)
+			return STARPU_TEST_SKIPPED;
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+		if (loop & 32768)
+			usleep(duration);
+
+		ret = starpu_task_submit(taskB);
+		if (ret == -ENODEV)
+			return STARPU_TEST_SKIPPED;
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+		if (loop & 65536)
+			usleep(duration);
+
+		ret = starpu_task_submit(taskC);
+		if (ret == -ENODEV)
+			return STARPU_TEST_SKIPPED;
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+		ret = starpu_task_wait(taskA);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+		ret = starpu_task_wait(taskB);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+		ret = starpu_task_wait(taskC);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	} while(0);
+
+	starpu_shutdown();
+
+	return EXIT_SUCCESS;
+}