Browse Source

mic: update version multi thread mic

Thibaud Lambert 12 years ago
parent
commit
ddd0be6ca2

+ 24 - 0
src/drivers/driver_common/driver_common.c

@@ -225,3 +225,27 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 
 	return task;
 }
+
+
+int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworkers)
+{
+  int i, count = 0;
+  //_STARPU_DEBUG(" nworkers:%d\n", nworkers);
+
+  for (i = 0 ; (i < nworkers); i++){
+    if(workers[i].current_task)
+      tasks[i] = NULL;
+    else
+      {
+	//_STARPU_DEBUG(" try pop task\n");
+	tasks[i] = _starpu_pop_task(&workers[i]);
+	if(tasks[i] != NULL)
+	  count ++;
+	//else
+	  //_STARPU_DEBUG(" pop task fail\n");
+	  
+      }
+  }
+  //  _STARPU_DEBUG("count:%d\n", count);
+  return count;
+}

+ 1 - 1
src/drivers/driver_common/driver_common.h

@@ -32,5 +32,5 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 					struct timespec *codelet_start, struct timespec *codelet_end, int profiling);
 
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int workerid, unsigned memnode);
-
+int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworker);
 #endif // __DRIVER_COMMON_H__

+ 16 - 1
src/drivers/mic/driver_mic_common.c

@@ -19,7 +19,6 @@
 #include <drivers/mp_common/mp_common.h>
 #include <drivers/mic/driver_mic_common.h>
 
-
 void _starpu_mic_common_report_scif_error(const char *func, const char *file, const int line, const int status)
 {
 	const char *errormsg = strerror(status);
@@ -37,6 +36,22 @@ void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int
 		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
 }
 
+
+/* 
+ *
+ */
+
+int _starpu_mic_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
+{
+  struct scif_pollepd pollepd;
+  pollepd.epd = mp_node->mp_connection.mic_endpoint;
+  pollepd.events = SCIF_POLLIN;
+  pollepd.revents = 0;
+  return  scif_poll(&pollepd,1,0);
+	
+}
+
+
 /* Handles the error so the caller (which must be generic) doesn't have to
  * care about it.
  */

+ 2 - 0
src/drivers/mic/driver_mic_common.h

@@ -53,6 +53,8 @@ struct _starpu_mic_free_command
 
 void _starpu_mic_common_report_scif_error(const char *func, const char *file, int line, const int status);
 
+int _starpu_mic_common_recv_is_ready(const struct _starpu_mp_node *mp_node);
+
 void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len);
 
 void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len);

+ 19 - 1
src/drivers/mic/driver_mic_sink.c

@@ -27,7 +27,7 @@
 #include "driver_mic_common.h"
 #include "driver_mic_sink.h"
 
-
+#define HYPER_THREAD_NUMBER 4
 
 
 /* Initialize the MIC sink, initializing connection to the source
@@ -137,3 +137,21 @@ void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUT
 #endif
 	free(addr);
 }
+
+
+/* bind the thread to a core
+ */
+void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, pthread_t *thread)
+{
+  int j, ret;
+  //init the set
+  CPU_ZERO(cpuset);
+
+  //adding the core to the set
+  for(j=0;j<HYPER_THREAD_NUMBER;j++)
+    CPU_SET(j+coreid*HYPER_THREAD_NUMBER,cpuset);
+  
+  //affect the thread to the core
+  ret = pthread_setaffinity_np(*thread, sizeof(cpu_set_t), cpuset);
+  STARPU_ASSERT(ret == 0);
+}

+ 1 - 1
src/drivers/mic/driver_mic_sink.h

@@ -41,7 +41,7 @@ unsigned int _starpu_mic_sink_get_nb_core(void);
 
 void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
-
+void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, pthread_t *thread);
 
 #endif /* STARPU_USE_MIC */
 

+ 65 - 110
src/drivers/mic/driver_mic_source.c

@@ -153,7 +153,6 @@ _starpu_mic_src_process_completed_job (struct _starpu_worker_set *workerset)
 	struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
 
 	_starpu_mic_src_finalize_job (j, worker);
-
 	worker->current_task = NULL;
 
 	return 0;
@@ -587,116 +586,72 @@ void *_starpu_mic_src_worker(void *arg)
 	STARPU_PTHREAD_COND_SIGNAL(&args->ready_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&args->mutex);
 
-
+	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*args->nworkers);
+	
+	/*main loop*/
 	while (_starpu_machine_is_running())
