瀏覽代碼

- Add prev and next fields in the task structure for the convenience of the scheduler.
- Provide helpers to manipulate lists of tasks.
- The pop_every_task method now directly returns a task structure

Cédric Augonnet 15 年之前
父節點
當前提交
3c95481321

+ 1 - 0
Makefile.am

@@ -32,6 +32,7 @@ include_HEADERS = 				\
 	include/starpu_data_filters.h		\
 	include/starpu_data_interfaces.h	\
 	include/starpu_task.h			\
+	include/starpu_task_list.h		\
 	include/starpu_data.h			\
 	include/starpu_perfmodel.h		\
 	include/starpu_util.h			\

+ 1 - 6
examples/scheduler/dummy_sched.c

@@ -14,11 +14,7 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
-#include <sys/time.h>
 #include <pthread.h>
-#include <stdio.h>
-#include <unistd.h>
-
 #include <starpu.h>
 
 #define NTASKS	32000
@@ -57,8 +53,7 @@ static void init_dummy_sched(struct starpu_machine_topology_s *topology,
 static void deinit_dummy_sched(struct starpu_machine_topology_s *topology,
 				struct starpu_sched_policy_s *policy)
 {
-	STARPU_ASSERT(sched_list.head == NULL);
-	STARPU_ASSERT(sched_list.tail == NULL);
+	STARPU_ASSERT((sched_list.head == NULL) && (sched_list.tail == NULL));
 
 	pthread_cond_destroy(&sched_cond);
 	pthread_mutex_destroy(&sched_mutex);

+ 1 - 0
include/starpu.h

@@ -33,6 +33,7 @@ typedef unsigned long long uint64_t;
 #include <starpu_data.h>
 #include <starpu_perfmodel.h>
 #include <starpu_task.h>
+#include <starpu_task_list.h>
 #include <starpu_scheduler.h>
 #include <starpu_expert.h>
 

+ 4 - 7
include/starpu_scheduler.h

@@ -64,13 +64,10 @@ struct starpu_sched_policy_s {
 	int (*push_prio_task)(struct starpu_task *);
 	struct starpu_task *(*pop_task)(void);
 
-	/* returns the number of tasks that were retrieved 
- 	 * the function is reponsible for allocating the output but the driver
- 	 * has to free it 
- 	 *
- 	 * NB : this function is non blocking
- 	 * */
-	struct starpu_task_list *(*pop_every_task)(uint32_t where);
+	 /* Remove all available tasks from the scheduler (tasks are chained by
+	  * the means of the prev and next fields of the starpu_task
+	  * structure). */
+	struct starpu_task *(*pop_every_task)(uint32_t where);
 
 	/* name of the policy (optionnal) */
 	const char *policy_name;

+ 5 - 6
include/starpu_task.h

@@ -18,8 +18,8 @@
 #define __STARPU_TASK_H__
 
 #include <errno.h>
-#include <starpu_config.h>
 #include <starpu.h>
+#include <starpu_config.h>
 
 #ifdef STARPU_USE_CUDA
 #include <cuda.h>
@@ -131,17 +131,16 @@ struct starpu_task {
 	 * scheduling strategy uses performance models. */
 	double predicted;
 
+	/* This field are provided for the convenience of the scheduler. */
+	struct starpu_task *prev;
+	struct starpu_task *next;
+
 	/* this is private to StarPU, do not modify. If the task is allocated
 	 * by hand (without starpu_task_create), this field should be set to
 	 * NULL. */
 	void *starpu_private;
 };
 
-struct starpu_task_list { 
-	struct starpu_task *task;
-	struct starpu_task_list *next;
-};
-
 /* It is possible to initialize statically allocated tasks with this value.
  * This is equivalent to initializing a starpu_task structure with the
  * starpu_task_init function. */

+ 54 - 0
include/starpu_task_list.h

@@ -0,0 +1,54 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_TASK_LIST_H__
+#define __STARPU_TASK_LIST_H__
+
+#include <starpu_task.h>
+
+struct starpu_task_list {
+	struct starpu_task *head;
+	struct starpu_task *tail;
+};
+
+/* Initialize a list structure */
+void starpu_task_list_init(struct starpu_task_list *list);
+
+/* Push a task at the front of a list */
+void starpu_task_list_push_front(struct starpu_task_list *list,	struct starpu_task *task);
+
+/* Push a task at the back of a list */
+void starpu_task_list_push_back(struct starpu_task_list *list, struct starpu_task *task);
+
+/* Get the front of the list (without removing it) */
+struct starpu_task *starpu_task_list_front(struct starpu_task_list *list);
+
+/* Get the back of the list (without removing it) */
+struct starpu_task *starpu_task_list_back(struct starpu_task_list *list);
+
+/* Test if a list is empty */
+int starpu_task_list_empty(struct starpu_task_list *list);
+
+/* Remove an element from the list */
+void starpu_task_list_erase(struct starpu_task_list *list, struct starpu_task *task);
+
+/* Remove the element at the front of the list */
+struct starpu_task *starpu_task_pop_front(struct starpu_task_list *list);
+
+/* Remove the element at the back of the list */
+struct starpu_task *starpu_task_pop_back(struct starpu_task_list *list);
+						
+#endif // __STARPU_TASK_LIST_H__

+ 1 - 0
src/Makefile.am

@@ -176,6 +176,7 @@ libstarpu_la_SOURCES = 						\
 	util/starpu_cublas.c					\
 	util/file.c						\
 	util/starpu_insert_task.c				\
+	util/starpu_task_list.c					\
 	debug/latency.c						\
 	profiling/profiling.c					\
 	profiling/bus_profiling_helpers.c

+ 9 - 9
src/core/dependencies/implicit_data_deps.c

@@ -82,7 +82,7 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 			
 				/* Count the readers */
 				unsigned nreaders = 0;
-				struct starpu_task_list *l;
+				struct starpu_task_wrapper_list *l;
 				l = handle->last_submitted_readers;
 				while (l)
 				{
@@ -99,7 +99,7 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 					STARPU_ASSERT(l->task);
 					task_array[i++] = l->task;
 
-					struct starpu_task_list *prev = l;
+					struct starpu_task_wrapper_list *prev = l;
 					l = l->next;
 					free(prev);
 				}
@@ -133,7 +133,7 @@ void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_
 			STARPU_ASSERT(post_sync_task);
 	
 			/* Add this task to the list of readers */
-			struct starpu_task_list *link = malloc(sizeof(struct starpu_task_list));
+			struct starpu_task_wrapper_list *link = malloc(sizeof(struct starpu_task_wrapper_list));
 			link->task = post_sync_task;
 			link->next = handle->last_submitted_readers;
 			handle->last_submitted_readers = link;
@@ -221,12 +221,12 @@ void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *tas
 
 		/* Same if this is one of the readers: we go through the list
 		 * of readers and remove the task if it is found. */
-		struct starpu_task_list *l;
+		struct starpu_task_wrapper_list *l;
 		l = handle->last_submitted_readers;
-		struct starpu_task_list *prev = NULL;
+		struct starpu_task_wrapper_list *prev = NULL;
 		while (l)
 		{
-			struct starpu_task_list *next = l->next;
+			struct starpu_task_wrapper_list *next = l->next;
 
 			if (l->task == task)
 			{
@@ -277,7 +277,7 @@ void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data
 	{
 		handle->post_sync_tasks_cnt++;
 
-		struct starpu_task_list *link = malloc(sizeof(struct starpu_task_list));
+		struct starpu_task_wrapper_list *link = malloc(sizeof(struct starpu_task_wrapper_list));
 		link->task = post_sync_task;
 		link->next = handle->post_sync_tasks;
 		handle->post_sync_tasks = link;		
@@ -288,7 +288,7 @@ void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data
 
 void _starpu_unlock_post_sync_tasks(starpu_data_handle handle)
 {
-	struct starpu_task_list *post_sync_tasks = NULL;
+	struct starpu_task_wrapper_list *post_sync_tasks = NULL;
 	unsigned do_submit_tasks = 0;
 
 	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
@@ -311,7 +311,7 @@ void _starpu_unlock_post_sync_tasks(starpu_data_handle handle)
 
 	if (do_submit_tasks)
 	{
-		struct starpu_task_list *link = post_sync_tasks;
+		struct starpu_task_wrapper_list *link = post_sync_tasks;
 
 		while (link) {
 			/* There is no need to depend on that task now, since it was already unlocked */

+ 14 - 18
src/core/mechanisms/fifo_queues.c

@@ -99,13 +99,13 @@ struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo_queue)
 }
 
 /* pop every task that can be executed on the calling driver */
-struct starpu_task_list *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
+struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
 {
 	struct starpu_job_list_s *old_list;
 	unsigned size;
 
-	struct starpu_task_list *new_list = NULL;
-	struct starpu_task_list *new_list_tail = NULL;
+	struct starpu_task *new_list = NULL;
+	struct starpu_task *new_list_tail = NULL;
 	
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
@@ -124,31 +124,27 @@ struct starpu_task_list *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *
 			i  = next_job)
 		{
 			next_job = starpu_job_list_next(i);
+			struct starpu_task *task = i->task;
 
-			if (i->task->cl->where & where)
+			if (task->cl->where & where)
 			{
 				/* this elements can be moved into the new list */
 				new_list_size++;
 				
 				starpu_job_list_erase(old_list, i);
 
-				if (new_list)
+				if (new_list_tail)
 				{
-					struct starpu_task_list *link;
-
-					link = malloc(sizeof(struct starpu_task_list));
-					link->task = i->task;
-					link->next = NULL;
-
-					new_list_tail->next = link;
-					new_list_tail = link;
-					
+					new_list_tail->next = task;
+					task->prev = new_list_tail;
+					task->next = NULL;
+					new_list_tail = task;
 				}
 				else {
-					new_list = malloc(sizeof(struct starpu_task_list));
-					new_list->task = i->task;
-					new_list->next = NULL;
-					new_list_tail = new_list;
+					new_list = task;
+					new_list_tail = task;
+					task->prev = NULL;
+					task->next = NULL;
 				}
 			}
 		}

+ 1 - 1
src/core/mechanisms/fifo_queues.h

@@ -44,6 +44,6 @@ int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sch
 int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
 
 struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo);
-struct starpu_task_list *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
+struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
 
 #endif // __FIFO_QUEUES_H__

+ 3 - 3
src/core/policies/deque_modeling_policy.c

@@ -44,9 +44,9 @@ static struct starpu_task *dm_pop_task(void)
 	return task;
 }
 
-static struct starpu_task_list *dm_pop_every_task(uint32_t where)
+static struct starpu_task *dm_pop_every_task(uint32_t where)
 {
-	struct starpu_task_list *new_list;
+	struct starpu_task *new_list;
 
 	int workerid = starpu_worker_get_id();
 
@@ -56,7 +56,7 @@ static struct starpu_task_list *dm_pop_every_task(uint32_t where)
 
 	while (new_list)
 	{
-		double model = new_list->task->predicted;
+		double model = new_list->predicted;
 
 		fifo->exp_len -= model;
 		fifo->exp_start = _starpu_timing_now() + model;

+ 1 - 1
src/core/policies/eager_central_policy.c

@@ -60,7 +60,7 @@ static int push_prio_task_eager_policy(struct starpu_task *task)
 	return _starpu_fifo_push_prio_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
-static struct starpu_task_list *pop_every_task_eager_policy(uint32_t where)
+static struct starpu_task *pop_every_task_eager_policy(uint32_t where)
 {
 	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, where);
 }

+ 8 - 2
src/datawizard/coherency.h

@@ -69,6 +69,12 @@ struct starpu_jobid_list {
 	struct starpu_jobid_list *next;
 };
 
+/* This structure describes a simply-linked list of task */
+struct starpu_task_wrapper_list {
+	struct starpu_task *task;
+	struct starpu_task_wrapper_list *next;
+};
+
 struct starpu_data_state_t {
 	struct starpu_data_requester_list_s *req_list;
 	/* the number of requests currently in the scheduling engine
@@ -127,7 +133,7 @@ struct starpu_data_state_t {
 	 * sequential_consistency flag is enabled. */
 	starpu_access_mode last_submitted_mode;
 	struct starpu_task *last_submitted_writer;
-	struct starpu_task_list *last_submitted_readers;
+	struct starpu_task_wrapper_list *last_submitted_readers;
 
 #ifdef STARPU_USE_FXT
 	/* If FxT is enabled, we keep track of "ghost dependencies": that is to
@@ -141,7 +147,7 @@ struct starpu_data_state_t {
 	struct starpu_jobid_list *last_submitted_ghost_readers_id;
 #endif
 	
-	struct starpu_task_list *post_sync_tasks;
+	struct starpu_task_wrapper_list *post_sync_tasks;
 	unsigned post_sync_tasks_cnt;
 };
 

+ 113 - 0
src/util/starpu_task_list.c

@@ -0,0 +1,113 @@
+/*
+ * StarPU
+ * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+
+void starpu_task_list_init(struct starpu_task_list *list)
+{
+	list->head = NULL;
+	list->tail = NULL;
+}
+
+void starpu_task_list_push_front(struct starpu_task_list *list,
+				struct starpu_task *task)
+{
+	if (list->tail == NULL)
+	{
+		list->tail = task;
+	}
+	else {
+		list->head->prev = task;
+	}
+
+	task->prev = NULL;
+	task->next = list->head;
+	list->head = task;
+}
+
+void starpu_task_list_push_back(struct starpu_task_list *list,
+				struct starpu_task *task)
+{
+	if (list->head == NULL)
+	{
+		list->head = task;
+	}
+	else {
+		list->tail->next = task;
+	}
+
+	task->next = NULL;
+	task->prev = list->tail;
+	list->tail = task;
+}
+
+struct starpu_task *starpu_task_list_front(struct starpu_task_list *list)
+{
+	return list->head;
+}
+
+struct starpu_task *starpu_task_list_back(struct starpu_task_list *list)
+{
+	return list->tail;
+}
+
+int starpu_task_list_empty(struct starpu_task_list *list)
+{
+	return (list->head == NULL);
+}
+
+void starpu_task_list_erase(struct starpu_task_list *list,
+				struct starpu_task *task)
+{
+	struct starpu_task *p = task->prev;
+
+	if (p)
+	{
+		p->next = task->next;
+	}
+	else {
+		list->head = task->next;
+	}
+
+	if (task->next)
+	{
+		task->next->prev = p;
+	}
+	else {
+		list->tail = p;
+	}
+
+	task->prev = NULL;
+	task->next = NULL;
+}
+
+struct starpu_task *starpu_task_pop_front(struct starpu_task_list *list)
+{
+	struct starpu_task *task = list->head;
+
+	starpu_task_list_erase(list, task);
+
+	return task;
+}
+						
+struct starpu_task *starpu_task_pop_back(struct starpu_task_list *list)
+{
+	struct starpu_task *task = list->tail;
+
+	starpu_task_list_erase(list, task);
+
+	return task;
+}