/* idle_agent.c */
  1. #include "idle_agent.h"
  2. //extern int *pid_num;
  3. extern int num_idags, node_id ,my_idag;//, fd_log;
  4. extern FILE *log_file;
  5. extern core_states state;
  6. extern inter_list **core_inter_head,**core_inter_tail;
  7. extern app my_app;
  8. extern app init_app;
  9. extern app far_req_app;
  10. extern metrics my_stats, total_stats;
  11. extern int far_req_or_sender;//far_req_max_man_cores, far_req_max_man, far_req_max_man_count,
  12. extern int *idag_mask, *idag_id_arr;
  13. extern int *Cl_x_max_arr, *Cl_y_max_arr;
  14. extern int DDS_count, my_cores_count;
  15. extern DDS_list *DDS, *DDS_tail;
  16. extern core_list *my_cores, *my_cores_tail;
  17. extern offer_list *init_man_offers, *selfopt_man_offers;
  18. extern offer_list *far_man_offers, *far_man_offers_tail;
  19. extern int far_list_count, far_man_agent_count;
  20. extern int my_agent, time_worked;
  21. extern int debug_global;
  22. extern time_t cur_time;
  23. extern struct tm *cur_t;
  24. extern struct sigevent sev;
  25. extern struct itimerspec its, chk_timer;
  26. extern timer_t timerid;
  27. extern long int selfopt_time_rem;//-1 means it is not set
  28. extern long int upper_work_bound;
  29. extern int time_for_farman;
  30. extern char scen_num[4];
  31. extern int chk_rem_count, chk_rem_num, sum_rem_time;
  32. extern float old_Speedup, my_Speedup;
  33. extern int nodes_ended_cnt, app_terminated, stats_replied;
  34. region far_reg;
  35. extern int nodes_initialised;
  36. extern int *sig_array, *data_array, NUES;
  37. extern RCCE_FLAG flag_signals_enabled,flag_data_written;
  38. void idle_agent_actions(int idag_num, char scen_num[4]){
  39. int i,j,k, Cl_x_max, Cl_y_max, one_core;
  40. pid_t p;
  41. offer_list *tmp_offer_list;
  42. core_list *tmp_cores_list;
  43. float avg_cluster_util;
  44. DDS_count=0;
  45. my_cores_count=0;
  46. DDS=NULL;
  47. my_cores=NULL;
  48. my_stats.msg_count=0;
  49. my_stats.message_size=0;
  50. my_stats.distance=0;
  51. my_stats.app_turnaround=0;
  52. my_stats.comp_effort=0;
  53. my_stats.cores_utilized=0;
  54. my_stats.times_accessed=0;
  55. its.it_value.tv_sec = 0;
  56. its.it_interval.tv_sec = 0;//its.it_value.tv_sec;
  57. its.it_interval.tv_nsec = 0;
  58. nodes_initialised=0;
  59. //node_sem = (sem_t*) shmat (seg_id, NULL, 0);
  60. i = get_cluster_info(idag_num, &Cl_x_max, &Cl_y_max);
  61. if (i != node_id) printf("I am %d and i was %d\n",node_id,i);
  62. idag_id_arr = (int *) malloc(num_idags*sizeof(int));
  63. Cl_x_max_arr = (int *) malloc(num_idags*sizeof(int));
  64. Cl_y_max_arr = (int *) malloc(num_idags*sizeof(int));
  65. idag_mask = (int *) malloc(X_max*Y_max*sizeof(int));
  66. far_reg.C = -1;
  67. far_reg.r = -1;
  68. for (i=0; i<num_idags; i++) {
  69. idag_id_arr[i] = get_cluster_info(i, &Cl_x_max_arr[i], &Cl_y_max_arr[i]);
  70. for (j=idag_id_arr[i]; j<idag_id_arr[i] + Cl_y_max_arr[i]*X_max; j+=X_max)
  71. for (k=0; k<Cl_x_max_arr[i]; k++)
  72. idag_mask[j+k] = idag_id_arr[i];
  73. }
  74. printf("I an idag with node_id = %d, pid = %d\n",node_id,getpid());
  75. log_file = create_log_file(node_id, scen_num);
  76. cur_time = time(NULL);
  77. cur_t = localtime(&cur_time);
  78. fprintf(log_file, "[%d:%d:%d]: I start initialising node_id=%d",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id);
  79. fflush(log_file);
  80. install_signal_handlers();
  81. //sig_SEGV_enable();
  82. sev.sigev_notify = SIGEV_SIGNAL;
  83. sev.sigev_signo = SIG_TIMER;
  84. sev.sigev_value.sival_ptr = &timerid;
  85. if (timer_create(CLOCK_REALTIME, &sev, &timerid) == -1) printf("timer_create error\n");
  86. for (j=node_id; j<node_id+Cl_y_max*X_max; j+=X_max)
  87. for (k=0; k<Cl_x_max; k++) {
  88. if (my_cores == NULL) {
  89. my_cores = (core_list *) malloc(sizeof(core_list));
  90. my_cores_tail = my_cores;
  91. } else {
  92. my_cores_tail->next = (core_list *) malloc(sizeof(core_list));
  93. my_cores_tail = my_cores_tail->next;
  94. }
  95. my_cores_count++;
  96. my_cores_tail->core_id = j+k;
  97. my_cores_tail->offered_to = -1;
  98. my_cores_tail->next = NULL;
  99. if ((j+k) == node_id){
  100. DDS = (DDS_list *) malloc(sizeof(DDS_list));
  101. DDS->agent_id = j+k;
  102. DDS->num_of_cores = Cl_x_max*Cl_y_max;
  103. DDS->next = NULL;
  104. DDS_tail = DDS;
  105. DDS_count++;
  106. //pid_num[j+k] = getpid();
  107. } /*else {
  108. p = fork();
  109. if (p==0){
  110. node_id = j+k;
  111. common_node_actions(node_id,scen_num,seg_id);
  112. }
  113. }*/
  114. }
  115. RCCE_barrier(&RCCE_COMM_WORLD);
  116. //sleep(1);
  117. for (j=node_id; j<node_id+Cl_x_max*X_max; j+=X_max)
  118. for (k=0; k<Cl_x_max; k++)
  119. if ((j+k) != node_id) {
  120. signals_disable();
  121. one_core = j+k;
  122. if (core_inter_head[one_core] == NULL){
  123. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  124. core_inter_tail[one_core] = core_inter_head[one_core];
  125. } else {
  126. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  127. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  128. }
  129. core_inter_tail[one_core]->type = INIT_CORE;
  130. core_inter_tail[one_core]->next = NULL;
  131. signals_enable();
  132. //kill(pid_num[one_core], SIG_INIT);
  133. scc_kill(one_core, SIG_INIT);
  134. //my_stats.msg_count++;
  135. //my_stats.distance += distance(node_id,one_core);
  136. }
  137. int dummy=0;
  138. while (nodes_initialised != my_cores_count-1) {//pause();
  139. for (i=0; i<1000; i++)
  140. for(j=0; j<1000; j++)
  141. dummy++;
  142. scc_signals_check();
  143. }
  144. state = IDLE_IDAG;
  145. while (state != IDAG_ENDING)
  146. if (state == IDLE_IDAG || state == IDLE_FAR_MAN) {
  147. //pause();
  148. dummy=0;
  149. for (i=0; i<1000; i++)
  150. for(j=0; j<1000; j++)
  151. dummy++;
  152. scc_signals_check();
  153. /*} else if (state == FAR_MAN_CHK_OFFERS) {
  154. signals_disable();
  155. printf("far check alarm went off in idag %d! far_req_or_sender = %d\n",node_id,far_req_or_sender);
  156. cur_time = time(NULL);
  157. cur_t = localtime(&cur_time);
  158. fprintf(log_file, "[%d:%d:%d]: far check alarm went off in idag %d! far_req_or_sender = %d\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id,far_req_or_sender);
  159. fflush(log_file);
  160. tmp_offer_list = far_man_offers;
  161. while (tmp_offer_list != NULL){
  162. fprintf(log_file,"Offer by %d for %d cores\n",tmp_offer_list->sender,tmp_offer_list->off.num_of_cores);
  163. tmp_offer_list = tmp_offer_list->next;
  164. }
  165. fflush(log_file);
  166. if (core_inter_head[far_req_or_sender] == NULL){
  167. core_inter_head[far_req_or_sender] = (inter_list *) malloc(sizeof(inter_list));
  168. core_inter_tail[far_req_or_sender] = core_inter_head[far_req_or_sender];
  169. } else {
  170. core_inter_tail[far_req_or_sender]->next = (inter_list *) malloc(sizeof(inter_list));
  171. core_inter_tail[far_req_or_sender] = core_inter_tail[far_req_or_sender]->next;
  172. }
  173. core_inter_tail[far_req_or_sender]->type = FAR_REQ_OFFER;
  174. //core_inter_tail[far_req_or_sender]->data.my_offer = far_man_offers->off;
  175. if (far_man_offers != NULL)
  176. core_inter_tail[far_req_or_sender]->data.my_offer = far_man_offers->off;
  177. else {
  178. fprintf(log_file,"far_man_offers is null far_list_count = %d\n",far_list_count);
  179. fflush(log_file);
  180. }
  181. core_inter_tail[far_req_or_sender]->next = NULL;
  182. //kill(pid_num[far_req_or_sender],SIG_FAR_REQ);
  183. if (core_inter_head[far_req_or_sender]->next == NULL) {
  184. kill(pid_num[far_req_or_sender],SIG_FAR_REQ);
  185. my_stats.msg_count++;
  186. my_stats.distance += distance(node_id,far_req_or_sender);
  187. } else printf("first i am doing smth else with far_req_or_sender type0=%d type1=%d\n",core_inter_head[far_req_or_sender]->type,core_inter_head[far_req_or_sender]->next->type);
  188. if (selfopt_time_rem != -1) printf("selfopt timer in idag??\n");
  189. state = IDLE_IDAG;
  190. signals_enable();*/
  191. } else {
  192. printf("Uknown state node_id = %d state = %d\n",node_id,state);
  193. state = IDLE_IDAG;
  194. }
  195. //printf("killing inside %d\n",getpid());
  196. tmp_cores_list = my_cores;
  197. my_cores = my_cores->next;
  198. free(tmp_cores_list);
  199. for (; my_cores != NULL; my_cores = my_cores->next){
  200. tmp_cores_list = my_cores;
  201. one_core = my_cores->core_id;
  202. if (core_inter_head[one_core] == NULL){
  203. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  204. core_inter_tail[one_core] = core_inter_head[one_core];
  205. } else {
  206. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  207. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  208. fprintf(log_file,"I am still doing smth with my node %d interaction = %d\n",one_core,core_inter_head[one_core]->type);
  209. fflush(log_file);
  210. }
  211. core_inter_tail[one_core]->type = TERMINATION_STATS;
  212. core_inter_tail[one_core]->next = NULL;
  213. //kill(pid_num[one_core], SIG_TERMINATE);
  214. scc_kill(one_core, SIG_TERMINATE);
  215. my_stats.msg_count++;
  216. my_stats.distance += distance(node_id,one_core);
  217. free(tmp_cores_list);
  218. }
  219. while (state == IDAG_ENDING) {
  220. //pause();
  221. dummy=0;
  222. for (i=0; i<1000; i++)
  223. for(j=0; j<1000; j++)
  224. dummy++;
  225. scc_signals_check();
  226. if (stats_replied == my_cores_count-1) {
  227. //printf("I am %d and all my cores replied their stats\n",node_id);
  228. core_inter_head[0] = (inter_list *) malloc(sizeof(inter_list));
  229. core_inter_tail[0] = core_inter_head[0];
  230. core_inter_tail[0]->type = REP_STATISTICS;
  231. core_inter_tail[0]->next = NULL;
  232. total_stats.msg_count += my_stats.msg_count;
  233. total_stats.message_size += my_stats.message_size;
  234. total_stats.distance += my_stats.distance;
  235. total_stats.app_turnaround += my_stats.app_turnaround;
  236. total_stats.comp_effort += my_stats.comp_effort;
  237. total_stats.cores_utilized += my_stats.cores_utilized;
  238. total_stats.times_accessed += my_stats.times_accessed;
  239. avg_cluster_util = (float) my_stats.cores_utilized / (my_stats.times_accessed * (my_cores_count-1));
  240. printf("I am %d with cores_utilized = %d times_accessed = %d my_cores_count = %d and avg_cluster_util = %0.2f\n",
  241. node_id,my_stats.cores_utilized,my_stats.times_accessed,my_cores_count,avg_cluster_util);
  242. fprintf(log_file,"cores_utilized = %d times_accessed = %d my_cores_count = %d and avg_cluster_util = %0.2f\n",
  243. my_stats.cores_utilized,my_stats.times_accessed,my_cores_count,avg_cluster_util);
  244. fflush(log_file);
  245. core_inter_tail[0]->data.stats = total_stats;
  246. //kill(pid_num[0], SIG_TERMINATE);
  247. scc_kill(0, SIG_TERMINATE);
  248. my_cores_count = 0;
  249. }
  250. }
  251. //for (i=0; i<Cl_x_max*Cl_y_max-1; i++) wait(NULL);
  252. RCCE_flag_free(&flag_signals_enabled);
  253. RCCE_flag_free(&flag_data_written);
  254. RCCE_free((t_vcharp) sig_array);
  255. RCCE_free((t_vcharp) data_array);
  256. cur_time = time(NULL);
  257. cur_t = localtime(&cur_time);
  258. fprintf(log_file, "[%d:%d:%d]: I ended well\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
  259. fclose(log_file);
  260. exit(0);
  261. }