浏览代码

Evaluate the sends induced by a cp.

Romain LION 5 年之前
父节点
当前提交
17ade7fa84
共有 2 个文件被更改,包括 59 次插入15 次删除
  1. 19 6
      mpi/src/starpu_mpi_checkpoint.c
  2. 40 9
      mpi/src/starpu_mpi_checkpoint.h

+ 19 - 6
mpi/src/starpu_mpi_checkpoint.c

@@ -67,12 +67,12 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* c
 				backup_rank = va_arg(varg_list_copy, int);
 				backup_of   = va_arg(varg_list_copy, int);
 				break;
-			case STARPU_DATA_ARRAY:
-				ptr         = va_arg(varg_list_copy, void*);
-				count       = va_arg(varg_list_copy, int);
-				backup_rank = va_arg(varg_list_copy, int);
-				backup_of   = -1;
-				break;
+//			case STARPU_DATA_ARRAY:
+//				ptr         = va_arg(varg_list_copy, void*);
+//				count       = va_arg(varg_list_copy, int);
+//				backup_rank = va_arg(varg_list_copy, int);
+//				backup_of   = -1;
+//				break;
 			default:
 				STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
 				break;
@@ -159,6 +159,19 @@ int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_t
 	return 0;
 }
 
+/**
+ * receives param of type starpu_mpi_checkpoint_template_t
+ * @param args
+ * @return
+ */
+void* _starpu_mpi_checkpoint_ack_send_routine(void* args)
+{
+	starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+	cp_template->current_send_number--;
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
+}
+
 // For test purpose
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
 {

+ 40 - 9
mpi/src/starpu_mpi_checkpoint.h

@@ -38,10 +38,11 @@ struct _starpu_mpi_checkpoint_template{
     struct _starpu_mpi_checkpoint_template_item_list list;
     int size;
     int cp_template_id;
+	int send_number;
+	int current_send_number;
     int pending;
     int frozen;
     starpu_pthread_mutex_t mutex;
-    starpu_sem_t completion_sem;
 };
 
 static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank, int backup_of)
@@ -68,21 +69,15 @@ static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_n
 
 static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backup_rank, int backup_of)
 {
+	starpu_pthread_mutex_lock(&cp_template->mutex);
 	STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
 	struct _starpu_mpi_checkpoint_template_item* item;
 	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backup_rank, backup_of);
 	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	return 0;
 }
 
-static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t _cp_template)
-{
-	_cp_template->frozen = 1;
-	_cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&_cp_template->list);
-	starpu_sem_init(&_cp_template->completion_sem, 0, _cp_template->size-1);
-	return _cp_template->size;
-}
-
 static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_first_data(starpu_mpi_checkpoint_template_t template)
 {
 	return _starpu_mpi_checkpoint_template_item_list_front(&template->list);
@@ -98,6 +93,42 @@ static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoin
 	return NULL;
 }
 
+static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
+{
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+
+	cp_template->frozen = 1;
+	cp_template->send_number = 0;
+	cp_template->size   = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
+
+	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
+
+	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
+	{
+		switch (item->type)
+		{
+			case STARPU_VALUE:
+				cp_template->send_number++;
+				break;
+			case STARPU_R:
+				if (starpu_mpi_data_get_rank(*(starpu_data_handle_t *) item->ptr))
+				{
+					cp_template->send_number++;
+				}
+				break;
+			case STARPU_DATA_ARRAY:
+				break;
+		}
+		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
+	}
+
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
+
+	return cp_template->size;
+}
+
+
+
 int _starpu_mpi_checkpoint_turn_on(void);
 
 // For test purpose