Forráskód Böngészése

New STARPU_MPI_MEM_THROTTLE environment variable

to throttle mpi submission according to memory use.
Samuel Thibault 5 éve
szülő
commit
94f97985dc

+ 2 - 0
ChangeLog

@@ -26,6 +26,8 @@ New features:
     or received data and their use by tasks
   * Add 4D tensor data interface.
   * New sched_tasks.rec trace file which monitors task scheduling push/pop actions
+  * New STARPU_MPI_MEM_THROTTLE environment variable to throttle MPI
+    submission according to memory use.
 
 Small changes:
   * Use the S4U interface of Simgrid instead of xbt and MSG.

+ 13 - 0
doc/doxygen/chapters/501_environment_variables.doxy

@@ -674,6 +674,15 @@ for that environment variable to be used, and the
 STARPU_MPI_DRIVER_CALL_FREQUENCY environment variable set to a positive value.
 </dd>
 
+<dt>STARPU_MPI_MEM_THROTTLE</dt>
+<dd>
+\anchor STARPU_MPI_MEM_THROTTLE
+\addindex __env__STARPU_MPI_MEM_THROTTLE
+When set to a positive value, this makes the starpu_mpi_*recv* functions
+block when the memory allocation required for network reception overflows the
+available main memory (as typically set by \ref STARPU_LIMIT_CPU_MEM).
+</dd>
+
 <dt>STARPU_SIMGRID_TRANSFER_COST</dt>
 <dd>
 \anchor STARPU_SIMGRID_TRANSFER_COST
@@ -935,6 +944,10 @@ that have a limited amount of memory.
 Specify the maximum number of megabytes that should be
 available to the application in the main CPU memory. Setting it enables allocation
 cache in main memory. Setting it to zero lets StarPU overflow memory.
+
+Note: for now, not all StarPU allocations get throttled by this
+parameter. Notably, MPI receptions are not throttled unless \ref
+STARPU_MPI_MEM_THROTTLE is set to 1.
 </dd>
 
 <dt>STARPU_LIMIT_CPU_NUMA_devid_MEM</dt>

+ 7 - 0
mpi/src/mpi/starpu_mpi_mpi.c

@@ -167,6 +167,13 @@ void _starpu_mpi_submit_ready_request(void *arg)
 	_STARPU_MPI_LOG_IN();
 	struct _starpu_mpi_req *req = arg;
 
+	if (req->reserved_size)
+	{
+		/* The core will have really allocated the reception buffer now, release our reservation */
+		starpu_memory_deallocate(STARPU_MAIN_RAM, req->reserved_size);
+		req->reserved_size = 0;
+	}
+
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
 	_STARPU_MPI_DEBUG(0, "new req %p srcdst %d tag %"PRIi64" and type %s %d\n", req, req->node_tag.node.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->backend->is_internal_req);

+ 7 - 0
mpi/src/nmad/starpu_mpi_nmad.c

@@ -397,6 +397,13 @@ void _starpu_mpi_submit_ready_request(void *arg)
 	struct _starpu_mpi_req *req = arg;
 	STARPU_ASSERT_MSG(req, "Invalid request");
 
+	if (req->reserved_size)
+	{
+		/* The core will have really allocated the reception buffer now, release our reservation */
+		starpu_memory_deallocate(STARPU_MAIN_RAM, req->reserved_size);
+		req->reserved_size = 0;
+	}
+
 	/* submit the request to MPI directly from submitter */
 	_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %ld src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
 			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);

+ 13 - 0
mpi/src/starpu_mpi.c

@@ -41,6 +41,19 @@ static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum sta
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
 	 * the request is actually submitted */
+
+	if (_starpu_mpi_mem_throttle && mode & STARPU_W && !req->data_handle->initialized)
+	{
+		/* We will trigger allocation, pre-reserve for it */
+		size_t size = starpu_data_get_size(req->data_handle);
+		if (size)
+		{
+			/* This will potentially block */
+			starpu_memory_allocate(STARPU_MAIN_RAM, size, STARPU_MEMORY_WAIT);
+			req->reserved_size = size;
+		}
+	}
+
 	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, 1, &req->pre_sync_jobid, &req->post_sync_jobid);
 }
 

+ 3 - 1
mpi/src/starpu_mpi_private.c

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2013                                     Inria
  * Copyright (C) 2010-2013,2015-2017                      CNRS
- * Copyright (C) 2010,2012,2014-2016,2018                 Université de Bordeaux
+ * Copyright (C) 2010,2012,2014-2016,2018,2020            Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -29,6 +29,7 @@ int _starpu_mpi_use_prio = 1;
 int _starpu_mpi_fake_world_size = -1;
 int _starpu_mpi_fake_world_rank = -1;
 int _starpu_mpi_use_coop_sends = 1;
+int _starpu_mpi_mem_throttle = 0;
 
 void _starpu_mpi_set_debug_level_min(int level)
 {
@@ -66,4 +67,5 @@ void _starpu_mpi_env_init(void)
 	_starpu_mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
 	_starpu_mpi_use_prio = starpu_get_env_number_default("STARPU_MPI_PRIORITIES", 1);
 	_starpu_mpi_use_coop_sends = starpu_get_env_number_default("STARPU_MPI_COOP_SENDS", 1);
+	_starpu_mpi_mem_throttle = starpu_get_env_number_default("STARPU_MPI_MEM_THROTTLE", 0);
 }

+ 5 - 1
mpi/src/starpu_mpi_private.h

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2013,2016,2017                           Inria
  * Copyright (C) 2010-2017, 2019                          CNRS
- * Copyright (C) 2010-2019                                Université de Bordeaux
+ * Copyright (C) 2010-2020                                Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -65,6 +65,7 @@ extern int _starpu_mpi_fake_world_rank;
 extern int _starpu_mpi_use_prio;
 extern int _starpu_mpi_thread_cpuid;
 extern int _starpu_mpi_use_coop_sends;
+extern int _starpu_mpi_mem_throttle;
 void _starpu_mpi_env_init(void);
 
 #ifdef STARPU_NO_ASSERT
@@ -239,6 +240,9 @@ LIST_TYPE(_starpu_mpi_req,
 	int *flag;
 	unsigned sync;
 
+	/* Amount of memory pre-reserved for the reception buffer */
+	size_t reserved_size;
+
 	int ret;
 
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */