소스 검색

sched ctx on cuda

Andra Hugo 14 년 전
부모 커밋: 9944a64b16

+ 7 - 0
include/starpu_scheduler.h

@@ -107,6 +107,13 @@ struct starpu_sched_ctx {
 	unsigned is_init_sched; /*we keep an init sched which we never delete */
 };
 
+struct starpu_device {
+	int id;
+	uint32_t type; /* what is the type of worker ? */
+};
+
+void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx);
+
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 
 /* When there is no available task for a worker, StarPU blocks this worker on a

+ 2 - 0
include/starpu_util.h

@@ -192,4 +192,6 @@ void starpu_pack_cl_args(char **arg_buffer, size_t *arg_buffer_size, ...);
 }
 #endif
 
+#define STARPU_NMAXSCHEDCTXS 32
+
 #endif // __STARPU_UTIL_H__

+ 15 - 5
simple_ex/Makefile

@@ -1,4 +1,6 @@
 PROG=exemple
+CUDA_SDK_ROOT=/usr/local/cuda/sdk/C
+CUDA_HOME=/usr/local/cuda
 
 .PHONY: all
 
@@ -6,14 +8,22 @@ all: $(PROG)
 
 
 CC      := gcc
-CFLAGS  := $$(pkg-config --cflags libstarpu) -Wall
+NVCC    := /usr/local/cuda/bin/nvcc
+CFLAGS  := $$(pkg-config --cflags libstarpu) -O0 -g #-Wall
 LDFLAGS := $$(pkg-config --libs libstarpu)
+CUDADIR=$(CUDA_HOME)
 
-$(PROG): %: %.o
-	$(CC) -g -rdynamic -o $@ $^ $(LDFLAGS)
+COMMONFLAGS += -I. -I$(CUDADIR)/include  -I$(CUDA_SDK_ROOT)/common/inc -DUNIX -g -Xcompiler
+NVCCFLAGS +=  -I$(CUDA_SDK_ROOT)/common/inc -I. -G
+
+%.o: %.cu
+	$(NVCC) $(CFLAGS) $(COMMONFLAGS) $(NVCCFLAGS) -o $@ -c $< 
 
 %.o: %.c
-	$(CC) $(CFLAGS) -o $@ -c $<
+	$(CC) $(CFLAGS) -o $@ -c $< 
+
+$(PROG): %: %.o %_kernel.o
+	$(CC) -o $@ $^ $(LDFLAGS) -L$(CUDADIR)/lib -lcudart
 
 clean:
-	rm -f $(PROG)
+	rm -f $(PROG) *.o

+ 62 - 44
simple_ex/exemple.c

@@ -1,39 +1,44 @@
-#include <starpu.h>
 #include <stdio.h>
-#include <time.h>
 #include <malloc.h>
+#include <starpu.h>
 
-static inline void my_codelet(void *descr[], void *_args)
+static inline void my_codelet_cpu(void *descr[], void *_args)
 {
   unsigned nx = STARPU_VECTOR_GET_NX(descr[0]);
   float *sub = (float *)STARPU_VECTOR_GET_PTR(descr[0]);
 
   unsigned i;
+
   for(i = 0; i < nx; i++){
     sub[i] *= 5;
   }
 }
 
+extern void my_codelet_gpu(void *descr[], __attribute__ ((unused)) void *_args);
+
+
 static starpu_codelet cl = 
   {
-    .where = STARPU_CPU,
-    .cpu_func = my_codelet,
+    .where = STARPU_CPU|STARPU_CUDA,
+    .cpu_func = my_codelet_cpu,
+    .cuda_func = my_codelet_gpu,
     .nbuffers = 1
   };
 
-void print_vect(float *vect, int size){
+void print_vect(int *vect, int size){
   unsigned i;
   for(i = 0; i < size; i++)
-    printf("%4.1f ", vect[i]);
+    printf("%d ", vect[i]);
   printf("\n");
   
 }
+
 int main(int argc, char **argv)
 {
   srand(time(NULL));
-  float *mat;
+  int *mat;
   unsigned size = 20, children = 5;
-  mat = malloc(size*sizeof(float));
+  mat = (int *)malloc(size*sizeof(int));
 
   unsigned i;
   for(i = 0; i < size; i++)
@@ -50,57 +55,70 @@ int main(int argc, char **argv)
   starpu_init(NULL);
 
   starpu_data_handle dataA;
-  starpu_vector_data_register(&dataA, 0, (uintptr_t)mat, size, sizeof(float));
-
-  starpu_data_set_sequential_consistency_flag(dataA, 0);
-  struct starpu_data_filter f;
-  f.filter_func = starpu_vector_list_filter_func;
-  f.nchildren = children;
-  f.get_nchildren = NULL;
-  f.get_child_ops = NULL;
-  int len[] = {4, 4, 4, 4, 4};
-  f.filter_arg_ptr = len;
-  
+  starpu_vector_data_register(&dataA, 0, (uintptr_t)mat, size, sizeof(mat[00]));
+
+  struct starpu_data_filter f =
+    {
+      .filter_func = starpu_block_filter_func_vector,
+      .nchildren = children,
+      .get_nchildren = NULL,
+      .get_child_ops = NULL
+    };
   starpu_data_partition(dataA, &f);
-  starpu_data_map_filters(dataA, 1, &f);
 
   struct starpu_sched_ctx sched_ctx;
-  int procs[]={1, 2, 3};
+  int procs[] = {1, 2, 3};
   starpu_create_sched_ctx(&sched_ctx, "random", procs, 3);
 
-  struct starpu_task *task = starpu_task_create();
-  task->cl = &cl;
-  task->buffers[0].handle = starpu_data_get_sub_data(dataA, 0);
-  task->buffers[0].mode = STARPU_R;
-  task->name = "first 1 2 3";
-  starpu_task_submit_to_ctx(task, &sched_ctx);
+  unsigned block_id[children];  
+  unsigned j;
+  for(j = 0; j < children; j++){
+    block_id[j] = j;
+    struct starpu_task *task = starpu_task_create();
+    task->cl = &cl;
+    task->synchronous = 1;
+    task->callback_func = NULL;
+    task->buffers[0].handle = starpu_data_get_sub_data(dataA, 1, j);
+    task->buffers[0].mode = STARPU_RW;
+    task->name = "first 1 2 3";  
+    starpu_task_submit_to_ctx(task, &sched_ctx);
+  }
+
 
   struct starpu_sched_ctx sched_ctx2;
   int procs2[]={3, 4, 5, 6, 7};
   starpu_create_sched_ctx(&sched_ctx2, "random", procs2, 5);
 
-  struct starpu_task *task3 = starpu_task_create();
-  task3->cl = &cl;
-  task3->buffers[0].handle = starpu_data_get_sub_data(dataA, 0);
-  task3->buffers[0].mode = STARPU_R;
-  task3->name = "third 3 4 5 6 7";
-  starpu_task_submit_to_ctx(task3, &sched_ctx2);
-
-
-  struct starpu_task *task2 = starpu_task_create();
-  task2->cl = &cl;
-  task2->buffers[0].handle = starpu_data_get_sub_data(dataA, 0);
-  task2->buffers[0].mode = STARPU_R;
-  task2->name = "anything";
-  starpu_task_submit(task2);
+  for(j = 0; j < children; j++){
+    struct starpu_task *task3 = starpu_task_create();
+    task3->cl = &cl;
+    task3->synchronous = 1;
+    task3->callback_func = NULL;
+    task3->buffers[0].handle = starpu_data_get_sub_data(dataA, 1, j);
+    task3->buffers[0].mode = STARPU_RW;
+    task3->name = "third 3 4 5 6 7";
+    starpu_task_submit_to_ctx(task3, &sched_ctx2);
+  }
 
+  for(j = 0; j < children; j++){
+    struct starpu_task *task2 = starpu_task_create();
+    task2->cl = &cl;
+    task2->synchronous = 1;
+    task2->callback_func = NULL;
+    task2->buffers[0].handle = starpu_data_get_sub_data(dataA, 1, j);
+    task2->buffers[0].mode = STARPU_RW;
+    task2->name = "anything";
+    starpu_task_submit(task2);
+  }
   
+  printf("wait for all \n");
   starpu_task_wait_for_all();
- 
   starpu_data_unpartition(dataA, 0);
-  
+
+  printf("data unregister  \n");
   starpu_data_unregister(dataA);
   
+  printf("the end \n");
   starpu_shutdown();
 
   print_vect(mat, size);

+ 0 - 1
src/core/jobs.c

@@ -371,7 +371,6 @@ int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *
 		return -ENODEV;
 
 	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
-
 	if (back)
 		starpu_task_list_push_back(&worker->local_tasks, task);
 	else

+ 14 - 9
src/core/sched_policy.c

@@ -334,21 +334,25 @@ struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
 	/* perhaps there is some local task to be executed first */
 	task = _starpu_pop_local_task(worker);
 
