瀏覽代碼

replay_sched: fix most sched.rec features

Samuel Thibault 7 年之前
父節點
當前提交
2ae3bc4379
共有 2 個文件被更改，包括 143 次插入和 59 次刪除
  1. 20 8
      tools/starpu_replay.c
  2. 123 51
      tools/starpu_replay_sched.c

+ 20 - 8
tools/starpu_replay.c

@@ -44,6 +44,9 @@
  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
 
+/* TODO: move to core header while moving starpu_replay_sched to core */
+extern void schedRecInit(const char * filename);
+extern void applySchedRec(struct starpu_task * starpu_task, unsigned long submit_order);
 
 /* Enum for normal and "wontuse" tasks */
 enum task_type {NormalTask, WontUseTask};
@@ -314,6 +317,7 @@ int submit_tasks(void)
 	
 	const struct starpu_rbtree * tmptree = &tree;
 	struct starpu_rbtree_node * currentNode = starpu_rbtree_first(tmptree);
+	unsigned long last_submitorder = 0;
 
 	while (currentNode != NULL)
 	{
@@ -321,6 +325,18 @@ int submit_tasks(void)
 
 		if (currentTask->type == NormalTask)
 		{
+			STARPU_ASSERT(currentTask->submit_order >= last_submitorder + 1);
+
+			while (currentTask->submit_order > last_submitorder + 1)
+			{
+				/* Oops, some tasks were not submitted by original application, fake some */
+				struct starpu_task *task = starpu_task_create();
+				int ret;
+				task->cl = NULL;
+				ret = starpu_task_submit(task);
+				STARPU_ASSERT(ret == 0);
+				last_submitorder++;
+			}
 
 			if (currentTask->ndependson > 0)
 			{
@@ -345,7 +361,7 @@ int submit_tasks(void)
 			if (!(currentTask->iteration == -1))
 				starpu_iteration_push(currentTask->iteration);
 
-			//applySchedRec(&currentTask->task, currentTask->submit)
+			applySchedRec(&currentTask->task, currentTask->submit_order);
 			int ret_val = starpu_task_submit(&currentTask->task);
 
 			if (!(currentTask->iteration == -1))
@@ -357,6 +373,7 @@ int submit_tasks(void)
 
 			//printf("submitting task %s (%lu, %llu)\n", currentTask->task.name?currentTask->task.name:"anonymous", currentTask->jobid, (unsigned long long) currentTask->task.tag_id);
 			printf("\rsubmitting task %lu", currentTask->submit_order);
+			last_submitorder++;
 		}
 
 		else
@@ -401,13 +418,8 @@ int main(int argc, char **argv)
 		exit(EXIT_FAILURE);
 	}
 
-	/*
-	  if (schedRecInit(argv[2]) == NULL)
-	{
-		fprintf(stderr,"unable to open file %s: %s\n", argv[2], strerror(errno));
-		exit(EXIT_FAILURE);
-	}
-	*/
+	if (argc >= 3)
+		schedRecInit(argv[2]);
 
 	rec = fopen(argv[1], "r");
 	if (!rec)

+ 123 - 51
tools/starpu_replay_sched.c

@@ -25,30 +25,40 @@
 #include <stdio.h>
 #include <math.h>
 #include <common/uthash.h>
+#include <common/list.h>
 #include <common/utils.h>
 
 /*
  sched.rec files look like this:
 
- Tag: 1234
+ SubmitOrder: 1234
  Priority: 12
- ExecuteOnSpecificWorker: 1
+ SpecificWorker: 1
  Workers: 0 1 2
  DependsOn: 1235
 
  Prefetch: 1234
  DependsOn: 1233
+ MemoryNode: 1
+ Parameters: 1
  */
 
 
 #define CPY(src, dst, n) memcpy(dst, src, n * sizeof(*dst))
 
+#if 0
+#define debug(fmt, ...) fprintf(stderr, fmt, ##__VA_ARGS__)
+#else
+#define debug(fmt, ...) 0
+#endif
+
 static unsigned long submitorder; /* Also use as prefetchtag */
 static int priority;
 static int eosw;
-static int workerorder;
+static unsigned workerorder;
 static int memnode;
-static unsigned workers[STARPU_NMAXWORKERS];
+/* FIXME: MAXs */
+static uint32_t workers[STARPU_NMAXWORKERS/32];
 static unsigned nworkers;
 static unsigned dependson[STARPU_NMAXBUFS];
 static unsigned ndependson;
@@ -79,8 +89,8 @@ static struct task
 
 	/* For real tasks */
 	int eosw;
-	int workerorder;
-	unsigned workers[STARPU_NMAXWORKERS];
+	unsigned workerorder;
+	uint32_t workers[STARPU_NMAXWORKERS/32];
 	unsigned nworkers;
 
 	/* For prefetch tasks */
@@ -89,29 +99,32 @@ static struct task
 	struct starpu_task *pref_task; /* Actual prefetch task */
 } *mangled_tasks, *prefetch_tasks;
 
