1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- /****************************************************************************************************
- * count-based window aggregation
- *
- * A number of input streams fetch tuples.
- * Each time the window is filled up
- * The tuples are stored in a MxN matrix
- * M is equal to the number of input streams
- * N is the window size
- ******************************************************************************************************/
- #include <stdio.h>
- #include<stdlib.h>
- #include <sys/time.h>
- // platform independent data types:
- #include <stdint.h>
- // Number of iterations
- #define NUM_ITERATIONS 10
- // The window height is equal to the number of streams
- #define WINDOW_HEIGHT 4
- // The window width is equal to number of tuples required to fill in a window
- #define WINDOW_WIDTH 100
- // The total number of elements is equal to the window_height times window_width
- #define elements WINDOW_HEIGHT*WINDOW_WIDTH
- // measure time:
- struct timeval start, end;
- // output streams (aggregator):
- int output_thread_aggregation(uint32_t *window, uint32_t iterations_id) {
-
- uint64_t *aggregated_value;
- aggregated_value= (uint64_t*)malloc(sizeof(uint64_t));
-
- // check if there's enough space for the allocation
- if(!aggregated_value){
- printf("Allocation error for aggregated_value - aborting.\n");
- exit(1);
- }
- *aggregated_value = 0;
- // aggregate the tuples of each stream:
- for (int i = 0; i < elements; i++) {
- *aggregated_value += window[i];
- }
- printf("iter: %d - aggregated value: %lu\n", iterations_id, *aggregated_value);
- }
- int main(void) {
-
- // dynamic allocation of the memory needed for all the elements
- uint32_t *window;
- window = (uint32_t*)calloc(elements, sizeof(uint32_t));
-
- // check if there's enough space for the allocation
- if(!window){
- printf("Allocation error for window - aborting.\n");
- exit(1);
- }
-
- uint32_t iterations_id;
-
- gettimeofday(&start, NULL);
- // create input streams:
- for (iterations_id=0; iterations_id<NUM_ITERATIONS; iterations_id++) {
- // fill in the window with random numbers:
- for (int i = 0; i < elements; i++) {
- window[i] = (rand()%1000);
- }
- output_thread_aggregation(window, iterations_id);
- }
-
- free(window);
-
- gettimeofday(&end, NULL);
- printf("usec: %ld\n", ((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)));
- }
|