Browse Source

Allocate the per_worker array lazily, to divide the data handle weight by something on the order of 10x

Samuel Thibault 9 years ago
parent
commit
cc2c83286c

+ 9 - 0
src/datawizard/coherency.c

@@ -893,7 +893,16 @@ int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned n
 static struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
 {
 	if (mode & (STARPU_SCRATCH|STARPU_REDUX))
+	{
+		if (!handle->per_worker)
+		{
+			_starpu_spin_lock(&handle->header_lock);
+			if (!handle->per_worker)
+				_starpu_data_initialize_per_worker(handle);
+			_starpu_spin_unlock(&handle->header_lock);
+		}
 		return &handle->per_worker[workerid];
+	}
 	else
 		/* That's a "normal" buffer (R/W) */
 		return &handle->per_node[node];

+ 1 - 1
src/datawizard/coherency.h

@@ -163,7 +163,7 @@ struct _starpu_data_state
 
 	/* describe the state of the data in term of coherency */
 	struct _starpu_data_replicate per_node[STARPU_MAXNODES];
-	struct _starpu_data_replicate per_worker[STARPU_NMAXWORKERS];
+	struct _starpu_data_replicate *per_worker;
 
 	struct starpu_data_interface_ops *ops;
 

+ 2 - 26
src/datawizard/filters.c

@@ -148,8 +148,6 @@ static void _starpu_data_partition(starpu_data_handle_t initial_handle, starpu_d
 		initial_handle->nchildren = nparts;
 	}
 
-	unsigned nworkers = starpu_worker_get_count();
-
 	for (node = 0; node < STARPU_MAXNODES; node++)
 	{
 		if (initial_handle->per_node[node].state != STARPU_INVALID)
@@ -283,30 +281,7 @@ static void _starpu_data_partition(starpu_data_handle_t initial_handle, starpu_d
 			f->filter_func(initial_interface, child_interface, f, i, nparts);
 		}
 
-		unsigned worker;
-		for (worker = 0; worker < nworkers; worker++)
-		{
-			struct _starpu_data_replicate *child_replicate;
-			child_replicate = &child->per_worker[worker];
-
-			child_replicate->state = STARPU_INVALID;
-			child_replicate->allocated = 0;
-			child_replicate->automatically_allocated = 0;
-			child_replicate->refcnt = 0;
-			child_replicate->memory_node = starpu_worker_get_memory_node(worker);
-			child_replicate->requested = 0;
-
-			for (node = 0; node < STARPU_MAXNODES; node++)
-			{
-				child_replicate->request[node] = NULL;
-			}
-
-			child_replicate->relaxed_coherency = 1;
-			child_replicate->initialized = 0;
-
-			/* duplicate  the content of the interface on node 0 */
-			memcpy(child_replicate->data_interface, child->per_node[0].data_interface, child->ops->interface_size);
-		}
+		child->per_worker = NULL;
 
 		/* We compute the size and the footprint of the child once and
 		 * store it in the handle */
@@ -410,6 +385,7 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 
 		_starpu_data_unregister_ram_pointer(child_handle);
 
+		if (child_handle->per_worker)
 		for (worker = 0; worker < nworkers; worker++)
 		{
 			struct _starpu_data_replicate *local = &child_handle->per_worker[worker];

+ 34 - 26
src/datawizard/interfaces/data_interface.c

@@ -349,11 +349,34 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 		}
 	}
 
+	handle->per_worker = NULL;
+
+	/* now the data is available ! */
+	_starpu_spin_unlock(&handle->header_lock);
+
+	ptr = starpu_data_handle_to_pointer(handle, STARPU_MAIN_RAM);
+	if (ptr != NULL)
+	{
+		_starpu_data_register_ram_pointer(handle, ptr);
+	}
+}
+
+void
+_starpu_data_initialize_per_worker(starpu_data_handle_t handle)
+{
 	unsigned worker;
 	unsigned nworkers = starpu_worker_get_count();
+
+	_starpu_spin_checklocked(&handle->header_lock);
+
+	handle->per_worker = calloc(nworkers, sizeof(*handle->per_worker));
+
+	size_t interfacesize = handle->ops->interface_size;
+
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		struct _starpu_data_replicate *replicate;
+		unsigned node;
 		replicate = &handle->per_worker[worker];
 		replicate->allocated = 0;
 		replicate->automatically_allocated = 0;
@@ -372,17 +395,10 @@ static void _starpu_register_new_data(starpu_data_handle_t handle,
 		replicate->initialized = 0;
 		replicate->memory_node = starpu_worker_get_memory_node(worker);
 
+		replicate->data_interface = calloc(1, interfacesize);
+		STARPU_ASSERT(replicate->data_interface);
 		/* duplicate  the content of the interface on node 0 */
-		memcpy(replicate->data_interface, handle->per_node[0].data_interface, handle->ops->interface_size);
-	}
-
-	/* now the data is available ! */
-	_starpu_spin_unlock(&handle->header_lock);
-
-	ptr = starpu_data_handle_to_pointer(handle, STARPU_MAIN_RAM);
-	if (ptr != NULL)
-	{
-		_starpu_data_register_ram_pointer(handle, ptr);
+		memcpy(replicate->data_interface, handle->per_node[STARPU_MAIN_RAM].data_interface, interfacesize);
 	}
 }
 