-static struct dep {
-	UT_hash_handle hh;
-	unsigned long submitorder;
+LIST_TYPE(dep,
 	struct task *task;
 	unsigned i;
-} *dependences;
+);
+
+struct deps {
+	UT_hash_handle hh;
+	unsigned long submitorder;
+	struct dep_list list;
+} *dependencies = NULL;
 
 static void reset(void) {
 	submitorder = 0;
-	priority = 0;
+	priority = INT_MIN;
 	eosw = -1;
+	memset(&workers, 0, sizeof(workers));
 	nworkers = 0;
 	ndependson = 0;
 	sched_type = NormalTask;
 	nparams = 0;
 	memnode = -1;
-	workerorder = -1;
+	workerorder = 0;
 }
 
 /* TODO: respect the task submission order given by SubmitOrder */
 
-// TODO: call SchedRecInit
-
 
 static void checkField(char * s)
 {
@@ -120,24 +133,25 @@ static void checkField(char * s)
 
 	if (TEST("SubmitOrder"))
 	{
-		s = s + sizeof("SubmitOrder: ");
-		submitorder = strtol(s, NULL, 16);
+		s = s + strlen("SubmitOrder: ");
+		submitorder = strtol(s, NULL, 10);
 	}
 
 	else if (TEST("Priority"))
 	{
-		s = s + sizeof("Priority: ");
+		s = s + strlen("Priority: ");
 		priority = strtol(s, NULL, 10);
 	}
 
-	else if (TEST("ExecuteOnSpecificWorker"))
+	else if (TEST("SpecificWorker"))
 	{
+		s = s + strlen("SpecificWorker: ");
 		eosw = strtol(s, NULL, 10);
 	}
 
 	else if (TEST("Workers"))
 	{
-		s = s + sizeof("Workers: ");
+		s = s + strlen("Workers: ");
 		char * delim = " ";
 		char * token = strtok(s, delim);
 		int i = 0;
@@ -145,8 +159,10 @@ static void checkField(char * s)
 		while (token != NULL)
 		{
 			int k = strtol(token, NULL, 10);
-			workers[k/sizeof(*workers)] |= (1 << (k%(sizeof(*workers))));
+			STARPU_ASSERT_MSG(k < STARPU_NMAXWORKERS, "%d is bigger than maximum %d\n", k, STARPU_NMAXWORKERS);
+			workers[k/(sizeof(*workers)*8)] |= (1 << (k%(sizeof(*workers)*8)));
 			i++;
+			token = strtok(NULL, delim);
 		}
 
 		nworkers = i;
@@ -154,31 +170,32 @@ static void checkField(char * s)
 
 	else if (TEST("DependsOn"))
 	{
-		/* NOTE : dependsons (in the sched.rec)  should be the submit orders of the dependences, 
+		/* NOTE: the DependsOn values in sched.rec must be the submit orders of the dependencies,
 		   otherwise it can occur an undefined behaviour
-		   (contrary to the tasks.rec where dependences are jobids */
+		   (contrary to tasks.rec, where dependencies are jobids) */
 		unsigned i = 0;
 		char * delim = " ";
-		char * token = strtok(s+sizeof("DependsOn: "), delim);
+		char * token = strtok(s+strlen("DependsOn: "), delim);
 		
 		while (token != NULL)
 		{
 			dependson[i] = strtol(token, NULL, 10);
 			i++;
+			token = strtok(NULL, delim);
 		}
 		ndependson = i;
 	}
 
 	else if (TEST("Prefetch"))
 	{
-		s = s + sizeof("Prefetch: ");
+		s = s + strlen("Prefetch: ");
 		submitorder = strtol(s, NULL, 10);
 		sched_type = PrefetchTask;
 	}
 
 	else if (TEST("Parameters"))
 	{
-		s = s + sizeof("Parameters: ");
+		s = s + strlen("Parameters: ");
 		char * delim = " ";
 		char * token = strtok(s, delim);
 		int i = 0;
@@ -187,19 +204,20 @@ static void checkField(char * s)
 		{
 			params[i] = strtol(token, NULL, 10);
 			i++;
+			token = strtok(NULL, delim);
 		}
 		nparams = i;
 	}
 
 	else if (TEST("MemoryNode"))
 	{
-		s = s + sizeof("MemoryNode: ");
+		s = s + strlen("MemoryNode: ");
 		memnode = strtol(s, NULL, 10);
 	}
 	
 	else if (TEST("Workerorder"))
 	{
-		s = s + sizeof("Workerorder: ");
+		s = s + strlen("Workerorder: ");
 		workerorder = strtol(s, NULL, 10);
 	}
 }
@@ -211,34 +229,37 @@ void schedRecInit(const char * filename)
 	
 	if(f == NULL)
 	{
+		fprintf(stderr,"unable to open file %s: %s\n", filename, strerror(errno));
 		return;
 	}
 
 	size_t lnsize = 128;
 	char * s = malloc(sizeof(*s) * lnsize);
+	int eof = 0;
 	
 	reset();
 
-	while(!feof(f))
+	while(!eof && !feof(f))
 	{
 		char *ln;
 
 		/* Get the line */
 		if (!fgets(s, lnsize, f))
 		{
-			return;
+			eof = 1;
 		}
 		while (!(ln = strchr(s, '\n')))
 		{
 			_STARPU_REALLOC(s, lnsize * 2);
 			if (!fgets(s + lnsize-1, lnsize+1, f))
 			{
-				return;
+				eof = 1;
+				break;
 			}
 			lnsize *= 2;
 		}
 
-		if (ln == s)
+		if ((ln == s || eof) && submitorder)
 		{
 			/* Empty line, doit */
 			struct task * task;
@@ -258,8 +279,17 @@ void schedRecInit(const char * filename)
 				_STARPU_MALLOC(dep, sizeof(*dep));
 				dep->task = task;
 				dep->i = i;
-				dep->submitorder = task->dependson[i];
-				HASH_ADD(hh, dependences, submitorder, sizeof(submitorder), dep);
+
+				struct deps *deps;
+				HASH_FIND(hh, dependencies, &task->dependson[i], sizeof(submitorder), deps);
+				if (!deps) {
+					/* No task depends on this one yet, add a cell for it */
+					_STARPU_MALLOC(deps, sizeof(*deps));
+					dep_list_init(&deps->list);
+					deps->submitorder = task->dependson[i];
+					HASH_ADD(hh, dependencies, submitorder, sizeof(submitorder), deps);
+				}
+				dep_list_push_back(&deps->list, dep);
 
 				/* Create the intermediate task */
 				starpu_task = dep->task->depends_tasks[i] = starpu_task_create();
@@ -267,7 +297,6 @@ void schedRecInit(const char * filename)
 				starpu_task->destroy = 0;
 				starpu_task->no_submitorder = 1;
 			}
-			break;
 
 			switch (sched_type)
 			{
@@ -278,19 +307,29 @@ void schedRecInit(const char * filename)
 				CPY(workers, task->workers, nworkers);
 				task->nworkers = nworkers;
 				STARPU_ASSERT(nparams == 0);
+
+				debug("adding mangled task %lu\n", submitorder);
+				HASH_ADD(hh, mangled_tasks, submitorder, sizeof(submitorder), task);
 				break;
 
 			case PrefetchTask:
+				STARPU_ASSERT(memnode >= 0);
 				STARPU_ASSERT(eosw == -1);
-				STARPU_ASSERT(workerorder == -1);
+				STARPU_ASSERT(workerorder == 0);
 				STARPU_ASSERT(nworkers == 0);
 				CPY(params, task->params, nparams);
 				task->nparams = nparams;
+				/* TODO: more params */
+				STARPU_ASSERT_MSG(nparams == 1, "only supports one parameter at a time");
+
+				debug("adding prefetch task for %lu\n", submitorder);
+				HASH_ADD(hh, prefetch_tasks, submitorder, sizeof(submitorder), task);
+				break;
+			default:
+				STARPU_ASSERT(0);
 				break;
 			}
 
-			HASH_ADD(hh, mangled_tasks, submitorder, sizeof(submitorder), task);
-
 			reset();
 		}
 		else checkField(s);
@@ -303,24 +342,32 @@ static void do_prefetch(void *arg)
 	starpu_data_idle_prefetch_on_node(starpu_task_get_current()->handles[0], node, 1);
 }
 
