stavroula 5 years ago
commit
ddc705c9b9

+ 20 - 0
StarPU_code/Makefile

@@ -0,0 +1,20 @@
+CFLAGS += $(shell pkg-config --cflags starpu-1.2)
+LDFLAGS += $(shell pkg-config --libs starpu-1.2)
+CC = gcc
+NVCC = nvcc
+
+
+main: main.o cpu_output.o output_thread_aggregation.o 
+	$(NVCC) $(CFLAGS) -o main main.o cpu_output.o output_thread_aggregation.o $(LDFLAGS) 
+
+main.o: main.c
+	$(CC) $(CFLAGS) -c main.c
+
+cpu_output.o: cpu_output.c
+	$(CC) $(CFLAGS) -c cpu_output.c
+
+output_thread_aggregation.o: output_thread_aggregation.cu
+	$(NVCC) $(CFLAGS) -c output_thread_aggregation.cu
+
+clean:
+	rm -f main *.o

+ 18 - 0
StarPU_code/cpu_output.c

@@ -0,0 +1,18 @@
+#include <stdio.h>
+#include <stdint.h>
+#include <starpu.h>
+
+/* CPU implementation of the codelet: accumulate all the elements of the window */
+void cpu_output(void *buffers[], void *cl_arg)
+{
+	printf("cpu function\n");
+	
+	uint64_t *cpu_sum = cl_arg;
+	
+	/* length of the vector */
+	int n = STARPU_VECTOR_GET_NX(buffers[0]);
+	uint32_t *window = (uint32_t *)STARPU_VECTOR_GET_PTR(buffers[0]);
+	
+	for (int i = 0; i < n; i++)
+	{
+		*cpu_sum += window[i];
+	}
+	
+}

+ 127 - 0
StarPU_code/main.c

@@ -0,0 +1,127 @@
+#include <starpu.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+// platform independent data types:
+#include <stdint.h>     
+
+// Number of iterations
+#define NUM_ITERATIONS		10
+// The window height is equal to the number of streams
+#define NUM_INPUT_STREAMS	WINDOW_HEIGHT
+#define WINDOW_HEIGHT		4
+// The window width is equal to number of tuples required to fill in a window
+#define WINDOW_WIDTH		100
+
+// The total number of elements is equal to the window_height times window_width
+#define elements			(WINDOW_HEIGHT*WINDOW_WIDTH)
+
+// measure time: 
+struct timeval start[NUM_ITERATIONS], end[NUM_ITERATIONS];
+
+extern void cpu_output(void *buffers[], void *_args);
+extern void output_thread_aggregation(void *buffers[], void *_args);
+
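+/* history-based performance model: StarPU records the codelet's measured execution
+ * times at runtime and reuses them to predict task durations for the scheduler */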
+static struct starpu_perfmodel perf_model = {
+	.type = STARPU_HISTORY_BASED,
+	.symbol = "main",
+};
+
+static struct starpu_codelet cl =
+{
+	/*CPU implementation of the codelet */
+	.cpu_funcs = { cpu_output },
+	.cpu_funcs_name = { "cpu_output" },
+	#ifdef STARPU_USE_CUDA
+	/* CUDA implementation of the codelet */
+	.cuda_funcs = { output_thread_aggregation },
+	#endif
+	.nbuffers = 1,
+	.modes = { STARPU_RW },
+	.model = &perf_model
+};
+
+
+int main(int argc, char **argv)
+{
+
+	uint32_t iterations_id;
+	
+	// create input streams:
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+
+		// dynamic allocation of the memory needed for all the elements
+		uint32_t *window;
+		window = (uint32_t*)calloc(elements, sizeof(uint32_t));
+		
+		// check if there's enough space for the allocation
+		if(!window){
+			printf("Allocation error for window - aborting.\n");
+			exit(1);
+		}
+		
+		uint64_t ag_val = 0;	// test variable to check if the cuda sum is equal to the cpu sum
+		
+		// initialization - fill in the window with random numbers:
+		for (int i = 0; i < elements; i++) {
+			window[i] = (rand()%1000);
+			ag_val += window[i];
+		}
+		printf("TEST %lu\n", ag_val);
+		
+		gettimeofday(&start[iterations_id], NULL); // per-iteration start time: covers StarPU initialization plus the computation of the aggregated value
+		
+		/* initialize StarPU and check that a worker is available */
+		int ret = starpu_init(NULL);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+		
+		/* initialize performance model */
+		starpu_perfmodel_init(&perf_model);
+		
+		/* Tell StarPU to associate the "window" vector with the "vector_handle" */
+		starpu_data_handle_t vector_handle;
+		starpu_vector_data_register(&vector_handle, STARPU_MAIN_RAM, (uintptr_t)window, elements, sizeof(window[0]));
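+		/* once registered, StarPU manages the data: if the task is scheduled on a
+		 * CUDA worker, the vector is transferred to GPU memory automatically */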
+		
+		/* create a synchronous task: any call to starpu_task_submit will block
+		 * until it is terminated */
+		struct starpu_task *task = starpu_task_create();
+		task->synchronous = 1;
+		task->cl = &cl; /* Pointer to the codelet defined above */
+		
+		/* the codelet manipulates one buffer in RW mode */
+		task->handles[0] = vector_handle;
+		
+		uint64_t aggregated_value = 0;
+		
+		/* an argument is passed to the codelet, beware that this is a
+		 * READ-ONLY buffer and that the codelet may be given a pointer to a
+		 * COPY of the argument */
+		task->cl_arg = &aggregated_value;
+		task->cl_arg_size = sizeof(aggregated_value);
+		
+		/* submit the task to StarPU */
+		starpu_task_submit(task);
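+		/* the task is synchronous, so starpu_task_submit() returns only after the
+		 * codelet has finished executing */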
+		
+		/* StarPU does not need to manipulate the array anymore so we can stop monitoring it */
+		starpu_data_unregister(vector_handle);
+		
+		/* terminate StarPU */
+		starpu_shutdown();
+		gettimeofday(&end[iterations_id], NULL);	// per-iteration stop time, taken after the aggregated value has been computed and StarPU has been shut down
+		
+		printf("iter: %u - aggregated value: %lu\n", iterations_id, aggregated_value);
+		
+		//free the memory allocated on the CPU
+		free(window);
+	}
+
+	uint64_t elapsed_usec = 0;		// total elapsed time over all iterations, in microseconds
+	
+	// accumulate the time spent computing the aggregated value across all iterations
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+		elapsed_usec += ((end[iterations_id].tv_sec * 1000000 + end[iterations_id].tv_usec) - (start[iterations_id].tv_sec * 1000000 + start[iterations_id].tv_usec));
+	}
+
+	printf("usec: %lu\n", elapsed_usec);
+
+	return 0;
+}

+ 102 - 0
StarPU_code/output_thread_aggregation.cu

@@ -0,0 +1,102 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <starpu.h>
+#include <math.h>
+
+/* per-block tree reduction: each block reduces its slice of the window in place
+ * and writes one partial sum to dev_data */
+__global__ void output_thread_aggregation_kernel(uint32_t *window, uint32_t *dev_data, int n)
+{
+	unsigned int tid = threadIdx.x;
+	unsigned int index = blockIdx.x*blockDim.x + threadIdx.x;
+	
+	// convert the global data pointer to the local pointer of this block
+	uint32_t *idata = window + blockIdx.x * blockDim.x;
+	
+	// reduction algorithm: all threads stay active so that every __syncthreads()
+	// is reached, and elements beyond n in the last (partial) block are never touched
+	for (unsigned int stride = blockDim.x/2; stride > 0; stride>>=1){
+		if (tid < stride && index + stride < n){
+			idata[tid] += idata[tid+stride];
+		}
+		__syncthreads();
+	}
+	
+	// write the partial sum of this block to global memory
+	if(tid == 0 && index < n){
+		dev_data[blockIdx.x] = idata[0];
+	}
+}
+
+
+/* CUDA implementation of the codelet */
+extern "C" void output_thread_aggregation(void *buffers[], void *_args)
+{
+	printf("cuda function\n");
+	
+	uint64_t *aggregated_value = (uint64_t *)_args;
+	
+	/* length of the vector */
+	int n = STARPU_VECTOR_GET_NX(buffers[0]);
+	/* in a CUDA codelet, STARPU_VECTOR_GET_PTR already returns a pointer into GPU
+	 * memory: StarPU has transferred the registered vector to the device, so it
+	 * can be passed to the kernel directly, without an extra cudaMalloc/cudaMemcpy */
+	uint32_t *window = (uint32_t *)STARPU_VECTOR_GET_PTR(buffers[0]);
+	
+	/* choose the number of threads per block according to the vector's size */
+	int n_threads;
+	if (sqrt(n) <= 32)
+		{n_threads = 32;}
+	else if (sqrt(n) <= 64)
+		{n_threads = 64;}
+	else if (sqrt(n) <= 128)
+		{n_threads = 128;}
+	else if (sqrt(n) <= 256)
+		{n_threads = 256;}
+	else if (sqrt(n) <= 512)
+		{n_threads = 512;}
+	else
+		{n_threads = 1024;}
+
+	printf("n_threads = %d\n", n_threads);
+	
+	// define the number of blocks and the number of threads per block (kernel parameters)
+	dim3 threads_per_block (n_threads);
+	dim3 blocks ((n+threads_per_block.x-1)/threads_per_block.x);
+	
+	// dynamic allocation of the host array that receives one partial sum per block
+	uint32_t *h_data = (uint32_t *)malloc(blocks.x*sizeof(uint32_t));
+	
+	if(!h_data){
+		printf("Allocation error for h_data - aborting.\n");
+		exit(1);
+	}
+	
+	// GPU memory for the per-block partial sums
+	uint32_t *dev_data;
+	cudaMalloc((void**)&dev_data, blocks.x*sizeof(uint32_t));
+	
+	// launch the kernel on StarPU's local CUDA stream
+	output_thread_aggregation_kernel<<<blocks,threads_per_block,0,starpu_cuda_get_local_stream()>>>(window, dev_data, n);
+	
+	// copy the per-block partial sums back to the CPU on the same stream, then
+	// wait for the stream so that h_data is valid before it is read on the host
+	cudaMemcpyAsync(h_data, dev_data, blocks.x*sizeof(uint32_t), cudaMemcpyDeviceToHost, starpu_cuda_get_local_stream());
+	cudaStreamSynchronize(starpu_cuda_get_local_stream());
+	
+	uint64_t gpu_sum = 0;
+	
+	// accumulate the per-block partial sums on the CPU
+	for(int i=0; i<blocks.x; i++){
+		gpu_sum += h_data[i];
+	}
+	
+	// copy the sum to the main program
+	*aggregated_value = gpu_sum;
+	
+	// free the memory allocated on the GPU and on the host
+	cudaFree(dev_data);
+	free(h_data);
+}

+ 8 - 0
cuda_code/Makefile

@@ -0,0 +1,8 @@
+reduction: reduction.cu
+	nvcc -o reduction reduction.cu
+
+run: reduction
+	./reduction
+clean:
+	rm -f reduction

+ 128 - 0
cuda_code/reduction.cu

@@ -0,0 +1,128 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+// platform independent data types:
+#include <stdint.h>
+
+// Number of iterations
+#define NUM_ITERATIONS          10
+// The window height is equal to the number of streams
+#define NUM_INPUT_STREAMS       WINDOW_HEIGHT
+#define WINDOW_HEIGHT           4
+// The window width is equal to number of tuples required to fill in a window
+#define WINDOW_WIDTH            100
+
+// The total number of elements is equal to the window_height times window_width
+#define elements				(WINDOW_HEIGHT*WINDOW_WIDTH)
+
+// measure time: 
+struct timeval start, end;
+
+
+// output streams (aggregator):
+__global__ void output_thread_aggregation(uint32_t *window, uint32_t *dev_data, int n) {
+	
+	unsigned int tid = threadIdx.x;
+	unsigned int index = blockIdx.x*blockDim.x + threadIdx.x;
+	
+	// pointer to this block's slice of the window
+	uint32_t *idata = window + blockIdx.x * blockDim.x;
+	
+	// tree reduction in place: all threads stay active so that every __syncthreads()
+	// is reached, and elements beyond n in the last (partial) block are never touched
+	for (unsigned int stride = blockDim.x/2; stride > 0; stride>>=1){
+		if (tid < stride && index + stride < n){
+			idata[tid] += idata[tid+stride];
+		}
+		__syncthreads();
+	}
+	
+	// write the partial sum of this block to global memory
+	if(tid == 0 && index < n){
+		dev_data[blockIdx.x] = idata[0];
+	}
+
+}
+
+int main(void) {
+
+	uint32_t iterations_id;
+	uint64_t aggregated_value;
+	
+	int n = elements;
+	
+	//define number of blocks and number of threads per block (kernel parameters)
+	dim3 threads_per_block (1);
+	dim3 blocks ((elements+threads_per_block.x-1)/threads_per_block.x);
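+	// each block writes one partial sum into dev_data; the host adds up the
+	// blocks.x partial sums after the kernel completes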
+	
+	gettimeofday(&start, NULL);
+
+	// create input streams:
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+
+		// dynamic allocation of the memory needed for all the elements
+		uint32_t *window;
+		window = (uint32_t*)calloc(elements, sizeof(uint32_t));
+		
+		// check if there's enough space for the allocation
+		if(!window){
+			printf("Allocation error for window - aborting.\n");
+			exit(1);
+		}
+		
+		// dynamic allocation of the reduced data matrix
+		uint32_t *h_data = (uint32_t *)malloc(blocks.x*sizeof(uint32_t));
+		
+		if(!h_data){
+			printf("Allocation error for h_data - aborting.\n");
+			exit(1);
+		}
+		
+		uint64_t ag_val = 0;
+		
+		// initialization - fill in the window with random numbers:
+		for (int i = 0; i < elements; i++) {
+			window[i] = (rand()%1000);
+			ag_val += window[i];
+		}
+		printf("TEST %lu\n", ag_val);
+		
+		//GPU memory pointers
+		uint32_t *dev_window;
+		uint32_t *dev_data;
+
+		//allocate the memory on the GPU
+		cudaMalloc((void**)&dev_window, elements*sizeof(uint32_t));
+		cudaMalloc((void**)&dev_data, blocks.x*sizeof(uint32_t));
+
+		//copy the array 'window' to the GPU
+		cudaMemcpy(dev_window, window, elements*sizeof(uint32_t), cudaMemcpyHostToDevice);
+
+		//launch kernel
+		output_thread_aggregation<<<blocks,threads_per_block>>>(dev_window, dev_data, n);
+		cudaDeviceSynchronize();
+
+		//copy back the result to the CPU
+		cudaMemcpy(h_data, dev_data, blocks.x*sizeof(uint32_t), cudaMemcpyDeviceToHost);
+		
+		aggregated_value = 0;
+		
+		for(int i=0; i<blocks.x; i++){
+			aggregated_value += h_data[i];
+		}
+		
+		cudaDeviceSynchronize();
+
+		printf("iter: %u - aggregated value: %lu\n", iterations_id, aggregated_value);
+		
+		//free the memory allocated on the GPU and on the CPU
+		cudaFree(dev_window);
+		cudaFree(dev_data);
+		free(window);
+		free(h_data);
+
+	}
+
+	gettimeofday(&end, NULL);
+	printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
+
+	return 0;
+	
+}
+

+ 8 - 0
original_code/Makefile

@@ -0,0 +1,8 @@
+aggregation: aggregation.c 
+	gcc -o aggregation aggregation.c
+
+run: aggregation
+	./aggregation
+
+clean:
+	rm -f aggregation

+ 82 - 0
original_code/aggregation.c

@@ -0,0 +1,82 @@
+/****************************************************************************************************
+* count-based window aggregation
+*
+* A number of input streams fetch tuples.
+* Each time the window fills up, its tuples are aggregated.
+* The tuples are stored in an MxN matrix:
+* M is equal to the number of input streams,
+* N is the window size.
+******************************************************************************************************/
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+// platform independent data types:
+#include <stdint.h>     
+
+// Number of iterations
+#define NUM_ITERATIONS		10
+// The window height is equal to the number of streams
+#define WINDOW_HEIGHT		4
+// The window width is equal to number of tuples required to fill in a window
+#define WINDOW_WIDTH		100
+
+// The total number of elements is equal to the window_height times window_width
+#define elements			(WINDOW_HEIGHT*WINDOW_WIDTH)
+
+// measure time: 
+struct timeval start, end;
+
+
+// output streams (aggregator):
+int output_thread_aggregation(uint32_t *window, uint32_t iterations_id) {
+	
+	uint64_t *aggregated_value;
+	aggregated_value = (uint64_t*)malloc(sizeof(uint64_t));
+	
+	// check if there's enough space for the allocation
+	if(!aggregated_value){
+		printf("Allocation error for aggregated_value - aborting.\n");
+		exit(1);
+	}
+
+	*aggregated_value = 0;
+
+	// aggregate the tuples of each stream:
+	for (int i = 0; i < elements; i++) {
+		*aggregated_value += window[i];
+	}
+	printf("iter: %u - aggregated value: %lu\n", iterations_id, *aggregated_value);
+
+	free(aggregated_value);
+	return 0;
+}
+
+int main(void) {
+	
+	// dynamic allocation of the memory needed for all the elements
+	uint32_t *window;
+	window = (uint32_t*)calloc(elements, sizeof(uint32_t));
+	
+	// check if there's enough space for the allocation
+	if(!window){
+		printf("Allocation error for window - aborting.\n");
+		exit(1);
+	}
+	
+	uint32_t iterations_id;
+	
+	gettimeofday(&start, NULL);
+
+	// create input streams:
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+		// fill in the window with random numbers:
+		for (int i = 0; i < elements; i++) {
+			window[i] = (rand()%1000);
+		}
+		output_thread_aggregation(window, iterations_id);
+	}
+	
+	free(window);
+	
+	gettimeofday(&end, NULL);
+	printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
+
+	return 0;
+}