
* Add starpu_task_wait_for_n_submitted() and
STARPU_LIMIT_MAX_NSUBMITTED_TASKS/STARPU_LIMIT_MIN_NSUBMITTED_TASKS to
easily control the number of submitted tasks by making task submission
block.

Samuel Thibault, 10 years ago
commit 51d9fdc477

+ 11 - 0
ChangeLog

@@ -156,6 +156,17 @@ Small changes:
   * Rename function starpu_trace_user_event() as
     starpu_fxt_trace_user_event()
 
+StarPU 1.1.5 (svn revision xxx)
+==============================================
+The scheduling context release
+
+  * Add starpu_memory_pin and starpu_memory_unpin to pin memory allocated
+    another way than starpu_malloc.
+  * Add starpu_task_wait_for_n_submitted() and
+    STARPU_LIMIT_MAX_NSUBMITTED_TASKS/STARPU_LIMIT_MIN_NSUBMITTED_TASKS to
+    easily control the number of submitted tasks by making task submission
+    block.
+
 StarPU 1.1.4 (svn revision 14856)
 ==============================================
 The scheduling context release

+ 24 - 0
doc/doxygen/chapters/05check_list_performance.doxy

@@ -141,6 +141,30 @@ execution. For example, in the Cholesky factorization (dense linear algebra
 application), the GEMM task uses up to 3 buffers, so it is possible to set the
 maximum number of task buffers to 3 to run a Cholesky factorization on StarPU.
 
+\section HowtoReuseMemory How to reuse memory
+
+When your application needs to allocate more data than the amount of memory
+usable by StarPU (given by \ref starpu_memory_get_available() ), the
+allocation cache system can reuse data buffers that were used by previously
+executed tasks. For this system to work with MPI tasks, you need to submit
+tasks progressively instead of as soon as possible, because for MPI receives
+the allocation cache checks for reusable data buffers at submission time, not
+at execution time.
+
+You have two options to control the task submission flow. The first one is to
+control the number of submitted tasks during the whole execution. This can be
+done either by setting the environment variables \ref
+STARPU_LIMIT_MAX_NSUBMITTED_TASKS and \ref STARPU_LIMIT_MIN_NSUBMITTED_TASKS to
+tell StarPU when to stop submitting tasks and when to wake up and submit tasks
+again, or by explicitly calling \ref starpu_task_wait_for_n_submitted() in
+your application code for finer-grain control (for example, between two
+iterations of a submission loop).
+
+The second option is to control the memory size of the allocation cache. This
+can be done in the application by jointly using \ref
+starpu_memory_get_available() and \ref starpu_memory_wait_available() to submit
+tasks only when there is enough memory space to allocate the data needed by the
+task, i.e. when enough buffers are available for reuse in the allocation cache.
+
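The two options above can be sketched as follows. This is an illustrative fragment, not part of this patch: it assumes an initialized StarPU application with a codelet `cl`, registered data `handles`, a hypothetical per-task memory footprint `task_size`, and a hypothetical helper `submit_one_task()`; the thresholds 10000/5000 are arbitrary.

```c
/* Option 1: throttle by number of in-flight tasks. Every 10000
 * submissions, block until only 5000 submitted tasks remain. */
unsigned i;
int ret;
for (i = 0; i < ntasks; i++)
{
	struct starpu_task *task = starpu_task_create();
	task->cl = &cl;
	task->handles[0] = handles[i];
	ret = starpu_task_submit(task);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
	if (i % 10000 == 9999)
		starpu_task_wait_for_n_submitted(5000);
}

/* Option 2: throttle by available memory. Before each submission, wait
 * until task_size bytes can be allocated (or reused from the cache). */
for (i = 0; i < ntasks; i++)
{
	if (starpu_memory_get_available(STARPU_MAIN_RAM) < (starpu_ssize_t) task_size)
		starpu_memory_wait_available(STARPU_MAIN_RAM, task_size);
	submit_one_task(i); /* hypothetical helper wrapping starpu_task_submit() */
}
starpu_task_wait_for_all();
```

Option 1 is simpler when tasks have a uniform footprint; option 2 adapts to tasks of varying sizes at the price of tracking memory explicitly.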
 \section PerformanceModelCalibration Performance Model Calibration
 
 Most schedulers are based on an estimation of codelet duration on each kind

+ 22 - 0
doc/doxygen/chapters/40environment_variables.doxy

@@ -665,6 +665,28 @@ This specifies the size to be used by StarPU to push data when the main
 memory is getting full. The default is unlimited.
 </dd>
 
+<dt>STARPU_LIMIT_MAX_NSUBMITTED_TASKS</dt>
+<dd>
+\anchor STARPU_LIMIT_MAX_NSUBMITTED_TASKS
+\addindex __env__STARPU_LIMIT_MAX_NSUBMITTED_TASKS    
+This variable allows the user to control the task submission flow by specifying
+to StarPU the maximum number of submitted tasks allowed at a given time, i.e.
+when this limit is reached, task submission becomes blocking until enough tasks
+have completed, as specified by \ref STARPU_LIMIT_MIN_NSUBMITTED_TASKS.
+Setting it enables allocation cache buffer reuse in main memory.
+</dd>
+
+<dt>STARPU_LIMIT_MIN_NSUBMITTED_TASKS</dt>
+<dd>
+\anchor STARPU_LIMIT_MIN_NSUBMITTED_TASKS
+\addindex __env__STARPU_LIMIT_MIN_NSUBMITTED_TASKS    
+This variable allows the user to control the task submission flow by specifying
+to StarPU the number of remaining submitted tasks below which task submission is
+unblocked again. This variable has to be used in conjunction with \ref
+STARPU_LIMIT_MAX_NSUBMITTED_TASKS, which puts the task submission thread to
+sleep. Setting it enables allocation cache buffer reuse in main memory.
+</dd>
+
 <dt>STARPU_TRACE_BUFFER_SIZE</dt>
 <dd>
 \anchor STARPU_TRACE_BUFFER_SIZE

+ 11 - 0
doc/doxygen/chapters/api/codelet_and_tasks.doxy

@@ -796,6 +796,17 @@ terminated. It does not destroy these tasks.
 This function waits until all the tasks that were already
 submitted to the context \p sched_ctx_id have been executed
 
+\fn int starpu_task_wait_for_n_submitted(unsigned n)
+\ingroup API_Codelet_And_Tasks
+This function blocks until at most <c>n</c> submitted tasks are left (in the
+current context, or in the global one if there is no current context) to be
+executed. It does not destroy these tasks.
+
+\fn int starpu_task_wait_for_n_submitted_in_ctx(unsigned sched_ctx, unsigned n) 
+\ingroup API_Codelet_And_Tasks
+This function waits until at most <c>n</c> tasks that were submitted to the
+context <c>sched_ctx</c> are left to be executed.
+
 \fn int starpu_task_nready(void)
 \ingroup API_Codelet_And_Tasks
 TODO

+ 2 - 0
include/starpu_task.h

@@ -286,8 +286,10 @@ int starpu_task_finished(struct starpu_task *task) STARPU_WARN_UNUSED_RESULT;
 int starpu_task_wait(struct starpu_task *task) STARPU_WARN_UNUSED_RESULT;
 
 int starpu_task_wait_for_all(void);
+int starpu_task_wait_for_n_submitted(unsigned n);
 
 int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx_id);
+int starpu_task_wait_for_n_submitted_in_ctx(unsigned sched_ctx_id, unsigned n);
 
 int starpu_task_wait_for_no_ready(void);
 

+ 3 - 3
src/common/barrier.h

@@ -21,9 +21,9 @@
 
 struct _starpu_barrier
 {
-	int count;
-	int reached_start;
-	int reached_exit;
+	unsigned count;
+	unsigned reached_start;
+	unsigned reached_exit;
 	double reached_flops;
 	starpu_pthread_mutex_t mutex;
 	starpu_pthread_mutex_t mutex_exit;

+ 13 - 1
src/common/barrier_counter.c

@@ -16,7 +16,7 @@
 
 #include <common/barrier_counter.h>
 
-int _starpu_barrier_counter_init(struct _starpu_barrier_counter *barrier_c, int count)
+int _starpu_barrier_counter_init(struct _starpu_barrier_counter *barrier_c, unsigned count)
 {
 	_starpu_barrier_init(&barrier_c->barrier, count);
 	STARPU_PTHREAD_COND_INIT(&barrier_c->cond2, NULL);
@@ -43,6 +43,18 @@ int _starpu_barrier_counter_wait_for_empty_counter(struct _starpu_barrier_counte
 	return 0;
 }
 
+int _starpu_barrier_counter_wait_until_counter_reaches_n(struct _starpu_barrier_counter *barrier_c, unsigned n)
+{
+	struct _starpu_barrier *barrier = &barrier_c->barrier;
+	STARPU_PTHREAD_MUTEX_LOCK(&barrier->mutex);
+
+	while (barrier->reached_start > n)
+		STARPU_PTHREAD_COND_WAIT(&barrier->cond, &barrier->mutex);
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&barrier->mutex);
+	return 0;
+}
+
 int _starpu_barrier_counter_wait_for_full_counter(struct _starpu_barrier_counter *barrier_c)
 {
 	struct _starpu_barrier *barrier = &barrier_c->barrier;

+ 3 - 1
src/common/barrier_counter.h

@@ -26,12 +26,14 @@ struct _starpu_barrier_counter
 	starpu_pthread_cond_t cond2;
 };
 
-int _starpu_barrier_counter_init(struct _starpu_barrier_counter *barrier_c, int count);
+int _starpu_barrier_counter_init(struct _starpu_barrier_counter *barrier_c, unsigned count);
 
 int _starpu_barrier_counter_destroy(struct _starpu_barrier_counter *barrier_c);
 
 int _starpu_barrier_counter_wait_for_empty_counter(struct _starpu_barrier_counter *barrier_c);
 
+int _starpu_barrier_counter_wait_until_counter_reaches_n(struct _starpu_barrier_counter *barrier_c, unsigned n);
+
 int _starpu_barrier_counter_wait_for_full_counter(struct _starpu_barrier_counter *barrier_c);
 
 int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier_counter *barrier_c, double flops);

+ 9 - 0
src/core/sched_ctx.c

@@ -1176,6 +1176,15 @@ int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
 	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
 }
 
+int _starpu_wait_for_n_submitted_tasks_of_sched_ctx(unsigned sched_ctx_id, unsigned n)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+
+	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_n_submitted must not be called from a task or callback");
+
+	return _starpu_barrier_counter_wait_until_counter_reaches_n(&sched_ctx->tasks_barrier, n);
+}
+
 void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
 	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();

+ 3 - 0
src/core/sched_ctx.h

@@ -182,6 +182,9 @@ void _starpu_delete_all_sched_ctxs();
  * context have been executed. */
 int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id);
 
+/* This function waits until at most n tasks are still submitted. */
+int _starpu_wait_for_n_submitted_tasks_of_sched_ctx(unsigned sched_ctx_id, unsigned n);
+
 /* In order to implement starpu_wait_for_all_tasks_of_ctx, we keep track of the number of
  * task currently submitted to the context */
 void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);

+ 59 - 0
src/core/task.c

@@ -604,6 +604,16 @@ int starpu_task_submit(struct starpu_task *task)
 #endif
 		;
 
+	if (!j->internal)
+	{
+		int limit_min_submitted_tasks = starpu_get_env_number("STARPU_LIMIT_MIN_NSUBMITTED_TASKS");
+		int limit_max_submitted_tasks = starpu_get_env_number("STARPU_LIMIT_MAX_NSUBMITTED_TASKS");
+		int nsubmitted_tasks = starpu_task_nsubmitted();
+		/* Block when more than the maximum number of tasks are in flight,
+		 * and resume submission once only the minimum number remain. */
+		if (limit_max_submitted_tasks >= 0 && limit_max_submitted_tasks < nsubmitted_tasks
+			&& limit_min_submitted_tasks >= 0 && limit_min_submitted_tasks < nsubmitted_tasks)
+			starpu_task_wait_for_n_submitted(limit_min_submitted_tasks);
+	}
+
 
 	ret = _starpu_task_submit_head(task);
 	if (ret)
@@ -834,6 +844,55 @@ int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx)
 #endif
 	return 0;
 }
+
+/*
+ * We wait until at most a certain number of the tasks that have already been
+ * submitted are left. Note that a regenerable task is not considered finished
+ * until it is explicitly set as non-regenerable again (e.g. from a callback).
+ */
+int starpu_task_wait_for_n_submitted(unsigned n)
+{
+	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
+	unsigned sched_ctx_id = nsched_ctxs == 1 ? 0 : starpu_sched_ctx_get_context();
+
+	/* if there is no indication about which context to wait,
+	   we wait for all tasks submitted to starpu */
+	if (sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
+	{
+		_STARPU_DEBUG("Waiting for all tasks\n");
+		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_n_submitted must not be called from a task or callback");
+
+		struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
+		if(config->topology.nsched_ctxs == 1)
+			_starpu_wait_for_n_submitted_tasks_of_sched_ctx(0, n);
+		else
+		{
+			int s;
+			for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
+			{
+				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
+				{
+					_starpu_wait_for_n_submitted_tasks_of_sched_ctx(config->sched_ctxs[s].id, n);
+				}
+			}
+		}
+
+		return 0;
+	}
+	else
+	{
+		_STARPU_DEBUG("Waiting for tasks submitted to context %u\n", sched_ctx_id);
+		_starpu_wait_for_n_submitted_tasks_of_sched_ctx(sched_ctx_id, n);
+	}
+	return 0;
+}
+
+int starpu_task_wait_for_n_submitted_in_ctx(unsigned sched_ctx, unsigned n)
+{
+	_starpu_wait_for_n_submitted_tasks_of_sched_ctx(sched_ctx, n);
+
+	return 0;
+}
 /*
  * We wait until there is no ready task any more (i.e. StarPU will not be able
  * to progress any more).