Browse Source

Add STARPU_LOCALITY flag

Samuel Thibault 9 years ago
parent
commit
654ad27766

+ 2 - 0
ChangeLog

@@ -117,6 +117,8 @@ New features:
   * Add asynchronous partition planning. It only supports coherency through
   * Add asynchronous partition planning. It only supports coherency through
     the home node of data for now.
     the home node of data for now.
   * Add graph inspection facility for schedulers.
   * Add graph inspection facility for schedulers.
+  * New STARPU_LOCALITY flag to mark data which should be taken into account
+    by schedulers for improving locality.
 
 
 Small features:
 Small features:
   * Tasks can now have a name (via the field const char *name of
   * Tasks can now have a name (via the field const char *name of

+ 4 - 1
doc/doxygen/chapters/api/data_management.doxy

@@ -72,7 +72,10 @@ using a synchronous and non-blocking mode (see starpu_mpi_issend())
 \var starpu_data_access_mode::STARPU_LOCALITY
 \var starpu_data_access_mode::STARPU_LOCALITY
 \ingroup API_Data_Management
 \ingroup API_Data_Management
 used to tell the scheduler which data is the most important for the task, and
 used to tell the scheduler which data is the most important for the task, and
-should thus be used to try to group tasks on the same core or cache, etc.
+should thus be used to try to group tasks on the same core or cache, etc. For
+now only the ws and lws schedulers take this flag into account, and only when
+rebuild with USE_LOCALITY flag defined in the
+src/sched_policies/work_stealing_policy.c source code.
 
 
 @name Basic Data Management API
 @name Basic Data Management API
 \ingroup API_Data_Management
 \ingroup API_Data_Management

+ 2 - 1
include/starpu_data.h

@@ -39,7 +39,8 @@ enum starpu_data_access_mode
 	STARPU_REDUX=(1<<3),
 	STARPU_REDUX=(1<<3),
 	STARPU_COMMUTE=(1<<4),
 	STARPU_COMMUTE=(1<<4),
 	STARPU_SSEND=(1<<5),
 	STARPU_SSEND=(1<<5),
-	STARPU_ACCESS_MODE_MAX=(1<<6)
+	STARPU_LOCALITY=(1<<6),
+	STARPU_ACCESS_MODE_MAX=(1<<7)
 	/* Note: other STARPU_* values in include/starpu_task_util.h */
 	/* Note: other STARPU_* values in include/starpu_task_util.h */
 };
 };
 
 

+ 1 - 0
src/core/dependencies/implicit_data_deps.c

@@ -208,6 +208,7 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 
 
 	/* Do not care about some flags */
 	/* Do not care about some flags */
 	mode &= ~ STARPU_SSEND;
 	mode &= ~ STARPU_SSEND;
+	mode &= ~ STARPU_LOCALITY;
 
 
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
 	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
         _STARPU_LOG_IN();
         _STARPU_LOG_IN();

+ 6 - 1
src/datawizard/coherency.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2009-2015  Université de Bordeaux
+ * Copyright (C) 2009-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
  * Copyright (C) 2014-2015  Inria
  * Copyright (C) 2014-2015  Inria
  *
  *
@@ -257,6 +257,11 @@ struct _starpu_data_state
 	struct starpu_arbiter *arbiter;
 	struct starpu_arbiter *arbiter;
 	/* This is protected by the arbiter mutex */
 	/* This is protected by the arbiter mutex */
 	struct _starpu_data_requester_list arbitered_req_list;
 	struct _starpu_data_requester_list arbitered_req_list;
+
+	/* Data maintained by schedulers themselves */
+	/* Last worker that took this data in locality mode, or -1 if nobody
+	 * took it yet */
+	int last_locality;
 };
 };
 
 
 void _starpu_display_msi_stats(void);
 void _starpu_display_msi_stats(void);

+ 1 - 0
src/datawizard/interfaces/data_interface.c

