starpu_top_message_queue.c 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011 William Braik, Yann Courtois, Jean-Marie Couteyen, Anthony
  4. * Roy
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include "starpu_top_message_queue.h"
  18. #include <string.h>
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. #include <common/utils.h>
  22. //this global queue is used both by API and by network threads
  23. struct _starpu_top_message_queue* _starpu_top_mt = NULL;
  24. /* Will always return the pointer to starpu_top_message_queue */
  25. struct _starpu_top_message_queue* _starpu_top_message_add(struct _starpu_top_message_queue* s,
  26. char* msg)
  27. {
  28. struct _starpu_top_message_queue_item* p = (struct _starpu_top_message_queue_item *) malloc( 1 * sizeof(*p) );
  29. _STARPU_PTHREAD_MUTEX_LOCK(&(s->mutex));
  30. if( NULL == p )
  31. {
  32. fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
  33. _STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  34. return s;
  35. }
  36. p->message = msg;
  37. p->next = NULL;
  38. if( NULL == s )
  39. {
  40. printf("Queue not initialized\n");
  41. _STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  42. return s;
  43. }
  44. else if( NULL == s->head && NULL == s->tail )
  45. {
  46. /* printf("Empty list, adding p->num: %d\n\n", p->num); */
  47. sem_post(&(s->semaphore));
  48. s->head = s->tail = p;
  49. _STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  50. return s;
  51. }
  52. else
  53. {
  54. /* printf("List not empty, adding element to tail\n"); */
  55. sem_post(&(s->semaphore));
  56. s->tail->next = p;
  57. s->tail = p;
  58. }
  59. _STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  60. return s;
  61. }
  62. //this is a queue and it is FIFO, so we will always remove the first element
  63. char* _starpu_top_message_remove(struct _starpu_top_message_queue* s)
  64. {
  65. sem_wait(&(s->semaphore));
  66. struct _starpu_top_message_queue_item* h = NULL;
  67. struct _starpu_top_message_queue_item* p = NULL;
  68. if( NULL == s )
  69. {
  70. printf("List is null\n");
  71. return NULL;
  72. }
  73. _STARPU_PTHREAD_MUTEX_LOCK(&(s->mutex));
  74. h = s->head;
  75. p = h->next;
  76. char* value = h->message;
  77. free(h);
  78. s->head = p;
  79. if( NULL == s->head )
  80. //the element tail was pointing to is free(), so we need an update
  81. s->tail = s->head;
  82. _STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  83. return value;
  84. }
  85. struct _starpu_top_message_queue* _starpu_top_message_queue_new(void)
  86. {
  87. struct _starpu_top_message_queue* p = (struct _starpu_top_message_queue *) malloc( 1 * sizeof(*p));
  88. if( NULL == p )
  89. {
  90. fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
  91. return NULL;
  92. }
  93. p->head = p->tail = NULL;
  94. sem_init(&(p->semaphore),0,0);
  95. _STARPU_PTHREAD_MUTEX_INIT(&(p->mutex), NULL);
  96. return p;
  97. }