scc_signals.c 27 KB


  1. #include "scc_signals.h"
  2. #include "my_rtrm.h"
  3. #include "signal_handlers.h"
  4. #include "paxos_signal_handlers.h"
  5. #include "sig_aux.h"
  6. #include "libfunctions.h"
  7. #include "variables.h"
  8. #include "controller_core.h"
  9. int sig_read_ar[2 * LINE_SIZE];
  10. int R = 0;
  11. int find_sender_id(int SID){
  12. return SID % NUES;
  13. }
  14. int scc_kill(int target_ID, int sig, inter_list *ref_inter_list) {
  15. int sig_array_local[2 * LINE_SIZE], old_value, new_value, i, increase_cnt=1;
  16. int num_of_coworkers, counter;
  17. core_list *tmp_core_list;
  18. #ifdef PLAT_SCC
  19. int str_len;
  20. char error_str[64];
  21. #endif
  22. int error = 0;
  23. signals_disable();
  24. if (ref_inter_list == NULL)
  25. fprintf(log_file,"target_ID %d and sig %s ref_inter_list is NULL\n",target_ID,sig2string(sig));
  26. //19.12.2015 Initial node sends SIG_INIT_APP when an app has terminated and receives no reply.
  27. //Thus we dont initialize our counter. Same for SIG_ACK and SIG_FINISH
  28. #if defined(tPFD) || defined(tEPFD)
  29. if (
  30. (sig != SIG_INIT_APP) && (sig != SIG_ACK) && (sig != SIG_ADD_TO_DDS) && (sig != SIG_INIT)
  31. && (sig != SIG_PREPARE_REQUEST) && (sig != SIG_PREPARE_ACCEPT_NO_PREVIOUS) && (sig != SIG_PREPARE_ACCEPT)
  32. && (sig != SIG_ACCEPT_REQUEST) && (sig != SIG_ACCEPTED) && (sig != SIG_LEARN) && (sig != SIG_LEARN_ACK)
  33. && (sig != SIG_LEARN_ACK_CONTR) && (sig != SIG_REINIT_APP) && (sig != SIG_CONTR_TO) && (sig != SIG_REMOVE_FROM_DDS)
  34. && (sig != SIG_FAIL) && (sig != SIG_FINISH)
  35. ){
  36. //suspected[target_ID]++;
  37. //alive[target_ID] = 0;
  38. }
  39. #endif
  40. for (i=0; i<LINE_SIZE; i++)
  41. sig_array_local[i] = sig;
  42. if (sig != SIG_HEARTBEAT_REP && sig != SIG_HEARTBEAT_REQ){
  43. sig_array_local[1] = R++ * NUES + node_id;
  44. }else{
  45. sig_array_local[1] = node_id;
  46. }
  47. if (ref_inter_list != NULL) { //for a signal not associated with no inter_list interaction like SIG_FINISH
  48. if (strcmp(sig2string(sig),"SIG_HEARTBEAT_REQ") != 0 && strcmp(sig2string(sig), "SIG_HEARTBEAT_REP") != 0)
  49. fprintf(log_file,"\t\tI enter here with target_ID %d and sig %s {%d} type = %s\n",target_ID,sig2string(sig), sig_array_local[1], inter2string(ref_inter_list->type));
  50. if (ref_inter_list->type == INIT_APP) {
  51. /*sig_array_local[2] = ref_inter_list->data.new_app.id;
  52. sig_array_local[3] = ref_inter_list->data.new_app.array_size;
  53. sig_array_local[4] = ref_inter_list->data.new_app.workld;
  54. sig_array_local[5] = ref_inter_list->data.new_app.num_of_cores;
  55. sig_array_local[6] = ref_inter_list->data.new_app.app_type;*/
  56. sig_array_local[2] = ref_inter_list->data.new_app.id;
  57. sig_array_local[3] = ref_inter_list->data.new_app.array_size;
  58. sig_array_local[4] = ref_inter_list->data.new_app.workld;
  59. sig_array_local[5] = ref_inter_list->data.new_app.num_of_cores;
  60. sig_array_local[6] = ref_inter_list->data.new_app.app_type;
  61. //my_stats.message_size += sizeof(app);
  62. //clear = 1;
  63. } else if (ref_inter_list->type == IDAG_FIND_IDAGS_PENDING || ref_inter_list->type == SELFOPT_IDAG_FIND_IDAGS_PENDING) {//I am the requesting common node
  64. sig_array_local[2] = ref_inter_list->data.reg.C;
  65. sig_array_local[3] = ref_inter_list->data.reg.r;
  66. fprintf(log_file, "a C=%d r=%d\n",ref_inter_list->data.reg.C,ref_inter_list->data.reg.r);
  67. fflush(log_file);
  68. //if (ref_inter_list->type == IDAG_FIND_IDAGS_PENDING) ref_inter_list->type = IDAG_FIND_IDAGS;
  69. //else if (ref_inter_list->type == SELFOPT_IDAG_FIND_IDAGS_PENDING) ref_inter_list->type = SELFOPT_IDAG_FIND_IDAGS;
  70. //my_stats.message_size += sizeof(region);
  71. } else if (ref_inter_list->type == IDAG_REQ_DDS_PENDING || ref_inter_list->type == SELFOPT_IDAG_REQ_DDS_PENDING
  72. || ref_inter_list->type == DEBUG_IDAG_REQ_DDS) {
  73. sig_array_local[2] = ref_inter_list->data.reg.C;
  74. sig_array_local[3] = ref_inter_list->data.reg.r;
  75. fprintf(log_file, "a C=%d r=%d\n",ref_inter_list->data.reg.C,ref_inter_list->data.reg.r);
  76. fflush(log_file);
  77. //my_stats.message_size += sizeof(region);
  78. //if (ref_inter_list->type == IDAG_REQ_DDS_PENDING) ref_inter_list->type = IDAG_REQ_DDS;
  79. //if (ref_inter_list->type == SELFOPT_IDAG_REQ_DDS_PENDING) ref_inter_list->type = SELFOPT_IDAG_REQ_DDS;
  80. } else if (ref_inter_list->type == REP_IDAG_FIND_IDAGS) {//I am the idag
  81. sig_array_local[2] = ref_inter_list->data.idags_in_reg[num_idags];
  82. if (sig_array_local[2] > 5){
  83. increase_cnt = 2;
  84. }
  85. counter=3;
  86. for (i=0; i < num_idags; i++) {
  87. if (ref_inter_list->data.idags_in_reg[i]) {
  88. sig_array_local[counter++] = idag_id_arr[i];
  89. fprintf(log_file, "\t\tidag=%d\n",idag_id_arr[i]);
  90. my_stats.message_size += sizeof(int);
  91. }
  92. }
  93. } else if (ref_inter_list->type == REP_IDAG_REQ_DDS) {
  94. if (ref_inter_list->data.agents_in_reg == NULL)
  95. sig_array_local[2] = DDS_count;
  96. else
  97. sig_array_local[2] = ref_inter_list->data.agents_in_reg[0];
  98. } else if (ref_inter_list->type == AGENT_REQ_CORES_PENDING) {
  99. //my_stats.message_size += sizeof(app);
  100. sig_array_local[2] = init_app.id;
  101. sig_array_local[3] = init_app.array_size;
  102. sig_array_local[4] = init_app.workld;
  103. sig_array_local[5] = init_app.num_of_cores;
  104. sig_array_local[6] = init_app.app_type;
  105. fprintf(log_file, "Cores=%d r=%d\n",sig_array_local[5],ref_inter_list->data.reg_arr.region_arr[0].r);
  106. fflush(log_file);
  107. sig_array_local[6] = ref_inter_list->data.reg_arr.region_arr[0].C;
  108. sig_array_local[7] = ref_inter_list->data.reg_arr.region_arr[0].r;
  109. //my_stats.message_size += sizeof(region);
  110. //ref_inter_list->type = AGENT_REQ_CORES;
  111. //free(ref_inter_list->data.reg_arr.region_arr);
  112. } else if (ref_inter_list->type == SELFOPT_REQ_CORES_PENDING) {
  113. //my_stats.message_size += sizeof(app);
  114. sig_array_local[2] = my_app.id;
  115. sig_array_local[3] = my_app.array_size;
  116. sig_array_local[4] = my_app.workld;
  117. sig_array_local[5] = my_app.num_of_cores;
  118. sig_array_local[6] = ref_inter_list->data.reg_arr.region_arr[0].C;
  119. sig_array_local[7] = ref_inter_list->data.reg_arr.region_arr[0].r;
  120. fprintf(log_file, "Cores=%d r=%d\n",sig_array_local[5],ref_inter_list->data.reg_arr.region_arr[0].r);
  121. fflush(log_file);
  122. //my_stats.message_size += sizeof(region);
  123. //ref_inter_list->type = SELFOPT_REQ_CORES;
  124. //free(ref_inter_list->data.reg_arr.region_arr);
  125. } else if (ref_inter_list->type == REP_AGENT_REQ_CORES) {//I am the agent
  126. /* pre write these info and keep data_array only for cores. Besides, a great amount of offers will be zero */
  127. sig_array_local[2] = ref_inter_list->data.off_arr.num_of_offers;
  128. fprintf(log_file, "num_of_offers=%d\n",ref_inter_list->data.off_arr.num_of_offers);
  129. fflush(log_file);
  130. if (ref_inter_list->data.off_arr.num_of_offers > 0) {
  131. sig_array_local[3] = ref_inter_list->data.off_arr.offer_arr[0].num_of_cores;
  132. fprintf(log_file, "num_of_cores=%d\n",ref_inter_list->data.off_arr.offer_arr[0].num_of_cores);
  133. fflush(log_file);
  134. memcpy(&sig_array_local[4],&ref_inter_list->data.off_arr.offer_arr[0].spd_loss,sizeof(int));
  135. fprintf(log_file, "spd_loss=%0.2f\n",ref_inter_list->data.off_arr.offer_arr[0].spd_loss);
  136. fflush(log_file);
  137. }
  138. } else if (ref_inter_list->type == INIT_WORK_NODE) {
  139. if (ref_inter_list->data.work_bounds[0] != -1) {
  140. gettimeofday(&time_val, NULL);
  141. cur_t = localtime(&time_val.tv_sec);
  142. //fprintf(app_log_file,"[%d:%d:%d:%ld] I init work to %d\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,time_val.tv_usec,target_ID);
  143. //fflush(app_log_file);
  144. //i=1;
  145. /* 2.7.2016 - Changed by dimos - Instead of valid i send the app_id */
  146. sig_array_local[2] = my_app.id;
  147. sig_array_local[3] = node_id;
  148. sig_array_local[4] = my_app.array_size;
  149. sig_array_local[5] = ref_inter_list->data.work_bounds[0];
  150. sig_array_local[6] = ref_inter_list->data.work_bounds[1];
  151. sig_array_local[7] = my_app.app_type;
  152. /********/
  153. fprintf(log_file, "work_time1=%d work_time2=%d\n",ref_inter_list->data.work_bounds[0],ref_inter_list->data.work_bounds[1]);
  154. fflush(log_file);
  155. //my_stats.message_size += 5 * sizeof(int);
  156. } else {
  157. sig_array_local[2] = -1;
  158. fprintf(log_file, "i=%d\n",sig_array_local[0]);
  159. fflush(log_file);
  160. //my_stats.message_size += sizeof(int);
  161. }
  162. //clear = 1;
  163. } else if (ref_inter_list->type == APPOINT_WORK_NODE) {
  164. if (ref_inter_list->data.work_bounds[0] != -1) {
  165. gettimeofday(&time_val, NULL);
  166. cur_t = localtime(&time_val.tv_sec);
  167. //fprintf(app_log_file,"[%d:%d:%d:%ld] I appoint work to %d\n",cur_t->tm_hour,cur_t->tm_min,cur_t->tm_sec,time_val.tv_usec,target_ID);
  168. //fflush(app_log_file);
  169. /* 2.7.2016 - Changed by dimos - Instead of valid i send the app_id */
  170. sig_array_local[2] = my_app.id;
  171. sig_array_local[3] = ref_inter_list->data.work_bounds[0];
  172. sig_array_local[4] = ref_inter_list->data.work_bounds[1];
  173. sig_array_local[7] = my_app.app_type;
  174. fprintf(log_file, "work_time1=%d work_time2=%d\n",ref_inter_list->data.work_bounds[0],ref_inter_list->data.work_bounds[1]);
  175. //my_stats.message_size += 3 * sizeof(int);
  176. } else {
  177. sig_array_local[2] = -1;
  178. //my_stats.message_size += sizeof(int);
  179. }
  180. //clear = 1;
  181. } else if (ref_inter_list->type == REP_AGENT_OFFER_SENT) {
  182. fprintf(log_file, "I have to reply %d for %d offers\n",target_ID,ref_inter_list->data.offer_acc_array[0]);
  183. fflush(log_file);
  184. sig_array_local[2] = ref_inter_list->data.offer_acc_array[1];
  185. fprintf(log_file, "offer_ans=%d\n",ref_inter_list->data.offer_acc_array[1]);
  186. fflush(log_file);
  187. //free(ref_inter_list->data.offer_acc_array);
  188. //clear = 1;
  189. } else if (ref_inter_list->type == IDAG_ADD_CORES_DDS) {
  190. sig_array_local[2] = ref_inter_list->data.app_cores[0];
  191. fprintf(log_file, "app_cores=%d\n",ref_inter_list->data.app_cores[0]);
  192. fflush(log_file);
  193. /* FIXME change position of orig_sender and new_owner in the creation of the list */
  194. //8 elements available, 3 allready in use
  195. if (my_idag != -1) {
  196. if (ref_inter_list->data.app_cores[0] > 5){
  197. increase_cnt=2;
  198. }
  199. for (i=1; i<=ref_inter_list->data.app_cores[0]; i++){
  200. sig_array_local[i+2] = ref_inter_list->data.app_cores[i];
  201. fprintf(log_file, "core=%d\n",ref_inter_list->data.app_cores[i]);
  202. fflush(log_file);
  203. }
  204. } else {
  205. //I am an idag and i have to send to other idags my original sender
  206. sig_array_local[3] = ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+1];
  207. fprintf(log_file, "orig_sender=%d\n",ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+1]);
  208. fflush(log_file);
  209. if (ref_inter_list->data.app_cores[0] > 4)
  210. increase_cnt++;
  211. for (i=1; i<=ref_inter_list->data.app_cores[0]; i++){
  212. sig_array_local[i+3] = ref_inter_list->data.app_cores[i];//LINE_SIZE+i-1
  213. fprintf(log_file, "core=%d\n",ref_inter_list->data.app_cores[i]);
  214. fflush(log_file);
  215. }
  216. }
  217. } else if (ref_inter_list->type == IDAG_REM_CORES_DDS) {
  218. //fprintf(log_file, "I am in add/remove/remove_app to %d with %d cores\n",sender_id,tmp_inter_list->data.app_cores[0]);
  219. //fflush(log_file);
  220. sig_array_local[2] = ref_inter_list->data.app_cores[0];
  221. fprintf(log_file, "app_cores=%d\n",ref_inter_list->data.app_cores[0]);
  222. fflush(log_file);
  223. /* FIXME change position of orig_sender and new_owner in the creation of the list */
  224. //8 elements available, 3 allready in use
  225. if (my_idag != -1) {
  226. sig_array_local[3] = ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+1];
  227. fprintf(log_file, "new_owner=%d\n",ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+1]);
  228. fflush(log_file);
  229. if (ref_inter_list->data.app_cores[0] > 4)
  230. increase_cnt++;
  231. for (i=1; i<=ref_inter_list->data.app_cores[0]; i++) {
  232. sig_array_local[i+3] = ref_inter_list->data.app_cores[i];//LINE_SIZE+i-1
  233. fprintf(log_file, "core=%d\n",ref_inter_list->data.app_cores[i]);
  234. fflush(log_file);
  235. }
  236. } else {
  237. //I am an idag and i have to send to other idags my original sender
  238. sig_array_local[3] = ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+1];
  239. fprintf(log_file, "orig_sender=%d\n",ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+1]);
  240. sig_array_local[4] = ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+2];
  241. fprintf(log_file, "new_owner=%d\n",ref_inter_list->data.app_cores[ref_inter_list->data.app_cores[0]+2]);
  242. fflush(log_file);
  243. if (ref_inter_list->data.app_cores[0] > 3)
  244. increase_cnt++;
  245. for (i=1; i<=ref_inter_list->data.app_cores[0]; i++) {
  246. sig_array_local[i+4] = ref_inter_list->data.app_cores[i];//LINE_SIZE+i-1
  247. fprintf(log_file, "core=%d\n",ref_inter_list->data.app_cores[i]);
  248. fflush(log_file);
  249. }
  250. }
  251. } else if (ref_inter_list->type == REMOVE_APP) {
  252. //fprintf(log_file, "I am in add/remove/remove_app to %d with %d cores\n",sender_id,tmp_inter_list->data.app_cores[0]);
  253. //fflush(log_file);
  254. sig_array_local[2] = ref_inter_list->data.app_cores[0];
  255. fprintf(log_file, "app_cores=%d\n",ref_inter_list->data.app_cores[0]);
  256. fflush(log_file);
  257. //8 elements available, 3 already in use
  258. if (my_idag != -1) {
  259. if (ref_inter_list->data.app_cores[0] > 5)
  260. increase_cnt++;
  261. for (i=1; i<=ref_inter_list->data.app_cores[0]; i++){
  262. sig_array_local[i+2] = ref_inter_list->data.app_cores[i];//LINE_SIZE+i-1
  263. fprintf(log_file, "core=%d\n",ref_inter_list->data.app_cores[i]);
  264. fflush(log_file);
  265. }
  266. } else {
  267. //I am an idag and i have to send to other idags my original sender
  268. sig_array_local[3] = ref_inter_list->data.app_cores[1];
  269. fprintf(log_file, "or_sender=%d\n",ref_inter_list->data.app_cores[0]);
  270. fflush(log_file);
  271. if (ref_inter_list->data.app_cores[0] > 4)
  272. increase_cnt++;
  273. for (i=2; i<=(ref_inter_list->data.app_cores[0]+1); i++){
  274. sig_array_local[i+2] = ref_inter_list->data.app_cores[i];//LINE_SIZE+i-1
  275. fprintf(log_file, "core=%d\n",ref_inter_list->data.app_cores[i]);
  276. fflush(log_file);
  277. }
  278. }
  279. /* PAXOS INTERACTIONS */
  280. } else if (ref_inter_list->type == PREPARE_REQUEST){
  281. paxos_node_stats.msg_count++;
  282. sig_array_local[2] = ref_inter_list->data.proposal_number; // proposal_number
  283. } else if (ref_inter_list->type == PREPARE_ACCEPT){
  284. paxos_node_stats.msg_count++;
  285. sig_array_local[2] = ref_inter_list->data.accepted_values[0]; // highest accepted proposal_number
  286. sig_array_local[3] = ref_inter_list->data.accepted_values[1]; // highest accepted value
  287. sig_array_local[4] = ref_inter_list->data.accepted_values[2]; // state
  288. } else if (ref_inter_list->type == PREPARE_ACCEPT_NO_PREVIOUS){
  289. paxos_node_stats.msg_count++;
  290. sig_array_local[2] = ref_inter_list->data.accepted_values[0]; // highest accepted proposal_number
  291. sig_array_local[3] = ref_inter_list->data.accepted_values[1]; // highest accepted value
  292. sig_array_local[4] = ref_inter_list->data.accepted_values[2]; // state
  293. } else if (ref_inter_list->type == ACCEPT_REQUEST){
  294. paxos_node_stats.msg_count++;
  295. sig_array_local[2] = ref_inter_list->data.accepted_values[0];
  296. sig_array_local[3] = ref_inter_list->data.accepted_values[1];
  297. } else if (ref_inter_list->type == ACCEPTED){
  298. paxos_node_stats.msg_count++;
  299. sig_array_local[2] = ref_inter_list->data.accepted_values[0];
  300. sig_array_local[3] = ref_inter_list->data.accepted_values[1];
  301. } else if (ref_inter_list->type == LEARN){
  302. paxos_node_stats.msg_count++;
  303. sig_array_local[2] = ref_inter_list->data.accepted_values[0];
  304. sig_array_local[3] = ref_inter_list->data.accepted_values[1];
  305. } else if (ref_inter_list->type == LEARN_ACK_CONTR){
  306. paxos_node_stats.msg_count++;
  307. sig_array_local[2] = ref_inter_list->data.controller_index;
  308. } else if (ref_inter_list-> type == REINIT_APP){
  309. paxos_node_stats.msg_count++;
  310. sig_array_local[2] = ref_inter_list->data.reappointed_app.id;
  311. sig_array_local[3] = ref_inter_list->data.reappointed_app.array_size;
  312. sig_array_local[4] = ref_inter_list->data.reappointed_app.workld;
  313. sig_array_local[5] = ref_inter_list->data.reappointed_app.num_of_cores;
  314. } else if (ref_inter_list->type == ADD_TO_DDS){
  315. paxos_node_stats.msg_count++;
  316. sig_array_local[2] = ref_inter_list->data.workers_info[0];
  317. counter = ref_inter_list->data.workers_info[0];
  318. if (counter > 5){
  319. fprintf(log_file,"\t\tI am sending more than 5 workers. I have to use two lines!\n");
  320. increase_cnt = 2;
  321. }
  322. while (counter > 0){
  323. sig_array_local[2+counter] = ref_inter_list->data.workers_info[counter];
  324. fprintf(log_file,"\t\tsig_array_local[%d] = %d\n",2+counter,sig_array_local[2+counter]);
  325. counter--;
  326. }
  327. fprintf(log_file,"MY COUNTER = %d\n",sig_array_local[2]);
  328. } else if (ref_inter_list->type == REMOVE_FROM_DDS){
  329. paxos_node_stats.msg_count++;
  330. } else if (ref_inter_list->type == PAXOS_STATS_REQ){
  331. paxos_node_stats.msg_count++;
  332. } else if (ref_inter_list->type == PAXOS_STATS_REP){
  333. paxos_node_stats.msg_count++;
  334. sig_array_local[2] = ref_inter_list->data.paxos_stats[0];
  335. sig_array_local[3] = ref_inter_list->data.paxos_stats[1];
  336. } else if (ref_inter_list->type == HEARTBEAT_REQ){
  337. paxos_node_stats.fd_msg_count++;
  338. } else if (ref_inter_list->type == HEARTBEAT_REP){
  339. paxos_node_stats.fd_msg_count++;
  340. } else if (ref_inter_list->type == INIT_AGENT){
  341. sig_array_local[2] = ref_inter_list->data.one_app.new_app.id;
  342. sig_array_local[3] = ref_inter_list->data.one_app.new_app.array_size;
  343. sig_array_local[4] = ref_inter_list->data.one_app.new_app.workld;
  344. sig_array_local[5] = ref_inter_list->data.one_app.new_app.num_of_cores;
  345. sig_array_local[6] = ref_inter_list->data.one_app.new_app.app_type;
  346. }
  347. /* END*/
  348. }
  349. #ifdef PLAT_SCC
  350. if (strcmp(sig2string(sig),"SIG_HEARTBEAT_REQ") != 0 && strcmp(sig2string(sig), "SIG_HEARTBEAT_REP") != 0){
  351. fprintf(log_file,"Trying to acquire lock %d\n",target_ID);
  352. fflush(log_file);
  353. }
  354. RCCE_acquire_lock(target_ID);
  355. //fprintf(log_file,"I successfully acquired lock %d\n",target_ID);
  356. //fflush(log_file);
  357. RCCE_shflush();
  358. old_value = index_bottom[target_ID];
  359. if (strcmp(sig2string(sig),"SIG_HEARTBEAT_REQ") != 0 && strcmp(sig2string(sig), "SIG_HEARTBEAT_REP") != 0){
  360. fprintf(log_file,"I read bottom index %d increase_cnt=%d and target_ID %d\n",old_value,increase_cnt,target_ID);
  361. fflush(log_file);
  362. }
  363. error = RCCE_put((t_vcharp)(&sig_array[old_value*LINE_SIZE]), (t_vcharp)(&sig_array_local[0]), increase_cnt * LINE_SIZE * sizeof(int), target_ID);
  364. if (error != RCCE_SUCCESS) {
  365. RCCE_error_string(error, error_str, &str_len);
  366. fprintf(log_file,"I got an error in put with descr %s\n",error_str);
  367. fflush(log_file);
  368. }
  369. new_value = (old_value + increase_cnt) % MAX_SIGNAL_LIST_LEN;
  370. index_bottom[target_ID] = new_value;
  371. //RCCE_shflush();
  372. RCCE_release_lock(target_ID);
  373. if (strcmp(sig2string(sig),"SIG_HEARTBEAT_REQ") != 0 && strcmp(sig2string(sig), "SIG_HEARTBEAT_REP") != 0){
  374. fprintf(log_file,"I leave\n");
  375. fflush(log_file);
  376. }
  377. #else
  378. sem_wait(&scc_lock[target_ID]);
  379. old_value = index_bottom[target_ID];
  380. int mem_offset = target_ID * MAX_SIGNAL_LIST_LEN * LINE_SIZE;
  381. mem_offset += old_value * LINE_SIZE;
  382. if (strcmp(sig2string(sig),"SIG_HEARTBEAT_REQ") != 0 && strcmp(sig2string(sig), "SIG_HEARTBEAT_REP") != 0){
  383. fprintf(log_file,"I read bottom index %d increase_cnt=%d\n",old_value,increase_cnt);
  384. /*if (old_value == 63 && increase_cnt == 2){
  385. fprintf(log_file,"\t\tindex_bottom[%d]=%d.\n\t\tI wrote from position %d till %d and %d till %d\n",target_ID,index_bottom[target_ID],mem_offset,mem_offset+7,target_ID * MAX_SIGNAL_LIST_LEN * LINE_SIZE,target_ID * MAX_SIGNAL_LIST_LEN * LINE_SIZE + LINE_SIZE);
  386. }else{
  387. fprintf(log_file,"\t\tindex_bottom[%d]=%d.\n\t\tI wrote from position %d till %d\n",target_ID,index_bottom[target_ID],mem_offset,mem_offset+increase_cnt*LINE_SIZE);
  388. }*/
  389. }
  390. for (i=0; i<increase_cnt * LINE_SIZE; i++){
  391. if (old_value == MAX_SIGNAL_LIST_LEN-1 && increase_cnt == 2){
  392. if (i <= 7){
  393. sig_array[mem_offset + i] = sig_array_local[i];
  394. }else{
  395. sig_array[(target_ID * MAX_SIGNAL_LIST_LEN * LINE_SIZE) + i - LINE_SIZE] = sig_array_local[i];
  396. }
  397. }else{
  398. sig_array[mem_offset + i] = sig_array_local[i];
  399. }
  400. }
  401. new_value = (old_value + increase_cnt) % MAX_SIGNAL_LIST_LEN;
  402. index_bottom[target_ID] = new_value;
  403. sem_post(&scc_lock[target_ID]);
  404. #endif
  405. signals_enable();
  406. return error;
  407. }
  408. void scc_signals_check(void) {
  409. int sender_id, tmp_bottom, i, mem_offset, increase_cnt=1;
  410. char *sig_buf, *st_buf;
  411. //int sig_read_ar[LINE_SIZE]
  412. #ifdef PLAT_SCC
  413. int error, str_len;
  414. #endif
  415. char error_str[64];
  416. signals_disable();
  417. #ifdef PLAT_SCC
  418. RCCE_acquire_lock(node_id);
  419. #else
  420. sem_wait(&scc_lock[node_id]);
  421. #endif
  422. /* Overflow check */
  423. tmp_bottom = index_bottom[node_id];
  424. //last_index_bottom = tmp_bottom;
  425. while (index_top != tmp_bottom) {
  426. #ifdef EXTRA_DELAY
  427. scc_pause;
  428. #endif
  429. #ifdef PLAT_SCC
  430. error = RCCE_get((t_vcharp)(&sig_read_ar[0]), (t_vcharp)(&sig_array[index_top*LINE_SIZE]), LINE_SIZE * sizeof(int), node_id);
  431. RCCE_release_lock(node_id);
  432. if (error != RCCE_SUCCESS) {
  433. RCCE_error_string(error, error_str, &str_len);
  434. fprintf(log_file,"I got an error in get from %d with descr %s\n",sender_id,error_str);
  435. fflush(log_file);
  436. #else
  437. mem_offset = (node_id * MAX_SIGNAL_LIST_LEN * LINE_SIZE) + (index_top * LINE_SIZE); //node offset
  438. for (i = 0; i < LINE_SIZE; i++)
  439. sig_read_ar[i] = sig_array[mem_offset + i];
  440. sem_post(&scc_lock[node_id]);
  441. error_str[0] = '0';
  442. if (error_str[0] == '1') {
  443. printf("DAFUQ ?\n");
  444. #endif
  445. } else {
  446. sender_id = find_sender_id(sig_read_ar[1]);
  447. if (sig_read_ar[0] != NO_SIG){
  448. st_buf = id2string(state);
  449. sig_buf = sig2string(sig_read_ar[0]);
  450. if (strcmp(sig2string(sig_read_ar[0]),"Unknown Sig") == 0){
  451. fprintf(log_file,"I read Unknown sig and its number is %d\n",sig_read_ar[0]);
  452. }else if (strcmp(sig2string(sig_read_ar[0]),"SIG_HEARTBEAT_REQ") != 0 && strcmp(sig2string(sig_read_ar[0]), "SIG_HEARTBEAT_REP") != 0){
  453. fprintf(log_file,"\t\tmy index_top = %d and index_bottom = %d\n",index_top,tmp_bottom);
  454. fprintf(log_file,"\t\tI read sig %s {%d} - %d from sender_id %d. Current state = %s\n",sig_buf, sig_read_ar[1], sig_read_ar[0], sender_id, st_buf);
  455. //fprintf(log_file,"\t\tindex_top[%d]=%d.\n\t\tI read from position %d till %d\n",node_id,index_top,mem_offset,mem_offset+7);
  456. }
  457. }
  458. /* Failure Detector */
  459. #if defined(tPFD) || defined(tEPFD)
  460. alive[sender_id] = 1;
  461. suspected[sender_id] = 0;
  462. #endif
  463. /********************/
  464. if (paxos_state == FAILED_CORE && sig_read_ar[0] == SIG_RECOVER){
  465. fail_flag = 1;
  466. } else if (paxos_state != FAILED_CORE)
  467. if (sig_read_ar[0] == SIG_INIT) {
  468. sig_INIT_handler(sender_id);
  469. } else if (sig_read_ar[0] == SIG_ACK) {
  470. sig_ACK_handler(sender_id);
  471. } else if (sig_read_ar[0] == SIG_TERMINATE) {
  472. sig_TERMINATE_handler(sender_id);
  473. } else if (sig_read_ar[0] == SIG_INIT_APP) {
  474. sig_INIT_APP_handler(sender_id);
  475. } else if (sig_read_ar[0] == SIG_IDAG_FIND_IDAGS) {
  476. sig_IDAG_FIND_IDAGS_handler(sender_id, &increase_cnt, index_top);
  477. } else if (sig_read_ar[0] == SIG_REQ_DDS) {
  478. sig_REQ_DDS_handler(sender_id);
  479. } else if (sig_read_ar[0] == SIG_REQ_CORES) {
  480. sig_REQ_CORES_handler(sender_id);
  481. } else if (sig_read_ar[0] == SIG_REP_OFFERS) {
  482. sig_REP_OFFERS_handler(sender_id);
  483. } else if (sig_read_ar[0] == SIG_INIT_AGENT) {
  484. sig_INIT_AGENT_handler(sender_id);
  485. } else if (sig_read_ar[0] == SIG_ADD_CORES_DDS) {
  486. sig_ADD_CORES_DDS_handler(sender_id, &increase_cnt, index_top);
  487. } else if (sig_read_ar[0] == SIG_REM_CORES_DDS) {
  488. sig_REM_CORES_DDS_handler(sender_id, &increase_cnt, index_top);
  489. } else if (sig_read_ar[0] == SIG_APPOINT_WORK) {
  490. sig_APPOINT_WORK_handler(sender_id, &increase_cnt, index_top);
  491. //sig_APPOINT_WORK_handler(sender_id);
  492. //increase_cnt = 2;
  493. } else if (sig_read_ar[0] == SIG_FINISH) {
  494. sig_FINISH_handler(sender_id, &increase_cnt, index_top);
  495. } else if (sig_read_ar[0] == SIG_REJECT) {
  496. sig_REJECT_handler(sender_id);
  497. } else if (sig_read_ar[0] == SIG_APP_TERMINATED) {
  498. num_apps_terminated++;
  499. fprintf(log_file,"app_terminated = %d sender_id = %d\n",num_apps_terminated,sender_id);
  500. printf("app_terminated = %d sender_id = %d\n",num_apps_terminated,sender_id);
  501. /* PAXOS SIGNALS HANDLING */
  502. } else if (sig_read_ar[0] == SIG_PREPARE_REQUEST){
  503. sig_PREPARE_REQUEST_handler(sender_id);
  504. } else if (sig_read_ar[0] == SIG_PREPARE_ACCEPT_NO_PREVIOUS){
  505. sig_PREPARE_ACCEPT_NO_PREVIOUS_handler(sender_id);
  506. } else if (sig_read_ar[0] == SIG_PREPARE_ACCEPT){
  507. sig_PREPARE_ACCEPT_handler(sender_id);
  508. } else if (sig_read_ar[0] == SIG_ACCEPT_REQUEST){
  509. sig_ACCEPT_REQUEST_handler(sender_id);
  510. } else if (sig_read_ar[0] == SIG_ACCEPTED){
  511. sig_ACCEPTED_handler(sender_id);
  512. } else if (sig_read_ar[0] == SIG_LEARN){
  513. sig_LEARN_handler(sender_id);
  514. } else if (sig_read_ar[0] == SIG_REINIT_APP){
  515. sig_REINIT_APP_handler(sender_id);
  516. } else if (sig_read_ar[0] == SIG_CONTR_TO){
  517. sig_CONTR_TO_handler(sender_id);
  518. } else if (sig_read_ar[0] == SIG_REMOVE_FROM_DDS){
  519. sig_REMOVE_FROM_DDS_handler(sender_id);
  520. } else if (sig_read_ar[0] == SIG_ADD_TO_DDS){
  521. sig_ADD_TO_DDS_handler(sender_id,&increase_cnt,index_top);
  522. } else if (sig_read_ar[0] == SIG_HEARTBEAT_REQ){
  523. sig_HEARTBEAT_REQ_handler(sender_id);
  524. } else if (sig_read_ar[0] == SIG_HEARTBEAT_REP){
  525. sig_HEARTBEAT_REP_handler(sender_id);
  526. } else if (sig_read_ar[0] == SIG_FAIL){
  527. sig_FAIL_handler();
  528. } else if (sig_read_ar[0] == SIG_PAXOS_STATS_REQ){
  529. sig_PAXOS_STATS_REQ_handler(sender_id);
  530. } else if (sig_read_ar[0] == SIG_PAXOS_STATS_REP){
  531. sig_PAXOS_STATS_REP_handler(sender_id);
  532. /* END OF PAXOS SIGNAL HANDLING */
  533. } else if (sig_read_ar[0] != NO_SIG) {
  534. fprintf(log_file,"I read smth different than no_sig which is %d from %d\n",sig_read_ar[0],sender_id);
  535. fflush(log_file);
  536. }
  537. if (sig_read_ar[0] != NO_SIG)
  538. if (strcmp(sig2string(sig_read_ar[0]),"SIG_HEARTBEAT_REQ") != 0 && strcmp(sig2string(sig_read_ar[0]), "SIG_HEARTBEAT_REP") != 0)
  539. fprintf(log_file,"I invalidated sender_ids %d signals increase_cnt=%d\n",sender_id,increase_cnt);
  540. index_top = (index_top + increase_cnt) % MAX_SIGNAL_LIST_LEN;
  541. increase_cnt = 1;
  542. #ifdef PLAT_SCC
  543. RCCE_acquire_lock(node_id);
  544. #else
  545. sem_wait(&scc_lock[node_id]);
  546. #endif
  547. tmp_bottom = index_bottom[node_id];
  548. if (paxos_state == NEW_IDAG){
  549. fprintf(log_file,"\t\tmy index_top = %d and index_bottom = %d\n",index_top,tmp_bottom);
  550. break;
  551. }
  552. }
  553. }
  554. #ifdef PLAT_SCC
  555. RCCE_release_lock(node_id);
  556. #else
  557. sem_post(&scc_lock[node_id]);
  558. #endif
  559. if (paxos_state == NEW_AGENT){
  560. //paxos_state = PAXOS_ACTIVE;
  561. //common_node_actions(local_scen_directory, local_scen_num);
  562. }
  563. //fprintf(log_file,"paxos_state : %s", id2string(paxos_state));
  564. if (paxos_state == NEW_IDAG){
  565. fprintf(log_file, "My state is %s %s\n", id2string(state), id2string(paxos_state));
  566. rollback();
  567. fprintf(log_file, "My state 2 is %s %s\n", id2string(state), id2string(paxos_state));
  568. signals_enable();
  569. fprintf(log_file, "My state 3 is %s %s\n", id2string(state), id2string(paxos_state));
  570. idle_agent_actions("", "");
  571. }
  572. signals_enable();
  573. }
  574. void scc_pause(void) {
  575. int dummy=0, i;
  576. #ifdef LOW_VOLTAGE_0
  577. if ((node_id >= 0 && node_id <= 3) || (node_id >= 12 && node_id <= 15))
  578. for (i=0; i<125; i++) //667
  579. dummy++;
  580. else
  581. for (i=0; i<1000; i++)
  582. dummy++;
  583. #else
  584. for (i=0; i<1000; i++)
  585. //for(j=0; j<1000; j++)
  586. dummy++;
  587. #endif
  588. }