-	{
-		int res;
-		struct starpu_task *task = NULL;
-		struct _starpu_job * j;
-		unsigned micworkerid = 0;
-
-		_STARPU_TRACE_START_PROGRESS(memnode);
-		_starpu_datawizard_progress(memnode, 1);
-		_STARPU_TRACE_END_PROGRESS(memnode);
-
-		STARPU_PTHREAD_MUTEX_LOCK(&baseworker->sched_mutex);
-
-		/* We pop tasklists of each worker in the set and process the
-		 * first non-empty list. */
-		for (micworkerid = 0 ; (micworkerid < args->nworkers) && (task == NULL); micworkerid++)
-		    task = _starpu_pop_task (&args->workers[micworkerid]);
-
-		if (task != NULL) {
-			micworkerid--;
-			goto task_found;
-		}
-
-#if 0 // XXX: synchronous execution for now
-		/* No task to submit, so we can poll the MIC device for
-		 * completed jobs. */
-		struct pollfd fd = {
-		    .fd = mic_nodes[baseworker->mp_nodeid]->mp_connection.mic_endpoint,
-		    .events = POLLIN
-		};
-
-		if (0 < poll (&fd, 1, 0)) {
-		    _starpu_mic_src_process_completed_job (args);
-		    goto restart_loop;
-		}
-#endif
-
-		/* At this point, there is really nothing to do for the thread
-		 * so we can block.
-		 * XXX: blocking drivers is in fact broken. DO NOT USE IT ! */
-		if (_starpu_worker_get_status(baseworkerid) != STATUS_SLEEPING)
-		{
-			_STARPU_TRACE_WORKER_SLEEP_START;
-			_starpu_worker_restart_sleeping(baseworkerid);
-			_starpu_worker_set_status(baseworkerid, STATUS_SLEEPING);
-		}
-
-		if (_starpu_worker_can_block(memnode))
-			STARPU_PTHREAD_COND_WAIT(&baseworker->sched_cond, &baseworker->sched_mutex);
-		else
-		{
-			if (_starpu_machine_is_running())
-				STARPU_UYIELD();
-		}
-
-		if (_starpu_worker_get_status(baseworkerid) == STATUS_SLEEPING)
-		{
-			_STARPU_TRACE_WORKER_SLEEP_END;
-			_starpu_worker_stop_sleeping(baseworkerid);
-			_starpu_worker_set_status(baseworkerid, STATUS_UNKNOWN);
-		}
-
-	restart_loop:
-		STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
-		continue;
-
-	task_found:
-		/* If the MIC core associated to `micworkerid' is already
-		 * processing a job, we push back this one in the worker task
-		 * list. */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
-
-		if (args->workers[micworkerid].current_task) {
-		    _starpu_push_task_to_workers(task);
-		    continue;
-		}
-
-		STARPU_ASSERT(task);
-		j = _starpu_get_job_associated_to_task(task);
-
-		/* can a MIC device do that task ? */
-		if (!_STARPU_MIC_MAY_PERFORM(j))
-		{
-			/* this isn't a mic task */
-			_starpu_push_task_to_workers(task);
-			continue;
-		}
-
-		args->workers[micworkerid].current_task = j->task;
-
-		res = _starpu_mic_src_execute_job (j, &args->workers[micworkerid]);
-
-		if (res)
-		{
-			switch (res)
-			{
-				case -EAGAIN:
-					_STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", task);
-					_starpu_push_task_to_workers(task);
-					STARPU_ABORT();
-					continue;
-				default:
-					STARPU_ASSERT(0);
-			}
-		}
-
-		/* XXX: synchronous execution for now */
-		_starpu_mic_src_process_completed_job (args);
-	}
+	  {
+	    int res;
+	    struct _starpu_job * j;
+
+	    _STARPU_TRACE_START_PROGRESS(memnode);
+	    _starpu_datawizard_progress(memnode, 1);
+	    _STARPU_TRACE_END_PROGRESS(memnode);
+
+	    STARPU_PTHREAD_MUTEX_LOCK(&baseworker->sched_mutex);
+
+	    /* get task for each worker*/
+	    res = _starpu_get_multi_worker_task(args->workers, tasks, args->nworkers);
+	    STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
+
+
+	    /* poll the MIC device for completed jobs.*/
+	    if (_starpu_mic_common_recv_is_ready(mic_nodes[args->workers[0].mp_nodeid]))
+	      _starpu_mic_src_process_completed_job (args);
+	   	    
+
+	    /*if at least one worker have pop a task*/
+	    if(res != 0)
+	      {
+		//printf("\n nb_tasks:%d\n", res);
+		_STARPU_DEBUG("\n nb_tasks:%d\n", res);
+		for(i=0; i<args->nworkers; i++)
+		  {
+		    if(tasks[i] != NULL)
+		      {
+			j = _starpu_get_job_associated_to_task(tasks[i]);
+
+			/* can a MIC device do that task ? */
+			if (!_STARPU_MIC_MAY_PERFORM(j))
+			  {
+			    /* this isn't a mic task */
+			    _starpu_push_task_to_workers(tasks[i]);
+			    continue;
+			  }
+
+			args->workers[i].current_task = j->task;
+
+			res = _starpu_mic_src_execute_job (j, &args->workers[i]);
+		
+			if (res)
+			  {
+			    switch (res)
+			      {
+			      case -EAGAIN:
+				_STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
+				_starpu_push_task_to_workers(tasks[i]);
+				STARPU_ABORT();
+				continue;
+			      default:
+				STARPU_ASSERT(0);
+			      }
+			  }
+		      }
+		  }
+	      }
+	  }
+
+	free(tasks);
 
 	_STARPU_TRACE_WORKER_DEINIT_START;
 

