瀏覽代碼

draft of scheduling constraints for replay

Erwan Leria 7 年之前
父節點
當前提交
90d12bac77
共有 3 個文件被更改,包括 404 次插入25 次删除
  1. 4 0
      tools/Makefile.am
  2. 371 0
      tools/replay_sched.c
  3. 29 25
      tools/starpu_replay.c

+ 4 - 0
tools/Makefile.am

@@ -249,6 +249,10 @@ bin_PROGRAMS += 			\
 if STARPU_SIMGRID
 bin_PROGRAMS += 			\
 	starpu_replay
+
+starpu_replay_SOURCES = \
+	starpu_replay.c
+#	replay_sched.c
 endif
 
 starpu_perfmodel_plot_CPPFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS) $(FXT_CFLAGS)

+ 371 - 0
tools/replay_sched.c

@@ -0,0 +1,371 @@
+/*SCHED.REC*/
+
+#include <starpu.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <math.h>
+#include <common/uthash.h>
+#include <common/utils.h>
+
+
+#define CPY(src, dst, n) memcpy(dst, src, n * sizeof(*dst))
+
+static unsigned eosw;
+static unsigned priority;
+static unsigned workerorder;
+static unsigned workers[STARPU_NMAXWORKERS];
+static unsigned dependson[STARPU_NMAXBUFS];
+static unsigned nworkers;
+static unsigned nparam;
+static unsigned memnode;
+static unsigned ndependson;
+static unsigned submitorder; /* Also use as prefetchtag */
+static unsigned parameters[STARPU_NMAXBUFS];
+
+static unsigned add_to_hash = 0;
+static int sched_type = -1;
+/* sched_type is called s_type in the structure struct task 
+   - If s_type == -1, no task has been added to the structure
+   - If s_type == 0, a false task has been added to the structure (at index 1)
+   - If s_type == 1, scheduling info has been added into the non-false task (at index 0) of the structure
+   - If s_type == 2, a false and a true task have been added into the structure
+*/
+
+static struct starpu_codelet cl_prefetch = {
+        .where = STARPU_NOWHERE,
+        .nbuffers = 1,
+        .modes = { STARPU_R },
+};
+
+static struct task
+{
+	UT_hash_handle hh;
+	unsigned submitorder;
+	int pref_dep;
+	unsigned send;
+	/* "Prefetch dependence" is the submit order of the dependence for the prefetch, we've chosen to work with only one value, but it can be more (in this case rearrange the code 
+			      and add eventually a new field in this structure named npref_dep or something like that) */
+	struct starpu_task tasks[2]; /* It seems that we only need 2 slots, one for the scheduling info (stored in a task), and another for the false task */
+	struct starpu_task pref_task;
+	unsigned dependson[STARPU_NMAXBUFS];
+	unsigned ndependson;
+	unsigned parameters[STARPU_NMAXBUFS];
+	unsigned nparameters;
+	unsigned memory_node;
+	unsigned s_type;
+} *sched_data;
+
+/* TODO : respecter l'ordre de soumission des tâches SubmitOrder */
+
+// TODO: call SchedRecInit
+
+
+void checkField(char * s)
+{
+	if (!strncmp(s, "SubmitOrder: ", sizeof("SubmitOrder: ")))
+	{
+		s = s + sizeof("SubmitOrder: ");
+		submitorder = strtol(s, NULL, 16);
+		sched_type += 2;
+	}
+
+	else if (!strncmp(s, "Priority: ", sizeof("Prioriyty: ")))
+	{
+		s = s + sizeof("Priority: ");
+		priority = strtol(s, NULL, 10);
+	}
+
+	else if (!strncmp(s, "ExecuteOnSpecificWorker: ", sizeof("ExecuteOnSpecificWorker: ")))
+	{
+		eosw = strtol(s, NULL, 10);
+	}
+
+	else if (!strncmp(s, "Workers: ", sizeof("Workers: ")))
+	{
+		s = s + sizeof("Workers: ");
+		char * delim = " ";
+		char * token = strtok(s, delim);
+		int i = 0;
+		 
+		while (token != NULL)
+		{
+			int k = strtol(token, NULL, 10);
+			workers[k/sizeof(*workers)] |= (1 << (k%(sizeof(*workers))));
+			i++;
+		}
+
+		nworkers = i;
+	}
+
+	else if (!strncmp(s, "DependsOn: ", sizeof("DependsOn: ")))
+	{
+		/* NOTE : dependsons (in the sched.rec)  should be the submit orders of the dependences, 
+		   otherwise it can occur an undefined behaviour
+		   (contrary to the tasks.rec where dependences are jobids */
+		unsigned i = 0;
+		char * delim = " ";
+		char * token = strtok(s+sizeof("DependsOn: "), delim);
+		
+		while (token != NULL)
+		{
+			dependson[i] = strtol(token, NULL, 10);
+			i++;
+		}
+		ndependson = i;
+	}
+
+	else if (!strncmp(s, "PrefetchTag: ", sizeof("PrefetchTag: ")))
+	{
+		s = s + sizeof("PrefecthTag: ");
+		submitorder = strtol(s, NULL, 10);
+		sched_type += 1;
+	}
+
+	else if (!strncmp(s, "Parameters: ", sizeof("Parameters: ")))
+	{
+		s = s + sizeof("Parameters: ");
+		char * delim = " ";
+		char * token = strtok(s, delim);
+		int i = 0;
+		 
+		while (token != NULL)
+		{
+			parameters[i] = strtol(token, NULL, 10);
+			i++;
+		}
+		nparam = i;
+	}
+
+	else if (!strncmp(s, "MemoryNode: ", sizeof("MemoryNode: ")))
+	{
+		s = s + sizeof("MemoryNode: ");
+		memnode = strtol(s, NULL, 10);
+	}
+	
+	else if (!strncmp(s, "Workerorder: ", sizeof("Workerorder: ")))
+	{
+		s = s + sizeof("Workerorder: ");
+		workerorder = strtol(s, NULL, 10);
+	}
+}
+
+
+void recordSchedInfo(FILE * f)
+{
+	size_t lnsize = 128;
+	char * s = malloc(sizeof(*s) * lnsize);
+	
+	while(!feof(f))
+	{
+		fgets(s, lnsize, f); /* Get the line */
+		while(!strcmp(s, "\n")) /* As long as the line is not only a newline symbol (emptyline) do {...} */
+		{	  
+			checkField(s);
+		}
+
+		struct task * task;
+			
+		HASH_FIND(hh, sched_data, &submitorder, sizeof(submitorder), task);
+		
+		if (sched_type == 1) /* Only 2 conditions are possible (== 1 or == 0) */
+		{
+			if (task == NULL)
+			{
+				_STARPU_MALLOC(task, sizeof(*task));
+				task->s_type = sched_type;
+				task->submitorder = submitorder;
+				CPY(dependson, task->dependson, ndependson);
+				task->ndependson = ndependson;
+				task->pref_dep = -1;
+				
+				add_to_hash = 1;
+			}
+
+			else
+			{
+				task->s_type += sched_type;
+				CPY(dependson, task->dependson, ndependson);
+				task->ndependson = ndependson;
+				task->pref_dep = -1;
+			}
+			
+			starpu_task_init(&task->tasks[0]);
+			task->tasks[0].workerorder = workerorder;
+			task->tasks[0].priority = priority;
+			task->tasks[0].workerids = workers;
+			task->tasks[0].workerids_len = nworkers;
+
+			unsigned i;
+			for(i = 0; i < ndependson ; i++)
+			{
+				/* Create false task as dependences (they are added later) */
+				struct task * taskdep;
+				HASH_FIND(hh, sched_data, &dependson[i], sizeof(dependson[i]), taskdep);
+
+				if (taskdep == NULL)
+				{
+					_STARPU_MALLOC(taskdep, sizeof(*taskdep));
+					starpu_task_init(&taskdep->tasks[1]);
+					taskdep->submitorder = dependson[i];
+					taskdep->tasks[1].cl = NULL;
+					taskdep->tasks[1].destroy = 0;
+					taskdep->tasks[1]. no_submitorder = 1;
+
+					HASH_ADD(hh, sched_data, submitorder, sizeof(submitorder), taskdep);
+				}
+			}
+
+			if (add_to_ash)
+				HASH_ADD(hh, sched_data, submitorder, sizeof(submitorder), task)
+		}
+
+		else
+		{
+			if (task == NULL)
+			{
+				_STARPU_MALLOC(task, sizeof(*task));
+				task->s_type = sched_type;
+				task->submitorder = submitorder;
+
+
+				add_to_hash = 1;
+			}
+
+			else
+			{
+				task->s_type += shced_type;
+				
+			}
+
+			task->pref_dep = dependson[0];
+			
+			struct task * deptask;
+			HASH_FIND(hh, sched_data, &task->pref_dep, sizeof(task->pref_dep), deptask);
+
+			if (deptask == NULL)
+			{
+				_STARPU_MALLOC(deptask, sizeof(*deptask));
+				deptask->submitorder = task->pref_dep;
+			}
+
+			deptask->send = 1;
+			deptask->nparameters = nparam;
+			CPY(parameters, deptask->parameters, nparam);
+			
+			starpu_task_create(task->pref_task);
+			deptask->pref_task.cl_prefetch;
+			deptask->pref_task.no_submitorder = 1;
+			deptask->pref_task.destroy = 1;
+
+			HASH_ADD(hh, sched_data, task->pref_dep, sizeof(task->pref_dep), deptask);
+
+		      				
+			task->memory_node = memnode;			
+
+			if (add_to_ash)
+				HASH_ADD(hh, sched_data, submitorder, sizeof(submitorder), task)
+			
+		}
+
+		/* reset some values */
+		sched_type = -1;
+		add_to_hash = 0;
+		
+	}
+}
+
+
+void parsing(FILE * f)
+{
+	recordSchedInfo(f);
+}
+
+void put_info(struct starpu_task * task, unsigned submit_order)
+{
+	struct task * tmptask;
+	HASH_FIND(hh, sched_data, &submit_order, sizeof(submitorder), tmptask);
+
+       	if (tmptask == NULL)
+		return;
+
+	if (tmptask->s_type == 2 || tmptask->s_type == 1)
+	{
+		task->workerorder = tmptask->tasks[0].workerorder;
+		task->priority = tmptask->tasks[0].priority;
+		task->workerids_len = tmptask->tasks[0].workerids_len;
+		CPY(tmptask->tasks[0].workerids, task->workerids, task->workerids_len);
+
+		struct starpu_task * deps[tmptask->ndependson];
+
+		unsigned i;
+		for(i = 0; i < tmptask->ndependson ; i++)
+		{
+			struct task * taskdep;
+			HASH_FIND(hh, sched_data, &tmptask->dependson[i], sizeof(tmptask->dependson[i]), taskdep);
+
+			if (taskdep == NULL)
+			{
+				fprintf(stderr, "Can not find the dependence of task(submitorder: %d) according the sched.rec\n", submit_order);
+				exit(EXIT_FAILURE);
+			}
+
+			deps[i] = &taskdep->tasks[1];
+		}
+
+		/* According to the StarPU documentation, these dependences will be added
+		   to other existing dependences for this task */
+		
+		starpu_task_declare_deps_array(task, tmptask->ndependson, deps);
+	}
+
+	if (tmptask->s_type == 0 || tmptask->s_type == 2)
+	{
+		int ret = starpu_task_submit(&tmptask->tasks[1]);
+		if (ret != 0)
+		{
+			fprintf(stderr, "Unable to submit a the false task (corresponding to a false task of the task with the submitorder: %d)", submit_order);
+		}
+	}
+
+	if(tmptask->pref_dep != -1) /* If the task has a dependence for prefetch */
+	{
+		struct task * receive_data;
+		HASH_FIND(sched_data, &submit_order, sizeof(submit_order), receive_data);
+		/* TODO : mettre le handle de receive_data->task dans task.handles */
+	}
+
+	if (tmptask->send) /* If the task has stored data to be prefetched */
+	{
+		struct task * send_data;
+		HASH_FIND(hh, sched_data, &submit_order, sizeof(submit_order), send_data);
+
+		if(send_data == NULL)
+		{
+			fprintf(stderr, "Unable to send_data data for prefetch (submitorder: %d)", submit_order);
+			exit(EXIT_FAILURE);
+		}
+		CPY(&task.handles, send_data->pref_task.handles[0], send_data)
+		send_data->pref_task.handles[0] = task->handles[0];
+		send_data->pref_task.callback_arg = &tmptask->memory_node;
+		/* NOTE : Do it need a function in .callback_func ? */
+	}
+}
+
+FILE * schedRecInit(const char * filename)
+{
+	FILE * f = fopen(filename, "r");
+	
+	if(f == NULL)
+	{
+		return NULL;
+	}
+	parsing(f);
+	
+	return f;
+}
+
+void applySchedRec(struct starpu_task * task, unsigned submit_order)
+{
+	put_info(task, submit_order);
+	return;
+}

