stavroula пре 6 година
родитељ
комит
fe66c12058

+ 20 - 20
StarPU_code/Makefile

@@ -1,20 +1,20 @@
-CFLAGS += $(shell pkg-config --cflags starpu-1.2)
-LDFLAGS += $(shell pkg-config --libs starpu-1.2)
-CC = gcc
-NVCC = nvcc
-
-
-main: main.o cpu_output.o output_thread_aggregation.o 
-	$(NVCC) $(CFLAGS) -o main main.o cpu_output.o output_thread_aggregation.o $(LDFLAGS) 
-
-main.o: main.c
-	$(CC) $(CFLAGS) -c main.c $(LDFLAGS)
-
-cpu_output.o: cpu_output.c
-	$(CC) $(CFLAGS) -c cpu_output.c $(LDFLAGS)
-
-output_thread_aggregation.o: output_thread_aggregation.cu
-	$(NVCC) $(CFLAGS) -c output_thread_aggregation.cu $(LDFLAGS)
-
-clean:
-	rm -f main *.o
+CFLAGS += $(shell pkg-config --cflags starpu-1.2)
+LDFLAGS += $(shell pkg-config --libs starpu-1.2)
+CC = gcc
+NVCC = nvcc
+
+
+main: main.o cpu_output.o output_thread_aggregation.o 
+	$(NVCC) $(CFLAGS) -o main main.o cpu_output.o output_thread_aggregation.o $(LDFLAGS) 
+
+main.o: main.c
+	$(CC) $(CFLAGS) -c main.c $(LDFLAGS)
+
+cpu_output.o: cpu_output.c
+	$(CC) $(CFLAGS) -c cpu_output.c $(LDFLAGS)
+
+output_thread_aggregation.o: output_thread_aggregation.cu
+	$(NVCC) $(CFLAGS) -c output_thread_aggregation.cu $(LDFLAGS)
+
+clean:
+	rm -f main *.o

+ 18 - 18
StarPU_code/cpu_output.c

@@ -1,18 +1,18 @@
-#include <starpu.h>
-
-void cpu_output(void *buffers[], void *cl_arg)
-{
-	printf("cpu function\n");
-	
-	uint64_t *cpu_sum = cl_arg;
-	
-	/* length of the vector */
-	int n = STARPU_VECTOR_GET_NX(buffers[0]);
-	uint32_t *window = (uint32_t *)STARPU_VECTOR_GET_PTR(buffers[0]);
-	
-	for (int i = 0; i < n; i++)
-	{
-		*cpu_sum += window[i];
-	}
-	
-}
+#include <starpu.h>
+
+void cpu_output(void *buffers[], void *cl_arg)
+{
+	printf("cpu function\n");
+	
+	uint64_t *cpu_sum = cl_arg;
+	
+	/* length of the vector */
+	int n = STARPU_VECTOR_GET_NX(buffers[0]);
+	uint32_t *window = (uint32_t *)STARPU_VECTOR_GET_PTR(buffers[0]);
+	
+	for (int i = 0; i < n; i++)
+	{
+		*cpu_sum += window[i];
+	}
+	
+}

+ 127 - 127
StarPU_code/main.c

