Explorar o código

Added a new combined_workers test.

Inspired by the bug reported by Usman on combined_workers using OpenMP.
The bug is not fixed yet, although a hack is available in temp.patch that works
around the problem (and may introduce new ones).
Nicolas Collin hai 13 anos
pai
achega
e4c7c5898b

+ 14 - 0
tests/main/combined_workers/bfs/Makefile

@@ -0,0 +1,14 @@
+# Builds the `bfs` StarPU combined-worker test (OpenMP fork-join kernel).
+# -O3 is the effective optimisation level: the former -O0 was always
+# overridden by the -O3 appended to every command line.
+CFLAGS += $(shell pkg-config --cflags starpu-1.0) -g -O3 -Wall -Werror -fopenmp
+LDFLAGS += $(shell pkg-config --libs starpu-1.0) -fopenmp
+
+.PHONY: all clean
+
+all: bfs
+
+bfs: bfs.o bfs_omp_func.o
+	$(CXX) bfs.o bfs_omp_func.o $(CFLAGS) $(LDFLAGS) -o bfs
+
+# The objects list the headers they include so that editing a header
+# triggers a rebuild.
+bfs.o: bfs.cpp common.h timer.h
+	$(CXX) bfs.cpp $(CFLAGS) -c -o bfs.o
+
+bfs_omp_func.o: ./bfs_func/bfs_omp_func.cpp common.h
+	$(CXX) ./bfs_func/bfs_omp_func.cpp $(CFLAGS) -c -o bfs_omp_func.o
+
+clean:
+	rm -f bfs *.o *~

+ 249 - 0
tests/main/combined_workers/bfs/bfs.cpp

