| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114 |
- #include "include/paxos_signal_handlers.h"
- #include "include/my_rtrm.h"
- #include "include/libfunctions.h"
- #include "include/sig_aux.h"
- #include "include/scc_signals.h"
- #include "include/controller_core.h"
- #include "include/common_core.h"
- #include "include/idag_defs.h"
- #include "include/signal_handlers.h"
- #include "include/variables.h"
- #include "include/macros.h"
- #include "include/structs.h"
- int faulty_core = -1;
- int first_time = 0;
- int pending_workload[2] = {-1,-1};
- int proposal_number_personal;
- core_states paxos_state;
- acceptor_var acceptor_vars = {-1,-1,-1};
- proposer_var proposer_vars = {-1,-1,0,0,NULL};
- int fail_flag = 0;
- struct timeval fail_time_val;
- static char local_scen_directory[SCEN_DIR_SIZE], local_scen_num[SCEN_NUM_SIZE];
- #ifdef PLAT_SCC
- char error_str[64];
- int error, str_len, sig_array_local[LINE_SIZE];
- #endif
- int leader_preference(){
-
- coworkers_list *tmp_cowork_list;
- int num_of_coworkers = 0;
-
- if (state == IDLE_CORE){
- return 60;
- }
- else if (state == INIT_MANAGER ||
- state == INIT_MANAGER_SEND_OFFERS ||
- state == IDLE_INIT_MAN ||
- state == INIT_MAN_CHK_OFFERS){
-
- return 50;
- }
- else if (state == WORKING_NODE && pending_state == NO_PENDING_STATE){
-
- if (coworkers != NULL)
- FOR_MY_COWORKERS_LIST num_of_coworkers++;
-
- if (num_of_coworkers == 1)
- return 1;
- else
- return (40 + num_of_coworkers);
- }
- else if (state == WORKING_NODE && (
- pending_state == INIT_MANAGER ||
- pending_state == INIT_MANAGER_SEND_OFFERS ||
- pending_state == IDLE_INIT_MAN ||
- pending_state == INIT_MAN_CHK_OFFERS)){
-
- if (coworkers != NULL)
- FOR_MY_COWORKERS_LIST num_of_coworkers++;
-
- if (num_of_coworkers == 1)
- return 1;
- else{
- return (30+num_of_coworkers);
- }
- }
- else if (state == WORKING_NODE && (
- pending_state == IDLE_AGENT ||
- pending_state == IDLE_AGENT_WAITING_OFF ||
- pending_state == AGENT_INIT_STATE ||
- pending_state == AGENT_SELF_OPT ||
- pending_state == AGENT_SELF_CHK_OFFERS ||
- pending_state == AGENT_ENDING ||
- pending_state == IDAG_ENDING ||
- pending_state == AGENT_ZOMBIE ||
- pending_state == AGENT_INIT_APP_INIT ||
- pending_state == AGENT_INIT_CHK_OFFERS ||
- pending_state == AGENT_INIT_IDLE_INIT ||
- pending_state == IDLE_INIT_IDLE_AGENT ||
- pending_state == IDLE_INIT_AGENT_SELFOPT ||
- pending_state == INIT_CHK_OFFERS_IDLE_AGENT ||
- pending_state == INIT_CHK_OFFERS_SELFOPT)){
-
- if (coworkers != NULL)
- FOR_MY_COWORKERS_LIST num_of_coworkers++;
-
- if (num_of_coworkers == 1)
- return 1;
- else{
- return (20+num_of_coworkers);
- }
- }
- else if (state == IDLE_AGENT ||
- state == IDLE_AGENT_WAITING_OFF ||
- state == AGENT_INIT_STATE ||
- state == AGENT_SELF_OPT ||
- state == AGENT_SELF_CHK_OFFERS ||
- state == AGENT_ENDING ||
- state == IDAG_ENDING ||
- state == AGENT_ZOMBIE){
-
- return 10;
- }
- return 0;
- }
- void initialize_PAXOS_data (char scen_directory[SCEN_DIR_SIZE], char scen_num[SCEN_NUM_SIZE]) {
- //printf("Initializing Paxos data...\n");
- strcpy(local_scen_directory, scen_directory);
- strcpy(local_scen_num, scen_num);
- }
- void sig_PAXOS_INIT_handler(){
-
-
- int i;
- int num_of_coworkers = 0;
-
- inter_list tmp_inter_list;
-
- coworkers_list *tmp_cowork_list;
-
- handler_Enter(-1, "sig_PAXOS_INIT_handler");
-
-
- paxos_state = PAXOS_ACTIVE;
-
- #ifdef PLAT_SCC
- RCCE_wait_until(proposal_number_lock,RCCE_FLAG_UNSET);
- RCCE_flag_write(&proposal_number_lock,RCCE_FLAG_SET,node_id);
- proposal_number_personal = *proposal_number_global;
- *proposal_number_global += 1;
- RCCE_flag_write(&proposal_number_lock,RCCE_FLAG_UNSET,node_id);
- #else
- /* Pick up my personal proposal number */
- sem_wait(proposal_number_lock);
- proposal_number_personal = *proposal_number_global;
- *proposal_number_global += 1;
- sem_post(proposal_number_lock);
- #endif
-
- fprintf(log_file, "\t\tI am %d, and my proposal number is %d\n",node_id,proposal_number_personal);
-
- tmp_inter_list.next = NULL;
-
- proposer_vars.core_states = (int *)malloc((X_max*Y_max)*sizeof(int));
- for (i = 0; i < X_max*Y_max; i++){
- proposer_vars.core_states[i] = -1;
- }
- /* Case where controller fails */
- if (idag_mask[faulty_core] == faulty_core){
- for (i = 0; i < X_max*Y_max; i++){
- if ((i != my_idag) && (idag_mask[i] == my_idag)){
- tmp_inter_list.data.proposal_number = proposal_number_personal;
- tmp_inter_list.type = PREPARE_REQUEST;
- scc_kill(i,SIG_PREPARE_REQUEST,&tmp_inter_list);
- }
- }
- }
-
- /* Case where manager fails */
- else{
- fprintf(log_file,"\t\tcoworkers List: ");
- FOR_MY_COWORKERS_LIST{
- fprintf(log_file,"%d --> ", tmp_cowork_list->core_id);
- }
- printf("\n");
- FOR_MY_COWORKERS_LIST{
- num_of_coworkers++;
- tmp_inter_list.data.proposal_number = proposal_number_personal;
- tmp_inter_list.type = PREPARE_REQUEST;
- scc_kill(tmp_cowork_list->core_id, SIG_PREPARE_REQUEST, &tmp_inter_list);
- }
- fprintf(log_file,"num_of_coworkers=%d\n",num_of_coworkers);
- printf("num_of_coworkers=%d\n",num_of_coworkers);
- }
-
- handler_Exit(-1, "sig_PAXOS_INIT_handler");
- return;
- }
- /************* If a core doesn't receive a signal for a period of time it assumes the
- ************** controller has died and sends PREPARE_REQUEST signal
- *************/
- void sig_PREPARE_REQUEST_handler(int sender_id){
-
- int received_proposal_number = sig_read_ar[2];
- inter_list tmp_inter_list;
- int score;
-
- handler_Enter(sender_id,"sig_PREPARE_REQUEST_handler");
-
- fprintf(log_file, "\t\tReceived SIG_PREPARE_REQUEST from %d with Proposal Number %d\n", sender_id, received_proposal_number);
- /* The acceptor has seen a higher proposal number */
- /* Prepare Request Rejected */
- if (acceptor_vars.highest_proposed_n > received_proposal_number){
- fprintf(log_file, "\t\tI have seen a higher proposal number request --> REJECTED!\n");
- return;
- }
-
- /* The acceptor has not seen another prepare request before so it promises never to accept a prepare request with proposal number
- lower than this one. Replies with PREPARE_ACCEPT_NO_PREVIOUS */
-
- /* UPDATE 13.10.2016 -- Also send my state in order to elect core with minimum utilization */
- else if (acceptor_vars.highest_proposed_n == -1){
- score = leader_preference();
- printf("My score is %d and my state %s\n",score, id2string(state));
- fprintf(log_file, "\t\tI haven't accepted any values yet!\n");
- acceptor_vars.highest_proposed_n = received_proposal_number;
- fprintf(log_file, "\t\tUpdated: acceptor_vars.highest_proposed_n = %d\n", acceptor_vars.highest_proposed_n);
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = PREPARE_ACCEPT_NO_PREVIOUS;
- tmp_inter_list.data.accepted_values[0] = -1;
- tmp_inter_list.data.accepted_values[1] = -1;
- tmp_inter_list.data.accepted_values[2] = score;
- scc_kill(sender_id,SIG_PREPARE_ACCEPT_NO_PREVIOUS,&tmp_inter_list);
- }
- /* Proposal number received > Highest proposal number seen */
- /* Acceptor replies with highest proposal number seen and its value if any and updates the values*/
- else if (acceptor_vars.highest_proposed_n < received_proposal_number){
- fprintf(log_file, "\t\treceived_proposal_number higher than highest_proposed_n\n");
- acceptor_vars.highest_proposed_n = received_proposal_number;
- fprintf(log_file, "\t\tUpdated: acceptor_vars.highest_proposed_n = %d\n", acceptor_vars.highest_proposed_n);
- /*If the core has accepted any value it sends that back along with the proposal number of this value*/
- if (acceptor_vars.highest_acc_value == -1){
- fprintf(log_file, "\t\tI haven't accepted any values yet!\n");
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = PREPARE_ACCEPT_NO_PREVIOUS;
- tmp_inter_list.data.accepted_values[0] = -1;
- tmp_inter_list.data.accepted_values[1] = -1;
- tmp_inter_list.data.accepted_values[2] = score;
- scc_kill(sender_id,SIG_PREPARE_ACCEPT_NO_PREVIOUS,&tmp_inter_list);
-
- }else{
- fprintf(log_file, "\t\tI have already accepted the value %d!\n", acceptor_vars.highest_acc_value);
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = PREPARE_ACCEPT;
- tmp_inter_list.data.accepted_values[0] = acceptor_vars.highest_acc_n;
- tmp_inter_list.data.accepted_values[1] = acceptor_vars.highest_acc_value;
- tmp_inter_list.data.accepted_values[2] = score; /* FIXME This seems to be uninitialised */
- scc_kill(sender_id, SIG_PREPARE_ACCEPT, &tmp_inter_list);
- }
- }
-
- handler_Exit(sender_id,"sig_PREPARE_REQUEST_handler");
-
- return;
- }
- void sig_PREPARE_ACCEPT_NO_PREVIOUS_handler(int sender_id){
- int k,i;
- int num_of_coworkers;
- int replied_score = sig_read_ar[4];
- int max_score,index;
-
- inter_list tmp_inter_list;
-
- coworkers_list *tmp_cowork_list;
-
- handler_Enter(sender_id,"sig_PREPARE_ACCEPT_NO_PREVIOUS_handler");
-
- proposer_vars.core_states[sender_id] = replied_score;
- proposer_vars.cores_promised++;
- fprintf(log_file, "\t\t Updated state of %d to %s\n", sender_id, id2string(proposer_vars.core_states[sender_id]));
-
- tmp_inter_list.next = NULL;
- /* Case where controller fails */
- if (idag_mask[faulty_core] == faulty_core){
- /*Received reply from majority */
- if (proposer_vars.cores_promised >= majority(CLUSTER_SIZE)){
- if (PREPARE_ACCEPT_SENT == 0){
- for (i=0; i < X_max*Y_max;i++){
- if (proposer_vars.core_states[i] > max_score){
- max_score = proposer_vars.core_states[i];
- index = i;
- }
- }
- PREPARE_ACCEPT_SENT = 1;
-
- fprintf(log_file,"\t\t-------- LAST KNOWN CORE STATES --------\n");
- FOR_NUES{
- fprintf(log_file,"\t\t%d --> %s\n",k,id2string(proposer_vars.core_states[k]));
- }
- fprintf(log_file,"\t\t----------------------------------------\n");
- fprintf(log_file,"\n");
-
- //TODO Decide proposing core based on core states
-
- fprintf(log_file, "\t\tCONTROLLER CASE ; RECEIVED ACCEPT FROM MAJORITY!\n");
- for (k = 0; k < X_max*Y_max; k++){
- /* I send SIG_ACCEPT_REQUEST to cores inside my cluster */
- if ((k != my_idag) && (idag_mask[k] == my_idag)){
- tmp_inter_list.type = ACCEPT_REQUEST;
- tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
- /*The acceptor hasn't received any reply with accepted value so it will propose itself for leader*/
- if (proposer_vars.highest_replied_value == -1){
- /*if (im_manager() != 1){
- fprintf(log_file,"\t\tI am not a manager. I propose myself as the new controller\n");
- proposer_vars.highest_replied_value = node_id;
- tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
- }else{
- fprintf(log_file,"\t\tI am a manager. I propose a worker of mine as the new controller.That is %d\n",my_cores->next->core_id);
- tmp_inter_list.data.accepted_values[VALUE_W] = my_cores->next->core_id;
- }*/
-
- /* RANDOM PAXOS */
- //proposer_vars.highest_replied_value = node_id;
- //tmp_inter_list.data.accepted_values[VALUE_W] = node_id;
- /****************/
- /* MODIFIED PAXOS */
- proposer_vars.highest_replied_value = index;
- tmp_inter_list.data.accepted_values[VALUE_W] = index;
- /****************/
- }else{
- tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
- }
- scc_kill(k,SIG_ACCEPT_REQUEST,&tmp_inter_list);
- }
- }
- }
- }
- /* Case where manager fails */
- }else{
- num_of_coworkers = 0;
- FOR_MY_COWORKERS_LIST num_of_coworkers++;
- fprintf(log_file,"num_of_coworkers=%d\n",num_of_coworkers);
- if (proposer_vars.cores_promised > majority(num_of_coworkers)){
- if (PREPARE_ACCEPT_SENT == 0){
- PREPARE_ACCEPT_SENT = 1;
- fprintf(log_file, "\t\tMANAGER CASE ; RECEIVED ACCEPT FROM MAJORITY!\n");
- FOR_MY_COWORKERS_LIST{
- tmp_inter_list.type = ACCEPT_REQUEST;
- tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
- if (proposer_vars.highest_replied_value == -1)
- proposer_vars.highest_replied_value = node_id;
- tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
- scc_kill(tmp_cowork_list->core_id,SIG_ACCEPT_REQUEST,&tmp_inter_list);
- }
- }
- }
- }
-
- handler_Exit(sender_id,"sig_PREPARE_ACCEPT_NO_PREVIOUS_handler");
-
- return;
- }
- /************* The other cores would either accept its proposal if they haven't seen
- ************** a higher proposal number
- *************/
- void sig_PREPARE_ACCEPT_handler(int sender_id){
-
- int replied_proposal_number = sig_read_ar[PROP_NR];
- int replied_value = sig_read_ar[VALUE_R];
- int k;
- int num_of_coworkers;
-
- inter_list tmp_inter_list;
-
- coworkers_list *tmp_cowork_list;
-
- handler_Enter(sender_id, "sig_PREPARE_ACCEPT_handler");
-
- proposer_vars.core_states[sender_id] = sig_read_ar[4];
- proposer_vars.cores_promised++;
- fprintf(log_file, "\t\t Updated state of %d to %s\n", sender_id, id2string(proposer_vars.core_states[sender_id]));
-
- tmp_inter_list.next = NULL;
- /*Save the values if proposal number is higher than the highest replied so far*/
- if (replied_proposal_number > proposer_vars.highest_replied_n){
- proposer_vars.highest_replied_n = replied_proposal_number;
- proposer_vars.highest_replied_value = replied_value;
- fprintf(log_file, "\t\t Updated proposer_vars.highest_replied_n = %d\n", proposer_vars.highest_replied_n);
- fprintf(log_file, "\t\t Updated proposer_vars.highest_replied_value = %d\n", proposer_vars.highest_replied_value);
- }
- /* Case where controller fails */
- if (idag_mask[faulty_core] == faulty_core){
- if (proposer_vars.cores_promised >= majority(CLUSTER_SIZE)){
- if (PREPARE_ACCEPT_SENT == 0){
- PREPARE_ACCEPT_SENT = 1;
-
- fprintf(log_file,"\t\t-------- LAST KNOWN CORE STATES --------\n");
- FOR_NUES{
- fprintf(log_file,"\t\t%d --> %s\n",k,id2string(proposer_vars.core_states[k]));
- }
- fprintf(log_file,"\n");
- fprintf(log_file,"\t\t----------------------------------------\n");
-
- fprintf(log_file, "\t\tRECEIVED ACCEPT FROM MAJORITY!\n");
- for (k = 0; k < X_max*Y_max; k++){
- if ((k != my_idag) && (idag_mask[k] == my_idag)){
- tmp_inter_list.type = ACCEPT_REQUEST;
- tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
- tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
- scc_kill(k,SIG_ACCEPT_REQUEST,&tmp_inter_list);
- }
- }
- }
- }
- /* Case where manager fails */
- }else{
- num_of_coworkers = 0;
- FOR_MY_COWORKERS_LIST num_of_coworkers++;
- fprintf(log_file,"num_of_coworkers=%d\n",num_of_coworkers);
- if (proposer_vars.cores_promised >= majority(num_of_coworkers)){
- if (PREPARE_ACCEPT_SENT == 0){
- PREPARE_ACCEPT_SENT = 1;
- fprintf(log_file, "\t\tRECEIVED ACCEPT FROM MAJORITY!\n");
- FOR_MY_COWORKERS_LIST{
- tmp_inter_list.type = ACCEPT_REQUEST;
- tmp_inter_list.data.accepted_values[PROP_NW] = proposal_number_personal;
- tmp_inter_list.data.accepted_values[VALUE_W] = proposer_vars.highest_replied_value;
- scc_kill(tmp_cowork_list->core_id,SIG_ACCEPT_REQUEST,&tmp_inter_list);
- }
- }
- }
- }
-
- handler_Exit(sender_id, "sig_PREPARE_ACCEPT_handler");
- return;
- }
- void sig_ACCEPT_REQUEST_handler(int sender_id){
- int proposer_proposal_number = sig_read_ar[PROP_NR];
- inter_list tmp_inter_list;
-
- handler_Enter(sender_id,"sig_ACCEPT_REQUEST_handler");
-
- tmp_inter_list.next = NULL;
-
- if (proposer_proposal_number < acceptor_vars.highest_proposed_n){
- fprintf(log_file, "\t\t ACCEPT_REQUEST proposal number lower than highest_proposed_n %d -> REJECTED\n", acceptor_vars.highest_proposed_n);
- }else{
- acceptor_vars.highest_acc_n = proposer_proposal_number;
- fprintf(log_file, "\t\t Updated acceptor_vars.highest_acc_n = %d\n", acceptor_vars.highest_acc_n);
- acceptor_vars.highest_acc_value = sig_read_ar[VALUE_R];
- fprintf(log_file, "\t\t Updated acceptor_vars.highest_acc_value = %d\n", acceptor_vars.highest_acc_value);
- acceptor_vars.highest_proposed_n = proposer_proposal_number;
- fprintf(log_file, "\t\t Updated acceptor_vars.highest_proposed_n = %d\n", acceptor_vars.highest_proposed_n);
-
- tmp_inter_list.type = ACCEPTED;
- tmp_inter_list.data.accepted_values[VALUE_W] = acceptor_vars.highest_acc_value;
- scc_kill(sender_id, SIG_ACCEPTED, &tmp_inter_list);
- }
-
- handler_Exit(sender_id,"sig_ACCEPT_REQUEST_handler");
-
- return;
- }
- void sig_ACCEPTED_handler(int sender_id){
-
- int k;
- int received_value = sig_read_ar[VALUE_R];
- int num_of_coworkers;
-
- inter_list tmp_inter_list;
-
- coworkers_list *tmp_cowork_list;
-
- handler_Enter(sender_id,"sig_ACCEPTED_handler");
-
- proposer_vars.cores_accepted++;
-
- tmp_inter_list.next = NULL;
- /****************************************************************/
- /***************** Case where controller failed *****************/
- /****************************************************************/
-
- if (idag_mask[faulty_core] == faulty_core && idag_mask[faulty_core] != -1){
- if ((proposer_vars.cores_accepted >= majority(CLUSTER_SIZE)) && (SIG_LEARN_SENT == 0)){
- fprintf(log_file, "\t\tRECEIVED ACCEPTED FROM MAJORITY!\n");
- SIG_LEARN_SENT = 1;
- tmp_inter_list.type = LEARN;
- tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
- tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
- scc_kill(node_id,SIG_LEARN,&tmp_inter_list);
- for (k = 0; k < X_max*Y_max; k++){
- if ((k != my_idag) && (k != node_id)){
- tmp_inter_list.type = LEARN;
- tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
- tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
- scc_kill(k,SIG_LEARN,&tmp_inter_list);
- }
- }
-
- }
- /****************************************************************/
- /******************* Case where manager failed ******************/
- /****************************************************************/
-
- }else{
- num_of_coworkers=0;
- FOR_MY_COWORKERS_LIST num_of_coworkers++;
- printf("num_of_coworkers=%d\n",num_of_coworkers);
- if ((proposer_vars.cores_accepted >= majority(num_of_coworkers)) && (SIG_LEARN_SENT == 0)){
- fprintf(log_file, "\t\t MANAGER CASE ; RECEIVED ACCEPTED FROM MAJORITY!\n");
- SIG_LEARN_SENT = 1;
- tmp_inter_list.type = LEARN;
- tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
- tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
- scc_kill(node_id,SIG_LEARN,&tmp_inter_list);
- for (k = 0; k < X_max*Y_max; k++){
- if (k != node_id && k != faulty_core){
- tmp_inter_list.type = LEARN;
- tmp_inter_list.data.learn_ack_info[VALUE_W] = received_value;
- tmp_inter_list.data.learn_ack_info[PREV_CW] = faulty_core;
- scc_kill(k,SIG_LEARN,&tmp_inter_list);
- }
- }
- }
- }
-
- handler_Exit(sender_id,"sig_ACCEPTED_handler");
-
- return;
- }
- void sig_LEARN_handler(int sender_id){
-
- int received_value = sig_read_ar[VALUE_R];
- int failed_core = sig_read_ar[PREV_CR];
- int k;
- int i;
- int cluster_idag;
- int counter;
- int selfopt_r;
- int failed_interaction = 0; /* 0 nothing, 1 init_search, 2 manager_search */
- int one_core;
-
- core_list *tmp_core_list;
- core_list *tmp_core_list_prev;
-
- DDS_list *tmp_dds;
- DDS_list *tmp_prev_dds;
-
- inter_list tmp_inter_list;
-
- coworkers_list *tmp_cowork_list;
-
- inter_list *tmp_inter;
-
- core_states new_state = NO_PENDING_STATE;
-
- handler_Enter(sender_id,"sig_LEARN_handler");
-
- fprintf(log_file,"\t\t Received_value = %d and failed_core=%d\n",received_value,failed_core);
- suspected[received_value] = -1;
-
- //faulty_core = failed_core;
- /****************************************************************/
- /***************** Case where controller failed *****************/
- /****************************************************************/
-
- i = 0;
-
- /* FIXED IDs */
- for (i=0; i < X_max*Y_max;i++)
- if (idag_mask[i] == failed_core && i != failed_core){
- if (i == node_id)
- printf("I am the new controller and my current state is: %s\n\n",id2string(state));
- break;
- }
- if (idag_mask[node_id] == idag_mask[failed_core])
- printf("%d : %s\n",node_id,id2string(state));
- exit(0);
-
- /* I am checking the interactions i had with the new controller */
- if (failed_core != node_id && idag_mask[failed_core] != -1){
- if (core_inter_head[failed_core] == NULL){
- fprintf(log_file,"\t\tI had no interactions with failed core %d\n", failed_core);
- }else{
- fprintf(log_file, "\t\tMy interactions with failed core %d were:\n",failed_core);
- for (tmp_inter = core_inter_head[failed_core]; tmp_inter != NULL; tmp_inter = tmp_inter->next){
- fprintf(log_file,"\t\t\t%d. %s\n",i,inter2string(tmp_inter->type));
- if (tmp_inter->type == IDAG_FIND_IDAGS ||
- tmp_inter->type == IDAG_FIND_IDAGS_PENDING){
-
- failed_interaction = 1;
-
- }
-
- if (tmp_inter->type == SELFOPT_IDAG_FIND_IDAGS ||
- tmp_inter->type == SELFOPT_IDAG_FIND_IDAGS_PENDING){
-
- failed_interaction = 2;
-
- }
- i++;
- }
- }
- }
-
- if (core_inter_head[failed_core] != NULL &&
- (core_inter_head[failed_core]->type == IDAG_REQ_DDS ||
- core_inter_head[failed_core]->type == IDAG_REQ_DDS_PENDING ||
- core_inter_head[failed_core]->type == SELFOPT_IDAG_REQ_DDS ||
- core_inter_head[failed_core]->type == SELFOPT_IDAG_REQ_DDS_PENDING)){
-
- trigger_shit(failed_core);
-
- }
-
- /* Controller Failure and First Time i receive SIG_LEARN */
- if (idag_mask[failed_core] == failed_core && idag_mask[failed_core] != -1){
-
- /* Update idag_mask and idag_id_arr in any subcase */
- fprintf(log_file,"\t\tUpdating idag_mask and idag_id_arr with new controller %d... ",received_value);
- for (k = 0; k < X_max*Y_max; k++){
- if (idag_mask[k] == failed_core)
- idag_mask[k] = received_value;
- }
-
- for (k = 0; k < num_idags; k++){
- if (idag_id_arr[k] == failed_core)
- idag_id_arr[k] = received_value;
- }
- idag_mask[failed_core] = -1;
- fprintf(log_file,"DONE\n\n");
-
- fprintf(log_file,"\t\tMy interactions with the new controller are:\n");
- i = 0;
- tmp_inter = core_inter_head[received_value];
- while (tmp_inter != NULL && node_id != received_value)
- {
- fprintf(log_file, "\t\t\t%d. %s...\n",i,inter2string(tmp_inter->type));
- tmp_inter = tmp_inter->next;
- i++;
- }
-
-
- /***** I am a newly elected controller *****/
- coworkers_list *tmp_cowork_list;
- int num_of_coworkers;
- if (node_id == received_value){
- if (tmp_cowork_list != NULL)
- FOR_MY_COWORKERS_LIST num_of_coworkers++;
- printf("I am the new controller : %d -- Current state : %s - %d!\n", received_value,id2string(state),num_of_coworkers);
- fprintf(log_file, "\t\tI am the new controller : %d -- Current state : %s!\n", received_value,id2string(state));
-
-
- my_idag = -1;
-
- /***** Create my cores list *****/
- if (my_cores != NULL){
- printf("my_cores list is not NULL...\n");
- fprintf(log_file,"\t\tmy_cores list is not NULL...\n");
- for (tmp_core_list = my_cores->next; tmp_core_list != NULL; tmp_core_list=tmp_core_list->next){
- printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
- fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
- free(my_cores);
- my_cores = tmp_core_list;
- }
- printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
- fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
- free(my_cores);
- my_cores = NULL;
- }
-
- if (my_cores == NULL){
- printf("\t\tCreating my_cores list... ");
- fprintf(log_file,"\t\tCreating my_cores list... ");
- my_cores = (core_list *) malloc(sizeof(core_list));
- my_cores_count = 0;
- if (my_cores != NULL){
- printf("%sSuccess!%s\n",KGRN,KNRM);
- my_cores_tail = my_cores;
- my_cores_count++;
- my_cores_tail->core_id = node_id;
- my_cores_tail->offered_to = -1;
- my_cores_tail->next = NULL;
- my_cores_tail->workload[0] = -1;
- my_cores_tail->workload[1] = -1;
- printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
- fprintf(log_file,"\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
- for (i = 0; i < X_max*Y_max; i++){
- if (idag_mask[i] == node_id && i != node_id){
- my_cores_tail->next = (core_list *) malloc(sizeof(core_list));
- if (my_cores_tail->next != NULL){
- my_cores_tail = my_cores_tail->next;
- my_cores_tail->next = NULL;
- my_cores_tail->core_id = i;
- my_cores_tail->offered_to = -1;
- printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
- fprintf(log_file,"\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
- my_cores_count++;
- }else printf("--%d-- error allocating memory for my_cores\n",node_id);
- }
- }
- }else{
- printf("--%d-- error allocating memory for my_cores\n",node_id);
- fprintf(log_file, "--%d-- error allocating memory for my_cores\n",node_id);
- }
- }
-
- /***** Create my DDS List *****/
- if (DDS == DDS_tail && DDS != NULL){
- printf("\t\tDDS = DDS_tail with value: \n\t\t\tAgent_id : %d | Cores in cluster : %d\n",DDS->agent_id,DDS->num_of_cores);
- printf("\t\tReconfiguring DDS...\n");
- fprintf(log_file,"\t\tDDS = DDS_tail with value: \n\t\t\tAgent_id : %d | Cores in cluster : %d\n",DDS->agent_id,DDS->num_of_cores);
- fprintf(log_file,"\t\tReconfiguring DDS...\n");
- free(DDS);
- DDS = NULL;
- DDS_count = 0;
- }
-
- if (DDS == NULL){
- printf("\t\tCreating DDS list... ");
- fflush(stdout);
- DDS = (DDS_list *) malloc(sizeof(DDS_list));
- DDS_count = 0;
- if (DDS != NULL){
- printf("%sSuccess!%s\n",KGRN,KNRM);
- fflush(stdout);
- DDS->agent_id = node_id;
- DDS->next = NULL;
- DDS_tail = DDS;
- DDS_count++;
- DDS->num_of_cores = my_cores_count;
- printf("\t\t\tAdded Agent_id : %d | Cores in cluster : %d\n\n",DDS->agent_id, DDS->num_of_cores);
- fprintf(log_file,"\t\t\tAdded Agent_id : %d | Cores in cluster : %d\n\n",DDS->agent_id, DDS->num_of_cores);
- }else{
- printf("--%d-- error allocating memory for my_cores\n",node_id);
- fprintf(log_file,"--%d-- error allocating memory for my_cores\n",node_id);
- }
- }else{
- DDS_list *tmp_dds;
- printf("\t\tDDS list of %d:\n",node_id);
- fprintf(log_file,"\t\tDDS list of %d:\n",node_id);
-
- tmp_dds = DDS;
- while (tmp_dds != NULL){
- printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id, tmp_dds->num_of_cores);
- fprintf(log_file,"\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id, tmp_dds->num_of_cores);
- tmp_dds = tmp_dds->next;
- }
- printf("\n\n");
- }
-
- paxos_state = NEW_IDAG;
- fprintf(log_file,"Changed Paxos State to %s\n", id2string(paxos_state));
- }
- /***** I am in the same cluster as the new controller *****/
- else if ((my_idag != -1) && (my_idag == failed_core)){
- fprintf(log_file, "\t\t I am in the same cluster as %d. My new controller is %d\n", sender_id, received_value);
- my_idag = received_value;
-
- if (failed_interaction == 1) {
- fprintf(log_file,"I have to resend signal SIG_IDAG_FIND_IDAGS to new controller\n");
- if (core_inter_head[my_idag] == NULL){
- core_inter_head[my_idag] = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[my_idag] = core_inter_head[my_idag];
- } else {
- core_inter_tail[my_idag]->next = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[my_idag] = core_inter_tail[my_idag]->next;
- }
- selfopt_r = (int) (1.5 * (X_max / num_idags_x));
- core_inter_tail[my_idag]->type = IDAG_FIND_IDAGS_PENDING;
- core_inter_tail[my_idag]->data.reg.C = node_id-1;
- core_inter_tail[my_idag]->data.reg.r = selfopt_r;
- core_inter_tail[my_idag]->next = NULL;
- if (core_inter_head[my_idag]->next == NULL) {
- paxos_node_stats.msg_count++;
- scc_kill(my_idag, SIG_IDAG_FIND_IDAGS, core_inter_head[my_idag]);
- } else {
- fprintf(log_file, "paxos_signal_handlers.c: Did not send idag_find_idags with interaction %s no2 %s\n",inter2string(core_inter_head[my_idag]->type),inter2string(core_inter_head[my_idag]->next->type));
- }
- if (selfopt_time_rem != -1) {
- selfopt_time_rem = my_gettimer();
- if (selfopt_time_rem > 0)
- my_settimer(0);
- }
- if (pending_state == WORKING_NODE) {
- fprintf(log_file, "I change to working idle init\n");
- state = WORKING_NODE;
- pending_state = IDLE_INIT_MAN;
- }else{
- state = IDLE_INIT_MAN;
- }
- }
-
- if (failed_interaction == 2){
- if (core_inter_head[my_idag] == NULL){
- core_inter_head[my_idag] = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[my_idag] = core_inter_head[my_idag];
- } else {
- core_inter_tail[my_idag]->next = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[my_idag] = core_inter_tail[my_idag]->next;
- }
- selfopt_r = (int) (1.5 * (X_max / num_idags_x));
- core_inter_tail[my_idag]->type = SELFOPT_IDAG_FIND_IDAGS_PENDING;
- core_inter_tail[my_idag]->data.reg.C = node_id;
- core_inter_tail[my_idag]->data.reg.r = selfopt_r;
- core_inter_tail[my_idag]->next = NULL;
- if (core_inter_head[my_idag]->next == NULL){
- paxos_node_stats.msg_count++;
- scc_kill(my_idag, SIG_IDAG_FIND_IDAGS, core_inter_head[my_idag]);
- } else {
- fprintf(log_file,"paxos_signal_handlers.c: Did not send sig_find_idags with inter1 = %s, inter2 = %s\n",inter2string(core_inter_head[my_idag]->type),inter2string(core_inter_head[my_idag]->next->type));
- }
-
- if (selfopt_interval != MAX_SELF_OPT_INTERVAL_MS){
- selfopt_interval = 2 * selfopt_interval;
- }else{
- selfopt_interval = -1;
- }
- new_state = IDLE_AGENT_WAITING_OFF;
- }
- }
-
- /***** I am a controller so i have to update idag_id_arr and reply with LEARN_ACK_CONTR *****/
- /*else if (im_controller() == 1){
- for (k = 0; k < num_idags; k++)
- if (idag_id_arr[k] == failed_core)
- idag_id_arr[k] = received_value;
- else if (idag_id_arr[k] == node_id)
- tmp_inter_list.data.controller_index = k;
-
- fprintf(log_file, "\t\tI am a Controller. Changed controller %d with %d\n",failed_core,sender_id);
-
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = LEARN_ACK_CONTR;
- scc_kill(received_value,SIG_LEARN_ACK_CONTR,&tmp_inter_list);
-
- fprintf(log_file, "\t\tUpdated idag_id_arr\n");
- }*/
-
- /* I am a manager so i have to reply with ADD_AGENT_TO_DDS. In addition if the new controller was a worker of mine i remove him from my_cores list */
- if (im_manager() == 1){
- if (new_state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT_WAITING_OFF;
- counter = 0;
- FOR_MY_CORES_LIST{
- cluster_idag = idag_mask[tmp_core_list->core_id];
- if (cluster_idag == received_value){
- fprintf(log_file,"\t\tI am manager %d and my core %d utilizes in cluster with idag %d\n", node_id, tmp_core_list->core_id,cluster_idag);
- /* if the new controller was a worker of mine i do not send him */
- if (tmp_core_list->core_id != received_value)
- tmp_inter_list.data.workers_info[++counter] = tmp_core_list->core_id;
- }
- }
-
- if (counter > 0){
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = ADD_TO_DDS;
- tmp_inter_list.data.workers_info[0] = counter;
- fprintf(log_file,"\t\tNUMBER OF WORKERS: %d\n", counter);
- scc_kill(received_value,SIG_ADD_TO_DDS,&tmp_inter_list);
- }
-
- /***** If the new controller was a worker of mine i remove him from my_cores list and reappoing workload *****/
- tmp_core_list = my_cores->next;
- tmp_core_list_prev = my_cores;
- while (tmp_core_list != NULL){
- if (tmp_core_list->core_id == received_value){
- fprintf(log_file,"\t\tNew controller was a worker of mine! I remove him from my cores_list!\n");
- my_cores_count--;
- tmp_core_list_prev->next = tmp_core_list->next;
- pending_workload[0] = tmp_core_list->workload[0];
- pending_workload[1] = tmp_core_list->workload[1];
- fprintf(log_file,"Pending workload of new controller was: %d %d\n", pending_workload[0], pending_workload[1]);
- free(tmp_core_list);
-
- if (pending_workload[0] != -1 || pending_workload[1] != -1){
- //active_working_cores--;
- tmp_core_list = my_cores->next;
- fprintf(log_file,"\t\tI am reassigning the workload!\n");
- fprintf(log_file,"\t\t-------- CURRENT WORKLOADS --------\n");
- while (tmp_core_list != NULL){
- fprintf(log_file,"\t\t%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- printf("\t\t%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- tmp_core_list = tmp_core_list->next;
- }
- fprintf(log_file,"\t\t-----------------------------------\n");
- tmp_core_list = my_cores->next;
- while (tmp_core_list != NULL){
- if ((tmp_core_list->workload[0] == -1) && (tmp_core_list->workload[1] == -1)){
- one_core = tmp_core_list->core_id;
- fprintf(log_file,"\t\tpaxos_signal_handlers.c : I have pending workload %d | %d\n",pending_workload[0],pending_workload[1]);
- fprintf(log_file,"\t\tpaxos_signal_handlers.c : I am assigning workload to %d\n",one_core);
- if (core_inter_head[one_core] == NULL){
- core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[one_core] = core_inter_head[one_core];
- } else {
- core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[one_core] = core_inter_tail[one_core]->next;
- }
-
- core_inter_tail[one_core]->type = APPOINT_WORK_NODE;
- core_inter_tail[one_core]->data.work_bounds[0] = pending_workload[0];
- core_inter_tail[one_core]->data.work_bounds[1] = pending_workload[1];
- fprintf(app_log_file,"%d (%d, %d), ",one_core,core_inter_tail[one_core]->data.work_bounds[0],core_inter_tail[one_core]->data.work_bounds[1]);
- core_inter_tail[one_core]->next = NULL;
- if (core_inter_head[one_core]->next == NULL) {
- paxos_node_stats.msg_count++;
- scc_kill(one_core, SIG_APPOINT_WORK, core_inter_head[one_core]);
- } else {
- fprintf(log_file,"I am doing smth else with my working node %d in init inter1 = %d inter2 = %d\n",one_core,core_inter_head[one_core]->type,core_inter_head[one_core]->next->type);
- printf("ASDASDASDASDASDAS\n");
- }
- pending_workload[0] = -1;
- pending_workload[1] = -1;
- break;
- }
- tmp_core_list = tmp_core_list->next;
- }
- break;
- }
- }else{
- tmp_core_list = tmp_core_list->next;
- tmp_core_list_prev = tmp_core_list_prev->next;
- }
- }
- }
- /****************************************************************/
- /******************* Case where manager failed ******************/
- /****************************************************************/
- }
- else{
- /***** I am the newly elected manager *****/
- if (node_id == received_value && idag_mask[failed_core] != -1){
- idag_mask[failed_core] = -1;
- printf("I am the new manager : %d -- Current state : %s!\n", received_value,id2string(state));
-
- if (my_cores != NULL){
- printf("my_cores list is not NULL...\n");
- for (tmp_core_list = my_cores->next; tmp_core_list != NULL; tmp_core_list=tmp_core_list->next){
- printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
- free(my_cores);
- my_cores = tmp_core_list;
- }
- printf("\t\tCore_id : %d | Offered_to : %d ... %sREMOVED%s\n",my_cores->core_id,my_cores->offered_to,KRED,KNRM);
- free(my_cores);
- my_cores = NULL;
- }
-
- if (my_cores == NULL){
- printf("\t\tCreating my_cores list... ");
- fflush(stdout);
- my_cores = (core_list *) malloc(sizeof(core_list));
- my_cores_count = 0;
- if (my_cores != NULL){
- printf("%sSuccess!%s\n",KGRN,KNRM);
- my_cores_tail = my_cores;
- my_cores_tail->core_id = node_id;
- my_cores_tail->offered_to = -1;
- my_cores_tail->workload[0] = -1;
- my_cores_tail->workload[1] = -1;
- my_cores_tail->next = NULL;
- my_cores_count++;
- printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
-
- FOR_MY_COWORKERS_LIST{
- my_cores_tail->next = (core_list *) malloc(sizeof(core_list));
- if (my_cores_tail->next != NULL){
- my_cores_tail = my_cores_tail->next;
- my_cores_tail->next = NULL;
- my_cores_tail->core_id = tmp_cowork_list->core_id;
- my_cores_tail->offered_to = -1;
- my_cores_tail->workload[0] = -1;
- my_cores_tail->workload[1] = -1;
- printf("\t\t\tAdded Core_id : %d | Offered_to : %d\n",my_cores_tail->core_id,my_cores_tail->offered_to);
- my_cores_count++;
- }else printf("--%d-- error allocating memory for my_cores\n",node_id);
- }
-
- }else printf("--%d-- error allocating memory for my_cores\n",node_id);
- }
-
- /**** I have to send SIG_ADD_TO_DDS to the controllers of my workers ****/
- for (i = 0; i < X_max*Y_max; i++){
- if (idag_mask[i] == i){
- counter = 0;
- cluster_idag = idag_mask[i];
- FOR_MY_CORES_LIST{
- if (cluster_idag == idag_mask[tmp_core_list->core_id]){
- fprintf(log_file,"\t\tI am manager %d and my core %d utilizes in cluster with idag %d\n", node_id, tmp_core_list->core_id,cluster_idag);
- tmp_inter_list.data.workers_info[++counter] = tmp_core_list->core_id;
- }
- }
-
- if (counter > 0){
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = ADD_TO_DDS;
- tmp_inter_list.data.workers_info[0] = counter;
- fprintf(log_file,"\t\tNUMBER OF WORKERS: %d\n", counter);
- scc_kill(i,SIG_ADD_TO_DDS,&tmp_inter_list);
- }
- }
- }
-
- state = AGENT_INIT_STATE;
- paxos_state = NEW_AGENT;
- printf("I was working for app: %d\n", worker_app_id);
- my_app.id = worker_app_id;
- my_app.num_of_cores = my_cores_count-1;
- find_app_info();
- #ifndef ARTIFICIAL_APPS_SIM
- printf("Found array size = %d\n", my_app.array_size);
- fprintf(log_file,"Found array size = %d\n", my_app.array_size);
- #endif
- printf("Found remaining workload = %d\n", my_app.workld);
- fprintf(log_file,"Found remaining workload = %d\n", my_app.workld);
- printf("App number of cores = %d\n", my_app.num_of_cores);
- fprintf(log_file,"App number of cores = %d\n", my_app.num_of_cores);
- FOR_MY_CORES_LIST{
- fprintf(log_file,"\t\t\tWorker_id : %d | Workload : %d %d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- printf("\t\t\tWorker_id : %d | Workload : %d %d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- }
-
- /***** I am controller i have to remove the failed_core from my DDS and cores list *****/
- }else if (my_idag == -1){
- if (idag_mask[failed_core] == node_id){
- printf("--%d-- I received SIG_LEARN from %d\n",node_id,sender_id);
- tmp_core_list = my_cores->next;
- tmp_core_list_prev = my_cores;
- while (tmp_core_list != NULL){
- if (tmp_core_list->core_id == failed_core){
- my_cores_count--;
- tmp_core_list_prev->next = tmp_core_list->next;
- free(tmp_core_list);
- break;
- }
- tmp_core_list = tmp_core_list->next;
- tmp_core_list_prev = tmp_core_list_prev->next;
- }
-
- printf("\t\tUpdated my_cores list:\n");
- fprintf(log_file,"\t\tUpdated my_cores list:\n");
-
- counter = 0;
- FOR_MY_CORES_LIST{
- if (tmp_core_list->offered_to == failed_core){
- tmp_core_list->offered_to = -1;
- counter++;
- }
- printf("\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
- fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
- }
- printf("\t\t\tmy_cores_count = %d\n",my_cores_count);
-
- tmp_dds = DDS->next;
- tmp_prev_dds = DDS;
- while (tmp_dds != NULL){
- if (tmp_dds->agent_id == failed_core){
- fprintf(log_file,"\t\t Removed failed core %d from DDS\n",tmp_dds->agent_id);
- tmp_prev_dds->next = tmp_dds->next;
- if (tmp_dds->next == NULL){
- DDS_tail = tmp_prev_dds;
- }
- DDS_count--;
- free(tmp_dds);
- DDS->num_of_cores = DDS->num_of_cores + counter;
- break;
- }else{
- tmp_prev_dds = tmp_dds;
- tmp_dds = tmp_dds->next;
- }
- }
-
- printf("\t\tUpdated DDS list:\n");
- fprintf(log_file,"\t\tUpdated DDS list:\n");
- FOR_MY_DDS_LIST{
- printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
- fprintf(log_file,"\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
- }
- printf("\t\t\tDDS_count = %d\n",DDS_count);
- }
-
- /***** I was working for the failed manager *****/
- }else if (cur_agent.my_agent == failed_core){
- cur_agent.my_agent = -1;
- }
- }
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "[%d:%d:%d]: I ended sig_LEARN_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
- //exit(0);
- return;
- }
- void sig_REINIT_APP_handler(int sender_id){
-
- inter_list *tmp_inter_list;
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_REINIT_APP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
- printf("Received SIG_REINIT_APP from %d\n", sender_id);
- //int i, data_array_local[LINE_SIZE];
- for (tmp_inter_list = core_inter_head[0]; tmp_inter_list != NULL; tmp_inter_list = tmp_inter_list->next)
- if (tmp_inter_list->type == INIT_APP) {
- printf("i not null\n");
- break;
- }
-
- if (core_inter_head[12] == NULL){
- core_inter_head[12] = (inter_list *)malloc(sizeof(inter_list));
- core_inter_tail[12] = core_inter_head[12];
- }else{
- core_inter_tail[12]->next = (inter_list *)malloc(sizeof(inter_list));
- core_inter_tail[12] = core_inter_tail[12]->next;
- }
- core_inter_tail[12]->next = NULL;
- core_inter_tail[12]->type = INIT_APP;
- core_inter_tail[12]->data.new_app.id = sig_read_ar[2];
- core_inter_tail[12]->data.new_app.num_of_cores = sig_read_ar[3];
- core_inter_tail[12]->data.new_app.workld = sig_read_ar[4];
- #ifndef ARTIFICIAL_APPS_SIM
- core_inter_tail[12]->data.new_app.array_size = sig_read_ar[5];
- #endif
- if (core_inter_head[12]->next == NULL){
- scc_kill(12, SIG_INIT_APP, core_inter_head[12]);
- }
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "[%d:%d:%d]: I ended sig_REINIT_APP_handler with sender = %d state=%s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
- return;
- }
- void find_app_info(){
- //int sz;
- char app_log_file_name[64];
- char buffer[64];
- int temp;
- strcpy(app_log_file_name, "../");
- strcat(app_log_file_name,local_scen_directory);
- strcat(app_log_file_name, "/");
- strcat(app_log_file_name,local_scen_num);
- strcat(app_log_file_name,"/app_logs/");
- strcat(app_log_file_name, itoa(worker_app_id));
- strcat(app_log_file_name, ".txt");
- printf("Trying to open file %s... ", app_log_file_name);
- if ((app_log_file = fopen(app_log_file_name, "r")) == NULL){
- printf("%sError%s\n",KRED,KNRM);
- printf("paxos_signal_handlers.c : Cannot open input file with file path = %s ",app_log_file_name);
- perror("open app_log_file");
- }else{
- printf("%sSuccess%s\n",KGRN,KNRM);
- }
-
- while (fscanf(app_log_file,"%s", buffer) != EOF){
- #ifndef ARTIFICIAL_APPS_SIM
- if (strcmp(buffer,"array_size") == 0){
- fscanf(app_log_file,"%s",buffer);
- fscanf(app_log_file,"%d", &temp);
- my_app.array_size = temp;
- }
- #else
- /* FIXME must locate var and A */
- #endif
- if (strcmp(buffer,"workload") == 0){
- fscanf(app_log_file,"%s",buffer);
- fscanf(app_log_file,"%d",&temp);
- my_app.workld = temp;
- }
- }
- fclose(app_log_file);
- }
- void rollback(){
-
- offer_list *tmp_offer_list;
- inter_list tmp_inter_list;
- core_list *tmp_core_list;
- int core_idag;
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
-
-
- tmp_inter_list.next = NULL;
-
- fprintf(log_file, "Rolling back... %s\n", id2string(state));
- //If i am the new controller and i have an app to initialize i send SIG_REINIT_APP to 0.
- if ((paxos_state == NEW_IDAG) && (state == INIT_MANAGER || state == INIT_MANAGER_SEND_OFFERS || state == IDLE_INIT_MAN || state == INIT_MAN_CHK_OFFERS || pending_state == INIT_MANAGER
- || pending_state == INIT_MANAGER_SEND_OFFERS || pending_state == INIT_MAN_CHK_OFFERS || pending_state == AGENT_INIT_CHK_OFFERS || pending_state == IDLE_INIT_MAN
- || pending_state == IDLE_INIT_IDLE_AGENT || pending_state == IDLE_INIT_AGENT_SELFOPT || pending_state == INIT_CHK_OFFERS_IDLE_AGENT || pending_state == INIT_CHK_OFFERS_SELFOPT)){
-
- printf("i am the new controller and i have an app to initialize i send SIG_REINIT_APP to 0\n");
- if (init_man_offers != NULL){
- tmp_offer_list = init_man_offers;
- while (tmp_offer_list != NULL){
- *tmp_offer_list->answer = 0;
- tmp_offer_list = tmp_offer_list->next;
- }
- }
-
- while (init_man_offers != NULL){
- if (core_inter_head[init_man_offers->sender] != NULL) {
- if (core_inter_head[init_man_offers->sender]->type == REP_AGENT_OFFER_PENDING){
- core_inter_head[init_man_offers->sender]->type = REP_AGENT_OFFER_SENT;
- //kill(pid_num[init_man_offers->sender], SIG_REP_OFFERS);
- paxos_node_stats.msg_count++;
- scc_kill(init_man_offers->sender, SIG_REP_OFFERS, core_inter_head[init_man_offers->sender]);
- //my_stats.msg_count++;
- //my_stats.distance += distance(node_id,init_man_offers->sender);
- } else {
- printf("gamietai b = %d",init_man_offers->sender);
- fprintf(log_file,"gamietai b = %d",init_man_offers->sender);
- }
- tmp_offer_list = init_man_offers;
- init_man_offers = init_man_offers->next;
- free(tmp_offer_list);
- }
- }
- fprintf(log_file, "Replied to all my offers negatively\n");
- tmp_inter_list.type = REINIT_APP;
- tmp_inter_list.data.reappointed_app = init_app;
- scc_kill(0,SIG_REINIT_APP,&tmp_inter_list);
- }
- /* If i were a manager i have to inform that i am no longer manager and also add other managers to my DDS */
- else if (im_manager() == 1){
-
- printf("New controller was a manager before paxos! Remove him from dds lists and create his dds list...\n");
- //TODO remove from dds and add managers to dds
- tmp_core_list = my_cores;
- while (tmp_core_list != NULL){
- //idag_mask[tmp_core_list->core_id] -> idag id
-
- core_idag = idag_mask[tmp_core_list->core_id];
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = REMOVE_FROM_DDS;
- scc_kill(core_idag, SIG_REMOVE_FROM_DDS, &tmp_inter_list);
-
- tmp_core_list = tmp_core_list->next;
- }
- }else
- //printf("New controller was an idle core before paxos! Just create his dds list...\n");
- //TODO add managers to dds
- //my_cores = NULL;
- return;
- }
- void sig_ADD_TO_DDS_handler(int sender_id, int *inc_cnt, int cur_index_top){
-
- DDS_list *tmp_dds = NULL;
- core_list *tmp_core_list;
- int num_of_workers = 0, flag = 0, current = 0;
-
- handler_Enter(sender_id,"sig_ADD_TO_DDS_handler");
-
- num_of_workers = sig_read_ar[2];
- fprintf(log_file,"--%d-- [%d:%d:%d]:I received SIG_ADD_TO_DDS from %d with num_of_workers = %d\n",node_id,cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,num_of_workers);
-
- if (num_of_workers > 5){
- *inc_cnt = *inc_cnt + 1;
-
- #ifdef PLAT_SCC
- error = RCCE_get((t_vcharp)(&sig_read_ar[LINE_SIZE]), (t_vcharp)(&sig_array[(cur_index_top+1)*LINE_SIZE]), LINE_SIZE * sizeof(int), node_id);
- if (error != RCCE_SUCCESS) {
- RCCE_error_string(error, error_str, &str_len);
- fprintf(log_file,"I got an error in get data in sig_ADD_CORES_DDS_handler from %d with descr %s\n",sender_id,error_str);
- fflush(log_file);
- }
- #else
- new_RCCE_get(sig_read_ar, sig_array, cur_index_top, LINE_SIZE, node_id);
- #endif
- }
-
- if (DDS == NULL){
- DDS_count=0;
- DDS = (DDS_list *) malloc(sizeof(DDS_list));
- DDS->agent_id = node_id;
- DDS->next = NULL;
- DDS_tail = DDS;
- DDS_count++;
- flag = 0;
- }else{
- FOR_MY_DDS_LIST{
- if (tmp_dds->agent_id == sender_id){
- fprintf(log_file,"\t\t%d is already in my DDS. %d->num_of_cores++ && DDS->num_of_cores--\n",sender_id,sender_id);
- DDS->num_of_cores--;
- tmp_dds->num_of_cores++;
- break;
- flag = 1;
- }
- }
- }
-
- if (flag == 0){ /* Sender was not in my DDS */
- fprintf(log_file,"\t\t%d is not in my DDS. DDS_count++\n",sender_id);
- DDS_tail->next = (DDS_list *)malloc(sizeof(DDS_list));
- DDS_tail = DDS_tail->next;
-
- DDS_tail->next = NULL;
- DDS_tail->agent_id = sender_id;
- DDS_tail->num_of_cores = num_of_workers;
- DDS->num_of_cores = DDS->num_of_cores - num_of_workers;
- DDS_count++;
- }
-
- fprintf(log_file, "\t\t%d utilizes %d cores in my cluster\n",sender_id, num_of_workers);
- while (num_of_workers > 0){
- current = sig_read_ar[2+num_of_workers];
- FOR_MY_CORES_LIST{
- if (tmp_core_list->core_id == current){
- fprintf(log_file,"\t\tChanged %d->offered_to = %d\n",tmp_core_list->core_id,sender_id);
- tmp_core_list->offered_to = sender_id;
- }
- }
- num_of_workers--;
- }
-
- printf("\t\tUpdated my_cores list:\n");
- fprintf(log_file,"\t\tUpdated my_cores list:\n");
-
- FOR_MY_CORES_LIST{
- printf("\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
- fprintf(log_file,"\t\t\tCore_id : %d | Offered_to : %d\n",tmp_core_list->core_id,tmp_core_list->offered_to);
- }
-
- printf("\t\tUpdated DDS list:\n");
- fprintf(log_file,"\t\tUpdated DDS list:\n");
- FOR_MY_DDS_LIST{
- printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
- fprintf(log_file,"\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id,tmp_dds->num_of_cores);
- }
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n[%d:%d:%d]: I ended sig_ADD_TO_DDS_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
- return;
- }
- void sig_REMOVE_FROM_DDS_handler(int sender_id){
-
- core_list *tmp_core_list;
- DDS_list *tmp_dds, *tmp_prev_dds;
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_REMOVE_FROM_DDS_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
- tmp_core_list = my_cores;
- while (tmp_core_list != NULL){
- if (tmp_core_list->offered_to == sender_id){
- fprintf(log_file,"\t\tChanged %d->offered_to to -1\n",tmp_core_list->core_id);
- tmp_core_list->offered_to = -1;
- }
- tmp_core_list = tmp_core_list->next;
- }
-
- tmp_dds = DDS->next;
- tmp_prev_dds = DDS;
- while (tmp_dds != NULL){
- if (tmp_dds->agent_id == sender_id){
- fprintf(log_file,"\t\t Removed %d from DDS\n",tmp_dds->agent_id);
- tmp_prev_dds->next = tmp_dds->next;
- if (tmp_dds->next == NULL){
- DDS_tail = tmp_prev_dds;
- }
- DDS_count--;
- free(tmp_dds);
- break;
- }else{
- tmp_prev_dds = tmp_dds;
- tmp_dds = tmp_dds->next;
- }
- }
-
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "[%d:%d:%d]: I ended sig_REMOVE_FROM_DDS_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
- return;
- }
- void sig_CONTR_TO_handler(int sender_id){
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_CONTR_TO_handler with sender=%d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
- faulty_core = my_idag;
- if (paxos_state != PAXOS_ACTIVE && paxos_state != NEW_AGENT && paxos_state != NEW_IDAG)
- sig_PAXOS_INIT_handler();
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "[%d:%d:%d]: I ended sig_CONTR_TO_handler with sender = %d state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
- return;
- }
- void sig_HEARTBEAT_REQ_handler(int sender_id){
-
- inter_list tmp_inter_list;
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- //fprintf(log_file, "[%d:%d:%d]: I entered sig_HEARTBEAT_REQ_handler with sender=%d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
-
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = HEARTBEAT_REP;
- scc_kill(sender_id,SIG_HEARTBEAT_REP,&tmp_inter_list);
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- return;
- }
- void sig_HEARTBEAT_REP_handler(int sender_id){
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- //fprintf(log_file, "[%d:%d:%d]: I entered sig_HEARTBEAT_REP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
- alive[sender_id] = 1;
- suspected[sender_id] = 0;
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- //fprintf(log_file, "[%d:%d:%d]: I ended sig_TERMINATE_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
- return;
- }
- void sig_PFD_TIMER_handler(int signo, siginfo_t *info, void *context){
- int i, j, failed_core, pending_workload[2], one_core;
- DDS_list *tmp_dds, *tmp_dds_prev;
- inter_list tmp_inter_list;
- core_list *tmp_core_list, *tmp_core_list_prev;
-
- if (first_time == 0){
- printf("%d oh yeah\n",node_id);
- first_time = 1;
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = 2;
- its.it_value.tv_nsec = 0;//100000000;
- if (timer_settime(pfd_timer, 0, &its, NULL) == -1){
- printf("-- %d --", node_id);
- fflush(stdout);
- perror("paxos_signal_handlers.c : timer_settime error9");
- }else {
- fprintf(log_file,"Updated timer!\n");
- }
- return;
- }
-
- signals_disable();
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_PFD_TIMER_handler state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
- //printf("[%d:%d:%d]: -%d- I entered sig_PFD_TIMER_handler state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,node_id, id2string(state));
- for (i = 0; i < X_max*Y_max; i++){
- if (alive[i] == 0){
- suspected[i]++;
- }
- #if defined(PFD) && defined(BASIC_PAXOS)
- /* The Perfect Failure Detectors sends a SIG_HEARTBEAT_REQ each time the timer explodes and waits for a SIG_HEARTBEAT_REP*/
- /* If he doesn't receive a reply until the timer reexplodes then the node is detected as faulty */
- if (alive[i] != 1 && alive[i] != -1 && faulty_core == -1 && i != node_id && idag_mask[i] == idag_mask[node_id])
- #elif defined(tPFD) && defined(BASIC_PAXOS) //tPFD
- /* The tweaked Perfect Failure Detectors only suspects a core if he sends him a signal and doesn't receive a reply in some period of time*/
- if (alive[i] == 0 && suspected[i] == 2 && i != node_id && idag_mask[i] == idag_mask[node_id])
- #else
- if (alive[i] == -5)
- #endif
- {
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- printf("-- %d -- I detected %d as faulty at [%d:%d:%d]!!\n", node_id, i,cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
- fprintf(log_file, "-- %d -- I detected %d as faulty at [%d:%d:%d]!!\n", node_id, i,cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
- fprintf(log_file, "-- %d -- I detected %d as faulty!!\n", node_id, i);
- failed_core = i;
- faulty_core = i;
- suspected[failed_core] = -1;
- alive[failed_core] = -1;
-
- #if defined(CONTROLLER) && defined(PLAT_LINUX)
- int semvalue = 0;
-
- sem_getvalue(&flag_data_written[failed_core],&semvalue);
- if (semvalue == 0){
- /*I am locked*/
- printf("--%d-- I unlocked semaphore for node %d\n",node_id,failed_core);
- sem_post(&flag_data_written[failed_core]);
- sem_getvalue(&flag_data_written[failed_core],&semvalue);
- }
- #endif
- /*My controller timed out */
- if (failed_core == my_idag){
- sig_PAXOS_INIT_handler();
- /* Controller in an other cluster timed out */
- }else if (idag_mask[failed_core] == failed_core){
- printf("--%d-- Other controller TIMED OUT\n", node_id);
- for (j = 0; j < X_max*Y_max; j++){
- if (idag_mask[j] == failed_core && j != failed_core){
- if (state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT;
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = CONTR_TO;
- scc_kill(j,SIG_CONTR_TO,&tmp_inter_list);
- }
-
- }
- /* Worker or manager timed out */
- }else{
- /*I am controller.
- *If failed node is inside my cluster
- *i have to remove the failed node from my cores and DDS list */
- if ((idag_mask[node_id] == node_id) && (idag_mask[failed_core] == node_id)){
- tmp_core_list = my_cores->next;
- tmp_core_list_prev = my_cores;
- while (tmp_core_list != NULL){
- if (tmp_core_list->core_id == failed_core){
- my_cores_count--;
- tmp_core_list_prev->next = tmp_core_list->next;
- free(tmp_core_list);
- break;
- }else{
- tmp_core_list_prev = tmp_core_list;
- tmp_core_list = tmp_core_list->next;
- }
- }
-
- tmp_dds = DDS->next;
- tmp_dds_prev = DDS;
- while (tmp_dds != NULL){
- if (tmp_dds->agent_id == failed_core){
- tmp_dds_prev->next = tmp_dds->next;
- free(tmp_dds);
- break;
- }else{
- tmp_dds_prev = tmp_dds;
- tmp_dds = tmp_dds->next;
- }
- }
-
- }
- /*I am manager.
- *I have to check if the failed node is my worker
- *If yes i have to appoint work to a new node.*/
- if (im_manager()){
-
- printf("-- %d -- I am manager of an application.\n",node_id);
- tmp_core_list = my_cores->next;
- tmp_core_list_prev = my_cores;
- while (tmp_core_list != NULL){
- /* I am the manager of the failed worker. I reappoint the work to another core. */
- if (tmp_core_list->core_id == failed_core){
- my_cores_count--;
-
- /* I am the manager of the failed worker so i remove him from my core list */
- tmp_core_list_prev->next = tmp_core_list->next;
-
- fprintf(log_file,"I am the manager of the failed worker %d! I removed him from my cores list\n",tmp_core_list->core_id);
- printf("-- %d --I am the manager of the failed worker %d! I removed him from my cores list\n",node_id, tmp_core_list->core_id);
- //one_core = tmp_core_list->core_id;
- pending_workload[0] = tmp_core_list->workload[0];
- pending_workload[1] = tmp_core_list->workload[1];
- printf("Pending workload of faulty core: %d %d\n", pending_workload[0], pending_workload[1]);
- /*else{
- reappoint = FALSE;
- }*/
- free(tmp_core_list);
-
-
- tmp_core_list = my_cores->next;
- fprintf(log_file,"I am reassigning the workload!\n");
- fprintf(log_file,"-------- CURRENT WORKLOADS --------\n");
- printf("-------- CURRENT WORKLOADS --------\n");
- while (tmp_core_list != NULL){
- fprintf(log_file,"%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- printf("%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- tmp_core_list = tmp_core_list->next;
- }
- printf("-----------------------------------\n");
- fprintf(log_file,"-----------------------------------\n");
-
- tmp_core_list = my_cores->next;
- while (tmp_core_list != NULL){
- if ((tmp_core_list->workload[0] == -1) && (tmp_core_list->workload[1] == -1)){
- one_core = tmp_core_list->core_id;
- if (core_inter_head[one_core] == NULL){
- core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[one_core] = core_inter_head[one_core];
- } else {
- core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[one_core] = core_inter_tail[one_core]->next;
- }
-
- core_inter_tail[one_core]->type = APPOINT_WORK_NODE;
- core_inter_tail[one_core]->data.work_bounds[0] = pending_workload[0];
- core_inter_tail[one_core]->data.work_bounds[1] = pending_workload[1];
- fprintf(app_log_file,"%d (%d, %d), ",one_core,core_inter_tail[one_core]->data.work_bounds[0],core_inter_tail[one_core]->data.work_bounds[1]);
- core_inter_tail[one_core]->next = NULL;
- if (core_inter_head[one_core]->next == NULL) {
- paxos_node_stats.msg_count++;
- scc_kill(one_core, SIG_APPOINT_WORK, core_inter_head[one_core]);
- } else {
- fprintf(log_file,"I am doing smth else with my working node %d in init inter1 = %d inter2 = %d\n",one_core,core_inter_head[one_core]->type,core_inter_head[one_core]->next->type);
- }
- break;
- }
- tmp_core_list = tmp_core_list->next;
- }
- break;
- }else {
- tmp_core_list_prev = tmp_core_list;
- tmp_core_list = tmp_core_list->next;
- }
- }
- }else{
- /* My manager timed out */
- if (cur_agent.my_agent == failed_core){
- printf("--%d-- %d is my manager! I initiate a Paxos instance\n",node_id, failed_core);
- paxos_state = PAXOS_ACTIVE;
- sig_PAXOS_INIT_handler();
- }
- }
- }
- }
- #ifdef PFD
- else{
- if (alive[i] != -1 && i != node_id && idag_mask[i] == idag_mask[node_id]){
- alive[i] = 0;
- tmp_inter_list.type = HEARTBEAT_REQ;
- tmp_inter_list.next = NULL;
- scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
- }
- }
- #else
- else{
- if (alive[i] == 0 && i != node_id && idag_mask[i] == idag_mask[node_id]){
- tmp_inter_list.type = HEARTBEAT_REQ;
- tmp_inter_list.next = NULL;
- scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
- }
- }
- #endif
- }
- fprintf(log_file,"\t\tNodes in my cluster: ");
- for (i = 0; i < X_max*Y_max; i++){
- if (idag_mask[i] == idag_mask[node_id] && i != node_id) fprintf(log_file,"%d, ",i);
- }
- fprintf(log_file,"\n");
-
- fprintf(log_file,"\t\tI have received a signal from: ");
- for (i = 0; i < X_max*Y_max; i++){
- if (alive[i] == 1) fprintf(log_file,"%d, ",i);
- }
- fprintf(log_file,"\n");
-
- fprintf(log_file,"\t\tSent HEARTBEAT_REQ to:");
- for (i = 0; i < X_max*Y_max; i++){
- if (alive[i] == 0 && i != node_id && idag_mask[i] == idag_mask[node_id]){
- fprintf(log_file,"%d, ",i);
- }
- alive[i] = 0;
- }
-
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = 2;
- its.it_value.tv_nsec = 0;//100000000;
- if (timer_settime(pfd_timer, 0, &its, NULL) == -1){
- printf("-- %d --", node_id);
- fflush(stdout);
- perror("paxos_signal_handlers.c : timer_settime error9");
- }else {
- fprintf(log_file,"Updated timer!\n");
- }
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "[%d:%d:%d]: I ended sig_PFD_TIMER_handler state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
-
- signals_enable();
- }
- void sig_EPFD_TIMER_handler(int signo, siginfo_t *info, void *context)
- {
- core_list *tmp_core_list, *tmp_core_list_prev;
- DDS_list *tmp_dds, *tmp_dds_prev;
- int one_core,failed_core, pending_workload[2]/*,reappoint = TRUE*/;
-
- signals_disable();
-
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_EPFD_TIMER_handler state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
-
- int i, j;//, disjoint = 1;
- inter_list tmp_inter_list;
-
- for (i = 0; i < X_max*Y_max; i++){
- if (alive[i] == suspected[i]){
- fprintf(log_file,"\t\tNew Delay: %d\n",delay);
- delay *= 2;
- break;
- //disjoint = 0;
- }
- }
-
- fprintf(log_file,"\t\t------ FAILURE DETECTION ------\n");
- for (i = 0; i < X_max*Y_max; i++){
- if (alive[i] == 0 && suspected[i] != -1){
- suspected[i]++;
- fprintf(log_file,"\t\t%d -> SUSPECTED with suspected[%d] = %d\n",i,i,suspected[i]);
- if ((suspected[i] > 2) && (faulty_core != i)){
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- failed_core = i;
- faulty_core = i;
- suspected[failed_core] = -1;
- alive[failed_core] = -1;
-
- #if defined(CONTROLLER) && defined(PLAT_LINUX)
- int semvalue = 0;
-
- sem_getvalue(&flag_data_written[failed_core],&semvalue);
- if (semvalue == 0){
- /*I am locked*/
- printf("--%d-- I unlocked semaphore for node %d\n",node_id,failed_core);
- sem_post(&flag_data_written[failed_core]);
- sem_getvalue(&flag_data_written[failed_core],&semvalue);
- }
- #endif
- /*My controller timed out */
- if (failed_core == my_idag){
- sig_PAXOS_INIT_handler();
- /* Controller in an other cluster timed out */
- }else if (idag_mask[failed_core] == failed_core){
- printf("--%d-- Other controller TIMED OUT\n", node_id);
- for (j = 0; j < X_max*Y_max; j++){
- if (idag_mask[j] == failed_core && j != failed_core){
- if (state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT;
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = CONTR_TO;
- scc_kill(j,SIG_CONTR_TO,&tmp_inter_list);
- }
-
- }
- /* Worker or manager timed out */
- }else{
- /*I am controller.
- *If failed node is inside my cluster
- *i have to remove the failed node from my cores and DDS list */
- if ((idag_mask[node_id] == node_id) && (idag_mask[failed_core] == node_id)){
- tmp_core_list = my_cores->next;
- tmp_core_list_prev = my_cores;
- while (tmp_core_list != NULL){
- if (tmp_core_list->core_id == failed_core){
- my_cores_count--;
- tmp_core_list_prev->next = tmp_core_list->next;
- free(tmp_core_list);
- break;
- }else{
- tmp_core_list_prev = tmp_core_list;
- tmp_core_list = tmp_core_list->next;
- }
- }
-
- tmp_dds = DDS->next;
- tmp_dds_prev = DDS;
- while (tmp_dds != NULL){
- if (tmp_dds->agent_id == failed_core){
- tmp_dds_prev->next = tmp_dds->next;
- free(tmp_dds);
- break;
- }else{
- tmp_dds_prev = tmp_dds;
- tmp_dds = tmp_dds->next;
- }
- }
-
- }
- /*I am manager.
- *I have to check if the failed node is my worker
- *If yes i have to appoint work to a new node.*/
- if (im_manager()){
-
- printf("-- %d -- I am manager of an application.\n",node_id);
- tmp_core_list = my_cores->next;
- tmp_core_list_prev = my_cores;
- while (tmp_core_list != NULL){
- /* I am the manager of the failed worker. I reappoint the work to another core. */
- if (tmp_core_list->core_id == failed_core){
- my_cores_count--;
-
- /* I am the manager of the failed worker so i remove him from my core list */
- tmp_core_list_prev->next = tmp_core_list->next;
-
- fprintf(log_file,"I am the manager of the failed worker %d! I removed him from my cores list\n",tmp_core_list->core_id);
- printf("-- %d --I am the manager of the failed worker %d! I removed him from my cores list\n",node_id, tmp_core_list->core_id);
- //one_core = tmp_core_list->core_id;
- pending_workload[0] = tmp_core_list->workload[0];
- pending_workload[1] = tmp_core_list->workload[1];
- printf("Pending workload of faulty core: %d %d\n", pending_workload[0], pending_workload[1]);
- /*else{
- reappoint = FALSE;
- }*/
- free(tmp_core_list);
-
-
- tmp_core_list = my_cores->next;
- fprintf(log_file,"I am reassigning the workload!\n");
- fprintf(log_file,"-------- CURRENT WORKLOADS --------\n");
- printf("-------- CURRENT WORKLOADS --------\n");
- while (tmp_core_list != NULL){
- fprintf(log_file,"%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- printf("%d\t|\t%d\t|\t%d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- tmp_core_list = tmp_core_list->next;
- }
- printf("-----------------------------------\n");
- fprintf(log_file,"-----------------------------------\n");
-
- tmp_core_list = my_cores->next;
- while (tmp_core_list != NULL){
- if ((tmp_core_list->workload[0] == -1) && (tmp_core_list->workload[1] == -1)){
- one_core = tmp_core_list->core_id;
- if (core_inter_head[one_core] == NULL){
- core_inter_head[one_core] = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[one_core] = core_inter_head[one_core];
- } else {
- core_inter_tail[one_core]->next = (inter_list *) malloc(sizeof(inter_list));
- core_inter_tail[one_core] = core_inter_tail[one_core]->next;
- }
-
- core_inter_tail[one_core]->type = APPOINT_WORK_NODE;
- core_inter_tail[one_core]->data.work_bounds[0] = pending_workload[0];
- core_inter_tail[one_core]->data.work_bounds[1] = pending_workload[1];
- fprintf(app_log_file,"%d (%d, %d), ",one_core,core_inter_tail[one_core]->data.work_bounds[0],core_inter_tail[one_core]->data.work_bounds[1]);
- core_inter_tail[one_core]->next = NULL;
- if (core_inter_head[one_core]->next == NULL) {
- paxos_node_stats.msg_count++;
- scc_kill(one_core, SIG_APPOINT_WORK, core_inter_head[one_core]);
- } else {
- fprintf(log_file,"I am doing smth else with my working node %d in init inter1 = %d inter2 = %d\n",one_core,core_inter_head[one_core]->type,core_inter_head[one_core]->next->type);
- }
- break;
- }
- tmp_core_list = tmp_core_list->next;
- }
- break;
- }else {
- tmp_core_list_prev = tmp_core_list;
- tmp_core_list = tmp_core_list->next;
- }
- }
- }else{
- /* My manager timed out */
- if (cur_agent.my_agent == failed_core){
- printf("--%d-- %d is my manager! I initiate a Paxos instance\n",node_id, failed_core);
- paxos_state = PAXOS_ACTIVE;
- sig_PAXOS_INIT_handler();
- }
- }
- }
- }
- #ifdef tEPFD
- else if (suspected[i] == 2){
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = HEARTBEAT_REQ;
- scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
- }
- #else
- else {
- if (i == 10)
- printf("suspected[%d]=%d and alive[%d]=%d\n",i,suspected[i],i,alive[i]);
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = HEARTBEAT_REQ;
- scc_kill(i,SIG_HEARTBEAT_REQ,&tmp_inter_list);
- }
- #endif
-
- }else if (alive[i] == 1){
- suspected[i] = 0;
- //fprintf(log_file,"\t\t%d -> ALIVE\n", i);
- }
- alive[i] = 0;
- }
-
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = delay;
- its.it_value.tv_nsec = 0;
- if (timer_settime(epfd_timer, 0, &its, NULL) == -1){
- printf("-- %d --", node_id);
- fflush(stdout);
- perror("paxos_signal_handlers.c : timer_settime error9");
- }else {
- fprintf(log_file,"Updated timer!\n");
- }
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "[%d:%d:%d]: I ended sig_EPFD_TIMER_handler state = %s\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,id2string(state));
-
-
- signals_enable();
- return;
- }
- void sig_CTIMER_handler(int signo, siginfo_t *info, void *context)
- {
-
- struct tm cur_t_1;
-
- signals_disable();
- struct timeval recover_time_val;
- #ifdef CONTROLLER
- DDS_list *tmp_dds;
- printf("--%d-- CTIMER_handler : Controller %sTimed out!%s\n",node_id,KRED,KNRM);
- printf("DDS list before time out:\n");
-
- for (tmp_dds = DDS; tmp_dds != NULL; tmp_dds = tmp_dds->next)
- printf("\t\t\tAgent_id : %d | Cores in cluster : %d\n",tmp_dds->agent_id, tmp_dds->num_of_cores);
- #elif WORKER
- printf("--%d-- CTIMER_handler : Worker %sTimed out!%s\n",node_id,KRED,KNRM);
- printf("Worker state before timeout: %s\n",id2string(state));
- #elif MANAGER
- core_list *tmp_core_list;
- printf("--%d-- CTIMER_handler : Manager %sTimed out!%s\n",node_id,KRED,KNRM);
- printf("Manager state before timeout: %s\n",id2string(state));
- for (tmp_core_list = my_cores; tmp_core_list != NULL; tmp_core_list = tmp_core_list->next)
- printf("\t\t\tWorker_id : %d | Workload : %d %d\n", tmp_core_list->core_id, tmp_core_list->workload[0], tmp_core_list->workload[1]);
- #endif
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- printf("\n\nI timed out at [%d:%d:%d]\n\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec);
-
- /* FIXED IDs */
- int k;
- inter_list tmp_inter_list;
- for (k = 0; k < X_max*Y_max; k++){
- if (k != node_id && k != faulty_core){
- tmp_inter_list.type = LEARN;
- tmp_inter_list.data.learn_ack_info[VALUE_W] = node_id+1;
- tmp_inter_list.data.learn_ack_info[PREV_CW] = node_id;
- scc_kill(k,SIG_LEARN,&tmp_inter_list);
- }
- }
- exit(0);
- paxos_state = FAILED_CORE;
-
- gettimeofday(&time_val,NULL);
- cur_t = localtime(&time_val.tv_sec);
- fail_time_val = time_val;
- cur_t_1 = *cur_t;
-
- while (fail_flag == 0){
- scc_pause();
- scc_signals_check();
- }
-
- gettimeofday(&time_val, NULL);
- cur_t = localtime(&time_val.tv_sec);
- printf("\n\n\n\n\n\n\n[%d:%d:%d:%ld]: gettimeofday_1\n",cur_t_1.tm_hour,cur_t_1.tm_min,cur_t_1.tm_sec,fail_time_val.tv_usec);
- printf("[%d:%d:%d:%ld]: gettimeofday_2\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,time_val.tv_usec);
- long int dif = ((cur_t->tm_sec * 1000000) + time_val.tv_usec) - ((cur_t_1.tm_sec * 1000000) + fail_time_val.tv_usec);
- printf("Difference in us is: %ld\n\n\n\n\n\n\n",dif);
- fflush(stdout);
-
- exit(0);
- signals_enable();
- }
- void sig_ITIMER_handler(int signo, siginfo_t *info, void *context)
- {
- int i;
- inter_list tmp_inter_list;
-
- signals_disable();
-
-
- printf("--%d-- i have to decide what to do here!!\n", node_id);
- fflush(stdout);
- for (i = 0; i < X_max*Y_max; i++){
- if (idag_mask[i] == 10 && i != 10){
- if (state == IDLE_AGENT_WAITING_OFF) state = IDLE_AGENT;
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = CONTR_TO;
- scc_kill(i,SIG_CONTR_TO,&tmp_inter_list);
- }
- }
- signals_enable();
- }
- /* END */
- void sig_FAIL_handler(){
- #ifdef WORKER
- sev.sigev_notify = SIGEV_SIGNAL;
- sev.sigev_signo = SIG_CTIMER;
- sev.sigev_value.sival_ptr = &controller_timer;
- if (timer_create(CLOCK_REALTIME, &sev, &controller_timer) == -1)
- printf("timer_create error\n");
- else
- printf("Worker Timer created succesfully!\n");
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = 2;
- its.it_value.tv_nsec = 0;
- if (timer_settime(controller_timer, 0, &its, NULL) == -1)
- perror("controller_core.c : timer_settime error9");
- else
- printf("%d : My timer will explode in %d seconds.\n", node_id, 10);
- return;
- #endif
- #ifdef MANAGER
- sev.sigev_notify = SIGEV_SIGNAL;
- sev.sigev_signo = SIG_CTIMER;
- sev.sigev_value.sival_ptr = &controller_timer;
- if (timer_create(CLOCK_REALTIME, &sev, &controller_timer) == -1)
- printf("timer_create error\n");
- else
- printf("Manager Timer created succesfully!\n");
- its.it_interval.tv_sec = 0;
- its.it_interval.tv_nsec = 0;
- its.it_value.tv_sec = 10;
- its.it_value.tv_nsec = 0;
- if (timer_settime(controller_timer, 0, &its, NULL) == -1)
- perror("controller_core.c : timer_settime error9");
- else
- printf("%d : My timer will explode in %d seconds.\n", node_id, 10);
- return;
- #endif
- }
- void sig_PAXOS_STATS_REQ_handler(int sender_id){
-
- inter_list tmp_inter_list;
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_PAXOS_STATS_REQ_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
- tmp_inter_list.next = NULL;
- tmp_inter_list.type = PAXOS_STATS_REP;
- tmp_inter_list.data.paxos_stats[0] = paxos_node_stats.msg_count;
- tmp_inter_list.data.paxos_stats[1] = paxos_node_stats.fd_msg_count;
- fprintf(log_file,"\t\tI send %d my paxos stats %lld , %d\n",sender_id,paxos_node_stats.msg_count,paxos_node_stats.fd_msg_count);
- scc_kill(sender_id,SIG_PAXOS_STATS_REP,&tmp_inter_list);
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I ended sig_PAXOS_STATS_REQ_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
- }
- void sig_PAXOS_STATS_REP_handler(int sender_id){
-
- long long int paxos_replied_stats = sig_read_ar[2];
- long long int fd_replied_stats = sig_read_ar[3];
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I entered sig_PAXOS_STATS_REP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
- fprintf(log_file,"\t\t%d has replied with msg_count = %lld and fd_msg_count = %lld\n",sender_id,paxos_replied_stats,fd_replied_stats);
- paxos_total_stats.msg_count += paxos_replied_stats;
- paxos_total_stats.fd_msg_count += fd_replied_stats;
- paxos_stats_replied++;
- fprintf(log_file,"\t\tI have updated my stats. New message count = %lld\n",paxos_total_stats.msg_count);
- fprintf(log_file,"\t\tCores replied: %d | My cores count: %d\n",paxos_stats_replied,my_cores_count);
-
- cur_time = time(NULL);
- cur_t = localtime(&cur_time);
- fprintf(log_file, "\n\n[%d:%d:%d]: I ended sig_PAXOS_STATS_REP_handler with sender = %d state = %s\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,sender_id,id2string(state));
-
-
- }
|