idle_agent.c~ 9.6 KB


  1. #include "idle_agent.h"
  2. //extern int *pid_num;
  3. extern int num_idags, node_id ,my_idag;//, fd_log;
  4. extern FILE *log_file;
  5. extern core_states state;
  6. extern inter_list **core_inter_head,**core_inter_tail;
  7. extern app my_app;
  8. extern app init_app;
  9. extern app far_req_app;
  10. extern metrics my_stats, total_stats;
  11. extern int far_req_or_sender;//far_req_max_man_cores, far_req_max_man, far_req_max_man_count,
  12. extern int *idag_mask, *idag_id_arr;
  13. extern int *Cl_x_max_arr, *Cl_y_max_arr;
  14. extern int DDS_count, my_cores_count;
  15. extern DDS_list *DDS, *DDS_tail;
  16. extern core_list *my_cores, *my_cores_tail;
  17. extern offer_list *init_man_offers, *selfopt_man_offers;
  18. extern offer_list *far_man_offers, *far_man_offers_tail;
  19. extern int far_list_count, far_man_agent_count;
  20. extern int my_agent, time_worked;
  21. extern int debug_global;
  22. extern time_t cur_time;
  23. extern struct tm *cur_t;
  24. extern struct sigevent sev;
  25. extern struct itimerspec its, chk_timer;
  26. extern timer_t timerid;
  27. extern long int selfopt_time_rem;//-1 means it is not set
  28. extern long int upper_work_bound;
  29. extern int time_for_farman;
  30. extern char scen_num[4];
  31. extern int chk_rem_count, chk_rem_num, sum_rem_time;
  32. extern float old_Speedup, my_Speedup;
  33. extern int nodes_ended_cnt, app_terminated, stats_replied, nodes_initialised;
  34. void idle_agent_actions(int idag_num, char scen_num[4], int seg_id){
  35. int i,j,k, Cl_x_max, Cl_y_max, one_core;
  36. pid_t p;
  37. offer_list *tmp_offer_list;
  38. core_list *tmp_cores_list;
  39. float avg_cluster_util;
  40. DDS_count=0;
  41. my_cores_count=0;
  42. DDS=NULL;
  43. my_cores=NULL;
  44. my_stats.msg_count=0;
  45. my_stats.message_size=0;
  46. my_stats.distance=0;
  47. my_stats.app_turnaround=0;
  48. my_stats.comp_effort=0;
  49. my_stats.cores_utilized=0;
  50. my_stats.times_accessed=0;
  51. its.it_value.tv_sec = 0;
  52. its.it_interval.tv_sec = 0;//its.it_value.tv_sec;
  53. its.it_interval.tv_nsec = 0;
  54. nodes_initialised=0;
  55. i = get_cluster_info(idag_num, &Cl_x_max, &Cl_y_max);
  56. idag_id_arr = (int *) malloc(num_idags*sizeof(int));
  57. Cl_x_max_arr = (int *) malloc(num_idags*sizeof(int));
  58. Cl_y_max_arr = (int *) malloc(num_idags*sizeof(int));
  59. idag_mask = (int *) malloc(X_max*Y_max*sizeof(int));
  60. for (i=0; i<num_idags; i++){
  61. idag_id_arr[i] = get_cluster_info(i, &Cl_x_max_arr[i], &Cl_y_max_arr[i]);
  62. for (j=idag_id_arr[i]; j<idag_id_arr[i] + Cl_y_max_arr[i]*X_max; j+=X_max)
  63. for (k=0; k<Cl_x_max_arr[i]; k++)
  64. idag_mask[j+k] = idag_id_arr[i];
  65. }
  66. //printf("I an idag with node_id = %d, pid = %d\n",node_id,getpid());
  67. log_file = create_log_file(node_id, scen_num);
  68. cur_time = time(NULL);
  69. cur_t = localtime(&cur_time);
  70. fprintf(log_file, "[%d:%d:%d]: I start initialising node_id=%d\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id);
  71. fflush(log_file);
  72. install_signal_handlers();
  73. //sig_SEGV_enable();
  74. sev.sigev_notify = SIGEV_SIGNAL;
  75. sev.sigev_signo = SIG_TIMER;
  76. sev.sigev_value.sival_ptr = &timerid;
  77. if (timer_create(CLOCK_REALTIME, &sev, &timerid) == -1) printf("timer_create error\n");
  78. for (j=node_id; j<node_id+Cl_y_max*X_max; j+=X_max)
  79. for (k=0; k<Cl_x_max; k++) {
  80. if (my_cores == NULL) {
  81. my_cores = (core_list *) malloc(sizeof(core_list));
  82. my_cores_tail = my_cores;
  83. } else {
  84. my_cores_tail->next = (core_list *) malloc(sizeof(core_list));
  85. my_cores_tail = my_cores_tail->next;
  86. }
  87. my_cores_count++;
  88. my_cores_tail->core_id = j+k;
  89. my_cores_tail->offered_to = -1;
  90. my_cores_tail->next = NULL;
  91. if ((j+k) == node_id){
  92. DDS = (DDS_list *) malloc(sizeof(DDS_list));
  93. DDS->agent_id = j+k;
  94. DDS->num_of_cores = Cl_x_max*Cl_y_max;
  95. DDS->next = NULL;
  96. DDS_tail = DDS;
  97. DDS_count++;
  98. //pid_num[j+k] = getpid();
  99. }
  100. }
  101. RCCE_barrier(&RCCE_COMM_WORLD);
  102. //sleep(1);
  103. //for (i=0; i<Cl_x_max*Cl_y_max-1; i++) waitpid(-1, NULL, WUNTRACED);
  104. for (j=node_id; j<node_id+Cl_x_max*X_max; j+=X_max)
  105. for (k=0; k<Cl_x_max; k++)
  106. if ((j+k) != node_id) {
  107. signals_disable();
  108. one_core = j+k;
  109. if (core_inter_head[one_core] == NULL){
  110. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  111. core_inter_tail[one_core] = core_inter_head[one_core];
  112. } else {
  113. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  114. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  115. }
  116. core_inter_tail[one_core]->type = INIT_CORE;
  117. core_inter_tail[one_core]->next = NULL;
  118. signals_enable();
  119. //kill(pid_num[one_core], SIG_INIT);
  120. scc_kill(one_core, SIG_INIT);
  121. //my_stats.msg_count++;
  122. //my_stats.distance += distance(node_id,one_core);
  123. }
  124. int dummy=0;
  125. while (nodes_initialised != my_cores_count-1) {//pause();
  126. for (i=0; i<1000; i++)
  127. for(j=0; j<1000; j++)
  128. dummy++;
  129. scc_signals_check();
  130. }
  131. state = IDLE_IDAG;
  132. while (state != IDAG_ENDING)
  133. if (state == IDLE_IDAG || state == IDLE_FAR_MAN) {
  134. //pause();
  135. dummy=0;
  136. for (i=0; i<1000; i++)
  137. for(j=0; j<1000; j++)
  138. dummy++;
  139. scc_signals_check();
  140. /*else if (state == FAR_MAN_CHK_OFFERS){
  141. signals_disable();
  142. printf("far check alarm went off in idag %d! far_req_or_sender = %d\n",node_id,far_req_or_sender);
  143. cur_time = time(NULL);
  144. cur_t = localtime(&cur_time);
  145. fprintf(log_file, "[%d:%d:%d]: far check alarm went off in idag %d! far_req_or_sender = %d\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id,far_req_or_sender);
  146. fflush(log_file);
  147. tmp_offer_list = far_man_offers;
  148. while (tmp_offer_list != NULL){
  149. printf("zxcA Offer by %d for %d cores\n",tmp_offer_list->sender,tmp_offer_list->off.num_of_cores);
  150. //tmp_offer_list->answer = &core_inter_head[sender_id]->data.offer_accepted; must be a serious bug
  151. //tmp_offer_list->answer = &core_inter_head[tmp_offer_list->sender]->data.off_acc_arr[1];
  152. tmp_offer_list = tmp_offer_list->next;
  153. }
  154. if (core_inter_head[far_req_or_sender] == NULL){
  155. core_inter_head[far_req_or_sender] = (inter_list *) malloc(sizeof(inter_list));
  156. core_inter_tail[far_req_or_sender] = core_inter_head[far_req_or_sender];
  157. } else {
  158. core_inter_tail[far_req_or_sender]->next = (inter_list *) malloc(sizeof(inter_list));
  159. core_inter_tail[far_req_or_sender] = core_inter_tail[far_req_or_sender]->next;
  160. }
  161. core_inter_tail[far_req_or_sender]->type = FAR_REQ_OFFER;
  162. core_inter_tail[far_req_or_sender]->data.my_offer = far_man_offers->off;
  163. core_inter_tail[far_req_or_sender]->next = NULL;
  164. //kill(pid_num[far_req_or_sender],SIG_FAR_REQ);
  165. if (core_inter_head[far_req_or_sender]->next == NULL) {
  166. kill(pid_num[far_req_or_sender],SIG_FAR_REQ);
  167. my_stats.msg_count++;
  168. my_stats.distance += distance(node_id,far_req_or_sender);
  169. } else printf("first i am doing smth else with far_req_or_sender type0=%d type1=%d\n",core_inter_head[far_req_or_sender]->type,core_inter_head[far_req_or_sender]->next->type);
  170. if (selfopt_time_rem != -1) printf("selfopt timer in idag??\n");
  171. state = IDLE_IDAG;
  172. signals_enable();*/
  173. } else {
  174. printf("Uknown state node_id = %d state = %d\n",node_id,state);
  175. state = IDLE_IDAG;
  176. }
  177. printf("killing inside %d\n",getpid());
  178. tmp_cores_list = my_cores;
  179. my_cores = my_cores->next;
  180. free(tmp_cores_list);
  181. for (; my_cores != NULL; my_cores = my_cores->next){
  182. tmp_cores_list = my_cores;
  183. one_core = my_cores->core_id;
  184. if (core_inter_head[one_core] == NULL){
  185. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  186. core_inter_tail[one_core] = core_inter_head[one_core];
  187. } else {
  188. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  189. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  190. fprintf(log_file,"I am still doing smth with my node %d interaction = %d\n",one_core,core_inter_head[one_core]->type);
  191. fflush(log_file);
  192. }
  193. core_inter_tail[one_core]->type = TERMINATION_STATS;
  194. core_inter_tail[one_core]->next = NULL;
  195. //kill(pid_num[one_core], SIG_TERMINATE);
  196. scc_kill(one_core, SIG_TERMINATE);
  197. my_stats.msg_count++;
  198. my_stats.distance += distance(node_id,one_core);
  199. free(tmp_cores_list);
  200. }
  201. while (state == IDAG_ENDING) {
  202. //pause();
  203. dummy=0;
  204. for (i=0; i<1000; i++)
  205. for(j=0; j<1000; j++)
  206. dummy++;
  207. scc_signals_check();
  208. if (stats_replied == my_cores_count-1) {
  209. //printf("I am %d and all my cores replied their stats\n",node_id);
  210. core_inter_head[0] = (inter_list *) malloc(sizeof(inter_list));
  211. core_inter_tail[0] = core_inter_head[0];
  212. core_inter_tail[0]->type = REP_STATISTICS;
  213. core_inter_tail[0]->next = NULL;
  214. total_stats.msg_count += my_stats.msg_count;
  215. total_stats.message_size += my_stats.message_size;
  216. total_stats.distance += my_stats.distance;
  217. total_stats.app_turnaround += my_stats.app_turnaround;
  218. total_stats.comp_effort += my_stats.comp_effort;
  219. total_stats.cores_utilized += my_stats.cores_utilized;
  220. total_stats.times_accessed += my_stats.times_accessed;
  221. avg_cluster_util = (float) my_stats.cores_utilized / (my_stats.times_accessed * (my_cores_count-1));
  222. printf("I am %d with cores_utilized = %d times_accessed = %d my_cores_count = %d and avg_cluster_util = %0.2f\n",
  223. node_id,my_stats.cores_utilized,my_stats.times_accessed,my_cores_count,avg_cluster_util);
  224. fprintf(log_file,"cores_utilized = %d times_accessed = %d my_cores_count = %d and avg_cluster_util = %0.2f\n",
  225. my_stats.cores_utilized,my_stats.times_accessed,my_cores_count,avg_cluster_util);
  226. fflush(log_file);
  227. core_inter_tail[0]->data.stats = total_stats;
  228. //kill(pid_num[0], SIG_TERMINATE);
  229. scc_kill(0, SIG_TERMINATE);
  230. my_cores_count = 0;
  231. }
  232. }
  233. //for (i=1; i<my_cores_count; i++) pause();
  234. //for (i=0; i<Cl_x_max*Cl_y_max-1; i++) wait(NULL);
  235. cur_time = time(NULL);
  236. cur_t = localtime(&cur_time);
  237. fprintf(log_file, "[%d:%d:%d]: I ended well\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
  238. fclose(log_file);
  239. exit(0);
  240. }