-	if(!task){
+	if(!task)
+	  {
 		struct starpu_sched_ctx *sched_ctx;
 		unsigned i;
-		for(i = 0; i < worker->nctxs; i++){
+		for(i = 0; i < worker->nctxs; i++)
+		  {
 			sched_ctx = worker->sched_ctx[i];
-			if (sched_ctx->sched_policy->pop_task){
+			if (sched_ctx->sched_policy->pop_task)
+			  {
 				task = sched_ctx->sched_policy->pop_task();
 				break;
-			}
-		}
-	}
+			  }
+		  }
+	  }
 
-	if(task){
-	  printf("task %s poped by th %d with strateg %s\n", task->name, worker->workerid, task->sched_ctx->sched_policy->policy_name);
-	}
+	if(task)
+	  {
+		printf("task %s poped by th %d for %d  with strateg %s\n", task->name, worker->workerid, worker->arch, task->sched_ctx->sched_policy->policy_name);
+	  }
 
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	 * for some reason. */
@@ -440,6 +444,7 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 			for(j = 0; j < nworkers; j++){
 				if(sched_ctx->workerid[i] == j){
 					struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
+
 					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
 				}
 			}

+ 1 - 1
src/core/task.c

@@ -286,7 +286,7 @@ int starpu_task_submit_to_ctx(struct starpu_task *task, struct starpu_sched_ctx
 }
 
 int starpu_task_submit(struct starpu_task *task){
-  return starpu_task_submit_to_ctx(task, _starpu_get_initial_sched_ctx());
+   return  starpu_task_submit_to_ctx(task, _starpu_get_initial_sched_ctx());
 }
 
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl)

