Bläddra i källkod

use StarPU to perform YUV downscaling (CPU only for now)

Cédric Augonnet 16 år sedan
förälder
incheckning
846e608648
2 ändrade filer med 194 tillägg och 54 borttagningar
  1. 192 54
      examples/ppm-downscaler/yuv-downscaler.c
  2. 2 0
      examples/ppm-downscaler/yuv-downscaler.h

+ 192 - 54
examples/ppm-downscaler/yuv-downscaler.c

@@ -14,8 +14,6 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
-#include "yuv-downscaler.h"
-
 #include <starpu.h>
 
 #include <sys/types.h>
@@ -25,8 +23,10 @@
 #include <assert.h>
 #include <stdio.h>
 
-const char *filename_in_default = "hugefile.0.5s.yuv";
-const char *filename_out_default = "hugefile.0.5s.out.yuv";
+#include "yuv-downscaler.h"
+
+const char *filename_in_default = "hugefile.2s.yuv";
+const char *filename_out_default = "hugefile.2s.out.yuv";
 char filename_in[1024];
 char filename_out[1024];
 
@@ -45,37 +45,38 @@ void parse_args(int argc, char **argv)
 #define FRAMESIZE	sizeof(struct yuv_frame)
 #define NEW_FRAMESIZE	sizeof(struct yuv_new_frame)
 