@@ -0,0 +1,249 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <math.h>
+#include <limits.h>
+#include <starpu.h>
+#include "common.h"
+#include "timer.h"
+
+#define NB_ITERATION 10
+
+
+extern void omp_bfs_func(void *buffers[], void *_args);
+
+/* Print the expected command-line syntax on stderr. argc is not used. */
+void Usage(int argc, char**argv)
+{
+	const char *program = argv[0];
+
+	fprintf(stderr,"Usage: %s <input_file>\n", program);
+}
+
+/* Report a malformed or unreadable graph file on stderr and terminate. */
+static void bfs_input_error(const char *input_f, const char *what)
+{
+	fprintf(stderr, "Error Reading graph file %s: %s\n", input_f, what);
+	exit(1);
+}
+
+/* malloc() `count` elements of `size` bytes; terminates on overflow or OOM. */
+static void *bfs_alloc(size_t count, size_t size)
+{
+	void *ptr = NULL;
+
+	if (size == 0 || count <= ((size_t) -1) / size)
+	{
+		size_t bytes = count * size;
+		/* Never request 0 bytes: malloc(0) may legitimately return NULL. */
+		ptr = malloc(bytes ? bytes : 1);
+	}
+	if (!ptr)
+	{
+		fprintf(stderr, "Out of memory while loading graph\n");
+		exit(1);
+	}
+	return ptr;
+}
+
+/*
+ * Load a graph from the text file `input_f`.
+ *
+ * File layout, as consumed below: node count; one "<first edge> <edge count>"
+ * pair per node; a source node id; edge count; one "<target> <cost>" pair per
+ * edge.
+ *
+ * Outputs (all arrays are malloc()ed here and owned by the caller):
+ *   nb_nodes / nb_edges         - element counts read from the file
+ *   origin_graph_nodes          - per-node edge range
+ *   origin_graph_mask           - all false except the source node
+ *   origin_updating_graph_mask  - all false
+ *   origin_graph_visited        - all false except the source node
+ *   origin_graph_edges          - target node of every edge
+ *   origin_cost                 - -1 everywhere except 0 for the source node
+ *
+ * The process exits with status 1 if the file cannot be opened, is truncated
+ * or malformed, references nodes/edges out of range, or memory runs out.
+ */
+void read_file(char *input_f, unsigned int *nb_nodes, unsigned int *nb_edges,
+	       Node **origin_graph_nodes, bool **origin_graph_mask,
+	       bool **origin_updating_graph_mask, bool **origin_graph_visited,
+	       int **origin_graph_edges, int **origin_cost)
+{
+	FILE *fp;
+	int source = 0;
+
+	printf("Reading File\n");
+
+	//Read in Graph from a file
+	fp = fopen(input_f,"r");
+	if(!fp)
+		bfs_input_error(input_f, "cannot open file");
+
+	// The kernel indexes nodes with an int and node 0 is always the source,
+	// so the count must be in [1, INT_MAX].
+	if (fscanf(fp, "%u", nb_nodes) != 1 || *nb_nodes == 0 || *nb_nodes > INT_MAX)
+		bfs_input_error(input_f, "missing or invalid node count");
+
+	// allocate host memory
+	*origin_graph_nodes = (Node *) bfs_alloc(*nb_nodes, sizeof(Node));
+	*origin_graph_mask = (bool *) bfs_alloc(*nb_nodes, sizeof(bool));
+	*origin_updating_graph_mask = (bool *) bfs_alloc(*nb_nodes, sizeof(bool));
+	*origin_graph_visited = (bool *) bfs_alloc(*nb_nodes, sizeof(bool));
+
+	// initialize the memory
+	for (unsigned int i = 0; i < *nb_nodes; i++)
+	{
+		int start, edgeno;
+
+		if (fscanf(fp, "%d %d", &start, &edgeno) != 2 || start < 0 || edgeno < 0)
+			bfs_input_error(input_f, "missing or invalid node entry");
+		(*origin_graph_nodes)[i].starting = start;
+		(*origin_graph_nodes)[i].no_of_edges = edgeno;
+		(*origin_graph_mask)[i]=false;
+		(*origin_updating_graph_mask)[i]=false;
+		(*origin_graph_visited)[i]=false;
+	}
+
+	// The source node stored in the file is parsed to advance the stream,
+	// but the traversal always starts from node 0 (unchanged behaviour).
+	if (fscanf(fp, "%d", &source) != 1)
+		bfs_input_error(input_f, "missing source node");
+	source=0;
+
+	//set the source node as true in the mask
+	(*origin_graph_mask)[source]=true;
+	(*origin_graph_visited)[source]=true;
+
+	if (fscanf(fp, "%u", nb_edges) != 1 || *nb_edges > INT_MAX)
+		bfs_input_error(input_f, "missing or invalid edge count");
+
+	// Every node's edge range must lie inside the edge array. Both terms
+	// are non-negative ints, so the unsigned sum cannot wrap.
+	for (unsigned int i = 0; i < *nb_nodes; i++)
+	{
+		unsigned int first = (unsigned int) (*origin_graph_nodes)[i].starting;
+		unsigned int count = (unsigned int) (*origin_graph_nodes)[i].no_of_edges;
+
+		if (first + count > *nb_edges)
+			bfs_input_error(input_f, "node edge range exceeds edge count");
+	}
+
+	*origin_graph_edges = (int *) bfs_alloc(*nb_edges, sizeof(int));
+	for (unsigned int i = 0; i < *nb_edges; i++)
+	{
+		int id, cost;
+
+		// The per-edge cost is present in the file but not used.
+		if (fscanf(fp, "%d", &id) != 1 || fscanf(fp, "%d", &cost) != 1)
+			bfs_input_error(input_f, "missing edge entry");
+		if (id < 0 || (unsigned int) id >= *nb_nodes)
+			bfs_input_error(input_f, "edge targets a node out of range");
+		(*origin_graph_edges)[i] = id;
+	}
+
+	// allocate mem for the result on host side
+	*origin_cost = (int *) bfs_alloc(*nb_nodes, sizeof(int));
+	for (unsigned int i = 0; i < (*nb_nodes); i++)
+		(*origin_cost)[i]=-1;
+	(*origin_cost)[source]=0;
+
+	fclose(fp);
+}
+
+//extern void omp_bfs_func(Node* h_graph_nodes, int* h_graph_edges, bool *h_graph_mask, bool *h_updating_graph_mask, bool *h_graph_visited, int* h_cost, int nb_nodes, int nb_edges);
+//extern void cuda_bfs_func(Node* h_graph_nodes, int* h_graph_edges, bool *h_graph_mask, bool *h_updating_graph_mask, bool *h_graph_visited, int* h_cost, int nb_nodes, int nb_edges);
+////////////////////////////////////////////////////////////////////////////////
+// Main Program
+////////////////////////////////////////////////////////////////////////////////
+/*
+ * Entry point: loads the graph named on the command line, registers the six
+ * working arrays with StarPU, runs the fork-join BFS codelet NB_ITERATION
+ * times (restoring the mutable arrays from their pristine copies before each
+ * run), then prints timing statistics and writes every node's cost to
+ * result.txt.
+ *
+ * Returns 0 on success. Exits/returns 1 on bad usage, allocation failure or
+ * when result.txt cannot be written.
+ */
+int main( int argc, char** argv)
+{
+	int ret;
+	int status = 0;
+	char *input_f;
+	Timer timer;
+
+	unsigned int nb_nodes = 0, nb_edges = 0;
+
+	// origin_* hold the pristine graph state; the other pointers are the
+	// working copies handed to StarPU.
+	Node *origin_graph_nodes, *graph_nodes;
+	bool *origin_graph_mask, *graph_mask;
+	bool *origin_updating_graph_mask, *updating_graph_mask;
+	bool *origin_graph_visited, *graph_visited;
+	int *origin_graph_edges, *graph_edges;
+	int *origin_cost, *cost;
+
+	// static: zero-initialised, so every field not set below stays 0/NULL.
+	static struct starpu_perfmodel bfs_model;
+	static struct starpu_codelet bfs_cl;
+
+	bfs_model.type = STARPU_HISTORY_BASED;
+	bfs_model.symbol = "omp_bfs";
+
+	// Buffer order must match the unpacking order in omp_bfs_func().
+	bfs_cl.modes[0] = STARPU_R;	// graph nodes
+	bfs_cl.modes[1] = STARPU_R;	// graph edges
+	bfs_cl.modes[2] = STARPU_RW;	// graph mask
+	bfs_cl.modes[3] = STARPU_RW;	// updating graph mask
+	bfs_cl.modes[4] = STARPU_RW;	// visited flags
+	bfs_cl.modes[5] = STARPU_RW;	// cost
+	bfs_cl.where = STARPU_CPU;
+	bfs_cl.type = STARPU_FORKJOIN;
+	bfs_cl.max_parallelism = INT_MAX;
+	bfs_cl.cpu_funcs[0] = omp_bfs_func;
+	bfs_cl.cpu_funcs[1] = NULL;
+	bfs_cl.nbuffers = 6;
+	bfs_cl.model = &bfs_model;
+
+	starpu_data_handle_t graph_nodes_handle;
+	starpu_data_handle_t graph_edges_handle;
+	starpu_data_handle_t graph_mask_handle;
+	starpu_data_handle_t updating_graph_mask_handle;
+	starpu_data_handle_t graph_visited_handle;
+	starpu_data_handle_t cost_handle;
+
+	if(argc != 2){
+		Usage(argc, argv);
+		exit(1);
+	}
+
+	input_f = argv[1];
+	read_file(input_f, &nb_nodes, &nb_edges, &origin_graph_nodes,
+		  &origin_graph_mask, &origin_updating_graph_mask,
+		  &origin_graph_visited, &origin_graph_edges, &origin_cost);
+
+	graph_nodes = (Node *) malloc(sizeof(Node)*nb_nodes);
+	graph_mask = (bool *) malloc(sizeof(bool)*nb_nodes);
+	updating_graph_mask = (bool *) malloc(sizeof(bool)*nb_nodes);
+	graph_visited = (bool *) malloc(sizeof(bool)*nb_nodes);
+	graph_edges = (int*) malloc(sizeof(int)*nb_edges);
+	cost = (int*) malloc( sizeof(int)*nb_nodes);
+
+	// A NULL result only signals failure for a non-empty request, since
+	// malloc(0) is allowed to return NULL.
+	if ((nb_nodes != 0 && (!graph_nodes || !graph_mask || !updating_graph_mask
+			       || !graph_visited || !cost))
+	    || (nb_edges != 0 && !graph_edges))
+	{
+		fprintf(stderr, "Out of memory while allocating working buffers\n");
+		exit(1);
+	}
+
+	// Nodes and edges are read-only for the kernel: copy them once.
+	memcpy(graph_nodes, origin_graph_nodes, nb_nodes*sizeof(Node));
+	memcpy(graph_edges, origin_graph_edges, nb_edges*sizeof(int));
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_vector_data_register(&graph_nodes_handle, 0,
+				    (uintptr_t) graph_nodes, nb_nodes,
+				    sizeof(graph_nodes[0] ));
+	starpu_vector_data_register(&graph_edges_handle, 0,
+				    (uintptr_t)graph_edges, nb_edges,
+				    sizeof(graph_edges[0]));
+	starpu_vector_data_register(&graph_mask_handle, 0,
+				    (uintptr_t)graph_mask, nb_nodes,
+				    sizeof(graph_mask[0] ));
+	starpu_vector_data_register(&updating_graph_mask_handle, 0,
+				    (uintptr_t)updating_graph_mask,
+				    nb_nodes,
+				    sizeof(updating_graph_mask[0]));
+	starpu_vector_data_register(&graph_visited_handle, 0,
+				    (uintptr_t)graph_visited, nb_nodes,
+				    sizeof(graph_visited[0]));
+	starpu_vector_data_register(&cost_handle, 0, (uintptr_t)cost,
+				    nb_nodes, sizeof(cost[0]));
+
+	for(int it=0; it < NB_ITERATION; it++)
+	{
+		// Take write access to the mutable arrays so they can be reset
+		// to the initial state from the application side.
+		ret = starpu_data_acquire(graph_mask_handle, STARPU_W);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_data_acquire");
+		ret = starpu_data_acquire(updating_graph_mask_handle, STARPU_W);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_data_acquire");
+		ret = starpu_data_acquire(graph_visited_handle, STARPU_W);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_data_acquire");
+		ret = starpu_data_acquire(cost_handle, STARPU_W);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_data_acquire");
+
+		memcpy(graph_mask, origin_graph_mask, nb_nodes * sizeof(bool));
+		memcpy(updating_graph_mask, origin_updating_graph_mask, nb_nodes * sizeof(bool));
+		memcpy(graph_visited, origin_graph_visited, nb_nodes * sizeof(bool));
+		memcpy(cost, origin_cost, nb_nodes * sizeof(int));
+
+		starpu_data_release(graph_mask_handle);
+		starpu_data_release(updating_graph_mask_handle);
+		starpu_data_release(graph_visited_handle);
+		starpu_data_release(cost_handle);
+
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &bfs_cl;
+
+		task->handles[0] = graph_nodes_handle;
+		task->handles[1] = graph_edges_handle;
+		task->handles[2] = graph_mask_handle;
+		task->handles[3] = updating_graph_mask_handle;
+		task->handles[4] = graph_visited_handle;
+		task->handles[5] = cost_handle;
+
+		// Synchronous submission: starpu_task_submit() is expected to
+		// return only once the task has run, so the timer brackets it.
+		task->synchronous = 1;
+
+		printf("Start traversing the tree\n");
+
+		timer.start();
+
+		ret = starpu_task_submit(task);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+		timer.stop();
+	}
+
+	starpu_data_unregister(graph_nodes_handle);
+	starpu_data_unregister(graph_edges_handle);
+	starpu_data_unregister(graph_mask_handle);
+	starpu_data_unregister(updating_graph_mask_handle);
+	starpu_data_unregister(graph_visited_handle);
+	starpu_data_unregister(cost_handle);
+
+	starpu_shutdown();
+
+	printf("File: %s, Average Time: %f, Total time: %f\n", input_f,
+	       timer.getAverageTime(), timer.getTotalTime());
+
+	//Store the result into a file
+	FILE *fpo = fopen("result.txt","w");
+	if (fpo)
+	{
+		for(unsigned int i=0;i<nb_nodes;i++)
+			fprintf(fpo,"%u) cost:%d\n", i, cost[i]);
+		// fclose() flushes, so a failure here means the results were
+		// not fully written.
+		if (fclose(fpo) != 0)
+		{
+			perror("result.txt");
+			status = 1;
+		}
+		else
+			printf("Result stored in result.txt\n");
+	}
+	else
+	{
+		perror("result.txt");
+		status = 1;
+	}
+
+	// cleanup memory
+	free(graph_nodes);
+	free(graph_edges);
+	free(graph_mask);
+	free(updating_graph_mask);
+	free(graph_visited);
+	free(cost);
+	free(origin_graph_nodes);
+	free(origin_graph_edges);
+	free(origin_graph_mask);
+	free(origin_updating_graph_mask);
+	free(origin_graph_visited);
+	free(origin_cost);
+
+	return status;
+}

