aggregation.c 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. /****************************************************************************************************
  2. * count-based window aggregation
  3. *
  4. * A number of input streams fetch tuples.
  5. * Each time the window is filled up
  6. * The tuples are stored in a MxN matrix
  7. * M is equal to the number of input streams
  8. * N is the window size
  9. ******************************************************************************************************/
  10. #include <stdio.h>
  11. #include<stdlib.h>
  12. #include <sys/time.h>
  13. // platform independent data types:
  14. #include <stdint.h>
  15. // Number of iterations
  16. #define NUM_ITERATIONS 10
  17. // The window height is equal to the number of streams
  18. #define WINDOW_HEIGHT 4
  19. // The window width is equal to number of tuples required to fill in a window
  20. #define WINDOW_WIDTH 100
  21. // The total number of elements is equal to the window_height times window_width
  22. #define elements WINDOW_HEIGHT*WINDOW_WIDTH
  23. // measure time:
  24. struct timeval start, end;
  25. // output streams (aggregator):
  26. int output_thread_aggregation(uint32_t *window, uint32_t iterations_id) {
  27. uint64_t *aggregated_value;
  28. aggregated_value= (uint64_t*)malloc(sizeof(uint64_t));
  29. // check if there's enough space for the allocation
  30. if(!aggregated_value){
  31. printf("Allocation error for aggregated_value - aborting.\n");
  32. exit(1);
  33. }
  34. *aggregated_value = 0;
  35. // aggregate the tuples of each stream:
  36. for (int i = 0; i < elements; i++) {
  37. *aggregated_value += window[i];
  38. }
  39. printf("iter: %d - aggregated value: %lu\n", iterations_id, *aggregated_value);
  40. }
  41. int main(void) {
  42. // dynamic allocation of the memory needed for all the elements
  43. uint32_t *window;
  44. window = (uint32_t*)calloc(elements, sizeof(uint32_t));
  45. // check if there's enough space for the allocation
  46. if(!window){
  47. printf("Allocation error for window - aborting.\n");
  48. exit(1);
  49. }
  50. uint32_t iterations_id;
  51. gettimeofday(&start, NULL);
  52. // create input streams:
  53. for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
  54. // fill in the window with random numbers:
  55. for (int i = 0; i < elements; i++) {
  56. window[i] = (rand()%1000);
  57. }
  58. output_thread_aggregation(window, iterations_id);
  59. }
  60. free(window);
  61. gettimeofday(&end, NULL);
  62. printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
  63. }