@@ -1,127 +1,127 @@
-#include<starpu.h>
-#include <stdio.h>
-#include<stdlib.h>
-#include <sys/time.h>
-// platform independent data types:
-#include <stdint.h>     
-
-// Number of iterations
-#define NUM_ITERATIONS		10
-// The window height is equal to the number of streams
-#define NUM_INPUT_STREAMS	WINDOW_HEIGHT
-#define WINDOW_HEIGHT		4
-// The window width is equal to number of tuples required to fill in a window
-#define WINDOW_WIDTH		100
-
-// The total number of elements is equal to the window_height times window_width
-#define elements			WINDOW_HEIGHT*WINDOW_WIDTH
-
-// measure time: 
-struct timeval start[NUM_ITERATIONS], end[NUM_ITERATIONS];
-
-extern void cpu_output(void *buffers[], void *_args);
-extern void output_thread_aggregation(void *buffers[], void *_args);
-
-static struct starpu_perfmodel perf_model = {
-	.type = STARPU_HISTORY_BASED,
-	.symbol = "main",
-};
-
-static struct starpu_codelet cl =
-{
-	/*CPU implementation of the codelet */
-	.cpu_funcs = { cpu_output },
-	.cpu_funcs_name = { "cpu_output" },
-	#ifdef STARPU_USE_CUDA
-	/* CUDA implementation of the codelet */
-	.cuda_funcs = { output_thread_aggregation },
-	#endif
-	.nbuffers = 1,
-	.modes = { STARPU_RW },
-	.model = &perf_model
-};
-
-
-int main(int argc, char **argv)
-{
-
-	uint32_t iterations_id;
-	
-	// create input streams:
-	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
-
-		// dynamic allocation of the memory needed for all the elements
-		uint32_t *window;
-		window = (uint32_t*)calloc(elements, sizeof(uint32_t));
-		
-		// check if there's enough space for the allocation
-		if(!window){
-			printf("Allocation error for window - aborting.\n");
-			exit(1);
-		}
-		
-		uint64_t ag_val = 0;	// test variable to check if the cuda sum is equal to the cpu sum
-		
-		// initialization - fill in the window with random numbers:
-		for (int i = 0; i < elements; i++) {
-			window[i] = (rand()%1000);
-			ag_val += window[i];
-		}
-		printf("TEST %lu\n", ag_val);
-		
-		gettimeofday(&start[iterations_id], NULL); // start time for each iteration only for StarPU initialization and time to calculate aggregated value
-		
-		/* initialize StarPU */
-		starpu_init(NULL);
-		
-		/* initialize performance model */
-		starpu_perfmodel_init(&perf_model);
-		
-		/* Tell StaPU to associate the "window" vector with the "vector_handle" */
-		starpu_data_handle_t vector_handle;
-		starpu_vector_data_register(&vector_handle, STARPU_MAIN_RAM, (uintptr_t)window, elements, sizeof(window[0]));
-		
-		/* create a synchronous task: any call to starpu_task_submit will block
-		 * until it is terminated */
-		struct starpu_task *task = starpu_task_create();
-		task->synchronous = 1;
-		task->cl = &cl; /* Pointer to the codelet defined above */
-		
-		/* the codelet manipulates one buffer in RW mode */
-		task->handles[0] = vector_handle;
-		
-		uint64_t aggregated_value = 0;
-		
-		/* an argument is passed to the codelet, beware that this is a
-		 * READ-ONLY buffer and that the codelet may be given a pointer to a
-		 * COPY of the argument */
-		task->cl_arg = &aggregated_value;
-		task->cl_arg_size = sizeof(aggregated_value);
-		
-		/* submit the task to StarPU */
-		starpu_task_submit(task);
-		
-		/* StarPU does not need to manipulate the array anymore so we can stop monitoring it */
-		starpu_data_unregister(vector_handle);
-		
-		/* terminate StarPU */
-		starpu_shutdown();
-		gettimeofday(&end[iterations_id], NULL);	// stop time for each iteration after aggregation value has been calculated and StarPU has been shutted down
-		
-		printf("iter: %d - aggregated value: %lu\n", iterations_id, aggregated_value);
-		
-		//free the memory allocated on the CPU
-		free(window);
-	}
-
-	uint64_t time = 0;		// variable that holds the time
-	
-	// calculate the time required for the calculation of the aggregated value for all iterations
-	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
-		time += ((end[iterations_id].tv_sec * 1000000 + end[iterations_id].tv_usec) - (start[iterations_id].tv_sec * 1000000 + start[iterations_id].tv_usec));
-	}
-
-	printf("usec: %ld\n", time);
-
-    return 0;
-}
+#include<starpu.h>
+#include <stdio.h>
+#include<stdlib.h>
+#include <sys/time.h>
+// platform independent data types:
+#include <stdint.h>     
+
+// Number of iterations
+#define NUM_ITERATIONS		10
+// The window height is equal to the number of streams
+#define NUM_INPUT_STREAMS	WINDOW_HEIGHT
+#define WINDOW_HEIGHT		4
+// The window width is equal to number of tuples required to fill in a window
+#define WINDOW_WIDTH		100
+
+// The total number of elements is equal to the window_height times window_width
+#define elements			WINDOW_HEIGHT*WINDOW_WIDTH
+
+// measure time: 
+struct timeval start[NUM_ITERATIONS], end[NUM_ITERATIONS];
+
+extern void cpu_output(void *buffers[], void *_args);
+extern void output_thread_aggregation(void *buffers[], void *_args);
+
+static struct starpu_perfmodel perf_model = {
+	.type = STARPU_HISTORY_BASED,
+	.symbol = "main",
+};
+
+static struct starpu_codelet cl =
+{
+	/*CPU implementation of the codelet */
+	.cpu_funcs = { cpu_output },
+	.cpu_funcs_name = { "cpu_output" },
+	#ifdef STARPU_USE_CUDA
+	/* CUDA implementation of the codelet */
+	.cuda_funcs = { output_thread_aggregation },
+	#endif
+	.nbuffers = 1,
+	.modes = { STARPU_RW },
+	.model = &perf_model
+};
+
+
+int main(int argc, char **argv)
+{
+
+	uint32_t iterations_id;
+	
+	// create input streams:
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+
+		// dynamic allocation of the memory needed for all the elements
+		uint32_t *window;
+		window = (uint32_t*)calloc(elements, sizeof(uint32_t));
+		
+		// check if there's enough space for the allocation
+		if(!window){
+			printf("Allocation error for window - aborting.\n");
+			exit(1);
+		}
+		
+		uint64_t ag_val = 0;	// test variable to check if the cuda sum is equal to the cpu sum
+		
+		// initialization - fill in the window with random numbers:
+		for (int i = 0; i < elements; i++) {
+			window[i] = (rand()%1000);
+			ag_val += window[i];
+		}
+		printf("TEST %lu\n", ag_val);
+		
+		gettimeofday(&start[iterations_id], NULL); // start time for each iteration only for StarPU initialization and time to calculate aggregated value
+		
+		/* initialize StarPU */
+		starpu_init(NULL);
+		
+		/* initialize performance model */
+		starpu_perfmodel_init(&perf_model);
+		
+		/* Tell StaPU to associate the "window" vector with the "vector_handle" */
+		starpu_data_handle_t vector_handle;
+		starpu_vector_data_register(&vector_handle, STARPU_MAIN_RAM, (uintptr_t)window, elements, sizeof(window[0]));
+		
+		/* create a synchronous task: any call to starpu_task_submit will block
+		 * until it is terminated */
+		struct starpu_task *task = starpu_task_create();
+		task->synchronous = 1;
+		task->cl = &cl; /* Pointer to the codelet defined above */
+		
+		/* the codelet manipulates one buffer in RW mode */
+		task->handles[0] = vector_handle;
+		
+		uint64_t aggregated_value = 0;
+		
+		/* an argument is passed to the codelet, beware that this is a
+		 * READ-ONLY buffer and that the codelet may be given a pointer to a
+		 * COPY of the argument */
+		task->cl_arg = &aggregated_value;
+		task->cl_arg_size = sizeof(aggregated_value);
+		
+		/* submit the task to StarPU */
+		starpu_task_submit(task);
+		
+		/* StarPU does not need to manipulate the array anymore so we can stop monitoring it */
+		starpu_data_unregister(vector_handle);
+		
+		/* terminate StarPU */
+		starpu_shutdown();
+		gettimeofday(&end[iterations_id], NULL);	// stop time for each iteration after aggregation value has been calculated and StarPU has been shutted down
+		
+		printf("iter: %d - aggregated value: %lu\n", iterations_id, aggregated_value);
+		
+		//free the memory allocated on the CPU
+		free(window);
+	}
+
+	uint64_t time = 0;		// variable that holds the time
+	
+	// calculate the time required for the calculation of the aggregated value for all iterations
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+		time += ((end[iterations_id].tv_sec * 1000000 + end[iterations_id].tv_usec) - (start[iterations_id].tv_sec * 1000000 + start[iterations_id].tv_usec));
+	}
+
+	printf("usec: %ld\n", time);
+
+    return 0;
+}

