idle_agent.c 9.8 KB


  1. #include "idle_agent.h"
  2. //extern int *pid_num;
  3. extern int num_idags, node_id ,my_idag;//, fd_log;
  4. extern FILE *log_file;
  5. extern core_states state;
  6. extern inter_list **core_inter_head,**core_inter_tail;
  7. extern app my_app;
  8. extern app init_app;
  9. extern app far_req_app;
  10. extern metrics my_stats, total_stats;
  11. extern int far_req_or_sender;//far_req_max_man_cores, far_req_max_man, far_req_max_man_count,
  12. extern int *idag_mask, *idag_id_arr;
  13. extern int *Cl_x_max_arr, *Cl_y_max_arr;
  14. extern int DDS_count, my_cores_count;
  15. extern DDS_list *DDS, *DDS_tail;
  16. extern core_list *my_cores, *my_cores_tail;
  17. extern offer_list *init_man_offers, *selfopt_man_offers;
  18. extern offer_list *far_man_offers, *far_man_offers_tail;
  19. extern int far_list_count, far_man_agent_count;
  20. extern int my_agent, time_worked;
  21. extern int debug_global;
  22. extern time_t cur_time;
  23. extern struct tm *cur_t;
  24. extern struct sigevent sev;
  25. extern struct itimerspec its, chk_timer;
  26. extern timer_t timerid;
  27. extern long int selfopt_time_rem;//-1 means it is not set
  28. extern long int upper_work_bound;
  29. extern int time_for_farman;
  30. extern char scen_num[4];
  31. extern int chk_rem_count, chk_rem_num, sum_rem_time;
  32. extern float old_Speedup, my_Speedup;
  33. extern int nodes_ended_cnt, app_terminated, stats_replied, nodes_initialised;
  34. extern int *sig_array, *data_array, NUES;
  35. extern RCCE_FLAG flag_signals_enabled,flag_data_written;
  36. void idle_agent_actions(int idag_num, char scen_num[4]){
  37. int i,j,k, Cl_x_max, Cl_y_max, one_core;
  38. pid_t p;
  39. offer_list *tmp_offer_list;
  40. core_list *tmp_cores_list;
  41. float avg_cluster_util;
  42. DDS_count=0;
  43. my_cores_count=0;
  44. DDS=NULL;
  45. my_cores=NULL;
  46. my_stats.msg_count=0;
  47. my_stats.message_size=0;
  48. my_stats.distance=0;
  49. my_stats.app_turnaround=0;
  50. my_stats.comp_effort=0;
  51. my_stats.cores_utilized=0;
  52. my_stats.times_accessed=0;
  53. its.it_value.tv_sec = 0;
  54. its.it_interval.tv_sec = 0;//its.it_value.tv_sec;
  55. its.it_interval.tv_nsec = 0;
  56. nodes_initialised=0;
  57. i = get_cluster_info(idag_num, &Cl_x_max, &Cl_y_max);
  58. idag_id_arr = (int *) malloc(num_idags*sizeof(int));
  59. Cl_x_max_arr = (int *) malloc(num_idags*sizeof(int));
  60. Cl_y_max_arr = (int *) malloc(num_idags*sizeof(int));
  61. idag_mask = (int *) malloc(X_max*Y_max*sizeof(int));
  62. for (i=0; i<num_idags; i++){
  63. idag_id_arr[i] = get_cluster_info(i, &Cl_x_max_arr[i], &Cl_y_max_arr[i]);
  64. for (j=idag_id_arr[i]; j<idag_id_arr[i] + Cl_y_max_arr[i]*X_max; j+=X_max)
  65. for (k=0; k<Cl_x_max_arr[i]; k++)
  66. idag_mask[j+k] = idag_id_arr[i];
  67. }
  68. //printf("I an idag with node_id = %d, pid = %d\n",node_id,getpid());
  69. log_file = create_log_file(node_id, scen_num);
  70. cur_time = time(NULL);
  71. cur_t = localtime(&cur_time);
  72. fprintf(log_file, "[%d:%d:%d]: I start initialising node_id=%d\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id);
  73. fflush(log_file);
  74. install_signal_handlers();
  75. //sig_SEGV_enable();
  76. sev.sigev_notify = SIGEV_SIGNAL;
  77. sev.sigev_signo = SIG_TIMER;
  78. sev.sigev_value.sival_ptr = &timerid;
  79. if (timer_create(CLOCK_REALTIME, &sev, &timerid) == -1) printf("timer_create error\n");
  80. for (j=node_id; j<node_id+Cl_y_max*X_max; j+=X_max)
  81. for (k=0; k<Cl_x_max; k++) {
  82. if (my_cores == NULL) {
  83. my_cores = (core_list *) malloc(sizeof(core_list));
  84. my_cores_tail = my_cores;
  85. } else {
  86. my_cores_tail->next = (core_list *) malloc(sizeof(core_list));
  87. my_cores_tail = my_cores_tail->next;
  88. }
  89. my_cores_count++;
  90. my_cores_tail->core_id = j+k;
  91. my_cores_tail->offered_to = -1;
  92. my_cores_tail->next = NULL;
  93. if ((j+k) == node_id){
  94. DDS = (DDS_list *) malloc(sizeof(DDS_list));
  95. DDS->agent_id = j+k;
  96. DDS->num_of_cores = Cl_x_max*Cl_y_max;
  97. DDS->next = NULL;
  98. DDS_tail = DDS;
  99. DDS_count++;
  100. //pid_num[j+k] = getpid();
  101. }
  102. }
  103. RCCE_barrier(&RCCE_COMM_WORLD);
  104. //sleep(1);
  105. //for (i=0; i<Cl_x_max*Cl_y_max-1; i++) waitpid(-1, NULL, WUNTRACED);
  106. for (j=node_id; j<node_id+Cl_x_max*X_max; j+=X_max)
  107. for (k=0; k<Cl_x_max; k++)
  108. if ((j+k) != node_id) {
  109. signals_disable();
  110. one_core = j+k;
  111. if (core_inter_head[one_core] == NULL){
  112. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  113. core_inter_tail[one_core] = core_inter_head[one_core];
  114. } else {
  115. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  116. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  117. }
  118. core_inter_tail[one_core]->type = INIT_CORE;
  119. core_inter_tail[one_core]->next = NULL;
  120. signals_enable();
  121. //kill(pid_num[one_core], SIG_INIT);
  122. scc_kill(one_core, SIG_INIT);
  123. //my_stats.msg_count++;
  124. //my_stats.distance += distance(node_id,one_core);
  125. }
  126. int dummy=0;
  127. while (nodes_initialised != my_cores_count-1) {//pause();
  128. for (i=0; i<1000; i++)
  129. for(j=0; j<1000; j++)
  130. dummy++;
  131. scc_signals_check();
  132. }
  133. state = IDLE_IDAG;
  134. while (state != IDAG_ENDING)
  135. if (state == IDLE_IDAG || state == IDLE_FAR_MAN) {
  136. //pause();
  137. dummy=0;
  138. for (i=0; i<100; i++)
  139. for(j=0; j<100; j++)
  140. dummy++;
  141. scc_signals_check();
  142. /*else if (state == FAR_MAN_CHK_OFFERS){
  143. signals_disable();
  144. printf("far check alarm went off in idag %d! far_req_or_sender = %d\n",node_id,far_req_or_sender);
  145. cur_time = time(NULL);
  146. cur_t = localtime(&cur_time);
  147. fprintf(log_file, "[%d:%d:%d]: far check alarm went off in idag %d! far_req_or_sender = %d\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id,far_req_or_sender);
  148. fflush(log_file);
  149. tmp_offer_list = far_man_offers;
  150. while (tmp_offer_list != NULL){
  151. printf("zxcA Offer by %d for %d cores\n",tmp_offer_list->sender,tmp_offer_list->off.num_of_cores);
  152. //tmp_offer_list->answer = &core_inter_head[sender_id]->data.offer_accepted; must be a serious bug
  153. //tmp_offer_list->answer = &core_inter_head[tmp_offer_list->sender]->data.off_acc_arr[1];
  154. tmp_offer_list = tmp_offer_list->next;
  155. }
  156. if (core_inter_head[far_req_or_sender] == NULL){
  157. core_inter_head[far_req_or_sender] = (inter_list *) malloc(sizeof(inter_list));
  158. core_inter_tail[far_req_or_sender] = core_inter_head[far_req_or_sender];
  159. } else {
  160. core_inter_tail[far_req_or_sender]->next = (inter_list *) malloc(sizeof(inter_list));
  161. core_inter_tail[far_req_or_sender] = core_inter_tail[far_req_or_sender]->next;
  162. }
  163. core_inter_tail[far_req_or_sender]->type = FAR_REQ_OFFER;
  164. core_inter_tail[far_req_or_sender]->data.my_offer = far_man_offers->off;
  165. core_inter_tail[far_req_or_sender]->next = NULL;
  166. //kill(pid_num[far_req_or_sender],SIG_FAR_REQ);
  167. if (core_inter_head[far_req_or_sender]->next == NULL) {
  168. kill(pid_num[far_req_or_sender],SIG_FAR_REQ);
  169. my_stats.msg_count++;
  170. my_stats.distance += distance(node_id,far_req_or_sender);
  171. } else printf("first i am doing smth else with far_req_or_sender type0=%d type1=%d\n",core_inter_head[far_req_or_sender]->type,core_inter_head[far_req_or_sender]->next->type);
  172. if (selfopt_time_rem != -1) printf("selfopt timer in idag??\n");
  173. state = IDLE_IDAG;
  174. signals_enable();*/
  175. } else {
  176. printf("Uknown state node_id = %d state = %d\n",node_id,state);
  177. state = IDLE_IDAG;
  178. }
  179. printf("killing inside %d\n",getpid());
  180. tmp_cores_list = my_cores;
  181. my_cores = my_cores->next;
  182. free(tmp_cores_list);
  183. for (; my_cores != NULL; my_cores = my_cores->next){
  184. tmp_cores_list = my_cores;
  185. one_core = my_cores->core_id;
  186. if (core_inter_head[one_core] == NULL){
  187. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  188. core_inter_tail[one_core] = core_inter_head[one_core];
  189. } else {
  190. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  191. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  192. fprintf(log_file,"I am still doing smth with my node %d interaction = %d\n",one_core,core_inter_head[one_core]->type);
  193. fflush(log_file);
  194. }
  195. core_inter_tail[one_core]->type = TERMINATION_STATS;
  196. core_inter_tail[one_core]->next = NULL;
  197. //kill(pid_num[one_core], SIG_TERMINATE);
  198. scc_kill(one_core, SIG_TERMINATE);
  199. my_stats.msg_count++;
  200. my_stats.distance += distance(node_id,one_core);
  201. free(tmp_cores_list);
  202. }
  203. while (state == IDAG_ENDING) {
  204. //pause();
  205. dummy=0;
  206. for (i=0; i<1000; i++)
  207. for(j=0; j<1000; j++)
  208. dummy++;
  209. scc_signals_check();
  210. if (stats_replied == my_cores_count-1) {
  211. //printf("I am %d and all my cores replied their stats\n",node_id);
  212. core_inter_head[0] = (inter_list *) malloc(sizeof(inter_list));
  213. core_inter_tail[0] = core_inter_head[0];
  214. core_inter_tail[0]->type = REP_STATISTICS;
  215. core_inter_tail[0]->next = NULL;
  216. total_stats.msg_count += my_stats.msg_count;
  217. total_stats.message_size += my_stats.message_size;
  218. total_stats.distance += my_stats.distance;
  219. total_stats.app_turnaround += my_stats.app_turnaround;
  220. total_stats.comp_effort += my_stats.comp_effort;
  221. total_stats.cores_utilized += my_stats.cores_utilized;
  222. total_stats.times_accessed += my_stats.times_accessed;
  223. avg_cluster_util = (float) my_stats.cores_utilized / (my_stats.times_accessed * (my_cores_count-1));
  224. printf("I am %d with cores_utilized = %d times_accessed = %d my_cores_count = %d and avg_cluster_util = %0.2f\n",
  225. node_id,my_stats.cores_utilized,my_stats.times_accessed,my_cores_count,avg_cluster_util);
  226. fprintf(log_file,"cores_utilized = %d times_accessed = %d my_cores_count = %d and avg_cluster_util = %0.2f\n",
  227. my_stats.cores_utilized,my_stats.times_accessed,my_cores_count,avg_cluster_util);
  228. fflush(log_file);
  229. core_inter_tail[0]->data.stats = total_stats;
  230. //kill(pid_num[0], SIG_TERMINATE);
  231. scc_kill(0, SIG_TERMINATE);
  232. my_cores_count = 0;
  233. }
  234. }
  235. //for (i=1; i<my_cores_count; i++) pause();
  236. RCCE_flag_free(&flag_signals_enabled);
  237. RCCE_flag_free(&flag_data_written);
  238. RCCE_free((t_vcharp) sig_array);
  239. RCCE_free((t_vcharp) data_array);
  240. //for (i=0; i<Cl_x_max*Cl_y_max-1; i++) wait(NULL);
  241. cur_time = time(NULL);
  242. cur_t = localtime(&cur_time);
  243. fprintf(log_file, "[%d:%d:%d]: I ended well\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
  244. fclose(log_file);
  245. exit(0);
  246. }