Selaa lähdekoodia

add function to calculate bandwith and latency between MAIN_RAM and a DISK_RAM

Corentin Salingue 12 vuotta sitten
vanhempi
commit
188e608679

+ 1 - 0
examples/basic_examples/vector_scal.c

@@ -112,6 +112,7 @@ int main(int argc, char **argv)
 	int ret = starpu_init(NULL);
 
 	unsigned dd = starpu_disk_register(&write_on_file, (void *) "/home/corentin/");
+	
 	starpu_disk_free(dd);
 
 	if (ret == -ENODEV) goto enodev;

+ 5 - 1
include/starpu_disk.h

@@ -14,6 +14,9 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
+#ifndef __STARPU_DISK_H__
+#define __STARPU_DISK_H__
+
 typedef void * (*disk_function)(void *, unsigned);
 
 /* list of functions to use on disk */
@@ -27,6 +30,7 @@ struct disk_ops {
 	/* readv, writev, read2d, write2d, etc. */
 	 void *  (*plug)   (void *parameter);
 	 void    (*unplug) (void *base);
+	void    (*bandwith) (void *base, unsigned node);
 };
 
 
@@ -39,4 +43,4 @@ starpu_disk_register(struct disk_ops * func, void *parameter);
 void
 starpu_disk_free(unsigned node);
 
-
+#endif /* __STARPU_DISK_H__ */

+ 141 - 50
src/core/disk.c

@@ -18,6 +18,9 @@
 #include <stdio.h>
 #include <stdbool.h>
 #include <string.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <time.h>
 #include <common/config.h>
 #include <core/workers.h>
 #include <core/debug.h>
@@ -28,6 +31,10 @@
 #include <profiling/profiling.h>
 #include <common/uthash.h>
 
+#define SIZE	(1024*1024)
+#define NITER	128
+
+
 struct disk_register {
 	unsigned node;
 	void * base;
@@ -56,6 +63,8 @@ starpu_disk_register(struct disk_ops * func, void *parameter)
 	/* remember it */
 	add_disk_in_list(memory_node,func,base);
 
+	func->bandwith(base,memory_node);
+	
 	return memory_node;
 }
 
@@ -134,23 +143,26 @@ get_location_with_node(unsigned node)
 	for(i = 0; i <= disk_number; ++i)
 		if (disk_register_list[i]->node == node)
 			return i;
+	return -1;
 }
 
 
-/* use POSIX to write on disk */
+/* use STDIO to write on disk */
 
