starputop_message_queue.c 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011 William Braik, Yann Courtois, Jean-Marie Couteyen, Anthony
  4. * Roy
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include "starputop_message_queue.h"
  18. #include <string.h>
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. //this global queue is used both by API and by network threads
  22. starputop_message_queue_t* starputop_mt = NULL;
  23. /* Will always return the pointer to starputop_message_queue */
  24. starputop_message_queue_t* starputop_message_add(
  25. starputop_message_queue_t* s,
  26. char* msg)
  27. {
  28. starputop_message_queue_item_t* p = malloc( 1 * sizeof(*p) );
  29. pthread_mutex_lock(&(s->mutex));
  30. if( NULL == p )
  31. {
  32. fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
  33. pthread_mutex_unlock(&(s->mutex));
  34. return s;
  35. }
  36. p->message = msg;
  37. p->next = NULL;
  38. if( NULL == s )
  39. {
  40. printf("Queue not initialized\n");
  41. pthread_mutex_unlock(&(s->mutex));
  42. return s;
  43. }
  44. else if( NULL == s->head && NULL == s->tail )
  45. {
  46. /* printf("Empty list, adding p->num: %d\n\n", p->num); */
  47. sem_post(&(s->semaphore));
  48. s->head = s->tail = p;
  49. pthread_mutex_unlock(&(s->mutex));
  50. return s;
  51. }
  52. else
  53. {
  54. /* printf("List not empty, adding element to tail\n"); */
  55. sem_post(&(s->semaphore));
  56. s->tail->next = p;
  57. s->tail = p;
  58. }
  59. pthread_mutex_unlock(&(s->mutex));
  60. return s;
  61. }
  62. //this is a queue and it is FIFO, so we will always remove the first element
  63. char* starputop_message_remove(starputop_message_queue_t* s)
  64. {
  65. sem_wait(&(s->semaphore));
  66. starputop_message_queue_item_t* h = NULL;
  67. starputop_message_queue_item_t* p = NULL;
  68. if( NULL == s )
  69. {
  70. printf("List is null\n");
  71. return NULL;
  72. }
  73. pthread_mutex_lock(&(s->mutex));
  74. h = s->head;
  75. p = h->next;
  76. char* value = h->message;
  77. free(h);
  78. s->head = p;
  79. if( NULL == s->head )
  80. //the element tail was pointing to is free(), so we need an update
  81. s->tail = s->head;
  82. pthread_mutex_unlock(&(s->mutex));
  83. return value;
  84. }
  85. starputop_message_queue_t* starputop_message_queue_new(void)
  86. {
  87. starputop_message_queue_t* p = malloc( 1 * sizeof(*p));
  88. if( NULL == p )
  89. {
  90. fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
  91. }
  92. p->head = p->tail = NULL;
  93. sem_init(&(p->semaphore),0,0);
  94. pthread_mutex_init(&(p->mutex), NULL);
  95. return p;
  96. }