+ 9 - 1
src/drivers/mp_common/mp_common.c

@@ -56,11 +56,13 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->deinit = _starpu_mic_src_deinit;
 				node->report_error = _starpu_mic_src_report_scif_error;
 
+				node->mp_recv_is_ready = NULL;
 				node->mp_send = _starpu_mic_common_send;
 				node->mp_recv = _starpu_mic_common_recv;
 				node->dt_send = _starpu_mic_common_dt_send;
 				node->dt_recv = _starpu_mic_common_dt_recv;
 
+				node->bind_thread = NULL;
 				node->execute = NULL;
 				node->nbcores = NULL;
 				node->allocate = NULL;
@@ -81,11 +83,13 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->deinit = _starpu_mic_sink_deinit;
 				node->report_error = _starpu_mic_sink_report_error;
 
+				node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
 				node->mp_send = _starpu_mic_common_send;
 				node->mp_recv = _starpu_mic_common_recv;
 				node->dt_send = _starpu_mic_common_dt_send;
 				node->dt_recv = _starpu_mic_common_dt_recv;
 
+				node->bind_thread = _starpu_mic_sink_bind_thread;
 				node->execute = _starpu_sink_common_execute;
 				node->nbcores = _starpu_sink_nbcores;
 				node->allocate = _starpu_mic_sink_allocate;
@@ -102,7 +106,8 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->init = _starpu_scc_src_init;
 				node->deinit = NULL;
 				node->report_error = _starpu_scc_common_report_rcce_error;
-
+				
+				node->mp_recv_is_ready = NULL;
 				node->mp_send = _starpu_scc_common_send;
 				node->mp_recv = _starpu_scc_common_recv;
 				node->dt_send = _starpu_scc_common_send;
@@ -110,6 +115,7 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->dt_send_to_device = NULL;
 				node->dt_recv_from_device = NULL;
 
+				node->bind_thread = NULL;
 				node->execute = NULL;
 				node->allocate = NULL;
 				node->free = NULL;
@@ -124,6 +130,7 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->deinit = _starpu_scc_sink_deinit;
 				node->report_error = _starpu_scc_common_report_rcce_error;
 
+				node->mp_recv_is_ready = NULL;
 				node->mp_send = _starpu_scc_common_send;
 				node->mp_recv = _starpu_scc_common_recv;
 				node->dt_send = _starpu_scc_common_send;
@@ -131,6 +138,7 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->dt_send_to_device = _starpu_scc_sink_send_to_device;
 				node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
 
+				node->bind_thread = NULL;
 				node->execute = _starpu_scc_sink_execute;
 				node->allocate = _starpu_sink_common_allocate;
 				node->free = _starpu_sink_common_free;

+ 3 - 1
src/drivers/mp_common/mp_common.h

@@ -147,7 +147,8 @@ struct _starpu_mp_node
 	void (*deinit)(struct _starpu_mp_node *node);
 	void (*report_error)(const char *, const char *, const int, const int);
 
-	/* Message passing */
+  /* Message passing */
+  int (*mp_recv_is_ready)(const struct _starpu_mp_node *);
 	void (*mp_send)(const struct _starpu_mp_node *, void *, int);
 	void (*mp_recv)(const struct _starpu_mp_node *, void *, int);
 