-struct starpu_posix_obj {
-	FILE * descriptor;
+struct starpu_stdio_obj {
+	int descriptor;
+	FILE * file;
 	char * path;
 	double size;
 };
 
 
 /* allocation memory on disk */
-void * 
-starpu_posix_alloc (void *base, size_t size)
+static void * 
+starpu_stdio_alloc (void *base, size_t size)
 {
-	struct starpu_posix_obj * obj = malloc(sizeof(struct starpu_posix_obj));
+	struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
+	STARPU_ASSERT(obj != NULL);
 	int id = -1;
 
 	/* create template for mkstemp */
@@ -159,21 +171,23 @@ starpu_posix_alloc (void *base, size_t size)
 		sizeBase *= 2;
 
 	char * baseCpy = malloc(sizeBase*sizeof(char));
+	STARPU_ASSERT(baseCpy != NULL);
 	char * tmp = "XXXXXX";
 
 	strcpy(baseCpy, (char *) base);
 	strcat(baseCpy,tmp);
 
 	id = mkstemp(baseCpy);
-	STARPU_ASSERT_MSG(id >= 0, "Posix allocation failed");
+	STARPU_ASSERT_MSG(id >= 0, "Stdio allocation failed");
 
-	FILE * f = fdopen(id, "r+");
-	STARPU_ASSERT_MSG(f != NULL, "Posix allocation failed");
+	FILE * f = fdopen(id, "rb+");
+	STARPU_ASSERT_MSG(f != NULL, "Stdio allocation failed");
 
 	int val = ftruncate(id,size);
-	STARPU_ASSERT_MSG(val >= 0, "Posix allocation failed");
+	STARPU_ASSERT_MSG(val >= 0, "Stdio allocation failed");
 
-	obj->descriptor = f;
+	obj->descriptor = id;
+	obj->file = f;
 	obj->path = baseCpy;
 	obj->size = size;
 
@@ -182,13 +196,14 @@ starpu_posix_alloc (void *base, size_t size)
 
 
 /* free memory on disk */
-void
-starpu_posix_free (void *base, void *obj, size_t size)
+static void
+starpu_stdio_free (void *base, void *obj, size_t size)
 {
-	struct starpu_posix_obj * tmp = (struct starpu_posix_obj *) obj;
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
 	unlink(tmp->path);
-	fclose(tmp->descriptor);
+	fclose(tmp->file);
+	close(tmp->descriptor);
 
 	free(tmp->path);
 	free(tmp);
@@ -196,25 +211,30 @@ starpu_posix_free (void *base, void *obj, size_t size)
 
 
 /* open an existing memory on disk */
-void * 
-starpu_posix_open (void *base, void *pos, size_t size)
+static void * 
+starpu_unistd_open (void *base, void *pos, size_t size)
 {
-	struct starpu_posix_obj * obj = malloc(sizeof(struct starpu_posix_obj));
-	FILE * id = NULL;
+	struct starpu_stdio_obj * obj = malloc(sizeof(struct starpu_stdio_obj));
+	STARPU_ASSERT(obj != NULL);
 
 	/* create template for mkstemp */
 	unsigned int sizeBase = 16;
 	while(sizeBase < (strlen(base)+strlen(pos)+1))
 		sizeBase *= 2;
-
+	
 	char * baseCpy = malloc(sizeBase*sizeof(char));
+	STARPU_ASSERT(baseCpy != NULL);
 	strcpy(baseCpy,(char *) base);
 	strcat(baseCpy,(char *) pos);
 
-	id = fopen(baseCpy,"r+");
-	STARPU_ASSERT_MSG(id != NULL, "Posix allocation failed");
+	int id = open(baseCpy, O_RDONLY);
+	STARPU_ASSERT_MSG(id >= 0, "Unistd open failed");
+
+	FILE * f = fdopen(id,"rb+");
+	STARPU_ASSERT_MSG(f != NULL, "Unistd open failed");
 
 	obj->descriptor = id;
+	obj->file = f;
 	obj->path = baseCpy;
 	obj->size = size;
 
@@ -224,70 +244,141 @@ starpu_posix_open (void *base, void *pos, size_t size)
 
 
 /* free memory without delete it */
-void 
-starpu_posix_close (void *base, void *obj, size_t size)
+static void 
+starpu_unistd_close (void *base, void *obj, size_t size)
 {
-	struct starpu_posix_obj * tmp = (struct starpu_posix_obj *) obj;
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
-	fclose(tmp->descriptor);
+	fclose(tmp->file);
+	close(tmp->descriptor);
 	free(tmp->path);
 	free(tmp);	
 }
 
 
 /* read the memory disk */
-ssize_t 
-starpu_posix_read (void *base, void *obj, void *buf, off_t offset, size_t size)
+static ssize_t 
+starpu_stdio_read (void *base, void *obj, void *buf, off_t offset, size_t size)
 {
-	struct starpu_posix_obj * tmp = (struct starpu_posix_obj *) obj;
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
-	 int res = fseek(tmp->descriptor, offset, SEEK_SET); 
-	STARPU_ASSERT_MSG(res == 0, "Posix read failed");
+	int res = fseek(tmp->file, offset, SEEK_SET); 
+	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
 
-	ssize_t nb = fread (buf, 1, size, tmp->descriptor);
+	ssize_t nb = fread (buf, 1, size, tmp->file);
 	return nb;
 }
 
 
 /* write on the memory disk */
-ssize_t 
-starpu_posix_write (void *base, void *obj, const void *buf, off_t offset, size_t size)
+static ssize_t 
+starpu_stdio_write (void *base, void *obj, const void *buf, off_t offset, size_t size)
 {
-	struct starpu_posix_obj * tmp = (struct starpu_posix_obj *) obj;
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
-	 int res = fseek(tmp->descriptor, offset, SEEK_SET); 
-	STARPU_ASSERT_MSG(res == 0, "Posix read failed");
+	int res = fseek(tmp->file, offset, SEEK_SET); 
+	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
+
+	ssize_t nb = fwrite (buf, 1, size, tmp->file);
 
-	ssize_t nb = fwrite (buf, 1, size, tmp->descriptor);
 	return nb;
 }
 
 
 /* create a new copy of parameter == base */
-void * 
-starpu_posix_plug (void *parameter)
+static void * 
+starpu_stdio_plug (void *parameter)
 {
 	char * tmp = malloc(sizeof(char)*(strlen(parameter)+1));
+	STARPU_ASSERT(tmp != NULL);
 	strcpy(tmp,(char *) parameter);
 	return (void *) tmp;	
 }
 
 
 /* free memory allocated for the base */
-void
-starpu_posix_unplug (void *base)
+static void
+starpu_stdio_unplug (void *base)
 {
 	free(base);
 }
 
 
+static void
+get_stdio_bandwith_between_disk_and_main_ram(void * base, unsigned node)
+{
+
+	unsigned iter;
+	double timing;
+	struct timeval start;
+	struct timeval end;
+	
+	srand (time (NULL)); 
+	int pos = get_location_with_node(node);
+	char * buf = malloc(SIZE*sizeof(char));
+	STARPU_ASSERT(buf != NULL);
+	
+	/* allocate memory */
+	void * mem = disk_register_list[pos]->functions->alloc(base, SIZE);
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) mem;
+
+	/* Measure upload bandwidth */
+	gettimeofday(&start, NULL);
+	for (iter = 0; iter < NITER; ++iter)
+	{
+		disk_register_list[pos]->functions->write(base, mem, buf, 0, SIZE);
+		/* clean cache memory */
+		int res = fflush (tmp->file);
+		STARPU_ASSERT_MSG(res == 0, "Bandwith computation failed");
+
+		res = fsync(tmp->descriptor);
+		STARPU_ASSERT_MSG(res == 0, "Bandwith computation failed");
+	}
+	gettimeofday(&end, NULL);
+	timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
+
+	printf("\n upload: %f ", timing/NITER/SIZE);
+
+	/* free memory */
+	disk_register_list[pos]->functions->free(base, mem, SIZE);
+	free(buf);
+
+	mem = disk_register_list[pos]->functions->alloc(base, 2*SIZE);
+	tmp = (struct starpu_stdio_obj *) mem;
+	buf = malloc(sizeof(char));
+	STARPU_ASSERT(buf != NULL);
+
+	/* Measure latency */
+	gettimeofday(&start, NULL);
+	for (iter = 0; iter < NITER; ++iter)
+	{
+		disk_register_list[pos]->functions->write(base, mem, buf, rand() % ((2*SIZE)-1) +1 , 1);
+
+		int res = fflush (tmp->file);
+		STARPU_ASSERT_MSG(res == 0, "Bandwith computation failed");
+
+		res = fsync(tmp->descriptor);
+		STARPU_ASSERT_MSG(res == 0, "Bandwith computation failed");
+	}
+	gettimeofday(&end, NULL);
+	timing = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
+
+	printf("\n latency: %f \n\n", timing/NITER);
+
+	disk_register_list[pos]->functions->free(base, mem, SIZE);
+	free(buf);
+}
+
+
+
 struct disk_ops write_on_file = {
-	.alloc = starpu_posix_alloc,
-	.free = starpu_posix_free,
-	.open = starpu_posix_open,
-	.close = starpu_posix_close,
-	.read = starpu_posix_read,
-	.write = starpu_posix_write,
-	.plug = starpu_posix_plug,
-	.unplug = starpu_posix_unplug
+	.alloc = starpu_stdio_alloc,
+	.free = starpu_stdio_free,
+	.open = starpu_unistd_open,
+	.close = starpu_unistd_close,
+	.read = starpu_stdio_read,
+	.write = starpu_stdio_write,
+	.plug = starpu_stdio_plug,
+	.unplug = starpu_stdio_unplug,
+	.bandwith = get_stdio_bandwith_between_disk_and_main_ram
 };

+ 3 - 0
src/core/perfmodel/perfmodel_bus.c

@@ -1837,3 +1837,6 @@ double _starpu_predict_transfer_time(unsigned src_node, unsigned dst_node, size_
 
 	return latency + (size/bandwidth)*2*(topology->ncudagpus+topology->nopenclgpus);
 }
+
+
+/* calculate bandwith and latency */