/* tsnetwork_writeq.c: queue of buffered writes to a network descriptor. */
  1. #include <sys/time.h>
  2. #include <assert.h>
  3. #include <stdint.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include "tsnetwork_cork.h"
  7. #include "tsnetwork_internal.h"
  8. #include "tvmath.h"
  9. #include "tsnetwork.h"
  10. struct network_writeq_buf {
  11. const uint8_t * buf;
  12. size_t buflen;
  13. struct timeval timeo;
  14. int abstimeo;
  15. network_callback * callback;
  16. void * cookie;
  17. struct network_writeq_buf * next;
  18. };
  /* State of a write queue attached to a single descriptor. */
  struct network_writeq_internal {
      /* Descriptor the queued buffers are written to. */
      int fd;

      /* Oldest pending request, or NULL when the queue is empty. */
      struct network_writeq_buf *head;

      /*
       * Slot that receives the next appended request: &head while the queue
       * is empty, otherwise the address of the last request's next field.
       */
      struct network_writeq_buf **tailptr;
  };
  24. static int dowrite(struct network_writeq_internal *);
  25. static network_callback callback_bufdone;
  26. static int
  27. dowrite(struct network_writeq_internal * Q)
  28. {
  29. struct network_writeq_buf * QB = Q->head;
  30. struct timeval timeo;
  31. /* Sanity check that the queue is non-empty */
  32. assert(Q->head != NULL);
  33. /* Figure out how long to allow for this buffer write. */
  34. memcpy(&timeo, &QB->timeo, sizeof(struct timeval));
  35. if (QB->abstimeo && tvmath_subctime(&timeo))
  36. goto err0;
  37. /* Write the buffer. */
  38. if (tsnetwork_write(Q->fd, QB->buf, QB->buflen, &timeo, &timeo,
  39. callback_bufdone, Q))
  40. goto err0;
  41. /* Success! */
  42. return (0);
  43. err0:
  44. /* Failure! */
  45. return (-1);
  46. }
  47. /**
  48. * callback_bufdone(cookie, status):
  49. * Call the upstream callback for the buffer at the head of the write queue
  50. * ${cookie}, remove it from the queue, and write the next buffer.
  51. */
  52. static int
  53. callback_bufdone(void * cookie, int status)
  54. {
  55. struct network_writeq_internal * Q = cookie;
  56. struct network_writeq_buf * head_old;
  57. int rc;
  58. /* Unlink the current buffer from the queue. */
  59. head_old = Q->head;
  60. Q->head = head_old->next;
  61. /* Update tail pointer if necessary. */
  62. if (Q->tailptr == &head_old->next)
  63. Q->tailptr = &Q->head;
  64. /*
  65. * A callback of NETWORK_STATUS_CLOSED in response to an attempt to
  66. * write zero bytes is really a NETWORK_STATUS_ZEROBYTE.
  67. */
  68. if ((status == NETWORK_STATUS_CLOSED) && (head_old->buflen == 0))
  69. status = NETWORK_STATUS_ZEROBYTE;
  70. /*
  71. * If there's another buffer waiting to be written, register it to
  72. * be sent. If not and we're not handling an error, uncork the
  73. * socket.
  74. */
  75. if (Q->head != NULL) {
  76. if (dowrite(Q))
  77. goto err1;
  78. } else {
  79. if ((status == NETWORK_STATUS_OK) && network_uncork(Q->fd))
  80. status = NETWORK_STATUS_ERR;
  81. }
  82. /* Call the upstream callback. */
  83. rc = (head_old->callback)(head_old->cookie, status);
  84. /* Free the write parameters structure. */
  85. free(head_old);
  86. /* Return value from callback. */
  87. return (rc);
  88. err1:
  89. (head_old->callback)(head_old->cookie, status);
  90. free(head_old);
  91. /* Failure! */
  92. return (-1);
  93. }
  94. /**
  95. * network_writeq_init(fd):
  96. * Construct a queue to be used for writing data to ${fd}.
  97. */
  98. NETWORK_WRITEQ *
  99. network_writeq_init(int fd)
  100. {
  101. struct network_writeq_internal * Q;
  102. /* Allocate memory. */
  103. if ((Q = malloc(sizeof(struct network_writeq_internal))) == NULL)
  104. goto err0;
  105. /* Initialize structure. */
  106. Q->fd = fd;
  107. Q->head = NULL;
  108. Q->tailptr = &Q->head;
  109. /* Success! */
  110. return (Q);
  111. err0:
  112. /* Failure! */
  113. return (NULL);
  114. }
  115. /**
  116. * network_writeq_add_internal(Q, buf, buflen, timeo, callback, cookie,
  117. * abstimeo):
  118. * Add a buffer write to the specified write queue. The callback function
  119. * will be called when the write is finished, fails, or is cancelled.
  120. * If ${abstimeo} is zero, the timeout is relative to when the buffer in
  121. * question starts to be written (i.e., when the previous buffered write
  122. * finishes); otherwise, the timeout is relative to the present time. If
  123. * ${buflen} is zero, the callback will be performed, at the appropriate
  124. * point, with a status of NETWORK_STATUS_ZEROBYTE.
  125. */
  126. int
  127. network_writeq_add_internal(NETWORK_WRITEQ * Q, const uint8_t * buf,
  128. size_t buflen, struct timeval * timeo, network_callback * callback,
  129. void * cookie, int abstimeo)
  130. {
  131. struct network_writeq_buf * QB;
  132. struct network_writeq_buf ** tailptr_old;
  133. struct network_writeq_buf * head_old;
  134. /* Wrap parameters into a structure. */
  135. if ((QB = malloc(sizeof(struct network_writeq_buf))) == NULL)
  136. goto err0;
  137. QB->buf = buf;
  138. QB->buflen = buflen;
  139. memcpy(&QB->timeo, timeo, sizeof(struct timeval));
  140. QB->abstimeo = abstimeo;
  141. QB->callback = callback;
  142. QB->cookie = cookie;
  143. QB->next = NULL;
  144. /* Compute absolute time if appropriate. */
  145. if (abstimeo && tvmath_addctime(&QB->timeo))
  146. goto err1;
  147. /* Add this to the write queue. */
  148. head_old = Q->head;
  149. tailptr_old = Q->tailptr;
  150. *Q->tailptr = QB;
  151. Q->tailptr = &QB->next;
  152. /* If the queue head was NULL, we need to kick off the writing. */
  153. if (head_old == NULL) {
  154. /* Cork the socket so that we don't send small packets. */
  155. if (network_cork(Q->fd))
  156. goto err2;
  157. if (dowrite(Q))
  158. goto err2;
  159. }
  160. /* Success! */
  161. return (0);
  162. err2:
  163. Q->tailptr = tailptr_old;
  164. *Q->tailptr = NULL;
  165. err1:
  166. free(QB);
  167. err0:
  168. /* Failure! */
  169. return (-1);
  170. }
  171. /**
  172. * network_writeq_cancel(Q):
  173. * Cancel all queued writes, including any partially completed writes. Note
  174. * that since this leaves the connection in an indeterminate state (there is
  175. * no way to know how much data from the currently in-progress write was
  176. * written) this should probably only be used prior to closing a connection.
  177. * The callbacks for each pending write will be called with a status of
  178. * NETWORK_STATUS_DEQUEUE, and network_writeq_cancel will return the first
  179. * non-zero value returned by a callback.
  180. */
  181. int
  182. network_writeq_cancel(NETWORK_WRITEQ * Q)
  183. {
  184. int rc = 0, rc2;
  185. /* Keep on deregistering callbacks until the queue is empty. */
  186. while (Q->head != NULL) {
  187. rc2 = network_deregister(Q->fd, NETWORK_OP_WRITE);
  188. rc = rc ? rc : rc2;
  189. }
  190. /* Return first non-zero result from deregistration. */
  191. return (rc);
  192. }
  193. /**
  194. * network_writeq_free(Q):
  195. * Free the specified write queue. If there might be any pending writes,
  196. * network_writeq_cancel should be called first.
  197. */
  198. void
  199. network_writeq_free(NETWORK_WRITEQ * Q)
  200. {
  201. struct network_writeq_buf * head_old;
  202. /* Behave consistently with free(NULL). */
  203. if (Q == NULL)
  204. return;
  205. /* Repeat until the queue is empty. */
  206. while (Q->head != NULL) {
  207. /* Unlink the current buffer from the queue. */
  208. head_old = Q->head;
  209. Q->head = Q->head->next;
  210. /* Free the write parameters structure. */
  211. free(head_old);
  212. }
  213. /* Free the queue structure itself. */
  214. free(Q);
  215. }