paxos_signal_handlers.c 75 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115
#include "paxos_signal_handlers.h"
#include "my_rtrm.h"
#include "libfunctions.h"
//#include "noc_functions.h"
#include "sig_aux.h"
#include "scc_signals.h"
#include "controller_core.h"
#include "common_core.h"
#include "idag_defs.h"
#include "signal_handlers.h"
#include "variables.h"
#include "macros.h"
#include "structs.h"

/* Standard headers this file uses directly (printf/fprintf/snprintf, malloc,
 * string functions). Listed after the project headers so that any
 * feature-test macros those headers define still take effect first. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
  14. int faulty_core = -1;
  15. int first_time = 0;
  16. int pending_workload[2] = {-1,-1};
  17. int proposal_number_personal;
  18. core_states paxos_state;
  19. acceptor_var acceptor_vars = {-1,-1,-1};
  20. proposer_var proposer_vars = {-1,-1,0,0,NULL};
  21. int fail_flag = 0;
  22. struct timeval fail_time_val;
  23. static char local_scen_directory[SCEN_DIR_SIZE], local_scen_num[SCEN_NUM_SIZE];
  24. #ifdef PLAT_SCC
  25. char error_str[64];
  26. int error, str_len, sig_array_local[LINE_SIZE];
  27. #endif
  28. int leader_preference(){
  29. coworkers_list *tmp_cowork_list;
  30. int num_of_coworkers = 0;
  31. if (state == IDLE_CORE){
  32. return 60;
  33. }
  34. else if (state == INIT_MANAGER ||
  35. state == INIT_MANAGER_SEND_OFFERS ||
  36. state == IDLE_INIT_MAN ||
  37. state == INIT_MAN_CHK_OFFERS){
  38. return 50;
  39. }
  40. else if (state == WORKING_NODE && pending_state == NO_PENDING_STATE){
  41. if (coworkers != NULL)
  42. FOR_MY_COWORKERS_LIST num_of_coworkers++;
  43. if (num_of_coworkers == 1)
  44. return 1;
  45. else
  46. return (40 + num_of_coworkers);
  47. }
  48. else if (state == WORKING_NODE && (
  49. pending_state == INIT_MANAGER ||
  50. pending_state == INIT_MANAGER_SEND_OFFERS ||
  51. pending_state == IDLE_INIT_MAN ||
  52. pending_state == INIT_MAN_CHK_OFFERS)){
  53. if (coworkers != NULL)
  54. FOR_MY_COWORKERS_LIST num_of_coworkers++;
  55. if (num_of_coworkers == 1)
  56. return 1;
  57. else{
  58. return (30+num_of_coworkers);
  59. }
  60. }
  61. else if (state == WORKING_NODE && (
  62. pending_state == IDLE_AGENT ||
  63. pending_state == IDLE_AGENT_WAITING_OFF ||
  64. pending_state == AGENT_INIT_STATE ||
  65. pending_state == AGENT_SELF_OPT ||
  66. pending_state == AGENT_SELF_CHK_OFFERS ||
  67. pending_state == AGENT_ENDING ||
  68. pending_state == IDAG_ENDING ||
  69. pending_state == AGENT_ZOMBIE ||
  70. pending_state == AGENT_INIT_APP_INIT ||
  71. pending_state == AGENT_INIT_CHK_OFFERS ||
  72. pending_state == AGENT_INIT_IDLE_INIT ||
  73. pending_state == IDLE_INIT_IDLE_AGENT ||
  74. pending_state == IDLE_INIT_AGENT_SELFOPT ||
  75. pending_state == INIT_CHK_OFFERS_IDLE_AGENT ||
  76. pending_state == INIT_CHK_OFFERS_SELFOPT)){
  77. if (coworkers != NULL)
  78. FOR_MY_COWORKERS_LIST num_of_coworkers++;
  79. if (num_of_coworkers == 1)
  80. return 1;
  81. else{
  82. return (20+num_of_coworkers);
  83. }
  84. }
  85. else if (state == IDLE_AGENT ||
  86. state == IDLE_AGENT_WAITING_OFF ||
  87. state == AGENT_INIT_STATE ||
  88. state == AGENT_SELF_OPT ||
  89. state == AGENT_SELF_CHK_OFFERS ||
  90. state == AGENT_ENDING ||
  91. state == IDAG_ENDING ||
  92. state == AGENT_ZOMBIE){
  93. return 10;
  94. }
  95. return 0;
  96. }
  97. void initialize_PAXOS_data (char scen_directory[SCEN_DIR_SIZE], char scen_num[SCEN_NUM_SIZE]) {
  98. //printf("Initializing Paxos data...\n");
  99. strcpy(local_scen_directory, scen_directory);
  100. strcpy(local_scen_num, scen_num);
  101. }
  102. void sig_PAXOS_INIT_handler(){
  103. int i;
  104. int num_of_coworkers = 0;
  105. inter_list tmp_inter_list;
  106. coworkers_list *tmp_cowork_list;
  107. handler_Enter(-1, "sig_PAXOS_INIT_handler");
  108. paxos_state = PAXOS_ACTIVE;
  109. #ifdef PLAT_SCC
  110. RCCE_wait_until(proposal_number_lock,RCCE_FLAG_UNSET);
  111. RCCE_flag_write(&proposal_number_lock,RCCE_FLAG_SET,node_id);
  112. proposal_number_personal = *proposal_number_global;
  113. *proposal_number_global += 1;
  114. RCCE_flag_write(&proposal_number_lock,RCCE_FLAG_UNSET,node_id);
  115. #else
  116. /* Pick up my personal proposal number */
  117. sem_wait(proposal_number_lock);
  118. proposal_number_personal = *proposal_number_global;
  119. *proposal_number_global += 1;
  120. sem_post(proposal_number_lock);
  121. #endif
  122. fprintf(log_file, "\t\tI am %d, and my proposal number is %d\n",node_id,proposal_number_personal);
  123. tmp_inter_list.next = NULL;
  124. proposer_vars.core_states = (int *)malloc((X_max*Y_max)*sizeof(int));
  125. for (i = 0; i < X_max*Y_max; i++){
  126. proposer_vars.core_states[i] = -1;
  127. }
  128. /* Case where controller fails */
  129. if (idag_mask[faulty_core] == faulty_core){
  130. for (i = 0; i < X_max*Y_max; i++){
  131. if ((i != my_idag) && (idag_mask[i] == my_idag)){
  132. tmp_inter_list.data.proposal_number = proposal_number_personal;
  133. tmp_inter_list.type = PREPARE_REQUEST;
  134. scc_kill(i,SIG_PREPARE_REQUEST,&tmp_inter_list);
  135. }
  136. }
  137. }
  138. /* Case where manager fails */
  139. else{
  140. fprintf(log_file,"\t\tcoworkers List: ");
  141. FOR_MY_COWORKERS_LIST{
  142. fprintf(log_file,"%d --> ", tmp_cowork_list->core_id);
  143. }
  144. printf("\n");
  145. FOR_MY_COWORKERS_LIST{
  146. num_of_coworkers++;
  147. tmp_inter_list.data.proposal_number = proposal_number_personal;
  148. tmp_inter_list.type = PREPARE_REQUEST;
  149. scc_kill(tmp_cowork_list->core_id, SIG_PREPARE_REQUEST, &tmp_inter_list);
  150. }
  151. fprintf(log_file,"num_of_coworkers=%d\n",num_of_coworkers);
  152. printf("num_of_coworkers=%d\n",num_of_coworkers);
  153. }
  154. handler_Exit(-1, "sig_PAXOS_INIT_handler");
  155. return;
  156. }
  157. /************* If a core doesn't receive a signal for a period of time it assumes the
  158. ************** controller has died and sends PREPARE_REQUEST signal
  159. *************/
  160. void sig_PREPARE_REQUEST_handler(int sender_id){
  161. int received_proposal_number = sig_read_ar[2];
  162. inter_list tmp_inter_list;
  163. int score;
  164. handler_Enter(sender_id,"sig_PREPARE_REQUEST_handler");
  165. fprintf(log_file, "\t\tReceived SIG_PREPARE_REQUEST from %d with Proposal Number %d\n", sender_id, received_proposal_number);
  166. /* The acceptor has seen a higher proposal number */
  167. /* Prepare Request Rejected */
  168. if (acceptor_vars.highest_proposed_n > received_proposal_number){
  169. fprintf(log_file, "\t\tI have seen a higher proposal number request --> REJECTED!\n");
  170. return;
  171. }
  172. /* The acceptor has not seen another prepare request before so it promises never to accept a prepare request with proposal number
  173. lower than this one. Replies with PREPARE_ACCEPT_NO_PREVIOUS */
  174. /* UPDATE 13.10.2016 -- Also send my state in order to elect core with minimum utilization */
  175. else if (acceptor_vars.highest_proposed_n == -1){
  176. score = leader_preference();
  177. printf("My score is %d and my state %s\n",score, id2string(state));
  178. fprintf(log_file, "\t\tI haven't accepted any values yet!\n");
  179. acceptor_vars.highest_proposed_n = received_proposal_number;
  180. fprintf(log_file, "\t\tUpdated: acceptor_vars.highest_proposed_n = %d\n", acceptor_vars.highest_proposed_n);
  181. tmp_inter_list.next = NULL;
  182. tmp_inter_list.type = PREPARE_ACCEPT_NO_PREVIOUS;
  183. tmp_inter_list.data.accepted_values[0] = -1;
  184. tmp_inter_list.data.accepted_values[1] = -1;
  185. tmp_inter_list.data.accepted_values[2] = score;
  186. scc_kill(sender_id,SIG_PREPARE_ACCEPT_NO_PREVIOUS,&tmp_inter_list);
  187. }
  188. /* Proposal number received > Highest proposal number seen */
  189. /* Acceptor replies with highest proposal number seen and its value if any and updates the values*/
  190. else if (acceptor_vars.highest_proposed_n < received_proposal_number){
  191. fprintf(log_file, "\t\treceived_proposal_number higher than highest_proposed_n\n");
  192. acceptor_vars.highest_proposed_n = received_proposal_number;
  193. fprintf(log_file, "\t\tUpdated: acceptor_vars.highest_proposed_n = %d\n", acceptor_vars.highest_proposed_n);
  194. /*If the core has accepted any value it sends that back along with the proposal number of this value*/
  195. if (acceptor_vars.highest_acc_value == -1){
  196. fprintf(log_file, "\t\tI haven't accepted any values yet!\n");
  197. tmp_inter_list.next = NULL;
  198. tmp_inter_list.type = PREPARE_ACCEPT_NO_PREVIOUS;
  199. tmp_inter_list.data.accepted_values[0] = -1;
  200. tmp_inter_list.data.accepted_values[1] = -1;
  201. tmp_inter_list.data.accepted_values[2] = score;
  202. scc_kill(sender_id,SIG_PREPARE_ACCEPT_NO_PREVIOUS,&tmp_inter_list);
  203. }else{
  204. fprintf(log_file, "\t\tI have already accepted the value %d!\n", acceptor_vars.highest_acc_value);
  205. tmp_inter_list.next = NULL;
  206. tmp_inter_list.type = PREPARE_ACCEPT;
  207. tmp_inter_list.data.accepted_values[0] = acceptor_vars.highest_acc_n;
  208. tmp_inter_list.data.accepted_values[1] = acceptor_vars.highest_acc_value;
  209. tmp_inter_list.data.accepted_values[2] = score;
  210. scc_kill(sender_id, SIG_PREPARE_ACCEPT, &tmp_inter_list);
  211. }
  212. }
  213. handler_Exit(sender_id,"sig_PREPARE_REQUEST_handler");
  214. return;
  215. }
  216. void sig_PREPARE_ACCEPT_NO_PREVIOUS_handler(int sender_id){
  217. int k,i;
  218. int num_of_coworkers;
  219. int replied_score = sig_read_ar[4];
  220. int max_score,index;
  221. inter_list tmp_inter_list;
  222. coworkers_list *tmp_cowork_list;
  223. handler_Enter(sender_id,"sig_PREPARE_ACCEPT_NO_PREVIOUS_handler");
  224. proposer_vars.core_states[sender_id] = replied_score;
  225. proposer_vars.cores_promised++;
  226. fprintf(log_file, "\t\t Updated state of %d to %s\n", sender_id, id2string(proposer_vars.core_states[sender_id]));
  227. tmp_inter_list.next = NULL;
  228. /* Case where controller fails */
  229. if (idag_mask[faulty_core] == faulty_core){
  230. /*Received reply from majority */
  231. if (proposer_vars.cores_promised >= majority(CLUSTER_SIZE)){
  232. if (PREPARE_ACCEPT_SENT == 0){
  233. for (i=0; i < X_max*Y_max;i++){
  234. if (proposer_vars.core_states[i] > max_score){
  235. max_score = proposer_vars.core_states[i];
  236. index = i;
  237. }
  238. }
  239. PREPARE_ACCEPT_SENT = 1;
  240. fprintf(log_file,"\t\t-------- LAST KNOWN CORE STATES --------\n");
  241. FOR_NUES{
  242. fprintf(log_file,"\t\t%d --> %s\n",k,id2string(proposer_vars.core_states[k]));
  243. }
  244. fprintf(log_file,"\t\t----------------------------------------\n");
  245. fprintf(log_file,"\n");
  246. //TODO Decide proposing core based on core states
  247. fprintf(log_file, "\t\tCONTROLLER CASE ; RECEIVED ACCEPT FROM MAJORITY!\n");
  248. for (k = 0; k < X_max*Y_max; k++){
  249. /* I send SIG_ACCEPT_REQUEST to cores inside my cluster */
  250. if ((k != my_idag) && (idag_mask[k] == my_idag)){
  251. tmp_inter_list.type = ACCEPT_REQUEST;
  252. tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
  253. /*The acceptor hasn't received any reply with accepted value so it will propose itself for leader*/
  254. if (proposer_vars.highest_replied_value == -1){
  255. /*if (im_manager() != 1){
  256. fprintf(log_file,"\t\tI am not a manager. I propose myself as the new controller\n");
  257. proposer_vars.highest_replied_value = node_id;
  258. tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
  259. }else{
  260. fprintf(log_file,"\t\tI am a manager. I propose a worker of mine as the new controller.That is %d\n",my_cores->next->core_id);
  261. tmp_inter_list.data.accepted_values[VALUE_W] = my_cores->next->core_id;
  262. }*/
  263. /* RANDOM PAXOS */
  264. //proposer_vars.highest_replied_value = node_id;
  265. //tmp_inter_list.data.accepted_values[VALUE_W] = node_id;
  266. /****************/
  267. /* MODIFIED PAXOS */
  268. proposer_vars.highest_replied_value = index;
  269. tmp_inter_list.data.accepted_values[VALUE_W] = index;
  270. /****************/
  271. }else{
  272. tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
  273. }
  274. scc_kill(k,SIG_ACCEPT_REQUEST,&tmp_inter_list);
  275. }
  276. }
  277. }
  278. }
  279. /* Case where manager fails */
  280. }else{
  281. num_of_coworkers = 0;
  282. FOR_MY_COWORKERS_LIST num_of_coworkers++;
  283. fprintf(log_file,"num_of_coworkers=%d\n",num_of_coworkers);
  284. if (proposer_vars.cores_promised > majority(num_of_coworkers)){
  285. if (PREPARE_ACCEPT_SENT == 0){
  286. PREPARE_ACCEPT_SENT = 1;
  287. fprintf(log_file, "\t\tMANAGER CASE ; RECEIVED ACCEPT FROM MAJORITY!\n");
  288. FOR_MY_COWORKERS_LIST{
  289. tmp_inter_list.type = ACCEPT_REQUEST;
  290. tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
  291. if (proposer_vars.highest_replied_value == -1)
  292. proposer_vars.highest_replied_value = node_id;
  293. tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
  294. scc_kill(tmp_cowork_list->core_id,SIG_ACCEPT_REQUEST,&tmp_inter_list);
  295. }
  296. }
  297. }
  298. }
  299. handler_Exit(sender_id,"sig_PREPARE_ACCEPT_NO_PREVIOUS_handler");
  300. return;
  301. }
  302. /************* The other cores would either accept its proposal if they haven't seen
  303. ************** a higher proposal number
  304. *************/
  305. void sig_PREPARE_ACCEPT_handler(int sender_id){
  306. int replied_proposal_number = sig_read_ar[PROP_NR];
  307. int replied_value = sig_read_ar[VALUE_R];
  308. int k;
  309. int num_of_coworkers;
  310. inter_list tmp_inter_list;
  311. coworkers_list *tmp_cowork_list;
  312. handler_Enter(sender_id, "sig_PREPARE_ACCEPT_handler");
  313. proposer_vars.core_states[sender_id] = sig_read_ar[4];
  314. proposer_vars.cores_promised++;
  315. fprintf(log_file, "\t\t Updated state of %d to %s\n", sender_id, id2string(proposer_vars.core_states[sender_id]));
  316. tmp_inter_list.next = NULL;
  317. /*Save the values if proposal number is higher than the highest replied so far*/
  318. if (replied_proposal_number > proposer_vars.highest_replied_n){
  319. proposer_vars.highest_replied_n = replied_proposal_number;
  320. proposer_vars.highest_replied_value = replied_value;
  321. fprintf(log_file, "\t\t Updated proposer_vars.highest_replied_n = %d\n", proposer_vars.highest_replied_n);
  322. fprintf(log_file, "\t\t Updated proposer_vars.highest_replied_value = %d\n", proposer_vars.highest_replied_value);
  323. }
  324. /* Case where controller fails */
  325. if (idag_mask[faulty_core] == faulty_core){
  326. if (proposer_vars.cores_promised >= majority(CLUSTER_SIZE)){
  327. if (PREPARE_ACCEPT_SENT == 0){
  328. PREPARE_ACCEPT_SENT = 1;
  329. fprintf(log_file,"\t\t-------- LAST KNOWN CORE STATES --------\n");
  330. FOR_NUES{
  331. fprintf(log_file,"\t\t%d --> %s\n",k,id2string(proposer_vars.core_states[k]));
  332. }
  333. fprintf(log_file,"\n");
  334. fprintf(log_file,"\t\t----------------------------------------\n");
  335. fprintf(log_file, "\t\tRECEIVED ACCEPT FROM MAJORITY!\n");
  336. for (k = 0; k < X_max*Y_max; k++){
  337. if ((k != my_idag) && (idag_mask[k] == my_idag)){
  338. tmp_inter_list.type = ACCEPT_REQUEST;
  339. tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
  340. tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
  341. scc_kill(k,SIG_ACCEPT_REQUEST,&tmp_inter_list);
  342. }
  343. }
  344. }
  345. }
  346. /* Case where manager fails */
  347. }else{
  348. num_of_coworkers = 0;
  349. FOR_MY_COWORKERS_LIST num_of_coworkers++;
  350. fprintf(log_file,"num_of_coworkers=%d\n",num_of_coworkers);
  351. if (proposer_vars.cores_promised >= majority(num_of_coworkers)){
  352. if (PREPARE_ACCEPT_SENT == 0){
  353. PREPARE_ACCEPT_SENT = 1;
  354. fprintf(log_file, "\t\tRECEIVED ACCEPT FROM MAJORITY!\n");
  355. FOR_MY_COWORKERS_LIST{
  356. tmp_inter_list.type = ACCEPT_REQUEST;
  357. tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
  358. tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
  359. scc_kill(tmp_cowork_list->core_id,SIG_ACCEPT_REQUEST,&tmp_inter_list);
  360. }
  361. }
  362. }
  363. }
  364. handler_Exit(sender_id, "sig_PREPARE_ACCEPT_handler");
  365. return;
  366. }
  367. void sig_ACCEPT_REQUEST_handler(int sender_id){
  368. int proposer_proposal_number = sig_read_ar[PROP_NR];
  369. inter_list tmp_inter_list;
  370. handler_Enter(sender_id,"sig_ACCEPT_REQUEST_handler");
  371. tmp_inter_list.next = NULL;
  372. if (proposer_proposal_number < acceptor_vars.highest_proposed_n){
  373. fprintf(log_file, "\t\t ACCEPT_REQUEST proposal number lower than highest_proposed_n %d -> REJECTED\n", acceptor_vars.highest_proposed_n);
  374. }else{
  375. acceptor_vars.highest_acc_n = proposer_proposal_number;
  376. fprintf(log_file, "\t\t Updated acceptor_vars.highest_acc_n = %d\n", acceptor_vars.highest_acc_n);
  377. acceptor_vars.highest_acc_value = sig_read_ar[VALUE_R];
  378. fprintf(log_file, "\t\t Updated acceptor_vars.highest_acc_value = %d\n", acceptor_vars.highest_acc_value);
  379. acceptor_vars.highest_proposed_n = proposer_proposal_number;
  380. fprintf(log_file, "\t\t Updated acceptor_vars.highest_proposed_n = %d\n", acceptor_vars.highest_proposed_n);
  381. tmp_inter_list.type = ACCEPTED;
  382. tmp_inter_list.data.accepted_values[VALUE_W] = acceptor_vars.highest_acc_value;
  383. scc_kill(sender_id, SIG_ACCEPTED, &tmp_inter_list);
  384. }
  385. handler_Exit(sender_id,"sig_ACCEPT_REQUEST_handler");
  386. return;
  387. }
  388. void sig_ACCEPTED_handler(int sender_id){
  389. int k;
  390. int received_value = sig_read_ar[VALUE_R];
  391. int num_of_coworkers;
  392. inter_list tmp_inter_list;
  393. coworkers_list *tmp_cowork_list;
  394. handler_Enter(sender_id,"sig_ACCEPTED_handler");
  395. proposer_vars.cores_accepted++;
  396. tmp_inter_list.next = NULL;
  397. /****************************************************************/
  398. /***************** Case where controller failed *****************/
  399. /****************************************************************/
  400. if (idag_mask[faulty_core] == faulty_core && idag_mask[faulty_core] != -1){
  401. if ((proposer_vars.cores_accepted >= majority(CLUSTER_SIZE)) && (SIG_LEARN_SENT == 0)){
  402. fprintf(log_file, "\t\tRECEIVED ACCEPTED FROM MAJORITY!\n");
  403. SIG_LEARN_SENT = 1;
  404. tmp_inter_list.type = LEARN;
  405. tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
  406. tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
  407. scc_kill(node_id,SIG_LEARN,&tmp_inter_list);
  408. for (k = 0; k < X_max*Y_max; k++){
  409. if ((k != my_idag) && (k != node_id)){
  410. tmp_inter_list.type = LEARN;
  411. tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
  412. tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
  413. scc_kill(k,SIG_LEARN,&tmp_inter_list);
  414. }
  415. }
  416. }
  417. /****************************************************************/
  418. /******************* Case where manager failed ******************/
  419. /****************************************************************/
  420. }else{
  421. num_of_coworkers=0;
  422. FOR_MY_COWORKERS_LIST num_of_coworkers++;
  423. printf("num_of_coworkers=%d\n",num_of_coworkers);
  424. if ((proposer_vars.cores_accepted >= majority(num_of_coworkers)) && (SIG_LEARN_SENT == 0)){
  425. fprintf(log_file, "\t\t MANAGER CASE ; RECEIVED ACCEPTED FROM MAJORITY!\n");
  426. SIG_LEARN_SENT = 1;
  427. tmp_inter_list.type = LEARN;
  428. tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
  429. tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
  430. scc_kill(node_id,SIG_LEARN,&tmp_inter_list);
  431. for (k = 0; k < X_max*Y_max; k++){
  432. if (k != node_id && k != faulty_core){
  433. tmp_inter_list.type = LEARN;
  434. tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
  435. tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
  436. scc_kill(k,SIG_LEARN,&tmp_inter_list);
  437. }
  438. }
  439. }
  440. }
  441. handler_Exit(sender_id,"sig_ACCEPTED_handler");
  442. return;
  443. }
  444. void sig_LEARN_handler(int sender_id){
  445. int received_value = sig_read_ar[VALUE_R];
  446. int failed_core = sig_read_ar[PREV_CR];
  447. int k;
  448. int i;
  449. int cluster_idag;
  450. int counter;
  451. int selfopt_r;
  452. int failed_interaction = 0; /* 0 nothing, 1 init_search, 2 manager_search */
  453. int one_core;
  454. core_list *tmp_core_list;
  455. core_list *tmp_core_list_prev;
  456. DDS_list *tmp_dds;
  457. DDS_list *tmp_prev_dds;
  458. inter_list tmp_inter_list;
  459. coworkers_list *tmp_cowork_list;
  460. inter_list *tmp_inter;
  461. core_states new_state = NO_PENDING_STATE;
  462. handler_Enter(sender_id,"sig_LEARN_handler");
  463. fprintf(log_file,"\t\t Received_value = %d and failed_core=%d\n",received_value,failed_core);
  464. suspected[received_value] = -1;
  465. //faulty_core = failed_core;
  466. /****************************************************************/
  467. /***************** Case where controller failed *****************/
  468. /****************************************************************/
  469. i = 0;
  470. /* FIXED IDs */
  471. for (i=0; i < X_max*Y_max;i++)
  472. if (idag_mask[i] == failed_core && i != failed_core){
  473. if (i == node_id)
  474. printf("I am the new controller and my current state is: %s\n\n",id2string(state));
  475. break;
  476. }
  477. if (idag_mask[node_id] == idag_mask[failed_core])
  478. printf("%d : %s\n",node_id,id2string(state));
  479. exit(0);
  480. /* I am checking the interactions i had with the new controller */
  481. if (failed_core != node_id && idag_mask[failed_core] != -1){
  482. if (core_inter_head[failed_core] == NULL){
  483. fprintf(log_file,"\t\tI had no interactions with failed core %d\n", failed_core);
  484. }else{
  485. fprintf(log_file, "\t\tMy interactions with failed core %d were:\n",failed_core);
  486. for (tmp_inter = core_inter_head[failed_core]; tmp_inter != NULL; tmp_inter = tmp_inter->next){
  487. fprintf(log_file,"\t\t\t%d. %s\n",i,inter2string(tmp_inter->type));
  488. if (tmp_inter->type == IDAG_FIND_IDAGS ||
  489. tmp_inter->type == IDAG_FIND_IDAGS_PENDING){
  490. failed_interaction = 1;
  491. }
  492. if (tmp_inter->type == SELFOPT_IDAG_FIND_IDAGS ||
  493. tmp_inter->type == SELFOPT_IDAG_FIND_IDAGS_PENDING){
  494. failed_interaction = 2;
  495. }
  496. i++;
  497. }
  498. }
  499. }
  500. if (core_inter_head[failed_core] != NULL &&
  501. (core_inter_head[failed_core]->type == IDAG_REQ_DDS ||
  502. core_inter_head[failed_core]->type == IDAG_REQ_DDS_PENDING ||
  503. core_inter_head[failed_core]->type == SELFOPT_IDAG_REQ_DDS ||
  504. core_inter_head[failed_core]->type == SELFOPT_IDAG_REQ_DDS_PENDING)){
  505. trigger_shit(failed_core);
  506. }
  507. /* Controller Failure and First Time i receive SIG_LEARN */
  508. if (idag_mask[failed_core] == failed_core && idag_mask[failed_core] != -1){
  509. /* Update idag_mask and idag_id_arr in any subcase */
  510. fprintf(log_file,"\t\tUpdating idag_mask and idag_id_arr with new controller %d... ",received_value);
  511. for (k = 0; k < X_max*Y_max; k++){
  512. if (idag_mask[k] == failed_core)
  513. idag_mask[k] = received_value;
  514. }
  515. for (k = 0; k < num_idags; k++){
  516. if (idag_id_arr[k] == failed_core)
  517. idag_id_arr[k] = received_value;
  518. }
  519. idag_mask[failed_core] = -1;
  520. fprintf(log_file,"DONE\n\n");
  521. fprintf(log_file,"\t\tMy interactions with the new controller are:\n");
  522. i = 0;
  523. tmp_inter = core_inter_head[received_value];
  524. while (tmp_inter != NULL && node_id != received_value)
  525. {
  526. fprintf(log_file, "\t\t\t%d. %s...\n",i,inter2string(tmp_inter->type));
  527. tmp_inter = tmp_inter->next;
  528. i++;
  529. }
  530. /***** I am a newly elected controller *****/
  531. coworkers_list *tmp_cowork_list;
  532. int num_of_coworkers;
  533. if (node_id == received_value){
  534. if (tmp_cowork_list != NULL)
  535. FOR_MY_COWORKERS_LIST num_of_coworkers++;
  536. printf("I am the new controller : %d -- Current state : %s - %d!\n", received_value,id2string(state),num_of_coworkers);
  537. fprintf(log_file, "\t\tI am the new controller : %d -- Current state : %s!\n", received_value,id2string(state));
  538. my_idag = -1;
  539. /***** Create my cores list *****/
  540. if (my_cores != NULL){
  541. printf("my_cores list is not NULL...\n");
  542. fprintf(log_file,"\t\tmy_cores list is not NULL...\n");
  543. for (tmp_core_list = my_cores->next; tmp_core_list != NULL; tmp_core_list=tmp_core_list->next){
  544. printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
  545. fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
  546. free(my_cores);
  547. my_cores = tmp_core_list;
  548. }
  549. printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
  550. fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
  551. free(my_cores);
  552. my_cores = NULL;
  553. }
  554. if (my_cores == NULL){
  555. printf("\t\tCreating my_cores list... ");
  556. fprintf(log_file,"\t\tCreating my_cores list... ");
  557. my_cores = (core_list *) malloc(sizeof(core_list));
  558. my_cores_count = 0;
  559. if (my_cores != NULL){
  560. printf("%sSuccess!%s\n",KGRN,KNRM);
  561. my_cores_tail = my_cores;
  562. my_cores_count++;
  563. my_cores_tail->core_id = node_id;
  564. my_cores_tail->offered_to = -1;
  565. my_cores_tail->next = NULL;
  566. my_cores_tail->workload[0] = -1;
  567. my_cores_tail->workload[1] = -1;
  568. printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
  569. fprintf(log_file,"\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
  570. for (i = 0; i < X_max*Y_max; i++){
  571. if (idag_mask[i] == node_id && i != node_id){
  572. my_cores_tail->next = (core_list *) malloc(sizeof(core_list));
  573. if (my_cores_tail->next != NULL){
  574. my_cores_tail = my_cores_tail->next;
  575. my_cores_tail->next = NULL;
  576. my_cores_tail->core_id = i;
  577. my_cores_tail->offered_to = -1;
  578. printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
  579. fprintf(log_file,"\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
  580. my_cores_count++;
  581. }else printf("--%d-- error allocating memory for my_cores\n",node_id);
  582. }
  583. }
  584. }else{
  585. printf("--%d-- error allocating memory for my_cores\n",node_id);
  586. fprintf(log_file, "--%d-- error allocating memory for my_cores\n",node_id);
  587. }
  588. }
  589. /***** Create my DDS List *****/
  590. if (DDS == DDS_tail && DDS != NULL){
  591. printf("\t\tDDS = DDS_tail with value: \n\t\t\tAgent_id : %d | Cores in cluster : %d\n",DDS->agent_id,DDS->num_of_cores);
  592. printf("\t\tReconfiguring DDS...\n");
  593. fprintf(log_file,"\t\tDDS = DDS_tail with value: \n\t\t\tAgent_id : %d | Cores in cluster : %d\n",DDS->agent_id,DDS->num_of_cores);
  594. fprintf(log_file,"\t\tReconfiguring DDS...\n");
  595. free(DDS);
  596. DDS = NULL;
  597. DDS_count = 0;
  598. }
  599. if (DDS == NULL){
  600. printf("\t\tCreating DDS list... ");
  601. fflush(stdout);
  602. DDS = (DDS_list *) malloc(sizeof(DDS_list));
  603. DDS_count = 0;
  604. if (DDS != NULL){
  605. printf("%sSuccess!%s\n",KGRN,KNRM);
  606. fflush(stdout);
  607. DDS->agent_id = node_id;
  608. DDS->next = NULL;
  609. DDS_tail = DDS;
  610. DDS_count++;
  611. DDS->num_of_cores = my_cores_count;
  612. printf("\t\t\tAdded Agent_id : %d | Cores in cluster : %d\n\n",DDS->agent_id, DDS->num_of_cores);
  613. fprintf(log_file,"\t\t\tAdded Agent_id : %d | Cores in cluster : %d\n\n",DDS->agent_id, DDS->num_of_cores);
  614. }else{
  615. printf("--%d-- error allocating memory for my_cores\n",node_id);
  616. fprintf(log_file,"--%d-- error allocating memory for my_cores\n",node_id);
  617. }
  618. }else{
  619. DDS_list *tmp_dds;
  620. printf("\t\tDDS list of %d:\n",node_id);
  621. fprintf(log_file,"\t\tDDS list of %d:\n",node_id);
  622. tmp_dds = DDS;
  623. while (tmp_dds != NULL){
  624. printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id, tmp_dds->num_of_cores);
  625. fprintf(log_file,"\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id, tmp_dds->num_of_cores);
  626. tmp_dds = tmp_dds->next;
  627. }
  628. printf("\n\n");
  629. }
  630. paxos_state = NEW_IDAG;
  631. fprintf(log_file,"Changed Paxos State to %s\n", id2string(paxos_state));
  632. }
  633. /***** I am in the same cluster as the new controller *****/
  634. else if ((my_idag != -1) && (my_idag == failed_core)){
  635. fprintf(log_file, "\t\t I am in the same cluster as %d. My new controller is %d\n", sender_id, received_value);
  636. my_idag = received_value;
  637. if (failed_interaction == 1) {
  638. fprintf(log_file,"I have to resend signal SIG_IDAG_FIND_IDAGS to new controller\n");
  639. if (core_inter_head[my_idag] == NULL){
  640. core_inter_head[my_idag] = (inter_list *) malloc(sizeof(inter_list));
  641. core_inter_tail[my_idag] = core_inter_head[my_idag];
  642. } else {
  643. core_inter_tail[my_idag]->next = (inter_list *) malloc(sizeof(inter_list));
  644. core_inter_tail[my_idag] = core_inter_tail[my_idag]->next;
  645. }
  646. selfopt_r = (int) (1.5 * (X_max / num_idags_x));
  647. core_inter_tail[my_idag]->type = IDAG_FIND_IDAGS_PENDING;
  648. core_inter_tail[my_idag]->data.reg.C = node_id-1;
  649. core_inter_tail[my_idag]->data.reg.r = selfopt_r;
  650. core_inter_tail[my_idag]->next = NULL;
  651. if (core_inter_head[my_idag]->next == NULL) {
  652. paxos_node_stats.msg_count++;
  653. scc_kill(my_idag, SIG_IDAG_FIND_IDAGS, core_inter_head[my_idag]);
  654. } else {
  655. fprintf(log_file, "paxos_signal_handlers.c: Did not send idag_find_idags with interaction %s no2 %s\n",inter2string(core_inter_head[my_idag]->type),inter2string(core_inter_head[my_idag]->next->type));
  656. }
  657. if (selfopt_time_rem != -1) {
  658. selfopt_time_rem = my_gettimer();
  659. if (selfopt_time_rem > 0)
  660. my_settimer(0);
  661. }
  662. if (pending_state == WORKING_NODE) {
  663. fprintf(log_file, "I change to working idle init\n");
  664. state = WORKING_NODE;
  665. pending_state = IDLE_INIT_MAN;
  666. }else{
  667. state = IDLE_INIT_MAN;
  668. }
  669. }
  670. if (failed_interaction == 2){
  671. if (core_inter_head[my_idag] == NULL){
  672. core_inter_head[my_idag] = (inter_list *) malloc(sizeof(inter_list));
  673. core_inter_tail[my_idag] = core_inter_head[my_idag];
  674. } else {
  675. core_inter_tail[my_idag]->next = (inter_list *) malloc(sizeof(inter_list));
  676. core_inter_tail[my_idag] = core_inter_tail[my_idag]->next;
  677. }
  678. selfopt_r = (int) (1.5 * (X_max / num_idags_x));
  679. core_inter_tail[my_idag]->type = SELFOPT_IDAG_FIND_IDAGS_PENDING;
  680. core_inter_tail[my_idag]->data.reg.C = node_id;
  681. core_inter_tail[my_idag]->data.reg.r = selfopt_r;
  682. core_inter_tail[my_idag]->next = NULL;
  683. if (core_inter_head[my_idag]->next == NULL){
  684. paxos_node_stats.msg_count++;
  685. scc_kill(my_idag, SIG_IDAG_FIND_IDAGS, core_inter_head[my_idag]);
  686. } else {
  687. fprintf(log_file,"paxos_signal_handlers.c: Did not send sig_find_idags with inter1 = %s, inter2 = %s\n",inter2string(core_inter_head[my_idag]->type),inter2string(core_inter_head[my_idag]->next->type));
  688. }
  689. if (selfopt_interval != MAX_SELF_OPT_INTERVAL_MS){
  690. selfopt_interval = 2 * selfopt_interval;
  691. }else{
  692. selfopt_interval = -1;
  693. }
  694. new_state = IDLE_AGENT_WAITING_OFF;
  695. }
  696. }
  697. /***** I am a controller so i have to update idag_id_arr and reply with LEARN_ACK_CONTR *****/
  698. /*else if (im_controller() == 1){
  699. for (k = 0; k < num_idags; k++)
  700. if (idag_id_arr[k] == failed_core)
  701. idag_id_arr[k] = received_value;
  702. else if (idag_id_arr[k] == node_id)
  703. tmp_inter_list.data.controller_index = k;
  704. fprintf(log_file, "\t\tI am a Controller. Changed controller %d with %d\n",failed_core,sender_id);
  705. tmp_inter_list.next = NULL;
  706. tmp_inter_list.type = LEARN_ACK_CONTR;
  707. scc_kill(received_value,SIG_LEARN_ACK_CONTR,&tmp_inter_list);
  708. fprintf(log_file, "\t\tUpdated idag_id_arr\n");
  709. }*/
  710. /* I am a manager so i have to reply with ADD_AGENT_TO_DDS. In addition if the new controller was a worker of mine i remove him from my_cores list */
  711. if (im_manager() == 1){
  712. if (new_state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT_WAITING_OFF;
  713. counter = 0;
  714. FOR_MY_CORES_LIST{
  715. cluster_idag = idag_mask[tmp_core_list->core_id];
  716. if (cluster_idag == received_value){
  717. fprintf(log_file,"\t\tI am manager %d and my core %d utilizes in cluster with idag %d\n", node_id, tmp_core_list->core_id,cluster_idag);
  718. /* if the new controller was a worker of mine i do not send him */
  719. if (tmp_core_list->core_id != received_value)
  720. tmp_inter_list.data.workers_info[++counter] = tmp_core_list->core_id;
  721. }
  722. }
  723. if (counter > 0){
  724. tmp_inter_list.next = NULL;
  725. tmp_inter_list.type = ADD_TO_DDS;
  726. tmp_inter_list.data.workers_info[0] = counter;
  727. fprintf(log_file,"\t\tNUMBER OF WORKERS: %d\n", counter);
  728. scc_kill(received_value,SIG_ADD_TO_DDS,&tmp_inter_list);
  729. }
  730. /***** If the new controller was a worker of mine i remove him from my_cores list and reappoing workload *****/
  731. tmp_core_list = my_cores->next;
  732. tmp_core_list_prev = my_cores;
  733. while (tmp_core_list != NULL){
  734. if (tmp_core_list->core_id == received_value){
  735. fprintf(log_file,"\t\tNew controller was a worker of mine! I remove him from my cores_list!\n");
  736. my_cores_count--;
  737. tmp_core_list_prev->next = tmp_core_list->next;
  738. pending_workload[0] = tmp_core_list->workload[0];
  739. pending_workload[1] = tmp_core_list->workload[1];
  740. fprintf(log_file,"Pending workload of new controller was: %d %d\n", pending_workload[0], pending_workload[1]);
  741. free(tmp_core_list);
  742. if (pending_workload[0] != -1 || pending_workload[1] != -1){
  743. //active_working_cores--;
  744. tmp_core_list = my_cores->next;
  745. fprintf(log_file,"\t\tI am reassigning the workload!\n");
  746. fprintf(log_file,"\t\t-------- CURRENT WORKLOADS --------\n");
  747. while (tmp_core_list != NULL){
  748. fprintf(log_file,"\t\t%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  749. printf("\t\t%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  750. tmp_core_list = tmp_core_list->next;
  751. }
  752. fprintf(log_file,"\t\t-----------------------------------\n");
  753. tmp_core_list = my_cores->next;
  754. while (tmp_core_list != NULL){
  755. if ((tmp_core_list->workload[0] == -1) && (tmp_core_list->workload[1] == -1)){
  756. one_core = tmp_core_list->core_id;
  757. fprintf(log_file,"\t\tpaxos_signal_handlers.c : I have pending workload %d | %d\n",pending_workload[0],pending_workload[1]);
  758. fprintf(log_file,"\t\tpaxos_signal_handlers.c : I am assigning workload to %d\n",one_core);
  759. if (core_inter_head[one_core] == NULL){
  760. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  761. core_inter_tail[one_core] = core_inter_head[one_core];
  762. } else {
  763. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  764. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  765. }
  766. core_inter_tail[one_core]->type = APPOINT_WORK_NODE;
  767. core_inter_tail[one_core]->data.work_bounds[0] = pending_workload[0];
  768. core_inter_tail[one_core]->data.work_bounds[1] = pending_workload[1];
  769. fprintf(app_log_file,"%d (%d, %d), ",one_core,core_inter_tail[one_core]->data.work_bounds[0],core_inter_tail[one_core]->data.work_bounds[1]);
  770. core_inter_tail[one_core]->next = NULL;
  771. if (core_inter_head[one_core]->next == NULL) {
  772. paxos_node_stats.msg_count++;
  773. scc_kill(one_core, SIG_APPOINT_WORK, core_inter_head[one_core]);
  774. } else {
  775. fprintf(log_file,"I am doing smth else with my working node %d in init inter1 = %d inter2 = %d\n",one_core,core_inter_head[one_core]->type,core_inter_head[one_core]->next->type);
  776. printf("ASDASDASDASDASDAS\n");
  777. }
  778. pending_workload[0] = -1;
  779. pending_workload[1] = -1;
  780. break;
  781. }
  782. tmp_core_list = tmp_core_list->next;
  783. }
  784. break;
  785. }
  786. }else{
  787. tmp_core_list = tmp_core_list->next;
  788. tmp_core_list_prev = tmp_core_list_prev->next;
  789. }
  790. }
  791. }
  792. /****************************************************************/
  793. /******************* Case where manager failed ******************/
  794. /****************************************************************/
  795. }
  796. else{
  797. /***** I am the newly elected manager *****/
  798. if (node_id == received_value && idag_mask[failed_core] != -1){
  799. idag_mask[failed_core] = -1;
  800. printf("I am the new manager : %d -- Current state : %s!\n", received_value,id2string(state));
  801. if (my_cores != NULL){
  802. printf("my_cores list is not NULL...\n");
  803. for (tmp_core_list = my_cores->next; tmp_core_list != NULL; tmp_core_list=tmp_core_list->next){
  804. printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
  805. free(my_cores);
  806. my_cores = tmp_core_list;
  807. }
  808. printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
  809. free(my_cores);
  810. my_cores = NULL;
  811. }
  812. if (my_cores == NULL){
  813. printf("\t\tCreating my_cores list... ");
  814. fflush(stdout);
  815. my_cores = (core_list *) malloc(sizeof(core_list));
  816. my_cores_count = 0;
  817. if (my_cores != NULL){
  818. printf("%sSuccess!%s\n",KGRN,KNRM);
  819. my_cores_tail = my_cores;
  820. my_cores_tail->core_id = node_id;
  821. my_cores_tail->offered_to = -1;
  822. my_cores_tail->workload[0] = -1;
  823. my_cores_tail->workload[1] = -1;
  824. my_cores_tail->next = NULL;
  825. my_cores_count++;
  826. printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
  827. FOR_MY_COWORKERS_LIST{
  828. my_cores_tail->next = (core_list *) malloc(sizeof(core_list));
  829. if (my_cores_tail->next != NULL){
  830. my_cores_tail = my_cores_tail->next;
  831. my_cores_tail->next = NULL;
  832. my_cores_tail->core_id = tmp_cowork_list->core_id;
  833. my_cores_tail->offered_to = -1;
  834. my_cores_tail->workload[0] = -1;
  835. my_cores_tail->workload[1] = -1;
  836. printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
  837. my_cores_count++;
  838. }else printf("--%d-- error allocating memory for my_cores\n",node_id);
  839. }
  840. }else printf("--%d-- error allocating memory for my_cores\n",node_id);
  841. }
  842. /**** I have to send SIG_ADD_TO_DDS to the controllers of my workers ****/
  843. for (i = 0; i < X_max*Y_max; i++){
  844. if (idag_mask[i] == i){
  845. counter = 0;
  846. cluster_idag = idag_mask[i];
  847. FOR_MY_CORES_LIST{
  848. if (cluster_idag == idag_mask[tmp_core_list->core_id]){
  849. fprintf(log_file,"\t\tI am manager %d and my core %d utilizes in cluster with idag %d\n", node_id, tmp_core_list->core_id,cluster_idag);
  850. tmp_inter_list.data.workers_info[++counter] = tmp_core_list->core_id;
  851. }
  852. }
  853. if (counter > 0){
  854. tmp_inter_list.next = NULL;
  855. tmp_inter_list.type = ADD_TO_DDS;
  856. tmp_inter_list.data.workers_info[0] = counter;
  857. fprintf(log_file,"\t\tNUMBER OF WORKERS: %d\n", counter);
  858. scc_kill(i,SIG_ADD_TO_DDS,&tmp_inter_list);
  859. }
  860. }
  861. }
  862. state = AGENT_INIT_STATE;
  863. paxos_state = NEW_AGENT;
  864. printf("I was working for app: %d\n", worker_app_id);
  865. my_app.id = worker_app_id;
  866. my_app.num_of_cores = my_cores_count-1;
  867. find_app_info();
  868. #ifndef ARTIFICIAL_APPS_SIM
  869. printf("Found array size = %d\n", my_app.array_size);
  870. fprintf(log_file,"Found array size = %d\n", my_app.array_size);
  871. #endif
  872. printf("Found remaining workload = %d\n", my_app.workld);
  873. fprintf(log_file,"Found remaining workload = %d\n", my_app.workld);
  874. printf("App number of cores = %d\n", my_app.num_of_cores);
  875. fprintf(log_file,"App number of cores = %d\n", my_app.num_of_cores);
  876. FOR_MY_CORES_LIST{
  877. fprintf(log_file,"\t\t\tWorker_id : %d | Workload : %d %d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  878. printf("\t\t\tWorker_id : %d | Workload : %d %d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  879. }
  880. /***** I am controller i have to remove the failed_core from my DDS and cores list *****/
  881. }else if (my_idag == -1){
  882. if (idag_mask[failed_core] == node_id){
  883. printf("--%d-- I received SIG_LEARN from %d\n",node_id,sender_id);
  884. tmp_core_list = my_cores->next;
  885. tmp_core_list_prev = my_cores;
  886. while (tmp_core_list != NULL){
  887. if (tmp_core_list->core_id == failed_core){
  888. my_cores_count--;
  889. tmp_core_list_prev->next = tmp_core_list->next;
  890. free(tmp_core_list);
  891. break;
  892. }
  893. tmp_core_list = tmp_core_list->next;
  894. tmp_core_list_prev = tmp_core_list_prev->next;
  895. }
  896. printf("\t\tUpdated my_cores list:\n");
  897. fprintf(log_file,"\t\tUpdated my_cores list:\n");
  898. counter = 0;
  899. FOR_MY_CORES_LIST{
  900. if (tmp_core_list->offered_to == failed_core){
  901. tmp_core_list->offered_to = -1;
  902. counter++;
  903. }
  904. printf("\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
  905. fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
  906. }
  907. printf("\t\t\tmy_cores_count = %d\n",my_cores_count);
  908. tmp_dds = DDS->next;
  909. tmp_prev_dds = DDS;
  910. while (tmp_dds != NULL){
  911. if (tmp_dds->agent_id == failed_core){
  912. fprintf(log_file,"\t\t Removed failed core %d from DDS\n",tmp_dds->agent_id);
  913. tmp_prev_dds->next = tmp_dds->next;
  914. if (tmp_dds->next == NULL){
  915. DDS_tail = tmp_prev_dds;
  916. }
  917. DDS_count--;
  918. free(tmp_dds);
  919. DDS->num_of_cores = DDS->num_of_cores + counter;
  920. break;
  921. }else{
  922. tmp_prev_dds = tmp_dds;
  923. tmp_dds = tmp_dds->next;
  924. }
  925. }
  926. printf("\t\tUpdated DDS list:\n");
  927. fprintf(log_file,"\t\tUpdated DDS list:\n");
  928. FOR_MY_DDS_LIST{
  929. printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
  930. fprintf(log_file,"\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
  931. }
  932. printf("\t\t\tDDS_count = %d\n",DDS_count);
  933. }
  934. /***** I was working for the failed manager *****/
  935. }else if (cur_agent.my_agent == failed_core){
  936. cur_agent.my_agent = -1;
  937. }
  938. }
  939. cur_time = time(NULL);
  940. cur_t = localtime(&cur_time);
  941. fprintf(log_file, "[%d:%d:%d]: I ended sig_LEARN_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  942. //exit(0);
  943. return;
  944. }
  945. void sig_REINIT_APP_handler(int sender_id){
  946. inter_list *tmp_inter_list;
  947. cur_time = time(NULL);
  948. cur_t = localtime(&cur_time);
  949. fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_REINIT_APP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  950. printf("Received SIG_REINIT_APP from %d\n", sender_id);
  951. //int i, data_array_local[LINE_SIZE];
  952. for (tmp_inter_list = core_inter_head[0]; tmp_inter_list != NULL; tmp_inter_list = tmp_inter_list->next)
  953. if (tmp_inter_list->type == INIT_APP) {
  954. printf("i not null\n");
  955. break;
  956. }
  957. if (core_inter_head[12] == NULL){
  958. core_inter_head[12] = (inter_list *)malloc(sizeof(inter_list));
  959. core_inter_tail[12] = core_inter_head[12];
  960. }else{
  961. core_inter_tail[12]->next = (inter_list *)malloc(sizeof(inter_list));
  962. core_inter_tail[12] = core_inter_tail[12]->next;
  963. }
  964. core_inter_tail[12]->next = NULL;
  965. core_inter_tail[12]->type = INIT_APP;
  966. core_inter_tail[12]->data.new_app.id = sig_read_ar[2];
  967. core_inter_tail[12]->data.new_app.num_of_cores = sig_read_ar[3];
  968. core_inter_tail[12]->data.new_app.workld = sig_read_ar[4];
  969. #ifndef ARTIFICIAL_APPS_SIM
  970. core_inter_tail[12]->data.new_app.array_size = sig_read_ar[5];
  971. #endif
  972. if (core_inter_head[12]->next == NULL){
  973. scc_kill(12, SIG_INIT_APP, core_inter_head[12]);
  974. }
  975. cur_time = time(NULL);
  976. cur_t = localtime(&cur_time);
  977. fprintf(log_file, "[%d:%d:%d]: I ended sig_REINIT_APP_handler with sender = %d state=%s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  978. return;
  979. }
  980. void find_app_info(){
  981. //int sz;
  982. char app_log_file_name[64];
  983. char buffer[64];
  984. int temp;
  985. strcpy(app_log_file_name, "../");
  986. strcat(app_log_file_name,local_scen_directory);
  987. strcat(app_log_file_name, "/");
  988. strcat(app_log_file_name,local_scen_num);
  989. strcat(app_log_file_name,"/app_logs/");
  990. strcat(app_log_file_name, itoa(worker_app_id));
  991. strcat(app_log_file_name, ".txt");
  992. printf("Trying to open file %s... ", app_log_file_name);
  993. if ((app_log_file = fopen(app_log_file_name, "r")) == NULL){
  994. printf("%sError%s\n",KRED,KNRM);
  995. printf("paxos_signal_handlers.c : Cannot open input file with file path = %s ",app_log_file_name);
  996. perror("open app_log_file");
  997. }else{
  998. printf("%sSuccess%s\n",KGRN,KNRM);
  999. }
  1000. while (fscanf(app_log_file,"%s", buffer) != EOF){
  1001. #ifndef ARTIFICIAL_APPS_SIM
  1002. if (strcmp(buffer,"array_size") == 0){
  1003. fscanf(app_log_file,"%s",buffer);
  1004. fscanf(app_log_file,"%d", &temp);
  1005. my_app.array_size = temp;
  1006. }
  1007. #else
  1008. /* FIXME must locate var and A */
  1009. #endif
  1010. if (strcmp(buffer,"workload") == 0){
  1011. fscanf(app_log_file,"%s",buffer);
  1012. fscanf(app_log_file,"%d",&temp);
  1013. my_app.workld = temp;
  1014. }
  1015. }
  1016. fclose(app_log_file);
  1017. }
  1018. void rollback(){
  1019. offer_list *tmp_offer_list;
  1020. inter_list tmp_inter_list;
  1021. core_list *tmp_core_list;
  1022. int core_idag;
  1023. cur_time = time(NULL);
  1024. cur_t = localtime(&cur_time);
  1025. tmp_inter_list.next = NULL;
  1026. fprintf(log_file, "Rolling back... %s\n", id2string(state));
  1027. //If i am the new controller and i have an app to initialize i send SIG_REINIT_APP to 0.
  1028. if ((paxos_state == NEW_IDAG) && (state == INIT_MANAGER || state == INIT_MANAGER_SEND_OFFERS || state == IDLE_INIT_MAN || state == INIT_MAN_CHK_OFFERS || pending_state == INIT_MANAGER
  1029. || pending_state == INIT_MANAGER_SEND_OFFERS || pending_state == INIT_MAN_CHK_OFFERS || pending_state == AGENT_INIT_CHK_OFFERS || pending_state == IDLE_INIT_MAN
  1030. || pending_state == IDLE_INIT_IDLE_AGENT || pending_state == IDLE_INIT_AGENT_SELFOPT || pending_state == INIT_CHK_OFFERS_IDLE_AGENT || pending_state == INIT_CHK_OFFERS_SELFOPT)){
  1031. printf("i am the new controller and i have an app to initialize i send SIG_REINIT_APP to 0\n");
  1032. if (init_man_offers != NULL){
  1033. tmp_offer_list = init_man_offers;
  1034. while (tmp_offer_list != NULL){
  1035. *tmp_offer_list->answer = 0;
  1036. tmp_offer_list = tmp_offer_list->next;
  1037. }
  1038. }
  1039. while (init_man_offers != NULL){
  1040. if (core_inter_head[init_man_offers->sender] != NULL) {
  1041. if (core_inter_head[init_man_offers->sender]->type == REP_AGENT_OFFER_PENDING){
  1042. core_inter_head[init_man_offers->sender]->type = REP_AGENT_OFFER_SENT;
  1043. //kill(pid_num[init_man_offers->sender], SIG_REP_OFFERS);
  1044. paxos_node_stats.msg_count++;
  1045. scc_kill(init_man_offers->sender, SIG_REP_OFFERS, core_inter_head[init_man_offers->sender]);
  1046. //my_stats.msg_count++;
  1047. //my_stats.distance += distance(node_id,init_man_offers->sender);
  1048. } else {
  1049. printf("gamietai b = %d",init_man_offers->sender);
  1050. fprintf(log_file,"gamietai b = %d",init_man_offers->sender);
  1051. }
  1052. tmp_offer_list = init_man_offers;
  1053. init_man_offers = init_man_offers->next;
  1054. free(tmp_offer_list);
  1055. }
  1056. }
  1057. fprintf(log_file, "Replied to all my offers negatively\n");
  1058. tmp_inter_list.type = REINIT_APP;
  1059. tmp_inter_list.data.reappointed_app = init_app;
  1060. scc_kill(0,SIG_REINIT_APP,&tmp_inter_list);
  1061. }
  1062. /* If i were a manager i have to inform that i am no longer manager and also add other managers to my DDS */
  1063. else if (im_manager() == 1){
  1064. printf("New controller was a manager before paxos! Remove him from dds lists and create his dds list...\n");
  1065. //TODO remove from dds and add managers to dds
  1066. tmp_core_list = my_cores;
  1067. while (tmp_core_list != NULL){
  1068. //idag_mask[tmp_core_list->core_id] -> idag id
  1069. core_idag = idag_mask[tmp_core_list->core_id];
  1070. tmp_inter_list.next = NULL;
  1071. tmp_inter_list.type = REMOVE_FROM_DDS;
  1072. scc_kill(core_idag, SIG_REMOVE_FROM_DDS, &tmp_inter_list);
  1073. tmp_core_list = tmp_core_list->next;
  1074. }
  1075. }else
  1076. //printf("New controller was an idle core before paxos! Just create his dds list...\n");
  1077. //TODO add managers to dds
  1078. //my_cores = NULL;
  1079. return;
  1080. }
  1081. void sig_ADD_TO_DDS_handler(int sender_id, int *inc_cnt, int cur_index_top){
  1082. DDS_list *tmp_dds = NULL;
  1083. core_list *tmp_core_list;
  1084. int num_of_workers = 0, flag = 0, current = 0;
  1085. handler_Enter(sender_id,"sig_ADD_TO_DDS_handler");
  1086. num_of_workers = sig_read_ar[2];
  1087. fprintf(log_file,"--%d-- [%d:%d:%d]:I received SIG_ADD_TO_DDS from %d with num_of_workers = %d\n",node_id,cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,num_of_workers);
  1088. if (num_of_workers > 5){
  1089. *inc_cnt = *inc_cnt + 1;
  1090. #ifdef PLAT_SCC
  1091. error = RCCE_get((t_vcharp)(&sig_read_ar[LINE_SIZE]), (t_vcharp)(&sig_array[(cur_index_top+1)*LINE_SIZE]), LINE_SIZE * sizeof(int), node_id);
  1092. if (error != RCCE_SUCCESS) {
  1093. RCCE_error_string(error, error_str, &str_len);
  1094. fprintf(log_file,"I got an error in get data in sig_ADD_CORES_DDS_handler from %d with descr %s\n",sender_id,error_str);
  1095. fflush(log_file);
  1096. }
  1097. #else
  1098. new_RCCE_get(sig_read_ar, sig_array, cur_index_top, LINE_SIZE, node_id);
  1099. #endif
  1100. }
  1101. if (DDS == NULL){
  1102. DDS_count=0;
  1103. DDS = (DDS_list *) malloc(sizeof(DDS_list));
  1104. DDS->agent_id = node_id;
  1105. DDS->next = NULL;
  1106. DDS_tail = DDS;
  1107. DDS_count++;
  1108. flag = 0;
  1109. }else{
  1110. FOR_MY_DDS_LIST{
  1111. if (tmp_dds->agent_id == sender_id){
  1112. fprintf(log_file,"\t\t%d is already in my DDS. %d->num_of_cores++ && DDS->num_of_cores--\n",sender_id,sender_id);
  1113. DDS->num_of_cores--;
  1114. tmp_dds->num_of_cores++;
  1115. break;
  1116. flag = 1;
  1117. }
  1118. }
  1119. }
  1120. if (flag == 0){ /* Sender was not in my DDS */
  1121. fprintf(log_file,"\t\t%d is not in my DDS. DDS_count++\n",sender_id);
  1122. DDS_tail->next = (DDS_list *)malloc(sizeof(DDS_list));
  1123. DDS_tail = DDS_tail->next;
  1124. DDS_tail->next = NULL;
  1125. DDS_tail->agent_id = sender_id;
  1126. DDS_tail->num_of_cores = num_of_workers;
  1127. DDS->num_of_cores = DDS->num_of_cores - num_of_workers;
  1128. DDS_count++;
  1129. }
  1130. fprintf(log_file, "\t\t%d utilizes %d cores in my cluster\n",sender_id, num_of_workers);
  1131. while (num_of_workers > 0){
  1132. current = sig_read_ar[2+num_of_workers];
  1133. FOR_MY_CORES_LIST{
  1134. if (tmp_core_list->core_id == current){
  1135. fprintf(log_file,"\t\tChanged %d->offered_to = %d\n",tmp_core_list->core_id,sender_id);
  1136. tmp_core_list->offered_to = sender_id;
  1137. }
  1138. }
  1139. num_of_workers--;
  1140. }
  1141. printf("\t\tUpdated my_cores list:\n");
  1142. fprintf(log_file,"\t\tUpdated my_cores list:\n");
  1143. FOR_MY_CORES_LIST{
  1144. printf("\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
  1145. fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
  1146. }
  1147. printf("\t\tUpdated DDS list:\n");
  1148. fprintf(log_file,"\t\tUpdated DDS list:\n");
  1149. FOR_MY_DDS_LIST{
  1150. printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
  1151. fprintf(log_file,"\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
  1152. }
  1153. cur_time = time(NULL);
  1154. cur_t = localtime(&cur_time);
  1155. fprintf(log_file, "\n[%d:%d:%d]: I ended sig_ADD_TO_DDS_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1156. return;
  1157. }
  1158. void sig_REMOVE_FROM_DDS_handler(int sender_id){
  1159. core_list *tmp_core_list;
  1160. DDS_list *tmp_dds, *tmp_prev_dds;
  1161. cur_time = time(NULL);
  1162. cur_t = localtime(&cur_time);
  1163. fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_REMOVE_FROM_DDS_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1164. tmp_core_list = my_cores;
  1165. while (tmp_core_list != NULL){
  1166. if (tmp_core_list->offered_to == sender_id){
  1167. fprintf(log_file,"\t\tChanged %d->offered_to to -1\n",tmp_core_list->core_id);
  1168. tmp_core_list->offered_to = -1;
  1169. }
  1170. tmp_core_list = tmp_core_list->next;
  1171. }
  1172. tmp_dds = DDS->next;
  1173. tmp_prev_dds = DDS;
  1174. while (tmp_dds != NULL){
  1175. if (tmp_dds->agent_id == sender_id){
  1176. fprintf(log_file,"\t\t Removed %d from DDS\n",tmp_dds->agent_id);
  1177. tmp_prev_dds->next = tmp_dds->next;
  1178. if (tmp_dds->next == NULL){
  1179. DDS_tail = tmp_prev_dds;
  1180. }
  1181. DDS_count--;
  1182. free(tmp_dds);
  1183. break;
  1184. }else{
  1185. tmp_prev_dds = tmp_dds;
  1186. tmp_dds = tmp_dds->next;
  1187. }
  1188. }
  1189. cur_time = time(NULL);
  1190. cur_t = localtime(&cur_time);
  1191. fprintf(log_file, "[%d:%d:%d]: I ended sig_REMOVE_FROM_DDS_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1192. return;
  1193. }
  1194. void sig_CONTR_TO_handler(int sender_id){
  1195. cur_time = time(NULL);
  1196. cur_t = localtime(&cur_time);
  1197. fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_CONTR_TO_handler with sender=%d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1198. faulty_core = my_idag;
  1199. if (paxos_state != PAXOS_ACTIVE && paxos_state != NEW_AGENT && paxos_state != NEW_IDAG)
  1200. sig_PAXOS_INIT_handler();
  1201. cur_time = time(NULL);
  1202. cur_t = localtime(&cur_time);
  1203. fprintf(log_file, "[%d:%d:%d]: I ended sig_CONTR_TO_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1204. return;
  1205. }
  1206. void sig_HEARTBEAT_REQ_handler(int sender_id){
  1207. inter_list tmp_inter_list;
  1208. cur_time = time(NULL);
  1209. cur_t = localtime(&cur_time);
  1210. //fprintf(log_file, "[%d:%d:%d]: I entered sig_HEARTBEAT_REQ_handler with sender=%d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1211. tmp_inter_list.next = NULL;
  1212. tmp_inter_list.type = HEARTBEAT_REP;
  1213. scc_kill(sender_id,SIG_HEARTBEAT_REP,&tmp_inter_list);
  1214. cur_time = time(NULL);
  1215. cur_t = localtime(&cur_time);
  1216. return;
  1217. }
  1218. void sig_HEARTBEAT_REP_handler(int sender_id){
  1219. cur_time = time(NULL);
  1220. cur_t = localtime(&cur_time);
  1221. //fprintf(log_file, "[%d:%d:%d]: I entered sig_HEARTBEAT_REP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1222. alive[sender_id] = 1;
  1223. suspected[sender_id] = 0;
  1224. cur_time = time(NULL);
  1225. cur_t = localtime(&cur_time);
  1226. //fprintf(log_file, "[%d:%d:%d]: I ended sig_TERMINATE_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1227. return;
  1228. }
  1229. void sig_PFD_TIMER_handler(int signo, siginfo_t *info, void *context){
  1230. int i, j, failed_core, pending_workload[2], one_core;
  1231. DDS_list *tmp_dds, *tmp_dds_prev;
  1232. inter_list tmp_inter_list;
  1233. core_list *tmp_core_list, *tmp_core_list_prev;
  1234. if (first_time == 0){
  1235. printf("%d oh yeah\n",node_id);
  1236. first_time = 1;
  1237. its.it_interval.tv_sec = 0;
  1238. its.it_interval.tv_nsec = 0;
  1239. its.it_value.tv_sec = 2;
  1240. its.it_value.tv_nsec = 0;//100000000;
  1241. if (timer_settime(pfd_timer, 0, &its, NULL) == -1){
  1242. printf("-- %d --", node_id);
  1243. fflush(stdout);
  1244. perror("paxos_signal_handlers.c : timer_settime error9");
  1245. }else {
  1246. fprintf(log_file,"Updated timer!\n");
  1247. }
  1248. return;
  1249. }
  1250. signals_disable();
  1251. cur_time = time(NULL);
  1252. cur_t = localtime(&cur_time);
  1253. fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_PFD_TIMER_handler state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
  1254. //printf("[%d:%d:%d]: -%d- I entered sig_PFD_TIMER_handler state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id, id2string(state));
  1255. for (i = 0; i < X_max*Y_max; i++){
  1256. if (alive[i] == 0){
  1257. suspected[i]++;
  1258. }
  1259. #if defined(PFD) && defined(BASIC_PAXOS)
  1260. /* The Perfect Failure Detectors sends a SIG_HEARTBEAT_REQ each time the timer explodes and waits for a SIG_HEARTBEAT_REP*/
  1261. /* If he doesn't receive a reply until the timer reexplodes then the node is detected as faulty */
  1262. if (alive[i] != 1 && alive[i] != -1 && faulty_core == -1 && i != node_id && idag_mask[i] == idag_mask[node_id])
  1263. #elif defined(tPFD) && defined(BASIC_PAXOS) //tPFD
  1264. /* The tweaked Perfect Failure Detectors only suspects a core if he sends him a signal and doesn't receive a reply in some period of time*/
  1265. if (alive[i] == 0 && suspected[i] == 2 && i != node_id && idag_mask[i] == idag_mask[node_id])
  1266. #else
  1267. if (alive[i] == -5)
  1268. #endif
  1269. {
  1270. cur_time = time(NULL);
  1271. cur_t = localtime(&cur_time);
  1272. printf("-- %d -- I detected %d as faulty at [%d:%d:%d]!!\n", node_id, i,cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
  1273. fprintf(log_file, "-- %d -- I detected %d as faulty at [%d:%d:%d]!!\n", node_id, i,cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
  1274. fprintf(log_file, "-- %d -- I detected %d as faulty!!\n", node_id, i);
  1275. failed_core = i;
  1276. faulty_core = i;
  1277. suspected[failed_core] = -1;
  1278. alive[failed_core] = -1;
  1279. #if defined(CONTROLLER) && defined(PLAT_LINUX)
  1280. int semvalue = 0;
  1281. sem_getvalue(&flag_data_written[failed_core],&semvalue);
  1282. if (semvalue == 0){
  1283. /*I am locked*/
  1284. printf("--%d-- I unlocked semaphore for node %d\n",node_id,failed_core);
  1285. sem_post(&flag_data_written[failed_core]);
  1286. sem_getvalue(&flag_data_written[failed_core],&semvalue);
  1287. }
  1288. #endif
  1289. /*My controller timed out */
  1290. if (failed_core == my_idag){
  1291. sig_PAXOS_INIT_handler();
  1292. /* Controller in an other cluster timed out */
  1293. }else if (idag_mask[failed_core] == failed_core){
  1294. printf("--%d-- Other controller TIMED OUT\n", node_id);
  1295. for (j = 0; j < X_max*Y_max; j++){
  1296. if (idag_mask[j] == failed_core && j != failed_core){
  1297. if (state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT;
  1298. tmp_inter_list.next = NULL;
  1299. tmp_inter_list.type = CONTR_TO;
  1300. scc_kill(j,SIG_CONTR_TO,&tmp_inter_list);
  1301. }
  1302. }
  1303. /* Worker or manager timed out */
  1304. }else{
  1305. /*I am controller.
  1306. *If failed node is inside my cluster
  1307. *i have to remove the failed node from my cores and DDS list */
  1308. if ((idag_mask[node_id] == node_id) && (idag_mask[failed_core] == node_id)){
  1309. tmp_core_list = my_cores->next;
  1310. tmp_core_list_prev = my_cores;
  1311. while (tmp_core_list != NULL){
  1312. if (tmp_core_list->core_id == failed_core){
  1313. my_cores_count--;
  1314. tmp_core_list_prev->next = tmp_core_list->next;
  1315. free(tmp_core_list);
  1316. break;
  1317. }else{
  1318. tmp_core_list_prev = tmp_core_list;
  1319. tmp_core_list = tmp_core_list->next;
  1320. }
  1321. }
  1322. tmp_dds = DDS->next;
  1323. tmp_dds_prev = DDS;
  1324. while (tmp_dds != NULL){
  1325. if (tmp_dds->agent_id == failed_core){
  1326. tmp_dds_prev->next = tmp_dds->next;
  1327. free(tmp_dds);
  1328. break;
  1329. }else{
  1330. tmp_dds_prev = tmp_dds;
  1331. tmp_dds = tmp_dds->next;
  1332. }
  1333. }
  1334. }
  1335. /*I am manager.
  1336. *I have to check if the failed node is my worker
  1337. *If yes i have to appoint work to a new node.*/
  1338. if (im_manager()){
  1339. printf("-- %d -- I am manager of an application.\n",node_id);
  1340. tmp_core_list = my_cores->next;
  1341. tmp_core_list_prev = my_cores;
  1342. while (tmp_core_list != NULL){
  1343. /* I am the manager of the failed worker. I reappoint the work to another core. */
  1344. if (tmp_core_list->core_id == failed_core){
  1345. my_cores_count--;
  1346. /* I am the manager of the failed worker so i remove him from my core list */
  1347. tmp_core_list_prev->next = tmp_core_list->next;
  1348. fprintf(log_file,"I am the manager of the failed worker %d! I removed him from my cores list\n",tmp_core_list->core_id);
  1349. printf("-- %d --I am the manager of the failed worker %d! I removed him from my cores list\n",node_id, tmp_core_list->core_id);
  1350. //one_core = tmp_core_list->core_id;
  1351. pending_workload[0] = tmp_core_list->workload[0];
  1352. pending_workload[1] = tmp_core_list->workload[1];
  1353. printf("Pending workload of faulty core: %d %d\n", pending_workload[0], pending_workload[1]);
  1354. /*else{
  1355. reappoint = FALSE;
  1356. }*/
  1357. free(tmp_core_list);
  1358. tmp_core_list = my_cores->next;
  1359. fprintf(log_file,"I am reassigning the workload!\n");
  1360. fprintf(log_file,"-------- CURRENT WORKLOADS --------\n");
  1361. printf("-------- CURRENT WORKLOADS --------\n");
  1362. while (tmp_core_list != NULL){
  1363. fprintf(log_file,"%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  1364. printf("%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  1365. tmp_core_list = tmp_core_list->next;
  1366. }
  1367. printf("-----------------------------------\n");
  1368. fprintf(log_file,"-----------------------------------\n");
  1369. tmp_core_list = my_cores->next;
  1370. while (tmp_core_list != NULL){
  1371. if ((tmp_core_list->workload[0] == -1) && (tmp_core_list->workload[1] == -1)){
  1372. one_core = tmp_core_list->core_id;
  1373. if (core_inter_head[one_core] == NULL){
  1374. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  1375. core_inter_tail[one_core] = core_inter_head[one_core];
  1376. } else {
  1377. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  1378. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  1379. }
  1380. core_inter_tail[one_core]->type = APPOINT_WORK_NODE;
  1381. core_inter_tail[one_core]->data.work_bounds[0] = pending_workload[0];
  1382. core_inter_tail[one_core]->data.work_bounds[1] = pending_workload[1];
  1383. fprintf(app_log_file,"%d (%d, %d), ",one_core,core_inter_tail[one_core]->data.work_bounds[0],core_inter_tail[one_core]->data.work_bounds[1]);
  1384. core_inter_tail[one_core]->next = NULL;
  1385. if (core_inter_head[one_core]->next == NULL) {
  1386. paxos_node_stats.msg_count++;
  1387. scc_kill(one_core, SIG_APPOINT_WORK, core_inter_head[one_core]);
  1388. } else {
  1389. fprintf(log_file,"I am doing smth else with my working node %d in init inter1 = %d inter2 = %d\n",one_core,core_inter_head[one_core]->type,core_inter_head[one_core]->next->type);
  1390. }
  1391. break;
  1392. }
  1393. tmp_core_list = tmp_core_list->next;
  1394. }
  1395. break;
  1396. }else {
  1397. tmp_core_list_prev = tmp_core_list;
  1398. tmp_core_list = tmp_core_list->next;
  1399. }
  1400. }
  1401. }else{
  1402. /* My manager timed out */
  1403. if (cur_agent.my_agent == failed_core){
  1404. printf("--%d-- %d is my manager! I initiate a Paxos instance\n",node_id, failed_core);
  1405. paxos_state = PAXOS_ACTIVE;
  1406. sig_PAXOS_INIT_handler();
  1407. }
  1408. }
  1409. }
  1410. }
  1411. #ifdef PFD
  1412. else{
  1413. if (alive[i] != -1 && i != node_id && idag_mask[i] == idag_mask[node_id]){
  1414. alive[i] = 0;
  1415. tmp_inter_list.type = HEARTBEAT_REQ;
  1416. tmp_inter_list.next = NULL;
  1417. scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
  1418. }
  1419. }
  1420. #else
  1421. else{
  1422. if (alive[i] == 0 && i != node_id && idag_mask[i] == idag_mask[node_id]){
  1423. tmp_inter_list.type = HEARTBEAT_REQ;
  1424. tmp_inter_list.next = NULL;
  1425. scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
  1426. }
  1427. }
  1428. #endif
  1429. }
  1430. fprintf(log_file,"\t\tNodes in my cluster: ");
  1431. for (i = 0; i < X_max*Y_max; i++){
  1432. if (idag_mask[i] == idag_mask[node_id] && i != node_id) fprintf(log_file,"%d, ",i);
  1433. }
  1434. fprintf(log_file,"\n");
  1435. fprintf(log_file,"\t\tI have received a signal from: ");
  1436. for (i = 0; i < X_max*Y_max; i++){
  1437. if (alive[i] == 1) fprintf(log_file,"%d, ",i);
  1438. }
  1439. fprintf(log_file,"\n");
  1440. fprintf(log_file,"\t\tSent HEARTBEAT_REQ to:");
  1441. for (i = 0; i < X_max*Y_max; i++){
  1442. if (alive[i] == 0 && i != node_id && idag_mask[i] == idag_mask[node_id]){
  1443. fprintf(log_file,"%d, ",i);
  1444. }
  1445. alive[i] = 0;
  1446. }
  1447. its.it_interval.tv_sec = 0;
  1448. its.it_interval.tv_nsec = 0;
  1449. its.it_value.tv_sec = 2;
  1450. its.it_value.tv_nsec = 0;//100000000;
  1451. if (timer_settime(pfd_timer, 0, &its, NULL) == -1){
  1452. printf("-- %d --", node_id);
  1453. fflush(stdout);
  1454. perror("paxos_signal_handlers.c : timer_settime error9");
  1455. }else {
  1456. fprintf(log_file,"Updated timer!\n");
  1457. }
  1458. cur_time = time(NULL);
  1459. cur_t = localtime(&cur_time);
  1460. fprintf(log_file, "[%d:%d:%d]: I ended sig_PFD_TIMER_handler state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
  1461. signals_enable();
  1462. }
  1463. void sig_EPFD_TIMER_handler(int signo, siginfo_t *info, void *context)
  1464. {
  1465. core_list *tmp_core_list, *tmp_core_list_prev;
  1466. DDS_list *tmp_dds, *tmp_dds_prev;
  1467. int one_core,failed_core, pending_workload[2]/*,reappoint = TRUE*/;
  1468. signals_disable();
  1469. cur_time = time(NULL);
  1470. cur_t = localtime(&cur_time);
  1471. fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_EPFD_TIMER_handler state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
  1472. int i, j;//, disjoint = 1;
  1473. inter_list tmp_inter_list;
  1474. for (i = 0; i < X_max*Y_max; i++){
  1475. if (alive[i] == suspected[i]){
  1476. fprintf(log_file,"\t\tNew Delay: %d\n",delay);
  1477. delay *= 2;
  1478. break;
  1479. //disjoint = 0;
  1480. }
  1481. }
  1482. fprintf(log_file,"\t\t------ FAILURE DETECTION ------\n");
  1483. for (i = 0; i < X_max*Y_max; i++){
  1484. if (alive[i] == 0 && suspected[i] != -1){
  1485. suspected[i]++;
  1486. fprintf(log_file,"\t\t%d -> SUSPECTED with suspected[%d] = %d\n",i,i,suspected[i]);
  1487. if ((suspected[i] > 2) && (faulty_core != i)){
  1488. cur_time = time(NULL);
  1489. cur_t = localtime(&cur_time);
  1490. failed_core = i;
  1491. faulty_core = i;
  1492. suspected[failed_core] = -1;
  1493. alive[failed_core] = -1;
  1494. #if defined(CONTROLLER) && defined(PLAT_LINUX)
  1495. int semvalue = 0;
  1496. sem_getvalue(&flag_data_written[failed_core],&semvalue);
  1497. if (semvalue == 0){
  1498. /*I am locked*/
  1499. printf("--%d-- I unlocked semaphore for node %d\n",node_id,failed_core);
  1500. sem_post(&flag_data_written[failed_core]);
  1501. sem_getvalue(&flag_data_written[failed_core],&semvalue);
  1502. }
  1503. #endif
  1504. /*My controller timed out */
  1505. if (failed_core == my_idag){
  1506. sig_PAXOS_INIT_handler();
  1507. /* Controller in an other cluster timed out */
  1508. }else if (idag_mask[failed_core] == failed_core){
  1509. printf("--%d-- Other controller TIMED OUT\n", node_id);
  1510. for (j = 0; j < X_max*Y_max; j++){
  1511. if (idag_mask[j] == failed_core && j != failed_core){
  1512. if (state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT;
  1513. tmp_inter_list.next = NULL;
  1514. tmp_inter_list.type = CONTR_TO;
  1515. scc_kill(j,SIG_CONTR_TO,&tmp_inter_list);
  1516. }
  1517. }
  1518. /* Worker or manager timed out */
  1519. }else{
  1520. /*I am controller.
  1521. *If failed node is inside my cluster
  1522. *i have to remove the failed node from my cores and DDS list */
  1523. if ((idag_mask[node_id] == node_id) && (idag_mask[failed_core] == node_id)){
  1524. tmp_core_list = my_cores->next;
  1525. tmp_core_list_prev = my_cores;
  1526. while (tmp_core_list != NULL){
  1527. if (tmp_core_list->core_id == failed_core){
  1528. my_cores_count--;
  1529. tmp_core_list_prev->next = tmp_core_list->next;
  1530. free(tmp_core_list);
  1531. break;
  1532. }else{
  1533. tmp_core_list_prev = tmp_core_list;
  1534. tmp_core_list = tmp_core_list->next;
  1535. }
  1536. }
  1537. tmp_dds = DDS->next;
  1538. tmp_dds_prev = DDS;
  1539. while (tmp_dds != NULL){
  1540. if (tmp_dds->agent_id == failed_core){
  1541. tmp_dds_prev->next = tmp_dds->next;
  1542. free(tmp_dds);
  1543. break;
  1544. }else{
  1545. tmp_dds_prev = tmp_dds;
  1546. tmp_dds = tmp_dds->next;
  1547. }
  1548. }
  1549. }
  1550. /*I am manager.
  1551. *I have to check if the failed node is my worker
  1552. *If yes i have to appoint work to a new node.*/
  1553. if (im_manager()){
  1554. printf("-- %d -- I am manager of an application.\n",node_id);
  1555. tmp_core_list = my_cores->next;
  1556. tmp_core_list_prev = my_cores;
  1557. while (tmp_core_list != NULL){
  1558. /* I am the manager of the failed worker. I reappoint the work to another core. */
  1559. if (tmp_core_list->core_id == failed_core){
  1560. my_cores_count--;
  1561. /* I am the manager of the failed worker so i remove him from my core list */
  1562. tmp_core_list_prev->next = tmp_core_list->next;
  1563. fprintf(log_file,"I am the manager of the failed worker %d! I removed him from my cores list\n",tmp_core_list->core_id);
  1564. printf("-- %d --I am the manager of the failed worker %d! I removed him from my cores list\n",node_id, tmp_core_list->core_id);
  1565. //one_core = tmp_core_list->core_id;
  1566. pending_workload[0] = tmp_core_list->workload[0];
  1567. pending_workload[1] = tmp_core_list->workload[1];
  1568. printf("Pending workload of faulty core: %d %d\n", pending_workload[0], pending_workload[1]);
  1569. /*else{
  1570. reappoint = FALSE;
  1571. }*/
  1572. free(tmp_core_list);
  1573. tmp_core_list = my_cores->next;
  1574. fprintf(log_file,"I am reassigning the workload!\n");
  1575. fprintf(log_file,"-------- CURRENT WORKLOADS --------\n");
  1576. printf("-------- CURRENT WORKLOADS --------\n");
  1577. while (tmp_core_list != NULL){
  1578. fprintf(log_file,"%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  1579. printf("%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  1580. tmp_core_list = tmp_core_list->next;
  1581. }
  1582. printf("-----------------------------------\n");
  1583. fprintf(log_file,"-----------------------------------\n");
  1584. tmp_core_list = my_cores->next;
  1585. while (tmp_core_list != NULL){
  1586. if ((tmp_core_list->workload[0] == -1) && (tmp_core_list->workload[1] == -1)){
  1587. one_core = tmp_core_list->core_id;
  1588. if (core_inter_head[one_core] == NULL){
  1589. core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
  1590. core_inter_tail[one_core] = core_inter_head[one_core];
  1591. } else {
  1592. core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
  1593. core_inter_tail[one_core] = core_inter_tail[one_core]->next;
  1594. }
  1595. core_inter_tail[one_core]->type = APPOINT_WORK_NODE;
  1596. core_inter_tail[one_core]->data.work_bounds[0] = pending_workload[0];
  1597. core_inter_tail[one_core]->data.work_bounds[1] = pending_workload[1];
  1598. fprintf(app_log_file,"%d (%d, %d), ",one_core,core_inter_tail[one_core]->data.work_bounds[0],core_inter_tail[one_core]->data.work_bounds[1]);
  1599. core_inter_tail[one_core]->next = NULL;
  1600. if (core_inter_head[one_core]->next == NULL) {
  1601. paxos_node_stats.msg_count++;
  1602. scc_kill(one_core, SIG_APPOINT_WORK, core_inter_head[one_core]);
  1603. } else {
  1604. fprintf(log_file,"I am doing smth else with my working node %d in init inter1 = %d inter2 = %d\n",one_core,core_inter_head[one_core]->type,core_inter_head[one_core]->next->type);
  1605. }
  1606. break;
  1607. }
  1608. tmp_core_list = tmp_core_list->next;
  1609. }
  1610. break;
  1611. }else {
  1612. tmp_core_list_prev = tmp_core_list;
  1613. tmp_core_list = tmp_core_list->next;
  1614. }
  1615. }
  1616. }else{
  1617. /* My manager timed out */
  1618. if (cur_agent.my_agent == failed_core){
  1619. printf("--%d-- %d is my manager! I initiate a Paxos instance\n",node_id, failed_core);
  1620. paxos_state = PAXOS_ACTIVE;
  1621. sig_PAXOS_INIT_handler();
  1622. }
  1623. }
  1624. }
  1625. }
  1626. #ifdef tEPFD
  1627. else if (suspected[i] == 2){
  1628. tmp_inter_list.next = NULL;
  1629. tmp_inter_list.type = HEARTBEAT_REQ;
  1630. scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
  1631. }
  1632. #else
  1633. else {
  1634. if (i == 10)
  1635. printf("suspected[%d]=%d and alive[%d]=%d\n",i,suspected[i],i,alive[i]);
  1636. tmp_inter_list.next = NULL;
  1637. tmp_inter_list.type = HEARTBEAT_REQ;
  1638. scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
  1639. }
  1640. #endif
  1641. }else if (alive[i] == 1){
  1642. suspected[i] = 0;
  1643. //fprintf(log_file,"\t\t%d -> ALIVE\n", i);
  1644. }
  1645. alive[i] = 0;
  1646. }
  1647. its.it_interval.tv_sec = 0;
  1648. its.it_interval.tv_nsec = 0;
  1649. its.it_value.tv_sec = delay;
  1650. its.it_value.tv_nsec = 0;
  1651. if (timer_settime(epfd_timer, 0, &its, NULL) == -1){
  1652. printf("-- %d --", node_id);
  1653. fflush(stdout);
  1654. perror("paxos_signal_handlers.c : timer_settime error9");
  1655. }else {
  1656. fprintf(log_file,"Updated timer!\n");
  1657. }
  1658. cur_time = time(NULL);
  1659. cur_t = localtime(&cur_time);
  1660. fprintf(log_file, "[%d:%d:%d]: I ended sig_EPFD_TIMER_handler state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
  1661. signals_enable();
  1662. return;
  1663. }
  1664. void sig_CTIMER_handler(int signo, siginfo_t *info, void *context)
  1665. {
  1666. struct tm cur_t_1;
  1667. signals_disable();
  1668. struct timeval recover_time_val;
  1669. #ifdef CONTROLLER
  1670. DDS_list *tmp_dds;
  1671. printf("--%d-- CTIMER_handler : Controller %sTimed out!%s\n",node_id,KRED,KNRM);
  1672. printf("DDS list before time out:\n");
  1673. for (tmp_dds = DDS; tmp_dds != NULL; tmp_dds = tmp_dds->next)
  1674. printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id, tmp_dds->num_of_cores);
  1675. #elif WORKER
  1676. printf("--%d-- CTIMER_handler : Worker %sTimed out!%s\n",node_id,KRED,KNRM);
  1677. printf("Worker state before timeout: %s\n",id2string(state));
  1678. #elif MANAGER
  1679. core_list *tmp_core_list;
  1680. printf("--%d-- CTIMER_handler : Manager %sTimed out!%s\n",node_id,KRED,KNRM);
  1681. printf("Manager state before timeout: %s\n",id2string(state));
  1682. for (tmp_core_list = my_cores; tmp_core_list != NULL; tmp_core_list = tmp_core_list->next)
  1683. printf("\t\t\tWorker_id : %d | Workload : %d %d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
  1684. #endif
  1685. cur_time = time(NULL);
  1686. cur_t = localtime(&cur_time);
  1687. printf("\n\nI timed out at [%d:%d:%d]\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
  1688. /* FIXED IDs */
  1689. int k;
  1690. inter_list tmp_inter_list;
  1691. for (k = 0; k < X_max*Y_max; k++){
  1692. if (k != node_id && k != faulty_core){
  1693. tmp_inter_list.type = LEARN;
  1694. tmp_inter_list.data.learn_ack_info[VALUE_W] = node_id+1;
  1695. tmp_inter_list.data.learn_ack_info[PREV_CW] = node_id;
  1696. scc_kill(k,SIG_LEARN,&tmp_inter_list);
  1697. }
  1698. }
  1699. exit(0);
  1700. paxos_state = FAILED_CORE;
  1701. gettimeofday(&time_val,NULL);
  1702. cur_t = localtime(&time_val.tv_sec);
  1703. fail_time_val = time_val;
  1704. cur_t_1 = *cur_t;
  1705. while (fail_flag == 0){
  1706. scc_pause();
  1707. scc_signals_check();
  1708. }
  1709. gettimeofday(&time_val, NULL);
  1710. cur_t = localtime(&time_val.tv_sec);
  1711. printf("\n\n\n\n\n\n\n[%d:%d:%d:%ld]: gettimeofday_1\n",cur_t_1.tm_hour,cur_t_1.tm_min,cur_t_1.tm_sec,fail_time_val.tv_usec);
  1712. printf("[%d:%d:%d:%ld]: gettimeofday_2\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,time_val.tv_usec);
  1713. printf("qcbacd = %d Difference in us is: %ld\n",sizeof(suseconds_t),time_val.tv_usec - fail_time_val.tv_usec);
  1714. long int dif = ((cur_t->tm_sec * 1000000) + time_val.tv_usec) - ((cur_t_1.tm_sec * 1000000) + fail_time_val.tv_usec);
  1715. printf("awabacd = %d Difference in us is: %ld\n\n\n\n\n\n\n",sizeof(suseconds_t),dif);
  1716. fflush(stdout);
  1717. exit(0);
  1718. signals_enable();
  1719. }
  1720. void sig_ITIMER_handler(int signo, siginfo_t *info, void *context)
  1721. {
  1722. int i;
  1723. inter_list tmp_inter_list;
  1724. signals_disable();
  1725. printf("--%d-- i have to decide what to do here!!\n", node_id);
  1726. fflush(stdout);
  1727. for (i = 0; i < X_max*Y_max; i++){
  1728. if (idag_mask[i] == 10 && i != 10){
  1729. if (state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT;
  1730. tmp_inter_list.next = NULL;
  1731. tmp_inter_list.type = CONTR_TO;
  1732. scc_kill(i,SIG_CONTR_TO,&tmp_inter_list);
  1733. }
  1734. }
  1735. signals_enable();
  1736. }
  1737. /* END */
  1738. void sig_FAIL_handler(){
  1739. #ifdef WORKER
  1740. sev.sigev_notify = SIGEV_SIGNAL;
  1741. sev.sigev_signo = SIG_CTIMER;
  1742. sev.sigev_value.sival_ptr = &controller_timer;
  1743. if (timer_create(CLOCK_REALTIME, &sev, &controller_timer) == -1)
  1744. printf("timer_create error\n");
  1745. else
  1746. printf("Worker Timer created succesfully!\n");
  1747. its.it_interval.tv_sec = 0;
  1748. its.it_interval.tv_nsec = 0;
  1749. its.it_value.tv_sec = 2;
  1750. its.it_value.tv_nsec = 0;
  1751. if (timer_settime(controller_timer, 0, &its, NULL) == -1)
  1752. perror("controller_core.c : timer_settime error9");
  1753. else
  1754. printf("%d : My timer will explode in %d seconds.\n", node_id, 10);
  1755. return;
  1756. #endif
  1757. #ifdef MANAGER
  1758. sev.sigev_notify = SIGEV_SIGNAL;
  1759. sev.sigev_signo = SIG_CTIMER;
  1760. sev.sigev_value.sival_ptr = &controller_timer;
  1761. if (timer_create(CLOCK_REALTIME, &sev, &controller_timer) == -1)
  1762. printf("timer_create error\n");
  1763. else
  1764. printf("Manager Timer created succesfully!\n");
  1765. its.it_interval.tv_sec = 0;
  1766. its.it_interval.tv_nsec = 0;
  1767. its.it_value.tv_sec = 10;
  1768. its.it_value.tv_nsec = 0;
  1769. if (timer_settime(controller_timer, 0, &its, NULL) == -1)
  1770. perror("controller_core.c : timer_settime error9");
  1771. else
  1772. printf("%d : My timer will explode in %d seconds.\n", node_id, 10);
  1773. return;
  1774. #endif
  1775. }
  1776. void sig_PAXOS_STATS_REQ_handler(int sender_id){
  1777. inter_list tmp_inter_list;
  1778. cur_time = time(NULL);
  1779. cur_t = localtime(&cur_time);
  1780. fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_PAXOS_STATS_REQ_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1781. tmp_inter_list.next = NULL;
  1782. tmp_inter_list.type = PAXOS_STATS_REP;
  1783. tmp_inter_list.data.paxos_stats[0] = paxos_node_stats.msg_count;
  1784. tmp_inter_list.data.paxos_stats[1] = paxos_node_stats.fd_msg_count;
  1785. fprintf(log_file,"\t\tI send %d my paxos stats %lld , %d\n",sender_id,paxos_node_stats.msg_count,paxos_node_stats.fd_msg_count);
  1786. scc_kill(sender_id,SIG_PAXOS_STATS_REP,&tmp_inter_list);
  1787. cur_time = time(NULL);
  1788. cur_t = localtime(&cur_time);
  1789. fprintf(log_file, "\n\n[%d:%d:%d]: I ended sig_PAXOS_STATS_REQ_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1790. }
  1791. void sig_PAXOS_STATS_REP_handler(int sender_id){
  1792. long long int paxos_replied_stats = sig_read_ar[2];
  1793. long long int fd_replied_stats = sig_read_ar[3];
  1794. cur_time = time(NULL);
  1795. cur_t = localtime(&cur_time);
  1796. fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_PAXOS_STATS_REP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1797. fprintf(log_file,"\t\t%d has replied with msg_count = %lld and fd_msg_count = %lld\n",sender_id,paxos_replied_stats,fd_replied_stats);
  1798. paxos_total_stats.msg_count += paxos_replied_stats;
  1799. paxos_total_stats.fd_msg_count += fd_replied_stats;
  1800. paxos_stats_replied++;
  1801. fprintf(log_file,"\t\tI have updated my stats. New message count = %lld\n",paxos_total_stats.msg_count);
  1802. fprintf(log_file,"\t\tCores replied: %d | My cores count: %d\n",paxos_stats_replied,my_cores_count);
  1803. cur_time = time(NULL);
  1804. cur_t = localtime(&cur_time);
  1805. fprintf(log_file, "\n\n[%d:%d:%d]: I ended sig_PAXOS_STATS_REP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
  1806. }