+ 63 - 0
tests/main/combined_workers/bfs/bfs_func/bfs_omp_func.cpp

@@ -0,0 +1,63 @@
+#include "../common.h"
+#include <starpu.h>
+#include <omp.h>
+
+#include <stdio.h>
+
+/*
+ * StarPU CPU codelet performing a level-by-level breadth-first traversal.
+ *
+ * buffers[] are StarPU vectors, unpacked in this order:
+ *   [0] Node  - per-node edge range (its NX gives the node count)
+ *   [1] int   - edge targets
+ *   [2] bool  - current frontier mask
+ *   [3] bool  - next-frontier mask
+ *   [4] bool  - visited flags
+ *   [5] int   - per-node cost, set to (cost of the expanding node) + 1
+ * _args is not used.
+ *
+ * Each pass expands every node in the current frontier, then promotes the
+ * nodes flagged during the pass to the new frontier; the loop ends after the
+ * first pass that flags nothing. When OPEN is defined the expansion loop is
+ * an OpenMP parallel-for sized by starpu_combined_worker_get_size().
+ * NOTE(review): writes to cost[] and the next-frontier mask inside that loop
+ * are not synchronised - confirm this is acceptable for the inputs used.
+ */
+void omp_bfs_func(void *buffers[], void *_args)
+{
+	Node *nodes = (Node *) STARPU_VECTOR_GET_PTR(buffers[0]);
+	int node_count = STARPU_VECTOR_GET_NX(buffers[0]);
+	int *edges = (int *) STARPU_VECTOR_GET_PTR(buffers[1]);
+	bool *frontier = (bool *) STARPU_VECTOR_GET_PTR(buffers[2]);
+	bool *next_frontier = (bool *) STARPU_VECTOR_GET_PTR(buffers[3]);
+	bool *visited = (bool *) STARPU_VECTOR_GET_PTR(buffers[4]);
+	int *cost = (int *) STARPU_VECTOR_GET_PTR(buffers[5]);
+	int passes = 0;
+	bool frontier_grew;
+
+	do
+	{
+		// Stays false unless the promotion loop below finds a flagged node.
+		frontier_grew = false;
+
+#ifdef OPEN
+		#pragma omp parallel for num_threads(starpu_combined_worker_get_size())
+#endif
+		for (int node = 0; node < node_count; node++)
+		{
+			if (!frontier[node])
+				continue;
+
+			frontier[node] = false;
+
+			const int first_edge = nodes[node].starting;
+			const int edge_end = nodes[node].no_of_edges + first_edge;
+
+			for (int e = first_edge; e < edge_end; e++)
+			{
+				const int target = edges[e];
+
+				if (visited[target])
+					continue;
+				cost[target] = cost[node] + 1;
+				next_frontier[target] = true;
+			}
+		}
+
+		// Promote the nodes flagged during this pass to the new frontier.
+		for (int node = 0; node < node_count; node++)
+		{
+			if (!next_frontier[node])
+				continue;
+
+			frontier[node] = true;
+			visited[node] = true;
+			frontier_grew = true;
+			next_frontier[node] = false;
+		}
+		passes++;
+	}
+	while (frontier_grew);
+
+	printf("Kernel Executed %d times, threads: %d\n", passes, starpu_combined_worker_get_size());
+}