+ 102 - 102
StarPU_code/output_thread_aggregation.cu

@@ -1,102 +1,102 @@
-#include<stdio.h>
-#include <starpu.h>
-#include <math.h>
-
-__global__ void output_thread_aggregation(uint32_t *window, uint32_t *dev_data, int n)
-{
-	uint tid = threadIdx.x;
-	uint index = blockIdx.x*blockDim.x + threadIdx.x;
-	
-	// convert global data pointer to the local pointer of the block
-	uint32_t *idata = window + blockIdx.x * blockDim.x;
-	
-	if (index >= n) return;
-	
-	// reduction algorithm	
-	for (int stride = blockDim.x/2; stride > 0; stride>>=1){
-		if (tid < stride){
-			idata[tid] += idata[tid+stride];
-		}
-		__syncthreads();
-	}
-	
-	// write result for this block to global memory
-	if(tid == 0){
-		dev_data[blockIdx.x] = idata[0];
-	}
-	
-}
-
-
-extern "C" void output_thread_aggregation(void *buffers[], void *_args)
-{
-	printf("cuda function\n");
-	
-	uint64_t *aggregated_value = (uint64_t *)_args;
-	
-	/* length of the vector */
-	int n = STARPU_VECTOR_GET_NX(buffers[0]);
-	uint32_t *window = (uint32_t *)STARPU_VECTOR_GET_PTR(buffers[0]);
-	
-	/* define the number of threads per block accordingly to the vector's size */
-	int n_threads;
-	if (sqrt(n) <= 32)
-		{n_threads = 32;}
-	else if (sqrt(n) <= 64)
-		{n_threads = 64;}
-	else if (sqrt(n) <= 128)
-		{n_threads = 128;}
-	else if (sqrt(n) <= 256)
-		{n_threads = 256;}
-	else if (sqrt(n) <= 512)
-		{n_threads = 512;}
-	else
-		{n_threads = 1024;}
-
-	printf("n_threads = %lu\n",n_threads);
-	
-	// define number of blocks and number of threads per block (kernel parameters)
-	dim3 threads_per_block (n_threads);
-	dim3 blocks ((n+threads_per_block.x-1)/threads_per_block.x);
-	
-	// dynamic allocation of the reduced data matrix
-	uint32_t *h_data = (uint32_t *)malloc(blocks.x*sizeof(uint32_t));
-	
-	if(!h_data){
-		printf("Allocation error for h_data - aborting.\n");
-		exit(1);
-	}
-	
-	// GPU memory pointers
-	uint32_t *dev_window;
-	uint32_t *dev_data;
-	
-	// allocate the memory on the GPU
-	cudaMalloc((void**)&dev_window, n*sizeof(uint32_t));
-	cudaMalloc((void**)&dev_data, blocks.x*sizeof(uint32_t));
-	
-	// copy the array 'window' to the GPU
-	cudaMemcpyAsync(dev_window, window, n*sizeof(uint32_t), cudaMemcpyHostToDevice);
-	
-	// launch kernel
-	output_thread_aggregation<<<blocks,threads_per_block>>>(dev_window, dev_data, n);
-	
-	cudaStreamSynchronize(starpu_cuda_get_local_stream());
-	
-	// copy back the result to the CPU
-	cudaMemcpyAsync(h_data, dev_data, blocks.x*sizeof(uint32_t), cudaMemcpyDeviceToHost);
-	
-	uint64_t gpu_sum = 0;
-	
-	// compute the total sum from gpu
-	for(int i=0; i<blocks.x; i++){
-		gpu_sum += h_data[i];
-	}
-	
-	// copy the sum to the main program
-	*aggregated_value = gpu_sum;
-	
-	//free the memory allocated on the GPU
-	cudaFree(dev_window);
-	cudaFree(dev_data);
-}
+#include<stdio.h>
+#include <starpu.h>
+#include <math.h>
+
+__global__ void output_thread_aggregation(uint32_t *window, uint32_t *dev_data, int n)
+{
+	uint tid = threadIdx.x;
+	uint index = blockIdx.x*blockDim.x + threadIdx.x;
+	
+	// convert global data pointer to the local pointer of the block
+	uint32_t *idata = window + blockIdx.x * blockDim.x;
+	
+	if (index >= n) return;
+	
+	// reduction algorithm	
+	for (int stride = blockDim.x/2; stride > 0; stride>>=1){
+		if (tid < stride){
+			idata[tid] += idata[tid+stride];
+		}
+		__syncthreads();
+	}
+	
+	// write result for this block to global memory
+	if(tid == 0){
+		dev_data[blockIdx.x] = idata[0];
+	}
+	
+}
+
+
+extern "C" void output_thread_aggregation(void *buffers[], void *_args)
+{
+	printf("cuda function\n");
+	
+	uint64_t *aggregated_value = (uint64_t *)_args;
+	
+	/* length of the vector */
+	int n = STARPU_VECTOR_GET_NX(buffers[0]);
+	uint32_t *window = (uint32_t *)STARPU_VECTOR_GET_PTR(buffers[0]);
+	
+	/* define the number of threads per block accordingly to the vector's size */
+	int n_threads;
+	if (sqrt(n) <= 32)
+		{n_threads = 32;}
+	else if (sqrt(n) <= 64)
+		{n_threads = 64;}
+	else if (sqrt(n) <= 128)
+		{n_threads = 128;}
+	else if (sqrt(n) <= 256)
+		{n_threads = 256;}
+	else if (sqrt(n) <= 512)
+		{n_threads = 512;}
+	else
+		{n_threads = 1024;}
+
+	printf("n_threads = %lu\n",n_threads);
+	
+	// define number of blocks and number of threads per block (kernel parameters)
+	dim3 threads_per_block (n_threads);
+	dim3 blocks ((n+threads_per_block.x-1)/threads_per_block.x);
+	
+	// dynamic allocation of the reduced data matrix
+	uint32_t *h_data = (uint32_t *)malloc(blocks.x*sizeof(uint32_t));
+	
+	if(!h_data){
+		printf("Allocation error for h_data - aborting.\n");
+		exit(1);
+	}
+	
+	// GPU memory pointers
+	uint32_t *dev_window;
+	uint32_t *dev_data;
+	
+	// allocate the memory on the GPU
+	cudaMalloc((void**)&dev_window, n*sizeof(uint32_t));
+	cudaMalloc((void**)&dev_data, blocks.x*sizeof(uint32_t));
+	
+	// copy the array 'window' to the GPU
+	cudaMemcpyAsync(dev_window, window, n*sizeof(uint32_t), cudaMemcpyHostToDevice);
+	
+	// launch kernel
+	output_thread_aggregation<<<blocks,threads_per_block>>>(dev_window, dev_data, n);
+	
+	cudaStreamSynchronize(starpu_cuda_get_local_stream());
+	
+	// copy back the result to the CPU
+	cudaMemcpyAsync(h_data, dev_data, blocks.x*sizeof(uint32_t), cudaMemcpyDeviceToHost);
+	
+	uint64_t gpu_sum = 0;
+	
+	// compute the total sum from gpu
+	for(int i=0; i<blocks.x; i++){
+		gpu_sum += h_data[i];
+	}
+	
+	// copy the sum to the main program
+	*aggregated_value = gpu_sum;
+	
+	//free the memory allocated on the GPU
+	cudaFree(dev_window);
+	cudaFree(dev_data);
+}