+ 29 - 25
tools/starpu_replay.c

@@ -31,7 +31,9 @@
 #include <common/utils.h>
 #include <starpu_scheduler.h>
 #include <common/rbtree.h>
-#include <common/utils.h>
+
+
+#define REPLAY_NMAX_DEPENDENCIES 8
 
 #define ARRAY_DUP(in, out, n) memcpy(out, in, n * sizeof(*out))
 #define ARRAY_INIT(array, n) memset(array, 0, n * sizeof(*array))
@@ -228,19 +230,20 @@ static void variable_data_register_check(size_t * array_of_size, int nb_handles)
 
 	for (h = 0 ; h < nb_handles ; h++)
 	{
-		if(reg_signal[h]) /* Get the register signal, and if it's 1 do ... */
+		if(reg_signal[h]) /* Get the register signal, if it's 1 do ... */
 		{
 			struct handle * handles_cell;
 
 			_STARPU_MALLOC(handles_cell, sizeof(*handles_cell));
 			STARPU_ASSERT(handles_cell != NULL);
 
-			handles_cell->handle = handles_ptr[h];
-			HASH_ADD(hh, handles_hash, handle, sizeof(handles_ptr[h]), handles_cell);
-
+			handles_cell->handle = handles_ptr[h]; /* Get the hidden key (initial handle from the file) to store it as a key*/
+			
 			starpu_variable_data_register(handles_ptr+h, STARPU_MAIN_RAM, (uintptr_t) 1, array_of_size[h]);
+			
+			handles_cell->mem_ptr = handles_ptr[h]; /* Store the new value of the handle into the hash table */
 
-			handles_cell->mem_ptr = handles_ptr[h];
+			HASH_ADD(hh, handles_hash, handle, sizeof(handles_ptr[h]), handles_cell);
 		}
 	}
 }
