瀏覽代碼

Add a parameter to the method to pop every tasks from a queue so that we are
sure that the resulting queues can be translated in Gordon job lists directly.

Cédric Augonnet 16 年之前
父節點
當前提交
387c34ccb3

+ 46 - 42
src/core/mechanisms/fifo_queues.c

@@ -156,9 +156,10 @@ job_t fifo_pop_task(struct jobq_s *q)
 	return j;
 }
 
-struct job_list_s * fifo_pop_every_task(struct jobq_s *q)
+/* pop every task that can be executed on the calling driver */
+struct job_list_s * fifo_pop_every_task(struct jobq_s *q, uint32_t where)
 {
-	struct job_list_s *list;
+	struct job_list_s *new_list, *old_list;
 	unsigned size;
 	
 	STARPU_ASSERT(q);
@@ -169,28 +170,55 @@ struct job_list_s * fifo_pop_every_task(struct jobq_s *q)
 	size = fifo_queue->njobs;
 
 	if (size == 0) {
-		list = NULL;
+		new_list = NULL;
 	}
 	else {
-		/* directly use the existing list of jobs */
-		list = fifo_queue->jobq;
-
-	//	fprintf(stderr, "DEBUG, fifo_pop_every_task promised %d got %d\n",  size, job_list_size(list));
-		
-		/* the FIFO is now a new empty list */
-		fifo_queue->jobq = job_list_new();
-		fifo_queue->njobs = 0;
-
-		/* we are sure that we got it now, so at worst, some people thought
-		 * there remained some work and will soon discover it is not true */
-		pthread_mutex_lock(sched_mutex);
-		total_number_of_jobs -= size;
-		pthread_mutex_unlock(sched_mutex);
+		old_list = fifo_queue->jobq;
+		new_list = job_list_new();
+
+		unsigned new_list_size = 0;
+
+		job_itor_t i;
+		job_t next_job;
+		/* note that this starts at the _head_ of the list, so we put
+ 		 * elements at the back of the new list */
+		for(i = job_list_begin(old_list);
+			i != job_list_end(old_list);
+			i  = next_job)
+		{
+			next_job = job_list_next(i);
+
+			if (i->task->cl->where & where)
+			{
+				/* this elements can be moved into the new list */
+				new_list_size++;
+				
+				job_list_erase(old_list, i);
+				job_list_push_back(new_list, i);
+			}
+		}
+
+		if (new_list_size == 0)
+		{
+			/* the new list is empty ... */
+			job_list_delete(new_list);
+			new_list = NULL;
+		}
+		else
+		{
+			fifo_queue->njobs -= new_list_size;
+	
+			/* we are sure that we got it now, so at worst, some people thought
+			 * there remained some work and will soon discover it is not true */
+			pthread_mutex_lock(sched_mutex);
+			total_number_of_jobs -= new_list_size;
+			pthread_mutex_unlock(sched_mutex);
+		}
 	}
 
 	pthread_mutex_unlock(&q->activity_mutex);
 
-	return list;
+	return new_list;
 }
 
 /* for work stealing, typically */
@@ -225,27 +253,3 @@ job_t fifo_non_blocking_pop_task(struct jobq_s *q)
 
 	return j;
 }
-
-job_t fifo_non_blocking_pop_task_if_job_exists(struct jobq_s *q)
-{
-	job_t j;
-
-	j = fifo_non_blocking_pop_task(q);
-
-	if (!j) {
-		/* there is no job at all in the entire system : go to sleep ! */
-
-		/* that wait is not an absolute sign that there is some work 
-		 * if there is some, the thread should be awoken, but if there is none 
-		 * at the moment it is awoken, it may simply poll a limited number of 
-		 * times and just get back to sleep */
-		pthread_mutex_lock(sched_mutex);
-
-		if ((total_number_of_jobs == 0) && machine_is_running())
-			pthread_cond_wait(sched_cond, sched_mutex);
-
-		pthread_mutex_unlock(sched_mutex);
-	}
-
-	return j;
-}

+ 1 - 1
src/core/mechanisms/fifo_queues.h

@@ -41,7 +41,7 @@ int fifo_push_task(struct jobq_s *q, job_t task);
 int fifo_push_prio_task(struct jobq_s *q, job_t task);
 
 job_t fifo_pop_task(struct jobq_s *q);
-struct job_list_s * fifo_pop_every_task(struct jobq_s *q);
+struct job_list_s * fifo_pop_every_task(struct jobq_s *q, uint32_t where);
 job_t fifo_non_blocking_pop_task(struct jobq_s *q);
 job_t fifo_non_blocking_pop_task_if_job_exists(struct jobq_s *q);
 

+ 1 - 1
src/core/mechanisms/queues.h

@@ -39,7 +39,7 @@ struct jobq_s {
  	 *
  	 * NB : this function is non blocking
  	 * */
-	struct job_list_s *(*pop_every_task)(struct jobq_s *);
+	struct job_list_s *(*pop_every_task)(struct jobq_s *, uint32_t);
 
 	/* what are the driver that may pop job from that queue ? */
 	uint32_t who;

+ 5 - 4
src/core/policies/sched_policy.c

@@ -163,20 +163,21 @@ struct job_s * pop_task(void)
 	return pop_task_from_queue(queue);
 }
 
-struct job_list_s * pop_every_task_from_queue(struct jobq_s *queue)
+struct job_list_s * pop_every_task_from_queue(struct jobq_s *queue, uint32_t where)
 {
 	STARPU_ASSERT(queue->pop_every_task);
 
-	struct job_list_s *list = queue->pop_every_task(queue);
+	struct job_list_s *list = queue->pop_every_task(queue, where);
 
 	return list;
 }
 
-struct job_list_s *pop_every_task(void)
+/* pop every task that can be executed on "where" (eg. GORDON) */
+struct job_list_s *pop_every_task(uint32_t where)
 {
 	struct jobq_s *queue = policy.get_local_queue(&policy);
 
-	return pop_every_task_from_queue(queue);
+	return pop_every_task_from_queue(queue, where);
 }
 
 void wait_on_sched_event(void)

+ 2 - 2
src/core/policies/sched_policy.h

@@ -53,8 +53,8 @@ void deinit_sched_policy(struct machine_config_s *config);
 int push_task(job_t task);
 struct job_s *pop_task(void);
 struct job_s *pop_task_from_queue(struct jobq_s *queue);
-struct job_list_s *pop_every_task(void);
-struct job_list_s * pop_every_task_from_queue(struct jobq_s *queue);
+struct job_list_s *pop_every_task(uint32_t where);
+struct job_list_s * pop_every_task_from_queue(struct jobq_s *queue, uint32_t where);
 
 void wait_on_sched_event(void);
 

+ 1 - 1
src/drivers/gordon/driver_gordon.c

@@ -363,7 +363,7 @@ void *gordon_worker_inject(struct worker_set_s *arg)
 		else {
 #ifndef NOCHAIN
 			int ret = 0;
-			struct job_list_s *list = pop_every_task();
+			struct job_list_s *list = pop_every_task(GORDON);
 			/* XXX 0 is hardcoded */
 			if (list)
 			{