+ 7 - 7
cuda_code/Makefile

@@ -1,8 +1,8 @@
-reduction: reduction.cu
-	nvcc -o reduction reduction.cu
-
-run:
-	./reduction
-	
-clean:
+reduction: reduction.cu
+	nvcc -o reduction reduction.cu
+
+run:
+	./reduction
+	
+clean:
 	rm -f reduction

+ 128 - 128
cuda_code/reduction.cu

@@ -1,128 +1,128 @@
-#include <stdio.h>
-#include<stdlib.h>
-#include <sys/time.h>
-// platform independent data types:
-#include <stdint.h>
-
-// Number of iterations
-#define NUM_ITERATIONS          10
-// The window height is equal to the number of streams
-#define NUM_INPUT_STREAMS       WINDOW_HEIGHT
-#define WINDOW_HEIGHT           4
-// The window width is equal to number of tuples required to fill in a window
-#define WINDOW_WIDTH            100
-
-// The total number of elements is equal to the window_height times window_width
-#define elements				WINDOW_HEIGHT*WINDOW_WIDTH
-
-// measure time: 
-struct timeval start, end;
-
-
-// output streams (aggregator):
-__global__ void output_thread_aggregation(uint32_t *window, uint32_t *dev_data, int n) {
-	
-	uint tid = threadIdx.x;
-	uint index = blockIdx.x*blockDim.x + threadIdx.x;
-	
-	uint32_t *idata = window + blockIdx.x * blockDim.x;
-	
-	if (index >= n) return;
-	
-	for (int stride = blockDim.x/2; stride > 0; stride>>=1){
-		if (tid < stride){
-			idata[tid] += idata[tid+stride];
-		}
-		__syncthreads();
-	}
-	
-	if(tid == 0){
-		dev_data[blockIdx.x] = idata[0];
-	}
-
-}
-
-int main(void) {
-
-	uint32_t iterations_id;
-	uint64_t aggregated_value;
-	
-	int n = elements;
-	
-	//define number of blocks and number of threads per block (kernel parameters)
-	dim3 threads_per_block (1);
-	dim3 blocks ((elements+threads_per_block.x-1)/threads_per_block.x);
-	
-	gettimeofday(&start, NULL);
-
-	// create input streams:
-	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
-
-		// dynamic allocation of the memory needed for all the elements
-		uint32_t *window;
-		window = (uint32_t*)calloc(elements, sizeof(uint32_t));
-		
-		// check if there's enough space for the allocation
-		if(!window){
-			printf("Allocation error for window - aborting.\n");
-			exit(1);
-		}
-		
-		// dynamic allocation of the reduced data matrix
-		uint32_t *h_data = (uint32_t *)malloc(blocks.x*sizeof(uint32_t));
-		
-		if(!h_data){
-			printf("Allocation error for h_data - aborting.\n");
-			exit(1);
-		}
-		
-		uint64_t ag_val = 0;
-		
-		// initialization - fill in the window with random numbers:
-		for (int i = 0; i < elements; i++) {
-			window[i] = (rand()%1000);
-			ag_val += window[i];
-		}
-		printf("TEST %d\n", ag_val);
-		
-		//GPU memory pointers
-		uint32_t *dev_window;
-		uint32_t *dev_data;
-
-		//allocate the memory on the GPU
-		cudaMalloc((void**)&dev_window, elements*sizeof(uint32_t));
-		cudaMalloc((void**)&dev_data, blocks.x*sizeof(uint32_t));
-
-		//copy the array 'window' to the GPU
-		cudaMemcpy(dev_window, window, elements*sizeof(uint32_t), cudaMemcpyHostToDevice);
-
-		//launch kernel
-		output_thread_aggregation<<<blocks,threads_per_block>>>(dev_window, dev_data, n);
-		cudaDeviceSynchronize();
-
-		//copy back the result to the CPU
-		cudaMemcpy(h_data, dev_data, blocks.x*sizeof(uint32_t), cudaMemcpyDeviceToHost);
-		
-		aggregated_value = 0;
-		
-		for(int i=0; i<blocks.x; i++){
-			aggregated_value += h_data[i];
-		}
-		
-		cudaDeviceSynchronize();
-
-		printf("iter: %d - aggregated value: %lu\n", iterations_id, aggregated_value);
-		
-		//free the memory allocated on the GPU
-		cudaFree(dev_window);
-		cudaFree(dev_data);
-
-	}
-
-	gettimeofday(&end, NULL);
-	printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
-
-	return 0;
-	
-}
-
+#include <stdio.h>
+#include<stdlib.h>
+#include <sys/time.h>
+// platform independent data types:
+#include <stdint.h>
+
+// Number of iterations
+#define NUM_ITERATIONS          10
+// The window height is equal to the number of streams
+#define NUM_INPUT_STREAMS       WINDOW_HEIGHT
+#define WINDOW_HEIGHT           4
+// The window width is equal to number of tuples required to fill in a window
+#define WINDOW_WIDTH            100
+
+// The total number of elements is equal to the window_height times window_width
+#define elements				WINDOW_HEIGHT*WINDOW_WIDTH
+
+// measure time: 
+struct timeval start, end;
+
+
+// output streams (aggregator):
+__global__ void output_thread_aggregation(uint32_t *window, uint32_t *dev_data, int n) {
+	
+	uint tid = threadIdx.x;
+	uint index = blockIdx.x*blockDim.x + threadIdx.x;
+	
+	uint32_t *idata = window + blockIdx.x * blockDim.x;
+	
+	if (index >= n) return;
+	
+	for (int stride = blockDim.x/2; stride > 0; stride>>=1){
+		if (tid < stride){
+			idata[tid] += idata[tid+stride];
+		}
+		__syncthreads();
+	}
+	
+	if(tid == 0){
+		dev_data[blockIdx.x] = idata[0];
+	}
+
+}
+
+int main(void) {
+
+	uint32_t iterations_id;
+	uint64_t aggregated_value;
+	
+	int n = elements;
+	
+	//define number of blocks and number of threads per block (kernel parameters)
+	dim3 threads_per_block (1);
+	dim3 blocks ((elements+threads_per_block.x-1)/threads_per_block.x);
+	
+	gettimeofday(&start, NULL);
+
+	// create input streams:
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+
+		// dynamic allocation of the memory needed for all the elements
+		uint32_t *window;
+		window = (uint32_t*)calloc(elements, sizeof(uint32_t));
+		
+		// check if there's enough space for the allocation
+		if(!window){
+			printf("Allocation error for window - aborting.\n");
+			exit(1);
+		}
+		
+		// dynamic allocation of the reduced data matrix
+		uint32_t *h_data = (uint32_t *)malloc(blocks.x*sizeof(uint32_t));
+		
+		if(!h_data){
+			printf("Allocation error for h_data - aborting.\n");
+			exit(1);
+		}
+		
+		uint64_t ag_val = 0;
+		
+		// initialization - fill in the window with random numbers:
+		for (int i = 0; i < elements; i++) {
+			window[i] = (rand()%1000);
+			ag_val += window[i];
+		}
+		printf("TEST %d\n", ag_val);
+		
+		//GPU memory pointers
+		uint32_t *dev_window;
+		uint32_t *dev_data;
+
+		//allocate the memory on the GPU
+		cudaMalloc((void**)&dev_window, elements*sizeof(uint32_t));
+		cudaMalloc((void**)&dev_data, blocks.x*sizeof(uint32_t));
+
+		//copy the array 'window' to the GPU
+		cudaMemcpy(dev_window, window, elements*sizeof(uint32_t), cudaMemcpyHostToDevice);
+
+		//launch kernel
+		output_thread_aggregation<<<blocks,threads_per_block>>>(dev_window, dev_data, n);
+		cudaDeviceSynchronize();
+
+		//copy back the result to the CPU
+		cudaMemcpy(h_data, dev_data, blocks.x*sizeof(uint32_t), cudaMemcpyDeviceToHost);
+		
+		aggregated_value = 0;
+		
+		for(int i=0; i<blocks.x; i++){
+			aggregated_value += h_data[i];
+		}
+		
+		cudaDeviceSynchronize();
+
+		printf("iter: %d - aggregated value: %lu\n", iterations_id, aggregated_value);
+		
+		//free the memory allocated on the GPU
+		cudaFree(dev_window);
+		cudaFree(dev_data);
+
+	}
+
+	gettimeofday(&end, NULL);
+	printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
+
+	return 0;
+	
+}
+

