
Support asynchronous partitioning through the home node of data, not only MAIN_RAM

Samuel Thibault 9 years ago
parent
commit c8077021d0

+ 1 - 1
ChangeLog

@@ -108,7 +108,7 @@ New features:
   * New StarPU-MPI gdb debug functions
   * Generate animated html trace of modular schedulers.
   * Add asynchronous partition planning. It only supports coherency through
-    the main RAM for now.
+    the home node of data for now.
 
 Small features:
   * Tasks can now have a name (via the field const char *name of

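For context, the asynchronous partition-planning API that this commit generalizes is used along the following lines. This is a minimal usage sketch, not part of the commit: it assumes the standard vector interface and block filter, and N, PARTS and the registration parameters are purely illustrative. With this change, the same sequence is valid for a handle whose home node is any memory node, not only STARPU_MAIN_RAM.

#include <starpu.h>

#define N     1024
#define PARTS 4

static float data[N];

int main(void)
{
	starpu_data_handle_t handle;
	starpu_data_handle_t children[PARTS];
	struct starpu_data_filter f =
	{
		.filter_func = starpu_vector_filter_block,
		.nchildren = PARTS,
	};

	if (starpu_init(NULL) != 0)
		return 1;

	/* Register the data; with this commit its home node no longer has to be STARPU_MAIN_RAM */
	starpu_vector_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t) data, N, sizeof(data[0]));

	/* Plan the partitioning; this is where the per-handle switch codelet is now built */
	starpu_data_partition_plan(handle, &f, children);

	/* Switch asynchronously to the partitioned view, work on the pieces, gather back */
	starpu_data_partition_submit(handle, PARTS, children);
	/* ... submit tasks accessing children[0..PARTS-1] here ... */
	starpu_data_unpartition_submit(handle, PARTS, children, -1);

	starpu_task_wait_for_all();
	starpu_data_partition_clean(handle, PARTS, children);
	starpu_data_unregister(handle);
	starpu_shutdown();
	return 0;
}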
+ 0 - 4
src/core/jobs.c

@@ -247,10 +247,6 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 	size_t data_size = 0;
 #endif //STARPU_USE_SC_HYPERVISOR
 
-	if (task->cl && task->cl->where == STARPU_NOWHERE)
-		/* push_task_output hasn't been done */
-		_starpu_release_nowhere_task_output(j);
-
 	/* We release handle reference count */
 	if (task->cl && !continuation)
 	{

+ 11 - 3
src/core/sched_policy.c

@@ -406,11 +406,19 @@ int _starpu_repush_task(struct _starpu_job *j)
 	 * corresponding dependencies */
 	if (task->cl == NULL || task->cl->where == STARPU_NOWHERE)
 	{
-		if(task->prologue_callback_pop_func)
+		if (task->prologue_callback_pop_func)
 			task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
 
-		_starpu_handle_job_termination(j);
-		_STARPU_LOG_OUT_TAG("handle_job_termination");
+		if (task->cl && task->cl->specific_nodes)
+		{
+			/* Nothing to do, but we are asked to fetch data on some memory nodes */
+			_starpu_fetch_nowhere_task_input(j);
+		}
+		else
+		{
+			_starpu_handle_job_termination(j);
+			_STARPU_LOG_OUT_TAG("handle_job_termination");
+		}
 		return 0;
 	}
 

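The new branch covers codelets that run nowhere but declare specific target nodes: instead of terminating the job right away, their data is first brought to the requested memory nodes asynchronously. The same mechanism can be expressed at the application level; the sketch below is illustrative only (not part of the commit), assumes the public specific_nodes/nodes fields of struct starpu_codelet, and pull_to_ram_cl is a hypothetical name.

#include <starpu.h>

/* A codelet that executes no kernel anywhere but asks StarPU to make its
 * single buffer valid on STARPU_MAIN_RAM.  With this commit, such a task
 * goes through _starpu_fetch_nowhere_task_input() instead of terminating
 * immediately. */
static struct starpu_codelet pull_to_ram_cl =
{
	.where = STARPU_NOWHERE,
	.nbuffers = 1,
	.modes = { STARPU_R },
	.specific_nodes = 1,
	.nodes = { STARPU_MAIN_RAM },
	.name = "pull_to_ram",
};

/* For an already-registered handle: */
void pull_to_ram(starpu_data_handle_t handle)
{
	starpu_task_insert(&pull_to_ram_cl, STARPU_R, handle, 0);
}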
+ 90 - 30
src/datawizard/coherency.c

@@ -808,6 +808,7 @@ static struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle,
 		return &handle->per_node[node];
 }
 
+/* Synchronously fetch data for a given task (if it's not there already) */
 int _starpu_fetch_task_input(struct _starpu_job *j)
 {
 	_STARPU_TRACE_START_FETCH_INPUT(NULL);
@@ -919,13 +920,12 @@ enomem:
 	return -1;
 }
 
-void _starpu_push_task_output(struct _starpu_job *j)
+/* Release task data dependencies */
+static void __starpu_push_task_output(struct _starpu_job *j)
 {
 #ifdef STARPU_OPENMP
 	STARPU_ASSERT(!j->continuation);
 #endif
-	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
-
 	int profiling = starpu_profiling_status_get();
 	struct starpu_task *task = j->task;
 	if (profiling && task->profiling_info)
@@ -943,7 +943,7 @@ void _starpu_push_task_output(struct _starpu_job *j)
 		starpu_data_handle_t handle = descrs[index].handle;
 		enum starpu_data_access_mode mode = descrs[index].mode;
 		int node = descrs[index].node;
-		if (node == -1)
+		if (node == -1 && task->cl->where != STARPU_NOWHERE)
 			node = local_memory_node;
 
 		struct _starpu_data_replicate *local_replicate;
@@ -954,62 +954,122 @@ void _starpu_push_task_output(struct _starpu_job *j)
 			 * _starpu_compar_handles */
 			continue;
 
-		local_replicate = get_replicate(handle, mode, workerid, node);
+		if (node != -1)
+			local_replicate = get_replicate(handle, mode, workerid, node);
 
 		/* Keep a reference for future
 		 * _starpu_release_task_enforce_sequential_consistency call */
 		_starpu_spin_lock(&handle->header_lock);
 		handle->busy_count++;
-		_starpu_spin_unlock(&handle->header_lock);
 
-		_starpu_release_data_on_node(handle, 0, local_replicate);
+		if (node == -1)
+		{
+			/* NOWHERE case, just notify dependencies */
+			if (!_starpu_notify_data_dependencies(handle))
+				_starpu_spin_unlock(&handle->header_lock);
+		}
+		else
+		{
+			_starpu_spin_unlock(&handle->header_lock);
+			_starpu_release_data_on_node(handle, 0, local_replicate);
+		}
 	}
 
 	if (profiling && task->profiling_info)
 		_starpu_clock_gettime(&task->profiling_info->release_data_end_time);
+}
 
+/* Version for a driver running on a worker: we show the driver state in the trace */
+void _starpu_push_task_output(struct _starpu_job *j)
+{
+	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
+	__starpu_push_task_output(j);
 	_STARPU_TRACE_END_PUSH_OUTPUT(NULL);
 }
 
-/* Version of _starpu_push_task_output used by NOWHERE tasks, for which
- * _starpu_fetch_task_input was not called. We just release the handle */
-void _starpu_release_nowhere_task_output(struct _starpu_job *j)
+struct fetch_nowhere_wrapper
+{
+	struct _starpu_job *j;
+	unsigned pending;
+};
+
+static void _starpu_fetch_nowhere_task_input_cb(void *arg);
+/* Asynchronously fetch data for a task which will have no content */
+void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
 {
-#ifdef STARPU_OPENMP
-	STARPU_ASSERT(!j->continuation);
-#endif
 	int profiling = starpu_profiling_status_get();
 	struct starpu_task *task = j->task;
 	if (profiling && task->profiling_info)
-		_starpu_clock_gettime(&task->profiling_info->release_data_start_time);
+		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
 
-        struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
-        unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	unsigned nfetchbuffers = 0;
+	struct fetch_nowhere_wrapper *wrapper;
 
 	unsigned index;
 	for (index = 0; index < nbuffers; index++)
 	{
-		starpu_data_handle_t handle = descrs[index].handle;
+		int node = descrs[index].node;
+		if (node != -1)
+			nfetchbuffers++;
+	}
 
-		if (index && descrs[index-1].handle == descrs[index].handle)
-			/* We have already released this data, skip it. This
-			 * depends on ordering putting writes before reads, see
-			 * _starpu_compar_handles */
+	if (!nfetchbuffers)
+	{
+		/* Nothing to fetch actually, already finished! */
+		_starpu_handle_job_termination(j);
+		_STARPU_LOG_OUT_TAG("handle_job_termination");
+		return;
+	}
+
+	wrapper = malloc(sizeof(*wrapper));
+	wrapper->j = j;
+	wrapper->pending = nfetchbuffers;
+
+	for (index = 0; index < nbuffers; index++)
+	{
+		starpu_data_handle_t handle = descrs[index].handle;
+		enum starpu_data_access_mode mode = descrs[index].mode;
+		int node = descrs[index].node;
+		if (node == -1)
 			continue;
 
-		/* Keep a reference for future
-		 * _starpu_release_task_enforce_sequential_consistency call */
-		_starpu_spin_lock(&handle->header_lock);
-		handle->busy_count++;
-		_starpu_spin_unlock(&handle->header_lock);
+		if (mode == STARPU_NONE ||
+			(mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
+			(mode >> STARPU_MODE_SHIFT) >= STARPU_SHIFTED_MODE_MAX)
+			STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
+		STARPU_ASSERT(mode != STARPU_SCRATCH && mode != STARPU_REDUX);
 
-		_starpu_spin_lock(&handle->header_lock);
-		if (!_starpu_notify_data_dependencies(handle))
-			_starpu_spin_unlock(&handle->header_lock);
+		struct _starpu_data_replicate *local_replicate;
+
+		local_replicate = get_replicate(handle, mode, -1, node);
+
+		_starpu_fetch_data_on_node(handle, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper);
 	}
 
 	if (profiling && task->profiling_info)
-		_starpu_clock_gettime(&task->profiling_info->release_data_end_time);
+		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);
+}
+
+static void _starpu_fetch_nowhere_task_input_cb(void *arg)
+{
+	/* One more transfer finished */
+	struct fetch_nowhere_wrapper *wrapper = arg;
+
+	ANNOTATE_HAPPENS_BEFORE(&wrapper->pending);
+	unsigned pending = STARPU_ATOMIC_ADD(&wrapper->pending, -1);
+	if (pending == 0)
+	{
+		ANNOTATE_HAPPENS_AFTER(&wrapper->pending);
+
+		/* Finished transferring, task is over */
+		struct _starpu_job *j = wrapper->j;
+		free(wrapper);
+		__starpu_push_task_output(j);
+		_starpu_handle_job_termination(j);
+		_STARPU_LOG_OUT_TAG("handle_job_termination");
+	}
 }
 
 /* NB : this value can only be an indication of the status of a data

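The wrapper introduced above implements a last-one-out completion scheme: each asynchronous fetch callback decrements the pending counter, and the callback that reaches zero releases the task's outputs and terminates the job, so no lock on the wrapper is needed. The following is a self-contained sketch of the same pattern, not StarPU code, with __sync_sub_and_fetch standing in for STARPU_ATOMIC_ADD(..., -1) and a plain integer standing in for the job.

#include <stdio.h>
#include <stdlib.h>

struct wrapper
{
	int job_id;          /* stands in for struct _starpu_job * */
	unsigned pending;    /* number of fetches still in flight */
};

static void finalize(int job_id)
{
	printf("job %d: all fetches done, release outputs and terminate\n", job_id);
}

static void fetch_done_cb(void *arg)
{
	struct wrapper *w = arg;
	/* Atomically decrement; only the last completion sees zero */
	if (__sync_sub_and_fetch(&w->pending, 1) == 0)
	{
		int job_id = w->job_id;
		free(w);
		finalize(job_id);
	}
}

int main(void)
{
	unsigned nfetch = 3, i;
	struct wrapper *w = malloc(sizeof(*w));
	w->job_id = 42;
	w->pending = nfetch;

	/* In StarPU these callbacks fire asynchronously from the data-request
	 * layer; here they are simply invoked in a loop. */
	for (i = 0; i < nfetch; i++)
		fetch_done_cb(w);
	return 0;
}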
+ 3 - 0
src/datawizard/coherency.h

@@ -147,6 +147,8 @@ struct _starpu_data_state
 	unsigned nchildren;
 	/* How many partition plans this handle has */
 	unsigned nplans;
+	/* Switch codelet for asynchronous partitioning */
+	struct starpu_codelet *switch_cl;
 	/* Whether a partition plan is currently submitted and the
 	 * corresponding unpartition has not been yet
 	 *
@@ -283,6 +285,7 @@ void _starpu_release_nowhere_task_output(struct _starpu_job *j);
 
 STARPU_ATTRIBUTE_WARN_UNUSED_RESULT
 int _starpu_fetch_task_input(struct _starpu_job *j);
+void _starpu_fetch_nowhere_task_input(struct _starpu_job *j);
 
 unsigned _starpu_is_data_present_or_requested(struct _starpu_data_state *state, unsigned node);
 

+ 21 - 22
src/datawizard/filters.c

@@ -186,6 +186,7 @@ static void _starpu_data_partition(starpu_data_handle_t initial_handle, starpu_d
 
 		child->nchildren = 0;
 		child->nplans = 0;
+		child->switch_cl = NULL;
 		child->partitioned = 0;
 		child->readonly = 0;
                 child->mpi_data = initial_handle->mpi_data;
@@ -520,12 +521,24 @@ void starpu_data_partition_plan(starpu_data_handle_t initial_handle, struct star
 	unsigned i;
 	unsigned nparts = _starpu_data_partition_nparts(initial_handle, f);
 	STARPU_ASSERT_MSG(initial_handle->nchildren == 0, "partition planning and synchronous partitioning is not supported");
-	STARPU_ASSERT_MSG(initial_handle->home_node == STARPU_MAIN_RAM, "partition planning is currently only supported from main RAM");
 	STARPU_ASSERT_MSG(initial_handle->sequential_consistency, "partition planning is currently only supported for data with sequential consistency");
 
 	for (i = 0; i < nparts; i++)
 		childrenp[i] = calloc(1, sizeof(struct _starpu_data_state));
 	_starpu_data_partition(initial_handle, childrenp, nparts, f, 0);
+
+	if (!initial_handle->switch_cl)
+	{
+		/* Create a codelet that will make the coherency on the home node */
+		struct starpu_codelet *cl = initial_handle->switch_cl = calloc(1, sizeof(*initial_handle->switch_cl));
+		cl->where = STARPU_NOWHERE;
+		cl->nbuffers = STARPU_VARIABLE_NBUFFERS;
+		cl->name = "data_partition_switch";
+		cl->specific_nodes = 1;
+		cl->dyn_nodes = malloc((nparts+1) * sizeof(*cl->dyn_nodes));
+		for (i = 0; i < nparts+1; i++)
+			cl->dyn_nodes[i] = initial_handle->home_node;
+	}
 }
 
 void starpu_data_partition_clean(starpu_data_handle_t root_handle, unsigned nparts, starpu_data_handle_t *children)
@@ -540,20 +553,6 @@ void starpu_data_partition_clean(starpu_data_handle_t root_handle, unsigned npar
 	_starpu_spin_unlock(&root_handle->header_lock);
 }
 
-static void empty(void *buffers[] STARPU_ATTRIBUTE_UNUSED, void *cl_arg STARPU_ATTRIBUTE_UNUSED)
-{
-	/* This doesn't need to do anything, it's simply used to make coherency
-	 * between the two views, by simply running on the home node of the
-	 * data, thus getting back all data pieces there.  */
-}
-
-static struct starpu_codelet cl_switch =
-{
-	.cpu_funcs = {empty},
-	.nbuffers = STARPU_VARIABLE_NBUFFERS,
-	.name = "data_partition_switch"
-};
-
 void starpu_data_partition_submit(starpu_data_handle_t initial_handle, unsigned nparts, starpu_data_handle_t *children)
 {
 	STARPU_ASSERT_MSG(initial_handle->sequential_consistency, "partition planning is currently only supported for data with sequential consistency");
@@ -572,7 +571,7 @@ void starpu_data_partition_submit(starpu_data_handle_t initial_handle, unsigned
 		descr[i].mode = STARPU_W;
 	}
 	/* TODO: assert nparts too */
-	starpu_task_insert(&cl_switch, STARPU_RW, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
+	starpu_task_insert(initial_handle->switch_cl, STARPU_RW, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
 	starpu_data_invalidate_submit(initial_handle);
 }
 
@@ -594,7 +593,7 @@ void starpu_data_partition_readonly_submit(starpu_data_handle_t initial_handle,
 		descr[i].mode = STARPU_W;
 	}
 	/* TODO: assert nparts too */
-	starpu_task_insert(&cl_switch, STARPU_R, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
+	starpu_task_insert(initial_handle->switch_cl, STARPU_R, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
 }
 
 void starpu_data_partition_readwrite_upgrade_submit(starpu_data_handle_t initial_handle, unsigned nparts, starpu_data_handle_t *children)
@@ -615,14 +614,14 @@ void starpu_data_partition_readwrite_upgrade_submit(starpu_data_handle_t initial
 		descr[i].mode = STARPU_W;
 	}
 	/* TODO: assert nparts too */
-	starpu_task_insert(&cl_switch, STARPU_RW, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
+	starpu_task_insert(initial_handle->switch_cl, STARPU_RW, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
 	starpu_data_invalidate_submit(initial_handle);
 }
 
 void starpu_data_unpartition_submit(starpu_data_handle_t initial_handle, unsigned nparts, starpu_data_handle_t *children, int gather_node)
 {
 	STARPU_ASSERT_MSG(initial_handle->sequential_consistency, "partition planning is currently only supported for data with sequential consistency");
-	STARPU_ASSERT_MSG(gather_node == STARPU_MAIN_RAM || gather_node == -1, "gathering node different from main RAM is currently not supported");
+	STARPU_ASSERT_MSG(gather_node == initial_handle->home_node || gather_node == -1, "gathering node different from home node is currently not supported");
 	_starpu_spin_lock(&initial_handle->header_lock);
 	STARPU_ASSERT_MSG(initial_handle->partitioned >= 1, "No partition planning is active for this handle");
 	initial_handle->partitioned--;
@@ -639,7 +638,7 @@ void starpu_data_unpartition_submit(starpu_data_handle_t initial_handle, unsigne
 		descr[i].mode = STARPU_RW;
 	}
 	/* TODO: assert nparts too */
-	starpu_task_insert(&cl_switch, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
+	starpu_task_insert(initial_handle->switch_cl, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
 	for (i = 0; i < nparts; i++)
 		starpu_data_invalidate_submit(children[i]);
 }
@@ -647,7 +646,7 @@ void starpu_data_unpartition_submit(starpu_data_handle_t initial_handle, unsigne
 void starpu_data_unpartition_readonly_submit(starpu_data_handle_t initial_handle, unsigned nparts, starpu_data_handle_t *children, int gather_node)
 {
 	STARPU_ASSERT_MSG(initial_handle->sequential_consistency, "partition planning is currently only supported for data with sequential consistency");
-	STARPU_ASSERT_MSG(gather_node == STARPU_MAIN_RAM || gather_node == -1, "gathering node different from main RAM is currently not supported");
+	STARPU_ASSERT_MSG(gather_node == initial_handle->home_node || gather_node == -1, "gathering node different from home node is currently not supported");
 	_starpu_spin_lock(&initial_handle->header_lock);
 	STARPU_ASSERT_MSG(initial_handle->partitioned >= 1, "No partition planning is active for this handle");
 	initial_handle->readonly = 1;
@@ -662,7 +661,7 @@ void starpu_data_unpartition_readonly_submit(starpu_data_handle_t initial_handle
 		descr[i].mode = STARPU_R;
 	}
 	/* TODO: assert nparts too */
-	starpu_task_insert(&cl_switch, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
+	starpu_task_insert(initial_handle->switch_cl, STARPU_W, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
 }
 
 /*

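With the relaxed assertions above, the gather node passed to starpu_data_unpartition_submit() (and its readonly variant) may now be the data's home node, whichever memory node that is; -1 remains accepted as well. A hypothetical call, with home standing for the handle's home memory node id:

/* Gather the pieces back on the data's home node ('home' is hypothetical) */
starpu_data_unpartition_submit(handle, nparts, children, home);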
+ 6 - 0
src/datawizard/interfaces/data_interface.c

@@ -253,6 +253,7 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 	/* there is no hierarchy yet */
 	handle->nchildren = 0;
 	handle->nplans = 0;
+	handle->switch_cl = NULL;
 	handle->partitioned = 0;
 	handle->readonly = 0;
 	handle->root_handle = handle;
@@ -824,6 +825,11 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 	STARPU_HG_ENABLE_CHECKING(handle->post_sync_tasks_cnt);
 	STARPU_HG_ENABLE_CHECKING(handle->busy_count);
 
+	if (handle->switch_cl)
+	{
+		free(handle->switch_cl->dyn_nodes);
+		free(handle->switch_cl);
+	}
 	free(handle);
 }