Przeglądaj źródła

Improve BUFFER_SIZE management for MIC messages

Samuel Thibault 9 lat temu
rodzic
commit
73e95ec64e

+ 1 - 1
src/drivers/mp_common/mp_common.c

@@ -230,7 +230,7 @@ void _starpu_mp_common_send_command(const struct _starpu_mp_node *node,
 				    const enum _starpu_mp_command command,
 				    const enum _starpu_mp_command command,
 				    void *arg, int arg_size)
 				    void *arg, int arg_size)
 {
 {
-	STARPU_ASSERT(arg_size <= BUFFER_SIZE);
+	STARPU_ASSERT_MSG(arg_size <= BUFFER_SIZE, "Too much data (%d) for the static MIC buffer (%d), increase BUFFER_SIZE perhaps?", arg_size, BUFFER_SIZE);
 
 
 	/* MIC and MPI sizes are given through a int */
 	/* MIC and MPI sizes are given through a int */
 	int command_size = sizeof(enum _starpu_mp_command);
 	int command_size = sizeof(enum _starpu_mp_command);

+ 2 - 2
src/drivers/mp_common/mp_common.h

@@ -33,7 +33,7 @@
 #include <scif.h>
 #include <scif.h>
 #endif /* STARPU_USE_MIC */
 #endif /* STARPU_USE_MIC */
 
 
-#define BUFFER_SIZE 256
+#define BUFFER_SIZE 65536
 
 
 #define STARPU_MP_SRC_NODE 0
 #define STARPU_MP_SRC_NODE 0
 #define STARPU_MP_SINK_NODE(a) ((a) + 1)
 #define STARPU_MP_SINK_NODE(a) ((a) + 1)
@@ -108,7 +108,7 @@ LIST_TYPE(mp_barrier,
 
 
 LIST_TYPE(mp_message,
 LIST_TYPE(mp_message,
 		enum _starpu_mp_command type;
 		enum _starpu_mp_command type;
-		char buffer[BUFFER_SIZE];
+		char *buffer;
 		int size;
 		int size;
 	 );
 	 );
 
 

+ 6 - 3
src/drivers/mp_common/sink_common.c

@@ -298,7 +298,8 @@ void _starpu_sink_common_worker(void)
 			STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
 			//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
 			//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
 			_starpu_mp_common_send_command(node, message->type,
 			_starpu_mp_common_send_command(node, message->type,
-					&message->buffer, message->size);
+					message->buffer, message->size);
+			free(message->buffer);
 			mp_message_delete(message);
 			mp_message_delete(message);
 		}
 		}
 		else
 		else
@@ -374,8 +375,9 @@ static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *no
 	/* Init message to tell the sink that the execution has begun */
 	/* Init message to tell the sink that the execution has begun */
 	struct mp_message * message = mp_message_new();
 	struct mp_message * message = mp_message_new();
 	message->type = STARPU_PRE_EXECUTION;
 	message->type = STARPU_PRE_EXECUTION;
+	message->buffer = malloc(sizeof(int));
 	*(int *) message->buffer = task->combined_workerid;
 	*(int *) message->buffer = task->combined_workerid;
-	message->size = sizeof(task->combined_workerid);
+	message->size = sizeof(int);
 
 
 
 
 	/* Append the message to the queue */
 	/* Append the message to the queue */
@@ -390,8 +392,9 @@ static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_no
 	/* Init message to tell the sink that the execution is completed */
 	/* Init message to tell the sink that the execution is completed */
 	struct mp_message * message = mp_message_new();
 	struct mp_message * message = mp_message_new();
 	message->type = STARPU_EXECUTION_COMPLETED;
 	message->type = STARPU_EXECUTION_COMPLETED;
-	message->size = sizeof(task->coreid);
+	message->buffer = malloc(sizeof(int));
 	*(int*) message->buffer = task->coreid;
 	*(int*) message->buffer = task->coreid;
+	message->size = sizeof(int);
 
 
 	/* Append the message to the queue */
 	/* Append the message to the queue */
 	_starpu_sink_common_append_message(node, message);
 	_starpu_sink_common_append_message(node, message);

+ 2 - 0
src/drivers/mp_common/source_common.c

@@ -139,6 +139,7 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
 		struct mp_message * message = mp_message_list_pop_back(&node->message_queue);
 		struct mp_message * message = mp_message_list_pop_back(&node->message_queue);
 		_starpu_src_common_handle_async(node, message->buffer,
 		_starpu_src_common_handle_async(node, message->buffer,
 				message->size, message->type);
 				message->size, message->type);
+		free(message->buffer);
 		mp_message_delete(message);
 		mp_message_delete(message);
 	}
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
@@ -157,6 +158,7 @@ int _starpu_src_common_store_message(struct _starpu_mp_node *node,
 		case STARPU_PRE_EXECUTION:
 		case STARPU_PRE_EXECUTION:
 			message = mp_message_new();
 			message = mp_message_new();
 			message->type = answer;
 			message->type = answer;
+			message->buffer = malloc(arg_size);
 			memcpy(message->buffer, arg, arg_size);
 			memcpy(message->buffer, arg, arg_size);
 			message->size = arg_size;
 			message->size = arg_size;
 
 

+ 2 - 1
tests/main/insert_task_dyn_handles.c

@@ -35,7 +35,8 @@ void func_cpu(void *descr[], void *_args)
 struct starpu_codelet codelet =
 struct starpu_codelet codelet =
 {
 {
 	.cpu_funcs = {func_cpu},
 	.cpu_funcs = {func_cpu},
-	.cpu_funcs_name = {"func_cpu"},
+	/* starpu_task_get_current() doesn't work on MIC */
+	/* .cpu_funcs_name = {"func_cpu"}, */
 	.nbuffers = STARPU_VARIABLE_NBUFFERS,
 	.nbuffers = STARPU_VARIABLE_NBUFFERS,
 };
 };