+ 0 - 1
src/datawizard/filters.c

@@ -86,7 +86,6 @@ starpu_data_handle starpu_data_get_sub_data(starpu_data_handle root_handle, unsi
 	{
 		unsigned next_child;
 		next_child = va_arg(pa, unsigned);
-		printf("next child = %d \n", next_child);
 		STARPU_ASSERT(next_child < current_handle->nchildren);
 
 		current_handle = &current_handle->children[next_child];

+ 1 - 0
src/drivers/cpu/driver_cpu.c

@@ -164,6 +164,7 @@ void *_starpu_cpu_worker(void *arg)
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
 			_starpu_decrement_nblocked_ths();
 		}
+
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
   
 		PTHREAD_MUTEX_LOCK(sched_mutex);

+ 23 - 5
src/drivers/cuda/driver_cuda.c

@@ -175,6 +175,7 @@ static int execute_job_on_cuda(starpu_job_t j, struct starpu_worker_s *args)
 		calibrate_model = 1;
 
 	ret = _starpu_fetch_task_input(task, mask);
+
 	if (ret != 0) {
 		/* there was not enough memory, so the input of
 		 * the codelet cannot be fetched ... put the 
@@ -271,27 +272,44 @@ void *_starpu_cuda_worker(void *arg)
 	struct starpu_task *task;
 	int res;
 
+	pthread_cond_t *sched_cond = args->sched_cond;
+        pthread_mutex_t *sched_mutex = args->sched_mutex;
+        pthread_cond_t *changing_ctx_cond = &args->changing_ctx_cond;
+        pthread_mutex_t *changing_ctx_mutex = &args->changing_ctx_mutex;
+
 	while (_starpu_machine_is_running())
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
 		_starpu_datawizard_progress(memnode, 1);
 		STARPU_TRACE_END_PROGRESS(memnode);
 
-		PTHREAD_MUTEX_LOCK(args->sched_mutex);
+		/*when contex is changing block the threads belonging to it*/
+                PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+
+                if(args->status == STATUS_CHANGING_CTX){
+			_starpu_increment_nblocked_ths(args->nworkers_of_next_ctx);
+			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
+			_starpu_decrement_nblocked_ths();
+                }
+
+                PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+
+		PTHREAD_MUTEX_LOCK(sched_mutex);
 
 		task = _starpu_pop_task(args);
-	
+
+		if(task) printf("gpu is poping \n");
                 if (task == NULL) 
 		{
 			if (_starpu_worker_can_block(memnode))
-				_starpu_block_worker(workerid, args->sched_cond, args->sched_mutex);
+				_starpu_block_worker(workerid, sched_cond, sched_mutex);
 
-			PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
+			PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 			continue;
 		};
 
-		PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
+		PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 		STARPU_ASSERT(task);
 		j = _starpu_get_job_associated_to_task(task);

+ 1 - 0
src/sched_policies/eager_central_policy.c

@@ -64,6 +64,7 @@ static int push_task_eager_policy(struct starpu_task *task, struct starpu_sched_
 		_starpu_increment_nsubmitted_tasks_of_worker(workerid);
 	}
 
+	printf("task %s pushed on central fifo\n", task->name);
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 

+ 2 - 1
src/sched_policies/random_policy.c

@@ -64,7 +64,8 @@ static int _random_push_task(struct starpu_task *task, unsigned prio, struct sta
 
 	/* we should now have the best worker in variable "selected" */
 	_starpu_increment_nsubmitted_tasks_of_worker(selected);
-	return starpu_push_local_task(selected, task, prio);
+	int n = starpu_push_local_task(selected, task, prio);
+	return n;
 }
 
 static int random_push_prio_task(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)