| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440 | /* StarPU --- Runtime system for heterogeneous multicore architectures. * * Copyright (C) 2016-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria * Copyright (C) 2017       Erwan Leria * * StarPU is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or (at * your option) any later version. * * StarPU is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * * See the GNU Lesser General Public License in COPYING.LGPL for more details. *//* * This reads a sched.rec file and mangles submitted tasks according to the hint * from that file. */#include <starpu.h>#include <unistd.h>#include <stdio.h>#include <math.h>#include <common/uthash.h>#include <common/list.h>#include <common/utils.h>#include <limits.h>/* sched.rec files look like this: SubmitOrder: 1234 Priority: 12 SpecificWorker: 1 Workers: 0 1 2 DependsOn: 1235 Prefetch: 1234 DependsOn: 1233 MemoryNode: 1 Parameters: 1 */#define CPY(src, dst, n) memcpy(dst, src, n * sizeof(*dst))#if 0#define debug(fmt, ...) fprintf(stderr, fmt, ##__VA_ARGS__)#else#define debug(fmt, ...) (void)0#endifstatic unsigned long submitorder; /* Also use as prefetchtag */static int priority;static int eosw;static unsigned workerorder;static int memnode;/* FIXME: MAXs */static uint32_t workers[STARPU_NMAXWORKERS/32];static unsigned nworkers;static unsigned dependson[STARPU_NMAXBUFS];static unsigned ndependson;static unsigned params[STARPU_NMAXBUFS];static unsigned nparams;static enum sched_type{	NormalTask,	PrefetchTask,} sched_type;static struct starpu_codelet cl_prefetch ={        .where = STARPU_NOWHERE,        .nbuffers = 1,        .modes = { STARPU_R },};static struct task{	UT_hash_handle hh;	unsigned long submitorder;	int priority;	int memnode;	unsigned dependson[STARPU_NMAXBUFS];	unsigned ndependson;	struct starpu_task *depends_tasks[STARPU_NMAXBUFS];	/* For real tasks */	int eosw;	unsigned workerorder;	uint32_t workers[STARPU_NMAXWORKERS/32];	unsigned nworkers;	/* For prefetch tasks */	unsigned params[STARPU_NMAXBUFS];	unsigned nparams;	struct starpu_task *pref_task; /* Actual prefetch task */} *mangled_tasks, *prefetch_tasks;LIST_TYPE(dep,	struct task *task;	unsigned i;);struct deps{	UT_hash_handle hh;	unsigned long submitorder;	struct dep_list list;} *dependencies = NULL;static void reset(void){	submitorder = 0;	priority = INT_MIN;	eosw = -1;	memset(&workers, 0, sizeof(workers));	nworkers = 0;	ndependson = 0;	sched_type = NormalTask;	nparams = 0;	memnode = -1;	workerorder = 0;}/* TODO : respecter l'ordre de soumission des tâches SubmitOrder */static void checkField(char * s){	/* Record various information */#define TEST(field) (!strncmp(s, field": ", strlen(field) + 2))	if (TEST("SubmitOrder"))	{		s = s + strlen("SubmitOrder: ");		submitorder = strtol(s, NULL, 10);	}	else if (TEST("Priority"))	{		s = s + strlen("Priority: ");		priority = strtol(s, NULL, 10);	}	else if (TEST("SpecificWorker"))	{		s = s + strlen("SpecificWorker: ");		eosw = strtol(s, NULL, 10);	}	else if (TEST("Workers"))	{		s = s + strlen("Workers: ");		char * delim = " ";		char * token = strtok(s, delim);		int i = 0;		while (token != NULL)		{			int k = strtol(token, NULL, 10);			STARPU_ASSERT_MSG(k < STARPU_NMAXWORKERS, "%d is bigger than maximum %d\n", k, STARPU_NMAXWORKERS);			workers[k/(sizeof(*workers)*8)] |= (1 << (k%(sizeof(*workers)*8)));			i++;			token = strtok(NULL, delim);		}		nworkers = i;	}	else if (TEST("DependsOn"))	{		/* NOTE : dependsons (in the sched.rec)  should be the submit orders of the dependencies,		   otherwise it can occur an undefined behaviour		   (contrary to the tasks.rec where dependencies are jobids */		unsigned i = 0;		char * delim = " ";		char * token = strtok(s+strlen("DependsOn: "), delim);		while (token != NULL)		{			dependson[i] = strtol(token, NULL, 10);			i++;			token = strtok(NULL, delim);		}		ndependson = i;	}	else if (TEST("Prefetch"))	{		s = s + strlen("Prefetch: ");		submitorder = strtol(s, NULL, 10);		sched_type = PrefetchTask;	}	else if (TEST("Parameters"))	{		s = s + strlen("Parameters: ");		char * delim = " ";		char * token = strtok(s, delim);		int i = 0;		while (token != NULL)		{			params[i] = strtol(token, NULL, 10);			i++;			token = strtok(NULL, delim);		}		nparams = i;	}	else if (TEST("MemoryNode"))	{		s = s + strlen("MemoryNode: ");		memnode = strtol(s, NULL, 10);	}	else if (TEST("Workerorder"))	{		s = s + strlen("Workerorder: ");		workerorder = strtol(s, NULL, 10);	}}void schedRecInit(const char * filename){	FILE * f = fopen(filename, "r");	if(f == NULL)	{		fprintf(stderr,"unable to open file %s: %s\n", filename, strerror(errno));		return;	}	size_t lnsize = 128;	char *s;	_STARPU_MALLOC(s, sizeof(*s) * lnsize);	int eof = 0;	reset();	while(!eof && !feof(f))	{		char *ln;		/* Get the line */		if (!fgets(s, lnsize, f))		{			eof = 1;		}		while (!(ln = strchr(s, '\n')))		{			_STARPU_REALLOC(s, lnsize * 2);			if (!fgets(s + lnsize-1, lnsize+1, f))			{				eof = 1;				break;			}			lnsize *= 2;		}		if ((ln == s || eof) && submitorder)		{			/* Empty line, doit */			struct task * task;			unsigned i;			_STARPU_MALLOC(task, sizeof(*task));			task->submitorder = submitorder;			task->priority = priority;			task->memnode = memnode;			CPY(dependson, task->dependson, ndependson);			task->ndependson = ndependson;			/* Also record submitorder of tasks that this one will need to depend on */			for (i = 0; i < ndependson; i++)			{				struct dep *dep;				struct starpu_task *starpu_task;				_STARPU_MALLOC(dep, sizeof(*dep));				dep->task = task;				dep->i = i;				struct deps *deps;				HASH_FIND(hh, dependencies, &task->dependson[i], sizeof(submitorder), deps);				if (!deps)				{					/* No task depends on this one yet, add a cell for it */					_STARPU_MALLOC(deps, sizeof(*deps));					dep_list_init(&deps->list);					deps->submitorder = task->dependson[i];					HASH_ADD(hh, dependencies, submitorder, sizeof(submitorder), deps);				}				dep_list_push_back(&deps->list, dep);				/* Create the intermediate task */				starpu_task = dep->task->depends_tasks[i] = starpu_task_create();				starpu_task->cl = NULL;				starpu_task->destroy = 0;				starpu_task->no_submitorder = 1;			}			switch (sched_type)			{			case NormalTask:				/* A new task to mangle, record what needs to be done */				task->eosw = eosw;				task->workerorder = workerorder;				CPY(workers, task->workers, STARPU_NMAXWORKERS/32);				task->nworkers = nworkers;				STARPU_ASSERT(nparams == 0);				debug("adding mangled task %lu\n", submitorder);				HASH_ADD(hh, mangled_tasks, submitorder, sizeof(submitorder), task);				break;			case PrefetchTask:				STARPU_ASSERT(memnode >= 0);				STARPU_ASSERT(eosw == -1);				STARPU_ASSERT(workerorder == 0);				STARPU_ASSERT(nworkers == 0);				CPY(params, task->params, nparams);				task->nparams = nparams;				/* TODO: more params */				STARPU_ASSERT_MSG(nparams == 1, "only supports one parameter at a time");				debug("adding prefetch task for %lu\n", submitorder);				HASH_ADD(hh, prefetch_tasks, submitorder, sizeof(submitorder), task);				break;			default:				STARPU_ASSERT(0);				break;			}			reset();		}		else checkField(s);	}	fclose(f);}static void do_prefetch(void *arg){	unsigned node = (uintptr_t) arg;	starpu_data_idle_prefetch_on_node(starpu_task_get_current()->handles[0], node, 1);}void applySchedRec(struct starpu_task *starpu_task, unsigned long submit_order){	struct task *task;	struct deps *deps;	int ret;	HASH_FIND(hh, dependencies, &submit_order, sizeof(submit_order), deps);	if (deps)	{		struct dep *dep;		for (dep  = dep_list_begin(&deps->list);		     dep != dep_list_end(&deps->list);		     dep =  dep_list_next(dep))		{			debug("task %lu is %d-th dep for %lu\n", submit_order, dep->i, dep->task->submitorder);			/* Some task will depend on this one, make the dependency */			starpu_task_declare_deps_array(dep->task->depends_tasks[dep->i], 1, &starpu_task);			ret = starpu_task_submit(dep->task->depends_tasks[dep->i]);			STARPU_ASSERT(ret == 0);		}	}	HASH_FIND(hh, prefetch_tasks, &submit_order, sizeof(submit_order), task);	if (task)	{		/* We want to submit a prefetch for this task */		debug("task %lu has a prefetch for parameter %d to node %d\n", submit_order, task->params[0], task->memnode);		struct starpu_task *pref_task;		pref_task = task->pref_task = starpu_task_create();		pref_task->cl = &cl_prefetch;		pref_task->destroy = 1;		pref_task->no_submitorder = 1;		pref_task->callback_arg = (void*)(uintptr_t) task->memnode;		pref_task->callback_func = do_prefetch;		/* TODO: more params */		pref_task->handles[0] = starpu_task->handles[task->params[0]];		/* Make it depend on intermediate tasks */		if (task->ndependson)		{			debug("%u dependencies\n", task->ndependson);			starpu_task_declare_deps_array(pref_task, task->ndependson, task->depends_tasks);		}		ret = starpu_task_submit(pref_task);		STARPU_ASSERT(ret == 0);	}	HASH_FIND(hh, mangled_tasks, &submit_order, sizeof(submit_order), task);       	if (task == NULL)		/* Nothing to do for this */		return;	debug("mangling task %lu\n", submit_order);	if (task->eosw >= 0)	{		debug("execute on a specific worker %d\n", task->eosw);		starpu_task->workerid = task->eosw;		starpu_task->execute_on_a_specific_worker = 1;	}	if (task->workerorder > 0)	{		debug("workerorder %d\n", task->workerorder);		starpu_task->workerorder = task->workerorder;	}	if (task->priority != INT_MIN)	{		debug("priority %d\n", task->priority);		starpu_task->priority = task->priority;	}	if (task->nworkers)	{		debug("%u workers %x\n", task->nworkers, task->workers[0]);		starpu_task->workerids_len = sizeof(task->workers) / sizeof(task->workers[0]);		_STARPU_MALLOC(starpu_task->workerids, task->nworkers * sizeof(*starpu_task->workerids));		CPY(task->workers, starpu_task->workerids, STARPU_NMAXWORKERS/32);	}	if (task->ndependson)	{		debug("%u dependencies\n", task->ndependson);		starpu_task_declare_deps_array(starpu_task, task->ndependson, task->depends_tasks);	}	/* And now, let it go!  */}
 |