@@ -400,7 +416,6 @@ void starpu_data_ptr_register(starpu_data_handle_t handle, unsigned node)
 int _starpu_data_handle_init(starpu_data_handle_t handle, struct starpu_data_interface_ops *interface_ops, unsigned int mf_node)
 {
 	unsigned node;
-	unsigned worker;
 
 	/* Tell helgrind that our access to busy_count in
 	 * starpu_data_unregister is actually safe */
@@ -427,19 +442,6 @@ int _starpu_data_handle_init(starpu_data_handle_t handle, struct starpu_data_int
 		STARPU_ASSERT(replicate->data_interface);
 	}
 
-	unsigned nworkers = starpu_worker_get_count();
-	for (worker = 0; worker < nworkers; worker++)
-	{
-		struct _starpu_data_replicate *replicate;
-		replicate = &handle->per_worker[worker];
-
-		replicate->handle = handle;
-
-		replicate->data_interface = calloc(1, interfacesize);
-		STARPU_ASSERT(replicate->data_interface);
-
-	}
-
 	return 0;
 }
 
@@ -559,8 +561,12 @@ void _starpu_data_free_interfaces(starpu_data_handle_t handle)
 	for (node = 0; node < STARPU_MAXNODES; node++)
 		free(handle->per_node[node].data_interface);
 
-	for (worker = 0; worker < nworkers; worker++)
-		free(handle->per_worker[worker].data_interface);
+	if (handle->per_worker)
+	{
+		for (worker = 0; worker < nworkers; worker++)
+			free(handle->per_worker[worker].data_interface);
+		free(handle->per_worker);
+	}
 }
 
 struct _starpu_unregister_callback_arg
@@ -829,6 +835,7 @@ retry_busy:
 	}
 	unsigned worker;
 	unsigned nworkers = starpu_worker_get_count();
+	if (handle->per_worker)
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		struct _starpu_data_replicate *local = &handle->per_worker[worker];
@@ -935,6 +942,7 @@ static void _starpu_data_invalidate(void *data)
 
 	unsigned worker;
 	unsigned nworkers = starpu_worker_get_count();
+	if (handle->per_worker)
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		struct _starpu_data_replicate *local = &handle->per_worker[worker];

+ 1 - 0
src/datawizard/interfaces/data_interface.h

@@ -56,6 +56,7 @@ void _starpu_data_free_interfaces(starpu_data_handle_t handle)
 
 extern
 int _starpu_data_handle_init(starpu_data_handle_t handle, struct starpu_data_interface_ops *interface_ops, unsigned int mf_node);
+void _starpu_data_initialize_per_worker(starpu_data_handle_t handle);
 
 extern struct starpu_arbiter *_starpu_global_arbiter;
 extern void _starpu_data_interface_init(void) STARPU_ATTRIBUTE_INTERNAL;

+ 4 - 1
src/datawizard/reduction.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2014  Université de Bordeaux
+ * Copyright (C) 2010-2014, 2016  Université de Bordeaux
  * Copyright (C) 2011, 2012, 2013  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -112,6 +112,9 @@ void _starpu_data_start_reduction_mode(starpu_data_handle_t handle)
 {
 	STARPU_ASSERT(handle->reduction_refcnt == 0);
 
+	if (!handle->per_worker)
+		_starpu_data_initialize_per_worker(handle);
+
 	unsigned worker;
 
 	unsigned nworkers = starpu_worker_get_count();

+ 2 - 1
src/datawizard/user_interactions.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2015  Université de Bordeaux
+ * Copyright (C) 2009-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2015  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -486,6 +486,7 @@ static void _starpu_data_wont_use(void *data)
 		if (local->allocated && local->automatically_allocated)
 			_starpu_memchunk_wont_use(local->mc, node);
 	}
+	if (handle->per_worker)
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		struct _starpu_data_replicate *local = &handle->per_worker[worker];