+ 17 - 0
tests/main/combined_workers/bfs/common.h

@@ -0,0 +1,17 @@
+#ifndef COMMON_BFS_H
+#define COMMON_BFS_H
+
+
+// NOTE(review): not referenced by the CPU/OpenMP sources of this test;
+// presumably left over from a GPU variant - confirm before removing.
+#define MAX_THREADS_PER_BLOCK 512
+
+// When defined, bfs_omp_func.cpp compiles in its `#pragma omp parallel for`
+// on the node-expansion loop.
+#define OPEN
+
+
+// Describes one graph node as a slice of the edge array: the node's outgoing
+// edges are entries [starting, starting + no_of_edges) of that array.
+struct Node
+{
+	int starting;		// index of the node's first edge in the edge array
+	int no_of_edges;	// number of consecutive edges belonging to the node
+};
+
+#endif

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 458757 - 0
tests/main/combined_workers/bfs/data/graph65536.txt


+ 13 - 0
tests/main/combined_workers/bfs/run.sh

@@ -0,0 +1,13 @@
+#export STARPU_GENERATE_TRACE=1
+#export GOMP_CPU_AFFINITY="0 6 1 7 2 8 3 9 4 10 5 11"
+#export OMP_WAIT_POLICY=PASSIVE
+export STARPU_SCHED=pheft
+export STARPU_NCPUS=12
+export STARPU_SINGLE_COMBINED_WORKER=1
+export STARPU_MIN_WORKERSIZE=12
+export STARPU_MAX_WORKERSIZE=12
+export STARPU_NCUDA=0
+export STARPU_NOPENCL=0
+export STARPU_WORKER_STATS=1
+export STARPU_CALIBRATE=1
+./bfs data/graph65536.txt