-static void dummy_y_reduction(uint8_t *y_in, uint8_t *y_out)
-{
-	unsigned col, line;
+static pthread_cond_t ds_callback_cond = PTHREAD_COND_INITIALIZER; /* signalled by ds_callback() after it sets ds_callback_terminated; main() waits on it */
+static pthread_mutex_t ds_callback_mutex = PTHREAD_MUTEX_INITIALIZER; /* held while ds_callback_terminated is written (ds_callback) or tested (main) */
+static unsigned ds_callback_terminated = 0; /* set to 1 by ds_callback() when the task counter reaches zero */
+static unsigned ds_callback_cnt = 0; /* tasks still outstanding: set in main() before submission, decremented with STARPU_ATOMIC_ADD in ds_callback() */
 
-	for (line = 0; line < HEIGHT; line+=FACTOR)
-	for (col = 0; col < WIDTH; col+=FACTOR)
+static void ds_callback(void *arg) /* installed as callback_func on every downscaling task; arg is not used in the body */
+{
+	unsigned val = STARPU_ATOMIC_ADD(&ds_callback_cnt, -1); /* one task fewer outstanding; val is presumably the updated counter -- TODO confirm against the macro definition */
+	if (val == 0) /* treated as "all tasks counted in main() have completed" */
 	{
-		unsigned sum = 0;
-
-		unsigned lline, lcol;
-		for (lline = 0; lline < FACTOR; lline++)
-		for (lcol = 0; lcol < FACTOR; lcol++)
-		{
-			unsigned in_index = (lcol + col) + (lline + line)*WIDTH;
-
-			sum += y_in[in_index];
-		}
-
-		unsigned out_index = (col / FACTOR) + (line / FACTOR)*(NEW_WIDTH);
-		y_out[out_index] = (uint8_t)(sum/(FACTOR*FACTOR));
+		fprintf(stderr, "Downscaling terminated...\n");
+		pthread_mutex_lock(&ds_callback_mutex);
+		ds_callback_terminated = 1; /* flag that main() tests under the same mutex before waiting */
+		pthread_cond_signal(&ds_callback_cond); /* wakes main() if it is blocked in pthread_cond_wait() */
+		pthread_mutex_unlock(&ds_callback_mutex);
 	}
-
-
 }
 
-static void dummy_uv_reduction(uint8_t *uv_in, uint8_t *uv_out)
+static void ds_kernel_cpu(starpu_data_interface_t *descr, __attribute__((unused)) void *arg)
 {
-	unsigned col, line;
+	uint8_t *input = (uint8_t *)descr[0].blas.ptr;
+	unsigned input_ld = descr[0].blas.ld;
+
+	uint8_t *output = (uint8_t *)descr[1].blas.ptr;
+	unsigned output_ld = descr[1].blas.ld;
 
-	for (line = 0; line < HEIGHT/2; line+=FACTOR)
-	for (col = 0; col < WIDTH/2; col+=FACTOR)
+	unsigned ncols = descr[0].blas.nx;
+	unsigned nlines = descr[0].blas.ny;
+
+	unsigned line, col;
+	for (line = 0; line < nlines; line+=FACTOR)
+	for (col = 0; col < ncols; col+=FACTOR)
 	{
 		unsigned sum = 0;
 
@@ -83,42 +84,48 @@ static void dummy_uv_reduction(uint8_t *uv_in, uint8_t *uv_out)
 		for (lline = 0; lline < FACTOR; lline++)
 		for (lcol = 0; lcol < FACTOR; lcol++)
 		{
-			unsigned in_index = (lcol + col) + (lline + line)*WIDTH/2;
+			unsigned in_index = (lcol + col) + (lline + line)*input_ld;
 
-			sum += uv_in[in_index];
+			sum += input[in_index];
 		}
 
-		unsigned out_index = (col / FACTOR) + (line / FACTOR)*NEW_WIDTH/2;
-		uv_out[out_index] = (uint8_t)(sum/(FACTOR*FACTOR));
+		unsigned out_index = (col / FACTOR) + (line / FACTOR)*output_ld;
+		output[out_index] = (uint8_t)(sum/(FACTOR*FACTOR));
 	}
 }
 
-
-
-static void dummy_frame_reduction(struct yuv_frame *yuv_in_buffer, struct yuv_new_frame *yuv_out_buffer)
-{
-	uint8_t *y_in = yuv_in_buffer->y;
-	uint8_t *u_in = yuv_in_buffer->u;
-	uint8_t *v_in = yuv_in_buffer->v;
-
-	uint8_t *y_out = yuv_out_buffer->y;
-	uint8_t *u_out = yuv_out_buffer->u;
-	uint8_t *v_out = yuv_out_buffer->v;
-
-	/* downscale Y */
-	dummy_y_reduction(y_in, y_out);
-
-	/* downscale U */
-	dummy_uv_reduction(u_in, u_out);
-
-	/* downscale V */
-	dummy_uv_reduction(v_in, v_out);
-}	
+static struct starpu_codelet_t ds_codelet = { /* codelet assigned to task->cl for every Y, U and V block task in main() */
+	.where = CORE,
+	.core_func = ds_kernel_cpu, /* the only implementation provided in this initializer */
+	.nbuffers = 2, /* buffers[0] is the input block (STARPU_R), buffers[1] the output block (STARPU_W) */
+	.model = NULL
+};
+
+/* filter_arg is the number of blocks (read back as nblocks_y in main()); for the input Y layer each block contains BLOCK_HEIGHT consecutive lines */
+static starpu_filter filter_y = {
+	.filter_func = starpu_block_filter_func,
+	.filter_arg = HEIGHT/BLOCK_HEIGHT /* main() applies this same filter to the downscaled Y layer, so both have the same number of blocks */
+};
+	
+static starpu_filter filter_u = { /* applied in main() to both the input and the downscaled U layer */
+	.filter_func = starpu_block_filter_func,
+	.filter_arg = (HEIGHT/2)/BLOCK_HEIGHT /* the U layer is registered with HEIGHT/2 lines; this value is read back as nblocks_uv in main() */
+};
+
+static starpu_filter filter_v = { /* applied in main() to both the input and the downscaled V layer */
+	.filter_func = starpu_block_filter_func,
+	.filter_arg = (HEIGHT/2)/BLOCK_HEIGHT /* same value as filter_u; main() uses nblocks_uv (taken from filter_u) for the V tasks as well */
+};
 
 int main(int argc, char **argv)
 {
+	assert(HEIGHT % (2*BLOCK_HEIGHT) == 0);
+	assert(HEIGHT % FACTOR == 0);
+	
 	parse_args(argc, argv);
 
+	fprintf(stderr, "Reading input file ...\n");
+
 	/* how many frames ? */
 	struct stat stbuf;
 	stat(filename_in, &stbuf);
@@ -140,15 +147,146 @@ int main(int argc, char **argv)
 	FILE *f_out = fopen(filename_out, "w+");
 	assert(f_out);
 
+	fprintf(stderr, "Alloc output file ...\n");
 	struct yuv_new_frame *yuv_out_buffer = calloc(nframes, NEW_FRAMESIZE);
 	assert(yuv_out_buffer);
 
+	starpu_data_handle *frame_y_handle = calloc(nframes, sizeof(starpu_data_handle));
+	starpu_data_handle *frame_u_handle = calloc(nframes, sizeof(starpu_data_handle));
+	starpu_data_handle *frame_v_handle = calloc(nframes, sizeof(starpu_data_handle));
+
+	starpu_data_handle *new_frame_y_handle = calloc(nframes, sizeof(starpu_data_handle));
+	starpu_data_handle *new_frame_u_handle = calloc(nframes, sizeof(starpu_data_handle));
+	starpu_data_handle *new_frame_v_handle = calloc(nframes, sizeof(starpu_data_handle));
+
+	starpu_init(NULL);
+
+	/* register and partition all layers */
 	unsigned frame;
 	for (frame = 0; frame < nframes; frame++)
 	{
-		dummy_frame_reduction(&yuv_in_buffer[frame], &yuv_out_buffer[frame]);
+		/* register Y layer */
+		starpu_register_blas_data(&frame_y_handle[frame], 0,
+			(uintptr_t)&yuv_in_buffer[frame].y,
+			WIDTH, WIDTH, HEIGHT, sizeof(uint8_t));
+
+		starpu_partition_data(frame_y_handle[frame], &filter_y);
+
+		starpu_register_blas_data(&new_frame_y_handle[frame], 0,
+			(uintptr_t)&yuv_out_buffer[frame].y,
+			NEW_WIDTH, NEW_WIDTH, NEW_HEIGHT, sizeof(uint8_t));
+
+		starpu_partition_data(new_frame_y_handle[frame], &filter_y);
+
+		/* register U layer */
+		starpu_register_blas_data(&frame_u_handle[frame], 0,
+			(uintptr_t)&yuv_in_buffer[frame].u,
+			WIDTH/2, WIDTH/2, HEIGHT/2, sizeof(uint8_t));
+
+		starpu_partition_data(frame_u_handle[frame], &filter_u);
+
+		starpu_register_blas_data(&new_frame_u_handle[frame], 0,
+			(uintptr_t)&yuv_out_buffer[frame].u,
+			NEW_WIDTH/2, NEW_WIDTH/2, NEW_HEIGHT/2, sizeof(uint8_t));
+
+		starpu_partition_data(new_frame_u_handle[frame], &filter_u);
+
+		/* register V layer */
+		starpu_register_blas_data(&frame_v_handle[frame], 0,
+			(uintptr_t)&yuv_in_buffer[frame].v,
+			WIDTH/2, WIDTH/2, HEIGHT/2, sizeof(uint8_t));
+
+		starpu_partition_data(frame_v_handle[frame], &filter_v);
+
+		starpu_register_blas_data(&new_frame_v_handle[frame], 0,
+			(uintptr_t)&yuv_out_buffer[frame].v,
+			NEW_WIDTH/2, NEW_WIDTH/2, NEW_HEIGHT/2, sizeof(uint8_t));
+
+		starpu_partition_data(new_frame_v_handle[frame], &filter_v);
+
 	}
 
+	/* how many tasks are there ? */
+	unsigned nblocks_y = filter_y.filter_arg;
+	unsigned nblocks_uv = filter_u.filter_arg;
+
+	ds_callback_cnt = (nblocks_y + 2*nblocks_uv)*nframes;
+
+	fprintf(stderr, "Start computation: there will be %d tasks for %d frames\n", ds_callback_cnt, nframes);
+	/* do the computation */
+	for (frame = 0; frame < nframes; frame++)
+	{
+		unsigned blocky;
+		for (blocky = 0; blocky < nblocks_y; blocky++)
+		{
+			struct starpu_task *task = starpu_task_create();
+				task->cl = &ds_codelet;
+				task->callback_func = ds_callback;
+
+				/* input */
+				task->buffers[0].handle = get_sub_data(frame_y_handle[frame], 1, blocky);
+				task->buffers[0].mode = STARPU_R;
+
+				/* output */
+				task->buffers[1].handle = get_sub_data(new_frame_y_handle[frame], 1, blocky);
+				task->buffers[1].mode = STARPU_W;
+
+			starpu_submit_task(task);
+		}
+
+		unsigned blocku;
+		for (blocku = 0; blocku < nblocks_uv; blocku++)
+		{
+			struct starpu_task *task = starpu_task_create();
+				task->cl = &ds_codelet;
+				task->callback_func = ds_callback;
+
+				/* input */
+				task->buffers[0].handle = get_sub_data(frame_u_handle[frame], 1, blocku);
+				task->buffers[0].mode = STARPU_R;
+
+				/* output */
+				task->buffers[1].handle = get_sub_data(new_frame_u_handle[frame], 1, blocku);
+				task->buffers[1].mode = STARPU_W;
+
+			starpu_submit_task(task);
+		}
+
+		unsigned blockv;
+		for (blockv = 0; blockv < nblocks_uv; blockv++)
+		{
+			struct starpu_task *task = starpu_task_create();
+				task->cl = &ds_codelet;
+				task->callback_func = ds_callback;
+
+				/* input */
+				task->buffers[0].handle = get_sub_data(frame_v_handle[frame], 1, blockv);
+				task->buffers[0].mode = STARPU_R;
+
+				/* output */
+				task->buffers[1].handle = get_sub_data(new_frame_v_handle[frame], 1, blockv);
+				task->buffers[1].mode = STARPU_W;
+
+			starpu_submit_task(task);
+		}
+	}
+
+	pthread_mutex_lock(&ds_callback_mutex);
+	if (!ds_callback_terminated)
+		pthread_cond_wait(&ds_callback_cond, &ds_callback_mutex);
+	pthread_mutex_unlock(&ds_callback_mutex);
+
+	/* make sure all output buffers are sync'ed */
+	for (frame = 0; frame < nframes; frame++)
+	{
+		starpu_sync_data_with_mem(new_frame_y_handle[frame]);
+		starpu_sync_data_with_mem(new_frame_u_handle[frame]);
+		starpu_sync_data_with_mem(new_frame_v_handle[frame]);
+	}
+
+	/* the output buffers have been synchronised above: shut StarPU down before writing the output file */
+	starpu_shutdown();
+
 	fwrite(yuv_out_buffer, NEW_FRAMESIZE, nframes, f_out);
 
 	return 0;

+ 2 - 0
examples/ppm-downscaler/yuv-downscaler.h

@@ -22,6 +22,8 @@
 #define NEW_WIDTH	(WIDTH/FACTOR)
 #define NEW_HEIGHT	(HEIGHT/FACTOR)
 
+#define BLOCK_HEIGHT    20 /* divisor used to compute the number of blocks per layer in yuv-downscaler.c; main() asserts HEIGHT % (2*BLOCK_HEIGHT) == 0 */
+
 #include <stdint.h>
 
 struct yuv_frame {