| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496 | /* StarPU --- Runtime system for heterogeneous multicore architectures. * * Copyright (C) 2009-2015,2017                           Université de Bordeaux * Copyright (C) 2012,2013                                Inria * Copyright (C) 2010                                     Mehdi Juhoor * Copyright (C) 2010-2013,2015-2017                      CNRS * * StarPU is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or (at * your option) any later version. * * StarPU is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * * See the GNU Lesser General Public License in COPYING.LGPL for more details. *//* * This reads a wave file, splits it into chunks, and on each of them run a * task which performs an fft, drop some high and low frequencies, and performs * the inverse fft.  It then writes the output to a wave file. */#include <stdio.h>#include <stdlib.h>#include <math.h>#include <string.h>#include <sys/types.h>#include <starpu.h>#include <fftw3.h>#ifdef STARPU_USE_CUDA#include <cufft.h>#include <starpu_cublas_v2.h>#endif/* #define SAVE_RAW	1 */#define DEFAULTINPUTFILE	"input.wav"#define DEFAULTOUTPUTFILE	"output.wav"#define NSAMPLES	(256*1024)#define SAMPLERATE	44100static unsigned nsamples = NSAMPLES;/* This is a band filter, we want to stop everything that is not between LOWFREQ and HIGHFREQ*//* LOWFREQ < i * SAMPLERATE / NSAMPLE */#define LOWFREQ	500U#define HIFREQ	800Ustatic const size_t headersize = 37+9;static FILE *infile, *outfile;static FILE *infile_raw, *outfile_raw;static char *inputfilename = DEFAULTINPUTFILE;static char *outputfilename = DEFAULTOUTPUTFILE;static unsigned use_pin = 0;unsigned length_data;/* buffer containing input WAV data */float *A;starpu_data_handle_t A_handle;/* For performance evaluation */static double start;static double end;static unsigned task_per_worker[STARPU_NMAXWORKERS] = {0};/* *	Functions to Manipulate WAV files */unsigned get_wav_data_bytes_length(FILE *file){	/* this is clearly suboptimal !! */	fseek(file, headersize, SEEK_SET);	unsigned cnt = 0;	while (fgetc(file) != EOF)		cnt++;	return cnt;}void copy_wav_header(FILE *srcfile, FILE *dstfile){	unsigned char buffer[128];	fseek(srcfile, 0, SEEK_SET);	fseek(dstfile, 0, SEEK_SET);	fread(buffer, 1, headersize, infile);	fwrite(buffer, 1, headersize, outfile);}void read_16bit_wav(FILE *infile, unsigned size, float *arrayout, FILE *save_file){	int v;#if SAVE_RAW	unsigned currentpos = 0;#endif	/* we skip the header to only keep the data */	fseek(infile, headersize, SEEK_SET);	for (v=0;v<size;v++)	{		signed char val = (signed char)fgetc(infile);		signed char val2 = (signed char)fgetc(infile);		arrayout[v] = 256*val2 + val;#if SAVE_RAW		fprintf(save_file, "%u %f\n", currentpos++, arrayout[v]);#endif	}}/* we only write the data, not the header !*/void write_16bit_wav(FILE *outfile, unsigned size, float *arrayin, FILE *save_file){	int v;#if SAVE_RAW	unsigned currentpos = 0;#endif	/* we assume that the header is copied using copy_wav_header */	fseek(outfile, headersize, SEEK_SET);	for (v=0;v<size;v++)	{		signed char val = ((int)arrayin[v]) % 256;		signed char val2  = ((int)arrayin[v]) / 256;		fputc(val, outfile);		fputc(val2, outfile);#if SAVE_RAW		if (save_file)	                fprintf(save_file, "%u %f\n", currentpos++, arrayin[v]);#endif	}}/* * *	The actual kernels * *//* we don't reinitialize the CUFFT plan for every kernel, so we "cache" it */typedef struct{	unsigned is_initialized;#ifdef STARPU_USE_CUDA	cufftHandle plan;	cufftHandle inv_plan;	cufftComplex *localout;#endif	fftwf_complex *localout_cpu;	float *Acopy;	fftwf_plan plan_cpu;	fftwf_plan inv_plan_cpu;} fft_plan_cache;static fft_plan_cache plans[STARPU_NMAXWORKERS];#ifdef STARPU_USE_CUDAstatic void band_filter_kernel_gpu(void *descr[], void *arg){	cufftResult cures;	float *localA = (float *)STARPU_VECTOR_GET_PTR(descr[0]);	cufftComplex *localout;	int workerid = starpu_worker_get_id();	/* initialize the plane only during the first iteration */	if (!plans[workerid].is_initialized)	{		cures = cufftPlan1d(&plans[workerid].plan, nsamples, CUFFT_R2C, 1);		STARPU_ASSERT(cures == CUFFT_SUCCESS);		cufftSetStream(plans[workerid].plan, starpu_cuda_get_local_stream());		cures = cufftPlan1d(&plans[workerid].inv_plan, nsamples, CUFFT_C2R, 1);		STARPU_ASSERT(cures == CUFFT_SUCCESS);		cufftSetStream(plans[workerid].inv_plan, starpu_cuda_get_local_stream());		cudaMalloc((void **)&plans[workerid].localout,					nsamples*sizeof(cufftComplex));		STARPU_ASSERT(plans[workerid].localout);		plans[workerid].is_initialized = 1;	}	localout = plans[workerid].localout;	/* FFT */	cures = cufftExecR2C(plans[workerid].plan, localA, localout);	STARPU_ASSERT(cures == CUFFT_SUCCESS);	/* filter low freqs */	unsigned lowfreq_index = (LOWFREQ*nsamples)/SAMPLERATE;	cudaMemsetAsync(&localout[0], 0, lowfreq_index*sizeof(fftwf_complex), starpu_cuda_get_local_stream());	/* filter high freqs */	unsigned hifreq_index = (HIFREQ*nsamples)/SAMPLERATE;	cudaMemsetAsync(&localout[hifreq_index], nsamples/2, (nsamples/2 - hifreq_index)*sizeof(fftwf_complex), starpu_cuda_get_local_stream());	/* inverse FFT */	cures = cufftExecC2R(plans[workerid].inv_plan, localout, localA);	STARPU_ASSERT(cures == CUFFT_SUCCESS);	/* FFTW does not normalize its output ! */	float scal = 1.0f/nsamples;	cublasStatus_t status = cublasSscal (starpu_cublas_get_local_handle(), nsamples, &scal, localA, 1);	if (status != CUBLAS_STATUS_SUCCESS)		STARPU_CUBLAS_REPORT_ERROR(status);}#endifstatic starpu_pthread_mutex_t fftw_mutex = PTHREAD_MUTEX_INITIALIZER;static void band_filter_kernel_cpu(void *descr[], void *arg){	float *localA = (float *)STARPU_VECTOR_GET_PTR(descr[0]);	int workerid = starpu_worker_get_id();	/* initialize the plane only during the first iteration */	if (!plans[workerid].is_initialized)	{		plans[workerid].localout_cpu = malloc(nsamples*sizeof(fftwf_complex));		plans[workerid].Acopy = malloc(nsamples*sizeof(float));		/* create plans, only "fftwf_execute" is thread safe in FFTW ... */		STARPU_PTHREAD_MUTEX_LOCK(&fftw_mutex);		plans[workerid].plan_cpu = fftwf_plan_dft_r2c_1d(nsamples,					plans[workerid].Acopy,					plans[workerid].localout_cpu,					FFTW_ESTIMATE);		plans[workerid].inv_plan_cpu = fftwf_plan_dft_c2r_1d(nsamples,					plans[workerid].localout_cpu,					plans[workerid].Acopy,					FFTW_ESTIMATE);		STARPU_PTHREAD_MUTEX_UNLOCK(&fftw_mutex);		plans[workerid].is_initialized = 1;	}	fftwf_complex *localout = plans[workerid].localout_cpu;	/* copy data into the temporary buffer */	memcpy(plans[workerid].Acopy, localA, nsamples*sizeof(float));	/* FFT */	fftwf_execute(plans[workerid].plan_cpu);	/* filter low freqs */	unsigned lowfreq_index = (LOWFREQ*nsamples)/SAMPLERATE;	memset(&localout[0], 0, lowfreq_index*sizeof(fftwf_complex));	/* filter high freqs */	unsigned hifreq_index = (HIFREQ*nsamples)/SAMPLERATE;	memset(&localout[hifreq_index], nsamples/2, (nsamples/2 - hifreq_index)*sizeof(fftwf_complex));	/* inverse FFT */	fftwf_execute(plans[workerid].inv_plan_cpu);	/* copy data into the temporary buffer */	memcpy(localA, plans[workerid].Acopy, nsamples*sizeof(float));	/* FFTW does not normalize its output ! */	/* TODO use BLAS ?*/	int i;	for (i = 0; i < nsamples; i++)		localA[i] /= nsamples;}struct starpu_perfmodel band_filter_model ={	.type = STARPU_HISTORY_BASED,	.symbol = "FFT_band_filter"};static struct starpu_codelet band_filter_cl ={	.modes = { STARPU_RW },#ifdef STARPU_USE_CUDA	.cuda_funcs = {band_filter_kernel_gpu},	.cuda_flags = {STARPU_CUDA_ASYNC},#endif	.cpu_funcs = {band_filter_kernel_cpu},	.model = &band_filter_model,	.nbuffers = 1};void callback(void *arg){	/* do some accounting */	int id = starpu_worker_get_id();	task_per_worker[id]++;}void create_starpu_task(unsigned iter){	int ret;	struct starpu_task *task = starpu_task_create();	task->cl = &band_filter_cl;	task->handles[0] = starpu_data_get_sub_data(A_handle, 1, iter);	task->callback_func = callback;	task->callback_arg = NULL;	ret = starpu_task_submit(task);	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");}static void init_problem(void){	infile = fopen(inputfilename, "r");	if (outputfilename)		outfile = fopen(outputfilename, "w+");#if SAVE_RAW	infile_raw = fopen("input.raw", "w");	outfile_raw = fopen("output.raw", "w");#endif	/* copy input's header into output WAV  */	if (outputfilename)		copy_wav_header(infile, outfile);	/* read length of input WAV's data */	/* each element is 2 bytes long (16bits)*/	length_data = get_wav_data_bytes_length(infile)/2;	while (nsamples > length_data)		nsamples /= 2;	/* allocate a buffer to store the content of input file */	if (use_pin)	{		starpu_malloc((void **)&A, length_data*sizeof(float));	}	else	{		A = malloc(length_data*sizeof(float));	}	/* allocate working buffer (this could be done online, but we'll keep it simple) */	/* starpu_data_malloc_pinned_if_possible((void **)&outdata, length_data*sizeof(fftwf_complex)); */	/* read input data into buffer "A" */	read_16bit_wav(infile, length_data, A, infile_raw);}static void parse_args(int argc, char **argv){	int i;	for (i = 1; i < argc; i++)	{		if (strcmp(argv[i], "-h") == 0)		{			fprintf(stderr, "Usage: %s [-pin] [-nsamples block_size] [-i input.wav] [-o output.wav | -no-output] [-h]\n", argv[0]);			exit(-1);		}		if (strcmp(argv[i], "-i") == 0)		{			inputfilename = argv[++i];		}		if (strcmp(argv[i], "-o") == 0)		{			outputfilename = argv[++i];		}		if (strcmp(argv[i], "-no-output") == 0)		{			outputfilename = NULL;		}		/* block size */		if (strcmp(argv[i], "-nsamples") == 0)		{			char *argptr;			nsamples = strtol(argv[++i], &argptr, 10);		}		if (strcmp(argv[i], "-pin") == 0)		{			use_pin = 1;		}	}}int main(int argc, char **argv){	unsigned iter;	int ret;	parse_args(argc, argv);	fprintf(stderr, "Reading input data\n");	init_problem();	unsigned niter = length_data/nsamples;	fprintf(stderr, "input: %s\noutput: %s\n#chunks %u\n", inputfilename, outputfilename, niter);	/* launch StarPU */	ret = starpu_init(NULL);	if (ret == -ENODEV)		return 77;	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");	starpu_cublas_init();	starpu_vector_data_register(&A_handle, STARPU_MAIN_RAM, (uintptr_t)A, niter*nsamples, sizeof(float));	struct starpu_data_filter f =	{		.filter_func = starpu_vector_filter_block,		.nchildren = niter	};	starpu_data_partition(A_handle, &f);	for (iter = 0; iter < niter; iter++)		starpu_data_set_wt_mask(starpu_data_get_sub_data(A_handle, 1, iter), 1<<0);	start = starpu_timing_now();	for (iter = 0; iter < niter; iter++)	{		create_starpu_task(iter);	}	starpu_task_wait_for_all();	end = starpu_timing_now();	double timing = end - start;	fprintf(stderr, "Computation took %2.2f ms\n", timing/1000);	int worker;	for (worker = 0; worker < STARPU_NMAXWORKERS; worker++)	{		if (task_per_worker[worker])		{			char name[32];			starpu_worker_get_name(worker, name, 32);			unsigned long bytes = nsamples*sizeof(float)*task_per_worker[worker];			fprintf(stderr, "\t%s -> %2.2f MB\t%2.2f\tMB/s\t%2.2f %%\n", name, (1.0*bytes)/(1024*1024), bytes/timing, (100.0*task_per_worker[worker])/niter);		}	}	if (outputfilename)		fprintf(stderr, "Writing output data\n");	/* make sure that the output is in RAM before quitting StarPU */	starpu_data_unpartition(A_handle, STARPU_MAIN_RAM);	starpu_data_unregister(A_handle);	starpu_cublas_shutdown();	/* we are done ! */	starpu_shutdown();	fclose(infile);	if (outputfilename)	{		write_16bit_wav(outfile, length_data, A, outfile_raw);		fclose(outfile);	}#if SAVE_RAW	fclose(infile_raw);	fclose(outfile_raw);#endif	return 0;}
 |