Browse Source

finish fixing asynchronous partitioning of temporary data: data_partition_submit should not do coherency when the data is not initialized yet

Samuel Thibault 8 years ago
parent
commit
1496e10d1f

+ 3 - 0
src/core/dependencies/implicit_data_deps.c

@@ -218,6 +218,9 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 		struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
 		struct _starpu_job *post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
 
+		if (mode & STARPU_W || mode == STARPU_REDUX)
+			handle->initialized = 1;
+
 		/* Skip tasks that are associated to a reduction phase so that
 		 * they do not interfere with the application. */
 		if (pre_sync_job->reduction_task || post_sync_job->reduction_task)

+ 2 - 0
src/datawizard/coherency.h

@@ -185,6 +185,8 @@ struct _starpu_data_state
 
 	/* Does StarPU have to enforce some implicit data-dependencies ? */
 	unsigned sequential_consistency;
+	/* Is the data initialized, or a task is already submitted to initialize it */
+	unsigned initialized;
 
 	/* This lock should protect any operation to enforce
 	 * sequential_consistency */

+ 22 - 10
src/datawizard/filters.c

@@ -599,6 +599,9 @@ void starpu_data_partition_submit(starpu_data_handle_t initial_handle, unsigned
 	initial_handle->partitioned++;
 	_starpu_spin_unlock(&initial_handle->header_lock);
 
+	if (!initial_handle->initialized)
+		/* No need for coherency, it is not initialized */
+		return;
 	unsigned i;
 	struct starpu_data_descr descr[nparts];
 	for (i = 0; i < nparts; i++)
@@ -621,6 +624,7 @@ void starpu_data_partition_readonly_submit(starpu_data_handle_t initial_handle,
 	initial_handle->readonly = 1;
 	_starpu_spin_unlock(&initial_handle->header_lock);
 
+	STARPU_ASSERT_MSG(initial_handle->initialized, "It is odd to read-only-partition a data which does not have a value yet");
 	unsigned i;
 	struct starpu_data_descr descr[nparts];
 	for (i = 0; i < nparts; i++)
@@ -666,16 +670,20 @@ void starpu_data_unpartition_submit(starpu_data_handle_t initial_handle, unsigne
 		initial_handle->readonly = 0;
 	_starpu_spin_unlock(&initial_handle->header_lock);
 
-	unsigned i;
+	unsigned i, n;
 	struct starpu_data_descr descr[nparts];
-	for (i = 0; i < nparts; i++)
+	for (i = 0, n = 0; i < nparts; i++)
 	{
 		STARPU_ASSERT_MSG(children[i]->father_handle == initial_handle, "children parameter of starpu_data_partition_submit must be the children of the parent parameter");
-		descr[i].handle = children[i];
-		descr[i].mode = STARPU_RW;
+		if (!children[i]->initialized)
+			/* Dropped value, do not care about coherency for this one */
+			continue;
+		descr[n].handle = children[i];
+		descr[n].mode = STARPU_RW;
+		n++;
 	}
 	/* TODO: assert nparts too */
-	starpu_task_insert(initial_handle->switch_cl, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
+	starpu_task_insert(initial_handle->switch_cl, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, n, 0);
 	for (i = 0; i < nparts; i++)
 		starpu_data_invalidate_submit(children[i]);
 }
@@ -689,16 +697,20 @@ void starpu_data_unpartition_readonly_submit(starpu_data_handle_t initial_handle
 	initial_handle->readonly = 1;
 	_starpu_spin_unlock(&initial_handle->header_lock);
 
-	unsigned i;
+	unsigned i, n;
 	struct starpu_data_descr descr[nparts];
-	for (i = 0; i < nparts; i++)
+	for (i = 0, n = 0; i < nparts; i++)
 	{
 		STARPU_ASSERT_MSG(children[i]->father_handle == initial_handle, "children parameter of starpu_data_partition_submit must be the children of the parent parameter");
-		descr[i].handle = children[i];
-		descr[i].mode = STARPU_R;
+		if (!children[i]->initialized)
+			/* Dropped value, do not care about coherency for this one */
+			continue;
+		descr[n].handle = children[i];
+		descr[n].mode = STARPU_R;
+		n++;
 	}
 	/* TODO: assert nparts too */
-	starpu_task_insert(initial_handle->switch_cl, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
+	starpu_task_insert(initial_handle->switch_cl, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, n, 0);
 }
 
 /*

+ 5 - 0
src/datawizard/interfaces/data_interface.c

@@ -276,6 +276,7 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 
 	handle->sequential_consistency =
 		starpu_data_get_default_sequential_consistency_flag();
+	handle->initialized = home_node != -1;
 
 	STARPU_PTHREAD_MUTEX_INIT(&handle->sequential_consistency_mutex, NULL);
 	handle->last_submitted_mode = STARPU_R;
@@ -987,6 +988,8 @@ void starpu_data_invalidate(starpu_data_handle_t handle)
 	starpu_data_acquire_on_node(handle, -1, STARPU_W);
 
 	_starpu_data_invalidate(handle);
+
+	handle->initialized = 0;
 }
 
 void starpu_data_invalidate_submit(starpu_data_handle_t handle)
@@ -994,6 +997,8 @@ void starpu_data_invalidate_submit(starpu_data_handle_t handle)
 	STARPU_ASSERT(handle);
 
 	starpu_data_acquire_on_node_cb(handle, -1, STARPU_W, _starpu_data_invalidate, handle);
+
+	handle->initialized = 0;
 }
 
 enum starpu_data_interface_id starpu_data_get_interface_id(starpu_data_handle_t handle)

+ 5 - 13
tests/datawizard/temporary_partition.c

@@ -29,14 +29,6 @@ static void codelet(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 }
 
-static struct starpu_codelet clrw =
-{
-	.where = STARPU_CPU,
-	.cpu_funcs = {codelet},
-	.nbuffers = 1,
-	.modes = {STARPU_RW}
-};
-
 static struct starpu_codelet clw =
 {
 	.where = STARPU_CPU,
@@ -65,9 +57,6 @@ int main(int argc, char **argv)
 
 	starpu_vector_data_register(&handle, -1, 0, SIZE, sizeof(char));
 
-	/* Fill temporary */
-	starpu_task_insert(&clw, STARPU_W, handle, 0);
-
 	/* Fork */
 	struct starpu_data_filter f =
 	{
@@ -80,13 +69,16 @@ int main(int argc, char **argv)
 	/* Process in parallel */
 	for (i = 0; i < NPARTS; i++)
 	{
-		ret = starpu_task_insert(&clrw,
-					 STARPU_RW, handles[i],
+		ret = starpu_task_insert(&clw,
+					 STARPU_W, handles[i],
 					 0);
 		if (ret == -ENODEV) goto enodev;
 		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
 	}
 
+	/* Invalidate one random piece we don't care coherency about */
+	starpu_data_invalidate_submit(handles[NPARTS/2]);
+
 	/* Join */
 	starpu_data_unpartition_submit(handle, NPARTS, handles, -1);
 	starpu_data_partition_clean(handle, NPARTS, handles);