+ 20 - 0
tests/main/combined_workers/bfs/temp.patch

@@ -0,0 +1,20 @@
+Index: src/core/topology.c
+===================================================================
+--- src/core/topology.c	(revision 6542)
++++ src/core/topology.c	(working copy)
+@@ -682,10 +682,13 @@
+ 	support = hwloc_topology_get_support(config->topology.hwtopology);
+ 	if (support->cpubind->set_thisthread_cpubind)
+ 	{
+-		hwloc_cpuset_t set = combined_worker->hwloc_cpu_set;
++		hwloc_cpuset_t set = hwloc_bitmap_alloc();
++		hwloc_bitmap_zero(set);
++		hwloc_bitmap_set_range(set, 0, 12);
++		//hwloc_cpuset_t set = combined_worker->hwloc_cpu_set;
+ 		int ret;
+ 
+-		ret = hwloc_set_cpubind(config->topology.hwtopology, set, HWLOC_CPUBIND_THREAD);
++		ret = hwloc_set_cpubind(config->topology.hwtopology, set, HWLOC_CPUBIND_PROCESS | HWLOC_CPUBIND_NOMEMBIND);
+ 		if (ret)
+ 		{
+ 			perror("binding thread");

+ 216 - 0
tests/main/combined_workers/bfs/timer.h

@@ -0,0 +1,216 @@
+/*! \file timer_linux.h
+ *  \brief Contains timer class that can be used by Linux systems.
+ */
+
+#ifndef TIMER_LINUX_H
+#define TIMER_LINUX_H
+
+#include <sys/time.h>
+#include <iostream>
+#include <vector>
+
+
+/*!
+ *  \brief Wall-clock stopwatch based on gettimeofday().
+ *
+ *  Each start()/stop() pair records one duration in milliseconds. Durations
+ *  normally go to the main list of runs; between start_record_multi() and
+ *  stop_record_multi() they are collected separately and only their maximum
+ *  is appended to the main list as a single run.
+ */
+class Timer
+{
+
+private:
+    timeval timerStart;
+    timeval timerEnd;
+
+    std::vector<double> multi_time; // used for estimating multi backends.
+    std::vector<double> time;
+    bool record_multi;
+
+    /*! \return Milliseconds between the last start() and the last stop(). */
+    double elapsedMs()
+    {
+        return (timerEnd.tv_sec - timerStart.tv_sec + (timerEnd.tv_usec - timerStart.tv_usec) / 1000000.0) * 1000;
+    }
+
+    /*! \return Largest element of \p values, or 0 when it is empty. */
+    static double largestOf(const std::vector<double> &values)
+    {
+        if (values.empty())
+            return 0.0;
+
+        double best = values.front();
+        for (std::vector<double>::const_iterator it = values.begin(); it != values.end(); ++it)
+        {
+            if (best < *it)
+                best = *it;
+        }
+        return best;
+    }
+
+    /*!
+     *  \return Smallest element of \p values, or 0 when it is empty.
+     *
+     *  The search is seeded with the first element: seeding with 0 would
+     *  make every positive timing lose the comparison.
+     */
+    static double smallestOf(const std::vector<double> &values)
+    {
+        if (values.empty())
+            return 0.0;
+
+        double best = values.front();
+        for (std::vector<double>::const_iterator it = values.begin(); it != values.end(); ++it)
+        {
+            if (best > *it)
+                best = *it;
+        }
+        return best;
+    }
+
+    /*! Folds the pending multi-backend timings into one run: their maximum. */
+    void addMultiMaxTime() // used to estimate when using multi-Backend
+    {
+        if (multi_time.empty())
+            return;
+        time.push_back( largestOf(multi_time) );
+        multi_time.clear();
+    }
+
+    /*! Folds the pending multi-backend timings into one run: their minimum. */
+    void addMultiMinTime() // used to estimate when using multi-Backend
+    {
+        if (multi_time.empty())
+            return;
+        time.push_back( smallestOf(multi_time) );
+        multi_time.clear();
+    }
+
+public:
+
+    /*! Starts collecting timings separately, discarding any pending ones. */
+    void start_record_multi()
+    {
+        record_multi=true;
+        multi_time.clear();
+    }
+
+    /*! Stops separate collection and stores the maximum collected timing. */
+    void stop_record_multi()
+    {
+        addMultiMaxTime();
+        record_multi=false;
+    }
+
+    /*! Zero-initialises both timestamps so stop() before start() is defined. */
+    Timer() : timerStart(), timerEnd(), record_multi(false) {}
+
+    /*!
+     *  Starts the timing.
+     */
+    void start()
+    {
+        gettimeofday(&timerStart, NULL);
+    }
+
+    /*!
+     *  Stops the timing and stores the elapsed time in a vector.
+     */
+    void stop()
+    {
+        gettimeofday(&timerEnd, NULL);
+        if (record_multi)
+            multi_time.push_back( elapsedMs() );
+        else
+            time.push_back( elapsedMs() );
+    }
+
+    /*!
+     *  Clears the pending multi-backend timings and, unless a multi
+     *  recording is in progress, all stored runs as well.
+     */
+    void reset()
+    {
+        if (!record_multi)
+            time.clear();
+
+        multi_time.clear();  // clear it in both cases.
+    }
+
+    /*!
+     *  \param run The run to get timing of.
+     *
+     *  \return Time for a certain run (vector::at throws std::out_of_range
+     *          for an invalid index).
+     */
+    double getTime(int run = 0)
+    {
+        return time.at(run);
+    }
+
+    /*!
+     *  \return Total time of all stored runs.
+     */
+    double getTotalTime()
+    {
+        double totalTime = 0.0;
+
+        for (std::vector<double>::iterator it = time.begin(); it != time.end(); ++it)
+            totalTime += *it;
+
+        return totalTime;
+    }
+
+    /*!
+     *  \return Average time of all stored runs, or 0 when none is stored
+     *          (instead of the NaN produced by dividing by zero).
+     */
+    double getAverageTime()
+    {
+        if (time.empty())
+            return 0.0;
+
+        return getTotalTime() / time.size();
+    }
+
+    /*!
+     *  \return Longest stored run, or 0 when none is stored.
+     */
+    double getMaxTime()
+    {
+        return largestOf(time);
+    }
+
+    /*!
+     *  \return Shortest stored run, or 0 when none is stored.
+     */
+    double getMinTime()
+    {
+        return smallestOf(time);
+    }
+
+    /*!
+     *  \return The resolution of the timer in micro seconds.
+     */
+    double getResolutionUs()
+    {
+        double result = 0.0;
+        timeval tStart;
+        timeval tEnd;
+        gettimeofday(&tStart, NULL);
+        gettimeofday(&tEnd, NULL);
+        int delay = 0;
+
+        // Lengthen an empty delay loop until two consecutive readings differ.
+        do
+        {
+            delay++;
+            gettimeofday(&tStart, NULL);
+            for(int i = 0; i < delay; ++i) ;
+            gettimeofday(&tEnd, NULL);
+
+            result = ((((double)tEnd.tv_sec)*1000000.0) + ((double)tEnd.tv_usec)) - ((((double)tStart.tv_sec)*1000000.0) + ((double)tStart.tv_usec));
+
+        } while(result == 0);
+
+        return result;
+    }
+
+    /*!
+     *  \return Number of runs stored in timer.
+     */
+    int getNumTimings()
+    {
+        return time.size();
+    }
+};
+
+
+#endif