starpu_top_message_queue.c 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011 William Braik, Yann Courtois, Jean-Marie Couteyen, Anthony
  4. * Roy
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include "starpu_top_message_queue.h"
  18. #include <string.h>
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. //this global queue is used both by API and by network threads
  22. struct _starpu_top_message_queue* _starpu_top_mt = NULL;
  23. /* Will always return the pointer to starpu_top_message_queue */
  24. struct _starpu_top_message_queue* _starpu_top_message_add(struct _starpu_top_message_queue* s,
  25. char* msg)
  26. {
  27. struct _starpu_top_message_queue_item* p = (struct _starpu_top_message_queue_item *) malloc( 1 * sizeof(*p) );
  28. pthread_mutex_lock(&(s->mutex));
  29. if( NULL == p )
  30. {
  31. fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
  32. pthread_mutex_unlock(&(s->mutex));
  33. return s;
  34. }
  35. p->message = msg;
  36. p->next = NULL;
  37. if( NULL == s )
  38. {
  39. printf("Queue not initialized\n");
  40. pthread_mutex_unlock(&(s->mutex));
  41. return s;
  42. }
  43. else if( NULL == s->head && NULL == s->tail )
  44. {
  45. /* printf("Empty list, adding p->num: %d\n\n", p->num); */
  46. sem_post(&(s->semaphore));
  47. s->head = s->tail = p;
  48. pthread_mutex_unlock(&(s->mutex));
  49. return s;
  50. }
  51. else
  52. {
  53. /* printf("List not empty, adding element to tail\n"); */
  54. sem_post(&(s->semaphore));
  55. s->tail->next = p;
  56. s->tail = p;
  57. }
  58. pthread_mutex_unlock(&(s->mutex));
  59. return s;
  60. }
  61. //this is a queue and it is FIFO, so we will always remove the first element
  62. char* _starpu_top_message_remove(struct _starpu_top_message_queue* s)
  63. {
  64. sem_wait(&(s->semaphore));
  65. struct _starpu_top_message_queue_item* h = NULL;
  66. struct _starpu_top_message_queue_item* p = NULL;
  67. if( NULL == s )
  68. {
  69. printf("List is null\n");
  70. return NULL;
  71. }
  72. pthread_mutex_lock(&(s->mutex));
  73. h = s->head;
  74. p = h->next;
  75. char* value = h->message;
  76. free(h);
  77. s->head = p;
  78. if( NULL == s->head )
  79. //the element tail was pointing to is free(), so we need an update
  80. s->tail = s->head;
  81. pthread_mutex_unlock(&(s->mutex));
  82. return value;
  83. }
  84. struct _starpu_top_message_queue* _starpu_top_message_queue_new(void)
  85. {
  86. struct _starpu_top_message_queue* p = (struct _starpu_top_message_queue *) malloc( 1 * sizeof(*p));
  87. if( NULL == p )
  88. {
  89. fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
  90. return NULL;
  91. }
  92. p->head = p->tail = NULL;
  93. sem_init(&(p->semaphore),0,0);
  94. pthread_mutex_init(&(p->mutex), NULL);
  95. return p;
  96. }