@@ -157,6 +158,7 @@ struct _starpu_mp_node
 	void (*dt_send_to_device)(const struct _starpu_mp_node *, int, void *, int);
 	void (*dt_recv_from_device)(const struct _starpu_mp_node *, int, void *, int);
 
+  void (*bind_thread)(const struct _starpu_mp_node *, cpu_set_t *,int, pthread_t *);
 	void (*execute)(const struct _starpu_mp_node *, void *, int);
 	void (*nbcores)(const struct _starpu_mp_node *);
 	void (*allocate)(const struct _starpu_mp_node *, void *, int);

+ 34 - 81
src/drivers/mp_common/sink_common.c

@@ -29,7 +29,6 @@
 
 #include "sink_common.h"
 
-#define HYPER_THREAD_NUMBER 4
 #include "task_fifo.h"
 
 /* Return the sink kind of the running process, based on the value of the
@@ -77,17 +76,25 @@ static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
 	void (*func)(void);
 	void *dl_handle = dlopen(NULL, RTLD_NOW);
 	func = dlsym(dl_handle, func_name);
-
-	//_STARPU_DEBUG("Looked up %s, got %p\n", func_name, func);
+	
+	printf("Looked up %s, got %p\n", func_name, func);
+	_STARPU_DEBUG("Looked up %s, got %p\n", func_name, func);
 
 	/* If we couldn't find the function, let's send an error to the host.
 	 * The user probably made a mistake in the name */
 	if (func)
-		_starpu_mp_common_send_command(node, STARPU_ANSWER_LOOKUP,
+	  {
+	    printf("\n LOOL UP OK \n");
+	    _starpu_mp_common_send_command(node, STARPU_ANSWER_LOOKUP,
 					       &func, sizeof(func));
+	
+	  }
 	else
-		_starpu_mp_common_send_command(node, STARPU_ERROR_LOOKUP,
+	  {
+	    printf("\n LOOL UP FAIL \n");
+	    _starpu_mp_common_send_command(node, STARPU_ERROR_LOOKUP,
 					       NULL, 0);
+	  }
 }
 
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
@@ -167,7 +174,7 @@ void _starpu_sink_common_worker(void)
 	enum _starpu_mp_command command = STARPU_EXIT;
 	int arg_size = 0;
 	void *arg = NULL;
-
+	int exit_starpu = 0;
 	enum _starpu_mp_node_kind node_kind = _starpu_sink_common_get_kind();
 
 	if (node_kind == STARPU_INVALID_KIND)
@@ -177,11 +184,17 @@ void _starpu_sink_common_worker(void)
 
 	/* Create and initialize the node */
 	node = _starpu_mp_common_node_create(node_kind, -1);
-
-	while ((command = _starpu_mp_common_recv_command(node, &arg, &arg_size)) != STARPU_EXIT)
+	
+	while (!exit_starpu)
 	{
+	  if(node->mp_recv_is_ready(node))
+	    {	
+	      command = _starpu_mp_common_recv_command(node, &arg, &arg_size);
 		switch(command)
 		{
+		case STARPU_EXIT:
+		  exit_starpu = 1;
+		  break;
 			case STARPU_EXECUTE:
 				node->execute(node, arg, arg_size);
 				break;
@@ -219,6 +232,7 @@ void _starpu_sink_common_worker(void)
 			default:
 				printf("Oops, command %x unrecognized\n", command);
 		}
+	    }
 
 		if(!task_fifo_is_empty(&(node->dead_queue)))
 		  {
@@ -226,7 +240,6 @@ void _starpu_sink_common_worker(void)
 		    _STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
 		    _starpu_mp_common_send_command(task->node, STARPU_EXECUTION_COMPLETED,
 								    &(task->coreid), sizeof(task->coreid));
-		    _STARPU_DEBUG("we have finished the task %p sur %d.\n", task->kernel, task->coreid);
 		    task_fifo_pop(&(node->dead_queue));
 		    free(task);
 		  }
@@ -240,43 +253,31 @@ void _starpu_sink_common_worker(void)
 
 
 
-static void* _starpu_mic_sink_thread(void * thread_arg)
+static void* _starpu_sink_thread(void * thread_arg)
 {
   struct task *arg = (struct task *)thread_arg;
-  _STARPU_DEBUG("thread launch: %d.\n", arg->coreid);
+  
+  //execute the task
   arg->kernel(arg->interfaces,arg->cl_arg);
 
+  //append the finished task to the dead queue
   task_fifo_append(&(arg->node->dead_queue),arg);
-
   pthread_exit(NULL);
 }
 
-static void _starpu_mic_sink_execute_thread(struct task *arg)
+static void _starpu_sink_execute_thread(struct task *arg)
 {
   int j;
   pthread_t thread;
   cpu_set_t cpuset;
   int ret;
   
-  ret = pthread_create(&thread, NULL, _starpu_mic_sink_thread, arg);
+  //create the tread
+  ret = pthread_create(&thread, NULL, _starpu_sink_thread, arg);
   STARPU_ASSERT(ret == 0);
   
-  CPU_ZERO(&cpuset);
-  for(j=0;j<HYPER_THREAD_NUMBER;j++)
-    CPU_SET(j+arg->coreid*HYPER_THREAD_NUMBER,&cpuset);
-
-  _STARPU_DEBUG("coreid: %d.\n", arg->coreid);
-  ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
-  if(ret != 0)
-    { 
-      if(ret== EFAULT)
-	printf("\n\n EFAULT \n\n");
-      if(ret == EINVAL)
-	printf("\n\n EINVAL \n\n");
-      if(ret == ESRCH)
-	printf("\n\n ESRCH \n\n");
-    }
-  STARPU_ASSERT(ret == 0);
+  //bind the thread on the core coreid
+  arg->node->bind_thread(arg->node, &cpuset, arg->coreid, &thread);
 }
 
 
@@ -323,58 +324,10 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 	else
 		thread_arg->cl_arg = NULL;
 
-	_STARPU_DEBUG("telling host that we have submitted the task %p.\n", thread_arg->kernel);
+	//_STARPU_DEBUG("telling host that we have submitted the task %p.\n", thread_arg->kernel);
 	_starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
 				       NULL, 0);
 
-	//_STARPU_DEBUG("executing the task %p\n", kernel);
-	_starpu_mic_sink_execute_thread(thread_arg);
-	
+	//_STARPU_DEBUG("executing the task %p\n", thread_arg->kernel);
+	_starpu_sink_execute_thread(thread_arg);	
 }
