starpu_top_message_queue.c 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012 Inria
  4. * Copyright (C) 2012-2013,2016-2017 CNRS
  5. * Copyright (C) 2015 Université de Bordeaux
  6. * Copyright (C) 2011 William Braik, Yann Courtois, Jean-Marie Couteyen, Anthony
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include "starpu_top_message_queue.h"
  20. #include <string.h>
  21. #include <stdio.h>
  22. #include <stdlib.h>
  23. #include <common/utils.h>
  24. //this global queue is used both by API and by network threads
  25. struct _starpu_top_message_queue* _starpu_top_mt = NULL;
  26. /* Will always return the pointer to starpu_top_message_queue */
  27. struct _starpu_top_message_queue* _starpu_top_message_add(struct _starpu_top_message_queue* s,
  28. char* msg)
  29. {
  30. if( NULL == s )
  31. {
  32. _STARPU_MSG("Queue not initialized\n");
  33. free(msg);
  34. return s;
  35. }
  36. struct _starpu_top_message_queue_item* p = (struct _starpu_top_message_queue_item *) malloc( 1 * sizeof(*p) );
  37. STARPU_PTHREAD_MUTEX_LOCK(&(s->mutex));
  38. if( NULL == p )
  39. {
  40. _STARPU_MSG("IN %s, %s: malloc() failed\n", __FILE__, "list_add");
  41. free(msg);
  42. STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  43. return s;
  44. }
  45. p->message = msg;
  46. p->next = NULL;
  47. if( NULL == s->head && NULL == s->tail )
  48. {
  49. /* _STARPU_MSG("Empty list, adding p->num: %d\n\n", p->num); */
  50. sem_post(&(s->semaphore));
  51. s->head = s->tail = p;
  52. STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  53. return s;
  54. }
  55. else
  56. {
  57. /* _STARPU_MSG("List not empty, adding element to tail\n"); */
  58. sem_post(&(s->semaphore));
  59. s->tail->next = p;
  60. s->tail = p;
  61. }
  62. STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  63. return s;
  64. }
  65. //this is a queue and it is FIFO, so we will always remove the first element
  66. char* _starpu_top_message_remove(struct _starpu_top_message_queue* s)
  67. {
  68. if( NULL == s )
  69. {
  70. _STARPU_MSG("List is null\n");
  71. return NULL;
  72. }
  73. sem_wait(&(s->semaphore));
  74. struct _starpu_top_message_queue_item* h = NULL;
  75. struct _starpu_top_message_queue_item* p = NULL;
  76. STARPU_PTHREAD_MUTEX_LOCK(&(s->mutex));
  77. h = s->head;
  78. p = h->next;
  79. char* value = h->message;
  80. free(h);
  81. s->head = p;
  82. if( NULL == s->head )
  83. //the element tail was pointing to is free(), so we need an update
  84. s->tail = s->head;
  85. STARPU_PTHREAD_MUTEX_UNLOCK(&(s->mutex));
  86. return value;
  87. }
  88. struct _starpu_top_message_queue* _starpu_top_message_queue_new(void)
  89. {
  90. struct _starpu_top_message_queue *p;
  91. _STARPU_MALLOC(p, sizeof(*p));
  92. p->head = p->tail = NULL;
  93. sem_init(&(p->semaphore),0,0);
  94. STARPU_PTHREAD_MUTEX_INIT(&(p->mutex), NULL);
  95. return p;
  96. }