+ 0 - 0
original_code/Makefile


+ 82 - 82
original_code/aggregation.c

@@ -1,82 +1,82 @@
-/****************************************************************************************************
-* count-based window aggregation
-*
-* A number of input streams fetch tuples.
-* Each time the window is filled up 
-* The tuples are stored in a MxN matrix
-* M is equal to the number of input streams
-* N is the window size
-******************************************************************************************************/
-#include <stdio.h>
-#include<stdlib.h>
-#include <sys/time.h>
-// platform independent data types:
-#include <stdint.h>     
-
-// Number of iterations
-#define NUM_ITERATIONS		10
-// The window height is equal to the number of streams
-#define WINDOW_HEIGHT		4
-// The window width is equal to number of tuples required to fill in a window
-#define WINDOW_WIDTH		100
-
-// The total number of elements is equal to the window_height times window_width
-#define elements			WINDOW_HEIGHT*WINDOW_WIDTH
-
-// measure time: 
-struct timeval start, end;
-
-
-// output streams (aggregator):
-int output_thread_aggregation(uint32_t *window, uint32_t iterations_id) {
-	
-	uint64_t *aggregated_value;
-	aggregated_value= (uint64_t*)malloc(sizeof(uint64_t));
-	
-	// check if there's enough space for the allocation
-	if(!aggregated_value){
-		printf("Allocation error for aggregated_value - aborting.\n");
-		exit(1);
-	}
-
-	*aggregated_value = 0;
-
-	// aggregate the tuples of each stream:
-	for (int i = 0; i < elements; i++) {
-		*aggregated_value += window[i];
-	}
-	printf("iter: %d - aggregated value: %lu\n", iterations_id, *aggregated_value);
-
-}
-
-int main(void) {
-	
-	// dynamic allocation of the memory needed for all the elements
-	uint32_t *window;
-	window = (uint32_t*)calloc(elements, sizeof(uint32_t));
-	
-	// check if there's enough space for the allocation
-	if(!window){
-		printf("Allocation error for window - aborting.\n");
-		exit(1);
-	}
-	
-	uint32_t iterations_id;
-	
-	gettimeofday(&start, NULL);
-
-	// create input streams:
-	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
-		// fill in the window with random numbers:
-		for (int i = 0; i < elements; i++) {
-			window[i] = (rand()%1000);
-		}
-		output_thread_aggregation(window, iterations_id);
-	}
-	
-	free(window);
-	
-	gettimeofday(&end, NULL);
-	printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
-
-}
+/****************************************************************************************************
+* count-based window aggregation
+*
+* A number of input streams fetch tuples.
+* Each time the window is filled up 
+* The tuples are stored in a MxN matrix
+* M is equal to the number of input streams
+* N is the window size
+******************************************************************************************************/
+#include <stdio.h>
+#include<stdlib.h>
+#include <sys/time.h>
+// platform independent data types:
+#include <stdint.h>     
+
+// Number of iterations
+#define NUM_ITERATIONS		10
+// The window height is equal to the number of streams
+#define WINDOW_HEIGHT		4
+// The window width is equal to number of tuples required to fill in a window
+#define WINDOW_WIDTH		100
+
+// The total number of elements is equal to the window_height times window_width
+#define elements			WINDOW_HEIGHT*WINDOW_WIDTH
+
+// measure time: 
+struct timeval start, end;
+
+
+// output streams (aggregator):
+int output_thread_aggregation(uint32_t *window, uint32_t iterations_id) {
+	
+	uint64_t *aggregated_value;
+	aggregated_value= (uint64_t*)malloc(sizeof(uint64_t));
+	
+	// check if there's enough space for the allocation
+	if(!aggregated_value){
+		printf("Allocation error for aggregated_value - aborting.\n");
+		exit(1);
+	}
+
+	*aggregated_value = 0;
+
+	// aggregate the tuples of each stream:
+	for (int i = 0; i < elements; i++) {
+		*aggregated_value += window[i];
+	}
+	printf("iter: %d - aggregated value: %lu\n", iterations_id, *aggregated_value);
+
+}
+
+int main(void) {
+	
+	// dynamic allocation of the memory needed for all the elements
+	uint32_t *window;
+	window = (uint32_t*)calloc(elements, sizeof(uint32_t));
+	
+	// check if there's enough space for the allocation
+	if(!window){
+		printf("Allocation error for window - aborting.\n");
+		exit(1);
+	}
+	
+	uint32_t iterations_id;
+	
+	gettimeofday(&start, NULL);
+
+	// create input streams:
+	for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
+		// fill in the window with random numbers:
+		for (int i = 0; i < elements; i++) {
+			window[i] = (rand()%1000);
+		}
+		output_thread_aggregation(window, iterations_id);
+	}
+	
+	free(window);
+	
+	gettimeofday(&end, NULL);
+	printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
+
+}