-
-
-
-/*
-void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
-					void *arg, int arg_size)
-{
-	unsigned id = 0;
-
-	void *arg_ptr = arg;
-	void (*kernel)(void **, void *) = NULL;
-	unsigned coreid = 0;
-	unsigned nb_interfaces = 0;
-	void *interfaces[STARPU_NMAXBUFS];
-	void *cl_arg;
-
-	kernel = *(void(**)(void **, void *)) arg_ptr;
-	arg_ptr += sizeof(kernel);
-
-	coreid = *(unsigned *) arg_ptr;
-	arg_ptr += sizeof(coreid);
-
-	nb_interfaces = *(unsigned *) arg_ptr;
-	arg_ptr += sizeof(nb_interfaces);
-
-	for (id = 0; id < nb_interfaces; id++)
-	{
-		interfaces[id] = arg_ptr;
-		arg_ptr += sizeof(union _starpu_interface);
-	}
-
-
-	if (arg_size > arg_ptr - arg)
-		cl_arg = arg_ptr;
-	else
-		cl_arg = NULL;
-
-	_starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
-				       NULL, 0);
-
-
-	kernel(interfaces, cl_arg);
-
-	_starpu_mp_common_send_command(node, STARPU_EXECUTION_COMPLETED,
-				       &coreid, sizeof(coreid));
-}*/
-

+ 5 - 2
src/drivers/mp_common/source_common.c

@@ -64,6 +64,9 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
 	answer = _starpu_mp_common_recv_command(node, (void **) &arg,
 						&arg_size);
 
+	//	printf("\n\n\n answer:%d\n\n", (int)answer);
+	
+
 	if (answer == STARPU_ERROR_LOOKUP) {
 		_STARPU_DISP("Error looking up symbol %s\n", func_name);
 		return -ESPIPE;
@@ -71,8 +74,8 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
 
 	/* We have to be sure the device answered the right question and the
 	 * answer has the right size */
-	STARPU_ASSERT(answer == STARPU_ANSWER_LOOKUP &&
-		      arg_size == sizeof(*func_ptr));
+	STARPU_ASSERT(answer == STARPU_ANSWER_LOOKUP);
+	STARPU_ASSERT(arg_size == sizeof(*func_ptr));
 
 	memcpy(func_ptr, arg, arg_size);