-void applySchedRec(struct starpu_task * starpu_task, unsigned long submit_order)
+void applySchedRec(struct starpu_task *starpu_task, unsigned long submit_order)
 {
 	struct task *task;
-	struct dep *dep;
+	struct deps *deps;
 	int ret;
 
-	HASH_FIND(hh, dependences, &submit_order, sizeof(submit_order), dep);
-	if (dep)
+	HASH_FIND(hh, dependencies, &submit_order, sizeof(submit_order), deps);
+	if (deps)
 	{
-		/* Some task will depend on this one, make the dependency */
-		starpu_task_declare_deps_array(dep->task->depends_tasks[dep->i], 1, &starpu_task);
-		ret = starpu_task_submit(dep->task->depends_tasks[dep->i]);
-		STARPU_ASSERT(ret == 0);
+		struct dep *dep;
+		for (dep  = dep_list_begin(&deps->list);
+		     dep != dep_list_end(&deps->list);
+		     dep =  dep_list_next(dep))
+		{
+			debug("task %lu is %d-th dep for %lu\n", submit_order, dep->i, dep->task->submitorder);
+			/* Some task will depend on this one, make the dependency */
+			starpu_task_declare_deps_array(dep->task->depends_tasks[dep->i], 1, &starpu_task);
+			ret = starpu_task_submit(dep->task->depends_tasks[dep->i]);
+			STARPU_ASSERT(ret == 0);
+		}
 	}
 
 	HASH_FIND(hh, prefetch_tasks, &submit_order, sizeof(submit_order), task);
 	if (task) {
 		/* We want to submit a prefetch for this task */
+		debug("task %lu has a prefetch for parameter %d to node %d\n", submit_order, task->params[0], task->memnode);
 		struct starpu_task *pref_task;
 		pref_task = task->pref_task = starpu_task_create();
 		pref_task->cl = &cl_prefetch;
@@ -330,10 +377,13 @@ void applySchedRec(struct starpu_task * starpu_task, unsigned long submit_order)
 		pref_task->callback_func = do_prefetch;
 
 		/* TODO: more params */
-		pref_task->handles[0] = starpu_task->handles[0];
+		pref_task->handles[0] = starpu_task->handles[task->params[0]];
 		/* Make it depend on intermediate tasks */
 		if (task->ndependson)
+		{
+			debug("%u dependencies\n", task->ndependson);
 			starpu_task_declare_deps_array(pref_task, task->ndependson, task->depends_tasks);
+		}
 		ret = starpu_task_submit(pref_task);
 		STARPU_ASSERT(ret == 0);
 	}
@@ -343,14 +393,36 @@ void applySchedRec(struct starpu_task * starpu_task, unsigned long submit_order)
 		/* Nothing to do for this */
 		return;
 
-	starpu_task->workerorder = task->workerorder;
-	starpu_task->priority = task->priority;
-	starpu_task->workerids_len = task->nworkers;
-	_STARPU_MALLOC(starpu_task->workerids, task->nworkers * sizeof(*starpu_task->workerids));
-	CPY(task->workers, starpu_task->workerids, task->nworkers);
+	debug("mangling task %lu\n", submit_order);
+	if (task->eosw >= 0)
+	{
+		debug("execute on a specific worker %d\n", task->eosw);
+		starpu_task->workerid = task->eosw;
+		starpu_task->execute_on_a_specific_worker = 1;
+	}
+	if (task->workerorder > 0)
+	{
+		debug("workerorder %d\n", task->workerorder);
+		starpu_task->workerorder = task->workerorder;
+	}
+	if (task->priority != INT_MIN)
+	{
+		debug("priority %d\n", task->priority);
+		starpu_task->priority = task->priority;
+	}
+	if (task->nworkers)
+	{
+		debug("%u workers %x\n", task->nworkers, task->workers[0]);
+		starpu_task->workerids_len = sizeof(task->workers) / sizeof(task->workers[0]);
+		_STARPU_MALLOC(starpu_task->workerids, task->nworkers * sizeof(*starpu_task->workerids));
+		CPY(task->workers, starpu_task->workerids, task->nworkers);
+	}
 
 	if (task->ndependson)
+	{
+		debug("%u dependencies\n", task->ndependson);
 		starpu_task_declare_deps_array(starpu_task, task->ndependson, task->depends_tasks);
+	}
 
 	/* And now, let it go!  */
 }