@@ -309,6 +309,7 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 	else
 	else
 		handle->arbiter = NULL;
 		handle->arbiter = NULL;
 	_starpu_data_requester_list_init(&handle->arbitered_req_list);
 	_starpu_data_requester_list_init(&handle->arbitered_req_list);
+	handle->last_locality = -1;
 
 
 	/* that new data is invalid from all nodes perpective except for the
 	/* that new data is invalid from all nodes perpective except for the
 	 * home node */
 	 * home node */

+ 74 - 2
src/sched_policies/work_stealing_policy.c

@@ -32,6 +32,9 @@
 /* Experimental (dead) code which needs to be tested, fixed... */
 /* Experimental (dead) code which needs to be tested, fixed... */
 /* #define USE_OVERLOAD */
 /* #define USE_OVERLOAD */
 
 
+/* Experimental code for improving data cache locality */
+//#define USE_LOCALITY
+
 struct _starpu_work_stealing_data
 struct _starpu_work_stealing_data
 {
 {
 	unsigned (*select_victim)(unsigned, int);
 	unsigned (*select_victim)(unsigned, int);
@@ -121,6 +124,68 @@ static unsigned select_worker_round_robin(unsigned sched_ctx_id)
 	return worker;
 	return worker;
 }
 }
 
 
+#ifdef USE_LOCALITY
+/* Select a worker according to the locality of the data of the task to be scheduled */
+static unsigned select_worker_locality(struct starpu_task *task, unsigned sched_ctx_id)
+{
+	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
+	if (nbuffers == 0)
+		return -1;
+
+	unsigned i, n;
+	starpu_data_handle_t locality[nbuffers];
+	unsigned ndata[STARPU_NMAXWORKERS] = { 0 };
+	int best_worker = -1;
+	unsigned best_ndata = 0;
+
+	n = 0;
+	for (i = 0; i < nbuffers; i++)
+		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
+		{
+			locality[n] = STARPU_TASK_GET_HANDLE(task, i);
+			if (locality[n]->last_locality >= 0)
+				ndata[locality[n]->last_locality]++;
+			n++;
+		}
+
+	if (n)
+	{
+		/* Some locality buffers, choose worker which has most of them */
+
+		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+		struct starpu_sched_ctx_iterator it;
+		workers->init_iterator(workers, &it);
+
+		while(workers->has_next(workers, &it))
+		{
+			int workerid = workers->get_next(workers, &it);
+			if (ndata[workerid] > best_ndata)
+			{
+				best_worker = workerid;
+				best_ndata = ndata[workerid];
+			}
+		}
+	}
+	return best_worker;
+}
+
+static void record_worker_locality(struct starpu_task *task, int workerid)
+{
+	/* Record where in locality data where the task went */
+	unsigned i;
+	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
+		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
+		{
+			STARPU_TASK_GET_HANDLE(task, i)->last_locality = workerid;
+		}
+}
+
+#else
+static void record_worker_locality(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid)
+{
+}
+#endif
+
 #ifdef USE_OVERLOAD
 #ifdef USE_OVERLOAD
 
 
 /**
 /**
@@ -305,6 +370,7 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	if (task)
 	if (task)
 	{
 	{
 		_STARPU_TRACE_WORK_STEALING(workerid, victim);
 		_STARPU_TRACE_WORK_STEALING(workerid, victim);
+		record_worker_locality(task, workerid);
 	}
 	}
 
 
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
@@ -329,10 +395,14 @@ int ws_push_task(struct starpu_task *task)
 {
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	int workerid;
+	int workerid = -1;
 
 
 pick_worker:
 pick_worker:
-	workerid = starpu_worker_get_id();
+#ifdef USE_LOCALITY
+	workerid = select_worker_locality(task, sched_ctx_id);
+#endif
+	if (workerid == -1)
+		workerid = starpu_worker_get_id();
 
 
 	/* If the current thread is not a worker but
 	/* If the current thread is not a worker but
 	 * the main thread (-1), we find the better one to
 	 * the main thread (-1), we find the better one to
@@ -349,6 +419,8 @@ pick_worker:
 	if (ws->queue_array[workerid] == NULL)
 	if (ws->queue_array[workerid] == NULL)
 		goto pick_worker;
 		goto pick_worker;
 
 
+	record_worker_locality(task, workerid);
+
 #ifdef HAVE_AYUDAME_H
 #ifdef HAVE_AYUDAME_H
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	if (AYU_event)
 	if (AYU_event)

+ 1 - 0
tests/Makefile.am

@@ -221,6 +221,7 @@ noinst_PROGRAMS =				\
 	datawizard/specific_node		\
 	datawizard/specific_node		\
 	datawizard/task_with_multiple_time_the_same_handle	\
 	datawizard/task_with_multiple_time_the_same_handle	\
 	datawizard/test_arbiter			\
 	datawizard/test_arbiter			\
+	datawizard/locality			\
 	disk/disk_copy				\
 	disk/disk_copy				\
 	disk/disk_compute			\
 	disk/disk_compute			\
 	disk/disk_pack				\
 	disk/disk_pack				\

+ 104 - 0
tests/datawizard/locality.c

@@ -0,0 +1,104 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2016  Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * This is a dumb sample of stencil application
+ *
+ * Dumb domain split in N pieces:
+ *
+ * 0 | 1 | ... | N-1
+ *
+ * for each simulation iteration, a task works on some adjactent pieces
+ *
+ * Locality is thus set on the central piece.
+ */
+
+#include <starpu.h>
+#include "../helper.h"
+
+#define N 50
+
+#define ITER 50
+
+int task_worker[N][ITER];
+
+void cpu_f(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *_args)
+{
+	unsigned i, loop;
+	starpu_codelet_unpack_args(_args, &loop, &i);
+	task_worker[i][loop] = starpu_worker_get_id();
+	starpu_sleep(0.001);
+}
+
+static struct starpu_codelet cl =
+{
+	.cpu_funcs = { cpu_f },
+	.cpu_funcs_name = { "cpu_f" },
+	.nbuffers = 3,
+	.modes =
+	{
+		STARPU_RW | STARPU_COMMUTE | STARPU_LOCALITY,
+		STARPU_RW | STARPU_COMMUTE | STARPU_LOCALITY,
+		STARPU_RW | STARPU_COMMUTE | STARPU_LOCALITY,
+	},
+};
+
+int main(int argc, char *argv[])
+{
+	int ret;
+	starpu_data_handle_t A[N];
+	unsigned i, loop;
+
+	ret = starpu_initialize(NULL, &argc, &argv);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	/* Get most parallelism by using an arbiter */
+	starpu_arbiter_t arbiter = starpu_arbiter_create();
+	for (i = 0; i < N; i++)
+	{
+		starpu_void_data_register(&A[i]);
+		starpu_data_assign_arbiter(A[i], arbiter);
+	}
+
+	for (loop = 0; loop < ITER; loop++)
+	{
+		for (i = 1; i < N-1; i++)
+		{
+			starpu_task_insert(&cl,
+					STARPU_RW | STARPU_COMMUTE | STARPU_LOCALITY, A[i-1],
+					STARPU_RW | STARPU_COMMUTE | STARPU_LOCALITY, A[i],
+					STARPU_RW | STARPU_COMMUTE | STARPU_LOCALITY, A[i+1],
+					STARPU_VALUE, &loop, sizeof(loop),
+					STARPU_VALUE, &i, sizeof(i),
+					0);
+		}
+	}
+
+	starpu_task_wait_for_all();
+
+	for (loop = 0; loop < ITER; loop++)
+	{
+		for (i = 1; i < N-1; i++)
+		{
+			printf("%d ", task_worker[i][loop]);
+		}
+		printf("\n");
+	}
+
+	starpu_shutdown();
+	return EXIT_SUCCESS;
+}