/**************************************************************************************************** * count-based window aggregation * * A number of input streams fetch tuples. * Each time the window is filled up * The tuples are stored in a MxN matrix * M is equal to the number of input streams * N is the window size ******************************************************************************************************/ #include #include #include // platform independent data types: #include // Number of iterations #define NUM_ITERATIONS 10 // The window height is equal to the number of streams #define WINDOW_HEIGHT 4 // The window width is equal to number of tuples required to fill in a window #define WINDOW_WIDTH 100 // The total number of elements is equal to the window_height times window_width #define elements WINDOW_HEIGHT*WINDOW_WIDTH // measure time: struct timeval start, end; // output streams (aggregator): int output_thread_aggregation(uint32_t *window, uint32_t iterations_id) { uint64_t *aggregated_value; aggregated_value= (uint64_t*)malloc(sizeof(uint64_t)); // check if there's enough space for the allocation if(!aggregated_value){ printf("Allocation error for aggregated_value - aborting.\n"); exit(1); } *aggregated_value = 0; // aggregate the tuples of each stream: for (int i = 0; i < elements; i++) { *aggregated_value += window[i]; } printf("iter: %d - aggregated value: %lu\n", iterations_id, *aggregated_value); } int main(void) { // dynamic allocation of the memory needed for all the elements uint32_t *window; window = (uint32_t*)calloc(elements, sizeof(uint32_t)); // check if there's enough space for the allocation if(!window){ printf("Allocation error for window - aborting.\n"); exit(1); } uint32_t iterations_id; gettimeofday(&start, NULL); // create input streams: for (iterations_id=0; iterations_id