@@ -308,7 +311,9 @@ void fix_wontuse_handle(struct task * wontuseTask)
 int submit_tasks(void)
 {
 	/* Add dependencies */
-	struct starpu_rbtree_node * currentNode = starpu_rbtree_first(&tree);
+	
+	const struct starpu_rbtree * tmptree = &tree;
+	struct starpu_rbtree_node * currentNode = starpu_rbtree_first(tmptree);
 
 	while (currentNode != NULL)
 	{
@@ -340,6 +345,7 @@ int submit_tasks(void)
 			if (!(currentTask->iteration == -1))
 				starpu_iteration_push(currentTask->iteration);
 
+			//applySchedRec(&currentTask->task, currentTask->submit)
 			int ret_val = starpu_task_submit(&currentTask->task);
 
 			if (!(currentTask->iteration == -1))
@@ -349,12 +355,12 @@ int submit_tasks(void)
 				return -1;
 
 
-			printf("submitting task %s (%lu, %llu)\n", currentTask->task.name?currentTask->task.name:"anonymous", currentTask->jobid, (unsigned long long) currentTask->task.tag_id /* tag*/);
+			printf("submitting task %s (%lu, %llu)\n", currentTask->task.name?currentTask->task.name:"anonymous", currentTask->jobid, (unsigned long long) currentTask->task.tag_id);
 		}
 
 		else
 		{
-			fix_wontuse_handle(currentTask);
+			fix_wontuse_handle(currentTask); /* Add the handle in the wontuse task */
                         /* FIXME: can not actually work properly since we have
                          * disabled sequential consistency, so we don't have any
                          * easy way to make this wait for the last task that
@@ -365,6 +371,7 @@ int submit_tasks(void)
 		currentNode = starpu_rbtree_next(currentNode);
 
 	}
+
 	return 1;
 }
 
@@ -382,7 +389,7 @@ int main(int argc, char **argv)
 	size_t s_allocated = 128;
 
 	_STARPU_MALLOC(s, s_allocated);
-	dependson_size = 8; /* arbitrary initial value */
+	dependson_size = REPLAY_NMAX_DEPENDENCIES; /* Change the value of REPLAY_NMAX_DEPENCIES to modify the number of dependencies */
 	_STARPU_MALLOC(dependson, dependson_size * sizeof (* dependson));
 	alloc_mode = 1;
 
@@ -392,6 +399,14 @@ int main(int argc, char **argv)
 		exit(EXIT_FAILURE);
 	}
 
+	/*
+	  if (schedRecInit(argv[2]) == NULL)
+	{
+		fprintf(stderr,"unable to open file %s: %s\n", argv[2], strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+	*/
+
 	rec = fopen(argv[1], "r");
 	if (!rec)
 	{
@@ -516,6 +531,7 @@ int main(int argc, char **argv)
 
 						if (error)
 						{
+
 							fprintf(stderr, "[starpu][Warning] Error loading perfmodel symbol %s\n", model);
 							exit(EXIT_FAILURE);
 						}
@@ -557,19 +573,6 @@ int main(int argc, char **argv)
 				ARRAY_DUP(handles_ptr, task->task.handles, nb_parameters);
 			}
 
-
-			// TODO: call applyOrdoRec(task);
-			//
-			// Tag: 1234
-			// Priority: 12
-			// ExecuteOnSpecificWorker: 1
-			// Workers: 0 1 2
-			// DependsOn: 1235
-			//
-			// PrefetchTag: 1234
-			// DependsOn: 1233
-
-
 			/* Add this task to task hash */
 			HASH_ADD(hh, tasks, jobid, sizeof(jobid), task);
 
@@ -646,7 +649,7 @@ int main(int argc, char **argv)
 
 			nb_parameters = count + 1; /* There is one underscore per paramater execept for the last one, that's why we have to add +1 (dirty programming) */
 
-			/* The algorithm determine will determine if it needs static or dynamic arrays */
+			/* This part of the algorithm will determine if it needs static or dynamic arrays */
 			alloc_mode = set_alloc_mode(nb_parameters);
 			arrays_managing(alloc_mode);
 
@@ -671,6 +674,7 @@ int main(int argc, char **argv)
 					/* If it wasn't, then add it to the hash table */
 					if (handles_cell == NULL)
 					{
+						/* Hide the initial handle from the file into the handles array to find it when necessary */
 						handles_ptr[i] = handle_value;
 						reg_signal[i] = 1;
 					}
@@ -775,7 +779,7 @@ eof:
 		HASH_DEL(handles_hash, handle);
 		free(handle);
         }
-
+	
 	struct perfmodel * model_s, * modeltmp;
 	HASH_ITER(hh, model_hash, model_s, modeltmp)
 	{